From 7a0c02e88ea1883b122d0f59ee6f12d84d6fdb18 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 11 Feb 2019 13:38:16 +0100 Subject: [PATCH] Do not schedule Refresh/Translog/GlobalCheckpoint tasks for closed indices (#38329) Replicated closed indices do not need to be refreshed, neither they need their translogs or global checkpoint to be fsync. This pull request changes how `BaseAsyncTask` tasks are rescheduled in `IndexService` instances so that the tasks are rescheduled only when the index is opened. Relates to #33888 --- .../org/elasticsearch/index/IndexService.java | 11 ++- .../index/IndexServiceTests.java | 80 ++++++++++++++++++- 2 files changed, 84 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/IndexService.java b/server/src/main/java/org/elasticsearch/index/IndexService.java index 2a29f1c63667f..e56d7db5f1c7f 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexService.java +++ b/server/src/main/java/org/elasticsearch/index/IndexService.java @@ -673,7 +673,7 @@ public synchronized void updateMetaData(final IndexMetaData currentIndexMetaData // once we change the refresh interval we schedule yet another refresh // to ensure we are in a clean and predictable state. // it doesn't matter if we move from or to -1 in both cases we want - // docs to become visible immediately. This also flushes all pending indexing / search reqeusts + // docs to become visible immediately. This also flushes all pending indexing / search requests // that are waiting for a refresh. threadPool.executor(ThreadPool.Names.REFRESH).execute(new AbstractRunnable() { @Override @@ -830,17 +830,20 @@ private void sync(final Consumer sync, final String source) { } abstract static class BaseAsyncTask extends AbstractAsyncTask { + protected final IndexService indexService; - BaseAsyncTask(IndexService indexService, TimeValue interval) { + BaseAsyncTask(final IndexService indexService, final TimeValue interval) { super(indexService.logger, indexService.threadPool, interval, true); this.indexService = indexService; rescheduleIfNecessary(); } + @Override protected boolean mustReschedule() { - // don't re-schedule if its closed or if we don't have a single shard here..., we are done - return indexService.closed.get() == false; + // don't re-schedule if the IndexService instance is closed or if the index is closed + return indexService.closed.get() == false + && indexService.indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN; } } diff --git a/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java b/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java index a47d4db2a2579..e5e554818c020 100644 --- a/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java +++ b/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java @@ -34,6 +34,7 @@ import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.test.InternalSettingsPlugin; @@ -47,6 +48,7 @@ import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.test.InternalSettingsPlugin.TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.core.IsEqual.equalTo; /** Unit test(s) for IndexService */ @@ -109,7 +111,6 @@ protected String getThreadPool() { latch2.get().countDown(); assertEquals(2, count.get()); - task = new IndexService.BaseAsyncTask(indexService, TimeValue.timeValueMillis(1000000)) { @Override protected void runInternal() { @@ -117,6 +118,34 @@ protected void runInternal() { } }; assertTrue(task.mustReschedule()); + + // now close the index + final Index index = indexService.index(); + assertAcked(client().admin().indices().prepareClose(index.getName())); + awaitBusy(() -> getInstanceFromNode(IndicesService.class).hasIndex(index)); + + final IndexService closedIndexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(index); + assertNotSame(indexService, closedIndexService); + assertFalse(task.mustReschedule()); + assertFalse(task.isClosed()); + assertEquals(1000000, task.getInterval().millis()); + + // now reopen the index + assertAcked(client().admin().indices().prepareOpen(index.getName())); + awaitBusy(() -> getInstanceFromNode(IndicesService.class).hasIndex(index)); + indexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(index); + assertNotSame(closedIndexService, indexService); + + task = new IndexService.BaseAsyncTask(indexService, TimeValue.timeValueMillis(100000)) { + @Override + protected void runInternal() { + + } + }; + assertTrue(task.mustReschedule()); + assertFalse(task.isClosed()); + assertTrue(task.isScheduled()); + indexService.close("simon says", false); assertFalse("no shards left", task.mustReschedule()); assertTrue(task.isScheduled()); @@ -124,7 +153,7 @@ protected void runInternal() { assertFalse(task.isScheduled()); } - public void testRefreshTaskIsUpdated() throws IOException { + public void testRefreshTaskIsUpdated() throws Exception { IndexService indexService = createIndex("test", Settings.EMPTY); IndexService.AsyncRefreshTask refreshTask = indexService.getRefreshTask(); assertEquals(1000, refreshTask.getInterval().millis()); @@ -167,12 +196,35 @@ public void testRefreshTaskIsUpdated() throws IOException { assertTrue(refreshTask.isScheduled()); assertFalse(refreshTask.isClosed()); assertEquals(200, refreshTask.getInterval().millis()); + + // now close the index + final Index index = indexService.index(); + assertAcked(client().admin().indices().prepareClose(index.getName())); + awaitBusy(() -> getInstanceFromNode(IndicesService.class).hasIndex(index)); + + final IndexService closedIndexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(index); + assertNotSame(indexService, closedIndexService); + assertNotSame(refreshTask, closedIndexService.getRefreshTask()); + assertFalse(closedIndexService.getRefreshTask().mustReschedule()); + assertFalse(closedIndexService.getRefreshTask().isClosed()); + assertEquals(200, closedIndexService.getRefreshTask().getInterval().millis()); + + // now reopen the index + assertAcked(client().admin().indices().prepareOpen(index.getName())); + awaitBusy(() -> getInstanceFromNode(IndicesService.class).hasIndex(index)); + indexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(index); + assertNotSame(closedIndexService, indexService); + refreshTask = indexService.getRefreshTask(); + assertTrue(indexService.getRefreshTask().mustReschedule()); + assertTrue(refreshTask.isScheduled()); + assertFalse(refreshTask.isClosed()); + indexService.close("simon says", false); assertFalse(refreshTask.isScheduled()); assertTrue(refreshTask.isClosed()); } - public void testFsyncTaskIsRunning() throws IOException { + public void testFsyncTaskIsRunning() throws Exception { Settings settings = Settings.builder() .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.ASYNC).build(); IndexService indexService = createIndex("test", settings); @@ -182,6 +234,28 @@ public void testFsyncTaskIsRunning() throws IOException { assertTrue(fsyncTask.mustReschedule()); assertTrue(fsyncTask.isScheduled()); + // now close the index + final Index index = indexService.index(); + assertAcked(client().admin().indices().prepareClose(index.getName())); + awaitBusy(() -> getInstanceFromNode(IndicesService.class).hasIndex(index)); + + final IndexService closedIndexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(index); + assertNotSame(indexService, closedIndexService); + assertNotSame(fsyncTask, closedIndexService.getFsyncTask()); + assertFalse(closedIndexService.getFsyncTask().mustReschedule()); + assertFalse(closedIndexService.getFsyncTask().isClosed()); + assertEquals(5000, closedIndexService.getFsyncTask().getInterval().millis()); + + // now reopen the index + assertAcked(client().admin().indices().prepareOpen(index.getName())); + awaitBusy(() -> getInstanceFromNode(IndicesService.class).hasIndex(index)); + indexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(index); + assertNotSame(closedIndexService, indexService); + fsyncTask = indexService.getFsyncTask(); + assertTrue(indexService.getRefreshTask().mustReschedule()); + assertTrue(fsyncTask.isScheduled()); + assertFalse(fsyncTask.isClosed()); + indexService.close("simon says", false); assertFalse(fsyncTask.isScheduled()); assertTrue(fsyncTask.isClosed());