From e708d63058fa26bb9b142e3984d6d3fef5e7691c Mon Sep 17 00:00:00 2001 From: Ran Shidlansik Date: Wed, 31 Dec 2025 09:10:25 +0200 Subject: [PATCH 01/14] avoid memory leak of new argv when hexpire commands target only non-exiting fields (#2973) When HEXPIRE commands are set with a time-in-the-past they are all deleting the specified fields. In such cases we allocate a temporal new argv in order to replicate `HDEL`. However in case no mutation was done (ie all fields do not exist) we do not deallocate the temporal new_argv and there is a memory leak. example: ``` HSET myhash field1 value1 1 HEXPIRE myhash 0 FIELDS 1 field2 -2 ``` --------- Signed-off-by: Ran Shidlansik --- src/t_hash.c | 17 ++++++++--------- tests/unit/hashexpire.tcl | 15 +++++++++++++++ 2 files changed, 23 insertions(+), 9 deletions(-) diff --git a/src/t_hash.c b/src/t_hash.c index f5200ea342f..e97ccffbc7f 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -1663,19 +1663,18 @@ void hexpireGenericCommand(client *c, long long basetime, int unit) { /* From this point we would return array reply */ addReplyArrayLen(c, num_fields); - /* In case we are expiring all the elements prepare a new argv since we are going to delete all the expired fields. */ - if (set_expired) { - new_argv = zmalloc(sizeof(robj *) * (num_fields + 3)); - new_argv[new_argc++] = shared.hdel; - incrRefCount(shared.hdel); - new_argv[new_argc++] = c->argv[1]; - incrRefCount(c->argv[1]); - } - for (i = 0; i < num_fields; i++) { expiryModificationResult result = EXPIRATION_MODIFICATION_NOT_EXIST; if (set_expired) { if (obj && hashTypeDelete(obj, c->argv[fields_index + i]->ptr)) { + /* In case we are expiring all the elements prepare a new argv since we are going to delete all the expired fields. 
*/ + if (new_argv == NULL) { + new_argv = zmalloc(sizeof(robj *) * (num_fields + 3)); + new_argv[new_argc++] = shared.hdel; + incrRefCount(shared.hdel); + new_argv[new_argc++] = c->argv[1]; + incrRefCount(c->argv[1]); + } /* In case we deleted the field, add it to the new hdel command vector. */ new_argv[new_argc++] = c->argv[fields_index + i]; incrRefCount(c->argv[fields_index + i]); diff --git a/tests/unit/hashexpire.tcl b/tests/unit/hashexpire.tcl index fd472d54f85..6bdb332033f 100644 --- a/tests/unit/hashexpire.tcl +++ b/tests/unit/hashexpire.tcl @@ -792,6 +792,21 @@ start_server {tags {"hashexpire"}} { assert_equal -2 [r HTTL myhash FIELDS 1 f2] } + # HEXPIRE on a non-existent field + test {HEXPIRE on a non-existent field (should not issue notifications)} { + r FLUSHALL + r HSET myhash f1 v1 + set rd [setup_single_keyspace_notification r] + + r HEXPIRE myhash 1000 FIELDS 1 f2 + r HEXPIRE myhash 0 FIELDS 1 f2 + # Verify no notification (getting hset and not hexpire) + r HSET dummy dummy dummy + assert_keyevent_patterns $rd dummy hset + assert_equal 0 [get_keys_with_volatile_items r] + $rd close + } + # Error Cases test {HEXPIRE - conflicting conditions error} { r FLUSHALL From 402c28b5f353a3c89c1d4749b17eed72cf6228d4 Mon Sep 17 00:00:00 2001 From: Ran Shidlansik Date: Mon, 29 Dec 2025 15:30:59 +0200 Subject: [PATCH 02/14] fix hincrby* update volatile key tracking (#2974) Following Hash-Field-Expiration feature, a hash object can hold volatile fields. volatile fields which are already expired are deleted and reclaimed ONLY by the active-expiration background job. This means that hash object can contain items which have not yet expired. In case mutations are requesting to set a value on these "already-expired" fields, they will be overwritten with the new value. In such cases, though, it is requiered to update the global per-db tracking map by removing the key if it has no more volatile fields. 
This was implemented in all mutation cases of the hash commands but the `INCRBY` and `INCRBYFLOAT`. This can lead to a dangling object which has no volatile items, which might lead to assertion during the active-expiration job: example reproduction: ``` DEBUG SET-ACTIVE-EXPIRE 0 hset myhash f1 10 hexpire myhash 1 FIELDS 1 f1 sleep(10) hincrby myhash f1 1 DEBUG SET-ACTIVE-EXPIRE 1 ``` NOTE: we actually had tests for this scenario, only the test did not include explicit assertion in case the item is still tracked after the mutation. Signed-off-by: Ran Shidlansik --- src/t_hash.c | 8 ++++++++ tests/unit/hashexpire.tcl | 6 ++++-- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/src/t_hash.c b/src/t_hash.c index e97ccffbc7f..20bc6cedd39 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -907,7 +907,11 @@ void hincrbyCommand(client *c) { } value += incr; new = sdsfromlonglong(value); + bool has_volatile_fields = hashTypeHasVolatileFields(o); hashTypeSet(o, c->argv[2]->ptr, new, expiry, HASH_SET_TAKE_VALUE); + if (has_volatile_fields != hashTypeHasVolatileFields(o)) { + dbUpdateObjectWithVolatileItemsTracking(c->db, o); + } signalModifiedKey(c, c->db, c->argv[1]); notifyKeyspaceEvent(NOTIFY_HASH, "hincrby", c->argv[1], c->db->id); server.dirty++; @@ -952,7 +956,11 @@ void hincrbyfloatCommand(client *c) { char buf[MAX_LONG_DOUBLE_CHARS]; int len = ld2string(buf, sizeof(buf), value, LD_STR_HUMAN); new = sdsnewlen(buf, len); + bool has_volatile_fields = hashTypeHasVolatileFields(o); hashTypeSet(o, c->argv[2]->ptr, new, expiry, HASH_SET_TAKE_VALUE); + if (has_volatile_fields != hashTypeHasVolatileFields(o)) { + dbUpdateObjectWithVolatileItemsTracking(c->db, o); + } signalModifiedKey(c, c->db, c->argv[1]); notifyKeyspaceEvent(NOTIFY_HASH, "hincrbyfloat", c->argv[1], c->db->id); server.dirty++; diff --git a/tests/unit/hashexpire.tcl b/tests/unit/hashexpire.tcl index 6bdb332033f..728732c8821 100644 --- a/tests/unit/hashexpire.tcl +++ b/tests/unit/hashexpire.tcl @@ 
-1647,11 +1647,12 @@ start_server {tags {"hashexpire"}} { # Sanity check: check we only have one field in the hash assert_equal 1 [r HLEN myhash] - # TTL should now be gone; field becomes persistent + # TTL should now be gone; field becomes persistent; key should not be tracked set ttl [r HPTTL myhash FIELDS 1 field1] assert_equal -1 $ttl assert_equal 1 [r HGET myhash field1] assert_equal 1 [r HLEN myhash] + assert_equal 0 [get_keys_with_volatile_items r] # set expiration on the field assert_equal 1 [r HEXPIRE myhash 100000000 FIELDS 1 field1] @@ -1689,11 +1690,12 @@ start_server {tags {"hashexpire"}} { # Sanity check: check we only have one field in the hash assert_equal 1 [r HLEN myhash] - # TTL should now be gone; field becomes persistent + # TTL should now be gone; field becomes persistent; key should not be tracked set ttl [r HPTTL myhash FIELDS 1 field1] assert_equal -1 $ttl assert_equal 1 [r HGET myhash field1] assert_equal 1 [r HLEN myhash] + assert_equal 0 [get_keys_with_volatile_items r] # set expiration on the field assert_equal 1 [r HEXPIRE myhash 100000000 FIELDS 1 field1] From 54c9c42ed46511291bd953cc16d4ddb48aac70c0 Mon Sep 17 00:00:00 2001 From: Madelyn Olson Date: Sun, 4 Jan 2026 05:33:08 -0800 Subject: [PATCH 03/14] Fix zero length hash creation with HSETEX (#2998) If you send an `HSETEX key EXAT 0 FIELDS 1 foo bar`, it will create an empty length string. This is not good behavior as other places in the code will crash if an empty hash is accessed. This change just makes it so that the key is freed if it ends up being empty. This has some odd behavior w.r.t. to keyspace notifications though. Today, if you do `HSETEX key EXAT 0 FIELDS 1 foo bar` with a hash that exists, it won't send an `HSET` notification or an `HEXPIRE` notification unless foo is already present. It will also send a `DEL` notification if foo is last the field, effectively deleting the key. 
This isn't consistent with Redis, which will still send an `HSET` and `HDEL` (*NOTE* not `hexpire`) notification if a key is added past the expiration. Likewise, in the case where the key doesn't exist and `HSETEX key EXAT 0 FIELDS 1 foo bar` is sent, Redis will send an HSET, HDEL, and DEL notification in that order. I think it makes sense to keep consistency with what we have today (given that it's been out for awhile), but document the behavior. Basically, if you send a command with an expiration in the past, we will basically ignore the input unless it's "deleting" a key. Signed-off-by: Madelyn Olson --- src/t_hash.c | 12 +++++++----- tests/unit/hashexpire.tcl | 3 +++ 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/src/t_hash.c b/src/t_hash.c index 20bc6cedd39..185cb2c4d6a 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -1313,11 +1313,6 @@ void hsetexCommand(client *c) { } } signalModifiedKey(c, c->db, c->argv[1]); - /* Delete the object in case it was left empty */ - if (hashTypeLength(o) == 0) { - dbDelete(c->db, c->argv[1]); - notifyKeyspaceEvent(NOTIFY_GENERIC, "del", c->argv[1], c->db->id); - } server.dirty += changes; } else { /* If no changes were done we still need to free the new argv array and the refcount of the first argument. */ @@ -1325,6 +1320,13 @@ void hsetexCommand(client *c) { decrRefCount(c->argv[1]); if (new_argv) zfree(new_argv); } + + /* Delete the object in case it was left empty or created with all expired items. */ + if (hashTypeLength(o) == 0) { + dbDelete(c->db, c->argv[1]); + notifyKeyspaceEvent(NOTIFY_GENERIC, "del", c->argv[1], c->db->id); + } + addReplyLongLong(c, changes == num_fields ? 
1 : 0); } diff --git a/tests/unit/hashexpire.tcl b/tests/unit/hashexpire.tcl index 728732c8821..bfbad593be4 100644 --- a/tests/unit/hashexpire.tcl +++ b/tests/unit/hashexpire.tcl @@ -607,6 +607,9 @@ start_server {tags {"hashexpire"}} { r HSETEX myhash PX 0 FIELDS 1 field1 val1 after 10 assert_equal 0 [r HEXISTS myhash field1] + # The hash should also not exist + assert_equal 0 [r EXISTS myhash] + assert_equal 0 [r HLEN myhash] } test {HSETEX PX - test mix of expiring and persistent fields} { From 6df3787c335243bd8eea93e257f061f098d6be9d Mon Sep 17 00:00:00 2001 From: Sourav Singh Rawat Date: Sat, 3 Jan 2026 23:48:39 +0530 Subject: [PATCH 04/14] fix(HSETEX): replace strcmp with strcasecmp (#3000) HSETEX right now uses `strcmp` which does not account for case-insensitivity replaced it with `strcasecmp`. Fixes #2996 --------- Signed-off-by: frostzt Signed-off-by: Sourav Singh Rawat Co-authored-by: Ran Shidlansik --- src/t_hash.c | 51 ++++++++++++++++++++++++++++----------- tests/unit/hashexpire.tcl | 21 ++++++++++++++++ 2 files changed, 58 insertions(+), 14 deletions(-) diff --git a/src/t_hash.c b/src/t_hash.c index 185cb2c4d6a..a8391de4240 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -1198,6 +1198,7 @@ void hsetexCommand(client *c) { int changes = 0; robj **new_argv = NULL; int new_argc = 0; + int need_rewrite_argv = 0; for (; fields_index < c->argc - 1; fields_index++) { if (!strcasecmp(c->argv[fields_index]->ptr, "fields")) { @@ -1258,15 +1259,41 @@ void hsetexCommand(client *c) { dbAdd(c->db, c->argv[1], &o); } + if (flags & (ARGS_SET_FNX | ARGS_SET_FXX | ARGS_EX | ARGS_PX | ARGS_EXAT)) { + need_rewrite_argv = 1; + } + bool has_volatile_fields = hashTypeHasVolatileFields(o); - /* In case we are expiring all the elements prepare a new argv since we are going to delete all the expired fields. */ + /* Prepare a new argv when rewriting the command. If set_expired is true, + * all expired fields will be deleted. 
Otherwise, if rewriting is needed due to FNX/FXX flags, + * copy the command, key, and optional arguments, skipping the FNX/FXX flags. */ if (set_expired) { new_argv = zmalloc(sizeof(robj *) * (num_fields + 2)); new_argv[new_argc++] = shared.hdel; incrRefCount(shared.hdel); new_argv[new_argc++] = c->argv[1]; incrRefCount(c->argv[1]); + } else if (need_rewrite_argv) { + /* We use new_argv for rewrite */ + new_argv = zmalloc(sizeof(robj *) * c->argc); + // Copy optional args (skip FNX/FXX) + for (int i = 0; i < fields_index; i++) { + if (strcasecmp(c->argv[i]->ptr, "FNX") && + strcasecmp(c->argv[i]->ptr, "FXX")) { + /* Propagate as HSETEX Key Value PXAT millisecond-timestamp if there is + * EX/PX/EXAT flag. */ + if (expire && !(flags & ARGS_PXAT) && c->argv[i + 1] == expire) { + robj *milliseconds_obj = createStringObjectFromLongLong(when); + new_argv[new_argc++] = shared.pxat; + new_argv[new_argc++] = milliseconds_obj; + i++; // skip the original expire argument + } else { + new_argv[new_argc++] = c->argv[i]; + incrRefCount(c->argv[i]); + } + } + } } for (i = fields_index; i < c->argc; i += 2) { @@ -1281,6 +1308,12 @@ void hsetexCommand(client *c) { } else { hashTypeSet(o, c->argv[i]->ptr, c->argv[i + 1]->ptr, when, set_flags); changes++; + if (need_rewrite_argv) { + new_argv[new_argc++] = c->argv[i]; + incrRefCount(c->argv[i]); + new_argv[new_argc++] = c->argv[i + 1]; + incrRefCount(c->argv[i + 1]); + } } } @@ -1295,20 +1328,10 @@ void hsetexCommand(client *c) { notifyKeyspaceEvent(NOTIFY_HASH, "hexpired", c->argv[1], c->db->id); } else { notifyKeyspaceEvent(NOTIFY_HASH, "hset", c->argv[1], c->db->id); + if (need_rewrite_argv) { + replaceClientCommandVector(c, new_argc, new_argv); + } if (expire) { - /* Propagate as HSETEX Key Value PXAT millisecond-timestamp if there is - * EX/PX/EXAT flag. 
*/ - if (!(flags & ARGS_PXAT)) { - for (int i = 2; i < fields_index; i++) { - if (c->argv[i + 1] == expire) { - robj *milliseconds_obj = createStringObjectFromLongLong(when); - rewriteClientCommandArgument(c, i, shared.pxat); - rewriteClientCommandArgument(c, i + 1, milliseconds_obj); - decrRefCount(milliseconds_obj); - break; - } - } - } notifyKeyspaceEvent(NOTIFY_HASH, "hexpire", c->argv[1], c->db->id); } } diff --git a/tests/unit/hashexpire.tcl b/tests/unit/hashexpire.tcl index bfbad593be4..499fe43f15d 100644 --- a/tests/unit/hashexpire.tcl +++ b/tests/unit/hashexpire.tcl @@ -693,6 +693,26 @@ start_server {tags {"hashexpire"}} { assert_equal 0 [r EXISTS myhash] } + test {HSETEX is not replicating validation arguments} { + r flushall + set repl [attach_to_replication_stream] + set exp [get_longer_then_long_expire_value PXAT] + + r HSETEX myhash FNX PXAT $exp FIELDS 1 f2 v2 + r HSETEX myhash FXX PXAT $exp FIELDS 1 f2 v2 + r HSETEX myhash2 fnx PXAT $exp FIELDS 1 f2 v2 + r HSETEX myhash2 fxx PXAT $exp FIELDS 1 f2 v2 + + assert_replication_stream $repl [subst { + {select *} + {hsetex myhash PXAT $exp FIELDS 1 f2 v2} + {hsetex myhash PXAT $exp FIELDS 1 f2 v2} + {hsetex myhash2 PXAT $exp FIELDS 1 f2 v2} + {hsetex myhash2 PXAT $exp FIELDS 1 f2 v2} + }] + close_replication_stream $repl + } + ###### Test EXPIRE ############# @@ -2202,6 +2222,7 @@ start_server {tags {"hashexpire external:skip"}} { fail "hash object was not deleted on replica after timeout" } } + test {HEXPIREAT with expired time is propagated to the replica} { $primary flushall From 7b0f3635054803ada931147d755dcba6f165d94f Mon Sep 17 00:00:00 2001 From: Sourav Singh Rawat Date: Sun, 4 Jan 2026 23:53:11 +0530 Subject: [PATCH 05/14] fix(hash): Untrack hash with volatile fields when it is overwritten (#3003) In `dbSetValue` if the robj is a hash with volatile fields we need to clean it up. It causes two core problems: 1. 
Type Confusion: Allows converting from hash to string with `keys_with_volatile_items` tracked 2. Tracked Items with dangling pointer: Converting from `hash` to ie. a string breaks this as `keys_with_volatile_items` is not cleaned up. Check #3000 --------- Signed-off-by: frostzt Signed-off-by: Ran Shidlansik --- src/db.c | 8 ++++++++ tests/unit/hashexpire.tcl | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 43 insertions(+) diff --git a/src/db.c b/src/db.c index f398191d2d6..91664bd2673 100644 --- a/src/db.c +++ b/src/db.c @@ -382,6 +382,14 @@ static void dbSetValue(serverDb *db, robj *key, robj **valref, int overwrite, vo *expireref = new; } } + + /* If overwriting a hash object, un-track it from the volatile items tracking if it contains volatile items.*/ + if (old->type == OBJ_HASH && hashTypeHasVolatileFields(old)) { + dbUntrackKeyWithVolatileItems(db, old); + } + /* If the new object is a hash with volatile items we need to track it again */ + dbTrackKeyWithVolatileItems(db, new); + /* For efficiency, let the I/O thread that allocated an object also deallocate it. 
*/ if (tryOffloadFreeObjToIOThreads(old) == C_OK) { /* OK */ diff --git a/tests/unit/hashexpire.tcl b/tests/unit/hashexpire.tcl index 499fe43f15d..10de801ceb9 100644 --- a/tests/unit/hashexpire.tcl +++ b/tests/unit/hashexpire.tcl @@ -4397,3 +4397,38 @@ start_server {tags {"hashexpire external:skip"}} { } } } + +start_server {tags {"hash"}} { + test {Overwriting hash with volatile fields updates keys_with_volatile_items tracking} { + r FLUSHALL + r DEBUG SET-ACTIVE-EXPIRE 0 + + r HSETEX myhash EX 100 FIELDS 1 field1 value1 + + set info1 [r INFO keyspace] + assert_match {*keys_with_volatile_items=1*} $info1 + assert_equal 1 [r EXISTS myhash] + + r SET myhash "I'm a string now" + + set info2 [r INFO keyspace] + assert_match {*keys_with_volatile_items=0*} $info2 + assert_equal {string} [r TYPE myhash] + assert_equal "I'm a string now" [r GET myhash] + + r DEBUG SET-ACTIVE-EXPIRE 1 + } {OK} {needs:debug} + + test {RESTORE REPLACE clears keys_with_volatile_items tracking} { + r FLUSHALL + r HSETEX myhash EX 100 FIELDS 1 f1 v1 + assert_match {*keys_with_volatile_items=1*} [r INFO keyspace] + + r SET tempkey "I'm a string" + set serialized [r DUMP tempkey] + + r RESTORE myhash 0 $serialized REPLACE + assert_match {*keys_with_volatile_items=0*} [r INFO keyspace] + assert_equal {string} [r TYPE myhash] + } +} From 48a45b833ac133a13a135ebddc58d116012dfa19 Mon Sep 17 00:00:00 2001 From: Daniil Kashapov Date: Mon, 5 Jan 2026 14:45:34 +0500 Subject: [PATCH 06/14] Untrack key based on old->hasembkey (#3007) In `dbSetValue()` the `old` pointer may be reassigned to point to the incoming value object which was created without an embedded key, so calling `dbUntrackKeyWithVolatileItems()` would call `objectGetKey()` which returns NULL, causing a crash in `hashtableSdsHash()` when trying to hash the NULL key. Idea is to assign `old_was_hash_with_volatile` before the swap and use `new` instead of `old` for untracking when theres no embedded key. 
Introduced in #3003 Run with NULL ptr dereference: https://github.com/valkey-io/valkey/actions/runs/20701343184/job/59424029880 --------- Signed-off-by: Daniil Kashapov Co-authored-by: Ran Shidlansik Signed-off-by: Ran Shidlansik --- src/db.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/db.c b/src/db.c index 91664bd2673..b56a473d008 100644 --- a/src/db.c +++ b/src/db.c @@ -385,7 +385,10 @@ static void dbSetValue(serverDb *db, robj *key, robj **valref, int overwrite, vo /* If overwriting a hash object, un-track it from the volatile items tracking if it contains volatile items.*/ if (old->type == OBJ_HASH && hashTypeHasVolatileFields(old)) { - dbUntrackKeyWithVolatileItems(db, old); + /* Some commands create a new value (with NO key) and use setKey to change the value of an existing key. + * In this case the old can be replaced with the provided value and be left without a key + * however it is still a hashObject with optional volatile items and we need to untrack it. */ + dbUntrackKeyWithVolatileItems(db, old->hasembkey ? old : new); } /* If the new object is a hash with volatile items we need to track it again */ dbTrackKeyWithVolatileItems(db, new); From 4e986ee79f81429d87861e27aa194450a6ad5f0f Mon Sep 17 00:00:00 2001 From: Ran Shidlansik Date: Wed, 21 Jan 2026 13:34:03 +0200 Subject: [PATCH 07/14] Fix HRANDFIELD to return null response when no field could be found (#3022) if the hrandfield(e.g. hrandfield myhash) command without other args does not find a valid field, it will return an uninitialized lval. ``` debug set-active-expire no hsetex myhash ex 1 fields 2 f1 v1 f2 v2 after 1s... 
hrandfield myhash [will return some uninitialized number] ``` related: https://github.com/valkey-io/valkey/issues/3021 Signed-off-by: Ran Shidlansik --- src/t_hash.c | 26 +++++++++++++++----------- tests/unit/hashexpire.tcl | 13 +++++++++++++ 2 files changed, 28 insertions(+), 11 deletions(-) diff --git a/src/t_hash.c b/src/t_hash.c index a8391de4240..37930736173 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -837,8 +837,11 @@ void hashReplyFromListpackEntry(client *c, listpackEntry *e) { /* Return random element from a non empty hash. * 'field' and 'val' will be set to hold the element. * The memory in them is not to be freed or modified by the caller. - * 'val' can be NULL in which case it's not extracted. */ -static void hashTypeRandomElement(robj *hashobj, unsigned long hashsize, listpackEntry *field, listpackEntry *val) { + * 'val' can be NULL in which case it's not extracted. + * Return C_ERR in case no random element was found (when all existing elements are expired). + * Return C_OK otherwise. */ +static int hashTypeRandomElement(robj *hashobj, unsigned long hashsize, listpackEntry *field, listpackEntry *val) { + int rc = C_OK; if (hashobj->encoding == OBJ_ENCODING_HASHTABLE) { void *e = NULL; int maxtries = 100; @@ -851,8 +854,7 @@ static void hashTypeRandomElement(robj *hashobj, unsigned long hashsize, listpac } else if (maxtries == 0) { /* in case we will not be able to locate an entry which is not expired, we will just not return any * result. An alternative would have been that we end up returning an expired entry. 
*/ - field->sval = NULL; - if (val) val->sval = NULL; + rc = C_ERR; break; } sds sds_field = entryGetField(e); @@ -872,6 +874,7 @@ static void hashTypeRandomElement(robj *hashobj, unsigned long hashsize, listpac } else { serverPanic("Unknown hash encoding"); } + return rc; } @@ -1949,11 +1952,11 @@ void hrandfieldWithCountCommand(client *c, long l, int withvalues) { if (hash->encoding == OBJ_ENCODING_HASHTABLE) { while (count--) { listpackEntry field, value; - hashTypeRandomElement(hash, size, &field, &value); /* In case we were unable to locate random element, it is probably because there is no such element * since all elements are expired. */ - if (!field.sval) break; + if (hashTypeRandomElement(hash, size, &field, &value) != C_OK) + break; if (withvalues && c->resp > 2) addWritePreparedReplyArrayLen(wpc, 2); addWritePreparedReplyBulkCBuffer(wpc, field.sval, field.slen); @@ -2080,11 +2083,10 @@ void hrandfieldWithCountCommand(client *c, long l, int withvalues) { hashtable *ht = hashtableCreate(&setHashtableType); hashtableExpand(ht, count); while (added < count) { - hashTypeRandomElement(hash, size, &field, withvalues ? &value : NULL); - /* In case we were unable to locate random element, it is probably because there is no such element * since all elements are expired. */ - if (!field.sval) break; + if (hashTypeRandomElement(hash, size, &field, withvalues ? &value : NULL) != C_OK) + break; /* Try to add the object to the hashtable. If expired, stop adding (there are probably non left). 
* If it already exists free it, otherwise increment the number of objects we have @@ -2143,8 +2145,10 @@ void hrandfieldCommand(client *c) { if ((hash = lookupKeyReadOrReply(c, c->argv[1], shared.null[c->resp])) == NULL || checkType(c, hash, OBJ_HASH)) { return; } - hashTypeRandomElement(hash, hashTypeLength(hash), &ele, NULL); - hashReplyFromListpackEntry(c, &ele); + if (hashTypeRandomElement(hash, hashTypeLength(hash), &ele, NULL) == C_OK) + hashReplyFromListpackEntry(c, &ele); + else + addReplyNull(c); } /* Context structure for tracking expiry operations on hash fields. */ diff --git a/tests/unit/hashexpire.tcl b/tests/unit/hashexpire.tcl index 10de801ceb9..f815a9d25fa 100644 --- a/tests/unit/hashexpire.tcl +++ b/tests/unit/hashexpire.tcl @@ -1417,6 +1417,19 @@ start_server {tags {"hashexpire"}} { } } + test "HRANDFIELD - returns null response when all fields are expired" { + r FLUSHALL + r DEBUG SET-ACTIVE-EXPIRE 0 + assert_equal {1} [r HSETEX myhash PX 1 fields 2 f1 v1 f2 v2] + wait_for_condition 100 100 { + [r HGETALL myhash] eq {} + } else { + fail "Hash is showing expired elements" + } + assert_equal {} [r hrandfield myhash2] + r DEBUG SET-ACTIVE-EXPIRE 1 + } {OK} {needs:debug} + foreach cmd {RENAME RESTORE} { test "$cmd Preserves Field TTLs" { r FLUSHALL From 2be603480ec7db3b4b36858b1aa6b446ccf05539 Mon Sep 17 00:00:00 2001 From: Ran Shidlansik Date: Sun, 11 Jan 2026 07:49:44 +0200 Subject: [PATCH 08/14] Fix HEXPIRE should not delete items when GT rule is used and expiration is in the past (#3023) if an expired time is used, the condition is ignored, and it directly becomes the effect of the `hdel` command. This is mainly important for cases where the user would like to protect deleting the data in case the `HEXPIREAT` command is used and the user would like to protect delay execution of this command to delete fields. 
fixes: https://github.com/valkey-io/valkey/issues/3021 --------- Signed-off-by: Ran Shidlansik --- src/t_hash.c | 4 +++- tests/unit/hashexpire.tcl | 8 ++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/src/t_hash.c b/src/t_hash.c index 37930736173..b39adf01108 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -1701,7 +1701,9 @@ void hexpireGenericCommand(client *c, long long basetime, int unit) { for (i = 0; i < num_fields; i++) { expiryModificationResult result = EXPIRATION_MODIFICATION_NOT_EXIST; - if (set_expired) { + /* If the flags included the GT flag, we cannot delete the entries since existing entries + * MUST have expiration time bigger than a past time. */ + if (set_expired && !(flag & EXPIRE_GT)) { if (obj && hashTypeDelete(obj, c->argv[fields_index + i]->ptr)) { /* In case we are expiring all the elements prepare a new argv since we are going to delete all the expired fields. */ if (new_argv == NULL) { diff --git a/tests/unit/hashexpire.tcl b/tests/unit/hashexpire.tcl index f815a9d25fa..e44c31deba6 100644 --- a/tests/unit/hashexpire.tcl +++ b/tests/unit/hashexpire.tcl @@ -782,6 +782,14 @@ start_server {tags {"hashexpire"}} { assert_equal {0} $res2 } + test {HEXPIRE GT - Do not expire items when expiration in the past} { + r FLUSHALL + r HSETEX myhash EX 600 FIELDS 1 field1 val1 + assert_equal {1} [r HLEN myhash] + assert_equal {0 -2} [r HEXPIRE myhash 0 GT FIELDS 2 field1 field2] + assert_equal {1} [r HLEN myhash] + } + # Conditionals: LT test {HEXPIRE LT - only set if new TTL < existing TTL} { r FLUSHALL From fa1c398646172a8e49c13d550620ca3059134e3f Mon Sep 17 00:00:00 2001 From: Ran Shidlansik Date: Wed, 14 Jan 2026 06:17:18 +0200 Subject: [PATCH 09/14] Fix HEXPIRE should not delete items when validation rules fail and expiration is in the past (#3048) https://github.com/valkey-io/valkey/pull/3023 was only partially solving the problem. We need to avoid expiring items in case of any validation rule failed. 
--------- Signed-off-by: Ran Shidlansik Co-authored-by: Binbin Signed-off-by: Ran Shidlansik --- src/t_hash.c | 56 ++++++++++++++++++++------------------- tests/unit/hashexpire.tcl | 5 ++++ 2 files changed, 34 insertions(+), 27 deletions(-) diff --git a/src/t_hash.c b/src/t_hash.c index b39adf01108..be26f2ecf0a 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -32,6 +32,7 @@ * SPDX-License-Identifier: BSD-3-Clause */ +#include "expire.h" #include "hashtable.h" #include "rax.h" #include "sds.h" @@ -433,6 +434,7 @@ static expiryModificationResult hashTypeSetExpire(robj *o, sds field, long long /* If no object we will return -2 */ if (o == NULL) return EXPIRATION_MODIFICATION_NOT_EXIST; + bool time_is_expired = checkAlreadyExpired(expiry); if (o->encoding == OBJ_ENCODING_LISTPACK) { unsigned char *vstr; unsigned int vlen; @@ -444,9 +446,13 @@ static expiryModificationResult hashTypeSetExpire(robj *o, sds field, long long } /* When listpack representation is used, we consider it as infinite TTL, * so expire command with gt always fail the GT as well as existence(XX). + * Else, if the ttl is set in the past, just delete the entry (we know it exists) * Else, we already know we are going to set an expiration so we expend to hashtable encoding. */ if (flag & EXPIRE_XX || flag & EXPIRE_GT) { return EXPIRATION_MODIFICATION_FAILED_CONDITION; + } else if (time_is_expired) { + serverAssert(hashTypeDelete(o, field)); + return EXPIRATION_MODIFICATION_EXPIRE_ASAP; } else { hashTypeConvert(o, OBJ_ENCODING_HASHTABLE); } @@ -493,6 +499,12 @@ static expiryModificationResult hashTypeSetExpire(robj *o, sds field, long long } } } + /* In case we are set to expire the entry after we went through all the validations, + * we can just delete the entry. 
*/ + if (time_is_expired) { + serverAssert(hashTypeDelete(o, field)); + return EXPIRATION_MODIFICATION_EXPIRE_ASAP; + } *entry_ref = entrySetExpiry(current_entry, expiry); hashTypeTrackUpdateEntry(o, current_entry, *entry_ref, current_expire, expiry); return EXPIRATION_MODIFICATION_SUCCESSFUL; @@ -1662,7 +1674,6 @@ void hexpireGenericCommand(client *c, long long basetime, int unit) { int fields_index = 3; long long num_fields = 0; int i, expired = 0, updated = 0; - int set_expired = 0; robj **new_argv = NULL; int new_argc = 0; @@ -1684,9 +1695,6 @@ void hexpireGenericCommand(client *c, long long basetime, int unit) { if (convertExpireArgumentToUnixTime(c, param, basetime, unit, &when) == C_ERR) return; - if (checkAlreadyExpired(when)) - set_expired = 1; - robj *obj = lookupKeyWrite(c->db, key); /* Non HASH type return simple error */ @@ -1700,30 +1708,24 @@ void hexpireGenericCommand(client *c, long long basetime, int unit) { addReplyArrayLen(c, num_fields); for (i = 0; i < num_fields; i++) { - expiryModificationResult result = EXPIRATION_MODIFICATION_NOT_EXIST; - /* If the flags included the GT flag, we cannot delete the entries since existing entries - * MUST have expiration time bigger than a past time. */ - if (set_expired && !(flag & EXPIRE_GT)) { - if (obj && hashTypeDelete(obj, c->argv[fields_index + i]->ptr)) { - /* In case we are expiring all the elements prepare a new argv since we are going to delete all the expired fields. */ - if (new_argv == NULL) { - new_argv = zmalloc(sizeof(robj *) * (num_fields + 3)); - new_argv[new_argc++] = shared.hdel; - incrRefCount(shared.hdel); - new_argv[new_argc++] = c->argv[1]; - incrRefCount(c->argv[1]); - } - /* In case we deleted the field, add it to the new hdel command vector. */ - new_argv[new_argc++] = c->argv[fields_index + i]; - incrRefCount(c->argv[fields_index + i]); - result = EXPIRATION_MODIFICATION_EXPIRE_ASAP; - /* we treat this case exactly as active expiration. 
*/ - server.stat_expiredfields++; - expired++; + expiryModificationResult result = hashTypeSetExpire(obj, c->argv[fields_index + i]->ptr, when, flag); + if (result == EXPIRATION_MODIFICATION_SUCCESSFUL) + updated++; + else if (result == EXPIRATION_MODIFICATION_EXPIRE_ASAP) { + /* In case we are expiring all the elements prepare a new argv since we are going to delete all the expired fields. */ + if (new_argv == NULL) { + new_argv = zmalloc(sizeof(robj *) * (num_fields + 3)); + new_argv[new_argc++] = shared.hdel; + incrRefCount(shared.hdel); + new_argv[new_argc++] = c->argv[1]; + incrRefCount(c->argv[1]); } - } else { - result = hashTypeSetExpire(obj, c->argv[fields_index + i]->ptr, when, flag); - if (result == EXPIRATION_MODIFICATION_SUCCESSFUL) updated++; + /* In case we deleted the field, add it to the new hdel command vector. */ + new_argv[new_argc++] = c->argv[fields_index + i]; + incrRefCount(c->argv[fields_index + i]); + /* we treat this case exactly as active expiration. */ + server.stat_expiredfields++; + expired++; } addReplyLongLong(c, result); } diff --git a/tests/unit/hashexpire.tcl b/tests/unit/hashexpire.tcl index e44c31deba6..a7885b0c27d 100644 --- a/tests/unit/hashexpire.tcl +++ b/tests/unit/hashexpire.tcl @@ -788,6 +788,11 @@ start_server {tags {"hashexpire"}} { assert_equal {1} [r HLEN myhash] assert_equal {0 -2} [r HEXPIRE myhash 0 GT FIELDS 2 field1 field2] assert_equal {1} [r HLEN myhash] + assert_equal {0 -2} [r HEXPIRE myhash 0 NX FIELDS 2 field1 field2] + assert_equal {1} [r HLEN myhash] + r HSET myhash field1 val1 + assert_equal {0 -2} [r HEXPIRE myhash 0 XX FIELDS 2 field1 field2] + assert_equal {1} [r HLEN myhash] } # Conditionals: LT From 811bf28c950942aabbaf3bdccc7615d0046495c3 Mon Sep 17 00:00:00 2001 From: Ran Shidlansik Date: Mon, 26 Jan 2026 10:10:32 +0200 Subject: [PATCH 10/14] Fix how hash is handling overriding of expired fields overwrite (#3060) There are currently several issues with the existing hash field expiration 
mechanism: 1. `HINCRBY` is propagated to the replica "as-is". This means it relies on the state of the hash being the same on the primary and the replica. HFE changed this assumption, as the field might be expired only when the replica handles the propagated `hincrby`. The problem is that the replica does not "expire" fields on its own; it needs to respect the request from the primary and always try to use the existing field. This can lead to misalignment between the value on the primary and the replica, AND even to a disconnection, since the replica might hold an "expired" field which is not in "integer" format... 2. HINCRBYFLOAT is currently ALWAYS propagating `hset` - this means that the expiration time of an entry will always be removed on the replica side (it needs to propagate HSETEX when the expiration time needs to be maintained). 3. Currently all hash write commands which mutate values might overwrite an expired field. In such cases the existing implementation will "silently" do so. The problem is that the user will not get any keyspace notification explaining the reason for the behavior. For example, when `hincrby` is issued overwriting an expired field which was not yet "cleaned" by active-expiration, it will reset the counter to '0' before incrementing it. This means that the user might ask: why is the value '1' and not bigger? "I did not see any notification that the old value expired"... 4. HSETEX with KEEPTTL suffers from a "somewhat" similar problem as if the primary "replaced" the entry which is expired now but might not have been expired when the primary applied it. There are 2 options for a solution: 1. We could propagate `hdel` for every entry we are "overwriting" (batch them if we can). 2. Propagate the commands "by effect". For example - have `hincrby` always propagate either HSET or HSETEX. This will not solve the '#(4)' problem above though, for which we might HAVE to propagate `hdel`. I tend to go with the second option.
The reason is that it is expected to have less impact on replication stream and should include less processing time on the replicas and network traffic. Specifically for HSETEX with KEEPTTL we will have to propagate the `hdel` in case we overwritten an expired field, but that would help limit the impact of this propagation. --------- Signed-off-by: Ran Shidlansik Co-authored-by: Sourav Singh Rawat Co-authored-by: Binbin Signed-off-by: Ran Shidlansik --- src/db.c | 8 +- src/module.c | 2 +- src/server.c | 1 + src/server.h | 5 +- src/t_hash.c | 153 +++++++++++++++++++++++++--- tests/unit/hashexpire.tcl | 207 +++++++++++++++++++++++++++++++++++++- 6 files changed, 354 insertions(+), 22 deletions(-) diff --git a/src/db.c b/src/db.c index b56a473d008..e412b3bd270 100644 --- a/src/db.c +++ b/src/db.c @@ -1988,12 +1988,15 @@ void propagateDeletion(serverDb *db, robj *key, int lazy, int slot) { * * This function builds and propagates a single HDEL command with multiple fields * for the given hash object `o`. It temporarily enables replication (if needed), - * constructs the command using the field names, and sends it via alsoPropagate(). */ -static void propagateFieldsDeletion(serverDb *db, robj *o, size_t n_fields, robj *fields[], int didx) { + * constructs the command using the field names, and sends it via alsoPropagate(). 
+ * Returns how many fields where propagated */ +int propagateFieldsDeletion(serverDb *db, robj *o, size_t n_fields, robj *fields[], int didx) { int prev_replication_allowed = server.replication_allowed; server.replication_allowed = 1; robj *argv[EXPIRE_BULK_LIMIT + 2]; /* HDEL + key + fields */ + if (n_fields > EXPIRE_BULK_LIMIT) n_fields = EXPIRE_BULK_LIMIT; + int argc = 0; robj *keyobj = createStringObjectFromSds(objectGetKey(o)); argv[argc++] = shared.hdel; // HDEL command @@ -2008,6 +2011,7 @@ static void propagateFieldsDeletion(serverDb *db, robj *o, size_t n_fields, robj for (int i = 0; i < argc; i++) { decrRefCount(argv[i]); } + return n_fields; } /* Process expired fields for a hash delete them and propagate changes to replicas and AOF. diff --git a/src/module.c b/src/module.c index 8d1bad4fec4..4daa3a5864d 100644 --- a/src/module.c +++ b/src/module.c @@ -5396,7 +5396,7 @@ int VM_HashSet(ValkeyModuleKey *key, int flags, ...) { robj *argv[2] = {field, value}; hashTypeTryConversion(key->value, argv, 0, 1); - int updated = hashTypeSet(key->value, field->ptr, value->ptr, EXPIRY_NONE, low_flags); + int updated = hashTypeSet(key->value, field->ptr, value->ptr, EXPIRY_NONE, low_flags, NULL); count += (flags & VALKEYMODULE_HASH_COUNT_ALL) ? 
1 : updated; /* If CFIELDS is active, SDS string ownership is now of hashTypeSet(), diff --git a/src/server.c b/src/server.c index f78251a5efc..bec96243375 100644 --- a/src/server.c +++ b/src/server.c @@ -2141,6 +2141,7 @@ void createSharedObjects(void) { shared.multi = createSharedString("MULTI"); shared.exec = createSharedString("EXEC"); shared.hset = createSharedString("HSET"); + shared.hsetex = createSharedString("HSETEX"); shared.hdel = createSharedString("HDEL"); shared.hpexpireat = createSharedString("HPEXPIREAT"); shared.hpersist = createSharedString("HPERSIST"); diff --git a/src/server.h b/src/server.h index 25f01a31e44..51f06a64878 100644 --- a/src/server.h +++ b/src/server.h @@ -1408,7 +1408,7 @@ struct sharedObjectsStruct { *bgsaveerr_variants[2], *execaborterr, *noautherr, *noreplicaserr, *busykeyerr, *oomerr, *plus, *messagebulk, *pmessagebulk, *subscribebulk, *unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *unlink, *rpop, *lpop, *lpush, - *rpoplpush, *lmove, *blmove, *zpopmin, *zpopmax, *emptyscan, *multi, *exec, *left, *right, *hset, *hdel, *hpexpireat, *hpersist, *srem, + *rpoplpush, *lmove, *blmove, *zpopmin, *zpopmax, *emptyscan, *multi, *exec, *left, *right, *hset, *hsetex, *hdel, *hpexpireat, *hpersist, *srem, *xgroup, *xclaim, *script, *replconf, *eval, *cluster, *syncslots, *persist, *set, *pexpireat, *pexpire, *time, *pxat, *absttl, *retrycount, *force, *justid, *entriesread, *lastid, *ping, *setid, *keepttl, *load, *createconsumer, *getack, *special_asterisk, *special_equals, *default_username, *redacted, *ssubscribebulk, *sunsubscribebulk, *fields, @@ -3455,7 +3455,7 @@ sds hashTypeCurrentFromHashTable(hashTypeIterator *hi, int what); sds hashTypeCurrentObjectNewSds(hashTypeIterator *hi, int what); robj *hashTypeLookupWriteOrCreate(client *c, robj *key); robj *hashTypeGetValueObject(robj *o, sds field); -int hashTypeSet(robj *o, sds field, sds value, long long expiry, int flags); +int hashTypeSet(robj *o, sds field, sds value, 
long long expiry, int flags, bool *expired_overwritten); robj *hashTypeDup(robj *o); bool hashTypeHasVolatileFields(robj *o); @@ -3573,6 +3573,7 @@ void deleteExpiredKeyAndPropagate(serverDb *db, robj *keyobj); void deleteExpiredKeyAndPropagateWithDictIndex(serverDb *db, robj *keyobj, int dict_index); void deleteExpiredKeyFromOverwriteAndPropagate(client *c, robj *keyobj); void propagateDeletion(serverDb *db, robj *key, int lazy, int slot); +int propagateFieldsDeletion(serverDb *db, robj *o, size_t n_fields, robj *fields[], int slot); size_t dbReclaimExpiredFields(robj *o, serverDb *db, mstime_t now, unsigned long max_entries, int didx); int keyIsExpired(serverDb *db, robj *key); long long getExpire(serverDb *db, robj *key); diff --git a/src/t_hash.c b/src/t_hash.c index be26f2ecf0a..aa51653dc8f 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -329,9 +329,9 @@ int hashTypeExists(robj *o, sds field) { * semantics of copying the values if needed. * */ -int hashTypeSet(robj *o, sds field, sds value, long long expiry, int flags) { +int hashTypeSet(robj *o, sds field, sds value, long long expiry, int flags, bool *expired_overwritten) { int update = 0; - + bool is_expired = false; /* Check if the field is too long for listpack, and convert before adding the item. * This is needed for HINCRBY* case since in other commands this is handled early by * hashTypeTryConversion, so this check will be a NOP. */ @@ -396,7 +396,7 @@ int hashTypeSet(robj *o, sds field, sds value, long long expiry, int flags) { long long entry_expiry = entryGetExpiry(existing); /* It is possible that the entry is already expired. In this case we can override it, but we need to make sure to expire it first * and treat it like it did not exist. 
*/ - bool is_expired = timestampIsExpired(entry_expiry); + is_expired = entry_expiry != EXPIRY_NONE && checkAlreadyExpired(entry_expiry); if (!is_expired && flags & HASH_SET_KEEP_EXPIRY) { /* In case the HASH_SET_KEEP_EXPIRY will force keeping the existing entry expiry. */ expiry = entry_expiry; @@ -422,6 +422,8 @@ int hashTypeSet(robj *o, sds field, sds value, long long expiry, int flags) { * want this function to be responsible. */ if (flags & HASH_SET_TAKE_FIELD && field) sdsfree(field); if (flags & HASH_SET_TAKE_VALUE && value) sdsfree(value); + /* Update that we lazy expired the old entry */ + if (expired_overwritten) *expired_overwritten = is_expired; return update; } @@ -901,6 +903,7 @@ void hincrbyCommand(client *c) { unsigned char *vstr; unsigned int vlen; long long expiry = EXPIRY_NONE; + if (getLongLongFromObjectOrReply(c, c->argv[3], &incr, NULL) != C_OK) return; if ((o = hashTypeLookupWriteOrCreate(c, c->argv[1])) == NULL) return; if (hashTypeGetValue(o, c->argv[2]->ptr, &vstr, &vlen, &value, &expiry) == C_OK) { @@ -923,14 +926,49 @@ void hincrbyCommand(client *c) { value += incr; new = sdsfromlonglong(value); bool has_volatile_fields = hashTypeHasVolatileFields(o); - hashTypeSet(o, c->argv[2]->ptr, new, expiry, HASH_SET_TAKE_VALUE); + bool expired_overwritten = false; + hashTypeSet(o, c->argv[2]->ptr, new, expiry, HASH_SET_TAKE_VALUE, &expired_overwritten); if (has_volatile_fields != hashTypeHasVolatileFields(o)) { dbUpdateObjectWithVolatileItemsTracking(c->db, o); } signalModifiedKey(c, c->db, c->argv[1]); + /* In case we overitten an expired field, we need to act as if it was just expired */ + if (expired_overwritten) { + server.stat_expiredfields++; + notifyKeyspaceEvent(NOTIFY_HASH, "hexpired", c->argv[1], c->db->id); + } notifyKeyspaceEvent(NOTIFY_HASH, "hincrby", c->argv[1], c->db->id); server.dirty++; addReplyLongLong(c, value); + + /* Always replicate HINCRBY as an HSET or HSETEX command with the final value + * when hash has volatile item, 
since we do not know what will the field state be when the command reach the replica. + * HSET is used to override the resulting value and HSETEX is used in order to maintain the expiration time on the target. */ + if (has_volatile_fields) { + char buf[MAX_LONG_DOUBLE_CHARS]; + int len = ld2string(buf, sizeof(buf), value, LD_STR_HUMAN); + robj *newobj = createRawStringObject(buf, len); + if (expiry == EXPIRY_NONE) { + rewriteClientCommandArgument(c, 0, shared.hset); + rewriteClientCommandArgument(c, 3, newobj); + decrRefCount(newobj); + } else { + int new_argc = 8; /* HSETEX(1) + key(2) + PXAT(3) + unix-time-milliseconds(4) + FIELDS(5) + numfields(6) + field(7) + value(8) */ + robj **new_argv = zmalloc(sizeof(robj *) * (8)); + robj *milliseconds_obj = createStringObjectFromLongLong(expiry); + new_argv[0] = shared.hsetex; + new_argv[1] = c->argv[1]; + incrRefCount(c->argv[1]); + new_argv[2] = shared.pxat; + new_argv[3] = milliseconds_obj; + new_argv[4] = shared.fields; + new_argv[5] = shared.integers[1]; + new_argv[6] = c->argv[2]; + incrRefCount(c->argv[2]); + new_argv[7] = newobj; + replaceClientCommandVector(c, new_argc, new_argv); + } + } } void hincrbyfloatCommand(client *c) { @@ -972,23 +1010,47 @@ void hincrbyfloatCommand(client *c) { int len = ld2string(buf, sizeof(buf), value, LD_STR_HUMAN); new = sdsnewlen(buf, len); bool has_volatile_fields = hashTypeHasVolatileFields(o); - hashTypeSet(o, c->argv[2]->ptr, new, expiry, HASH_SET_TAKE_VALUE); + bool expired_overwritten = false; + hashTypeSet(o, c->argv[2]->ptr, new, expiry, HASH_SET_TAKE_VALUE, &expired_overwritten); if (has_volatile_fields != hashTypeHasVolatileFields(o)) { dbUpdateObjectWithVolatileItemsTracking(c->db, o); } signalModifiedKey(c, c->db, c->argv[1]); + /* In case we overitten an expired field, we need to act as if it was just expired */ + if (expired_overwritten) { + server.stat_expiredfields++; + notifyKeyspaceEvent(NOTIFY_HASH, "hexpired", c->argv[1], c->db->id); + } 
notifyKeyspaceEvent(NOTIFY_HASH, "hincrbyfloat", c->argv[1], c->db->id); server.dirty++; addReplyBulkCBuffer(c, buf, len); - /* Always replicate HINCRBYFLOAT as an HSET command with the final value + /* Always replicate HINCRBYFLOAT as an HSET or HSETEX command with the final value * in order to make sure that differences in float precision or formatting - * will not create differences in replicas or after an AOF restart. */ + * will not create differences in replicas or after an AOF restart. + * HSETEX is used in order to maintain the expiration time on the target. */ robj *newobj; newobj = createRawStringObject(buf, len); - rewriteClientCommandArgument(c, 0, shared.hset); - rewriteClientCommandArgument(c, 3, newobj); - decrRefCount(newobj); + if (expiry == EXPIRY_NONE) { + rewriteClientCommandArgument(c, 0, shared.hset); + rewriteClientCommandArgument(c, 3, newobj); + decrRefCount(newobj); + } else { + int new_argc = 8; /* HSETEX(1) + key(2) + PXAT(3) + unix-time-milliseconds(4) + FIELDS(5) + numfields(6) + field(7) + value(8) */ + robj **new_argv = zmalloc(sizeof(robj *) * (8)); + robj *milliseconds_obj = createStringObjectFromLongLong(expiry); + new_argv[0] = shared.hsetex; + new_argv[1] = c->argv[1]; + incrRefCount(c->argv[1]); + new_argv[2] = shared.pxat; + new_argv[3] = milliseconds_obj; + new_argv[4] = shared.fields; + new_argv[5] = shared.integers[1]; + new_argv[6] = c->argv[2]; + incrRefCount(c->argv[2]); + new_argv[7] = newobj; + replaceClientCommandVector(c, new_argc, new_argv); + } } static void addHashFieldToReply(client *c, robj *o, sds field) { @@ -1110,14 +1172,24 @@ void hsetnxCommand(client *c) { addReply(c, shared.czero); } else { hashTypeTryConversion(o, c->argv, 2, 3); - bool has_volatile_fields = hashTypeHasVolatileFields(o); - hashTypeSet(o, c->argv[2]->ptr, c->argv[3]->ptr, EXPIRY_NONE, HASH_SET_COPY | HASH_SET_KEEP_EXPIRY); + bool has_volatile_fields = hashTypeHasVolatileFields(o), expired_overwritten = false; + hashTypeSet(o, 
c->argv[2]->ptr, c->argv[3]->ptr, EXPIRY_NONE, HASH_SET_COPY, &expired_overwritten); if (has_volatile_fields != hashTypeHasVolatileFields(o)) { dbUpdateObjectWithVolatileItemsTracking(c->db, o); } signalModifiedKey(c, c->db, c->argv[1]); + /* In case we overitten an expired field, we need to act as if it was just expired */ + if (expired_overwritten) { + server.stat_expiredfields++; + notifyKeyspaceEvent(NOTIFY_HASH, "hexpired", c->argv[1], c->db->id); + } notifyKeyspaceEvent(NOTIFY_HASH, "hset", c->argv[1], c->db->id); server.dirty++; + /* we always have to propagate the effect of the command when we have volatile items, + * since on the replica side it might find that the fields was expired */ + if (has_volatile_fields) { + rewriteClientCommandArgument(c, 0, shared.hset); + } addReply(c, shared.cone); } } @@ -1134,13 +1206,23 @@ void hsetCommand(client *c) { if ((o = hashTypeLookupWriteOrCreate(c, c->argv[1])) == NULL) return; hashTypeTryConversion(o, c->argv, 2, c->argc - 1); bool has_volatile_fields = hashTypeHasVolatileFields(o); + int expired_overwritten = 0; for (i = 2; i < c->argc; i += 2) { - created += !hashTypeSet(o, c->argv[i]->ptr, c->argv[i + 1]->ptr, EXPIRY_NONE, HASH_SET_COPY); + bool expired = false; + created += !hashTypeSet(o, c->argv[i]->ptr, c->argv[i + 1]->ptr, EXPIRY_NONE, HASH_SET_COPY, &expired); + /* NOTE - We do not need to track all expired items which are overitten in order to propagate them, since the replica will surely just override them + * we just need to remember that we had such items to report the keyspace notification and update the stats */ + if (expired) expired_overwritten++; } if (has_volatile_fields != hashTypeHasVolatileFields(o)) { dbUpdateObjectWithVolatileItemsTracking(c->db, o); } signalModifiedKey(c, c->db, c->argv[1]); + /* In case we overitten an expired field, we need to act as if it was just expired */ + if (expired_overwritten) { + server.stat_expiredfields += expired_overwritten; + 
notifyKeyspaceEvent(NOTIFY_HASH, "hexpired", c->argv[1], c->db->id); + } notifyKeyspaceEvent(NOTIFY_HASH, "hset", c->argv[1], c->db->id); server.dirty += (c->argc - 2) / 2; @@ -1214,6 +1296,8 @@ void hsetexCommand(client *c) { robj **new_argv = NULL; int new_argc = 0; int need_rewrite_argv = 0; + robj **keepttl_fields = NULL; + int expired_overwritten = 0; for (; fields_index < c->argc - 1; fields_index++) { if (!strcasecmp(c->argv[fields_index]->ptr, "fields")) { @@ -1321,8 +1405,24 @@ void hsetexCommand(client *c) { changes++; } } else { - hashTypeSet(o, c->argv[i]->ptr, c->argv[i + 1]->ptr, when, set_flags); + bool expired; + hashTypeSet(o, c->argv[i]->ptr, c->argv[i + 1]->ptr, when, set_flags, &expired); changes++; + + if (expired) { + /* When KEEPTTL is used, we need to track all fields to propagate hdel per each of them + * Replicas will not ignore expired fields on the replication stream. This is why we have to explicitly delete them, + * so the replica will take the new field expiration time. */ + if ((flags & ARGS_KEEPTTL)) { + if (keepttl_fields == NULL) { + keepttl_fields = zmalloc(sizeof(robj *) * num_fields); + } + keepttl_fields[expired_overwritten] = c->argv[i]; + incrRefCount(c->argv[i]); + } + expired_overwritten++; + } + if (need_rewrite_argv) { new_argv[new_argc++] = c->argv[i]; incrRefCount(c->argv[i]); @@ -1332,7 +1432,6 @@ void hsetexCommand(client *c) { } } - if (changes) { if (has_volatile_fields != hashTypeHasVolatileFields(o)) { dbUpdateObjectWithVolatileItemsTracking(c->db, o); @@ -1342,7 +1441,28 @@ void hsetexCommand(client *c) { /* We would like to reduce the number of hexpired events in case there are potential many expired fields. 
*/ notifyKeyspaceEvent(NOTIFY_HASH, "hexpired", c->argv[1], c->db->id); } else { + /* In case we overwritten fields which were expired we need to act as if we actively expired them */ + if (expired_overwritten > 0) { + server.stat_expiredfields += expired_overwritten; + notifyKeyspaceEvent(NOTIFY_HASH, "hexpired", c->argv[1], c->db->id); + /* Propagate deletions for expired/non-existent fields in batches. + * When KEEPTTL is used the replica has noway telling if, at the time the primary was executing the command, + * the fields were expired or not. When the replica executes the command it will ALWAYS overwrite the field, so + * we need to propagate hdel explicitly to prevent the replica from keeping the TTL on it's side. */ + if (keepttl_fields != NULL) { + /* Propagate individual fields deletions */ + int idx = 0; + while (idx < expired_overwritten) { + idx += propagateFieldsDeletion(c->db, o, expired_overwritten - idx, + &keepttl_fields[idx], c->slot); + } + zfree(keepttl_fields); + keepttl_fields = NULL; + } + } + notifyKeyspaceEvent(NOTIFY_HASH, "hset", c->argv[1], c->db->id); + if (need_rewrite_argv) { replaceClientCommandVector(c, new_argc, new_argv); } @@ -1364,7 +1484,8 @@ void hsetexCommand(client *c) { dbDelete(c->db, c->argv[1]); notifyKeyspaceEvent(NOTIFY_GENERIC, "del", c->argv[1], c->db->id); } - + /* make sure that if we ever allocated this it was freed */ + serverAssert(keepttl_fields == NULL); addReplyLongLong(c, changes == num_fields ? 
1 : 0); } diff --git a/tests/unit/hashexpire.tcl b/tests/unit/hashexpire.tcl index a7885b0c27d..b74207d009f 100644 --- a/tests/unit/hashexpire.tcl +++ b/tests/unit/hashexpire.tcl @@ -116,7 +116,9 @@ proc wait_for_active_expiry {r key expected_len initial_expired expected_increme wait_for_condition $timeout $interval { [check_myhash_and_expired_subkeys $r $key $expected_len $initial_expired $expected_increment] } else { - fail "Active expiry did not occur as expected" + set expired_fields [info_field [$r info stats] expired_fields] + set expected_expired [expr {$initial_expired + $expected_increment}] + fail "Active expiry did not occur as expected expected: $expected_expired ststs: $expired_fields" } } @@ -4304,6 +4306,7 @@ start_server {tags {"hashexpire external:skip"}} { set replica [srv 0 client] set replica_host [srv 0 host] set replica_port [srv 0 port] + set replica_pid [srv 0 pid] test {expired_fields metric increments only on primary not replica during field expiry} { lassign [setup_replication_test $primary $replica $primary_host $primary_port] primary_initial_expired replica_initial_expired @@ -4421,6 +4424,208 @@ start_server {tags {"hashexpire external:skip"}} { $rd_primary close $rd_replica close } + + foreach command {HINCRBY HINCRBYFLOAT} { + array set primary_ksn_event { + HINCRBY hincrby + HINCRBYFLOAT hincrbyfloat + } + array set replica_ksn_event { + HINCRBY hincrby + HINCRBYFLOAT hset + } + test "$command is executed on repilca's expired fields" { + lassign [setup_replication_test $primary $replica $primary_host $primary_port] primary_initial_expired replica_initial_expired + # Initialize deferred clients and subscribe to keyspace notifications + foreach instance [list $primary $replica] { + $instance config set notify-keyspace-events KEA + } + set primary_ksn [valkey_deferring_client -1] + set replica_ksn [valkey_deferring_client $replica_host $replica_port] + foreach rd [list $primary_ksn $replica_ksn] { + assert_equal {1} [psubscribe $rd 
__keyevent@*] + } + + $primary debug set-active-expire 0 + + $primary flushall + + $primary $command myhash f1 1 + wait_for_ofs_sync $primary $replica + assert_equal 1 [$primary hpexpire myhash 1 fields 1 f1] + wait_for_condition 50 100 { + [$primary hexists myhash f1] == 0 + } else { + fail "Field was not logically expired on primary" + } + $primary $command myhash f1 1 + wait_for_ofs_sync $primary $replica + + # verify the value is freshly incremented on the primary and replica + assert_equal {1} [$primary hget myhash f1] + assert_equal {1} [$replica hget myhash f1] + # verify the entry has no expiry on the primary and the replica + assert_equal {-1} [$primary httl myhash fields 1 f1] + assert_equal {-1} [$replica httl myhash fields 1 f1] + + assert_keyevent_patterns $primary_ksn myhash $primary_ksn_event($command) hexpire hexpired $primary_ksn_event($command) + assert_keyevent_patterns $replica_ksn myhash $replica_ksn_event($command) hexpire hset + $primary_ksn close + $replica_ksn close + $primary debug set-active-expire 1 + } {OK} {needs:debug} + } + + test {HINCRBYFLOAT maintains TTL on repilca's fields} { + lassign [setup_replication_test $primary $replica $primary_host $primary_port] primary_initial_expired replica_initial_expired + $primary debug set-active-expire 0 + $primary flushall + set long_expiry [get_long_expire_value HEXPIRE] + $primary hsetex myhash ex $long_expiry fields 1 f1 1 + wait_for_ofs_sync $primary $replica + + assert_equal {1} [$primary hget myhash f1] + assert_equal [$replica hget myhash f1] [$primary hget myhash f1] + assert_equal [$primary HPEXPIRETIME myhash FIELDS 1 f1] [$replica HPEXPIRETIME myhash FIELDS 1 f1] + + $primary hincrbyfloat myhash f1 1.0 + wait_for_ofs_sync $primary $replica + + assert_equal {2} [$primary hget myhash f1] + assert_equal [$replica hget myhash f1] [$primary hget myhash f1] + assert_equal [$primary HPEXPIRETIME myhash FIELDS 1 f1] [$replica HPEXPIRETIME myhash FIELDS 1 f1] + + $primary debug 
set-active-expire 1 + } {OK} {needs:debug} + + test {HSETNX set the value for expired replica field} { + lassign [setup_replication_test $primary $replica $primary_host $primary_port] primary_initial_expired replica_initial_expired + # Initialize deferred clients and subscribe to keyspace notifications + foreach instance [list $primary $replica] { + $instance config set notify-keyspace-events KEA + } + set primary_ksn [valkey_deferring_client -1] + set replica_ksn [valkey_deferring_client $replica_host $replica_port] + foreach rd [list $primary_ksn $replica_ksn] { + assert_equal {1} [psubscribe $rd __keyevent@*] + } + $primary debug set-active-expire 0 + $primary flushall + + $primary hsetex myhash px 1 fields 1 f1 v1 + + wait_for_condition 50 100 { + [$primary hexists myhash f1] == 0 + } else { + fail "Field was not logically expired on primary" + } + wait_for_ofs_sync $primary $replica + + assert_equal {1} [$primary hlen myhash] + assert_equal {1} [$replica hlen myhash] + assert_equal {0} [$replica hexists myhash f1] + + $primary hsetnx myhash f1 v2 + wait_for_ofs_sync $primary $replica + + assert_equal {v2} [$primary hget myhash f1] + assert_equal {v2} [$replica hget myhash f1] + assert_equal [$primary HPEXPIRETIME myhash FIELDS 1 f1] [$replica HPEXPIRETIME myhash FIELDS 1 f1] + assert_keyevent_patterns $primary_ksn myhash hset hexpire hexpired hset + assert_keyevent_patterns $replica_ksn myhash hset hexpire hset + $primary debug set-active-expire 1 + } {OK} {needs:debug} + + test {HMSET reports hexpired when overwrites expired fields} { + lassign [setup_replication_test $primary $replica $primary_host $primary_port] primary_initial_expired replica_initial_expired + # Initialize deferred clients and subscribe to keyspace notifications + foreach instance [list $primary $replica] { + $instance config set notify-keyspace-events KEA + } + set primary_ksn [valkey_deferring_client -1] + set replica_ksn [valkey_deferring_client $replica_host $replica_port] + foreach rd 
[list $primary_ksn $replica_ksn] { + assert_equal {1} [psubscribe $rd __keyevent@*] + } + $primary debug set-active-expire 0 + $primary flushall + + $primary hsetex myhash px 1 fields 5 f1 v1 f2 v2 f3 v3 f4 v4 f5 v5 + + wait_for_condition 50 100 { + [$primary hgetall myhash] eq {} + } else { + fail "Fields were not logically expired on primary" + } + wait_for_ofs_sync $primary $replica + + assert_equal {5} [$primary hlen myhash] + assert_equal {5} [$replica hlen myhash] + assert_equal {} [$replica hgetall myhash] + + $primary hmset myhash f1 v1 f2 v2 f3 v3 f4 v4 f5 v5 + + wait_for_ofs_sync $primary $replica + + assert_equal [$primary hgetall myhash] [$replica hgetall myhash] + assert_keyevent_patterns $primary_ksn myhash hset hexpire hexpired hset + assert_keyevent_patterns $replica_ksn myhash hset hexpire hset + $primary debug set-active-expire 1 + } {OK} {needs:debug} + + test {HSETEX KEEPTTL replica should preserve ttl when field is not expired on primary} { + lassign [setup_replication_test $primary $replica $primary_host $primary_port] primary_initial_expired replica_initial_expired + $primary debug set-active-expire 0 + + $primary hset myhash f1 v1 + + wait_for_ofs_sync $primary $replica + + pause_process $replica_pid + + $primary multi + $primary hpexpire myhash 1 fields 1 f1 + $primary hsetex myhash KEEPTTL fields 1 f1 v2 + $primary exec + + # wait for f1 to expired + wait_for_condition 50 100 { + [$primary httl myhash fields 1 f1] == -2 + } else { + fail "Field was not logically expired on primary" + } + + resume_process $replica_pid + + wait_for_ofs_sync $primary $replica + + assert_equal {-2} [$primary httl myhash fields 1 f1] + assert_equal {-2} [$replica httl myhash fields 1 f1] + $primary debug set-active-expire 1 + } {OK} {needs:debug} + + test {HSETEX KEEPTTL replica should NOT preserve ttl when field is expired on primary} { + lassign [setup_replication_test $primary $replica $primary_host $primary_port] primary_initial_expired 
replica_initial_expired + $primary debug set-active-expire 0 + + # write a short lived field on the primary and wait for the propagation + $primary hsetex myhash PX 1 fields 1 f1 v1 + + # wait for f1 to expired + wait_for_condition 50 100 { + [$primary httl myhash fields 1 f1] == -2 + } else { + fail "Field was not logically expired on primary" + } + + # Now overite the expired field on the primary and wait for it to propagate to the replica + $primary hsetex myhash KEEPTTL fields 1 f1 v2 + wait_for_ofs_sync $primary $replica + + assert_equal {v2} [$primary hget myhash f1] + assert_equal {v2} [$replica hget myhash f1] + $primary debug set-active-expire 1 + } {OK} {needs:debug} } } From 4f040fff449e84f72089504ccaa7ba961e17e713 Mon Sep 17 00:00:00 2001 From: Ran Shidlansik Date: Wed, 28 Jan 2026 10:47:44 +0200 Subject: [PATCH 11/14] deflake HSETEX EXAT single field expires leaving other fields intact (#3120) The test is currently flakey, because until we merge: https://github.com/valkey-io/valkey/pull/3001 when the expiration time provided is in the past, and the field does not exist the HSETX will just silently ignore it, without incrementing the statistics. I prefer to focus on writing a dedicated test for: https://github.com/valkey-io/valkey/pull/3001 and deflake this test now. 
Signed-off-by: Ran Shidlansik --- tests/unit/hashexpire.tcl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/unit/hashexpire.tcl b/tests/unit/hashexpire.tcl index b74207d009f..f11299cf7a7 100644 --- a/tests/unit/hashexpire.tcl +++ b/tests/unit/hashexpire.tcl @@ -3015,8 +3015,8 @@ start_server {tags {"hashexpire external:skip"}} { test "HSETEX $command single field expires leaving other fields intact" { r FLUSHALL set initial_expired [info_field [r info stats] expired_fields] - r HSET myhash f2 v2 - assert_equal 1 [r HLEN myhash] + r HSET myhash f1 v1 f2 v2 + assert_equal 2 [r HLEN myhash] assert_equal 0 [get_keys_with_volatile_items r] # Use HSETEX to set expiry r HSETEX myhash $command [get_short_expire_value $command] FIELDS 1 f1 v1 From f1fd905948d53a0f765c085a6e6c1265d3aba6b5 Mon Sep 17 00:00:00 2001 From: Ran Shidlansik Date: Tue, 27 Jan 2026 17:09:58 +0200 Subject: [PATCH 12/14] remove unneeded include of expire.h (#3117) This caused some unaligned load of server values on 32bit compilations. Example: https://github.com/valkey-io/valkey/actions/runs/21352969942/job/61491923381?pr=3111#step:6:3064 Some insights here: https://github.com/valkey-io/valkey/pull/3111#issuecomment-3805700959 Signed-off-by: Ran Shidlansik --- src/t_hash.c | 1 - 1 file changed, 1 deletion(-) diff --git a/src/t_hash.c b/src/t_hash.c index aa51653dc8f..7a941d2a030 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -32,7 +32,6 @@ * SPDX-License-Identifier: BSD-3-Clause */ -#include "expire.h" #include "hashtable.h" #include "rax.h" #include "sds.h" From 050ec6090b14f87614f3da34239095f33fdee085 Mon Sep 17 00:00:00 2001 From: cjx-zar <56825069+cjx-zar@users.noreply.github.com> Date: Tue, 6 Jan 2026 15:50:12 +0800 Subject: [PATCH 13/14] HFE make zero a valid ttl during import mode and data loading (#3006) The HFE uses EXPIRY_NONE(-1) for fields without a TTL. A bug exists in `HSETEX` and RDB loading where `expiry > 0` is used to check for an expiration. 
This is problematic because `0` might be treated as no expiry in `import-mode`, instead of an already expired timestamp, leading to incorrect behavior. --------- Signed-off-by: cjx-zar --- src/rdb.c | 2 +- src/t_hash.c | 2 +- tests/unit/hashexpire.tcl | 19 +++++++++++++++++++ 3 files changed, 21 insertions(+), 2 deletions(-) diff --git a/src/rdb.c b/src/rdb.c index b9006a49869..c7584420f42 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -2237,7 +2237,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error) { return NULL; } - if (rdbtype == RDB_TYPE_HASH_2 && itemexpiry > 0) { + if (rdbtype == RDB_TYPE_HASH_2 && itemexpiry != EXPIRY_NONE) { hashTypeTrackEntry(o, entry); } } diff --git a/src/t_hash.c b/src/t_hash.c index 7a941d2a030..4f78fc2d5b4 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -335,7 +335,7 @@ int hashTypeSet(robj *o, sds field, sds value, long long expiry, int flags, bool * This is needed for HINCRBY* case since in other commands this is handled early by * hashTypeTryConversion, so this check will be a NOP. 
*/ if (o->encoding == OBJ_ENCODING_LISTPACK) { - if (expiry > 0 || sdslen(field) > server.hash_max_listpack_value || sdslen(value) > server.hash_max_listpack_value) + if (expiry != EXPIRY_NONE || sdslen(field) > server.hash_max_listpack_value || sdslen(value) > server.hash_max_listpack_value) hashTypeConvert(o, OBJ_ENCODING_HASHTABLE); } diff --git a/tests/unit/hashexpire.tcl b/tests/unit/hashexpire.tcl index f11299cf7a7..81840438210 100644 --- a/tests/unit/hashexpire.tcl +++ b/tests/unit/hashexpire.tcl @@ -4662,4 +4662,23 @@ start_server {tags {"hash"}} { assert_match {*keys_with_volatile_items=0*} [r INFO keyspace] assert_equal {string} [r TYPE myhash] } + + test {Zero is a valid ttl in HFE} { + r flushall + r hset myhash f1 v1 + assert_equal [r OBJECT ENCODING myhash] "listpack" + assert_equal [r hsetex myhash exat 0 fields 2 f2 v2 f3 v3] 0 + assert_equal [r hlen myhash] 1 + assert_equal [r OBJECT ENCODING myhash] "listpack" + r config set import-mode yes + assert_equal [r hsetex myhash exat 0 fields 2 f2 v2 f3 v3] 1 + assert_equal [r hlen myhash] 3 + assert_equal [r OBJECT ENCODING myhash] "hashtable" + r config set import-mode no + wait_for_condition 30 100 { + [r hlen myhash] == 1 + } else { + fail "field wasn't expired" + } + } } From b2ac0a8ac30e403b20b9afc1fd719a14693bd3df Mon Sep 17 00:00:00 2001 From: Ran Shidlansik Date: Thu, 29 Jan 2026 19:23:18 +0200 Subject: [PATCH 14/14] HSETEX - Always issue keyspace notifications after validation (#3001) In the original implementation of Hash Field Expiration (https://github.com/valkey-io/valkey/pull/2089), the HSETEX command was implemented to report keyspace notifications only for performed changes. This is mostly aligned with other Hash commands (for example, HDEL will also not report `hdel` event for items which does not exist) The HSETEX case is somewhat different and is more like the `HSET` case. 
During HSETEX, after the command validations pass, items are ALWAYS "added" to the object, even though they might not actually be added. This case is the same for when the hash object is empty or when all the provided fields do not exist in the object (as reported [here](https://github.com/valkey-io/valkey/pull/2998)). This PR changes the way `HSETEX` will report keyspace notifications so that: 1. `hset` notification will ALWAYS be reported if all command validations pass. 2. `hexpire` will be reported in case the command includes an expiration time (even a past time) 3. `hexpired` will be reported in case the provided expiration time is in the past (or 0) 4. `hdel` will be reported in case the hash exists (or was created as part of the command) and following the command execution it was left empty. 5. we will always return '1' as the return value of the HSETEX command when it passes all validations. Before that we returned 1 only if we applied the change across ALL the input fields, so in case some of them did not exist and a past time was set we would return 0. --------- Signed-off-by: Ran Shidlansik Co-authored-by: Jacob Murphy --- src/t_hash.c | 71 +++++++++++++++++++++------------------ tests/unit/hashexpire.tcl | 15 ++++----- 2 files changed, 46 insertions(+), 40 deletions(-) diff --git a/src/t_hash.c b/src/t_hash.c index 4f78fc2d5b4..486de9a7a6f 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -1326,6 +1326,7 @@ void hsetexCommand(client *c) { return; if (checkAlreadyExpired(when)) { + need_rewrite_argv = 1; set_expired = 1; } } @@ -1396,13 +1397,15 @@ void hsetexCommand(client *c) { for (i = fields_index; i < c->argc; i += 2) { if (set_expired) { + hashTypeIgnoreTTL(o, true); if (hashTypeDelete(o, c->argv[i]->ptr)) { new_argv[new_argc++] = c->argv[i]; incrRefCount(c->argv[i]); - /* we treat this case exactly as active expiration. */ - server.stat_expiredfields++; changes++; } + /* we treat this case exactly as active expiration. 
*/ + server.stat_expiredfields++; + hashTypeIgnoreTTL(o, false); } else { bool expired; hashTypeSet(o, c->argv[i]->ptr, c->argv[i + 1]->ptr, when, set_flags, &expired); @@ -1435,40 +1438,31 @@ void hsetexCommand(client *c) { if (has_volatile_fields != hashTypeHasVolatileFields(o)) { dbUpdateObjectWithVolatileItemsTracking(c->db, o); } - if (set_expired) { - replaceClientCommandVector(c, new_argc, new_argv); - /* We would like to reduce the number of hexpired events in case there are potential many expired fields. */ + + /* In case we overwritten fields which were expired we need to act as if we actively expired them */ + if (expired_overwritten > 0) { + server.stat_expiredfields += expired_overwritten; notifyKeyspaceEvent(NOTIFY_HASH, "hexpired", c->argv[1], c->db->id); - } else { - /* In case we overwritten fields which were expired we need to act as if we actively expired them */ - if (expired_overwritten > 0) { - server.stat_expiredfields += expired_overwritten; - notifyKeyspaceEvent(NOTIFY_HASH, "hexpired", c->argv[1], c->db->id); - /* Propagate deletions for expired/non-existent fields in batches. - * When KEEPTTL is used the replica has noway telling if, at the time the primary was executing the command, - * the fields were expired or not. When the replica executes the command it will ALWAYS overwrite the field, so - * we need to propagate hdel explicitly to prevent the replica from keeping the TTL on it's side. */ - if (keepttl_fields != NULL) { - /* Propagate individual fields deletions */ - int idx = 0; - while (idx < expired_overwritten) { - idx += propagateFieldsDeletion(c->db, o, expired_overwritten - idx, - &keepttl_fields[idx], c->slot); - } - zfree(keepttl_fields); - keepttl_fields = NULL; + /* Propagate deletions for expired/non-existent fields in batches. + * When KEEPTTL is used the replica has noway telling if, at the time the primary was executing the command, + * the fields were expired or not. 
When the replica executes the command it will ALWAYS overwrite the field, so + * we need to propagate hdel explicitly to prevent the replica from keeping the TTL on it's side. */ + if (keepttl_fields != NULL) { + /* Propagate individual fields deletions */ + int idx = 0; + while (idx < expired_overwritten) { + idx += propagateFieldsDeletion(c->db, o, expired_overwritten - idx, + &keepttl_fields[idx], c->slot); } + zfree(keepttl_fields); + keepttl_fields = NULL; } + } - notifyKeyspaceEvent(NOTIFY_HASH, "hset", c->argv[1], c->db->id); - - if (need_rewrite_argv) { - replaceClientCommandVector(c, new_argc, new_argv); - } - if (expire) { - notifyKeyspaceEvent(NOTIFY_HASH, "hexpire", c->argv[1], c->db->id); - } + if (need_rewrite_argv) { + replaceClientCommandVector(c, new_argc, new_argv); } + signalModifiedKey(c, c->db, c->argv[1]); server.dirty += changes; } else { @@ -1478,6 +1472,17 @@ void hsetexCommand(client *c) { if (new_argv) zfree(new_argv); } + /* Handle keyspace notifications and object delete if needed. + * since setting fields in hash object should always work in case all validations pass, + * it is safe to assume that in case we reach this point events should be issues */ + notifyKeyspaceEvent(NOTIFY_HASH, "hset", c->argv[1], c->db->id); + if (expire) { + notifyKeyspaceEvent(NOTIFY_HASH, "hexpire", c->argv[1], c->db->id); + } + if (set_expired) { + /* We would like to reduce the number of hexpired events in case there are potential many expired fields. */ + notifyKeyspaceEvent(NOTIFY_HASH, "hexpired", c->argv[1], c->db->id); + } /* Delete the object in case it was left empty or created with all expired items. */ if (hashTypeLength(o) == 0) { dbDelete(c->db, c->argv[1]); @@ -1485,7 +1490,9 @@ void hsetexCommand(client *c) { } /* make sure that if we ever allocated this it was freed */ serverAssert(keepttl_fields == NULL); - addReplyLongLong(c, changes == num_fields ? 
1 : 0); + /* In case we reached here we know that we operated on ALL the fields, + * even in case we end up in the same original state, we still need to reflect as the operation was done on all the fields. */ + addReply(c, shared.cone); } /* High-Level Algorithm of HGETEX Command: diff --git a/tests/unit/hashexpire.tcl b/tests/unit/hashexpire.tcl index 81840438210..639ed998dad 100644 --- a/tests/unit/hashexpire.tcl +++ b/tests/unit/hashexpire.tcl @@ -555,7 +555,7 @@ start_server {tags {"hashexpire"}} { } {ERR *} foreach command {EX PX EXAT PXAT} { - test "HSETEX $command 0/past time works correctly with 1 field" { + test "HSETEX $command 0/past time works correctly with 2 fields" { r FLUSHALL r config resetstat # Create hash with field @@ -566,16 +566,16 @@ start_server {tags {"hashexpire"}} { set rd [setup_single_keyspace_notification r] # Set field to expire immediately - assert_equal {1} [r HSETEX myhash $command [get_past_zero_expire_value $command] FIELDS 1 f1 v1] + assert_equal {1} [r HSETEX myhash $command [get_past_zero_expire_value $command] FIELDS 2 f1 v1 f2 v2] # Verify field and keys are deleted - assert_keyevent_patterns $rd myhash hexpired del + assert_keyevent_patterns $rd myhash hset hexpire hexpired del assert_equal -2 [r HTTL myhash FIELDS 1 f1] assert_equal 0 [r HLEN myhash] assert_equal 0 [r EXISTS myhash] assert_equal 0 [get_keys r] assert_equal 0 [get_keys_with_volatile_items r] - assert_equal 1 [info_field [r info stats] expired_fields] + assert_equal 2 [info_field [r info stats] expired_fields] $rd close } } @@ -632,7 +632,6 @@ start_server {tags {"hashexpire"}} { assert_error {ERR numfields should be greater than 0 and match the provided number of fields} {r HSETEX myhash PX 100 FIELDS 1 field1 val1 extra} } - ## FNX/FXX # hsetex throws ERR *, it shouldn't @@ -835,7 +834,7 @@ start_server {tags {"hashexpire"}} { r FLUSHALL r HSET myhash f1 v1 set rd [setup_single_keyspace_notification r] - + r HEXPIRE myhash 1000 FIELDS 1 f2 r HEXPIRE 
myhash 0 FIELDS 1 f2 # Verify no notification (getting hset and not hexpire) @@ -844,7 +843,7 @@ start_server {tags {"hashexpire"}} { assert_equal 0 [get_keys_with_volatile_items r] $rd close } - + # Error Cases test {HEXPIRE - conflicting conditions error} { r FLUSHALL @@ -4667,7 +4666,7 @@ start_server {tags {"hash"}} { r flushall r hset myhash f1 v1 assert_equal [r OBJECT ENCODING myhash] "listpack" - assert_equal [r hsetex myhash exat 0 fields 2 f2 v2 f3 v3] 0 + assert_equal [r hsetex myhash exat 0 fields 2 f2 v2 f3 v3] 1 assert_equal [r hlen myhash] 1 assert_equal [r OBJECT ENCODING myhash] "listpack" r config set import-mode yes