From 7bb027f7aceb98d1e9c878a242d22acf70dbf9b3 Mon Sep 17 00:00:00 2001 From: Maksym Nechepurenko Date: Sun, 26 Apr 2026 15:53:36 +0400 Subject: [PATCH 1/3] feat(subgraph): add idempotency skip for already-collected markets - Add n_wallets field to CollectorResult to carry wallet count from upsert - _upsert_trades now returns (trades, wallets) tuple - _subgraph_batch checks trades table before fetching; skips markets with existing trades that resolved >24h ago (log: subgraph_skip_already_collected) --- fflow/collectors/base.py | 1 + fflow/collectors/subgraph.py | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/fflow/collectors/base.py b/fflow/collectors/base.py index cc08a0f..89dda9c 100644 --- a/fflow/collectors/base.py +++ b/fflow/collectors/base.py @@ -18,6 +18,7 @@ class CollectorResult(BaseModel): collector: str target: str | None = None n_written: int = 0 + n_wallets: int = 0 started_at: datetime finished_at: datetime | None = None status: str = "running" diff --git a/fflow/collectors/subgraph.py b/fflow/collectors/subgraph.py index 8d21c93..7724533 100644 --- a/fflow/collectors/subgraph.py +++ b/fflow/collectors/subgraph.py @@ -60,7 +60,7 @@ async def run( yes_token = await self._resolve_yes_token(session, mid) trades = await self._fetch_trades(mid, yes_token, from_ts) if not dry_run: - result.n_written = await self._upsert_trades(session, mid, yes_token, trades) + result.n_written, result.n_wallets = await self._upsert_trades(session, mid, yes_token, trades) else: result.n_written = len(trades) result.status = "success" @@ -240,7 +240,7 @@ async def _upsert_trades( await session.commit() log.info("subgraph_upserted", market=market_id, trades=total, wallets=len(wallet_set)) - return total + return total, len(wallet_set) def _parse_log_index(raw_id: str) -> int: From d3e6057f25fd8961662992669c5148476beaca64 Mon Sep 17 00:00:00 2001 From: Maksym Nechepurenko Date: Sun, 26 Apr 2026 15:53:42 +0400 Subject: [PATCH 2/3] feat(subgraph): add jsonl progress checkpoint with resume support - _write_progress(): append-only jsonl, one line per market with market_id, status, trades_count, wallets_count, duration_ms, ts - _load_resume_set(): reads checkpoint, returns set of ok market_ids - _subgraph_batch: loads resume set on start, skips already-ok markets, writes checkpoint entry after every market (ok/skipped/failed) - config: add extra='ignore' to Settings to tolerate unknown env vars --- fflow/cli.py | 87 ++++++++++++++++++++++++++++++++++++------------- fflow/config.py | 2 +- 2 files changed, 65 insertions(+), 24 deletions(-) diff --git a/fflow/cli.py b/fflow/cli.py index a1f8d20..d095fc7 100644 --- a/fflow/cli.py +++ b/fflow/cli.py @@ -136,6 +136,47 @@ def collect_subgraph( raise typer.Exit(1) +def _load_resume_set(progress_path: "pathlib.Path") -> "set[str]": + """Return set of market_ids already successfully processed (status == 'ok').""" + import json + done: set[str] = set() + if not progress_path.exists(): + return done + with open(progress_path) as f: + for line in f: + line = line.strip() + if not line: + continue + try: + rec = json.loads(line) + if rec.get("status") == "ok": + done.add(rec["market_id"]) + except (json.JSONDecodeError, KeyError): + pass + return done + + +def _write_progress( + path: "pathlib.Path", + market_id: str, + status: str, + trades_count: int, + wallets_count: int, + duration_ms: int, +) -> None: + import json + entry = { + "market_id": market_id, + "status": status, + "trades_count": trades_count, + "wallets_count": wallets_count, + "duration_ms": duration_ms, + "ts": datetime.now(UTC).isoformat(), + } + with open(path, "a") as f: + f.write(json.dumps(entry) + "\n") + + async def _subgraph_batch( min_volume: float, dry_run: bool, @@ -143,13 +184,20 @@ async def _subgraph_batch( limit: int | None = None, categories: list[str] | None = None, ) -> None: - import json import pathlib + import time from fflow.collectors.subgraph import SubgraphCollector from fflow.db import AsyncSessionLocal from fflow.models import Market, Trade from sqlalchemy import select, func + progress_path = pathlib.Path("logs/batch_progress.jsonl") + progress_path.parent.mkdir(parents=True, exist_ok=True) + resume_set = _load_resume_set(progress_path) + if resume_set: + log.info("subgraph_batch_resume", already_done=len(resume_set)) + typer.echo(f"resuming: {len(resume_set)} markets already completed in checkpoint") + async with AsyncSessionLocal() as session: stmt = ( select(Market.id, Market.volume_total_usdc, Market.resolved_at) @@ -172,15 +220,17 @@ async def _subgraph_batch( else: typer.echo(f"subgraph batch: {total} markets vol>=${min_volume:,.0f}" + (f" (limit={limit})" if limit else "") + (f" categories={categories}" if categories else "")) - progress_path = pathlib.Path("logs/batch_progress.jsonl") - progress_path.parent.mkdir(parents=True, exist_ok=True) - collector = SubgraphCollector() stale_cutoff = datetime.now(UTC) - timedelta(days=1) ok = fail = skipped = already_done = 0 for mid, vol, resolved_at in rows: - # Idempotency: skip markets resolved >1 day ago that already have trades + # Resume: skip markets already in checkpoint with status=ok + if mid in resume_set: + already_done += 1 + continue + + # Idempotency: skip markets resolved >1 day ago that already have trades in DB resolved_is_old = resolved_at and resolved_at < stale_cutoff if resolved_is_old and not dry_run: async with AsyncSessionLocal() as session: @@ -189,40 +239,31 @@ async def _subgraph_batch( ) if existing and existing > 0: already_done += 1 - log.info("subgraph_batch_skip", market=mid, existing_trades=existing) - _write_progress(progress_path, mid, "already_done", existing, float(vol or 0)) + log.info("subgraph_skip_already_collected", market=mid, existing_trades=existing) + _write_progress(progress_path, mid, "ok", existing, 0, 0) continue + t0 = time.monotonic() try: r = await collector.run(market_id=mid, dry_run=dry_run) + duration_ms = int((time.monotonic() - t0) * 1000) if r.n_written and r.n_written > 0: ok += 1 + _write_progress(progress_path, mid, "ok", r.n_written, r.n_wallets, duration_ms) else: skipped += 1 + _write_progress(progress_path, mid, "skipped", 0, 0, duration_ms) log.info("subgraph_batch_market", market=mid, vol=float(vol or 0), - status=r.status, n=r.n_written) - _write_progress(progress_path, mid, r.status, r.n_written or 0, float(vol or 0)) + status=r.status, n=r.n_written, wallets=r.n_wallets, ms=duration_ms) except Exception as exc: + duration_ms = int((time.monotonic() - t0) * 1000) fail += 1 log.error("subgraph_batch_error", market=mid, error=str(exc)) - _write_progress(progress_path, mid, "failed", 0, float(vol or 0)) + _write_progress(progress_path, mid, "failed", 0, 0, duration_ms) typer.echo(f"subgraph batch done: ok={ok} skipped(0 trades)={skipped} already_done={already_done} fail={fail}") -def _write_progress(path: "pathlib.Path", market_id: str, status: str, trades: int, vol: float) -> None: - import json, pathlib - entry = { - "market_id": market_id, - "status": status, - "trades_count": trades, - "volume": vol, - "timestamp": datetime.now(UTC).isoformat(), - } - with open(path, "a") as f: - f.write(json.dumps(entry) + "\n") - - # --------------------------------------------------------------------------- # collect uma # --------------------------------------------------------------------------- diff --git a/fflow/config.py b/fflow/config.py index df7d404..6d1a951 100644 --- a/fflow/config.py +++ b/fflow/config.py @@ -35,7 +35,7 @@ class Settings(BaseSettings): log_level: str = "INFO" log_json: bool = False - model_config = SettingsConfigDict(env_prefix="FFLOW_", env_file=".env") + model_config = SettingsConfigDict(env_prefix="FFLOW_", env_file=".env", extra="ignore") settings = Settings() From c6c6f03b0e5c343351d6b224e8992b6cd317784d Mon Sep 17 00:00:00 2001 From: Maksym Nechepurenko Date: Sun, 26 Apr 2026 15:53:48 +0400 Subject: [PATCH 3/3] test(subgraph): add idempotency and resume tests - TestJsonlCheckpoint: 4 tests for _write_progress (schema, append, one-line-per-call) - TestLoadResumeSet: 4 tests for _load_resume_set (empty file, ok-only filter, malformed lines) - test_batch_skips_already_collected_markets: collector.run never called for stale+existing market - test_batch_writes_jsonl_checkpoint: checkpoint entry has correct fields for successful market - test_batch_resumes_from_checkpoint: done markets skipped, new markets processed --- tests/test_batch_idempotency.py | 226 ++++++++++++++++++++++++++++++++ 1 file changed, 226 insertions(+) create mode 100644 tests/test_batch_idempotency.py diff --git a/tests/test_batch_idempotency.py b/tests/test_batch_idempotency.py new file mode 100644 index 0000000..be85111 --- /dev/null +++ b/tests/test_batch_idempotency.py @@ -0,0 +1,226 @@ +"""Tests for subgraph batch idempotency, jsonl checkpoint, and resume.""" + +import json +import pathlib +import tempfile +from datetime import UTC, datetime, timedelta +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from fflow.cli import _load_resume_set, _write_progress + + +# --------------------------------------------------------------------------- +# _write_progress / _load_resume_set unit tests +# --------------------------------------------------------------------------- + +class TestJsonlCheckpoint: + def test_writes_one_line_per_call(self, tmp_path): + p = tmp_path / "progress.jsonl" + _write_progress(p, "0xaaa", "ok", 100, 20, 1500) + _write_progress(p, "0xbbb", "failed", 0, 0, 300) + lines = p.read_text().strip().split("\n") + assert len(lines) == 2 + + def test_each_line_is_valid_json(self, tmp_path): + p = tmp_path / "progress.jsonl" + _write_progress(p, "0xaaa", "ok", 100, 20, 1500) + rec = json.loads(p.read_text().strip()) + assert rec["market_id"] == "0xaaa" + assert rec["status"] == "ok" + assert rec["trades_count"] == 100 + assert rec["wallets_count"] == 20 + assert rec["duration_ms"] == 1500 + assert "ts" in rec + + def test_append_does_not_overwrite(self, tmp_path): + p = tmp_path / "progress.jsonl" + _write_progress(p, "0xaaa", "ok", 10, 2, 100) + _write_progress(p, "0xbbb", "ok", 20, 4, 200) + records = [json.loads(l) for l in p.read_text().strip().split("\n")] + assert records[0]["market_id"] == "0xaaa" + assert records[1]["market_id"] == "0xbbb" + + def test_schema_fields_present(self, tmp_path): + p = tmp_path / "progress.jsonl" + _write_progress(p, "0xccc", "skipped", 0, 0, 50) + rec = json.loads(p.read_text().strip()) + for field in ("market_id", "status", "trades_count", "wallets_count", "duration_ms", "ts"): + assert field in rec, f"missing field: {field}" + + +# --------------------------------------------------------------------------- +# _load_resume_set +# --------------------------------------------------------------------------- + +class TestLoadResumeSet: + def test_returns_empty_set_when_no_file(self, tmp_path): + result = _load_resume_set(tmp_path / "nonexistent.jsonl") + assert result == set() + + def test_returns_only_ok_statuses(self, tmp_path): + p = tmp_path / "progress.jsonl" + _write_progress(p, "0xaaa", "ok", 100, 10, 500) + _write_progress(p, "0xbbb", "failed", 0, 0, 100) + _write_progress(p, "0xccc", "skipped", 0, 0, 200) + _write_progress(p, "0xddd", "ok", 50, 5, 300) + result = _load_resume_set(p) + assert result == {"0xaaa", "0xddd"} + assert "0xbbb" not in result + assert "0xccc" not in result + + def test_tolerates_malformed_lines(self, tmp_path): + p = tmp_path / "progress.jsonl" + p.write_text('{"market_id": "0xaaa", "status": "ok", "ts": "x"}\nNOT_JSON\n{"market_id": "0xbbb", "status": "ok", "ts": "x"}\n') + result = _load_resume_set(p) + assert "0xaaa" in result + assert "0xbbb" in result + + def test_empty_file_returns_empty_set(self, tmp_path): + p = tmp_path / "progress.jsonl" + p.write_text("") + assert _load_resume_set(p) == set() + + +# --------------------------------------------------------------------------- +# Integration: batch skips already-collected markets +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_batch_skips_already_collected_markets(): + """Market resolved >1 day ago with existing trades must be skipped without calling collector.""" + from fflow.cli import _subgraph_batch + + stale_resolved = datetime.now(UTC) - timedelta(days=2) + market_id = "0xdeadbeef00000000000000000000000000000000000000000000000000000001" + mock_rows = [(market_id, "999999.0", stale_resolved)] + + collector_run_called = [] + + # Session 1: market list query; Session 2: trade count query + session1 = AsyncMock() + session1.execute = AsyncMock(return_value=MagicMock(all=MagicMock(return_value=mock_rows))) + session2 = AsyncMock() + session2.scalar = AsyncMock(return_value=42) # 42 existing trades + + ctx1 = MagicMock() + ctx1.__aenter__ = AsyncMock(return_value=session1) + ctx1.__aexit__ = AsyncMock(return_value=False) + ctx2 = MagicMock() + ctx2.__aenter__ = AsyncMock(return_value=session2) + ctx2.__aexit__ = AsyncMock(return_value=False) + + mock_collector = MagicMock() + mock_collector.run = AsyncMock(side_effect=lambda **kw: collector_run_called.append(kw)) + + with ( + patch("fflow.db.AsyncSessionLocal", side_effect=[ctx1, ctx2]), + patch("fflow.collectors.subgraph.SubgraphCollector", return_value=mock_collector), + patch("fflow.cli._write_progress"), + patch("fflow.cli._load_resume_set", return_value=set()), + ): + await _subgraph_batch(min_volume=50000, dry_run=False) + + assert len(collector_run_called) == 0, "collector.run() must not be called for already-collected market" + + +@pytest.mark.asyncio +async def test_batch_writes_jsonl_checkpoint(): + """Each successfully processed market must produce one jsonl line with correct fields.""" + from fflow.cli import _subgraph_batch + from fflow.collectors.base import CollectorResult + + market_id = "0xfeed000000000000000000000000000000000000000000000000000000000001" + # Fresh market — resolved only 1 hour ago, idempotency skip won't trigger + fresh_resolved = datetime.now(UTC) - timedelta(hours=1) + mock_rows = [(market_id, "100000.0", fresh_resolved)] + + result = CollectorResult( + collector="subgraph_trades", + target=market_id, + n_written=500, + n_wallets=80, + started_at=datetime.now(UTC), + finished_at=datetime.now(UTC), + status="success", + ) + + session = AsyncMock() + session.execute = AsyncMock(return_value=MagicMock(all=MagicMock(return_value=mock_rows))) + ctx = MagicMock() + ctx.__aenter__ = AsyncMock(return_value=session) + ctx.__aexit__ = AsyncMock(return_value=False) + + mock_collector = MagicMock() + mock_collector.run = AsyncMock(return_value=result) + + written: list[dict] = [] + + def capture_write(path, market_id, status, trades_count, wallets_count, duration_ms): + written.append({ + "market_id": market_id, "status": status, + "trades_count": trades_count, "wallets_count": wallets_count, + "duration_ms": duration_ms, + }) + + with ( + patch("fflow.db.AsyncSessionLocal", return_value=ctx), + patch("fflow.collectors.subgraph.SubgraphCollector", return_value=mock_collector), + patch("fflow.cli._write_progress", side_effect=capture_write), + patch("fflow.cli._load_resume_set", return_value=set()), + ): + await _subgraph_batch(min_volume=50000, dry_run=False) + + assert len(written) == 1 + assert written[0]["market_id"] == market_id + assert written[0]["status"] == "ok" + assert written[0]["trades_count"] == 500 + assert written[0]["wallets_count"] == 80 + assert written[0]["duration_ms"] >= 0 + + +@pytest.mark.asyncio +async def test_batch_resumes_from_checkpoint(): + """Markets in checkpoint with status=ok must be skipped; new markets must be processed.""" + from fflow.cli import _subgraph_batch + from fflow.collectors.base import CollectorResult + + done_market = "0xdone0000000000000000000000000000000000000000000000000000000001" + new_market = "0xnew00000000000000000000000000000000000000000000000000000000001" + resolved_at = datetime.now(UTC) - timedelta(hours=1) + mock_rows = [ + (done_market, "200000.0", resolved_at), + (new_market, "100000.0", resolved_at), + ] + + new_result = CollectorResult( + collector="subgraph_trades", target=new_market, n_written=300, n_wallets=40, + started_at=datetime.now(UTC), finished_at=datetime.now(UTC), status="success", + ) + + processed: list[str] = [] + + async def fake_run(market_id, **kwargs): + processed.append(market_id) + return new_result + + session = AsyncMock() + session.execute = AsyncMock(return_value=MagicMock(all=MagicMock(return_value=mock_rows))) + ctx = MagicMock() + ctx.__aenter__ = AsyncMock(return_value=session) + ctx.__aexit__ = AsyncMock(return_value=False) + + mock_collector = MagicMock() + mock_collector.run = AsyncMock(side_effect=fake_run) + + with ( + patch("fflow.db.AsyncSessionLocal", return_value=ctx), + patch("fflow.collectors.subgraph.SubgraphCollector", return_value=mock_collector), + patch("fflow.cli._write_progress"), + patch("fflow.cli._load_resume_set", return_value={done_market}), + ): + await _subgraph_batch(min_volume=50000, dry_run=False) + + assert done_market not in processed, "already-done market must be skipped" + assert new_market in processed, "new market must be processed"