diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java
index c237d03e052..944f33a596d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java
@@ -42,13 +42,13 @@
 @Experimental(Kind.SCHEMAS)
 public abstract class TypedSchemaTransformProvider<ConfigT> implements SchemaTransformProvider {
 
-  abstract Class<ConfigT> configurationClass();
+  protected abstract Class<ConfigT> configurationClass();
 
   /**
    * Produce a SchemaTransform from ConfigT. Can throw an {@link InvalidConfigurationException} or
    * an {@link InvalidSchemaException}.
    */
-  abstract SchemaTransform from(ConfigT configuration);
+  protected abstract SchemaTransform from(ConfigT configuration);
 
   /**
    * List the dependencies needed for this transform. Jars from classpath are used by default when
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java
index e4018d3b6c1..744b4f3bf0b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java
@@ -63,7 +63,7 @@ public String identifier() {
     }
 
     @Override
-    Class<Configuration> configurationClass() {
+    protected Class<Configuration> configurationClass() {
       return Configuration.class;
     }
 
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadConfiguration.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadConfiguration.java
new file mode 100644
index 00000000000..f964a87be16
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadConfiguration.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * Configuration for reading from BigQuery.
+ *
+ * <p>This class is meant to be used with {@link BigQuerySchemaTransformReadProvider}.
+ *
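+ * <p>For example, a configuration for a table read might be built as follows (the table
+ * reference shown is illustrative only):
+ *
+ * <pre>{@code
+ * BigQuerySchemaTransformReadConfiguration configuration =
+ *     BigQuerySchemaTransformReadConfiguration.builder()
+ *         .setTableSpec("project:dataset.table")
+ *         .build();
+ * }</pre>
+ *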
+ * <p>Internal only: This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class BigQuerySchemaTransformReadConfiguration {
+
+  /** Instantiates a {@link BigQuerySchemaTransformReadConfiguration.Builder}. */
+  public static Builder builder() {
+    return new AutoValue_BigQuerySchemaTransformReadConfiguration.Builder();
+  }
+
+  private static final AutoValueSchema AUTO_VALUE_SCHEMA = new AutoValueSchema();
+  private static final TypeDescriptor<BigQuerySchemaTransformReadConfiguration> TYPE_DESCRIPTOR =
+      TypeDescriptor.of(BigQuerySchemaTransformReadConfiguration.class);
+  private static final SerializableFunction<BigQuerySchemaTransformReadConfiguration, Row>
+      ROW_SERIALIZABLE_FUNCTION = AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR);
+
+  /** Serializes configuration to a {@link Row}. */
+  Row toBeamRow() {
+    return ROW_SERIALIZABLE_FUNCTION.apply(this);
+  }
+
+  /** Configures the BigQuery read job with the SQL query. */
+  @Nullable
+  public abstract String getQuery();
+
+  /**
+   * Specifies a table for a BigQuery read job. See {@link BigQueryIO.TypedRead#from(String)} for
+   * more details on the expected format.
+   */
+  @Nullable
+  public abstract String getTableSpec();
+
+  /** BigQuery geographic location where the query job will be executed. */
+  @Nullable
+  public abstract String getQueryLocation();
+
+  /** Enables BigQuery's Standard SQL dialect when reading from a query. */
+  @Nullable
+  public abstract Boolean getUseStandardSql();
+
+  @AutoValue.Builder
+  public abstract static class Builder {
+
+    /** Configures the BigQuery read job with the SQL query. */
+    public abstract Builder setQuery(String value);
+
+    /**
+     * Specifies a table for a BigQuery read job. See {@link BigQueryIO.TypedRead#from(String)} for
+     * more details on the expected format.
+     */
+    public abstract Builder setTableSpec(String value);
+
+    /** BigQuery geographic location where the query job will be executed. */
+    public abstract Builder setQueryLocation(String value);
+
+    /** Enables BigQuery's Standard SQL dialect when reading from a query. */
+    public abstract Builder setUseStandardSql(Boolean value);
+
+    /** Builds the {@link BigQuerySchemaTransformReadConfiguration} configuration. */
+    public abstract BigQuerySchemaTransformReadConfiguration build();
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java
new file mode 100644
index 00000000000..3ffc4dfa1c0
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery read jobs configured using
+ * {@link BigQuerySchemaTransformReadConfiguration}.
+ *
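+ * <p>A sketch of how the provider is expected to be wired up, mirroring its tests (the table
+ * reference shown is illustrative only):
+ *
+ * <pre>{@code
+ * SchemaTransformProvider provider = new BigQuerySchemaTransformReadProvider();
+ * Row configuration =
+ *     BigQuerySchemaTransformReadConfiguration.builder()
+ *         .setTableSpec("project:dataset.table")
+ *         .build()
+ *         .toBeamRow();
+ * SchemaTransform transform = provider.from(configuration);
+ * }</pre>
+ *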
+ * <p>
Internal only: This class is actively being worked on, and it will likely change. We + * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam + * repository. + */ +@SuppressWarnings({ + "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402) +}) +@Internal +@Experimental(Kind.SCHEMAS) +public class BigQuerySchemaTransformReadProvider + extends TypedSchemaTransformProvider { + + private static final String API = "bigquery"; + private static final String OUTPUT_TAG = "OUTPUT"; + + /** Returns the expected class of the configuration. */ + @Override + protected Class configurationClass() { + return BigQuerySchemaTransformReadConfiguration.class; + } + + /** Returns the expected {@link SchemaTransform} of the configuration. */ + @Override + protected SchemaTransform from(BigQuerySchemaTransformReadConfiguration configuration) { + return new BigQueryReadSchemaTransform(configuration); + } + + /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */ + @Override + public String identifier() { + return String.format("%s:read", API); + } + + /** + * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since + * no input is expected, this returns an empty list. + */ + @Override + public List inputCollectionNames() { + return Collections.emptyList(); + } + + /** + * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since + * a single output is expected, this returns a list with a single name. + */ + @Override + public List outputCollectionNames() { + return Collections.singletonList(OUTPUT_TAG); + } + + /** + * An implementation of {@link SchemaTransform} for BigQuery read jobs configured using {@link + * BigQuerySchemaTransformReadConfiguration}. + */ + private static class BigQueryReadSchemaTransform implements SchemaTransform { + private final BigQuerySchemaTransformReadConfiguration configuration; + + BigQueryReadSchemaTransform(BigQuerySchemaTransformReadConfiguration configuration) { + this.configuration = configuration; + } + + /** Implements {@link SchemaTransform} buildTransform method. */ + @Override + public PTransform buildTransform() { + return new PCollectionRowTupleTransform(configuration); + } + } + + /** + * An implementation of {@link PTransform} for BigQuery read jobs configured using {@link + * BigQuerySchemaTransformReadConfiguration}. + */ + static class PCollectionRowTupleTransform + extends PTransform { + + private final BigQuerySchemaTransformReadConfiguration configuration; + + /** An instance of {@link BigQueryServices} used for testing. 
*/ + private BigQueryServices testBigQueryServices = null; + + PCollectionRowTupleTransform(BigQuerySchemaTransformReadConfiguration configuration) { + this.configuration = configuration; + } + + @VisibleForTesting + void setTestBigQueryServices(BigQueryServices testBigQueryServices) { + this.testBigQueryServices = testBigQueryServices; + } + + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + if (!input.getAll().isEmpty()) { + throw new IllegalArgumentException( + String.format( + "%s %s input is expected to be empty", + input.getClass().getSimpleName(), getClass().getSimpleName())); + } + + BigQueryIO.TypedRead read = toTypedRead(); + if (testBigQueryServices != null) { + read = read.withTestServices(testBigQueryServices).withoutValidation(); + } + + PCollection tableRowPCollection = input.getPipeline().apply(read); + Schema schema = tableRowPCollection.getSchema(); + PCollection rowPCollection = + tableRowPCollection.apply( + MapElements.into(TypeDescriptor.of(Row.class)) + .via((tableRow) -> BigQueryUtils.toBeamRow(schema, tableRow))); + return PCollectionRowTuple.of(OUTPUT_TAG, rowPCollection.setRowSchema(schema)); + } + + BigQueryIO.TypedRead toTypedRead() { + BigQueryIO.TypedRead read = BigQueryIO.readTableRowsWithSchema(); + + if (!Strings.isNullOrEmpty(configuration.getQuery())) { + read = read.fromQuery(configuration.getQuery()); + } + + if (!Strings.isNullOrEmpty(configuration.getTableSpec())) { + read = read.from(configuration.getTableSpec()); + } + + if (configuration.getUseStandardSql() != null && configuration.getUseStandardSql()) { + read = read.usingStandardSql(); + } + + if (!Strings.isNullOrEmpty(configuration.getQueryLocation())) { + read = read.withQueryLocation(configuration.getQueryLocation()); + } + + return read; + } + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformWriteConfiguration.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformWriteConfiguration.java new file mode 100644 index 00000000000..5cbea3c49f0 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformWriteConfiguration.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.bigquery; + +import com.google.auto.value.AutoValue; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptor; + +/** + * Configuration for writing to BigQuery. + * + *
+ * <p>This class is meant to be used with {@link BigQuerySchemaTransformWriteProvider}.
+ *
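+ * <p>For example, a configuration for an append-style write might be built as follows (the
+ * table reference shown is illustrative only):
+ *
+ * <pre>{@code
+ * BigQuerySchemaTransformWriteConfiguration configuration =
+ *     BigQuerySchemaTransformWriteConfiguration.builder()
+ *         .setTableSpec("project:dataset.table")
+ *         .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name())
+ *         .setWriteDisposition(WriteDisposition.WRITE_APPEND.name())
+ *         .build();
+ * }</pre>
+ *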
+ * <p>
Internal only: This class is actively being worked on, and it will likely change. We + * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam + * repository. + */ +@DefaultSchema(AutoValueSchema.class) +@AutoValue +public abstract class BigQuerySchemaTransformWriteConfiguration { + + /** Instantiates a {@link BigQuerySchemaTransformWriteConfiguration.Builder}. */ + public static Builder builder() { + return new AutoValue_BigQuerySchemaTransformWriteConfiguration.Builder(); + } + + private static final AutoValueSchema AUTO_VALUE_SCHEMA = new AutoValueSchema(); + private static final TypeDescriptor TYPE_DESCRIPTOR = + TypeDescriptor.of(BigQuerySchemaTransformWriteConfiguration.class); + private static final SerializableFunction + ROW_SERIALIZABLE_FUNCTION = AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR); + + /** + * Writes to the given table specification. See {@link BigQueryIO.Write#to(String)}} for the + * expected format. + */ + public abstract String getTableSpec(); + + /** Specifies whether the table should be created if it does not exist. */ + public abstract String getCreateDisposition(); + + /** Specifies what to do with existing data in the table, in case the table already exists. */ + public abstract String getWriteDisposition(); + + /** Serializes configuration to a {@link Row}. */ + Row toBeamRow() { + return ROW_SERIALIZABLE_FUNCTION.apply(this); + } + + @AutoValue.Builder + public abstract static class Builder { + + /** + * Writes to the given table specification. See {@link BigQueryIO.Write#to(String)}} for the + * expected format. + */ + public abstract Builder setTableSpec(String value); + + /** Specifies whether the table should be created if it does not exist. */ + public abstract Builder setCreateDisposition(String value); + + /** Specifies what to do with existing data in the table, in case the table already exists. */ + public abstract Builder setWriteDisposition(String value); + + /** Builds the {@link BigQuerySchemaTransformWriteConfiguration} configuration. */ + public abstract BigQuerySchemaTransformWriteConfiguration build(); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformWriteProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformWriteProvider.java new file mode 100644 index 00000000000..bd8d6adbd1a --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformWriteProvider.java @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.bigquery; + +import com.google.api.services.bigquery.model.Table; +import com.google.api.services.bigquery.model.TableReference; +import com.google.api.services.bigquery.model.TableRow; +import com.google.api.services.bigquery.model.TableSchema; +import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.io.InvalidConfigurationException; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptor; + +/** + * An implementation of {@link TypedSchemaTransformProvider} for BigQuery write jobs configured + * using {@link BigQuerySchemaTransformWriteConfiguration}. + * + *
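+ * <p>A sketch of the expected wiring, mirroring the provider's tests (the dispositions and
+ * table reference shown are illustrative only):
+ *
+ * <pre>{@code
+ * SchemaTransformProvider provider = new BigQuerySchemaTransformWriteProvider();
+ * Row configuration =
+ *     BigQuerySchemaTransformWriteConfiguration.builder()
+ *         .setTableSpec("project:dataset.table")
+ *         .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name())
+ *         .setWriteDisposition(WriteDisposition.WRITE_APPEND.name())
+ *         .build()
+ *         .toBeamRow();
+ * SchemaTransform transform = provider.from(configuration);
+ * }</pre>
+ *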
+ * <p>
Internal only: This class is actively being worked on, and it will likely change. We + * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam + * repository. + */ +@SuppressWarnings({ + "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402) +}) +@Internal +@Experimental(Kind.SCHEMAS) +public class BigQuerySchemaTransformWriteProvider + extends TypedSchemaTransformProvider { + + private static final String API = "bigquery"; + static final String INPUT_TAG = "INPUT"; + + /** Returns the expected class of the configuration. */ + @Override + protected Class configurationClass() { + return BigQuerySchemaTransformWriteConfiguration.class; + } + + /** Returns the expected {@link SchemaTransform} of the configuration. */ + @Override + protected SchemaTransform from(BigQuerySchemaTransformWriteConfiguration configuration) { + return new BigQueryWriteSchemaTransform(configuration); + } + + /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */ + @Override + public String identifier() { + return String.format("%s:write", API); + } + + /** + * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since a + * single is expected, this returns a list with a single name. + */ + @Override + public List inputCollectionNames() { + return Collections.singletonList(INPUT_TAG); + } + + /** + * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since + * no output is expected, this returns an empty list. + */ + @Override + public List outputCollectionNames() { + return Collections.emptyList(); + } + + /** + * A {@link SchemaTransform} that performs {@link BigQueryIO.Write}s based on a {@link + * BigQuerySchemaTransformWriteConfiguration}. + */ + private static class BigQueryWriteSchemaTransform implements SchemaTransform { + private final BigQuerySchemaTransformWriteConfiguration configuration; + + BigQueryWriteSchemaTransform(BigQuerySchemaTransformWriteConfiguration configuration) { + this.configuration = configuration; + } + + /** + * Overrides {@link SchemaTransform#buildTransform()} by returning a {@link + * PCollectionRowTupleTransform}. + */ + @Override + public PTransform buildTransform() { + return new PCollectionRowTupleTransform(configuration); + } + } + + /** + * An implementation of {@link PTransform} for BigQuery write jobs configured using {@link + * BigQuerySchemaTransformWriteConfiguration}. + */ + static class PCollectionRowTupleTransform + extends PTransform { + + private final BigQuerySchemaTransformWriteConfiguration configuration; + + /** An instance of {@link BigQueryServices} used for testing. 
*/ + private BigQueryServices testBigQueryServices = null; + + PCollectionRowTupleTransform(BigQuerySchemaTransformWriteConfiguration configuration) { + this.configuration = configuration; + } + + @Override + public void validate(PipelineOptions options) { + if (!configuration.getCreateDisposition().equals(CreateDisposition.CREATE_NEVER.name())) { + return; + } + + BigQueryOptions bigQueryOptions = options.as(BigQueryOptions.class); + + BigQueryServices bigQueryServices = new BigQueryServicesImpl(); + if (testBigQueryServices != null) { + bigQueryServices = testBigQueryServices; + } + + DatasetService datasetService = bigQueryServices.getDatasetService(bigQueryOptions); + TableReference tableReference = BigQueryUtils.toTableReference(configuration.getTableSpec()); + + try { + Table table = datasetService.getTable(tableReference); + if (table == null) { + throw new NullPointerException(); + } + + if (table.getSchema() == null) { + throw new InvalidConfigurationException( + String.format("could not fetch schema for table: %s", configuration.getTableSpec())); + } + + } catch (NullPointerException | InterruptedException | IOException ex) { + throw new InvalidConfigurationException( + String.format( + "could not fetch table %s, error: %s", + configuration.getTableSpec(), ex.getMessage())); + } + } + + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + validate(input); + PCollection rowPCollection = input.get(INPUT_TAG); + Schema schema = rowPCollection.getSchema(); + BigQueryIO.Write write = toWrite(schema); + if (testBigQueryServices != null) { + write = write.withTestServices(testBigQueryServices); + } + + PCollection tableRowPCollection = + rowPCollection.apply( + MapElements.into(TypeDescriptor.of(TableRow.class)).via(BigQueryUtils::toTableRow)); + tableRowPCollection.apply(write); + return PCollectionRowTuple.empty(input.getPipeline()); + } + + /** Instantiates a {@link BigQueryIO.Write} from a {@link Schema}. */ + BigQueryIO.Write toWrite(Schema schema) { + TableSchema tableSchema = BigQueryUtils.toTableSchema(schema); + CreateDisposition createDisposition = + CreateDisposition.valueOf(configuration.getCreateDisposition()); + WriteDisposition writeDisposition = + WriteDisposition.valueOf(configuration.getWriteDisposition()); + + return BigQueryIO.writeTableRows() + .to(configuration.getTableSpec()) + .withCreateDisposition(createDisposition) + .withWriteDisposition(writeDisposition) + .withSchema(tableSchema); + } + + /** Setter for testing using {@link BigQueryServices}. */ + @VisibleForTesting + void setTestBigQueryServices(BigQueryServices testBigQueryServices) { + this.testBigQueryServices = testBigQueryServices; + } + + /** Validate a {@link PCollectionRowTuple} input. 
*/ + void validate(PCollectionRowTuple input) { + if (!input.has(INPUT_TAG)) { + throw new IllegalArgumentException( + String.format( + "%s %s is missing expected tag: %s", + getClass().getSimpleName(), input.getClass().getSimpleName(), INPUT_TAG)); + } + + PCollection rowInput = input.get(INPUT_TAG); + Schema sourceSchema = rowInput.getSchema(); + + if (sourceSchema == null) { + throw new IllegalArgumentException( + String.format("%s is null for input of tag: %s", Schema.class, INPUT_TAG)); + } + + if (!configuration.getCreateDisposition().equals(CreateDisposition.CREATE_NEVER.name())) { + return; + } + + BigQueryOptions bigQueryOptions = input.getPipeline().getOptions().as(BigQueryOptions.class); + + BigQueryServices bigQueryServices = new BigQueryServicesImpl(); + if (testBigQueryServices != null) { + bigQueryServices = testBigQueryServices; + } + + DatasetService datasetService = bigQueryServices.getDatasetService(bigQueryOptions); + TableReference tableReference = BigQueryUtils.toTableReference(configuration.getTableSpec()); + + try { + Table table = datasetService.getTable(tableReference); + if (table == null) { + throw new NullPointerException(); + } + + TableSchema tableSchema = table.getSchema(); + if (tableSchema == null) { + throw new NullPointerException(); + } + + Schema destinationSchema = BigQueryUtils.fromTableSchema(tableSchema); + if (destinationSchema == null) { + throw new NullPointerException(); + } + + validateMatching(sourceSchema, destinationSchema); + + } catch (NullPointerException | InterruptedException | IOException e) { + throw new InvalidConfigurationException( + String.format( + "could not validate input for create disposition: %s and table: %s, error: %s", + configuration.getCreateDisposition(), + configuration.getTableSpec(), + e.getMessage())); + } + } + + void validateMatching(Schema sourceSchema, Schema destinationSchema) { + if (!sourceSchema.equals(destinationSchema)) { + throw new IllegalArgumentException( + String.format( + "source and destination schema mismatch for table: %s", + configuration.getTableSpec())); + } + } + } +} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProviderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProviderTest.java new file mode 100644 index 00000000000..5bca0cb2ad3 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProviderTest.java @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.bigquery; + +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +import com.google.api.services.bigquery.model.Table; +import com.google.api.services.bigquery.model.TableReference; +import com.google.api.services.bigquery.model.TableRow; +import com.google.api.services.bigquery.model.TableSchema; +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead; +import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaTransformReadProvider.PCollectionRowTupleTransform; +import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices; +import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService; +import org.apache.beam.sdk.io.gcp.testing.FakeJobService; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.Field; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.DisplayData.Identifier; +import org.apache.beam.sdk.transforms.display.DisplayData.Item; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.commons.lang3.tuple.Pair; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Test for {@link BigQuerySchemaTransformReadProvider}. 
*/ +@RunWith(JUnit4.class) +public class BigQuerySchemaTransformReadProviderTest { + private static final String PROJECT = "fakeproject"; + private static final String DATASET = "fakedataset"; + private static final String TABLE_ID = "faketable"; + + private static final String QUERY = "select * from `fakeproject.fakedataset.faketable`"; + private static final String LOCATION = "kingdom-of-figaro"; + + private static final TableReference TABLE_REFERENCE = + new TableReference().setProjectId(PROJECT).setDatasetId(DATASET).setTableId(TABLE_ID); + + private static final String TABLE_SPEC = BigQueryHelpers.toTableSpec(TABLE_REFERENCE); + + private static final Schema SCHEMA = + Schema.of(Field.of("name", FieldType.STRING), Field.of("number", FieldType.INT64)); + + private static final List RECORDS = + Arrays.asList( + new TableRow().set("name", "a").set("number", 1L), + new TableRow().set("name", "b").set("number", 2L), + new TableRow().set("name", "c").set("number", 3L)); + + private static final List ROWS = + Arrays.asList( + Row.withSchema(SCHEMA).withFieldValue("name", "a").withFieldValue("number", 1L).build(), + Row.withSchema(SCHEMA).withFieldValue("name", "b").withFieldValue("number", 2L).build(), + Row.withSchema(SCHEMA).withFieldValue("name", "c").withFieldValue("number", 3L).build()); + + private static final TableSchema TABLE_SCHEMA = BigQueryUtils.toTableSchema(SCHEMA); + private static final BigQueryOptions OPTIONS = + TestPipeline.testingPipelineOptions().as(BigQueryOptions.class); + private final FakeDatasetService fakeDatasetService = new FakeDatasetService(); + private final FakeJobService fakeJobService = new FakeJobService(); + private final Table fakeTable = new Table(); + private final TemporaryFolder temporaryFolder = new TemporaryFolder(); + private final FakeBigQueryServices fakeBigQueryServices = + new FakeBigQueryServices() + .withJobService(fakeJobService) + .withDatasetService(fakeDatasetService); + + @Before + public void setUp() throws IOException, InterruptedException, ExecutionException { + FakeDatasetService.setUp(); + FakeJobService.setUp(); + BigQueryIO.clearStaticCaches(); + fakeTable.setSchema(TABLE_SCHEMA); + fakeTable.setTableReference(TABLE_REFERENCE); + fakeDatasetService.createDataset(PROJECT, DATASET, LOCATION, "", null); + fakeDatasetService.createTable(fakeTable); + fakeDatasetService.insertAll(fakeTable.getTableReference(), RECORDS, null); + temporaryFolder.create(); + OPTIONS.setProject(PROJECT); + OPTIONS.setTempLocation(temporaryFolder.getRoot().getAbsolutePath()); + } + + @After + public void tearDown() { + temporaryFolder.delete(); + } + + @Rule + public transient TestPipeline p = + TestPipeline.fromOptions(OPTIONS).enableAbandonedNodeEnforcement(false); + + @Test + public void testQuery() { + // Previous attempts using FakeBigQueryServices with a Read configuration using a query failed. + // For now, we test using DisplayData and the toTypedRead method. 
+ List>> cases = + Arrays.asList( + Pair.of( + BigQuerySchemaTransformReadConfiguration.builder().setQuery(QUERY), + BigQueryIO.readTableRowsWithSchema().fromQuery(QUERY)), + Pair.of( + BigQuerySchemaTransformReadConfiguration.builder() + .setQuery(QUERY) + .setQueryLocation(LOCATION), + BigQueryIO.readTableRowsWithSchema().fromQuery(QUERY).withQueryLocation(LOCATION)), + Pair.of( + BigQuerySchemaTransformReadConfiguration.builder() + .setQuery(QUERY) + .setUseStandardSql(true), + BigQueryIO.readTableRowsWithSchema().fromQuery(QUERY).usingStandardSql()), + Pair.of( + BigQuerySchemaTransformReadConfiguration.builder() + .setQuery(QUERY) + .setUseStandardSql(false), + BigQueryIO.readTableRowsWithSchema().fromQuery(QUERY))); + + for (Pair> caze : cases) { + Map want = DisplayData.from(caze.getRight()).asMap(); + SchemaTransformProvider provider = new BigQuerySchemaTransformReadProvider(); + BigQuerySchemaTransformReadConfiguration configuration = caze.getLeft().build(); + Row configurationRow = configuration.toBeamRow(); + SchemaTransform schemaTransform = provider.from(configurationRow); + PCollectionRowTupleTransform pCollectionRowTupleTransform = + (PCollectionRowTupleTransform) schemaTransform.buildTransform(); + Map got = + DisplayData.from(pCollectionRowTupleTransform.toTypedRead()).asMap(); + assertEquals(want, got); + } + } + + @Test + public void testExtract() { + SchemaTransformProvider provider = new BigQuerySchemaTransformReadProvider(); + BigQuerySchemaTransformReadConfiguration configuration = + BigQuerySchemaTransformReadConfiguration.builder().setTableSpec(TABLE_SPEC).build(); + Row configurationRow = configuration.toBeamRow(); + SchemaTransform schemaTransform = provider.from(configurationRow); + PCollectionRowTupleTransform pCollectionRowTupleTransform = + (PCollectionRowTupleTransform) schemaTransform.buildTransform(); + + pCollectionRowTupleTransform.setTestBigQueryServices(fakeBigQueryServices); + PCollectionRowTuple input = PCollectionRowTuple.empty(p); + String tag = provider.outputCollectionNames().get(0); + PCollectionRowTuple output = input.apply(pCollectionRowTupleTransform); + assertTrue(output.has(tag)); + PCollection got = output.get(tag); + PAssert.that(got).containsInAnyOrder(ROWS); + + p.run(); + } + + @Test + public void testInvalidConfiguration() { + SchemaTransformProvider provider = new BigQuerySchemaTransformReadProvider(); + for (Pair< + BigQuerySchemaTransformReadConfiguration.Builder, + ? 
extends Class> + caze : + Arrays.asList( + Pair.of( + BigQuerySchemaTransformReadConfiguration.builder(), + IllegalArgumentException.class), + Pair.of( + BigQuerySchemaTransformReadConfiguration.builder() + .setQuery(QUERY) + .setTableSpec(TABLE_SPEC), + IllegalStateException.class), + Pair.of( + BigQuerySchemaTransformReadConfiguration.builder().setQueryLocation(LOCATION), + IllegalArgumentException.class), + Pair.of( + BigQuerySchemaTransformReadConfiguration.builder().setUseStandardSql(true), + IllegalArgumentException.class))) { + Row configurationRow = caze.getLeft().build().toBeamRow(); + SchemaTransform schemaTransform = provider.from(configurationRow); + PCollectionRowTupleTransform pCollectionRowTupleTransform = + (PCollectionRowTupleTransform) schemaTransform.buildTransform(); + pCollectionRowTupleTransform.setTestBigQueryServices(fakeBigQueryServices); + PCollectionRowTuple empty = PCollectionRowTuple.empty(p); + assertThrows(caze.getRight(), () -> empty.apply(pCollectionRowTupleTransform)); + } + } + + @Test + public void testInvalidInput() { + SchemaTransformProvider provider = new BigQuerySchemaTransformReadProvider(); + BigQuerySchemaTransformReadConfiguration configuration = + BigQuerySchemaTransformReadConfiguration.builder().setTableSpec(TABLE_SPEC).build(); + Row configurationRow = configuration.toBeamRow(); + SchemaTransform schemaTransform = provider.from(configurationRow); + PCollectionRowTupleTransform pCollectionRowTupleTransform = + (PCollectionRowTupleTransform) schemaTransform.buildTransform(); + + pCollectionRowTupleTransform.setTestBigQueryServices(fakeBigQueryServices); + PCollectionRowTuple input = PCollectionRowTuple.of("badinput", p.apply(Create.of(ROWS))); + assertThrows(IllegalArgumentException.class, () -> input.apply(pCollectionRowTupleTransform)); + } + + private void assertEquals(Map want, Map got) { + Set keys = new HashSet<>(); + keys.addAll(want.keySet()); + keys.addAll(got.keySet()); + for (Identifier key : keys) { + Item wantItem = null; + Item gotItem = null; + if (want.containsKey(key)) { + wantItem = want.get(key); + } + if (got.containsKey(key)) { + gotItem = got.get(key); + } + Assert.assertEquals(wantItem, gotItem); + } + } +} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformWriteProviderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformWriteProviderTest.java new file mode 100644 index 00000000000..b752b182ebd --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformWriteProviderTest.java @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.bigquery; + +import static org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaTransformWriteProvider.INPUT_TAG; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; + +import com.google.api.services.bigquery.model.TableReference; +import com.google.api.services.bigquery.model.TableRow; +import com.google.api.services.bigquery.model.TableSchema; +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; +import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaTransformWriteProvider.PCollectionRowTupleTransform; +import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices; +import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService; +import org.apache.beam.sdk.io.gcp.testing.FakeJobService; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.Field; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.schemas.io.InvalidConfigurationException; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.DisplayData.Identifier; +import org.apache.beam.sdk.transforms.display.DisplayData.Item; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.commons.lang3.tuple.Pair; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +/** Test for {@link BigQuerySchemaTransformWriteProvider}. 
*/ +public class BigQuerySchemaTransformWriteProviderTest { + + private static final String PROJECT = "fakeproject"; + private static final String DATASET = "fakedataset"; + private static final String TABLE_ID = "faketable"; + + private static final TableReference TABLE_REFERENCE = + new TableReference().setProjectId(PROJECT).setDatasetId(DATASET).setTableId(TABLE_ID); + + private static final Schema SCHEMA = + Schema.of(Field.of("name", FieldType.STRING), Field.of("number", FieldType.INT64)); + + private static final TableSchema TABLE_SCHEMA = BigQueryUtils.toTableSchema(SCHEMA); + + private static final List ROWS = + Arrays.asList( + Row.withSchema(SCHEMA).withFieldValue("name", "a").withFieldValue("number", 1L).build(), + Row.withSchema(SCHEMA).withFieldValue("name", "b").withFieldValue("number", 2L).build(), + Row.withSchema(SCHEMA).withFieldValue("name", "c").withFieldValue("number", 3L).build()); + + private static final BigQueryOptions OPTIONS = + TestPipeline.testingPipelineOptions().as(BigQueryOptions.class); + private final FakeDatasetService fakeDatasetService = new FakeDatasetService(); + private final FakeJobService fakeJobService = new FakeJobService(); + private final TemporaryFolder temporaryFolder = new TemporaryFolder(); + private final FakeBigQueryServices fakeBigQueryServices = + new FakeBigQueryServices() + .withJobService(fakeJobService) + .withDatasetService(fakeDatasetService); + + @Before + public void setUp() throws IOException, InterruptedException { + FakeDatasetService.setUp(); + fakeDatasetService.createDataset(PROJECT, DATASET, "", "", null); + temporaryFolder.create(); + OPTIONS.setProject(PROJECT); + OPTIONS.setTempLocation(temporaryFolder.getRoot().getAbsolutePath()); + } + + @After + public void tearDown() { + temporaryFolder.delete(); + } + + @Rule public transient TestPipeline p = TestPipeline.fromOptions(OPTIONS); + + @Test + public void testLoad() throws IOException, InterruptedException { + SchemaTransformProvider provider = new BigQuerySchemaTransformWriteProvider(); + BigQuerySchemaTransformWriteConfiguration configuration = + BigQuerySchemaTransformWriteConfiguration.builder() + .setTableSpec(BigQueryHelpers.toTableSpec(TABLE_REFERENCE)) + .setWriteDisposition(WriteDisposition.WRITE_TRUNCATE.name()) + .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name()) + .build(); + Row configurationRow = configuration.toBeamRow(); + SchemaTransform schemaTransform = provider.from(configurationRow); + PCollectionRowTupleTransform pCollectionRowTupleTransform = + (PCollectionRowTupleTransform) schemaTransform.buildTransform(); + pCollectionRowTupleTransform.setTestBigQueryServices(fakeBigQueryServices); + String tag = provider.inputCollectionNames().get(0); + PCollectionRowTuple input = + PCollectionRowTuple.of(tag, p.apply(Create.of(ROWS).withRowSchema(SCHEMA))); + input.apply(pCollectionRowTupleTransform); + + p.run(); + + assertNotNull(fakeDatasetService.getTable(TABLE_REFERENCE)); + assertEquals(ROWS.size(), fakeDatasetService.getAllRows(PROJECT, DATASET, TABLE_ID).size()); + } + + @Test + public void testValidatePipelineOptions() { + List>> + cases = + Arrays.asList( + Pair.of( + BigQuerySchemaTransformWriteConfiguration.builder() + .setTableSpec("project.doesnot.exist") + .setCreateDisposition(CreateDisposition.CREATE_NEVER.name()) + .setWriteDisposition(WriteDisposition.WRITE_APPEND.name()), + InvalidConfigurationException.class), + Pair.of( + BigQuerySchemaTransformWriteConfiguration.builder() + .setTableSpec(String.format("%s.%s.%s", PROJECT, 
DATASET, "doesnotexist")) + .setCreateDisposition(CreateDisposition.CREATE_NEVER.name()) + .setWriteDisposition(WriteDisposition.WRITE_EMPTY.name()), + InvalidConfigurationException.class), + Pair.of( + BigQuerySchemaTransformWriteConfiguration.builder() + .setTableSpec("project.doesnot.exist") + .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name()) + .setWriteDisposition(WriteDisposition.WRITE_APPEND.name()), + null)); + for (Pair> caze : + cases) { + PCollectionRowTupleTransform transform = transformFrom(caze.getLeft().build()); + if (caze.getRight() != null) { + assertThrows(caze.getRight(), () -> transform.validate(p.getOptions())); + } else { + transform.validate(p.getOptions()); + } + } + } + + @Test + public void testToWrite() { + List>> + cases = + Arrays.asList( + Pair.of( + BigQuerySchemaTransformWriteConfiguration.builder() + .setTableSpec(BigQueryHelpers.toTableSpec(TABLE_REFERENCE)) + .setCreateDisposition(CreateDisposition.CREATE_NEVER.name()) + .setWriteDisposition(WriteDisposition.WRITE_EMPTY.name()), + BigQueryIO.writeTableRows() + .to(TABLE_REFERENCE) + .withCreateDisposition(CreateDisposition.CREATE_NEVER) + .withWriteDisposition(WriteDisposition.WRITE_EMPTY) + .withSchema(TABLE_SCHEMA)), + Pair.of( + BigQuerySchemaTransformWriteConfiguration.builder() + .setTableSpec(BigQueryHelpers.toTableSpec(TABLE_REFERENCE)) + .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name()) + .setWriteDisposition(WriteDisposition.WRITE_TRUNCATE.name()), + BigQueryIO.writeTableRows() + .to(TABLE_REFERENCE) + .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) + .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE) + .withSchema(TABLE_SCHEMA))); + for (Pair> caze : + cases) { + PCollectionRowTupleTransform transform = transformFrom(caze.getLeft().build()); + Map gotDisplayData = DisplayData.from(transform.toWrite(SCHEMA)).asMap(); + Map wantDisplayData = DisplayData.from(caze.getRight()).asMap(); + Set keys = new HashSet<>(); + keys.addAll(gotDisplayData.keySet()); + keys.addAll(wantDisplayData.keySet()); + for (Identifier key : keys) { + Item got = null; + Item want = null; + if (gotDisplayData.containsKey(key)) { + got = gotDisplayData.get(key); + } + if (wantDisplayData.containsKey(key)) { + want = wantDisplayData.get(key); + } + assertEquals(want, got); + } + } + } + + @Test + public void validatePCollectionRowTupleInput() { + PCollectionRowTuple empty = PCollectionRowTuple.empty(p); + PCollectionRowTuple valid = + PCollectionRowTuple.of( + INPUT_TAG, p.apply("CreateRowsWithValidSchema", Create.of(ROWS)).setRowSchema(SCHEMA)); + + PCollectionRowTuple invalid = + PCollectionRowTuple.of( + INPUT_TAG, + p.apply( + "CreateRowsWithInvalidSchema", + Create.of( + Row.nullRow( + Schema.builder().addNullableField("name", FieldType.STRING).build())))); + + PCollectionRowTupleTransform transform = + transformFrom( + BigQuerySchemaTransformWriteConfiguration.builder() + .setTableSpec(BigQueryHelpers.toTableSpec(TABLE_REFERENCE)) + .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name()) + .setWriteDisposition(WriteDisposition.WRITE_APPEND.name()) + .build()); + + assertThrows(IllegalArgumentException.class, () -> transform.validate(empty)); + + assertThrows(IllegalStateException.class, () -> transform.validate(invalid)); + + transform.validate(valid); + + p.run(); + } + + private PCollectionRowTupleTransform transformFrom( + BigQuerySchemaTransformWriteConfiguration configuration) { + SchemaTransformProvider provider = new BigQuerySchemaTransformWriteProvider(); + 
+    PCollectionRowTupleTransform transform =
+        (PCollectionRowTupleTransform) provider.from(configuration.toBeamRow()).buildTransform();
+
+    transform.setTestBigQueryServices(fakeBigQueryServices);
+
+    return transform;
+  }
+}