Skip to content

Commit

Permalink
IndexAction to return DocWriteResponse (elastic#99964)
Browse files Browse the repository at this point in the history
IndexRequests can sometimes return an UpdateResponse rather than an IndexResponse if
there is an ingest pipeline that drops documents. This commit changes IndexAction to
return their common superclass DocWriteResponse.
  • Loading branch information
romseygeek authored Oct 2, 2023
1 parent f4a26f3 commit c24cc0f
Show file tree
Hide file tree
Showing 111 changed files with 338 additions and 364 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
Expand Down Expand Up @@ -48,7 +49,6 @@
import org.elasticsearch.action.datastreams.ModifyDataStreamsAction;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.MultiSearchRequestBuilder;
import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.SearchRequest;
Expand Down Expand Up @@ -297,7 +297,7 @@ public void testOtherWriteOps() throws Exception {
{
IndexRequest indexRequest = new IndexRequest(dataStreamName).source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON)
.opType(DocWriteRequest.OpType.CREATE);
IndexResponse indexResponse = client().index(indexRequest).actionGet();
DocWriteResponse indexResponse = client().index(indexRequest).actionGet();
assertThat(indexResponse.getIndex(), backingIndexEqualTo(dataStreamName, 1));
}
{
Expand Down Expand Up @@ -1176,7 +1176,7 @@ public void testIndexDocsWithCustomRoutingTargetingDataStreamIsNotAllowed() thro
String dataStream = "logs-foobar";
IndexRequest indexRequest = new IndexRequest(dataStream).source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON)
.opType(DocWriteRequest.OpType.CREATE);
IndexResponse indexResponse = client().index(indexRequest).actionGet();
DocWriteResponse indexResponse = client().index(indexRequest).actionGet();
assertThat(indexResponse.getIndex(), backingIndexEqualTo(dataStream, 1));

// Index doc with custom routing that targets the data stream
Expand Down Expand Up @@ -1238,7 +1238,7 @@ public void testIndexDocsWithCustomRoutingAllowed() throws Exception {
IndexRequest indexRequest = new IndexRequest(dataStream).source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON)
.opType(DocWriteRequest.OpType.CREATE)
.routing("custom");
IndexResponse indexResponse = client().index(indexRequest).actionGet();
DocWriteResponse indexResponse = client().index(indexRequest).actionGet();
assertThat(indexResponse.getIndex(), backingIndexEqualTo(dataStream, 1));
// Index doc with custom routing that targets the data stream
IndexRequest indexRequestWithRouting = new IndexRequest(dataStream).source("@timestamp", System.currentTimeMillis())
Expand Down Expand Up @@ -1266,7 +1266,7 @@ public void testIndexDocsWithCustomRoutingTargetingBackingIndex() throws Excepti
// Index doc that triggers creation of a data stream
IndexRequest indexRequest = new IndexRequest("logs-foobar").source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON)
.opType(DocWriteRequest.OpType.CREATE);
IndexResponse indexResponse = client().index(indexRequest).actionGet();
DocWriteResponse indexResponse = client().index(indexRequest).actionGet();
assertThat(indexResponse.getIndex(), backingIndexEqualTo("logs-foobar", 1));
String backingIndex = indexResponse.getIndex();

Expand All @@ -1277,7 +1277,7 @@ public void testIndexDocsWithCustomRoutingTargetingBackingIndex() throws Excepti
.id(indexResponse.getId())
.setIfPrimaryTerm(indexResponse.getPrimaryTerm())
.setIfSeqNo(indexResponse.getSeqNo());
IndexResponse response = client().index(indexRequestWithRouting).actionGet();
DocWriteResponse response = client().index(indexRequestWithRouting).actionGet();
assertThat(response.getIndex(), equalTo(backingIndex));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.elasticsearch.action.datastreams.CreateDataStreamAction;
import org.elasticsearch.action.datastreams.DeleteDataStreamAction;
import org.elasticsearch.action.datastreams.GetDataStreamAction;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.internal.Client;
Expand Down Expand Up @@ -116,7 +115,10 @@ public void setup() throws Exception {
ds2BackingIndexName = dsBackingIndexName.replace("-ds-", "-ds2-");
otherDs2BackingIndexName = otherDsBackingIndexName.replace("-other-ds-", "-other-ds2-");

IndexResponse indexResponse = client.prepareIndex("ds").setOpType(DocWriteRequest.OpType.CREATE).setSource(DOCUMENT_SOURCE).get();
DocWriteResponse indexResponse = client.prepareIndex("ds")
.setOpType(DocWriteRequest.OpType.CREATE)
.setSource(DOCUMENT_SOURCE)
.get();
assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult());
id = indexResponse.getId();

Expand Down Expand Up @@ -297,7 +299,7 @@ public void testSnapshotAndRestoreInPlace() {
}

public void testSnapshotAndRestoreAllIncludeSpecificDataStream() throws Exception {
IndexResponse indexResponse = client.prepareIndex("other-ds")
DocWriteResponse indexResponse = client.prepareIndex("other-ds")
.setOpType(DocWriteRequest.OpType.CREATE)
.setSource(DOCUMENT_SOURCE)
.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
package org.elasticsearch.datastreams;

import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.datastreams.CreateDataStreamAction;
import org.elasticsearch.action.datastreams.DeleteDataStreamAction;
import org.elasticsearch.action.datastreams.GetDataStreamAction;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.index.IndexNotFoundException;
Expand Down Expand Up @@ -64,7 +64,7 @@ public void testSystemDataStreamInGlobalState() throws Exception {
}

// Index a doc so that a concrete backing index will be created
IndexResponse indexRepsonse = client().prepareIndex(SYSTEM_DATA_STREAM_NAME)
DocWriteResponse indexRepsonse = client().prepareIndex(SYSTEM_DATA_STREAM_NAME)
.setId("42")
.setSource("{ \"@timestamp\": \"2099-03-08T11:06:07.000Z\", \"name\": \"my-name\" }", XContentType.JSON)
.setOpType(DocWriteRequest.OpType.CREATE)
Expand Down Expand Up @@ -162,7 +162,7 @@ public void testSystemDataStreamInFeatureState() throws Exception {
}

// Index a doc so that a concrete backing index will be created
IndexResponse indexToDataStreamResponse = client().prepareIndex(SYSTEM_DATA_STREAM_NAME)
DocWriteResponse indexToDataStreamResponse = client().prepareIndex(SYSTEM_DATA_STREAM_NAME)
.setId("42")
.setSource("{ \"@timestamp\": \"2099-03-08T11:06:07.000Z\", \"name\": \"my-name\" }", XContentType.JSON)
.setOpType(DocWriteRequest.OpType.CREATE)
Expand All @@ -171,7 +171,7 @@ public void testSystemDataStreamInFeatureState() throws Exception {
assertThat(indexToDataStreamResponse.status().getStatus(), oneOf(200, 201));

// Index a doc so that a concrete backing index will be created
IndexResponse indexResponse = client().prepareIndex("my-index")
DocWriteResponse indexResponse = client().prepareIndex("my-index")
.setId("42")
.setSource("{ \"name\": \"my-name\" }", XContentType.JSON)
.setOpType(DocWriteRequest.OpType.CREATE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
package org.elasticsearch.datastreams;

import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.compress.CompressedXContent;
Expand Down Expand Up @@ -58,7 +58,7 @@ protected Collection<Class<? extends Plugin>> getPlugins() {

public void testGetTimestampFieldTypeForTsdbDataStream() throws IOException {
createTemplate(true);
IndexResponse indexResponse = indexDoc();
DocWriteResponse indexResponse = indexDoc();

var indicesService = getInstanceFromNode(IndicesService.class);
var result = indicesService.getTimestampFieldType(indexResponse.getShardId().getIndex());
Expand All @@ -67,14 +67,14 @@ public void testGetTimestampFieldTypeForTsdbDataStream() throws IOException {

public void testGetTimestampFieldTypeForDataStream() throws IOException {
createTemplate(false);
IndexResponse indexResponse = indexDoc();
DocWriteResponse indexResponse = indexDoc();

var indicesService = getInstanceFromNode(IndicesService.class);
var result = indicesService.getTimestampFieldType(indexResponse.getShardId().getIndex());
assertThat(result, nullValue());
}

private IndexResponse indexDoc() {
private DocWriteResponse indexDoc() {
Instant time = Instant.now();
var indexRequest = new IndexRequest("k8s").opType(DocWriteRequest.OpType.CREATE);
indexRequest.source(DOC.replace("$time", formatInstant(time)), XContentType.JSON);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.cluster.block.ClusterBlockException;
Expand Down Expand Up @@ -321,7 +320,7 @@ public boolean validateClusterForming() {
);

// but this one should pass since it has a longer timeout
final PlainActionFuture<IndexResponse> future = new PlainActionFuture<>();
final PlainActionFuture<DocWriteResponse> future = new PlainActionFuture<>();
client().prepareIndex("index")
.setId("passes1")
.setSource("x", 2)
Expand All @@ -333,7 +332,7 @@ public boolean validateClusterForming() {
internalCluster().startNode(Settings.builder().put(GatewayService.RECOVER_AFTER_DATA_NODES_SETTING.getKey(), "1"));
ensureYellow("index");

final IndexResponse indexResponse = future.actionGet(timeout);
final DocWriteResponse indexResponse = future.actionGet(timeout);
assertThat(indexResponse.status(), equalTo(RestStatus.CREATED));
assertThat(indexResponse.getResult(), equalTo(DocWriteResponse.Result.CREATED));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
package org.elasticsearch.ingest.geoip;

import org.apache.lucene.util.Constants;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -101,7 +101,7 @@ public void testLazyLoading() throws IOException {
final IndexRequest indexRequest = new IndexRequest("index");
indexRequest.setPipeline("geoip");
indexRequest.source(Collections.singletonMap("ip", "1.1.1.1"));
final IndexResponse indexResponse = client(ingestNode).index(indexRequest).actionGet();
final DocWriteResponse indexResponse = client(ingestNode).index(indexRequest).actionGet();
assertThat(indexResponse.status(), equalTo(RestStatus.CREATED));
// now the geo-IP database should be loaded on the ingest node
assertDatabaseLoadStatus(ingestNode, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.DocWriteRequest.OpType;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.flush.FlushAction;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
Expand Down Expand Up @@ -220,7 +221,7 @@ public void testIndexChunks() throws IOException {

AtomicInteger chunkIndex = new AtomicInteger();

client.addHandler(IndexAction.INSTANCE, (IndexRequest request, ActionListener<IndexResponse> listener) -> {
client.addHandler(IndexAction.INSTANCE, (IndexRequest request, ActionListener<DocWriteResponse> listener) -> {
int chunk = chunkIndex.getAndIncrement();
assertEquals(OpType.CREATE, request.opType());
assertThat(request.id(), Matchers.startsWith("test_" + (chunk + 15) + "_"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.elasticsearch.action;

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.Requests;
import org.elasticsearch.test.ESIntegTestCase;
Expand All @@ -30,9 +29,9 @@ public void testThreadedListeners() throws Throwable {
request.source(Requests.INDEX_CONTENT_TYPE, "field1", "value1");
}

client.index(request, new ActionListener<IndexResponse>() {
client.index(request, new ActionListener<DocWriteResponse>() {
@Override
public void onResponse(IndexResponse indexResponse) {
public void onResponse(DocWriteResponse indexResponse) {
threadName.set(Thread.currentThread().getName());
latch.countDown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
Expand All @@ -25,7 +26,6 @@
import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryAction;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchTransportService;
Expand Down Expand Up @@ -454,7 +454,7 @@ public void waitForTaskCompletion(Task task) {}
}
// Need to run the task in a separate thread because node client's .execute() is blocked by our task listener
index = new Thread(() -> {
IndexResponse indexResponse = client().prepareIndex("test").setSource("test", "test").get();
DocWriteResponse indexResponse = client().prepareIndex("test").setSource("test", "test").get();
assertArrayEquals(ReplicationResponse.NO_FAILURES, indexResponse.getShardInfo().getFailures());
});
index.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.admin.indices.template.delete.DeleteComposableIndexTemplateAction;
import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
Expand Down Expand Up @@ -117,7 +116,7 @@ public void testWriteToAliasPrimaryAutoCreatedFirst() throws Exception {
client().execute(AutoCreateAction.INSTANCE, request).get();
}

IndexResponse response = client().prepareIndex(INDEX_NAME).setSource("{\"foo\":\"bar\"}", XContentType.JSON).get();
DocWriteResponse response = client().prepareIndex(INDEX_NAME).setSource("{\"foo\":\"bar\"}", XContentType.JSON).get();
assertThat(response.getResult(), equalTo(DocWriteResponse.Result.CREATED));
}

Expand All @@ -136,7 +135,7 @@ public void testWriteToAliasSecondaryAutoCreatedFirst() throws Exception {
client().execute(AutoCreateAction.INSTANCE, request).get();
}

IndexResponse response = client().prepareIndex(INDEX_NAME).setSource("{\"foo\":\"bar\"}", XContentType.JSON).get();
DocWriteResponse response = client().prepareIndex(INDEX_NAME).setSource("{\"foo\":\"bar\"}", XContentType.JSON).get();
assertThat(response.getResult(), equalTo(DocWriteResponse.Result.CREATED));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.replication.ReplicationRequest;
Expand Down Expand Up @@ -164,7 +164,7 @@ public void testDeleteIndexWhileIndexing() throws Exception {
while (stopped.get() == false && docID.get() < 5000) {
String id = Integer.toString(docID.incrementAndGet());
try {
IndexResponse response = client().prepareIndex(index)
DocWriteResponse response = client().prepareIndex(index)
.setId(id)
.setSource(Map.of("f" + randomIntBetween(1, 10), randomNonNegativeLong()), XContentType.JSON)
.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.action.update.UpdateRequest;
Expand Down Expand Up @@ -697,7 +696,7 @@ public void testNoopUpdate() {
createIndex(indexName, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build());
internalCluster().ensureAtLeastNumDataNodes(2);
ensureGreen(indexName);
IndexResponse doc = index(indexName, "1", Map.of("user", "xyz"));
DocWriteResponse doc = index(indexName, "1", Map.of("user", "xyz"));
assertThat(doc.getShardInfo().getSuccessful(), equalTo(2));
final BulkResponse bulkResponse = client().prepareBulk()
.add(new UpdateRequest().index(indexName).id("1").detectNoop(true).doc("user", "xyz")) // noop update
Expand Down
Loading

0 comments on commit c24cc0f

Please sign in to comment.