Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid deadlocks in cache #30461

Merged
merged 5 commits into from
May 9, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 36 additions & 43 deletions server/src/main/java/org/elasticsearch/common/cache/Cache.java
Original file line number Diff line number Diff line change
Expand Up @@ -206,34 +206,33 @@ private static class CacheSegment<K, V> {
*/
Entry<K, V> get(K key, long now, Predicate<Entry<K, V>> isExpired, Consumer<Entry<K, V>> onExpiration) {
CompletableFuture<Entry<K, V>> future;
Entry<K, V> entry = null;
try (ReleasableLock ignored = readLock.acquire()) {
future = map.get(key);
}
if (future != null) {
Entry<K, V> entry;
try {
entry = future.handle((ok, ex) -> {
if (ok != null && !isExpired.test(ok)) {
segmentStats.hit();
ok.accessTime = now;
return ok;
} else {
segmentStats.miss();
if (ok != null) {
assert isExpired.test(ok);
onExpiration.accept(ok);
}
return null;
}
}).get();
} catch (ExecutionException | InterruptedException e) {
entry = future.get();
} catch (ExecutionException e) {
assert future.isCompletedExceptionally();
segmentStats.miss();
return null;
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
else {
if (isExpired.test(entry)) {
segmentStats.miss();
onExpiration.accept(entry);
return null;
} else {
segmentStats.hit();
entry.accessTime = now;
return entry;
}
} else {
segmentStats.miss();
return null;
}
return entry;
}

/**
Expand Down Expand Up @@ -269,30 +268,18 @@ Tuple<Entry<K, V>, Entry<K, V>> put(K key, V value, long now) {
/**
* remove an entry from the segment
*
* @param key the key of the entry to remove from the cache
* @return the removed entry if there was one, otherwise null
* @param key the key of the entry to remove from the cache
* @param onRemoval a callback for the removed entry
*/
Entry<K, V> remove(K key) {
void remove(K key, Consumer<CompletableFuture<Entry<K, V>>> onRemoval) {
CompletableFuture<Entry<K, V>> future;
Entry<K, V> entry = null;
try (ReleasableLock ignored = writeLock.acquire()) {
future = map.remove(key);
}
if (future != null) {
try {
entry = future.handle((ok, ex) -> {
if (ok != null) {
segmentStats.eviction();
return ok;
} else {
return null;
}
}).get();
} catch (ExecutionException | InterruptedException e) {
throw new IllegalStateException(e);
}
segmentStats.eviction();
onRemoval.accept(future);
}
return entry;
}

private static class SegmentStats {
Expand Down Expand Up @@ -476,12 +463,18 @@ private void put(K key, V value, long now) {
*/
public void invalidate(K key) {
CacheSegment<K, V> segment = getCacheSegment(key);
Entry<K, V> entry = segment.remove(key);
if (entry != null) {
try (ReleasableLock ignored = lruLock.acquire()) {
delete(entry, RemovalNotification.RemovalReason.INVALIDATED);
segment.remove(key, f -> {
try {
Entry<K, V> entry = f.get();
try (ReleasableLock ignored = lruLock.acquire()) {
delete(entry, RemovalNotification.RemovalReason.INVALIDATED);
}
} catch (ExecutionException e) {
// ok
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
});
}

/**
Expand Down Expand Up @@ -632,7 +625,7 @@ public void remove() {
Entry<K, V> entry = current;
if (entry != null) {
CacheSegment<K, V> segment = getCacheSegment(entry.key);
segment.remove(entry.key);
segment.remove(entry.key, f -> {});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why does this follow a different pattern than invalidate? as far as I can tell if we don't wait for the future to be completed, it may be re-inserted into the LRU by the future completion logic. I would also like to understand why this isn't a race condition even if you do complete the future (i.e., aren't we susceptible to race conditions in between the execution of the handler and the get() returning), which will cause the LRU to go out of sync (similar issue in put)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jasontedor and I discussed this on another channel. The reason for a different execution paths on the call backs has to do with whether we already hold a reference to the relevant entry or not. I personally prefer to not have two paths here but not enough to request a change.

I would also like to understand why this isn't a race condition even if you do complete the future (i.e., aren't we susceptible to race conditions in between the execution of the handler and the get() returning), which will cause the LRU to go out of sync (similar issue in put)

This one is guarded against by the state in the entry. Deleting an entry also changes the state to deleted and thus it will not be re-added by the handler in computeIfAbsent. That said we found another issue there where delete doesn't mark the entry as deleted if it's in the new state. This will be dealt with in a followup.

try (ReleasableLock ignored = lruLock.acquire()) {
current = null;
delete(entry, RemovalNotification.RemovalReason.INVALIDATED);
Expand Down Expand Up @@ -717,7 +710,7 @@ private void evictEntry(Entry<K, V> entry) {

CacheSegment<K, V> segment = getCacheSegment(entry.key);
if (segment != null) {
segment.remove(entry.key);
segment.remove(entry.key, f -> {});
}
delete(entry, RemovalNotification.RemovalReason.EVICTED);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,6 @@ protected long now() {
assertEquals(numberOfEntries, cache.stats().getEvictions());
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/30428")
public void testComputeIfAbsentDeadlock() throws BrokenBarrierException, InterruptedException {
final int numberOfThreads = randomIntBetween(2, 32);
final Cache<Integer, String> cache =
Expand Down