Skip to content

Commit

Permalink
flush_all was not thread safe.
Browse files Browse the repository at this point in the history
Unfortunately if you disable CAS, all items set in the same second as a
flush_all will immediately expire. This is the old (2006ish) behavior.

However, if CAS is enabled (as is the default), it will still be more or less
exact.

The locking issue is that if the LRU lock is held, you may not be able to
modify an item if the item lock is also held. This means that some items may
not be flushed if locking is done correctly.

In the current code, it could lead to corruption as an item could be locked
and in use while the expunging is happening.
  • Loading branch information
dormando committed Jan 2, 2015
1 parent 69d1c69 commit 90593dc
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 64 deletions.
49 changes: 16 additions & 33 deletions items.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,19 @@ uint64_t get_cas_id(void) {
return next_id;
}

static int is_flushed(item *it) {
rel_time_t oldest_live = settings.oldest_live;
uint64_t cas = ITEM_get_cas(it);
uint64_t oldest_cas = settings.oldest_cas;
if (oldest_live == 0 || oldest_live > current_time)
return 0;
if ((it->time <= oldest_live)
|| (oldest_cas != 0 && cas != 0 && cas < oldest_cas)) {
return 1;
}
return 0;
}

/* Enable this for reference-count debugging. */
#if 0
# define DEBUG_REFCNT(it,op) \
Expand Down Expand Up @@ -117,7 +130,6 @@ item *do_item_alloc(char *key, const size_t nkey, const int flags,
item *search;
item *next_it;
void *hold_lock = NULL;
rel_time_t oldest_live = settings.oldest_live;

search = tails[id];
/* We walk up *only* for locked items. Never searching for expired.
Expand Down Expand Up @@ -165,7 +177,7 @@ item *do_item_alloc(char *key, const size_t nkey, const int flags,

/* Expired or flushed */
if ((search->exptime != 0 && search->exptime < current_time)
|| (search->time <= oldest_live && oldest_live <= current_time)) {
|| is_flushed(search)) {
itemstats[id].reclaimed++;
if ((search->it_flags & ITEM_FETCHED) == 0) {
itemstats[id].expired_unfetched++;
Expand Down Expand Up @@ -617,8 +629,7 @@ item *do_item_get(const char *key, const size_t nkey, const uint32_t hv) {
}

if (it != NULL) {
if (settings.oldest_live != 0 && settings.oldest_live <= current_time &&
it->time <= settings.oldest_live) {
if (is_flushed(it)) {
do_item_unlink(it, hv);
do_item_remove(it);
it = NULL;
Expand Down Expand Up @@ -653,33 +664,6 @@ item *do_item_touch(const char *key, size_t nkey, uint32_t exptime,
return it;
}

/* expires items that are more recent than the oldest_live setting. */
void do_item_flush_expired(void) {
int i;
item *iter, *next;
if (settings.oldest_live == 0)
return;
for (i = 0; i < LARGEST_ID; i++) {
/* The LRU is sorted in decreasing time order, and an item's timestamp
* is never newer than its last access time, so we only need to walk
* back until we hit an item older than the oldest_live time.
* The oldest_live checking will auto-expire the remaining items.
*/
for (iter = heads[i]; iter != NULL; iter = next) {
/* iter->time of 0 are magic objects. */
if (iter->time != 0 && iter->time >= settings.oldest_live) {
next = iter->next;
if ((iter->it_flags & ITEM_SLABBED) == 0) {
do_item_unlink_nolock(iter, hash(ITEM_key(iter), iter->nkey));
}
} else {
/* We've hit the first old item. Continue to the next queue. */
break;
}
}
}
}

static void crawler_link_q(item *it) { /* item is the new tail */
item **head, **tail;
assert(it->slabs_clsid < LARGEST_ID);
Expand Down Expand Up @@ -785,9 +769,8 @@ static item *crawler_crawl_q(item *it) {
* main thread's values too much. Should rethink again.
*/
static void item_crawler_evaluate(item *search, uint32_t hv, int i) {
rel_time_t oldest_live = settings.oldest_live;
if ((search->exptime != 0 && search->exptime < current_time)
|| (search->time <= oldest_live && oldest_live <= current_time)) {
|| is_flushed(search)) {
itemstats[i].crawler_reclaimed++;

if (settings.verbose > 1) {
Expand Down
1 change: 0 additions & 1 deletion items.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ void do_item_stats(ADD_STAT add_stats, void *c);
void do_item_stats_totals(ADD_STAT add_stats, void *c);
/*@null@*/
void do_item_stats_sizes(ADD_STAT add_stats, void *c);
void do_item_flush_expired(void);

item *do_item_get(const char *key, const size_t nkey, const uint32_t hv);
item *do_item_touch(const char *key, const size_t nkey, uint32_t exptime, const uint32_t hv);
Expand Down
50 changes: 31 additions & 19 deletions memcached.c
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ static void settings_init(void) {
settings.maxconns = 1024; /* to limit connections-related memory to about 5MB */
settings.verbose = 0;
settings.oldest_live = 0;
settings.oldest_cas = 0; /* supplements accuracy of oldest_live */
settings.evict_to_free = 1; /* push old items out of cache when memory runs out */
settings.socketpath = NULL; /* by default, not using a unix socket */
settings.factor = 1.25;
Expand Down Expand Up @@ -2138,6 +2139,7 @@ static void process_bin_append_prepend(conn *c) {
static void process_bin_flush(conn *c) {
time_t exptime = 0;
protocol_binary_request_flush* req = binary_get_request(c);
rel_time_t new_oldest = 0;

if (!settings.flush_enabled) {
// flush_all is not allowed but we log it on stats
Expand All @@ -2150,11 +2152,17 @@ static void process_bin_flush(conn *c) {
}

if (exptime > 0) {
settings.oldest_live = realtime(exptime) - 1;
new_oldest = realtime(exptime);
} else {
settings.oldest_live = current_time - 1;
new_oldest = current_time;
}
if (settings.use_cas) {
settings.oldest_live = new_oldest - 1;
if (settings.oldest_live <= current_time)
settings.oldest_cas = get_cas_id();
} else {
settings.oldest_live = new_oldest;
}
item_flush_expired();

pthread_mutex_lock(&c->thread->stats.mutex);
c->thread->stats.flush_cmds++;
Expand Down Expand Up @@ -3469,6 +3477,7 @@ static void process_command(conn *c, char *command) {

} else if (ntokens >= 2 && ntokens <= 4 && (strcmp(tokens[COMMAND_TOKEN].value, "flush_all") == 0)) {
time_t exptime = 0;
rel_time_t new_oldest = 0;

set_noreply_maybe(c, tokens, ntokens);

Expand All @@ -3482,17 +3491,12 @@ static void process_command(conn *c, char *command) {
return;
}

if(ntokens == (c->noreply ? 3 : 2)) {
settings.oldest_live = current_time - 1;
item_flush_expired();
out_string(c, "OK");
return;
}

exptime = strtol(tokens[1].value, NULL, 10);
if(errno == ERANGE) {
out_string(c, "CLIENT_ERROR bad command line format");
return;
if (ntokens != (c->noreply ? 3 : 2)) {
exptime = strtol(tokens[1].value, NULL, 10);
if(errno == ERANGE) {
out_string(c, "CLIENT_ERROR bad command line format");
return;
}
}

/*
Expand All @@ -3501,11 +3505,19 @@ static void process_command(conn *c, char *command) {
value. So we process exptime == 0 the same way we do when
no delay is given at all.
*/
if (exptime > 0)
settings.oldest_live = realtime(exptime) - 1;
else /* exptime == 0 */
settings.oldest_live = current_time - 1;
item_flush_expired();
if (exptime > 0) {
new_oldest = realtime(exptime);
} else { /* exptime == 0 */
new_oldest = current_time;
}

if (settings.use_cas) {
settings.oldest_live = new_oldest - 1;
if (settings.oldest_live <= current_time)
settings.oldest_cas = get_cas_id();
} else {
settings.oldest_live = new_oldest;
}
out_string(c, "OK");
return;

Expand Down
2 changes: 1 addition & 1 deletion memcached.h
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ struct settings {
char *inter;
int verbose;
rel_time_t oldest_live; /* ignore existing items older than this */
uint64_t oldest_cas; /* ignore existing items with CAS values lower than this */
int evict_to_free;
char *socketpath; /* path to unix socket if using local socket */
int access; /* access mask (a la chmod) for unix domain socket */
Expand Down Expand Up @@ -565,7 +566,6 @@ bool conn_add_to_freelist(conn *c);
int is_listen_thread(void);
item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes);
char *item_cachedump(const unsigned int slabs_clsid, const unsigned int limit, unsigned int *bytes);
void item_flush_expired(void);
item *item_get(const char *key, const size_t nkey);
item *item_touch(const char *key, const size_t nkey, uint32_t exptime);
int item_link(item *it);
Expand Down
30 changes: 29 additions & 1 deletion t/flush-all.t
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/perl

use strict;
use Test::More tests => 25;
use Test::More tests => 26;
use FindBin qw($Bin);
use lib "$Bin/lib";
use MemcachedTest;
Expand Down Expand Up @@ -67,3 +67,31 @@ print $sock "flush_all\r\n";
is(scalar <$sock>, "CLIENT_ERROR flush_all not allowed\r\n", "flush_all was not allowed");
mem_get_is($sock, "foo", "fooval2");

# Test that disabling CAS makes flush_all less accurate.
# Due to lock ordering issues we can no longer evict items newer than
# oldest_live, so we rely on the CAS counter for an exact cliff. So disabling
# CAS now means all items set in the same second will fail to set.
$server = new_memcached('-C');
$sock = $server->sock;

my $failed_nocas = 0;
# Running this 100,000 times failed the test a handful of times. 50 tries
# should be enough.
for (1..50) {
print $sock "flush_all 0\r\n";
my $foo = scalar <$sock>;
print $sock "set foo 0 0 3\r\nnew\r\n";
$foo = scalar <$sock>;
print $sock "get foo\r\n";
my $line = scalar <$sock>;
if ($line =~ /^VALUE/) {
$line = scalar <$sock>;
$line = scalar <$sock>;
print STDERR "Succeeded!!!\n";
next;
} elsif ($line =~ /^END/) {
$failed_nocas++;
next;
}
}
is($failed_nocas, 1, "failed to set value after flush with no CAS at least once");
9 changes: 0 additions & 9 deletions thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -597,15 +597,6 @@ enum store_item_type store_item(item *item, int comm, conn* c) {
return ret;
}

/*
* Flushes expired items after a flush_all call
*/
void item_flush_expired() {
mutex_lock(&cache_lock);
do_item_flush_expired();
mutex_unlock(&cache_lock);
}

/*
* Dumps part of the cache
*/
Expand Down

0 comments on commit 90593dc

Please sign in to comment.