Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
224 changes: 121 additions & 103 deletions src/cfdb/services/fourdn.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import asyncio
import logging
import re
from collections.abc import Iterable
from typing import Optional

import aiohttp
Expand All @@ -96,6 +97,12 @@
# Rate limit: max 10 requests/second → 100ms between requests
_REQUEST_INTERVAL = 0.1

# Accessions per Search-API request when fetching file metadata by accession.
# Kept well under the Fourfront 10,000-row result-window cap (and short
# enough to keep the request URL within limits) so each batch returns every
# requested file rather than a truncated deep-pagination window.
_FILE_METADATA_BATCH_SIZE = 100

# 4DN accession pattern: 4DNF followed by alphanumeric characters
_ACCESSION_RE = re.compile(r"4DNF[A-Z0-9]+")

Expand Down Expand Up @@ -156,127 +163,138 @@ def parse_extra_files(extra_files_raw: list) -> list[dict]:
return parsed_files


async def fetch_file_metadata_bulk() -> dict[str, dict]:
async def fetch_file_metadata_bulk(accessions: Iterable[str]) -> dict[str, dict]:
"""
Fetch file metadata from the 4DN Search API for FileProcessed and FileFastq types.
Fetch file metadata from the 4DN Search API for the given accessions.

Queries the Search API filtered by accession in bounded batches rather
than deep-paginating every 4DN file. The Fourfront/Elasticsearch Search
API caps every result window at 10,000 rows (a ``from``-paginated scan
silently stops there and reports ``total`` clamped to 10,000), so the
old full scan retrieved only the first ~10k of each file type and left
the tens of thousands of remaining files un-enriched. Filtering each
query by a batch of accessions keeps its result set far under the
window, so every requested file is fetched regardless of corpus size.

A single ``type=File`` query covers both ``FileProcessed`` and
``FileFastq`` (and any other file subtype).

Paginates through all results and returns a dict keyed by accession.
Args:
accessions: 4DN file accessions (e.g. ``4DNF...``) to fetch metadata
for — typically the accessions of the materialized 4DN files.

Returns:
{accession: {genome_assembly, file_type, file_type_detailed, condition,
biosource_name, dataset, experiment_type, assay_info,
replicate_info}}
replicate_info, extra_files}}
"""
config = get_dcc_config("4dn")
api_base = config["api_base"]
results: dict[str, dict] = {}

file_types = ["FileProcessed", "FileFastq"]

# Dedupe and order for deterministic batching.
unique = sorted({acc for acc in accessions if acc})
if not unique:
return results

field_params = (
"&field=accession"
"&field=genome_assembly"
"&field=file_type"
"&field=file_type_detailed"
"&field=track_and_facet_info"
"&field=extra_files"
)

failed_batches = 0
async with aiohttp.ClientSession() as session:
for file_type in file_types:
offset = 0
limit = 1000

while True:
url = (
f"{api_base}/search/"
f"?type={file_type}"
f"&field=accession"
f"&field=genome_assembly"
f"&field=file_type"
f"&field=file_type_detailed"
f"&field=track_and_facet_info"
f"&field=extra_files"
f"&limit={limit}"
f"&from={offset}"
f"&format=json"
)

try:
async with session.get(
url,
headers={"Accept": "application/json", "User-Agent": "cfdb/1.0"},
timeout=aiohttp.ClientTimeout(total=60),
) as response:
if response.status != 200:
logger.error(
f"4DN Search API error for {file_type}: HTTP {response.status}"
)
break

data = await response.json()

except aiohttp.ClientError as e:
logger.error(f"4DN Search API network error: {e}")
break

await asyncio.sleep(_REQUEST_INTERVAL)

graph = data.get("@graph", [])
if not graph:
break
for start in range(0, len(unique), _FILE_METADATA_BATCH_SIZE):
batch = unique[start : start + _FILE_METADATA_BATCH_SIZE]
acc_params = "".join(f"&accession={acc}" for acc in batch)
url = (
f"{api_base}/search/"
f"?type=File"
f"{acc_params}"
f"{field_params}"
f"&limit={len(batch)}"
f"&format=json"
)

for item in graph:
accession = item.get("accession")
if not accession:
try:
async with session.get(
url,
headers={"Accept": "application/json", "User-Agent": "cfdb/1.0"},
timeout=aiohttp.ClientTimeout(total=60),
) as response:
if response.status != 200:
# Skip only this batch — never silently truncate the
# whole fetch (the deep-pagination bug this replaces).
logger.warning(
"4DN Search API error for accession batch "
f"({len(batch)} accessions): HTTP {response.status}"
)
failed_batches += 1
continue

entry: dict = {}

# Direct fields
genome_assembly = item.get("genome_assembly")
if genome_assembly:
entry["genome_assembly"] = genome_assembly

file_type_val = item.get("file_type")
if file_type_val:
entry["file_type"] = file_type_val

file_type_detailed = item.get("file_type_detailed")
if file_type_detailed:
entry["file_type_detailed"] = file_type_detailed

# Fields from track_and_facet_info
track_info = item.get("track_and_facet_info", {})
if track_info:
for key in (
"condition",
"biosource_name",
"dataset",
"experiment_type",
"assay_info",
"replicate_info",
):
val = track_info.get(key)
if val:
entry[key] = val

# Extra files (index files like .px2, .bai)
extra_files = parse_extra_files(item.get("extra_files", []))
if extra_files:
entry["extra_files"] = extra_files

if entry:
results[accession] = entry

total = data.get("total", 0)
offset += limit

if offset % 5000 == 0:
logger.info(
f"Fetched {min(offset, total)}/{total} {file_type} records from 4DN API"
)
data = await response.json()

if offset >= total:
break
except aiohttp.ClientError as e:
logger.warning(
"4DN Search API network error for accession batch "
f"({len(batch)} accessions): {e}"
)
failed_batches += 1
continue

logger.info(
f"4DN API: fetched {file_type} metadata, "
f"{sum(1 for a, e in results.items() if e)} entries so far"
)
await asyncio.sleep(_REQUEST_INTERVAL)

logger.info(f"4DN API: {len(results)} total file metadata entries fetched")
for item in data.get("@graph", []):
accession = item.get("accession")
if not accession:
continue

entry: dict = {}

# Direct fields
genome_assembly = item.get("genome_assembly")
if genome_assembly:
entry["genome_assembly"] = genome_assembly

file_type_val = item.get("file_type")
if file_type_val:
entry["file_type"] = file_type_val

file_type_detailed = item.get("file_type_detailed")
if file_type_detailed:
entry["file_type_detailed"] = file_type_detailed

# Fields from track_and_facet_info
track_info = item.get("track_and_facet_info", {})
if track_info:
for key in (
"condition",
"biosource_name",
"dataset",
"experiment_type",
"assay_info",
"replicate_info",
):
val = track_info.get(key)
if val:
entry[key] = val

# Extra files (index files like .px2, .bai)
extra_files = parse_extra_files(item.get("extra_files", []))
if extra_files:
entry["extra_files"] = extra_files

if entry:
results[accession] = entry

logger.info(
f"4DN API: fetched metadata for {len(results)}/{len(unique)} "
f"requested files ({failed_batches} batch(es) failed)"
)
return results


Expand Down
24 changes: 14 additions & 10 deletions src/cfdb/services/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,16 +245,9 @@ async def _enrich_4dn_api_metadata() -> None:
if api.db is None:
raise RuntimeError("Database not initialized")

# Fetch API data
file_metadata = await fetch_file_metadata_bulk()
biosource_tiers = await fetch_biosource_tiers()

logger.info(
f"4DN enrichment: {len(file_metadata)} file entries, "
f"{len(biosource_tiers)} biosource tier entries"
)

# Build accession -> _id lookup from existing files (avoids $regex per update)
# Build accession -> _id lookup from existing files (avoids $regex per
# update) first, so enrichment metadata is fetched for exactly those
# accessions.
accession_to_id: dict[str, object] = {}
cursor = api.db.files.find(
{"submission": "4dn"},
Expand All @@ -267,6 +260,17 @@ async def _enrich_4dn_api_metadata() -> None:

logger.info(f"4DN enrichment: {len(accession_to_id)} files in DB mapped by accession")

# Fetch API data for exactly the accessions we hold. Batching the Search
# API query by accession bypasses its 10k result-window cap, which a
# blind deep-pagination scan would silently hit and truncate.
file_metadata = await fetch_file_metadata_bulk(accession_to_id.keys())
biosource_tiers = await fetch_biosource_tiers()

logger.info(
f"4DN enrichment: {len(file_metadata)} file entries, "
f"{len(biosource_tiers)} biosource tier entries"
)

# Build bulk update operations matched by _id
operations = []
for accession, meta in file_metadata.items():
Expand Down
Loading
Loading