diff --git a/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
index 4963c9f03b40c..30f8f2aa702ab 100644
--- a/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
+++ b/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
@@ -75,7 +75,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
     private final static String OP_TYPE_UPDATE = "update";
     private final static String OP_TYPE_DELETE = "delete";
 
-    private static final String ACTION_NAME = BulkAction.NAME + "/shard";
+    public static final String ACTION_NAME = BulkAction.NAME + "/shard";
 
     private final MappingUpdatedAction mappingUpdatedAction;
     private final UpdateHelper updateHelper;
diff --git a/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java b/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java
index 78548ef2f76c0..61e6a69c3e403 100644
--- a/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java
+++ b/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java
@@ -468,6 +468,7 @@ public void handleException(TransportException exp) {
                     if (exp.unwrapCause() instanceof ConnectTransportException || exp.unwrapCause() instanceof NodeClosedException ||
                             retryPrimaryException(exp)) {
                         primaryOperationStarted.set(false);
+                        request.setCanHaveDuplicates();
                         // we already marked it as started when we executed it (removed the listener) so pass false
                         // to re-add to the cluster listener
                         logger.trace("received an error from node the primary was assigned to ({}), scheduling a retry", exp.getMessage());
diff --git a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java
index ec88ba95fa07a..9af316fea5be8 100644
--- a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java
+++ b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java
@@ -454,6 +454,17 @@ private void innerCreateNoLock(Create create, IndexWriter writer, long currentVe
                 // #7142: the primary already determined it's OK to index this document, and we confirmed above that the version doesn't
                 // conflict, so we must also update here on the replica to remain consistent:
                 doUpdate = true;
+            } else if (create.origin() == Operation.Origin.PRIMARY && create.autoGeneratedId() && create.canHaveDuplicates() && currentVersion == 1 && create.version() == Versions.MATCH_ANY) {
+                /*
+                 * If a bulk index request fails due to a disconnect, an unavailable shard, etc., it is retried
+                 * before it finally fails. However, the documents might already have been indexed by the first attempt.
+                 * For auto-generated ids this means the retry would report a version conflict in the bulk response
+                 * even though the document was indexed properly.
+                 * To avoid this we have to treat the retried index request as an update and set updatedVersion to 1.
+                 * See also the discussion on https://github.com/elasticsearch/elasticsearch/pull/9125
+                 */
+                doUpdate = true;
+                updatedVersion = 1;
             } else {
                 // On primary, we throw DAEE if the _uid is already in the index with an older version:
                 assert create.origin() == Operation.Origin.PRIMARY;
diff --git a/src/test/java/org/elasticsearch/index/store/ExceptionRetryTests.java b/src/test/java/org/elasticsearch/index/store/ExceptionRetryTests.java
new file mode 100644
index 0000000000000..971bc62994550
--- /dev/null
+++ b/src/test/java/org/elasticsearch/index/store/ExceptionRetryTests.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.index.store;
+
+import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
+import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.bulk.TransportShardBulkAction;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.discovery.Discovery;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.test.ElasticsearchIntegrationTest;
+import org.elasticsearch.test.transport.MockTransportService;
+import org.elasticsearch.transport.*;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.elasticsearch.index.query.QueryBuilders.termQuery;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+
+@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.SUITE)
+public class ExceptionRetryTests extends ElasticsearchIntegrationTest {
+
+    @Override
+    protected Settings nodeSettings(int nodeOrdinal) {
+        return ImmutableSettings.builder()
+                .put(super.nodeSettings(nodeOrdinal)).put("gateway.type", "local")
+                .put(TransportModule.TRANSPORT_SERVICE_TYPE_KEY, MockTransportService.class.getName())
+                .build();
+    }
+
+    /**
+     * Tests the retry mechanism when indexing: if an exception occurs while indexing, the request is retried before it finally fails.
+     * If auto-generated ids are used, this must not lead to duplicate ids.
+     * See https://github.com/elasticsearch/elasticsearch/issues/8788
+     */
+    @Test
+    public void testRetryDueToExceptionOnNetworkLayer() throws ExecutionException, InterruptedException, IOException {
+        final AtomicBoolean exceptionThrown = new AtomicBoolean(false);
+        int numDocs = scaledRandomIntBetween(100, 1000);
+        NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().get();
+        NodeStats unluckyNode = randomFrom(nodeStats.getNodes());
+        assertAcked(client().admin().indices().prepareCreate("index"));
+        ensureGreen("index");
+
+        // Add a transport delegate that throws a ConnectTransportException for one bulk shard request and thereby triggers a retry.
+        for (NodeStats dataNode : nodeStats.getNodes()) {
+            MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(TransportService.class, dataNode.getNode().name()));
+            mockTransportService.addDelegate(internalCluster().getInstance(Discovery.class, unluckyNode.getNode().name()).localNode(), new MockTransportService.DelegateTransport(mockTransportService.original()) {
+
+                @Override
+                public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
+                    super.sendRequest(node, requestId, action, request, options);
+                    if (action.equals(TransportShardBulkAction.ACTION_NAME) && !exceptionThrown.get()) {
+                        logger.debug("throwing ConnectTransportException for action [{}]", action);
+                        exceptionThrown.set(true);
+                        throw new ConnectTransportException(node, action);
+                    }
+                }
+            });
+        }
+
+        BulkRequestBuilder bulkBuilder = client().prepareBulk();
+        for (int i = 0; i < numDocs; i++) {
+            XContentBuilder doc = jsonBuilder().startObject().field("foo", "bar").endObject();
+            bulkBuilder.add(client().prepareIndex("index", "type").setSource(doc));
+        }
+
+        BulkResponse response = bulkBuilder.get();
+        if (response.hasFailures()) {
+            for (BulkItemResponse singleIndexResponse : response.getItems()) {
+                if (singleIndexResponse.isFailed()) {
+                    fail("None of the bulk items should fail but got " + singleIndexResponse.getFailureMessage());
+                }
+            }
+        }
+
+        refresh();
+        SearchResponse searchResponse = client().prepareSearch("index").setSize(numDocs * 2).addField("_id").get();
+
+        Set<String> uniqueIds = new HashSet<String>();
+        long dupCounter = 0;
+        boolean foundDuplicateAlready = false;
+        for (int i = 0; i < searchResponse.getHits().getHits().length; i++) {
+            if (!uniqueIds.add(searchResponse.getHits().getHits()[i].getId())) {
+                if (!foundDuplicateAlready) {
+                    SearchResponse dupIdResponse = client().prepareSearch("index").setQuery(termQuery("_id", searchResponse.getHits().getHits()[i].getId())).setExplain(true).get();
+                    assertThat(dupIdResponse.getHits().totalHits(), greaterThan(1L));
+                    logger.info("found a duplicate id:");
+                    for (SearchHit hit : dupIdResponse.getHits()) {
+                        logger.info("Doc {} was found on shard {}", hit.getId(), hit.getShard().getShardId());
+                    }
+                    logger.info("further duplicates will not be logged");
+                    foundDuplicateAlready = true;
+                }
+                dupCounter++;
+            }
+        }
+        assertSearchResponse(searchResponse);
+        assertThat(dupCounter, equalTo(0L));
+        assertHitCount(searchResponse, numDocs);
+    }
+}
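
The branch added to InternalEngine.innerCreateNoLock above is easiest to follow in isolation. Below is a minimal, self-contained sketch of that decision, using hypothetical names (CreateRetryDecision, treatAsUpdate, and a placeholder MATCH_ANY constant) rather than the real engine types; it only illustrates when a create that finds an existing document is downgraded to an update instead of raising a version conflict.

// Hypothetical sketch only: the class, method, and constant names below are illustrative
// and are not the actual Elasticsearch types used in the diff above.
public class CreateRetryDecision {

    enum Origin { PRIMARY, REPLICA }

    // placeholder for Versions.MATCH_ANY in this sketch
    static final long MATCH_ANY = -3L;

    /**
     * Decides whether a "create" that found an existing document under the same _uid should be
     * treated as an update (true) instead of reporting a version conflict (false).
     */
    static boolean treatAsUpdate(Origin origin, boolean autoGeneratedId, boolean canHaveDuplicates,
                                 long currentVersion, long requestedVersion) {
        if (origin == Origin.REPLICA) {
            // the primary already accepted the document, so the replica must apply it to stay consistent
            return true;
        }
        // a retried bulk request with an auto-generated id may have been indexed by the first,
        // seemingly failed attempt; overwrite that copy instead of reporting a conflict
        return origin == Origin.PRIMARY
                && autoGeneratedId
                && canHaveDuplicates
                && currentVersion == 1
                && requestedVersion == MATCH_ANY;
    }

    public static void main(String[] args) {
        // the first attempt indexed the document (version 1) and the retry sees it again: update, no conflict
        System.out.println(treatAsUpdate(Origin.PRIMARY, true, true, 1, MATCH_ANY));  // true
        // without the canHaveDuplicates flag, the same situation on the primary is still a conflict
        System.out.println(treatAsUpdate(Origin.PRIMARY, true, false, 1, MATCH_ANY)); // false
    }
}

Note that only requests that go through the retry path in TransportShardReplicationOperationAction are flagged via setCanHaveDuplicates(), which is why an ordinary create for an id that already exists still fails on the primary.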