Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 60 additions & 46 deletions src/cluster_raft.c
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,49 @@ static void clusterRaftRandomizeElectionTimeout(void) {
RAFT_STATE()->election_timeout = base + (rand() % base);
}

static int clusterRaftQuorum(void) {
return server.cluster->size / 2 + 1;
}
Comment thread
zuiderkwast marked this conversation as resolved.

/* Leader step-down to follower. */
static void clusterRaftStepDown(mstime_t now, const char *reason) {
clusterRaftState *rs = RAFT_STATE();
clusterRaftDeferPendingProposals();
rs->role = RAFT_ROLE_FOLLOWER;
rs->votes_received = 0;
memset(rs->voted_for, 0, CLUSTER_NAMELEN);
memset(rs->leader, 0, CLUSTER_NAMELEN);
clusterRaftRandomizeElectionTimeout();
rs->last_heartbeat = now;
rs->todo_save_config = 1;
serverLog(LL_NOTICE, "Stepping down to follower: %s.", reason);
}

/* Return non-zero if the leader still has a recently responsive voting
* quorum based on follower AE_ACK timing. */
static int clusterRaftLeaderHasFreshQuorum(mstime_t now) {
clusterRaftState *rs = RAFT_STATE();
if (rs->role != RAFT_ROLE_LEADER || server.cluster->size <= 1) return 1;

int fresh = 1; /* Self */
int quorum = clusterRaftQuorum();
dictIterator *di = dictGetSafeIterator(server.cluster->nodes);
dictEntry *de;
while ((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
if (node == myself) continue;
if (node->flags & CLUSTER_NODE_MEET) continue;
clusterNodeRaftData *rd = RAFT_NODE(node);
if (rd->last_ack_time > 0 &&
now - rd->last_ack_time <= server.cluster_node_timeout) {
fresh++;
if (fresh >= quorum) break;
}
}
dictReleaseIterator(di);
return fresh >= quorum;
}

/* Singleton leader steps down to joiner when joining another cluster. */
static void clusterRaftSingletonStepDown(void) {
clusterRaftState *rs = RAFT_STATE();
Expand Down Expand Up @@ -818,16 +861,11 @@ static void clusterRaftDeferPendingProposals(void) {
}

/* Step down to follower if we see a higher term. Returns 1 if stepped down. */
static int clusterRaftMaybeStepDown(clusterRaftState *rs, uint64_t term) {
static int clusterRaftMaybeStepDown(uint64_t term) {
clusterRaftState *rs = RAFT_STATE();
if (term > rs->current_term) {
clusterRaftDeferPendingProposals();
rs->current_term = term;
rs->role = RAFT_ROLE_FOLLOWER;
memset(rs->voted_for, 0, CLUSTER_NAMELEN);
memset(rs->leader, 0, CLUSTER_NAMELEN);
clusterRaftRandomizeElectionTimeout();
rs->last_heartbeat = monotonicMs();
rs->todo_save_config = 1;
clusterRaftStepDown(monotonicMs(), "observed higher term");
return 1;
}
return 0;
Expand Down Expand Up @@ -1236,7 +1274,7 @@ static int clusterRaftProcessAppendEntries(clusterLink *link, int argc, sds *arg
return 1;
}

clusterRaftMaybeStepDown(rs, msg_term);
clusterRaftMaybeStepDown(msg_term);

/* Accept heartbeat. */
if (rs->role != RAFT_ROLE_JOINER) rs->role = RAFT_ROLE_FOLLOWER;
Expand Down Expand Up @@ -1310,7 +1348,7 @@ static int clusterRaftProcessAppendEntriesResponse(clusterLink *link, int argc,
uint64_t follower_last_index = strtoull(argv[3], NULL, 10);
long long follower_repl_offset = (argc >= 5) ? strtoll(argv[4], NULL, 10) : 0;

clusterRaftMaybeStepDown(rs, msg_term);
clusterRaftMaybeStepDown(msg_term);
if (rs->role != RAFT_ROLE_LEADER) return 1;

clusterNode *node = link->node;
Expand Down Expand Up @@ -1360,7 +1398,7 @@ static int clusterRaftProcessAppendEntriesResponse(clusterLink *link, int argc,
}
dictReleaseIterator(di);

int quorum = server.cluster->size / 2 + 1;
int quorum = clusterRaftQuorum();
if (matches >= quorum) {
rs->commit_index = idx;
break;
Expand Down Expand Up @@ -1420,7 +1458,7 @@ static int clusterRaftProcessRequestVote(clusterLink *link, int argc, sds *argv)
uint64_t msg_term = strtoull(argv[2], NULL, 10);
int granted = 0;

clusterRaftMaybeStepDown(rs, msg_term);
clusterRaftMaybeStepDown(msg_term);

if (msg_term < rs->current_term) {
/* Stale term. */
Expand Down Expand Up @@ -1460,14 +1498,14 @@ static int clusterRaftProcessRequestVoteResponse(clusterLink *link, int argc, sd
uint64_t msg_term = strtoull(argv[1], NULL, 10);
int granted = atoi(argv[2]);

clusterRaftMaybeStepDown(rs, msg_term);
clusterRaftMaybeStepDown(msg_term);

if (rs->role != RAFT_ROLE_CANDIDATE) return 1;
if (msg_term != rs->current_term) return 1;

if (granted) {
rs->votes_received++;
int quorum = server.cluster->size / 2 + 1;
int quorum = clusterRaftQuorum();
if (rs->votes_received >= quorum) {
char old_leader[CLUSTER_NAMELEN];
memcpy(old_leader, rs->leader, CLUSTER_NAMELEN);
Expand Down Expand Up @@ -1518,7 +1556,7 @@ static void clusterRaftStartElection(void) {
serverLog(LL_NOTICE, "Starting Raft election (term %llu).", (unsigned long long)rs->current_term);

/* Single-node: already have quorum. */
int quorum = server.cluster->size / 2 + 1;
int quorum = clusterRaftQuorum();
if (rs->votes_received >= quorum) {
rs->role = RAFT_ROLE_LEADER;
memcpy(rs->leader, myself->name, CLUSTER_NAMELEN);
Expand Down Expand Up @@ -1568,42 +1606,13 @@ static void clusterRaftInitLast(void) {
}
}

/* Leader: detect node failures and propose NODE_FAIL.
* If a majority of peers are overdue, the problem is likely on our side
* (paused or partitioned) — reset ack times to avoid false positives. */
/* Leader: detect node failures and propose NODE_FAIL. */
static void clusterRaftDetectFailures(mstime_t now) {
mstime_t node_timeout = server.cluster_node_timeout;

/* Check if a majority of peers are overdue. */
int overdue = 0, total = 0;
/* Check individual nodes. */
dictIterator *di = dictGetSafeIterator(server.cluster->nodes);
dictEntry *de;
while ((de = dictNext(di)) != NULL) {
clusterNode *n = dictGetVal(de);
if (n == myself) continue;
total++;
clusterNodeRaftData *rd = RAFT_NODE(n);
if (rd->last_ack_time > 0 &&
now - rd->last_ack_time > node_timeout) {
overdue++;
}
}
dictReleaseIterator(di);

if (overdue > total / 2) {
/* Majority overdue — reset all ack times. */
di = dictGetSafeIterator(server.cluster->nodes);
while ((de = dictNext(di)) != NULL) {
clusterNode *n = dictGetVal(de);
if (n == myself) continue;
RAFT_NODE(n)->last_ack_time = now;
}
dictReleaseIterator(di);
return;
}

/* Check individual nodes. */
di = dictGetSafeIterator(server.cluster->nodes);
while ((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
if (node == myself) continue;
Expand Down Expand Up @@ -1739,6 +1748,11 @@ static void clusterRaftCron(void) {
clusterRaftStartElection();
}

if (rs->role == RAFT_ROLE_LEADER &&
!clusterRaftLeaderHasFreshQuorum(now)) {
clusterRaftStepDown(now, "lost quorum freshness");
}

/* Expire stale pending proposals to prevent unbounded buildup
* during prolonged partitions. Timeout covers election detection
* (1-2x node_timeout), election itself (~1x), plus margin. */
Expand Down
44 changes: 44 additions & 0 deletions tests/unit/cluster/cluster-raft.tcl
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Test higher-level Raft cluster behavior that does not require direct
# wire-protocol interaction from Tcl.

tags {external:skip cluster singledb} {

test "Raft: leader steps down after losing quorum freshness" {
start_multiple_servers 3 {overrides {cluster-enabled yes cluster-protocol raft cluster-node-timeout 1000}} {
[srv 0 client] CLUSTER MEET [srv -1 host] [srv -1 port]
[srv 0 client] CLUSTER MEET [srv -2 host] [srv -2 port]

wait_for_condition 50 100 {
[CI 0 cluster_size] == 3 &&
[CI 1 cluster_size] == 3 &&
[CI 2 cluster_size] == 3
} else {
fail "Cluster did not form: sizes=[CI 0 cluster_size],[CI 1 cluster_size],[CI 2 cluster_size]"
}

assert_equal [CI 0 cluster_raft_role] "leader"
set leader_idx 0

set paused [list]
foreach idx {0 1 2} {
if {$idx == $leader_idx} continue
pause_process [srv [expr {-$idx}] pid]
lappend paused $idx
}

wait_for_condition 100 50 {
[CI $leader_idx cluster_raft_role] eq "follower"
} else {
foreach idx $paused {
resume_process [srv [expr {-$idx}] pid]
}
fail "Leader did not step down after losing quorum freshness: role=[CI $leader_idx cluster_raft_role] leader=[CI $leader_idx cluster_raft_leader]"
}

foreach idx $paused {
resume_process [srv [expr {-$idx}] pid]
}
}
}

} ;# tags
Loading