Skip to content

Commit

Permalink
Do not send version conflicts to failure store (elastic#112537)
Browse files Browse the repository at this point in the history
When indexing to a data stream with a failure store it's possible to get
a version conflict. The reproduction path is the following:

```
PUT /_bulk
{"create":{"_index": "my-ds-with-fs", "_id": "1"}}
{"@timestamp": "2022-01-01", "baz": "quick", "a": "brown", "b": "fox"}
{"create":{"_index": "my-ds-with-fs", "_id": "1"}}
{"@timestamp": "2022-01-01", "baz": "lazy", "a": "dog"}
```

We would like the second document to not be sent to the failure store
and return an error to the user:

```
{
  "errors" : true,
  "took" : 409,
  "items" : [
    {
      "create" : {
        "_index" : ".ds-my-ds-with-fs-xxxxx-xxxx",
        "_id" : "1",
        "_version" : 1,
        "result" : "created",
        "_shards" : {
          "total" : 2,
          "successful" : 1,
          "failed" : 0
        },
        "_seq_no" : 0,
        "_primary_term" : 1,
        "status" : 201
      }
    },
    {
      "create" : {
        "_index" : ".ds-my-ds-with-fs-xxxxx-xxxx",
        "_id" : "1",
        "status" : 409,
        "error" : {
          "type" : "version_conflict_engine_exception",
          "reason" : "[1]: version conflict, document already exists (current version [1])",
          "index_uuid" : ".....",
          "shard" : "0",
          "index" : ".ds-my-ds-with-fs-xxxxx-xxxx"
        }
      }
    }
  ]
}
```

The version conflict doc is counted as a rejected doc in APM telemetry.
  • Loading branch information
gmarouli authored Sep 9, 2024
1 parent a587c7d commit a43ffd5
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -748,3 +748,43 @@ teardown:
indices.delete:
index: .fs-logs-foobar-*
- is_true: acknowledged

---
"Version conflicts are not redirected to failure store":
- requires:
cluster_features: ["gte_v8.16.0"]
reason: "Redirecting version conflicts to the failure store is considered a bug fixed in 8.16"
test_runner_features: [allowed_warnings, contains]

- do:
allowed_warnings:
- "index template [generic_logs_template] has index patterns [logs-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [generic_logs_template] will take precedence during new index creation"
indices.put_index_template:
name: generic_logs_template
body:
index_patterns: logs-*
data_stream:
failure_store: true
template:
settings:
number_of_shards: 1
number_of_replicas: 1
mappings:
properties:
'@timestamp':
type: date
count:
type: long

- do:
bulk:
refresh: true
body:
- '{ "create": { "_index": "logs-foobar", "_id": "1" } }'
- '{ "@timestamp": "2022-01-01", "baz": "quick", "a": "brown", "b": "fox" }'
- '{ "create": { "_index": "logs-foobar", "_id": "1" } }'
- '{ "@timestamp": "2022-01-01", "baz": "lazy", "a": "dog" }'
- is_true: errors
- match: { items.1.create._index: '/\.ds-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
- match: { items.1.create.status: 409 }
- match: { items.1.create.error.type: version_conflict_engine_exception}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndexClosedException;
import org.elasticsearch.node.NodeClosedException;
Expand Down Expand Up @@ -478,15 +479,18 @@ private void completeShardOperation() {
}

private void processFailure(BulkItemRequest bulkItemRequest, Exception cause) {
var errorType = ElasticsearchException.getExceptionName(ExceptionsHelper.unwrapCause(cause));
var error = ExceptionsHelper.unwrapCause(cause);
var errorType = ElasticsearchException.getExceptionName(error);
DocWriteRequest<?> docWriteRequest = bulkItemRequest.request();
DataStream failureStoreCandidate = getRedirectTargetCandidate(docWriteRequest, getClusterState().metadata());
// If the candidate is not null, the BulkItemRequest targets a data stream, but we'll still have to check if
// it has the failure store enabled.
if (failureStoreCandidate != null) {
// Do not redirect documents to a failure store that were already headed to one.
var isFailureStoreDoc = docWriteRequest instanceof IndexRequest indexRequest && indexRequest.isWriteToFailureStore();
if (isFailureStoreDoc == false && failureStoreCandidate.isFailureStoreEnabled()) {
if (isFailureStoreDoc == false
&& failureStoreCandidate.isFailureStoreEnabled()
&& error instanceof VersionConflictEngineException == false) {
// Redirect to failure store.
maybeMarkFailureStoreForRollover(failureStoreCandidate);
addDocumentToRedirectRequests(bulkItemRequest, cause, failureStoreCandidate.getName());
Expand Down

0 comments on commit a43ffd5

Please sign in to comment.