From a221f98a5f46be985afcb98a65fcec3b46b81f92 Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Wed, 31 Jan 2024 14:43:34 -0500 Subject: [PATCH] fix: fix error handling in segment reader and add an integration test (#30147) * fix: fix error handling in segment reader and add an integration test * also check for root cause * revert getRootCause change * fix format * add a log line * remove logging * change the log level to debug, update comment --- .../io/gcp/bigtable/BigtableServiceImpl.java | 20 +++- .../sdk/io/gcp/bigtable/BigtableReadIT.java | 93 ++++++++++++++++++- 2 files changed, 106 insertions(+), 7 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java index 7537a16a188b..d6208be1bf94 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java @@ -387,6 +387,11 @@ public void onResponse(Row response) { currentByteSize += response.getSerializedSize(); rows.add(response); if (currentByteSize > maxSegmentByteSize) { + LOG.debug( + "Reached maxSegmentByteSize, cancelling the stream. currentByteSize is {}, maxSegmentByteSize is {}, read rows {}", + currentByteSize, + maxSegmentByteSize, + rows.size()); byteLimitReached = true; controller.cancel(); return; @@ -395,14 +400,25 @@ public void onResponse(Row response) { @Override public void onError(Throwable t) { - future.setException(t); + if (byteLimitReached) { + // When the byte limit is reached we cancel the stream in onResponse. + // In this case we don't want to fail the request with cancellation + // exception. Instead, we construct the next request. + onComplete(); + } else { + future.setException(t); + } } @Override public void onComplete() { ReadRowsRequest nextNextRequest = null; - // When requested rows < limit, the current request will be the last + // Only schedule the next segment fetch when there's a possibility of more + // data to read. We know there might be more data when the current segment + // ended with the artificial byte limit or the row limit. + // If the RPC ended without hitting the byte limit or row limit, we know + // there's no more data to read and nextNextRequest would be null. if (byteLimitReached || rows.size() == nextRequest.getRowsLimit()) { nextNextRequest = truncateRequest(nextRequest, rows.get(rows.size() - 1).getKey()); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java index 704a895992c2..bc88858ebc33 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java @@ -17,7 +17,16 @@ */ package org.apache.beam.sdk.io.gcp.bigtable; +import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient; +import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings; +import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest; import com.google.cloud.bigtable.config.BigtableOptions; +import com.google.cloud.bigtable.data.v2.BigtableDataClient; +import com.google.cloud.bigtable.data.v2.BigtableDataSettings; +import com.google.cloud.bigtable.data.v2.models.RowMutation; +import java.io.IOException; +import java.util.Date; +import org.apache.beam.repackaged.core.org.apache.commons.lang3.StringUtils; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -25,6 +34,8 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.values.PCollection; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -32,18 +43,62 @@ /** End-to-end tests of BigtableRead. */ @RunWith(JUnit4.class) public class BigtableReadIT { + private static final String COLUMN_FAMILY_NAME = "cf"; - @Test - public void testE2EBigtableRead() throws Exception { + private String project; + + private BigtableTestOptions options; + private String tableId = String.format("BigtableReadIT-%tF-% count = + p.apply( + BigtableIO.read() + .withBigtableOptions(bigtableOptionsBuilder) + .withTableId(tableId) + .withMaxBufferElementCount(10)) + .apply(Count.globally()); + PAssert.thatSingleton(count).isEqualTo(numRows); + p.run(); + } }