Skip to content

Commit

Permalink
feat: Copy preview Change Streams API to java client
Browse files Browse the repository at this point in the history
  • Loading branch information
tonytanger committed Jul 12, 2022
1 parent 88bf067 commit f706695
Show file tree
Hide file tree
Showing 25 changed files with 16,943 additions and 139 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ If you are using Maven with [BOM][libraries-bom], add this to your pom.xml file
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>libraries-bom</artifactId>
<version>26.0.0</version>
<version>25.4.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,16 @@
import com.google.api.gax.rpc.UnaryCallable;
import com.google.bigtable.v2.CheckAndMutateRowRequest;
import com.google.bigtable.v2.CheckAndMutateRowResponse;
import com.google.bigtable.v2.ListChangeStreamPartitionsRequest;
import com.google.bigtable.v2.ListChangeStreamPartitionsResponse;
import com.google.bigtable.v2.MutateRowRequest;
import com.google.bigtable.v2.MutateRowResponse;
import com.google.bigtable.v2.MutateRowsRequest;
import com.google.bigtable.v2.MutateRowsResponse;
import com.google.bigtable.v2.PingAndWarmRequest;
import com.google.bigtable.v2.PingAndWarmResponse;
import com.google.bigtable.v2.ReadChangeStreamRequest;
import com.google.bigtable.v2.ReadChangeStreamResponse;
import com.google.bigtable.v2.ReadModifyWriteRowRequest;
import com.google.bigtable.v2.ReadModifyWriteRowResponse;
import com.google.bigtable.v2.ReadRowsRequest;
Expand Down Expand Up @@ -73,6 +77,18 @@ public UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> pingAndWarmCallabl
throw new UnsupportedOperationException("Not implemented: readModifyWriteRowCallable()");
}

public ServerStreamingCallable<
ListChangeStreamPartitionsRequest, ListChangeStreamPartitionsResponse>
listChangeStreamPartitionsCallable() {
throw new UnsupportedOperationException(
"Not implemented: listChangeStreamPartitionsCallable()");
}

public ServerStreamingCallable<ReadChangeStreamRequest, ReadChangeStreamResponse>
readChangeStreamCallable() {
throw new UnsupportedOperationException("Not implemented: readChangeStreamCallable()");
}

@Override
public abstract void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,16 @@
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.bigtable.v2.CheckAndMutateRowRequest;
import com.google.bigtable.v2.CheckAndMutateRowResponse;
import com.google.bigtable.v2.ListChangeStreamPartitionsRequest;
import com.google.bigtable.v2.ListChangeStreamPartitionsResponse;
import com.google.bigtable.v2.MutateRowRequest;
import com.google.bigtable.v2.MutateRowResponse;
import com.google.bigtable.v2.MutateRowsRequest;
import com.google.bigtable.v2.MutateRowsResponse;
import com.google.bigtable.v2.PingAndWarmRequest;
import com.google.bigtable.v2.PingAndWarmResponse;
import com.google.bigtable.v2.ReadChangeStreamRequest;
import com.google.bigtable.v2.ReadChangeStreamResponse;
import com.google.bigtable.v2.ReadModifyWriteRowRequest;
import com.google.bigtable.v2.ReadModifyWriteRowResponse;
import com.google.bigtable.v2.ReadRowsRequest;
Expand Down Expand Up @@ -83,6 +87,11 @@ public class BigtableStubSettings extends StubSettings<BigtableStubSettings> {
private final UnaryCallSettings<PingAndWarmRequest, PingAndWarmResponse> pingAndWarmSettings;
private final UnaryCallSettings<ReadModifyWriteRowRequest, ReadModifyWriteRowResponse>
readModifyWriteRowSettings;
private final ServerStreamingCallSettings<
ListChangeStreamPartitionsRequest, ListChangeStreamPartitionsResponse>
listChangeStreamPartitionsSettings;
private final ServerStreamingCallSettings<ReadChangeStreamRequest, ReadChangeStreamResponse>
readChangeStreamSettings;

/** Returns the object with the settings used for calls to readRows. */
public ServerStreamingCallSettings<ReadRowsRequest, ReadRowsResponse> readRowsSettings() {
Expand Down Expand Up @@ -122,6 +131,19 @@ public UnaryCallSettings<PingAndWarmRequest, PingAndWarmResponse> pingAndWarmSet
return readModifyWriteRowSettings;
}

/** Returns the object with the settings used for calls to listChangeStreamPartitions. */
public ServerStreamingCallSettings<
ListChangeStreamPartitionsRequest, ListChangeStreamPartitionsResponse>
listChangeStreamPartitionsSettings() {
return listChangeStreamPartitionsSettings;
}

/** Returns the object with the settings used for calls to readChangeStream. */
public ServerStreamingCallSettings<ReadChangeStreamRequest, ReadChangeStreamResponse>
readChangeStreamSettings() {
return readChangeStreamSettings;
}

public BigtableStub createStub() throws IOException {
if (getTransportChannelProvider()
.getTransportName()
Expand Down Expand Up @@ -203,6 +225,9 @@ protected BigtableStubSettings(Builder settingsBuilder) throws IOException {
checkAndMutateRowSettings = settingsBuilder.checkAndMutateRowSettings().build();
pingAndWarmSettings = settingsBuilder.pingAndWarmSettings().build();
readModifyWriteRowSettings = settingsBuilder.readModifyWriteRowSettings().build();
listChangeStreamPartitionsSettings =
settingsBuilder.listChangeStreamPartitionsSettings().build();
readChangeStreamSettings = settingsBuilder.readChangeStreamSettings().build();
}

/** Builder for BigtableStubSettings. */
Expand All @@ -221,6 +246,12 @@ public static class Builder extends StubSettings.Builder<BigtableStubSettings, B
pingAndWarmSettings;
private final UnaryCallSettings.Builder<ReadModifyWriteRowRequest, ReadModifyWriteRowResponse>
readModifyWriteRowSettings;
private final ServerStreamingCallSettings.Builder<
ListChangeStreamPartitionsRequest, ListChangeStreamPartitionsResponse>
listChangeStreamPartitionsSettings;
private final ServerStreamingCallSettings.Builder<
ReadChangeStreamRequest, ReadChangeStreamResponse>
readChangeStreamSettings;
private static final ImmutableMap<String, ImmutableSet<StatusCode.Code>>
RETRYABLE_CODE_DEFINITIONS;

Expand All @@ -241,6 +272,10 @@ public static class Builder extends StubSettings.Builder<BigtableStubSettings, B
definitions.put(
"no_retry_0_codes", ImmutableSet.copyOf(Lists.<StatusCode.Code>newArrayList()));
definitions.put("no_retry_codes", ImmutableSet.copyOf(Lists.<StatusCode.Code>newArrayList()));
definitions.put(
"no_retry_5_codes", ImmutableSet.copyOf(Lists.<StatusCode.Code>newArrayList()));
definitions.put(
"no_retry_6_codes", ImmutableSet.copyOf(Lists.<StatusCode.Code>newArrayList()));
RETRYABLE_CODE_DEFINITIONS = definitions.build();
}

Expand Down Expand Up @@ -294,6 +329,22 @@ public static class Builder extends StubSettings.Builder<BigtableStubSettings, B
definitions.put("no_retry_0_params", settings);
settings = RetrySettings.newBuilder().setRpcTimeoutMultiplier(1.0).build();
definitions.put("no_retry_params", settings);
settings =
RetrySettings.newBuilder()
.setInitialRpcTimeout(Duration.ofMillis(60000L))
.setRpcTimeoutMultiplier(1.0)
.setMaxRpcTimeout(Duration.ofMillis(60000L))
.setTotalTimeout(Duration.ofMillis(60000L))
.build();
definitions.put("no_retry_5_params", settings);
settings =
RetrySettings.newBuilder()
.setInitialRpcTimeout(Duration.ofMillis(43200000L))
.setRpcTimeoutMultiplier(1.0)
.setMaxRpcTimeout(Duration.ofMillis(43200000L))
.setTotalTimeout(Duration.ofMillis(43200000L))
.build();
definitions.put("no_retry_6_params", settings);
RETRY_PARAM_DEFINITIONS = definitions.build();
}

Expand All @@ -311,6 +362,8 @@ protected Builder(ClientContext clientContext) {
checkAndMutateRowSettings = UnaryCallSettings.newUnaryCallSettingsBuilder();
pingAndWarmSettings = UnaryCallSettings.newUnaryCallSettingsBuilder();
readModifyWriteRowSettings = UnaryCallSettings.newUnaryCallSettingsBuilder();
listChangeStreamPartitionsSettings = ServerStreamingCallSettings.newBuilder();
readChangeStreamSettings = ServerStreamingCallSettings.newBuilder();

unaryMethodSettingsBuilders =
ImmutableList.<UnaryCallSettings.Builder<?, ?>>of(
Expand All @@ -331,6 +384,8 @@ protected Builder(BigtableStubSettings settings) {
checkAndMutateRowSettings = settings.checkAndMutateRowSettings.toBuilder();
pingAndWarmSettings = settings.pingAndWarmSettings.toBuilder();
readModifyWriteRowSettings = settings.readModifyWriteRowSettings.toBuilder();
listChangeStreamPartitionsSettings = settings.listChangeStreamPartitionsSettings.toBuilder();
readChangeStreamSettings = settings.readChangeStreamSettings.toBuilder();

unaryMethodSettingsBuilders =
ImmutableList.<UnaryCallSettings.Builder<?, ?>>of(
Expand Down Expand Up @@ -389,6 +444,16 @@ private static Builder initDefaults(Builder builder) {
.setRetryableCodes(RETRYABLE_CODE_DEFINITIONS.get("no_retry_0_codes"))
.setRetrySettings(RETRY_PARAM_DEFINITIONS.get("no_retry_0_params"));

builder
.listChangeStreamPartitionsSettings()
.setRetryableCodes(RETRYABLE_CODE_DEFINITIONS.get("no_retry_5_codes"))
.setRetrySettings(RETRY_PARAM_DEFINITIONS.get("no_retry_5_params"));

builder
.readChangeStreamSettings()
.setRetryableCodes(RETRYABLE_CODE_DEFINITIONS.get("no_retry_6_codes"))
.setRetrySettings(RETRY_PARAM_DEFINITIONS.get("no_retry_6_params"));

return builder;
}

Expand Down Expand Up @@ -448,6 +513,19 @@ public UnaryCallSettings.Builder<MutateRowRequest, MutateRowResponse> mutateRowS
return readModifyWriteRowSettings;
}

/** Returns the builder for the settings used for calls to listChangeStreamPartitions. */
public ServerStreamingCallSettings.Builder<
ListChangeStreamPartitionsRequest, ListChangeStreamPartitionsResponse>
listChangeStreamPartitionsSettings() {
return listChangeStreamPartitionsSettings;
}

/** Returns the builder for the settings used for calls to readChangeStream. */
public ServerStreamingCallSettings.Builder<ReadChangeStreamRequest, ReadChangeStreamResponse>
readChangeStreamSettings() {
return readChangeStreamSettings;
}

@Override
public BigtableStubSettings build() throws IOException {
return new BigtableStubSettings(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,23 @@
import com.google.api.pathtemplate.PathTemplate;
import com.google.bigtable.v2.CheckAndMutateRowRequest;
import com.google.bigtable.v2.CheckAndMutateRowResponse;
import com.google.bigtable.v2.ListChangeStreamPartitionsRequest;
import com.google.bigtable.v2.ListChangeStreamPartitionsResponse;
import com.google.bigtable.v2.MutateRowRequest;
import com.google.bigtable.v2.MutateRowResponse;
import com.google.bigtable.v2.MutateRowsRequest;
import com.google.bigtable.v2.MutateRowsResponse;
import com.google.bigtable.v2.PingAndWarmRequest;
import com.google.bigtable.v2.PingAndWarmResponse;
import com.google.bigtable.v2.ReadChangeStreamRequest;
import com.google.bigtable.v2.ReadChangeStreamResponse;
import com.google.bigtable.v2.ReadModifyWriteRowRequest;
import com.google.bigtable.v2.ReadModifyWriteRowResponse;
import com.google.bigtable.v2.ReadRowsRequest;
import com.google.bigtable.v2.ReadRowsResponse;
import com.google.bigtable.v2.SampleRowKeysRequest;
import com.google.bigtable.v2.SampleRowKeysResponse;
import com.google.common.collect.ImmutableMap;
import com.google.longrunning.stub.GrpcOperationsStub;
import io.grpc.MethodDescriptor;
import io.grpc.protobuf.ProtoUtils;
Expand Down Expand Up @@ -122,6 +127,30 @@ public class GrpcBigtableStub extends BigtableStub {
ProtoUtils.marshaller(ReadModifyWriteRowResponse.getDefaultInstance()))
.build();

private static final MethodDescriptor<
ListChangeStreamPartitionsRequest, ListChangeStreamPartitionsResponse>
listChangeStreamPartitionsMethodDescriptor =
MethodDescriptor
.<ListChangeStreamPartitionsRequest, ListChangeStreamPartitionsResponse>newBuilder()
.setType(MethodDescriptor.MethodType.SERVER_STREAMING)
.setFullMethodName("google.bigtable.v2.Bigtable/ListChangeStreamPartitions")
.setRequestMarshaller(
ProtoUtils.marshaller(ListChangeStreamPartitionsRequest.getDefaultInstance()))
.setResponseMarshaller(
ProtoUtils.marshaller(ListChangeStreamPartitionsResponse.getDefaultInstance()))
.build();

private static final MethodDescriptor<ReadChangeStreamRequest, ReadChangeStreamResponse>
readChangeStreamMethodDescriptor =
MethodDescriptor.<ReadChangeStreamRequest, ReadChangeStreamResponse>newBuilder()
.setType(MethodDescriptor.MethodType.SERVER_STREAMING)
.setFullMethodName("google.bigtable.v2.Bigtable/ReadChangeStream")
.setRequestMarshaller(
ProtoUtils.marshaller(ReadChangeStreamRequest.getDefaultInstance()))
.setResponseMarshaller(
ProtoUtils.marshaller(ReadChangeStreamResponse.getDefaultInstance()))
.build();

private final ServerStreamingCallable<ReadRowsRequest, ReadRowsResponse> readRowsCallable;
private final ServerStreamingCallable<SampleRowKeysRequest, SampleRowKeysResponse>
sampleRowKeysCallable;
Expand All @@ -132,6 +161,11 @@ public class GrpcBigtableStub extends BigtableStub {
private final UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> pingAndWarmCallable;
private final UnaryCallable<ReadModifyWriteRowRequest, ReadModifyWriteRowResponse>
readModifyWriteRowCallable;
private final ServerStreamingCallable<
ListChangeStreamPartitionsRequest, ListChangeStreamPartitionsResponse>
listChangeStreamPartitionsCallable;
private final ServerStreamingCallable<ReadChangeStreamRequest, ReadChangeStreamResponse>
readChangeStreamCallable;

private final BackgroundResource backgroundResources;
private final GrpcOperationsStub operationsStub;
Expand Down Expand Up @@ -298,6 +332,29 @@ protected GrpcBigtableStub(
return builder.build();
})
.build();
GrpcCallSettings<ListChangeStreamPartitionsRequest, ListChangeStreamPartitionsResponse>
listChangeStreamPartitionsTransportSettings =
GrpcCallSettings
.<ListChangeStreamPartitionsRequest, ListChangeStreamPartitionsResponse>newBuilder()
.setMethodDescriptor(listChangeStreamPartitionsMethodDescriptor)
.setParamsExtractor(
request -> {
ImmutableMap.Builder<String, String> params = ImmutableMap.builder();
params.put("table_name", String.valueOf(request.getTableName()));
return params.build();
})
.build();
GrpcCallSettings<ReadChangeStreamRequest, ReadChangeStreamResponse>
readChangeStreamTransportSettings =
GrpcCallSettings.<ReadChangeStreamRequest, ReadChangeStreamResponse>newBuilder()
.setMethodDescriptor(readChangeStreamMethodDescriptor)
.setParamsExtractor(
request -> {
ImmutableMap.Builder<String, String> params = ImmutableMap.builder();
params.put("table_name", String.valueOf(request.getTableName()));
return params.build();
})
.build();

this.readRowsCallable =
callableFactory.createServerStreamingCallable(
Expand All @@ -324,6 +381,14 @@ protected GrpcBigtableStub(
readModifyWriteRowTransportSettings,
settings.readModifyWriteRowSettings(),
clientContext);
this.listChangeStreamPartitionsCallable =
callableFactory.createServerStreamingCallable(
listChangeStreamPartitionsTransportSettings,
settings.listChangeStreamPartitionsSettings(),
clientContext);
this.readChangeStreamCallable =
callableFactory.createServerStreamingCallable(
readChangeStreamTransportSettings, settings.readChangeStreamSettings(), clientContext);

this.backgroundResources =
new BackgroundResourceAggregation(clientContext.getBackgroundResources());
Expand Down Expand Up @@ -371,6 +436,19 @@ public UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> pingAndWarmCallabl
return readModifyWriteRowCallable;
}

@Override
public ServerStreamingCallable<
ListChangeStreamPartitionsRequest, ListChangeStreamPartitionsResponse>
listChangeStreamPartitionsCallable() {
return listChangeStreamPartitionsCallable;
}

@Override
public ServerStreamingCallable<ReadChangeStreamRequest, ReadChangeStreamResponse>
readChangeStreamCallable() {
return readChangeStreamCallable;
}

@Override
public final void close() {
try {
Expand Down
Loading

0 comments on commit f706695

Please sign in to comment.