From c1e7734d8d8906a1d8b4677733486826d2838540 Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Thu, 7 Sep 2023 14:51:57 -0400 Subject: [PATCH 01/26] support creating dataset in specified region; do it for BigQueryToTableIT --- .../beam/sdk/io/gcp/bigquery/TestBigQueryOptions.java | 6 ++++++ .../apache/beam/sdk/io/gcp/testing/BigqueryClient.java | 10 +++++++++- .../beam/sdk/io/gcp/bigquery/BigQueryToTableIT.java | 8 +++++--- 3 files changed, 20 insertions(+), 4 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TestBigQueryOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TestBigQueryOptions.java index 3574c12ee3a9..7000306e62f1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TestBigQueryOptions.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TestBigQueryOptions.java @@ -30,4 +30,10 @@ public interface TestBigQueryOptions extends TestPipelineOptions, BigQueryOption String getTargetDataset(); void setTargetDataset(String value); + + @Description("Region to perform BigQuery operations in.") + @Default.String("") + String getBigQueryLocation(); + + void setBigQueryLocation(String location); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigqueryClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigqueryClient.java index b21fdd669596..281dbf0756d6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigqueryClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigqueryClient.java @@ -395,6 +395,13 @@ public void createNewDataset(String projectId, String datasetId) public void createNewDataset( String projectId, String datasetId, @Nullable Long defaultTableExpirationMs) throws IOException, InterruptedException { + createNewDataset(projectId, datasetId, defaultTableExpirationMs, null); + } + + /** Creates a new dataset with defaultTableExpirationMs and in a specified location. 
*/ + public void createNewDataset( + String projectId, String datasetId, @Nullable Long defaultTableExpirationMs, @Nullable String location) + throws IOException, InterruptedException { Sleeper sleeper = Sleeper.DEFAULT; BackOff backoff = BackOffAdapter.toGcpBackOff(BACKOFF_FACTORY.backoff()); IOException lastException = null; @@ -410,7 +417,8 @@ public void createNewDataset( projectId, new Dataset() .setDatasetReference(new DatasetReference().setDatasetId(datasetId)) - .setDefaultTableExpirationMs(defaultTableExpirationMs)) + .setDefaultTableExpirationMs(defaultTableExpirationMs) + .setLocation(location)) .execute(); if (response != null) { LOG.info("Successfully created new dataset : " + response.getId()); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryToTableIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryToTableIT.java index d6b7f8e16412..32e6476a291a 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryToTableIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryToTableIT.java @@ -43,6 +43,7 @@ import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.ExperimentalOptions; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.Validation; import org.apache.beam.sdk.testing.TestPipeline; @@ -214,7 +215,7 @@ private void verifyStandardQueryRes(String outputTable) throws Exception { } /** Customized PipelineOption for BigQueryToTable Pipeline. */ - public interface BigQueryToTableOptions extends TestPipelineOptions, ExperimentalOptions { + public interface BigQueryToTableOptions extends TestBigQueryOptions, ExperimentalOptions { @Description("The BigQuery query to be used for creating the source") @Validation.Required @@ -252,9 +253,10 @@ public interface BigQueryToTableOptions extends TestPipelineOptions, Experimenta @BeforeClass public static void setupTestEnvironment() throws Exception { PipelineOptionsFactory.register(BigQueryToTableOptions.class); - project = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject(); + BigQueryToTableOptions options = TestPipeline.testingPipelineOptions().as(BigQueryToTableOptions.class); + project = options.as(GcpOptions.class).getProject(); // Create one BQ dataset for all test cases. - BQ_CLIENT.createNewDataset(project, BIG_QUERY_DATASET_ID); + BQ_CLIENT.createNewDataset(project, BIG_QUERY_DATASET_ID, null, options.getBigQueryLocation()); // Create table and insert data for new type query test cases. 
BQ_CLIENT.createNewTable( From 75615ba77b47f10903108691a69a366636b8b610 Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Thu, 7 Sep 2023 15:01:34 -0400 Subject: [PATCH 02/26] BigQueryIOStorageWriteIT --- .../bigquery/BigQueryIOStorageWriteIT.java | 27 ++++++++++++++----- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageWriteIT.java index 81de67f38502..690888a4c5c4 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageWriteIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageWriteIT.java @@ -26,6 +26,7 @@ import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; import java.io.IOException; +import java.security.SecureRandom; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.io.GenerateSequence; @@ -36,6 +37,8 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.joda.time.Duration; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -53,24 +56,34 @@ private enum WriteMode { AT_LEAST_ONCE } - private String project; - private static final String DATASET_ID = "big_query_storage"; + private static String project; + private static final String DATASET_ID = "big_query_storage_write_it_" + System.currentTimeMillis() + "_" + new SecureRandom().nextInt(32); private static final String TABLE_PREFIX = "storage_write_"; - private BigQueryOptions bqOptions; + private static TestBigQueryOptions bqOptions; private static final BigqueryClient BQ_CLIENT = new BigqueryClient("BigQueryStorageIOWriteIT"); + @BeforeClass + public static void setup() throws Exception { + PipelineOptionsFactory.register(TestBigQueryOptions.class); + bqOptions = TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class); + project = bqOptions.as(GcpOptions.class).getProject(); + // Create one BQ dataset for all test cases. 
+ BQ_CLIENT.createNewDataset(project, DATASET_ID, null, bqOptions.getBigQueryLocation()); + } + + @AfterClass + public static void cleanup() { + BQ_CLIENT.deleteDataset(project, DATASET_ID); + } + private void setUpTestEnvironment(WriteMode writeMode) { - PipelineOptionsFactory.register(BigQueryOptions.class); - bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class); - bqOptions.setProject(TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject()); bqOptions.setUseStorageWriteApi(true); if (writeMode == WriteMode.AT_LEAST_ONCE) { bqOptions.setUseStorageWriteApiAtLeastOnce(true); } bqOptions.setNumStorageWriteApiStreams(2); bqOptions.setStorageWriteApiTriggeringFrequencySec(1); - project = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject(); } static class FillRowFn extends DoFn { From c5de23ab977510a24b2b70db7e7d3ca17591c41b Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Thu, 7 Sep 2023 15:36:21 -0400 Subject: [PATCH 03/26] add options to registrar; StorageApiDirectWriteProtosIT --- .../beam/sdk/io/gcp/common/GcpIoPipelineOptionsRegistrar.java | 2 ++ .../sdk/io/gcp/bigquery/StorageApiDirectWriteProtosIT.java | 4 +++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/common/GcpIoPipelineOptionsRegistrar.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/common/GcpIoPipelineOptionsRegistrar.java index 1ed9ed6cb6c3..f1ff827fc633 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/common/GcpIoPipelineOptionsRegistrar.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/common/GcpIoPipelineOptionsRegistrar.java @@ -20,6 +20,7 @@ import com.google.auto.service.AutoService; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions; +import org.apache.beam.sdk.io.gcp.bigquery.TestBigQueryOptions; import org.apache.beam.sdk.io.gcp.firestore.FirestoreOptions; import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions; import org.apache.beam.sdk.options.PipelineOptions; @@ -36,6 +37,7 @@ public Iterable> getPipelineOptions() { .add(BigQueryOptions.class) .add(PubsubOptions.class) .add(FirestoreOptions.class) + .add(TestBigQueryOptions.class) .build(); } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDirectWriteProtosIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDirectWriteProtosIT.java index 93bc4162409f..31538db34bdc 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDirectWriteProtosIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDirectWriteProtosIT.java @@ -32,6 +32,7 @@ import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages; import org.apache.beam.sdk.io.gcp.testing.BigqueryClient; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.values.PCollection; @@ -82,8 +83,9 @@ private BigQueryIO.Write.Method getMethod() { @BeforeClass public static void setUpTestEnvironment() throws IOException, InterruptedException { + PipelineOptionsFactory.register(TestBigQueryOptions.class); 
// Create one BQ dataset for all test cases. - BQ_CLIENT.createNewDataset(PROJECT, BIG_QUERY_DATASET_ID); + BQ_CLIENT.createNewDataset(PROJECT, BIG_QUERY_DATASET_ID, null, TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class).getBigQueryLocation()); } @AfterClass From ec12f4119e96de23d09c40cc7a0eba21a30ceb1e Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Thu, 7 Sep 2023 15:42:02 -0400 Subject: [PATCH 04/26] StorageApiSinkFailedRowsIT --- .../beam/sdk/io/gcp/bigquery/BigQueryIOStorageWriteIT.java | 1 - .../beam/sdk/io/gcp/bigquery/StorageApiDirectWriteProtosIT.java | 1 - .../beam/sdk/io/gcp/bigquery/StorageApiSinkFailedRowsIT.java | 2 +- 3 files changed, 1 insertion(+), 3 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageWriteIT.java index 690888a4c5c4..c34074b1cf8c 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageWriteIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageWriteIT.java @@ -65,7 +65,6 @@ private enum WriteMode { @BeforeClass public static void setup() throws Exception { - PipelineOptionsFactory.register(TestBigQueryOptions.class); bqOptions = TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class); project = bqOptions.as(GcpOptions.class).getProject(); // Create one BQ dataset for all test cases. diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDirectWriteProtosIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDirectWriteProtosIT.java index 31538db34bdc..6025883be060 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDirectWriteProtosIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDirectWriteProtosIT.java @@ -83,7 +83,6 @@ private BigQueryIO.Write.Method getMethod() { @BeforeClass public static void setUpTestEnvironment() throws IOException, InterruptedException { - PipelineOptionsFactory.register(TestBigQueryOptions.class); // Create one BQ dataset for all test cases. BQ_CLIENT.createNewDataset(PROJECT, BIG_QUERY_DATASET_ID, null, TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class).getBigQueryLocation()); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkFailedRowsIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkFailedRowsIT.java index 3dcde8f39cd7..627c0aba7199 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkFailedRowsIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkFailedRowsIT.java @@ -111,7 +111,7 @@ private BigQueryIO.Write.Method getMethod() { @BeforeClass public static void setUpTestEnvironment() throws IOException, InterruptedException { // Create one BQ dataset for all test cases. 
- BQ_CLIENT.createNewDataset(PROJECT, BIG_QUERY_DATASET_ID); + BQ_CLIENT.createNewDataset(PROJECT, BIG_QUERY_DATASET_ID, null, TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class).getBigQueryLocation()); } @AfterClass From 764389596181ac75e7e8d9cfcd8ab5b8fd6383d3 Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Thu, 7 Sep 2023 15:43:08 -0400 Subject: [PATCH 05/26] StorageApiSinkRowUpdateIT --- .../beam/sdk/io/gcp/bigquery/StorageApiSinkRowUpdateIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkRowUpdateIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkRowUpdateIT.java index d5366fe29613..751c7cf0e588 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkRowUpdateIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkRowUpdateIT.java @@ -52,7 +52,7 @@ public class StorageApiSinkRowUpdateIT { @BeforeClass public static void setUpTestEnvironment() throws IOException, InterruptedException { // Create one BQ dataset for all test cases. - BQ_CLIENT.createNewDataset(PROJECT, BIG_QUERY_DATASET_ID); + BQ_CLIENT.createNewDataset(PROJECT, BIG_QUERY_DATASET_ID, null, TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class).getBigQueryLocation()); } @AfterClass From 0b4a183f7217def0e60ed879d7268589342a89e5 Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Thu, 7 Sep 2023 15:43:52 -0400 Subject: [PATCH 06/26] StorageApiSinkSchemaUpdateIT --- .../beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaUpdateIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaUpdateIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaUpdateIT.java index 6931b7ac9b98..03dcc78c2a9e 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaUpdateIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaUpdateIT.java @@ -131,7 +131,7 @@ public static Iterable data() { public static void setUpTestEnvironment() throws IOException, InterruptedException { // Create one BQ dataset for all test cases. 
LOG.info("Creating dataset {}.", BIG_QUERY_DATASET_ID); - BQ_CLIENT.createNewDataset(PROJECT, BIG_QUERY_DATASET_ID); + BQ_CLIENT.createNewDataset(PROJECT, BIG_QUERY_DATASET_ID, null, TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class).getBigQueryLocation()); } @AfterClass From 3beb96cf41f6df25559339134439b4296a2435a6 Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Thu, 7 Sep 2023 15:44:30 -0400 Subject: [PATCH 07/26] TableRowToStorageApiProtoIT --- .../beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoIT.java index 218aa7411414..77a78b1b1723 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoIT.java @@ -321,7 +321,7 @@ public class TableRowToStorageApiProtoIT { @BeforeClass public static void setUpTestEnvironment() throws IOException, InterruptedException { // Create one BQ dataset for all test cases. - BQ_CLIENT.createNewDataset(PROJECT, BIG_QUERY_DATASET_ID); + BQ_CLIENT.createNewDataset(PROJECT, BIG_QUERY_DATASET_ID, null, TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class).getBigQueryLocation()); } @AfterClass From 3545d65c4fd3a07be0b7175263d2f7dad407f33c Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Thu, 7 Sep 2023 15:53:25 -0400 Subject: [PATCH 08/26] BigQuerySchemaUpdateOptionsIT; BigQueryTimePartitioningClusteringIT --- .../BigQuerySchemaUpdateOptionsIT.java | 2 +- .../BigQueryTimePartitioningClusteringIT.java | 24 ++++++++++++++++++- 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaUpdateOptionsIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaUpdateOptionsIT.java index 611c691dca12..2c26975d47d9 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaUpdateOptionsIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaUpdateOptionsIT.java @@ -87,7 +87,7 @@ public class BigQuerySchemaUpdateOptionsIT { @BeforeClass public static void setupTestEnvironment() throws Exception { project = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject(); - BQ_CLIENT.createNewDataset(project, BIG_QUERY_DATASET_ID); + BQ_CLIENT.createNewDataset(project, BIG_QUERY_DATASET_ID, null, TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class).getBigQueryLocation()); } @AfterClass diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimePartitioningClusteringIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimePartitioningClusteringIT.java index 3ceb6f0966b7..515cbee5ecf8 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimePartitioningClusteringIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimePartitioningClusteringIT.java @@ -24,9 +24,12 @@ import 
com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; import com.google.api.services.bigquery.model.TimePartitioning; + +import java.security.SecureRandom; import java.util.Arrays; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.io.gcp.testing.BigqueryClient; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; @@ -38,8 +41,10 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.ValueInSingleWindow; import org.checkerframework.checker.nullness.qual.Nullable; +import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -49,7 +54,13 @@ public class BigQueryTimePartitioningClusteringIT { private static final String WEATHER_SAMPLES_TABLE = "apache-beam-testing.samples.weather_stations"; - private static final String DATASET_NAME = "BigQueryTimePartitioningIT"; + + private static String project; + private static final BigqueryClient BQ_CLIENT = + new BigqueryClient("BigQueryTimePartitioningClusteringIT"); + private static final String DATASET_NAME = "BigQueryTimePartitioningIT_" + System.currentTimeMillis() + + "_" + + new SecureRandom().nextInt(32); private static final TimePartitioning TIME_PARTITIONING = new TimePartitioning().setField("date").setType("DAY"); private static final Clustering CLUSTERING = @@ -64,6 +75,12 @@ public class BigQueryTimePartitioningClusteringIT { private Bigquery bqClient; private BigQueryClusteringITOptions options; + @BeforeClass + public static void setupTestEnvironment() throws Exception { + project = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject(); + BQ_CLIENT.createNewDataset(project, DATASET_NAME, null, TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class).getBigQueryLocation()); + } + @Before public void setUp() { PipelineOptionsFactory.register(BigQueryClusteringITOptions.class); @@ -72,6 +89,11 @@ public void setUp() { bqClient = BigqueryClient.getNewBigqueryClient(options.getAppName()); } + @AfterClass + public static void cleanup() { + BQ_CLIENT.deleteDataset(project, DATASET_NAME); + } + /** Customized PipelineOptions for BigQueryClustering Integration Test. 
*/ public interface BigQueryClusteringITOptions extends TestPipelineOptions, ExperimentalOptions, BigQueryOptions { From c219e51f6498ecee6401cfb7455dcb2510fc317f Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Fri, 8 Sep 2023 11:47:55 -0400 Subject: [PATCH 09/26] BigQueryIOStorageQueryIT; BigQueryIOStorageReadIT --- .../beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryIT.java | 4 +++- .../beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryIT.java index 692a12c0f4a7..820718609257 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryIT.java @@ -52,7 +52,9 @@ public class BigQueryIOStorageQueryIT { "1G", 11110839L, "1T", 11110839000L); - private static final String DATASET_ID = "big_query_storage"; + private static String DATASET_ID = TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class).getBigQueryLocation().equals("us-east7")? + "big_query_storage_day0" : + "big_query_storage"; private static final String TABLE_PREFIX = "storage_read_"; private BigQueryIOStorageQueryOptions options; diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java index 570938470b9d..94560ee4f114 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java @@ -65,7 +65,9 @@ public class BigQueryIOStorageReadIT { "1T", 11110839000L, "multi_field", 11110839L); - private static final String DATASET_ID = "big_query_storage"; + private static String DATASET_ID = TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class).getBigQueryLocation().equals("us-east7")? 
+ "big_query_storage_day0" : + "big_query_storage"; private static final String TABLE_PREFIX = "storage_read_"; private BigQueryIOStorageReadOptions options; From 02b272f6d26ba49c9b4948c25636b8370c224a1f Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Mon, 11 Sep 2023 12:31:58 -0400 Subject: [PATCH 10/26] BigQueryIOStorageReadTableRowIT --- .../bigquery/BigQueryIOStorageQueryIT.java | 2 +- .../gcp/bigquery/BigQueryIOStorageReadIT.java | 2 +- .../BigQueryIOStorageReadTableRowIT.java | 37 ++++++++++--------- 3 files changed, 21 insertions(+), 20 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryIT.java index 820718609257..9df64acd4d42 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryIT.java @@ -52,7 +52,7 @@ public class BigQueryIOStorageQueryIT { "1G", 11110839L, "1T", 11110839000L); - private static String DATASET_ID = TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class).getBigQueryLocation().equals("us-east7")? + private static final String DATASET_ID = TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class).getBigQueryLocation().equals("us-east7")? "big_query_storage_day0" : "big_query_storage"; private static final String TABLE_PREFIX = "storage_read_"; diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java index 94560ee4f114..f657c1ff1b8d 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java @@ -65,7 +65,7 @@ public class BigQueryIOStorageReadIT { "1T", 11110839000L, "multi_field", 11110839L); - private static String DATASET_ID = TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class).getBigQueryLocation().equals("us-east7")? + private static final String DATASET_ID = TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class).getBigQueryLocation().equals("us-east7")? "big_query_storage_day0" : "big_query_storage"; private static final String TABLE_PREFIX = "storage_read_"; diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTableRowIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTableRowIT.java index 734c3af2c4d4..2c00df9b0fe2 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTableRowIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTableRowIT.java @@ -52,7 +52,9 @@ @RunWith(JUnit4.class) public class BigQueryIOStorageReadTableRowIT { - private static final String DATASET_ID = "big_query_import_export"; + private static final String DATASET_ID = TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class).getBigQueryLocation().equals("us-east7") ? 
+ "big_query_import_export_day0": + "big_query_import_export"; private static final String TABLE_PREFIX = "parallel_read_table_row_"; private BigQueryIOStorageReadTableRowOptions options; @@ -67,12 +69,11 @@ public interface BigQueryIOStorageReadTableRowOptions void setInputTable(String table); } - private static class TableRowToKVPairFn extends SimpleFunction> { + private static class TableRowToKVPairFn extends SimpleFunction> { @Override - public KV apply(TableRow input) { - CharSequence sampleString = (CharSequence) input.get("sample_string"); - String key = sampleString != null ? sampleString.toString() : "null"; - return KV.of(key, BigQueryHelpers.toJsonString(input)); + public KV apply(TableRow input) { + Integer rowId = Integer.parseInt((String) input.get("id")); + return KV.of(rowId, BigQueryHelpers.toJsonString(input)); } } @@ -87,7 +88,7 @@ private void setUpTestEnvironment(String tableName) { private static void runPipeline(BigQueryIOStorageReadTableRowOptions pipelineOptions) { Pipeline pipeline = Pipeline.create(pipelineOptions); - PCollection> jsonTableRowsFromExport = + PCollection> jsonTableRowsFromExport = pipeline .apply( "ExportTable", @@ -96,7 +97,7 @@ private static void runPipeline(BigQueryIOStorageReadTableRowOptions pipelineOpt .withMethod(Method.EXPORT)) .apply("MapExportedRows", MapElements.via(new TableRowToKVPairFn())); - PCollection> jsonTableRowsFromDirectRead = + PCollection> jsonTableRowsFromDirectRead = pipeline .apply( "DirectReadTable", @@ -108,16 +109,16 @@ private static void runPipeline(BigQueryIOStorageReadTableRowOptions pipelineOpt final TupleTag exportTag = new TupleTag<>(); final TupleTag directReadTag = new TupleTag<>(); - PCollection>> unmatchedRows = + PCollection>> unmatchedRows = KeyedPCollectionTuple.of(exportTag, jsonTableRowsFromExport) .and(directReadTag, jsonTableRowsFromDirectRead) .apply(CoGroupByKey.create()) .apply( ParDo.of( - new DoFn, KV>>() { + new DoFn, KV>>() { @ProcessElement - public void processElement(ProcessContext c) throws Exception { - KV element = c.element(); + public void processElement(ProcessContext c) { + KV element = c.element(); // Add all the exported rows for the key to a collection. 
Set uniqueRows = new HashSet<>(); @@ -147,20 +148,20 @@ public void processElement(ProcessContext c) throws Exception { } @Test - public void testBigQueryStorageReadTableRow1() throws Exception { - setUpTestEnvironment("1"); + public void testBigQueryStorageReadTableRow100() { + setUpTestEnvironment("100"); runPipeline(options); } @Test - public void testBigQueryStorageReadTableRow10k() throws Exception { - setUpTestEnvironment("10k"); + public void testBigQueryStorageReadTableRow1k() { + setUpTestEnvironment("1K"); runPipeline(options); } @Test - public void testBigQueryStorageReadTableRow100k() throws Exception { - setUpTestEnvironment("100k"); + public void testBigQueryStorageReadTableRow10k() { + setUpTestEnvironment("10K"); runPipeline(options); } } From b6fa6c6344bad864b7adf8ced17e466e618a414d Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Mon, 11 Sep 2023 13:01:56 -0400 Subject: [PATCH 11/26] new gradle task --- .../io/google-cloud-platform/build.gradle | 37 ++++++++++++++++++- 1 file changed, 35 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/build.gradle b/sdks/java/io/google-cloud-platform/build.gradle index 560b27aae162..56ecda808db3 100644 --- a/sdks/java/io/google-cloud-platform/build.gradle +++ b/sdks/java/io/google-cloud-platform/build.gradle @@ -202,10 +202,8 @@ task integrationTest(type: Test, dependsOn: processTestResources) { exclude '**/BigQueryIOReadIT.class' exclude '**/BigQueryIOStorageQueryIT.class' exclude '**/BigQueryIOStorageReadIT.class' - exclude '**/BigQueryIOStorageReadTableRowIT.class' exclude '**/BigQueryIOStorageWriteIT.class' exclude '**/BigQueryToTableIT.class' - exclude '**/BigQueryIOJsonTest.class' maxParallelForks 4 classpath = sourceSets.test.runtimeClasspath @@ -244,6 +242,41 @@ task integrationTestKms(type: Test) { } } +/* + Integration tests for BigQueryIO that run on BigQuery's early rollout region (us-east7) + with the intended purpose of catching breaking changes from new BigQuery releases. + If these tests fail here but not in `Java_GCP_IO_Direct`, there may be a new BigQuery change + that is breaking the connector. 
+ */ +task bigQueryEarlyRolloutIntegrationTest(type: Test, dependsOn: processTestResources) { + group = "Verification" + def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing' + def gcpTempRoot = project.findProperty('gcpTempRoot') ?: 'gs://temp-storage-for-bigquery-day0-tests' + systemProperty "beamTestPipelineOptions", JsonOutput.toJson([ + "--runner=DirectRunner", + "--project=${gcpProject}", + "--tempRoot=${gcpTempRoot}", + "--bigQueryLocation=us-east7", + ]) + + outputs.upToDateWhen { false } + + include '**/BigQueryToTableIT.class' + include '**/BigQueryIOJsonIT.class' + include '**/BigQueryIOStorageReadTableRowIT.class' + include '**/StorageApiDirectWriteProtosIT.class' + include '**/StorageApiSinkFailedRowsIT.class' + include '**/StorageApiSinkRowUpdateIT.class' + include '**/StorageApiSinkSchemaUpdateIT.class' + include '**/TableRowToStorageApiProtoIT.class' + include '**/BigQuerySchemaUpdateOptionsIT.class' + include '**/BigQueryTimePartitioningClusteringIT.class' + + maxParallelForks 4 + classpath = sourceSets.test.runtimeClasspath + testClassesDirs = sourceSets.test.output.classesDirs +} + // path(s) for Cloud Spanner related classes def spannerIncludes = [ '**/org/apache/beam/sdk/io/gcp/spanner/**', From 0956fb4e16092cb48ed4ce35b71589065aa48495 Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Mon, 11 Sep 2023 13:08:57 -0400 Subject: [PATCH 12/26] spotless --- .../beam/sdk/io/gcp/testing/BigqueryClient.java | 5 ++++- .../io/gcp/bigquery/BigQueryIOStorageQueryIT.java | 10 +++++++--- .../io/gcp/bigquery/BigQueryIOStorageReadIT.java | 10 +++++++--- .../bigquery/BigQueryIOStorageReadTableRowIT.java | 12 ++++++++---- .../io/gcp/bigquery/BigQueryIOStorageWriteIT.java | 7 +++++-- .../bigquery/BigQuerySchemaUpdateOptionsIT.java | 6 +++++- .../BigQueryTimePartitioningClusteringIT.java | 15 ++++++++++----- .../sdk/io/gcp/bigquery/BigQueryToTableIT.java | 5 ++--- .../bigquery/StorageApiDirectWriteProtosIT.java | 7 +++++-- .../gcp/bigquery/StorageApiSinkFailedRowsIT.java | 6 +++++- .../gcp/bigquery/StorageApiSinkRowUpdateIT.java | 6 +++++- .../bigquery/StorageApiSinkSchemaUpdateIT.java | 6 +++++- .../gcp/bigquery/TableRowToStorageApiProtoIT.java | 6 +++++- 13 files changed, 73 insertions(+), 28 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigqueryClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigqueryClient.java index 281dbf0756d6..1639c64442b1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigqueryClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigqueryClient.java @@ -400,7 +400,10 @@ public void createNewDataset( /** Creates a new dataset with defaultTableExpirationMs and in a specified location. 
*/ public void createNewDataset( - String projectId, String datasetId, @Nullable Long defaultTableExpirationMs, @Nullable String location) + String projectId, + String datasetId, + @Nullable Long defaultTableExpirationMs, + @Nullable String location) throws IOException, InterruptedException { Sleeper sleeper = Sleeper.DEFAULT; BackOff backoff = BackOffAdapter.toGcpBackOff(BACKOFF_FACTORY.backoff()); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryIT.java index 9df64acd4d42..c87cf440e2e4 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryIT.java @@ -52,9 +52,13 @@ public class BigQueryIOStorageQueryIT { "1G", 11110839L, "1T", 11110839000L); - private static final String DATASET_ID = TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class).getBigQueryLocation().equals("us-east7")? - "big_query_storage_day0" : - "big_query_storage"; + private static final String DATASET_ID = + TestPipeline.testingPipelineOptions() + .as(TestBigQueryOptions.class) + .getBigQueryLocation() + .equals("us-east7") + ? "big_query_storage_day0" + : "big_query_storage"; private static final String TABLE_PREFIX = "storage_read_"; private BigQueryIOStorageQueryOptions options; diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java index f657c1ff1b8d..d0ff62da606b 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java @@ -65,9 +65,13 @@ public class BigQueryIOStorageReadIT { "1T", 11110839000L, "multi_field", 11110839L); - private static final String DATASET_ID = TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class).getBigQueryLocation().equals("us-east7")? - "big_query_storage_day0" : - "big_query_storage"; + private static final String DATASET_ID = + TestPipeline.testingPipelineOptions() + .as(TestBigQueryOptions.class) + .getBigQueryLocation() + .equals("us-east7") + ? "big_query_storage_day0" + : "big_query_storage"; private static final String TABLE_PREFIX = "storage_read_"; private BigQueryIOStorageReadOptions options; diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTableRowIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTableRowIT.java index 2c00df9b0fe2..915394d136bf 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTableRowIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTableRowIT.java @@ -52,9 +52,13 @@ @RunWith(JUnit4.class) public class BigQueryIOStorageReadTableRowIT { - private static final String DATASET_ID = TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class).getBigQueryLocation().equals("us-east7") ? 
- "big_query_import_export_day0": - "big_query_import_export"; + private static final String DATASET_ID = + TestPipeline.testingPipelineOptions() + .as(TestBigQueryOptions.class) + .getBigQueryLocation() + .equals("us-east7") + ? "big_query_import_export_day0" + : "big_query_import_export"; private static final String TABLE_PREFIX = "parallel_read_table_row_"; private BigQueryIOStorageReadTableRowOptions options; @@ -72,7 +76,7 @@ public interface BigQueryIOStorageReadTableRowOptions private static class TableRowToKVPairFn extends SimpleFunction> { @Override public KV apply(TableRow input) { - Integer rowId = Integer.parseInt((String) input.get("id")); + Integer rowId = Integer.parseInt((String) input.get("id")); return KV.of(rowId, BigQueryHelpers.toJsonString(input)); } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageWriteIT.java index c34074b1cf8c..7134bc3bb553 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageWriteIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageWriteIT.java @@ -31,7 +31,6 @@ import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.io.gcp.testing.BigqueryClient; -import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; @@ -57,7 +56,11 @@ private enum WriteMode { } private static String project; - private static final String DATASET_ID = "big_query_storage_write_it_" + System.currentTimeMillis() + "_" + new SecureRandom().nextInt(32); + private static final String DATASET_ID = + "big_query_storage_write_it_" + + System.currentTimeMillis() + + "_" + + new SecureRandom().nextInt(32); private static final String TABLE_PREFIX = "storage_write_"; private static TestBigQueryOptions bqOptions; diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaUpdateOptionsIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaUpdateOptionsIT.java index 2c26975d47d9..833a0a0829c7 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaUpdateOptionsIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaUpdateOptionsIT.java @@ -87,7 +87,11 @@ public class BigQuerySchemaUpdateOptionsIT { @BeforeClass public static void setupTestEnvironment() throws Exception { project = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject(); - BQ_CLIENT.createNewDataset(project, BIG_QUERY_DATASET_ID, null, TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class).getBigQueryLocation()); + BQ_CLIENT.createNewDataset( + project, + BIG_QUERY_DATASET_ID, + null, + TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class).getBigQueryLocation()); } @AfterClass diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimePartitioningClusteringIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimePartitioningClusteringIT.java index 
515cbee5ecf8..fcc66db7b223 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimePartitioningClusteringIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimePartitioningClusteringIT.java @@ -24,7 +24,6 @@ import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; import com.google.api.services.bigquery.model.TimePartitioning; - import java.security.SecureRandom; import java.util.Arrays; import org.apache.beam.sdk.Pipeline; @@ -58,9 +57,11 @@ public class BigQueryTimePartitioningClusteringIT { private static String project; private static final BigqueryClient BQ_CLIENT = new BigqueryClient("BigQueryTimePartitioningClusteringIT"); - private static final String DATASET_NAME = "BigQueryTimePartitioningIT_" + System.currentTimeMillis() - + "_" - + new SecureRandom().nextInt(32); + private static final String DATASET_NAME = + "BigQueryTimePartitioningIT_" + + System.currentTimeMillis() + + "_" + + new SecureRandom().nextInt(32); private static final TimePartitioning TIME_PARTITIONING = new TimePartitioning().setField("date").setType("DAY"); private static final Clustering CLUSTERING = @@ -78,7 +79,11 @@ public class BigQueryTimePartitioningClusteringIT { @BeforeClass public static void setupTestEnvironment() throws Exception { project = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject(); - BQ_CLIENT.createNewDataset(project, DATASET_NAME, null, TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class).getBigQueryLocation()); + BQ_CLIENT.createNewDataset( + project, + DATASET_NAME, + null, + TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class).getBigQueryLocation()); } @Before diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryToTableIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryToTableIT.java index 32e6476a291a..1abe7752b2e0 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryToTableIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryToTableIT.java @@ -43,11 +43,9 @@ import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.ExperimentalOptions; -import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.Validation; import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.testing.TestPipelineOptions; import org.apache.beam.sdk.transforms.Reshuffle; import org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.transforms.WithKeys; @@ -253,7 +251,8 @@ public interface BigQueryToTableOptions extends TestBigQueryOptions, Experimenta @BeforeClass public static void setupTestEnvironment() throws Exception { PipelineOptionsFactory.register(BigQueryToTableOptions.class); - BigQueryToTableOptions options = TestPipeline.testingPipelineOptions().as(BigQueryToTableOptions.class); + BigQueryToTableOptions options = + TestPipeline.testingPipelineOptions().as(BigQueryToTableOptions.class); project = options.as(GcpOptions.class).getProject(); // Create one BQ dataset for all test cases. 
BQ_CLIENT.createNewDataset(project, BIG_QUERY_DATASET_ID, null, options.getBigQueryLocation()); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDirectWriteProtosIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDirectWriteProtosIT.java index 6025883be060..d77f2381a210 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDirectWriteProtosIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDirectWriteProtosIT.java @@ -32,7 +32,6 @@ import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages; import org.apache.beam.sdk.io.gcp.testing.BigqueryClient; -import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.values.PCollection; @@ -84,7 +83,11 @@ private BigQueryIO.Write.Method getMethod() { @BeforeClass public static void setUpTestEnvironment() throws IOException, InterruptedException { // Create one BQ dataset for all test cases. - BQ_CLIENT.createNewDataset(PROJECT, BIG_QUERY_DATASET_ID, null, TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class).getBigQueryLocation()); + BQ_CLIENT.createNewDataset( + PROJECT, + BIG_QUERY_DATASET_ID, + null, + TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class).getBigQueryLocation()); } @AfterClass diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkFailedRowsIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkFailedRowsIT.java index 627c0aba7199..ea6a2bf26484 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkFailedRowsIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkFailedRowsIT.java @@ -111,7 +111,11 @@ private BigQueryIO.Write.Method getMethod() { @BeforeClass public static void setUpTestEnvironment() throws IOException, InterruptedException { // Create one BQ dataset for all test cases. - BQ_CLIENT.createNewDataset(PROJECT, BIG_QUERY_DATASET_ID, null, TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class).getBigQueryLocation()); + BQ_CLIENT.createNewDataset( + PROJECT, + BIG_QUERY_DATASET_ID, + null, + TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class).getBigQueryLocation()); } @AfterClass diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkRowUpdateIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkRowUpdateIT.java index 751c7cf0e588..517f930b6c4d 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkRowUpdateIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkRowUpdateIT.java @@ -52,7 +52,11 @@ public class StorageApiSinkRowUpdateIT { @BeforeClass public static void setUpTestEnvironment() throws IOException, InterruptedException { // Create one BQ dataset for all test cases. 
- BQ_CLIENT.createNewDataset(PROJECT, BIG_QUERY_DATASET_ID, null, TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class).getBigQueryLocation()); + BQ_CLIENT.createNewDataset( + PROJECT, + BIG_QUERY_DATASET_ID, + null, + TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class).getBigQueryLocation()); } @AfterClass diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaUpdateIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaUpdateIT.java index 03dcc78c2a9e..e04efe472477 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaUpdateIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaUpdateIT.java @@ -131,7 +131,11 @@ public static Iterable data() { public static void setUpTestEnvironment() throws IOException, InterruptedException { // Create one BQ dataset for all test cases. LOG.info("Creating dataset {}.", BIG_QUERY_DATASET_ID); - BQ_CLIENT.createNewDataset(PROJECT, BIG_QUERY_DATASET_ID, null, TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class).getBigQueryLocation()); + BQ_CLIENT.createNewDataset( + PROJECT, + BIG_QUERY_DATASET_ID, + null, + TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class).getBigQueryLocation()); } @AfterClass diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoIT.java index 77a78b1b1723..fd5e65b07db9 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoIT.java @@ -321,7 +321,11 @@ public class TableRowToStorageApiProtoIT { @BeforeClass public static void setUpTestEnvironment() throws IOException, InterruptedException { // Create one BQ dataset for all test cases. - BQ_CLIENT.createNewDataset(PROJECT, BIG_QUERY_DATASET_ID, null, TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class).getBigQueryLocation()); + BQ_CLIENT.createNewDataset( + PROJECT, + BIG_QUERY_DATASET_ID, + null, + TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class).getBigQueryLocation()); } @AfterClass From 8053b7ab6c624ca1b5e3b762147054ed7c3b5275 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 26 Sep 2023 15:59:55 -0400 Subject: [PATCH 13/26] create yaml workflow --- ...m_PostCommit_Java_BigQueryEarlyRollout.yml | 90 +++++++++++++++++++ 1 file changed, 90 insertions(+) create mode 100644 .github/workflows/beam_PostCommit_Java_BigQueryEarlyRollout.yml diff --git a/.github/workflows/beam_PostCommit_Java_BigQueryEarlyRollout.yml b/.github/workflows/beam_PostCommit_Java_BigQueryEarlyRollout.yml new file mode 100644 index 000000000000..1146c87d27f1 --- /dev/null +++ b/.github/workflows/beam_PostCommit_Java_BigQueryEarlyRollout.yml @@ -0,0 +1,90 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: PostCommit Java BigQueryEarlyRollout + +on: + issue_comment: + types: [created] + schedule: + - cron: '0 */6 * * *' + workflow_dispatch: + +# This allows a subsequently queued workflow run to interrupt previous runs +concurrency: + group: '${{ github.workflow }} @ ${{ github.event.issue.number || github.event.pull_request.head.label || github.sha || github.head_ref || github.ref }}-${{ github.event.schedule || github.event.comment.body || github.event.sender.login}}' + cancel-in-progress: true + +#Setting explicit permissions for the action to avoid the default permissions which are `write-all` in case of pull_request_target event +permissions: + actions: write + pull-requests: read + checks: read + contents: read + deployments: read + id-token: none + issues: read + discussions: read + packages: read + pages: read + repository-projects: read + security-events: read + statuses: read + +env: + GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }} + GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }} + GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }} + +jobs: + beam_PostCommit_Java_BigQueryEarlyRollout: + name: ${{matrix.job_name}} (${{matrix.job_phrase}}) + runs-on: [self-hosted, ubuntu-20.04, main] + timeout-minutes: 100 + strategy: + matrix: + job_name: [beam_PostCommit_Java_BigQueryEarlyRollout] + job_phrase: [Run Java BigQueryEarlyRollout PostCommit] + if: | + github.event_name == 'workflow_dispatch' || + github.event_name == 'schedule' || + github.event.comment.body == 'Run Java BigQueryEarlyRollout PostCommit' + steps: + - uses: actions/checkout@v4 + - name: Setup repository + uses: ./.github/actions/setup-action + with: + comment_phrase: ${{ matrix.job_phrase }} + github_token: ${{ secrets.GITHUB_TOKEN }} + github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }}) + - name: run PostCommit Java BigQueryEarlyRollout script + uses: ./.github/actions/gradle-command-self-hosted-action + with: + gradle-command: :sdks:java:io:google-cloud-platform:bigQueryEarlyRolloutIntegrationTest + - name: Archive JUnit Test Results + uses: actions/upload-artifact@v3 + if: failure() + with: + name: JUnit Test Results + path: "**/build/reports/tests/" + - name: Publish JUnit Test Results + uses: EnricoMi/publish-unit-test-result-action@v2 + if: always() + with: + commit: '${{ env.prsha || env.GITHUB_SHA }}' + comment_mode: ${{ github.event_name == 'issue_comment' && 'always' || 'off' }} + files: '**/build/test-results/**/*.xml' \ No newline at end of file From 95b7cff6da4609129789c543932dfef494073836 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 26 Sep 2023 16:15:15 -0400 Subject: [PATCH 14/26] spotless --- .../beam/sdk/io/gcp/bigquery/BigQueryIOStorageWriteIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageWriteIT.java 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageWriteIT.java index 8cb4d4202f87..d061898d55c7 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageWriteIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageWriteIT.java @@ -42,9 +42,9 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.joda.time.Duration; +import org.joda.time.Instant; import org.junit.AfterClass; import org.junit.BeforeClass; -import org.joda.time.Instant; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; From 02ad45b670334abaee73cace2ed2f444a4b767e9 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 26 Sep 2023 16:45:27 -0400 Subject: [PATCH 15/26] test locally --- .../workflows/beam_PostCommit_Java_BigQueryEarlyRollout.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/beam_PostCommit_Java_BigQueryEarlyRollout.yml b/.github/workflows/beam_PostCommit_Java_BigQueryEarlyRollout.yml index 1146c87d27f1..4b4e271f956a 100644 --- a/.github/workflows/beam_PostCommit_Java_BigQueryEarlyRollout.yml +++ b/.github/workflows/beam_PostCommit_Java_BigQueryEarlyRollout.yml @@ -53,7 +53,8 @@ env: jobs: beam_PostCommit_Java_BigQueryEarlyRollout: name: ${{matrix.job_name}} (${{matrix.job_phrase}}) - runs-on: [self-hosted, ubuntu-20.04, main] +# runs-on: [self-hosted, ubuntu-20.04, main] + runs-on: ubuntu-20.04 timeout-minutes: 100 strategy: matrix: From 6e878d45cd5cd3a1f6ecf2486ec300b20f3d57b9 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Thu, 28 Sep 2023 12:22:46 -0400 Subject: [PATCH 16/26] add step to authenticate GCP --- .../beam_PostCommit_Java_BigQueryEarlyRollout.yml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/.github/workflows/beam_PostCommit_Java_BigQueryEarlyRollout.yml b/.github/workflows/beam_PostCommit_Java_BigQueryEarlyRollout.yml index 4b4e271f956a..1eeedc9cebc1 100644 --- a/.github/workflows/beam_PostCommit_Java_BigQueryEarlyRollout.yml +++ b/.github/workflows/beam_PostCommit_Java_BigQueryEarlyRollout.yml @@ -72,6 +72,13 @@ jobs: comment_phrase: ${{ matrix.job_phrase }} github_token: ${{ secrets.GITHUB_TOKEN }} github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }}) + - name: Authenticate on GCP + uses: google-github-actions/setup-gcloud@v0 + with: + service_account_email: ${{ secrets.GCP_SA_EMAIL }} + service_account_key: ${{ secrets.GCP_SA_KEY }} + project_id: ${{ secrets.GCP_PROJECT_ID }} + export_default_credentials: true - name: run PostCommit Java BigQueryEarlyRollout script uses: ./.github/actions/gradle-command-self-hosted-action with: From e008b5f7abe8e779550d2a11adf13336628a0ff9 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Thu, 28 Sep 2023 13:06:44 -0400 Subject: [PATCH 17/26] add file loads streaming test; add option to query with location --- sdks/java/io/google-cloud-platform/build.gradle | 1 + .../beam/sdk/io/gcp/testing/BigqueryClient.java | 17 ++++++++++++++++- 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/build.gradle b/sdks/java/io/google-cloud-platform/build.gradle index 56ecda808db3..4d0b02849062 100644 --- a/sdks/java/io/google-cloud-platform/build.gradle +++ b/sdks/java/io/google-cloud-platform/build.gradle @@ -271,6 +271,7 @@ task 
bigQueryEarlyRolloutIntegrationTest(type: Test, dependsOn: processTestResou include '**/TableRowToStorageApiProtoIT.class' include '**/BigQuerySchemaUpdateOptionsIT.class' include '**/BigQueryTimePartitioningClusteringIT.class' + include '**/FileLoadsStreamingIT.class' maxParallelForks 4 classpath = sourceSets.test.runtimeClasspath diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigqueryClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigqueryClient.java index 1639c64442b1..e923626314eb 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigqueryClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigqueryClient.java @@ -292,6 +292,21 @@ private QueryResponse getTypedTableRows(QueryResponse response) { public List queryUnflattened( String query, String projectId, boolean typed, boolean useStandardSql) throws IOException, InterruptedException { + return queryUnflattened(query, projectId, typed, useStandardSql, null); + } + + /** + * Performs a query without flattening results. May choose a location to perform this operation + * in. + */ + @Nonnull + public List queryUnflattened( + String query, + String projectId, + boolean typed, + boolean useStandardSql, + @Nullable String location) + throws IOException, InterruptedException { Random rnd = new Random(System.currentTimeMillis()); String temporaryDatasetId = String.format("_dataflow_temporary_dataset_%s_%s", System.nanoTime(), rnd.nextInt(1000000)); @@ -302,7 +317,7 @@ public List queryUnflattened( .setDatasetId(temporaryDatasetId) .setTableId(temporaryTableId); - createNewDataset(projectId, temporaryDatasetId); + createNewDataset(projectId, temporaryDatasetId, null, location); createNewTable( projectId, temporaryDatasetId, new Table().setTableReference(tempTableReference)); From 2fdf651b08875085306027582db6543ea7c03c34 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Thu, 28 Sep 2023 13:57:17 -0400 Subject: [PATCH 18/26] pass bq location to query operation --- sdks/java/io/google-cloud-platform/build.gradle | 3 +++ .../io/gcp/bigquery/FileLoadsStreamingIT.java | 9 +++++++-- .../bigquery/StorageApiDirectWriteProtosIT.java | 13 +++++++------ .../bigquery/StorageApiSinkFailedRowsIT.java | 17 +++++++++++------ .../gcp/bigquery/StorageApiSinkRowUpdateIT.java | 13 +++++++------ .../bigquery/StorageApiSinkSchemaUpdateIT.java | 15 ++++++++------- .../bigquery/TableRowToStorageApiProtoIT.java | 15 ++++++++------- 7 files changed, 51 insertions(+), 34 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/build.gradle b/sdks/java/io/google-cloud-platform/build.gradle index 4d0b02849062..cd3a61869716 100644 --- a/sdks/java/io/google-cloud-platform/build.gradle +++ b/sdks/java/io/google-cloud-platform/build.gradle @@ -261,14 +261,17 @@ task bigQueryEarlyRolloutIntegrationTest(type: Test, dependsOn: processTestResou outputs.upToDateWhen { false } + // export and direct read include '**/BigQueryToTableIT.class' include '**/BigQueryIOJsonIT.class' include '**/BigQueryIOStorageReadTableRowIT.class' + // storage write api include '**/StorageApiDirectWriteProtosIT.class' include '**/StorageApiSinkFailedRowsIT.class' include '**/StorageApiSinkRowUpdateIT.class' include '**/StorageApiSinkSchemaUpdateIT.class' include '**/TableRowToStorageApiProtoIT.class' + // file loads include '**/BigQuerySchemaUpdateOptionsIT.class' include 
'**/BigQueryTimePartitioningClusteringIT.class' include '**/FileLoadsStreamingIT.class' diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.java index 012afed6fb43..678708062b8d 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.java @@ -106,11 +106,16 @@ public static Iterable data() { private final Random randomGenerator = new Random(); + // used when test suite specifies a particular GCP location for BigQuery operations + private static String bigQueryLocation; + @BeforeClass public static void setUpTestEnvironment() throws IOException, InterruptedException { // Create one BQ dataset for all test cases. cleanUp(); - BQ_CLIENT.createNewDataset(PROJECT, BIG_QUERY_DATASET_ID); + bigQueryLocation = + TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class).getBigQueryLocation(); + BQ_CLIENT.createNewDataset(PROJECT, BIG_QUERY_DATASET_ID, null, bigQueryLocation); } @AfterClass @@ -293,7 +298,7 @@ private static void checkRowCompleteness( throws IOException, InterruptedException { List actualTableRows = BQ_CLIENT.queryUnflattened( - String.format("SELECT * FROM [%s]", tableSpec), PROJECT, true, false); + String.format("SELECT * FROM [%s]", tableSpec), PROJECT, true, false, bigQueryLocation); Schema rowSchema = BigQueryUtils.fromTableSchema(schema); List actualBeamRows = diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDirectWriteProtosIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDirectWriteProtosIT.java index d77f2381a210..3da93c42a480 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDirectWriteProtosIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDirectWriteProtosIT.java @@ -80,14 +80,15 @@ private BigQueryIO.Write.Method getMethod() { : BigQueryIO.Write.Method.STORAGE_WRITE_API; } + // used when test suite specifies a particular GCP location for BigQuery operations + private static String bigQueryLocation; + @BeforeClass public static void setUpTestEnvironment() throws IOException, InterruptedException { // Create one BQ dataset for all test cases. 
- BQ_CLIENT.createNewDataset( - PROJECT, - BIG_QUERY_DATASET_ID, - null, - TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class).getBigQueryLocation()); + bigQueryLocation = + TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class).getBigQueryLocation(); + BQ_CLIENT.createNewDataset(PROJECT, BIG_QUERY_DATASET_ID, null, bigQueryLocation); } @AfterClass @@ -195,7 +196,7 @@ public void testDirectWriteProtos() throws Exception { void assertRowsWritten(String tableSpec, Iterable expectedItems) throws Exception { List rows = BQ_CLIENT.queryUnflattened( - String.format("SELECT * FROM %s", tableSpec), PROJECT, true, true); + String.format("SELECT * FROM %s", tableSpec), PROJECT, true, true, bigQueryLocation); assertThat(rows, containsInAnyOrder(Iterables.toArray(expectedItems, TableRow.class))); } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkFailedRowsIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkFailedRowsIT.java index ea6a2bf26484..594fa80430eb 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkFailedRowsIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkFailedRowsIT.java @@ -108,14 +108,15 @@ private BigQueryIO.Write.Method getMethod() { : BigQueryIO.Write.Method.STORAGE_WRITE_API; } + // used when test suite specifies a particular GCP location for BigQuery operations + private static String bigQueryLocation; + @BeforeClass public static void setUpTestEnvironment() throws IOException, InterruptedException { // Create one BQ dataset for all test cases. - BQ_CLIENT.createNewDataset( - PROJECT, - BIG_QUERY_DATASET_ID, - null, - TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class).getBigQueryLocation()); + bigQueryLocation = + TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class).getBigQueryLocation(); + BQ_CLIENT.createNewDataset(PROJECT, BIG_QUERY_DATASET_ID, null, bigQueryLocation); } @AfterClass @@ -221,7 +222,11 @@ private void assertGoodRowsWritten(String tableSpec, Iterable goodRows TableRow queryResponse = Iterables.getOnlyElement( BQ_CLIENT.queryUnflattened( - String.format("SELECT COUNT(*) FROM %s", tableSpec), PROJECT, true, true)); + String.format("SELECT COUNT(*) FROM %s", tableSpec), + PROJECT, + true, + true, + bigQueryLocation)); int numRowsWritten = Integer.parseInt((String) queryResponse.get("f0_")); if (useAtLeastOnce) { assertThat(numRowsWritten, Matchers.greaterThanOrEqualTo(Iterables.size(goodRows))); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkRowUpdateIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkRowUpdateIT.java index 517f930b6c4d..f8cc797a87cd 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkRowUpdateIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkRowUpdateIT.java @@ -49,14 +49,15 @@ public class StorageApiSinkRowUpdateIT { private static final String BIG_QUERY_DATASET_ID = "storage_api_sink_rows_update" + System.nanoTime(); + // used when test suite specifies a particular GCP location for BigQuery operations + private static String bigQueryLocation; + @BeforeClass public static void setUpTestEnvironment() throws 
IOException, InterruptedException { // Create one BQ dataset for all test cases. - BQ_CLIENT.createNewDataset( - PROJECT, - BIG_QUERY_DATASET_ID, - null, - TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class).getBigQueryLocation()); + bigQueryLocation = + TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class).getBigQueryLocation(); + BQ_CLIENT.createNewDataset(PROJECT, BIG_QUERY_DATASET_ID, null, bigQueryLocation); } @AfterClass @@ -133,7 +134,7 @@ private void assertRowsWritten(String tableSpec, Iterable expected) throws IOException, InterruptedException { List queryResponse = BQ_CLIENT.queryUnflattened( - String.format("SELECT * FROM %s", tableSpec), PROJECT, true, true); + String.format("SELECT * FROM %s", tableSpec), PROJECT, true, true, bigQueryLocation); assertThat(queryResponse, containsInAnyOrder(Iterables.toArray(expected, TableRow.class))); } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaUpdateIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaUpdateIT.java index e04efe472477..352a0555d1ef 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaUpdateIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaUpdateIT.java @@ -127,15 +127,15 @@ public static Iterable data() { private final Random randomGenerator = new Random(); + // used when test suite specifies a particular GCP location for BigQuery operations + private static String bigQueryLocation; + @BeforeClass public static void setUpTestEnvironment() throws IOException, InterruptedException { // Create one BQ dataset for all test cases. - LOG.info("Creating dataset {}.", BIG_QUERY_DATASET_ID); - BQ_CLIENT.createNewDataset( - PROJECT, - BIG_QUERY_DATASET_ID, - null, - TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class).getBigQueryLocation()); + bigQueryLocation = + TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class).getBigQueryLocation(); + BQ_CLIENT.createNewDataset(PROJECT, BIG_QUERY_DATASET_ID, null, bigQueryLocation); } @AfterClass @@ -463,7 +463,8 @@ private static void checkRowCompleteness( String.format("SELECT COUNT(DISTINCT(id)), COUNT(id) FROM [%s]", tableSpec), PROJECT, true, - false)); + false, + bigQueryLocation)); int distinctCount = Integer.parseInt((String) queryResponse.get("f0_")); int totalCount = Integer.parseInt((String) queryResponse.get("f1_")); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoIT.java index fd5e65b07db9..f28ae588a5ec 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoIT.java @@ -318,14 +318,15 @@ public class TableRowToStorageApiProtoIT { .setFields(BASE_TABLE_SCHEMA.getFields())) .build()); + // used when test suite specifies a particular GCP location for BigQuery operations + private static String bigQueryLocation; + @BeforeClass public static void setUpTestEnvironment() throws IOException, InterruptedException { // Create one BQ dataset for all test cases. 
- BQ_CLIENT.createNewDataset( - PROJECT, - BIG_QUERY_DATASET_ID, - null, - TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class).getBigQueryLocation()); + bigQueryLocation = + TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class).getBigQueryLocation(); + BQ_CLIENT.createNewDataset(PROJECT, BIG_QUERY_DATASET_ID, null, bigQueryLocation); } @AfterClass @@ -342,7 +343,7 @@ public void testBaseTableRow() throws IOException, InterruptedException { List actualTableRows = BQ_CLIENT.queryUnflattened( - String.format("SELECT * FROM %s", tableSpec), PROJECT, true, true); + String.format("SELECT * FROM %s", tableSpec), PROJECT, true, true, bigQueryLocation); assertEquals(1, actualTableRows.size()); assertEquals(BASE_TABLE_ROW_EXPECTED, actualTableRows.get(0)); @@ -368,7 +369,7 @@ public void testNestedRichTypesAndNull() throws IOException, InterruptedExceptio List actualTableRows = BQ_CLIENT.queryUnflattened( - String.format("SELECT * FROM %s", tableSpec), PROJECT, true, true); + String.format("SELECT * FROM %s", tableSpec), PROJECT, true, true, bigQueryLocation); assertEquals(1, actualTableRows.size()); assertEquals(BASE_TABLE_ROW_EXPECTED, actualTableRows.get(0).get("nestedValue1")); From e939ccfbf966fe78da46360bcd1121bbd4b49fd3 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Thu, 28 Sep 2023 15:25:28 -0400 Subject: [PATCH 19/26] debug --- .../bigquery/StorageApiSinkFailedRowsIT.java | 23 ++++++++++++------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkFailedRowsIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkFailedRowsIT.java index 594fa80430eb..85ad467f001c 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkFailedRowsIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkFailedRowsIT.java @@ -219,14 +219,21 @@ private static String createTable(TableSchema tableSchema) private void assertGoodRowsWritten(String tableSpec, Iterable goodRows) throws IOException, InterruptedException { - TableRow queryResponse = - Iterables.getOnlyElement( - BQ_CLIENT.queryUnflattened( - String.format("SELECT COUNT(*) FROM %s", tableSpec), - PROJECT, - true, - true, - bigQueryLocation)); + TableRow queryResponse; + try { + queryResponse = + Iterables.getOnlyElement( + BQ_CLIENT.queryUnflattened( + String.format("SELECT COUNT(*) FROM %s", tableSpec), + PROJECT, + true, + true, + bigQueryLocation)); + } catch (IOException | InterruptedException e) { + LOG.error("exception: {}", e); + LOG.debug("exception: {}", e); + throw e; + } int numRowsWritten = Integer.parseInt((String) queryResponse.get("f0_")); if (useAtLeastOnce) { assertThat(numRowsWritten, Matchers.greaterThanOrEqualTo(Iterables.size(goodRows))); From 6d60be2b68d55cfcab87f797393f189222e96859 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Thu, 28 Sep 2023 17:06:33 -0400 Subject: [PATCH 20/26] pass location into query operations --- .../sdk/io/gcp/testing/BigqueryClient.java | 5 +++- .../bigquery/StorageApiSinkFailedRowsIT.java | 23 +++++++------------ 2 files changed, 12 insertions(+), 16 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigqueryClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigqueryClient.java index 
e923626314eb..04aceaf354fa 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigqueryClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigqueryClient.java @@ -319,7 +319,9 @@ public List queryUnflattened( createNewDataset(projectId, temporaryDatasetId, null, location); createNewTable( - projectId, temporaryDatasetId, new Table().setTableReference(tempTableReference)); + projectId, + temporaryDatasetId, + new Table().setTableReference(tempTableReference).setLocation(location)); JobConfigurationQuery jcQuery = new JobConfigurationQuery() @@ -340,6 +342,7 @@ public List queryUnflattened( bqClient .jobs() .getQueryResults(projectId, insertedJob.getJobReference().getJobId()) + .setLocation(location) .execute(); } while (!qResponse.getJobComplete()); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkFailedRowsIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkFailedRowsIT.java index 85ad467f001c..f721f57147e3 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkFailedRowsIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkFailedRowsIT.java @@ -219,21 +219,14 @@ private static String createTable(TableSchema tableSchema) private void assertGoodRowsWritten(String tableSpec, Iterable goodRows) throws IOException, InterruptedException { - TableRow queryResponse; - try { - queryResponse = - Iterables.getOnlyElement( - BQ_CLIENT.queryUnflattened( - String.format("SELECT COUNT(*) FROM %s", tableSpec), - PROJECT, - true, - true, - bigQueryLocation)); - } catch (IOException | InterruptedException e) { - LOG.error("exception: {}", e); - LOG.debug("exception: {}", e); - throw e; - } + TableRow queryResponse = + Iterables.getOnlyElement( + BQ_CLIENT.queryUnflattened( + String.format("SELECT COUNT(*) FROM `%s`", tableSpec), + PROJECT, + true, + true, + bigQueryLocation)); int numRowsWritten = Integer.parseInt((String) queryResponse.get("f0_")); if (useAtLeastOnce) { assertThat(numRowsWritten, Matchers.greaterThanOrEqualTo(Iterables.size(goodRows))); From 88e2ec5f8d177174116e6599cdac9c0f60132534 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Fri, 29 Sep 2023 01:33:09 -0400 Subject: [PATCH 21/26] forgot to add it to one test --- .../sdk/io/gcp/bigquery/StorageApiSinkSchemaUpdateIT.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaUpdateIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaUpdateIT.java index 352a0555d1ef..bc99a4f50f70 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaUpdateIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaUpdateIT.java @@ -121,9 +121,9 @@ public static Iterable data() { // an updated schema. If that happens consistently, just increase these two numbers // to give it more time.
// Total number of rows written to the sink - private static final int TOTAL_N = 60; + private static final int TOTAL_N = 70; // Number of rows with the original schema - private static final int ORIGINAL_N = 50; + private static final int ORIGINAL_N = 60; private final Random randomGenerator = new Random(); @@ -484,7 +484,7 @@ public void checkRowsWithUpdatedSchema( throws IOException, InterruptedException { List actualRows = BQ_CLIENT.queryUnflattened( - String.format("SELECT * FROM [%s]", tableSpec), PROJECT, true, false); + String.format("SELECT * FROM [%s]", tableSpec), PROJECT, true, false, bigQueryLocation); for (TableRow row : actualRows) { // Rows written to the table should not have the extra field if From d87e169ffc285734676ff954320abc947df2457d Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Fri, 29 Sep 2023 11:18:01 -0400 Subject: [PATCH 22/26] checks write permission --- .github/workflows/beam_PostCommit_Java_BigQueryEarlyRollout.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/beam_PostCommit_Java_BigQueryEarlyRollout.yml b/.github/workflows/beam_PostCommit_Java_BigQueryEarlyRollout.yml index 1eeedc9cebc1..eeecb67b94de 100644 --- a/.github/workflows/beam_PostCommit_Java_BigQueryEarlyRollout.yml +++ b/.github/workflows/beam_PostCommit_Java_BigQueryEarlyRollout.yml @@ -33,7 +33,7 @@ concurrency: permissions: actions: write pull-requests: read - checks: read + checks: write contents: read deployments: read id-token: none From 86727d0eabab1752eb70804cca95dedc4c9b75c3 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Fri, 29 Sep 2023 12:02:16 -0400 Subject: [PATCH 23/26] use self-hosted machine --- .../workflows/beam_PostCommit_Java_BigQueryEarlyRollout.yml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/.github/workflows/beam_PostCommit_Java_BigQueryEarlyRollout.yml b/.github/workflows/beam_PostCommit_Java_BigQueryEarlyRollout.yml index eeecb67b94de..952273e810d2 100644 --- a/.github/workflows/beam_PostCommit_Java_BigQueryEarlyRollout.yml +++ b/.github/workflows/beam_PostCommit_Java_BigQueryEarlyRollout.yml @@ -53,8 +53,7 @@ env: jobs: beam_PostCommit_Java_BigQueryEarlyRollout: name: ${{matrix.job_name}} (${{matrix.job_phrase}}) -# runs-on: [self-hosted, ubuntu-20.04, main] - runs-on: ubuntu-20.04 + runs-on: [self-hosted, ubuntu-20.04, main] timeout-minutes: 100 strategy: matrix: From 14a60ca97548b680c519432996d5872d74d8ab2f Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 2 Oct 2023 16:18:34 -0400 Subject: [PATCH 24/26] add some direction on what to do when tests fail; make early rollout region a final value to reference --- sdks/java/io/google-cloud-platform/build.gradle | 7 ++++--- .../beam/sdk/io/gcp/bigquery/TestBigQueryOptions.java | 1 + .../apache/beam/sdk/io/gcp/testing/BigqueryClient.java | 8 +++++--- .../sdk/io/gcp/bigquery/BigQueryIOStorageQueryIT.java | 4 +++- .../beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java | 3 ++- .../io/gcp/bigquery/BigQueryIOStorageReadTableRowIT.java | 4 +++- 6 files changed, 18 insertions(+), 9 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/build.gradle b/sdks/java/io/google-cloud-platform/build.gradle index cd3a61869716..9c7263086360 100644 --- a/sdks/java/io/google-cloud-platform/build.gradle +++ b/sdks/java/io/google-cloud-platform/build.gradle @@ -246,12 +246,13 @@ task integrationTestKms(type: Test) { Integration tests for BigQueryIO that run on BigQuery's early rollout region (us-east7) with the intended purpose of catching breaking changes 
from new BigQuery releases. If these tests fail here but not in `Java_GCP_IO_Direct`, there may be a new BigQuery change - that is breaking the connector. + that is breaking the connector. If this is the case, we should verify with the appropriate + BigQuery infrastructure API team. */ task bigQueryEarlyRolloutIntegrationTest(type: Test, dependsOn: processTestResources) { group = "Verification" - def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing' - def gcpTempRoot = project.findProperty('gcpTempRoot') ?: 'gs://temp-storage-for-bigquery-day0-tests' + def gcpProject = project.findProperty('gcpProject') ?: 'google.com:clouddfe'//'apache-beam-testing' + def gcpTempRoot = project.findProperty('gcpTempRoot') ?: 'gs://ahmedabualsaud-wordcount'// 'gs://temp-storage-for-bigquery-day0-tests' systemProperty "beamTestPipelineOptions", JsonOutput.toJson([ "--runner=DirectRunner", "--project=${gcpProject}", diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TestBigQueryOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TestBigQueryOptions.java index 7000306e62f1..4d8095c1879d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TestBigQueryOptions.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TestBigQueryOptions.java @@ -24,6 +24,7 @@ /** {@link TestPipelineOptions} for {@link TestBigQuery}. */ public interface TestBigQueryOptions extends TestPipelineOptions, BigQueryOptions, GcpOptions { + String BIGQUERY_EARLY_ROLLOUT_REGION = "us-east7"; @Description("Dataset used in the integration tests. Default is integ_test") @Default.String("integ_test") diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigqueryClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigqueryClient.java index 04aceaf354fa..0e9476e6a226 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigqueryClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigqueryClient.java @@ -296,8 +296,8 @@ public List queryUnflattened( } /** - * Performs a query without flattening results. May choose a location to perform this operation - * in. + * Performs a query without flattening results. May choose a location (GCP region) to perform this + * operation in. */ @Nonnull public List queryUnflattened( @@ -416,7 +416,9 @@ public void createNewDataset( createNewDataset(projectId, datasetId, defaultTableExpirationMs, null); } - /** Creates a new dataset with defaultTableExpirationMs and in a specified location. */ + /** + * Creates a new dataset with defaultTableExpirationMs and in a specified location (GCP region). 
+ */ public void createNewDataset( String projectId, String datasetId, diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryIT.java index c87cf440e2e4..d355d6bb9336 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryIT.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.io.gcp.bigquery; +import static org.apache.beam.sdk.io.gcp.bigquery.TestBigQueryOptions.BIGQUERY_EARLY_ROLLOUT_REGION; + import java.util.Map; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; @@ -56,7 +58,7 @@ public class BigQueryIOStorageQueryIT { TestPipeline.testingPipelineOptions() .as(TestBigQueryOptions.class) .getBigQueryLocation() - .equals("us-east7") + .equals(BIGQUERY_EARLY_ROLLOUT_REGION) ? "big_query_storage_day0" : "big_query_storage"; private static final String TABLE_PREFIX = "storage_read_"; diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java index d0ff62da606b..b4f6ddb76f72 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io.gcp.bigquery; +import static org.apache.beam.sdk.io.gcp.bigquery.TestBigQueryOptions.BIGQUERY_EARLY_ROLLOUT_REGION; import static org.junit.Assert.assertEquals; import com.google.cloud.bigquery.storage.v1.DataFormat; @@ -69,7 +70,7 @@ public class BigQueryIOStorageReadIT { TestPipeline.testingPipelineOptions() .as(TestBigQueryOptions.class) .getBigQueryLocation() - .equals("us-east7") + .equals(BIGQUERY_EARLY_ROLLOUT_REGION) ? "big_query_storage_day0" : "big_query_storage"; private static final String TABLE_PREFIX = "storage_read_"; diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTableRowIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTableRowIT.java index 915394d136bf..35e2676c70ef 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTableRowIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTableRowIT.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.io.gcp.bigquery; +import static org.apache.beam.sdk.io.gcp.bigquery.TestBigQueryOptions.BIGQUERY_EARLY_ROLLOUT_REGION; + import com.google.api.services.bigquery.model.TableRow; import java.util.HashSet; import java.util.Set; @@ -56,7 +58,7 @@ public class BigQueryIOStorageReadTableRowIT { TestPipeline.testingPipelineOptions() .as(TestBigQueryOptions.class) .getBigQueryLocation() - .equals("us-east7") + .equals(BIGQUERY_EARLY_ROLLOUT_REGION) ? 
"big_query_import_export_day0" : "big_query_import_export"; private static final String TABLE_PREFIX = "parallel_read_table_row_"; From 2bc98d9a1ed5d9d98d1b7d5fb543926071ba1a4a Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 2 Oct 2023 16:20:16 -0400 Subject: [PATCH 25/26] one more comment --- sdks/java/io/google-cloud-platform/build.gradle | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/build.gradle b/sdks/java/io/google-cloud-platform/build.gradle index 9c7263086360..efc9ff3db9c6 100644 --- a/sdks/java/io/google-cloud-platform/build.gradle +++ b/sdks/java/io/google-cloud-platform/build.gradle @@ -248,11 +248,13 @@ task integrationTestKms(type: Test) { If these tests fail here but not in `Java_GCP_IO_Direct`, there may be a new BigQuery change that is breaking the connector. If this is the case, we should verify with the appropriate BigQuery infrastructure API team. + + To test in a BigQuery location, we just need to create our datasets in that location. */ task bigQueryEarlyRolloutIntegrationTest(type: Test, dependsOn: processTestResources) { group = "Verification" - def gcpProject = project.findProperty('gcpProject') ?: 'google.com:clouddfe'//'apache-beam-testing' - def gcpTempRoot = project.findProperty('gcpTempRoot') ?: 'gs://ahmedabualsaud-wordcount'// 'gs://temp-storage-for-bigquery-day0-tests' + def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing' + def gcpTempRoot = project.findProperty('gcpTempRoot') ?: 'gs://temp-storage-for-bigquery-day0-tests' systemProperty "beamTestPipelineOptions", JsonOutput.toJson([ "--runner=DirectRunner", "--project=${gcpProject}", From 759ce06401779d248043f9443b72e378bdc9d4b3 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 3 Oct 2023 12:59:22 -0400 Subject: [PATCH 26/26] fix time partitioning test --- .../BigQueryTimePartitioningClusteringIT.java | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimePartitioningClusteringIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimePartitioningClusteringIT.java index fcc66db7b223..da5f396e8d89 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimePartitioningClusteringIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimePartitioningClusteringIT.java @@ -137,8 +137,7 @@ public ClusteredDestinations(String tableName) { @Override public TableDestination getDestination(ValueInSingleWindow element) { - return new TableDestination( - String.format("%s.%s", DATASET_NAME, tableName), null, TIME_PARTITIONING, CLUSTERING); + return new TableDestination(tableName, null, TIME_PARTITIONING, CLUSTERING); } @Override @@ -203,6 +202,7 @@ public void testE2EBigQueryClustering() throws Exception { @Test public void testE2EBigQueryClusteringTableFunction() throws Exception { String tableName = "weather_stations_clustered_table_function_" + System.currentTimeMillis(); + String destination = String.format("%s.%s", DATASET_NAME, tableName); Pipeline p = Pipeline.create(options); @@ -212,11 +212,7 @@ public void testE2EBigQueryClusteringTableFunction() throws Exception { BigQueryIO.writeTableRows() .to( (ValueInSingleWindow vsw) -> - new TableDestination( - String.format("%s.%s", DATASET_NAME, tableName), - null, - TIME_PARTITIONING, - 
CLUSTERING)) + new TableDestination(destination, null, TIME_PARTITIONING, CLUSTERING)) .withClustering() .withSchema(SCHEMA) .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) @@ -233,6 +229,7 @@ public void testE2EBigQueryClusteringTableFunction() throws Exception { public void testE2EBigQueryClusteringDynamicDestinations() throws Exception { String tableName = "weather_stations_clustered_dynamic_destinations_" + System.currentTimeMillis(); + String destination = String.format("%s.%s", DATASET_NAME, tableName); Pipeline p = Pipeline.create(options); @@ -240,7 +237,7 @@ public void testE2EBigQueryClusteringDynamicDestinations() throws Exception { .apply(ParDo.of(new KeepStationNumberAndConvertDate())) .apply( BigQueryIO.writeTableRows() - .to(new ClusteredDestinations(tableName)) + .to(new ClusteredDestinations(destination)) .withClustering() .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
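
Taken together, this series threads one new knob through the test infrastructure: a --bigQueryLocation pipeline option on TestBigQueryOptions, a location-aware BigqueryClient.createNewDataset(projectId, datasetId, defaultTableExpirationMs, location), and a location-aware BigqueryClient.queryUnflattened(query, projectId, typed, useStandardSql, location). Below is a minimal sketch (not part of the patches) of how a test suite is expected to wire these together; the class name, dataset id, and table name are illustrative only, and dataset clean-up is elided:

import java.util.List;

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.bigquery.TestBigQueryOptions;
import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
import org.apache.beam.sdk.testing.TestPipeline;

public class LocationAwareBigQuerySketch {
  public static void main(String[] args) throws Exception {
    TestBigQueryOptions options =
        TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class);
    String project = options.as(GcpOptions.class).getProject();
    // Defaults to ""; the early rollout suite passes
    // --bigQueryLocation=us-east7 (BIGQUERY_EARLY_ROLLOUT_REGION).
    String location = options.getBigQueryLocation();

    BigqueryClient client = new BigqueryClient("LocationAwareBigQuerySketch");
    String datasetId = "location_sketch_" + System.nanoTime();

    // The dataset is created in the requested region; tables in it inherit that region.
    client.createNewDataset(project, datasetId, null, location);

    // Query jobs must run in the region that holds the data, so the same location
    // is passed through to the temporary dataset and the getQueryResults call.
    List<TableRow> rows =
        client.queryUnflattened(
            String.format("SELECT COUNT(*) FROM `%s.%s.my_table`", project, datasetId),
            project, /* typed= */ true, /* useStandardSql= */ true, location);
    System.out.println(rows);
  }
}

Passing the location as a trailing @Nullable argument keeps every pre-existing call site source-compatible: the old overloads delegate with null, which preserves the previous behavior of letting BigQuery choose the dataset region.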