Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor of ActiveDefrag to reduce latencies #1242

Open
wants to merge 1 commit into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/ae.c
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ aeEventLoop *aeCreateEventLoop(int setsize) {
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
eventLoop->setsize = setsize;
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = 0;
eventLoop->timeEventNextId = 1;
eventLoop->stop = 0;
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
Expand Down
5 changes: 3 additions & 2 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -3208,10 +3208,11 @@ standardConfig static_configs[] = {
createIntConfig("list-max-listpack-size", "list-max-ziplist-size", MODIFIABLE_CONFIG, INT_MIN, INT_MAX, server.list_max_listpack_size, -2, INTEGER_CONFIG, NULL, NULL),
createIntConfig("tcp-keepalive", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.tcpkeepalive, 300, INTEGER_CONFIG, NULL, NULL),
createIntConfig("cluster-migration-barrier", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.cluster_migration_barrier, 1, INTEGER_CONFIG, NULL, NULL),
createIntConfig("active-defrag-cycle-min", NULL, MODIFIABLE_CONFIG, 1, 99, server.active_defrag_cycle_min, 1, INTEGER_CONFIG, NULL, updateDefragConfiguration), /* Default: 1% CPU min (at lower threshold) */
createIntConfig("active-defrag-cycle-max", NULL, MODIFIABLE_CONFIG, 1, 99, server.active_defrag_cycle_max, 25, INTEGER_CONFIG, NULL, updateDefragConfiguration), /* Default: 25% CPU max (at upper threshold) */
createIntConfig("active-defrag-cycle-min", NULL, MODIFIABLE_CONFIG, 1, 99, server.active_defrag_cpu_min, 1, INTEGER_CONFIG, NULL, updateDefragConfiguration), /* Default: 1% CPU min (at lower threshold) */
createIntConfig("active-defrag-cycle-max", NULL, MODIFIABLE_CONFIG, 1, 99, server.active_defrag_cpu_max, 25, INTEGER_CONFIG, NULL, updateDefragConfiguration), /* Default: 25% CPU max (at upper threshold) */
zuiderkwast marked this conversation as resolved.
Show resolved Hide resolved
createIntConfig("active-defrag-threshold-lower", NULL, MODIFIABLE_CONFIG, 0, 1000, server.active_defrag_threshold_lower, 10, INTEGER_CONFIG, NULL, NULL), /* Default: don't defrag when fragmentation is below 10% */
createIntConfig("active-defrag-threshold-upper", NULL, MODIFIABLE_CONFIG, 0, 1000, server.active_defrag_threshold_upper, 100, INTEGER_CONFIG, NULL, updateDefragConfiguration), /* Default: maximum defrag force at 100% fragmentation */
createIntConfig("active-defrag-cycle-us", NULL, MODIFIABLE_CONFIG, 0, 100000, server.active_defrag_cycle_us, 500, INTEGER_CONFIG, NULL, updateDefragConfiguration),
createIntConfig("lfu-log-factor", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.lfu_log_factor, 10, INTEGER_CONFIG, NULL, NULL),
createIntConfig("lfu-decay-time", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.lfu_decay_time, 1, INTEGER_CONFIG, NULL, NULL),
createIntConfig("replica-priority", "slave-priority", MODIFIABLE_CONFIG, 0, INT_MAX, server.replica_priority, 100, INTEGER_CONFIG, NULL, NULL),
Expand Down
1,078 changes: 666 additions & 412 deletions src/defrag.c

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions src/dict.c
Original file line number Diff line number Diff line change
Expand Up @@ -1321,7 +1321,7 @@ unsigned int dictGetSomeKeys(dict *d, dictEntry **des, unsigned int count) {

/* Reallocate the dictEntry, key and value allocations in a bucket using the
* provided allocation functions in order to defrag them. */
static void dictDefragBucket(dictEntry **bucketref, dictDefragFunctions *defragfns, void *privdata) {
static void dictDefragBucket(dictEntry **bucketref, const dictDefragFunctions *defragfns, void *privdata) {
dictDefragAllocFunction *defragalloc = defragfns->defragAlloc;
dictDefragAllocFunction *defragkey = defragfns->defragKey;
dictDefragAllocFunction *defragval = defragfns->defragVal;
Expand Down Expand Up @@ -1499,7 +1499,7 @@ unsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, void *pri
* where NULL means that no reallocation happened and the old memory is still
* valid. */
unsigned long
dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, dictDefragFunctions *defragfns, void *privdata) {
dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, const dictDefragFunctions *defragfns, void *privdata) {
int htidx0, htidx1;
const dictEntry *de, *next;
unsigned long m0, m1;
Expand Down
2 changes: 1 addition & 1 deletion src/dict.h
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ void dictSetHashFunctionSeed(uint8_t *seed);
uint8_t *dictGetHashFunctionSeed(void);
unsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, void *privdata);
unsigned long
dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, dictDefragFunctions *defragfns, void *privdata);
dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, const dictDefragFunctions *defragfns, void *privdata);
uint64_t dictGetHash(dict *d, const void *key);
void dictRehashingInfo(dict *d, unsigned long long *from_size, unsigned long long *to_size);

Expand Down
23 changes: 18 additions & 5 deletions src/kvstore.c
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,7 @@ unsigned long kvstoreDictScanDefrag(kvstore *kvs,
int didx,
unsigned long v,
dictScanFunction *fn,
dictDefragFunctions *defragfns,
const dictDefragFunctions *defragfns,
void *privdata) {
dict *d = kvstoreGetDict(kvs, didx);
if (!d) return 0;
Expand All @@ -750,14 +750,27 @@ unsigned long kvstoreDictScanDefrag(kvstore *kvs,
* within dict, it only reallocates the memory used by the dict structure itself using
* the provided allocation function. This feature was added for the active defrag feature.
*
* The 'defragfn' callback is called with a reference to the dict
* that callback can reallocate. */
void kvstoreDictLUTDefrag(kvstore *kvs, kvstoreDictLUTDefragFunction *defragfn) {
for (int didx = 0; didx < kvs->num_dicts; didx++) {
* With 16k dictionaries for cluster mode with 1 shard, this operation may require substantial time
* to execute. A "cursor" is used to perform the operation iteratively. When first called, a
* cursor value of 0 should be provided. The return value is an updated cursor which should be
* provided on the next iteration. The operation is complete when 0 is returned.
*
* The 'defragfn' callback is called with a reference to the dict that callback can reallocate. */
unsigned long kvstoreDictLUTDefrag(kvstore *kvs, unsigned long cursor, kvstoreDictLUTDefragFunction *defragfn) {
    dict **slot = NULL;
    int didx;

    /* Starting at the caller's cursor, locate the first slot that actually
     * holds a dict. Empty slots are passed over without invoking the callback. */
    for (didx = cursor; didx < kvs->num_dicts; didx++) {
        slot = kvstoreGetDictRef(kvs, didx);
        if (*slot) break;
    }

    /* Ran off the end without finding a dict: the iteration is complete. */
    if (didx >= kvs->num_dicts) return 0;

    /* Fetch the dict's rehashing list node (if the rehashing list is non-empty)
     * BEFORE running the callback, because the callback may hand back a
     * relocated dict and the node's value must then be repointed to it. */
    listNode *rehashing_node = NULL;
    if (listLength(kvs->rehashing) > 0) {
        kvstoreDictMetadata *metadata = (kvstoreDictMetadata *)dictMetadata(*slot);
        rehashing_node = metadata->rehashing_node;
    }

    /* A non-NULL result from the callback is the dict's new location; NULL
     * leaves the slot untouched. */
    dict *moved = defragfn(*slot);
    if (moved) *slot = moved;
    if (rehashing_node) listNodeValue(rehashing_node) = *slot;

    /* Exactly one dict is handled per call; resume at the following slot. */
    return (didx + 1);
}

uint64_t kvstoreGetHash(kvstore *kvs, const void *key) {
Expand Down
4 changes: 2 additions & 2 deletions src/kvstore.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ unsigned long kvstoreDictScanDefrag(kvstore *kvs,
int didx,
unsigned long v,
dictScanFunction *fn,
dictDefragFunctions *defragfns,
const dictDefragFunctions *defragfns,
void *privdata);
typedef dict *(kvstoreDictLUTDefragFunction)(dict *d);
void kvstoreDictLUTDefrag(kvstore *kvs, kvstoreDictLUTDefragFunction *defragfn);
unsigned long kvstoreDictLUTDefrag(kvstore *kvs, unsigned long cursor, kvstoreDictLUTDefragFunction *defragfn);
void *kvstoreDictFetchValue(kvstore *kvs, int didx, const void *key);
dictEntry *kvstoreDictFind(kvstore *kvs, int didx, void *key);
dictEntry *kvstoreDictAddRaw(kvstore *kvs, int didx, void *key, dictEntry **existing);
Expand Down
29 changes: 5 additions & 24 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -1138,8 +1138,8 @@ void databasesCron(void) {
}
}

/* Defrag keys gradually. */
activeDefragCycle();
/* Start active defrag cycle or adjust defrag CPU if needed. */
monitorActiveDefrag();

/* Perform hash tables rehashing if needed, but only if there are no
* other processes saving the DB on disk. Otherwise rehashing is bad
Expand Down Expand Up @@ -1609,24 +1609,7 @@ void whileBlockedCron(void) {
mstime_t latency;
latencyStartMonitor(latency);

/* In some cases we may be called with big intervals, so we may need to do
* extra work here. This is because some of the functions in serverCron rely
* on the fact that it is performed every 10 ms or so. For instance, if
* activeDefragCycle needs to utilize 25% cpu, it will utilize 2.5ms, so we
* need to call it multiple times. */
long hz_ms = 1000 / server.hz;
while (server.blocked_last_cron < server.mstime) {
/* Defrag keys gradually. */
activeDefragCycle();
JimB123 marked this conversation as resolved.
Show resolved Hide resolved

server.blocked_last_cron += hz_ms;

/* Increment cronloop so that run_with_period works. */
server.cronloops++;
}

/* Other cron jobs do not need to be done in a loop. No need to check
* server.blocked_last_cron since we have an early exit at the top. */
defragWhileBlocked();

/* Update memory stats during loading (excluding blocked scripts) */
if (server.loading) cronUpdateMemoryStats();
Expand Down Expand Up @@ -2118,7 +2101,7 @@ void initServerConfig(void) {
server.aof_flush_postponed_start = 0;
server.aof_last_incr_size = 0;
server.aof_last_incr_fsync_offset = 0;
server.active_defrag_running = 0;
server.active_defrag_cpu_percent = 0;
server.active_defrag_configuration_changed = 0;
server.notify_keyspace_events = 0;
server.blocked_clients = 0;
Expand Down Expand Up @@ -2733,8 +2716,6 @@ void initServer(void) {
server.db[j].watched_keys = dictCreate(&keylistDictType);
server.db[j].id = j;
server.db[j].avg_ttl = 0;
server.db[j].defrag_later = listCreate();
listSetFreeMethod(server.db[j].defrag_later, (void (*)(void *))sdsfree);
}
evictionPoolAlloc(); /* Initialize the LRU keys pool. */
/* Note that server.pubsub_channels was chosen to be a kvstore (with only one dict, which
Expand Down Expand Up @@ -5704,7 +5685,7 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
"mem_aof_buffer:%zu\r\n", mh->aof_buffer,
"mem_allocator:%s\r\n", ZMALLOC_LIB,
"mem_overhead_db_hashtable_rehashing:%zu\r\n", mh->overhead_db_hashtable_rehashing,
"active_defrag_running:%d\r\n", server.active_defrag_running,
"active_defrag_running:%d\r\n", server.active_defrag_cpu_percent,
"lazyfree_pending_objects:%zu\r\n", lazyfreeGetPendingObjectsCount(),
"lazyfreed_objects:%zu\r\n", lazyfreeGetFreedObjectsCount()));
freeMemoryOverheadData(mh);
Expand Down
11 changes: 6 additions & 5 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -961,7 +961,6 @@ typedef struct serverDb {
int id; /* Database ID */
long long avg_ttl; /* Average TTL, just for stats */
unsigned long expires_cursor; /* Cursor of the active expire cycle. */
list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
} serverDb;

/* forward declaration for functions ctx */
Expand Down Expand Up @@ -1690,7 +1689,7 @@ struct valkeyServer {
int last_sig_received; /* Indicates the last SIGNAL received, if any (e.g., SIGINT or SIGTERM). */
int shutdown_flags; /* Flags passed to prepareForShutdown(). */
int activerehashing; /* Incremental rehash in serverCron() */
int active_defrag_running; /* Active defragmentation running (holds current scan aggressiveness) */
int active_defrag_cpu_percent; /* Current desired CPU percentage for active defrag */
char *pidfile; /* PID file path */
int arch_bits; /* 32 or 64 depending on sizeof(long) */
int cronloops; /* Number of times the cron function run */
Expand Down Expand Up @@ -1887,8 +1886,9 @@ struct valkeyServer {
size_t active_defrag_ignore_bytes; /* minimum amount of fragmentation waste to start active defrag */
int active_defrag_threshold_lower; /* minimum percentage of fragmentation to start active defrag */
int active_defrag_threshold_upper; /* maximum percentage of fragmentation at which we use maximum effort */
int active_defrag_cycle_min; /* minimal effort for defrag in CPU percentage */
int active_defrag_cycle_max; /* maximal effort for defrag in CPU percentage */
int active_defrag_cpu_min; /* minimal effort for defrag in CPU percentage */
int active_defrag_cpu_max; /* maximal effort for defrag in CPU percentage */
int active_defrag_cycle_us; /* standard duration of defrag cycle */
unsigned long active_defrag_max_scan_fields; /* maximum number of fields of set/hash/zset/list to process from
within the main dict scan */
size_t client_max_querybuf_len; /* Limit for client query buffer length */
Expand Down Expand Up @@ -3341,7 +3341,8 @@ void bytesToHuman(char *s, size_t size, unsigned long long n);
void enterExecutionUnit(int update_cached_time, long long us);
void exitExecutionUnit(void);
void resetServerStats(void);
void activeDefragCycle(void);
void monitorActiveDefrag(void);
void defragWhileBlocked(void);
unsigned int getLRUClock(void);
unsigned int LRU_CLOCK(void);
const char *evictPolicyToString(void);
Expand Down
25 changes: 16 additions & 9 deletions tests/unit/memefficiency.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ run_solo {defrag} {
proc test_active_defrag {type} {
if {[string match {*jemalloc*} [s mem_allocator]] && [r debug mallctl arenas.page] <= 8192} {
test "Active defrag main dictionary: $type" {
r config set hz 100
r config set activedefrag no
r config set active-defrag-threshold-lower 5
r config set active-defrag-cycle-min 65
Expand Down Expand Up @@ -89,6 +88,8 @@ run_solo {defrag} {
r config set active-defrag-cycle-min 65
r config set active-defrag-cycle-max 75

after 1000 ;# Give defrag time to work (might be multiple cycles)

# Wait for the active defrag to stop working.
wait_for_condition 2000 100 {
[s active_defrag_running] eq 0
Expand Down Expand Up @@ -138,12 +139,13 @@ run_solo {defrag} {
r config resetstat
r config set key-load-delay -25 ;# sleep on average 1/25 usec
r debug loadaof
after 1000 ;# give defrag a chance to work before turning it off
r config set activedefrag no

# measure hits and misses right after aof loading
set misses [s active_defrag_misses]
set hits [s active_defrag_hits]

after 120 ;# serverCron only updates the info once in 100ms
set frag [s allocator_frag_ratio]
set max_latency 0
foreach event [r latency latest] {
Expand Down Expand Up @@ -181,7 +183,6 @@ run_solo {defrag} {
r flushdb sync
r script flush sync
r config resetstat
r config set hz 100
r config set activedefrag no
r config set active-defrag-threshold-lower 5
r config set active-defrag-cycle-min 65
Expand All @@ -203,7 +204,7 @@ run_solo {defrag} {
$rd read ; # Discard script load replies
$rd read ; # Discard set replies
}
after 120 ;# serverCron only updates the info once in 100ms
after 1000 ;# give defrag some time to work
if {$::verbose} {
puts "used [s allocator_allocated]"
puts "rss [s allocator_active]"
Expand Down Expand Up @@ -239,6 +240,8 @@ run_solo {defrag} {
fail "defrag not started."
}

after 1000 ;# Give defrag time to work (might be multiple cycles)

# wait for the active defrag to stop working
wait_for_condition 500 100 {
[s active_defrag_running] eq 0
Expand Down Expand Up @@ -266,7 +269,6 @@ run_solo {defrag} {
test "Active defrag big keys: $type" {
r flushdb sync
r config resetstat
r config set hz 100
r config set activedefrag no
r config set active-defrag-max-scan-fields 1000
r config set active-defrag-threshold-lower 5
Expand Down Expand Up @@ -361,6 +363,8 @@ run_solo {defrag} {
fail "defrag not started."
}

after 1000 ;# Give defrag some time to work (it may run several cycles)

# wait for the active defrag to stop working
wait_for_condition 500 100 {
[s active_defrag_running] eq 0
Expand Down Expand Up @@ -407,7 +411,6 @@ run_solo {defrag} {
test "Active defrag pubsub: $type" {
r flushdb sync
r config resetstat
r config set hz 100
r config set activedefrag no
r config set active-defrag-threshold-lower 5
r config set active-defrag-cycle-min 65
Expand All @@ -430,7 +433,6 @@ run_solo {defrag} {
$rd read ; # Discard set replies
}

after 120 ;# serverCron only updates the info once in 100ms
if {$::verbose} {
puts "used [s allocator_allocated]"
puts "rss [s allocator_active]"
Expand Down Expand Up @@ -466,6 +468,8 @@ run_solo {defrag} {
fail "defrag not started."
}

after 1000 ;# Give defrag some time to work (it may run several cycles)

# wait for the active defrag to stop working
wait_for_condition 500 100 {
[s active_defrag_running] eq 0
Expand All @@ -475,6 +479,7 @@ run_solo {defrag} {
puts [r memory malloc-stats]
fail "defrag didn't stop."
}
r config set activedefrag no ;# disable before we accidentally create more frag

# test the fragmentation is lower
after 120 ;# serverCron only updates the info once in 100ms
Expand Down Expand Up @@ -507,7 +512,6 @@ run_solo {defrag} {
test "Active defrag big list: $type" {
r flushdb sync
r config resetstat
r config set hz 100
r config set activedefrag no
r config set active-defrag-max-scan-fields 1000
r config set active-defrag-threshold-lower 5
Expand Down Expand Up @@ -561,6 +565,8 @@ run_solo {defrag} {
fail "defrag not started."
}

after 1000 ;# Give defrag some time to work (it may run several cycles)

# wait for the active defrag to stop working
wait_for_condition 500 100 {
[s active_defrag_running] eq 0
Expand Down Expand Up @@ -619,7 +625,6 @@ run_solo {defrag} {
start_server {tags {"defrag"} overrides {save ""}} {
r flushdb sync
r config resetstat
r config set hz 100
r config set activedefrag no
r config set active-defrag-max-scan-fields 1000
r config set active-defrag-threshold-lower 5
Expand Down Expand Up @@ -685,6 +690,8 @@ run_solo {defrag} {
fail "defrag not started."
}

after 1000 ;# Give defrag some time to work (it may run several cycles)

# wait for the active defrag to stop working
wait_for_condition 500 100 {
[s active_defrag_running] eq 0
Expand Down
1 change: 0 additions & 1 deletion tests/unit/moduleapi/defrag.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ set testmodule [file normalize tests/modules/defragtest.so]

start_server {tags {"modules"} overrides {{save ""}}} {
r module load $testmodule 10000
r config set hz 100
r config set active-defrag-ignore-bytes 1
r config set active-defrag-threshold-lower 0
r config set active-defrag-cycle-min 99
Expand Down
Loading
Loading