[BEAM-12093] Overhaul ElasticsearchIO.Write (#14347)
Presently, the Write transform has two responsibilities:

- Convert input documents into Bulk API entities, serializing based on user settings (partial update, delete, upsert, etc.) -> DocToBulk
- Batch the converted Bulk API entities together and interface with the target ES cluster -> BulkIO

This change separates these two responsibilities into discrete PTransforms, allowing for greater flexibility while maintaining the convenience of the Write transform, which still performs both document conversion and IO serially.
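To make the split concrete, here is a minimal sketch of both usage styles. It assumes the two new transforms are exposed as `ElasticsearchIO.docToBulk()` and `ElasticsearchIO.bulkIO()` (names inferred from the transform names above) alongside the existing `ElasticsearchIO.write()`; treat the exact factory and builder methods, as well as the cluster address and index, as illustrative assumptions rather than confirmed API.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;

public class WriteSplitSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Hypothetical local cluster and index, used only for illustration.
    ConnectionConfiguration connection =
        ConnectionConfiguration.create(new String[] {"http://localhost:9200"}, "my-index", "_doc");

    PCollection<String> docs =
        pipeline.apply(
            Create.of("{\"id\": 1, \"name\": \"doc-one\"}", "{\"id\": 2, \"name\": \"doc-two\"}"));

    // Style 1: the convenience transform, unchanged; conversion and IO happen in one apply.
    docs.apply("WriteDocs", ElasticsearchIO.write().withConnectionConfiguration(connection));

    // Style 2 (assumed API): the same two responsibilities applied as discrete transforms,
    // leaving room to inspect, re-window, or otherwise transform the serialized Bulk API
    // entities in between. In a real pipeline you would pick one style, not both.
    PCollection<String> bulkEntities =
        docs.apply("DocToBulk", ElasticsearchIO.docToBulk().withConnectionConfiguration(connection));
    bulkEntities.apply("BulkIO", ElasticsearchIO.bulkIO().withConnectionConfiguration(connection));

    pipeline.run().waitUntilFinish();
  }
}
```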
egalpin authored May 27, 2021
1 parent 4cc279c commit 4f1f1c1
Showing 16 changed files with 1,691 additions and 324 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
@@ -93,6 +93,8 @@
## I/Os

* Support for X source added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
* Allow splitting apart document serialization and IO for ElasticsearchIO
* Support Bulk API request size optimization through addition of ElasticsearchIO.Write.withStatefulBatches

## New Features / Improvements

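The second new CHANGES entry above refers to stateful batching of Bulk API requests via ElasticsearchIO.Write.withStatefulBatches. A hedged sketch of enabling it follows; the setter is assumed here to be spelled withUseStatefulBatches (the entry itself says "withStatefulBatches", so verify the exact name against the released API), while withMaxBatchSize is the pre-existing per-request document cap.

```java
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration;
import org.apache.beam.sdk.values.PCollection;

public class StatefulBatchesSketch {
  // Sketch only: withUseStatefulBatches is an assumed spelling of the option the
  // CHANGES entry calls "withStatefulBatches"; withMaxBatchSize is the existing
  // batch-size limit that stateful batching tries to fill more completely.
  static void writeWithStatefulBatches(
      PCollection<String> docs, ConnectionConfiguration connection) {
    docs.apply(
        "WriteWithStatefulBatches",
        ElasticsearchIO.write()
            .withConnectionConfiguration(connection)
            // Buffer converted Bulk API entities in keyed state so each Bulk request
            // can approach the configured maximum size even when input bundles are small.
            .withUseStatefulBatches(true)
            .withMaxBatchSize(1000L));
  }
}
```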
@@ -103,6 +103,15 @@ public void testWriteVolume() throws Exception {
elasticsearchIOTestCommonWrite.testWrite();
}

@Test
public void testWriteVolumeStateful() throws Exception {
// cannot share elasticsearchIOTestCommon because tests run in parallel.
ElasticsearchIOTestCommon elasticsearchIOTestCommonWrite =
new ElasticsearchIOTestCommon(writeConnectionConfiguration, restClient, true);
elasticsearchIOTestCommonWrite.setPipeline(pipeline);
elasticsearchIOTestCommonWrite.testWriteStateful();
}

@Test
public void testSizesVolume() throws Exception {
elasticsearchIOTestCommon.testSizes();
@@ -123,6 +132,23 @@ public void testWriteVolumeWithFullAddressing() throws Exception {
elasticsearchIOTestCommonWrite.testWriteWithFullAddressing();
}

@Test
public void testWriteWithAllowableErrors() throws Exception {
elasticsearchIOTestCommon.testWriteWithAllowedErrors();
}

@Test
public void testWriteWithRouting() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteWithRouting();
}

@Test
public void testWriteWithDocVersion() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteWithDocVersion();
}

/**
* This test verifies volume partial updates of Elasticsearch. The test dataset index is cloned
* and then a new field is added to each document using a partial update. The test then asserts
@@ -201,6 +201,29 @@ public void testWritePartialUpdate() throws Exception {
elasticsearchIOTestCommon.testWritePartialUpdate();
}

@Test
public void testWriteWithAllowedErrors() throws Exception {
elasticsearchIOTestCommon.testWriteWithAllowedErrors();
}

@Test
public void testWriteWithRouting() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteWithRouting();
}

@Test
public void testWriteWithDocVersion() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteWithDocVersion();
}

@Test
public void testMaxParallelRequestsPerWindow() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testMaxParallelRequestsPerWindow();
}

@Test
public void testReadWithMetadata() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
@@ -48,6 +48,8 @@ dependencies {
testCompile project(path: ":sdks:java:io:elasticsearch-tests:elasticsearch-tests-common", configuration: "testRuntime")
testCompile "org.elasticsearch.test:framework:$elastic_search_version"
testCompile "org.elasticsearch.plugin:transport-netty4-client:$elastic_search_version"
// TODO(egalpin): Remove painless plugin in favour of containerized tests
testImplementation group: 'org.codelibs.elasticsearch.module', name: 'lang-painless', version: "$elastic_search_version"
testCompile "com.carrotsearch.randomizedtesting:randomizedtesting-runner:2.5.0"
testCompile "org.elasticsearch:elasticsearch:$elastic_search_version"

@@ -107,6 +107,15 @@ public void testWriteVolume() throws Exception {
elasticsearchIOTestCommonWrite.testWrite();
}

@Test
public void testWriteVolumeStateful() throws Exception {
// cannot share elasticsearchIOTestCommon because tests run in parallel.
ElasticsearchIOTestCommon elasticsearchIOTestCommonWrite =
new ElasticsearchIOTestCommon(writeConnectionConfiguration, restClient, true);
elasticsearchIOTestCommonWrite.setPipeline(pipeline);
elasticsearchIOTestCommonWrite.testWriteStateful();
}

@Test
public void testSizesVolume() throws Exception {
elasticsearchIOTestCommon.testSizes();
@@ -127,6 +136,29 @@ public void testWriteWithFullAddressingVolume() throws Exception {
elasticsearchIOTestCommonWrite.testWriteWithFullAddressing();
}

@Test
public void testWriteWithAllowableErrors() throws Exception {
elasticsearchIOTestCommon.testWriteWithAllowedErrors();
}

@Test
public void testWriteWithRouting() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteWithRouting();
}

@Test
public void testWriteScriptedUpsert() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteScriptedUpsert();
}

@Test
public void testWriteWithDocVersion() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteWithDocVersion();
}

/**
* This test verifies volume partial updates of Elasticsearch. The test dataset index is cloned
* and then a new field is added to each document using a partial update. The test then asserts
@@ -30,6 +30,7 @@
import java.util.Collection;
import org.apache.beam.sdk.testing.TestPipeline;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.painless.PainlessPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.transport.Netty4Plugin;
@@ -85,6 +86,7 @@ public Settings indexSettings() {
protected Collection<Class<? extends Plugin>> nodePlugins() {
ArrayList<Class<? extends Plugin>> plugins = new ArrayList<>();
plugins.add(Netty4Plugin.class);
plugins.add(PainlessPlugin.class);
return plugins;
}

@@ -199,6 +201,35 @@ public void testWritePartialUpdate() throws Exception {
elasticsearchIOTestCommon.testWritePartialUpdate();
}

@Test
public void testWriteWithAllowableErrors() throws Exception {
elasticsearchIOTestCommon.testWriteWithAllowedErrors();
}

@Test
public void testWriteWithRouting() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteWithRouting();
}

@Test
public void testWriteScriptedUpsert() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteScriptedUpsert();
}

@Test
public void testWriteWithDocVersion() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteWithDocVersion();
}

@Test
public void testMaxParallelRequestsPerWindow() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testMaxParallelRequestsPerWindow();
}

@Test
public void testReadWithMetadata() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
@@ -48,6 +48,8 @@ dependencies {
testCompile project(path: ":sdks:java:io:elasticsearch-tests:elasticsearch-tests-common", configuration: "testRuntime")
testCompile "org.elasticsearch.test:framework:$elastic_search_version"
testCompile "org.elasticsearch.plugin:transport-netty4-client:$elastic_search_version"
// TODO(egalpin): Remove painless plugin in favour of containerized tests
testImplementation group: 'org.codelibs.elasticsearch.module', name: 'lang-painless', version: "$elastic_search_version"
testCompile "com.carrotsearch.randomizedtesting:randomizedtesting-runner:2.5.2"
testCompile "org.elasticsearch:elasticsearch:$elastic_search_version"

@@ -107,6 +107,15 @@ public void testWriteVolume() throws Exception {
elasticsearchIOTestCommonWrite.testWrite();
}

@Test
public void testWriteVolumeStateful() throws Exception {
// cannot share elasticsearchIOTestCommon because tests run in parallel.
ElasticsearchIOTestCommon elasticsearchIOTestCommonWrite =
new ElasticsearchIOTestCommon(writeConnectionConfiguration, restClient, true);
elasticsearchIOTestCommonWrite.setPipeline(pipeline);
elasticsearchIOTestCommonWrite.testWriteStateful();
}

@Test
public void testSizesVolume() throws Exception {
elasticsearchIOTestCommon.testSizes();
@@ -127,6 +136,29 @@ public void testWriteWithFullAddressingVolume() throws Exception {
elasticsearchIOTestCommonWrite.testWriteWithFullAddressing();
}

@Test
public void testWriteWithAllowableErrors() throws Exception {
elasticsearchIOTestCommon.testWriteWithAllowedErrors();
}

@Test
public void testWriteWithRouting() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteWithRouting();
}

@Test
public void testWriteScriptedUpsert() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteScriptedUpsert();
}

@Test
public void testWriteWithDocVersion() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteWithDocVersion();
}

/**
* This test verifies volume partial updates of Elasticsearch. The test dataset index is cloned
* and then a new field is added to each document using a partial update. The test then asserts
@@ -30,6 +30,7 @@
import java.util.Collection;
import org.apache.beam.sdk.testing.TestPipeline;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.painless.PainlessPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.transport.Netty4Plugin;
@@ -85,6 +86,7 @@ public Settings indexSettings() {
protected Collection<Class<? extends Plugin>> nodePlugins() {
ArrayList<Class<? extends Plugin>> plugins = new ArrayList<>();
plugins.add(Netty4Plugin.class);
plugins.add(PainlessPlugin.class);
return plugins;
}

@@ -193,6 +195,35 @@ public void testWritePartialUpdate() throws Exception {
elasticsearchIOTestCommon.testWritePartialUpdate();
}

@Test
public void testWriteWithAllowableErrors() throws Exception {
elasticsearchIOTestCommon.testWriteWithAllowedErrors();
}

@Test
public void testWriteWithRouting() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteWithRouting();
}

@Test
public void testWriteScriptedUpsert() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteScriptedUpsert();
}

@Test
public void testWriteWithDocVersion() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteWithDocVersion();
}

@Test
public void testMaxParallelRequestsPerWindow() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testMaxParallelRequestsPerWindow();
}

@Test
public void testReadWithMetadata() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
@@ -48,6 +48,8 @@ dependencies {
testCompile project(path: ":sdks:java:io:elasticsearch-tests:elasticsearch-tests-common", configuration: "testRuntime")
testCompile "org.elasticsearch.test:framework:$elastic_search_version"
testCompile "org.elasticsearch.plugin:transport-netty4-client:$elastic_search_version"
// TODO(egalpin): Remove painless plugin in favour of containerized tests
testImplementation group: 'org.codelibs.elasticsearch.module', name: 'lang-painless', version: "$elastic_search_version"
testCompile "com.carrotsearch.randomizedtesting:randomizedtesting-runner:2.7.5"
testCompile "org.elasticsearch:elasticsearch:$elastic_search_version"

@@ -107,6 +107,15 @@ public void testWriteVolume() throws Exception {
elasticsearchIOTestCommonWrite.testWrite();
}

@Test
public void testWriteVolumeStateful() throws Exception {
// cannot share elasticsearchIOTestCommon because tests run in parallel.
ElasticsearchIOTestCommon elasticsearchIOTestCommonWrite =
new ElasticsearchIOTestCommon(writeConnectionConfiguration, restClient, true);
elasticsearchIOTestCommonWrite.setPipeline(pipeline);
elasticsearchIOTestCommonWrite.testWriteStateful();
}

@Test
public void testSizesVolume() throws Exception {
elasticsearchIOTestCommon.testSizes();
@@ -127,6 +136,29 @@ public void testWriteWithFullAddressingVolume() throws Exception {
elasticsearchIOTestCommonWrite.testWriteWithFullAddressing();
}

@Test
public void testWriteWithAllowableErrors() throws Exception {
elasticsearchIOTestCommon.testWriteWithAllowedErrors();
}

@Test
public void testWriteWithRouting() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteWithRouting();
}

@Test
public void testWriteScriptedUpsert() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteScriptedUpsert();
}

@Test
public void testWriteWithDocVersion() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteWithDocVersion();
}

/**
* This test verifies volume partial updates of Elasticsearch. The test dataset index is cloned
* and then a new field is added to each document using a partial update. The test then asserts
@@ -30,6 +30,7 @@
import java.util.Collection;
import org.apache.beam.sdk.testing.TestPipeline;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.painless.PainlessPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.transport.Netty4Plugin;
@@ -89,6 +90,7 @@ public Settings indexSettings() {
protected Collection<Class<? extends Plugin>> nodePlugins() {
ArrayList<Class<? extends Plugin>> plugins = new ArrayList<>();
plugins.add(Netty4Plugin.class);
plugins.add(PainlessPlugin.class);
return plugins;
}

@@ -197,6 +199,35 @@ public void testWritePartialUpdate() throws Exception {
elasticsearchIOTestCommon.testWritePartialUpdate();
}

@Test
public void testWriteWithAllowableErrors() throws Exception {
elasticsearchIOTestCommon.testWriteWithAllowedErrors();
}

@Test
public void testWriteWithRouting() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteWithRouting();
}

@Test
public void testWriteScriptedUpsert() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteScriptedUpsert();
}

@Test
public void testWriteWithDocVersion() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteWithDocVersion();
}

@Test
public void testMaxParallelRequestsPerWindow() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testMaxParallelRequestsPerWindow();
}

@Test
public void testReadWithMetadata() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);