From 392e6337fe22a52f55816190427b5ffde1f8395e Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Thu, 18 Jan 2024 20:30:38 -0500 Subject: [PATCH 1/8] support dynamic destinations and add tests --- ...torageWriteApiSchemaTransformProvider.java | 59 ++++++++++++-- ...geWriteApiSchemaTransformProviderTest.java | 54 +++++++++++-- .../io/external/xlang_bigqueryio_it_test.py | 71 ++++++++++++++++- sdks/python/apache_beam/io/gcp/bigquery.py | 79 ++++++++++++++++--- 4 files changed, 235 insertions(+), 28 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java index 8c4edd2244b4..7bc0ef2297fb 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java @@ -20,6 +20,7 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; +import com.google.api.services.bigquery.model.TableSchema; import com.google.auto.service.AutoService; import com.google.auto.value.AutoValue; import java.util.Arrays; @@ -35,6 +36,8 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryStorageApiInsertError; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils; +import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations; +import org.apache.beam.sdk.io.gcp.bigquery.TableDestination; import org.apache.beam.sdk.io.gcp.bigquery.WriteResult; import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiSchemaTransformConfiguration; import org.apache.beam.sdk.metrics.Counter; @@ -56,6 +59,7 @@ import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TypeDescriptors; +import org.apache.beam.sdk.values.ValueInSingleWindow; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; @@ -81,6 +85,8 @@ public class BigQueryStorageWriteApiSchemaTransformProvider private static final String INPUT_ROWS_TAG = "input"; private static final String FAILED_ROWS_TAG = "FailedRows"; private static final String FAILED_ROWS_WITH_ERRORS_TAG = "FailedRowsWithErrors"; + // magic string that tells us to write to dynamic destinations + protected static final String DYNAMIC_DESTINATIONS = "DYNAMIC_DESTINATIONS"; @Override protected Class configurationClass() { @@ -161,7 +167,11 @@ public void validate() { checkArgument( !Strings.isNullOrEmpty(this.getTable()), invalidConfigMessage + "Table spec for a BigQuery Write must be specified."); - checkNotNull(BigQueryHelpers.parseTableSpec(this.getTable())); + + // if we have an input table spec, validate it + if (!this.getTable().equals(DYNAMIC_DESTINATIONS)) { + checkNotNull(BigQueryHelpers.parseTableSpec(this.getTable())); + } // validate create 
and write dispositions if (!Strings.isNullOrEmpty(this.getCreateDisposition())) { @@ -337,22 +347,44 @@ private static class NoOutputDoFn extends DoFn { public void process(ProcessContext c) {} } + private static class RowDynamicDestinations extends DynamicDestinations { + Schema schema; + + RowDynamicDestinations(Schema schema) { + this.schema = schema; + } + + @Override + public String getDestination(ValueInSingleWindow element) { + return element.getValue().getString("destination"); + } + + @Override + public TableDestination getTable(String destination) { + return new TableDestination(destination, null); + } + + @Override + public TableSchema getSchema(String destination) { + return BigQueryUtils.toTableSchema(schema); + } + } + @Override public PCollectionRowTuple expand(PCollectionRowTuple input) { // Check that the input exists checkArgument(input.has(INPUT_ROWS_TAG), "Missing expected input tag: %s", INPUT_ROWS_TAG); PCollection inputRows = input.get(INPUT_ROWS_TAG); - BigQueryIO.Write write = createStorageWriteApiTransform(); + BigQueryIO.Write write = createStorageWriteApiTransform(inputRows.getSchema()); if (inputRows.isBounded() == IsBounded.UNBOUNDED) { Long triggeringFrequency = configuration.getTriggeringFrequencySeconds(); Boolean autoSharding = configuration.getAutoSharding(); int numStreams = configuration.getNumStreams() == null ? 0 : configuration.getNumStreams(); boolean useAtLeastOnceSemantics = - configuration.getUseAtLeastOnceSemantics() == null - ? false - : configuration.getUseAtLeastOnceSemantics(); + configuration.getUseAtLeastOnceSemantics() != null + && configuration.getUseAtLeastOnceSemantics(); // Triggering frequency is only applicable for exactly-once if (!useAtLeastOnceSemantics) { write = @@ -425,7 +457,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { } } - BigQueryIO.Write createStorageWriteApiTransform() { + BigQueryIO.Write createStorageWriteApiTransform(Schema schema) { Method writeMethod = configuration.getUseAtLeastOnceSemantics() != null && configuration.getUseAtLeastOnceSemantics() @@ -434,12 +466,23 @@ BigQueryIO.Write createStorageWriteApiTransform() { BigQueryIO.Write write = BigQueryIO.write() - .to(configuration.getTable()) .withMethod(writeMethod) - .useBeamSchema() .withFormatFunction(BigQueryUtils.toTableRow()) .withWriteDisposition(WriteDisposition.WRITE_APPEND); + if (configuration.getTable().equals(DYNAMIC_DESTINATIONS)) { + checkArgument( + schema.getFieldNames().equals(Arrays.asList("destination", "record")), + "When writing to dynamic destinations, we expect Row Schema with a " + + "\"destination\" string field and a \"record\" Row field."); + write = + write + .to(new RowDynamicDestinations(schema.getField("record").getType().getRowSchema())) + .withFormatFunction(row -> BigQueryUtils.toTableRow(row.getRow("record"))); + } else { + write = write.to(configuration.getTable()).useBeamSchema(); + } + if (!Strings.isNullOrEmpty(configuration.getCreateDisposition())) { CreateDisposition createDisposition = BigQueryStorageWriteApiSchemaTransformConfiguration.CREATE_DISPOSITIONS.get( diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java index 54c636bde5fe..64ea0b11d1b9 100644 --- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java @@ -29,6 +29,7 @@ import java.util.Arrays; import java.util.List; import java.util.function.Function; +import java.util.stream.Collectors; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers; import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiSchemaTransform; @@ -136,8 +137,8 @@ public PCollectionRowTuple runWithConfig( writeTransform.setBigQueryServices(fakeBigQueryServices); String tag = provider.inputCollectionNames().get(0); - - PCollection rows = p.apply(Create.of(inputRows).withRowSchema(SCHEMA)); + PCollection rows = + p.apply(Create.of(inputRows).withRowSchema(inputRows.get(0).getSchema())); PCollectionRowTuple input = PCollectionRowTuple.of(tag, rows); PCollectionRowTuple result = input.apply(writeTransform); @@ -155,16 +156,20 @@ public Boolean rowsEquals(List expectedRows, List actualRows) { TableRow actualRow = actualRows.get(i); Row expectedRow = expectedRows.get(Integer.parseInt(actualRow.get("number").toString()) - 1); - if (!expectedRow.getValue("name").equals(actualRow.get("name")) - || !expectedRow - .getValue("number") - .equals(Long.parseLong(actualRow.get("number").toString()))) { + if (!rowEquals(expectedRow, actualRow)) { return false; } } return true; } + public boolean rowEquals(Row expectedRow, TableRow actualRow) { + return expectedRow.getValue("name").equals(actualRow.get("name")) + && expectedRow + .getValue("number") + .equals(Long.parseLong(actualRow.get("number").toString())); + } + @Test public void testSimpleWrite() throws Exception { String tableSpec = "project:dataset.simple_write"; @@ -179,6 +184,43 @@ public void testSimpleWrite() throws Exception { rowsEquals(ROWS, fakeDatasetService.getAllRows("project", "dataset", "simple_write"))); } + @Test + public void testWriteToDynamicDestinations() throws Exception { + String dynamic = BigQueryStorageWriteApiSchemaTransformProvider.DYNAMIC_DESTINATIONS; + BigQueryStorageWriteApiSchemaTransformConfiguration config = + BigQueryStorageWriteApiSchemaTransformConfiguration.builder().setTable(dynamic).build(); + + String baseTableSpec = "project:dataset.dynamic_write_"; + + Schema schemaWithDestinations = + Schema.builder().addStringField("destination").addRowField("record", SCHEMA).build(); + List rowsWithDestinations = + ROWS.stream() + .map( + row -> + Row.withSchema(schemaWithDestinations) + .withFieldValue("destination", baseTableSpec + row.getInt64("number")) + .withFieldValue("record", row) + .build()) + .collect(Collectors.toList()); + + runWithConfig(config, rowsWithDestinations); + p.run().waitUntilFinish(); + + assertTrue( + rowEquals( + ROWS.get(0), + fakeDatasetService.getAllRows("project", "dataset", "dynamic_write_1").get(0))); + assertTrue( + rowEquals( + ROWS.get(1), + fakeDatasetService.getAllRows("project", "dataset", "dynamic_write_2").get(0))); + assertTrue( + rowEquals( + ROWS.get(2), + fakeDatasetService.getAllRows("project", "dataset", "dynamic_write_3").get(0))); + } + @Test public void testInputElementCount() throws Exception { String tableSpec = "project:dataset.input_count"; diff --git a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py 
b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py index 5917ca4dc729..87fa3def7079 100644 --- a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py +++ b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py @@ -28,6 +28,7 @@ import pytest from hamcrest.core import assert_that as hamcrest_assert +from hamcrest.core.core.allof import all_of import apache_beam as beam from apache_beam.io.gcp.bigquery import StorageWriteToBigQuery @@ -132,6 +133,8 @@ def tearDown(self): self.project) def parse_expected_data(self, expected_elements): + if not isinstance(expected_elements, list): + expected_elements = [expected_elements] data = [] for row in expected_elements: values = list(row.values()) @@ -246,6 +249,66 @@ def test_write_with_beam_rows(self): table=table_id, expansion_service=self.expansion_service)) hamcrest_assert(p, bq_matcher) + def test_write_to_dynamic_destinations(self): + base_table_spec = '{}.dynamic_dest_'.format(self.dataset_id) + spec_with_project = '{}:{}'.format(self.project, base_table_spec) + tables = [base_table_spec + str(record['int']) for record in self.ELEMENTS] + + bq_matchers = [ + BigqueryFullResultMatcher( + project=self.project, + query="SELECT * FROM %s" % tables[i], + data=self.parse_expected_data(self.ELEMENTS[i])) + for i in range(len(tables)) + ] + + with beam.Pipeline(argv=self.args) as p: + _ = ( + p + | beam.Create(self.ELEMENTS) + | beam.io.WriteToBigQuery( + table=lambda record: spec_with_project + str(record['int']), + method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API, + schema=self.ALL_TYPES_SCHEMA, + use_at_least_once=False, + expansion_service=self.expansion_service)) + hamcrest_assert(p, all_of(*bq_matchers)) + + def test_write_to_dynamic_destinations_with_beam_rows(self): + base_table_spec = '{}.dynamic_dest_'.format(self.dataset_id) + spec_with_project = '{}:{}'.format(self.project, base_table_spec) + tables = [base_table_spec + str(record['int']) for record in self.ELEMENTS] + + bq_matchers = [ + BigqueryFullResultMatcher( + project=self.project, + query="SELECT * FROM %s" % tables[i], + data=self.parse_expected_data(self.ELEMENTS[i])) + for i in range(len(tables)) + ] + + row_elements = [ + beam.Row( + my_int=e['int'], + my_float=e['float'], + my_numeric=e['numeric'], + my_string=e['str'], + my_bool=e['bool'], + my_bytes=e['bytes'], + my_timestamp=e['timestamp']) for e in self.ELEMENTS + ] + + with beam.Pipeline(argv=self.args) as p: + _ = ( + p + | beam.Create(row_elements) + | beam.io.WriteToBigQuery( + table=lambda record: spec_with_project + str(record.my_int), + method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API, + use_at_least_once=False, + expansion_service=self.expansion_service)) + hamcrest_assert(p, all_of(*bq_matchers)) + def run_streaming(self, table_name, num_streams=0, use_at_least_once=False): elements = self.ELEMENTS.copy() schema = self.ALL_TYPES_SCHEMA @@ -278,13 +341,16 @@ def run_streaming(self, table_name, num_streams=0, use_at_least_once=False): expansion_service=self.expansion_service)) hamcrest_assert(p, bq_matcher) - def test_streaming_with_fixed_num_streams(self): + def skip_if_not_dataflow_runner(self) -> bool: # skip if dataflow runner is not specified if not self._runner or "dataflowrunner" not in self._runner.lower(): self.skipTest( - "The exactly-once route has the requirement " + "Streaming with exactly-once route has the requirement " "`beam:requirement:pardo:on_window_expiration:v1`, " "which is currently only supported by the Dataflow runner") + + def 
test_streaming_with_fixed_num_streams(self): + self.skip_if_not_dataflow_runner() table = 'streaming_fixed_num_streams' self.run_streaming(table_name=table, num_streams=4) @@ -292,6 +358,7 @@ def test_streaming_with_fixed_num_streams(self): "Streaming to the Storage Write API sink with autosharding is broken " "with Dataflow Runner V2.") def test_streaming_with_auto_sharding(self): + self.skip_if_not_dataflow_runner() table = 'streaming_with_auto_sharding' self.run_streaming(table_name=table) diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index ac06425e95a9..5924a9ac5d83 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -484,6 +484,12 @@ def chain_after(result): overhead: https://cloud.google.com/bigquery/quotas#streaming_inserts """ MAX_INSERT_PAYLOAD_SIZE = 9 << 20 +""" +A magic string reserved for the Storage Write API mode, which utilizes +multi-lang. This tells the Java SchemaTransform connector that the incoming +rows are meant to go to dynamic destinations. +""" +DYNAMIC_DESTINATIONS = "DYNAMIC_DESTINATIONS" @deprecated(since='2.11.0', current="bigquery_tools.parse_table_reference") @@ -2232,7 +2238,7 @@ def find_in_nested_dict(schema): "for writing with STORAGE_WRITE_API.") from exn elif callable(self.schema): raise NotImplementedError( - "Writing to dynamic destinations is not" + "Writing with dynamic schemas is not" "supported for this write method.") elif isinstance(self.schema, vp.ValueProvider): schema = self.schema.get() @@ -2244,18 +2250,67 @@ def find_in_nested_dict(schema): table = bigquery_tools.get_hashable_destination(self.table_reference) # None type is not supported triggering_frequency = self.triggering_frequency or 0 - # SchemaTransform expects Beam Rows, so map to Rows first - if is_rows: - input_beam_rows = pcoll - else: - input_beam_rows = ( + + # For dynamic destinations, we first figure out where each row is going. + # Then we append the destination to each row before sending it over to + # the Java SchemaTransform. + # We need to do this here because there are obstacles to passing the + # destinations function to Java + if callable(table): + # call function and append destination to each row + input_rows = ( pcoll - | "Convert dict to Beam Row" >> beam.Map( - lambda row: bigquery_tools.beam_row_from_dict(row, schema) - ).with_output_types( - RowTypeConstraint.from_fields( - bigquery_tools.get_beam_typehints_from_tableschema(schema))) - ) + | "Append dynamic destinations" >> beam.ParDo( + bigquery_tools.AppendDestinationsFn(table), + *self.table_side_inputs)) + + # if input type is Beam Row, just wrap everything in another Row + if is_rows: + input_beam_rows = ( + input_rows + | "Wrap in Beam Row" >> + beam.Map(lambda row: beam.Row(destination=row[0], record=row[ + 1])).with_output_types( + RowTypeConstraint.from_fields([ + ('destination', str), ('record', pcoll.element_type) + ]))) + # otherwise, convert to Beam Rows. + else: + input_beam_rows = ( + input_rows + | "Convert dict to Beam Row" >> beam.Map( + lambda row: beam.Row( + destination=row[0], + record=bigquery_tools.beam_row_from_dict(row[1], schema)) + ).with_output_types( + RowTypeConstraint.from_fields([ + ('destination', str), + ( + 'record', + RowTypeConstraint.from_fields( + bigquery_tools. 
+ get_beam_typehints_from_tableschema(schema))) + ]))) + input_beam_rows | beam.Map( + lambda e: logging.warning("input_beam_rows: %s", e)) + + # # communicate to Java that this write should use dynamic destinations + table = DYNAMIC_DESTINATIONS + + # if writing to one destination, just convert to Beam rows and send over + else: + if is_rows: + input_beam_rows = pcoll + else: + input_beam_rows = ( + pcoll + | "Convert dict to Beam Row" >> beam.Map( + lambda row: bigquery_tools.beam_row_from_dict(row, schema)). + with_output_types( + RowTypeConstraint.from_fields( + bigquery_tools.get_beam_typehints_from_tableschema( + schema)))) + output_beam_rows = ( input_beam_rows | StorageWriteToBigQuery( From 1a270eac27dac558a7a716c8d36d1c9f6e52d5f2 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Fri, 19 Jan 2024 10:38:34 -0500 Subject: [PATCH 2/8] remove line --- sdks/python/apache_beam/io/gcp/bigquery.py | 2 - sdks/python/temp.py | 58 ++++++++++++++++++++++ 2 files changed, 58 insertions(+), 2 deletions(-) create mode 100644 sdks/python/temp.py diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index 5924a9ac5d83..190c29861260 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -2291,8 +2291,6 @@ def find_in_nested_dict(schema): bigquery_tools. get_beam_typehints_from_tableschema(schema))) ]))) - input_beam_rows | beam.Map( - lambda e: logging.warning("input_beam_rows: %s", e)) # # communicate to Java that this write should use dynamic destinations table = DYNAMIC_DESTINATIONS diff --git a/sdks/python/temp.py b/sdks/python/temp.py new file mode 100644 index 000000000000..f509e2db2635 --- /dev/null +++ b/sdks/python/temp.py @@ -0,0 +1,58 @@ +import apache_beam as beam +from apache_beam.io.gcp.bigquery import WriteToBigQuery +from apache_beam.typehints.row_type import RowTypeConstraint +import typing +import subprocess +import os +import warnings +import sys +from apache_beam.transforms.external_schematransform_provider import ExternalSchemaTransformProvider +from apache_beam.transforms.external import BeamJarExpansionService +from apache_beam.typehints.schemas import named_tuple_to_schema +from apache_beam.typehints.schemas import named_tuple_from_schema + +if __name__ == '__main__': + row = beam.Row(num=1, str="a") + nt = named_tuple_from_schema(row) + + print(nt) + + # provider = ExternalSchemaTransformProvider( + # BeamJarExpansionService(":sdks:java:io:expansion-service:shadowJar")) + # + # KafkaWriteTransform = provider.get("KafkaWrite") + # + # with beam.Pipeline() as p: + # p | beam.Create() | WriteToKafka() + # + # try: + # out = subprocess.run(['python', '-V']) + # out = subprocess.run([ + # sys.executable, + # os.path.join('gen_xlang_wrappers.py'), + # '--cleanup', + # '--input-expansion-services', 'standard_expansion_services.yaml', + # '--output-transforms-config', 'standard_external_transforms.yaml'], + # capture_output=True, check=True) + # print(out.stdout) + # except subprocess.CalledProcessError as err: + # raise RuntimeError('Could not generate external transform wrappers: %s', err.stderr) + + # from apache_beam.coders import RowCoder + # row_coder = RowCoder.from_type_hint(RowTypeConstraint.from_fields([ + # ('row', typing.Dict[str, int]), + # ('destination', str), + # ('schema', str) + # ]), None) + # content = {"row": {"what": 1}, "destination": "yerrr", "schema": "u got it"} + # row = beam.Row(row={"what": 1}, destination="yerrr", schema="u got it") + # row2 = 
beam.Row(**content) + # bytesss = row_coder.encode(row) + # bytesss2 = row_coder.encode(row2) + # print(bytesss) + # print(bytesss2) + # print(bytesss == bytesss2) + + # with beam.Pipeline() as p: + # p | beam.Create([{"hi": 1}, {"hi": 2}]) | WriteToBigQuery(table=lambda x:"google.com:clouddfe:ahmedabualsaud_test.dynamic", schema="hi:INTEGER", method=WriteToBigQuery.Method.STORAGE_WRITE_API) + From 483f8f3722d84a41a015c14ac603baf140400e0d Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Fri, 19 Jan 2024 11:01:27 -0500 Subject: [PATCH 3/8] remove temp --- sdks/python/temp.py | 58 --------------------------------------------- 1 file changed, 58 deletions(-) delete mode 100644 sdks/python/temp.py diff --git a/sdks/python/temp.py b/sdks/python/temp.py deleted file mode 100644 index f509e2db2635..000000000000 --- a/sdks/python/temp.py +++ /dev/null @@ -1,58 +0,0 @@ -import apache_beam as beam -from apache_beam.io.gcp.bigquery import WriteToBigQuery -from apache_beam.typehints.row_type import RowTypeConstraint -import typing -import subprocess -import os -import warnings -import sys -from apache_beam.transforms.external_schematransform_provider import ExternalSchemaTransformProvider -from apache_beam.transforms.external import BeamJarExpansionService -from apache_beam.typehints.schemas import named_tuple_to_schema -from apache_beam.typehints.schemas import named_tuple_from_schema - -if __name__ == '__main__': - row = beam.Row(num=1, str="a") - nt = named_tuple_from_schema(row) - - print(nt) - - # provider = ExternalSchemaTransformProvider( - # BeamJarExpansionService(":sdks:java:io:expansion-service:shadowJar")) - # - # KafkaWriteTransform = provider.get("KafkaWrite") - # - # with beam.Pipeline() as p: - # p | beam.Create() | WriteToKafka() - # - # try: - # out = subprocess.run(['python', '-V']) - # out = subprocess.run([ - # sys.executable, - # os.path.join('gen_xlang_wrappers.py'), - # '--cleanup', - # '--input-expansion-services', 'standard_expansion_services.yaml', - # '--output-transforms-config', 'standard_external_transforms.yaml'], - # capture_output=True, check=True) - # print(out.stdout) - # except subprocess.CalledProcessError as err: - # raise RuntimeError('Could not generate external transform wrappers: %s', err.stderr) - - # from apache_beam.coders import RowCoder - # row_coder = RowCoder.from_type_hint(RowTypeConstraint.from_fields([ - # ('row', typing.Dict[str, int]), - # ('destination', str), - # ('schema', str) - # ]), None) - # content = {"row": {"what": 1}, "destination": "yerrr", "schema": "u got it"} - # row = beam.Row(row={"what": 1}, destination="yerrr", schema="u got it") - # row2 = beam.Row(**content) - # bytesss = row_coder.encode(row) - # bytesss2 = row_coder.encode(row2) - # print(bytesss) - # print(bytesss2) - # print(bytesss == bytesss2) - - # with beam.Pipeline() as p: - # p | beam.Create([{"hi": 1}, {"hi": 2}]) | WriteToBigQuery(table=lambda x:"google.com:clouddfe:ahmedabualsaud_test.dynamic", schema="hi:INTEGER", method=WriteToBigQuery.Method.STORAGE_WRITE_API) - From 32a7d0a03a725647512a6890db1c02b35bd85955 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Sun, 21 Jan 2024 15:38:19 -0500 Subject: [PATCH 4/8] put all relevant logic in StorageWriteToBigQuery --- .../io/external/xlang_bigqueryio_it_test.py | 6 +- sdks/python/apache_beam/io/gcp/bigquery.py | 345 ++++++++---------- 2 files changed, 163 insertions(+), 188 deletions(-) diff --git a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py 
b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py index 87fa3def7079..c1e9754526e8 100644 --- a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py +++ b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py @@ -53,9 +53,6 @@ @pytest.mark.uses_gcp_java_expansion_service -# @unittest.skipUnless( -# os.environ.get('EXPANSION_PORT'), -# "EXPANSION_PORT environment var is not provided.") class BigQueryXlangStorageWriteIT(unittest.TestCase): BIGQUERY_DATASET = 'python_xlang_storage_write' @@ -115,7 +112,8 @@ def setUp(self): _LOGGER.info( "Created dataset %s in project %s", self.dataset_id, self.project) - _LOGGER.info("expansion port: %s", os.environ.get('EXPANSION_PORT')) + self.assertTrue( + os.environ.get('EXPANSION_PORT'), "Expansion service port not found!") self.expansion_service = ('localhost:%s' % os.environ.get('EXPANSION_PORT')) def tearDown(self): diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index 190c29861260..01a0c2a4b492 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -484,12 +484,6 @@ def chain_after(result): overhead: https://cloud.google.com/bigquery/quotas#streaming_inserts """ MAX_INSERT_PAYLOAD_SIZE = 9 << 20 -""" -A magic string reserved for the Storage Write API mode, which utilizes -multi-lang. This tells the Java SchemaTransform connector that the incoming -rows are meant to go to dynamic destinations. -""" -DYNAMIC_DESTINATIONS = "DYNAMIC_DESTINATIONS" @deprecated(since='2.11.0', current="bigquery_tools.parse_table_reference") @@ -2228,120 +2222,17 @@ def find_in_nested_dict(schema): BigQueryBatchFileLoads.DESTINATION_COPY_JOBID_PAIRS]) elif method_to_use == WriteToBigQuery.Method.STORAGE_WRITE_API: - if self.schema is None: - try: - schema = schema_from_element_type(pcoll.element_type) - is_rows = True - except TypeError as exn: - raise ValueError( - "A schema is required in order to prepare rows" - "for writing with STORAGE_WRITE_API.") from exn - elif callable(self.schema): - raise NotImplementedError( - "Writing with dynamic schemas is not" - "supported for this write method.") - elif isinstance(self.schema, vp.ValueProvider): - schema = self.schema.get() - is_rows = False - else: - schema = self.schema - is_rows = False - - table = bigquery_tools.get_hashable_destination(self.table_reference) - # None type is not supported - triggering_frequency = self.triggering_frequency or 0 - - # For dynamic destinations, we first figure out where each row is going. - # Then we append the destination to each row before sending it over to - # the Java SchemaTransform. - # We need to do this here because there are obstacles to passing the - # destinations function to Java - if callable(table): - # call function and append destination to each row - input_rows = ( - pcoll - | "Append dynamic destinations" >> beam.ParDo( - bigquery_tools.AppendDestinationsFn(table), - *self.table_side_inputs)) - - # if input type is Beam Row, just wrap everything in another Row - if is_rows: - input_beam_rows = ( - input_rows - | "Wrap in Beam Row" >> - beam.Map(lambda row: beam.Row(destination=row[0], record=row[ - 1])).with_output_types( - RowTypeConstraint.from_fields([ - ('destination', str), ('record', pcoll.element_type) - ]))) - # otherwise, convert to Beam Rows. 
- else: - input_beam_rows = ( - input_rows - | "Convert dict to Beam Row" >> beam.Map( - lambda row: beam.Row( - destination=row[0], - record=bigquery_tools.beam_row_from_dict(row[1], schema)) - ).with_output_types( - RowTypeConstraint.from_fields([ - ('destination', str), - ( - 'record', - RowTypeConstraint.from_fields( - bigquery_tools. - get_beam_typehints_from_tableschema(schema))) - ]))) - - # # communicate to Java that this write should use dynamic destinations - table = DYNAMIC_DESTINATIONS - - # if writing to one destination, just convert to Beam rows and send over - else: - if is_rows: - input_beam_rows = pcoll - else: - input_beam_rows = ( - pcoll - | "Convert dict to Beam Row" >> beam.Map( - lambda row: bigquery_tools.beam_row_from_dict(row, schema)). - with_output_types( - RowTypeConstraint.from_fields( - bigquery_tools.get_beam_typehints_from_tableschema( - schema)))) - - output_beam_rows = ( - input_beam_rows - | StorageWriteToBigQuery( - table=table, - create_disposition=self.create_disposition, - write_disposition=self.write_disposition, - triggering_frequency=triggering_frequency, - use_at_least_once=self.use_at_least_once, - with_auto_sharding=self.with_auto_sharding, - num_storage_api_streams=self._num_storage_api_streams, - expansion_service=self.expansion_service)) - - if is_rows: - failed_rows = output_beam_rows[StorageWriteToBigQuery.FAILED_ROWS] - failed_rows_with_errors = output_beam_rows[ - StorageWriteToBigQuery.FAILED_ROWS_WITH_ERRORS] - else: - # return back from Beam Rows to Python dict elements - failed_rows = ( - output_beam_rows[StorageWriteToBigQuery.FAILED_ROWS] - | beam.Map(lambda row: row.as_dict())) - failed_rows_with_errors = ( - output_beam_rows[StorageWriteToBigQuery.FAILED_ROWS_WITH_ERRORS] - | beam.Map( - lambda row: { - "error_message": row.error_message, - "failed_row": row.failed_row.as_dict() - })) - - return WriteResult( - method=WriteToBigQuery.Method.STORAGE_WRITE_API, - failed_rows=failed_rows, - failed_rows_with_errors=failed_rows_with_errors) + return pcoll | StorageWriteToBigQuery( + table=self.table_reference, + schema=self.schema, + table_side_inputs=self.table_side_inputs, + create_disposition=self.create_disposition, + write_disposition=self.write_disposition, + triggering_frequency=self.triggering_frequency, + use_at_least_once=self.use_at_least_once, + with_auto_sharding=self.with_auto_sharding, + num_storage_api_streams=self._num_storage_api_streams, + expansion_service=self.expansion_service) else: raise ValueError(f"Unsupported method {method_to_use}") @@ -2435,7 +2326,7 @@ class WriteResult: """ def __init__( self, - method: WriteToBigQuery.Method = None, + method: str = None, destination_load_jobid_pairs: PCollection[Tuple[str, JobReference]] = None, destination_file_pairs: PCollection[Tuple[str, Tuple[str, int]]] = None, @@ -2558,24 +2449,25 @@ def __getitem__(self, key): return self.attributes[key].__get__(self, WriteResult) -def _default_io_expansion_service(append_args=None): - return BeamJarExpansionService( - 'sdks:java:io:google-cloud-platform:expansion-service:build', - append_args=append_args) - - class StorageWriteToBigQuery(PTransform): """Writes data to BigQuery using Storage API. Experimental; no backwards compatibility guarantees. 
""" - URN = "beam:schematransform:org.apache.beam:bigquery_storage_write:v2" + IDENTIFIER = "beam:schematransform:org.apache.beam:bigquery_storage_write:v2" FAILED_ROWS = "FailedRows" FAILED_ROWS_WITH_ERRORS = "FailedRowsWithErrors" + # fields for rows sent to Storage API with dynamic destinations + DESTINATION = "destination" + RECORD = "record" + # magic string to tell Java that these rows are going to dynamic destinations + DYNAMIC_DESTINATIONS = "DYNAMIC_DESTINATIONS" def __init__( self, table, + table_side_inputs=None, + schema=None, create_disposition=BigQueryDisposition.CREATE_IF_NEEDED, write_disposition=BigQueryDisposition.WRITE_APPEND, triggering_frequency=0, @@ -2583,71 +2475,156 @@ def __init__( with_auto_sharding=False, num_storage_api_streams=0, expansion_service=None): - """Initialize a StorageWriteToBigQuery transform. - - :param table: - Fully-qualified table ID specified as ``'PROJECT:DATASET.TABLE'``. - :param create_disposition: - String specifying the strategy to take when the table doesn't - exist. Possible values are: - * ``'CREATE_IF_NEEDED'``: create if does not exist. - * ``'CREATE_NEVER'``: fail the write if does not exist. - :param write_disposition: - String specifying the strategy to take when the table already - contains data. Possible values are: - * ``'WRITE_TRUNCATE'``: delete existing rows. - * ``'WRITE_APPEND'``: add to existing rows. - * ``'WRITE_EMPTY'``: fail the write if table not empty. - :param triggering_frequency: - The time in seconds between write commits. Should only be specified - for streaming pipelines. Defaults to 5 seconds. - :param use_at_least_once: - Use at-least-once semantics. Is cheaper and provides lower latency, - but will potentially duplicate records. - :param with_auto_sharding: - Experimental. If true, enables using a dynamically determined number of - shards to write to BigQuery. Only applicable to unbounded input. - :param expansion_service: - The address (host:port) of the expansion service. If no expansion - service is provided, will attempt to run the default GCP expansion - service. 
- """ - super().__init__() self._table = table + self._table_side_inputs = table_side_inputs + self._schema = schema self._create_disposition = create_disposition self._write_disposition = write_disposition self._triggering_frequency = triggering_frequency self._use_at_least_once = use_at_least_once self._with_auto_sharding = with_auto_sharding self._num_storage_api_streams = num_storage_api_streams - self._expansion_service = ( - expansion_service or _default_io_expansion_service()) - self.schematransform_config = SchemaAwareExternalTransform.discover_config( - self._expansion_service, self.URN) + self._expansion_service = expansion_service or BeamJarExpansionService( + 'sdks:java:io:google-cloud-platform:expansion-service:build') def expand(self, input): - external_storage_write = SchemaAwareExternalTransform( - identifier=self.schematransform_config.identifier, - expansion_service=self._expansion_service, - rearrange_based_on_discovery=True, - autoSharding=self._with_auto_sharding, - numStreams=self._num_storage_api_streams, - createDisposition=self._create_disposition, - table=self._table, - triggeringFrequencySeconds=self._triggering_frequency, - useAtLeastOnceSemantics=self._use_at_least_once, - writeDisposition=self._write_disposition, - errorHandling={ - 'output': StorageWriteToBigQuery.FAILED_ROWS_WITH_ERRORS - }) - - input_tag = self.schematransform_config.inputs[0] - - result = {input_tag: input} | external_storage_write - result[StorageWriteToBigQuery.FAILED_ROWS] = result[ - StorageWriteToBigQuery.FAILED_ROWS_WITH_ERRORS] | beam.Map( - lambda row_and_error: row_and_error[0]) - return result + if self._schema is None: + try: + schema = schema_from_element_type(input.element_type) + is_rows = True + except TypeError as exn: + raise ValueError( + "A schema is required in order to prepare rows" + "for writing with STORAGE_WRITE_API.") from exn + elif callable(self._schema): + raise NotImplementedError( + "Writing with dynamic schemas is not" + "supported for this write method.") + elif isinstance(self._schema, vp.ValueProvider): + schema = self._schema.get() + is_rows = False + else: + schema = self._schema + is_rows = False + + table = bigquery_tools.get_hashable_destination(self._table) + + # if writing to one destination, just convert to Beam rows and send over + if not callable(table): + if is_rows: + input_beam_rows = input + else: + input_beam_rows = ( + input + | "Convert dict to Beam Row" >> self.ConvertToBeamRows( + schema, False).with_output_types()) + + # For dynamic destinations, we first figure out where each row is going. + # Then we send (destination, record) rows over to Java SchemaTransform. 
+ # We need to do this here because there are obstacles to passing the + # destinations function to Java + else: + # call function and append destination to each row + input_rows = ( + input + | "Append dynamic destinations" >> beam.ParDo( + bigquery_tools.AppendDestinationsFn(table), + *self._table_side_inputs)) + # if input type is Beam Row, just wrap everything in another Row + if is_rows: + input_beam_rows = ( + input_rows + | "Wrap in Beam Row" >> beam.Map( + lambda row: beam.Row( + **{ + StorageWriteToBigQuery.DESTINATION: row[0], + StorageWriteToBigQuery.RECORD: row[1] + })).with_output_types( + RowTypeConstraint.from_fields([ + (StorageWriteToBigQuery.DESTINATION, str), + (StorageWriteToBigQuery.RECORD, input.element_type) + ]))) + # otherwise, convert to Beam Rows + else: + input_beam_rows = ( + input_rows + | "Convert dict to Beam Row" >> self.ConvertToBeamRows( + schema, True).with_output_types()) + # communicate to Java that this write should use dynamic destinations + table = StorageWriteToBigQuery.DYNAMIC_DESTINATIONS + + output = ( + input_beam_rows + | SchemaAwareExternalTransform( + identifier=StorageWriteToBigQuery.IDENTIFIER, + expansion_service=self._expansion_service, + rearrange_based_on_discovery=True, + table=table, + createDisposition=self._create_disposition, + writeDisposition=self._write_disposition, + triggeringFrequencySeconds=self._triggering_frequency, + autoSharding=self._with_auto_sharding, + numStreams=self._num_storage_api_streams, + useAtLeastOnceSemantics=self._use_at_least_once, + errorHandling={ + 'output': StorageWriteToBigQuery.FAILED_ROWS_WITH_ERRORS + })) + + failed_rows_with_errors = output[ + StorageWriteToBigQuery.FAILED_ROWS_WITH_ERRORS] + failed_rows = failed_rows_with_errors | beam.Map( + lambda row_and_error: row_and_error[0]) + if not is_rows: + # return back from Beam Rows to Python dict elements + failed_rows = failed_rows | beam.Map(lambda row: row.as_dict()) + failed_rows_with_errors = failed_rows_with_errors | beam.Map( + lambda row: { + "error_message": row.error_message, + "failed_row": row.failed_row.as_dict() + }) + + return WriteResult( + method=WriteToBigQuery.Method.STORAGE_WRITE_API, + failed_rows=failed_rows, + failed_rows_with_errors=failed_rows_with_errors) + + class ConvertToBeamRows(PTransform): + def __init__(self, schema, dynamic_destinations): + self.schema = schema + self.dynamic_destinations = dynamic_destinations + + def expand(self, input_dicts): + if self.dynamic_destinations: + return ( + input_dicts + | "Convert dict to Beam Row" >> beam.Map( + lambda row: beam.Row( + **{ + StorageWriteToBigQuery.DESTINATION: row[0], + StorageWriteToBigQuery.RECORD: bigquery_tools. 
+ beam_row_from_dict(row[1], self.schema) + }))) + else: + return ( + input_dicts + | "Convert dict to Beam Row" >> beam.Map( + lambda row: bigquery_tools.beam_row_from_dict(row, self.schema)) + ) + + def with_output_types(self): + row_type_hints = bigquery_tools.get_beam_typehints_from_tableschema( + self.schema) + if self.dynamic_destinations: + type_hint = RowTypeConstraint.from_fields([ + (StorageWriteToBigQuery.DESTINATION, str), + ( + StorageWriteToBigQuery.RECORD, + RowTypeConstraint.from_fields(row_type_hints)) + ]) + else: + type_hint = RowTypeConstraint.from_fields(row_type_hints) + + return super().with_output_types(type_hint) class ReadFromBigQuery(PTransform): From a87f004204948fd98c0c8ccd6fedb1633c8630e1 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Sun, 21 Jan 2024 15:48:04 -0500 Subject: [PATCH 5/8] add trigger files for xlang gcp postcommits --- .../trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json | 0 .../trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json | 0 2 files changed, 0 insertions(+), 0 deletions(-) create mode 100644 .github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json create mode 100644 .github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json new file mode 100644 index 000000000000..e69de29bb2d1 From c6f4b573e0456d5befbad4d41e831eb72787530b Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 22 Jan 2024 08:37:37 -0500 Subject: [PATCH 6/8] touch gcp xlang direct postcommit --- .../trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json index e69de29bb2d1..8b137891791f 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json +++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json @@ -0,0 +1 @@ + From a6bb7139967c807a13ac5d6487d4d5c6514337e4 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 22 Jan 2024 21:10:21 -0500 Subject: [PATCH 7/8] add comment --- sdks/python/apache_beam/io/gcp/bigquery.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index 01a0c2a4b492..4643c8ddf0a5 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -2451,6 +2451,7 @@ def __getitem__(self, key): class StorageWriteToBigQuery(PTransform): """Writes data to BigQuery using Storage API. + Supports dynamic destinations. Dynamic schemas are not supported yet. Experimental; no backwards compatibility guarantees. """ From ef949d7f12507faa037308b13fc41fd2b85a82be Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 24 Jan 2024 14:04:43 -0500 Subject: [PATCH 8/8] add to changes.md --- CHANGES.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES.md b/CHANGES.md index a741b87d429b..b3e00f178f25 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -63,6 +63,7 @@ ## I/Os * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). 
+* Added support for writing to BigQuery dynamic destinations with Python's Storage Write API ([#30045](https://github.com/apache/beam/pull/30045)).
 * Adding support for Tuples DataType in ClickHouse (Java) ([#29715](https://github.com/apache/beam/pull/29715)).
 * Added support for handling bad records to FileIO, TextIO, AvroIO ([#29670](https://github.com/apache/beam/pull/29670)).
 * Added support for handling bad records to BigtableIO ([#29885](https://github.com/apache/beam/pull/29885)).
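
Note: for context, a minimal usage sketch of the feature this series adds, modeled on the test_write_to_dynamic_destinations integration test in xlang_bigqueryio_it_test.py. The project, dataset, element values, and schema string below are placeholders, and the cross-language expansion service is left at its default; this is an illustration of the user-facing API, not part of the patch.

import apache_beam as beam

# Placeholder table prefix; each element is routed to a table derived from
# its own fields (here, the value of its 'int' field).
BASE_TABLE_SPEC = 'my-project:my_dataset.dynamic_dest_'

elements = [
    {'int': 1, 'str': 'a'},
    {'int': 2, 'str': 'b'},
]

with beam.Pipeline() as p:
  _ = (
      p
      | beam.Create(elements)
      | beam.io.WriteToBigQuery(
          # A callable table spec routes each record to its own destination.
          # With STORAGE_WRITE_API, the Python SDK now appends the computed
          # destination to every row and passes the magic DYNAMIC_DESTINATIONS
          # table string to the Java BigQuery SchemaTransform.
          table=lambda record: BASE_TABLE_SPEC + str(record['int']),
          schema='int:INTEGER,str:STRING',
          method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API))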