From 58e2c22eb5807be31cffb5844a52e58fc4f1e3dc Mon Sep 17 00:00:00 2001 From: Jack Mazanec Date: Fri, 1 May 2020 13:15:48 -0700 Subject: [PATCH] Added index cache information in stats (#97) --- README.md | 34 ++- .../knn/index/KNNIndexCache.java | 115 +++++++- .../knn/plugin/stats/KNNStatsConfig.java | 6 +- .../knn/plugin/stats/StatNames.java | 2 + .../knn/index/KNNIndexCacheTests.java | 257 ++++++++++++++++++ 5 files changed, 399 insertions(+), 15 deletions(-) create mode 100644 src/test/java/com/amazon/opendistroforelasticsearch/knn/index/KNNIndexCacheTests.java diff --git a/README.md b/README.md index 78affb4d..fa54e0c2 100644 --- a/README.md +++ b/README.md @@ -28,7 +28,7 @@ To build the JNI Library used to incorporate NMSLIB functionality, follow these cd jni cmake . make -``` +``` The library will be placed in the `buildSrc` directory. @@ -65,7 +65,7 @@ The Elasticsearch server JVM will connect to a debugger attached to `localhost:5 To debug code running in an integration test (which exercises the server from a separate JVM), first, setup a remote debugger listening on port `8000`, and then run: ``` -./gradlew :integTest -Dtest.debug=1 +./gradlew :integTest -Dtest.debug=1 ``` The test runner JVM will connect to a debugger attached to `localhost:8000` before running the tests. @@ -94,15 +94,15 @@ PUT /myindex "my_vector1": { "type": "knn_vector", "dimension": 2 - }, + }, "my_vector2": { "type": "knn_vector", "dimension": 4 - }, + }, "my_vector3": { "type": "knn_vector", "dimension": 8 - } + } } } } @@ -231,7 +231,7 @@ This setting indicates whether or not the KNN Plugin is enabled. If it is disabl This setting specifies how many threads the NMS library should use to create the graph in memory. By default, the NMS library sets this value to the number of cores the machine has. However, because ES can spawn the same number of threads for searching, this could lead to (number of cores)^2 threads running and lead to 100% CPU utilization. The default value is *1.* #### Cache -The KNN Plugin uses a Guava cache to keep track of the graphs currently loaded into native memory. When a query is run against a graph for the first time, the graph is loaded into native memory (outside the Java heap). Because Elasticsearch runs inside of the JVM, it cannot manage native memory directly. So, it keeps track of native memory by adding an entry into a Guava cache that contains the pointer to the graph in native memory and how much memory it uses. The cache’s weight just means how much native memory all of the elements in the cache are taking up. If the maximum weight (this value is set by *knn.memory.circuit_breaker.limit*) of the cache is exceeded when it tries to load a graph into memory, the cache evicts an entry to make room for the new entry. Additionally, the cache can evict entries based on how long it has been since they were last accessed. +The KNN Plugin uses a Guava cache to keep track of the graphs currently loaded into native memory. When a query is run against a graph for the first time, the graph is loaded into native memory (outside the Java heap). Because Elasticsearch runs inside of the JVM, it cannot manage native memory directly. So, it keeps track of native memory by adding an entry into a Guava cache that contains the pointer to the graph in native memory and how much memory it uses. The cache’s weight just means how much native memory all of the elements in the cache are taking up. If the maximum weight (this value is set by *knn.memory.circuit_breaker.limit*) of the cache is exceeded when it tries to load a graph into memory, the cache evicts an entry to make room for the new entry. Additionally, the cache can evict entries based on how long it has been since they were last accessed. ##### knn.cache.item.expiry.enabled This setting indicates that the cache should evict entries that have expired (not been accessed for *knn.cache.item.expiry.minutes*). The default value is *false.* @@ -243,7 +243,7 @@ This setting indicates how long an item can be in the cache without being access For KNN, the circuit breaker is used to indicate when performance may degrade because the graphs loaded into native memory are reaching the cluster’s total limits. Currently, the system does not perform any action once this limit is reached. ##### knn.memory.circuit_breaker.enabled -This setting enables or disables the circuit breaker feature. Disabling this setting will keep you at risk of Out of memory as we do not have control on the memory usage for the graphs. The default value is *true*. +This setting enables or disables the circuit breaker feature. Disabling this setting will keep you at risk of Out of memory as we do not have control on the memory usage for the graphs. The default value is *true*. ##### knn.memory.circuit_breaker.limit This setting indicates the maximum capacity of the cache. When the cache attempts to load in a graph that exceeds this limit, it is forced to evict an entry and *knn.circuit_breaker.triggered *is set to *true.* The default value for this setting is *60% *of the machines total memory outside the Elasticsearch jvm . However, a value in *KB* can be given as well. @@ -288,7 +288,7 @@ Indicates whether the circuit breaker is triggered. The number of evictions that have occurred in the guava cache. *note:* explicit evictions that occur because of index deletion are not counted. #### hit_count -The number of cache hits that have occurred on the node. A cache hit occurs when a user queries a graph and it is already loaded into memory. +The number of cache hits that have occurred on the node. A cache hit occurs when a user queries a graph and it is already loaded into memory. #### miss_count The number of cache misses that have occurred on the node. A cache miss occurs when a user queries a graph and it has not yet been loaded into memory. @@ -296,6 +296,9 @@ The number of cache misses that have occurred on the node. A cache miss occurs w #### graph_memory_usage The current weight of the cache (the total size in native memory of all of the graphs) in Kilobytes. +#### graph_memory_usage_percentage +The current weight of the cache as a percentage of the maximum cache capacity. + #### graph_index_requests The number of requests to add the knn_vector field of a document into a graph. @@ -303,13 +306,13 @@ The number of requests to add the knn_vector field of a document into a graph. The number of requests to add the knn_vector field of a document into a graph that have produced an error. #### graph_query_requests -The number of graph queries that have been made. +The number of graph queries that have been made. #### graph_query_errors The number of graph queries that have produced an error. #### knn_query_requests -The number of KNN query requests received. +The number of KNN query requests received. #### cache_capacity_reached Whether the cache capacity for this node has been reached. This capacity can be controlled as part of the *knn.memory.circuit_breaker.limit.* @@ -323,6 +326,9 @@ The number of times an item is successfully loaded into the cache. #### total_load_time The total time in nanoseconds it has taken to load items into cache (cumulative). +#### indices_in_cache +For each index that has graphs in the cache, this stat provides the number of graphs that index has and the total graph_memory_usage that index is using in Kilobytes. + #### Examples ``` @@ -340,11 +346,19 @@ GET /_opendistro/_knn/stats?pretty "eviction_count" : 0, "miss_count" : 1, "graph_memory_usage" : 1, + "graph_memory_usage_percentage" : 3.68, "graph_index_requests" : 7, "graph_index_errors" : 1, "knn_query_requests" : 4, "graph_query_requests" : 30, "graph_query_errors" : 15, + "indices_in_cache" : { + "myindex" : { + "graph_memory_usage" : 2, + "graph_memory_usage_percentage" : 3.68, + "graph_count" : 2 + } + }, "cache_capacity_reached" : false, "load_exception_count" : 0, "hit_count" : 0, diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/knn/index/KNNIndexCache.java b/src/main/java/com/amazon/opendistroforelasticsearch/knn/index/KNNIndexCache.java index 047ee75a..0cacf86d 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/knn/index/KNNIndexCache.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/knn/index/KNNIndexCache.java @@ -16,6 +16,7 @@ package com.amazon.opendistroforelasticsearch.knn.index; import com.amazon.opendistroforelasticsearch.knn.index.v1736.KNNIndex; +import com.amazon.opendistroforelasticsearch.knn.plugin.stats.StatNames; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheStats; @@ -29,20 +30,29 @@ import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.watcher.WatcherHandle; +import java.io.Closeable; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + +import static com.amazon.opendistroforelasticsearch.knn.index.KNNSettings.getCircuitBreakerLimit; /** * KNNIndex level caching with weight based, time based evictions. This caching helps us * to manage the hnsw graphs in the memory and garbage collect them after specified timeout * or when weightCircuitBreaker is hit. */ -public class KNNIndexCache { +public class KNNIndexCache implements Closeable { + public static String GRAPH_COUNT = "graph_count"; + private static Logger logger = LogManager.getLogger(KNNIndexCache.class); private static KNNIndexCache INSTANCE; @@ -60,6 +70,10 @@ public static void setResourceWatcherService(final ResourceWatcherService resour getInstance().resourceWatcherService = resourceWatcherService; } + public void close() { + executor.shutdown(); + } + /** * Make sure we just have one instance of cache * @return KNNIndexCache instance @@ -77,7 +91,7 @@ private void initCache() { .concurrencyLevel(1) .removalListener(k -> onRemoval(k)); if(KNNSettings.state().getSettingValue(KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_ENABLED)) { - cacheBuilder.maximumWeight(KNNSettings.getCircuitBreakerLimit().getKb()).weigher((k, v) -> (int)v.getKnnIndex().getIndexSize()); + cacheBuilder.maximumWeight(getCircuitBreakerLimit().getKb()).weigher((k, v) -> (int)v.getKnnIndex().getIndexSize()); } if(KNNSettings.state().getSettingValue(KNNSettings.KNN_CACHE_ITEM_EXPIRY_ENABLED)) { @@ -113,6 +127,9 @@ private void onRemoval(RemovalNotification removalNo executor.execute(() -> knnIndexCacheEntry.getKnnIndex().close()); + String esIndexName = removalNotification.getValue().getEsIndexName(); + String indexPathUrl = removalNotification.getValue().getIndexPathUrl(); + if (RemovalCause.SIZE == removalNotification.getCause()) { KNNSettings.state().updateCircuitBreakerSettings(true); setCacheCapacityReached(true); @@ -147,6 +164,35 @@ public CacheStats getStats() { return cache.stats(); } + /** + * Get the stats of all of the Elasticsearch indices currently loaded into the cache + * + * @return Map containing all of the Elasticsearch indices in the cache and their stats + */ + public Map> getIndicesCacheStats() { + Map> statValues = new HashMap<>(); + String indexName; + for (Map.Entry index : cache.asMap().entrySet()) { + indexName = index.getValue().getEsIndexName(); + statValues.putIfAbsent(indexName, new HashMap<>()); + statValues.get(indexName).put(GRAPH_COUNT, ((Integer) statValues.get(indexName) + .getOrDefault(GRAPH_COUNT, 0)) + 1); + statValues.get(indexName).putIfAbsent(StatNames.GRAPH_MEMORY_USAGE.getName(), + getWeightInKilobytes(indexName)); + statValues.get(indexName).putIfAbsent(StatNames.GRAPH_MEMORY_USAGE_PERCENTAGE.getName(), + getWeightAsPercentage(indexName)); + } + + return statValues; + } + + protected Set getGraphNamesForIndex(String indexName) { + return cache.asMap().values().stream() + .filter(knnIndexCacheEntry -> indexName.equals(knnIndexCacheEntry.getEsIndexName())) + .map(KNNIndexCacheEntry::getIndexPathUrl) + .collect(Collectors.toSet()); + } + /** * Returns the current weight of the cache in KiloBytes * @@ -156,6 +202,36 @@ public Long getWeightInKilobytes() { return cache.asMap().values().stream().map(KNNIndexCacheEntry::getKnnIndex).mapToLong(KNNIndex::getIndexSize).sum(); } + /** + * Returns the current weight of an index in the cache in KiloBytes + * + * @param indexName Name if index to get the weight for + * @return Weight of the index in the cache in kilobytes + */ + public Long getWeightInKilobytes(final String indexName) { + return cache.asMap().values().stream() + .filter(knnIndexCacheEntry -> indexName.equals(knnIndexCacheEntry.getEsIndexName())) + .map(KNNIndexCacheEntry::getKnnIndex).mapToLong(KNNIndex::getIndexSize).sum(); + } + + /** + * Returns how full the cache is as a percentage of the total cache capacity + * + * @return Percentage of the cache full + */ + public Float getWeightAsPercentage() { + return 100 * getWeightInKilobytes() / (float) getCircuitBreakerLimit().getKb(); + } + + /** + * Returns the how much space an index is taking up in the cache is as a percentage of the total cache capacity + * + * @return Percentage of the cache full + */ + public Float getWeightAsPercentage(final String indexName) { + return 100 * getWeightInKilobytes(indexName) / (float) getCircuitBreakerLimit().getKb(); + } + /** * Returns whether or not the capacity of the cache has been reached * @@ -174,6 +250,24 @@ public void setCacheCapacityReached(Boolean value) { cacheCapacityReached.set(value); } + /** + * Evict a graph in the cache manually + * + * @param indexFilePath path to segment file. Also, key in cache + */ + public void evictGraphFromCache(String indexFilePath) { + logger.info("[KNN] " + indexFilePath + " invalidated explicitly"); + cache.invalidate(indexFilePath); + } + + /** + * Evict all graphs in the cache manually + */ + public void evictAllGraphsFromCache() { + logger.info("[KNN] All entries in cache invalidated explicitly"); + cache.invalidateAll(); + } + /** * Loads hnsw index to memory. Registers the location of the serialized graph with ResourceWatcher. * @@ -203,7 +297,7 @@ public KNNIndexCacheEntry loadIndex(String indexPathUrl, String indexName) throw // causes us to invalidate an entry before the key has been fully loaded. final WatcherHandle watcherHandle = resourceWatcherService.add(fileWatcher); - return new KNNIndexCacheEntry(knnIndex, watcherHandle); + return new KNNIndexCacheEntry(knnIndex, indexPathUrl, indexName, watcherHandle); } /** @@ -213,10 +307,15 @@ public KNNIndexCacheEntry loadIndex(String indexPathUrl, String indexName) throw */ private static class KNNIndexCacheEntry { private final KNNIndex knnIndex; + private final String indexPathUrl; + private final String esIndexName; private final WatcherHandle fileWatcherHandle; - private KNNIndexCacheEntry(final KNNIndex knnIndex, final WatcherHandle fileWatcherHandle) { + private KNNIndexCacheEntry(final KNNIndex knnIndex, final String indexPathUrl, final String esIndexName, + final WatcherHandle fileWatcherHandle) { this.knnIndex = knnIndex; + this.indexPathUrl = indexPathUrl; + this.esIndexName = esIndexName; this.fileWatcherHandle = fileWatcherHandle; } @@ -224,6 +323,14 @@ private KNNIndex getKnnIndex() { return knnIndex; } + private String getIndexPathUrl() { + return indexPathUrl; + } + + private String getEsIndexName() { + return esIndexName; + } + private WatcherHandle getFileWatcherHandle() { return fileWatcherHandle; } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/knn/plugin/stats/KNNStatsConfig.java b/src/main/java/com/amazon/opendistroforelasticsearch/knn/plugin/stats/KNNStatsConfig.java index 30c36ab9..989e7027 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/knn/plugin/stats/KNNStatsConfig.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/knn/plugin/stats/KNNStatsConfig.java @@ -41,6 +41,8 @@ public class KNNStatsConfig { new KNNInnerCacheStatsSupplier(CacheStats::evictionCount))) .put(StatNames.GRAPH_MEMORY_USAGE.getName(), new KNNStat<>(false, new KNNCacheSupplier<>(KNNIndexCache::getWeightInKilobytes))) + .put(StatNames.GRAPH_MEMORY_USAGE_PERCENTAGE.getName(), new KNNStat<>(false, + new KNNCacheSupplier<>(KNNIndexCache::getWeightAsPercentage))) .put(StatNames.CACHE_CAPACITY_REACHED.getName(), new KNNStat<>(false, new KNNCacheSupplier<>(KNNIndexCache::isCacheCapacityReached))) .put(StatNames.GRAPH_QUERY_ERRORS.getName(), new KNNStat<>(false, @@ -54,5 +56,7 @@ public class KNNStatsConfig { .put(StatNames.CIRCUIT_BREAKER_TRIGGERED.getName(), new KNNStat<>(true, new KNNCircuitBreakerSupplier())) .put(StatNames.KNN_QUERY_REQUESTS.getName(), new KNNStat<>(false, - new KNNCounterSupplier(KNNCounter.KNN_QUERY_REQUESTS))).build(); + new KNNCounterSupplier(KNNCounter.KNN_QUERY_REQUESTS))) + .put(StatNames.INDICES_IN_CACHE.getName(), new KNNStat<>(false, + new KNNCacheSupplier<>(KNNIndexCache::getIndicesCacheStats))).build(); } \ No newline at end of file diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/knn/plugin/stats/StatNames.java b/src/main/java/com/amazon/opendistroforelasticsearch/knn/plugin/stats/StatNames.java index 924207a6..73536469 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/knn/plugin/stats/StatNames.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/knn/plugin/stats/StatNames.java @@ -29,7 +29,9 @@ public enum StatNames { TOTAL_LOAD_TIME("total_load_time"), EVICTION_COUNT("eviction_count"), GRAPH_MEMORY_USAGE("graph_memory_usage"), + GRAPH_MEMORY_USAGE_PERCENTAGE("graph_memory_usage_percentage"), CACHE_CAPACITY_REACHED("cache_capacity_reached"), + INDICES_IN_CACHE("indices_in_cache"), CIRCUIT_BREAKER_TRIGGERED("circuit_breaker_triggered"), GRAPH_QUERY_ERRORS(KNNCounter.GRAPH_QUERY_ERRORS.getName()), GRAPH_QUERY_REQUESTS(KNNCounter.GRAPH_QUERY_REQUESTS.getName()), diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/knn/index/KNNIndexCacheTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/knn/index/KNNIndexCacheTests.java new file mode 100644 index 00000000..a139b0d0 --- /dev/null +++ b/src/test/java/com/amazon/opendistroforelasticsearch/knn/index/KNNIndexCacheTests.java @@ -0,0 +1,257 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.knn.index; + +import com.amazon.opendistroforelasticsearch.knn.plugin.KNNPlugin; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.test.hamcrest.ElasticsearchAssertions; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ExecutionException; + +import static com.amazon.opendistroforelasticsearch.knn.index.KNNIndexCache.GRAPH_COUNT; + +public class KNNIndexCacheTests extends ESSingleNodeTestCase { + private final String testIndexName = "test_index"; + private final String testFieldName = "test_field"; + + @Override + protected Collection> getPlugins() { + return Collections.singletonList(KNNPlugin.class); + } + + @Override + protected boolean resetNodeAfterTest() { + return true; + } + + @Override + public void tearDown() throws Exception { + KNNIndexCache.getInstance().evictAllGraphsFromCache(); + KNNIndexCache.getInstance().close(); + super.tearDown(); + } + + public void testGetIndicesCacheStats() throws IOException, InterruptedException, ExecutionException { + // Check that indiceCacheStats starts out at 0 + Map> indiceCacheStats = KNNIndexCache.getInstance().getIndicesCacheStats(); + assertEquals(0, indiceCacheStats.size()); + + // Create one KNN index + String testIndexName1 = testIndexName + "1"; + createIndex(testIndexName1, getKNNDefaultIndexSettings()); + createKnnIndexMapping(testIndexName1, testFieldName, 2); + + Long[] vector = {0L, 0L}; + addKnnDoc(testIndexName1, "1", testFieldName, vector); + float[] queryVector = {0L, 0L}; + searchKNNIndex(testIndexName1, testFieldName, queryVector, 2); + + // Confirm that 1 index is added to IndicesCacheStats and 1 graph is added + indiceCacheStats = KNNIndexCache.getInstance().getIndicesCacheStats(); + assertEquals(1L, indiceCacheStats.size()); + assertEquals(1, indiceCacheStats.get(testIndexName1).get(GRAPH_COUNT)); + + // Create second KNN index + String testIndexName2 = testIndexName + "2"; + createIndex(testIndexName2, getKNNDefaultIndexSettings()); + createKnnIndexMapping(testIndexName2, testFieldName, 2); + + for (int i = 0; i < 3; i++) { + addKnnDoc(testIndexName2, Integer.toString(i), testFieldName, vector); + Thread.sleep(200); + } + + searchKNNIndex(testIndexName2, testFieldName, queryVector, 2); + + // Confirm that 2 indices are added in getIndicesCacheStats + indiceCacheStats = KNNIndexCache.getInstance().getIndicesCacheStats(); + assertEquals(2L, indiceCacheStats.size()); + + // Evict all of the graphs from testIndexName2 out of the cache + for (String graphName : KNNIndexCache.getInstance().getGraphNamesForIndex(testIndexName2)) { + KNNIndexCache.getInstance().evictGraphFromCache(graphName); + } + + // Confirm that after all of the graphs of an index get evicted, there is only one index in the indiceCacheStats + indiceCacheStats = KNNIndexCache.getInstance().getIndicesCacheStats(); + assertEquals(1L, indiceCacheStats.size()); + } + + public void testGetWeightInKilobytes() throws IOException, InterruptedException, ExecutionException { + // Check that indiceCacheStats starts out at 0 + Map> indiceCacheStats = KNNIndexCache.getInstance().getIndicesCacheStats(); + assertEquals(0, indiceCacheStats.size()); + + // Assert total weight in cache is 0 + assertEquals((Long) 0L, KNNIndexCache.getInstance().getWeightInKilobytes()); + + // Create 1 index + String testIndexName1 = testIndexName + "1"; + createIndex(testIndexName1, getKNNDefaultIndexSettings()); + createKnnIndexMapping(testIndexName1, testFieldName, 2); + + Long[] vector = {0L, 0L}; + for (int i = 0; i < 3; i++) { + addKnnDoc(testIndexName1, Integer.toString(i), testFieldName, vector); + Thread.sleep(200); + } + + float[] queryVector = {0L, 0L}; + searchKNNIndex(testIndexName1, testFieldName, queryVector, 2); + + // Confirm that that index's weight in cache is equal to the total weight in the cache + assertTrue(KNNIndexCache.getInstance().getWeightInKilobytes() > 0L); + assertEquals(KNNIndexCache.getInstance().getWeightInKilobytes(), + KNNIndexCache.getInstance().getWeightInKilobytes(testIndexName1)); + + // Add a second index + String testIndexName2 = testIndexName + "2"; + createIndex(testIndexName2, getKNNDefaultIndexSettings()); + createKnnIndexMapping(testIndexName2, testFieldName, 2); + for (int i = 0; i < 3; i++) { + addKnnDoc(testIndexName2, Integer.toString(i), testFieldName, vector); + Thread.sleep(200); + } + searchKNNIndex(testIndexName2, testFieldName, queryVector, 2); + + // Confirm that that index's weight in cache is greater than 0 and that the weights of testIndexName1 and + // testIndexName2 add up to the total weight + assertTrue(KNNIndexCache.getInstance().getWeightInKilobytes(testIndexName2) > 0L); + assertEquals(KNNIndexCache.getInstance().getWeightInKilobytes(), + (Long)(KNNIndexCache.getInstance().getWeightInKilobytes(testIndexName1) + + KNNIndexCache.getInstance().getWeightInKilobytes(testIndexName2))); + } + + public void testGetGraphNames() throws IOException, InterruptedException, ExecutionException { + // Create an index + String testIndexName1 = testIndexName + "1"; + createIndex(testIndexName1, getKNNDefaultIndexSettings()); + createKnnIndexMapping(testIndexName1, testFieldName, 2); + + Long[] vector = {0L, 0L}; + for (int i = 0; i < 5; i++) { + addKnnDoc(testIndexName1, Integer.toString(i), testFieldName, vector); + Thread.sleep(200); + } + + float[] queryVector = {0L, 0L}; + searchKNNIndex(testIndexName1, testFieldName, queryVector, 2); + + // Make sure it returns the correct names + assertTrue(KNNIndexCache.getInstance().getGraphNamesForIndex(testIndexName1).size() > 0); + for (String graphName : KNNIndexCache.getInstance().getGraphNamesForIndex(testIndexName1)) { + assertTrue(graphName.contains("hnsw")); + } + } + + public void testEvictGraphs() throws IOException, InterruptedException, ExecutionException { + // Create an index + String testIndexName1 = testIndexName + "1"; + createIndex(testIndexName1, getKNNDefaultIndexSettings()); + createKnnIndexMapping(testIndexName1, testFieldName, 2); + + Long[] vector = {0L, 0L}; + for (int i = 0; i < 3; i++) { + addKnnDoc(testIndexName1, Integer.toString(i), testFieldName, vector); + Thread.sleep(200); + } + + float[] queryVector = {0L, 0L}; + searchKNNIndex(testIndexName1, testFieldName, queryVector, 2); + + // Confirm that there is at least 1 graph in the cache for testIndexName1 + assertTrue((int) KNNIndexCache.getInstance().getIndicesCacheStats().get(testIndexName1) + .get(GRAPH_COUNT) > 0); + + // Evict all of the graphs for one index from the graph + for (String graphName : KNNIndexCache.getInstance().getGraphNamesForIndex(testIndexName1)) { + KNNIndexCache.getInstance().evictGraphFromCache(graphName); + } + assertFalse(KNNIndexCache.getInstance().getIndicesCacheStats().containsKey(testIndexName1)); + + // add testIndexName1 back into the cache by searching + searchKNNIndex(testIndexName1, testFieldName, queryVector, 2); + + // Create a second index + String testIndexName2 = testIndexName + "2"; + createIndex(testIndexName2, getKNNDefaultIndexSettings()); + createKnnIndexMapping(testIndexName2, testFieldName, 2); + for (int i = 0; i < 3; i++) { + addKnnDoc(testIndexName2, Integer.toString(i), testFieldName, vector); + Thread.sleep(200); + } + searchKNNIndex(testIndexName2, testFieldName, queryVector, 2); + + // Confirm that the cache has at least 1 graph for each index + assertTrue((int) KNNIndexCache.getInstance().getIndicesCacheStats().get(testIndexName1) + .get(GRAPH_COUNT) > 0); + assertTrue((int) KNNIndexCache.getInstance().getIndicesCacheStats().get(testIndexName2) + .get(GRAPH_COUNT) > 0); + + // Evict all and make sure the graph is empty + KNNIndexCache.getInstance().evictAllGraphsFromCache(); + assertEquals(0, KNNIndexCache.getInstance().getIndicesCacheStats().size()); + } + + protected void createKnnIndexMapping(String indexName, String fieldName, Integer dimensions) { + PutMappingRequest request = new PutMappingRequest(indexName).type("_doc"); + request.source(fieldName, "type=knn_vector,dimension="+dimensions); + ElasticsearchAssertions.assertAcked(client().admin().indices().putMapping(request).actionGet()); + } + + protected Settings getKNNDefaultIndexSettings() { + return Settings.builder() + .put("number_of_shards", 1) + .put("number_of_replicas", 0) + .put("index.knn", true) + .build(); + } + + protected void addKnnDoc(String index, String docId, String fieldName, Object[] vector) + throws IOException, InterruptedException, ExecutionException { + XContentBuilder builder = XContentFactory.jsonBuilder().startObject() + .field(fieldName, vector) + .endObject(); + IndexRequest indexRequest = new IndexRequest() + .index(index) + .id(docId) + .source(builder) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + + IndexResponse response = client().index(indexRequest).get(); + assertEquals(response.status(), RestStatus.CREATED); + } + + protected void searchKNNIndex(String index, String fieldName, float[] vector, int k) { + SearchResponse response = client().prepareSearch(index).setQuery(new KNNQueryBuilder(fieldName, vector, k)) + .get(); + assertEquals(response.status(), RestStatus.OK); + } +}