Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ __pycache__
CLAUDE.md
.claude/
.vscode/
.mcp.json
.vouch
*.md

# Rust/Cargo build artifacts
target/
Expand Down
14 changes: 14 additions & 0 deletions allways/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,20 @@
# out of the rolling window. 0-2 tolerated; the 3rd timeout zeros credibility.
CREDIBILITY_MAX_TIMEOUTS: int = 2

# ─── Depth / Quality ─────────────────────────────────────
# Scale a crown holder's reward by how far their rate beats the per-direction
# "market" reference (a trimmed, volume-weighted, recency-decayed average of the
# subnet's own completed-swap clearing rates). Floored like VOLUME_WEIGHT_ALPHA
# so it's forgiving. Reference and factor are scoring-side only — no contract change.
QUALITY_EMA_HALF_LIFE_BLOCKS: int = 7_200 # ~1 day — clearing-rate weight halves each
QUALITY_TRIM_PCT: float = 0.10 # Drop top/bottom 10% by rate before averaging (kills wash/outlier extremes)
QUALITY_PER_MINER_CAP: float = 0.30 # No single hotkey contributes >30% of the reference weight
QUALITY_N_MIN: int = 20 # Below this many post-trim observations → reference disabled (factor 1.0)
# Improvement past the reference that earns the full bonus. THE tuning knob —
# calibrate against real live rate dispersion before trusting it.
QUALITY_ANCHOR: float = 0.05 # 5% better than market → full bonus
QUALITY_FLOOR: float = 0.5 # Crown holder never earns below half on rate quality alone

# ─── Emission Recycling ────────────────────────────────────
RECYCLE_UID = 53 # Subnet owner UID

Expand Down
18 changes: 18 additions & 0 deletions allways/validator/event_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -720,6 +720,7 @@ def apply_event(self, block_num: int, name: str, values: Dict[str, Any]) -> None
tao = int(values.get('tao_amount') or 0)
fee = int(values.get('fee_amount') or 0)
from_chain, to_chain = self._lookup_swap_direction(swap_id)
clearing_rate = self._lookup_swap_clearing_rate(swap_id)
self.state_store.insert_swap_outcome(
swap_id=swap_id,
miner_hotkey=miner,
Expand All @@ -728,6 +729,7 @@ def apply_event(self, block_num: int, name: str, values: Dict[str, Any]) -> None
tao_amount=tao,
from_chain=from_chain,
to_chain=to_chain,
clearing_rate=clearing_rate,
)
# The contract's apply_collateral_penalty deducts ``fee_amount``
# from collateral without emitting a CollateralWithdrawn event,
Expand Down Expand Up @@ -835,6 +837,22 @@ def _lookup_swap_direction(self, swap_id: int) -> Tuple[str, str]:
return '', ''
return (swap.from_chain or '').lower(), (swap.to_chain or '').lower()

def _lookup_swap_clearing_rate(self, swap_id: int) -> float:
"""Clearing rate (canonical TAO/BTC) for a just-completed swap, read
from the tracker's still-live Swap (resolve() runs after we record the
outcome). Snapshotted from the miner's commitment at initiation, so it's
the rate the swap actually cleared at. Returns 0.0 when unknown or
unparseable — excluded from the depth reference, same as a legacy row."""
if self.swap_tracker is None:
return 0.0
swap = self.swap_tracker.active.get(swap_id)
if swap is None:
return 0.0
try:
return float(swap.rate) if swap.rate else 0.0
except (TypeError, ValueError):
return 0.0

def record_reservation_pin(self, block_num: int, miner: str, reserved_until: int) -> None:
"""Pin the miner's commitment as of the reservation block ``block_num``.

Expand Down
120 changes: 119 additions & 1 deletion allways/validator/scoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from __future__ import annotations

import math
from dataclasses import dataclass, field
from enum import IntEnum
from typing import TYPE_CHECKING, Callable, Dict, List, Optional, Set, Tuple
Expand All @@ -22,6 +23,12 @@
CREDIBILITY_WINDOW_BLOCKS,
DIRECTION_POOLS,
MAX_SCORING_BACKFILL_BLOCKS,
QUALITY_ANCHOR,
QUALITY_EMA_HALF_LIFE_BLOCKS,
QUALITY_FLOOR,
QUALITY_N_MIN,
QUALITY_PER_MINER_CAP,
QUALITY_TRIM_PCT,
RECYCLE_UID,
SCORING_WINDOW_BLOCKS,
SUCCESS_EXPONENT,
Expand All @@ -41,8 +48,10 @@ class DirectionTrace:
pool: float = 0.0
crown_blocks: Dict[str, float] = field(default_factory=dict)
cap_weighted_blocks: Dict[str, float] = field(default_factory=dict)
quality_weighted_blocks: Dict[str, float] = field(default_factory=dict)
unfilled_blocks: int = 0
best_rate: float = 0.0
quality_reference: Optional[float] = None


def due_for_scoring(current_block: int, last_scored_block: int, initial_scoring_done: bool) -> bool:
Expand Down Expand Up @@ -203,6 +212,13 @@ def calculate_miner_rewards(self: Validator) -> Tuple[np.ndarray, Set[int]]:
if storage_enabled:
intervals = []
intervals_by_dir[(from_chain, to_chain)] = intervals
# Per-direction depth reference: trimmed, volume/recency-weighted average
# of recent completed-swap clearing rates. None until enough swaps accrue
# (bootstrap), in which case the quality factor is a 1.0 no-op. Same 30-day
# lookback the table is pruned to; the EMA half-life down-weights old swaps.
clearing_obs = self.state_store.get_clearing_rates_by_direction_since(credibility_since, from_chain, to_chain)
quality_reference = compute_quality_reference(clearing_obs, window_end)
trace.quality_reference = quality_reference
crown_blocks = replay_crown_time_window(
store=self.state_store,
event_watcher=self.event_watcher,
Expand All @@ -215,6 +231,7 @@ def calculate_miner_rewards(self: Validator) -> Tuple[np.ndarray, Set[int]]:
intervals_out=intervals,
min_swap_rao=min_swap_amount,
max_swap_rao=max_swap_amount,
quality_reference=quality_reference,
)
total_crown_dir = sum(crown_blocks.values())
volumes_dir = self.state_store.get_volume_by_direction_since(window_start, from_chain, to_chain)
Expand Down Expand Up @@ -245,8 +262,15 @@ def calculate_miner_rewards(self: Validator) -> Tuple[np.ndarray, Set[int]]:
# already earned (#409).
cap_blocks = trace.cap_weighted_blocks.get(hotkey, 0.0)
cap = (cap_blocks / blocks) if blocks > 0 else 0.0
# Depth quality, integrated per-block like capacity: the time-weighted
# average of the floored quality factor over the miner's crown
# intervals (1.0 during bootstrap). A post-window rate change can't
# retroactively rescale credit already earned.
q_blocks = trace.quality_weighted_blocks.get(hotkey, 0.0)
quality = (q_blocks / blocks) if blocks > 0 else 0.0
wt = weighting_traces.setdefault(hotkey, WeightingTrace())
wt.record_capacity(factor=cap)
wt.record_quality(factor=quality)
wt.record_credibility(
closed_swaps=sum(success_stats.get(hotkey, (0, 0))),
ramp_target=CREDIBILITY_RAMP_OBSERVATIONS,
Expand All @@ -260,7 +284,7 @@ def calculate_miner_rewards(self: Validator) -> Tuple[np.ndarray, Set[int]]:
pool * crown_share_dir * (success_rates[hotkey] ** SUCCESS_EXPONENT) * credibility_ramps[hotkey] * cap
)
unweighted_rewards[uid] += base
rewards[uid] += base * vol_factor
rewards[uid] += base * vol_factor * quality
if vol_factor < 1.0:
bt.logging.debug(
f'V1 scoring [{from_chain}→{to_chain}] {hotkey[:8]}: '
Expand Down Expand Up @@ -346,6 +370,93 @@ def volume_factor(
return (1.0 - alpha) + alpha * participation


def compute_quality_reference(
observations: List[Tuple[float, int, int, str]],
now_block: int,
*,
half_life_blocks: int = QUALITY_EMA_HALF_LIFE_BLOCKS,
trim_pct: float = QUALITY_TRIM_PCT,
per_miner_cap: float = QUALITY_PER_MINER_CAP,
n_min: int = QUALITY_N_MIN,
) -> Optional[float]:
"""Per-direction "market" reference from completed-swap clearing rates.

``observations`` are ``(rate, tao_amount, resolved_block, miner_hotkey)``.
Trimmed (top/bottom ``trim_pct`` by rate), volume- and recency-weighted
(weight halves every ``half_life_blocks``), with each miner's summed weight
capped at ``per_miner_cap`` of the total. Returns ``None`` when fewer than
``n_min`` observations survive the trim — caller treats that as "depth off"
(factor 1.0), matching the self-bootstrapping rollout.

DETERMINISM: this feeds emissions → consensus, so every validator must get a
byte-identical result. We sort by a stable total-order key and sum with
``math.fsum`` over that fixed order — never reduce over an unordered set.
``now_block`` must be an on-chain block (window_end), never wall-clock.
"""
rows = [o for o in observations if o[0] > 0]
# Stable TOTAL order so trim drops identical rows on every validator: rate,
# then block, miner, volume. Two rows equal on all four are interchangeable.
rows.sort(key=lambda o: (o[0], o[2], o[3], o[1]))
n = len(rows)
drop = int(math.floor(n * trim_pct))
trimmed = rows[drop : n - drop] if drop > 0 else rows
if len(trimmed) < n_min:
return None

weights: List[float] = []
by_miner: Dict[str, float] = {}
for rate, vol, block, miner in trimmed:
recency = 2.0 ** (-(now_block - block) / half_life_blocks)
w = max(0.0, float(vol)) * recency
weights.append(w)
by_miner[miner] = by_miner.get(miner, 0.0) + w

total_w = math.fsum(weights)
if total_w <= 0:
return None

# Per-miner cap: scale down any miner whose summed weight exceeds the cap so
# no single operator (or sybil cluster sharing a hotkey) defines the market.
cap_w = per_miner_cap * total_w
scale = {m: (cap_w / w if w > cap_w else 1.0) for m, w in by_miner.items()}

capped = [w * scale[trimmed[i][3]] for i, w in enumerate(weights)]
denom = math.fsum(capped)
if denom <= 0:
return None
numer = math.fsum(capped[i] * trimmed[i][0] for i in range(len(trimmed)))
return numer / denom


def direction_aware_improvement(rate: float, reference: float, lower_rate_wins: bool) -> float:
"""Fractional improvement of ``rate`` past the reference, signed so deeper
is positive in either direction. btc→tao: higher TAO/BTC is the better deal;
tao→btc (``lower_rate_wins``): lower TAO/BTC is. Returns 0 for a non-positive
reference."""
if reference <= 0:
return 0.0
return (reference - rate) / reference if lower_rate_wins else (rate - reference) / reference


def quality_factor(
rate: float,
reference: Optional[float],
lower_rate_wins: bool,
*,
anchor: float = QUALITY_ANCHOR,
floor: float = QUALITY_FLOOR,
) -> float:
"""Depth multiplier in ``[floor, 1.0]``. At/below market → floor; at or past
``anchor`` improvement → 1.0; linear between. ``reference is None`` (bootstrap)
→ 1.0, so depth is a no-op until the reference exists. Mirrors
``volume_factor``'s floored-ramp shape."""
if reference is None or anchor <= 0:
return 1.0
improvement = direction_aware_improvement(rate, reference, lower_rate_wins)
depth_ramp = min(1.0, max(0.0, improvement / anchor))
return floor + (1.0 - floor) * depth_ramp


def record_volume_traces(
*,
weighting_traces: Dict[str, WeightingTrace],
Expand Down Expand Up @@ -524,6 +635,7 @@ def replay_crown_time_window(
intervals_out: Optional[List[Tuple[int, int, List[str], float]]] = None,
min_swap_rao: int = 0,
max_swap_rao: int = 0,
quality_reference: Optional[float] = None,
) -> Dict[str, float]:
"""Walk the merged event stream, return ``{hotkey: crown_blocks_float}``.
Ties at the same rate split credit evenly. A miner qualifies for crown
Expand Down Expand Up @@ -560,6 +672,7 @@ def executable_check(rate: float) -> bool:

crown_blocks: Dict[str, float] = {}
cap_weighted_blocks: Dict[str, float] = {}
quality_weighted_blocks: Dict[str, float] = {}
prev_block = window_start

def effective_rates() -> Dict[str, float]:
Expand Down Expand Up @@ -613,12 +726,16 @@ def credit_interval(interval_start: int, interval_end: int) -> None:
if intervals_out is not None:
intervals_out.append((interval_start, interval_end, list(holders), winner_rate))
split = duration / len(holders)
# All holders are tied at winner_rate, so the depth factor is shared.
# None reference (bootstrap) → 1.0, leaving credit unchanged.
qf = quality_factor(winner_rate, quality_reference, lower_rate_wins)
for hk in holders:
crown_blocks[hk] = crown_blocks.get(hk, 0.0) + split
# Unknown collateral (no event recorded) → capacity 1.0, matching
# can_fund's fail-open. Only a known value scales capacity down.
cap = capacity_factor(collaterals[hk], max_swap_rao) if hk in collaterals else 1.0
cap_weighted_blocks[hk] = cap_weighted_blocks.get(hk, 0.0) + split * cap
quality_weighted_blocks[hk] = quality_weighted_blocks.get(hk, 0.0) + split * qf

def apply_event(event: ReplayEvent) -> None:
if event.kind is EventKind.RATE:
Expand Down Expand Up @@ -651,6 +768,7 @@ def apply_event(event: ReplayEvent) -> None:
if trace is not None:
trace.crown_blocks = dict(crown_blocks)
trace.cap_weighted_blocks = dict(cap_weighted_blocks)
trace.quality_weighted_blocks = dict(quality_weighted_blocks)
return crown_blocks


Expand Down
5 changes: 5 additions & 0 deletions allways/validator/scoring_trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,14 @@ class WeightingTrace:
volume_factor: float = 1.0
closed_swaps: int = 0
credibility_ramp: float = 0.0
quality_factor: float = 1.0

def record_capacity(self, factor: float) -> None:
self.capacity_factor = factor

def record_quality(self, factor: float) -> None:
self.quality_factor = factor

def record_volume(self, vol_rao: int, total_volume_rao: int, crown_share: float, factor: float) -> None:
self.volume_rao = vol_rao
self.crown_share = crown_share
Expand Down Expand Up @@ -111,6 +115,7 @@ def log_scoring_trace(
f' cap={wt.capacity_factor:.2f}'
f' vol={wt.volume_rao / TAO_TO_RAO:g}t vol_share={wt.volume_share:.2f}'
f' crown_share={wt.crown_share:.2f} vol_f={wt.volume_factor:.2f}'
f' quality_f={wt.quality_factor:.2f}'
)
lines.append(
f' uid={uid} hotkey={hk[:8]}.. crown_blk={crown_blk:.0f} sr={sr:.3f}{extras} reward={crown_reward:.3f}'
Expand Down
39 changes: 35 additions & 4 deletions allways/validator/state_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -498,16 +498,18 @@ def insert_swap_outcome(
tao_amount: int = 0,
from_chain: str = '',
to_chain: str = '',
clearing_rate: float = 0.0,
) -> None:
# Direction is normalized to lowercase on write so the per-direction
# volume query is robust to upstream case drift. SQLite text
# comparisons are case-sensitive and DIRECTION_POOLS keys are
# lowercase.
# lowercase. ``clearing_rate`` (canonical TAO/BTC) feeds the depth
# reference; 0.0 means "no usable observation" (timed-out or legacy).
self._execute(
"""
INSERT OR REPLACE INTO swap_outcomes
(swap_id, miner_hotkey, completed, resolved_block, tao_amount, from_chain, to_chain)
VALUES (?, ?, ?, ?, ?, ?, ?)
(swap_id, miner_hotkey, completed, resolved_block, tao_amount, from_chain, to_chain, clearing_rate)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
(
swap_id,
Expand All @@ -517,6 +519,7 @@ def insert_swap_outcome(
int(tao_amount or 0),
(from_chain or '').lower(),
(to_chain or '').lower(),
float(clearing_rate or 0.0),
),
)

Expand Down Expand Up @@ -576,6 +579,32 @@ def get_volume_by_direction_since(self, since_block: int, from_chain: str, to_ch
)
return {r['miner_hotkey']: int(r['total'] or 0) for r in rows}

def get_clearing_rates_by_direction_since(
self, since_block: int, from_chain: str, to_chain: str
) -> List[Tuple[float, int, int, str]]:
"""Completed-swap clearing rates for one direction, for the depth
reference. Each row is ``(clearing_rate, tao_amount, resolved_block,
miner_hotkey)``. Rows with ``clearing_rate = 0`` (timed-out swaps,
pre-migration legacy) are excluded — they carry no usable rate, same
as the volume query excludes ``tao_amount = 0``. Direction is
lowercased to match ``insert_swap_outcome``."""
rows = self._fetchall(
"""
SELECT clearing_rate, tao_amount, resolved_block, miner_hotkey
FROM swap_outcomes
WHERE resolved_block >= ?
AND completed = 1
AND clearing_rate > 0
AND from_chain = ?
AND to_chain = ?
""",
(since_block, (from_chain or '').lower(), (to_chain or '').lower()),
)
return [
(float(r['clearing_rate']), int(r['tao_amount'] or 0), int(r['resolved_block']), r['miner_hotkey'])
for r in rows
]

def prune_swap_outcomes_older_than(self, cutoff_block: int) -> None:
if cutoff_block <= 0:
return
Expand Down Expand Up @@ -877,7 +906,8 @@ def init_db(self) -> None:
resolved_block INTEGER NOT NULL,
tao_amount INTEGER NOT NULL DEFAULT 0,
from_chain TEXT NOT NULL DEFAULT '',
to_chain TEXT NOT NULL DEFAULT ''
to_chain TEXT NOT NULL DEFAULT '',
clearing_rate REAL NOT NULL DEFAULT 0.0
);
CREATE INDEX IF NOT EXISTS idx_swap_outcomes_hotkey
ON swap_outcomes(miner_hotkey);
Expand Down Expand Up @@ -976,6 +1006,7 @@ def init_db(self) -> None:
('swap_outcomes', 'tao_amount', 'INTEGER NOT NULL DEFAULT 0'),
('swap_outcomes', 'from_chain', "TEXT NOT NULL DEFAULT ''"),
('swap_outcomes', 'to_chain', "TEXT NOT NULL DEFAULT ''"),
('swap_outcomes', 'clearing_rate', 'REAL NOT NULL DEFAULT 0.0'),
):
try:
conn.execute(f'ALTER TABLE {table} ADD COLUMN {column} {ddl}')
Expand Down
Loading