Skip to content
Open
11 changes: 11 additions & 0 deletions .github/workflows/pr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,24 @@ jobs:
python-version: ["3.12", "3.13"]
os: [ubuntu-latest, windows-latest, macos-latest]
fail-fast: false

steps:
- name: Check out repository code
uses: actions/checkout@v6
- name: Set up Python
uses: actions/setup-python@v6
with:
python-version: ${{ matrix.python-version }}
- name: Setup PostgreSQL Binaries
# GitHub action to set up postgreSQL for all 3 platforms
uses: ikalnytskyi/action-setup-postgres@v8
with:
username: test_user
password: postgres
database: test
port: 5432
postgres-version: '14'
id: postgres
- name: Install dependencies
run: |
python -m pip install --upgrade pip
Expand Down
2 changes: 2 additions & 0 deletions echopop/ingest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from .biological import (
apply_composite_key,
generate_composite_key,
load_biodata_db_views,
load_biodata_views,
load_biological_data,
)
Expand All @@ -31,6 +32,7 @@
"load_biological_data",
"load_isobath_data",
"load_biodata_views",
"load_biodata_db_views",
"load_mesh_data",
"load_kriging_variogram_params",
"join_geostrata_by_latitude",
Expand Down
118 changes: 118 additions & 0 deletions echopop/ingest/biological.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import numpy as np
import pandas as pd
from sqlalchemy import create_engine

from ..utils.base import add_haul_uids

Expand Down Expand Up @@ -102,6 +103,123 @@ def load_single_biological_view(
return df_filtered


def load_biodata_db_views(
db_credentials: dict[str, str],
biodata_table_map: dict[str, str],
column_name_map: dict[str, str] = None,
subset_dict: dict | None = None,
biodata_label_map: dict[str, dict] | None = None,
haul_uid_config: dict[str, Any] = None,
) -> dict[str, pd.DataFrame] | None:
"""
Load biological data from a postgres database.

Parameters
----------
db_credentials : dict
Dictionary containing database credentials
(e.g., {"host": "localhost", "port": "5432", "dbname": "fisheries", "schema": "biodata"
"user": "<USERNAME>", "password": "<PASSWORD>"})
biodata_table_map : dict
Dictionary mapping dataset names to database table names
(e.g., {"specimen": "biodata_specimen", "length": "biodata_length",
"catch": "biodata_catch"})
column_name_map : dict, optional
Dictionary mapping original column names to new column names
(e.g., {"frequency": "length_count", "haul": "haul_num"})
subset_dict : dict, optional
Subset dictionary containing ships and species_code for filtering
Format: {"ships": {ship_id: {"survey": survey_id, "haul_offset": offset}}, "species_code":
[codes]}
biodata_label_map : dict, optional
Dictionary mapping column names to value replacement dictionaries
(e.g., {"sex": {1: "male", 2: "female", 3: "unsexed"}})
haul_uid_config : Dict[str, Any]
Optional keyword arguments to override defaults or DataFrame values:

- ship_id (dict): Region-specific IDs, e.g., ``{'US': 10, 'CAN': 20}``.

- survey_id (dict): Region-specific IDs, e.g., ``{'US': 1, 'CAN': 2}``.

- species_id (int/str): A global species code override.

- haul_offset (int/float): A value subtracted from ``'haul_num'`` for records identified as
'CAN' (where ``haul_num - offset >= 0``).

Returns
-------
dict
Dictionary containing processed biological DataFrames keyed by dataset name

Examples
--------
>>> subset = {"ships": {160: {"survey": 201906}}, "species_code": [22500]}
>>> col_map = {"frequency": "length_count", "haul": "haul_num"}
>>> label_map = {"sex": {1: "male", 2: "female", 3: "unsexed"}}
"""
try:
db_url = (
f"postgresql+psycopg://"
f"{db_credentials['user']}:{db_credentials['password']}@"
f"{db_credentials['host']}:{db_credentials['port']}/"
f"{db_credentials['dbname']}"
)

engine = create_engine(db_url)

biodata_dict = {}

with engine.connect() as connection:
for data_set, table in biodata_table_map.items():
query = f"SELECT * FROM {db_credentials['schema']}.{table};"

df_initial = pd.read_sql_query(query, connection)

# Force the column names to be lower case
df_initial.columns = df_initial.columns.str.lower()

# Rename the columns
if column_name_map:
df_initial.rename(columns=column_name_map, inplace=True)

# # Validate data types for ship and survey before filtering
df_initial["ship"] = pd.to_numeric(df_initial["ship"])
df_initial["survey"] = pd.to_numeric(df_initial["survey"])

biodata_dict[data_set] = apply_ship_survey_filters(df_initial, subset_dict)

# Apply label mappings if provided
if biodata_label_map:
for col, mapping in biodata_label_map.items():
for _name, df in biodata_dict.items():
if isinstance(df, pd.DataFrame) and col in df.columns:
df[col] = df[col].map(mapping).fillna(df[col])

# # Validate data types
biodata_dict["specimen"]["length"] = pd.to_numeric(biodata_dict["specimen"]["length"])
biodata_dict["specimen"]["weight"] = pd.to_numeric(biodata_dict["specimen"]["weight"])

# Reformat haul datatype
biodata_dict = {
k: v.assign(haul_num=v["haul_num"].astype(float)) for k, v in biodata_dict.items()
}

# Add UID labels
_ = {
k: add_haul_uids(v, _dataset_type=f"biodata.{k}", **(haul_uid_config or {}))
for k, v in biodata_dict.items()
}

return biodata_dict

except Exception as e:
print(f"Database error: {e}")

finally:
if "engine" in locals():
engine.dispose()


def load_biodata_views(
biodata_filepaths: dict[str, Path],
column_name_map: dict[str, str] = None,
Expand Down
112 changes: 112 additions & 0 deletions echopop/tests/fixtures/fixtures_biodata_loader.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,92 @@
import os
from pathlib import Path

import pandas as pd
import pytest
from sqlalchemy import create_engine, text

HERE = Path(__file__).parent.absolute()
TEST_DATA_ROOT = HERE.parent / "test_data"
TEST_SQL_FILE = TEST_DATA_ROOT / "ingest" / "test_bio_data.sql"


@pytest.fixture(scope="session")
def postgres_container():
"""
Session-scoped fixture to get database connection.

- In GitHub Actions: Uses the postgres service from the workflow
- Locally: Uses Testcontainers if Docker is available, skips if not
"""
is_github_action = os.environ.get("GITHUB_ACTIONS")

if is_github_action:
# In GitHub Actions use the postgres service from workflow
yield type(
"obj",
(object,),
{
"get_connection_url": lambda self: "postgresql+psycopg://test_user:postgres@localhost:5432/test",
"get_container_host_ip": lambda self: "localhost",
"get_exposed_port": lambda self, port: 5432,
},
)()
else:
# Local development
try:
from testcontainers.postgres import PostgresContainer

container = PostgresContainer(
image="postgres:16", username="test_user", password="postgres", dbname="test"
)
container.start()
yield container
container.stop()
except Exception as e:
# Docker not available - skip integration tests
pytest.skip(f"Docker must be running for Testcontainers: {e}")


@pytest.fixture(scope="session")
def database_credentials(postgres_container):
"""
Session-scoped fixture to:
1. Connect to the PostgreSQL database (CI or local).
2. Load 'test_bio_data.sql' into it.
3. Yield the credentials dictionary in the format expected by load_biodata_db_views.

Returns dict with keys: host, port, dbname, user, password, schema
"""

host = postgres_container.get_container_host_ip()
port = postgres_container.get_exposed_port(5432)

creds = {
"host": host,
"port": port,
"dbname": "test",
"user": "test_user",
"password": "postgres",
"schema": "public",
}

db_url = (
f"postgresql+psycopg://"
f"{creds['user']}:{creds['password']}@"
f"{creds['host']}:{creds['port']}/"
f"{creds['dbname']}"
)

try:
engine = create_engine(db_url)
with engine.begin() as connection:
with open(TEST_SQL_FILE) as f:
sql_script = f.read()
connection.execute(text(sql_script))
except Exception as e:
pytest.fail(f"Failed to load {TEST_SQL_FILE}: {e}")

yield creds


@pytest.fixture
Expand Down Expand Up @@ -77,6 +164,31 @@ def subset_dict():
}


@pytest.fixture
def pg_subset_dict():
"""Create subset dictionary for filtering biological data."""
return {
"ships": {101: {"survey": 2024}},
"species_code": [22500],
}


@pytest.fixture
def bio_data_table_map():
"""Create table mapping for biological data in the database."""
return {"catch": "echopop_catch", "specimen": "echopop_fish"}


@pytest.fixture
def column_name_map():
"""Create column mapping for biological data loaded from the database."""
return {
"haul": "haul_num",
"weight_in_haul": "haul_weight",
"species_id": "species_code",
}


@pytest.fixture
def label_map():
"""Create label mapping dictionary for biological data."""
Expand Down
57 changes: 56 additions & 1 deletion echopop/tests/ingest/test_biodata_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@
import pandas as pd
import pytest

from echopop.ingest.biological import apply_ship_survey_filters, load_biological_data
from echopop.ingest.biological import (
apply_ship_survey_filters,
load_biodata_db_views,
load_biological_data,
)


def test_load_biological_data_basic(bio_excel_file, bio_sheet_map):
Expand Down Expand Up @@ -94,3 +98,54 @@ def test_apply_ship_survey_filters_no_subset(biological_data):

assert result is not df # Not the same object
pd.testing.assert_frame_equal(result, df) # But same content


# Ingest from database tests
def test_load_biological_data_basic_from_db(database_credentials, bio_data_table_map):
"""Test basic loading of biological data without optional parameters."""
result = load_biodata_db_views(database_credentials, bio_data_table_map)

assert isinstance(result, dict)

for df in result.values():
assert isinstance(df, pd.DataFrame)
assert not df.empty


def test_load_biological_data_with_column_map_from_db(
database_credentials, bio_data_table_map, column_name_map
):
"""Test loading with column name mapping."""
result = load_biodata_db_views(
database_credentials, bio_data_table_map, column_name_map=column_name_map
)

if "catch" in result:
assert "haul_weight" in result["catch"].columns
assert result["catch"].loc[3, "haul_weight"] == 250.0
assert "haul_num" in result["catch"].columns
assert "weight_in_haul" not in result["catch"].columns

if "specimen" in result:
assert "species_code" in result["specimen"].columns
assert result["specimen"].loc[2, "species_code"] == 22500
assert "haul_num" in result["catch"].columns


def test_load_biological_data_with_subset_from_db(
database_credentials, bio_data_table_map, pg_subset_dict, column_name_map
):
"""Test loading with subset filtering."""
result = load_biodata_db_views(
database_credentials,
bio_data_table_map,
subset_dict=pg_subset_dict,
column_name_map=column_name_map,
)

for df in result.values():
if "species_code" in df.columns:
assert (df["species_code"] == 22500).all()

if "ship" in df.columns:
assert (df["ship"] == 101).all()
Loading
Loading