diff --git a/src/cfdb/services/fourdn.py b/src/cfdb/services/fourdn.py index 3246178..abc4e2f 100644 --- a/src/cfdb/services/fourdn.py +++ b/src/cfdb/services/fourdn.py @@ -80,6 +80,7 @@ import asyncio import logging import re +from collections.abc import Iterable from typing import Optional import aiohttp @@ -96,6 +97,12 @@ # Rate limit: max 10 requests/second → 100ms between requests _REQUEST_INTERVAL = 0.1 +# Accessions per Search-API request when fetching file metadata by accession. +# Kept well under the Fourfront 10,000-row result-window cap (and short +# enough to keep the request URL within limits) so each batch returns every +# requested file rather than a truncated deep-pagination window. +_FILE_METADATA_BATCH_SIZE = 100 + # 4DN accession pattern: 4DNF followed by alphanumeric characters _ACCESSION_RE = re.compile(r"4DNF[A-Z0-9]+") @@ -156,127 +163,138 @@ def parse_extra_files(extra_files_raw: list) -> list[dict]: return parsed_files -async def fetch_file_metadata_bulk() -> dict[str, dict]: +async def fetch_file_metadata_bulk(accessions: Iterable[str]) -> dict[str, dict]: """ - Fetch file metadata from the 4DN Search API for FileProcessed and FileFastq types. + Fetch file metadata from the 4DN Search API for the given accessions. + + Queries the Search API filtered by accession in bounded batches rather + than deep-paginating every 4DN file. The Fourfront/Elasticsearch Search + API caps every result window at 10,000 rows (a ``from``-paginated scan + silently stops there and reports ``total`` clamped to 10,000), so the + old full scan retrieved only the first ~10k of each file type and left + the tens of thousands of remaining files un-enriched. Filtering each + query by a batch of accessions keeps its result set far under the + window, so every requested file is fetched regardless of corpus size. + + A single ``type=File`` query covers both ``FileProcessed`` and + ``FileFastq`` (and any other file subtype). - Paginates through all results and returns a dict keyed by accession. + Args: + accessions: 4DN file accessions (e.g. ``4DNF...``) to fetch metadata + for — typically the accessions of the materialized 4DN files. Returns: {accession: {genome_assembly, file_type, file_type_detailed, condition, biosource_name, dataset, experiment_type, assay_info, - replicate_info}} + replicate_info, extra_files}} """ config = get_dcc_config("4dn") api_base = config["api_base"] results: dict[str, dict] = {} - file_types = ["FileProcessed", "FileFastq"] - + # Dedupe and order for deterministic batching. + unique = sorted({acc for acc in accessions if acc}) + if not unique: + return results + + field_params = ( + "&field=accession" + "&field=genome_assembly" + "&field=file_type" + "&field=file_type_detailed" + "&field=track_and_facet_info" + "&field=extra_files" + ) + + failed_batches = 0 async with aiohttp.ClientSession() as session: - for file_type in file_types: - offset = 0 - limit = 1000 - - while True: - url = ( - f"{api_base}/search/" - f"?type={file_type}" - f"&field=accession" - f"&field=genome_assembly" - f"&field=file_type" - f"&field=file_type_detailed" - f"&field=track_and_facet_info" - f"&field=extra_files" - f"&limit={limit}" - f"&from={offset}" - f"&format=json" - ) - - try: - async with session.get( - url, - headers={"Accept": "application/json", "User-Agent": "cfdb/1.0"}, - timeout=aiohttp.ClientTimeout(total=60), - ) as response: - if response.status != 200: - logger.error( - f"4DN Search API error for {file_type}: HTTP {response.status}" - ) - break - - data = await response.json() - - except aiohttp.ClientError as e: - logger.error(f"4DN Search API network error: {e}") - break - - await asyncio.sleep(_REQUEST_INTERVAL) - - graph = data.get("@graph", []) - if not graph: - break + for start in range(0, len(unique), _FILE_METADATA_BATCH_SIZE): + batch = unique[start : start + _FILE_METADATA_BATCH_SIZE] + acc_params = "".join(f"&accession={acc}" for acc in batch) + url = ( + f"{api_base}/search/" + f"?type=File" + f"{acc_params}" + f"{field_params}" + f"&limit={len(batch)}" + f"&format=json" + ) - for item in graph: - accession = item.get("accession") - if not accession: + try: + async with session.get( + url, + headers={"Accept": "application/json", "User-Agent": "cfdb/1.0"}, + timeout=aiohttp.ClientTimeout(total=60), + ) as response: + if response.status != 200: + # Skip only this batch — never silently truncate the + # whole fetch (the deep-pagination bug this replaces). + logger.warning( + "4DN Search API error for accession batch " + f"({len(batch)} accessions): HTTP {response.status}" + ) + failed_batches += 1 continue - entry: dict = {} - - # Direct fields - genome_assembly = item.get("genome_assembly") - if genome_assembly: - entry["genome_assembly"] = genome_assembly - - file_type_val = item.get("file_type") - if file_type_val: - entry["file_type"] = file_type_val - - file_type_detailed = item.get("file_type_detailed") - if file_type_detailed: - entry["file_type_detailed"] = file_type_detailed - - # Fields from track_and_facet_info - track_info = item.get("track_and_facet_info", {}) - if track_info: - for key in ( - "condition", - "biosource_name", - "dataset", - "experiment_type", - "assay_info", - "replicate_info", - ): - val = track_info.get(key) - if val: - entry[key] = val - - # Extra files (index files like .px2, .bai) - extra_files = parse_extra_files(item.get("extra_files", [])) - if extra_files: - entry["extra_files"] = extra_files - - if entry: - results[accession] = entry - - total = data.get("total", 0) - offset += limit - - if offset % 5000 == 0: - logger.info( - f"Fetched {min(offset, total)}/{total} {file_type} records from 4DN API" - ) + data = await response.json() - if offset >= total: - break + except aiohttp.ClientError as e: + logger.warning( + "4DN Search API network error for accession batch " + f"({len(batch)} accessions): {e}" + ) + failed_batches += 1 + continue - logger.info( - f"4DN API: fetched {file_type} metadata, " - f"{sum(1 for a, e in results.items() if e)} entries so far" - ) + await asyncio.sleep(_REQUEST_INTERVAL) - logger.info(f"4DN API: {len(results)} total file metadata entries fetched") + for item in data.get("@graph", []): + accession = item.get("accession") + if not accession: + continue + + entry: dict = {} + + # Direct fields + genome_assembly = item.get("genome_assembly") + if genome_assembly: + entry["genome_assembly"] = genome_assembly + + file_type_val = item.get("file_type") + if file_type_val: + entry["file_type"] = file_type_val + + file_type_detailed = item.get("file_type_detailed") + if file_type_detailed: + entry["file_type_detailed"] = file_type_detailed + + # Fields from track_and_facet_info + track_info = item.get("track_and_facet_info", {}) + if track_info: + for key in ( + "condition", + "biosource_name", + "dataset", + "experiment_type", + "assay_info", + "replicate_info", + ): + val = track_info.get(key) + if val: + entry[key] = val + + # Extra files (index files like .px2, .bai) + extra_files = parse_extra_files(item.get("extra_files", [])) + if extra_files: + entry["extra_files"] = extra_files + + if entry: + results[accession] = entry + + logger.info( + f"4DN API: fetched metadata for {len(results)}/{len(unique)} " + f"requested files ({failed_batches} batch(es) failed)" + ) return results diff --git a/src/cfdb/services/sync.py b/src/cfdb/services/sync.py index e2ad3bd..9a088b2 100644 --- a/src/cfdb/services/sync.py +++ b/src/cfdb/services/sync.py @@ -245,16 +245,9 @@ async def _enrich_4dn_api_metadata() -> None: if api.db is None: raise RuntimeError("Database not initialized") - # Fetch API data - file_metadata = await fetch_file_metadata_bulk() - biosource_tiers = await fetch_biosource_tiers() - - logger.info( - f"4DN enrichment: {len(file_metadata)} file entries, " - f"{len(biosource_tiers)} biosource tier entries" - ) - - # Build accession -> _id lookup from existing files (avoids $regex per update) + # Build accession -> _id lookup from existing files (avoids $regex per + # update) first, so enrichment metadata is fetched for exactly those + # accessions. accession_to_id: dict[str, object] = {} cursor = api.db.files.find( {"submission": "4dn"}, @@ -267,6 +260,17 @@ async def _enrich_4dn_api_metadata() -> None: logger.info(f"4DN enrichment: {len(accession_to_id)} files in DB mapped by accession") + # Fetch API data for exactly the accessions we hold. Batching the Search + # API query by accession bypasses its 10k result-window cap, which a + # blind deep-pagination scan would silently hit and truncate. + file_metadata = await fetch_file_metadata_bulk(accession_to_id.keys()) + biosource_tiers = await fetch_biosource_tiers() + + logger.info( + f"4DN enrichment: {len(file_metadata)} file entries, " + f"{len(biosource_tiers)} biosource tier entries" + ) + # Build bulk update operations matched by _id operations = [] for accession, meta in file_metadata.items(): diff --git a/tests/test_fourdn.py b/tests/test_fourdn.py index 275f791..7c4bf85 100644 --- a/tests/test_fourdn.py +++ b/tests/test_fourdn.py @@ -2,11 +2,17 @@ from __future__ import annotations +import asyncio +import math +import re + +import aiohttp import pytest -from hypothesis import given +from hypothesis import HealthCheck, given, settings from hypothesis import strategies as st from cfdb.models import NUMERIC_PROTOCOL_FIELDS, EnrichedFourdnCollection +from cfdb.services import fourdn from cfdb.services.fourdn import parse_experiment_metadata, parse_extra_files @@ -474,3 +480,826 @@ def test_pbt_001_each_numeric_field_stringified_in_output( # Assert assert result[field_name] == str(value) + + +class _FakeResponse: + """Async-context-manager stand-in for an aiohttp response.""" + + def __init__(self, status: int, payload: dict): + self.status = status + self._payload = payload + + async def __aenter__(self): + return self + + async def __aexit__(self, *exc): + return False + + async def json(self): + return self._payload + + +class _EchoSession: + """Fake aiohttp session that records request URLs and echoes an ``@graph`` + for whatever accessions each URL asks for, drawing field values from + ``field_map``. ``failures`` maps a zero-based call index to an exception to + raise or a non-200 status to return, modelling a transient per-batch failure. + """ + + def __init__(self, field_map=None, failures=None): + self._field_map = field_map or {} + self._failures = failures or {} + self.get_urls: list[str] = [] + self._calls = 0 + + async def __aenter__(self): + return self + + async def __aexit__(self, *exc): + return False + + def get(self, url, **kwargs): + index = self._calls + self._calls += 1 + self.get_urls.append(url) + failure = self._failures.get(index) + if isinstance(failure, Exception): + raise failure + status = failure if isinstance(failure, int) else 200 + accessions = _accession_params(url) + graph = ( + [{"accession": acc, **self._field_map.get(acc, {})} for acc in accessions] + if status == 200 + else [] + ) + return _FakeResponse(status, {"@graph": graph, "total": len(graph)}) + + +def _accession_params(url: str) -> list[str]: + """Return the accession filter values in a Search-API request URL.""" + return re.findall(r"[?&]accession=([^&]+)", url) + + +class _FixedGraphSession: + """Fake aiohttp session that returns a fixed ``@graph`` for every request, + independent of the requested accessions. Lets a test drive exact item shapes + (an item missing ``accession``, an item with no mappable fields, a raw + ``extra_files`` payload). ``failures`` maps a zero-based call index to an + exception to raise or a non-200 status to return. + """ + + def __init__(self, graph, failures=None): + self._graph = graph + self._failures = failures or {} + self.get_urls: list[str] = [] + self._calls = 0 + + async def __aenter__(self): + return self + + async def __aexit__(self, *exc): + return False + + def get(self, url, **kwargs): + index = self._calls + self._calls += 1 + self.get_urls.append(url) + failure = self._failures.get(index) + if isinstance(failure, Exception): + raise failure + status = failure if isinstance(failure, int) else 200 + graph = self._graph if status == 200 else [] + return _FakeResponse(status, {"@graph": graph, "total": len(graph)}) + + +class TestFetchFileMetadataBulk: + """Tests for fetch_file_metadata_bulk.""" + + @pytest.mark.asyncio + async def test_fetch_file_metadata_bulk_should_batch_accessions_into_bounded_queries( + self, mocker + ): + """Test that accessions are queried in bounded batches. + + Given: + More accessions than the per-request batch size. + When: + fetch_file_metadata_bulk requests their metadata. + Then: + It should issue one accession-filtered query per batch, each within + the batch size and without a deep-pagination ``from`` offset, together + covering every accession exactly once. + """ + # Arrange + mocker.patch.object(fourdn, "_FILE_METADATA_BATCH_SIZE", 2) + session = _EchoSession() + mocker.patch.object(fourdn.aiohttp, "ClientSession", return_value=session) + accessions = [f"4DNF{i:07d}" for i in range(5)] + + # Act + await fourdn.fetch_file_metadata_bulk(accessions) + + # Assert + assert len(session.get_urls) == 3 # ceil(5 / 2) + assert all("from=" not in url for url in session.get_urls) + assert all(0 < len(_accession_params(url)) <= 2 for url in session.get_urls) + requested = [acc for url in session.get_urls for acc in _accession_params(url)] + assert sorted(requested) == accessions + + @pytest.mark.asyncio + async def test_fetch_file_metadata_bulk_should_aggregate_entries_across_batches( + self, mocker + ): + """Test that entries from every batch are aggregated. + + Given: + Accessions spanning multiple request batches, each with metadata + upstream. + When: + fetch_file_metadata_bulk requests their metadata. + Then: + It should return an entry for every accession, not just the first + batch's. + """ + # Arrange + mocker.patch.object(fourdn, "_FILE_METADATA_BATCH_SIZE", 2) + accessions = [f"4DNF{i:07d}" for i in range(5)] + field_map = {acc: {"genome_assembly": "GRCh38"} for acc in accessions} + session = _EchoSession(field_map=field_map) + mocker.patch.object(fourdn.aiohttp, "ClientSession", return_value=session) + + # Act + result = await fourdn.fetch_file_metadata_bulk(accessions) + + # Assert + assert set(result) == set(accessions) + assert all(entry["genome_assembly"] == "GRCh38" for entry in result.values()) + + @pytest.mark.asyncio + async def test_fetch_file_metadata_bulk_should_map_genome_assembly_and_track_fields( + self, mocker + ): + """Test that direct and track_and_facet_info fields are mapped. + + Given: + A file whose Search-API item carries genome_assembly, file_type, and + track_and_facet_info fields. + When: + fetch_file_metadata_bulk requests its metadata. + Then: + It should map both the direct fields and the track_and_facet_info + fields into the accession's entry. + """ + # Arrange + field_map = { + "4DNFIMTTOWBN": { + "genome_assembly": "GRCh38", + "file_type": "conservative peaks", + "track_and_facet_info": { + "condition": "untreated", + "biosource_name": "H1-hESC", + }, + } + } + session = _EchoSession(field_map=field_map) + mocker.patch.object(fourdn.aiohttp, "ClientSession", return_value=session) + + # Act + result = await fourdn.fetch_file_metadata_bulk(["4DNFIMTTOWBN"]) + + # Assert + entry = result["4DNFIMTTOWBN"] + assert entry["genome_assembly"] == "GRCh38" + assert entry["file_type"] == "conservative peaks" + assert entry["condition"] == "untreated" + assert entry["biosource_name"] == "H1-hESC" + + @pytest.mark.asyncio + @pytest.mark.parametrize( + "failure", + [500, aiohttp.ClientError("boom")], + ids=["http_500", "network_error"], + ) + async def test_fetch_file_metadata_bulk_should_continue_when_a_batch_fails( + self, mocker, failure + ): + """Test that a failed batch does not abort the whole fetch. + + Given: + Two batches of accessions where the first request fails with an + error status or a network error. + When: + fetch_file_metadata_bulk requests their metadata. + Then: + It should still return the second batch's entries and drop only the + failed batch's accessions. + """ + # Arrange + mocker.patch.object(fourdn, "_FILE_METADATA_BATCH_SIZE", 2) + accessions = [f"4DNF{i:07d}" for i in range(4)] + field_map = {acc: {"genome_assembly": "GRCh38"} for acc in accessions} + session = _EchoSession(field_map=field_map, failures={0: failure}) + mocker.patch.object(fourdn.aiohttp, "ClientSession", return_value=session) + + # Act + result = await fourdn.fetch_file_metadata_bulk(accessions) + + # Assert + assert accessions[0] not in result + assert accessions[1] not in result + assert accessions[2] in result + assert accessions[3] in result + + @pytest.mark.asyncio + async def test_fetch_file_metadata_bulk_should_return_empty_without_http_when_accessions_empty( + self, mocker + ): + """Test that an empty accession list issues no request. + + Given: + An empty iterable of accessions. + When: + fetch_file_metadata_bulk is awaited. + Then: + It should return an empty dict and issue no HTTP request. + """ + # Arrange + session = _EchoSession() + mocker.patch.object(fourdn.aiohttp, "ClientSession", return_value=session) + + # Act + result = await fourdn.fetch_file_metadata_bulk([]) + + # Assert + assert result == {} + assert session.get_urls == [] + + @pytest.mark.asyncio + async def test_fetch_file_metadata_bulk_should_return_empty_when_accessions_all_falsy( + self, mocker + ): + """Test that only-falsy accessions issue no request. + + Given: + An iterable whose accessions are all falsy (empty strings). + When: + fetch_file_metadata_bulk is awaited. + Then: + It should filter them out, return an empty dict, and issue no request. + """ + # Arrange + session = _EchoSession() + mocker.patch.object(fourdn.aiohttp, "ClientSession", return_value=session) + + # Act + result = await fourdn.fetch_file_metadata_bulk(["", ""]) + + # Assert + assert result == {} + assert session.get_urls == [] + + @pytest.mark.asyncio + async def test_fetch_file_metadata_bulk_should_deduplicate_repeated_accessions( + self, mocker + ): + """Test that duplicate accessions are requested only once. + + Given: + An input containing duplicate accessions. + When: + fetch_file_metadata_bulk requests their metadata. + Then: + It should request each distinct accession exactly once. + """ + # Arrange + session = _EchoSession(field_map={"4DNF0000001": {"genome_assembly": "GRCh38"}}) + mocker.patch.object(fourdn.aiohttp, "ClientSession", return_value=session) + + # Act + result = await fourdn.fetch_file_metadata_bulk( + ["4DNF0000001", "4DNF0000001", "4DNF0000000"] + ) + + # Assert + requested = [acc for url in session.get_urls for acc in _accession_params(url)] + assert sorted(requested) == ["4DNF0000000", "4DNF0000001"] + assert set(result) <= {"4DNF0000000", "4DNF0000001"} + + @pytest.mark.asyncio + async def test_fetch_file_metadata_bulk_should_build_correct_query_url(self, mocker): + """Test the Search-API query URL for a batch. + + Given: + A single accession. + When: + fetch_file_metadata_bulk requests its metadata. + Then: + It should issue one ``type=File`` accession-filtered query carrying the + field set, ``limit``, and ``format``, with no ``from`` offset or + per-subtype filter. + """ + # Arrange + session = _EchoSession() + mocker.patch.object(fourdn.aiohttp, "ClientSession", return_value=session) + + # Act + await fourdn.fetch_file_metadata_bulk(["4DNFIMTTOWBN"]) + + # Assert + assert len(session.get_urls) == 1 + url = session.get_urls[0] + assert url.startswith("https://data.4dnucleome.org/search/") + assert "type=File" in url + assert "type=FileProcessed" not in url + assert "type=FileFastq" not in url + assert _accession_params(url) == ["4DNFIMTTOWBN"] + assert "limit=1" in url + assert "format=json" in url + assert "from=" not in url + for field in ( + "accession", + "genome_assembly", + "file_type", + "file_type_detailed", + "track_and_facet_info", + "extra_files", + ): + assert f"field={field}" in url + + @pytest.mark.asyncio + async def test_fetch_file_metadata_bulk_should_set_limit_to_batch_length(self, mocker): + """Test that each query's limit matches its batch size. + + Given: + More accessions than a small batch size, leaving a partial final batch. + When: + fetch_file_metadata_bulk requests their metadata. + Then: + Each query's limit should equal that batch's accession count. + """ + # Arrange + mocker.patch.object(fourdn, "_FILE_METADATA_BATCH_SIZE", 2) + session = _EchoSession() + mocker.patch.object(fourdn.aiohttp, "ClientSession", return_value=session) + accessions = [f"4DNF{i:07d}" for i in range(3)] + + # Act + await fourdn.fetch_file_metadata_bulk(accessions) + + # Assert + assert len(session.get_urls) == 2 + assert "limit=2" in session.get_urls[0] + assert len(_accession_params(session.get_urls[0])) == 2 + assert "limit=1" in session.get_urls[1] + assert len(_accession_params(session.get_urls[1])) == 1 + + @pytest.mark.asyncio + async def test_fetch_file_metadata_bulk_should_issue_single_batch_when_count_equals_batch_size( + self, mocker + ): + """Test the batch boundary at exactly the batch size. + + Given: + Exactly batch-size accessions. + When: + fetch_file_metadata_bulk requests their metadata. + Then: + It should issue exactly one query. + """ + # Arrange + mocker.patch.object(fourdn, "_FILE_METADATA_BATCH_SIZE", 3) + session = _EchoSession() + mocker.patch.object(fourdn.aiohttp, "ClientSession", return_value=session) + accessions = [f"4DNF{i:07d}" for i in range(3)] + + # Act + await fourdn.fetch_file_metadata_bulk(accessions) + + # Assert + assert len(session.get_urls) == 1 + + @pytest.mark.asyncio + async def test_fetch_file_metadata_bulk_should_issue_two_batches_when_count_is_batch_size_plus_one( + self, mocker + ): + """Test the batch boundary one past the batch size. + + Given: + One more than batch-size accessions. + When: + fetch_file_metadata_bulk requests their metadata. + Then: + It should issue two queries, the second carrying a single accession. + """ + # Arrange + mocker.patch.object(fourdn, "_FILE_METADATA_BATCH_SIZE", 3) + session = _EchoSession() + mocker.patch.object(fourdn.aiohttp, "ClientSession", return_value=session) + accessions = [f"4DNF{i:07d}" for i in range(4)] + + # Act + await fourdn.fetch_file_metadata_bulk(accessions) + + # Assert + assert len(session.get_urls) == 2 + assert len(_accession_params(session.get_urls[1])) == 1 + + @pytest.mark.asyncio + async def test_fetch_file_metadata_bulk_should_skip_graph_item_missing_accession( + self, mocker + ): + """Test that a graph item without an accession is skipped. + + Given: + A response graph containing an item with no accession alongside a valid + item. + When: + fetch_file_metadata_bulk requests the metadata. + Then: + It should skip the accession-less item and keep the valid one. + """ + # Arrange + graph = [ + {"genome_assembly": "GRCh38"}, + {"accession": "4DNFIMTTOWBN", "genome_assembly": "GRCh38"}, + ] + session = _FixedGraphSession(graph) + mocker.patch.object(fourdn.aiohttp, "ClientSession", return_value=session) + + # Act + result = await fourdn.fetch_file_metadata_bulk(["4DNFIMTTOWBN"]) + + # Assert + assert set(result) == {"4DNFIMTTOWBN"} + + @pytest.mark.asyncio + async def test_fetch_file_metadata_bulk_should_omit_accession_when_item_has_no_mappable_fields( + self, mocker + ): + """Test that an item with no mappable fields is dropped. + + Given: + A response item with an accession but no metadata fields (a FASTQ-style + item without a genome assembly). + When: + fetch_file_metadata_bulk requests its metadata. + Then: + It should omit that accession from the result. + """ + # Arrange + session = _FixedGraphSession([{"accession": "4DNFFASTQ01"}]) + mocker.patch.object(fourdn.aiohttp, "ClientSession", return_value=session) + + # Act + result = await fourdn.fetch_file_metadata_bulk(["4DNFFASTQ01"]) + + # Assert + assert result == {} + + @pytest.mark.asyncio + async def test_fetch_file_metadata_bulk_should_map_file_type_detailed_and_all_track_fields( + self, mocker + ): + """Test that file_type_detailed and every track field are mapped. + + Given: + An item carrying file_type_detailed and all six track_and_facet_info + sub-fields. + When: + fetch_file_metadata_bulk requests its metadata. + Then: + It should lift file_type_detailed and every track sub-field into the + entry. + """ + # Arrange + track = { + "condition": "untreated", + "biosource_name": "H1-hESC", + "dataset": "ds1", + "experiment_type": "ChIP-seq", + "assay_info": "info", + "replicate_info": "Biorep 1 Techrep 1", + } + graph = [ + { + "accession": "4DNFIMTTOWBN", + "file_type_detailed": "conservative peaks (bigbed)", + "track_and_facet_info": track, + } + ] + session = _FixedGraphSession(graph) + mocker.patch.object(fourdn.aiohttp, "ClientSession", return_value=session) + + # Act + result = await fourdn.fetch_file_metadata_bulk(["4DNFIMTTOWBN"]) + + # Assert + entry = result["4DNFIMTTOWBN"] + assert entry["file_type_detailed"] == "conservative peaks (bigbed)" + for key, value in track.items(): + assert entry[key] == value + + @pytest.mark.asyncio + async def test_fetch_file_metadata_bulk_should_skip_falsy_track_fields(self, mocker): + """Test that falsy track sub-fields are omitted. + + Given: + An item whose track_and_facet_info mixes truthy and falsy sub-fields. + When: + fetch_file_metadata_bulk requests its metadata. + Then: + It should keep only the truthy sub-fields. + """ + # Arrange + graph = [ + { + "accession": "4DNFIMTTOWBN", + "track_and_facet_info": { + "condition": "untreated", + "biosource_name": "", + "dataset": None, + }, + } + ] + session = _FixedGraphSession(graph) + mocker.patch.object(fourdn.aiohttp, "ClientSession", return_value=session) + + # Act + result = await fourdn.fetch_file_metadata_bulk(["4DNFIMTTOWBN"]) + + # Assert + entry = result["4DNFIMTTOWBN"] + assert entry["condition"] == "untreated" + assert "biosource_name" not in entry + assert "dataset" not in entry + + @pytest.mark.asyncio + async def test_fetch_file_metadata_bulk_should_pass_extra_files_through_parser( + self, mocker + ): + """Test that extra_files are normalized via parse_extra_files. + + Given: + An item whose extra_files list carries a raw sidecar entry. + When: + fetch_file_metadata_bulk requests its metadata. + Then: + Its entry's extra_files should equal parse_extra_files of the raw list. + """ + # Arrange + raw_extra = [ + {"href": "/x.pairs.gz.px2", "file_format": "pairs_px2", "file_size": 5} + ] + session = _FixedGraphSession( + [{"accession": "4DNFIMTTOWBN", "extra_files": raw_extra}] + ) + mocker.patch.object(fourdn.aiohttp, "ClientSession", return_value=session) + + # Act + result = await fourdn.fetch_file_metadata_bulk(["4DNFIMTTOWBN"]) + + # Assert + assert result["4DNFIMTTOWBN"]["extra_files"] == parse_extra_files(raw_extra) + + @pytest.mark.asyncio + async def test_fetch_file_metadata_bulk_should_omit_extra_files_when_none(self, mocker): + """Test that a missing extra_files list yields no extra_files key. + + Given: + An item with a mappable field but no extra_files. + When: + fetch_file_metadata_bulk requests its metadata. + Then: + Its entry should have no extra_files key. + """ + # Arrange + session = _FixedGraphSession( + [{"accession": "4DNFIMTTOWBN", "genome_assembly": "GRCh38"}] + ) + mocker.patch.object(fourdn.aiohttp, "ClientSession", return_value=session) + + # Act + result = await fourdn.fetch_file_metadata_bulk(["4DNFIMTTOWBN"]) + + # Assert + assert "extra_files" not in result["4DNFIMTTOWBN"] + + @pytest.mark.asyncio + async def test_fetch_file_metadata_bulk_should_omit_accession_absent_from_graph( + self, mocker + ): + """Test that an accession with no upstream item is omitted. + + Given: + A requested accession that the upstream graph does not return. + When: + fetch_file_metadata_bulk requests its metadata. + Then: + It should still issue the request but omit that accession without error. + """ + # Arrange + session = _FixedGraphSession([]) + mocker.patch.object(fourdn.aiohttp, "ClientSession", return_value=session) + + # Act + result = await fourdn.fetch_file_metadata_bulk(["4DNFMISSING1"]) + + # Assert + assert result == {} + assert len(session.get_urls) == 1 + + @pytest.mark.asyncio + async def test_fetch_file_metadata_bulk_should_return_empty_when_all_batches_fail( + self, mocker + ): + """Test that all-failing batches return empty without raising. + + Given: + Two batches whose requests both fail (a status error and a network + error). + When: + fetch_file_metadata_bulk requests their metadata. + Then: + It should return an empty dict and not raise. + """ + # Arrange + mocker.patch.object(fourdn, "_FILE_METADATA_BATCH_SIZE", 2) + accessions = [f"4DNF{i:07d}" for i in range(4)] + session = _EchoSession(failures={0: 500, 1: aiohttp.ClientError("boom")}) + mocker.patch.object(fourdn.aiohttp, "ClientSession", return_value=session) + + # Act + result = await fourdn.fetch_file_metadata_bulk(accessions) + + # Assert + assert result == {} + + @pytest.mark.asyncio + async def test_fetch_file_metadata_bulk_should_continue_when_middle_batch_fails( + self, mocker + ): + """Test that a failed middle batch does not drop later batches. + + Given: + Three single-accession batches where the middle request fails. + When: + fetch_file_metadata_bulk requests their metadata. + Then: + It should keep the first and third batches and drop only the middle. + """ + # Arrange + mocker.patch.object(fourdn, "_FILE_METADATA_BATCH_SIZE", 1) + accessions = [f"4DNF{i:07d}" for i in range(3)] + field_map = {acc: {"genome_assembly": "GRCh38"} for acc in accessions} + session = _EchoSession(field_map=field_map, failures={1: 500}) + mocker.patch.object(fourdn.aiohttp, "ClientSession", return_value=session) + + # Act + result = await fourdn.fetch_file_metadata_bulk(accessions) + + # Assert + assert accessions[0] in result + assert accessions[1] not in result + assert accessions[2] in result + + @settings( + max_examples=50, + deadline=None, + suppress_health_check=[HealthCheck.function_scoped_fixture], + ) + @given( + accessions=st.lists( + st.from_regex(r"4DNF[A-Z0-9]{7}", fullmatch=True), max_size=20 + ) + ) + def test_pbt_001_request_union_equals_deduped_input(self, mocker, accessions): + """Test that every distinct accession is requested exactly once. + + Given: + Any list of 4DN accessions (possibly with duplicates). + When: + fetch_file_metadata_bulk requests their metadata. + Then: + The union of accessions across all issued queries should equal the + deduped, non-empty input. + """ + # Arrange + mocker.patch.object(fourdn, "_FILE_METADATA_BATCH_SIZE", 3) + mocker.patch.object(fourdn.asyncio, "sleep", mocker.AsyncMock()) + session = _EchoSession() + mocker.patch.object(fourdn.aiohttp, "ClientSession", return_value=session) + + # Act + asyncio.run(fourdn.fetch_file_metadata_bulk(accessions)) + + # Assert + requested = {acc for url in session.get_urls for acc in _accession_params(url)} + assert requested == {acc for acc in accessions if acc} + + @settings( + max_examples=50, + deadline=None, + suppress_health_check=[HealthCheck.function_scoped_fixture], + ) + @given( + accessions=st.lists( + st.from_regex(r"4DNF[A-Z0-9]{7}", fullmatch=True), min_size=1, max_size=20 + ) + ) + def test_pbt_002_batches_partition_input_and_are_size_bounded( + self, mocker, accessions + ): + """Test that batches partition the input within the size bound. + + Given: + Any non-empty list of 4DN accessions and a fixed batch size. + When: + fetch_file_metadata_bulk requests their metadata. + Then: + Each accession should appear in exactly one query, every query should + hold between one and the batch size, and the query count should be the + ceiling of the deduped count over the batch size. + """ + # Arrange + batch_size = 3 + mocker.patch.object(fourdn, "_FILE_METADATA_BATCH_SIZE", batch_size) + mocker.patch.object(fourdn.asyncio, "sleep", mocker.AsyncMock()) + session = _EchoSession() + mocker.patch.object(fourdn.aiohttp, "ClientSession", return_value=session) + + # Act + asyncio.run(fourdn.fetch_file_metadata_bulk(accessions)) + + # Assert + per_url = [_accession_params(url) for url in session.get_urls] + flat = [acc for params in per_url for acc in params] + deduped = {acc for acc in accessions if acc} + assert len(flat) == len(set(flat)) + assert set(flat) == deduped + assert all(1 <= len(params) <= batch_size for params in per_url) + assert len(session.get_urls) == math.ceil(len(deduped) / batch_size) + + @settings( + max_examples=50, + deadline=None, + suppress_health_check=[HealthCheck.function_scoped_fixture], + ) + @given( + accessions=st.lists( + st.from_regex(r"4DNF[A-Z0-9]{7}", fullmatch=True), max_size=20 + ) + ) + def test_pbt_003_result_keys_subset_of_deduped_input(self, mocker, accessions): + """Test that result keys never exceed the requested accessions. + + Given: + Any list of 4DN accessions, each with upstream metadata. + When: + fetch_file_metadata_bulk requests their metadata. + Then: + The result keys should be a subset of the deduped, non-empty input. + """ + # Arrange + mocker.patch.object(fourdn, "_FILE_METADATA_BATCH_SIZE", 3) + mocker.patch.object(fourdn.asyncio, "sleep", mocker.AsyncMock()) + field_map = {acc: {"genome_assembly": "GRCh38"} for acc in accessions if acc} + session = _EchoSession(field_map=field_map) + mocker.patch.object(fourdn.aiohttp, "ClientSession", return_value=session) + + # Act + result = asyncio.run(fourdn.fetch_file_metadata_bulk(accessions)) + + # Assert + assert set(result) <= {acc for acc in accessions if acc} + + @settings( + max_examples=50, + deadline=None, + suppress_health_check=[HealthCheck.function_scoped_fixture], + ) + @given( + accessions=st.lists( + st.from_regex(r"4DNF[A-Z0-9]{7}", fullmatch=True), min_size=1, max_size=20 + ) + ) + def test_pbt_004_no_url_contains_from_offset(self, mocker, accessions): + """Test that deep-pagination offsets are never emitted. + + Given: + Any non-empty list of 4DN accessions. + When: + fetch_file_metadata_bulk requests their metadata. + Then: + No issued query should contain a ``from`` offset. + """ + # Arrange + mocker.patch.object(fourdn, "_FILE_METADATA_BATCH_SIZE", 3) + mocker.patch.object(fourdn.asyncio, "sleep", mocker.AsyncMock()) + session = _EchoSession() + mocker.patch.object(fourdn.aiohttp, "ClientSession", return_value=session) + + # Act + asyncio.run(fourdn.fetch_file_metadata_bulk(accessions)) + + # Assert + assert all("from=" not in url for url in session.get_urls)