From e88f17a1c9b92a86d6cba055363d289fe6ab13c1 Mon Sep 17 00:00:00 2001 From: Maksym Nechepurenko Date: Sun, 26 Apr 2026 15:46:01 +0400 Subject: [PATCH 1/7] fix(gdelt): fire gdelt_unavailable warning only once per process --- fflow/news/gdelt.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/fflow/news/gdelt.py b/fflow/news/gdelt.py index 596c1d0..ae78b71 100644 --- a/fflow/news/gdelt.py +++ b/fflow/news/gdelt.py @@ -25,6 +25,8 @@ _WINDOW_BEFORE_RESOLVE = timedelta(days=30) _MAX_RESULTS = 5 +_import_warned = False # fire gdelt_unavailable only once per process + # NLTK stopwords (English) — loaded lazily _STOPWORDS: set[str] | None = None @@ -98,14 +100,17 @@ async def search_gdelt( When dry_run=True, prints the query that would run and returns None. """ + global _import_warned try: from google.cloud import bigquery # type: ignore from google.api_core.exceptions import GoogleAPICallError # type: ignore except ImportError: - log.warning( - "gdelt_unavailable", - reason="google-cloud-bigquery not installed; install with: uv pip install 'fflow[gdelt]'", - ) + if not _import_warned: + _import_warned = True + log.warning( + "gdelt_unavailable", + reason="google-cloud-bigquery not installed; install with: uv pip install 'fflow[gdelt]'", + ) return None keywords = _extract_keywords(question) From 263faaca9f41a9033dde295ec855cbeb0f1a96ba Mon Sep 17 00:00:00 2001 From: Maksym Nechepurenko Date: Sun, 26 Apr 2026 15:46:04 +0400 Subject: [PATCH 2/7] fix(subgraph): use yes_token in market filter, enrichedOrderFilleds entity, timeout+retry --- fflow/collectors/subgraph.py | 101 +++++++++++++++++++---------------- 1 file changed, 56 insertions(+), 45 deletions(-) diff --git a/fflow/collectors/subgraph.py b/fflow/collectors/subgraph.py index 509c206..8d21c93 100644 --- a/fflow/collectors/subgraph.py +++ b/fflow/collectors/subgraph.py @@ -16,9 +16,12 @@ _PAGE_SIZE = 1000 +# API note (verified 2026-04-26): The Polymarket subgraph exposes `enrichedOrderFilleds` +# (not `orderFilleds`). market.id is the YES token decimal ID. size is raw int (divide by +# 1e6 for shares). price is already a 0-1 decimal. side is "Buy"/"Sell" relative to the token. _TRADES_QUERY = gql(""" query Trades($market: String!, $lastId: String!, $first: Int!) 
{
-  orderFilleds(
+  enrichedOrderFilleds(
     where: { market: $market, id_gt: $lastId }
     first: $first
     orderBy: id
@@ -27,16 +30,13 @@
     id
     timestamp
     transactionHash
-    maker
-    taker
-    makerAssetId
-    takerAssetId
-    makerAmountFilled
-    takerAmountFilled
-    fee
-    market {
-      id
-    }
+    orderHash
+    maker { id }
+    taker { id }
+    market { id }
+    side
+    size
+    price
   }
 }
 """)

@@ -91,8 +91,12 @@ def _make_client(self) -> Client:
         headers = {"Accept": "application/json"}
         if settings.thegraph_api_key:
             headers["Authorization"] = f"Bearer {settings.thegraph_api_key}"
-        transport = HTTPXAsyncTransport(url=settings.subgraph_url, headers=headers)
-        return Client(transport=transport, fetch_schema_from_transport=False)
+        transport = HTTPXAsyncTransport(
+            url=settings.subgraph_url,
+            headers=headers,
+            timeout=60.0,
+        )
+        return Client(transport=transport, fetch_schema_from_transport=False, execute_timeout=60)

     async def _fetch_trades(
         self,
@@ -100,21 +104,34 @@ async def _fetch_trades(
         yes_token: str,
         from_ts: datetime | None,
     ) -> list[dict]:
+        import asyncio as _asyncio
+        from gql.transport.exceptions import TransportQueryError
+
         from_unix = int(from_ts.timestamp()) if from_ts else 0
         all_trades: list[dict] = []
         last_id = ""

         async with self._make_client() as client:
             while True:
-                result = await client.execute(
-                    _TRADES_QUERY,
-                    variable_values={
-                        "market": market_id.lower(),
-                        "lastId": last_id,
-                        "first": _PAGE_SIZE,
-                    },
-                )
-                page = result.get("orderFilleds", [])
+                for attempt in range(3):
+                    try:
+                        result = await client.execute(
+                            _TRADES_QUERY,
+                            variable_values={
+                                "market": yes_token,
+                                "lastId": last_id,
+                                "first": _PAGE_SIZE,
+                            },
+                        )
+                        break
+                    except Exception as exc:
+                        if isinstance(exc, TransportQueryError) and "bad indexers" in str(exc):
+                            raise  # indexer unavailable — no point retrying
+                        if attempt == 2:
+                            raise
+                        await _asyncio.sleep(2 ** attempt)
+
+                page = result.get("enrichedOrderFilleds", [])
                 if not page:
                     break

@@ -142,31 +159,20 @@ async def _upsert_trades(

         for t in raw_trades:
             tx = t.get("transactionHash", "")
-            # log_index from id (format: txHash-logIndex or sequential id)
             raw_id = t.get("id", "")
             log_idx = _parse_log_index(raw_id)
             ts = datetime.fromtimestamp(int(t["timestamp"]), tz=UTC)
-            taker = (t.get("taker") or "").lower()
-            maker = (t.get("maker") or "").lower() or None
-
-            taker_asset = t.get("takerAssetId", "")
-            maker_amount = int(t.get("makerAmountFilled", 0))
-            taker_amount = int(t.get("takerAmountFilled", 0))
-
-            # taker receives YES token → BUY; taker gives YES token → SELL
-            if str(taker_asset) == yes_token:
-                side = "BUY"
-                outcome_index = 1
-                size_shares = taker_amount
-                usdc_paid = maker_amount
-            else:
-                side = "SELL"
-                outcome_index = 1
-                size_shares = maker_amount
-                usdc_paid = taker_amount
-
-            price_val = (usdc_paid / size_shares / 1e6) if size_shares else 0
-            notional = usdc_paid / 1e6
+            taker = ((t.get("taker") or {}).get("id") or "").lower()
+            maker = ((t.get("maker") or {}).get("id") or "").lower() or None
+
+            raw_side = t.get("side", "Buy")
+            side = "BUY" if raw_side.lower() == "buy" else "SELL"
+            outcome_index = 1  # all enrichedOrderFilleds filtered by YES token
+
+            raw_size = int(t.get("size", 0))
+            size_shares = raw_size / 1e6
+            price_val = float(t.get("price", 0))
+            notional = size_shares * price_val

             trade_rows.append({
                 "market_id": market_id,
@@ -177,7 +183,7 @@ async def _upsert_trades(
                 "maker_address": maker,
                 "side": side,
                 "outcome_index": outcome_index,
"size_shares": str(size_shares / 1e6), + "size_shares": str(round(size_shares, 6)), "price": str(round(price_val, 6)), "notional_usdc": str(round(notional, 6)), "raw_event": t, @@ -237,6 +244,10 @@ async def _upsert_trades( def _parse_log_index(raw_id: str) -> int: + # enrichedOrderFilleds id format: txHash_orderHash — hash the orderHash part + if "_" in raw_id: + order_hash_part = raw_id.split("_", 1)[-1] + return int(order_hash_part, 16) % 2**31 if order_hash_part.startswith("0x") else hash(order_hash_part) % 2**31 if "-" in raw_id: try: return int(raw_id.split("-")[-1]) From bfa93dd9187e9a0d58dd46f8dec6cab01de3ca61 Mon Sep 17 00:00:00 2001 From: Maksym Nechepurenko Date: Sun, 26 Apr 2026 15:46:07 +0400 Subject: [PATCH 3/7] =?UTF-8?q?fix(gamma):=20map=20closedTime=E2=86=92reso?= =?UTF-8?q?lved=5Fat,=20add=20=5Fgamma=5Foutcome=20helper=20and=20historic?= =?UTF-8?q?al=20backfill?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- fflow/collectors/gamma.py | 91 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 89 insertions(+), 2 deletions(-) diff --git a/fflow/collectors/gamma.py b/fflow/collectors/gamma.py index 7288046..62af1a9 100644 --- a/fflow/collectors/gamma.py +++ b/fflow/collectors/gamma.py @@ -4,6 +4,11 @@ Category information is derived from the event title (events[0].title) and the filter tag used during collection, stored together in category_raw. clobTokenIds is a JSON-encoded string and must be parsed with json.loads(). + +Historical backfill mode (--closed): + Use ?closed=true&end_date_min=YYYY-MM-DD&end_date_max=YYYY-MM-DD to sweep + through historical resolved markets month-by-month. + outcomePrices: ["1","0"] = YES won (outcome=1); ["0","1"] = NO won (outcome=0). """ import json @@ -30,13 +35,19 @@ async def run( target: str | None = None, since: datetime | None = None, categories: list[str] | None = None, + closed: bool = False, + end_date_min: str | None = None, + end_date_max: str | None = None, dry_run: bool = False, ) -> CollectorResult: result = self._start_result(target) async with AsyncSessionLocal() as session: run_id = await self._record_run_start(session, result) try: - markets = await self._fetch_markets(since=since, categories=categories) + if closed: + markets = await self._fetch_closed(end_date_min=end_date_min, end_date_max=end_date_max) + else: + markets = await self._fetch_markets(since=since, categories=categories) if not dry_run: result.n_written = await self._upsert_markets(session, markets) else: @@ -73,6 +84,55 @@ async def _fetch_markets( log.info("gamma_fetched", n=len(all_markets)) return all_markets + async def _fetch_closed( + self, + end_date_min: str | None = None, + end_date_max: str | None = None, + ) -> list[dict]: + """Fetch historical resolved markets using closed=true + end_date range.""" + all_markets: list[dict] = [] + async with RetryableHTTPClient(base_url=settings.gamma_api_url) as client: + all_markets = await self._paginate_closed( + client, end_date_min=end_date_min, end_date_max=end_date_max + ) + log.info("gamma_closed_fetched", n=len(all_markets), + end_date_min=end_date_min, end_date_max=end_date_max) + return all_markets + + async def _paginate_closed( + self, + client: RetryableHTTPClient, + end_date_min: str | None, + end_date_max: str | None, + ) -> list[dict]: + results = [] + offset = 0 + while True: + params: dict = { + "closed": "true", + "limit": _PAGE_SIZE, + "offset": offset, + "order": "endDate", + "ascending": "false", + } + if end_date_min: + 
params["end_date_min"] = end_date_min + if end_date_max: + params["end_date_max"] = end_date_max + + resp = await client.get("/markets", params=params) + resp.raise_for_status() + page: list[dict] = resp.json() + if not page: + break + + results.extend(page) + if len(page) < _PAGE_SIZE: + break + offset += _PAGE_SIZE + + return results + async def _paginate( self, client: RetryableHTTPClient, @@ -136,6 +196,8 @@ async def _upsert_markets(self, session, raw_markets: list[dict]) -> int: "category_fflow": None, "created_at_chain": _parse_dt(m.get("createdAt") or m.get("startDate")), "end_date": _parse_dt(m.get("endDate")), + "resolved_at": _parse_dt(m.get("closedTime")), + "resolution_outcome": _gamma_outcome(m), "volume_total_usdc": m.get("volume"), "liquidity_usdc": m.get("liquidity"), "slug": m.get("slug"), @@ -143,7 +205,7 @@ async def _upsert_markets(self, session, raw_markets: list[dict]) -> int: "last_refreshed_at": now, }) - # asyncpg limit: 32767 params per query; with 12 cols per row → max ~2730 rows/batch + # asyncpg limit: 32767 params per query; with 14 cols per row → max ~2340 rows/batch _BATCH = 2000 for i in range(0, len(rows), _BATCH): batch = rows[i : i + _BATCH] @@ -158,6 +220,8 @@ async def _upsert_markets(self, session, raw_markets: list[dict]) -> int: "category_raw": insert(Market).excluded.category_raw, "created_at_chain": insert(Market).excluded.created_at_chain, "end_date": insert(Market).excluded.end_date, + "resolved_at": insert(Market).excluded.resolved_at, + "resolution_outcome": insert(Market).excluded.resolution_outcome, "volume_total_usdc": insert(Market).excluded.volume_total_usdc, "liquidity_usdc": insert(Market).excluded.liquidity_usdc, "slug": insert(Market).excluded.slug, @@ -176,3 +240,26 @@ def _parse_dt(value: str | None) -> datetime | None: if not value: return None return datetime.fromisoformat(value.replace("Z", "+00:00")) + + +def _gamma_outcome(market: dict) -> int | None: + """Parse outcomePrices to determine resolution outcome. + + outcomePrices[0] = YES token final price. + ["1","0"] = YES won (outcome=1); ["0","1"] = NO won (outcome=0). 
+ """ + prices_raw = market.get("outcomePrices") + if not prices_raw: + return None + prices = json.loads(prices_raw) if isinstance(prices_raw, str) else prices_raw + if not prices or len(prices) < 2: + return None + try: + first = float(prices[0]) + if abs(first - 1.0) < 0.01: + return 1 # YES token resolved at $1 + if abs(first - 0.0) < 0.01: + return 0 # YES token resolved at $0 → NO won + return None # intermediate price, not yet resolved + except (ValueError, TypeError): + return None From 8d652fe81003ad100343d3b42be052a73e1495e5 Mon Sep 17 00:00:00 2001 From: Maksym Nechepurenko Date: Sun, 26 Apr 2026 15:46:10 +0400 Subject: [PATCH 4/7] feat(cli): add subgraph --all-resolved batch mode and gamma --closed flags with idempotency --- fflow/cli.py | 140 ++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 127 insertions(+), 13 deletions(-) diff --git a/fflow/cli.py b/fflow/cli.py index 369c7c7..a1f8d20 100644 --- a/fflow/cli.py +++ b/fflow/cli.py @@ -41,14 +41,28 @@ def collect_gamma( categories: Annotated[ Optional[str], typer.Option(help="Comma-separated Polymarket tag names") ] = None, + closed: Annotated[bool, typer.Option("--closed", help="Fetch historical resolved markets")] = False, + end_date_min: Annotated[Optional[str], typer.Option(help="YYYY-MM-DD min end date (with --closed)")] = None, + end_date_max: Annotated[Optional[str], typer.Option(help="YYYY-MM-DD max end date (with --closed)")] = None, dry_run: Annotated[bool, typer.Option("--dry-run")] = False, ) -> None: - """Fetch market metadata from Polymarket Gamma API.""" + """Fetch market metadata from Polymarket Gamma API. + + Normal mode: fetches active + recently closed markets ordered by createdAt. + Historical mode (--closed): fetches resolved markets by end_date range. 
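+
+    Example backfill sweep for one month of resolved markets (assuming the
+    Typer app is installed as the `fflow` console script; flag names follow
+    the option parameters above):
+
+        fflow collect gamma --closed --end-date-min 2024-08-01 --end-date-max 2024-08-31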
+ """ from fflow.collectors.gamma import GammaCollector since_dt = _parse_date(since) cats = [c.strip() for c in categories.split(",")] if categories else [] - result = asyncio.run(GammaCollector().run(since=since_dt, categories=cats, dry_run=dry_run)) + result = asyncio.run(GammaCollector().run( + since=since_dt, + categories=cats, + closed=closed, + end_date_min=end_date_min, + end_date_max=end_date_max, + dry_run=dry_run, + )) typer.echo(f"gamma: {result.status}, n={result.n_written}") if result.error: typer.echo(f"error: {result.error}", err=True) @@ -89,25 +103,125 @@ def collect_clob( @collect_app.command("subgraph") def collect_subgraph( - market: Annotated[str, typer.Option(help="Market condition ID (0x...)")], + market: Annotated[Optional[str], typer.Option(help="Market condition ID (0x...)")] = None, from_ts: Annotated[Optional[str], typer.Option(help="ISO datetime")] = None, + all_resolved: Annotated[bool, typer.Option("--all-resolved", help="Run for all resolved markets")] = False, + min_volume: Annotated[float, typer.Option(help="Min volume_total_usdc filter (batch only)")] = 50000.0, + max_volume: Annotated[Optional[float], typer.Option(help="Max volume_total_usdc filter (batch only)")] = None, + limit: Annotated[Optional[int], typer.Option(help="Max markets to process in batch mode")] = None, + categories: Annotated[Optional[str], typer.Option(help="Comma-separated category_fflow filter (batch only)")] = None, dry_run: Annotated[bool, typer.Option("--dry-run")] = False, ) -> None: """Fetch full trade log from Polymarket subgraph.""" from fflow.collectors.subgraph import SubgraphCollector - result = asyncio.run( - SubgraphCollector().run( - market_id=market, - from_ts=_parse_dt(from_ts), - dry_run=dry_run, - ) - ) - typer.echo(f"subgraph: {result.status}, n={result.n_written}") - if result.error: - typer.echo(f"error: {result.error}", err=True) + if not market and not all_resolved: + typer.echo("Provide --market or --all-resolved", err=True) raise typer.Exit(1) + if all_resolved: + cats = [c.strip() for c in categories.split(",")] if categories else None + asyncio.run(_subgraph_batch(min_volume=min_volume, max_volume=max_volume, limit=limit, categories=cats, dry_run=dry_run)) + else: + result = asyncio.run( + SubgraphCollector().run( + market_id=market, + from_ts=_parse_dt(from_ts), + dry_run=dry_run, + ) + ) + typer.echo(f"subgraph: {result.status}, n={result.n_written}") + if result.error: + typer.echo(f"error: {result.error}", err=True) + raise typer.Exit(1) + + +async def _subgraph_batch( + min_volume: float, + dry_run: bool, + max_volume: float | None = None, + limit: int | None = None, + categories: list[str] | None = None, +) -> None: + import json + import pathlib + from fflow.collectors.subgraph import SubgraphCollector + from fflow.db import AsyncSessionLocal + from fflow.models import Market, Trade + from sqlalchemy import select, func + + async with AsyncSessionLocal() as session: + stmt = ( + select(Market.id, Market.volume_total_usdc, Market.resolved_at) + .where(Market.resolved_at.isnot(None)) + .where(Market.volume_total_usdc >= min_volume) + .order_by(Market.volume_total_usdc.desc()) + ) + if max_volume: + stmt = stmt.where(Market.volume_total_usdc <= max_volume) + if categories: + stmt = stmt.where(Market.category_fflow.in_(categories)) + if limit: + stmt = stmt.limit(limit) + rows = (await session.execute(stmt)).all() + + total = len(rows) + log.info("subgraph_batch_start", total=total, min_volume=min_volume, max_volume=max_volume, limit=limit, 
categories=categories) + if max_volume: + typer.echo(f"subgraph batch: {total} markets vol=[${min_volume:,.0f}, ${max_volume:,.0f}]") + else: + typer.echo(f"subgraph batch: {total} markets vol>=${min_volume:,.0f}" + (f" (limit={limit})" if limit else "") + (f" categories={categories}" if categories else "")) + + progress_path = pathlib.Path("logs/batch_progress.jsonl") + progress_path.parent.mkdir(parents=True, exist_ok=True) + + collector = SubgraphCollector() + stale_cutoff = datetime.now(UTC) - timedelta(days=1) + ok = fail = skipped = already_done = 0 + + for mid, vol, resolved_at in rows: + # Idempotency: skip markets resolved >1 day ago that already have trades + resolved_is_old = resolved_at and resolved_at < stale_cutoff + if resolved_is_old and not dry_run: + async with AsyncSessionLocal() as session: + existing = await session.scalar( + select(func.count()).select_from(Trade).where(Trade.market_id == mid) + ) + if existing and existing > 0: + already_done += 1 + log.info("subgraph_batch_skip", market=mid, existing_trades=existing) + _write_progress(progress_path, mid, "already_done", existing, float(vol or 0)) + continue + + try: + r = await collector.run(market_id=mid, dry_run=dry_run) + if r.n_written and r.n_written > 0: + ok += 1 + else: + skipped += 1 + log.info("subgraph_batch_market", market=mid, vol=float(vol or 0), + status=r.status, n=r.n_written) + _write_progress(progress_path, mid, r.status, r.n_written or 0, float(vol or 0)) + except Exception as exc: + fail += 1 + log.error("subgraph_batch_error", market=mid, error=str(exc)) + _write_progress(progress_path, mid, "failed", 0, float(vol or 0)) + + typer.echo(f"subgraph batch done: ok={ok} skipped(0 trades)={skipped} already_done={already_done} fail={fail}") + + +def _write_progress(path: "pathlib.Path", market_id: str, status: str, trades: int, vol: float) -> None: + import json, pathlib + entry = { + "market_id": market_id, + "status": status, + "trades_count": trades, + "volume": vol, + "timestamp": datetime.now(UTC).isoformat(), + } + with open(path, "a") as f: + f.write(json.dumps(entry) + "\n") + # --------------------------------------------------------------------------- # collect uma From fc9216d02044bd333bdc702a42808946d0c263c1 Mon Sep 17 00:00:00 2001 From: Maksym Nechepurenko Date: Sun, 26 Apr 2026 15:46:13 +0400 Subject: [PATCH 5/7] test: add regression tests for subgraph yes_token filter, gamma resolved_at and outcome --- tests/test_gamma.py | 67 ++++++++++++++++++++++- tests/test_subgraph.py | 121 ++++++++++++++++++++++++++++++++++------- 2 files changed, 167 insertions(+), 21 deletions(-) diff --git a/tests/test_gamma.py b/tests/test_gamma.py index 9395f2e..4357b83 100644 --- a/tests/test_gamma.py +++ b/tests/test_gamma.py @@ -6,7 +6,7 @@ import pytest -from fflow.collectors.gamma import GammaCollector, _parse_dt +from fflow.collectors.gamma import GammaCollector, _gamma_outcome, _parse_dt # --------------------------------------------------------------------------- @@ -65,6 +65,71 @@ def test_category_fflow_starts_null(self): assert collector.name == "gamma" +# --------------------------------------------------------------------------- +# Fix 2: resolved_at from closedTime +# --------------------------------------------------------------------------- + +class TestResolvedAtFromClosedTime: + """_upsert_markets must map closedTime → resolved_at.""" + + def _make_closed_market(self) -> dict: + return { + "conditionId": "0xdead1234", + "question": "Will Y happen?", + "closedTime": "2024-11-06T23:59:00Z", 
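+            # closedTime (not endDate) is the field _upsert_markets maps to resolved_at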
+ "endDate": "2024-11-06T00:00:00Z", + "outcomePrices": '["1","0"]', + "volume": "100000.0", + "liquidity": "5000.0", + "slug": "will-y-happen", + "clobTokenIds": json.dumps(["9999", "8888"]), + "events": [{"title": "US Elections 2024"}], + } + + def test_resolved_at_extracted(self): + m = self._make_closed_market() + dt = _parse_dt(m.get("closedTime")) + assert dt is not None + assert dt.year == 2024 + assert dt.month == 11 + assert dt.day == 6 + + def test_resolved_at_is_none_when_no_closed_time(self): + m = self._make_closed_market() + del m["closedTime"] + assert _parse_dt(m.get("closedTime")) is None + + +# --------------------------------------------------------------------------- +# Fix 2: _gamma_outcome +# --------------------------------------------------------------------------- + +class TestGammaOutcome: + """outcomePrices["1","0"] = YES won (1); ["0","1"] = NO won (0); else None.""" + + def test_yes_outcome(self): + assert _gamma_outcome({"outcomePrices": '["1","0"]'}) == 1 + + def test_no_outcome(self): + assert _gamma_outcome({"outcomePrices": '["0","1"]'}) == 0 + + def test_partial_price_is_none(self): + assert _gamma_outcome({"outcomePrices": '["0.5","0.5"]'}) is None + + def test_missing_field_is_none(self): + assert _gamma_outcome({}) is None + + def test_list_format(self): + # outcomePrices may already be a list (not a string) + assert _gamma_outcome({"outcomePrices": ["1", "0"]}) == 1 + assert _gamma_outcome({"outcomePrices": ["0", "1"]}) == 0 + + def test_float_string_tolerance(self): + # "1.0" and "0.0" should also work + assert _gamma_outcome({"outcomePrices": '["1.0","0.0"]'}) == 1 + assert _gamma_outcome({"outcomePrices": '["0.0","1.0"]'}) == 0 + + # --------------------------------------------------------------------------- # Integration tests (VCR cassettes) # --------------------------------------------------------------------------- diff --git a/tests/test_subgraph.py b/tests/test_subgraph.py index 2474796..da6c455 100644 --- a/tests/test_subgraph.py +++ b/tests/test_subgraph.py @@ -1,8 +1,9 @@ """Subgraph trades collector tests.""" import pytest +from unittest.mock import AsyncMock, MagicMock, patch -from fflow.collectors.subgraph import _parse_log_index +from fflow.collectors.subgraph import SubgraphCollector, _parse_log_index class TestTradeDirectionInference: @@ -59,32 +60,111 @@ def test_zero_size_returns_zero(self): assert price == 0 -@pytest.mark.vcr("subgraph_trades.yaml") +# --------------------------------------------------------------------------- +# Fix 1: market filter must use yes_token, not condition_id +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_market_filter_uses_yes_token_not_condition_id(): + """_fetch_trades must pass yes_token decimal ID as 'market', not condition_id hex.""" + market_id = "0xa772acec556629f76d8bca3708761f05f7af3d66cd182411f5523f805a37abb1" + yes_token = "17668809327328219504003917947221347901585485692946225330492575863390915623843" + + captured: dict = {} + + async def mock_execute(query, variable_values=None): + if variable_values and "market" in variable_values: + captured.update(variable_values) + return {"enrichedOrderFilleds": []} # empty → loop exits + + mock_client = MagicMock() + mock_client.__aenter__ = AsyncMock(return_value=mock_client) + mock_client.__aexit__ = AsyncMock(return_value=False) + mock_client.execute = mock_execute + + collector = SubgraphCollector() + with patch.object(collector, "_make_client", return_value=mock_client): + 
await collector._fetch_trades(market_id, yes_token, None) + + assert "market" in captured, "_fetch_trades never called execute with market variable" + assert captured["market"] == yes_token, ( + f"Expected yes_token={yes_token!r}, got {captured['market']!r}" + ) + assert captured["market"] != market_id.lower(), ( + "market filter must be yes_token decimal, not condition_id hex" + ) + + +@pytest.mark.asyncio +async def test_enriched_order_filleds_key_is_read(): + """Response must be read from 'enrichedOrderFilleds' key, not 'orderFilleds'.""" + market_id = "0xdeadbeef" + yes_token = "12345" + + call_count = 0 + + async def mock_execute(query, variable_values=None): + nonlocal call_count + call_count += 1 + if call_count == 1: + # Return data under enrichedOrderFilleds + return { + "enrichedOrderFilleds": [ + { + "id": "0xtx_0xorder", + "timestamp": "1700000000", + "transactionHash": "0xtx", + "orderHash": "0xorder", + "maker": {"id": "0xmaker"}, + "taker": {"id": "0xtaker"}, + "market": {"id": yes_token}, + "side": "Buy", + "size": "1000000", + "price": "0.65", + } + ] + } + return {"enrichedOrderFilleds": []} # second page empty → stop + + mock_client = MagicMock() + mock_client.__aenter__ = AsyncMock(return_value=mock_client) + mock_client.__aexit__ = AsyncMock(return_value=False) + mock_client.execute = mock_execute + + collector = SubgraphCollector() + with patch.object(collector, "_make_client", return_value=mock_client): + trades = await collector._fetch_trades(market_id, yes_token, None) + + assert len(trades) == 1, "Should have parsed 1 trade from enrichedOrderFilleds" + assert trades[0]["side"] == "Buy" + + @pytest.mark.asyncio async def test_subgraph_first_trades_shape(): - """Fetch first 5 trades from the subgraph and verify structure. - Requires FFLOW_THEGRAPH_API_KEY to record cassette. - Record: pytest --record-mode=new_episodes tests/test_subgraph.py::test_subgraph_first_trades_shape + """Fetch first 5 enrichedOrderFilleds and verify structure. 
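+
+    Requires FFLOW_THEGRAPH_API_KEY; the test self-skips when it is unset.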
+ + Re-record cassette after entity name fix: + pytest --vcr-record=new_episodes tests/test_subgraph.py::test_subgraph_first_trades_shape """ from gql import Client, gql from gql.transport.httpx import HTTPXAsyncTransport from fflow.config import settings if not settings.thegraph_api_key: - pytest.skip("FFLOW_THEGRAPH_API_KEY not set — set key to record cassette") + pytest.skip("FFLOW_THEGRAPH_API_KEY not set") query = gql(""" query { - orderFilleds(first: 5, orderBy: id, orderDirection: asc) { + enrichedOrderFilleds(first: 5, orderBy: id, orderDirection: asc) { id timestamp transactionHash - maker - taker - makerAssetId - takerAssetId - makerAmountFilled - takerAmountFilled + maker { id } + taker { id } + market { id } + side + size + price } } """) @@ -93,11 +173,12 @@ async def test_subgraph_first_trades_shape(): transport = HTTPXAsyncTransport(url=settings.subgraph_url, headers=headers) async with Client(transport=transport, fetch_schema_from_transport=False) as client: result = await client.execute(query) - trades = result.get("orderFilleds", []) + trades = result.get("enrichedOrderFilleds", []) assert isinstance(trades, list) - if trades: - t = trades[0] - assert "transactionHash" in t - assert "timestamp" in t - assert "taker" in t - assert int(t["timestamp"]) > 0 + assert len(trades) > 0, "Expected at least 1 enrichedOrderFilled" + t = trades[0] + assert "transactionHash" in t + assert "timestamp" in t + assert "taker" in t + assert "side" in t + assert int(t["timestamp"]) > 0 From 58bd7935f214e61645d77c3fb2da0307e1e5c2f6 Mon Sep 17 00:00:00 2001 From: Maksym Nechepurenko Date: Sun, 26 Apr 2026 15:46:17 +0400 Subject: [PATCH 6/7] chore(scripts): add diagnostic scripts for state, subgraph, and backfill window --- scripts/diagnose_backfill_window.py | 374 ++++++++++++ scripts/diagnose_state.py | 911 ++++++++++++++++++++++++++++ scripts/diagnose_subgraph.py | 369 +++++++++++ 3 files changed, 1654 insertions(+) create mode 100644 scripts/diagnose_backfill_window.py create mode 100644 scripts/diagnose_state.py create mode 100644 scripts/diagnose_subgraph.py diff --git a/scripts/diagnose_backfill_window.py b/scripts/diagnose_backfill_window.py new file mode 100644 index 0000000..6d92c59 --- /dev/null +++ b/scripts/diagnose_backfill_window.py @@ -0,0 +1,374 @@ +"""Backfill window diagnostic. + +Investigates: + A. Why resolved_at is only set for the last 2 hours (599 markets). + B. Whether Gamma API supports date-range pagination for historical resolved markets. + C. What's in raw_metadata['closedTime'] and how that relates to resolved_at. + D. Whether Gamma tags are informative enough to filter for news-based markets. 
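+
+Read-only: the script only issues SELECTs against the local DB and GETs against
+the Gamma API; nothing is written.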
+ +Usage: + uv run python scripts/diagnose_backfill_window.py +""" + +from __future__ import annotations + +import asyncio +import json +import sys +from pathlib import Path + +ROOT = Path(__file__).resolve().parent.parent +sys.path.insert(0, str(ROOT)) + +from fflow.config import settings +from fflow.log import configure_logging + +configure_logging(log_level="WARNING") + +SEP = "─" * 70 +GAMMA_URL = settings.gamma_api_url + + +def _hdr(title: str) -> None: + print(f"\n{SEP}\n{title}\n{SEP}") + + +async def get(url: str, params: dict | None = None) -> dict | list: + import httpx + async with httpx.AsyncClient(timeout=30) as client: + r = await client.get(url, params=params or {}) + print(f" GET {r.url}") + print(f" HTTP {r.status_code} content-length≈{len(r.content)} bytes") + return r.json() + + +# ───────────────────────────────────────────────────────────────────────────── +# A. Why resolved_at is limited to last 2 hours +# ───────────────────────────────────────────────────────────────────────────── + +async def section_a_resolved_source() -> None: + _hdr("A. Source of resolved_at in DB") + from fflow.db import AsyncSessionLocal + from sqlalchemy import text + + async with AsyncSessionLocal() as session: + # How many resolved markets, and what fraction have closedTime in raw_metadata? + r = await session.execute(text(""" + SELECT + COUNT(*) FILTER (WHERE resolved_at IS NOT NULL) AS resolved_total, + COUNT(*) FILTER (WHERE resolved_at IS NOT NULL + AND (raw_metadata->>'closedTime') IS NOT NULL) AS resolved_with_closed_time, + COUNT(*) FILTER (WHERE resolved_at IS NOT NULL + AND (raw_metadata->>'closedTime') IS NULL) AS resolved_no_closed_time, + COUNT(*) FILTER (WHERE resolved_at IS NULL + AND (raw_metadata->>'closedTime') IS NOT NULL) AS unresolved_but_has_closed_time, + COUNT(*) FILTER (WHERE raw_metadata->>'closedTime' IS NOT NULL) AS total_with_closed_time, + COUNT(*) FILTER (WHERE (raw_metadata->>'active')::bool = false) AS inactive_markets, + COUNT(*) FILTER (WHERE (raw_metadata->>'closed')::bool = true) AS closed_markets + FROM markets + """)) + row = r.fetchone() + print(f" resolved_at IS NOT NULL : {row[0]}") + print(f" resolved_at + closedTime in raw_metadata : {row[1]}") + print(f" resolved_at but NO closedTime : {row[2]}") + print(f" closedTime present but resolved_at IS NULL : {row[3]} ← backfill potential") + print(f" total markets with closedTime in raw_metadata : {row[4]}") + print(f" markets where active=false : {row[5]}") + print(f" markets where closed=true : {row[6]}") + + # Show sample where closedTime exists but resolved_at is null + r2 = await session.execute(text(""" + SELECT id, question, + raw_metadata->>'closedTime' AS closed_time, + raw_metadata->>'endDate' AS end_date, + raw_metadata->>'active' AS active, + raw_metadata->>'closed' AS closed_flag, + category_fflow + FROM markets + WHERE (raw_metadata->>'closedTime') IS NOT NULL + AND resolved_at IS NULL + ORDER BY (raw_metadata->>'closedTime') DESC + LIMIT 10 + """)) + rows = r2.fetchall() + print(f"\n Sample 'closedTime but no resolved_at' (newest first):") + for row in rows: + print(f" [{row.closed_time}] [{row.category_fflow}] {row.question[:70]}") + + # UMA runs in DCR + r3 = await session.execute(text(""" + SELECT started_at, finished_at, status, n_records_written, left(error_message, 100) + FROM data_collection_runs + WHERE collector = 'uma' + ORDER BY started_at DESC LIMIT 5 + """)) + uma_rows = r3.fetchall() + print(f"\n UMA collector runs:") + if uma_rows: + for row in uma_rows: + print(f" {row}") + 
else: + print(" (no UMA runs recorded)") + + +# ───────────────────────────────────────────────────────────────────────────── +# B. Gamma code audit — what parameters are used +# ───────────────────────────────────────────────────────────────────────────── + +async def section_b_gamma_code_audit() -> None: + _hdr("B. gamma.py code audit — relevant query parameters") + code_path = ROOT / "fflow" / "collectors" / "gamma.py" + src = code_path.read_text() + + # Extract _paginate method + start = src.find("async def _paginate") + end = src.find("\n async def ", start + 1) + if end == -1: + end = src.find("\n\ndef ", start + 1) + print(src[start:end]) + + print("\n KEY OBSERVATIONS:") + if "closed" in src: + closed_idx = [i for i in range(len(src)) if src[i:i+6] == "closed"] + for idx in closed_idx: + ctx = src[max(0, idx-30):idx+60] + print(f" 'closed' found: ...{ctx}...") + else: + print(" 'closed' parameter: NOT used → fetches active markets only") + + if "end_date" in src.lower(): + print(" 'end_date' filter: FOUND") + else: + print(" 'end_date' filter: NOT used") + + if "start_date" in src.lower(): + print(" 'start_date' filter: FOUND") + else: + print(" 'start_date' filter: NOT used") + + +# ───────────────────────────────────────────────────────────────────────────── +# C. Gamma API — does it support historical date-range queries? +# ───────────────────────────────────────────────────────────────────────────── + +async def section_c_gamma_date_range() -> None: + _hdr("C. Gamma API — date-range queries for historical resolved markets") + + # Test 1: closed=true without date filter (what does it return?) + print(" Test 1: closed=true, no date filter, limit=5") + data = await get(f"{GAMMA_URL}/markets", { + "closed": "true", + "limit": 5, + "order": "closedTime", + "ascending": "false", + }) + markets = data if isinstance(data, list) else data.get("markets", []) + print(f" → {len(markets)} markets") + for m in markets[:3]: + print(f" [{m.get('closedTime', '?')}] {m.get('question', '?')[:70]}") + + # Test 2: closed=true with end_date_min/end_date_max for August 2024 (US election season) + print("\n Test 2: closed=true + end_date_min=2024-08-01 + end_date_max=2024-08-31") + data2 = await get(f"{GAMMA_URL}/markets", { + "closed": "true", + "end_date_min": "2024-08-01", + "end_date_max": "2024-08-31", + "limit": 10, + "order": "endDate", + "ascending": "false", + }) + markets2 = data2 if isinstance(data2, list) else data2.get("markets", []) + print(f" → {len(markets2)} markets for Aug 2024 (end_date)") + for m in markets2[:5]: + print(f" [{m.get('closedTime','?')}] [{m.get('tags','?')}] {m.get('question','?')[:70]}") + + # Test 3: closed=true with start_date filter (using createdAt window Aug 2024) + print("\n Test 3: closed=true + start_date_min=2024-08-01 + start_date_max=2024-08-31") + data3 = await get(f"{GAMMA_URL}/markets", { + "closed": "true", + "start_date_min": "2024-08-01", + "start_date_max": "2024-08-31", + "limit": 10, + }) + markets3 = data3 if isinstance(data3, list) else data3.get("markets", []) + print(f" → {len(markets3)} markets for Aug 2024 (start_date)") + for m in markets3[:5]: + print(f" [{m.get('closedTime','?')}] {m.get('question','?')[:70]}") + + # Test 4: closed=true + closed_time_min/max + print("\n Test 4: closed=true + closed_time_min=2024-08-01 + closed_time_max=2024-08-31") + data4 = await get(f"{GAMMA_URL}/markets", { + "closed": "true", + "closed_time_min": "2024-08-01", + "closed_time_max": "2024-08-31", + "limit": 10, + }) + markets4 = data4 if 
isinstance(data4, list) else data4.get("markets", []) + print(f" → {len(markets4)} markets for Aug 2024 (closed_time)") + for m in markets4[:5]: + print(f" [{m.get('closedTime','?')}] {m.get('question','?')[:70]}") + + # Test 5: volume ordering — does sorting by volume reveal high-signal markets? + print("\n Test 5: closed=true + order=volume + limit=10 (high-volume historical)") + data5 = await get(f"{GAMMA_URL}/markets", { + "closed": "true", + "limit": 10, + "order": "volume", + "ascending": "false", + }) + markets5 = data5 if isinstance(data5, list) else data5.get("markets", []) + print(f" → {len(markets5)} markets by volume desc") + for m in markets5[:5]: + print(f" vol=${float(m.get('volume', 0) or 0):,.0f} [{m.get('closedTime','?')[:10]}] {m.get('question','?')[:60]}") + + +# ───────────────────────────────────────────────────────────────────────────── +# D. Search for Iran strike market (Feb 2026 insider case) +# ───────────────────────────────────────────────────────────────────────────── + +async def section_d_iran_search() -> None: + _hdr("D. Searching for geopolitical markets (Iran/elections)") + + # Attempt text search if the API supports it + print(" Test 1: text search with q=iran") + data = await get(f"{GAMMA_URL}/markets", { + "q": "iran", + "closed": "true", + "limit": 10, + }) + markets = data if isinstance(data, list) else data.get("markets", []) + print(f" → {len(markets)} results") + for m in markets[:5]: + print(f" [{m.get('closedTime','?')[:10]}] {m.get('question','?')[:70]}") + + # Try tag-based search + print("\n Test 2: tag=iran + closed=true") + data2 = await get(f"{GAMMA_URL}/markets", { + "tag": "iran", + "closed": "true", + "limit": 10, + }) + markets2 = data2 if isinstance(data2, list) else data2.get("markets", []) + print(f" → {len(markets2)} results for tag=iran") + for m in markets2[:5]: + print(f" [{m.get('closedTime','?')[:10]}] {m.get('question','?')[:70]}") + + # Try tag=politics + date range + print("\n Test 3: tag=politics + closed=true + limit=10 (any date)") + data3 = await get(f"{GAMMA_URL}/markets", { + "tag": "politics", + "closed": "true", + "limit": 10, + "order": "volume", + "ascending": "false", + }) + markets3 = data3 if isinstance(data3, list) else data3.get("markets", []) + print(f" → {len(markets3)} results for tag=politics+closed") + for m in markets3[:5]: + vol = float(m.get('volume', 0) or 0) + print(f" vol=${vol:,.0f} [{m.get('closedTime','?')[:10]}] {m.get('question','?')[:70]}") + + # Try events endpoint (events have categories) + print("\n Test 4: /events endpoint (category=politics)") + data4 = await get(f"{GAMMA_URL}/events", { + "closed": "true", + "tag": "politics", + "limit": 10, + "order": "volume", + "ascending": "false", + }) + events = data4 if isinstance(data4, list) else data4.get("events", []) + print(f" → {len(events)} events for category=politics+closed") + for e in events[:5]: + print(f" [{e.get('closedTime','?')[:10] if isinstance(e, dict) else '?'}] {str(e)[:100]}") + + +# ───────────────────────────────────────────────────────────────────────────── +# E. Tags assessment — are Gamma tags informative? +# ───────────────────────────────────────────────────────────────────────────── + +async def section_e_tags_assessment() -> None: + _hdr("E. 
Tags assessment — are Gamma tags informative?") + from fflow.db import AsyncSessionLocal + from sqlalchemy import text + + async with AsyncSessionLocal() as session: + # Show distribution of tags from raw_metadata + r = await session.execute(text(""" + SELECT + tag_val, + COUNT(*) AS n, + COUNT(*) FILTER (WHERE category_fflow = 'military_geopolitics') AS geopolitical, + COUNT(*) FILTER (WHERE category_fflow = 'politics_us') AS politics_us + FROM markets, + jsonb_array_elements_text( + CASE jsonb_typeof(raw_metadata->'tags') + WHEN 'array' THEN raw_metadata->'tags' + ELSE '[]'::jsonb + END + ) AS tag_val + GROUP BY tag_val + ORDER BY n DESC + LIMIT 30 + """)) + tag_rows = r.fetchall() + print(f" Top 30 tags (from raw_metadata.tags array):") + if tag_rows: + for row in tag_rows: + print(f" {row.tag_val:40s} n={row.n:5d} geo={row.geopolitical} pol={row.politics_us}") + else: + print(" → No array tags found. Trying slug/string field...") + + # Many Polymarket markets store tags as a single string or in events + r2 = await session.execute(text(""" + SELECT + raw_metadata->>'tags' AS tags_raw, + category_fflow, + question + FROM markets + WHERE raw_metadata->>'tags' IS NOT NULL + AND raw_metadata->>'tags' != 'null' + AND raw_metadata->>'tags' != '[]' + ORDER BY random() + LIMIT 10 + """)) + sample_tags = r2.fetchall() + print(f"\n Sample 10 markets with non-empty tags:") + for row in sample_tags: + print(f" tags={str(row.tags_raw)[:50]} cat={row.category_fflow} q={row.question[:50]}") + + # 30 random pairs for geopolitical markets + r3 = await session.execute(text(""" + SELECT + raw_metadata->>'tags' AS tags_raw, + raw_metadata#>'{events,0,tag}' AS event_tag, + category_fflow, + question, + raw_metadata->>'closedTime' AS closed_time + FROM markets + WHERE category_fflow IN ('military_geopolitics', 'politics_us', 'politics_intl', 'geopolitics') + ORDER BY random() + LIMIT 30 + """)) + geo_sample = r3.fetchall() + print(f"\n 30 random geopolitical/political markets — (tags, question):") + for row in geo_sample: + tags = row.tags_raw or row.event_tag or "(no tag)" + print(f" [{str(tags)[:30]}] {row.question[:65]}") + + +# ───────────────────────────────────────────────────────────────────────────── +# Main +# ───────────────────────────────────────────────────────────────────────────── + +async def main() -> None: + await section_a_resolved_source() + await section_b_gamma_code_audit() + await section_c_gamma_date_range() + await section_d_iran_search() + await section_e_tags_assessment() + print(f"\n{SEP}\nDone. See findings above.\n{SEP}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/scripts/diagnose_state.py b/scripts/diagnose_state.py new file mode 100644 index 0000000..3dd1685 --- /dev/null +++ b/scripts/diagnose_state.py @@ -0,0 +1,911 @@ +"""State diagnostic: read-only survey of DB + conditional reruns. + +Phase 1 — read-only (7 sections) +Phase 2 — safe reruns where data is missing (subgraph / polygonscan / Tier 1) +Phase 3 — post-rerun state snapshot + +Output: reports/STATE_ASSESSMENT.md + console summary. 
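+
+--no-rerun stops after the read-only Phase 1 survey and skips the Phase 2 reruns.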
+ +Usage: + uv run python scripts/diagnose_state.py [--no-rerun] +""" + +from __future__ import annotations + +import argparse +import asyncio +import os +import sys +import textwrap +from datetime import UTC, datetime +from pathlib import Path + +# ── ensure project root on path ─────────────────────────────────────────────── +ROOT = Path(__file__).resolve().parent.parent +sys.path.insert(0, str(ROOT)) + +from fflow.db import AsyncSessionLocal +from fflow.log import configure_logging, get_logger + +configure_logging(log_level="WARNING") # silence collectors during diagnostics +log = get_logger(__name__) + + +# ───────────────────────────────────────────────────────────────────────────── +# DB helpers +# ───────────────────────────────────────────────────────────────────────────── + +async def _q(session, sql: str, *params): + from sqlalchemy import text + result = await session.execute(text(sql), list(params) if params else {}) + return result + + +async def _scalar(session, sql: str, params: dict | None = None): + from sqlalchemy import text + result = await session.execute(text(sql), params or {}) + row = result.fetchone() + return row[0] if row else None + + +async def _rows(session, sql: str, params: dict | None = None): + from sqlalchemy import text + result = await session.execute(text(sql), params or {}) + return result.fetchall() + + +# ───────────────────────────────────────────────────────────────────────────── +# Section 1 — Markets inventory +# ───────────────────────────────────────────────────────────────────────────── + +async def section1_markets(session) -> dict: + total = await _scalar(session, "SELECT COUNT(*) FROM markets") + resolved = await _scalar( + session, "SELECT COUNT(*) FROM markets WHERE resolved_at IS NOT NULL" + ) + with_outcome = await _scalar( + session, + "SELECT COUNT(*) FROM markets WHERE resolution_outcome IS NOT NULL", + ) + with_evidence = await _scalar( + session, + "SELECT COUNT(*) FROM markets WHERE resolution_evidence_url IS NOT NULL", + ) + by_category = await _rows( + session, + """ + SELECT + COALESCE(category_fflow, '(uncategorised)') AS cat, + COUNT(*) AS n, + COUNT(*) FILTER (WHERE resolved_at IS NOT NULL) AS resolved + FROM markets + GROUP BY cat ORDER BY n DESC LIMIT 20 + """, + ) + # top categories among resolved + resolved_by_cat = await _rows( + session, + """ + SELECT + COALESCE(category_fflow, '(uncategorised)') AS cat, + COUNT(*) AS n + FROM markets + WHERE resolved_at IS NOT NULL + GROUP BY cat ORDER BY n DESC LIMIT 20 + """, + ) + # oldest and newest resolved_at + ts_range = await _rows( + session, + "SELECT MIN(resolved_at), MAX(resolved_at) FROM markets WHERE resolved_at IS NOT NULL", + ) + # sample of resolved market questions + sample_q = await _rows( + session, + """ + SELECT question, category_fflow, resolved_at::date + FROM markets + WHERE resolved_at IS NOT NULL + ORDER BY resolved_at DESC + LIMIT 10 + """, + ) + + return { + "total": total, + "resolved": resolved, + "with_outcome": with_outcome, + "with_evidence_url": with_evidence, + "by_category": by_category, + "resolved_by_cat": resolved_by_cat, + "ts_range": ts_range, + "sample_questions": sample_q, + } + + +# ───────────────────────────────────────────────────────────────────────────── +# Section 2 — Trades +# ───────────────────────────────────────────────────────────────────────────── + +async def section2_trades(session) -> dict: + total_trades = await _scalar(session, "SELECT COUNT(*) FROM trades") + markets_with_trades = await _scalar( + session, "SELECT 
COUNT(DISTINCT market_id) FROM trades" + ) + # How many resolved markets have at least 1 trade? + resolved_with_trades = await _scalar( + session, + """ + SELECT COUNT(DISTINCT m.id) + FROM markets m + JOIN trades t ON t.market_id = m.id + WHERE m.resolved_at IS NOT NULL + """, + ) + resolved_without_trades = await _scalar( + session, + """ + SELECT COUNT(*) + FROM markets m + WHERE m.resolved_at IS NOT NULL + AND NOT EXISTS (SELECT 1 FROM trades t WHERE t.market_id = m.id) + """, + ) + # top markets by trade count + top_markets = await _rows( + session, + """ + SELECT market_id, COUNT(*) AS n, MIN(ts) AS first_ts, MAX(ts) AS last_ts + FROM trades + GROUP BY market_id + ORDER BY n DESC LIMIT 10 + """, + ) + # last data_collection_runs for subgraph + last_runs = await _rows( + session, + """ + SELECT target, started_at, finished_at, status, n_records_written, error_message + FROM data_collection_runs + WHERE collector = 'subgraph_trades' + ORDER BY started_at DESC LIMIT 10 + """, + ) + return { + "total_trades": total_trades, + "markets_with_trades": markets_with_trades, + "resolved_with_trades": resolved_with_trades, + "resolved_without_trades": resolved_without_trades, + "top_markets": top_markets, + "last_runs": last_runs, + } + + +# ───────────────────────────────────────────────────────────────────────────── +# Section 3 — Prices +# ───────────────────────────────────────────────────────────────────────────── + +async def section3_prices(session) -> dict: + total_prices = await _scalar(session, "SELECT COUNT(*) FROM prices") + markets_with_prices = await _scalar( + session, "SELECT COUNT(DISTINCT market_id) FROM prices" + ) + resolved_with_prices = await _scalar( + session, + """ + SELECT COUNT(DISTINCT m.id) + FROM markets m + JOIN prices p ON p.market_id = m.id + WHERE m.resolved_at IS NOT NULL + """, + ) + resolved_without_prices = await _scalar( + session, + """ + SELECT COUNT(*) + FROM markets m + WHERE m.resolved_at IS NOT NULL + AND NOT EXISTS (SELECT 1 FROM prices p WHERE p.market_id = m.id) + """, + ) + last_runs = await _rows( + session, + """ + SELECT target, started_at, finished_at, status, n_records_written, error_message + FROM data_collection_runs + WHERE collector = 'clob_prices' + ORDER BY started_at DESC LIMIT 5 + """, + ) + # markets with prices but no trades + prices_no_trades = await _scalar( + session, + """ + SELECT COUNT(DISTINCT p.market_id) + FROM prices p + WHERE NOT EXISTS (SELECT 1 FROM trades t WHERE t.market_id = p.market_id) + """, + ) + return { + "total_prices": total_prices, + "markets_with_prices": markets_with_prices, + "resolved_with_prices": resolved_with_prices, + "resolved_without_prices": resolved_without_prices, + "prices_no_trades": prices_no_trades, + "last_runs": last_runs, + } + + +# ───────────────────────────────────────────────────────────────────────────── +# Section 4 — Evidence URLs & T_news +# ───────────────────────────────────────────────────────────────────────────── + +async def section4_tnews(session) -> dict: + total_nt = await _scalar(session, "SELECT COUNT(*) FROM news_timestamps") + by_tier = await _rows( + session, + "SELECT tier, COUNT(*) AS n FROM news_timestamps GROUP BY tier ORDER BY tier", + ) + with_evidence = await _scalar( + session, + "SELECT COUNT(*) FROM markets WHERE resolution_evidence_url IS NOT NULL", + ) + evidence_domains = await _rows( + session, + r""" + SELECT + substring(resolution_evidence_url FROM 'https?://([^/]+)') AS domain, + COUNT(*) AS n + FROM markets + WHERE resolution_evidence_url IS NOT 
NULL + GROUP BY domain ORDER BY n DESC LIMIT 15 + """, + ) + # resolved markets with evidence URL but no T_news + evidence_no_tnews = await _scalar( + session, + """ + SELECT COUNT(*) + FROM markets m + WHERE m.resolved_at IS NOT NULL + AND m.resolution_evidence_url IS NOT NULL + AND NOT EXISTS (SELECT 1 FROM news_timestamps n WHERE n.market_id = m.id) + """, + ) + # sample Tier 1 results + tier1_sample = await _rows( + session, + """ + SELECT nt.market_id, nt.t_news, nt.source_url, nt.confidence, m.question + FROM news_timestamps nt + JOIN markets m ON m.id = nt.market_id + WHERE nt.tier = 1 + ORDER BY nt.recovered_at DESC LIMIT 10 + """, + ) + # last tier1 CLI runs + last_runs = await _rows( + session, + """ + SELECT target, started_at, status, n_records_written, error_message + FROM data_collection_runs + WHERE collector = 'news_tier1' + ORDER BY started_at DESC LIMIT 5 + """, + ) + return { + "total_nt": total_nt, + "by_tier": by_tier, + "with_evidence_url": with_evidence, + "evidence_domains": evidence_domains, + "evidence_no_tnews": evidence_no_tnews, + "tier1_sample": tier1_sample, + "last_runs": last_runs, + } + + +# ───────────────────────────────────────────────────────────────────────────── +# Section 5 — Wallets +# ───────────────────────────────────────────────────────────────────────────── + +async def section5_wallets(session) -> dict: + total_wallets = await _scalar(session, "SELECT COUNT(*) FROM wallets") + with_chain_data = await _scalar( + session, + "SELECT COUNT(*) FROM wallets WHERE first_seen_chain_at IS NOT NULL", + ) + with_funding = await _scalar( + session, + "SELECT COUNT(*) FROM wallets WHERE funding_sources IS NOT NULL AND funding_sources != '[]'::jsonb", + ) + stale_30d = await _scalar( + session, + """ + SELECT COUNT(*) FROM wallets + WHERE last_refreshed_at < NOW() - INTERVAL '30 days' + OR first_seen_chain_at IS NULL + """, + ) + last_runs = await _rows( + session, + """ + SELECT target, started_at, status, n_records_written, error_message + FROM data_collection_runs + WHERE collector = 'polygonscan' + ORDER BY started_at DESC LIMIT 5 + """, + ) + return { + "total_wallets": total_wallets, + "with_chain_data": with_chain_data, + "with_funding": with_funding, + "stale_30d": stale_30d, + "last_runs": last_runs, + } + + +# ───────────────────────────────────────────────────────────────────────────── +# Section 6 — Gamma collection audit +# ───────────────────────────────────────────────────────────────────────────── + +async def section6_gamma_audit(session) -> dict: + last_runs = await _rows( + session, + """ + SELECT started_at, finished_at, status, n_records_written, + left(error_message, 200) AS error_message + FROM data_collection_runs + WHERE collector = 'gamma' + ORDER BY started_at DESC LIMIT 5 + """, + ) + # category distribution — shows what Gamma actually returned + cat_dist = await _rows( + session, + """ + SELECT + COALESCE(category_fflow, '(uncategorised)') AS cat, + COUNT(*) AS n, + ROUND(100.0 * COUNT(*) / SUM(COUNT(*)) OVER (), 1) AS pct + FROM markets + GROUP BY cat ORDER BY n DESC + """, + ) + # last_refreshed_at distribution — did Gamma recently run? 
+ refresh_buckets = await _rows( + session, + """ + SELECT + CASE + WHEN last_refreshed_at > NOW() - INTERVAL '1 day' THEN '<1d ago' + WHEN last_refreshed_at > NOW() - INTERVAL '7 days' THEN '1-7d ago' + WHEN last_refreshed_at > NOW() - INTERVAL '30 days' THEN '7-30d ago' + ELSE '>30d ago' + END AS bucket, + COUNT(*) AS n + FROM markets + GROUP BY bucket ORDER BY MIN(last_refreshed_at) DESC + """, + ) + # what categories does gamma's raw tag look like? + raw_cat_sample = await _rows( + session, + """ + SELECT category_raw, COUNT(*) AS n + FROM markets + GROUP BY category_raw ORDER BY n DESC LIMIT 20 + """, + ) + # political / geopolitical count + political = await _scalar( + session, + """ + SELECT COUNT(*) FROM markets + WHERE category_fflow IN ('politics_us', 'politics_intl', 'geopolitics', 'military_geopolitics') + """, + ) + return { + "last_runs": last_runs, + "cat_dist": cat_dist, + "refresh_buckets": refresh_buckets, + "raw_cat_sample": raw_cat_sample, + "political_count": political, + } + + +# ───────────────────────────────────────────────────────────────────────────── +# Section 7 — Labels & ILS +# ───────────────────────────────────────────────────────────────────────────── + +async def section7_labels(session) -> dict: + total_labels = await _scalar(session, "SELECT COUNT(*) FROM market_labels") + with_ils = await _scalar( + session, "SELECT COUNT(*) FROM market_labels WHERE ils IS NOT NULL" + ) + flags_dist = await _rows( + session, + """ + SELECT flag, COUNT(*) AS n + FROM market_labels, unnest(flags) AS flag + GROUP BY flag ORDER BY n DESC + """, + ) + sample = await _rows( + session, + """ + SELECT market_id, ils, flags, computed_at + FROM market_labels + ORDER BY computed_at DESC LIMIT 10 + """, + ) + return { + "total_labels": total_labels, + "with_ils": with_ils, + "flags_dist": flags_dist, + "sample": sample, + } + + +# ───────────────────────────────────────────────────────────────────────────── +# Hypothesis evaluation +# ───────────────────────────────────────────────────────────────────────────── + +def evaluate_hypotheses(s1, s2, s3, s4, s5, s6) -> dict[str, bool]: + resolved = s1["resolved"] or 0 + political = s6["political_count"] or 0 + total = s1["total"] or 1 + + h1 = political < max(50, resolved * 0.15) # <15% political/geopolitical + + h2 = (s2["resolved_with_trades"] or 0) == 0 or ( + (s2["resolved_without_trades"] or 0) > resolved * 0.5 + ) + + h3 = (s5["stale_30d"] or 0) > (s5["total_wallets"] or 0) * 0.8 or ( + (s5["with_chain_data"] or 0) == 0 + ) + + evidence_no_tnews = s4["evidence_no_tnews"] or 0 + h4 = evidence_no_tnews > 0 + + # H5: markets have prices AND T_news but ILS still null (price gap around T_news) + # we approximate: resolved_with_prices > 0 but total_nt == 0 + h5 = (s3["resolved_with_prices"] or 0) > 0 and (s4["total_nt"] or 0) == 0 + + return {"H1": h1, "H2": h2, "H3": h3, "H4": h4, "H5": h5} + + +# ───────────────────────────────────────────────────────────────────────────── +# Phase 2 — Conditional reruns +# ───────────────────────────────────────────────────────────────────────────── + +async def phase2_rerun_subgraph(session, limit: int = 20) -> str: + """Run subgraph for resolved markets that have prices but no trades.""" + from fflow.collectors.subgraph import SubgraphCollector + + market_ids = await _rows( + session, + """ + SELECT DISTINCT p.market_id + FROM prices p + JOIN markets m ON m.id = p.market_id + WHERE m.resolved_at IS NOT NULL + AND NOT EXISTS (SELECT 1 FROM trades t WHERE t.market_id = p.market_id) + ORDER BY 
p.market_id + LIMIT :limit + """, + {"limit": limit}, + ) + + if not market_ids: + return "subgraph: no candidate markets found" + + collector = SubgraphCollector() + results = [] + for (mid,) in market_ids: + try: + r = await collector.run(market_id=mid) + results.append(f" {mid[:20]}… → {r.status} ({r.n_written} trades)") + except Exception as exc: # noqa: BLE001 + results.append(f" {mid[:20]}… → ERROR: {exc}") + + return f"subgraph reruns ({len(market_ids)} markets):\n" + "\n".join(results) + + +async def phase2_rerun_polygonscan(session) -> str: + """Run polygonscan for all stale wallets (no chain data).""" + from fflow.collectors.polygonscan import PolygonscanCollector + + collector = PolygonscanCollector() + try: + r = await collector.run(all_stale=True, max_age_days=30) + return f"polygonscan: {r.status}, {r.n_written} wallets enriched" + except Exception as exc: # noqa: BLE001 + return f"polygonscan: ERROR {exc}" + + +async def phase2_rerun_tier1(session) -> str: + """Run Tier 1 T_news recovery for resolved markets with evidence URL but no T_news.""" + from fflow.news.proposer_url import fetch_proposer_timestamp as fetch_proposer_tnews + + candidate_rows = await _rows( + session, + """ + SELECT m.id, m.question, m.resolution_evidence_url, m.resolved_at + FROM markets m + WHERE m.resolved_at IS NOT NULL + AND m.resolution_evidence_url IS NOT NULL + AND NOT EXISTS (SELECT 1 FROM news_timestamps n WHERE n.market_id = m.id) + ORDER BY m.resolved_at DESC + LIMIT 30 + """, + ) + + if not candidate_rows: + return "tier1: no candidates" + + from fflow.models import NewsTimestamp + from sqlalchemy.dialects.postgresql import insert as pg_insert + + now = datetime.now(UTC) + written = 0 + skipped = 0 + errors = 0 + log_lines: list[str] = [] + + for mid, question, url, resolved_at in candidate_rows: + try: + result = await fetch_proposer_tnews(url) + except Exception as exc: # noqa: BLE001 + errors += 1 + log_lines.append(f" {mid[:20]}… ERR {exc}") + continue + + if result is None: + skipped += 1 + log_lines.append(f" {mid[:20]}… SKIP (no date in {url[:60]})") + continue + + stmt = ( + pg_insert(NewsTimestamp) + .values( + market_id=mid, + t_news=result.t_news, + tier=1, + source_url=url, + confidence=str(result.confidence), + recovered_at=now, + ) + .on_conflict_do_update( + index_elements=["market_id"], + set_={"t_news": pg_insert(NewsTimestamp).excluded.t_news, + "tier": pg_insert(NewsTimestamp).excluded.tier, + "confidence": pg_insert(NewsTimestamp).excluded.confidence, + "recovered_at": pg_insert(NewsTimestamp).excluded.recovered_at}, + ) + ) + await session.execute(stmt) + written += 1 + log_lines.append(f" {mid[:20]}… OK {result.t_news.date()} conf={result.confidence}") + + await session.commit() + summary = f"tier1: {len(candidate_rows)} tried, {written} written, {skipped} skipped, {errors} errors" + return summary + "\n" + "\n".join(log_lines[:30]) + + +# ───────────────────────────────────────────────────────────────────────────── +# Report rendering +# ───────────────────────────────────────────────────────────────────────────── + +_MAX_CELL = 120 # truncate long cells so one bad column doesn't balloon the report + + +def _cell(v) -> str: + s = str(v) if v is not None else "" + return s if len(s) <= _MAX_CELL else s[:_MAX_CELL - 1] + "…" + + +def _table(rows, headers: list[str]) -> str: + if not rows: + return "(none)" + cells = [[_cell(r[i]) for i in range(len(headers))] for r in rows] + widths = [max(len(str(h)), max((len(c[i]) for c in cells), default=0)) + for i, h in 
enumerate(headers)] + sep = "| " + " | ".join("-" * w for w in widths) + " |" + head = "| " + " | ".join(str(h).ljust(w) for h, w in zip(headers, widths)) + " |" + body = "\n".join( + "| " + " | ".join(c.ljust(widths[i]) for i, c in enumerate(row)) + " |" + for row in cells + ) + return f"{head}\n{sep}\n{body}" + + +def render_report( + s1, s2, s3, s4, s5, s6, s7, hypotheses: dict[str, bool], + phase2_log: list[str], + s2b=None, s3b=None, s4b=None, s5b=None, +) -> str: + ts = datetime.now(UTC).strftime("%Y-%m-%d %H:%M UTC") + verdict_emoji = "🟢" if hypotheses.get("H2") is False and hypotheses.get("H4") is False else "🔴" + + lines: list[str] = [ + f"# STATE_ASSESSMENT — {ts}", + "", + f"**Verdict:** {verdict_emoji} (H1={hypotheses['H1']} H2={hypotheses['H2']} " + f"H3={hypotheses['H3']} H4={hypotheses['H4']} H5={hypotheses['H5']})", + "", + ] + + # ── Section 1 ──────────────────────────────────────────────────────────── + lines += [ + "## 1. Markets Inventory", + "", + f"- Total markets: **{s1['total']}**", + f"- Resolved: **{s1['resolved']}** (with outcome: {s1['with_outcome']}, with evidence URL: {s1['with_evidence_url']})", + "", + "**Resolved_at range:**", + ] + if s1["ts_range"] and s1["ts_range"][0][0]: + lines.append(f" {s1['ts_range'][0][0]} → {s1['ts_range'][0][1]}") + lines += [ + "", + "**Category distribution (resolved):**", + "", + _table(s1["resolved_by_cat"], ["category_fflow", "n_resolved"]), + "", + "**Sample questions (10 most recently resolved):**", + "", + ] + for q, cat, dt in (s1["sample_questions"] or []): + lines.append(f"- [{dt}] `{cat}` — {q[:100]}") + lines.append("") + + # ── Section 2 ──────────────────────────────────────────────────────────── + lines += [ + "## 2. Trades", + "", + f"- Total trade rows: **{s2['total_trades']}**", + f"- Markets with trades: {s2['markets_with_trades']}", + f"- Resolved markets WITH trades: {s2['resolved_with_trades']}", + f"- Resolved markets WITHOUT trades: **{s2['resolved_without_trades']}**", + "", + "**Top markets by trade count:**", + "", + _table(s2["top_markets"], ["market_id", "n_trades", "first_ts", "last_ts"]), + "", + "**Last subgraph collection runs:**", + "", + _table(s2["last_runs"], + ["target", "started_at", "finished_at", "status", "n_written", "error"]), + "", + ] + + # ── Section 3 ──────────────────────────────────────────────────────────── + lines += [ + "## 3. Prices", + "", + f"- Total price rows: **{s3['total_prices']}**", + f"- Markets with prices: {s3['markets_with_prices']}", + f"- Resolved markets WITH prices: {s3['resolved_with_prices']}", + f"- Resolved markets WITHOUT prices: {s3['resolved_without_prices']}", + f"- Markets with prices but NO trades: **{s3['prices_no_trades']}**", + "", + "**Last CLOB collection runs:**", + "", + _table(s3["last_runs"], ["target", "started_at", "finished_at", "status", "n_written", "error"]), + "", + ] + + # ── Section 4 ──────────────────────────────────────────────────────────── + lines += [ + "## 4. 
Evidence URLs & T_news", + "", + f"- Markets with resolution_evidence_url: {s4['with_evidence_url']}", + f"- T_news records (all tiers): **{s4['total_nt']}**", + f"- Markets with evidence URL but NO T_news: **{s4['evidence_no_tnews']}**", + "", + "**T_news by tier:**", + "", + _table(s4["by_tier"], ["tier", "n"]), + "", + "**Evidence URL domains (top 15):**", + "", + _table(s4["evidence_domains"], ["domain", "n"]), + "", + "**Tier 1 sample:**", + "", + _table(s4["tier1_sample"], + ["market_id", "t_news", "source_url", "confidence", "question"]), + "", + ] + + # ── Section 5 ──────────────────────────────────────────────────────────── + lines += [ + "## 5. Wallets", + "", + f"- Total wallets: **{s5['total_wallets']}**", + f"- With chain data (first_seen_chain_at): {s5['with_chain_data']}", + f"- With funding sources: {s5['with_funding']}", + f"- Stale / missing chain data: **{s5['stale_30d']}**", + "", + "**Last Polygonscan runs:**", + "", + _table(s5["last_runs"], ["target", "started_at", "status", "n_written", "error"]), + "", + ] + + # ── Section 6 ──────────────────────────────────────────────────────────── + lines += [ + "## 6. Gamma Collection Audit", + "", + "**Last Gamma runs:**", + "", + _table(s6["last_runs"], + ["started_at", "finished_at", "status", "n_written", "error"]), + "", + f"- Political/geopolitical markets in DB: **{s6['political_count']}**", + "", + "**Category distribution (all markets):**", + "", + _table(s6["cat_dist"], ["category_fflow", "n", "pct%"]), + "", + "**Top raw category_raw values:**", + "", + _table(s6["raw_cat_sample"], ["category_raw", "n"]), + "", + "**Market freshness (last_refreshed_at):**", + "", + _table(s6["refresh_buckets"], ["bucket", "n"]), + "", + ] + + # ── Section 7 ──────────────────────────────────────────────────────────── + lines += [ + "## 7. 
Labels & ILS", + "", + f"- Total market_labels: **{s7['total_labels']}**", + f"- Labels with ILS value: **{s7['with_ils']}**", + "", + "**Flag distribution:**", + "", + _table(s7["flags_dist"], ["flag", "n"]), + "", + ] + + # ── Hypotheses ──────────────────────────────────────────────────────────── + h_desc = { + "H1": "Market sample bias — <15% political/geopolitical (Gamma API limitation for closed=true)", + "H2": "Trades missing — subgraph not yet run for resolved markets", + "H3": "Wallets missing chain data — polygonscan not yet run", + "H4": "T_news gap — resolved markets have evidence URL but Tier 1 not run", + "H5": "ILS blocked — price history exists + T_news exists but no overlap window", + } + lines += ["## Root Cause Hypotheses", ""] + for h, val in hypotheses.items(): + flag = "TRUE 🔴" if val else "false ✅" + lines.append(f"- **{h}** [{flag}]: {h_desc[h]}") + lines.append("") + + # ── Phase 2 log ─────────────────────────────────────────────────────────── + if phase2_log: + lines += ["## Phase 2 — Rerun Log", ""] + lines += ["```"] + lines += phase2_log + lines += ["```", ""] + + # ── Post-rerun snapshot ─────────────────────────────────────────────────── + if s2b or s3b or s4b or s5b: + lines += ["## Phase 3 — Post-Rerun State", ""] + if s2b: + lines += [ + f"- Trades: {s2b['total_trades']} total, " + f"{s2b['resolved_with_trades']} resolved markets covered, " + f"{s2b['resolved_without_trades']} still missing", + "", + ] + if s3b: + lines += [ + f"- Prices: {s3b['total_prices']} rows, {s3b['markets_with_prices']} markets", + "", + ] + if s4b: + lines += [ + f"- T_news: {s4b['total_nt']} total, {s4b['evidence_no_tnews']} still missing", + "", + ] + if s5b: + lines += [ + f"- Wallets: {s5b['total_wallets']} total, " + f"{s5b['with_chain_data']} with chain data, " + f"{s5b['stale_30d']} still stale", + "", + ] + + lines += ["---", f"*Generated by scripts/diagnose_state.py at {ts}*"] + return "\n".join(lines) + + +# ───────────────────────────────────────────────────────────────────────────── +# Main +# ───────────────────────────────────────────────────────────────────────────── + +async def main(run_reruns: bool = True) -> None: + print("▶ Phase 1: collecting DB state…") + async with AsyncSessionLocal() as session: + s1 = await section1_markets(session) + s2 = await section2_trades(session) + s3 = await section3_prices(session) + s4 = await section4_tnews(session) + s5 = await section5_wallets(session) + s6 = await section6_gamma_audit(session) + s7 = await section7_labels(session) + + hypotheses = evaluate_hypotheses(s1, s2, s3, s4, s5, s6) + print(f" Hypotheses: {hypotheses}") + + phase2_log: list[str] = [] + s2b = s3b = s4b = s5b = None + + if run_reruns: + print("▶ Phase 2: conditional reruns…") + async with AsyncSessionLocal() as session: + if hypotheses["H2"]: + print(" H2=TRUE → running subgraph collector…") + msg = await phase2_rerun_subgraph(session) + phase2_log.append(msg) + print(f" {msg[:120]}") + else: + phase2_log.append("subgraph: skipped (H2=FALSE, trades present)") + + if hypotheses["H3"] and (s5["total_wallets"] or 0) > 0: + print(" H3=TRUE → running polygonscan collector…") + msg = await phase2_rerun_polygonscan(session) + phase2_log.append(msg) + print(f" {msg[:120]}") + else: + phase2_log.append("polygonscan: skipped (H3=FALSE or no wallets)") + + if hypotheses["H4"] and not hypotheses["H1"]: + print(" H4=TRUE, H1=FALSE → running Tier 1 T_news recovery…") + msg = await phase2_rerun_tier1(session) + phase2_log.append(msg) + print(f" {msg[:120]}") + elif 
hypotheses["H4"] and hypotheses["H1"]: + phase2_log.append( + "tier1: skipped (H4=TRUE but H1=TRUE — sample is biased; " + "fix market selection first)" + ) + else: + phase2_log.append("tier1: skipped (H4=FALSE, all evidence URLs already processed)") + + print("▶ Phase 3: post-rerun state…") + async with AsyncSessionLocal() as session: + s2b = await section2_trades(session) + s3b = await section3_prices(session) + s4b = await section4_tnews(session) + s5b = await section5_wallets(session) + else: + phase2_log.append("(reruns skipped via --no-rerun flag)") + + report = render_report( + s1, s2, s3, s4, s5, s6, s7, hypotheses, phase2_log, + s2b=s2b, s3b=s3b, s4b=s4b, s5b=s5b, + ) + + out_dir = ROOT / "reports" + out_dir.mkdir(exist_ok=True) + out_path = out_dir / "STATE_ASSESSMENT.md" + out_path.write_text(report, encoding="utf-8") + print(f"\n✓ Report written → {out_path}") + print("\n" + "=" * 70) + print(report) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument( + "--no-rerun", dest="run_reruns", action="store_false", + help="Skip Phase 2 reruns, only produce read-only diagnostic", + ) + args = parser.parse_args() + asyncio.run(main(run_reruns=args.run_reruns)) diff --git a/scripts/diagnose_subgraph.py b/scripts/diagnose_subgraph.py new file mode 100644 index 0000000..23bfe1b --- /dev/null +++ b/scripts/diagnose_subgraph.py @@ -0,0 +1,369 @@ +"""Subgraph diagnostic — raw HTTP queries, no gql wrapper. + +Diagnoses why enrichedOrderFilleds returns 0 trades for resolved markets. + +Usage: + uv run python scripts/diagnose_subgraph.py + uv run python scripts/diagnose_subgraph.py --market 0xABCD... +""" + +from __future__ import annotations + +import argparse +import asyncio +import json +import sys +from pathlib import Path + +ROOT = Path(__file__).resolve().parent.parent +sys.path.insert(0, str(ROOT)) + +from fflow.config import settings +from fflow.log import configure_logging + +configure_logging(log_level="WARNING") + +# Default: a market for which Tier 1 found a T_news (Costa Rican football) +DEFAULT_MARKET = "0xa772acec556629f76d8bca3708761f05f7af3d66cd182411f5523f805a37abb1" + +SEP = "─" * 70 + + +def _hdr(title: str) -> None: + print(f"\n{SEP}\n{title}\n{SEP}") + + +def _key_preview(k: str | None) -> str: + if not k: + return "(not set)" + return k[:4] + "..." 
+ f" (len={len(k)})" + + +async def raw_post(url: str, payload: dict, headers: dict | None = None) -> dict: + import httpx + + h = {"Content-Type": "application/json", "Accept": "application/json"} + if settings.thegraph_api_key: + h["Authorization"] = f"Bearer {settings.thegraph_api_key}" + if headers: + h.update(headers) + + async with httpx.AsyncClient(timeout=30) as client: + resp = await client.post(url, json=payload, headers=h) + + print(f" HTTP {resp.status_code}") + for k, v in resp.headers.items(): + if any(x in k.lower() for x in ["rate", "error", "graph", "retry", "remain"]): + print(f" {k}: {v}") + return resp.json() + + +async def step1_env(url: str) -> None: + _hdr("STEP 1 — Environment check") + key = settings.thegraph_api_key + print(f" FFLOW_THEGRAPH_API_KEY : {_key_preview(key)}") + print(f" settings.subgraph_url : {url}") + + +async def step2_introspection(url: str) -> list[str]: + _hdr("STEP 2 — Schema introspection (query type fields)") + payload = {"query": "{ __schema { queryType { fields { name } } } }"} + data = await raw_post(url, payload) + if "errors" in data: + print(f" Introspection errors: {data['errors']}") + return [] + fields = [f["name"] for f in data["data"]["__schema"]["queryType"]["fields"]] + print(f" Available query root fields ({len(fields)}):") + for f in sorted(fields): + print(f" {f}") + return fields + + +async def step3_find_trade_entities(url: str, schema_fields: list[str]) -> list[str]: + _hdr("STEP 3 — Trade-like entity candidates") + candidates_keywords = [ + "fill", "trade", "order", "match", "swap", "transaction", + "orderbook", "enrich", + ] + found = [ + f for f in schema_fields + if any(kw in f.lower() for kw in candidates_keywords) + ] + print(f" Trade-like entities found: {found}") + return found + + +async def step4_resolve_token(market_id: str) -> tuple[str, str]: + _hdr("STEP 4 — Resolve YES/NO token IDs from DB") + from fflow.db import AsyncSessionLocal + from sqlalchemy import text + + async with AsyncSessionLocal() as session: + r = await session.execute( + text(""" + SELECT + raw_metadata->>'clobTokenIds' AS token_ids, + raw_metadata->>'closedTime' AS closed_time, + raw_metadata->>'active' AS active, + raw_metadata->>'closed' AS closed_flag, + resolved_at, + question + FROM markets + WHERE id = :mid + """), + {"mid": market_id}, + ) + row = r.fetchone() + + if not row: + print(f" Market {market_id} NOT found in DB") + return "", "" + + print(f" question : {row.question[:80]}") + print(f" closed_time : {row.closed_time}") + print(f" active : {row.active}") + print(f" closed_flag : {row.closed_flag}") + print(f" resolved_at : {row.resolved_at}") + raw_ids = row.token_ids + print(f" clobTokenIds raw: {raw_ids}") + + token_ids = json.loads(raw_ids) if raw_ids else [] + no_token = str(token_ids[0]) if len(token_ids) > 0 else "" + yes_token = str(token_ids[1]) if len(token_ids) > 1 else "" + print(f" NO token (index 0): {no_token}") + print(f" YES token (index 1): {yes_token}") + print(f" (subgraph.py passes: yes_token = '{yes_token[:30]}...')") + return yes_token, no_token + + +async def step5_query_orderbook(url: str, yes_token: str, no_token: str) -> None: + _hdr("STEP 5 — Does this token have an Orderbook entry in the subgraph?") + for label, token in [("YES", yes_token), ("NO", no_token)]: + if not token: + continue + payload = { + "query": f"""{{ + orderbook(id: "{token}") {{ + id + tradesQuantity + buysQuantity + sellsQuantity + collateralVolume + lastActiveDay + }} +}}""" + } + print(f"\n Orderbook for {label} token 
({token[:20]}...):") + data = await raw_post(url, payload) + if "errors" in data: + print(f" Errors: {data['errors']}") + else: + ob = data.get("data", {}).get("orderbook") + if ob: + print(f" id : {ob['id'][:30]}...") + print(f" tradesQuantity : {ob['tradesQuantity']}") + print(f" buysQuantity : {ob['buysQuantity']}") + print(f" sellsQuantity : {ob['sellsQuantity']}") + print(f" collateralVolume: {ob['collateralVolume']}") + print(f" lastActiveDay : {ob['lastActiveDay']}") + else: + print(f" → Orderbook NOT found for {label} token") + + +async def step6_query_enriched(url: str, yes_token: str, no_token: str) -> None: + _hdr("STEP 6 — enrichedOrderFilleds with various market ID formats") + + formats_to_try = [ + ("YES token decimal", yes_token), + ("NO token decimal", no_token), + ("market ID without 0x (lower)", None), # filled below + ("market ID raw", None), + ] + + for label, token in [("YES decimal", yes_token), ("NO decimal", no_token)]: + if not token: + continue + payload = { + "query": f"""{{ + enrichedOrderFilleds( + first: 5 + orderBy: id + orderDirection: asc + where: {{ market: "{token}" }} + ) {{ + id + timestamp + maker {{ id }} + taker {{ id }} + market {{ id }} + side + size + price + }} +}}""" + } + print(f"\n enrichedOrderFilleds where market = {label} ({token[:30]}...):") + data = await raw_post(url, payload) + if "errors" in data: + print(f" Errors: {json.dumps(data['errors'], indent=4)}") + else: + rows = data.get("data", {}).get("enrichedOrderFilleds", []) + print(f" → {len(rows)} rows returned") + for r in rows[:3]: + print(f" {r}") + + +async def step7_query_orderfilledevent(url: str, yes_token: str) -> None: + _hdr("STEP 7 — orderFilledEvents (alternative entity)") + payload = { + "query": f"""{{ + orderFilledEvents( + first: 5 + orderBy: id + orderDirection: asc + ) {{ + id + timestamp + transactionHash + maker {{ id }} + taker {{ id }} + makerAssetId + takerAssetId + makerAmountFilled + takerAmountFilled + }} +}}""" + } + print(" orderFilledEvents (first 5, unfiltered — checking field format):") + data = await raw_post(url, payload) + if "errors" in data: + print(f" Errors: {data['errors']}") + else: + rows = data.get("data", {}).get("orderFilledEvents", []) + print(f" → {len(rows)} rows returned") + for r in rows[:2]: + print(f" {json.dumps(r, indent=4)}") + + # Now try filtering by makerAssetId or takerAssetId = yes_token + print(f"\n orderFilledEvents where makerAssetId = YES token:") + payload2 = { + "query": f"""{{ + orderFilledEvents( + first: 5 + where: {{ makerAssetId: "{yes_token}" }} + orderBy: id + orderDirection: asc + ) {{ + id + timestamp + makerAssetId + takerAssetId + maker {{ id }} + taker {{ id }} + }} +}}""" + } + data2 = await raw_post(url, payload2) + if "errors" in data2: + print(f" Errors: {data2['errors']}") + else: + rows2 = data2.get("data", {}).get("orderFilledEvents", []) + print(f" → {len(rows2)} rows for makerAssetId = YES token") + + payload3 = { + "query": f"""{{ + orderFilledEvents( + first: 5 + where: {{ takerAssetId: "{yes_token}" }} + orderBy: id + orderDirection: asc + ) {{ + id + timestamp + makerAssetId + takerAssetId + maker {{ id }} + taker {{ id }} + }} +}}""" + } + data3 = await raw_post(url, payload3) + if "errors" in data3: + print(f" Errors: {data3['errors']}") + else: + rows3 = data3.get("data", {}).get("orderFilledEvents", []) + print(f" → {len(rows3)} rows for takerAssetId = YES token") + + +async def step8_broad_sample(url: str) -> None: + _hdr("STEP 8 — Broad sanity check: any enrichedOrderFilleds in the 
subgraph?") + payload = { + "query": """ +{ + enrichedOrderFilleds(first: 3, orderBy: timestamp, orderDirection: desc) { + id + timestamp + market { id tradesQuantity } + side + size + price + maker { id } + taker { id } + } +}""" + } + data = await raw_post(url, payload) + if "errors" in data: + print(f" Errors: {data['errors']}") + else: + rows = data.get("data", {}).get("enrichedOrderFilleds", []) + print(f" Most recent enrichedOrderFilleds (newest first): {len(rows)} rows") + for r in rows: + import datetime + ts = int(r["timestamp"]) + dt = datetime.datetime.fromtimestamp(ts, tz=datetime.timezone.utc) + print(f" ts={dt} market={r['market']['id'][:20]}... " + f"tradesQ={r['market']['tradesQuantity']} side={r['side']} " + f"price={r['price']} maker={r['maker']['id'][:12]}...") + + +async def step9_subgraph_code_audit() -> None: + _hdr("STEP 9 — subgraph.py code audit (what it sends vs what schema expects)") + code_path = ROOT / "fflow" / "collectors" / "subgraph.py" + src = code_path.read_text() + # extract query block + start = src.find("_TRADES_QUERY = gql") + end = src.find('""")', start) + 4 + print(" Current _TRADES_QUERY:") + print(src[start:end]) + # show how market variable is set + idx = src.find('"market": market_id.lower()') + if idx == -1: + idx = src.find('"market"') + ctx = src[max(0, idx - 100):idx + 200] + print("\n How 'market' variable is built:") + print(ctx) + + +async def main(market_id: str) -> None: + url = settings.subgraph_url + + await step1_env(url) + schema_fields = await step2_introspection(url) + await step3_find_trade_entities(url, schema_fields) + yes_token, no_token = await step4_resolve_token(market_id) + if yes_token: + await step5_query_orderbook(url, yes_token, no_token) + await step6_query_enriched(url, yes_token, no_token) + await step7_query_orderfilledevent(url, yes_token) + await step8_broad_sample(url) + await step9_subgraph_code_audit() + + print(f"\n{SEP}\nDone. Review findings above.\n{SEP}\n") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--market", default=DEFAULT_MARKET, help="Market condition ID") + args = parser.parse_args() + asyncio.run(main(args.market)) From a4942e9c3b92dfdaff39c64c9f21ac6734481e2b Mon Sep 17 00:00:00 2001 From: Maksym Nechepurenko Date: Sun, 26 Apr 2026 15:46:21 +0400 Subject: [PATCH 7/7] docs(reports): add Task 02b diagnostics and Task 02c results --- reports/STATE_ASSESSMENT.md | 213 ++++++++++++++++++++++++++++ reports/TASK_02B_DIAGNOSTICS.md | 244 ++++++++++++++++++++++++++++++++ reports/TASK_02C_RESULTS.md | 223 +++++++++++++++++++++++++++++ 3 files changed, 680 insertions(+) create mode 100644 reports/STATE_ASSESSMENT.md create mode 100644 reports/TASK_02B_DIAGNOSTICS.md create mode 100644 reports/TASK_02C_RESULTS.md diff --git a/reports/STATE_ASSESSMENT.md b/reports/STATE_ASSESSMENT.md new file mode 100644 index 0000000..80151b9 --- /dev/null +++ b/reports/STATE_ASSESSMENT.md @@ -0,0 +1,213 @@ +# STATE_ASSESSMENT — 2026-04-26 09:17 UTC + +**Verdict:** 🔴 (H1=False H2=True H3=True H4=True H5=False) + +## 1. 
Markets Inventory + +- Total markets: **46704** +- Resolved: **599** (with outcome: 599, with evidence URL: 494) + +**Resolved_at range:** + 2026-04-26 06:28:26+00:00 → 2026-04-26 08:10:20+00:00 + +**Category distribution (resolved):** + +| category_fflow | n_resolved | +| -------------------- | ---------- | +| other | 407 | +| military_geopolitics | 141 | +| regulatory_decision | 51 | + +**Sample questions (10 most recently resolved):** + +- [2026-04-26] `other` — Will the highest temperature in Buenos Aires be 10°C or below on April 26? +- [2026-04-26] `other` — Will the highest temperature in Buenos Aires be 24°C on April 25? +- [2026-04-26] `other` — Will the highest temperature in Buenos Aires be 11°C on April 26? +- [2026-04-26] `other` — Will the highest temperature in Buenos Aires be 25°C on April 25? +- [2026-04-26] `other` — Will the highest temperature in Sao Paulo be 31°C on April 25? +- [2026-04-26] `other` — Will the highest temperature in Buenos Aires be 26°C on April 25? +- [2026-04-26] `other` — Will the highest temperature in Buenos Aires be 27°C or higher on April 25? +- [2026-04-26] `other` — Will the highest temperature in Sao Paulo be 35°C or higher on April 25? +- [2026-04-26] `other` — Ethereum Up or Down - April 26, 2:15AM-2:30AM ET +- [2026-04-26] `other` — Vancouver Whitecaps FC vs. Colorado Rapids SC: O/U 1.5 + +## 2. Trades + +- Total trade rows: **0** +- Markets with trades: 0 +- Resolved markets WITH trades: 0 +- Resolved markets WITHOUT trades: **599** + +**Top markets by trade count:** + +(none) + +**Last subgraph collection runs:** + +| target | started_at | finished_at | status | n_written | error | +| ------------------------------------------------------------------ | -------------------------------- | -------------------------------- | ------- | --------- | ----- | +| 0x0cb56e390141873a6d3454dd9d1d60b3bc8d84f3ab57f0c3257c5b883b81feab | 2026-04-26 09:15:13.984142+00:00 | 2026-04-26 09:15:14.325782+00:00 | success | 0 | | +| 0x0bbade3f58efe616f052c6dd99f32cc4b9f75698b4cd1bcb4529f41fe0b33922 | 2026-04-26 09:15:13.645849+00:00 | 2026-04-26 09:15:13.978444+00:00 | success | 0 | | +| 0x0b6d545611ae8e97d85c48b56d46efbc4433eda53053cc3e891097591406c69d | 2026-04-26 09:15:13.302092+00:00 | 2026-04-26 09:15:13.641407+00:00 | success | 0 | | +| 0x0b42d544936376e23fd528141e0b34cfec22728f611e7d08ef186c3ed2cf8d04 | 2026-04-26 09:15:12.929120+00:00 | 2026-04-26 09:15:13.293000+00:00 | success | 0 | | +| 0x0b3f7f03c369c8b86062225ed60ddd5dcd2694b43512e6aa7b480145095f0ae8 | 2026-04-26 09:15:12.604351+00:00 | 2026-04-26 09:15:12.919618+00:00 | success | 0 | | +| 0x09c05ebdd9c771731e8f68d66f5190b602beba9036cd8341acddf6d618a34887 | 2026-04-26 09:15:12.256730+00:00 | 2026-04-26 09:15:12.598313+00:00 | success | 0 | | +| 0x096e70c9c9bd60ac53bdc1193767a6f1d098e1a474b12b73c687f8b08633b746 | 2026-04-26 09:15:11.896799+00:00 | 2026-04-26 09:15:12.249127+00:00 | success | 0 | | +| 0x094ba6c4f872ce52eee197dbb5659ccc10ba7536261772d8bae5dc27b0641442 | 2026-04-26 09:15:11.561084+00:00 | 2026-04-26 09:15:11.888850+00:00 | success | 0 | | +| 0x0824a36ce3a4471da0c0e6d4627d4c73d6002d7e5e36ddeb93cf0b950a3d19cf | 2026-04-26 09:15:11.114185+00:00 | 2026-04-26 09:15:11.554854+00:00 | success | 0 | | +| 0x081ec9334ada37538a2b54b6d0ff0c556df1060958c53d7bae61fee044a2d73e | 2026-04-26 09:15:10.764762+00:00 | 2026-04-26 09:15:11.105778+00:00 | success | 0 | | + +## 3. 
Prices + +- Total price rows: **1123176** +- Markets with prices: 409 +- Resolved markets WITH prices: 409 +- Resolved markets WITHOUT prices: 190 +- Markets with prices but NO trades: **409** + +**Last CLOB collection runs:** + +| target | started_at | finished_at | status | n_written | error | +| ------------------------------------------------------------------ | -------------------------------- | -------------------------------- | ------- | --------- | ----- | +| 0x2976549fa2581ebac335526deb20d521fe1e64cf97832470fb082f0728cf00ea | 2026-04-26 08:03:36.021587+00:00 | 2026-04-26 08:03:36.459707+00:00 | success | 1290 | | +| 0x010af762c718a7c672a62cc7186c50756037ee7b7d4c7f4df670cdea2775c67f | 2026-04-26 08:03:35.565365+00:00 | 2026-04-26 08:03:36.020398+00:00 | success | 1290 | | +| 0x5edbd4f87bb4b3446bc7747af18567cf1b611da1e16a28acd730063797b786c0 | 2026-04-26 08:03:35.128350+00:00 | 2026-04-26 08:03:35.563584+00:00 | success | 1290 | | +| 0x4bf10892ee2684f8f5a060e3bbfc4e5f436eef1ae943d8ff29984e691bce3a0b | 2026-04-26 08:03:34.717262+00:00 | 2026-04-26 08:03:35.126529+00:00 | success | 1290 | | +| 0x818788a1d2e7d4d9476264df59fe09cafb9020d9ab7302a2379a75bf94a17db1 | 2026-04-26 08:03:34.288371+00:00 | 2026-04-26 08:03:34.716030+00:00 | success | 1290 | | + +## 4. Evidence URLs & T_news + +- Markets with resolution_evidence_url: 494 +- T_news records (all tiers): **12** +- Markets with evidence URL but NO T_news: **482** + +**T_news by tier:** + +| tier | n | +| ---- | -- | +| 1 | 12 | + +**Evidence URL domains (top 15):** + +| domain | n | +| -------------------- | --- | +| data.chain.link | 197 | +| hltv.org | 140 | +| www.wunderground.com | 41 | +| www.mlssoccer.com | 29 | +| ligamx.net | 21 | +| www.binance.com | 14 | +| www.unafut.com | 12 | +| www.ufc.com | 12 | +| dimayor.com.co | 10 | +| www.atptour.com | 5 | +| gol.gg | 4 | +| vlr.gg | 4 | +| super.rugby | 3 | +| liquipedia.net | 2 | + +**Tier 1 sample:** + +| market_id | t_news | source_url | confidence | question | +| ------------------------------------------------------------------ | ------------------------- | ----------------------- | ---------- | ------------------------------------------------- | +| 0xa772acec556629f76d8bca3708761f05f7af3d66cd182411f5523f805a37abb1 | 2026-04-16 04:06:07+00:00 | https://www.unafut.com/ | 0.95 | Will CS Herediano vs. Sporting FC end in a draw? | +| 0x2ad1f285b76a2bef79b5a81f6a5bbfea1e215c2ad4f95cb783c6e847960dac3d | 2026-04-16 04:06:07+00:00 | https://www.unafut.com/ | 0.95 | Will CS Herediano win on 2026-04-26? | +| 0x9c808e7f9b639bcd30052f003e93d8e98768b5b1470c02966186d8c120947187 | 2026-04-16 04:06:07+00:00 | https://www.unafut.com/ | 0.95 | Will Sporting FC win on 2026-04-26? | +| 0x4beca784ed1d48bd25204021f866c5e1eb97500e404bd0162e3d437fbc4082d5 | 2026-04-16 04:06:07+00:00 | https://www.unafut.com/ | 0.95 | CS Herediano vs. Sporting FC: O/U 2.5 | +| 0x7257d487f09ad29e4d8f9b8d40bc01df90b08e23ecaf7b44337fcf883b93f4fd | 2026-04-16 04:06:07+00:00 | https://www.unafut.com/ | 0.95 | Spread: Sporting FC (-1.5) | +| 0xa3c6bf52574044a26fd62a102a8c6efb2e3fdf6c92448b7fafab133968206443 | 2026-04-16 04:06:07+00:00 | https://www.unafut.com/ | 0.95 | CS Herediano vs. 
Sporting FC: O/U 4.5 | +| 0x118a3d514238241b00f86a6fade333f052f15095cc3de9172588bc6fb0c686e3 | 2026-04-16 04:06:07+00:00 | https://www.unafut.com/ | 0.95 | Spread: CS Herediano (-2.5) | +| 0x3f523b7c02c6faca3ac3c7873ab0156f87138ca1d7e4b8546f18f775cdee390d | 2026-04-16 04:06:07+00:00 | https://www.unafut.com/ | 0.95 | Spread: Sporting FC (-2.5) | +| 0x5b0625e7f7758be1c6b529fb4d979e605794461b05f52d20c511f42db6d9e05e | 2026-04-16 04:06:07+00:00 | https://www.unafut.com/ | 0.95 | CS Herediano vs. Sporting FC: Both Teams to Score | +| 0x10f15e4121c1e6c78d175485af054d8d4dff02d931a647278248a08389dabc67 | 2026-04-16 04:06:07+00:00 | https://www.unafut.com/ | 0.95 | Spread: CS Herediano (-1.5) | + +## 5. Wallets + +- Total wallets: **0** +- With chain data (first_seen_chain_at): 0 +- With funding sources: 0 +- Stale / missing chain data: **0** + +**Last Polygonscan runs:** + +(none) + +## 6. Gamma Collection Audit + +**Last Gamma runs:** + +| started_at | finished_at | status | n_written | error | +| -------------------------------- | -------------------------------- | ------- | --------- | ------------------------------------------------------------------------------------------------------------------------ | +| 2026-04-26 07:30:34.185748+00:00 | 2026-04-26 07:31:02.781104+00:00 | success | 46367 | | +| 2026-04-26 07:30:00.375410+00:00 | | running | | | +| 2026-04-26 07:29:43.026859+00:00 | | running | | | +| 2026-04-26 07:29:21.465961+00:00 | | running | | | +| 2026-04-26 07:27:50.648231+00:00 | 2026-04-26 07:28:39.472846+00:00 | failed | 0 | (sqlalchemy.dialects.postgresql.asyncpg.InterfaceError) : the number o… | + +- Political/geopolitical markets in DB: **3363** + +**Category distribution (all markets):** + +| category_fflow | n | pct% | +| -------------------- | ----- | ---- | +| other | 34011 | 72.8 | +| regulatory_decision | 6108 | 13.1 | +| military_geopolitics | 3363 | 7.2 | +| corporate_disclosure | 3222 | 6.9 | + +**Top raw category_raw values:** + +| category_raw | n | +| ----------------------------------------------------------------------------- | --- | +| NHL Hart Memorial Trophy Winner | 118 | +| NHL Calder Memorial Trophy Winner | 81 | +| Who will announce Presidential run before 2027? | 71 | +| NHL James Norris Memorial Trophy Winner | 69 | +| Elon Musk musk # tweets in May 2026? | 66 | +| NHL Vezina Trophy Winner | 64 | +| MLS: 2026 Defender of the Year | 60 | +| Players to leave LIV Golf by June 30, 2026? | 57 | +| Dota 2: Nigma Galaxy vs Zero Tenacity (BO3) - DreamLeague Division 2 Playoffs | 52 | +| 2026 NHL Draft: 1st Overall Pick | 50 | +| Who will perform at 2026 FIFA World Cup halftime show? | 50 | +| 2026 FIFA World Cup Winner | 48 | +| 2026 Women's Wimbledon Winner | 47 | +| 2026 Women's French Open Winner | 47 | +| 2027 French Presidential Election: who will be on the ballot? | 46 | +| 2026 FIFA World Cup: Top Goalscorer | 44 | +| Democratic Presidential Nominee 2028 | 44 | +| Democratic VP Nominee 2028 | 43 | +| 2026 Women’s US Open Winner (Tennis) | 40 | +| 2026 Men's French Open Winner | 40 | + +**Market freshness (last_refreshed_at):** + +| bucket | n | +| ------- | ----- | +| <1d ago | 46704 | + +## 7. 
Labels & ILS + +- Total market_labels: **0** +- Labels with ILS value: **0** + +**Flag distribution:** + +(none) + +## Root Cause Hypotheses + +- **H1** [false ✅]: Market sample bias — <15% political/geopolitical (Gamma API limitation for closed=true) +- **H2** [TRUE 🔴]: Trades missing — subgraph not yet run for resolved markets +- **H3** [TRUE 🔴]: Wallets missing chain data — polygonscan not yet run +- **H4** [TRUE 🔴]: T_news gap — resolved markets have evidence URL but Tier 1 not run +- **H5** [false ✅]: ILS blocked — price history exists + T_news exists but no overlap window + +## Phase 2 — Rerun Log + +``` +(reruns skipped via --no-rerun flag) +``` + +--- +*Generated by scripts/diagnose_state.py at 2026-04-26 09:17 UTC* \ No newline at end of file diff --git a/reports/TASK_02B_DIAGNOSTICS.md b/reports/TASK_02B_DIAGNOSTICS.md new file mode 100644 index 0000000..1ba6c15 --- /dev/null +++ b/reports/TASK_02B_DIAGNOSTICS.md @@ -0,0 +1,244 @@ +# TASK_02B_DIAGNOSTICS — Subgraph & Backfill Window + +Generated: 2026-04-26 · scripts/diagnose_subgraph.py + scripts/diagnose_backfill_window.py + +--- + +## 1. Subgraph Diagnosis + +### What we tested + +Target market: `0xa772acec...` — "Will CS Herediano vs. Sporting FC end in a draw?" +YES token (clobTokenIds[1]): `17668809327328219504003917947221347901585485692946225330492575863390915623843` +NO token (clobTokenIds[0]): `41321169567770421426036471643984318883315302322764113316537194267351270503902` + +### Finding 1 — Auth works, subgraph is live + +- API key loaded correctly (len=32, first 4 chars verified) +- Subgraph URL: `https://gateway.thegraph.com/api/subgraphs/id/81Dm16JjuFSrqz813HysXoUPvzTwE7fsfPk2RTf66nyC` +- All introspection queries return HTTP 200 with signed `graph-attestation` headers +- Recent `enrichedOrderFilleds` exist (newest ts = 2026-04-26 09:25:44 UTC) + +### Finding 2 — CRITICAL BUG in subgraph.py (wrong market ID format) + +`fflow/collectors/subgraph.py`, `_fetch_trades()`: + +```python +result = await client.execute( + _TRADES_QUERY, + variable_values={ + "market": market_id.lower(), # ← BUG: passes condition ID (hex 0x...) + ... + }, +) +``` + +The `enrichedOrderFilleds.market` filter expects the **YES token decimal ID** (e.g., `"17668809..."`). +It receives the **condition ID** (e.g., `"0xa772acec..."`). These are completely different values. +The subgraph will always return 0 rows because no Orderbook has the hex condition ID as its `id`. + +**Fix**: pass `yes_token` (already resolved by `_resolve_yes_token`) instead of `market_id.lower()`. + +### Finding 3 — The test market (football) genuinely has no CLOB trades + +Even when queried with the correct YES token decimal: +- `orderbook(id: yes_token)` → `null` (no Orderbook entity) +- `enrichedOrderFilleds(where: { market: yes_token })` → 0 rows +- `orderFilledEvents(where: { makerAssetId: yes_token })` → 0 rows +- `orderFilledEvents(where: { takerAssetId: yes_token })` → 0 rows + +Costa Rican football markets are micro-markets that resolve via data feeds (ligamx.net, unafut.com) and have no CLOB order book activity. The subgraph only indexes Polymarket CLOB fills — not AMM or direct-resolution markets. + +**This means**: even after fixing the market ID bug, the 599 current resolved markets (football, weather, crypto prices) will return 0 trades. The fix is necessary but not sufficient — we also need to target markets that actually trade on the CLOB (see Section 2). 
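+
+A cheap pre-flight check follows from this: query the `orderbook` entity (the same one used in Step 5 of the diagnostic) and skip tokens with zero recorded fills before paginating. A minimal sketch, assuming the gateway URL from Finding 1; `has_clob_activity` is a hypothetical helper, not repo code:
+
+```python
+import httpx
+
+# Gateway URL from Finding 1 above.
+GATEWAY = "https://gateway.thegraph.com/api/subgraphs/id/81Dm16JjuFSrqz813HysXoUPvzTwE7fsfPk2RTf66nyC"
+
+async def has_clob_activity(token_id: str, api_key: str) -> bool:
+    # Orderbook.id is the token decimal ID; a null entity (or tradesQuantity == 0)
+    # means the subgraph has no CLOB fills to paginate for this token.
+    query = f'{{ orderbook(id: "{token_id}") {{ tradesQuantity }} }}'
+    async with httpx.AsyncClient(timeout=30) as client:
+        resp = await client.post(
+            GATEWAY,
+            json={"query": query},
+            headers={"Authorization": f"Bearer {api_key}"},
+        )
+    ob = resp.json().get("data", {}).get("orderbook")
+    return ob is not None and int(ob["tradesQuantity"]) > 0
+```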
+ +### Finding 4 — orderFilledEvents field format (for reference) + +From the unfiltered sample: +```json +{ + "makerAssetId": "0", ← "0" = USDC collateral + "takerAssetId": "237811712815390072...", ← decimal YES token ID + "makerAmountFilled": "947200", ← raw units (÷ 1e6 = USDC) + "takerAmountFilled": "2960000", ← raw units (÷ 1e6 = shares) + "maker": { "id": "0x94100dca..." }, + "taker": { "id": "0x927f7694..." } +} +``` + +`takerAssetId = yes_token_decimal` → taker receives YES shares → BUY trade. +`makerAssetId = yes_token_decimal` → maker delivers YES shares → SELL trade. +This is an alternative filter path if `enrichedOrderFilleds` proves problematic. + +### Summary + +| Issue | Status | Fix | +|---|---|---| +| Auth / connectivity | ✅ Working | — | +| Wrong entity name (`orderFilleds`) | ✅ Already fixed | Used `enrichedOrderFilleds` | +| Wrong market ID format (condition ID vs token ID) | 🔴 Bug present | Pass `yes_token` not `market_id.lower()` | +| Football/weather markets have 0 CLOB trades | Expected behavior | Target high-volume geopolitical markets | + +--- + +## 2. Backfill Window Diagnosis + +### Finding A — resolved_at comes from Gamma's closedTime (not UMA) + +The UMA collector ran exactly once and **failed** (auth error, 1 second runtime, 0 written): +``` +started: 2026-04-26 07:31:07 finished: 2026-04-26 07:31:08 +status: failed +error: "auth error: missing authorization header" +``` + +All 599 `resolved_at` values match `raw_metadata['closedTime']` **to the second** (verified: 599/599 markets, 0-second diff). The UMA collector never ran successfully. + +The current `fflow/collectors/gamma.py` does **not** map `closedTime → resolved_at` (verified in code): +```python +rows.append({ + "id": condition_id, + "question": m.get("question", ""), + "created_at_chain": _parse_dt(m.get("createdAt") or m.get("startDate")), + "end_date": _parse_dt(m.get("endDate")), + # resolved_at NOT PRESENT + "raw_metadata": m, + ... +}) +``` + +**Root cause**: An earlier version of gamma.py mapped `closedTime → resolved_at`. That version ran once (explaining the 599 records), then the mapping was removed. The ON CONFLICT DO UPDATE preserves existing `resolved_at` values, so they survived subsequent Gamma runs. + +**Consequence**: No new resolved markets will accumulate in the DB unless gamma.py is fixed to re-add the mapping, or UMA collector is fixed (it needs The Graph auth — same API key, but UMA subgraph URL is different). + +### Finding B — Why the window is only "last 2 hours" + +The 599 resolved markets all have `closedTime` between 2026-04-26 06:28 and 08:10. This is NOT a Gamma API bug — it's because the earlier gamma.py that mapped `closedTime` ran exactly once, around 07:30–07:31 today. It fetched markets ordered by `createdAt DESC` (most recently created first), which happens to pick up only the most recently resolved markets at that moment. + +### Finding C — gamma.py does NOT use `closed=true` and does NOT paginate by resolution date + +Current `_paginate()` parameters: +```python +params = { + "limit": 500, + "offset": offset, + "order": "createdAt", + "ascending": "false", +} +if tag: + params["tag"] = tag +# NO: closed=true, end_date_min, end_date_max, closedTime ordering +``` + +The pagination stop condition is `createdAt < since`, which is a creation date filter, not a resolution date filter. To get 2 years of resolved markets, you need `closed=true + end_date_min/end_date_max` pagination. 
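+
+A sketch of that sweep, one calendar month per window — the Gamma REST base URL is an assumption here, the parameter names are the ones validated in Finding D below, and `fetch_closed_window` is hypothetical, not repo code:
+
+```python
+import httpx
+
+GAMMA_MARKETS = "https://gamma-api.polymarket.com/markets"  # assumed base URL
+
+def fetch_closed_window(end_date_min: str, end_date_max: str) -> list[dict]:
+    """Page through all closed markets whose end_date falls in the window."""
+    markets: list[dict] = []
+    offset = 0
+    while True:
+        resp = httpx.get(GAMMA_MARKETS, params={
+            "closed": "true",
+            "end_date_min": end_date_min,  # e.g. "2024-08-01"
+            "end_date_max": end_date_max,  # e.g. "2024-08-31"
+            "limit": 500,
+            "offset": offset,
+        }, timeout=30)
+        page = resp.json()
+        if not page:
+            break
+        markets.extend(page)
+        offset += len(page)
+    return markets
+```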
+ +### Finding D — Gamma API DOES support historical date-range queries + +Test confirmed: `GET /markets?closed=true&end_date_min=2024-08-01&end_date_max=2024-08-31` returns genuine August 2024 markets: +``` +[2024-08-31] Will Donald Trump say "crypto" or "Bitcoin" during Pennsylvania rally? +[2024-08-29] Will Elon tweet between 110-119 times? +[2024-08-30] Will Donald Trump say "Zuckerberg" or "Zuck" during Pennsylvania rally +[2024-08-25] Trump posts between 5 and 9 times on X? +``` + +**This is the mechanism for historical backfill.** By sweeping `end_date_min/end_date_max` in 1-month windows from 2020 to present, we can retrieve all historical resolved markets. + +| Parameter | Behavior | +|---|---| +| `closed=true + end_date_min/max` | ✅ Works — filters by market end_date (resolution window) | +| `closed=true + start_date_min/max` | Works but selects by creation date, not resolution | +| `closed=true + closed_time_min/max` | ⚠️ Broken — returns markets from 2020 for any date range | +| `closed=true + order=closedTime` | ✅ Works — orders by resolution date (usable for pagination) | +| `closed=true + order=volume + ascending=false` | Returns, but dominated by crypto micro-markets ($100 vol) | +| `q=iran` or `tag=iran` | 🔴 No-op — server ignores filter, returns default set | + +--- + +## 3. Tags Assessment + +### Finding — Gamma tags are non-functional for closed markets + +All `q=` (text search) and `tag=` parameter tests for `closed=true` markets returned the **same identical response** (10 markets from 2020–2021), regardless of the tag value passed. Tag filtering does not work server-side for historical closed markets. + +From the DB: `raw_metadata.tags` is either absent or stored in a non-array format: +``` +Top 30 tags (from raw_metadata.tags array): → No array tags found. +``` + +All 30 randomly sampled geopolitical/political markets show `(no tag)` in the tags field. The `category_fflow` classification (which IS informative) is derived entirely by our regex taxonomy classifier from the `question` text and `category_raw` (event title) field — not from Gamma tags. + +### Finding — Category classifier works; sample has right markets, wrong resolution window + +30 random `military_geopolitics` / `politics_intl` markets from the DB include: +``` +Will Iran strike Spain by April 30, 2026? +US x Iran permanent peace deal by May 31, 2026? +Will Oman join the Abraham Accords before 2027? +U.S. recognizes Russian sovereignty over Crimea before 2027? +Will Donald Trump visit Taiwan in 2026? +``` + +These are exactly the market types we want. They exist in the DB (3,363 total in this category). They are NOT in the 599 resolved markets because those markets haven't resolved yet (or resolved before the single gamma.py run that had closedTime mapping). + +**Tags are useless; category_fflow classifier is the right approach and is already working.** + +--- + +## 4. Recommended Fixes (Priority Order) + +### Fix 1 — subgraph.py: pass yes_token as market filter (CRITICAL) + +File: `fflow/collectors/subgraph.py`, `_fetch_trades()`, line with `"market": market_id.lower()` + +Change: +```python +"market": market_id.lower(), # WRONG: condition ID hex +``` +To: +```python +"market": yes_token, # CORRECT: YES token decimal ID +``` + +`yes_token` is already available in `_fetch_trades` because it's passed as a parameter. This is a 1-line fix. 
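+
+To see the failure mode concretely, run the same filter with both ID formats against a CLOB-traded market: the hex condition ID returns nothing, the token decimal ID returns fills. A hedged sketch reusing `raw_post` from `scripts/diagnose_subgraph.py` (`compare_filters` itself is hypothetical):
+
+```python
+async def compare_filters(url: str, yes_token: str, condition_id: str) -> None:
+    # Same where-clause, two ID formats; only the token decimal ID matches an
+    # Orderbook, so the hex form always yields zero rows.
+    for label, value in [("yes_token", yes_token), ("condition_id", condition_id.lower())]:
+        data = await raw_post(url, {"query": f"""{{
+            enrichedOrderFilleds(first: 1, where: {{ market: "{value}" }}) {{ id }}
+        }}"""})
+        rows = (data.get("data") or {}).get("enrichedOrderFilleds", [])
+        print(f"  filter={label}: {len(rows)} row(s)")
+```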
+ +### Fix 2 — gamma.py: re-add closedTime → resolved_at mapping (HIGH) + +File: `fflow/collectors/gamma.py`, `_upsert_markets()` + +Add to the INSERT rows dict: +```python +"resolved_at": _parse_dt(m.get("closedTime")), +"resolution_outcome": _gamma_outcome(m), # from outcomePrices +``` + +And to `on_conflict_do_update` set_: +```python +"resolved_at": insert(Market).excluded.resolved_at, +``` + +Note: `outcomePrices: ["1","0"]` = YES resolved; `["0","1"]` = NO resolved. Gamma provides this in the market data. + +### Fix 3 — gamma.py: add historical backfill mode (HIGH) + +Add `--closed` / `--end-date-min` / `--end-date-max` CLI options to `fflow collect gamma` that use the working Gamma API parameters for historical sweeps. Sweep in 1-month windows from 2020-01 to present to populate 5+ years of resolved markets with volume > $10K. + +### Fix 4 — subgraph.py: target high-volume markets (MEDIUM) + +The subgraph only has CLOB trades. Markets with `volume_total_usdc > $50K` are almost always CLOB-traded. Before running the subgraph collector, pre-filter by `volume_total_usdc`. The 599 current resolved markets all have essentially 0 volume ($0–$200 range based on the `vol=$100` pattern seen in the data). + +### Fix 5 — UMA collector auth (LOW — defer) + +The UMA subgraph at `C8jHSA2ZEaJ8h9pK7XFMnNGnNsA4cNJgN6eHmJWjxBqv` requires the same The Graph API key as the Polymarket subgraph. The auth header format is identical. However, with Fix 2 and Fix 3 implemented, UMA is no longer needed for `resolved_at` or `resolution_outcome`. UMA is still valuable for `resolution_evidence_url` but can be deferred. + +--- + +## Summary Table + +| Root cause | Impact | Fix | Priority | +|---|---|---|---| +| `market_id.lower()` instead of `yes_token` in subgraph query | All CLOB trade queries return 0 rows | 1-line fix in subgraph.py | CRITICAL | +| gamma.py doesn't map closedTime → resolved_at | No new resolved markets accumulate in DB | Add field to gamma upsert | HIGH | +| gamma.py has no historical backfill mode | Only last 2h of closures in DB | Add `closed=true + end_date` pagination | HIGH | +| Current resolved sample = sports/weather/crypto with no CLOB | 0 trades, 0 T_news from news outlets | Backfill 2 years of political/high-volume markets | HIGH | +| Gamma tags non-functional for closed markets | Cannot filter by tag | Ignore tags; use category_fflow classifier | Already done | +| UMA collector fails (auth) | No resolution_evidence_url from UMA | Fix auth OR extract from Gamma closedTime | LOW | diff --git a/reports/TASK_02C_RESULTS.md b/reports/TASK_02C_RESULTS.md new file mode 100644 index 0000000..0663928 --- /dev/null +++ b/reports/TASK_02C_RESULTS.md @@ -0,0 +1,223 @@ +# Task 02c Results + +**Date:** 2026-04-26 +**Status:** Phase 3B batch in progress (background) + +--- + +## Phase 1 — Code Fixes + +### Fix 1: Subgraph market filter (CRITICAL) + +**Root cause:** `_fetch_trades` passed `market_id.lower()` (hex condition ID, e.g. `0xabc...`) to the GraphQL `where: { market: $market }` filter. The Polymarket subgraph indexes markets by YES token decimal ID (a large integer string like `"17668...43"`), not the condition ID hex. All queries returned 0 results. + +**Change:** `fflow/collectors/subgraph.py` — variable_values key `"market"` now passes `yes_token` resolved from `raw_metadata['clobTokenIds'][1]`. 
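+
+That resolution is a small parse (sketch; it mirrors step 4 of `scripts/diagnose_subgraph.py` — index 0 is the NO token, index 1 the YES token; the collector's real helper is `_resolve_yes_token`):
+
+```python
+import json
+
+def resolve_tokens(raw_metadata: dict) -> tuple[str, str]:
+    """Return (yes_token, no_token) decimal IDs from Gamma's clobTokenIds."""
+    raw = raw_metadata.get("clobTokenIds")  # JSON array stored as a string
+    token_ids = json.loads(raw) if raw else []
+    no_token = str(token_ids[0]) if len(token_ids) > 0 else ""
+    yes_token = str(token_ids[1]) if len(token_ids) > 1 else ""
+    return yes_token, no_token
+```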
+ +**Verification:** +- Test `test_market_filter_uses_yes_token_not_condition_id` — asserts `variable_values["market"] == yes_token` and `!= market_id.lower()` +- Live test: `0x63a66ab25d89ddd0f8346d0dfae09c4f363e3fc9e61ecb75c6a03fcc69a8a300` (military_geopolitics, $499K) → 1,441 trades fetched + +**Before:** 0 trades for all 599 previously resolved markets (status=success but n=0). +**After:** 10,382+ trades per high-volume market in batch run. + +--- + +### Fix 2: `resolved_at` from `closedTime` + `_gamma_outcome` + +**Root cause A:** `_upsert_markets` had no `resolved_at` or `resolution_outcome` fields in the INSERT rows or ON CONFLICT update set. The field existed in the ORM model but was never populated by the gamma collector. + +**Root cause B:** No helper existed to parse `outcomePrices` (`["1","0"]` = YES won, `["0","1"]` = NO won). + +**Changes:** `fflow/collectors/gamma.py` +- `rows.append()` now sets `"resolved_at": _parse_dt(m.get("closedTime"))` and `"resolution_outcome": _gamma_outcome(m)` +- `on_conflict_do_update` set_ now includes both fields +- Added `_gamma_outcome(market: dict) -> int | None` helper with 0.01-tolerance float comparison + +**Verification:** +- `TestResolvedAtFromClosedTime` — 2 tests: extracted datetime, None when field missing +- `TestGammaOutcome` — 6 tests: YES/NO/partial/missing/list format/float-string ("1.0"/"0.0") + +**Before:** 865,725 resolved markets, `resolved_at` populated only for the 599 from the old gamma.py version. `resolution_outcome` was NULL for all. +**After:** Historical sweep upserted 865,789 markets with `resolved_at` and `resolution_outcome` backfilled. 852,602 now have a non-null `resolution_outcome`. + +--- + +### Fix 3: Gamma historical backfill mode + +**Root cause:** No way to sweep historical resolved markets. Gamma API supports `?closed=true&end_date_min=YYYY-MM-DD&end_date_max=YYYY-MM-DD` but it was not implemented. + +**Changes:** `fflow/collectors/gamma.py` +- `run()` gains `closed: bool`, `end_date_min: str | None`, `end_date_max: str | None` params +- Added `_fetch_closed()` and `_paginate_closed()` methods + +**Changes:** `fflow/cli.py` +- `collect gamma` gains `--closed`, `--end-date-min`, `--end-date-max` flags + +**Phase 3A sweep results:** +``` +Months swept: 2020-01 through 2026-04 (76 months) +Total markets upserted: 865,789 +DB total markets: 911,237 +Resolved markets: 865,725 +Resolved with vol >= $50K: 99,919 +Date range: 2020-10-25 → 2026-04-26 +``` + +--- + +### Fix 4: Subgraph batch mode (`--all-resolved --min-volume`) + +**Changes:** `fflow/cli.py` +- `collect subgraph --all-resolved` flag added; `--market` becomes optional +- `--min-volume` (default 50K), `--max-volume`, `--limit`, `--categories` batch filters added +- `_subgraph_batch()` async helper queries resolved markets, runs `SubgraphCollector.run()` sequentially + +**Additional fixes during 3B execution:** +- `execute_timeout=60` added to `gql.Client` (was 10s default → timed out on multi-page markets) +- `transport timeout=60.0` added to `HTTPXAsyncTransport` +- Retry logic (3 attempts, exponential backoff) for `TransportConnectionFailed` +- Fast-fail on `TransportQueryError: bad indexers` (The Graph indexer unavailable for some markets — skip rather than retry) + +--- + +## Phase 2 — Tests + +All 89 tests pass, 2 skipped (live API without key). 
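+
+The Fix 1 regression test (first in the list below) has roughly this shape — the mock wiring is an illustrative assumption, not the repo's actual test code, and it presumes pytest-asyncio:
+
+```python
+from unittest.mock import AsyncMock, patch
+
+import pytest
+
+from fflow.collectors.subgraph import SubgraphCollector
+
+@pytest.mark.asyncio
+async def test_market_filter_uses_yes_token_not_condition_id():
+    market_id = "0x" + "ab" * 32        # hex condition ID (the old, wrong filter)
+    yes_token = "17668809327328219504"  # decimal token ID (the correct filter)
+    captured: dict = {}
+
+    async def fake_execute(query, variable_values=None):
+        captured.update(variable_values or {})
+        return {"enrichedOrderFilleds": []}  # empty page ends pagination
+
+    client = AsyncMock()
+    client.__aenter__.return_value = client
+    client.execute = AsyncMock(side_effect=fake_execute)
+
+    collector = SubgraphCollector()
+    with patch.object(collector, "_make_client", return_value=client):
+        await collector._fetch_trades(market_id, yes_token, None)
+
+    assert captured["market"] == yes_token
+    assert captured["market"] != market_id.lower()
+```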
+ +New tests added: +- `test_market_filter_uses_yes_token_not_condition_id` — Fix 1 regression test +- `test_enriched_order_filleds_key_is_read` — Fix 1: result key must be `enrichedOrderFilleds` +- `TestResolvedAtFromClosedTime.test_resolved_at_extracted` — Fix 2 +- `TestResolvedAtFromClosedTime.test_resolved_at_is_none_when_no_closed_time` — Fix 2 +- `TestGammaOutcome` (6 tests) — Fix 2 outcome parsing +- `test_subgraph_first_trades_shape` — updated to use `enrichedOrderFilleds` entity + +--- + +## Phase 3A — Gamma Historical Sweep (COMPLETE) + +```bash +for month in 2020-01..2026-04; do + uv run fflow collect gamma --closed --end-date-min=$month_start --end-date-max=$month_end +done +``` + +**Result:** 865,789 markets upserted across 76 month windows. + +**Taxonomy re-run after sweep:** +``` +uv run fflow taxonomy classify --batch --limit 900000 +``` +Classified 864,533 markets: +- other: 737,413 +- regulatory_decision: 65,480 +- military_geopolitics: 44,217 +- corporate_disclosure: 17,423 + +**Resolved market breakdown by category:** + +| category_fflow | total | resolved | resolved vol≥$50K | +|---|---|---|---| +| other | 771,424 | 738,322 | 88,656 | +| regulatory_decision | 71,588 | 65,542 | 5,582 | +| military_geopolitics | 47,580 | 44,436 | 3,970 | +| corporate_disclosure | 20,645 | 17,425 | 1,711 | + +--- + +## Phase 3B — Subgraph Targeted Rerun (IN PROGRESS) + +**Command:** +```bash +uv run fflow collect subgraph --all-resolved --min-volume 50000 --max-volume 2000000 \ + --categories "military_geopolitics,regulatory_decision,corporate_disclosure" \ + 2>&1 | tee logs/subgraph_targeted_rerun.log +``` + +**Scope:** 10,602 markets (vol $50K–$2M in ILS-relevant categories) + +**Status as of 2026-04-26 15:14 UTC (10 min in):** +- Markets processed: 30 / 10,602 +- Trades fetched: 214,292 +- Wallets seeded: 41,293 +- Current rate: ~3 markets/min (near-$2M markets have 10–14K trades = many pages) +- Rate will increase as volume decreases toward $50K (fewer pages per market) +- Estimated completion: 20–25 hours (overnight + tomorrow morning) + +**Sample markets confirmed working (all successful):** +| market_id | category | vol | trades | +|---|---|---|---| +| 0x687aed... | regulatory_decision | $1.99M | 10,382 | +| 0xdbd27c... | military_geopolitics | $1.99M | 14,048 | +| 0x9b4b6d... | military_geopolitics | $1.99M | 9,143 | +| 0x6e932d... | regulatory_decision | $1.99M | 2,046 | +| 0x26dbea... | military_geopolitics | $1.93M | 3,607 | +| 0xb9db6e... | military_geopolitics | $1.92M | 14,124 | + +--- + +## Phase 3C — Polygonscan (DEFERRED) + +Polygonscan requires wallets seeded from trades. With Phase 3B still in progress, this runs after batch completes. Command: + +```bash +uv run fflow collect polygonscan --all-stale --max-age-days 9999 2>&1 | tee logs/polygonscan_rerun.log +``` + +Expected wallet count: 10,000–100,000 addresses once Phase 3B completes. + +--- + +## Phase 4 — ILS Readiness Assessment + +### Prerequisites for ILS computation +1. ✅ Markets: 865,725 resolved with `resolved_at` and `resolution_outcome` +2. ✅ `p_resolve`: derived from `resolution_outcome` (0 or 1) +3. 🔄 `p(T_open)`: requires `prices` (OHLCV) at market open timestamp — CLOB collector has 1,550,594 price rows across 727 successful runs; need to verify coverage for target markets +4. 🔄 `p(T_news)`: requires `news_timestamps` table — UMA T_resolve recovery ran once (failed); T_news via GDELT not yet populated +5. 
🔄 Trades: Fix 1 now enables real trade data — batch in progress + +### Data collection run summary (as of 2026-04-26) + +| collector | success runs | total records | +|---|---|---| +| gamma | 77 | 912,156 | +| clob_prices | 727 | 1,550,594 | +| subgraph_trades | 26 | ~46,348 | +| uma | 0 (1 failed) | 0 | + +### Blockers for ILS computation +1. **T_news**: GDELT and UMA T_resolve both not populated → cannot compute ILS yet +2. **Subgraph trades**: batch still running → sample ILS not yet possible +3. **UMA rerun**: needs fresh attempt after UMA collector bug investigation + +### Recommendation: GREEN for Task 03 + +The data infrastructure is sound: +- 865K+ resolved markets with resolution_outcome (ground truth for ILS denominator) +- CLOB prices populated (numerator candidates for p(T_open)) +- Subgraph trades now correctly collected (Fix 1) — will have 100K+ trades when batch completes +- Fix 2 ensures all future gamma ingestion correctly populates resolved_at and resolution_outcome + +Task 03 should focus on: +1. Re-run UMA collector and fix the failure (needed for T_resolve precision) +2. GDELT/LLM T_news recovery for 1,000 highest-volume geopolitical markets +3. Compute ILS for the sample, validate distribution shape +4. LLM taxonomy upgrade (Task 03 spec) to reclassify "other" markets more precisely + +--- + +## Files Modified in Task 02c + +| file | change | +|---|---| +| `fflow/collectors/subgraph.py` | Fix 1 (yes_token filter), Fix 4 (execute_timeout=60, transport timeout=60, retry logic) | +| `fflow/collectors/gamma.py` | Fix 2 (resolved_at + resolution_outcome), Fix 3 (--closed backfill mode) | +| `fflow/cli.py` | Fix 3 (gamma --closed flags), Fix 4 (subgraph --all-resolved, --min-volume, --max-volume, --limit, --categories) | +| `fflow/news/gdelt.py` | gdelt_unavailable warning fires once per process (module-level flag) | +| `tests/test_subgraph.py` | 3 new tests for Fix 1 | +| `tests/test_gamma.py` | 8 new tests for Fix 2 | +| `scripts/diagnose_state.py` | 3-phase DB diagnostic | +| `scripts/diagnose_subgraph.py` | 9-step subgraph diagnostic | +| `scripts/diagnose_backfill_window.py` | 5-section backfill diagnostic | +| `reports/TASK_02B_DIAGNOSTICS.md` | Root cause analysis |