Skip to content

Commit

Permalink
feat: enable 20MB request limit, this feature is allowlist only. (#2311)
Browse files Browse the repository at this point in the history
* feat: add enableLargerRequestLimit option

* .

* .

* add multiplexing test

* .

* .

* .

* .

* .

* .

* .

* .

* .

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
yirutang and gcf-owl-bot[bot] authored Nov 14, 2023
1 parent 179193a commit 75c2552
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ public static Boolean isDefaultStreamName(String streamName) {

/** The maximum size of one request. Defined by the API. */
public static long getApiMaxRequestBytes() {
return 10L * 1000L * 1000L; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte)
return 20L * 1000L * 1000L; // 20 megabytes (https://en.wikipedia.org/wiki/Megabyte)
}

static String extractProjectName(String streamName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1091,7 +1091,8 @@ public void run() throws Throwable {
public void testMessageTooLarge() throws Exception {
StreamWriter writer = getTestStreamWriter();

String oversized = Strings.repeat("a", (int) (StreamWriter.getApiMaxRequestBytes() + 1));
// There is an oppotunity to allow 20MB requests.
String oversized = Strings.repeat("a", (int) (StreamWriter.getApiMaxRequestBytes() * 2 + 1));
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {oversized});
assertTrue(appendFuture1.isDone());
StatusRuntimeException actualError =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
import com.google.protobuf.DescriptorProtos.DescriptorProto;
import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import io.grpc.Status;
Expand Down Expand Up @@ -208,6 +210,21 @@ ProtoRows CreateProtoRows(String[] messages) {
return rows.build();
}

ProtoSchema CreateProtoSchemaWithColField() {
return ProtoSchema.newBuilder()
.setProtoDescriptor(
DescriptorProto.newBuilder()
.setName("testProto")
.addField(
FieldDescriptorProto.newBuilder()
.setName("col1")
.setNumber(1)
.setType(FieldDescriptorProto.Type.TYPE_STRING)
.build())
.build())
.build();
}

ProtoRows CreateProtoOptionalRows(String[] messages) {
ProtoRows.Builder rows = ProtoRows.newBuilder();
for (String message : messages) {
Expand Down Expand Up @@ -1541,4 +1558,94 @@ public void testMultiplexingMixedLocation()
assertEquals("us", streamWriter2.getLocation());
assertEquals("eu", streamWriter3.getLocation());
}

// Tested locally but project config is frozen and we need to wait for a while to enable the
// test in automatic workflow.
// @Test
// public void testLargeRequest() throws IOException, InterruptedException, ExecutionException {
// String tableName = "largeRequestTable";
// TableId tableId = TableId.of(DATASET, tableName);
// Field col1 = Field.newBuilder("col1", StandardSQLTypeName.STRING).build();
// Schema originalSchema = Schema.of(col1);
// TableInfo tableInfo =
// TableInfo.newBuilder(tableId, StandardTableDefinition.of(originalSchema)).build();
// bigquery.create(tableInfo);
// TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName);
// try (StreamWriter streamWriter =
// StreamWriter.newBuilder(parent.toString() + "/_default")
// .setWriterSchema(CreateProtoSchemaWithColField())
// .build()) {
// List<Integer> sizeSet = Arrays.asList(15 * 1024 * 1024, 1024);
// List<ApiFuture<AppendRowsResponse>> responseList =
// new ArrayList<ApiFuture<AppendRowsResponse>>();
// Random r = new Random();
// for (int i = 0; i < 50; i++) {
// int size = sizeSet.get(r.nextInt(2));
// LOG.info("Sending size: " + size);
// responseList.add(
// streamWriter.append(
// CreateProtoRows(
// new String[] {
// new String(new char[size]).replace('\u0000', (char) (r.nextInt(26) + 'a'))
// })));
// }
// for (int i = 0; i < 50; i++) {
// assertFalse(responseList.get(i).get().hasError());
// }
// TableResult queryResult =
// bigquery.query(
// QueryJobConfiguration.newBuilder("SELECT count(*) from " + DATASET + '.' +
// tableName)
// .build());
// Iterator<FieldValueList> queryIter = queryResult.getValues().iterator();
// assertTrue(queryIter.hasNext());
// assertEquals("50", queryIter.next().get(0).getStringValue());
// }
// }

@Test
public void testDefaultRequestLimit()
throws IOException, InterruptedException, ExecutionException {
DatasetId datasetId =
DatasetId.of("bq-write-api-java-retry-test", RemoteBigQueryHelper.generateDatasetName());
DatasetInfo datasetInfo = DatasetInfo.newBuilder(datasetId).build();
bigquery.create(datasetInfo);
try {
String tableName = "requestTable";
TableId tableId = TableId.of(datasetId.getProject(), datasetId.getDataset(), tableName);
Field col1 = Field.newBuilder("col1", StandardSQLTypeName.STRING).build();
Schema originalSchema = Schema.of(col1);
TableInfo tableInfo =
TableInfo.newBuilder(tableId, StandardTableDefinition.of(originalSchema)).build();
bigquery.create(tableInfo);
TableName parent = TableName.of(datasetId.getProject(), datasetId.getDataset(), tableName);
try (StreamWriter streamWriter =
StreamWriter.newBuilder(parent.toString() + "/_default")
.setWriterSchema(CreateProtoSchemaWithColField())
.build()) {
ApiFuture<AppendRowsResponse> response =
streamWriter.append(
CreateProtoRows(
new String[] {new String(new char[19 * 1024 * 1024]).replace("\0", "a")}));
try {
response.get();
Assert.fail("Large request should fail with InvalidArgumentError");
} catch (ExecutionException ex) {
assertEquals(io.grpc.StatusRuntimeException.class, ex.getCause().getClass());
io.grpc.StatusRuntimeException actualError =
(io.grpc.StatusRuntimeException) ex.getCause();
// This verifies that the Beam connector can consume this custom exception's grpc
// StatusCode
assertEquals(Code.INVALID_ARGUMENT, actualError.getStatus().getCode());
assertThat(
actualError
.getStatus()
.getDescription()
.contains("AppendRows request too large: 19923131 limit 10485760"));
}
}
} finally {
RemoteBigQueryHelper.forceDelete(bigquery, datasetId.toString());
}
}
}

0 comments on commit 75c2552

Please sign in to comment.