Skip to content

Commit

Permalink
Add upper limit for scroll expiry
Browse files Browse the repository at this point in the history
This change adds a dynamic cluster setting named `search.max_keep_alive`.
It is used as an upper limit for scroll expiry time in scroll queries and defaults to 1 hour.
This change also ensures that the existing setting `search.default_keep_alive` is always smaller than `search.max_keep_alive`.

Relates elastic#11511
  • Loading branch information
jimczi committed Aug 30, 2017
1 parent 432f162 commit 76a0bd9
Show file tree
Hide file tree
Showing 7 changed files with 243 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -212,21 +212,31 @@ synchronized void addSettingsUpdater(SettingUpdater<?> updater) {
}

/**
* Adds a settings consumer that accepts the values for two settings. The consumer if only notified if one or both settings change.
* Adds a settings consumer that accepts the values for two settings.
* See {@link #addSettingsUpdateConsumer(Setting, Setting, BiConsumer, BiConsumer)} for details.
*/
public synchronized <A, B> void addSettingsUpdateConsumer(Setting<A> a, Setting<B> b, BiConsumer<A, B> consumer) {
addSettingsUpdateConsumer(a, b, consumer, (i, j) -> {} );

}

/**
* Adds a settings consumer that accepts the values for two settings. The consumer is only notified if one or both settings change
* and if the provided validator succeeded.
* <p>
* Note: Only settings registered in {@link SettingsModule} can be changed dynamically.
* </p>
* This method registers a compound updater that is useful if two settings are depending on each other. The consumer is always provided
* with both values even if only one of the two changes.
* This method registers a compound updater that is useful if two settings are depending on each other.
* The consumer is always provided with both values even if only one of the two changes.
*/
public synchronized <A, B> void addSettingsUpdateConsumer(Setting<A> a, Setting<B> b, BiConsumer<A, B> consumer) {
public synchronized <A, B> void addSettingsUpdateConsumer(Setting<A> a, Setting<B> b, BiConsumer<A, B> consumer, BiConsumer<A, B> validator) {
if (a != get(a.getKey())) {
throw new IllegalArgumentException("Setting is not registered for key [" + a.getKey() + "]");
}
if (b != get(b.getKey())) {
throw new IllegalArgumentException("Setting is not registered for key [" + b.getKey() + "]");
}
addSettingsUpdater(Setting.compoundUpdater(consumer, a, b, logger));
addSettingsUpdater(Setting.compoundUpdater(consumer, validator, a, b, logger));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ public void apply(Settings value, Settings current, Settings previous) {
UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT,
SearchService.DEFAULT_KEEPALIVE_SETTING,
SearchService.KEEPALIVE_INTERVAL_SETTING,
SearchService.MAX_KEEPALIVE_SETTING,
SearchService.LOW_LEVEL_CANCELLATION_SETTING,
Node.WRITE_PORTS_FILE_SETTING,
Node.NODE_NAME_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ AbstractScopedSettings.SettingUpdater<T> newUpdater(Consumer<T> consumer, Logger
* See {@link AbstractScopedSettings#addSettingsUpdateConsumer(Setting, Setting, BiConsumer)} and its usage for details.
*/
static <A, B> AbstractScopedSettings.SettingUpdater<Tuple<A, B>> compoundUpdater(final BiConsumer<A, B> consumer,
final Setting<A> aSetting, final Setting<B> bSetting, Logger logger) {
final BiConsumer<A, B> validator, final Setting<A> aSetting, final Setting<B> bSetting, Logger logger) {
final AbstractScopedSettings.SettingUpdater<A> aSettingUpdater = aSetting.newUpdater(null, logger);
final AbstractScopedSettings.SettingUpdater<B> bSettingUpdater = bSetting.newUpdater(null, logger);
return new AbstractScopedSettings.SettingUpdater<Tuple<A, B>>() {
Expand All @@ -490,7 +490,10 @@ public boolean hasChanged(Settings current, Settings previous) {

@Override
public Tuple<A, B> getValue(Settings current, Settings previous) {
return new Tuple<>(aSettingUpdater.getValue(current, previous), bSettingUpdater.getValue(current, previous));
A valueA = aSettingUpdater.getValue(current, previous);
B valueB = bSettingUpdater.getValue(current, previous);
validator.accept(valueA, valueB);
return new Tuple<>(valueA, valueB);
}

@Override
Expand Down
45 changes: 40 additions & 5 deletions core/src/main/java/org/elasticsearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.MergeSchedulerConfig;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.query.InnerHitContextBuilder;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
Expand Down Expand Up @@ -82,6 +83,7 @@
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.profile.Profilers;
import org.elasticsearch.search.query.QueryPhase;
import org.elasticsearch.search.query.QueryPhaseExecutionException;
import org.elasticsearch.search.query.QuerySearchRequest;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.ScrollQuerySearchResult;
Expand All @@ -106,14 +108,17 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

import static org.elasticsearch.common.unit.TimeValue.timeValueHours;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
import static org.elasticsearch.common.unit.TimeValue.timeValueMinutes;

public class SearchService extends AbstractLifecycleComponent implements IndexEventListener {

// we can have 5 minutes here, since we make sure to clean with search requests and when shard/index closes
public static final Setting<TimeValue> DEFAULT_KEEPALIVE_SETTING =
Setting.positiveTimeSetting("search.default_keep_alive", timeValueMinutes(5), Property.NodeScope);
Setting.positiveTimeSetting("search.default_keep_alive", timeValueMinutes(5), Property.NodeScope, Property.Dynamic);
public static final Setting<TimeValue> MAX_KEEPALIVE_SETTING =
Setting.positiveTimeSetting("search.max_keep_alive", timeValueHours(1), Property.NodeScope, Property.Dynamic);
public static final Setting<TimeValue> KEEPALIVE_INTERVAL_SETTING =
Setting.positiveTimeSetting("search.keep_alive_interval", timeValueMinutes(1), Property.NodeScope);
/**
Expand Down Expand Up @@ -147,7 +152,9 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv

private final FetchPhase fetchPhase;

private final long defaultKeepAlive;
private volatile long defaultKeepAlive;

private volatile long maxKeepAlive;

private volatile TimeValue defaultSearchTimeout;

Expand All @@ -174,6 +181,10 @@ public SearchService(ClusterService clusterService, IndicesService indicesServic

TimeValue keepAliveInterval = KEEPALIVE_INTERVAL_SETTING.get(settings);
this.defaultKeepAlive = DEFAULT_KEEPALIVE_SETTING.get(settings).millis();
this.maxKeepAlive = MAX_KEEPALIVE_SETTING.get(settings).millis();

clusterService.getClusterSettings().addSettingsUpdateConsumer(DEFAULT_KEEPALIVE_SETTING, MAX_KEEPALIVE_SETTING,
this::setKeepAlives, this::validateKeepAlives);

this.keepAliveReaper = threadPool.scheduleWithFixedDelay(new Reaper(), keepAliveInterval, Names.SAME);

Expand All @@ -184,6 +195,20 @@ public SearchService(ClusterService clusterService, IndicesService indicesServic
clusterService.getClusterSettings().addSettingsUpdateConsumer(LOW_LEVEL_CANCELLATION_SETTING, this::setLowLevelCancellation);
}

private void validateKeepAlives(TimeValue defaultKeepAlive, TimeValue maxKeepAlive) {
if (defaultKeepAlive.millis() > maxKeepAlive.millis()) {
throw new IllegalArgumentException("Default keep alive setting for scroll [" + DEFAULT_KEEPALIVE_SETTING.getKey() + "]" +
" should be smaller than max keep alive [" + MAX_KEEPALIVE_SETTING.getKey() + "], " +
"was (" + defaultKeepAlive.format() + " > " + maxKeepAlive.format() + ")");
}
}

private void setKeepAlives(TimeValue defaultKeepAlive, TimeValue maxKeepAlive) {
validateKeepAlives(defaultKeepAlive, maxKeepAlive);
this.maxKeepAlive = maxKeepAlive.millis();
this.defaultKeepAlive = defaultKeepAlive.millis();
}

private void setDefaultSearchTimeout(TimeValue defaultSearchTimeout) {
this.defaultSearchTimeout = defaultSearchTimeout;
}
Expand Down Expand Up @@ -547,7 +572,7 @@ final SearchContext createContext(ShardSearchRequest request, @Nullable Engine.S
if (request.scroll() != null && request.scroll().keepAlive() != null) {
keepAlive = request.scroll().keepAlive().millis();
}
context.keepAlive(keepAlive);
contextScrollKeepAlive(context, keepAlive);
context.lowLevelCancellation(lowLevelCancellation);
} catch (Exception e) {
context.close();
Expand Down Expand Up @@ -625,6 +650,16 @@ public void freeAllScrollContexts() {
}
}

private void contextScrollKeepAlive(SearchContext context, long keepAlive) throws IOException {
if (keepAlive > maxKeepAlive) {
throw new QueryPhaseExecutionException(context,
"Keep alive for scroll (" + TimeValue.timeValueMillis(keepAlive).format() + ") is too large. " +
"It must be less than (" + TimeValue.timeValueMillis(maxKeepAlive).format() + "). " +
"This limit can be set by changing the [" + MAX_KEEPALIVE_SETTING.getKey() + "] cluster level setting.");
}
context.keepAlive(keepAlive);
}

private void contextProcessing(SearchContext context) {
// disable timeout while executing a search
context.accessed(-1);
Expand Down Expand Up @@ -847,13 +882,13 @@ private void shortcutDocIdsToLoad(SearchContext context) {
context.docIdsToLoad(docIdsToLoad, 0, docIdsToLoad.length);
}

private void processScroll(InternalScrollSearchRequest request, SearchContext context) {
private void processScroll(InternalScrollSearchRequest request, SearchContext context) throws IOException {
// process scroll
context.from(context.from() + context.size());
context.scrollContext().scroll = request.scroll();
// update the context keep alive based on the new scroll value
if (request.scroll() != null && request.scroll().keepAlive() != null) {
context.keepAlive(request.scroll().keepAlive().millis());
contextScrollKeepAlive(context, request.scroll().keepAlive().millis());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,14 +359,20 @@ public void set(Integer a, Integer b) {
this.a = a;
this.b = b;
}

public void validate(Integer a, Integer b) {
if (Integer.signum(a) != Integer.signum(b)) {
throw new IllegalArgumentException("boom");
}
}
}


public void testComposite() {
Composite c = new Composite();
Setting<Integer> a = Setting.intSetting("foo.int.bar.a", 1, Property.Dynamic, Property.NodeScope);
Setting<Integer> b = Setting.intSetting("foo.int.bar.b", 1, Property.Dynamic, Property.NodeScope);
ClusterSettings.SettingUpdater<Tuple<Integer, Integer>> settingUpdater = Setting.compoundUpdater(c::set, a, b, logger);
ClusterSettings.SettingUpdater<Tuple<Integer, Integer>> settingUpdater = Setting.compoundUpdater(c::set, c::validate, a, b, logger);
assertFalse(settingUpdater.apply(Settings.EMPTY, Settings.EMPTY));
assertNull(c.a);
assertNull(c.b);
Expand All @@ -392,6 +398,40 @@ public void testComposite() {

}

public void testCompositeValidator() {
Composite c = new Composite();
Setting<Integer> a = Setting.intSetting("foo.int.bar.a", 1, Property.Dynamic, Property.NodeScope);
Setting<Integer> b = Setting.intSetting("foo.int.bar.b", 1, Property.Dynamic, Property.NodeScope);
ClusterSettings.SettingUpdater<Tuple<Integer, Integer>> settingUpdater = Setting.compoundUpdater(c::set, c::validate, a, b, logger);
assertFalse(settingUpdater.apply(Settings.EMPTY, Settings.EMPTY));
assertNull(c.a);
assertNull(c.b);

Settings build = Settings.builder().put("foo.int.bar.a", 2).build();
assertTrue(settingUpdater.apply(build, Settings.EMPTY));
assertEquals(2, c.a.intValue());
assertEquals(1, c.b.intValue());

Integer aValue = c.a;
assertFalse(settingUpdater.apply(build, build));
assertSame(aValue, c.a);
Settings previous = build;
build = Settings.builder().put("foo.int.bar.a", 2).put("foo.int.bar.b", 5).build();
assertTrue(settingUpdater.apply(build, previous));
assertEquals(2, c.a.intValue());
assertEquals(5, c.b.intValue());

Settings invalid = Settings.builder().put("foo.int.bar.a", -2).put("foo.int.bar.b", 5).build();
IllegalArgumentException exc = expectThrows(IllegalArgumentException.class, () -> settingUpdater.apply(invalid, previous));
assertThat(exc.getMessage(), equalTo("boom"));

// reset to default
assertTrue(settingUpdater.apply(Settings.EMPTY, build));
assertEquals(1, c.a.intValue());
assertEquals(1, c.b.intValue());

}

public void testListSettings() {
Setting<List<String>> listSetting = Setting.listSetting("foo.bar", Arrays.asList("foo,bar"), (s) -> s.toString(),
Property.Dynamic, Property.NodeScope);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.elasticsearch.search.scroll;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
Expand All @@ -35,10 +37,12 @@
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.query.QueryPhaseExecutionException;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
import org.junit.After;

import java.io.IOException;
import java.util.Map;
Expand All @@ -54,6 +58,7 @@
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchHits;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
Expand All @@ -63,6 +68,13 @@
* Tests for scrolling.
*/
public class SearchScrollIT extends ESIntegTestCase {
@After
public void cleanup() throws Exception {
assertAcked(client().admin().cluster().prepareUpdateSettings()
.setPersistentSettings(Settings.builder().putNull("*"))
.setTransientSettings(Settings.builder().putNull("*")));
}

public void testSimpleScrollQueryThenFetch() throws Exception {
client().admin().indices().prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 3)).execute().actionGet();
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet();
Expand Down Expand Up @@ -518,6 +530,70 @@ public void testCloseAndReopenOrDeleteWithActiveScroll() throws IOException {
}
}

public void testScrollInvalidDefaultKeepAlive() throws IOException {
IllegalArgumentException exc = expectThrows(IllegalArgumentException.class, () ->
client().admin().cluster().prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put("search.max_keep_alive", "1m", "search.default_keep_alive", "2m")).get());
assertThat(exc.getMessage(), containsString("was (2 minutes > 1 minute)"));

assertAcked(client().admin().cluster().prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put("search.default_keep_alive", "2m")).get());

assertAcked(client().admin().cluster().prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put("search.max_keep_alive", "2m")).get());


exc = expectThrows(IllegalArgumentException.class, () -> client().admin().cluster().prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put("search.default_keep_alive", "3m")).get());
assertThat(exc.getMessage(), containsString("was (3 minutes > 2 minutes)"));

assertAcked(client().admin().cluster().prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put("search.default_keep_alive", "1m")).get());

exc = expectThrows(IllegalArgumentException.class, () -> client().admin().cluster().prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put("search.max_keep_alive", "30s")).get());
assertThat(exc.getMessage(), containsString("was (1 minute > 30 seconds)"));
}

public void testInvalidScrollKeepAlive() throws IOException {
createIndex("test");
for (int i = 0; i < 10; i++) {
client().prepareIndex("test", "type1",
Integer.toString(i)).setSource(jsonBuilder().startObject().field("field", i).endObject()).execute().actionGet();
}
refresh();

Exception exc = expectThrows(Exception.class,
() -> client().prepareSearch()
.setQuery(matchAllQuery())
.setSize(5)
.setScroll(TimeValue.timeValueHours(2))
.addSort("field", SortOrder.ASC)
.execute().actionGet());
QueryPhaseExecutionException queryPhaseExecutionException =
(QueryPhaseExecutionException) ExceptionsHelper.unwrap(exc, QueryPhaseExecutionException.class);
assertNotNull(queryPhaseExecutionException);
assertThat(queryPhaseExecutionException.getMessage(), containsString("Keep alive for scroll (2 hours) is too large"));

SearchResponse searchResponse = client().prepareSearch()
.setQuery(matchAllQuery())
.setSize(5)
.setScroll(TimeValue.timeValueMinutes(1))
.addSort("field", SortOrder.ASC)
.execute().actionGet();
assertNotNull(searchResponse.getScrollId());
assertThat(searchResponse.getHits().getTotalHits(), equalTo(10L));
assertThat(searchResponse.getHits().getHits().length, equalTo(5));

exc = expectThrows(Exception.class,
() -> client().prepareSearchScroll(searchResponse.getScrollId())
.setScroll(TimeValue.timeValueHours(3)).get());
queryPhaseExecutionException =
(QueryPhaseExecutionException) ExceptionsHelper.unwrap(exc, QueryPhaseExecutionException.class);
assertNotNull(queryPhaseExecutionException);
assertThat(queryPhaseExecutionException.getMessage(), containsString("Keep alive for scroll (3 hours) is too large"));
}

private void assertToXContentResponse(ClearScrollResponse response, boolean succeed, int numFreed) throws IOException {
XContentBuilder builder = XContentFactory.jsonBuilder();
response.toXContent(builder, ToXContent.EMPTY_PARAMS);
Expand Down
Loading

0 comments on commit 76a0bd9

Please sign in to comment.