Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 117 additions & 2 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ void clusterReadHandler(connection *conn);
void clusterSendPing(clusterLink *link, int type);
void clusterSendFail(char *nodename);
void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request);
/* Account for a FAILOVER_AUTH_NACK received from 'sender': bumps the NACK
 * counter of the current election and may reset the election when quorum
 * can no longer be reached. */
void clusterProcessFailoverAuthNack(clusterNode *sender, clusterMsg *request);
/* Send a FAILOVER_AUTH_NACK carrying 'reason' to 'node'. Does nothing if the
 * node has no link or does not support NACK packets. */
void clusterSendFailoverNack(clusterNode *node, uint8_t reason);
/* Return a short label for a NACK reason code ("unknown" if unrecognized). */
static const char *clusterNackReasonString(uint8_t reason);
void clusterUpdateState(void);
list *clusterGetNodesInMyShard(clusterNode *node);
int clusterNodeAddReplica(clusterNode *primary, clusterNode *replica);
Expand Down Expand Up @@ -1253,7 +1256,8 @@ void clusterUpdateMyselfFlags(void) {
myself->flags |= CLUSTER_NODE_EXTENSIONS_SUPPORTED |
CLUSTER_NODE_LIGHT_HDR_PUBLISH_SUPPORTED |
CLUSTER_NODE_LIGHT_HDR_MODULE_SUPPORTED |
CLUSTER_NODE_MULTI_MEET_SUPPORTED;
CLUSTER_NODE_MULTI_MEET_SUPPORTED |
CLUSTER_NODE_FAILOVER_AUTH_NACK_SUPPORTED;
if (myself->flags != oldflags) {
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE);

Expand Down Expand Up @@ -1447,6 +1451,7 @@ void clusterInit(void) {
server.cluster->fail_reason = CLUSTER_FAIL_NONE;
server.cluster->safe_to_join = 0;
server.cluster->size = 0;
server.cluster->size_fail = 0;
server.cluster->todo_before_sleep = 0;
server.cluster->nodes = dictCreate(&clusterNodesDictType);
server.cluster->shards = dictCreate(&clusterSdsToListType);
Expand All @@ -1455,6 +1460,7 @@ void clusterInit(void) {
server.cluster->importing_slots_from = dictCreate(&clusterSlotDictType);
server.cluster->failover_auth_time = 0;
server.cluster->failover_auth_count = 0;
server.cluster->failover_auth_nack_count = 0;
server.cluster->failover_auth_rank = 0;
server.cluster->failover_auth_sent = 0;
server.cluster->failover_failed_primary_rank = 0;
Expand Down Expand Up @@ -3772,6 +3778,9 @@ int clusterIsValidPacket(clusterLink *link) {
} else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST || type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK ||
type == CLUSTERMSG_TYPE_MFSTART) {
explen = sizeof(clusterMsg) - sizeof(union clusterMsgData);
} else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_NACK) {
explen = sizeof(clusterMsg) - sizeof(union clusterMsgData);
explen += sizeof(clusterMsgDataFailoverNack);
} else if (type == CLUSTERMSG_TYPE_UPDATE) {
explen = sizeof(clusterMsg) - sizeof(union clusterMsgData);
explen += sizeof(clusterMsgDataUpdate);
Expand Down Expand Up @@ -3902,6 +3911,13 @@ int clusterProcessPacket(clusterLink *link) {
} else {
sender->flags &= ~CLUSTER_NODE_MY_PRIMARY_FAIL;
}

/* Check if the node understands FAILOVER_AUTH_NACK packets. */
if (flags & CLUSTER_NODE_FAILOVER_AUTH_NACK_SUPPORTED) {
sender->flags |= CLUSTER_NODE_FAILOVER_AUTH_NACK_SUPPORTED;
} else {
sender->flags &= ~CLUSTER_NODE_FAILOVER_AUTH_NACK_SUPPORTED;
}
}

/* Update the last time we saw any data from this node. We
Expand Down Expand Up @@ -4415,10 +4431,25 @@ int clusterProcessPacket(clusterLink *link) {
* equal to epoch where this node started the election. */
if (clusterNodeIsVotingPrimary(sender) && sender_claimed_current_epoch >= server.cluster->failover_auth_epoch) {
server.cluster->failover_auth_count++;
serverLog(LL_NOTICE, "Failover auth ACK from %.40s (%s) for epoch %llu (ACKs %d, quorum %d)",
sender->name, humanNodename(sender), (unsigned long long)server.cluster->failover_auth_epoch,
server.cluster->failover_auth_count, (server.cluster->size / 2) + 1);
/* Maybe we reached a quorum here, set a flag to make sure
* we check ASAP. */
clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
}
} else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_NACK) {
if (!sender) return 1; /* We don't know that node. */

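/* We consider this NACK only if we have an election scheduled
 * (failover_auth_time is set) and have already requested votes
 * (failover_auth_sent is set), the sender is a primary serving a
 * non-zero number of slots, and its currentEpoch is greater than or
 * equal to the epoch in which this node started the election. */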
if (server.cluster->failover_auth_time &&
server.cluster->failover_auth_sent &&
clusterNodeIsVotingPrimary(sender) &&
sender_claimed_current_epoch >= server.cluster->failover_auth_epoch) {
clusterProcessFailoverAuthNack(sender, msg);
}
Comment thread
zuiderkwast marked this conversation as resolved.
} else if (type == CLUSTERMSG_TYPE_MFSTART) {
/* This message is acceptable only if I'm a primary and the sender
* is one of my replicas. */
Expand Down Expand Up @@ -5274,6 +5305,34 @@ void clusterSendFailoverAuth(clusterNode *node) {
clusterMsgSendBlockDecrRefCount(msgblock);
}

/* Translate a FAILOVER_AUTH_NACK reason code into a short, constant label
 * that can be embedded in log lines. Any code that is not one of the known
 * CLUSTERMSG_FAILOVER_AUTH_NACK_REASON_* values yields "unknown". The
 * returned pointer refers to a string literal and must not be freed. */
static const char *clusterNackReasonString(uint8_t reason) {
    const char *label = "unknown";

    switch (reason) {
    case CLUSTERMSG_FAILOVER_AUTH_NACK_REASON_NOT_SAFE:
        label = "not-safe";
        break;
    case CLUSTERMSG_FAILOVER_AUTH_NACK_REASON_REQ_EPOCH_OLD:
        label = "req-epoch-old";
        break;
    case CLUSTERMSG_FAILOVER_AUTH_NACK_REASON_ALREADY_VOTED:
        label = "already-voted";
        break;
    case CLUSTERMSG_FAILOVER_AUTH_NACK_REASON_REQ_IS_PRIMARY:
        label = "req-is-primary";
        break;
    case CLUSTERMSG_FAILOVER_AUTH_NACK_REASON_NO_PRIMARY:
        label = "no-primary";
        break;
    case CLUSTERMSG_FAILOVER_AUTH_NACK_REASON_PRIMARY_UP:
        label = "primary-up";
        break;
    case CLUSTERMSG_FAILOVER_AUTH_NACK_REASON_STALE_CONFIG:
        label = "stale-config";
        break;
    default:
        /* Keep the "unknown" label set above. */
        break;
    }
    return label;
}

/* Build a FAILOVER_AUTH_NACK packet carrying 'reason' and send it over the
 * link of 'node'.
 *
 * Nothing is sent when the node has no link, or when the node does not pass
 * the nodeSupportsFailoverAuthNack() check. */
void clusterSendFailoverNack(clusterNode *node, uint8_t reason) {
    if (!node->link || !nodeSupportsFailoverAuthNack(node)) return;

    /* Message header without the generic data union, plus the NACK payload. */
    uint32_t total_len = sizeof(clusterMsg) - sizeof(union clusterMsgData);
    total_len += sizeof(clusterMsgDataFailoverNack);

    clusterMsgSendBlock *block = createClusterMsgSendBlock(CLUSTERMSG_TYPE_FAILOVER_AUTH_NACK, total_len);
    clusterMsg *msg = getMessageFromSendBlock(block);
    memcpy(&msg->data.failover_nack.nack.reason, &reason, sizeof(reason));

    clusterSendMessage(node->link, block);
    /* Decrement the reference count we hold on the send block. */
    clusterMsgSendBlockDecrRefCount(block);
}

/* Send a MFSTART message to the specified node. */
void clusterSendMFStart(clusterNode *node) {
if (!node->link) return;
Expand Down Expand Up @@ -5304,6 +5363,7 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
if (!server.cluster->safe_to_join) {
serverLog(LL_WARNING, "Failover auth denied to %.40s (%s): it is not safe to vote in this moment)",
node->name, humanNodename(node));
clusterSendFailoverNack(node, CLUSTERMSG_FAILOVER_AUTH_NACK_REASON_NOT_SAFE);
return;
}

Expand All @@ -5315,13 +5375,15 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
serverLog(LL_WARNING, "Failover auth denied to %.40s (%s): reqEpoch (%llu) < curEpoch(%llu)", node->name,
humanNodename(node), (unsigned long long)requestCurrentEpoch,
(unsigned long long)server.cluster->currentEpoch);
clusterSendFailoverNack(node, CLUSTERMSG_FAILOVER_AUTH_NACK_REASON_REQ_EPOCH_OLD);
return;
}

/* I already voted for this epoch? Return ASAP. */
if (server.cluster->lastVoteEpoch == server.cluster->currentEpoch) {
serverLog(LL_WARNING, "Failover auth denied to %.40s (%s): already voted for epoch %llu", node->name,
humanNodename(node), (unsigned long long)server.cluster->currentEpoch);
clusterSendFailoverNack(node, CLUSTERMSG_FAILOVER_AUTH_NACK_REASON_ALREADY_VOTED);
return;
}

Expand All @@ -5332,12 +5394,15 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
if (clusterNodeIsPrimary(node)) {
serverLog(LL_WARNING, "Failover auth denied to %.40s (%s) for epoch %llu: it is a primary node", node->name,
humanNodename(node), (unsigned long long)requestCurrentEpoch);
clusterSendFailoverNack(node, CLUSTERMSG_FAILOVER_AUTH_NACK_REASON_REQ_IS_PRIMARY);
} else if (primary == NULL) {
serverLog(LL_WARNING, "Failover auth denied to %.40s (%s) for epoch %llu: I don't know its primary",
node->name, humanNodename(node), (unsigned long long)requestCurrentEpoch);
clusterSendFailoverNack(node, CLUSTERMSG_FAILOVER_AUTH_NACK_REASON_NO_PRIMARY);
} else if (!nodeFailed(primary)) {
serverLog(LL_WARNING, "Failover auth denied to %.40s (%s) for epoch %llu: its primary is up", node->name,
humanNodename(node), (unsigned long long)requestCurrentEpoch);
clusterSendFailoverNack(node, CLUSTERMSG_FAILOVER_AUTH_NACK_REASON_PRIMARY_UP);
}
return;
}
Expand Down Expand Up @@ -5372,7 +5437,13 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
"Node %.40s (%s) has old slots configuration, sending "
"an UPDATE message about %.40s (%s)",
node->name, humanNodename(node), slot_owner->name, humanNodename(slot_owner));
/* Send UPDATE first so the replica can fix its slot config; the NACK
* that follows then triggers fast-fail, letting the replica retry
* with the freshly-updated configEpoch right away instead of waiting
* for auth_timeout. TCP ordering on the same link guarantees the
* UPDATE arrives before the NACK. */
clusterSendUpdate(node->link, slot_owner);
clusterSendFailoverNack(node, CLUSTERMSG_FAILOVER_AUTH_NACK_REASON_STALE_CONFIG);
return;
}
}
Expand All @@ -5385,6 +5456,34 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
(unsigned long long)server.cluster->currentEpoch);
}

/* Handle a FAILOVER_AUTH_NACK from a voter.
 *
 * 'sender' is the node the NACK came from and 'request' is the received
 * packet; its payload carries the reason code, which is only used for
 * logging. Every call is counted as one NACK for the current election:
 * this function does not validate the sender or the epoch itself.
 *
 * When the NACKs received so far make it impossible to reach quorum, the
 * election is reset (failover_auth_time is cleared) and the failover
 * handler is flagged to run before the next sleep. */
void clusterProcessFailoverAuthNack(clusterNode *sender, clusterMsg *request) {
    server.cluster->failover_auth_nack_count++;

    /* A voter that NACKed us in this epoch will not change its mind, so the
     * upper bound on the votes we can still collect is the voters that have
     * not NACKed, minus FAIL voters that will never reply (they count towards
     * size but neither ACK nor NACK). Fast-fail once that bound drops below
     * the quorum we need to win.
     *
     * NOTE(review): this presumes each voter NACKs at most once per election
     * and that a FAIL voter has not also NACKed; otherwise the same voter is
     * subtracted twice -- confirm against the sender side. */
    int needed_quorum = (server.cluster->size / 2) + 1;
    int max_possible_acks = server.cluster->size - server.cluster->size_fail - server.cluster->failover_auth_nack_count;
    serverLog(LL_NOTICE, "Failover auth NACK [%s] from %.40s (%s) for epoch %llu (NACKs %d, quorum %d)",
              clusterNackReasonString(request->data.failover_nack.nack.reason), sender->name,
              humanNodename(sender), (unsigned long long)server.cluster->failover_auth_epoch,
              server.cluster->failover_auth_nack_count, needed_quorum);
    if (max_possible_acks < needed_quorum) {
        serverLog(LL_NOTICE,
                  "Failover election for epoch %llu cannot reach quorum %d (NACKs %d, dead voters %d). "
                  "Resetting the election since we cannot win an election without quorum.",
                  (unsigned long long)server.cluster->failover_auth_epoch, needed_quorum,
                  server.cluster->failover_auth_nack_count, server.cluster->size_fail);
        server.cluster->failover_auth_time = 0;
        /* Maybe we could start a new election, set a flag here to make sure
         * we check as soon as possible, instead of waiting for a cron. */
        clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
    }
}

/* This function returns the "rank" of this instance, a replica, in the context
* of its primary-replicas ring. The rank of the replica is given by the number of
* other replicas for the same primary that have a better replication offset
Expand Down Expand Up @@ -5628,6 +5727,7 @@ void clusterHandleReplicaFailover(void) {
/* Use a failover delay relative to node timeout: 500 for the default node
* timeout of 15000, less for lower node timeout, but not more. */
long long delay = min(server.cluster_node_timeout / 30, 500);
if (server.debug_cluster_failover_delay >= 0) delay = server.debug_cluster_failover_delay;

/* Pre conditions to run the function, that must be met both in case
* of an automatic or manual failover:
Expand Down Expand Up @@ -5676,7 +5776,9 @@ void clusterHandleReplicaFailover(void) {
server.cluster->failover_auth_time = now +
delay + /* Fixed delay to let FAIL msg propagate. */
(delay ? random() % delay : 0); /* Random delay between 0 and the fixed delay. */
if (server.debug_cluster_failover_delay >= 0) server.cluster->failover_auth_time = now + delay;
Comment thread
enjoy-binbin marked this conversation as resolved.
server.cluster->failover_auth_count = 0;
server.cluster->failover_auth_nack_count = 0;
server.cluster->failover_auth_sent = 0;
server.cluster->failover_auth_rank = clusterGetReplicaRank();
/* We add another delay that is proportional to the replica rank.
Expand Down Expand Up @@ -5786,7 +5888,17 @@ void clusterHandleReplicaFailover(void) {

/* Ask for votes if needed. */
if (server.cluster->failover_auth_sent == 0) {
server.cluster->currentEpoch++;
if (server.debug_cluster_failover_epoch >= 0) {
/* Testing only: force this election to run in a specific epoch so
* that several replicas can be made to contend in the very same
* epoch, deterministically reproducing a split vote. Consumed
* once; subsequent retries fall back to the normal currentEpoch++
* so the replicas can eventually win in distinct epochs. */
server.cluster->currentEpoch = server.debug_cluster_failover_epoch;
server.debug_cluster_failover_epoch = -1;
} else {
server.cluster->currentEpoch++;
}
server.cluster->failover_auth_epoch = server.cluster->currentEpoch;
serverLog(LL_NOTICE, "Starting a failover election for epoch %llu, node config epoch is %llu",
(unsigned long long)server.cluster->currentEpoch, (unsigned long long)nodeEpoch(myself));
Expand Down Expand Up @@ -6653,12 +6765,14 @@ void clusterUpdateState(void) {
dictEntry *de;

server.cluster->size = 0;
server.cluster->size_fail = 0;
di = dictGetSafeIterator(server.cluster->nodes);
while ((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);

if (clusterNodeIsVotingPrimary(node)) {
server.cluster->size++;
if (node->flags & CLUSTER_NODE_FAIL) server.cluster->size_fail++;
if ((node->flags & (CLUSTER_NODE_FAIL | CLUSTER_NODE_PFAIL)) == 0) reachable_primaries++;
}
}
Expand Down Expand Up @@ -7141,6 +7255,7 @@ const char *clusterGetMessageTypeString(int type) {
case CLUSTERMSG_TYPE_PUBLISHSHARD: return "publishshard";
case CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST: return "auth-req";
case CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK: return "auth-ack";
case CLUSTERMSG_TYPE_FAILOVER_AUTH_NACK: return "auth-nack";
case CLUSTERMSG_TYPE_UPDATE: return "update";
case CLUSTERMSG_TYPE_MFSTART: return "mfstart";
case CLUSTERMSG_TYPE_MODULE: return "module";
Expand Down
Loading
Loading