diff --git a/design-docs/cluster-raft.md b/design-docs/cluster-raft.md index 3794f12cdf1..2da2bda9fe7 100644 --- a/design-docs/cluster-raft.md +++ b/design-docs/cluster-raft.md @@ -138,20 +138,21 @@ NODE_JOIN
via MEET. The node starts as a learner and is promoted to follower when the entry is committed. -NODE_FORGET - Remove a node from the cluster (CLUSTER FORGET). Not yet implemented. +NODE_FORGET + Remove a node from the cluster (CLUSTER FORGET). -SLOT_CHANGE [ ...] +SLOT_CHANGE [ ...] Assign or remove slot ownership. A dash means "no owner" (delete slots). Ranges use the nodes.conf format: "0-5460" or "5461". -SET_REPLICA_OF +SET_REPLICA_OF Set a node as replica of a primary (CLUSTER REPLICATE). A dash as - primary means promote to primary. The shard-id is the target shard: + primary means promote to primary. Both source and target shard epochs + are validated to guard against concurrent shard changes. The target + shard-id is chosen as follows: for promotion, a new random id; for assignment, the primary's current shard-id (used as a guard against concurrent changes). -FAILOVER +FAILOVER The replica takes over the primary's slots and becomes primary. The old primary becomes a replica of the new primary. @@ -171,7 +172,7 @@ NODE_RECOVER ``` Ranges in SLOT_CHANGE use the same format as nodes.conf: `0-5460` or -`5461`. A dash as node-id means "no owner" (delete slots) or "no +`5461`. A dash as source/target-id means "no owner" (delete slots) or "no primary" (promote to primary). ### Why typed entries instead of a key-value store? @@ -212,9 +213,9 @@ changes are infrequent. ## PROPOSE and Leader Validation Followers forward proposals to the leader using the PROPOSE message, -sent on the outbound link to the leader. The leader always accepts -proposals without validation — it appends them to the log and -replicates them. Validation happens at apply time, where the apply +sent on the outbound link to the leader. The leader runs a best-effort +pre-validation on each proposal; proposals that pass are appended to the log and +replicated. Authoritative validation happens at apply time, where the apply function can detect conflicts and treat them as no-ops.
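The two validation stages described above can be sketched as follows. This is a minimal illustration, not the code in this patch: the fixed-size `shard_epochs` array and the names `epoch_is_current`, `leader_prevalidate`, and `apply_entry` are hypothetical, and it assumes the check is an epoch comparison of the kind the Shard Epoch section describes.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical: a small fixed table stands in for the per-shard epoch dict. */
#define NUM_SHARDS 4

/* Epoch 0 means the shard has no recorded epoch yet (a new shard). */
static uint64_t shard_epochs[NUM_SHARDS];

/* An entry is current if the shard has no epoch yet or the epochs match. */
int epoch_is_current(int shard, uint64_t entry_epoch) {
    assert(shard >= 0 && shard < NUM_SHARDS);
    uint64_t current = shard_epochs[shard];
    return current == 0 || entry_epoch == current;
}

/* Leader pre-validation: a read-only, best-effort check before appending.
 * Returns 1 if the proposal may be appended, 0 if it is already stale. */
int leader_prevalidate(int shard, uint64_t entry_epoch) {
    return epoch_is_current(shard, entry_epoch);
}

/* Apply-time validation: the authoritative check. On success the epoch is
 * bumped and 1 is returned; a stale entry is a no-op and returns 0. */
int apply_entry(int shard, uint64_t entry_epoch) {
    if (!epoch_is_current(shard, entry_epoch)) return 0;
    shard_epochs[shard]++;
    return 1;
}
```

The leader-side check only reads the counter, so a proposal that passes it can still become a no-op at apply time if an earlier entry in the log bumps the epoch first.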
This design simplifies the leader: it doesn't need to understand the @@ -723,21 +724,29 @@ reuse), but the vars and log lines are raft-specific. The file is not compatible between protocols — switching from gossip to raft (or vice versa) requires removing nodes.conf. -## Shard Epoch (not yet implemented) +## Shard Epoch -A shard-epoch is a per-shard monotonically increasing counter, bumped -on topology changes within the shard (FAILOVER, SET_REPLICA_OF, -SLOT_CHANGE). Entries that modify shard topology include the current -shard-epoch at proposal time. On apply, if the shard-epoch has -advanced, the entry is stale and becomes a no-op. +Raft ensures entries are applied in a total order, but ordering alone +is not sufficient to prevent stale mutations from corrupting cluster +state. When concurrent operations target the same shard (e.g., a slot +migration racing with a failover), a committed entry may carry +assumptions about shard topology that are no longer true by the time +it is applied. Without additional application-level state to fence +against these stale updates, the apply logic can produce +inconsistencies, such as moving a slot to a node that no longer owns +the corresponding keys. -This prevents stale entries from causing inconsistencies when -concurrent operations race in the log. Example: +A shard-epoch is a per-shard monotonically increasing counter stored +in the raft state's `shard_epochs` dict (`RAFT_STATE()->shard_epochs`). +It is bumped each time membership or leadership of the shard changes. +Entries that make such changes carry the shard's current epoch at +proposal time. The epoch is validated on the leader before the entry is +appended to the log and again at apply time. If the epoch has advanced +past the value in the entry, the entry is stale and is ignored. -``` -Slot migration racing with failover: +### Example: slot migration racing with failover -1. Atomic slot migration starts: keys transferred from shard A to B. +``` +1. Slot migration starts: keys transferred from shard A to shard B. 2. Primary of shard A fails.
FAILOVER entry is proposed. 3. Migration is rolled back (keys stay on shard A's new primary). 4. SLOT_CHANGE entry (assigning slot to shard B) was proposed before @@ -748,14 +757,59 @@ Slot migration racing with failover: carries the old epoch, so it's a no-op. Slot stays on shard A. ``` -Entries that should carry a shard-epoch: -- FAILOVER (bumps epoch of the shard) -- SET_REPLICA_OF (bumps epoch when changing shard membership) -- SLOT_CHANGE (checked against source and target shard epochs) +### Entry formats with epoch + +``` +FAILOVER +SET_REPLICA_OF +SLOT_CHANGE +NODE_FORGET +``` + +SLOT_CHANGE carries two epochs because it involves two shards (source +and target). NODE_FORGET carries the epoch of the departing node's +shard to guard against removing a node whose role changed (e.g., +promoted to primary via a concurrent FAILOVER). + +### Validation + +Epoch validation happens at two points: + +1. **Pre-validation on the leader** — before appending to the log. + This is a best-effort optimization that rejects obviously stale + proposals early, saving log space and replication bandwidth. It + performs a read-only check without bumping the epoch. + +2. **Apply-time validation** — the authoritative check. Each apply + function validates the entry's epoch against the current shard + epoch. On match (or epoch 0 for a new shard), the epoch is bumped + and the entry is applied. On mismatch, the entry is a no-op and + the error is propagated to the caller's callback. + +### Retry on stale epoch + +Proposals rejected due to a stale shard epoch are automatically retried +with a fresh epoch (up to 5 attempts): + +- **SET_REPLICA_OF / NODE_FORGET / FAILOVER (force) / SLOT_CHANGE** — + the proposal is rebuilt with current epoch(s) and re-submitted. + +- **Automatic failover** — if the FAILOVER proposal is rejected, the + failover is re-scheduled (via `todo_schedule_failover`) as long as + the primary is still failed. The next attempt uses the current epoch. 
+ Automatic failover retries are not capped, so that a shard is not left leaderless. + +Only `STALE_SHARD_EPOCH_REJECTION_MSG` triggers retry. Other errors +(format errors, invalid state) are forwarded to the client immediately. + +When the leader rejects a forwarded proposal at pre-validation, it sends +a `REJECT retry` message back. The `retry` suffix signals +the follower that the rejection is epoch-related and eligible for retry. + +### Entries that don't carry an epoch -Entries that don't need a shard-epoch: -- NODE_FAIL / NODE_RECOVER (liveness, not topology) -- NODE_INFO / NODE_JOIN / NODE_FORGET (node-level, not shard-level) +- NODE_FAIL / NODE_RECOVER +- NODE_INFO / NODE_JOIN ## Leader Transfer diff --git a/src/cluster_raft.c b/src/cluster_raft.c index 37bbf28d401..9476ba3e334 100644 --- a/src/cluster_raft.c +++ b/src/cluster_raft.c @@ -41,9 +41,21 @@ #include +/* Generic error message returned to clients when a proposal is rejected. */ +#define GENERIC_PROPOSAL_REJECTION_MSG "proposal rejected by raft leader" +#define STALE_SHARD_EPOCH_REJECTION_MSG "proposal rejected due to stale shard epoch" + /* From module.c */ void moduleCallClusterReceivers(const char *sender_id, uint64_t module_id, uint8_t type, const unsigned char *payload, uint32_t len); +/* Shard epoch dict type, mapping shard_id to epoch. */ +static dictType raftShardEpochDictType = { + .entryGetKey = dictEntryGetKey, + .hashFunction = dictSdsHash, + .keyCompare = dictSdsKeyCompare, + .entryDestructor = dictEntryDestructorSdsKey, +}; + /* -------------------------------------------------------------------------- * Raft log entry types — what gets replicated * -------------------------------------------------------------------------- */ @@ -177,6 +189,9 @@ typedef struct { uint64_t stats_pubsub_bytes_received; uint64_t stats_module_bytes_sent; uint64_t stats_module_bytes_received; + + /* Shard epoch tracking (per-shard monotonic counter for stale proposal detection).
*/ + dict *shard_epochs; } clusterRaftState; #define RAFT_STATE() ((clusterRaftState *)server.cluster->protocol_data) @@ -201,6 +216,7 @@ typedef struct { #define RAFT_HDR_SIZE 8 #define REPL_OFFSETS_BROADCAST_PERIOD_MS 10000 #define RAFT_LOG_REWRITE_THRESHOLD 100 +#define PROPOSAL_MAX_RETRIES 5 /* Monotonic millisecond clock for timeouts and failure detection. * Unlike gettimeofday(), this is not affected by system clock adjustments. */ @@ -308,6 +324,10 @@ static clusterMsgSendBlock *clusterRaftBuildAllOffsetsMsg(void) { static void clusterRaftPropose(sds entry, void *ctx, void (*callback)(void *ctx, const char *error)); static void clusterRaftDeferPendingProposals(void); +static void clusterRaftCompletePendingProposal(int type, sds data, const char *error); +static int clusterRaftPreValidateEpoch(int type, sds data); +static void clusterRaftAutoFailoverCallback(void *ctx, const char *error); +static void clusterRaftSlotChange(slotRange *ranges, int numranges, clusterNode *target, void *ctx, void (*callback)(void *ctx, const char *error)); static void clusterRaftUpdateMyself(int old_flags); static sds clusterRaftBuildMyNodeInfo(void); static void clusterRaftCheckSlotCoverage(void); @@ -315,9 +335,9 @@ static void clusterRaftBroadcastAppendEntries(void); static void clusterRaftSendAppendEntries(clusterLink *link, clusterNode *node); static void clusterRaftSendPreVoteRequest(clusterLink *link, uint64_t term); static void clusterRaftUnblockMeet(clusterNode *node); -static void clusterRaftApplySlotChange(sds data); -static void clusterRaftApplySetReplica(sds data); -static void clusterRaftApplyFailover(sds data); +static const char *clusterRaftApplySlotChange(sds data); +static const char *clusterRaftApplySetReplica(sds data); +static const char *clusterRaftApplyFailover(sds data); static void raftLogApply(raftLogEntry *e); static raftLogEntry *raftLogCreate(uint64_t term, uint64_t index, uint8_t type, sds data); static void raftLogAppend(raftLogEntry *e); @@ -421,8 
+441,9 @@ static void clusterRaftSelfJoin(void) { slots = sdscatfmt(slots, " %i-%i", start, j); } if (sdslen(slots) > 0) { - entry = sdsnew("SLOT_CHANGE "); + entry = sdsnew("SLOT_CHANGE - 0 "); entry = sdscatlen(entry, myself->name, CLUSTER_NAMELEN); + entry = sdscat(entry, " 0"); entry = sdscatsds(entry, slots); clusterRaftPropose(entry, NULL, NULL); sdsfree(entry); @@ -719,7 +740,7 @@ static int clusterRaftProcessMeetRejected(clusterLink *link, int argc, sds *argv * * Examples: * PROPOSE NODE_JOIN
- * PROPOSE SLOT_CHANGE [ ...] + * PROPOSE SLOT_CHANGE [ ...] * -------------------------------------------------------------------------- */ static int raftEntryTypeByName(const char *name) { if (!strcasecmp(name, "NODE_JOIN")) return RAFT_ENTRY_NODE_JOIN; @@ -749,7 +770,7 @@ static const char *raftEntryTypeName(uint8_t type) { /* Propose a log entry. The entry is an sds containing the type name * followed by the data, e.g. "NODE_JOIN " or - * "SLOT_CHANGE 0-5460". On the leader, it's appended directly + * "SLOT_CHANGE 0-5460". On the leader, it's appended directly * to the log. On followers, it's forwarded to the leader. */ static void clusterRaftPropose(sds entry, void *ctx, void (*callback)(void *ctx, const char *error)) { clusterRaftState *rs = RAFT_STATE(); @@ -766,6 +787,15 @@ static void clusterRaftPropose(sds entry, void *ctx, void (*callback)(void *ctx, sds data = sp ? sdsnew(sp + 1) : sdsempty(); + if (rs->role == RAFT_ROLE_LEADER) { + /* Pre-validate epoch on the leader to reject obviously stale proposals. */ + if (!clusterRaftPreValidateEpoch(type, data)) { + if (callback) callback(ctx, STALE_SHARD_EPOCH_REJECTION_MSG); + sdsfree(data); + return; + } + } + /* Track pending proposal for retry on leader change. 
*/ { raftPendingProposal *pp = zmalloc(sizeof(*pp)); @@ -786,7 +816,6 @@ static void clusterRaftPropose(sds entry, void *ctx, void (*callback)(void *ctx, memcmp(data, myself->name, CLUSTER_NAMELEN) == 0)) { clusterRaftSelfJoin(); } - uint64_t idx = raftLogLastIndex() + 1; raftLogAppend(raftLogCreate(rs->current_term, idx, type, data)); serverLog(LL_NOTICE, "Leader appended %s (index %llu).", @@ -815,8 +844,165 @@ static void clusterRaftPropose(sds entry, void *ctx, void (*callback)(void *ctx, } } +/* -------------------------------------------------------------------------- + * Shard epoch helpers + * -------------------------------------------------------------------------- */ + +static uint64_t clusterGetShardEpoch(const char *shard_id) { + clusterRaftState *rs = RAFT_STATE(); + sds s = sdsnewlen(shard_id, CLUSTER_NAMELEN); + dictEntry *de = dictFind(rs->shard_epochs, s); + sdsfree(s); + return de ? dictGetUnsignedIntegerVal(de) : 0; +} + +static void clusterSetShardEpoch(const char *shard_id, uint64_t epoch) { + clusterRaftState *rs = RAFT_STATE(); + sds s = sdsnewlen(shard_id, CLUSTER_NAMELEN); + dictEntry *de = dictAddOrFind(rs->shard_epochs, s); + if (dictGetKey(de) != s) { + sdsfree(s); + } + dictSetUnsignedIntegerVal(de, epoch); +} + +static int isShardEpochCurrent(const char *shard_id, uint64_t entry_epoch) { + uint64_t current = clusterGetShardEpoch(shard_id); + return (current == 0 || entry_epoch == current); +} + +/* Parsed fields from a SLOT_CHANGE entry's epoch-related data. + * Format: " " */ +typedef struct { + clusterNode *target; /* Target node (NULL if dash). */ + clusterNode *source_owner; /* Source node (NULL if dash). */ + const char *source_shard_id; /* source_owner->shard_id or NULL. */ + const char *target_shard_id; /* target->shard_id or NULL. */ + uint64_t source_epoch; + uint64_t target_epoch; + int range_end; /* Index in argv where ranges begin (first range element). 
*/ +} slotChangeEpochInfo; + +/* Parse fields from a SLOT_CHANGE entry's split argv. + * Format: " " + * Caller must have verified argc >= 5. + * Returns true if parsing succeeded, false otherwise. */ +static bool parseSlotChangeEpochs(sds *argv, int argc, slotChangeEpochInfo *info) { + memset(info, 0, sizeof(*info)); + + /* argv[0] = source-id-or-dash */ + info->source_owner = (sdslen(argv[0]) == CLUSTER_NAMELEN) + ? clusterLookupNode(argv[0], CLUSTER_NAMELEN) + : NULL; + /* argv[1] = source-epoch */ + info->source_epoch = strtoull(argv[1], NULL, 10); + /* argv[2] = target-id-or-dash */ + info->target = (sdslen(argv[2]) == CLUSTER_NAMELEN) + ? clusterLookupNode(argv[2], CLUSTER_NAMELEN) + : NULL; + /* argv[3] = target-epoch */ + info->target_epoch = strtoull(argv[3], NULL, 10); + /* argv[4..argc-1] = ranges */ + info->range_end = 4; + + info->source_shard_id = info->source_owner ? info->source_owner->shard_id : NULL; + info->target_shard_id = info->target ? info->target->shard_id : NULL; + return (argc > 4); +} + +/* Pre-validate shard epoch on the leader before appending to the log. + * Rejects stale proposals early to avoid wasting log space. + * Returns 1 if valid, 0 if stale. */ +static int clusterRaftPreValidateEpoch(int type, sds data) { + int argc; + sds *argv = sdssplitlen(data, sdslen(data), " ", 1, &argc); + if (!argv) return 1; + + int ok = 1; + switch (type) { + case RAFT_ENTRY_FAILOVER: { + /* Format: */ + if (argc < 4) { + ok = 0; + } else { + clusterNode *primary = (sdslen(argv[1]) == CLUSTER_NAMELEN) + ? clusterLookupNode(argv[1], CLUSTER_NAMELEN) + : NULL; + if (primary && memcmp(primary->shard_id, argv[2], CLUSTER_NAMELEN) != 0) { + ok = 0; + } else { + uint64_t epoch = strtoull(argv[3], NULL, 10); + ok = isShardEpochCurrent(argv[2], epoch); + } + } + break; + } + case RAFT_ENTRY_SET_REPLICA_OF: { + /* Format: */ + if (argc < 6) { + ok = 0; + } else { + /* Validate source shard epoch. 
*/ + uint64_t source_epoch = strtoull(argv[2], NULL, 10); + ok = isShardEpochCurrent(argv[1], source_epoch); + /* Validate target shard epoch. */ + if (ok) { + uint64_t target_epoch = strtoull(argv[5], NULL, 10); + ok = isShardEpochCurrent(argv[4], target_epoch); + } + /* Validate primary's shard matches target-shard. */ + if (ok && sdslen(argv[3]) == CLUSTER_NAMELEN) { + clusterNode *primary = clusterLookupNode(argv[3], CLUSTER_NAMELEN); + if (primary && memcmp(primary->shard_id, argv[4], CLUSTER_NAMELEN) != 0) { + ok = 0; + } + } + } + break; + } + case RAFT_ENTRY_SLOT_CHANGE: { + /* Format: */ + if (argc < 5) { + ok = 0; + } else { + slotChangeEpochInfo info; + if (parseSlotChangeEpochs(argv, argc, &info)) { + if (info.source_shard_id) { + ok = isShardEpochCurrent(info.source_shard_id, info.source_epoch); + } + if (ok && info.target_shard_id) { + ok = isShardEpochCurrent(info.target_shard_id, info.target_epoch); + } + } + } + break; + } + case RAFT_ENTRY_NODE_FORGET: { + /* Format: */ + if (argc < 2) { + ok = 0; + } else { + clusterNode *node = clusterLookupNode(argv[0], sdslen(argv[0])); + if (node) { + uint64_t epoch = strtoull(argv[1], NULL, 10); + ok = isShardEpochCurrent(node->shard_id, epoch); + } + } + break; + } + default: + break; + } + + sdsfreesplitres(argv, argc); + if (!ok) { + serverLog(LL_DEBUG, "Leader pre-validation: rejecting %s proposal (stale epoch).", + raftEntryTypeName(type)); + } + return ok; +} + static int clusterRaftProcessPropose(clusterLink *link, int argc, sds *argv) { - UNUSED(link); clusterRaftState *rs = RAFT_STATE(); /* argv[0]="PROPOSE", argv[1..] is the entry (type + data). */ @@ -834,6 +1020,23 @@ static int clusterRaftProcessPropose(clusterLink *link, int argc, sds *argv) { /* Data is everything after the type name. */ sds data = (argc >= 3) ? sdsjoinsds(argv + 2, argc - 2, " ", 1) : sdsempty(); + /* Pre-validate epoch on the leader to reject obviously stale proposals. 
*/ + if (!clusterRaftPreValidateEpoch(type, data)) { + /* Send REJECT back to the proposing follower so it can unblock the client. + * Append "retry" hint so the follower knows it can retry with a fresh epoch. */ + if (link) { + sds msg = wireNewMsg("REJECT"); + msg = sdscatlen(msg, " ", 1); + msg = sdscatlen(msg, entry, sdslen(entry)); + msg = sdscat(msg, " retry"); + msg = wireFinishMsg(msg); + clusterRaftSendMsg(link, msg); + } + sdsfree(data); + sdsfree(entry); + return 1; + } + uint64_t idx = raftLogLastIndex() + 1; raftLogAppend(raftLogCreate(rs->current_term, idx, type, data)); serverLog(LL_NOTICE, "Leader appended proposed %s (index %llu).", @@ -849,6 +1052,33 @@ static int clusterRaftProcessPropose(clusterLink *link, int argc, sds *argv) { return 1; } +/* Handle a REJECT message from the leader. The leader sends this when it + * rejects a forwarded PROPOSE. We match it against our + * pending_proposals and fire the callback with an error so the client + * gets an immediate reply instead of hanging until timeout. + * Format: "REJECT [retry]" (echoes back the original entry). + * If the last token is "retry", the rejection is retryable. */ +static int clusterRaftProcessReject(clusterLink *link, int argc, sds *argv) { + UNUSED(link); + + /* argv[0]="REJECT", argv[1]=type, argv[2..]=data, optional last="retry" */ + if (argc < 2) return 1; + + int type = raftEntryTypeByName(argv[1]); + if (type < 0) return 1; + + /* Check if the last token is the "retry" hint. */ + int retryable = (argc >= 3 && sdslen(argv[argc - 1]) == 5 && + memcmp(argv[argc - 1], "retry", 5) == 0); + int data_argc = retryable ? argc - 3 : argc - 2; + + sds data = (data_argc > 0) ? sdsjoinsds(argv + 2, data_argc, " ", 1) : sdsempty(); + const char *error = retryable ? 
STALE_SHARD_EPOCH_REJECTION_MSG : GENERIC_PROPOSAL_REJECTION_MSG; + clusterRaftCompletePendingProposal(type, data, error); + sdsfree(data); + return 1; +} + /* -------------------------------------------------------------------------- * Raft election and heartbeat * -------------------------------------------------------------------------- */ @@ -957,9 +1187,58 @@ static uint64_t raftLogTermAt(uint64_t index) { return e ? e->term : 0; } +/* -------------------------------------------------------------------------- + * Shard epoch validation helpers + * -------------------------------------------------------------------------- */ + +/* Validate a pair of shard epochs (source and target). + * Returns 1 if both are current, 0 if either is stale. + * Either shard_id may be NULL (skipped). Does not bump. */ +static int clusterValidateShardEpochPair(const char *source_shard_id, + uint64_t source_epoch, + const char *target_shard_id, + uint64_t target_epoch) { + uint64_t src_current = source_shard_id ? clusterGetShardEpoch(source_shard_id) : 0; + uint64_t tgt_current = target_shard_id ? clusterGetShardEpoch(target_shard_id) : 0; + + /* Check source. */ + if (source_shard_id && src_current > 0 && source_epoch != src_current) { + return 0; + } + /* Check target. */ + if (target_shard_id && tgt_current > 0 && target_epoch != tgt_current) { + return 0; + } + + return 1; +} + +/* Find a pending proposal matching type+data, fire its callback, and remove it. + * Called both when an entry is applied (from raftLogApply) and when the leader + * sends a REJECT for a forwarded proposal. 
*/ +static void clusterRaftCompletePendingProposal(int type, sds data, const char *error) { + clusterRaftState *rs = RAFT_STATE(); + if (listLength(rs->pending_proposals) == 0) return; + + listIter li; + listNode *ln; + listRewind(rs->pending_proposals, &li); + while ((ln = listNext(&li)) != NULL) { + raftPendingProposal *pp = listNodeValue(ln); + if (pp->type == type && !sdscmp(pp->data, data)) { + if (pp->callback) pp->callback(pp->ctx, error); + sdsfree(pp->data); + zfree(pp); + listDelNode(rs->pending_proposals, ln); + break; + } + } +} + /* Apply a committed log entry. */ static void raftLogApply(raftLogEntry *e) { clusterRaftState *rs = RAFT_STATE(); + const char *entry_error = NULL; switch (e->type) { case RAFT_ENTRY_NODE_JOIN: { /* data: "
" */ @@ -1033,23 +1312,27 @@ static void raftLogApply(raftLogEntry *e) { /* Propose SLOT_CHANGE for slots assigned before joining * the cluster (from our singleton state). */ - sds entry = sdsnew("SLOT_CHANGE "); - entry = sdscatlen(entry, myself->name, CLUSTER_NAMELEN); int count = 0; + sds slot_range = sdsempty(); for (int j = 0; j < CLUSTER_SLOTS; j++) { if (server.cluster->slots[j] != myself) continue; int start = j; while (j + 1 < CLUSTER_SLOTS && server.cluster->slots[j + 1] == myself) j++; if (j == start) - entry = sdscatfmt(entry, " %i", start); + slot_range = sdscatfmt(slot_range, " %i", start); else - entry = sdscatfmt(entry, " %i-%i", start, j); + slot_range = sdscatfmt(slot_range, " %i-%i", start, j); count++; } if (count > 0) { + sds entry = sdsnew("SLOT_CHANGE - 0 "); + entry = sdscatlen(entry, myself->name, CLUSTER_NAMELEN); + entry = sdscat(entry, " 0"); + entry = sdscatsds(entry, slot_range); clusterRaftPropose(entry, NULL, NULL); + sdsfree(entry); } - sdsfree(entry); + sdsfree(slot_range); } } @@ -1065,26 +1348,59 @@ static void raftLogApply(raftLogEntry *e) { if (argv) sdsfreesplitres(argv, argc); break; } - case RAFT_ENTRY_SLOT_CHANGE: - clusterRaftApplySlotChange(e->data); - rs->todo_update_slot_coverage = 1; - rs->todo_invalidate_slots_cache = 1; - serverLog(LL_NOTICE, "Applied SLOT_CHANGE (index %llu).", (unsigned long long)e->index); + case RAFT_ENTRY_SLOT_CHANGE: { + entry_error = clusterRaftApplySlotChange(e->data); + if (!entry_error) { + rs->todo_update_slot_coverage = 1; + rs->todo_invalidate_slots_cache = 1; + } + serverLog(LL_NOTICE, "Applied SLOT_CHANGE (index %llu)%s.", (unsigned long long)e->index, + entry_error ? 
" [stale]" : ""); break; - case RAFT_ENTRY_SET_REPLICA_OF: - clusterRaftApplySetReplica(e->data); - rs->todo_invalidate_slots_cache = 1; - serverLog(LL_NOTICE, "Applied SET_REPLICA_OF (index %llu).", (unsigned long long)e->index); + } + case RAFT_ENTRY_SET_REPLICA_OF: { + entry_error = clusterRaftApplySetReplica(e->data); + if (!entry_error) { + rs->todo_invalidate_slots_cache = 1; + } + serverLog(LL_NOTICE, "Applied SET_REPLICA_OF (index %llu)%s.", (unsigned long long)e->index, + entry_error ? " [stale]" : ""); break; - case RAFT_ENTRY_FAILOVER: - clusterRaftApplyFailover(e->data); - rs->todo_update_slot_coverage = 1; - rs->todo_invalidate_slots_cache = 1; - serverLog(LL_NOTICE, "Applied FAILOVER (index %llu).", (unsigned long long)e->index); + } + case RAFT_ENTRY_FAILOVER: { + entry_error = clusterRaftApplyFailover(e->data); + if (!entry_error) { + rs->todo_update_slot_coverage = 1; + rs->todo_invalidate_slots_cache = 1; + } + serverLog(LL_NOTICE, "Applied FAILOVER (index %llu)%s.", (unsigned long long)e->index, + entry_error ? " [stale]" : ""); break; + } case RAFT_ENTRY_NODE_FORGET: { - clusterNode *node = clusterLookupNode(e->data, sdslen(e->data)); + int argc; + sds *argv = sdssplitlen(e->data, sdslen(e->data), " ", 1, &argc); + if (!argv || argc < 2) { + if (argv) sdsfreesplitres(argv, argc); + break; + } + clusterNode *node = clusterLookupNode(argv[0], sdslen(argv[0])); if (node && node != myself) { + uint64_t epoch = strtoull(argv[1], NULL, 10); + if (!isShardEpochCurrent(node->shard_id, epoch)) { + entry_error = STALE_SHARD_EPOCH_REJECTION_MSG; + sdsfreesplitres(argv, argc); + break; + } + if (nodeIsPrimary(node) && (node->num_replicas > 0 || node->numslots > 0)) { + entry_error = "Can't forget a primary with replicas or assigned slots."; + sdsfreesplitres(argv, argc); + break; + } + /* Save shard_id before deleting the node. 
*/ + char shard_id[CLUSTER_NAMELEN]; + memcpy(shard_id, node->shard_id, CLUSTER_NAMELEN); + /* Detach link before deleting so clusterReadHandler can detect * that the node it was talking to is gone. */ if (node->link) { @@ -1094,9 +1410,14 @@ static void raftLogApply(raftLogEntry *e) { clusterDelNode(node); rs->todo_update_slot_coverage = 1; rs->todo_invalidate_slots_cache = 1; + + /* Bump shard epoch after successful delete. */ + uint64_t current = clusterGetShardEpoch(shard_id); + clusterSetShardEpoch(shard_id, current == 0 ? 1 : current + 1); } serverLog(LL_NOTICE, "Applied NODE_FORGET %.40s (index %llu).", - e->data, (unsigned long long)e->index); + argv[0], (unsigned long long)e->index); + sdsfreesplitres(argv, argc); break; } case RAFT_ENTRY_NODE_FAIL: { @@ -1143,8 +1464,9 @@ static void raftLogApply(raftLogEntry *e) { * managed by NODE_JOIN and SET_REPLICA_OF entries. */ int argc; sds *argv = sdssplitlen(e->data, sdslen(e->data), " ", 1, &argc); - if (argv && argc >= 2 && sdslen(argv[0]) == CLUSTER_NAMELEN) { + if (argv && argc >= 3 && sdslen(argv[0]) == CLUSTER_NAMELEN) { clusterNode *node = clusterLookupNode(argv[0], CLUSTER_NAMELEN); + if (node && node != myself) { /* Reset optional fields so absent aux fields get cleared. */ node->announce_client_tcp_port = 0; @@ -1183,22 +1505,13 @@ static void raftLogApply(raftLogEntry *e) { break; } - /* Check pending proposals for a match and remove it. 
*/ - if (listLength(rs->pending_proposals) > 0) { - listIter li; - listNode *ln; - listRewind(rs->pending_proposals, &li); - while ((ln = listNext(&li)) != NULL) { - raftPendingProposal *pp = listNodeValue(ln); - if (pp->type == e->type && !sdscmp(pp->data, e->data)) { - if (pp->callback) pp->callback(pp->ctx, NULL); - sdsfree(pp->data); - zfree(pp); - listDelNode(rs->pending_proposals, ln); - break; - } - } + if (entry_error) { + serverLog(LL_DEBUG, "Proposal rejected at apply: %s (type %s, index %llu).", + entry_error, raftEntryTypeName(e->type), (unsigned long long)e->index); } + + /* Check pending proposals for a match and remove it. */ + clusterRaftCompletePendingProposal(e->type, e->data, entry_error); } /* -------------------------------------------------------------------------- @@ -1699,6 +2012,7 @@ static void clusterRaftInit(void) { rs->last_node_info_check = monotonicMs(); rs->last_repl_offsets_broadcast = monotonicMs(); rs->todo_update_slot_coverage = 1; + rs->shard_epochs = dictCreate(&raftShardEpochDictType); server.cluster->size = 0; /* Incremented by NODE_JOIN apply */ } @@ -1837,6 +2151,9 @@ static void clusterRaftRetryProposals(void) { } serverLog(LL_NOTICE, "Forwarded %lu deferred proposals to leader.", listLength(rs->pending_proposals)); + } else { + /* Leader link not available yet — retry on next cycle. 
*/ + rs->todo_retry_proposals = 1; } } } @@ -1972,6 +2289,10 @@ static void clusterRaftCron(void) { entry = sdscatlen(entry, myself->name, CLUSTER_NAMELEN); entry = sdscatlen(entry, " ", 1); entry = sdscatlen(entry, primary->name, CLUSTER_NAMELEN); + entry = sdscatlen(entry, " ", 1); + entry = sdscatlen(entry, primary->shard_id, CLUSTER_NAMELEN); + uint64_t epoch = clusterGetShardEpoch(primary->shard_id); + entry = sdscatfmt(entry, " %U", (unsigned long long)epoch); clusterRaftPropose(entry, rs->mf_ctx, rs->mf_callback); sdsfree(entry); rs->mf_end = 0; @@ -1991,7 +2312,11 @@ static void clusterRaftCron(void) { entry = sdscatlen(entry, myself->name, CLUSTER_NAMELEN); entry = sdscatlen(entry, " ", 1); entry = sdscatlen(entry, primary->name, CLUSTER_NAMELEN); - clusterRaftPropose(entry, NULL, NULL); + entry = sdscatlen(entry, " ", 1); + entry = sdscatlen(entry, primary->shard_id, CLUSTER_NAMELEN); + uint64_t epoch = clusterGetShardEpoch(primary->shard_id); + entry = sdscatfmt(entry, " %U", (unsigned long long)epoch); + clusterRaftPropose(entry, NULL, clusterRaftAutoFailoverCallback); sdsfree(entry); } } @@ -2205,6 +2530,7 @@ static void clusterRaftHandleServerShutdown(void) { } zfree(rs->log); sdsfree(rs->my_last_committed_info); + dictRelease(rs->shard_epochs); zfree(rs); server.cluster->protocol_data = NULL; } @@ -2258,6 +2584,8 @@ static int clusterRaftProcessMessage(struct clusterLink *link) { ret = clusterRaftProcessMeetRejected(link, argc, argv); } else if (!strcasecmp(argv[0], "PROPOSE")) { ret = clusterRaftProcessPropose(link, argc, argv); + } else if (!strcasecmp(argv[0], "REJECT")) { + ret = clusterRaftProcessReject(link, argc, argv); } else if (!strcasecmp(argv[0], "AE")) { ret = clusterRaftProcessAppendEntries(link, argc, argv, lines + 1, line_count - 1); } else if (!strcasecmp(argv[0], "AE_ACK")) { @@ -2430,6 +2758,25 @@ static void clusterRaftFreeNodeData(clusterNode *node) { zfree(node->protocol_data); } +/* For raft, the config_epoch field in 
nodes.conf stores the shard epoch. + * ping_sent and pong_received have no meaning in raft (no gossip). */ +static void clusterRaftGetNodePingPongEpoch(clusterNode *node, long long *ping_sent, long long *pong_received, uint64_t *config_epoch) { + *ping_sent = 0; + *pong_received = 0; + *config_epoch = clusterGetShardEpoch(node->shard_id); +} + +/* On load, the config_epoch field is the shard epoch for this node's shard. + * We use it to restore the shard_epochs dict. ping/pong are ignored. */ +static void clusterRaftSetNodePingPongEpoch(clusterNode *node, int ping_active, int pong_active, uint64_t shard_epoch) { + UNUSED(ping_active); + UNUSED(pong_active); + uint64_t current = clusterGetShardEpoch(node->shard_id); + if (shard_epoch > current) { + clusterSetShardEpoch(node->shard_id, shard_epoch); + } +} + static sds clusterRaftAppendVarsLine(sds config) { clusterRaftState *rs = RAFT_STATE(); config = sdscatprintf(config, "vars currentTerm %llu lastApplied %llu", @@ -2725,15 +3072,28 @@ typedef struct { void *orig_ctx; void (*orig_callback)(void *orig_ctx, const char *error); clusterNode *target; + slotRange *ranges; + int numranges; + int retries; } slotChangeCallbackCtx; static void clusterRaftSlotChangeApplyCallback(void *ctx, const char *error) { slotChangeCallbackCtx *sc = (slotChangeCallbackCtx *)ctx; - if (clusterNodeGetPrimary(myself)->numslots == 0 && sc->target && !error) { + if (!error && clusterNodeGetPrimary(myself)->numslots == 0 && sc->target) { clusterHandleLostLastSlot(sc->target); } + if (error && strcmp(error, STALE_SHARD_EPOCH_REJECTION_MSG) == 0 && sc->retries > 0) { + sc->retries--; + /* Retry: re-invoke slot change with fresh epochs. 
*/ + clusterRaftSlotChange(sc->ranges, sc->numranges, sc->target, + sc->orig_ctx, sc->orig_callback); + zfree(sc->ranges); + zfree(sc); + return; + } if (sc->orig_callback) sc->orig_callback(sc->orig_ctx, error); - zfree(ctx); + zfree(sc->ranges); + zfree(sc); } /* Invoked for slot assignments and slot migrations (ADDSLOTS, SETSLOT, etc.) */ @@ -2760,9 +3120,17 @@ static void clusterRaftSlotChange(slotRange *ranges, int numranges, clusterNode return; } - /* Build entry: "SLOT_CHANGE " */ + /* Build entry: "SLOT_CHANGE " */ + clusterNode *source_owner = server.cluster->slots[ranges[0].start_slot]; + serverAssert(source_owner || target); /* At least one side must be set. */ + uint64_t source_epoch = (source_owner) ? clusterGetShardEpoch(source_owner->shard_id) : 0; + uint64_t target_epoch = target ? clusterGetShardEpoch(target->shard_id) : 0; + sds entry = sdsnew("SLOT_CHANGE "); + entry = sdscatlen(entry, source_owner ? source_owner->name : "-", source_owner ? CLUSTER_NAMELEN : 1); + entry = sdscatfmt(entry, " %U ", (unsigned long long)source_epoch); entry = sdscatlen(entry, target ? target->name : "-", target ? CLUSTER_NAMELEN : 1); + entry = sdscatfmt(entry, " %U", (unsigned long long)target_epoch); for (int i = 0; i < numranges; i++) { if (ranges[i].start_slot == ranges[i].end_slot) entry = sdscatfmt(entry, " %i", ranges[i].start_slot); @@ -2778,22 +3146,36 @@ static void clusterRaftSlotChange(slotRange *ranges, int numranges, clusterNode sc->orig_ctx = ctx; sc->orig_callback = callback; sc->target = target; + sc->ranges = zmalloc(sizeof(slotRange) * numranges); + memcpy(sc->ranges, ranges, sizeof(slotRange) * numranges); + sc->numranges = numranges; + sc->retries = PROPOSAL_MAX_RETRIES; clusterRaftPropose(entry, sc, &clusterRaftSlotChangeApplyCallback); sdsfree(entry); } -/* Apply a SLOT_CHANGE entry. Format: " [ ...]" - * Ranges use the same format as nodes.conf: "0-5460" or "5461". 
*/ -static void clusterRaftApplySlotChange(sds data) { +/* Apply a SLOT_CHANGE entry. Format: " " + * Ranges use the same format as nodes.conf: "0-5460" or "5461". + * Returns NULL on success, or an error string describing the failure. */ +static const char *clusterRaftApplySlotChange(sds data) { + const char *error = NULL; int argc; sds *argv = sdssplitlen(data, sdslen(data), " ", 1, &argc); - if (!argv || argc < 2) goto done; + /* Need at least: source + source_epoch + target + target_epoch + one_range = 5 */ + if (!argv || argc < 5) goto reject; - clusterNode *target = (sdslen(argv[0]) == CLUSTER_NAMELEN) - ? clusterLookupNode(argv[0], CLUSTER_NAMELEN) - : NULL; + slotChangeEpochInfo info; + if (!parseSlotChangeEpochs(argv, argc, &info)) goto reject; - for (int i = 1; i < argc; i++) { + /* Epoch validation: validate both source and target epochs. */ + if (!clusterValidateShardEpochPair(info.source_shard_id, info.source_epoch, + info.target_shard_id, info.target_epoch)) { + error = STALE_SHARD_EPOCH_REJECTION_MSG; + goto reject; + } + + /* Range fields are argv[range_end] through argv[argc-1]. */ + for (int i = info.range_end; i < argc; i++) { int start, end; char *p = strchr(argv[i], '-'); if (p) { @@ -2804,17 +3186,17 @@ static void clusterRaftApplySlotChange(sds data) { start = end = atoi(argv[i]); } for (int j = start; j <= end; j++) { - if (target == myself || server.cluster->slots[j] == myself) + if (info.target == myself || server.cluster->slots[j] == myself) RAFT_STATE()->todo_save_config = 1; - if (target) { + if (info.target) { /* If this slot is moving away from myself, delete keys. 
*/ - if (server.cluster->slots[j] == myself && target != myself) { + if (server.cluster->slots[j] == myself && info.target != myself) { serverLog(LL_NOTICE, "Deleting keys in dirty slot %d on node %.40s", j, myself->name); delKeysInSlot(j, server.lazyfree_lazy_server_del, true, false); } if (server.cluster->slots[j]) clusterDelSlot(j); - clusterAddSlot(target, j); + clusterAddSlot(info.target, j); } else { if (server.cluster->slots[j] == myself) { delKeysInSlot(j, server.lazyfree_lazy_server_del, true, false); @@ -2823,27 +3205,44 @@ static void clusterRaftApplySlotChange(sds data) { } } } + goto done; + +reject: + if (!error) error = GENERIC_PROPOSAL_REJECTION_MSG; done: if (argv) sdsfreesplitres(argv, argc); + return error; } /* Apply a SET_REPLICA_OF entry. - * Format: " " */ -static void clusterRaftApplySetReplica(sds data) { + * Format: " " + * Returns NULL on success, or an error string describing the failure. */ +static const char *clusterRaftApplySetReplica(sds data) { clusterRaftState *rs = RAFT_STATE(); + const char *error = NULL; int argc; sds *argv = sdssplitlen(data, sdslen(data), " ", 1, &argc); - if (!argv || argc != 3) goto done; + if (!argv || argc != 6) goto reject; clusterNode *replica = clusterLookupNode(argv[0], sdslen(argv[0])); - if (!replica) goto done; + if (!replica) goto reject; - if (replica == myself) rs->todo_save_config = 1; + char *source_shard = argv[1]; + uint64_t source_epoch = strtoull(argv[2], NULL, 10); + char *target_shard = argv[4]; + uint64_t target_epoch = strtoull(argv[5], NULL, 10); - char *shard_id = argv[2]; + /* Validate both shard epochs without bumping. */ + if (!clusterValidateShardEpochPair(source_shard, source_epoch, + target_shard, target_epoch)) { + error = STALE_SHARD_EPOCH_REJECTION_MSG; + goto reject; + } + + if (replica == myself) rs->todo_save_config = 1; - if (sdslen(argv[1]) == 1 && argv[1][0] == '-') { - /* Promote to primary with the shard-id from the entry. 
*/ + if (sdslen(argv[3]) == 1 && argv[3][0] == '-') { + /* Promote to primary with the target-shard from the entry. */ if (nodeIsReplica(replica)) { if (replica->replicaof) clusterNodeRemoveReplica(replica->replicaof, replica); replica->flags &= ~CLUSTER_NODE_REPLICA; @@ -2852,13 +3251,13 @@ static void clusterRaftApplySetReplica(sds data) { if (replica == myself) rs->todo_update_replication = 1; } clusterRemoveNodeFromShard(replica); - memcpy(replica->shard_id, shard_id, CLUSTER_NAMELEN); - clusterAddNodeToShard(shard_id, replica); + memcpy(replica->shard_id, target_shard, CLUSTER_NAMELEN); + clusterAddNodeToShard(target_shard, replica); } else { - clusterNode *primary = clusterLookupNode(argv[1], sdslen(argv[1])); - if (!primary) goto done; + clusterNode *primary = clusterLookupNode(argv[3], sdslen(argv[3])); + if (!primary) goto reject; /* Guard: skip if the primary's shard-id has changed. */ - if (memcmp(primary->shard_id, shard_id, CLUSTER_NAMELEN) != 0) goto done; + if (memcmp(primary->shard_id, target_shard, CLUSTER_NAMELEN) != 0) goto reject; if (replica == myself) { /* Update cluster state; actual replication change deferred to beforeSleep. */ if (myself->replicaof) clusterNodeRemoveReplica(myself->replicaof, myself); @@ -2882,23 +3281,49 @@ static void clusterRaftApplySetReplica(sds data) { clusterAddNodeToShard(primary->shard_id, replica); } } + + /* Bump both shard epochs after successful apply. */ + clusterSetShardEpoch(source_shard, source_epoch + 1); + clusterSetShardEpoch(target_shard, target_epoch + 1); + goto done; + +reject: + if (!error) error = GENERIC_PROPOSAL_REJECTION_MSG; done: if (argv) sdsfreesplitres(argv, argc); + return error; } -/* Apply a FAILOVER entry. Format: " " +/* Apply a FAILOVER entry. Format: " " * The replica takes over the primary's slots and becomes primary. - * The old primary becomes a replica of the new primary. 
*/ -static void clusterRaftApplyFailover(sds data) { + * The old primary becomes a replica of the new primary. + * Returns NULL on success, or an error string describing the failure. */ +static const char *clusterRaftApplyFailover(sds data) { clusterRaftState *rs = RAFT_STATE(); + const char *error = NULL; int argc; sds *argv = sdssplitlen(data, sdslen(data), " ", 1, &argc); - if (!argv || argc != 2) goto done; + if (!argv || argc != 4) goto reject; clusterNode *replica = clusterLookupNode(argv[0], sdslen(argv[0])); clusterNode *primary = clusterLookupNode(argv[1], sdslen(argv[1])); - if (!replica || !primary) goto done; - if (!nodeIsReplica(replica) || nodeIsReplica(primary) || replica->replicaof != primary) goto done; + if (!replica || !primary) goto reject; + + /* Validate that the primary still belongs to the shard claimed in the entry. */ + if (memcmp(primary->shard_id, argv[2], CLUSTER_NAMELEN) != 0) { + serverLog(LL_WARNING, "FAILOVER rejected: primary %.40s shard mismatch (expected %.40s, got %.40s).", + primary->name, argv[2], primary->shard_id); + goto reject; + } + + /* Epoch validation: validate against the shard-id from the entry (no bump yet). */ + uint64_t shard_epoch = strtoull(argv[3], NULL, 10); + if (!isShardEpochCurrent(argv[2], shard_epoch)) { + error = STALE_SHARD_EPOCH_REJECTION_MSG; + goto reject; + } + + if (!nodeIsReplica(replica) || nodeIsReplica(primary) || replica->replicaof != primary) goto reject; /* Transfer slots from old primary to new primary. */ for (int j = 0; j < CLUSTER_SLOTS; j++) { @@ -2944,35 +3369,214 @@ static void clusterRaftApplyFailover(sds data) { rs->todo_update_replication = 1; rs->todo_save_config = 1; } + + /* Bump shard epoch after successful apply. 
*/ + clusterSetShardEpoch(argv[2], shard_epoch + 1); + goto done; + +reject: + if (!error) error = GENERIC_PROPOSAL_REJECTION_MSG; done: if (argv) sdsfreesplitres(argv, argc); + return error; +} + +/* -------------------------------------------------------------------------- + * Proposal retry on stale shard epoch + * + * REPLICATE, FORGET, and forced FAILOVER operations can be safely retried when + * rejected due to a stale shard epoch (a concurrent operation bumped the + * epoch). The retry rebuilds the proposal with a fresh epoch. Max retries: + * PROPOSAL_MAX_RETRIES. + * -------------------------------------------------------------------------- */ +typedef struct { + void *client_ctx; /* Original blocked client handle. */ + void (*client_callback)(void *, const char *); /* Original completion callback. */ + int retries; /* Remaining retry attempts. */ + /* FORGET-specific fields. */ + char node_id[CLUSTER_NAMELEN]; + /* SET_REPLICA_OF-specific fields. */ + char primary_name[CLUSTER_NAMELEN]; /* Primary node name; valid only when has_primary is 1. */ + int has_primary; /* 1 if replicating, 0 if promoting. */ +} proposalRetryCtx; + +/* Callback for automatic failover proposals. On rejection (stale epoch or any + * other reason), re-schedule the failover if the primary is still failed. The + * next attempt will rebuild the proposal with a fresh shard epoch. */ +static void clusterRaftAutoFailoverCallback(void *ctx, const char *error) { + UNUSED(ctx); + if (!error) return; /* Success: nothing to do. 
*/ + + clusterNode *myself = getMyClusterNode(); + if (nodeIsReplica(myself) && myself->replicaof && + nodeFailed(myself->replicaof) && !nodeCantFailover(myself)) { + RAFT_STATE()->todo_schedule_failover = 1; + serverLog(LL_NOTICE, "Automatic failover proposal rejected (%s), re-scheduling.", error); + } +} + +static void clusterRaftForgetNodeRetryCallback(void *ctx, const char *error) { + proposalRetryCtx *rc = (proposalRetryCtx *)ctx; + if (error && strcmp(error, STALE_SHARD_EPOCH_REJECTION_MSG) == 0 && rc->retries > 0) { + rc->retries--; + /* Rebuild proposal with fresh epoch. */ + clusterNode *node = clusterLookupNode(rc->node_id, CLUSTER_NAMELEN); + if (!node) { + /* Node already gone — treat as success. */ + if (rc->client_callback) rc->client_callback(rc->client_ctx, NULL); + zfree(rc); + return; + } + sds entry = sdsnew("NODE_FORGET "); + entry = sdscatlen(entry, rc->node_id, CLUSTER_NAMELEN); + uint64_t epoch = clusterGetShardEpoch(node->shard_id); + entry = sdscatfmt(entry, " %U", (unsigned long long)epoch); + clusterRaftPropose(entry, rc, clusterRaftForgetNodeRetryCallback); + sdsfree(entry); + return; + } + /* No retry — forward result to the original callback. */ + if (rc->client_callback) rc->client_callback(rc->client_ctx, error); + zfree(rc); +} + +static void clusterRaftSetReplicaOfRetryCallback(void *ctx, const char *error) { + proposalRetryCtx *rc = (proposalRetryCtx *)ctx; + if (error && strcmp(error, STALE_SHARD_EPOCH_REJECTION_MSG) == 0 && rc->retries > 0) { + rc->retries--; + /* Rebuild proposal with fresh epochs. */ + clusterNode *primary = rc->has_primary + ? 
clusterLookupNode(rc->primary_name, CLUSTER_NAMELEN) + : NULL; + if (rc->has_primary && !primary) { + if (rc->client_callback) rc->client_callback(rc->client_ctx, "target primary no longer exists"); + zfree(rc); + return; + } + uint64_t source_epoch = clusterGetShardEpoch(myself->shard_id); + char target_shard[CLUSTER_NAMELEN]; + uint64_t target_epoch; + if (primary) { + memcpy(target_shard, primary->shard_id, CLUSTER_NAMELEN); + target_epoch = clusterGetShardEpoch(primary->shard_id); + } else { + getRandomHexChars(target_shard, CLUSTER_NAMELEN); + target_epoch = 0; + } + sds entry = sdsnew("SET_REPLICA_OF "); + entry = sdscatlen(entry, myself->name, CLUSTER_NAMELEN); + entry = sdscatlen(entry, " ", 1); + entry = sdscatlen(entry, myself->shard_id, CLUSTER_NAMELEN); + entry = sdscatfmt(entry, " %U ", (unsigned long long)source_epoch); + entry = sdscatlen(entry, primary ? primary->name : "-", primary ? CLUSTER_NAMELEN : 1); + entry = sdscatlen(entry, " ", 1); + entry = sdscatlen(entry, target_shard, CLUSTER_NAMELEN); + entry = sdscatfmt(entry, " %U", (unsigned long long)target_epoch); + clusterRaftPropose(entry, rc, clusterRaftSetReplicaOfRetryCallback); + sdsfree(entry); + return; + } + /* No retry — forward result to the original callback. */ + if (rc->client_callback) rc->client_callback(rc->client_ctx, error); + zfree(rc); +} + +static void clusterRaftFailoverRetryCallback(void *ctx, const char *error) { + proposalRetryCtx *rc = (proposalRetryCtx *)ctx; + if (error && strcmp(error, STALE_SHARD_EPOCH_REJECTION_MSG) == 0 && rc->retries > 0) { + rc->retries--; + /* Rebuild proposal with fresh epoch. 
*/ + clusterNode *primary = myself->replicaof; + if (!primary) { + if (rc->client_callback) rc->client_callback(rc->client_ctx, "no primary to fail over"); + zfree(rc); + return; + } + sds entry = sdsnew("FAILOVER "); + entry = sdscatlen(entry, myself->name, CLUSTER_NAMELEN); + entry = sdscatlen(entry, " ", 1); + entry = sdscatlen(entry, primary->name, CLUSTER_NAMELEN); + entry = sdscatlen(entry, " ", 1); + entry = sdscatlen(entry, primary->shard_id, CLUSTER_NAMELEN); + uint64_t epoch = clusterGetShardEpoch(primary->shard_id); + entry = sdscatfmt(entry, " %U", (unsigned long long)epoch); + clusterRaftPropose(entry, rc, clusterRaftFailoverRetryCallback); + sdsfree(entry); + return; + } + /* No retry — forward result to the original callback. */ + if (rc->client_callback) rc->client_callback(rc->client_ctx, error); + zfree(rc); } static void clusterRaftForgetNode(const char *node_id, size_t id_len, void *ctx, void (*callback)(void *ctx, const char *error)) { + /* Reject forgetting the raft leader — it would crash the cluster. + * The admin should transfer leadership first. Only block if the leader + * node is still a known, non-failed member. */ + clusterRaftState *rs = RAFT_STATE(); + if (id_len == CLUSTER_NAMELEN && memcmp(node_id, rs->leader, CLUSTER_NAMELEN) == 0) { + clusterNode *leader_node = clusterLookupNode(node_id, id_len); + if (leader_node && !nodeFailed(leader_node)) { + if (callback) callback(ctx, "Can't forget the raft leader. Transfer leadership first."); + return; + } + } + + /* Wrap with retry context for stale epoch recovery. */ + proposalRetryCtx *rc = zmalloc(sizeof(*rc)); + rc->client_ctx = ctx; + rc->client_callback = callback; + rc->retries = PROPOSAL_MAX_RETRIES; + memset(rc->node_id, 0, CLUSTER_NAMELEN); + memcpy(rc->node_id, node_id, id_len < CLUSTER_NAMELEN ? 
id_len : CLUSTER_NAMELEN); + rc->has_primary = 0; + sds entry = sdsnew("NODE_FORGET "); entry = sdscatlen(entry, node_id, id_len); - clusterRaftPropose(entry, ctx, callback); + /* Append departing node's shard epoch. */ + clusterNode *node = clusterLookupNode(node_id, id_len); + uint64_t epoch = node ? clusterGetShardEpoch(node->shard_id) : 0; + entry = sdscatfmt(entry, " %U", (unsigned long long)epoch); + clusterRaftPropose(entry, rc, clusterRaftForgetNodeRetryCallback); sdsfree(entry); } static void clusterRaftSetReplicaOf(clusterNode *primary, void *ctx, void (*callback)(void *ctx, const char *error)) { - /* Propose SET_REPLICA_OF: " " - * For promotion (dash): shard-id is a new random id. - * For assignment: shard-id is the primary's current shard-id, used - * as a guard against concurrent shard changes. */ + /* Propose SET_REPLICA_OF: + * " " + * Source is myself's current shard. Target is the primary's shard (for assignment) + * or a new random shard (for promotion to primary). */ + + /* Wrap with retry context for stale epoch recovery. */ + proposalRetryCtx *rc = zmalloc(sizeof(*rc)); + rc->client_ctx = ctx; + rc->client_callback = callback; + rc->retries = PROPOSAL_MAX_RETRIES; + memset(rc->node_id, 0, CLUSTER_NAMELEN); + rc->has_primary = (primary != NULL); + if (primary) { + memcpy(rc->primary_name, primary->name, CLUSTER_NAMELEN); + } + + uint64_t source_epoch = clusterGetShardEpoch(myself->shard_id); + + char target_shard[CLUSTER_NAMELEN]; + if (primary) { + memcpy(target_shard, primary->shard_id, CLUSTER_NAMELEN); + } else { + getRandomHexChars(target_shard, CLUSTER_NAMELEN); + } + uint64_t target_epoch = primary ? 
clusterGetShardEpoch(primary->shard_id) : 0; + sds entry = sdsnew("SET_REPLICA_OF "); entry = sdscatlen(entry, myself->name, CLUSTER_NAMELEN); entry = sdscatlen(entry, " ", 1); + entry = sdscatlen(entry, myself->shard_id, CLUSTER_NAMELEN); + entry = sdscatfmt(entry, " %U ", (unsigned long long)source_epoch); entry = sdscatlen(entry, primary ? primary->name : "-", primary ? CLUSTER_NAMELEN : 1); entry = sdscatlen(entry, " ", 1); - if (primary) { - entry = sdscatlen(entry, primary->shard_id, CLUSTER_NAMELEN); - } else { - char new_shard_id[CLUSTER_NAMELEN]; - getRandomHexChars(new_shard_id, CLUSTER_NAMELEN); - entry = sdscatlen(entry, new_shard_id, CLUSTER_NAMELEN); - } - clusterRaftPropose(entry, ctx, callback); + entry = sdscatlen(entry, target_shard, CLUSTER_NAMELEN); + entry = sdscatfmt(entry, " %U", (unsigned long long)target_epoch); + clusterRaftPropose(entry, rc, clusterRaftSetReplicaOfRetryCallback); sdsfree(entry); } @@ -2986,11 +3590,23 @@ static void clusterRaftFailover(int force, int takeover, void *ctx, void (*callb if (force) { /* FORCE/TAKEOVER: propose immediately without coordination. */ + /* Wrap with retry context for stale epoch recovery. 
*/ + proposalRetryCtx *rc = zmalloc(sizeof(*rc)); + rc->client_ctx = ctx; + rc->client_callback = callback; + rc->retries = PROPOSAL_MAX_RETRIES; + memset(rc->node_id, 0, CLUSTER_NAMELEN); + rc->has_primary = 0; + sds entry = sdsnew("FAILOVER "); entry = sdscatlen(entry, myself->name, CLUSTER_NAMELEN); entry = sdscatlen(entry, " ", 1); entry = sdscatlen(entry, primary->name, CLUSTER_NAMELEN); - clusterRaftPropose(entry, ctx, callback); + entry = sdscatlen(entry, " ", 1); + entry = sdscatlen(entry, primary->shard_id, CLUSTER_NAMELEN); + uint64_t epoch = clusterGetShardEpoch(primary->shard_id); + entry = sdscatfmt(entry, " %U", (unsigned long long)epoch); + clusterRaftPropose(entry, rc, clusterRaftFailoverRetryCallback); sdsfree(entry); } else { /* Coordinated failover: ask primary to pause writes, then wait @@ -3085,6 +3701,7 @@ static void clusterRaftResetCluster(int hard) { /* Recreate shards dict. */ dictEmpty(server.cluster->shards, NULL); + dictEmpty(rs->shard_epochs, NULL); /* Forget all nodes except myself. */ dictIterator *di = dictGetSafeIterator(server.cluster->nodes); @@ -3197,8 +3814,8 @@ clusterBusType clusterRaftBus = { .resetStats = clusterRaftResetStats, .appendInfoFields = clusterRaftAppendInfoFields, .getFailureReportsCount = clusterRaftGetFailureReportsCount, - .getNodePingPongEpoch = NULL, - .setNodePingPongEpoch = NULL, + .getNodePingPongEpoch = clusterRaftGetNodePingPongEpoch, + .setNodePingPongEpoch = clusterRaftSetNodePingPongEpoch, .setNodeFailed = NULL, .appendVarsLine = clusterRaftAppendVarsLine, .parseVarsLine = clusterRaftParseVarsLine, diff --git a/tests/support/cluster_util.tcl b/tests/support/cluster_util.tcl index ec1f91fdf0f..00a7e4f63a8 100644 --- a/tests/support/cluster_util.tcl +++ b/tests/support/cluster_util.tcl @@ -185,13 +185,44 @@ proc cluster_allocate_slots {masters replicas} { } } +# Issue CLUSTER REPLICATE with retry on epoch rejection. 
In raft mode, +# concurrent REPLICATE commands for the same shard may race with shard +# epoch bumps, causing transient "proposal rejected" errors. +proc cluster_replicate_with_retry {node_id target_id {max_retries 5}} { + for {set attempt 0} {$attempt < $max_retries} {incr attempt} { + if {[catch {R $node_id cluster replicate $target_id} err]} { + if {$attempt < $max_retries - 1 && [string match "*proposal rejected*" $err]} { + after 100 + continue + } + error $err + } + return + } +} + +# Same as above but takes a client object instead of a node index. +# Returns the result on success. +proc cluster_client_replicate_with_retry {client target_id {max_retries 5}} { + for {set attempt 0} {$attempt < $max_retries} {incr attempt} { + if {[catch {$client cluster replicate $target_id} result]} { + if {$attempt < $max_retries - 1 && [string match "*proposal rejected*" $result]} { + after 100 + continue + } + error $result + } + return $result + } +} + proc default_replica_allocation {masters replicas} { # Setup master/replica relationships set node_count [expr $masters + $replicas] for {set i 0} {$i < $masters} {incr i} { set nodeid [R $i CLUSTER MYID] for {set j [expr $i + $masters]} {$j < $node_count} {incr j $masters} { - R $j CLUSTER REPLICATE $nodeid + cluster_replicate_with_retry $j $nodeid } } } @@ -204,7 +235,7 @@ proc cluster_allocate_replicas {masters replicas} { set master_id [expr {$j % $masters}] set replica_id [cluster_find_available_replica $masters] set master_myself [cluster_get_myself $master_id] - R $replica_id cluster replicate [dict get $master_myself id] + cluster_replicate_with_retry $replica_id [dict get $master_myself id] } } diff --git a/tests/unit/cluster/cluster-raft-proto.tcl b/tests/unit/cluster/cluster-raft-proto.tcl index a33a2c7a7b0..f48cd2aed6a 100644 --- a/tests/unit/cluster/cluster-raft-proto.tcl +++ b/tests/unit/cluster/cluster-raft-proto.tcl @@ -84,6 +84,16 @@ proc raft_listen_and_accept {port_var {timeout 5000} {before_accept {}}} { 
return $::_raft_accepted } +# Connect a fake node to a cluster bus port and complete HELLO handshake. +# Returns the connected fd. +proc raft_connect_fake_node {host cport fake_id fake_addr} { + set fd [raft_connect $host $cport] + raft_send $fd "HELLO $fake_id $fake_addr" + set reply [raft_recv $fd] + assert_match "HI *" $reply + return $fd +} + # Parse an AE message and reply with AE_ACK. # Returns the last log index after applying the entries. proc raft_reply_ae_ack {fd ae_msg repl_offset} { @@ -515,4 +525,186 @@ test "Raft proto: leader sends REPL_OFFSETS after follower offset changes" { } } +# -------------------------------------------------------------------------- +# Shard epoch: stale proposal rejection tests (parameterized). +# These verify that the leader rejects stale proposals BEFORE appending +# to the raft log (commit index unchanged, cluster state unchanged). +# -------------------------------------------------------------------------- + +proc get_cluster_info_field {client field} { + set info [$client CLUSTER INFO] + foreach line [split $info "\n"] { + set line [string trim $line "\r"] + if {[string match "${field}:*" $line]} { + return [lindex [split $line ":"] 1] + } + } + return "" +} + +start_multiple_servers 2 {overrides {cluster-enabled yes cluster-protocol raft cluster-node-timeout 2000 loglevel debug}} { + # Shared setup: form cluster and assign slots. 
+ set r0 [srv 0 client] + set r1 [srv -1 client] + + $r0 CLUSTER MEET [srv -1 host] [srv -1 port] + + wait_for_condition 50 100 { + [get_cluster_info_field $r0 cluster_size] == 2 && + [get_cluster_info_field $r1 cluster_size] == 2 + } else { + fail "Cluster did not form" + } + + $r0 CLUSTER ADDSLOTSRANGE 0 16383 + + wait_for_condition 50 100 { + [get_cluster_info_field $r0 cluster_slots_assigned] == 16384 && + [get_cluster_info_field $r1 cluster_slots_assigned] == 16384 + } else { + fail "Slots not assigned" + } + + set node_id [$r0 CLUSTER MYID] + set node1_id [$r1 CLUSTER MYID] + + # Make r1 a replica of r0. This commits a SET_REPLICA_OF entry + # which bumps the shard epoch from 0 to 1. + $r1 CLUSTER REPLICATE $node_id + + set port [srv 0 port] + set cport [expr {$port + 10000}] + + # Extract shard-id of r0 from CLUSTER SHARDS output. + set node0_shard "" + set shards [$r0 CLUSTER SHARDS] + foreach shard $shards { + foreach node [dict get $shard nodes] { + if {[dict get $node id] eq $node_id} { + set node0_shard [dict get $shard id] + } + } + } + assert {$node0_shard ne ""} + + set fake_id [string repeat "f" 40] + set fake_addr "127.0.0.1:9999@19999,,tls-port=0,shard-id=[string repeat e 40]" + + # Each entry: {entry_type propose_msg state_check} + set stale_proposals [list \ + [list "SLOT_CHANGE" "PROPOSE SLOT_CHANGE $node_id 50 $node1_id 0 0-100" { + assert_equal 16384 [get_cluster_info_field $r0 cluster_slots_assigned] + }] \ + [list "FAILOVER" "PROPOSE FAILOVER $node1_id $node_id $node0_shard 50" { + set nodes [$r0 CLUSTER NODES] + assert_match "*$node_id*myself,master*" $nodes + assert_equal 16384 [get_cluster_info_field $r0 cluster_slots_assigned] + }] \ + ] + + foreach case $stale_proposals { + lassign $case entry_type propose_msg state_check + + test "Raft shard epoch: stale $entry_type is rejected (pre-validation, no log append)" { + # Record commit index before injection. 
+ set commit_before [get_cluster_info_field $r0 cluster_raft_commit_index] + set loglines [count_log_lines 0] + + # Connect and inject stale PROPOSE. + set fd [raft_connect_fake_node 127.0.0.1 $cport $fake_id $fake_addr] + raft_send $fd $propose_msg + + # Expect a REJECT message back from the leader. + set reply [raft_recv $fd 2000] + assert_match "REJECT *" $reply + + # Verify rejection logged. + wait_for_log_messages 0 {"*Leader pre-validation: rejecting*"} $loglines 1000 10 + + # Verify log index unchanged (rejected at pre-validation). + set commit_after [get_cluster_info_field $r0 cluster_raft_commit_index] + assert_equal $commit_before $commit_after + + # Verify cluster state unchanged (stale proposal had no effect). + eval $state_check + + close $fd + } + } + + # -------------------------------------------------------------------------- + # Missing epoch: proposals without the required epoch field are rejected + # at pre-validation (never appended to the log). + # -------------------------------------------------------------------------- + + set missing_epoch_proposals [list \ + [list "SLOT_CHANGE (missing epoch)" "PROPOSE SLOT_CHANGE $node_id 1 $node1_id"] \ + [list "FAILOVER (missing epoch)" "PROPOSE FAILOVER $node1_id $node_id $node0_shard"] \ + [list "SET_REPLICA_OF (missing epoch)" "PROPOSE SET_REPLICA_OF $node1_id [string repeat a 40] 1 $node_id [string repeat b 40]"] \ + [list "NODE_FORGET (missing epoch)" "PROPOSE NODE_FORGET $node1_id"] \ + ] + + foreach case $missing_epoch_proposals { + lassign $case label propose_msg + + test "Raft shard epoch: $label is rejected (pre-validation, no log append)" { + set commit_before [get_cluster_info_field $r0 cluster_raft_commit_index] + set loglines [count_log_lines 0] + + set fd [raft_connect_fake_node 127.0.0.1 $cport $fake_id $fake_addr] + raft_send $fd $propose_msg + + # Expect a REJECT message back from the leader. 
+ set reply [raft_recv $fd 2000] + assert_match "REJECT *" $reply + + # Verify rejection logged. + wait_for_log_messages 0 {"*Leader pre-validation: rejecting*"} $loglines 1000 10 + + # Verify log index unchanged (rejected at pre-validation). + set commit_after [get_cluster_info_field $r0 cluster_raft_commit_index] + assert_equal $commit_before $commit_after + + close $fd + } + } + + # -------------------------------------------------------------------------- + # Wrong shard-id: proposals where the shard-id doesn't match the node's + # actual shard are rejected at pre-validation. + # -------------------------------------------------------------------------- + + set wrong_shard [string repeat "1" 40] + + set wrong_shard_proposals [list \ + [list "FAILOVER" "PROPOSE FAILOVER $node1_id $node_id $wrong_shard 0"] \ + [list "SET_REPLICA_OF" "PROPOSE SET_REPLICA_OF $node1_id $node0_shard 1 $node_id $wrong_shard 0"] \ + ] + + foreach case $wrong_shard_proposals { + lassign $case entry_type propose_msg + + test "Raft shard epoch: $entry_type with wrong shard-id is rejected at pre-validation" { + set commit_before [get_cluster_info_field $r0 cluster_raft_commit_index] + set loglines [count_log_lines 0] + + set fd [raft_connect_fake_node 127.0.0.1 $cport $fake_id $fake_addr] + raft_send $fd $propose_msg + + # Expect a REJECT message back from the leader. + set reply [raft_recv $fd 2000] + assert_match "REJECT *" $reply + + # Verify rejection logged. + wait_for_log_messages 0 {"*Leader pre-validation: rejecting*"} $loglines 1000 10 + + # Verify log index unchanged (rejected at pre-validation, epoch not bumped). 
+ set commit_after [get_cluster_info_field $r0 cluster_raft_commit_index] + assert_equal $commit_before $commit_after + + close $fd + } + } +} + } ;# tags diff --git a/tests/unit/cluster/faster-failover.tcl b/tests/unit/cluster/faster-failover.tcl index 6ecfbf73c99..cb06b8125bc 100644 --- a/tests/unit/cluster/faster-failover.tcl +++ b/tests/unit/cluster/faster-failover.tcl @@ -40,8 +40,8 @@ start_cluster 3 1 {overrides {cluster-ping-interval 1000 cluster-node-timeout 50 # R3 and R4 are both replicas of R0. proc single_primary_replica_allocation {masters replicas} { set master0_id [R 0 CLUSTER MYID] - R 3 CLUSTER REPLICATE $master0_id - R 4 CLUSTER REPLICATE $master0_id + cluster_replicate_with_retry 3 $master0_id + cluster_replicate_with_retry 4 $master0_id } # Test 1: Single primary failure. diff --git a/tests/unit/cluster/pubsubshard-slot-migration.tcl b/tests/unit/cluster/pubsubshard-slot-migration.tcl index c5a324f0949..3d49ca76ed5 100644 --- a/tests/unit/cluster/pubsubshard-slot-migration.tcl +++ b/tests/unit/cluster/pubsubshard-slot-migration.tcl @@ -156,7 +156,7 @@ test "Move a replica to another primary, verify client receives sunsubscribe on $nodefrom(link) spublish $channelname hello assert_equal {smessage mychannel2 hello} [$subscribeclient read] - assert_equal {OK} [$replica_client cluster replicate $nodeto(id)] + assert_equal {OK} [cluster_client_replicate_with_retry $replica_client $nodeto(id)] set msg [$subscribeclient read] assert {"sunsubscribe" eq [lindex $msg 0]}