Adding support for append only indices
Signed-off-by: RS146BIJAY <[email protected]>
RS146BIJAY committed Dec 19, 2024
1 parent 1935650 commit 5334da3
Showing 5 changed files with 105 additions and 16 deletions.
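At a high level, the commit introduces a dynamic, index-scoped setting, index.append_only.enabled, rejects update and delete operations against indices that enable it, and acknowledges retried indexing operations instead of failing them. A minimal sketch of how the new setting surfaces through IndexSettings follows; the index name and the metadata-builder boilerplate are illustrative assumptions, and only INDEX_APPEND_ONLY and isAppendOnly() come from this commit.

import org.opensearch.Version;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexSettings;

public class AppendOnlySettingSketch {
    public static void main(String[] args) {
        // Index metadata that enables the setting added by this commit.
        IndexMetadata metadata = IndexMetadata.builder("logs-000001")
            .settings(Settings.builder()
                .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
                .put(IndexSettings.INDEX_APPEND_ONLY.getKey(), true))
            .numberOfShards(1)
            .numberOfReplicas(0)
            .build();

        IndexSettings indexSettings = new IndexSettings(metadata, Settings.EMPTY);
        // isAppendOnly() is the accessor introduced in this commit.
        System.out.println(indexSettings.isAppendOnly()); // prints: true
    }
}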
@@ -35,9 +35,11 @@
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.replication.ReplicationResponse;
import org.opensearch.action.support.replication.TransportWriteAction;
import org.opensearch.common.IndexingRetryException;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.translog.Translog;
@@ -297,20 +299,36 @@ public void markOperationAsExecuted(Engine.Result result) {
locationToSync = TransportWriteAction.locationToSync(locationToSync, result.getTranslogLocation());
break;
case FAILURE:
-    executionResult = new BulkItemResponse(
-        current.id(),
-        docWriteRequest.opType(),
-        // Make sure to use request.index() here, if you
-        // use docWriteRequest.index() it will use the
-        // concrete index instead of an alias if used!
-        new BulkItemResponse.Failure(
-            request.index(),
-            docWriteRequest.id(),
-            result.getFailure(),
-            result.getSeqNo(),
-            result.getTerm()
-        )
-    );
+    if (result.getFailure() instanceof IndexingRetryException) {
+        Engine.IndexResult indexResult = (Engine.IndexResult) result;
+        DocWriteResponse indexResponse = new IndexResponse(
+            primary.shardId(),
+            requestToExecute.id(),
+            result.getSeqNo(),
+            result.getTerm(),
+            indexResult.getVersion(),
+            indexResult.isCreated()
+        );
+
+        executionResult = new BulkItemResponse(current.id(), current.request().opType(), indexResponse);
+        // set a blank ShardInfo so we can safely send it to the replicas. We won't use it in the real response though.
+        executionResult.getResponse().setShardInfo(new ReplicationResponse.ShardInfo());
+    } else {
+        executionResult = new BulkItemResponse(
+            current.id(),
+            docWriteRequest.opType(),
+            // Make sure to use request.index() here, if you
+            // use docWriteRequest.index() it will use the
+            // concrete index instead of an alias if used!
+            new BulkItemResponse.Failure(
+                request.index(),
+                docWriteRequest.id(),
+                result.getFailure(),
+                result.getSeqNo(),
+                result.getTerm()
+            )
+        );
+    }
break;
default:
throw new AssertionError("unknown result type for " + getCurrentItem() + ": " + result.getResultType());
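The hunk above converts a failure caused by IndexingRetryException back into a regular IndexResponse, so a retried write against an append only index is acknowledged to the client rather than reported as an error. The exception class itself is imported from org.opensearch.common, but its definition is not part of the hunks shown here; a plausible minimal shape, assumed for illustration rather than taken from the commit, would be:

package org.opensearch.common;

// Marker exception signalling that an indexing operation on an append only index
// was a retry of a write that has already been applied (assumed definition).
public class IndexingRetryException extends RuntimeException {
    public IndexingRetryException(String message) {
        super(message);
    }
}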
@@ -66,6 +66,7 @@
import org.opensearch.cluster.metadata.MappingMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.ValidationException;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.unit.TimeValue;
@@ -538,6 +539,11 @@ protected void doRun() {
if (docWriteRequest == null) {
continue;
}

if (addFailureIfIndexAppendOnlyAndOpsDeleteOrUpdate(docWriteRequest, i, concreteIndices)) {
continue;
}

if (addFailureIfRequiresAliasAndAliasIsMissing(docWriteRequest, i, metadata)) {
continue;
}
@@ -756,6 +762,30 @@ public void onTimeout(TimeValue timeout) {
});
}

private boolean addFailureIfIndexAppendOnlyAndOpsDeleteOrUpdate(DocWriteRequest<?> request, int idx,
final ConcreteIndices concreteIndices) {
Index concreteIndex = concreteIndices.getConcreteIndex(request.index());
if (concreteIndex == null) {
try {
concreteIndex = concreteIndices.resolveIfAbsent(request);
} catch (IndexClosedException | IndexNotFoundException | IllegalArgumentException ex) {
addFailure(request, idx, ex);
return true;
}
}

if ((request.opType() == DocWriteRequest.OpType.UPDATE || request.opType() == DocWriteRequest.OpType.DELETE)
&& indicesService.indexServiceSafe(concreteIndex).getIndexSettings().isAppendOnly() == true) {
ValidationException exception = new ValidationException();
exception.addValidationError("Operation [" + request.opType()
+ "] is not allowed for append only index " + request.index());
addFailure(request, idx, exception);
return true;
}

return false;
}

private boolean addFailureIfRequiresAliasAndAliasIsMissing(DocWriteRequest<?> request, int idx, final Metadata metadata) {
if (request.isRequireAlias() && (metadata.hasAlias(request.index()) == false)) {
Exception exception = new IndexNotFoundException(
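Because addFailureIfIndexAppendOnlyAndOpsDeleteOrUpdate runs per bulk item, a mixed bulk request against an append only index fails only its update and delete items while plain index operations proceed. A hedged client-side sketch of such a request follows; the index name and document contents are assumptions, and only the rejection behaviour comes from this commit.

import java.util.Map;

import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.index.IndexRequest;

public class AppendOnlyBulkSketch {
    public static BulkRequest mixedBulk() {
        return new BulkRequest()
            // Allowed: a plain indexing (append) operation.
            .add(new IndexRequest("logs-000001").id("1").source(Map.of("msg", "hello")))
            // Rejected per item by the new append-only validation added in this commit.
            .add(new DeleteRequest("logs-000001", "1"));
    }
}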
@@ -178,6 +178,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexSettings.INDEX_SEARCH_IDLE_AFTER,
IndexSettings.INDEX_SEARCH_THROTTLED,
IndexSettings.INDEX_UNREFERENCED_FILE_CLEANUP,
IndexSettings.INDEX_APPEND_ONLY,
IndexFieldDataService.INDEX_FIELDDATA_CACHE_KEY,
FieldMapper.IGNORE_MALFORMED_SETTING,
FieldMapper.COERCE_SETTING,
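Registering the setting in BUILT_IN_INDEX_SETTINGS is what makes index.append_only.enabled a recognised per-index key; without it, index settings validation would reject the key as unknown. A small sketch, assuming the standard IndexScopedSettings validation entry point:

import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Settings;

public class AppendOnlyRegistrationSketch {
    public static void main(String[] args) {
        IndexScopedSettings indexScopedSettings =
            new IndexScopedSettings(Settings.EMPTY, IndexScopedSettings.BUILT_IN_INDEX_SETTINGS);
        // Passes only because INDEX_APPEND_ONLY is now part of BUILT_IN_INDEX_SETTINGS.
        indexScopedSettings.validate(
            Settings.builder().put("index.append_only.enabled", false).build(),
            true // also validate setting dependencies
        );
    }
}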
23 changes: 23 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
@@ -589,6 +589,18 @@ public static IndexMergePolicy fromString(String text) {
Property.Dynamic
);

/**
* This setting controls whether the index accepts only append (indexing) operations. When enabled,
* update and delete operations are rejected for the index.
* <p>
* Defaults to true.
*/
public static final Setting<Boolean> INDEX_APPEND_ONLY = Setting.boolSetting(
"index.append_only.enabled",
true,
Property.IndexScope,
Property.Dynamic
);

/**
* Determines a balance between file-based and operations-based peer recoveries. The number of operations that will be used in an
* operations-based peer recovery is limited to this proportion of the total number of documents in the shard (including deleted
@@ -830,6 +842,7 @@ public static IndexMergePolicy fromString(String text) {
private final RemoteStorePathStrategy remoteStorePathStrategy;
private final boolean isTranslogMetadataEnabled;
private volatile boolean allowDerivedField;
private volatile boolean isAppendOnly;

/**
* The maximum age of a retention lease before it is considered expired.
@@ -1011,6 +1024,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti

this.searchThrottled = INDEX_SEARCH_THROTTLED.get(settings);
this.shouldCleanupUnreferencedFiles = INDEX_UNREFERENCED_FILE_CLEANUP.get(settings);
this.isAppendOnly = INDEX_APPEND_ONLY.get(settings);
this.queryStringLenient = QUERY_STRING_LENIENT_SETTING.get(settings);
this.queryStringAnalyzeWildcard = QUERY_STRING_ANALYZE_WILDCARD.get(nodeSettings);
this.queryStringAllowLeadingWildcard = QUERY_STRING_ALLOW_LEADING_WILDCARD.get(nodeSettings);
@@ -1170,6 +1184,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
scopedSettings.addSettingsUpdateConsumer(INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING, this::setSoftDeleteRetentionOperations);
scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_THROTTLED, this::setSearchThrottled);
scopedSettings.addSettingsUpdateConsumer(INDEX_UNREFERENCED_FILE_CLEANUP, this::setShouldCleanupUnreferencedFiles);
scopedSettings.addSettingsUpdateConsumer(INDEX_APPEND_ONLY, this::setAppendOnly);
scopedSettings.addSettingsUpdateConsumer(INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING, this::setRetentionLeaseMillis);
scopedSettings.addSettingsUpdateConsumer(INDEX_MAPPING_NESTED_FIELDS_LIMIT_SETTING, this::setMappingNestedFieldsLimit);
scopedSettings.addSettingsUpdateConsumer(INDEX_MAPPING_NESTED_DOCS_LIMIT_SETTING, this::setMappingNestedDocsLimit);
@@ -1900,6 +1915,14 @@ private void setShouldCleanupUnreferencedFiles(boolean shouldCleanupUnreferenced
this.shouldCleanupUnreferencedFiles = shouldCleanupUnreferencedFiles;
}

public boolean isAppendOnly() {
return isAppendOnly;
}

private void setAppendOnly(boolean appendOnly) {
this.isAppendOnly = appendOnly;
}

public long getMappingNestedFieldsLimit() {
return mappingNestedFieldsLimit;
}
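Because INDEX_APPEND_ONLY is declared with Property.Dynamic and wired through addSettingsUpdateConsumer, isAppendOnly() follows live settings updates on an existing index. A rough sketch of that flow, assuming an existing IndexSettings instance and the usual metadata-update path (nothing here beyond INDEX_APPEND_ONLY, setAppendOnly, and isAppendOnly is part of the commit):

import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexSettings;

public class AppendOnlyDynamicUpdateSketch {
    // Disables append only mode on a live IndexSettings instance via a dynamic settings update.
    public static void disableAppendOnly(IndexSettings indexSettings) {
        IndexMetadata current = indexSettings.getIndexMetadata();
        IndexMetadata updated = IndexMetadata.builder(current)
            .settings(Settings.builder()
                .put(current.getSettings())
                .put(IndexSettings.INDEX_APPEND_ONLY.getKey(), false))
            .settingsVersion(current.getSettingsVersion() + 1)
            .build();
        // updateIndexMetadata pushes the changed settings through the registered update
        // consumers, so the setAppendOnly consumer added in this commit flips the cached flag.
        indexSettings.updateIndexMetadata(updated);
        assert indexSettings.isAppendOnly() == false;
    }
}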
@@ -70,7 +70,9 @@
import org.opensearch.action.index.IndexRequest;
import org.opensearch.common.Booleans;
import org.opensearch.common.Nullable;
import org.opensearch.common.IndexingRetryException;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.ValidationException;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lucene.LoggerInfoStream;
@@ -932,7 +934,7 @@ public IndexResult index(Index index) throws IOException {
final Translog.Location location;
if (indexResult.getResultType() == Result.Type.SUCCESS) {
location = translogManager.add(new Translog.Index(index, indexResult));
- } else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
+ } else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && indexResult.getFailure() != null && !(indexResult.getFailure() instanceof IndexingRetryException)) {
// if we have document failure, record it as a no-op in the translog and Lucene with the generated seq_no
final NoOp noOp = new NoOp(
indexResult.getSeqNo(),
@@ -955,7 +957,7 @@ public IndexResult index(Index index) throws IOException {
);
}
localCheckpointTracker.markSeqNoAsProcessed(indexResult.getSeqNo());
- if (indexResult.getTranslogLocation() == null) {
+ if (indexResult.getTranslogLocation() == null && !(indexResult.getFailure() != null && (indexResult.getFailure() instanceof IndexingRetryException))) {
// the op is coming from the translog (and is hence persisted already) or it does not have a sequence number
assert index.origin().isFromTranslog() || indexResult.getSeqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO;
localCheckpointTracker.markSeqNoAsPersisted(indexResult.getSeqNo());
@@ -1049,7 +1051,7 @@ private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException {
} else {
versionMap.enforceSafeAccess();
// resolves incoming version
- final VersionValue versionValue = resolveDocVersion(index, index.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO);
+ final VersionValue versionValue = resolveDocVersion(index, true);
final long currentVersion;
final boolean currentNotFoundOrDeleted;
if (versionValue == null) {
@@ -1092,6 +1094,17 @@ private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException {
final Exception reserveError = tryAcquireInFlightDocs(index, reservingDocs);
if (reserveError != null) {
plan = IndexingStrategy.failAsTooManyDocs(reserveError);
} else if (currentVersion >= 1 && engineConfig.getIndexSettings().isAppendOnly()) {
if (index.isRetry()) {
IndexingRetryException retryException = new IndexingRetryException("Indexing operation retried for append only indices");
final IndexResult result = new IndexResult(retryException, currentVersion, versionValue.term, versionValue.seqNo);
plan = IndexingStrategy.failAsIndexAppendOnly(result, currentVersion, 0);
} else {
ValidationException validationException = new ValidationException();
validationException.addValidationError("Operation [" + index.operationType() + "] is not allowed for append only indices");
final IndexResult result = new IndexResult(validationException, Versions.NOT_FOUND);
plan = IndexingStrategy.failAsIndexAppendOnly(result, Versions.NOT_FOUND, 0);
}
} else {
plan = IndexingStrategy.processNormally(
currentNotFoundOrDeleted,
@@ -1283,6 +1296,10 @@ static IndexingStrategy failAsTooManyDocs(Exception e) {
final IndexResult result = new IndexResult(e, Versions.NOT_FOUND);
return new IndexingStrategy(false, false, false, false, Versions.NOT_FOUND, 0, result);
}

static IndexingStrategy failAsIndexAppendOnly(IndexResult result, long versionForIndexing, int reservedDocs) {
return new IndexingStrategy(false, false, false, true, versionForIndexing, reservedDocs, result);
}
}

/**
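Taken together, the new branch in planIndexingAsPrimary means a write that targets an already existing document (currentVersion >= 1) of an append only index is rejected with a ValidationException, unless it is a transport-level retry, in which case it is tagged with IndexingRetryException so the bulk layer above can acknowledge it as a success. A condensed, standalone restatement of that decision, not the engine code itself:

public class AppendOnlyPlanSketch {
    enum Outcome { PROCEED, ACK_AS_RETRY, REJECT }

    // Mirrors the append only branch added to InternalEngine#planIndexingAsPrimary (simplified).
    static Outcome classify(boolean appendOnlyIndex, long currentVersion, boolean isRetry) {
        if (appendOnlyIndex && currentVersion >= 1) {
            // The document already exists: only retries of the original write are tolerated.
            return isRetry ? Outcome.ACK_AS_RETRY : Outcome.REJECT;
        }
        return Outcome.PROCEED;
    }
}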
