From 512497dae711a060b474ca774141d0a6ca84b373 Mon Sep 17 00:00:00 2001 From: YaacovHazan <31382944+YaacovHazan@users.noreply.github.com> Date: Thu, 14 May 2026 17:14:15 +0300 Subject: [PATCH 01/17] Set default for INLINE_LSE_ATOMICS to 0 for compatibility across architectures (#15212) Ensure backward compatibility and consistent behavior across different architectures by explicitly setting the default value. Fixes #15175 Co-authored-by: ofiryanai (cherry picked from commit 6c3a8ecceff085835a5388e97af0238646755bfe) --- modules/redisearch/Makefile | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/modules/redisearch/Makefile b/modules/redisearch/Makefile index c3990b6865f..fc2dc84b06f 100644 --- a/modules/redisearch/Makefile +++ b/modules/redisearch/Makefile @@ -3,5 +3,10 @@ MODULE_VERSION = v8.6.8 MODULE_REPO = https://github.com/redisearch/redisearch TARGET_MODULE = $(SRC_DIR)/bin/$(FULL_VARIANT)/search-community/redisearch.so + # Set INLINE_LSE_ATOMICS=1 for perf improvement on common ARM CPUs (i.e. Graviton2/3/4); no effect on x86 or macOS. + # Default 0 keeps the binary runnable on pre-Armv8.1-a cores (Cortex-A72, Graviton1, RPi4) that would otherwise SIGILL at module load. +INLINE_LSE_ATOMICS ?= 0 +export INLINE_LSE_ATOMICS + include ../common.mk From e6eef891804c86e4c8c90ce7762a21617d1fed45 Mon Sep 17 00:00:00 2001 From: Antoni Dikov Date: Mon, 30 Mar 2026 09:07:07 +0300 Subject: [PATCH 02/17] Fix COMMAND GETKEYS for PFMERGE with no source keys (#14942) PFMERGE's second key spec (source keys) produces an empty range when called with only a dest key (e.g. PFMERGE dest). getKeysUsingKeySpecs treats that as invalid_spec, which discards all previously found keys and returns an error. Add pfmergeGetKeys as a getkeys callback so the command correctly falls back to it when key specs fail on the edge case. 
(cherry picked from commit 5f5ddfd1a1ff39617277ac0a4302bee1ef94b492) --- src/commands.def | 2 +- src/commands/pfmerge.json | 1 + src/db.c | 23 +++++++++++++++++++++++ src/server.h | 1 + tests/unit/introspection-2.tcl | 11 +++++++++++ 5 files changed, 37 insertions(+), 1 deletion(-) diff --git a/src/commands.def b/src/commands.def index fda31af2a8c..b90587aa839 100644 --- a/src/commands.def +++ b/src/commands.def @@ -11830,7 +11830,7 @@ struct COMMAND_STRUCT redisCommandTable[] = { {MAKE_CMD("pfadd","Adds elements to a HyperLogLog key. Creates the key if it doesn't exist.","O(1) to add every element.","2.8.9",CMD_DOC_NONE,NULL,NULL,"hyperloglog",COMMAND_GROUP_HYPERLOGLOG,PFADD_History,0,PFADD_Tips,0,pfaddCommand,-2,CMD_WRITE|CMD_DENYOOM|CMD_FAST,ACL_CATEGORY_HYPERLOGLOG,PFADD_Keyspecs,1,NULL,2),.args=PFADD_Args}, {MAKE_CMD("pfcount","Returns the approximated cardinality of the set(s) observed by the HyperLogLog key(s).","O(1) with a very small average constant time when called with a single key. 
O(N) with N being the number of keys, and much bigger constant times, when called with multiple keys.","2.8.9",CMD_DOC_NONE,NULL,NULL,"hyperloglog",COMMAND_GROUP_HYPERLOGLOG,PFCOUNT_History,0,PFCOUNT_Tips,0,pfcountCommand,-2,CMD_READONLY|CMD_MAY_REPLICATE,ACL_CATEGORY_HYPERLOGLOG,PFCOUNT_Keyspecs,1,NULL,1),.args=PFCOUNT_Args}, {MAKE_CMD("pfdebug","Internal commands for debugging HyperLogLog values.","N/A","2.8.9",CMD_DOC_SYSCMD,NULL,NULL,"hyperloglog",COMMAND_GROUP_HYPERLOGLOG,PFDEBUG_History,0,PFDEBUG_Tips,0,pfdebugCommand,3,CMD_WRITE|CMD_DENYOOM|CMD_ADMIN,ACL_CATEGORY_HYPERLOGLOG,PFDEBUG_Keyspecs,1,NULL,2),.args=PFDEBUG_Args}, -{MAKE_CMD("pfmerge","Merges one or more HyperLogLog values into a single key.","O(N) to merge N HyperLogLogs, but with high constant times.","2.8.9",CMD_DOC_NONE,NULL,NULL,"hyperloglog",COMMAND_GROUP_HYPERLOGLOG,PFMERGE_History,0,PFMERGE_Tips,0,pfmergeCommand,-2,CMD_WRITE|CMD_DENYOOM,ACL_CATEGORY_HYPERLOGLOG,PFMERGE_Keyspecs,2,NULL,2),.args=PFMERGE_Args}, +{MAKE_CMD("pfmerge","Merges one or more HyperLogLog values into a single key.","O(N) to merge N HyperLogLogs, but with high constant times.","2.8.9",CMD_DOC_NONE,NULL,NULL,"hyperloglog",COMMAND_GROUP_HYPERLOGLOG,PFMERGE_History,0,PFMERGE_Tips,0,pfmergeCommand,-2,CMD_WRITE|CMD_DENYOOM,ACL_CATEGORY_HYPERLOGLOG,PFMERGE_Keyspecs,2,pfmergeGetKeys,2),.args=PFMERGE_Args}, {MAKE_CMD("pfselftest","An internal command for testing HyperLogLog values.","N/A","2.8.9",CMD_DOC_SYSCMD,NULL,NULL,"hyperloglog",COMMAND_GROUP_HYPERLOGLOG,PFSELFTEST_History,0,PFSELFTEST_Tips,0,pfselftestCommand,1,CMD_ADMIN,ACL_CATEGORY_HYPERLOGLOG,PFSELFTEST_Keyspecs,0,NULL,0)}, /* list */ {MAKE_CMD("blmove","Pops an element from a list, pushes it to another list and returns it. Blocks until an element is available otherwise. 
Deletes the list if the last element was moved.","O(1)","6.2.0",CMD_DOC_NONE,NULL,NULL,"list",COMMAND_GROUP_LIST,BLMOVE_History,0,BLMOVE_Tips,0,blmoveCommand,6,CMD_WRITE|CMD_DENYOOM|CMD_BLOCKING,ACL_CATEGORY_LIST,BLMOVE_Keyspecs,2,NULL,5),.args=BLMOVE_Args}, diff --git a/src/commands/pfmerge.json b/src/commands/pfmerge.json index c93070f1167..e3e26fabd84 100644 --- a/src/commands/pfmerge.json +++ b/src/commands/pfmerge.json @@ -6,6 +6,7 @@ "since": "2.8.9", "arity": -2, "function": "pfmergeCommand", + "get_keys_function": "pfmergeGetKeys", "command_flags": [ "WRITE", "DENYOOM" diff --git a/src/db.c b/src/db.c index 4dcd7f09f26..8219d4435ea 100644 --- a/src/db.c +++ b/src/db.c @@ -3643,6 +3643,29 @@ int sortGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult * return result->numkeys; } +int pfmergeGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { + int i, numkeys; + keyReference *keys; + UNUSED(cmd); + UNUSED(argv); + + numkeys = argc - 1; /* destkey + all sourcekeys */ + keys = getKeysPrepareResult(result, numkeys); + + /* destkey at argv[1] */ + keys[0].pos = 1; + keys[0].flags = CMD_KEY_RW | CMD_KEY_ACCESS | CMD_KEY_INSERT; + + /* sourcekeys at argv[2..argc-1], may be zero */ + for (i = 2; i < argc; i++) { + keys[i - 1].pos = i; + keys[i - 1].flags = CMD_KEY_RO | CMD_KEY_ACCESS; + } + + result->numkeys = numkeys; + return result->numkeys; +} + /* This command declares incomplete keys, so the flags are correctly set for this function */ int migrateGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { int i, j, num, first; diff --git a/src/server.h b/src/server.h index 0d4c7bd73ca..53c5dfe1f27 100644 --- a/src/server.h +++ b/src/server.h @@ -3993,6 +3993,7 @@ int zunionInterDiffStoreGetKeys(struct redisCommand *cmd,robj **argv, int argc, int evalGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); int functionGetKeys(struct redisCommand *cmd, robj **argv, int 
argc, getKeysResult *result); int sortGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); +int pfmergeGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); int sortROGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); int migrateGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); int georadiusGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); diff --git a/tests/unit/introspection-2.tcl b/tests/unit/introspection-2.tcl index 1e98fdb6a02..32932ef1a86 100644 --- a/tests/unit/introspection-2.tcl +++ b/tests/unit/introspection-2.tcl @@ -189,6 +189,17 @@ start_server {tags {"introspection"}} { assert_equal {key1 key2} [r command getkeys lcs key1 key2] } + test {COMMAND GETKEYS PFMERGE with and without source keys} { + # dest + sources: both key specs yield keys + assert_equal {dest src1 src2} [r command getkeys PFMERGE dest src1 src2] + + # dest only, no source keys: spec[1] yields empty range (last < first). + # Without pfmergeGetKeys this returned "Invalid arguments" because + # getKeysUsingKeySpecs treated the empty range as invalid_spec, + # discarding the dest key found by spec[0]. 
+ assert_equal {dest} [r command getkeys PFMERGE dest] + } + test {COMMAND GETKEYS MORE THAN 256 KEYS} { set all_keys [list] set numkeys 260 From 1be85ca8a92f697e3f41e7ffc26e80c30db7c134 Mon Sep 17 00:00:00 2001 From: Moti Cohen Date: Mon, 13 Apr 2026 09:46:46 +0300 Subject: [PATCH 03/17] Fix HEXPIRE numfields overflow (#15021) Validate HEXPIRE-family field counts without parser overflow. Keep flexible option order; only require that the fields fit in argv. Add tests for INT_MAX numfields across HEXPIRE/HPEXPIRE/HEXPIREAT/HPEXPIREAT. (cherry picked from commit e1d35aca01c4240fa6c3feac55b00e9c1640abc0) --- src/t_hash.c | 5 +++-- tests/unit/type/hash-field-expire.tcl | 5 +++++ 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/src/t_hash.c b/src/t_hash.c index df3ac615be9..270cb3d1ffa 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -3589,15 +3589,16 @@ static int parseHashCommandArgs(client *c, HashCommandArgs *args, &numFields, "Parameter `numFields` should be greater than 0") != C_OK) return C_ERR; - args->fieldCount = (int)numFields; args->firstFieldPos = i + 2; /* Check bounds - we must have exactly the right number of fields */ - if (args->firstFieldPos + args->fieldCount > c->argc) { + if (numFields > c->argc - args->firstFieldPos) { addReplyError(c, "wrong number of arguments"); return C_ERR; } + args->fieldCount = (int)numFields; + /* Skip over the field arguments */ i = args->firstFieldPos + args->fieldCount - 1; continue; diff --git a/tests/unit/type/hash-field-expire.tcl b/tests/unit/type/hash-field-expire.tcl index 3c0f8fbccfe..13bfbb250ab 100644 --- a/tests/unit/type/hash-field-expire.tcl +++ b/tests/unit/type/hash-field-expire.tcl @@ -2349,6 +2349,11 @@ start_server {tags {"hash"}} { assert_error {*Parameter*numFields*should be greater than 0*} {r HEXPIRE myhash 60 FIELDS -1 f1} assert_error {*invalid number of fields*} {r HSETEX myhash FIELDS 0 f1 v1 EX 60} assert_error {*invalid number of fields*} {r HGETEX myhash FIELDS 0 f1 EX 60} + set future_sec 
[expr {[clock seconds] + 60}] + set future_ms [expr {[clock milliseconds] + 60000}] + foreach {cmd expire} [list HEXPIRE 60 HPEXPIRE 60000 HEXPIREAT $future_sec HPEXPIREAT $future_ms] { + assert_error {*wrong number of arguments*} [list r $cmd myhash $expire FIELDS 2147483647 f1] + } # Test missing FIELDS keyword assert_error {*unknown argument*} {r HEXPIRE myhash 60 2 f1 f2} From 6c6779f33222d51c6263eafc1ceb1038cf674262 Mon Sep 17 00:00:00 2001 From: "debing.sun" Date: Thu, 16 Apr 2026 21:50:49 +0800 Subject: [PATCH 04/17] Fix decrRefCount on NULL robj on corrupt KEY_META payload (#15034) This PR fixes two issues when processing corrupt data in rdbLoadCheckModuleValue(): 1. When handling `RDB_MODULE_OPCODE_STRING` opcode, rdbGenericLoadStringObject() can return NULL on a corrupt payload. The code called decrRefCount(o) unconditionally without a NULL check, resulting in a NULL pointer dereference crash. 2. The while loop condition was `!= RDB_MODULE_OPCODE_EOF`, which means a truncated payload (causing rdbLoadLen to return RDB_LENERR) would never exit the loop, since `RDB_LENERR != RDB_MODULE_OPCODE_EOF` is always true, potentially causing an infinite hang. (cherry picked from commit ca6e471a3fe69c7b0af04c7fb6827c6cd88b5a6e) --- src/keymeta.c | 2 +- src/rdb.c | 27 ++++++++++++++++++++++----- src/rdb.h | 2 +- src/redis-check-rdb.c | 4 ++-- tests/integration/corrupt-dump.tcl | 9 +++++++++ 5 files changed, 35 insertions(+), 9 deletions(-) diff --git a/src/keymeta.c b/src/keymeta.c index 530ea049300..829458daec7 100644 --- a/src/keymeta.c +++ b/src/keymeta.c @@ -416,7 +416,7 @@ int rdbLoadSkipMetaIfAllowed(rio *rdb, char *cname, int flags) { * * Note: rdbLoadCheckModuleValue() reads opcodes until it finds RDB_MODULE_OPCODE_EOF, * so it consumes the EOF marker as well. We don't need to read it separately. 
*/ - robj *dummy = rdbLoadCheckModuleValue(rdb, cname); + robj *dummy = rdbLoadCheckModuleValue(rdb, cname, 1); if (dummy == NULL) { serverLog(LL_WARNING, "Corrupted metadata value for class '%s'", cname); return -1; diff --git a/src/rdb.c b/src/rdb.c index ac66fcec060..5766335e257 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1919,11 +1919,18 @@ void rdbRemoveTempFile(pid_t childpid, int from_signal) { /* This function is called by rdbLoadObject() when the code is in RDB-check * mode and we find a module value of type 2 that can be parsed without - * the need of the actual module. The value is parsed for errors, finally - * a dummy redis object is returned just to conform to the API. */ -robj *rdbLoadCheckModuleValue(rio *rdb, char *modulename) { + * the need of the actual module. The value is parsed for errors. + * If null_on_error is true, NULL is returned when data corruption is detected; + * otherwise a dummy redis object is always returned regardless of success or + * failure. */ +robj *rdbLoadCheckModuleValue(rio *rdb, char *modulename, int null_on_error) { uint64_t opcode; while((opcode = rdbLoadLen(rdb,NULL)) != RDB_MODULE_OPCODE_EOF) { + if (opcode == RDB_LENERR) { + rdbReportCorruptRDB("Error reading module opcode length from module %s value", modulename); + goto error; + } + if (opcode == RDB_MODULE_OPCODE_SINT || opcode == RDB_MODULE_OPCODE_UINT) { @@ -1931,12 +1938,14 @@ robj *rdbLoadCheckModuleValue(rio *rdb, char *modulename) { if (rdbLoadLenByRef(rdb,NULL,&len) == -1) { rdbReportCorruptRDB( "Error reading integer from module %s value", modulename); + goto error; } } else if (opcode == RDB_MODULE_OPCODE_STRING) { robj *o = rdbGenericLoadStringObject(rdb,RDB_LOAD_NONE,NULL); if (o == NULL) { rdbReportCorruptRDB( "Error reading string from module %s value", modulename); + goto error; } decrRefCount(o); } else if (opcode == RDB_MODULE_OPCODE_FLOAT) { @@ -1944,16 +1953,24 @@ robj *rdbLoadCheckModuleValue(rio *rdb, char *modulename) { if 
(rdbLoadBinaryFloatValue(rdb,&val) == -1) { rdbReportCorruptRDB( "Error reading float from module %s value", modulename); + goto error; } } else if (opcode == RDB_MODULE_OPCODE_DOUBLE) { double val; if (rdbLoadBinaryDoubleValue(rdb,&val) == -1) { rdbReportCorruptRDB( "Error reading double from module %s value", modulename); + goto error; } + } else { + rdbReportCorruptRDB( + "Unknown module opcode %llu reading module %s value", (unsigned long long)opcode, modulename); + goto error; } } return createStringObject("module-dummy-value",18); +error: + return null_on_error ? NULL : createStringObject("module-dummy-value",18); } /* Load object type and optional key metadata (into `keymeta`) from RDB stream. @@ -3428,7 +3445,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error) if (rdbCheckMode) { char name[10]; moduleTypeNameByID(name,moduleid); - return rdbLoadCheckModuleValue(rdb,name); + return rdbLoadCheckModuleValue(rdb, name, 0); } if (mt == NULL) { @@ -3879,7 +3896,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin continue; } else { /* RDB check mode. */ - robj *aux = rdbLoadCheckModuleValue(rdb,name); + robj *aux = rdbLoadCheckModuleValue(rdb, name, 0); decrRefCount(aux); continue; /* Read next opcode. 
*/ } diff --git a/src/rdb.h b/src/rdb.h index a020ff62cef..c379ff007f8 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -150,7 +150,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error); void backgroundSaveDoneHandler(int exitcode, int bysignal); int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime,int dbid); ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt); -robj *rdbLoadCheckModuleValue(rio *rdb, char *modulename); +robj *rdbLoadCheckModuleValue(rio *rdb, char *modulename, int null_on_error); int rdbResolveKeyType(rio *rdb, int *type, int dbid, KeyMetaSpec *keymeta); robj *rdbLoadStringObject(rio *rdb); ssize_t rdbSaveStringObject(rio *rdb, robj *obj); diff --git a/src/redis-check-rdb.c b/src/redis-check-rdb.c index 11781dd61b7..9624bf3ff6f 100644 --- a/src/redis-check-rdb.c +++ b/src/redis-check-rdb.c @@ -254,7 +254,7 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) { uint32_t classSpec; if (rioRead(&rdb, &classSpec, 4) == 0) goto eoferr; /* Skip module value using rdbLoadCheckModuleValue */ - robj *o = rdbLoadCheckModuleValue(&rdb, "metadata"); + robj *o = rdbLoadCheckModuleValue(&rdb, "metadata", 1); if (o == NULL) goto eoferr; decrRefCount(o); } @@ -324,7 +324,7 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) { moduleTypeNameByID(name,moduleid); rdbCheckInfo("MODULE AUX for: %s", name); - robj *o = rdbLoadCheckModuleValue(&rdb,name); + robj *o = rdbLoadCheckModuleValue(&rdb, name, 0); decrRefCount(o); continue; /* Read type again. 
*/ } else if (type == RDB_OPCODE_FUNCTION_PRE_GA) { diff --git a/tests/integration/corrupt-dump.tcl b/tests/integration/corrupt-dump.tcl index 13c97a996eb..3f3768dda8a 100644 --- a/tests/integration/corrupt-dump.tcl +++ b/tests/integration/corrupt-dump.tcl @@ -1048,6 +1048,15 @@ test {corrupt payload: stream - duplicated consumer PEL entry} { } } +test {corrupt payload: fuzzer findings - decrRefCount on NULL robj on corrupt KEY_META payload} { + start_server [list overrides [list loglevel verbose use-exit-on-panic yes crash-memcheck-enabled no] ] { + r config set sanitize-dump-payload no + r debug set-skip-checksum-validation 1 + catch {r restore key 0 "\xF3\x02\x01\x0D\x00\x54\x23\x3F\xC9\x82\x32\x05\x8D" replace} err + assert_match "*Bad data format*" $err + r ping + } +} } ;# tags From 4fef0980e9de7809f3b6774e076d3ba4c81437ef Mon Sep 17 00:00:00 2001 From: "debing.sun" Date: Tue, 14 Apr 2026 19:26:42 +0800 Subject: [PATCH 05/17] Fix wrong argv index in xinfoReplyWithStreamInfo for slot alloc size tracking (#15037) `xinfoReplyWithStreamInfo` passed the wrong key(c->argv[1]) instead of `c->argv[2]` to `updateSlotAllocSize` when updating per-slot memory tracking. Fix by passing the key explicitly to `xinfoReplyWithStreamInfo` instead of relying on a hardcoded argv index. Also, add the `-DDEBUG_ASSERTIONS` flag to the test-ubuntu-jemalloc CI to cover this debug assertion. 
(cherry picked from commit 2049c7fe32f08841c57d50b595986760ed8d9bd0) --- .github/workflows/daily.yml | 2 +- src/t_stream.c | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/daily.yml b/.github/workflows/daily.yml index 0e478bb771f..5cc49c75711 100644 --- a/.github/workflows/daily.yml +++ b/.github/workflows/daily.yml @@ -52,7 +52,7 @@ jobs: repository: ${{ env.GITHUB_REPOSITORY }} ref: ${{ env.GITHUB_HEAD_REF }} - name: make - run: make REDIS_CFLAGS='-Werror -DREDIS_TEST' + run: make REDIS_CFLAGS='-Werror -DREDIS_TEST -DDEBUG_ASSERTIONS' - name: testprep run: sudo apt-get install tcl8.6 tclx - name: test diff --git a/src/t_stream.c b/src/t_stream.c index cb20720fc52..0c45535d1f1 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -4802,7 +4802,7 @@ void xtrimCommand(client *c) { /* Helper function for xinfoCommand. * Handles the variants of XINFO STREAM */ -void xinfoReplyWithStreamInfo(client *c, kvobj *kv) { +void xinfoReplyWithStreamInfo(client *c, robj *key, kvobj *kv) { stream *s = kv->ptr; int full = 1; long long count = 10; /* Default COUNT is 10 so we don't block the server */ @@ -5032,7 +5032,7 @@ void xinfoReplyWithStreamInfo(client *c, kvobj *kv) { } } if (server.memory_tracking_enabled) - updateSlotAllocSize(c->db,getKeySlot(c->argv[1]->ptr),kv,old_alloc,kvobjAllocSize(kv)); + updateSlotAllocSize(c->db,getKeySlot(key->ptr),kv,old_alloc,kvobjAllocSize(kv)); } /* XINFO CONSUMERS @@ -5136,7 +5136,7 @@ NULL raxStop(&ri); } else if (!strcasecmp(opt,"STREAM")) { /* XINFO STREAM [FULL [COUNT ]]. 
*/ - xinfoReplyWithStreamInfo(c,kv); + xinfoReplyWithStreamInfo(c,key,kv); } else { addReplySubcommandSyntaxError(c); } From 651cf097d624012f1f683cea2c27bc65378562a4 Mon Sep 17 00:00:00 2001 From: Darsheel Rathore Date: Thu, 23 Apr 2026 14:14:36 +0530 Subject: [PATCH 06/17] Fix use-after-free in RM_RegisterClusterMessageReceiver() (#15059) RM_RegisterClusterMessageReceiver() unlinks a receiver node from the clusterReceivers[type] linked list when the callback is set to NULL, but when removing the head node (prev == NULL), the code updates clusterReceivers[type]->next instead of clusterReceivers[type] itself. This leaves clusterReceivers[type] pointing to the freed node, so any later traversal through clusterReceivers[type] dereferences a dangling pointer. Fix by updating clusterReceivers[type] directly when prev == NULL. Fixes #15057 --------- Co-authored-by: debing.sun (cherry picked from commit 303667a40cdf5032b1044e94dfc6860a15414e03) --- src/module.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/module.c b/src/module.c index 3a2715e7f90..31e4a09c454 100644 --- a/src/module.c +++ b/src/module.c @@ -9492,7 +9492,7 @@ void RM_RegisterClusterMessageReceiver(RedisModuleCtx *ctx, uint8_t type, RedisM if (prev) prev->next = r->next; else - clusterReceivers[type]->next = r->next; + clusterReceivers[type] = r->next; /* Update the head */ zfree(r); } return; From 35dc32a67f7a86c4bc2732a09e71a260f740e8d9 Mon Sep 17 00:00:00 2001 From: Raj Kripal Danday Date: Mon, 27 Apr 2026 18:24:47 -0700 Subject: [PATCH 07/17] tracking: fix self-overlap returning non-zero loop index (#15073) Fixes checkPrefixCollisionsOrReply() to return 0 (failure) on any provided-prefix self-overlap, instead of accidentally returning a non-zero loop index for overlaps found after the first prefix. 
Signed-off-by: Raj Danday (cherry picked from commit 625b6f58f691bfee045fd3a45b32477df8cc7404) --- src/tracking.c | 2 +- tests/unit/tracking.tcl | 12 ++++++++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/src/tracking.c b/src/tracking.c index 256b8b6b1f6..0f3ced61ad2 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -125,7 +125,7 @@ int checkPrefixCollisionsOrReply(client *c, robj **prefixes, size_t numprefix) { "Prefixes for a single client must not overlap.", (unsigned char *)prefixes[i]->ptr, (unsigned char *)prefixes[j]->ptr); - return i; + return 0; } } } diff --git a/tests/unit/tracking.tcl b/tests/unit/tracking.tcl index 666b5930e43..174575eee98 100644 --- a/tests/unit/tracking.tcl +++ b/tests/unit/tracking.tcl @@ -413,6 +413,18 @@ start_server {tags {"tracking network logreqres:skip"}} { $r CLIENT TRACKING OFF } + test {BCAST prefix self-overlap past first index reports error without enabling} { + # When any of the provided BCAST prefixes overlap with each other, + # CLIENT TRACKING ON must reply with a single error and leave tracking + # disabled, regardless of the position of the overlapping prefix in + # the argument list. + r CLIENT TRACKING OFF + catch {r CLIENT TRACKING ON BCAST PREFIX BAZ PREFIX FOOBAR PREFIX FOO} output + assert_match {ERR Prefix 'FOOBAR' overlaps with another provided prefix 'FOO'*} $output + # Tracking must not have been enabled after the overlap error. + assert_match {*flags off*} [r CLIENT TRACKINGINFO] + } + test {hdel deliver invalidate message after response in the same connection} { r CLIENT TRACKING off r HELLO 3 From 5b190f3e650b08b056d7b785157620cdd0539346 Mon Sep 17 00:00:00 2001 From: Vitah Lin Date: Thu, 23 Apr 2026 17:38:42 +0800 Subject: [PATCH 08/17] Fix signed integer overflow in scan count parameter (#14982) ### Problem In `scanGenericCommand`, `maxiterations = count * 10` overflows when `count > LONG_MAX / 10`, causing undefined behavior. ### Changed 1. 
Use saturating arithmetic to prevent overflow. 2. Added a test to trigger the overflow path, detectable by UBSan. (cherry picked from commit fafc47251afce2e55b917c088fbb63217b5241cb) --- src/db.c | 2 +- tests/unit/scan.tcl | 15 +++++++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/src/db.c b/src/db.c index 8219d4435ea..fbc81c46a3b 100644 --- a/src/db.c +++ b/src/db.c @@ -1873,7 +1873,7 @@ void scanGenericCommand(client *c, robj *o, unsigned long long cursor) { * COUNT, so if the hash table is in a pathological state (very * sparsely populated) we avoid to block too much time at the cost * of returning no or very few elements. */ - long maxiterations = count*10; + long maxiterations = (count > LONG_MAX / 10) ? LONG_MAX : count * 10; /* We pass scanData which have three pointers to the callback: * 1. data.keys: the list to which it will add new elements; diff --git a/tests/unit/scan.tcl b/tests/unit/scan.tcl index 6a092cb4e95..c3ec5f273c1 100644 --- a/tests/unit/scan.tcl +++ b/tests/unit/scan.tcl @@ -471,6 +471,21 @@ proc test_scan {type} { } } + test "{$type} SCAN COUNT overflow" { + r flushdb + populate 10 + + # count = LONG_MAX/10 + 1, within LONG_MAX so it parses fine, + # but count*10 overflows signed long which is undefined behavior. + # Compute dynamically to support both 32-bit and 64-bit builds. + set long_max [expr {[s arch_bits] == 32 ? 2147483647 : 9223372036854775807}] + set big_count [expr {$long_max / 10 + 1}] + set res [r scan 0 count $big_count] + assert {[llength $res] == 2} + assert_equal 0 [lindex $res 0] + assert_equal 10 [llength [lindex $res 1]] + } + test "{$type} SCAN MATCH pattern implies cluster slot" { # Tests the code path for an optimization for patterns like "{foo}-*" # which implies that all matching keys belong to one slot. 
From 76d3d6e1c5296797d3bc87b3f950d40d65fc101c Mon Sep 17 00:00:00 2001 From: hristostaykov-del Date: Tue, 5 May 2026 11:04:28 +0300 Subject: [PATCH 09/17] Fix Sentinel config injection via SENTINEL SET (#14970) Reject control characters (0x00-0x1F, 0x7F) in user-controlled string arguments to SENTINEL SET, SENTINEL MONITOR, and SENTINEL CONFIG SET to prevent newline injection into the persisted config file. An attacker with access to SENTINEL SET could inject arbitrary config directives (e.g. notification-script) by embedding \r\n in auth-pass or similar fields, leading to code execution on restart. As a defense-in-depth measure, config serialization now uses sdscatrepr (via sentinelSdscatConfigArg) for all user-controlled string fields when they contain characters that require escaping. Simple values remain unquoted for backward compatibility with older config parsers. Add comprehensive Sentinel tests (16-config-injection.tcl) covering control character rejection for all affected commands, verification that injection payloads do not pollute the config file, round-trip persistence of values containing spaces and quotes through restart, and backward compatibility with the old unquoted config format. (cherry picked from commit 3e1afec688dc3f9354277ce6ad522996ebe4f2a6) --- src/sentinel.c | 108 +++++-- tests/sentinel/tests/16-config-injection.tcl | 312 +++++++++++++++++++ 2 files changed, 393 insertions(+), 27 deletions(-) create mode 100644 tests/sentinel/tests/16-config-injection.tcl diff --git a/src/sentinel.c b/src/sentinel.c index f6a1f75bd3e..372e4b64067 100644 --- a/src/sentinel.c +++ b/src/sentinel.c @@ -458,6 +458,25 @@ const char *preMonitorCfgName[] = { "announce-hostnames" }; +/* Returns 1 if the string contains control characters (0x00-0x1F or 0x7F), + * which must be rejected to prevent config injection via newlines/etc. 
*/ +int sentinelStringContainsControlChars(sds s) { + for (size_t i = 0; i < sdslen(s); i++) { + unsigned char c = (unsigned char)s[i]; + if (c < 0x20 || c == 0x7F) return 1; + } + return 0; +} + +/* Append an sds value to dest, quoting it with sdscatrepr only if the value + * contains characters that need escaping (spaces, quotes, control chars, etc.). + * Simple values are appended as-is, preserving the traditional config format. */ +static sds sentinelSdscatConfigArg(sds dest, sds value) { + if (sdsneedsrepr(value)) + return sdscatrepr(dest, value, sdslen(value)); + return sdscatsds(dest, value); +} + /* This function overwrites a few normal Redis config default with Sentinel * specific defaults. */ void initSentinelConfig(void) { @@ -2048,8 +2067,13 @@ void rewriteConfigSentinelOption(struct rewriteConfigState *state) { /* sentinel monitor */ master = dictGetVal(de); master_addr = sentinelGetCurrentMasterAddress(master); + + /* Pre-compute the safely-formatted master name for config serialization. + * Only quoted if it contains characters requiring escaping. 
*/ + sds qname = sentinelSdscatConfigArg(sdsempty(), master->name); + line = sdscatprintf(sdsempty(),"sentinel monitor %s %s %d %d", - master->name, announceSentinelAddr(master_addr), master_addr->port, + qname, announceSentinelAddr(master_addr), master_addr->port, master->quorum); rewriteConfigRewriteLine(state,"sentinel monitor",line,1); /* rewriteConfigMarkAsProcessed is handled after the loop */ @@ -2058,7 +2082,7 @@ void rewriteConfigSentinelOption(struct rewriteConfigState *state) { if (master->down_after_period != sentinel_default_down_after) { line = sdscatprintf(sdsempty(), "sentinel down-after-milliseconds %s %ld", - master->name, (long) master->down_after_period); + qname, (long) master->down_after_period); rewriteConfigRewriteLine(state,"sentinel down-after-milliseconds",line,1); /* rewriteConfigMarkAsProcessed is handled after the loop */ } @@ -2067,7 +2091,7 @@ void rewriteConfigSentinelOption(struct rewriteConfigState *state) { if (master->failover_timeout != sentinel_default_failover_timeout) { line = sdscatprintf(sdsempty(), "sentinel failover-timeout %s %ld", - master->name, (long) master->failover_timeout); + qname, (long) master->failover_timeout); rewriteConfigRewriteLine(state,"sentinel failover-timeout",line,1); /* rewriteConfigMarkAsProcessed is handled after the loop */ @@ -2077,42 +2101,38 @@ void rewriteConfigSentinelOption(struct rewriteConfigState *state) { if (master->parallel_syncs != SENTINEL_DEFAULT_PARALLEL_SYNCS) { line = sdscatprintf(sdsempty(), "sentinel parallel-syncs %s %d", - master->name, master->parallel_syncs); + qname, master->parallel_syncs); rewriteConfigRewriteLine(state,"sentinel parallel-syncs",line,1); /* rewriteConfigMarkAsProcessed is handled after the loop */ } /* sentinel notification-script */ if (master->notification_script) { - line = sdscatprintf(sdsempty(), - "sentinel notification-script %s %s", - master->name, master->notification_script); + line = sdscatprintf(sdsempty(), "sentinel notification-script %s 
", qname); + line = sentinelSdscatConfigArg(line, master->notification_script); rewriteConfigRewriteLine(state,"sentinel notification-script",line,1); /* rewriteConfigMarkAsProcessed is handled after the loop */ } /* sentinel client-reconfig-script */ if (master->client_reconfig_script) { - line = sdscatprintf(sdsempty(), - "sentinel client-reconfig-script %s %s", - master->name, master->client_reconfig_script); + line = sdscatprintf(sdsempty(), "sentinel client-reconfig-script %s ", qname); + line = sentinelSdscatConfigArg(line, master->client_reconfig_script); rewriteConfigRewriteLine(state,"sentinel client-reconfig-script",line,1); /* rewriteConfigMarkAsProcessed is handled after the loop */ } /* sentinel auth-pass & auth-user */ if (master->auth_pass) { - line = sdscatprintf(sdsempty(), - "sentinel auth-pass %s %s", - master->name, master->auth_pass); + line = sdscatprintf(sdsempty(), "sentinel auth-pass %s ", qname); + line = sentinelSdscatConfigArg(line, master->auth_pass); rewriteConfigRewriteLine(state,"sentinel auth-pass",line,1); /* rewriteConfigMarkAsProcessed is handled after the loop */ } if (master->auth_user) { - line = sdscatprintf(sdsempty(), - "sentinel auth-user %s %s", - master->name, master->auth_user); + line = sdscatprintf(sdsempty(), "sentinel auth-user %s ", qname); + line = sentinelSdscatConfigArg(line, master->auth_user); rewriteConfigRewriteLine(state,"sentinel auth-user",line,1); /* rewriteConfigMarkAsProcessed is handled after the loop */ } @@ -2121,7 +2141,7 @@ void rewriteConfigSentinelOption(struct rewriteConfigState *state) { if (master->master_reboot_down_after_period != 0) { line = sdscatprintf(sdsempty(), "sentinel master-reboot-down-after-period %s %ld", - master->name, (long) master->master_reboot_down_after_period); + qname, (long) master->master_reboot_down_after_period); rewriteConfigRewriteLine(state,"sentinel master-reboot-down-after-period",line,1); /* rewriteConfigMarkAsProcessed is handled after the loop */ } @@ 
-2129,7 +2149,7 @@ void rewriteConfigSentinelOption(struct rewriteConfigState *state) { /* sentinel config-epoch */ line = sdscatprintf(sdsempty(), "sentinel config-epoch %s %llu", - master->name, (unsigned long long) master->config_epoch); + qname, (unsigned long long) master->config_epoch); rewriteConfigRewriteLine(state,"sentinel config-epoch",line,1); /* rewriteConfigMarkAsProcessed is handled after the loop */ @@ -2137,7 +2157,7 @@ void rewriteConfigSentinelOption(struct rewriteConfigState *state) { /* sentinel leader-epoch */ line = sdscatprintf(sdsempty(), "sentinel leader-epoch %s %llu", - master->name, (unsigned long long) master->leader_epoch); + qname, (unsigned long long) master->leader_epoch); rewriteConfigRewriteLine(state,"sentinel leader-epoch",line,1); /* rewriteConfigMarkAsProcessed is handled after the loop */ @@ -2158,7 +2178,7 @@ void rewriteConfigSentinelOption(struct rewriteConfigState *state) { slave_addr = master->addr; line = sdscatprintf(sdsempty(), "sentinel known-replica %s %s %d", - master->name, announceSentinelAddr(slave_addr), slave_addr->port); + qname, announceSentinelAddr(slave_addr), slave_addr->port); /* try to replace any known-slave option first if found */ if (rewriteConfigRewriteLine(state, "sentinel known-slave", sdsdup(line), 0) == 0) { rewriteConfigRewriteLine(state, "sentinel known-replica", line, 1); @@ -2176,7 +2196,7 @@ void rewriteConfigSentinelOption(struct rewriteConfigState *state) { if (ri->runid == NULL) continue; line = sdscatprintf(sdsempty(), "sentinel known-sentinel %s %s %d %s", - master->name, announceSentinelAddr(ri->addr), ri->addr->port, ri->runid); + qname, announceSentinelAddr(ri->addr), ri->addr->port, ri->runid); rewriteConfigRewriteLine(state,"sentinel known-sentinel",line,1); /* rewriteConfigMarkAsProcessed is handled after the loop */ } @@ -2187,13 +2207,16 @@ void rewriteConfigSentinelOption(struct rewriteConfigState *state) { while((de = dictNext(&di2)) != NULL) { sds oldname = dictGetKey(de); 
sds newname = dictGetVal(de); - line = sdscatprintf(sdsempty(), - "sentinel rename-command %s %s %s", - master->name, oldname, newname); + line = sdscatprintf(sdsempty(), "sentinel rename-command %s ", qname); + line = sentinelSdscatConfigArg(line, oldname); + line = sdscatlen(line, " ", 1); + line = sentinelSdscatConfigArg(line, newname); rewriteConfigRewriteLine(state,"sentinel rename-command",line,1); /* rewriteConfigMarkAsProcessed is handled after the loop */ } dictResetIterator(&di2); + + sdsfree(qname); } /* sentinel current-epoch is a global state valid for all the masters. */ @@ -2221,7 +2244,8 @@ void rewriteConfigSentinelOption(struct rewriteConfigState *state) { /* sentinel sentinel-user. */ if (sentinel.sentinel_auth_user) { - line = sdscatprintf(sdsempty(), "sentinel sentinel-user %s", sentinel.sentinel_auth_user); + line = sdsnew("sentinel sentinel-user "); + line = sentinelSdscatConfigArg(line, sentinel.sentinel_auth_user); rewriteConfigRewriteLine(state,"sentinel sentinel-user",line,1); } else { rewriteConfigMarkAsProcessed(state,"sentinel sentinel-user"); @@ -2229,10 +2253,11 @@ void rewriteConfigSentinelOption(struct rewriteConfigState *state) { /* sentinel sentinel-pass. 
*/ if (sentinel.sentinel_auth_pass) { - line = sdscatprintf(sdsempty(), "sentinel sentinel-pass %s", sentinel.sentinel_auth_pass); + line = sdsnew("sentinel sentinel-pass "); + line = sentinelSdscatConfigArg(line, sentinel.sentinel_auth_pass); rewriteConfigRewriteLine(state,"sentinel sentinel-pass",line,1); } else { - rewriteConfigMarkAsProcessed(state,"sentinel sentinel-pass"); + rewriteConfigMarkAsProcessed(state,"sentinel sentinel-pass"); } dictResetIterator(&di); @@ -3238,6 +3263,11 @@ void sentinelConfigSetCommand(client *c) { if (!(!strcasecmp(val->ptr, "debug") || !strcasecmp(val->ptr, "verbose") || !strcasecmp(val->ptr, "notice") || !strcasecmp(val->ptr, "warning") || !strcasecmp(val->ptr, "nothing"))) goto badfmt; + } else if (!strcasecmp(option, "announce-ip")) { + if (sentinelStringContainsControlChars(val->ptr)) { + addReplyErrorFormat(c, "'%s' must not contain control characters", option); + goto exit; + } } } @@ -4045,6 +4075,11 @@ NULL return; } + if (sentinelStringContainsControlChars(c->argv[2]->ptr)) { + addReplyError(c, "Master name must not contain control characters"); + return; + } + /* If resolve-hostnames is used, actual DNS resolution may take place. * Otherwise just validate address. 
*/ @@ -4388,6 +4423,12 @@ void sentinelSetCommand(client *c) { goto seterr; } + if (sentinelStringContainsControlChars(value)) { + addReplyError(c, + "notification-script must not contain control characters"); + goto seterr; + } + if (strlen(value) && access(value,X_OK) == -1) { addReplyError(c, "Notification script seems non existing or non executable"); @@ -4407,6 +4448,12 @@ void sentinelSetCommand(client *c) { goto seterr; } + if (sentinelStringContainsControlChars(value)) { + addReplyError(c, + "client-reconfig-script must not contain control characters"); + goto seterr; + } + if (strlen(value) && access(value,X_OK) == -1) { addReplyError(c, "Client reconfiguration script seems non existing or " @@ -4450,6 +4497,13 @@ void sentinelSetCommand(client *c) { goto badfmt; } + if (sentinelStringContainsControlChars(oldname) || + sentinelStringContainsControlChars(newname)) { + addReplyError(c, + "rename-command arguments must not contain control characters"); + goto seterr; + } + /* Remove any older renaming for this command. */ dictDelete(ri->renamed_commands,oldname); diff --git a/tests/sentinel/tests/16-config-injection.tcl b/tests/sentinel/tests/16-config-injection.tcl new file mode 100644 index 00000000000..6aff07de989 --- /dev/null +++ b/tests/sentinel/tests/16-config-injection.tcl @@ -0,0 +1,312 @@ +# Test that control characters are rejected where appropriate, and that +# string values are safely quoted when persisted to disk. +# +# Config injection is prevented by sentinelSdscatConfigArg(), which escapes +# values containing special characters at persistence time. Fields like +# notification-script, rename-command, master name, and announce-ip also +# reject control characters at input time as an additional safeguard. + +source "../tests/includes/init-tests.tcl" + +# Helper: read the sentinel config file for a given sentinel id. 
+proc read_sentinel_config {id} { + set configfile [file join "sentinel_${id}" "sentinel.conf"] + set fp [open $configfile r] + set content [read $fp] + close $fp + return $content +} + +# Helper: count how many lines in the config match a pattern. +proc count_config_lines {content pattern} { + set count 0 + foreach line [split $content "\n"] { + if {[string match $pattern $line]} { + incr count + } + } + return $count +} + +# Helper: restart a (already stopped) sentinel and wait until it responds to PING. +proc start_sentinel_and_wait {sid} { + restart_instance sentinel $sid + wait_for_condition 200 50 { + [catch {S $sid PING}] == 0 + } else { + fail "Sentinel $sid did not restart in time" + } +} + +# Helper: kill sentinel, restart it, and wait until it responds to PING. +proc restart_sentinel_and_wait {sid} { + kill_instance sentinel $sid + start_sentinel_and_wait $sid +} + +# Helper: assert that the sentinel config file contains the expected substring. +proc assert_config_contains {sid expected} { + set content [read_sentinel_config $sid] + assert {[string first $expected $content] >= 0} +} + +# Helper: append lines to a sentinel's config file (sentinel must be stopped). +proc append_to_sentinel_config {sid lines} { + set configfile [file join "sentinel_${sid}" "sentinel.conf"] + set fp [open $configfile a] + foreach line $lines { + puts $fp $line + } + close $fp +} + +# Helper: create an executable script with spaces in its path. +# Returns the full path. Caller should "file delete -force" the directory. 
+proc create_script_with_spaces {sid} { + set script_dir [file join [pwd] "sentinel_${sid}" "script dir"] + file mkdir $script_dir + set script_path [file join $script_dir "my script.sh"] + set fp [open $script_path w] + puts $fp "#!/bin/sh" + close $fp + file attributes $script_path -permissions 0755 + return $script_path +} + +# -------------------------------------------------------------------------- +# Section 1: Control character rejection in SENTINEL SET +# -------------------------------------------------------------------------- + +test "SENTINEL SET notification-script rejects control characters" { + assert_error "*must not contain control characters*" { + S 0 SENTINEL SET mymaster notification-script "/tmp/ok\n/tmp/evil.sh" + } +} + +test "SENTINEL SET client-reconfig-script rejects control characters" { + assert_error "*must not contain control characters*" { + S 0 SENTINEL SET mymaster client-reconfig-script "/tmp/ok\n/tmp/evil.sh" + } +} + +test "SENTINEL SET rename-command rejects control characters" { + assert_error "*must not contain control characters*" { + S 0 SENTINEL SET mymaster rename-command "CONFIG\nEVIL" "NEWCONFIG" + } + assert_error "*must not contain control characters*" { + S 0 SENTINEL SET mymaster rename-command "CONFIG" "NEW\nCONFIG" + } +} + +# -------------------------------------------------------------------------- +# Section 2: Control character rejection in SENTINEL MONITOR +# -------------------------------------------------------------------------- + +test "SENTINEL MONITOR rejects master name with control characters" { + set port [get_instance_attrib redis 0 port] + assert_error "*must not contain control characters*" { + S 0 SENTINEL MONITOR "bad\nmaster" 127.0.0.1 $port 2 + } + assert_error "*must not contain control characters*" { + S 0 SENTINEL MONITOR "bad\rmaster" 127.0.0.1 $port 2 + } +} + +# -------------------------------------------------------------------------- +# Section 3: Control character rejection in 
SENTINEL CONFIG SET +# -------------------------------------------------------------------------- + +test "SENTINEL CONFIG SET announce-ip rejects control characters" { + catch {S 0 SENTINEL CONFIG SET announce-ip "1.2.3.4\nevil-directive"} e + assert_match "*must not contain control characters*" $e +} + +# -------------------------------------------------------------------------- +# Section 4: Config injection attempt does not pollute config file +# -------------------------------------------------------------------------- + +test "Newline injection in auth-pass does not pollute config file" { + # Auth-pass accepts control characters, but sentinelSdscatConfigArg + # escapes them at persistence time, preventing config injection. + S 0 SENTINEL SET mymaster auth-pass "x\nsentinel notification-script mymaster /tmp/evil.sh" + S 0 SENTINEL FLUSHCONFIG + set content [read_sentinel_config 0] + assert {[count_config_lines $content "sentinel notification-script mymaster /tmp/evil.sh"] == 0} + assert_config_contains 0 {sentinel auth-pass mymaster "x\nsentinel notification-script mymaster /tmp/evil.sh"} + S 0 SENTINEL SET mymaster auth-pass "" +} + +test "Newline injection in auth-user does not pollute config file" { + S 0 SENTINEL SET mymaster auth-user "x\nsentinel notification-script mymaster /tmp/evil.sh" + S 0 SENTINEL FLUSHCONFIG + set content [read_sentinel_config 0] + assert {[count_config_lines $content "sentinel notification-script mymaster /tmp/evil.sh"] == 0} + assert_config_contains 0 {sentinel auth-user mymaster "x\nsentinel notification-script mymaster /tmp/evil.sh"} + S 0 SENTINEL SET mymaster auth-user "" +} + +test "Newline injection in sentinel-pass does not pollute config file" { + S 0 SENTINEL CONFIG SET sentinel-pass "x\nsentinel notification-script mymaster /tmp/evil.sh" + S 0 SENTINEL FLUSHCONFIG + set content [read_sentinel_config 0] + assert {[count_config_lines $content "sentinel notification-script mymaster /tmp/evil.sh"] == 0} + 
assert_config_contains 0 {sentinel sentinel-pass "x\nsentinel notification-script mymaster /tmp/evil.sh"} + S 0 SENTINEL CONFIG SET sentinel-pass "" +} + +test "Newline injection in sentinel-user does not pollute config file" { + S 0 SENTINEL CONFIG SET sentinel-user "x\nsentinel notification-script mymaster /tmp/evil.sh" + S 0 SENTINEL FLUSHCONFIG + set content [read_sentinel_config 0] + assert {[count_config_lines $content "sentinel notification-script mymaster /tmp/evil.sh"] == 0} + assert_config_contains 0 {sentinel sentinel-user "x\nsentinel notification-script mymaster /tmp/evil.sh"} + S 0 SENTINEL CONFIG SET sentinel-user "" +} + +# -------------------------------------------------------------------------- +# Section 5: Values with special characters survive config round-trip +# -------------------------------------------------------------------------- + +test "auth-pass with special characters persists correctly through restart" { + S 0 SENTINEL SET mymaster auth-pass {my "comp#$&^`'!,lex pass} + set expected {sentinel auth-pass mymaster "my \"comp#$&^`'!,lex pass"} + S 0 SENTINEL FLUSHCONFIG + assert_config_contains 0 $expected + restart_sentinel_and_wait 0 + S 0 SENTINEL FLUSHCONFIG + assert_config_contains 0 $expected + S 0 SENTINEL SET mymaster auth-pass "" +} + +test "auth-user with spaces persists correctly through restart" { + S 0 SENTINEL SET mymaster auth-user {user with spaces} + set expected {sentinel auth-user mymaster "user with spaces"} + S 0 SENTINEL FLUSHCONFIG + assert_config_contains 0 $expected + restart_sentinel_and_wait 0 + S 0 SENTINEL FLUSHCONFIG + assert_config_contains 0 $expected + S 0 SENTINEL SET mymaster auth-user "" +} + +test "notification-script with spaces persists correctly through restart" { + set script_path [create_script_with_spaces 0] + S 0 SENTINEL SET mymaster notification-script $script_path + S 0 SENTINEL FLUSHCONFIG + set content [read_sentinel_config 0] + # The path must be quoted since it contains spaces. 
+ assert {[string first "notification-script" $content] >= 0} + restart_sentinel_and_wait 0 + set info [S 0 SENTINEL MASTER mymaster] + set idx [lsearch $info "notification-script"] + assert {$idx >= 0} + assert_equal [lindex $info [expr {$idx+1}]] $script_path + S 0 SENTINEL SET mymaster notification-script "" + file delete -force [file dirname $script_path] +} + +test "client-reconfig-script with spaces persists correctly through restart" { + set script_path [create_script_with_spaces 0] + S 0 SENTINEL SET mymaster client-reconfig-script $script_path + S 0 SENTINEL FLUSHCONFIG + set content [read_sentinel_config 0] + # The path must be quoted since it contains spaces. + assert {[string first "client-reconfig-script" $content] >= 0} + restart_sentinel_and_wait 0 + set info [S 0 SENTINEL MASTER mymaster] + set idx [lsearch $info "client-reconfig-script"] + assert {$idx >= 0} + assert_equal [lindex $info [expr {$idx+1}]] $script_path + S 0 SENTINEL SET mymaster client-reconfig-script "" + file delete -force [file dirname $script_path] +} + +test "rename-command persists unquoted through restart" { + S 0 SENTINEL SET mymaster rename-command CONFIG CONF_RENAMED + set expected {sentinel rename-command mymaster CONFIG CONF_RENAMED} + S 0 SENTINEL FLUSHCONFIG + assert_config_contains 0 $expected + restart_sentinel_and_wait 0 + S 0 SENTINEL FLUSHCONFIG + assert_config_contains 0 $expected + S 0 SENTINEL SET mymaster rename-command CONFIG CONFIG +} + +# -------------------------------------------------------------------------- +# Section 6: Backward compatibility -- old unquoted config format still loads +# -------------------------------------------------------------------------- + +test "Old unquoted config format for auth-pass and auth-user loads correctly" { + kill_instance sentinel 0 + append_to_sentinel_config 0 { + "sentinel auth-pass mymaster oldformatpass" + "sentinel auth-user mymaster oldformatuser" + } + start_sentinel_and_wait 0 + S 0 SENTINEL FLUSHCONFIG + 
assert_config_contains 0 "sentinel auth-pass mymaster oldformatpass" + assert_config_contains 0 "sentinel auth-user mymaster oldformatuser" + S 0 SENTINEL SET mymaster auth-pass "" + S 0 SENTINEL SET mymaster auth-user "" +} + +test "Old unquoted config format for rename-command loads correctly" { + kill_instance sentinel 0 + append_to_sentinel_config 0 { + "sentinel rename-command mymaster CONFIG NEWCONFIGNAME" + } + start_sentinel_and_wait 0 + S 0 SENTINEL FLUSHCONFIG + assert_config_contains 0 "sentinel rename-command mymaster CONFIG NEWCONFIGNAME" + S 0 SENTINEL SET mymaster rename-command CONFIG CONFIG +} + +test "Old unquoted config format for sentinel-pass loads correctly" { + kill_instance sentinel 0 + append_to_sentinel_config 0 { + "sentinel sentinel-pass oldsentinelpass" + } + start_sentinel_and_wait 0 + set result [S 0 SENTINEL CONFIG GET sentinel-pass] + assert_equal [lindex $result 1] "oldsentinelpass" + S 0 SENTINEL CONFIG SET sentinel-pass "" +} + +test "Old unquoted config format for sentinel-user loads correctly" { + kill_instance sentinel 0 + append_to_sentinel_config 0 { + "sentinel sentinel-user oldsentineluser" + } + start_sentinel_and_wait 0 + set result [S 0 SENTINEL CONFIG GET sentinel-user] + assert_equal [lindex $result 1] "oldsentineluser" + S 0 SENTINEL CONFIG SET sentinel-user "" +} + +# -------------------------------------------------------------------------- +# Section 7: Values with special characters survive config round-trip +# -------------------------------------------------------------------------- + +test "sentinel-pass with special characters persists correctly through restart" { + set test_pass {sentinel pass word} + S 0 SENTINEL CONFIG SET sentinel-pass $test_pass + set expected {sentinel sentinel-pass "sentinel pass word"} + S 0 SENTINEL FLUSHCONFIG + assert_config_contains 0 $expected + restart_sentinel_and_wait 0 + set result [S 0 SENTINEL CONFIG GET sentinel-pass] + assert_equal [lindex $result 1] $test_pass + S 0 
SENTINEL CONFIG SET sentinel-pass "" +} + +test "sentinel-user with special characters persists correctly through restart" { + set test_user {sentinel user name} + S 0 SENTINEL CONFIG SET sentinel-user $test_user + set expected {sentinel sentinel-user "sentinel user name"} + S 0 SENTINEL FLUSHCONFIG + assert_config_contains 0 $expected + restart_sentinel_and_wait 0 + set result [S 0 SENTINEL CONFIG GET sentinel-user] + assert_equal [lindex $result 1] $test_user + S 0 SENTINEL CONFIG SET sentinel-user "" +} From 482aff898a9600e0aed5ae893234216f0eb11f89 Mon Sep 17 00:00:00 2001 From: Vitah Lin Date: Mon, 1 Jun 2026 13:31:20 +0800 Subject: [PATCH 10/17] Fix vector set tests to use RESP2 for default clients (#15287) ## Issue The vector set Python tests intentionally use two clients: - the default client (`self.redis`) for the existing RESP2-oriented test expectations - `self.redis3` for RESP3-specific coverage. However, the default client did not explicitly set a protocol, so it depended on redis-py's default behavior. With newer redis-py versions, RESP3 is now the default protocol(https://github.com/redis/redis-py/pull/4052). In particular, vector set replies such as `VSIM ... WITHSCORES` may be parsed into map/dict-like structures instead of the RESP2 flat-array shape assumed by existing tests. ## Changes Explicitly create the default primary and replica Redis clients with `protocol=2`. `self.redis3` is left unchanged and continues to use `protocol=3` for RESP3-specific test coverage. 
(cherry picked from commit 8fcf3dc86600f8f1bf9bb6e7aa69b2d4e67f5957) --- modules/vector-sets/test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/vector-sets/test.py b/modules/vector-sets/test.py index 8d56d582766..0b512333641 100755 --- a/modules/vector-sets/test.py +++ b/modules/vector-sets/test.py @@ -96,10 +96,10 @@ def __init__(self, primary_port=6379, replica_port=6380): self.error_details = None self.test_key = f"test:{self.__class__.__name__.lower()}" # Primary Redis instance - self.redis = redis.Redis(port=primary_port,db=9) + self.redis = redis.Redis(port=primary_port,protocol=2,db=9) self.redis3 = redis.Redis(port=primary_port,protocol=3,db=9) # Replica Redis instance - self.replica = redis.Redis(port=replica_port,db=9) + self.replica = redis.Redis(port=replica_port,protocol=2,db=9) # Replication status self.replication_setup = False # Ports From e00a30036f9cbb97adb0627a316223826ddea91e Mon Sep 17 00:00:00 2001 From: "debing.sun" Date: Wed, 6 May 2026 09:46:17 +0800 Subject: [PATCH 11/17] Fix client output buffer memory tracking not accounting for copy-avoided bulk string references (#14934) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit After #14608 (Reply Copy Avoidance), when copy avoidance kicks in, bulk string replies are sent by reference instead of being copied into the output buffer. The referenced bytes are not counted in `reply_bytes`, which causes: 1. `getClientOutputBufferMemoryUsage()` underestimates the actual memory usage, so output buffer limits may not be triggered in time, allowing clients to consume unbounded memory. 2. Client eviction does not account for the referenced bytes, making it ineffective when copy avoidance is used. 3. `omem` reported in `CLIENT LIST` / `CLIENT INFO` does not reflect the true output buffer memory footprint. Track the bytes of referenced bulk strings in the output buffer with two per-client counters: 1. 
reply_bytes_shared - the logical size of all BULK_STR_REF payloads in the output buffer. Updated incrementally whenever a reference is added/removed. Represents memory the client is "charged" for even though it is shared with the keyspace. 2. reply_bytes_unshared — the subset of the above where the referenced object's refcount == 1 (i.e. the key has been deleted from the keyspace), so the memory is kept alive solely by this client's output buffer and would actually be freed on disconnect. Maintained as a lazy cache refreshed via updateClientUnsharedReplyBytes(). CLIENT LIST / CLIENT INFO — two new fields, plus refined semantics for existing ones: Field | Meaning -- | -- omem | (semantics changed) logical output-buffer memory, now including shared memory referenced from the keyspace. Still excludes client->buf so static clients show 0. omem-shared | (new) shared output-buffer memory (referenced bulk strings, not solely owned by this client). omem-unshared | (new) unshared output-buffer memory (referenced bulk strings solely owned by this client; freed on disconnect). tot-mem | (semantics refined) actual memory usage — includes omem-unshared, excludes omem-shared to avoid double-counting keyspace memory. INFO memory — two new fields mirroring the above: Field | Meaning -- | -- mem_clients_normal | (semantics changed) actual memory usage of normal clients (includes unshared, excludes shared). mem_clients_normal_shared | (new) aggregate shared output-buffer memory across normal clients. mem_clients_normal_unshared | (new) aggregate unshared output-buffer memory across normal clients. MEMORY STATS — schema extended with the matching keys: Field | Meaning -- | -- clients.normal.shared | (new) aggregate shared output-buffer memory across normal clients. clients.normal.unshared | (new) aggregate unshared output-buffer memory across normal clients. 
Fix missing closeClientOnOutputBufferLimitReached() call when adding a referenced robj to the reply --------- Co-authored-by: oranagra (cherry picked from commit 05859cdd7ecd847c14403383f166783d8b7aee78) --- src/cluster_asm.c | 2 +- src/cluster_asm.h | 2 +- src/commands/memory-stats.json | 6 ++ src/evict.c | 2 +- src/module.c | 2 +- src/networking.c | 162 ++++++++++++++++++++++++++++----- src/object.c | 13 ++- src/replication.c | 2 +- src/script_lua.c | 2 +- src/server.c | 21 ++++- src/server.h | 8 +- tests/unit/client-eviction.tcl | 14 +-- tests/unit/info.tcl | 2 - tests/unit/introspection.tcl | 8 +- tests/unit/maxmemory.tcl | 17 ++-- tests/unit/memefficiency.tcl | 2 - tests/unit/obuf-limits.tcl | 80 +++++++++++++++- tests/unit/replybufsize.tcl | 5 +- 18 files changed, 281 insertions(+), 69 deletions(-) diff --git a/src/cluster_asm.c b/src/cluster_asm.c index 044299e6d4a..63bb224534c 100644 --- a/src/cluster_asm.c +++ b/src/cluster_asm.c @@ -536,7 +536,7 @@ size_t asmGetImportInputBufferSize(void) { return 0; } -size_t asmGetMigrateOutputBufferSize(void) { +size_t asmGetMigrateOutputMemoryUsage(void) { if (!asmManager || listLength(asmManager->tasks) == 0) return 0; asmTask *task = listNodeValue(listFirst(asmManager->tasks)); diff --git a/src/cluster_asm.h b/src/cluster_asm.h index 841a55d76ab..d836dc6f24c 100644 --- a/src/cluster_asm.h +++ b/src/cluster_asm.h @@ -33,7 +33,7 @@ struct slotRangeArray *asmTaskGetSlotRanges(const char *task_id); int asmNotifyConfigUpdated(struct asmTask *task, sds *err); size_t asmGetPeakSyncBufferSize(void); size_t asmGetImportInputBufferSize(void); -size_t asmGetMigrateOutputBufferSize(void); +size_t asmGetMigrateOutputMemoryUsage(void); int clusterAsmCancel(const char *task_id, const char *reason); int clusterAsmCancelBySlot(int slot, const char *reason); int clusterAsmCancelBySlotRangeArray(struct slotRangeArray *slots, const char *reason); diff --git a/src/commands/memory-stats.json b/src/commands/memory-stats.json index 
0e95e0f36db..4098c1b36dd 100644 --- a/src/commands/memory-stats.json +++ b/src/commands/memory-stats.json @@ -38,6 +38,12 @@ "clients.normal": { "type": "integer" }, + "clients.normal.shared": { + "type": "integer" + }, + "clients.normal.unshared": { + "type": "integer" + }, "cluster.links": { "type": "integer" }, diff --git a/src/evict.c b/src/evict.c index e287edec6e3..50037c6891d 100644 --- a/src/evict.c +++ b/src/evict.c @@ -349,7 +349,7 @@ size_t freeMemoryGetNotCountedMemory(void) { /* The migrate client is like a replica, we also push DELs into it when * evicting keys belonging to the migrating slot, so we don't count its * output buffer to avoid eviction loop. */ - overhead += asmGetMigrateOutputBufferSize(); + overhead += asmGetMigrateOutputMemoryUsage(); if (server.aof_state != AOF_OFF) { overhead += sdsAllocSize(server.aof_buf); diff --git a/src/module.c b/src/module.c index 31e4a09c454..3982bf22b3a 100644 --- a/src/module.c +++ b/src/module.c @@ -666,7 +666,7 @@ void moduleReleaseTempClient(client *c) { } clearClientConnectionState(c); listEmpty(c->reply); - c->reply_bytes = 0; + c->reply_bytes = c->reply_bytes_shared = c->reply_bytes_unshared = 0; c->duration = 0; resetClient(c, -1); serverAssert(c->all_argv_len_sum == 0); diff --git a/src/networking.c b/src/networking.c index 0230406a32f..625ac0615cf 100644 --- a/src/networking.c +++ b/src/networking.c @@ -37,6 +37,7 @@ static inline int _writeToClientNonSlave(client *c, ssize_t *nwritten); static inline int _writeToClientSlave(client *c, ssize_t *nwritten); static pendingCommand *acquirePendingCommand(void); static void reclaimPendingCommand(client *c, pendingCommand *pcmd); +static size_t getClientOutputBufferLogicalSize(client *c); int ProcessingEventsWhileBlocked = 0; /* See processEventsWhileBlocked(). 
*/ __thread sds thread_reusable_qb = NULL; @@ -212,7 +213,7 @@ client *createClient(connection *conn) { c->main_ch_client_id = 0; c->reply = listCreate(); c->deferred_reply_errors = NULL; - c->reply_bytes = 0; + c->reply_bytes = c->reply_bytes_shared = c->reply_bytes_unshared = 0; c->obuf_soft_limit_reached_time = 0; listSetFreeMethod(c->reply,freeClientReplyValue); listSetDupMethod(c->reply,dupClientReplyValue); @@ -363,7 +364,7 @@ int prepareClientToWrite(client *c) { * Low level functions to add more data to output buffers. * -------------------------------------------------------------------------- */ -static int tryAddPayload(char *buf, size_t *used, size_t size, uint8_t type, const void *payload, size_t len) { +static int tryAddPayload(client *c, char *buf, size_t *used, size_t size, uint8_t type, const void *payload, size_t len) { if (*used + sizeof(payloadHeader) + len > size) return 0; /* Start a new payload chunk */ @@ -372,6 +373,13 @@ static int tryAddPayload(char *buf, size_t *used, size_t size, uint8_t type, con header->payload_len = len; memcpy((char *)header + sizeof(payloadHeader), payload, len); *used += sizeof(payloadHeader) + len; + + /* Track referenced reply bytes for copy avoidance. */ + if (type == BULK_STR_REF) { + const bulkStrRef *str_ref = (const bulkStrRef *)payload; + c->reply_bytes_shared += sdslen(str_ref->obj->ptr); + } + return 1; } @@ -391,8 +399,11 @@ static void _addReplyPayloadToList(client *c, list *reply_list, const char *payl if (tail) { if (unlikely(tail->buf_encoded)) { /* Try to add to encoded buffer */ - if (tryAddPayload(tail->buf, &tail->used, tail->size, payload_type, (void *)payload, len)) { - len = 0; + if (tryAddPayload(c, tail->buf, &tail->used, tail->size, payload_type, (void *)payload, len)) { + /* For BULK_STR_REF payloads, tryAddPayload updates shared reply bytes + * which accounts for referenced strings. 
*/ + if (encoded) closeClientOnOutputBufferLimitReached(c, 1); + return; } } else if (!encoded) { /* Both tail and new payload are non-encoded, can append directly */ @@ -420,7 +431,7 @@ static void _addReplyPayloadToList(client *c, list *reply_list, const char *payl tail->used = 0; tail->buf_encoded = encoded; if (tail->buf_encoded) { - serverAssert(tryAddPayload(tail->buf, &tail->used, tail->size, payload_type, (void *)payload, len)); + serverAssert(tryAddPayload(c, tail->buf, &tail->used, tail->size, payload_type, (void *)payload, len)); } else { tail->used = len; memcpy(tail->buf, payload, len); @@ -452,7 +463,7 @@ static size_t _addReplyPayloadToBuffer(client *c, const void *payload, size_t le size_t available = c->buf_usable_size - c->bufpos; size_t reply_len = min(available, len); if (c->buf_encoded) { - if (!tryAddPayload(c->buf, &c->bufpos, c->buf_usable_size, payload_type, payload, len)) + if (!tryAddPayload(c, c->buf, &c->bufpos, c->buf_usable_size, payload_type, payload, len)) return 0; reply_len = len; } else { @@ -468,18 +479,27 @@ static size_t _addReplyPayloadToBuffer(client *c, const void *payload, size_t le /* Adds bulk string reference (i.e. pointer to object and pointer to string itself) to static buffer * Returns non-zero value if succeeded to add */ static size_t _addBulkStrRefToBuffer(client *c, const void *payload, size_t len) { + size_t result; if (!c->buf_encoded) { /* If buffer is plain and not empty then can't add bulk string reference to it */ if (c->bufpos) return 0; c->buf_encoded = 1; /* Set c->buf to encoded mode to allow bulk string reference to be stored in it */ - size_t result = _addReplyPayloadToBuffer(c, payload, len, BULK_STR_REF); + result = _addReplyPayloadToBuffer(c, payload, len, BULK_STR_REF); if (!result) { /* Failed to add bulk string reference to buffer, need to revert to plain mode. 
*/ c->buf_encoded = 0; + return 0; } - return result; + } else { + result = _addReplyPayloadToBuffer(c, payload, len, BULK_STR_REF); + if (!result) return 0; } - return _addReplyPayloadToBuffer(c, payload, len, BULK_STR_REF); + + /* Even though the bulk string is stored by reference and the underlying + * memory is shared, we still account this shared memory towards this + * client's output buffer usage, so we need to check the output buffer limits. */ + closeClientOnOutputBufferLimitReached(c, 1); + return result; } void _addReplyToBufferOrList(client *c, const char *s, size_t len) { @@ -1472,6 +1492,7 @@ void AddReplyFromClient(client *dst, client *src) { /* Concatenate the reply list into the dest */ if (listLength(src->reply)) listJoin(dst->reply,src->reply); + serverAssert(src->reply_bytes_shared == 0); /* It is non-normal client, never has references. */ dst->reply_bytes += src->reply_bytes; src->reply_bytes = 0; src->bufpos = 0; @@ -1941,6 +1962,72 @@ void tryUnlinkClientFromPendingRefReply(client *c, int force) { } } +/* Count bytes in an encoded buffer where the client holds the last remaining + * reference to the underlying string object (refcount == 1), meaning the key + * has been deleted from the keyspace and only this client buffer keeps the + * memory alive. + * + * Note: when multiple clients share a reference to the same object, + * the object's refcount stays above 1 even after the key is deleted. In that + * case none of those clients will be counted here, so the shared memory is + * under-reported until all but one client has consumed its copy. 
*/ +static size_t computeUnsharedReplyBytes(char *buf, size_t bufpos) { + size_t total = 0; + char *ptr = buf; + while (ptr < buf + bufpos) { + payloadHeader *header = (payloadHeader *)ptr; + ptr += sizeof(payloadHeader); + if (header->payload_type == BULK_STR_REF) { + bulkStrRef *str_ref = (bulkStrRef *)ptr; + if (str_ref->obj != NULL && str_ref->obj->refcount == 1) + total += sdslen(str_ref->obj->ptr); + } + ptr += header->payload_len; + } + return total; +} + +/* Update the client's unshared reply memory (solely owned). */ +void updateClientUnsharedReplyBytes(client *c) { + c->reply_bytes_unshared = 0; + + /* No shared memory means no unshared memory either. */ + if (c->reply_bytes_shared == 0) return; + + /* Scan the static output buffer. */ + if (c->buf_encoded) + c->reply_bytes_unshared += computeUnsharedReplyBytes(c->buf, c->bufpos); + + /* Scan each block in the reply list. */ + listIter reply_li; + listNode *reply_ln; + listRewind(c->reply, &reply_li); + while ((reply_ln = listNext(&reply_li))) { + clientReplyBlock *block = listNodeValue(reply_ln); + if (block == NULL) continue; /* deferred-length placeholder */ + if (block->buf_encoded) + c->reply_bytes_unshared += computeUnsharedReplyBytes(block->buf, block->used); + } +} + +/* Compute shared reply memory: total shared reply bytes and the unshared subset where the key + * has been deleted and the client buffer is the sole holder. */ +void getClientsSharedMemoryUsage(size_t *shared_mem, size_t *unshared_mem) { + listNode *ln; + listIter li; + listRewind(server.clients_with_pending_ref_reply, &li); + while ((ln = listNext(&li))) { + client *c = listNodeValue(ln); + + /* Total shared reply bytes (logical size, shared with keyspace). */ + *shared_mem += c->reply_bytes_shared; + + /* Unshared reply bytes: the client is the sole owner because the key was deleted. 
*/ + updateClientUnsharedReplyBytes(c); + *unshared_mem += c->reply_bytes_unshared; + } +} + /* Clear the client state to resemble a newly connected client. */ void clearClientConnectionState(client *c) { listNode *ln; @@ -2035,6 +2122,7 @@ static void releaseBufReferences(client *c, char *buf, size_t bufpos) { bulkStrRef *str_ref = (bulkStrRef *)ptr; /* Only release if not already released. */ if (str_ref->obj != NULL) { + c->reply_bytes_shared -= sdslen(str_ref->obj->ptr); if (in_io_thread) ioDeferFreeRobj(c, str_ref->obj); else @@ -2451,6 +2539,7 @@ static payloadHeader *processSentDataInEncodedBuffer(client *c, char *start_ptr, return head; } *remaining -= (writen_len - *sentlen); + c->reply_bytes_shared -= sdslen(str_ref->obj->ptr); if (in_io_thread) { ioDeferFreeRobj(c, str_ref->obj); } else { @@ -2609,7 +2698,7 @@ static inline int _writeToClientNonSlave(client *c, ssize_t *nwritten) { /* If there are no longer objects in the list, we expect * the count of reply bytes to be exactly zero. */ if (listLength(c->reply) == 0) - serverAssert(c->reply_bytes == 0); + serverAssert(c->reply_bytes == 0 && c->reply_bytes_shared == 0); } else if (c->bufpos > 0) { /* For encoded buffers, we need to use writev to handle bulk string references */ if (c->buf_encoded) { @@ -3980,8 +4069,12 @@ sds catClientInfoString(sds s, client *client) { } *p = '\0'; + /* Refresh the cached unshared reply bytes before computing memory stats below. */ + updateClientUnsharedReplyBytes(client); + /* Compute the total memory consumed by this client. 
*/ - size_t obufmem, total_mem = getClientMemoryUsage(client, &obufmem); + size_t obufmem = getClientOutputBufferLogicalSize(client); + size_t total_mem = getClientMemoryUsage(client); size_t used_blocks_of_repl_buf = 0; if (client->ref_repl_buf_node) { @@ -4013,8 +4106,10 @@ sds catClientInfoString(sds s, client *client) { " rbp=%U", (unsigned long long) client->buf_peak, " obl=%U", (unsigned long long) client->bufpos, " oll=%U", (unsigned long long) listLength(client->reply) + used_blocks_of_repl_buf, - " omem=%U", (unsigned long long) obufmem, /* should not include client->buf since we want to see 0 for static clients. */ - " tot-mem=%U", (unsigned long long) total_mem, + " omem=%U", (unsigned long long) obufmem, /* logical output buffer memory (includes shared memory; excludes client->buf so static clients show 0) */ + " omem-shared=%U", (unsigned long long) client->reply_bytes_shared, /* shared memory (not solely owned by this client) */ + " omem-unshared=%U", (unsigned long long) client->reply_bytes_unshared, /* unshared memory (solely owned by this client) */ + " tot-mem=%U", (unsigned long long) total_mem, /* actual memory usage (includes unshared memory, excludes shared memory) */ " events=%s", events, " cmd=%s", client->lastcmd ? client->lastcmd->fullname : "NULL", " user=%s", client->user ? client->user->name : "(superuser)", @@ -5035,11 +5130,12 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) { /* This function returns the number of bytes that Redis is * using to store the reply still not read by the client. + * It does NOT include any referenced bytes (neither shared nor unshared). * * Note: this function is very fast so can be called as many time as * the caller wishes. The main usage of this function currently is * enforcing the client output length limits. 
*/ -size_t getClientOutputBufferMemoryUsage(client *c) { +static size_t getClientOutputBufferAllocSize(client *c) { if (unlikely(clientTypeIsSlave(c))) { size_t repl_buf_size = 0; size_t repl_node_num = 0; @@ -5057,22 +5153,38 @@ size_t getClientOutputBufferMemoryUsage(client *c) { } } +/* Returns the logical output buffer size for limit enforcement. + * This includes all shared memory (shared with the keyspace), ensuring that + * a client requesting huge amounts of data via copy-avoidance is still + * subject to output buffer limits. */ +static size_t getClientOutputBufferLogicalSize(client *c) { + size_t mem = getClientOutputBufferAllocSize(c); + if (!clientTypeIsSlave(c)) + mem += c->reply_bytes_shared; + return mem; +} + +/* Returns the actual memory used to store the reply not yet read by the client. + * This includes unshared memory (solely owned by this client), which would be + * freed when the client disconnects. */ +size_t getClientOutputBufferMemoryUsage(client *c) { + size_t mem = getClientOutputBufferAllocSize(c); + mem += c->reply_bytes_unshared; + return mem; +} + size_t getNormalClientPendingReplyBytes(client *c) { serverAssert(!clientTypeIsSlave(c)); - if (listLength(c->reply) == 0) return c->bufpos; + if (listLength(c->reply) == 0) return c->bufpos + c->reply_bytes_shared; clientReplyBlock *block = listNodeValue(listLast(c->reply)); - return (c->reply_bytes - block->size + block->used) + c->bufpos; + return (c->reply_bytes + c->reply_bytes_shared - block->size + block->used) + c->bufpos; } -/* Returns the total client's memory usage. - * Optionally, if output_buffer_mem_usage is not NULL, it fills it with - * the client output buffer memory usage portion of the total. */ -size_t getClientMemoryUsage(client *c, size_t *output_buffer_mem_usage) { +/* Returns the total client's memory usage. 
*/ +size_t getClientMemoryUsage(client *c) { size_t mem = getClientOutputBufferMemoryUsage(c); - if (output_buffer_mem_usage != NULL) - *output_buffer_mem_usage = mem; mem += c->querybuf ? sdsZmallocSize(c->querybuf) : 0; mem += zmalloc_size(c); mem += c->buf_usable_size; @@ -5150,7 +5262,7 @@ char *getClientTypeName(int class) { * Otherwise zero is returned. */ int checkClientOutputBufferLimits(client *c) { int soft = 0, hard = 0, class; - unsigned long used_mem = getClientOutputBufferMemoryUsage(c); + unsigned long used_mem = getClientOutputBufferLogicalSize(c); /* For unauthenticated clients the output buffer is limited to prevent * them from abusing it by not reading the replies */ @@ -5214,10 +5326,10 @@ int checkClientOutputBufferLimits(client *c) { * Returns 1 if client was (flagged) closed. */ int closeClientOnOutputBufferLimitReached(client *c, int async) { if (!c->conn) return 0; /* It is unsafe to free fake clients. */ - serverAssert(c->reply_bytes < SIZE_MAX-(1024*64)); + serverAssert(c->reply_bytes < SIZE_MAX-(1024*64)); /* actual memory only, logical memory may exceed SIZE_MAX */ /* Note that c->reply_bytes is irrelevant for replica clients * (they use the global repl buffers). */ - if ((c->reply_bytes == 0 && !clientTypeIsSlave(c)) || + if ((c->reply_bytes == 0 && c->reply_bytes_shared == 0 && !clientTypeIsSlave(c)) || c->flags & CLIENT_CLOSE_ASAP) return 0; if (checkClientOutputBufferLimits(c)) { sds client = catClientInfoString(sdsempty(),c); diff --git a/src/object.c b/src/object.c index d4915f2d675..31e12ca1baf 100644 --- a/src/object.c +++ b/src/object.c @@ -1327,6 +1327,9 @@ struct redisMemOverhead *getMemoryOverheadData(void) { mem_total += mh->repl_backlog; mem_total += mh->clients_slaves; + /* Compute shared/unshared reply memory. */ + getClientsSharedMemoryUsage(&mh->clients_normal_shared, &mh->clients_normal_unshared); + /* Computing the memory used by the clients would be O(N) if done * here online. 
We use our values computed incrementally by * updateClientMemoryUsage(). */ @@ -1357,7 +1360,7 @@ struct redisMemOverhead *getMemoryOverheadData(void) { /* Cluster atomic slot migration buffers. */ mh->asm_import_input_buffer = asmGetImportInputBufferSize(); - mh->asm_migrate_output_buffer = asmGetMigrateOutputBufferSize(); + mh->asm_migrate_output_buffer = asmGetMigrateOutputMemoryUsage(); mem_total += mh->asm_import_input_buffer; mem_total += mh->asm_migrate_output_buffer; @@ -1682,7 +1685,7 @@ NULL } else if (!strcasecmp(c->argv[1]->ptr,"stats") && c->argc == 2) { struct redisMemOverhead *mh = getMemoryOverheadData(); - addReplyMapLen(c,33+mh->num_dbs); + addReplyMapLen(c,35+mh->num_dbs); addReplyBulkCString(c,"peak.allocated"); addReplyLongLong(c,mh->peak_allocated); @@ -1705,6 +1708,12 @@ NULL addReplyBulkCString(c,"clients.normal"); addReplyLongLong(c,mh->clients_normal); + addReplyBulkCString(c,"clients.normal.shared"); + addReplyLongLong(c,mh->clients_normal_shared); + + addReplyBulkCString(c,"clients.normal.unshared"); + addReplyLongLong(c,mh->clients_normal_unshared); + addReplyBulkCString(c,"cluster.links"); addReplyLongLong(c,mh->cluster_links); diff --git a/src/replication.c b/src/replication.c index ab2c88f639c..c7c1e284aab 100644 --- a/src/replication.c +++ b/src/replication.c @@ -4472,7 +4472,7 @@ void replicationCacheMaster(client *c) { if (c->flags & CLIENT_MULTI) discardTransaction(c); listEmpty(c->reply); c->sentlen = 0; - c->reply_bytes = 0; + c->reply_bytes = c->reply_bytes_shared = c->reply_bytes_unshared = 0; c->bufpos = 0; resetClient(c, -1); resetClientQbufState(c); diff --git a/src/script_lua.c b/src/script_lua.c index dbb60a39cd0..f6b61952b3f 100644 --- a/src/script_lua.c +++ b/src/script_lua.c @@ -961,7 +961,7 @@ static int luaRedisGenericCommand(lua_State *lua, int raise_error) { ldbLogRedisReply(reply); if (reply != c->buf) sdsfree(reply); - c->reply_bytes = 0; + c->reply_bytes = c->reply_bytes_shared = c->reply_bytes_unshared = 0; 
cleanup: /* Clean up. Command code may have changed argv/argc so we use the diff --git a/src/server.c b/src/server.c index 241fe69aec2..10572eaeb6b 100644 --- a/src/server.c +++ b/src/server.c @@ -1035,7 +1035,8 @@ static inline clientMemUsageBucket *getMemUsageBucket(size_t mem) { */ void updateClientMemoryUsage(client *c) { serverAssert(c->conn); - size_t mem = getClientMemoryUsage(c, NULL); + size_t mem = getClientMemoryUsage(c); + int type = getClientType(c); /* Now that we have the memory used by the client, remove the old * value from the old category, and add it back. */ @@ -1104,6 +1105,20 @@ int updateClientMemUsageAndBucket(client *c) { return 0; } + /* Include unshared reply bytes in the client's memory usage for eviction. + * Walking the reply buffer is costly, so skip the scan when its outcome + * cannot affect bucket placement: since 0 <= unshared <= shared, if both + * endpoints map to the same bucket the cached value is reused. */ + if (c->reply_bytes_shared > 0) { + size_t lower_bound = getClientMemoryUsage(c) - c->reply_bytes_unshared; + size_t upper_bound = lower_bound + c->reply_bytes_shared; + if (getMemUsageBucket(lower_bound) != getMemUsageBucket(upper_bound)) + updateClientUnsharedReplyBytes(c); + } else { + /* No shared bytes: clear any stale cached unshared. */ + c->reply_bytes_unshared = 0; + } + /* Update client memory usage. 
*/ updateClientMemoryUsage(c); @@ -6330,7 +6345,9 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) { "mem_total_replication_buffers:%zu\r\n", server.repl_buffer_mem + server.repl_full_sync_buffer.mem_used, "mem_replica_full_sync_buffer:%zu\r\n", server.repl_full_sync_buffer.mem_used, "mem_clients_slaves:%zu\r\n", mh->clients_slaves, - "mem_clients_normal:%zu\r\n", mh->clients_normal, + "mem_clients_normal:%zu\r\n", mh->clients_normal, /* actual memory usage (includes unshared memory, excludes shared memory) */ + "mem_clients_normal_shared:%zu\r\n", mh->clients_normal_shared, /* shared memory (not solely owned by this client) */ + "mem_clients_normal_unshared:%zu\r\n", mh->clients_normal_unshared, /* unshared memory (solely owned by this client) */ "mem_cluster_slot_migration_output_buffer:%zu\r\n", mh->asm_migrate_output_buffer, "mem_cluster_slot_migration_input_buffer:%zu\r\n", mh->asm_import_input_buffer, "mem_cluster_slot_migration_input_buffer_peak:%zu\r\n", asmGetPeakSyncBufferSize(), diff --git a/src/server.h b/src/server.h index 53c5dfe1f27..045ebd29e5b 100644 --- a/src/server.h +++ b/src/server.h @@ -1461,6 +1461,8 @@ typedef struct client { long bulklen; /* Length of bulk argument in multi bulk request. */ list *reply; /* List of reply objects to send to the client. */ unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */ + unsigned long long reply_bytes_shared; /* Bytes shared with keyspace objects in reply list. */ + unsigned long long reply_bytes_unshared; /* Cached subset of reply_bytes_shared solely owned by this client. */ list *deferred_reply_errors; /* Used for module thread safe contexts. */ size_t sentlen; /* Amount of bytes already sent in the current buffer or object being sent. 
*/ @@ -1761,6 +1763,8 @@ struct redisMemOverhead { size_t replica_fullsync_buffer; size_t clients_slaves; size_t clients_normal; + size_t clients_normal_shared; + size_t clients_normal_unshared; size_t cluster_links; size_t aof_buffer; size_t eval_caches; @@ -3175,7 +3179,9 @@ void replaceClientCommandVector(client *c, int argc, robj **argv); void redactClientCommandArgument(client *c, int argc); size_t getClientOutputBufferMemoryUsage(client *c); size_t getNormalClientPendingReplyBytes(client *c); -size_t getClientMemoryUsage(client *c, size_t *output_buffer_mem_usage); +size_t getClientMemoryUsage(client *c); +void updateClientUnsharedReplyBytes(client *c); +void getClientsSharedMemoryUsage(size_t *shared_mem, size_t *unshared_mem); int freeClientsInAsyncFreeQueue(void); int closeClientOnOutputBufferLimitReached(client *c, int async); int getClientType(client *c); diff --git a/tests/unit/client-eviction.tcl b/tests/unit/client-eviction.tcl index 7f5d6691ad5..afe32e4f960 100644 --- a/tests/unit/client-eviction.tcl +++ b/tests/unit/client-eviction.tcl @@ -52,7 +52,6 @@ proc kb {v} { start_server {} { set maxmemory_clients 3000000 r config set maxmemory-clients $maxmemory_clients - r debug reply-copy-avoidance 0 ;# Disable copy avoidance because it affects memory usage test "client evicted due to large argv" { r flushdb @@ -328,7 +327,6 @@ start_server {} { set obuf_limit [mb 3] r config set maxmemory-clients $maxmemory_clients r config set client-output-buffer-limit "normal $obuf_limit 0 0" - r debug reply-copy-avoidance 0 ;# Disable copy avoidance because it affects memory usage test "avoid client eviction when client is freed by output buffer limit" { r flushdb @@ -391,7 +389,6 @@ start_server {} { } start_server {} { - r debug reply-copy-avoidance 0 ;# Disable copy avoidance because it affects memory usage test "decrease maxmemory-clients causes client eviction" { set maxmemory_clients [mb 4] @@ -432,8 +429,6 @@ start_server {} { } start_server {} { - r debug 
reply-copy-avoidance 0 ;# Disable copy avoidance because it affects memory usage - test "evict clients only until below limit" { set client_count 10 set client_mem [mb 1] @@ -501,8 +496,6 @@ start_server {} { } start_server {} { - r debug reply-copy-avoidance 0 ;# Disable copy avoidance because it affects memory usage - test "evict clients in right order (large to small)" { # Note that each size step needs to be at least x2 larger than previous step # because of how the client-eviction size bucketing works @@ -571,15 +564,13 @@ start_server {} { } start_server {} { - r debug reply-copy-avoidance 0 ;# Disable copy avoidance because it affects memory usage - foreach type {"client no-evict" "maxmemory-clients disabled"} { r flushall r client no-evict on r config set maxmemory-clients 0 test "client total memory grows during $type" { - r setrange k [mb 1] v + r setrange k [kb 10] v ;# Keep value <= 16KB to avoid copy-avoidance, which shares memory and slows tot-mem growth. set rr [redis_client] $rr client setname test_client if {$type eq "client no-evict"} { @@ -591,8 +582,9 @@ start_server {} { # Fill output buffer in loop without reading it and make sure # the tot-mem of client has increased (OS buffers didn't swallow it) # and eviction not occurring. + set mget_args [lrepeat 100 k] ;# Use mget with 100 keys so each reply adds ~1MB to tot-mem, reaching 10MB faster. 
while {true} { - $rr get k + $rr mget {*}$mget_args $rr flush after 10 if {[client_field test_client tot-mem] > [mb 10]} { diff --git a/tests/unit/info.tcl b/tests/unit/info.tcl index ee6d6d19adf..d0246667111 100644 --- a/tests/unit/info.tcl +++ b/tests/unit/info.tcl @@ -393,8 +393,6 @@ start_server {tags {"info" "external:skip"}} { } test {stats: client input and output buffer limit disconnections} { - r debug reply-copy-avoidance 0 ;# Disable copy avoidance because it affects memory usage - r config resetstat set info [r info stats] assert_equal [getInfoProperty $info client_query_buffer_limit_disconnections] {0} diff --git a/tests/unit/introspection.tcl b/tests/unit/introspection.tcl index 47a5e211439..ac749111adb 100644 --- a/tests/unit/introspection.tcl +++ b/tests/unit/introspection.tcl @@ -21,9 +21,9 @@ start_server {tags {"introspection"}} { test {CLIENT LIST} { set client_list [r client list] if {[lindex [r config get io-threads] 1] == 1} { - assert_match {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=26 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|list user=* redir=-1 resp=* lib-name=* lib-ver=* io-thread=* tot-net-in=* tot-net-out=* tot-cmds=*} $client_list + assert_match {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=26 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 omem-shared=0 omem-unshared=0 tot-mem=* events=r cmd=client|list user=* redir=-1 resp=* lib-name=* lib-ver=* io-thread=* tot-net-in=* tot-net-out=* tot-cmds=*} $client_list } else { - assert_match {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=0 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|list user=* redir=-1 resp=* lib-name=* lib-ver=* io-thread=* tot-net-in=* tot-net-out=* tot-cmds=*} $client_list + 
assert_match {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=0 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 omem-shared=0 omem-unshared=0 tot-mem=* events=r cmd=client|list user=* redir=-1 resp=* lib-name=* lib-ver=* io-thread=* tot-net-in=* tot-net-out=* tot-cmds=*} $client_list } } @@ -36,9 +36,9 @@ start_server {tags {"introspection"}} { test {CLIENT INFO} { set client [r client info] if {[lindex [r config get io-threads] 1] == 1} { - assert_match {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=26 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|info user=* redir=-1 resp=* lib-name=* lib-ver=* io-thread=* tot-net-in=* tot-net-out=* tot-cmds=*} $client + assert_match {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=26 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 omem-shared=0 omem-unshared=0 tot-mem=* events=r cmd=client|info user=* redir=-1 resp=* lib-name=* lib-ver=* io-thread=* tot-net-in=* tot-net-out=* tot-cmds=*} $client } else { - assert_match {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=0 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|info user=* redir=-1 resp=* lib-name=* lib-ver=* io-thread=* tot-net-in=* tot-net-out=* tot-cmds=*} $client + assert_match {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=0 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 omem-shared=0 omem-unshared=0 tot-mem=* events=r cmd=client|info user=* redir=-1 resp=* lib-name=* lib-ver=* io-thread=* tot-net-in=* tot-net-out=* tot-cmds=*} $client } } diff --git a/tests/unit/maxmemory.tcl b/tests/unit/maxmemory.tcl index f9b36b49159..f86ed7ce802 100644 
--- a/tests/unit/maxmemory.tcl +++ b/tests/unit/maxmemory.tcl @@ -15,7 +15,6 @@ start_server {tags {"maxmemory" "external:skip"}} { r config set maxmemory 11mb r config set maxmemory-policy allkeys-lru set server_pid [s process_id] - r debug reply-copy-avoidance 0 ;# Disable copy avoidance because it affects memory usage proc init_test {client_eviction} { r flushdb @@ -29,11 +28,11 @@ start_server {tags {"maxmemory" "external:skip"}} { } r config resetstat - # fill 5mb using 50 keys of 100kb - for {set j 0} {$j < 50} {incr j} { - r setrange $j 100000 x + # fill 5mb using 500 keys of 10kb + for {set j 0} {$j < 500} {incr j} { + r setrange key$j 10000 x } - assert_equal [r dbsize] 50 + assert_equal [r dbsize] 500 } # Return true if the eviction occurred (client or key) based on argument @@ -44,12 +43,12 @@ start_server {tags {"maxmemory" "external:skip"}} { if $client_eviction { if {[lindex [r config get io-threads] 1] == 1} { - return [expr $evicted_clients > 0 && $evicted_keys == 0 && $dbsize == 50] + return [expr $evicted_clients > 0 && $evicted_keys == 0 && $dbsize == 500] } else { - return [expr $evicted_clients >= 0 && $evicted_keys >= 0 && $dbsize <= 50] + return [expr $evicted_clients >= 0 && $evicted_keys >= 0 && $dbsize <= 500] } } else { - return [expr $evicted_clients == 0 && $evicted_keys > 0 && $dbsize < 50] + return [expr $evicted_clients == 0 && $evicted_keys > 0 && $dbsize < 500] } } @@ -84,7 +83,7 @@ start_server {tags {"maxmemory" "external:skip"}} { while {![check_eviction_test $client_eviction] && [expr [clock seconds] - $t] < 20} { foreach rr $clients { if {[catch { - $rr mget 1 + $rr mget key1 key2 key3 key4 key5 key6 key7 key8 key9 key10 $rr flush } err]} { lremove clients $rr diff --git a/tests/unit/memefficiency.tcl b/tests/unit/memefficiency.tcl index 0ab12c6c7e4..895248606e0 100644 --- a/tests/unit/memefficiency.tcl +++ b/tests/unit/memefficiency.tcl @@ -83,8 +83,6 @@ run_solo {defrag} { # note: Disabling lookahead because it changes the 
number and order of allocations which interferes with defrag and causes tests to fail r config set lookahead 1 - r debug reply-copy-avoidance 0 ;# Disable copy avoidance because it affects memory usage - if {[string match {*jemalloc*} [s mem_allocator]] && [r debug mallctl arenas.page] <= 8192} { test "Active defrag main dictionary: $type" { r config set hz 100 diff --git a/tests/unit/obuf-limits.tcl b/tests/unit/obuf-limits.tcl index 148187b739a..f58eeda8981 100644 --- a/tests/unit/obuf-limits.tcl +++ b/tests/unit/obuf-limits.tcl @@ -1,6 +1,4 @@ start_server {tags {"obuf-limits external:skip logreqres:skip"}} { - r debug reply-copy-avoidance 0 ;# Disable copy avoidance because it affects memory usage - test {CONFIG SET client-output-buffer-limit} { set oldval [lindex [r config get client-output-buffer-limit] 1] @@ -237,4 +235,82 @@ start_server {tags {"obuf-limits external:skip logreqres:skip"}} { assert_match "*I/O error*" $e reconnect } + + test "zero-copy referenced reply bytes are reflected in memory stats" { + r flushdb + r config set client-output-buffer-limit {normal 0 0 0} + # Use a value large enough to trigger copy avoidance + set val_size 100000 + r set bigkey [string repeat v $val_size] + + # Use MULTI/EXEC so all observers see the zero-copy ref before it is sent. + r client setname refmem_test + r multi + r get bigkey ;# adds zero-copy ref to output buffer + r client list ;# per-client omem / omem-shared / omem-unshared / tot-mem + r info memory ;# global mem_clients_normal_shared / mem_clients_normal_unshared + r memory stats ;# clients.normal.shared and clients.normal.unshared + set res [r exec] + + # omem-shared tracks total shared reply bytes, key is still alive so omem-unshared must be 0. 
+ set clients [split [string trim [lindex $res 1]] "\r\n"] + set c [lsearch -inline $clients *name=refmem_test*] + regexp {omem-shared=([0-9]+)} $c - omem_shared + regexp {omem-unshared=([0-9]+)} $c - omem_unshared + assert {$omem_shared >= $val_size} + assert_equal 0 $omem_unshared + + # mem_clients_normal_shared is incremented at write time, before the reply is sent + set info_mem [lindex $res 2] + assert {[getInfoProperty $info_mem mem_clients_normal_shared] >= $val_size} + assert_equal 0 [getInfoProperty $info_mem mem_clients_normal_unshared] + + # MEMORY STATS exposes the same shared bytes; normal.unshared is 0 since the key is still in keyspace + set mem_stats [lindex $res 3] + assert {[dict get $mem_stats clients.normal.shared] >= $val_size} + assert_equal 0 [dict get $mem_stats clients.normal.unshared] ;# key still in keyspace + + # After the reply is fully sent, the global counter must return to 0 + wait_for_condition 50 10 { + [s mem_clients_normal_shared] == 0 + } else { + fail "mem_clients_normal_shared did not return to 0 after reply was sent" + } + } + + test "shared reply bytes are tracked as unshared after the key is deleted" { + r flushdb + r config set client-output-buffer-limit {normal 0 0 0} + + set rr [redis_deferring_client] + $rr client setname test_client + $rr flush + + # Repeatedly SET/GET/DEL a big key on a deferred client and poll CLIENT LIST + # until omem-unshared on test_client reflects the referenced bytes. 
+ set val_size 100000 + set deadline [expr {[clock milliseconds] + 5000}] + while {true} { + r set k [string repeat v $val_size] + $rr get k + $rr del k + $rr flush + after 10 + + set clients [split [r client list] "\r\n"] + set c [lsearch -inline $clients *name=test_client*] + regexp {omem-shared=([0-9]+)} $c - omem_shared + regexp {omem-unshared=([0-9]+)} $c - omem_unshared + if {$omem_unshared >= $val_size} { + assert_morethan_equal $omem_shared $omem_unshared + break + } + + if {[clock milliseconds] > $deadline} { + fail "timed out waiting for omem-unshared to reflect unshared bytes" + } + } + + $rr close + } } diff --git a/tests/unit/replybufsize.tcl b/tests/unit/replybufsize.tcl index 302417cf8f2..151d7757db4 100644 --- a/tests/unit/replybufsize.tcl +++ b/tests/unit/replybufsize.tcl @@ -13,7 +13,6 @@ start_server {tags {"replybufsize"}} { test {verify reply buffer limits} { # In order to reduce test time we can set the peak reset time very low r debug replybuffer peak-reset-time 100 - r debug reply-copy-avoidance 0 ;# Disable copy avoidance because it affects memory usage # Create a simple idle test client variable tc [redis_client] @@ -27,13 +26,13 @@ start_server {tags {"replybufsize"}} { fail "reply buffer of idle client is $rbs after 1 seconds" } - r set bigval [string repeat x 32768] + r set bigval [string repeat x 8192] ;# Keep value <= 16KB to avoid copy-avoidance, which shares memory and slows tot-mem growth. 
# In order to reduce test time we can set the peak reset time very low r debug replybuffer peak-reset-time never wait_for_condition 10 100 { - [$tc get bigval ; get_reply_buffer_size test_client] >= 16384 && [get_reply_buffer_size test_client] < 32768 + [$tc mget bigval bigval bigval bigval ; get_reply_buffer_size test_client] >= 16384 && [get_reply_buffer_size test_client] < 32768 } else { set rbs [get_reply_buffer_size test_client] fail "reply buffer of busy client is $rbs after 1 seconds" From 7607874e0013a62b471c4261f1703edea6c26443 Mon Sep 17 00:00:00 2001 From: Vitah Lin Date: Tue, 19 May 2026 18:27:33 +0800 Subject: [PATCH 12/17] Fix diskless replicas drop during rdb pipe test (#15131) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This PR is based on: valkey-io/valkey#3511 Close https://github.com/redis/redis/issues/14983 ## Summary During diskless replication, if **any single replica** cannot accept a write (TCP send buffer full / `EAGAIN`), the master stops reading the RDB pipe entirely, stalling data delivery to **all** replicas — including fast ones that are ready to receive data. The failure reason is similar to https://github.com/redis/redis/pull/14946, the socket buffer is more easy to fill. ## Root Cause In `rdbPipeReadHandler`, the master reads from the child's RDB pipe and writes to all replica sockets in a loop. When `connWrite` to any replica returns a partial write (socket send buffer full), the handler: 1. Installs a per-replica `rdbPipeWriteHandler` and increments `rdb_pipe_numconns_writing` 2. **Removes the pipe read event** via `aeDeleteFileEvent(server.el, server.rdb_pipe_read, AE_READABLE)`, stopping all pipe reads The pipe read event is only re-enabled when **all** pending write handlers complete (`rdb_pipe_numconns_writing == 0`), meaning the **slowest replica dictates the throughput for all replicas**. 
## Observed Behavior With one slow replica (consuming at ~290 KB/s due to `key-load-delay`): - Master bursts ~1.3 MB of RDB data until the slow replica's socket send buffer fills - `rdbPipeReadHandler` disables the pipe read event - **All replicas starve for 4–5 seconds** while the slow replica drains its buffer - Cycle repeats: burst → stall → burst → stall Ultimately, this makes the full synchronization between the master and all of its replicas very slow. ### Changes 1. Skip the entire `diskless replicas drop during rdb pipe` test under Valgrind to avoid timing flakiness in slow environments. 2. Move `start_server` inside the `foreach all_drop` loop so each subcase gets a fresh master instead of sharing state across subcases. 3. For `no / slow / fast / all` subcases, replica 0 runs with `key-load-delay 500`, which combined with the blocked-writer TCP back-pressure can stall the RDB-saving child indefinitely; shrink the dataset to ~40 MB so the transfer still exercises the blocked-writer path but completes in reasonable time instead of hanging on the TCP deadlock. 4. For the timeout subcase, replica 0 does not run with `key-load-delay 500`, so to avoid the TCP deadlock we still reduce the dataset somewhat, but keep it larger than the other subcases. Otherwise the kernel TCP send buffer can absorb the whole RDB, and we'd miss the `repl_last_partial_write != 0` "(full sync)" timeout path and only hit the "(streaming sync)" path instead. 5. For the `all` subcase, set `rdb-key-save-delay 1000` on the master so the RDB child keeps generating data while both replicas are killed, ensuring the last-replica-drop path is exercised rather than racing with normal completion. 6. Move the slow-replica `pause_process()` so it happens only in the timeout subcase, not after killing replicas, so Redis observes the disconnect promptly in non-timeout flows. 7.
In the timeout subcase, set `repl-timeout` 2, wait inline for `*Disconnecting timedout replica (full sync)*`, then restore `repl-timeout` 60 so the remaining replica can finish the streamed RDB. --------- Co-authored-by: Sarthak Aggarwal Co-authored-by: debing.sun (cherry picked from commit 31896140d1e940cad43d725e830cff0e49d060e5) --- tests/integration/replication.tcl | 111 +++++++++++++++++++++--------- 1 file changed, 80 insertions(+), 31 deletions(-) diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl index 0309099428b..709c8fa2134 100644 --- a/tests/integration/replication.tcl +++ b/tests/integration/replication.tcl @@ -880,27 +880,44 @@ proc compute_cpu_usage {start end} { return [ list $pucpu $pscpu ] } - +if {!$::valgrind} { # test diskless rdb pipe with multiple replicas, which may drop half way -start_server {tags {"repl external:skip tsan:skip"} overrides {save ""}} { - set master [srv 0 client] - $master config set repl-diskless-sync yes - $master config set repl-diskless-sync-delay 5 - $master config set repl-diskless-sync-max-replicas 2 - set master_host [srv 0 host] - set master_port [srv 0 port] - set master_pid [srv 0 pid] - # put enough data in the db that the rdb file will be bigger than the socket buffers - # and since we'll have key-load-delay of 100, 20000 keys will take at least 2 seconds - # we also need the replica to process requests during transfer (which it does only once in 2mb) - $master debug populate 20000 test 10000 - $master config set rdbcompression no - $master config set repl-rdb-channel no - # If running on Linux, we also measure utime/stime to detect possible I/O handling issues - set os [catch {exec uname}] - set measure_time [expr {$os == "Linux"} ? 
1 : 0] - foreach all_drop {no slow fast all timeout} { +foreach all_drop {no slow fast all timeout} { + start_server {tags {"repl external:skip tsan:skip"} overrides {save ""}} { + set master [srv 0 client] + $master config set repl-diskless-sync yes + $master config set repl-diskless-sync-delay 5 + $master config set repl-diskless-sync-max-replicas 2 + set master_host [srv 0 host] + set master_port [srv 0 port] + set master_pid [srv 0 pid] + if {$all_drop == "timeout"} { + # Use a larger RDB (~100 MB) so it cannot fit into the kernel TCP + # send buffer (autotuning can absorb tens of MB on some hosts). We + # need the primary to hit the blocked writer path + # (repl_last_partial_write != 0) while the slow replica is paused, + # so the cron triggers the "(full sync)" timeout path instead of + # the replica being moved to ONLINE prematurely and timing out via + # the "(streaming sync)" path. + $master debug populate 10000 test 10000 + } else { + # Put enough data in the db that the RDB is comfortably larger than the + # pipe and socket buffers so the primary can hit the blocked writer path, + # but keep it small enough that slow TLS CI runners don't spend minutes + # draining an oversized transfer (~40 MB uncompressed). + $master debug populate 4000 test 10000 + } + $master config set rdbcompression no + $master config set repl-rdb-channel no + # If running on Linux, we also measure utime/stime to detect possible I/O handling issues + set os [catch {exec uname}] + set measure_time [expr {$os == "Linux"} ? 1 : 0] + test "diskless $all_drop replicas drop during rdb pipe" { + # Reset config that the timeout subcase may change, so a failing + # subcase does not leave the next one with an aggressive timeout. 
+ $master config set repl-timeout 60 + $master config set rdb-key-save-delay 0 set replicas {} set replicas_alive {} # start one replica that will read the rdb fast, and one that will be slow @@ -917,7 +934,24 @@ start_server {tags {"repl external:skip tsan:skip"} overrides {save ""}} { set loglines [count_log_lines -2] [lindex $replicas 0] config set repl-diskless-load swapdb [lindex $replicas 1] config set repl-diskless-load swapdb - [lindex $replicas 0] config set key-load-delay 100 ;# 20k keys and 100 microseconds sleep means at least 2 seconds + if {$all_drop == "all"} { + # Keep the RDB child generating data long enough for + # both replicas to be killed before the pipe reaches + # EOF, so this subcase still covers the last-replica + # drop path instead of racing with normal completion. + $master config set rdb-key-save-delay 1000 + } + # For non-timeout subcases, use key-load-delay to keep + # replica 0 as a steady slow reader for the entire RDB + # transfer. This keeps the expected diskless pipe code + # paths covered without accepting alternate log outcomes. + if {$all_drop != "timeout"} { + # 4k keys with 500 microseconds each keeps replica 0 + # slow for about 2 seconds, which is long enough to + # fill the pipe without turning the transfer into a + # multi-minute TLS run. + [lindex $replicas 0] config set key-load-delay 500 + } [lindex $replicas 0] replicaof $master_host $master_port [lindex $replicas 1] replicaof $master_host $master_port @@ -931,9 +965,16 @@ start_server {tags {"repl external:skip tsan:skip"} overrides {save ""}} { set start_time [clock seconds] } - # wait a while so that the pipe socket writer will be - # blocked on write (since replica 0 is slow to read from the socket) - after 500 + if {$all_drop != "timeout"} { + # key-load-delay is already throttling the slow + # replica; just wait for the pipe to fill. + after 500 + } else { + # For the timeout subcase, stop the slow reader so it + # reaches repl-timeout during full sync. 
+ pause_process [srv -1 pid] + after 500 + } # add some command to be present in the command stream after the rdb. $master incr $all_drop @@ -948,14 +989,17 @@ start_server {tags {"repl external:skip tsan:skip"} overrides {save ""}} { set replicas_alive [lreplace $replicas_alive 0 0] } if {$all_drop == "timeout"} { + # Let one replica hit repl-timeout while the slow reader + # is paused, then restore a generous timeout so the + # remaining replica can finish the streamed RDB. $master config set repl-timeout 2 - # we want the slow replica to hang on a key for very long so it'll reach repl-timeout - pause_process [srv -1 pid] - after 2000 + wait_for_log_messages -2 {"*Disconnecting timedout replica (full sync)*"} $loglines 200 100 + $master config set repl-timeout 60 } - # wait for rdb child to exit - wait_for_condition 500 100 { + # Use a single generous budget for all subcases; successful + # runs still exit early once the child is done. + wait_for_condition 5000 100 { [s -2 rdb_bgsave_in_progress] == 0 } else { fail "rdb child didn't terminate" @@ -972,7 +1016,6 @@ start_server {tags {"repl external:skip tsan:skip"} overrides {save ""}} { wait_for_log_messages -2 {"*Diskless rdb transfer, done reading from pipe, 1 replicas still up*"} $loglines 1 1 } if {$all_drop == "timeout"} { - wait_for_log_messages -2 {"*Disconnecting timedout replica (full sync)*"} $loglines 1 1 wait_for_log_messages -2 {"*Diskless rdb transfer, done reading from pipe, 1 replicas still up*"} $loglines 1 1 # master disconnected the slow replica, remove from array set replicas_alive [lreplace $replicas_alive 0 0] @@ -996,18 +1039,23 @@ start_server {tags {"repl external:skip tsan:skip"} overrides {save ""}} { assert {$master_utime < 70} assert {$master_stime < 70} } - if {!$::no_latency && ($all_drop == "none" || $all_drop == "fast")} { + if {!$::no_latency && ($all_drop == "no" || $all_drop == "fast")} { assert {$master_utime < 15} assert {$master_stime < 15} } } + # In the "no" case both 
replicas stay alive through the + # full streamed RDB, so on slow TLS runners the final + # ONLINE transition can lag behind child exit. + set replica_online_wait_tries [expr {$all_drop == "no" ? 600 : 150}] + # verify the data integrity foreach replica $replicas_alive { # Wait that replicas acknowledge they are online so # we are sure that DBSIZE and DEBUG DIGEST will not # fail because of timing issues. - wait_for_condition 150 100 { + wait_for_condition $replica_online_wait_tries 100 { [lindex [$replica role] 3] eq {connected} } else { fail "replicas still not connected after some time" @@ -1032,6 +1080,7 @@ start_server {tags {"repl external:skip tsan:skip"} overrides {save ""}} { } } } +} ;# end of valgrind test "diskless replication child being killed is collected" { # when diskless master is waiting for the replica to become writable From b7e268c9b9e43f548bd1e900f7c8ad1661979553 Mon Sep 17 00:00:00 2001 From: hristostaykov-del Date: Fri, 8 May 2026 10:58:15 +0300 Subject: [PATCH 13/17] Add inline cleanup to sentinel CONFIG SET/GET tests (#15174) Test 15-config-set-config-get.tcl was leaving announce-port and announce-hostnames at non-default values, which breaks auto-discovery in subsequent test units. Add reset lines at the end of each test that modifies config. This PR fixes failures in Daily CI tests. 
(cherry picked from commit d9b03bdb9890684d12df2c3f5e79de18221a54db) --- tests/sentinel/tests/15-config-set-config-get.tcl | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/sentinel/tests/15-config-set-config-get.tcl b/tests/sentinel/tests/15-config-set-config-get.tcl index f9831f8e866..16b30200060 100644 --- a/tests/sentinel/tests/15-config-set-config-get.tcl +++ b/tests/sentinel/tests/15-config-set-config-get.tcl @@ -6,17 +6,22 @@ test "SENTINEL CONFIG SET and SENTINEL CONFIG GET handles multiple variables" { } assert_match {*yes*1234*} [S 1 SENTINEL CONFIG GET resolve-hostnames announce-port] assert_match {announce-port 1234} [S 1 SENTINEL CONFIG GET announce-port] + foreach_sentinel_id id { + S $id SENTINEL CONFIG SET resolve-hostnames no announce-port 0 + } } test "SENTINEL CONFIG GET for duplicate and unknown variables" { assert_equal {OK} [S 1 SENTINEL CONFIG SET resolve-hostnames yes announce-port 1234] assert_match {resolve-hostnames yes} [S 1 SENTINEL CONFIG GET resolve-hostnames resolve-hostnames does-not-exist] + S 1 SENTINEL CONFIG SET resolve-hostnames no announce-port 0 } test "SENTINEL CONFIG GET for patterns" { assert_equal {OK} [S 1 SENTINEL CONFIG SET loglevel notice announce-port 1234 announce-hostnames yes ] assert_match {loglevel notice} [S 1 SENTINEL CONFIG GET log* *level loglevel] assert_match {announce-hostnames yes announce-ip*announce-port 1234} [S 1 SENTINEL CONFIG GET announce*] + S 1 SENTINEL CONFIG SET announce-port 0 announce-hostnames no } test "SENTINEL CONFIG SET duplicate variables" { @@ -36,6 +41,9 @@ test "SENTINEL CONFIG SET, one option does not exist" { } # The announce-port should not be set to 1234 as it was called with a wrong argument assert_match {*111*} [S 1 SENTINEL CONFIG GET announce-port] + foreach_sentinel_id id { + S $id SENTINEL CONFIG SET announce-port 0 + } } test "SENTINEL CONFIG SET, one option with wrong value" { From 75c85faf9ee96b73b072f05994ce8e629db19602 Mon Sep 17 00:00:00 2001 From: 
Sergei Georgiev Date: Wed, 8 Apr 2026 14:59:22 +0300 Subject: [PATCH 14/17] Fix missing consumer propagation on empty XREADGROUP (#14963) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes consumer replication inconsistency when `XREADGROUP` is called for a new consumer but no `XCLAIM` commands are propagated to the replica. Previously, consumer creation was only propagated to replicas when `noack=true`, relying on `XCLAIM` propagation to implicitly create the consumer in the non-NOACK path. However, if no messages exist to read, no `XCLAIM` is generated, and the consumer is silently lost on the replica. This is a follow-up to the original fix in [redis/redis#7140](https://github.com/redis/redis/issues/7140) / [redis/redis#7526](https://github.com/redis/redis/pull/7526), which introduced `XGROUP CREATECONSUMER` propagation but only for the `NOACK` case. - **`xreadgroupCommand` (src/t_stream.c):** Replaced the `if (noack)` guard around the `streamPropagateConsumerCreation()` call with a deferred check after `streamReplyWithRange()`. Consumer creation is now propagated when `noack || propCount == 0` — that is, only when no `XCLAIM` commands were generated. This avoids redundant propagation in the common case where `XCLAIM` already implicitly creates the consumer on the replica, while correctly handling both the NOACK path (where PEL/XCLAIM is skipped entirely) and the no-messages path (where there is nothing to XCLAIM). - **Test (tests/unit/type/stream-cgroups.tcl):** Added replication test `"XREADGROUP propagates new consumer to replica"` that sets up a master-replica pair and verifies consumer propagation in two cases: (1) without NOACK when no messages are available to deliver, and (2) with NOACK when messages are delivered but XCLAIM is skipped. 
- **Master-replica consistency:** Consumers created by `XREADGROUP` are now visible on replicas whenever no `XCLAIM` would otherwise create them — covering both the NOACK path and the empty-stream path. - **No redundant propagation:** The noack || propCount == 0 condition avoids emitting a superfluous XGROUP CREATECONSUMER when XCLAIM commands are already propagated and would implicitly create the consumer on the replica. (cherry picked from commit 0be39e503260324013ec1293501d5b5acaf34acf) --- src/t_stream.c | 27 +++++++---- tests/unit/type/stream-cgroups.tcl | 75 +++++++++++++++++++++++++++++- 2 files changed, 92 insertions(+), 10 deletions(-) diff --git a/src/t_stream.c b/src/t_stream.c index 0c45535d1f1..74a86f59404 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1908,11 +1908,14 @@ void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupna decrRefCount(argv[6]); } -/* We need this when we want to propagate creation of consumer that was created - * by XREADGROUP with the NOACK option. In that case, the only way to create - * the consumer at the replica is by using XGROUP CREATECONSUMER (see issue #7140) +/* Propagate creation of a consumer that was implicitly created by XREADGROUP. + * Called only when no XCLAIM commands were propagated for this consumer, + * since XCLAIM implicitly creates the consumer on the replica. This covers + * two cases: + * (1) NOACK, where the PEL/XCLAIM path is skipped entirely. + * (2) no messages were available to deliver (see #7140). * - * XGROUP CREATECONSUMER + * XGROUP CREATECONSUMER */ void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds consumername) { robj *argv[5]; @@ -2873,6 +2876,7 @@ void xreadCommand(client *c) { int serve_claimed = 0; int serve_synchronously = 0; int serve_history = 0; /* True for XREADGROUP with ID != ">". 
*/ + int consumer_created = 0; streamConsumer *consumer = NULL; /* Unused if XREAD */ streamPropInfo spi = {c->argv[streams_arg+i],groupname}; /* Unused if XREAD */ @@ -2932,10 +2936,7 @@ void xreadCommand(client *c) { c->db->id,SCC_DEFAULT); if (server.memory_tracking_enabled) updateSlotAllocSize(c->db,getKeySlot(c->argv[streams_arg+i]->ptr),o,old_alloc,kvobjAllocSize(o)); - if (noack) - streamPropagateConsumerCreation(c,spi.keyname, - spi.groupname, - consumer->name); + consumer_created = 1; } consumer->seen_time = commandTimeSnapshot(); keyModified(c,c->db,c->argv[streams_arg+i],o,0); /* only update LRM */ @@ -2961,6 +2962,7 @@ void xreadCommand(client *c) { flags |= STREAM_RWR_CLAIMED; } + unsigned long propCount = 0; if (serve_synchronously) { arraylen++; if (arraylen == 1) arraylen_ptr = addReplyDeferredLen(c); @@ -2975,7 +2977,6 @@ void xreadCommand(client *c) { if (c->resp == 2) addReplyArrayLen(c,2); addReplyBulk(c,c->argv[streams_arg+i]); - unsigned long propCount = 0; if (noack) flags |= STREAM_RWR_NOACK; if (serve_history) flags |= STREAM_RWR_HISTORY; old_alloc = kvobjAllocSize(o); @@ -2989,6 +2990,14 @@ void xreadCommand(client *c) { keyModified(c,c->db,c->argv[streams_arg+i],o,0); /* only update LRM */ } } + + /* Propagate consumer creation only when no XCLAIM was generated, + * since XCLAIM implicitly creates the consumer on the replica. + * With NOACK the PEL/XCLAIM path is skipped entirely, so we + * always need explicit propagation regardless of propCount. */ + if (consumer_created && (noack || propCount == 0)) { + streamPropagateConsumerCreation(c,spi.keyname, spi.groupname, consumer->name); + } } /* We replied synchronously! Set the top array len and return to caller. 
*/ diff --git a/tests/unit/type/stream-cgroups.tcl b/tests/unit/type/stream-cgroups.tcl index 4990275e218..adc12ff8c88 100644 --- a/tests/unit/type/stream-cgroups.tcl +++ b/tests/unit/type/stream-cgroups.tcl @@ -1904,7 +1904,80 @@ start_server { } } } - + + start_server {tags {"repl external:skip" "stream"}} { + # Verify that XREADGROUP propagates a newly created consumer to + # the replica in cases where no XCLAIM is generated (XCLAIM + # implicitly creates the consumer, so explicit propagation is + # only needed when it is absent). Two cases are tested: + # 1. Without NOACK and no messages to deliver — no XCLAIM at all. + # 2. With NOACK and messages delivered — NOACK skips PEL/XCLAIM. + test "XREADGROUP propagates new consumer to replica" { + set master [srv 0 client] + set master_host [srv 0 host] + set master_port [srv 0 port] + + start_server {tags {"stream"}} { + set replica [srv 0 client] + + $replica replicaof $master_host $master_port + wait_for_sync $replica + + $master DEL mystream + $master XADD mystream 1-0 f v + $master XGROUP CREATE mystream grp 0 + + # Consume the only message so the stream has no + # new messages pending for delivery. + $master XREADGROUP GROUP grp c1 STREAMS mystream > + $master XACK mystream grp 1-0 + + wait_for_ofs_sync $master $replica + + # Case 1: XREADGROUP without NOACK for a brand-new + # consumer when there are NO messages to deliver. + # No XCLAIM is generated, so the consumer must be + # explicitly propagated. 
+ set reply [$master XREADGROUP GROUP grp c2 STREAMS mystream >] + assert_equal $reply {} + + set master_consumers [$master XINFO CONSUMERS mystream grp] + set master_names [lmap c $master_consumers {dict get $c name}] + assert {[lsearch $master_names "c2"] >= 0} + + wait_for_ofs_sync $master $replica + + set replica_consumers [$replica XINFO CONSUMERS mystream grp] + set replica_names [lmap c $replica_consumers {dict get $c name}] + if {[lsearch $replica_names "c2"] < 0} { + fail "Consumer 'c2' not found on replica (have: $replica_names)" + } + + # Case 2: XREADGROUP with NOACK for a brand-new consumer + # when a message IS available. NOACK skips PEL/XCLAIM + # entirely, so the consumer must be explicitly propagated + # even though messages were delivered. + $master XADD mystream 2-0 f v + wait_for_ofs_sync $master $replica + + set reply [$master XREADGROUP GROUP grp c3 NOACK STREAMS mystream >] + assert {$reply ne {}} + + set master_consumers [$master XINFO CONSUMERS mystream grp] + set master_names [lmap c $master_consumers {dict get $c name}] + assert {[lsearch $master_names "c3"] >= 0} + + wait_for_ofs_sync $master $replica + + set replica_consumers [$replica XINFO CONSUMERS mystream grp] + set replica_names [lmap c $replica_consumers {dict get $c name}] + if {[lsearch $replica_names "c3"] < 0} { + fail "Consumer 'c3' not found on replica (have: $replica_names)" + } + } + } + } + start_server {} { if {!$::force_resp3} { test "XREADGROUP CLAIM field types are correct" { From 99184b355688995716da1a55ae59e88cd0c4445c Mon Sep 17 00:00:00 2001 From: Shubham S Taple <155555100+ShubhamTaple@users.noreply.github.com> Date: Wed, 29 Apr 2026 19:34:06 +0530 Subject: [PATCH 15/17] Fix sharded pubsub unsubscribe lookup using cached command slot (#15094) Fixes #15085 ## Problem getKeySlot() may return `server.current_client->slot` while a command is executing instead of computing the slot from the provided string. 
The unsubscribe can be triggered by another client, in which case server.current_client is not the client being unsubscribed, so getKeySlot() would return that client's cached slot. Using this wrong slot would make the lookup in type.serverPubSubChannels miss the channel and ultimately trigger the assertion below. ## Fix Always use keyHashSlot() instead of getKeySlot() on unsubscribe. --------- Co-authored-by: debing.sun (cherry picked from commit 0bbb196c4612aec7ae2c66c31e9bcd5bcd77ad9c) --- src/pubsub.c | 5 ++++- tests/unit/cluster/sharded-pubsub.tcl | 27 +++++++++++++++++++++++++++ 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/src/pubsub.c b/src/pubsub.c index 7199be1e06c..b9198d26391 100644 --- a/src/pubsub.c +++ b/src/pubsub.c @@ -293,7 +293,10 @@ int pubsubUnsubscribeChannel(client *c, robj *channel, int notify, pubsubtype ty retval = 1; /* Remove the client from the channel -> clients list hash table */ if (server.cluster_enabled && type.shard) { - slot = getKeySlot(channel->ptr); + /* Compute the slot from the channel directly instead of using getKeySlot(), + * because the unsubscribe may be triggered by a different client, and + * getKeySlot() would return the cached slot of that client. */ + slot = keyHashSlot(channel->ptr, sdslen(channel->ptr)); } de = kvstoreDictFind(*type.serverPubSubChannels, slot, channel); serverAssertWithInfo(c,NULL,de != NULL); diff --git a/tests/unit/cluster/sharded-pubsub.tcl b/tests/unit/cluster/sharded-pubsub.tcl index 57b550ab727..5f78b7f0fd2 100644 --- a/tests/unit/cluster/sharded-pubsub.tcl +++ b/tests/unit/cluster/sharded-pubsub.tcl @@ -64,4 +64,31 @@ start_cluster 1 1 {tags {external:skip cluster}} { catch {[$replica EXEC]} err assert_match {EXECABORT*} $err } + + # Regression: shard channel slot must not follow getKeySlot() current_client + # cache when CLIENT KILL runs inside another client's EXEC (pubsubUnsubscribeChannel). 
+ test {Shard pubsub: CLIENT KILL subscriber inside MULTI/EXEC (cross-slot)} { + # SET fixes the transaction client's slot to keyk's slot; the subscriber must + # use a shard channel in a different slot so a wrong-slot lookup would fail. + set keyk "{06S}k" + set channel "{Qi}ch" + assert {[R 0 cluster keyslot $channel] != [R 0 cluster keyslot $keyk]} + + set rd_sub [redis_deferring_client] + $rd_sub client id + set cid [$rd_sub read] + $rd_sub ssubscribe $channel + $rd_sub read + + $primary multi + $primary set $keyk v + $primary client kill id $cid + set got [$primary exec] + + assert_equal {OK 1} $got + assert_equal PONG [$primary ping] + + catch {$rd_sub read} + $rd_sub close + } } From 99b910380953b83c94a3004a4e81bd30c4d7211d Mon Sep 17 00:00:00 2001 From: Leenear Date: Sun, 10 May 2026 09:01:08 +0400 Subject: [PATCH 16/17] Fix incorrect memmove size in LDB breakpoint deletion (#15115) # Description There is an array corruption bug in LDB caused by an incorrect size argument being passed to `memmove()` inside the `ldbDelBreakpoint()` function. When deleting a breakpoint, `memmove()` is used to shift the remaining breakpoints in the ldb.bp integer array forward. However, the size parameter passes the number of elements rather than the number of bytes. Because ldb.bp is an array of type `int`, this results in an under-copy. (cherry picked from commit bf432c98fd36138a89b2cee20d51be6dcfc734b9) --- src/eval.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/eval.c b/src/eval.c index cec8c162568..0edea5ddd74 100644 --- a/src/eval.c +++ b/src/eval.c @@ -1027,7 +1027,7 @@ int ldbDelBreakpoint(int line) { for (j = 0; j < ldb.bpcount; j++) { if (ldb.bp[j] == line) { ldb.bpcount--; - memmove(ldb.bp+j,ldb.bp+j+1,ldb.bpcount-j); + memmove(ldb.bp+j,ldb.bp+j+1,(ldb.bpcount-j) * sizeof(int)); return 1; } } From 8ee4d4536c2fa350b9cd84848f6ecf8325b99c5e Mon Sep 17 00:00:00 2001 From: "h.o.t. 
neglected" Date: Sun, 10 May 2026 22:53:18 -0400 Subject: [PATCH 17/17] Fix MULTI queue memory accounting in multiStateMemOverhead (#15163) PR #14440 changed `mstate.commands` from an array of `multiCmd` structs to an array of `pendingCommand` pointers. This PR fixes the overhead calculation in multiStateMemOverhead to account for both the pointer array and the actual structs: - The pointer array: `alloc_count * sizeof(pendingCommand*)` - The individually allocated structs: `count * sizeof(pendingCommand)` (cherry picked from commit 9302d2788936eea4710792a499d7ce43cfec8806) --- src/multi.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/multi.c b/src/multi.c index cd8783d20e7..2b900bcbd14 100644 --- a/src/multi.c +++ b/src/multi.c @@ -504,6 +504,7 @@ size_t multiStateMemOverhead(client *c) { /* Add watched keys overhead, Note: this doesn't take into account the watched keys themselves, because they aren't managed per-client. */ mem += listLength(c->watched_keys) * (sizeof(listNode) + sizeof(watchedKey)); /* Reserved memory for queued multi commands. */ - mem += c->mstate.alloc_count * sizeof(pendingCommand); + mem += c->mstate.alloc_count * sizeof(pendingCommand*); + mem += c->mstate.count * sizeof(pendingCommand); return mem; }