fix: fix error handling in segment reader and add an integration test #30147

Merged
merged 7 commits on Jan 31, 2024

Changes from all commits
@@ -387,6 +387,11 @@ public void onResponse(Row response) {
       currentByteSize += response.getSerializedSize();
       rows.add(response);
       if (currentByteSize > maxSegmentByteSize) {
+        LOG.debug(
+            "Reached maxSegmentByteSize, cancelling the stream. currentByteSize is {}, maxSegmentByteSize is {}, read rows {}",
+            currentByteSize,
+            maxSegmentByteSize,
+            rows.size());
         byteLimitReached = true;
         controller.cancel();
         return;
@@ -395,14 +400,25 @@ public void onResponse(Row response) {

     @Override
     public void onError(Throwable t) {
-      future.setException(t);
+      if (byteLimitReached) {
+        // When the byte limit is reached we cancel the stream in onResponse.
+        // In this case we don't want to fail the request with a cancellation
+        // exception. Instead, we construct the next request.
+        onComplete();
+      } else {
+        future.setException(t);
+      }
     }

     @Override
     public void onComplete() {
       ReadRowsRequest nextNextRequest = null;

-      // When requested rows < limit, the current request will be the last
+      // Only schedule the next segment fetch when there's a possibility of more
+      // data to read. We know there might be more data when the current segment
+      // ended with the artificial byte limit or the row limit.
+      // If the RPC ended without hitting the byte limit or row limit, we know
+      // there's no more data to read and nextNextRequest would be null.
       if (byteLimitReached || rows.size() == nextRequest.getRowsLimit()) {
         nextNextRequest =
             truncateRequest(nextRequest, rows.get(rows.size() - 1).getKey());
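The interplay between `onResponse`, `onError`, and `onComplete` above is easier to see outside diff form. Below is a minimal, self-contained sketch of the same cancellation-as-completion pattern, assuming the gax `ResponseObserver`/`StreamController` streaming API this reader sits on; `SegmentObserver` and its helper methods are illustrative stand-ins, not Beam's actual classes.

```java
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.StreamController;

/**
 * Sketch of the pattern fixed in this PR: an observer that enforces a
 * per-segment byte budget by cancelling the stream, then swallows the
 * cancellation error that its own cancel() produces.
 */
abstract class SegmentObserver<T> implements ResponseObserver<T> {
  private final long maxSegmentBytes;
  private long currentBytes;
  private volatile boolean byteLimitReached;
  private StreamController controller;

  SegmentObserver(long maxSegmentBytes) {
    this.maxSegmentBytes = maxSegmentBytes;
  }

  @Override
  public void onStart(StreamController controller) {
    this.controller = controller;
  }

  @Override
  public void onResponse(T response) {
    currentBytes += sizeOf(response);
    buffer(response);
    if (currentBytes > maxSegmentBytes) {
      // Cancelling here makes gax deliver a cancellation to onError.
      byteLimitReached = true;
      controller.cancel();
    }
  }

  @Override
  public void onError(Throwable t) {
    if (byteLimitReached) {
      // The "error" is just the echo of our own cancel(): the segment is
      // complete, so fall through to the normal completion path.
      onComplete();
    } else {
      fail(t); // a genuine failure; surface it to the caller
    }
  }

  // Hooks standing in for the real reader's buffer/future plumbing.
  abstract long sizeOf(T response);

  abstract void buffer(T response);

  abstract void fail(Throwable t);
}
```

Before this change, `onError` unconditionally failed the future, so a segment that ended on the byte budget surfaced the reader's own cancellation as a read error instead of rolling over to the next segment request.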
BigtableReadIT.java
@@ -17,33 +17,88 @@
  */
 package org.apache.beam.sdk.io.gcp.bigtable;

+import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
+import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings;
+import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest;
 import com.google.cloud.bigtable.config.BigtableOptions;
+import com.google.cloud.bigtable.data.v2.BigtableDataClient;
+import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
+import com.google.cloud.bigtable.data.v2.models.RowMutation;
+import java.io.IOException;
+import java.util.Date;
+import org.apache.beam.repackaged.core.org.apache.commons.lang3.StringUtils;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.values.PCollection;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;

 /** End-to-end tests of BigtableRead. */
 @RunWith(JUnit4.class)
 public class BigtableReadIT {
   private static final String COLUMN_FAMILY_NAME = "cf";

-  @Test
-  public void testE2EBigtableRead() throws Exception {
+  private String project;
+
+  private BigtableTestOptions options;
+  private String tableId = String.format("BigtableReadIT-%tF-%<tH-%<tM-%<tS-%<tL", new Date());
+
+  private BigtableDataClient client;
+  private BigtableTableAdminClient tableAdminClient;
+
+  @Before
+  public void setup() throws IOException {
     PipelineOptionsFactory.register(BigtableTestOptions.class);
-    BigtableTestOptions options =
-        TestPipeline.testingPipelineOptions().as(BigtableTestOptions.class);
-
-    String project = options.getBigtableProject();
+    options = TestPipeline.testingPipelineOptions().as(BigtableTestOptions.class);
+
+    project = options.getBigtableProject();

     if (project.equals("")) {
       project = options.as(GcpOptions.class).getProject();
     }

+    BigtableDataSettings veneerSettings =
+        BigtableDataSettings.newBuilder()
+            .setProjectId(project)
+            .setInstanceId(options.getInstanceId())
+            .build();
+
+    BigtableTableAdminSettings adminSettings =
+        BigtableTableAdminSettings.newBuilder()
+            .setProjectId(project)
+            .setInstanceId(options.getInstanceId())
+            .build();
+
+    client = BigtableDataClient.create(veneerSettings);
+    tableAdminClient = BigtableTableAdminClient.create(adminSettings);
+  }
+
+  @After
+  public void tearDown() {
+    if (tableAdminClient != null) {
+      try {
+        tableAdminClient.deleteTable(tableId);
+      } catch (Exception e) {
+        // Ignore cleanup failures so they don't mask the test result.
+      }
+      tableAdminClient.close();
+    }
+
+    if (client != null) {
+      client.close();
+    }
+  }
+
+  @Test
+  public void testE2EBigtableRead() {
     BigtableOptions.Builder bigtableOptionsBuilder =
         new BigtableOptions.Builder().setProjectId(project).setInstanceId(options.getInstanceId());

@@ -57,4 +112,32 @@ public void testE2EBigtableRead() throws Exception {
     PAssert.thatSingleton(count).isEqualTo(numRows);
     p.run();
   }
+
+  @Test
+  public void testE2EBigtableSegmentRead() {
+    tableAdminClient.createTable(CreateTableRequest.of(tableId).addFamily(COLUMN_FAMILY_NAME));
+
+    final long numRows = 20L;
+    String value = StringUtils.repeat("v", 100 * 1000 * 1000);
+    // Populate the table with large rows of 100 MB each; this makes the pipeline
+    // hit the segment reader's memory limit after 8 rows when running on ec2-standard.
+    for (int i = 0; i < numRows; i++) {
+      client.mutateRow(
+          RowMutation.create(tableId, "key-" + i).setCell(COLUMN_FAMILY_NAME, "q", value));
+    }
+
+    BigtableOptions.Builder bigtableOptionsBuilder =
+        new BigtableOptions.Builder().setProjectId(project).setInstanceId(options.getInstanceId());
+
+    Pipeline p = Pipeline.create(options);
+    PCollection<Long> count =
+        p.apply(
+                BigtableIO.read()
+                    .withBigtableOptions(bigtableOptionsBuilder)
+                    .withTableId(tableId)
+                    .withMaxBufferElementCount(10))
+            .apply(Count.globally());
+    PAssert.thatSingleton(count).isEqualTo(numRows);
+    p.run();
+  }
 }
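For context on the new test: `withMaxBufferElementCount(10)` routes `BigtableIO.read()` through the segment reader, and the 100 MB rows force the byte budget to trip mid-segment, exercising exactly the cancel-then-resume branch patched above. The loop below is a hypothetical model (`Segment`, `SegmentFetcher`, and `readAll` are stand-ins, not Beam internals) of the continuation rule `byteLimitReached || rows.size() == nextRequest.getRowsLimit()` that decides whether another segment is fetched.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of the segment pagination contract. A segment may end
 * three ways: the byte budget tripped (resume), exactly rowsLimit rows came
 * back (maybe more data, resume), or fewer rows came back (exhausted, stop).
 */
final class SegmentLoopSketch {
  interface Segment {
    List<String> rowKeys(); // keys returned by this segment

    boolean byteLimitReached(); // stream was cancelled on the byte budget
  }

  interface SegmentFetcher {
    // Fetch up to rowsLimit rows strictly after startKeyExclusive (null = start).
    Segment fetch(String startKeyExclusive, int rowsLimit);
  }

  static List<String> readAll(SegmentFetcher fetcher, int rowsLimit) {
    List<String> all = new ArrayList<>();
    String lastKey = null;
    while (true) {
      Segment segment = fetcher.fetch(lastKey, rowsLimit);
      all.addAll(segment.rowKeys());
      boolean maybeMore =
          segment.byteLimitReached() || segment.rowKeys().size() == rowsLimit;
      if (!maybeMore || segment.rowKeys().isEmpty()) {
        return all; // short segment: the table is exhausted
      }
      lastKey = segment.rowKeys().get(segment.rowKeys().size() - 1);
    }
  }
}
```

With 20 rows and a 10-row buffer, every segment ends on the byte or row limit until the final short one, so `Count.globally()` must still observe all 20 rows; before the fix, the first byte-limited segment would instead fail the read with the reader's own cancellation.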