Do not schedule Refresh/Translog/GlobalCheckpoint tasks for closed indices (elastic#38329)

Replicated closed indices do not need to be refreshed, nor do their translogs
or global checkpoints need to be fsynced. This pull request changes how
`BaseAsyncTask` tasks are rescheduled in `IndexService` instances so that
the tasks are rescheduled only when the index is open.

Relates to elastic#33888
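
The mechanism relied on here is a self-rescheduling periodic task: on construction and after each run, the task re-evaluates a predicate before re-arming itself, so once the predicate turns false (here: the index is closed) it simply stops running. Below is a minimal, self-contained sketch of that pattern using a plain `ScheduledExecutorService`; all names are illustrative stand-ins, not Elasticsearch's `AbstractAsyncTask` API, whose actual change is in the diff that follows.

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Sketch only (illustrative names, not the project's API): a periodic task
// that re-arms itself after every run, but only while the gate holds.
final class SelfReschedulingTask implements Runnable {

    private final ScheduledExecutorService scheduler;
    private final BooleanSupplier mustReschedule; // e.g. () -> serviceNotClosed && indexState == OPEN
    private final Runnable work;                  // refresh / translog fsync / checkpoint sync
    private final long intervalMillis;

    SelfReschedulingTask(ScheduledExecutorService scheduler, BooleanSupplier mustReschedule,
                         Runnable work, long intervalMillis) {
        this.scheduler = scheduler;
        this.mustReschedule = mustReschedule;
        this.work = work;
        this.intervalMillis = intervalMillis;
        rescheduleIfNecessary(); // arm the first run only if the gate is open
    }

    void rescheduleIfNecessary() {
        if (mustReschedule.getAsBoolean()) {
            scheduler.schedule(this, intervalMillis, TimeUnit.MILLISECONDS);
        }
    }

    @Override
    public void run() {
        try {
            work.run();
        } finally {
            rescheduleIfNecessary(); // once the index is closed, the task quietly stops re-arming
        }
    }
}

Note that reopening an index does not revive the old tasks: as the tests below assert via `assertNotSame(closedIndexService, indexService)`, reopening builds a fresh `IndexService`, whose constructor schedules new tasks against the now-open index metadata.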
tlrx committed Mar 1, 2019
1 parent 224ee2e commit 7a0c02e
Showing 2 changed files with 84 additions and 7 deletions.
11 changes: 7 additions & 4 deletions server/src/main/java/org/elasticsearch/index/IndexService.java
@@ -673,7 +673,7 @@ public synchronized void updateMetaData(final IndexMetaData currentIndexMetaData
// once we change the refresh interval we schedule yet another refresh
// to ensure we are in a clean and predictable state.
// it doesn't matter if we move from or to <code>-1</code> in both cases we want
- // docs to become visible immediately. This also flushes all pending indexing / search reqeusts
+ // docs to become visible immediately. This also flushes all pending indexing / search requests
// that are waiting for a refresh.
threadPool.executor(ThreadPool.Names.REFRESH).execute(new AbstractRunnable() {
@Override
@@ -830,17 +830,20 @@ private void sync(final Consumer<IndexShard> sync, final String source) {
}

abstract static class BaseAsyncTask extends AbstractAsyncTask {
+
protected final IndexService indexService;

- BaseAsyncTask(IndexService indexService, TimeValue interval) {
+ BaseAsyncTask(final IndexService indexService, final TimeValue interval) {
super(indexService.logger, indexService.threadPool, interval, true);
this.indexService = indexService;
rescheduleIfNecessary();
}

@Override
protected boolean mustReschedule() {
- // don't re-schedule if its closed or if we don't have a single shard here..., we are done
- return indexService.closed.get() == false;
+ // don't re-schedule if the IndexService instance is closed or if the index is closed
+ return indexService.closed.get() == false
+     && indexService.indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN;
}
}

80 changes: 77 additions & 3 deletions server/src/test/java/org/elasticsearch/index/IndexServiceTests.java
@@ -34,6 +34,7 @@
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.translog.Translog;
+ import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.test.InternalSettingsPlugin;
@@ -47,6 +48,7 @@
import java.util.concurrent.atomic.AtomicReference;

import static org.elasticsearch.test.InternalSettingsPlugin.TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING;
+ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.core.IsEqual.equalTo;

/** Unit test(s) for IndexService */
@@ -109,22 +111,49 @@ protected String getThreadPool() {
latch2.get().countDown();
assertEquals(2, count.get());


task = new IndexService.BaseAsyncTask(indexService, TimeValue.timeValueMillis(1000000)) {
@Override
protected void runInternal() {

}
};
assertTrue(task.mustReschedule());

+ // now close the index
+ final Index index = indexService.index();
+ assertAcked(client().admin().indices().prepareClose(index.getName()));
+ awaitBusy(() -> getInstanceFromNode(IndicesService.class).hasIndex(index));
+
+ final IndexService closedIndexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(index);
+ assertNotSame(indexService, closedIndexService);
+ assertFalse(task.mustReschedule());
+ assertFalse(task.isClosed());
+ assertEquals(1000000, task.getInterval().millis());
+
+ // now reopen the index
+ assertAcked(client().admin().indices().prepareOpen(index.getName()));
+ awaitBusy(() -> getInstanceFromNode(IndicesService.class).hasIndex(index));
+ indexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(index);
+ assertNotSame(closedIndexService, indexService);
+
+ task = new IndexService.BaseAsyncTask(indexService, TimeValue.timeValueMillis(100000)) {
+     @Override
+     protected void runInternal() {
+
+     }
+ };
+ assertTrue(task.mustReschedule());
+ assertFalse(task.isClosed());
+ assertTrue(task.isScheduled());

indexService.close("simon says", false);
assertFalse("no shards left", task.mustReschedule());
assertTrue(task.isScheduled());
task.close();
assertFalse(task.isScheduled());
}

- public void testRefreshTaskIsUpdated() throws IOException {
+ public void testRefreshTaskIsUpdated() throws Exception {
IndexService indexService = createIndex("test", Settings.EMPTY);
IndexService.AsyncRefreshTask refreshTask = indexService.getRefreshTask();
assertEquals(1000, refreshTask.getInterval().millis());
@@ -167,12 +196,35 @@ public void testRefreshTaskIsUpdated() throws IOException {
assertTrue(refreshTask.isScheduled());
assertFalse(refreshTask.isClosed());
assertEquals(200, refreshTask.getInterval().millis());

+ // now close the index
+ final Index index = indexService.index();
+ assertAcked(client().admin().indices().prepareClose(index.getName()));
+ awaitBusy(() -> getInstanceFromNode(IndicesService.class).hasIndex(index));
+
+ final IndexService closedIndexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(index);
+ assertNotSame(indexService, closedIndexService);
+ assertNotSame(refreshTask, closedIndexService.getRefreshTask());
+ assertFalse(closedIndexService.getRefreshTask().mustReschedule());
+ assertFalse(closedIndexService.getRefreshTask().isClosed());
+ assertEquals(200, closedIndexService.getRefreshTask().getInterval().millis());
+
+ // now reopen the index
+ assertAcked(client().admin().indices().prepareOpen(index.getName()));
+ awaitBusy(() -> getInstanceFromNode(IndicesService.class).hasIndex(index));
+ indexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(index);
+ assertNotSame(closedIndexService, indexService);
+ refreshTask = indexService.getRefreshTask();
+ assertTrue(indexService.getRefreshTask().mustReschedule());
+ assertTrue(refreshTask.isScheduled());
+ assertFalse(refreshTask.isClosed());

indexService.close("simon says", false);
assertFalse(refreshTask.isScheduled());
assertTrue(refreshTask.isClosed());
}

- public void testFsyncTaskIsRunning() throws IOException {
+ public void testFsyncTaskIsRunning() throws Exception {
Settings settings = Settings.builder()
.put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.ASYNC).build();
IndexService indexService = createIndex("test", settings);
@@ -182,6 +234,28 @@ public void testFsyncTaskIsRunning() throws IOException {
assertTrue(fsyncTask.mustReschedule());
assertTrue(fsyncTask.isScheduled());

+ // now close the index
+ final Index index = indexService.index();
+ assertAcked(client().admin().indices().prepareClose(index.getName()));
+ awaitBusy(() -> getInstanceFromNode(IndicesService.class).hasIndex(index));
+
+ final IndexService closedIndexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(index);
+ assertNotSame(indexService, closedIndexService);
+ assertNotSame(fsyncTask, closedIndexService.getFsyncTask());
+ assertFalse(closedIndexService.getFsyncTask().mustReschedule());
+ assertFalse(closedIndexService.getFsyncTask().isClosed());
+ assertEquals(5000, closedIndexService.getFsyncTask().getInterval().millis());
+
+ // now reopen the index
+ assertAcked(client().admin().indices().prepareOpen(index.getName()));
+ awaitBusy(() -> getInstanceFromNode(IndicesService.class).hasIndex(index));
+ indexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(index);
+ assertNotSame(closedIndexService, indexService);
+ fsyncTask = indexService.getFsyncTask();
+ assertTrue(fsyncTask.mustReschedule());
+ assertTrue(fsyncTask.isScheduled());
+ assertFalse(fsyncTask.isClosed());

indexService.close("simon says", false);
assertFalse(fsyncTask.isScheduled());
assertTrue(fsyncTask.isClosed());
