Conversation
- Added Kademlia DHT support to `Libp2pAdapter`. - Implemented `DhtPut` and `DhtGet` commands in the swarm loop. - Enhanced `Libp2pSyncAdapter` with a periodic background sync task. - Added memory metadata synchronization via DHT. - Updated tests and mocks in `sync_adapter_test.rs`. - Fixed DHT routing by adding peer addresses during mDNS discovery. - Added a shutdown mechanism for the sync adapter. Co-authored-by: iberi22 <10615454+iberi22@users.noreply.github.com>
|
👋 Jules, reporting for duty! I'm here to lend a hand with this pull request. When you start a review, I'll add a 👀 emoji to each comment to let you know I've read it. I'll focus on feedback directed at me and will do my best to stay out of conversations between you and other bots or reviewers to keep the noise down. I'll push a commit with your requested changes shortly after. Please note there might be a delay between these steps, but rest assured I'm on the job! For more direct control, you can switch me to Reactive Mode. When this mode is on, I will only act on comments where you specifically mention me with New to Jules? Learn more at jules.google/docs. For security, I will only act on instructions from the user who triggered this task. |
📝 WalkthroughWalkthroughImplements the Libp2p P2P Sync feature by adding Kad DHT support to the adapter with put/get operations, introducing background memory synchronization tasks, and integrating memory metadata comparison with peers. The feature now passes all designated tests. Changes
Sequence Diagram(s)sequenceDiagram
participant BG as Background Task
participant Sync as Libp2pSyncAdapter
participant Net as NetworkPort
participant DHT as Kad DHT
participant Peers as Connected Peers
participant Mem as MemoryPort
rect rgba(100, 150, 200, 0.5)
Note over BG,Mem: Every 60 seconds
end
BG->>Sync: trigger sync cycle
rect rgba(150, 100, 200, 0.5)
Note over Sync,Net: Peer Synchronization
end
Sync->>Sync: sync_peers()
Sync->>Net: query connected peers
rect rgba(200, 150, 100, 0.5)
Note over Sync,Mem: Memory Synchronization
end
Sync->>Mem: fetch_last_node()
Mem-->>Sync: MemoryNode
Sync->>DHT: publish memory metadata (DhtPut)
DHT-->>Sync: put confirmation
Sync->>Peers: compare memory timestamps
Peers-->>Sync: peer memory versions
Sync->>Sync: log out-of-date peers
Estimated code review effort🎯 4 (Complex) | ⏱️ ~50 minutes The diff introduces heterogeneous changes spanning DHT architecture, background task management, and memory synchronization logic with significant density of new state management (pending_gets/pending_puts maps), event handling paths, and cross-component coordination. While individual file changes follow consistent patterns, the variety of concerns (Kad integration, async task lifecycle, DHT serialization, memory sync flow) demands separate reasoning for each cohort. Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly enhances the network's peer-to-peer synchronization capabilities by integrating a Distributed Hash Table (DHT) using Kademlia. This integration enables nodes to share and discover information about their availability and the latest updates to their memory, laying the groundwork for a robust and decentralized data reconciliation system. The changes introduce asynchronous communication patterns for DHT operations and establish a continuous background process to maintain network consistency. Highlights
Changelog
Activity
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request introduces significant functionality by implementing P2P synchronization using Libp2p's Kademlia DHT. The changes are well-structured, adding DHT capabilities to the libp2p_adapter and creating a new libp2p_sync_adapter for periodic data reconciliation. The use of oneshot channels for asynchronous DHT operations and a background task for periodic sync are solid design choices. My feedback focuses on refactoring some complex logic for better readability and using more idiomatic Rust patterns to improve code clarity and maintainability.
| match result { | ||
| kad::QueryResult::GetRecord(Ok(ok)) => { | ||
| if let Some(sender) = pending_gets.remove(&id) { | ||
| match ok { | ||
| kad::GetRecordOk::FoundRecord(peer_record) => { | ||
| let _ = sender.send(Ok(Some(peer_record.record.value))); | ||
| } | ||
| _ => { | ||
| let _ = sender.send(Ok(None)); | ||
| } | ||
| } | ||
| } | ||
| } | ||
| kad::QueryResult::GetRecord(Err(e)) => { | ||
| if let Some(sender) = pending_gets.remove(&id) { | ||
| match e { | ||
| kad::GetRecordError::NotFound { .. } => { | ||
| let _ = sender.send(Ok(None)); | ||
| } | ||
| _ => { | ||
| let _ = sender.send(Err(Error::System(format!("DHT get error: {:?}", e)))); | ||
| } | ||
| } | ||
| } | ||
| } | ||
| kad::QueryResult::PutRecord(Ok(_)) => { | ||
| if let Some(sender) = pending_puts.remove(&id) { | ||
| let _ = sender.send(Ok(())); | ||
| } | ||
| } | ||
| kad::QueryResult::PutRecord(Err(e)) => { | ||
| if let Some(sender) = pending_puts.remove(&id) { | ||
| let _ = sender.send(Err(Error::System(format!("DHT put error: {:?}", e)))); | ||
| } | ||
| } | ||
| _ => {} | ||
| } |
There was a problem hiding this comment.
The logic for handling Kademlia query results is quite nested and repeats the if let Some(sender) = ... check. This can be refactored to be more linear and easier to read by handling GetRecord and PutRecord results in a more consolidated way. This improves readability and maintainability.
match result {
kad::QueryResult::GetRecord(result) => {
if let Some(sender) = pending_gets.remove(&id) {
let response = match result {
Ok(kad::GetRecordOk::FoundRecord(peer_record)) => Ok(Some(peer_record.record.value)),
Ok(_) => Ok(None),
Err(kad::GetRecordError::NotFound { .. }) => Ok(None),
Err(e) => Err(Error::System(format!("DHT get error: {:?}", e))),
};
let _ = sender.send(response);
}
}
kad::QueryResult::PutRecord(result) => {
if let Some(sender) = pending_puts.remove(&id) {
let response = result.map(|_| ()).map_err(|e| Error::System(format!("DHT put error: {:?}", e)));
let _ = sender.send(response);
}
}
_ => {}
}| if value.len() == 8 { | ||
| let mut bytes = [0u8; 8]; | ||
| bytes.copy_from_slice(&value); | ||
| let peer_last_update = i64::from_be_bytes(bytes); | ||
|
|
||
| if peer_last_update > last_update { | ||
| info!("Peer {} has newer memories ({} > {}). In a full implementation, we would trigger a reconciliation.", peer_id, peer_last_update, last_update); | ||
| } | ||
| } |
There was a problem hiding this comment.
The logic to convert the Vec<u8> to [u8; 8] can be made more concise and idiomatic by using try_from. This avoids manually checking the length and then copying the slice, making the code safer and cleaner.
if let Ok(bytes) = <[u8; 8]>::try_from(value.as_slice()) {
let peer_last_update = i64::from_be_bytes(bytes);
if peer_last_update > last_update {
info!("Peer {} has newer memories ({} > {}). In a full implementation, we would trigger a reconciliation.", peer_id, peer_last_update, last_update);
}
}There was a problem hiding this comment.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
crates/synapse-infra/src/adapters/libp2p_adapter.rs (1)
89-93:⚠️ Potential issue | 🔴 CriticalReplace
SwarmBuilder::with_new_identity()withSwarmBuilder::with_existing_identity(local_key)at line 134.The code creates
local_keyandlocal_peer_idat lines 90–91, which are used consistently by gossipsub (line 126) and Kademlia (line 131). However,SwarmBuilder::with_new_identity()generates a separate identity, causing peer ID misalignment: DHT keys derive from the manual peer ID while the swarm advertises a different identity to the network. This breaks DHT record consistency and gossipsub topic keying.Use the libp2p API
SwarmBuilder::with_existing_identity(local_key)to ensure all subsystems (swarm, gossipsub, DHT) share a single identity source.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/synapse-infra/src/adapters/libp2p_adapter.rs` around lines 89 - 93, The swarm is currently constructed with SwarmBuilder::with_new_identity(), which creates a different identity than the manually generated local_key/local_peer_id used for gossipsub and Kademlia; replace the call to SwarmBuilder::with_new_identity() with SwarmBuilder::with_existing_identity(local_key) so the swarm, gossipsub, and DHT all use the same identity (local_key/local_peer_id) and prevent DHT/gossipsub key inconsistencies.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/synapse-infra/src/adapters/libp2p_sync_adapter.rs`:
- Around line 99-110: The DHT read currently ignores errors and malformed
payloads because it uses if let Ok(Some(value)) =
self.network_port.dht_get(key).await; change this to match on the Result from
self.network_port.dht_get(key).await so you explicitly handle Err(e) (log error
with the peer_id and e), handle Ok(None) (log debug/warn that no value was
found), and for Ok(Some(value)) validate value.len() == 8 else log a warning
about malformed payload before attempting bytes.copy_from_slice; keep the
existing peer_last_update comparison and log, but ensure all failure cases are
surfaced via logging (and return/propagate an error if appropriate for the
surrounding memory sync flow).
- Around line 49-77: The start() method can be called multiple times and will
spawn duplicate background tasks; add an idempotence guard by introducing an
AtomicBool (e.g., started) on the adapter struct and in start() do a
compare_exchange(false, true) to atomically mark started and return early (or
log/Err) if it was already true; keep existing shutdown() behavior using
shutdown_tx and do not change sync_peers()/sync_memories() — this prevents
duplicate tokio::spawn tasks and documents single-call semantics for start().
---
Outside diff comments:
In `@crates/synapse-infra/src/adapters/libp2p_adapter.rs`:
- Around line 89-93: The swarm is currently constructed with
SwarmBuilder::with_new_identity(), which creates a different identity than the
manually generated local_key/local_peer_id used for gossipsub and Kademlia;
replace the call to SwarmBuilder::with_new_identity() with
SwarmBuilder::with_existing_identity(local_key) so the swarm, gossipsub, and DHT
all use the same identity (local_key/local_peer_id) and prevent DHT/gossipsub
key inconsistencies.
ℹ️ Review info
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
.gitcore/features.jsoncrates/synapse-infra/src/adapters/libp2p_adapter.rscrates/synapse-infra/src/adapters/libp2p_sync_adapter.rscrates/synapse-infra/tests/sync_adapter_test.rs
| pub fn start(self: Arc<Self>) { | ||
| let adapter = self.clone(); | ||
| let mut shutdown_rx = self.shutdown_tx.subscribe(); | ||
| tokio::spawn(async move { | ||
| let mut interval = time::interval(Duration::from_secs(60)); | ||
| loop { | ||
| tokio::select! { | ||
| _ = interval.tick() => { | ||
| if let Err(e) = adapter.sync_peers().await { | ||
| error!("Peer sync failed: {}", e); | ||
| } | ||
| if let Err(e) = adapter.sync_memories().await { | ||
| error!("Memory sync failed: {}", e); | ||
| } | ||
| } | ||
| _ = shutdown_rx.changed() => { | ||
| if *shutdown_rx.borrow() { | ||
| info!("Shutting down sync adapter background task"); | ||
| break; | ||
| } | ||
| } | ||
| } | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| pub fn shutdown(&self) { | ||
| let _ = self.shutdown_tx.send(true); | ||
| } |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Find all call sites for Libp2pSyncAdapter creation, start, and shutdown
rg -n -C3 'Libp2pSyncAdapter::new|\.start\(|\.shutdown\(' cratesRepository: iberi22/synapse-protocol
Length of output: 3726
🌐 Web query:
tokio sync watch receiver semantics: what happens when a receiver subscribes after sender has already sent a value
💡 Result:
In tokio::sync::watch, a receiver that subscribes after a value has already been sent will:
- Immediately see the latest value when it calls
rx.borrow()/rx.borrow_and_update()(watch only retains the most recent value). (docs.rs) - Consider that current value “seen” at subscribe time, meaning
rx.changed().awaitwill not complete for that already-sent value; it will wait for the next send after the subscription. (docs.rs)
So: late subscribers don’t get a “change notification” for the current value, but they can still read it immediately. (docs.rs)
Citations:
- 1: https://docs.rs/tokio/latest/tokio/sync/watch/?utm_source=openai
- 2: https://docs.rs/tokio/latest/tokio/sync/watch/?utm_source=openai
- 3: https://docs.rs/tokio/latest/tokio/sync/watch/?utm_source=openai
🏁 Script executed:
#!/bin/bash
# Check the complete Libp2pSyncAdapter struct definition and instantiation
rg -n -A20 'impl Libp2pSyncAdapter' crates/synapse-infra/src/adapters/libp2p_sync_adapter.rs | head -80Repository: iberi22/synapse-protocol
Length of output: 811
🏁 Script executed:
#!/bin/bash
# Check if start() is actually called anywhere in production (not tests)
rg -n '\.start\(' crates --glob='!*test*' --glob='!**/tests/**'Repository: iberi22/synapse-protocol
Length of output: 144
Guard start() against multiple calls and add restart capability or document single-call semantics.
The start() method lacks idempotence guards: calling it multiple times spawns duplicate background tasks. Additionally, the one-way watch channel makes restart impossible after shutdown()—a late subscriber will see the true value immediately but won't trigger changed(), causing the task to exit on re-entry.
Either add a flag to track whether start() has been called (rejecting duplicates), make the adapter reusable with a reset mechanism, or document that start() must only be called once per instance.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/synapse-infra/src/adapters/libp2p_sync_adapter.rs` around lines 49 -
77, The start() method can be called multiple times and will spawn duplicate
background tasks; add an idempotence guard by introducing an AtomicBool (e.g.,
started) on the adapter struct and in start() do a compare_exchange(false, true)
to atomically mark started and return early (or log/Err) if it was already true;
keep existing shutdown() behavior using shutdown_tx and do not change
sync_peers()/sync_memories() — this prevents duplicate tokio::spawn tasks and
documents single-call semantics for start().
| let key = memory_sync_key(&peer_id); | ||
| if let Ok(Some(value)) = self.network_port.dht_get(key).await { | ||
| if value.len() == 8 { | ||
| let mut bytes = [0u8; 8]; | ||
| bytes.copy_from_slice(&value); | ||
| let peer_last_update = i64::from_be_bytes(bytes); | ||
|
|
||
| if peer_last_update > last_update { | ||
| info!("Peer {} has newer memories ({} > {}). In a full implementation, we would trigger a reconciliation.", peer_id, peer_last_update, last_update); | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Don’t silently drop DHT read failures in memory sync.
The current if let Ok(Some(value)) = ... path suppresses DHT errors and malformed payloads, so sync can report success while peer metadata reads are failing.
Suggested patch
- let key = memory_sync_key(&peer_id);
- if let Ok(Some(value)) = self.network_port.dht_get(key).await {
- if value.len() == 8 {
- let mut bytes = [0u8; 8];
- bytes.copy_from_slice(&value);
- let peer_last_update = i64::from_be_bytes(bytes);
-
- if peer_last_update > last_update {
- info!("Peer {} has newer memories ({} > {}). In a full implementation, we would trigger a reconciliation.", peer_id, peer_last_update, last_update);
- }
- }
- }
+ let key = memory_sync_key(&peer_id);
+ match self.network_port.dht_get(key).await {
+ Ok(Some(value)) if value.len() == 8 => {
+ let mut bytes = [0u8; 8];
+ bytes.copy_from_slice(&value);
+ let peer_last_update = i64::from_be_bytes(bytes);
+
+ if peer_last_update > last_update {
+ info!("Peer {} has newer memories ({} > {}). In a full implementation, we would trigger a reconciliation.", peer_id, peer_last_update, last_update);
+ }
+ }
+ Ok(Some(value)) => {
+ warn!("Invalid memory sync payload size from {}: {}", peer_id, value.len());
+ }
+ Ok(None) => {}
+ Err(e) => {
+ warn!("Failed to read memory sync marker for {}: {}", peer_id, e);
+ }
+ }📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| let key = memory_sync_key(&peer_id); | |
| if let Ok(Some(value)) = self.network_port.dht_get(key).await { | |
| if value.len() == 8 { | |
| let mut bytes = [0u8; 8]; | |
| bytes.copy_from_slice(&value); | |
| let peer_last_update = i64::from_be_bytes(bytes); | |
| if peer_last_update > last_update { | |
| info!("Peer {} has newer memories ({} > {}). In a full implementation, we would trigger a reconciliation.", peer_id, peer_last_update, last_update); | |
| } | |
| } | |
| } | |
| let key = memory_sync_key(&peer_id); | |
| match self.network_port.dht_get(key).await { | |
| Ok(Some(value)) if value.len() == 8 => { | |
| let mut bytes = [0u8; 8]; | |
| bytes.copy_from_slice(&value); | |
| let peer_last_update = i64::from_be_bytes(bytes); | |
| if peer_last_update > last_update { | |
| info!("Peer {} has newer memories ({} > {}). In a full implementation, we would trigger a reconciliation.", peer_id, peer_last_update, last_update); | |
| } | |
| } | |
| Ok(Some(value)) => { | |
| warn!("Invalid memory sync payload size from {}: {}", peer_id, value.len()); | |
| } | |
| Ok(None) => {} | |
| Err(e) => { | |
| warn!("Failed to read memory sync marker for {}: {}", peer_id, e); | |
| } | |
| } |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/synapse-infra/src/adapters/libp2p_sync_adapter.rs` around lines 99 -
110, The DHT read currently ignores errors and malformed payloads because it
uses if let Ok(Some(value)) = self.network_port.dht_get(key).await; change this
to match on the Result from self.network_port.dht_get(key).await so you
explicitly handle Err(e) (log error with the peer_id and e), handle Ok(None)
(log debug/warn that no value was found), and for Ok(Some(value)) validate
value.len() == 8 else log a warning about malformed payload before attempting
bytes.copy_from_slice; keep the existing peer_last_update comparison and log,
but ensure all failure cases are surfaced via logging (and return/propagate an
error if appropriate for the surrounding memory sync flow).
This PR implements the core functionality for Libp2p P2P synchronization. It enables nodes to share their availability and memory update status over a distributed hash table (DHT). Key changes include integrating Kademlia into the libp2p swarm, implementing async DHT operations with oneshot channels, and establishing a periodic background synchronization loop that reconciles peer info and memory timestamps.
Fixes #679
PR created automatically by Jules for task 6992677833960834598 started by @iberi22
Summary by CodeRabbit