This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Added index cache information in stats (#97)
jmazanec15 authored May 1, 2020
1 parent b1bd276 commit 58e2c22
Showing 5 changed files with 399 additions and 15 deletions.
34 changes: 24 additions & 10 deletions README.md
@@ -28,7 +28,7 @@ To build the JNI Library used to incorporate NMSLIB functionality, follow these
cd jni
cmake .
make
```

The library will be placed in the `buildSrc` directory.

@@ -65,7 +65,7 @@ The Elasticsearch server JVM will connect to a debugger attached to `localhost:5
To debug code running in an integration test (which exercises the server from a separate JVM), first set up a remote debugger listening on port `8000`, and then run:

```
./gradlew :integTest -Dtest.debug=1
```

The test runner JVM will connect to a debugger attached to `localhost:8000` before running the tests.
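
If you do not already have a remote debugger configured in an IDE, one way to listen on that port is the JDK's command-line debugger shown below; this is only an illustration, and any debugger that can listen for an incoming connection on port `8000` will work.

```
jdb -listen 8000
```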
@@ -94,15 +94,15 @@ PUT /myindex
"my_vector1": {
"type": "knn_vector",
"dimension": 2
},
},
"my_vector2": {
"type": "knn_vector",
"dimension": 4
},
},
"my_vector3": {
"type": "knn_vector",
"dimension": 8
}
}
}
}
}
@@ -231,7 +231,7 @@ This setting indicates whether or not the KNN Plugin is enabled. If it is disabl
This setting specifies how many threads the NMS library should use to create the graph in memory. By default, the NMS library sets this value to the number of cores on the machine. However, because Elasticsearch can spawn the same number of threads for searching, this could lead to (number of cores)^2 threads running and 100% CPU utilization. The default value is *1*.

#### Cache
The KNN Plugin uses a Guava cache to keep track of the graphs currently loaded into native memory. When a query is run against a graph for the first time, the graph is loaded into native memory (outside the Java heap). Because Elasticsearch runs inside the JVM, it cannot manage native memory directly. Instead, it tracks native memory by adding an entry to a Guava cache that contains the pointer to the graph in native memory and how much memory the graph uses. The cache's weight is the total native memory consumed by all of its entries. If loading a graph would exceed the cache's maximum weight (set by *knn.memory.circuit_breaker.limit*), the cache evicts an entry to make room for the new one. Additionally, the cache can evict entries based on how long it has been since they were last accessed.
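
The sketch below illustrates this weight-based bookkeeping with a small, self-contained Guava cache. It is only an illustration: the `GraphEntry` class, the 1024 KB limit, and the file paths are hypothetical stand-ins, and it is not the plugin's implementation (the real initialization is in `KNNIndexCache.initCache`).

```
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.Weigher;

import java.util.concurrent.TimeUnit;

public class GraphCacheSketch {
    // Hypothetical cache value: a pointer to a graph in native memory plus its size in KB.
    static class GraphEntry {
        final long nativePointer;
        final long sizeKb;

        GraphEntry(long nativePointer, long sizeKb) {
            this.nativePointer = nativePointer;
            this.sizeKb = sizeKb;
        }
    }

    public static void main(String[] args) {
        long maxWeightKb = 1024; // stand-in for knn.memory.circuit_breaker.limit

        Cache<String, GraphEntry> cache = CacheBuilder.newBuilder()
                // An entry's weight is the native memory its graph consumes, in KB
                .maximumWeight(maxWeightKb)
                .weigher((Weigher<String, GraphEntry>) (path, entry) -> (int) entry.sizeKb)
                // Optional time-based eviction, analogous to the knn.cache.item.expiry settings
                .expireAfterAccess(3, TimeUnit.HOURS)
                .removalListener((RemovalListener<String, GraphEntry>) notification ->
                        System.out.println("Evicted " + notification.getKey()
                                + " (" + notification.getCause() + ")"))
                .build();

        // Each loaded graph adds its size to the cache's total weight; once the total
        // would exceed maxWeightKb, Guava evicts entries to make room.
        cache.put("/segments/graph_0.hnsw", new GraphEntry(0x1000L, 512));
        cache.put("/segments/graph_1.hnsw", new GraphEntry(0x2000L, 600)); // pushes weight past the limit
    }
}
```

In the real cache, an eviction caused by exceeding the maximum weight (`RemovalCause.SIZE`) also flips the circuit breaker state, as `KNNIndexCache.onRemoval` does in the code further down this diff.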

##### knn.cache.item.expiry.enabled
This setting indicates whether the cache should evict entries that have not been accessed for *knn.cache.item.expiry.minutes*. The default value is *false*.
@@ -243,7 +243,7 @@ This setting indicates how long an item can be in the cache without being access
For KNN, the circuit breaker is used to indicate when performance may degrade because the graphs loaded into native memory are reaching the cluster’s total limits. Currently, the system does not perform any action once this limit is reached.
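
To check whether the breaker has fired, read the *circuit_breaker_triggered* stat from the stats API. The plain stats request is the one documented in the Examples section below; the per-stat path filter in the second request is an assumption about the API's conventions and may not be available in every version.

```
GET /_opendistro/_knn/stats?pretty
GET /_opendistro/_knn/stats/circuit_breaker_triggered?pretty
```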

##### knn.memory.circuit_breaker.enabled
This setting enables or disables the circuit breaker feature. Disabling it puts you at risk of running out of memory, because there is no longer any control over the amount of native memory the graphs consume. The default value is *true*.

##### knn.memory.circuit_breaker.limit
This setting indicates the maximum capacity of the cache. When the cache attempts to load a graph that would exceed this limit, it is forced to evict an entry and *knn.circuit_breaker.triggered* is set to *true*. The default value for this setting is *60%* of the machine's total memory outside the Elasticsearch JVM. However, a value in *KB* can be given as well.
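
Assuming this limit is exposed as a dynamic cluster setting (as the other *knn.* settings in this section appear to be), it could be updated at runtime through the cluster settings API; the *50%* value below is only an example:

```
PUT /_cluster/settings
{
  "persistent" : {
    "knn.memory.circuit_breaker.limit" : "50%"
  }
}
```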
@@ -288,28 +288,31 @@ Indicates whether the circuit breaker is triggered.
The number of evictions that have occurred in the Guava cache. *Note:* explicit evictions that occur because of index deletion are not counted.

#### hit_count
The number of cache hits that have occurred on the node. A cache hit occurs when a user queries a graph and it is already loaded into memory.

#### miss_count
The number of cache misses that have occurred on the node. A cache miss occurs when a user queries a graph and it has not yet been loaded into memory.

#### graph_memory_usage
The current weight of the cache (the total size in native memory of all of the graphs) in kilobytes.

#### graph_memory_usage_percentage
The current weight of the cache as a percentage of the maximum cache capacity.

#### graph_index_requests
The number of requests to add the knn_vector field of a document into a graph.

#### graph_index_errors
The number of requests to add the knn_vector field of a document into a graph that have produced an error.

#### graph_query_requests
The number of graph queries that have been made.

#### graph_query_errors
The number of graph queries that have produced an error.

#### knn_query_requests
The number of KNN query requests received.

#### cache_capacity_reached
Whether the cache capacity for this node has been reached. This capacity can be controlled with the *knn.memory.circuit_breaker.limit* setting.
@@ -323,6 +326,9 @@ The number of times an item is successfully loaded into the cache.
#### total_load_time
The total time in nanoseconds it has taken to load items into cache (cumulative).

#### indices_in_cache
For each index that has graphs in the cache, this stat provides the number of graphs the index has and the total graph_memory_usage, in kilobytes, that the index is using.

#### Examples
```
@@ -340,11 +346,19 @@ GET /_opendistro/_knn/stats?pretty
"eviction_count" : 0,
"miss_count" : 1,
"graph_memory_usage" : 1,
"graph_memory_usage_percentage" : 3.68,
"graph_index_requests" : 7,
"graph_index_errors" : 1,
"knn_query_requests" : 4,
"graph_query_requests" : 30,
"graph_query_errors" : 15,
"indices_in_cache" : {
"myindex" : {
"graph_memory_usage" : 2,
"graph_memory_usage_percentage" : 3.68,
"graph_count" : 2
}
},
"cache_capacity_reached" : false,
"load_exception_count" : 0,
"hit_count" : 0,
KNNIndexCache.java
@@ -16,6 +16,7 @@
package com.amazon.opendistroforelasticsearch.knn.index;

import com.amazon.opendistroforelasticsearch.knn.index.v1736.KNNIndex;
import com.amazon.opendistroforelasticsearch.knn.plugin.stats.StatNames;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheStats;
@@ -29,20 +30,29 @@
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.watcher.WatcherHandle;

import java.io.Closeable;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static com.amazon.opendistroforelasticsearch.knn.index.KNNSettings.getCircuitBreakerLimit;

/**
 * KNNIndex level caching with weight based, time based evictions. This caching helps us
 * to manage the hnsw graphs in the memory and garbage collect them after specified timeout
 * or when weightCircuitBreaker is hit.
 */
public class KNNIndexCache {
public class KNNIndexCache implements Closeable {
    public static String GRAPH_COUNT = "graph_count";

    private static Logger logger = LogManager.getLogger(KNNIndexCache.class);

    private static KNNIndexCache INSTANCE;
@@ -60,6 +70,10 @@ public static void setResourceWatcherService(final ResourceWatcherService resour
        getInstance().resourceWatcherService = resourceWatcherService;
    }

    public void close() {
        executor.shutdown();
    }

    /**
     * Make sure we just have one instance of cache
     * @return KNNIndexCache instance
@@ -77,7 +91,7 @@ private void initCache() {
                .concurrencyLevel(1)
                .removalListener(k -> onRemoval(k));
        if(KNNSettings.state().getSettingValue(KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_ENABLED)) {
            cacheBuilder.maximumWeight(KNNSettings.getCircuitBreakerLimit().getKb()).weigher((k, v) -> (int)v.getKnnIndex().getIndexSize());
            cacheBuilder.maximumWeight(getCircuitBreakerLimit().getKb()).weigher((k, v) -> (int)v.getKnnIndex().getIndexSize());
        }

        if(KNNSettings.state().getSettingValue(KNNSettings.KNN_CACHE_ITEM_EXPIRY_ENABLED)) {
@@ -113,6 +127,9 @@ private void onRemoval(RemovalNotification<String, KNNIndexCacheEntry> removalNo

        executor.execute(() -> knnIndexCacheEntry.getKnnIndex().close());

        String esIndexName = removalNotification.getValue().getEsIndexName();
        String indexPathUrl = removalNotification.getValue().getIndexPathUrl();

        if (RemovalCause.SIZE == removalNotification.getCause()) {
            KNNSettings.state().updateCircuitBreakerSettings(true);
            setCacheCapacityReached(true);
@@ -147,6 +164,35 @@ public CacheStats getStats() {
        return cache.stats();
    }

    /**
     * Get the stats of all of the Elasticsearch indices currently loaded into the cache
     *
     * @return Map containing all of the Elasticsearch indices in the cache and their stats
     */
    public Map<String, Map<String, Object>> getIndicesCacheStats() {
        Map<String, Map<String, Object>> statValues = new HashMap<>();
        String indexName;
        for (Map.Entry<String, KNNIndexCacheEntry> index : cache.asMap().entrySet()) {
            indexName = index.getValue().getEsIndexName();
            statValues.putIfAbsent(indexName, new HashMap<>());
            // Count one graph per cache entry; record the index's memory usage and percentage once
            statValues.get(indexName).put(GRAPH_COUNT, ((Integer) statValues.get(indexName)
                    .getOrDefault(GRAPH_COUNT, 0)) + 1);
            statValues.get(indexName).putIfAbsent(StatNames.GRAPH_MEMORY_USAGE.getName(),
                    getWeightInKilobytes(indexName));
            statValues.get(indexName).putIfAbsent(StatNames.GRAPH_MEMORY_USAGE_PERCENTAGE.getName(),
                    getWeightAsPercentage(indexName));
        }

        return statValues;
    }

    /**
     * Get the graph file paths currently loaded in the cache for the given Elasticsearch index
     *
     * @param indexName name of the Elasticsearch index
     * @return set of cache keys (graph file paths) for that index
     */
    protected Set<String> getGraphNamesForIndex(String indexName) {
        return cache.asMap().values().stream()
                .filter(knnIndexCacheEntry -> indexName.equals(knnIndexCacheEntry.getEsIndexName()))
                .map(KNNIndexCacheEntry::getIndexPathUrl)
                .collect(Collectors.toSet());
    }

    /**
     * Returns the current weight of the cache in kilobytes
     *
@@ -156,6 +202,36 @@ public Long getWeightInKilobytes() {
        return cache.asMap().values().stream().map(KNNIndexCacheEntry::getKnnIndex).mapToLong(KNNIndex::getIndexSize).sum();
    }

    /**
     * Returns the current weight of an index in the cache in kilobytes
     *
     * @param indexName Name of the index to get the weight for
     * @return Weight of the index in the cache in kilobytes
     */
    public Long getWeightInKilobytes(final String indexName) {
        return cache.asMap().values().stream()
                .filter(knnIndexCacheEntry -> indexName.equals(knnIndexCacheEntry.getEsIndexName()))
                .map(KNNIndexCacheEntry::getKnnIndex).mapToLong(KNNIndex::getIndexSize).sum();
    }

    /**
     * Returns how full the cache is as a percentage of the total cache capacity
     *
     * @return Percentage of the cache that is full
     */
    public Float getWeightAsPercentage() {
        return 100 * getWeightInKilobytes() / (float) getCircuitBreakerLimit().getKb();
    }

    /**
     * Returns how much space an index takes up in the cache as a percentage of the total cache capacity
     *
     * @param indexName Name of the index to get the weight for
     * @return Percentage of the cache capacity used by the index
     */
    public Float getWeightAsPercentage(final String indexName) {
        return 100 * getWeightInKilobytes(indexName) / (float) getCircuitBreakerLimit().getKb();
    }

    /**
     * Returns whether or not the capacity of the cache has been reached
     *
@@ -174,6 +250,24 @@ public void setCacheCapacityReached(Boolean value) {
        cacheCapacityReached.set(value);
    }

    /**
     * Evict a graph in the cache manually
     *
     * @param indexFilePath path to segment file. Also, key in cache
     */
    public void evictGraphFromCache(String indexFilePath) {
        logger.info("[KNN] " + indexFilePath + " invalidated explicitly");
        cache.invalidate(indexFilePath);
    }

    /**
     * Evict all graphs in the cache manually
     */
    public void evictAllGraphsFromCache() {
        logger.info("[KNN] All entries in cache invalidated explicitly");
        cache.invalidateAll();
    }

    /**
     * Loads hnsw index to memory. Registers the location of the serialized graph with ResourceWatcher.
     *
@@ -203,7 +297,7 @@ public KNNIndexCacheEntry loadIndex(String indexPathUrl, String indexName) throw
        // causes us to invalidate an entry before the key has been fully loaded.
        final WatcherHandle<FileWatcher> watcherHandle = resourceWatcherService.add(fileWatcher);

        return new KNNIndexCacheEntry(knnIndex, watcherHandle);
        return new KNNIndexCacheEntry(knnIndex, indexPathUrl, indexName, watcherHandle);
    }

    /**
@@ -213,17 +307,30 @@ public KNNIndexCacheEntry loadIndex(String indexPathUrl, String indexName) throw
     */
    private static class KNNIndexCacheEntry {
        private final KNNIndex knnIndex;
        private final String indexPathUrl;
        private final String esIndexName;
        private final WatcherHandle<FileWatcher> fileWatcherHandle;

        private KNNIndexCacheEntry(final KNNIndex knnIndex, final WatcherHandle<FileWatcher> fileWatcherHandle) {
        private KNNIndexCacheEntry(final KNNIndex knnIndex, final String indexPathUrl, final String esIndexName,
                                   final WatcherHandle<FileWatcher> fileWatcherHandle) {
            this.knnIndex = knnIndex;
            this.indexPathUrl = indexPathUrl;
            this.esIndexName = esIndexName;
            this.fileWatcherHandle = fileWatcherHandle;
        }

        private KNNIndex getKnnIndex() {
            return knnIndex;
        }

        private String getIndexPathUrl() {
            return indexPathUrl;
        }

        private String getEsIndexName() {
            return esIndexName;
        }

        private WatcherHandle<FileWatcher> getFileWatcherHandle() {
            return fileWatcherHandle;
        }
KNNStatsConfig.java
@@ -41,6 +41,8 @@ public class KNNStatsConfig {
                    new KNNInnerCacheStatsSupplier(CacheStats::evictionCount)))
            .put(StatNames.GRAPH_MEMORY_USAGE.getName(), new KNNStat<>(false,
                    new KNNCacheSupplier<>(KNNIndexCache::getWeightInKilobytes)))
            .put(StatNames.GRAPH_MEMORY_USAGE_PERCENTAGE.getName(), new KNNStat<>(false,
                    new KNNCacheSupplier<>(KNNIndexCache::getWeightAsPercentage)))
            .put(StatNames.CACHE_CAPACITY_REACHED.getName(), new KNNStat<>(false,
                    new KNNCacheSupplier<>(KNNIndexCache::isCacheCapacityReached)))
            .put(StatNames.GRAPH_QUERY_ERRORS.getName(), new KNNStat<>(false,
@@ -54,5 +56,7 @@ public class KNNStatsConfig {
            .put(StatNames.CIRCUIT_BREAKER_TRIGGERED.getName(), new KNNStat<>(true,
                    new KNNCircuitBreakerSupplier()))
            .put(StatNames.KNN_QUERY_REQUESTS.getName(), new KNNStat<>(false,
                    new KNNCounterSupplier(KNNCounter.KNN_QUERY_REQUESTS))).build();
                    new KNNCounterSupplier(KNNCounter.KNN_QUERY_REQUESTS)))
            .put(StatNames.INDICES_IN_CACHE.getName(), new KNNStat<>(false,
                    new KNNCacheSupplier<>(KNNIndexCache::getIndicesCacheStats))).build();
}
StatNames.java
@@ -29,7 +29,9 @@ public enum StatNames {
    TOTAL_LOAD_TIME("total_load_time"),
    EVICTION_COUNT("eviction_count"),
    GRAPH_MEMORY_USAGE("graph_memory_usage"),
    GRAPH_MEMORY_USAGE_PERCENTAGE("graph_memory_usage_percentage"),
    CACHE_CAPACITY_REACHED("cache_capacity_reached"),
    INDICES_IN_CACHE("indices_in_cache"),
    CIRCUIT_BREAKER_TRIGGERED("circuit_breaker_triggered"),
    GRAPH_QUERY_ERRORS(KNNCounter.GRAPH_QUERY_ERRORS.getName()),
    GRAPH_QUERY_REQUESTS(KNNCounter.GRAPH_QUERY_REQUESTS.getName()),
