diff --git a/src/cluster_raft.c b/src/cluster_raft.c index 3f03e2440c7..b843c8dadcb 100644 --- a/src/cluster_raft.c +++ b/src/cluster_raft.c @@ -329,6 +329,49 @@ static void clusterRaftRandomizeElectionTimeout(void) { RAFT_STATE()->election_timeout = base + (rand() % base); } +static int clusterRaftQuorum(void) { /* Smallest strict majority of server.cluster->size. */ + return server.cluster->size / 2 + 1; +} + +/* Step down to follower (also reached from clusterRaftMaybeStepDown() for any role that observes a higher term): calls clusterRaftDeferPendingProposals(), zeroes votes_received, voted_for and leader, re-randomizes the election timeout, sets last_heartbeat to `now`, sets todo_save_config, and logs `reason`. */ +static void clusterRaftStepDown(mstime_t now, const char *reason) { + clusterRaftState *rs = RAFT_STATE(); + clusterRaftDeferPendingProposals(); + rs->role = RAFT_ROLE_FOLLOWER; + rs->votes_received = 0; + memset(rs->voted_for, 0, CLUSTER_NAMELEN); + memset(rs->leader, 0, CLUSTER_NAMELEN); + clusterRaftRandomizeElectionTimeout(); + rs->last_heartbeat = now; + rs->todo_save_config = 1; + serverLog(LL_NOTICE, "Stepping down to follower: %s.", reason); +} + +/* Return non-zero if the leader still has a recently responsive voting + * quorum based on follower AE_ACK timing. */ +static int clusterRaftLeaderHasFreshQuorum(mstime_t now) { + clusterRaftState *rs = RAFT_STATE(); + if (rs->role != RAFT_ROLE_LEADER || server.cluster->size <= 1) return 1; /* Non-leaders and clusters of size <= 1 always pass. */ + + int fresh = 1; /* Self counts toward the quorum. */ + int quorum = clusterRaftQuorum(); + dictIterator *di = dictGetSafeIterator(server.cluster->nodes); + dictEntry *de; + while ((de = dictNext(di)) != NULL) { + clusterNode *node = dictGetVal(de); + if (node == myself) continue; + if (node->flags & CLUSTER_NODE_MEET) continue; /* NOTE(review): presumably nodes still flagged MEET are not voting members yet - confirm. */ + clusterNodeRaftData *rd = RAFT_NODE(node); + if (rd->last_ack_time > 0 && /* NOTE(review): a peer with last_ack_time <= 0 is never fresh; confirm ack times are seeded when leadership is won, otherwise a new leader could fail this check on its first cron tick. */ + now - rd->last_ack_time <= server.cluster_node_timeout) { + fresh++; + if (fresh >= quorum) break; + } + } + dictReleaseIterator(di); + return fresh >= quorum; +} + /* Singleton leader steps down to joiner when joining another cluster. */ static void clusterRaftSingletonStepDown(void) { clusterRaftState *rs = RAFT_STATE(); @@ -818,16 +861,11 @@ static void clusterRaftDeferPendingProposals(void) { } /* Step down to follower if we see a higher term. 
Returns 1 if stepped down. */ -static int clusterRaftMaybeStepDown(clusterRaftState *rs, uint64_t term) { +static int clusterRaftMaybeStepDown(uint64_t term) { + clusterRaftState *rs = RAFT_STATE(); if (term > rs->current_term) { - clusterRaftDeferPendingProposals(); rs->current_term = term; - rs->role = RAFT_ROLE_FOLLOWER; - memset(rs->voted_for, 0, CLUSTER_NAMELEN); - memset(rs->leader, 0, CLUSTER_NAMELEN); - clusterRaftRandomizeElectionTimeout(); - rs->last_heartbeat = monotonicMs(); - rs->todo_save_config = 1; + clusterRaftStepDown(monotonicMs(), "observed higher term"); return 1; } return 0; @@ -1236,7 +1274,7 @@ static int clusterRaftProcessAppendEntries(clusterLink *link, int argc, sds *arg return 1; } - clusterRaftMaybeStepDown(rs, msg_term); + clusterRaftMaybeStepDown(msg_term); /* Accept heartbeat. */ if (rs->role != RAFT_ROLE_JOINER) rs->role = RAFT_ROLE_FOLLOWER; @@ -1310,7 +1348,7 @@ static int clusterRaftProcessAppendEntriesResponse(clusterLink *link, int argc, uint64_t follower_last_index = strtoull(argv[3], NULL, 10); long long follower_repl_offset = (argc >= 5) ? strtoll(argv[4], NULL, 10) : 0; - clusterRaftMaybeStepDown(rs, msg_term); + clusterRaftMaybeStepDown(msg_term); if (rs->role != RAFT_ROLE_LEADER) return 1; clusterNode *node = link->node; @@ -1360,7 +1398,7 @@ static int clusterRaftProcessAppendEntriesResponse(clusterLink *link, int argc, } dictReleaseIterator(di); - int quorum = server.cluster->size / 2 + 1; + int quorum = clusterRaftQuorum(); if (matches >= quorum) { rs->commit_index = idx; break; @@ -1420,7 +1458,7 @@ static int clusterRaftProcessRequestVote(clusterLink *link, int argc, sds *argv) uint64_t msg_term = strtoull(argv[2], NULL, 10); int granted = 0; - clusterRaftMaybeStepDown(rs, msg_term); + clusterRaftMaybeStepDown(msg_term); if (msg_term < rs->current_term) { /* Stale term. 
*/ @@ -1460,14 +1498,14 @@ static int clusterRaftProcessRequestVoteResponse(clusterLink *link, int argc, sd uint64_t msg_term = strtoull(argv[1], NULL, 10); int granted = atoi(argv[2]); - clusterRaftMaybeStepDown(rs, msg_term); + clusterRaftMaybeStepDown(msg_term); if (rs->role != RAFT_ROLE_CANDIDATE) return 1; if (msg_term != rs->current_term) return 1; if (granted) { rs->votes_received++; - int quorum = server.cluster->size / 2 + 1; + int quorum = clusterRaftQuorum(); if (rs->votes_received >= quorum) { char old_leader[CLUSTER_NAMELEN]; memcpy(old_leader, rs->leader, CLUSTER_NAMELEN); @@ -1518,7 +1556,7 @@ static void clusterRaftStartElection(void) { serverLog(LL_NOTICE, "Starting Raft election (term %llu).", (unsigned long long)rs->current_term); /* Single-node: already have quorum. */ - int quorum = server.cluster->size / 2 + 1; + int quorum = clusterRaftQuorum(); if (rs->votes_received >= quorum) { rs->role = RAFT_ROLE_LEADER; memcpy(rs->leader, myself->name, CLUSTER_NAMELEN); @@ -1568,42 +1606,13 @@ static void clusterRaftInitLast(void) { } } -/* Leader: detect node failures and propose NODE_FAIL. - * If a majority of peers are overdue, the problem is likely on our side - * (paused or partitioned) — reset ack times to avoid false positives. */ +/* Leader: detect node failures and propose NODE_FAIL. */ static void clusterRaftDetectFailures(mstime_t now) { mstime_t node_timeout = server.cluster_node_timeout; - /* Check if a majority of peers are overdue. */ - int overdue = 0, total = 0; + /* Check individual nodes. */ dictIterator *di = dictGetSafeIterator(server.cluster->nodes); dictEntry *de; - while ((de = dictNext(di)) != NULL) { - clusterNode *n = dictGetVal(de); - if (n == myself) continue; - total++; - clusterNodeRaftData *rd = RAFT_NODE(n); - if (rd->last_ack_time > 0 && - now - rd->last_ack_time > node_timeout) { - overdue++; - } - } - dictReleaseIterator(di); - - if (overdue > total / 2) { - /* Majority overdue — reset all ack times. 
*/ - di = dictGetSafeIterator(server.cluster->nodes); - while ((de = dictNext(di)) != NULL) { - clusterNode *n = dictGetVal(de); - if (n == myself) continue; - RAFT_NODE(n)->last_ack_time = now; - } - dictReleaseIterator(di); - return; - } - - /* Check individual nodes. */ - di = dictGetSafeIterator(server.cluster->nodes); while ((de = dictNext(di)) != NULL) { clusterNode *node = dictGetVal(de); if (node == myself) continue; @@ -1739,6 +1748,11 @@ static void clusterRaftCron(void) { clusterRaftStartElection(); } + if (rs->role == RAFT_ROLE_LEADER && + !clusterRaftLeaderHasFreshQuorum(now)) { /* A leader without a quorum of recent acks steps down to follower. */ + clusterRaftStepDown(now, "lost quorum freshness"); + } + /* Expire stale pending proposals to prevent unbounded buildup * during prolonged partitions. Timeout covers election detection * (1-2x node_timeout), election itself (~1x), plus margin. */ diff --git a/tests/unit/cluster/cluster-raft.tcl b/tests/unit/cluster/cluster-raft.tcl new file mode 100644 index 00000000000..83afd841ea0 --- /dev/null +++ b/tests/unit/cluster/cluster-raft.tcl @@ -0,0 +1,44 @@ +# Test higher-level Raft cluster behavior that does not require direct +# wire-protocol interaction from Tcl. 
+ +tags {external:skip cluster singledb} { + +test "Raft: leader steps down after losing quorum freshness" { + start_multiple_servers 3 {overrides {cluster-enabled yes cluster-protocol raft cluster-node-timeout 1000}} { + [srv 0 client] CLUSTER MEET [srv -1 host] [srv -1 port] + [srv 0 client] CLUSTER MEET [srv -2 host] [srv -2 port] + + wait_for_condition 50 100 { + [CI 0 cluster_size] == 3 && + [CI 1 cluster_size] == 3 && + [CI 2 cluster_size] == 3 + } else { + fail "Cluster did not form: sizes=[CI 0 cluster_size],[CI 1 cluster_size],[CI 2 cluster_size]" + } + + assert_equal [CI 0 cluster_raft_role] "leader" + set leader_idx 0 + + set paused [list] + foreach idx {0 1 2} { + if {$idx == $leader_idx} continue + pause_process [srv [expr {-$idx}] pid] + lappend paused $idx + } + + wait_for_condition 100 50 { + [CI $leader_idx cluster_raft_role] eq "follower" + } else { + foreach idx $paused { + resume_process [srv [expr {-$idx}] pid] + } + fail "Leader did not step down after losing quorum freshness: role=[CI $leader_idx cluster_raft_role] leader=[CI $leader_idx cluster_raft_leader]" + } + + foreach idx $paused { + resume_process [srv [expr {-$idx}] pid] + } + } +} + +} ;# tags