diff --git a/src/cli_commands.c b/src/cli_commands.c index 989428ec1b0..bb9131d0292 100644 --- a/src/cli_commands.c +++ b/src/cli_commands.c @@ -6,6 +6,7 @@ num_history, tips, num_tips, function, arity, flags, acl, get_dbid_args, key_specs, \ key_specs_num, get_keys, numargs) \ name, summary, group, since, numargs +#define ASSIGN_PREFETCH_PROC(fn) #define MAKE_ARG(name, type, key_spec_index, token, summary, since, flags, numsubargs, deprecated_since) \ name, type, token, since, flags, numsubargs #define COMMAND_ARG cliCommandArg diff --git a/src/commands.c b/src/commands.c index 7744f15e756..88f908dd121 100644 --- a/src/commands.c +++ b/src/commands.c @@ -6,6 +6,7 @@ key_specs_num, get_keys, numargs) \ name, summary, complexity, since, doc_flags, replaced, deprecated, group_enum, history, num_history, tips, \ num_tips, function, arity, flags, acl, get_dbid_args, key_specs, key_specs_num, get_keys, numargs +#define ASSIGN_PREFETCH_PROC(fn) .prefetch_proc = fn, #define MAKE_ARG(name, type, key_spec_index, token, summary, since, flags, numsubargs, deprecated_since) \ name, type, key_spec_index, token, summary, since, flags, deprecated_since, numsubargs #define COMMAND_STRUCT serverCommand diff --git a/src/commands.def b/src/commands.def index ca87553cd5d..9ac31eba724 100644 --- a/src/commands.def +++ b/src/commands.def @@ -11919,7 +11919,7 @@ struct COMMAND_STRUCT serverCommandTable[] = { {MAKE_CMD("hexpire","Sets expiry time on hash fields.","O(N) where N is the number of specified fields.","9.0.0",CMD_DOC_NONE,NULL,NULL,"hash",COMMAND_GROUP_HASH,HEXPIRE_History,0,HEXPIRE_Tips,0,hexpireCommand,-6,CMD_WRITE|CMD_FAST,ACL_CATEGORY_FAST|ACL_CATEGORY_HASH|ACL_CATEGORY_WRITE,NULL,HEXPIRE_Keyspecs,1,NULL,4),.args=HEXPIRE_Args}, {MAKE_CMD("hexpireat","Sets expiry time on hash fields.","O(N) where N is the number of specified fields.","9.0.0",CMD_DOC_NONE,NULL,NULL,"hash",COMMAND_GROUP_HASH,HEXPIREAT_History,0,HEXPIREAT_Tips,0,hexpireatCommand,-6,CMD_WRITE|CMD_FAST,ACL_CATEGORY_FAST|ACL_CATEGORY_HASH|ACL_CATEGORY_WRITE,NULL,HEXPIREAT_Keyspecs,1,NULL,4),.args=HEXPIREAT_Args}, {MAKE_CMD("hexpiretime","Returns Unix timestamps in seconds since the epoch at which the given key's field(s) will expire.","O(N) where N is the number of specified fields.","9.0.0",CMD_DOC_NONE,NULL,NULL,"hash",COMMAND_GROUP_HASH,HEXPIRETIME_History,0,HEXPIRETIME_Tips,0,hexpiretimeCommand,-5,CMD_READONLY|CMD_FAST,ACL_CATEGORY_FAST|ACL_CATEGORY_HASH|ACL_CATEGORY_READ,NULL,HEXPIRETIME_Keyspecs,1,NULL,2),.args=HEXPIRETIME_Args}, -{MAKE_CMD("hget","Returns the value of a field in a hash.","O(1)","2.0.0",CMD_DOC_NONE,NULL,NULL,"hash",COMMAND_GROUP_HASH,HGET_History,0,HGET_Tips,0,hgetCommand,3,CMD_READONLY|CMD_FAST,ACL_CATEGORY_FAST|ACL_CATEGORY_HASH|ACL_CATEGORY_READ,NULL,HGET_Keyspecs,1,NULL,2),.args=HGET_Args}, +{MAKE_CMD("hget","Returns the value of a field in a hash.","O(1)","2.0.0",CMD_DOC_NONE,NULL,NULL,"hash",COMMAND_GROUP_HASH,HGET_History,0,HGET_Tips,0,hgetCommand,3,CMD_READONLY|CMD_FAST,ACL_CATEGORY_FAST|ACL_CATEGORY_HASH|ACL_CATEGORY_READ,NULL,HGET_Keyspecs,1,NULL,2),.args=HGET_Args,ASSIGN_PREFETCH_PROC(hashValuePrefetchCallback)}, {MAKE_CMD("hgetall","Returns all fields and values in a hash.","O(N) where N is the size of the hash.","2.0.0",CMD_DOC_NONE,NULL,NULL,"hash",COMMAND_GROUP_HASH,HGETALL_History,0,HGETALL_Tips,1,hgetallCommand,2,CMD_READONLY,ACL_CATEGORY_HASH|ACL_CATEGORY_READ|ACL_CATEGORY_SLOW,NULL,HGETALL_Keyspecs,1,NULL,1),.args=HGETALL_Args}, {MAKE_CMD("hgetdel","Returns the values of one or more fields and deletes them from a hash.","O(N) where N is the number of fields to be retrieved and deleted.","9.1.0",CMD_DOC_NONE,NULL,NULL,"hash",COMMAND_GROUP_HASH,HGETDEL_History,0,HGETDEL_Tips,0,hgetdelCommand,-5,CMD_WRITE|CMD_FAST,ACL_CATEGORY_FAST|ACL_CATEGORY_HASH|ACL_CATEGORY_WRITE,NULL,HGETDEL_Keyspecs,1,NULL,2),.args=HGETDEL_Args}, {MAKE_CMD("hgetex","Gets the value of one or more fields of a given hash key, and optionally sets their expiration time or time-to-live (TTL).","O(N) where N is the number of specified fields.","9.0.0",CMD_DOC_NONE,NULL,NULL,"hash",COMMAND_GROUP_HASH,HGETEX_History,0,HGETEX_Tips,0,hgetexCommand,-5,CMD_WRITE|CMD_FAST,ACL_CATEGORY_FAST|ACL_CATEGORY_HASH|ACL_CATEGORY_WRITE,NULL,HGETEX_Keyspecs,1,NULL,3),.args=HGETEX_Args}, @@ -11936,7 +11936,7 @@ struct COMMAND_STRUCT serverCommandTable[] = { {MAKE_CMD("hpttl","Returns the remaining time to live in milliseconds of a hash key's field(s) that have an associated expiration.","O(N) where N is the number of specified fields.","9.0.0",CMD_DOC_NONE,NULL,NULL,"hash",COMMAND_GROUP_HASH,HPTTL_History,0,HPTTL_Tips,0,hpttlCommand,-5,CMD_READONLY|CMD_FAST,ACL_CATEGORY_FAST|ACL_CATEGORY_HASH|ACL_CATEGORY_READ,NULL,HPTTL_Keyspecs,1,NULL,2),.args=HPTTL_Args}, {MAKE_CMD("hrandfield","Returns one or more random fields from a hash.","O(N) where N is the number of fields returned","6.2.0",CMD_DOC_NONE,NULL,NULL,"hash",COMMAND_GROUP_HASH,HRANDFIELD_History,0,HRANDFIELD_Tips,1,hrandfieldCommand,-2,CMD_READONLY,ACL_CATEGORY_HASH|ACL_CATEGORY_READ|ACL_CATEGORY_SLOW,NULL,HRANDFIELD_Keyspecs,1,NULL,2),.args=HRANDFIELD_Args}, {MAKE_CMD("hscan","Iterates over fields and values of a hash.","O(1) for every call. O(N) for a complete iteration, including enough command calls for the cursor to return back to 0. N is the number of elements inside the collection.","2.8.0",CMD_DOC_NONE,NULL,NULL,"hash",COMMAND_GROUP_HASH,HSCAN_History,0,HSCAN_Tips,1,hscanCommand,-3,CMD_READONLY,ACL_CATEGORY_HASH|ACL_CATEGORY_READ|ACL_CATEGORY_SLOW,NULL,HSCAN_Keyspecs,1,NULL,5),.args=HSCAN_Args}, -{MAKE_CMD("hset","Creates or modifies the value of a field in a hash.","O(1) for each field/value pair added, so O(N) to add N field/value pairs when the command is called with multiple field/value pairs.","2.0.0",CMD_DOC_NONE,NULL,NULL,"hash",COMMAND_GROUP_HASH,HSET_History,1,HSET_Tips,0,hsetCommand,-4,CMD_WRITE|CMD_DENYOOM|CMD_FAST,ACL_CATEGORY_FAST|ACL_CATEGORY_HASH|ACL_CATEGORY_WRITE,NULL,HSET_Keyspecs,1,NULL,2),.args=HSET_Args}, +{MAKE_CMD("hset","Creates or modifies the value of a field in a hash.","O(1) for each field/value pair added, so O(N) to add N field/value pairs when the command is called with multiple field/value pairs.","2.0.0",CMD_DOC_NONE,NULL,NULL,"hash",COMMAND_GROUP_HASH,HSET_History,1,HSET_Tips,0,hsetCommand,-4,CMD_WRITE|CMD_DENYOOM|CMD_FAST,ACL_CATEGORY_FAST|ACL_CATEGORY_HASH|ACL_CATEGORY_WRITE,NULL,HSET_Keyspecs,1,NULL,2),.args=HSET_Args,ASSIGN_PREFETCH_PROC(hashValuePrefetchCallback)}, {MAKE_CMD("hsetex","Sets the value of one or more fields of a given hash key, and optionally sets their expiration time.","O(N) where N is the number of specified fields.","9.0.0",CMD_DOC_NONE,NULL,NULL,"hash",COMMAND_GROUP_HASH,HSETEX_History,0,HSETEX_Tips,0,hsetexCommand,-6,CMD_WRITE|CMD_DENYOOM|CMD_FAST,ACL_CATEGORY_FAST|ACL_CATEGORY_HASH|ACL_CATEGORY_WRITE,NULL,HSETEX_Keyspecs,1,NULL,5),.args=HSETEX_Args}, {MAKE_CMD("hsetnx","Sets the value of a field in a hash only when the field doesn't exist.","O(1)","2.0.0",CMD_DOC_NONE,NULL,NULL,"hash",COMMAND_GROUP_HASH,HSETNX_History,0,HSETNX_Tips,0,hsetnxCommand,4,CMD_WRITE|CMD_DENYOOM|CMD_FAST,ACL_CATEGORY_FAST|ACL_CATEGORY_HASH|ACL_CATEGORY_WRITE,NULL,HSETNX_Keyspecs,1,NULL,3),.args=HSETNX_Args}, {MAKE_CMD("hstrlen","Returns the length of the value of a field.","O(1)","3.2.0",CMD_DOC_NONE,NULL,NULL,"hash",COMMAND_GROUP_HASH,HSTRLEN_History,0,HSTRLEN_Tips,0,hstrlenCommand,3,CMD_READONLY|CMD_FAST,ACL_CATEGORY_FAST|ACL_CATEGORY_HASH|ACL_CATEGORY_READ,NULL,HSTRLEN_Keyspecs,1,NULL,2),.args=HSTRLEN_Args}, diff --git a/src/commands/hget.json b/src/commands/hget.json index b81b03a9ac6..c5e014e0a5d 100644 --- a/src/commands/hget.json +++ b/src/commands/hget.json @@ -6,6 +6,7 @@ "since": "2.0.0", "arity": 3, "function": "hgetCommand", + "prefetch_function": "hashValuePrefetchCallback", "command_flags": [ "READONLY", "FAST" diff --git a/src/commands/hset.json b/src/commands/hset.json index 6b0dbec96cd..d4bd0c674b3 100644 --- a/src/commands/hset.json +++ b/src/commands/hset.json @@ -6,6 +6,7 @@ "since": "2.0.0", "arity": -4, "function": "hsetCommand", + "prefetch_function": "hashValuePrefetchCallback", "history": [ [ "4.0.0", diff --git a/src/memory_prefetch.c b/src/memory_prefetch.c index 963514937c8..a61c887742d 100644 --- a/src/memory_prefetch.c +++ b/src/memory_prefetch.c @@ -12,6 +12,7 @@ #include "server.h" #include "io_threads.h" + typedef enum { PREFETCH_ENTRY, /* Initial state, prefetch entries associated with the given key's hash */ PREFETCH_VALUE, /* prefetch the value object of the entry found in the previous step */ @@ -21,6 +22,8 @@ typedef enum { typedef struct KeyPrefetchInfo { PrefetchState state; /* Current state of the prefetch operation */ hashtableIncrementalFindState hashtab_state; + client *client; + ValuePrefetchInfo value_prefetch_info; } KeyPrefetchInfo; /* PrefetchCommandsBatch structure holds the state of the current batch of client commands being processed. */ @@ -115,6 +118,8 @@ static void initBatchInfo(hashtable **tables) { continue; } info->state = PREFETCH_ENTRY; + info->client = batch->clients[i]; + info->value_prefetch_info.state = HASHTABLE_PREFETCH_ENTRY; hashtableIncrementalFindInit(&info->hashtab_state, tables[i], batch->keys[i]); } } @@ -139,10 +144,12 @@ static void prefetchValue(KeyPrefetchInfo *info) { robj *val = entry; if (val->encoding == OBJ_ENCODING_RAW && val->type == OBJ_STRING) { valkey_prefetch(objectGetVal(val)); + } else if (info->client && info->client->parsed_cmd->prefetch_proc) { + if (info->client->parsed_cmd->prefetch_proc(info->client, &info->value_prefetch_info, val)) return; } } - markKeyAsdone(info); + moveToNextKey(); } /* Prefetch hashtable data for an array of keys. @@ -160,13 +167,19 @@ static void hashtablePrefetch(hashtable **tables) { KeyPrefetchInfo *info; while ((info = getNextPrefetchInfo())) { switch (info->state) { - case PREFETCH_ENTRY: prefetchEntry(info); break; - case PREFETCH_VALUE: prefetchValue(info); break; + case PREFETCH_ENTRY: + prefetchEntry(info); + break; + case PREFETCH_VALUE: { + prefetchValue(info); + break; + } default: serverPanic("Unknown prefetch state %d", info->state); } } } + static void resetCommandsBatch(void) { batch->cur_idx = 0; batch->keys_done = 0; diff --git a/src/memory_prefetch.h b/src/memory_prefetch.h index 064b30526d2..b73091720ba 100644 --- a/src/memory_prefetch.h +++ b/src/memory_prefetch.h @@ -1,7 +1,24 @@ #ifndef MEMORY_PREFETCH_H #define MEMORY_PREFETCH_H +#include "hashtable.h" + struct client; +struct robj; + +/* Value prefetch state machine for nested data structures (hashtables, etc.) */ +typedef enum { + HASHTABLE_PREFETCH_ENTRY, /* Initial state, prefetch hashtable pointer */ + HASHTABLE_PREFETCH_INIT, /* Initialize incremental find in the hashtable */ + HASHTABLE_PREFETCH_VALUE, /* Step through incremental find in the hashtable */ +} HashtablePrefetchState; + +typedef struct { + HashtablePrefetchState state; + union { + hashtableIncrementalFindState hashtab_state; + } data; +} ValuePrefetchInfo; void prefetchCommandsBatchInit(void); void processClientsCommandsBatch(void); diff --git a/src/server.h b/src/server.h index 8702ed222e7..9b85992a320 100644 --- a/src/server.h +++ b/src/server.h @@ -2566,6 +2566,8 @@ typedef int serverGetKeysProc(struct serverCommand *cmd, robj **argv, int argc, * Caller should free the returned array. */ typedef int *commandDbIdArgs(robj **argv, int argc, int *count); +typedef bool serverValuePrefetchProc(client *c, ValuePrefetchInfo *info, robj *val); + /* Command structure. * * Note that the command table is in commands.c and it is auto-generated. @@ -2703,6 +2705,7 @@ struct serverCommand { struct serverCommand *subcommands; /* Array of arguments (may be NULL) */ struct serverCommandArg *args; + serverValuePrefetchProc *prefetch_proc; /* Value prefetch callback (NULL if unused) */ #ifdef LOG_REQ_RES /* Reply schema */ struct jsonObject *reply_schema; @@ -4241,6 +4244,9 @@ int *swapdbDbIdArgs(robj **argv, int argc, int *count); int *moveDbIdArgs(robj **argv, int argc, int *count); int *copyDbIdArgs(robj **argv, int argc, int *count); +/* Helper functions for object memory prefecthing */ +bool hashValuePrefetchCallback(client *c, ValuePrefetchInfo *info, robj *val); + #if defined(__GNUC__) void *calloc(size_t count, size_t size) __attribute__((deprecated)); void free(void *ptr) __attribute__((deprecated)); diff --git a/src/t_hash.c b/src/t_hash.c index 821b43bb4ac..cbf9440956d 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -2501,3 +2501,27 @@ size_t hashTypeScanDefrag(robj *ob, size_t cursor, void *(*defragAllocfn)(void * } return (long)vset_cursor; } + +/* Prefetch callback for hash values with hashtable encoding. + * implements a 3-phase state machine for incremental prefetching. */ +bool hashValuePrefetchCallback(client *c, ValuePrefetchInfo *info, robj *val) { + if (val->type != OBJ_HASH || val->encoding != OBJ_ENCODING_HASHTABLE) return false; + + if (info->state == HASHTABLE_PREFETCH_ENTRY) { + /* Phase 1: Prefetch the hashtable pointer itself */ + valkey_prefetch(objectGetVal(val)); + info->state = HASHTABLE_PREFETCH_INIT; + return true; + } + if (info->state == HASHTABLE_PREFETCH_INIT) { + /* Phase 2: Initialize incremental find in the hash hashtable + * c->argv[2] is the field name for both HGET and HSET */ + hashtableIncrementalFindInit(&info->data.hashtab_state, + objectGetVal(val), + objectGetVal(c->argv[2])); + info->state = HASHTABLE_PREFETCH_VALUE; + return true; + } + /* Phase 3: Step through incremental find */ + return hashtableIncrementalFindStep(&info->data.hashtab_state); +} diff --git a/src/valkey-benchmark.c b/src/valkey-benchmark.c index fb4194696b5..830c8805bae 100644 --- a/src/valkey-benchmark.c +++ b/src/valkey-benchmark.c @@ -2632,6 +2632,12 @@ int main(int argc, char **argv) { free(cmd); } + if (test_is_selected("hget")) { + len = valkeyFormatCommand(&cmd, "HGET myhash%s element:__rand_int__", tag); + benchmark("HGET", cmd, len); + free(cmd); + } + if (test_is_selected("spop")) { len = valkeyFormatCommand(&cmd, "SPOP myset%s", tag); benchmark("SPOP", cmd, len); diff --git a/tests/support/util.tcl b/tests/support/util.tcl index 7a3674f2a22..8bdeff70c92 100644 --- a/tests/support/util.tcl +++ b/tests/support/util.tcl @@ -1272,7 +1272,7 @@ proc bp {{s {}}} { } elseif {[lsearch -exact $::bp_skip $s]>=0} return if [catch {info level -1} who] {set who ::} while 1 { - puts -nonewline "$who/$s> "; flush stdout + puts -nonewline "> "; flush stdout gets stdin line if {$line=="c"} {puts "continuing.."; break} if {$line=="i"} {set line "info locals"} diff --git a/utils/generate-command-code.py b/utils/generate-command-code.py index 63ebd1ae680..189ed685c97 100755 --- a/utils/generate-command-code.py +++ b/utils/generate-command-code.py @@ -496,6 +496,9 @@ def _doc_flags_code(): if self.args: s += ".args=%s," % self.arg_table_name() + if self.desc.get("prefetch_function"): + s += "ASSIGN_PREFETCH_PROC(%s)," % self.desc["prefetch_function"] + if self.reply_schema and args.with_reply_schema: s += ".reply_schema=&%s," % self.reply_schema_name()