From b7d54cb0f33c06e2112eee9b050ca0d525c96d03 Mon Sep 17 00:00:00 2001 From: Rishav Sagar Date: Sun, 13 Aug 2023 15:28:30 +0530 Subject: [PATCH] Cleanup Unreferenced file on segment merge failure Signed-off-by: Rishav Sagar --- CHANGELOG.md | 1 + .../common/settings/IndexScopedSettings.java | 1 + .../org/opensearch/index/IndexSettings.java | 27 ++ .../org/opensearch/index/engine/Engine.java | 41 +++ .../index/engine/InternalEngineTests.java | 323 ++++++++++++++++++ .../index/engine/EngineTestCase.java | 33 +- .../test/OpenSearchIntegTestCase.java | 12 + 7 files changed, 436 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c64865a01eb37..a047d9a5c9af9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -151,6 +151,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add support for wrapping CollectorManager with profiling during concurrent execution ([#9129](https://github.com/opensearch-project/OpenSearch/pull/9129)) - Rethrow OpenSearch exception for non-concurrent path while using concurrent search ([#9177](https://github.com/opensearch-project/OpenSearch/pull/9177)) - Improve performance of encoding composite keys in multi-term aggregations ([#9412](https://github.com/opensearch-project/OpenSearch/pull/9412)) +- Cleanup Unreferenced file on segment merge failure ([#9483](https://github.com/opensearch-project/OpenSearch/pull/9483)) ### Deprecated diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index 68d02151b50f5..30a3550d62c8a 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -171,6 +171,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING, IndexSettings.INDEX_SEARCH_IDLE_AFTER, IndexSettings.INDEX_SEARCH_THROTTLED, + IndexSettings.INDEX_UNREFERENCED_FILE_CLEANUP, IndexFieldDataService.INDEX_FIELDDATA_CACHE_KEY, FieldMapper.IGNORE_MALFORMED_SETTING, FieldMapper.COERCE_SETTING, diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index ec719c99e163f..4f5844f7f6a3e 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -511,6 +511,18 @@ public final class IndexSettings { Property.Dynamic ); + /** + * This setting controls if unreferenced files will be cleaned up in case segment merge fails due to disk full. + * + * Defaults to true which means unreferenced files will be cleaned up in case segment merge fails. + */ + public static final Setting INDEX_UNREFERENCED_FILE_CLEANUP = Setting.boolSetting( + "index.unreferenced_file_cleanup.enabled", + true, + Property.IndexScope, + Property.Dynamic + ); + /** * Determines a balance between file-based and operations-based peer recoveries. The number of operations that will be used in an * operations-based peer recovery is limited to this proportion of the total number of documents in the shard (including deleted @@ -676,6 +688,7 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) { private volatile String defaultPipeline; private volatile String requiredPipeline; private volatile boolean searchThrottled; + private volatile boolean shouldCleanupUnreferencedFile; private volatile long mappingNestedFieldsLimit; private volatile long mappingNestedDocsLimit; private volatile long mappingTotalFieldsLimit; @@ -793,6 +806,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti } this.searchThrottled = INDEX_SEARCH_THROTTLED.get(settings); + this.shouldCleanupUnreferencedFile = INDEX_UNREFERENCED_FILE_CLEANUP.get(settings); this.queryStringLenient = QUERY_STRING_LENIENT_SETTING.get(settings); this.queryStringAnalyzeWildcard = QUERY_STRING_ANALYZE_WILDCARD.get(nodeSettings); this.queryStringAllowLeadingWildcard = QUERY_STRING_ALLOW_LEADING_WILDCARD.get(nodeSettings); @@ -905,6 +919,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti scopedSettings.addSettingsUpdateConsumer(FINAL_PIPELINE, this::setRequiredPipeline); scopedSettings.addSettingsUpdateConsumer(INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING, this::setSoftDeleteRetentionOperations); scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_THROTTLED, this::setSearchThrottled); + scopedSettings.addSettingsUpdateConsumer(INDEX_UNREFERENCED_FILE_CLEANUP, this::setShouldCleanupUnreferencedFile); scopedSettings.addSettingsUpdateConsumer(INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING, this::setRetentionLeaseMillis); scopedSettings.addSettingsUpdateConsumer(INDEX_MAPPING_NESTED_FIELDS_LIMIT_SETTING, this::setMappingNestedFieldsLimit); scopedSettings.addSettingsUpdateConsumer(INDEX_MAPPING_NESTED_DOCS_LIMIT_SETTING, this::setMappingNestedDocsLimit); @@ -1530,6 +1545,18 @@ private void setSearchThrottled(boolean searchThrottled) { this.searchThrottled = searchThrottled; } + /** + * Returns true if unreferenced files should be cleaned up on merge failure for this index. + * + */ + public boolean shouldCleanupUnreferencedFile() { + return shouldCleanupUnreferencedFile; + } + + private void setShouldCleanupUnreferencedFile(boolean shouldCleanupUnreferencedFile) { + this.shouldCleanupUnreferencedFile = shouldCleanupUnreferencedFile; + } + public long getMappingNestedFieldsLimit() { return mappingNestedFieldsLimit; } diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index 80e2eb92e6956..7bc0a43bc644f 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -38,7 +38,10 @@ import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.index.SegmentCommitInfo; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.SegmentReader; @@ -950,6 +953,10 @@ protected void fillSegmentStats(SegmentReader segmentReader, boolean includeSegm } } + boolean shouldCleanupUnreferencedFile() { + return engineConfig.getIndexSettings().shouldCleanupUnreferencedFile(); + } + private Map getSegmentFileSizes(SegmentReader segmentReader) { Directory directory = null; SegmentCommitInfo segmentCommitInfo = segmentReader.getSegmentInfo(); @@ -1291,6 +1298,20 @@ public void failEngine(String reason, @Nullable Exception failure) { ); } } + + // If cleanup unreferenced flag is enabled and force merge or regular merge failed due to disk full, + // cleanup all unreferenced files created during failed and reset the shard state back to last + // Lucene Commit. + if (shouldCleanupUnreferencedFile() + && (reason.equals("force merge") || reason.equals("merge failed")) + && failure != null + && failure.getCause() != null + && failure.getCause().getCause() != null + && failure.getCause().getCause().getMessage() != null + && failure.getCause().getCause().getMessage().contains("No space left on device")) { + cleanUpUnreferencedFiles(); + } + eventListener.onFailedEngine(reason, failure); } } catch (Exception inner) { @@ -1309,6 +1330,26 @@ public void failEngine(String reason, @Nullable Exception failure) { } } + /** + * Cleanup all unreferenced files generated during failed segment merge. This resets shard state to last Lucene + * commit. + */ + private void cleanUpUnreferencedFiles() { + try ( + IndexWriter writer = new IndexWriter( + store.directory(), + new IndexWriterConfig(Lucene.STANDARD_ANALYZER).setSoftDeletesField(Lucene.SOFT_DELETES_FIELD) + .setCommitOnClose(false) + .setMergePolicy(NoMergePolicy.INSTANCE) + .setOpenMode(IndexWriterConfig.OpenMode.APPEND) + ) + ) { + // do nothing and close this will kick off IndexFileDeleter which will remove all unreferenced files + } catch (Exception ex) { + logger.error("Error while deleting unreferenced file ", ex); + } + } + /** Check whether the engine should be failed */ protected boolean maybeFailEngine(String source, Exception e) { if (Lucene.isCorruptionException(e)) { diff --git a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java index 0f7a571987df0..ca6fc1b021916 100644 --- a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java @@ -40,6 +40,7 @@ import org.apache.logging.log4j.core.LogEvent; import org.apache.logging.log4j.core.appender.AbstractAppender; import org.apache.logging.log4j.core.filter.RegexFilter; +import org.apache.lucene.codecs.LiveDocsFormat; import org.apache.lucene.document.Field; import org.apache.lucene.document.KeywordField; import org.apache.lucene.document.LongPoint; @@ -78,6 +79,7 @@ import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; import org.apache.lucene.store.Lock; +import org.apache.lucene.tests.mockfile.ExtrasFS; import org.apache.lucene.tests.store.MockDirectoryWrapper; import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; @@ -152,6 +154,7 @@ import org.opensearch.index.translog.TranslogException; import org.opensearch.index.translog.listener.TranslogEventListener; import org.opensearch.test.IndexSettingsModule; +import org.opensearch.test.MockLogAppender; import org.opensearch.test.VersionUtils; import org.opensearch.threadpool.ThreadPool; import org.hamcrest.MatcherAssert; @@ -3229,6 +3232,326 @@ public void testFailStart() throws IOException { } } + public void testUnreferencedFileCleanUpOnSegmentMergeFailureWithCleanUpEnabled() throws Exception { + MockDirectoryWrapper wrapper = newMockDirectory(); + final CountDownLatch cleanupCompleted = new CountDownLatch(1); + MockDirectoryWrapper.Failure fail = new MockDirectoryWrapper.Failure() { + public boolean didFail1; + public boolean didFail2; + + @Override + public void eval(MockDirectoryWrapper dir) throws IOException { + if (!doFail) { + return; + } + + // Fail segment merge with diskfull during merging terms. + if (callStackContainsAnyOf("mergeTerms") && !didFail1) { + didFail1 = true; + throw new IOException("No space left on device"); + } + if (callStackContains(LiveDocsFormat.class, "writeLiveDocs") && !didFail2) { + didFail2 = true; + throw new IOException("No space left on device"); + } + } + }; + + wrapper.failOn(fail); + try { + Store store = createStore(wrapper); + final Engine.EventListener eventListener = new Engine.EventListener() { + @Override + public void onFailedEngine(String reason, Exception e) { + try { + // extra0 file is added as a part of + // https://lucene.apache.org/core/7_2_1/test-framework/org/apache/lucene/mockfile/ExtrasFS.html + // Safe to remove from file count along with write.lock without impacting the test + long fileCount = Arrays.stream(store.directory().listAll()) + .filter(file -> file.equals("write.lock") == false && ExtrasFS.isExtra(file) == false) + .count(); + + // Since only one document is committed and unreferenced files are cleaned up, + // there are 4 files (*cfs, *cfe, *si and segments_*). + assertThat(fileCount, equalTo(4L)); + wrapper.close(); + store.close(); + engine.close(); + cleanupCompleted.countDown(); + } catch (IOException ex) { + throw new AssertionError(ex); + } + } + }; + + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + final long primaryTerm = randomLongBetween(1, Long.MAX_VALUE); + final AtomicLong retentionLeasesVersion = new AtomicLong(); + final AtomicReference retentionLeasesHolder = new AtomicReference<>( + new RetentionLeases(primaryTerm, retentionLeasesVersion.get(), Collections.emptyList()) + ); + InternalEngine engine = createEngine( + config( + defaultSettings, + store, + createTempDir(), + newMergePolicy(), + null, + null, + null, + globalCheckpoint::get, + retentionLeasesHolder::get, + new NoneCircuitBreakerService(), + eventListener + ) + ); + + List segments = engine.segments(true); + assertThat(segments.isEmpty(), equalTo(true)); + + ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), B_1, null); + engine.index(indexForDoc(doc)); + engine.refresh("test"); + engine.flush(); + + segments = engine.segments(false); + assertThat(segments.size(), equalTo(1)); + + ParsedDocument doc2 = testParsedDocument("2", null, testDocumentWithTextField(), B_2, null); + engine.index(indexForDoc(doc2)); + engine.refresh("test"); + + segments = engine.segments(false); + assertThat(segments.size(), equalTo(2)); + + fail.setDoFail(); + // IndexWriter can throw either IOException or IllegalStateException depending on whether tragedy is set or not. + expectThrowsAnyOf( + Arrays.asList(IOException.class, IllegalStateException.class), + () -> engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID()) + ); + + assertTrue(cleanupCompleted.await(10, TimeUnit.SECONDS)); + } catch (Exception ex) { + throw new AssertionError(ex); + } + } + + public void testUnreferencedFileCleanUpOnSegmentMergeFailureWithCleanUpDisabled() throws Exception { + MockDirectoryWrapper wrapper = newMockDirectory(); + final CountDownLatch cleanupCompleted = new CountDownLatch(1); + MockDirectoryWrapper.Failure fail = new MockDirectoryWrapper.Failure() { + public boolean didFail1; + public boolean didFail2; + + @Override + public void eval(MockDirectoryWrapper dir) throws IOException { + if (!doFail) { + return; + } + if (callStackContainsAnyOf("mergeTerms") && !didFail1) { + didFail1 = true; + throw new IOException("No space left on device"); + } + if (callStackContains(LiveDocsFormat.class, "writeLiveDocs") && !didFail2) { + didFail2 = true; + throw new IOException("No space left on device"); + } + } + }; + + wrapper.failOn(fail); + try { + Store store = createStore(wrapper); + final Engine.EventListener eventListener = new Engine.EventListener() { + @Override + public void onFailedEngine(String reason, Exception e) { + try { + // extra0 file is added as a part of + // https://lucene.apache.org/core/7_2_1/test-framework/org/apache/lucene/mockfile/ExtrasFS.html + // Safe to remove from file count along with write.lock without impacting the test + long fileCount = Arrays.stream(store.directory().listAll()) + .filter(file -> file.equals("write.lock") == false && ExtrasFS.isExtra(file) == false) + .count(); + + // Since now cleanup is not happening now, all unrefrenced files now be present as well. + assertThat(fileCount, equalTo(13L)); + wrapper.close(); + store.close(); + engine.close(); + cleanupCompleted.countDown(); + } catch (IOException ex) { + throw new AssertionError(ex); + } + } + }; + + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + final long primaryTerm = randomLongBetween(1, Long.MAX_VALUE); + final AtomicLong retentionLeasesVersion = new AtomicLong(); + final AtomicReference retentionLeasesHolder = new AtomicReference<>( + new RetentionLeases(primaryTerm, retentionLeasesVersion.get(), Collections.emptyList()) + ); + InternalEngine engine = createEngine( + config( + defaultSettings, + store, + createTempDir(), + newMergePolicy(), + null, + null, + null, + globalCheckpoint::get, + retentionLeasesHolder::get, + new NoneCircuitBreakerService(), + eventListener + ) + ); + + // Disable cleanup + final IndexSettings indexSettings = engine.config().getIndexSettings(); + final IndexMetadata indexMetadata = IndexMetadata.builder(indexSettings.getIndexMetadata()) + .settings( + Settings.builder().put(indexSettings.getSettings()).put(IndexSettings.INDEX_UNREFERENCED_FILE_CLEANUP.getKey(), false) + ) + .build(); + indexSettings.updateIndexMetadata(indexMetadata); + + List segments = engine.segments(true); + assertThat(segments.isEmpty(), equalTo(true)); + + ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), B_1, null); + engine.index(indexForDoc(doc)); + engine.refresh("test"); + engine.flush(); + + segments = engine.segments(false); + assertThat(segments.size(), equalTo(1)); + + ParsedDocument doc2 = testParsedDocument("2", null, testDocumentWithTextField(), B_2, null); + engine.index(indexForDoc(doc2)); + engine.refresh("test"); + + segments = engine.segments(false); + assertThat(segments.size(), equalTo(2)); + + fail.setDoFail(); + // IndexWriter can throw either IOException or IllegalStateException depending on whether tragedy is set or not. + expectThrowsAnyOf( + Arrays.asList(IOException.class, IllegalStateException.class), + () -> engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID()) + ); + + assertTrue(cleanupCompleted.await(10, TimeUnit.SECONDS)); + } catch (Exception ex) { + throw new AssertionError(ex); + } + } + + public void testUnreferencedFileCleanUpFailsOnSegmentMergeFailureWhenDirectoryClosed() throws Exception { + MockDirectoryWrapper wrapper = newMockDirectory(); + final CountDownLatch cleanupCompleted = new CountDownLatch(1); + MockDirectoryWrapper.Failure fail = new MockDirectoryWrapper.Failure() { + public boolean didFail1; + public boolean didFail2; + + @Override + public void eval(MockDirectoryWrapper dir) throws IOException { + if (!doFail) { + return; + } + if (callStackContainsAnyOf("mergeTerms") && !didFail1) { + didFail1 = true; + throw new IOException("No space left on device"); + } + if (callStackContains(LiveDocsFormat.class, "writeLiveDocs") && !didFail2) { + didFail2 = true; + throw new IOException("No space left on device"); + } + } + }; + + wrapper.failOn(fail); + MockLogAppender mockAppender = MockLogAppender.createForLoggers(Loggers.getLogger(Engine.class, shardId)); + try { + Store store = createStore(wrapper); + final Engine.EventListener eventListener = new Engine.EventListener() { + @Override + public void onFailedEngine(String reason, Exception e) { + try { + store.close(); + engine.close(); + mockAppender.assertAllExpectationsMatched(); + mockAppender.close(); + cleanupCompleted.countDown(); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } + }; + + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + final long primaryTerm = randomLongBetween(1, Long.MAX_VALUE); + final AtomicLong retentionLeasesVersion = new AtomicLong(); + final AtomicReference retentionLeasesHolder = new AtomicReference<>( + new RetentionLeases(primaryTerm, retentionLeasesVersion.get(), Collections.emptyList()) + ); + InternalEngine engine = createEngine( + config( + defaultSettings, + store, + createTempDir(), + newMergePolicy(), + null, + null, + null, + globalCheckpoint::get, + retentionLeasesHolder::get, + new NoneCircuitBreakerService(), + eventListener + ) + ); + + List segments = engine.segments(true); + assertThat(segments.isEmpty(), equalTo(true)); + + ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), B_1, null); + engine.index(indexForDoc(doc)); + engine.refresh("test"); + engine.flush(); + + segments = engine.segments(false); + assertThat(segments.size(), equalTo(1)); + + ParsedDocument doc2 = testParsedDocument("2", null, testDocumentWithTextField(), B_2, null); + engine.index(indexForDoc(doc2)); + engine.refresh("test"); + + segments = engine.segments(false); + assertThat(segments.size(), equalTo(2)); + + fail.setDoFail(); + // Close the store so that unreferenced file cleanup will fail. + store.close(); + + final String message = "Error while deleting unreferenced file *"; + mockAppender.start(); + mockAppender.addExpectation( + new MockLogAppender.SeenEventExpectation("expected message", Engine.class.getCanonicalName(), Level.ERROR, message) + ); + + // IndexWriter can throw either IOException or IllegalStateException depending on whether tragedy is set or not. + expectThrowsAnyOf( + Arrays.asList(IOException.class, IllegalStateException.class), + () -> engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID()) + ); + + assertTrue(cleanupCompleted.await(10, TimeUnit.SECONDS)); + } catch (Exception ex) { + throw new AssertionError(ex); + } + } + public void testSettings() { CodecService codecService = new CodecService(null, engine.config().getIndexSettings(), logger); LiveIndexWriterConfig currentIndexWriterConfig = engine.getCurrentIndexWriterConfig(); diff --git a/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java index 961033db1daa3..15f9ee546fe6b 100644 --- a/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java @@ -840,10 +840,39 @@ public EngineConfig config( final @Nullable Supplier maybeRetentionLeasesSupplier, final CircuitBreakerService breakerService ) { - final IndexWriterConfig iwc = newIndexWriterConfig(); - final TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE); final Engine.EventListener eventListener = new Engine.EventListener() { }; // we don't need to notify anybody in this test + + return config( + indexSettings, + store, + translogPath, + mergePolicy, + externalRefreshListener, + internalRefreshListener, + indexSort, + maybeGlobalCheckpointSupplier, + maybeGlobalCheckpointSupplier == null ? null : () -> RetentionLeases.EMPTY, + breakerService, + eventListener + ); + } + + public EngineConfig config( + final IndexSettings indexSettings, + final Store store, + final Path translogPath, + final MergePolicy mergePolicy, + final ReferenceManager.RefreshListener externalRefreshListener, + final ReferenceManager.RefreshListener internalRefreshListener, + final Sort indexSort, + final @Nullable LongSupplier maybeGlobalCheckpointSupplier, + final @Nullable Supplier maybeRetentionLeasesSupplier, + final CircuitBreakerService breakerService, + final Engine.EventListener eventListener + ) { + final IndexWriterConfig iwc = newIndexWriterConfig(); + final TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE); final List extRefreshListenerList = externalRefreshListener == null ? emptyList() : Collections.singletonList(externalRefreshListener); diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java index 382152652cafe..3232ce7ddb87b 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java @@ -1457,6 +1457,18 @@ protected ForceMergeResponse forceMerge() { return actionGet; } + protected ForceMergeResponse forceMerge(int maxNumSegments) { + waitForRelocation(); + ForceMergeResponse actionGet = client().admin() + .indices() + .prepareForceMerge() + .setMaxNumSegments(maxNumSegments) + .execute() + .actionGet(); + assertNoFailures(actionGet); + return actionGet; + } + /** * Returns true iff the given index exists otherwise false */