Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spanner Gapic Migration: fix updateDatabaseDdl #3403

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -489,10 +489,6 @@ public OperationFuture<Void, UpdateDatabaseDdlMetadata> updateDatabaseDdl(
throws SpannerException {
final String dbName = getDatabaseName(instanceId, databaseId);
final String opId = operationId != null ? operationId : randomOperationId();
// TODO(hzyi)
// Spanner checks the exception and if the error code is ALREADY_EXISTS
// it creates a new Operation instead of throwing the exception. This
// feature is not implemented in this PR but will come later
OperationFuture<Empty, UpdateDatabaseDdlMetadata> rawOperationFuture =
rpc.updateDatabaseDdl(dbName, statements, opId);
return new OperationFutureImpl(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;

import com.google.common.base.Preconditions;
import com.google.api.core.ApiFunction;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.GaxProperties;
Expand All @@ -27,15 +26,15 @@
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.rpc.AlreadyExistsException;
import com.google.api.gax.rpc.ApiClientHeaderProvider;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.api.gax.rpc.HeaderProvider;
import com.google.api.gax.rpc.ServerStream;
import com.google.api.gax.rpc.OperationCallable;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.StreamController;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.StreamController;
import com.google.api.pathtemplate.PathTemplate;
import com.google.cloud.ServiceOptions;
import com.google.cloud.grpc.GrpcTransportOptions;
Expand All @@ -53,6 +52,7 @@
import com.google.cloud.spanner.v1.stub.SpannerStub;
import com.google.cloud.spanner.v1.stub.SpannerStubSettings;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.longrunning.GetOperationRequest;
import com.google.longrunning.Operation;
Expand Down Expand Up @@ -108,12 +108,13 @@ public class GapicSpannerRpc implements SpannerRpc {

private static final PathTemplate PROJECT_NAME_TEMPLATE =
PathTemplate.create("projects/{project}");
private static final PathTemplate OPERATION_NAME_TEMPLATE =
PathTemplate.create("{database=projects/*/instances/*/databases/*}/operations/{operation}");
private static final int MAX_MESSAGE_SIZE = 100 * 1024 * 1024;

// TODO(hzyi): change the stub names to be more intuitive
private final SpannerStub stub;
private final InstanceAdminStub instanceStub;
private final DatabaseAdminStub databaseStub;

private final SpannerStub spannerStub;
private final InstanceAdminStub instanceAdminStub;
private final DatabaseAdminStub databaseAdminStub;
private final String projectId;
private final String projectName;
private final SpannerMetadataProvider metadataProvider;
Expand All @@ -126,9 +127,6 @@ public GapicSpannerRpc(SpannerOptions options) {
this.projectId = options.getProjectId();
this.projectName = PROJECT_NAME_TEMPLATE.instantiate("project", this.projectId);

// TODO(hzyi): inject userAgent to headerProvider so that it
// can be picked up by ChannelProvider

// create a metadataProvider which combines both internal headers and
// per-method-call extra headers for channelProvider to inject the headers
// for rpc calls
Expand Down Expand Up @@ -177,7 +175,7 @@ public GapicSpannerRpc(SpannerOptions options) {
try {
// TODO: bump the version of gax and remove this try-catch block
// applyToAllUnaryMethods does not throw exception in the latest version
this.stub =
this.spannerStub =
GrpcSpannerStub.create(
SpannerStubSettings.newBuilder()
.setTransportChannelProvider(channelProvider)
Expand All @@ -192,7 +190,7 @@ public Void apply(UnaryCallSettings.Builder<?, ?> builder) {
})
.build());

this.instanceStub =
this.instanceAdminStub =
GrpcInstanceAdminStub.create(
InstanceAdminStubSettings.newBuilder()
.setTransportChannelProvider(channelProvider)
Expand All @@ -206,7 +204,7 @@ public Void apply(UnaryCallSettings.Builder<?, ?> builder) {
}
})
.build());
this.databaseStub =
this.databaseAdminStub =
GrpcDatabaseAdminStub.create(
DatabaseAdminStubSettings.newBuilder()
.setTransportChannelProvider(channelProvider)
Expand Down Expand Up @@ -237,7 +235,7 @@ public Paginated<InstanceConfig> listInstanceConfigs(int pageSize, @Nullable Str

GrpcCallContext context = newCallContext(null, projectName);
ListInstanceConfigsResponse response =
get(instanceStub.listInstanceConfigsCallable().futureCall(request, context));
get(instanceAdminStub.listInstanceConfigsCallable().futureCall(request, context));
return new Paginated<>(response.getInstanceConfigsList(), response.getNextPageToken());
}

Expand All @@ -247,7 +245,7 @@ public InstanceConfig getInstanceConfig(String instanceConfigName) throws Spanne
GetInstanceConfigRequest.newBuilder().setName(instanceConfigName).build();

GrpcCallContext context = newCallContext(null, projectName);
return get(instanceStub.getInstanceConfigCallable().futureCall(request, context));
return get(instanceAdminStub.getInstanceConfigCallable().futureCall(request, context));
}

@Override
Expand All @@ -265,7 +263,7 @@ public Paginated<Instance> listInstances(

GrpcCallContext context = newCallContext(null, projectName);
ListInstancesResponse response =
get(instanceStub.listInstancesCallable().futureCall(request, context));
get(instanceAdminStub.listInstancesCallable().futureCall(request, context));
return new Paginated<>(response.getInstancesList(), response.getNextPageToken());
}

Expand All @@ -279,7 +277,7 @@ public OperationFuture<Instance, CreateInstanceMetadata> createInstance(
.setInstance(instance)
.build();
GrpcCallContext context = newCallContext(null, parent);
return instanceStub.createInstanceOperationCallable().futureCall(request, context);
return instanceAdminStub.createInstanceOperationCallable().futureCall(request, context);
}

@Override
Expand All @@ -288,7 +286,7 @@ public OperationFuture<Instance, UpdateInstanceMetadata> updateInstance(
UpdateInstanceRequest request =
UpdateInstanceRequest.newBuilder().setInstance(instance).setFieldMask(fieldMask).build();
GrpcCallContext context = newCallContext(null, instance.getName());
return instanceStub.updateInstanceOperationCallable().futureCall(request, context);
return instanceAdminStub.updateInstanceOperationCallable().futureCall(request, context);
}

@Override
Expand All @@ -297,7 +295,7 @@ public Instance getInstance(String instanceName) throws SpannerException {
GetInstanceRequest.newBuilder().setName(instanceName).build();

GrpcCallContext context = newCallContext(null, instanceName);
return get(instanceStub.getInstanceCallable().futureCall(request, context));
return get(instanceAdminStub.getInstanceCallable().futureCall(request, context));
}

@Override
Expand All @@ -306,7 +304,7 @@ public void deleteInstance(String instanceName) throws SpannerException {
DeleteInstanceRequest.newBuilder().setName(instanceName).build();

GrpcCallContext context = newCallContext(null, instanceName);
get(instanceStub.deleteInstanceCallable().futureCall(request, context));
get(instanceAdminStub.deleteInstanceCallable().futureCall(request, context));
}

@Override
Expand All @@ -320,7 +318,7 @@ public Paginated<Database> listDatabases(
ListDatabasesRequest request = requestBuilder.build();

GrpcCallContext context = newCallContext(null, instanceName);
ListDatabasesResponse response = get(databaseStub.listDatabasesCallable()
ListDatabasesResponse response = get(databaseAdminStub.listDatabasesCallable()
.futureCall(request, context));
return new Paginated<>(response.getDatabasesList(), response.getNextPageToken());
}
Expand All @@ -335,7 +333,7 @@ public OperationFuture<Database, CreateDatabaseMetadata> createDatabase(
.addAllExtraStatements(additionalStatements)
.build();
GrpcCallContext context = newCallContext(null, instanceName);
return databaseStub.createDatabaseOperationCallable().futureCall(request, context);
return databaseAdminStub.createDatabaseOperationCallable().futureCall(request, context);
}

@Override
Expand All @@ -348,7 +346,21 @@ public OperationFuture<Empty, UpdateDatabaseDdlMetadata> updateDatabaseDdl(
.setOperationId(MoreObjects.firstNonNull(updateId, ""))
.build();
GrpcCallContext context = newCallContext(null, databaseName);
return databaseStub.updateDatabaseDdlOperationCallable().futureCall(request, context);
OperationCallable<UpdateDatabaseDdlRequest, Empty, UpdateDatabaseDdlMetadata> callable = databaseAdminStub.updateDatabaseDdlOperationCallable();
OperationFuture<Empty, UpdateDatabaseDdlMetadata> operationFuture = callable.futureCall(request, context);
try {
operationFuture.getInitialFuture().get();
} catch (InterruptedException e) {
throw SpannerExceptionFactory.newSpannerException(e);
} catch (ExecutionException e) {
Throwable t = e.getCause();
if (t instanceof AlreadyExistsException) {
String operationName =
OPERATION_NAME_TEMPLATE.instantiate("database", databaseName, "operation", updateId);
return callable.resumeFutureCall(operationName, context);
}
}
return operationFuture;
}

@Override
Expand All @@ -357,7 +369,7 @@ public void dropDatabase(String databaseName) throws SpannerException {
DropDatabaseRequest.newBuilder().setDatabase(databaseName).build();

GrpcCallContext context = newCallContext(null, databaseName);
get(databaseStub.dropDatabaseCallable().futureCall(request, context));
get(databaseAdminStub.dropDatabaseCallable().futureCall(request, context));
}

@Override
Expand All @@ -368,7 +380,7 @@ public Database getDatabase(String databaseName) throws SpannerException {
.build();

GrpcCallContext context = newCallContext(null, databaseName);
return get(databaseStub.getDatabaseCallable().futureCall(request, context));
return get(databaseAdminStub.getDatabaseCallable().futureCall(request, context));
}

@Override
Expand All @@ -377,15 +389,15 @@ public List<String> getDatabaseDdl(String databaseName) throws SpannerException
GetDatabaseDdlRequest.newBuilder().setDatabase(databaseName).build();

GrpcCallContext context = newCallContext(null, databaseName);
return get(databaseStub.getDatabaseDdlCallable().futureCall(request, context))
return get(databaseAdminStub.getDatabaseDdlCallable().futureCall(request, context))
.getStatementsList();
}

@Override
public Operation getOperation(String name) throws SpannerException {
GetOperationRequest request = GetOperationRequest.newBuilder().setName(name).build();
GrpcCallContext context = newCallContext(null, name);
return get(databaseStub.getOperationsStub().getOperationCallable()
return get(databaseAdminStub.getOperationsStub().getOperationCallable()
.futureCall(request, context));
}

Expand All @@ -400,7 +412,7 @@ public Session createSession(String databaseName, @Nullable Map<String, String>
}
CreateSessionRequest request = requestBuilder.build();
GrpcCallContext context = newCallContext(options, databaseName);
return get(stub.createSessionCallable().futureCall(request, context));
return get(spannerStub.createSessionCallable().futureCall(request, context));
}

@Override
Expand All @@ -409,15 +421,15 @@ public void deleteSession(String sessionName, @Nullable Map<Option, ?> options)
DeleteSessionRequest request =
DeleteSessionRequest.newBuilder().setName(sessionName).build();
GrpcCallContext context = newCallContext(options, sessionName);
get(stub.deleteSessionCallable().futureCall(request, context));
get(spannerStub.deleteSessionCallable().futureCall(request, context));
}

@Override
public StreamingCall read(
ReadRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options) {
GrpcCallContext context = newCallContext(options, request.getSession());
SpannerResponseObserver responseObserver = new SpannerResponseObserver(consumer);
stub.streamingReadCallable().call(request, responseObserver, context);
spannerStub.streamingReadCallable().call(request, responseObserver, context);
final StreamController controller = responseObserver.getController();
return new StreamingCall() {
@Override
Expand All @@ -439,7 +451,7 @@ public StreamingCall executeQuery(
ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options) {
GrpcCallContext context = newCallContext(options, request.getSession());
SpannerResponseObserver responseObserver = new SpannerResponseObserver(consumer);
stub.executeStreamingSqlCallable().call(request, responseObserver, context);
spannerStub.executeStreamingSqlCallable().call(request, responseObserver, context);
final StreamController controller = responseObserver.getController();
return new StreamingCall() {
@Override
Expand All @@ -460,35 +472,35 @@ public void cancel(String message) {
public Transaction beginTransaction(
BeginTransactionRequest request, @Nullable Map<Option, ?> options) throws SpannerException {
GrpcCallContext context = newCallContext(options, request.getSession());
return get(stub.beginTransactionCallable().futureCall(request, context));
return get(spannerStub.beginTransactionCallable().futureCall(request, context));
}

@Override
public CommitResponse commit(CommitRequest commitRequest, @Nullable Map<Option, ?> options)
throws SpannerException {
GrpcCallContext context = newCallContext(options, commitRequest.getSession());
return get(stub.commitCallable().futureCall(commitRequest, context));
return get(spannerStub.commitCallable().futureCall(commitRequest, context));
}

@Override
public void rollback(RollbackRequest request, @Nullable Map<Option, ?> options)
throws SpannerException {
GrpcCallContext context = newCallContext(options, request.getSession());
get(stub.rollbackCallable().futureCall(request, context));
get(spannerStub.rollbackCallable().futureCall(request, context));
}

@Override
public PartitionResponse partitionQuery(
PartitionQueryRequest request, @Nullable Map<Option, ?> options) throws SpannerException {
GrpcCallContext context = newCallContext(options, request.getSession());
return get(stub.partitionQueryCallable().futureCall(request, context));
return get(spannerStub.partitionQueryCallable().futureCall(request, context));
}

@Override
public PartitionResponse partitionRead(
PartitionReadRequest request, @Nullable Map<Option, ?> options) throws SpannerException {
GrpcCallContext context = newCallContext(options, request.getSession());
return get(stub.partitionReadCallable().futureCall(request, context));
return get(spannerStub.partitionReadCallable().futureCall(request, context));
}

/** Gets the result of an async RPC call, handling any exceptions encountered. */
Expand Down Expand Up @@ -516,9 +528,9 @@ private GrpcCallContext newCallContext(@Nullable Map<Option, ?> options, String
}

public void shutdown() {
this.stub.close();
this.instanceStub.close();
this.databaseStub.close();
this.spannerStub.close();
this.instanceAdminStub.close();
this.databaseAdminStub.close();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,21 +115,25 @@ public void updateDatabaseDdl() throws Exception {
assertThat(op.getName()).isEqualTo(opName);
}

@Ignore("More work needs to be done")
@Test
// TODO(hzyi)
// Changing the surface to OperationFuture breaks updateDatabaseDdl in the case
// that there is already a longrunning operation running. Disabling this test for
// this PR and I will fix this in the next PR.
public void updateDatabaseDdlOpAlreadyExists() throws Exception {
String opName = DB_NAME + "/operations/myop";
String opId = "myop";
String originalOpName = DB_NAME + "/operations/originalop";
String originalOpId = "originalop";
List<String> ddl = ImmutableList.of();
when(rpc.updateDatabaseDdl(DB_NAME, ddl, opId))
.thenThrow(SpannerExceptionFactory.newSpannerException(ErrorCode.ALREADY_EXISTS, ""));
OperationFuture<Empty, UpdateDatabaseDdlMetadata> originalOp =
OperationFutureUtil.immediateOperationFuture(
originalOpName, Empty.getDefaultInstance(), UpdateDatabaseDdlMetadata.getDefaultInstance());

String newOpName = DB_NAME + "/operations/newop";
String newOpId = "newop";
OperationFuture<Empty, UpdateDatabaseDdlMetadata> newop =
OperationFutureUtil.immediateOperationFuture(
newOpName, Empty.getDefaultInstance(), UpdateDatabaseDdlMetadata.getDefaultInstance());

when(rpc.updateDatabaseDdl(DB_NAME, ddl, newOpId)).thenReturn(originalOp);
OperationFuture<Void, UpdateDatabaseDdlMetadata> op =
client.updateDatabaseDdl(INSTANCE_ID, DB_ID, ddl, opId);
assertThat(op.getName()).isEqualTo(opName);
client.updateDatabaseDdl(INSTANCE_ID, DB_ID, ddl, newOpId);
assertThat(op.getName()).isEqualTo(originalOpName);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,7 @@ public void databaseOperations() throws Exception {
db = dbAdminClient.getDatabase(testHelper.getInstanceId().getInstance(), dbId);
}

@Ignore("More work needs to be done")
@Test
// TODO(hzyi)
// Changing the surface to OperationFuture breaks updateDatabaseDdl in the case
// that there is already a longrunning operation running. Disabling this test for
// this PR and I will fix this in the next PR.
public void updateDdlRetry() throws Exception {
String dbId = testHelper.getUniqueDatabaseId();
String instanceId = testHelper.getInstanceId().getInstance();
Expand All @@ -127,6 +122,8 @@ public void updateDdlRetry() throws Exception {
dbAdminClient.updateDatabaseDdl(instanceId, dbId, ImmutableList.of(statement2), "myop");
OperationFuture<Void, UpdateDatabaseDdlMetadata> op2 =
dbAdminClient.updateDatabaseDdl(instanceId, dbId, ImmutableList.of(statement2), "myop");
op1.get();
op2.get();
assertThat(op1.getMetadata().get()).isEqualTo(op2.getMetadata().get());
}

Expand Down