Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 57 additions & 12 deletions src/memory_prefetch.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,32 @@
#include "server.h"
#include "io_threads.h"


typedef enum {
PREFETCH_ENTRY, /* Initial state, prefetch entries associated with the given key's hash */
PREFETCH_VALUE, /* prefetch the value object of the entry found in the previous step */
PREFETCH_DONE /* Indicates that prefetching for this key is complete */
} PrefetchState;

typedef enum {
HASHTABLE_PREFETCH_ENTRY, /* Initial state, prefetch the hashtable pointer */
HASHTABLE_PREFETCH_INIT, /* Initialize incremental find for the nested hashtable */
HASHTABLE_PREFETCH_VALUE, /* Prefetch the value object via incremental find steps */
} HashtablePrefetchState;

typedef struct {
HashtablePrefetchState state; /* Current state of the prefetch operation */
union {
hashtableIncrementalFindState hashtab_state;
hashtableIncrementalFindState set_state;
} data;
} ValuePrefetchInfo;

typedef struct KeyPrefetchInfo {
PrefetchState state; /* Current state of the prefetch operation */
hashtableIncrementalFindState hashtab_state;
client *client;
ValuePrefetchInfo value_prefetch_info;
} KeyPrefetchInfo;

/* PrefetchCommandsBatch structure holds the state of the current batch of client commands being processed. */
Expand Down Expand Up @@ -83,24 +100,22 @@ int onMaxBatchSizeChange(const char **err) {
return 1;
}

/* Move to the next key in the batch. */
static void moveToNextKey(void) {
batch->cur_idx = (batch->cur_idx + 1) % batch->key_count;
}

static void markKeyAsdone(KeyPrefetchInfo *info) {
info->state = PREFETCH_DONE;
server.stat_total_prefetch_entries++;
batch->keys_done++;
}

/* Returns the next KeyPrefetchInfo structure that needs to be processed. */
/* Returns the next KeyPrefetchInfo structure that needs to be processed,
* advancing the index internally. Returns NULL when all keys are done. */
static KeyPrefetchInfo *getNextPrefetchInfo(void) {
if (batch->keys_done >= batch->key_count) return NULL;

size_t start_idx = batch->cur_idx;
do {
KeyPrefetchInfo *info = &batch->prefetch_info[batch->cur_idx];
if (info->state != PREFETCH_DONE) return info;
batch->cur_idx = (batch->cur_idx + 1) % batch->key_count;
if (info->state != PREFETCH_DONE) return info;
} while (batch->cur_idx != start_idx);
return NULL;
}
Expand All @@ -115,21 +130,47 @@ static void initBatchInfo(hashtable **tables) {
continue;
}
info->state = PREFETCH_ENTRY;
info->client = batch->clients[i];
info->value_prefetch_info.state = HASHTABLE_PREFETCH_ENTRY;
hashtableIncrementalFindInit(&info->hashtab_state, tables[i], batch->keys[i]);
}
}

static void prefetchEntry(KeyPrefetchInfo *info) {
if (hashtableIncrementalFindStep(&info->hashtab_state)) {
/* Not done yet */
moveToNextKey();
if (hashtableIncrementalFindStep(&info->hashtab_state) != 1) {
/* Step complete, move to value prefetch */
} else if (server.io_threads_num >= server.min_io_threads_copy_avoid) {
/* Copy avoidance should be more efficient without value prefetch
* starting certain number of I/O threads */
markKeyAsdone(info);
} else {
info->state = PREFETCH_VALUE;
}
/* Otherwise still in progress, will continue on next iteration */
}

/* Prefetch hashtable value.
* Returns 1 if prefetch is still in progress, 0 if complete. */
static int prefetchHashtableValue(KeyPrefetchInfo *info, robj *val, int key_arg_index) {
if (!info->client) return 0;

switch (info->value_prefetch_info.state) {
case HASHTABLE_PREFETCH_ENTRY:
valkey_prefetch(objectGetVal(val));
info->value_prefetch_info.state = HASHTABLE_PREFETCH_INIT;
return 1;
case HASHTABLE_PREFETCH_INIT:
hashtableIncrementalFindInit(&info->value_prefetch_info.data.hashtab_state, objectGetVal(val),
objectGetVal(info->client->argv[key_arg_index]));
info->value_prefetch_info.state = HASHTABLE_PREFETCH_VALUE;
return 1;
case HASHTABLE_PREFETCH_VALUE:
if (hashtableIncrementalFindStep(&info->value_prefetch_info.data.hashtab_state)) {
return 1;
}
return 0;
}
return 0;
}

/* Prefetch the entry's value. If the value is found.*/
Expand All @@ -139,6 +180,11 @@ static void prefetchValue(KeyPrefetchInfo *info) {
robj *val = entry;
if (val->encoding == OBJ_ENCODING_RAW && val->type == OBJ_STRING) {
valkey_prefetch(objectGetVal(val));
} else if (val->encoding == OBJ_ENCODING_HASHTABLE && val->type == OBJ_HASH) {
if (info->client && (info->client->parsed_cmd->proc == hsetCommand ||
info->client->parsed_cmd->proc == hgetCommand)) {
if (prefetchHashtableValue(info, val, 2)) return;
}
}
}

Expand All @@ -152,8 +198,6 @@ static void prefetchValue(KeyPrefetchInfo *info) {
* on those keys.
*
* tables - An array of hashtables to prefetch data from.
* prefetch_value - If true, we prefetch the value data for each key.
* to bring the key's value data closer to the L1 cache as well.
*/
static void hashtablePrefetch(hashtable **tables) {
initBatchInfo(tables);
Expand All @@ -167,6 +211,7 @@ static void hashtablePrefetch(hashtable **tables) {
}
}


static void resetCommandsBatch(void) {
batch->cur_idx = 0;
batch->keys_done = 0;
Expand Down
6 changes: 6 additions & 0 deletions src/valkey-benchmark.c
Original file line number Diff line number Diff line change
Expand Up @@ -2498,6 +2498,12 @@ int main(int argc, char **argv) {
free(cmd);
}

if (test_is_selected("hget")) {
len = valkeyFormatCommand(&cmd, "HGET myhash%s element:__rand_int__", tag);
benchmark("HGET", cmd, len);
free(cmd);
}

if (test_is_selected("spop")) {
len = valkeyFormatCommand(&cmd, "SPOP myset%s", tag);
benchmark("SPOP", cmd, len);
Expand Down
2 changes: 1 addition & 1 deletion tests/support/util.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -1272,7 +1272,7 @@ proc bp {{s {}}} {
} elseif {[lsearch -exact $::bp_skip $s]>=0} return
if [catch {info level -1} who] {set who ::}
while 1 {
puts -nonewline "$who/$s> "; flush stdout
puts -nonewline "> "; flush stdout
gets stdin line
if {$line=="c"} {puts "continuing.."; break}
if {$line=="i"} {set line "info locals"}
Expand Down
Loading