Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
6b8bd41
Set pending_command flag consistently across all command execution pa…
harrylin98 May 27, 2026
738bf48
Forkless Save
JimB123 Apr 21, 2026
4bd498f
continue on unit tests
JimB123 Apr 28, 2026
e08abcc
Forkless Save
JimB123 Apr 28, 2026
90644e4
Forkless Save
JimB123 May 14, 2026
7a94916
Forkless Save
JimB123 May 14, 2026
b95abdc
Forkless Save
JimB123 May 14, 2026
4c46bfa
Forkless Save
JimB123 May 14, 2026
7861669
Forkless Save
JimB123 May 15, 2026
2e3806a
Forkless Save
JimB123 May 15, 2026
312b0ab
Forkless Save
JimB123 May 18, 2026
e2c82a8
Forkless Save
JimB123 May 18, 2026
798eef9
Forkless Save
JimB123 May 19, 2026
243a170
Forkless Save
JimB123 May 19, 2026
2391260
Forkless Save
JimB123 May 19, 2026
df33d89
Forkless Save
JimB123 May 20, 2026
10b08c8
Forkless Save
JimB123 May 28, 2026
a135589
Forkless Save
JimB123 May 28, 2026
9a4feef
Forkless Save
JimB123 May 28, 2026
648565b
Forkless Save
JimB123 May 28, 2026
5935e9a
Forkless save
JimB123 May 29, 2026
0f17822
Forkless Save
JimB123 May 29, 2026
a88d079
Forkless Save
JimB123 May 29, 2026
97d3bb6
Forkless Save
JimB123 Jun 1, 2026
bc72ca4
Forkless Save
JimB123 Jun 1, 2026
bdea400
Forkless Save
JimB123 Jun 1, 2026
8a8de8d
Forkless Save
JimB123 Jun 1, 2026
bc437ac
Forkless Save
JimB123 Jun 1, 2026
47f01ff
Forkless Save
JimB123 Jun 2, 2026
bd0ef82
Forkless Save
JimB123 Jun 2, 2026
2b59edd
Forkless Save
JimB123 Jun 2, 2026
3e4692c
Forkless Save
JimB123 Jun 2, 2026
7a79901
Forkless Save
JimB123 Jun 4, 2026
7a15069
Forkless Save
JimB123 Jun 4, 2026
412207a
Forkless Save
JimB123 Jun 5, 2026
a2c8e56
Forkless Save
JimB123 Jun 5, 2026
a55f705
Forkless Save
JimB123 Jun 5, 2026
3628aae
Forkless Save
JimB123 Jun 8, 2026
fd0d263
Forkless Save
JimB123 Jun 9, 2026
a4d5e28
Forkless Save
JimB123 Jun 9, 2026
91a0712
Merge branch 'forkless-pre-threadsave' into forkless-bgiterator
JimB123 Jun 11, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,493 changes: 717 additions & 776 deletions src/bgiteration.c

Large diffs are not rendered by default.

106 changes: 49 additions & 57 deletions src/bgiteration.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
* implements the logic of the iteration client.
*
* Iteration clients are expected to read through the keyspace until the iteration is complete or
* terminated. An iteration client may not perform modifications on a key.
*/
* terminated. An iteration client may not perform modifications on a key. */

/* Avoids dependency on server.h */
typedef struct serverObject dbEntry; // An object with key/value inserted into main dictionary
Expand All @@ -33,21 +32,30 @@ typedef struct client client;
typedef struct bgIterator bgIterator;


/* Flag indicates that a consistent iteration is required. This is used to create a point-in-time
* iteration. The iteration client will see all keys AS THEY EXISTED at the time when the iterator
* was created.
* Note: The DBID provided with the DICTENTRY events is the original DBID (at the time of iteration
* start). SWAPDB events are NOT provided during a consistent iteration. */
#define BGITERATOR_FLAG_CONSISTENT (1 << 0)

/* Flag indicating that the replication stream for keys which have already been processed should be
* forwarded to the iteration client. Most useful for non-consistent iteration to track changes
* to keys already processed. By tracking changes, this allows a non-consistent iteration client
* to achieve a consistent view at the END of the iteration.
* NOTE: Replication events will be provided ordered and synchronized with any SWAPDB events.
* LIMITATION: Since SWAPDB events are not provided during CONSISTENT iteration, it is not
* permitted to use both CONSISTENT and REPLICATION on a non-clustermode instance. */
#define BGITERATOR_FLAG_REPLICATION (1 << 1)
/* Consistency type for iteration. Selects which guarantees the iteration client receives about
* the dbEntries (and any replication/SWAPDB events) that are delivered to it. */
typedef enum {
/* With no consistency requirements, dbEntries are provided to the iteration client as they
* appear at the time of iteration. No replication is provided. The only guarantee is that
* dbEntries which existed at the start of iteration, and remained through the duration of
* iteration, will be provided to the iteration client once (and only once). If a dbEntry is
* modified during iteration, either the old or the new value may be provided. */
BGITERATOR_CONSISTENCY_NONE = 0,

/* With consistency at the start of iteration, a point-in-time iteration is performed. The
* iteration client will see all keys AS THEY EXISTED at the time when the iterator was created.
* Note: The DBID provided with the DICTENTRY events is the original DBID (at the time of iteration
* start). SWAPDB events are NOT provided in this mode. */
BGITERATOR_CONSISTENCY_START = 1,

/* With an eventually consistent iteration, dbEntries will be followed by relevant replication.
* This allows a client to achieve a consistent state at the END of the iteration. Keys are
* provided as they are at the time of iteration; once a dbEntry has been provided to the
* iteration client, any replication related to that entry will also be forwarded to the
* iteration client. This mode requires that the iteration client be aware of SWAPDB events:
* if a SWAPDB is performed, the client will receive a SWAPDB event.
* Replication events will be provided ordered and synchronized with any SWAPDB events. */
BGITERATOR_CONSISTENCY_EVENTUAL = 2
} bgIteratorConsistency;


/* When running an iterator with replication, a replication-done function (callback) may be
Expand All @@ -60,8 +68,7 @@ typedef struct bgIterator bgIterator;
* Returns true when an iterator stops accepting any replication item into the queue for the client.
* If false is returned, replication will continue, and bgiteration will periodically call the callback
* until true is returned. In this context, returning false indicates that the client is not ready to
* stop receiving replication, it is requesting that replication be continued.
*/
* stop receiving replication, it is requesting that replication be continued. */
typedef bool (*bgIteratorReplDoneFunc)(void *privdata);


Expand All @@ -71,8 +78,7 @@ typedef bool (*bgIteratorReplDoneFunc)(void *privdata);
* TERMINATED: will be passed as TRUE if the iteration process was terminated early (either by
* the main thread calling bgIteratorTerminate() or the iteration client calling
* bgIteratorClose()).
* PRIVDATA: this pointer is for data private to the iteration client.
*/
* PRIVDATA: this pointer is for data private to the iteration client. */
typedef void (*bgIteratorCleanupFunc)(bool terminated, void *privdata);


Expand All @@ -90,11 +96,10 @@ typedef void (*bgIteratorCleanupFunc)(bool terminated, void *privdata);
* to implement the iteration client which will read from the returned bgIterator.
*
* There is no need to delete/destroy a bgIterator. It will automatically be cleaned up after the
* last item is read.
*/
* last item is read. */
bgIterator *bgIteratorCreateFullScanIter(
const char *name,
int flags,
bgIteratorConsistency consistency,
bgIteratorReplDoneFunc repldone,
bgIteratorCleanupFunc cleanup,
void *privdata);
Expand All @@ -119,11 +124,10 @@ bgIterator *bgIteratorCreateFullScanIter(
* just copy its data and leave the array untouched.
*
* There is no need to delete/destroy a bgIterator. It will automatically be cleaned up after the
* last item is read.
*/
* last item is read. */
bgIterator *bgIteratorCreateSlotsIter(
const char *name,
int flags,
bgIteratorConsistency consistency,
const int *slots,
int slots_count,
bgIteratorReplDoneFunc repldone,
Expand All @@ -132,8 +136,7 @@ bgIterator *bgIteratorCreateSlotsIter(


/* Find an existing bgIterator by name.
* Returns NULL if the iterator does not exist (or has completed).
*/
* Returns NULL if the iterator does not exist (or has completed). */
bgIterator *bgIteratorFind(const char *name);


Expand Down Expand Up @@ -162,8 +165,7 @@ typedef struct {

/* Get the status of a background iteration.
*
* The caller-provided bgIteratorStatus will be populated.
*/
* The caller-provided bgIteratorStatus will be populated. */
void bgIteratorGetStatus(bgIterator *iter, bgIteratorStatus *status);


Expand All @@ -172,8 +174,7 @@ void bgIteratorGetStatus(bgIterator *iter, bgIteratorStatus *status);
* An iteration is terminated by the Valkey main thread. It is expected that the iteration client
* will continue to read, receiving BGITERATOR_ITEM_TERMINATED or BGITERATOR_ITEM_COMPLETE to
* complete the iteration. (This is necessary to ensure proper cleanup.)
* NOTE: If the iteration client wants to terminate iteration, it may call bgIteratorClose().
*/
* NOTE: If the iteration client wants to terminate iteration, it may call bgIteratorClose(). */
void bgIteratorTerminate(bgIterator *iter);


Expand All @@ -182,8 +183,7 @@ void bgIteratorTerminate(bgIterator *iter);
* This checks if the iterator is in the process of terminating. For the Valkey main thread, this
* can be used to determine if a call has already been made to bgIteratorTerminate. An iteration
* client normally learns about termination by reading the next item; this function allows
* out-of-band detection of termination which can be useful when processing a large key.
*/
* out-of-band detection of termination which can be useful when processing a large key. */
bool bgIteratorIsTerminating(bgIterator *iter);


Expand Down Expand Up @@ -254,8 +254,7 @@ typedef struct {
* NOTE: Reading an item returns previously read items to Valkey. It is unsafe to reference an item
* previously read.
*
* (All memory management is the responsibility of the bgIterator - not the reader.)
*/
* (All memory management is the responsibility of the bgIterator - not the reader.) */
bgIteratorItem *bgIteratorRead(bgIterator *iter);


Expand All @@ -267,19 +266,15 @@ bgIteratorItem *bgIteratorRead(bgIterator *iter);
* BGITERATOR_ITEM_TERMINATED and signals that the background activity is complete.
*
* This may also be called by the iteration client to force terminate an iteration early. The
* bgIterator will be marked as terminated.
*/
* bgIterator will be marked as terminated. */
void bgIteratorClose(bgIterator *iter);


/********************************************************************************************
* BGITERATION HOOKS REQUIRED TO SUPPORT ITERATION - CALLS INSERTED INTO MAIN VALKEY CODE
********************************************************************************************/

typedef struct {
uint32_t iterator_epoch; // iterator epoch of last modification
} bgIterationEntryMetadata;

#define BGITERATION_ENTRY_METADATA_SIZE 4

/* Must be called once (and only once) at server startup. */
void bgIteration_init(void);
Expand All @@ -292,15 +287,13 @@ bool bgIteration_iterationActive(void);
/* Notify bgIteration that a key is being deleted. In Valkey, key deletion can occur in a READ
* command if the key is expired. Note that this notification is more about status than memory.
* Since the dbEntry is a reference counted object, the dbEntry can't be physically deleted if
* bgIteration is still actively using it.
*/
* bgIteration is still actively using it. */
void bgIteration_keyDelete(int dbid, const_sds key);


/* Iteration needs to know if a FLUSHALL is being performed. For normal clients, this comes through
* the standard "blockClientIfRequired" interface. This interface is for cases where Valkey
* performs the FLUSHALL operation independently of clients (e.g. when syncing with master).
*/
* performs the FLUSHALL operation independently of clients (e.g. when syncing with master). */
void bgIteration_flushall(void);


Expand All @@ -311,8 +304,7 @@ void bgIteration_flushall(void);
*
* We can't update the dbEntry if the entry is actually in use (bgIteration_isEntryInuse)!
*
* To simplify calling code, this function does nothing if old_entry == new_entry.
*/
* To simplify calling code, this function does nothing if old_entry == new_entry. */
void bgIteration_updateDbEntryPtr(dbEntry *old_entry, dbEntry *new_entry);


Expand All @@ -332,16 +324,14 @@ void bgIteration_updateDbEntryPtr(dbEntry *old_entry, dbEntry *new_entry);
* performs SWAPDB, a synchronous block may be performed (returning false) on
* individual commands within the script.
*
* Note: this function should be called for all commands (not just writes).
*/
* Note: this function should be called for all commands (not just writes). */
bool bgIteration_blockClientIfRequired(client *c);


/* After execution of a write command, the Valkey main thread must provide the command to iterators
* which are interested in the replication feed. It is required that all commands have been passed
* through bgIteration_blockClientIfRequired(), however, it is permitted that the command can be
* re-written for propagation.
*/
* re-written for propagation. */
void bgIteration_handleCommandReplication(
int dbid,
struct serverCommand *cmd,
Expand All @@ -351,16 +341,18 @@ void bgIteration_handleCommandReplication(

/* The memory that bgIteration uses while temporarily buffering replication data is not included in
* the maxmemory computation used for eviction. This function provides insight into the current
* amount of memory used for buffered replication data.
*/
* amount of memory used for buffered replication data. */
size_t bgIteration_memoryInuseForReplication(void);


/* Check if a dbEntry is currently in-use/locked by bgIteration. */
bool bgIteration_isEntryInuse(dbEntry *de);


/* Get the current iteration epoch, for tagging metadata on keys. */
uint32_t bgIteration_getEpoch(void);
/* Notify bgIteration that a dbEntry has been added/modified.
* - If the caller has a dbEntry*, bgIteration_dbEntryModified is more efficient.
* - If the caller has only a dbid/key, bgIteration_keyModified performs a lookup to find the dbEntry. */
void bgIteration_dbEntryModified(dbEntry *de);
void bgIteration_keyModified(int dbid, const_sds key);

#endif
13 changes: 2 additions & 11 deletions src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -432,8 +432,7 @@ void setKey(client *c, serverDb *db, robj *key, robj **valref, int flags) {
} else {
dbSetValue(db, key, valref, 1, NULL);
}
bgIterationEntryMetadata *md = (bgIterationEntryMetadata *)objectGetMetadata(*valref);
if (md) md->iterator_epoch = bgIteration_getEpoch();
bgIteration_dbEntryModified(*valref);
if (!(flags & SETKEY_KEEPTTL)) removeExpire(db, key);
if (!(flags & SETKEY_NO_SIGNAL)) signalModifiedKey(c, db, key);
}
Expand Down Expand Up @@ -762,15 +761,7 @@ long long dbTotalServerKeyCount(void) {
void signalModifiedKey(client *c, serverDb *db, robj *key) {
touchWatchedKey(db, key);
trackingInvalidateKey(c, key, 1);

/* If bgIteration is running, need to maintain the iteration epoch. */
if (bgIteration_iterationActive()) {
dbEntry *o = dbFind(db, objectGetVal(key));
if (o) {
bgIterationEntryMetadata *md = (bgIterationEntryMetadata *)objectGetMetadata(o);
if (md) md->iterator_epoch = bgIteration_getEpoch();
}
}
bgIteration_keyModified(db->id, objectGetVal(key));
}

void signalFlushedDb(int dbid, int async) {
Expand Down
8 changes: 8 additions & 0 deletions src/defrag.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include "eval.h"
#include "script.h"
#include "module.h"
#include "bgiteration.h"
#include <stdbool.h>
#include <stddef.h>

Expand Down Expand Up @@ -708,6 +709,8 @@ static void defragKey(defragKeysCtx *ctx, robj **elemref) {
unsigned char *newzl;
ob = *elemref;

if (bgIteration_isEntryInuse(ob)) return;

/* Try to defrag robj and/or string value. */
if ((newob = activeDefragStringOb(ob))) {
*elemref = newob;
Expand Down Expand Up @@ -815,6 +818,11 @@ static void defragPubsubScanCallback(void *privdata, void *elemref) {
* and 1 if time is up and more work is needed. */
static int defragLaterItem(robj *ob, unsigned long *cursor, monotime endtime, int dbid) {
if (ob) {
if (bgIteration_isEntryInuse(ob)) {
*cursor = 0;
return 0;
}

if (ob->type == OBJ_LIST && ob->encoding == OBJ_ENCODING_QUICKLIST) {
return scanLaterList(ob, cursor, endtime);
} else if (ob->type == OBJ_SET && ob->encoding == OBJ_ENCODING_HASHTABLE) {
Expand Down
9 changes: 9 additions & 0 deletions src/expire.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "cluster.h"
#include "cluster_migrateslots.h"
#include "util.h"
#include "bgiteration.h"

/*-----------------------------------------------------------------------------
* Incremental collection of expired keys.
Expand Down Expand Up @@ -167,6 +168,14 @@ void fieldExpireScanCallback(void *privdata, void *volaKey, int didx) {
robj *o = volaKey;
serverAssert(o);
serverAssert(hashTypeHasVolatileFields(o));

data->has_more_expired_entries = false;

if (bgIteration_isEntryInuse(o)) {
data->sampled++;
return;
}

mstime_t now = server.mstime;
size_t expired_fields = dbReclaimExpiredFields(o, data->db, now, data->max_entries, didx);
if (expired_fields) {
Expand Down
Loading
Loading