From cdcaa2d93697d2cf05339780d41f293675aee86b Mon Sep 17 00:00:00 2001
From: Victoria Xia
Date: Thu, 5 Sep 2019 16:56:02 -0400
Subject: [PATCH] feat: add KsqlRocksDBConfigSetter to bound memory and set num
threads (#3167)
---
.../ksql/rest/server/KsqlRestApplication.java | 22 ++-
.../rest/util/RocksDBConfigSetterHandler.java | 46 +++++
.../rest/server/KsqlRestApplicationTest.java | 15 +-
.../util/RocksDBConfigSetterHandlerTest.java | 134 +++++++++++++
ksql-rocksdb-config-setter/pom.xml | 76 +++++++
.../KsqlBoundedMemoryRocksDBConfig.java | 69 +++++++
.../KsqlBoundedMemoryRocksDBConfigSetter.java | 118 +++++++++++
...lBoundedMemoryRocksDBConfigSetterTest.java | 187 ++++++++++++++++++
.../KsqlBoundedMemoryRocksDBConfigTest.java | 108 ++++++++++
pom.xml | 1 +
10 files changed, 770 insertions(+), 6 deletions(-)
create mode 100644 ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/RocksDBConfigSetterHandler.java
create mode 100644 ksql-rest-app/src/test/java/io/confluent/ksql/rest/util/RocksDBConfigSetterHandlerTest.java
create mode 100644 ksql-rocksdb-config-setter/pom.xml
create mode 100644 ksql-rocksdb-config-setter/src/main/java/io/confluent/ksql/rocksdb/KsqlBoundedMemoryRocksDBConfig.java
create mode 100644 ksql-rocksdb-config-setter/src/main/java/io/confluent/ksql/rocksdb/KsqlBoundedMemoryRocksDBConfigSetter.java
create mode 100644 ksql-rocksdb-config-setter/src/test/java/io/confluent/ksql/rocksdb/KsqlBoundedMemoryRocksDBConfigSetterTest.java
create mode 100644 ksql-rocksdb-config-setter/src/test/java/io/confluent/ksql/rocksdb/KsqlBoundedMemoryRocksDBConfigTest.java
diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java
index 6c80edd09134..f5107543faa7 100644
--- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java
+++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java
@@ -66,6 +66,7 @@
import io.confluent.ksql.rest.util.ClusterTerminator;
import io.confluent.ksql.rest.util.KsqlInternalTopicUtils;
import io.confluent.ksql.rest.util.ProcessingLogServerUtils;
+import io.confluent.ksql.rest.util.RocksDBConfigSetterHandler;
import io.confluent.ksql.services.DefaultServiceContext;
import io.confluent.ksql.services.LazyServiceContext;
import io.confluent.ksql.services.ServiceContext;
@@ -97,6 +98,7 @@
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.Executors;
+import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
@@ -137,11 +139,13 @@ public final class KsqlRestApplication extends Application imple
private final ServerState serverState;
private final ProcessingLogContext processingLogContext;
private final List preconditions;
+ private final Consumer rocksDBConfigSetterHandler;
public static String getCommandsStreamName() {
return COMMANDS_STREAM_NAME;
}
+ @VisibleForTesting
// CHECKSTYLE_RULES.OFF: ParameterNumberCheck
KsqlRestApplication(
// CHECKSTYLE_RULES.ON: ParameterNumberCheck
@@ -160,7 +164,8 @@ public static String getCommandsStreamName() {
final KsqlSecurityExtension securityExtension,
final ServerState serverState,
final ProcessingLogContext processingLogContext,
- final List preconditions
+ final List preconditions,
+ final Consumer rocksDBConfigSetterHandler
) {
super(config);
this.serviceContext = Objects.requireNonNull(serviceContext, "serviceContext");
@@ -183,8 +188,9 @@ public static String getCommandsStreamName() {
this.serviceContextBinderFactory = Objects.requireNonNull(
serviceContextBinderFactory, "serviceContextBinderFactory");
this.securityExtension = Objects.requireNonNull(
- securityExtension, "securityExtension"
- );
+ securityExtension, "securityExtension");
+ this.rocksDBConfigSetterHandler = Objects.requireNonNull(
+ rocksDBConfigSetterHandler, "rocksDBConfigSetterHandler");
}
@Override
@@ -250,6 +256,8 @@ private void waitForPreconditions() {
}
private void initialize() {
+ rocksDBConfigSetterHandler.accept(ksqlConfig);
+
final String commandTopic = commandStore.getCommandTopicName();
KsqlInternalTopicUtils.ensureTopic(
commandTopic,
@@ -543,6 +551,9 @@ static KsqlRestApplication buildApplication(
KsqlServerPrecondition.class
);
+ final Consumer rocksDBConfigSetterHandler =
+ RocksDBConfigSetterHandler::maybeConfigureRocksDBConfigSetter;
+
return new KsqlRestApplication(
serviceContext,
ksqlEngine,
@@ -559,7 +570,8 @@ static KsqlRestApplication buildApplication(
securityExtension,
serverState,
processingLogContext,
- preconditions
+ preconditions,
+ rocksDBConfigSetterHandler
);
}
@@ -596,7 +608,7 @@ private void displayWelcomeMessage() {
writer.flush();
}
- static void maybeCreateProcessingLogStream(
+ private static void maybeCreateProcessingLogStream(
final ProcessingLogConfig config,
final KsqlConfig ksqlConfig,
final KsqlEngine ksqlEngine,
diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/RocksDBConfigSetterHandler.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/RocksDBConfigSetterHandler.java
new file mode 100644
index 000000000000..ef01ba3a3fed
--- /dev/null
+++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/RocksDBConfigSetterHandler.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2019 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.rest.util;
+
+import io.confluent.ksql.util.KsqlConfig;
+import java.util.Map;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
+
+public final class RocksDBConfigSetterHandler {
+
+ private RocksDBConfigSetterHandler() {
+ }
+
+ public static void maybeConfigureRocksDBConfigSetter(final KsqlConfig ksqlConfig) {
+ final Map streamsProps = ksqlConfig.getKsqlStreamConfigProps();
+ final Class> clazz =
+ (Class) streamsProps.get(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG);
+
+ if (clazz != null && org.apache.kafka.common.Configurable.class.isAssignableFrom(clazz)) {
+ try {
+ ((org.apache.kafka.common.Configurable) Utils.newInstance(clazz))
+ .configure(ksqlConfig.originals());
+ } catch (Exception e) {
+ throw new ConfigException(
+ "Failed to configure Configurable RocksDBConfigSetter. "
+ + StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG + ": " + clazz.getName(),
+ e);
+ }
+ }
+ }
+}
diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java
index 67a5a8eef2c0..5b56d460a350 100644
--- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java
+++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java
@@ -56,6 +56,7 @@
import io.confluent.ksql.version.metrics.VersionCheckerAgent;
import io.confluent.rest.RestConfig;
import java.util.Collections;
+import java.util.function.Consumer;
import javax.ws.rs.core.Configurable;
import java.util.LinkedList;
import java.util.Optional;
@@ -119,6 +120,8 @@ public class KsqlRestApplicationTest {
private KsqlServerPrecondition precondition1;
@Mock
private KsqlServerPrecondition precondition2;
+ @Mock
+ private Consumer rocksDBConfigSetterHandler;
private PreparedStatement logCreateStatement;
private KsqlRestApplication app;
@@ -164,7 +167,8 @@ public void setUp() {
securityExtension,
serverState,
processingLogContext,
- ImmutableList.of(precondition1, precondition2)
+ ImmutableList.of(precondition1, precondition2),
+ rocksDBConfigSetterHandler
);
}
@@ -351,4 +355,13 @@ public void shouldNotInitializeUntilPreconditionsChecked() {
inOrder.verify(precondition1).checkPrecondition(restConfig, serviceContext);
inOrder.verify(precondition2).checkPrecondition(restConfig, serviceContext);
}
+
+ @Test
+ public void shouldConfigureRocksDBConfigSetter() {
+ // When:
+ app.startKsql();
+
+ // Then:
+ verify(rocksDBConfigSetterHandler).accept(ksqlConfig);
+ }
}
diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/util/RocksDBConfigSetterHandlerTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/util/RocksDBConfigSetterHandlerTest.java
new file mode 100644
index 000000000000..f2107532c4c7
--- /dev/null
+++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/util/RocksDBConfigSetterHandlerTest.java
@@ -0,0 +1,134 @@
+/*
+ * Copyright 2019 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.rest.util;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableMap;
+import io.confluent.ksql.util.KsqlConfig;
+import io.confluent.ksql.util.KsqlException;
+import java.util.Map;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.state.RocksDBConfigSetter;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.rocksdb.Options;
+
+@RunWith(MockitoJUnitRunner.class)
+public class RocksDBConfigSetterHandlerTest {
+
+ @Mock
+ private KsqlConfig ksqlConfig;
+
+ @Rule
+ public final ExpectedException expectedException = ExpectedException.none();
+
+ @Test
+ public void shouldConfigure() throws Exception {
+ // Given:
+ when(ksqlConfig.getKsqlStreamConfigProps()).thenReturn(
+ ImmutableMap.of(
+ StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
+ Class.forName("io.confluent.ksql.rest.util.RocksDBConfigSetterHandlerTest$ConfigurableTestRocksDBConfigSetter"))
+ );
+ final Runnable mockRunnable = mock(Runnable.class);
+ when(ksqlConfig.originals()).thenReturn(
+ ImmutableMap.of(ConfigurableTestRocksDBConfigSetter.TEST_CONFIG, mockRunnable));
+
+ // When:
+ RocksDBConfigSetterHandler.maybeConfigureRocksDBConfigSetter(ksqlConfig);
+
+ // Then:
+ verify(mockRunnable).run();
+ }
+
+ @Test
+ public void shouldStartWithNonConfigurableRocksDBConfigSetter() throws Exception {
+ // Given:
+ when(ksqlConfig.getKsqlStreamConfigProps()).thenReturn(
+ ImmutableMap.of(
+ StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
+ Class.forName("io.confluent.ksql.rest.util.RocksDBConfigSetterHandlerTest$NonConfigurableTestRocksDBConfigSetter"))
+ );
+
+ // No error when:
+ RocksDBConfigSetterHandler.maybeConfigureRocksDBConfigSetter(ksqlConfig);
+ }
+
+ @Test
+ public void shouldThrowIfFailToRocksDBConfigSetter() throws Exception {
+ // Given:
+ when(ksqlConfig.getKsqlStreamConfigProps()).thenReturn(
+ ImmutableMap.of(
+ StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
+ Class.forName("io.confluent.ksql.rest.util.RocksDBConfigSetterHandlerTest$ConfigurableTestRocksDBConfigSetterWithoutPublicConstructor"))
+ );
+
+ // Expect:
+ expectedException.expect(ConfigException.class);
+ expectedException.expectMessage(containsString("Failed to configure Configurable RocksDBConfigSetter."));
+ expectedException.expectMessage(containsString(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG));
+ expectedException.expectMessage(containsString("io.confluent.ksql.rest.util.RocksDBConfigSetterHandlerTest$ConfigurableTestRocksDBConfigSetterWithoutPublicConstructor"));
+
+ // When:
+ RocksDBConfigSetterHandler.maybeConfigureRocksDBConfigSetter(ksqlConfig);
+ }
+
+ public static class ConfigurableTestRocksDBConfigSetter
+ extends NonConfigurableTestRocksDBConfigSetter
+ implements org.apache.kafka.common.Configurable {
+
+ static final String TEST_CONFIG = "test.runnable";
+
+ @Override
+ public void configure(final Map config) {
+ final Runnable supplier = (Runnable) config.get(TEST_CONFIG);
+ supplier.run();
+ }
+ }
+
+ static class ConfigurableTestRocksDBConfigSetterWithoutPublicConstructor
+ extends NonConfigurableTestRocksDBConfigSetter
+ implements org.apache.kafka.common.Configurable {
+
+ @Override
+ public void configure(final Map config) {
+ }
+ }
+
+ private static class NonConfigurableTestRocksDBConfigSetter implements RocksDBConfigSetter {
+
+ @Override
+ public void setConfig(
+ final String storeName,
+ final Options options,
+ final Map configs) {
+ // do nothing
+ }
+
+ @Override
+ public void close(final String storeName, final Options options) {
+ }
+ }
+}
\ No newline at end of file
diff --git a/ksql-rocksdb-config-setter/pom.xml b/ksql-rocksdb-config-setter/pom.xml
new file mode 100644
index 000000000000..4b1fe9699ff8
--- /dev/null
+++ b/ksql-rocksdb-config-setter/pom.xml
@@ -0,0 +1,76 @@
+
+
+
+
+ 4.0.0
+
+
+ io.confluent.ksql
+ ksql-parent
+ 5.3.1-SNAPSHOT
+
+
+ ksql-rocksdb-config-setter
+ KSQL RocksDB Config Setter
+ jar
+
+
+
+ org.apache.kafka
+ kafka-streams
+
+
+
+ junit
+ junit
+ test
+
+
+ org.hamcrest
+ hamcrest-all
+ test
+
+
+ org.mockito
+ mockito-core
+ test
+
+
+ com.google.guava
+ guava
+
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+
+
+
+ test-jar
+
+
+
+
+
+
+
diff --git a/ksql-rocksdb-config-setter/src/main/java/io/confluent/ksql/rocksdb/KsqlBoundedMemoryRocksDBConfig.java b/ksql-rocksdb-config-setter/src/main/java/io/confluent/ksql/rocksdb/KsqlBoundedMemoryRocksDBConfig.java
new file mode 100644
index 000000000000..09975b051591
--- /dev/null
+++ b/ksql-rocksdb-config-setter/src/main/java/io/confluent/ksql/rocksdb/KsqlBoundedMemoryRocksDBConfig.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2019 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.rocksdb;
+
+import java.util.Map;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+
+public class KsqlBoundedMemoryRocksDBConfig extends AbstractConfig {
+
+ private static final String CONFIG_PREFIX = "ksql.plugins.rocksdb.";
+
+ public static final String TOTAL_OFF_HEAP_MEMORY_CONFIG = CONFIG_PREFIX + "total.memory";
+ private static final String TOTAL_OFF_HEAP_MEMORY_DOC =
+ "All RocksDB instances across all KSQL queries (i.e., all Kafka Streams applications) "
+ + "will be limited to sharing this much memory (in bytes).";
+
+ public static final String N_BACKGROUND_THREADS_CONFIG =
+ CONFIG_PREFIX + "num.background.threads";
+ private static final String N_BACKGROUND_THREADS_DOC =
+ "Number of low-priority RocksDB threads to be shared among instances for compaction.";
+
+ public static final String INDEX_FILTER_BLOCK_RATIO_CONFIG =
+ CONFIG_PREFIX + "index.filter.block.ratio";
+ private static final double INDEX_FILTER_BLOCK_RATIO_DEFAULT = 0.0;
+ private static final String INDEX_FILTER_BLOCK_RATIO_DOC =
+ "Percentage of the RocksDB block cache to set aside for high-priority entries, i.e., "
+ + "index and filter blocks.";
+
+ private static final ConfigDef CONFIG_DEF = new ConfigDef()
+ .define(
+ TOTAL_OFF_HEAP_MEMORY_CONFIG,
+ Type.LONG,
+ ConfigDef.NO_DEFAULT_VALUE,
+ Importance.HIGH,
+ TOTAL_OFF_HEAP_MEMORY_DOC)
+ .define(
+ N_BACKGROUND_THREADS_CONFIG,
+ Type.INT,
+ 1,
+ Importance.MEDIUM,
+ N_BACKGROUND_THREADS_DOC)
+ .define(
+ INDEX_FILTER_BLOCK_RATIO_CONFIG,
+ Type.DOUBLE,
+ INDEX_FILTER_BLOCK_RATIO_DEFAULT,
+ Importance.LOW,
+ INDEX_FILTER_BLOCK_RATIO_DOC
+ );
+
+ public KsqlBoundedMemoryRocksDBConfig(final Map, ?> properties) {
+ super(CONFIG_DEF, properties);
+ }
+}
diff --git a/ksql-rocksdb-config-setter/src/main/java/io/confluent/ksql/rocksdb/KsqlBoundedMemoryRocksDBConfigSetter.java b/ksql-rocksdb-config-setter/src/main/java/io/confluent/ksql/rocksdb/KsqlBoundedMemoryRocksDBConfigSetter.java
new file mode 100644
index 000000000000..9ab95220bd4f
--- /dev/null
+++ b/ksql-rocksdb-config-setter/src/main/java/io/confluent/ksql/rocksdb/KsqlBoundedMemoryRocksDBConfigSetter.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2019 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.rocksdb;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.streams.state.RocksDBConfigSetter;
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.Options;
+
+/**
+ * This {@code RocksDBConfigSetter} implementation limits the total memory used
+ * across all RocksDB instances to the number of bytes passed via
+ * "ksql.plugins.rocksdb.total.memory", and also configures
+ * the shared RocksDB thread pool to use "ksql.plugins.rocksdb.num.background.threads" threads.
+ *
+ * See https://docs.confluent.io/5.3.0/streams/developer-guide/memory-mgmt.html#rocksdb.
+ */
+public class KsqlBoundedMemoryRocksDBConfigSetter implements RocksDBConfigSetter, Configurable {
+
+ private static org.rocksdb.Cache cache;
+ private static org.rocksdb.WriteBufferManager writeBufferManager;
+ private static final AtomicBoolean configured = new AtomicBoolean(false);
+
+ @Override
+ public void configure(final Map config) {
+ configure(config, new Options());
+ }
+
+ @VisibleForTesting
+ static void configure(final Map config, final Options options) {
+ if (configured.getAndSet(true)) {
+ throw new IllegalStateException(
+ "KsqlBoundedMemoryRocksDBConfigSetter has already been configured. Cannot re-configure.");
+ }
+
+ try {
+ final KsqlBoundedMemoryRocksDBConfig pluginConfig =
+ new KsqlBoundedMemoryRocksDBConfig(config);
+
+ limitTotalMemory(pluginConfig);
+ configureNumThreads(pluginConfig, options);
+ } catch (IllegalArgumentException e) {
+ reset();
+ throw e;
+ }
+ }
+
+ @VisibleForTesting
+ static void reset() {
+ configured.set(false);
+ }
+
+ private static void limitTotalMemory(final KsqlBoundedMemoryRocksDBConfig config) {
+ final long totalOffHeapMemory =
+ config.getLong(KsqlBoundedMemoryRocksDBConfig.TOTAL_OFF_HEAP_MEMORY_CONFIG);
+ final long totalMemtableMemory = totalOffHeapMemory / 2;
+
+ final double indexFilterBlockRatio =
+ config.getDouble(KsqlBoundedMemoryRocksDBConfig.INDEX_FILTER_BLOCK_RATIO_CONFIG);
+
+ cache = new org.rocksdb.LRUCache(totalOffHeapMemory, -1, false, indexFilterBlockRatio);
+ writeBufferManager = new org.rocksdb.WriteBufferManager(totalMemtableMemory, cache);
+ }
+
+ private static void configureNumThreads(
+ final KsqlBoundedMemoryRocksDBConfig config,
+ final Options options) {
+ final int numBackgroundThreads =
+ config.getInt(KsqlBoundedMemoryRocksDBConfig.N_BACKGROUND_THREADS_CONFIG);
+
+ // All Options share the same Env, and share a thread pool as a result
+ options.getEnv().setBackgroundThreads(numBackgroundThreads);
+ }
+
+ @Override
+ public void setConfig(
+ final String storeName,
+ final Options options,
+ final Map configs) {
+ if (!configured.get()) {
+ throw new IllegalStateException(
+ "Cannot use KsqlBoundedMemoryRocksDBConfigSetter before it's been configured.");
+ }
+
+ final BlockBasedTableConfig tableConfig = (BlockBasedTableConfig)options.tableFormatConfig();
+
+ tableConfig.setBlockCache(cache);
+ tableConfig.setCacheIndexAndFilterBlocks(true);
+ options.setWriteBufferManager(writeBufferManager);
+
+ tableConfig.setCacheIndexAndFilterBlocksWithHighPriority(true);
+ tableConfig.setPinTopLevelIndexAndFilter(true);
+
+ options.setStatsDumpPeriodSec(0);
+
+ options.setTableFormatConfig(tableConfig);
+ }
+
+ @Override
+ public void close(final String storeName, final Options options) {
+ }
+}
\ No newline at end of file
diff --git a/ksql-rocksdb-config-setter/src/test/java/io/confluent/ksql/rocksdb/KsqlBoundedMemoryRocksDBConfigSetterTest.java b/ksql-rocksdb-config-setter/src/test/java/io/confluent/ksql/rocksdb/KsqlBoundedMemoryRocksDBConfigSetterTest.java
new file mode 100644
index 000000000000..ea410c38c2f2
--- /dev/null
+++ b/ksql-rocksdb-config-setter/src/test/java/io/confluent/ksql/rocksdb/KsqlBoundedMemoryRocksDBConfigSetterTest.java
@@ -0,0 +1,187 @@
+/*
+ * Copyright 2019 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.rocksdb;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.sameInstance;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Collections;
+import java.util.Map;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.Cache;
+import org.rocksdb.Env;
+import org.rocksdb.Options;
+import org.rocksdb.WriteBufferManager;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KsqlBoundedMemoryRocksDBConfigSetterTest {
+
+ private static final long TOTAL_OFF_HEAP_MEMORY = 16 * 1024 * 1024 * 1024L;
+ private static final int NUM_BACKGROUND_THREADS = 4;
+
+ private static final Map CONFIG_PROPS = ImmutableMap.of(
+ "ksql.plugins.rocksdb.total.memory", TOTAL_OFF_HEAP_MEMORY,
+ "ksql.plugins.rocksdb.num.background.threads", NUM_BACKGROUND_THREADS);
+
+ @Mock
+ private Options rocksOptions;
+ @Mock
+ private Options secondRocksOptions;
+ @Mock
+ private BlockBasedTableConfig tableConfig;
+ @Mock
+ private BlockBasedTableConfig secondTableConfig;
+ @Mock
+ private Env env;
+ @Captor
+ private ArgumentCaptor writeBufferManagerCaptor;
+ @Captor
+ private ArgumentCaptor secondWriteBufferManagerCaptor;
+ @Captor
+ private ArgumentCaptor cacheCaptor;
+ @Captor
+ private ArgumentCaptor secondCacheCaptor;
+
+ private KsqlBoundedMemoryRocksDBConfigSetter rocksDBConfig;
+ private KsqlBoundedMemoryRocksDBConfigSetter secondRocksDBConfig;
+
+ @Rule
+ public final ExpectedException expectedException = ExpectedException.none();
+
+ @Before
+ public void setUp() {
+ KsqlBoundedMemoryRocksDBConfigSetter.reset();
+
+ rocksDBConfig = new KsqlBoundedMemoryRocksDBConfigSetter();
+ secondRocksDBConfig = new KsqlBoundedMemoryRocksDBConfigSetter();
+
+ when(rocksOptions.tableFormatConfig()).thenReturn(tableConfig);
+ when(secondRocksOptions.tableFormatConfig()).thenReturn(secondTableConfig);
+ when(rocksOptions.getEnv()).thenReturn(env);
+ }
+
+ @Test
+ public void shouldFailWithoutConfigure() {
+ // Expect:
+ expectedException.expect(IllegalStateException.class);
+ expectedException.expectMessage(
+ "Cannot use KsqlBoundedMemoryRocksDBConfigSetter before it's been configured.");
+
+ // When:
+ rocksDBConfig.setConfig("store_name", rocksOptions, Collections.emptyMap());
+ }
+
+ @Test
+ public void shouldFailIfConfiguredTwiceFromSameInstance() {
+ // Given:
+ rocksDBConfig.configure(CONFIG_PROPS);
+
+ // Expect:
+ expectedException.expect(IllegalStateException.class);
+ expectedException.expectMessage(
+ "KsqlBoundedMemoryRocksDBConfigSetter has already been configured. Cannot re-configure.");
+
+ // When:
+ rocksDBConfig.configure(CONFIG_PROPS);
+ }
+
+ @Test
+ public void shouldFailIfConfiguredTwiceFromDifferentInstances() {
+ // Given:
+ rocksDBConfig.configure(CONFIG_PROPS);
+
+ // Expect:
+ expectedException.expect(IllegalStateException.class);
+ expectedException.expectMessage(
+ "KsqlBoundedMemoryRocksDBConfigSetter has already been configured. Cannot re-configure.");
+
+ // When:
+ secondRocksDBConfig.configure(CONFIG_PROPS);
+ }
+
+ @Test
+ public void shouldSetConfig() {
+ // Given:
+ rocksDBConfig.configure(CONFIG_PROPS);
+
+ // When:
+ rocksDBConfig.setConfig("store_name", rocksOptions, Collections.emptyMap());
+
+ // Then:
+ verify(rocksOptions).setWriteBufferManager(any());
+ verify(rocksOptions).setStatsDumpPeriodSec(0);
+ verify(rocksOptions).setTableFormatConfig(tableConfig);
+
+ verify(tableConfig).setBlockCache(any());
+ verify(tableConfig).setCacheIndexAndFilterBlocks(true);
+ verify(tableConfig).setCacheIndexAndFilterBlocksWithHighPriority(true);
+ verify(tableConfig).setPinTopLevelIndexAndFilter(true);
+ }
+
+ @Test
+ public void shouldShareCacheAcrossInstances() {
+ // Given:
+ rocksDBConfig.configure(CONFIG_PROPS);
+ rocksDBConfig.setConfig("store_name", rocksOptions, Collections.emptyMap());
+
+ // When:
+ secondRocksDBConfig.setConfig("store_name", secondRocksOptions, Collections.emptyMap());
+
+ // Then:
+ verify(tableConfig).setBlockCache(cacheCaptor.capture());
+ verify(secondTableConfig).setBlockCache(secondCacheCaptor.capture());
+ assertThat(cacheCaptor.getValue(), sameInstance(secondCacheCaptor.getValue()));
+ }
+
+ @Test
+ public void shouldShareWriteBufferManagerAcrossInstances() {
+ // Given:
+ rocksDBConfig.configure(CONFIG_PROPS);
+ rocksDBConfig.setConfig("store_name", rocksOptions, Collections.emptyMap());
+
+ // When:
+ secondRocksDBConfig.setConfig("store_name", secondRocksOptions, Collections.emptyMap());
+
+ // Then:
+ verify(rocksOptions).setWriteBufferManager(writeBufferManagerCaptor.capture());
+ verify(secondRocksOptions).setWriteBufferManager(secondWriteBufferManagerCaptor.capture());
+ assertThat(
+ writeBufferManagerCaptor.getValue(),
+ sameInstance(secondWriteBufferManagerCaptor.getValue()));
+ }
+
+ @Test
+ public void shouldSetNumThreads() {
+ // When:
+ KsqlBoundedMemoryRocksDBConfigSetter.configure(CONFIG_PROPS, rocksOptions);
+
+ // Then:
+ verify(env).setBackgroundThreads(NUM_BACKGROUND_THREADS);
+ }
+}
\ No newline at end of file
diff --git a/ksql-rocksdb-config-setter/src/test/java/io/confluent/ksql/rocksdb/KsqlBoundedMemoryRocksDBConfigTest.java b/ksql-rocksdb-config-setter/src/test/java/io/confluent/ksql/rocksdb/KsqlBoundedMemoryRocksDBConfigTest.java
new file mode 100644
index 000000000000..9eb54bd81499
--- /dev/null
+++ b/ksql-rocksdb-config-setter/src/test/java/io/confluent/ksql/rocksdb/KsqlBoundedMemoryRocksDBConfigTest.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2019 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.rocksdb;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.kafka.common.config.ConfigException;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class KsqlBoundedMemoryRocksDBConfigTest {
+
+ private static final long TOTAL_OFF_HEAP_MEMORY = 16 * 1024 * 1024 * 1024L;
+ private static final int NUM_BACKGROUND_THREADS = 4;
+ private static final double INDEX_FILTER_BLOCK_RATIO = 0.1;
+
+ @Rule
+ public final ExpectedException expectedException = ExpectedException.none();
+
+ @Test
+ public void shouldCreateConfig() {
+ // Given:
+ final Map configs = ImmutableMap.of(
+ "ksql.plugins.rocksdb.total.memory", TOTAL_OFF_HEAP_MEMORY,
+ "ksql.plugins.rocksdb.num.background.threads", NUM_BACKGROUND_THREADS,
+ "ksql.plugins.rocksdb.index.filter.block.ratio", INDEX_FILTER_BLOCK_RATIO
+ );
+
+ // When:
+ final KsqlBoundedMemoryRocksDBConfig pluginConfig = new KsqlBoundedMemoryRocksDBConfig(configs);
+
+ // Then:
+ assertThat(
+ pluginConfig.getLong(KsqlBoundedMemoryRocksDBConfig.TOTAL_OFF_HEAP_MEMORY_CONFIG),
+ is(TOTAL_OFF_HEAP_MEMORY));
+ assertThat(
+ pluginConfig.getInt(KsqlBoundedMemoryRocksDBConfig.N_BACKGROUND_THREADS_CONFIG),
+ is(NUM_BACKGROUND_THREADS));
+ assertThat(
+ pluginConfig.getDouble(KsqlBoundedMemoryRocksDBConfig.INDEX_FILTER_BLOCK_RATIO_CONFIG),
+ is(INDEX_FILTER_BLOCK_RATIO));
+ }
+
+ @Test
+ public void shouldFailWithoutTotalMemoryConfig() {
+ // Given:
+ final Map configs = ImmutableMap.of(
+ "ksql.plugins.rocksdb.num.background.threads", NUM_BACKGROUND_THREADS
+ );
+
+ // Expect:
+ expectedException.expect(ConfigException.class);
+ expectedException.expectMessage(
+ "Missing required configuration \"ksql.plugins.rocksdb.total.memory\" which has no default value.");
+
+ // When:
+ new KsqlBoundedMemoryRocksDBConfig(configs);
+ }
+
+ @Test
+ public void shouldDefaultNumThreadsConfig() {
+ // Given:
+ final Map configs = ImmutableMap.of(
+ "ksql.plugins.rocksdb.total.memory", TOTAL_OFF_HEAP_MEMORY
+ );
+
+ // When:
+ final KsqlBoundedMemoryRocksDBConfig pluginConfig = new KsqlBoundedMemoryRocksDBConfig(configs);
+
+ // Then:
+ assertThat(
+ pluginConfig.getInt(KsqlBoundedMemoryRocksDBConfig.N_BACKGROUND_THREADS_CONFIG),
+ is(1));
+ }
+
+ @Test
+ public void shouldDefaultIndexFilterBlockRatioConfig() {
+ // Given:
+ final Map configs = ImmutableMap.of(
+ "ksql.plugins.rocksdb.total.memory", TOTAL_OFF_HEAP_MEMORY
+ );
+
+ // When:
+ final KsqlBoundedMemoryRocksDBConfig pluginConfig = new KsqlBoundedMemoryRocksDBConfig(configs);
+
+ // Then:
+ assertThat(
+ pluginConfig.getDouble(KsqlBoundedMemoryRocksDBConfig.INDEX_FILTER_BLOCK_RATIO_CONFIG),
+ is(0.0));
+ }
+}
diff --git a/pom.xml b/pom.xml
index 5b51b1250780..54c250e7eeb3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -57,6 +57,7 @@
ksql-test-util
ksql-benchmark
ksql-functional-tests
+ ksql-rocksdb-config-setter