From 4f1f1c11268d18d2045723f51915210fbca1bdd9 Mon Sep 17 00:00:00 2001 From: Evan Galpin Date: Thu, 27 May 2021 09:21:07 -0400 Subject: [PATCH] [BEAM-12093] Overhaul ElasticsearchIO.Write (#14347) Presently, the Write transform has 2 responsibilities: -Convert input documents into Bulk API entities, serializing based on user settings (partial update, delete, upsert, etc) -> DocToBulk -Batch the converted Bulk API entities together and interface with the target ES cluster -> BulkIO This change aims to separate these 2 responsibilities into discrete PTransforms to allow for greater flexibility while also maintaining the convenience of the Write transform to perform both document conversion and IO serially. --- CHANGES.md | 2 + .../io/elasticsearch/ElasticsearchIOIT.java | 26 + .../io/elasticsearch/ElasticsearchIOTest.java | 23 + .../elasticsearch-tests-5/build.gradle | 2 + .../io/elasticsearch/ElasticsearchIOIT.java | 32 + .../io/elasticsearch/ElasticsearchIOTest.java | 31 + .../elasticsearch-tests-6/build.gradle | 2 + .../io/elasticsearch/ElasticsearchIOIT.java | 32 + .../io/elasticsearch/ElasticsearchIOTest.java | 31 + .../elasticsearch-tests-7/build.gradle | 2 + .../io/elasticsearch/ElasticsearchIOIT.java | 32 + .../io/elasticsearch/ElasticsearchIOTest.java | 31 + .../ElasticsearchIOTestCommon.java | 320 ++++- .../ElasticsearchIOTestUtils.java | 171 ++- sdks/java/io/elasticsearch/OWNERS | 1 + .../sdk/io/elasticsearch/ElasticsearchIO.java | 1277 +++++++++++++---- 16 files changed, 1691 insertions(+), 324 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 8586fabd2132..343d0f050e53 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -93,6 +93,8 @@ ## I/Os * Support for X source added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)). +* Allow splitting apart document serialization and IO for ElasticsearchIO +* Support Bulk API request size optimization through addition of ElasticsearchIO.Write.withStatefulBatches ## New Features / Improvements diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java index 6c8aa3cb7613..3daa97790584 100644 --- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java @@ -103,6 +103,15 @@ public void testWriteVolume() throws Exception { elasticsearchIOTestCommonWrite.testWrite(); } + @Test + public void testWriteVolumeStateful() throws Exception { + // cannot share elasticsearchIOTestCommon because tests run in parallel. 
+ ElasticsearchIOTestCommon elasticsearchIOTestCommonWrite = + new ElasticsearchIOTestCommon(writeConnectionConfiguration, restClient, true); + elasticsearchIOTestCommonWrite.setPipeline(pipeline); + elasticsearchIOTestCommonWrite.testWriteStateful(); + } + @Test public void testSizesVolume() throws Exception { elasticsearchIOTestCommon.testSizes(); @@ -123,6 +132,23 @@ public void testWriteVolumeWithFullAddressing() throws Exception { elasticsearchIOTestCommonWrite.testWriteWithFullAddressing(); } + @Test + public void testWriteWithAllowableErrors() throws Exception { + elasticsearchIOTestCommon.testWriteWithAllowedErrors(); + } + + @Test + public void testWriteWithRouting() throws Exception { + elasticsearchIOTestCommon.setPipeline(pipeline); + elasticsearchIOTestCommon.testWriteWithRouting(); + } + + @Test + public void testWriteWithDocVersion() throws Exception { + elasticsearchIOTestCommon.setPipeline(pipeline); + elasticsearchIOTestCommon.testWriteWithDocVersion(); + } + /** * This test verifies volume partial updates of Elasticsearch. The test dataset index is cloned * and then a new field is added to each document using a partial update. The test then asserts diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java index d1b43f04bed1..9036bf9c5e22 100644 --- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java @@ -201,6 +201,29 @@ public void testWritePartialUpdate() throws Exception { elasticsearchIOTestCommon.testWritePartialUpdate(); } + @Test + public void testWriteWithAllowedErrors() throws Exception { + elasticsearchIOTestCommon.testWriteWithAllowedErrors(); + } + + @Test + public void testWriteWithRouting() throws Exception { + elasticsearchIOTestCommon.setPipeline(pipeline); + elasticsearchIOTestCommon.testWriteWithRouting(); + } + + @Test + public void testWriteWithDocVersion() throws Exception { + elasticsearchIOTestCommon.setPipeline(pipeline); + elasticsearchIOTestCommon.testWriteWithDocVersion(); + } + + @Test + public void testMaxParallelRequestsPerWindow() throws Exception { + elasticsearchIOTestCommon.setPipeline(pipeline); + elasticsearchIOTestCommon.testMaxParallelRequestsPerWindow(); + } + @Test public void testReadWithMetadata() throws Exception { elasticsearchIOTestCommon.setPipeline(pipeline); diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/build.gradle b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/build.gradle index 718b9037be0b..1d55bf592df1 100644 --- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/build.gradle +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/build.gradle @@ -48,6 +48,8 @@ dependencies { testCompile project(path: ":sdks:java:io:elasticsearch-tests:elasticsearch-tests-common", configuration: "testRuntime") testCompile "org.elasticsearch.test:framework:$elastic_search_version" testCompile "org.elasticsearch.plugin:transport-netty4-client:$elastic_search_version" + // TODO(egalpin): Remove painless plugin in favour of containerized tests + testImplementation group: 'org.codelibs.elasticsearch.module', name: 'lang-painless', version: "$elastic_search_version" 
testCompile "com.carrotsearch.randomizedtesting:randomizedtesting-runner:2.5.0" testCompile "org.elasticsearch:elasticsearch:$elastic_search_version" diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java index c9032dd3a15f..9503853071fb 100644 --- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java @@ -107,6 +107,15 @@ public void testWriteVolume() throws Exception { elasticsearchIOTestCommonWrite.testWrite(); } + @Test + public void testWriteVolumeStateful() throws Exception { + // cannot share elasticsearchIOTestCommon because tests run in parallel. + ElasticsearchIOTestCommon elasticsearchIOTestCommonWrite = + new ElasticsearchIOTestCommon(writeConnectionConfiguration, restClient, true); + elasticsearchIOTestCommonWrite.setPipeline(pipeline); + elasticsearchIOTestCommonWrite.testWriteStateful(); + } + @Test public void testSizesVolume() throws Exception { elasticsearchIOTestCommon.testSizes(); @@ -127,6 +136,29 @@ public void testWriteWithFullAddressingVolume() throws Exception { elasticsearchIOTestCommonWrite.testWriteWithFullAddressing(); } + @Test + public void testWriteWithAllowableErrors() throws Exception { + elasticsearchIOTestCommon.testWriteWithAllowedErrors(); + } + + @Test + public void testWriteWithRouting() throws Exception { + elasticsearchIOTestCommon.setPipeline(pipeline); + elasticsearchIOTestCommon.testWriteWithRouting(); + } + + @Test + public void testWriteScriptedUpsert() throws Exception { + elasticsearchIOTestCommon.setPipeline(pipeline); + elasticsearchIOTestCommon.testWriteScriptedUpsert(); + } + + @Test + public void testWriteWithDocVersion() throws Exception { + elasticsearchIOTestCommon.setPipeline(pipeline); + elasticsearchIOTestCommon.testWriteWithDocVersion(); + } + /** * This test verifies volume partial updates of Elasticsearch. The test dataset index is cloned * and then a new field is added to each document using a partial update. 
The test then asserts diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java index e675d4357cb0..8a82db910250 100644 --- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java @@ -30,6 +30,7 @@ import java.util.Collection; import org.apache.beam.sdk.testing.TestPipeline; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.painless.PainlessPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.transport.Netty4Plugin; @@ -85,6 +86,7 @@ public Settings indexSettings() { protected Collection> nodePlugins() { ArrayList> plugins = new ArrayList<>(); plugins.add(Netty4Plugin.class); + plugins.add(PainlessPlugin.class); return plugins; } @@ -199,6 +201,35 @@ public void testWritePartialUpdate() throws Exception { elasticsearchIOTestCommon.testWritePartialUpdate(); } + @Test + public void testWriteWithAllowableErrors() throws Exception { + elasticsearchIOTestCommon.testWriteWithAllowedErrors(); + } + + @Test + public void testWriteWithRouting() throws Exception { + elasticsearchIOTestCommon.setPipeline(pipeline); + elasticsearchIOTestCommon.testWriteWithRouting(); + } + + @Test + public void testWriteScriptedUpsert() throws Exception { + elasticsearchIOTestCommon.setPipeline(pipeline); + elasticsearchIOTestCommon.testWriteScriptedUpsert(); + } + + @Test + public void testWriteWithDocVersion() throws Exception { + elasticsearchIOTestCommon.setPipeline(pipeline); + elasticsearchIOTestCommon.testWriteWithDocVersion(); + } + + @Test + public void testMaxParallelRequestsPerWindow() throws Exception { + elasticsearchIOTestCommon.setPipeline(pipeline); + elasticsearchIOTestCommon.testMaxParallelRequestsPerWindow(); + } + @Test public void testReadWithMetadata() throws Exception { elasticsearchIOTestCommon.setPipeline(pipeline); diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/build.gradle b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/build.gradle index 6eee3a06b461..3144cff0815f 100644 --- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/build.gradle +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/build.gradle @@ -48,6 +48,8 @@ dependencies { testCompile project(path: ":sdks:java:io:elasticsearch-tests:elasticsearch-tests-common", configuration: "testRuntime") testCompile "org.elasticsearch.test:framework:$elastic_search_version" testCompile "org.elasticsearch.plugin:transport-netty4-client:$elastic_search_version" + // TODO(egalpin): Remove painless plugin in favour of containerized tests + testImplementation group: 'org.codelibs.elasticsearch.module', name: 'lang-painless', version: "$elastic_search_version" testCompile "com.carrotsearch.randomizedtesting:randomizedtesting-runner:2.5.2" testCompile "org.elasticsearch:elasticsearch:$elastic_search_version" diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java index 
f9ae9f6a530e..18bca065b641 100644 --- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java @@ -107,6 +107,15 @@ public void testWriteVolume() throws Exception { elasticsearchIOTestCommonWrite.testWrite(); } + @Test + public void testWriteVolumeStateful() throws Exception { + // cannot share elasticsearchIOTestCommon because tests run in parallel. + ElasticsearchIOTestCommon elasticsearchIOTestCommonWrite = + new ElasticsearchIOTestCommon(writeConnectionConfiguration, restClient, true); + elasticsearchIOTestCommonWrite.setPipeline(pipeline); + elasticsearchIOTestCommonWrite.testWriteStateful(); + } + @Test public void testSizesVolume() throws Exception { elasticsearchIOTestCommon.testSizes(); @@ -127,6 +136,29 @@ public void testWriteWithFullAddressingVolume() throws Exception { elasticsearchIOTestCommonWrite.testWriteWithFullAddressing(); } + @Test + public void testWriteWithAllowableErrors() throws Exception { + elasticsearchIOTestCommon.testWriteWithAllowedErrors(); + } + + @Test + public void testWriteWithRouting() throws Exception { + elasticsearchIOTestCommon.setPipeline(pipeline); + elasticsearchIOTestCommon.testWriteWithRouting(); + } + + @Test + public void testWriteScriptedUpsert() throws Exception { + elasticsearchIOTestCommon.setPipeline(pipeline); + elasticsearchIOTestCommon.testWriteScriptedUpsert(); + } + + @Test + public void testWriteWithDocVersion() throws Exception { + elasticsearchIOTestCommon.setPipeline(pipeline); + elasticsearchIOTestCommon.testWriteWithDocVersion(); + } + /** * This test verifies volume partial updates of Elasticsearch. The test dataset index is cloned * and then a new field is added to each document using a partial update. 
The test then asserts diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java index 0de739868d72..fa194a13c07e 100644 --- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java @@ -30,6 +30,7 @@ import java.util.Collection; import org.apache.beam.sdk.testing.TestPipeline; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.painless.PainlessPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.transport.Netty4Plugin; @@ -85,6 +86,7 @@ public Settings indexSettings() { protected Collection> nodePlugins() { ArrayList> plugins = new ArrayList<>(); plugins.add(Netty4Plugin.class); + plugins.add(PainlessPlugin.class); return plugins; } @@ -193,6 +195,35 @@ public void testWritePartialUpdate() throws Exception { elasticsearchIOTestCommon.testWritePartialUpdate(); } + @Test + public void testWriteWithAllowableErrors() throws Exception { + elasticsearchIOTestCommon.testWriteWithAllowedErrors(); + } + + @Test + public void testWriteWithRouting() throws Exception { + elasticsearchIOTestCommon.setPipeline(pipeline); + elasticsearchIOTestCommon.testWriteWithRouting(); + } + + @Test + public void testWriteScriptedUpsert() throws Exception { + elasticsearchIOTestCommon.setPipeline(pipeline); + elasticsearchIOTestCommon.testWriteScriptedUpsert(); + } + + @Test + public void testWriteWithDocVersion() throws Exception { + elasticsearchIOTestCommon.setPipeline(pipeline); + elasticsearchIOTestCommon.testWriteWithDocVersion(); + } + + @Test + public void testMaxParallelRequestsPerWindow() throws Exception { + elasticsearchIOTestCommon.setPipeline(pipeline); + elasticsearchIOTestCommon.testMaxParallelRequestsPerWindow(); + } + @Test public void testReadWithMetadata() throws Exception { elasticsearchIOTestCommon.setPipeline(pipeline); diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-7/build.gradle b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-7/build.gradle index ead50d7324f1..1cc9d0155cbf 100644 --- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-7/build.gradle +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-7/build.gradle @@ -48,6 +48,8 @@ dependencies { testCompile project(path: ":sdks:java:io:elasticsearch-tests:elasticsearch-tests-common", configuration: "testRuntime") testCompile "org.elasticsearch.test:framework:$elastic_search_version" testCompile "org.elasticsearch.plugin:transport-netty4-client:$elastic_search_version" + // TODO(egalpin): Remove painless plugin in favour of containerized tests + testImplementation group: 'org.codelibs.elasticsearch.module', name: 'lang-painless', version: "$elastic_search_version" testCompile "com.carrotsearch.randomizedtesting:randomizedtesting-runner:2.7.5" testCompile "org.elasticsearch:elasticsearch:$elastic_search_version" diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-7/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-7/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java index 
ce4a041b8d8a..8e4e76ab1824 100644 --- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-7/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-7/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java @@ -107,6 +107,15 @@ public void testWriteVolume() throws Exception { elasticsearchIOTestCommonWrite.testWrite(); } + @Test + public void testWriteVolumeStateful() throws Exception { + // cannot share elasticsearchIOTestCommon because tests run in parallel. + ElasticsearchIOTestCommon elasticsearchIOTestCommonWrite = + new ElasticsearchIOTestCommon(writeConnectionConfiguration, restClient, true); + elasticsearchIOTestCommonWrite.setPipeline(pipeline); + elasticsearchIOTestCommonWrite.testWriteStateful(); + } + @Test public void testSizesVolume() throws Exception { elasticsearchIOTestCommon.testSizes(); @@ -127,6 +136,29 @@ public void testWriteWithFullAddressingVolume() throws Exception { elasticsearchIOTestCommonWrite.testWriteWithFullAddressing(); } + @Test + public void testWriteWithAllowableErrors() throws Exception { + elasticsearchIOTestCommon.testWriteWithAllowedErrors(); + } + + @Test + public void testWriteWithRouting() throws Exception { + elasticsearchIOTestCommon.setPipeline(pipeline); + elasticsearchIOTestCommon.testWriteWithRouting(); + } + + @Test + public void testWriteScriptedUpsert() throws Exception { + elasticsearchIOTestCommon.setPipeline(pipeline); + elasticsearchIOTestCommon.testWriteScriptedUpsert(); + } + + @Test + public void testWriteWithDocVersion() throws Exception { + elasticsearchIOTestCommon.setPipeline(pipeline); + elasticsearchIOTestCommon.testWriteWithDocVersion(); + } + /** * This test verifies volume partial updates of Elasticsearch. The test dataset index is cloned * and then a new field is added to each document using a partial update. 
The test then asserts diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-7/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-7/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java index e9dceb7d8468..b2a272011922 100644 --- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-7/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-7/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java @@ -30,6 +30,7 @@ import java.util.Collection; import org.apache.beam.sdk.testing.TestPipeline; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.painless.PainlessPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.transport.Netty4Plugin; @@ -89,6 +90,7 @@ public Settings indexSettings() { protected Collection> nodePlugins() { ArrayList> plugins = new ArrayList<>(); plugins.add(Netty4Plugin.class); + plugins.add(PainlessPlugin.class); return plugins; } @@ -197,6 +199,35 @@ public void testWritePartialUpdate() throws Exception { elasticsearchIOTestCommon.testWritePartialUpdate(); } + @Test + public void testWriteWithAllowableErrors() throws Exception { + elasticsearchIOTestCommon.testWriteWithAllowedErrors(); + } + + @Test + public void testWriteWithRouting() throws Exception { + elasticsearchIOTestCommon.setPipeline(pipeline); + elasticsearchIOTestCommon.testWriteWithRouting(); + } + + @Test + public void testWriteScriptedUpsert() throws Exception { + elasticsearchIOTestCommon.setPipeline(pipeline); + elasticsearchIOTestCommon.testWriteScriptedUpsert(); + } + + @Test + public void testWriteWithDocVersion() throws Exception { + elasticsearchIOTestCommon.setPipeline(pipeline); + elasticsearchIOTestCommon.testWriteWithDocVersion(); + } + + @Test + public void testMaxParallelRequestsPerWindow() throws Exception { + elasticsearchIOTestCommon.setPipeline(pipeline); + elasticsearchIOTestCommon.testMaxParallelRequestsPerWindow(); + } + @Test public void testReadWithMetadata() throws Exception { elasticsearchIOTestCommon.setPipeline(pipeline); diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java index 2f3219f0570d..291a0a487be5 100644 --- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java @@ -18,20 +18,27 @@ package org.apache.beam.sdk.io.elasticsearch; import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.BoundedElasticsearchSource; +import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.BulkIO; import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration; +import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.DocToBulk; import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Read; import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.RetryConfiguration.DEFAULT_RETRY_PREDICATE; import static 
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Write; import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.getBackendVersion; import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.FAMOUS_SCIENTISTS; import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.NUM_SCIENTISTS; +import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.SCRIPT_SOURCE; import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.countByMatch; import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.countByScientistName; +import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.insertTestDocuments; +import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.refreshAllIndices; import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.refreshIndexAndGetCurrentNumDocs; import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource; +import static org.apache.beam.sdk.values.TypeDescriptors.integers; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.core.Is.is; import static org.hamcrest.core.Is.isA; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -39,6 +46,7 @@ import static org.junit.Assert.fail; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import java.io.IOException; import java.io.Serializable; import java.nio.charset.StandardCharsets; @@ -46,8 +54,10 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.function.BiFunction; import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.BulkIO.StatefulBatching; import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.RetryConfiguration.DefaultRetryPredicate; import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.RetryConfiguration.RetryPredicate; import org.apache.beam.sdk.options.PipelineOptions; @@ -59,7 +69,9 @@ import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFnTester; +import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.http.HttpEntity; import org.apache.http.entity.ContentType; @@ -68,6 +80,10 @@ import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; import org.hamcrest.CustomMatcher; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeMatcher; +import org.hamcrest.collection.IsIterableContainingInAnyOrder; import org.joda.time.Duration; import org.junit.rules.ExpectedException; import org.slf4j.Logger; @@ -257,6 +273,25 @@ void testWrite() throws Exception { executeWriteTest(write); } + void testWriteStateful() throws Exception { + Write write = + ElasticsearchIO.write() + .withConnectionConfiguration(connectionConfiguration) + .withUseStatefulBatches(true); + executeWriteTest(write); + } + + List serializeDocs(ElasticsearchIO.Write write, List jsonDocs) + throws IOException { + List serializedInput = new ArrayList<>(); + for (String doc : jsonDocs) { + serializedInput.add( + DocToBulk.createBulkApiEntity( + write.getDocToBulk(), doc, 
getBackendVersion(connectionConfiguration))); + } + return serializedInput; + } + void testWriteWithErrors() throws Exception { Write write = ElasticsearchIO.write() @@ -265,6 +300,7 @@ void testWriteWithErrors() throws Exception { List input = ElasticsearchIOTestUtils.createDocuments( numDocs, ElasticsearchIOTestUtils.InjectionMode.INJECT_SOME_INVALID_DOCS); + expectedException.expect(isA(IOException.class)); expectedException.expectMessage( new CustomMatcher("RegExp matcher") { @@ -284,11 +320,32 @@ public boolean matches(Object o) { + "Document id .+: failed to parse \\(.+\\).*Caused by: .+ \\(.+\\).*"); } }); + // write bundles size is the runner decision, we cannot force a bundle size, // so we test the Writer as a DoFn outside of a runner. - try (DoFnTester fnTester = DoFnTester.of(new Write.WriteFn(write))) { + try (DoFnTester fnTester = + DoFnTester.of(new BulkIO.BulkIOBundleFn(write.getBulkIO()))) { // inserts into Elasticsearch - fnTester.processBundle(input); + fnTester.processBundle(serializeDocs(write, input)); + } + } + + void testWriteWithAllowedErrors() throws Exception { + Write write = + ElasticsearchIO.write() + .withConnectionConfiguration(connectionConfiguration) + .withMaxBatchSize(BATCH_SIZE) + .withAllowableResponseErrors(Collections.singleton("json_parse_exception")); + List input = + ElasticsearchIOTestUtils.createDocuments( + numDocs, ElasticsearchIOTestUtils.InjectionMode.INJECT_SOME_INVALID_DOCS); + + // write bundles size is the runner decision, we cannot force a bundle size, + // so we test the Writer as a DoFn outside of a runner. + try (DoFnTester fnTester = + DoFnTester.of(new BulkIO.BulkIOBundleFn(write.getBulkIO()))) { + // inserts into Elasticsearch + fnTester.processBundle(serializeDocs(write, input)); } } @@ -297,15 +354,24 @@ void testWriteWithMaxBatchSize() throws Exception { ElasticsearchIO.write() .withConnectionConfiguration(connectionConfiguration) .withMaxBatchSize(BATCH_SIZE); + // write bundles size is the runner decision, we cannot force a bundle size, // so we test the Writer as a DoFn outside of a runner. - try (DoFnTester fnTester = DoFnTester.of(new Write.WriteFn(write))) { + try (DoFnTester fnTester = + DoFnTester.of(new BulkIO.BulkIOBundleFn(write.getBulkIO()))) { List input = ElasticsearchIOTestUtils.createDocuments( numDocs, ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS); + + List serializedInput = new ArrayList<>(); + for (String doc : input) { + serializedInput.add( + DocToBulk.createBulkApiEntity( + write.getDocToBulk(), doc, getBackendVersion(connectionConfiguration))); + } long numDocsProcessed = 0; long numDocsInserted = 0; - for (String document : input) { + for (String document : serializedInput) { fnTester.processElement(document); numDocsProcessed++; // test every 100 docs to avoid overloading ES @@ -340,15 +406,22 @@ void testWriteWithMaxBatchSizeBytes() throws Exception { .withMaxBatchSizeBytes(BATCH_SIZE_BYTES); // write bundles size is the runner decision, we cannot force a bundle size, // so we test the Writer as a DoFn outside of a runner. 
- try (DoFnTester fnTester = DoFnTester.of(new Write.WriteFn(write))) { + try (DoFnTester fnTester = + DoFnTester.of(new BulkIO.BulkIOBundleFn(write.getBulkIO()))) { List input = ElasticsearchIOTestUtils.createDocuments( numDocs, ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS); + List serializedInput = new ArrayList<>(); + for (String doc : input) { + serializedInput.add( + DocToBulk.createBulkApiEntity( + write.getDocToBulk(), doc, getBackendVersion(connectionConfiguration))); + } long numDocsProcessed = 0; long sizeProcessed = 0; long numDocsInserted = 0; long batchInserted = 0; - for (String document : input) { + for (String document : serializedInput) { fnTester.processElement(document); numDocsProcessed++; sizeProcessed += document.getBytes(StandardCharsets.UTF_8).length; @@ -411,7 +484,7 @@ void testWriteWithIdFn() throws Exception { long currentNumDocs = refreshIndexAndGetCurrentNumDocs(connectionConfiguration, restClient); assertEquals(NUM_SCIENTISTS, currentNumDocs); - int count = countByScientistName(connectionConfiguration, restClient, "Einstein"); + int count = countByScientistName(connectionConfiguration, restClient, "Einstein", null); assertEquals(1, count); } @@ -532,6 +605,34 @@ void testWriteWithFullAddressing() throws Exception { } } + /** + * Tests that documents are correctly routed when routingFn function is provided to overwrite the + * defaults of using the configuration and auto-generation of the document IDs by Elasticsearch. + * The scientist name is used for routing. As a result there should be numDocs/NUM_SCIENTISTS in + * each index. + */ + void testWriteWithRouting() throws Exception { + List data = + ElasticsearchIOTestUtils.createDocuments( + numDocs, ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS); + pipeline + .apply(Create.of(data)) + .apply( + ElasticsearchIO.write() + .withConnectionConfiguration(connectionConfiguration) + .withRoutingFn(new ExtractValueFn("scientist"))); + pipeline.run(); + + refreshAllIndices(restClient); + for (String scientist : FAMOUS_SCIENTISTS) { + Map urlParams = Collections.singletonMap("routing", scientist); + + assertEquals( + numDocs / NUM_SCIENTISTS, + countByScientistName(connectionConfiguration, restClient, scientist, urlParams)); + } + } + /** * Tests partial updates by adding a group field to each document in the standard test set. 
The * group field is populated as the modulo 2 of the document id allowing for a test to ensure the @@ -568,11 +669,198 @@ void testWritePartialUpdate() throws Exception { assertEquals(numDocs, currentNumDocs); assertEquals( numDocs / NUM_SCIENTISTS, - countByScientistName(connectionConfiguration, restClient, "Einstein")); + countByScientistName(connectionConfiguration, restClient, "Einstein", null)); // Partial update assertions - assertEquals(numDocs / 2, countByMatch(connectionConfiguration, restClient, "group", "0")); - assertEquals(numDocs / 2, countByMatch(connectionConfiguration, restClient, "group", "1")); + assertEquals( + numDocs / 2, countByMatch(connectionConfiguration, restClient, "group", "0", null, null)); + assertEquals( + numDocs / 2, countByMatch(connectionConfiguration, restClient, "group", "1", null, null)); + } + + void testWriteWithDocVersion() throws Exception { + List jsonData = + ElasticsearchIOTestUtils.createJsonDocuments( + numDocs, ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS); + + List data = new ArrayList<>(); + for (ObjectNode doc : jsonData) { + doc.put("my_version", "1"); + data.add(doc.toString()); + } + + insertTestDocuments(connectionConfiguration, data, restClient); + long currentNumDocs = refreshIndexAndGetCurrentNumDocs(connectionConfiguration, restClient); + assertEquals(numDocs, currentNumDocs); + // Check that all docs have the same "my_version" + assertEquals( + numDocs, + countByMatch( + connectionConfiguration, restClient, "my_version", "1", null, KV.of(1, numDocs))); + + Write write = + ElasticsearchIO.write() + .withConnectionConfiguration(connectionConfiguration) + .withIdFn(new ExtractValueFn("id")) + .withDocVersionFn(new ExtractValueFn("my_version")) + .withDocVersionType("external"); + + data = new ArrayList<>(); + for (ObjectNode doc : jsonData) { + // Set version to larger number than originally set, and larger than next logical version + // number set by default by ES. + doc.put("my_version", "3"); + data.add(doc.toString()); + } + + // Test that documents with lower version are rejected, but rejections ignored when specified + pipeline.apply(Create.of(data)).apply(write); + pipeline.run(); + + currentNumDocs = refreshIndexAndGetCurrentNumDocs(connectionConfiguration, restClient); + assertEquals(numDocs, currentNumDocs); + + // my_version and doc version should have changed + assertEquals( + 0, + countByMatch( + connectionConfiguration, restClient, "my_version", "1", null, KV.of(1, numDocs))); + assertEquals( + numDocs, + countByMatch( + connectionConfiguration, restClient, "my_version", "3", null, KV.of(3, numDocs))); + } + + /** + * Tests upsert script by adding a group field to each document in the standard test set. The + * group field is populated as the modulo 2 of the document id allowing for a test to ensure the + * documents are split into 2 groups. 
+ */ + void testWriteScriptedUpsert() throws Exception { + List data = + ElasticsearchIOTestUtils.createDocuments( + numDocs, ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS); + + Write write = + ElasticsearchIO.write() + .withConnectionConfiguration(connectionConfiguration) + .withIdFn(new ExtractValueFn("id")) + .withUpsertScript(SCRIPT_SOURCE); + + // Test that documents can be inserted/created by using withUpsertScript + pipeline.apply(Create.of(data)).apply(write); + pipeline.run(); + + // defensive coding to ensure our initial state is as expected + long currentNumDocs = refreshIndexAndGetCurrentNumDocs(connectionConfiguration, restClient); + // check we have not unwittingly modified existing behaviour + assertEquals(numDocs, currentNumDocs); + assertEquals( + numDocs / NUM_SCIENTISTS, + countByScientistName(connectionConfiguration, restClient, "Einstein", null)); + + // All docs should have have group = 0 added by the script upon creation + assertEquals( + numDocs, countByMatch(connectionConfiguration, restClient, "group", "0", null, null)); + + // Run the same data again. This time, because all docs exist in the index already, scripted + // updates should happen rather than scripted inserts. + pipeline.apply(Create.of(data)).apply(write); + pipeline.run(); + + currentNumDocs = refreshIndexAndGetCurrentNumDocs(connectionConfiguration, restClient); + + // check we have not unwittingly modified existing behaviour + assertEquals(numDocs, currentNumDocs); + assertEquals( + numDocs / NUM_SCIENTISTS, + countByScientistName(connectionConfiguration, restClient, "Einstein", null)); + + // The script will set either 0 or 1 for the group value on update operations + assertEquals( + numDocs / 2, countByMatch(connectionConfiguration, restClient, "group", "0", null, null)); + assertEquals( + numDocs / 2, countByMatch(connectionConfiguration, restClient, "group", "1", null, null)); + } + + void testMaxParallelRequestsPerWindow() throws Exception { + List data = + ElasticsearchIOTestUtils.createDocuments( + numDocs, ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS); + + Write write = + ElasticsearchIO.write() + .withConnectionConfiguration(connectionConfiguration) + .withMaxParallelRequestsPerWindow(1); + + PCollection>> batches = + pipeline.apply(Create.of(data)).apply(StatefulBatching.fromSpec(write.getBulkIO())); + + PCollection keyValues = + batches.apply( + MapElements.into(integers()) + .via((SerializableFunction>, Integer>) KV::getKey)); + + // Number of unique keys produced should be number of maxParallelRequestsPerWindow * numWindows + // There is only 1 request (key) per window, and 1 (global) window ie. 
one key total where + // key value is 0 + PAssert.that(keyValues).containsInAnyOrder(0); + + PAssert.that(batches).satisfies(new AssertThatHasExpectedContents(0, data)); + + pipeline.run(); + } + + private static class AssertThatHasExpectedContents + implements SerializableFunction>>, Void> { + + private final int key; + private final List expectedContents; + + AssertThatHasExpectedContents(int key, List expected) { + this.key = key; + this.expectedContents = expected; + } + + @Override + public Void apply(Iterable>> actual) { + assertThat( + actual, + IsIterableContainingInAnyOrder.containsInAnyOrder( + KvMatcher.isKv( + is(key), + IsIterableContainingInAnyOrder.containsInAnyOrder(expectedContents.toArray())))); + return null; + } + } + + public static class KvMatcher extends TypeSafeMatcher> { + final Matcher keyMatcher; + final Matcher valueMatcher; + + public static KvMatcher isKv(Matcher keyMatcher, Matcher valueMatcher) { + return new KvMatcher<>(keyMatcher, valueMatcher); + } + + public KvMatcher(Matcher keyMatcher, Matcher valueMatcher) { + this.keyMatcher = keyMatcher; + this.valueMatcher = valueMatcher; + } + + @Override + public boolean matchesSafely(KV kv) { + return keyMatcher.matches(kv.getKey()) && valueMatcher.matches(kv.getValue()); + } + + @Override + public void describeTo(Description description) { + description + .appendText("a KV(") + .appendValue(keyMatcher) + .appendText(", ") + .appendValue(valueMatcher) + .appendText(")"); + } } /** @@ -627,7 +915,7 @@ void testWriteRetry() throws Throwable { // max attempt is 3, but retry is 2 which excludes 1st attempt when error was identified and // retry started. expectedException.expectMessage( - String.format(ElasticsearchIO.Write.WriteFn.RETRY_FAILED_LOG, EXPECTED_RETRIES)); + String.format(ElasticsearchIO.BulkIO.RETRY_FAILED_LOG, EXPECTED_RETRIES)); ElasticsearchIO.Write write = ElasticsearchIO.write() @@ -660,7 +948,7 @@ private void executeWriteTest(ElasticsearchIO.Write write) throws Exception { long currentNumDocs = refreshIndexAndGetCurrentNumDocs(connectionConfiguration, restClient); assertEquals(numDocs, currentNumDocs); - int count = countByScientistName(connectionConfiguration, restClient, "Einstein"); + int count = countByScientistName(connectionConfiguration, restClient, "Einstein", null); assertEquals(numDocs / NUM_SCIENTISTS, count); } @@ -699,11 +987,11 @@ void testWriteWithIsDeletedFnWithPartialUpdates() throws Exception { // check we have not unwittingly modified existing behaviour assertEquals( numDocs / NUM_SCIENTISTS, - countByScientistName(connectionConfiguration, restClient, "Einstein")); + countByScientistName(connectionConfiguration, restClient, "Einstein", null)); // Check if documents are deleted as expected assertEquals(numDocs / 2, currentNumDocs); - assertEquals(0, countByScientistName(connectionConfiguration, restClient, "Darwin")); + assertEquals(0, countByScientistName(connectionConfiguration, restClient, "Darwin", null)); } /** @@ -741,10 +1029,10 @@ void testWriteWithIsDeletedFnWithoutPartialUpdate() throws Exception { // check we have not unwittingly modified existing behaviour assertEquals( numDocs / NUM_SCIENTISTS, - countByScientistName(connectionConfiguration, restClient, "Einstein")); + countByScientistName(connectionConfiguration, restClient, "Einstein", null)); // Check if documents are deleted as expected assertEquals(numDocs / 2, currentNumDocs); - assertEquals(0, countByScientistName(connectionConfiguration, restClient, "Darwin")); + assertEquals(0, 
countByScientistName(connectionConfiguration, restClient, "Darwin", null)); } } diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestUtils.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestUtils.java index cc5a18e513c1..7a5585371cdc 100644 --- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestUtils.java +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestUtils.java @@ -21,14 +21,20 @@ import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.getBackendVersion; import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.parseResponse; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.values.KV; import org.apache.http.HttpEntity; import org.apache.http.entity.ContentType; import org.apache.http.nio.entity.NStringEntity; +import org.checkerframework.checker.nullness.qual.Nullable; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; @@ -47,7 +53,11 @@ class ElasticsearchIOTestUtils { "Galilei", "Maxwell" }; + static final ObjectMapper MAPPER = new ObjectMapper(); static final int NUM_SCIENTISTS = FAMOUS_SCIENTISTS.length; + static final String SCRIPT_SOURCE = + "if(ctx._source.group != null) { ctx._source.group = params.id % 2 } else { ctx._source" + + ".group = 0 }"; /** Enumeration that specifies whether to insert malformed documents. */ public enum InjectionMode { @@ -101,11 +111,8 @@ static void copyIndex(RestClient restClient, String source, String target) throw /** Inserts the given number of test documents into Elasticsearch. */ static void insertTestDocuments( - ConnectionConfiguration connectionConfiguration, long numDocs, RestClient restClient) + ConnectionConfiguration connectionConfiguration, List data, RestClient restClient) throws IOException { - List data = - ElasticsearchIOTestUtils.createDocuments( - numDocs, ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS); StringBuilder bulkRequest = new StringBuilder(); int i = 0; for (String document : data) { @@ -126,9 +133,24 @@ static void insertTestDocuments( request.addParameters(Collections.singletonMap("refresh", "wait_for")); request.setEntity(requestBody); Response response = restClient.performRequest(request); - ElasticsearchIO.checkForErrors( - response.getEntity(), ElasticsearchIO.getBackendVersion(connectionConfiguration), false); + ElasticsearchIO.checkForErrors(response.getEntity(), Collections.emptySet()); } + + /** Inserts the given number of test documents into Elasticsearch. 
*/ + static void insertTestDocuments( + ConnectionConfiguration connectionConfiguration, long numDocs, RestClient restClient) + throws IOException { + List data = + ElasticsearchIOTestUtils.createDocuments( + numDocs, ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS); + insertTestDocuments(connectionConfiguration, data, restClient); + } + + static void refreshAllIndices(RestClient restClient) throws IOException { + Request request = new Request("POST", "/_refresh"); + restClient.performRequest(request); + } + /** * Forces a refresh of the given index to make recently inserted documents available for search * using the index and type named in the connectionConfiguration. @@ -144,31 +166,66 @@ static long refreshIndexAndGetCurrentNumDocs( restClient, connectionConfiguration.getIndex(), connectionConfiguration.getType(), - getBackendVersion(connectionConfiguration)); + getBackendVersion(connectionConfiguration), + null); } + /** + * Forces a refresh of the given index to make recently inserted documents available for search + * using the index and type named in the connectionConfiguration. + * + * @param connectionConfiguration providing the index and type + * @param restClient To use for issuing queries + * @param urlParams Optional key/value pairs describing URL params for ES APIs + * @return The number of docs in the index + * @throws IOException On error communicating with Elasticsearch + */ + static long refreshIndexAndGetCurrentNumDocs( + ConnectionConfiguration connectionConfiguration, + RestClient restClient, + @Nullable Map urlParams) + throws IOException { + return refreshIndexAndGetCurrentNumDocs( + restClient, + connectionConfiguration.getIndex(), + connectionConfiguration.getType(), + getBackendVersion(connectionConfiguration), + urlParams); + } + + static long refreshIndexAndGetCurrentNumDocs( + RestClient restClient, String index, String type, int backendVersion) throws IOException { + return refreshIndexAndGetCurrentNumDocs(restClient, index, type, backendVersion, null); + } /** * Forces a refresh of the given index to make recently inserted documents available for search. 
* * @param restClient To use for issuing queries * @param index The Elasticsearch index * @param type The Elasticsearch type + * @param urlParams Optional key/value pairs describing URL params for ES APIs * @return The number of docs in the index * @throws IOException On error communicating with Elasticsearch */ static long refreshIndexAndGetCurrentNumDocs( - RestClient restClient, String index, String type, int backenVersion) throws IOException { + RestClient restClient, + String index, + String type, + int backendVersion, + @Nullable Map urlParams) + throws IOException { long result = 0; try { - String endPoint = String.format("/%s/_refresh", index); - Request request = new Request("POST", endPoint); - restClient.performRequest(request); + refreshAllIndices(restClient); - endPoint = String.format("/%s/%s/_search", index, type); - request = new Request("GET", endPoint); + String endPoint = generateSearchPath(index, type); + Request request = new Request("GET", endPoint); + if (urlParams != null) { + request.addParameters(urlParams); + } Response response = restClient.performRequest(request); JsonNode searchResult = ElasticsearchIO.parseResponse(response.getEntity()); - if (backenVersion >= 7) { + if (backendVersion >= 7) { result = searchResult.path("hits").path("total").path("value").asLong(); } else { result = searchResult.path("hits").path("total").asLong(); @@ -207,19 +264,67 @@ static List createDocuments(long numDocs, InjectionMode injectionMode) { return data; } + static List createJsonDocuments(long numDocs, InjectionMode injectionMode) + throws JsonProcessingException { + List stringData = createDocuments(numDocs, injectionMode); + List data = new ArrayList<>(); + + for (String doc : stringData) { + data.add((ObjectNode) MAPPER.readTree(doc)); + } + return data; + } + /** * Executes a query for the named scientist and returns the count from the result. * * @param connectionConfiguration Specifies the index and type * @param restClient To use to execute the call * @param scientistName The scientist to query for + * @param urlParams Optional key/value pairs describing URL params for ES APIs * @return The count of documents found * @throws IOException On error talking to Elasticsearch */ static int countByScientistName( - ConnectionConfiguration connectionConfiguration, RestClient restClient, String scientistName) + ConnectionConfiguration connectionConfiguration, + RestClient restClient, + String scientistName, + @Nullable Map urlParams) throws IOException { - return countByMatch(connectionConfiguration, restClient, "scientist", scientistName); + return countByMatch( + connectionConfiguration, restClient, "scientist", scientistName, urlParams, null); + } + + /** + * Creates a _search API path depending on ConnectionConfiguration and url params. + * + * @param index Optional Elasticsearch index + * @param type Optional Elasticsearch type + * @return The _search endpoint for the provided settings. + */ + static String generateSearchPath(@Nullable String index, @Nullable String type) { + StringBuilder sb = new StringBuilder(); + if (index != null) { + sb.append("/").append(index); + } + if (type != null) { + sb.append("/").append(type); + } + + sb.append("/_search"); + + return sb.toString(); + } + + /** + * Creates a _search API path depending on ConnectionConfiguration and url params. + * + * @param connectionConfiguration Specifies the index and type + * @return The _search endpoint for the provided settings. 
+ */ + static String generateSearchPath(ConnectionConfiguration connectionConfiguration) { + return generateSearchPath( + connectionConfiguration.getIndex(), connectionConfiguration.getType()); } /** @@ -229,6 +334,8 @@ static int countByScientistName( * @param restClient To use to execute the call * @param field The field to query * @param value The value to match + * @param urlParams Optional key/value pairs describing URL params for ES APIs + * @param versionNumberCountPair Optional pair of [version_number, expected_num_doc_with_version] * @return The count of documents in the search result * @throws IOException On error communicating with Elasticsearch */ @@ -236,10 +343,18 @@ static int countByMatch( ConnectionConfiguration connectionConfiguration, RestClient restClient, String field, - String value) + String value, + @Nullable Map urlParams, + @Nullable KV versionNumberCountPair) throws IOException { + String size = + versionNumberCountPair == null ? "10" : versionNumberCountPair.getValue().toString(); String requestBody = "{\n" + + "\"size\": " + + size + + ",\n" + + "\"version\" : true,\n" + " \"query\" : {\"match\": {\n" + " \"" + field @@ -248,17 +363,29 @@ static int countByMatch( + "\"\n" + " }}\n" + "}\n"; - String endPoint = - String.format( - "/%s/%s/_search", - connectionConfiguration.getIndex(), connectionConfiguration.getType()); + + String endPoint = generateSearchPath(connectionConfiguration); HttpEntity httpEntity = new NStringEntity(requestBody, ContentType.APPLICATION_JSON); Request request = new Request("GET", endPoint); - request.addParameters(Collections.emptyMap()); request.setEntity(httpEntity); + if (urlParams != null) { + request.addParameters(urlParams); + } + Response response = restClient.performRequest(request); JsonNode searchResult = parseResponse(response.getEntity()); + + if (versionNumberCountPair != null) { + int numHits = 0; + for (JsonNode hit : searchResult.path("hits").path("hits")) { + if (hit.path("_version").asInt() == versionNumberCountPair.getKey()) { + numHits++; + } + } + return numHits; + } + if (getBackendVersion(connectionConfiguration) >= 7) { return searchResult.path("hits").path("total").path("value").asInt(); } else { diff --git a/sdks/java/io/elasticsearch/OWNERS b/sdks/java/io/elasticsearch/OWNERS index c2bb79459d8b..c4a265bcd848 100644 --- a/sdks/java/io/elasticsearch/OWNERS +++ b/sdks/java/io/elasticsearch/OWNERS @@ -1,6 +1,7 @@ # See the OWNERS docs at https://s.apache.org/beam-owners reviewers: + - egalpin - echauchot - jbonofre - timrobertson100 diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java index ec88c099a092..465a03d61969 100644 --- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java +++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java @@ -32,6 +32,8 @@ import java.io.IOException; import java.io.InputStream; import java.io.Serializable; +import java.net.ConnectException; +import java.net.SocketTimeoutException; import java.net.URL; import java.nio.charset.StandardCharsets; import java.security.KeyStore; @@ -39,11 +41,14 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.ListIterator; import java.util.Map; import 
java.util.NoSuchElementException; +import java.util.Objects; +import java.util.Set; import java.util.function.Predicate; import javax.annotation.Nonnull; import javax.net.ssl.SSLContext; @@ -55,19 +60,23 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupIntoBatches; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Reshuffle; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.BackOff; import org.apache.beam.sdk.util.BackOffUtils; import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.sdk.util.Sleeper; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings; +import org.apache.http.ConnectionClosedException; import org.apache.http.Header; import org.apache.http.HttpEntity; import org.apache.http.HttpHost; @@ -75,6 +84,7 @@ import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.client.config.RequestConfig; +import org.apache.http.conn.ConnectTimeoutException; import org.apache.http.conn.ssl.TrustSelfSignedStrategy; import org.apache.http.conn.ssl.TrustStrategy; import org.apache.http.entity.BufferedHttpEntity; @@ -117,12 +127,48 @@ * *

You can also specify a query on the {@code read()} using {@code withQuery()}. * + *
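For example, a minimal sketch of a query-restricted read (the hosts, index, type, and query string here are placeholders; the query is a standard Elasticsearch query-DSL JSON string):

{@code
Pipeline p = ...;
PCollection<String> docs =
    p.apply(
        ElasticsearchIO.read()
            .withConnectionConfiguration(
                ElasticsearchIO.ConnectionConfiguration.create(
                    new String[] {"http://host:9200"}, "my-index", "my-type"))
            // Only documents matching this query are read.
            .withQuery("{\"query\": {\"match\": {\"scientist\": \"Einstein\"}}}"));
}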

There are many more configuration options which can be found by looking at the with* methods + * of {@link ElasticsearchIO.Read}. + * *

Writing to Elasticsearch

* *

To write documents to Elasticsearch, use {@link ElasticsearchIO#write * ElasticsearchIO.write()}, which writes JSON documents from a {@link PCollection * PCollection<String>} (which can be bounded or unbounded). * + *

{@link ElasticsearchIO.Write} involves 2 discrete steps: + * + *

    + *
  • Converting the input PCollection of valid ES documents into Bulk API directives, i.e. should + * the input document result in an update, insert, delete, with version, with routing, etc. (See + * {@link ElasticsearchIO.DocToBulk}; a sketch of these per-document options follows this list) +
  • Batching Bulk API directives together and interfacing with an Elasticsearch cluster. (See + * {@link ElasticsearchIO.BulkIO}) + *
+ * + *
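For example, a minimal sketch of the first step's per-document options as exposed on {@code write()} (the {@code connectionConfiguration} variable and JSON field names are placeholders; the extractor lambdas stand in for user-supplied {@code Write.FieldValueExtractFn} implementations):

{@code
PCollection<String> docs = ...;
docs.apply(
    ElasticsearchIO.write()
        .withConnectionConfiguration(connectionConfiguration)
        // Values extracted from each JSON document shape the resulting Bulk directive.
        .withIdFn(doc -> doc.path("id").asText())               // document _id
        .withRoutingFn(doc -> doc.path("scientist").asText())   // shard routing
        .withDocVersionFn(doc -> doc.path("my_version").asText())
        .withDocVersionType("external"));
}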

In most cases, using {@link ElasticsearchIO#write} will be desirable. In some cases, one may + * want to use {@link ElasticsearchIO.DocToBulk} and {@link ElasticsearchIO.BulkIO} directly (a sketch + * of this decomposed usage follows the list below). Such + * cases might include: + * + *

    + *
  • Unit testing. Ensure that output Bulk API entities for a given set of inputs will produce + * an expected result, without the need for an available Elasticsearch cluster. See {@link + * ElasticsearchIO.Write#docToBulk} + *
  • Flexible options for data backup. Serialized Bulk API entities can be forked and sent to + * both Elasticsearch and a data lake. + *
  • Mirroring data to multiple clusters. Presently, mirroring data to multiple clusters would + * require duplicate computation. + *
  • Better batching with input streams in one job. A job may produce multiple "shapes" of Bulk + * API directives based on multiple input types, and then "fan-in" all serialized Bulk + * directives into a single BulkIO transform to improve batching semantics. + *
  • Decoupled jobs. Job(s) could be made to produce Bulk directives and then publish them to a + * message bus. A distinct job could consume from that message bus and solely be responsible + * for IO with the target cluster(s). + *
+ * + *
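As a sketch of the mirroring case above (the cluster addresses and index name are hypothetical), documents are serialized to Bulk directives once and the resulting PCollection is written to two clusters without recomputing the conversion:

    // Serialize documents to Bulk API directives a single time
    PCollection<String> bulkDirectives =
        docs.apply("ToBulkDirectives", ElasticsearchIO.docToBulk().withBackendVersion(7));

    // Fan the same directives out to two independent clusters
    bulkDirectives.apply(
        "WriteClusterA",
        ElasticsearchIO.bulkIO()
            .withConnectionConfiguration(
                ElasticsearchIO.ConnectionConfiguration.create(
                    new String[] {"http://cluster-a:9200"}, "my-index")));
    bulkDirectives.apply(
        "WriteClusterB",
        ElasticsearchIO.bulkIO()
            .withConnectionConfiguration(
                ElasticsearchIO.ConnectionConfiguration.create(
                    new String[] {"http://cluster-b:9200"}, "my-index")));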

Note that configuration options for {@link ElasticsearchIO.Write} are a union of + * configuration options for {@link ElasticsearchIO.DocToBulk} and {@link ElasticsearchIO.BulkIO}. + *
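As a sketch of that union (the id field name and batch settings below are illustrative assumptions), a single Write can mix document-conversion options with bulk/IO options:

    ElasticsearchIO.Write write =
        ElasticsearchIO.write()
            .withConnectionConfiguration(connectionConfiguration)
            .withIdFn(doc -> doc.path("id").asText()) // DocToBulk-side option
            .withUsePartialUpdate(true) // DocToBulk-side option
            .withMaxBatchSize(500L) // BulkIO-side option
            .withUseStatefulBatches(true); // BulkIO-side option

    docs.apply(write);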

To configure {@link ElasticsearchIO#write ElasticsearchIO.write()}, similar to the read, you * have to provide a connection configuration. For instance: * @@ -135,25 +181,8 @@ * * } * - *

Optionally, you can provide {@code withBatchSize()} and {@code withBatchSizeBytes()} to - * specify the size of the write batch in number of documents or in bytes. - * - *

Optionally, you can provide an {@link ElasticsearchIO.Write.FieldValueExtractFn} using {@code - * withIdFn()} that will be run to extract the id value out of the provided document rather than - * using the document id auto-generated by Elasticsearch. - * - *

Optionally, you can provide {@link ElasticsearchIO.Write.FieldValueExtractFn} using {@code - * withIndexFn()} or {@code withTypeFn()} to enable per-document routing to the target Elasticsearch - * index (all versions) and type (version > 6). Support for type routing was removed in - * Elasticsearch 6 (see - * https://www.elastic.co/blog/index-type-parent-child-join-now-future-in-elasticsearch) - * - *

When {withUsePartialUpdate()} is enabled, the input document must contain an id field and - * {@code withIdFn()} must be used to allow its extraction by the ElasticsearchIO. - * - *

Optionally, {@code withSocketTimeout()} can be used to override the default retry timeout and - * socket timeout of 30000ms. {@code withConnectTimeout()} can be used to override the default - * connect timeout of 1000ms. + *

There are many more configuration options which can be found by looking at the with* methods + * of {@link ElasticsearchIO.Write} */ @Experimental(Kind.SOURCE_SINK) @SuppressWarnings({ @@ -161,10 +190,16 @@ }) public class ElasticsearchIO { + private static final List VALID_CLUSTER_VERSIONS = Arrays.asList(2, 5, 6, 7); + private static final List VERSION_TYPES = + Arrays.asList("internal", "external", "external_gt", "external_gte"); + private static final String VERSION_CONFLICT_ERROR = "version_conflict_engine_exception"; + private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchIO.class); public static Read read() { - // default scrollKeepalive = 5m as a majorant for un-predictable time between 2 start/read calls + // default scrollKeepalive = 5m as a majorant for un-predictable time between 2 start/read + // calls // default batchSize to 100 as recommended by ES dev team as a safe value when dealing // with big documents and still a good compromise for performances return new AutoValue_ElasticsearchIO_Read.Builder() @@ -174,16 +209,27 @@ public static Read read() { .build(); } - public static Write write() { - return new AutoValue_ElasticsearchIO_Write.Builder() + public static DocToBulk docToBulk() { + return new AutoValue_ElasticsearchIO_DocToBulk.Builder() + .setUsePartialUpdate(false) // default is document upsert + .build(); + } + + public static BulkIO bulkIO() { + return new AutoValue_ElasticsearchIO_BulkIO.Builder() // advised default starting batch size in ES docs .setMaxBatchSize(1000L) // advised default starting batch size in ES docs .setMaxBatchSizeBytes(5L * 1024L * 1024L) - .setUsePartialUpdate(false) // default is document upsert + .setUseStatefulBatches(false) + .setMaxParallelRequestsPerWindow(1) .build(); } + public static Write write() { + return new Write(); + } + private ElasticsearchIO() {} private static final ObjectMapper mapper = new ObjectMapper(); @@ -193,44 +239,48 @@ static JsonNode parseResponse(HttpEntity responseEntity) throws IOException { return mapper.readValue(responseEntity.getContent(), JsonNode.class); } - static void checkForErrors(HttpEntity responseEntity, int backendVersion, boolean partialUpdate) + static void checkForErrors(HttpEntity responseEntity, @Nullable Set allowedErrorTypes) throws IOException { + JsonNode searchResult = parseResponse(responseEntity); boolean errors = searchResult.path("errors").asBoolean(); if (errors) { + int numErrors = 0; + StringBuilder errorMessages = new StringBuilder("Error writing to Elasticsearch, some elements could not be inserted:"); JsonNode items = searchResult.path("items"); + if (items.isMissingNode() || items.size() == 0) { + errorMessages.append(searchResult.toString()); + } // some items present in bulk might have errors, concatenate error messages for (JsonNode item : items) { - - String errorRootName = ""; - // when use partial update, the response items includes all the update. - if (partialUpdate) { - errorRootName = "update"; - } else { - if (backendVersion == 2) { - errorRootName = "create"; - } else if (backendVersion >= 5) { - errorRootName = "index"; - } - } - JsonNode errorRoot = item.path(errorRootName); - JsonNode error = errorRoot.get("error"); + JsonNode error = item.findValue("error"); if (error != null) { + // N.B. An empty-string within the allowedErrorTypes Set implies all errors are allowed. 
String type = error.path("type").asText(); String reason = error.path("reason").asText(); - String docId = errorRoot.path("_id").asText(); - errorMessages.append(String.format("%nDocument id %s: %s (%s)", docId, reason, type)); - JsonNode causedBy = error.get("caused_by"); - if (causedBy != null) { - String cbReason = causedBy.path("reason").asText(); - String cbType = causedBy.path("type").asText(); - errorMessages.append(String.format("%nCaused by: %s (%s)", cbReason, cbType)); + String docId = item.findValue("_id").asText(); + JsonNode causedBy = error.path("caused_by"); // May not be present + String cbReason = causedBy.path("reason").asText(); + String cbType = causedBy.path("type").asText(); + + if (allowedErrorTypes == null + || (!allowedErrorTypes.contains(type) && !allowedErrorTypes.contains(cbType))) { + // 'error' and 'causedBy` fields are not null, and the error is not being ignored. + numErrors++; + + errorMessages.append(String.format("%nDocument id %s: %s (%s)", docId, reason, type)); + + if (!causedBy.isMissingNode()) { + errorMessages.append(String.format("%nCaused by: %s (%s)", cbReason, cbType)); + } } } } - throw new IOException(errorMessages.toString()); + if (numErrors > 0) { + throw new IOException(errorMessages.toString()); + } } } @@ -314,6 +364,73 @@ public static ConnectionConfiguration create(String[] addresses, String index, S .build(); } + /** + * Creates a new Elasticsearch connection configuration with no default type. + * + * @param addresses list of addresses of Elasticsearch nodes + * @param index the index toward which the requests will be issued + * @return the connection configuration object + */ + public static ConnectionConfiguration create(String[] addresses, String index) { + checkArgument(addresses != null, "addresses can not be null"); + checkArgument(addresses.length > 0, "addresses can not be empty"); + checkArgument(index != null, "index can not be null"); + return new AutoValue_ElasticsearchIO_ConnectionConfiguration.Builder() + .setAddresses(Arrays.asList(addresses)) + .setIndex(index) + .setType("") + .setTrustSelfSignedCerts(false) + .build(); + } + + /** + * Creates a new Elasticsearch connection configuration with no default index nor type. + * + * @param addresses list of addresses of Elasticsearch nodes + * @return the connection configuration object + */ + public static ConnectionConfiguration create(String[] addresses) { + checkArgument(addresses != null, "addresses can not be null"); + checkArgument(addresses.length > 0, "addresses can not be empty"); + return new AutoValue_ElasticsearchIO_ConnectionConfiguration.Builder() + .setAddresses(Arrays.asList(addresses)) + .setIndex("") + .setType("") + .setTrustSelfSignedCerts(false) + .build(); + } + + /** + * Generates the bulk API endpoint based on the set values. + * + *

Based on ConnectionConfiguration constructors, we know that one of the following is true: + * + *

    + *
  • index and type are non-empty strings + *
  • index is non-empty string, type is empty string + *
  • index and type are empty strings +
+ * + *

Valid endpoints therefore include: + * + *

    + *
  • /_bulk + *
  • /index_name/_bulk + *
  • /index_name/type_name/_bulk + *
+ */ + public String getBulkEndPoint() { + StringBuilder sb = new StringBuilder(); + if (!Strings.isNullOrEmpty(getIndex())) { + sb.append("/").append(getIndex()); + } + if (!Strings.isNullOrEmpty(getType())) { + sb.append("/").append(getType()); + } + sb.append("/").append("_bulk"); + return sb.toString(); + } + /** * If Elasticsearch authentication is enabled, provide the username. * @@ -1073,37 +1190,41 @@ public boolean test(HttpEntity responseEntity) { } } - /** A {@link PTransform} writing data to Elasticsearch. */ + /** A {@link PTransform} converting docs to their Bulk API counterparts. */ @AutoValue - public abstract static class Write extends PTransform, PDone> { + public abstract static class DocToBulk + extends PTransform, PCollection> { - /** - * Interface allowing a specific field value to be returned from a parsed JSON document. This is - * used for using explicit document ids, and for dynamic routing (index/Type) on a document - * basis. A null response will result in default behaviour and an exception will be propagated - * as a failure. - */ - public interface FieldValueExtractFn extends SerializableFunction {} + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final int DEFAULT_RETRY_ON_CONFLICT = 5; // race conditions on updates - public interface BooleanFieldValueExtractFn extends SerializableFunction {} + static { + SimpleModule module = new SimpleModule(); + module.addSerializer(DocumentMetadata.class, new DocumentMetadataSerializer()); + OBJECT_MAPPER.registerModule(module); + } abstract @Nullable ConnectionConfiguration getConnectionConfiguration(); - abstract long getMaxBatchSize(); + abstract Write.@Nullable FieldValueExtractFn getIdFn(); - abstract long getMaxBatchSizeBytes(); + abstract Write.@Nullable FieldValueExtractFn getIndexFn(); - abstract @Nullable FieldValueExtractFn getIdFn(); + abstract Write.@Nullable FieldValueExtractFn getRoutingFn(); - abstract @Nullable FieldValueExtractFn getIndexFn(); + abstract Write.@Nullable FieldValueExtractFn getTypeFn(); - abstract @Nullable FieldValueExtractFn getTypeFn(); + abstract Write.@Nullable FieldValueExtractFn getDocVersionFn(); - abstract @Nullable RetryConfiguration getRetryConfiguration(); + abstract @Nullable String getDocVersionType(); + + abstract @Nullable String getUpsertScript(); - abstract boolean getUsePartialUpdate(); + abstract @Nullable Boolean getUsePartialUpdate(); - abstract @Nullable BooleanFieldValueExtractFn getIsDeleteFn(); + abstract Write.@Nullable BooleanFieldValueExtractFn getIsDeleteFn(); + + abstract @Nullable Integer getBackendVersion(); abstract Builder builder(); @@ -1111,77 +1232,50 @@ public interface BooleanFieldValueExtractFn extends SerializableFunction 0, "batchSize must be > 0, but was %s", batchSize); - return builder().setMaxBatchSize(batchSize).build(); - } - - /** - * Provide a maximum size in bytes for the batch see bulk API - * (https://www.elastic.co/guide/en/elasticsearch/reference/2.4/docs-bulk.html). Default is 5MB - * (like Elasticsearch bulk size advice). See - * https://www.elastic.co/guide/en/elasticsearch/guide/current/bulk.html Depending on the - * execution engine, size of bundles may vary, this sets the maximum size. Change this if you - * need to have smaller Elasticsearch bulks. 
- * - * @param batchSizeBytes maximum batch size in bytes - * @return the {@link Write} with connection batch size in bytes set - */ - public Write withMaxBatchSizeBytes(long batchSizeBytes) { - checkArgument(batchSizeBytes > 0, "batchSizeBytes must be > 0, but was %s", batchSizeBytes); - return builder().setMaxBatchSizeBytes(batchSizeBytes).build(); - } - /** * Provide a function to extract the id from the document. This id will be used as the document * id in Elasticsearch. Should the function throw an Exception then the batch will fail and the * exception propagated. * * @param idFn to extract the document ID - * @return the {@link Write} with the function set + * @return the {@link DocToBulk} with the function set */ - public Write withIdFn(FieldValueExtractFn idFn) { + public DocToBulk withIdFn(Write.FieldValueExtractFn idFn) { checkArgument(idFn != null, "idFn must not be null"); return builder().setIdFn(idFn).build(); } @@ -1192,13 +1286,26 @@ public Write withIdFn(FieldValueExtractFn idFn) { * exception propagated. * * @param indexFn to extract the destination index from - * @return the {@link Write} with the function set + * @return the {@link DocToBulk} with the function set */ - public Write withIndexFn(FieldValueExtractFn indexFn) { + public DocToBulk withIndexFn(Write.FieldValueExtractFn indexFn) { checkArgument(indexFn != null, "indexFn must not be null"); return builder().setIndexFn(indexFn).build(); } + /** + * Provide a function to extract the target routing from the document allowing for dynamic + * document routing. Should the function throw an Exception then the batch will fail and the + * exception propagated. + * + * @param routingFn to extract the destination index from + * @return the {@link DocToBulk} with the function set + */ + public DocToBulk withRoutingFn(Write.FieldValueExtractFn routingFn) { + checkArgument(routingFn != null, "routingFn must not be null"); + return builder().setRoutingFn(routingFn).build(); + } + /** * Provide a function to extract the target type from the document allowing for dynamic document * routing. Should the function throw an Exception then the batch will fail and the exception @@ -1208,9 +1315,9 @@ public Write withIndexFn(FieldValueExtractFn indexFn) { * discussed in this blog. * * @param typeFn to extract the destination index from - * @return the {@link Write} with the function set + * @return the {@link DocToBulk} with the function set */ - public Write withTypeFn(FieldValueExtractFn typeFn) { + public DocToBulk withTypeFn(Write.FieldValueExtractFn typeFn) { checkArgument(typeFn != null, "typeFn must not be null"); return builder().setTypeFn(typeFn).build(); } @@ -1220,12 +1327,536 @@ public Write withTypeFn(FieldValueExtractFn typeFn) { * Elasticsearch. * * @param usePartialUpdate set to true to issue partial updates - * @return the {@link Write} with the partial update control set + * @return the {@link DocToBulk} with the partial update control set */ - public Write withUsePartialUpdate(boolean usePartialUpdate) { + public DocToBulk withUsePartialUpdate(boolean usePartialUpdate) { return builder().setUsePartialUpdate(usePartialUpdate).build(); } + /** + * Whether to use scripted updates and what script to use. 
+ * + * @param source set to the value of the script source, painless lang + * @return the {@link DocToBulk} with the scripted updates set + */ + public DocToBulk withUpsertScript(String source) { + if (getBackendVersion() == null || getBackendVersion() == 2) { + LOG.warn("Painless scripts are not supported on Elasticsearch clusters before version 5.0"); + } + return builder().setUsePartialUpdate(false).setUpsertScript(source).build(); + } + + /** + * Provide a function to extract the doc version from the document. This version number will be + * used as the document version in Elasticsearch. Should the function throw an Exception then + * the batch will fail and the exception propagated. Incompatible with update operations and + * should only be used with withUsePartialUpdate(false) + * + * @param docVersionFn to extract the document version + * @return the {@link DocToBulk} with the function set + */ + public DocToBulk withDocVersionFn(Write.FieldValueExtractFn docVersionFn) { + checkArgument(docVersionFn != null, "docVersionFn must not be null"); + return builder().setDocVersionFn(docVersionFn).build(); + } + + /** + * Provide a function to extract the target operation either upsert or delete from the document + * fields allowing dynamic bulk operation decision. While using withIsDeleteFn, it should be + * taken care that the document's id extraction is defined using the withIdFn function or else + * IllegalArgumentException is thrown. Should the function throw an Exception then the batch + * will fail and the exception propagated. + * + * @param isDeleteFn set to true for deleting the specific document + * @return the {@link Write} with the function set + */ + public DocToBulk withIsDeleteFn(Write.BooleanFieldValueExtractFn isDeleteFn) { + checkArgument(isDeleteFn != null, "deleteFn is required"); + return builder().setIsDeleteFn(isDeleteFn).build(); + } + + /** + * Provide a function to extract the doc version from the document. This version number will be + * used as the document version in Elasticsearch. Should the function throw an Exception then + * the batch will fail and the exception propagated. Incompatible with update operations and + * should only be used with withUsePartialUpdate(false) + * + * @param docVersionType the version type to use, one of {@value VERSION_TYPES} + * @return the {@link DocToBulk} with the doc version type set + */ + public DocToBulk withDocVersionType(String docVersionType) { + checkArgument( + VERSION_TYPES.contains(docVersionType), + "docVersionType must be one of " + "%s", + String.join(", ", VERSION_TYPES)); + return builder().setDocVersionType(docVersionType).build(); + } + + /** + * Use to set explicitly which version of Elasticsearch the destination cluster is running. + * Providing this hint means there is no need for setting {@link + * DocToBulk#withConnectionConfiguration}. This can also be very useful for testing purposes. + * + *

Note: if the value of @param backendVersion differs from the version the destination + * cluster is running, behavior is undefined and likely to yield errors. + * + * @param backendVersion the major version number of the version of Elasticsearch being run in + * the cluster where documents will be indexed. + * @return the {@link DocToBulk} with the Elasticsearch major version number set + */ + public DocToBulk withBackendVersion(int backendVersion) { + checkArgument( + VALID_CLUSTER_VERSIONS.contains(backendVersion), + "Backend version may only be one of " + "%s", + String.join(", ", VERSION_TYPES)); + return builder().setBackendVersion(backendVersion).build(); + } + + @Override + public PCollection expand(PCollection docs) { + ConnectionConfiguration connectionConfiguration = getConnectionConfiguration(); + Integer backendVersion = getBackendVersion(); + Write.FieldValueExtractFn idFn = getIdFn(); + Write.BooleanFieldValueExtractFn isDeleteFn = getIsDeleteFn(); + checkState( + (backendVersion != null || connectionConfiguration != null), + "withBackendVersion() or withConnectionConfiguration() is required"); + checkArgument( + isDeleteFn == null || idFn != null, + "Id needs to be specified by withIdFn for delete operation"); + + return docs.apply(ParDo.of(new DocToBulkFn(this))); + } + + // Encapsulates the elements which form the metadata for an Elasticsearch bulk operation + private static class DocumentMetadata implements Serializable { + final String index; + final String type; + final String id; + final Integer retryOnConflict; + final String routing; + final Integer backendVersion; + final String version; + final String versionType; + + DocumentMetadata( + String index, + String type, + String id, + Integer retryOnConflict, + String routing, + Integer backendVersion, + String version, + String versionType) { + this.index = index; + this.id = id; + this.type = type; + this.retryOnConflict = retryOnConflict; + this.routing = routing; + this.backendVersion = backendVersion; + this.version = version; + this.versionType = versionType; + } + } + + private static class DocumentMetadataSerializer extends StdSerializer { + private DocumentMetadataSerializer() { + super(DocumentMetadata.class); + } + + @Override + public void serialize(DocumentMetadata value, JsonGenerator gen, SerializerProvider provider) + throws IOException { + gen.writeStartObject(); + if (value.index != null) { + gen.writeStringField("_index", value.index); + } + if (value.type != null) { + gen.writeStringField("_type", value.type); + } + if (value.id != null) { + gen.writeStringField("_id", value.id); + } + if (value.routing != null) { + gen.writeStringField("routing", value.routing); + } + if (value.retryOnConflict != null && value.backendVersion <= 6) { + gen.writeNumberField("_retry_on_conflict", value.retryOnConflict); + } + if (value.retryOnConflict != null && value.backendVersion >= 7) { + gen.writeNumberField("retry_on_conflict", value.retryOnConflict); + } + if (value.version != null) { + gen.writeStringField("version", value.version); + } + if (value.versionType != null) { + gen.writeStringField("version_type", value.versionType); + } + gen.writeEndObject(); + } + } + + @VisibleForTesting + static String createBulkApiEntity(DocToBulk spec, String document, int backendVersion) + throws IOException { + String documentMetadata = "{}"; + boolean isDelete = false; + if (spec.getIndexFn() != null + || spec.getTypeFn() != null + || spec.getIdFn() != null + || spec.getRoutingFn() != null) { + // parse once and reused 
for efficiency + JsonNode parsedDocument = OBJECT_MAPPER.readTree(document); + documentMetadata = getDocumentMetadata(spec, parsedDocument, backendVersion); + if (spec.getIsDeleteFn() != null) { + isDelete = spec.getIsDeleteFn().apply(parsedDocument); + } + } + + if (isDelete) { + // delete request used for deleting a document + return String.format("{ \"delete\" : %s }%n", documentMetadata); + } else { + // index is an insert/upsert and update is a partial update (or insert if not + // existing) + if (spec.getUsePartialUpdate()) { + return String.format( + "{ \"update\" : %s }%n{ \"doc\" : %s, " + "\"doc_as_upsert\" : true }%n", + documentMetadata, document); + } else if (spec.getUpsertScript() != null) { + return String.format( + "{ \"update\" : %s }%n{ \"script\" : {\"source\": \"%s\", " + + "\"params\": %s}, \"upsert\" : %s, \"scripted_upsert\": true}%n", + documentMetadata, spec.getUpsertScript(), document, document); + } else { + return String.format("{ \"index\" : %s }%n%s%n", documentMetadata, document); + } + } + } + + private static String lowerCaseOrNull(String input) { + return input == null ? null : input.toLowerCase(); + } + + /** + * Extracts the components that comprise the document address from the document using the {@link + * Write.FieldValueExtractFn} configured. This allows any or all of the index, type and document + * id to be controlled on a per document basis. If none are provided then an empty default of + * {@code {}} is returned. Sanitization of the index is performed, automatically lower-casing + * the value as required by Elasticsearch. + * + * @param parsedDocument the json from which the index, type and id may be extracted + * @return the document address as JSON or the default + * @throws IOException if the document cannot be parsed as JSON + */ + private static String getDocumentMetadata( + DocToBulk spec, JsonNode parsedDocument, int backendVersion) throws IOException { + DocumentMetadata metadata = + new DocumentMetadata( + spec.getIndexFn() != null + ? lowerCaseOrNull(spec.getIndexFn().apply(parsedDocument)) + : null, + spec.getTypeFn() != null ? spec.getTypeFn().apply(parsedDocument) : null, + spec.getIdFn() != null ? spec.getIdFn().apply(parsedDocument) : null, + (spec.getUsePartialUpdate() + || (spec.getUpsertScript() != null && !spec.getUpsertScript().isEmpty())) + ? DEFAULT_RETRY_ON_CONFLICT + : null, + spec.getRoutingFn() != null ? spec.getRoutingFn().apply(parsedDocument) : null, + backendVersion, + spec.getDocVersionFn() != null ? spec.getDocVersionFn().apply(parsedDocument) : null, + spec.getDocVersionType()); + return OBJECT_MAPPER.writeValueAsString(metadata); + } + + /** {@link DoFn} to for the {@link DocToBulk} transform. */ + @VisibleForTesting + static class DocToBulkFn extends DoFn { + private final DocToBulk spec; + private int backendVersion; + + public DocToBulkFn(DocToBulk spec) { + this.spec = spec; + } + + @Setup + public void setup() throws IOException { + if (spec.getBackendVersion() != null) { + backendVersion = spec.getBackendVersion(); + } else { + backendVersion = ElasticsearchIO.getBackendVersion(spec.getConnectionConfiguration()); + } + } + + @ProcessElement + public void processElement(ProcessContext c) throws IOException { + c.output(createBulkApiEntity(spec, c.element(), backendVersion)); + } + } + } + + /** + * A {@link PTransform} writing data to Elasticsearch. + * + *

This {@link PTransform} acts as a convenience wrapper for doing both document to bulk API + * serialization as well as batching those Bulk API entities and writing them to an Elasticsearch + * cluster. This class is effectively a thin proxy for DocToBulk->BulkIO all-in-one for + * convenience and backward compatibility. + */ + public static class Write extends PTransform, PDone> { + public interface FieldValueExtractFn extends SerializableFunction {} + + public interface BooleanFieldValueExtractFn extends SerializableFunction {} + + private DocToBulk docToBulk = + new AutoValue_ElasticsearchIO_DocToBulk.Builder() + .setUsePartialUpdate(false) // default is document upsert + .build(); + + private BulkIO bulkIO = + new AutoValue_ElasticsearchIO_BulkIO.Builder() + // advised default starting batch size in ES docs + .setMaxBatchSize(1000L) + // advised default starting batch size in ES docs + .setMaxBatchSizeBytes(5L * 1024L * 1024L) + .setUseStatefulBatches(false) + .setMaxParallelRequestsPerWindow(1) + .build(); + + public DocToBulk getDocToBulk() { + return docToBulk; + } + + public BulkIO getBulkIO() { + return bulkIO; + } + + // For building Doc2Bulk + /** Refer to {@link DocToBulk#withIdFn}. */ + public Write withIdFn(FieldValueExtractFn idFn) { + docToBulk = docToBulk.withIdFn(idFn); + return this; + } + + /** Refer to {@link DocToBulk#withIndexFn}. */ + public Write withIndexFn(FieldValueExtractFn indexFn) { + docToBulk = docToBulk.withIndexFn(indexFn); + return this; + } + + /** Refer to {@link DocToBulk#withRoutingFn}. */ + public Write withRoutingFn(FieldValueExtractFn routingFn) { + docToBulk = docToBulk.withRoutingFn(routingFn); + return this; + } + + /** Refer to {@link DocToBulk#withTypeFn}. */ + public Write withTypeFn(FieldValueExtractFn typeFn) { + docToBulk = docToBulk.withTypeFn(typeFn); + return this; + } + + /** Refer to {@link DocToBulk#withDocVersionFn}. */ + public Write withDocVersionFn(FieldValueExtractFn docVersionFn) { + docToBulk = docToBulk.withDocVersionFn(docVersionFn); + return this; + } + + /** Refer to {@link DocToBulk#withDocVersionType}. */ + public Write withDocVersionType(String docVersionType) { + docToBulk = docToBulk.withDocVersionType(docVersionType); + return this; + } + + /** Refer to {@link DocToBulk#withUsePartialUpdate}. */ + public Write withUsePartialUpdate(boolean usePartialUpdate) { + docToBulk = docToBulk.withUsePartialUpdate(usePartialUpdate); + return this; + } + + /** Refer to {@link DocToBulk#withUpsertScript}. */ + public Write withUpsertScript(String source) { + docToBulk = docToBulk.withUpsertScript(source); + return this; + } + + /** Refer to {@link DocToBulk#withBackendVersion}. */ + public Write withBackendVersion(int backendVersion) { + docToBulk = docToBulk.withBackendVersion(backendVersion); + return this; + } + + /** Refer to {@link DocToBulk#withIsDeleteFn}. */ + public Write withIsDeleteFn(Write.BooleanFieldValueExtractFn isDeleteFn) { + docToBulk = docToBulk.withIsDeleteFn(isDeleteFn); + return this; + } + // End building Doc2Bulk + + /** Refer to {@link BulkIO#withConnectionConfiguration}. */ + public Write withConnectionConfiguration(ConnectionConfiguration connectionConfiguration) { + checkArgument(connectionConfiguration != null, "connectionConfiguration can not be null"); + docToBulk = docToBulk.withConnectionConfiguration(connectionConfiguration); + bulkIO = bulkIO.withConnectionConfiguration(connectionConfiguration); + return this; + } + + /** Refer to {@link BulkIO#withMaxBatchSize}. 
*/ + public Write withMaxBatchSize(long batchSize) { + bulkIO = bulkIO.withMaxBatchSize(batchSize); + return this; + } + + /** Refer to {@link BulkIO#withMaxBatchSizeBytes}. */ + public Write withMaxBatchSizeBytes(long batchSizeBytes) { + bulkIO = bulkIO.withMaxBatchSizeBytes(batchSizeBytes); + return this; + } + + /** Refer to {@link BulkIO#withRetryConfiguration}. */ + public Write withRetryConfiguration(RetryConfiguration retryConfiguration) { + bulkIO = bulkIO.withRetryConfiguration(retryConfiguration); + return this; + } + + /** Refer to {@link BulkIO#withIgnoreVersionConflicts}. */ + public Write withIgnoreVersionConflicts(boolean ignoreVersionConflicts) { + bulkIO = bulkIO.withIgnoreVersionConflicts(ignoreVersionConflicts); + return this; + } + + /** Refer to {@link BulkIO#withUseStatefulBatches}. */ + public Write withUseStatefulBatches(boolean useStatefulBatches) { + bulkIO = bulkIO.withUseStatefulBatches(useStatefulBatches); + return this; + } + + /** Refer to {@link BulkIO#withMaxBufferingDuration}. */ + public Write withMaxBufferingDuration(Duration maxBufferingDuration) { + bulkIO = bulkIO.withMaxBufferingDuration(maxBufferingDuration); + return this; + } + + /** Refer to {@link BulkIO#withMaxParallelRequestsPerWindow}. */ + public Write withMaxParallelRequestsPerWindow(int maxParallelRequestsPerWindow) { + bulkIO = bulkIO.withMaxParallelRequestsPerWindow(maxParallelRequestsPerWindow); + return this; + } + + /** Refer to {@link BulkIO#withAllowableResponseErrors}. */ + public Write withAllowableResponseErrors(@Nullable Set allowableResponseErrors) { + if (allowableResponseErrors == null) { + allowableResponseErrors = new HashSet<>(); + } + + bulkIO = bulkIO.withAllowableResponseErrors(allowableResponseErrors); + return this; + } + + @Override + public PDone expand(PCollection input) { + return input.apply(docToBulk).apply(bulkIO); + } + } + + /** + * A {@link PTransform} writing Bulk API entities created by {@link ElasticsearchIO.DocToBulk} to + * an Elasticsearch cluster. Typically, using {@link ElasticsearchIO.Write} is preferred, whereas + * using {@link ElasticsearchIO.DocToBulk} and BulkIO separately is for advanced use cases such as + * mirroring data to multiple clusters or data lakes without recomputation. + */ + @AutoValue + public abstract static class BulkIO extends PTransform, PDone> { + @VisibleForTesting + static final String RETRY_ATTEMPT_LOG = "Error writing to Elasticsearch. Retry attempt[%d]"; + + @VisibleForTesting + static final String RETRY_FAILED_LOG = + "Error writing to ES after %d attempt(s). 
No more attempts allowed"; + + abstract @Nullable ConnectionConfiguration getConnectionConfiguration(); + + abstract long getMaxBatchSize(); + + abstract long getMaxBatchSizeBytes(); + + abstract @Nullable Duration getMaxBufferingDuration(); + + abstract boolean getUseStatefulBatches(); + + abstract int getMaxParallelRequestsPerWindow(); + + abstract @Nullable RetryConfiguration getRetryConfiguration(); + + abstract @Nullable Set getAllowedResponseErrors(); + + abstract Builder builder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setConnectionConfiguration(ConnectionConfiguration connectionConfiguration); + + abstract Builder setMaxBatchSize(long maxBatchSize); + + abstract Builder setMaxBatchSizeBytes(long maxBatchSizeBytes); + + abstract Builder setRetryConfiguration(RetryConfiguration retryConfiguration); + + abstract Builder setAllowedResponseErrors(Set allowedResponseErrors); + + abstract Builder setMaxBufferingDuration(Duration maxBufferingDuration); + + abstract Builder setUseStatefulBatches(boolean useStatefulBatches); + + abstract Builder setMaxParallelRequestsPerWindow(int maxParallelRequestsPerWindow); + + abstract BulkIO build(); + } + + /** + * Provide the Elasticsearch connection configuration object. + * + * @param connectionConfiguration the Elasticsearch {@link ConnectionConfiguration} object + * @return the {@link BulkIO} with connection configuration set + */ + public BulkIO withConnectionConfiguration(ConnectionConfiguration connectionConfiguration) { + checkArgument(connectionConfiguration != null, "connectionConfiguration can not be null"); + + return builder().setConnectionConfiguration(connectionConfiguration).build(); + } + + /** + * Provide a maximum size in number of documents for the batch see bulk API + * (https://www.elastic.co/guide/en/elasticsearch/reference/2.4/docs-bulk.html). Default is 1000 + * docs (like Elasticsearch bulk size advice). See + * https://www.elastic.co/guide/en/elasticsearch/guide/current/bulk.html Depending on the + * execution engine, size of bundles may vary, this sets the maximum size. Change this if you + * need to have smaller ElasticSearch bulks. + * + * @param batchSize maximum batch size in number of documents + * @return the {@link BulkIO} with connection batch size set + */ + public BulkIO withMaxBatchSize(long batchSize) { + checkArgument(batchSize > 0, "batchSize must be > 0, but was %s", batchSize); + return builder().setMaxBatchSize(batchSize).build(); + } + + /** + * Provide a maximum size in bytes for the batch see bulk API + * (https://www.elastic.co/guide/en/elasticsearch/reference/2.4/docs-bulk.html). Default is 5MB + * (like Elasticsearch bulk size advice). See + * https://www.elastic.co/guide/en/elasticsearch/guide/current/bulk.html Depending on the + * execution engine, size of bundles may vary, this sets the maximum size. Change this if you + * need to have smaller ElasticSearch bulks. + * + * @param batchSizeBytes maximum batch size in bytes + * @return the {@link BulkIO} with connection batch size in bytes set + */ + public BulkIO withMaxBatchSizeBytes(long batchSizeBytes) { + checkArgument(batchSizeBytes > 0, "batchSizeBytes must be > 0, but was %s", batchSizeBytes); + return builder().setMaxBatchSizeBytes(batchSizeBytes).build(); + } + /** * Provides configuration to retry a failed batch call to Elasticsearch. 
A batch is considered * as failed if the underlying {@link RestClient} surfaces 429 HTTP status code as error for one @@ -1246,88 +1877,208 @@ public Write withUsePartialUpdate(boolean usePartialUpdate) { * } * * @param retryConfiguration the rules which govern the retry behavior - * @return the {@link Write} with retrying configured + * @return the {@link BulkIO} with retrying configured */ - public Write withRetryConfiguration(RetryConfiguration retryConfiguration) { + public BulkIO withRetryConfiguration(RetryConfiguration retryConfiguration) { checkArgument(retryConfiguration != null, "retryConfiguration is required"); return builder().setRetryConfiguration(retryConfiguration).build(); } /** - * Provide a function to extract the target operation either upsert or delete from the document - * fields allowing dynamic bulk operation decision. While using withIsDeleteFn, it should be - * taken care that the document's id extraction is defined using the withIdFn function or else - * IllegalArgumentException is thrown. Should the function throw an Exception then the batch - * will fail and the exception propagated. + * Whether or not to suppress version conflict errors in a Bulk API response. This can be useful + * if your use case involves using external version types. * - * @param isDeleteFn set to true for deleting the specific document - * @return the {@link Write} with the function set + * @param ignoreVersionConflicts true to suppress version conflicts, false to surface version + * conflict errors. + * @return the {@link BulkIO} with version conflict handling configured */ - public Write withIsDeleteFn(BooleanFieldValueExtractFn isDeleteFn) { - checkArgument(isDeleteFn != null, "deleteFn is required"); - return builder().setIsDeleteFn(isDeleteFn).build(); + public BulkIO withIgnoreVersionConflicts(boolean ignoreVersionConflicts) { + Set allowedResponseErrors = getAllowedResponseErrors(); + if (allowedResponseErrors == null) { + allowedResponseErrors = new HashSet<>(); + } + if (ignoreVersionConflicts) { + allowedResponseErrors.add(VERSION_CONFLICT_ERROR); + } + + return builder().setAllowedResponseErrors(allowedResponseErrors).build(); + } + + /** + * Provide a set of textual error types which can be contained in Bulk API response + * items[].error.type field. Any element in @param allowableResponseErrorTypes will suppress + * errors of the same type in Bulk responses. + * + *

See also + * https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#bulk-failures-ex + * + * @param allowableResponseErrorTypes + * @return the {@link BulkIO} with allowable response errors set + */ + public BulkIO withAllowableResponseErrors(@Nullable Set allowableResponseErrorTypes) { + if (allowableResponseErrorTypes == null) { + allowableResponseErrorTypes = new HashSet<>(); + } + + return builder().setAllowedResponseErrors(allowableResponseErrorTypes).build(); + } + + /** + * If using {@link BulkIO#withUseStatefulBatches}, this can be used to set a maximum elapsed + * time before buffered elements are emitted to Elasticsearch as a Bulk API request. If this + * config is not set, Bulk requests will not be issued until {@link BulkIO#getMaxBatchSize} + * number of documents have been buffered. This may result in higher latency in particular if + * your max batch size is set to a large value and your pipeline input is low volume. + * + * @param maxBufferingDuration the maximum duration to wait before sending any buffered + * documents to Elasticsearch, regardless of maxBatchSize. + * @return the {@link BulkIO} with maximum buffering duration set + */ + public BulkIO withMaxBufferingDuration(Duration maxBufferingDuration) { + LOG.warn( + "Use of withMaxBufferingDuration requires withUseStatefulBatches(true). " + + "Setting that automatically."); + return builder() + .setUseStatefulBatches(true) + .setMaxBufferingDuration(maxBufferingDuration) + .build(); + } + + /** + * Whether or not to use Stateful Processing to ensure bulk requests have the desired number of + * entities i.e. as close to the maxBatchSize as possible. By default without this feature + * enabled, Bulk requests will not contain more than maxBatchSize entities, but the lower bound + * of batch size is determined by Beam Runner bundle sizes, which may be as few as 1. + * + * @param useStatefulBatches true enables the use of Stateful Processing to ensure that batches + * are as close to the maxBatchSize as possible. + * @return the {@link BulkIO} with Stateful Processing enabled or disabled + */ + public BulkIO withUseStatefulBatches(boolean useStatefulBatches) { + return builder().setUseStatefulBatches(useStatefulBatches).build(); + } + + /** + * When using {@link BulkIO#withUseStatefulBatches} Stateful Processing, states and therefore + * batches are maintained per-key-per-window. BE AWARE that low values for @param + * maxParallelRequestsPerWindow, in particular if the input data has a finite number of windows, + * can reduce parallelism greatly. If data is globally windowed and @param + * maxParallelRequestsPerWindow is set to 1,there will only ever be 1 request in flight. Having + * only a single request in flight can be beneficial for ensuring an Elasticsearch cluster is + * not overwhelmed by parallel requests,but may not work for all use cases. If this number is + * less than the number of maximum workers in your pipeline, the IO work will result in a + * sub-distribution of the last write step with most of the runners. 
+ * + * @param maxParallelRequestsPerWindow the maximum number of parallel bulk requests for a window + * of data + * @return the {@link BulkIO} with maximum parallel bulk requests per window set + */ + public BulkIO withMaxParallelRequestsPerWindow(int maxParallelRequestsPerWindow) { + checkArgument( + maxParallelRequestsPerWindow > 0, "parameter value must be a positive integer"); + return builder().setMaxParallelRequestsPerWindow(maxParallelRequestsPerWindow).build(); + } + + /** + * Creates batches of documents using Stateful Processing based on user configurable settings of + * withMaxBufferingDuration and withMaxParallelRequestsPerWindow. + * + *

Mostly exists for testability of withMaxParallelRequestsPerWindow. + */ + @VisibleForTesting + static class StatefulBatching + extends PTransform, PCollection>>> { + final BulkIO spec; + + private StatefulBatching(BulkIO bulkSpec) { + spec = bulkSpec; + } + + public static StatefulBatching fromSpec(BulkIO spec) { + return new StatefulBatching(spec); + } + + @Override + public PCollection>> expand(PCollection input) { + GroupIntoBatches groupIntoBatches = + GroupIntoBatches.ofSize(spec.getMaxBatchSize()); + + if (spec.getMaxBufferingDuration() != null) { + groupIntoBatches = + groupIntoBatches.withMaxBufferingDuration(spec.getMaxBufferingDuration()); + } + + return input + .apply(ParDo.of(new Reshuffle.AssignShardFn<>(spec.getMaxParallelRequestsPerWindow()))) + .apply(groupIntoBatches); + } } @Override public PDone expand(PCollection input) { ConnectionConfiguration connectionConfiguration = getConnectionConfiguration(); - FieldValueExtractFn idFn = getIdFn(); - BooleanFieldValueExtractFn isDeleteFn = getIsDeleteFn(); + checkState(connectionConfiguration != null, "withConnectionConfiguration() is required"); - checkArgument( - isDeleteFn == null || idFn != null, - "Id needs to be specified by withIdFn for delete operation"); - input.apply(ParDo.of(new WriteFn(this))); + + if (getUseStatefulBatches()) { + input.apply(StatefulBatching.fromSpec(this)).apply(ParDo.of(new BulkIOStatefulFn(this))); + } else { + input.apply(ParDo.of(new BulkIOBundleFn(this))); + } return PDone.in(input.getPipeline()); } - /** {@link DoFn} to for the {@link Write} transform. */ - @VisibleForTesting - static class WriteFn extends DoFn { - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private static final int DEFAULT_RETRY_ON_CONFLICT = 5; // race conditions on updates + static class BulkIOBundleFn extends BulkIOBaseFn { + @VisibleForTesting + BulkIOBundleFn(BulkIO bulkSpec) { + super(bulkSpec); + } - private static final Duration RETRY_INITIAL_BACKOFF = Duration.standardSeconds(5); + @ProcessElement + public void processElement(ProcessContext context) throws Exception { + String bulkApiEntity = context.element(); + addAndMaybeFlush(bulkApiEntity); + } + } + /* + Intended for use in conjunction with {@link GroupIntoBatches} + */ + static class BulkIOStatefulFn extends BulkIOBaseFn>> { @VisibleForTesting - static final String RETRY_ATTEMPT_LOG = "Error writing to Elasticsearch. Retry attempt[%d]"; + BulkIOStatefulFn(BulkIO bulkSpec) { + super(bulkSpec); + } - @VisibleForTesting - static final String RETRY_FAILED_LOG = - "Error writing to ES after %d attempt(s). No more attempts allowed"; + @ProcessElement + public void processElement(ProcessContext context) throws Exception { + Iterable bulkApiEntities = context.element().getValue(); + for (String bulkApiEntity : bulkApiEntities) { + addAndMaybeFlush(bulkApiEntity); + } + } + } + + /** {@link DoFn} to for the {@link BulkIO} transform. 
*/ + @VisibleForTesting + private abstract static class BulkIOBaseFn extends DoFn { + private static final Duration RETRY_INITIAL_BACKOFF = Duration.standardSeconds(5); private transient FluentBackoff retryBackoff; - private int backendVersion; - private final Write spec; + private BulkIO spec; private transient RestClient restClient; private ArrayList batch; - private long currentBatchSizeBytes; - - // Encapsulates the elements which form the metadata for an Elasticsearch bulk operation - private static class DocumentMetadata implements Serializable { - final String index; - final String type; - final String id; - final Integer retryOnConflict; - - DocumentMetadata(String index, String type, String id, Integer retryOnConflict) { - this.index = index; - this.type = type; - this.id = id; - this.retryOnConflict = retryOnConflict; - } - } + long currentBatchSizeBytes; - @VisibleForTesting - WriteFn(Write spec) { - this.spec = spec; + protected BulkIOBaseFn(BulkIO bulkSpec) { + this.spec = bulkSpec; } @Setup public void setup() throws IOException { ConnectionConfiguration connectionConfiguration = spec.getConnectionConfiguration(); - backendVersion = getBackendVersion(connectionConfiguration); + restClient = connectionConfiguration.createClient(); retryBackoff = @@ -1340,11 +2091,6 @@ public void setup() throws IOException { .withMaxRetries(spec.getRetryConfiguration().getMaxAttempts() - 1) .withMaxCumulativeBackoff(spec.getRetryConfiguration().getMaxDuration()); } - // configure a custom serializer for metadata to be able to change serialization based - // on ES version - SimpleModule module = new SimpleModule(); - module.addSerializer(DocumentMetadata.class, new DocumentMetadataSerializer()); - OBJECT_MAPPER.registerModule(module); } @StartBundle @@ -1353,135 +2099,86 @@ public void startBundle(StartBundleContext context) { currentBatchSizeBytes = 0; } - private class DocumentMetadataSerializer extends StdSerializer { - - private DocumentMetadataSerializer() { - super(DocumentMetadata.class); - } - - @Override - public void serialize( - DocumentMetadata value, JsonGenerator gen, SerializerProvider provider) - throws IOException { - gen.writeStartObject(); - if (value.index != null) { - gen.writeStringField("_index", value.index); - } - if (value.type != null) { - gen.writeStringField("_type", value.type); - } - if (value.id != null) { - gen.writeStringField("_id", value.id); - } - if (value.retryOnConflict != null && (backendVersion <= 6)) { - gen.writeNumberField("_retry_on_conflict", value.retryOnConflict); - } - if (value.retryOnConflict != null && backendVersion >= 7) { - gen.writeNumberField("retry_on_conflict", value.retryOnConflict); - } - gen.writeEndObject(); - } - } - /** - * Extracts the components that comprise the document address from the document using the - * {@link FieldValueExtractFn} configured. This allows any or all of the index, type and - * document id to be controlled on a per document basis. Sanitization of the index is - * performed, automatically lower-casing the value as required by Elasticsearch. - * - * @param parsedDocument the json from which the index, type and id may be extracted - * @return the document address as JSON or the default - * @throws IOException if the document cannot be parsed as JSON - */ - private String getDocumentMetadata(JsonNode parsedDocument) throws IOException { - DocumentMetadata metadata = - new DocumentMetadata( - spec.getIndexFn() != null - ? 
lowerCaseOrNull(spec.getIndexFn().apply(parsedDocument)) - : null, - spec.getTypeFn() != null ? spec.getTypeFn().apply(parsedDocument) : null, - spec.getIdFn() != null ? spec.getIdFn().apply(parsedDocument) : null, - spec.getUsePartialUpdate() ? DEFAULT_RETRY_ON_CONFLICT : null); - return OBJECT_MAPPER.writeValueAsString(metadata); - } - - private static String lowerCaseOrNull(String input) { - return input == null ? null : input.toLowerCase(); + @FinishBundle + public void finishBundle(FinishBundleContext context) + throws IOException, InterruptedException { + flushBatch(); } - @ProcessElement - public void processElement(ProcessContext context) throws Exception { - String document = context.element(); // use configuration and auto-generated document IDs - String documentMetadata = "{}"; - boolean isDelete = false; - if (spec.getIndexFn() != null || spec.getTypeFn() != null || spec.getIdFn() != null) { - // parse once and reused for efficiency - JsonNode parsedDocument = OBJECT_MAPPER.readTree(document); - documentMetadata = getDocumentMetadata(parsedDocument); - if (spec.getIsDeleteFn() != null) { - isDelete = spec.getIsDeleteFn().apply(parsedDocument); - } - } - - if (isDelete) { - // delete request used for deleting a document. - batch.add(String.format("{ \"delete\" : %s }%n", documentMetadata)); - } else { - // index is an insert/upsert and update is a partial update (or insert if not existing) - if (spec.getUsePartialUpdate()) { - batch.add( - String.format( - "{ \"update\" : %s }%n{ \"doc\" : %s, \"doc_as_upsert\" : true }%n", - documentMetadata, document)); - } else { - batch.add(String.format("{ \"index\" : %s }%n%s%n", documentMetadata, document)); - } - } + protected void addAndMaybeFlush(String bulkApiEntity) + throws IOException, InterruptedException { + batch.add(bulkApiEntity); + currentBatchSizeBytes += bulkApiEntity.getBytes(StandardCharsets.UTF_8).length; - currentBatchSizeBytes += document.getBytes(StandardCharsets.UTF_8).length; if (batch.size() >= spec.getMaxBatchSize() || currentBatchSizeBytes >= spec.getMaxBatchSizeBytes()) { flushBatch(); } } - @FinishBundle - public void finishBundle(FinishBundleContext context) - throws IOException, InterruptedException { - flushBatch(); + private boolean isRetryableClientException(Throwable t) { + // RestClient#performRequest only throws wrapped IOException so we must inspect the + // exception cause to determine if the exception is likely transient i.e. retryable or + // not. + return t.getCause() instanceof ConnectTimeoutException + || t.getCause() instanceof SocketTimeoutException + || t.getCause() instanceof ConnectionClosedException + || t.getCause() instanceof ConnectException; } private void flushBatch() throws IOException, InterruptedException { if (batch.isEmpty()) { return; } + + LOG.info( + "ElasticsearchIO batch size: {}, batch size bytes: {}", + batch.size(), + currentBatchSizeBytes); + StringBuilder bulkRequest = new StringBuilder(); for (String json : batch) { bulkRequest.append(json); } + batch.clear(); - currentBatchSizeBytes = 0; - Response response; - HttpEntity responseEntity; - // Elasticsearch will default to the index/type provided here if none are set in the - // document meta (i.e. 
using ElasticsearchIO$Write#withIndexFn and - // ElasticsearchIO$Write#withTypeFn options) - String endPoint = - String.format( - "/%s/%s/_bulk", - spec.getConnectionConfiguration().getIndex(), - spec.getConnectionConfiguration().getType()); + currentBatchSizeBytes = 0L; + + Response response = null; + HttpEntity responseEntity = null; + + // Elasticsearch will default to the index/type provided the {@link + // ConnectionConfiguration} if none are set in the document meta (i.e. + // using ElasticsearchIO$DocToBulk#withIndexFn and + // ElasticsearchIO$DocToBulk#withTypeFn options) + String endPoint = spec.getConnectionConfiguration().getBulkEndPoint(); + HttpEntity requestBody = new NStringEntity(bulkRequest.toString(), ContentType.APPLICATION_JSON); - Request request = new Request("POST", endPoint); - request.addParameters(Collections.emptyMap()); - request.setEntity(requestBody); - response = restClient.performRequest(request); - responseEntity = new BufferedHttpEntity(response.getEntity()); + try { + Request request = new Request("POST", endPoint); + request.addParameters(Collections.emptyMap()); + request.setEntity(requestBody); + response = restClient.performRequest(request); + responseEntity = new BufferedHttpEntity(response.getEntity()); + } catch (java.io.IOException ex) { + if (spec.getRetryConfiguration() == null || !isRetryableClientException(ex)) { + throw ex; + } + LOG.error("Caught ES timeout, retrying", ex); + } + if (spec.getRetryConfiguration() != null - && spec.getRetryConfiguration().getRetryPredicate().test(responseEntity)) { + && (response == null + || responseEntity == null + || spec.getRetryConfiguration().getRetryPredicate().test(responseEntity))) { + if (responseEntity != null + && spec.getRetryConfiguration().getRetryPredicate().test(responseEntity)) { + LOG.warn("ES Cluster is responding with HTP 429 - TOO_MANY_REQUESTS."); + } responseEntity = handleRetry("POST", endPoint, Collections.emptyMap(), requestBody); } - checkForErrors(responseEntity, backendVersion, spec.getUsePartialUpdate()); + checkForErrors(responseEntity, spec.getAllowedResponseErrors()); } /** retry request based on retry configuration policy. 
*/ @@ -1489,21 +2186,32 @@ private HttpEntity handleRetry( String method, String endpoint, Map params, HttpEntity requestBody) throws IOException, InterruptedException { Response response; - HttpEntity responseEntity; + HttpEntity responseEntity = null; Sleeper sleeper = Sleeper.DEFAULT; BackOff backoff = retryBackoff.backoff(); int attempt = 0; // while retry policy exists while (BackOffUtils.next(sleeper, backoff)) { LOG.warn(String.format(RETRY_ATTEMPT_LOG, ++attempt)); - Request request = new Request(method, endpoint); - request.addParameters(params); - request.setEntity(requestBody); - response = restClient.performRequest(request); - responseEntity = new BufferedHttpEntity(response.getEntity()); + try { + Request request = new Request(method, endpoint); + request.addParameters(params); + request.setEntity(requestBody); + response = restClient.performRequest(request); + responseEntity = new BufferedHttpEntity(response.getEntity()); + } catch (java.io.IOException ex) { + if (isRetryableClientException(ex)) { + LOG.error("Caught ES timeout, retrying", ex); + continue; + } + } // if response has no 429 errors - if (!spec.getRetryConfiguration().getRetryPredicate().test(responseEntity)) { + if (!Objects.requireNonNull(spec.getRetryConfiguration()) + .getRetryPredicate() + .test(responseEntity)) { return responseEntity; + } else { + LOG.warn("ES Cluster is responding with HTP 429 - TOO_MANY_REQUESTS."); } } throw new IOException(String.format(RETRY_FAILED_LOG, attempt)); @@ -1526,10 +2234,7 @@ static int getBackendVersion(ConnectionConfiguration connectionConfiguration) { int backendVersion = Integer.parseInt(jsonNode.path("version").path("number").asText().substring(0, 1)); checkArgument( - (backendVersion == 2 - || backendVersion == 5 - || backendVersion == 6 - || backendVersion == 7), + (VALID_CLUSTER_VERSIONS.contains(backendVersion)), "The Elasticsearch version to connect to is %s.x. " + "This version of the ElasticsearchIO is only compatible with " + "Elasticsearch v7.x, v6.x, v5.x and v2.x",