Skip to content

Commit

Permalink
Refactor retry tests into helper class
Browse files Browse the repository at this point in the history
  • Loading branch information
Evan Greco committed Nov 9, 2023
1 parent b341de5 commit 7e29b8e
Show file tree
Hide file tree
Showing 3 changed files with 192 additions and 245 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,52 +16,33 @@

package com.google.cloud.bigquery.storage.v1.it;

import static org.junit.Assert.assertFalse;

import com.google.api.core.ApiFuture;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.logging.Logger;
import org.json.JSONArray;
import org.json.JSONObject;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.threeten.bp.Duration;

/** Integration tests for BigQuery Write API. */
public class ITBigQueryWriteNonQuotaRetryTest {
private static final Logger LOG =
Logger.getLogger(ITBigQueryWriteNonQuotaRetryTest.class.getName());
private static final Logger LOG = Logger.getLogger(ITBigQueryWriteQuotaRetryTest.class.getName());
private static final String DATASET = RemoteBigQueryHelper.generateDatasetName();
private static final String TABLE = "testtable";
private static final String DESCRIPTION = "BigQuery Write Java manual client test dataset";
// This project is configured on the server to inject INTERNAL in-stream errors every 10 messages.
// This is done to verify in-stream message retries.
// This project is configured on the server to inject INTERNAL in-stream errors every
// 10 messages. This is done to verify in-stream message retries.
private static final String NON_QUOTA_RETRY_PROJECT_ID = "bq-write-api-java-retry-test";

private static BigQueryWriteClient client;
private static BigQuery bigquery;

Expand Down Expand Up @@ -102,109 +83,25 @@ public static void afterClass() {
@Test
public void testJsonStreamWriterCommittedStreamWithNonQuotaRetry()
throws IOException, InterruptedException, DescriptorValidationException {
RetrySettings retrySettings = RetrySettings.newBuilder().setMaxAttempts(5).build();
String tableName = "CommittedRetry";
TableId tableId = TableId.of(DATASET, tableName);
Field col1 = Field.newBuilder("col1", StandardSQLTypeName.STRING).build();
Schema schema = Schema.of(col1);
TableInfo tableInfo = TableInfo.newBuilder(tableId, StandardTableDefinition.of(schema)).build();
bigquery.create(tableInfo);
TableName parent = TableName.of(NON_QUOTA_RETRY_PROJECT_ID, DATASET, tableName);

WriteStream writeStream =
client.createWriteStream(
CreateWriteStreamRequest.newBuilder()
.setParent(parent.toString())
.setWriteStream(
WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
.build());
int totalRequest = 901;
int rowBatch = 1;
ArrayList<ApiFuture<AppendRowsResponse>> allResponses = new ArrayList<>(totalRequest);
try (JsonStreamWriter jsonStreamWriter =
JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema())
.setRetrySettings(retrySettings)
.build()) {
for (int k = 0; k < totalRequest; k++) {
JSONObject row = new JSONObject();
row.put("col1", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa");
JSONArray jsonArr = new JSONArray();
// 3MB batch.
for (int j = 0; j < rowBatch; j++) {
jsonArr.put(row);
}
LOG.info("Appending: " + k + "/" + totalRequest);
allResponses.add(jsonStreamWriter.append(jsonArr, k * rowBatch));
}
LOG.info("Waiting for all responses to come back");
for (int i = 0; i < totalRequest; i++) {
LOG.info("Waiting for request " + i);
try {
Assert.assertEquals(
allResponses.get(i).get().getAppendResult().getOffset().getValue(), i * rowBatch);
} catch (ExecutionException ex) {
Assert.fail("Unexpected error " + ex);
}
}
}
WriteRetryTestUtil.runExclusiveRetryTest(
bigquery,
client,
DATASET,
NON_QUOTA_RETRY_PROJECT_ID,
WriteStream.Type.COMMITTED,
/* requestCount=*/ 901,
/* rowBatchSize=*/ 1);
}

@Test
public void testJsonStreamWriterDefaultStreamWithNonQuotaRetry()
throws IOException, InterruptedException, DescriptorValidationException {
RetrySettings retrySettings =
RetrySettings.newBuilder()
.setInitialRetryDelay(Duration.ofMillis(500))
.setRetryDelayMultiplier(1.1)
.setMaxAttempts(5)
.setMaxRetryDelay(Duration.ofMinutes(1))
.build();
String tableName = "JsonTableDefaultStream";
TableFieldSchema TEST_STRING =
TableFieldSchema.newBuilder()
.setType(TableFieldSchema.Type.STRING)
.setMode(TableFieldSchema.Mode.NULLABLE)
.setName("test_str")
.build();
TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_STRING).build();
TableInfo tableInfo =
TableInfo.newBuilder(
TableId.of(DATASET, tableName),
StandardTableDefinition.of(
Schema.of(Field.newBuilder("test_str", StandardSQLTypeName.STRING).build())))
.build();

bigquery.create(tableInfo);
TableName parent = TableName.of(NON_QUOTA_RETRY_PROJECT_ID, DATASET, tableName);

int totalRequest = 901;
int rowBatch = 1;
ArrayList<ApiFuture<AppendRowsResponse>> allResponses = new ArrayList<>(totalRequest);
try (JsonStreamWriter jsonStreamWriter =
JsonStreamWriter.newBuilder(parent.toString(), tableSchema)
.setIgnoreUnknownFields(true)
.setRetrySettings(retrySettings)
.build()) {
for (int k = 0; k < totalRequest; k++) {
JSONObject row = new JSONObject();
row.put("test_str", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa");
JSONArray jsonArr = new JSONArray();
// 3MB batch.
for (int j = 0; j < rowBatch; j++) {
jsonArr.put(row);
}
LOG.info("Appending: " + k + "/" + totalRequest);
allResponses.add(jsonStreamWriter.append(jsonArr));
}
LOG.info("Waiting for all responses to come back");
for (int i = 0; i < totalRequest; i++) {
LOG.info("Waiting for request " + i);
try {
assertFalse(allResponses.get(i).get().hasError());
} catch (Exception ex) {
Assert.fail("Unexpected error " + ex);
}
}
}
WriteRetryTestUtil.runDefaultRetryTest(
bigquery,
client,
DATASET,
NON_QUOTA_RETRY_PROJECT_ID,
/* requestCount=*/ 901,
/* rowBatchSize=*/ 1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,40 +16,23 @@

package com.google.cloud.bigquery.storage.v1.it;

import static org.junit.Assert.assertFalse;

import com.google.api.core.ApiFuture;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.logging.Logger;
import org.json.JSONArray;
import org.json.JSONObject;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.threeten.bp.Duration;

/** Integration tests for BigQuery Write API. */
public class ITBigQueryWriteQuotaRetryTest {
Expand Down Expand Up @@ -98,117 +81,27 @@ public static void afterClass() {
}

@Test
public void testJsonStreamWriterCommittedStreamWithNonQuotaRetry()
public void testJsonStreamWriterCommittedStreamWithQuotaRetry()
throws IOException, InterruptedException, DescriptorValidationException {
RetrySettings retrySettings =
RetrySettings.newBuilder()
.setInitialRetryDelay(Duration.ofMillis(500))
.setRetryDelayMultiplier(1.1)
.setMaxAttempts(5)
.setMaxRetryDelay(Duration.ofMinutes(1))
.build();
String tableName = "CommittedRetry";
TableId tableId = TableId.of(DATASET, tableName);
Field col1 = Field.newBuilder("col1", StandardSQLTypeName.STRING).build();
Schema schema = Schema.of(col1);
TableInfo tableInfo = TableInfo.newBuilder(tableId, StandardTableDefinition.of(schema)).build();
bigquery.create(tableInfo);
TableName parent = TableName.of(QUOTA_RETRY_PROJECT_ID, DATASET, tableName);

WriteStream writeStream =
client.createWriteStream(
CreateWriteStreamRequest.newBuilder()
.setParent(parent.toString())
.setWriteStream(
WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
.build());
int totalRequest = 901;
int rowBatch = 1;
ArrayList<ApiFuture<AppendRowsResponse>> allResponses = new ArrayList<>(totalRequest);
try (JsonStreamWriter jsonStreamWriter =
JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema())
.setRetrySettings(retrySettings)
.build()) {
for (int k = 0; k < totalRequest; k++) {
JSONObject row = new JSONObject();
row.put("col1", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa");
JSONArray jsonArr = new JSONArray();
// 3MB batch.
for (int j = 0; j < rowBatch; j++) {
jsonArr.put(row);
}
LOG.info("Appending: " + k + "/" + totalRequest);
allResponses.add(jsonStreamWriter.append(jsonArr, k * rowBatch));
}
LOG.info("Waiting for all responses to come back");
for (int i = 0; i < totalRequest; i++) {
LOG.info("Waiting for request " + i);
try {
Assert.assertEquals(
allResponses.get(i).get().getAppendResult().getOffset().getValue(), i * rowBatch);
} catch (ExecutionException ex) {
Assert.fail("Unexpected error " + ex);
}
}
}
WriteRetryTestUtil.runExclusiveRetryTest(
bigquery,
client,
DATASET,
QUOTA_RETRY_PROJECT_ID,
WriteStream.Type.COMMITTED,
/* requestCount=*/ 901,
/* rowBatchSize=*/ 1);
}

@Test
public void testJsonStreamWriterDefaultStreamWithNonQuotaRetry()
public void testJsonStreamWriterDefaultStreamWithQuotaRetry()
throws IOException, InterruptedException, DescriptorValidationException {
RetrySettings retrySettings =
RetrySettings.newBuilder()
.setInitialRetryDelay(Duration.ofMillis(500))
.setRetryDelayMultiplier(1.1)
.setMaxAttempts(5)
.setMaxRetryDelay(Duration.ofMinutes(1))
.build();
String tableName = "JsonTableDefaultStream";
TableFieldSchema TEST_STRING =
TableFieldSchema.newBuilder()
.setType(TableFieldSchema.Type.STRING)
.setMode(TableFieldSchema.Mode.NULLABLE)
.setName("test_str")
.build();
TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_STRING).build();
TableInfo tableInfo =
TableInfo.newBuilder(
TableId.of(DATASET, tableName),
StandardTableDefinition.of(
Schema.of(Field.newBuilder("test_str", StandardSQLTypeName.STRING).build())))
.build();

bigquery.create(tableInfo);
TableName parent = TableName.of(QUOTA_RETRY_PROJECT_ID, DATASET, tableName);

int totalRequest = 901;
int rowBatch = 1;
ArrayList<ApiFuture<AppendRowsResponse>> allResponses = new ArrayList<>(totalRequest);
try (JsonStreamWriter jsonStreamWriter =
JsonStreamWriter.newBuilder(parent.toString(), tableSchema)
.setIgnoreUnknownFields(true)
.setRetrySettings(retrySettings)
.build()) {
for (int k = 0; k < totalRequest; k++) {
JSONObject row = new JSONObject();
row.put("test_str", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa");
JSONArray jsonArr = new JSONArray();
// 3MB batch.
for (int j = 0; j < rowBatch; j++) {
jsonArr.put(row);
}
LOG.info("Appending: " + k + "/" + totalRequest);
allResponses.add(jsonStreamWriter.append(jsonArr));
}
LOG.info("Waiting for all responses to come back");
for (int i = 0; i < totalRequest; i++) {
LOG.info("Waiting for request " + i);
try {
assertFalse(allResponses.get(i).get().hasError());
} catch (Exception ex) {
Assert.fail("Unexpected error " + ex);
}
}
}
WriteRetryTestUtil.runDefaultRetryTest(
bigquery,
client,
DATASET,
QUOTA_RETRY_PROJECT_ID,
/* requestCount=*/ 901,
/* rowBatchSize=*/ 1);
}
}
Loading

0 comments on commit 7e29b8e

Please sign in to comment.