Skip to content

Commit

Permalink
Move eviction handling directly into internal cache impl
Browse files Browse the repository at this point in the history
  • Loading branch information
romseygeek committed Oct 4, 2023
1 parent cd3a742 commit e92d189
Showing 1 changed file with 32 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
import java.util.function.IntConsumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -400,50 +399,8 @@ private int getRegionSize(long fileLength, int region) {
}

CacheFileRegion get(KeyType cacheKey, long fileLength, int region) {
final RegionKey<KeyType> regionKey = new RegionKey<>(cacheKey, region);
// try to just get from the map on the fast-path to save instantiating the capturing lambda needed on the slow path if we did not
// find an entry
var entry = cache.get(regionKey);
if (entry == null) {
final int effectiveRegionSize = getRegionSize(fileLength, region);
entry = cache.computeIfAbsent(regionKey, key -> new CacheFileRegion(key, effectiveRegionSize));
}
// io is volatile, double locking is fine, as long as we assign it last.
if (entry.io == null) {
synchronized (entry) {
if (entry.io == null) {
return initChunk(entry);
}
}
}
return entry;
}

private CacheFileRegion initChunk(CacheFileRegion entry) {
assert Thread.holdsLock(entry);
RegionKey<KeyType> regionKey = entry.regionKey;
if (cache.get(regionKey) != entry) {
throwAlreadyClosed("no free region found (contender)");
}
final SharedBytes.IO freeSlot = freeRegions.poll();
if (freeSlot != null) {
// no need to evict an item, just add
assignToSlot(entry, freeSlot);
} else {
// need to evict something
cache.evict();
final SharedBytes.IO freeSlotRetry = freeRegions.poll();
if (freeSlotRetry != null) {
assignToSlot(entry, freeSlotRetry);
} else {
synchronized (this) {
cache.evictMatching(r -> r == regionKey);
}
throwAlreadyClosed("no free region found");
}
}

return entry;
final RegionKey<KeyType> regionKey = new RegionKey<>(cacheKey, fileLength, region);
return cache.getAndEvict(regionKey);
}

private void assignToSlot(CacheFileRegion entry, SharedBytes.IO freeSlot) {
Expand Down Expand Up @@ -571,7 +528,7 @@ public void close() {
cache.close();
}

private record RegionKey<KeyType>(KeyType file, int region) {
private record RegionKey<KeyType>(KeyType file, long length, int region) {
@Override
public String toString() {
return "Chunk{" + "file=" + file + ", region=" + region + '}';
Expand Down Expand Up @@ -814,7 +771,7 @@ public boolean tryRead(ByteBuffer buf, long offset) throws IOException {
var fileRegion = lastAccessedRegion;
if (fileRegion != null && fileRegion.regionKey.region == startRegion) {
// existing item, get it to update cache usage
cache.get(fileRegion.regionKey);
cache.getAndEvict(fileRegion.regionKey);
} else {
fileRegion = get(cacheKey, length, startRegion);
}
Expand Down Expand Up @@ -981,11 +938,7 @@ public record Stats(
}

private interface Cache<K, V> {
V get(K key);

V computeIfAbsent(K key, Function<K, V> computeFunction);

void evict();
V getAndEvict(K key);

int evictMatching(Predicate<K> predicate);

Expand Down Expand Up @@ -1029,33 +982,45 @@ public void close() {
}

@Override
public CacheFileRegion get(RegionKey<KeyType> key) {
public CacheFileRegion getAndEvict(RegionKey<KeyType> key) {
Entry<CacheFileRegion> entry = keyMapping.get(key);
final long now = threadPool.relativeTimeInMillis();
if (entry != null) {
final long now = threadPool.relativeTimeInMillis();
// existing item, check if we need to promote item
if (now - entry.lastAccessed >= minTimeDelta) {
maybePromote(now, entry);
}
return entry.chunk;
}
return null;
}

@Override
public CacheFileRegion computeIfAbsent(RegionKey<KeyType> key, Function<RegionKey<KeyType>, CacheFileRegion> computeFunction) {
final long now = threadPool.relativeTimeInMillis();
SharedBytes.IO freePage = freeRegions.poll();
if (freePage == null) {
// no free regions, so evict something
evict();
freePage = freeRegions.poll();
if (freePage == null) {
throwAlreadyClosed("No free regions");
}
}
var pageToUse = freePage;
synchronized (SharedBlobCacheService.this) {
return keyMapping.computeIfAbsent(key, k -> {
var entry = new Entry<>(computeFunction.apply(k), now);
pushEntryToBack(entry);
return entry;
}).chunk;
entry = keyMapping.computeIfAbsent(key, k -> {
var size = getRegionSize(k.length, k.region);
var e = new Entry<>(new CacheFileRegion(key, size), now);
pushEntryToBack(e);
e.chunk.io = pageToUse;
return e;
});
if (entry.chunk.io != pageToUse) {
// another thread got there first, return the page to the pool
freeRegions.add(pageToUse);
} else {
assert regionOwners.put(entry.chunk.io, entry.chunk) == null;
}
}
return entry.chunk;
}

@Override
public void evict() {
private void evict() {
synchronized (SharedBlobCacheService.this) {
for (int i = 0; i < maxFreq; i++) {
for (Entry<CacheFileRegion> entry = freqs[i]; entry != null; entry = entry.next) {
Expand Down

0 comments on commit e92d189

Please sign in to comment.