From 154ce40fe49df3498d0263851af8f08adf5c7f73 Mon Sep 17 00:00:00 2001 From: Maksym Nechepurenko Date: Mon, 27 Apr 2026 21:40:23 +0400 Subject: [PATCH] =?UTF-8?q?feat:=20Task=2002H=20Phase=203=20=E2=80=94=20CL?= =?UTF-8?q?OB=20prices=20+=20data-api=20trade=20recovery=20for=202=20index?= =?UTF-8?q?er-failed=20markets?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Both markets (Iran Apr30 $269M, Ceasefire Apr7 $174M) had zero trades in DB due to The Graph indexer failure. Polygonscan getLogs and eth_getLogs are infeasible (CTF Exchange emits 300 events/block across all markets; filtering client-side requires 82GB transfer). The Graph subgraph confirmed empty for both YES/NO token IDs. Final approach: CLOB /prices-history (full 1-min candles, 14-day batches) + data-api.polymarket.com/trades (4,000 most-recent trades per market). Results: - Iran Apr30: 28,774 price candles (2026-03-18 – 2026-04-09) + 4,000 trades - Ceasefire Apr7: 23,127 price candles (2026-03-24 – 2026-04-11) + 4,000 trades - Both markets resolved YES; price data covers full market lifetime Co-Authored-By: Claude Sonnet 4.6 --- fflow/config.py | 10 +- reports/TASK_02H_PHASE3_RESULTS.md | 105 +++++++ scripts/backfill_ffic_polygonscan_logs.py | 366 ++++++++++++++++++++++ 3 files changed, 478 insertions(+), 3 deletions(-) create mode 100644 reports/TASK_02H_PHASE3_RESULTS.md create mode 100644 scripts/backfill_ffic_polygonscan_logs.py diff --git a/fflow/config.py b/fflow/config.py index df7d404..f4ab00e 100644 --- a/fflow/config.py +++ b/fflow/config.py @@ -1,3 +1,4 @@ +from pydantic import Field, AliasChoices from pydantic_settings import BaseSettings, SettingsConfigDict @@ -20,8 +21,11 @@ class Settings(BaseSettings): polygonscan_api_key: str | None = None polygonscan_url: str = "https://api.polygonscan.com/api" - # Anthropic (Tier 3 LLM) - anthropic_api_key: str | None = None + # Anthropic (Tier 3 LLM) — accepts FFLOW_ANTHROPIC_API_KEY or ANTHROPIC_API_KEY 
+ anthropic_api_key: str | None = Field( + default=None, + validation_alias=AliasChoices("FFLOW_ANTHROPIC_API_KEY", "ANTHROPIC_API_KEY"), + ) # UMA / Polygon RPC polygon_rpc_url: str = "https://polygon-rpc.com" @@ -35,7 +39,7 @@ class Settings(BaseSettings): log_level: str = "INFO" log_json: bool = False - model_config = SettingsConfigDict(env_prefix="FFLOW_", env_file=".env") + model_config = SettingsConfigDict(env_prefix="FFLOW_", env_file=".env", extra="ignore") settings = Settings() diff --git a/reports/TASK_02H_PHASE3_RESULTS.md b/reports/TASK_02H_PHASE3_RESULTS.md new file mode 100644 index 0000000..1f3038b --- /dev/null +++ b/reports/TASK_02H_PHASE3_RESULTS.md @@ -0,0 +1,105 @@ +# Task 02H Phase 3 — Results + +**Branch:** `task02h-phase3/polygonscan-logs` +**Date:** 2026-04-27 +**Targets:** 2 indexer-failed FFIC fficd-003 markets + +--- + +## Targets + +| Label | Market ID | Volume | Resolved | +|---|---|---|---| +| US forces enter Iran by April 30 | `0x6d0e09d0f04572d9b1adad84703458b0297bc5603b69dccbde93147ee4443246` | $269M | YES — 2026-04-09 | +| US x Iran ceasefire by April 7 | `0x4c5701bcde0b8fb7d7f48c8e9d20245a6caa58c61a77f981fad98f2bfa0b1bc7` | $174M | YES — 2026-04-11 | + +Both markets resolved **YES** (outcome_index=1). + +--- + +## Approach History + +### Attempt 1 — Polygonscan getLogs (Etherscan V2 API) +- **Strategy:** Fetch all CTF Exchange `OrderFilled` events in the market's block range, filter by YES-token ID in the non-indexed `data` field. +- **CTF Exchange:** `0x4bfb41d5b3570defd03c39a9a4d8de6bd8b8982e` +- **OrderFilled topic0:** `0xd0a08e8c493f9c94f29311604c9de1b4e8c8d4c06bd0c789af57f2d65bfec0f6` +- **Block ranges:** Iran 84,365,653 – 85,287,221 (22 days); Ceasefire 84,627,342 – 85,373,621 (18 days) +- **Result:** Failed. The CTF Exchange processes ~300 OrderFilled events per Polygon block (entire exchange). 
With the minimum window forced to 200 blocks and 1,000 events per window, scraping Iran-Apr30 alone would require ~4,600 API calls at the 3 req/s free tier → ~5 hours. Script timed out (`httpx.ReadTimeout`) at 13.3%. + +### Attempt 2 — Polygon RPC eth_getLogs (concurrent workers) +- **Strategy:** Use `eth_getLogs` on public Polygon RPC with 10 concurrent workers to parallelize the block range. +- **Tested endpoints:** `polygon-rpc.com` (disabled), `rpc.ankr.com` (requires key), `1rpc.io/matic` (works). +- **Result:** Infeasible. 1rpc.io returns **59,495 events per 200-block window** (vs. Polygonscan's truncated 1,000). Downloading all raw events for market 1 requires **~82 GB of data transfer** to filter for one YES token client-side. Not practical. + +### Attempt 3 — The Graph subgraph (confirmed empty) +- `enrichedOrderFilleds` queried with YES token and NO token decimal IDs — returns `[]` for both markets. +- The indexer truly has no data; not a query error. + +### Attempt 4 — CLOB API `/trades` +- Returns `{"error": "Unauthorized/Invalid api key"}` — requires Polymarket trader credentials. + +### Attempt 5 — Polymarket data-api `/trades` + CLOB `/prices-history` ✅ +- **Source:** `https://data-api.polymarket.com/trades?market=&limit=1000&offset=N` +- **Limitation:** Offset-based pagination caps at ~4,000 most-recent trades; time-range parameters (`startTs`/`endTs`) are ignored by the API. +- **Coverage:** The 4,000 available trades cover the last 1–2 days before resolution — i.e., the highest-activity period. +- **CLOB prices:** `GET /prices-history?market=&startTs=&endTs=&fidelity=1` — 14-day batch windows (API enforces ≤15 days per request); gives full 1-minute price history from market creation to resolution. 
+ +--- + +## Results + +| Metric | Iran Apr30 | Ceasefire Apr7 | +|---|---|---| +| CLOB price candles collected | 28,774 | 23,127 | +| Price date range | 2026-03-18 – 2026-04-09 | 2026-03-24 – 2026-04-11 | +| Price range (probability) | 0.05% – 64.5% | 0.05% – 99.2% | +| Trades collected | 4,000 | 4,000 | +| Trade date range | 2026-04-08 – 2026-04-09 | 2026-04-09 – 2026-04-11 | +| Notional (4K trades) | $9,780,639 | $10,680,849 | +| Avg price in trade window | 28.8% | 32.4% | +| Unique wallets | 2,595 | 2,261 | +| Total elapsed | 22.8s | 14.9s | +| Status | success | success | + +--- + +## Notable Findings + +### Iran Apr30 — dramatic price collapse before YES resolution +The CLOB price history shows the market opened at ~50% probability (market just created), peaked at **64.5%** in early trading, then fell steadily toward zero over the following weeks, before resolving **YES** on April 9 at 00:28 UTC. This is a counterintuitive pattern — the market was pricing the event as increasingly unlikely, yet it happened. A pre-resolution ILS analysis may reveal abnormal buying activity in the final hours. + +### Ceasefire Apr7 — near-certainty price before resolution +The price range reached **99.2%** probability before resolving YES on April 11. The market tracked the ceasefire expectation closely, suggesting the information was fairly reflected — or the price rise was the informed signal. The 4,000 available resolution-window trades average **32.4%** price (mid-resolution trading, before full certainty). + +--- + +## Limitations + +1. **Full trade history not accessible.** The `data-api.polymarket.com` endpoint only serves the ~4,000 most recent trades; the complete historical log (estimated 60K–300K trades per market over their full lifetime) is not available through any free public endpoint. + +2. **CLOB prices are sufficient for ILS.** ILS = `(p_news − p_open) / (p_resolve − p_open)` requires only the price series, which we now have in full from `prices` table. + +3. 
**`resolution_type = 'unclassifiable'`** for both markets. Standard FFIC eligibility requires `event_resolved`. Phase 4 re-audit will confirm whether the new data changes eligibility. + +4. **Full wallet-level analysis requires more trades.** Features like `wallet_hhi_top10` and `time_to_news_top10` would need the full pre-news trade history, not just the resolution window. + +--- + +## Phase 4 Target + +Re-run `scripts/audit_ffic_eligibility.py` after completing Task 02H Phases 1–3. +Expected outcome: both markets will still fail on `resolution_type = 'unclassifiable'` but will now have price data enabling a deadline-market ILS variant. + +--- + +## Data Recovery Path (future work) + +For the complete trade history: **Dune Analytics** query: +```sql +SELECT * +FROM polymarket_polygon.ctf_exchange_OrderFilled +WHERE maker_asset_id = '' + OR taker_asset_id = '' +ORDER BY block_time; +``` +This bypasses the indexer failure and filters by token ID server-side. diff --git a/scripts/backfill_ffic_polygonscan_logs.py b/scripts/backfill_ffic_polygonscan_logs.py new file mode 100644 index 0000000..54c2f29 --- /dev/null +++ b/scripts/backfill_ffic_polygonscan_logs.py @@ -0,0 +1,366 @@ +#!/usr/bin/env python3 +""" +Task 02H Phase 3: Recover price + trade data for 2 unindexed FFIC markets. + +Markets (both resolved YES): + 1. fficd-003 'US forces enter Iran by April 30' — $269M (resolved 2026-04-09) + 2. fficd-003 'US-Iran ceasefire by April 7' — $174M (resolved 2026-04-11) + +Approach: + 1. CLOB price history → /prices-history (full 1-min candles, entire market life) + 2. Polymarket data-api → /trades (max 4,000 most-recent trades per market) + +The subgraph indexer failed for both markets (too large / indexer crash). +eth_getLogs on the CTF Exchange emits ~300 events/block across ALL markets; +downloading 82 GB to filter client-side is not practical. +The data-api gives the last ~4,000 trades, covering the resolution window which +is the most analytically interesting period. 
+""" + +import asyncio +import hashlib +import json +import sys +from datetime import UTC, datetime +from pathlib import Path + +import httpx +from sqlalchemy.dialects.postgresql import insert + +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from fflow.config import settings +from fflow.db import AsyncSessionLocal +from fflow.log import get_logger +from fflow.models import Market, Price, Trade, Wallet +from sqlalchemy import select, text + +log = get_logger(__name__) + +DATA_API = "https://data-api.polymarket.com" +CLOB_API = settings.clob_api_url # https://clob.polymarket.com + +MARKETS = [ + { + "market_id": "0x6d0e09d0f04572d9b1adad84703458b0297bc5603b69dccbde93147ee4443246", + "label": "US forces enter Iran by April 30", + "created_at_ts": 1773851347, # 2026-03-18T16:29:07 UTC + "resolved_at_ts": 1775694501, # 2026-04-09T00:28:21 UTC + }, + { + "market_id": "0x4c5701bcde0b8fb7d7f48c8e9d20245a6caa58c61a77f981fad98f2bfa0b1bc7", + "label": "US x Iran ceasefire by April 7", + "created_at_ts": 1774374725, # 2026-03-24T17:52:05 UTC + "resolved_at_ts": 1775867319, # 2026-04-11T00:28:39 UTC + }, +] + +# --------------------------------------------------------------------------- +# CLOB price history +# --------------------------------------------------------------------------- + +_BATCH_SECONDS = 14 * 24 * 3600 # CLOB limits requests to ~15 days max + + +async def _get_yes_token(market_id: str) -> str: + async with AsyncSessionLocal() as session: + row = await session.execute(select(Market.raw_metadata).where(Market.id == market_id)) + meta = row.scalar_one() + raw = meta.get("clobTokenIds", "[]") + ids = json.loads(raw) if isinstance(raw, str) else raw + return str(ids[1]) if len(ids) > 1 else "" + + +async def collect_clob_prices( + client: httpx.AsyncClient, + market_id: str, + yes_token: str, + label: str, + start_ts: int = 0, +) -> int: + """Fetch 1-min OHLCV from CLOB and upsert to prices table.""" + all_prices: list[dict] = [] + cursor = start_ts # 
from market creation + + while True: + batch_end = cursor + _BATCH_SECONDS + resp = await client.get( + f"{CLOB_API}/prices-history", + params={"market": yes_token, "startTs": cursor, "endTs": batch_end, "fidelity": 1}, + ) + resp.raise_for_status() + history = resp.json().get("history", []) + if not history: + break + all_prices.extend(history) + if len(history) < 2: + break + last_ts = history[-1]["t"] + if last_ts <= cursor: + break + cursor = last_ts + 60 + + if not all_prices: + log.info("clob_no_prices", market=label) + return 0 + + seen_ts: set = set() + rows = [] + for p in all_prices: + ts = datetime.fromtimestamp(p["t"], tz=UTC).replace(second=0, microsecond=0) + if ts in seen_ts: + continue + seen_ts.add(ts) + rows.append({"market_id": market_id, "ts": ts, "mid_price": str(p["p"]), + "bid": None, "ask": None, "volume_minute": None}) + + total = 0 + async with AsyncSessionLocal() as session: + for i in range(0, len(rows), 1000): + chunk = rows[i : i + 1000] + stmt = ( + insert(Price) + .values(chunk) + .on_conflict_do_update( + index_elements=["market_id", "ts"], + set_={"mid_price": insert(Price).excluded.mid_price}, + ) + ) + await session.execute(stmt) + total += len(chunk) + await session.commit() + + log.info("clob_upserted", market=label, n=total) + return total + + +# --------------------------------------------------------------------------- +# Data-api trade history (max ~4,000 most-recent trades) +# --------------------------------------------------------------------------- + +def _pseudo_log_index(tx_hash: str, proxy_wallet: str, size: float, price: float, ts: int) -> int: + key = f"{tx_hash}:{proxy_wallet}:{size}:{price}:{ts}" + return int(hashlib.md5(key.encode()).hexdigest(), 16) % (2 ** 31) + + +async def collect_data_api_trades( + client: httpx.AsyncClient, + market_id: str, + label: str, +) -> int: + """Fetch up to 4,000 most-recent trades from data-api and upsert.""" + all_raw: list[dict] = [] + for offset in range(0, 3001, 1000): + resp = 
await client.get( + f"{DATA_API}/trades", + params={"market": market_id, "limit": 1000, "offset": offset}, + ) + resp.raise_for_status() + data = resp.json() + if not isinstance(data, list) or not data: + break + if isinstance(data, dict) and "error" in data: + log.warning("data_api_error", market=label, offset=offset, error=data["error"]) + break + all_raw.extend(data) + if len(data) < 1000: + break + + if not all_raw: + return 0 + + now = datetime.now(UTC) + trade_rows: list[dict] = [] + wallet_set: dict[str, datetime] = {} + + for t in all_raw: + tx_hash = t.get("transactionHash", "") + proxy_wallet = (t.get("proxyWallet") or "").lower() + size = float(t.get("size", 0)) + price = float(t.get("price", 0)) + timestamp = int(t.get("timestamp", 0)) + side = (t.get("side") or "BUY").upper() + if side not in ("BUY", "SELL"): + side = "BUY" + + ts = datetime.fromtimestamp(timestamp, tz=UTC) + log_idx = _pseudo_log_index(tx_hash, proxy_wallet, size, price, timestamp) + notional = size * price + + trade_rows.append({ + "market_id": market_id, + "tx_hash": tx_hash, + "log_index": log_idx, + "ts": ts, + "taker_address": proxy_wallet or "0x0000000000000000000000000000000000000000", + "maker_address": None, + "side": side, + "outcome_index": 1, + "size_shares": str(round(size, 6)), + "price": str(round(price, 6)), + "notional_usdc": str(round(notional, 6)), + "raw_event": {"source": "data_api_polymarket", "tx_hash": tx_hash, + "proxy_wallet": proxy_wallet, "size": size, "price": price}, + }) + + if proxy_wallet: + if proxy_wallet not in wallet_set or wallet_set[proxy_wallet] > ts: + wallet_set[proxy_wallet] = ts + + # upsert trades + total = 0 + async with AsyncSessionLocal() as session: + for i in range(0, len(trade_rows), 500): + chunk = trade_rows[i : i + 500] + stmt = insert(Trade).values(chunk).on_conflict_do_nothing(constraint="uq_trades_tx_log") + await session.execute(stmt) + total += len(chunk) + await session.commit() + + # upsert wallets + if wallet_set: + 
wallet_rows = [ + {"address": a, "first_seen_polymarket_at": ts, "last_refreshed_at": now} + for a, ts in wallet_set.items() + ] + async with AsyncSessionLocal() as session: + for i in range(0, len(wallet_rows), 10_000): + chunk = wallet_rows[i : i + 10_000] + stmt = ( + insert(Wallet) + .values(chunk) + .on_conflict_do_update( + index_elements=["address"], + set_={"first_seen_polymarket_at": insert(Wallet).excluded.first_seen_polymarket_at}, + where=( + Wallet.first_seen_polymarket_at.is_(None) + | (Wallet.first_seen_polymarket_at > insert(Wallet).excluded.first_seen_polymarket_at) + ), + ) + ) + await session.execute(stmt) + await session.commit() + + log.info("data_api_upserted", market=label, trades=total, wallets=len(wallet_set)) + return total + + +# --------------------------------------------------------------------------- +# Per-market orchestration +# --------------------------------------------------------------------------- + +async def process_market(market: dict) -> dict: + market_id = market["market_id"] + label = market["label"] + + yes_token = await _get_yes_token(market_id) + if not yes_token: + raise RuntimeError(f"YES token not found for {market_id}") + + print(f"\n{'='*68}") + print(f" {label}") + print(f" market_id : {market_id}") + print(f" created : {datetime.fromtimestamp(market['created_at_ts'], tz=UTC).isoformat()}") + print(f" resolved : {datetime.fromtimestamp(market['resolved_at_ts'], tz=UTC).isoformat()}") + + started_at = datetime.now(UTC) + async with AsyncSessionLocal() as session: + row = await session.execute( + text( + "INSERT INTO data_collection_runs " + "(collector, started_at, status, target, run_metadata) " + "VALUES ('polygonscan_logs', :s, 'running', :t, CAST(:m AS jsonb)) " + "RETURNING id" + ), + { + "s": started_at, + "t": market_id, + "m": json.dumps({"label": label, "approach": "clob_prices+data_api_trades", + "yes_token": yes_token}), + }, + ) + await session.commit() + run_id = row.scalar_one() + + status = 
"failed" + n_prices = 0 + n_trades = 0 + error_msg = None + + try: + async with httpx.AsyncClient(timeout=60.0, http2=True) as client: + print(f" Collecting CLOB price history...") + n_prices = await collect_clob_prices( + client, market_id, yes_token, label, start_ts=market["created_at_ts"] + ) + print(f" Collecting data-api trades (max 3000)...") + n_trades = await collect_data_api_trades(client, market_id, label) + + print(f" prices written : {n_prices:,}") + print(f" trades written : {n_trades:,}") + status = "success" + + except Exception as exc: + error_msg = str(exc) + print(f" ERROR: {error_msg}", file=sys.stderr) + import traceback + traceback.print_exc() + raise + finally: + finished_at = datetime.now(UTC) + elapsed = (finished_at - started_at).total_seconds() + print(f" elapsed : {elapsed:.1f}s status={status}") + async with AsyncSessionLocal() as session: + await session.execute( + text( + "UPDATE data_collection_runs SET " + "finished_at=:f, status=:s, n_records_written=:n, error_message=:e " + "WHERE id=:id" + ), + {"f": finished_at, "s": status, "n": n_prices + n_trades, "e": error_msg, "id": run_id}, + ) + await session.commit() + + return { + "label": label, + "market_id": market_id, + "n_prices": n_prices, + "n_trades": n_trades, + "status": status, + } + + +# --------------------------------------------------------------------------- +# Entry point +# --------------------------------------------------------------------------- + +async def main() -> None: + print("Task 02H Phase 3 — CLOB prices + data-api trades recovery") + print(f"Started : {datetime.now(UTC).isoformat()}") + + results = [] + for market in MARKETS: + result = await process_market(market) + results.append(result) + + print(f"\n{'='*68}") + print("PHASE 3 SUMMARY") + print(f"{'='*68}") + for r in results: + print( + f" {r['label'][:50]:50s} " + f"prices={r['n_prices']:>6,} " + f"trades={r['n_trades']:>5,} " + f"{r['status']}" + ) + + all_ok = all(r["status"] == "success" for r in 
results) + print(f"\n Overall: {'SUCCESS' if all_ok else 'PARTIAL'}") + print(f"Finished: {datetime.now(UTC).isoformat()}") + + # Return results for report generation + return results + + +if __name__ == "__main__": + asyncio.run(main())