From 6aaf2d01ccb6d0e8879f2d6c50cc67478e3116bd Mon Sep 17 00:00:00 2001 From: coderNp101 Date: Fri, 20 Mar 2026 18:22:50 +0545 Subject: [PATCH] Add historical refund sync changes --- neurons/miner.py | 24 ++++++ tensorusd/auction/event_listener.py | 75 +++++++++++++++-- tensorusd/miner/auction_manager.py | 121 ++++++++++++++++++++++------ tensorusd/utils/config.py | 10 +++ 4 files changed, 201 insertions(+), 29 deletions(-) diff --git a/neurons/miner.py b/neurons/miner.py index 6802236..fdd1c6f 100644 --- a/neurons/miner.py +++ b/neurons/miner.py @@ -40,6 +40,7 @@ from tensorusd.auction.event_listener import AuctionEventListener from tensorusd.miner.bidding import BiddingStrategy from tensorusd.miner.auction_manager import MinerAuctionManager +from tensorusd.utils.subnet import get_dynamic_info class Miner(BaseMinerNeuron): @@ -139,6 +140,29 @@ def _handle_auction_event(self, event: AuctionUnionEvent): def run(self): """Override run to start event listener alongside axon.""" + if self.config.miner.refund_sync: + dynamic_info = get_dynamic_info(self.subtensor, self.config.netuid) + start_block = dynamic_info["last_step_block"] + end_block = self.auction_contract.get_current_block() + + bt.logging.info( + "Catching up on historical finalized auctions for refunds " + f"from block {start_block} to {end_block}..." + ) + asyncio.run( + self.auction_manager.sync_historical_finalized_refunds( + event_listener=self.event_listener, + start_block=start_block, + end_block=end_block, + ) + ) + else: + bt.logging.info( + "Historical refund catch-up disabled " + "(--miner.refund_sync not set); " + "starting in live-only mode." + ) + bt.logging.info("Catching up on active auctions...") asyncio.run(self.auction_manager.sync_active_auctions()) diff --git a/tensorusd/auction/event_listener.py b/tensorusd/auction/event_listener.py index ba8ccc8..b1ed7fa 100644 --- a/tensorusd/auction/event_listener.py +++ b/tensorusd/auction/event_listener.py @@ -5,7 +5,7 @@ """ import threading -from typing import Callable, Optional +from typing import Callable, Optional, Set from scalecodec.base import ScaleBytes from substrateinterface import SubstrateInterface @@ -52,13 +52,17 @@ def __init__( self.contract_metadata: Optional[ContractMetadata] = None + def _ensure_metadata_loaded(self): + """Load contract metadata once for decoding events.""" + if self.contract_metadata is None: + self.contract_metadata = ContractMetadata.create_from_file( + metadata_file=self.metadata_path, + substrate=self.substrate, + ) + def run(self): """Main loop - subscribe to events (runs in background thread).""" - # Load contract metadata - self.contract_metadata = ContractMetadata.create_from_file( - metadata_file=self.metadata_path, - substrate=self.substrate, - ) + self._ensure_metadata_loaded() bt.logging.info( f"Event listener started for Auction contract {self.contract_address}" @@ -69,6 +73,65 @@ def run(self): except Exception as e: bt.logging.error(f"Event listener error: {e}") + def sync_historical_events( + self, + start_block: int, + end_block: int, + event_types: Optional[Set[AuctionEventType]] = None, + progress_log_interval: int = 200, + ) -> int: + """ + Process historical auction events in a bounded block range. + + Args: + start_block: Inclusive start block. + end_block: Inclusive end block. + event_types: Optional filter for specific auction event types. + progress_log_interval: How often to log progress while scanning. + + Returns: + Number of decoded events passed to callback. + """ + if end_block < start_block: + return 0 + + self._ensure_metadata_loaded() + decoded_count = 0 + + for block_number in range(start_block, end_block + 1): + try: + block_hash = self.substrate.get_block_hash(block_number) + events = self.substrate.get_events(block_hash) + except Exception as e: + bt.logging.warning( + f"Skipping block {block_number} during historical sync: {e}" + ) + continue + + for event in events: + if not self._is_contract_event(event): + continue + + decoded_event = self._decode_contract_event(event, block_number) + if decoded_event is None: + continue + if event_types and decoded_event.event_type not in event_types: + continue + + decoded_count += 1 + try: + self.callback(decoded_event) + except Exception as e: + bt.logging.error(f"Error in historical event callback: {e}") + + if progress_log_interval > 0 and block_number % progress_log_interval == 0: + bt.logging.info( + f"Historical auction scan progress: block={block_number}, " + f"decoded_events={decoded_count}" + ) + + return decoded_count + def _subscription_handler(self, obj, update_nr, subscription_id): """Handle new blocks.""" if self.should_exit: diff --git a/tensorusd/miner/auction_manager.py b/tensorusd/miner/auction_manager.py index cc67935..de9ac11 100644 --- a/tensorusd/miner/auction_manager.py +++ b/tensorusd/miner/auction_manager.py @@ -4,15 +4,18 @@ Handles auction events and coordinates bidding. """ -from typing import Optional +from typing import Optional, Set import bittensor as bt from tensorusd.auction.types import ( AuctionCreatedEvent, AuctionFinalizedEvent, + AuctionEventType, + AuctionUnionEvent, BidPlacedEvent, ) +from tensorusd.auction.event_listener import AuctionEventListener from tensorusd.auction.contract import ( TensorUSDAuctionContract, TensorUSDVaultContract, @@ -184,37 +187,109 @@ async def handle_auction_finalized(self, event: AuctionFinalizedEvent): Args: event: AuctionFinalized event """ + self._handle_finalized_result(event, source="live") + + def _handle_finalized_result(self, event: AuctionFinalizedEvent, source: str): + """Handle finalized outcome and attempt refund claim for lost auctions.""" auction_id = event.auction_id + if auction_id is None: + bt.logging.warning( + f"[{source}] Skipping finalized event with missing auction_id" + ) + return + my_address = self.wallet.coldkey.ss58_address if event.winner == my_address: bt.logging.success( - f"Won auction {auction_id}! winning_bid={event.highest_bid}" + f"[{source}] Won auction {auction_id}! winning_bid={event.highest_bid}" + ) + return + + bt.logging.info( + f"[{source}] Auction {auction_id} finalized. " + f"winner={event.winner}, " + f"winning_bid={event.highest_bid}" + ) + miner_bid = self.auction_contract.get_auction_bid(auction_id, my_address) + if miner_bid is None: + bt.logging.info( + f"[{source}] No refundable bid found for auction {auction_id} and bidder {my_address}" + ) + return + + tx_hash = self.auction_contract.withdraw_refund(auction_id, miner_bid.id) + if tx_hash: + bt.logging.success( + f"[{source}] Refund withdrawn for auction {auction_id}: " + f"amount={miner_bid.amount}, tx={tx_hash}" ) else: + bt.logging.warning( + f"[{source}] Refund withdrawal failed or already withdrawn for auction {auction_id}" + ) + + async def sync_historical_finalized_refunds( + self, + event_listener: AuctionEventListener, + start_block: int, + end_block: int, + ): + """ + Catch up historical finalized auctions and withdraw pending refunds. + + Scans only the provided bounded block range and processes auction + finalized events from the auction contract. + """ + if end_block < start_block: bt.logging.info( - f"Auction {auction_id} finalized. " - f"winner={event.winner}, " - f"winning_bid={event.highest_bid}" + f"Skipping historical refund sync: invalid range {start_block}-{end_block}" ) - miner_bid = self.auction_contract.get_auction_bid(auction_id, my_address) - if miner_bid is None: - bt.logging.warning( - f"Could not fetch miner bid for auction {auction_id}, skipping" - ) - else: - tx_hash = await self.auction_contract.withdraw_refund( - auction_id, miner_bid.id - ) - if tx_hash: - bt.logging.success( - f"Refund withdrawn for auction {auction_id}: " - f"amount={miner_bid.amount}, tx={tx_hash}" - ) - else: - bt.logging.error( - f"Failed to withdraw refund for auction {auction_id}" - ) + return + + bt.logging.info( + f"Syncing historical finalized auctions for refunds: " + f"start_block={start_block}, end_block={end_block}" + ) + + seen_auction_ids: Set[int] = set() + finalized_events_seen = 0 + lost_auctions_checked = 0 + + def _historical_callback(event: AuctionUnionEvent): + nonlocal finalized_events_seen, lost_auctions_checked + if event.event_type != AuctionEventType.FINALIZED: + return + if event.auction_id is None: + return + + finalized_events_seen += 1 + if event.auction_id in seen_auction_ids: + return + seen_auction_ids.add(event.auction_id) + + if event.winner != self.wallet.coldkey.ss58_address: + lost_auctions_checked += 1 + self._handle_finalized_result(event, source="historical") + + original_callback = event_listener.callback + try: + event_listener.callback = _historical_callback + processed = event_listener.sync_historical_events( + start_block=start_block, + end_block=end_block, + event_types={AuctionEventType.FINALIZED}, + ) + finally: + event_listener.callback = original_callback + + bt.logging.info( + f"Historical refund sync complete: " + f"finalized_events={finalized_events_seen}, " + f"unique_auctions={len(seen_auction_ids)}, " + f"lost_checked={lost_auctions_checked}, " + f"decoded_processed={processed}" + ) async def _submit_bid(self, auction_id: int, bid_amount: int) -> Optional[str]: """ diff --git a/tensorusd/utils/config.py b/tensorusd/utils/config.py index 14ef4ae..7cee099 100644 --- a/tensorusd/utils/config.py +++ b/tensorusd/utils/config.py @@ -146,6 +146,16 @@ def add_args(cls, parser): def add_miner_args(cls, parser): """Add miner specific arguments to the parser.""" + parser.add_argument( + "--miner.refund_sync", + action="store_true", + help=( + "Enable bounded historical scan for finalized auctions at startup " + "to auto-claim pending refunds. Disabled by default (live-only mode)." + ), + default=False, + ) + parser.add_argument( "--neuron.name", type=str,