From cc4842bf55bdbb4bce20de268bc72b96477af878 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Wed, 3 Jun 2026 19:59:59 +0200 Subject: [PATCH 1/2] Raft: add nodes to shards dict on NODE_JOIN apply MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Nodes joining via NODE_JOIN were added to the nodes dict but not the shards dict, making them invisible in CLUSTER SHARDS responses. Add clusterAddNodeToShard in both paths: when creating a fresh node (on followers that never saw the MEET) and when transitioning an existing MEET-flagged node to a full member. Signed-off-by: Viktor Söderqvist --- src/cluster_raft.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/cluster_raft.c b/src/cluster_raft.c index 1c0adc51fd5..dd1cfc5bae7 100644 --- a/src/cluster_raft.c +++ b/src/cluster_raft.c @@ -928,13 +928,15 @@ static void raftLogApply(raftLogEntry *e) { /* Update address for existing node. */ clusterNodeParseAddressString(existing, argv[1]); } - /* Only count the first NODE_JOIN for each node. New nodes - * don't have HANDSHAKE. Existing nodes from handshake do. */ + /* Increment size on first official join. Nodes created from MEET + * handshake have CLUSTER_NODE_MEET set; clear it now. Nodes created + * fresh (on followers) don't exist yet. Either way, count once. */ clusterNode *joined = existing ? existing : clusterLookupNode(argv[0], CLUSTER_NAMELEN); if (joined) { if (!existing || (joined->flags & CLUSTER_NODE_MEET)) { joined->flags &= ~CLUSTER_NODE_MEET; server.cluster->size++; + clusterAddNodeToShard(joined->shard_id, joined); /* Process deferred MEET messages now that we're in a cluster. */ if (server.cluster->size > 1 && listLength(rs->deferred_meets) > 0) { From d75a8fcdb3dfb3c6350a3112ab8544fe7a0273ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Wed, 3 Jun 2026 20:01:58 +0200 Subject: [PATCH 2/2] Raft: fix REPL_OFFSETS broadcast for replica health tracking MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three changes to replication offset propagation: 1. Broadcast when offset drops from non-zero to 0 (replica starts full resync), not just 0 to non-zero. This lets peers report the replica's health as "loading" in CLUSTER SHARDS. 2. Include zero offsets in the periodic broadcast. Previously nodes with offset 0 were skipped, so a peer that missed the immediate broadcast (e.g. brief disconnection) would never learn the correct value. This also affected the raft leader's own offset when it is a data replica. 3. Initialize the broadcast timer to startup time so the first periodic broadcast is deferred by 10 seconds, avoiding unnecessary traffic during cluster formation. Signed-off-by: Viktor Söderqvist --- src/cluster_raft.c | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/cluster_raft.c b/src/cluster_raft.c index dd1cfc5bae7..e1ad584acd0 100644 --- a/src/cluster_raft.c +++ b/src/cluster_raft.c @@ -273,7 +273,6 @@ static clusterMsgSendBlock *clusterRaftBuildAllOffsetsMsg(void) { if (n->flags & CLUSTER_NODE_MEET) continue; long long off = (n == myself) ? getNodeReplicationOffset(myself) : n->repl_offset; - if (off == 0) continue; msg = sdscatlen(msg, " ", 1); msg = sdscatlen(msg, n->name, CLUSTER_NAMELEN); msg = sdscatfmt(msg, " %I", off); @@ -1311,8 +1310,10 @@ static int clusterRaftProcessAppendEntriesResponse(clusterLink *link, int argc, node->repl_offset = follower_repl_offset; /* Broadcast this node's offset to all peers when it transitions - * from 0 to non-zero (e.g. replica finishes initial sync). */ - if (prev_offset == 0 && follower_repl_offset > 0 && !(node->flags & CLUSTER_NODE_MEET)) { + * between 0 and non-zero (replica finishes sync or starts resync). */ + if (prev_offset != follower_repl_offset && + (prev_offset == 0 || follower_repl_offset == 0) && + !(node->flags & CLUSTER_NODE_MEET)) { clusterRaftBroadcastNodeOffset(node, follower_repl_offset); } @@ -1520,6 +1521,7 @@ static void clusterRaftInit(void) { rs->deferred_meets = listCreate(); rs->my_last_committed_info = sdsempty(); rs->last_node_info_check = monotonicMs(); + rs->last_repl_offsets_broadcast = monotonicMs(); server.cluster->size = 0; /* Incremented by NODE_JOIN apply */ }