140 changes: 77 additions & 63 deletions fflow/collectors/subgraph.py
@@ -16,9 +16,12 @@

_PAGE_SIZE = 1000

# API note (verified 2026-04-26): The Polymarket subgraph exposes `enrichedOrderFilleds`
# (not `orderFilleds`). market.id is the YES token decimal ID. size is raw int (divide by
# 1e6 for shares). price is already a 0-1 decimal. side is "Buy"/"Sell" relative to the token.
_TRADES_QUERY = gql("""
query Trades($market: String!, $lastId: String!, $first: Int!) {
orderFilleds(
enrichedOrderFilleds(
where: { market: $market, id_gt: $lastId }
first: $first
orderBy: id
@@ -27,16 +30,13 @@
id
timestamp
transactionHash
maker
taker
makerAssetId
takerAssetId
makerAmountFilled
takerAmountFilled
fee
market {
id
}
orderHash
maker { id }
taker { id }
market { id }
side
size
price
}
}
""")
@@ -91,30 +91,48 @@ def _make_client(self) -> Client:
headers = {"Accept": "application/json"}
if settings.thegraph_api_key:
headers["Authorization"] = f"Bearer {settings.thegraph_api_key}"
transport = HTTPXAsyncTransport(url=settings.subgraph_url, headers=headers)
return Client(transport=transport, fetch_schema_from_transport=False)
transport = HTTPXAsyncTransport(
url=settings.subgraph_url,
headers=headers,
timeout=60.0,
)
return Client(transport=transport, fetch_schema_from_transport=False, execute_timeout=60)

async def _fetch_trades(
self,
market_id: str,
yes_token: str,
from_ts: datetime | None,
) -> list[dict]:
import asyncio as _asyncio
from gql.transport.exceptions import TransportQueryError

from_unix = int(from_ts.timestamp()) if from_ts else 0
all_trades: list[dict] = []
last_id = ""

async with self._make_client() as client:
while True:
result = await client.execute(
_TRADES_QUERY,
variable_values={
"market": market_id.lower(),
"lastId": last_id,
"first": _PAGE_SIZE,
},
)
page = result.get("orderFilleds", [])
for attempt in range(3):
try:
result = await client.execute(
_TRADES_QUERY,
variable_values={
"market": yes_token,
"lastId": last_id,
"first": _PAGE_SIZE,
},
)
break
except Exception as exc:  # covers TransportConnectionFailed and any other transient failure
if isinstance(exc, TransportQueryError) and "bad indexers" in str(exc):
raise # indexer unavailable — no point retrying
if attempt == 2:
raise
await _asyncio.sleep(2 ** attempt)

page = result.get("enrichedOrderFilleds", [])
if not page:
break

@@ -142,31 +160,20 @@ async def _upsert_trades(

for t in raw_trades:
tx = t.get("transactionHash", "")
# log_index from id (format: txHash-logIndex or sequential id)
raw_id = t.get("id", "")
log_idx = _parse_log_index(raw_id)
ts = datetime.fromtimestamp(int(t["timestamp"]), tz=UTC)
taker = (t.get("taker") or "").lower()
maker = (t.get("maker") or "").lower() or None

taker_asset = t.get("takerAssetId", "")
maker_amount = int(t.get("makerAmountFilled", 0))
taker_amount = int(t.get("takerAmountFilled", 0))

# taker receives YES token → BUY; taker gives YES token → SELL
if str(taker_asset) == yes_token:
side = "BUY"
outcome_index = 1
size_shares = taker_amount
usdc_paid = maker_amount
else:
side = "SELL"
outcome_index = 1
size_shares = maker_amount
usdc_paid = taker_amount

price_val = (usdc_paid / size_shares / 1e6) if size_shares else 0
notional = usdc_paid / 1e6
taker = ((t.get("taker") or {}).get("id") or "").lower()
maker = ((t.get("maker") or {}).get("id") or "").lower() or None

raw_side = t.get("side", "Buy")
side = "BUY" if raw_side.lower() == "buy" else "SELL"
outcome_index = 1 # all enrichedOrderFilleds filtered by YES token

raw_size = int(t.get("size", 0))
size_shares = raw_size / 1e6
price_val = float(t.get("price", 0))
notional = size_shares * price_val

trade_rows.append({
"market_id": market_id,
@@ -177,7 +184,7 @@
"maker_address": maker,
"side": side,
"outcome_index": outcome_index,
"size_shares": str(size_shares / 1e6),
"size_shares": str(round(size_shares, 6)),
"price": str(round(price_val, 6)),
"notional_usdc": str(round(notional, 6)),
"raw_event": t,
@@ -202,7 +209,8 @@ async def _upsert_trades(
total += len(chunk)
await session.commit()

# upsert wallets
# upsert wallets — chunked (PostgreSQL param limit: 32767; 3 cols × 10000 = 30000)
wallet_chunk_size = 10_000
if wallet_set:
wallet_rows = [
{
Expand All @@ -212,31 +220,37 @@ async def _upsert_trades(
}
for addr, ts in wallet_set.items()
]
wallet_stmt = (
insert(Wallet)
.values(wallet_rows)
.on_conflict_do_update(
index_elements=["address"],
set_={
"first_seen_polymarket_at": insert(Wallet).excluded.first_seen_polymarket_at,
},
where=(
Wallet.first_seen_polymarket_at.is_(None)
| (
Wallet.first_seen_polymarket_at
> insert(Wallet).excluded.first_seen_polymarket_at
)
),
for i in range(0, len(wallet_rows), wallet_chunk_size):
chunk = wallet_rows[i : i + wallet_chunk_size]
wallet_stmt = (
insert(Wallet)
.values(chunk)
.on_conflict_do_update(
index_elements=["address"],
set_={
"first_seen_polymarket_at": insert(Wallet).excluded.first_seen_polymarket_at,
},
where=(
Wallet.first_seen_polymarket_at.is_(None)
| (
Wallet.first_seen_polymarket_at
> insert(Wallet).excluded.first_seen_polymarket_at
)
),
)
)
)
await session.execute(wallet_stmt)
await session.execute(wallet_stmt)
await session.commit()

log.info("subgraph_upserted", market=market_id, trades=total, wallets=len(wallet_set))
return total


def _parse_log_index(raw_id: str) -> int:
# enrichedOrderFilleds id format: txHash_orderHash — derive a stable index from the orderHash part
if "_" in raw_id:
order_hash_part = raw_id.split("_", 1)[-1]
# built-in hash() is salted per process; a hashlib digest keeps log_index reproducible across runs (assumes `import hashlib` at module top)
return int(order_hash_part, 16) % 2**31 if order_hash_part.startswith("0x") else int(hashlib.sha256(order_hash_part.encode()).hexdigest(), 16) % 2**31
if "-" in raw_id:
try:
return int(raw_id.split("-")[-1])
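The API note at the top of the diff pins down the new `enrichedOrderFilleds` field semantics. Here is a small worked example of the normalization the rewritten `_upsert_trades` applies; the record and its values are hypothetical, shaped like what the query above returns:

```python
from datetime import UTC, datetime

# Hypothetical record, illustrative values only.
record = {
    "id": "0xTXHASH_0xORDERHASH",   # txHash_orderHash
    "timestamp": "1745712000",
    "maker": {"id": "0xMAKERADDR"},
    "taker": {"id": "0xTAKERADDR"},
    "side": "Buy",                   # relative to the queried (YES) token
    "size": "2500000",               # raw integer; divide by 1e6 for shares
    "price": "0.42",                 # already a 0-1 decimal
}

size_shares = int(record["size"]) / 1e6                      # 2.5 shares
price = float(record["price"])                               # 0.42
notional = size_shares * price                               # 1.05 USDC
side = "BUY" if record["side"].lower() == "buy" else "SELL"  # "BUY"
ts = datetime.fromtimestamp(int(record["timestamp"]), tz=UTC)

assert (size_shares, price, round(notional, 6), side) == (2.5, 0.42, 1.05, "BUY")
```

Contrast with the deleted code path: the old `orderFilleds` entity forced the collector to infer side from `makerAssetId`/`takerAssetId` and derive price from the two raw fill amounts; the enriched entity ships both precomputed.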
10 changes: 7 additions & 3 deletions fflow/config.py
@@ -1,3 +1,4 @@
from pydantic import Field, AliasChoices
from pydantic_settings import BaseSettings, SettingsConfigDict


@@ -20,8 +21,11 @@ class Settings(BaseSettings):
polygonscan_api_key: str | None = None
polygonscan_url: str = "https://api.polygonscan.com/api"

# Anthropic (Tier 3 LLM)
anthropic_api_key: str | None = None
# Anthropic (Tier 3 LLM) — accepts FFLOW_ANTHROPIC_API_KEY or ANTHROPIC_API_KEY
anthropic_api_key: str | None = Field(
default=None,
validation_alias=AliasChoices("FFLOW_ANTHROPIC_API_KEY", "ANTHROPIC_API_KEY"),
)

# UMA / Polygon RPC
polygon_rpc_url: str = "https://polygon-rpc.com"
@@ -35,7 +39,7 @@ class Settings(BaseSettings):
log_level: str = "INFO"
log_json: bool = False

model_config = SettingsConfigDict(env_prefix="FFLOW_", env_file=".env")
model_config = SettingsConfigDict(env_prefix="FFLOW_", env_file=".env", extra="ignore")


settings = Settings()
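Since `validation_alias` replaces the normal `env_prefix` lookup for that field, the `FFLOW_`-prefixed name must be listed explicitly, and the `AliasChoices` order decides precedence. A minimal sketch of the resulting behavior; `DemoSettings` and the env values are illustrative, only the field definition mirrors the diff:

```python
import os

from pydantic import AliasChoices, Field
from pydantic_settings import BaseSettings, SettingsConfigDict


class DemoSettings(BaseSettings):
    # Same pattern as Settings.anthropic_api_key in the diff above.
    anthropic_api_key: str | None = Field(
        default=None,
        validation_alias=AliasChoices("FFLOW_ANTHROPIC_API_KEY", "ANTHROPIC_API_KEY"),
    )
    model_config = SettingsConfigDict(env_prefix="FFLOW_", extra="ignore")


os.environ["ANTHROPIC_API_KEY"] = "sk-plain"
assert DemoSettings().anthropic_api_key == "sk-plain"

# The first listed alias wins when both are set.
os.environ["FFLOW_ANTHROPIC_API_KEY"] = "sk-prefixed"
assert DemoSettings().anthropic_api_key == "sk-prefixed"
```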
134 changes: 134 additions & 0 deletions reports/POLYGONSCAN_COLLECTION_STATUS.md
@@ -0,0 +1,134 @@
# Polygonscan Collection — Status Report

**Last updated:** 2026-04-27
**Branch:** task02h/ffic-trade-backfill

---

## Current State

| Metric | Value |
|---|---|
| Total wallets in DB | 815,304 |
| Enriched (`first_seen_chain_at` set) | 2,449 (0.3%) |
| With `funding_sources` data | 4,706 (0.6%) |
| Attempted but returned empty | 810,598 (99.4%) |
| **Stale wallets remaining** | **812,855** |
| Estimated time to complete at 4 req/s | **56.4 hours** |

**Progress snapshot:** 2,449 wallets with confirmed on-chain data. The collection
has barely started relative to the full corpus (< 1% enriched).

---

## Run History

| Run ID | Date | Duration | Status | n_written | Error |
|---|---|---|---|---|---|
| 11714 | 2026-04-27 05:28 | 2s | failed | 0 | Deprecated V1 endpoint |
| 11715 | 2026-04-27 05:29 | 1s | failed | 0 | Deprecated V1 endpoint |
| 11716 | 2026-04-27 05:29 | 34s | failed | 0 | DNS resolution failure |
| 11717 | 2026-04-27 06:15 | — | ~~running~~ → failed | 0 | Process died without cleanup *(closed manually)* |
| 11718 | 2026-04-27 06:23 | 78s | failed | 0 | Rate limit: Max 3 req/s |
| 11719 | 2026-04-27 06:24 | — | ~~running~~ → failed | 0 | Process died without cleanup *(closed manually)* |
| 11720 | 2026-04-27 06:25 | 14,352s (~4h) | failed | 0 | Server timeout / too busy |

**All 7 runs failed.** Two stale `running` records (ids 11717, 11719) were left open by
crashed processes — closed manually on 2026-04-27.

---

## Failure Taxonomy

### 1. Deprecated V1 endpoint (runs 11714–11715, 3s)
- **Error:** `NOTOK | You are using a deprecated V1 endpoint, switch to Etherscan V2`
- **Root cause:** `polygonscan_url` in config was `https://api.polygonscan.com/api` — deprecated.
- **Fix already applied:** config.py now uses `https://api.etherscan.io/v2/api` (the V2 unified endpoint). This fix is on `task02d+` branches but was re-applied on `task02h` via config.py update.

### 2. DNS failure (run 11716, 34s)
- **Error:** `[Errno 8] nodename nor servname provided, or not known`
- **Root cause:** Transient network failure — no code issue.

### 3. Rate limit exceeded (run 11718, 78s)
- **Error:** `Max calls per sec rate limit reached (3/sec)`
- **Root cause:** The `.env` key is a free-tier key capped at **3 req/s**. The `_RATE_LIMIT` constant in `polygonscan.py` is set to 4 req/s, which exceeds the key's actual cap.
- **Fix needed:** Set `_RATE_LIMIT = 2` (conservative, below the 3/sec cap) to avoid triggering rate errors.

### 4. Server timeout (run 11720, ~4h)
- **Error:** `Unexpected error, timeout or server too busy. Please try again`
- **Root cause:** The `all_stale` batch queried 812,855 wallets in a single long-running process. After ~4 hours the Etherscan/Polygonscan API returned a server error that wasn't retried gracefully. The collector has no checkpoint/resume — on failure the entire run is lost.
- **Fix needed:** Add a checkpoint mechanism: persist progress (last completed wallet address) to `data_collection_runs.run_metadata` every N wallets, and resume from that address on restart.

---

## Last Enriched Wallets (checkpoint for resume)

These are the 5 most recently enriched wallets (by `last_refreshed_at`), usable as
resume anchors if the collector is restarted with a deterministic ordering (alphabetical
by address, or by `first_seen_polymarket_at`):

| Wallet address | last_refreshed_at |
|---|---|
| `0x3801c747ac8ae7fa77514dd852e81f44376883dd` | 2026-04-27 ~06:18 UTC |
| `0xec8d797a40d5990d00e7468f347c732d4e3b453d` | 2026-04-27 ~06:18 UTC |
| `0x1b73480fbf1bc450991d93f687570ccdf6b545d9` | 2026-04-27 ~06:18 UTC |
| `0xd713f0d2761a77f7834dcbbbc8a25abc319daf79` | 2026-04-27 ~06:18 UTC |
| `0x4f5e6216719c7347caf4dc42cf49013ce4671773` | 2026-04-27 ~06:18 UTC |

The `_get_stale_wallets` query does **not** order by any resumable key — it returns all
wallets where `first_seen_chain_at IS NULL OR last_refreshed_at < cutoff`. Without a
checkpoint, a restart therefore re-traverses all 812,855 stale wallets from scratch. That
is safe (`ON CONFLICT DO UPDATE` on `address` simply rewrites rows with the same data) but
burns days of API budget. The 2,449 already-enriched wallets are excluded by the staleness
filter itself (`first_seen_chain_at IS NOT NULL` with a recent `last_refreshed_at`), so
they are not re-queried.

---

## Blockers Before Next Run

Two fixes required before the next `--all-stale` run will succeed:

### Fix A — Rate limit (required)
In `fflow/collectors/polygonscan.py`, change:
```python
_RATE_LIMIT = 4 # req/s ← exceeds actual free-tier cap of 3/sec
```
to:
```python
_RATE_LIMIT = 2 # req/s ← conservative under 3/sec free-tier cap
```

### Fix B — Checkpoint/resume (strongly recommended for full corpus)
At 2 req/s the full 812,855-wallet run would take **113 hours** (~4.7 days). Without
checkpoint/resume, any interruption (network blip, server timeout, process kill) loses
all progress and restarts from scratch.

Minimal implementation: every 1,000 wallets, write `{"last_address": addr, "n_done": n}`
to `data_collection_runs.run_metadata` for the current run. On restart, read the metadata
and pass `address > last_address` to `_get_stale_wallets`.
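A hedged sketch of that shape, assuming the async SQLAlchemy session used elsewhere in fflow; the function signature and the per-wallet `enrich_wallet` helper are invented for illustration:

```python
CHECKPOINT_EVERY = 1_000

async def enrich_all_stale(session, run, stale_wallets) -> None:
    """Enrich stale wallets, checkpointing progress so a crash is resumable."""
    meta = run.run_metadata or {}
    last_address = meta.get("last_address", "")
    n_done = meta.get("n_done", 0)
    for wallet in stale_wallets:  # must be ordered by address, ascending
        if wallet.address <= last_address:
            continue  # covered by a previous (interrupted) run
        await enrich_wallet(session, wallet)  # hypothetical per-wallet fetch + upsert
        n_done += 1
        if n_done % CHECKPOINT_EVERY == 0:
            run.run_metadata = {"last_address": wallet.address, "n_done": n_done}
            await session.commit()  # checkpoint lands in the same commit as the upserts
```

For the `address > last_address` resume filter to mean anything, `_get_stale_wallets` must also gain an `ORDER BY address`, per the note above about the missing resumable key.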

### Fix C — API key upgrade (optional but recommended)
A paid Etherscan/Polygonscan key unlocks 10+ req/s (vs 3/sec free). At 10 req/s the
full corpus would take ~22 hours (manageable in a single overnight run). Cost: ~$10-20/month.
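All the duration figures in this report follow from the same arithmetic, assuming one API call per stale wallet and no retries:

```python
STALE_WALLETS = 812_855

for rate in (2, 3, 4, 10):  # requests per second
    hours = STALE_WALLETS / rate / 3600
    print(f"{rate:>2} req/s -> {hours:5.1f} h ({hours / 24:.1f} days)")
# 2 req/s -> 112.9 h (4.7 days)
# 4 req/s ->  56.4 h (2.4 days)
# 10 req/s -> 22.6 h (0.9 days)
```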

---

## Recommendation for Next Run

Priority order:
1. Apply Fix A (rate limit) — 5 min change, prevents immediate failure.
2. Apply Fix B (checkpoint) — prevents 4-hour loss on next server timeout.
3. Run `fflow collect polygonscan --all-stale` in a tmux/screen session or as a background service.
4. Monitor via `SELECT COUNT(*) FROM wallets WHERE first_seen_chain_at IS NOT NULL` — should increment steadily (a polling sketch follows below).

At 2 req/s without Fix B, expect the run to take 4+ days. With Fix B + checkpoint, interruptions become recoverable.
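For step 4, a small polling loop can stand in for re-running the SQL by hand. This assumes an async SQLAlchemy `session_factory` (an `async_sessionmaker`), which is an assumption about the codebase, not a documented interface:

```python
import asyncio

from sqlalchemy import text

async def watch_progress(session_factory, interval_s: int = 60) -> None:
    """Print the enriched-wallet count every interval_s seconds."""
    while True:
        async with session_factory() as session:
            n = await session.scalar(
                text("SELECT COUNT(*) FROM wallets WHERE first_seen_chain_at IS NOT NULL")
            )
        print(f"enriched wallets: {n}")
        await asyncio.sleep(interval_s)
```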

---

## Why This Matters for Task 03

The polygonscan `funding_sources` field identifies wallet provenance — whether a trader
funded from a CEX, another wallet cluster, or fresh on-chain. This is a secondary signal
for insider-trading detection (well-funded wallets with pre-news positions are stronger
candidates than retail wallets). The 2,449 currently enriched wallets are a small fraction
of the relevant population. Full enrichment is not a blocker for Task 03 ILS methodology,
but is needed for the full wallet-level analysis in Task 04+.