diff --git a/design-docs/cluster-raft.md b/design-docs/cluster-raft.md index 732c7cf3eb2..9db922460ac 100644 --- a/design-docs/cluster-raft.md +++ b/design-docs/cluster-raft.md @@ -145,7 +145,9 @@ FAILOVER NODE_INFO Update node address and self-set flags. The address-string uses - the nodes.conf format. Flags is "nofailover" or "noflags". + the nodes.conf format but excludes shard-id, which is not a + property of the node but of the shard and is managed by NODE_JOIN + and SET_REPLICA_OF. Flags is "nofailover" or "noflags". NODE_FAIL Mark a node as failed. Proposed by the leader when a peer exceeds @@ -575,9 +577,73 @@ every 10 seconds. ## Persistence -TODO: The Raft log and state (currentTerm, votedFor) will be persisted -in nodes.conf using the vars section for Raft state and additional -lines for uncommitted log entries. +Raft state is persisted in `nodes.conf`. The file has three sections: + +1. **Node lines** — the cluster state snapshot (nodes, slots, replication + topology) as of `lastApplied`. +2. **Vars line** — `currentTerm`, `lastApplied`, `votedFor`, `raftLeader`. +3. **Log lines** — entries with index > `lastApplied` appended at the end. + +Only `currentTerm`, `votedFor`, and the log require persistence for +safety; `commitIndex` is rediscovered from the leader after restart +(Raft paper §5.2, Figure 2). + +Log line format: + + log + +### Full rewrite + +A full rewrite (atomic write to temp file + rename + fsync) is triggered +when: + +- `currentTerm` changes (step-down on higher term). +- `votedFor` changes (granting a vote or starting an election). +- An applied entry affects `myself` (SLOT_CHANGE, SET_REPLICA_OF, + FAILOVER, NODE_JOIN promotion from learner). + +A full rewrite updates the snapshot (node lines reflect the current +applied state), updates `lastApplied` in vars, and writes only +unapplied log entries in the tail. + +### Append-only + +When new log entries arrive (from AE or local proposals), they are +appended to the end of the file with a single batched write + fsync +(deferred to `beforeSleep`). No full rewrite is needed. + +**Safety invariant:** The fsync must complete before the AE_ACK reaches +the leader. This is enforced by deferring the success AE_ACK: the AE +handler sets `todo_send_ae_ack` instead of sending immediately, and +`beforeSleep` sends the ACK only after persistence completes. If fsync +is ever moved to a background thread, the ACK must be deferred until +the background fsync completes. + +### Startup + +On load: + +1. Node lines are parsed → cluster state restored (snapshot). +2. Vars line is parsed → `currentTerm`, `votedFor`, `lastApplied` restored. + `commit_index` is set to `lastApplied`. +3. Log lines are parsed → entries added to the in-memory log. +4. If the node is a replica, replication is started. + +The leader will send AE after reconnection, updating `commit_index`. +Entries between `lastApplied + 1` and the new `commit_index` are then +applied. + +### Crash detection + +Not yet implemented. A future improvement will add per-line checksums +to detect partial writes and corruption (see Future Work). + +### File format + +The node lines share the same format as the gossip protocol (code +reuse), but the vars and log lines are raft-specific. The file is not +compatible between protocols — switching from gossip to raft (or vice +versa) requires removing nodes.conf. ## Shard Epoch (not yet implemented) @@ -631,7 +697,6 @@ targets. entries, especially don't trigger primary/replica failovers in a minority partition). - Log compaction / snapshotting for lagging followers. -- Persistence of Raft log to disk. - Learners (non-voting members): reduces the risk for split-vote for leader election in large clusters and reduces commit overhead. - Leader transfer on CLUSTER FORGET where the target is the leader. @@ -641,3 +706,5 @@ targets. atomically or not. - `valkey-cli --cluster` tooling compatibility (uses MULTI internally in some cases, which doesn't work with blocking admin commands). +- Checksums for appended log lines to detect partial writes and + bit-rot, instead of relying solely on trailing newline detection. diff --git a/src/cluster.c b/src/cluster.c index 517013e118e..3a7907fa738 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -158,12 +158,12 @@ void clusterCron(void) { clusterCurrentBus->cron(); clusterCheckReplicaMigration(); } -void clusterBeforeSleep(void) { - if (server.cluster->before_sleep_handle_slot_migration) { +void clusterBeforeSleep(bool blocked) { + if (!blocked && server.cluster->before_sleep_handle_slot_migration) { server.cluster->before_sleep_handle_slot_migration = 0; clusterSlotMigrationCron(); } - clusterCurrentBus->beforeSleep(); + clusterCurrentBus->beforeSleep(blocked); } static void clusterAutoFailoverOnShutdown(void) { if (!nodeIsPrimary(myself)) return; diff --git a/src/cluster.h b/src/cluster.h index 2408c625e13..0707e524f33 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -54,7 +54,7 @@ struct clusterState; void clusterInit(void); void clusterInitLast(void); void clusterCron(void); -void clusterBeforeSleep(void); +void clusterBeforeSleep(bool blocked); int verifyClusterConfigWithData(void); void clusterPrepareShutdown(void); void clusterHandleServerShutdown(bool auto_failover); diff --git a/src/cluster_bus.h b/src/cluster_bus.h index 52e5270eb1b..2fb008f519b 100644 --- a/src/cluster_bus.h +++ b/src/cluster_bus.h @@ -20,7 +20,7 @@ typedef struct clusterBusType { void (*init)(void); void (*initLast)(void); void (*cron)(void); - void (*beforeSleep)(void); + void (*beforeSleep)(bool blocked); void (*prepareShutdown)(void); /* Called early in shutdown to allow graceful handoff. */ void (*handleServerShutdown)(void); @@ -83,6 +83,13 @@ typedef struct clusterBusType { * Returns 1 if the variable was handled, 0 otherwise. */ int (*parseVarsLine)(const char *name, const char *value); + /* Parse a "log" line from nodes.conf. argv/argc are the split line + * (argv[0] is "log"). */ + void (*parseLogLine)(sds *argv, int argc); + + /* Append "log" lines during a full config rewrite. */ + sds (*appendLogLines)(sds config); + /* Called after nodes.conf is fully loaded, for post-load fixups * (e.g. ensuring currentEpoch >= max configEpoch). If NULL, skipped. */ void (*postLoad)(void); diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 2726c97f9f6..bb62df1d35f 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -4653,7 +4653,8 @@ static void clusterLegacyCron(void) { * reaction to events fired but that are not safe to perform inside event * handlers, or to perform potentially expansive tasks that we need to do * a single time before replying to clients. */ -static void clusterLegacyBeforeSleep(void) { +static void clusterLegacyBeforeSleep(bool blocked) { + if (blocked) return; int flags = LEGACY_STATE()->todo_before_sleep; /* Reset our flags (not strictly needed since every single function diff --git a/src/cluster_nodes.c b/src/cluster_nodes.c index 2018a3d90ec..fb70c71de16 100644 --- a/src/cluster_nodes.c +++ b/src/cluster_nodes.c @@ -95,9 +95,13 @@ static int auxShardIdSetter(clusterNode *n, void *value, size_t length) { if (verifyClusterNodeId(value, length) == C_ERR) { return C_ERR; } + if (memcmp(n->shard_id, value, CLUSTER_NAMELEN) == 0) { + return C_OK; + } + clusterRemoveNodeFromShard(n); memcpy(n->shard_id, value, CLUSTER_NAMELEN); - /* if n already has replicas, make sure they all agree - * on the shard id. If not, update them. */ + clusterAddNodeToShard(n->shard_id, n); + /* If n has replicas, make sure they follow the new shard id. */ for (int i = 0; i < n->num_replicas; i++) { if (memcmp(n->replicas[i]->shard_id, n->shard_id, CLUSTER_NAMELEN) != 0) { serverLog(LL_NOTICE, @@ -109,7 +113,6 @@ static int auxShardIdSetter(clusterNode *n, void *value, size_t length) { clusterAddNodeToShard(n->shard_id, n->replicas[i]); } } - clusterAddNodeToShard(value, n); return C_OK; } @@ -328,8 +331,9 @@ static sds representSlotInfo(sds ci, uint16_t *slot_info_pairs, int slot_info_pa } /* Append the address+aux string for a node to an sds: ip:port@cport[,hostname][,aux=val]* - * This is the same format used in the second column of nodes.conf. */ -sds clusterNodeAppendAddressString(sds s, clusterNode *node, int tls_primary) { + * This is the same format used in the second column of nodes.conf. + * skip_aux is a bitmask of auxFieldIndex values to omit. */ +static sds clusterNodeAppendAddressStringSkip(sds s, clusterNode *node, int tls_primary, unsigned int skip_aux) { int port = tls_primary ? node->tls_port : node->tcp_port; s = sdscatfmt(s, "%s:%i@%i", node->ip, port, node->cport); if (sdslen(node->hostname) != 0) { @@ -338,6 +342,7 @@ sds clusterNodeAppendAddressString(sds s, clusterNode *node, int tls_primary) { s = sdscatlen(s, ",", 1); } for (int i = af_count - 1; i >= 0; i--) { + if (skip_aux & (1u << i)) continue; if ((tls_primary && i == af_tls_port) || (!tls_primary && i == af_tcp_port)) continue; if (auxFieldHandlers[i].isPresent(node)) { s = sdscatfmt(s, ",%s=", auxFieldHandlers[i].field); @@ -347,6 +352,16 @@ sds clusterNodeAppendAddressString(sds s, clusterNode *node, int tls_primary) { return s; } +sds clusterNodeAppendAddressString(sds s, clusterNode *node, int tls_primary) { + return clusterNodeAppendAddressStringSkip(s, node, tls_primary, 0); +} + +/* Like clusterNodeAppendAddressString but omits shard-id, which is managed + * by dedicated raft log entries (NODE_JOIN, SET_REPLICA_OF). */ +sds clusterNodeAppendAddressStringNoShardId(sds s, clusterNode *node, int tls_primary) { + return clusterNodeAppendAddressStringSkip(s, node, tls_primary, 1u << af_shard_id); +} + /* Parse an address+aux string onto a node. The string format is: * ip:port@cport[,hostname][,aux=val]* * Returns C_OK on success, C_ERR on parse error. The input string is modified. */ @@ -670,6 +685,14 @@ int clusterLoadConfig(char *filename) { continue; } + /* Handle "log" lines (protocol-specific WAL entries). */ + if (strcasecmp(argv[0], "log") == 0) { + if (clusterCurrentBus->parseLogLine) + clusterCurrentBus->parseLogLine(argv, argc); + sdsfreesplitres(argv, argc); + continue; + } + /* Regular config lines have at least eight fields */ if (argc < 8) { sdsfreesplitres(argv, argc); @@ -894,6 +917,8 @@ int clusterSaveConfig(int do_fsync) { ci = clusterGenNodesDescription(NULL, CLUSTER_NODE_HANDSHAKE, 0); if (clusterCurrentBus->appendVarsLine) ci = clusterCurrentBus->appendVarsLine(ci); + if (clusterCurrentBus->appendLogLines) + ci = clusterCurrentBus->appendLogLines(ci); content_size = sdslen(ci); /* Create a temp file with the new content. */ diff --git a/src/cluster_nodes.h b/src/cluster_nodes.h index cfe7d0ce436..74808ad2e14 100644 --- a/src/cluster_nodes.h +++ b/src/cluster_nodes.h @@ -5,6 +5,7 @@ /* Node address string: ip:port@cport[,hostname][,aux=val]* */ sds clusterNodeAppendAddressString(sds s, clusterNode *node, int tls_primary); +sds clusterNodeAppendAddressStringNoShardId(sds s, clusterNode *node, int tls_primary); int clusterNodeParseAddressString(clusterNode *n, char *str); /* Node description / serialization. */ diff --git a/src/cluster_raft.c b/src/cluster_raft.c index e1ad584acd0..3f03e2440c7 100644 --- a/src/cluster_raft.c +++ b/src/cluster_raft.c @@ -133,6 +133,13 @@ typedef struct { unsigned int todo_retry_proposals : 1; unsigned int todo_schedule_failover : 1; unsigned int todo_update_replication : 1; + unsigned int todo_persist_log : 1; + unsigned int todo_save_config : 1; + unsigned int todo_send_ae_ack : 1; + unsigned int todo_send_vote_response : 1; + unsigned int todo_broadcast_vote_request : 1; + uint64_t persist_log_from; /* First index to persist in next batch */ + uint64_t last_rewrite_applied; /* last_applied at last full rewrite */ /* NODE_INFO divergence detection. */ sds my_last_committed_info; @@ -190,6 +197,7 @@ typedef struct { #define RAFT_HDR_SIZE 8 #define REPL_OFFSETS_BROADCAST_PERIOD_MS 10000 +#define RAFT_LOG_REWRITE_THRESHOLD 100 /* Monotonic millisecond clock for timeouts and failure detection. * Unlike gettimeofday(), this is not affected by system clock adjustments. */ @@ -298,6 +306,7 @@ static clusterMsgSendBlock *clusterRaftBuildAllOffsetsMsg(void) { static void clusterRaftPropose(sds entry, void *ctx, void (*callback)(void *ctx, const char *error)); static void clusterRaftDeferPendingProposals(void); static void clusterRaftUpdateMyself(int old_flags); +static sds clusterRaftBuildMyNodeInfo(void); static void clusterRaftCheckSlotCoverage(void); static void clusterRaftBroadcastAppendEntries(void); static void clusterRaftSendAppendEntries(clusterLink *link, clusterNode *node); @@ -312,6 +321,7 @@ static raftLogEntry *raftLogGet(uint64_t index); static uint64_t raftLogLastIndex(void); static uint64_t raftLogTermAt(uint64_t index); static void raftLogTruncateFrom(uint64_t index); +static void clusterRaftPersistNewLogEntries(uint64_t from); static void clusterRaftRandomizeElectionTimeout(void) { mstime_t base = server.cluster_node_timeout; @@ -732,18 +742,9 @@ static void clusterRaftPropose(sds entry, void *ctx, void (*callback)(void *ctx, serverLog(LL_NOTICE, "Leader appended %s (index %llu).", raftEntryTypeName(type), (unsigned long long)idx); - /* Single-node cluster: commit and apply immediately. */ - if (server.cluster->size <= 1) { - rs->commit_index = idx; - while (rs->last_applied < rs->commit_index) { - rs->last_applied++; - raftLogEntry *e = raftLogGet(rs->last_applied); - if (e) raftLogApply(e); - } - /* If we just grew beyond singleton, replicate to the new peer. */ - if (server.cluster->size > 1) rs->todo_broadcast_ae = 1; - } else { - /* Replicate to followers immediately. */ + /* Multi-node: replicate to followers. Single-node: commit is + * deferred to beforeSleep after persistence. */ + if (server.cluster->size > 1) { rs->todo_broadcast_ae = 1; } } else { @@ -788,16 +789,9 @@ static int clusterRaftProcessPropose(clusterLink *link, int argc, sds *argv) { serverLog(LL_NOTICE, "Leader appended proposed %s (index %llu).", raftEntryTypeName(type), (unsigned long long)idx); - /* Single-node cluster: commit and apply immediately. */ - if (server.cluster->size <= 1) { - rs->commit_index = idx; - while (rs->last_applied < rs->commit_index) { - rs->last_applied++; - raftLogEntry *e = raftLogGet(rs->last_applied); - if (e) raftLogApply(e); - } - } else { - /* Replicate to followers immediately. */ + /* Multi-node: replicate to followers. Single-node: commit is + * deferred to beforeSleep after persistence. */ + if (server.cluster->size > 1) { rs->todo_broadcast_ae = 1; } @@ -833,6 +827,7 @@ static int clusterRaftMaybeStepDown(clusterRaftState *rs, uint64_t term) { memset(rs->leader, 0, CLUSTER_NAMELEN); clusterRaftRandomizeElectionTimeout(); rs->last_heartbeat = monotonicMs(); + rs->todo_save_config = 1; return 1; } return 0; @@ -868,6 +863,11 @@ static void raftLogAppend(raftLogEntry *e) { rs->log = zrealloc(rs->log, rs->log_alloc * sizeof(raftLogEntry *)); } rs->log[rs->log_count++] = e; + /* Schedule persistence in next beforeSleep. */ + if (!rs->todo_persist_log) { + rs->todo_persist_log = 1; + rs->persist_log_from = e->index; + } } /* O(1) lookup by index. Returns NULL if out of range. Indices start at 1. */ @@ -887,6 +887,9 @@ static void raftLogTruncateFrom(uint64_t index) { while (rs->log_count > 0 && rs->log[rs->log_count - 1]->index >= index) { raftLogFree(rs->log[--rs->log_count]); } + /* Truncated entries may already be on disk; a full rewrite is needed + * to remove them. */ + rs->todo_save_config = 1; } static uint64_t raftLogLastIndex(void) { @@ -970,8 +973,12 @@ static void raftLogApply(raftLogEntry *e) { /* If this entry is about us, promote from joiner to follower. */ if (memcmp(argv[0], myself->name, CLUSTER_NAMELEN) == 0) { + sdsfree(rs->my_last_committed_info); + rs->my_last_committed_info = clusterRaftBuildMyNodeInfo(); + if (rs->role == RAFT_ROLE_JOINER) { rs->role = RAFT_ROLE_FOLLOWER; + rs->todo_save_config = 1; serverLog(LL_NOTICE, "Promoted from joiner to follower."); /* Propose SLOT_CHANGE for slots assigned before joining @@ -1080,7 +1087,10 @@ static void raftLogApply(raftLogEntry *e) { break; } case RAFT_ENTRY_NODE_INFO: { - /* Format: " " */ + /* Format: " " + * Propagates address/hostname/port changes. Shard-id is intentionally + * excluded from the address string because it is authoritatively + * managed by NODE_JOIN and SET_REPLICA_OF entries. */ int argc; sds *argv = sdssplitlen(e->data, sdslen(e->data), " ", 1, &argc); if (argv && argc >= 2 && sdslen(argv[0]) == CLUSTER_NAMELEN) { @@ -1282,7 +1292,9 @@ static int clusterRaftProcessAppendEntries(clusterLink *link, int argc, sds *arg if (e) raftLogApply(e); } - clusterRaftSendAppendEntriesResponse(link, rs->current_term, 1); + /* Defer AE_ACK until after persistence in beforeSleep. This ensures + * entries are on stable storage before the leader counts our ACK. */ + rs->todo_send_ae_ack = 1; return 1; } @@ -1413,14 +1425,29 @@ static int clusterRaftProcessRequestVote(clusterLink *link, int argc, sds *argv) if (msg_term < rs->current_term) { /* Stale term. */ } else if (clusterRaftIsVotedForNone() || memcmp(rs->voted_for, argv[1], CLUSTER_NAMELEN) == 0) { - /* TODO: log completeness check (compare last log index/term) */ - granted = 1; - memcpy(rs->voted_for, argv[1], CLUSTER_NAMELEN); - rs->last_heartbeat = monotonicMs(); /* Reset election timer */ - serverLog(LL_NOTICE, "Voted for %.40s in term %llu.", argv[1], (unsigned long long)msg_term); + /* Log completeness check: only grant vote if candidate's log is + * at least as up-to-date as ours (Raft §5.4.1). */ + uint64_t candidate_last_index = strtoull(argv[3], NULL, 10); + uint64_t candidate_last_term = strtoull(argv[4], NULL, 10); + uint64_t my_last_term = raftLogLastTerm(); + uint64_t my_last_index = raftLogLastIndex(); + if (candidate_last_term > my_last_term || + (candidate_last_term == my_last_term && candidate_last_index >= my_last_index)) { + granted = 1; + memcpy(rs->voted_for, argv[1], CLUSTER_NAMELEN); + rs->last_heartbeat = monotonicMs(); /* Reset election timer */ + rs->todo_save_config = 1; + serverLog(LL_NOTICE, "Voted for %.40s in term %llu.", argv[1], (unsigned long long)msg_term); + } } - clusterRaftSendVoteResponse(link, rs->current_term, granted); + if (granted) { + /* Defer response until votedFor is persisted. todo_save_config triggers + * a full rewrite in beforeSleep; the response is sent after that. */ + rs->todo_send_vote_response = 1; + } else { + clusterRaftSendVoteResponse(link, rs->current_term, 0); + } return 1; } @@ -1483,19 +1510,13 @@ static void clusterRaftStartElection(void) { rs->votes_received = 1; /* Vote for self */ clusterRaftRandomizeElectionTimeout(); rs->last_heartbeat = monotonicMs(); + rs->todo_save_config = 1; + /* Defer sending RequestVote until after the term bump and self-vote + * are persisted. Otherwise a crash could allow double-voting. */ + rs->todo_broadcast_vote_request = 1; serverLog(LL_NOTICE, "Starting Raft election (term %llu).", (unsigned long long)rs->current_term); - dictIterator *di = dictGetSafeIterator(server.cluster->nodes); - dictEntry *de; - while ((de = dictNext(di)) != NULL) { - clusterNode *node = dictGetVal(de); - if (node == myself || !node->link) continue; - if (node->flags & CLUSTER_NODE_MEET) continue; - clusterRaftSendRequestVote(node->link); - } - dictReleaseIterator(di); - /* Single-node: already have quorum. */ int quorum = server.cluster->size / 2 + 1; if (rs->votes_received >= quorum) { @@ -1522,19 +1543,23 @@ static void clusterRaftInit(void) { rs->my_last_committed_info = sdsempty(); rs->last_node_info_check = monotonicMs(); rs->last_repl_offsets_broadcast = monotonicMs(); + rs->todo_update_slot_coverage = 1; server.cluster->size = 0; /* Incremented by NODE_JOIN apply */ } static void clusterRaftInitLast(void) { clusterListenerInit(); - /* Mark myself as not yet in the raft log. Cleared when our own - * NODE_JOIN is applied. Prevents NODE_INFO with empty IP. */ - myself->flags |= CLUSTER_NODE_MEET; + /* On fresh start (size == 0), mark myself as not yet in the raft log. + * Cleared when our own NODE_JOIN is applied. On restart, size is + * restored by postLoad so we skip this. */ + if (server.cluster->size == 0) { + myself->flags |= CLUSTER_NODE_MEET; + } - /* Single-node cluster: become leader immediately. */ + /* Fresh single-node cluster: become leader immediately. */ clusterRaftState *rs = RAFT_STATE(); - if (dictSize(server.cluster->nodes) == 1) { + if (dictSize(server.cluster->nodes) == 1 && server.cluster->size == 0) { rs->role = RAFT_ROLE_LEADER; rs->current_term = 1; memcpy(rs->leader, myself->name, CLUSTER_NAMELEN); @@ -1743,11 +1768,7 @@ static void clusterRaftCron(void) { * out). Re-propose if needed. Check every 10 seconds. */ if (now - rs->last_node_info_check > 10000) { rs->last_node_info_check = now; - sds current = sdscatlen(sdsempty(), myself->name, CLUSTER_NAMELEN); - current = sdscatlen(current, " ", 1); - current = clusterNodeAppendAddressString(current, myself, server.tls_cluster); - current = sdscatfmt(current, " %s", - (myself->flags & CLUSTER_NODE_NOFAILOVER) ? "nofailover" : "noflags"); + sds current = clusterRaftBuildMyNodeInfo(); if (sdscmp(current, rs->my_last_committed_info) != 0) { serverLog(LL_NOTICE, "NODE_INFO diverged from last commit. Re-proposing."); serverLog(LL_NOTICE, "Old committed and new proposed node-info: %s -> %s", @@ -1852,7 +1873,8 @@ static void clusterRaftCheckSlotCoverage(void) { server.cluster->state = all_slots_covered ? CLUSTER_OK : CLUSTER_FAIL; } -static void clusterRaftBeforeSleep(void) { +static void clusterRaftBeforeSleep(bool blocked) { + UNUSED(blocked); clusterRaftState *rs = RAFT_STATE(); if (rs->todo_connect_nodes) { @@ -1860,6 +1882,64 @@ static void clusterRaftBeforeSleep(void) { clusterConnectNodes(); } + if (rs->todo_save_config || + (rs->todo_persist_log && + rs->last_applied - rs->last_rewrite_applied >= RAFT_LOG_REWRITE_THRESHOLD)) { + rs->todo_save_config = 0; + rs->todo_persist_log = 0; + clusterSaveConfigOrDie(1); + rs->last_rewrite_applied = rs->last_applied; + } else if (rs->todo_persist_log) { + rs->todo_persist_log = 0; + clusterRaftPersistNewLogEntries(rs->persist_log_from); + } + + /* Singleton leader: commit after persistence (quorum = 1). */ + if (rs->role == RAFT_ROLE_LEADER && server.cluster->size <= 1 && + raftLogLastIndex() > rs->commit_index) { + rs->commit_index = raftLogLastIndex(); + while (rs->last_applied < rs->commit_index) { + rs->last_applied++; + raftLogEntry *e = raftLogGet(rs->last_applied); + if (e) raftLogApply(e); + } + /* If we just grew beyond singleton, replicate to the new peer. */ + if (server.cluster->size > 1) rs->todo_broadcast_ae = 1; + } + + if (rs->todo_send_ae_ack) { + rs->todo_send_ae_ack = 0; + clusterNode *leader = clusterLookupNode(rs->leader, CLUSTER_NAMELEN); + /* AE arrives on the inbound link (leader connects to us). */ + clusterLink *link = leader ? (leader->inbound_link ? leader->inbound_link : leader->link) : NULL; + if (link) { + clusterRaftSendAppendEntriesResponse(link, rs->current_term, 1); + } + } + + if (rs->todo_send_vote_response) { + rs->todo_send_vote_response = 0; + /* Send granted vote response to the candidate we voted for. */ + clusterNode *candidate = clusterLookupNode(rs->voted_for, CLUSTER_NAMELEN); + clusterLink *link = candidate ? (candidate->inbound_link ? candidate->inbound_link : candidate->link) : NULL; + if (link) { + clusterRaftSendVoteResponse(link, rs->current_term, 1); + } + } + + if (rs->todo_broadcast_vote_request) { + rs->todo_broadcast_vote_request = 0; + dictIterator *di = dictGetSafeIterator(server.cluster->nodes); + dictEntry *de; + while ((de = dictNext(di)) != NULL) { + clusterNode *node = dictGetVal(de); + if (node == myself || !node->link) continue; + if (node->flags & CLUSTER_NODE_MEET) continue; + clusterRaftSendRequestVote(node->link); + } + dictReleaseIterator(di); + } + if (rs->todo_broadcast_ae) { rs->todo_broadcast_ae = 0; if (rs->role == RAFT_ROLE_LEADER) clusterRaftBroadcastAppendEntries(); @@ -1957,6 +2037,9 @@ static void clusterRaftPrepareShutdown(void) { } static void clusterRaftHandleServerShutdown(void) { + /* Compact the log on disk so restart is fast. */ + clusterSaveConfigOrDie(1); + clusterRaftState *rs = RAFT_STATE(); /* Free pending proposals. */ listIter li; @@ -2161,6 +2244,17 @@ static void clusterRaftPostConnect(struct clusterLink *link) { * Config updates — broadcast metadata changes through Raft log * -------------------------------------------------------------------------- */ +/* Build the NODE_INFO data string for myself: "
". + * Caller must sdsfree() the result. */ +static sds clusterRaftBuildMyNodeInfo(void) { + sds s = sdscatlen(sdsempty(), myself->name, CLUSTER_NAMELEN); + s = sdscatlen(s, " ", 1); + s = clusterNodeAppendAddressStringNoShardId(s, myself, server.tls_cluster); + s = sdscatlen(s, " ", 1); + s = sdscat(s, (myself->flags & CLUSTER_NODE_NOFAILOVER) ? "nofailover" : "noflags"); + return s; +} + static void clusterRaftUpdateMyself(int old_flags) { UNUSED(old_flags); /* Clear cached CLUSTER SLOTS immediately — our address/hostname @@ -2170,16 +2264,10 @@ static void clusterRaftUpdateMyself(int old_flags) { * or if our IP is not yet known. */ if ((myself->flags & CLUSTER_NODE_MEET) || myself->ip[0] == '\0') return; /* Propose NODE_INFO to propagate the change to other nodes. */ + sds data = clusterRaftBuildMyNodeInfo(); sds entry = sdsnew("NODE_INFO "); - entry = sdscatlen(entry, myself->name, CLUSTER_NAMELEN); - entry = sdscatlen(entry, " ", 1); - entry = clusterNodeAppendAddressString(entry, myself, server.tls_cluster); - entry = sdscatlen(entry, " ", 1); - if (myself->flags & CLUSTER_NODE_NOFAILOVER) { - entry = sdscat(entry, "nofailover"); - } else { - entry = sdscat(entry, "noflags"); - } + entry = sdscatsds(entry, data); + sdsfree(data); clusterRaftPropose(entry, NULL, NULL); sdsfree(entry); } @@ -2202,10 +2290,15 @@ static void clusterRaftFreeNodeData(clusterNode *node) { static sds clusterRaftAppendVarsLine(sds config) { clusterRaftState *rs = RAFT_STATE(); - config = sdscatprintf(config, "vars currentTerm %llu", - (unsigned long long)rs->current_term); + config = sdscatprintf(config, "vars currentTerm %llu lastApplied %llu", + (unsigned long long)rs->current_term, + (unsigned long long)rs->last_applied); + if (rs->voted_for[0]) { + config = sdscat(config, " votedFor "); + config = sdscatlen(config, rs->voted_for, CLUSTER_NAMELEN); + } if (rs->leader[0]) { - config = sdscatlen(config, " raftLeader ", 12); + config = sdscat(config, " raftLeader "); config = sdscatlen(config, rs->leader, CLUSTER_NAMELEN); } config = sdscatlen(config, "\n", 1); @@ -2217,6 +2310,13 @@ static int clusterRaftParseVarsLine(const char *name, const char *value) { if (!strcasecmp(name, "currentTerm")) { rs->current_term = strtoull(value, NULL, 10); return 1; + } else if (!strcasecmp(name, "lastApplied")) { + rs->last_applied = strtoull(value, NULL, 10); + rs->commit_index = rs->last_applied; + return 1; + } else if (!strcasecmp(name, "votedFor")) { + memcpy(rs->voted_for, value, CLUSTER_NAMELEN); + return 1; } else if (!strcasecmp(name, "raftLeader")) { memcpy(rs->leader, value, CLUSTER_NAMELEN); return 1; @@ -2224,8 +2324,96 @@ static int clusterRaftParseVarsLine(const char *name, const char *value) { return 0; } +/* Append uncommitted log entries (index > last_applied) as "log" lines + * at the end of nodes.conf during a full rewrite. */ +static sds clusterRaftAppendLogLines(sds config) { + clusterRaftState *rs = RAFT_STATE(); + for (uint64_t i = 0; i < rs->log_count; i++) { + raftLogEntry *e = rs->log[i]; + if (e->index <= rs->last_applied) continue; + config = sdscatprintf(config, "log %llu %llu %s %s\n", + (unsigned long long)e->index, + (unsigned long long)e->term, + raftEntryTypeName(e->type), + e->data); + } + return config; +} + +/* Append all log entries from index 'from' onwards to nodes.conf. Called + * from beforeSleep to batch multiple entries into a single write+fsync. */ +static void clusterRaftPersistNewLogEntries(uint64_t from) { + clusterRaftState *rs = RAFT_STATE(); + sds buf = sdsempty(); + for (uint64_t i = 0; i < rs->log_count; i++) { + raftLogEntry *e = rs->log[i]; + if (e->index < from) continue; + buf = sdscatprintf(buf, "log %llu %llu %s %s\n", + (unsigned long long)e->index, + (unsigned long long)e->term, + raftEntryTypeName(e->type), + e->data); + } + if (sdslen(buf) == 0) { + sdsfree(buf); + return; + } + int fd = open(server.cluster_configfile, O_WRONLY | O_APPEND); + if (fd == -1) { + serverLog(LL_WARNING, "Could not open cluster config for log append: %s", strerror(errno)); + sdsfree(buf); + return; + } + if (write(fd, buf, sdslen(buf)) == -1) { + serverLog(LL_WARNING, "Could not append log entries to cluster config: %s", strerror(errno)); + } + valkey_fsync(fd); + close(fd); + sdsfree(buf); +} + + +static void clusterRaftParseLogLine(sds *argv, int argc) { + /* Format: log */ + if (argc < 5) return; + uint64_t index = strtoull(argv[1], NULL, 10); + uint64_t term = strtoull(argv[2], NULL, 10); + int type = raftEntryTypeByName(argv[3]); + if (type < 0) return; + + /* Reconstruct data from remaining args (space-separated). */ + sds data = sdsdup(argv[4]); + for (int i = 5; i < argc; i++) { + data = sdscatlen(data, " ", 1); + data = sdscatsds(data, argv[i]); + } + + raftLogEntry *e = raftLogCreate(term, index, type, data); + raftLogAppend(e); +} + static void clusterRaftPostLoad(void) { - /* TODO: rebuild peer state from loaded nodes */ + clusterRaftState *rs = RAFT_STATE(); + /* commit_index was set to last_applied in parseVarsLine. On startup, + * the leader will update it via AE. For now, ensure it's consistent. */ + if (rs->commit_index < rs->last_applied) + rs->commit_index = rs->last_applied; + /* Log entries loaded from disk are already persisted; don't re-write. */ + rs->todo_persist_log = 0; + /* Restore cluster size from loaded nodes (NODE_JOIN already applied). */ + dictIterator *di = dictGetSafeIterator(server.cluster->nodes); + dictEntry *de; + while ((de = dictNext(di)) != NULL) { + clusterNode *node = dictGetVal(de); + if (!(node->flags & CLUSTER_NODE_MEET)) server.cluster->size++; + } + dictReleaseIterator(di); + /* If we're a replica, start replication. */ + if (server.cluster->myself && + nodeIsReplica(server.cluster->myself) && + server.cluster->myself->replicaof) { + rs->todo_update_replication = 1; + } } /* -------------------------------------------------------------------------- @@ -2473,6 +2661,8 @@ static void clusterRaftApplySlotChange(sds data) { start = end = atoi(argv[i]); } for (int j = start; j <= end; j++) { + if (target == myself || server.cluster->slots[j] == myself) + RAFT_STATE()->todo_save_config = 1; if (target) { /* If this slot is moving away from myself, delete keys. */ if (server.cluster->slots[j] == myself && target != myself) { @@ -2505,6 +2695,8 @@ static void clusterRaftApplySetReplica(sds data) { clusterNode *replica = clusterLookupNode(argv[0], sdslen(argv[0])); if (!replica) goto done; + if (replica == myself) rs->todo_save_config = 1; + char *shard_id = argv[2]; if (sdslen(argv[1]) == 1 && argv[1][0] == '-') { @@ -2602,10 +2794,12 @@ static void clusterRaftApplyFailover(sds data) { /* If I'm the replica being promoted, start acting as primary. */ if (replica == myself) { rs->todo_update_replication = 1; + rs->todo_save_config = 1; } /* If I'm the old primary being demoted, start replicating. */ if (primary == myself) { rs->todo_update_replication = 1; + rs->todo_save_config = 1; } done: if (argv) sdsfreesplitres(argv, argc); @@ -2865,6 +3059,8 @@ clusterBusType clusterRaftBus = { .setNodeFailed = NULL, .appendVarsLine = clusterRaftAppendVarsLine, .parseVarsLine = clusterRaftParseVarsLine, + .parseLogLine = clusterRaftParseLogLine, + .appendLogLines = clusterRaftAppendLogLines, .postLoad = clusterRaftPostLoad, .initNodeData = clusterRaftInitNodeData, .freeNodeData = clusterRaftFreeNodeData, diff --git a/src/server.c b/src/server.c index 794bbf5bcd1..eca25e7db71 100644 --- a/src/server.c +++ b/src/server.c @@ -1871,6 +1871,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { } while (last_processed != 0); processed += freeClientsInAsyncFreeQueue(); server.events_processed_while_blocked += processed; + if (server.cluster_enabled) clusterBeforeSleep(ProcessingEventsWhileBlocked); return; } @@ -1889,7 +1890,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { * may change the state of Cluster (from ok to fail or vice versa), * so it's a good idea to call it before serving the unblocked clients * later in this function, must be done before blockedBeforeSleep. */ - if (server.cluster_enabled) clusterBeforeSleep(); + if (server.cluster_enabled) clusterBeforeSleep(ProcessingEventsWhileBlocked); /* Handle blocked clients. * must be done before flushAppendOnlyFile, in case of appendfsync=always, diff --git a/tests/unit/cluster/availability-zone.tcl b/tests/unit/cluster/availability-zone.tcl index 7b960118d85..68fe3c333bf 100644 --- a/tests/unit/cluster/availability-zone.tcl +++ b/tests/unit/cluster/availability-zone.tcl @@ -148,5 +148,5 @@ start_cluster 2 0 {tags {external:skip cluster} overrides {cluster-ping-interval } else { fail "Availability zone was not restored after restart in CLUSTER SHARDS" } - } {} {cluster-raft:skip} ;# Raft persistence not yet implemented + } } diff --git a/tests/unit/cluster/cluster-shards.tcl b/tests/unit/cluster/cluster-shards.tcl index 2525f9a85d3..a2ddf7bcfa3 100644 --- a/tests/unit/cluster/cluster-shards.tcl +++ b/tests/unit/cluster/cluster-shards.tcl @@ -84,7 +84,7 @@ proc cluster_ensure_master {id} { } # start_cluster 4 masters + 5 nodes (4 replicas + 1 standalone R8) -start_cluster 4 5 {tags {external:skip cluster cluster-raft:skip}} { ;# Restarts servers (needs raft persistence) +start_cluster 4 5 {tags {external:skip cluster}} { # cluster_master_nodes and cluster_replica_nodes refer to the active cluster members. set ::cluster_master_nodes 4 @@ -311,7 +311,10 @@ test "CLUSTER MYSHARDID reports same shard id after cluster restart" { for {set i 0} {$i < 8} {incr i} { assert_equal [dict get $node_ids $i] [R $i cluster myshardid] } -} +} {} {cluster-raft:skip} ;# Skipped under raft: R8 stays running while R0-R7 + # restart, inflating its term with failed elections. + # This disrupts leader election when others come back. + # Needs pre-vote (Raft §9.6) to fix. test "CLUSTER SHARDS id response validation" { # For each node in the cluster diff --git a/tests/unit/cluster/failover.tcl b/tests/unit/cluster/failover.tcl index 4a1cdbdede0..c1347b00231 100644 --- a/tests/unit/cluster/failover.tcl +++ b/tests/unit/cluster/failover.tcl @@ -165,7 +165,7 @@ start_cluster 3 1 {tags {external:skip cluster}} { # In this test a different node is killed in a loop for N # iterations. The test checks that certain properties # are preserved across iterations. -start_cluster 5 5 {tags {external:skip cluster cluster-raft:skip}} { ;# Restarts servers (needs raft persistence) +start_cluster 5 5 {tags {external:skip cluster}} { set iterations 10 set cluster [valkey_cluster 127.0.0.1:[srv 0 port]] @@ -217,11 +217,11 @@ start_cluster 5 5 {tags {external:skip cluster cluster-raft:skip}} { ;# Restarts } if {$role eq {master}} { - test "Wait failover by #$slave with old epoch $slave_config_epoch" { + test "Wait failover by #$slave" { wait_for_condition 1000 50 { - [CI $slave cluster_my_epoch] > $slave_config_epoch + [s -$slave role] eq {master} } else { - fail "No failover detected, epoch is still [CI $slave cluster_my_epoch]" + fail "No failover detected, slave #$slave is still a slave" } } } @@ -263,5 +263,5 @@ start_cluster 5 5 {tags {external:skip cluster cluster-raft:skip}} { ;# Restarts for {set id 0} {$id < [llength $::servers]} {incr id} { assert {[CI $id cluster_current_epoch] >= [CI $id cluster_my_epoch]} } - } + } {} {cluster-raft:skip} ;# Epochs are a gossip concept } ;# start_cluster diff --git a/tests/unit/cluster/hostnames.tcl b/tests/unit/cluster/hostnames.tcl index 6ece38e8d82..dd13a607dfb 100644 --- a/tests/unit/cluster/hostnames.tcl +++ b/tests/unit/cluster/hostnames.tcl @@ -214,7 +214,7 @@ test "Test restart will keep hostname information" { # As a sanity check, make sure everyone eventually agrees wait_for_cluster_propagation -} {} {cluster-raft:skip} ;# Raft persistence not yet implemented +} test "Test hostname validation" { catch {R 0 config set cluster-announce-hostname [string repeat x 256]} err diff --git a/tests/unit/cluster/resharding.tcl b/tests/unit/cluster/resharding.tcl index a3b7561bb4c..574a066eb72 100644 --- a/tests/unit/cluster/resharding.tcl +++ b/tests/unit/cluster/resharding.tcl @@ -3,7 +3,7 @@ # that certain properties are preserved across the operation. tags {"slow valgrind:skip"} { run_solo {cluster-resharding} { -start_cluster 5 5 {tags {external:skip cluster cluster-raft:skip}} { ;# Restarts servers (needs raft persistence) +start_cluster 5 5 {tags {external:skip cluster}} { test "Enable AOF in all the instances" { for {set id 0} {$id < [llength $::servers]} {incr id} { R $id config set appendonly yes