Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into shard-batch-cache
Browse files Browse the repository at this point in the history
Signed-off-by: Aman Khare <[email protected]>
  • Loading branch information
Aman Khare committed Mar 19, 2024
2 parents a893cb8 + 7a6f8b0 commit 3becfc2
Show file tree
Hide file tree
Showing 34 changed files with 1,394 additions and 102 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Make search query counters dynamic to support all query types ([#12601](https://github.com/opensearch-project/OpenSearch/pull/12601))
- [Tiered caching] Add policies controlling which values can enter pluggable caches [EXPERIMENTAL] ([#12542](https://github.com/opensearch-project/OpenSearch/pull/12542))
- [Tiered caching] Add Stale keys Management and CacheCleaner to IndicesRequestCache ([#12625](https://github.com/opensearch-project/OpenSearch/pull/12625))
- [Tiered caching] Add serializer integration to allow ehcache disk cache to use non-primitive values ([#12709](https://github.com/opensearch-project/OpenSearch/pull/12709))
- [Admission Control] Integrated IO Based AdmissionController to AdmissionControl Framework ([#12583](https://github.com/opensearch-project/OpenSearch/pull/12583))

### Dependencies
Expand Down Expand Up @@ -144,6 +145,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Changed
- Allow composite aggregation to run under a parent filter aggregation ([#11499](https://github.com/opensearch-project/OpenSearch/pull/11499))
- Quickly compute terms aggregations when the top-level query is functionally match-all for a segment ([#11643](https://github.com/opensearch-project/OpenSearch/pull/11643))
- Mark fuzzy filter GA and remove experimental setting ([12631](https://github.com/opensearch-project/OpenSearch/pull/12631))

### Deprecated

Expand Down
4 changes: 2 additions & 2 deletions buildSrc/version.properties
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ bouncycastle=1.77
randomizedrunner = 2.7.1
junit = 4.13.2
hamcrest = 2.1
mockito = 5.10.0
mockito = 5.11.0
objenesis = 3.2
bytebuddy = 1.14.7
bytebuddy = 1.14.9

# benchmark dependencies
jmh = 1.35
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ static byte[] toBytes(BytesReference reference) {
return ArrayUtil.copyOfSubArray(bytesRef.bytes, bytesRef.offset, bytesRef.offset + bytesRef.length);
}

static byte[] toBytesWithoutCompact(BytesReference reference) {
final BytesRef bytesRef = reference.toBytesRef();
return bytesRef.bytes;
}

/**
* Returns an array of byte buffers from the given BytesReference.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ public void onRemoval(RemovalNotification<K, V> notification) {
.setValueType(builder.cacheConfig.getValueType())
.setSettings(builder.cacheConfig.getSettings())
.setWeigher(builder.cacheConfig.getWeigher())
.setMaxSizeInBytes(builder.cacheConfig.getMaxSizeInBytes()) // TODO: Part of a workaround for an issue in TSC. Overall fix
// coming soon
.setMaxSizeInBytes(builder.cacheConfig.getMaxSizeInBytes())
.setExpireAfterAccess(builder.cacheConfig.getExpireAfterAccess())
.build(),
builder.cacheType,
builder.cacheFactories
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.RemovalReason;
import org.opensearch.common.cache.serializer.Serializer;
import org.opensearch.common.cache.store.builders.ICacheBuilder;
import org.opensearch.common.cache.store.config.CacheConfig;

Expand Down Expand Up @@ -106,8 +107,11 @@ public MockDiskCacheFactory(long delay, int maxSize) {
}

@Override
@SuppressWarnings({ "unchecked" })
public <K, V> ICache<K, V> create(CacheConfig<K, V> config, CacheType cacheType, Map<String, Factory> cacheFactories) {
return new Builder<K, V>().setMaxSize(maxSize)
return new Builder<K, V>().setKeySerializer((Serializer<K, byte[]>) config.getKeySerializer())
.setValueSerializer((Serializer<V, byte[]>) config.getValueSerializer())
.setMaxSize(maxSize)
.setDeliberateDelay(delay)
.setRemovalListener(config.getRemovalListener())
.build();
Expand All @@ -123,6 +127,8 @@ public static class Builder<K, V> extends ICacheBuilder<K, V> {

int maxSize;
long delay;
Serializer<K, byte[]> keySerializer;
Serializer<V, byte[]> valueSerializer;

@Override
public ICache<K, V> build() {
Expand All @@ -138,5 +144,16 @@ public Builder<K, V> setDeliberateDelay(long millis) {
this.delay = millis;
return this;
}

public Builder<K, V> setKeySerializer(Serializer<K, byte[]> keySerializer) {
this.keySerializer = keySerializer;
return this;
}

public Builder<K, V> setValueSerializer(Serializer<V, byte[]> valueSerializer) {
this.valueSerializer = valueSerializer;
return this;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ public void testComputeIfAbsentWithFactoryBasedCacheCreation() throws Exception
.getKey(),
onHeapCacheSize * keyValueSize + "b"
)
.put(
CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(),
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME
)
.put(FeatureFlags.PLUGGABLE_CACHE, "true")
.build();

ICache<String, String> tieredSpilloverICache = new TieredSpilloverCache.TieredSpilloverCacheFactory().create(
Expand All @@ -127,12 +132,8 @@ public void testComputeIfAbsentWithFactoryBasedCacheCreation() throws Exception
.setWeigher((k, v) -> keyValueSize)
.setRemovalListener(removalListener)
.setSettings(settings)
.setCachedResultParser(new Function<String, CachedQueryResult.PolicyValues>() {
@Override
public CachedQueryResult.PolicyValues apply(String s) {
return new CachedQueryResult.PolicyValues(20_000_000L);
}
}) // Values will always appear to have taken 20_000_000 ns = 20 ms to compute
.setCachedResultParser(s -> new CachedQueryResult.PolicyValues(20_000_000L)) // Values will always appear to have taken
// 20_000_000 ns = 20 ms to compute
.build(),
CacheType.INDICES_REQUEST_CACHE,
Map.of(
Expand All @@ -145,20 +146,16 @@ public CachedQueryResult.PolicyValues apply(String s) {

TieredSpilloverCache<String, String> tieredSpilloverCache = (TieredSpilloverCache<String, String>) tieredSpilloverICache;

// Put values in cache more than it's size and cause evictions from onHeap.
int numOfItems1 = randomIntBetween(onHeapCacheSize + 1, totalSize);
List<String> onHeapKeys = new ArrayList<>();
List<String> diskTierKeys = new ArrayList<>();
for (int iter = 0; iter < numOfItems1; iter++) {
String key = UUID.randomUUID().toString();
LoadAwareCacheLoader<String, String> tieredCacheLoader = getLoadAwareCacheLoader();
tieredSpilloverCache.computeIfAbsent(key, tieredCacheLoader);
}
tieredSpilloverCache.getOnHeapCache().keys().forEach(onHeapKeys::add);
tieredSpilloverCache.getDiskCache().keys().forEach(diskTierKeys::add);

assertEquals(tieredSpilloverCache.getOnHeapCache().count(), onHeapKeys.size());
assertEquals(tieredSpilloverCache.getDiskCache().count(), diskTierKeys.size());
// Verify on heap cache size.
assertEquals(onHeapCacheSize, tieredSpilloverCache.getOnHeapCache().count());
// Verify disk cache size.
assertEquals(numOfItems1 - onHeapCacheSize, tieredSpilloverCache.getDiskCache().count());
}

public void testWithFactoryCreationWithOnHeapCacheNotPresent() {
Expand All @@ -180,6 +177,11 @@ public void testWithFactoryCreationWithOnHeapCacheNotPresent() {
.getKey(),
onHeapCacheSize * keyValueSize + "b"
)
.put(
CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(),
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME
)
.put(FeatureFlags.PLUGGABLE_CACHE, "true")
.build();

IllegalArgumentException ex = assertThrows(
Expand Down
Loading

0 comments on commit 3becfc2

Please sign in to comment.