
feat: add KsqlRocksDBConfigSetter to bound memory and set num threads #3167

Merged (13 commits) on Sep 5, 2019

Conversation

vcrfxia (Contributor) commented Aug 3, 2019:

Description

Adds a KsqlRocksDBConfigSetter plugin that can be used to limit memory across all RocksDB instances and configure the number of threads.

Example usage (add these lines to the server properties file):

ksql.streams.rocksdb.config.setter=io.confluent.ksql.rocksdb.KsqlBoundedMemoryRocksDBConfig
ksql.plugins.rocksdb.total.memory=100000000
ksql.plugins.rocksdb.num.background.threads=2

In order to configure the total memory and number of threads, KsqlRestApplication checks whether the ksql.streams.rocksdb.config.setter implements a static configure() method and calls it if so. A drawback to this approach is that the KsqlRocksDBConfigSetter can only be used when the KSQL server is started, and not via a SET statement afterwards. An alternative to having a static configure() method could be to configure the memory limit from setConfig() instead, but this would require passing arbitrary prefixed configs into the streams app, and also introduces edge cases such as what should happen if the memory limit is changed between queries.
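The static-hook lookup described above can be sketched with plain reflection. This is a minimal, self-contained illustration, not the actual KsqlRestApplication code; the setter class and property name are hypothetical stand-ins:

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

public class StaticConfigureSketch {

    // Hypothetical stand-in for a user-supplied RocksDBConfigSetter with the optional hook.
    public static class MyRocksDBConfigSetter {
        static Long totalMemory;

        // The static configure() method the server would look for at startup.
        public static void configure(final Map<String, Object> config) {
            totalMemory = Long.parseLong(
                (String) config.get("ksql.plugins.rocksdb.total.memory"));
        }
    }

    // Invoke the static configure(Map) hook if the class declares one; skip it otherwise.
    public static void maybeConfigure(final Class<?> clazz, final Map<String, Object> config) {
        final Method method;
        try {
            method = clazz.getMethod("configure", Map.class);
        } catch (NoSuchMethodException e) {
            return; // no static hook: nothing to configure
        }
        try {
            method.invoke(null, config); // null receiver because the method is static
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("failed to configure class: " + clazz.getName(), e);
        }
    }

    public static void main(final String[] args) {
        final Map<String, Object> config = new HashMap<>();
        config.put("ksql.plugins.rocksdb.total.memory", "100000000");
        maybeConfigure(MyRocksDBConfigSetter.class, config);
        System.out.println(MyRocksDBConfigSetter.totalMemory); // prints 100000000
    }
}
```

Because the hook is invoked once at startup, configuration supplied later via a SET statement never reaches it, which is the drawback noted above.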

This PR introduces a new ksql-plugins module where the new plugin lives. It is currently the only such plugin, but if we introduce more in the future then they can share a module, rather than having each plugin in a separate repo. (I can easily make the new RocksDB config setter its own module instead, if we think this is premature.)

Open question: This PR does not add the new plugin to our packaging or classpath, so in order to use it a user currently has to build the plugin and then add it to the classpath when starting the KSQL server. Do we want to add the plugin to our packaging and/or classpath? Not sure how to think about whether these sorts of add-on plugins should be packaged with KSQL or not.

Testing done

Added unit tests. Manual testing (thanks to the help of @rodesai !) to verify memory limits and number of threads. Specifically, we set up a KSQL server on an EC2 instance to run a fixed query with fixed topic throughput and observed that without this plugin, the instance ran out of memory, but this was not the case when using the plugin.

Reviewer checklist

  • Ensure docs are updated if necessary (e.g. if a user-visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

@vcrfxia vcrfxia requested a review from a team as a code owner August 3, 2019 05:34

@VisibleForTesting
static void configure(final Map<String, Object> config, final Options options) {
if (configured.getAndSet(true)) {
vcrfxia (PR author):

There's an edge case here where if setConfig() is called after configure() is started but before it has finished, we may get undefined behavior. This shouldn't happen since configure() is called as part of the server starting up while setConfig() is only called when running streams apps, but if anyone has suggestions for improving the checks in these files to handle this edge case I'm all ears!

big-andy-coates (Contributor) left a comment:

Thanks @vcrfxia !

I think it's good to have this in its own module. It makes each plugin module small and targeted.

You probably want to add documentation on the new config option.

My main concern with the PR is its heavy use of static fields and methods. This is nasty! I'm assuming you've done this so that you can have config for the plugin like:

ksql.streams.rocksdb.config.setter=io.confluent.ksql.rocksdb.KsqlBoundedMemoryRocksDBConfig
ksql.plugins.rocksdb.total.memory=100000000
ksql.plugins.rocksdb.num.background.threads=2

However, is it not the case that if you were to change the config to:

ksql.streams.rocksdb.config.setter=io.confluent.ksql.rocksdb.KsqlBoundedMemoryRocksDBConfig
ksql.streams.rocksdb.total.memory=100000000
ksql.streams.rocksdb.num.background.threads=2

Then the plugin's SetConfig call would be passed a map containing:

rocksdb.total.memory=100000000
rocksdb.num.background.threads=2

Which would give us what we need without the need for all this static stuff. Is that the case, or am I missing something?

We can, if you prefer, have plugin in the name:

ksql.streams.rocksdb.plugin.total.memory=100000000
ksql.streams.rocksdb.plugin.num.background.threads=2

Thoughts?

Add me back in as a reviewer if you want me to take a second look :)

ksql-plugins/ksql-rocksdb-config-setter/pom.xml (thread resolved)
*/
public class KsqlBoundedMemoryRocksDBConfig implements RocksDBConfigSetter {

private static final String CONFIG_PREFIX = "ksql.plugins.rocksdb.";
big-andy-coates:

I would suggest using AbstractConfig + ConfigDef to define the available config options. This is our standard pattern and has the benefit of allowing type checking, defaults and custom validators being defined in the def, rather than in the code in this class.
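For illustration of what that pattern buys, here is a stdlib-only imitation. This is NOT Kafka's AbstractConfig/ConfigDef (the real class would extend AbstractConfig and declare a ConfigDef); it just shows the value of centralizing types, defaults, and validation in one place:

```java
import java.util.Map;

// Illustration only: a hand-rolled stand-in for what ConfigDef centralizes.
public class MiniConfigDef {
    static final String TOTAL_MEMORY = "ksql.plugins.rocksdb.total.memory";
    static final String NUM_THREADS = "ksql.plugins.rocksdb.num.background.threads";

    private final long totalMemory;
    private final int numThreads;

    public MiniConfigDef(final Map<String, ?> props) {
        final Object raw = props.get(TOTAL_MEMORY);
        if (raw == null) {
            // Required config: the equivalent of ConfigDef.NO_DEFAULT_VALUE.
            throw new IllegalArgumentException("Missing required configuration: " + TOTAL_MEMORY);
        }
        totalMemory = parseLong(TOTAL_MEMORY, raw);
        if (totalMemory <= 0) {
            // The equivalent of a ConfigDef.Range.atLeast(1) validator.
            throw new IllegalArgumentException(TOTAL_MEMORY + " must be positive");
        }
        // Optional config with a default, like ConfigDef's default values.
        final Object threads = props.get(NUM_THREADS);
        numThreads = threads == null ? 1 : (int) parseLong(NUM_THREADS, threads);
    }

    private static long parseLong(final String name, final Object value) {
        try {
            return Long.parseLong(String.valueOf(value));
        } catch (NumberFormatException e) {
            // Type checking with a helpful message, like AbstractConfig gives for free.
            throw new IllegalArgumentException("Invalid value for " + name + ": " + value, e);
        }
    }

    public long totalMemory() { return totalMemory; }
    public int numThreads() { return numThreads; }
}
```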


private static void limitTotalMemory(final Map<String, Object> config) {
if (!config.containsKey(TOTAL_OFF_HEAP_MEMORY_CONFIG)) {
throw new IllegalArgumentException(
big-andy-coates:

throw new ConfigException(TOTAL_OFF_HEAP_MEMORY_CONFIG, null) or better yet, use AbstractConfig and make it a required config.

vcrfxia (PR author):

Thanks for the suggestion -- will make the switch to AbstractConfig.

throw new IllegalArgumentException(
"Missing configuration: " + TOTAL_OFF_HEAP_MEMORY_CONFIG);
}
totalOffHeapMemory = Long.parseLong((String)config.get(TOTAL_OFF_HEAP_MEMORY_CONFIG));
big-andy-coates:

This will currently throw an unhelpful exception if the value is not a number. If you switch to using AbstractConfig you get type checking of configs for free, and this can be replaced with config.getLong(TOTAL_OFF_HEAP_MEMORY_CONFIG).

}

private static void configureNumThreads(final Map<String, Object> config, final Options options) {
if (!config.containsKey(N_BACKGROUND_THREADS_CONFIG)) {
big-andy-coates:

Can we have a default for this in our ConfigDef? I think the default is 1.

vcrfxia (PR author):

Sure, I'll add that as a default since it aligns with the RocksDB default.

~ specific language governing permissions and limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
big-andy-coates:

what's the purpose of this pom/module?

vcrfxia (PR author):

My thinking was to have a separate module for plugins so we don't clutter up the parent pom with lots of tiny plugins. In light of your feedback that you prefer having the plugin as its own module, this wrapper module is no longer needed and I'll remove it.

big-andy-coates:

Ah, so you mean the parent pom just contains this plugins pom, which itself contains many plugin modules? This is nice, assuming each plugin gets packaged in its own jar...

vcrfxia (PR author):

Yup, that's what I was going for. Will leave as-is for now and revisit next time we want to add a plugin.

numBackgroundThreads = Integer.parseInt((String)config.get(N_BACKGROUND_THREADS_CONFIG));

// All Options share the same Env, and share a thread pool as a result
options.getEnv().setBackgroundThreads(numBackgroundThreads);
big-andy-coates:

Why not just set this in setConfig? Then it isn't important that it's ultimately a shared resource.

vcrfxia (PR author):

setConfig() is called each time a new query is run/streams app is started. By configuring the thread pool once at the beginning, we avoid edge cases where the user has changed the value of N_BACKGROUND_THREADS_CONFIG between queries.

method = clazz.getMethod("configure", Map.class);
} catch (NoSuchMethodException e) {
// Nothing to configure
return;
big-andy-coates:

If the config is there, but the class does not have the correct method, then we should throw an appropriate exception, not silently swallow it.

vcrfxia (PR author):

It's valid for the user to pass a different RocksDBConfigSetter that does not implement the static configure() method, and we want that to succeed. See the unit test shouldSkipConfiguringRocksDBConfigSetterWithoutConfigureMethod() below.

big-andy-coates:

The problem with this is that the UX is bad if the user wants to have a configure method, but has the signature wrong. It just silently fails.

I think a better UX is to require it, even if it's a no-op, and then hard fail if it does not exist.

vcrfxia (PR author):

I think a better UX is to require it, even if it's a no-op, and then hard fail if it does not exist.

This would break backwards compatibility if a user is already running with a RocksDBConfigSetter that doesn't implement the configure() method. If such a user upgraded their KSQL server, the server would fail to start unless they updated their RocksDBConfigSetter to include a no-op configure() method. Doesn't seem like great UX either, but I see your point.

Regardless, I believe this discussion is moot in light of your suggestion to perform the static configuration via streamsConfig.getConfiguredInstance(...) rather than checking for a static configure() method. I'll go with that approach instead.

vcrfxia (PR author):

Discussed with @big-andy-coates offline: turns out using getConfiguredInstance(...) isn't a viable approach, since ksqlConfig.getConfiguredInstance(...) fails the check within getConfiguredInstance() that the key is present in the ConfigDef, and we can't easily create a StreamsConfig from the KsqlConfig as it's missing required configs such as application.id. In light of this, we'll opt to check whether the RocksDBConfigSetter implements Configurable or not (and call configure() if so), rather than call getConfiguredInstance().
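The Configurable check the discussion lands on can be sketched as follows. The Configurable interface is redeclared locally so the sketch is self-contained (in KSQL it would be org.apache.kafka.common.Configurable), and the setter class is hypothetical:

```java
import java.util.Map;

public class ConfigurableCheckSketch {

    // Local stand-in for org.apache.kafka.common.Configurable.
    public interface Configurable {
        void configure(Map<String, ?> configs);
    }

    // Hypothetical setter that opts in to startup-time configuration.
    public static class BoundedMemorySetter implements Configurable {
        static long totalMemory;

        @Override
        public void configure(final Map<String, ?> configs) {
            totalMemory = Long.parseLong(
                String.valueOf(configs.get("ksql.plugins.rocksdb.total.memory")));
        }
    }

    // If the configured class implements Configurable, instantiate it once and configure it;
    // any other RocksDBConfigSetter is simply left alone.
    public static void maybeConfigure(final Class<?> clazz, final Map<String, ?> configs) {
        if (clazz == null || !Configurable.class.isAssignableFrom(clazz)) {
            return;
        }
        try {
            ((Configurable) clazz.getDeclaredConstructor().newInstance()).configure(configs);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("failed to configure class: " + clazz.getName(), e);
        }
    }
}
```

Note that configure() here runs once, on a throwaway instance created at startup; state it establishes for the instances Streams later creates still has to live somewhere shared.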

try {
method.invoke(null, ksqlConfig.originals());
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException("failed to configure class: " + clazz.getName(), e);
big-andy-coates:

Include the config name, i.e. StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG in the error.

* </p>
* See https://docs.confluent.io/5.3.0/streams/developer-guide/memory-mgmt.html#rocksdb.
*/
public class KsqlBoundedMemoryRocksDBConfig implements RocksDBConfigSetter {
big-andy-coates:

We should avoid the use of statics in this class if at all possible. Statics are evil.

vcrfxia (PR author):

We can't avoid the static Cache and WriteBufferManager since that's what's allowing us to bound memory across RocksDB instances. I will clean up the others that are only used within the configure() method since they don't need to be declared outside the method.
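The point about the unavoidable statics can be made concrete without rocksdbjni: the real code shares one Cache and one WriteBufferManager across every Options instance that Streams hands to the setter, and the only way separate instances see the same object is a static field. A stdlib sketch of that shape (names hypothetical, an AtomicLong standing in for the shared cache):

```java
import java.util.concurrent.atomic.AtomicLong;

public class SharedBudgetSketch {

    // Shared across all instances, like the static Cache/WriteBufferManager in the plugin.
    private static final AtomicLong remainingBytes = new AtomicLong();

    // Called once at startup, like configure().
    public static void configure(final long totalBytes) {
        remainingBytes.set(totalBytes);
    }

    // Each per-store instance draws from the same static pool, so the global
    // bound holds no matter how many instances Streams creates.
    public boolean tryReserve(final long bytes) {
        while (true) {
            final long current = remainingBytes.get();
            if (current < bytes) {
                return false; // would exceed the process-wide limit
            }
            if (remainingBytes.compareAndSet(current, current - bytes)) {
                return true;
            }
        }
    }
}
```

If the budget were an instance field instead, each RocksDB store would get its own limit and the process-wide bound would be lost.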

@big-andy-coates big-andy-coates requested review from a team, rodesai, hjafarpour, spena and agavra and removed request for a team August 6, 2019 12:15
vcrfxia (PR author) left a comment:

Thanks for the feedback @big-andy-coates !

I think it's good to have this in its own module. It makes each plugin module small and targeted.

Sure, can do.

You probably want to add documentation on the new config option.

Will do.

However, is it not the case that if you were to change the config to:

ksql.streams.rocksdb.config.setter=io.confluent.ksql.rocksdb.KsqlBoundedMemoryRocksDBConfig
ksql.streams.rocksdb.total.memory=100000000
ksql.streams.rocksdb.num.background.threads=2

Then the plugin's SetConfig call would be passed a map containing:

rocksdb.total.memory=100000000
rocksdb.num.background.threads=2

Which would give us what we need without the need for all this static stuff. Is that the case, or am I missing something?

Currently we validate that configs prefixed with ksql.streams are configs recognized by one of StreamsConfig, ConsumerConfig, or ProducerConfig, which is not the case for the new custom configs added in this PR. Unless we remove this check and pass all configs prefixed with ksql.streams to the Streams app, ksql.streams.rocksdb.total.memory and ksql.streams.rocksdb.num.background.threads will not make it through to setConfig() as you've suggested. In light of this, do you prefer removing the check and passing all configs prefixed with ksql.streams through, or sticking with the static configure() implementation I've currently got? A third option (mentioned in the PR description above) is to introduce a new prefix such as ksql.plugins and pass freeform configs with this new prefix through to the Streams app, but I've got a slight preference for the static configure() method.







private static final String N_BACKGROUND_THREADS_CONFIG =
CONFIG_PREFIX + "num.background.threads";

private static final double INDEX_FILTER_BLOCK_RATIO = 0.1;
vcrfxia (PR author):

For some but not all.

  • INDEX_FILTER_BLOCK_RATIO defaults to 0.0 (in RocksDB, and Kafka Streams uses this default).

  • The default RocksDBConfigSetter used by Streams sets 3 for N_MEMTABLES, though @ableegoldman 's performance testing suggests that 6 tends to yield better results.

  • BLOCK_SIZE of 4096L aligns with the value set by Streams's default RocksDBConfigSetter.


private static final String N_BACKGROUND_THREADS_CONFIG =
CONFIG_PREFIX + "num.background.threads";

private static final double INDEX_FILTER_BLOCK_RATIO = 0.1;
Reviewer:

Where does the .1 ratio come from?

totalOffHeapMemory = Long.parseLong((String)config.get(TOTAL_OFF_HEAP_MEMORY_CONFIG));

memtableSize = totalOffHeapMemory / 2 / N_MEMTABLES;
totalMemtableMemory = N_MEMTABLES * memtableSize;
rodesai:

Where is this math coming from? This is going to create massive memtables on configs with more memory. Let's keep the memtable size the same as streams (16MB), and just set a cap on total memtable memory. What you have here (totalOffHeapMemory / 2) seems fine. Whatever isn't used should just get used for the read cache.

vcrfxia (PR author):

Made the change. Had to add the line

options.setMaxWriteBufferNumber(Integer.MAX_VALUE);

since Kafka Streams limits the number of memtables to 3 by default: https://github.com/apache/kafka/blob/e9a35fe02effe10dcbec2daed4ef66c40d35716c/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L139

rodesai (Contributor), Aug 15, 2019:

Is this because we don't know how many write buffers we'll wind up needing (since each rocks needs at least 2)?

vcrfxia (PR author):

I made the change to reconcile the fact that we have a variable total write buffer memory (passed as the first argument when creating the WriteBufferManager), yet Streams by default sets write buffer size to 16MB and max write buffer number to 3. Unless I've misunderstood, without either increasing write buffer size or write buffer number from the Streams default, we'll never use more than 48MB for the write buffers, despite our desire for that limit to be configurable.

Reviewer:

Just want to chime in quickly here, the memtable size (and max number) is still per rocks instance. So each instance could in theory use up to 48MB, but if we limit the total write buffer memory the WriteBufferManager will flush some of these to make sure the total across instances stays within the bounds you set.
Also, write buffer size is actually a minimum size to be a candidate for flushing, not a cap. But in most cases it would likely end up being around this size before being made immutable and/or flushing.
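The arithmetic behind the two sizing schemes debated in this thread, using the 100 MB example from the PR description (pure arithmetic; the constants mirror the Streams defaults quoted above):

```java
public class MemtableSizing {

    static final int N_MEMTABLES = 3;                     // Streams' default max write buffer number
    static final long STREAMS_MEMTABLE_BYTES = 16L << 20; // Streams' 16 MiB default write buffer size

    // Original scheme: derive the per-memtable size from the total budget.
    static long derivedMemtableSize(final long totalOffHeapMemory) {
        return totalOffHeapMemory / 2 / N_MEMTABLES;
    }

    // Revised scheme: keep the 16 MiB default per memtable and only cap the total
    // memtable memory; whatever is unused goes to the read cache.
    static long totalMemtableCap(final long totalOffHeapMemory) {
        return totalOffHeapMemory / 2;
    }

    public static void main(final String[] args) {
        final long total = 100_000_000L;
        System.out.println(derivedMemtableSize(total)); // 16666666 — balloons as the budget grows
        System.out.println(totalMemtableCap(total));    // 50000000
    }
}
```

The derived scheme is what rodesai objected to: with a multi-GB budget each memtable becomes enormous, whereas the cap-only scheme keeps memtables at the Streams default and lets the WriteBufferManager enforce the total.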


tableConfig.setCacheIndexAndFilterBlocksWithHighPriority(true);
tableConfig.setPinTopLevelIndexAndFilter(true);
tableConfig.setBlockSize(BLOCK_SIZE);
Reviewer:

You don't need to set this. The streams code should set this for you. This class should just be overrides.

big-andy-coates (Contributor) commented Aug 8, 2019:

In light of this, do you prefer removing the check and passing all configs prefixed with ksql.streams through, or sticking with the static configure()?

Rather than remove the check, I'd just add another variant that's allowed through. If we have:

ksql.streams.plugin.rocksdb.config.setter=io.confluent.ksql.rocksdb.KsqlBoundedMemoryRocksDBConfig
ksql.streams.plugin.rocksdb.total.memory=100000000
ksql.streams.plugin.rocksdb.num.background.threads=2

Then the check could allow through anything starting with ksql.streams.plugin.
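That allow-through rule is essentially a prefix filter. A sketch (the prefix strings follow the example above, and the stripping mirrors how the ksql.streams. prefix is removed before configs reach the streams app; the method name is hypothetical):

```java
import java.util.HashMap;
import java.util.Map;

public class PluginPrefixFilter {

    static final String STREAMS_PREFIX = "ksql.streams.";
    static final String PLUGIN_PREFIX = STREAMS_PREFIX + "plugin.";

    // Allow through anything under ksql.streams.plugin., stripped of the
    // ksql.streams. prefix that the streams app never sees.
    static Map<String, Object> allowedPluginConfigs(final Map<String, ?> props) {
        final Map<String, Object> out = new HashMap<>();
        for (final Map.Entry<String, ?> entry : props.entrySet()) {
            if (entry.getKey().startsWith(PLUGIN_PREFIX)) {
                out.put(entry.getKey().substring(STREAMS_PREFIX.length()), entry.getValue());
            }
        }
        return out;
    }
}
```

Under this scheme, ksql.streams.plugin.rocksdb.total.memory would reach setConfig() as plugin.rocksdb.total.memory.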

big-andy-coates (Contributor):

Regarding the statics etc...

The standard way of instantiating configured instances of classes defined in configs is using config.getConfiguredInstance("plugin.class", RocksDBSetter.class). If the implementation implements Configurable (careful, there are two versions!), then its configure method is invoked with the config.

Unfortunately, streams does not use getConfiguredInstance to create the setter. Not sure why not. A longer-term fix might be to enhance streams to use getConfiguredInstance. This would mean configure would be called on each instantiation, and the implementation can choose whether it wants to initialize a load of statics on first use, or have stuff that reacts to config changes, or whatever.

Until streams supports this, or even once it does, you could instantiate the setter once on startup as you currently are, but using streamsConfig.getConfiguredInstance(....)

This removes the statics from the interface and leaves it up to the implementer if they want/need statics, or to allow the setter to change behaviour of things on each invocation.

@confluentinc confluentinc deleted a comment from vcrfxia Aug 8, 2019
vcrfxia (PR author) left a comment:

Thanks for the feedback @big-andy-coates and @rodesai ! Made the following changes:

  • switched to using AbstractConfig and ConfigDef for defining configs

  • switched from checking for a static configure() method to checking whether the RocksDBConfigSetter implements Configurable

  • removed the ksql-plugins wrapper module around the new ksql-rocksdb-config-setter module

  • updated logic around memtable sizing

  • cleaned up statics, including making one of the previously hard-coded values configurable

  • number of background threads config is no longer required, as it defaults to 1 if not supplied

I'll add docs on how to use the new plugin once we've settled on a final design, as I suspect it might still change.


private static final String N_BACKGROUND_THREADS_CONFIG =
CONFIG_PREFIX + "num.background.threads";

private static final double INDEX_FILTER_BLOCK_RATIO = 0.1;
vcrfxia (PR author):

Made INDEX_FILTER_BLOCK_RATIO block ratio configurable, and updated the logic around N_MEMTABLES according to @rodesai 's suggestion below.


(Class) streamsProps.get(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG);

if (clazz != null && org.apache.kafka.common.Configurable.class.isAssignableFrom(clazz)) {
((org.apache.kafka.common.Configurable) Utils.newInstance(clazz))
vcrfxia (PR author):

Utils.newInstance() seems to require that the configurable class be public, as KsqlRestApplicationTest#shouldConfigureRocksDBConfigSetter() fails if ConfigurableTestRocksDBConfigSetter is not public. Is this a reasonable requirement or have I gone awry with this approach?

If we stick with this approach I'll need to update the error handling around what happens if a non-accessible, configurable, RocksDBConfigSetter class is passed as the current failure is not user-friendly.

big-andy-coates (Contributor), Aug 23, 2019:

It's acceptable/normal to require a plugin to have a public no-args constructor. But we should ensure good error messages if it doesn't have one or if the constructor throws. Personally, I would wrap the newInstance call in a try/catch and throw a more specific error message that mentions the name of the config the class came from, e.g. throw new ConfigException(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, clazz, "Failed to instantiate", e)

vcrfxia (PR author):

Done, except ConfigException only has constructors such as

ConfigException(String name, Object value, String message)

and not

ConfigException(String name, Object value, String message, Throwable cause)

so I went with just ConfigException(String message, Throwable cause) instead.
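The instantiation guard discussed in this thread can be sketched with the stdlib alone; IllegalArgumentException stands in for Kafka's ConfigException, and the failing setter class is hypothetical:

```java
public class SafeInstantiation {

    // The config whose value names the class being instantiated.
    static final String SETTER_CONFIG = "rocksdb.config.setter";

    // Hypothetical plugin whose constructor fails, to show the wrapped error.
    public static class ThrowingSetter {
        public ThrowingSetter() {
            throw new IllegalStateException("boom");
        }
    }

    // Require a public no-args constructor and turn any reflective failure into
    // an error that names the offending config and class.
    static <T> T newInstance(final Class<T> clazz) {
        try {
            return clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException(
                "Invalid value " + clazz.getName() + " for configuration "
                    + SETTER_CONFIG + ": Failed to instantiate", e);
        }
    }
}
```

ReflectiveOperationException covers the missing-constructor, inaccessible-constructor, and constructor-threw cases in one catch, so the user always sees which config pointed at the bad class.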

agavra (Contributor) commented Aug 13, 2019:

Currently we validate that configs prefixed with ksql.streams are configs recognized by one of StreamsConfig, ConsumerConfig, or ProducerConfig, which is not the case for the new custom configs added in this PR. Unless we remove this check and pass all configs prefixed with ksql.streams to the Streams app, ksql.streams.rocksdb.total.memory and ksql.streams.rocksdb.num.background.threads will not make it through to setConfig() as you've suggested. In light of this, do you prefer removing the check and passing all configs prefixed with ksql.streams through, or sticking with the static configure() implementation I've currently got? A third option (mentioned in the PR description above) is to introduce a new prefix such as ksql.plugins and pass freeform configs with this new prefix through to the Streams app, but I've got a slight preference for the static configure() method.

After doing some thinking, I'm in the camp of loosening our checks - we can't predict the use cases for passing in configs. Why arbitrarily restrict it? This way, users will be able to build their own plugins for configuring RocksDB or anything else streams decides to configure with this pattern.

I imagine a concern would be around the cloud (or any "supported" environment) use case, where the providers want to restrict just to some certain configurations to ensure stuff doesn't break - I think for that we could have a plugin that determines which configs are valid or not (any class that implements ConfigResolver could be used). The default would allow everything.

rodesai (Contributor) left a comment:

LGTM

rodesai (Contributor) commented Aug 19, 2019:

After doing some thinking, I'm in the camp of loosening our checks - we can't predict the use cases for passing in configs. Why arbitrarily restrict it? This way, users will be able to build their own plugins for configuring RocksDB or anything else streams decides to configure with this pattern.

The original reason for forcing all the configs to be resolved was so that we could leverage ConfigDef's handling for obfuscating secrets, to ensure that we don't write secrets into the command topic. So if we go this route we need to be careful about this. One option would be to only write out the compatibility-breaking configs to the command topic.

big-andy-coates (Contributor) left a comment:

Thanks @vcrfxia LGTM with a few nits.

Funny thing is, now that I'm looking at the use of Configurable and the fact that configure won't be called on the actual instances used to configure RocksDB, I'm left wondering if the previous version that invoked a static method isn't actually clearer as an API. I think implementers might reasonably expect configure to be called on all instances, which it's not.

I see three options:

  1. Leave things as they are, but maybe expect Github issues about configure not being called.
  2. Switch back to the static configure method.
  3. Have configure called on each instance.

I'll leave it up to you, but personally, I'm a fan of the last one, though it's a little more work...

What we'd need to do is replace any user-supplied config for a RocksDB config setter with our own special wrapper. This wrapper would have a static field containing the config and the Class of the inner type. When it is instantiated, it would instantiate the inner setter and configure it. We configure the wrapper once on startup, and then Kafka Streams instantiates it as needed.

In this way, we do the heavy lifting of storing the config and ensuring each instance is 'configured'.

class RocksDbConfigSetterWrapper implements RocksDBConfigSetter {
   private static Map<String, ?> configs;
   private static Class<?> pluginClass;

   private final RocksDBConfigSetter delegate;

   public static KsqlConfig maybeConfigure(final KsqlConfig ksqlConfig) {
      final Map<String, Object> streamsProps = ksqlConfig.getKsqlStreamConfigProps();
      final Class<?> clazz =
          (Class<?>) streamsProps.get(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG);

      if (clazz == null || !org.apache.kafka.common.Configurable.class.isAssignableFrom(clazz)) {
         // Return config unchanged:
         return ksqlConfig;
      }

      configs = ImmutableMap.copyOf(streamsProps);
      pluginClass = clazz;

      return newConfigWith(
         StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
         RocksDbConfigSetterWrapper.class
      );
   }

   public RocksDbConfigSetterWrapper() {
      this.delegate = (RocksDBConfigSetter) Utils.newInstance(pluginClass);
      ((org.apache.kafka.common.Configurable) delegate).configure(configs);
   }

   // RocksDBConfigSetter methods just call through to delegate
}

Then in app:

private static KsqlConfig maybeConfigureRocksDBConfigSetter(final KsqlConfig ksqlConfig) {
   // Returns an adjusted KsqlConfig where StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG
   // has been set to RocksDbConfigSetterWrapper:
   return RocksDbConfigSetterWrapper.maybeConfigure(ksqlConfig);
}

I think this is the most flexible solution. Though of course that will mean our own memory-bounded impl would need to handle multiple configure calls - which it could do by initializing on the first call and storing the specific static config it cared about, then on each call just checking that the config hasn't changed. If it has, that's an error!
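The "configure may be called many times" guard described above can be sketched like this. Names here are illustrative, not the actual plugin's: first call wins, repeat calls with the same value are no-ops, and a changed value fails fast.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Sketch only - illustrative names, not the actual plugin. The first
 * configure call records the memory limit; later calls must carry the
 * same limit or we fail fast, since the shared RocksDB cache cannot be
 * resized per instance.
 */
public class IdempotentConfigGuard {
    private static final AtomicReference<Long> totalMemory = new AtomicReference<>();

    public static void configure(final Map<String, ?> config) {
        final Long limit = Objects.requireNonNull(
            (Long) config.get("ksql.plugins.rocksdb.total.memory"),
            "memory limit must be set");
        // compareAndSet wins only on the very first call; afterwards we
        // just verify the supplied limit matches the recorded one.
        if (!totalMemory.compareAndSet(null, limit)
                && !Objects.equals(totalMemory.get(), limit)) {
            throw new IllegalStateException(
                "RocksDB memory limit already configured to " + totalMemory.get()
                    + "; cannot change it to " + limit);
        }
    }
}
```

Repeat calls with an identical config succeed silently, which is exactly what the wrapper approach needs when Kafka Streams instantiates the setter once per state store.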


private static org.rocksdb.Cache cache;
private static org.rocksdb.WriteBufferManager writeBufferManager;
private static final AtomicBoolean configured = new AtomicBoolean(false);

Rather than having a flag, which not just check if cache is already set and have reset clear both cache and writeBufferManager?

For my own edification, why is that preferable? I thought having the flag was more readable. (I don't feel strongly though, happy to make the switch.)

@big-andy-coates big-andy-coates Sep 6, 2019


Less code ;)

Though the flag is more thread-safe...

It's not a big thing - just a thought.

Thinking on this a little more, if it were me, I'd have a Statics inner class that held the two fields and then use a single AtomicReference<Statics> to store both...
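The AtomicReference<Statics> idea above can be sketched as follows. The RocksDB types are stood in by plain Objects here so the sketch is self-contained; the real fields would be org.rocksdb.Cache and org.rocksdb.WriteBufferManager.

```java
import java.util.concurrent.atomic.AtomicReference;

/**
 * Sketch of bundling the shared handles into one inner class and
 * publishing them atomically: "configured" is simply "the reference is
 * non-null", and reset clears both handles together.
 */
public class SharedRocksDbState {
    private static final class Statics {
        final Object cache;              // stand-in for org.rocksdb.Cache
        final Object writeBufferManager; // stand-in for org.rocksdb.WriteBufferManager

        Statics(final Object cache, final Object writeBufferManager) {
            this.cache = cache;
            this.writeBufferManager = writeBufferManager;
        }
    }

    private static final AtomicReference<Statics> STATICS = new AtomicReference<>();

    static boolean tryConfigure(final Object cache, final Object writeBufferManager) {
        // Atomically wins the race to configure; false means someone
        // already configured the shared state.
        return STATICS.compareAndSet(null, new Statics(cache, writeBufferManager));
    }

    static Object cache() {
        final Statics s = STATICS.get();
        return s == null ? null : s.cache;
    }

    static void reset() {
        STATICS.set(null);
    }
}
```

Compared with a separate AtomicBoolean flag plus two fields, a single reference means readers can never observe a half-configured state.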

@vcrfxia vcrfxia left a comment


Thanks for the review @big-andy-coates !

I like your suggestion of having a RocksDbConfigSetterWrapper to allow configure() to be called on each instance, except its implementation introduces some confusion to ksqlConfig in KsqlRestApplication. Specifically, RocksDbConfigSetterWrapper#maybeConfigure(...) needs to return a new KsqlConfig with the updated value for StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, and this new config should be used when starting queries. However, prior to when this happens in initialize(), the old ksqlConfig is used throughout buildApplication() in the constructors for components such as the StatementExecutor, KsqlResource, and so on, which makes keeping track of which KsqlConfig is updated and which isn't rather annoying.

We could avoid this confusion by calling RocksDbConfigSetterWrapper#maybeConfigure(...) at the beginning of buildApplication() (instead of initialize() where it is currently) but that doesn't feel like the right place for it. What do you think? Happy to go with the RocksDbConfigSetterWrapper approach if you think it makes sense to call RocksDbConfigSetterWrapper#maybeConfigure(...) in buildApplication(), otherwise I'll leave it as is to avoid confusion around ksqlConfig.

(Class) streamsProps.get(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG);

if (clazz != null && org.apache.kafka.common.Configurable.class.isAssignableFrom(clazz)) {
((org.apache.kafka.common.Configurable) Utils.newInstance(clazz))

Done, except ConfigException only has constructors such as

ConfigException(String name, Object value, String message)

and not

ConfigException(String name, Object value, String message, Throwable cause)

so I went with just ConfigException(String message, Throwable cause) instead.




vcrfxia commented Sep 5, 2019

Merging despite ongoing conversation as none of the discussions are blocking. (Will address anything else that comes up in follow-up PRs.)

@vcrfxia vcrfxia merged commit cdcaa2d into confluentinc:5.3.x Sep 5, 2019
@vcrfxia vcrfxia deleted the rocksdb-config branch September 5, 2019 20:56
@big-andy-coates big-andy-coates left a comment


Hey @vcrfxia

I've commented again on a few of my past comments. (See above).

> I like your suggestion of having a RocksDbConfigSetterWrapper to allow configure() to be called on each instance, except its implementation introduces some confusion to ksqlConfig in KsqlRestApplication. Specifically, RocksDbConfigSetterWrapper#maybeConfigure(...) needs to return a new KsqlConfig with the updated value for StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, and this new config should be used when starting queries. However, prior to when this happens in initialize(), the old ksqlConfig is used throughout buildApplication() in the constructors for components such as the StatementExecutor, KsqlResource, and so on, which makes keeping track of which KsqlConfig is updated and which isn't rather annoying.
>
> We could avoid this confusion by calling RocksDbConfigSetterWrapper#maybeConfigure(...) at the beginning of buildApplication() (instead of initialize() where it is currently) but that doesn't feel like the right place for it. What do you think? Happy to go with the RocksDbConfigSetterWrapper approach if you think it makes sense to call RocksDbConfigSetterWrapper#maybeConfigure(...) in buildApplication(), otherwise I'll leave it as is to avoid confusion around ksqlConfig.

I'm not sure I understand the issue, so can't really comment. Feel free to grab me for a chat about it...
