Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
f84f42c
chore(scripts): targeted backfill script for 8 documented insider cases
MaksymDS Apr 26, 2026
cd6d878
docs(reports): update Phase 3B progress — 189/10602 markets, 1.4M tra…
MaksymDS Apr 26, 2026
411b6c0
docs(reports): update Phase 3B progress — 351/10602 markets, 2.6M tra…
MaksymDS Apr 26, 2026
68ac02a
docs(reports): update Phase 3B progress — 530/10602 markets, 3.6M tra…
MaksymDS Apr 26, 2026
2fc6a5d
docs(reports): update Phase 3B progress — 737/10602 markets, 4.6M tra…
MaksymDS Apr 26, 2026
58b6638
docs(reports): update Phase 3B progress — 952/10602 markets, 5.6M tra…
MaksymDS Apr 26, 2026
9db00a0
docs(reports): update Phase 3B progress — 1172/10602 markets, 6.4M tr…
MaksymDS Apr 26, 2026
35c093c
docs(reports): update Phase 3B progress — 1398/10602 markets, 7.3M tr…
MaksymDS Apr 26, 2026
fe9a04a
docs(reports): update Phase 3B progress — 1657/10602 markets, 8.1M tr…
MaksymDS Apr 26, 2026
21bca77
docs(reports): update Phase 3B progress — 1997/10602 markets, 9M trad…
MaksymDS Apr 26, 2026
0e92c5c
docs(reports): update Phase 3B progress — 2471/10602 markets, 10M tra…
MaksymDS Apr 26, 2026
aba2a95
docs(reports): update Phase 3B progress — 3060/10602 markets, 11.3M t…
MaksymDS Apr 26, 2026
4024404
docs(reports): update Phase 3B progress — 3749/10602 markets, 12.4M t…
MaksymDS Apr 26, 2026
f2ca085
docs(reports): update Phase 3B progress — 4596/10602 markets, 13.5M t…
MaksymDS Apr 27, 2026
b15e835
docs(reports): update Phase 3B progress — 5553/10602 markets, 14.6M t…
MaksymDS Apr 27, 2026
aefdbb5
docs(reports): update Phase 3B progress — 6746/10602 markets, 15.7M t…
MaksymDS Apr 27, 2026
7040a6b
docs(reports): update Phase 3B progress — 8148/10602 markets, 16.6M t…
MaksymDS Apr 27, 2026
c783a79
docs(reports): Phase 3B near-complete — 9786/10602 markets, 17.5M tra…
MaksymDS Apr 27, 2026
a1fb427
feat: fixture generator script + Phase 3B final results
MaksymDS Apr 27, 2026
f17807e
fix: polygonscan collector migrated to Etherscan API V2
MaksymDS Apr 27, 2026
ad7c21a
feat: polygonscan --min-trades filter + fixture progress logging
MaksymDS Apr 27, 2026
064c3d9
diagnose: CLOB coverage diagnostic + TASK_02C contradiction resolved
MaksymDS Apr 27, 2026
3f2f76d
feat(phase1): trade-based price reconstruction + price_source in pipe…
MaksymDS Apr 27, 2026
42f7db4
feat(phase2): UMA collector — fix subgraph auth, add RPC eth_getLogs …
MaksymDS Apr 27, 2026
878f0aa
feat(phase3): news tier1-batch + seed-proxy CLI commands
MaksymDS Apr 27, 2026
b0037c0
fix(pipeline): handle t_open/t_news edge cases + NUMERIC overflow
MaksymDS Apr 27, 2026
90cf881
docs(phase4): ILS results report for 24 FFICD validation markets
MaksymDS Apr 27, 2026
a8c4132
chore: fixture script rewrite + phase0 fixture + charter v0.3
MaksymDS Apr 27, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,5 @@ dist/
.ruff_cache/
*.log
.DS_Store
.claude/
memory/
391 changes: 391 additions & 0 deletions CHARTER_v0.3.md

Large diffs are not rendered by default.

26 changes: 26 additions & 0 deletions alembic/versions/0003_price_source.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
"""Add price_source column to market_labels

Revision ID: 0003
Revises: 0002
Create Date: 2026-04-27
"""
from typing import Sequence, Union

import sqlalchemy as sa
from alembic import op

revision: str = "0003"
down_revision: Union[str, None] = "0002"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Add the nullable ``price_source`` VARCHAR(20) column to ``market_labels``."""
    # Nullable, so rows that already exist need no backfill when the column appears.
    price_source = sa.Column("price_source", sa.String(20), nullable=True)
    op.add_column("market_labels", price_source)


def downgrade() -> None:
    """Revert revision 0003 by dropping ``price_source`` from ``market_labels``."""
    op.drop_column(table_name="market_labels", column_name="price_source")
33 changes: 33 additions & 0 deletions alembic/versions/0004_fix_pre_news_max_jump.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
"""Fix pre_news_max_jump precision — USDC amount, not price

Revision ID: 0004
Revises: 0003
Create Date: 2026-04-27
"""
from typing import Sequence, Union

import sqlalchemy as sa
from alembic import op

revision: str = "0004"
down_revision: Union[str, None] = "0003"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Widen ``market_labels.pre_news_max_jump`` to NUMERIC(20, 6).

    Per this revision's header the column stores a USDC amount rather than a
    price, so it needs more integer digits than NUMERIC(8, 6) provides.
    """
    widened_type = sa.Numeric(20, 6)
    op.alter_column(
        table_name="market_labels",
        column_name="pre_news_max_jump",
        type_=widened_type,
        existing_nullable=True,
    )


def downgrade() -> None:
    """Restore ``market_labels.pre_news_max_jump`` to its NUMERIC(8, 6) type.

    NOTE(review): NUMERIC(8, 6) leaves only two integer digits, so rows written
    after the upgrade with larger values would presumably make this ALTER fail;
    confirm before running a downgrade against populated data.
    """
    narrow_type = sa.Numeric(8, 6)
    op.alter_column(
        table_name="market_labels",
        column_name="pre_news_max_jump",
        type_=narrow_type,
        existing_nullable=True,
    )
50 changes: 50 additions & 0 deletions data/fixture_phase0.jsonl

Large diffs are not rendered by default.

153 changes: 153 additions & 0 deletions fflow/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ def collect_polygonscan(
wallet: Annotated[Optional[str], typer.Option(help="Wallet address (0x...)")] = None,
all_stale: Annotated[bool, typer.Option("--all-stale")] = False,
max_age_days: Annotated[int, typer.Option(help="Staleness threshold in days")] = 30,
min_trades: Annotated[int, typer.Option(help="Only refresh wallets with at least N trades")] = 0,
dry_run: Annotated[bool, typer.Option("--dry-run")] = False,
) -> None:
"""Fetch on-chain wallet data from Polygonscan."""
Expand All @@ -313,6 +314,7 @@ def collect_polygonscan(
wallet=wallet,
all_stale=all_stale,
max_age_days=max_age_days,
min_trades=min_trades,
dry_run=dry_run,
)
)
Expand Down Expand Up @@ -598,6 +600,157 @@ async def _run() -> None:
asyncio.run(_run())


@news_app.command("tier1-batch")
def news_tier1_batch(
    limit: Annotated[int, typer.Option(help="Max markets to process")] = 500,
    dry_run: Annotated[bool, typer.Option("--dry-run")] = False,
) -> None:
    """Batch Tier 1: extract T_news from resolution_evidence_url for all eligible markets.

    A market is eligible when it has a non-null ``resolution_evidence_url`` and
    no ``NewsTimestamp`` row with ``tier == 1`` yet. At most ``limit`` markets
    are processed per invocation.

    For each market the proposer URL is fetched:

    * a fetch that raises is logged and counted as ``fail``;
    * a fetch that returns ``None`` is counted as ``skip``;
    * anything else is upserted into ``NewsTimestamp`` (keyed on ``market_id``)
      and counted as ``ok``.

    With ``--dry-run`` the fetch still happens and the market is still counted
    as ``ok``, but nothing is written to the database.
    """
    from fflow.db import AsyncSessionLocal
    from fflow.models import Market, NewsTimestamp
    from fflow.news.proposer_url import fetch_proposer_timestamp
    from sqlalchemy import select
    from sqlalchemy.dialects.postgresql import insert as pg_insert

    async def _run() -> None:
        # Load the work list up front; the rows are plain (id, url) tuples, so
        # they remain usable after this session is closed.
        async with AsyncSessionLocal() as session:
            already_done_sq = select(NewsTimestamp.market_id).where(NewsTimestamp.tier == 1)
            stmt = (
                select(Market.id, Market.resolution_evidence_url)
                .where(Market.resolution_evidence_url.isnot(None))
                .where(Market.id.notin_(already_done_sq))
                .limit(limit)
            )
            rows = (await session.execute(stmt)).all()

        typer.echo(f"tier1-batch: {len(rows)} markets to process")
        ok = skip = fail = 0
        for market_id, url in rows:
            try:
                result = await fetch_proposer_timestamp(url)
            except Exception as exc:
                # Best-effort batch: one bad URL must not abort the whole run.
                fail += 1
                log.warning("tier1_batch_error", market=market_id, error=str(exc))
                continue

            if result is None:
                skip += 1
                continue

            if not dry_run:
                fields = {
                    "t_news": result.t_news,
                    "tier": 1,
                    "source_url": result.source_url,
                    "confidence": result.confidence,
                    "recovered_at": datetime.now(UTC),
                }
                # On conflict the existing row (possibly a lower-tier entry such
                # as a seed-proxy record) is replaced wholesale. Refresh
                # ``recovered_at`` and clear ``notes`` too, otherwise the
                # upgraded row keeps the old recovery time and a stale
                # "proxy:..." annotation.
                # NOTE(review): assumes ``notes`` is nullable — confirm against
                # the NewsTimestamp model.
                upsert = (
                    pg_insert(NewsTimestamp)
                    .values(market_id=market_id, **fields)
                    .on_conflict_do_update(
                        index_elements=["market_id"],
                        set_={**fields, "notes": None},
                    )
                )
                # One short-lived session and commit per market, so progress
                # already written survives a later failure.
                async with AsyncSessionLocal() as session:
                    await session.execute(upsert)
                    await session.commit()

            ok += 1
            if ok % 50 == 0:
                typer.echo(f"  progress: ok={ok} skip={skip} fail={fail}")

        typer.echo(f"tier1-batch done: ok={ok} skip={skip} fail={fail}")

    asyncio.run(_run())


@news_app.command("seed-proxy")
def news_seed_proxy(
    market_ids: Annotated[Optional[str], typer.Option(help="Comma-separated market IDs")] = None,
    category: Annotated[Optional[str], typer.Option(help="Seed all markets in this category_fflow")] = None,
    offset_days: Annotated[int, typer.Option(help="Days before end_date for proxy T_news")] = 1,
    dry_run: Annotated[bool, typer.Option("--dry-run")] = False,
) -> None:
    """Seed synthetic T_news proxy from end_date - offset_days (tier=2, confidence=0.50).

    Used for markets resolved by Polymarket admin (no UMA evidence URL) where
    the outcome was publicly knowable close to end_date.

    Targets come from ``--market-ids`` when given, otherwise from every market
    in ``--category`` that has both ``end_date`` and ``resolved_at`` set. If
    neither option is supplied the command exits with status 1. Markets that
    cannot be loaded, or that have no ``end_date``, are counted as ``skip``.

    With ``--dry-run`` the computed timestamps are printed and nothing is
    written; otherwise all upserts are committed together at the end.
    """
    from fflow.db import AsyncSessionLocal
    from fflow.models import Market, NewsTimestamp
    from sqlalchemy import select
    from sqlalchemy.dialects.postgresql import insert as pg_insert

    async def _run() -> None:
        if market_ids:
            ids = [m.strip() for m in market_ids.split(",") if m.strip()]
        elif category:
            async with AsyncSessionLocal() as session:
                rows = (
                    await session.execute(
                        select(Market.id).where(Market.category_fflow == category)
                        .where(Market.end_date.isnot(None))
                        .where(Market.resolved_at.isnot(None))
                    )
                ).scalars().all()
            ids = list(rows)
        else:
            typer.echo("Provide --market-ids or --category", err=True)
            raise typer.Exit(1)

        typer.echo(f"seed-proxy: {len(ids)} markets, offset={offset_days}d")
        ok = skip = 0
        async with AsyncSessionLocal() as session:
            for mid in ids:
                mkt = await session.get(Market, mid)
                if mkt is None or mkt.end_date is None:
                    skip += 1
                    continue

                t_news = mkt.end_date - timedelta(days=offset_days)
                notes = f"proxy:end_date-{offset_days}d"

                if dry_run:
                    typer.echo(f"  [dry-run] {mid[:12]}… t_news={t_news.isoformat()}")
                else:
                    fields = {
                        "t_news": t_news,
                        "tier": 2,
                        # A proxy has no evidence URL. This must also be applied
                        # on conflict, or a row that used to hold a tier-1 URL
                        # would be relabelled as a proxy while still pointing
                        # at that URL.
                        "source_url": None,
                        "confidence": 0.50,
                        "notes": notes,
                        "recovered_at": datetime.now(UTC),
                    }
                    # NOTE(review): this replaces any existing row for the
                    # market regardless of its tier — confirm that overwriting
                    # tier-1 timestamps with a proxy is intended.
                    upsert = (
                        pg_insert(NewsTimestamp)
                        .values(market_id=mid, **fields)
                        .on_conflict_do_update(
                            index_elements=["market_id"],
                            set_=fields,
                        )
                    )
                    await session.execute(upsert)
                ok += 1

            if not dry_run:
                # Single commit: the whole batch is written together.
                await session.commit()

        typer.echo(f"seed-proxy done: ok={ok} skip={skip}")

    asyncio.run(_run())


# ---------------------------------------------------------------------------
# score commands
# ---------------------------------------------------------------------------
Expand Down
3 changes: 3 additions & 0 deletions fflow/collectors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ def __init__(self, base_url: str = "", headers: dict | None = None) -> None:
async def get(self, url: str, **kwargs) -> httpx.Response:
    """Issue a GET for ``url`` via ``_request``, forwarding all keyword arguments."""
    response = await self._request("GET", url, **kwargs)
    return response

async def post(self, url: str, **kwargs) -> httpx.Response:
    """Issue a POST for ``url`` via ``_request``, forwarding all keyword arguments."""
    response = await self._request("POST", url, **kwargs)
    return response

async def _request(self, method: str, url: str, **kwargs) -> httpx.Response:
last_exc: Exception | None = None
for attempt in range(settings.http_max_retries + 1):
Expand Down
44 changes: 34 additions & 10 deletions fflow/collectors/polygonscan.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ async def run(
wallet: str | None = None,
all_stale: bool = False,
max_age_days: int = 30,
min_trades: int = 0,
dry_run: bool = False,
) -> CollectorResult:
addr = (wallet or target or "").lower()
Expand All @@ -44,14 +45,17 @@ async def run(
run_id = await self._record_run_start(session, result)
try:
if all_stale:
wallets = await self._get_stale_wallets(session, max_age_days)
wallets = await self._get_stale_wallets(session, max_age_days, min_trades)
else:
wallets = [addr] if addr else []

total = 0
for w_addr in wallets:
n_wallets = len(wallets)
for i, w_addr in enumerate(wallets, 1):
n = await self._process_wallet(session, w_addr, dry_run)
total += n
if i % 100 == 0:
log.info("polygonscan_batch_progress", done=i, total=n_wallets, written=total)

result.n_written = total
result.status = "success"
Expand All @@ -65,16 +69,36 @@ async def run(
await self._record_run_end(session, run_id, result)
return result

async def _get_stale_wallets(self, session, max_age_days: int) -> list[str]:
async def _get_stale_wallets(self, session, max_age_days: int, min_trades: int = 0) -> list[str]:
from datetime import timedelta
from sqlalchemy import text as sa_text
cutoff = datetime.now(UTC) - timedelta(days=max_age_days)
rows = await session.execute(
select(Wallet.address).where(
(Wallet.last_refreshed_at < cutoff)
| Wallet.first_seen_chain_at.is_(None)
if min_trades > 0:
rows = await session.execute(
sa_text("""
SELECT w.address
FROM wallets w
JOIN (
SELECT taker_address, COUNT(*) AS tc
FROM trades
GROUP BY taker_address
) t ON t.taker_address = w.address
WHERE (w.last_refreshed_at < :cutoff OR w.first_seen_chain_at IS NULL)
AND t.tc >= :min_trades
ORDER BY t.tc DESC
"""),
{"cutoff": cutoff, "min_trades": min_trades},
)
)
return [r[0] for r in rows.all()]
else:
rows = await session.execute(
select(Wallet.address).where(
(Wallet.last_refreshed_at < cutoff)
| Wallet.first_seen_chain_at.is_(None)
)
)
result = [r[0] for r in rows.all()]
log.info("polygonscan_wallets_selected", count=len(result), min_trades=min_trades)
return result

async def _rate_limit(self) -> None:
async with self._token_bucket_lock:
Expand All @@ -87,7 +111,7 @@ async def _rate_limit(self) -> None:

async def _get(self, client: RetryableHTTPClient, params: dict) -> dict:
await self._rate_limit()
resp = await client.get(settings.polygonscan_url, params=params)
resp = await client.get(settings.polygonscan_url, params={"chainid": 137, **params})
resp.raise_for_status()
data = resp.json()
if data.get("status") == "0" and data.get("message") != "No transactions found":
Expand Down
Loading
Loading