From b83ac93dcc77f2102ff74c13a0a567418f8510b0 Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Wed, 31 Jan 2024 15:36:18 -0500 Subject: [PATCH] Clean up stale code in BigtableService --- .../sdk/io/gcp/bigtable/BigtableService.java | 7 -- .../io/gcp/bigtable/BigtableServiceImpl.java | 78 ++----------------- .../sdk/io/gcp/bigtable/BigtableIOTest.java | 10 --- .../gcp/bigtable/BigtableServiceImplTest.java | 43 +--------- 4 files changed, 7 insertions(+), 131 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java index 3a3de5622cd2..1e3839b5df4f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java @@ -30,7 +30,6 @@ import java.util.concurrent.CompletionStage; import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.BigtableSource; import org.apache.beam.sdk.values.KV; -import org.joda.time.Duration; /** An interface for real or fake implementations of Cloud Bigtable. */ interface BigtableService extends Serializable { @@ -76,12 +75,6 @@ interface Reader { * current row because the last such call was unsuccessful. */ Row getCurrentRow() throws NoSuchElementException; - - // Workaround for ReadRows requests which requires to pass the timeouts in - // ApiContext. Can be removed later once it's fixed in Veneer. - Duration getAttemptTimeout(); - - Duration getOperationTimeout(); } /** Returns a {@link Reader} that will read from the specified source. */ diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java index d6208be1bf94..dad3370dae60 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java @@ -23,7 +23,6 @@ import com.google.api.gax.batching.Batcher; import com.google.api.gax.batching.BatchingException; import com.google.api.gax.grpc.GrpcCallContext; -import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.ResponseObserver; import com.google.api.gax.rpc.StreamController; @@ -48,8 +47,6 @@ import com.google.cloud.bigtable.data.v2.models.RowMutation; import com.google.cloud.bigtable.data.v2.models.RowMutationEntry; import com.google.protobuf.ByteString; -import io.grpc.CallOptions; -import io.grpc.Deadline; import io.grpc.StatusRuntimeException; import java.io.IOException; import java.util.ArrayDeque; @@ -108,9 +105,6 @@ class BigtableServiceImpl implements BigtableService { BigtableServiceImpl(BigtableDataSettings settings) throws IOException { this.projectId = settings.getProjectId(); this.instanceId = settings.getInstanceId(); - RetrySettings retry = settings.getStubSettings().readRowsSettings().getRetrySettings(); - this.readAttemptTimeout = Duration.millis(retry.getInitialRpcTimeout().toMillis()); - this.readOperationTimeout = Duration.millis(retry.getTotalTimeout().toMillis()); this.client = BigtableDataClient.create(settings); LOG.info("Started Bigtable service with settings {}", settings); } @@ -119,10 +113,6 @@ class BigtableServiceImpl implements BigtableService { private final String projectId; private final String instanceId; - private final Duration readAttemptTimeout; - - private final Duration readOperationTimeout; - @Override public BigtableWriterImpl openForWriting(BigtableWriteOptions writeOptions) { return new BigtableWriterImpl( @@ -145,9 +135,6 @@ static class BigtableReaderImpl implements Reader { private final RowFilter rowFilter; private Iterator results; - private final Duration attemptTimeout; - private final Duration operationTimeout; - private Row currentRow; @VisibleForTesting @@ -157,18 +144,13 @@ static class BigtableReaderImpl implements Reader { String instanceId, String tableId, List ranges, - @Nullable RowFilter rowFilter, - Duration attemptTimeout, - Duration operationTimeout) { + @Nullable RowFilter rowFilter) { this.client = client; this.projectId = projectId; this.instanceId = instanceId; this.tableId = tableId; this.ranges = ranges; this.rowFilter = rowFilter; - - this.attemptTimeout = attemptTimeout; - this.operationTimeout = operationTimeout; } @Override @@ -189,7 +171,7 @@ public boolean start() throws IOException { results = client .readRowsCallable(new BigtableRowProtoAdapter()) - .call(query, createScanCallContext(attemptTimeout, operationTimeout)) + .call(query, GrpcCallContext.createDefault()) .iterator(); serviceCallMetric.call("ok"); } catch (StatusRuntimeException e) { @@ -215,16 +197,6 @@ public Row getCurrentRow() throws NoSuchElementException { } return currentRow; } - - @Override - public Duration getAttemptTimeout() { - return attemptTimeout; - } - - @Override - public Duration getOperationTimeout() { - return operationTimeout; - } } @VisibleForTesting @@ -238,8 +210,6 @@ static class BigtableSegmentReaderImpl implements Reader { private final int refillSegmentWaterMark; private final long maxSegmentByteSize; private ServiceCallMetric serviceCallMetric; - private final Duration attemptTimeout; - private final Duration operationTimeout; private static class UpstreamResults { private final List rows; @@ -258,9 +228,7 @@ static BigtableSegmentReaderImpl create( String tableId, List ranges, @Nullable RowFilter rowFilter, - int maxBufferedElementCount, - Duration attemptTimeout, - Duration operationTimeout) { + int maxBufferedElementCount) { RowSet.Builder rowSetBuilder = RowSet.newBuilder(); if (ranges.isEmpty()) { @@ -292,8 +260,6 @@ static BigtableSegmentReaderImpl create( filter, maxBufferedElementCount, maxSegmentByteSize, - attemptTimeout, - operationTimeout, createCallMetric(projectId, instanceId, tableId)); } @@ -307,8 +273,6 @@ static BigtableSegmentReaderImpl create( @Nullable RowFilter filter, int maxRowsInBuffer, long maxSegmentByteSize, - Duration attemptTimeout, - Duration operationTimeout, ServiceCallMetric serviceCallMetric) { if (rowSet.equals(rowSet.getDefaultInstanceForType())) { rowSet = RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance()).build(); @@ -329,8 +293,6 @@ static BigtableSegmentReaderImpl create( // Asynchronously refill buffer when there is 10% of the elements are left this.refillSegmentWaterMark = Math.max(1, (int) (request.getRowsLimit() * WATERMARK_PERCENTAGE)); - this.attemptTimeout = attemptTimeout; - this.operationTimeout = operationTimeout; } @Override @@ -426,7 +388,7 @@ public void onComplete() { future.set(new UpstreamResults(rows, nextNextRequest)); } }, - createScanCallContext(attemptTimeout, operationTimeout)); + GrpcCallContext.createDefault()); return future; } @@ -481,16 +443,6 @@ public Row getCurrentRow() throws NoSuchElementException { } return currentRow; } - - @Override - public Duration getAttemptTimeout() { - return attemptTimeout; - } - - @Override - public Duration getOperationTimeout() { - return operationTimeout; - } } @VisibleForTesting @@ -660,9 +612,7 @@ public Reader createReader(BigtableSource source) throws IOException { source.getTableId().get(), source.getRanges(), source.getRowFilter(), - source.getMaxBufferElementCount(), - readAttemptTimeout, - readOperationTimeout); + source.getMaxBufferElementCount()); } else { return new BigtableReaderImpl( client, @@ -670,26 +620,10 @@ public Reader createReader(BigtableSource source) throws IOException { instanceId, source.getTableId().get(), source.getRanges(), - source.getRowFilter(), - readAttemptTimeout, - readOperationTimeout); + source.getRowFilter()); } } - // - per attempt deadlines - veneer doesn't implement deadlines for attempts. To workaround this, - // the timeouts are set per call in the ApiCallContext. However this creates a separate issue of - // over running the operation deadline, so gRPC deadline is also set. - private static GrpcCallContext createScanCallContext( - Duration attemptTimeout, Duration operationTimeout) { - GrpcCallContext ctx = GrpcCallContext.createDefault(); - - ctx.withCallOptions( - CallOptions.DEFAULT.withDeadline( - Deadline.after(operationTimeout.getMillis(), TimeUnit.MILLISECONDS))); - ctx.withTimeout(org.threeten.bp.Duration.ofMillis(attemptTimeout.getMillis())); - return ctx; - } - @Override public List getSampleRowKeys(BigtableSource source) { return client.sampleRowKeys(source.getTableId().get()); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java index bffca8652089..5efef3212931 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java @@ -1846,16 +1846,6 @@ public Row getCurrentRow() { } return currentRow; } - - @Override - public Duration getAttemptTimeout() { - return Duration.millis(100); - } - - @Override - public Duration getOperationTimeout() { - return Duration.millis(1000); - } } /** A {@link FakeBigtableReader} implementation that throw exceptions at given stage. */ diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java index 9a07f625ca1c..ef3b3591f78c 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java @@ -29,7 +29,6 @@ import com.google.api.core.ApiFuture; import com.google.api.core.SettableApiFuture; import com.google.api.gax.batching.Batcher; -import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.rpc.ApiCallContext; import com.google.api.gax.rpc.ResponseObserver; import com.google.api.gax.rpc.ServerStream; @@ -171,8 +170,6 @@ public void testRead() throws IOException { // Set up client to return callable when(mockBigtableDataClient.readRowsCallable(any(RowAdapter.class))).thenReturn(callable); - RetrySettings retrySettings = - bigtableDataSettings.getStubSettings().bulkReadRowsSettings().getRetrySettings(); BigtableService.Reader underTest = new BigtableServiceImpl.BigtableReaderImpl( mockBigtableDataClient, @@ -180,9 +177,7 @@ public void testRead() throws IOException { bigtableDataSettings.getInstanceId(), mockBigtableSource.getTableId().get(), mockBigtableSource.getRanges(), - null, - Duration.millis(retrySettings.getInitialRpcTimeout().toMillis()), - Duration.millis(retrySettings.getTotalTimeout().toMillis())); + null); underTest.start(); Assert.assertEquals(expectedRow, underTest.getCurrentRow()); @@ -230,8 +225,6 @@ public Void answer(InvocationOnMock invocation) throws Throwable { when(mockBigtableDataClient.readRowsCallable(any(RowAdapter.class))).thenReturn(callable); when(mockBigtableSource.getTableId()).thenReturn(StaticValueProvider.of(TABLE_ID)); - RetrySettings retrySettings = - bigtableDataSettings.getStubSettings().bulkReadRowsSettings().getRetrySettings(); BigtableService.Reader underTest = new BigtableServiceImpl.BigtableSegmentReaderImpl( mockBigtableDataClient, @@ -242,8 +235,6 @@ public Void answer(InvocationOnMock invocation) throws Throwable { RowFilter.getDefaultInstance(), SEGMENT_SIZE, DEFAULT_BYTE_SEGMENT_SIZE, - Duration.millis(retrySettings.getInitialRpcTimeout().toMillis()), - Duration.millis(retrySettings.getTotalTimeout().toMillis()), mockCallMetric); underTest.start(); @@ -287,8 +278,6 @@ public void testReadSingleRangeAboveSegmentLimit() throws IOException { // Set up client to return callable when(mockBigtableDataClient.readRowsCallable(any(RowAdapter.class))).thenReturn(callable); - RetrySettings retrySettings = - bigtableDataSettings.getStubSettings().bulkReadRowsSettings().getRetrySettings(); BigtableService.Reader underTest = new BigtableServiceImpl.BigtableSegmentReaderImpl( mockBigtableDataClient, @@ -299,8 +288,6 @@ public void testReadSingleRangeAboveSegmentLimit() throws IOException { RowFilter.getDefaultInstance(), SEGMENT_SIZE, DEFAULT_BYTE_SEGMENT_SIZE, - Duration.millis(retrySettings.getInitialRpcTimeout().toMillis()), - Duration.millis(retrySettings.getTotalTimeout().toMillis()), mockCallMetric); List actualResults = new ArrayList<>(); @@ -357,8 +344,6 @@ public void testReadMultipleRanges() throws IOException { // Set up client to return callable when(mockBigtableDataClient.readRowsCallable(any(RowAdapter.class))).thenReturn(callable); - RetrySettings retrySettings = - bigtableDataSettings.getStubSettings().bulkReadRowsSettings().getRetrySettings(); BigtableService.Reader underTest = new BigtableServiceImpl.BigtableSegmentReaderImpl( mockBigtableDataClient, @@ -369,8 +354,6 @@ public void testReadMultipleRanges() throws IOException { RowFilter.getDefaultInstance(), SEGMENT_SIZE, DEFAULT_BYTE_SEGMENT_SIZE, - Duration.millis(retrySettings.getInitialRpcTimeout().toMillis()), - Duration.millis(retrySettings.getTotalTimeout().toMillis()), mockCallMetric); List actualResults = new ArrayList<>(); @@ -428,8 +411,6 @@ public void testReadMultipleRangesOverlappingKeys() throws IOException { // Set up client to return callable when(mockBigtableDataClient.readRowsCallable(any(RowAdapter.class))).thenReturn(callable); - RetrySettings retrySettings = - bigtableDataSettings.getStubSettings().bulkReadRowsSettings().getRetrySettings(); BigtableService.Reader underTest = new BigtableServiceImpl.BigtableSegmentReaderImpl( mockBigtableDataClient, @@ -440,8 +421,6 @@ public void testReadMultipleRangesOverlappingKeys() throws IOException { RowFilter.getDefaultInstance(), SEGMENT_SIZE, DEFAULT_BYTE_SEGMENT_SIZE, - Duration.millis(retrySettings.getInitialRpcTimeout().toMillis()), - Duration.millis(retrySettings.getTotalTimeout().toMillis()), mockCallMetric); List actualResults = new ArrayList<>(); @@ -484,8 +463,6 @@ public void testReadFullTableScan() throws IOException { // Set up client to return callable when(mockBigtableDataClient.readRowsCallable(any(RowAdapter.class))).thenReturn(callable); - RetrySettings retrySettings = - bigtableDataSettings.getStubSettings().bulkReadRowsSettings().getRetrySettings(); BigtableService.Reader underTest = new BigtableServiceImpl.BigtableSegmentReaderImpl( mockBigtableDataClient, @@ -496,8 +473,6 @@ public void testReadFullTableScan() throws IOException { RowFilter.getDefaultInstance(), SEGMENT_SIZE, DEFAULT_BYTE_SEGMENT_SIZE, - Duration.millis(retrySettings.getInitialRpcTimeout().toMillis()), - Duration.millis(retrySettings.getTotalTimeout().toMillis()), mockCallMetric); List actualResults = new ArrayList<>(); @@ -555,8 +530,6 @@ public void testReadFillBuffer() throws IOException { // Set up client to return callable when(mockBigtableDataClient.readRowsCallable(any(RowAdapter.class))).thenReturn(callable); - RetrySettings retrySettings = - bigtableDataSettings.getStubSettings().bulkReadRowsSettings().getRetrySettings(); BigtableService.Reader underTest = new BigtableServiceImpl.BigtableSegmentReaderImpl( mockBigtableDataClient, @@ -567,8 +540,6 @@ public void testReadFillBuffer() throws IOException { RowFilter.getDefaultInstance(), SEGMENT_SIZE, DEFAULT_BYTE_SEGMENT_SIZE, - Duration.millis(retrySettings.getInitialRpcTimeout().toMillis()), - Duration.millis(retrySettings.getTotalTimeout().toMillis()), mockCallMetric); List actualResults = new ArrayList<>(); @@ -624,8 +595,6 @@ public void testRefillOnLowWatermark() throws IOException { // Set up client to return callable when(mockBigtableDataClient.readRowsCallable(any(RowAdapter.class))).thenReturn(callable); - RetrySettings retrySettings = - bigtableDataSettings.getStubSettings().bulkReadRowsSettings().getRetrySettings(); BigtableService.Reader underTest = new BigtableServiceImpl.BigtableSegmentReaderImpl( mockBigtableDataClient, @@ -636,8 +605,6 @@ public void testRefillOnLowWatermark() throws IOException { RowFilter.getDefaultInstance(), segmentSize, DEFAULT_BYTE_SEGMENT_SIZE, - Duration.millis(retrySettings.getInitialRpcTimeout().toMillis()), - Duration.millis(retrySettings.getTotalTimeout().toMillis()), mockCallMetric); Assert.assertTrue(underTest.start()); @@ -707,8 +674,6 @@ public Void answer(InvocationOnMock invocation) throws Throwable { // Set up client to return callable when(mockBigtableDataClient.readRowsCallable(any(RowAdapter.class))).thenReturn(callable); - RetrySettings retrySettings = - bigtableDataSettings.getStubSettings().bulkReadRowsSettings().getRetrySettings(); BigtableService.Reader underTest = new BigtableServiceImpl.BigtableSegmentReaderImpl( mockBigtableDataClient, @@ -719,8 +684,6 @@ public Void answer(InvocationOnMock invocation) throws Throwable { RowFilter.getDefaultInstance(), SEGMENT_SIZE, segmentByteLimit, - Duration.millis(retrySettings.getInitialRpcTimeout().toMillis()), - Duration.millis(retrySettings.getTotalTimeout().toMillis()), mockCallMetric); List actualResults = new ArrayList<>(); @@ -773,8 +736,6 @@ public void run() { mockStub.createReadRowsCallable(new BigtableServiceImpl.BigtableRowProtoAdapter()); // Set up client to return callable when(mockBigtableDataClient.readRowsCallable(any(RowAdapter.class))).thenReturn(callable); - RetrySettings retrySettings = - bigtableDataSettings.getStubSettings().bulkReadRowsSettings().getRetrySettings(); BigtableService.Reader underTest = new BigtableServiceImpl.BigtableSegmentReaderImpl( mockBigtableDataClient, @@ -785,8 +746,6 @@ public void run() { RowFilter.getDefaultInstance(), SEGMENT_SIZE, DEFAULT_BYTE_SEGMENT_SIZE, - Duration.millis(retrySettings.getInitialRpcTimeout().toMillis()), - Duration.millis(retrySettings.getTotalTimeout().toMillis()), mockCallMetric); IOException returnedError = null;