fix: fix error handling in segment reader and add an integration test (#30147)

* fix: fix error handling in segment reader and add an integration test

* also check for root cause

* revert getRootCause change

* fix format

* add a log line

* remove logging

* change the log level to debug, update comment
mutianf authored Jan 31, 2024
1 parent 89d1c06 commit a221f98
Showing 2 changed files with 106 additions and 7 deletions.
@@ -387,6 +387,11 @@ public void onResponse(Row response) {
       currentByteSize += response.getSerializedSize();
       rows.add(response);
       if (currentByteSize > maxSegmentByteSize) {
+        LOG.debug(
+            "Reached maxSegmentByteSize, cancelling the stream. currentByteSize is {}, maxSegmentByteSize is {}, read rows {}",
+            currentByteSize,
+            maxSegmentByteSize,
+            rows.size());
         byteLimitReached = true;
         controller.cancel();
         return;
@@ -395,14 +400,25 @@ public void onResponse(Row response) {

     @Override
     public void onError(Throwable t) {
-      future.setException(t);
+      if (byteLimitReached) {
+        // When the byte limit is reached we cancel the stream in onResponse.
+        // In this case we don't want to fail the request with cancellation
+        // exception. Instead, we construct the next request.
+        onComplete();
+      } else {
+        future.setException(t);
+      }
     }

     @Override
     public void onComplete() {
       ReadRowsRequest nextNextRequest = null;

-      // When requested rows < limit, the current request will be the last
+      // Only schedule the next segment fetch when there's a possibility of more
+      // data to read. We know there might be more data when the current segment
+      // ended with the artificial byte limit or the row limit.
+      // If the RPC ended without hitting the byte limit or row limit, we know
+      // there's no more data to read and nextNextRequest would be null.
       if (byteLimitReached || rows.size() == nextRequest.getRowsLimit()) {
         nextNextRequest =
             truncateRequest(nextRequest, rows.get(rows.size() - 1).getKey());
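To make the control flow of this first change easier to follow in one place, here is a minimal, self-contained Java sketch of the same pattern. It is not the actual Beam or client-library code: `SegmentObserver`, `StreamController`, and `Sizer` are hypothetical stand-ins, and a `CompletableFuture` stands in for the settable future (`future.setException`) in the diff. The point it illustrates is the fix above: once a row pushes the segment past the byte limit, the observer cancels the stream itself, so the error that follows is expected and should finish the segment instead of failing it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-ins for the stream types used by the real segment reader.
final class SegmentObserver<RowT> {
  interface StreamController {
    void cancel();
  }

  interface Sizer<RowT> {
    long serializedSize(RowT row);
  }

  private final long maxSegmentByteSize;
  private final StreamController controller;
  private final Sizer<RowT> sizer;
  private final List<RowT> rows = new ArrayList<>();
  private final CompletableFuture<List<RowT>> future = new CompletableFuture<>();

  private long currentByteSize = 0;
  private boolean byteLimitReached = false;

  SegmentObserver(long maxSegmentByteSize, StreamController controller, Sizer<RowT> sizer) {
    this.maxSegmentByteSize = maxSegmentByteSize;
    this.controller = controller;
    this.sizer = sizer;
  }

  void onResponse(RowT row) {
    currentByteSize += sizer.serializedSize(row);
    rows.add(row);
    if (currentByteSize > maxSegmentByteSize) {
      // Stop the RPC early; the rows buffered so far become one segment.
      byteLimitReached = true;
      controller.cancel();
    }
  }

  void onError(Throwable t) {
    if (byteLimitReached) {
      // This error is the echo of our own cancel() in onResponse, not a real
      // failure: complete the segment normally so the caller can build the
      // next request from the last row key.
      onComplete();
    } else {
      future.completeExceptionally(t);
    }
  }

  void onComplete() {
    future.complete(rows);
  }

  CompletableFuture<List<RowT>> segment() {
    return future;
  }
}
```

In the real reader, `onComplete` then decides whether to schedule another segment: more data can remain only when the segment ended at the byte limit or filled the row limit, which is exactly the `byteLimitReached || rows.size() == nextRequest.getRowsLimit()` condition in the diff.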
@@ -17,33 +17,88 @@
  */
 package org.apache.beam.sdk.io.gcp.bigtable;

+import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
+import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings;
+import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest;
 import com.google.cloud.bigtable.config.BigtableOptions;
+import com.google.cloud.bigtable.data.v2.BigtableDataClient;
+import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
+import com.google.cloud.bigtable.data.v2.models.RowMutation;
+import java.io.IOException;
 import java.util.Date;
+import org.apache.beam.repackaged.core.org.apache.commons.lang3.StringUtils;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.values.PCollection;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;

 /** End-to-end tests of BigtableRead. */
 @RunWith(JUnit4.class)
 public class BigtableReadIT {
   private static final String COLUMN_FAMILY_NAME = "cf";

-  @Test
-  public void testE2EBigtableRead() throws Exception {
+  private String project;
+
+  private BigtableTestOptions options;
+  private String tableId = String.format("BigtableReadIT-%tF-%<tH-%<tM-%<tS-%<tL", new Date());
+
+  private BigtableDataClient client;
+  private BigtableTableAdminClient tableAdminClient;
+
+  @Before
+  public void setup() throws IOException {
     PipelineOptionsFactory.register(BigtableTestOptions.class);
-    BigtableTestOptions options =
-        TestPipeline.testingPipelineOptions().as(BigtableTestOptions.class);
+    options = TestPipeline.testingPipelineOptions().as(BigtableTestOptions.class);

-    String project = options.getBigtableProject();
+    project = options.getBigtableProject();

     if (project.equals("")) {
       project = options.as(GcpOptions.class).getProject();
     }
+
+    BigtableDataSettings veneerSettings =
+        BigtableDataSettings.newBuilder()
+            .setProjectId(project)
+            .setInstanceId(options.getInstanceId())
+            .build();
+
+    BigtableTableAdminSettings adminSettings =
+        BigtableTableAdminSettings.newBuilder()
+            .setProjectId(project)
+            .setInstanceId(options.getInstanceId())
+            .build();
+
+    client = BigtableDataClient.create(veneerSettings);
+    tableAdminClient = BigtableTableAdminClient.create(adminSettings);
+  }
+
+  @After
+  public void tearDown() {
+    if (tableAdminClient != null) {
+      try {
+        tableAdminClient.deleteTable(tableId);
+      } catch (Exception e) {
+        // ignore exceptions
+      }
+      tableAdminClient.close();
+    }
+
+    if (client != null) {
+      client.close();
+    }
+  }
+
+  @Test
+  public void testE2EBigtableRead() {
     BigtableOptions.Builder bigtableOptionsBuilder =
         new BigtableOptions.Builder().setProjectId(project).setInstanceId(options.getInstanceId());
+
@@ -57,4 +112,32 @@ public void testE2EBigtableRead() throws Exception {
     PAssert.thatSingleton(count).isEqualTo(numRows);
     p.run();
   }
+
+  @Test
+  public void testE2EBigtableSegmentRead() {
+    tableAdminClient.createTable(CreateTableRequest.of(tableId).addFamily(COLUMN_FAMILY_NAME));
+
+    final long numRows = 20L;
+    String value = StringUtils.repeat("v", 100 * 1000 * 1000);
+    // Populate the table with large rows, each 100 MB. This makes the pipeline reach the
+    // segment reader memory limit with 8 rows when running on e2-standard workers.
+    for (int i = 0; i < numRows; i++) {
+      client.mutateRow(
+          RowMutation.create(tableId, "key-" + i).setCell(COLUMN_FAMILY_NAME, "q", value));
+    }
+
+    BigtableOptions.Builder bigtableOptionsBuilder =
+        new BigtableOptions.Builder().setProjectId(project).setInstanceId(options.getInstanceId());
+
+    Pipeline p = Pipeline.create(options);
+    PCollection<Long> count =
+        p.apply(
+                BigtableIO.read()
+                    .withBigtableOptions(bigtableOptionsBuilder)
+                    .withTableId(tableId)
+                    .withMaxBufferElementCount(10))
+            .apply(Count.globally());
+    PAssert.thatSingleton(count).isEqualTo(numRows);
+    p.run();
+  }
 }
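To see why this test exercises the segment reader, consider the arithmetic: 20 rows of roughly 100 MB each are read with `withMaxBufferElementCount(10)`, so the byte limit should bind before the 10-row buffer does. The sketch below is illustrative only; the ~750 MB byte limit is a hypothetical value chosen to match the "memory limit with 8 rows" comment above, whereas the real limit is derived from available worker memory.

```java
import java.util.ArrayList;
import java.util.List;

public class SegmentMath {
  // Plans segment boundaries the way the reader above behaves: a segment ends
  // when it fills the row limit, or with the first row that pushes it past the
  // byte limit (that row is kept, then the stream is cancelled).
  static List<Integer> segmentSizes(int totalRows, long rowBytes, int rowLimit, long byteLimit) {
    List<Integer> sizes = new ArrayList<>();
    int remaining = totalRows;
    while (remaining > 0) {
      int rows = 0;
      long bytes = 0;
      while (rows < remaining && rows < rowLimit && bytes <= byteLimit) {
        bytes += rowBytes;
        rows++;
      }
      sizes.add(rows);
      remaining -= rows;
    }
    return sizes;
  }

  public static void main(String[] args) {
    // 20 rows of ~100 MB, a 10-row buffer, and a hypothetical ~750 MB byte limit.
    System.out.println(segmentSizes(20, 100_000_000L, 10, 750_000_000L)); // [8, 8, 4]
  }
}
```

Under these assumed numbers the read splits into two byte-limited segments of 8 rows and a final 4-row segment that ends with a clean stream completion, so `nextNextRequest` stays null and the read stops. That covers both branches of the fixed `onError`/`onComplete` logic.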
