diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java index 7537a16a188b..9595e84174bd 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java @@ -395,7 +395,14 @@ public void onResponse(Row response) { @Override public void onError(Throwable t) { - future.setException(t); + if (byteLimitReached) { + // When the byte limit is reached we cancel the stream in onResponse. + // In this case we don't want to fail the request with cancellation + // exception. Instead, we construct the next request. + onComplete(); + } else { + future.setException(t); + } } @Override diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java index 704a895992c2..bc88858ebc33 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java @@ -17,7 +17,16 @@ */ package org.apache.beam.sdk.io.gcp.bigtable; +import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient; +import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings; +import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest; import com.google.cloud.bigtable.config.BigtableOptions; +import com.google.cloud.bigtable.data.v2.BigtableDataClient; +import com.google.cloud.bigtable.data.v2.BigtableDataSettings; +import com.google.cloud.bigtable.data.v2.models.RowMutation; +import java.io.IOException; +import java.util.Date; +import org.apache.beam.repackaged.core.org.apache.commons.lang3.StringUtils; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -25,6 +34,8 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.values.PCollection; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -32,18 +43,62 @@ /** End-to-end tests of BigtableRead. */ @RunWith(JUnit4.class) public class BigtableReadIT { + private static final String COLUMN_FAMILY_NAME = "cf"; - @Test - public void testE2EBigtableRead() throws Exception { + private String project; + + private BigtableTestOptions options; + private String tableId = String.format("BigtableReadIT-%tF-% count = + p.apply( + BigtableIO.read() + .withBigtableOptions(bigtableOptionsBuilder) + .withTableId(tableId) + .withMaxBufferElementCount(10)) + .apply(Count.globally()); + PAssert.thatSingleton(count).isEqualTo(numRows); + p.run(); + } }