diff --git a/src/net_processing.cpp b/src/net_processing.cpp index c01f93c21aed..22761dc78ed6 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -200,6 +200,8 @@ static constexpr size_t NUM_PRIVATE_BROADCAST_PER_TX{3}; /** Private broadcast connections must complete within this time. Disconnect the peer if it takes longer. */ static constexpr auto PRIVATE_BROADCAST_MAX_CONNECTION_LIFETIME{3min}; +GlobalMutex cs_blocks_in_flight ACQUIRED_AFTER(cs_main); + // Internal stuff namespace { /** Blocks that are in flight, and that are in the queue to be downloaded. */ @@ -413,6 +415,15 @@ struct Peer { * timestamp the peer sent in the version message. */ std::atomic m_time_offset{0s}; + /** List of blocks we've requested. */ + std::list vBlocksInFlight GUARDED_BY(cs_blocks_in_flight); + + /** Since when we're stalling block download progress (in microseconds), or 0. */ + std::chrono::microseconds m_stalling_since GUARDED_BY(cs_blocks_in_flight){0us}; + + /** When the first entry in vBlocksInFlight started downloading. Don't care when vBlocksInFlight is empty. */ + std::chrono::microseconds m_downloading_since GUARDED_BY(cs_blocks_in_flight){0us}; + explicit Peer(NodeId id, ServiceFlags our_services, bool is_inbound) : m_id{id} , m_our_services{our_services} @@ -445,11 +456,6 @@ struct CNodeState { const CBlockIndex* pindexBestHeaderSent{nullptr}; //! Whether we've started headers synchronization with this peer. bool fSyncStarted{false}; - //! Since when we're stalling block download progress (in microseconds), or 0. - std::chrono::microseconds m_stalling_since{0us}; - std::list vBlocksInFlight; - //! When the first entry in vBlocksInFlight started downloading. Don't care when vBlocksInFlight is empty. - std::chrono::microseconds m_downloading_since{0us}; //! Whether we consider this a preferred download peer. bool fPreferredDownload{false}; /** Whether this peer wants invs or cmpctblocks (when possible) for block announcements. 
*/ @@ -515,13 +521,13 @@ class PeerManagerImpl final : public PeerManager void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); void BlockChecked(const std::shared_ptr& block, const BlockValidationState& state) override - EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); + EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex) LOCKS_EXCLUDED(cs_blocks_in_flight); void NewPoWValidBlock(const CBlockIndex *pindex, const std::shared_ptr& pblock) override EXCLUSIVE_LOCKS_REQUIRED(!m_most_recent_block_mutex); /** Implement NetEventsInterface */ void InitializeNode(const CNode& node, ServiceFlags our_services) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_tx_download_mutex); - void FinalizeNode(const CNode& node) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_headers_presync_mutex, !m_tx_download_mutex); + void FinalizeNode(const CNode& node) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_headers_presync_mutex, !m_tx_download_mutex) LOCKS_EXCLUDED(cs_blocks_in_flight); bool HasAllDesirableServiceFlags(ServiceFlags services) const override; bool ProcessMessages(CNode& node, std::atomic& interrupt) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_most_recent_block_mutex, !m_headers_presync_mutex, g_msgproc_mutex, !m_tx_download_mutex); @@ -530,10 +536,10 @@ class PeerManagerImpl final : public PeerManager /** Implement PeerManager */ void StartScheduledTasks(CScheduler& scheduler) override; - void CheckForStaleTipAndEvictPeers() override; + void CheckForStaleTipAndEvictPeers() override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex) LOCKS_EXCLUDED(cs_blocks_in_flight); util::Expected FetchBlock(NodeId peer_id, const CBlockIndex& block_index) override - EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); - bool GetNodeStateStats(NodeId nodeid, CNodeStateStats& stats) const override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); + EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex) LOCKS_EXCLUDED(cs_blocks_in_flight); 
+ bool GetNodeStateStats(NodeId nodeid, CNodeStateStats& stats) const override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex) LOCKS_EXCLUDED(cs_blocks_in_flight); std::vector GetOrphanTransactions() override EXCLUSIVE_LOCKS_REQUIRED(!m_tx_download_mutex); PeerManagerInfo GetInfo() const override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); std::vector GetPrivateBroadcastInfo() const override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); @@ -553,13 +559,13 @@ class PeerManagerImpl final : public PeerManager private: void ProcessMessage(Peer& peer, CNode& pfrom, const std::string& msg_type, DataStream& vRecv, NodeClock::time_point time_received, const std::atomic& interruptMsgProc) - EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_most_recent_block_mutex, !m_headers_presync_mutex, g_msgproc_mutex, !m_tx_download_mutex); + EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_most_recent_block_mutex, !m_headers_presync_mutex, g_msgproc_mutex, !m_tx_download_mutex) LOCKS_EXCLUDED(cs_blocks_in_flight); /** Consider evicting an outbound peer based on the amount of time they've been behind our tip */ void ConsiderEviction(CNode& pto, Peer& peer, std::chrono::seconds time_in_seconds) EXCLUSIVE_LOCKS_REQUIRED(cs_main, g_msgproc_mutex); /** If we have extra outbound peers, try to disconnect the one with the oldest block announcement */ - void EvictExtraOutboundPeers(NodeClock::time_point now) EXCLUSIVE_LOCKS_REQUIRED(cs_main); + void EvictExtraOutboundPeers(NodeClock::time_point now) EXCLUSIVE_LOCKS_REQUIRED(cs_main, cs_blocks_in_flight, !m_peer_mutex); /** Retrieve unbroadcast transactions from the mempool and reattempt sending to peers */ void ReattemptInitialBroadcast(CScheduler& scheduler) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); @@ -711,7 +717,7 @@ class PeerManagerImpl final : public PeerManager */ bool MaybeSendGetHeaders(CNode& pfrom, const CBlockLocator& locator, Peer& peer) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex); /** Potentially fetch blocks from this peer upon receipt of a new headers tip */ 
- void HeadersDirectFetchBlocks(CNode& pfrom, const Peer& peer, const CBlockIndex& last_header); + void HeadersDirectFetchBlocks(CNode& pfrom, Peer& peer, const CBlockIndex& last_header) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex) LOCKS_EXCLUDED(cs_blocks_in_flight); /** Update peer state based on received headers message */ void UpdatePeerStateForReceivedHeaders(CNode& pfrom, Peer& peer, const CBlockIndex& last_header, bool received_new_header, bool may_have_more_headers) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex); @@ -896,10 +902,10 @@ class PeerManagerImpl final : public PeerManager int m_highest_fast_announce GUARDED_BY(::cs_main){0}; /** Have we requested this block from a peer */ - bool IsBlockRequested(const uint256& hash) EXCLUSIVE_LOCKS_REQUIRED(cs_main); + bool IsBlockRequested(const uint256& hash) EXCLUSIVE_LOCKS_REQUIRED(cs_blocks_in_flight); /** Have we requested this block from an outbound peer */ - bool IsBlockRequestedFromOutbound(const uint256& hash) EXCLUSIVE_LOCKS_REQUIRED(cs_main, !m_peer_mutex); + bool IsBlockRequestedFromOutbound(const uint256& hash) EXCLUSIVE_LOCKS_REQUIRED(cs_blocks_in_flight, !m_peer_mutex); /** Remove this block from our tracked requested blocks. Called if: * - the block has been received from a peer @@ -908,23 +914,23 @@ class PeerManagerImpl final : public PeerManager * flight from that peer (to avoid one peer's network traffic from * affecting another's state). 
*/ - void RemoveBlockRequest(const uint256& hash, std::optional from_peer) EXCLUSIVE_LOCKS_REQUIRED(cs_main); + void RemoveBlockRequest(const uint256& hash, std::optional from_peer) EXCLUSIVE_LOCKS_REQUIRED(cs_blocks_in_flight, !m_peer_mutex); /* Mark a block as in flight * Returns false, still setting pit, if the block was already in flight from the same peer - * pit will only be valid as long as the same cs_main lock is being held + * pit will only be valid as long as the same cs_blocks_in_flight lock is being held */ - bool BlockRequested(NodeId nodeid, const CBlockIndex& block, std::list::iterator** pit = nullptr) EXCLUSIVE_LOCKS_REQUIRED(cs_main); + bool BlockRequested(Peer& peer, const CBlockIndex& block, std::list::iterator** pit = nullptr) EXCLUSIVE_LOCKS_REQUIRED(cs_blocks_in_flight, !m_peer_mutex); - bool TipMayBeStale() EXCLUSIVE_LOCKS_REQUIRED(cs_main); + bool TipMayBeStale() EXCLUSIVE_LOCKS_REQUIRED(cs_main, cs_blocks_in_flight); /** Update pindexLastCommonBlock and add not-in-flight missing successors to vBlocks, until it has * at most count entries. */ - void FindNextBlocksToDownload(const Peer& peer, unsigned int count, std::vector& vBlocks, NodeId& nodeStaller) EXCLUSIVE_LOCKS_REQUIRED(cs_main); + void FindNextBlocksToDownload(const Peer& peer, unsigned int count, std::vector& vBlocks, NodeId& nodeStaller) EXCLUSIVE_LOCKS_REQUIRED(cs_main, cs_blocks_in_flight); /** Request blocks for the background chainstate, if one is in use. */ - void TryDownloadingHistoricalBlocks(const Peer& peer, unsigned int count, std::vector& vBlocks, const CBlockIndex* from_tip, const CBlockIndex* target_block) EXCLUSIVE_LOCKS_REQUIRED(cs_main); + void TryDownloadingHistoricalBlocks(const Peer& peer, unsigned int count, std::vector& vBlocks, const CBlockIndex* from_tip, const CBlockIndex* target_block) EXCLUSIVE_LOCKS_REQUIRED(cs_main, cs_blocks_in_flight); /** * \brief Find next blocks to download from a peer after a starting block. 
@@ -953,11 +959,11 @@ class PeerManagerImpl final : public PeerManager * block in the window is in flight and no other peer is * trying to download the next block). */ - void FindNextBlocks(std::vector& vBlocks, const Peer& peer, CNodeState *state, const CBlockIndex *pindexWalk, unsigned int count, int nWindowEnd, const CChain* activeChain=nullptr, NodeId* nodeStaller=nullptr) EXCLUSIVE_LOCKS_REQUIRED(cs_main); + void FindNextBlocks(std::vector& vBlocks, const Peer& peer, CNodeState *state, const CBlockIndex *pindexWalk, unsigned int count, int nWindowEnd, const CChain* activeChain=nullptr, NodeId* nodeStaller=nullptr) EXCLUSIVE_LOCKS_REQUIRED(cs_main, cs_blocks_in_flight); /* Multimap used to preserve insertion order */ typedef std::multimap::iterator>> BlockDownloadMap; - BlockDownloadMap mapBlocksInFlight GUARDED_BY(cs_main); + BlockDownloadMap mapBlocksInFlight GUARDED_BY(cs_blocks_in_flight); /** When our tip was last updated. */ std::atomic m_last_tip_update{0s}; @@ -971,11 +977,11 @@ class PeerManagerImpl final : public PeerManager LOCKS_EXCLUDED(::cs_main); /** Process a new block. Perform any post-processing housekeeping */ - void ProcessBlock(CNode& node, const std::shared_ptr& block, bool force_processing, bool min_pow_checked); + void ProcessBlock(CNode& node, const std::shared_ptr& block, bool force_processing, bool min_pow_checked) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex) LOCKS_EXCLUDED(cs_blocks_in_flight); /** Process compact block txns */ void ProcessCompactBlockTxns(CNode& pfrom, Peer& peer, const BlockTransactions& block_transactions) - EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex, !m_most_recent_block_mutex); + EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex, !m_most_recent_block_mutex, !m_peer_mutex) LOCKS_EXCLUDED(cs_blocks_in_flight); /** * Schedule an INV for a transaction to be sent to the given peer (via `PushMessage()`). 
@@ -997,7 +1003,7 @@ class PeerManagerImpl final : public PeerManager std::list lNodesAnnouncingHeaderAndIDs GUARDED_BY(cs_main); /** Number of peers from which we're downloading blocks. */ - int m_peers_downloading_from GUARDED_BY(cs_main) = 0; + int m_peers_downloading_from GUARDED_BY(cs_blocks_in_flight) = 0; void AddToCompactExtraTransactions(const CTransactionRef& tx) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex); @@ -1194,11 +1200,16 @@ std::chrono::microseconds PeerManagerImpl::NextInvToInbounds(std::chrono::micros bool PeerManagerImpl::IsBlockRequested(const uint256& hash) { + AssertLockHeld(cs_blocks_in_flight); + return mapBlocksInFlight.contains(hash); } bool PeerManagerImpl::IsBlockRequestedFromOutbound(const uint256& hash) { + AssertLockHeld(cs_blocks_in_flight); + AssertLockNotHeld(m_peer_mutex); + for (auto range = mapBlocksInFlight.equal_range(hash); range.first != range.second; range.first++) { auto [nodeid, block_it] = range.first->second; PeerRef peer{GetPeerRef(nodeid)}; @@ -1210,6 +1221,9 @@ bool PeerManagerImpl::IsBlockRequestedFromOutbound(const uint256& hash) void PeerManagerImpl::RemoveBlockRequest(const uint256& hash, std::optional from_peer) { + AssertLockHeld(cs_blocks_in_flight); + AssertLockNotHeld(m_peer_mutex); + auto range = mapBlocksInFlight.equal_range(hash); if (range.first == range.second) { // Block was not requested from any peer @@ -1227,36 +1241,36 @@ void PeerManagerImpl::RemoveBlockRequest(const uint256& hash, std::optionalvBlocksInFlight.begin() == list_it) { // First block on the queue was received, update the start download time for the next one - state.m_downloading_since = std::max(state.m_downloading_since, GetTime()); + peer->m_downloading_since = std::max(peer->m_downloading_since, GetTime()); } - state.vBlocksInFlight.erase(list_it); + peer->vBlocksInFlight.erase(list_it); - if (state.vBlocksInFlight.empty()) { + if (peer->vBlocksInFlight.empty()) { // Last validated block on the queue for this peer was received. 
m_peers_downloading_from--; } - state.m_stalling_since = 0us; + peer->m_stalling_since = 0us; range.first = mapBlocksInFlight.erase(range.first); } } -bool PeerManagerImpl::BlockRequested(NodeId nodeid, const CBlockIndex& block, std::list::iterator** pit) +bool PeerManagerImpl::BlockRequested(Peer& peer, const CBlockIndex& block, std::list::iterator** pit) { - const uint256& hash{block.GetBlockHash()}; + AssertLockHeld(cs_blocks_in_flight); + AssertLockNotHeld(m_peer_mutex); - CNodeState *state = State(nodeid); - assert(state != nullptr); + const uint256& hash{block.GetBlockHash()}; Assume(mapBlocksInFlight.count(hash) <= MAX_CMPCTBLOCKS_INFLIGHT_PER_BLOCK); // Short-circuit most stuff in case it is from the same node for (auto range = mapBlocksInFlight.equal_range(hash); range.first != range.second; range.first++) { - if (range.first->second.first == nodeid) { + if (range.first->second.first == peer.m_id) { if (pit) { *pit = &range.first->second.second; } @@ -1265,16 +1279,16 @@ bool PeerManagerImpl::BlockRequested(NodeId nodeid, const CBlockIndex& block, st } // Make sure it's not being fetched already from same peer. - RemoveBlockRequest(hash, nodeid); + RemoveBlockRequest(hash, peer.m_id); - std::list::iterator it = state->vBlocksInFlight.insert(state->vBlocksInFlight.end(), + std::list::iterator it = peer.vBlocksInFlight.insert(peer.vBlocksInFlight.end(), {&block, std::unique_ptr(pit ? new PartiallyDownloadedBlock(&m_mempool) : nullptr)}); - if (state->vBlocksInFlight.size() == 1) { + if (peer.vBlocksInFlight.size() == 1) { // We're starting a block download (batch) from this peer. 
- state->m_downloading_since = GetTime(); + peer.m_downloading_since = GetTime(); m_peers_downloading_from++; } - auto itInFlight = mapBlocksInFlight.insert(std::make_pair(hash, std::make_pair(nodeid, it))); + auto itInFlight = mapBlocksInFlight.insert(std::make_pair(hash, std::make_pair(peer.m_id, it))); if (pit) { *pit = &itInFlight->second.second; } @@ -1343,6 +1357,8 @@ void PeerManagerImpl::MaybeSetPeerAsAnnouncingHeaderAndIDs(NodeId nodeid) bool PeerManagerImpl::TipMayBeStale() { AssertLockHeld(cs_main); + AssertLockHeld(cs_blocks_in_flight); + const Consensus::Params& consensusParams = m_chainparams.GetConsensus(); if (m_last_tip_update.load() == 0s) { m_last_tip_update = GetTime(); @@ -1405,6 +1421,8 @@ void PeerManagerImpl::UpdateBlockAvailability(NodeId nodeid, const uint256 &hash // Logic for calculating which blocks to download from a given peer, given our current tip. void PeerManagerImpl::FindNextBlocksToDownload(const Peer& peer, unsigned int count, std::vector& vBlocks, NodeId& nodeStaller) { + AssertLockHeld(cs_blocks_in_flight); + if (count == 0) return; @@ -1455,6 +1473,8 @@ void PeerManagerImpl::FindNextBlocksToDownload(const Peer& peer, unsigned int co void PeerManagerImpl::TryDownloadingHistoricalBlocks(const Peer& peer, unsigned int count, std::vector& vBlocks, const CBlockIndex *from_tip, const CBlockIndex* target_block) { + AssertLockHeld(cs_blocks_in_flight); + Assert(from_tip); Assert(target_block); @@ -1484,6 +1504,8 @@ void PeerManagerImpl::TryDownloadingHistoricalBlocks(const Peer& peer, unsigned void PeerManagerImpl::FindNextBlocks(std::vector& vBlocks, const Peer& peer, CNodeState *state, const CBlockIndex *pindexWalk, unsigned int count, int nWindowEnd, const CChain* activeChain, NodeId* nodeStaller) { + AssertLockHeld(cs_blocks_in_flight); + std::vector vToFetch; int nMaxHeight = std::min(state->pindexBestKnownBlock->nHeight, nWindowEnd + 1); bool is_limited_peer = IsLimitedPeer(peer); @@ -1686,9 +1708,11 @@ void 
PeerManagerImpl::ReattemptPrivateBroadcast(CScheduler& scheduler) void PeerManagerImpl::FinalizeNode(const CNode& node) { + AssertLockNotHeld(cs_blocks_in_flight); + NodeId nodeid = node.GetId(); { - LOCK(cs_main); + LOCK2(cs_main, cs_blocks_in_flight); { // We remove the PeerRef from g_peer_map here, but we don't always // destruct the Peer. Sometimes another thread is still holding a @@ -1699,6 +1723,19 @@ void PeerManagerImpl::FinalizeNode(const CNode& node) assert(peer != nullptr); m_wtxid_relay_peers -= peer->m_wtxid_relay; assert(m_wtxid_relay_peers >= 0); + for (const QueuedBlock& entry : peer->vBlocksInFlight) { + auto range = mapBlocksInFlight.equal_range(entry.pindex->GetBlockHash()); + while (range.first != range.second) { + auto [node_id, list_it] = range.first->second; + if (node_id != nodeid) { + range.first++; + } else { + range.first = mapBlocksInFlight.erase(range.first); + } + } + } + m_peers_downloading_from -= (!peer->vBlocksInFlight.empty()); + assert(m_peers_downloading_from >= 0); } CNodeState *state = State(nodeid); assert(state != nullptr); @@ -1706,25 +1743,12 @@ void PeerManagerImpl::FinalizeNode(const CNode& node) if (state->fSyncStarted) nSyncStarted--; - for (const QueuedBlock& entry : state->vBlocksInFlight) { - auto range = mapBlocksInFlight.equal_range(entry.pindex->GetBlockHash()); - while (range.first != range.second) { - auto [node_id, list_it] = range.first->second; - if (node_id != nodeid) { - range.first++; - } else { - range.first = mapBlocksInFlight.erase(range.first); - } - } - } { LOCK(m_tx_download_mutex); m_txdownloadman.DisconnectedPeer(nodeid); } if (m_txreconciliation) m_txreconciliation->ForgetPeer(nodeid); m_num_preferred_download_peers -= state->fPreferredDownload; - m_peers_downloading_from -= (!state->vBlocksInFlight.empty()); - assert(m_peers_downloading_from >= 0); m_outbound_peers_with_protect_from_disconnect -= state->m_chain_sync.m_protect; assert(m_outbound_peers_with_protect_from_disconnect >= 0); @@ 
-1739,7 +1763,7 @@ void PeerManagerImpl::FinalizeNode(const CNode& node) assert(m_wtxid_relay_peers == 0); WITH_LOCK(m_tx_download_mutex, m_txdownloadman.CheckIsEmpty()); } - } // cs_main + } // cs_main, cs_blocks_in_flight if (node.fSuccessfullyConnected && !node.IsBlockOnlyConn() && !node.IsPrivateBroadcastConn() && !node.IsInboundConn()) { // Only change visible addrman state for full outbound peers. We don't @@ -1810,6 +1834,8 @@ std::vector PeerManagerImpl::GetAllPeers() const bool PeerManagerImpl::GetNodeStateStats(NodeId nodeid, CNodeStateStats& stats) const { + AssertLockNotHeld(cs_blocks_in_flight); + { LOCK(cs_main); const CNodeState* state = State(nodeid); @@ -1817,14 +1843,17 @@ bool PeerManagerImpl::GetNodeStateStats(NodeId nodeid, CNodeStateStats& stats) c return false; stats.nSyncHeight = state->pindexBestKnownBlock ? state->pindexBestKnownBlock->nHeight : -1; stats.nCommonHeight = state->pindexLastCommonBlock ? state->pindexLastCommonBlock->nHeight : -1; - for (const QueuedBlock& queue : state->vBlocksInFlight) { - if (queue.pindex) - stats.vHeightInFlight.push_back(queue.pindex->nHeight); - } } PeerRef peer = GetPeerRef(nodeid); if (peer == nullptr) return false; + { + LOCK(cs_blocks_in_flight); + for (const QueuedBlock& queue : peer->vBlocksInFlight) { + if (queue.pindex) + stats.vHeightInFlight.push_back(queue.pindex->nHeight); + } + } stats.their_services = peer->m_their_services; // It is common for nodes with good ping times to suddenly become lagged, // due to a new block arriving or other large transfer. 
@@ -1984,13 +2013,15 @@ bool PeerManagerImpl::BlockRequestAllowed(const CBlockIndex& block_index) util::Expected PeerManagerImpl::FetchBlock(NodeId peer_id, const CBlockIndex& block_index) { + AssertLockNotHeld(cs_blocks_in_flight); + if (m_chainman.m_blockman.LoadingBlocks()) return util::Unexpected{"Loading blocks ..."}; // The lock must be taken here before fetching Peer so another thread does // not delete the CNodeState from under the current thread, causing an // assertion failure in BlockRequested. This lock can be replaced with a // net-specific lock when more of CNodeState is moved into Peer. - LOCK(cs_main); + LOCK2(cs_main, cs_blocks_in_flight); // Ensure this peer exists and hasn't been disconnected PeerRef peer = GetPeerRef(peer_id); @@ -2003,7 +2034,7 @@ util::Expected PeerManagerImpl::FetchBlock(NodeId peer_id, co RemoveBlockRequest(block_index.GetBlockHash(), std::nullopt); // Mark block as in-flight - if (!BlockRequested(peer_id, block_index)) return util::Unexpected{"Already requested from this peer"}; + if (!BlockRequested(*peer, block_index)) return util::Unexpected{"Already requested from this peer"}; // Construct message to request the block const uint256& hash{block_index.GetBlockHash()}; @@ -2224,7 +2255,9 @@ void PeerManagerImpl::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlock */ void PeerManagerImpl::BlockChecked(const std::shared_ptr& block, const BlockValidationState& state) { - LOCK(cs_main); + AssertLockNotHeld(cs_blocks_in_flight); + + LOCK2(cs_main, cs_blocks_in_flight); const uint256 hash(block->GetHash()); std::map>::iterator it = mapBlockSource.find(hash); @@ -2869,9 +2902,12 @@ bool PeerManagerImpl::MaybeSendGetHeaders(CNode& pfrom, const CBlockLocator& loc * We require that the given tip have at least as much work as our tip, and for * our current tip to be "close to synced" (see CanDirectFetch()). 
*/ -void PeerManagerImpl::HeadersDirectFetchBlocks(CNode& pfrom, const Peer& peer, const CBlockIndex& last_header) +void PeerManagerImpl::HeadersDirectFetchBlocks(CNode& pfrom, Peer& peer, const CBlockIndex& last_header) { - LOCK(cs_main); + AssertLockNotHeld(cs_blocks_in_flight); + AssertLockNotHeld(m_peer_mutex); + + LOCK2(cs_main, cs_blocks_in_flight); CNodeState *nodestate = State(pfrom.GetId()); if (CanDirectFetch() && last_header.IsValid(BLOCK_VALID_TREE) && m_chainman.ActiveChain().Tip()->nChainWork <= last_header.nChainWork) { @@ -2900,13 +2936,13 @@ void PeerManagerImpl::HeadersDirectFetchBlocks(CNode& pfrom, const Peer& peer, c std::vector vGetData; // Download as much as possible, from earliest to latest. for (const CBlockIndex* pindex : vToFetch | std::views::reverse) { - if (nodestate->vBlocksInFlight.size() >= MAX_BLOCKS_IN_TRANSIT_PER_PEER) { + if (peer.vBlocksInFlight.size() >= MAX_BLOCKS_IN_TRANSIT_PER_PEER) { // Can't download any more from this peer break; } uint32_t nFetchFlags = GetFetchFlags(peer); vGetData.emplace_back(MSG_BLOCK | nFetchFlags, pindex->GetBlockHash()); - BlockRequested(pfrom.GetId(), *pindex); + BlockRequested(peer, *pindex); LogDebug(BCLog::NET, "Requesting block %s from peer=%d", pindex->GetBlockHash().ToString(), pfrom.GetId()); } @@ -3454,6 +3490,9 @@ void PeerManagerImpl::ProcessGetCFCheckPt(CNode& node, Peer& peer, DataStream& v void PeerManagerImpl::ProcessBlock(CNode& node, const std::shared_ptr& block, bool force_processing, bool min_pow_checked) { + AssertLockNotHeld(cs_blocks_in_flight); + AssertLockNotHeld(m_peer_mutex); + bool new_block{false}; m_chainman.ProcessNewBlock(block, force_processing, min_pow_checked, &new_block); if (new_block) { @@ -3461,7 +3500,7 @@ void PeerManagerImpl::ProcessBlock(CNode& node, const std::shared_ptrGetHash(), std::nullopt); } else { LOCK(cs_main); @@ -3471,10 +3510,13 @@ void PeerManagerImpl::ProcessBlock(CNode& node, const std::shared_ptr pblock = std::make_shared(); bool 
fBlockRead{false}; { - LOCK(cs_main); + LOCK2(cs_main, cs_blocks_in_flight); auto range_flight = mapBlocksInFlight.equal_range(block_transactions.blockhash); size_t already_in_flight = std::distance(range_flight.first, range_flight.second); @@ -3605,6 +3647,7 @@ void PeerManagerImpl::ProcessMessage(Peer& peer, CNode& pfrom, const std::string const std::atomic& interruptMsgProc) { AssertLockHeld(g_msgproc_mutex); + AssertLockNotHeld(cs_blocks_in_flight); LogDebug(BCLog::NET, "received: %s (%u bytes) peer=%d\n", SanitizeString(msg_type), vRecv.size(), pfrom.GetId()); @@ -4142,7 +4185,7 @@ void PeerManagerImpl::ProcessMessage(Peer& peer, CNode& pfrom, const std::string LogDebug(BCLog::NET, "got inv: %s %s peer=%d", inv.ToString(), fAlreadyHave ? "have" : "new", pfrom.GetId()); UpdateBlockAvailability(pfrom.GetId(), inv.hash); - if (!fAlreadyHave && !m_chainman.m_blockman.LoadingBlocks() && !IsBlockRequested(inv.hash)) { + if (!fAlreadyHave && !m_chainman.m_blockman.LoadingBlocks() && WITH_LOCK(cs_blocks_in_flight, return !IsBlockRequested(inv.hash))) { // Headers-first is the primary method of announcement on // the network. If a node fell back to sending blocks by // inv, it may be for a re-org, or because we haven't @@ -4600,7 +4643,7 @@ void PeerManagerImpl::ProcessMessage(Peer& peer, CNode& pfrom, const std::string bool fBlockReconstructed = false; { - LOCK(cs_main); + LOCK2(cs_main, cs_blocks_in_flight); UpdateBlockAvailability(pfrom.GetId(), pindex->GetBlockHash()); CNodeState *nodestate = State(pfrom.GetId()); @@ -4649,10 +4692,10 @@ void PeerManagerImpl::ProcessMessage(Peer& peer, CNode& pfrom, const std::string // We want to be a bit conservative just to be extra careful about DoS // possibilities in compact block processing... 
if (pindex->nHeight <= m_chainman.ActiveChain().Height() + 2) { - if ((already_in_flight < MAX_CMPCTBLOCKS_INFLIGHT_PER_BLOCK && nodestate->vBlocksInFlight.size() < MAX_BLOCKS_IN_TRANSIT_PER_PEER) || + if ((already_in_flight < MAX_CMPCTBLOCKS_INFLIGHT_PER_BLOCK && peer.vBlocksInFlight.size() < MAX_BLOCKS_IN_TRANSIT_PER_PEER) || requested_block_from_this_peer) { std::list::iterator* queuedBlockIt = nullptr; - if (!BlockRequested(pfrom.GetId(), *pindex, &queuedBlockIt)) { + if (!BlockRequested(peer, *pindex, &queuedBlockIt)) { if (!(*queuedBlockIt)->partialBlock) (*queuedBlockIt)->partialBlock.reset(new PartiallyDownloadedBlock(&m_mempool)); else { @@ -4740,7 +4783,7 @@ void PeerManagerImpl::ProcessMessage(Peer& peer, CNode& pfrom, const std::string fRevertToHeaderProcessing = true; } } - } // cs_main + } // cs_main, cs_blocks_in_flight if (fProcessBLOCKTXN) { BlockTransactions txn; @@ -4780,6 +4823,7 @@ void PeerManagerImpl::ProcessMessage(Peer& peer, CNode& pfrom, const std::string // process from some other peer. We do this after calling // ProcessNewBlock so that a malleated cmpctblock announcement // can't be used to interfere with block relay. 
+ LOCK(cs_blocks_in_flight); RemoveBlockRequest(pblock->GetHash(), std::nullopt); } } @@ -4861,7 +4905,7 @@ void PeerManagerImpl::ProcessMessage(Peer& peer, CNode& pfrom, const std::string /*check_witness_root=*/DeploymentActiveAfter(prev_block, m_chainman, Consensus::DEPLOYMENT_SEGWIT))) { LogDebug(BCLog::NET, "Received mutated block from peer=%d\n", peer.m_id); Misbehaving(peer, "mutated block"); - WITH_LOCK(cs_main, RemoveBlockRequest(pblock->GetHash(), peer.m_id)); + WITH_LOCK(cs_blocks_in_flight, RemoveBlockRequest(pblock->GetHash(), peer.m_id)); return; } @@ -4869,7 +4913,7 @@ void PeerManagerImpl::ProcessMessage(Peer& peer, CNode& pfrom, const std::string const uint256 hash(pblock->GetHash()); bool min_pow_checked = false; { - LOCK(cs_main); + LOCK2(cs_main, cs_blocks_in_flight); // Always process the block if we requested it, since we may // need it even when it's not a candidate for a new best tip. forceProcessing = IsBlockRequested(hash); @@ -5282,6 +5326,9 @@ void PeerManagerImpl::ConsiderEviction(CNode& pto, Peer& peer, std::chrono::seco void PeerManagerImpl::EvictExtraOutboundPeers(NodeClock::time_point now) { + AssertLockHeld(cs_blocks_in_flight); + AssertLockNotHeld(m_peer_mutex); + // If we have any extra block-relay-only peers, disconnect the youngest unless // it's given us a block -- in which case, compare with the second-youngest, and // out of those two, disconnect the peer who least recently gave us a block. @@ -5305,23 +5352,24 @@ void PeerManagerImpl::EvictExtraOutboundPeers(NodeClock::time_point now) // disconnect our second youngest. 
to_disconnect = next_youngest_peer.first; } - m_connman.ForNode(to_disconnect, [&](CNode* pnode) EXCLUSIVE_LOCKS_REQUIRED(::cs_main) { - AssertLockHeld(::cs_main); + m_connman.ForNode(to_disconnect, [&](CNode* pnode) EXCLUSIVE_LOCKS_REQUIRED(cs_blocks_in_flight, !m_peer_mutex) { + AssertLockHeld(cs_blocks_in_flight); + AssertLockNotHeld(m_peer_mutex); // Make sure we're not getting a block right now, and that // we've been connected long enough for this eviction to happen // at all. // Note that we only request blocks from a peer if we learn of a // valid headers chain with at least as much work as our tip. - CNodeState *node_state = State(pnode->GetId()); - if (node_state == nullptr || - (now - pnode->m_connected >= MINIMUM_CONNECT_TIME && node_state->vBlocksInFlight.empty())) { + PeerRef peer_ref{GetPeerRef(pnode->GetId())}; + if (peer_ref == nullptr || + (now - pnode->m_connected >= MINIMUM_CONNECT_TIME && peer_ref->vBlocksInFlight.empty())) { pnode->fDisconnect = true; LogDebug(BCLog::NET, "disconnecting extra block-relay-only peer=%d (last block received at time %d)\n", pnode->GetId(), count_seconds(pnode->m_last_block_time)); return true; } else { LogDebug(BCLog::NET, "keeping block-relay-only peer=%d chosen for eviction (connect time: %d, blocks_in_flight: %d)\n", - pnode->GetId(), TicksSinceEpoch(pnode->m_connected), node_state->vBlocksInFlight.size()); + pnode->GetId(), TicksSinceEpoch(pnode->m_connected), peer_ref->vBlocksInFlight.size()); } return false; }); @@ -5357,22 +5405,23 @@ void PeerManagerImpl::EvictExtraOutboundPeers(NodeClock::time_point now) } }); if (worst_peer != -1) { - bool disconnected = m_connman.ForNode(worst_peer, [&](CNode* pnode) EXCLUSIVE_LOCKS_REQUIRED(::cs_main) { - AssertLockHeld(::cs_main); + bool disconnected = m_connman.ForNode(worst_peer, [&](CNode* pnode) EXCLUSIVE_LOCKS_REQUIRED(cs_blocks_in_flight, !m_peer_mutex) { + AssertLockHeld(cs_blocks_in_flight); + AssertLockNotHeld(m_peer_mutex); // Only disconnect a peer that has 
been connected to us for // some reasonable fraction of our check-frequency, to give // it time for new information to have arrived. // Also don't disconnect any peer we're trying to download a // block from. - CNodeState &state = *State(pnode->GetId()); - if (now - pnode->m_connected > MINIMUM_CONNECT_TIME && state.vBlocksInFlight.empty()) { + PeerRef peer{Assume(GetPeerRef(pnode->GetId()))}; + if (now - pnode->m_connected > MINIMUM_CONNECT_TIME && peer->vBlocksInFlight.empty()) { LogDebug(BCLog::NET, "disconnecting extra outbound peer=%d (last block announcement received at time %d)\n", pnode->GetId(), oldest_block_announcement); pnode->fDisconnect = true; return true; } else { LogDebug(BCLog::NET, "keeping outbound peer=%d chosen for eviction (connect time: %d, blocks_in_flight: %d)\n", - pnode->GetId(), TicksSinceEpoch(pnode->m_connected), state.vBlocksInFlight.size()); + pnode->GetId(), TicksSinceEpoch(pnode->m_connected), peer->vBlocksInFlight.size()); return false; } }); @@ -5390,7 +5439,10 @@ void PeerManagerImpl::EvictExtraOutboundPeers(NodeClock::time_point now) void PeerManagerImpl::CheckForStaleTipAndEvictPeers() { - LOCK(cs_main); + AssertLockNotHeld(cs_blocks_in_flight); + AssertLockNotHeld(m_peer_mutex); + + LOCK2(cs_main, cs_blocks_in_flight); const auto current_time{NodeClock::now()}; auto now{GetTime()}; @@ -5869,7 +5921,7 @@ bool PeerManagerImpl::SendMessages(CNode& node) // the latest blocks is from an inbound peer, we have to be sure to // eventually download it (and not just wait indefinitely for an // outbound peer to have it). 
- if (m_num_preferred_download_peers == 0 || mapBlocksInFlight.empty()) { + if (m_num_preferred_download_peers == 0 || WITH_LOCK(cs_blocks_in_flight, return mapBlocksInFlight.empty())) { sync_blocks_and_headers_from_peer = true; } } @@ -6171,9 +6223,14 @@ bool PeerManagerImpl::SendMessages(CNode& node) if (!vInv.empty()) MakeAndPushMessage(node, NetMsgType::INV, vInv); + std::vector vGetData; + + { + LOCK(cs_blocks_in_flight); + // Detect whether we're stalling auto stalling_timeout = m_block_stalling_timeout.load(); - if (state.m_stalling_since.count() && state.m_stalling_since < current_time - stalling_timeout) { + if (peer.m_stalling_since.count() && peer.m_stalling_since < current_time - stalling_timeout) { // Stalling only triggers when the block download window cannot move. During normal steady state, // the download window should be much larger than the to-be-downloaded set of blocks, so disconnection // should only happen during initial block download. @@ -6192,10 +6249,10 @@ bool PeerManagerImpl::SendMessages(CNode& node) // We compensate for other peers to prevent killing off peers due to our own downstream link // being saturated. We only count validated in-flight blocks so peers can't advertise non-existing block hashes // to unreasonably increase our timeout. 
- if (state.vBlocksInFlight.size() > 0) { - QueuedBlock &queuedBlock = state.vBlocksInFlight.front(); + if (peer.vBlocksInFlight.size() > 0) { + QueuedBlock &queuedBlock = peer.vBlocksInFlight.front(); int nOtherPeersWithValidatedDownloads = m_peers_downloading_from - 1; - if (current_time > state.m_downloading_since + std::chrono::seconds{consensusParams.nPowTargetSpacing} * (BLOCK_DOWNLOAD_TIMEOUT_BASE + BLOCK_DOWNLOAD_TIMEOUT_PER_PEER * nOtherPeersWithValidatedDownloads)) { + if (current_time > peer.m_downloading_since + std::chrono::seconds{consensusParams.nPowTargetSpacing} * (BLOCK_DOWNLOAD_TIMEOUT_BASE + BLOCK_DOWNLOAD_TIMEOUT_PER_PEER * nOtherPeersWithValidatedDownloads)) { LogInfo("Timeout downloading block %s, %s", queuedBlock.pindex->GetBlockHash().ToString(), node.DisconnectMsg()); node.fDisconnect = true; return true; @@ -6241,12 +6298,11 @@ bool PeerManagerImpl::SendMessages(CNode& node) // // Message: getdata (blocks) // - std::vector vGetData; - if (CanServeBlocks(peer) && ((sync_blocks_and_headers_from_peer && !IsLimitedPeer(peer)) || !m_chainman.IsInitialBlockDownload()) && state.vBlocksInFlight.size() < MAX_BLOCKS_IN_TRANSIT_PER_PEER) { + if (CanServeBlocks(peer) && ((sync_blocks_and_headers_from_peer && !IsLimitedPeer(peer)) || !m_chainman.IsInitialBlockDownload()) && peer.vBlocksInFlight.size() < MAX_BLOCKS_IN_TRANSIT_PER_PEER) { std::vector vToDownload; NodeId staller = -1; - auto get_inflight_budget = [&state]() { - return std::max(0, MAX_BLOCKS_IN_TRANSIT_PER_PEER - static_cast(state.vBlocksInFlight.size())); + auto get_inflight_budget = [&peer]() EXCLUSIVE_LOCKS_REQUIRED(cs_blocks_in_flight) { + return std::max(0, MAX_BLOCKS_IN_TRANSIT_PER_PEER - static_cast(peer.vBlocksInFlight.size())); }; // If there are multiple chainstates, download blocks for the @@ -6266,17 +6322,18 @@ bool PeerManagerImpl::SendMessages(CNode& node) for (const CBlockIndex *pindex : vToDownload) { uint32_t nFetchFlags = GetFetchFlags(peer); 
vGetData.emplace_back(MSG_BLOCK | nFetchFlags, pindex->GetBlockHash()); - BlockRequested(node.GetId(), *pindex); + BlockRequested(peer, *pindex); LogDebug(BCLog::NET, "Requesting block %s (%d) peer=%d\n", pindex->GetBlockHash().ToString(), pindex->nHeight, node.GetId()); } - if (state.vBlocksInFlight.empty() && staller != -1) { - if (State(staller)->m_stalling_since == 0us) { - State(staller)->m_stalling_since = current_time; + if (peer.vBlocksInFlight.empty() && staller != -1) { + if (PeerRef staller_peer{GetPeerRef(staller)}; staller_peer && staller_peer->m_stalling_since == 0us) { + staller_peer->m_stalling_since = current_time; LogDebug(BCLog::NET, "Stall started peer=%d\n", staller); } } } + } // cs_blocks_in_flight // // Message: getdata (transactions) diff --git a/src/sync.cpp b/src/sync.cpp index 6d740866d498..36035bfb93e6 100644 --- a/src/sync.cpp +++ b/src/sync.cpp @@ -289,6 +289,7 @@ void AssertLockHeldInternal(const char* pszName, const char* pszFile, int nLine, } template void AssertLockHeldInternal(const char*, const char*, int, Mutex*); template void AssertLockHeldInternal(const char*, const char*, int, RecursiveMutex*); +template void AssertLockHeldInternal(const char*, const char*, int, GlobalMutex*); template void AssertLockNotHeldInternal(const char* pszName, const char* pszFile, int nLine, MutexType* cs) @@ -299,6 +300,7 @@ void AssertLockNotHeldInternal(const char* pszName, const char* pszFile, int nLi } template void AssertLockNotHeldInternal(const char*, const char*, int, Mutex*); template void AssertLockNotHeldInternal(const char*, const char*, int, RecursiveMutex*); +template void AssertLockNotHeldInternal(const char*, const char*, int, GlobalMutex*); void DeleteLock(void* cs) {