From 1c511ae1eb371bc0003b285f2d2e1578873d53d8 Mon Sep 17 00:00:00 2001 From: Alan Woodward Date: Mon, 9 Oct 2023 13:01:31 +0100 Subject: [PATCH] Internal cache impl for SharedBlobCacheService --- .../shared/SharedBlobCacheService.java | 627 ++++++++++-------- .../shared/SharedBlobCacheServiceTests.java | 28 +- 2 files changed, 353 insertions(+), 302 deletions(-) diff --git a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java index d7cbddf490df5..4c4752afa0211 100644 --- a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java +++ b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java @@ -41,6 +41,7 @@ import java.io.UncheckedIOException; import java.lang.invoke.MethodHandles; import java.lang.invoke.VarHandle; +import java.lang.reflect.Array; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; @@ -250,9 +251,24 @@ public void validate(ByteSizeValue value, Map, Object> settings, bool Setting.Property.NodeScope ); + private interface Cache extends Releasable { + CacheEntry get(K cacheKey, long fileLength, int region); + + int forceEvict(Predicate cacheKeyPredicate); + } + + private abstract static class CacheEntry { + final T chunk; + + private CacheEntry(T chunk) { + this.chunk = chunk; + } + + abstract void touch(); + } + private static final Logger logger = LogManager.getLogger(SharedBlobCacheService.class); - private final ConcurrentHashMap, Entry> keyMapping; private final ThreadPool threadPool; // executor to run reading from the blobstore on @@ -269,13 +285,10 @@ public void validate(ByteSizeValue value, Map, Object> settings, bool private final int numRegions; private final ConcurrentLinkedQueue freeRegions = new ConcurrentLinkedQueue<>(); - private final Entry[] freqs; - private final int maxFreq; - private final long minTimeDelta; - private final ConcurrentHashMap regionOwners; // to assert exclusive access of regions + private final Cache cache; - private final CacheDecayTask decayTask; + private final ConcurrentHashMap regionOwners; // to assert exclusive access of regions private final LongAdder writeCount = new LongAdder(); private final LongAdder writeBytes = new LongAdder(); @@ -289,8 +302,6 @@ public SharedBlobCacheService(NodeEnvironment environment, Settings settings, Th this(environment, settings, threadPool, ioExecutor, ioExecutor); } - // gradlew requires 'rawtypes' even if IntelliJ doesn't - @SuppressWarnings({ "unchecked", "rawtypes" }) public SharedBlobCacheService( NodeEnvironment environment, Settings settings, @@ -310,7 +321,6 @@ public SharedBlobCacheService( this.cacheSize = calculateCacheSize(settings, totalFsSize); final int regionSize = Math.toIntExact(SHARED_CACHE_REGION_SIZE_SETTING.get(settings).getBytes()); this.numRegions = Math.toIntExact(cacheSize / regionSize); - keyMapping = new ConcurrentHashMap<>(); if (Assertions.ENABLED) { regionOwners = new ConcurrentHashMap<>(); } else { @@ -318,9 +328,7 @@ public SharedBlobCacheService( } this.regionSize = regionSize; assert regionSize > 0L; - this.maxFreq = SHARED_CACHE_MAX_FREQ_SETTING.get(settings); - this.minTimeDelta = SHARED_CACHE_MIN_TIME_DELTA_SETTING.get(settings).millis(); - freqs = new Entry[maxFreq]; + this.cache = new LFUCache(settings); try { sharedBytes = new SharedBytes( numRegions, @@ -336,8 +344,7 @@ public SharedBlobCacheService( for (int i = 0; i < numRegions; i++) { freeRegions.add(sharedBytes.getFileChannel(i)); } - decayTask = new CacheDecayTask(threadPool, threadPool.generic(), SHARED_CACHE_DECAY_INTERVAL_SETTING.get(settings)); - decayTask.rescheduleIfNecessary(); + this.rangeSize = SHARED_CACHE_RANGE_SIZE_SETTING.get(settings); this.recoveryRangeSize = SHARED_CACHE_RECOVERY_RANGE_SIZE_SETTING.get(settings); } @@ -408,32 +415,8 @@ private int getRegionSize(long fileLength, int region) { return effectiveRegionSize; } - Entry get(KeyType cacheKey, long fileLength, int region) { - final RegionKey regionKey = new RegionKey<>(cacheKey, region); - final long now = threadPool.relativeTimeInMillis(); - // try to just get from the map on the fast-path to save instantiating the capturing lambda needed on the slow path if we did not - // find an entry - var entry = keyMapping.get(regionKey); - if (entry == null) { - final int effectiveRegionSize = getRegionSize(fileLength, region); - entry = keyMapping.computeIfAbsent(regionKey, key -> new Entry<>(new CacheFileRegion(key, effectiveRegionSize), now)); - } - // io is volatile, double locking is fine, as long as we assign it last. - if (entry.chunk.io == null) { - synchronized (entry.chunk) { - if (entry.chunk.io == null) { - return initChunk(entry); - } - } - } - assert assertChunkActiveOrEvicted(entry); - - // existing item, check if we need to promote item - if (now - entry.lastAccessed >= minTimeDelta) { - maybePromote(now, entry); - } - - return entry; + CacheFileRegion get(KeyType cacheKey, long fileLength, int region) { + return cache.get(cacheKey, fileLength, region).chunk; } /** @@ -471,7 +454,7 @@ public boolean maybeFetchFullEntry(KeyType cacheKey, long length, RangeMissingHa return true; } final ActionListener regionListener = refCountingListener.acquire(ignored -> {}); - final SharedBlobCacheService.Entry entry; + final CacheFileRegion entry; try { entry = get(cacheKey, length, region); } catch (AlreadyClosedException e) { @@ -480,7 +463,7 @@ public boolean maybeFetchFullEntry(KeyType cacheKey, long length, RangeMissingHa return false; } // set read range == write range so the listener completes only once all the bytes have been downloaded - entry.chunk.populateAndRead( + entry.populateAndRead( rangeToWrite, rangeToWrite, (channel, pos, relativePos, len) -> Math.toIntExact(len), @@ -499,78 +482,10 @@ public boolean maybeFetchFullEntry(KeyType cacheKey, long length, RangeMissingHa return true; } - private Entry initChunk(Entry entry) { - assert Thread.holdsLock(entry.chunk); - RegionKey regionKey = entry.chunk.regionKey; - if (keyMapping.get(regionKey) != entry) { - throwAlreadyClosed("no free region found (contender)"); - } - // new item - assert entry.freq == 0; - assert entry.prev == null; - assert entry.next == null; - final SharedBytes.IO freeSlot = freeRegions.poll(); - if (freeSlot != null) { - // no need to evict an item, just add - assignToSlot(entry, freeSlot); - } else { - // need to evict something - synchronized (this) { - maybeEvict(); - } - final SharedBytes.IO freeSlotRetry = freeRegions.poll(); - if (freeSlotRetry != null) { - assignToSlot(entry, freeSlotRetry); - } else { - boolean removed = keyMapping.remove(regionKey, entry); - assert removed; - throwAlreadyClosed("no free region found"); - } - } - - return entry; - } - - private void maybePromote(long now, Entry entry) { - synchronized (this) { - if (now - entry.lastAccessed >= minTimeDelta && entry.freq + 1 < maxFreq && entry.chunk.isEvicted() == false) { - unlink(entry); - entry.freq++; - entry.lastAccessed = now; - pushEntryToBack(entry); - } - } - } - - private void assignToSlot(Entry entry, SharedBytes.IO freeSlot) { - assert regionOwners.put(freeSlot, entry.chunk) == null; - synchronized (this) { - if (entry.chunk.isEvicted()) { - assert regionOwners.remove(freeSlot) == entry.chunk; - freeRegions.add(freeSlot); - keyMapping.remove(entry.chunk.regionKey, entry); - throwAlreadyClosed("evicted during free region allocation"); - } - pushEntryToBack(entry); - // assign io only when chunk is ready for use. Under lock to avoid concurrent tryEvict. - entry.chunk.io = freeSlot; - } - } - private static void throwAlreadyClosed(String message) { throw new AlreadyClosedException(message); } - private boolean assertChunkActiveOrEvicted(Entry entry) { - synchronized (this) { - // assert linked (or evicted) - assert entry.prev != null || entry.chunk.isEvicted(); - - } - assert regionOwners.get(entry.chunk.io) == entry.chunk || entry.chunk.isEvicted(); - return true; - } - // used by tests int freeRegionCount() { return freeRegions.size(); @@ -589,116 +504,6 @@ public Stats getStats() { ); } - private synchronized boolean invariant(final Entry e, boolean present) { - boolean found = false; - for (int i = 0; i < maxFreq; i++) { - assert freqs[i] == null || freqs[i].prev != null; - assert freqs[i] == null || freqs[i].prev != freqs[i] || freqs[i].next == null; - assert freqs[i] == null || freqs[i].prev.next == null; - for (Entry entry = freqs[i]; entry != null; entry = entry.next) { - assert entry.next == null || entry.next.prev == entry; - assert entry.prev != null; - assert entry.prev.next == null || entry.prev.next == entry; - assert entry.freq == i; - if (entry == e) { - found = true; - } - } - for (Entry entry = freqs[i]; entry != null && entry.prev != freqs[i]; entry = entry.prev) { - assert entry.next == null || entry.next.prev == entry; - assert entry.prev != null; - assert entry.prev.next == null || entry.prev.next == entry; - assert entry.freq == i; - if (entry == e) { - found = true; - } - } - } - assert found == present; - return true; - } - - private void maybeEvict() { - assert Thread.holdsLock(this); - for (int i = 0; i < maxFreq; i++) { - for (Entry entry = freqs[i]; entry != null; entry = entry.next) { - boolean evicted = entry.chunk.tryEvict(); - if (evicted && entry.chunk.io != null) { - unlink(entry); - keyMapping.remove(entry.chunk.regionKey, entry); - return; - } - } - } - } - - private void pushEntryToBack(final Entry entry) { - assert Thread.holdsLock(this); - assert invariant(entry, false); - assert entry.prev == null; - assert entry.next == null; - final Entry currFront = freqs[entry.freq]; - if (currFront == null) { - freqs[entry.freq] = entry; - entry.prev = entry; - entry.next = null; - } else { - assert currFront.freq == entry.freq; - final Entry last = currFront.prev; - currFront.prev = entry; - last.next = entry; - entry.prev = last; - entry.next = null; - } - assert freqs[entry.freq].prev == entry; - assert freqs[entry.freq].prev.next == null; - assert entry.prev != null; - assert entry.prev.next == null || entry.prev.next == entry; - assert entry.next == null; - assert invariant(entry, true); - } - - private void unlink(final Entry entry) { - assert Thread.holdsLock(this); - assert invariant(entry, true); - assert entry.prev != null; - final Entry currFront = freqs[entry.freq]; - assert currFront != null; - if (currFront == entry) { - freqs[entry.freq] = entry.next; - if (entry.next != null) { - assert entry.prev != entry; - entry.next.prev = entry.prev; - } - } else { - if (entry.next != null) { - entry.next.prev = entry.prev; - } - entry.prev.next = entry.next; - if (currFront.prev == entry) { - currFront.prev = entry.prev; - } - } - entry.next = null; - entry.prev = null; - assert invariant(entry, false); - } - - private void computeDecay() { - synchronized (this) { - long now = threadPool.relativeTimeInMillis(); - for (int i = 0; i < maxFreq; i++) { - for (Entry entry = freqs[i]; entry != null; entry = entry.next) { - if (entry.freq > 0 && now - entry.lastAccessed >= 2 * minTimeDelta) { - unlink(entry); - entry.freq--; - pushEntryToBack(entry); - } - } - } - } - } - public void removeFromCache(KeyType cacheKey) { forceEvict(cacheKey::equals); } @@ -710,59 +515,21 @@ public void removeFromCache(KeyType cacheKey) { * @return The number of entries evicted from the keyMapping. */ public int forceEvict(Predicate cacheKeyPredicate) { - final List> matchingEntries = new ArrayList<>(); - keyMapping.forEach((key, value) -> { - if (cacheKeyPredicate.test(key.file)) { - matchingEntries.add(value); - } - }); - var evictedCount = 0; - if (matchingEntries.isEmpty() == false) { - synchronized (this) { - for (Entry entry : matchingEntries) { - boolean evicted = entry.chunk.forceEvict(); - if (evicted && entry.chunk.io != null) { - unlink(entry); - keyMapping.remove(entry.chunk.regionKey, entry); - evictedCount++; - } - } - } - } - return evictedCount; + return cache.forceEvict(cacheKeyPredicate); + } // used by tests int getFreq(CacheFileRegion cacheFileRegion) { - return keyMapping.get(cacheFileRegion.regionKey).freq; + if (cache instanceof LFUCache lfuCache) { + return lfuCache.getFreq(cacheFileRegion); + } + return -1; } @Override public void close() { sharedBytes.decRef(); - decayTask.close(); - } - - class CacheDecayTask extends AbstractAsyncTask { - - CacheDecayTask(ThreadPool threadPool, Executor executor, TimeValue interval) { - super(logger, Objects.requireNonNull(threadPool), executor, Objects.requireNonNull(interval), true); - } - - @Override - protected boolean mustReschedule() { - return true; - } - - @Override - public void runInternal() { - computeDecay(); - } - - @Override - public String toString() { - return "shared_cache_decay_task"; - } } private record RegionKey(KeyType file, int region) { @@ -772,19 +539,6 @@ public String toString() { } } - static class Entry { - final T chunk; - Entry prev; - Entry next; - int freq; - volatile long lastAccessed; - - Entry(T chunk, long lastAccessed) { - this.chunk = chunk; - this.lastAccessed = lastAccessed; - } - } - /** * This class models a reference counted object that also tracks a flag for eviction of an instance. * It is only inherited by CacheFileRegion to enable the use of a static var handle in on a non-static inner class. @@ -992,7 +746,7 @@ public class CacheFile { private final KeyType cacheKey; private final long length; - private Entry lastAccessedRegion; + private CacheEntry lastAccessedRegion; private CacheFile(KeyType cacheKey, long length) { this.cacheKey = cacheKey; @@ -1021,18 +775,16 @@ public boolean tryRead(ByteBuffer buf, long offset) throws IOException { var fileRegion = lastAccessedRegion; if (fileRegion != null && fileRegion.chunk.regionKey.region == startRegion) { // existing item, check if we need to promote item - long now = threadPool.relativeTimeInMillis(); - if (now - fileRegion.lastAccessed >= minTimeDelta) { - maybePromote(now, fileRegion); - } + fileRegion.touch(); + } else { - fileRegion = get(cacheKey, length, startRegion); + fileRegion = cache.get(cacheKey, length, startRegion); } - final var region = fileRegion.chunk; - if (region.tracker.checkAvailable(end - getRegionStart(startRegion)) == false) { + final var region = fileRegion; + if (region.chunk.tracker.checkAvailable(end - getRegionStart(startRegion)) == false) { return false; } - boolean res = region.tryRead(buf, offset); + boolean res = region.chunk.tryRead(buf, offset); lastAccessedRegion = res ? fileRegion : null; return res; } @@ -1063,7 +815,7 @@ private int readSingleRegion( int region ) throws InterruptedException, ExecutionException { final PlainActionFuture readFuture = PlainActionFuture.newFuture(); - final CacheFileRegion fileRegion = get(cacheKey, length, region).chunk; + final CacheFileRegion fileRegion = get(cacheKey, length, region); final long regionStart = getRegionStart(region); fileRegion.populateAndRead( mapSubRangeToRegion(rangeToWrite, region), @@ -1093,7 +845,7 @@ private int readMultiRegions( // nothing to read, skip continue; } - final CacheFileRegion fileRegion = get(cacheKey, length, region).chunk; + final CacheFileRegion fileRegion = get(cacheKey, length, region); final long regionStart = getRegionStart(region); fileRegion.populateAndRead( mapSubRangeToRegion(rangeToWrite, region), @@ -1189,4 +941,303 @@ public record Stats( ) { public static final Stats EMPTY = new Stats(0, 0L, 0L, 0L, 0L, 0L, 0L, 0L); } + + private class LFUCache implements Cache { + + class LFUCacheEntry extends CacheEntry { + LFUCacheEntry prev; + LFUCacheEntry next; + int freq; + volatile long lastAccessed; + + LFUCacheEntry(CacheFileRegion chunk, long lastAccessed) { + super(chunk); + this.lastAccessed = lastAccessed; + } + + void touch() { + long now = threadPool.relativeTimeInMillis(); + if (now - lastAccessed >= minTimeDelta) { + maybePromote(now, this); + } + } + } + + private final ConcurrentHashMap, LFUCacheEntry> keyMapping = new ConcurrentHashMap<>(); + private final LFUCacheEntry[] freqs; + private final int maxFreq; + private final long minTimeDelta; + private final CacheDecayTask decayTask; + + @SuppressWarnings("unchecked") + LFUCache(Settings settings) { + this.maxFreq = SHARED_CACHE_MAX_FREQ_SETTING.get(settings); + this.minTimeDelta = SHARED_CACHE_MIN_TIME_DELTA_SETTING.get(settings).millis(); + freqs = (LFUCacheEntry[]) Array.newInstance(LFUCacheEntry.class, maxFreq); + decayTask = new CacheDecayTask(threadPool, threadPool.generic(), SHARED_CACHE_DECAY_INTERVAL_SETTING.get(settings)); + decayTask.rescheduleIfNecessary(); + } + + @Override + public void close() { + decayTask.close(); + } + + int getFreq(CacheFileRegion cacheFileRegion) { + return keyMapping.get(cacheFileRegion.regionKey).freq; + } + + @Override + public LFUCacheEntry get(KeyType cacheKey, long fileLength, int region) { + final RegionKey regionKey = new RegionKey<>(cacheKey, region); + final long now = threadPool.relativeTimeInMillis(); + // try to just get from the map on the fast-path to save instantiating the capturing lambda needed on the slow path + // if we did not find an entry + var entry = keyMapping.get(regionKey); + if (entry == null) { + final int effectiveRegionSize = getRegionSize(fileLength, region); + entry = keyMapping.computeIfAbsent(regionKey, key -> new LFUCacheEntry(new CacheFileRegion(key, effectiveRegionSize), now)); + } + // io is volatile, double locking is fine, as long as we assign it last. + if (entry.chunk.io == null) { + synchronized (entry.chunk) { + if (entry.chunk.io == null) { + return initChunk(entry); + } + } + } + assert assertChunkActiveOrEvicted(entry); + + // existing item, check if we need to promote item + if (now - entry.lastAccessed >= minTimeDelta) { + maybePromote(now, entry); + } + + return entry; + } + + @Override + public int forceEvict(Predicate cacheKeyPredicate) { + final List matchingEntries = new ArrayList<>(); + keyMapping.forEach((key, value) -> { + if (cacheKeyPredicate.test(key.file)) { + matchingEntries.add(value); + } + }); + var evictedCount = 0; + if (matchingEntries.isEmpty() == false) { + synchronized (SharedBlobCacheService.this) { + for (LFUCacheEntry entry : matchingEntries) { + boolean evicted = entry.chunk.forceEvict(); + if (evicted && entry.chunk.io != null) { + unlink(entry); + keyMapping.remove(entry.chunk.regionKey, entry); + evictedCount++; + } + } + } + } + return evictedCount; + } + + private LFUCacheEntry initChunk(LFUCacheEntry entry) { + assert Thread.holdsLock(entry.chunk); + RegionKey regionKey = entry.chunk.regionKey; + if (keyMapping.get(regionKey) != entry) { + throwAlreadyClosed("no free region found (contender)"); + } + // new item + assert entry.freq == 0; + assert entry.prev == null; + assert entry.next == null; + final SharedBytes.IO freeSlot = freeRegions.poll(); + if (freeSlot != null) { + // no need to evict an item, just add + assignToSlot(entry, freeSlot); + } else { + // need to evict something + synchronized (SharedBlobCacheService.this) { + maybeEvict(); + } + final SharedBytes.IO freeSlotRetry = freeRegions.poll(); + if (freeSlotRetry != null) { + assignToSlot(entry, freeSlotRetry); + } else { + boolean removed = keyMapping.remove(regionKey, entry); + assert removed; + throwAlreadyClosed("no free region found"); + } + } + + return entry; + } + + private void assignToSlot(LFUCacheEntry entry, SharedBytes.IO freeSlot) { + assert regionOwners.put(freeSlot, entry.chunk) == null; + synchronized (SharedBlobCacheService.this) { + if (entry.chunk.isEvicted()) { + assert regionOwners.remove(freeSlot) == entry.chunk; + freeRegions.add(freeSlot); + keyMapping.remove(entry.chunk.regionKey, entry); + throwAlreadyClosed("evicted during free region allocation"); + } + pushEntryToBack(entry); + // assign io only when chunk is ready for use. Under lock to avoid concurrent tryEvict. + entry.chunk.io = freeSlot; + } + } + + private void pushEntryToBack(final LFUCacheEntry entry) { + assert Thread.holdsLock(SharedBlobCacheService.this); + assert invariant(entry, false); + assert entry.prev == null; + assert entry.next == null; + final LFUCacheEntry currFront = freqs[entry.freq]; + if (currFront == null) { + freqs[entry.freq] = entry; + entry.prev = entry; + entry.next = null; + } else { + assert currFront.freq == entry.freq; + final LFUCacheEntry last = currFront.prev; + currFront.prev = entry; + last.next = entry; + entry.prev = last; + entry.next = null; + } + assert freqs[entry.freq].prev == entry; + assert freqs[entry.freq].prev.next == null; + assert entry.prev != null; + assert entry.prev.next == null || entry.prev.next == entry; + assert entry.next == null; + assert invariant(entry, true); + } + + private synchronized boolean invariant(final LFUCacheEntry e, boolean present) { + boolean found = false; + for (int i = 0; i < maxFreq; i++) { + assert freqs[i] == null || freqs[i].prev != null; + assert freqs[i] == null || freqs[i].prev != freqs[i] || freqs[i].next == null; + assert freqs[i] == null || freqs[i].prev.next == null; + for (LFUCacheEntry entry = freqs[i]; entry != null; entry = entry.next) { + assert entry.next == null || entry.next.prev == entry; + assert entry.prev != null; + assert entry.prev.next == null || entry.prev.next == entry; + assert entry.freq == i; + if (entry == e) { + found = true; + } + } + for (LFUCacheEntry entry = freqs[i]; entry != null && entry.prev != freqs[i]; entry = entry.prev) { + assert entry.next == null || entry.next.prev == entry; + assert entry.prev != null; + assert entry.prev.next == null || entry.prev.next == entry; + assert entry.freq == i; + if (entry == e) { + found = true; + } + } + } + assert found == present; + return true; + } + + private boolean assertChunkActiveOrEvicted(LFUCacheEntry entry) { + synchronized (SharedBlobCacheService.this) { + // assert linked (or evicted) + assert entry.prev != null || entry.chunk.isEvicted(); + + } + assert regionOwners.get(entry.chunk.io) == entry.chunk || entry.chunk.isEvicted(); + return true; + } + + private void maybePromote(long now, LFUCacheEntry entry) { + synchronized (SharedBlobCacheService.this) { + if (now - entry.lastAccessed >= minTimeDelta && entry.freq + 1 < maxFreq && entry.chunk.isEvicted() == false) { + unlink(entry); + entry.freq++; + entry.lastAccessed = now; + pushEntryToBack(entry); + } + } + } + + private void unlink(final LFUCacheEntry entry) { + assert Thread.holdsLock(SharedBlobCacheService.this); + assert invariant(entry, true); + assert entry.prev != null; + final LFUCacheEntry currFront = freqs[entry.freq]; + assert currFront != null; + if (currFront == entry) { + freqs[entry.freq] = entry.next; + if (entry.next != null) { + assert entry.prev != entry; + entry.next.prev = entry.prev; + } + } else { + if (entry.next != null) { + entry.next.prev = entry.prev; + } + entry.prev.next = entry.next; + if (currFront.prev == entry) { + currFront.prev = entry.prev; + } + } + entry.next = null; + entry.prev = null; + assert invariant(entry, false); + } + + private void maybeEvict() { + assert Thread.holdsLock(SharedBlobCacheService.this); + for (int i = 0; i < maxFreq; i++) { + for (LFUCacheEntry entry = freqs[i]; entry != null; entry = entry.next) { + boolean evicted = entry.chunk.tryEvict(); + if (evicted && entry.chunk.io != null) { + unlink(entry); + keyMapping.remove(entry.chunk.regionKey, entry); + return; + } + } + } + } + + private void computeDecay() { + synchronized (SharedBlobCacheService.this) { + long now = threadPool.relativeTimeInMillis(); + for (int i = 0; i < maxFreq; i++) { + for (LFUCacheEntry entry = freqs[i]; entry != null; entry = entry.next) { + if (entry.freq > 0 && now - entry.lastAccessed >= 2 * minTimeDelta) { + unlink(entry); + entry.freq--; + pushEntryToBack(entry); + } + } + } + } + } + + class CacheDecayTask extends AbstractAsyncTask { + + CacheDecayTask(ThreadPool threadPool, Executor executor, TimeValue interval) { + super(logger, Objects.requireNonNull(threadPool), executor, Objects.requireNonNull(interval), true); + } + + @Override + protected boolean mustReschedule() { + return true; + } + + @Override + public void runInternal() { + computeDecay(); + } + + @Override + public String toString() { + return "shared_cache_decay_task"; + } + } + } } diff --git a/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java b/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java index 181b8d10ec863..300f9f1d8efa9 100644 --- a/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java +++ b/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java @@ -70,13 +70,13 @@ public void testBasicEviction() throws IOException { ) { final var cacheKey = generateCacheKey(); assertEquals(5, cacheService.freeRegionCount()); - final var region0 = cacheService.get(cacheKey, size(250), 0).chunk; + final var region0 = cacheService.get(cacheKey, size(250), 0); assertEquals(size(100), region0.tracker.getLength()); assertEquals(4, cacheService.freeRegionCount()); - final var region1 = cacheService.get(cacheKey, size(250), 1).chunk; + final var region1 = cacheService.get(cacheKey, size(250), 1); assertEquals(size(100), region1.tracker.getLength()); assertEquals(3, cacheService.freeRegionCount()); - final var region2 = cacheService.get(cacheKey, size(250), 2).chunk; + final var region2 = cacheService.get(cacheKey, size(250), 2); assertEquals(size(50), region2.tracker.getLength()); assertEquals(2, cacheService.freeRegionCount()); @@ -130,17 +130,17 @@ public void testAutoEviction() throws IOException { ) { final var cacheKey = generateCacheKey(); assertEquals(2, cacheService.freeRegionCount()); - final var region0 = cacheService.get(cacheKey, size(250), 0).chunk; + final var region0 = cacheService.get(cacheKey, size(250), 0); assertEquals(size(100), region0.tracker.getLength()); assertEquals(1, cacheService.freeRegionCount()); - final var region1 = cacheService.get(cacheKey, size(250), 1).chunk; + final var region1 = cacheService.get(cacheKey, size(250), 1); assertEquals(size(100), region1.tracker.getLength()); assertEquals(0, cacheService.freeRegionCount()); assertFalse(region0.isEvicted()); assertFalse(region1.isEvicted()); // acquire region 2, which should evict region 0 (oldest) - final var region2 = cacheService.get(cacheKey, size(250), 2).chunk; + final var region2 = cacheService.get(cacheKey, size(250), 2); assertEquals(size(50), region2.tracker.getLength()); assertEquals(0, cacheService.freeRegionCount()); assertTrue(region0.isEvicted()); @@ -169,9 +169,9 @@ public void testForceEviction() throws IOException { final var cacheKey1 = generateCacheKey(); final var cacheKey2 = generateCacheKey(); assertEquals(5, cacheService.freeRegionCount()); - final var region0 = cacheService.get(cacheKey1, size(250), 0).chunk; + final var region0 = cacheService.get(cacheKey1, size(250), 0); assertEquals(4, cacheService.freeRegionCount()); - final var region1 = cacheService.get(cacheKey2, size(250), 1).chunk; + final var region1 = cacheService.get(cacheKey2, size(250), 1); assertEquals(3, cacheService.freeRegionCount()); assertFalse(region0.isEvicted()); assertFalse(region1.isEvicted()); @@ -197,9 +197,9 @@ public void testForceEvictResponse() throws IOException { final var cacheKey1 = generateCacheKey(); final var cacheKey2 = generateCacheKey(); assertEquals(5, cacheService.freeRegionCount()); - final var region0 = cacheService.get(cacheKey1, size(250), 0).chunk; + final var region0 = cacheService.get(cacheKey1, size(250), 0); assertEquals(4, cacheService.freeRegionCount()); - final var region1 = cacheService.get(cacheKey2, size(250), 1).chunk; + final var region1 = cacheService.get(cacheKey2, size(250), 1); assertEquals(3, cacheService.freeRegionCount()); assertFalse(region0.isEvicted()); assertFalse(region1.isEvicted()); @@ -224,9 +224,9 @@ public void testDecay() throws IOException { final var cacheKey1 = generateCacheKey(); final var cacheKey2 = generateCacheKey(); assertEquals(5, cacheService.freeRegionCount()); - final var region0 = cacheService.get(cacheKey1, size(250), 0).chunk; + final var region0 = cacheService.get(cacheKey1, size(250), 0); assertEquals(4, cacheService.freeRegionCount()); - final var region1 = cacheService.get(cacheKey2, size(250), 1).chunk; + final var region1 = cacheService.get(cacheKey2, size(250), 1); assertEquals(3, cacheService.freeRegionCount()); assertEquals(0, cacheService.getFreq(region0)); @@ -235,7 +235,7 @@ public void testDecay() throws IOException { taskQueue.advanceTime(); taskQueue.runAllRunnableTasks(); - final var region0Again = cacheService.get(cacheKey1, size(250), 0).chunk; + final var region0Again = cacheService.get(cacheKey1, size(250), 0); assertSame(region0Again, region0); assertEquals(1, cacheService.getFreq(region0)); assertEquals(0, cacheService.getFreq(region1)); @@ -302,7 +302,7 @@ public void testGetMultiThreaded() throws IOException { cacheKeys[i], fileLength, regions[i] - ).chunk; + ); if (cacheFileRegion.tryIncRef()) { if (yield[i] == 0) { Thread.yield();