diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java index 19f400dda3..ad634f0ea5 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java @@ -16,52 +16,33 @@ package com.google.cloud.bigquery.storage.v1.it; -import static org.junit.Assert.assertFalse; - -import com.google.api.core.ApiFuture; -import com.google.api.gax.retrying.RetrySettings; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.DatasetInfo; import com.google.cloud.bigquery.Field; import com.google.cloud.bigquery.LegacySQLTypeName; import com.google.cloud.bigquery.Schema; -import com.google.cloud.bigquery.StandardSQLTypeName; import com.google.cloud.bigquery.StandardTableDefinition; import com.google.cloud.bigquery.TableId; import com.google.cloud.bigquery.TableInfo; -import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; -import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest; -import com.google.cloud.bigquery.storage.v1.JsonStreamWriter; -import com.google.cloud.bigquery.storage.v1.TableFieldSchema; -import com.google.cloud.bigquery.storage.v1.TableName; -import com.google.cloud.bigquery.storage.v1.TableSchema; import com.google.cloud.bigquery.storage.v1.WriteStream; import com.google.cloud.bigquery.testing.RemoteBigQueryHelper; import com.google.protobuf.Descriptors.DescriptorValidationException; import java.io.IOException; -import java.util.ArrayList; -import java.util.concurrent.ExecutionException; import java.util.logging.Logger; -import org.json.JSONArray; -import org.json.JSONObject; import org.junit.AfterClass; -import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; -import org.threeten.bp.Duration; /** Integration tests for BigQuery Write API. */ public class ITBigQueryWriteNonQuotaRetryTest { - private static final Logger LOG = - Logger.getLogger(ITBigQueryWriteNonQuotaRetryTest.class.getName()); + private static final Logger LOG = Logger.getLogger(ITBigQueryWriteQuotaRetryTest.class.getName()); private static final String DATASET = RemoteBigQueryHelper.generateDatasetName(); private static final String TABLE = "testtable"; private static final String DESCRIPTION = "BigQuery Write Java manual client test dataset"; - // This project is configured on the server to inject INTERNAL in-stream errors every 10 messages. - // This is done to verify in-stream message retries. + // This project is configured on the server to inject INTERNAL in-stream errors every + // 10 messages. This is done to verify in-stream message retries. private static final String NON_QUOTA_RETRY_PROJECT_ID = "bq-write-api-java-retry-test"; - private static BigQueryWriteClient client; private static BigQuery bigquery; @@ -102,109 +83,25 @@ public static void afterClass() { @Test public void testJsonStreamWriterCommittedStreamWithNonQuotaRetry() throws IOException, InterruptedException, DescriptorValidationException { - RetrySettings retrySettings = RetrySettings.newBuilder().setMaxAttempts(5).build(); - String tableName = "CommittedRetry"; - TableId tableId = TableId.of(DATASET, tableName); - Field col1 = Field.newBuilder("col1", StandardSQLTypeName.STRING).build(); - Schema schema = Schema.of(col1); - TableInfo tableInfo = TableInfo.newBuilder(tableId, StandardTableDefinition.of(schema)).build(); - bigquery.create(tableInfo); - TableName parent = TableName.of(NON_QUOTA_RETRY_PROJECT_ID, DATASET, tableName); - - WriteStream writeStream = - client.createWriteStream( - CreateWriteStreamRequest.newBuilder() - .setParent(parent.toString()) - .setWriteStream( - WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build()) - .build()); - int totalRequest = 901; - int rowBatch = 1; - ArrayList<ApiFuture<AppendRowsResponse>> allResponses = new ArrayList<>(totalRequest); - try (JsonStreamWriter jsonStreamWriter = - JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()) - .setRetrySettings(retrySettings) - .build()) { - for (int k = 0; k < totalRequest; k++) { - JSONObject row = new JSONObject(); - row.put("col1", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"); - JSONArray jsonArr = new JSONArray(); - // 3MB batch. - for (int j = 0; j < rowBatch; j++) { - jsonArr.put(row); - } - LOG.info("Appending: " + k + "/" + totalRequest); - allResponses.add(jsonStreamWriter.append(jsonArr, k * rowBatch)); - } - LOG.info("Waiting for all responses to come back"); - for (int i = 0; i < totalRequest; i++) { - LOG.info("Waiting for request " + i); - try { - Assert.assertEquals( - allResponses.get(i).get().getAppendResult().getOffset().getValue(), i * rowBatch); - } catch (ExecutionException ex) { - Assert.fail("Unexpected error " + ex); - } - } - } + WriteRetryTestUtil.runExclusiveRetryTest( + bigquery, + client, + DATASET, + NON_QUOTA_RETRY_PROJECT_ID, + WriteStream.Type.COMMITTED, + /* requestCount=*/ 901, + /* rowBatchSize=*/ 1); } @Test public void testJsonStreamWriterDefaultStreamWithNonQuotaRetry() throws IOException, InterruptedException, DescriptorValidationException { - RetrySettings retrySettings = - RetrySettings.newBuilder() - .setInitialRetryDelay(Duration.ofMillis(500)) - .setRetryDelayMultiplier(1.1) - .setMaxAttempts(5) - .setMaxRetryDelay(Duration.ofMinutes(1)) - .build(); - String tableName = "JsonTableDefaultStream"; - TableFieldSchema TEST_STRING = - TableFieldSchema.newBuilder() - .setType(TableFieldSchema.Type.STRING) - .setMode(TableFieldSchema.Mode.NULLABLE) - .setName("test_str") - .build(); - TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_STRING).build(); - TableInfo tableInfo = - TableInfo.newBuilder( - TableId.of(DATASET, tableName), - StandardTableDefinition.of( - Schema.of(Field.newBuilder("test_str", StandardSQLTypeName.STRING).build()))) - .build(); - - bigquery.create(tableInfo); - TableName parent = TableName.of(NON_QUOTA_RETRY_PROJECT_ID, DATASET, tableName); - - int totalRequest = 901; - int rowBatch = 1; - ArrayList<ApiFuture<AppendRowsResponse>> allResponses = new ArrayList<>(totalRequest); - try (JsonStreamWriter jsonStreamWriter = - JsonStreamWriter.newBuilder(parent.toString(), tableSchema) - .setIgnoreUnknownFields(true) - .setRetrySettings(retrySettings) - .build()) { - for (int k = 0; k < totalRequest; k++) { - JSONObject row = new JSONObject(); - row.put("test_str", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"); - JSONArray jsonArr = new JSONArray(); - // 3MB batch. - for (int j = 0; j < rowBatch; j++) { - jsonArr.put(row); - } - LOG.info("Appending: " + k + "/" + totalRequest); - allResponses.add(jsonStreamWriter.append(jsonArr)); - } - LOG.info("Waiting for all responses to come back"); - for (int i = 0; i < totalRequest; i++) { - LOG.info("Waiting for request " + i); - try { - assertFalse(allResponses.get(i).get().hasError()); - } catch (Exception ex) { - Assert.fail("Unexpected error " + ex); - } - } - } + WriteRetryTestUtil.runDefaultRetryTest( + bigquery, + client, + DATASET, + NON_QUOTA_RETRY_PROJECT_ID, + /* requestCount=*/ 901, + /* rowBatchSize=*/ 1); } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteQuotaRetryTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteQuotaRetryTest.java index 046411bcbd..d7bb4af93f 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteQuotaRetryTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteQuotaRetryTest.java @@ -16,40 +16,23 @@ package com.google.cloud.bigquery.storage.v1.it; -import static org.junit.Assert.assertFalse; - -import com.google.api.core.ApiFuture; -import com.google.api.gax.retrying.RetrySettings; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.DatasetInfo; import com.google.cloud.bigquery.Field; import com.google.cloud.bigquery.LegacySQLTypeName; import com.google.cloud.bigquery.Schema; -import com.google.cloud.bigquery.StandardSQLTypeName; import com.google.cloud.bigquery.StandardTableDefinition; import com.google.cloud.bigquery.TableId; import com.google.cloud.bigquery.TableInfo; -import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; -import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest; -import com.google.cloud.bigquery.storage.v1.JsonStreamWriter; -import com.google.cloud.bigquery.storage.v1.TableFieldSchema; -import com.google.cloud.bigquery.storage.v1.TableName; -import com.google.cloud.bigquery.storage.v1.TableSchema; import com.google.cloud.bigquery.storage.v1.WriteStream; import com.google.cloud.bigquery.testing.RemoteBigQueryHelper; import com.google.protobuf.Descriptors.DescriptorValidationException; import java.io.IOException; -import java.util.ArrayList; -import java.util.concurrent.ExecutionException; import java.util.logging.Logger; -import org.json.JSONArray; -import org.json.JSONObject; import org.junit.AfterClass; -import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; -import org.threeten.bp.Duration; /** Integration tests for BigQuery Write API. */ public class ITBigQueryWriteQuotaRetryTest { @@ -98,117 +81,27 @@ public static void afterClass() { } @Test - public void testJsonStreamWriterCommittedStreamWithNonQuotaRetry() + public void testJsonStreamWriterCommittedStreamWithQuotaRetry() throws IOException, InterruptedException, DescriptorValidationException { - RetrySettings retrySettings = - RetrySettings.newBuilder() - .setInitialRetryDelay(Duration.ofMillis(500)) - .setRetryDelayMultiplier(1.1) - .setMaxAttempts(5) - .setMaxRetryDelay(Duration.ofMinutes(1)) - .build(); - String tableName = "CommittedRetry"; - TableId tableId = TableId.of(DATASET, tableName); - Field col1 = Field.newBuilder("col1", StandardSQLTypeName.STRING).build(); - Schema schema = Schema.of(col1); - TableInfo tableInfo = TableInfo.newBuilder(tableId, StandardTableDefinition.of(schema)).build(); - bigquery.create(tableInfo); - TableName parent = TableName.of(QUOTA_RETRY_PROJECT_ID, DATASET, tableName); - - WriteStream writeStream = - client.createWriteStream( - CreateWriteStreamRequest.newBuilder() - .setParent(parent.toString()) - .setWriteStream( - WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build()) - .build()); - int totalRequest = 901; - int rowBatch = 1; - ArrayList<ApiFuture<AppendRowsResponse>> allResponses = new ArrayList<>(totalRequest); - try (JsonStreamWriter jsonStreamWriter = - JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()) - .setRetrySettings(retrySettings) - .build()) { - for (int k = 0; k < totalRequest; k++) { - JSONObject row = new JSONObject(); - row.put("col1", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"); - JSONArray jsonArr = new JSONArray(); - // 3MB batch. - for (int j = 0; j < rowBatch; j++) { - jsonArr.put(row); - } - LOG.info("Appending: " + k + "/" + totalRequest); - allResponses.add(jsonStreamWriter.append(jsonArr, k * rowBatch)); - } - LOG.info("Waiting for all responses to come back"); - for (int i = 0; i < totalRequest; i++) { - LOG.info("Waiting for request " + i); - try { - Assert.assertEquals( - allResponses.get(i).get().getAppendResult().getOffset().getValue(), i * rowBatch); - } catch (ExecutionException ex) { - Assert.fail("Unexpected error " + ex); - } - } - } + WriteRetryTestUtil.runExclusiveRetryTest( + bigquery, + client, + DATASET, + QUOTA_RETRY_PROJECT_ID, + WriteStream.Type.COMMITTED, + /* requestCount=*/ 901, + /* rowBatchSize=*/ 1); } @Test - public void testJsonStreamWriterDefaultStreamWithNonQuotaRetry() + public void testJsonStreamWriterDefaultStreamWithQuotaRetry() throws IOException, InterruptedException, DescriptorValidationException { - RetrySettings retrySettings = - RetrySettings.newBuilder() - .setInitialRetryDelay(Duration.ofMillis(500)) - .setRetryDelayMultiplier(1.1) - .setMaxAttempts(5) - .setMaxRetryDelay(Duration.ofMinutes(1)) - .build(); - String tableName = "JsonTableDefaultStream"; - TableFieldSchema TEST_STRING = - TableFieldSchema.newBuilder() - .setType(TableFieldSchema.Type.STRING) - .setMode(TableFieldSchema.Mode.NULLABLE) - .setName("test_str") - .build(); - TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_STRING).build(); - TableInfo tableInfo = - TableInfo.newBuilder( - TableId.of(DATASET, tableName), - StandardTableDefinition.of( - Schema.of(Field.newBuilder("test_str", StandardSQLTypeName.STRING).build()))) - .build(); - - bigquery.create(tableInfo); - TableName parent = TableName.of(QUOTA_RETRY_PROJECT_ID, DATASET, tableName); - - int totalRequest = 901; - int rowBatch = 1; - ArrayList<ApiFuture<AppendRowsResponse>> allResponses = new ArrayList<>(totalRequest); - try (JsonStreamWriter jsonStreamWriter = - JsonStreamWriter.newBuilder(parent.toString(), tableSchema) - .setIgnoreUnknownFields(true) - .setRetrySettings(retrySettings) - .build()) { - for (int k = 0; k < totalRequest; k++) { - JSONObject row = new JSONObject(); - row.put("test_str", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"); - JSONArray jsonArr = new JSONArray(); - // 3MB batch. - for (int j = 0; j < rowBatch; j++) { - jsonArr.put(row); - } - LOG.info("Appending: " + k + "/" + totalRequest); - allResponses.add(jsonStreamWriter.append(jsonArr)); - } - LOG.info("Waiting for all responses to come back"); - for (int i = 0; i < totalRequest; i++) { - LOG.info("Waiting for request " + i); - try { - assertFalse(allResponses.get(i).get().hasError()); - } catch (Exception ex) { - Assert.fail("Unexpected error " + ex); - } - } - } + WriteRetryTestUtil.runDefaultRetryTest( + bigquery, + client, + DATASET, + QUOTA_RETRY_PROJECT_ID, + /* requestCount=*/ 901, + /* rowBatchSize=*/ 1); } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/WriteRetryTestUtil.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/WriteRetryTestUtil.java new file mode 100644 index 0000000000..d514522b2a --- /dev/null +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/WriteRetryTestUtil.java @@ -0,0 +1,157 @@ +package com.google.cloud.bigquery.storage.v1.it; + +import static org.junit.Assert.assertFalse; + +import com.google.api.core.ApiFuture; +import com.google.api.gax.retrying.RetrySettings; +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.StandardSQLTypeName; +import com.google.cloud.bigquery.StandardTableDefinition; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.TableInfo; +import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; +import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; +import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest; +import com.google.cloud.bigquery.storage.v1.JsonStreamWriter; +import com.google.cloud.bigquery.storage.v1.TableFieldSchema; +import com.google.cloud.bigquery.storage.v1.TableName; +import com.google.cloud.bigquery.storage.v1.TableSchema; +import com.google.cloud.bigquery.storage.v1.WriteStream; +import com.google.protobuf.Descriptors.DescriptorValidationException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.concurrent.ExecutionException; +import java.util.logging.Logger; +import org.json.JSONArray; +import org.json.JSONObject; +import org.junit.Assert; +import org.threeten.bp.Duration; + +public class WriteRetryTestUtil { + private static final Logger LOG = + Logger.getLogger( + com.google.cloud.bigquery.storage.v1.it.ITBigQueryWriteQuotaRetryTest.class.getName()); + + public static void runExclusiveRetryTest( + BigQuery bigquery, + BigQueryWriteClient client, + String dataset, + String projectId, + WriteStream.Type streamType, + int requestCount, + int rowBatchSize) + throws IOException, InterruptedException, DescriptorValidationException { + RetrySettings retrySettings = + RetrySettings.newBuilder() + .setInitialRetryDelay(Duration.ofMillis(500)) + .setRetryDelayMultiplier(1.1) + .setMaxAttempts(5) + .setMaxRetryDelay(Duration.ofMinutes(1)) + .build(); + String tableName = "RetryTest"; + TableId tableId = TableId.of(dataset, tableName); + Field col1 = Field.newBuilder("col1", StandardSQLTypeName.STRING).build(); + Schema schema = Schema.of(col1); + TableInfo tableInfo = TableInfo.newBuilder(tableId, StandardTableDefinition.of(schema)).build(); + bigquery.create(tableInfo); + TableName parent = TableName.of(projectId, dataset, tableName); + + WriteStream writeStream = + client.createWriteStream( + CreateWriteStreamRequest.newBuilder() + .setParent(parent.toString()) + .setWriteStream(WriteStream.newBuilder().setType(streamType).build()) + .build()); + ArrayList<ApiFuture<AppendRowsResponse>> allResponses = new ArrayList<>(requestCount); + try (JsonStreamWriter jsonStreamWriter = + JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()) + .setRetrySettings(retrySettings) + .build()) { + for (int k = 0; k < requestCount; k++) { + JSONObject row = new JSONObject(); + row.put("col1", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"); + JSONArray jsonArr = new JSONArray(); + // 3MB batch. + for (int j = 0; j < rowBatchSize; j++) { + jsonArr.put(row); + } + LOG.info("Appending: " + k + "/" + requestCount); + allResponses.add(jsonStreamWriter.append(jsonArr, k * rowBatchSize)); + } + LOG.info("Waiting for all responses to come back"); + for (int i = 0; i < requestCount; i++) { + LOG.info("Waiting for request " + i); + try { + Assert.assertEquals( + allResponses.get(i).get().getAppendResult().getOffset().getValue(), i * rowBatchSize); + } catch (ExecutionException ex) { + Assert.fail("Unexpected error " + ex); + } + } + } + } + + public static void runDefaultRetryTest( + BigQuery bigquery, + BigQueryWriteClient client, + String dataset, + String projectId, + int requestCount, + int rowBatchSize) + throws IOException, InterruptedException, DescriptorValidationException { + RetrySettings retrySettings = + RetrySettings.newBuilder() + .setInitialRetryDelay(Duration.ofMillis(500)) + .setRetryDelayMultiplier(1.1) + .setMaxAttempts(5) + .setMaxRetryDelay(Duration.ofMinutes(1)) + .build(); + String tableName = "JsonTableDefaultStream"; + TableFieldSchema TEST_STRING = + TableFieldSchema.newBuilder() + .setType(TableFieldSchema.Type.STRING) + .setMode(TableFieldSchema.Mode.NULLABLE) + .setName("test_str") + .build(); + TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_STRING).build(); + TableInfo tableInfo = + TableInfo.newBuilder( + TableId.of(dataset, tableName), + StandardTableDefinition.of( + Schema.of(Field.newBuilder("test_str", StandardSQLTypeName.STRING).build()))) + .build(); + + bigquery.create(tableInfo); + TableName parent = TableName.of(projectId, dataset, tableName); + + ArrayList<ApiFuture<AppendRowsResponse>> allResponses = new ArrayList<>(requestCount); + try (JsonStreamWriter jsonStreamWriter = + JsonStreamWriter.newBuilder(parent.toString(), tableSchema) + .setIgnoreUnknownFields(true) + .setRetrySettings(retrySettings) + .build()) { + for (int k = 0; k < requestCount; k++) { + JSONObject row = new JSONObject(); + row.put("test_str", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"); + JSONArray jsonArr = new JSONArray(); + // 3MB batch. + for (int j = 0; j < rowBatchSize; j++) { + jsonArr.put(row); + } + LOG.info("Appending: " + k + "/" + requestCount); + allResponses.add(jsonStreamWriter.append(jsonArr)); + } + LOG.info("Waiting for all responses to come back"); + for (int i = 0; i < requestCount; i++) { + LOG.info("Waiting for request " + i); + try { + assertFalse(allResponses.get(i).get().hasError()); + } catch (Exception ex) { + Assert.fail("Unexpected error " + ex); + } + } + } + } +}