Skip to content

Commit

Permalink
Clean up stale code in BigtableService (#30172)
Browse files Browse the repository at this point in the history
  • Loading branch information
mutianf authored May 26, 2024
1 parent 836e77e commit 5c30b1d
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 131 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.concurrent.CompletionStage;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.BigtableSource;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

/** An interface for real or fake implementations of Cloud Bigtable. */
interface BigtableService extends Serializable {
Expand Down Expand Up @@ -76,12 +75,6 @@ interface Reader {
* current row because the last such call was unsuccessful.
*/
Row getCurrentRow() throws NoSuchElementException;

// Workaround for ReadRows requests which requires to pass the timeouts in
// ApiContext. Can be removed later once it's fixed in Veneer.
Duration getAttemptTimeout();

Duration getOperationTimeout();
}

/** Returns a {@link Reader} that will read from the specified source. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.google.api.gax.batching.Batcher;
import com.google.api.gax.batching.BatchingException;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.StreamController;
Expand All @@ -48,8 +47,6 @@
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
import com.google.protobuf.ByteString;
import io.grpc.CallOptions;
import io.grpc.Deadline;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.util.ArrayDeque;
Expand Down Expand Up @@ -108,9 +105,6 @@ class BigtableServiceImpl implements BigtableService {
BigtableServiceImpl(BigtableDataSettings settings) throws IOException {
this.projectId = settings.getProjectId();
this.instanceId = settings.getInstanceId();
RetrySettings retry = settings.getStubSettings().readRowsSettings().getRetrySettings();
this.readAttemptTimeout = Duration.millis(retry.getInitialRpcTimeout().toMillis());
this.readOperationTimeout = Duration.millis(retry.getTotalTimeout().toMillis());
this.client = BigtableDataClient.create(settings);
LOG.info("Started Bigtable service with settings {}", settings);
}
Expand All @@ -119,10 +113,6 @@ class BigtableServiceImpl implements BigtableService {
private final String projectId;
private final String instanceId;

private final Duration readAttemptTimeout;

private final Duration readOperationTimeout;

@Override
public BigtableWriterImpl openForWriting(BigtableWriteOptions writeOptions) {
return new BigtableWriterImpl(
Expand All @@ -145,9 +135,6 @@ static class BigtableReaderImpl implements Reader {
private final RowFilter rowFilter;
private Iterator<Row> results;

private final Duration attemptTimeout;
private final Duration operationTimeout;

private Row currentRow;

@VisibleForTesting
Expand All @@ -157,18 +144,13 @@ static class BigtableReaderImpl implements Reader {
String instanceId,
String tableId,
List<ByteKeyRange> ranges,
@Nullable RowFilter rowFilter,
Duration attemptTimeout,
Duration operationTimeout) {
@Nullable RowFilter rowFilter) {
this.client = client;
this.projectId = projectId;
this.instanceId = instanceId;
this.tableId = tableId;
this.ranges = ranges;
this.rowFilter = rowFilter;

this.attemptTimeout = attemptTimeout;
this.operationTimeout = operationTimeout;
}

@Override
Expand All @@ -189,7 +171,7 @@ public boolean start() throws IOException {
results =
client
.readRowsCallable(new BigtableRowProtoAdapter())
.call(query, createScanCallContext(attemptTimeout, operationTimeout))
.call(query, GrpcCallContext.createDefault())
.iterator();
serviceCallMetric.call("ok");
} catch (StatusRuntimeException e) {
Expand All @@ -215,16 +197,6 @@ public Row getCurrentRow() throws NoSuchElementException {
}
return currentRow;
}

@Override
public Duration getAttemptTimeout() {
return attemptTimeout;
}

@Override
public Duration getOperationTimeout() {
return operationTimeout;
}
}

@VisibleForTesting
Expand All @@ -238,8 +210,6 @@ static class BigtableSegmentReaderImpl implements Reader {
private final int refillSegmentWaterMark;
private final long maxSegmentByteSize;
private ServiceCallMetric serviceCallMetric;
private final Duration attemptTimeout;
private final Duration operationTimeout;

private static class UpstreamResults {
private final List<Row> rows;
Expand All @@ -258,9 +228,7 @@ static BigtableSegmentReaderImpl create(
String tableId,
List<ByteKeyRange> ranges,
@Nullable RowFilter rowFilter,
int maxBufferedElementCount,
Duration attemptTimeout,
Duration operationTimeout) {
int maxBufferedElementCount) {

RowSet.Builder rowSetBuilder = RowSet.newBuilder();
if (ranges.isEmpty()) {
Expand Down Expand Up @@ -292,8 +260,6 @@ static BigtableSegmentReaderImpl create(
filter,
maxBufferedElementCount,
maxSegmentByteSize,
attemptTimeout,
operationTimeout,
createCallMetric(projectId, instanceId, tableId));
}

Expand All @@ -307,8 +273,6 @@ static BigtableSegmentReaderImpl create(
@Nullable RowFilter filter,
int maxRowsInBuffer,
long maxSegmentByteSize,
Duration attemptTimeout,
Duration operationTimeout,
ServiceCallMetric serviceCallMetric) {
if (rowSet.equals(rowSet.getDefaultInstanceForType())) {
rowSet = RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance()).build();
Expand All @@ -329,8 +293,6 @@ static BigtableSegmentReaderImpl create(
// Asynchronously refill buffer when there is 10% of the elements are left
this.refillSegmentWaterMark =
Math.max(1, (int) (request.getRowsLimit() * WATERMARK_PERCENTAGE));
this.attemptTimeout = attemptTimeout;
this.operationTimeout = operationTimeout;
}

@Override
Expand Down Expand Up @@ -426,7 +388,7 @@ public void onComplete() {
future.set(new UpstreamResults(rows, nextNextRequest));
}
},
createScanCallContext(attemptTimeout, operationTimeout));
GrpcCallContext.createDefault());
return future;
}

Expand Down Expand Up @@ -481,16 +443,6 @@ public Row getCurrentRow() throws NoSuchElementException {
}
return currentRow;
}

@Override
public Duration getAttemptTimeout() {
return attemptTimeout;
}

@Override
public Duration getOperationTimeout() {
return operationTimeout;
}
}

@VisibleForTesting
Expand Down Expand Up @@ -660,36 +612,18 @@ public Reader createReader(BigtableSource source) throws IOException {
source.getTableId().get(),
source.getRanges(),
source.getRowFilter(),
source.getMaxBufferElementCount(),
readAttemptTimeout,
readOperationTimeout);
source.getMaxBufferElementCount());
} else {
return new BigtableReaderImpl(
client,
projectId,
instanceId,
source.getTableId().get(),
source.getRanges(),
source.getRowFilter(),
readAttemptTimeout,
readOperationTimeout);
source.getRowFilter());
}
}

// - per attempt deadlines - veneer doesn't implement deadlines for attempts. To workaround this,
// the timeouts are set per call in the ApiCallContext. However this creates a separate issue of
// over running the operation deadline, so gRPC deadline is also set.
private static GrpcCallContext createScanCallContext(
Duration attemptTimeout, Duration operationTimeout) {
GrpcCallContext ctx = GrpcCallContext.createDefault();

ctx.withCallOptions(
CallOptions.DEFAULT.withDeadline(
Deadline.after(operationTimeout.getMillis(), TimeUnit.MILLISECONDS)));
ctx.withTimeout(org.threeten.bp.Duration.ofMillis(attemptTimeout.getMillis()));
return ctx;
}

@Override
public List<KeyOffset> getSampleRowKeys(BigtableSource source) {
return client.sampleRowKeys(source.getTableId().get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1846,16 +1846,6 @@ public Row getCurrentRow() {
}
return currentRow;
}

@Override
public Duration getAttemptTimeout() {
return Duration.millis(100);
}

@Override
public Duration getOperationTimeout() {
return Duration.millis(1000);
}
}

/** A {@link FakeBigtableReader} implementation that throw exceptions at given stage. */
Expand Down
Loading

0 comments on commit 5c30b1d

Please sign in to comment.