Skip to content

Commit

Permalink
Spanner: Re-create sessions that have been invalidated by the server (#…
Browse files Browse the repository at this point in the history
…4734)

retry on invalidated session

When a SessionNotFoundException is thrown, a new session is now picked from the pool, instead of throwing a SessionNotFoundException.
  • Loading branch information
olavloite authored May 13, 2019
1 parent d8a3ff8 commit 7d47fce
Show file tree
Hide file tree
Showing 18 changed files with 2,743 additions and 153 deletions.
1 change: 1 addition & 0 deletions google-cloud-clients/google-cloud-spanner/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
</systemPropertyVariables>
<groups>com.google.cloud.spanner.IntegrationTest</groups>
<excludedGroups>com.google.cloud.spanner.FlakyTest</excludedGroups>
<forkedProcessTimeoutInSeconds>2400</forkedProcessTimeoutInSeconds>
</configuration>
<dependencies>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class BatchClientImpl implements BatchClient {

@Override
public BatchReadOnlyTransaction batchReadOnlyTransaction(TimestampBound bound) {
SessionImpl session = (SessionImpl) spanner.createSession(db);
SessionImpl session = spanner.createSession(db);
return new BatchReadOnlyTransactionImpl(spanner, session, checkNotNull(bound));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
package com.google.cloud.spanner;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.SessionPool.PooledSession;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.util.concurrent.ListenableFuture;
import io.opencensus.common.Scope;
import io.opencensus.trace.Span;
Expand All @@ -33,17 +36,33 @@ class DatabaseClientImpl implements DatabaseClient {
TraceUtil.exportSpans(READ_WRITE_TRANSACTION, READ_ONLY_TRANSACTION, PARTITION_DML_TRANSACTION);
}

private final SessionPool pool;
@VisibleForTesting final SessionPool pool;

DatabaseClientImpl(SessionPool pool) {
this.pool = pool;
}

@VisibleForTesting
PooledSession getReadSession() {
return pool.getReadSession();
}

@VisibleForTesting
PooledSession getReadWriteSession() {
return pool.getReadWriteSession();
}

@Override
public Timestamp write(Iterable<Mutation> mutations) throws SpannerException {
public Timestamp write(final Iterable<Mutation> mutations) throws SpannerException {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return pool.getReadWriteSession().write(mutations);
return runWithSessionRetry(
new Function<Session, Timestamp>() {
@Override
public Timestamp apply(Session session) {
return session.write(mutations);
}
});
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
Expand All @@ -53,10 +72,16 @@ public Timestamp write(Iterable<Mutation> mutations) throws SpannerException {
}

@Override
public Timestamp writeAtLeastOnce(Iterable<Mutation> mutations) throws SpannerException {
public Timestamp writeAtLeastOnce(final Iterable<Mutation> mutations) throws SpannerException {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return pool.getReadWriteSession().writeAtLeastOnce(mutations);
return runWithSessionRetry(
new Function<Session, Timestamp>() {
@Override
public Timestamp apply(Session session) {
return session.writeAtLeastOnce(mutations);
}
});
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
Expand All @@ -69,7 +94,7 @@ public Timestamp writeAtLeastOnce(Iterable<Mutation> mutations) throws SpannerEx
public ReadContext singleUse() {
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return pool.getReadSession().singleUse();
return getReadSession().singleUse();
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
Expand All @@ -80,7 +105,7 @@ public ReadContext singleUse() {
public ReadContext singleUse(TimestampBound bound) {
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return pool.getReadSession().singleUse(bound);
return getReadSession().singleUse(bound);
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
Expand All @@ -91,7 +116,7 @@ public ReadContext singleUse(TimestampBound bound) {
public ReadOnlyTransaction singleUseReadOnlyTransaction() {
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return pool.getReadSession().singleUseReadOnlyTransaction();
return getReadSession().singleUseReadOnlyTransaction();
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
Expand All @@ -102,7 +127,7 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction() {
public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) {
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return pool.getReadSession().singleUseReadOnlyTransaction(bound);
return getReadSession().singleUseReadOnlyTransaction(bound);
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
Expand All @@ -113,7 +138,7 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) {
public ReadOnlyTransaction readOnlyTransaction() {
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return pool.getReadSession().readOnlyTransaction();
return getReadSession().readOnlyTransaction();
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
Expand All @@ -124,7 +149,7 @@ public ReadOnlyTransaction readOnlyTransaction() {
public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return pool.getReadSession().readOnlyTransaction(bound);
return getReadSession().readOnlyTransaction(bound);
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
Expand All @@ -135,7 +160,7 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
public TransactionRunner readWriteTransaction() {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return pool.getReadWriteSession().readWriteTransaction();
return getReadWriteSession().readWriteTransaction();
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
Expand All @@ -146,24 +171,41 @@ public TransactionRunner readWriteTransaction() {
public TransactionManager transactionManager() {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return pool.getReadWriteSession().transactionManager();
return getReadWriteSession().transactionManager();
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
}
}

@Override
public long executePartitionedUpdate(Statement stmt) {
public long executePartitionedUpdate(final Statement stmt) {
Span span = tracer.spanBuilder(PARTITION_DML_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return pool.getReadWriteSession().executePartitionedUpdate(stmt);
return runWithSessionRetry(
new Function<Session, Long>() {
@Override
public Long apply(Session session) {
return session.executePartitionedUpdate(stmt);
}
});
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
}
}

private <T> T runWithSessionRetry(Function<Session, T> callable) {
PooledSession session = getReadWriteSession();
while (true) {
try {
return callable.apply(session);
} catch (SessionNotFoundException e) {
session = pool.replaceReadWriteSession(e, session);
}
}
}

ListenableFuture<Void> closeAsync() {
return pool.closeAsync();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,26 @@
/** Forwarding implementation of ResultSet that forwards all calls to a delegate. */
public class ForwardingResultSet extends ForwardingStructReader implements ResultSet {

private final ResultSet delegate;
private ResultSet delegate;

public ForwardingResultSet(ResultSet delegate) {
super(delegate);
this.delegate = Preconditions.checkNotNull(delegate);
}

/**
* Replaces the underlying {@link ResultSet}. It is the responsibility of the caller to ensure
* that the new delegate has the same properties and is in the same state as the original
* delegate. This method can be used if the underlying delegate needs to be replaced after a
* session or transaction needed to be restarted after the {@link ResultSet} had already been
* returned to the user.
*/
void replaceDelegate(ResultSet newDelegate) {
Preconditions.checkNotNull(newDelegate);
super.replaceDelegate(newDelegate);
this.delegate = newDelegate;
}

@Override
public boolean next() throws SpannerException {
return delegate.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,23 @@
/** Forwarding implements of StructReader */
public class ForwardingStructReader implements StructReader {

private final StructReader delegate;
private StructReader delegate;

public ForwardingStructReader(StructReader delegate) {
this.delegate = Preconditions.checkNotNull(delegate);
}

/**
* Replaces the underlying {@link StructReader}. It is the responsibility of the caller to ensure
* that the new delegate has the same properties and is in the same state as the original
* delegate. This method can be used if the underlying delegate needs to be replaced after a
* session or transaction needed to be restarted after the {@link StructReader} had already been
* returned to the user.
*/
void replaceDelegate(StructReader newDelegate) {
this.delegate = Preconditions.checkNotNull(newDelegate);
}

@Override
public Type getType() {
return delegate.getType();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2019 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner;

import javax.annotation.Nullable;

/**
* Exception thrown by Cloud Spanner when an operation detects that the session that is being used
* is no longer valid. This type of error has its own subclass as it is a condition that should
* normally be hidden from the user, and the client library should try to fix this internally.
*/
public class SessionNotFoundException extends SpannerException {
private static final long serialVersionUID = -6395746612598975751L;

/** Private constructor. Use {@link SpannerExceptionFactory} to create instances. */
SessionNotFoundException(
DoNotConstructDirectly token, @Nullable String message, @Nullable Throwable cause) {
super(token, ErrorCode.NOT_FOUND, false, message, cause);
}
}
Loading

0 comments on commit 7d47fce

Please sign in to comment.