diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_failure_store_redirection.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_failure_store_redirection.yml index 991504b27f65f..af3204ed443ab 100644 --- a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_failure_store_redirection.yml +++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_failure_store_redirection.yml @@ -748,3 +748,43 @@ teardown: indices.delete: index: .fs-logs-foobar-* - is_true: acknowledged + +--- +"Version conflicts are not redirected to failure store": + - requires: + cluster_features: ["gte_v8.16.0"] + reason: "Redirecting version conflicts to the failure store is considered a bug fixed in 8.16" + test_runner_features: [allowed_warnings, contains] + + - do: + allowed_warnings: + - "index template [generic_logs_template] has index patterns [logs-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [generic_logs_template] will take precedence during new index creation" + indices.put_index_template: + name: generic_logs_template + body: + index_patterns: logs-* + data_stream: + failure_store: true + template: + settings: + number_of_shards: 1 + number_of_replicas: 1 + mappings: + properties: + '@timestamp': + type: date + count: + type: long + + - do: + bulk: + refresh: true + body: + - '{ "create": { "_index": "logs-foobar", "_id": "1" } }' + - '{ "@timestamp": "2022-01-01", "baz": "quick", "a": "brown", "b": "fox" }' + - '{ "create": { "_index": "logs-foobar", "_id": "1" } }' + - '{ "@timestamp": "2022-01-01", "baz": "lazy", "a": "dog" }' + - is_true: errors + - match: { items.1.create._index: '/\.ds-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' } + - match: { items.1.create.status: 409 } + - match: { items.1.create.error.type: version_conflict_engine_exception} diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java index 813203afe42c5..64e0b80aca74a 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java @@ -44,6 +44,7 @@ import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndexClosedException; import org.elasticsearch.node.NodeClosedException; @@ -478,7 +479,8 @@ private void completeShardOperation() { } private void processFailure(BulkItemRequest bulkItemRequest, Exception cause) { - var errorType = ElasticsearchException.getExceptionName(ExceptionsHelper.unwrapCause(cause)); + var error = ExceptionsHelper.unwrapCause(cause); + var errorType = ElasticsearchException.getExceptionName(error); DocWriteRequest docWriteRequest = bulkItemRequest.request(); DataStream failureStoreCandidate = getRedirectTargetCandidate(docWriteRequest, getClusterState().metadata()); // If the candidate is not null, the BulkItemRequest targets a data stream, but we'll still have to check if @@ -486,7 +488,9 @@ private void processFailure(BulkItemRequest bulkItemRequest, Exception cause) { if (failureStoreCandidate != null) { // Do not redirect documents to a failure store that were already headed to one. var isFailureStoreDoc = docWriteRequest instanceof IndexRequest indexRequest && indexRequest.isWriteToFailureStore(); - if (isFailureStoreDoc == false && failureStoreCandidate.isFailureStoreEnabled()) { + if (isFailureStoreDoc == false + && failureStoreCandidate.isFailureStoreEnabled() + && error instanceof VersionConflictEngineException == false) { // Redirect to failure store. maybeMarkFailureStoreForRollover(failureStoreCandidate); addDocumentToRedirectRequests(bulkItemRequest, cause, failureStoreCandidate.getName());