Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add training stats and library initialized stats #191

Merged
merged 10 commits into from
Nov 9, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -118,16 +118,22 @@ public long getCacheSizeInKilobytes() {
}

/**
* Returns the current size of an index in the cache in KiloBytes.
* Returns how full the cache is as a percentage of the total cache capacity.
*
* @param indexName Name if index to get the weight for
* @return Size of the index in the cache in kilobytes
* @return Percentage of the cache full
*/
public Long getIndexSizeInKilobytes(final String indexName) {
Validate.notNull(indexName, "Index name cannot be null");
public Float getCacheSizeAsPercentage() {
return 100 * getCacheSizeInKilobytes() / (float) KNNSettings.getCircuitBreakerLimit().getKb();
}

/**
* Getter for current size of all indices in Kilobytes.
*
* @return current size of the cache
*/
public long getIndicesSizeInKilobytes() {
return cache.asMap().values().stream()
.filter(nativeMemoryAllocation -> nativeMemoryAllocation instanceof NativeMemoryAllocation.IndexAllocation)
.filter(indexAllocation -> indexName.equals(((NativeMemoryAllocation.IndexAllocation) indexAllocation).getOpenSearchIndexName()))
.mapToLong(NativeMemoryAllocation::getSizeInKB)
.sum();
}
Expand All @@ -137,8 +143,23 @@ public Long getIndexSizeInKilobytes(final String indexName) {
*
* @return Percentage of the cache full
*/
public Float getCacheSizeAsPercentage() {
return 100 * getCacheSizeInKilobytes() / (float) KNNSettings.getCircuitBreakerLimit().getKb();
public Float getIndicesSizeAsPercentage() {
return 100 * getIndicesSizeInKilobytes() / (float) KNNSettings.getCircuitBreakerLimit().getKb();
}

/**
* Returns the current size of an index in the cache in KiloBytes.
*
* @param indexName Name if index to get the weight for
* @return Size of the index in the cache in kilobytes
*/
public Long getIndexSizeInKilobytes(final String indexName) {
Validate.notNull(indexName, "Index name cannot be null");
return cache.asMap().values().stream()
.filter(nativeMemoryAllocation -> nativeMemoryAllocation instanceof NativeMemoryAllocation.IndexAllocation)
.filter(indexAllocation -> indexName.equals(((NativeMemoryAllocation.IndexAllocation) indexAllocation).getOpenSearchIndexName()))
.mapToLong(NativeMemoryAllocation::getSizeInKB)
.sum();
}

/**
Expand All @@ -152,6 +173,30 @@ public Float getIndexSizeAsPercentage(final String indexName) {
return 100 * getIndexSizeInKilobytes(indexName) / (float) KNNSettings.getCircuitBreakerLimit().getKb();
}

/**
* Getter for current size of all training jobs in Kilobytes.
*
* @return current size of the cache
*/
public long getTrainingSizeInKilobytes() {
// Currently, all allocations that are not index allocations will be for training.
return cache.asMap().values().stream()
.filter(nativeMemoryAllocation ->
nativeMemoryAllocation instanceof NativeMemoryAllocation.TrainingDataAllocation ||
nativeMemoryAllocation instanceof NativeMemoryAllocation.AnonymousAllocation)
.mapToLong(NativeMemoryAllocation::getSizeInKB)
.sum();
}

/**
* Returns how full the cache is as a percentage of the total cache capacity.
*
* @return Percentage of the cache full
*/
public Float getTrainingSizeAsPercentage() {
return 100 * getTrainingSizeInKilobytes() / (float) KNNSettings.getCircuitBreakerLimit().getKb();
jmazanec15 marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Getter for maximum weight of the cache.
*
Expand Down Expand Up @@ -245,9 +290,9 @@ public void setCacheCapacityReached(Boolean value) {
}

/**
* Get the stats of all of the Elasticsearch indices currently loaded into the cache
* Get the stats of all of the OpenSearch indices currently loaded into the cache
*
* @return Map containing all of the Elasticsearch indices in the cache and their stats
* @return Map containing all of the OpenSearch indices in the cache and their stats
*/
public Map<String, Map<String, Object>> getIndicesCacheStats() {
Map<String, Map<String, Object>> statValues = new HashMap<>();
Expand Down
13 changes: 13 additions & 0 deletions src/main/java/org/opensearch/knn/index/util/KNNEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.knn.index.SpaceType;

import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.opensearch.knn.common.KNNConstants.FAISS_NAME;
import static org.opensearch.knn.common.KNNConstants.NMSLIB_NAME;
Expand All @@ -41,9 +42,11 @@ public enum KNNEngine implements KNNLibrary {
KNNEngine(String name, KNNLibrary knnLibrary) {
this.name = name;
this.knnLibrary = knnLibrary;
this.initialized = new AtomicBoolean(false);
}

private String name;
private AtomicBoolean initialized;
private KNNLibrary knnLibrary;

/**
Expand Down Expand Up @@ -143,4 +146,14 @@ public Map<String, Object> getMethodAsMap(KNNMethodContext knnMethodContext) {
public int estimateOverheadInKB(KNNMethodContext knnMethodContext, int dimension) {
return knnLibrary.estimateOverheadInKB(knnMethodContext, dimension);
}

@Override
public Boolean isInitialized() {
return knnLibrary.isInitialized();
}

@Override
public void setInitialized(Boolean isInitialized) {
knnLibrary.setInitialized(isInitialized);
}
}
27 changes: 27 additions & 0 deletions src/main/java/org/opensearch/knn/index/util/KNNLibrary.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

import static org.opensearch.knn.common.KNNConstants.BYTES_PER_KILOBYTES;
Expand Down Expand Up @@ -138,6 +139,20 @@ public interface KNNLibrary {
*/
Map<String, Object> getMethodAsMap(KNNMethodContext knnMethodContext);

/**
* Getter for initialized
*
* @return whether library has been initialized
*/
Boolean isInitialized();
VijayanB marked this conversation as resolved.
Show resolved Hide resolved

/**
* Set initialized to true
*
* @param isInitialized whether library has been initialized
*/
void setInitialized(Boolean isInitialized);

/**
* Abstract implementation of KNNLibrary. It contains several default methods and fields that
* are common across different underlying libraries.
Expand All @@ -148,6 +163,7 @@ abstract class NativeLibrary implements KNNLibrary {
private String latestLibraryBuildVersion;
private String latestLibraryVersion;
private String extension;
private AtomicBoolean initialized;

/**
* Constructor for NativeLibrary
Expand All @@ -166,6 +182,7 @@ public NativeLibrary(Map<String, KNNMethod> methods, Map<SpaceType, Function<Flo
this.latestLibraryBuildVersion = latestLibraryBuildVersion;
this.latestLibraryVersion = latestLibraryVersion;
this.extension = extension;
this.initialized = new AtomicBoolean(false);
}

@Override
Expand Down Expand Up @@ -235,6 +252,16 @@ public Map<String, Object> getMethodAsMap(KNNMethodContext knnMethodContext) {

return knnMethod.getAsMap(knnMethodContext);
}

@Override
public Boolean isInitialized() {
return initialized.get();
}

@Override
public void setInitialized(Boolean isInitialized) {
initialized.set(isInitialized);
}
}

/**
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/org/opensearch/knn/jni/FaissService.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import org.opensearch.knn.common.KNNConstants;
import org.opensearch.knn.index.KNNQueryResult;
import org.opensearch.knn.index.util.KNNEngine;

import java.security.AccessController;
import java.security.PrivilegedAction;
Expand All @@ -34,6 +35,7 @@ class FaissService {
initLibrary();
return null;
});
KNNEngine.FAISS.setInitialized(true);
jmazanec15 marked this conversation as resolved.
Show resolved Hide resolved
}

/**
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/org/opensearch/knn/jni/NmslibService.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import org.opensearch.knn.common.KNNConstants;
import org.opensearch.knn.index.KNNQueryResult;
import org.opensearch.knn.index.util.KNNEngine;

import java.security.AccessController;
import java.security.PrivilegedAction;
Expand All @@ -34,6 +35,7 @@ class NmslibService {
initLibrary();
return null;
});
KNNEngine.NMSLIB.setInitialized(true);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ public enum KNNCounter {
SCRIPT_COMPILATIONS("script_compilations"),
SCRIPT_COMPILATION_ERRORS("script_compilation_errors"),
SCRIPT_QUERY_REQUESTS("script_query_requests"),
SCRIPT_QUERY_ERRORS("script_query_errors");
SCRIPT_QUERY_ERRORS("script_query_errors"),
TRAINING_REQUESTS("training_requests"),
TRAINING_ERRORS("training_errors");

private String name;
private AtomicLong count;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@
import com.google.common.collect.ImmutableMap;
import org.opensearch.knn.common.KNNConstants;
import org.opensearch.knn.index.memory.NativeMemoryCacheManager;
import org.opensearch.knn.index.util.KNNEngine;
import org.opensearch.knn.indices.ModelCache;
import org.opensearch.knn.indices.ModelDao;
import org.opensearch.knn.plugin.stats.suppliers.LibraryInitializedSupplier;
import org.opensearch.knn.plugin.stats.suppliers.EventOccurredWithinThresholdSupplier;
import org.opensearch.knn.plugin.stats.suppliers.KNNCircuitBreakerSupplier;
import org.opensearch.knn.plugin.stats.suppliers.KNNCounterSupplier;
Expand All @@ -57,9 +59,9 @@ public class KNNStatsConfig {
.put(StatNames.EVICTION_COUNT.getName(), new KNNStat<>(false,
new KNNInnerCacheStatsSupplier(CacheStats::evictionCount)))
.put(StatNames.GRAPH_MEMORY_USAGE.getName(), new KNNStat<>(false,
new NativeMemoryCacheManagerSupplier<>(NativeMemoryCacheManager::getCacheSizeInKilobytes)))
new NativeMemoryCacheManagerSupplier<>(NativeMemoryCacheManager::getIndicesSizeInKilobytes)))
.put(StatNames.GRAPH_MEMORY_USAGE_PERCENTAGE.getName(), new KNNStat<>(false,
new NativeMemoryCacheManagerSupplier<>(NativeMemoryCacheManager::getCacheSizeAsPercentage)))
new NativeMemoryCacheManagerSupplier<>(NativeMemoryCacheManager::getIndicesSizeAsPercentage)))
.put(StatNames.INDICES_IN_CACHE.getName(), new KNNStat<>(false,
new NativeMemoryCacheManagerSupplier<>(NativeMemoryCacheManager::getIndicesCacheStats)))
.put(StatNames.CACHE_CAPACITY_REACHED.getName(), new KNNStat<>(false,
Expand Down Expand Up @@ -91,5 +93,17 @@ public class KNNStatsConfig {
new ModelIndexingDegradingSupplier(ModelCache::getEvictedDueToSizeAt),
KNNConstants.MODEL_CACHE_CAPACITY_ATROPHY_THRESHOLD_IN_MINUTES,
ChronoUnit.MINUTES)))
.put(StatNames.FAISS_LOADED.getName(), new KNNStat<>(false,
new LibraryInitializedSupplier(KNNEngine.FAISS)))
.put(StatNames.NMSLIB_LOADED.getName(), new KNNStat<>(false,
new LibraryInitializedSupplier(KNNEngine.NMSLIB)))
.put(StatNames.TRAINING_REQUESTS.getName(), new KNNStat<>(false,
new KNNCounterSupplier(KNNCounter.TRAINING_REQUESTS)))
.put(StatNames.TRAINING_ERRORS.getName(), new KNNStat<>(false,
new KNNCounterSupplier(KNNCounter.TRAINING_ERRORS)))
.put(StatNames.TRAINING_MEMORY_USAGE.getName(), new KNNStat<>(false,
new NativeMemoryCacheManagerSupplier<>(NativeMemoryCacheManager::getTrainingSizeInKilobytes)))
.put(StatNames.TRAINING_MEMORY_USAGE_PERCENTAGE.getName(), new KNNStat<>(false,
new NativeMemoryCacheManagerSupplier<>(NativeMemoryCacheManager::getTrainingSizeAsPercentage)))
.build();
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public enum StatNames {
INDICES_IN_CACHE("indices_in_cache"),
CIRCUIT_BREAKER_TRIGGERED("circuit_breaker_triggered"),
MODEL_INDEX_STATUS("model_index_status"),
FAISS_LOADED("faiss_initialized"),
NMSLIB_LOADED("nmslib_initialized"),
INDEXING_FROM_MODEL_DEGRADED("indexing_from_model_degraded"),
GRAPH_QUERY_ERRORS(KNNCounter.GRAPH_QUERY_ERRORS.getName()),
GRAPH_QUERY_REQUESTS(KNNCounter.GRAPH_QUERY_REQUESTS.getName()),
Expand All @@ -53,6 +55,10 @@ public enum StatNames {
SCRIPT_COMPILATIONS(KNNCounter.SCRIPT_COMPILATIONS.getName()),
SCRIPT_COMPILATION_ERRORS(KNNCounter.SCRIPT_COMPILATION_ERRORS.getName()),
SCRIPT_QUERY_REQUESTS(KNNCounter.SCRIPT_QUERY_REQUESTS.getName()),
TRAINING_REQUESTS(KNNCounter.TRAINING_REQUESTS.getName()),
TRAINING_ERRORS(KNNCounter.TRAINING_ERRORS.getName()),
TRAINING_MEMORY_USAGE("training_memory_usage"),
TRAINING_MEMORY_USAGE_PERCENTAGE("training_memory_usage_percentage"),
SCRIPT_QUERY_ERRORS(KNNCounter.SCRIPT_QUERY_ERRORS.getName());

private String name;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.knn.plugin.stats.suppliers;

import org.opensearch.knn.index.util.KNNLibrary;

import java.util.function.Supplier;

/**
* Supplier to determine whether library has been initialized
*/
public class LibraryInitializedSupplier implements Supplier<Boolean> {
private KNNLibrary knnLibrary;

/**
* Constructor
*
* @param knnLibrary to check if initialized
*/
public LibraryInitializedSupplier(KNNLibrary knnLibrary) {
this.knnLibrary = knnLibrary;
}

@Override
public Boolean get() {
return knnLibrary.isInitialized();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@
import org.opensearch.common.ValidationException;
import org.opensearch.common.collect.ImmutableOpenMap;
import org.opensearch.common.inject.Inject;
import org.opensearch.knn.plugin.stats.KNNCounter;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportRequestOptions;
import org.opensearch.transport.TransportService;

import javax.swing.*;
import java.util.concurrent.RejectedExecutionException;

import static org.opensearch.knn.common.KNNConstants.BYTES_PER_KILOBYTES;
Expand Down Expand Up @@ -58,10 +60,15 @@ protected void doExecute(Task task, TrainingModelRequest request,
// Get the size of the training request and then route the request. We get/set this here, as opposed to in
// TrainingModelTransportAction, because in the future, we may want to use size to factor into our routing
// decision.
KNNCounter.TRAINING_REQUESTS.increment();
ActionListener<TrainingModelResponse> wrappedListener = ActionListener.wrap(listener::onResponse, ex -> {
KNNCounter.TRAINING_ERRORS.increment();
listener.onFailure(ex);
});
getTrainingIndexSizeInKB(request, ActionListener.wrap(size -> {
request.setTrainingDataSizeInKB(size);
routeRequest(request, listener);
}, listener::onFailure));
routeRequest(request, wrappedListener);
}, wrappedListener::onFailure));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
}, wrappedListener::onFailure));
}, ex-> {
KNNCounter.TRAINING_ERRORS.increment();
listener.onFailure(ex);
}));

Copy link
Member Author

@jmazanec15 jmazanec15 Nov 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In its new location, I think it makes sense to be a variable

}

protected void routeRequest(TrainingModelRequest request, ActionListener<TrainingModelResponse> listener) {
Expand Down
Loading