From d8af531dbe0266def31ee9b9ca508a2a9a48530e Mon Sep 17 00:00:00 2001 From: Stuart Tettemer Date: Tue, 18 Feb 2020 17:35:31 -0700 Subject: [PATCH] Scripting: split out compile limits and caching Phase 1 of adding compilation limits per context. * Refactor compilation and caching into separate class, ContextCompiler which will be used per context. * Disable compilation limit for tests. * Add script.max_compilations_rate = "unlimited" setting which disables compilation limits. Refs: #50152 --- .../testclusters/ElasticsearchNode.java | 2 - .../elasticsearch/script/ContextCompiler.java | 249 ++++++++++++++++++ .../elasticsearch/script/ScriptContext.java | 18 +- .../elasticsearch/script/ScriptService.java | 3 + .../indices/IndicesServiceCloseTests.java | 4 +- .../script/ScriptServiceTests.java | 1 + .../elasticsearch/xpack/CcrIntegTestCase.java | 2 - 7 files changed, 271 insertions(+), 8 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/script/ContextCompiler.java diff --git a/buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/ElasticsearchNode.java b/buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/ElasticsearchNode.java index 81dbf95fae145..9d5fd2be1db38 100644 --- a/buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/ElasticsearchNode.java +++ b/buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/ElasticsearchNode.java @@ -1014,8 +1014,6 @@ private void createConfiguration() { // Default the watermarks to absurdly low to prevent the tests from failing on nodes without enough disk space defaultConfig.put("cluster.routing.allocation.disk.watermark.low", "1b"); defaultConfig.put("cluster.routing.allocation.disk.watermark.high", "1b"); - // increase script compilation limit since tests can rapid-fire script compilations - defaultConfig.put("script.max_compilations_rate", "2048/1m"); if (getVersion().getMajor() >= 6) { defaultConfig.put("cluster.routing.allocation.disk.watermark.flood_stage", "1b"); } diff --git a/server/src/main/java/org/elasticsearch/script/ContextCompiler.java b/server/src/main/java/org/elasticsearch/script/ContextCompiler.java new file mode 100644 index 0000000000000..0e349823169cd --- /dev/null +++ b/server/src/main/java/org/elasticsearch/script/ContextCompiler.java @@ -0,0 +1,249 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.script; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.breaker.CircuitBreakingException; +import org.elasticsearch.common.cache.Cache; +import org.elasticsearch.common.cache.CacheBuilder; +import org.elasticsearch.common.cache.RemovalListener; +import org.elasticsearch.common.cache.RemovalNotification; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.unit.TimeValue; + +import java.util.Map; +import java.util.Objects; + +public class ContextCompiler { + + private static final Logger logger = LogManager.getLogger(ScriptService.class); + + private Cache cache; + private final ScriptMetrics scriptMetrics = new ScriptMetrics(); + + private final Object lock = new Object(); + + private Tuple rate; + private long lastInlineCompileTime; + private double scriptsPerTimeWindow; + private double compilesAllowedPerNano; + + // Cache settings + private int cacheSize; + private TimeValue cacheExpire; + + public ContextCompiler( + int cacheMaxSize, + TimeValue cacheExpire, + Tuple maxCompilationRate + ) { + CacheBuilder cacheBuilder = CacheBuilder.builder(); + if (cacheMaxSize >= 0) { + cacheBuilder.setMaximumWeight(cacheMaxSize); + } + + if (cacheExpire.getNanos() != 0) { + cacheBuilder.setExpireAfterAccess(cacheExpire); + } + + logger.debug("using script cache with max_size [{}], expire [{}]", cacheMaxSize, cacheExpire); + this.cache = cacheBuilder.removalListener(new ScriptCacheRemovalListener()).build(); + + this.lastInlineCompileTime = System.nanoTime(); + + this.cacheSize = cacheMaxSize; + this.cacheExpire = cacheExpire; + this.setMaxCompilationRate(maxCompilationRate); + } + + private Cache buildCache() { + CacheBuilder cacheBuilder = CacheBuilder.builder(); + if (cacheSize >= 0) { + cacheBuilder.setMaximumWeight(cacheSize); + } + if (cacheExpire.getNanos() != 0) { + cacheBuilder.setExpireAfterAccess(cacheExpire); + } + return cacheBuilder.removalListener(new ScriptCacheRemovalListener()).build(); + } + + FactoryType compile( + ScriptContext context, + ScriptEngine scriptEngine, + String id, + String idOrCode, + ScriptType type, + Map options + ) { + String lang = scriptEngine.getType(); + CacheKey cacheKey = new CacheKey(lang, idOrCode, context.name, options); + Object compiledScript = cache.get(cacheKey); + + if (compiledScript != null) { + return context.factoryClazz.cast(compiledScript); + } + + // Synchronize so we don't compile scripts many times during multiple shards all compiling a script + synchronized (lock) { + // Retrieve it again in case it has been put by a different thread + compiledScript = cache.get(cacheKey); + + if (compiledScript == null) { + try { + // Either an un-cached inline script or indexed script + // If the script type is inline the name will be the same as the code for identification in exceptions + // but give the script engine the chance to be better, give it separate name + source code + // for the inline case, then its anonymous: null. + if (logger.isTraceEnabled()) { + logger.trace("context [{}]: compiling script, type: [{}], lang: [{}], options: [{}]", context.name, type, + lang, options); + } + // Check whether too many compilations have happened + checkCompilationLimit(); + compiledScript = scriptEngine.compile(id, idOrCode, context, options); + } catch (ScriptException good) { + // TODO: remove this try-catch completely, when all script engines have good exceptions! + throw good; // its already good + } catch (Exception exception) { + throw new GeneralScriptException("Failed to compile " + type + " script [" + id + "] using lang [" + lang + "]", + exception); + } + + // Since the cache key is the script content itself we don't need to + // invalidate/check the cache if an indexed script changes. + scriptMetrics.onCompilation(); + cache.put(cacheKey, compiledScript); + } + + } + + return context.factoryClazz.cast(compiledScript); + } + + public ScriptStats stats() { + return scriptMetrics.stats(); + } + + /** + * Check whether there have been too many compilations within the last minute, throwing a circuit breaking exception if so. + * This is a variant of the token bucket algorithm: https://en.wikipedia.org/wiki/Token_bucket + * + * It can be thought of as a bucket with water, every time the bucket is checked, water is added proportional to the amount of time that + * elapsed since the last time it was checked. If there is enough water, some is removed and the request is allowed. If there is not + * enough water the request is denied. Just like a normal bucket, if water is added that overflows the bucket, the extra water/capacity + * is discarded - there can never be more water in the bucket than the size of the bucket. + */ + void checkCompilationLimit() { + if (rate.v1() == 0 && rate.v2().getNanos() == 0) { + // unlimited + return; + } + + long now = System.nanoTime(); + long timePassed = now - lastInlineCompileTime; + lastInlineCompileTime = now; + + scriptsPerTimeWindow += (timePassed) * compilesAllowedPerNano; + + // It's been over the time limit anyway, readjust the bucket to be level + if (scriptsPerTimeWindow > rate.v1()) { + scriptsPerTimeWindow = rate.v1(); + } + + // If there is enough tokens in the bucket, allow the request and decrease the tokens by 1 + if (scriptsPerTimeWindow >= 1) { + scriptsPerTimeWindow -= 1.0; + } else { + scriptMetrics.onCompilationLimit(); + // Otherwise reject the request + throw new CircuitBreakingException("[script] Too many dynamic script compilations within, max: [" + + rate.v1() + "/" + rate.v2() +"]; please use indexed, or scripts with parameters instead; " + + "this limit can be changed by the [script.max_compilations_rate] setting", + CircuitBreaker.Durability.TRANSIENT); + } + } + + /** + * This configures the maximum script compilations per five minute window. + * + * @param newRate the new expected maximum number of compilations per five minute window + */ + void setMaxCompilationRate(Tuple newRate) { + synchronized (lock) { + this.rate = newRate; + // Reset the counter to allow new compilations + this.scriptsPerTimeWindow = rate.v1(); + this.compilesAllowedPerNano = ((double) rate.v1()) / newRate.v2().nanos(); + + this.cache = buildCache(); + } + } + + /** + * A small listener for the script cache that calls each + * {@code ScriptEngine}'s {@code scriptRemoved} method when the + * script has been removed from the cache + */ + private class ScriptCacheRemovalListener implements RemovalListener { + @Override + public void onRemoval(RemovalNotification notification) { + if (logger.isDebugEnabled()) { + logger.debug( + "removed [{}] from cache, reason: [{}]", + notification.getValue(), + notification.getRemovalReason() + ); + } + scriptMetrics.onCacheEviction(); + } + } + + private static final class CacheKey { + final String lang; + final String idOrCode; + final String context; + final Map options; + + private CacheKey(String lang, String idOrCode, String context, Map options) { + this.lang = lang; + this.idOrCode = idOrCode; + this.context = context; + this.options = options; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + CacheKey cacheKey = (CacheKey) o; + return Objects.equals(lang, cacheKey.lang) && + Objects.equals(idOrCode, cacheKey.idOrCode) && + Objects.equals(context, cacheKey.context) && + Objects.equals(options, cacheKey.options); + } + + @Override + public int hashCode() { + return Objects.hash(lang, idOrCode, context, options); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/script/ScriptContext.java b/server/src/main/java/org/elasticsearch/script/ScriptContext.java index 081a26d1e511a..338bcb3709060 100644 --- a/server/src/main/java/org/elasticsearch/script/ScriptContext.java +++ b/server/src/main/java/org/elasticsearch/script/ScriptContext.java @@ -19,6 +19,9 @@ package org.elasticsearch.script; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.unit.TimeValue; + import java.lang.reflect.Method; /** @@ -68,8 +71,13 @@ public final class ScriptContext { /** A class that is an instance of a script. */ public final Class instanceClazz; + public final int cacheSizeDefault; + public final TimeValue cacheExpireDefault; + public final Tuple maxCompilationRateDefault; + /** Construct a context with the related instance and compiled classes. */ - public ScriptContext(String name, Class factoryClazz) { + public ScriptContext(String name, Class factoryClazz, int cacheSizeDefault, TimeValue cacheExpireDefault, + Tuple maxCompilationRateDefault) { this.name = name; this.factoryClazz = factoryClazz; Method newInstanceMethod = findMethod("FactoryType", factoryClazz, "newInstance"); @@ -90,6 +98,14 @@ public ScriptContext(String name, Class factoryClazz) { + factoryClazz.getName() + "] for script context [" + name + "]"); } instanceClazz = newInstanceMethod.getReturnType(); + + this.cacheSizeDefault = cacheSizeDefault; + this.cacheExpireDefault = cacheExpireDefault; + this.maxCompilationRateDefault = maxCompilationRateDefault; + } + + public ScriptContext(String name, Class factoryClazz) { + this(name, factoryClazz, 100, TimeValue.timeValueMillis(0), new Tuple<>(75, TimeValue.timeValueMinutes(5))); } /** Returns a method with the given name, or throws an exception if multiple are found. */ diff --git a/server/src/main/java/org/elasticsearch/script/ScriptService.java b/server/src/main/java/org/elasticsearch/script/ScriptService.java index 7cf23d26187ad..402081ff0913a 100644 --- a/server/src/main/java/org/elasticsearch/script/ScriptService.java +++ b/server/src/main/java/org/elasticsearch/script/ScriptService.java @@ -65,6 +65,9 @@ public class ScriptService implements Closeable, ClusterStateApplier { // this allows you to easily define rates static final Function> MAX_COMPILATION_RATE_FUNCTION = (String value) -> { + if (value.toLowerCase() == "unlimited") { + return Tuple.tuple(0, TimeValue.ZERO); + } if (value.contains("/") == false || value.startsWith("/") || value.endsWith("/")) { throw new IllegalArgumentException("parameter must contain a positive integer and a timevalue, i.e. 10/1m, but was [" + value + "]"); diff --git a/server/src/test/java/org/elasticsearch/indices/IndicesServiceCloseTests.java b/server/src/test/java/org/elasticsearch/indices/IndicesServiceCloseTests.java index 96bb4c13cb8aa..88899a5d237ce 100644 --- a/server/src/test/java/org/elasticsearch/indices/IndicesServiceCloseTests.java +++ b/server/src/test/java/org/elasticsearch/indices/IndicesServiceCloseTests.java @@ -39,7 +39,6 @@ import org.elasticsearch.node.MockNode; import org.elasticsearch.node.Node; import org.elasticsearch.node.NodeValidationException; -import org.elasticsearch.script.ScriptService; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.InternalSettingsPlugin; import org.elasticsearch.test.InternalTestCluster; @@ -49,8 +48,8 @@ import java.nio.file.Path; import java.util.Arrays; -import java.util.concurrent.TimeUnit; import java.util.Collections; +import java.util.concurrent.TimeUnit; import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; @@ -69,7 +68,6 @@ private Node startNode() throws NodeValidationException { .put(Environment.PATH_REPO_SETTING.getKey(), tempDir.resolve("repo")) .put(Environment.PATH_SHARED_DATA_SETTING.getKey(), createTempDir().getParent()) .put(Node.NODE_NAME_SETTING.getKey(), nodeName) - .put(ScriptService.SCRIPT_MAX_COMPILATIONS_RATE.getKey(), "1000/1m") .put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), 1) // limit the number of threads created .put("transport.type", getTestTransportType()) .put(Node.NODE_DATA_SETTING.getKey(), true) diff --git a/server/src/test/java/org/elasticsearch/script/ScriptServiceTests.java b/server/src/test/java/org/elasticsearch/script/ScriptServiceTests.java index 814578a29f2f7..c1dfe43a99b21 100644 --- a/server/src/test/java/org/elasticsearch/script/ScriptServiceTests.java +++ b/server/src/test/java/org/elasticsearch/script/ScriptServiceTests.java @@ -99,6 +99,7 @@ StoredScriptSource getScriptFromClusterState(String id) { public void testMaxCompilationRateSetting() throws Exception { assertThat(MAX_COMPILATION_RATE_FUNCTION.apply("10/1m"), is(Tuple.tuple(10, TimeValue.timeValueMinutes(1)))); assertThat(MAX_COMPILATION_RATE_FUNCTION.apply("10/60s"), is(Tuple.tuple(10, TimeValue.timeValueMinutes(1)))); + assertThat(MAX_COMPILATION_RATE_FUNCTION.apply("unlimited"), is(Tuple.tuple(0, TimeValue.ZERO))); assertException("10/m", IllegalArgumentException.class, "failed to parse [m]"); assertException("6/1.6m", IllegalArgumentException.class, "failed to parse [1.6m], fractional time values are not supported"); assertException("foo/bar", IllegalArgumentException.class, "could not parse [foo] as integer in value [foo/bar]"); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java index 7181dd9b6c9a2..a76c2a72464e8 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java @@ -61,7 +61,6 @@ import org.elasticsearch.license.LicensesMetaData; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.snapshots.RestoreInfo; import org.elasticsearch.snapshots.RestoreService; @@ -215,7 +214,6 @@ private NodeConfigurationSource createNodeConfigurationSource(final String leade builder.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "1b"); builder.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "1b"); builder.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "1b"); - builder.put(ScriptService.SCRIPT_MAX_COMPILATIONS_RATE.getKey(), "2048/1m"); // wait short time for other active shards before actually deleting, default 30s not needed in tests builder.put(IndicesStore.INDICES_STORE_DELETE_SHARD_TIMEOUT.getKey(), new TimeValue(1, TimeUnit.SECONDS)); builder.putList(DISCOVERY_SEED_HOSTS_SETTING.getKey()); // empty list disables a port scan for other nodes