Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 1 addition & 10 deletions src/adlist.c
Original file line number Diff line number Diff line change
Expand Up @@ -196,22 +196,13 @@ void listUnlinkNode(list *list, listNode *node) {
* call to listNext() will return the next element of the list.
*
* This function can't fail. */
listIter *listGetIterator(list *list, int direction)
void listInitIterator(listIter *iter, list *list, int direction)
{
listIter *iter;

if ((iter = zmalloc(sizeof(*iter))) == NULL) return NULL;
if (direction == AL_START_HEAD)
iter->next = list->head;
else
iter->next = list->tail;
iter->direction = direction;
return iter;
}

/* Release the iterator memory */
void listReleaseIterator(listIter *iter) {
zfree(iter);
}

/* Create an iterator in the list private iterator structure */
Expand Down
3 changes: 1 addition & 2 deletions src/adlist.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,8 @@ list *listAddNodeHead(list *list, void *value);
list *listAddNodeTail(list *list, void *value);
list *listInsertNode(list *list, listNode *old_node, void *value, int after);
void listDelNode(list *list, listNode *node);
listIter *listGetIterator(list *list, int direction);
void listInitIterator(listIter *iter, list *list, int direction);
listNode *listNext(listIter *iter);
void listReleaseIterator(listIter *iter);
list *listDup(list *orig);
listNode *listSearchKey(list *list, void *key);
listNode *listIndex(list *list, long index);
Expand Down
63 changes: 33 additions & 30 deletions src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -1920,17 +1920,18 @@ int rioWriteBulkObject(rio *r, robj *obj) {
int rewriteListObject(rio *r, robj *key, robj *o) {
long long count = 0, items = listTypeLength(o);

listTypeIterator *li = listTypeInitIterator(o,0,LIST_TAIL);
listTypeIterator li;
listTypeEntry entry;
while (listTypeNext(li,&entry)) {
listTypeInitIterator(&li, o, 0, LIST_TAIL);
while (listTypeNext(&li, &entry)) {
if (count == 0) {
int cmd_items = (items > AOF_REWRITE_ITEMS_PER_CMD) ?
AOF_REWRITE_ITEMS_PER_CMD : items;
if (!rioWriteBulkCount(r,'*',2+cmd_items) ||
!rioWriteBulkString(r,"RPUSH",5) ||
!rioWriteBulkObject(r,key))
{
listTypeReleaseIterator(li);
listTypeResetIterator(&li);
return 0;
}
}
Expand All @@ -1941,52 +1942,53 @@ int rewriteListObject(rio *r, robj *key, robj *o) {
vstr = listTypeGetValue(&entry,&vlen,&lval);
if (vstr) {
if (!rioWriteBulkString(r,(char*)vstr,vlen)) {
listTypeReleaseIterator(li);
listTypeResetIterator(&li);
return 0;
}
} else {
if (!rioWriteBulkLongLong(r,lval)) {
listTypeReleaseIterator(li);
listTypeResetIterator(&li);
return 0;
}
}
if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0;
items--;
}
listTypeReleaseIterator(li);
listTypeResetIterator(&li);
return 1;
}

/* Emit the commands needed to rebuild a set object.
* The function returns 0 on error, 1 on success. */
int rewriteSetObject(rio *r, robj *key, robj *o) {
long long count = 0, items = setTypeSize(o);
setTypeIterator *si = setTypeInitIterator(o);
setTypeIterator si;
char *str;
size_t len;
int64_t llval;
while (setTypeNext(si, &str, &len, &llval) != -1) {
setTypeInitIterator(&si, o);
while (setTypeNext(&si, &str, &len, &llval) != -1) {
if (count == 0) {
int cmd_items = (items > AOF_REWRITE_ITEMS_PER_CMD) ?
AOF_REWRITE_ITEMS_PER_CMD : items;
if (!rioWriteBulkCount(r,'*',2+cmd_items) ||
!rioWriteBulkString(r,"SADD",4) ||
!rioWriteBulkObject(r,key))
{
setTypeReleaseIterator(si);
setTypeResetIterator(&si);
return 0;
}
}
size_t written = str ?
rioWriteBulkString(r, str, len) : rioWriteBulkLongLong(r, llval);
if (!written) {
setTypeReleaseIterator(si);
setTypeResetIterator(&si);
return 0;
}
if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0;
items--;
}
setTypeReleaseIterator(si);
setTypeResetIterator(&si);
return 1;
}

Expand Down Expand Up @@ -2104,14 +2106,14 @@ static int rioWriteHashIteratorCursor(rio *r, hashTypeIterator *hi, int what) {
int rewriteHashObject(rio *r, robj *key, robj *o) {
int res = 0; /*fail*/

hashTypeIterator *hi;
hashTypeIterator hi;
long long count = 0, items = hashTypeLength(o, 0);

int isHFE = hashTypeGetMinExpire(o, 0) != EB_EXPIRE_TIME_INVALID;
hi = hashTypeInitIterator(o);
hashTypeInitIterator(&hi, o);

if (!isHFE) {
while (hashTypeNext(hi, 0) != C_ERR) {
while (hashTypeNext(&hi, 0) != C_ERR) {
if (count == 0) {
int cmd_items = (items > AOF_REWRITE_ITEMS_PER_CMD) ?
AOF_REWRITE_ITEMS_PER_CMD : items;
Expand All @@ -2121,31 +2123,31 @@ int rewriteHashObject(rio *r, robj *key, robj *o) {
goto reHashEnd;
}

if (!rioWriteHashIteratorCursor(r, hi, OBJ_HASH_KEY) ||
!rioWriteHashIteratorCursor(r, hi, OBJ_HASH_VALUE))
if (!rioWriteHashIteratorCursor(r, &hi, OBJ_HASH_KEY) ||
!rioWriteHashIteratorCursor(r, &hi, OBJ_HASH_VALUE))
goto reHashEnd;

if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0;
items--;
}
} else {
while (hashTypeNext(hi, 0) != C_ERR) {
while (hashTypeNext(&hi, 0) != C_ERR) {

char hmsetCmd[] = "*4\r\n$5\r\nHMSET\r\n";
if ( (!rioWrite(r, hmsetCmd, sizeof(hmsetCmd) - 1)) ||
(!rioWriteBulkObject(r, key)) ||
(!rioWriteHashIteratorCursor(r, hi, OBJ_HASH_KEY)) ||
(!rioWriteHashIteratorCursor(r, hi, OBJ_HASH_VALUE)) )
(!rioWriteHashIteratorCursor(r, &hi, OBJ_HASH_KEY)) ||
(!rioWriteHashIteratorCursor(r, &hi, OBJ_HASH_VALUE)) )
goto reHashEnd;

if (hi->expire_time != EB_EXPIRE_TIME_INVALID) {
if (hi.expire_time != EB_EXPIRE_TIME_INVALID) {
char cmd[] = "*6\r\n$10\r\nHPEXPIREAT\r\n";
if ( (!rioWrite(r, cmd, sizeof(cmd) - 1)) ||
(!rioWriteBulkObject(r, key)) ||
(!rioWriteBulkLongLong(r, hi->expire_time)) ||
(!rioWriteBulkLongLong(r, hi.expire_time)) ||
(!rioWriteBulkString(r, "FIELDS", 6)) ||
(!rioWriteBulkString(r, "1", 1)) ||
(!rioWriteHashIteratorCursor(r, hi, OBJ_HASH_KEY)) )
(!rioWriteHashIteratorCursor(r, &hi, OBJ_HASH_KEY)) )
goto reHashEnd;
}
}
Expand All @@ -2154,7 +2156,7 @@ int rewriteHashObject(rio *r, robj *key, robj *o) {
res = 1; /* success */

reHashEnd:
hashTypeReleaseIterator(hi);
hashTypeResetIterator(&hi);
return res;
}

Expand Down Expand Up @@ -2428,7 +2430,7 @@ int rewriteAppendOnlyFileRio(rio *aof) {
long key_count = 0;
long long updated_time = 0;
unsigned long long skipped = 0;
kvstoreIterator *kvs_it = NULL;
kvstoreIterator kvs_it;

/* Record timestamp at the beginning of rewriting AOF. */
if (server.aof_timestamp_enabled) {
Expand All @@ -2448,9 +2450,9 @@ int rewriteAppendOnlyFileRio(rio *aof) {
if (rioWrite(aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
if (rioWriteBulkLongLong(aof,j) == 0) goto werr;

kvs_it = kvstoreIteratorInit(db->keys);
kvstoreIteratorInit(&kvs_it, db->keys);
/* Iterate this DB writing every entry */
while((de = kvstoreIteratorNext(kvs_it)) != NULL) {
while((de = kvstoreIteratorNext(&kvs_it)) != NULL) {
long long expiretime;
size_t aof_bytes_before_key = aof->processed_bytes;

Expand All @@ -2462,7 +2464,7 @@ int rewriteAppendOnlyFileRio(rio *aof) {

/* Skip keys that are being trimmed */
if (server.cluster_enabled) {
int curr_slot = kvstoreIteratorGetCurrentDictIndex(kvs_it);
int curr_slot = kvstoreIteratorGetCurrentDictIndex(&kvs_it);
if (isSlotInTrimJob(curr_slot)) {
skipped++;
continue;
Expand All @@ -2473,7 +2475,7 @@ int rewriteAppendOnlyFileRio(rio *aof) {
robj key;
initStaticStringObject(key, kvobjGetKey(o));

if (rewriteObject(aof, &key, o, j, expiretime) == C_ERR) goto werr;
if (rewriteObject(aof, &key, o, j, expiretime) == C_ERR) goto werr2;

/* In fork child process, we can try to release memory back to the
* OS and possibly avoid or decrease COW. We give the dismiss
Expand All @@ -2496,13 +2498,14 @@ int rewriteAppendOnlyFileRio(rio *aof) {
if (server.rdb_key_save_delay)
debugDelay(server.rdb_key_save_delay);
}
kvstoreIteratorRelease(kvs_it);
kvstoreIteratorReset(&kvs_it);
}
serverLog(LL_NOTICE, "AOF rewrite done, %ld keys saved, %llu keys skipped.", key_count, skipped);
return C_OK;

werr2:
kvstoreIteratorReset(&kvs_it);
werr:
if (kvs_it) kvstoreIteratorRelease(kvs_it);
return C_ERR;
}

Expand Down
16 changes: 8 additions & 8 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -1054,16 +1054,16 @@ void clusterCommand(client *c) {
unsigned int keys_in_slot = countKeysInSlot(slot);
unsigned int numkeys = maxkeys > keys_in_slot ? keys_in_slot : maxkeys;
addReplyArrayLen(c,numkeys);
kvstoreDictIterator *kvs_di = NULL;
kvstoreDictIterator kvs_di;
dictEntry *de = NULL;
kvs_di = kvstoreGetDictIterator(server.db->keys, slot);
kvstoreInitDictIterator(&kvs_di, server.db->keys, slot);
for (unsigned int i = 0; i < numkeys; i++) {
de = kvstoreDictIteratorNext(kvs_di);
de = kvstoreDictIteratorNext(&kvs_di);
serverAssert(de != NULL);
sds sdskey = kvobjGetKey(dictGetKV(de));
addReplyBulkCBuffer(c, sdskey, sdslen(sdskey));
}
kvstoreReleaseDictIterator(kvs_di);
kvstoreResetDictIterator(&kvs_di);
} else if ((!strcasecmp(c->argv[1]->ptr,"slaves") ||
!strcasecmp(c->argv[1]->ptr,"replicas")) && c->argc == 3) {
/* CLUSTER SLAVES <NODE ID> */
Expand Down Expand Up @@ -1670,10 +1670,10 @@ unsigned int clusterDelKeysInSlot(unsigned int hashslot, int by_command) {
if (!kvstoreDictSize(server.db->keys, (int) hashslot))
return 0;

kvstoreDictIterator *kvs_di = NULL;
kvstoreDictIterator kvs_di;
dictEntry *de = NULL;
kvs_di = kvstoreGetDictSafeIterator(server.db->keys, (int) hashslot);
while((de = kvstoreDictIteratorNext(kvs_di)) != NULL) {
kvstoreInitDictSafeIterator(&kvs_di, server.db->keys, (int) hashslot);
while((de = kvstoreDictIteratorNext(&kvs_di)) != NULL) {
enterExecutionUnit(1, 0);
sds sdskey = kvobjGetKey(dictGetKV(de));
robj *key = createStringObject(sdskey, sdslen(sdskey));
Expand All @@ -1700,7 +1700,7 @@ unsigned int clusterDelKeysInSlot(unsigned int hashslot, int by_command) {
j++;
server.dirty++;
}
kvstoreReleaseDictIterator(kvs_di);
kvstoreResetDictIterator(&kvs_di);
return j;
}

Expand Down
35 changes: 18 additions & 17 deletions src/cluster_asm.c
Original file line number Diff line number Diff line change
Expand Up @@ -2180,7 +2180,7 @@ int slotSnapshotSaveRio(int req, rio *rdb, int *error) {
serverAssert(req & SLAVE_REQ_SLOTS_SNAPSHOT);

dictEntry *de;
kvstoreDictIterator *kvs_di = NULL;
kvstoreDictIterator kvs_di;

if (unlikely(asmDebugIsFailPointActive(ASM_MIGRATE_RDB_CHANNEL, ASM_SEND_BULK_AND_STREAM)))
rioAbort(rdb); /* Simulate a failure */
Expand Down Expand Up @@ -2228,9 +2228,9 @@ int slotSnapshotSaveRio(int req, rio *rdb, int *error) {
/* Iterate all keys in the slot range */
for (int k = sr->start; k <= sr->end; k++) {
int send_slot_info = 0;
kvs_di = kvstoreGetDictIterator(server.db->keys, k);

while ((de = kvstoreDictIteratorNext(kvs_di)) != NULL) {
kvstoreInitDictIterator(&kvs_di, server.db->keys, k);
while ((de = kvstoreDictIteratorNext(&kvs_di)) != NULL) {
/* Send slot info before the first key in the slot */
if (!send_slot_info) {
/* Format slot info */
Expand All @@ -2241,28 +2241,27 @@ int slotSnapshotSaveRio(int req, rio *rdb, int *error) {
serverAssert(len > 0 && len < (int)sizeof(buf));

/* Send slot info */
if (rioWriteBulkCount(rdb, '*', 5) == 0) goto werr;
if (rioWriteBulkString(rdb, "CLUSTER", 7) == 0) goto werr;
if (rioWriteBulkString(rdb, "SYNCSLOTS", 9) == 0) goto werr;
if (rioWriteBulkString(rdb, "CONF", 4) == 0) goto werr;
if (rioWriteBulkString(rdb, "SLOT-INFO", 9) == 0) goto werr;
if (rioWriteBulkString(rdb, buf, len) == 0) goto werr;
if (rioWriteBulkCount(rdb, '*', 5) == 0) goto werr2;
if (rioWriteBulkString(rdb, "CLUSTER", 7) == 0) goto werr2;
if (rioWriteBulkString(rdb, "SYNCSLOTS", 9) == 0) goto werr2;
if (rioWriteBulkString(rdb, "CONF", 4) == 0) goto werr2;
if (rioWriteBulkString(rdb, "SLOT-INFO", 9) == 0) goto werr2;
if (rioWriteBulkString(rdb, buf, len) == 0) goto werr2;
send_slot_info = 1;
}

/* Save a key-value pair */
kvobj *o = dictGetKV(de);
if (slotSnapshotSaveKeyValuePair(rdb, o, db->id) == C_ERR) goto werr;
if (slotSnapshotSaveKeyValuePair(rdb, o, db->id) == C_ERR) goto werr2;

/* Delay return if required (for testing) */
if (unlikely(server.rdb_key_save_delay)) {
/* Send buffer to the destination ASAP. */
if (rioFlush(rdb) == 0) goto werr;
if (rioFlush(rdb) == 0) goto werr2;
debugDelay(server.rdb_key_save_delay);
}
}
kvstoreReleaseDictIterator(kvs_di);
kvs_di = NULL;
kvstoreResetDictIterator(&kvs_di);
}
}
}
Expand All @@ -2274,8 +2273,9 @@ int slotSnapshotSaveRio(int req, rio *rdb, int *error) {
if (rioWriteBulkString(rdb, "SNAPSHOT-EOF", 12) == 0) goto werr;
return C_OK;

werr2:
kvstoreResetDictIterator(&kvs_di);
werr:
if (kvs_di) kvstoreReleaseDictIterator(kvs_di);
if (error) *error = errno;
return C_ERR;
}
Expand Down Expand Up @@ -3356,8 +3356,9 @@ void asmActiveTrimCycle(void) {

while (!time_exceeded && slot != -1) {
dictEntry *de;
kvstoreDictIterator *kvs_di = kvstoreGetDictSafeIterator(server.db[0].keys, slot);
while ((de = kvstoreDictIteratorNext(kvs_di)) != NULL) {
kvstoreDictIterator kvs_di;
kvstoreInitDictSafeIterator(&kvs_di, server.db[0].keys, slot);
while ((de = kvstoreDictIteratorNext(&kvs_di)) != NULL) {
kvobj *kv = dictGetKV(de);
sds sdskey = kvobjGetKey(kv);

Expand All @@ -3375,7 +3376,7 @@ void asmActiveTrimCycle(void) {
break;
}
}
kvstoreReleaseDictIterator(kvs_di);
kvstoreResetDictIterator(&kvs_di);
if (!time_exceeded) slot = slotRangeArrayNext(asmManager->active_trim_it);
}

Expand Down
Loading