-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Cluster bus v2 - Added shard level epoch and proposal pre-validation #3899
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: cluster-v2
Are you sure you want to change the base?
Changes from all commits
4fcef86
ebc2456
5219c9c
ab5807d
3c8dd26
7893b98
b49b879
f4d7d1e
18a1c8b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -138,20 +138,21 @@ NODE_JOIN <node-id> <address> | |||||||||||||||||||||||||||||||||
| via MEET. The node starts as a learner and is promoted to follower | ||||||||||||||||||||||||||||||||||
| when the entry is committed. | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| NODE_FORGET <node-id> | ||||||||||||||||||||||||||||||||||
| Remove a node from the cluster (CLUSTER FORGET). Not yet implemented. | ||||||||||||||||||||||||||||||||||
| NODE_FORGET <node-id> <shard-epoch> | ||||||||||||||||||||||||||||||||||
| Remove a node from the cluster (CLUSTER FORGET). | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| SLOT_CHANGE <node-id-or-dash> <range> [<range> ...] | ||||||||||||||||||||||||||||||||||
| SLOT_CHANGE <source-node-id-or-dash> <source-epoch> <target-node-id-or-dash> <target-epoch> <range> [<range> ...] | ||||||||||||||||||||||||||||||||||
| Assign or remove slot ownership. A dash means "no owner" (delete | ||||||||||||||||||||||||||||||||||
| slots). Ranges use the nodes.conf format: "0-5460" or "5461". | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| SET_REPLICA_OF <replica-id> <primary-id-or-dash> <shard-id> | ||||||||||||||||||||||||||||||||||
| SET_REPLICA_OF <replica-id> <source-shard> <source-epoch> <primary-id-or-dash> <target-shard> <target-epoch> | ||||||||||||||||||||||||||||||||||
| Set a node as replica of a primary (CLUSTER REPLICATE). A dash as | ||||||||||||||||||||||||||||||||||
| primary means promote to primary. The shard-id is the target shard: | ||||||||||||||||||||||||||||||||||
| primary means promote to primary. Both source and target shard epochs | ||||||||||||||||||||||||||||||||||
| are validated to guard against concurrent shard changes. | ||||||||||||||||||||||||||||||||||
| for promotion, a new random id; for assignment, the primary's | ||||||||||||||||||||||||||||||||||
| current shard-id (used as a guard against concurrent changes). | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| FAILOVER <replica-id> <primary-id> | ||||||||||||||||||||||||||||||||||
| FAILOVER <replica-id> <primary-id> <shard-id> <shard-epoch> | ||||||||||||||||||||||||||||||||||
| The replica takes over the primary's slots and becomes primary. | ||||||||||||||||||||||||||||||||||
| The old primary becomes a replica of the new primary. | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
|
|
@@ -171,7 +172,7 @@ NODE_RECOVER <node-id> | |||||||||||||||||||||||||||||||||
| ``` | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| Ranges in SLOT_CHANGE use the same format as nodes.conf: `0-5460` or | ||||||||||||||||||||||||||||||||||
| `5461`. A dash as node-id means "no owner" (delete slots) or "no | ||||||||||||||||||||||||||||||||||
| `5461`. A dash as source/target-id means "no owner" (delete slots) or "no | ||||||||||||||||||||||||||||||||||
| primary" (promote to primary). | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| ### Why typed entries instead of a key-value store? | ||||||||||||||||||||||||||||||||||
|
|
@@ -212,9 +213,9 @@ changes are infrequent. | |||||||||||||||||||||||||||||||||
| ## PROPOSE and Leader Validation | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| Followers forward proposals to the leader using the PROPOSE message, | ||||||||||||||||||||||||||||||||||
| sent on the outbound link to the leader. The leader always accepts | ||||||||||||||||||||||||||||||||||
| proposals without validation — it appends them to the log and | ||||||||||||||||||||||||||||||||||
| replicates them. Validation happens at apply time, where the apply | ||||||||||||||||||||||||||||||||||
| sent on the outbound link to the leader. The leader accepts | ||||||||||||||||||||||||||||||||||
| proposals with best effort pre-validations — it appends them to the log and | ||||||||||||||||||||||||||||||||||
| replicates them. Authoritative validation happens at apply time, where the apply | ||||||||||||||||||||||||||||||||||
| function can detect conflicts and treat them as no-ops. | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| This design simplifies the leader: it doesn't need to understand the | ||||||||||||||||||||||||||||||||||
|
|
@@ -723,21 +724,29 @@ reuse), but the vars and log lines are raft-specific. The file is not | |||||||||||||||||||||||||||||||||
| compatible between protocols — switching from gossip to raft (or vice | ||||||||||||||||||||||||||||||||||
| versa) requires removing nodes.conf. | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| ## Shard Epoch (not yet implemented) | ||||||||||||||||||||||||||||||||||
| ## Shard Epoch | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| A shard-epoch is a per-shard monotonically increasing counter, bumped | ||||||||||||||||||||||||||||||||||
| on topology changes within the shard (FAILOVER, SET_REPLICA_OF, | ||||||||||||||||||||||||||||||||||
| SLOT_CHANGE). Entries that modify shard topology include the current | ||||||||||||||||||||||||||||||||||
| shard-epoch at proposal time. On apply, if the shard-epoch has | ||||||||||||||||||||||||||||||||||
| advanced, the entry is stale and becomes a no-op. | ||||||||||||||||||||||||||||||||||
| Raft ensures entries are applied in a total order, but ordering alone | ||||||||||||||||||||||||||||||||||
| is not sufficient to prevent stale mutations from corrupting cluster | ||||||||||||||||||||||||||||||||||
| state. When concurrent operations target the same shard (e.g., a slot | ||||||||||||||||||||||||||||||||||
| migration racing with a failover), a committed entry may carry | ||||||||||||||||||||||||||||||||||
| assumptions about shard topology that are no longer true by the time | ||||||||||||||||||||||||||||||||||
| it is applied. Without additional application-level state to fence | ||||||||||||||||||||||||||||||||||
| against these stale updates, the apply logic can produce | ||||||||||||||||||||||||||||||||||
| inconsistencies — such as moving a slot to a node that no longer owns | ||||||||||||||||||||||||||||||||||
| the corresponding keys. | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| This prevents stale entries from causing inconsistencies when | ||||||||||||||||||||||||||||||||||
| concurrent operations race in the log. Example: | ||||||||||||||||||||||||||||||||||
| A shard-epoch is a per-shard monotonically increasing counter stored | ||||||||||||||||||||||||||||||||||
| in `server.cluster->shard_epochs`. It is bumped each time membership or | ||||||||||||||||||||||||||||||||||
| leadership of the shard changes. Such entries include the shard's | ||||||||||||||||||||||||||||||||||
| current epoch at proposal time. Epoch is validated at prepare time | ||||||||||||||||||||||||||||||||||
| and at apply time. If the epoch has advanced past the value in the entry, | ||||||||||||||||||||||||||||||||||
| the entry is stale and is ignored. | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| ``` | ||||||||||||||||||||||||||||||||||
| Slot migration racing with failover: | ||||||||||||||||||||||||||||||||||
| ### Example: slot migration racing with failover | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| 1. Atomic slot migration starts: keys transferred from shard A to B. | ||||||||||||||||||||||||||||||||||
| ``` | ||||||||||||||||||||||||||||||||||
| 1. Slot migration starts: keys transferred from shard A to shard B. | ||||||||||||||||||||||||||||||||||
| 2. Primary of shard A fails. FAILOVER entry is proposed. | ||||||||||||||||||||||||||||||||||
| 3. Migration is rolled back (keys stay on shard A's new primary). | ||||||||||||||||||||||||||||||||||
| 4. SLOT_CHANGE entry (assigning slot to shard B) was proposed before | ||||||||||||||||||||||||||||||||||
|
|
@@ -748,14 +757,59 @@ Slot migration racing with failover: | |||||||||||||||||||||||||||||||||
| carries the old epoch, so it's a no-op. Slot stays on shard A. | ||||||||||||||||||||||||||||||||||
| ``` | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| Entries that should carry a shard-epoch: | ||||||||||||||||||||||||||||||||||
| - FAILOVER (bumps epoch of the shard) | ||||||||||||||||||||||||||||||||||
| - SET_REPLICA_OF (bumps epoch when changing shard membership) | ||||||||||||||||||||||||||||||||||
| - SLOT_CHANGE (checked against source and target shard epochs) | ||||||||||||||||||||||||||||||||||
| ### Entry formats with epoch | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| ``` | ||||||||||||||||||||||||||||||||||
| FAILOVER <replica-id> <primary-id> <shard-id> <shard-epoch> | ||||||||||||||||||||||||||||||||||
| SET_REPLICA_OF <replica-id> <source-shard> <source-epoch> <primary-id-or-dash> <target-shard> <target-epoch> | ||||||||||||||||||||||||||||||||||
| SLOT_CHANGE <source-node-id-or-dash> <source-epoch> <target-node-id-or-dash> <target-epoch> <ranges...> | ||||||||||||||||||||||||||||||||||
| NODE_FORGET <node-id> <epoch> | ||||||||||||||||||||||||||||||||||
| ``` | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| SLOT_CHANGE carries two epochs because it involves two shards (source | ||||||||||||||||||||||||||||||||||
| and target). NODE_FORGET carries the epoch of the departing node's | ||||||||||||||||||||||||||||||||||
| shard to guard against removing a node whose role changed (e.g., | ||||||||||||||||||||||||||||||||||
| promoted to primary via a concurrent FAILOVER). | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| ### Validation | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| Epoch validation happens at two points: | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| 1. **Pre-validation on the leader** — before appending to the log. | ||||||||||||||||||||||||||||||||||
| This is a best-effort optimization that rejects obviously stale | ||||||||||||||||||||||||||||||||||
| proposals early, saving log space and replication bandwidth. It | ||||||||||||||||||||||||||||||||||
| performs a read-only check without bumping the epoch. | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| 2. **Apply-time validation** — the authoritative check. Each apply | ||||||||||||||||||||||||||||||||||
| function validates the entry's epoch against the current shard | ||||||||||||||||||||||||||||||||||
| epoch. On match (or epoch 0 for a new shard), the epoch is bumped | ||||||||||||||||||||||||||||||||||
| and the entry is applied. On mismatch, the entry is a no-op and | ||||||||||||||||||||||||||||||||||
| the error is propagated to the caller's callback. | ||||||||||||||||||||||||||||||||||
|
sushilpaneru1 marked this conversation as resolved.
|
||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| ### Retry on stale epoch | ||||||||||||||||||||||||||||||||||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. On a high level, retry for proposal rejection looks like it defeats the purpose of epoch level fencing. From a operator point of view I think it makes sense to have a internal retry if epoch was stale; it kinda says the operation is semantically correct but issued concurrently. I have added a bound to number of retries. lmk what you think about the overall strategy? @zuiderkwast @murphyjacob4
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The way I see it:
What exactly do we want to prevent and what do we want to allow? For example, a NODE_FORGET for replica C is rejected because of a mismatching epoch, the proposer waits for the log entries to be applied and if it turns out the conflicting entry was a failover A -> B in the same shard and C is still a replica, then it's still safe to forget node C, so we can retry in this case. If instead C got promoted to primary of the shard, we can't retry the FORGET right now. Then, don't retry and instead return an error to the admin client for CLUSTER FORGET. For example, just for
(*) Here, it looks as we may want to bump the epoch at SLOT_CHANGE, at least if it turns an shard without slots into a shard with slots. Another option is to allow forgetting a primary with slots, as we do in gossip. 🤔 We can make similar tables for each entry type. (Or we can make one huge matrix with all things that can get rejected as rows and all possible conficting scenarios as columns. Hard to read though.)
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I like this. The retry logic was kept intentionally simple to start this kind of discussion. I agree the retry can be made smarter per entry type, but I think we still need a bound on the number of retries regardless — to avoid a proposer getting stuck in a loop. Like you pointed out, I noticed current implementation has a gap: the retry fires immediately in the callback without waiting for the epoch to advance. I think this is the reason why tests are failing even with 5 retries (will fix this). That said, it makes me wonder: for stale epoch rejections specifically, are we really adding a lot of value by making the retry logic smarter but more complex, when the raft leader already does authoritative validation? also, stale epoch proposals are rejected early so I don't think it adds a lot of cost as these are not frequent operations. On operator UX — both modes are similar in the sense that operators wait for convergence. The difference I see is fail-silent/fire-forget (gossip) vs fail-stop. If we fix the epoch increment gate and increase the internal retry count in the current implementation and make all test cases pass without client-side retries, I think we're (kinda) on par for UX. IMO, operator experience is equivalent or better (since this implementation gives a definitive answer rather than requiring convergence polling). My proposal: instead of handling an exhaustive list of scenarios upfront, let's keep it simple, and incrementally make the retry smarter when we need it. @murphyjacob4 could you also provide your inputs on how we should do the retry.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Current behavior
|
||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| Proposals rejected due to a stale shard epoch are automatically retried | ||||||||||||||||||||||||||||||||||
| with a fresh epoch (up to 5 attempts): | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| - **SET_REPLICA_OF / NODE_FORGET / FAILOVER (force) / SLOT_CHANGE** — | ||||||||||||||||||||||||||||||||||
| the proposal is rebuilt with current epoch(s) and re-submitted. | ||||||||||||||||||||||||||||||||||
|
Comment on lines
+794
to
+795
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The general rule for retrying should probably be something like:
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed but imo, we should make pt.2 simpler. Let's use the thread above to discuss this further. |
||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| - **Automatic failover** — if the FAILOVER proposal is rejected, the | ||||||||||||||||||||||||||||||||||
| failover is re-scheduled (via `todo_schedule_failover`) as long as | ||||||||||||||||||||||||||||||||||
| the primary is still failed. The next attempt uses the current epoch. | ||||||||||||||||||||||||||||||||||
| For automatic failover, no cap on retry attempt to avoid leaderless shard. | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| Only `STALE_SHARD_EPOCH_REJECTION_MSG` triggers retry. Other errors | ||||||||||||||||||||||||||||||||||
| (format errors, invalid state) are forwarded to the client immediately. | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| When the leader rejects a forwarded proposal at pre-validation, it sends | ||||||||||||||||||||||||||||||||||
| a `REJECT <type> <data> retry` message back. The `retry` suffix signals | ||||||||||||||||||||||||||||||||||
| the follower that the rejection is epoch-related and eligible for retry. | ||||||||||||||||||||||||||||||||||
|
Comment on lines
+805
to
+807
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Put the rejection message before type and data instead. I think the rejected message should be the reason for rejection, i.e. indicate why it was rejected, instead of saying what the receiver should do. Say
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For multi worded error message, it would be difficult to parse but I think we can introduce a delimiter like "|" So you mean we do something like |
||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| ### Entries that don't carry an epoch | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| Entries that don't need a shard-epoch: | ||||||||||||||||||||||||||||||||||
| - NODE_FAIL / NODE_RECOVER (liveness, not topology) | ||||||||||||||||||||||||||||||||||
| - NODE_INFO / NODE_JOIN / NODE_FORGET (node-level, not shard-level) | ||||||||||||||||||||||||||||||||||
| - NODE_FAIL / NODE_RECOVER | ||||||||||||||||||||||||||||||||||
| - NODE_INFO, NODE_JOIN | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| ## Leader Transfer | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
Uh oh!
There was an error while loading. Please reload this page.