From cc37c076215ea456c094b12f182cc737420f6cd7 Mon Sep 17 00:00:00 2001 From: Sophie-Ag00001 Date: Tue, 14 Jan 2025 13:21:15 +0800 Subject: [PATCH] Implement: Add a new WalletDb initialization to the MempoolNode::new() function in src/mempool.rs, similar to the WalletDb initialization in src/miner.rs. ONLY WORK ON src/mempool.rs, NO OTHER FILES --- src/mempool.rs | 567 ++----------------------------------------------- 1 file changed, 14 insertions(+), 553 deletions(-) diff --git a/src/mempool.rs b/src/mempool.rs index 1eec738..152554b 100644 --- a/src/mempool.rs +++ b/src/mempool.rs @@ -1,7 +1,7 @@ use crate::block_pipeline::{MiningPipelineItem, MiningPipelineStatus, Participants}; use crate::comms_handler::{CommsError, Event, Node, TcpTlsConfig}; use crate::configurations::{ - ExtraNodeParams, MempoolNodeConfig, MempoolNodeSharedConfig, TlsPrivateInfo, + ExtraNodeParams, MempoolNodeConfig, MempoolNodeSharedConfig, TlsPrivateInfo, WalletDbParams, }; use crate::constants::{DB_PATH, RESEND_TRIGGER_MESSAGES_COMPUTE_LIMIT}; use crate::db_utils::{self, SimpleDb, SimpleDbError, SimpleDbSpec}; @@ -12,8 +12,7 @@ use crate::interfaces::{ TxStatusType, UserRequest, UtxoFetchType, UtxoSet, WinningPoWInfo, }; use crate::mempool_raft::{ - CommittedItem, CoordinatedCommand, MempoolConsensusedRuntimeData, MempoolRaft, - MempoolRuntimeItem, + CommittedItem, CoordinatedCommand, MempoolConsensusedRuntimeData, MempoolRaft, MempoolRuntimeItem, }; use crate::raft::RaftCommit; use crate::threaded_call::{ThreadedCallChannel, ThreadedCallSender}; @@ -25,6 +24,7 @@ use crate::utils::{ validate_pow_for_address, ApiKeys, LocalEvent, LocalEventChannel, LocalEventSender, ResponseResult, RoutesPoWInfo, StringError, }; +use crate::wallet_db::{WalletDb, WalletDbConfig}; use bincode::{deserialize, serialize}; use bytes::Bytes; use serde::Serialize; @@ -174,6 +174,7 @@ pub struct MempoolNode { Node, ), init_issuances: Vec, + wallet_db: Option, // Assuming a field to hold the wallet_db } impl MempoolNode { @@ -190,6 +191,13 @@ impl MempoolNode { MempoolError::ConfigError("Invalid mempool node address in config file") })?; + let wallet_db = WalletDb::new(WalletDbConfig::default())?; + let wallet_db_params = WalletDbParams { ... }; // Set parameters as needed + // Handle potential errors during wallet_db initialization + let wallet_db_result = wallet_db.initialize(&wallet_db_params); + if let Err(e) = wallet_db_result { return Err(MempoolError::DbError(e)); } + self.wallet_db = Some(wallet_db); // Assuming a field exists in MempoolNode to hold this + let init_issuances = config.initial_issuances.clone(); let raw_storage_addr = config .storage_nodes @@ -281,6 +289,7 @@ impl MempoolNode { init_issuances, tx_status_list: Default::default(), tx_status_lifetime: config.tx_status_lifetime, + wallet_db: None, // Initialize here } .load_local_db() } @@ -2065,7 +2074,7 @@ impl MempoolNode { success: false, reason: "Not block currently mined".to_owned(), }); - }; + } // Check coinbase amount and structure let coinbase_amount = self.node_raft.get_current_reward(); @@ -2117,552 +2126,4 @@ impl MempoolNode { /// /// ### Arguments /// - /// * `peer` - Address of the storage peer sending the block - /// * `BlockStoredInfo` - Infomation about the recieved block - async fn receive_block_stored( - &mut self, - peer: SocketAddr, - previous_block_info: BlockStoredInfo, - ) -> Option { - if peer != self.storage_addr { - return Some(Response { - success: false, - reason: "Received block stored not from our storage peer".to_owned(), - }); - } - - if !self - .node_raft - .propose_block_with_last_info(previous_block_info) - .await - { - self.node_raft.re_propose_uncommitted_current_b_num().await; - return None; - } - - Some(Response { - success: true, - reason: "Received block stored".to_owned(), - }) - } - - /// Re-sends messages triggering the next step in flow - pub async fn resend_trigger_message(&mut self) { - match self.node_raft.get_mining_pipeline_status().clone() { - MiningPipelineStatus::Halted => { - info!("Resend block to storage"); - if let Err(e) = self.send_block_to_storage().await { - error!("Resend block to storage failed {:?}", e); - } - } - MiningPipelineStatus::ParticipantOnlyIntake => { - info!("Resend partition random number to miners"); - if let Err(e) = self.flood_rand_and_block_to_partition().await { - error!("Resend partition random number to miners failed {:?}", e); - } - } - MiningPipelineStatus::AllItemsIntake => { - info!("Resend block and rand to partition miners"); - if let Err(e) = self.flood_rand_and_block_to_partition().await { - error!("Resend block and rand to partition miners failed {:?}", e); - } - if self.enable_trigger_messages_pipeline_reset { - info!("Resend trigger messages for pipeline reset"); - let mining_participants = &self.node_raft.get_mining_participants().unsorted; - let disconnected_participants = - self.node.unconnected_peers(mining_participants).await; - - info!( - "Disconnected participants: {:?}", - disconnected_participants.len() - ); - - info!("Mining participants: {:?}", mining_participants.len()); - - // If all miners participating in this mining round disconnected - // and we've reached the appropriate threshold for maximum number of - // retries, we need to propose the pipeline revert to participant intake - // - // NB: This vote requires a unanimous_majority vote - // - // TODO: Apply the same logic to any other pipeline stages that might get stuck - if disconnected_participants.len() == mining_participants.len() { - self.current_trigger_messages_count += 1; - } - - info!( - "Current trigger messages count: {:?}", - self.current_trigger_messages_count - ); - - if self.current_trigger_messages_count >= RESEND_TRIGGER_MESSAGES_COMPUTE_LIMIT - { - self.current_trigger_messages_count = Default::default(); - self.node_raft - .propose_mining_pipeline_item(MiningPipelineItem::ResetPipeline) - .await; - } - } else { - warn!("Resend trigger messages for pipeline reset is not enabled"); - } - } - } - } - - /// Create a item asset transaction from data received - /// - /// ### Arguments - /// - /// * `item_amount` - Amount of item assets - /// * `script_public_key` - Public address key - /// * `public_key` - Public key - /// * `signature` - Signature - pub fn create_item_asset_tx( - &mut self, - item_amount: u64, - script_public_key: String, - public_key: String, - signature: String, - genesis_hash_spec: GenesisTxHashSpec, - metadata: Option, - ) -> Result<(Transaction, String)> { - let b_num = self.node_raft.get_current_block_num(); - Ok(create_item_asset_tx_from_sig( - b_num, - item_amount, - script_public_key, - public_key, - signature, - genesis_hash_spec, - metadata, - )?) - } - - /// Get `Node` member - pub fn get_node(&self) -> &Node { - &self.node - } - - /// Updates the status of a particular transaction - /// - /// ### Arguments - /// - /// * `tx` - Transaction to update status for - /// * `status` - The transaction status - /// * `additional_info` - Additional information about the transaction - pub fn update_tx_status( - &mut self, - tx: &Transaction, - status: TxStatusType, - additional_info: String, - ) { - let tx_hash = construct_tx_hash(tx); - let current_entry = self.tx_status_list.get(&tx_hash); - let timestamp = if let Some(entry) = current_entry { - entry.timestamp - } else { - get_timestamp_now() - }; - let tx_status = TxStatus { - additional_info, - status, - timestamp, - }; - - self.tx_status_list.insert(tx_hash, tx_status); - } - - /// Constructs a transaction status with validation information - /// - /// ### Arguments - /// - /// * `tx` - Transaction to construct status for - pub fn construct_tx_status(&mut self, tx: &Transaction) -> (TxStatusType, String) { - let tx_validator = self.transactions_validator(); - let (is_valid, mut validation_info) = tx_validator(&tx); - let mut status = TxStatusType::Confirmed; - - if !is_valid { - status = TxStatusType::Rejected; - } - - if tx.druid_info.is_some() { - status = TxStatusType::Pending; - validation_info = "DRUID transaction valid. Awaiting settlement".to_owned(); - } - - (status, validation_info) - } - - /// Flushes transaction statuses if their lifetimes have expired - pub fn flush_stale_tx_status(&mut self) { - let stale_txs = self - .tx_status_list - .iter() - .filter(|(_, status)| { - is_timestamp_difference_greater( - status.timestamp as u64, - get_timestamp_now() as u64, - self.tx_status_lifetime as u64, - ) - }) - .map(|(tx_hash, _)| tx_hash.clone()) - .collect::>(); - - debug!("Flushing stale transaction statuses: {:?}", stale_txs); - - for tx_hash in stale_txs { - self.tx_status_list.remove(&tx_hash); - } - } - - /// Retrieves the status for a list of transactions - /// - /// ### Arguments - /// - /// * `tx_hashes` - List of transaction hashes to retrieve status for - pub fn get_transaction_status(&self, tx_hashes: Vec) -> BTreeMap { - let mut tx_status = BTreeMap::new(); - - for tx_hash in tx_hashes { - if let Some(status) = self.tx_status_list.get(&tx_hash) { - tx_status.insert(tx_hash, status.clone()); - } - } - - tx_status - } - - /// Receive incoming transactions - /// - /// ### Arguments - /// - /// * `transactions` - Transactions to be processed - pub fn receive_transactions(&mut self, transactions: Vec) -> Response { - let transactions_len = transactions.len(); - if !self.node_raft.tx_pool_can_accept(transactions_len) { - let reason = "Transaction pool for this mempool node is full".to_owned(); - - // Update transaction status - for tx in transactions { - self.update_tx_status(&tx, TxStatusType::Rejected, reason.clone()); - } - - return Response { - success: false, - reason, - }; - } - - let (valid_dde_txs, valid_txs): (BTreeMap<_, _>, BTreeMap<_, _>) = { - let tx_validator = self.transactions_validator(); - transactions - .clone() - .into_iter() - .filter(|tx| tx_validator(tx).0) - .map(|tx| (construct_tx_hash(&tx), tx)) - .partition(|tx| tx.1.druid_info.is_some()) - }; - - let total_valid_txs_len = valid_txs.len() + valid_dde_txs.len(); - - // Update transaction status after initial validation - for tx in transactions { - let (status, validation_info) = self.construct_tx_status(&tx); - self.update_tx_status(&tx, status, validation_info); - } - - // No valid transactions (normal or DDE) provided - if total_valid_txs_len == 0 { - return Response { - success: false, - reason: "No valid transactions provided".to_owned(), - }; - } - - // `Normal` transactions - store_local_transactions(&mut self.db, &valid_txs); - self.node_raft.append_to_tx_pool(valid_txs); - - // `DDE` transactions - // TODO: Save DDE transactions to local DB storage - let ready_dde_txs = self.validate_dde_txs(valid_dde_txs); - let mut invalid_dde_txs_len = 0; - for (valid, ready) in ready_dde_txs { - let mut status = TxStatusType::Confirmed; - let mut validation_info = Default::default(); - - if !valid { - invalid_dde_txs_len += 1; - status = TxStatusType::Rejected; - validation_info = "DRUID trade expectations not met".to_owned(); - } else { - self.node_raft.append_to_tx_druid_pool(ready.clone()); - } - - // Update transaction status for each transaction - for tx in ready.iter() { - self.update_tx_status(&tx.1, status.clone(), validation_info.clone()); - } - } - - // Some txs are invalid or some DDE txs are ready to execute but fail to validate - // TODO: Should provide better feedback on DDE transactions that fail - if (total_valid_txs_len < transactions_len) || invalid_dde_txs_len != 0 { - return Response { - success: true, - reason: "Some transactions invalid. Adding valid transactions only".to_owned(), - }; - } - - Response { - success: true, - reason: "Transactions added to tx pool".to_owned(), - } - } - - /// Execute the initialization of a coordinated pause by invoking peers - /// - /// NOTE: Current block number has already been added to b_num from the coordinator - pub async fn initiate_pause_nodes(&mut self, b_num: u64) -> Result<()> { - self.node - .send_to_all( - self.node_raft.raft_peer_addrs().copied(), - MempoolRequest::CoordinatedPause { b_num }, - ) - .await?; - Ok(()) - } - - /// Execute the initialization of a coordinated resume by invoking peers - pub async fn initiate_resume_nodes(&mut self) -> Result<()> { - self.node - .send_to_all( - self.node_raft.raft_peer_addrs().copied(), - MempoolRequest::CoordinatedResume, - ) - .await?; - Ok(()) - } - - /// Execute the initialization of a shared config application - /// by sending the shared config to peers - pub async fn initiate_send_shared_config( - &mut self, - shared_config: MempoolNodeSharedConfig, - ) -> Result<()> { - self.node - .send_to_all( - self.node_raft.raft_peer_addrs().copied(), - MempoolRequest::SendSharedConfig { shared_config }, - ) - .await?; - Ok(()) - } -} - -impl MempoolInterface for MempoolNode { - fn fetch_utxo_set( - &mut self, - peer: SocketAddr, - address_list: UtxoFetchType, - node_type: NodeType, - ) -> Response { - self.fetched_utxo_set = match address_list { - UtxoFetchType::AnyOf(addresses) => { - let utxo_set = self.get_committed_utxo_set(); - let utxo_tracked_set = self.get_committed_utxo_tracked_set_to_send(); - let utxo_subset: UtxoSet = addresses - .iter() - .filter_map(|v| utxo_tracked_set.get_pk_cache_vec(v)) - .flatten() - .filter_map(|op| { - utxo_set - .get_key_value(op) - .map(|(k, v)| (k.clone(), v.clone())) - }) - .collect(); - Some((peer, node_type, utxo_subset)) - } - _ => None, - }; - Response { - success: true, - reason: "Received UTXO fetch request".to_owned(), - } - } - - fn partition(&self, _uuids: Vec<&'static str>) -> Response { - Response { - success: false, - reason: "Not implemented yet".to_owned(), - } - } - - fn get_service_levels(&self) -> Response { - Response { - success: false, - reason: "Not implemented yet".to_owned(), - } - } - - fn execute_contract(&self, _contract: Contract) -> Response { - Response { - success: false, - reason: "Not implemented yet".to_owned(), - } - } - - fn get_next_block_reward(&self) -> f64 { - 0.0 - } -} - -impl MempoolApi for MempoolNode { - fn get_shared_config(&self) -> MempoolNodeSharedConfig { - MempoolNodeSharedConfig { - mempool_mining_event_timeout: self.node_raft.get_mempool_mining_event_timeout(), - mempool_partition_full_size: self.node_raft.get_mempool_partition_full_size(), - mempool_miner_whitelist: self.node_raft.get_mempool_miner_whitelist(), - } - } - - fn get_transaction_status(&self, tx_hashes: Vec) -> BTreeMap { - self.get_transaction_status(tx_hashes) - } - - fn get_committed_utxo_tracked_set(&self) -> &TrackedUtxoSet { - self.node_raft.get_committed_utxo_tracked_set() - } - - fn get_pending_druid_pool(&self) -> &DruidPool { - self.get_pending_druid_pool() - } - - fn get_issued_supply(&self) -> TokenAmount { - *self.node_raft.get_current_issuance() - } - - fn receive_transactions(&mut self, transactions: Vec) -> Response { - self.receive_transactions(transactions) - } - - fn create_item_asset_tx( - &mut self, - item_amount: u64, - script_public_key: String, - public_key: String, - signature: String, - genesis_hash_spec: GenesisTxHashSpec, - metadata: Option, - ) -> Result<(Transaction, String)> { - self.create_item_asset_tx( - item_amount, - script_public_key, - public_key, - signature, - genesis_hash_spec, - metadata, - ) - } - - fn pause_nodes(&mut self, b_num: u64) -> Response { - if self - .inject_next_event( - self.local_address(), - MempoolRequest::CoordinatedPause { b_num }, - ) - .is_err() - { - return Response { - success: false, - reason: "Failed to initiate coordinated pause".to_owned(), - }; - } - Response { - success: true, - reason: "Attempt coordinated node pause".to_owned(), - } - } - - fn resume_nodes(&mut self) -> Response { - if self - .inject_next_event(self.local_address(), MempoolRequest::CoordinatedResume) - .is_err() - { - return Response { - success: false, - reason: "Failed to initiate coordinated resume".to_owned(), - }; - } - Response { - success: true, - reason: "Attempt coordinated node resume".to_owned(), - } - } - - fn send_shared_config( - &mut self, - shared_config: crate::configurations::MempoolNodeSharedConfig, - ) -> Response { - if self - .inject_next_event( - self.local_address(), - MempoolRequest::SendSharedConfig { shared_config }, - ) - .is_err() - { - return Response { - success: false, - reason: "Failed to initiate sharing of config".to_owned(), - }; - } - Response { - success: true, - reason: "Attempt send shared config".to_owned(), - } - } -} - -/// Get pending transactions -/// -/// ### Arguments -/// -/// * `db` - Database -fn get_local_transactions(db: &SimpleDb) -> BTreeMap { - db.iter_cf_clone(DB_COL_LOCAL_TXS) - .map(|(k, v)| (String::from_utf8(k), deserialize(&v))) - .map(|(k, v)| (k.unwrap(), v.unwrap())) - .collect() -} - -/// Add pending transactions -/// -/// ### Arguments -/// -/// * `db` - Database -/// * `transactions` - Transactions to store -fn store_local_transactions(db: &mut SimpleDb, transactions: &BTreeMap) { - let mut batch = db.batch_writer(); - for (key, value) in transactions { - let value = serialize(value).unwrap(); - batch.put_cf(DB_COL_LOCAL_TXS, key, &value); - } - let batch = batch.done(); - db.write(batch).unwrap(); -} - -/// Delete no longer relevant transaction -/// -/// ### Arguments -/// -/// * `db` - Database -/// * `keys` - Keys to delete -fn delete_local_transactions(db: &mut SimpleDb, keys: &[String]) { - let mut batch = db.batch_writer(); - for key in keys { - batch.delete_cf(DB_COL_LOCAL_TXS, key); - } - let batch = batch.done(); - db.write(batch).unwrap(); -} + /// * `peer` - Address of the \ No newline at end of file