diff --git a/src/main/java/org/opensearch/knn/index/KNNSettings.java b/src/main/java/org/opensearch/knn/index/KNNSettings.java index e0e70310f..526a529f1 100644 --- a/src/main/java/org/opensearch/knn/index/KNNSettings.java +++ b/src/main/java/org/opensearch/knn/index/KNNSettings.java @@ -20,16 +20,17 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.index.IndexModule; import org.opensearch.knn.index.memory.NativeMemoryCacheManager; +import org.opensearch.knn.index.memory.NativeMemoryCacheManagerDto; import org.opensearch.monitor.jvm.JvmInfo; import org.opensearch.monitor.os.OsProbe; import java.security.InvalidParameterException; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -41,16 +42,15 @@ /** * This class defines - * 1. KNN settings to hold the HNSW algorithm parameters. - * https://github.com/nmslib/hnswlib/blob/master/ALGO_PARAMS.md + * 1. KNN settings to hold the HNSW algorithm parameters. * 2. KNN settings to enable/disable plugin, circuit breaker settings * 3. KNN settings to manage graphs loaded in native memory */ public class KNNSettings { - private static Logger logger = LogManager.getLogger(KNNSettings.class); + private static final Logger logger = LogManager.getLogger(KNNSettings.class); private static KNNSettings INSTANCE; - private static OsProbe osProbe = OsProbe.getInstance(); + private static final OsProbe osProbe = OsProbe.getInstance(); private static final int INDEX_THREAD_QTY_MAX = 32; @@ -85,6 +85,7 @@ public class KNNSettings { public static final Integer KNN_DEFAULT_CIRCUIT_BREAKER_UNSET_PERCENTAGE = 75; public static final Integer KNN_DEFAULT_MODEL_CACHE_SIZE_LIMIT_PERCENTAGE = 10; // By default, set aside 10% of the JVM for the limit public static final Integer KNN_MAX_MODEL_CACHE_SIZE_LIMIT_PERCENTAGE = 25; // Model cache limit cannot exceed 25% of the JVM heap + public static final String KNN_DEFAULT_MEMORY_CIRCUIT_BREAKER_LIMIT = "50%"; /** * Settings Definition @@ -233,7 +234,13 @@ public class KNNSettings { put(KNN_MEMORY_CIRCUIT_BREAKER_ENABLED, Setting.boolSetting(KNN_MEMORY_CIRCUIT_BREAKER_ENABLED, true, NodeScope, Dynamic)); put( KNN_MEMORY_CIRCUIT_BREAKER_LIMIT, - knnMemoryCircuitBreakerSetting(KNN_MEMORY_CIRCUIT_BREAKER_LIMIT, "50%", NodeScope, Dynamic) + new Setting<>( + KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_LIMIT, + KNNSettings.KNN_DEFAULT_MEMORY_CIRCUIT_BREAKER_LIMIT, + (s) -> parseknnMemoryCircuitBreakerValue(s, KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_LIMIT), + NodeScope, + Dynamic + ) ); /** @@ -247,9 +254,6 @@ public class KNNSettings { } }; - /** Latest setting value for each registered key. Thread-safe is required. */ - private final Map latestSettings = new ConcurrentHashMap<>(); - private ClusterService clusterService; private Client client; @@ -262,35 +266,32 @@ public static synchronized KNNSettings state() { return INSTANCE; } - public void setSettingsUpdateConsumers() { - for (Setting setting : dynamicCacheSettings.values()) { - clusterService.getClusterSettings().addSettingsUpdateConsumer(setting, newVal -> { - logger.debug("The value of setting [{}] changed to [{}]", setting.getKey(), newVal); - latestSettings.put(setting.getKey(), newVal); - - // Rebuild the cache with updated limit - NativeMemoryCacheManager.getInstance().rebuildCache(); - }); - } + private void setSettingsUpdateConsumers() { + clusterService.getClusterSettings().addSettingsUpdateConsumer(updatedSettings -> { + // When any of the dynamic settings are updated, rebuild the cache with the updated values. Use the current + // cluster settings values as defaults. + NativeMemoryCacheManagerDto.NativeMemoryCacheManagerDtoBuilder builder = NativeMemoryCacheManagerDto.builder(); - /** - * We do not have to rebuild the cache for below settings - */ - clusterService.getClusterSettings() - .addSettingsUpdateConsumer( - KNN_CIRCUIT_BREAKER_TRIGGERED_SETTING, - newVal -> { latestSettings.put(KNN_CIRCUIT_BREAKER_TRIGGERED, newVal); } + builder.isWeightLimited( + updatedSettings.getAsBoolean(KNN_MEMORY_CIRCUIT_BREAKER_ENABLED, getSettingValue(KNN_MEMORY_CIRCUIT_BREAKER_ENABLED)) ); - clusterService.getClusterSettings() - .addSettingsUpdateConsumer( - KNN_CIRCUIT_BREAKER_UNSET_PERCENTAGE_SETTING, - newVal -> { latestSettings.put(KNN_CIRCUIT_BREAKER_UNSET_PERCENTAGE, newVal); } + + builder.maxWeight(((ByteSizeValue) getSettingValue(KNN_MEMORY_CIRCUIT_BREAKER_LIMIT)).getKb()); + if (updatedSettings.hasValue(KNN_MEMORY_CIRCUIT_BREAKER_LIMIT)) { + builder.maxWeight(((ByteSizeValue) getSetting(KNN_MEMORY_CIRCUIT_BREAKER_LIMIT).get(updatedSettings)).getKb()); + } + + builder.isExpirationLimited( + updatedSettings.getAsBoolean(KNN_CACHE_ITEM_EXPIRY_ENABLED, getSettingValue(KNN_CACHE_ITEM_EXPIRY_ENABLED)) ); - clusterService.getClusterSettings() - .addSettingsUpdateConsumer( - KNN_ALGO_PARAM_INDEX_THREAD_QTY_SETTING, - newVal -> { latestSettings.put(KNN_ALGO_PARAM_INDEX_THREAD_QTY, newVal); } + + builder.expiryTimeInMin( + updatedSettings.getAsTime(KNN_CACHE_ITEM_EXPIRY_TIME_MINUTES, getSettingValue(KNN_CACHE_ITEM_EXPIRY_TIME_MINUTES)) + .getMinutes() ); + + NativeMemoryCacheManager.getInstance().rebuildCache(builder.build()); + }, new ArrayList<>(dynamicCacheSettings.values())); } /** @@ -302,10 +303,10 @@ public void setSettingsUpdateConsumers() { */ @SuppressWarnings("unchecked") public T getSettingValue(String key) { - return (T) latestSettings.getOrDefault(key, getSetting(key).getDefault(Settings.EMPTY)); + return (T) clusterService.getClusterSettings().get(getSetting(key)); } - public Setting getSetting(String key) { + private Setting getSetting(String key) { if (dynamicCacheSettings.containsKey(key)) { return dynamicCacheSettings.get(key); } @@ -364,19 +365,6 @@ public void initialize(Client client, ClusterService clusterService) { setSettingsUpdateConsumers(); } - /** - * Creates a setting which specifies a circuit breaker memory limit. This can either be - * specified as an absolute bytes value or as a percentage. - * - * @param key the key for the setting - * @param defaultValue the default value for this setting - * @param properties properties properties for this setting like scope, filtering... - * @return the setting object - */ - public static Setting knnMemoryCircuitBreakerSetting(String key, String defaultValue, Setting.Property... properties) { - return new Setting<>(key, defaultValue, (s) -> parseknnMemoryCircuitBreakerValue(s, key), properties); - } - public static ByteSizeValue parseknnMemoryCircuitBreakerValue(String sValue, String settingName) { settingName = Objects.requireNonNull(settingName); if (sValue != null && sValue.endsWith("%")) { @@ -436,24 +424,11 @@ public void onFailure(Exception e) { * @return efSearch value */ public static int getEfSearchParam(String index) { - return getIndexSettingValue(index, KNN_ALGO_PARAM_EF_SEARCH, 512); - } - - /** - * - * @param index Name of the index - * @return spaceType name in KNN plugin - */ - public static String getSpaceType(String index) { return KNNSettings.state().clusterService.state() .getMetadata() .index(index) .getSettings() - .get(KNN_SPACE_TYPE, SpaceType.DEFAULT.getValue()); - } - - public static int getIndexSettingValue(String index, String settingName, int defaultValue) { - return KNNSettings.state().clusterService.state().getMetadata().index(index).getSettings().getAsInt(settingName, defaultValue); + .getAsInt(KNNSettings.KNN_ALGO_PARAM_EF_SEARCH, 512); } public void setClusterService(ClusterService clusterService) { @@ -475,7 +450,6 @@ public void validate(String value) { public void onIndexModule(IndexModule module) { module.addSettingsUpdateConsumer(INDEX_KNN_ALGO_PARAM_EF_SEARCH_SETTING, newVal -> { logger.debug("The value of [KNN] setting [{}] changed to [{}]", KNN_ALGO_PARAM_EF_SEARCH, newVal); - latestSettings.put(KNN_ALGO_PARAM_EF_SEARCH, newVal); // TODO: replace cache-rebuild with index reload into the cache NativeMemoryCacheManager.getInstance().rebuildCache(); }); diff --git a/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java b/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java index efdc4fd31..8b3a3bce1 100644 --- a/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java +++ b/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java @@ -40,11 +40,11 @@ public class NativeMemoryCacheManager implements Closeable { public static String GRAPH_COUNT = "graph_count"; - private static Logger logger = LogManager.getLogger(NativeMemoryCacheManager.class); + private static final Logger logger = LogManager.getLogger(NativeMemoryCacheManager.class); private static NativeMemoryCacheManager INSTANCE; private Cache cache; - private ExecutorService executor; + private final ExecutorService executor; private AtomicBoolean cacheCapacityReached; private long maxWeight; @@ -68,20 +68,31 @@ public static synchronized NativeMemoryCacheManager getInstance() { } private void initialize() { + initialize( + NativeMemoryCacheManagerDto.builder() + .isWeightLimited(KNNSettings.state().getSettingValue(KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_ENABLED)) + .maxWeight(KNNSettings.getCircuitBreakerLimit().getKb()) + .isExpirationLimited(KNNSettings.state().getSettingValue(KNNSettings.KNN_CACHE_ITEM_EXPIRY_ENABLED)) + .expiryTimeInMin( + ((TimeValue) KNNSettings.state().getSettingValue(KNNSettings.KNN_CACHE_ITEM_EXPIRY_TIME_MINUTES)).getMinutes() + ) + .build() + ); + } + + private void initialize(NativeMemoryCacheManagerDto nativeMemoryCacheDTO) { CacheBuilder cacheBuilder = CacheBuilder.newBuilder() .recordStats() .concurrencyLevel(1) .removalListener(this::onRemoval); - if (KNNSettings.state().getSettingValue(KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_ENABLED)) { - maxWeight = KNNSettings.getCircuitBreakerLimit().getKb(); - cacheBuilder.maximumWeight(maxWeight).weigher((k, v) -> v.getSizeInKB()); + if (nativeMemoryCacheDTO.isWeightLimited()) { + this.maxWeight = nativeMemoryCacheDTO.getMaxWeight(); + cacheBuilder.maximumWeight(this.maxWeight).weigher((k, v) -> v.getSizeInKB()); } - if (KNNSettings.state().getSettingValue(KNNSettings.KNN_CACHE_ITEM_EXPIRY_ENABLED)) { - long expiryTime = ((TimeValue) KNNSettings.state().getSettingValue(KNNSettings.KNN_CACHE_ITEM_EXPIRY_TIME_MINUTES)) - .getMinutes(); - cacheBuilder.expireAfterAccess(expiryTime, TimeUnit.MINUTES); + if (nativeMemoryCacheDTO.isExpirationLimited()) { + cacheBuilder.expireAfterAccess(nativeMemoryCacheDTO.getExpiryTimeInMin(), TimeUnit.MINUTES); } cacheCapacityReached = new AtomicBoolean(false); @@ -93,13 +104,32 @@ private void initialize() { * Evicts all entries from the cache and rebuilds. */ public synchronized void rebuildCache() { + rebuildCache( + NativeMemoryCacheManagerDto.builder() + .isWeightLimited(KNNSettings.state().getSettingValue(KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_ENABLED)) + .maxWeight(KNNSettings.getCircuitBreakerLimit().getKb()) + .isExpirationLimited(KNNSettings.state().getSettingValue(KNNSettings.KNN_CACHE_ITEM_EXPIRY_ENABLED)) + .expiryTimeInMin( + ((TimeValue) KNNSettings.state().getSettingValue(KNNSettings.KNN_CACHE_ITEM_EXPIRY_TIME_MINUTES)).getMinutes() + ) + .build() + ); + } + + /** + * Evict all entries from the cache and rebuilds + * + * @param nativeMemoryCacheDTO DTO for cache configuration + */ + public synchronized void rebuildCache(NativeMemoryCacheManagerDto nativeMemoryCacheDTO) { logger.info("KNN Cache rebuilding."); - // TODO: Does this really need to be executed with an executor? Also, does invalidateAll really need to be - // called? + // TODO: Does this really need to be executed with an executor? executor.execute(() -> { + // Explicitly invalidate all so that we do not have to wait for garbage collection to be invoked to + // free up native memory cache.invalidateAll(); - initialize(); + initialize(nativeMemoryCacheDTO); }); } diff --git a/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManagerDto.java b/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManagerDto.java new file mode 100644 index 000000000..e5c1484ed --- /dev/null +++ b/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManagerDto.java @@ -0,0 +1,18 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.knn.index.memory; + +import lombok.Builder; +import lombok.Value; + +@Value +@Builder +public class NativeMemoryCacheManagerDto { + boolean isWeightLimited; + long maxWeight; + boolean isExpirationLimited; + long expiryTimeInMin; +} diff --git a/src/test/java/org/opensearch/knn/KNNTestCase.java b/src/test/java/org/opensearch/knn/KNNTestCase.java index d5ac75287..f5955f959 100644 --- a/src/test/java/org/opensearch/knn/KNNTestCase.java +++ b/src/test/java/org/opensearch/knn/KNNTestCase.java @@ -5,6 +5,13 @@ package org.opensearch.knn; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.knn.index.KNNSettings; import org.opensearch.knn.index.memory.NativeMemoryCacheManager; import org.opensearch.knn.plugin.stats.KNNCounter; import org.opensearch.common.bytes.BytesReference; @@ -12,24 +19,52 @@ import org.opensearch.common.xcontent.XContentHelper; import org.opensearch.test.OpenSearchTestCase; +import java.util.HashSet; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.mockito.Mockito.when; /** * Base class for integration tests for KNN plugin. Contains several methods for testing KNN ES functionality. */ public class KNNTestCase extends OpenSearchTestCase { + + @Mock + protected ClusterService clusterService; + private AutoCloseable openMocks; + + @Override + public void setUp() throws Exception { + super.setUp(); + openMocks = MockitoAnnotations.openMocks(this); + } + @Override public void tearDown() throws Exception { super.tearDown(); resetState(); + openMocks.close(); } - public static void resetState() { + public void resetState() { // Reset all of the counters for (KNNCounter knnCounter : KNNCounter.values()) { knnCounter.set(0L); } + Set> defaultClusterSettings = new HashSet<>(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + defaultClusterSettings.addAll( + KNNSettings.state() + .getSettings() + .stream() + .filter(s -> s.getProperties().contains(Setting.Property.NodeScope)) + .collect(Collectors.toList()) + ); + when(clusterService.getClusterSettings()).thenReturn(new ClusterSettings(Settings.EMPTY, defaultClusterSettings)); + KNNSettings.state().setClusterService(clusterService); + // Clean up the cache NativeMemoryCacheManager.getInstance().invalidateAll(); NativeMemoryCacheManager.getInstance().close(); diff --git a/src/test/java/org/opensearch/knn/index/KNNSettingsTests.java b/src/test/java/org/opensearch/knn/index/KNNSettingsTests.java new file mode 100644 index 000000000..724c9ad6a --- /dev/null +++ b/src/test/java/org/opensearch/knn/index/KNNSettingsTests.java @@ -0,0 +1,96 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.knn.index; + +import lombok.SneakyThrows; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.network.NetworkModule; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.ByteSizeValue; +import org.opensearch.env.Environment; +import org.opensearch.knn.KNNTestCase; +import org.opensearch.knn.plugin.KNNPlugin; +import org.opensearch.node.MockNode; +import org.opensearch.node.Node; +import org.opensearch.plugins.Plugin; +import org.opensearch.test.InternalTestCluster; +import org.opensearch.test.MockHttpTransport; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.opensearch.test.NodeRoles.dataNode; + +public class KNNSettingsTests extends KNNTestCase { + + @SneakyThrows + public void testGetSettingValueFromConfig() { + long expectedKNNCircuitBreakerLimit = 13; + Node mockNode = createMockNode( + Map.of(KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_LIMIT, "\"" + expectedKNNCircuitBreakerLimit + "kb\"") + ); + mockNode.start(); + ClusterService clusterService = mockNode.injector().getInstance(ClusterService.class); + KNNSettings.state().setClusterService(clusterService); + long actualKNNCircuitBreakerLimit = ((ByteSizeValue) KNNSettings.state() + .getSettingValue(KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_LIMIT)).getKb(); + mockNode.close(); + assertEquals(expectedKNNCircuitBreakerLimit, actualKNNCircuitBreakerLimit); + } + + @SneakyThrows + public void testGetSettingValueDefault() { + Node mockNode = createMockNode(Collections.emptyMap()); + mockNode.start(); + ClusterService clusterService = mockNode.injector().getInstance(ClusterService.class); + KNNSettings.state().setClusterService(clusterService); + long actualKNNCircuitBreakerLimit = ((ByteSizeValue) KNNSettings.state() + .getSettingValue(KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_LIMIT)).getKb(); + mockNode.close(); + assertEquals( + ((ByteSizeValue) KNNSettings.dynamicCacheSettings.get(KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_LIMIT).getDefault(Settings.EMPTY)) + .getKb(), + actualKNNCircuitBreakerLimit + + ); + } + + private Node createMockNode(Map configSettings) throws IOException { + Path configDir = createTempDir(); + File configFile = configDir.resolve("opensearch.yml").toFile(); + FileWriter configFileWriter = new FileWriter(configFile); + + for (Map.Entry setting : configSettings.entrySet()) { + configFileWriter.write("\"" + setting.getKey() + "\": " + setting.getValue()); + } + configFileWriter.close(); + return new MockNode(baseSettings().build(), basePlugins(), configDir, true); + } + + private List> basePlugins() { + List> plugins = new ArrayList<>(); + plugins.add(getTestTransportPlugin()); + plugins.add(MockHttpTransport.TestPlugin.class); + plugins.add(KNNPlugin.class); + return plugins; + } + + private static Settings.Builder baseSettings() { + final Path tempDir = createTempDir(); + return Settings.builder() + .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), InternalTestCluster.clusterName("single-node-cluster", randomLong())) + .put(Environment.PATH_HOME_SETTING.getKey(), tempDir) + .put(NetworkModule.TRANSPORT_TYPE_KEY, getTestTransportType()) + .put(dataNode()); + } +} diff --git a/src/test/java/org/opensearch/knn/index/codec/KNNCodecTestCase.java b/src/test/java/org/opensearch/knn/index/codec/KNNCodecTestCase.java index edb3ed75d..b5ef7f180 100644 --- a/src/test/java/org/opensearch/knn/index/codec/KNNCodecTestCase.java +++ b/src/test/java/org/opensearch/knn/index/codec/KNNCodecTestCase.java @@ -13,6 +13,7 @@ import org.apache.lucene.search.Query; import org.apache.lucene.search.TopDocs; import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; import org.opensearch.index.mapper.MapperService; import org.opensearch.knn.KNNTestCase; import org.opensearch.knn.common.KNNConstants; @@ -51,8 +52,10 @@ import java.time.ZoneOffset; import java.time.ZonedDateTime; import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.function.Function; import java.util.stream.Collectors; @@ -95,6 +98,15 @@ protected void setUpMockClusterService() { ClusterService clusterService = mock(ClusterService.class, RETURNS_DEEP_STUBS); Settings settings = Settings.Builder.EMPTY_SETTINGS; when(clusterService.state().getMetadata().index(Mockito.anyString()).getSettings()).thenReturn(settings); + Set> defaultClusterSettings = new HashSet<>(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + defaultClusterSettings.addAll( + KNNSettings.state() + .getSettings() + .stream() + .filter(s -> s.getProperties().contains(Setting.Property.NodeScope)) + .collect(Collectors.toList()) + ); + when(clusterService.getClusterSettings()).thenReturn(new ClusterSettings(Settings.EMPTY, defaultClusterSettings)); KNNSettings.state().setClusterService(clusterService); }