From 523722e1eac8bf9bcbe046b95ec513ef210e26b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Tue, 5 May 2026 14:37:49 +0200 Subject: [PATCH 01/20] Raft: persist state and WAL in nodes.conf MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Persist currentTerm, votedFor, and lastApplied in the vars line. Uncommitted log entries are appended to the end of nodes.conf as 'log ' lines. Full rewrite (atomic write+rename) is triggered on term/vote changes and when applying log entries that affect myself: SLOT_CHANGE (when target or source is myself), SET_REPLICA_OF (replica is myself), FAILOVER (promoted or demoted is myself), and NODE_JOIN (promoted from learner). This ensures the snapshot in nodes.conf is up to date for state that matters on restart. Other log entries are appended with a single write+fsync, batched per event loop cycle. On load, the snapshot (node lines) represents state at lastApplied. Log entries in the tail are replayed into the in-memory log. The leader will update commit index via AppendEntries after reconnection. Also start replication on load if the node is a replica, and stop reading nodes.conf if a line without trailing newline is encountered (indicates a crash during append). Unskip tests that were blocked on persistence. Adapt failover stress test for raft: use role check instead of epoch Replace config epoch comparison with role check to detect failover. Skip the epoch post-condition assertion for cluster-raft. Signed-off-by: Viktor Söderqvist --- src/cluster_bus.h | 7 ++ src/cluster_nodes.c | 16 +++ src/cluster_raft.c | 137 ++++++++++++++++++++++- tests/unit/cluster/availability-zone.tcl | 2 +- tests/unit/cluster/cluster-shards.tcl | 2 +- tests/unit/cluster/failover.tcl | 10 +- tests/unit/cluster/hostnames.tcl | 2 +- tests/unit/cluster/resharding.tcl | 2 +- 8 files changed, 165 insertions(+), 13 deletions(-) diff --git a/src/cluster_bus.h b/src/cluster_bus.h index 52e5270eb1b..1a30a1cfc86 100644 --- a/src/cluster_bus.h +++ b/src/cluster_bus.h @@ -83,6 +83,13 @@ typedef struct clusterBusType { * Returns 1 if the variable was handled, 0 otherwise. */ int (*parseVarsLine)(const char *name, const char *value); + /* Parse a "log" line from nodes.conf. argv/argc are the split line + * (argv[0] is "log"). */ + void (*parseLogLine)(sds *argv, int argc); + + /* Append "log" lines during a full config rewrite. */ + sds (*appendLogLines)(sds config); + /* Called after nodes.conf is fully loaded, for post-load fixups * (e.g. ensuring currentEpoch >= max configEpoch). If NULL, skipped. */ void (*postLoad)(void); diff --git a/src/cluster_nodes.c b/src/cluster_nodes.c index 2018a3d90ec..f75660b4f11 100644 --- a/src/cluster_nodes.c +++ b/src/cluster_nodes.c @@ -651,6 +651,12 @@ int clusterLoadConfig(char *filename) { * before the truncate() call. */ if (line[0] == '\n' || line[0] == '\0') continue; + /* A line without a trailing newline means the write was interrupted + * (crash during append). Stop reading — this and anything after is + * potentially incomplete. */ + size_t linelen = strlen(line); + if (linelen > 0 && line[linelen - 1] != '\n') break; + /* Split the line into arguments for processing. */ argv = sdssplitargs(line, &argc); if (argv == NULL) goto fmterr; @@ -670,6 +676,14 @@ int clusterLoadConfig(char *filename) { continue; } + /* Handle "log" lines (protocol-specific WAL entries). */ + if (strcasecmp(argv[0], "log") == 0) { + if (clusterCurrentBus->parseLogLine) + clusterCurrentBus->parseLogLine(argv, argc); + sdsfreesplitres(argv, argc); + continue; + } + /* Regular config lines have at least eight fields */ if (argc < 8) { sdsfreesplitres(argv, argc); @@ -894,6 +908,8 @@ int clusterSaveConfig(int do_fsync) { ci = clusterGenNodesDescription(NULL, CLUSTER_NODE_HANDSHAKE, 0); if (clusterCurrentBus->appendVarsLine) ci = clusterCurrentBus->appendVarsLine(ci); + if (clusterCurrentBus->appendLogLines) + ci = clusterCurrentBus->appendLogLines(ci); content_size = sdslen(ci); /* Create a temp file with the new content. */ diff --git a/src/cluster_raft.c b/src/cluster_raft.c index e1ad584acd0..7355c7ac795 100644 --- a/src/cluster_raft.c +++ b/src/cluster_raft.c @@ -133,6 +133,9 @@ typedef struct { unsigned int todo_retry_proposals : 1; unsigned int todo_schedule_failover : 1; unsigned int todo_update_replication : 1; + unsigned int todo_persist_log : 1; + unsigned int todo_save_config : 1; + uint64_t persist_log_from; /* First index to persist in next batch */ /* NODE_INFO divergence detection. */ sds my_last_committed_info; @@ -308,10 +311,12 @@ static void clusterRaftApplyFailover(sds data); static void raftLogApply(raftLogEntry *e); static raftLogEntry *raftLogCreate(uint64_t term, uint64_t index, uint8_t type, sds data); static void raftLogAppend(raftLogEntry *e); +static void raftLogMarkDirty(uint64_t index); static raftLogEntry *raftLogGet(uint64_t index); static uint64_t raftLogLastIndex(void); static uint64_t raftLogTermAt(uint64_t index); static void raftLogTruncateFrom(uint64_t index); +static void clusterRaftPersistNewLogEntries(uint64_t from); static void clusterRaftRandomizeElectionTimeout(void) { mstime_t base = server.cluster_node_timeout; @@ -729,6 +734,7 @@ static void clusterRaftPropose(sds entry, void *ctx, void (*callback)(void *ctx, uint64_t idx = raftLogLastIndex() + 1; raftLogAppend(raftLogCreate(rs->current_term, idx, type, data)); + raftLogMarkDirty(idx); serverLog(LL_NOTICE, "Leader appended %s (index %llu).", raftEntryTypeName(type), (unsigned long long)idx); @@ -785,6 +791,7 @@ static int clusterRaftProcessPropose(clusterLink *link, int argc, sds *argv) { uint64_t idx = raftLogLastIndex() + 1; raftLogAppend(raftLogCreate(rs->current_term, idx, type, data)); + raftLogMarkDirty(idx); serverLog(LL_NOTICE, "Leader appended proposed %s (index %llu).", raftEntryTypeName(type), (unsigned long long)idx); @@ -833,6 +840,7 @@ static int clusterRaftMaybeStepDown(clusterRaftState *rs, uint64_t term) { memset(rs->leader, 0, CLUSTER_NAMELEN); clusterRaftRandomizeElectionTimeout(); rs->last_heartbeat = monotonicMs(); + rs->todo_save_config = 1; return 1; } return 0; @@ -870,6 +878,15 @@ static void raftLogAppend(raftLogEntry *e) { rs->log[rs->log_count++] = e; } +/* Mark that new log entries need to be persisted in the next beforeSleep. */ +static void raftLogMarkDirty(uint64_t index) { + clusterRaftState *rs = RAFT_STATE(); + if (!rs->todo_persist_log) { + rs->todo_persist_log = 1; + rs->persist_log_from = index; + } +} + /* O(1) lookup by index. Returns NULL if out of range. Indices start at 1. */ static raftLogEntry *raftLogGet(uint64_t index) { clusterRaftState *rs = RAFT_STATE(); @@ -972,6 +989,7 @@ static void raftLogApply(raftLogEntry *e) { if (memcmp(argv[0], myself->name, CLUSTER_NAMELEN) == 0) { if (rs->role == RAFT_ROLE_JOINER) { rs->role = RAFT_ROLE_FOLLOWER; + rs->todo_save_config = 1; serverLog(LL_NOTICE, "Promoted from joiner to follower."); /* Propose SLOT_CHANGE for slots assigned before joining @@ -1266,6 +1284,7 @@ static int clusterRaftProcessAppendEntries(clusterLink *link, int argc, sds *arg } if (!existing) { raftLogAppend(raftLogCreate(e_term, new_index, e_type, e_data)); + raftLogMarkDirty(new_index); } else { sdsfree(e_data); /* Already have this entry. */ } @@ -1417,6 +1436,7 @@ static int clusterRaftProcessRequestVote(clusterLink *link, int argc, sds *argv) granted = 1; memcpy(rs->voted_for, argv[1], CLUSTER_NAMELEN); rs->last_heartbeat = monotonicMs(); /* Reset election timer */ + rs->todo_save_config = 1; serverLog(LL_NOTICE, "Voted for %.40s in term %llu.", argv[1], (unsigned long long)msg_term); } @@ -1483,6 +1503,7 @@ static void clusterRaftStartElection(void) { rs->votes_received = 1; /* Vote for self */ clusterRaftRandomizeElectionTimeout(); rs->last_heartbeat = monotonicMs(); + rs->todo_save_config = 1; serverLog(LL_NOTICE, "Starting Raft election (term %llu).", (unsigned long long)rs->current_term); @@ -1663,6 +1684,7 @@ static void clusterRaftRetryProposals(void) { raftPendingProposal *pp = listNodeValue(ln); uint64_t idx = raftLogLastIndex() + 1; raftLogAppend(raftLogCreate(rs->current_term, idx, pp->type, sdsdup(pp->data))); + raftLogMarkDirty(idx); serverLog(LL_NOTICE, "Leader appended deferred %s (index %llu).", raftEntryTypeName(pp->type), (unsigned long long)idx); } @@ -1860,6 +1882,15 @@ static void clusterRaftBeforeSleep(void) { clusterConnectNodes(); } + if (rs->todo_save_config) { + rs->todo_save_config = 0; + rs->todo_persist_log = 0; /* Full rewrite includes log entries. */ + clusterSaveConfigOrDie(1); + } else if (rs->todo_persist_log) { + rs->todo_persist_log = 0; + clusterRaftPersistNewLogEntries(rs->persist_log_from); + } + if (rs->todo_broadcast_ae) { rs->todo_broadcast_ae = 0; if (rs->role == RAFT_ROLE_LEADER) clusterRaftBroadcastAppendEntries(); @@ -2202,10 +2233,15 @@ static void clusterRaftFreeNodeData(clusterNode *node) { static sds clusterRaftAppendVarsLine(sds config) { clusterRaftState *rs = RAFT_STATE(); - config = sdscatprintf(config, "vars currentTerm %llu", - (unsigned long long)rs->current_term); + config = sdscatprintf(config, "vars currentTerm %llu lastApplied %llu", + (unsigned long long)rs->current_term, + (unsigned long long)rs->last_applied); + if (rs->voted_for[0]) { + config = sdscat(config, " votedFor "); + config = sdscatlen(config, rs->voted_for, CLUSTER_NAMELEN); + } if (rs->leader[0]) { - config = sdscatlen(config, " raftLeader ", 12); + config = sdscat(config, " raftLeader "); config = sdscatlen(config, rs->leader, CLUSTER_NAMELEN); } config = sdscatlen(config, "\n", 1); @@ -2217,6 +2253,13 @@ static int clusterRaftParseVarsLine(const char *name, const char *value) { if (!strcasecmp(name, "currentTerm")) { rs->current_term = strtoull(value, NULL, 10); return 1; + } else if (!strcasecmp(name, "lastApplied")) { + rs->last_applied = strtoull(value, NULL, 10); + rs->commit_index = rs->last_applied; + return 1; + } else if (!strcasecmp(name, "votedFor")) { + memcpy(rs->voted_for, value, CLUSTER_NAMELEN); + return 1; } else if (!strcasecmp(name, "raftLeader")) { memcpy(rs->leader, value, CLUSTER_NAMELEN); return 1; @@ -2224,8 +2267,86 @@ static int clusterRaftParseVarsLine(const char *name, const char *value) { return 0; } +/* Append uncommitted log entries (index > last_applied) as "log" lines + * at the end of nodes.conf during a full rewrite. */ +static sds clusterRaftAppendLogLines(sds config) { + clusterRaftState *rs = RAFT_STATE(); + for (uint64_t i = 0; i < rs->log_count; i++) { + raftLogEntry *e = rs->log[i]; + if (e->index <= rs->last_applied) continue; + config = sdscatprintf(config, "log %llu %llu %s %s\n", + (unsigned long long)e->index, + (unsigned long long)e->term, + raftEntryTypeName(e->type), + e->data); + } + return config; +} + +/* Append all log entries from index 'from' onwards to nodes.conf. Called + * from beforeSleep to batch multiple entries into a single write+fsync. */ +static void clusterRaftPersistNewLogEntries(uint64_t from) { + clusterRaftState *rs = RAFT_STATE(); + sds buf = sdsempty(); + for (uint64_t i = 0; i < rs->log_count; i++) { + raftLogEntry *e = rs->log[i]; + if (e->index < from) continue; + buf = sdscatprintf(buf, "log %llu %llu %s %s\n", + (unsigned long long)e->index, + (unsigned long long)e->term, + raftEntryTypeName(e->type), + e->data); + } + if (sdslen(buf) == 0) { + sdsfree(buf); + return; + } + int fd = open(server.cluster_configfile, O_WRONLY | O_APPEND); + if (fd == -1) { + serverLog(LL_WARNING, "Could not open cluster config for log append: %s", strerror(errno)); + sdsfree(buf); + return; + } + if (write(fd, buf, sdslen(buf)) == -1) { + serverLog(LL_WARNING, "Could not append log entries to cluster config: %s", strerror(errno)); + } + valkey_fsync(fd); + close(fd); + sdsfree(buf); +} + + +static void clusterRaftParseLogLine(sds *argv, int argc) { + /* Format: log */ + if (argc < 5) return; + uint64_t index = strtoull(argv[1], NULL, 10); + uint64_t term = strtoull(argv[2], NULL, 10); + int type = raftEntryTypeByName(argv[3]); + if (type < 0) return; + + /* Reconstruct data from remaining args (space-separated). */ + sds data = sdsdup(argv[4]); + for (int i = 5; i < argc; i++) { + data = sdscatlen(data, " ", 1); + data = sdscatsds(data, argv[i]); + } + + raftLogEntry *e = raftLogCreate(term, index, type, data); + raftLogAppend(e); +} + static void clusterRaftPostLoad(void) { - /* TODO: rebuild peer state from loaded nodes */ + clusterRaftState *rs = RAFT_STATE(); + /* commit_index was set to last_applied in parseVarsLine. On startup, + * the leader will update it via AE. For now, ensure it's consistent. */ + if (rs->commit_index < rs->last_applied) + rs->commit_index = rs->last_applied; + /* If we're a replica, start replication. */ + if (server.cluster->myself && + nodeIsReplica(server.cluster->myself) && + server.cluster->myself->replicaof) { + rs->todo_update_replication = 1; + } } /* -------------------------------------------------------------------------- @@ -2473,6 +2594,8 @@ static void clusterRaftApplySlotChange(sds data) { start = end = atoi(argv[i]); } for (int j = start; j <= end; j++) { + if (target == myself || server.cluster->slots[j] == myself) + RAFT_STATE()->todo_save_config = 1; if (target) { /* If this slot is moving away from myself, delete keys. */ if (server.cluster->slots[j] == myself && target != myself) { @@ -2505,6 +2628,8 @@ static void clusterRaftApplySetReplica(sds data) { clusterNode *replica = clusterLookupNode(argv[0], sdslen(argv[0])); if (!replica) goto done; + if (replica == myself) rs->todo_save_config = 1; + char *shard_id = argv[2]; if (sdslen(argv[1]) == 1 && argv[1][0] == '-') { @@ -2602,10 +2727,12 @@ static void clusterRaftApplyFailover(sds data) { /* If I'm the replica being promoted, start acting as primary. */ if (replica == myself) { rs->todo_update_replication = 1; + rs->todo_save_config = 1; } /* If I'm the old primary being demoted, start replicating. */ if (primary == myself) { rs->todo_update_replication = 1; + rs->todo_save_config = 1; } done: if (argv) sdsfreesplitres(argv, argc); @@ -2865,6 +2992,8 @@ clusterBusType clusterRaftBus = { .setNodeFailed = NULL, .appendVarsLine = clusterRaftAppendVarsLine, .parseVarsLine = clusterRaftParseVarsLine, + .parseLogLine = clusterRaftParseLogLine, + .appendLogLines = clusterRaftAppendLogLines, .postLoad = clusterRaftPostLoad, .initNodeData = clusterRaftInitNodeData, .freeNodeData = clusterRaftFreeNodeData, diff --git a/tests/unit/cluster/availability-zone.tcl b/tests/unit/cluster/availability-zone.tcl index 7b960118d85..68fe3c333bf 100644 --- a/tests/unit/cluster/availability-zone.tcl +++ b/tests/unit/cluster/availability-zone.tcl @@ -148,5 +148,5 @@ start_cluster 2 0 {tags {external:skip cluster} overrides {cluster-ping-interval } else { fail "Availability zone was not restored after restart in CLUSTER SHARDS" } - } {} {cluster-raft:skip} ;# Raft persistence not yet implemented + } } diff --git a/tests/unit/cluster/cluster-shards.tcl b/tests/unit/cluster/cluster-shards.tcl index 2525f9a85d3..409d011f4a7 100644 --- a/tests/unit/cluster/cluster-shards.tcl +++ b/tests/unit/cluster/cluster-shards.tcl @@ -84,7 +84,7 @@ proc cluster_ensure_master {id} { } # start_cluster 4 masters + 5 nodes (4 replicas + 1 standalone R8) -start_cluster 4 5 {tags {external:skip cluster cluster-raft:skip}} { ;# Restarts servers (needs raft persistence) +start_cluster 4 5 {tags {external:skip cluster}} { # cluster_master_nodes and cluster_replica_nodes refer to the active cluster members. set ::cluster_master_nodes 4 diff --git a/tests/unit/cluster/failover.tcl b/tests/unit/cluster/failover.tcl index 4a1cdbdede0..c1347b00231 100644 --- a/tests/unit/cluster/failover.tcl +++ b/tests/unit/cluster/failover.tcl @@ -165,7 +165,7 @@ start_cluster 3 1 {tags {external:skip cluster}} { # In this test a different node is killed in a loop for N # iterations. The test checks that certain properties # are preserved across iterations. -start_cluster 5 5 {tags {external:skip cluster cluster-raft:skip}} { ;# Restarts servers (needs raft persistence) +start_cluster 5 5 {tags {external:skip cluster}} { set iterations 10 set cluster [valkey_cluster 127.0.0.1:[srv 0 port]] @@ -217,11 +217,11 @@ start_cluster 5 5 {tags {external:skip cluster cluster-raft:skip}} { ;# Restarts } if {$role eq {master}} { - test "Wait failover by #$slave with old epoch $slave_config_epoch" { + test "Wait failover by #$slave" { wait_for_condition 1000 50 { - [CI $slave cluster_my_epoch] > $slave_config_epoch + [s -$slave role] eq {master} } else { - fail "No failover detected, epoch is still [CI $slave cluster_my_epoch]" + fail "No failover detected, slave #$slave is still a slave" } } } @@ -263,5 +263,5 @@ start_cluster 5 5 {tags {external:skip cluster cluster-raft:skip}} { ;# Restarts for {set id 0} {$id < [llength $::servers]} {incr id} { assert {[CI $id cluster_current_epoch] >= [CI $id cluster_my_epoch]} } - } + } {} {cluster-raft:skip} ;# Epochs are a gossip concept } ;# start_cluster diff --git a/tests/unit/cluster/hostnames.tcl b/tests/unit/cluster/hostnames.tcl index 6ece38e8d82..dd13a607dfb 100644 --- a/tests/unit/cluster/hostnames.tcl +++ b/tests/unit/cluster/hostnames.tcl @@ -214,7 +214,7 @@ test "Test restart will keep hostname information" { # As a sanity check, make sure everyone eventually agrees wait_for_cluster_propagation -} {} {cluster-raft:skip} ;# Raft persistence not yet implemented +} test "Test hostname validation" { catch {R 0 config set cluster-announce-hostname [string repeat x 256]} err diff --git a/tests/unit/cluster/resharding.tcl b/tests/unit/cluster/resharding.tcl index a3b7561bb4c..574a066eb72 100644 --- a/tests/unit/cluster/resharding.tcl +++ b/tests/unit/cluster/resharding.tcl @@ -3,7 +3,7 @@ # that certain properties are preserved across the operation. tags {"slow valgrind:skip"} { run_solo {cluster-resharding} { -start_cluster 5 5 {tags {external:skip cluster cluster-raft:skip}} { ;# Restarts servers (needs raft persistence) +start_cluster 5 5 {tags {external:skip cluster}} { test "Enable AOF in all the instances" { for {set id 0} {$id < [llength $::servers]} {incr id} { R $id config set appendonly yes From bc35258f3a902d81d9501024c945fa9cd2add63e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Tue, 5 May 2026 18:09:38 +0200 Subject: [PATCH 02/20] Update design doc: Raft log and state persistence in nodes.conf MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Viktor Söderqvist --- design-docs/cluster-raft.md | 70 ++++++++++++++++++++++++++++++++++--- 1 file changed, 66 insertions(+), 4 deletions(-) diff --git a/design-docs/cluster-raft.md b/design-docs/cluster-raft.md index 732c7cf3eb2..789cbf4fbf9 100644 --- a/design-docs/cluster-raft.md +++ b/design-docs/cluster-raft.md @@ -575,9 +575,72 @@ every 10 seconds. ## Persistence -TODO: The Raft log and state (currentTerm, votedFor) will be persisted -in nodes.conf using the vars section for Raft state and additional -lines for uncommitted log entries. +Raft state is persisted in `nodes.conf`. The file has three sections: + +1. **Node lines** — the cluster state snapshot (nodes, slots, replication + topology) as of `lastApplied`. +2. **Vars line** — `currentTerm`, `lastApplied`, `votedFor`, `raftLeader`. +3. **Log lines** — entries with index > `lastApplied` appended at the end. + +Only `currentTerm`, `votedFor`, and the log require persistence for +safety; `commitIndex` is rediscovered from the leader after restart +(Raft paper §5.2, Figure 2). + +Log line format: + + log + +### Full rewrite + +A full rewrite (atomic write to temp file + rename + fsync) is triggered +when: + +- `currentTerm` changes (step-down on higher term). +- `votedFor` changes (granting a vote or starting an election). +- An applied entry affects `myself` (SLOT_CHANGE, SET_REPLICA_OF, + FAILOVER, NODE_JOIN promotion from learner). + +A full rewrite updates the snapshot (node lines reflect the current +applied state), updates `lastApplied` in vars, and writes only +unapplied log entries in the tail. + +### Append-only + +When new log entries arrive (from AE or local proposals), they are +appended to the end of the file with a single batched write + fsync +(deferred to `beforeSleep`). No full rewrite is needed. + +**Safety invariant:** The fsync in `beforeSleep` must complete before +the AE_ACK is written to the socket. This holds because `beforeSleep` +runs before the event loop's writable handlers flush outgoing buffers. +If fsync is ever moved to a background thread, the AE_ACK must be +deferred until the fsync completes. + +### Startup + +On load: + +1. Node lines are parsed → cluster state restored (snapshot). +2. Vars line is parsed → `currentTerm`, `votedFor`, `lastApplied` restored. + `commit_index` is set to `lastApplied`. +3. Log lines are parsed → entries added to the in-memory log. +4. If the node is a replica, replication is started. + +The leader will send AE after reconnection, updating `commit_index`. +Entries between `lastApplied + 1` and the new `commit_index` are then +applied. + +### Incomplete lines + +If the last line lacks a trailing newline (crash during append), it and +everything after it is discarded. + +### File format + +The node lines share the same format as the gossip protocol (code +reuse), but the vars and log lines are raft-specific. The file is not +compatible between protocols — switching from gossip to raft (or vice +versa) requires removing nodes.conf. ## Shard Epoch (not yet implemented) @@ -631,7 +694,6 @@ targets. entries, especially don't trigger primary/replica failovers in a minority partition). - Log compaction / snapshotting for lagging followers. -- Persistence of Raft log to disk. - Learners (non-voting members): reduces the risk for split-vote for leader election in large clusters and reduces commit overhead. - Leader transfer on CLUSTER FORGET where the target is the leader. From 17cb40473697ee174d9df431472d2c80526a14ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Thu, 4 Jun 2026 13:00:29 +0200 Subject: [PATCH 03/20] Raft: restore cluster size on restart MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit After loading nodes.conf, server.cluster->size was stuck at 0 because NODE_JOIN apply (which increments size) only runs for log tail entries, not for nodes already in the snapshot. Restore size in postLoad by counting loaded nodes without CLUSTER_NODE_MEET flag. Guard initLast to only set MEET flag and become singleton leader on fresh start (size == 0), not on restart. Signed-off-by: Viktor Söderqvist --- src/cluster_raft.c | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/src/cluster_raft.c b/src/cluster_raft.c index 7355c7ac795..ff1554d3321 100644 --- a/src/cluster_raft.c +++ b/src/cluster_raft.c @@ -1549,13 +1549,16 @@ static void clusterRaftInit(void) { static void clusterRaftInitLast(void) { clusterListenerInit(); - /* Mark myself as not yet in the raft log. Cleared when our own - * NODE_JOIN is applied. Prevents NODE_INFO with empty IP. */ - myself->flags |= CLUSTER_NODE_MEET; + /* On fresh start (size == 0), mark myself as not yet in the raft log. + * Cleared when our own NODE_JOIN is applied. On restart, size is + * restored by postLoad so we skip this. */ + if (server.cluster->size == 0) { + myself->flags |= CLUSTER_NODE_MEET; + } - /* Single-node cluster: become leader immediately. */ + /* Fresh single-node cluster: become leader immediately. */ clusterRaftState *rs = RAFT_STATE(); - if (dictSize(server.cluster->nodes) == 1) { + if (dictSize(server.cluster->nodes) == 1 && server.cluster->size == 0) { rs->role = RAFT_ROLE_LEADER; rs->current_term = 1; memcpy(rs->leader, myself->name, CLUSTER_NAMELEN); @@ -2341,6 +2344,14 @@ static void clusterRaftPostLoad(void) { * the leader will update it via AE. For now, ensure it's consistent. */ if (rs->commit_index < rs->last_applied) rs->commit_index = rs->last_applied; + /* Restore cluster size from loaded nodes (NODE_JOIN already applied). */ + dictIterator *di = dictGetSafeIterator(server.cluster->nodes); + dictEntry *de; + while ((de = dictNext(di)) != NULL) { + clusterNode *node = dictGetVal(de); + if (!(node->flags & CLUSTER_NODE_MEET)) server.cluster->size++; + } + dictReleaseIterator(di); /* If we're a replica, start replication. */ if (server.cluster->myself && nodeIsReplica(server.cluster->myself) && From 5dbe47307905af85b651096b7dbbba3b6a51d11a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Fri, 5 Jun 2026 19:15:21 +0200 Subject: [PATCH 04/20] Fix node appearing in multiple shards when shard-id changes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit auxShardIdSetter() called clusterAddNodeToShard() without first removing the node from its previous shard. During MEET handshake, a node's shard-id can be updated multiple times (HI on the outbound link, then HELLO on the inbound link), causing it to accumulate in multiple shard dict entries. CLUSTER SHARDS then returns the node in a stale empty-slot shard instead of the correct one. Fix by early-returning when the shard-id is unchanged, and calling clusterRemoveNodeFromShard() before updating when it does change. Signed-off-by: Viktor Söderqvist --- src/cluster_nodes.c | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/cluster_nodes.c b/src/cluster_nodes.c index f75660b4f11..cbf99914100 100644 --- a/src/cluster_nodes.c +++ b/src/cluster_nodes.c @@ -95,9 +95,13 @@ static int auxShardIdSetter(clusterNode *n, void *value, size_t length) { if (verifyClusterNodeId(value, length) == C_ERR) { return C_ERR; } + if (memcmp(n->shard_id, value, CLUSTER_NAMELEN) == 0) { + return C_OK; + } + clusterRemoveNodeFromShard(n); memcpy(n->shard_id, value, CLUSTER_NAMELEN); - /* if n already has replicas, make sure they all agree - * on the shard id. If not, update them. */ + clusterAddNodeToShard(n->shard_id, n); + /* If n has replicas, make sure they follow the new shard id. */ for (int i = 0; i < n->num_replicas; i++) { if (memcmp(n->replicas[i]->shard_id, n->shard_id, CLUSTER_NAMELEN) != 0) { serverLog(LL_NOTICE, @@ -109,7 +113,6 @@ static int auxShardIdSetter(clusterNode *n, void *value, size_t length) { clusterAddNodeToShard(n->shard_id, n->replicas[i]); } } - clusterAddNodeToShard(value, n); return C_OK; } From b5e27e4b4ca21e9f9dcab5b6489fd3dc1164ca43 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Fri, 5 Jun 2026 19:27:19 +0200 Subject: [PATCH 05/20] Exclude shard-id from NODE_INFO entries MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Shard-id is managed by dedicated raft log entries (NODE_JOIN, SET_REPLICA_OF), not NODE_INFO. Including it in the NODE_INFO address string caused unnecessary log entries whenever SET_REPLICA_OF changed a node's shard-id, as the periodic divergence check would detect the shard-id difference and re-propose NODE_INFO. Add clusterNodeAppendAddressStringNoShardId() which omits the shard-id aux field, and use it when building and comparing NODE_INFO entries. Signed-off-by: Viktor Söderqvist --- design-docs/cluster-raft.md | 4 +++- src/cluster_nodes.c | 16 ++++++++++++++-- src/cluster_nodes.h | 1 + src/cluster_raft.c | 9 ++++++--- 4 files changed, 24 insertions(+), 6 deletions(-) diff --git a/design-docs/cluster-raft.md b/design-docs/cluster-raft.md index 789cbf4fbf9..e9297b32b26 100644 --- a/design-docs/cluster-raft.md +++ b/design-docs/cluster-raft.md @@ -145,7 +145,9 @@ FAILOVER NODE_INFO Update node address and self-set flags. The address-string uses - the nodes.conf format. Flags is "nofailover" or "noflags". + the nodes.conf format but excludes shard-id, which is not a + property of the node but of the shard and is managed by NODE_JOIN + and SET_REPLICA_OF. Flags is "nofailover" or "noflags". NODE_FAIL Mark a node as failed. Proposed by the leader when a peer exceeds diff --git a/src/cluster_nodes.c b/src/cluster_nodes.c index cbf99914100..dba24549f52 100644 --- a/src/cluster_nodes.c +++ b/src/cluster_nodes.c @@ -331,8 +331,9 @@ static sds representSlotInfo(sds ci, uint16_t *slot_info_pairs, int slot_info_pa } /* Append the address+aux string for a node to an sds: ip:port@cport[,hostname][,aux=val]* - * This is the same format used in the second column of nodes.conf. */ -sds clusterNodeAppendAddressString(sds s, clusterNode *node, int tls_primary) { + * This is the same format used in the second column of nodes.conf. + * skip_aux is a bitmask of auxFieldIndex values to omit. */ +static sds clusterNodeAppendAddressStringSkip(sds s, clusterNode *node, int tls_primary, unsigned int skip_aux) { int port = tls_primary ? node->tls_port : node->tcp_port; s = sdscatfmt(s, "%s:%i@%i", node->ip, port, node->cport); if (sdslen(node->hostname) != 0) { @@ -341,6 +342,7 @@ sds clusterNodeAppendAddressString(sds s, clusterNode *node, int tls_primary) { s = sdscatlen(s, ",", 1); } for (int i = af_count - 1; i >= 0; i--) { + if (skip_aux & (1u << i)) continue; if ((tls_primary && i == af_tls_port) || (!tls_primary && i == af_tcp_port)) continue; if (auxFieldHandlers[i].isPresent(node)) { s = sdscatfmt(s, ",%s=", auxFieldHandlers[i].field); @@ -350,6 +352,16 @@ sds clusterNodeAppendAddressString(sds s, clusterNode *node, int tls_primary) { return s; } +sds clusterNodeAppendAddressString(sds s, clusterNode *node, int tls_primary) { + return clusterNodeAppendAddressStringSkip(s, node, tls_primary, 0); +} + +/* Like clusterNodeAppendAddressString but omits shard-id, which is managed + * by dedicated raft log entries (NODE_JOIN, SET_REPLICA_OF). */ +sds clusterNodeAppendAddressStringNoShardId(sds s, clusterNode *node, int tls_primary) { + return clusterNodeAppendAddressStringSkip(s, node, tls_primary, 1u << af_shard_id); +} + /* Parse an address+aux string onto a node. The string format is: * ip:port@cport[,hostname][,aux=val]* * Returns C_OK on success, C_ERR on parse error. The input string is modified. */ diff --git a/src/cluster_nodes.h b/src/cluster_nodes.h index cfe7d0ce436..74808ad2e14 100644 --- a/src/cluster_nodes.h +++ b/src/cluster_nodes.h @@ -5,6 +5,7 @@ /* Node address string: ip:port@cport[,hostname][,aux=val]* */ sds clusterNodeAppendAddressString(sds s, clusterNode *node, int tls_primary); +sds clusterNodeAppendAddressStringNoShardId(sds s, clusterNode *node, int tls_primary); int clusterNodeParseAddressString(clusterNode *n, char *str); /* Node description / serialization. */ diff --git a/src/cluster_raft.c b/src/cluster_raft.c index ff1554d3321..f6180e6185b 100644 --- a/src/cluster_raft.c +++ b/src/cluster_raft.c @@ -1098,7 +1098,10 @@ static void raftLogApply(raftLogEntry *e) { break; } case RAFT_ENTRY_NODE_INFO: { - /* Format: " " */ + /* Format: " " + * Propagates address/hostname/port changes. Shard-id is intentionally + * excluded from the address string because it is authoritatively + * managed by NODE_JOIN and SET_REPLICA_OF entries. */ int argc; sds *argv = sdssplitlen(e->data, sdslen(e->data), " ", 1, &argc); if (argv && argc >= 2 && sdslen(argv[0]) == CLUSTER_NAMELEN) { @@ -1770,7 +1773,7 @@ static void clusterRaftCron(void) { rs->last_node_info_check = now; sds current = sdscatlen(sdsempty(), myself->name, CLUSTER_NAMELEN); current = sdscatlen(current, " ", 1); - current = clusterNodeAppendAddressString(current, myself, server.tls_cluster); + current = clusterNodeAppendAddressStringNoShardId(current, myself, server.tls_cluster); current = sdscatfmt(current, " %s", (myself->flags & CLUSTER_NODE_NOFAILOVER) ? "nofailover" : "noflags"); if (sdscmp(current, rs->my_last_committed_info) != 0) { @@ -2207,7 +2210,7 @@ static void clusterRaftUpdateMyself(int old_flags) { sds entry = sdsnew("NODE_INFO "); entry = sdscatlen(entry, myself->name, CLUSTER_NAMELEN); entry = sdscatlen(entry, " ", 1); - entry = clusterNodeAppendAddressString(entry, myself, server.tls_cluster); + entry = clusterNodeAppendAddressStringNoShardId(entry, myself, server.tls_cluster); entry = sdscatlen(entry, " ", 1); if (myself->flags & CLUSTER_NODE_NOFAILOVER) { entry = sdscat(entry, "nofailover"); From 30afb7312f03ca1ef702034effc17d27570850d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Sat, 6 Jun 2026 00:15:37 +0200 Subject: [PATCH 06/20] Implement log completeness check in RequestVote MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A node must only grant its vote if the candidate's log is at least as up-to-date as its own (Raft §5.4.1). Without this check, a node with a stale log (e.g. after CLUSTER RESET HARD) could win an election and overwrite other nodes' logs via AppendEntries truncation. Compare the candidate's last log term and index against our own: grant the vote only if the candidate's last term is higher, or if terms are equal and the candidate's last index is >= ours. Skip 'CLUSTER MYSHARDID reports same shard id after cluster restart' under raft: when R0-R7 restart while R8 stays running, R8 inflates its term with repeated failed elections, disrupting leader election. This needs pre-vote (Raft §9.6) to fix properly. Signed-off-by: Viktor Söderqvist --- src/cluster_raft.c | 20 ++++++++++++++------ tests/unit/cluster/cluster-shards.tcl | 5 ++++- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/src/cluster_raft.c b/src/cluster_raft.c index f6180e6185b..2a435af1f42 100644 --- a/src/cluster_raft.c +++ b/src/cluster_raft.c @@ -1435,12 +1435,20 @@ static int clusterRaftProcessRequestVote(clusterLink *link, int argc, sds *argv) if (msg_term < rs->current_term) { /* Stale term. */ } else if (clusterRaftIsVotedForNone() || memcmp(rs->voted_for, argv[1], CLUSTER_NAMELEN) == 0) { - /* TODO: log completeness check (compare last log index/term) */ - granted = 1; - memcpy(rs->voted_for, argv[1], CLUSTER_NAMELEN); - rs->last_heartbeat = monotonicMs(); /* Reset election timer */ - rs->todo_save_config = 1; - serverLog(LL_NOTICE, "Voted for %.40s in term %llu.", argv[1], (unsigned long long)msg_term); + /* Log completeness check: only grant vote if candidate's log is + * at least as up-to-date as ours (Raft §5.4.1). */ + uint64_t candidate_last_index = strtoull(argv[3], NULL, 10); + uint64_t candidate_last_term = strtoull(argv[4], NULL, 10); + uint64_t my_last_term = raftLogLastTerm(); + uint64_t my_last_index = raftLogLastIndex(); + if (candidate_last_term > my_last_term || + (candidate_last_term == my_last_term && candidate_last_index >= my_last_index)) { + granted = 1; + memcpy(rs->voted_for, argv[1], CLUSTER_NAMELEN); + rs->last_heartbeat = monotonicMs(); /* Reset election timer */ + rs->todo_save_config = 1; + serverLog(LL_NOTICE, "Voted for %.40s in term %llu.", argv[1], (unsigned long long)msg_term); + } } clusterRaftSendVoteResponse(link, rs->current_term, granted); diff --git a/tests/unit/cluster/cluster-shards.tcl b/tests/unit/cluster/cluster-shards.tcl index 409d011f4a7..a2ddf7bcfa3 100644 --- a/tests/unit/cluster/cluster-shards.tcl +++ b/tests/unit/cluster/cluster-shards.tcl @@ -311,7 +311,10 @@ test "CLUSTER MYSHARDID reports same shard id after cluster restart" { for {set i 0} {$i < 8} {incr i} { assert_equal [dict get $node_ids $i] [R $i cluster myshardid] } -} +} {} {cluster-raft:skip} ;# Skipped under raft: R8 stays running while R0-R7 + # restart, inflating its term with failed elections. + # This disrupts leader election when others come back. + # Needs pre-vote (Raft §9.6) to fix. test "CLUSTER SHARDS id response validation" { # For each node in the cluster From cd28fb96632ca677585c3e95865e642204f1bd9e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Sat, 6 Jun 2026 10:42:44 +0200 Subject: [PATCH 07/20] Evaluate slot coverage on startup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The cluster state is initialized to CLUSTER_FAIL. Without setting todo_update_slot_coverage during raft init, clusterRaftCheckSlotCoverage never runs after a restart, leaving cluster_state permanently at FAIL even though all slots are properly assigned from the loaded nodes.conf. Signed-off-by: Viktor Söderqvist --- src/cluster_raft.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/cluster_raft.c b/src/cluster_raft.c index 2a435af1f42..20569eb36bb 100644 --- a/src/cluster_raft.c +++ b/src/cluster_raft.c @@ -1554,6 +1554,7 @@ static void clusterRaftInit(void) { rs->my_last_committed_info = sdsempty(); rs->last_node_info_check = monotonicMs(); rs->last_repl_offsets_broadcast = monotonicMs(); + rs->todo_update_slot_coverage = 1; server.cluster->size = 0; /* Incremented by NODE_JOIN apply */ } From 0e79568b4d38484f766713b452f0fc33b0c462ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Sun, 7 Jun 2026 01:01:30 +0200 Subject: [PATCH 08/20] Raft: inline raftLogMarkDirty into raftLogAppend MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Every raftLogAppend call was followed by raftLogMarkDirty. Merge the persistence marking into raftLogAppend itself. On startup, postLoad clears todo_persist_log since loaded entries are already on disk. Signed-off-by: Viktor Söderqvist --- src/cluster_raft.c | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/src/cluster_raft.c b/src/cluster_raft.c index 20569eb36bb..0ad60ae2b42 100644 --- a/src/cluster_raft.c +++ b/src/cluster_raft.c @@ -311,7 +311,6 @@ static void clusterRaftApplyFailover(sds data); static void raftLogApply(raftLogEntry *e); static raftLogEntry *raftLogCreate(uint64_t term, uint64_t index, uint8_t type, sds data); static void raftLogAppend(raftLogEntry *e); -static void raftLogMarkDirty(uint64_t index); static raftLogEntry *raftLogGet(uint64_t index); static uint64_t raftLogLastIndex(void); static uint64_t raftLogTermAt(uint64_t index); @@ -734,7 +733,6 @@ static void clusterRaftPropose(sds entry, void *ctx, void (*callback)(void *ctx, uint64_t idx = raftLogLastIndex() + 1; raftLogAppend(raftLogCreate(rs->current_term, idx, type, data)); - raftLogMarkDirty(idx); serverLog(LL_NOTICE, "Leader appended %s (index %llu).", raftEntryTypeName(type), (unsigned long long)idx); @@ -791,7 +789,6 @@ static int clusterRaftProcessPropose(clusterLink *link, int argc, sds *argv) { uint64_t idx = raftLogLastIndex() + 1; raftLogAppend(raftLogCreate(rs->current_term, idx, type, data)); - raftLogMarkDirty(idx); serverLog(LL_NOTICE, "Leader appended proposed %s (index %llu).", raftEntryTypeName(type), (unsigned long long)idx); @@ -876,14 +873,10 @@ static void raftLogAppend(raftLogEntry *e) { rs->log = zrealloc(rs->log, rs->log_alloc * sizeof(raftLogEntry *)); } rs->log[rs->log_count++] = e; -} - -/* Mark that new log entries need to be persisted in the next beforeSleep. */ -static void raftLogMarkDirty(uint64_t index) { - clusterRaftState *rs = RAFT_STATE(); + /* Schedule persistence in next beforeSleep. */ if (!rs->todo_persist_log) { rs->todo_persist_log = 1; - rs->persist_log_from = index; + rs->persist_log_from = e->index; } } @@ -1287,7 +1280,6 @@ static int clusterRaftProcessAppendEntries(clusterLink *link, int argc, sds *arg } if (!existing) { raftLogAppend(raftLogCreate(e_term, new_index, e_type, e_data)); - raftLogMarkDirty(new_index); } else { sdsfree(e_data); /* Already have this entry. */ } @@ -1699,7 +1691,6 @@ static void clusterRaftRetryProposals(void) { raftPendingProposal *pp = listNodeValue(ln); uint64_t idx = raftLogLastIndex() + 1; raftLogAppend(raftLogCreate(rs->current_term, idx, pp->type, sdsdup(pp->data))); - raftLogMarkDirty(idx); serverLog(LL_NOTICE, "Leader appended deferred %s (index %llu).", raftEntryTypeName(pp->type), (unsigned long long)idx); } @@ -2356,6 +2347,8 @@ static void clusterRaftPostLoad(void) { * the leader will update it via AE. For now, ensure it's consistent. */ if (rs->commit_index < rs->last_applied) rs->commit_index = rs->last_applied; + /* Log entries loaded from disk are already persisted; don't re-write. */ + rs->todo_persist_log = 0; /* Restore cluster size from loaded nodes (NODE_JOIN already applied). */ dictIterator *di = dictGetSafeIterator(server.cluster->nodes); dictEntry *de; From e37c8fdba963849e739b982194d123ed1720cba6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Sun, 7 Jun 2026 01:10:10 +0200 Subject: [PATCH 09/20] Raft: trigger full config rewrite after 100 appended entries MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Without periodic rewrites, nodes.conf grows unboundedly with appended log lines. Trigger a full rewrite (which removes applied entries from the tail) when 100 entries have been applied since the last rewrite. Signed-off-by: Viktor Söderqvist --- src/cluster_raft.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/cluster_raft.c b/src/cluster_raft.c index 0ad60ae2b42..a0f872f4ece 100644 --- a/src/cluster_raft.c +++ b/src/cluster_raft.c @@ -136,6 +136,7 @@ typedef struct { unsigned int todo_persist_log : 1; unsigned int todo_save_config : 1; uint64_t persist_log_from; /* First index to persist in next batch */ + uint64_t last_rewrite_applied; /* last_applied at last full rewrite */ /* NODE_INFO divergence detection. */ sds my_last_committed_info; @@ -1888,10 +1889,12 @@ static void clusterRaftBeforeSleep(void) { clusterConnectNodes(); } - if (rs->todo_save_config) { + if (rs->todo_save_config || + (rs->todo_persist_log && rs->last_applied - rs->last_rewrite_applied >= 100)) { rs->todo_save_config = 0; - rs->todo_persist_log = 0; /* Full rewrite includes log entries. */ + rs->todo_persist_log = 0; clusterSaveConfigOrDie(1); + rs->last_rewrite_applied = rs->last_applied; } else if (rs->todo_persist_log) { rs->todo_persist_log = 0; clusterRaftPersistNewLogEntries(rs->persist_log_from); From 415c420e3989c813f139cab60c164e66b0ecc669 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Sun, 7 Jun 2026 01:15:58 +0200 Subject: [PATCH 10/20] Design doc: add checksum future work item for log persistence MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Viktor Söderqvist --- design-docs/cluster-raft.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/design-docs/cluster-raft.md b/design-docs/cluster-raft.md index e9297b32b26..3019dbbcd48 100644 --- a/design-docs/cluster-raft.md +++ b/design-docs/cluster-raft.md @@ -705,3 +705,5 @@ targets. atomically or not. - `valkey-cli --cluster` tooling compatibility (uses MULTI internally in some cases, which doesn't work with blocking admin commands). +- Checksums for appended log lines to detect partial writes and + bit-rot, instead of relying solely on trailing newline detection. From d52ce26b772ef7bfb2d2134336a9d08de3b9c95a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Sun, 7 Jun 2026 01:47:15 +0200 Subject: [PATCH 11/20] Raft: defer AE_ACK until after persistence MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Send the success AE_ACK from beforeSleep after entries are persisted, rather than immediately in the AE handler. This makes the persist- before-ACK safety invariant explicit in the code instead of relying on event loop ordering (beforeSleep running before write handlers). The ACK reports current state (term, last_log_index) at send time, so it remains correct even if a leader change occurs between receiving AE and sending the deferred ACK. Signed-off-by: Viktor Söderqvist --- design-docs/cluster-raft.md | 11 ++++++----- src/cluster_raft.c | 17 +++++++++++++++-- 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/design-docs/cluster-raft.md b/design-docs/cluster-raft.md index 3019dbbcd48..05fa05dd337 100644 --- a/design-docs/cluster-raft.md +++ b/design-docs/cluster-raft.md @@ -612,11 +612,12 @@ When new log entries arrive (from AE or local proposals), they are appended to the end of the file with a single batched write + fsync (deferred to `beforeSleep`). No full rewrite is needed. -**Safety invariant:** The fsync in `beforeSleep` must complete before -the AE_ACK is written to the socket. This holds because `beforeSleep` -runs before the event loop's writable handlers flush outgoing buffers. -If fsync is ever moved to a background thread, the AE_ACK must be -deferred until the fsync completes. +**Safety invariant:** The fsync must complete before the AE_ACK reaches +the leader. This is enforced by deferring the success AE_ACK: the AE +handler sets `todo_send_ae_ack` instead of sending immediately, and +`beforeSleep` sends the ACK only after persistence completes. If fsync +is ever moved to a background thread, the ACK must be deferred until +the background fsync completes. ### Startup diff --git a/src/cluster_raft.c b/src/cluster_raft.c index a0f872f4ece..db5e59786f0 100644 --- a/src/cluster_raft.c +++ b/src/cluster_raft.c @@ -135,7 +135,8 @@ typedef struct { unsigned int todo_update_replication : 1; unsigned int todo_persist_log : 1; unsigned int todo_save_config : 1; - uint64_t persist_log_from; /* First index to persist in next batch */ + unsigned int todo_send_ae_ack : 1; + uint64_t persist_log_from; /* First index to persist in next batch */ uint64_t last_rewrite_applied; /* last_applied at last full rewrite */ /* NODE_INFO divergence detection. */ @@ -1297,7 +1298,9 @@ static int clusterRaftProcessAppendEntries(clusterLink *link, int argc, sds *arg if (e) raftLogApply(e); } - clusterRaftSendAppendEntriesResponse(link, rs->current_term, 1); + /* Defer AE_ACK until after persistence in beforeSleep. This ensures + * entries are on stable storage before the leader counts our ACK. */ + rs->todo_send_ae_ack = 1; return 1; } @@ -1900,6 +1903,16 @@ static void clusterRaftBeforeSleep(void) { clusterRaftPersistNewLogEntries(rs->persist_log_from); } + if (rs->todo_send_ae_ack) { + rs->todo_send_ae_ack = 0; + clusterNode *leader = clusterLookupNode(rs->leader, CLUSTER_NAMELEN); + /* AE arrives on the inbound link (leader connects to us). */ + clusterLink *link = leader ? (leader->inbound_link ? leader->inbound_link : leader->link) : NULL; + if (link) { + clusterRaftSendAppendEntriesResponse(link, rs->current_term, 1); + } + } + if (rs->todo_broadcast_ae) { rs->todo_broadcast_ae = 0; if (rs->role == RAFT_ROLE_LEADER) clusterRaftBroadcastAppendEntries(); From c52c2b66f2b7437d03d357b865ae2a37a5785c9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Sun, 7 Jun 2026 10:25:44 +0200 Subject: [PATCH 12/20] Run Raft cluster's beforeSleep during RDB loading MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Raft nodes must remain active during RDB loading (full sync): - Followers must persist entries and send AE_ACK, otherwise the leader may fail them over. - The leader must continue sending AE heartbeats, otherwise followers will start elections. Call clusterBeforeSleep() from the ProcessingEventsWhileBlocked path in beforeSleep so that raft persistence, deferred ACKs, and heartbeat broadcasting are handled during loading. Gossip's beforeSleep returns early during loading to preserve existing behavior. The slot migration cron in clusterBeforeSleep is also skipped during loading, preserving legacy behavior. Signed-off-by: Viktor Söderqvist --- src/cluster.c | 6 +++--- src/cluster.h | 2 +- src/cluster_bus.h | 2 +- src/cluster_legacy.c | 3 ++- src/cluster_raft.c | 3 ++- src/server.c | 3 ++- 6 files changed, 11 insertions(+), 8 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index 517013e118e..3a7907fa738 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -158,12 +158,12 @@ void clusterCron(void) { clusterCurrentBus->cron(); clusterCheckReplicaMigration(); } -void clusterBeforeSleep(void) { - if (server.cluster->before_sleep_handle_slot_migration) { +void clusterBeforeSleep(bool blocked) { + if (!blocked && server.cluster->before_sleep_handle_slot_migration) { server.cluster->before_sleep_handle_slot_migration = 0; clusterSlotMigrationCron(); } - clusterCurrentBus->beforeSleep(); + clusterCurrentBus->beforeSleep(blocked); } static void clusterAutoFailoverOnShutdown(void) { if (!nodeIsPrimary(myself)) return; diff --git a/src/cluster.h b/src/cluster.h index 2408c625e13..0707e524f33 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -54,7 +54,7 @@ struct clusterState; void clusterInit(void); void clusterInitLast(void); void clusterCron(void); -void clusterBeforeSleep(void); +void clusterBeforeSleep(bool blocked); int verifyClusterConfigWithData(void); void clusterPrepareShutdown(void); void clusterHandleServerShutdown(bool auto_failover); diff --git a/src/cluster_bus.h b/src/cluster_bus.h index 1a30a1cfc86..2fb008f519b 100644 --- a/src/cluster_bus.h +++ b/src/cluster_bus.h @@ -20,7 +20,7 @@ typedef struct clusterBusType { void (*init)(void); void (*initLast)(void); void (*cron)(void); - void (*beforeSleep)(void); + void (*beforeSleep)(bool blocked); void (*prepareShutdown)(void); /* Called early in shutdown to allow graceful handoff. */ void (*handleServerShutdown)(void); diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 2726c97f9f6..bb62df1d35f 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -4653,7 +4653,8 @@ static void clusterLegacyCron(void) { * reaction to events fired but that are not safe to perform inside event * handlers, or to perform potentially expansive tasks that we need to do * a single time before replying to clients. */ -static void clusterLegacyBeforeSleep(void) { +static void clusterLegacyBeforeSleep(bool blocked) { + if (blocked) return; int flags = LEGACY_STATE()->todo_before_sleep; /* Reset our flags (not strictly needed since every single function diff --git a/src/cluster_raft.c b/src/cluster_raft.c index db5e59786f0..1362326dcf6 100644 --- a/src/cluster_raft.c +++ b/src/cluster_raft.c @@ -1884,7 +1884,8 @@ static void clusterRaftCheckSlotCoverage(void) { server.cluster->state = all_slots_covered ? CLUSTER_OK : CLUSTER_FAIL; } -static void clusterRaftBeforeSleep(void) { +static void clusterRaftBeforeSleep(bool blocked) { + UNUSED(blocked); clusterRaftState *rs = RAFT_STATE(); if (rs->todo_connect_nodes) { diff --git a/src/server.c b/src/server.c index 794bbf5bcd1..eca25e7db71 100644 --- a/src/server.c +++ b/src/server.c @@ -1871,6 +1871,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { } while (last_processed != 0); processed += freeClientsInAsyncFreeQueue(); server.events_processed_while_blocked += processed; + if (server.cluster_enabled) clusterBeforeSleep(ProcessingEventsWhileBlocked); return; } @@ -1889,7 +1890,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { * may change the state of Cluster (from ok to fail or vice versa), * so it's a good idea to call it before serving the unblocked clients * later in this function, must be done before blockedBeforeSleep. */ - if (server.cluster_enabled) clusterBeforeSleep(); + if (server.cluster_enabled) clusterBeforeSleep(ProcessingEventsWhileBlocked); /* Handle blocked clients. * must be done before flushAppendOnlyFile, in case of appendfsync=always, From 3c3a99cd7234b1698cd4cb6c30093d21bb872e21 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Mon, 8 Jun 2026 11:59:49 +0200 Subject: [PATCH 13/20] Initialize my_last_committed_info on self NODE_JOIN MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit my_last_committed_info starts as an empty string. The periodic NODE_INFO divergence check compares against it after 10 seconds, always finding a mismatch and proposing a redundant NODE_INFO entry. Initialize it when our own NODE_JOIN is applied, since at that point our address and flags are known. Extract the NODE_INFO data string construction into clusterRaftBuildMyNodeInfo() to avoid duplication across the three call sites. Signed-off-by: Viktor Söderqvist --- src/cluster_raft.c | 33 +++++++++++++++++++-------------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/src/cluster_raft.c b/src/cluster_raft.c index 1362326dcf6..00ce210b389 100644 --- a/src/cluster_raft.c +++ b/src/cluster_raft.c @@ -303,6 +303,7 @@ static clusterMsgSendBlock *clusterRaftBuildAllOffsetsMsg(void) { static void clusterRaftPropose(sds entry, void *ctx, void (*callback)(void *ctx, const char *error)); static void clusterRaftDeferPendingProposals(void); static void clusterRaftUpdateMyself(int old_flags); +static sds clusterRaftBuildMyNodeInfo(void); static void clusterRaftCheckSlotCoverage(void); static void clusterRaftBroadcastAppendEntries(void); static void clusterRaftSendAppendEntries(clusterLink *link, clusterNode *node); @@ -982,6 +983,9 @@ static void raftLogApply(raftLogEntry *e) { /* If this entry is about us, promote from joiner to follower. */ if (memcmp(argv[0], myself->name, CLUSTER_NAMELEN) == 0) { + sdsfree(rs->my_last_committed_info); + rs->my_last_committed_info = clusterRaftBuildMyNodeInfo(); + if (rs->role == RAFT_ROLE_JOINER) { rs->role = RAFT_ROLE_FOLLOWER; rs->todo_save_config = 1; @@ -1775,11 +1779,7 @@ static void clusterRaftCron(void) { * out). Re-propose if needed. Check every 10 seconds. */ if (now - rs->last_node_info_check > 10000) { rs->last_node_info_check = now; - sds current = sdscatlen(sdsempty(), myself->name, CLUSTER_NAMELEN); - current = sdscatlen(current, " ", 1); - current = clusterNodeAppendAddressStringNoShardId(current, myself, server.tls_cluster); - current = sdscatfmt(current, " %s", - (myself->flags & CLUSTER_NODE_NOFAILOVER) ? "nofailover" : "noflags"); + sds current = clusterRaftBuildMyNodeInfo(); if (sdscmp(current, rs->my_last_committed_info) != 0) { serverLog(LL_NOTICE, "NODE_INFO diverged from last commit. Re-proposing."); serverLog(LL_NOTICE, "Old committed and new proposed node-info: %s -> %s", @@ -2215,6 +2215,17 @@ static void clusterRaftPostConnect(struct clusterLink *link) { * Config updates — broadcast metadata changes through Raft log * -------------------------------------------------------------------------- */ +/* Build the NODE_INFO data string for myself: "
". + * Caller must sdsfree() the result. */ +static sds clusterRaftBuildMyNodeInfo(void) { + sds s = sdscatlen(sdsempty(), myself->name, CLUSTER_NAMELEN); + s = sdscatlen(s, " ", 1); + s = clusterNodeAppendAddressStringNoShardId(s, myself, server.tls_cluster); + s = sdscatlen(s, " ", 1); + s = sdscat(s, (myself->flags & CLUSTER_NODE_NOFAILOVER) ? "nofailover" : "noflags"); + return s; +} + static void clusterRaftUpdateMyself(int old_flags) { UNUSED(old_flags); /* Clear cached CLUSTER SLOTS immediately — our address/hostname @@ -2224,16 +2235,10 @@ static void clusterRaftUpdateMyself(int old_flags) { * or if our IP is not yet known. */ if ((myself->flags & CLUSTER_NODE_MEET) || myself->ip[0] == '\0') return; /* Propose NODE_INFO to propagate the change to other nodes. */ + sds data = clusterRaftBuildMyNodeInfo(); sds entry = sdsnew("NODE_INFO "); - entry = sdscatlen(entry, myself->name, CLUSTER_NAMELEN); - entry = sdscatlen(entry, " ", 1); - entry = clusterNodeAppendAddressStringNoShardId(entry, myself, server.tls_cluster); - entry = sdscatlen(entry, " ", 1); - if (myself->flags & CLUSTER_NODE_NOFAILOVER) { - entry = sdscat(entry, "nofailover"); - } else { - entry = sdscat(entry, "noflags"); - } + entry = sdscatsds(entry, data); + sdsfree(data); clusterRaftPropose(entry, NULL, NULL); sdsfree(entry); } From 180f698fb5d7fa8c8325d6431b7bc7c81f854d88 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Tue, 9 Jun 2026 15:31:46 +0200 Subject: [PATCH 14/20] Raft: defer vote response until votedFor is persisted MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The Raft paper requires votedFor to be on stable storage before responding to a vote request. Otherwise a crash after responding but before persisting could allow voting for a different candidate in the same term after restart, breaking the single-vote-per-term invariant. Defer the granted vote response to beforeSleep, after the full config rewrite (triggered by todo_save_config) persists votedFor. Denial responses are sent immediately since they don't change persisted state. Signed-off-by: Viktor Söderqvist --- src/cluster_raft.c | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/src/cluster_raft.c b/src/cluster_raft.c index 00ce210b389..ded9d8dfa76 100644 --- a/src/cluster_raft.c +++ b/src/cluster_raft.c @@ -136,6 +136,7 @@ typedef struct { unsigned int todo_persist_log : 1; unsigned int todo_save_config : 1; unsigned int todo_send_ae_ack : 1; + unsigned int todo_send_vote_response : 1; uint64_t persist_log_from; /* First index to persist in next batch */ uint64_t last_rewrite_applied; /* last_applied at last full rewrite */ @@ -1451,7 +1452,13 @@ static int clusterRaftProcessRequestVote(clusterLink *link, int argc, sds *argv) } } - clusterRaftSendVoteResponse(link, rs->current_term, granted); + if (granted) { + /* Defer response until votedFor is persisted. todo_save_config triggers + * a full rewrite in beforeSleep; the response is sent after that. */ + rs->todo_send_vote_response = 1; + } else { + clusterRaftSendVoteResponse(link, rs->current_term, 0); + } return 1; } @@ -1914,6 +1921,16 @@ static void clusterRaftBeforeSleep(bool blocked) { } } + if (rs->todo_send_vote_response) { + rs->todo_send_vote_response = 0; + /* Send granted vote response to the candidate we voted for. */ + clusterNode *candidate = clusterLookupNode(rs->voted_for, CLUSTER_NAMELEN); + clusterLink *link = candidate ? (candidate->inbound_link ? candidate->inbound_link : candidate->link) : NULL; + if (link) { + clusterRaftSendVoteResponse(link, rs->current_term, 1); + } + } + if (rs->todo_broadcast_ae) { rs->todo_broadcast_ae = 0; if (rs->role == RAFT_ROLE_LEADER) clusterRaftBroadcastAppendEntries(); From 9332e3da3f8b00965d74ba0113c1a27846471e08 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Tue, 9 Jun 2026 15:37:13 +0200 Subject: [PATCH 15/20] Raft: defer RequestVote until term bump and self-vote are persisted MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When starting an election, the candidate increments its term and votes for itself. These must be on stable storage before sending RequestVote, otherwise a crash could allow the node to vote for a different candidate in the same term after restart (double-voting). Defer broadcasting RequestVote to beforeSleep, after todo_save_config persists the new term and votedFor. Signed-off-by: Viktor Söderqvist --- src/cluster_raft.c | 27 +++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/src/cluster_raft.c b/src/cluster_raft.c index ded9d8dfa76..5b77d8a3212 100644 --- a/src/cluster_raft.c +++ b/src/cluster_raft.c @@ -137,6 +137,7 @@ typedef struct { unsigned int todo_save_config : 1; unsigned int todo_send_ae_ack : 1; unsigned int todo_send_vote_response : 1; + unsigned int todo_broadcast_vote_request : 1; uint64_t persist_log_from; /* First index to persist in next batch */ uint64_t last_rewrite_applied; /* last_applied at last full rewrite */ @@ -1522,19 +1523,12 @@ static void clusterRaftStartElection(void) { clusterRaftRandomizeElectionTimeout(); rs->last_heartbeat = monotonicMs(); rs->todo_save_config = 1; + /* Defer sending RequestVote until after the term bump and self-vote + * are persisted. Otherwise a crash could allow double-voting. */ + rs->todo_broadcast_vote_request = 1; serverLog(LL_NOTICE, "Starting Raft election (term %llu).", (unsigned long long)rs->current_term); - dictIterator *di = dictGetSafeIterator(server.cluster->nodes); - dictEntry *de; - while ((de = dictNext(di)) != NULL) { - clusterNode *node = dictGetVal(de); - if (node == myself || !node->link) continue; - if (node->flags & CLUSTER_NODE_MEET) continue; - clusterRaftSendRequestVote(node->link); - } - dictReleaseIterator(di); - /* Single-node: already have quorum. */ int quorum = server.cluster->size / 2 + 1; if (rs->votes_received >= quorum) { @@ -1931,6 +1925,19 @@ static void clusterRaftBeforeSleep(bool blocked) { } } + if (rs->todo_broadcast_vote_request) { + rs->todo_broadcast_vote_request = 0; + dictIterator *di = dictGetSafeIterator(server.cluster->nodes); + dictEntry *de; + while ((de = dictNext(di)) != NULL) { + clusterNode *node = dictGetVal(de); + if (node == myself || !node->link) continue; + if (node->flags & CLUSTER_NODE_MEET) continue; + clusterRaftSendRequestVote(node->link); + } + dictReleaseIterator(di); + } + if (rs->todo_broadcast_ae) { rs->todo_broadcast_ae = 0; if (rs->role == RAFT_ROLE_LEADER) clusterRaftBroadcastAppendEntries(); From cfb10b5472e3942253599e4e0b06da04edbb60d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Tue, 9 Jun 2026 15:40:22 +0200 Subject: [PATCH 16/20] Raft: trigger full rewrite on log truncation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a log conflict is detected and entries are truncated, those entries may already be persisted on disk. A full config rewrite is needed to remove them, otherwise they would be replayed on restart. Signed-off-by: Viktor Söderqvist --- src/cluster_raft.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/cluster_raft.c b/src/cluster_raft.c index 5b77d8a3212..4377776c1bb 100644 --- a/src/cluster_raft.c +++ b/src/cluster_raft.c @@ -902,6 +902,9 @@ static void raftLogTruncateFrom(uint64_t index) { while (rs->log_count > 0 && rs->log[rs->log_count - 1]->index >= index) { raftLogFree(rs->log[--rs->log_count]); } + /* Truncated entries may already be on disk; a full rewrite is needed + * to remove them. */ + rs->todo_save_config = 1; } static uint64_t raftLogLastIndex(void) { From 90b6a9d43d28ba2b0a06a11581d8b8f49e3ea4c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Tue, 9 Jun 2026 17:08:26 +0200 Subject: [PATCH 17/20] Raft: defer singleton leader commit until after persistence MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously, a singleton leader (quorum=1) committed and applied entries inline in clusterRaftPropose, before they were persisted. This meant side effects (CLUSTER MEET unblock, broadcast AE to new peer) could fire before the entry was on stable storage. Move singleton commit to beforeSleep, after the persist step. This ensures all entries are durable before being applied, and all apply side effects (unblock meets, broadcast AE) happen at the right time without needing separate deferral flags for each. Signed-off-by: Viktor Söderqvist --- src/cluster_raft.c | 41 +++++++++++++++++++---------------------- 1 file changed, 19 insertions(+), 22 deletions(-) diff --git a/src/cluster_raft.c b/src/cluster_raft.c index 4377776c1bb..0aec2852df8 100644 --- a/src/cluster_raft.c +++ b/src/cluster_raft.c @@ -741,18 +741,9 @@ static void clusterRaftPropose(sds entry, void *ctx, void (*callback)(void *ctx, serverLog(LL_NOTICE, "Leader appended %s (index %llu).", raftEntryTypeName(type), (unsigned long long)idx); - /* Single-node cluster: commit and apply immediately. */ - if (server.cluster->size <= 1) { - rs->commit_index = idx; - while (rs->last_applied < rs->commit_index) { - rs->last_applied++; - raftLogEntry *e = raftLogGet(rs->last_applied); - if (e) raftLogApply(e); - } - /* If we just grew beyond singleton, replicate to the new peer. */ - if (server.cluster->size > 1) rs->todo_broadcast_ae = 1; - } else { - /* Replicate to followers immediately. */ + /* Multi-node: replicate to followers. Single-node: commit is + * deferred to beforeSleep after persistence. */ + if (server.cluster->size > 1) { rs->todo_broadcast_ae = 1; } } else { @@ -797,16 +788,9 @@ static int clusterRaftProcessPropose(clusterLink *link, int argc, sds *argv) { serverLog(LL_NOTICE, "Leader appended proposed %s (index %llu).", raftEntryTypeName(type), (unsigned long long)idx); - /* Single-node cluster: commit and apply immediately. */ - if (server.cluster->size <= 1) { - rs->commit_index = idx; - while (rs->last_applied < rs->commit_index) { - rs->last_applied++; - raftLogEntry *e = raftLogGet(rs->last_applied); - if (e) raftLogApply(e); - } - } else { - /* Replicate to followers immediately. */ + /* Multi-node: replicate to followers. Single-node: commit is + * deferred to beforeSleep after persistence. */ + if (server.cluster->size > 1) { rs->todo_broadcast_ae = 1; } @@ -1908,6 +1892,19 @@ static void clusterRaftBeforeSleep(bool blocked) { clusterRaftPersistNewLogEntries(rs->persist_log_from); } + /* Singleton leader: commit after persistence (quorum = 1). */ + if (rs->role == RAFT_ROLE_LEADER && server.cluster->size <= 1 && + raftLogLastIndex() > rs->commit_index) { + rs->commit_index = raftLogLastIndex(); + while (rs->last_applied < rs->commit_index) { + rs->last_applied++; + raftLogEntry *e = raftLogGet(rs->last_applied); + if (e) raftLogApply(e); + } + /* If we just grew beyond singleton, replicate to the new peer. */ + if (server.cluster->size > 1) rs->todo_broadcast_ae = 1; + } + if (rs->todo_send_ae_ack) { rs->todo_send_ae_ack = 0; clusterNode *leader = clusterLookupNode(rs->leader, CLUSTER_NAMELEN); From c64fbc4b4b24aa7b61da8f7171b6e29b7e20f024 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Tue, 9 Jun 2026 17:15:47 +0200 Subject: [PATCH 18/20] Raft: define RAFT_LOG_REWRITE_THRESHOLD for periodic rewrite MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Viktor Söderqvist --- src/cluster_raft.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/cluster_raft.c b/src/cluster_raft.c index 0aec2852df8..7681246b42c 100644 --- a/src/cluster_raft.c +++ b/src/cluster_raft.c @@ -197,6 +197,7 @@ typedef struct { #define RAFT_HDR_SIZE 8 #define REPL_OFFSETS_BROADCAST_PERIOD_MS 10000 +#define RAFT_LOG_REWRITE_THRESHOLD 100 /* Monotonic millisecond clock for timeouts and failure detection. * Unlike gettimeofday(), this is not affected by system clock adjustments. */ @@ -1882,7 +1883,8 @@ static void clusterRaftBeforeSleep(bool blocked) { } if (rs->todo_save_config || - (rs->todo_persist_log && rs->last_applied - rs->last_rewrite_applied >= 100)) { + (rs->todo_persist_log && + rs->last_applied - rs->last_rewrite_applied >= RAFT_LOG_REWRITE_THRESHOLD)) { rs->todo_save_config = 0; rs->todo_persist_log = 0; clusterSaveConfigOrDie(1); From b1b8b0862c192d524e07e18bd7b7d7a94e9f6b40 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Tue, 9 Jun 2026 17:19:13 +0200 Subject: [PATCH 19/20] Raft: full config rewrite on shutdown for faster restart MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Write a compacted nodes.conf at shutdown. Without this, the node restarts with committed entries in the log tail that it cannot apply until the leader sends AE with the updated commit index. With the rewrite, lastApplied is up to date and the log tail only contains truly uncommitted entries (if any). Signed-off-by: Viktor Söderqvist --- src/cluster_raft.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/cluster_raft.c b/src/cluster_raft.c index 7681246b42c..3f03e2440c7 100644 --- a/src/cluster_raft.c +++ b/src/cluster_raft.c @@ -2037,6 +2037,9 @@ static void clusterRaftPrepareShutdown(void) { } static void clusterRaftHandleServerShutdown(void) { + /* Compact the log on disk so restart is fast. */ + clusterSaveConfigOrDie(1); + clusterRaftState *rs = RAFT_STATE(); /* Free pending proposals. */ listIter li; From e702a5e025a5a445486bae175a49dbbf259b8ee7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Tue, 9 Jun 2026 17:26:22 +0200 Subject: [PATCH 20/20] Remove incomplete-line check from nodes.conf loading MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The trailing newline check could break gossip users whose editors strip final newlines. Since it only protects raft log lines from crash-during-append, and we plan to add per-line checksums as the proper solution, remove the check for now. Signed-off-by: Viktor Söderqvist --- design-docs/cluster-raft.md | 6 +++--- src/cluster_nodes.c | 6 ------ 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/design-docs/cluster-raft.md b/design-docs/cluster-raft.md index 05fa05dd337..9db922460ac 100644 --- a/design-docs/cluster-raft.md +++ b/design-docs/cluster-raft.md @@ -633,10 +633,10 @@ The leader will send AE after reconnection, updating `commit_index`. Entries between `lastApplied + 1` and the new `commit_index` are then applied. -### Incomplete lines +### Crash detection -If the last line lacks a trailing newline (crash during append), it and -everything after it is discarded. +Not yet implemented. A future improvement will add per-line checksums +to detect partial writes and corruption (see Future Work). ### File format diff --git a/src/cluster_nodes.c b/src/cluster_nodes.c index dba24549f52..fb70c71de16 100644 --- a/src/cluster_nodes.c +++ b/src/cluster_nodes.c @@ -666,12 +666,6 @@ int clusterLoadConfig(char *filename) { * before the truncate() call. */ if (line[0] == '\n' || line[0] == '\0') continue; - /* A line without a trailing newline means the write was interrupted - * (crash during append). Stop reading — this and anything after is - * potentially incomplete. */ - size_t linelen = strlen(line); - if (linelen > 0 && line[linelen - 1] != '\n') break; - /* Split the line into arguments for processing. */ argv = sdssplitargs(line, &argc); if (argv == NULL) goto fmterr;