diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index a9dc4926f51..c5fc876201c 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -67,6 +67,9 @@ void clusterReadHandler(connection *conn); void clusterSendPing(clusterLink *link, int type); void clusterSendFail(char *nodename); void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request); +void clusterProcessFailoverAuthNack(clusterNode *sender, clusterMsg *request); +void clusterSendFailoverNack(clusterNode *node, uint8_t reason); +static const char *clusterNackReasonString(uint8_t reason); void clusterUpdateState(void); list *clusterGetNodesInMyShard(clusterNode *node); int clusterNodeAddReplica(clusterNode *primary, clusterNode *replica); @@ -1253,7 +1256,8 @@ void clusterUpdateMyselfFlags(void) { myself->flags |= CLUSTER_NODE_EXTENSIONS_SUPPORTED | CLUSTER_NODE_LIGHT_HDR_PUBLISH_SUPPORTED | CLUSTER_NODE_LIGHT_HDR_MODULE_SUPPORTED | - CLUSTER_NODE_MULTI_MEET_SUPPORTED; + CLUSTER_NODE_MULTI_MEET_SUPPORTED | + CLUSTER_NODE_FAILOVER_AUTH_NACK_SUPPORTED; if (myself->flags != oldflags) { clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE); @@ -1447,6 +1451,7 @@ void clusterInit(void) { server.cluster->fail_reason = CLUSTER_FAIL_NONE; server.cluster->safe_to_join = 0; server.cluster->size = 0; + server.cluster->size_fail = 0; server.cluster->todo_before_sleep = 0; server.cluster->nodes = dictCreate(&clusterNodesDictType); server.cluster->shards = dictCreate(&clusterSdsToListType); @@ -1455,6 +1460,7 @@ void clusterInit(void) { server.cluster->importing_slots_from = dictCreate(&clusterSlotDictType); server.cluster->failover_auth_time = 0; server.cluster->failover_auth_count = 0; + server.cluster->failover_auth_nack_count = 0; server.cluster->failover_auth_rank = 0; server.cluster->failover_auth_sent = 0; server.cluster->failover_failed_primary_rank = 0; @@ -3772,6 +3778,9 @@ int clusterIsValidPacket(clusterLink *link) { } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST || type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK || type == CLUSTERMSG_TYPE_MFSTART) { explen = sizeof(clusterMsg) - sizeof(union clusterMsgData); + } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_NACK) { + explen = sizeof(clusterMsg) - sizeof(union clusterMsgData); + explen += sizeof(clusterMsgDataFailoverNack); } else if (type == CLUSTERMSG_TYPE_UPDATE) { explen = sizeof(clusterMsg) - sizeof(union clusterMsgData); explen += sizeof(clusterMsgDataUpdate); @@ -3902,6 +3911,13 @@ int clusterProcessPacket(clusterLink *link) { } else { sender->flags &= ~CLUSTER_NODE_MY_PRIMARY_FAIL; } + + /* Check if the node understands FAILOVER_AUTH_NACK packets. */ + if (flags & CLUSTER_NODE_FAILOVER_AUTH_NACK_SUPPORTED) { + sender->flags |= CLUSTER_NODE_FAILOVER_AUTH_NACK_SUPPORTED; + } else { + sender->flags &= ~CLUSTER_NODE_FAILOVER_AUTH_NACK_SUPPORTED; + } } /* Update the last time we saw any data from this node. We @@ -4415,10 +4431,25 @@ int clusterProcessPacket(clusterLink *link) { * equal to epoch where this node started the election. */ if (clusterNodeIsVotingPrimary(sender) && sender_claimed_current_epoch >= server.cluster->failover_auth_epoch) { server.cluster->failover_auth_count++; + serverLog(LL_NOTICE, "Failover auth ACK from %.40s (%s) for epoch %llu (ACKs %d, quorum %d)", + sender->name, humanNodename(sender), (unsigned long long)server.cluster->failover_auth_epoch, + server.cluster->failover_auth_count, (server.cluster->size / 2) + 1); /* Maybe we reached a quorum here, set a flag to make sure * we check ASAP. */ clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER); } + } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_NACK) { + if (!sender) return 1; /* We don't know that node. */ + + /* We consider this nack only if the sender is a primary serving + * a non-zero number of slots, and its currentEpoch is greater or + * equal to epoch where this node started the election. */ + if (server.cluster->failover_auth_time && + server.cluster->failover_auth_sent && + clusterNodeIsVotingPrimary(sender) && + sender_claimed_current_epoch >= server.cluster->failover_auth_epoch) { + clusterProcessFailoverAuthNack(sender, msg); + } } else if (type == CLUSTERMSG_TYPE_MFSTART) { /* This message is acceptable only if I'm a primary and the sender * is one of my replicas. */ @@ -5274,6 +5305,34 @@ void clusterSendFailoverAuth(clusterNode *node) { clusterMsgSendBlockDecrRefCount(msgblock); } +static const char *clusterNackReasonString(uint8_t reason) { + switch (reason) { + case CLUSTERMSG_FAILOVER_AUTH_NACK_REASON_NOT_SAFE: return "not-safe"; + case CLUSTERMSG_FAILOVER_AUTH_NACK_REASON_REQ_EPOCH_OLD: return "req-epoch-old"; + case CLUSTERMSG_FAILOVER_AUTH_NACK_REASON_ALREADY_VOTED: return "already-voted"; + case CLUSTERMSG_FAILOVER_AUTH_NACK_REASON_REQ_IS_PRIMARY: return "req-is-primary"; + case CLUSTERMSG_FAILOVER_AUTH_NACK_REASON_NO_PRIMARY: return "no-primary"; + case CLUSTERMSG_FAILOVER_AUTH_NACK_REASON_PRIMARY_UP: return "primary-up"; + case CLUSTERMSG_FAILOVER_AUTH_NACK_REASON_STALE_CONFIG: return "stale-config"; + default: return "unknown"; + } +} + +/* Send a FAILOVER_AUTH_NACK message to the specified node. */ +void clusterSendFailoverNack(clusterNode *node, uint8_t reason) { + if (!node->link) return; + if (!nodeSupportsFailoverAuthNack(node)) return; + + uint32_t msglen = sizeof(clusterMsg) - sizeof(union clusterMsgData) + sizeof(clusterMsgDataFailoverNack); + clusterMsgSendBlock *msgblock = createClusterMsgSendBlock(CLUSTERMSG_TYPE_FAILOVER_AUTH_NACK, msglen); + + clusterMsg *hdr = getMessageFromSendBlock(msgblock); + memcpy(&hdr->data.failover_nack.nack.reason, &reason, sizeof(reason)); + + clusterSendMessage(node->link, msgblock); + clusterMsgSendBlockDecrRefCount(msgblock); +} + /* Send a MFSTART message to the specified node. */ void clusterSendMFStart(clusterNode *node) { if (!node->link) return; @@ -5304,6 +5363,7 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) { if (!server.cluster->safe_to_join) { serverLog(LL_WARNING, "Failover auth denied to %.40s (%s): it is not safe to vote in this moment)", node->name, humanNodename(node)); + clusterSendFailoverNack(node, CLUSTERMSG_FAILOVER_AUTH_NACK_REASON_NOT_SAFE); return; } @@ -5315,6 +5375,7 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) { serverLog(LL_WARNING, "Failover auth denied to %.40s (%s): reqEpoch (%llu) < curEpoch(%llu)", node->name, humanNodename(node), (unsigned long long)requestCurrentEpoch, (unsigned long long)server.cluster->currentEpoch); + clusterSendFailoverNack(node, CLUSTERMSG_FAILOVER_AUTH_NACK_REASON_REQ_EPOCH_OLD); return; } @@ -5322,6 +5383,7 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) { if (server.cluster->lastVoteEpoch == server.cluster->currentEpoch) { serverLog(LL_WARNING, "Failover auth denied to %.40s (%s): already voted for epoch %llu", node->name, humanNodename(node), (unsigned long long)server.cluster->currentEpoch); + clusterSendFailoverNack(node, CLUSTERMSG_FAILOVER_AUTH_NACK_REASON_ALREADY_VOTED); return; } @@ -5332,12 +5394,15 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) { if (clusterNodeIsPrimary(node)) { serverLog(LL_WARNING, "Failover auth denied to %.40s (%s) for epoch %llu: it is a primary node", node->name, humanNodename(node), (unsigned long long)requestCurrentEpoch); + clusterSendFailoverNack(node, CLUSTERMSG_FAILOVER_AUTH_NACK_REASON_REQ_IS_PRIMARY); } else if (primary == NULL) { serverLog(LL_WARNING, "Failover auth denied to %.40s (%s) for epoch %llu: I don't know its primary", node->name, humanNodename(node), (unsigned long long)requestCurrentEpoch); + clusterSendFailoverNack(node, CLUSTERMSG_FAILOVER_AUTH_NACK_REASON_NO_PRIMARY); } else if (!nodeFailed(primary)) { serverLog(LL_WARNING, "Failover auth denied to %.40s (%s) for epoch %llu: its primary is up", node->name, humanNodename(node), (unsigned long long)requestCurrentEpoch); + clusterSendFailoverNack(node, CLUSTERMSG_FAILOVER_AUTH_NACK_REASON_PRIMARY_UP); } return; } @@ -5372,7 +5437,13 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) { "Node %.40s (%s) has old slots configuration, sending " "an UPDATE message about %.40s (%s)", node->name, humanNodename(node), slot_owner->name, humanNodename(slot_owner)); + /* Send UPDATE first so the replica can fix its slot config; the NACK + * that follows then triggers fast-fail, letting the replica retry + * with the freshly-updated configEpoch right away instead of waiting + * for auth_timeout. TCP ordering on the same link guarantees the + * UPDATE arrives before the NACK. */ clusterSendUpdate(node->link, slot_owner); + clusterSendFailoverNack(node, CLUSTERMSG_FAILOVER_AUTH_NACK_REASON_STALE_CONFIG); return; } } @@ -5385,6 +5456,34 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) { (unsigned long long)server.cluster->currentEpoch); } +/* Handle a FAILOVER_AUTH_NACK from a voter. */ +void clusterProcessFailoverAuthNack(clusterNode *sender, clusterMsg *request) { + server.cluster->failover_auth_nack_count++; + + /* A voter that NACKed us in this epoch will not change its mind, so the + * upper bound on the votes we can still collect is the voters that have + * not NACKed, minus FAIL voters that will never reply (they count towards + * size but neither ACK nor NACK). Fast-fail once that bound drops below + * the quorum we need to win.. */ + int needed_quorum = (server.cluster->size / 2) + 1; + int max_possible_acks = server.cluster->size - server.cluster->size_fail - server.cluster->failover_auth_nack_count; + serverLog(LL_NOTICE, "Failover auth NACK [%s] from %.40s (%s) for epoch %llu (NACKs %d, quorum %d)", + clusterNackReasonString(request->data.failover_nack.nack.reason), sender->name, + humanNodename(sender), (unsigned long long)server.cluster->failover_auth_epoch, + server.cluster->failover_auth_nack_count, needed_quorum); + if (max_possible_acks < needed_quorum) { + serverLog(LL_NOTICE, + "Failover election for epoch %llu cannot reach quorum %d (NACKs %d, dead voters %d). " + "Resetting the election since we cannot win an election without quorum.", + (unsigned long long)server.cluster->failover_auth_epoch, needed_quorum, + server.cluster->failover_auth_nack_count, server.cluster->size_fail); + server.cluster->failover_auth_time = 0; + /* Maybe we could start a new election, set a flag here to make sure + * we check as soon as possible, instead of waiting for a cron. */ + clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER); + } +} + /* This function returns the "rank" of this instance, a replica, in the context * of its primary-replicas ring. The rank of the replica is given by the number of * other replicas for the same primary that have a better replication offset @@ -5628,6 +5727,7 @@ void clusterHandleReplicaFailover(void) { /* Use a failover delay relative to node timeout: 500 for the default node * timeout of 15000, less for lower node timeout, but not more. */ long long delay = min(server.cluster_node_timeout / 30, 500); + if (server.debug_cluster_failover_delay >= 0) delay = server.debug_cluster_failover_delay; /* Pre conditions to run the function, that must be met both in case * of an automatic or manual failover: @@ -5676,7 +5776,9 @@ void clusterHandleReplicaFailover(void) { server.cluster->failover_auth_time = now + delay + /* Fixed delay to let FAIL msg propagate. */ (delay ? random() % delay : 0); /* Random delay between 0 and the fixed delay. */ + if (server.debug_cluster_failover_delay >= 0) server.cluster->failover_auth_time = now + delay; server.cluster->failover_auth_count = 0; + server.cluster->failover_auth_nack_count = 0; server.cluster->failover_auth_sent = 0; server.cluster->failover_auth_rank = clusterGetReplicaRank(); /* We add another delay that is proportional to the replica rank. @@ -5786,7 +5888,17 @@ void clusterHandleReplicaFailover(void) { /* Ask for votes if needed. */ if (server.cluster->failover_auth_sent == 0) { - server.cluster->currentEpoch++; + if (server.debug_cluster_failover_epoch >= 0) { + /* Testing only: force this election to run in a specific epoch so + * that several replicas can be made to contend in the very same + * epoch, deterministically reproducing a split vote. Consumed + * once; subsequent retries fall back to the normal currentEpoch++ + * so the replicas can eventually win in distinct epochs. */ + server.cluster->currentEpoch = server.debug_cluster_failover_epoch; + server.debug_cluster_failover_epoch = -1; + } else { + server.cluster->currentEpoch++; + } server.cluster->failover_auth_epoch = server.cluster->currentEpoch; serverLog(LL_NOTICE, "Starting a failover election for epoch %llu, node config epoch is %llu", (unsigned long long)server.cluster->currentEpoch, (unsigned long long)nodeEpoch(myself)); @@ -6653,12 +6765,14 @@ void clusterUpdateState(void) { dictEntry *de; server.cluster->size = 0; + server.cluster->size_fail = 0; di = dictGetSafeIterator(server.cluster->nodes); while ((de = dictNext(di)) != NULL) { clusterNode *node = dictGetVal(de); if (clusterNodeIsVotingPrimary(node)) { server.cluster->size++; + if (node->flags & CLUSTER_NODE_FAIL) server.cluster->size_fail++; if ((node->flags & (CLUSTER_NODE_FAIL | CLUSTER_NODE_PFAIL)) == 0) reachable_primaries++; } } @@ -7141,6 +7255,7 @@ const char *clusterGetMessageTypeString(int type) { case CLUSTERMSG_TYPE_PUBLISHSHARD: return "publishshard"; case CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST: return "auth-req"; case CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK: return "auth-ack"; + case CLUSTERMSG_TYPE_FAILOVER_AUTH_NACK: return "auth-nack"; case CLUSTERMSG_TYPE_UPDATE: return "update"; case CLUSTERMSG_TYPE_MFSTART: return "mfstart"; case CLUSTERMSG_TYPE_MODULE: return "module"; diff --git a/src/cluster_legacy.h b/src/cluster_legacy.h index 7703f071de0..ccbb3a48adc 100644 --- a/src/cluster_legacy.h +++ b/src/cluster_legacy.h @@ -69,6 +69,7 @@ typedef struct clusterLink { * myself will gossip this flag to other replica in the \ * shard so that the replicas can make a better ranking \ * decisions to help with the failover. */ +#define CLUSTER_NODE_FAILOVER_AUTH_NACK_SUPPORTED (1 << 14) /* This node understands FAILOVER_AUTH_NACK messages. */ #define CLUSTER_NODE_NULL_NAME \ "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000" \ @@ -86,6 +87,7 @@ typedef struct clusterLink { #define nodeSupportsMultiMeet(n) ((n)->flags & CLUSTER_NODE_MULTI_MEET_SUPPORTED) #define nodeInNormalState(n) (!((n)->flags & (CLUSTER_NODE_HANDSHAKE | CLUSTER_NODE_MEET | CLUSTER_NODE_PFAIL | CLUSTER_NODE_FAIL))) #define nodePrimaryIsFail(n) ((n)->flags & CLUSTER_NODE_MY_PRIMARY_FAIL) +#define nodeSupportsFailoverAuthNack(n) ((n)->flags & CLUSTER_NODE_FAILOVER_AUTH_NACK_SUPPORTED) /* Cluster messages header */ @@ -106,7 +108,8 @@ typedef struct clusterLink { #define CLUSTERMSG_TYPE_MFSTART 8 /* Pause clients for manual failover */ #define CLUSTERMSG_TYPE_MODULE 9 /* Module cluster API message. */ #define CLUSTERMSG_TYPE_PUBLISHSHARD 10 /* Pub/Sub Publish shard propagation */ -#define CLUSTERMSG_TYPE_COUNT 11 /* Total number of message types. */ +#define CLUSTERMSG_TYPE_FAILOVER_AUTH_NACK 11 /* No, you don't have my vote. */ +#define CLUSTERMSG_TYPE_COUNT 12 /* Total number of message types. */ #define CLUSTERMSG_LIGHT 0x8000 /* Modifier bit for message types that support light header */ @@ -141,6 +144,10 @@ typedef struct { char nodename[CLUSTER_NAMELEN]; } clusterMsgDataFail; +typedef struct { + uint8_t reason; +} clusterMsgDataFailoverNack; + typedef struct { uint32_t channel_len; uint32_t message_len; @@ -264,6 +271,11 @@ union clusterMsgData { struct { clusterMsgModule msg; } module; + + /* FAILOVER_AUTH_NACK */ + struct { + clusterMsgDataFailoverNack nack; + } failover_nack; }; #define CLUSTER_PROTO_VER 1 /* Cluster bus protocol version. */ @@ -336,6 +348,15 @@ static_assert(offsetof(clusterMsg, data) == 2256, "unexpected field offset"); primary is up. */ #define CLUSTERMSG_FLAG0_EXT_DATA (1 << 2) /* Message contains extension data */ +/* Reason values carried in clusterMsgDataFailoverNack.reason. */ +#define CLUSTERMSG_FAILOVER_AUTH_NACK_REASON_NOT_SAFE 1 /* Voter is not safe to vote yet. */ +#define CLUSTERMSG_FAILOVER_AUTH_NACK_REASON_REQ_EPOCH_OLD 2 /* Request epoch < voter's currentEpoch. */ +#define CLUSTERMSG_FAILOVER_AUTH_NACK_REASON_ALREADY_VOTED 3 /* Voter already voted in this epoch. */ +#define CLUSTERMSG_FAILOVER_AUTH_NACK_REASON_REQ_IS_PRIMARY 4 /* Requester is a primary itself. */ +#define CLUSTERMSG_FAILOVER_AUTH_NACK_REASON_NO_PRIMARY 5 /* Voter doesn't know it's primary. */ +#define CLUSTERMSG_FAILOVER_AUTH_NACK_REASON_PRIMARY_UP 6 /* Voter still sees it's primary up. */ +#define CLUSTERMSG_FAILOVER_AUTH_NACK_REASON_STALE_CONFIG 7 /* Replica's slot config is stale. */ + typedef struct { char sig[4]; /* Signature "RCmb" (Cluster message bus). */ uint32_t totlen; /* Total length of this message */ @@ -436,6 +457,7 @@ struct clusterState { int fail_reason; /* Why the cluster state changes to fail. */ int safe_to_join; /* Can the restarted node safely join the cluster? */ int size; /* Num of primary nodes with at least one slot */ + int size_fail; /* Num of voting primaries currently in FAIL state (subset of size). */ dict *nodes; /* Hash table of name -> clusterNode structures */ dict *shards; /* Hash table of shard_id -> list (of nodes) structures */ dict *nodes_black_list; /* Nodes we don't re-add for a few seconds. */ @@ -448,6 +470,7 @@ struct clusterState { /* The following fields are used to take the replica state on elections. */ mstime_t failover_auth_time; /* Time of previous or next election. */ int failover_auth_count; /* Number of votes received so far. */ + int failover_auth_nack_count; /* Number of rejected votes received so far. */ int failover_auth_sent; /* True if we already asked for votes. */ int failover_auth_rank; /* This replica rank for current auth request. */ int failover_failed_primary_rank; /* The rank of this instance in the context of all failed primary list. */ diff --git a/src/debug.c b/src/debug.c index 9b32dc6a408..1c6afad82f2 100644 --- a/src/debug.c +++ b/src/debug.c @@ -448,6 +448,14 @@ void debugCommand(client *c) { " Disable sending cluster ping to a random node every second.", "DISABLE-CLUSTER-RECONNECTION <0|1>", " Disable cluster reconnection of cluster nodes.", + "CLUSTER-FAILOVER-DELAY ", + " Override the failover delay. -1 is the default value, meaning don't", + " override, values >= 0 will be used for the failover delay.", + "CLUSTER-FAILOVER-EPOCH ", + " Force the next failover election started by this replica to run in", + " the given epoch instead of currentEpoch+1. -1 (default) disables the", + " override. It is consumed once: subsequent retries use currentEpoch+1", + " again. Useful to make several replicas contend in the same epoch.", "OOM", " Crash the server simulating an out-of-memory error.", "PANIC", @@ -638,6 +646,24 @@ void debugCommand(client *c) { } else if (!strcasecmp(objectGetVal(c->argv[1]), "disable-cluster-reconnection") && c->argc == 3) { server.debug_cluster_disable_reconnection = atoi(objectGetVal(c->argv[2])); addReply(c, shared.ok); + } else if (!strcasecmp(objectGetVal(c->argv[1]), "cluster-failover-delay") && c->argc == 3) { + int delay_ms; + if (getIntFromObjectOrReply(c, c->argv[2], &delay_ms, NULL) != C_OK) return; + if (delay_ms < -1) { + addReplyError(c, "delay-ms must be -1 (default) or a non-negative value in ms"); + return; + } + server.debug_cluster_failover_delay = delay_ms; + addReply(c, shared.ok); + } else if (!strcasecmp(objectGetVal(c->argv[1]), "cluster-failover-epoch") && c->argc == 3) { + long long epoch; + if (getLongLongFromObjectOrReply(c, c->argv[2], &epoch, NULL) != C_OK) return; + if (epoch < -1) { + addReplyError(c, "epoch must be -1 (default) or a non-negative value"); + return; + } + server.debug_cluster_failover_epoch = epoch; + addReply(c, shared.ok); } else if (!strcasecmp(objectGetVal(c->argv[1]), "slotmigration")) { if (!strcasecmp(objectGetVal(c->argv[2]), "prevent-pause")) { server.debug_slot_migration_prevent_pause = atoi(objectGetVal(c->argv[3])); diff --git a/src/server.c b/src/server.c index 92f9c9754fa..e16b235a42e 100644 --- a/src/server.c +++ b/src/server.c @@ -2963,6 +2963,8 @@ void initServer(void) { server.cluster_drop_packet_filter = -1; server.debug_cluster_disable_random_ping = 0; server.debug_cluster_disable_reconnection = 0; + server.debug_cluster_failover_delay = -1; + server.debug_cluster_failover_epoch = -1; server.reply_buffer_peak_reset_time = REPLY_BUFFER_DEFAULT_PEAK_RESET_TIME; server.reply_buffer_resizing_enabled = 1; server.client_mem_usage_buckets = NULL; diff --git a/src/server.h b/src/server.h index 8702ed222e7..a323b011154 100644 --- a/src/server.h +++ b/src/server.h @@ -2317,6 +2317,11 @@ struct valkeyServer { /* Debug config to expose intermediary slot migration states. */ uint32_t debug_slot_migration_prevent_pause : 1; uint32_t debug_slot_migration_prevent_failover : 1; + /* Debug config to override the failover delay (in ms). */ + int debug_cluster_failover_delay; + /* Debug config to force the next failover election to run in a specific + * epoch (testing only). -1 means don't override; consumed once. */ + long long debug_cluster_failover_epoch; sds cached_cluster_slot_info[CACHE_CONN_TYPE_MAX]; /* Index in array is a bitwise or of CACHE_CONN_TYPE_* */ /* Scripting */ mstime_t busy_reply_threshold; /* Script / module timeout in milliseconds */ diff --git a/tests/support/cluster_util.tcl b/tests/support/cluster_util.tcl index ec1f91fdf0f..f4beb406549 100644 --- a/tests/support/cluster_util.tcl +++ b/tests/support/cluster_util.tcl @@ -292,6 +292,20 @@ proc cluster_has_flag {node flag} { expr {[lsearch -exact [dict get $node flags] $flag] != -1} } +# Returns 1 only when every server instance in `srv_idxs` sees every +# node id in `node_ids` carrying `flag` in its CLUSTER NODES output. +proc cluster_all_see_flag {srv_idxs node_ids flag} { + foreach idx $srv_idxs { + foreach id $node_ids { + set node [cluster_get_node_by_id $idx $id] + if {![cluster_has_flag $node $flag]} { + return 0 + } + } + } + return 1 +} + # Returns the parsed "myself" node entry as a dictionary. proc cluster_get_myself id { set nodes [get_cluster_nodes $id] diff --git a/tests/unit/cluster/failover2.tcl b/tests/unit/cluster/failover2.tcl index 905299ac110..72e5aa05bc0 100644 --- a/tests/unit/cluster/failover2.tcl +++ b/tests/unit/cluster/failover2.tcl @@ -64,16 +64,51 @@ start_cluster 3 4 {tags {external:skip cluster} overrides {cluster-ping-interval } } ;# start_cluster -start_cluster 7 3 {tags {external:skip cluster} overrides {cluster-ping-interval 1000 cluster-node-timeout 15000}} { - test "Primaries will not time out then they are elected in the same epoch" { +# Needs to run in the body of +# start_cluster 7 3 {tags {external:skip cluster} overrides {cluster-ping-interval 1000 cluster-node-timeout 15000}} +proc test_same_epoch {delay} { + test "Primaries will not time out then they are elected in the same epoch - delay $delay" { # Since we have the delay time, so these node may not initiate the # election at the same time (same epoch). But if they do, we make # sure there is no failover timeout. + R 7 DEBUG CLUSTER-FAILOVER-DELAY $delay + R 8 DEBUG CLUSTER-FAILOVER-DELAY $delay + R 9 DEBUG CLUSTER-FAILOVER-DELAY $delay # Killing there primary nodes. - pause_process [srv 0 pid] - pause_process [srv -1 pid] - pause_process [srv -2 pid] + set primary_ids [list [R 0 cluster myid] [R 1 cluster myid] [R 2 cluster myid]] + exec kill -SIGSTOP [srv 0 pid] [srv -1 pid] [srv -2 pid] + + # Wait until every voter (idx 3..6) sees all three paused primaries + # as fail, so the upcoming election is granted on the first round. + # Otherwise voter might reply with NACK primary-up. + wait_for_condition 1000 50 { + [cluster_all_see_flag {3 4 5 6} $primary_ids fail] + } else { + fail "Voters did not mark all paused primaries as fail" + } + + # Force the three replicas to start their election in the very same + # epoch, so the same-epoch split vote is reproduced deterministically + # rather than depending on the timing of the (possibly zero) delay. + if {$delay == 0} { + set epoch [expr [CI 3 cluster_current_epoch] + 1] + R 7 DEBUG CLUSTER-FAILOVER-EPOCH $epoch + R 8 DEBUG CLUSTER-FAILOVER-EPOCH $epoch + R 9 DEBUG CLUSTER-FAILOVER-EPOCH $epoch + } + + # Now let the replicas proceed with the election. + R 7 CONFIG SET cluster-replica-no-failover no + R 8 CONFIG SET cluster-replica-no-failover no + R 9 CONFIG SET cluster-replica-no-failover no + + # All three must have contended in the very same (forced) epoch. + if {$delay == 0} { + wait_for_log_messages -7 [list "*Starting a failover election for epoch $epoch*"] 0 1000 50 + wait_for_log_messages -8 [list "*Starting a failover election for epoch $epoch*"] 0 1000 50 + wait_for_log_messages -9 [list "*Starting a failover election for epoch $epoch*"] 0 1000 50 + } # Wait for the failover wait_for_condition 1000 50 { @@ -99,6 +134,14 @@ start_cluster 7 3 {tags {external:skip cluster} overrides {cluster-ping-interval resume_process [srv -1 pid] resume_process [srv -2 pid] } +} + +start_cluster 7 3 {tags {external:skip cluster} overrides {cluster-ping-interval 1000 cluster-replica-no-failover yes}} { + test_same_epoch 500 +} ;# start_cluster + +start_cluster 7 3 {tags {external:skip cluster} overrides {cluster-ping-interval 1000 cluster-replica-no-failover yes}} { + test_same_epoch 0 } ;# start_cluster run_solo {cluster} { @@ -134,12 +177,26 @@ run_solo {cluster} { } ;# start_cluster } ;# run_solo +# Setup: R3 is the only replica of R0. While R3 is network-isolated, +# we bump R0's configEpoch and let R1/R2 learn the new value through +# gossip. R3's view is therefore stale. We then pause R0 and let R3 +# attempt failover: the voters reject it with STALE_CONFIG, send back +# both an UPDATE (with R0's fresh slot config) and a NACK, and R3 must +# eventually win a second election with the refreshed configEpoch. +# +# `type` : "automatic" or "manual" failover. +# `drop_nack` : 1 -> drop FAILOVER_AUTH_NACK on R3 to exercise the +# legacy timeout-based retry. +# 0 -> let NACKs through and exercise the fast-fail +# path that retries without waiting for the timeout. +# # Needs to run in the body of # start_cluster 3 1 {tags {external:skip cluster} overrides {cluster-replica-validity-factor 0}} -proc test_replica_config_epoch_failover {type} { - test "Replica can update the config epoch when trigger the failover - $type" { +proc test_replica_config_epoch_failover {type drop_nack} { + test "Replica can update the config epoch when trigger the failover - $type - drop_nack $drop_nack" { set CLUSTER_PACKET_TYPE_NONE -1 set CLUSTER_PACKET_TYPE_ALL -2 + set CLUSTER_PACKET_TYPE_FAILOVER_AUTH_NACK 11 if {$type == "automatic"} { R 3 CONFIG SET cluster-replica-no-failover no @@ -167,24 +224,61 @@ proc test_replica_config_epoch_failover {type} { # Make sure that replica do not update config epoch. assert_not_equal $R0_config_epoch [dict get [cluster_get_node_by_id 3 $R0_nodeid] config_epoch] - # Pause the R 0 and wait for the cluster to be down. + # Pause R0 and resume R3's debug. + # drop_nack=1 keeps NACKs filtered so the legacy timeout path is exercised. + # drop_nack=0 lets NACKs through for the fast-fail path. pause_process [srv 0 pid] - R 3 DEBUG DROP-CLUSTER-PACKET-FILTER $CLUSTER_PACKET_TYPE_NONE + if {$drop_nack} { + R 3 DEBUG DROP-CLUSTER-PACKET-FILTER $CLUSTER_PACKET_TYPE_FAILOVER_AUTH_NACK + } else { + R 3 DEBUG DROP-CLUSTER-PACKET-FILTER $CLUSTER_PACKET_TYPE_NONE + } R 3 DEBUG CLOSE-CLUSTER-LINK-ON-PACKET-DROP 0 + + # Wait for R3 to reconnect to both voters before triggering anything + # that depends on bidirectional traffic, otherwise an immediate failover + # request can race the link rebuild and the NACK reply may be lost. + set R1_nodeid [R 1 cluster myid] + set R2_nodeid [R 2 cluster myid] + set R3_nodeid [R 3 cluster myid] wait_for_condition 1000 50 { - [CI 1 cluster_state] == "fail" && - [CI 2 cluster_state] == "fail" && - [CI 3 cluster_state] == "fail" + [dict get [cluster_get_node_by_id 1 $R3_nodeid] linkstate] eq "connected" && + [dict get [cluster_get_node_by_id 3 $R1_nodeid] linkstate] eq "connected" && + [dict get [cluster_get_node_by_id 2 $R3_nodeid] linkstate] eq "connected" && + [dict get [cluster_get_node_by_id 3 $R2_nodeid] linkstate] eq "connected" } else { - fail "Cluster does not fail" + fail "R3 did not reconnect its bus links to the voters" } - # Make sure both the automatic and the manual failover will fail in the first time. - if {$type == "automatic"} { - wait_for_log_messages -3 {"*Failover attempt expired*"} 0 1200 50 - } elseif {$type == "manual"} { + if {$drop_nack} { + wait_for_condition 1000 50 { + [CI 1 cluster_state] == "fail" && + [CI 2 cluster_state] == "fail" && + [CI 3 cluster_state] == "fail" + } else { + fail "Cluster does not fail" + } + } + + if {$type == "manual"} { R 3 cluster failover force - wait_for_log_messages -3 {"*Manual failover timed out*"} 0 1200 50 + } + + if {$drop_nack} { + # Make sure both the automatic and the manual failover will fail in the first time. + if {$type == "automatic"} { + wait_for_log_messages -3 {"*Failover attempt expired*"} 0 1200 50 + } elseif {$type == "manual"} { + wait_for_log_messages -3 {"*Manual failover timed out*"} 0 1200 50 + } + } else { + # Fast-fail path: NACK accounting trips the quorum check + # and "attempt expired" / "timed out" must never appear. + wait_for_log_messages -3 {"*cannot reach quorum*"} 0 1200 50 + verify_no_log_message -3 "*Failover attempt expired*" 0 + if {$type == "manual"} { + verify_no_log_message -3 "*Manual failover timed out*" 0 + } } # Make sure the primaries prints the relevant logs. @@ -200,7 +294,7 @@ proc test_replica_config_epoch_failover {type} { fail "The replica does not update the config epoch" } - if {$type == "manual"} { + if {$drop_nack && $type == "manual"} { # The second manual failure will succeed because the config epoch # has already propagated. R 3 cluster failover force @@ -228,9 +322,17 @@ proc test_replica_config_epoch_failover {type} { } start_cluster 3 1 {tags {external:skip cluster} overrides {cluster-replica-validity-factor 0}} { - test_replica_config_epoch_failover "automatic" + test_replica_config_epoch_failover "automatic" 1 +} + +start_cluster 3 1 {tags {external:skip cluster} overrides {cluster-replica-validity-factor 0}} { + test_replica_config_epoch_failover "manual" 1 +} + +start_cluster 3 1 {tags {external:skip cluster} overrides {cluster-replica-validity-factor 0}} { + test_replica_config_epoch_failover "automatic" 0 } start_cluster 3 1 {tags {external:skip cluster} overrides {cluster-replica-validity-factor 0}} { - test_replica_config_epoch_failover "manual" + test_replica_config_epoch_failover "manual" 0 }