Merged
20 commits
523722e
Raft: persist state and WAL in nodes.conf
zuiderkwast May 5, 2026
bc35258
Update design doc: Raft log and state persistence in nodes.conf
zuiderkwast May 5, 2026
17cb404
Raft: restore cluster size on restart
zuiderkwast Jun 4, 2026
5dbe473
Fix node appearing in multiple shards when shard-id changes
zuiderkwast Jun 5, 2026
b5e27e4
Exclude shard-id from NODE_INFO entries
zuiderkwast Jun 5, 2026
30afb73
Implement log completeness check in RequestVote
zuiderkwast Jun 5, 2026
cd28fb9
Evaluate slot coverage on startup
zuiderkwast Jun 6, 2026
0e79568
Raft: inline raftLogMarkDirty into raftLogAppend
zuiderkwast Jun 6, 2026
e37c8fd
Raft: trigger full config rewrite after 100 appended entries
zuiderkwast Jun 6, 2026
415c420
Design doc: add checksum future work item for log persistence
zuiderkwast Jun 6, 2026
d52ce26
Raft: defer AE_ACK until after persistence
zuiderkwast Jun 6, 2026
c52c2b6
Run Raft cluster's beforeSleep during RDB loading
zuiderkwast Jun 7, 2026
3c3a99c
Initialize my_last_committed_info on self NODE_JOIN
zuiderkwast Jun 8, 2026
180f698
Raft: defer vote response until votedFor is persisted
zuiderkwast Jun 9, 2026
9332e3d
Raft: defer RequestVote until term bump and self-vote are persisted
zuiderkwast Jun 9, 2026
cfb10b5
Raft: trigger full rewrite on log truncation
zuiderkwast Jun 9, 2026
90b6a9d
Raft: defer singleton leader commit until after persistence
zuiderkwast Jun 9, 2026
c64fbc4
Raft: define RAFT_LOG_REWRITE_THRESHOLD for periodic rewrite
zuiderkwast Jun 9, 2026
b1b8b08
Raft: full config rewrite on shutdown for faster restart
zuiderkwast Jun 9, 2026
e702a5e
Remove incomplete-line check from nodes.conf loading
zuiderkwast Jun 9, 2026
77 changes: 72 additions & 5 deletions design-docs/cluster-raft.md
@@ -145,7 +145,9 @@ FAILOVER <replica-id> <primary-id>

NODE_INFO <node-id> <address-string> <flags>
Update node address and self-set flags. The address-string uses
the nodes.conf format. Flags is "nofailover" or "noflags".
the nodes.conf format but excludes shard-id, which is not a
property of the node but of the shard and is managed by NODE_JOIN
and SET_REPLICA_OF. Flags is "nofailover" or "noflags".

NODE_FAIL <node-id>
Mark a node as failed. Proposed by the leader when a peer exceeds
@@ -575,9 +577,73 @@ every 10 seconds.

## Persistence

TODO: The Raft log and state (currentTerm, votedFor) will be persisted
in nodes.conf using the vars section for Raft state and additional
lines for uncommitted log entries.
Raft state is persisted in `nodes.conf`. The file has three sections:

1. **Node lines** — the cluster state snapshot (nodes, slots, replication
topology) as of `lastApplied`.

Review comment on the node lines: I assume this will include the shard level epoch as well?

Author reply: Yes, we'll need to persist it in some way, either in the node lines or in the vars line. I have an idea. I posted it on your PR. #3899 (comment)

2. **Vars line** — `currentTerm`, `lastApplied`, `votedFor`, `raftLeader`.
3. **Log lines** — entries with index > `lastApplied` appended at the end.

Only `currentTerm`, `votedFor`, and the log require persistence for
safety; `commitIndex` is rediscovered from the leader after restart
(Raft paper §5.2, Figure 2).

Log line format:

log <index> <term> <type> <data>
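
A minimal sketch of parsing one such line. It assumes that `<index>` and `<term>` are unsigned decimal integers, that `<type>` is a single token without whitespace, and that `<data>` is the rest of the line. The struct, the buffer sizes, and the function name are hypothetical and are not taken from the PR.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical in-memory form of one "log" line; not the PR's actual type. */
typedef struct {
    unsigned long long index;
    unsigned long long term;
    char type[32];
    char data[256];
} exampleLogEntry;

/* Parse "log <index> <term> <type> <data>". Returns 1 on success and 0 on a
 * malformed line. Assumes <data> is everything that follows <type>. */
int exampleParseLogLine(const char *line, exampleLogEntry *out) {
    assert(line != NULL && out != NULL);
    int consumed = 0;
    out->data[0] = '\0';
    if (sscanf(line, "log %llu %llu %31s%n", &out->index, &out->term, out->type, &consumed) != 3) {
        return 0;
    }
    /* Skip the separator and copy the remainder as opaque data. */
    const char *rest = line + consumed;
    while (*rest == ' ') rest++;
    size_t len = strcspn(rest, "\r\n");
    if (len >= sizeof(out->data)) return 0;
    memcpy(out->data, rest, len);
    out->data[len] = '\0';
    return 1;
}
```

The real loader splits lines into an `sds` array before dispatching, so this only illustrates the field order, not the actual parsing path.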

### Full rewrite

A full rewrite (atomic write to temp file + rename + fsync) is triggered
when:

- `currentTerm` changes (step-down on higher term).
- `votedFor` changes (granting a vote or starting an election).
- An applied entry affects `myself` (SLOT_CHANGE, SET_REPLICA_OF,
FAILOVER, NODE_JOIN promotion from learner).

A full rewrite updates the snapshot (node lines reflect the current
applied state), updates `lastApplied` in vars, and writes only
unapplied log entries in the tail.
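
The atomic replacement step can be sketched with plain POSIX calls. This is an illustration under assumptions, not the PR's `clusterSaveConfig`: the helper name and temp-file naming are hypothetical, the sketch syncs the temp file before the rename, and it leaves out syncing the containing directory, which a production version would likely also want.

```c
#include <assert.h>
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

/* Hypothetical helper: atomically replace `path` with `content`.
 * Returns 0 on success, -1 on error. */
int exampleAtomicRewrite(const char *path, const char *content) {
    assert(path != NULL && content != NULL);
    char tmp[512];
    int n = snprintf(tmp, sizeof(tmp), "%s.tmp-%d", path, (int)getpid());
    if (n < 0 || (size_t)n >= sizeof(tmp)) return -1;

    int fd = open(tmp, O_WRONLY | O_CREAT | O_TRUNC, 0644);
    if (fd == -1) return -1;

    /* Write the whole buffer, tolerating short writes. */
    size_t len = strlen(content), off = 0;
    while (off < len) {
        ssize_t written = write(fd, content + off, len - off);
        if (written <= 0) goto fail;
        off += (size_t)written;
    }
    /* Make the new content durable before it becomes visible. */
    if (fsync(fd) == -1) goto fail;
    if (close(fd) == -1) {
        unlink(tmp);
        return -1;
    }
    /* rename() swaps the file in one step: readers see old or new, never a mix. */
    if (rename(tmp, path) == -1) {
        unlink(tmp);
        return -1;
    }
    return 0;

fail:
    close(fd);
    unlink(tmp);
    return -1;
}
```

Because the whole file is regenerated, a rewrite of this kind is also the natural point to drop already-applied entries from the log tail.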

### Append-only

When new log entries arrive (from AE or local proposals), they are
appended to the end of the file with a single batched write + fsync
(deferred to `beforeSleep`). No full rewrite is needed.

**Safety invariant:** The fsync must complete before the AE_ACK reaches
the leader. This is enforced by deferring the success AE_ACK: the AE
handler sets `todo_send_ae_ack` instead of sending immediately, and
`beforeSleep` sends the ACK only after persistence completes. If fsync
is ever moved to a background thread, the ACK must be deferred until
the background fsync completes.

### Startup

On load:

1. Node lines are parsed → cluster state restored (snapshot).
2. Vars line is parsed → `currentTerm`, `votedFor`, `lastApplied` restored.
`commit_index` is set to `lastApplied`.
3. Log lines are parsed → entries added to the in-memory log.
4. If the node is a replica, replication is started.

The leader will send AE after reconnection, updating `commit_index`.
Entries between `lastApplied + 1` and the new `commit_index` are then
applied.

### Crash detection

Not yet implemented. A future improvement will add per-line checksums
to detect partial writes and corruption (see Future Work).

### File format

The node lines share the same format as the gossip protocol (code
reuse), but the vars and log lines are raft-specific. The file is not
compatible between protocols — switching from gossip to raft (or vice
versa) requires removing nodes.conf.

## Shard Epoch (not yet implemented)

@@ -631,7 +697,6 @@ targets.
entries, especially don't trigger primary/replica failovers in a
minority partition).
- Log compaction / snapshotting for lagging followers.
- Persistence of Raft log to disk.
- Learners (non-voting members): reduces the risk for split-vote for
leader election in large clusters and reduces commit overhead.
- Leader transfer on CLUSTER FORGET where the target is the leader.
@@ -641,3 +706,5 @@ targets.
atomically or not.
- `valkey-cli --cluster` tooling compatibility (uses MULTI internally in
some cases, which doesn't work with blocking admin commands).
- Checksums for appended log lines to detect partial writes and
bit-rot, instead of relying solely on trailing newline detection.
6 changes: 3 additions & 3 deletions src/cluster.c
@@ -158,12 +158,12 @@ void clusterCron(void) {
clusterCurrentBus->cron();
clusterCheckReplicaMigration();
}
void clusterBeforeSleep(void) {
if (server.cluster->before_sleep_handle_slot_migration) {
void clusterBeforeSleep(bool blocked) {
if (!blocked && server.cluster->before_sleep_handle_slot_migration) {
server.cluster->before_sleep_handle_slot_migration = 0;
clusterSlotMigrationCron();
}
clusterCurrentBus->beforeSleep();
clusterCurrentBus->beforeSleep(blocked);
}
static void clusterAutoFailoverOnShutdown(void) {
if (!nodeIsPrimary(myself)) return;
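
The effect of the new `blocked` argument can be sketched with a reduced bus table. The legacy early return mirrors the `cluster_legacy.c` change in this PR; the Raft handler's behavior while blocked is an assumption based on the commit title "Run Raft cluster's beforeSleep during RDB loading", since its code is not shown here. All names prefixed with `example` are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, reduced version of the bus function table. */
typedef struct {
    void (*beforeSleep)(bool blocked);
} exampleBusType;

int example_legacy_runs = 0;
int example_raft_flushes = 0;

/* Legacy bus: skip all deferred work while the server is blocked. */
void exampleLegacyBeforeSleep(bool blocked) {
    if (blocked) return;
    example_legacy_runs++;
}

/* Raft bus (assumed): pending log writes and deferred replies are flushed
 * even while blocked, so that peers are not left waiting. */
void exampleRaftBeforeSleep(bool blocked) {
    (void)blocked;
    example_raft_flushes++;
}

/* Generic entry point: forward the flag to whichever bus is active. */
void exampleClusterBeforeSleep(const exampleBusType *bus, bool blocked) {
    assert(bus != NULL && bus->beforeSleep != NULL);
    bus->beforeSleep(blocked);
}
```

Passing the flag down lets each bus decide what is safe to do during a blocking operation instead of having the caller skip the hook entirely.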
2 changes: 1 addition & 1 deletion src/cluster.h
@@ -54,7 +54,7 @@ struct clusterState;
void clusterInit(void);
void clusterInitLast(void);
void clusterCron(void);
void clusterBeforeSleep(void);
void clusterBeforeSleep(bool blocked);
int verifyClusterConfigWithData(void);
void clusterPrepareShutdown(void);
void clusterHandleServerShutdown(bool auto_failover);
9 changes: 8 additions & 1 deletion src/cluster_bus.h
@@ -20,7 +20,7 @@ typedef struct clusterBusType {
void (*init)(void);
void (*initLast)(void);
void (*cron)(void);
void (*beforeSleep)(void);
void (*beforeSleep)(bool blocked);
void (*prepareShutdown)(void); /* Called early in shutdown to allow graceful handoff. */
void (*handleServerShutdown)(void);

@@ -83,6 +83,13 @@ typedef struct clusterBusType {
* Returns 1 if the variable was handled, 0 otherwise. */
int (*parseVarsLine)(const char *name, const char *value);

/* Parse a "log" line from nodes.conf. argv/argc are the split line
* (argv[0] is "log"). */
void (*parseLogLine)(sds *argv, int argc);

/* Append "log" lines during a full config rewrite. */
sds (*appendLogLines)(sds config);

/* Called after nodes.conf is fully loaded, for post-load fixups
* (e.g. ensuring currentEpoch >= max configEpoch). If NULL, skipped. */
void (*postLoad)(void);
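
A sketch of how a loader might use an optional hook such as `parseLogLine`: the line is recognized by its first token and silently consumed when the active bus does not provide the hook. The reduced table, the counter, and the function names are hypothetical, and plain `char **` stands in for the `sds` array.

```c
#include <assert.h>
#include <stddef.h>
#include <strings.h> /* strcasecmp (POSIX) */

/* Hypothetical, reduced bus table with one optional hook. */
typedef struct {
    void (*parseLogLine)(char **argv, int argc); /* NULL if the bus keeps no log */
} exampleLoaderBus;

int example_log_lines_seen = 0;

/* Example hook: a real one would rebuild the in-memory log entry. */
void exampleCountLogLine(char **argv, int argc) {
    (void)argv;
    (void)argc;
    example_log_lines_seen++;
}

/* Returns 1 if the line was a "log" line (consumed, even when the bus has
 * no hook), 0 if the caller should treat it as a regular line. */
int exampleDispatchLine(const exampleLoaderBus *bus, char **argv, int argc) {
    assert(bus != NULL && argv != NULL);
    if (argc < 1 || strcasecmp(argv[0], "log") != 0) return 0;
    if (bus->parseLogLine) bus->parseLogLine(argv, argc);
    return 1;
}
```

Checking the token before the field-count check matters because a log line may have fewer fields than a regular node line.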
3 changes: 2 additions & 1 deletion src/cluster_legacy.c
@@ -4653,7 +4653,8 @@ static void clusterLegacyCron(void) {
* reaction to events fired but that are not safe to perform inside event
* handlers, or to perform potentially expansive tasks that we need to do
* a single time before replying to clients. */
static void clusterLegacyBeforeSleep(void) {
static void clusterLegacyBeforeSleep(bool blocked) {
if (blocked) return;
int flags = LEGACY_STATE()->todo_before_sleep;

/* Reset our flags (not strictly needed since every single function
35 changes: 30 additions & 5 deletions src/cluster_nodes.c
@@ -95,9 +95,13 @@ static int auxShardIdSetter(clusterNode *n, void *value, size_t length) {
if (verifyClusterNodeId(value, length) == C_ERR) {
return C_ERR;
}
if (memcmp(n->shard_id, value, CLUSTER_NAMELEN) == 0) {
return C_OK;
}
clusterRemoveNodeFromShard(n);
memcpy(n->shard_id, value, CLUSTER_NAMELEN);
/* if n already has replicas, make sure they all agree
* on the shard id. If not, update them. */
clusterAddNodeToShard(n->shard_id, n);
/* If n has replicas, make sure they follow the new shard id. */
for (int i = 0; i < n->num_replicas; i++) {
if (memcmp(n->replicas[i]->shard_id, n->shard_id, CLUSTER_NAMELEN) != 0) {
serverLog(LL_NOTICE,
@@ -109,7 +113,6 @@ static int auxShardIdSetter(clusterNode *n, void *value, size_t length) {
clusterAddNodeToShard(n->shard_id, n->replicas[i]);
}
}
clusterAddNodeToShard(value, n);
return C_OK;
}

@@ -328,8 +331,9 @@ static sds representSlotInfo(sds ci, uint16_t *slot_info_pairs, int slot_info_pa
}

/* Append the address+aux string for a node to an sds: ip:port@cport[,hostname][,aux=val]*
* This is the same format used in the second column of nodes.conf. */
sds clusterNodeAppendAddressString(sds s, clusterNode *node, int tls_primary) {
* This is the same format used in the second column of nodes.conf.
* skip_aux is a bitmask of auxFieldIndex values to omit. */
static sds clusterNodeAppendAddressStringSkip(sds s, clusterNode *node, int tls_primary, unsigned int skip_aux) {
int port = tls_primary ? node->tls_port : node->tcp_port;
s = sdscatfmt(s, "%s:%i@%i", node->ip, port, node->cport);
if (sdslen(node->hostname) != 0) {
@@ -338,6 +342,7 @@ sds clusterNodeAppendAddressString(sds s, clusterNode *node, int tls_primary) {
s = sdscatlen(s, ",", 1);
}
for (int i = af_count - 1; i >= 0; i--) {
if (skip_aux & (1u << i)) continue;
if ((tls_primary && i == af_tls_port) || (!tls_primary && i == af_tcp_port)) continue;
if (auxFieldHandlers[i].isPresent(node)) {
s = sdscatfmt(s, ",%s=", auxFieldHandlers[i].field);
@@ -347,6 +352,16 @@ sds clusterNodeAppendAddressString(sds s, clusterNode *node, int tls_primary) {
return s;
}

sds clusterNodeAppendAddressString(sds s, clusterNode *node, int tls_primary) {
return clusterNodeAppendAddressStringSkip(s, node, tls_primary, 0);
}

/* Like clusterNodeAppendAddressString but omits shard-id, which is managed
* by dedicated raft log entries (NODE_JOIN, SET_REPLICA_OF). */
sds clusterNodeAppendAddressStringNoShardId(sds s, clusterNode *node, int tls_primary) {
return clusterNodeAppendAddressStringSkip(s, node, tls_primary, 1u << af_shard_id);
}

/* Parse an address+aux string onto a node. The string format is:
* ip:port@cport[,hostname][,aux=val]*
* Returns C_OK on success, C_ERR on parse error. The input string is modified. */
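
The `skip_aux` bitmask introduced above can be illustrated in isolation. This sketch uses a made-up three-entry field table and a fixed-size buffer instead of `sds`; the enum, the table, and the function name are hypothetical, and only the `1u << index` test mirrors the PR.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical aux field table; the real one lives in cluster_nodes.c. */
typedef enum { ex_af_shard_id, ex_af_tcp_port, ex_af_tls_port, ex_af_count } exampleAuxIndex;
static const char *example_aux_names[ex_af_count] = {"shard-id", "tcp-port", "tls-port"};

/* Append ",name=value" for each field whose bit is not set in skip_aux.
 * Returns the resulting string length. */
size_t exampleAppendAux(char *buf, size_t cap, const char *values[], unsigned int skip_aux) {
    assert(buf != NULL && cap > 0 && values != NULL);
    size_t len = 0;
    buf[0] = '\0';
    for (int i = ex_af_count - 1; i >= 0; i--) {
        if (skip_aux & (1u << i)) continue; /* field masked out by the caller */
        int n = snprintf(buf + len, cap - len, ",%s=%s", example_aux_names[i], values[i]);
        if (n < 0 || (size_t)n >= cap - len) {
            buf[len] = '\0'; /* drop the partially written field */
            break;
        }
        len += (size_t)n;
    }
    return len;
}
```

A mask of `0` reproduces the full string, and `1u << ex_af_shard_id` yields the variant without shard-id, which corresponds to the two wrapper functions in the diff.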
@@ -670,6 +685,14 @@ int clusterLoadConfig(char *filename) {
continue;
}

/* Handle "log" lines (protocol-specific WAL entries). */
if (strcasecmp(argv[0], "log") == 0) {
if (clusterCurrentBus->parseLogLine)
clusterCurrentBus->parseLogLine(argv, argc);
sdsfreesplitres(argv, argc);
continue;
}

/* Regular config lines have at least eight fields */
if (argc < 8) {
sdsfreesplitres(argv, argc);
@@ -894,6 +917,8 @@ int clusterSaveConfig(int do_fsync) {
ci = clusterGenNodesDescription(NULL, CLUSTER_NODE_HANDSHAKE, 0);
if (clusterCurrentBus->appendVarsLine)
ci = clusterCurrentBus->appendVarsLine(ci);
if (clusterCurrentBus->appendLogLines)
ci = clusterCurrentBus->appendLogLines(ci);
content_size = sdslen(ci);

/* Create a temp file with the new content. */
1 change: 1 addition & 0 deletions src/cluster_nodes.h
@@ -5,6 +5,7 @@

/* Node address string: ip:port@cport[,hostname][,aux=val]* */
sds clusterNodeAppendAddressString(sds s, clusterNode *node, int tls_primary);
sds clusterNodeAppendAddressStringNoShardId(sds s, clusterNode *node, int tls_primary);
int clusterNodeParseAddressString(clusterNode *n, char *str);

/* Node description / serialization. */