diff --git a/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 6cf60e2b123ca..20473fba3aa00 100644 --- a/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -185,6 +185,11 @@ public class IndexShard extends AbstractIndexShardComponent { */ public static final String INDEX_FLUSH_ON_CLOSE = "index.flush_on_close"; + /** + * Controls how frequently we automatically refresh the near-real-time searcher. + */ + public static final String INDEX_REFRESH_INTERVAL = "index.refresh_interval"; + @Inject public IndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService, IndicesLifecycle indicesLifecycle, Store store, MergeSchedulerProvider mergeScheduler, Translog translog, ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService, ShardIndexingService indexingService, ShardGetService getService, ShardSearchService searchService, ShardIndexWarmerService shardWarmerService, @@ -225,12 +230,12 @@ public IndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndexS assert clusterService.localNode() != null : "Local node is null lifecycle state is: " + clusterService.lifecycleState(); this.localNode = clusterService.localNode(); state = IndexShardState.CREATED; - this.refreshInterval = indexSettings.getAsTime(INDEX_REFRESH_INTERVAL, EngineConfig.DEFAULT_REFRESH_INTERVAL); this.flushOnClose = indexSettings.getAsBoolean(INDEX_FLUSH_ON_CLOSE, true); indexSettingsService.addListener(applyRefreshSettings); this.refreshInterval = indexSettings.getAsTime(INDEX_REFRESH_INTERVAL, EngineConfig.DEFAULT_REFRESH_INTERVAL); this.mergeInterval = indexSettings.getAsTime("index.merge.async_interval", TimeValue.timeValueSeconds(1)); + /* create engine config */ this.config = new EngineConfig(shardId, indexSettings.getAsBoolean(EngineConfig.INDEX_OPTIMIZE_AUTOGENERATED_ID_SETTING, false), @@ -241,6 +246,16 @@ public IndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndexS logger.debug("state: [CREATED]"); this.checkIndexOnStartup = indexSettings.get("index.shard.check_on_startup", "false"); + + // since we can do async merging, it will not be called explicitly when indexing (adding / deleting docs), and only when flushing + // so, make sure we periodically call it, this need to be a small enough value so mergine will actually + // happen and reduce the number of segments + if (mergeInterval.millis() > 0) { + mergeScheduleFuture = threadPool.schedule(mergeInterval, ThreadPool.Names.SAME, new EngineMerger()); + logger.debug("scheduling optimizer / merger every {}", mergeInterval); + } else { + logger.debug("scheduled optimizer / merger disabled"); + } } public MergeSchedulerProvider mergeScheduler() { @@ -802,7 +817,7 @@ public void finalizeRecovery() { // clear unreferenced files translog.clearUnreferenced(); engine().refresh("recovery_finalization"); - startScheduledTasksIfNeeded(); + startEngineRefresher(); config.setEnableGcDeletes(true); } @@ -934,22 +949,13 @@ protected final void verifyStarted() throws IllegalIndexShardStateException { } } - private void startScheduledTasksIfNeeded() { + private void startEngineRefresher() { if (refreshInterval.millis() > 0) { refreshScheduledFuture = threadPool.schedule(refreshInterval, ThreadPool.Names.SAME, new EngineRefresher()); logger.debug("scheduling refresher every {}", refreshInterval); } else { logger.debug("scheduled refresher disabled"); } - // since we can do async merging, it will not be called explicitly when indexing (adding / deleting docs), and only when flushing - // so, make sure we periodically call it, this need to be a small enough value so mergine will actually - // happen and reduce the number of segments - if (mergeInterval.millis() > 0) { - mergeScheduleFuture = threadPool.schedule(mergeInterval, ThreadPool.Names.SAME, new EngineMerger()); - logger.debug("scheduling optimizer / merger every {}", mergeInterval); - } else { - logger.debug("scheduled optimizer / merger disabled"); - } } private Query filterQueryIfNeeded(Query query, String[] types) { @@ -960,8 +966,6 @@ private Query filterQueryIfNeeded(Query query, String[] types) { return query; } - public static final String INDEX_REFRESH_INTERVAL = "index.refresh_interval"; - public void addFailedEngineListener(Engine.FailedEngineListener failedEngineListener) { this.failedEngineListener.delegates.add(failedEngineListener); } @@ -1122,7 +1126,8 @@ private void reschedule() { class EngineMerger implements Runnable { @Override public void run() { - if (!engine().possibleMergeNeeded()) { + final Engine engine = engineUnsafe(); + if (engine == null || engine.possibleMergeNeeded() == false) { synchronized (mutex) { if (state != IndexShardState.CLOSED) { mergeScheduleFuture = threadPool.schedule(mergeInterval, ThreadPool.Names.SAME, this); @@ -1134,7 +1139,7 @@ public void run() { @Override public void run() { try { - engine().maybeMerge(); + engine.maybeMerge(); } catch (EngineClosedException e) { // we are being closed, ignore } catch (OptimizeFailedEngineException e) {