Skip to content

Commit

Permalink
fix: use copy commit timeout for all RPCs (#427)
Browse files Browse the repository at this point in the history
Apply the spanner.copy_commit_timeout value for all RPCs that are
executed as part of a commit during a COPY operation. This means
that the timeout will also be applied to BeginTransaction and
BatchCreateSessions if any of those are also needed during the
commit operation. The timeout should also be applied to those, as
those RPCs could also respond slowly if the COPY operation is
causing the server to be overloaded.
  • Loading branch information
olavloite authored Oct 25, 2022
1 parent 39763e1 commit 0381a0c
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import com.google.api.core.ApiFutures;
import com.google.api.core.InternalApi;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.cloud.ByteArray;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.ErrorCode;
Expand All @@ -26,7 +28,7 @@
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.spanner.SpannerOptions.SpannerCallContextTimeoutConfigurator;
import com.google.cloud.spanner.SpannerOptions.CallContextConfigurator;
import com.google.cloud.spanner.Type;
import com.google.cloud.spanner.Value;
import com.google.cloud.spanner.connection.Connection;
Expand All @@ -44,6 +46,7 @@
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.Context;
import io.grpc.MethodDescriptor;
import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
Expand Down Expand Up @@ -367,9 +370,19 @@ private ApiFuture<Void> writeToSpannerAsync(
Context.current()
.withValue(
SpannerOptions.CALL_CONTEXT_CONFIGURATOR_KEY,
SpannerCallContextTimeoutConfigurator.create()
.withCommitTimeout(
Duration.ofSeconds(copySettings.getCommitTimeoutSeconds())));
new CallContextConfigurator() {
@Override
public <ReqT, RespT> ApiCallContext configure(
ApiCallContext context,
ReqT request,
MethodDescriptor<ReqT, RespT> method) {
// Use the same timeout for all RPCs that are executed for the commit.
// This could also include BeginTransaction and BatchCreateSessions.
return GrpcCallContext.createDefault()
.withTimeout(
Duration.ofSeconds(copySettings.getCommitTimeoutSeconds()));
}
});
context.run(() -> dbClient.write(immutableMutations));
return null;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,8 @@ public void testCreateAllTypes() throws IOException, InterruptedException {
assertEquals(
"some random string", insertRequest.getParams().getFieldsMap().get("p9").getStringValue());
assertEquals(
"{\"key\":\"value\"}", insertRequest.getParams().getFieldsMap().get("p10").getStringValue());
"{\"key\":\"value\"}",
insertRequest.getParams().getFieldsMap().get("p10").getStringValue());
}

static String runTest(String testName, int port) throws IOException, InterruptedException {
Expand Down

0 comments on commit 0381a0c

Please sign in to comment.