Skip to content
140 changes: 127 additions & 13 deletions fflow/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,28 @@ def collect_gamma(
categories: Annotated[
Optional[str], typer.Option(help="Comma-separated Polymarket tag names")
] = None,
closed: Annotated[bool, typer.Option("--closed", help="Fetch historical resolved markets")] = False,
end_date_min: Annotated[Optional[str], typer.Option(help="YYYY-MM-DD min end date (with --closed)")] = None,
end_date_max: Annotated[Optional[str], typer.Option(help="YYYY-MM-DD max end date (with --closed)")] = None,
dry_run: Annotated[bool, typer.Option("--dry-run")] = False,
) -> None:
"""Fetch market metadata from Polymarket Gamma API."""
"""Fetch market metadata from Polymarket Gamma API.

Normal mode: fetches active + recently closed markets ordered by createdAt.
Historical mode (--closed): fetches resolved markets by end_date range.
"""
from fflow.collectors.gamma import GammaCollector

since_dt = _parse_date(since)
cats = [c.strip() for c in categories.split(",")] if categories else []
result = asyncio.run(GammaCollector().run(since=since_dt, categories=cats, dry_run=dry_run))
result = asyncio.run(GammaCollector().run(
since=since_dt,
categories=cats,
closed=closed,
end_date_min=end_date_min,
end_date_max=end_date_max,
dry_run=dry_run,
))
typer.echo(f"gamma: {result.status}, n={result.n_written}")
if result.error:
typer.echo(f"error: {result.error}", err=True)
Expand Down Expand Up @@ -89,25 +103,125 @@ def collect_clob(

@collect_app.command("subgraph")
def collect_subgraph(
    market: Annotated[Optional[str], typer.Option(help="Market condition ID (0x...)")] = None,
    from_ts: Annotated[Optional[str], typer.Option(help="ISO datetime")] = None,
    all_resolved: Annotated[bool, typer.Option("--all-resolved", help="Run for all resolved markets")] = False,
    min_volume: Annotated[float, typer.Option(help="Min volume_total_usdc filter (batch only)")] = 50000.0,
    max_volume: Annotated[Optional[float], typer.Option(help="Max volume_total_usdc filter (batch only)")] = None,
    limit: Annotated[Optional[int], typer.Option(help="Max markets to process in batch mode")] = None,
    categories: Annotated[Optional[str], typer.Option(help="Comma-separated category_fflow filter (batch only)")] = None,
    dry_run: Annotated[bool, typer.Option("--dry-run")] = False,
) -> None:
    """Fetch full trade log from Polymarket subgraph."""
    from fflow.collectors.subgraph import SubgraphCollector

    # Exactly one targeting mode must be chosen: a single market or a batch sweep.
    if not market and not all_resolved:
        typer.echo("Provide --market or --all-resolved", err=True)
        raise typer.Exit(1)

    if all_resolved:
        # Batch mode: sweep all resolved markets matching the volume/category filters.
        category_filter = [name.strip() for name in categories.split(",")] if categories else None
        asyncio.run(
            _subgraph_batch(
                min_volume=min_volume,
                max_volume=max_volume,
                limit=limit,
                categories=category_filter,
                dry_run=dry_run,
            )
        )
        return

    # Single-market mode: run the collector once and report its outcome.
    outcome = asyncio.run(
        SubgraphCollector().run(
            market_id=market,
            from_ts=_parse_dt(from_ts),
            dry_run=dry_run,
        )
    )
    typer.echo(f"subgraph: {outcome.status}, n={outcome.n_written}")
    if outcome.error:
        typer.echo(f"error: {outcome.error}", err=True)
        raise typer.Exit(1)


async def _subgraph_batch(
    min_volume: float,
    dry_run: bool,
    max_volume: float | None = None,
    limit: int | None = None,
    categories: list[str] | None = None,
) -> None:
    """Run SubgraphCollector over every resolved market matching the filters.

    Markets are read from the local DB ordered by volume (highest first); each
    one is collected in turn, and one JSON line per market is appended to
    logs/batch_progress.jsonl so an interrupted sweep can be audited.

    Args:
        min_volume: Only markets with volume_total_usdc >= this are included.
        dry_run: Forwarded to the collector; also disables the "already has
            trades" idempotency check, since a dry run writes nothing.
        max_volume: Optional upper bound on volume_total_usdc.
        limit: Optional cap on how many markets to process.
        categories: Optional category_fflow whitelist.
    """
    # NOTE: the previously present `import json` was unused here (json is only
    # needed by _write_progress, which imports it itself) and has been removed.
    import pathlib

    from fflow.collectors.subgraph import SubgraphCollector
    from fflow.db import AsyncSessionLocal
    from fflow.models import Market, Trade
    from sqlalchemy import select, func

    async with AsyncSessionLocal() as session:
        stmt = (
            select(Market.id, Market.volume_total_usdc, Market.resolved_at)
            .where(Market.resolved_at.isnot(None))
            .where(Market.volume_total_usdc >= min_volume)
            .order_by(Market.volume_total_usdc.desc())
        )
        # Compare against None (not truthiness) so an explicit --max-volume 0
        # or --limit 0 is honored instead of being silently ignored.
        if max_volume is not None:
            stmt = stmt.where(Market.volume_total_usdc <= max_volume)
        if categories:
            stmt = stmt.where(Market.category_fflow.in_(categories))
        if limit is not None:
            stmt = stmt.limit(limit)
        rows = (await session.execute(stmt)).all()

    total = len(rows)
    log.info("subgraph_batch_start", total=total, min_volume=min_volume, max_volume=max_volume, limit=limit, categories=categories)
    if max_volume is not None:
        typer.echo(f"subgraph batch: {total} markets vol=[${min_volume:,.0f}, ${max_volume:,.0f}]")
    else:
        typer.echo(f"subgraph batch: {total} markets vol>=${min_volume:,.0f}" + (f" (limit={limit})" if limit else "") + (f" categories={categories}" if categories else ""))

    progress_path = pathlib.Path("logs/batch_progress.jsonl")
    progress_path.parent.mkdir(parents=True, exist_ok=True)

    collector = SubgraphCollector()
    # Markets resolved more than a day ago should already be fully collected;
    # anything newer may still be accumulating trades and is always refetched.
    stale_cutoff = datetime.now(UTC) - timedelta(days=1)
    ok = fail = skipped = already_done = 0

    for mid, vol, resolved_at in rows:
        # Idempotency: skip markets resolved >1 day ago that already have trades
        resolved_is_old = resolved_at and resolved_at < stale_cutoff
        if resolved_is_old and not dry_run:
            async with AsyncSessionLocal() as session:
                existing = await session.scalar(
                    select(func.count()).select_from(Trade).where(Trade.market_id == mid)
                )
            if existing and existing > 0:
                already_done += 1
                log.info("subgraph_batch_skip", market=mid, existing_trades=existing)
                _write_progress(progress_path, mid, "already_done", existing, float(vol or 0))
                continue

        try:
            r = await collector.run(market_id=mid, dry_run=dry_run)
            # status alone doesn't distinguish "worked" from "no trades found";
            # count them separately for the final summary line.
            if r.n_written and r.n_written > 0:
                ok += 1
            else:
                skipped += 1
            log.info("subgraph_batch_market", market=mid, vol=float(vol or 0),
                     status=r.status, n=r.n_written)
            _write_progress(progress_path, mid, r.status, r.n_written or 0, float(vol or 0))
        except Exception as exc:
            # One broken market must not abort the whole sweep; record and move on.
            fail += 1
            log.error("subgraph_batch_error", market=mid, error=str(exc))
            _write_progress(progress_path, mid, "failed", 0, float(vol or 0))

    typer.echo(f"subgraph batch done: ok={ok} skipped(0 trades)={skipped} already_done={already_done} fail={fail}")


def _write_progress(path: "pathlib.Path", market_id: str, status: str, trades: int, vol: float) -> None:
import json, pathlib
entry = {
"market_id": market_id,
"status": status,
"trades_count": trades,
"volume": vol,
"timestamp": datetime.now(UTC).isoformat(),
}
with open(path, "a") as f:
f.write(json.dumps(entry) + "\n")


# ---------------------------------------------------------------------------
# collect uma
Expand Down
91 changes: 89 additions & 2 deletions fflow/collectors/gamma.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@
Category information is derived from the event title (events[0].title) and
the filter tag used during collection, stored together in category_raw.
clobTokenIds is a JSON-encoded string and must be parsed with json.loads().

Historical backfill mode (--closed):
Use ?closed=true&end_date_min=YYYY-MM-DD&end_date_max=YYYY-MM-DD to sweep
through historical resolved markets month-by-month.
outcomePrices: ["1","0"] = YES won (outcome=1); ["0","1"] = NO won (outcome=0).
"""

import json
Expand All @@ -30,13 +35,19 @@ async def run(
target: str | None = None,
since: datetime | None = None,
categories: list[str] | None = None,
closed: bool = False,
end_date_min: str | None = None,
end_date_max: str | None = None,
dry_run: bool = False,
) -> CollectorResult:
result = self._start_result(target)
async with AsyncSessionLocal() as session:
run_id = await self._record_run_start(session, result)
try:
markets = await self._fetch_markets(since=since, categories=categories)
if closed:
markets = await self._fetch_closed(end_date_min=end_date_min, end_date_max=end_date_max)
else:
markets = await self._fetch_markets(since=since, categories=categories)
if not dry_run:
result.n_written = await self._upsert_markets(session, markets)
else:
Expand Down Expand Up @@ -73,6 +84,55 @@ async def _fetch_markets(
log.info("gamma_fetched", n=len(all_markets))
return all_markets

async def _fetch_closed(
    self,
    end_date_min: str | None = None,
    end_date_max: str | None = None,
) -> list[dict]:
    """Fetch historical resolved markets using closed=true + end_date range."""
    # The HTTP client only needs to live for the pagination sweep itself.
    async with RetryableHTTPClient(base_url=settings.gamma_api_url) as client:
        markets = await self._paginate_closed(
            client,
            end_date_min=end_date_min,
            end_date_max=end_date_max,
        )
    log.info(
        "gamma_closed_fetched",
        n=len(markets),
        end_date_min=end_date_min,
        end_date_max=end_date_max,
    )
    return markets

async def _paginate_closed(
    self,
    client: RetryableHTTPClient,
    end_date_min: str | None,
    end_date_max: str | None,
) -> list[dict]:
    """Page through /markets?closed=true (newest endDate first) until exhausted."""
    collected: list[dict] = []
    offset = 0
    while True:
        query: dict = {
            "closed": "true",
            "limit": _PAGE_SIZE,
            "offset": offset,
            "order": "endDate",
            "ascending": "false",
        }
        # Only send the date-range bounds that were actually provided.
        for key, value in (("end_date_min", end_date_min), ("end_date_max", end_date_max)):
            if value:
                query[key] = value

        response = await client.get("/markets", params=query)
        response.raise_for_status()
        batch: list[dict] = response.json()
        if not batch:
            break

        collected.extend(batch)
        # A short page means the API has no further results.
        if len(batch) < _PAGE_SIZE:
            break
        offset += _PAGE_SIZE

    return collected

async def _paginate(
self,
client: RetryableHTTPClient,
Expand Down Expand Up @@ -136,14 +196,16 @@ async def _upsert_markets(self, session, raw_markets: list[dict]) -> int:
"category_fflow": None,
"created_at_chain": _parse_dt(m.get("createdAt") or m.get("startDate")),
"end_date": _parse_dt(m.get("endDate")),
"resolved_at": _parse_dt(m.get("closedTime")),
"resolution_outcome": _gamma_outcome(m),
"volume_total_usdc": m.get("volume"),
"liquidity_usdc": m.get("liquidity"),
"slug": m.get("slug"),
"raw_metadata": m,
"last_refreshed_at": now,
})

# asyncpg limit: 32767 params per query; with 12 cols per row → max ~2730 rows/batch
# asyncpg limit: 32767 params per query; with 14 cols per row → max ~2340 rows/batch
_BATCH = 2000
for i in range(0, len(rows), _BATCH):
batch = rows[i : i + _BATCH]
Expand All @@ -158,6 +220,8 @@ async def _upsert_markets(self, session, raw_markets: list[dict]) -> int:
"category_raw": insert(Market).excluded.category_raw,
"created_at_chain": insert(Market).excluded.created_at_chain,
"end_date": insert(Market).excluded.end_date,
"resolved_at": insert(Market).excluded.resolved_at,
"resolution_outcome": insert(Market).excluded.resolution_outcome,
"volume_total_usdc": insert(Market).excluded.volume_total_usdc,
"liquidity_usdc": insert(Market).excluded.liquidity_usdc,
"slug": insert(Market).excluded.slug,
Expand All @@ -176,3 +240,26 @@ def _parse_dt(value: str | None) -> datetime | None:
if not value:
return None
return datetime.fromisoformat(value.replace("Z", "+00:00"))


def _gamma_outcome(market: dict) -> int | None:
"""Parse outcomePrices to determine resolution outcome.

outcomePrices[0] = YES token final price.
["1","0"] = YES won (outcome=1); ["0","1"] = NO won (outcome=0).
"""
prices_raw = market.get("outcomePrices")
if not prices_raw:
return None
prices = json.loads(prices_raw) if isinstance(prices_raw, str) else prices_raw
if not prices or len(prices) < 2:
return None
try:
first = float(prices[0])
if abs(first - 1.0) < 0.01:
return 1 # YES token resolved at $1
if abs(first - 0.0) < 0.01:
return 0 # YES token resolved at $0 → NO won
return None # intermediate price, not yet resolved
except (ValueError, TypeError):
return None
Loading
Loading