From 2df610d86aa955085c324e0deb5cb1782a835947 Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Fri, 10 Nov 2023 22:30:56 +0000 Subject: [PATCH] . --- .../bigquery/storage/v1/ConnectionWorker.java | 11 ++-------- .../storage/v1/ConnectionWorkerPool.java | 14 ++++--------- .../bigquery/storage/v1/StreamWriter.java | 9 +------- .../storage/v1/ConnectionWorkerTest.java | 21 +++++++------------ .../it/ITBigQueryWriteManualClientTest.java | 11 +++++----- 5 files changed, 20 insertions(+), 46 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 406a5112bf..0b46468753 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -235,8 +235,6 @@ class ConnectionWorker implements AutoCloseable { private RuntimeException testOnlyRunTimeExceptionInAppendLoop = null; private long testOnlyAppendLoopSleepTime = 0; - private boolean enableLargerRequest = false; - /* * Tracks the number of responses to ignore in the case of exclusive stream retry */ @@ -262,10 +260,7 @@ public static Boolean isDefaultStreamName(String streamName) { /** The maximum size of one request. Defined by the API. */ public long getApiMaxRequestBytes() { - if (enableLargerRequest) { - return 20L * 1000L * 1000L; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte) - } - return 10L * 1000L * 1000L; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte) + return 20L * 1000L * 1000L; // 20 megabytes (https://en.wikipedia.org/wiki/Megabyte) } static String extractProjectName(String streamName) { @@ -294,8 +289,7 @@ public ConnectionWorker( String traceId, @Nullable String compressorName, BigQueryWriteSettings clientSettings, - RetrySettings retrySettings, - boolean enableLargerRequest) + RetrySettings retrySettings) throws IOException { this.lock = new ReentrantLock(); this.hasMessageInWaitingQueue = lock.newCondition(); @@ -318,7 +312,6 @@ public ConnectionWorker( this.inflightRequestQueue = new LinkedList(); this.compressorName = compressorName; this.retrySettings = retrySettings; - this.enableLargerRequest = enableLargerRequest; // Always recreate a client for connection worker. HashMap newHeaders = new HashMap<>(); newHeaders.putAll(clientSettings.toBuilder().getHeaderProvider().getHeaders()); diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java index 69377c360e..cbf9b8a839 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java @@ -301,10 +301,7 @@ private ConnectionWorker createOrReuseConnectionWorker( if (connectionWorkerPool.size() < currentMaxConnectionCount) { // Always create a new connection if we haven't reached current maximum. return createConnectionWorker( - streamWriter.getStreamName(), - streamWriter.getLocation(), - streamWriter.getProtoSchema(), - streamWriter.getEnableLargerRequest()); + streamWriter.getStreamName(), streamWriter.getLocation(), streamWriter.getProtoSchema()); } else { ConnectionWorker existingBestConnection = pickBestLoadConnection( @@ -323,8 +320,7 @@ private ConnectionWorker createOrReuseConnectionWorker( return createConnectionWorker( streamWriter.getStreamName(), streamWriter.getLocation(), - streamWriter.getProtoSchema(), - streamWriter.getEnableLargerRequest()); + streamWriter.getProtoSchema()); } else { // Stick to the original connection if all the connections are overwhelmed. if (existingConnectionWorker != null) { @@ -380,8 +376,7 @@ static ConnectionWorker pickBestLoadConnection( * computeIfAbsent(...) which is at most once per key. */ private ConnectionWorker createConnectionWorker( - String streamName, String location, ProtoSchema writeSchema, boolean enableLargeRequest) - throws IOException { + String streamName, String location, ProtoSchema writeSchema) throws IOException { if (enableTesting) { // Though atomic integer is super lightweight, add extra if check in case adding future logic. testValueCreateConnectionCount.getAndIncrement(); @@ -398,8 +393,7 @@ private ConnectionWorker createConnectionWorker( traceId, compressorName, clientSettings, - retrySettings, - enableLargeRequest); + retrySettings); connectionWorkerPool.add(connectionWorker); log.info( String.format( diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index 6fdc94bfdd..b9d16ca84a 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -82,11 +82,6 @@ public class StreamWriter implements AutoCloseable { */ private final String location; - /* - * If larger request is enabled. - */ - private final Boolean enableLargerRequest; - /* * If user has closed the StreamWriter. */ @@ -216,7 +211,6 @@ private StreamWriter(Builder builder) throws IOException { this.streamName = builder.streamName; this.writerSchema = builder.writerSchema; this.defaultMissingValueInterpretation = builder.defaultMissingValueInterpretation; - this.enableLargerRequest = builder.enableLargerRequest; BigQueryWriteSettings clientSettings = getBigQueryWriteSettings(builder); if (!builder.enableConnectionPool) { this.location = builder.location; @@ -233,8 +227,7 @@ private StreamWriter(Builder builder) throws IOException { builder.traceId, builder.compressorName, clientSettings, - builder.retrySettings, - builder.enableLargerRequest)); + builder.retrySettings)); } else { if (!isDefaultStream(streamName)) { log.warning( diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java index 7be11c2afa..71e4d47673 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java @@ -343,8 +343,7 @@ public void testAppendButInflightQueueFull() throws Exception { TEST_TRACE_ID, null, client.getSettings(), - retrySettings, - false); + retrySettings); testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(1)); ConnectionWorker.setMaxInflightQueueWaitTime(500); @@ -401,8 +400,7 @@ public void testThrowExceptionWhileWithinAppendLoop() throws Exception { TEST_TRACE_ID, null, client.getSettings(), - retrySettings, - false); + retrySettings); testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(1)); ConnectionWorker.setMaxInflightQueueWaitTime(500); @@ -471,8 +469,7 @@ public void testLocationMismatch() throws Exception { TEST_TRACE_ID, null, client.getSettings(), - retrySettings, - false); + retrySettings); StatusRuntimeException ex = assertThrows( StatusRuntimeException.class, @@ -504,8 +501,7 @@ public void testStreamNameMismatch() throws Exception { TEST_TRACE_ID, null, client.getSettings(), - retrySettings, - false); + retrySettings); StatusRuntimeException ex = assertThrows( StatusRuntimeException.class, @@ -558,8 +554,7 @@ private ConnectionWorker createConnectionWorker( TEST_TRACE_ID, null, client.getSettings(), - retrySettings, - false); + retrySettings); } private ProtoSchema createProtoSchema(String protoName) { @@ -654,8 +649,7 @@ public void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws E TEST_TRACE_ID, null, client.getSettings(), - retrySettings, - false); + retrySettings); testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(3)); long appendCount = 10; @@ -717,8 +711,7 @@ public void testLongTimeIdleWontFail() throws Exception { TEST_TRACE_ID, null, client.getSettings(), - retrySettings, - false); + retrySettings); long appendCount = 10; for (int i = 0; i < appendCount * 2; i++) { diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java index 3087fe1013..e88711dd64 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java @@ -1614,8 +1614,7 @@ public void testDefaultRequestLimit() TableInfo tableInfo = TableInfo.newBuilder(tableId, StandardTableDefinition.of(originalSchema)).build(); bigquery.create(tableInfo); - TableName parent = - TableName.of(ServiceOptions.getDefaultProjectId(), datasetId.getDataset(), tableName); + TableName parent = TableName.of(datasetId.getProject(), datasetId.getDataset(), tableName); try (StreamWriter streamWriter = StreamWriter.newBuilder(parent.toString() + "/_default") .setWriterSchema(CreateProtoSchemaWithColField()) @@ -1634,9 +1633,11 @@ public void testDefaultRequestLimit() // This verifies that the Beam connector can consume this custom exception's grpc // StatusCode assertEquals(Code.INVALID_ARGUMENT, actualError.getStatus().getCode()); - assertEquals( - "MessageSize is too large. Max allow: 10000000 Actual: 19922986", - actualError.getStatus().getDescription()); + assertThat( + actualError + .getStatus() + .getDescription() + .contains("AppendRows request too large: 19923131 limit 10485760")); } } } finally {