diff --git a/src/cluster_raft.c b/src/cluster_raft.c index 1c0adc51fd5..e1ad584acd0 100644 --- a/src/cluster_raft.c +++ b/src/cluster_raft.c @@ -273,7 +273,6 @@ static clusterMsgSendBlock *clusterRaftBuildAllOffsetsMsg(void) { if (n->flags & CLUSTER_NODE_MEET) continue; long long off = (n == myself) ? getNodeReplicationOffset(myself) : n->repl_offset; - if (off == 0) continue; msg = sdscatlen(msg, " ", 1); msg = sdscatlen(msg, n->name, CLUSTER_NAMELEN); msg = sdscatfmt(msg, " %I", off); @@ -928,13 +927,15 @@ static void raftLogApply(raftLogEntry *e) { /* Update address for existing node. */ clusterNodeParseAddressString(existing, argv[1]); } - /* Only count the first NODE_JOIN for each node. New nodes - * don't have HANDSHAKE. Existing nodes from handshake do. */ + /* Increment size on first official join. Nodes created from MEET + * handshake have CLUSTER_NODE_MEET set; clear it now. Nodes created + * fresh (on followers) don't exist yet. Either way, count once. */ clusterNode *joined = existing ? existing : clusterLookupNode(argv[0], CLUSTER_NAMELEN); if (joined) { if (!existing || (joined->flags & CLUSTER_NODE_MEET)) { joined->flags &= ~CLUSTER_NODE_MEET; server.cluster->size++; + clusterAddNodeToShard(joined->shard_id, joined); /* Process deferred MEET messages now that we're in a cluster. */ if (server.cluster->size > 1 && listLength(rs->deferred_meets) > 0) { @@ -1309,8 +1310,10 @@ static int clusterRaftProcessAppendEntriesResponse(clusterLink *link, int argc, node->repl_offset = follower_repl_offset; /* Broadcast this node's offset to all peers when it transitions - * from 0 to non-zero (e.g. replica finishes initial sync). */ - if (prev_offset == 0 && follower_repl_offset > 0 && !(node->flags & CLUSTER_NODE_MEET)) { + * between 0 and non-zero (replica finishes sync or starts resync). */ + if (prev_offset != follower_repl_offset && + (prev_offset == 0 || follower_repl_offset == 0) && + !(node->flags & CLUSTER_NODE_MEET)) { clusterRaftBroadcastNodeOffset(node, follower_repl_offset); } @@ -1518,6 +1521,7 @@ static void clusterRaftInit(void) { rs->deferred_meets = listCreate(); rs->my_last_committed_info = sdsempty(); rs->last_node_info_check = monotonicMs(); + rs->last_repl_offsets_broadcast = monotonicMs(); server.cluster->size = 0; /* Incremented by NODE_JOIN apply */ }