Skip to content

Commit

Permalink
perf: close sessions async revert revert (#46)
Browse files Browse the repository at this point in the history
* Revert "Revert "perf: close sessions async (#24)" (#43)"

This reverts commit 809ed88.

* Ignore compatibility check failure in internal interfaces.

asyncClose() was added to com.google.cloud.spanner.Session and
asyncDeleteSession() was added to
com.google.cloud.spanner.spi.v1.SpannerRpc in #24 which resulted in
binary compatibility test failures. This config allows us to ignore the
failure.

* Annotate SpannerRpc and Session classes as @internalapi.

Users shouldn't be implementing these interfaces as they're internal to
the client library implementation.
  • Loading branch information
skuruppu authored and olavloite committed Jan 24, 2020
1 parent b619fed commit c9864e5
Show file tree
Hide file tree
Showing 18 changed files with 129 additions and 49 deletions.
14 changes: 14 additions & 0 deletions google-cloud-spanner/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- see http://www.mojohaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
<differences>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/Session</className>
<method>* asyncClose()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
<method>* asyncDeleteSession(*)</method>
</difference>
</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@

package com.google.cloud.spanner;

import com.google.api.core.ApiFuture;
import com.google.api.core.InternalApi;
import com.google.protobuf.Empty;

/**
* A {@code Session} can be used to perform transactions that read and/or modify data in a Cloud
* Spanner database.
Expand All @@ -39,6 +43,7 @@
* require external synchronization; {@code Session} implementations are not required to be
* thread-safe.
*/
@InternalApi
public interface Session extends DatabaseClient, AutoCloseable {
/** Returns the resource name associated with this session. */
String getName();
Expand All @@ -54,4 +59,10 @@ public interface Session extends DatabaseClient, AutoCloseable {

@Override
void close();

/**
* Closes the session asynchronously and returns the {@link ApiFuture} that can be used to monitor
* the operation progress.
*/
ApiFuture<Empty> asyncClose();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.api.core.ApiFuture;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.AbstractReadContext.MultiUseReadOnlyTransaction;
import com.google.cloud.spanner.AbstractReadContext.SingleReadContext;
Expand All @@ -27,6 +28,7 @@
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import com.google.spanner.v1.BeginTransactionRequest;
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.CommitResponse;
Expand Down Expand Up @@ -196,6 +198,11 @@ public void prepareReadWriteTransaction() {
readyTransactionId = beginTransaction();
}

@Override
public ApiFuture<Empty> asyncClose() {
return spanner.getRpc().asyncDeleteSession(name, options);
}

@Override
public void close() {
Span span = tracer.spanBuilder(SpannerImpl.DELETE_SESSION).startSpan();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.Timestamp;
import com.google.cloud.grpc.GrpcTransportOptions;
import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory;
Expand All @@ -35,6 +37,7 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.Empty;
import io.opencensus.common.Scope;
import io.opencensus.trace.Annotation;
import io.opencensus.trace.AttributeValue;
Expand Down Expand Up @@ -763,6 +766,12 @@ public TransactionRunner readWriteTransaction() {
return new SessionPoolTransactionRunner(SessionPool.this, this);
}

@Override
public ApiFuture<Empty> asyncClose() {
close();
return ApiFutures.immediateFuture(Empty.getDefaultInstance());
}

@Override
public void close() {
synchronized (lock) {
Expand Down Expand Up @@ -998,7 +1007,7 @@ private void closeIdleSessions(Instant currTime) {
}
for (PooledSession sess : sessionsToClose) {
logger.log(Level.FINE, "Closing session {0}", sess.getName());
closeSession(sess);
closeSessionAsync(sess);
}
}

Expand Down Expand Up @@ -1611,37 +1620,27 @@ int totalSessions() {
}
}

private void closeSessionAsync(final PooledSession sess) {
executor.submit(
private ApiFuture<Empty> closeSessionAsync(final PooledSession sess) {
ApiFuture<Empty> res = sess.delegate.asyncClose();
res.addListener(
new Runnable() {
@Override
public void run() {
closeSession(sess);
synchronized (lock) {
allSessions.remove(sess);
if (isClosed()) {
decrementPendingClosures(1);
return;
}
// Create a new session if needed to unblock some waiter.
if (numWaiters() > numSessionsBeingCreated) {
createSessions(getAllowedCreateSessions(numWaiters() - numSessionsBeingCreated));
}
}
}
});
}

private void closeSession(PooledSession sess) {
try {
sess.delegate.close();
} catch (SpannerException e) {
// Backend will delete these sessions after a while even if we fail to close them.
if (logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, "Failed to close session: " + sess.getName(), e);
}
} finally {
synchronized (lock) {
allSessions.remove(sess);
if (isClosed()) {
decrementPendingClosures(1);
return;
}
// Create a new session if needed to unblock some waiter.
if (numWaiters() > numSessionsBeingCreated) {
createSessions(getAllowedCreateSessions(numWaiters() - numSessionsBeingCreated));
}
}
}
},
executor);
return res;
}

private void prepareSession(final PooledSession sess) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;

import com.google.api.core.ApiFuture;
import com.google.api.core.NanoClock;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.ExecutorProvider;
Expand Down Expand Up @@ -523,9 +524,14 @@ public Session createSession(
@Override
public void deleteSession(String sessionName, @Nullable Map<Option, ?> options)
throws SpannerException {
get(asyncDeleteSession(sessionName, options));
}

@Override
public ApiFuture<Empty> asyncDeleteSession(String sessionName, @Nullable Map<Option, ?> options) {
DeleteSessionRequest request = DeleteSessionRequest.newBuilder().setName(sessionName).build();
GrpcCallContext context = newCallContext(options, sessionName);
get(spannerStub.deleteSessionCallable().futureCall(request, context));
return spannerStub.deleteSessionCallable().futureCall(request, context);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.google.cloud.spanner.spi.v1;

import com.google.api.core.ApiFuture;
import com.google.api.core.InternalApi;
import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.ServiceRpc;
import com.google.cloud.spanner.SpannerException;
Expand Down Expand Up @@ -66,6 +68,7 @@
* is purely for expedience; a future version of this interface is likely to be independent of
* transport to allow switching between gRPC and HTTP.
*/
@InternalApi
public interface SpannerRpc extends ServiceRpc {
/** Options passed in {@link SpannerRpc} methods to control how an RPC is issued. */
enum Option {
Expand Down Expand Up @@ -219,6 +222,9 @@ Session createSession(

void deleteSession(String sessionName, @Nullable Map<Option, ?> options) throws SpannerException;

ApiFuture<Empty> asyncDeleteSession(String sessionName, @Nullable Map<Option, ?> options)
throws SpannerException;

StreamingCall read(
ReadRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

import com.google.api.core.ApiFutures;
import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory;
import com.google.cloud.spanner.SessionPool.Clock;
import com.google.protobuf.Empty;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
Expand Down Expand Up @@ -61,6 +63,7 @@ SessionImpl mockSession() {
when(session.getName())
.thenReturn(
"projects/dummy/instances/dummy/database/dummy/sessions/session" + sessionIndex);
when(session.asyncClose()).thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance()));
sessionIndex++;
return session;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,9 @@ public static void startStaticServer() throws IOException {
}

@AfterClass
public static void stopServer() {
public static void stopServer() throws InterruptedException {
server.shutdown();
server.awaitTermination();
}

@Before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,9 @@ public static void startStaticServer() throws IOException {
}

@AfterClass
public static void stopServer() {
public static void stopServer() throws InterruptedException {
server.shutdown();
server.awaitTermination();
}

@Before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,10 @@ public static void startStaticServer() throws IOException {
}

@AfterClass
public static void stopServer() {
public static void stopServer() throws InterruptedException {
spannerClient.close();
server.shutdown();
server.awaitTermination();
}

@Before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.grpc.StatusRuntimeException;
import io.grpc.inprocess.InProcessServerBuilder;
import java.io.IOException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
Expand All @@ -55,13 +56,19 @@ public static void startStaticServer() throws IOException {
mockSpanner = new MockSpannerServiceImpl();
mockSpanner.setAbortProbability(0.0D); // We don't want any unpredictable aborted transactions.
String uniqueName = InProcessServerBuilder.generateName();
server = InProcessServerBuilder.forName(uniqueName).addService(mockSpanner).build().start();
server =
InProcessServerBuilder.forName(uniqueName)
.scheduledExecutorService(new ScheduledThreadPoolExecutor(1))
.addService(mockSpanner)
.build()
.start();
channelProvider = LocalChannelProvider.create(uniqueName);
}

@AfterClass
public static void stopServer() {
public static void stopServer() throws InterruptedException {
server.shutdown();
server.awaitTermination();
}

@Before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.spanner.SessionClient.SessionConsumer;
import com.google.cloud.spanner.SessionPool.SessionConsumerImpl;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.Empty;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
Expand Down Expand Up @@ -152,14 +155,15 @@ public ResultSet answer(InvocationOnMock invocation) throws Throwable {
});
when(mockResult.next()).thenReturn(true);
doAnswer(
new Answer<Void>() {
new Answer<ApiFuture<Empty>>() {

@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
public ApiFuture<Empty> answer(InvocationOnMock invocation) throws Throwable {
synchronized (lock) {
if (expiredSessions.contains(session.getName())) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.NOT_FOUND, "Session not found");
return ApiFutures.immediateFailedFuture(
SpannerExceptionFactory.newSpannerException(
ErrorCode.NOT_FOUND, "Session not found"));
}
if (sessions.remove(session.getName()) == null) {
setFailed(closedSessions.get(session.getName()));
Expand All @@ -169,11 +173,11 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
minSessionsWhenSessionClosed = sessions.size();
}
}
return null;
return ApiFutures.immediateFuture(Empty.getDefaultInstance());
}
})
.when(session)
.close();
.asyncClose();

doAnswer(
new Answer<Void>() {
Expand Down
Loading

0 comments on commit c9864e5

Please sign in to comment.