Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
1583d60
Missing --memkeys and --keystats for some options in redis-cli help t…
yveslb Feb 13, 2025
87124a3
Fix wrongly updating fsynced_reploff_pending when appendfsync=everyse…
ShooterIT Feb 13, 2025
662cb2f
Don't send unnecessary PING to replicas (#13790)
ShooterIT Feb 13, 2025
7f5f588
AOF offset info (#13773)
ShooterIT Feb 13, 2025
57807cd
Memory Usage command LIST accuracy fix (#13783)
ofirluzon Feb 14, 2025
e260847
Add HGETDEL, HGETEX and HSETEX hash commands (#13798)
tezc Feb 14, 2025
6c202f4
Remove DENYOOM flag from hexpire command (#13800)
tezc Feb 16, 2025
2b2e4e2
Re-implement RM_DefragRedisModuleDict API
sundb Feb 17, 2025
971514c
Update the comment
sundb Feb 17, 2025
9a714d7
Remove unused code
sundb Feb 17, 2025
4d6848a
Code style
sundb Feb 17, 2025
2a80c99
unify seekTo and nextToSeek
sundb Feb 17, 2025
e2d0820
Return NULL if module dict was moved
sundb Feb 17, 2025
a9d8eb2
add a static dict index to keep the current defrag dict
sundb Feb 17, 2025
03f75f6
Add global_dicts_pauses
sundb Feb 17, 2025
a1c3d6f
Reset seekTo when cursor is 0
sundb Feb 17, 2025
cc57fc0
Free seekTo before reset to NULL
sundb Feb 17, 2025
14d0db1
Move RedisModuleDefragDictValueCallback to correct place
sundb Feb 17, 2025
895c80c
Merge branch 'incrementail_defrag_module' into DefragRedisModuleDictDict
sundb Feb 17, 2025
53304e9
Merge branch 'incrementail_defrag_module' into DefragRedisModuleDictDict
sundb Feb 17, 2025
e62da96
Add support for both v1 and v2
sundb Feb 17, 2025
ef54425
Merge branch 'incrementail_defrag_module' into DefragRedisModuleDictDict
sundb Feb 17, 2025
9440c15
Reduce the size of the dictionary to give other tests a chance
sundb Feb 18, 2025
63ea2ed
Merge branch 'incrementail_defrag_module' into DefragRedisModuleDictDict
sundb Feb 18, 2025
ab9efe4
Move defrag_later into defragKeysCtx
sundb Feb 18, 2025
1a17989
Merge branch 'incrementail_defrag_module' into DefragRedisModuleDictDict
sundb Feb 18, 2025
b65b09c
Fix complaint
sundb Feb 18, 2025
6193444
Merge branch 'incrementail_defrag_module' into DefragRedisModuleDictDict
sundb Feb 18, 2025
1d1adc1
Merge branch 'incrementail_defrag_module' into DefragRedisModuleDictDict
sundb Feb 18, 2025
52bb376
Merge branch 'incrementail_defrag_module' into DefragRedisModuleDictDict
sundb Feb 18, 2025
725d0c2
Update src/module.c
sundb Feb 18, 2025
f7c3c20
Add new frag.create_global_frag command to make some frags
sundb Feb 18, 2025
1d354bf
Remove unused code
sundb Feb 18, 2025
7300d8f
Add new defragtest_global_dicts_defragged to check if we break in the…
sundb Feb 19, 2025
53a3f21
Stop defrag before create frag
sundb Feb 19, 2025
c5f91ab
Fix syntax issue in comments of src/module.c (#13802)
yunxiao3 Feb 19, 2025
b045fe4
Fix overflow on 32-bit systems when calculating idle time for evictio…
luozongle01 Feb 19, 2025
66df58f
Do not send NL if replica client is already closed (#13813)
guybe7 Feb 19, 2025
e8b54c3
Merge branch 'incrementail_defrag_module' into DefragRedisModuleDictDict
sundb Feb 19, 2025
a8714ba
Refine the comment for DEFRAG_CYCLE_US
sundb Feb 19, 2025
725cd26
Refactor of ActiveDefrag to reduce latencies (#13814)
sundb Feb 19, 2025
695126c
Add support for incremental defragmentation of global module data (#1…
sundb Feb 19, 2025
8d4f93a
Merge branch 'unstable' into DefragRedisModuleDictDict
sundb Feb 19, 2025
3121e82
Fix memory leak
sundb Feb 20, 2025
a55c0c5
Add ctx for RedisModuleDefragDictValueCallback
sundb Feb 20, 2025
d060c25
Create more fragments for the test to pass
sundb Feb 20, 2025
6f8723b
Refine the test
sundb Feb 20, 2025
4f72f7d
Spell
sundb Feb 20, 2025
04c5836
Revert some change and refine the test
sundb Feb 20, 2025
c3d8aec
Add comment for block
sundb Feb 20, 2025
3d05ecc
Revert changes not related to this PR
sundb Feb 20, 2025
6cd97f8
Revert changes not related to this PR
sundb Feb 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 105 additions & 32 deletions src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ aofManifest *aofLoadManifestFromFile(sds am_filepath);
void aofManifestFreeAndUpdate(aofManifest *am);
void aof_background_fsync_and_close(int fd);

/* When we call 'startAppendOnly', we will create a temp INCR AOF, and rename
* it to the real INCR AOF name when the AOFRW is done, so if want to know the
* accurate start offset of the INCR AOF, we need to record it when we create
* the temp INCR AOF. This variable is used to record the start offset, and
* set the start offset of the real INCR AOF when the AOFRW is done. */
static long long tempIncAofStartReplOffset = 0;

/* ----------------------------------------------------------------------------
* AOF Manifest file implementation.
*
Expand Down Expand Up @@ -73,10 +80,15 @@ void aof_background_fsync_and_close(int fd);
#define AOF_MANIFEST_KEY_FILE_NAME "file"
#define AOF_MANIFEST_KEY_FILE_SEQ "seq"
#define AOF_MANIFEST_KEY_FILE_TYPE "type"
#define AOF_MANIFEST_KEY_FILE_STARTOFFSET "startoffset"
#define AOF_MANIFEST_KEY_FILE_ENDOFFSET "endoffset"

/* Create an empty aofInfo. */
aofInfo *aofInfoCreate(void) {
return zcalloc(sizeof(aofInfo));
aofInfo *ai = zcalloc(sizeof(aofInfo));
ai->start_offset = -1;
ai->end_offset = -1;
return ai;
}

/* Free the aofInfo structure (pointed to by ai) and its embedded file_name. */
Expand All @@ -93,6 +105,8 @@ aofInfo *aofInfoDup(aofInfo *orig) {
ai->file_name = sdsdup(orig->file_name);
ai->file_seq = orig->file_seq;
ai->file_type = orig->file_type;
ai->start_offset = orig->start_offset;
ai->end_offset = orig->end_offset;
return ai;
}

Expand All @@ -105,10 +119,19 @@ sds aofInfoFormat(sds buf, aofInfo *ai) {
if (sdsneedsrepr(ai->file_name))
filename_repr = sdscatrepr(sdsempty(), ai->file_name, sdslen(ai->file_name));

sds ret = sdscatprintf(buf, "%s %s %s %lld %s %c\n",
sds ret = sdscatprintf(buf, "%s %s %s %lld %s %c",
AOF_MANIFEST_KEY_FILE_NAME, filename_repr ? filename_repr : ai->file_name,
AOF_MANIFEST_KEY_FILE_SEQ, ai->file_seq,
AOF_MANIFEST_KEY_FILE_TYPE, ai->file_type);

if (ai->start_offset != -1) {
ret = sdscatprintf(ret, " %s %lld", AOF_MANIFEST_KEY_FILE_STARTOFFSET, ai->start_offset);
if (ai->end_offset != -1) {
ret = sdscatprintf(ret, " %s %lld", AOF_MANIFEST_KEY_FILE_ENDOFFSET, ai->end_offset);
}
}

ret = sdscatlen(ret, "\n", 1);
sdsfree(filename_repr);

return ret;
Expand Down Expand Up @@ -304,6 +327,10 @@ aofManifest *aofLoadManifestFromFile(sds am_filepath) {
ai->file_seq = atoll(argv[i+1]);
} else if (!strcasecmp(argv[i], AOF_MANIFEST_KEY_FILE_TYPE)) {
ai->file_type = (argv[i+1])[0];
} else if (!strcasecmp(argv[i], AOF_MANIFEST_KEY_FILE_STARTOFFSET)) {
ai->start_offset = atoll(argv[i+1]);
} else if (!strcasecmp(argv[i], AOF_MANIFEST_KEY_FILE_ENDOFFSET)) {
ai->end_offset = atoll(argv[i+1]);
}
/* else if (!strcasecmp(argv[i], AOF_MANIFEST_KEY_OTHER)) {} */
}
Expand Down Expand Up @@ -433,12 +460,13 @@ sds getNewBaseFileNameAndMarkPreAsHistory(aofManifest *am) {
* for example:
* appendonly.aof.1.incr.aof
*/
sds getNewIncrAofName(aofManifest *am) {
sds getNewIncrAofName(aofManifest *am, long long start_reploff) {
aofInfo *ai = aofInfoCreate();
ai->file_type = AOF_FILE_TYPE_INCR;
ai->file_name = sdscatprintf(sdsempty(), "%s.%lld%s%s", server.aof_filename,
++am->curr_incr_file_seq, INCR_FILE_SUFFIX, AOF_FORMAT_SUFFIX);
ai->file_seq = am->curr_incr_file_seq;
ai->start_offset = start_reploff;
listAddNodeTail(am->incr_aof_list, ai);
am->dirty = 1;
return ai->file_name;
Expand All @@ -456,7 +484,7 @@ sds getLastIncrAofName(aofManifest *am) {

/* If 'incr_aof_list' is empty, just create a new one. */
if (!listLength(am->incr_aof_list)) {
return getNewIncrAofName(am);
return getNewIncrAofName(am, server.master_repl_offset);
}

/* Or return the last one. */
Expand Down Expand Up @@ -781,10 +809,11 @@ int openNewIncrAofForAppend(void) {
if (server.aof_state == AOF_WAIT_REWRITE) {
/* Use a temporary INCR AOF file to accumulate data during AOF_WAIT_REWRITE. */
new_aof_name = getTempIncrAofName();
tempIncAofStartReplOffset = server.master_repl_offset;
} else {
/* Dup a temp aof_manifest to modify. */
temp_am = aofManifestDup(server.aof_manifest);
new_aof_name = sdsdup(getNewIncrAofName(temp_am));
new_aof_name = sdsdup(getNewIncrAofName(temp_am, server.master_repl_offset));
}
sds new_aof_filepath = makePath(server.aof_dirname, new_aof_name);
newfd = open(new_aof_filepath, O_WRONLY|O_TRUNC|O_CREAT, 0644);
Expand Down Expand Up @@ -833,6 +862,50 @@ int openNewIncrAofForAppend(void) {
return C_ERR;
}

/* When we close gracefully the AOF file, we have the chance to persist the
* end replication offset of current INCR AOF. */
void updateCurIncrAofEndOffset(void) {
if (server.aof_state != AOF_ON) return;
serverAssert(server.aof_manifest != NULL);

if (listLength(server.aof_manifest->incr_aof_list) == 0) return;
aofInfo *ai = listNodeValue(listLast(server.aof_manifest->incr_aof_list));
ai->end_offset = server.master_repl_offset;
server.aof_manifest->dirty = 1;
/* It doesn't matter if the persistence fails since this information is not
* critical, we can get an approximate value by start offset plus file size. */
persistAofManifest(server.aof_manifest);
}

/* After loading AOF data, we need to update the `server.master_repl_offset`
* based on the information of the last INCR AOF, to avoid the rollback of
* the start offset of new INCR AOF. */
void updateReplOffsetAndResetEndOffset(void) {
if (server.aof_state != AOF_ON) return;
serverAssert(server.aof_manifest != NULL);

/* If the INCR file has an end offset, we directly use it, and clear it
* to avoid the next time we load the manifest file, we will use the same
* offset, but the real offset may have advanced. */
if (listLength(server.aof_manifest->incr_aof_list) == 0) return;
aofInfo *ai = listNodeValue(listLast(server.aof_manifest->incr_aof_list));
if (ai->end_offset != -1) {
server.master_repl_offset = ai->end_offset;
ai->end_offset = -1;
server.aof_manifest->dirty = 1;
/* We must update the end offset of INCR file correctly, otherwise we
* may keep wrong information in the manifest file, since we continue
* to append data to the same INCR file. */
if (persistAofManifest(server.aof_manifest) != AOF_OK)
exit(1);
} else {
/* If the INCR file doesn't have an end offset, we need to calculate
* the replication offset by the start offset plus the file size. */
server.master_repl_offset = (ai->start_offset == -1 ? 0 : ai->start_offset) +
getAppendOnlyFileSize(ai->file_name, NULL);
}
}

/* Whether to limit the execution of Background AOF rewrite.
*
* At present, if AOFRW fails, redis will automatically retry. If it continues
Expand Down Expand Up @@ -938,6 +1011,7 @@ void stopAppendOnly(void) {
server.aof_last_fsync = server.mstime;
}
close(server.aof_fd);
updateCurIncrAofEndOffset();

server.aof_fd = -1;
server.aof_selected_db = -1;
Expand Down Expand Up @@ -1071,35 +1145,34 @@ void flushAppendOnlyFile(int force) {
mstime_t latency;

if (sdslen(server.aof_buf) == 0) {
/* Check if we need to do fsync even the aof buffer is empty,
* because previously in AOF_FSYNC_EVERYSEC mode, fsync is
* called only when aof buffer is not empty, so if users
* stop write commands before fsync called in one second,
* the data in page cache cannot be flushed in time. */
if (server.aof_fsync == AOF_FSYNC_EVERYSEC &&
server.aof_last_incr_fsync_offset != server.aof_last_incr_size &&
server.mstime - server.aof_last_fsync >= 1000 &&
!(sync_in_progress = aofFsyncInProgress())) {
goto try_fsync;

/* Check if we need to do fsync even the aof buffer is empty,
* the reason is described in the previous AOF_FSYNC_EVERYSEC block,
* and AOF_FSYNC_ALWAYS is also checked here to handle a case where
* aof_fsync is changed from everysec to always. */
} else if (server.aof_fsync == AOF_FSYNC_ALWAYS &&
server.aof_last_incr_fsync_offset != server.aof_last_incr_size)
{
goto try_fsync;
} else {
if (server.aof_last_incr_fsync_offset == server.aof_last_incr_size) {
/* All data is fsync'd already: Update fsynced_reploff_pending just in case.
* This is needed to avoid a WAITAOF hang in case a module used RM_Call with the NO_AOF flag,
* in which case master_repl_offset will increase but fsynced_reploff_pending won't be updated
* (because there's no reason, from the AOF POV, to call fsync) and then WAITAOF may wait on
* the higher offset (which contains data that was only propagated to replicas, and not to AOF) */
if (!sync_in_progress && server.aof_fsync != AOF_FSYNC_NO)
* This is needed to avoid a WAITAOF hang in case a module used RM_Call
* with the NO_AOF flag, in which case master_repl_offset will increase but
* fsynced_reploff_pending won't be updated (because there's no reason, from
* the AOF POV, to call fsync) and then WAITAOF may wait on the higher offset
* (which contains data that was only propagated to replicas, and not to AOF) */
if (!aofFsyncInProgress())
atomicSet(server.fsynced_reploff_pending, server.master_repl_offset);
return;
} else {
/* Check if we need to do fsync even the aof buffer is empty,
* because previously in AOF_FSYNC_EVERYSEC mode, fsync is
* called only when aof buffer is not empty, so if users
* stop write commands before fsync called in one second,
* the data in page cache cannot be flushed in time. */
if (server.aof_fsync == AOF_FSYNC_EVERYSEC &&
server.mstime - server.aof_last_fsync >= 1000 &&
!(sync_in_progress = aofFsyncInProgress()))
goto try_fsync;

/* Check if we need to do fsync even the aof buffer is empty,
* the reason is described in the previous AOF_FSYNC_EVERYSEC block,
* and AOF_FSYNC_ALWAYS is also checked here to handle a case where
* aof_fsync is changed from everysec to always. */
if (server.aof_fsync == AOF_FSYNC_ALWAYS)
goto try_fsync;
}
return;
}

if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
Expand Down Expand Up @@ -2665,7 +2738,7 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
sds temp_incr_aof_name = getTempIncrAofName();
sds temp_incr_filepath = makePath(server.aof_dirname, temp_incr_aof_name);
/* Get next new incr aof name. */
sds new_incr_filename = getNewIncrAofName(temp_am);
sds new_incr_filename = getNewIncrAofName(temp_am, tempIncAofStartReplOffset);
new_incr_filepath = makePath(server.aof_dirname, new_incr_filename);
latencyStartMonitor(latency);
if (rename(temp_incr_filepath, new_incr_filepath) == -1) {
Expand Down
Loading