Skip to content

Commit

Permalink
feat: copy preview Change Streams API (googleapis#1309)
Browse files Browse the repository at this point in the history
* chore(deps): update dependency com.google.cloud:libraries-bom to v26 (googleapis#1304)

* chore(deps): update dependency com.google.cloud:libraries-bom to v26

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>

* feat: Copy preview Change Streams API to java client

Co-authored-by: WhiteSource Renovate <[email protected]>
Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored and Teng Zhong committed Sep 2, 2022
1 parent b3cc7f0 commit d02f0e5
Show file tree
Hide file tree
Showing 22 changed files with 16,940 additions and 136 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,16 @@
import com.google.api.gax.rpc.UnaryCallable;
import com.google.bigtable.v2.CheckAndMutateRowRequest;
import com.google.bigtable.v2.CheckAndMutateRowResponse;
import com.google.bigtable.v2.ListChangeStreamPartitionsRequest;
import com.google.bigtable.v2.ListChangeStreamPartitionsResponse;
import com.google.bigtable.v2.MutateRowRequest;
import com.google.bigtable.v2.MutateRowResponse;
import com.google.bigtable.v2.MutateRowsRequest;
import com.google.bigtable.v2.MutateRowsResponse;
import com.google.bigtable.v2.PingAndWarmRequest;
import com.google.bigtable.v2.PingAndWarmResponse;
import com.google.bigtable.v2.ReadChangeStreamRequest;
import com.google.bigtable.v2.ReadChangeStreamResponse;
import com.google.bigtable.v2.ReadModifyWriteRowRequest;
import com.google.bigtable.v2.ReadModifyWriteRowResponse;
import com.google.bigtable.v2.ReadRowsRequest;
Expand Down Expand Up @@ -73,6 +77,18 @@ public UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> pingAndWarmCallabl
throw new UnsupportedOperationException("Not implemented: readModifyWriteRowCallable()");
}

public ServerStreamingCallable<
ListChangeStreamPartitionsRequest, ListChangeStreamPartitionsResponse>
listChangeStreamPartitionsCallable() {
throw new UnsupportedOperationException(
"Not implemented: listChangeStreamPartitionsCallable()");
}

public ServerStreamingCallable<ReadChangeStreamRequest, ReadChangeStreamResponse>
readChangeStreamCallable() {
throw new UnsupportedOperationException("Not implemented: readChangeStreamCallable()");
}

@Override
public abstract void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,16 @@
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.bigtable.v2.CheckAndMutateRowRequest;
import com.google.bigtable.v2.CheckAndMutateRowResponse;
import com.google.bigtable.v2.ListChangeStreamPartitionsRequest;
import com.google.bigtable.v2.ListChangeStreamPartitionsResponse;
import com.google.bigtable.v2.MutateRowRequest;
import com.google.bigtable.v2.MutateRowResponse;
import com.google.bigtable.v2.MutateRowsRequest;
import com.google.bigtable.v2.MutateRowsResponse;
import com.google.bigtable.v2.PingAndWarmRequest;
import com.google.bigtable.v2.PingAndWarmResponse;
import com.google.bigtable.v2.ReadChangeStreamRequest;
import com.google.bigtable.v2.ReadChangeStreamResponse;
import com.google.bigtable.v2.ReadModifyWriteRowRequest;
import com.google.bigtable.v2.ReadModifyWriteRowResponse;
import com.google.bigtable.v2.ReadRowsRequest;
Expand Down Expand Up @@ -83,6 +87,11 @@ public class BigtableStubSettings extends StubSettings<BigtableStubSettings> {
private final UnaryCallSettings<PingAndWarmRequest, PingAndWarmResponse> pingAndWarmSettings;
private final UnaryCallSettings<ReadModifyWriteRowRequest, ReadModifyWriteRowResponse>
readModifyWriteRowSettings;
private final ServerStreamingCallSettings<
ListChangeStreamPartitionsRequest, ListChangeStreamPartitionsResponse>
listChangeStreamPartitionsSettings;
private final ServerStreamingCallSettings<ReadChangeStreamRequest, ReadChangeStreamResponse>
readChangeStreamSettings;

/** Returns the object with the settings used for calls to readRows. */
public ServerStreamingCallSettings<ReadRowsRequest, ReadRowsResponse> readRowsSettings() {
Expand Down Expand Up @@ -122,6 +131,19 @@ public UnaryCallSettings<PingAndWarmRequest, PingAndWarmResponse> pingAndWarmSet
return readModifyWriteRowSettings;
}

/** Returns the object with the settings used for calls to listChangeStreamPartitions. */
public ServerStreamingCallSettings<
ListChangeStreamPartitionsRequest, ListChangeStreamPartitionsResponse>
listChangeStreamPartitionsSettings() {
return listChangeStreamPartitionsSettings;
}

/** Returns the object with the settings used for calls to readChangeStream. */
public ServerStreamingCallSettings<ReadChangeStreamRequest, ReadChangeStreamResponse>
readChangeStreamSettings() {
return readChangeStreamSettings;
}

public BigtableStub createStub() throws IOException {
if (getTransportChannelProvider()
.getTransportName()
Expand Down Expand Up @@ -203,6 +225,9 @@ protected BigtableStubSettings(Builder settingsBuilder) throws IOException {
checkAndMutateRowSettings = settingsBuilder.checkAndMutateRowSettings().build();
pingAndWarmSettings = settingsBuilder.pingAndWarmSettings().build();
readModifyWriteRowSettings = settingsBuilder.readModifyWriteRowSettings().build();
listChangeStreamPartitionsSettings =
settingsBuilder.listChangeStreamPartitionsSettings().build();
readChangeStreamSettings = settingsBuilder.readChangeStreamSettings().build();
}

/** Builder for BigtableStubSettings. */
Expand All @@ -221,6 +246,12 @@ public static class Builder extends StubSettings.Builder<BigtableStubSettings, B
pingAndWarmSettings;
private final UnaryCallSettings.Builder<ReadModifyWriteRowRequest, ReadModifyWriteRowResponse>
readModifyWriteRowSettings;
private final ServerStreamingCallSettings.Builder<
ListChangeStreamPartitionsRequest, ListChangeStreamPartitionsResponse>
listChangeStreamPartitionsSettings;
private final ServerStreamingCallSettings.Builder<
ReadChangeStreamRequest, ReadChangeStreamResponse>
readChangeStreamSettings;
private static final ImmutableMap<String, ImmutableSet<StatusCode.Code>>
RETRYABLE_CODE_DEFINITIONS;

Expand All @@ -241,6 +272,10 @@ public static class Builder extends StubSettings.Builder<BigtableStubSettings, B
definitions.put(
"no_retry_0_codes", ImmutableSet.copyOf(Lists.<StatusCode.Code>newArrayList()));
definitions.put("no_retry_codes", ImmutableSet.copyOf(Lists.<StatusCode.Code>newArrayList()));
definitions.put(
"no_retry_5_codes", ImmutableSet.copyOf(Lists.<StatusCode.Code>newArrayList()));
definitions.put(
"no_retry_6_codes", ImmutableSet.copyOf(Lists.<StatusCode.Code>newArrayList()));
RETRYABLE_CODE_DEFINITIONS = definitions.build();
}

Expand Down Expand Up @@ -294,6 +329,22 @@ public static class Builder extends StubSettings.Builder<BigtableStubSettings, B
definitions.put("no_retry_0_params", settings);
settings = RetrySettings.newBuilder().setRpcTimeoutMultiplier(1.0).build();
definitions.put("no_retry_params", settings);
settings =
RetrySettings.newBuilder()
.setInitialRpcTimeout(Duration.ofMillis(60000L))
.setRpcTimeoutMultiplier(1.0)
.setMaxRpcTimeout(Duration.ofMillis(60000L))
.setTotalTimeout(Duration.ofMillis(60000L))
.build();
definitions.put("no_retry_5_params", settings);
settings =
RetrySettings.newBuilder()
.setInitialRpcTimeout(Duration.ofMillis(43200000L))
.setRpcTimeoutMultiplier(1.0)
.setMaxRpcTimeout(Duration.ofMillis(43200000L))
.setTotalTimeout(Duration.ofMillis(43200000L))
.build();
definitions.put("no_retry_6_params", settings);
RETRY_PARAM_DEFINITIONS = definitions.build();
}

Expand All @@ -311,6 +362,8 @@ protected Builder(ClientContext clientContext) {
checkAndMutateRowSettings = UnaryCallSettings.newUnaryCallSettingsBuilder();
pingAndWarmSettings = UnaryCallSettings.newUnaryCallSettingsBuilder();
readModifyWriteRowSettings = UnaryCallSettings.newUnaryCallSettingsBuilder();
listChangeStreamPartitionsSettings = ServerStreamingCallSettings.newBuilder();
readChangeStreamSettings = ServerStreamingCallSettings.newBuilder();

unaryMethodSettingsBuilders =
ImmutableList.<UnaryCallSettings.Builder<?, ?>>of(
Expand All @@ -331,6 +384,8 @@ protected Builder(BigtableStubSettings settings) {
checkAndMutateRowSettings = settings.checkAndMutateRowSettings.toBuilder();
pingAndWarmSettings = settings.pingAndWarmSettings.toBuilder();
readModifyWriteRowSettings = settings.readModifyWriteRowSettings.toBuilder();
listChangeStreamPartitionsSettings = settings.listChangeStreamPartitionsSettings.toBuilder();
readChangeStreamSettings = settings.readChangeStreamSettings.toBuilder();

unaryMethodSettingsBuilders =
ImmutableList.<UnaryCallSettings.Builder<?, ?>>of(
Expand Down Expand Up @@ -389,6 +444,16 @@ private static Builder initDefaults(Builder builder) {
.setRetryableCodes(RETRYABLE_CODE_DEFINITIONS.get("no_retry_0_codes"))
.setRetrySettings(RETRY_PARAM_DEFINITIONS.get("no_retry_0_params"));

builder
.listChangeStreamPartitionsSettings()
.setRetryableCodes(RETRYABLE_CODE_DEFINITIONS.get("no_retry_5_codes"))
.setRetrySettings(RETRY_PARAM_DEFINITIONS.get("no_retry_5_params"));

builder
.readChangeStreamSettings()
.setRetryableCodes(RETRYABLE_CODE_DEFINITIONS.get("no_retry_6_codes"))
.setRetrySettings(RETRY_PARAM_DEFINITIONS.get("no_retry_6_params"));

return builder;
}

Expand Down Expand Up @@ -448,6 +513,19 @@ public UnaryCallSettings.Builder<MutateRowRequest, MutateRowResponse> mutateRowS
return readModifyWriteRowSettings;
}

/** Returns the builder for the settings used for calls to listChangeStreamPartitions. */
public ServerStreamingCallSettings.Builder<
ListChangeStreamPartitionsRequest, ListChangeStreamPartitionsResponse>
listChangeStreamPartitionsSettings() {
return listChangeStreamPartitionsSettings;
}

/** Returns the builder for the settings used for calls to readChangeStream. */
public ServerStreamingCallSettings.Builder<ReadChangeStreamRequest, ReadChangeStreamResponse>
readChangeStreamSettings() {
return readChangeStreamSettings;
}

@Override
public BigtableStubSettings build() throws IOException {
return new BigtableStubSettings(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,23 @@
import com.google.api.pathtemplate.PathTemplate;
import com.google.bigtable.v2.CheckAndMutateRowRequest;
import com.google.bigtable.v2.CheckAndMutateRowResponse;
import com.google.bigtable.v2.ListChangeStreamPartitionsRequest;
import com.google.bigtable.v2.ListChangeStreamPartitionsResponse;
import com.google.bigtable.v2.MutateRowRequest;
import com.google.bigtable.v2.MutateRowResponse;
import com.google.bigtable.v2.MutateRowsRequest;
import com.google.bigtable.v2.MutateRowsResponse;
import com.google.bigtable.v2.PingAndWarmRequest;
import com.google.bigtable.v2.PingAndWarmResponse;
import com.google.bigtable.v2.ReadChangeStreamRequest;
import com.google.bigtable.v2.ReadChangeStreamResponse;
import com.google.bigtable.v2.ReadModifyWriteRowRequest;
import com.google.bigtable.v2.ReadModifyWriteRowResponse;
import com.google.bigtable.v2.ReadRowsRequest;
import com.google.bigtable.v2.ReadRowsResponse;
import com.google.bigtable.v2.SampleRowKeysRequest;
import com.google.bigtable.v2.SampleRowKeysResponse;
import com.google.common.collect.ImmutableMap;
import com.google.longrunning.stub.GrpcOperationsStub;
import io.grpc.MethodDescriptor;
import io.grpc.protobuf.ProtoUtils;
Expand Down Expand Up @@ -122,6 +127,30 @@ public class GrpcBigtableStub extends BigtableStub {
ProtoUtils.marshaller(ReadModifyWriteRowResponse.getDefaultInstance()))
.build();

private static final MethodDescriptor<
ListChangeStreamPartitionsRequest, ListChangeStreamPartitionsResponse>
listChangeStreamPartitionsMethodDescriptor =
MethodDescriptor
.<ListChangeStreamPartitionsRequest, ListChangeStreamPartitionsResponse>newBuilder()
.setType(MethodDescriptor.MethodType.SERVER_STREAMING)
.setFullMethodName("google.bigtable.v2.Bigtable/ListChangeStreamPartitions")
.setRequestMarshaller(
ProtoUtils.marshaller(ListChangeStreamPartitionsRequest.getDefaultInstance()))
.setResponseMarshaller(
ProtoUtils.marshaller(ListChangeStreamPartitionsResponse.getDefaultInstance()))
.build();

private static final MethodDescriptor<ReadChangeStreamRequest, ReadChangeStreamResponse>
readChangeStreamMethodDescriptor =
MethodDescriptor.<ReadChangeStreamRequest, ReadChangeStreamResponse>newBuilder()
.setType(MethodDescriptor.MethodType.SERVER_STREAMING)
.setFullMethodName("google.bigtable.v2.Bigtable/ReadChangeStream")
.setRequestMarshaller(
ProtoUtils.marshaller(ReadChangeStreamRequest.getDefaultInstance()))
.setResponseMarshaller(
ProtoUtils.marshaller(ReadChangeStreamResponse.getDefaultInstance()))
.build();

private final ServerStreamingCallable<ReadRowsRequest, ReadRowsResponse> readRowsCallable;
private final ServerStreamingCallable<SampleRowKeysRequest, SampleRowKeysResponse>
sampleRowKeysCallable;
Expand All @@ -132,6 +161,11 @@ public class GrpcBigtableStub extends BigtableStub {
private final UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> pingAndWarmCallable;
private final UnaryCallable<ReadModifyWriteRowRequest, ReadModifyWriteRowResponse>
readModifyWriteRowCallable;
private final ServerStreamingCallable<
ListChangeStreamPartitionsRequest, ListChangeStreamPartitionsResponse>
listChangeStreamPartitionsCallable;
private final ServerStreamingCallable<ReadChangeStreamRequest, ReadChangeStreamResponse>
readChangeStreamCallable;

private final BackgroundResource backgroundResources;
private final GrpcOperationsStub operationsStub;
Expand Down Expand Up @@ -298,6 +332,29 @@ protected GrpcBigtableStub(
return builder.build();
})
.build();
GrpcCallSettings<ListChangeStreamPartitionsRequest, ListChangeStreamPartitionsResponse>
listChangeStreamPartitionsTransportSettings =
GrpcCallSettings
.<ListChangeStreamPartitionsRequest, ListChangeStreamPartitionsResponse>newBuilder()
.setMethodDescriptor(listChangeStreamPartitionsMethodDescriptor)
.setParamsExtractor(
request -> {
ImmutableMap.Builder<String, String> params = ImmutableMap.builder();
params.put("table_name", String.valueOf(request.getTableName()));
return params.build();
})
.build();
GrpcCallSettings<ReadChangeStreamRequest, ReadChangeStreamResponse>
readChangeStreamTransportSettings =
GrpcCallSettings.<ReadChangeStreamRequest, ReadChangeStreamResponse>newBuilder()
.setMethodDescriptor(readChangeStreamMethodDescriptor)
.setParamsExtractor(
request -> {
ImmutableMap.Builder<String, String> params = ImmutableMap.builder();
params.put("table_name", String.valueOf(request.getTableName()));
return params.build();
})
.build();

this.readRowsCallable =
callableFactory.createServerStreamingCallable(
Expand All @@ -324,6 +381,14 @@ protected GrpcBigtableStub(
readModifyWriteRowTransportSettings,
settings.readModifyWriteRowSettings(),
clientContext);
this.listChangeStreamPartitionsCallable =
callableFactory.createServerStreamingCallable(
listChangeStreamPartitionsTransportSettings,
settings.listChangeStreamPartitionsSettings(),
clientContext);
this.readChangeStreamCallable =
callableFactory.createServerStreamingCallable(
readChangeStreamTransportSettings, settings.readChangeStreamSettings(), clientContext);

this.backgroundResources =
new BackgroundResourceAggregation(clientContext.getBackgroundResources());
Expand Down Expand Up @@ -371,6 +436,19 @@ public UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> pingAndWarmCallabl
return readModifyWriteRowCallable;
}

@Override
public ServerStreamingCallable<
ListChangeStreamPartitionsRequest, ListChangeStreamPartitionsResponse>
listChangeStreamPartitionsCallable() {
return listChangeStreamPartitionsCallable;
}

@Override
public ServerStreamingCallable<ReadChangeStreamRequest, ReadChangeStreamResponse>
readChangeStreamCallable() {
return readChangeStreamCallable;
}

@Override
public final void close() {
try {
Expand Down
Loading

0 comments on commit d02f0e5

Please sign in to comment.