Skip to content

Commit

Permalink
Avoid deadlocks in cache (#30461)
Browse files Browse the repository at this point in the history
This commit avoids deadlocks in the cache by removing dangerous places
where we try to take the LRU lock while completing a future. Instead, we
block for the future to complete, and then execute the handling code
under the LRU lock (for example, eviction).
  • Loading branch information
jasontedor committed May 9, 2018
1 parent f835fad commit 6f0267c
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 45 deletions.
87 changes: 43 additions & 44 deletions core/src/main/java/org/elasticsearch/common/cache/Cache.java
Original file line number Diff line number Diff line change
Expand Up @@ -206,34 +206,33 @@ private static class CacheSegment<K, V> {
*/
Entry<K, V> get(K key, long now, Predicate<Entry<K, V>> isExpired, Consumer<Entry<K, V>> onExpiration) {
CompletableFuture<Entry<K, V>> future;
Entry<K, V> entry = null;
try (ReleasableLock ignored = readLock.acquire()) {
future = map.get(key);
}
if (future != null) {
Entry<K, V> entry;
try {
entry = future.handle((ok, ex) -> {
if (ok != null && !isExpired.test(ok)) {
segmentStats.hit();
ok.accessTime = now;
return ok;
} else {
segmentStats.miss();
if (ok != null) {
assert isExpired.test(ok);
onExpiration.accept(ok);
}
return null;
}
}).get();
} catch (ExecutionException | InterruptedException e) {
entry = future.get();
} catch (ExecutionException e) {
assert future.isCompletedExceptionally();
segmentStats.miss();
return null;
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
else {
if (isExpired.test(entry)) {
segmentStats.miss();
onExpiration.accept(entry);
return null;
} else {
segmentStats.hit();
entry.accessTime = now;
return entry;
}
} else {
segmentStats.miss();
return null;
}
return entry;
}

/**
Expand Down Expand Up @@ -269,30 +268,18 @@ Tuple<Entry<K, V>, Entry<K, V>> put(K key, V value, long now) {
/**
* remove an entry from the segment
*
* @param key the key of the entry to remove from the cache
* @return the removed entry if there was one, otherwise null
* @param key the key of the entry to remove from the cache
* @param onRemoval a callback for the removed entry
*/
Entry<K, V> remove(K key) {
void remove(K key, Consumer<CompletableFuture<Entry<K, V>>> onRemoval) {
CompletableFuture<Entry<K, V>> future;
Entry<K, V> entry = null;
try (ReleasableLock ignored = writeLock.acquire()) {
future = map.remove(key);
}
if (future != null) {
try {
entry = future.handle((ok, ex) -> {
if (ok != null) {
segmentStats.eviction();
return ok;
} else {
return null;
}
}).get();
} catch (ExecutionException | InterruptedException e) {
throw new IllegalStateException(e);
}
segmentStats.eviction();
onRemoval.accept(future);
}
return entry;
}

private static class SegmentStats {
Expand Down Expand Up @@ -476,12 +463,18 @@ private void put(K key, V value, long now) {
*/
public void invalidate(K key) {
CacheSegment<K, V> segment = getCacheSegment(key);
Entry<K, V> entry = segment.remove(key);
if (entry != null) {
try (ReleasableLock ignored = lruLock.acquire()) {
delete(entry, RemovalNotification.RemovalReason.INVALIDATED);
segment.remove(key, f -> {
try {
Entry<K, V> entry = f.get();
try (ReleasableLock ignored = lruLock.acquire()) {
delete(entry, RemovalNotification.RemovalReason.INVALIDATED);
}
} catch (ExecutionException e) {
// ok
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
});
}

/**
Expand Down Expand Up @@ -580,7 +573,8 @@ public void remove() {

/**
* An LRU sequencing of the values in the cache. This sequence is not protected from mutations
* to the cache. The result of iteration under mutation is undefined.
* to the cache (except for {@link Iterator#remove()}. The result of iteration under any other mutation is
* undefined.
*
* @return an LRU-ordered {@link Iterable} over the values in the cache
*/
Expand All @@ -597,6 +591,11 @@ public boolean hasNext() {
public V next() {
return iterator.next().value;
}

@Override
public void remove() {
iterator.remove();
}
};
}

Expand Down Expand Up @@ -626,7 +625,7 @@ public void remove() {
Entry<K, V> entry = current;
if (entry != null) {
CacheSegment<K, V> segment = getCacheSegment(entry.key);
segment.remove(entry.key);
segment.remove(entry.key, f -> {});
try (ReleasableLock ignored = lruLock.acquire()) {
current = null;
delete(entry, RemovalNotification.RemovalReason.INVALIDATED);
Expand Down Expand Up @@ -711,7 +710,7 @@ private void evictEntry(Entry<K, V> entry) {

CacheSegment<K, V> segment = getCacheSegment(entry.key);
if (segment != null) {
segment.remove(entry.key);
segment.remove(entry.key, f -> {});
}
delete(entry, RemovalNotification.RemovalReason.EVICTED);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,6 @@ protected long now() {
assertEquals(numberOfEntries, cache.stats().getEvictions());
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/30428")
public void testComputeIfAbsentDeadlock() throws BrokenBarrierException, InterruptedException {
final int numberOfThreads = randomIntBetween(2, 32);
final Cache<Integer, String> cache =
Expand Down

0 comments on commit 6f0267c

Please sign in to comment.