From e92d189a952e3e9561d262b8f4ecc0c47a180447 Mon Sep 17 00:00:00 2001 From: Alan Woodward Date: Wed, 4 Oct 2023 14:13:56 +0100 Subject: [PATCH] Move eviction handling directly into internal cache impl --- .../shared/SharedBlobCacheService.java | 99 ++++++------------- 1 file changed, 32 insertions(+), 67 deletions(-) diff --git a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java index daefc5b884ec6..95aab30ed3ddf 100644 --- a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java +++ b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java @@ -54,7 +54,6 @@ import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.LongAdder; -import java.util.function.Function; import java.util.function.IntConsumer; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -400,50 +399,8 @@ private int getRegionSize(long fileLength, int region) { } CacheFileRegion get(KeyType cacheKey, long fileLength, int region) { - final RegionKey regionKey = new RegionKey<>(cacheKey, region); - // try to just get from the map on the fast-path to save instantiating the capturing lambda needed on the slow path if we did not - // find an entry - var entry = cache.get(regionKey); - if (entry == null) { - final int effectiveRegionSize = getRegionSize(fileLength, region); - entry = cache.computeIfAbsent(regionKey, key -> new CacheFileRegion(key, effectiveRegionSize)); - } - // io is volatile, double locking is fine, as long as we assign it last. - if (entry.io == null) { - synchronized (entry) { - if (entry.io == null) { - return initChunk(entry); - } - } - } - return entry; - } - - private CacheFileRegion initChunk(CacheFileRegion entry) { - assert Thread.holdsLock(entry); - RegionKey regionKey = entry.regionKey; - if (cache.get(regionKey) != entry) { - throwAlreadyClosed("no free region found (contender)"); - } - final SharedBytes.IO freeSlot = freeRegions.poll(); - if (freeSlot != null) { - // no need to evict an item, just add - assignToSlot(entry, freeSlot); - } else { - // need to evict something - cache.evict(); - final SharedBytes.IO freeSlotRetry = freeRegions.poll(); - if (freeSlotRetry != null) { - assignToSlot(entry, freeSlotRetry); - } else { - synchronized (this) { - cache.evictMatching(r -> r == regionKey); - } - throwAlreadyClosed("no free region found"); - } - } - - return entry; + final RegionKey regionKey = new RegionKey<>(cacheKey, fileLength, region); + return cache.getAndEvict(regionKey); } private void assignToSlot(CacheFileRegion entry, SharedBytes.IO freeSlot) { @@ -571,7 +528,7 @@ public void close() { cache.close(); } - private record RegionKey(KeyType file, int region) { + private record RegionKey(KeyType file, long length, int region) { @Override public String toString() { return "Chunk{" + "file=" + file + ", region=" + region + '}'; @@ -814,7 +771,7 @@ public boolean tryRead(ByteBuffer buf, long offset) throws IOException { var fileRegion = lastAccessedRegion; if (fileRegion != null && fileRegion.regionKey.region == startRegion) { // existing item, get it to update cache usage - cache.get(fileRegion.regionKey); + cache.getAndEvict(fileRegion.regionKey); } else { fileRegion = get(cacheKey, length, startRegion); } @@ -981,11 +938,7 @@ public record Stats( } private interface Cache { - V get(K key); - - V computeIfAbsent(K key, Function computeFunction); - - void evict(); + V getAndEvict(K key); int evictMatching(Predicate predicate); @@ -1029,33 +982,45 @@ public void close() { } @Override - public CacheFileRegion get(RegionKey key) { + public CacheFileRegion getAndEvict(RegionKey key) { Entry entry = keyMapping.get(key); + final long now = threadPool.relativeTimeInMillis(); if (entry != null) { - final long now = threadPool.relativeTimeInMillis(); // existing item, check if we need to promote item if (now - entry.lastAccessed >= minTimeDelta) { maybePromote(now, entry); } return entry.chunk; } - return null; - } - - @Override - public CacheFileRegion computeIfAbsent(RegionKey key, Function, CacheFileRegion> computeFunction) { - final long now = threadPool.relativeTimeInMillis(); + SharedBytes.IO freePage = freeRegions.poll(); + if (freePage == null) { + // no free regions, so evict something + evict(); + freePage = freeRegions.poll(); + if (freePage == null) { + throwAlreadyClosed("No free regions"); + } + } + var pageToUse = freePage; synchronized (SharedBlobCacheService.this) { - return keyMapping.computeIfAbsent(key, k -> { - var entry = new Entry<>(computeFunction.apply(k), now); - pushEntryToBack(entry); - return entry; - }).chunk; + entry = keyMapping.computeIfAbsent(key, k -> { + var size = getRegionSize(k.length, k.region); + var e = new Entry<>(new CacheFileRegion(key, size), now); + pushEntryToBack(e); + e.chunk.io = pageToUse; + return e; + }); + if (entry.chunk.io != pageToUse) { + // another thread got there first, return the page to the pool + freeRegions.add(pageToUse); + } else { + assert regionOwners.put(entry.chunk.io, entry.chunk) == null; + } } + return entry.chunk; } - @Override - public void evict() { + private void evict() { synchronized (SharedBlobCacheService.this) { for (int i = 0; i < maxFreq; i++) { for (Entry entry = freqs[i]; entry != null; entry = entry.next) {