Skip to content

Commit

Permalink
First draft of single-StatsHolder setup
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Alfonsi <[email protected]>
  • Loading branch information
Peter Alfonsi committed Apr 9, 2024
1 parent a594e47 commit 96c9493
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 484 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opensearch.common.cache.RemovalReason;
import org.opensearch.common.cache.policy.CachedQueryResult;
import org.opensearch.common.cache.stats.CacheStats;
import org.opensearch.common.cache.stats.DimensionNode;
import org.opensearch.common.cache.stats.StatsHolder;
import org.opensearch.common.cache.store.config.CacheConfig;
import org.opensearch.common.collect.Tuple;
Expand Down Expand Up @@ -64,8 +65,9 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {

// In future we want to just read the stats from the individual tiers' statsHolder objects, but this isn't
// possible right now because of the way computeIfAbsent is implemented.
private final StatsHolder heapStats;
private final StatsHolder diskStats;
//private final StatsHolder heapStats;
//private final StatsHolder diskStats;
private final StatsHolder statsHolder;

private ToLongBiFunction<ICacheKey<K>, V> weigher;
private final List<String> dimensionNames;
Expand All @@ -76,9 +78,14 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {
* Maintains caching tiers in ascending order of cache latency.
*/
private final List<ICache<K, V>> cacheList;
private final List<Tuple<ICache<K, V>, StatsHolder>> cacheAndStatsList;
private final List<Tuple<ICache<K, V>, String>> cacheAndTierValueList;
private final List<Predicate<V>> policies;

// Common values used for tier dimension
public static final String TIER_DIMENSION_NAME = "tier";
public static final String TIER_DIMENSION_VALUE_ON_HEAP = "on_heap";
public static final String TIER_DIMENSION_VALUE_DISK = "disk";

TieredSpilloverCache(Builder<K, V> builder) {
Objects.requireNonNull(builder.onHeapCacheFactory, "onHeap cache builder can't be null");
Objects.requireNonNull(builder.diskCacheFactory, "disk cache builder can't be null");
Expand Down Expand Up @@ -116,12 +123,14 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {
this.cacheList = Arrays.asList(onHeapCache, diskCache);

this.dimensionNames = builder.cacheConfig.getDimensionNames();
this.heapStats = new StatsHolder(dimensionNames);
this.diskStats = new StatsHolder(dimensionNames);
this.cacheAndStatsList = List.of(
new Tuple<>(onHeapCache, heapStats),
new Tuple<>(diskCache, diskStats)
//this.heapStats = new StatsHolder(dimensionNames);
//this.diskStats = new StatsHolder(dimensionNames);
this.cacheAndTierValueList = List.of(
new Tuple<>(onHeapCache, TIER_DIMENSION_VALUE_ON_HEAP),
new Tuple<>(diskCache, TIER_DIMENSION_VALUE_DISK)
);
// Pass "tier" as the innermost dimension name, on top of whatever dimensions are specified for the cache as a whole
this.statsHolder = new StatsHolder(addTierValueToDimensionValues(dimensionNames, TIER_DIMENSION_NAME));
this.policies = builder.policies; // Will never be null; builder initializes it to an empty list
}

Expand All @@ -144,7 +153,7 @@ public V get(ICacheKey<K> key) {
public void put(ICacheKey<K> key, V value) {
try (ReleasableLock ignore = writeLock.acquire()) {
onHeapCache.put(key, value);
updateStatsOnPut(heapStats, key, value);
updateStatsOnPut(TIER_DIMENSION_VALUE_ON_HEAP, key, value);
}
}

Expand All @@ -161,7 +170,7 @@ public V computeIfAbsent(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V>
value = onHeapCache.computeIfAbsent(key, loader);
if (loader.isLoaded()) {
// The value was just computed and added to the cache
updateStatsOnPut(heapStats, key, value);
updateStatsOnPut(TIER_DIMENSION_VALUE_ON_HEAP, key, value);
}
}
return value;
Expand All @@ -176,8 +185,13 @@ public void invalidate(ICacheKey<K> key) {
// also trigger a hit/miss listener event, so ignoring it for now.
// We don't update stats here, as this is handled by the removal listeners for the tiers.
try (ReleasableLock ignore = writeLock.acquire()) {
for (ICache<K, V> cache : cacheList) {
cache.invalidate(key);
for (Tuple<ICache<K, V>, String> pair : cacheAndTierValueList) {
//cache.invalidate(key);
if (key.getDropStatsForDimensions()) {
List<String> dimensionValues = addTierValueToDimensionValues(key.dimensions, pair.v2());
statsHolder.removeDimensions(dimensionValues);
}
pair.v1().invalidate(key);
}
}
}
Expand All @@ -189,8 +203,7 @@ public void invalidateAll() {
cache.invalidateAll();
}
}
heapStats.reset();
diskStats.reset();
statsHolder.reset();
}

/**
Expand All @@ -205,7 +218,7 @@ public Iterable<ICacheKey<K>> keys() {

@Override
public long count() {
return heapStats.count() + diskStats.count();
return statsHolder.count();
}

@Override
Expand All @@ -226,21 +239,23 @@ public void close() throws IOException {

@Override
public CacheStats stats() {
// TODO: Just reuse MDCS
//return new TieredSpilloverCacheStats(heapStats.createSnapshot(), diskStats.createSnapshot(), dimensionNames);
return null;
return statsHolder.getCacheStats();
}

private Function<ICacheKey<K>, V> getValueFromTieredCache() {
return key -> {
try (ReleasableLock ignore = readLock.acquire()) {
for (Tuple<ICache<K, V>, StatsHolder> pair : cacheAndStatsList) {
for (Tuple<ICache<K, V>, String> pair : cacheAndTierValueList) {
V value = pair.v1().get(key);
List<String> dimensionValues = addTierValueToDimensionValues(key.dimensions, pair.v2()); // Get the tier value corresponding to this cache
if (value != null) {
pair.v2().incrementHits(key.dimensions);
//pair.v2().incrementHits(key.dimensions);
statsHolder.incrementHits(dimensionValues);
return value;
} else {
pair.v2().incrementMisses(key.dimensions);
statsHolder.incrementMisses(dimensionValues);
//pair.v2().incrementMisses(key.dimensions);
}
}
}
Expand All @@ -257,7 +272,7 @@ void handleRemovalFromHeapTier(RemovalNotification<ICacheKey<K>, V> notification
try (ReleasableLock ignore = writeLock.acquire()) {
if (evaluatePolicies(notification.getValue())) {
diskCache.put(key, notification.getValue()); // spill over to the disk tier and increment its stats
updateStatsOnPut(diskStats, key, notification.getValue());
updateStatsOnPut(TIER_DIMENSION_VALUE_DISK, key, notification.getValue());
}
}
wasEvicted = true;
Expand All @@ -267,7 +282,7 @@ void handleRemovalFromHeapTier(RemovalNotification<ICacheKey<K>, V> notification
// If the removal was for another reason, send this notification to the TSC's removal listener, as the value is leaving the TSC entirely
removalListener.onRemoval(notification);
}
updateStatsOnRemoval(heapStats, wasEvicted, key, notification.getValue());
updateStatsOnRemoval(TIER_DIMENSION_VALUE_ON_HEAP, wasEvicted, key, notification.getValue());
}

void handleRemovalFromDiskTier(RemovalNotification<ICacheKey<K>, V> notification) {
Expand All @@ -279,20 +294,22 @@ void handleRemovalFromDiskTier(RemovalNotification<ICacheKey<K>, V> notification
|| RemovalReason.CAPACITY.equals(notification.getRemovalReason())) {
wasEvicted = true;
}
updateStatsOnRemoval(diskStats, wasEvicted, notification.getKey(), notification.getValue());
updateStatsOnRemoval(TIER_DIMENSION_VALUE_DISK, wasEvicted, notification.getKey(), notification.getValue());
}

void updateStatsOnRemoval(StatsHolder statsHolder, boolean wasEvicted, ICacheKey<K> key, V value) {
void updateStatsOnRemoval(String removedFromTierValue, boolean wasEvicted, ICacheKey<K> key, V value) {
List<String> dimensionValues = addTierValueToDimensionValues(key.dimensions, removedFromTierValue);
if (wasEvicted) {
statsHolder.incrementEvictions(key.dimensions);
statsHolder.incrementEvictions(dimensionValues);
}
statsHolder.decrementEntries(key.dimensions);
statsHolder.decrementSizeInBytes(key.dimensions, weigher.applyAsLong(key, value));
statsHolder.decrementEntries(dimensionValues);
statsHolder.decrementSizeInBytes(dimensionValues, weigher.applyAsLong(key, value));
}

void updateStatsOnPut(StatsHolder statsHolder, ICacheKey<K> key, V value) {
statsHolder.incrementEntries(key.dimensions);
statsHolder.incrementSizeInBytes(key.dimensions, weigher.applyAsLong(key, value));
void updateStatsOnPut(String destinationTierValue, ICacheKey<K> key, V value) {
List<String> dimensionValues = addTierValueToDimensionValues(key.dimensions, destinationTierValue);
statsHolder.incrementEntries(dimensionValues);
statsHolder.incrementSizeInBytes(dimensionValues, weigher.applyAsLong(key, value));
}

boolean evaluatePolicies(V value) {
Expand All @@ -304,6 +321,15 @@ boolean evaluatePolicies(V value) {
return true;
}

/**
* Add tierValue to the end of a copy of the initial dimension values.
*/
private List<String> addTierValueToDimensionValues(List<String> initialDimensions, String tierValue) {
List<String> result = new ArrayList<>(initialDimensions);
result.add(tierValue);
return result;
}

/**
* A class which receives removal events from the heap tier.
*/
Expand Down

This file was deleted.

Loading

0 comments on commit 96c9493

Please sign in to comment.