Skip to content

Commit

Permalink
Cleanup Unreferenced file on segment merge failure
Browse files Browse the repository at this point in the history
Signed-off-by: Rishav Sagar <[email protected]>
  • Loading branch information
Rishav Sagar committed Aug 23, 2023
1 parent 60d272b commit b7d54cb
Show file tree
Hide file tree
Showing 7 changed files with 436 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add support for wrapping CollectorManager with profiling during concurrent execution ([#9129](https://github.com/opensearch-project/OpenSearch/pull/9129))
- Rethrow OpenSearch exception for non-concurrent path while using concurrent search ([#9177](https://github.com/opensearch-project/OpenSearch/pull/9177))
- Improve performance of encoding composite keys in multi-term aggregations ([#9412](https://github.com/opensearch-project/OpenSearch/pull/9412))
- Cleanup Unreferenced file on segment merge failure ([#9483](https://github.com/opensearch-project/OpenSearch/pull/9483))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING,
IndexSettings.INDEX_SEARCH_IDLE_AFTER,
IndexSettings.INDEX_SEARCH_THROTTLED,
IndexSettings.INDEX_UNREFERENCED_FILE_CLEANUP,
IndexFieldDataService.INDEX_FIELDDATA_CACHE_KEY,
FieldMapper.IGNORE_MALFORMED_SETTING,
FieldMapper.COERCE_SETTING,
Expand Down
27 changes: 27 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,18 @@ public final class IndexSettings {
Property.Dynamic
);

/**
* This setting controls if unreferenced files will be cleaned up in case segment merge fails due to disk full.
*
* Defaults to true which means unreferenced files will be cleaned up in case segment merge fails.
*/
public static final Setting<Boolean> INDEX_UNREFERENCED_FILE_CLEANUP = Setting.boolSetting(
"index.unreferenced_file_cleanup.enabled",
true,
Property.IndexScope,
Property.Dynamic
);

/**
* Determines a balance between file-based and operations-based peer recoveries. The number of operations that will be used in an
* operations-based peer recovery is limited to this proportion of the total number of documents in the shard (including deleted
Expand Down Expand Up @@ -676,6 +688,7 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) {
private volatile String defaultPipeline;
private volatile String requiredPipeline;
private volatile boolean searchThrottled;
private volatile boolean shouldCleanupUnreferencedFile;
private volatile long mappingNestedFieldsLimit;
private volatile long mappingNestedDocsLimit;
private volatile long mappingTotalFieldsLimit;
Expand Down Expand Up @@ -793,6 +806,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
}

this.searchThrottled = INDEX_SEARCH_THROTTLED.get(settings);
this.shouldCleanupUnreferencedFile = INDEX_UNREFERENCED_FILE_CLEANUP.get(settings);
this.queryStringLenient = QUERY_STRING_LENIENT_SETTING.get(settings);
this.queryStringAnalyzeWildcard = QUERY_STRING_ANALYZE_WILDCARD.get(nodeSettings);
this.queryStringAllowLeadingWildcard = QUERY_STRING_ALLOW_LEADING_WILDCARD.get(nodeSettings);
Expand Down Expand Up @@ -905,6 +919,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
scopedSettings.addSettingsUpdateConsumer(FINAL_PIPELINE, this::setRequiredPipeline);
scopedSettings.addSettingsUpdateConsumer(INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING, this::setSoftDeleteRetentionOperations);
scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_THROTTLED, this::setSearchThrottled);
scopedSettings.addSettingsUpdateConsumer(INDEX_UNREFERENCED_FILE_CLEANUP, this::setShouldCleanupUnreferencedFile);
scopedSettings.addSettingsUpdateConsumer(INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING, this::setRetentionLeaseMillis);
scopedSettings.addSettingsUpdateConsumer(INDEX_MAPPING_NESTED_FIELDS_LIMIT_SETTING, this::setMappingNestedFieldsLimit);
scopedSettings.addSettingsUpdateConsumer(INDEX_MAPPING_NESTED_DOCS_LIMIT_SETTING, this::setMappingNestedDocsLimit);
Expand Down Expand Up @@ -1530,6 +1545,18 @@ private void setSearchThrottled(boolean searchThrottled) {
this.searchThrottled = searchThrottled;
}

/**
* Returns true if unreferenced files should be cleaned up on merge failure for this index.
*
*/
public boolean shouldCleanupUnreferencedFile() {
return shouldCleanupUnreferencedFile;
}

private void setShouldCleanupUnreferencedFile(boolean shouldCleanupUnreferencedFile) {
this.shouldCleanupUnreferencedFile = shouldCleanupUnreferencedFile;
}

public long getMappingNestedFieldsLimit() {
return mappingNestedFieldsLimit;
}
Expand Down
41 changes: 41 additions & 0 deletions server/src/main/java/org/opensearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SegmentReader;
Expand Down Expand Up @@ -950,6 +953,10 @@ protected void fillSegmentStats(SegmentReader segmentReader, boolean includeSegm
}
}

boolean shouldCleanupUnreferencedFile() {
return engineConfig.getIndexSettings().shouldCleanupUnreferencedFile();
}

private Map<String, Long> getSegmentFileSizes(SegmentReader segmentReader) {
Directory directory = null;
SegmentCommitInfo segmentCommitInfo = segmentReader.getSegmentInfo();
Expand Down Expand Up @@ -1291,6 +1298,20 @@ public void failEngine(String reason, @Nullable Exception failure) {
);
}
}

// If cleanup unreferenced flag is enabled and force merge or regular merge failed due to disk full,
// cleanup all unreferenced files created during failed and reset the shard state back to last
// Lucene Commit.
if (shouldCleanupUnreferencedFile()
&& (reason.equals("force merge") || reason.equals("merge failed"))
&& failure != null
&& failure.getCause() != null
&& failure.getCause().getCause() != null
&& failure.getCause().getCause().getMessage() != null
&& failure.getCause().getCause().getMessage().contains("No space left on device")) {
cleanUpUnreferencedFiles();
}

eventListener.onFailedEngine(reason, failure);
}
} catch (Exception inner) {
Expand All @@ -1309,6 +1330,26 @@ public void failEngine(String reason, @Nullable Exception failure) {
}
}

/**
* Cleanup all unreferenced files generated during failed segment merge. This resets shard state to last Lucene
* commit.
*/
private void cleanUpUnreferencedFiles() {
try (
IndexWriter writer = new IndexWriter(
store.directory(),
new IndexWriterConfig(Lucene.STANDARD_ANALYZER).setSoftDeletesField(Lucene.SOFT_DELETES_FIELD)
.setCommitOnClose(false)
.setMergePolicy(NoMergePolicy.INSTANCE)
.setOpenMode(IndexWriterConfig.OpenMode.APPEND)
)
) {
// do nothing and close this will kick off IndexFileDeleter which will remove all unreferenced files
} catch (Exception ex) {
logger.error("Error while deleting unreferenced file ", ex);
}
}

/** Check whether the engine should be failed */
protected boolean maybeFailEngine(String source, Exception e) {
if (Lucene.isCorruptionException(e)) {
Expand Down
Loading

0 comments on commit b7d54cb

Please sign in to comment.