Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions alembic/versions/0005_resolution_type.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
"""Add resolution_type column to markets

Revision ID: 0005
Revises: 0004
Create Date: 2026-04-27
"""
from typing import Sequence, Union

import sqlalchemy as sa
from alembic import op

revision: str = "0005"
down_revision: Union[str, None] = "0004"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Apply revision 0005: add a nullable ``resolution_type`` column to
    ``markets`` and index it for filtered lookups."""
    # String(50) matches the Mapped[str | None] column declared on the model.
    resolution_type_col = sa.Column("resolution_type", sa.String(50), nullable=True)
    op.add_column("markets", resolution_type_col)
    op.create_index("ix_markets_resolution_type", "markets", ["resolution_type"])


def downgrade() -> None:
    """Revert revision 0005: remove the ``resolution_type`` index and column.

    The index is dropped before the column it covers, since some backends
    refuse to drop a column that still has a dependent index.
    """
    op.drop_index("ix_markets_resolution_type", table_name="markets")
    op.drop_column("markets", "resolution_type")
145 changes: 116 additions & 29 deletions fflow/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,17 +272,25 @@ async def _subgraph_batch(
def collect_uma(
market: Annotated[Optional[str], typer.Option(help="Market condition ID")] = None,
all_resolved: Annotated[bool, typer.Option("--all-resolved")] = False,
event_resolved: Annotated[bool, typer.Option("--event-resolved", help="Run on event_resolved markets missing evidence URL")] = False,
min_volume: Annotated[float, typer.Option(help="Min volume for --event-resolved mode")] = 50000.0,
dry_run: Annotated[bool, typer.Option("--dry-run")] = False,
) -> None:
"""Fetch UMA resolution data for markets."""
from fflow.collectors.uma import UmaCollector

if not market and not all_resolved:
typer.echo("Provide --market or --all-resolved", err=True)
if not market and not all_resolved and not event_resolved:
typer.echo("Provide --market, --all-resolved, or --event-resolved", err=True)
raise typer.Exit(1)

result = asyncio.run(
UmaCollector().run(market_id=market, all_resolved=all_resolved, dry_run=dry_run)
UmaCollector().run(
market_id=market,
all_resolved=all_resolved,
event_resolved=event_resolved,
min_volume=min_volume,
dry_run=dry_run,
)
)
typer.echo(f"uma: {result.status}, n={result.n_written}")
if result.error:
Expand Down Expand Up @@ -674,47 +682,62 @@ async def _run() -> None:
def news_seed_proxy(
market_ids: Annotated[Optional[str], typer.Option(help="Comma-separated market IDs")] = None,
category: Annotated[Optional[str], typer.Option(help="Seed all markets in this category_fflow")] = None,
offset_days: Annotated[int, typer.Option(help="Days before end_date for proxy T_news")] = 1,
resolution_type: Annotated[Optional[str], typer.Option(help="Filter by resolution_type")] = None,
min_volume: Annotated[float, typer.Option(help="Min volume_total_usdc filter")] = 0.0,
offset_days: Annotated[int, typer.Option(help="Days offset for proxy T_news")] = 1,
anchor: Annotated[str, typer.Option(help="Anchor for proxy: 'end_date' or 'resolved_at'")] = "end_date",
confidence: Annotated[float, typer.Option(help="Confidence value to store")] = 0.50,
dry_run: Annotated[bool, typer.Option("--dry-run")] = False,
) -> None:
"""Seed synthetic T_news proxy from end_date - offset_days (tier=2, confidence=0.50).
"""Seed synthetic T_news proxy from anchor - offset_days (tier=2).

Used for markets resolved by Polymarket admin (no UMA evidence URL) where
the outcome was publicly knowable close to end_date.
--anchor end_date: t_news = end_date - offset_days (use for deadline markets)
--anchor resolved_at: t_news = resolved_at - offset_days (use for event markets;
resolved_at is close to the actual outcome event)
"""
from sqlalchemy import select

from fflow.db import AsyncSessionLocal
from fflow.models import Market, NewsTimestamp
from sqlalchemy import select
from sqlalchemy.dialects.postgresql import insert as pg_insert

if anchor not in ("end_date", "resolved_at"):
typer.echo("--anchor must be 'end_date' or 'resolved_at'", err=True)
raise typer.Exit(1)

async def _run() -> None:
if market_ids:
ids = [m.strip() for m in market_ids.split(",") if m.strip()]
elif category:
else:
async with AsyncSessionLocal() as session:
rows = (
await session.execute(
select(Market.id).where(Market.category_fflow == category)
.where(Market.end_date.isnot(None))
.where(Market.resolved_at.isnot(None))
)
).scalars().all()
stmt = select(Market.id).where(Market.resolved_at.isnot(None))
if category:
stmt = stmt.where(Market.category_fflow == category)
if resolution_type:
stmt = stmt.where(Market.resolution_type == resolution_type)
if min_volume > 0:
stmt = stmt.where(Market.volume_total_usdc >= min_volume)
if anchor == "end_date":
stmt = stmt.where(Market.end_date.isnot(None))
rows = (await session.execute(stmt)).scalars().all()
ids = list(rows)
else:
typer.echo("Provide --market-ids or --category", err=True)
raise typer.Exit(1)

typer.echo(f"seed-proxy: {len(ids)} markets, offset={offset_days}d")
typer.echo(f"seed-proxy: {len(ids)} markets, anchor={anchor}, offset={offset_days}d")
ok = skip = 0
async with AsyncSessionLocal() as session:
for mid in ids:
mkt = await session.get(Market, mid)
if mkt is None or mkt.end_date is None:
if mkt is None:
skip += 1
continue

anchor_ts = mkt.resolved_at if anchor == "resolved_at" else mkt.end_date
if anchor_ts is None:
skip += 1
continue

t_news = mkt.end_date - timedelta(days=offset_days)
notes = f"proxy:end_date-{offset_days}d"
t_news = anchor_ts - timedelta(days=offset_days)
notes = f"proxy:{anchor}-{offset_days}d"

if not dry_run:
stmt = (
Expand All @@ -724,7 +747,7 @@ async def _run() -> None:
t_news=t_news,
tier=2,
source_url=None,
confidence=0.50,
confidence=confidence,
notes=notes,
recovered_at=datetime.now(UTC),
)
Expand All @@ -733,7 +756,7 @@ async def _run() -> None:
set_={
"t_news": t_news,
"tier": 2,
"confidence": 0.50,
"confidence": confidence,
"notes": notes,
},
)
Expand Down Expand Up @@ -777,26 +800,90 @@ async def _run() -> None:
asyncio.run(_run())


@score_app.command("classify-types")
def score_classify_types(
    min_volume: Annotated[float, typer.Option(help="Min volume_total_usdc")] = 50000.0,
    categories: Annotated[Optional[str], typer.Option(help="Comma-separated category_fflow filter")] = None,
    limit: Annotated[Optional[int], typer.Option(help="Max markets to classify")] = None,
    dry_run: Annotated[bool, typer.Option("--dry-run")] = False,
) -> None:
    """Classify resolution_type for all resolved markets with sufficient volume."""
    from sqlalchemy import select, update

    from fflow.db import AsyncSessionLocal
    from fflow.models import Market
    from fflow.scoring.resolution_type import classify_from_text

    # Optional category filter, split from the comma-separated CLI value.
    category_filter = [c.strip() for c in categories.split(",")] if categories else None
    batch_size = 500

    async def _run() -> None:
        # Select id/question/outcome for every resolved market above the volume floor.
        async with AsyncSessionLocal() as session:
            query = (
                select(
                    Market.id,
                    Market.question,
                    Market.resolution_outcome,
                )
                .where(Market.resolution_outcome.isnot(None))
                .where(Market.volume_total_usdc >= min_volume)
            )
            if category_filter:
                query = query.where(Market.category_fflow.in_(category_filter))
            if limit:
                query = query.limit(limit)
            targets = (await session.execute(query)).all()

        typer.echo(f"classify-types: {len(targets)} markets to classify")
        distribution: dict[str, int] = {}
        n_updated = 0

        # Persist in fixed-size batches, one short-lived session (and commit) per batch.
        for start in range(0, len(targets), batch_size):
            async with AsyncSessionLocal() as session:
                for market_id, question, outcome in targets[start : start + batch_size]:
                    label = classify_from_text(question=question, resolution_outcome=outcome, last_price=None)
                    distribution[label] = distribution.get(label, 0) + 1
                    if dry_run:
                        continue
                    await session.execute(
                        update(Market).where(Market.id == market_id).values(resolution_type=label)
                    )
                    n_updated += 1
                if not dry_run:
                    await session.commit()

        typer.echo("Distribution: " + ", ".join(f"{k}={v}" for k, v in sorted(distribution.items())))
        if dry_run:
            typer.echo("[dry-run] no writes")
        else:
            typer.echo(f"classify-types done: {n_updated} updated")

    asyncio.run(_run())


@score_app.command("batch")
def score_batch(
limit: Annotated[int, typer.Option(help="Max markets to score")] = 500,
resolution_type: Annotated[Optional[str], typer.Option(help="Filter by resolution_type")] = None,
dry_run: Annotated[bool, typer.Option("--dry-run")] = False,
) -> None:
"""Compute ILS labels for all markets that have a NewsTimestamp but no label."""
from sqlalchemy import select

from fflow.db import AsyncSessionLocal
from fflow.models import MarketLabel, NewsTimestamp
from fflow.models import Market, MarketLabel, NewsTimestamp
from fflow.scoring.pipeline import compute_market_label
from sqlalchemy import select

async def _run() -> None:
async with AsyncSessionLocal() as session:
# Markets with news but no label yet
labelled = select(MarketLabel.market_id)
stmt = (
select(NewsTimestamp.market_id)
.join(Market, Market.id == NewsTimestamp.market_id)
.where(NewsTimestamp.market_id.notin_(labelled))
.limit(limit)
)
if resolution_type:
stmt = stmt.where(Market.resolution_type == resolution_type)
stmt = stmt.limit(limit)
rows = (await session.execute(stmt)).scalars().all()

n_ok = n_fail = 0
Expand Down
23 changes: 20 additions & 3 deletions fflow/collectors/uma.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,22 +94,30 @@ async def run(
target: str | None = None,
market_id: str | None = None,
all_resolved: bool = False,
event_resolved: bool = False,
min_volume: float = 50000.0,
dry_run: bool = False,
) -> CollectorResult:
mid = market_id or target
result = self._start_result(mid or "all_resolved")
label = mid or ("event_resolved" if event_resolved else "all_resolved")
result = self._start_result(label)
async with AsyncSessionLocal() as session:
run_id = await self._record_run_start(session, result)
try:
if all_resolved:
if event_resolved:
market_ids = await self._get_event_resolved_market_ids(session, min_volume)
elif all_resolved:
market_ids = await self._get_unresolved_market_ids(session)
else:
market_ids = [mid] if mid else []

log.info("uma_batch_start", n=len(market_ids), mode=label)
total = 0
for m_id in market_ids:
for i, m_id in enumerate(market_ids):
n = await self._process_market(session, m_id, dry_run)
total += n
if (i + 1) % 100 == 0:
log.info("uma_batch_progress", done=i + 1, total=len(market_ids), found=total)

result.n_written = total
result.status = "success"
Expand All @@ -129,6 +137,15 @@ async def _get_unresolved_market_ids(self, session) -> list[str]:
)
return [r[0] for r in rows.all()]

async def _get_event_resolved_market_ids(self, session, min_volume: float) -> list[str]:
    """Return IDs of event_resolved markets above the volume floor that still
    lack a resolution evidence URL (i.e. candidates for UMA evidence lookup)."""
    query = (
        select(Market.id)
        .where(Market.resolution_type == "event_resolved")
        .where(Market.resolution_evidence_url.is_(None))
        .where(Market.volume_total_usdc >= min_volume)
    )
    result = await session.execute(query)
    return [row[0] for row in result.all()]

def _make_gql_client(self) -> Client:
url = _uma_subgraph_url()
headers = {"Accept": "application/json"}
Expand Down
1 change: 1 addition & 0 deletions fflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class Market(Base):
end_date: Mapped[datetime | None] = mapped_column(TZ())
resolved_at: Mapped[datetime | None] = mapped_column(TZ()) # T_resolve
resolution_outcome: Mapped[int | None] = mapped_column(Integer) # 0=NO, 1=YES
resolution_type: Mapped[str | None] = mapped_column(String(50))
resolution_evidence_url: Mapped[str | None] = mapped_column(Text)
resolution_proposer: Mapped[str | None] = mapped_column(String(42))
volume_total_usdc: Mapped[Any] = mapped_column(Numeric(20, 6), nullable=True)
Expand Down
Loading
Loading