Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 64 additions & 23 deletions fflow/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,20 +136,68 @@ def collect_subgraph(
raise typer.Exit(1)


def _load_resume_set(progress_path: "pathlib.Path") -> "set[str]":
    """Return the market_ids already recorded with status ``"ok"``.

    Reads the JSONL checkpoint at *progress_path* one line at a time.

    Args:
        progress_path: Location of the checkpoint file.

    Returns:
        The set of string ``market_id`` values found in records whose
        ``status`` is ``"ok"``. Empty when the file does not exist.

    Lines that cannot be used are skipped instead of aborting the resume:
    blank lines, lines that are not valid JSON, JSON values that are not
    objects (e.g. ``null`` or a list), and ``"ok"`` records whose
    ``market_id`` is missing or not a string.
    """
    import json

    done: set[str] = set()
    try:
        # EAFP: open directly rather than exists()-then-open, which is racy.
        f = open(progress_path, encoding="utf-8")
    except FileNotFoundError:
        return done

    with f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                rec = json.loads(line)
            except json.JSONDecodeError:
                # Unparseable line: ignore it and keep going.
                continue
            # json.loads may return any JSON value; only objects have .get().
            if not isinstance(rec, dict) or rec.get("status") != "ok":
                continue
            market_id = rec.get("market_id")
            # Guards against missing ids and unhashable values such as lists.
            if isinstance(market_id, str):
                done.add(market_id)
    return done


def _write_progress(
path: "pathlib.Path",
market_id: str,
status: str,
trades_count: int,
wallets_count: int,
duration_ms: int,
) -> None:
import json
entry = {
"market_id": market_id,
"status": status,
"trades_count": trades_count,
"wallets_count": wallets_count,
"duration_ms": duration_ms,
"ts": datetime.now(UTC).isoformat(),
}
with open(path, "a") as f:
f.write(json.dumps(entry) + "\n")


async def _subgraph_batch(
min_volume: float,
dry_run: bool,
max_volume: float | None = None,
limit: int | None = None,
categories: list[str] | None = None,
) -> None:
import json
import pathlib
import time
from fflow.collectors.subgraph import SubgraphCollector
from fflow.db import AsyncSessionLocal
from fflow.models import Market, Trade
from sqlalchemy import select, func

progress_path = pathlib.Path("logs/batch_progress.jsonl")
progress_path.parent.mkdir(parents=True, exist_ok=True)
resume_set = _load_resume_set(progress_path)
if resume_set:
log.info("subgraph_batch_resume", already_done=len(resume_set))
typer.echo(f"resuming: {len(resume_set)} markets already completed in checkpoint")

async with AsyncSessionLocal() as session:
stmt = (
select(Market.id, Market.volume_total_usdc, Market.resolved_at)
Expand All @@ -172,15 +220,17 @@ async def _subgraph_batch(
else:
typer.echo(f"subgraph batch: {total} markets vol>=${min_volume:,.0f}" + (f" (limit={limit})" if limit else "") + (f" categories={categories}" if categories else ""))

progress_path = pathlib.Path("logs/batch_progress.jsonl")
progress_path.parent.mkdir(parents=True, exist_ok=True)

collector = SubgraphCollector()
stale_cutoff = datetime.now(UTC) - timedelta(days=1)
ok = fail = skipped = already_done = 0

for mid, vol, resolved_at in rows:
# Idempotency: skip markets resolved >1 day ago that already have trades
# Resume: skip markets already in checkpoint with status=ok
if mid in resume_set:
already_done += 1
continue

# Idempotency: skip markets resolved >1 day ago that already have trades in DB
resolved_is_old = resolved_at and resolved_at < stale_cutoff
if resolved_is_old and not dry_run:
async with AsyncSessionLocal() as session:
Expand All @@ -189,40 +239,31 @@ async def _subgraph_batch(
)
if existing and existing > 0:
already_done += 1
log.info("subgraph_batch_skip", market=mid, existing_trades=existing)
_write_progress(progress_path, mid, "already_done", existing, float(vol or 0))
log.info("subgraph_skip_already_collected", market=mid, existing_trades=existing)
_write_progress(progress_path, mid, "ok", existing, 0, 0)
continue

t0 = time.monotonic()
try:
r = await collector.run(market_id=mid, dry_run=dry_run)
duration_ms = int((time.monotonic() - t0) * 1000)
if r.n_written and r.n_written > 0:
ok += 1
_write_progress(progress_path, mid, "ok", r.n_written, r.n_wallets, duration_ms)
else:
skipped += 1
_write_progress(progress_path, mid, "skipped", 0, 0, duration_ms)
log.info("subgraph_batch_market", market=mid, vol=float(vol or 0),
status=r.status, n=r.n_written)
_write_progress(progress_path, mid, r.status, r.n_written or 0, float(vol or 0))
status=r.status, n=r.n_written, wallets=r.n_wallets, ms=duration_ms)
except Exception as exc:
duration_ms = int((time.monotonic() - t0) * 1000)
fail += 1
log.error("subgraph_batch_error", market=mid, error=str(exc))
_write_progress(progress_path, mid, "failed", 0, float(vol or 0))
_write_progress(progress_path, mid, "failed", 0, 0, duration_ms)

typer.echo(f"subgraph batch done: ok={ok} skipped(0 trades)={skipped} already_done={already_done} fail={fail}")


def _write_progress(path: "pathlib.Path", market_id: str, status: str, trades: int, vol: float) -> None:
import json, pathlib
entry = {
"market_id": market_id,
"status": status,
"trades_count": trades,
"volume": vol,
"timestamp": datetime.now(UTC).isoformat(),
}
with open(path, "a") as f:
f.write(json.dumps(entry) + "\n")


# ---------------------------------------------------------------------------
# collect uma
# ---------------------------------------------------------------------------
Expand Down
1 change: 1 addition & 0 deletions fflow/collectors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class CollectorResult(BaseModel):
collector: str
target: str | None = None
n_written: int = 0
n_wallets: int = 0
started_at: datetime
finished_at: datetime | None = None
status: str = "running"
Expand Down
4 changes: 2 additions & 2 deletions fflow/collectors/subgraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ async def run(
yes_token = await self._resolve_yes_token(session, mid)
trades = await self._fetch_trades(mid, yes_token, from_ts)
if not dry_run:
result.n_written = await self._upsert_trades(session, mid, yes_token, trades)
result.n_written, result.n_wallets = await self._upsert_trades(session, mid, yes_token, trades)
else:
result.n_written = len(trades)
result.status = "success"
Expand Down Expand Up @@ -240,7 +240,7 @@ async def _upsert_trades(
await session.commit()

log.info("subgraph_upserted", market=market_id, trades=total, wallets=len(wallet_set))
return total
return total, len(wallet_set)


def _parse_log_index(raw_id: str) -> int:
Expand Down
2 changes: 1 addition & 1 deletion fflow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class Settings(BaseSettings):
log_level: str = "INFO"
log_json: bool = False

model_config = SettingsConfigDict(env_prefix="FFLOW_", env_file=".env")
model_config = SettingsConfigDict(env_prefix="FFLOW_", env_file=".env", extra="ignore")


settings = Settings()
Loading
Loading