Skip to content

Commit

Permalink
fix: fix error handling in segment reader and add an integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
mutianf committed Jan 29, 2024
1 parent ee4f8cb commit d64b076
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,14 @@ public void onResponse(Row response) {

@Override
public void onError(Throwable t) {
future.setException(t);
if (byteLimitReached) {
// When the byte limit is reached we cancel the stream in onResponse.
// In this case we don't want to fail the request with cancellation
// exception. Instead, we construct the next request.
onComplete();
} else {
future.setException(t);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,88 @@
*/
package org.apache.beam.sdk.io.gcp.bigtable;

import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings;
import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest;
import com.google.cloud.bigtable.config.BigtableOptions;
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import java.io.IOException;
import java.util.Date;
import org.apache.beam.repackaged.core.org.apache.commons.lang3.StringUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.values.PCollection;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/** End-to-end tests of BigtableRead. */
@RunWith(JUnit4.class)
public class BigtableReadIT {
private static final String COLUMN_FAMILY_NAME = "cf";

@Test
public void testE2EBigtableRead() throws Exception {
private String project;

private BigtableTestOptions options;
private String tableId = String.format("BigtableReadIT-%tF-%<tH-%<tM-%<tS-%<tL", new Date());

private BigtableDataClient client;
private BigtableTableAdminClient tableAdminClient;

@Before
public void setup() throws IOException {
PipelineOptionsFactory.register(BigtableTestOptions.class);
BigtableTestOptions options =
TestPipeline.testingPipelineOptions().as(BigtableTestOptions.class);

String project = options.getBigtableProject();
options = TestPipeline.testingPipelineOptions().as(BigtableTestOptions.class);

project = options.getBigtableProject();

if (project.equals("")) {
project = options.as(GcpOptions.class).getProject();
}

BigtableDataSettings veneerSettings =
BigtableDataSettings.newBuilder()
.setProjectId(project)
.setInstanceId(options.getInstanceId())
.build();

BigtableTableAdminSettings adminSettings =
BigtableTableAdminSettings.newBuilder()
.setProjectId(project)
.setInstanceId(options.getInstanceId())
.build();

client = BigtableDataClient.create(veneerSettings);
tableAdminClient = BigtableTableAdminClient.create(adminSettings);
}

@After
public void tearDown() {
if (tableAdminClient != null) {
try {
tableAdminClient.deleteTable(tableId);
} catch (Exception e) {
// ignore exceptions
}
tableAdminClient.close();
}

if (client != null) {
client.close();
}
}

@Test
public void testE2EBigtableRead() {
BigtableOptions.Builder bigtableOptionsBuilder =
new BigtableOptions.Builder().setProjectId(project).setInstanceId(options.getInstanceId());

Expand All @@ -57,4 +112,32 @@ public void testE2EBigtableRead() throws Exception {
PAssert.thatSingleton(count).isEqualTo(numRows);
p.run();
}

@Test
public void testE2EBigtableSegmentRead() {
tableAdminClient.createTable(CreateTableRequest.of(tableId).addFamily(COLUMN_FAMILY_NAME));

final long numRows = 20L;
String value = StringUtils.repeat("v", 100 * 1000 * 1000);
// populate a table with large rows, each row is 100 MB. This will make the pipeline reach
// segment reader memory limit with 8 rows when running on ec2-standard.
for (int i = 0; i < numRows; i++) {
client.mutateRow(
RowMutation.create(tableId, "key-" + i).setCell(COLUMN_FAMILY_NAME, "q", value));
}

BigtableOptions.Builder bigtableOptionsBuilder =
new BigtableOptions.Builder().setProjectId(project).setInstanceId(options.getInstanceId());

Pipeline p = Pipeline.create(options);
PCollection<Long> count =
p.apply(
BigtableIO.read()
.withBigtableOptions(bigtableOptionsBuilder)
.withTableId(tableId)
.withMaxBufferElementCount(10))
.apply(Count.globally());
PAssert.thatSingleton(count).isEqualTo(numRows);
p.run();
}
}

0 comments on commit d64b076

Please sign in to comment.