Skip to content

Commit

Permalink
Add BulkProcessor methods with XContentType parameter (#23078)
Browse files Browse the repository at this point in the history
This commit adds methods to the BulkProcessor that accept bytes and a XContentType to avoid content type detection. The
methods that do not accept XContentType with bytes have been deprecated by this commit.

Relates #22691
  • Loading branch information
jaymode committed Feb 10, 2017
1 parent 685fc0f commit 85acad8
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.xcontent.XContentType;

import java.io.Closeable;
import java.util.Objects;
Expand Down Expand Up @@ -288,16 +289,46 @@ private synchronized void internalAdd(DocWriteRequest request, @Nullable Object
executeIfNeeded();
}

/**
* Adds the data from the bytes to be processed by the bulk processor
* @deprecated use {@link #add(BytesReference, String, String, XContentType)} instead to avoid content type auto-detection
*/
@Deprecated
public BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType) throws Exception {
return add(data, defaultIndex, defaultType, null, null);
}

public synchronized BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType, @Nullable String defaultPipeline, @Nullable Object payload) throws Exception {
/**
* Adds the data from the bytes to be processed by the bulk processor
*/
public BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType,
XContentType xContentType) throws Exception {
return add(data, defaultIndex, defaultType, null, null, xContentType);
}

/**
* Adds the data from the bytes to be processed by the bulk processor
* @deprecated use {@link #add(BytesReference, String, String, String, Object, XContentType)} instead to avoid content type
* auto-detection
*/
@Deprecated
public synchronized BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType,
@Nullable String defaultPipeline, @Nullable Object payload) throws Exception {
bulkRequest.add(data, defaultIndex, defaultType, null, null, null, defaultPipeline, payload, true);
executeIfNeeded();
return this;
}

/**
* Adds the data from the bytes to be processed by the bulk processor
*/
public synchronized BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType,
@Nullable String defaultPipeline, @Nullable Object payload, XContentType xContentType) throws Exception {
bulkRequest.add(data, defaultIndex, defaultType, null, null, null, defaultPipeline, payload, true, xContentType);
executeIfNeeded();
return this;
}

private void executeIfNeeded() {
ensureOpen();
if (!isOverTheLimit()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.env.Environment;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.transport.MockTransportClient;
Expand All @@ -54,7 +57,7 @@
import static org.hamcrest.Matchers.lessThanOrEqualTo;

public class BulkProcessorIT extends ESIntegTestCase {
public void testThatBulkProcessorCountIsCorrect() throws InterruptedException {
public void testThatBulkProcessorCountIsCorrect() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
BulkProcessorTestListener listener = new BulkProcessorTestListener(latch);

Expand All @@ -77,7 +80,7 @@ public void testThatBulkProcessorCountIsCorrect() throws InterruptedException {
}
}

public void testBulkProcessorFlush() throws InterruptedException {
public void testBulkProcessorFlush() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
BulkProcessorTestListener listener = new BulkProcessorTestListener(latch);

Expand Down Expand Up @@ -296,11 +299,18 @@ public void testBulkProcessorConcurrentRequestsReadOnlyIndex() throws Exception
assertMultiGetResponse(multiGetRequestBuilder.get(), testDocs);
}

private static MultiGetRequestBuilder indexDocs(Client client, BulkProcessor processor, int numDocs) {
private static MultiGetRequestBuilder indexDocs(Client client, BulkProcessor processor, int numDocs) throws Exception {
MultiGetRequestBuilder multiGetRequestBuilder = client.prepareMultiGet();
for (int i = 1; i <= numDocs; i++) {
processor.add(new IndexRequest("test", "test", Integer.toString(i))
.source(Requests.INDEX_CONTENT_TYPE, "field", randomRealisticUnicodeOfLengthBetween(1, 30)));
if (randomBoolean()) {
processor.add(new IndexRequest("test", "test", Integer.toString(i))
.source(Requests.INDEX_CONTENT_TYPE, "field", randomRealisticUnicodeOfLengthBetween(1, 30)));
} else {
final String source = "{ \"index\":{\"_index\":\"test\",\"_type\":\"test\",\"_id\":\"" + Integer.toString(i) + "\"} }\n"
+ JsonXContent.contentBuilder()
.startObject().field("field", randomRealisticUnicodeOfLengthBetween(1, 30)).endObject().string() + "\n";
processor.add(new BytesArray(source), null, null, XContentType.JSON);
}
multiGetRequestBuilder.add("test", "test", Integer.toString(i));
}
return multiGetRequestBuilder;
Expand All @@ -313,7 +323,8 @@ private static void assertResponseItems(List<BulkItemResponse> bulkItemResponses
assertThat(bulkItemResponse.getIndex(), equalTo("test"));
assertThat(bulkItemResponse.getType(), equalTo("test"));
assertThat(bulkItemResponse.getId(), equalTo(Integer.toString(i++)));
assertThat(bulkItemResponse.isFailed(), equalTo(false));
assertThat("item " + i + " failed with cause: " + bulkItemResponse.getFailureMessage(),
bulkItemResponse.isFailed(), equalTo(false));
}
}

Expand Down

0 comments on commit 85acad8

Please sign in to comment.