Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions modules/redisearch/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,10 @@ MODULE_VERSION = v8.2.13
MODULE_REPO = https://github.com/redisearch/redisearch
TARGET_MODULE = $(SRC_DIR)/bin/$(FULL_VARIANT)/search-community/redisearch.so

# Set INLINE_LSE_ATOMICS=1 for perf improvement on common ARM CPUs (i.e. Graviton2/3/4); no effect on x86 or macOS.
# Default 0 keeps the binary runnable on pre-Armv8.1-a cores (Cortex-A72, Graviton1, RPi4) that would otherwise SIGILL at module load.
INLINE_LSE_ATOMICS ?= 0
export INLINE_LSE_ATOMICS

include ../common.mk

32 changes: 32 additions & 0 deletions modules/vector-sets/tests/vsim_limit_efsearch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from test import TestCase, generate_random_vector
import struct

class VSIMLimitEFSearch(TestCase):
def getname(self):
return "VSIM Limit EF Search"

def estimated_runtime(self):
return 0.2

def test(self):
dim = 32
vec = generate_random_vector(dim)
vec_bytes = struct.pack(f'{dim}f', *vec)

# Add test vector
self.redis.execute_command('VADD', self.test_key, 'FP32', vec_bytes, f'{self.test_key}:item:1')

query_vec = generate_random_vector(dim)

# Test EF upper bound (should accept 1000000)
result = self.redis.execute_command('VSIM', self.test_key, 'VALUES', dim,
*[str(x) for x in query_vec], 'EF', 1000000)
assert isinstance(result, list), "EF=1000000 should be accepted"

# Test EF over limit (should reject > 1000000)
try:
self.redis.execute_command('VSIM', self.test_key, 'VALUES', dim,
*[str(x) for x in query_vec], 'EF', 1000001)
assert False, "EF=1000001 should be rejected"
except Exception as e:
assert "invalid EF" in str(e), f"Expected EF validation error, got: {e}"
6 changes: 4 additions & 2 deletions modules/vector-sets/vset.c
Original file line number Diff line number Diff line change
Expand Up @@ -830,10 +830,12 @@ void VSIM_execute(RedisModuleCtx *ctx, struct vsetObject *vset,
if (ef == 0) ef = VSET_DEFAULT_SEARCH_EF;
if (count > ef) ef = count;

int slot = hnsw_acquire_read_slot(vset->hnsw);
if (ef > vset->hnsw->node_count) ef = vset->hnsw->node_count;

/* Perform search */
hnswNode **neighbors = RedisModule_Alloc(sizeof(hnswNode*)*ef);
float *distances = RedisModule_Alloc(sizeof(float)*ef);
int slot = hnsw_acquire_read_slot(vset->hnsw);
unsigned int found;
if (ground_truth) {
found = hnsw_ground_truth_with_filter(vset->hnsw, vec, ef, neighbors,
Expand Down Expand Up @@ -1085,7 +1087,7 @@ int VSIM_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
j += 2;
} else if (!strcasecmp(opt, "EF") && j+1 < argc) {
if (RedisModule_StringToLongLong(argv[j+1], &ef) !=
REDISMODULE_OK || ef <= 0)
REDISMODULE_OK || ef <= 0 || ef > 1000000)
{
RedisModule_Free(vec);
return RedisModule_ReplyWithError(ctx, "ERR invalid EF");
Expand Down
23 changes: 12 additions & 11 deletions src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,17 @@ void processUnblockedClients(void) {
listDelNode(server.unblocked_clients,ln);
c->flags &= ~CLIENT_UNBLOCKED;

/* Reset the client for a new query, unless the client has pending command to process. */
if (!(c->flags & CLIENT_PENDING_COMMAND)) {
freeClientOriginalArgv(c);
/* Clients that are not blocked on keys are not reprocessed so we must
* call reqresAppendResponse here (for clients blocked on key,
* unblockClientOnKey is called, which eventually calls processCommand,
* which calls reqresAppendResponse) */
reqresAppendResponse(c);
resetClient(c);
}

if (c->flags & CLIENT_MODULE) {
if (!(c->flags & CLIENT_BLOCKED)) {
moduleCallCommandUnblockedHandler(c);
Expand Down Expand Up @@ -191,17 +202,6 @@ void unblockClient(client *c, int queue_for_reprocessing) {
serverPanic("Unknown btype in unblockClient().");
}

/* Reset the client for a new query, unless the client has pending command to process
* or in case a shutdown operation was canceled and we are still in the processCommand sequence */
if (!(c->flags & CLIENT_PENDING_COMMAND) && c->bstate.btype != BLOCKED_SHUTDOWN) {
freeClientOriginalArgv(c);
/* Clients that are not blocked on keys are not reprocessed so we must
* call reqresAppendResponse here (for clients blocked on key,
* unblockClientOnKey is called, which eventually calls processCommand,
* which calls reqresAppendResponse) */
reqresAppendResponse(c);
resetClient(c);
}

/* Clear the flags, and put the client in the unblocked list so that
* we'll process new commands in its query buffer ASAP. */
Expand Down Expand Up @@ -266,6 +266,7 @@ void replyToClientsBlockedOnShutdown(void) {
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
if (c->flags & CLIENT_BLOCKED && c->bstate.btype == BLOCKED_SHUTDOWN) {
c->duration = 0;
addReplyError(c, "Errors trying to SHUTDOWN. Check logs.");
unblockClient(c, 1);
}
Expand Down
16 changes: 10 additions & 6 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ int clusterLoadConfig(char *filename) {
* of the aux fields is insignificant. */
int aux_tcp_port = 0;
int aux_tls_port = 0;
int aux_shard_id = 0;
for (int i = 2; i < aux_argc; i++) {
int field_argc;
sds *field_argv;
Expand Down Expand Up @@ -454,6 +455,7 @@ int clusterLoadConfig(char *filename) {
continue;
}
field_found = 1;
aux_shard_id |= j == af_shard_id;
aux_tcp_port |= j == af_tcp_port;
aux_tls_port |= j == af_tls_port;
if (auxFieldHandlers[j].setter(n, field_argv[1], sdslen(field_argv[1])) != C_OK) {
Expand Down Expand Up @@ -560,12 +562,14 @@ int clusterLoadConfig(char *filename) {
* by an older version of Redis;
* ignore replica's shard_id in the file, only use the primary's.
* If replica precedes primary in file, it will be corrected
* later by the auxShardIdSetter */
* later by the auxShardIdSetter.
* Remove node from its old shard before adding it to the new one. */
if (aux_shard_id == 1) clusterRemoveNodeFromShard(n);
memcpy(n->shard_id, master->shard_id, CLUSTER_NAMELEN);
clusterAddNodeToShard(master->shard_id, n);
n->slaveof = master;
clusterNodeAddSlave(master,n);
} else if (auxFieldHandlers[af_shard_id].isPresent(n) == 0) {
} else if (aux_shard_id == 0) {
/* n is a primary but it does not have a persisted shard_id.
* This happens if we are loading a nodes.conf generated by
* an older version of Redis. We should manually update the
Expand Down Expand Up @@ -1852,8 +1856,8 @@ void clusterBlacklistAddNode(clusterNode *node) {
/* Return non-zero if the specified node ID exists in the blacklist.
* You don't need to pass an sds string here, any pointer to 40 bytes
* will work. */
int clusterBlacklistExists(char *nodeid) {
sds id = sdsnewlen(nodeid,CLUSTER_NAMELEN);
int clusterBlacklistExists(char *nodeid, size_t len) {
sds id = sdsnewlen(nodeid,len);
int retval;

clusterBlacklistCleanup();
Expand Down Expand Up @@ -2208,7 +2212,7 @@ void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
* joining another cluster. */
if (sender &&
!(flags & CLUSTER_NODE_NOADDR) &&
!clusterBlacklistExists(g->nodename))
!clusterBlacklistExists(g->nodename, CLUSTER_NAMELEN))
{
clusterNode *node;
node = createClusterNode(g->nodename, flags);
Expand Down Expand Up @@ -6235,7 +6239,7 @@ int clusterCommandSpecial(client *c) {
/* CLUSTER FORGET <NODE ID> */
clusterNode *n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[2]->ptr));
if (!n) {
if (clusterBlacklistExists((char*)c->argv[2]->ptr))
if (clusterBlacklistExists((char*)c->argv[2]->ptr, sdslen(c->argv[2]->ptr)))
/* Already forgotten. The deletion may have been gossipped by
* another node, so we pretend it succeeded. */
addReply(c,shared.ok);
Expand Down
32 changes: 21 additions & 11 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -2405,13 +2405,7 @@ static int isValidAnnouncedNodename(char *val,const char **err) {
return 1;
}

static int isValidAnnouncedHostname(char *val, const char **err) {
if (strlen(val) >= NET_HOST_STR_LEN) {
*err = "Hostnames must be less than "
STRINGIFY(NET_HOST_STR_LEN) " characters";
return 0;
}

static int isValidHostnameChars(char *val, const char **err) {
int i = 0;
char c;
while ((c = val[i])) {
Expand All @@ -2429,6 +2423,15 @@ static int isValidAnnouncedHostname(char *val, const char **err) {
return 1;
}

static int isValidAnnouncedHostname(char *val, const char **err) {
if (strlen(val) >= NET_HOST_STR_LEN) {
*err = "Hostnames must be less than "
STRINGIFY(NET_HOST_STR_LEN) " characters";
return 0;
}
return isValidHostnameChars(val, err);
}

/* Validation function for cluster-announce-ip.
* Ensures the IP address is valid and rejects control characters. */
static int isValidClusterAnnounceIp(char *val, const char **err) {
Expand All @@ -2438,12 +2441,19 @@ static int isValidClusterAnnounceIp(char *val, const char **err) {
return 1;
}

if (inet_pton(AF_INET, val, buf) != 1 &&
inet_pton(AF_INET6, val, buf) != 1) {
*err = "Cluster announce IP must be a valid IPv4 or IPv6 address";
/* Accept valid IPv4 or IPv6 */
if (inet_pton(AF_INET, val, buf) == 1 || inet_pton(AF_INET6, val, buf) == 1) {
return 1;
}
/* Also accept valid hostnames, but limited to NET_IP_STR_LEN since
* cluster_announce_ip is stored in a NET_IP_STR_LEN buffer */
if (strlen(val) >= NET_IP_STR_LEN) {
*err = "Hostnames for cluster-announce-ip must be less than "
STRINGIFY(NET_IP_STR_LEN) " characters";
return 0;
}
return 1;
/* Also accept valid hostnames */
return isValidHostnameChars(val, err);
}

/* Validate specified string is a valid proc-title-template */
Expand Down
33 changes: 18 additions & 15 deletions src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,9 @@ kvobj *lookupKey(redisDb *db, robj *key, int flags, dictEntryLink *link) {
/* Update the access time for the ageing algorithm.
* Don't do it if we have a saving child, as this will trigger
* a copy on write madness. */
if (server.current_client && server.current_client->flags & CLIENT_NO_TOUCH &&
server.executing_client->cmd->proc != touchCommand)
if (((flags & LOOKUP_NOTOUCH) == 0) &&
(server.current_client && server.current_client->flags & CLIENT_NO_TOUCH) &&
(server.executing_client && server.executing_client->cmd->proc != touchCommand))
flags |= LOOKUP_NOTOUCH;
if (!hasActiveChildProcess() && !(flags & LOOKUP_NOTOUCH)){
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
Expand Down Expand Up @@ -1333,9 +1334,22 @@ void scanCallback(void *privdata, const dictEntry *de, dictEntryLink plink) {
/* o and typename can not have values at the same time. */
serverAssert(!((data->type != LLONG_MAX) && o));

kvobj *kv = NULL;
if (!o) { /* If scanning keyspace */
kvobj *kv = dictGetKV(de);

kv = dictGetKV(de);
keyStr = kvobjGetKey(kv);
} else {
keyStr = dictGetKey(de);
}

/* Filter element if it does not match the pattern. */
if (data->pattern) {
if (!stringmatchlen(data->pattern, sdslen(data->pattern), keyStr, data->strlen(keyStr), 0)) {
return;
}
}

if (!o) {
/* Expiration check first - only for database keyspace scanning.
* Use kv obj to avoid robj creation. */
if (expireIfNeeded(data->db, NULL, kv, 0) != KEY_VALID)
Expand All @@ -1350,17 +1364,6 @@ void scanCallback(void *privdata, const dictEntry *de, dictEntryLink plink) {
if (!objectTypeCompare(kv, data->type))
return;
}

keyStr = kvobjGetKey(kv);
} else {
keyStr = dictGetKey(de);
}

/* Filter element if it does not match the pattern. */
if (data->pattern) {
if (!stringmatchlen(data->pattern, sdslen(data->pattern), keyStr, data->strlen(keyStr), 0)) {
return;
}
}

if (o == NULL) {
Expand Down
10 changes: 6 additions & 4 deletions src/dict.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,14 @@ struct dict {

long rehashidx; /* rehashing not in progress if rehashidx == -1 */

/* Keep small vars at end for optimal (minimal) struct padding */
unsigned pauserehash : 15; /* If >0 rehashing is paused */
/* Note: pauserehash is a full unsigned so iterator increments
* don't perform RMW on the same storage unit as other bitfields. */
unsigned pauserehash; /* If >0 rehashing is paused */

unsigned useStoredKeyApi : 1; /* See comment of storedHashFunction above */
/* Keep small vars at end for optimal (minimal) struct padding */
signed char ht_size_exp[2]; /* exponent of size. (size = 1<<exp) */
int16_t pauseAutoResize; /* If >0 automatic resizing is disallowed (<0 indicates coding error) */
signed pauseAutoResize: 15; /* If >0 automatic resizing is disallowed (<0 indicates coding error) */
unsigned useStoredKeyApi: 1; /* See comment of storedHashFunction above */
void *metadata[];
};

Expand Down
5 changes: 3 additions & 2 deletions src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -9216,7 +9216,7 @@ size_t RM_GetClusterSize(void) {
int RM_GetClusterNodeInfo(RedisModuleCtx *ctx, const char *id, char *ip, char *master_id, int *port, int *flags) {
UNUSED(ctx);

clusterNode *node = clusterLookupNode(id, strlen(id));
clusterNode *node = clusterLookupNode(id, CLUSTER_NAMELEN);
if (node == NULL || clusterNodePending(node))
{
return REDISMODULE_ERR;
Expand Down Expand Up @@ -12968,7 +12968,8 @@ int setModuleStringConfig(ModuleConfig *config, sds strval, const char **err) {

int setModuleEnumConfig(ModuleConfig *config, int val, const char **err) {
RedisModuleString *error = NULL;
int return_code = config->set_fn.set_enum(config->name, val, config->privdata, &error);
char *rname = getRegisteredConfigName(config);
int return_code = config->set_fn.set_enum(rname, val, config->privdata, &error);
propagateErrorString(error, err);
return return_code == REDISMODULE_OK ? 1 : 0;
}
Expand Down
2 changes: 1 addition & 1 deletion src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -2908,7 +2908,7 @@ int processInputBuffer(client *c) {
/* Keep processing while there is something in the input buffer */
while(c->qb_pos < sdslen(c->querybuf)) {
/* Immediately abort if the client is in the middle of something. */
if (c->flags & CLIENT_BLOCKED) break;
if (c->flags & CLIENT_BLOCKED || c->flags & CLIENT_UNBLOCKED) break;

/* Don't process more buffers from clients that have already pending
* commands to execute in c->argv. */
Expand Down
22 changes: 17 additions & 5 deletions src/t_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,9 @@ typedef struct {
int trim_strategy_arg_idx; /* Index of the count in MAXLEN/MINID, for rewriting. */
int delete_strategy; /* DELETE_STRATEGY_* */
int approx_trim; /* If 1 only delete whole radix tree nodes, so
* the trim argument is not applied verbatim. */
* the trim argument is not applied verbatim.
* Note: This flag is ignored when delete_strategy is non-KEEPREF.
* Individual entries may still be processed for consumer groups. */
long long limit; /* Maximum amount of entries to trim. If 0, no limitation
* on the amount of trimming work is enforced. */
/* TRIM_STRATEGY_MAXLEN options */
Expand Down Expand Up @@ -781,8 +783,11 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) {
}

/* If we cannot remove a whole element, and approx is true,
* stop here. */
if (approx) break;
* stop here. However, for non-KEEPREF strategies, if the node was
* eligible for removal but we couldn't remove it (because we need
* to check consumer group references), we should continue to process
* entries within this node. */
if (approx && delete_strategy == DELETE_STRATEGY_KEEPREF) break;

/* Now we have to trim entries from within 'lp' */
int64_t deleted_from_lp = 0;
Expand Down Expand Up @@ -853,12 +858,12 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) {

if (can_delete) {
/* Mark the entry as deleted. */
intptr_t delta = p - lp;
intptr_t delta = p ? (p - lp) : 0; /* p may be NULL if this was the last entry */
flags |= STREAM_ITEM_FLAG_DELETED;
lp = lpReplaceInteger(lp, &pcopy, flags);
deleted_from_lp++;
s->length--;
p = lp + delta;
if (p) p = lp + delta;
}
}
}
Expand Down Expand Up @@ -2786,6 +2791,7 @@ static void streamFreeCG(streamCG *cg) {

/* Destroy a consumer group and clean up all associated references. */
void streamDestroyCG(stream *s, streamCG *cg) {
/* Remove all references from the cgroups_ref. */
raxIterator it;
raxStart(&it, cg->pel);
raxSeek(&it, "^", NULL, 0);
Expand All @@ -2795,6 +2801,12 @@ void streamDestroyCG(stream *s, streamCG *cg) {
}
raxStop(&it);

/* If we're destroying the group with the minimum last_id, the cached
* minimum is no longer valid and needs to be recalculated from the
* remaining groups. */
if (s->min_cgroup_last_id_valid && streamCompareID(&s->min_cgroup_last_id, &cg->last_id) == 0)
s->min_cgroup_last_id_valid = 0;

streamFreeCG(cg);
}

Expand Down
4 changes: 4 additions & 0 deletions tests/cluster/tests/00-base.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,7 @@ test "CLUSTER SLAVES and CLUSTER REPLICAS with zero replicas" {
assert_equal {} [R 0 cluster slaves [R 0 CLUSTER MYID]]
assert_equal {} [R 0 cluster replicas [R 0 CLUSTER MYID]]
}

test "CLUSTER FORGET with invalid node ID" {
assert_error {*ERR Unknown node*} {R 0 cluster forget 1}
}
Loading
Loading