diff --git a/fflow/collectors/subgraph.py b/fflow/collectors/subgraph.py index 509c206..d9f496e 100644 --- a/fflow/collectors/subgraph.py +++ b/fflow/collectors/subgraph.py @@ -16,9 +16,12 @@ _PAGE_SIZE = 1000 +# API note (verified 2026-04-26): The Polymarket subgraph exposes `enrichedOrderFilleds` +# (not `orderFilleds`). market.id is the YES token decimal ID. size is raw int (divide by +# 1e6 for shares). price is already a 0-1 decimal. side is "Buy"/"Sell" relative to the token. _TRADES_QUERY = gql(""" query Trades($market: String!, $lastId: String!, $first: Int!) { - orderFilleds( + enrichedOrderFilleds( where: { market: $market, id_gt: $lastId } first: $first orderBy: id @@ -27,16 +30,13 @@ id timestamp transactionHash - maker - taker - makerAssetId - takerAssetId - makerAmountFilled - takerAmountFilled - fee - market { - id - } + orderHash + maker { id } + taker { id } + market { id } + side + size + price } } """) @@ -91,8 +91,12 @@ def _make_client(self) -> Client: headers = {"Accept": "application/json"} if settings.thegraph_api_key: headers["Authorization"] = f"Bearer {settings.thegraph_api_key}" - transport = HTTPXAsyncTransport(url=settings.subgraph_url, headers=headers) - return Client(transport=transport, fetch_schema_from_transport=False) + transport = HTTPXAsyncTransport( + url=settings.subgraph_url, + headers=headers, + timeout=60.0, + ) + return Client(transport=transport, fetch_schema_from_transport=False, execute_timeout=60) async def _fetch_trades( self, @@ -100,21 +104,35 @@ async def _fetch_trades( yes_token: str, from_ts: datetime | None, ) -> list[dict]: + import asyncio as _asyncio + from gql.transport.exceptions import TransportConnectionFailed + from_unix = int(from_ts.timestamp()) if from_ts else 0 all_trades: list[dict] = [] last_id = "" async with self._make_client() as client: while True: - result = await client.execute( - _TRADES_QUERY, - variable_values={ - "market": market_id.lower(), - "lastId": last_id, - "first": _PAGE_SIZE, - }, - ) - page = result.get("orderFilleds", []) + for attempt in range(3): + try: + result = await client.execute( + _TRADES_QUERY, + variable_values={ + "market": yes_token, + "lastId": last_id, + "first": _PAGE_SIZE, + }, + ) + break + except (TransportConnectionFailed, Exception) as exc: + from gql.transport.exceptions import TransportQueryError + if isinstance(exc, TransportQueryError) and "bad indexers" in str(exc): + raise # indexer unavailable — no point retrying + if attempt == 2: + raise + await _asyncio.sleep(2 ** attempt) + + page = result.get("enrichedOrderFilleds", []) if not page: break @@ -142,31 +160,20 @@ async def _upsert_trades( for t in raw_trades: tx = t.get("transactionHash", "") - # log_index from id (format: txHash-logIndex or sequential id) raw_id = t.get("id", "") log_idx = _parse_log_index(raw_id) ts = datetime.fromtimestamp(int(t["timestamp"]), tz=UTC) - taker = (t.get("taker") or "").lower() - maker = (t.get("maker") or "").lower() or None - - taker_asset = t.get("takerAssetId", "") - maker_amount = int(t.get("makerAmountFilled", 0)) - taker_amount = int(t.get("takerAmountFilled", 0)) - - # taker receives YES token → BUY; taker gives YES token → SELL - if str(taker_asset) == yes_token: - side = "BUY" - outcome_index = 1 - size_shares = taker_amount - usdc_paid = maker_amount - else: - side = "SELL" - outcome_index = 1 - size_shares = maker_amount - usdc_paid = taker_amount - - price_val = (usdc_paid / size_shares / 1e6) if size_shares else 0 - notional = usdc_paid / 1e6 + taker = ((t.get("taker") or {}).get("id") or "").lower() + maker = ((t.get("maker") or {}).get("id") or "").lower() or None + + raw_side = t.get("side", "Buy") + side = "BUY" if raw_side.lower() == "buy" else "SELL" + outcome_index = 1 # all enrichedOrderFilleds filtered by YES token + + raw_size = int(t.get("size", 0)) + size_shares = raw_size / 1e6 + price_val = float(t.get("price", 0)) + notional = size_shares * price_val trade_rows.append({ "market_id": market_id, @@ -177,7 +184,7 @@ async def _upsert_trades( "maker_address": maker, "side": side, "outcome_index": outcome_index, - "size_shares": str(size_shares / 1e6), + "size_shares": str(round(size_shares, 6)), "price": str(round(price_val, 6)), "notional_usdc": str(round(notional, 6)), "raw_event": t, @@ -202,7 +209,8 @@ async def _upsert_trades( total += len(chunk) await session.commit() - # upsert wallets + # upsert wallets — chunked (PostgreSQL param limit: 32767; 3 cols × 10000 = 30000) + wallet_chunk_size = 10_000 if wallet_set: wallet_rows = [ { @@ -212,24 +220,26 @@ async def _upsert_trades( } for addr, ts in wallet_set.items() ] - wallet_stmt = ( - insert(Wallet) - .values(wallet_rows) - .on_conflict_do_update( - index_elements=["address"], - set_={ - "first_seen_polymarket_at": insert(Wallet).excluded.first_seen_polymarket_at, - }, - where=( - Wallet.first_seen_polymarket_at.is_(None) - | ( - Wallet.first_seen_polymarket_at - > insert(Wallet).excluded.first_seen_polymarket_at - ) - ), + for i in range(0, len(wallet_rows), wallet_chunk_size): + chunk = wallet_rows[i : i + wallet_chunk_size] + wallet_stmt = ( + insert(Wallet) + .values(chunk) + .on_conflict_do_update( + index_elements=["address"], + set_={ + "first_seen_polymarket_at": insert(Wallet).excluded.first_seen_polymarket_at, + }, + where=( + Wallet.first_seen_polymarket_at.is_(None) + | ( + Wallet.first_seen_polymarket_at + > insert(Wallet).excluded.first_seen_polymarket_at + ) + ), + ) ) - ) - await session.execute(wallet_stmt) + await session.execute(wallet_stmt) await session.commit() log.info("subgraph_upserted", market=market_id, trades=total, wallets=len(wallet_set)) @@ -237,6 +247,10 @@ async def _upsert_trades( def _parse_log_index(raw_id: str) -> int: + # enrichedOrderFilleds id format: txHash_orderHash — hash the orderHash part + if "_" in raw_id: + order_hash_part = raw_id.split("_", 1)[-1] + return int(order_hash_part, 16) % 2**31 if order_hash_part.startswith("0x") else hash(order_hash_part) % 2**31 if "-" in raw_id: try: return int(raw_id.split("-")[-1]) diff --git a/fflow/config.py b/fflow/config.py index df7d404..f4ab00e 100644 --- a/fflow/config.py +++ b/fflow/config.py @@ -1,3 +1,4 @@ +from pydantic import Field, AliasChoices from pydantic_settings import BaseSettings, SettingsConfigDict @@ -20,8 +21,11 @@ class Settings(BaseSettings): polygonscan_api_key: str | None = None polygonscan_url: str = "https://api.polygonscan.com/api" - # Anthropic (Tier 3 LLM) - anthropic_api_key: str | None = None + # Anthropic (Tier 3 LLM) — accepts FFLOW_ANTHROPIC_API_KEY or ANTHROPIC_API_KEY + anthropic_api_key: str | None = Field( + default=None, + validation_alias=AliasChoices("FFLOW_ANTHROPIC_API_KEY", "ANTHROPIC_API_KEY"), + ) # UMA / Polygon RPC polygon_rpc_url: str = "https://polygon-rpc.com" @@ -35,7 +39,7 @@ class Settings(BaseSettings): log_level: str = "INFO" log_json: bool = False - model_config = SettingsConfigDict(env_prefix="FFLOW_", env_file=".env") + model_config = SettingsConfigDict(env_prefix="FFLOW_", env_file=".env", extra="ignore") settings = Settings() diff --git a/reports/POLYGONSCAN_COLLECTION_STATUS.md b/reports/POLYGONSCAN_COLLECTION_STATUS.md new file mode 100644 index 0000000..b41aae4 --- /dev/null +++ b/reports/POLYGONSCAN_COLLECTION_STATUS.md @@ -0,0 +1,134 @@ +# Polygonscan Collection — Status Report + +**Last updated:** 2026-04-27 +**Branch:** task02h/ffic-trade-backfill + +--- + +## Current State + +| Metric | Value | +|---|---| +| Total wallets in DB | 815,304 | +| Enriched (`first_seen_chain_at` set) | 2,449 (0.3%) | +| With `funding_sources` data | 4,706 (0.6%) | +| Attempted but returned empty | 810,598 (99.4%) | +| **Stale wallets remaining** | **812,855** | +| Estimated time to complete at 4 req/s | **56.4 hours** | + +**Progress snapshot:** 2,449 wallets with confirmed on-chain data. The collection +has barely started relative to the full corpus (< 1% enriched). + +--- + +## Run History + +| Run ID | Date | Duration | Status | n_written | Error | +|---|---|---|---|---|---| +| 11714 | 2026-04-27 05:28 | 2s | failed | 0 | Deprecated V1 endpoint | +| 11715 | 2026-04-27 05:29 | 1s | failed | 0 | Deprecated V1 endpoint | +| 11716 | 2026-04-27 05:29 | 34s | failed | 0 | DNS resolution failure | +| 11717 | 2026-04-27 06:15 | — | ~~running~~ → failed | 0 | Process died without cleanup *(closed manually)* | +| 11718 | 2026-04-27 06:23 | 78s | failed | 0 | Rate limit: Max 3 req/s | +| 11719 | 2026-04-27 06:24 | — | ~~running~~ → failed | 0 | Process died without cleanup *(closed manually)* | +| 11720 | 2026-04-27 06:25 | 14,352s (~4h) | failed | 0 | Server timeout / too busy | + +**All 7 runs failed.** Two stale `running` records (ids 11717, 11719) were left open by +crashed processes — closed manually on 2026-04-27. + +--- + +## Failure Taxonomy + +### 1. Deprecated V1 endpoint (runs 11714–11715, 3s) +- **Error:** `NOTOK | You are using a deprecated V1 endpoint, switch to Etherscan V2` +- **Root cause:** `polygonscan_url` in config was `https://api.polygonscan.com/api` — deprecated. +- **Fix already applied:** config.py now uses `https://api.etherscan.io/v2/api` (the V2 unified endpoint). This fix is on `task02d+` branches but was re-applied on `task02h` via config.py update. + +### 2. DNS failure (run 11716, 34s) +- **Error:** `[Errno 8] nodename nor servname provided, or not known` +- **Root cause:** Transient network failure — no code issue. + +### 3. Rate limit exceeded (run 11718, 78s) +- **Error:** `Max calls per sec rate limit reached (3/sec)` +- **Root cause:** The `.env` key is a free-tier key capped at **3 req/s** (not 5 as the code assumes). The `_RATE_LIMIT` constant in `polygonscan.py` is set to 4 req/s, which exceeds the actual key limit. +- **Fix needed:** Set `_RATE_LIMIT = 2` (conservative, below the 3/sec cap) to avoid triggering rate errors. + +### 4. Server timeout (run 11720, ~4h) +- **Error:** `Unexpected error, timeout or server too busy. Please try again` +- **Root cause:** The `all_stale` batch queried 812,855 wallets in a single long-running process. After ~4 hours the Etherscan/Polygonscan API returned a server error that wasn't retried gracefully. The collector has no checkpoint/resume — on failure the entire run is lost. +- **Fix needed:** Add a checkpoint mechanism: persist progress (last completed wallet address) to `data_collection_runs.run_metadata` every N wallets, and resume from that address on restart. + +--- + +## Last Enriched Wallets (checkpoint for resume) + +These are the 5 most recently enriched wallets (by `last_refreshed_at`), usable as +resume anchors if the collector is restarted alphabetically or by `first_seen_polymarket_at`: + +| Wallet address | last_refreshed_at | +|---|---| +| `0x3801c747ac8ae7fa77514dd852e81f44376883dd` | 2026-04-27 ~06:18 UTC | +| `0xec8d797a40d5990d00e7468f347c732d4e3b453d` | 2026-04-27 ~06:18 UTC | +| `0x1b73480fbf1bc450991d93f687570ccdf6b545d9` | 2026-04-27 ~06:18 UTC | +| `0xd713f0d2761a77f7834dcbbbc8a25abc319daf79` | 2026-04-27 ~06:18 UTC | +| `0x4f5e6216719c7347caf4dc42cf49013ce4671773` | 2026-04-27 ~06:18 UTC | + +The `_get_stale_wallets` query does **not** order by any resumable key — it returns all +wallets where `first_seen_chain_at IS NULL OR last_refreshed_at < cutoff`. Without a +checkpoint, a resume re-queries all 812,855 wallets from scratch, but `ON CONFLICT DO +UPDATE` on `address` means already-enriched wallets are safely overwritten with the same +data. The 2,449 already-enriched wallets will not be re-queried (they have +`first_seen_chain_at IS NOT NULL` and a recent `last_refreshed_at`). + +--- + +## Blockers Before Next Run + +Two fixes required before the next `--all-stale` run will succeed: + +### Fix A — Rate limit (required) +In `fflow/collectors/polygonscan.py`, change: +```python +_RATE_LIMIT = 4 # req/s ← exceeds actual free-tier cap of 3/sec +``` +to: +```python +_RATE_LIMIT = 2 # req/s ← conservative under 3/sec free-tier cap +``` + +### Fix B — Checkpoint/resume (strongly recommended for full corpus) +At 2 req/s the full 812,855-wallet run would take **113 hours** (~4.7 days). Without +checkpoint/resume, any interruption (network blip, server timeout, process kill) loses +all progress and restarts from scratch. + +Minimal implementation: every 1,000 wallets, write `{"last_address": addr, "n_done": n}` +to `data_collection_runs.run_metadata` for the current run. On restart, read the metadata +and pass `address > last_address` to `_get_stale_wallets`. + +### Fix C — API key upgrade (optional but recommended) +A paid Etherscan/Polygonscan key unlocks 10+ req/s (vs 3/sec free). At 10 req/s the +full corpus would take ~22 hours (manageable in a single overnight run). Cost: ~$10-20/month. + +--- + +## Recommendation for Next Run + +Priority order: +1. Apply Fix A (rate limit) — 5 min change, prevents immediate failure. +2. Apply Fix B (checkpoint) — prevents 4-hour loss on next server timeout. +3. Run `fflow collect polygonscan --all-stale` in a tmux/screen session or as a background service. +4. Monitor via `SELECT COUNT(*) FROM wallets WHERE first_seen_chain_at IS NOT NULL` — should increment steadily. + +At 2 req/s without Fix B, expect the run to take 4+ days. With Fix B + checkpoint, interruptions become recoverable. + +--- + +## Why This Matters for Task 03 + +The polygonscan `funding_sources` field identifies wallet provenance — whether a trader +funded from a CEX, another wallet cluster, or fresh on-chain. This is a secondary signal +for insider-trading detection (well-funded wallets with pre-news positions are stronger +candidates than retail wallets). The 2,449 currently enriched wallets are a small fraction +of the relevant population. Full enrichment is not a blocker for Task 03 ILS methodology, +but is needed for the full wallet-level analysis in Task 04+. diff --git a/reports/TASK_02H_FFIC_DIAGNOSTICS.md b/reports/TASK_02H_FFIC_DIAGNOSTICS.md new file mode 100644 index 0000000..7bdb6f5 --- /dev/null +++ b/reports/TASK_02H_FFIC_DIAGNOSTICS.md @@ -0,0 +1,105 @@ +# Task 02h — FFIC Trade-History Diagnostics + +**Generated:** 2026-04-27 +**Branch:** task02h/ffic-trade-backfill + +Per-market diagnosis of missing trade history for all 24 FFIC markets. + +--- + +## Diagnostic Table + +| Case | Label | Market ID | Vol ($) | Trades in DB | n_runs | Last status | n_written | Diagnosis | Recommendation | +|---|---|---|---|---|---|---|---|---|---| +| fficd-001 | Trump wins | `0xdd22472e552920…` | 1,531,479,285 | 0 | 4 | failed | 0 | ran_indexer_failed | try_rpc_direct | +| fficd-001 | Harris wins | `0xc6485bb7ea46d7…` | 1,037,039,118 | 0 | 1 | failed | 0 | ran_indexer_failed | try_rpc_direct | +| fficd-001 | Other Republican wins | `0x55c551896c10a7…` | 241,655,100 | 0 | 1 | failed | 0 | ran_indexer_failed | try_rpc_direct | +| fficd-001 | Michelle Obama wins | `0x230144e34a84df…` | 153,382,276 | 0 | 1 | failed | 0 | ran_indexer_failed | try_rpc_direct | +| fficd-002 | Iran strike today | `0xc1b6d7128a66a7…` | 148,732 | 309 | 1 | success | 309 | ok | ok | +| fficd-002 | Another strike by Friday | `0x9372742055caba…` | 100,479 | 607 | 1 | success | 607 | ok | ok | +| fficd-002 | Iran strike by Nov 8 | `0xc83128531d31cc…` | 788,895 | 1,929 | 1 | success | 1929 | ok | ok | +| fficd-003 | US forces enter Iran by Apr 30 | `0x6d0e09d0f04572…` | 269,049,107 | 0 | 2 | running | — | ran_indexer_failed | try_rpc_direct | +| fficd-003 | US-Iran ceasefire by Apr 7 | `0x4c5701bcde0b8f…` | 173,696,184 | 0 | 1 | failed | 0 | ran_indexer_failed | try_rpc_direct | +| fficd-003 | Khamenei out by Feb 28 | `0xd4bbf7f6707c67…` | 131,114,971 | 0 | 0 | — | — | never_run | rerun_subgraph | +| fficd-003 | Israel-Hezbollah ceasefire by Apr 18 | `0x9823d715687a0a…` | 98,599,882 | 0 | 0 | — | — | never_run | rerun_subgraph | +| fficd-003 | US strikes Iran by Feb 28 | `0x3488f31e6449f9…` | 89,652,867 | 0 | 0 | — | — | never_run | rerun_subgraph | +| fficd-003 | Khamenei out by Mar 31 | `0x70909f0ba8256a…` | 63,238,698 | 0 | 0 | — | — | never_run | rerun_subgraph | +| fficd-004 | Maduro in US custody by Jan 31 | `0xbfa45527ec959a…` | 11,034,070 | 0 | 0 | — | — | never_run | rerun_subgraph | +| fficd-004 | US-Venezuela military by Dec 31 | `0x62b0cd598091a1…` | 51,073,021 | 0 | 0 | — | — | never_run | rerun_subgraph | +| fficd-004 | US invades Venezuela by Jan 31 | `0x7f3c6b9029a1a4…` | 8,368,551 | 0 | 0 | — | — | never_run | rerun_subgraph | +| fficd-005 | Bitcoin ETF approved by Jan 15 | `0xb36886bb0cf7ce…` | 12,622,418 | 0 | 0 | — | — | never_run | rerun_subgraph | +| fficd-006 | Gene Hackman #1 Passings | `0x54361608e7307b…` | 2,952,428 | 0 | 0 | — | — | never_run | rerun_subgraph | +| fficd-006 | Ismail Haniyeh #1 Passings | `0x4512635352a1ae…` | 1,591,632 | 358 | 1 | success | 358 | ok | ok | +| fficd-006 | Zendaya #1 Actors | `0x264771233508f8…` | 755,946 | 299 | 1 | success | 299 | ok | ok | +| fficd-007 | Biden pardons SBF | `0xf4078ddd084c89…` | 8,209,071 | 0 | 0 | — | — | never_run | rerun_subgraph | +| fficd-007 | SBF sentenced to 50+ years | `0x2b8608c1c98160…` | 363,283 | 493 | 1 | success | 493 | ok | ok | +| fficd-007 | FTX no payouts in 2024 | `0x02c8326d2a5e3b…` | 952,525 | 2,148 | 1 | success | 2148 | ok | ok | +| fficd-008 | Ciuca wins Romanian election | `0x9872fe47fbf628…` | 326,507,671 | 9,288 | 2 | success | 9288 | ok | ok | + +--- + +## Diagnosis Definitions + +| Diagnosis | Meaning | +|---|---| +| `ok` | ≥ 100 trades in DB, no action needed | +| `never_run` | subgraph collector has never been run for this market | +| `ran_returned_zero` | collector ran successfully but returned 0 trades (low-volume or subgraph gap) | +| `ran_indexer_failed` | collector ran but returned 0 despite high volume — likely The Graph indexer capacity limit | +| `partial` | < 100 trades in DB, re-run needed | +| `not_in_db` | market not present in markets table at all | +| `investigate_further` | ambiguous state requiring manual review | + +## Recommendation Definitions + +| Recommendation | Action | +|---|---| +| `ok` | No action | +| `rerun_subgraph` | `fflow collect subgraph --market --max-pages 200` | +| `try_rpc_direct` | Direct Polygon JSON-RPC or Polygonscan logs endpoint (Phase 3) | +| `check_gamma_collection` | Market not in DB — re-run gamma collector first | +| `investigate_further` | Manual review required | + +--- + +## Summary + +| Diagnosis | Count | +|---|---| +| never_run | 10 | +| ok | 8 | +| ran_indexer_failed | 6 | + +| Recommendation | Count | +|---|---| +| rerun_subgraph | 10 | +| ok | 8 | +| try_rpc_direct | 6 | + +--- + +## Phase 2 Target List (rerun_subgraph) + +| Case | Label | Market ID | Vol ($) | Trades in DB | +|---|---|---|---|---| +| fficd-003 | Khamenei out by Feb 28 | `0xd4bbf7f6707c67beb736135ad32a41f6db41f8ae52d3ac4919650de9eeb94ed8` | 131,114,971 | 0 | +| fficd-003 | Israel-Hezbollah ceasefire by Apr 18 | `0x9823d715687a0a82d2a03731792e83bf58a0409f10def1379e00e4d67a95ba69` | 98,599,882 | 0 | +| fficd-003 | US strikes Iran by Feb 28 | `0x3488f31e6449f9803f99a8b5dd232c7ad883637f1c86e6953305a2ef19c77f20` | 89,652,867 | 0 | +| fficd-003 | Khamenei out by Mar 31 | `0x70909f0ba8256a89c301da58812ae47203df54957a07c7f8b10235e877ad63c2` | 63,238,698 | 0 | +| fficd-004 | Maduro in US custody by Jan 31 | `0xbfa45527ec959aacc36f7c312bd4f328171a7681ef1aeb3a7e34db5fb47d3f1d` | 11,034,070 | 0 | +| fficd-004 | US-Venezuela military by Dec 31 | `0x62b0cd598091a179147acbd4616400f804acfdff6f76f029944b481b37cbd45f` | 51,073,021 | 0 | +| fficd-004 | US invades Venezuela by Jan 31 | `0x7f3c6b9029a1a4a932509c147a2cc0762e1116b7a4568cde472908b29dd4889d` | 8,368,551 | 0 | +| fficd-005 | Bitcoin ETF approved by Jan 15 | `0xb36886bb0cf7cede4fd57fedbbbf80342ec76921d567fa9958275c22e1df04bd` | 12,622,418 | 0 | +| fficd-006 | Gene Hackman #1 Passings | `0x54361608e7307b22f080b6e6eed9f1d698e1fe122f7f6813efa7a2d8f2eb470c` | 2,952,428 | 0 | +| fficd-007 | Biden pardons SBF | `0xf4078ddd084c8979c81f1ac4674d5e846b87a13b7f568bdd402296181e83b4d9` | 8,209,071 | 0 | + +## Phase 3 Target List (try_rpc_direct — Group A) + +| Case | Label | Market ID | Vol ($) | +|---|---|---|---| +| fficd-001 | Trump wins | `0xdd22472e552920b8438158ea7238bfadfa4f736aa4cee91a6b86c39ead110917` | 1,531,479,285 | +| fficd-001 | Harris wins | `0xc6485bb7ea46d7bb89beb9c91e7572ecfc72a6273789496f78bc5e989e4d1638` | 1,037,039,118 | +| fficd-001 | Other Republican wins | `0x55c551896c10a74861f2fd88b4f928694310114704cc74b29b9760d1156cade6` | 241,655,100 | +| fficd-001 | Michelle Obama wins | `0x230144e34a84dfd0ebdc6de7fde37780e28154f6f84dd8880c7f0e58d302d448` | 153,382,276 | +| fficd-003 | US forces enter Iran by Apr 30 | `0x6d0e09d0f04572d9b1adad84703458b0297bc5603b69dccbde93147ee4443246` | 269,049,107 | +| fficd-003 | US-Iran ceasefire by Apr 7 | `0x4c5701bcde0b8fb7d7f48c8e9d20245a6caa58c61a77f981fad98f2bfa0b1bc7` | 173,696,184 | diff --git a/reports/TASK_02H_PHASE2_RESULTS.md b/reports/TASK_02H_PHASE2_RESULTS.md new file mode 100644 index 0000000..d9be638 --- /dev/null +++ b/reports/TASK_02H_PHASE2_RESULTS.md @@ -0,0 +1,80 @@ +# Task 02h Phase 2 — Subgraph Backfill Results + +**Generated:** 2026-04-27 +**Branch:** task02h/ffic-trade-backfill + +Phase 2 ran the subgraph collector for all 10 `never_run` FFIC markets. +All 10 succeeded. Two bugs were discovered and fixed en route. + +--- + +## Results + +| Case | Label | Vol ($) | Trades collected | Status | +|---|---|---|---|---| +| fficd-003 | Khamenei out by Feb 28 | 131,114,971 | **113,472** | ✓ success | +| fficd-003 | Israel-Hezbollah ceasefire by Apr 18 | 98,599,882 | **11,035** | ✓ success | +| fficd-003 | US strikes Iran by Feb 28 | 89,652,867 | **109,072** | ✓ success | +| fficd-003 | Khamenei out by Mar 31 | 63,238,698 | **89,267** | ✓ success | +| fficd-004 | Maduro in US custody by Jan 31 | 11,034,070 | **3,350** | ✓ success | +| fficd-004 | US-Venezuela military by Dec 31 | 51,073,021 | **60,785** | ✓ success | +| fficd-004 | US invades Venezuela by Jan 31 | 8,368,551 | **17,776** | ✓ success | +| fficd-005 | Bitcoin ETF approved by Jan 15 | 12,622,418 | **7,515** | ✓ success | +| fficd-006 | Gene Hackman #1 Passings | 2,952,428 | **453** | ✓ success | +| fficd-007 | Biden pardons SBF | 8,209,071 | **13,787** | ✓ success | + +**Total new trades collected: 446,512** + +All four fficd-003 borderline markets ($63–131M) were handled successfully by the +subgraph — unlike fficd-003's two previously-attempted markets ($174M, $269M) which +failed with the indexer. The $63–131M range is at the upper edge of indexer capacity; +all four returned large trade counts (11K–113K) rather than zero. + +--- + +## Bugs Fixed + +### 1. Wrong subgraph entity — `orderFilleds` vs `enrichedOrderFilleds` + +The `master` branch `subgraph.py` used `orderFilleds` in the GQL query. The configured +subgraph (`81Dm16JjuFSrqz813HysXoUPvzTwE7fsfPk2RTf66nyC`) exposes `enrichedOrderFilleds` +with a different field schema. The fix was already present on `task02d+` branches but not +merged to master. Cherry-picked commit `263faac` onto this branch. + +### 2. PostgreSQL 32,767 parameter limit in wallet upsert + +For markets with > 10,922 unique wallets, the single-batch wallet INSERT exceeded +asyncpg's 32,767 argument limit (3 columns × rows). Fixed by chunking wallet rows at +10,000 per batch (matching the existing 500-row chunk used for trades). + +Affected markets: Khamenei Feb28 (21,425 wallets), US-strikes Iran (20,193), +Khamenei Mar31 (19,494), Venezuela Dec31 (10,113). + +--- + +## Remaining Phase 3 Targets (ran_indexer_failed, unchanged) + +The 6 markets with confirmed indexer failures still have 0 trades: + +| Case | Label | Vol ($) | n_runs | Status | +|---|---|---|---|---| +| fficd-001 | Trump wins | 1,531,479,285 | 4 | indexer_failed | +| fficd-001 | Harris wins | 1,037,039,118 | 1 | indexer_failed | +| fficd-001 | Other Republican wins | 241,655,100 | 1 | indexer_failed | +| fficd-001 | Michelle Obama wins | 153,382,276 | 1 | indexer_failed | +| fficd-003 | US forces enter Iran by Apr 30 | 269,049,107 | 2 | indexer_failed | +| fficd-003 | US-Iran ceasefire by Apr 7 | 173,696,184 | 1 | indexer_failed | + +These require Polygonscan logs API (Option 3B) or direct Polygon JSON-RPC (Option 3A). +The US election markets ($153M–$1.5B) are the primary Phase 3 targets. + +--- + +## Updated FFIC Counts + +| Status | Before Phase 2 | After Phase 2 | +|---|---|---| +| ok (≥ 100 trades in DB) | 8 | **18** | +| never_run → now ok | 0 | +10 | +| ran_indexer_failed | 6 | 6 (unchanged) | +| **Total with trade data** | **8** | **18** | diff --git a/scripts/diagnose_ffic_missing.py b/scripts/diagnose_ffic_missing.py new file mode 100644 index 0000000..98ed1d0 --- /dev/null +++ b/scripts/diagnose_ffic_missing.py @@ -0,0 +1,345 @@ +"""Phase 1: Diagnose trade-history gaps for all 24 FFIC markets. + +For each market: + - Resolve prefix → full market_id + metadata + - Current trades count in DB + - subgraph_trades collector run history (n_runs, last status, last n_written) + - Diagnosis: never_run | ran_returned_zero | ran_indexer_failed | partial | ok + - Recommendation: rerun_subgraph | try_rpc_direct | investigate_further | ok + +Writes reports/TASK_02H_FFIC_DIAGNOSTICS.md. +""" + +import asyncio +import pathlib +from datetime import UTC, datetime + +from sqlalchemy import text + +from fflow.db import AsyncSessionLocal + +FFIC_CASES = [ + { + "case_id": "fficd-001", + "name": "2024 US Presidential Election", + "markets": [ + {"prefix": "0xdd22472e", "label": "Trump wins"}, + {"prefix": "0xc6485bb7", "label": "Harris wins"}, + {"prefix": "0x55c55189", "label": "Other Republican wins"}, + {"prefix": "0x230144e3", "label": "Michelle Obama wins"}, + ], + }, + { + "case_id": "fficd-002", + "name": "October 2024 Iran Strike on Israel", + "markets": [ + {"prefix": "0xc1b6d712", "label": "Iran strike today"}, + {"prefix": "0x93727420", "label": "Another strike by Friday"}, + {"prefix": "0xc8312853", "label": "Iran strike by Nov 8"}, + ], + }, + { + "case_id": "fficd-003", + "name": "2026 US-Iran Military Conflict Cluster", + "markets": [ + {"prefix": "0x6d0e09d0", "label": "US forces enter Iran by Apr 30"}, + {"prefix": "0x4c5701bc", "label": "US-Iran ceasefire by Apr 7"}, + {"prefix": "0xd4bbf7f6", "label": "Khamenei out by Feb 28"}, + {"prefix": "0x9823d715", "label": "Israel-Hezbollah ceasefire by Apr 18"}, + {"prefix": "0x3488f31e", "label": "US strikes Iran by Feb 28"}, + {"prefix": "0x70909f0b", "label": "Khamenei out by Mar 31"}, + ], + }, + { + "case_id": "fficd-004", + "name": "Maduro / Venezuela 2024–2026", + "markets": [ + {"prefix": "0xbfa45527", "label": "Maduro in US custody by Jan 31"}, + {"prefix": "0x62b0cd59", "label": "US-Venezuela military by Dec 31"}, + {"prefix": "0x7f3c6b90", "label": "US invades Venezuela by Jan 31"}, + ], + }, + { + "case_id": "fficd-005", + "name": "Bitcoin ETF SEC Approval Jan 2024", + "markets": [ + {"prefix": "0xb36886bb", "label": "Bitcoin ETF approved by Jan 15"}, + ], + }, + { + "case_id": "fficd-006", + "name": "Google Year in Search 2025", + "markets": [ + {"prefix": "0x54361608", "label": "Gene Hackman #1 Passings"}, + {"prefix": "0x45126353", "label": "Ismail Haniyeh #1 Passings"}, + {"prefix": "0x26477123", "label": "Zendaya #1 Actors"}, + ], + }, + { + "case_id": "fficd-007", + "name": "FTX / SBF Collapse 2022–2024", + "markets": [ + {"prefix": "0xf4078ddd", "label": "Biden pardons SBF"}, + {"prefix": "0x2b8608c1", "label": "SBF sentenced to 50+ years"}, + {"prefix": "0x02c8326d", "label": "FTX no payouts in 2024"}, + ], + }, + { + "case_id": "fficd-008", + "name": "Romanian Presidential Election 2024", + "markets": [ + {"prefix": "0x9872fe47", "label": "Ciuca wins Romanian election"}, + ], + }, +] + + +async def diagnose_market(session, prefix: str, label: str, case_id: str) -> dict: + result = { + "prefix": prefix, + "label": label, + "case_id": case_id, + "market_id": None, + "question": None, + "volume_usdc": None, + "created_at": None, + "closed_at": None, + "resolved_at": None, + "resolution_type": None, + "n_trades_db": 0, + "n_runs": 0, + "last_status": None, + "last_n_written": None, + "all_statuses": [], + "diagnosis": None, + "recommendation": None, + } + + # Resolve prefix → market + row = (await session.execute( + text("SELECT id, question, volume_total_usdc, created_at_chain, " + "end_date, resolved_at, resolution_type " + "FROM markets WHERE id LIKE :p LIMIT 1"), + {"p": prefix + "%"}, + )).mappings().first() + + if row is None: + result["diagnosis"] = "not_in_db" + result["recommendation"] = "check_gamma_collection" + return result + + mid = row["id"] + result["market_id"] = mid + result["question"] = row["question"] + result["volume_usdc"] = float(row["volume_total_usdc"] or 0) + result["created_at"] = row["created_at_chain"] + result["closed_at"] = row["end_date"] + result["resolved_at"] = row["resolved_at"] + result["resolution_type"] = row["resolution_type"] + + # Current trades count + result["n_trades_db"] = (await session.execute( + text("SELECT COUNT(*) FROM trades WHERE market_id = :mid"), {"mid": mid} + )).scalar() or 0 + + # Collector run history + runs = (await session.execute( + text("SELECT status, n_records_written, started_at " + "FROM data_collection_runs " + "WHERE collector = 'subgraph_trades' AND target = :mid " + "ORDER BY started_at DESC"), + {"mid": mid}, + )).mappings().all() + + result["n_runs"] = len(runs) + if runs: + result["last_status"] = runs[0]["status"] + result["last_n_written"] = runs[0]["n_records_written"] + result["all_statuses"] = [r["status"] for r in runs] + + # Diagnose + n = result["n_trades_db"] + nr = result["n_runs"] + + if nr == 0: + if n > 0: + result["diagnosis"] = "partial" # trades exist but no run record + result["recommendation"] = "rerun_subgraph" + else: + result["diagnosis"] = "never_run" + result["recommendation"] = "rerun_subgraph" + else: + last_written = result["last_n_written"] or 0 + any_success = any(s == "success" for s in result["all_statuses"]) + any_failed = any(s == "failed" for s in result["all_statuses"]) + + vol = result["volume_usdc"] or 0 + if n == 0 and any_success and last_written == 0: + if vol >= 1_000_000: + result["diagnosis"] = "ran_indexer_failed" + result["recommendation"] = "try_rpc_direct" + else: + result["diagnosis"] = "ran_returned_zero" + result["recommendation"] = "rerun_subgraph" + elif n == 0 and any_failed: + # Runs failed — high-volume markets likely exceed indexer capacity + if vol >= 50_000_000: + result["diagnosis"] = "ran_indexer_failed" + result["recommendation"] = "try_rpc_direct" + else: + result["diagnosis"] = "ran_indexer_failed" + result["recommendation"] = "rerun_subgraph" + elif n > 0 and n < 100: + result["diagnosis"] = "partial" + result["recommendation"] = "rerun_subgraph" + elif n >= 100: + result["diagnosis"] = "ok" + result["recommendation"] = "ok" + else: + result["diagnosis"] = "investigate_further" + result["recommendation"] = "investigate_further" + + return result + + +async def run(): + async with AsyncSessionLocal() as session: + all_results = [] + for case in FFIC_CASES: + for mkt in case["markets"]: + r = await diagnose_market(session, mkt["prefix"], mkt["label"], case["case_id"]) + r["case_name"] = case["name"] + all_results.append(r) + print( + f" {r['case_id']} {r['label'][:35]:35s}: " + f"trades={r['n_trades_db']:5d} n_runs={r['n_runs']} " + f"vol=${r['volume_usdc']:>15,.0f} " + f"diag={r['diagnosis']}" + ) + + # ── Summary counts ──────────────────────────────────────────────────────── + diag_counts: dict[str, int] = {} + rec_counts: dict[str, int] = {} + for r in all_results: + diag_counts[r["diagnosis"]] = diag_counts.get(r["diagnosis"], 0) + 1 + rec_counts[r["recommendation"]] = rec_counts.get(r["recommendation"], 0) + 1 + + print(f"\nDiagnosis summary: {diag_counts}") + print(f"Recommendation summary: {rec_counts}") + + # ── Write report ────────────────────────────────────────────────────────── + now = datetime.now(UTC).strftime("%Y-%m-%d") + lines = [ + "# Task 02h — FFIC Trade-History Diagnostics", + "", + f"**Generated:** {now} ", + "**Branch:** task02h/ffic-trade-backfill", + "", + "Per-market diagnosis of missing trade history for all 24 FFIC markets.", + "", + "---", + "", + "## Diagnostic Table", + "", + "| Case | Label | Market ID | Vol ($) | Trades in DB | n_runs | Last status | n_written | Diagnosis | Recommendation |", + "|---|---|---|---|---|---|---|---|---|---|", + ] + + for r in all_results: + mid_str = (r["market_id"][:16] + "…") if r["market_id"] else "NOT_IN_DB" + vol_str = f"{r['volume_usdc']:,.0f}" if r["volume_usdc"] is not None else "—" + last_s = r["last_status"] or "—" + last_w = str(r["last_n_written"]) if r["last_n_written"] is not None else "—" + lines.append( + f"| {r['case_id']} | {r['label']} | `{mid_str}` " + f"| {vol_str} | {r['n_trades_db']:,} " + f"| {r['n_runs']} | {last_s} | {last_w} " + f"| {r['diagnosis']} | {r['recommendation']} |" + ) + + lines += [ + "", + "---", + "", + "## Diagnosis Definitions", + "", + "| Diagnosis | Meaning |", + "|---|---|", + "| `ok` | ≥ 100 trades in DB, no action needed |", + "| `never_run` | subgraph collector has never been run for this market |", + "| `ran_returned_zero` | collector ran successfully but returned 0 trades (low-volume or subgraph gap) |", + "| `ran_indexer_failed` | collector ran but returned 0 despite high volume — likely The Graph indexer capacity limit |", + "| `partial` | < 100 trades in DB, re-run needed |", + "| `not_in_db` | market not present in markets table at all |", + "| `investigate_further` | ambiguous state requiring manual review |", + "", + "## Recommendation Definitions", + "", + "| Recommendation | Action |", + "|---|---|", + "| `ok` | No action |", + "| `rerun_subgraph` | `fflow collect subgraph --market --max-pages 200` |", + "| `try_rpc_direct` | Direct Polygon JSON-RPC or Polygonscan logs endpoint (Phase 3) |", + "| `check_gamma_collection` | Market not in DB — re-run gamma collector first |", + "| `investigate_further` | Manual review required |", + "", + "---", + "", + "## Summary", + "", + "| Diagnosis | Count |", + "|---|---|", + ] + for diag, cnt in sorted(diag_counts.items(), key=lambda x: -x[1]): + lines.append(f"| {diag} | {cnt} |") + + lines += [ + "", + "| Recommendation | Count |", + "|---|---|", + ] + for rec, cnt in sorted(rec_counts.items(), key=lambda x: -x[1]): + lines.append(f"| {rec} | {cnt} |") + + # Group A (high-volume, indexer-failed) — flagged for Phase 3 + group_a = [r for r in all_results if r["recommendation"] == "try_rpc_direct"] + group_b = [r for r in all_results if r["recommendation"] == "rerun_subgraph"] + + lines += [ + "", + "---", + "", + "## Phase 2 Target List (rerun_subgraph)", + "", + "| Case | Label | Market ID | Vol ($) | Trades in DB |", + "|---|---|---|---|---|", + ] + for r in group_b: + mid_str = r["market_id"] if r["market_id"] else "NOT_IN_DB" + lines.append( + f"| {r['case_id']} | {r['label']} | `{mid_str}` " + f"| {r['volume_usdc']:,.0f} | {r['n_trades_db']:,} |" + ) + + if group_a: + lines += [ + "", + "## Phase 3 Target List (try_rpc_direct — Group A)", + "", + "| Case | Label | Market ID | Vol ($) |", + "|---|---|---|---|", + ] + for r in group_a: + mid_str = r["market_id"] if r["market_id"] else "NOT_IN_DB" + lines.append( + f"| {r['case_id']} | {r['label']} | `{mid_str}` " + f"| {r['volume_usdc']:,.0f} |" + ) + + pathlib.Path("reports").mkdir(exist_ok=True) + pathlib.Path("reports/TASK_02H_FFIC_DIAGNOSTICS.md").write_text("\n".join(lines) + "\n") + print("\n→ reports/TASK_02H_FFIC_DIAGNOSTICS.md") + + +if __name__ == "__main__": + asyncio.run(run())