Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions neurons/miner.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from tensorusd.auction.event_listener import AuctionEventListener
from tensorusd.miner.bidding import BiddingStrategy
from tensorusd.miner.auction_manager import MinerAuctionManager
from tensorusd.utils.subnet import get_dynamic_info


class Miner(BaseMinerNeuron):
Expand Down Expand Up @@ -139,6 +140,29 @@ def _handle_auction_event(self, event: AuctionUnionEvent):

def run(self):
"""Override run to start event listener alongside axon."""
if self.config.miner.refund_sync:
dynamic_info = get_dynamic_info(self.subtensor, self.config.netuid)
start_block = dynamic_info["last_step_block"]
end_block = self.auction_contract.get_current_block()

bt.logging.info(
"Catching up on historical finalized auctions for refunds "
f"from block {start_block} to {end_block}..."
)
asyncio.run(
self.auction_manager.sync_historical_finalized_refunds(
event_listener=self.event_listener,
start_block=start_block,
end_block=end_block,
)
)
else:
bt.logging.info(
"Historical refund catch-up disabled "
"(--miner.refund_sync not set); "
"starting in live-only mode."
)

bt.logging.info("Catching up on active auctions...")
asyncio.run(self.auction_manager.sync_active_auctions())

Expand Down
75 changes: 69 additions & 6 deletions tensorusd/auction/event_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"""

import threading
from typing import Callable, Optional
from typing import Callable, Optional, Set

from scalecodec.base import ScaleBytes
from substrateinterface import SubstrateInterface
Expand Down Expand Up @@ -52,13 +52,17 @@ def __init__(

self.contract_metadata: Optional[ContractMetadata] = None

def _ensure_metadata_loaded(self):
"""Load contract metadata once for decoding events."""
if self.contract_metadata is None:
self.contract_metadata = ContractMetadata.create_from_file(
metadata_file=self.metadata_path,
substrate=self.substrate,
)

def run(self):
"""Main loop - subscribe to events (runs in background thread)."""
# Load contract metadata
self.contract_metadata = ContractMetadata.create_from_file(
metadata_file=self.metadata_path,
substrate=self.substrate,
)
self._ensure_metadata_loaded()

bt.logging.info(
f"Event listener started for Auction contract {self.contract_address}"
Expand All @@ -69,6 +73,65 @@ def run(self):
except Exception as e:
bt.logging.error(f"Event listener error: {e}")

def sync_historical_events(
self,
start_block: int,
end_block: int,
event_types: Optional[Set[AuctionEventType]] = None,
progress_log_interval: int = 200,
) -> int:
"""
Process historical auction events in a bounded block range.

Args:
start_block: Inclusive start block.
end_block: Inclusive end block.
event_types: Optional filter for specific auction event types.
progress_log_interval: How often to log progress while scanning.

Returns:
Number of decoded events passed to callback.
"""
if end_block < start_block:
return 0

self._ensure_metadata_loaded()
decoded_count = 0

for block_number in range(start_block, end_block + 1):
try:
block_hash = self.substrate.get_block_hash(block_number)
events = self.substrate.get_events(block_hash)
except Exception as e:
bt.logging.warning(
f"Skipping block {block_number} during historical sync: {e}"
)
continue

for event in events:
if not self._is_contract_event(event):
continue

decoded_event = self._decode_contract_event(event, block_number)
if decoded_event is None:
continue
if event_types and decoded_event.event_type not in event_types:
continue

decoded_count += 1
try:
self.callback(decoded_event)
except Exception as e:
bt.logging.error(f"Error in historical event callback: {e}")

if progress_log_interval > 0 and block_number % progress_log_interval == 0:
bt.logging.info(
f"Historical auction scan progress: block={block_number}, "
f"decoded_events={decoded_count}"
)

return decoded_count

def _subscription_handler(self, obj, update_nr, subscription_id):
"""Handle new blocks."""
if self.should_exit:
Expand Down
121 changes: 98 additions & 23 deletions tensorusd/miner/auction_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,18 @@
Handles auction events and coordinates bidding.
"""

from typing import Optional
from typing import Optional, Set

import bittensor as bt

from tensorusd.auction.types import (
AuctionCreatedEvent,
AuctionFinalizedEvent,
AuctionEventType,
AuctionUnionEvent,
BidPlacedEvent,
)
from tensorusd.auction.event_listener import AuctionEventListener
from tensorusd.auction.contract import (
TensorUSDAuctionContract,
TensorUSDVaultContract,
Expand Down Expand Up @@ -184,37 +187,109 @@ async def handle_auction_finalized(self, event: AuctionFinalizedEvent):
Args:
event: AuctionFinalized event
"""
self._handle_finalized_result(event, source="live")

def _handle_finalized_result(self, event: AuctionFinalizedEvent, source: str):
"""Handle finalized outcome and attempt refund claim for lost auctions."""
auction_id = event.auction_id
if auction_id is None:
bt.logging.warning(
f"[{source}] Skipping finalized event with missing auction_id"
)
return

my_address = self.wallet.coldkey.ss58_address

if event.winner == my_address:
bt.logging.success(
f"Won auction {auction_id}! winning_bid={event.highest_bid}"
f"[{source}] Won auction {auction_id}! winning_bid={event.highest_bid}"
)
return

bt.logging.info(
f"[{source}] Auction {auction_id} finalized. "
f"winner={event.winner}, "
f"winning_bid={event.highest_bid}"
)
miner_bid = self.auction_contract.get_auction_bid(auction_id, my_address)
if miner_bid is None:
bt.logging.info(
f"[{source}] No refundable bid found for auction {auction_id} and bidder {my_address}"
)
return

tx_hash = self.auction_contract.withdraw_refund(auction_id, miner_bid.id)
if tx_hash:
bt.logging.success(
f"[{source}] Refund withdrawn for auction {auction_id}: "
f"amount={miner_bid.amount}, tx={tx_hash}"
)
else:
bt.logging.warning(
f"[{source}] Refund withdrawal failed or already withdrawn for auction {auction_id}"
)

async def sync_historical_finalized_refunds(
self,
event_listener: AuctionEventListener,
start_block: int,
end_block: int,
):
"""
Catch up historical finalized auctions and withdraw pending refunds.

Scans only the provided bounded block range and processes auction
finalized events from the auction contract.
"""
if end_block < start_block:
bt.logging.info(
f"Auction {auction_id} finalized. "
f"winner={event.winner}, "
f"winning_bid={event.highest_bid}"
f"Skipping historical refund sync: invalid range {start_block}-{end_block}"
)
miner_bid = self.auction_contract.get_auction_bid(auction_id, my_address)
if miner_bid is None:
bt.logging.warning(
f"Could not fetch miner bid for auction {auction_id}, skipping"
)
else:
tx_hash = await self.auction_contract.withdraw_refund(
auction_id, miner_bid.id
)
if tx_hash:
bt.logging.success(
f"Refund withdrawn for auction {auction_id}: "
f"amount={miner_bid.amount}, tx={tx_hash}"
)
else:
bt.logging.error(
f"Failed to withdraw refund for auction {auction_id}"
)
return

bt.logging.info(
f"Syncing historical finalized auctions for refunds: "
f"start_block={start_block}, end_block={end_block}"
)

seen_auction_ids: Set[int] = set()
finalized_events_seen = 0
lost_auctions_checked = 0

def _historical_callback(event: AuctionUnionEvent):
nonlocal finalized_events_seen, lost_auctions_checked
if event.event_type != AuctionEventType.FINALIZED:
return
if event.auction_id is None:
return

finalized_events_seen += 1
if event.auction_id in seen_auction_ids:
return
seen_auction_ids.add(event.auction_id)

if event.winner != self.wallet.coldkey.ss58_address:
lost_auctions_checked += 1
self._handle_finalized_result(event, source="historical")

original_callback = event_listener.callback
try:
event_listener.callback = _historical_callback
processed = event_listener.sync_historical_events(
start_block=start_block,
end_block=end_block,
event_types={AuctionEventType.FINALIZED},
)
finally:
event_listener.callback = original_callback

bt.logging.info(
f"Historical refund sync complete: "
f"finalized_events={finalized_events_seen}, "
f"unique_auctions={len(seen_auction_ids)}, "
f"lost_checked={lost_auctions_checked}, "
f"decoded_processed={processed}"
)

async def _submit_bid(self, auction_id: int, bid_amount: int) -> Optional[str]:
"""
Expand Down
10 changes: 10 additions & 0 deletions tensorusd/utils/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,16 @@ def add_args(cls, parser):
def add_miner_args(cls, parser):
"""Add miner specific arguments to the parser."""

parser.add_argument(
"--miner.refund_sync",
action="store_true",
help=(
"Enable bounded historical scan for finalized auctions at startup "
"to auto-claim pending refunds. Disabled by default (live-only mode)."
),
default=False,
)

parser.add_argument(
"--neuron.name",
type=str,
Expand Down