diff --git a/.github/workflows/daily.yml b/.github/workflows/daily.yml index 0e478bb771f..5cc49c75711 100644 --- a/.github/workflows/daily.yml +++ b/.github/workflows/daily.yml @@ -52,7 +52,7 @@ jobs: repository: ${{ env.GITHUB_REPOSITORY }} ref: ${{ env.GITHUB_HEAD_REF }} - name: make - run: make REDIS_CFLAGS='-Werror -DREDIS_TEST' + run: make REDIS_CFLAGS='-Werror -DREDIS_TEST -DDEBUG_ASSERTIONS' - name: testprep run: sudo apt-get install tcl8.6 tclx - name: test diff --git a/modules/redisearch/Makefile b/modules/redisearch/Makefile index c3990b6865f..fc2dc84b06f 100644 --- a/modules/redisearch/Makefile +++ b/modules/redisearch/Makefile @@ -3,5 +3,10 @@ MODULE_VERSION = v8.6.8 MODULE_REPO = https://github.com/redisearch/redisearch TARGET_MODULE = $(SRC_DIR)/bin/$(FULL_VARIANT)/search-community/redisearch.so + # Set INLINE_LSE_ATOMICS=1 for perf improvement on common ARM CPUs (i.e. Graviton2/3/4); no effect on x86 or macOS. + # Default 0 keeps the binary runnable on pre-Armv8.1-a cores (Cortex-A72, Graviton1, RPi4) that would otherwise SIGILL at module load. +INLINE_LSE_ATOMICS ?= 0 +export INLINE_LSE_ATOMICS + include ../common.mk diff --git a/modules/vector-sets/test.py b/modules/vector-sets/test.py index 8d56d582766..0b512333641 100755 --- a/modules/vector-sets/test.py +++ b/modules/vector-sets/test.py @@ -96,10 +96,10 @@ def __init__(self, primary_port=6379, replica_port=6380): self.error_details = None self.test_key = f"test:{self.__class__.__name__.lower()}" # Primary Redis instance - self.redis = redis.Redis(port=primary_port,db=9) + self.redis = redis.Redis(port=primary_port,protocol=2,db=9) self.redis3 = redis.Redis(port=primary_port,protocol=3,db=9) # Replica Redis instance - self.replica = redis.Redis(port=replica_port,db=9) + self.replica = redis.Redis(port=replica_port,protocol=2,db=9) # Replication status self.replication_setup = False # Ports diff --git a/src/cluster_asm.c b/src/cluster_asm.c index 044299e6d4a..63bb224534c 100644 --- a/src/cluster_asm.c +++ b/src/cluster_asm.c @@ -536,7 +536,7 @@ size_t asmGetImportInputBufferSize(void) { return 0; } -size_t asmGetMigrateOutputBufferSize(void) { +size_t asmGetMigrateOutputMemoryUsage(void) { if (!asmManager || listLength(asmManager->tasks) == 0) return 0; asmTask *task = listNodeValue(listFirst(asmManager->tasks)); diff --git a/src/cluster_asm.h b/src/cluster_asm.h index 841a55d76ab..d836dc6f24c 100644 --- a/src/cluster_asm.h +++ b/src/cluster_asm.h @@ -33,7 +33,7 @@ struct slotRangeArray *asmTaskGetSlotRanges(const char *task_id); int asmNotifyConfigUpdated(struct asmTask *task, sds *err); size_t asmGetPeakSyncBufferSize(void); size_t asmGetImportInputBufferSize(void); -size_t asmGetMigrateOutputBufferSize(void); +size_t asmGetMigrateOutputMemoryUsage(void); int clusterAsmCancel(const char *task_id, const char *reason); int clusterAsmCancelBySlot(int slot, const char *reason); int clusterAsmCancelBySlotRangeArray(struct slotRangeArray *slots, const char *reason); diff --git a/src/commands.def b/src/commands.def index fda31af2a8c..b90587aa839 100644 --- a/src/commands.def +++ b/src/commands.def @@ -11830,7 +11830,7 @@ struct COMMAND_STRUCT redisCommandTable[] = { {MAKE_CMD("pfadd","Adds elements to a HyperLogLog key. Creates the key if it doesn't exist.","O(1) to add every element.","2.8.9",CMD_DOC_NONE,NULL,NULL,"hyperloglog",COMMAND_GROUP_HYPERLOGLOG,PFADD_History,0,PFADD_Tips,0,pfaddCommand,-2,CMD_WRITE|CMD_DENYOOM|CMD_FAST,ACL_CATEGORY_HYPERLOGLOG,PFADD_Keyspecs,1,NULL,2),.args=PFADD_Args}, {MAKE_CMD("pfcount","Returns the approximated cardinality of the set(s) observed by the HyperLogLog key(s).","O(1) with a very small average constant time when called with a single key. O(N) with N being the number of keys, and much bigger constant times, when called with multiple keys.","2.8.9",CMD_DOC_NONE,NULL,NULL,"hyperloglog",COMMAND_GROUP_HYPERLOGLOG,PFCOUNT_History,0,PFCOUNT_Tips,0,pfcountCommand,-2,CMD_READONLY|CMD_MAY_REPLICATE,ACL_CATEGORY_HYPERLOGLOG,PFCOUNT_Keyspecs,1,NULL,1),.args=PFCOUNT_Args}, {MAKE_CMD("pfdebug","Internal commands for debugging HyperLogLog values.","N/A","2.8.9",CMD_DOC_SYSCMD,NULL,NULL,"hyperloglog",COMMAND_GROUP_HYPERLOGLOG,PFDEBUG_History,0,PFDEBUG_Tips,0,pfdebugCommand,3,CMD_WRITE|CMD_DENYOOM|CMD_ADMIN,ACL_CATEGORY_HYPERLOGLOG,PFDEBUG_Keyspecs,1,NULL,2),.args=PFDEBUG_Args}, -{MAKE_CMD("pfmerge","Merges one or more HyperLogLog values into a single key.","O(N) to merge N HyperLogLogs, but with high constant times.","2.8.9",CMD_DOC_NONE,NULL,NULL,"hyperloglog",COMMAND_GROUP_HYPERLOGLOG,PFMERGE_History,0,PFMERGE_Tips,0,pfmergeCommand,-2,CMD_WRITE|CMD_DENYOOM,ACL_CATEGORY_HYPERLOGLOG,PFMERGE_Keyspecs,2,NULL,2),.args=PFMERGE_Args}, +{MAKE_CMD("pfmerge","Merges one or more HyperLogLog values into a single key.","O(N) to merge N HyperLogLogs, but with high constant times.","2.8.9",CMD_DOC_NONE,NULL,NULL,"hyperloglog",COMMAND_GROUP_HYPERLOGLOG,PFMERGE_History,0,PFMERGE_Tips,0,pfmergeCommand,-2,CMD_WRITE|CMD_DENYOOM,ACL_CATEGORY_HYPERLOGLOG,PFMERGE_Keyspecs,2,pfmergeGetKeys,2),.args=PFMERGE_Args}, {MAKE_CMD("pfselftest","An internal command for testing HyperLogLog values.","N/A","2.8.9",CMD_DOC_SYSCMD,NULL,NULL,"hyperloglog",COMMAND_GROUP_HYPERLOGLOG,PFSELFTEST_History,0,PFSELFTEST_Tips,0,pfselftestCommand,1,CMD_ADMIN,ACL_CATEGORY_HYPERLOGLOG,PFSELFTEST_Keyspecs,0,NULL,0)}, /* list */ {MAKE_CMD("blmove","Pops an element from a list, pushes it to another list and returns it. Blocks until an element is available otherwise. Deletes the list if the last element was moved.","O(1)","6.2.0",CMD_DOC_NONE,NULL,NULL,"list",COMMAND_GROUP_LIST,BLMOVE_History,0,BLMOVE_Tips,0,blmoveCommand,6,CMD_WRITE|CMD_DENYOOM|CMD_BLOCKING,ACL_CATEGORY_LIST,BLMOVE_Keyspecs,2,NULL,5),.args=BLMOVE_Args}, diff --git a/src/commands/memory-stats.json b/src/commands/memory-stats.json index 0e95e0f36db..4098c1b36dd 100644 --- a/src/commands/memory-stats.json +++ b/src/commands/memory-stats.json @@ -38,6 +38,12 @@ "clients.normal": { "type": "integer" }, + "clients.normal.shared": { + "type": "integer" + }, + "clients.normal.unshared": { + "type": "integer" + }, "cluster.links": { "type": "integer" }, diff --git a/src/commands/pfmerge.json b/src/commands/pfmerge.json index c93070f1167..e3e26fabd84 100644 --- a/src/commands/pfmerge.json +++ b/src/commands/pfmerge.json @@ -6,6 +6,7 @@ "since": "2.8.9", "arity": -2, "function": "pfmergeCommand", + "get_keys_function": "pfmergeGetKeys", "command_flags": [ "WRITE", "DENYOOM" diff --git a/src/db.c b/src/db.c index 4dcd7f09f26..fbc81c46a3b 100644 --- a/src/db.c +++ b/src/db.c @@ -1873,7 +1873,7 @@ void scanGenericCommand(client *c, robj *o, unsigned long long cursor) { * COUNT, so if the hash table is in a pathological state (very * sparsely populated) we avoid to block too much time at the cost * of returning no or very few elements. */ - long maxiterations = count*10; + long maxiterations = (count > LONG_MAX / 10) ? LONG_MAX : count * 10; /* We pass scanData which have three pointers to the callback: * 1. data.keys: the list to which it will add new elements; @@ -3643,6 +3643,29 @@ int sortGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult * return result->numkeys; } +int pfmergeGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { + int i, numkeys; + keyReference *keys; + UNUSED(cmd); + UNUSED(argv); + + numkeys = argc - 1; /* destkey + all sourcekeys */ + keys = getKeysPrepareResult(result, numkeys); + + /* destkey at argv[1] */ + keys[0].pos = 1; + keys[0].flags = CMD_KEY_RW | CMD_KEY_ACCESS | CMD_KEY_INSERT; + + /* sourcekeys at argv[2..argc-1], may be zero */ + for (i = 2; i < argc; i++) { + keys[i - 1].pos = i; + keys[i - 1].flags = CMD_KEY_RO | CMD_KEY_ACCESS; + } + + result->numkeys = numkeys; + return result->numkeys; +} + /* This command declares incomplete keys, so the flags are correctly set for this function */ int migrateGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { int i, j, num, first; diff --git a/src/eval.c b/src/eval.c index cec8c162568..0edea5ddd74 100644 --- a/src/eval.c +++ b/src/eval.c @@ -1027,7 +1027,7 @@ int ldbDelBreakpoint(int line) { for (j = 0; j < ldb.bpcount; j++) { if (ldb.bp[j] == line) { ldb.bpcount--; - memmove(ldb.bp+j,ldb.bp+j+1,ldb.bpcount-j); + memmove(ldb.bp+j,ldb.bp+j+1,(ldb.bpcount-j) * sizeof(int)); return 1; } } diff --git a/src/evict.c b/src/evict.c index e287edec6e3..50037c6891d 100644 --- a/src/evict.c +++ b/src/evict.c @@ -349,7 +349,7 @@ size_t freeMemoryGetNotCountedMemory(void) { /* The migrate client is like a replica, we also push DELs into it when * evicting keys belonging to the migrating slot, so we don't count its * output buffer to avoid eviction loop. */ - overhead += asmGetMigrateOutputBufferSize(); + overhead += asmGetMigrateOutputMemoryUsage(); if (server.aof_state != AOF_OFF) { overhead += sdsAllocSize(server.aof_buf); diff --git a/src/keymeta.c b/src/keymeta.c index 530ea049300..829458daec7 100644 --- a/src/keymeta.c +++ b/src/keymeta.c @@ -416,7 +416,7 @@ int rdbLoadSkipMetaIfAllowed(rio *rdb, char *cname, int flags) { * * Note: rdbLoadCheckModuleValue() reads opcodes until it finds RDB_MODULE_OPCODE_EOF, * so it consumes the EOF marker as well. We don't need to read it separately. */ - robj *dummy = rdbLoadCheckModuleValue(rdb, cname); + robj *dummy = rdbLoadCheckModuleValue(rdb, cname, 1); if (dummy == NULL) { serverLog(LL_WARNING, "Corrupted metadata value for class '%s'", cname); return -1; diff --git a/src/module.c b/src/module.c index 3a2715e7f90..3982bf22b3a 100644 --- a/src/module.c +++ b/src/module.c @@ -666,7 +666,7 @@ void moduleReleaseTempClient(client *c) { } clearClientConnectionState(c); listEmpty(c->reply); - c->reply_bytes = 0; + c->reply_bytes = c->reply_bytes_shared = c->reply_bytes_unshared = 0; c->duration = 0; resetClient(c, -1); serverAssert(c->all_argv_len_sum == 0); @@ -9492,7 +9492,7 @@ void RM_RegisterClusterMessageReceiver(RedisModuleCtx *ctx, uint8_t type, RedisM if (prev) prev->next = r->next; else - clusterReceivers[type]->next = r->next; + clusterReceivers[type] = r->next; /* Update the head */ zfree(r); } return; diff --git a/src/multi.c b/src/multi.c index cd8783d20e7..2b900bcbd14 100644 --- a/src/multi.c +++ b/src/multi.c @@ -504,6 +504,7 @@ size_t multiStateMemOverhead(client *c) { /* Add watched keys overhead, Note: this doesn't take into account the watched keys themselves, because they aren't managed per-client. */ mem += listLength(c->watched_keys) * (sizeof(listNode) + sizeof(watchedKey)); /* Reserved memory for queued multi commands. */ - mem += c->mstate.alloc_count * sizeof(pendingCommand); + mem += c->mstate.alloc_count * sizeof(pendingCommand*); + mem += c->mstate.count * sizeof(pendingCommand); return mem; } diff --git a/src/networking.c b/src/networking.c index 0230406a32f..625ac0615cf 100644 --- a/src/networking.c +++ b/src/networking.c @@ -37,6 +37,7 @@ static inline int _writeToClientNonSlave(client *c, ssize_t *nwritten); static inline int _writeToClientSlave(client *c, ssize_t *nwritten); static pendingCommand *acquirePendingCommand(void); static void reclaimPendingCommand(client *c, pendingCommand *pcmd); +static size_t getClientOutputBufferLogicalSize(client *c); int ProcessingEventsWhileBlocked = 0; /* See processEventsWhileBlocked(). */ __thread sds thread_reusable_qb = NULL; @@ -212,7 +213,7 @@ client *createClient(connection *conn) { c->main_ch_client_id = 0; c->reply = listCreate(); c->deferred_reply_errors = NULL; - c->reply_bytes = 0; + c->reply_bytes = c->reply_bytes_shared = c->reply_bytes_unshared = 0; c->obuf_soft_limit_reached_time = 0; listSetFreeMethod(c->reply,freeClientReplyValue); listSetDupMethod(c->reply,dupClientReplyValue); @@ -363,7 +364,7 @@ int prepareClientToWrite(client *c) { * Low level functions to add more data to output buffers. * -------------------------------------------------------------------------- */ -static int tryAddPayload(char *buf, size_t *used, size_t size, uint8_t type, const void *payload, size_t len) { +static int tryAddPayload(client *c, char *buf, size_t *used, size_t size, uint8_t type, const void *payload, size_t len) { if (*used + sizeof(payloadHeader) + len > size) return 0; /* Start a new payload chunk */ @@ -372,6 +373,13 @@ static int tryAddPayload(char *buf, size_t *used, size_t size, uint8_t type, con header->payload_len = len; memcpy((char *)header + sizeof(payloadHeader), payload, len); *used += sizeof(payloadHeader) + len; + + /* Track referenced reply bytes for copy avoidance. */ + if (type == BULK_STR_REF) { + const bulkStrRef *str_ref = (const bulkStrRef *)payload; + c->reply_bytes_shared += sdslen(str_ref->obj->ptr); + } + return 1; } @@ -391,8 +399,11 @@ static void _addReplyPayloadToList(client *c, list *reply_list, const char *payl if (tail) { if (unlikely(tail->buf_encoded)) { /* Try to add to encoded buffer */ - if (tryAddPayload(tail->buf, &tail->used, tail->size, payload_type, (void *)payload, len)) { - len = 0; + if (tryAddPayload(c, tail->buf, &tail->used, tail->size, payload_type, (void *)payload, len)) { + /* For BULK_STR_REF payloads, tryAddPayload updates shared reply bytes + * which accounts for referenced strings. */ + if (encoded) closeClientOnOutputBufferLimitReached(c, 1); + return; } } else if (!encoded) { /* Both tail and new payload are non-encoded, can append directly */ @@ -420,7 +431,7 @@ static void _addReplyPayloadToList(client *c, list *reply_list, const char *payl tail->used = 0; tail->buf_encoded = encoded; if (tail->buf_encoded) { - serverAssert(tryAddPayload(tail->buf, &tail->used, tail->size, payload_type, (void *)payload, len)); + serverAssert(tryAddPayload(c, tail->buf, &tail->used, tail->size, payload_type, (void *)payload, len)); } else { tail->used = len; memcpy(tail->buf, payload, len); @@ -452,7 +463,7 @@ static size_t _addReplyPayloadToBuffer(client *c, const void *payload, size_t le size_t available = c->buf_usable_size - c->bufpos; size_t reply_len = min(available, len); if (c->buf_encoded) { - if (!tryAddPayload(c->buf, &c->bufpos, c->buf_usable_size, payload_type, payload, len)) + if (!tryAddPayload(c, c->buf, &c->bufpos, c->buf_usable_size, payload_type, payload, len)) return 0; reply_len = len; } else { @@ -468,18 +479,27 @@ static size_t _addReplyPayloadToBuffer(client *c, const void *payload, size_t le /* Adds bulk string reference (i.e. pointer to object and pointer to string itself) to static buffer * Returns non-zero value if succeeded to add */ static size_t _addBulkStrRefToBuffer(client *c, const void *payload, size_t len) { + size_t result; if (!c->buf_encoded) { /* If buffer is plain and not empty then can't add bulk string reference to it */ if (c->bufpos) return 0; c->buf_encoded = 1; /* Set c->buf to encoded mode to allow bulk string reference to be stored in it */ - size_t result = _addReplyPayloadToBuffer(c, payload, len, BULK_STR_REF); + result = _addReplyPayloadToBuffer(c, payload, len, BULK_STR_REF); if (!result) { /* Failed to add bulk string reference to buffer, need to revert to plain mode. */ c->buf_encoded = 0; + return 0; } - return result; + } else { + result = _addReplyPayloadToBuffer(c, payload, len, BULK_STR_REF); + if (!result) return 0; } - return _addReplyPayloadToBuffer(c, payload, len, BULK_STR_REF); + + /* Even though the bulk string is stored by reference and the underlying + * memory is shared, we still account this shared memory towards this + * client's output buffer usage, so we need to check the output buffer limits. */ + closeClientOnOutputBufferLimitReached(c, 1); + return result; } void _addReplyToBufferOrList(client *c, const char *s, size_t len) { @@ -1472,6 +1492,7 @@ void AddReplyFromClient(client *dst, client *src) { /* Concatenate the reply list into the dest */ if (listLength(src->reply)) listJoin(dst->reply,src->reply); + serverAssert(src->reply_bytes_shared == 0); /* It is non-normal client, never has references. */ dst->reply_bytes += src->reply_bytes; src->reply_bytes = 0; src->bufpos = 0; @@ -1941,6 +1962,72 @@ void tryUnlinkClientFromPendingRefReply(client *c, int force) { } } +/* Count bytes in an encoded buffer where the client holds the last remaining + * reference to the underlying string object (refcount == 1), meaning the key + * has been deleted from the keyspace and only this client buffer keeps the + * memory alive. + * + * Note: when multiple clients share a reference to the same object, + * the object's refcount stays above 1 even after the key is deleted. In that + * case none of those clients will be counted here, so the shared memory is + * under-reported until all but one client has consumed its copy. */ +static size_t computeUnsharedReplyBytes(char *buf, size_t bufpos) { + size_t total = 0; + char *ptr = buf; + while (ptr < buf + bufpos) { + payloadHeader *header = (payloadHeader *)ptr; + ptr += sizeof(payloadHeader); + if (header->payload_type == BULK_STR_REF) { + bulkStrRef *str_ref = (bulkStrRef *)ptr; + if (str_ref->obj != NULL && str_ref->obj->refcount == 1) + total += sdslen(str_ref->obj->ptr); + } + ptr += header->payload_len; + } + return total; +} + +/* Update the client's unshared reply memory (solely owned). */ +void updateClientUnsharedReplyBytes(client *c) { + c->reply_bytes_unshared = 0; + + /* No shared memory means no unshared memory either. */ + if (c->reply_bytes_shared == 0) return; + + /* Scan the static output buffer. */ + if (c->buf_encoded) + c->reply_bytes_unshared += computeUnsharedReplyBytes(c->buf, c->bufpos); + + /* Scan each block in the reply list. */ + listIter reply_li; + listNode *reply_ln; + listRewind(c->reply, &reply_li); + while ((reply_ln = listNext(&reply_li))) { + clientReplyBlock *block = listNodeValue(reply_ln); + if (block == NULL) continue; /* deferred-length placeholder */ + if (block->buf_encoded) + c->reply_bytes_unshared += computeUnsharedReplyBytes(block->buf, block->used); + } +} + +/* Compute shared reply memory: total shared reply bytes and the unshared subset where the key + * has been deleted and the client buffer is the sole holder. */ +void getClientsSharedMemoryUsage(size_t *shared_mem, size_t *unshared_mem) { + listNode *ln; + listIter li; + listRewind(server.clients_with_pending_ref_reply, &li); + while ((ln = listNext(&li))) { + client *c = listNodeValue(ln); + + /* Total shared reply bytes (logical size, shared with keyspace). */ + *shared_mem += c->reply_bytes_shared; + + /* Unshared reply bytes: the client is the sole owner because the key was deleted. */ + updateClientUnsharedReplyBytes(c); + *unshared_mem += c->reply_bytes_unshared; + } +} + /* Clear the client state to resemble a newly connected client. */ void clearClientConnectionState(client *c) { listNode *ln; @@ -2035,6 +2122,7 @@ static void releaseBufReferences(client *c, char *buf, size_t bufpos) { bulkStrRef *str_ref = (bulkStrRef *)ptr; /* Only release if not already released. */ if (str_ref->obj != NULL) { + c->reply_bytes_shared -= sdslen(str_ref->obj->ptr); if (in_io_thread) ioDeferFreeRobj(c, str_ref->obj); else @@ -2451,6 +2539,7 @@ static payloadHeader *processSentDataInEncodedBuffer(client *c, char *start_ptr, return head; } *remaining -= (writen_len - *sentlen); + c->reply_bytes_shared -= sdslen(str_ref->obj->ptr); if (in_io_thread) { ioDeferFreeRobj(c, str_ref->obj); } else { @@ -2609,7 +2698,7 @@ static inline int _writeToClientNonSlave(client *c, ssize_t *nwritten) { /* If there are no longer objects in the list, we expect * the count of reply bytes to be exactly zero. */ if (listLength(c->reply) == 0) - serverAssert(c->reply_bytes == 0); + serverAssert(c->reply_bytes == 0 && c->reply_bytes_shared == 0); } else if (c->bufpos > 0) { /* For encoded buffers, we need to use writev to handle bulk string references */ if (c->buf_encoded) { @@ -3980,8 +4069,12 @@ sds catClientInfoString(sds s, client *client) { } *p = '\0'; + /* Refresh the cached unshared reply bytes before computing memory stats below. */ + updateClientUnsharedReplyBytes(client); + /* Compute the total memory consumed by this client. */ - size_t obufmem, total_mem = getClientMemoryUsage(client, &obufmem); + size_t obufmem = getClientOutputBufferLogicalSize(client); + size_t total_mem = getClientMemoryUsage(client); size_t used_blocks_of_repl_buf = 0; if (client->ref_repl_buf_node) { @@ -4013,8 +4106,10 @@ sds catClientInfoString(sds s, client *client) { " rbp=%U", (unsigned long long) client->buf_peak, " obl=%U", (unsigned long long) client->bufpos, " oll=%U", (unsigned long long) listLength(client->reply) + used_blocks_of_repl_buf, - " omem=%U", (unsigned long long) obufmem, /* should not include client->buf since we want to see 0 for static clients. */ - " tot-mem=%U", (unsigned long long) total_mem, + " omem=%U", (unsigned long long) obufmem, /* logical output buffer memory (includes shared memory; excludes client->buf so static clients show 0) */ + " omem-shared=%U", (unsigned long long) client->reply_bytes_shared, /* shared memory (not solely owned by this client) */ + " omem-unshared=%U", (unsigned long long) client->reply_bytes_unshared, /* unshared memory (solely owned by this client) */ + " tot-mem=%U", (unsigned long long) total_mem, /* actual memory usage (includes unshared memory, excludes shared memory) */ " events=%s", events, " cmd=%s", client->lastcmd ? client->lastcmd->fullname : "NULL", " user=%s", client->user ? client->user->name : "(superuser)", @@ -5035,11 +5130,12 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) { /* This function returns the number of bytes that Redis is * using to store the reply still not read by the client. + * It does NOT include any referenced bytes (neither shared nor unshared). * * Note: this function is very fast so can be called as many time as * the caller wishes. The main usage of this function currently is * enforcing the client output length limits. */ -size_t getClientOutputBufferMemoryUsage(client *c) { +static size_t getClientOutputBufferAllocSize(client *c) { if (unlikely(clientTypeIsSlave(c))) { size_t repl_buf_size = 0; size_t repl_node_num = 0; @@ -5057,22 +5153,38 @@ size_t getClientOutputBufferMemoryUsage(client *c) { } } +/* Returns the logical output buffer size for limit enforcement. + * This includes all shared memory (shared with the keyspace), ensuring that + * a client requesting huge amounts of data via copy-avoidance is still + * subject to output buffer limits. */ +static size_t getClientOutputBufferLogicalSize(client *c) { + size_t mem = getClientOutputBufferAllocSize(c); + if (!clientTypeIsSlave(c)) + mem += c->reply_bytes_shared; + return mem; +} + +/* Returns the actual memory used to store the reply not yet read by the client. + * This includes unshared memory (solely owned by this client), which would be + * freed when the client disconnects. */ +size_t getClientOutputBufferMemoryUsage(client *c) { + size_t mem = getClientOutputBufferAllocSize(c); + mem += c->reply_bytes_unshared; + return mem; +} + size_t getNormalClientPendingReplyBytes(client *c) { serverAssert(!clientTypeIsSlave(c)); - if (listLength(c->reply) == 0) return c->bufpos; + if (listLength(c->reply) == 0) return c->bufpos + c->reply_bytes_shared; clientReplyBlock *block = listNodeValue(listLast(c->reply)); - return (c->reply_bytes - block->size + block->used) + c->bufpos; + return (c->reply_bytes + c->reply_bytes_shared - block->size + block->used) + c->bufpos; } -/* Returns the total client's memory usage. - * Optionally, if output_buffer_mem_usage is not NULL, it fills it with - * the client output buffer memory usage portion of the total. */ -size_t getClientMemoryUsage(client *c, size_t *output_buffer_mem_usage) { +/* Returns the total client's memory usage. */ +size_t getClientMemoryUsage(client *c) { size_t mem = getClientOutputBufferMemoryUsage(c); - if (output_buffer_mem_usage != NULL) - *output_buffer_mem_usage = mem; mem += c->querybuf ? sdsZmallocSize(c->querybuf) : 0; mem += zmalloc_size(c); mem += c->buf_usable_size; @@ -5150,7 +5262,7 @@ char *getClientTypeName(int class) { * Otherwise zero is returned. */ int checkClientOutputBufferLimits(client *c) { int soft = 0, hard = 0, class; - unsigned long used_mem = getClientOutputBufferMemoryUsage(c); + unsigned long used_mem = getClientOutputBufferLogicalSize(c); /* For unauthenticated clients the output buffer is limited to prevent * them from abusing it by not reading the replies */ @@ -5214,10 +5326,10 @@ int checkClientOutputBufferLimits(client *c) { * Returns 1 if client was (flagged) closed. */ int closeClientOnOutputBufferLimitReached(client *c, int async) { if (!c->conn) return 0; /* It is unsafe to free fake clients. */ - serverAssert(c->reply_bytes < SIZE_MAX-(1024*64)); + serverAssert(c->reply_bytes < SIZE_MAX-(1024*64)); /* actual memory only, logical memory may exceed SIZE_MAX */ /* Note that c->reply_bytes is irrelevant for replica clients * (they use the global repl buffers). */ - if ((c->reply_bytes == 0 && !clientTypeIsSlave(c)) || + if ((c->reply_bytes == 0 && c->reply_bytes_shared == 0 && !clientTypeIsSlave(c)) || c->flags & CLIENT_CLOSE_ASAP) return 0; if (checkClientOutputBufferLimits(c)) { sds client = catClientInfoString(sdsempty(),c); diff --git a/src/object.c b/src/object.c index d4915f2d675..31e12ca1baf 100644 --- a/src/object.c +++ b/src/object.c @@ -1327,6 +1327,9 @@ struct redisMemOverhead *getMemoryOverheadData(void) { mem_total += mh->repl_backlog; mem_total += mh->clients_slaves; + /* Compute shared/unshared reply memory. */ + getClientsSharedMemoryUsage(&mh->clients_normal_shared, &mh->clients_normal_unshared); + /* Computing the memory used by the clients would be O(N) if done * here online. We use our values computed incrementally by * updateClientMemoryUsage(). */ @@ -1357,7 +1360,7 @@ struct redisMemOverhead *getMemoryOverheadData(void) { /* Cluster atomic slot migration buffers. */ mh->asm_import_input_buffer = asmGetImportInputBufferSize(); - mh->asm_migrate_output_buffer = asmGetMigrateOutputBufferSize(); + mh->asm_migrate_output_buffer = asmGetMigrateOutputMemoryUsage(); mem_total += mh->asm_import_input_buffer; mem_total += mh->asm_migrate_output_buffer; @@ -1682,7 +1685,7 @@ NULL } else if (!strcasecmp(c->argv[1]->ptr,"stats") && c->argc == 2) { struct redisMemOverhead *mh = getMemoryOverheadData(); - addReplyMapLen(c,33+mh->num_dbs); + addReplyMapLen(c,35+mh->num_dbs); addReplyBulkCString(c,"peak.allocated"); addReplyLongLong(c,mh->peak_allocated); @@ -1705,6 +1708,12 @@ NULL addReplyBulkCString(c,"clients.normal"); addReplyLongLong(c,mh->clients_normal); + addReplyBulkCString(c,"clients.normal.shared"); + addReplyLongLong(c,mh->clients_normal_shared); + + addReplyBulkCString(c,"clients.normal.unshared"); + addReplyLongLong(c,mh->clients_normal_unshared); + addReplyBulkCString(c,"cluster.links"); addReplyLongLong(c,mh->cluster_links); diff --git a/src/pubsub.c b/src/pubsub.c index 7199be1e06c..b9198d26391 100644 --- a/src/pubsub.c +++ b/src/pubsub.c @@ -293,7 +293,10 @@ int pubsubUnsubscribeChannel(client *c, robj *channel, int notify, pubsubtype ty retval = 1; /* Remove the client from the channel -> clients list hash table */ if (server.cluster_enabled && type.shard) { - slot = getKeySlot(channel->ptr); + /* Compute the slot from the channel directly instead of using getKeySlot(), + * because the unsubscribe may be triggered by a different client, and + * getKeySlot() would return the cached slot of that client. */ + slot = keyHashSlot(channel->ptr, sdslen(channel->ptr)); } de = kvstoreDictFind(*type.serverPubSubChannels, slot, channel); serverAssertWithInfo(c,NULL,de != NULL); diff --git a/src/rdb.c b/src/rdb.c index ac66fcec060..5766335e257 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1919,11 +1919,18 @@ void rdbRemoveTempFile(pid_t childpid, int from_signal) { /* This function is called by rdbLoadObject() when the code is in RDB-check * mode and we find a module value of type 2 that can be parsed without - * the need of the actual module. The value is parsed for errors, finally - * a dummy redis object is returned just to conform to the API. */ -robj *rdbLoadCheckModuleValue(rio *rdb, char *modulename) { + * the need of the actual module. The value is parsed for errors. + * If null_on_error is true, NULL is returned when data corruption is detected; + * otherwise a dummy redis object is always returned regardless of success or + * failure. */ +robj *rdbLoadCheckModuleValue(rio *rdb, char *modulename, int null_on_error) { uint64_t opcode; while((opcode = rdbLoadLen(rdb,NULL)) != RDB_MODULE_OPCODE_EOF) { + if (opcode == RDB_LENERR) { + rdbReportCorruptRDB("Error reading module opcode length from module %s value", modulename); + goto error; + } + if (opcode == RDB_MODULE_OPCODE_SINT || opcode == RDB_MODULE_OPCODE_UINT) { @@ -1931,12 +1938,14 @@ robj *rdbLoadCheckModuleValue(rio *rdb, char *modulename) { if (rdbLoadLenByRef(rdb,NULL,&len) == -1) { rdbReportCorruptRDB( "Error reading integer from module %s value", modulename); + goto error; } } else if (opcode == RDB_MODULE_OPCODE_STRING) { robj *o = rdbGenericLoadStringObject(rdb,RDB_LOAD_NONE,NULL); if (o == NULL) { rdbReportCorruptRDB( "Error reading string from module %s value", modulename); + goto error; } decrRefCount(o); } else if (opcode == RDB_MODULE_OPCODE_FLOAT) { @@ -1944,16 +1953,24 @@ robj *rdbLoadCheckModuleValue(rio *rdb, char *modulename) { if (rdbLoadBinaryFloatValue(rdb,&val) == -1) { rdbReportCorruptRDB( "Error reading float from module %s value", modulename); + goto error; } } else if (opcode == RDB_MODULE_OPCODE_DOUBLE) { double val; if (rdbLoadBinaryDoubleValue(rdb,&val) == -1) { rdbReportCorruptRDB( "Error reading double from module %s value", modulename); + goto error; } + } else { + rdbReportCorruptRDB( + "Unknown module opcode %llu reading module %s value", (unsigned long long)opcode, modulename); + goto error; } } return createStringObject("module-dummy-value",18); +error: + return null_on_error ? NULL : createStringObject("module-dummy-value",18); } /* Load object type and optional key metadata (into `keymeta`) from RDB stream. @@ -3428,7 +3445,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error) if (rdbCheckMode) { char name[10]; moduleTypeNameByID(name,moduleid); - return rdbLoadCheckModuleValue(rdb,name); + return rdbLoadCheckModuleValue(rdb, name, 0); } if (mt == NULL) { @@ -3879,7 +3896,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin continue; } else { /* RDB check mode. */ - robj *aux = rdbLoadCheckModuleValue(rdb,name); + robj *aux = rdbLoadCheckModuleValue(rdb, name, 0); decrRefCount(aux); continue; /* Read next opcode. */ } diff --git a/src/rdb.h b/src/rdb.h index a020ff62cef..c379ff007f8 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -150,7 +150,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error); void backgroundSaveDoneHandler(int exitcode, int bysignal); int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime,int dbid); ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt); -robj *rdbLoadCheckModuleValue(rio *rdb, char *modulename); +robj *rdbLoadCheckModuleValue(rio *rdb, char *modulename, int null_on_error); int rdbResolveKeyType(rio *rdb, int *type, int dbid, KeyMetaSpec *keymeta); robj *rdbLoadStringObject(rio *rdb); ssize_t rdbSaveStringObject(rio *rdb, robj *obj); diff --git a/src/redis-check-rdb.c b/src/redis-check-rdb.c index 11781dd61b7..9624bf3ff6f 100644 --- a/src/redis-check-rdb.c +++ b/src/redis-check-rdb.c @@ -254,7 +254,7 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) { uint32_t classSpec; if (rioRead(&rdb, &classSpec, 4) == 0) goto eoferr; /* Skip module value using rdbLoadCheckModuleValue */ - robj *o = rdbLoadCheckModuleValue(&rdb, "metadata"); + robj *o = rdbLoadCheckModuleValue(&rdb, "metadata", 1); if (o == NULL) goto eoferr; decrRefCount(o); } @@ -324,7 +324,7 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) { moduleTypeNameByID(name,moduleid); rdbCheckInfo("MODULE AUX for: %s", name); - robj *o = rdbLoadCheckModuleValue(&rdb,name); + robj *o = rdbLoadCheckModuleValue(&rdb, name, 0); decrRefCount(o); continue; /* Read type again. */ } else if (type == RDB_OPCODE_FUNCTION_PRE_GA) { diff --git a/src/replication.c b/src/replication.c index ab2c88f639c..c7c1e284aab 100644 --- a/src/replication.c +++ b/src/replication.c @@ -4472,7 +4472,7 @@ void replicationCacheMaster(client *c) { if (c->flags & CLIENT_MULTI) discardTransaction(c); listEmpty(c->reply); c->sentlen = 0; - c->reply_bytes = 0; + c->reply_bytes = c->reply_bytes_shared = c->reply_bytes_unshared = 0; c->bufpos = 0; resetClient(c, -1); resetClientQbufState(c); diff --git a/src/script_lua.c b/src/script_lua.c index dbb60a39cd0..f6b61952b3f 100644 --- a/src/script_lua.c +++ b/src/script_lua.c @@ -961,7 +961,7 @@ static int luaRedisGenericCommand(lua_State *lua, int raise_error) { ldbLogRedisReply(reply); if (reply != c->buf) sdsfree(reply); - c->reply_bytes = 0; + c->reply_bytes = c->reply_bytes_shared = c->reply_bytes_unshared = 0; cleanup: /* Clean up. Command code may have changed argv/argc so we use the diff --git a/src/sentinel.c b/src/sentinel.c index f6a1f75bd3e..372e4b64067 100644 --- a/src/sentinel.c +++ b/src/sentinel.c @@ -458,6 +458,25 @@ const char *preMonitorCfgName[] = { "announce-hostnames" }; +/* Returns 1 if the string contains control characters (0x00-0x1F or 0x7F), + * which must be rejected to prevent config injection via newlines/etc. */ +int sentinelStringContainsControlChars(sds s) { + for (size_t i = 0; i < sdslen(s); i++) { + unsigned char c = (unsigned char)s[i]; + if (c < 0x20 || c == 0x7F) return 1; + } + return 0; +} + +/* Append an sds value to dest, quoting it with sdscatrepr only if the value + * contains characters that need escaping (spaces, quotes, control chars, etc.). + * Simple values are appended as-is, preserving the traditional config format. */ +static sds sentinelSdscatConfigArg(sds dest, sds value) { + if (sdsneedsrepr(value)) + return sdscatrepr(dest, value, sdslen(value)); + return sdscatsds(dest, value); +} + /* This function overwrites a few normal Redis config default with Sentinel * specific defaults. */ void initSentinelConfig(void) { @@ -2048,8 +2067,13 @@ void rewriteConfigSentinelOption(struct rewriteConfigState *state) { /* sentinel monitor */ master = dictGetVal(de); master_addr = sentinelGetCurrentMasterAddress(master); + + /* Pre-compute the safely-formatted master name for config serialization. + * Only quoted if it contains characters requiring escaping. */ + sds qname = sentinelSdscatConfigArg(sdsempty(), master->name); + line = sdscatprintf(sdsempty(),"sentinel monitor %s %s %d %d", - master->name, announceSentinelAddr(master_addr), master_addr->port, + qname, announceSentinelAddr(master_addr), master_addr->port, master->quorum); rewriteConfigRewriteLine(state,"sentinel monitor",line,1); /* rewriteConfigMarkAsProcessed is handled after the loop */ @@ -2058,7 +2082,7 @@ void rewriteConfigSentinelOption(struct rewriteConfigState *state) { if (master->down_after_period != sentinel_default_down_after) { line = sdscatprintf(sdsempty(), "sentinel down-after-milliseconds %s %ld", - master->name, (long) master->down_after_period); + qname, (long) master->down_after_period); rewriteConfigRewriteLine(state,"sentinel down-after-milliseconds",line,1); /* rewriteConfigMarkAsProcessed is handled after the loop */ } @@ -2067,7 +2091,7 @@ void rewriteConfigSentinelOption(struct rewriteConfigState *state) { if (master->failover_timeout != sentinel_default_failover_timeout) { line = sdscatprintf(sdsempty(), "sentinel failover-timeout %s %ld", - master->name, (long) master->failover_timeout); + qname, (long) master->failover_timeout); rewriteConfigRewriteLine(state,"sentinel failover-timeout",line,1); /* rewriteConfigMarkAsProcessed is handled after the loop */ @@ -2077,42 +2101,38 @@ void rewriteConfigSentinelOption(struct rewriteConfigState *state) { if (master->parallel_syncs != SENTINEL_DEFAULT_PARALLEL_SYNCS) { line = sdscatprintf(sdsempty(), "sentinel parallel-syncs %s %d", - master->name, master->parallel_syncs); + qname, master->parallel_syncs); rewriteConfigRewriteLine(state,"sentinel parallel-syncs",line,1); /* rewriteConfigMarkAsProcessed is handled after the loop */ } /* sentinel notification-script */ if (master->notification_script) { - line = sdscatprintf(sdsempty(), - "sentinel notification-script %s %s", - master->name, master->notification_script); + line = sdscatprintf(sdsempty(), "sentinel notification-script %s ", qname); + line = sentinelSdscatConfigArg(line, master->notification_script); rewriteConfigRewriteLine(state,"sentinel notification-script",line,1); /* rewriteConfigMarkAsProcessed is handled after the loop */ } /* sentinel client-reconfig-script */ if (master->client_reconfig_script) { - line = sdscatprintf(sdsempty(), - "sentinel client-reconfig-script %s %s", - master->name, master->client_reconfig_script); + line = sdscatprintf(sdsempty(), "sentinel client-reconfig-script %s ", qname); + line = sentinelSdscatConfigArg(line, master->client_reconfig_script); rewriteConfigRewriteLine(state,"sentinel client-reconfig-script",line,1); /* rewriteConfigMarkAsProcessed is handled after the loop */ } /* sentinel auth-pass & auth-user */ if (master->auth_pass) { - line = sdscatprintf(sdsempty(), - "sentinel auth-pass %s %s", - master->name, master->auth_pass); + line = sdscatprintf(sdsempty(), "sentinel auth-pass %s ", qname); + line = sentinelSdscatConfigArg(line, master->auth_pass); rewriteConfigRewriteLine(state,"sentinel auth-pass",line,1); /* rewriteConfigMarkAsProcessed is handled after the loop */ } if (master->auth_user) { - line = sdscatprintf(sdsempty(), - "sentinel auth-user %s %s", - master->name, master->auth_user); + line = sdscatprintf(sdsempty(), "sentinel auth-user %s ", qname); + line = sentinelSdscatConfigArg(line, master->auth_user); rewriteConfigRewriteLine(state,"sentinel auth-user",line,1); /* rewriteConfigMarkAsProcessed is handled after the loop */ } @@ -2121,7 +2141,7 @@ void rewriteConfigSentinelOption(struct rewriteConfigState *state) { if (master->master_reboot_down_after_period != 0) { line = sdscatprintf(sdsempty(), "sentinel master-reboot-down-after-period %s %ld", - master->name, (long) master->master_reboot_down_after_period); + qname, (long) master->master_reboot_down_after_period); rewriteConfigRewriteLine(state,"sentinel master-reboot-down-after-period",line,1); /* rewriteConfigMarkAsProcessed is handled after the loop */ } @@ -2129,7 +2149,7 @@ void rewriteConfigSentinelOption(struct rewriteConfigState *state) { /* sentinel config-epoch */ line = sdscatprintf(sdsempty(), "sentinel config-epoch %s %llu", - master->name, (unsigned long long) master->config_epoch); + qname, (unsigned long long) master->config_epoch); rewriteConfigRewriteLine(state,"sentinel config-epoch",line,1); /* rewriteConfigMarkAsProcessed is handled after the loop */ @@ -2137,7 +2157,7 @@ void rewriteConfigSentinelOption(struct rewriteConfigState *state) { /* sentinel leader-epoch */ line = sdscatprintf(sdsempty(), "sentinel leader-epoch %s %llu", - master->name, (unsigned long long) master->leader_epoch); + qname, (unsigned long long) master->leader_epoch); rewriteConfigRewriteLine(state,"sentinel leader-epoch",line,1); /* rewriteConfigMarkAsProcessed is handled after the loop */ @@ -2158,7 +2178,7 @@ void rewriteConfigSentinelOption(struct rewriteConfigState *state) { slave_addr = master->addr; line = sdscatprintf(sdsempty(), "sentinel known-replica %s %s %d", - master->name, announceSentinelAddr(slave_addr), slave_addr->port); + qname, announceSentinelAddr(slave_addr), slave_addr->port); /* try to replace any known-slave option first if found */ if (rewriteConfigRewriteLine(state, "sentinel known-slave", sdsdup(line), 0) == 0) { rewriteConfigRewriteLine(state, "sentinel known-replica", line, 1); @@ -2176,7 +2196,7 @@ void rewriteConfigSentinelOption(struct rewriteConfigState *state) { if (ri->runid == NULL) continue; line = sdscatprintf(sdsempty(), "sentinel known-sentinel %s %s %d %s", - master->name, announceSentinelAddr(ri->addr), ri->addr->port, ri->runid); + qname, announceSentinelAddr(ri->addr), ri->addr->port, ri->runid); rewriteConfigRewriteLine(state,"sentinel known-sentinel",line,1); /* rewriteConfigMarkAsProcessed is handled after the loop */ } @@ -2187,13 +2207,16 @@ void rewriteConfigSentinelOption(struct rewriteConfigState *state) { while((de = dictNext(&di2)) != NULL) { sds oldname = dictGetKey(de); sds newname = dictGetVal(de); - line = sdscatprintf(sdsempty(), - "sentinel rename-command %s %s %s", - master->name, oldname, newname); + line = sdscatprintf(sdsempty(), "sentinel rename-command %s ", qname); + line = sentinelSdscatConfigArg(line, oldname); + line = sdscatlen(line, " ", 1); + line = sentinelSdscatConfigArg(line, newname); rewriteConfigRewriteLine(state,"sentinel rename-command",line,1); /* rewriteConfigMarkAsProcessed is handled after the loop */ } dictResetIterator(&di2); + + sdsfree(qname); } /* sentinel current-epoch is a global state valid for all the masters. */ @@ -2221,7 +2244,8 @@ void rewriteConfigSentinelOption(struct rewriteConfigState *state) { /* sentinel sentinel-user. */ if (sentinel.sentinel_auth_user) { - line = sdscatprintf(sdsempty(), "sentinel sentinel-user %s", sentinel.sentinel_auth_user); + line = sdsnew("sentinel sentinel-user "); + line = sentinelSdscatConfigArg(line, sentinel.sentinel_auth_user); rewriteConfigRewriteLine(state,"sentinel sentinel-user",line,1); } else { rewriteConfigMarkAsProcessed(state,"sentinel sentinel-user"); @@ -2229,10 +2253,11 @@ void rewriteConfigSentinelOption(struct rewriteConfigState *state) { /* sentinel sentinel-pass. */ if (sentinel.sentinel_auth_pass) { - line = sdscatprintf(sdsempty(), "sentinel sentinel-pass %s", sentinel.sentinel_auth_pass); + line = sdsnew("sentinel sentinel-pass "); + line = sentinelSdscatConfigArg(line, sentinel.sentinel_auth_pass); rewriteConfigRewriteLine(state,"sentinel sentinel-pass",line,1); } else { - rewriteConfigMarkAsProcessed(state,"sentinel sentinel-pass"); + rewriteConfigMarkAsProcessed(state,"sentinel sentinel-pass"); } dictResetIterator(&di); @@ -3238,6 +3263,11 @@ void sentinelConfigSetCommand(client *c) { if (!(!strcasecmp(val->ptr, "debug") || !strcasecmp(val->ptr, "verbose") || !strcasecmp(val->ptr, "notice") || !strcasecmp(val->ptr, "warning") || !strcasecmp(val->ptr, "nothing"))) goto badfmt; + } else if (!strcasecmp(option, "announce-ip")) { + if (sentinelStringContainsControlChars(val->ptr)) { + addReplyErrorFormat(c, "'%s' must not contain control characters", option); + goto exit; + } } } @@ -4045,6 +4075,11 @@ NULL return; } + if (sentinelStringContainsControlChars(c->argv[2]->ptr)) { + addReplyError(c, "Master name must not contain control characters"); + return; + } + /* If resolve-hostnames is used, actual DNS resolution may take place. * Otherwise just validate address. */ @@ -4388,6 +4423,12 @@ void sentinelSetCommand(client *c) { goto seterr; } + if (sentinelStringContainsControlChars(value)) { + addReplyError(c, + "notification-script must not contain control characters"); + goto seterr; + } + if (strlen(value) && access(value,X_OK) == -1) { addReplyError(c, "Notification script seems non existing or non executable"); @@ -4407,6 +4448,12 @@ void sentinelSetCommand(client *c) { goto seterr; } + if (sentinelStringContainsControlChars(value)) { + addReplyError(c, + "client-reconfig-script must not contain control characters"); + goto seterr; + } + if (strlen(value) && access(value,X_OK) == -1) { addReplyError(c, "Client reconfiguration script seems non existing or " @@ -4450,6 +4497,13 @@ void sentinelSetCommand(client *c) { goto badfmt; } + if (sentinelStringContainsControlChars(oldname) || + sentinelStringContainsControlChars(newname)) { + addReplyError(c, + "rename-command arguments must not contain control characters"); + goto seterr; + } + /* Remove any older renaming for this command. */ dictDelete(ri->renamed_commands,oldname); diff --git a/src/server.c b/src/server.c index 241fe69aec2..10572eaeb6b 100644 --- a/src/server.c +++ b/src/server.c @@ -1035,7 +1035,8 @@ static inline clientMemUsageBucket *getMemUsageBucket(size_t mem) { */ void updateClientMemoryUsage(client *c) { serverAssert(c->conn); - size_t mem = getClientMemoryUsage(c, NULL); + size_t mem = getClientMemoryUsage(c); + int type = getClientType(c); /* Now that we have the memory used by the client, remove the old * value from the old category, and add it back. */ @@ -1104,6 +1105,20 @@ int updateClientMemUsageAndBucket(client *c) { return 0; } + /* Include unshared reply bytes in the client's memory usage for eviction. + * Walking the reply buffer is costly, so skip the scan when its outcome + * cannot affect bucket placement: since 0 <= unshared <= shared, if both + * endpoints map to the same bucket the cached value is reused. */ + if (c->reply_bytes_shared > 0) { + size_t lower_bound = getClientMemoryUsage(c) - c->reply_bytes_unshared; + size_t upper_bound = lower_bound + c->reply_bytes_shared; + if (getMemUsageBucket(lower_bound) != getMemUsageBucket(upper_bound)) + updateClientUnsharedReplyBytes(c); + } else { + /* No shared bytes: clear any stale cached unshared. */ + c->reply_bytes_unshared = 0; + } + /* Update client memory usage. */ updateClientMemoryUsage(c); @@ -6330,7 +6345,9 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) { "mem_total_replication_buffers:%zu\r\n", server.repl_buffer_mem + server.repl_full_sync_buffer.mem_used, "mem_replica_full_sync_buffer:%zu\r\n", server.repl_full_sync_buffer.mem_used, "mem_clients_slaves:%zu\r\n", mh->clients_slaves, - "mem_clients_normal:%zu\r\n", mh->clients_normal, + "mem_clients_normal:%zu\r\n", mh->clients_normal, /* actual memory usage (includes unshared memory, excludes shared memory) */ + "mem_clients_normal_shared:%zu\r\n", mh->clients_normal_shared, /* shared memory (not solely owned by this client) */ + "mem_clients_normal_unshared:%zu\r\n", mh->clients_normal_unshared, /* unshared memory (solely owned by this client) */ "mem_cluster_slot_migration_output_buffer:%zu\r\n", mh->asm_migrate_output_buffer, "mem_cluster_slot_migration_input_buffer:%zu\r\n", mh->asm_import_input_buffer, "mem_cluster_slot_migration_input_buffer_peak:%zu\r\n", asmGetPeakSyncBufferSize(), diff --git a/src/server.h b/src/server.h index 0d4c7bd73ca..045ebd29e5b 100644 --- a/src/server.h +++ b/src/server.h @@ -1461,6 +1461,8 @@ typedef struct client { long bulklen; /* Length of bulk argument in multi bulk request. */ list *reply; /* List of reply objects to send to the client. */ unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */ + unsigned long long reply_bytes_shared; /* Bytes shared with keyspace objects in reply list. */ + unsigned long long reply_bytes_unshared; /* Cached subset of reply_bytes_shared solely owned by this client. */ list *deferred_reply_errors; /* Used for module thread safe contexts. */ size_t sentlen; /* Amount of bytes already sent in the current buffer or object being sent. */ @@ -1761,6 +1763,8 @@ struct redisMemOverhead { size_t replica_fullsync_buffer; size_t clients_slaves; size_t clients_normal; + size_t clients_normal_shared; + size_t clients_normal_unshared; size_t cluster_links; size_t aof_buffer; size_t eval_caches; @@ -3175,7 +3179,9 @@ void replaceClientCommandVector(client *c, int argc, robj **argv); void redactClientCommandArgument(client *c, int argc); size_t getClientOutputBufferMemoryUsage(client *c); size_t getNormalClientPendingReplyBytes(client *c); -size_t getClientMemoryUsage(client *c, size_t *output_buffer_mem_usage); +size_t getClientMemoryUsage(client *c); +void updateClientUnsharedReplyBytes(client *c); +void getClientsSharedMemoryUsage(size_t *shared_mem, size_t *unshared_mem); int freeClientsInAsyncFreeQueue(void); int closeClientOnOutputBufferLimitReached(client *c, int async); int getClientType(client *c); @@ -3993,6 +3999,7 @@ int zunionInterDiffStoreGetKeys(struct redisCommand *cmd,robj **argv, int argc, int evalGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); int functionGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); int sortGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); +int pfmergeGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); int sortROGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); int migrateGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); int georadiusGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); diff --git a/src/t_hash.c b/src/t_hash.c index df3ac615be9..270cb3d1ffa 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -3589,15 +3589,16 @@ static int parseHashCommandArgs(client *c, HashCommandArgs *args, &numFields, "Parameter `numFields` should be greater than 0") != C_OK) return C_ERR; - args->fieldCount = (int)numFields; args->firstFieldPos = i + 2; /* Check bounds - we must have exactly the right number of fields */ - if (args->firstFieldPos + args->fieldCount > c->argc) { + if (numFields > c->argc - args->firstFieldPos) { addReplyError(c, "wrong number of arguments"); return C_ERR; } + args->fieldCount = (int)numFields; + /* Skip over the field arguments */ i = args->firstFieldPos + args->fieldCount - 1; continue; diff --git a/src/t_stream.c b/src/t_stream.c index cb20720fc52..74a86f59404 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1908,11 +1908,14 @@ void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupna decrRefCount(argv[6]); } -/* We need this when we want to propagate creation of consumer that was created - * by XREADGROUP with the NOACK option. In that case, the only way to create - * the consumer at the replica is by using XGROUP CREATECONSUMER (see issue #7140) +/* Propagate creation of a consumer that was implicitly created by XREADGROUP. + * Called only when no XCLAIM commands were propagated for this consumer, + * since XCLAIM implicitly creates the consumer on the replica. This covers + * two cases: + * (1) NOACK, where the PEL/XCLAIM path is skipped entirely. + * (2) no messages were available to deliver (see #7140). * - * XGROUP CREATECONSUMER + * XGROUP CREATECONSUMER */ void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds consumername) { robj *argv[5]; @@ -2873,6 +2876,7 @@ void xreadCommand(client *c) { int serve_claimed = 0; int serve_synchronously = 0; int serve_history = 0; /* True for XREADGROUP with ID != ">". */ + int consumer_created = 0; streamConsumer *consumer = NULL; /* Unused if XREAD */ streamPropInfo spi = {c->argv[streams_arg+i],groupname}; /* Unused if XREAD */ @@ -2932,10 +2936,7 @@ void xreadCommand(client *c) { c->db->id,SCC_DEFAULT); if (server.memory_tracking_enabled) updateSlotAllocSize(c->db,getKeySlot(c->argv[streams_arg+i]->ptr),o,old_alloc,kvobjAllocSize(o)); - if (noack) - streamPropagateConsumerCreation(c,spi.keyname, - spi.groupname, - consumer->name); + consumer_created = 1; } consumer->seen_time = commandTimeSnapshot(); keyModified(c,c->db,c->argv[streams_arg+i],o,0); /* only update LRM */ @@ -2961,6 +2962,7 @@ void xreadCommand(client *c) { flags |= STREAM_RWR_CLAIMED; } + unsigned long propCount = 0; if (serve_synchronously) { arraylen++; if (arraylen == 1) arraylen_ptr = addReplyDeferredLen(c); @@ -2975,7 +2977,6 @@ void xreadCommand(client *c) { if (c->resp == 2) addReplyArrayLen(c,2); addReplyBulk(c,c->argv[streams_arg+i]); - unsigned long propCount = 0; if (noack) flags |= STREAM_RWR_NOACK; if (serve_history) flags |= STREAM_RWR_HISTORY; old_alloc = kvobjAllocSize(o); @@ -2989,6 +2990,14 @@ void xreadCommand(client *c) { keyModified(c,c->db,c->argv[streams_arg+i],o,0); /* only update LRM */ } } + + /* Propagate consumer creation only when no XCLAIM was generated, + * since XCLAIM implicitly creates the consumer on the replica. + * With NOACK the PEL/XCLAIM path is skipped entirely, so we + * always need explicit propagation regardless of propCount. */ + if (consumer_created && (noack || propCount == 0)) { + streamPropagateConsumerCreation(c,spi.keyname, spi.groupname, consumer->name); + } } /* We replied synchronously! Set the top array len and return to caller. */ @@ -4802,7 +4811,7 @@ void xtrimCommand(client *c) { /* Helper function for xinfoCommand. * Handles the variants of XINFO STREAM */ -void xinfoReplyWithStreamInfo(client *c, kvobj *kv) { +void xinfoReplyWithStreamInfo(client *c, robj *key, kvobj *kv) { stream *s = kv->ptr; int full = 1; long long count = 10; /* Default COUNT is 10 so we don't block the server */ @@ -5032,7 +5041,7 @@ void xinfoReplyWithStreamInfo(client *c, kvobj *kv) { } } if (server.memory_tracking_enabled) - updateSlotAllocSize(c->db,getKeySlot(c->argv[1]->ptr),kv,old_alloc,kvobjAllocSize(kv)); + updateSlotAllocSize(c->db,getKeySlot(key->ptr),kv,old_alloc,kvobjAllocSize(kv)); } /* XINFO CONSUMERS @@ -5136,7 +5145,7 @@ NULL raxStop(&ri); } else if (!strcasecmp(opt,"STREAM")) { /* XINFO STREAM [FULL [COUNT ]]. */ - xinfoReplyWithStreamInfo(c,kv); + xinfoReplyWithStreamInfo(c,key,kv); } else { addReplySubcommandSyntaxError(c); } diff --git a/src/tracking.c b/src/tracking.c index 256b8b6b1f6..0f3ced61ad2 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -125,7 +125,7 @@ int checkPrefixCollisionsOrReply(client *c, robj **prefixes, size_t numprefix) { "Prefixes for a single client must not overlap.", (unsigned char *)prefixes[i]->ptr, (unsigned char *)prefixes[j]->ptr); - return i; + return 0; } } } diff --git a/tests/integration/corrupt-dump.tcl b/tests/integration/corrupt-dump.tcl index 13c97a996eb..3f3768dda8a 100644 --- a/tests/integration/corrupt-dump.tcl +++ b/tests/integration/corrupt-dump.tcl @@ -1048,6 +1048,15 @@ test {corrupt payload: stream - duplicated consumer PEL entry} { } } +test {corrupt payload: fuzzer findings - decrRefCount on NULL robj on corrupt KEY_META payload} { + start_server [list overrides [list loglevel verbose use-exit-on-panic yes crash-memcheck-enabled no] ] { + r config set sanitize-dump-payload no + r debug set-skip-checksum-validation 1 + catch {r restore key 0 "\xF3\x02\x01\x0D\x00\x54\x23\x3F\xC9\x82\x32\x05\x8D" replace} err + assert_match "*Bad data format*" $err + r ping + } +} } ;# tags diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl index 0309099428b..709c8fa2134 100644 --- a/tests/integration/replication.tcl +++ b/tests/integration/replication.tcl @@ -880,27 +880,44 @@ proc compute_cpu_usage {start end} { return [ list $pucpu $pscpu ] } - +if {!$::valgrind} { # test diskless rdb pipe with multiple replicas, which may drop half way -start_server {tags {"repl external:skip tsan:skip"} overrides {save ""}} { - set master [srv 0 client] - $master config set repl-diskless-sync yes - $master config set repl-diskless-sync-delay 5 - $master config set repl-diskless-sync-max-replicas 2 - set master_host [srv 0 host] - set master_port [srv 0 port] - set master_pid [srv 0 pid] - # put enough data in the db that the rdb file will be bigger than the socket buffers - # and since we'll have key-load-delay of 100, 20000 keys will take at least 2 seconds - # we also need the replica to process requests during transfer (which it does only once in 2mb) - $master debug populate 20000 test 10000 - $master config set rdbcompression no - $master config set repl-rdb-channel no - # If running on Linux, we also measure utime/stime to detect possible I/O handling issues - set os [catch {exec uname}] - set measure_time [expr {$os == "Linux"} ? 1 : 0] - foreach all_drop {no slow fast all timeout} { +foreach all_drop {no slow fast all timeout} { + start_server {tags {"repl external:skip tsan:skip"} overrides {save ""}} { + set master [srv 0 client] + $master config set repl-diskless-sync yes + $master config set repl-diskless-sync-delay 5 + $master config set repl-diskless-sync-max-replicas 2 + set master_host [srv 0 host] + set master_port [srv 0 port] + set master_pid [srv 0 pid] + if {$all_drop == "timeout"} { + # Use a larger RDB (~100 MB) so it cannot fit into the kernel TCP + # send buffer (autotuning can absorb tens of MB on some hosts). We + # need the primary to hit the blocked writer path + # (repl_last_partial_write != 0) while the slow replica is paused, + # so the cron triggers the "(full sync)" timeout path instead of + # the replica being moved to ONLINE prematurely and timing out via + # the "(streaming sync)" path. + $master debug populate 10000 test 10000 + } else { + # Put enough data in the db that the RDB is comfortably larger than the + # pipe and socket buffers so the primary can hit the blocked writer path, + # but keep it small enough that slow TLS CI runners don't spend minutes + # draining an oversized transfer (~40 MB uncompressed). + $master debug populate 4000 test 10000 + } + $master config set rdbcompression no + $master config set repl-rdb-channel no + # If running on Linux, we also measure utime/stime to detect possible I/O handling issues + set os [catch {exec uname}] + set measure_time [expr {$os == "Linux"} ? 1 : 0] + test "diskless $all_drop replicas drop during rdb pipe" { + # Reset config that the timeout subcase may change, so a failing + # subcase does not leave the next one with an aggressive timeout. + $master config set repl-timeout 60 + $master config set rdb-key-save-delay 0 set replicas {} set replicas_alive {} # start one replica that will read the rdb fast, and one that will be slow @@ -917,7 +934,24 @@ start_server {tags {"repl external:skip tsan:skip"} overrides {save ""}} { set loglines [count_log_lines -2] [lindex $replicas 0] config set repl-diskless-load swapdb [lindex $replicas 1] config set repl-diskless-load swapdb - [lindex $replicas 0] config set key-load-delay 100 ;# 20k keys and 100 microseconds sleep means at least 2 seconds + if {$all_drop == "all"} { + # Keep the RDB child generating data long enough for + # both replicas to be killed before the pipe reaches + # EOF, so this subcase still covers the last-replica + # drop path instead of racing with normal completion. + $master config set rdb-key-save-delay 1000 + } + # For non-timeout subcases, use key-load-delay to keep + # replica 0 as a steady slow reader for the entire RDB + # transfer. This keeps the expected diskless pipe code + # paths covered without accepting alternate log outcomes. + if {$all_drop != "timeout"} { + # 4k keys with 500 microseconds each keeps replica 0 + # slow for about 2 seconds, which is long enough to + # fill the pipe without turning the transfer into a + # multi-minute TLS run. + [lindex $replicas 0] config set key-load-delay 500 + } [lindex $replicas 0] replicaof $master_host $master_port [lindex $replicas 1] replicaof $master_host $master_port @@ -931,9 +965,16 @@ start_server {tags {"repl external:skip tsan:skip"} overrides {save ""}} { set start_time [clock seconds] } - # wait a while so that the pipe socket writer will be - # blocked on write (since replica 0 is slow to read from the socket) - after 500 + if {$all_drop != "timeout"} { + # key-load-delay is already throttling the slow + # replica; just wait for the pipe to fill. + after 500 + } else { + # For the timeout subcase, stop the slow reader so it + # reaches repl-timeout during full sync. + pause_process [srv -1 pid] + after 500 + } # add some command to be present in the command stream after the rdb. $master incr $all_drop @@ -948,14 +989,17 @@ start_server {tags {"repl external:skip tsan:skip"} overrides {save ""}} { set replicas_alive [lreplace $replicas_alive 0 0] } if {$all_drop == "timeout"} { + # Let one replica hit repl-timeout while the slow reader + # is paused, then restore a generous timeout so the + # remaining replica can finish the streamed RDB. $master config set repl-timeout 2 - # we want the slow replica to hang on a key for very long so it'll reach repl-timeout - pause_process [srv -1 pid] - after 2000 + wait_for_log_messages -2 {"*Disconnecting timedout replica (full sync)*"} $loglines 200 100 + $master config set repl-timeout 60 } - # wait for rdb child to exit - wait_for_condition 500 100 { + # Use a single generous budget for all subcases; successful + # runs still exit early once the child is done. + wait_for_condition 5000 100 { [s -2 rdb_bgsave_in_progress] == 0 } else { fail "rdb child didn't terminate" @@ -972,7 +1016,6 @@ start_server {tags {"repl external:skip tsan:skip"} overrides {save ""}} { wait_for_log_messages -2 {"*Diskless rdb transfer, done reading from pipe, 1 replicas still up*"} $loglines 1 1 } if {$all_drop == "timeout"} { - wait_for_log_messages -2 {"*Disconnecting timedout replica (full sync)*"} $loglines 1 1 wait_for_log_messages -2 {"*Diskless rdb transfer, done reading from pipe, 1 replicas still up*"} $loglines 1 1 # master disconnected the slow replica, remove from array set replicas_alive [lreplace $replicas_alive 0 0] @@ -996,18 +1039,23 @@ start_server {tags {"repl external:skip tsan:skip"} overrides {save ""}} { assert {$master_utime < 70} assert {$master_stime < 70} } - if {!$::no_latency && ($all_drop == "none" || $all_drop == "fast")} { + if {!$::no_latency && ($all_drop == "no" || $all_drop == "fast")} { assert {$master_utime < 15} assert {$master_stime < 15} } } + # In the "no" case both replicas stay alive through the + # full streamed RDB, so on slow TLS runners the final + # ONLINE transition can lag behind child exit. + set replica_online_wait_tries [expr {$all_drop == "no" ? 600 : 150}] + # verify the data integrity foreach replica $replicas_alive { # Wait that replicas acknowledge they are online so # we are sure that DBSIZE and DEBUG DIGEST will not # fail because of timing issues. - wait_for_condition 150 100 { + wait_for_condition $replica_online_wait_tries 100 { [lindex [$replica role] 3] eq {connected} } else { fail "replicas still not connected after some time" @@ -1032,6 +1080,7 @@ start_server {tags {"repl external:skip tsan:skip"} overrides {save ""}} { } } } +} ;# end of valgrind test "diskless replication child being killed is collected" { # when diskless master is waiting for the replica to become writable diff --git a/tests/sentinel/tests/15-config-set-config-get.tcl b/tests/sentinel/tests/15-config-set-config-get.tcl index f9831f8e866..16b30200060 100644 --- a/tests/sentinel/tests/15-config-set-config-get.tcl +++ b/tests/sentinel/tests/15-config-set-config-get.tcl @@ -6,17 +6,22 @@ test "SENTINEL CONFIG SET and SENTINEL CONFIG GET handles multiple variables" { } assert_match {*yes*1234*} [S 1 SENTINEL CONFIG GET resolve-hostnames announce-port] assert_match {announce-port 1234} [S 1 SENTINEL CONFIG GET announce-port] + foreach_sentinel_id id { + S $id SENTINEL CONFIG SET resolve-hostnames no announce-port 0 + } } test "SENTINEL CONFIG GET for duplicate and unknown variables" { assert_equal {OK} [S 1 SENTINEL CONFIG SET resolve-hostnames yes announce-port 1234] assert_match {resolve-hostnames yes} [S 1 SENTINEL CONFIG GET resolve-hostnames resolve-hostnames does-not-exist] + S 1 SENTINEL CONFIG SET resolve-hostnames no announce-port 0 } test "SENTINEL CONFIG GET for patterns" { assert_equal {OK} [S 1 SENTINEL CONFIG SET loglevel notice announce-port 1234 announce-hostnames yes ] assert_match {loglevel notice} [S 1 SENTINEL CONFIG GET log* *level loglevel] assert_match {announce-hostnames yes announce-ip*announce-port 1234} [S 1 SENTINEL CONFIG GET announce*] + S 1 SENTINEL CONFIG SET announce-port 0 announce-hostnames no } test "SENTINEL CONFIG SET duplicate variables" { @@ -36,6 +41,9 @@ test "SENTINEL CONFIG SET, one option does not exist" { } # The announce-port should not be set to 1234 as it was called with a wrong argument assert_match {*111*} [S 1 SENTINEL CONFIG GET announce-port] + foreach_sentinel_id id { + S $id SENTINEL CONFIG SET announce-port 0 + } } test "SENTINEL CONFIG SET, one option with wrong value" { diff --git a/tests/sentinel/tests/16-config-injection.tcl b/tests/sentinel/tests/16-config-injection.tcl new file mode 100644 index 00000000000..6aff07de989 --- /dev/null +++ b/tests/sentinel/tests/16-config-injection.tcl @@ -0,0 +1,312 @@ +# Test that control characters are rejected where appropriate, and that +# string values are safely quoted when persisted to disk. +# +# Config injection is prevented by sentinelSdscatConfigArg(), which escapes +# values containing special characters at persistence time. Fields like +# notification-script, rename-command, master name, and announce-ip also +# reject control characters at input time as an additional safeguard. + +source "../tests/includes/init-tests.tcl" + +# Helper: read the sentinel config file for a given sentinel id. +proc read_sentinel_config {id} { + set configfile [file join "sentinel_${id}" "sentinel.conf"] + set fp [open $configfile r] + set content [read $fp] + close $fp + return $content +} + +# Helper: count how many lines in the config match a pattern. +proc count_config_lines {content pattern} { + set count 0 + foreach line [split $content "\n"] { + if {[string match $pattern $line]} { + incr count + } + } + return $count +} + +# Helper: restart a (already stopped) sentinel and wait until it responds to PING. +proc start_sentinel_and_wait {sid} { + restart_instance sentinel $sid + wait_for_condition 200 50 { + [catch {S $sid PING}] == 0 + } else { + fail "Sentinel $sid did not restart in time" + } +} + +# Helper: kill sentinel, restart it, and wait until it responds to PING. +proc restart_sentinel_and_wait {sid} { + kill_instance sentinel $sid + start_sentinel_and_wait $sid +} + +# Helper: assert that the sentinel config file contains the expected substring. +proc assert_config_contains {sid expected} { + set content [read_sentinel_config $sid] + assert {[string first $expected $content] >= 0} +} + +# Helper: append lines to a sentinel's config file (sentinel must be stopped). +proc append_to_sentinel_config {sid lines} { + set configfile [file join "sentinel_${sid}" "sentinel.conf"] + set fp [open $configfile a] + foreach line $lines { + puts $fp $line + } + close $fp +} + +# Helper: create an executable script with spaces in its path. +# Returns the full path. Caller should "file delete -force" the directory. +proc create_script_with_spaces {sid} { + set script_dir [file join [pwd] "sentinel_${sid}" "script dir"] + file mkdir $script_dir + set script_path [file join $script_dir "my script.sh"] + set fp [open $script_path w] + puts $fp "#!/bin/sh" + close $fp + file attributes $script_path -permissions 0755 + return $script_path +} + +# -------------------------------------------------------------------------- +# Section 1: Control character rejection in SENTINEL SET +# -------------------------------------------------------------------------- + +test "SENTINEL SET notification-script rejects control characters" { + assert_error "*must not contain control characters*" { + S 0 SENTINEL SET mymaster notification-script "/tmp/ok\n/tmp/evil.sh" + } +} + +test "SENTINEL SET client-reconfig-script rejects control characters" { + assert_error "*must not contain control characters*" { + S 0 SENTINEL SET mymaster client-reconfig-script "/tmp/ok\n/tmp/evil.sh" + } +} + +test "SENTINEL SET rename-command rejects control characters" { + assert_error "*must not contain control characters*" { + S 0 SENTINEL SET mymaster rename-command "CONFIG\nEVIL" "NEWCONFIG" + } + assert_error "*must not contain control characters*" { + S 0 SENTINEL SET mymaster rename-command "CONFIG" "NEW\nCONFIG" + } +} + +# -------------------------------------------------------------------------- +# Section 2: Control character rejection in SENTINEL MONITOR +# -------------------------------------------------------------------------- + +test "SENTINEL MONITOR rejects master name with control characters" { + set port [get_instance_attrib redis 0 port] + assert_error "*must not contain control characters*" { + S 0 SENTINEL MONITOR "bad\nmaster" 127.0.0.1 $port 2 + } + assert_error "*must not contain control characters*" { + S 0 SENTINEL MONITOR "bad\rmaster" 127.0.0.1 $port 2 + } +} + +# -------------------------------------------------------------------------- +# Section 3: Control character rejection in SENTINEL CONFIG SET +# -------------------------------------------------------------------------- + +test "SENTINEL CONFIG SET announce-ip rejects control characters" { + catch {S 0 SENTINEL CONFIG SET announce-ip "1.2.3.4\nevil-directive"} e + assert_match "*must not contain control characters*" $e +} + +# -------------------------------------------------------------------------- +# Section 4: Config injection attempt does not pollute config file +# -------------------------------------------------------------------------- + +test "Newline injection in auth-pass does not pollute config file" { + # Auth-pass accepts control characters, but sentinelSdscatConfigArg + # escapes them at persistence time, preventing config injection. + S 0 SENTINEL SET mymaster auth-pass "x\nsentinel notification-script mymaster /tmp/evil.sh" + S 0 SENTINEL FLUSHCONFIG + set content [read_sentinel_config 0] + assert {[count_config_lines $content "sentinel notification-script mymaster /tmp/evil.sh"] == 0} + assert_config_contains 0 {sentinel auth-pass mymaster "x\nsentinel notification-script mymaster /tmp/evil.sh"} + S 0 SENTINEL SET mymaster auth-pass "" +} + +test "Newline injection in auth-user does not pollute config file" { + S 0 SENTINEL SET mymaster auth-user "x\nsentinel notification-script mymaster /tmp/evil.sh" + S 0 SENTINEL FLUSHCONFIG + set content [read_sentinel_config 0] + assert {[count_config_lines $content "sentinel notification-script mymaster /tmp/evil.sh"] == 0} + assert_config_contains 0 {sentinel auth-user mymaster "x\nsentinel notification-script mymaster /tmp/evil.sh"} + S 0 SENTINEL SET mymaster auth-user "" +} + +test "Newline injection in sentinel-pass does not pollute config file" { + S 0 SENTINEL CONFIG SET sentinel-pass "x\nsentinel notification-script mymaster /tmp/evil.sh" + S 0 SENTINEL FLUSHCONFIG + set content [read_sentinel_config 0] + assert {[count_config_lines $content "sentinel notification-script mymaster /tmp/evil.sh"] == 0} + assert_config_contains 0 {sentinel sentinel-pass "x\nsentinel notification-script mymaster /tmp/evil.sh"} + S 0 SENTINEL CONFIG SET sentinel-pass "" +} + +test "Newline injection in sentinel-user does not pollute config file" { + S 0 SENTINEL CONFIG SET sentinel-user "x\nsentinel notification-script mymaster /tmp/evil.sh" + S 0 SENTINEL FLUSHCONFIG + set content [read_sentinel_config 0] + assert {[count_config_lines $content "sentinel notification-script mymaster /tmp/evil.sh"] == 0} + assert_config_contains 0 {sentinel sentinel-user "x\nsentinel notification-script mymaster /tmp/evil.sh"} + S 0 SENTINEL CONFIG SET sentinel-user "" +} + +# -------------------------------------------------------------------------- +# Section 5: Values with special characters survive config round-trip +# -------------------------------------------------------------------------- + +test "auth-pass with special characters persists correctly through restart" { + S 0 SENTINEL SET mymaster auth-pass {my "comp#$&^`'!,lex pass} + set expected {sentinel auth-pass mymaster "my \"comp#$&^`'!,lex pass"} + S 0 SENTINEL FLUSHCONFIG + assert_config_contains 0 $expected + restart_sentinel_and_wait 0 + S 0 SENTINEL FLUSHCONFIG + assert_config_contains 0 $expected + S 0 SENTINEL SET mymaster auth-pass "" +} + +test "auth-user with spaces persists correctly through restart" { + S 0 SENTINEL SET mymaster auth-user {user with spaces} + set expected {sentinel auth-user mymaster "user with spaces"} + S 0 SENTINEL FLUSHCONFIG + assert_config_contains 0 $expected + restart_sentinel_and_wait 0 + S 0 SENTINEL FLUSHCONFIG + assert_config_contains 0 $expected + S 0 SENTINEL SET mymaster auth-user "" +} + +test "notification-script with spaces persists correctly through restart" { + set script_path [create_script_with_spaces 0] + S 0 SENTINEL SET mymaster notification-script $script_path + S 0 SENTINEL FLUSHCONFIG + set content [read_sentinel_config 0] + # The path must be quoted since it contains spaces. + assert {[string first "notification-script" $content] >= 0} + restart_sentinel_and_wait 0 + set info [S 0 SENTINEL MASTER mymaster] + set idx [lsearch $info "notification-script"] + assert {$idx >= 0} + assert_equal [lindex $info [expr {$idx+1}]] $script_path + S 0 SENTINEL SET mymaster notification-script "" + file delete -force [file dirname $script_path] +} + +test "client-reconfig-script with spaces persists correctly through restart" { + set script_path [create_script_with_spaces 0] + S 0 SENTINEL SET mymaster client-reconfig-script $script_path + S 0 SENTINEL FLUSHCONFIG + set content [read_sentinel_config 0] + # The path must be quoted since it contains spaces. + assert {[string first "client-reconfig-script" $content] >= 0} + restart_sentinel_and_wait 0 + set info [S 0 SENTINEL MASTER mymaster] + set idx [lsearch $info "client-reconfig-script"] + assert {$idx >= 0} + assert_equal [lindex $info [expr {$idx+1}]] $script_path + S 0 SENTINEL SET mymaster client-reconfig-script "" + file delete -force [file dirname $script_path] +} + +test "rename-command persists unquoted through restart" { + S 0 SENTINEL SET mymaster rename-command CONFIG CONF_RENAMED + set expected {sentinel rename-command mymaster CONFIG CONF_RENAMED} + S 0 SENTINEL FLUSHCONFIG + assert_config_contains 0 $expected + restart_sentinel_and_wait 0 + S 0 SENTINEL FLUSHCONFIG + assert_config_contains 0 $expected + S 0 SENTINEL SET mymaster rename-command CONFIG CONFIG +} + +# -------------------------------------------------------------------------- +# Section 6: Backward compatibility -- old unquoted config format still loads +# -------------------------------------------------------------------------- + +test "Old unquoted config format for auth-pass and auth-user loads correctly" { + kill_instance sentinel 0 + append_to_sentinel_config 0 { + "sentinel auth-pass mymaster oldformatpass" + "sentinel auth-user mymaster oldformatuser" + } + start_sentinel_and_wait 0 + S 0 SENTINEL FLUSHCONFIG + assert_config_contains 0 "sentinel auth-pass mymaster oldformatpass" + assert_config_contains 0 "sentinel auth-user mymaster oldformatuser" + S 0 SENTINEL SET mymaster auth-pass "" + S 0 SENTINEL SET mymaster auth-user "" +} + +test "Old unquoted config format for rename-command loads correctly" { + kill_instance sentinel 0 + append_to_sentinel_config 0 { + "sentinel rename-command mymaster CONFIG NEWCONFIGNAME" + } + start_sentinel_and_wait 0 + S 0 SENTINEL FLUSHCONFIG + assert_config_contains 0 "sentinel rename-command mymaster CONFIG NEWCONFIGNAME" + S 0 SENTINEL SET mymaster rename-command CONFIG CONFIG +} + +test "Old unquoted config format for sentinel-pass loads correctly" { + kill_instance sentinel 0 + append_to_sentinel_config 0 { + "sentinel sentinel-pass oldsentinelpass" + } + start_sentinel_and_wait 0 + set result [S 0 SENTINEL CONFIG GET sentinel-pass] + assert_equal [lindex $result 1] "oldsentinelpass" + S 0 SENTINEL CONFIG SET sentinel-pass "" +} + +test "Old unquoted config format for sentinel-user loads correctly" { + kill_instance sentinel 0 + append_to_sentinel_config 0 { + "sentinel sentinel-user oldsentineluser" + } + start_sentinel_and_wait 0 + set result [S 0 SENTINEL CONFIG GET sentinel-user] + assert_equal [lindex $result 1] "oldsentineluser" + S 0 SENTINEL CONFIG SET sentinel-user "" +} + +# -------------------------------------------------------------------------- +# Section 7: Values with special characters survive config round-trip +# -------------------------------------------------------------------------- + +test "sentinel-pass with special characters persists correctly through restart" { + set test_pass {sentinel pass word} + S 0 SENTINEL CONFIG SET sentinel-pass $test_pass + set expected {sentinel sentinel-pass "sentinel pass word"} + S 0 SENTINEL FLUSHCONFIG + assert_config_contains 0 $expected + restart_sentinel_and_wait 0 + set result [S 0 SENTINEL CONFIG GET sentinel-pass] + assert_equal [lindex $result 1] $test_pass + S 0 SENTINEL CONFIG SET sentinel-pass "" +} + +test "sentinel-user with special characters persists correctly through restart" { + set test_user {sentinel user name} + S 0 SENTINEL CONFIG SET sentinel-user $test_user + set expected {sentinel sentinel-user "sentinel user name"} + S 0 SENTINEL FLUSHCONFIG + assert_config_contains 0 $expected + restart_sentinel_and_wait 0 + set result [S 0 SENTINEL CONFIG GET sentinel-user] + assert_equal [lindex $result 1] $test_user + S 0 SENTINEL CONFIG SET sentinel-user "" +} diff --git a/tests/unit/client-eviction.tcl b/tests/unit/client-eviction.tcl index 7f5d6691ad5..afe32e4f960 100644 --- a/tests/unit/client-eviction.tcl +++ b/tests/unit/client-eviction.tcl @@ -52,7 +52,6 @@ proc kb {v} { start_server {} { set maxmemory_clients 3000000 r config set maxmemory-clients $maxmemory_clients - r debug reply-copy-avoidance 0 ;# Disable copy avoidance because it affects memory usage test "client evicted due to large argv" { r flushdb @@ -328,7 +327,6 @@ start_server {} { set obuf_limit [mb 3] r config set maxmemory-clients $maxmemory_clients r config set client-output-buffer-limit "normal $obuf_limit 0 0" - r debug reply-copy-avoidance 0 ;# Disable copy avoidance because it affects memory usage test "avoid client eviction when client is freed by output buffer limit" { r flushdb @@ -391,7 +389,6 @@ start_server {} { } start_server {} { - r debug reply-copy-avoidance 0 ;# Disable copy avoidance because it affects memory usage test "decrease maxmemory-clients causes client eviction" { set maxmemory_clients [mb 4] @@ -432,8 +429,6 @@ start_server {} { } start_server {} { - r debug reply-copy-avoidance 0 ;# Disable copy avoidance because it affects memory usage - test "evict clients only until below limit" { set client_count 10 set client_mem [mb 1] @@ -501,8 +496,6 @@ start_server {} { } start_server {} { - r debug reply-copy-avoidance 0 ;# Disable copy avoidance because it affects memory usage - test "evict clients in right order (large to small)" { # Note that each size step needs to be at least x2 larger than previous step # because of how the client-eviction size bucketing works @@ -571,15 +564,13 @@ start_server {} { } start_server {} { - r debug reply-copy-avoidance 0 ;# Disable copy avoidance because it affects memory usage - foreach type {"client no-evict" "maxmemory-clients disabled"} { r flushall r client no-evict on r config set maxmemory-clients 0 test "client total memory grows during $type" { - r setrange k [mb 1] v + r setrange k [kb 10] v ;# Keep value <= 16KB to avoid copy-avoidance, which shares memory and slows tot-mem growth. set rr [redis_client] $rr client setname test_client if {$type eq "client no-evict"} { @@ -591,8 +582,9 @@ start_server {} { # Fill output buffer in loop without reading it and make sure # the tot-mem of client has increased (OS buffers didn't swallow it) # and eviction not occurring. + set mget_args [lrepeat 100 k] ;# Use mget with 100 keys so each reply adds ~1MB to tot-mem, reaching 10MB faster. while {true} { - $rr get k + $rr mget {*}$mget_args $rr flush after 10 if {[client_field test_client tot-mem] > [mb 10]} { diff --git a/tests/unit/cluster/sharded-pubsub.tcl b/tests/unit/cluster/sharded-pubsub.tcl index 57b550ab727..5f78b7f0fd2 100644 --- a/tests/unit/cluster/sharded-pubsub.tcl +++ b/tests/unit/cluster/sharded-pubsub.tcl @@ -64,4 +64,31 @@ start_cluster 1 1 {tags {external:skip cluster}} { catch {[$replica EXEC]} err assert_match {EXECABORT*} $err } + + # Regression: shard channel slot must not follow getKeySlot() current_client + # cache when CLIENT KILL runs inside another client's EXEC (pubsubUnsubscribeChannel). + test {Shard pubsub: CLIENT KILL subscriber inside MULTI/EXEC (cross-slot)} { + # SET fixes the transaction client's slot to keyk's slot; the subscriber must + # use a shard channel in a different slot so a wrong-slot lookup would fail. + set keyk "{06S}k" + set channel "{Qi}ch" + assert {[R 0 cluster keyslot $channel] != [R 0 cluster keyslot $keyk]} + + set rd_sub [redis_deferring_client] + $rd_sub client id + set cid [$rd_sub read] + $rd_sub ssubscribe $channel + $rd_sub read + + $primary multi + $primary set $keyk v + $primary client kill id $cid + set got [$primary exec] + + assert_equal {OK 1} $got + assert_equal PONG [$primary ping] + + catch {$rd_sub read} + $rd_sub close + } } diff --git a/tests/unit/info.tcl b/tests/unit/info.tcl index ee6d6d19adf..d0246667111 100644 --- a/tests/unit/info.tcl +++ b/tests/unit/info.tcl @@ -393,8 +393,6 @@ start_server {tags {"info" "external:skip"}} { } test {stats: client input and output buffer limit disconnections} { - r debug reply-copy-avoidance 0 ;# Disable copy avoidance because it affects memory usage - r config resetstat set info [r info stats] assert_equal [getInfoProperty $info client_query_buffer_limit_disconnections] {0} diff --git a/tests/unit/introspection-2.tcl b/tests/unit/introspection-2.tcl index 1e98fdb6a02..32932ef1a86 100644 --- a/tests/unit/introspection-2.tcl +++ b/tests/unit/introspection-2.tcl @@ -189,6 +189,17 @@ start_server {tags {"introspection"}} { assert_equal {key1 key2} [r command getkeys lcs key1 key2] } + test {COMMAND GETKEYS PFMERGE with and without source keys} { + # dest + sources: both key specs yield keys + assert_equal {dest src1 src2} [r command getkeys PFMERGE dest src1 src2] + + # dest only, no source keys: spec[1] yields empty range (last < first). + # Without pfmergeGetKeys this returned "Invalid arguments" because + # getKeysUsingKeySpecs treated the empty range as invalid_spec, + # discarding the dest key found by spec[0]. + assert_equal {dest} [r command getkeys PFMERGE dest] + } + test {COMMAND GETKEYS MORE THAN 256 KEYS} { set all_keys [list] set numkeys 260 diff --git a/tests/unit/introspection.tcl b/tests/unit/introspection.tcl index 47a5e211439..ac749111adb 100644 --- a/tests/unit/introspection.tcl +++ b/tests/unit/introspection.tcl @@ -21,9 +21,9 @@ start_server {tags {"introspection"}} { test {CLIENT LIST} { set client_list [r client list] if {[lindex [r config get io-threads] 1] == 1} { - assert_match {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=26 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|list user=* redir=-1 resp=* lib-name=* lib-ver=* io-thread=* tot-net-in=* tot-net-out=* tot-cmds=*} $client_list + assert_match {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=26 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 omem-shared=0 omem-unshared=0 tot-mem=* events=r cmd=client|list user=* redir=-1 resp=* lib-name=* lib-ver=* io-thread=* tot-net-in=* tot-net-out=* tot-cmds=*} $client_list } else { - assert_match {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=0 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|list user=* redir=-1 resp=* lib-name=* lib-ver=* io-thread=* tot-net-in=* tot-net-out=* tot-cmds=*} $client_list + assert_match {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=0 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 omem-shared=0 omem-unshared=0 tot-mem=* events=r cmd=client|list user=* redir=-1 resp=* lib-name=* lib-ver=* io-thread=* tot-net-in=* tot-net-out=* tot-cmds=*} $client_list } } @@ -36,9 +36,9 @@ start_server {tags {"introspection"}} { test {CLIENT INFO} { set client [r client info] if {[lindex [r config get io-threads] 1] == 1} { - assert_match {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=26 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|info user=* redir=-1 resp=* lib-name=* lib-ver=* io-thread=* tot-net-in=* tot-net-out=* tot-cmds=*} $client + assert_match {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=26 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 omem-shared=0 omem-unshared=0 tot-mem=* events=r cmd=client|info user=* redir=-1 resp=* lib-name=* lib-ver=* io-thread=* tot-net-in=* tot-net-out=* tot-cmds=*} $client } else { - assert_match {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=0 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|info user=* redir=-1 resp=* lib-name=* lib-ver=* io-thread=* tot-net-in=* tot-net-out=* tot-cmds=*} $client + assert_match {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=0 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 omem-shared=0 omem-unshared=0 tot-mem=* events=r cmd=client|info user=* redir=-1 resp=* lib-name=* lib-ver=* io-thread=* tot-net-in=* tot-net-out=* tot-cmds=*} $client } } diff --git a/tests/unit/maxmemory.tcl b/tests/unit/maxmemory.tcl index f9b36b49159..f86ed7ce802 100644 --- a/tests/unit/maxmemory.tcl +++ b/tests/unit/maxmemory.tcl @@ -15,7 +15,6 @@ start_server {tags {"maxmemory" "external:skip"}} { r config set maxmemory 11mb r config set maxmemory-policy allkeys-lru set server_pid [s process_id] - r debug reply-copy-avoidance 0 ;# Disable copy avoidance because it affects memory usage proc init_test {client_eviction} { r flushdb @@ -29,11 +28,11 @@ start_server {tags {"maxmemory" "external:skip"}} { } r config resetstat - # fill 5mb using 50 keys of 100kb - for {set j 0} {$j < 50} {incr j} { - r setrange $j 100000 x + # fill 5mb using 500 keys of 10kb + for {set j 0} {$j < 500} {incr j} { + r setrange key$j 10000 x } - assert_equal [r dbsize] 50 + assert_equal [r dbsize] 500 } # Return true if the eviction occurred (client or key) based on argument @@ -44,12 +43,12 @@ start_server {tags {"maxmemory" "external:skip"}} { if $client_eviction { if {[lindex [r config get io-threads] 1] == 1} { - return [expr $evicted_clients > 0 && $evicted_keys == 0 && $dbsize == 50] + return [expr $evicted_clients > 0 && $evicted_keys == 0 && $dbsize == 500] } else { - return [expr $evicted_clients >= 0 && $evicted_keys >= 0 && $dbsize <= 50] + return [expr $evicted_clients >= 0 && $evicted_keys >= 0 && $dbsize <= 500] } } else { - return [expr $evicted_clients == 0 && $evicted_keys > 0 && $dbsize < 50] + return [expr $evicted_clients == 0 && $evicted_keys > 0 && $dbsize < 500] } } @@ -84,7 +83,7 @@ start_server {tags {"maxmemory" "external:skip"}} { while {![check_eviction_test $client_eviction] && [expr [clock seconds] - $t] < 20} { foreach rr $clients { if {[catch { - $rr mget 1 + $rr mget key1 key2 key3 key4 key5 key6 key7 key8 key9 key10 $rr flush } err]} { lremove clients $rr diff --git a/tests/unit/memefficiency.tcl b/tests/unit/memefficiency.tcl index 0ab12c6c7e4..895248606e0 100644 --- a/tests/unit/memefficiency.tcl +++ b/tests/unit/memefficiency.tcl @@ -83,8 +83,6 @@ run_solo {defrag} { # note: Disabling lookahead because it changes the number and order of allocations which interferes with defrag and causes tests to fail r config set lookahead 1 - r debug reply-copy-avoidance 0 ;# Disable copy avoidance because it affects memory usage - if {[string match {*jemalloc*} [s mem_allocator]] && [r debug mallctl arenas.page] <= 8192} { test "Active defrag main dictionary: $type" { r config set hz 100 diff --git a/tests/unit/obuf-limits.tcl b/tests/unit/obuf-limits.tcl index 148187b739a..f58eeda8981 100644 --- a/tests/unit/obuf-limits.tcl +++ b/tests/unit/obuf-limits.tcl @@ -1,6 +1,4 @@ start_server {tags {"obuf-limits external:skip logreqres:skip"}} { - r debug reply-copy-avoidance 0 ;# Disable copy avoidance because it affects memory usage - test {CONFIG SET client-output-buffer-limit} { set oldval [lindex [r config get client-output-buffer-limit] 1] @@ -237,4 +235,82 @@ start_server {tags {"obuf-limits external:skip logreqres:skip"}} { assert_match "*I/O error*" $e reconnect } + + test "zero-copy referenced reply bytes are reflected in memory stats" { + r flushdb + r config set client-output-buffer-limit {normal 0 0 0} + # Use a value large enough to trigger copy avoidance + set val_size 100000 + r set bigkey [string repeat v $val_size] + + # Use MULTI/EXEC so all observers see the zero-copy ref before it is sent. + r client setname refmem_test + r multi + r get bigkey ;# adds zero-copy ref to output buffer + r client list ;# per-client omem / omem-shared / omem-unshared / tot-mem + r info memory ;# global mem_clients_normal_shared / mem_clients_normal_unshared + r memory stats ;# clients.normal.shared and clients.normal.unshared + set res [r exec] + + # omem-shared tracks total shared reply bytes, key is still alive so omem-unshared must be 0. + set clients [split [string trim [lindex $res 1]] "\r\n"] + set c [lsearch -inline $clients *name=refmem_test*] + regexp {omem-shared=([0-9]+)} $c - omem_shared + regexp {omem-unshared=([0-9]+)} $c - omem_unshared + assert {$omem_shared >= $val_size} + assert_equal 0 $omem_unshared + + # mem_clients_normal_shared is incremented at write time, before the reply is sent + set info_mem [lindex $res 2] + assert {[getInfoProperty $info_mem mem_clients_normal_shared] >= $val_size} + assert_equal 0 [getInfoProperty $info_mem mem_clients_normal_unshared] + + # MEMORY STATS exposes the same shared bytes; normal.unshared is 0 since the key is still in keyspace + set mem_stats [lindex $res 3] + assert {[dict get $mem_stats clients.normal.shared] >= $val_size} + assert_equal 0 [dict get $mem_stats clients.normal.unshared] ;# key still in keyspace + + # After the reply is fully sent, the global counter must return to 0 + wait_for_condition 50 10 { + [s mem_clients_normal_shared] == 0 + } else { + fail "mem_clients_normal_shared did not return to 0 after reply was sent" + } + } + + test "shared reply bytes are tracked as unshared after the key is deleted" { + r flushdb + r config set client-output-buffer-limit {normal 0 0 0} + + set rr [redis_deferring_client] + $rr client setname test_client + $rr flush + + # Repeatedly SET/GET/DEL a big key on a deferred client and poll CLIENT LIST + # until omem-unshared on test_client reflects the referenced bytes. + set val_size 100000 + set deadline [expr {[clock milliseconds] + 5000}] + while {true} { + r set k [string repeat v $val_size] + $rr get k + $rr del k + $rr flush + after 10 + + set clients [split [r client list] "\r\n"] + set c [lsearch -inline $clients *name=test_client*] + regexp {omem-shared=([0-9]+)} $c - omem_shared + regexp {omem-unshared=([0-9]+)} $c - omem_unshared + if {$omem_unshared >= $val_size} { + assert_morethan_equal $omem_shared $omem_unshared + break + } + + if {[clock milliseconds] > $deadline} { + fail "timed out waiting for omem-unshared to reflect unshared bytes" + } + } + + $rr close + } } diff --git a/tests/unit/replybufsize.tcl b/tests/unit/replybufsize.tcl index 302417cf8f2..151d7757db4 100644 --- a/tests/unit/replybufsize.tcl +++ b/tests/unit/replybufsize.tcl @@ -13,7 +13,6 @@ start_server {tags {"replybufsize"}} { test {verify reply buffer limits} { # In order to reduce test time we can set the peak reset time very low r debug replybuffer peak-reset-time 100 - r debug reply-copy-avoidance 0 ;# Disable copy avoidance because it affects memory usage # Create a simple idle test client variable tc [redis_client] @@ -27,13 +26,13 @@ start_server {tags {"replybufsize"}} { fail "reply buffer of idle client is $rbs after 1 seconds" } - r set bigval [string repeat x 32768] + r set bigval [string repeat x 8192] ;# Keep value <= 16KB to avoid copy-avoidance, which shares memory and slows tot-mem growth. # In order to reduce test time we can set the peak reset time very low r debug replybuffer peak-reset-time never wait_for_condition 10 100 { - [$tc get bigval ; get_reply_buffer_size test_client] >= 16384 && [get_reply_buffer_size test_client] < 32768 + [$tc mget bigval bigval bigval bigval ; get_reply_buffer_size test_client] >= 16384 && [get_reply_buffer_size test_client] < 32768 } else { set rbs [get_reply_buffer_size test_client] fail "reply buffer of busy client is $rbs after 1 seconds" diff --git a/tests/unit/scan.tcl b/tests/unit/scan.tcl index 6a092cb4e95..c3ec5f273c1 100644 --- a/tests/unit/scan.tcl +++ b/tests/unit/scan.tcl @@ -471,6 +471,21 @@ proc test_scan {type} { } } + test "{$type} SCAN COUNT overflow" { + r flushdb + populate 10 + + # count = LONG_MAX/10 + 1, within LONG_MAX so it parses fine, + # but count*10 overflows signed long which is undefined behavior. + # Compute dynamically to support both 32-bit and 64-bit builds. + set long_max [expr {[s arch_bits] == 32 ? 2147483647 : 9223372036854775807}] + set big_count [expr {$long_max / 10 + 1}] + set res [r scan 0 count $big_count] + assert {[llength $res] == 2} + assert_equal 0 [lindex $res 0] + assert_equal 10 [llength [lindex $res 1]] + } + test "{$type} SCAN MATCH pattern implies cluster slot" { # Tests the code path for an optimization for patterns like "{foo}-*" # which implies that all matching keys belong to one slot. diff --git a/tests/unit/tracking.tcl b/tests/unit/tracking.tcl index 666b5930e43..174575eee98 100644 --- a/tests/unit/tracking.tcl +++ b/tests/unit/tracking.tcl @@ -413,6 +413,18 @@ start_server {tags {"tracking network logreqres:skip"}} { $r CLIENT TRACKING OFF } + test {BCAST prefix self-overlap past first index reports error without enabling} { + # When any of the provided BCAST prefixes overlap with each other, + # CLIENT TRACKING ON must reply with a single error and leave tracking + # disabled, regardless of the position of the overlapping prefix in + # the argument list. + r CLIENT TRACKING OFF + catch {r CLIENT TRACKING ON BCAST PREFIX BAZ PREFIX FOOBAR PREFIX FOO} output + assert_match {ERR Prefix 'FOOBAR' overlaps with another provided prefix 'FOO'*} $output + # Tracking must not have been enabled after the overlap error. + assert_match {*flags off*} [r CLIENT TRACKINGINFO] + } + test {hdel deliver invalidate message after response in the same connection} { r CLIENT TRACKING off r HELLO 3 diff --git a/tests/unit/type/hash-field-expire.tcl b/tests/unit/type/hash-field-expire.tcl index 3c0f8fbccfe..13bfbb250ab 100644 --- a/tests/unit/type/hash-field-expire.tcl +++ b/tests/unit/type/hash-field-expire.tcl @@ -2349,6 +2349,11 @@ start_server {tags {"hash"}} { assert_error {*Parameter*numFields*should be greater than 0*} {r HEXPIRE myhash 60 FIELDS -1 f1} assert_error {*invalid number of fields*} {r HSETEX myhash FIELDS 0 f1 v1 EX 60} assert_error {*invalid number of fields*} {r HGETEX myhash FIELDS 0 f1 EX 60} + set future_sec [expr {[clock seconds] + 60}] + set future_ms [expr {[clock milliseconds] + 60000}] + foreach {cmd expire} [list HEXPIRE 60 HPEXPIRE 60000 HEXPIREAT $future_sec HPEXPIREAT $future_ms] { + assert_error {*wrong number of arguments*} [list r $cmd myhash $expire FIELDS 2147483647 f1] + } # Test missing FIELDS keyword assert_error {*unknown argument*} {r HEXPIRE myhash 60 2 f1 f2} diff --git a/tests/unit/type/stream-cgroups.tcl b/tests/unit/type/stream-cgroups.tcl index 4990275e218..adc12ff8c88 100644 --- a/tests/unit/type/stream-cgroups.tcl +++ b/tests/unit/type/stream-cgroups.tcl @@ -1904,7 +1904,80 @@ start_server { } } } - + + start_server {tags {"repl external:skip" "stream"}} { + # Verify that XREADGROUP propagates a newly created consumer to + # the replica in cases where no XCLAIM is generated (XCLAIM + # implicitly creates the consumer, so explicit propagation is + # only needed when it is absent). Two cases are tested: + # 1. Without NOACK and no messages to deliver — no XCLAIM at all. + # 2. With NOACK and messages delivered — NOACK skips PEL/XCLAIM. + test "XREADGROUP propagates new consumer to replica" { + set master [srv 0 client] + set master_host [srv 0 host] + set master_port [srv 0 port] + + start_server {tags {"stream"}} { + set replica [srv 0 client] + + $replica replicaof $master_host $master_port + wait_for_sync $replica + + $master DEL mystream + $master XADD mystream 1-0 f v + $master XGROUP CREATE mystream grp 0 + + # Consume the only message so the stream has no + # new messages pending for delivery. + $master XREADGROUP GROUP grp c1 STREAMS mystream > + $master XACK mystream grp 1-0 + + wait_for_ofs_sync $master $replica + + # Case 1: XREADGROUP without NOACK for a brand-new + # consumer when there are NO messages to deliver. + # No XCLAIM is generated, so the consumer must be + # explicitly propagated. + set reply [$master XREADGROUP GROUP grp c2 STREAMS mystream >] + assert_equal $reply {} + + set master_consumers [$master XINFO CONSUMERS mystream grp] + set master_names [lmap c $master_consumers {dict get $c name}] + assert {[lsearch $master_names "c2"] >= 0} + + wait_for_ofs_sync $master $replica + + set replica_consumers [$replica XINFO CONSUMERS mystream grp] + set replica_names [lmap c $replica_consumers {dict get $c name}] + if {[lsearch $replica_names "c2"] < 0} { + fail "Consumer 'c2' not found on replica (have: $replica_names)" + } + + # Case 2: XREADGROUP with NOACK for a brand-new consumer + # when a message IS available. NOACK skips PEL/XCLAIM + # entirely, so the consumer must be explicitly propagated + # even though messages were delivered. + $master XADD mystream 2-0 f v + wait_for_ofs_sync $master $replica + + set reply [$master XREADGROUP GROUP grp c3 NOACK STREAMS mystream >] + assert {$reply ne {}} + + set master_consumers [$master XINFO CONSUMERS mystream grp] + set master_names [lmap c $master_consumers {dict get $c name}] + assert {[lsearch $master_names "c3"] >= 0} + + wait_for_ofs_sync $master $replica + + set replica_consumers [$replica XINFO CONSUMERS mystream grp] + set replica_names [lmap c $replica_consumers {dict get $c name}] + if {[lsearch $replica_names "c3"] < 0} { + fail "Consumer 'c3' not found on replica (have: $replica_names)" + } + } + } + } + start_server {} { if {!$::force_resp3} { test "XREADGROUP CLAIM field types are correct" {