diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index ff11df91169..189c8e1d819 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -128,6 +128,30 @@ class SessionPool { ErrorCode.UNIMPLEMENTED, ErrorCode.INTERNAL); + /** + * If the {@link SessionPoolOptions#getWaitForMinSessions()} duration is greater than zero, waits + * for the creation of at least {@link SessionPoolOptions#getMinSessions()} in the pool using the + * given duration. If the waiting times out, a {@link SpannerException} with the {@link + * ErrorCode#DEADLINE_EXCEEDED} is thrown. + */ + void maybeWaitOnMinSessions() { + final long timeoutNanos = options.getWaitForMinSessions().toNanos(); + if (timeoutNanos <= 0) { + return; + } + + try { + if (!waitOnMinSessionsLatch.await(timeoutNanos, TimeUnit.NANOSECONDS)) { + final long timeoutMillis = options.getWaitForMinSessions().toMillis(); + throw SpannerExceptionFactory.newSpannerException( + ErrorCode.DEADLINE_EXCEEDED, + "Timed out after waiting " + timeoutMillis + "ms for session pool creation"); + } + } catch (InterruptedException e) { + throw SpannerExceptionFactory.propagateInterrupt(e); + } + } + /** * Wrapper around current time so that we can fake it in tests. TODO(user): Replace with Java 8 * Clock. @@ -1855,6 +1879,8 @@ private enum Position { @VisibleForTesting Function idleSessionRemovedListener; + private final CountDownLatch waitOnMinSessionsLatch; + /** * Create a session pool with the given options and for the given database. It will also start * eagerly creating sessions if {@link SessionPoolOptions#getMinSessions()} is greater than 0. @@ -1934,6 +1960,8 @@ private SessionPool( this.clock = clock; this.poolMaintainer = new PoolMaintainer(); this.initMetricsCollection(metricRegistry, labelValues); + this.waitOnMinSessionsLatch = + options.getMinSessions() > 0 ? new CountDownLatch(1) : new CountDownLatch(0); } /** @@ -2399,6 +2427,7 @@ public void onSessionReady(SessionImpl session) { PooledSession pooledSession = null; boolean closeSession = false; synchronized (lock) { + int minSessions = options.getMinSessions(); pooledSession = new PooledSession(session); numSessionsBeingCreated--; if (closureFuture != null) { @@ -2406,6 +2435,9 @@ public void onSessionReady(SessionImpl session) { } else { Preconditions.checkState(totalSessions() <= options.getMaxSessions() - 1); allSessions.add(pooledSession); + if (allSessions.size() >= minSessions) { + waitOnMinSessionsLatch.countDown(); + } if (options.isAutoDetectDialect() && !detectDialectStarted) { // Get the dialect of the underlying database if that has not yet been done. Note that // this method will release the session into the pool once it is done. diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java index 408b3d73965..ad6ad2e73d7 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java @@ -51,6 +51,7 @@ public class SessionPoolOptions { private final ActionOnSessionLeak actionOnSessionLeak; private final long initialWaitForSessionTimeoutMillis; private final boolean autoDetectDialect; + private final Duration waitForMinSessions; private SessionPoolOptions(Builder builder) { // minSessions > maxSessions is only possible if the user has only set a value for maxSessions. @@ -69,6 +70,7 @@ private SessionPoolOptions(Builder builder) { this.keepAliveIntervalMinutes = builder.keepAliveIntervalMinutes; this.removeInactiveSessionAfter = builder.removeInactiveSessionAfter; this.autoDetectDialect = builder.autoDetectDialect; + this.waitForMinSessions = builder.waitForMinSessions; } @Override @@ -90,7 +92,8 @@ public boolean equals(Object o) { && Objects.equals(this.loopFrequency, other.loopFrequency) && Objects.equals(this.keepAliveIntervalMinutes, other.keepAliveIntervalMinutes) && Objects.equals(this.removeInactiveSessionAfter, other.removeInactiveSessionAfter) - && Objects.equals(this.autoDetectDialect, other.autoDetectDialect); + && Objects.equals(this.autoDetectDialect, other.autoDetectDialect) + && Objects.equals(this.waitForMinSessions, other.waitForMinSessions); } @Override @@ -108,7 +111,8 @@ public int hashCode() { this.loopFrequency, this.keepAliveIntervalMinutes, this.removeInactiveSessionAfter, - this.autoDetectDialect); + this.autoDetectDialect, + this.waitForMinSessions); } public Builder toBuilder() { @@ -186,6 +190,11 @@ boolean isFailOnSessionLeak() { return actionOnSessionLeak == ActionOnSessionLeak.FAIL; } + @VisibleForTesting + Duration getWaitForMinSessions() { + return waitForMinSessions; + } + public static Builder newBuilder() { return new Builder(); } @@ -229,6 +238,7 @@ public static class Builder { private int keepAliveIntervalMinutes = 30; private Duration removeInactiveSessionAfter = Duration.ofMinutes(55L); private boolean autoDetectDialect = false; + private Duration waitForMinSessions = Duration.ZERO; public Builder() {} @@ -247,6 +257,7 @@ private Builder(SessionPoolOptions options) { this.keepAliveIntervalMinutes = options.keepAliveIntervalMinutes; this.removeInactiveSessionAfter = options.removeInactiveSessionAfter; this.autoDetectDialect = options.autoDetectDialect; + this.waitForMinSessions = options.waitForMinSessions; } /** @@ -394,6 +405,21 @@ public Builder setWriteSessionsFraction(float writeSessionsFraction) { return this; } + /** + * If greater than zero, waits for the session pool to have at least {@link + * SessionPoolOptions#minSessions} before returning the database client to the caller. Note that + * this check is only done during the session pool creation. This is usually done asynchronously + * in order to provide the client back to the caller as soon as possible. We don't recommend + * using this option unless you are executing benchmarks and want to guarantee the session pool + * has min sessions in the pool before continuing. + * + *

Defaults to zero (initialization is done asynchronously). + */ + public Builder setWaitForMinSessions(Duration waitForMinSessions) { + this.waitForMinSessions = waitForMinSessions; + return this; + } + /** Build a SessionPoolOption object */ public SessionPoolOptions build() { validate(); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java index b8255f1d65e..721be9cd762 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java @@ -221,6 +221,7 @@ public DatabaseClient getDatabaseClient(DatabaseId db) { SessionPool pool = SessionPool.createPool( getOptions(), SpannerImpl.this.getSessionClient(db), labelValues); + pool.maybeWaitOnMinSessions(); DatabaseClientImpl dbClient = createDatabaseClient(clientId, pool); dbClients.put(db, dbClient); return dbClient; diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java index 147f66220b3..4e5b1e0395c 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java @@ -94,6 +94,7 @@ import org.junit.runners.Parameterized.Parameters; import org.mockito.Mock; import org.mockito.Mockito; +import org.threeten.bp.Duration; /** Tests for SessionPool that mock out the underlying stub. */ @RunWith(Parameterized.class) @@ -1188,6 +1189,47 @@ public void testGetDatabaseRole() throws Exception { assertEquals(TEST_DATABASE_ROLE, pool.getDatabaseRole()); } + @Test + public void testWaitOnMinSessionsWhenSessionsAreCreatedBeforeTimeout() { + doAnswer( + invocation -> + executor.submit( + () -> { + SessionConsumerImpl consumer = + invocation.getArgument(2, SessionConsumerImpl.class); + consumer.onSessionReady(mockSession()); + })) + .when(sessionClient) + .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); + + options = + SessionPoolOptions.newBuilder() + .setMinSessions(minSessions) + .setMaxSessions(minSessions + 1) + .setWaitForMinSessions(Duration.ofSeconds(5)) + .build(); + pool = createPool(new FakeClock(), new FakeMetricRegistry(), SPANNER_DEFAULT_LABEL_VALUES); + pool.maybeWaitOnMinSessions(); + assertTrue(pool.getNumberOfSessionsInPool() >= minSessions); + } + + @Test(expected = SpannerException.class) + public void testWaitOnMinSessionsThrowsExceptionWhenTimeoutIsReached() { + // Does not call onSessionReady, so session pool is never populated + doAnswer(invocation -> null) + .when(sessionClient) + .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); + + options = + SessionPoolOptions.newBuilder() + .setMinSessions(minSessions + 1) + .setMaxSessions(minSessions + 1) + .setWaitForMinSessions(Duration.ofMillis(100)) + .build(); + pool = createPool(new FakeClock(), new FakeMetricRegistry(), SPANNER_DEFAULT_LABEL_VALUES); + pool.maybeWaitOnMinSessions(); + } + private void mockKeepAlive(Session session) { ReadContext context = mock(ReadContext.class); ResultSet resultSet = mock(ResultSet.class);