From 17d3fb92e2b49d7a7821bef85362a4108135afa6 Mon Sep 17 00:00:00 2001 From: Conrad Date: Thu, 2 Jul 2026 10:14:32 -0400 Subject: [PATCH 1/2] fix: Fetch 4DN file enrichment by accession batches The 4DN Search API (Fourfront/Elasticsearch) caps every result window at 10,000 rows, so fetch_file_metadata_bulk's from-based deep pagination of FileProcessed and FileFastq retrieved at most 10k per type and silently stopped there. With tens of thousands of 4DN files, most never had their metadata fetched, so their genome_assembly and output_type were never promoted to the top-level file fields the API exposes as genomeAssembly. Downstream visualizations that key off the assembly rendered empty. Take the accessions to enrich as an argument and query the Search API filtered by accession in bounded batches (a single type=File query per batch, one accession filter each). Every batch's result set stays far under the 10k window, so every requested file is fetched regardless of corpus size. A per-batch non-200 or network error is logged and skipped rather than aborting or silently truncating the whole fetch. The 4DN enrichment step now builds the accession-to-id map from the materialized files first and passes those accessions to the fetch, so enrichment is scoped to files actually held rather than a blind scan. --- src/cfdb/services/fourdn.py | 224 +++++++++++++++++++----------------- src/cfdb/services/sync.py | 24 ++-- 2 files changed, 135 insertions(+), 113 deletions(-) diff --git a/src/cfdb/services/fourdn.py b/src/cfdb/services/fourdn.py index 3246178..abc4e2f 100644 --- a/src/cfdb/services/fourdn.py +++ b/src/cfdb/services/fourdn.py @@ -80,6 +80,7 @@ import asyncio import logging import re +from collections.abc import Iterable from typing import Optional import aiohttp @@ -96,6 +97,12 @@ # Rate limit: max 10 requests/second → 100ms between requests _REQUEST_INTERVAL = 0.1 +# Accessions per Search-API request when fetching file metadata by accession. +# Kept well under the Fourfront 10,000-row result-window cap (and short +# enough to keep the request URL within limits) so each batch returns every +# requested file rather than a truncated deep-pagination window. +_FILE_METADATA_BATCH_SIZE = 100 + # 4DN accession pattern: 4DNF followed by alphanumeric characters _ACCESSION_RE = re.compile(r"4DNF[A-Z0-9]+") @@ -156,127 +163,138 @@ def parse_extra_files(extra_files_raw: list) -> list[dict]: return parsed_files -async def fetch_file_metadata_bulk() -> dict[str, dict]: +async def fetch_file_metadata_bulk(accessions: Iterable[str]) -> dict[str, dict]: """ - Fetch file metadata from the 4DN Search API for FileProcessed and FileFastq types. + Fetch file metadata from the 4DN Search API for the given accessions. + + Queries the Search API filtered by accession in bounded batches rather + than deep-paginating every 4DN file. The Fourfront/Elasticsearch Search + API caps every result window at 10,000 rows (a ``from``-paginated scan + silently stops there and reports ``total`` clamped to 10,000), so the + old full scan retrieved only the first ~10k of each file type and left + the tens of thousands of remaining files un-enriched. Filtering each + query by a batch of accessions keeps its result set far under the + window, so every requested file is fetched regardless of corpus size. + + A single ``type=File`` query covers both ``FileProcessed`` and + ``FileFastq`` (and any other file subtype). - Paginates through all results and returns a dict keyed by accession. + Args: + accessions: 4DN file accessions (e.g. ``4DNF...``) to fetch metadata + for — typically the accessions of the materialized 4DN files. Returns: {accession: {genome_assembly, file_type, file_type_detailed, condition, biosource_name, dataset, experiment_type, assay_info, - replicate_info}} + replicate_info, extra_files}} """ config = get_dcc_config("4dn") api_base = config["api_base"] results: dict[str, dict] = {} - file_types = ["FileProcessed", "FileFastq"] - + # Dedupe and order for deterministic batching. + unique = sorted({acc for acc in accessions if acc}) + if not unique: + return results + + field_params = ( + "&field=accession" + "&field=genome_assembly" + "&field=file_type" + "&field=file_type_detailed" + "&field=track_and_facet_info" + "&field=extra_files" + ) + + failed_batches = 0 async with aiohttp.ClientSession() as session: - for file_type in file_types: - offset = 0 - limit = 1000 - - while True: - url = ( - f"{api_base}/search/" - f"?type={file_type}" - f"&field=accession" - f"&field=genome_assembly" - f"&field=file_type" - f"&field=file_type_detailed" - f"&field=track_and_facet_info" - f"&field=extra_files" - f"&limit={limit}" - f"&from={offset}" - f"&format=json" - ) - - try: - async with session.get( - url, - headers={"Accept": "application/json", "User-Agent": "cfdb/1.0"}, - timeout=aiohttp.ClientTimeout(total=60), - ) as response: - if response.status != 200: - logger.error( - f"4DN Search API error for {file_type}: HTTP {response.status}" - ) - break - - data = await response.json() - - except aiohttp.ClientError as e: - logger.error(f"4DN Search API network error: {e}") - break - - await asyncio.sleep(_REQUEST_INTERVAL) - - graph = data.get("@graph", []) - if not graph: - break + for start in range(0, len(unique), _FILE_METADATA_BATCH_SIZE): + batch = unique[start : start + _FILE_METADATA_BATCH_SIZE] + acc_params = "".join(f"&accession={acc}" for acc in batch) + url = ( + f"{api_base}/search/" + f"?type=File" + f"{acc_params}" + f"{field_params}" + f"&limit={len(batch)}" + f"&format=json" + ) - for item in graph: - accession = item.get("accession") - if not accession: + try: + async with session.get( + url, + headers={"Accept": "application/json", "User-Agent": "cfdb/1.0"}, + timeout=aiohttp.ClientTimeout(total=60), + ) as response: + if response.status != 200: + # Skip only this batch — never silently truncate the + # whole fetch (the deep-pagination bug this replaces). + logger.warning( + "4DN Search API error for accession batch " + f"({len(batch)} accessions): HTTP {response.status}" + ) + failed_batches += 1 continue - entry: dict = {} - - # Direct fields - genome_assembly = item.get("genome_assembly") - if genome_assembly: - entry["genome_assembly"] = genome_assembly - - file_type_val = item.get("file_type") - if file_type_val: - entry["file_type"] = file_type_val - - file_type_detailed = item.get("file_type_detailed") - if file_type_detailed: - entry["file_type_detailed"] = file_type_detailed - - # Fields from track_and_facet_info - track_info = item.get("track_and_facet_info", {}) - if track_info: - for key in ( - "condition", - "biosource_name", - "dataset", - "experiment_type", - "assay_info", - "replicate_info", - ): - val = track_info.get(key) - if val: - entry[key] = val - - # Extra files (index files like .px2, .bai) - extra_files = parse_extra_files(item.get("extra_files", [])) - if extra_files: - entry["extra_files"] = extra_files - - if entry: - results[accession] = entry - - total = data.get("total", 0) - offset += limit - - if offset % 5000 == 0: - logger.info( - f"Fetched {min(offset, total)}/{total} {file_type} records from 4DN API" - ) + data = await response.json() - if offset >= total: - break + except aiohttp.ClientError as e: + logger.warning( + "4DN Search API network error for accession batch " + f"({len(batch)} accessions): {e}" + ) + failed_batches += 1 + continue - logger.info( - f"4DN API: fetched {file_type} metadata, " - f"{sum(1 for a, e in results.items() if e)} entries so far" - ) + await asyncio.sleep(_REQUEST_INTERVAL) - logger.info(f"4DN API: {len(results)} total file metadata entries fetched") + for item in data.get("@graph", []): + accession = item.get("accession") + if not accession: + continue + + entry: dict = {} + + # Direct fields + genome_assembly = item.get("genome_assembly") + if genome_assembly: + entry["genome_assembly"] = genome_assembly + + file_type_val = item.get("file_type") + if file_type_val: + entry["file_type"] = file_type_val + + file_type_detailed = item.get("file_type_detailed") + if file_type_detailed: + entry["file_type_detailed"] = file_type_detailed + + # Fields from track_and_facet_info + track_info = item.get("track_and_facet_info", {}) + if track_info: + for key in ( + "condition", + "biosource_name", + "dataset", + "experiment_type", + "assay_info", + "replicate_info", + ): + val = track_info.get(key) + if val: + entry[key] = val + + # Extra files (index files like .px2, .bai) + extra_files = parse_extra_files(item.get("extra_files", [])) + if extra_files: + entry["extra_files"] = extra_files + + if entry: + results[accession] = entry + + logger.info( + f"4DN API: fetched metadata for {len(results)}/{len(unique)} " + f"requested files ({failed_batches} batch(es) failed)" + ) return results diff --git a/src/cfdb/services/sync.py b/src/cfdb/services/sync.py index e2ad3bd..9a088b2 100644 --- a/src/cfdb/services/sync.py +++ b/src/cfdb/services/sync.py @@ -245,16 +245,9 @@ async def _enrich_4dn_api_metadata() -> None: if api.db is None: raise RuntimeError("Database not initialized") - # Fetch API data - file_metadata = await fetch_file_metadata_bulk() - biosource_tiers = await fetch_biosource_tiers() - - logger.info( - f"4DN enrichment: {len(file_metadata)} file entries, " - f"{len(biosource_tiers)} biosource tier entries" - ) - - # Build accession -> _id lookup from existing files (avoids $regex per update) + # Build accession -> _id lookup from existing files (avoids $regex per + # update) first, so enrichment metadata is fetched for exactly those + # accessions. accession_to_id: dict[str, object] = {} cursor = api.db.files.find( {"submission": "4dn"}, @@ -267,6 +260,17 @@ async def _enrich_4dn_api_metadata() -> None: logger.info(f"4DN enrichment: {len(accession_to_id)} files in DB mapped by accession") + # Fetch API data for exactly the accessions we hold. Batching the Search + # API query by accession bypasses its 10k result-window cap, which a + # blind deep-pagination scan would silently hit and truncate. + file_metadata = await fetch_file_metadata_bulk(accession_to_id.keys()) + biosource_tiers = await fetch_biosource_tiers() + + logger.info( + f"4DN enrichment: {len(file_metadata)} file entries, " + f"{len(biosource_tiers)} biosource tier entries" + ) + # Build bulk update operations matched by _id operations = [] for accession, meta in file_metadata.items(): From 612fb1c900d27442a0f834ecb71c9ffcf6caee3c Mon Sep 17 00:00:00 2001 From: Conrad Date: Thu, 2 Jul 2026 10:14:40 -0400 Subject: [PATCH 2/2] test: Cover accession-batched 4DN metadata fetch Add TestFetchFileMetadataBulk coverage for the batched fetch: empty and all-falsy input short-circuit with no request, accession dedup, exact query-URL shape (type=File, per-batch limit, format, field set, no from offset), batch boundaries at exactly the batch size and one over, graph parsing (missing accession skipped, empty entry dropped, all direct and track fields mapped, falsy track fields skipped, extra_files parsed or omitted, accession absent from the graph), and failure isolation across all-fail and middle-batch-fail runs. Add property-based tests asserting the requested union equals the deduped input, batches partition the input within the size bound, result keys are a subset of the input, and no query ever emits a from offset. --- tests/test_fourdn.py | 831 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 830 insertions(+), 1 deletion(-) diff --git a/tests/test_fourdn.py b/tests/test_fourdn.py index 275f791..7c4bf85 100644 --- a/tests/test_fourdn.py +++ b/tests/test_fourdn.py @@ -2,11 +2,17 @@ from __future__ import annotations +import asyncio +import math +import re + +import aiohttp import pytest -from hypothesis import given +from hypothesis import HealthCheck, given, settings from hypothesis import strategies as st from cfdb.models import NUMERIC_PROTOCOL_FIELDS, EnrichedFourdnCollection +from cfdb.services import fourdn from cfdb.services.fourdn import parse_experiment_metadata, parse_extra_files @@ -474,3 +480,826 @@ def test_pbt_001_each_numeric_field_stringified_in_output( # Assert assert result[field_name] == str(value) + + +class _FakeResponse: + """Async-context-manager stand-in for an aiohttp response.""" + + def __init__(self, status: int, payload: dict): + self.status = status + self._payload = payload + + async def __aenter__(self): + return self + + async def __aexit__(self, *exc): + return False + + async def json(self): + return self._payload + + +class _EchoSession: + """Fake aiohttp session that records request URLs and echoes an ``@graph`` + for whatever accessions each URL asks for, drawing field values from + ``field_map``. ``failures`` maps a zero-based call index to an exception to + raise or a non-200 status to return, modelling a transient per-batch failure. + """ + + def __init__(self, field_map=None, failures=None): + self._field_map = field_map or {} + self._failures = failures or {} + self.get_urls: list[str] = [] + self._calls = 0 + + async def __aenter__(self): + return self + + async def __aexit__(self, *exc): + return False + + def get(self, url, **kwargs): + index = self._calls + self._calls += 1 + self.get_urls.append(url) + failure = self._failures.get(index) + if isinstance(failure, Exception): + raise failure + status = failure if isinstance(failure, int) else 200 + accessions = _accession_params(url) + graph = ( + [{"accession": acc, **self._field_map.get(acc, {})} for acc in accessions] + if status == 200 + else [] + ) + return _FakeResponse(status, {"@graph": graph, "total": len(graph)}) + + +def _accession_params(url: str) -> list[str]: + """Return the accession filter values in a Search-API request URL.""" + return re.findall(r"[?&]accession=([^&]+)", url) + + +class _FixedGraphSession: + """Fake aiohttp session that returns a fixed ``@graph`` for every request, + independent of the requested accessions. Lets a test drive exact item shapes + (an item missing ``accession``, an item with no mappable fields, a raw + ``extra_files`` payload). ``failures`` maps a zero-based call index to an + exception to raise or a non-200 status to return. + """ + + def __init__(self, graph, failures=None): + self._graph = graph + self._failures = failures or {} + self.get_urls: list[str] = [] + self._calls = 0 + + async def __aenter__(self): + return self + + async def __aexit__(self, *exc): + return False + + def get(self, url, **kwargs): + index = self._calls + self._calls += 1 + self.get_urls.append(url) + failure = self._failures.get(index) + if isinstance(failure, Exception): + raise failure + status = failure if isinstance(failure, int) else 200 + graph = self._graph if status == 200 else [] + return _FakeResponse(status, {"@graph": graph, "total": len(graph)}) + + +class TestFetchFileMetadataBulk: + """Tests for fetch_file_metadata_bulk.""" + + @pytest.mark.asyncio + async def test_fetch_file_metadata_bulk_should_batch_accessions_into_bounded_queries( + self, mocker + ): + """Test that accessions are queried in bounded batches. + + Given: + More accessions than the per-request batch size. + When: + fetch_file_metadata_bulk requests their metadata. + Then: + It should issue one accession-filtered query per batch, each within + the batch size and without a deep-pagination ``from`` offset, together + covering every accession exactly once. + """ + # Arrange + mocker.patch.object(fourdn, "_FILE_METADATA_BATCH_SIZE", 2) + session = _EchoSession() + mocker.patch.object(fourdn.aiohttp, "ClientSession", return_value=session) + accessions = [f"4DNF{i:07d}" for i in range(5)] + + # Act + await fourdn.fetch_file_metadata_bulk(accessions) + + # Assert + assert len(session.get_urls) == 3 # ceil(5 / 2) + assert all("from=" not in url for url in session.get_urls) + assert all(0 < len(_accession_params(url)) <= 2 for url in session.get_urls) + requested = [acc for url in session.get_urls for acc in _accession_params(url)] + assert sorted(requested) == accessions + + @pytest.mark.asyncio + async def test_fetch_file_metadata_bulk_should_aggregate_entries_across_batches( + self, mocker + ): + """Test that entries from every batch are aggregated. + + Given: + Accessions spanning multiple request batches, each with metadata + upstream. + When: + fetch_file_metadata_bulk requests their metadata. + Then: + It should return an entry for every accession, not just the first + batch's. + """ + # Arrange + mocker.patch.object(fourdn, "_FILE_METADATA_BATCH_SIZE", 2) + accessions = [f"4DNF{i:07d}" for i in range(5)] + field_map = {acc: {"genome_assembly": "GRCh38"} for acc in accessions} + session = _EchoSession(field_map=field_map) + mocker.patch.object(fourdn.aiohttp, "ClientSession", return_value=session) + + # Act + result = await fourdn.fetch_file_metadata_bulk(accessions) + + # Assert + assert set(result) == set(accessions) + assert all(entry["genome_assembly"] == "GRCh38" for entry in result.values()) + + @pytest.mark.asyncio + async def test_fetch_file_metadata_bulk_should_map_genome_assembly_and_track_fields( + self, mocker + ): + """Test that direct and track_and_facet_info fields are mapped. + + Given: + A file whose Search-API item carries genome_assembly, file_type, and + track_and_facet_info fields. + When: + fetch_file_metadata_bulk requests its metadata. + Then: + It should map both the direct fields and the track_and_facet_info + fields into the accession's entry. + """ + # Arrange + field_map = { + "4DNFIMTTOWBN": { + "genome_assembly": "GRCh38", + "file_type": "conservative peaks", + "track_and_facet_info": { + "condition": "untreated", + "biosource_name": "H1-hESC", + }, + } + } + session = _EchoSession(field_map=field_map) + mocker.patch.object(fourdn.aiohttp, "ClientSession", return_value=session) + + # Act + result = await fourdn.fetch_file_metadata_bulk(["4DNFIMTTOWBN"]) + + # Assert + entry = result["4DNFIMTTOWBN"] + assert entry["genome_assembly"] == "GRCh38" + assert entry["file_type"] == "conservative peaks" + assert entry["condition"] == "untreated" + assert entry["biosource_name"] == "H1-hESC" + + @pytest.mark.asyncio + @pytest.mark.parametrize( + "failure", + [500, aiohttp.ClientError("boom")], + ids=["http_500", "network_error"], + ) + async def test_fetch_file_metadata_bulk_should_continue_when_a_batch_fails( + self, mocker, failure + ): + """Test that a failed batch does not abort the whole fetch. + + Given: + Two batches of accessions where the first request fails with an + error status or a network error. + When: + fetch_file_metadata_bulk requests their metadata. + Then: + It should still return the second batch's entries and drop only the + failed batch's accessions. + """ + # Arrange + mocker.patch.object(fourdn, "_FILE_METADATA_BATCH_SIZE", 2) + accessions = [f"4DNF{i:07d}" for i in range(4)] + field_map = {acc: {"genome_assembly": "GRCh38"} for acc in accessions} + session = _EchoSession(field_map=field_map, failures={0: failure}) + mocker.patch.object(fourdn.aiohttp, "ClientSession", return_value=session) + + # Act + result = await fourdn.fetch_file_metadata_bulk(accessions) + + # Assert + assert accessions[0] not in result + assert accessions[1] not in result + assert accessions[2] in result + assert accessions[3] in result + + @pytest.mark.asyncio + async def test_fetch_file_metadata_bulk_should_return_empty_without_http_when_accessions_empty( + self, mocker + ): + """Test that an empty accession list issues no request. + + Given: + An empty iterable of accessions. + When: + fetch_file_metadata_bulk is awaited. + Then: + It should return an empty dict and issue no HTTP request. + """ + # Arrange + session = _EchoSession() + mocker.patch.object(fourdn.aiohttp, "ClientSession", return_value=session) + + # Act + result = await fourdn.fetch_file_metadata_bulk([]) + + # Assert + assert result == {} + assert session.get_urls == [] + + @pytest.mark.asyncio + async def test_fetch_file_metadata_bulk_should_return_empty_when_accessions_all_falsy( + self, mocker + ): + """Test that only-falsy accessions issue no request. + + Given: + An iterable whose accessions are all falsy (empty strings). + When: + fetch_file_metadata_bulk is awaited. + Then: + It should filter them out, return an empty dict, and issue no request. + """ + # Arrange + session = _EchoSession() + mocker.patch.object(fourdn.aiohttp, "ClientSession", return_value=session) + + # Act + result = await fourdn.fetch_file_metadata_bulk(["", ""]) + + # Assert + assert result == {} + assert session.get_urls == [] + + @pytest.mark.asyncio + async def test_fetch_file_metadata_bulk_should_deduplicate_repeated_accessions( + self, mocker + ): + """Test that duplicate accessions are requested only once. + + Given: + An input containing duplicate accessions. + When: + fetch_file_metadata_bulk requests their metadata. + Then: + It should request each distinct accession exactly once. + """ + # Arrange + session = _EchoSession(field_map={"4DNF0000001": {"genome_assembly": "GRCh38"}}) + mocker.patch.object(fourdn.aiohttp, "ClientSession", return_value=session) + + # Act + result = await fourdn.fetch_file_metadata_bulk( + ["4DNF0000001", "4DNF0000001", "4DNF0000000"] + ) + + # Assert + requested = [acc for url in session.get_urls for acc in _accession_params(url)] + assert sorted(requested) == ["4DNF0000000", "4DNF0000001"] + assert set(result) <= {"4DNF0000000", "4DNF0000001"} + + @pytest.mark.asyncio + async def test_fetch_file_metadata_bulk_should_build_correct_query_url(self, mocker): + """Test the Search-API query URL for a batch. + + Given: + A single accession. + When: + fetch_file_metadata_bulk requests its metadata. + Then: + It should issue one ``type=File`` accession-filtered query carrying the + field set, ``limit``, and ``format``, with no ``from`` offset or + per-subtype filter. + """ + # Arrange + session = _EchoSession() + mocker.patch.object(fourdn.aiohttp, "ClientSession", return_value=session) + + # Act + await fourdn.fetch_file_metadata_bulk(["4DNFIMTTOWBN"]) + + # Assert + assert len(session.get_urls) == 1 + url = session.get_urls[0] + assert url.startswith("https://data.4dnucleome.org/search/") + assert "type=File" in url + assert "type=FileProcessed" not in url + assert "type=FileFastq" not in url + assert _accession_params(url) == ["4DNFIMTTOWBN"] + assert "limit=1" in url + assert "format=json" in url + assert "from=" not in url + for field in ( + "accession", + "genome_assembly", + "file_type", + "file_type_detailed", + "track_and_facet_info", + "extra_files", + ): + assert f"field={field}" in url + + @pytest.mark.asyncio + async def test_fetch_file_metadata_bulk_should_set_limit_to_batch_length(self, mocker): + """Test that each query's limit matches its batch size. + + Given: + More accessions than a small batch size, leaving a partial final batch. + When: + fetch_file_metadata_bulk requests their metadata. + Then: + Each query's limit should equal that batch's accession count. + """ + # Arrange + mocker.patch.object(fourdn, "_FILE_METADATA_BATCH_SIZE", 2) + session = _EchoSession() + mocker.patch.object(fourdn.aiohttp, "ClientSession", return_value=session) + accessions = [f"4DNF{i:07d}" for i in range(3)] + + # Act + await fourdn.fetch_file_metadata_bulk(accessions) + + # Assert + assert len(session.get_urls) == 2 + assert "limit=2" in session.get_urls[0] + assert len(_accession_params(session.get_urls[0])) == 2 + assert "limit=1" in session.get_urls[1] + assert len(_accession_params(session.get_urls[1])) == 1 + + @pytest.mark.asyncio + async def test_fetch_file_metadata_bulk_should_issue_single_batch_when_count_equals_batch_size( + self, mocker + ): + """Test the batch boundary at exactly the batch size. + + Given: + Exactly batch-size accessions. + When: + fetch_file_metadata_bulk requests their metadata. + Then: + It should issue exactly one query. + """ + # Arrange + mocker.patch.object(fourdn, "_FILE_METADATA_BATCH_SIZE", 3) + session = _EchoSession() + mocker.patch.object(fourdn.aiohttp, "ClientSession", return_value=session) + accessions = [f"4DNF{i:07d}" for i in range(3)] + + # Act + await fourdn.fetch_file_metadata_bulk(accessions) + + # Assert + assert len(session.get_urls) == 1 + + @pytest.mark.asyncio + async def test_fetch_file_metadata_bulk_should_issue_two_batches_when_count_is_batch_size_plus_one( + self, mocker + ): + """Test the batch boundary one past the batch size. + + Given: + One more than batch-size accessions. + When: + fetch_file_metadata_bulk requests their metadata. + Then: + It should issue two queries, the second carrying a single accession. + """ + # Arrange + mocker.patch.object(fourdn, "_FILE_METADATA_BATCH_SIZE", 3) + session = _EchoSession() + mocker.patch.object(fourdn.aiohttp, "ClientSession", return_value=session) + accessions = [f"4DNF{i:07d}" for i in range(4)] + + # Act + await fourdn.fetch_file_metadata_bulk(accessions) + + # Assert + assert len(session.get_urls) == 2 + assert len(_accession_params(session.get_urls[1])) == 1 + + @pytest.mark.asyncio + async def test_fetch_file_metadata_bulk_should_skip_graph_item_missing_accession( + self, mocker + ): + """Test that a graph item without an accession is skipped. + + Given: + A response graph containing an item with no accession alongside a valid + item. + When: + fetch_file_metadata_bulk requests the metadata. + Then: + It should skip the accession-less item and keep the valid one. + """ + # Arrange + graph = [ + {"genome_assembly": "GRCh38"}, + {"accession": "4DNFIMTTOWBN", "genome_assembly": "GRCh38"}, + ] + session = _FixedGraphSession(graph) + mocker.patch.object(fourdn.aiohttp, "ClientSession", return_value=session) + + # Act + result = await fourdn.fetch_file_metadata_bulk(["4DNFIMTTOWBN"]) + + # Assert + assert set(result) == {"4DNFIMTTOWBN"} + + @pytest.mark.asyncio + async def test_fetch_file_metadata_bulk_should_omit_accession_when_item_has_no_mappable_fields( + self, mocker + ): + """Test that an item with no mappable fields is dropped. + + Given: + A response item with an accession but no metadata fields (a FASTQ-style + item without a genome assembly). + When: + fetch_file_metadata_bulk requests its metadata. + Then: + It should omit that accession from the result. + """ + # Arrange + session = _FixedGraphSession([{"accession": "4DNFFASTQ01"}]) + mocker.patch.object(fourdn.aiohttp, "ClientSession", return_value=session) + + # Act + result = await fourdn.fetch_file_metadata_bulk(["4DNFFASTQ01"]) + + # Assert + assert result == {} + + @pytest.mark.asyncio + async def test_fetch_file_metadata_bulk_should_map_file_type_detailed_and_all_track_fields( + self, mocker + ): + """Test that file_type_detailed and every track field are mapped. + + Given: + An item carrying file_type_detailed and all six track_and_facet_info + sub-fields. + When: + fetch_file_metadata_bulk requests its metadata. + Then: + It should lift file_type_detailed and every track sub-field into the + entry. + """ + # Arrange + track = { + "condition": "untreated", + "biosource_name": "H1-hESC", + "dataset": "ds1", + "experiment_type": "ChIP-seq", + "assay_info": "info", + "replicate_info": "Biorep 1 Techrep 1", + } + graph = [ + { + "accession": "4DNFIMTTOWBN", + "file_type_detailed": "conservative peaks (bigbed)", + "track_and_facet_info": track, + } + ] + session = _FixedGraphSession(graph) + mocker.patch.object(fourdn.aiohttp, "ClientSession", return_value=session) + + # Act + result = await fourdn.fetch_file_metadata_bulk(["4DNFIMTTOWBN"]) + + # Assert + entry = result["4DNFIMTTOWBN"] + assert entry["file_type_detailed"] == "conservative peaks (bigbed)" + for key, value in track.items(): + assert entry[key] == value + + @pytest.mark.asyncio + async def test_fetch_file_metadata_bulk_should_skip_falsy_track_fields(self, mocker): + """Test that falsy track sub-fields are omitted. + + Given: + An item whose track_and_facet_info mixes truthy and falsy sub-fields. + When: + fetch_file_metadata_bulk requests its metadata. + Then: + It should keep only the truthy sub-fields. + """ + # Arrange + graph = [ + { + "accession": "4DNFIMTTOWBN", + "track_and_facet_info": { + "condition": "untreated", + "biosource_name": "", + "dataset": None, + }, + } + ] + session = _FixedGraphSession(graph) + mocker.patch.object(fourdn.aiohttp, "ClientSession", return_value=session) + + # Act + result = await fourdn.fetch_file_metadata_bulk(["4DNFIMTTOWBN"]) + + # Assert + entry = result["4DNFIMTTOWBN"] + assert entry["condition"] == "untreated" + assert "biosource_name" not in entry + assert "dataset" not in entry + + @pytest.mark.asyncio + async def test_fetch_file_metadata_bulk_should_pass_extra_files_through_parser( + self, mocker + ): + """Test that extra_files are normalized via parse_extra_files. + + Given: + An item whose extra_files list carries a raw sidecar entry. + When: + fetch_file_metadata_bulk requests its metadata. + Then: + Its entry's extra_files should equal parse_extra_files of the raw list. + """ + # Arrange + raw_extra = [ + {"href": "/x.pairs.gz.px2", "file_format": "pairs_px2", "file_size": 5} + ] + session = _FixedGraphSession( + [{"accession": "4DNFIMTTOWBN", "extra_files": raw_extra}] + ) + mocker.patch.object(fourdn.aiohttp, "ClientSession", return_value=session) + + # Act + result = await fourdn.fetch_file_metadata_bulk(["4DNFIMTTOWBN"]) + + # Assert + assert result["4DNFIMTTOWBN"]["extra_files"] == parse_extra_files(raw_extra) + + @pytest.mark.asyncio + async def test_fetch_file_metadata_bulk_should_omit_extra_files_when_none(self, mocker): + """Test that a missing extra_files list yields no extra_files key. + + Given: + An item with a mappable field but no extra_files. + When: + fetch_file_metadata_bulk requests its metadata. + Then: + Its entry should have no extra_files key. + """ + # Arrange + session = _FixedGraphSession( + [{"accession": "4DNFIMTTOWBN", "genome_assembly": "GRCh38"}] + ) + mocker.patch.object(fourdn.aiohttp, "ClientSession", return_value=session) + + # Act + result = await fourdn.fetch_file_metadata_bulk(["4DNFIMTTOWBN"]) + + # Assert + assert "extra_files" not in result["4DNFIMTTOWBN"] + + @pytest.mark.asyncio + async def test_fetch_file_metadata_bulk_should_omit_accession_absent_from_graph( + self, mocker + ): + """Test that an accession with no upstream item is omitted. + + Given: + A requested accession that the upstream graph does not return. + When: + fetch_file_metadata_bulk requests its metadata. + Then: + It should still issue the request but omit that accession without error. + """ + # Arrange + session = _FixedGraphSession([]) + mocker.patch.object(fourdn.aiohttp, "ClientSession", return_value=session) + + # Act + result = await fourdn.fetch_file_metadata_bulk(["4DNFMISSING1"]) + + # Assert + assert result == {} + assert len(session.get_urls) == 1 + + @pytest.mark.asyncio + async def test_fetch_file_metadata_bulk_should_return_empty_when_all_batches_fail( + self, mocker + ): + """Test that all-failing batches return empty without raising. + + Given: + Two batches whose requests both fail (a status error and a network + error). + When: + fetch_file_metadata_bulk requests their metadata. + Then: + It should return an empty dict and not raise. + """ + # Arrange + mocker.patch.object(fourdn, "_FILE_METADATA_BATCH_SIZE", 2) + accessions = [f"4DNF{i:07d}" for i in range(4)] + session = _EchoSession(failures={0: 500, 1: aiohttp.ClientError("boom")}) + mocker.patch.object(fourdn.aiohttp, "ClientSession", return_value=session) + + # Act + result = await fourdn.fetch_file_metadata_bulk(accessions) + + # Assert + assert result == {} + + @pytest.mark.asyncio + async def test_fetch_file_metadata_bulk_should_continue_when_middle_batch_fails( + self, mocker + ): + """Test that a failed middle batch does not drop later batches. + + Given: + Three single-accession batches where the middle request fails. + When: + fetch_file_metadata_bulk requests their metadata. + Then: + It should keep the first and third batches and drop only the middle. + """ + # Arrange + mocker.patch.object(fourdn, "_FILE_METADATA_BATCH_SIZE", 1) + accessions = [f"4DNF{i:07d}" for i in range(3)] + field_map = {acc: {"genome_assembly": "GRCh38"} for acc in accessions} + session = _EchoSession(field_map=field_map, failures={1: 500}) + mocker.patch.object(fourdn.aiohttp, "ClientSession", return_value=session) + + # Act + result = await fourdn.fetch_file_metadata_bulk(accessions) + + # Assert + assert accessions[0] in result + assert accessions[1] not in result + assert accessions[2] in result + + @settings( + max_examples=50, + deadline=None, + suppress_health_check=[HealthCheck.function_scoped_fixture], + ) + @given( + accessions=st.lists( + st.from_regex(r"4DNF[A-Z0-9]{7}", fullmatch=True), max_size=20 + ) + ) + def test_pbt_001_request_union_equals_deduped_input(self, mocker, accessions): + """Test that every distinct accession is requested exactly once. + + Given: + Any list of 4DN accessions (possibly with duplicates). + When: + fetch_file_metadata_bulk requests their metadata. + Then: + The union of accessions across all issued queries should equal the + deduped, non-empty input. + """ + # Arrange + mocker.patch.object(fourdn, "_FILE_METADATA_BATCH_SIZE", 3) + mocker.patch.object(fourdn.asyncio, "sleep", mocker.AsyncMock()) + session = _EchoSession() + mocker.patch.object(fourdn.aiohttp, "ClientSession", return_value=session) + + # Act + asyncio.run(fourdn.fetch_file_metadata_bulk(accessions)) + + # Assert + requested = {acc for url in session.get_urls for acc in _accession_params(url)} + assert requested == {acc for acc in accessions if acc} + + @settings( + max_examples=50, + deadline=None, + suppress_health_check=[HealthCheck.function_scoped_fixture], + ) + @given( + accessions=st.lists( + st.from_regex(r"4DNF[A-Z0-9]{7}", fullmatch=True), min_size=1, max_size=20 + ) + ) + def test_pbt_002_batches_partition_input_and_are_size_bounded( + self, mocker, accessions + ): + """Test that batches partition the input within the size bound. + + Given: + Any non-empty list of 4DN accessions and a fixed batch size. + When: + fetch_file_metadata_bulk requests their metadata. + Then: + Each accession should appear in exactly one query, every query should + hold between one and the batch size, and the query count should be the + ceiling of the deduped count over the batch size. + """ + # Arrange + batch_size = 3 + mocker.patch.object(fourdn, "_FILE_METADATA_BATCH_SIZE", batch_size) + mocker.patch.object(fourdn.asyncio, "sleep", mocker.AsyncMock()) + session = _EchoSession() + mocker.patch.object(fourdn.aiohttp, "ClientSession", return_value=session) + + # Act + asyncio.run(fourdn.fetch_file_metadata_bulk(accessions)) + + # Assert + per_url = [_accession_params(url) for url in session.get_urls] + flat = [acc for params in per_url for acc in params] + deduped = {acc for acc in accessions if acc} + assert len(flat) == len(set(flat)) + assert set(flat) == deduped + assert all(1 <= len(params) <= batch_size for params in per_url) + assert len(session.get_urls) == math.ceil(len(deduped) / batch_size) + + @settings( + max_examples=50, + deadline=None, + suppress_health_check=[HealthCheck.function_scoped_fixture], + ) + @given( + accessions=st.lists( + st.from_regex(r"4DNF[A-Z0-9]{7}", fullmatch=True), max_size=20 + ) + ) + def test_pbt_003_result_keys_subset_of_deduped_input(self, mocker, accessions): + """Test that result keys never exceed the requested accessions. + + Given: + Any list of 4DN accessions, each with upstream metadata. + When: + fetch_file_metadata_bulk requests their metadata. + Then: + The result keys should be a subset of the deduped, non-empty input. + """ + # Arrange + mocker.patch.object(fourdn, "_FILE_METADATA_BATCH_SIZE", 3) + mocker.patch.object(fourdn.asyncio, "sleep", mocker.AsyncMock()) + field_map = {acc: {"genome_assembly": "GRCh38"} for acc in accessions if acc} + session = _EchoSession(field_map=field_map) + mocker.patch.object(fourdn.aiohttp, "ClientSession", return_value=session) + + # Act + result = asyncio.run(fourdn.fetch_file_metadata_bulk(accessions)) + + # Assert + assert set(result) <= {acc for acc in accessions if acc} + + @settings( + max_examples=50, + deadline=None, + suppress_health_check=[HealthCheck.function_scoped_fixture], + ) + @given( + accessions=st.lists( + st.from_regex(r"4DNF[A-Z0-9]{7}", fullmatch=True), min_size=1, max_size=20 + ) + ) + def test_pbt_004_no_url_contains_from_offset(self, mocker, accessions): + """Test that deep-pagination offsets are never emitted. + + Given: + Any non-empty list of 4DN accessions. + When: + fetch_file_metadata_bulk requests their metadata. + Then: + No issued query should contain a ``from`` offset. + """ + # Arrange + mocker.patch.object(fourdn, "_FILE_METADATA_BATCH_SIZE", 3) + mocker.patch.object(fourdn.asyncio, "sleep", mocker.AsyncMock()) + session = _EchoSession() + mocker.patch.object(fourdn.aiohttp, "ClientSession", return_value=session) + + # Act + asyncio.run(fourdn.fetch_file_metadata_bulk(accessions)) + + # Assert + assert all("from=" not in url for url in session.get_urls)