diff --git a/.gitignore b/.gitignore index ebe488d..cff1ce1 100644 --- a/.gitignore +++ b/.gitignore @@ -17,6 +17,9 @@ __pycache__ CLAUDE.md .claude/ .vscode/ +.mcp.json +.vouch +*.md # Rust/Cargo build artifacts target/ diff --git a/allways/constants.py b/allways/constants.py index 96eb6a3..c4e69f6 100644 --- a/allways/constants.py +++ b/allways/constants.py @@ -68,6 +68,20 @@ # out of the rolling window. 0-2 tolerated; the 3rd timeout zeros credibility. CREDIBILITY_MAX_TIMEOUTS: int = 2 +# ─── Depth / Quality ───────────────────────────────────── +# Scale a crown holder's reward by how far their rate beats the per-direction +# "market" reference (a trimmed, volume-weighted, recency-decayed average of the +# subnet's own completed-swap clearing rates). Floored like VOLUME_WEIGHT_ALPHA +# so it's forgiving. Reference and factor are scoring-side only — no contract change. +QUALITY_EMA_HALF_LIFE_BLOCKS: int = 7_200 # ~1 day — clearing-rate weight halves each +QUALITY_TRIM_PCT: float = 0.10 # Drop top/bottom 10% by rate before averaging (kills wash/outlier extremes) +QUALITY_PER_MINER_CAP: float = 0.30 # No single hotkey contributes >30% of the reference weight +QUALITY_N_MIN: int = 20 # Below this many post-trim observations → reference disabled (factor 1.0) +# Improvement past the reference that earns the full bonus. THE tuning knob — +# calibrate against real live rate dispersion before trusting it. +QUALITY_ANCHOR: float = 0.05 # 5% better than market → full bonus +QUALITY_FLOOR: float = 0.5 # Crown holder never earns below half on rate quality alone + # ─── Emission Recycling ──────────────────────────────────── RECYCLE_UID = 53 # Subnet owner UID diff --git a/allways/validator/event_watcher.py b/allways/validator/event_watcher.py index fe3b52f..bfd98b4 100644 --- a/allways/validator/event_watcher.py +++ b/allways/validator/event_watcher.py @@ -720,6 +720,7 @@ def apply_event(self, block_num: int, name: str, values: Dict[str, Any]) -> None tao = int(values.get('tao_amount') or 0) fee = int(values.get('fee_amount') or 0) from_chain, to_chain = self._lookup_swap_direction(swap_id) + clearing_rate = self._lookup_swap_clearing_rate(swap_id) self.state_store.insert_swap_outcome( swap_id=swap_id, miner_hotkey=miner, @@ -728,6 +729,7 @@ def apply_event(self, block_num: int, name: str, values: Dict[str, Any]) -> None tao_amount=tao, from_chain=from_chain, to_chain=to_chain, + clearing_rate=clearing_rate, ) # The contract's apply_collateral_penalty deducts ``fee_amount`` # from collateral without emitting a CollateralWithdrawn event, @@ -835,6 +837,22 @@ def _lookup_swap_direction(self, swap_id: int) -> Tuple[str, str]: return '', '' return (swap.from_chain or '').lower(), (swap.to_chain or '').lower() + def _lookup_swap_clearing_rate(self, swap_id: int) -> float: + """Clearing rate (canonical TAO/BTC) for a just-completed swap, read + from the tracker's still-live Swap (resolve() runs after we record the + outcome). Snapshotted from the miner's commitment at initiation, so it's + the rate the swap actually cleared at. Returns 0.0 when unknown or + unparseable — excluded from the depth reference, same as a legacy row.""" + if self.swap_tracker is None: + return 0.0 + swap = self.swap_tracker.active.get(swap_id) + if swap is None: + return 0.0 + try: + return float(swap.rate) if swap.rate else 0.0 + except (TypeError, ValueError): + return 0.0 + def record_reservation_pin(self, block_num: int, miner: str, reserved_until: int) -> None: """Pin the miner's commitment as of the reservation block ``block_num``. diff --git a/allways/validator/scoring.py b/allways/validator/scoring.py index 0e34a6c..aac96b2 100644 --- a/allways/validator/scoring.py +++ b/allways/validator/scoring.py @@ -8,6 +8,7 @@ from __future__ import annotations +import math from dataclasses import dataclass, field from enum import IntEnum from typing import TYPE_CHECKING, Callable, Dict, List, Optional, Set, Tuple @@ -22,6 +23,12 @@ CREDIBILITY_WINDOW_BLOCKS, DIRECTION_POOLS, MAX_SCORING_BACKFILL_BLOCKS, + QUALITY_ANCHOR, + QUALITY_EMA_HALF_LIFE_BLOCKS, + QUALITY_FLOOR, + QUALITY_N_MIN, + QUALITY_PER_MINER_CAP, + QUALITY_TRIM_PCT, RECYCLE_UID, SCORING_WINDOW_BLOCKS, SUCCESS_EXPONENT, @@ -41,8 +48,10 @@ class DirectionTrace: pool: float = 0.0 crown_blocks: Dict[str, float] = field(default_factory=dict) cap_weighted_blocks: Dict[str, float] = field(default_factory=dict) + quality_weighted_blocks: Dict[str, float] = field(default_factory=dict) unfilled_blocks: int = 0 best_rate: float = 0.0 + quality_reference: Optional[float] = None def due_for_scoring(current_block: int, last_scored_block: int, initial_scoring_done: bool) -> bool: @@ -203,6 +212,13 @@ def calculate_miner_rewards(self: Validator) -> Tuple[np.ndarray, Set[int]]: if storage_enabled: intervals = [] intervals_by_dir[(from_chain, to_chain)] = intervals + # Per-direction depth reference: trimmed, volume/recency-weighted average + # of recent completed-swap clearing rates. None until enough swaps accrue + # (bootstrap), in which case the quality factor is a 1.0 no-op. Same 30-day + # lookback the table is pruned to; the EMA half-life down-weights old swaps. + clearing_obs = self.state_store.get_clearing_rates_by_direction_since(credibility_since, from_chain, to_chain) + quality_reference = compute_quality_reference(clearing_obs, window_end) + trace.quality_reference = quality_reference crown_blocks = replay_crown_time_window( store=self.state_store, event_watcher=self.event_watcher, @@ -215,6 +231,7 @@ def calculate_miner_rewards(self: Validator) -> Tuple[np.ndarray, Set[int]]: intervals_out=intervals, min_swap_rao=min_swap_amount, max_swap_rao=max_swap_amount, + quality_reference=quality_reference, ) total_crown_dir = sum(crown_blocks.values()) volumes_dir = self.state_store.get_volume_by_direction_since(window_start, from_chain, to_chain) @@ -245,8 +262,15 @@ def calculate_miner_rewards(self: Validator) -> Tuple[np.ndarray, Set[int]]: # already earned (#409). cap_blocks = trace.cap_weighted_blocks.get(hotkey, 0.0) cap = (cap_blocks / blocks) if blocks > 0 else 0.0 + # Depth quality, integrated per-block like capacity: the time-weighted + # average of the floored quality factor over the miner's crown + # intervals (1.0 during bootstrap). A post-window rate change can't + # retroactively rescale credit already earned. + q_blocks = trace.quality_weighted_blocks.get(hotkey, 0.0) + quality = (q_blocks / blocks) if blocks > 0 else 0.0 wt = weighting_traces.setdefault(hotkey, WeightingTrace()) wt.record_capacity(factor=cap) + wt.record_quality(factor=quality) wt.record_credibility( closed_swaps=sum(success_stats.get(hotkey, (0, 0))), ramp_target=CREDIBILITY_RAMP_OBSERVATIONS, @@ -260,7 +284,7 @@ def calculate_miner_rewards(self: Validator) -> Tuple[np.ndarray, Set[int]]: pool * crown_share_dir * (success_rates[hotkey] ** SUCCESS_EXPONENT) * credibility_ramps[hotkey] * cap ) unweighted_rewards[uid] += base - rewards[uid] += base * vol_factor + rewards[uid] += base * vol_factor * quality if vol_factor < 1.0: bt.logging.debug( f'V1 scoring [{from_chain}→{to_chain}] {hotkey[:8]}: ' @@ -346,6 +370,93 @@ def volume_factor( return (1.0 - alpha) + alpha * participation +def compute_quality_reference( + observations: List[Tuple[float, int, int, str]], + now_block: int, + *, + half_life_blocks: int = QUALITY_EMA_HALF_LIFE_BLOCKS, + trim_pct: float = QUALITY_TRIM_PCT, + per_miner_cap: float = QUALITY_PER_MINER_CAP, + n_min: int = QUALITY_N_MIN, +) -> Optional[float]: + """Per-direction "market" reference from completed-swap clearing rates. + + ``observations`` are ``(rate, tao_amount, resolved_block, miner_hotkey)``. + Trimmed (top/bottom ``trim_pct`` by rate), volume- and recency-weighted + (weight halves every ``half_life_blocks``), with each miner's summed weight + capped at ``per_miner_cap`` of the total. Returns ``None`` when fewer than + ``n_min`` observations survive the trim — caller treats that as "depth off" + (factor 1.0), matching the self-bootstrapping rollout. + + DETERMINISM: this feeds emissions → consensus, so every validator must get a + byte-identical result. We sort by a stable total-order key and sum with + ``math.fsum`` over that fixed order — never reduce over an unordered set. + ``now_block`` must be an on-chain block (window_end), never wall-clock. + """ + rows = [o for o in observations if o[0] > 0] + # Stable TOTAL order so trim drops identical rows on every validator: rate, + # then block, miner, volume. Two rows equal on all four are interchangeable. + rows.sort(key=lambda o: (o[0], o[2], o[3], o[1])) + n = len(rows) + drop = int(math.floor(n * trim_pct)) + trimmed = rows[drop : n - drop] if drop > 0 else rows + if len(trimmed) < n_min: + return None + + weights: List[float] = [] + by_miner: Dict[str, float] = {} + for rate, vol, block, miner in trimmed: + recency = 2.0 ** (-(now_block - block) / half_life_blocks) + w = max(0.0, float(vol)) * recency + weights.append(w) + by_miner[miner] = by_miner.get(miner, 0.0) + w + + total_w = math.fsum(weights) + if total_w <= 0: + return None + + # Per-miner cap: scale down any miner whose summed weight exceeds the cap so + # no single operator (or sybil cluster sharing a hotkey) defines the market. + cap_w = per_miner_cap * total_w + scale = {m: (cap_w / w if w > cap_w else 1.0) for m, w in by_miner.items()} + + capped = [w * scale[trimmed[i][3]] for i, w in enumerate(weights)] + denom = math.fsum(capped) + if denom <= 0: + return None + numer = math.fsum(capped[i] * trimmed[i][0] for i in range(len(trimmed))) + return numer / denom + + +def direction_aware_improvement(rate: float, reference: float, lower_rate_wins: bool) -> float: + """Fractional improvement of ``rate`` past the reference, signed so deeper + is positive in either direction. btc→tao: higher TAO/BTC is the better deal; + tao→btc (``lower_rate_wins``): lower TAO/BTC is. Returns 0 for a non-positive + reference.""" + if reference <= 0: + return 0.0 + return (reference - rate) / reference if lower_rate_wins else (rate - reference) / reference + + +def quality_factor( + rate: float, + reference: Optional[float], + lower_rate_wins: bool, + *, + anchor: float = QUALITY_ANCHOR, + floor: float = QUALITY_FLOOR, +) -> float: + """Depth multiplier in ``[floor, 1.0]``. At/below market → floor; at or past + ``anchor`` improvement → 1.0; linear between. ``reference is None`` (bootstrap) + → 1.0, so depth is a no-op until the reference exists. Mirrors + ``volume_factor``'s floored-ramp shape.""" + if reference is None or anchor <= 0: + return 1.0 + improvement = direction_aware_improvement(rate, reference, lower_rate_wins) + depth_ramp = min(1.0, max(0.0, improvement / anchor)) + return floor + (1.0 - floor) * depth_ramp + + def record_volume_traces( *, weighting_traces: Dict[str, WeightingTrace], @@ -524,6 +635,7 @@ def replay_crown_time_window( intervals_out: Optional[List[Tuple[int, int, List[str], float]]] = None, min_swap_rao: int = 0, max_swap_rao: int = 0, + quality_reference: Optional[float] = None, ) -> Dict[str, float]: """Walk the merged event stream, return ``{hotkey: crown_blocks_float}``. Ties at the same rate split credit evenly. A miner qualifies for crown @@ -560,6 +672,7 @@ def executable_check(rate: float) -> bool: crown_blocks: Dict[str, float] = {} cap_weighted_blocks: Dict[str, float] = {} + quality_weighted_blocks: Dict[str, float] = {} prev_block = window_start def effective_rates() -> Dict[str, float]: @@ -613,12 +726,16 @@ def credit_interval(interval_start: int, interval_end: int) -> None: if intervals_out is not None: intervals_out.append((interval_start, interval_end, list(holders), winner_rate)) split = duration / len(holders) + # All holders are tied at winner_rate, so the depth factor is shared. + # None reference (bootstrap) → 1.0, leaving credit unchanged. + qf = quality_factor(winner_rate, quality_reference, lower_rate_wins) for hk in holders: crown_blocks[hk] = crown_blocks.get(hk, 0.0) + split # Unknown collateral (no event recorded) → capacity 1.0, matching # can_fund's fail-open. Only a known value scales capacity down. cap = capacity_factor(collaterals[hk], max_swap_rao) if hk in collaterals else 1.0 cap_weighted_blocks[hk] = cap_weighted_blocks.get(hk, 0.0) + split * cap + quality_weighted_blocks[hk] = quality_weighted_blocks.get(hk, 0.0) + split * qf def apply_event(event: ReplayEvent) -> None: if event.kind is EventKind.RATE: @@ -651,6 +768,7 @@ def apply_event(event: ReplayEvent) -> None: if trace is not None: trace.crown_blocks = dict(crown_blocks) trace.cap_weighted_blocks = dict(cap_weighted_blocks) + trace.quality_weighted_blocks = dict(quality_weighted_blocks) return crown_blocks diff --git a/allways/validator/scoring_trace.py b/allways/validator/scoring_trace.py index fa5c36d..a3262d7 100644 --- a/allways/validator/scoring_trace.py +++ b/allways/validator/scoring_trace.py @@ -44,10 +44,14 @@ class WeightingTrace: volume_factor: float = 1.0 closed_swaps: int = 0 credibility_ramp: float = 0.0 + quality_factor: float = 1.0 def record_capacity(self, factor: float) -> None: self.capacity_factor = factor + def record_quality(self, factor: float) -> None: + self.quality_factor = factor + def record_volume(self, vol_rao: int, total_volume_rao: int, crown_share: float, factor: float) -> None: self.volume_rao = vol_rao self.crown_share = crown_share @@ -111,6 +115,7 @@ def log_scoring_trace( f' cap={wt.capacity_factor:.2f}' f' vol={wt.volume_rao / TAO_TO_RAO:g}t vol_share={wt.volume_share:.2f}' f' crown_share={wt.crown_share:.2f} vol_f={wt.volume_factor:.2f}' + f' quality_f={wt.quality_factor:.2f}' ) lines.append( f' uid={uid} hotkey={hk[:8]}.. crown_blk={crown_blk:.0f} sr={sr:.3f}{extras} reward={crown_reward:.3f}' diff --git a/allways/validator/state_store.py b/allways/validator/state_store.py index 970b081..b4f8076 100644 --- a/allways/validator/state_store.py +++ b/allways/validator/state_store.py @@ -498,16 +498,18 @@ def insert_swap_outcome( tao_amount: int = 0, from_chain: str = '', to_chain: str = '', + clearing_rate: float = 0.0, ) -> None: # Direction is normalized to lowercase on write so the per-direction # volume query is robust to upstream case drift. SQLite text # comparisons are case-sensitive and DIRECTION_POOLS keys are - # lowercase. + # lowercase. ``clearing_rate`` (canonical TAO/BTC) feeds the depth + # reference; 0.0 means "no usable observation" (timed-out or legacy). self._execute( """ INSERT OR REPLACE INTO swap_outcomes - (swap_id, miner_hotkey, completed, resolved_block, tao_amount, from_chain, to_chain) - VALUES (?, ?, ?, ?, ?, ?, ?) + (swap_id, miner_hotkey, completed, resolved_block, tao_amount, from_chain, to_chain, clearing_rate) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, ( swap_id, @@ -517,6 +519,7 @@ def insert_swap_outcome( int(tao_amount or 0), (from_chain or '').lower(), (to_chain or '').lower(), + float(clearing_rate or 0.0), ), ) @@ -576,6 +579,32 @@ def get_volume_by_direction_since(self, since_block: int, from_chain: str, to_ch ) return {r['miner_hotkey']: int(r['total'] or 0) for r in rows} + def get_clearing_rates_by_direction_since( + self, since_block: int, from_chain: str, to_chain: str + ) -> List[Tuple[float, int, int, str]]: + """Completed-swap clearing rates for one direction, for the depth + reference. Each row is ``(clearing_rate, tao_amount, resolved_block, + miner_hotkey)``. Rows with ``clearing_rate = 0`` (timed-out swaps, + pre-migration legacy) are excluded — they carry no usable rate, same + as the volume query excludes ``tao_amount = 0``. Direction is + lowercased to match ``insert_swap_outcome``.""" + rows = self._fetchall( + """ + SELECT clearing_rate, tao_amount, resolved_block, miner_hotkey + FROM swap_outcomes + WHERE resolved_block >= ? + AND completed = 1 + AND clearing_rate > 0 + AND from_chain = ? + AND to_chain = ? + """, + (since_block, (from_chain or '').lower(), (to_chain or '').lower()), + ) + return [ + (float(r['clearing_rate']), int(r['tao_amount'] or 0), int(r['resolved_block']), r['miner_hotkey']) + for r in rows + ] + def prune_swap_outcomes_older_than(self, cutoff_block: int) -> None: if cutoff_block <= 0: return @@ -877,7 +906,8 @@ def init_db(self) -> None: resolved_block INTEGER NOT NULL, tao_amount INTEGER NOT NULL DEFAULT 0, from_chain TEXT NOT NULL DEFAULT '', - to_chain TEXT NOT NULL DEFAULT '' + to_chain TEXT NOT NULL DEFAULT '', + clearing_rate REAL NOT NULL DEFAULT 0.0 ); CREATE INDEX IF NOT EXISTS idx_swap_outcomes_hotkey ON swap_outcomes(miner_hotkey); @@ -976,6 +1006,7 @@ def init_db(self) -> None: ('swap_outcomes', 'tao_amount', 'INTEGER NOT NULL DEFAULT 0'), ('swap_outcomes', 'from_chain', "TEXT NOT NULL DEFAULT ''"), ('swap_outcomes', 'to_chain', "TEXT NOT NULL DEFAULT ''"), + ('swap_outcomes', 'clearing_rate', 'REAL NOT NULL DEFAULT 0.0'), ): try: conn.execute(f'ALTER TABLE {table} ADD COLUMN {column} {ddl}') diff --git a/tests/test_scoring_v1.py b/tests/test_scoring_v1.py index 038db3b..ff7c35b 100644 --- a/tests/test_scoring_v1.py +++ b/tests/test_scoring_v1.py @@ -2805,3 +2805,282 @@ def test_fresh_seed_scores_one_trailing_window(self): seed = max(0, block - SCORING_WINDOW_BLOCKS) start, end = scoring_window_bounds(current_block=block, last_scored_block=seed) assert (start, end) == (block - SCORING_WINDOW_BLOCKS, block) + + +class TestQualityReferenceHelper: + """Unit tests for the depth reference + quality-factor pure functions.""" + + def test_empty_observations_bootstrap_none(self): + from allways.validator.scoring import compute_quality_reference + + assert compute_quality_reference([], now_block=1_000) is None + + def test_below_n_min_bootstrap_none(self): + from allways.validator.scoring import compute_quality_reference + + obs = [(10.0, 1, 0, 'a'), (11.0, 1, 0, 'b')] + assert compute_quality_reference(obs, now_block=0, n_min=5) is None + + def test_volume_weighted_mean(self): + from allways.validator.scoring import compute_quality_reference + + obs = [(10.0, 1, 0, 'a'), (20.0, 99, 0, 'b')] + ref = compute_quality_reference(obs, now_block=0, n_min=2, trim_pct=0.0, per_miner_cap=1.0) + # (1·10 + 99·20) / 100 = 19.9 — big swap dominates. + assert np.isclose(ref, 19.9) + + def test_recency_decay_favors_recent(self): + from allways.validator.scoring import compute_quality_reference + + obs = [(10.0, 1, 0, 'a'), (20.0, 1, 1_000, 'b')] + ref = compute_quality_reference( + obs, now_block=1_000, n_min=2, trim_pct=0.0, per_miner_cap=1.0, half_life_blocks=1_000 + ) + # old weight 2^-1 = 0.5, recent 2^0 = 1.0 → (5 + 20) / 1.5. + assert np.isclose(ref, 25.0 / 1.5) + + def test_trim_drops_outlier(self): + from allways.validator.scoring import compute_quality_reference + + obs = [(10.0, 1, 0, f'm{i}') for i in range(10)] + [(1e6, 1, 0, 'outlier')] + ref = compute_quality_reference(obs, now_block=0, n_min=5, trim_pct=0.10, per_miner_cap=1.0) + # 11 rows, drop 1 each tail → the 1e6 outlier is trimmed away. + assert np.isclose(ref, 10.0) + + def test_per_miner_cap_limits_flood(self): + from allways.validator.scoring import compute_quality_reference + + obs = [(100.0, 1, 0, 'flood') for _ in range(20)] + [(10.0, 1, 0, f'h{i}') for i in range(5)] + ref = compute_quality_reference(obs, now_block=0, n_min=5, trim_pct=0.0, per_miner_cap=0.30) + uncapped = (20 * 100 + 5 * 10) / 25 # 80.4 — flood dominates + assert ref < uncapped + # flood capped to 30% of weight → 800 / 12.5 = 64.0, pulled toward honest. + assert np.isclose(ref, 64.0) + + def test_shuffle_invariance(self): + """Determinism: the reference must be byte-identical regardless of input + order (guards the stable-sort + math.fsum consensus requirement).""" + from allways.validator.scoring import compute_quality_reference + + obs = [(10.0 + i, (i % 3) + 1, i * 10, f'm{i % 4}') for i in range(15)] + ref1 = compute_quality_reference(obs, now_block=200, n_min=3) + ref2 = compute_quality_reference(list(reversed(obs)), now_block=200, n_min=3) + ref3 = compute_quality_reference(obs[7:] + obs[:7], now_block=200, n_min=3) + assert ref1 == ref2 == ref3 + + def test_factor_none_reference_is_noop(self): + from allways.validator.scoring import quality_factor + + assert quality_factor(123.0, None, False) == 1.0 + + def test_factor_at_market_is_floor(self): + from allways.validator.scoring import quality_factor + + assert quality_factor(100.0, 100.0, False) == 0.5 + assert quality_factor(100.0, 100.0, True) == 0.5 + + def test_factor_anchor_saturates_both_directions(self): + from allways.validator.scoring import quality_factor + + # btc→tao: 5% higher = full bonus. tao→btc: 5% lower = full bonus. + assert np.isclose(quality_factor(105.0, 100.0, False), 1.0) + assert np.isclose(quality_factor(95.0, 100.0, True), 1.0) + + def test_factor_half_anchor_interpolates(self): + from allways.validator.scoring import quality_factor + + # 2.5% improvement = half the anchor → 0.5 + 0.5·0.5 = 0.75. + assert np.isclose(quality_factor(102.5, 100.0, False), 0.75) + + def test_factor_below_market_floored(self): + from allways.validator.scoring import quality_factor + + assert quality_factor(98.0, 100.0, False) == 0.5 # worse than market, higher-better + assert quality_factor(105.0, 100.0, True) == 0.5 # worse than market, lower-better + + def test_factor_past_anchor_capped(self): + from allways.validator.scoring import quality_factor + + assert quality_factor(200.0, 100.0, False) == 1.0 + + +class TestQualityWeighting: + """End-to-end depth/quality weighting via calculate_miner_rewards.""" + + def seed_crown(self, v, hotkey, rate, from_chain='tao', to_chain='btc'): + conn = v.state_store.require_connection() + conn.execute( + 'INSERT INTO rate_events (hotkey, from_chain, to_chain, rate, block) VALUES (?, ?, ?, ?, ?)', + (hotkey, from_chain, to_chain, rate, 0), + ) + conn.commit() + + def seed_clearing( + self, v, hotkey, rate, n=25, from_chain='tao', to_chain='btc', tao_amount=100_000_000, swap_id_base=1_000 + ): + for i in range(n): + v.state_store.insert_swap_outcome( + swap_id=swap_id_base + i, + miner_hotkey=hotkey, + completed=True, + resolved_block=9_900, # inside the default [9_700, 10_000] window + tao_amount=tao_amount, + from_chain=from_chain, + to_chain=to_chain, + clearing_rate=rate, + ) + + def test_deep_rate_earns_full_pool(self, tmp_path: Path): + hotkeys = pad_hotkeys_to_cover_recycle(['hk_a']) + v = make_validator(tmp_path, hotkeys) + # tao→btc: lower is deeper. Reference 0.00020, holder posts 6% lower. + self.seed_clearing(v, 'hk_a', rate=0.00020) + self.seed_crown(v, 'hk_a', rate=0.00020 * 0.94) + rewards, _ = calculate_miner_rewards(v) + np.testing.assert_allclose(rewards[0], POOL_TAO_BTC, atol=1e-6) + v.state_store.close() + + def test_at_market_rate_earns_floor(self, tmp_path: Path): + hotkeys = pad_hotkeys_to_cover_recycle(['hk_a']) + v = make_validator(tmp_path, hotkeys) + self.seed_clearing(v, 'hk_a', rate=0.00020) + self.seed_crown(v, 'hk_a', rate=0.00020) # at reference → quality floor + rewards, _ = calculate_miner_rewards(v) + np.testing.assert_allclose(rewards[0], POOL_TAO_BTC * 0.5, atol=1e-6) + # Unearned half + the empty btc→tao pool recycle; total still 1.0. + np.testing.assert_allclose(rewards.sum(), 1.0, atol=1e-6) + v.state_store.close() + + def test_bootstrap_below_n_min_behaves_like_today(self, tmp_path: Path): + hotkeys = pad_hotkeys_to_cover_recycle(['hk_a']) + v = make_validator(tmp_path, hotkeys) + # Only 5 observations (< QUALITY_N_MIN) → reference disabled → factor 1.0. + self.seed_clearing(v, 'hk_a', rate=0.00020, n=5) + self.seed_crown(v, 'hk_a', rate=0.00020) # at market, but depth is off + rewards, _ = calculate_miner_rewards(v) + np.testing.assert_allclose(rewards[0], POOL_TAO_BTC, atol=1e-6) + v.state_store.close() + + def test_btc_tao_direction_deep_earns_full(self, tmp_path: Path): + hotkeys = pad_hotkeys_to_cover_recycle(['hk_a']) + v = make_validator(tmp_path, hotkeys) + # btc→tao: higher is deeper. Reference 280, holder posts 6% higher. + self.seed_clearing(v, 'hk_a', rate=280.0, from_chain='btc', to_chain='tao') + self.seed_crown(v, 'hk_a', rate=280.0 * 1.06, from_chain='btc', to_chain='tao') + rewards, _ = calculate_miner_rewards(v) + np.testing.assert_allclose(rewards[0], POOL_BTC_TAO, atol=1e-6) + v.state_store.close() + + def test_idle_and_shallow_stacks_to_quarter(self, tmp_path: Path): + hotkeys = pad_hotkeys_to_cover_recycle(['hk_a', 'hk_b']) + v = make_validator(tmp_path, hotkeys) + # hk_a holds crown at market (quality 0.5); hk_b serves all volume so + # hk_a's vol_factor is the idle floor (0.5). Independent multiply → 0.25. + self.seed_crown(v, 'hk_a', rate=0.00020) + self.seed_clearing(v, 'hk_b', rate=0.00020, swap_id_base=2_000) + rewards, _ = calculate_miner_rewards(v) + np.testing.assert_allclose(rewards[0], POOL_TAO_BTC * 0.25, atol=1e-6) + v.state_store.close() + + +class TestClearingRateStorage: + """clearing_rate column + per-direction query for the depth reference.""" + + def test_round_trip(self, tmp_path: Path): + store = ValidatorStateStore(db_path=tmp_path / 'state.db') + store.insert_swap_outcome( + swap_id=1, + miner_hotkey='hk_a', + completed=True, + resolved_block=100, + tao_amount=5, + from_chain='tao', + to_chain='btc', + clearing_rate=0.00021, + ) + rows = store.get_clearing_rates_by_direction_since(0, 'tao', 'btc') + assert len(rows) == 1 + rate, tao, block, hk = rows[0] + assert np.isclose(rate, 0.00021) + assert (tao, block, hk) == (5, 100, 'hk_a') + store.close() + + def test_excludes_zero_rate_and_timed_out(self, tmp_path: Path): + store = ValidatorStateStore(db_path=tmp_path / 'state.db') + # completed but no usable rate (legacy / unresolved) + store.insert_swap_outcome( + swap_id=1, + miner_hotkey='hk_a', + completed=True, + resolved_block=100, + tao_amount=5, + from_chain='tao', + to_chain='btc', + clearing_rate=0.0, + ) + # timed out (never excluded by completed=1) + store.insert_swap_outcome( + swap_id=2, + miner_hotkey='hk_a', + completed=False, + resolved_block=100, + tao_amount=5, + from_chain='tao', + to_chain='btc', + clearing_rate=0.0003, + ) + assert store.get_clearing_rates_by_direction_since(0, 'tao', 'btc') == [] + store.close() + + def test_direction_filter_lowercased(self, tmp_path: Path): + store = ValidatorStateStore(db_path=tmp_path / 'state.db') + store.insert_swap_outcome( + swap_id=1, + miner_hotkey='hk_a', + completed=True, + resolved_block=100, + tao_amount=5, + from_chain='tao', + to_chain='btc', + clearing_rate=0.0002, + ) + store.insert_swap_outcome( + swap_id=2, + miner_hotkey='hk_a', + completed=True, + resolved_block=100, + tao_amount=5, + from_chain='btc', + to_chain='tao', + clearing_rate=300.0, + ) + # Query with uppercase → normalized to lowercase, matches the tao→btc row only. + rows = store.get_clearing_rates_by_direction_since(0, 'TAO', 'BTC') + assert len(rows) == 1 and np.isclose(rows[0][0], 0.0002) + store.close() + + +class TestEventWatcherPassesClearingRate: + """SwapCompleted resolves the swap's clearing rate into the outcome row.""" + + def test_swap_completed_persists_clearing_rate(self, tmp_path: Path): + store = ValidatorStateStore(db_path=tmp_path / 'state.db') + watcher = make_watcher(store, active={'hk_a'}) + swap = SimpleNamespace(rate='0.00022', from_chain='tao', to_chain='btc') + watcher.swap_tracker = SimpleNamespace(active={7: swap}, resolve=MagicMock()) + watcher.apply_event(100, 'SwapInitiated', {'swap_id': 7, 'miner': 'hk_a'}) + watcher.apply_event(200, 'SwapCompleted', {'swap_id': 7, 'miner': 'hk_a', 'tao_amount': 5}) + rows = store.get_clearing_rates_by_direction_since(0, 'tao', 'btc') + assert len(rows) == 1 and np.isclose(rows[0][0], 0.00022) + store.close() + + def test_unparseable_rate_falls_back_to_zero(self, tmp_path: Path): + store = ValidatorStateStore(db_path=tmp_path / 'state.db') + watcher = make_watcher(store, active={'hk_a'}) + swap = SimpleNamespace(rate='', from_chain='tao', to_chain='btc') + watcher.swap_tracker = SimpleNamespace(active={7: swap}, resolve=MagicMock()) + watcher.apply_event(100, 'SwapInitiated', {'swap_id': 7, 'miner': 'hk_a'}) + watcher.apply_event(200, 'SwapCompleted', {'swap_id': 7, 'miner': 'hk_a', 'tao_amount': 5}) + # Row written with clearing_rate 0 → excluded from the reference. + assert store.get_clearing_rates_by_direction_since(0, 'tao', 'btc') == [] + store.close()