diff --git a/modules/redisearch/Makefile b/modules/redisearch/Makefile index 7dbb9db6fea..22d3a545062 100644 --- a/modules/redisearch/Makefile +++ b/modules/redisearch/Makefile @@ -3,5 +3,10 @@ MODULE_VERSION = v8.2.13 MODULE_REPO = https://github.com/redisearch/redisearch TARGET_MODULE = $(SRC_DIR)/bin/$(FULL_VARIANT)/search-community/redisearch.so + # Set INLINE_LSE_ATOMICS=1 for perf improvement on common ARM CPUs (i.e. Graviton2/3/4); no effect on x86 or macOS. + # Default 0 keeps the binary runnable on pre-Armv8.1-a cores (Cortex-A72, Graviton1, RPi4) that would otherwise SIGILL at module load. +INLINE_LSE_ATOMICS ?= 0 +export INLINE_LSE_ATOMICS + include ../common.mk diff --git a/modules/vector-sets/tests/vsim_limit_efsearch.py b/modules/vector-sets/tests/vsim_limit_efsearch.py new file mode 100644 index 00000000000..25b96891342 --- /dev/null +++ b/modules/vector-sets/tests/vsim_limit_efsearch.py @@ -0,0 +1,32 @@ +from test import TestCase, generate_random_vector +import struct + +class VSIMLimitEFSearch(TestCase): + def getname(self): + return "VSIM Limit EF Search" + + def estimated_runtime(self): + return 0.2 + + def test(self): + dim = 32 + vec = generate_random_vector(dim) + vec_bytes = struct.pack(f'{dim}f', *vec) + + # Add test vector + self.redis.execute_command('VADD', self.test_key, 'FP32', vec_bytes, f'{self.test_key}:item:1') + + query_vec = generate_random_vector(dim) + + # Test EF upper bound (should accept 1000000) + result = self.redis.execute_command('VSIM', self.test_key, 'VALUES', dim, + *[str(x) for x in query_vec], 'EF', 1000000) + assert isinstance(result, list), "EF=1000000 should be accepted" + + # Test EF over limit (should reject > 1000000) + try: + self.redis.execute_command('VSIM', self.test_key, 'VALUES', dim, + *[str(x) for x in query_vec], 'EF', 1000001) + assert False, "EF=1000001 should be rejected" + except Exception as e: + assert "invalid EF" in str(e), f"Expected EF validation error, got: {e}" diff --git a/modules/vector-sets/vset.c b/modules/vector-sets/vset.c index acad46af79c..0ed02a373fb 100644 --- a/modules/vector-sets/vset.c +++ b/modules/vector-sets/vset.c @@ -830,10 +830,12 @@ void VSIM_execute(RedisModuleCtx *ctx, struct vsetObject *vset, if (ef == 0) ef = VSET_DEFAULT_SEARCH_EF; if (count > ef) ef = count; + int slot = hnsw_acquire_read_slot(vset->hnsw); + if (ef > vset->hnsw->node_count) ef = vset->hnsw->node_count; + /* Perform search */ hnswNode **neighbors = RedisModule_Alloc(sizeof(hnswNode*)*ef); float *distances = RedisModule_Alloc(sizeof(float)*ef); - int slot = hnsw_acquire_read_slot(vset->hnsw); unsigned int found; if (ground_truth) { found = hnsw_ground_truth_with_filter(vset->hnsw, vec, ef, neighbors, @@ -1085,7 +1087,7 @@ int VSIM_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { j += 2; } else if (!strcasecmp(opt, "EF") && j+1 < argc) { if (RedisModule_StringToLongLong(argv[j+1], &ef) != - REDISMODULE_OK || ef <= 0) + REDISMODULE_OK || ef <= 0 || ef > 1000000) { RedisModule_Free(vec); return RedisModule_ReplyWithError(ctx, "ERR invalid EF"); diff --git a/src/blocked.c b/src/blocked.c index 61bfae635c5..fddd59acda8 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -122,6 +122,17 @@ void processUnblockedClients(void) { listDelNode(server.unblocked_clients,ln); c->flags &= ~CLIENT_UNBLOCKED; + /* Reset the client for a new query, unless the client has pending command to process. */ + if (!(c->flags & CLIENT_PENDING_COMMAND)) { + freeClientOriginalArgv(c); + /* Clients that are not blocked on keys are not reprocessed so we must + * call reqresAppendResponse here (for clients blocked on key, + * unblockClientOnKey is called, which eventually calls processCommand, + * which calls reqresAppendResponse) */ + reqresAppendResponse(c); + resetClient(c); + } + if (c->flags & CLIENT_MODULE) { if (!(c->flags & CLIENT_BLOCKED)) { moduleCallCommandUnblockedHandler(c); @@ -191,17 +202,6 @@ void unblockClient(client *c, int queue_for_reprocessing) { serverPanic("Unknown btype in unblockClient()."); } - /* Reset the client for a new query, unless the client has pending command to process - * or in case a shutdown operation was canceled and we are still in the processCommand sequence */ - if (!(c->flags & CLIENT_PENDING_COMMAND) && c->bstate.btype != BLOCKED_SHUTDOWN) { - freeClientOriginalArgv(c); - /* Clients that are not blocked on keys are not reprocessed so we must - * call reqresAppendResponse here (for clients blocked on key, - * unblockClientOnKey is called, which eventually calls processCommand, - * which calls reqresAppendResponse) */ - reqresAppendResponse(c); - resetClient(c); - } /* Clear the flags, and put the client in the unblocked list so that * we'll process new commands in its query buffer ASAP. */ @@ -266,6 +266,7 @@ void replyToClientsBlockedOnShutdown(void) { while((ln = listNext(&li))) { client *c = listNodeValue(ln); if (c->flags & CLIENT_BLOCKED && c->bstate.btype == BLOCKED_SHUTDOWN) { + c->duration = 0; addReplyError(c, "Errors trying to SHUTDOWN. Check logs."); unblockClient(c, 1); } diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 5f547812a05..0e88ebfb018 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -422,6 +422,7 @@ int clusterLoadConfig(char *filename) { * of the aux fields is insignificant. */ int aux_tcp_port = 0; int aux_tls_port = 0; + int aux_shard_id = 0; for (int i = 2; i < aux_argc; i++) { int field_argc; sds *field_argv; @@ -454,6 +455,7 @@ int clusterLoadConfig(char *filename) { continue; } field_found = 1; + aux_shard_id |= j == af_shard_id; aux_tcp_port |= j == af_tcp_port; aux_tls_port |= j == af_tls_port; if (auxFieldHandlers[j].setter(n, field_argv[1], sdslen(field_argv[1])) != C_OK) { @@ -560,12 +562,14 @@ int clusterLoadConfig(char *filename) { * by an older version of Redis; * ignore replica's shard_id in the file, only use the primary's. * If replica precedes primary in file, it will be corrected - * later by the auxShardIdSetter */ + * later by the auxShardIdSetter. + * Remove node from its old shard before adding it to the new one. */ + if (aux_shard_id == 1) clusterRemoveNodeFromShard(n); memcpy(n->shard_id, master->shard_id, CLUSTER_NAMELEN); clusterAddNodeToShard(master->shard_id, n); n->slaveof = master; clusterNodeAddSlave(master,n); - } else if (auxFieldHandlers[af_shard_id].isPresent(n) == 0) { + } else if (aux_shard_id == 0) { /* n is a primary but it does not have a persisted shard_id. * This happens if we are loading a nodes.conf generated by * an older version of Redis. We should manually update the @@ -1852,8 +1856,8 @@ void clusterBlacklistAddNode(clusterNode *node) { /* Return non-zero if the specified node ID exists in the blacklist. * You don't need to pass an sds string here, any pointer to 40 bytes * will work. */ -int clusterBlacklistExists(char *nodeid) { - sds id = sdsnewlen(nodeid,CLUSTER_NAMELEN); +int clusterBlacklistExists(char *nodeid, size_t len) { + sds id = sdsnewlen(nodeid,len); int retval; clusterBlacklistCleanup(); @@ -2208,7 +2212,7 @@ void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) { * joining another cluster. */ if (sender && !(flags & CLUSTER_NODE_NOADDR) && - !clusterBlacklistExists(g->nodename)) + !clusterBlacklistExists(g->nodename, CLUSTER_NAMELEN)) { clusterNode *node; node = createClusterNode(g->nodename, flags); @@ -6235,7 +6239,7 @@ int clusterCommandSpecial(client *c) { /* CLUSTER FORGET */ clusterNode *n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[2]->ptr)); if (!n) { - if (clusterBlacklistExists((char*)c->argv[2]->ptr)) + if (clusterBlacklistExists((char*)c->argv[2]->ptr, sdslen(c->argv[2]->ptr))) /* Already forgotten. The deletion may have been gossipped by * another node, so we pretend it succeeded. */ addReply(c,shared.ok); diff --git a/src/config.c b/src/config.c index bb3de01e4fe..0c458fb2c14 100644 --- a/src/config.c +++ b/src/config.c @@ -2405,13 +2405,7 @@ static int isValidAnnouncedNodename(char *val,const char **err) { return 1; } -static int isValidAnnouncedHostname(char *val, const char **err) { - if (strlen(val) >= NET_HOST_STR_LEN) { - *err = "Hostnames must be less than " - STRINGIFY(NET_HOST_STR_LEN) " characters"; - return 0; - } - +static int isValidHostnameChars(char *val, const char **err) { int i = 0; char c; while ((c = val[i])) { @@ -2429,6 +2423,15 @@ static int isValidAnnouncedHostname(char *val, const char **err) { return 1; } +static int isValidAnnouncedHostname(char *val, const char **err) { + if (strlen(val) >= NET_HOST_STR_LEN) { + *err = "Hostnames must be less than " + STRINGIFY(NET_HOST_STR_LEN) " characters"; + return 0; + } + return isValidHostnameChars(val, err); +} + /* Validation function for cluster-announce-ip. * Ensures the IP address is valid and rejects control characters. */ static int isValidClusterAnnounceIp(char *val, const char **err) { @@ -2438,12 +2441,19 @@ static int isValidClusterAnnounceIp(char *val, const char **err) { return 1; } - if (inet_pton(AF_INET, val, buf) != 1 && - inet_pton(AF_INET6, val, buf) != 1) { - *err = "Cluster announce IP must be a valid IPv4 or IPv6 address"; + /* Accept valid IPv4 or IPv6 */ + if (inet_pton(AF_INET, val, buf) == 1 || inet_pton(AF_INET6, val, buf) == 1) { + return 1; + } + /* Also accept valid hostnames, but limited to NET_IP_STR_LEN since + * cluster_announce_ip is stored in a NET_IP_STR_LEN buffer */ + if (strlen(val) >= NET_IP_STR_LEN) { + *err = "Hostnames for cluster-announce-ip must be less than " + STRINGIFY(NET_IP_STR_LEN) " characters"; return 0; } - return 1; + /* Also accept valid hostnames */ + return isValidHostnameChars(val, err); } /* Validate specified string is a valid proc-title-template */ diff --git a/src/db.c b/src/db.c index 8bbc20c3430..2ba1ba4ca03 100644 --- a/src/db.c +++ b/src/db.c @@ -227,8 +227,9 @@ kvobj *lookupKey(redisDb *db, robj *key, int flags, dictEntryLink *link) { /* Update the access time for the ageing algorithm. * Don't do it if we have a saving child, as this will trigger * a copy on write madness. */ - if (server.current_client && server.current_client->flags & CLIENT_NO_TOUCH && - server.executing_client->cmd->proc != touchCommand) + if (((flags & LOOKUP_NOTOUCH) == 0) && + (server.current_client && server.current_client->flags & CLIENT_NO_TOUCH) && + (server.executing_client && server.executing_client->cmd->proc != touchCommand)) flags |= LOOKUP_NOTOUCH; if (!hasActiveChildProcess() && !(flags & LOOKUP_NOTOUCH)){ if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { @@ -1333,9 +1334,22 @@ void scanCallback(void *privdata, const dictEntry *de, dictEntryLink plink) { /* o and typename can not have values at the same time. */ serverAssert(!((data->type != LLONG_MAX) && o)); + kvobj *kv = NULL; if (!o) { /* If scanning keyspace */ - kvobj *kv = dictGetKV(de); - + kv = dictGetKV(de); + keyStr = kvobjGetKey(kv); + } else { + keyStr = dictGetKey(de); + } + + /* Filter element if it does not match the pattern. */ + if (data->pattern) { + if (!stringmatchlen(data->pattern, sdslen(data->pattern), keyStr, data->strlen(keyStr), 0)) { + return; + } + } + + if (!o) { /* Expiration check first - only for database keyspace scanning. * Use kv obj to avoid robj creation. */ if (expireIfNeeded(data->db, NULL, kv, 0) != KEY_VALID) @@ -1350,17 +1364,6 @@ void scanCallback(void *privdata, const dictEntry *de, dictEntryLink plink) { if (!objectTypeCompare(kv, data->type)) return; } - - keyStr = kvobjGetKey(kv); - } else { - keyStr = dictGetKey(de); - } - - /* Filter element if it does not match the pattern. */ - if (data->pattern) { - if (!stringmatchlen(data->pattern, sdslen(data->pattern), keyStr, data->strlen(keyStr), 0)) { - return; - } } if (o == NULL) { diff --git a/src/dict.h b/src/dict.h index 27b37233cb1..f192f9c6b83 100644 --- a/src/dict.h +++ b/src/dict.h @@ -127,12 +127,14 @@ struct dict { long rehashidx; /* rehashing not in progress if rehashidx == -1 */ - /* Keep small vars at end for optimal (minimal) struct padding */ - unsigned pauserehash : 15; /* If >0 rehashing is paused */ + /* Note: pauserehash is a full unsigned so iterator increments + * don't perform RMW on the same storage unit as other bitfields. */ + unsigned pauserehash; /* If >0 rehashing is paused */ - unsigned useStoredKeyApi : 1; /* See comment of storedHashFunction above */ + /* Keep small vars at end for optimal (minimal) struct padding */ signed char ht_size_exp[2]; /* exponent of size. (size = 1<0 automatic resizing is disallowed (<0 indicates coding error) */ + signed pauseAutoResize: 15; /* If >0 automatic resizing is disallowed (<0 indicates coding error) */ + unsigned useStoredKeyApi: 1; /* See comment of storedHashFunction above */ void *metadata[]; }; diff --git a/src/module.c b/src/module.c index 1f1de37598f..bcc91354d3e 100644 --- a/src/module.c +++ b/src/module.c @@ -9216,7 +9216,7 @@ size_t RM_GetClusterSize(void) { int RM_GetClusterNodeInfo(RedisModuleCtx *ctx, const char *id, char *ip, char *master_id, int *port, int *flags) { UNUSED(ctx); - clusterNode *node = clusterLookupNode(id, strlen(id)); + clusterNode *node = clusterLookupNode(id, CLUSTER_NAMELEN); if (node == NULL || clusterNodePending(node)) { return REDISMODULE_ERR; @@ -12968,7 +12968,8 @@ int setModuleStringConfig(ModuleConfig *config, sds strval, const char **err) { int setModuleEnumConfig(ModuleConfig *config, int val, const char **err) { RedisModuleString *error = NULL; - int return_code = config->set_fn.set_enum(config->name, val, config->privdata, &error); + char *rname = getRegisteredConfigName(config); + int return_code = config->set_fn.set_enum(rname, val, config->privdata, &error); propagateErrorString(error, err); return return_code == REDISMODULE_OK ? 1 : 0; } diff --git a/src/networking.c b/src/networking.c index a26149d74a1..e04d14f00a0 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2908,7 +2908,7 @@ int processInputBuffer(client *c) { /* Keep processing while there is something in the input buffer */ while(c->qb_pos < sdslen(c->querybuf)) { /* Immediately abort if the client is in the middle of something. */ - if (c->flags & CLIENT_BLOCKED) break; + if (c->flags & CLIENT_BLOCKED || c->flags & CLIENT_UNBLOCKED) break; /* Don't process more buffers from clients that have already pending * commands to execute in c->argv. */ diff --git a/src/t_stream.c b/src/t_stream.c index 3ef48943fe5..6d3cd27d6b6 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -674,7 +674,9 @@ typedef struct { int trim_strategy_arg_idx; /* Index of the count in MAXLEN/MINID, for rewriting. */ int delete_strategy; /* DELETE_STRATEGY_* */ int approx_trim; /* If 1 only delete whole radix tree nodes, so - * the trim argument is not applied verbatim. */ + * the trim argument is not applied verbatim. + * Note: This flag is ignored when delete_strategy is non-KEEPREF. + * Individual entries may still be processed for consumer groups. */ long long limit; /* Maximum amount of entries to trim. If 0, no limitation * on the amount of trimming work is enforced. */ /* TRIM_STRATEGY_MAXLEN options */ @@ -781,8 +783,11 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) { } /* If we cannot remove a whole element, and approx is true, - * stop here. */ - if (approx) break; + * stop here. However, for non-KEEPREF strategies, if the node was + * eligible for removal but we couldn't remove it (because we need + * to check consumer group references), we should continue to process + * entries within this node. */ + if (approx && delete_strategy == DELETE_STRATEGY_KEEPREF) break; /* Now we have to trim entries from within 'lp' */ int64_t deleted_from_lp = 0; @@ -853,12 +858,12 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) { if (can_delete) { /* Mark the entry as deleted. */ - intptr_t delta = p - lp; + intptr_t delta = p ? (p - lp) : 0; /* p may be NULL if this was the last entry */ flags |= STREAM_ITEM_FLAG_DELETED; lp = lpReplaceInteger(lp, &pcopy, flags); deleted_from_lp++; s->length--; - p = lp + delta; + if (p) p = lp + delta; } } } @@ -2786,6 +2791,7 @@ static void streamFreeCG(streamCG *cg) { /* Destroy a consumer group and clean up all associated references. */ void streamDestroyCG(stream *s, streamCG *cg) { + /* Remove all references from the cgroups_ref. */ raxIterator it; raxStart(&it, cg->pel); raxSeek(&it, "^", NULL, 0); @@ -2795,6 +2801,12 @@ void streamDestroyCG(stream *s, streamCG *cg) { } raxStop(&it); + /* If we're destroying the group with the minimum last_id, the cached + * minimum is no longer valid and needs to be recalculated from the + * remaining groups. */ + if (s->min_cgroup_last_id_valid && streamCompareID(&s->min_cgroup_last_id, &cg->last_id) == 0) + s->min_cgroup_last_id_valid = 0; + streamFreeCG(cg); } diff --git a/tests/cluster/tests/00-base.tcl b/tests/cluster/tests/00-base.tcl index 693dded1d8f..514087d66fe 100644 --- a/tests/cluster/tests/00-base.tcl +++ b/tests/cluster/tests/00-base.tcl @@ -87,3 +87,7 @@ test "CLUSTER SLAVES and CLUSTER REPLICAS with zero replicas" { assert_equal {} [R 0 cluster slaves [R 0 CLUSTER MYID]] assert_equal {} [R 0 cluster replicas [R 0 CLUSTER MYID]] } + +test "CLUSTER FORGET with invalid node ID" { + assert_error {*ERR Unknown node*} {R 0 cluster forget 1} +} diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl index 8f2322823bf..ece8f394e74 100644 --- a/tests/integration/replication.tcl +++ b/tests/integration/replication.tcl @@ -880,27 +880,44 @@ proc compute_cpu_usage {start end} { return [ list $pucpu $pscpu ] } - +if {!$::valgrind} { # test diskless rdb pipe with multiple replicas, which may drop half way -start_server {tags {"repl external:skip tsan:skip"} overrides {save ""}} { - set master [srv 0 client] - $master config set repl-diskless-sync yes - $master config set repl-diskless-sync-delay 5 - $master config set repl-diskless-sync-max-replicas 2 - set master_host [srv 0 host] - set master_port [srv 0 port] - set master_pid [srv 0 pid] - # put enough data in the db that the rdb file will be bigger than the socket buffers - # and since we'll have key-load-delay of 100, 20000 keys will take at least 2 seconds - # we also need the replica to process requests during transfer (which it does only once in 2mb) - $master debug populate 20000 test 10000 - $master config set rdbcompression no - $master config set repl-rdb-channel no - # If running on Linux, we also measure utime/stime to detect possible I/O handling issues - set os [catch {exec uname}] - set measure_time [expr {$os == "Linux"} ? 1 : 0] - foreach all_drop {no slow fast all timeout} { +foreach all_drop {no slow fast all timeout} { + start_server {tags {"repl external:skip tsan:skip"} overrides {save ""}} { + set master [srv 0 client] + $master config set repl-diskless-sync yes + $master config set repl-diskless-sync-delay 5 + $master config set repl-diskless-sync-max-replicas 2 + set master_host [srv 0 host] + set master_port [srv 0 port] + set master_pid [srv 0 pid] + if {$all_drop == "timeout"} { + # Use a larger RDB (~100 MB) so it cannot fit into the kernel TCP + # send buffer (autotuning can absorb tens of MB on some hosts). We + # need the primary to hit the blocked writer path + # (repl_last_partial_write != 0) while the slow replica is paused, + # so the cron triggers the "(full sync)" timeout path instead of + # the replica being moved to ONLINE prematurely and timing out via + # the "(streaming sync)" path. + $master debug populate 10000 test 10000 + } else { + # Put enough data in the db that the RDB is comfortably larger than the + # pipe and socket buffers so the primary can hit the blocked writer path, + # but keep it small enough that slow TLS CI runners don't spend minutes + # draining an oversized transfer (~40 MB uncompressed). + $master debug populate 4000 test 10000 + } + $master config set rdbcompression no + $master config set repl-rdb-channel no + # If running on Linux, we also measure utime/stime to detect possible I/O handling issues + set os [catch {exec uname}] + set measure_time [expr {$os == "Linux"} ? 1 : 0] + test "diskless $all_drop replicas drop during rdb pipe" { + # Reset config that the timeout subcase may change, so a failing + # subcase does not leave the next one with an aggressive timeout. + $master config set repl-timeout 60 + $master config set rdb-key-save-delay 0 set replicas {} set replicas_alive {} # start one replica that will read the rdb fast, and one that will be slow @@ -916,7 +933,25 @@ start_server {tags {"repl external:skip tsan:skip"} overrides {save ""}} { # so that the whole rdb generation process is bound to that set loglines [count_log_lines -2] [lindex $replicas 0] config set repl-diskless-load swapdb - [lindex $replicas 0] config set key-load-delay 100 ;# 20k keys and 100 microseconds sleep means at least 2 seconds + [lindex $replicas 1] config set repl-diskless-load swapdb + if {$all_drop == "all"} { + # Keep the RDB child generating data long enough for + # both replicas to be killed before the pipe reaches + # EOF, so this subcase still covers the last-replica + # drop path instead of racing with normal completion. + $master config set rdb-key-save-delay 1000 + } + # For non-timeout subcases, use key-load-delay to keep + # replica 0 as a steady slow reader for the entire RDB + # transfer. This keeps the expected diskless pipe code + # paths covered without accepting alternate log outcomes. + if {$all_drop != "timeout"} { + # 4k keys with 500 microseconds each keeps replica 0 + # slow for about 2 seconds, which is long enough to + # fill the pipe without turning the transfer into a + # multi-minute TLS run. + [lindex $replicas 0] config set key-load-delay 500 + } [lindex $replicas 0] replicaof $master_host $master_port [lindex $replicas 1] replicaof $master_host $master_port @@ -930,9 +965,16 @@ start_server {tags {"repl external:skip tsan:skip"} overrides {save ""}} { set start_time [clock seconds] } - # wait a while so that the pipe socket writer will be - # blocked on write (since replica 0 is slow to read from the socket) - after 500 + if {$all_drop != "timeout"} { + # key-load-delay is already throttling the slow + # replica; just wait for the pipe to fill. + after 500 + } else { + # For the timeout subcase, stop the slow reader so it + # reaches repl-timeout during full sync. + pause_process [srv -1 pid] + after 500 + } # add some command to be present in the command stream after the rdb. $master incr $all_drop @@ -947,14 +989,17 @@ start_server {tags {"repl external:skip tsan:skip"} overrides {save ""}} { set replicas_alive [lreplace $replicas_alive 0 0] } if {$all_drop == "timeout"} { + # Let one replica hit repl-timeout while the slow reader + # is paused, then restore a generous timeout so the + # remaining replica can finish the streamed RDB. $master config set repl-timeout 2 - # we want the slow replica to hang on a key for very long so it'll reach repl-timeout - pause_process [srv -1 pid] - after 2000 + wait_for_log_messages -2 {"*Disconnecting timedout replica (full sync)*"} $loglines 200 100 + $master config set repl-timeout 60 } - # wait for rdb child to exit - wait_for_condition 500 100 { + # Use a single generous budget for all subcases; successful + # runs still exit early once the child is done. + wait_for_condition 5000 100 { [s -2 rdb_bgsave_in_progress] == 0 } else { fail "rdb child didn't terminate" @@ -971,7 +1016,6 @@ start_server {tags {"repl external:skip tsan:skip"} overrides {save ""}} { wait_for_log_messages -2 {"*Diskless rdb transfer, done reading from pipe, 1 replicas still up*"} $loglines 1 1 } if {$all_drop == "timeout"} { - wait_for_log_messages -2 {"*Disconnecting timedout replica (full sync)*"} $loglines 1 1 wait_for_log_messages -2 {"*Diskless rdb transfer, done reading from pipe, 1 replicas still up*"} $loglines 1 1 # master disconnected the slow replica, remove from array set replicas_alive [lreplace $replicas_alive 0 0] @@ -995,18 +1039,23 @@ start_server {tags {"repl external:skip tsan:skip"} overrides {save ""}} { assert {$master_utime < 70} assert {$master_stime < 70} } - if {!$::no_latency && ($all_drop == "none" || $all_drop == "fast")} { + if {!$::no_latency && ($all_drop == "no" || $all_drop == "fast")} { assert {$master_utime < 15} assert {$master_stime < 15} } } + # In the "no" case both replicas stay alive through the + # full streamed RDB, so on slow TLS runners the final + # ONLINE transition can lag behind child exit. + set replica_online_wait_tries [expr {$all_drop == "no" ? 600 : 150}] + # verify the data integrity foreach replica $replicas_alive { # Wait that replicas acknowledge they are online so # we are sure that DBSIZE and DEBUG DIGEST will not # fail because of timing issues. - wait_for_condition 150 100 { + wait_for_condition $replica_online_wait_tries 100 { [lindex [$replica role] 3] eq {connected} } else { fail "replicas still not connected after some time" @@ -1031,6 +1080,7 @@ start_server {tags {"repl external:skip tsan:skip"} overrides {save ""}} { } } } +} ;# end of valgrind test "diskless replication child being killed is collected" { # when diskless master is waiting for the replica to become writable diff --git a/tests/integration/shutdown.tcl b/tests/integration/shutdown.tcl index 4169d64b7eb..2c05caf10a3 100644 --- a/tests/integration/shutdown.tcl +++ b/tests/integration/shutdown.tcl @@ -177,6 +177,12 @@ test "Shutting down master waits for replica then fails" { catch { $rd2 read } e2 assert_match "*Errors trying to SHUTDOWN. Check logs*" $e1 assert_match "*Errors trying to SHUTDOWN. Check logs*" $e2 + + # Verify that after shutdown is cancelled, the client is properly + # reset and can handle other commands normally. + $rd1 PING + assert_equal "PONG" [$rd1 read] + $rd1 close $rd2 close diff --git a/tests/modules/moduleconfigs.c b/tests/modules/moduleconfigs.c index 04974f7527e..2c3701537eb 100644 --- a/tests/modules/moduleconfigs.c +++ b/tests/modules/moduleconfigs.c @@ -1,5 +1,7 @@ #include "redismodule.h" +#include #include +#include int mutable_bool_val, no_prefix_bool, no_prefix_bool2; int immutable_bool_val; long long longval, no_prefix_longval; @@ -13,38 +15,37 @@ int flagsval; * to point to the config, and they register the configs as such. Note that one could also just * use names if they wanted, and store anything in privdata. */ int getBoolConfigCommand(const char *name, void *privdata) { - REDISMODULE_NOT_USED(name); + assert(strcmp(name, "mutable_bool") == 0 || strcmp(name, "immutable_bool") == 0); return (*(int *)privdata); } int setBoolConfigCommand(const char *name, int new, void *privdata, RedisModuleString **err) { - REDISMODULE_NOT_USED(name); REDISMODULE_NOT_USED(err); + assert(strcmp(name, "mutable_bool") == 0 || strcmp(name, "immutable_bool") == 0); *(int *)privdata = new; return REDISMODULE_OK; } long long getNumericConfigCommand(const char *name, void *privdata) { - REDISMODULE_NOT_USED(name); + assert(strcmp(name, "numeric") == 0 || strcmp(name, "memory_numeric") == 0); return (*(long long *) privdata); } int setNumericConfigCommand(const char *name, long long new, void *privdata, RedisModuleString **err) { - REDISMODULE_NOT_USED(name); REDISMODULE_NOT_USED(err); + assert(strcmp(name, "numeric") == 0 || strcmp(name, "memory_numeric") == 0); *(long long *)privdata = new; return REDISMODULE_OK; } RedisModuleString *getStringConfigCommand(const char *name, void *privdata) { - REDISMODULE_NOT_USED(name); REDISMODULE_NOT_USED(privdata); + assert(strcmp(name, "string") == 0); return strval; } int setStringConfigCommand(const char *name, RedisModuleString *new, void *privdata, RedisModuleString **err) { - REDISMODULE_NOT_USED(name); - REDISMODULE_NOT_USED(err); REDISMODULE_NOT_USED(privdata); + assert(strcmp(name, "string") == 0); size_t len; if (!strcasecmp(RedisModule_StringPtrLen(new, &len), "rejectisfreed")) { *err = RedisModule_CreateString(NULL, "Cannot set string to 'rejectisfreed'", 36); @@ -57,29 +58,29 @@ int setStringConfigCommand(const char *name, RedisModuleString *new, void *privd } int getEnumConfigCommand(const char *name, void *privdata) { - REDISMODULE_NOT_USED(name); REDISMODULE_NOT_USED(privdata); + assert(strcmp(name, "enum") == 0); return enumval; } int setEnumConfigCommand(const char *name, int val, void *privdata, RedisModuleString **err) { - REDISMODULE_NOT_USED(name); - REDISMODULE_NOT_USED(err); REDISMODULE_NOT_USED(privdata); + REDISMODULE_NOT_USED(err); + assert(strcmp(name, "enum") == 0); enumval = val; return REDISMODULE_OK; } int getFlagsConfigCommand(const char *name, void *privdata) { - REDISMODULE_NOT_USED(name); REDISMODULE_NOT_USED(privdata); + assert(strcmp(name, "flags") == 0); return flagsval; } int setFlagsConfigCommand(const char *name, int val, void *privdata, RedisModuleString **err) { - REDISMODULE_NOT_USED(name); REDISMODULE_NOT_USED(err); REDISMODULE_NOT_USED(privdata); + assert(strcmp(name, "flags") == 0); flagsval = val; return REDISMODULE_OK; } @@ -102,34 +103,60 @@ int longlongApplyFunc(RedisModuleCtx *ctx, void *privdata, RedisModuleString **e return REDISMODULE_ERR; } return REDISMODULE_OK; -} +} RedisModuleString *getStringConfigUnprefix(const char *name, void *privdata) { - REDISMODULE_NOT_USED(name); REDISMODULE_NOT_USED(privdata); + assert(strcmp(name, "unprefix-string") == 0 || strcmp(name, "unprefix.string-alias") == 0); return strval2; } int setStringConfigUnprefix(const char *name, RedisModuleString *new, void *privdata, RedisModuleString **err) { - REDISMODULE_NOT_USED(name); - REDISMODULE_NOT_USED(err); REDISMODULE_NOT_USED(privdata); + REDISMODULE_NOT_USED(err); + assert(strcmp(name, "unprefix-string") == 0 || strcmp(name, "unprefix.string-alias") == 0); if (strval2) RedisModule_FreeString(NULL, strval2); RedisModule_RetainString(NULL, new); strval2 = new; return REDISMODULE_OK; } +int getBoolConfigUnprefix(const char *name, void *privdata) { + assert(strcmp(name, "unprefix-bool") == 0 || strcmp(name, "unprefix-bool-alias") == 0 || + strcmp(name, "unprefix-noalias-bool") == 0); + return (*(int *)privdata); +} + +int setBoolConfigUnprefix(const char *name, int new, void *privdata, RedisModuleString **err) { + REDISMODULE_NOT_USED(err); + assert(strcmp(name, "unprefix-bool") == 0 || strcmp(name, "unprefix-bool-alias") == 0 || + strcmp(name, "unprefix-noalias-bool") == 0); + *(int *)privdata = new; + return REDISMODULE_OK; +} + +long long getNumericConfigUnprefix(const char *name, void *privdata) { + assert(strcmp(name, "unprefix.numeric") == 0 || strcmp(name, "unprefix.numeric-alias") == 0); + return (*(long long *) privdata); +} + +int setNumericConfigUnprefix(const char *name, long long new, void *privdata, RedisModuleString **err) { + REDISMODULE_NOT_USED(err); + assert(strcmp(name, "unprefix.numeric") == 0 || strcmp(name, "unprefix.numeric-alias") == 0); + *(long long *)privdata = new; + return REDISMODULE_OK; +} + int getEnumConfigUnprefix(const char *name, void *privdata) { - REDISMODULE_NOT_USED(name); REDISMODULE_NOT_USED(privdata); + assert(strcmp(name, "unprefix-enum") == 0 || strcmp(name, "unprefix-enum-alias") == 0); return no_prefix_enumval; } int setEnumConfigUnprefix(const char *name, int val, void *privdata, RedisModuleString **err) { - REDISMODULE_NOT_USED(name); - REDISMODULE_NOT_USED(err); REDISMODULE_NOT_USED(privdata); + REDISMODULE_NOT_USED(err); + assert(strcmp(name, "unprefix-enum") == 0 || strcmp(name, "unprefix-enum-alias") == 0); no_prefix_enumval = val; return REDISMODULE_OK; } @@ -222,21 +249,21 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) } /*** unprefixed and aliased configuration ***/ - if (RedisModule_RegisterBoolConfig(ctx, "unprefix-bool|unprefix-bool-alias", 1, REDISMODULE_CONFIG_DEFAULT|REDISMODULE_CONFIG_UNPREFIXED, - getBoolConfigCommand, setBoolConfigCommand, NULL, &no_prefix_bool) == REDISMODULE_ERR) { + if (RedisModule_RegisterBoolConfig(ctx, "unprefix-bool|unprefix-bool-alias", 1, REDISMODULE_CONFIG_DEFAULT|REDISMODULE_CONFIG_UNPREFIXED, + getBoolConfigUnprefix, setBoolConfigUnprefix, NULL, &no_prefix_bool) == REDISMODULE_ERR) { RedisModule_Log(ctx, "warning", "Failed to register unprefix-bool"); return REDISMODULE_ERR; } if (RedisModule_RegisterBoolConfig(ctx, "unprefix-noalias-bool", 1, REDISMODULE_CONFIG_DEFAULT|REDISMODULE_CONFIG_UNPREFIXED, - getBoolConfigCommand, setBoolConfigCommand, NULL, &no_prefix_bool2) == REDISMODULE_ERR) { + getBoolConfigUnprefix, setBoolConfigUnprefix, NULL, &no_prefix_bool2) == REDISMODULE_ERR) { RedisModule_Log(ctx, "warning", "Failed to register unprefix-noalias-bool"); return REDISMODULE_ERR; - } - if (RedisModule_RegisterNumericConfig(ctx, "unprefix.numeric|unprefix.numeric-alias", -1, REDISMODULE_CONFIG_DEFAULT|REDISMODULE_CONFIG_UNPREFIXED, - -5, 2000, getNumericConfigCommand, setNumericConfigCommand, NULL, &no_prefix_longval) == REDISMODULE_ERR) { + } + if (RedisModule_RegisterNumericConfig(ctx, "unprefix.numeric|unprefix.numeric-alias", -1, REDISMODULE_CONFIG_DEFAULT|REDISMODULE_CONFIG_UNPREFIXED, + -5, 2000, getNumericConfigUnprefix, setNumericConfigUnprefix, NULL, &no_prefix_longval) == REDISMODULE_ERR) { RedisModule_Log(ctx, "warning", "Failed to register unprefix.numeric"); return REDISMODULE_ERR; - } + } if (RedisModule_RegisterStringConfig(ctx, "unprefix-string|unprefix.string-alias", "secret unprefix", REDISMODULE_CONFIG_DEFAULT|REDISMODULE_CONFIG_UNPREFIXED, getStringConfigUnprefix, setStringConfigUnprefix, NULL, NULL) == REDISMODULE_ERR) { RedisModule_Log(ctx, "warning", "Failed to register unprefix-string"); diff --git a/tests/unit/cluster/announced-endpoints.tcl b/tests/unit/cluster/announced-endpoints.tcl index 5784ea6f512..58643a2a78a 100644 --- a/tests/unit/cluster/announced-endpoints.tcl +++ b/tests/unit/cluster/announced-endpoints.tcl @@ -75,21 +75,26 @@ start_cluster 2 2 {tags {external:skip cluster}} { # Tests for cluster-announce-ip validation test "cluster-announce-ip validation" { + # Reject control characters in IP-like values catch {R 0 config set cluster-announce-ip "192.168.1.100\nnext"} err - assert_match "*valid IPv4 or IPv6*" $err + assert_match "*alphanumeric*" $err catch {R 0 config set cluster-announce-ip "10.0.0.1\ttab"} err - assert_match "*valid IPv4 or IPv6*" $err + assert_match "*alphanumeric*" $err catch {R 0 config set cluster-announce-ip "1.2.3.4\r\n"} err - assert_match "*valid IPv4 or IPv6*" $err + assert_match "*alphanumeric*" $err - catch {R 0 config set cluster-announce-ip "redis-node-1.example.com"} err - assert_match "*valid IPv4 or IPv6*" $err + # Reject control characters in hostname-like values + catch {R 0 config set cluster-announce-ip "redis-node\nnext"} err + assert_match "*alphanumeric*" $err - catch {R 0 config set cluster-announce-ip "192.168.1"} err - assert_match "*valid IPv4 or IPv6*" $err + catch {R 0 config set cluster-announce-ip "redis-node\ttab"} err + assert_match "*alphanumeric*" $err + catch {R 0 config set cluster-announce-ip "redis-node\r\n"} err + assert_match "*alphanumeric*" $err + # Accept valid IPv4 R 0 config set cluster-announce-ip "192.168.1.100" assert_equal "192.168.1.100" [lindex [R 0 config get cluster-announce-ip] 1] @@ -98,6 +103,10 @@ start_cluster 2 2 {tags {external:skip cluster}} { R 0 config set cluster-announce-ip "2001:db8::1" assert_equal "2001:db8::1" [lindex [R 0 config get cluster-announce-ip] 1] + # Accept valid hostname + R 0 config set cluster-announce-ip "redis-node-1.example.com" + assert_equal "redis-node-1.example.com" [lindex [R 0 config get cluster-announce-ip] 1] + # Can be cleared R 0 config set cluster-announce-ip "" assert_equal "" [lindex [R 0 config get cluster-announce-ip] 1] diff --git a/tests/unit/scan.tcl b/tests/unit/scan.tcl index 8ab14d3b161..6a092cb4e95 100644 --- a/tests/unit/scan.tcl +++ b/tests/unit/scan.tcl @@ -164,25 +164,29 @@ proc test_scan {type} { r debug set-active-expire 1 } {OK} {needs:debug} - test "{$type} SCAN with expired keys with TYPE filter" { + test "{$type} SCAN with expired keys with TYPE filter and PATTERN filter" { r flushdb # make sure that passive expiration is triggered by the scan r debug set-active-expire 0 populate 1000 - r set foo bar - r pexpire foo 1 + r set key:foo bar + r pexpire key:foo 1 # add a hash type key - r hset hash f v - r pexpire hash 1 + r hset key:hash f v + r pexpire key:hash 1 + + # add a pattern key + r set boo far + r pexpire boo 1 after 2 set cur 0 set keys {} while 1 { - set res [r scan $cur type "string" count 10] + set res [r scan $cur type "string" match key* count 10] set cur [lindex $res 0] set k [lindex $res 1] lappend keys {*}$k @@ -191,12 +195,11 @@ proc test_scan {type} { assert_equal 1000 [llength $keys] - # make sure that expired key have been removed by scan command - assert_equal 1000 [scan [regexp -inline {keys\=([\d]*)} [r info keyspace]] keys=%d] - # TODO: uncomment in redis 8.0 - # make sure that only the expired key in the type match will been removed by scan command - #assert_equal 1001 [scan [regexp -inline {keys\=([\d]*)} [r info keyspace]] keys=%d] - + # make sure that expired key have been removed by scan command, + # pattern check before expired so key filtered by pattern will not be removed + # but expiration check is before type check so key:foo and key:hash will be removed + assert_equal 1001 [scan [regexp -inline {keys\=([\d]*)} [r info keyspace]] keys=%d] + r debug set-active-expire 1 } {OK} {needs:debug} diff --git a/tests/unit/type/list.tcl b/tests/unit/type/list.tcl index ab69f088a9b..8d17c5c3857 100644 --- a/tests/unit/type/list.tcl +++ b/tests/unit/type/list.tcl @@ -2464,4 +2464,37 @@ foreach {pop} {BLPOP BLMPOP_RIGHT} { $rd close } + + test "CLIENT NO-TOUCH with BRPOP and RPUSH regression test" { + # Test scenario: + # 1. Client 1: CLIENT NO-TOUCH on + # 2. Client 2: BRPOP mylist 0 + # 3. Client 1: RPUSH mylist elem + + # cleanup first + r del mylist + + # Create two test clients + set rd1 [redis_deferring_client] + set rd2 [redis_deferring_client] + + # Client 1: Enable CLIENT NO-TOUCH + $rd1 client no-touch on + assert_equal {OK} [$rd1 read] + + # Client 2: Block waiting for elements in mylist + $rd2 brpop mylist 0 + wait_for_blocked_client + + # Client 1: Push an element to mylist + $rd1 rpush mylist elem + assert_equal {1} [$rd1 read] + + # Verify Client 2 received the element + assert_equal {mylist elem} [$rd2 read] + + $rd1 close + $rd2 close + } + } ;# stop servers diff --git a/tests/unit/type/stream-cgroups.tcl b/tests/unit/type/stream-cgroups.tcl index a5265056c37..eaaba588e0d 100644 --- a/tests/unit/type/stream-cgroups.tcl +++ b/tests/unit/type/stream-cgroups.tcl @@ -567,6 +567,33 @@ start_server { assert_equal 0 [r XLEN mystream] } + test {XGROUP DESTROY correctly manage min_cgroup_last_id cache} { + r DEL mystream + # Add some entries + r XADD mystream 1-0 f1 v1 + r XADD mystream 2-0 f2 v2 + r XADD mystream 3-0 f3 v3 + r XADD mystream 4-0 f4 v4 + r XADD mystream 5-0 f5 v5 + + # Create two consumer groups + r XGROUP CREATE mystream group1 1-0 ;# min_cgroup_last_id is 1-0 now + r XGROUP CREATE mystream group2 3-0 + + # Entry 1-0 should be deletable (1-0 <= min_cgroup_last_id and not in any PEL) + assert_equal {1} [r XDELEX mystream ACKED IDS 1 1-0] + + # Entry 2-0 should be referenced (2-0 > 1-0, not yet consumed by all consume groups) + assert_equal {2} [r XDELEX mystream ACKED IDS 1 2-0] + + # Destroy group1 + # min_cgroup_last_id is 3-0 now + r XGROUP DESTROY mystream group1 + + # Entry 2-0 should now be deletable (2-0 < 3-0 and not in any PEL) + assert_equal {1} [r XDELEX mystream ACKED IDS 1 2-0] + } + test {RENAME can unblock XREADGROUP with data} { r del mystream{t} r XGROUP CREATE mystream{t} mygroup $ MKSTREAM diff --git a/tests/unit/type/stream.tcl b/tests/unit/type/stream.tcl index 9d888589d6b..1aeeca81d63 100644 --- a/tests/unit/type/stream.tcl +++ b/tests/unit/type/stream.tcl @@ -884,6 +884,61 @@ start_server { assert {[r XTRIM mystream MAXLEN ~ 0 LIMIT 1] == 0} assert {[r XTRIM mystream MAXLEN ~ 0 LIMIT 2] == 2} } + + test {XTRIM with approx and ACKED deletes entries correctly} { + # This test verifies that when using approx trim (~) with ACKED strategy, + # if the first node cannot be removed (has unacked messages), we should + # continue to check subsequent nodes that might be eligible for removal. + r DEL mystream + set origin_max_entries [config_get_set stream-node-max-entries 2] + + # Create 5 entries in 3 nodes (2 entries per node) + r XADD mystream 1-0 f v + r XADD mystream 2-0 f v + r XADD mystream 3-0 f v + r XADD mystream 4-0 f v + r XADD mystream 5-0 f v + + # Create a consumer group and read all messages + r XGROUP CREATE mystream mygroup 0 + r XREADGROUP GROUP mygroup consumer1 STREAMS mystream > + + # Acknowledge messages: 1-0, 2-0 (first node), and 4-0 (second node) + r XACK mystream mygroup 1-0 2-0 4-0 + + # XTRIM MINID ~ 6-0 ACKED should remove: + # Total 3 entries removed (1-0, 2-0, 4-0), 2 unacked entries remain (3-0, 5-0) + assert_equal 3 [r XTRIM mystream MINID ~ 6-0 ACKED] + assert_equal 2 [r XLEN mystream] + assert_equal {{3-0 {f v}} {5-0 {f v}}} [r XRANGE mystream - +] + + r config set stream-node-max-entries $origin_max_entries + } + + test {XTRIM with approx and DELREF deletes entries correctly} { + # Similar test but with DELREF strategy + r DEL mystream + set origin_max_entries [config_get_set stream-node-max-entries 2] + + # Create 4 entries in 2 nodes + r XADD mystream 1-0 f v + r XADD mystream 2-0 f v + r XADD mystream 3-0 f v + r XADD mystream 4-0 f v + + # Create a consumer group and read all messages + r XGROUP CREATE mystream mygroup 0 + r XREADGROUP GROUP mygroup consumer1 STREAMS mystream > + + # With XTRIM MINID ~ 5-0 DELREF, all eligible nodes should be trimmed + # and PEL entries should be cleaned up + assert_equal 4 [r XTRIM mystream MINID ~ 5-0 DELREF] + assert_equal 0 [r XLEN mystream] + # PEL should be empty after DELREF + assert_equal {0 {} {} {}} [r XPENDING mystream mygroup] + + r config set stream-node-max-entries $origin_max_entries + } } start_server {tags {"stream needs:debug"} overrides {appendonly yes}} {