-
Notifications
You must be signed in to change notification settings - Fork 0
Fix adjacent slot range behavior in ASM operations #24
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: coderabbit_full_base_fix_adjacent_slot_range_behavior_in_asm_operations_pr2
Are you sure you want to change the base?
Changes from all commits
deabeea
0bb1647
98943f5
a6db339
3129018
55e9331
f32efc8
0443e9f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -627,7 +627,6 @@ static asmTask *lookupAsmTaskBySlotRange(slotRange *req) { | |||||||||||||||||||||||||||||||||||||||||||||||||||
| /* Validates the given slot ranges for a migration task: | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| * - Ensures the current node is a master. | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| * - Verifies all slots are in a STABLE state. | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| * - Checks that slot ranges are well-formed and non-overlapping. | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| * - Confirms all slots belong to a single source node. | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| * - Confirms no ongoing import task that overlaps with the slot ranges. | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -804,11 +803,11 @@ asmTask *asmCreateImportTask(const char *task_id, slotRangeArray *slots, sds *er | |||||||||||||||||||||||||||||||||||||||||||||||||||
| * initiated for them. */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| source = validateImportSlotRanges(slots, err, NULL); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (!source) | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| return NULL; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| goto err; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (source == getMyClusterNode()) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| *err = sdsnew("this node is already the owner of the slot range"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| return NULL; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| goto err; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
| /* Only support a single task at a time now. */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -820,15 +819,15 @@ asmTask *asmCreateImportTask(const char *task_id, slotRangeArray *slots, sds *er | |||||||||||||||||||||||||||||||||||||||||||||||||||
| asmTaskCancel(current, "new import requested"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| *err = sdsnew("another ASM task is already in progress"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| return NULL; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| goto err; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| /* There should be no task in progress. */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| serverAssert(listLength(asmManager->tasks) == 0); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
| /* Create a slot migration task */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| asmTask *task = asmTaskCreate(task_id); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| task->slots = slotRangeArrayDup(slots); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| task->slots = slots; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| task->state = ASM_NONE; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| task->operation = ASM_IMPORT; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| task->source_node = source; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -842,6 +841,10 @@ asmTask *asmCreateImportTask(const char *task_id, slotRangeArray *slots, sds *er | |||||||||||||||||||||||||||||||||||||||||||||||||||
| sdsfree(slots_str); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
| return task; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
| err: | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| slotRangeArrayFree(slots); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| return NULL; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
| /* CLUSTER MIGRATION IMPORT <start-slot end-slot [start-slot end-slot ...]> | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -860,7 +863,6 @@ static void clusterMigrationCommandImport(client *c) { | |||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
| sds err = NULL; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| asmTask *task = asmCreateImportTask(NULL, slots, &err); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| slotRangeArrayFree(slots); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (!task) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| addReplyErrorSds(c, err); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| return; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -1006,6 +1008,20 @@ void clusterMigrationCommand(client *c) { | |||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
| /* Return the number of keys in the specified slot ranges. */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| unsigned long long asmCountKeysInSlots(slotRangeArray *slots) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (!slots) return 0; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
| unsigned long long key_count = 0; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| int total_ranges = slots->num_ranges; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| for (int i = 0; i < slots->num_ranges; i++) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| for (int j = slots->ranges[i].start; j <= slots->ranges[i].end; j++) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| key_count += kvstoreDictSize(server.db[0].keys, j); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| return key_count; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+1011
to
+1023
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fix CI failure: remove unused 🛠️ Proposed fix- int total_ranges = slots->num_ranges;📝 Committable suggestion
Suggested change
🧰 Tools🪛 GitHub Actions: CI[error] 1016-1016: unused variable 'total_ranges' [-Werror=unused-variable]. Variable is assigned but never used in asmCountKeysInSlots. 🪛 GitHub Actions: External Server Tests[error] 1016-1016: unused variable 'total_ranges' [-Werror=unused-variable] 🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
| /* Log a human-readable message for ASM task lifecycle events. */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| void asmLogTaskEvent(asmTask *task, int event) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| sds str = slotRangeArrayToString(task->slots); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -1021,10 +1037,12 @@ void asmLogTaskEvent(asmTask *task, int event) { | |||||||||||||||||||||||||||||||||||||||||||||||||||
| serverLog(LL_NOTICE, "Import task %s is ready to takeover slots: %s", task->id, str); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| break; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| case ASM_EVENT_IMPORT_COMPLETED: | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| serverLog(LL_NOTICE, "Import task %s completed for slots: %s", task->id, str); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| serverLog(LL_NOTICE, "Import task %s completed for slots: %s (imported %llu keys)", | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| task->id, str, asmCountKeysInSlots(task->slots)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| break; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| case ASM_EVENT_MIGRATE_STARTED: | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| serverLog(LL_NOTICE, "Migrate task %s started for slots: %s", task->id, str); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| serverLog(LL_NOTICE, "Migrate task %s started for slots: %s (keys at start: %llu)", | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| task->id, str, asmCountKeysInSlots(task->slots)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| break; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| case ASM_EVENT_MIGRATE_FAILED: | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| serverLog(LL_NOTICE, "Migrate task %s failed for slots: %s", task->id, str); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -1033,7 +1051,8 @@ void asmLogTaskEvent(asmTask *task, int event) { | |||||||||||||||||||||||||||||||||||||||||||||||||||
| serverLog(LL_NOTICE, "Migrate task %s preparing to handoff for slots: %s", task->id, str); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| break; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| case ASM_EVENT_MIGRATE_COMPLETED: | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| serverLog(LL_NOTICE, "Migrate task %s completed for slots: %s", task->id, str); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| serverLog(LL_NOTICE, "Migrate task %s completed for slots: %s (migrated %llu keys)", | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| task->id, str, asmCountKeysInSlots(task->slots)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| break; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| default: | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| break; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -2847,24 +2866,36 @@ int clusterAsmProcess(const char *task_id, int event, void *arg, char **err) { | |||||||||||||||||||||||||||||||||||||||||||||||||||
| if (err) *err = NULL; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
| switch (event) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| case ASM_EVENT_IMPORT_START: | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| ret = asmCreateImportTask(task_id, arg, &errsds) ? C_OK : C_ERR; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| case ASM_EVENT_IMPORT_START: { | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| /* Validate the slot ranges. */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| slotRangeArray *slots = slotRangeArrayDup(arg); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (slotRangeArrayNormalizeAndValidate(slots, &errsds) != C_OK) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| slotRangeArrayFree(slots); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| ret = C_ERR; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| break; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| ret = asmCreateImportTask(task_id, slots, &errsds) ? C_OK : C_ERR; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| break; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| case ASM_EVENT_CANCEL: | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| case ASM_EVENT_CANCEL: { | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| num_cancelled = clusterAsmCancel(task_id, "user request"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (arg) *((int *)arg) = num_cancelled; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| ret = C_OK; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| break; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| case ASM_EVENT_HANDOFF: | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| case ASM_EVENT_HANDOFF: { | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| ret = clusterAsmHandoff(task_id, &errsds); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| break; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| case ASM_EVENT_DONE: | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| case ASM_EVENT_DONE: { | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| ret = clusterAsmDone(task_id, &errsds); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| break; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| default: | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| default: { | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| ret = C_ERR; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| errsds = sdscatprintf(sdsempty(), "Unknown operation: %d", event); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| break; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (ret != C_OK && errsds && err) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -3273,10 +3304,7 @@ void asmActiveTrimStart(void) { | |||||||||||||||||||||||||||||||||||||||||||||||||||
| asmManager->active_trim_current_job_trimmed = 0; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
| /* Count the number of keys to trim */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| for (int i = 0; i < slots->num_ranges; i++) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| for (int slot = slots->ranges[i].start; slot <= slots->ranges[i].end; slot++) | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| asmManager->active_trim_current_job_keys += kvstoreDictSize(server.db[0].keys, slot); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| asmManager->active_trim_current_job_keys = asmCountKeysInSlots(slots); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
| RedisModuleClusterSlotMigrationTrimInfoV1 fsi = { | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| REDISMODULE_CLUSTER_SLOT_MIGRATION_TRIMINFO_VERSION, | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Validation error message is discarded without being sent to the client.
The function name
parseSlotRangesOrReplysuggests it should reply with errors, but the validation error fromslotRangeArrayNormalizeAndValidateis freed without sending it to the client. The caller receivesNULLwith no explanation.🐛 Proposed fix
sds err = NULL; if (slotRangeArrayNormalizeAndValidate(slots, &err) != C_OK) { + addReplyErrorSds(c, err); - sdsfree(err); slotRangeArrayFree(slots); return NULL; }📝 Committable suggestion
🤖 Prompt for AI Agents