-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathconsensus.py
More file actions
404 lines (347 loc) · 25 KB
/
consensus.py
File metadata and controls
404 lines (347 loc) · 25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
# consensus.py
"""
Module for implementing the synchronous, round-based consensus protocol (crash fault-tolerant).
"""
import threading
import socket
import json
import time
import math
import sys
import logging
from network import send_message
from blockchain import create_block, add_block, get_confirmed_nonce
from mempool import get_all_transactions, remove_transactions, pool_snapshot
from validation import is_valid_nonce
# Get logger
logger = logging.getLogger(__name__)
# Timeout for waiting for proposals from peers during the exchange phase
PROPOSAL_COLLECTION_TIMEOUT = 2.0 # seconds
PEER_RESPONSE_TIMEOUT = 2.0 # seconds per peer
class ConsensusThread(threading.Thread):
    """
    Consensus thread that waits for triggers and runs consensus rounds.

    A round is triggered by setting `consensus_event`; the thread then
    exchanges proposals with peers, decides on a block, and may commit it.
    """
    def __init__(self, peers: dict[str, socket.socket], n_total_nodes: int, peers_lock: threading.Lock):
        """
        Args:
            peers: Shared map peer_id -> connected socket; entries are removed
                by this thread when peers crash or time out.
            n_total_nodes: Total number of nodes in the network.
            peers_lock: Lock guarding `peers` (shared with peer-handler threads).
        """
        super().__init__(daemon=True)
        self.peers = peers
        self.n_total_nodes = n_total_nodes  # Total number of nodes in the network
        self.peers_lock = peers_lock  # Guards self.peers; shared with peer handlers
        # round_id -> {peer_id -> proposal dict}; populated by record_peer_proposal
        self.all_proposals_by_round: dict[int, dict[str, dict]] = {}
        self.all_proposals_lock = threading.Lock()
        self.round_lock = threading.Lock()  # Protects round_counter while it is incremented in run()
        self.consensus_event = threading.Event()  # Event for signaling consensus round
        self.round_counter = 0  # Monotonically increasing round identifier
        self.current_round_event = None  # threading.Event, re-created for each round
        self.expected_responses = 0  # Number of responses expected for current round
        self.received_responses = 0  # Number of responses received for current round
        self.round_sync_lock = threading.Lock()  # Protects the per-round counters/status above
        # Gate: while set, incoming proposals for the active round count as responses
        self.proposals_submission_gate_event = threading.Event()
        self.proposals_gate_lock = threading.Lock()  # Lock to protect the gate event
        self.peers_response_status = {}  # peer_id -> {'responded': bool, 'timeout': float deadline}
    def record_peer_proposal(self, peer_id: str, proposal: dict, round_id: int):
        """
        Called by PeerHandlerThread to store a proposal received via an incoming 'values' message.

        Stores the proposal in a round-specific, peer-specific way. If the proposal
        pertains to the currently active consensus round and its gate is open, it
        also updates the response counter and possibly signals the round event.

        Args:
            peer_id: Identifier of the peer that sent the proposal.
            proposal: Proposed block dict, or None (None proposals are not stored).
            round_id: Consensus round the proposal belongs to.
        """
        # Part 1: Store the proposal if it's valid, regardless of the active round's
        # gate status. This ensures that if a peer sends a proposal (e.g., in a
        # values_request) before our round is fully ready, we still capture it.
        if proposal:  # Only store if the proposal is not None
            with self.all_proposals_lock:
                if round_id not in self.all_proposals_by_round:
                    self.all_proposals_by_round[round_id] = {}
                # Only store new proposals; ignore duplicates for the same peer and round.
                if peer_id not in self.all_proposals_by_round[round_id]:
                    self.all_proposals_by_round[round_id][peer_id] = proposal
                    logger.debug(f"Consensus [Round {round_id}]: Stored NEW proposal from {peer_id} into all_proposals_by_round (potentially early).")
                else:
                    logger.debug(f"Consensus [Round {round_id}]: IGNORED duplicate proposal from {peer_id} for round {round_id} as one already exists.")
        # Part 2: If this proposal corresponds to the *active* round (self.round_counter)
        # and the submission gate for that round is open, also count it towards the
        # completion of the current round's proposal collection.
        gate_is_open_for_active_round = False
        with self.proposals_gate_lock:
            # Check against the current round counter AND whether the gate is open for it.
            if round_id == self.round_counter and self.proposals_submission_gate_event.is_set():
                gate_is_open_for_active_round = True
        if gate_is_open_for_active_round:
            logger.debug(f"Consensus [Round {self.round_counter}]: Gate open for active round. Processing {peer_id}'s proposal for response counting.")
            # The proposal itself was already stored in Part 1 (if not None);
            # from here on we only do response counting for the active round.
            with self.round_sync_lock:
                # Re-check under the sync lock: round may have advanced since the gate check.
                if round_id == self.round_counter and self.current_round_event is not None:
                    # Count the peer only if it is an expected responder that has
                    # not been marked as responded yet (prevents double counting).
                    if peer_id in self.peers_response_status and not self.peers_response_status[peer_id]['responded']:
                        self.peers_response_status[peer_id]['responded'] = True
                        self.received_responses += 1
                        logger.debug(f"Consensus [Round {self.round_counter}]: Active round response from {peer_id} recorded. Received {self.received_responses}/{self.expected_responses}.")
                        if self.received_responses >= self.expected_responses:
                            logger.debug(f"Consensus [Round {self.round_counter}]: All expected responses for active round received. Setting event.")
                            if self.current_round_event: self.current_round_event.set()
                    # else: peer is not in the active response status (e.g., crashed
                    # before round init) or already responded — no change here.
        # else: gate closed or round mismatch — the stored proposal (Part 1) does
        # not affect the active round's response counting via this path.
def run(self) -> None:
"""
Main loop: wait for a signal (via consensus_event and work_to_do flag),
then execute a consensus round if no other round is currently in progress.
"""
while True:
self.consensus_event.wait()
with self.round_lock:
self.round_counter += 1
current_round_id = self.round_counter
logger.info(f"[Round {current_round_id}] Start")
try:
self._run_round(current_round_id)
finally:
#with self.round_lock:
#logger.debug(f"Consensus: Round {current_round_id} finished. round_in_progress set to False. work_to_do={self.work_to_do}")
self.consensus_event.clear()
    def _run_round(self, current_round_id: int) -> None:
        """
        Execute one propose-exchange-decide-commit round for the given round_id.

        PeerHandlerThreads are responsible for receiving 'values' messages and
        populating all_proposals_by_round; this thread sends its own proposal,
        waits for peer responses (per-peer and overall timeouts), then decides
        on a block and commits it if enough nodes are still active. Proposals
        that arrived before this round started are counted as pre-existing.
        """
        logger.debug(f"Consensus [Round {current_round_id}]: _run_round started. n_total_nodes={self.n_total_nodes}")
        # Keep the submission gate closed while per-round state is (re)initialized.
        with self.proposals_gate_lock:
            self.proposals_submission_gate_event.clear()
        # Snapshot the peer set so concurrent membership changes don't skew bookkeeping.
        active_peers_at_round_start: dict[str, socket.socket] = {}
        with self.peers_lock:
            active_peers_at_round_start = dict(self.peers)
        # Check for pre-existing proposals for this round that record_peer_proposal
        # may have stored before the round officially started.
        pre_existing_proposals_for_this_round = {}
        with self.all_proposals_lock:
            if current_round_id in self.all_proposals_by_round:
                pre_existing_proposals_for_this_round = dict(self.all_proposals_by_round[current_round_id])
        initial_received_count = 0
        peers_who_already_proposed = set()
        with self.round_sync_lock:
            self.current_round_event = threading.Event()
            self.received_responses = 0  # Reset for the current round
            self.expected_responses = 0  # Set below: pre-existing + peers we actually send to
            self.peers_response_status = {}  # Reset for the current round
            for peer_id in list(active_peers_at_round_start.keys()):  # Iterate over a copy of keys
                if peer_id in pre_existing_proposals_for_this_round and pre_existing_proposals_for_this_round[peer_id] is not None:
                    # This peer already provided a valid proposal for this round.
                    logger.debug(f"Consensus [Round {current_round_id}]: Peer {peer_id} has a pre-existing proposal. Counting it.")
                    self.peers_response_status[peer_id] = {
                        'responded': True,
                        'timeout': 0  # Not relevant as already responded
                    }
                    initial_received_count += 1
                    peers_who_already_proposed.add(peer_id)
                else:
                    # No proposal yet (or it was None); expect a response if we
                    # successfully send them a request below.
                    self.peers_response_status[peer_id] = {
                        'responded': False,
                        'timeout': time.time() + PEER_RESPONSE_TIMEOUT
                    }
            self.received_responses = initial_received_count
            # Expected responses = this initial count + those we send to without a send-time crash.
            self.expected_responses = initial_received_count
        logger.debug(f"Consensus [Round {current_round_id}]: Initialized. Pre-received: {initial_received_count}. Active peers: {len(active_peers_at_round_start)}")
        # Open the gate *after* response statuses are initialized, so
        # record_peer_proposal can safely count arrivals against this round.
        with self.proposals_gate_lock:
            self.proposals_submission_gate_event.set()
            logger.debug(f"Consensus [Round {current_round_id}]: Proposal submission gate is now open.")
        my_transactions = get_all_transactions()
        my_proposal = create_block(my_transactions)
        # Only request values from peers that have not already proposed.
        peers_to_send_request_to = {
            pid: sock for pid, sock in active_peers_at_round_start.items()
            if pid not in peers_who_already_proposed
        }
        logger.debug(f"Consensus [Round {current_round_id}]: Will send 'values_request' to {len(peers_to_send_request_to)} peers who haven't proposed yet.")
        # Increment expected_responses for peers we are about to send to.
        with self.round_sync_lock:
            self.expected_responses += len(peers_to_send_request_to)
            # Covers the case where all proposed early or there is no one to send to.
            if self.expected_responses == 0 or (self.received_responses >= self.expected_responses and self.expected_responses > 0):
                logger.debug(f"Consensus [Round {current_round_id}]: All expected responses accounted for (early or none to send). Setting event.")
                if self.current_round_event: self.current_round_event.set()
        peers_crashed_on_send = []
        if not (self.current_round_event and self.current_round_event.is_set()):
            for peer_id, sock in peers_to_send_request_to.items():
                if sock.fileno() == -1:
                    # Socket already closed: treat as crash-on-send, stop expecting a reply.
                    logger.debug(f"Consensus [Round {current_round_id}]: Socket for {peer_id} already closed before send.")
                    peers_crashed_on_send.append(peer_id)
                    with self.round_sync_lock:
                        self.expected_responses = max(0, self.expected_responses - 1)
                        if peer_id in self.peers_response_status: del self.peers_response_status[peer_id]
                        if self.current_round_event and (self.expected_responses == 0 or (self.expected_responses > 0 and self.received_responses >= self.expected_responses)):
                            if self.current_round_event: self.current_round_event.set()
                    continue
                try:
                    send_message(sock, {"type": "values_request", "payload": {"proposal": my_proposal, "round": current_round_id}})
                except (ConnectionError, OSError, socket.error) as e_conn:
                    logger.debug(f"Consensus [Round {current_round_id}]: Connection error sending to {peer_id}: {e_conn!r}.")
                    peers_crashed_on_send.append(peer_id)
                    with self.round_sync_lock:
                        self.expected_responses = max(0, self.expected_responses - 1)
                        if peer_id in self.peers_response_status: del self.peers_response_status[peer_id]
                        if self.current_round_event and (self.expected_responses == 0 or (self.expected_responses > 0 and self.received_responses >= self.expected_responses)):
                            if self.current_round_event: self.current_round_event.set()
                except Exception as e:
                    logger.debug(f"Consensus [Round {current_round_id}]: Generic exception sending to {peer_id}: {e!r}.")
                    peers_crashed_on_send.append(peer_id)
                    with self.round_sync_lock:
                        self.expected_responses = max(0, self.expected_responses - 1)
                        if peer_id in self.peers_response_status: del self.peers_response_status[peer_id]
                        if self.current_round_event and (self.expected_responses == 0 or (self.expected_responses > 0 and self.received_responses >= self.expected_responses)):
                            if self.current_round_event: self.current_round_event.set()
        if peers_crashed_on_send:
            # Remove crashed peers from the shared map under the lock; close sockets outside it.
            sockets_to_close_send_crash = []
            with self.peers_lock:
                for peer_id_remove in peers_crashed_on_send:
                    removed_sock = self.peers.pop(peer_id_remove, None)
                    if removed_sock: sockets_to_close_send_crash.append(removed_sock)
            for sock_close in sockets_to_close_send_crash:
                try: sock_close.close()
                except: pass
            logger.debug(f"Consensus [Round {current_round_id}]: After send-time crashes, active peers: {len(self.peers)}. Expected responses: {self.expected_responses}")
        if self.current_round_event and not self.current_round_event.is_set():
            logger.debug(f"Consensus [Round {current_round_id}]: Waiting for responses... Expected: {self.expected_responses}, Received: {self.received_responses}")
            wait_start_time = time.time()
            wait_end_time = wait_start_time + PROPOSAL_COLLECTION_TIMEOUT
            peers_to_timeout_this_loop = []
            # Poll loop: expire per-peer deadlines and wake early when the round event fires.
            while time.time() < wait_end_time and not self.current_round_event.is_set():
                with self.round_sync_lock:
                    now = time.time()
                    for peer_id, status in list(self.peers_response_status.items()):
                        if not status['responded'] and now > status['timeout']:
                            logger.debug(f"Consensus [Round {current_round_id}]: Peer {peer_id} timed out for proposal.")
                            peers_to_timeout_this_loop.append(peer_id)
                            del self.peers_response_status[peer_id]
                            self.expected_responses = max(0, self.expected_responses - 1)
                    if self.expected_responses == 0 or (self.expected_responses > 0 and self.received_responses >= self.expected_responses):
                        logger.debug(f"Consensus [Round {current_round_id}]: All expected responses received or peers timed out. Setting event.")
                        if self.current_round_event: self.current_round_event.set()
                self.current_round_event.wait(0.01)
            if peers_to_timeout_this_loop:
                # Evict peers whose proposal deadline expired.
                sockets_to_close_timeout = []
                with self.peers_lock:
                    for peer_id_timeout in peers_to_timeout_this_loop:
                        if peer_id_timeout in self.peers:
                            removed_sock = self.peers.pop(peer_id_timeout, None)
                            if removed_sock: sockets_to_close_timeout.append(removed_sock)
                            logger.debug(f"Consensus [Round {current_round_id}]: Removed peer {peer_id_timeout} due to response timeout.")
                for sock_close in sockets_to_close_timeout:
                    try: sock_close.close()
                    except: pass
            if not self.current_round_event.is_set():
                logger.debug(f"Consensus [Round {current_round_id}]: Overall proposal collection timed out after {PROPOSAL_COLLECTION_TIMEOUT}s.")
                self.current_round_event.set()
        # Close the gate: late proposals no longer count toward this round.
        with self.proposals_gate_lock:
            self.proposals_submission_gate_event.clear()
        logger.debug(f"Consensus [Round {current_round_id}]: Proposal submission gate is now closed.")
        # Collect every non-None proposal recorded for this round (including late ones).
        final_collected_proposals_for_decision = {}
        with self.all_proposals_lock:
            current_round_proposals_map = self.all_proposals_by_round.get(current_round_id, {})
            for peer_id, proposal_content in current_round_proposals_map.items():
                if proposal_content:
                    final_collected_proposals_for_decision[peer_id] = proposal_content
        logger.debug(f"Consensus [Round {current_round_id}]: Collected {len(final_collected_proposals_for_decision)} peer proposals for decision.")
        all_proposals_for_decision_list = [my_proposal] + [p for p in final_collected_proposals_for_decision.values() if p is not None]
        logger.debug(f"Consensus [Round {current_round_id}]: Total proposals for decision: {len(all_proposals_for_decision_list)}")
        decided_block = self._decide(all_proposals_for_decision_list)
        # Build a short human-readable summary of the decision for the logs.
        decided_block_summary = decided_block.get('current_hash', 'N/A') if decided_block else 'None'
        if decided_block and decided_block.get('transactions'):
            decided_block_summary = f"Block with {len(decided_block['transactions'])} txs, hash: {decided_block.get('current_hash', 'N/A')[:8]}..."
        elif decided_block:
            decided_block_summary = f"Empty block, hash: {decided_block.get('current_hash', 'N/A')[:8]}..."
        logger.debug(f"Consensus [Round {current_round_id}]: Decided block: {decided_block_summary}")
        # Commit only if strictly more than the required number of nodes are active.
        current_active_peer_count = 0
        with self.peers_lock:
            current_active_peer_count = len(self.peers)
        num_active_nodes = current_active_peer_count + 1  # peers plus this node
        required_nodes_for_consensus = math.ceil((self.n_total_nodes - 1) / 2) - 1
        logger.debug(f"Consensus [Round {current_round_id}]: Commit check. Active nodes: {num_active_nodes}/{self.n_total_nodes}. Required (strictly >): {required_nodes_for_consensus}")
        if num_active_nodes > required_nodes_for_consensus:
            if decided_block:
                logger.debug(f"Consensus [Round {current_round_id}]: Committing decided block. Hash: {decided_block.get('current_hash')}")
                try:
                    # Attempt to add the block first.
                    successfully_added = add_block(decided_block)  # blockchain.add_block returns bool
                    if successfully_added:
                        # Only print and process further if the block was actually new and added.
                        print(json.dumps(decided_block, sort_keys=True, indent=2, separators=(',', ': ')), flush=True)  # CRITICAL OUTPUT
                        remove_transactions(decided_block.get('transactions', []))
                        self._revalidate_mempool()
                    else:
                        # Duplicate or non-connecting block; add_block already logged details.
                        logger.debug(f"Consensus [Round {current_round_id}]: Decided block was not added to chain (duplicate or connection issue).")
                except Exception as e:
                    logger.error(f"During commit in Round {current_round_id}: {type(e).__name__} - {e}")
                    pass  # Allow to continue if commit failed for other reasons after add_block check
            else:
                logger.debug(f"Consensus [Round {current_round_id}]: Decision resulted in no block. Nothing to commit.")
                pass
        else:
            logger.debug(f"Consensus [Round {current_round_id}]: Commit skipped. Not enough active nodes ({num_active_nodes} <= {required_nodes_for_consensus}).")
            pass
def _decide(self, proposals: list[dict]) -> dict | None:
"""
Choose the block to commit:
- Filter out any None proposals first.
- If any proposal has transactions, ignore empty proposals.
- Select the block with lexicographically smallest current_hash.
Returns the decided block, or None if no valid proposals were available.
"""
if not proposals:
return None
valid_proposals = [p for p in proposals if p is not None] # Filter out None objects
if not valid_proposals:
return None
non_empty_tx_proposals = [p for p in valid_proposals if p.get('transactions')]
candidates_to_choose_from = [] # Corrected variable name
if non_empty_tx_proposals:
candidates_to_choose_from = non_empty_tx_proposals
else:
logger.debug(f"Consensus [_decide]: No proposals with transactions. proposals_count={len(valid_proposals)}") # Potentially important if expecting blocks
return None
return min(candidates_to_choose_from, key=lambda blk: blk.get('current_hash', chr(255) * 64))
def _revalidate_mempool(self) -> None:
"""
Re-validate all transactions in the mempool after a block is committed.
Remove any transactions that are now invalid due to nonce conflicts.
"""
current_mempool = pool_snapshot()
if not current_mempool:
return # No transactions in mempool to revalidate
logger.debug(f"Consensus [_revalidate_mempool]: Revalidating {len(current_mempool)} transactions.") # Less critical
invalid_txs = []
nonce_store_cache = {} # Renamed to avoid conflict
for tx_id, tx in current_mempool.items(): # Iterate over items() for tx_id and tx content
sender = tx.get('sender')
if not sender:
invalid_txs.append(tx)
continue
if sender not in nonce_store_cache: # Cache confirmed nonce lookups
nonce_store_cache[sender] = get_confirmed_nonce(sender)
if not is_valid_nonce(tx, {sender: nonce_store_cache[sender]}, {}): # Pass only relevant part of nonce_store
logger.debug(f"Consensus [_revalidate_mempool]: Transaction {tx_id[:8]} for sender {sender[:8]} (nonce {tx.get('nonce')}) is now invalid against confirmed nonce {nonce_store_cache[sender]}. Marking for removal.") # Less critical
invalid_txs.append(tx)
if invalid_txs:
logger.debug(f"Consensus [_revalidate_mempool]: Removing {len(invalid_txs)} invalid transactions from mempool.") # IMPORTANT
remove_transactions(invalid_txs)