From 408f82e7817b53218e5da588f580172645aa3588 Mon Sep 17 00:00:00 2001 From: Evan Greco Date: Fri, 13 Oct 2023 18:17:19 +0000 Subject: [PATCH 01/28] Add integration tests with RetrySettings enabled. --- .../bigquerystorage/WriteCommittedStream.java | 16 ++- .../bigquerystorage/WriteToDefaultStream.java | 14 ++- .../WriteCommittedStreamRetryIT.java | 105 +++++++++++++++++ .../WriteToDefaultStreamRetryIT.java | 110 ++++++++++++++++++ 4 files changed, 239 insertions(+), 6 deletions(-) create mode 100644 samples/snippets/src/test/java/com/example/bigquerystorage/WriteCommittedStreamRetryIT.java create mode 100644 samples/snippets/src/test/java/com/example/bigquerystorage/WriteToDefaultStreamRetryIT.java diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java index c8c9334374..d9dd1ba09d 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java @@ -20,6 +20,7 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; +import com.google.api.gax.retrying.RetrySettings; import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest; @@ -37,6 +38,7 @@ import javax.annotation.concurrent.GuardedBy; import org.json.JSONArray; import org.json.JSONObject; +import org.threeten.bp.Duration; public class WriteCommittedStream { @@ -52,12 +54,17 @@ public static void runWriteCommittedStream() public static void writeCommittedStream(String projectId, String datasetName, String tableName) throws DescriptorValidationException, InterruptedException, IOException { + writeCommittedStream(projectId, datasetName, tableName, null); + } + + public static void writeCommittedStream(String projectId, String datasetName, String tableName, RetrySettings retrySettings) + throws DescriptorValidationException, InterruptedException, IOException { BigQueryWriteClient client = BigQueryWriteClient.create(); TableName parentTable = TableName.of(projectId, datasetName, tableName); DataWriter writer = new DataWriter(); // One time initialization. - writer.initialize(parentTable, client); + writer.initialize(parentTable, client, retrySettings); try { // Write two batches of fake data to the stream, each with 10 JSON records. Data may be @@ -67,7 +74,9 @@ public static void writeCommittedStream(String projectId, String datasetName, St for (int i = 0; i < 2; i++) { // Create a JSON object that is compatible with the table schema. JSONArray jsonArr = new JSONArray(); - for (int j = 0; j < 10; j++) { + // Use odd record count to allow retry testing to not loop, as simulated errors can occur + // every 10 messages. + for (int j = 0; j < 11; j++) { JSONObject record = new JSONObject(); record.put("col1", String.format("batch-record %03d-%03d", i, j)); jsonArr.put(record); @@ -99,7 +108,7 @@ private static class DataWriter { @GuardedBy("lock") private RuntimeException error = null; - void initialize(TableName parentTable, BigQueryWriteClient client) + void initialize(TableName parentTable, BigQueryWriteClient client, RetrySettings retrySettings) throws IOException, DescriptorValidationException, InterruptedException { // Initialize a write stream for the specified table. // For more information on WriteStream.Type, see: @@ -118,6 +127,7 @@ void initialize(TableName parentTable, BigQueryWriteClient client) // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html streamWriter = JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema(), client) + .setRetrySettings(retrySettings) .build(); } diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java index feccef61f0..19ad76815f 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java @@ -22,6 +22,7 @@ import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; import com.google.api.gax.core.FixedExecutorProvider; +import com.google.api.gax.retrying.RetrySettings; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryOptions; import com.google.cloud.bigquery.QueryJobConfiguration; @@ -49,6 +50,7 @@ import javax.annotation.concurrent.GuardedBy; import org.json.JSONArray; import org.json.JSONObject; +import org.threeten.bp.Duration; public class WriteToDefaultStream { @@ -81,11 +83,16 @@ private static JSONObject buildRecord(int i, int j) { public static void writeToDefaultStream(String projectId, String datasetName, String tableName) throws DescriptorValidationException, InterruptedException, IOException { + writeToDefaultStream(projectId, datasetName, tableName, null); + } + + public static void writeToDefaultStream(String projectId, String datasetName, String tableName, RetrySettings retrySettings) + throws DescriptorValidationException, InterruptedException, IOException { TableName parentTable = TableName.of(projectId, datasetName, tableName); DataWriter writer = new DataWriter(); // One time initialization for the worker. - writer.initialize(parentTable); + writer.initialize(parentTable, retrySettings); // Write two batches of fake data to the stream, each with 10 JSON records. Data may be // batched up to the maximum request size: @@ -161,7 +168,7 @@ private static class DataWriter { private AtomicInteger recreateCount = new AtomicInteger(0); - public void initialize(TableName parentTable) + public void initialize(TableName parentTable, RetrySettings retrySettings) throws DescriptorValidationException, IOException, InterruptedException { // Use the JSON stream writer to send records in JSON format. Specify the table name to write // to the default stream. @@ -178,7 +185,8 @@ public void initialize(TableName parentTable) .setKeepAliveWithoutCalls(true) .setChannelsPerCpu(2) .build()) - .setEnableConnectionPool(true) + .setEnableConnectionPool(false) + .setRetrySettings(retrySettings) // If value is missing in json and there is a default value configured on bigquery // column, apply the default value to the missing value field. .setDefaultMissingValueInterpretation( diff --git a/samples/snippets/src/test/java/com/example/bigquerystorage/WriteCommittedStreamRetryIT.java b/samples/snippets/src/test/java/com/example/bigquerystorage/WriteCommittedStreamRetryIT.java new file mode 100644 index 0000000000..3abb854560 --- /dev/null +++ b/samples/snippets/src/test/java/com/example/bigquerystorage/WriteCommittedStreamRetryIT.java @@ -0,0 +1,105 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.bigquerystorage; + +import static com.google.common.truth.Truth.assertThat; +import static junit.framework.TestCase.assertNotNull; + +import com.google.api.gax.retrying.RetrySettings; +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQuery.DatasetDeleteOption; +import com.google.cloud.bigquery.BigQueryOptions; +import com.google.cloud.bigquery.DatasetId; +import com.google.cloud.bigquery.DatasetInfo; +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.StandardSQLTypeName; +import com.google.cloud.bigquery.StandardTableDefinition; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.TableInfo; +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.util.UUID; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.threeten.bp.Duration; + +@RunWith(JUnit4.class) +public class WriteCommittedStreamRetryIT { + + private static final String GOOGLE_CLOUD_PROJECT = System.getenv("GOOGLE_CLOUD_PROJECT"); + + private ByteArrayOutputStream bout; + private PrintStream out; + private BigQuery bigquery; + private String datasetName; + private String tableName; + + private static void requireEnvVar(String varName) { + assertNotNull( + "Environment variable " + varName + " is required to perform these tests.", + System.getenv(varName)); + } + + @BeforeClass + public static void checkRequirements() { + requireEnvVar("GOOGLE_CLOUD_PROJECT"); + } + + @Before + public void setUp() { + bout = new ByteArrayOutputStream(); + out = new PrintStream(bout); + System.setOut(out); + + bigquery = BigQueryOptions.getDefaultInstance().getService(); + + // Create a new dataset and table for each test. + datasetName = "WRITE_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8); + tableName = "COMMITTED_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8); + Schema schema = Schema.of(Field.of("col1", StandardSQLTypeName.STRING)); + bigquery.create(DatasetInfo.newBuilder(datasetName).build()); + TableInfo tableInfo = + TableInfo.newBuilder(TableId.of(datasetName, tableName), StandardTableDefinition.of(schema)) + .build(); + bigquery.create(tableInfo); + } + + @After + public void tearDown() { + bigquery.delete( + DatasetId.of(GOOGLE_CLOUD_PROJECT, datasetName), DatasetDeleteOption.deleteContents()); + System.setOut(null); + } + + @Test + public void testWriteCommittedStreamRetry() throws Exception { + RetrySettings retrySettings = + RetrySettings.newBuilder() + .setInitialRetryDelay(Duration.ofMillis(50)) + .setRetryDelayMultiplier(1.1) + .setMaxAttempts(20) + .setMaxRetryDelay(org.threeten.bp.Duration.ofMinutes(5)) + .build(); + WriteCommittedStream.writeCommittedStream(GOOGLE_CLOUD_PROJECT, datasetName, tableName, retrySettings); + assertThat(bout.toString()).contains("Appended records successfully."); + } +} diff --git a/samples/snippets/src/test/java/com/example/bigquerystorage/WriteToDefaultStreamRetryIT.java b/samples/snippets/src/test/java/com/example/bigquerystorage/WriteToDefaultStreamRetryIT.java new file mode 100644 index 0000000000..74435764fa --- /dev/null +++ b/samples/snippets/src/test/java/com/example/bigquerystorage/WriteToDefaultStreamRetryIT.java @@ -0,0 +1,110 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.bigquerystorage; + +import static com.google.common.truth.Truth.assertThat; +import static junit.framework.TestCase.assertNotNull; + +import com.google.api.gax.retrying.RetrySettings; +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQuery.DatasetDeleteOption; +import com.google.cloud.bigquery.BigQueryOptions; +import com.google.cloud.bigquery.DatasetId; +import com.google.cloud.bigquery.DatasetInfo; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.StandardSQLTypeName; +import com.google.cloud.bigquery.StandardTableDefinition; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.TableInfo; +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.util.UUID; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.threeten.bp.Duration; + +@RunWith(JUnit4.class) +public class WriteToDefaultStreamRetryIT { + + private static final String GOOGLE_CLOUD_PROJECT = System.getenv("GOOGLE_CLOUD_PROJECT"); + + private ByteArrayOutputStream bout; + private PrintStream out; + private BigQuery bigquery; + private String datasetName; + private String tableName; + + private static void requireEnvVar(String varName) { + assertNotNull( + "Environment variable " + varName + " is required to perform these tests.", + System.getenv(varName)); + } + + @BeforeClass + public static void checkRequirements() { + requireEnvVar("GOOGLE_CLOUD_PROJECT"); + } + + @Before + public void setUp() { + bout = new ByteArrayOutputStream(); + out = new PrintStream(bout); + System.setOut(out); + + bigquery = BigQueryOptions.getDefaultInstance().getService(); + + // Create a new dataset and table for each test. + datasetName = "WRITE_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8); + tableName = "DEFAULT_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8); + Schema schema = + Schema.of( + com.google.cloud.bigquery.Field.newBuilder("test_string", StandardSQLTypeName.STRING) + .setMaxLength(20L) + .build(), + com.google.cloud.bigquery.Field.newBuilder("test_bytes", StandardSQLTypeName.BYTES) + .build()); + bigquery.create(DatasetInfo.newBuilder(datasetName).build()); + TableInfo tableInfo = + TableInfo.newBuilder(TableId.of(datasetName, tableName), StandardTableDefinition.of(schema)) + .build(); + bigquery.create(tableInfo); + } + + @After + public void tearDown() { + bigquery.delete( + DatasetId.of(GOOGLE_CLOUD_PROJECT, datasetName), DatasetDeleteOption.deleteContents()); + System.setOut(null); + } + + @Test + public void testWriteToDefaultStreamRetry() throws Exception { + RetrySettings retrySettings = + RetrySettings.newBuilder() + .setInitialRetryDelay(Duration.ofMillis(50)) + .setRetryDelayMultiplier(1.1) + .setMaxAttempts(20) + .setMaxRetryDelay(org.threeten.bp.Duration.ofMinutes(5)) + .build(); + WriteToDefaultStream.writeToDefaultStream(GOOGLE_CLOUD_PROJECT, datasetName, tableName, retrySettings); + assertThat(bout.toString()).contains("Appended records successfully."); + } +} From ab069b57291ed7cc4aad64d5008b296b0283c295 Mon Sep 17 00:00:00 2001 From: Evan Greco Date: Fri, 13 Oct 2023 18:28:14 +0000 Subject: [PATCH 02/28] Fix formatting --- .../java/com/example/bigquerystorage/WriteCommittedStream.java | 3 ++- .../java/com/example/bigquerystorage/WriteToDefaultStream.java | 3 ++- .../example/bigquerystorage/WriteCommittedStreamRetryIT.java | 3 ++- .../example/bigquerystorage/WriteToDefaultStreamRetryIT.java | 3 ++- 4 files changed, 8 insertions(+), 4 deletions(-) diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java index d9dd1ba09d..0bf0d77ee3 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java @@ -57,7 +57,8 @@ public static void writeCommittedStream(String projectId, String datasetName, St writeCommittedStream(projectId, datasetName, tableName, null); } - public static void writeCommittedStream(String projectId, String datasetName, String tableName, RetrySettings retrySettings) + public static void writeCommittedStream( + String projectId, String datasetName, String tableName, RetrySettings retrySettings) throws DescriptorValidationException, InterruptedException, IOException { BigQueryWriteClient client = BigQueryWriteClient.create(); TableName parentTable = TableName.of(projectId, datasetName, tableName); diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java index 19ad76815f..5d23aedc9d 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java @@ -86,7 +86,8 @@ public static void writeToDefaultStream(String projectId, String datasetName, St writeToDefaultStream(projectId, datasetName, tableName, null); } - public static void writeToDefaultStream(String projectId, String datasetName, String tableName, RetrySettings retrySettings) + public static void writeToDefaultStream( + String projectId, String datasetName, String tableName, RetrySettings retrySettings) throws DescriptorValidationException, InterruptedException, IOException { TableName parentTable = TableName.of(projectId, datasetName, tableName); diff --git a/samples/snippets/src/test/java/com/example/bigquerystorage/WriteCommittedStreamRetryIT.java b/samples/snippets/src/test/java/com/example/bigquerystorage/WriteCommittedStreamRetryIT.java index 3abb854560..cdd0fb143b 100644 --- a/samples/snippets/src/test/java/com/example/bigquerystorage/WriteCommittedStreamRetryIT.java +++ b/samples/snippets/src/test/java/com/example/bigquerystorage/WriteCommittedStreamRetryIT.java @@ -99,7 +99,8 @@ public void testWriteCommittedStreamRetry() throws Exception { .setMaxAttempts(20) .setMaxRetryDelay(org.threeten.bp.Duration.ofMinutes(5)) .build(); - WriteCommittedStream.writeCommittedStream(GOOGLE_CLOUD_PROJECT, datasetName, tableName, retrySettings); + WriteCommittedStream.writeCommittedStream( + GOOGLE_CLOUD_PROJECT, datasetName, tableName, retrySettings); assertThat(bout.toString()).contains("Appended records successfully."); } } diff --git a/samples/snippets/src/test/java/com/example/bigquerystorage/WriteToDefaultStreamRetryIT.java b/samples/snippets/src/test/java/com/example/bigquerystorage/WriteToDefaultStreamRetryIT.java index 74435764fa..efba87dd11 100644 --- a/samples/snippets/src/test/java/com/example/bigquerystorage/WriteToDefaultStreamRetryIT.java +++ b/samples/snippets/src/test/java/com/example/bigquerystorage/WriteToDefaultStreamRetryIT.java @@ -104,7 +104,8 @@ public void testWriteToDefaultStreamRetry() throws Exception { .setMaxAttempts(20) .setMaxRetryDelay(org.threeten.bp.Duration.ofMinutes(5)) .build(); - WriteToDefaultStream.writeToDefaultStream(GOOGLE_CLOUD_PROJECT, datasetName, tableName, retrySettings); + WriteToDefaultStream.writeToDefaultStream( + GOOGLE_CLOUD_PROJECT, datasetName, tableName, retrySettings); assertThat(bout.toString()).contains("Appended records successfully."); } } From 51f1f51e112df2ee74b93b8d7b4c800688f5f18b Mon Sep 17 00:00:00 2001 From: Evan Greco Date: Mon, 30 Oct 2023 19:31:54 +0000 Subject: [PATCH 03/28] Add quota and non-quota e2e tests. --- .../it/ITBigQueryWriteNonQuotaRetryTest.java | 277 ++++++++++++++++++ .../v1/it/ITBigQueryWriteQuotaRetryTest.java | 236 +++++++++++++++ 2 files changed, 513 insertions(+) create mode 100644 google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java create mode 100644 google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteQuotaRetryTest.java diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java new file mode 100644 index 0000000000..67f4cbdb9a --- /dev/null +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java @@ -0,0 +1,277 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.bigquery.storage.v1.it; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import com.google.api.core.ApiFuture; +import com.google.api.gax.retrying.RetrySettings; +import com.google.cloud.ServiceOptions; +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.DatasetInfo; +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.FieldValueList; +import com.google.cloud.bigquery.LegacySQLTypeName; +import com.google.cloud.bigquery.QueryJobConfiguration; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.StandardSQLTypeName; +import com.google.cloud.bigquery.StandardTableDefinition; +import com.google.cloud.bigquery.Table; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.TableInfo; +import com.google.cloud.bigquery.TableResult; +import com.google.cloud.bigquery.storage.test.Test.ComplicateType; +import com.google.cloud.bigquery.storage.test.Test.FooTimestampType; +import com.google.cloud.bigquery.storage.test.Test.FooType; +import com.google.cloud.bigquery.storage.test.Test.InnerType; +import com.google.cloud.bigquery.storage.test.Test.UpdatedFooType; +import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; +import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsRequest; +import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsResponse; +import com.google.cloud.bigquery.storage.v1.BigDecimalByteStringEncoder; +import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; +import com.google.cloud.bigquery.storage.v1.CivilTimeEncoder; +import com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool; +import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest; +import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializationError; +import com.google.cloud.bigquery.storage.v1.Exceptions.OffsetAlreadyExists; +import com.google.cloud.bigquery.storage.v1.Exceptions.OffsetOutOfRange; +import com.google.cloud.bigquery.storage.v1.Exceptions.SchemaMismatchedException; +import com.google.cloud.bigquery.storage.v1.Exceptions.StreamFinalizedException; +import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamRequest; +import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamResponse; +import com.google.cloud.bigquery.storage.v1.JsonStreamWriter; +import com.google.cloud.bigquery.storage.v1.ProtoRows; +import com.google.cloud.bigquery.storage.v1.ProtoSchemaConverter; +import com.google.cloud.bigquery.storage.v1.StreamWriter; +import com.google.cloud.bigquery.storage.v1.TableFieldSchema; +import com.google.cloud.bigquery.storage.v1.TableName; +import com.google.cloud.bigquery.storage.v1.TableSchema; +import com.google.cloud.bigquery.storage.v1.WriteStream; +import com.google.cloud.bigquery.testing.RemoteBigQueryHelper; +import com.google.common.collect.ImmutableList; +import com.google.protobuf.ByteString; +import com.google.protobuf.Descriptors.DescriptorValidationException; +import io.grpc.Status; +import io.grpc.Status.Code; +import java.io.IOException; +import java.math.BigDecimal; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Logger; +import org.json.JSONArray; +import org.json.JSONObject; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.threeten.bp.Duration; +import org.threeten.bp.LocalDateTime; + +/** Integration tests for BigQuery Write API. */ +public class ITBigQueryWriteNonQuotaRetryTest { + private static final Logger LOG = + Logger.getLogger(ITBigQueryWriteNonQuotaRetryTest.class.getName()); + private static final String DATASET = RemoteBigQueryHelper.generateDatasetName(); + private static final String TABLE = "testtable"; + private static final String TABLE2 = "complicatedtable"; + private static final String DESCRIPTION = "BigQuery Write Java manual client test dataset"; + private static final String NON_QUOTA_RETRY_PROJECT_ID = "bq-write-api-java-retry-test"; + + private static BigQueryWriteClient client; + private static TableInfo tableInfo; + private static String tableId; + private static BigQuery bigquery; + + @BeforeClass + public static void beforeClass() throws IOException { + client = BigQueryWriteClient.create(); + + RemoteBigQueryHelper bigqueryHelper = RemoteBigQueryHelper.create(); + bigquery = bigqueryHelper.getOptions().getService(); + DatasetInfo datasetInfo = + DatasetInfo.newBuilder(/* datasetId = */ DATASET).setDescription(DESCRIPTION).build(); + bigquery.create(datasetInfo); + LOG.info("Created test dataset: " + DATASET); + tableInfo = + TableInfo.newBuilder( + TableId.of(DATASET, TABLE), + StandardTableDefinition.of( + Schema.of( + Field.newBuilder("foo", LegacySQLTypeName.STRING) + .setMode(Field.Mode.NULLABLE) + .build()))) + .build(); + Field.Builder innerTypeFieldBuilder = + Field.newBuilder( + "inner_type", + LegacySQLTypeName.RECORD, + Field.newBuilder("value", LegacySQLTypeName.STRING) + .setMode(Field.Mode.REPEATED) + .build()); + bigquery.create(tableInfo); + tableId = + String.format( + "projects/%s/datasets/%s/tables/%s", + ServiceOptions.getDefaultProjectId(), DATASET, TABLE); + } + + @AfterClass + public static void afterClass() { + if (client != null) { + client.close(); + } + + if (bigquery != null) { + RemoteBigQueryHelper.forceDelete(bigquery, DATASET); + LOG.info("Deleted test dataset: " + DATASET); + } + } + + @Test + public void testJsonStreamWriterCommittedStreamWithNonQuotaRetry() + throws IOException, InterruptedException, ExecutionException, + DescriptorValidationException { + RetrySettings retrySettings = + RetrySettings.newBuilder() + .setInitialRetryDelay(Duration.ofMillis(500)) + .setRetryDelayMultiplier(1.1) + .setMaxAttempts(5) + .setMaxRetryDelay(Duration.ofMinutes(1)) + .build(); + String tableName = "CommittedRetry"; + TableId tableId = TableId.of(DATASET, tableName); + Field col1 = Field.newBuilder("col1", StandardSQLTypeName.STRING).build(); + Schema schema = Schema.of(col1); + TableInfo tableInfo = TableInfo.newBuilder(tableId, StandardTableDefinition.of(schema)).build(); + bigquery.create(tableInfo); + TableName parent = TableName.of(NON_QUOTA_RETRY_PROJECT_ID, DATASET, tableName); + + WriteStream writeStream = + client.createWriteStream( + CreateWriteStreamRequest.newBuilder() + .setParent(parent.toString()) + .setWriteStream( + WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build()) + .build()); + int totalRequest = 901; + int rowBatch = 1; + ArrayList> allResponses = + new ArrayList>(totalRequest); + try (JsonStreamWriter jsonStreamWriter = + JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()) + .setRetrySettings(retrySettings) + .build()) { + for (int k = 0; k < totalRequest; k++) { + JSONObject row = new JSONObject(); + row.put("col1", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"); + JSONArray jsonArr = new JSONArray(); + // 3MB batch. + for (int j = 0; j < rowBatch; j++) { + jsonArr.put(row); + } + LOG.info("Appending: " + k + "/" + totalRequest); + allResponses.add(jsonStreamWriter.append(jsonArr, k * rowBatch)); + } + LOG.info("Waiting for all responses to come back"); + for (int i = 0; i < totalRequest; i++) { + LOG.info("Waiting for request " + i); + try { + Assert.assertEquals( + allResponses.get(i).get().getAppendResult().getOffset().getValue(), i * rowBatch); + } catch (ExecutionException ex) { + Assert.fail("Unexpected error " + ex); + } + } + } + } + + @Test + public void testJsonStreamWriterDefaultStreamWithNonQuotaRetry() + throws IOException, InterruptedException, ExecutionException, + DescriptorValidationException { + RetrySettings retrySettings = + RetrySettings.newBuilder() + .setInitialRetryDelay(Duration.ofMillis(500)) + .setRetryDelayMultiplier(1.1) + .setMaxAttempts(5) + .setMaxRetryDelay(Duration.ofMinutes(1)) + .build(); + String tableName = "JsonTableDefaultStream"; + TableFieldSchema TEST_STRING = + TableFieldSchema.newBuilder() + .setType(TableFieldSchema.Type.STRING) + .setMode(TableFieldSchema.Mode.NULLABLE) + .setName("test_str") + .build(); + TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_STRING).build(); + TableInfo tableInfo = + TableInfo.newBuilder( + TableId.of(DATASET, tableName), + StandardTableDefinition.of( + Schema.of( + Field.newBuilder( + "test_str", StandardSQLTypeName.STRING) + .build()))) + .build(); + + bigquery.create(tableInfo); + TableName parent = TableName.of(NON_QUOTA_RETRY_PROJECT_ID, DATASET, tableName); + + int totalRequest = 901; + int rowBatch = 1; + ArrayList> allResponses = + new ArrayList>(totalRequest); + try (JsonStreamWriter jsonStreamWriter = + JsonStreamWriter.newBuilder(parent.toString(), tableSchema) + .setIgnoreUnknownFields(true) + .setRetrySettings(retrySettings) + .build()) { + for (int k = 0; k < totalRequest; k++) { + JSONObject row = new JSONObject(); + row.put("test_str", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"); + JSONArray jsonArr = new JSONArray(); + // 3MB batch. + for (int j = 0; j < rowBatch; j++) { + jsonArr.put(row); + } + LOG.info("Appending: " + k + "/" + totalRequest); + allResponses.add(jsonStreamWriter.append(jsonArr)); + } + LOG.info("Waiting for all responses to come back"); + for (int i = 0; i < totalRequest; i++) { + LOG.info("Waiting for request " + i); + try { + assertFalse(allResponses.get(i).get().hasError()); + } catch (Exception ex) { + Assert.fail("Unexpected error " + ex); + } + } + } + } +} diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteQuotaRetryTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteQuotaRetryTest.java new file mode 100644 index 0000000000..cf3c6c0969 --- /dev/null +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteQuotaRetryTest.java @@ -0,0 +1,236 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.bigquery.storage.v1.it; + +import static org.junit.Assert.assertFalse; + +import com.google.api.core.ApiFuture; +import com.google.api.gax.retrying.RetrySettings; +import com.google.cloud.ServiceOptions; +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.DatasetInfo; +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.LegacySQLTypeName; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.StandardSQLTypeName; +import com.google.cloud.bigquery.StandardTableDefinition; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.TableInfo; +import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; +import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; +import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest; +import com.google.cloud.bigquery.storage.v1.JsonStreamWriter; +import com.google.cloud.bigquery.storage.v1.TableFieldSchema; +import com.google.cloud.bigquery.storage.v1.TableName; +import com.google.cloud.bigquery.storage.v1.TableSchema; +import com.google.cloud.bigquery.storage.v1.WriteStream; +import com.google.cloud.bigquery.testing.RemoteBigQueryHelper; +import com.google.protobuf.Descriptors.DescriptorValidationException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.concurrent.ExecutionException; +import java.util.logging.Logger; +import org.json.JSONArray; +import org.json.JSONObject; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.threeten.bp.Duration; + +/** Integration tests for BigQuery Write API. */ +public class ITBigQueryWriteQuotaRetryTest { + private static final Logger LOG = + Logger.getLogger(ITBigQueryWriteQuotaRetryTest.class.getName()); + private static final String DATASET = RemoteBigQueryHelper.generateDatasetName(); + private static final String TABLE = "testtable"; + private static final String TABLE2 = "complicatedtable"; + private static final String DESCRIPTION = "BigQuery Write Java manual client test dataset"; + private static final String QUOTA_RETRY_PROJECT_ID = "bq-writeapi-java-quota-retry"; + + private static BigQueryWriteClient client; + private static TableInfo tableInfo; + private static String tableId; + private static BigQuery bigquery; + + @BeforeClass + public static void beforeClass() throws IOException { + client = BigQueryWriteClient.create(); + + RemoteBigQueryHelper bigqueryHelper = RemoteBigQueryHelper.create(); + bigquery = bigqueryHelper.getOptions().getService(); + DatasetInfo datasetInfo = + DatasetInfo.newBuilder(/* datasetId = */ DATASET).setDescription(DESCRIPTION).build(); + bigquery.create(datasetInfo); + LOG.info("Created test dataset: " + DATASET); + tableInfo = + TableInfo.newBuilder( + TableId.of(DATASET, TABLE), + StandardTableDefinition.of( + Schema.of( + Field.newBuilder("foo", LegacySQLTypeName.STRING) + .setMode(Field.Mode.NULLABLE) + .build()))) + .build(); + Field.Builder innerTypeFieldBuilder = + Field.newBuilder( + "inner_type", + LegacySQLTypeName.RECORD, + Field.newBuilder("value", LegacySQLTypeName.STRING) + .setMode(Field.Mode.REPEATED) + .build()); + bigquery.create(tableInfo); + tableId = + String.format( + "projects/%s/datasets/%s/tables/%s", + ServiceOptions.getDefaultProjectId(), DATASET, TABLE); + } + + @AfterClass + public static void afterClass() { + if (client != null) { + client.close(); + } + + if (bigquery != null) { + RemoteBigQueryHelper.forceDelete(bigquery, DATASET); + LOG.info("Deleted test dataset: " + DATASET); + } + } + + @Test + public void testJsonStreamWriterCommittedStreamWithNonQuotaRetry() + throws IOException, InterruptedException, ExecutionException, + DescriptorValidationException { + RetrySettings retrySettings = + RetrySettings.newBuilder() + .setInitialRetryDelay(Duration.ofMillis(500)) + .setRetryDelayMultiplier(1.1) + .setMaxAttempts(5) + .setMaxRetryDelay(Duration.ofMinutes(1)) + .build(); + String tableName = "CommittedRetry"; + TableId tableId = TableId.of(DATASET, tableName); + Field col1 = Field.newBuilder("col1", StandardSQLTypeName.STRING).build(); + Schema schema = Schema.of(col1); + TableInfo tableInfo = TableInfo.newBuilder(tableId, StandardTableDefinition.of(schema)).build(); + bigquery.create(tableInfo); + TableName parent = TableName.of(QUOTA_RETRY_PROJECT_ID, DATASET, tableName); + + WriteStream writeStream = + client.createWriteStream( + CreateWriteStreamRequest.newBuilder() + .setParent(parent.toString()) + .setWriteStream( + WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build()) + .build()); + int totalRequest = 901; + int rowBatch = 1; + ArrayList> allResponses = + new ArrayList>(totalRequest); + try (JsonStreamWriter jsonStreamWriter = + JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()) + .setRetrySettings(retrySettings) + .build()) { + for (int k = 0; k < totalRequest; k++) { + JSONObject row = new JSONObject(); + row.put("col1", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"); + JSONArray jsonArr = new JSONArray(); + // 3MB batch. + for (int j = 0; j < rowBatch; j++) { + jsonArr.put(row); + } + LOG.info("Appending: " + k + "/" + totalRequest); + allResponses.add(jsonStreamWriter.append(jsonArr, k * rowBatch)); + } + LOG.info("Waiting for all responses to come back"); + for (int i = 0; i < totalRequest; i++) { + LOG.info("Waiting for request " + i); + try { + Assert.assertEquals( + allResponses.get(i).get().getAppendResult().getOffset().getValue(), i * rowBatch); + } catch (ExecutionException ex) { + Assert.fail("Unexpected error " + ex); + } + } + } + } + + @Test + public void testJsonStreamWriterDefaultStreamWithNonQuotaRetry() + throws IOException, InterruptedException, ExecutionException, + DescriptorValidationException { + RetrySettings retrySettings = + RetrySettings.newBuilder() + .setInitialRetryDelay(Duration.ofMillis(500)) + .setRetryDelayMultiplier(1.1) + .setMaxAttempts(5) + .setMaxRetryDelay(Duration.ofMinutes(1)) + .build(); + String tableName = "JsonTableDefaultStream"; + TableFieldSchema TEST_STRING = + TableFieldSchema.newBuilder() + .setType(TableFieldSchema.Type.STRING) + .setMode(TableFieldSchema.Mode.NULLABLE) + .setName("test_str") + .build(); + TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_STRING).build(); + TableInfo tableInfo = + TableInfo.newBuilder( + TableId.of(DATASET, tableName), + StandardTableDefinition.of( + Schema.of( + Field.newBuilder( + "test_str", StandardSQLTypeName.STRING) + .build()))) + .build(); + + bigquery.create(tableInfo); + TableName parent = TableName.of(QUOTA_RETRY_PROJECT_ID, DATASET, tableName); + + int totalRequest = 901; + int rowBatch = 1; + ArrayList> allResponses = + new ArrayList>(totalRequest); + try (JsonStreamWriter jsonStreamWriter = + JsonStreamWriter.newBuilder(parent.toString(), tableSchema) + .setIgnoreUnknownFields(true) + .setRetrySettings(retrySettings) + .build()) { + for (int k = 0; k < totalRequest; k++) { + JSONObject row = new JSONObject(); + row.put("test_str", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"); + JSONArray jsonArr = new JSONArray(); + // 3MB batch. + for (int j = 0; j < rowBatch; j++) { + jsonArr.put(row); + } + LOG.info("Appending: " + k + "/" + totalRequest); + allResponses.add(jsonStreamWriter.append(jsonArr)); + } + LOG.info("Waiting for all responses to come back"); + for (int i = 0; i < totalRequest; i++) { + LOG.info("Waiting for request " + i); + try { + assertFalse(allResponses.get(i).get().hasError()); + } catch (Exception ex) { + Assert.fail("Unexpected error " + ex); + } + } + } + } +} From 8c82567455fbd5925c027775d1037ab62c3707d7 Mon Sep 17 00:00:00 2001 From: Evan Greco Date: Mon, 30 Oct 2023 20:28:45 +0000 Subject: [PATCH 04/28] Cleanup --- .../it/ITBigQueryWriteNonQuotaRetryTest.java | 85 +++---------------- .../v1/it/ITBigQueryWriteQuotaRetryTest.java | 45 +++------- 2 files changed, 28 insertions(+), 102 deletions(-) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java index 67f4cbdb9a..776e2c2114 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java @@ -16,73 +16,32 @@ package com.google.cloud.bigquery.storage.v1.it; -import static com.google.common.truth.Truth.assertThat; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; import com.google.api.core.ApiFuture; import com.google.api.gax.retrying.RetrySettings; -import com.google.cloud.ServiceOptions; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.DatasetInfo; import com.google.cloud.bigquery.Field; -import com.google.cloud.bigquery.FieldValueList; import com.google.cloud.bigquery.LegacySQLTypeName; -import com.google.cloud.bigquery.QueryJobConfiguration; import com.google.cloud.bigquery.Schema; import com.google.cloud.bigquery.StandardSQLTypeName; import com.google.cloud.bigquery.StandardTableDefinition; -import com.google.cloud.bigquery.Table; import com.google.cloud.bigquery.TableId; import com.google.cloud.bigquery.TableInfo; -import com.google.cloud.bigquery.TableResult; -import com.google.cloud.bigquery.storage.test.Test.ComplicateType; -import com.google.cloud.bigquery.storage.test.Test.FooTimestampType; -import com.google.cloud.bigquery.storage.test.Test.FooType; -import com.google.cloud.bigquery.storage.test.Test.InnerType; -import com.google.cloud.bigquery.storage.test.Test.UpdatedFooType; import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; -import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsRequest; -import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsResponse; -import com.google.cloud.bigquery.storage.v1.BigDecimalByteStringEncoder; import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; -import com.google.cloud.bigquery.storage.v1.CivilTimeEncoder; -import com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool; import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest; -import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializationError; -import com.google.cloud.bigquery.storage.v1.Exceptions.OffsetAlreadyExists; -import com.google.cloud.bigquery.storage.v1.Exceptions.OffsetOutOfRange; -import com.google.cloud.bigquery.storage.v1.Exceptions.SchemaMismatchedException; -import com.google.cloud.bigquery.storage.v1.Exceptions.StreamFinalizedException; -import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamRequest; -import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamResponse; import com.google.cloud.bigquery.storage.v1.JsonStreamWriter; -import com.google.cloud.bigquery.storage.v1.ProtoRows; -import com.google.cloud.bigquery.storage.v1.ProtoSchemaConverter; -import com.google.cloud.bigquery.storage.v1.StreamWriter; import com.google.cloud.bigquery.storage.v1.TableFieldSchema; import com.google.cloud.bigquery.storage.v1.TableName; import com.google.cloud.bigquery.storage.v1.TableSchema; import com.google.cloud.bigquery.storage.v1.WriteStream; import com.google.cloud.bigquery.testing.RemoteBigQueryHelper; -import com.google.common.collect.ImmutableList; -import com.google.protobuf.ByteString; import com.google.protobuf.Descriptors.DescriptorValidationException; -import io.grpc.Status; -import io.grpc.Status.Code; import java.io.IOException; -import java.math.BigDecimal; -import java.sql.Timestamp; import java.util.ArrayList; -import java.util.Iterator; -import java.util.Map; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Logger; import org.json.JSONArray; import org.json.JSONObject; @@ -91,7 +50,6 @@ import org.junit.BeforeClass; import org.junit.Test; import org.threeten.bp.Duration; -import org.threeten.bp.LocalDateTime; /** Integration tests for BigQuery Write API. */ public class ITBigQueryWriteNonQuotaRetryTest { @@ -99,13 +57,10 @@ public class ITBigQueryWriteNonQuotaRetryTest { Logger.getLogger(ITBigQueryWriteNonQuotaRetryTest.class.getName()); private static final String DATASET = RemoteBigQueryHelper.generateDatasetName(); private static final String TABLE = "testtable"; - private static final String TABLE2 = "complicatedtable"; private static final String DESCRIPTION = "BigQuery Write Java manual client test dataset"; private static final String NON_QUOTA_RETRY_PROJECT_ID = "bq-write-api-java-retry-test"; private static BigQueryWriteClient client; - private static TableInfo tableInfo; - private static String tableId; private static BigQuery bigquery; @BeforeClass @@ -118,27 +73,15 @@ public static void beforeClass() throws IOException { DatasetInfo.newBuilder(/* datasetId = */ DATASET).setDescription(DESCRIPTION).build(); bigquery.create(datasetInfo); LOG.info("Created test dataset: " + DATASET); - tableInfo = - TableInfo.newBuilder( - TableId.of(DATASET, TABLE), - StandardTableDefinition.of( - Schema.of( - Field.newBuilder("foo", LegacySQLTypeName.STRING) - .setMode(Field.Mode.NULLABLE) - .build()))) - .build(); - Field.Builder innerTypeFieldBuilder = - Field.newBuilder( - "inner_type", - LegacySQLTypeName.RECORD, - Field.newBuilder("value", LegacySQLTypeName.STRING) - .setMode(Field.Mode.REPEATED) - .build()); + TableInfo tableInfo = TableInfo.newBuilder( + TableId.of(DATASET, TABLE), + StandardTableDefinition.of( + Schema.of( + Field.newBuilder("foo", LegacySQLTypeName.STRING) + .setMode(Field.Mode.NULLABLE) + .build()))) + .build(); bigquery.create(tableInfo); - tableId = - String.format( - "projects/%s/datasets/%s/tables/%s", - ServiceOptions.getDefaultProjectId(), DATASET, TABLE); } @AfterClass @@ -155,8 +98,8 @@ public static void afterClass() { @Test public void testJsonStreamWriterCommittedStreamWithNonQuotaRetry() - throws IOException, InterruptedException, ExecutionException, - DescriptorValidationException { + throws IOException, InterruptedException, + DescriptorValidationException { RetrySettings retrySettings = RetrySettings.newBuilder() .setInitialRetryDelay(Duration.ofMillis(500)) @@ -182,7 +125,7 @@ public void testJsonStreamWriterCommittedStreamWithNonQuotaRetry() int totalRequest = 901; int rowBatch = 1; ArrayList> allResponses = - new ArrayList>(totalRequest); + new ArrayList<>(totalRequest); try (JsonStreamWriter jsonStreamWriter = JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()) .setRetrySettings(retrySettings) @@ -213,8 +156,8 @@ public void testJsonStreamWriterCommittedStreamWithNonQuotaRetry() @Test public void testJsonStreamWriterDefaultStreamWithNonQuotaRetry() - throws IOException, InterruptedException, ExecutionException, - DescriptorValidationException { + throws IOException, InterruptedException, + DescriptorValidationException { RetrySettings retrySettings = RetrySettings.newBuilder() .setInitialRetryDelay(Duration.ofMillis(500)) @@ -246,7 +189,7 @@ public void testJsonStreamWriterDefaultStreamWithNonQuotaRetry() int totalRequest = 901; int rowBatch = 1; ArrayList> allResponses = - new ArrayList>(totalRequest); + new ArrayList<>(totalRequest); try (JsonStreamWriter jsonStreamWriter = JsonStreamWriter.newBuilder(parent.toString(), tableSchema) .setIgnoreUnknownFields(true) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteQuotaRetryTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteQuotaRetryTest.java index cf3c6c0969..0cbf3b00b9 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteQuotaRetryTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteQuotaRetryTest.java @@ -20,7 +20,6 @@ import com.google.api.core.ApiFuture; import com.google.api.gax.retrying.RetrySettings; -import com.google.cloud.ServiceOptions; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.DatasetInfo; import com.google.cloud.bigquery.Field; @@ -58,13 +57,9 @@ public class ITBigQueryWriteQuotaRetryTest { Logger.getLogger(ITBigQueryWriteQuotaRetryTest.class.getName()); private static final String DATASET = RemoteBigQueryHelper.generateDatasetName(); private static final String TABLE = "testtable"; - private static final String TABLE2 = "complicatedtable"; private static final String DESCRIPTION = "BigQuery Write Java manual client test dataset"; private static final String QUOTA_RETRY_PROJECT_ID = "bq-writeapi-java-quota-retry"; - private static BigQueryWriteClient client; - private static TableInfo tableInfo; - private static String tableId; private static BigQuery bigquery; @BeforeClass @@ -77,27 +72,15 @@ public static void beforeClass() throws IOException { DatasetInfo.newBuilder(/* datasetId = */ DATASET).setDescription(DESCRIPTION).build(); bigquery.create(datasetInfo); LOG.info("Created test dataset: " + DATASET); - tableInfo = - TableInfo.newBuilder( - TableId.of(DATASET, TABLE), - StandardTableDefinition.of( - Schema.of( - Field.newBuilder("foo", LegacySQLTypeName.STRING) - .setMode(Field.Mode.NULLABLE) - .build()))) - .build(); - Field.Builder innerTypeFieldBuilder = - Field.newBuilder( - "inner_type", - LegacySQLTypeName.RECORD, - Field.newBuilder("value", LegacySQLTypeName.STRING) - .setMode(Field.Mode.REPEATED) - .build()); + TableInfo tableInfo = TableInfo.newBuilder( + TableId.of(DATASET, TABLE), + StandardTableDefinition.of( + Schema.of( + Field.newBuilder("foo", LegacySQLTypeName.STRING) + .setMode(Field.Mode.NULLABLE) + .build()))) + .build(); bigquery.create(tableInfo); - tableId = - String.format( - "projects/%s/datasets/%s/tables/%s", - ServiceOptions.getDefaultProjectId(), DATASET, TABLE); } @AfterClass @@ -114,8 +97,8 @@ public static void afterClass() { @Test public void testJsonStreamWriterCommittedStreamWithNonQuotaRetry() - throws IOException, InterruptedException, ExecutionException, - DescriptorValidationException { + throws IOException, InterruptedException, + DescriptorValidationException { RetrySettings retrySettings = RetrySettings.newBuilder() .setInitialRetryDelay(Duration.ofMillis(500)) @@ -141,7 +124,7 @@ public void testJsonStreamWriterCommittedStreamWithNonQuotaRetry() int totalRequest = 901; int rowBatch = 1; ArrayList> allResponses = - new ArrayList>(totalRequest); + new ArrayList<>(totalRequest); try (JsonStreamWriter jsonStreamWriter = JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()) .setRetrySettings(retrySettings) @@ -172,8 +155,8 @@ public void testJsonStreamWriterCommittedStreamWithNonQuotaRetry() @Test public void testJsonStreamWriterDefaultStreamWithNonQuotaRetry() - throws IOException, InterruptedException, ExecutionException, - DescriptorValidationException { + throws IOException, InterruptedException, + DescriptorValidationException { RetrySettings retrySettings = RetrySettings.newBuilder() .setInitialRetryDelay(Duration.ofMillis(500)) @@ -205,7 +188,7 @@ public void testJsonStreamWriterDefaultStreamWithNonQuotaRetry() int totalRequest = 901; int rowBatch = 1; ArrayList> allResponses = - new ArrayList>(totalRequest); + new ArrayList<>(totalRequest); try (JsonStreamWriter jsonStreamWriter = JsonStreamWriter.newBuilder(parent.toString(), tableSchema) .setIgnoreUnknownFields(true) From 9abfa88a2a9b18264ef32e336bc529f069def578 Mon Sep 17 00:00:00 2001 From: Evan Greco Date: Tue, 31 Oct 2023 17:24:06 +0000 Subject: [PATCH 05/28] Revert sample changes to keep this branch only for e2e tests --- .../bigquerystorage/WriteCommittedStream.java | 17 +++-------------- .../bigquerystorage/WriteToDefaultStream.java | 15 +++------------ 2 files changed, 6 insertions(+), 26 deletions(-) diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java index 0bf0d77ee3..c8c9334374 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java @@ -20,7 +20,6 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; -import com.google.api.gax.retrying.RetrySettings; import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest; @@ -38,7 +37,6 @@ import javax.annotation.concurrent.GuardedBy; import org.json.JSONArray; import org.json.JSONObject; -import org.threeten.bp.Duration; public class WriteCommittedStream { @@ -54,18 +52,12 @@ public static void runWriteCommittedStream() public static void writeCommittedStream(String projectId, String datasetName, String tableName) throws DescriptorValidationException, InterruptedException, IOException { - writeCommittedStream(projectId, datasetName, tableName, null); - } - - public static void writeCommittedStream( - String projectId, String datasetName, String tableName, RetrySettings retrySettings) - throws DescriptorValidationException, InterruptedException, IOException { BigQueryWriteClient client = BigQueryWriteClient.create(); TableName parentTable = TableName.of(projectId, datasetName, tableName); DataWriter writer = new DataWriter(); // One time initialization. - writer.initialize(parentTable, client, retrySettings); + writer.initialize(parentTable, client); try { // Write two batches of fake data to the stream, each with 10 JSON records. Data may be @@ -75,9 +67,7 @@ public static void writeCommittedStream( for (int i = 0; i < 2; i++) { // Create a JSON object that is compatible with the table schema. JSONArray jsonArr = new JSONArray(); - // Use odd record count to allow retry testing to not loop, as simulated errors can occur - // every 10 messages. - for (int j = 0; j < 11; j++) { + for (int j = 0; j < 10; j++) { JSONObject record = new JSONObject(); record.put("col1", String.format("batch-record %03d-%03d", i, j)); jsonArr.put(record); @@ -109,7 +99,7 @@ private static class DataWriter { @GuardedBy("lock") private RuntimeException error = null; - void initialize(TableName parentTable, BigQueryWriteClient client, RetrySettings retrySettings) + void initialize(TableName parentTable, BigQueryWriteClient client) throws IOException, DescriptorValidationException, InterruptedException { // Initialize a write stream for the specified table. // For more information on WriteStream.Type, see: @@ -128,7 +118,6 @@ void initialize(TableName parentTable, BigQueryWriteClient client, RetrySettings // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html streamWriter = JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema(), client) - .setRetrySettings(retrySettings) .build(); } diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java index 5d23aedc9d..feccef61f0 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java @@ -22,7 +22,6 @@ import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; import com.google.api.gax.core.FixedExecutorProvider; -import com.google.api.gax.retrying.RetrySettings; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryOptions; import com.google.cloud.bigquery.QueryJobConfiguration; @@ -50,7 +49,6 @@ import javax.annotation.concurrent.GuardedBy; import org.json.JSONArray; import org.json.JSONObject; -import org.threeten.bp.Duration; public class WriteToDefaultStream { @@ -83,17 +81,11 @@ private static JSONObject buildRecord(int i, int j) { public static void writeToDefaultStream(String projectId, String datasetName, String tableName) throws DescriptorValidationException, InterruptedException, IOException { - writeToDefaultStream(projectId, datasetName, tableName, null); - } - - public static void writeToDefaultStream( - String projectId, String datasetName, String tableName, RetrySettings retrySettings) - throws DescriptorValidationException, InterruptedException, IOException { TableName parentTable = TableName.of(projectId, datasetName, tableName); DataWriter writer = new DataWriter(); // One time initialization for the worker. - writer.initialize(parentTable, retrySettings); + writer.initialize(parentTable); // Write two batches of fake data to the stream, each with 10 JSON records. Data may be // batched up to the maximum request size: @@ -169,7 +161,7 @@ private static class DataWriter { private AtomicInteger recreateCount = new AtomicInteger(0); - public void initialize(TableName parentTable, RetrySettings retrySettings) + public void initialize(TableName parentTable) throws DescriptorValidationException, IOException, InterruptedException { // Use the JSON stream writer to send records in JSON format. Specify the table name to write // to the default stream. @@ -186,8 +178,7 @@ public void initialize(TableName parentTable, RetrySettings retrySettings) .setKeepAliveWithoutCalls(true) .setChannelsPerCpu(2) .build()) - .setEnableConnectionPool(false) - .setRetrySettings(retrySettings) + .setEnableConnectionPool(true) // If value is missing in json and there is a default value configured on bigquery // column, apply the default value to the missing value field. .setDefaultMissingValueInterpretation( From 3bea4df454f5843ea97b76bfe81a459feb190682 Mon Sep 17 00:00:00 2001 From: Evan Greco Date: Tue, 31 Oct 2023 17:32:11 +0000 Subject: [PATCH 06/28] Remove additional retry-specific sample code --- .../WriteCommittedStreamRetryIT.java | 106 ----------------- .../WriteToDefaultStreamRetryIT.java | 111 ------------------ 2 files changed, 217 deletions(-) delete mode 100644 samples/snippets/src/test/java/com/example/bigquerystorage/WriteCommittedStreamRetryIT.java delete mode 100644 samples/snippets/src/test/java/com/example/bigquerystorage/WriteToDefaultStreamRetryIT.java diff --git a/samples/snippets/src/test/java/com/example/bigquerystorage/WriteCommittedStreamRetryIT.java b/samples/snippets/src/test/java/com/example/bigquerystorage/WriteCommittedStreamRetryIT.java deleted file mode 100644 index cdd0fb143b..0000000000 --- a/samples/snippets/src/test/java/com/example/bigquerystorage/WriteCommittedStreamRetryIT.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Copyright 2020 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.example.bigquerystorage; - -import static com.google.common.truth.Truth.assertThat; -import static junit.framework.TestCase.assertNotNull; - -import com.google.api.gax.retrying.RetrySettings; -import com.google.cloud.bigquery.BigQuery; -import com.google.cloud.bigquery.BigQuery.DatasetDeleteOption; -import com.google.cloud.bigquery.BigQueryOptions; -import com.google.cloud.bigquery.DatasetId; -import com.google.cloud.bigquery.DatasetInfo; -import com.google.cloud.bigquery.Field; -import com.google.cloud.bigquery.Schema; -import com.google.cloud.bigquery.StandardSQLTypeName; -import com.google.cloud.bigquery.StandardTableDefinition; -import com.google.cloud.bigquery.TableId; -import com.google.cloud.bigquery.TableInfo; -import java.io.ByteArrayOutputStream; -import java.io.PrintStream; -import java.util.UUID; -import org.junit.After; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.threeten.bp.Duration; - -@RunWith(JUnit4.class) -public class WriteCommittedStreamRetryIT { - - private static final String GOOGLE_CLOUD_PROJECT = System.getenv("GOOGLE_CLOUD_PROJECT"); - - private ByteArrayOutputStream bout; - private PrintStream out; - private BigQuery bigquery; - private String datasetName; - private String tableName; - - private static void requireEnvVar(String varName) { - assertNotNull( - "Environment variable " + varName + " is required to perform these tests.", - System.getenv(varName)); - } - - @BeforeClass - public static void checkRequirements() { - requireEnvVar("GOOGLE_CLOUD_PROJECT"); - } - - @Before - public void setUp() { - bout = new ByteArrayOutputStream(); - out = new PrintStream(bout); - System.setOut(out); - - bigquery = BigQueryOptions.getDefaultInstance().getService(); - - // Create a new dataset and table for each test. - datasetName = "WRITE_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8); - tableName = "COMMITTED_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8); - Schema schema = Schema.of(Field.of("col1", StandardSQLTypeName.STRING)); - bigquery.create(DatasetInfo.newBuilder(datasetName).build()); - TableInfo tableInfo = - TableInfo.newBuilder(TableId.of(datasetName, tableName), StandardTableDefinition.of(schema)) - .build(); - bigquery.create(tableInfo); - } - - @After - public void tearDown() { - bigquery.delete( - DatasetId.of(GOOGLE_CLOUD_PROJECT, datasetName), DatasetDeleteOption.deleteContents()); - System.setOut(null); - } - - @Test - public void testWriteCommittedStreamRetry() throws Exception { - RetrySettings retrySettings = - RetrySettings.newBuilder() - .setInitialRetryDelay(Duration.ofMillis(50)) - .setRetryDelayMultiplier(1.1) - .setMaxAttempts(20) - .setMaxRetryDelay(org.threeten.bp.Duration.ofMinutes(5)) - .build(); - WriteCommittedStream.writeCommittedStream( - GOOGLE_CLOUD_PROJECT, datasetName, tableName, retrySettings); - assertThat(bout.toString()).contains("Appended records successfully."); - } -} diff --git a/samples/snippets/src/test/java/com/example/bigquerystorage/WriteToDefaultStreamRetryIT.java b/samples/snippets/src/test/java/com/example/bigquerystorage/WriteToDefaultStreamRetryIT.java deleted file mode 100644 index efba87dd11..0000000000 --- a/samples/snippets/src/test/java/com/example/bigquerystorage/WriteToDefaultStreamRetryIT.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Copyright 2021 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.example.bigquerystorage; - -import static com.google.common.truth.Truth.assertThat; -import static junit.framework.TestCase.assertNotNull; - -import com.google.api.gax.retrying.RetrySettings; -import com.google.cloud.bigquery.BigQuery; -import com.google.cloud.bigquery.BigQuery.DatasetDeleteOption; -import com.google.cloud.bigquery.BigQueryOptions; -import com.google.cloud.bigquery.DatasetId; -import com.google.cloud.bigquery.DatasetInfo; -import com.google.cloud.bigquery.Schema; -import com.google.cloud.bigquery.StandardSQLTypeName; -import com.google.cloud.bigquery.StandardTableDefinition; -import com.google.cloud.bigquery.TableId; -import com.google.cloud.bigquery.TableInfo; -import java.io.ByteArrayOutputStream; -import java.io.PrintStream; -import java.util.UUID; -import org.junit.After; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.threeten.bp.Duration; - -@RunWith(JUnit4.class) -public class WriteToDefaultStreamRetryIT { - - private static final String GOOGLE_CLOUD_PROJECT = System.getenv("GOOGLE_CLOUD_PROJECT"); - - private ByteArrayOutputStream bout; - private PrintStream out; - private BigQuery bigquery; - private String datasetName; - private String tableName; - - private static void requireEnvVar(String varName) { - assertNotNull( - "Environment variable " + varName + " is required to perform these tests.", - System.getenv(varName)); - } - - @BeforeClass - public static void checkRequirements() { - requireEnvVar("GOOGLE_CLOUD_PROJECT"); - } - - @Before - public void setUp() { - bout = new ByteArrayOutputStream(); - out = new PrintStream(bout); - System.setOut(out); - - bigquery = BigQueryOptions.getDefaultInstance().getService(); - - // Create a new dataset and table for each test. - datasetName = "WRITE_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8); - tableName = "DEFAULT_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8); - Schema schema = - Schema.of( - com.google.cloud.bigquery.Field.newBuilder("test_string", StandardSQLTypeName.STRING) - .setMaxLength(20L) - .build(), - com.google.cloud.bigquery.Field.newBuilder("test_bytes", StandardSQLTypeName.BYTES) - .build()); - bigquery.create(DatasetInfo.newBuilder(datasetName).build()); - TableInfo tableInfo = - TableInfo.newBuilder(TableId.of(datasetName, tableName), StandardTableDefinition.of(schema)) - .build(); - bigquery.create(tableInfo); - } - - @After - public void tearDown() { - bigquery.delete( - DatasetId.of(GOOGLE_CLOUD_PROJECT, datasetName), DatasetDeleteOption.deleteContents()); - System.setOut(null); - } - - @Test - public void testWriteToDefaultStreamRetry() throws Exception { - RetrySettings retrySettings = - RetrySettings.newBuilder() - .setInitialRetryDelay(Duration.ofMillis(50)) - .setRetryDelayMultiplier(1.1) - .setMaxAttempts(20) - .setMaxRetryDelay(org.threeten.bp.Duration.ofMinutes(5)) - .build(); - WriteToDefaultStream.writeToDefaultStream( - GOOGLE_CLOUD_PROJECT, datasetName, tableName, retrySettings); - assertThat(bout.toString()).contains("Appended records successfully."); - } -} From 97614b1230d64982442fd9da1b1ffbb17c0d7ece Mon Sep 17 00:00:00 2001 From: Evan Greco Date: Tue, 31 Oct 2023 17:36:13 +0000 Subject: [PATCH 07/28] Remove backoff-related retry settings for non quota tests --- .../storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java index 776e2c2114..150021030b 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java @@ -102,10 +102,7 @@ public void testJsonStreamWriterCommittedStreamWithNonQuotaRetry() DescriptorValidationException { RetrySettings retrySettings = RetrySettings.newBuilder() - .setInitialRetryDelay(Duration.ofMillis(500)) - .setRetryDelayMultiplier(1.1) .setMaxAttempts(5) - .setMaxRetryDelay(Duration.ofMinutes(1)) .build(); String tableName = "CommittedRetry"; TableId tableId = TableId.of(DATASET, tableName); From 52cb39e8bf5b9a7acafdfa05eeb636836752d905 Mon Sep 17 00:00:00 2001 From: Evan Greco Date: Tue, 31 Oct 2023 19:50:27 +0000 Subject: [PATCH 08/28] Update kokoro build to ignore retry tests until retry-specific Kokoro configurations are in place. Add comments explaining usage of retry-specific projects. --- .kokoro/build.sh | 1 + .../storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java | 2 ++ .../bigquery/storage/v1/it/ITBigQueryWriteQuotaRetryTest.java | 2 ++ 3 files changed, 5 insertions(+) diff --git a/.kokoro/build.sh b/.kokoro/build.sh index 53e7c2e74b..cec58c7da5 100755 --- a/.kokoro/build.sh +++ b/.kokoro/build.sh @@ -65,6 +65,7 @@ integration) -DtrimStackTrace=false \ -Dclirr.skip=true \ -Denforcer.skip=true \ + -Dtest=\!ITBigQueryWriteNonQuotaRetryTest,\!ITBigQueryWriteQuotaRetryTest -fae \ verify RETURN_CODE=$? diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java index 150021030b..f7eec6edc8 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java @@ -58,6 +58,8 @@ public class ITBigQueryWriteNonQuotaRetryTest { private static final String DATASET = RemoteBigQueryHelper.generateDatasetName(); private static final String TABLE = "testtable"; private static final String DESCRIPTION = "BigQuery Write Java manual client test dataset"; + // This project is configured on the server to inject INTERNAL in-stream errors every 10 messages. + // This is done to verify in-stream message retries. private static final String NON_QUOTA_RETRY_PROJECT_ID = "bq-write-api-java-retry-test"; private static BigQueryWriteClient client; diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteQuotaRetryTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteQuotaRetryTest.java index 0cbf3b00b9..0ecf005329 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteQuotaRetryTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteQuotaRetryTest.java @@ -58,6 +58,8 @@ public class ITBigQueryWriteQuotaRetryTest { private static final String DATASET = RemoteBigQueryHelper.generateDatasetName(); private static final String TABLE = "testtable"; private static final String DESCRIPTION = "BigQuery Write Java manual client test dataset"; + // This project is configured on the server to inject RESOURCE_EXHAUSTED in-stream errors every + // 10 messages. This is done to verify in-stream message retries. private static final String QUOTA_RETRY_PROJECT_ID = "bq-writeapi-java-quota-retry"; private static BigQueryWriteClient client; private static BigQuery bigquery; From 9e8506e582b4144fc5aa9efe2d48ba86a0b944ce Mon Sep 17 00:00:00 2001 From: Evan Greco Date: Tue, 31 Oct 2023 19:52:56 +0000 Subject: [PATCH 09/28] Run format --- .../it/ITBigQueryWriteNonQuotaRetryTest.java | 39 +++++++------------ .../v1/it/ITBigQueryWriteQuotaRetryTest.java | 37 +++++++----------- 2 files changed, 30 insertions(+), 46 deletions(-) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java index f7eec6edc8..19f400dda3 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java @@ -75,14 +75,15 @@ public static void beforeClass() throws IOException { DatasetInfo.newBuilder(/* datasetId = */ DATASET).setDescription(DESCRIPTION).build(); bigquery.create(datasetInfo); LOG.info("Created test dataset: " + DATASET); - TableInfo tableInfo = TableInfo.newBuilder( - TableId.of(DATASET, TABLE), - StandardTableDefinition.of( - Schema.of( - Field.newBuilder("foo", LegacySQLTypeName.STRING) - .setMode(Field.Mode.NULLABLE) - .build()))) - .build(); + TableInfo tableInfo = + TableInfo.newBuilder( + TableId.of(DATASET, TABLE), + StandardTableDefinition.of( + Schema.of( + Field.newBuilder("foo", LegacySQLTypeName.STRING) + .setMode(Field.Mode.NULLABLE) + .build()))) + .build(); bigquery.create(tableInfo); } @@ -100,12 +101,8 @@ public static void afterClass() { @Test public void testJsonStreamWriterCommittedStreamWithNonQuotaRetry() - throws IOException, InterruptedException, - DescriptorValidationException { - RetrySettings retrySettings = - RetrySettings.newBuilder() - .setMaxAttempts(5) - .build(); + throws IOException, InterruptedException, DescriptorValidationException { + RetrySettings retrySettings = RetrySettings.newBuilder().setMaxAttempts(5).build(); String tableName = "CommittedRetry"; TableId tableId = TableId.of(DATASET, tableName); Field col1 = Field.newBuilder("col1", StandardSQLTypeName.STRING).build(); @@ -123,8 +120,7 @@ public void testJsonStreamWriterCommittedStreamWithNonQuotaRetry() .build()); int totalRequest = 901; int rowBatch = 1; - ArrayList> allResponses = - new ArrayList<>(totalRequest); + ArrayList> allResponses = new ArrayList<>(totalRequest); try (JsonStreamWriter jsonStreamWriter = JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()) .setRetrySettings(retrySettings) @@ -155,8 +151,7 @@ public void testJsonStreamWriterCommittedStreamWithNonQuotaRetry() @Test public void testJsonStreamWriterDefaultStreamWithNonQuotaRetry() - throws IOException, InterruptedException, - DescriptorValidationException { + throws IOException, InterruptedException, DescriptorValidationException { RetrySettings retrySettings = RetrySettings.newBuilder() .setInitialRetryDelay(Duration.ofMillis(500)) @@ -176,10 +171,7 @@ public void testJsonStreamWriterDefaultStreamWithNonQuotaRetry() TableInfo.newBuilder( TableId.of(DATASET, tableName), StandardTableDefinition.of( - Schema.of( - Field.newBuilder( - "test_str", StandardSQLTypeName.STRING) - .build()))) + Schema.of(Field.newBuilder("test_str", StandardSQLTypeName.STRING).build()))) .build(); bigquery.create(tableInfo); @@ -187,8 +179,7 @@ public void testJsonStreamWriterDefaultStreamWithNonQuotaRetry() int totalRequest = 901; int rowBatch = 1; - ArrayList> allResponses = - new ArrayList<>(totalRequest); + ArrayList> allResponses = new ArrayList<>(totalRequest); try (JsonStreamWriter jsonStreamWriter = JsonStreamWriter.newBuilder(parent.toString(), tableSchema) .setIgnoreUnknownFields(true) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteQuotaRetryTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteQuotaRetryTest.java index 0ecf005329..046411bcbd 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteQuotaRetryTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteQuotaRetryTest.java @@ -53,8 +53,7 @@ /** Integration tests for BigQuery Write API. */ public class ITBigQueryWriteQuotaRetryTest { - private static final Logger LOG = - Logger.getLogger(ITBigQueryWriteQuotaRetryTest.class.getName()); + private static final Logger LOG = Logger.getLogger(ITBigQueryWriteQuotaRetryTest.class.getName()); private static final String DATASET = RemoteBigQueryHelper.generateDatasetName(); private static final String TABLE = "testtable"; private static final String DESCRIPTION = "BigQuery Write Java manual client test dataset"; @@ -74,14 +73,15 @@ public static void beforeClass() throws IOException { DatasetInfo.newBuilder(/* datasetId = */ DATASET).setDescription(DESCRIPTION).build(); bigquery.create(datasetInfo); LOG.info("Created test dataset: " + DATASET); - TableInfo tableInfo = TableInfo.newBuilder( - TableId.of(DATASET, TABLE), - StandardTableDefinition.of( - Schema.of( - Field.newBuilder("foo", LegacySQLTypeName.STRING) - .setMode(Field.Mode.NULLABLE) - .build()))) - .build(); + TableInfo tableInfo = + TableInfo.newBuilder( + TableId.of(DATASET, TABLE), + StandardTableDefinition.of( + Schema.of( + Field.newBuilder("foo", LegacySQLTypeName.STRING) + .setMode(Field.Mode.NULLABLE) + .build()))) + .build(); bigquery.create(tableInfo); } @@ -99,8 +99,7 @@ public static void afterClass() { @Test public void testJsonStreamWriterCommittedStreamWithNonQuotaRetry() - throws IOException, InterruptedException, - DescriptorValidationException { + throws IOException, InterruptedException, DescriptorValidationException { RetrySettings retrySettings = RetrySettings.newBuilder() .setInitialRetryDelay(Duration.ofMillis(500)) @@ -125,8 +124,7 @@ public void testJsonStreamWriterCommittedStreamWithNonQuotaRetry() .build()); int totalRequest = 901; int rowBatch = 1; - ArrayList> allResponses = - new ArrayList<>(totalRequest); + ArrayList> allResponses = new ArrayList<>(totalRequest); try (JsonStreamWriter jsonStreamWriter = JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()) .setRetrySettings(retrySettings) @@ -157,8 +155,7 @@ public void testJsonStreamWriterCommittedStreamWithNonQuotaRetry() @Test public void testJsonStreamWriterDefaultStreamWithNonQuotaRetry() - throws IOException, InterruptedException, - DescriptorValidationException { + throws IOException, InterruptedException, DescriptorValidationException { RetrySettings retrySettings = RetrySettings.newBuilder() .setInitialRetryDelay(Duration.ofMillis(500)) @@ -178,10 +175,7 @@ public void testJsonStreamWriterDefaultStreamWithNonQuotaRetry() TableInfo.newBuilder( TableId.of(DATASET, tableName), StandardTableDefinition.of( - Schema.of( - Field.newBuilder( - "test_str", StandardSQLTypeName.STRING) - .build()))) + Schema.of(Field.newBuilder("test_str", StandardSQLTypeName.STRING).build()))) .build(); bigquery.create(tableInfo); @@ -189,8 +183,7 @@ public void testJsonStreamWriterDefaultStreamWithNonQuotaRetry() int totalRequest = 901; int rowBatch = 1; - ArrayList> allResponses = - new ArrayList<>(totalRequest); + ArrayList> allResponses = new ArrayList<>(totalRequest); try (JsonStreamWriter jsonStreamWriter = JsonStreamWriter.newBuilder(parent.toString(), tableSchema) .setIgnoreUnknownFields(true) From 872acffc6347b825d2c1e01509214f7b2e87362f Mon Sep 17 00:00:00 2001 From: Evan Greco Date: Tue, 31 Oct 2023 20:36:03 +0000 Subject: [PATCH 10/28] Wrap -Dtest args --- .kokoro/build.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.kokoro/build.sh b/.kokoro/build.sh index cec58c7da5..9b06db04e8 100755 --- a/.kokoro/build.sh +++ b/.kokoro/build.sh @@ -65,7 +65,7 @@ integration) -DtrimStackTrace=false \ -Dclirr.skip=true \ -Denforcer.skip=true \ - -Dtest=\!ITBigQueryWriteNonQuotaRetryTest,\!ITBigQueryWriteQuotaRetryTest + -Dtest="\!ITBigQueryWriteNonQuotaRetryTest,\!ITBigQueryWriteQuotaRetryTest" -fae \ verify RETURN_CODE=$? From 610f03210e326444358ad6e29a7bb597f28af51e Mon Sep 17 00:00:00 2001 From: Evan Greco Date: Tue, 31 Oct 2023 20:48:56 +0000 Subject: [PATCH 11/28] Fix integration command --- .kokoro/build.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.kokoro/build.sh b/.kokoro/build.sh index 9b06db04e8..fc60a756f8 100755 --- a/.kokoro/build.sh +++ b/.kokoro/build.sh @@ -65,7 +65,7 @@ integration) -DtrimStackTrace=false \ -Dclirr.skip=true \ -Denforcer.skip=true \ - -Dtest="\!ITBigQueryWriteNonQuotaRetryTest,\!ITBigQueryWriteQuotaRetryTest" + -Dtest="\!ITBigQueryWriteNonQuotaRetryTest,\!ITBigQueryWriteQuotaRetryTest" \ -fae \ verify RETURN_CODE=$? From 18f442c38f7520c0761e308375783bb989fd1e60 Mon Sep 17 00:00:00 2001 From: Evan Greco Date: Tue, 31 Oct 2023 20:56:42 +0000 Subject: [PATCH 12/28] Fix integration command --- .kokoro/build.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.kokoro/build.sh b/.kokoro/build.sh index fc60a756f8..614c515195 100755 --- a/.kokoro/build.sh +++ b/.kokoro/build.sh @@ -65,7 +65,7 @@ integration) -DtrimStackTrace=false \ -Dclirr.skip=true \ -Denforcer.skip=true \ - -Dtest="\!ITBigQueryWriteNonQuotaRetryTest,\!ITBigQueryWriteQuotaRetryTest" \ + -Dtest="!ITBigQueryWrite*RetryTest" \ -fae \ verify RETURN_CODE=$? From 0bbc3ff9d67134ff04612df746f60544a56aba3a Mon Sep 17 00:00:00 2001 From: Evan Greco Date: Wed, 1 Nov 2023 16:29:44 +0000 Subject: [PATCH 13/28] Remove -Dtest arg --- .kokoro/build.sh | 1 - 1 file changed, 1 deletion(-) diff --git a/.kokoro/build.sh b/.kokoro/build.sh index 614c515195..53e7c2e74b 100755 --- a/.kokoro/build.sh +++ b/.kokoro/build.sh @@ -65,7 +65,6 @@ integration) -DtrimStackTrace=false \ -Dclirr.skip=true \ -Denforcer.skip=true \ - -Dtest="!ITBigQueryWrite*RetryTest" \ -fae \ verify RETURN_CODE=$? From f39a19d316733ccb2df77160ae70e38f0138c446 Mon Sep 17 00:00:00 2001 From: Evan Greco Date: Fri, 3 Nov 2023 17:55:19 +0000 Subject: [PATCH 14/28] Fix integration test to ignore retry tests --- .kokoro/build.sh | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.kokoro/build.sh b/.kokoro/build.sh index 53e7c2e74b..d6dc21cb30 100755 --- a/.kokoro/build.sh +++ b/.kokoro/build.sh @@ -65,6 +65,8 @@ integration) -DtrimStackTrace=false \ -Dclirr.skip=true \ -Denforcer.skip=true \ + -Dtest=!ITBigQueryWrite*RetryTest \ + -Dsurefire.failIfNoSpecifiedTests=false \ -fae \ verify RETURN_CODE=$? From 54011435aa81bb5a8055c4670ff7a858d88e58eb Mon Sep 17 00:00:00 2001 From: Evan Greco Date: Fri, 3 Nov 2023 18:41:16 +0000 Subject: [PATCH 15/28] Use list instead of regex for ignoring retry tests when integration testing --- .kokoro/build.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.kokoro/build.sh b/.kokoro/build.sh index d6dc21cb30..fff95ec0d6 100755 --- a/.kokoro/build.sh +++ b/.kokoro/build.sh @@ -65,7 +65,7 @@ integration) -DtrimStackTrace=false \ -Dclirr.skip=true \ -Denforcer.skip=true \ - -Dtest=!ITBigQueryWrite*RetryTest \ + -Dtest=!ITBigQueryWriteQuotaRetryTest,!ITBigQueryWriteNoneQuotaRetryTest \ -Dsurefire.failIfNoSpecifiedTests=false \ -fae \ verify From 5984f9fe87ab86b8d587387c4e1c45b49af6ce82 Mon Sep 17 00:00:00 2001 From: Evan Greco Date: Fri, 3 Nov 2023 19:16:51 +0000 Subject: [PATCH 16/28] Fix typo in integration test command --- .kokoro/build.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.kokoro/build.sh b/.kokoro/build.sh index fff95ec0d6..5f72a2eead 100755 --- a/.kokoro/build.sh +++ b/.kokoro/build.sh @@ -65,7 +65,7 @@ integration) -DtrimStackTrace=false \ -Dclirr.skip=true \ -Denforcer.skip=true \ - -Dtest=!ITBigQueryWriteQuotaRetryTest,!ITBigQueryWriteNoneQuotaRetryTest \ + -Dtest=!ITBigQueryWriteQuotaRetryTest,!ITBigQueryWriteNonQuotaRetryTest \ -Dsurefire.failIfNoSpecifiedTests=false \ -fae \ verify From 8a44d41f12d770e793cb1ebdc9383fac0e68796c Mon Sep 17 00:00:00 2001 From: Evan Greco Date: Wed, 8 Nov 2023 15:34:23 +0000 Subject: [PATCH 17/28] Fix ignore retry test settings --- .kokoro/build.sh | 3 ++- pom.xml | 6 ++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/.kokoro/build.sh b/.kokoro/build.sh index 5f72a2eead..2a2637e002 100755 --- a/.kokoro/build.sh +++ b/.kokoro/build.sh @@ -65,8 +65,9 @@ integration) -DtrimStackTrace=false \ -Dclirr.skip=true \ -Denforcer.skip=true \ - -Dtest=!ITBigQueryWriteQuotaRetryTest,!ITBigQueryWriteNonQuotaRetryTest \ + -Dit.test=!ITBigQueryWrite*RetryTest \ -Dsurefire.failIfNoSpecifiedTests=false \ + -Dfailsafe.failIfNoSpecifiedTests=false \ -fae \ verify RETURN_CODE=$? diff --git a/pom.xml b/pom.xml index b1dd675bdf..9a3fc4cdb0 100644 --- a/pom.xml +++ b/pom.xml @@ -261,6 +261,12 @@ tutorials + + custom-tests + + **/ITBigQueryWrite*RetryTest.java + + From 8432f8086362ac7178035c854f46c7e5760a0725 Mon Sep 17 00:00:00 2001 From: Evan Greco Date: Wed, 8 Nov 2023 17:35:51 +0000 Subject: [PATCH 18/28] Add debug logs to see why test fails in github --- .../java/com/google/cloud/bigquery/storage/v1/StreamWriter.java | 1 + .../cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java | 1 + 2 files changed, 2 insertions(+) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index 538bec4e32..da71bcae60 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -375,6 +375,7 @@ private void validateFetchedConnectonPool(StreamWriter.Builder builder) { if (!Objects.equals( this.singleConnectionOrConnectionPool.connectionWorkerPool().getTraceId(), builder.traceId)) { + log.info(String.format("first: %s, second: %s", this.singleConnectionOrConnectionPool.connectionWorkerPool().getTraceId(), builder.traceId)); paramsValidatedFailed = "Trace id"; } else if (!Objects.equals( this.singleConnectionOrConnectionPool.connectionWorkerPool().limitExceededBehavior(), diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java index 0724f33546..7a378ed44c 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java @@ -395,6 +395,7 @@ public void testCloseExternalClient() externalClient) .setWriterSchema(createProtoSchema()) .setTraceId(TEST_TRACE_ID) + .setLocation("us") .setEnableConnectionPool(true) .build(); streamWriterList.add(sw); From 385d3ea2bc3a66b669e3761ef6750d3b6711eb9d Mon Sep 17 00:00:00 2001 From: Evan Greco Date: Wed, 8 Nov 2023 18:04:59 +0000 Subject: [PATCH 19/28] Remove log --- .../java/com/google/cloud/bigquery/storage/v1/StreamWriter.java | 1 - 1 file changed, 1 deletion(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index da71bcae60..538bec4e32 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -375,7 +375,6 @@ private void validateFetchedConnectonPool(StreamWriter.Builder builder) { if (!Objects.equals( this.singleConnectionOrConnectionPool.connectionWorkerPool().getTraceId(), builder.traceId)) { - log.info(String.format("first: %s, second: %s", this.singleConnectionOrConnectionPool.connectionWorkerPool().getTraceId(), builder.traceId)); paramsValidatedFailed = "Trace id"; } else if (!Objects.equals( this.singleConnectionOrConnectionPool.connectionWorkerPool().limitExceededBehavior(), From 1de97ceef34b9f8480ee6ca3c6f0d051250e4952 Mon Sep 17 00:00:00 2001 From: Evan Greco Date: Wed, 8 Nov 2023 18:38:49 +0000 Subject: [PATCH 20/28] Add additional retry-based logging --- .../google/cloud/bigquery/storage/v1/ConnectionWorker.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 52373596ce..98825fadd8 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -985,14 +985,14 @@ private Boolean retryOnRetryableError(Code errorCode, AppendRequestAndResponse r Long offset = requestWrapper.message.hasOffset() ? requestWrapper.message.getOffset().getValue() : -1; if (isDefaultStreamName(streamName) || offset == -1) { - log.fine( + log.info( String.format( "Retrying default stream message in stream %s for in-stream error: %s, retry count:" + " %s", streamName, errorCode, requestWrapper.retryCount)); addMessageToFrontOfWaitingQueue(requestWrapper); } else { - log.fine( + log.info( String.format( "Retrying exclusive message in stream %s at offset %d for in-stream error: %s, retry" + " count: %s", @@ -1089,6 +1089,7 @@ private void requestCallback(AppendRowsResponse response) { // Retries need to happen on the same thread as queue locking may occur if (response.hasError()) { if (retryOnRetryableError(Code.values()[response.getError().getCode()], requestWrapper)) { + log.info("Attempting to retry on error: " + response.getError().toString()); return; } } From 3e2ed8159b96d5c53c014806099e0c5972ad468e Mon Sep 17 00:00:00 2001 From: Evan Greco Date: Wed, 8 Nov 2023 19:06:02 +0000 Subject: [PATCH 21/28] Remove unused profile --- pom.xml | 6 ------ 1 file changed, 6 deletions(-) diff --git a/pom.xml b/pom.xml index 9a3fc4cdb0..b1dd675bdf 100644 --- a/pom.xml +++ b/pom.xml @@ -261,12 +261,6 @@ tutorials - - custom-tests - - **/ITBigQueryWrite*RetryTest.java - - From a46d327cfd689d6748c241725f38c861e571a5a6 Mon Sep 17 00:00:00 2001 From: Evan Greco Date: Wed, 8 Nov 2023 19:33:59 +0000 Subject: [PATCH 22/28] Add more debugging logs for connection worker test --- .../com/google/cloud/bigquery/storage/v1/StreamWriter.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index 538bec4e32..c6e40ba41b 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -375,6 +375,11 @@ private void validateFetchedConnectonPool(StreamWriter.Builder builder) { if (!Objects.equals( this.singleConnectionOrConnectionPool.connectionWorkerPool().getTraceId(), builder.traceId)) { + log.info( + String.format( + "first: %s, second: %s", + this.singleConnectionOrConnectionPool.connectionWorkerPool().getTraceId(), + builder.traceId)); paramsValidatedFailed = "Trace id"; } else if (!Objects.equals( this.singleConnectionOrConnectionPool.connectionWorkerPool().limitExceededBehavior(), From 4b0d1e172a5d7f18b1b8a1bd1dc1e97dec13fa04 Mon Sep 17 00:00:00 2001 From: Evan Greco Date: Wed, 8 Nov 2023 21:09:09 +0000 Subject: [PATCH 23/28] rearrange builder order --- .../cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java index 7a378ed44c..c13d1261f9 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java @@ -393,10 +393,10 @@ public void testCloseExternalClient() StreamWriter.newBuilder( String.format("projects/p1/datasets/d1/tables/t%s/streams/_default", i), externalClient) + .setEnableConnectionPool(true) .setWriterSchema(createProtoSchema()) .setTraceId(TEST_TRACE_ID) .setLocation("us") - .setEnableConnectionPool(true) .build(); streamWriterList.add(sw); } From fdf20d63626031901ed0623cbf57342aa606b9e6 Mon Sep 17 00:00:00 2001 From: Evan Greco Date: Wed, 8 Nov 2023 21:34:30 +0000 Subject: [PATCH 24/28] Remove debug log --- .../com/google/cloud/bigquery/storage/v1/StreamWriter.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index c6e40ba41b..538bec4e32 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -375,11 +375,6 @@ private void validateFetchedConnectonPool(StreamWriter.Builder builder) { if (!Objects.equals( this.singleConnectionOrConnectionPool.connectionWorkerPool().getTraceId(), builder.traceId)) { - log.info( - String.format( - "first: %s, second: %s", - this.singleConnectionOrConnectionPool.connectionWorkerPool().getTraceId(), - builder.traceId)); paramsValidatedFailed = "Trace id"; } else if (!Objects.equals( this.singleConnectionOrConnectionPool.connectionWorkerPool().limitExceededBehavior(), From d083ac00034dfaec4447b56d7dcd0be8e5cac5f4 Mon Sep 17 00:00:00 2001 From: Evan Greco Date: Wed, 8 Nov 2023 22:04:28 +0000 Subject: [PATCH 25/28] Directly add streamwriter to list in connection worker pool test --- .../cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java index c13d1261f9..ccae51dcbd 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java @@ -389,7 +389,7 @@ public void testCloseExternalClient() // Create some stream writers. List streamWriterList = new ArrayList<>(); for (int i = 0; i < 4; i++) { - StreamWriter sw = + streamWriterList.add( StreamWriter.newBuilder( String.format("projects/p1/datasets/d1/tables/t%s/streams/_default", i), externalClient) @@ -397,8 +397,7 @@ public void testCloseExternalClient() .setWriterSchema(createProtoSchema()) .setTraceId(TEST_TRACE_ID) .setLocation("us") - .build(); - streamWriterList.add(sw); + .build()); } for (long i = 0; i < appendCount; i++) { From 7e29b8eb9625c3fac1d869766810defd32b0206d Mon Sep 17 00:00:00 2001 From: Evan Greco Date: Thu, 9 Nov 2023 17:56:21 +0000 Subject: [PATCH 26/28] Refactor retry tests into helper class --- .../it/ITBigQueryWriteNonQuotaRetryTest.java | 139 ++-------------- .../v1/it/ITBigQueryWriteQuotaRetryTest.java | 141 ++-------------- .../storage/v1/it/WriteRetryTestUtil.java | 157 ++++++++++++++++++ 3 files changed, 192 insertions(+), 245 deletions(-) create mode 100644 google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/WriteRetryTestUtil.java diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java index 19f400dda3..ad634f0ea5 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java @@ -16,52 +16,33 @@ package com.google.cloud.bigquery.storage.v1.it; -import static org.junit.Assert.assertFalse; - -import com.google.api.core.ApiFuture; -import com.google.api.gax.retrying.RetrySettings; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.DatasetInfo; import com.google.cloud.bigquery.Field; import com.google.cloud.bigquery.LegacySQLTypeName; import com.google.cloud.bigquery.Schema; -import com.google.cloud.bigquery.StandardSQLTypeName; import com.google.cloud.bigquery.StandardTableDefinition; import com.google.cloud.bigquery.TableId; import com.google.cloud.bigquery.TableInfo; -import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; -import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest; -import com.google.cloud.bigquery.storage.v1.JsonStreamWriter; -import com.google.cloud.bigquery.storage.v1.TableFieldSchema; -import com.google.cloud.bigquery.storage.v1.TableName; -import com.google.cloud.bigquery.storage.v1.TableSchema; import com.google.cloud.bigquery.storage.v1.WriteStream; import com.google.cloud.bigquery.testing.RemoteBigQueryHelper; import com.google.protobuf.Descriptors.DescriptorValidationException; import java.io.IOException; -import java.util.ArrayList; -import java.util.concurrent.ExecutionException; import java.util.logging.Logger; -import org.json.JSONArray; -import org.json.JSONObject; import org.junit.AfterClass; -import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; -import org.threeten.bp.Duration; /** Integration tests for BigQuery Write API. */ public class ITBigQueryWriteNonQuotaRetryTest { - private static final Logger LOG = - Logger.getLogger(ITBigQueryWriteNonQuotaRetryTest.class.getName()); + private static final Logger LOG = Logger.getLogger(ITBigQueryWriteQuotaRetryTest.class.getName()); private static final String DATASET = RemoteBigQueryHelper.generateDatasetName(); private static final String TABLE = "testtable"; private static final String DESCRIPTION = "BigQuery Write Java manual client test dataset"; - // This project is configured on the server to inject INTERNAL in-stream errors every 10 messages. - // This is done to verify in-stream message retries. + // This project is configured on the server to inject INTERNAL in-stream errors every + // 10 messages. This is done to verify in-stream message retries. private static final String NON_QUOTA_RETRY_PROJECT_ID = "bq-write-api-java-retry-test"; - private static BigQueryWriteClient client; private static BigQuery bigquery; @@ -102,109 +83,25 @@ public static void afterClass() { @Test public void testJsonStreamWriterCommittedStreamWithNonQuotaRetry() throws IOException, InterruptedException, DescriptorValidationException { - RetrySettings retrySettings = RetrySettings.newBuilder().setMaxAttempts(5).build(); - String tableName = "CommittedRetry"; - TableId tableId = TableId.of(DATASET, tableName); - Field col1 = Field.newBuilder("col1", StandardSQLTypeName.STRING).build(); - Schema schema = Schema.of(col1); - TableInfo tableInfo = TableInfo.newBuilder(tableId, StandardTableDefinition.of(schema)).build(); - bigquery.create(tableInfo); - TableName parent = TableName.of(NON_QUOTA_RETRY_PROJECT_ID, DATASET, tableName); - - WriteStream writeStream = - client.createWriteStream( - CreateWriteStreamRequest.newBuilder() - .setParent(parent.toString()) - .setWriteStream( - WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build()) - .build()); - int totalRequest = 901; - int rowBatch = 1; - ArrayList> allResponses = new ArrayList<>(totalRequest); - try (JsonStreamWriter jsonStreamWriter = - JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()) - .setRetrySettings(retrySettings) - .build()) { - for (int k = 0; k < totalRequest; k++) { - JSONObject row = new JSONObject(); - row.put("col1", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"); - JSONArray jsonArr = new JSONArray(); - // 3MB batch. - for (int j = 0; j < rowBatch; j++) { - jsonArr.put(row); - } - LOG.info("Appending: " + k + "/" + totalRequest); - allResponses.add(jsonStreamWriter.append(jsonArr, k * rowBatch)); - } - LOG.info("Waiting for all responses to come back"); - for (int i = 0; i < totalRequest; i++) { - LOG.info("Waiting for request " + i); - try { - Assert.assertEquals( - allResponses.get(i).get().getAppendResult().getOffset().getValue(), i * rowBatch); - } catch (ExecutionException ex) { - Assert.fail("Unexpected error " + ex); - } - } - } + WriteRetryTestUtil.runExclusiveRetryTest( + bigquery, + client, + DATASET, + NON_QUOTA_RETRY_PROJECT_ID, + WriteStream.Type.COMMITTED, + /* requestCount=*/ 901, + /* rowBatchSize=*/ 1); } @Test public void testJsonStreamWriterDefaultStreamWithNonQuotaRetry() throws IOException, InterruptedException, DescriptorValidationException { - RetrySettings retrySettings = - RetrySettings.newBuilder() - .setInitialRetryDelay(Duration.ofMillis(500)) - .setRetryDelayMultiplier(1.1) - .setMaxAttempts(5) - .setMaxRetryDelay(Duration.ofMinutes(1)) - .build(); - String tableName = "JsonTableDefaultStream"; - TableFieldSchema TEST_STRING = - TableFieldSchema.newBuilder() - .setType(TableFieldSchema.Type.STRING) - .setMode(TableFieldSchema.Mode.NULLABLE) - .setName("test_str") - .build(); - TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_STRING).build(); - TableInfo tableInfo = - TableInfo.newBuilder( - TableId.of(DATASET, tableName), - StandardTableDefinition.of( - Schema.of(Field.newBuilder("test_str", StandardSQLTypeName.STRING).build()))) - .build(); - - bigquery.create(tableInfo); - TableName parent = TableName.of(NON_QUOTA_RETRY_PROJECT_ID, DATASET, tableName); - - int totalRequest = 901; - int rowBatch = 1; - ArrayList> allResponses = new ArrayList<>(totalRequest); - try (JsonStreamWriter jsonStreamWriter = - JsonStreamWriter.newBuilder(parent.toString(), tableSchema) - .setIgnoreUnknownFields(true) - .setRetrySettings(retrySettings) - .build()) { - for (int k = 0; k < totalRequest; k++) { - JSONObject row = new JSONObject(); - row.put("test_str", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"); - JSONArray jsonArr = new JSONArray(); - // 3MB batch. - for (int j = 0; j < rowBatch; j++) { - jsonArr.put(row); - } - LOG.info("Appending: " + k + "/" + totalRequest); - allResponses.add(jsonStreamWriter.append(jsonArr)); - } - LOG.info("Waiting for all responses to come back"); - for (int i = 0; i < totalRequest; i++) { - LOG.info("Waiting for request " + i); - try { - assertFalse(allResponses.get(i).get().hasError()); - } catch (Exception ex) { - Assert.fail("Unexpected error " + ex); - } - } - } + WriteRetryTestUtil.runDefaultRetryTest( + bigquery, + client, + DATASET, + NON_QUOTA_RETRY_PROJECT_ID, + /* requestCount=*/ 901, + /* rowBatchSize=*/ 1); } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteQuotaRetryTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteQuotaRetryTest.java index 046411bcbd..d7bb4af93f 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteQuotaRetryTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteQuotaRetryTest.java @@ -16,40 +16,23 @@ package com.google.cloud.bigquery.storage.v1.it; -import static org.junit.Assert.assertFalse; - -import com.google.api.core.ApiFuture; -import com.google.api.gax.retrying.RetrySettings; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.DatasetInfo; import com.google.cloud.bigquery.Field; import com.google.cloud.bigquery.LegacySQLTypeName; import com.google.cloud.bigquery.Schema; -import com.google.cloud.bigquery.StandardSQLTypeName; import com.google.cloud.bigquery.StandardTableDefinition; import com.google.cloud.bigquery.TableId; import com.google.cloud.bigquery.TableInfo; -import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; -import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest; -import com.google.cloud.bigquery.storage.v1.JsonStreamWriter; -import com.google.cloud.bigquery.storage.v1.TableFieldSchema; -import com.google.cloud.bigquery.storage.v1.TableName; -import com.google.cloud.bigquery.storage.v1.TableSchema; import com.google.cloud.bigquery.storage.v1.WriteStream; import com.google.cloud.bigquery.testing.RemoteBigQueryHelper; import com.google.protobuf.Descriptors.DescriptorValidationException; import java.io.IOException; -import java.util.ArrayList; -import java.util.concurrent.ExecutionException; import java.util.logging.Logger; -import org.json.JSONArray; -import org.json.JSONObject; import org.junit.AfterClass; -import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; -import org.threeten.bp.Duration; /** Integration tests for BigQuery Write API. */ public class ITBigQueryWriteQuotaRetryTest { @@ -98,117 +81,27 @@ public static void afterClass() { } @Test - public void testJsonStreamWriterCommittedStreamWithNonQuotaRetry() + public void testJsonStreamWriterCommittedStreamWithQuotaRetry() throws IOException, InterruptedException, DescriptorValidationException { - RetrySettings retrySettings = - RetrySettings.newBuilder() - .setInitialRetryDelay(Duration.ofMillis(500)) - .setRetryDelayMultiplier(1.1) - .setMaxAttempts(5) - .setMaxRetryDelay(Duration.ofMinutes(1)) - .build(); - String tableName = "CommittedRetry"; - TableId tableId = TableId.of(DATASET, tableName); - Field col1 = Field.newBuilder("col1", StandardSQLTypeName.STRING).build(); - Schema schema = Schema.of(col1); - TableInfo tableInfo = TableInfo.newBuilder(tableId, StandardTableDefinition.of(schema)).build(); - bigquery.create(tableInfo); - TableName parent = TableName.of(QUOTA_RETRY_PROJECT_ID, DATASET, tableName); - - WriteStream writeStream = - client.createWriteStream( - CreateWriteStreamRequest.newBuilder() - .setParent(parent.toString()) - .setWriteStream( - WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build()) - .build()); - int totalRequest = 901; - int rowBatch = 1; - ArrayList> allResponses = new ArrayList<>(totalRequest); - try (JsonStreamWriter jsonStreamWriter = - JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()) - .setRetrySettings(retrySettings) - .build()) { - for (int k = 0; k < totalRequest; k++) { - JSONObject row = new JSONObject(); - row.put("col1", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"); - JSONArray jsonArr = new JSONArray(); - // 3MB batch. - for (int j = 0; j < rowBatch; j++) { - jsonArr.put(row); - } - LOG.info("Appending: " + k + "/" + totalRequest); - allResponses.add(jsonStreamWriter.append(jsonArr, k * rowBatch)); - } - LOG.info("Waiting for all responses to come back"); - for (int i = 0; i < totalRequest; i++) { - LOG.info("Waiting for request " + i); - try { - Assert.assertEquals( - allResponses.get(i).get().getAppendResult().getOffset().getValue(), i * rowBatch); - } catch (ExecutionException ex) { - Assert.fail("Unexpected error " + ex); - } - } - } + WriteRetryTestUtil.runExclusiveRetryTest( + bigquery, + client, + DATASET, + QUOTA_RETRY_PROJECT_ID, + WriteStream.Type.COMMITTED, + /* requestCount=*/ 901, + /* rowBatchSize=*/ 1); } @Test - public void testJsonStreamWriterDefaultStreamWithNonQuotaRetry() + public void testJsonStreamWriterDefaultStreamWithQuotaRetry() throws IOException, InterruptedException, DescriptorValidationException { - RetrySettings retrySettings = - RetrySettings.newBuilder() - .setInitialRetryDelay(Duration.ofMillis(500)) - .setRetryDelayMultiplier(1.1) - .setMaxAttempts(5) - .setMaxRetryDelay(Duration.ofMinutes(1)) - .build(); - String tableName = "JsonTableDefaultStream"; - TableFieldSchema TEST_STRING = - TableFieldSchema.newBuilder() - .setType(TableFieldSchema.Type.STRING) - .setMode(TableFieldSchema.Mode.NULLABLE) - .setName("test_str") - .build(); - TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_STRING).build(); - TableInfo tableInfo = - TableInfo.newBuilder( - TableId.of(DATASET, tableName), - StandardTableDefinition.of( - Schema.of(Field.newBuilder("test_str", StandardSQLTypeName.STRING).build()))) - .build(); - - bigquery.create(tableInfo); - TableName parent = TableName.of(QUOTA_RETRY_PROJECT_ID, DATASET, tableName); - - int totalRequest = 901; - int rowBatch = 1; - ArrayList> allResponses = new ArrayList<>(totalRequest); - try (JsonStreamWriter jsonStreamWriter = - JsonStreamWriter.newBuilder(parent.toString(), tableSchema) - .setIgnoreUnknownFields(true) - .setRetrySettings(retrySettings) - .build()) { - for (int k = 0; k < totalRequest; k++) { - JSONObject row = new JSONObject(); - row.put("test_str", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"); - JSONArray jsonArr = new JSONArray(); - // 3MB batch. - for (int j = 0; j < rowBatch; j++) { - jsonArr.put(row); - } - LOG.info("Appending: " + k + "/" + totalRequest); - allResponses.add(jsonStreamWriter.append(jsonArr)); - } - LOG.info("Waiting for all responses to come back"); - for (int i = 0; i < totalRequest; i++) { - LOG.info("Waiting for request " + i); - try { - assertFalse(allResponses.get(i).get().hasError()); - } catch (Exception ex) { - Assert.fail("Unexpected error " + ex); - } - } - } + WriteRetryTestUtil.runDefaultRetryTest( + bigquery, + client, + DATASET, + QUOTA_RETRY_PROJECT_ID, + /* requestCount=*/ 901, + /* rowBatchSize=*/ 1); } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/WriteRetryTestUtil.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/WriteRetryTestUtil.java new file mode 100644 index 0000000000..d514522b2a --- /dev/null +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/WriteRetryTestUtil.java @@ -0,0 +1,157 @@ +package com.google.cloud.bigquery.storage.v1.it; + +import static org.junit.Assert.assertFalse; + +import com.google.api.core.ApiFuture; +import com.google.api.gax.retrying.RetrySettings; +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.StandardSQLTypeName; +import com.google.cloud.bigquery.StandardTableDefinition; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.TableInfo; +import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; +import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; +import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest; +import com.google.cloud.bigquery.storage.v1.JsonStreamWriter; +import com.google.cloud.bigquery.storage.v1.TableFieldSchema; +import com.google.cloud.bigquery.storage.v1.TableName; +import com.google.cloud.bigquery.storage.v1.TableSchema; +import com.google.cloud.bigquery.storage.v1.WriteStream; +import com.google.protobuf.Descriptors.DescriptorValidationException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.concurrent.ExecutionException; +import java.util.logging.Logger; +import org.json.JSONArray; +import org.json.JSONObject; +import org.junit.Assert; +import org.threeten.bp.Duration; + +public class WriteRetryTestUtil { + private static final Logger LOG = + Logger.getLogger( + com.google.cloud.bigquery.storage.v1.it.ITBigQueryWriteQuotaRetryTest.class.getName()); + + public static void runExclusiveRetryTest( + BigQuery bigquery, + BigQueryWriteClient client, + String dataset, + String projectId, + WriteStream.Type streamType, + int requestCount, + int rowBatchSize) + throws IOException, InterruptedException, DescriptorValidationException { + RetrySettings retrySettings = + RetrySettings.newBuilder() + .setInitialRetryDelay(Duration.ofMillis(500)) + .setRetryDelayMultiplier(1.1) + .setMaxAttempts(5) + .setMaxRetryDelay(Duration.ofMinutes(1)) + .build(); + String tableName = "RetryTest"; + TableId tableId = TableId.of(dataset, tableName); + Field col1 = Field.newBuilder("col1", StandardSQLTypeName.STRING).build(); + Schema schema = Schema.of(col1); + TableInfo tableInfo = TableInfo.newBuilder(tableId, StandardTableDefinition.of(schema)).build(); + bigquery.create(tableInfo); + TableName parent = TableName.of(projectId, dataset, tableName); + + WriteStream writeStream = + client.createWriteStream( + CreateWriteStreamRequest.newBuilder() + .setParent(parent.toString()) + .setWriteStream(WriteStream.newBuilder().setType(streamType).build()) + .build()); + ArrayList> allResponses = new ArrayList<>(requestCount); + try (JsonStreamWriter jsonStreamWriter = + JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()) + .setRetrySettings(retrySettings) + .build()) { + for (int k = 0; k < requestCount; k++) { + JSONObject row = new JSONObject(); + row.put("col1", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"); + JSONArray jsonArr = new JSONArray(); + // 3MB batch. + for (int j = 0; j < rowBatchSize; j++) { + jsonArr.put(row); + } + LOG.info("Appending: " + k + "/" + requestCount); + allResponses.add(jsonStreamWriter.append(jsonArr, k * rowBatchSize)); + } + LOG.info("Waiting for all responses to come back"); + for (int i = 0; i < requestCount; i++) { + LOG.info("Waiting for request " + i); + try { + Assert.assertEquals( + allResponses.get(i).get().getAppendResult().getOffset().getValue(), i * rowBatchSize); + } catch (ExecutionException ex) { + Assert.fail("Unexpected error " + ex); + } + } + } + } + + public static void runDefaultRetryTest( + BigQuery bigquery, + BigQueryWriteClient client, + String dataset, + String projectId, + int requestCount, + int rowBatchSize) + throws IOException, InterruptedException, DescriptorValidationException { + RetrySettings retrySettings = + RetrySettings.newBuilder() + .setInitialRetryDelay(Duration.ofMillis(500)) + .setRetryDelayMultiplier(1.1) + .setMaxAttempts(5) + .setMaxRetryDelay(Duration.ofMinutes(1)) + .build(); + String tableName = "JsonTableDefaultStream"; + TableFieldSchema TEST_STRING = + TableFieldSchema.newBuilder() + .setType(TableFieldSchema.Type.STRING) + .setMode(TableFieldSchema.Mode.NULLABLE) + .setName("test_str") + .build(); + TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_STRING).build(); + TableInfo tableInfo = + TableInfo.newBuilder( + TableId.of(dataset, tableName), + StandardTableDefinition.of( + Schema.of(Field.newBuilder("test_str", StandardSQLTypeName.STRING).build()))) + .build(); + + bigquery.create(tableInfo); + TableName parent = TableName.of(projectId, dataset, tableName); + + ArrayList> allResponses = new ArrayList<>(requestCount); + try (JsonStreamWriter jsonStreamWriter = + JsonStreamWriter.newBuilder(parent.toString(), tableSchema) + .setIgnoreUnknownFields(true) + .setRetrySettings(retrySettings) + .build()) { + for (int k = 0; k < requestCount; k++) { + JSONObject row = new JSONObject(); + row.put("test_str", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"); + JSONArray jsonArr = new JSONArray(); + // 3MB batch. + for (int j = 0; j < rowBatchSize; j++) { + jsonArr.put(row); + } + LOG.info("Appending: " + k + "/" + requestCount); + allResponses.add(jsonStreamWriter.append(jsonArr)); + } + LOG.info("Waiting for all responses to come back"); + for (int i = 0; i < requestCount; i++) { + LOG.info("Waiting for request " + i); + try { + assertFalse(allResponses.get(i).get().hasError()); + } catch (Exception ex) { + Assert.fail("Unexpected error " + ex); + } + } + } + } +} From 4803de7a09a28b8f5dece39cdc3da8af4ca9303e Mon Sep 17 00:00:00 2001 From: Evan Greco Date: Thu, 9 Nov 2023 18:14:53 +0000 Subject: [PATCH 27/28] Fix file headers --- .../v1/it/ITBigQueryWriteNonQuotaRetryTest.java | 2 +- .../v1/it/ITBigQueryWriteQuotaRetryTest.java | 2 +- .../storage/v1/it/WriteRetryTestUtil.java | 16 ++++++++++++++++ 3 files changed, 18 insertions(+), 2 deletions(-) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java index ad634f0ea5..3493fb0255 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 Google LLC + * Copyright 2023 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteQuotaRetryTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteQuotaRetryTest.java index d7bb4af93f..f567ca5487 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteQuotaRetryTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteQuotaRetryTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 Google LLC + * Copyright 2023 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/WriteRetryTestUtil.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/WriteRetryTestUtil.java index d514522b2a..44d3dd307a 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/WriteRetryTestUtil.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/WriteRetryTestUtil.java @@ -1,3 +1,19 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.google.cloud.bigquery.storage.v1.it; import static org.junit.Assert.assertFalse; From 057cf8c3f722e9e032d7fbf299f5d754d42f2dea Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Mon, 13 Nov 2023 18:19:33 +0000 Subject: [PATCH 28/28] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20?= =?UTF-8?q?post-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index ccf1fd95ae..5ac5115b67 100644 --- a/README.md +++ b/README.md @@ -50,7 +50,7 @@ If you are using Maven without the BOM, add this to your dependencies: If you are using Gradle 5.x or later, add this to your dependencies: ```Groovy -implementation platform('com.google.cloud:libraries-bom:26.26.0') +implementation platform('com.google.cloud:libraries-bom:26.27.0') implementation 'com.google.cloud:google-cloud-bigquerystorage' ```