Lucene merges should run on the target shard during recovery #10463

Closed
wants to merge 3 commits
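In short, the change below moves the scheduling of the periodic merge task (EngineMerger) out of startScheduledTasksIfNeeded(), which only ran once recovery was finalized, and into the IndexShard constructor, so background Lucene merges can already run while the target shard is still recovering; finalizeRecovery() now only starts the refresher, and the merge task tolerates an engine that does not exist yet. A minimal, self-contained sketch of that scheduling pattern follows; it uses java.util.concurrent instead of the actual ThreadPool API, and all names (ShardSketch, EngineSketch, mergeTick) are illustrative, not the real Elasticsearch classes.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch only: a self-rescheduling merge task started from the constructor,
// so it keeps running even while the shard is still recovering.
class ShardSketch {
    interface EngineSketch {
        boolean possibleMergeNeeded();
        void maybeMerge();
    }

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final long mergeIntervalMillis;
    private volatile EngineSketch engine;   // stays null until recovery creates the engine
    private volatile boolean closed;

    ShardSketch(long mergeIntervalMillis) {
        this.mergeIntervalMillis = mergeIntervalMillis;
        if (mergeIntervalMillis > 0) {
            // scheduled at construction, as in the diff, rather than after recovery finishes
            scheduler.schedule(this::mergeTick, mergeIntervalMillis, TimeUnit.MILLISECONDS);
        }
    }

    private void mergeTick() {
        EngineSketch e = engine;            // may be null while the shard is still recovering
        try {
            if (e != null && e.possibleMergeNeeded()) {
                e.maybeMerge();
            }
        } finally {
            if (!closed) {                  // keep rescheduling until the shard is closed
                scheduler.schedule(this::mergeTick, mergeIntervalMillis, TimeUnit.MILLISECONDS);
            }
        }
    }

    void close() {
        closed = true;
        scheduler.shutdown();
    }
}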
37 changes: 21 additions & 16 deletions src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -185,6 +185,11 @@ public class IndexShard extends AbstractIndexShardComponent {
*/
public static final String INDEX_FLUSH_ON_CLOSE = "index.flush_on_close";

/**
* Controls how frequently we automatically refresh the near-real-time searcher.
*/
public static final String INDEX_REFRESH_INTERVAL = "index.refresh_interval";

@Inject
public IndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService, IndicesLifecycle indicesLifecycle, Store store, MergeSchedulerProvider mergeScheduler, Translog translog,
ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService, ShardIndexingService indexingService, ShardGetService getService, ShardSearchService searchService, ShardIndexWarmerService shardWarmerService,
@@ -225,12 +230,12 @@ public IndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndexS
assert clusterService.localNode() != null : "Local node is null lifecycle state is: " + clusterService.lifecycleState();
this.localNode = clusterService.localNode();
state = IndexShardState.CREATED;
this.refreshInterval = indexSettings.getAsTime(INDEX_REFRESH_INTERVAL, EngineConfig.DEFAULT_REFRESH_INTERVAL);
this.flushOnClose = indexSettings.getAsBoolean(INDEX_FLUSH_ON_CLOSE, true);
indexSettingsService.addListener(applyRefreshSettings);

this.refreshInterval = indexSettings.getAsTime(INDEX_REFRESH_INTERVAL, EngineConfig.DEFAULT_REFRESH_INTERVAL);
this.mergeInterval = indexSettings.getAsTime("index.merge.async_interval", TimeValue.timeValueSeconds(1));

/* create engine config */
this.config = new EngineConfig(shardId,
indexSettings.getAsBoolean(EngineConfig.INDEX_OPTIMIZE_AUTOGENERATED_ID_SETTING, false),
@@ -241,6 +246,16 @@ public IndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndexS
logger.debug("state: [CREATED]");

this.checkIndexOnStartup = indexSettings.get("index.shard.check_on_startup", "false");

// since we can do async merging, it will not be called explicitly when indexing (adding / deleting docs), and only when flushing
// so, make sure we periodically call it; this needs to be a small enough value so merging will actually
// happen and reduce the number of segments
if (mergeInterval.millis() > 0) {
mergeScheduleFuture = threadPool.schedule(mergeInterval, ThreadPool.Names.SAME, new EngineMerger());
logger.debug("scheduling optimizer / merger every {}", mergeInterval);
} else {
logger.debug("scheduled optimizer / merger disabled");
}
}

public MergeSchedulerProvider mergeScheduler() {
@@ -802,7 +817,7 @@ public void finalizeRecovery() {
// clear unreferenced files
translog.clearUnreferenced();
engine().refresh("recovery_finalization");
startScheduledTasksIfNeeded();
startEngineRefresher();
config.setEnableGcDeletes(true);
}

@@ -934,22 +949,13 @@ protected final void verifyStarted() throws IllegalIndexShardStateException {
}
}

private void startScheduledTasksIfNeeded() {
private void startEngineRefresher() {
if (refreshInterval.millis() > 0) {
refreshScheduledFuture = threadPool.schedule(refreshInterval, ThreadPool.Names.SAME, new EngineRefresher());
logger.debug("scheduling refresher every {}", refreshInterval);
} else {
logger.debug("scheduled refresher disabled");
}
// since we can do async merging, it will not be called explicitly when indexing (adding / deleting docs), and only when flushing
// so, make sure we periodically call it; this needs to be a small enough value so merging will actually
// happen and reduce the number of segments
if (mergeInterval.millis() > 0) {
mergeScheduleFuture = threadPool.schedule(mergeInterval, ThreadPool.Names.SAME, new EngineMerger());
logger.debug("scheduling optimizer / merger every {}", mergeInterval);
} else {
logger.debug("scheduled optimizer / merger disabled");
}
}

private Query filterQueryIfNeeded(Query query, String[] types) {
@@ -960,8 +966,6 @@ private Query filterQueryIfNeeded(Query query, String[] types) {
return query;
}

public static final String INDEX_REFRESH_INTERVAL = "index.refresh_interval";

public void addFailedEngineListener(Engine.FailedEngineListener failedEngineListener) {
this.failedEngineListener.delegates.add(failedEngineListener);
}
@@ -1122,7 +1126,8 @@ private void reschedule() {
class EngineMerger implements Runnable {
@Override
public void run() {
if (!engine().possibleMergeNeeded()) {
final Engine engine = engineUnsafe();
if (engine == null || engine.possibleMergeNeeded() == false) {
synchronized (mutex) {
if (state != IndexShardState.CLOSED) {
mergeScheduleFuture = threadPool.schedule(mergeInterval, ThreadPool.Names.SAME, this);
@@ -1134,7 +1139,7 @@ public void run() {
@Override
public void run() {
try {
engine().maybeMerge();
engine.maybeMerge();
} catch (EngineClosedException e) {
// we are being closed, ignore
} catch (OptimizeFailedEngineException e) {
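One detail worth noting from the constructor hunk above: indexSettingsService.addListener(applyRefreshSettings) keeps the refresh interval dynamically updatable even though the value is read only once at construction. A rough, self-contained sketch of that listener pattern, with hypothetical names (SettingsRegistrySketch, SettingsListener) rather than the real IndexSettingsService API:

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Sketch of a dynamic-settings listener: the shard reads index.refresh_interval once,
// but also registers a listener so a later settings update can change the interval.
// All names here are illustrative, not the actual Elasticsearch classes.
class SettingsRegistrySketch {
    interface SettingsListener {
        void onSettingsChanged(Map<String, String> updated);
    }

    private final Map<String, String> current = new ConcurrentHashMap<>();
    private final List<SettingsListener> listeners = new CopyOnWriteArrayList<>();

    void addListener(SettingsListener listener) {
        listeners.add(listener);
    }

    void update(Map<String, String> changes) {
        current.putAll(changes);
        for (SettingsListener l : listeners) {
            l.onSettingsChanged(changes);
        }
    }
}

class ShardRefreshSettings {
    private volatile String refreshInterval;

    ShardRefreshSettings(SettingsRegistrySketch registry, String initialInterval) {
        this.refreshInterval = initialInterval;          // read once at construction
        registry.addListener(updated -> {                // ...but stays updatable afterwards
            String changed = updated.get("index.refresh_interval");
            if (changed != null && !changed.equals(refreshInterval)) {
                refreshInterval = changed;               // real code would also reschedule the refresher
            }
        });
    }

    String refreshInterval() {
        return refreshInterval;
    }
}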