Skip to content

Commit

Permalink
Scripting: split out compile limits and caching
Browse files Browse the repository at this point in the history
Phase 1 of adding compilation limits per context.
* Refactor compilation and caching into separate class, ContextCompiler
  which will be used per context.
* Disable compilation limit for tests.
* Add script.max_compilations_rate = "unlimited" setting which disables
  compilation limits.

Refs: elastic#50152
  • Loading branch information
stu-elastic committed Feb 26, 2020
1 parent fb64c18 commit d8af531
Show file tree
Hide file tree
Showing 7 changed files with 271 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1014,8 +1014,6 @@ private void createConfiguration() {
// Default the watermarks to absurdly low to prevent the tests from failing on nodes without enough disk space
defaultConfig.put("cluster.routing.allocation.disk.watermark.low", "1b");
defaultConfig.put("cluster.routing.allocation.disk.watermark.high", "1b");
// increase script compilation limit since tests can rapid-fire script compilations
defaultConfig.put("script.max_compilations_rate", "2048/1m");
if (getVersion().getMajor() >= 6) {
defaultConfig.put("cluster.routing.allocation.disk.watermark.flood_stage", "1b");
}
Expand Down
249 changes: 249 additions & 0 deletions server/src/main/java/org/elasticsearch/script/ContextCompiler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.script;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.cache.Cache;
import org.elasticsearch.common.cache.CacheBuilder;
import org.elasticsearch.common.cache.RemovalListener;
import org.elasticsearch.common.cache.RemovalNotification;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.unit.TimeValue;

import java.util.Map;
import java.util.Objects;

public class ContextCompiler {

private static final Logger logger = LogManager.getLogger(ScriptService.class);

private Cache<CacheKey, Object> cache;
private final ScriptMetrics scriptMetrics = new ScriptMetrics();

private final Object lock = new Object();

private Tuple<Integer, TimeValue> rate;
private long lastInlineCompileTime;
private double scriptsPerTimeWindow;
private double compilesAllowedPerNano;

// Cache settings
private int cacheSize;
private TimeValue cacheExpire;

public ContextCompiler(
int cacheMaxSize,
TimeValue cacheExpire,
Tuple<Integer, TimeValue> maxCompilationRate
) {
CacheBuilder<CacheKey, Object> cacheBuilder = CacheBuilder.builder();
if (cacheMaxSize >= 0) {
cacheBuilder.setMaximumWeight(cacheMaxSize);
}

if (cacheExpire.getNanos() != 0) {
cacheBuilder.setExpireAfterAccess(cacheExpire);
}

logger.debug("using script cache with max_size [{}], expire [{}]", cacheMaxSize, cacheExpire);
this.cache = cacheBuilder.removalListener(new ScriptCacheRemovalListener()).build();

this.lastInlineCompileTime = System.nanoTime();

this.cacheSize = cacheMaxSize;
this.cacheExpire = cacheExpire;
this.setMaxCompilationRate(maxCompilationRate);
}

private Cache<CacheKey,Object> buildCache() {
CacheBuilder<CacheKey, Object> cacheBuilder = CacheBuilder.builder();
if (cacheSize >= 0) {
cacheBuilder.setMaximumWeight(cacheSize);
}
if (cacheExpire.getNanos() != 0) {
cacheBuilder.setExpireAfterAccess(cacheExpire);
}
return cacheBuilder.removalListener(new ScriptCacheRemovalListener()).build();
}

<FactoryType> FactoryType compile(
ScriptContext<FactoryType> context,
ScriptEngine scriptEngine,
String id,
String idOrCode,
ScriptType type,
Map<String, String> options
) {
String lang = scriptEngine.getType();
CacheKey cacheKey = new CacheKey(lang, idOrCode, context.name, options);
Object compiledScript = cache.get(cacheKey);

if (compiledScript != null) {
return context.factoryClazz.cast(compiledScript);
}

// Synchronize so we don't compile scripts many times during multiple shards all compiling a script
synchronized (lock) {
// Retrieve it again in case it has been put by a different thread
compiledScript = cache.get(cacheKey);

if (compiledScript == null) {
try {
// Either an un-cached inline script or indexed script
// If the script type is inline the name will be the same as the code for identification in exceptions
// but give the script engine the chance to be better, give it separate name + source code
// for the inline case, then its anonymous: null.
if (logger.isTraceEnabled()) {
logger.trace("context [{}]: compiling script, type: [{}], lang: [{}], options: [{}]", context.name, type,
lang, options);
}
// Check whether too many compilations have happened
checkCompilationLimit();
compiledScript = scriptEngine.compile(id, idOrCode, context, options);
} catch (ScriptException good) {
// TODO: remove this try-catch completely, when all script engines have good exceptions!
throw good; // its already good
} catch (Exception exception) {
throw new GeneralScriptException("Failed to compile " + type + " script [" + id + "] using lang [" + lang + "]",
exception);
}

// Since the cache key is the script content itself we don't need to
// invalidate/check the cache if an indexed script changes.
scriptMetrics.onCompilation();
cache.put(cacheKey, compiledScript);
}

}

return context.factoryClazz.cast(compiledScript);
}

public ScriptStats stats() {
return scriptMetrics.stats();
}

/**
* Check whether there have been too many compilations within the last minute, throwing a circuit breaking exception if so.
* This is a variant of the token bucket algorithm: https://en.wikipedia.org/wiki/Token_bucket
*
* It can be thought of as a bucket with water, every time the bucket is checked, water is added proportional to the amount of time that
* elapsed since the last time it was checked. If there is enough water, some is removed and the request is allowed. If there is not
* enough water the request is denied. Just like a normal bucket, if water is added that overflows the bucket, the extra water/capacity
* is discarded - there can never be more water in the bucket than the size of the bucket.
*/
void checkCompilationLimit() {
if (rate.v1() == 0 && rate.v2().getNanos() == 0) {
// unlimited
return;
}

long now = System.nanoTime();
long timePassed = now - lastInlineCompileTime;
lastInlineCompileTime = now;

scriptsPerTimeWindow += (timePassed) * compilesAllowedPerNano;

// It's been over the time limit anyway, readjust the bucket to be level
if (scriptsPerTimeWindow > rate.v1()) {
scriptsPerTimeWindow = rate.v1();
}

// If there is enough tokens in the bucket, allow the request and decrease the tokens by 1
if (scriptsPerTimeWindow >= 1) {
scriptsPerTimeWindow -= 1.0;
} else {
scriptMetrics.onCompilationLimit();
// Otherwise reject the request
throw new CircuitBreakingException("[script] Too many dynamic script compilations within, max: [" +
rate.v1() + "/" + rate.v2() +"]; please use indexed, or scripts with parameters instead; " +
"this limit can be changed by the [script.max_compilations_rate] setting",
CircuitBreaker.Durability.TRANSIENT);
}
}

/**
* This configures the maximum script compilations per five minute window.
*
* @param newRate the new expected maximum number of compilations per five minute window
*/
void setMaxCompilationRate(Tuple<Integer, TimeValue> newRate) {
synchronized (lock) {
this.rate = newRate;
// Reset the counter to allow new compilations
this.scriptsPerTimeWindow = rate.v1();
this.compilesAllowedPerNano = ((double) rate.v1()) / newRate.v2().nanos();

this.cache = buildCache();
}
}

/**
* A small listener for the script cache that calls each
* {@code ScriptEngine}'s {@code scriptRemoved} method when the
* script has been removed from the cache
*/
private class ScriptCacheRemovalListener implements RemovalListener<CacheKey, Object> {
@Override
public void onRemoval(RemovalNotification<CacheKey, Object> notification) {
if (logger.isDebugEnabled()) {
logger.debug(
"removed [{}] from cache, reason: [{}]",
notification.getValue(),
notification.getRemovalReason()
);
}
scriptMetrics.onCacheEviction();
}
}

private static final class CacheKey {
final String lang;
final String idOrCode;
final String context;
final Map<String, String> options;

private CacheKey(String lang, String idOrCode, String context, Map<String, String> options) {
this.lang = lang;
this.idOrCode = idOrCode;
this.context = context;
this.options = options;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CacheKey cacheKey = (CacheKey) o;
return Objects.equals(lang, cacheKey.lang) &&
Objects.equals(idOrCode, cacheKey.idOrCode) &&
Objects.equals(context, cacheKey.context) &&
Objects.equals(options, cacheKey.options);
}

@Override
public int hashCode() {
return Objects.hash(lang, idOrCode, context, options);
}
}
}
18 changes: 17 additions & 1 deletion server/src/main/java/org/elasticsearch/script/ScriptContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

package org.elasticsearch.script;

import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.unit.TimeValue;

import java.lang.reflect.Method;

/**
Expand Down Expand Up @@ -68,8 +71,13 @@ public final class ScriptContext<FactoryType> {
/** A class that is an instance of a script. */
public final Class<?> instanceClazz;

public final int cacheSizeDefault;
public final TimeValue cacheExpireDefault;
public final Tuple<Integer, TimeValue> maxCompilationRateDefault;

/** Construct a context with the related instance and compiled classes. */
public ScriptContext(String name, Class<FactoryType> factoryClazz) {
public ScriptContext(String name, Class<FactoryType> factoryClazz, int cacheSizeDefault, TimeValue cacheExpireDefault,
Tuple<Integer, TimeValue> maxCompilationRateDefault) {
this.name = name;
this.factoryClazz = factoryClazz;
Method newInstanceMethod = findMethod("FactoryType", factoryClazz, "newInstance");
Expand All @@ -90,6 +98,14 @@ public ScriptContext(String name, Class<FactoryType> factoryClazz) {
+ factoryClazz.getName() + "] for script context [" + name + "]");
}
instanceClazz = newInstanceMethod.getReturnType();

this.cacheSizeDefault = cacheSizeDefault;
this.cacheExpireDefault = cacheExpireDefault;
this.maxCompilationRateDefault = maxCompilationRateDefault;
}

public ScriptContext(String name, Class<FactoryType> factoryClazz) {
this(name, factoryClazz, 100, TimeValue.timeValueMillis(0), new Tuple<>(75, TimeValue.timeValueMinutes(5)));
}

/** Returns a method with the given name, or throws an exception if multiple are found. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ public class ScriptService implements Closeable, ClusterStateApplier {
// this allows you to easily define rates
static final Function<String, Tuple<Integer, TimeValue>> MAX_COMPILATION_RATE_FUNCTION =
(String value) -> {
if (value.toLowerCase() == "unlimited") {
return Tuple.tuple(0, TimeValue.ZERO);
}
if (value.contains("/") == false || value.startsWith("/") || value.endsWith("/")) {
throw new IllegalArgumentException("parameter must contain a positive integer and a timevalue, i.e. 10/1m, but was [" +
value + "]");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.elasticsearch.node.MockNode;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeValidationException;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.test.InternalTestCluster;
Expand All @@ -49,8 +48,8 @@

import java.nio.file.Path;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.Collections;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
Expand All @@ -69,7 +68,6 @@ private Node startNode() throws NodeValidationException {
.put(Environment.PATH_REPO_SETTING.getKey(), tempDir.resolve("repo"))
.put(Environment.PATH_SHARED_DATA_SETTING.getKey(), createTempDir().getParent())
.put(Node.NODE_NAME_SETTING.getKey(), nodeName)
.put(ScriptService.SCRIPT_MAX_COMPILATIONS_RATE.getKey(), "1000/1m")
.put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), 1) // limit the number of threads created
.put("transport.type", getTestTransportType())
.put(Node.NODE_DATA_SETTING.getKey(), true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ StoredScriptSource getScriptFromClusterState(String id) {
public void testMaxCompilationRateSetting() throws Exception {
assertThat(MAX_COMPILATION_RATE_FUNCTION.apply("10/1m"), is(Tuple.tuple(10, TimeValue.timeValueMinutes(1))));
assertThat(MAX_COMPILATION_RATE_FUNCTION.apply("10/60s"), is(Tuple.tuple(10, TimeValue.timeValueMinutes(1))));
assertThat(MAX_COMPILATION_RATE_FUNCTION.apply("unlimited"), is(Tuple.tuple(0, TimeValue.ZERO)));
assertException("10/m", IllegalArgumentException.class, "failed to parse [m]");
assertException("6/1.6m", IllegalArgumentException.class, "failed to parse [1.6m], fractional time values are not supported");
assertException("foo/bar", IllegalArgumentException.class, "could not parse [foo] as integer in value [foo/bar]");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
import org.elasticsearch.license.LicensesMetaData;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.snapshots.RestoreInfo;
import org.elasticsearch.snapshots.RestoreService;
Expand Down Expand Up @@ -215,7 +214,6 @@ private NodeConfigurationSource createNodeConfigurationSource(final String leade
builder.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "1b");
builder.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "1b");
builder.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "1b");
builder.put(ScriptService.SCRIPT_MAX_COMPILATIONS_RATE.getKey(), "2048/1m");
// wait short time for other active shards before actually deleting, default 30s not needed in tests
builder.put(IndicesStore.INDICES_STORE_DELETE_SHARD_TIMEOUT.getKey(), new TimeValue(1, TimeUnit.SECONDS));
builder.putList(DISCOVERY_SEED_HOSTS_SETTING.getKey()); // empty list disables a port scan for other nodes
Expand Down

0 comments on commit d8af531

Please sign in to comment.