Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spanner: Re-create sessions that have been invalidated by the server #4734

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
41dc280
refactor SpannerImpl: move SessionImpl to separate file
olavloite Apr 3, 2019
c443e82
fixed merge conflicts
olavloite Apr 6, 2019
cfe198d
changed references after rebase on master
olavloite Apr 12, 2019
3f9a82b
re-create sessions that have been invalidated by the server
olavloite Mar 24, 2019
d4863e4
increase timeout for integration tests
olavloite Mar 24, 2019
b326110
increase of integration test timeout
olavloite Mar 24, 2019
4dab5c5
added test cases for sessionnotfound
olavloite Mar 25, 2019
97cb3cc
processed review comments
olavloite Mar 27, 2019
466dd56
added missing retry logic for direct operations on the session
olavloite Mar 27, 2019
c9e2030
changed to use instance variable for better readability
olavloite Mar 28, 2019
64bbd91
removed change of test env config
olavloite Mar 28, 2019
27ebed5
changed SessionNotFound handling to pick a new session from the pool
olavloite Apr 5, 2019
f151231
try to fix flaky test case
olavloite Apr 5, 2019
4976f55
request new session from the pool instead of creating a new session
olavloite Apr 10, 2019
f74cdb9
rebased and fixed merge conflicts
olavloite Apr 12, 2019
bcd474b
added tests for methods on DatabaseClient
olavloite Apr 12, 2019
8b98662
added project to builder to fix test case
olavloite Apr 12, 2019
f8d8075
cleanup and add documentation
olavloite Apr 16, 2019
6df4f2b
fixed formatting
olavloite Apr 16, 2019
d4717a0
replace a read session with a new read session and not read/write
olavloite Apr 22, 2019
054bbfc
moved fix for executePartitionedUpdate to separate PR
olavloite Apr 23, 2019
55ec5ec
make session pool accessible for test cases
olavloite May 5, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions google-cloud-clients/google-cloud-spanner/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
</systemPropertyVariables>
<groups>com.google.cloud.spanner.IntegrationTest</groups>
<excludedGroups>com.google.cloud.spanner.FlakyTest</excludedGroups>
<forkedProcessTimeoutInSeconds>2400</forkedProcessTimeoutInSeconds>
</configuration>
<dependencies>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class BatchClientImpl implements BatchClient {

@Override
public BatchReadOnlyTransaction batchReadOnlyTransaction(TimestampBound bound) {
SessionImpl session = (SessionImpl) spanner.createSession(db);
SessionImpl session = spanner.createSession(db);
return new BatchReadOnlyTransactionImpl(spanner, session, checkNotNull(bound));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
package com.google.cloud.spanner;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.SessionPool.PooledSession;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.util.concurrent.ListenableFuture;
import io.opencensus.common.Scope;
import io.opencensus.trace.Span;
Expand All @@ -33,17 +36,33 @@ class DatabaseClientImpl implements DatabaseClient {
TraceUtil.exportSpans(READ_WRITE_TRANSACTION, READ_ONLY_TRANSACTION, PARTITION_DML_TRANSACTION);
}

private final SessionPool pool;
@VisibleForTesting final SessionPool pool;

DatabaseClientImpl(SessionPool pool) {
this.pool = pool;
}

/**
* Returns the session obtained from {@code pool.getReadSession()}. Package-private and annotated
* {@link VisibleForTesting} so that test code can obtain the same pooled session type.
*/
@VisibleForTesting
PooledSession getReadSession() {
return pool.getReadSession();
}

/**
* Returns the session obtained from {@code pool.getReadWriteSession()}. Package-private and
* annotated {@link VisibleForTesting} so that test code can obtain the same pooled session type.
*/
@VisibleForTesting
PooledSession getReadWriteSession() {
return pool.getReadWriteSession();
}

@Override
public Timestamp write(Iterable<Mutation> mutations) throws SpannerException {
public Timestamp write(final Iterable<Mutation> mutations) throws SpannerException {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return pool.getReadWriteSession().write(mutations);
return runWithSessionRetry(
new Function<Session, Timestamp>() {
@Override
public Timestamp apply(Session session) {
return session.write(mutations);
}
});
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
Expand All @@ -53,10 +72,16 @@ public Timestamp write(Iterable<Mutation> mutations) throws SpannerException {
}

@Override
public Timestamp writeAtLeastOnce(Iterable<Mutation> mutations) throws SpannerException {
public Timestamp writeAtLeastOnce(final Iterable<Mutation> mutations) throws SpannerException {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return pool.getReadWriteSession().writeAtLeastOnce(mutations);
return runWithSessionRetry(
new Function<Session, Timestamp>() {
@Override
public Timestamp apply(Session session) {
return session.writeAtLeastOnce(mutations);
}
});
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
Expand All @@ -69,7 +94,7 @@ public Timestamp writeAtLeastOnce(Iterable<Mutation> mutations) throws SpannerEx
public ReadContext singleUse() {
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return pool.getReadSession().singleUse();
return getReadSession().singleUse();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious: what's the behavior when read sessions see SNF (session not found) errors?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The SNF is detected at the first actual database request. For read-only transactions (both single use and actual read transactions), that is at the first query. The query is sent to the server at the first call to ResultSet#next(), so the SNF for a read transaction is handled here:

} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
Expand All @@ -80,7 +105,7 @@ public ReadContext singleUse() {
public ReadContext singleUse(TimestampBound bound) {
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return pool.getReadSession().singleUse(bound);
return getReadSession().singleUse(bound);
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
Expand All @@ -91,7 +116,7 @@ public ReadContext singleUse(TimestampBound bound) {
public ReadOnlyTransaction singleUseReadOnlyTransaction() {
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return pool.getReadSession().singleUseReadOnlyTransaction();
return getReadSession().singleUseReadOnlyTransaction();
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
Expand All @@ -102,7 +127,7 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction() {
public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) {
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return pool.getReadSession().singleUseReadOnlyTransaction(bound);
return getReadSession().singleUseReadOnlyTransaction(bound);
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
Expand All @@ -113,7 +138,7 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) {
public ReadOnlyTransaction readOnlyTransaction() {
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return pool.getReadSession().readOnlyTransaction();
return getReadSession().readOnlyTransaction();
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
Expand All @@ -124,7 +149,7 @@ public ReadOnlyTransaction readOnlyTransaction() {
public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return pool.getReadSession().readOnlyTransaction(bound);
return getReadSession().readOnlyTransaction(bound);
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
Expand All @@ -135,7 +160,7 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
public TransactionRunner readWriteTransaction() {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return pool.getReadWriteSession().readWriteTransaction();
return getReadWriteSession().readWriteTransaction();
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
Expand All @@ -146,24 +171,41 @@ public TransactionRunner readWriteTransaction() {
public TransactionManager transactionManager() {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return pool.getReadWriteSession().transactionManager();
return getReadWriteSession().transactionManager();
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
}
}

@Override
public long executePartitionedUpdate(Statement stmt) {
public long executePartitionedUpdate(final Statement stmt) {
Span span = tracer.spanBuilder(PARTITION_DML_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return pool.getReadWriteSession().executePartitionedUpdate(stmt);
return runWithSessionRetry(
new Function<Session, Long>() {
@Override
public Long apply(Session session) {
return session.executePartitionedUpdate(stmt);
}
});
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
}
}

private <T> T runWithSessionRetry(Function<Session, T> callable) {
PooledSession session = getReadWriteSession();
while (true) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this respect the client-side timeout values that have been set?
If our backend is down and does not return any sessions, what's the outcome for the customer application?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The short answer: It will behave the same as now, and the behavior will depend on whether the pool has a session ready to be returned or not. The runWithSessionRetry will only retry on the specific condition of a SessionNotFoundException being returned.

There are two possible scenarios:

  1. The application requests a session and the session pool has a session that it thinks is valid. This session is handed to the application and the application tries to execute the transaction. This will cause the query or begin transaction statement to time out.
  2. The pool does not have a session available that can be returned and starts the creation of a new session asynchronously. The requesting thread is placed in a waiter. The create session request will fail and notify the waiter of the error. The waiting thread will return with the create session error.

Neither of the above will be considered a SessionNotFoundException by the runWithSessionRetry method, and the method will return with an error.

try {
return callable.apply(session);
} catch (SessionNotFoundException e) {
session = pool.replaceReadWriteSession(e, session);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Following up from my last comment, if the session is READ, is this correct, as replaceReadWriteSession eventually returns 'getReadWriteSession()'?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, it wasn't right, even though the method was never called with READ. But that was not correct either, as executePartitionedUpdate does not need a read/write session, as it always starts a new transaction (as it needs a special type of transaction).

}
}
}

ListenableFuture<Void> closeAsync() {
return pool.closeAsync();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,26 @@
/** Forwarding implementation of ResultSet that forwards all calls to a delegate. */
public class ForwardingResultSet extends ForwardingStructReader implements ResultSet {

private final ResultSet delegate;
private ResultSet delegate;

public ForwardingResultSet(ResultSet delegate) {
super(delegate);
this.delegate = Preconditions.checkNotNull(delegate);
}

/**
* Swaps the {@link ResultSet} that this instance forwards its calls to. The caller must make
* sure that the replacement has the same properties and is in the same state as the delegate it
* replaces. Intended for the case where a session or transaction had to be restarted after this
* {@link ResultSet} had already been handed to the user.
*
* @param newDelegate the replacement delegate; must not be {@code null}
*/
void replaceDelegate(ResultSet newDelegate) {
  // Validate once and reuse the checked reference for both the superclass and this class.
  ResultSet replacement = Preconditions.checkNotNull(newDelegate);
  super.replaceDelegate(replacement);
  this.delegate = replacement;
}

@Override
public boolean next() throws SpannerException {
return delegate.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,23 @@
/** Forwarding implementation of StructReader that forwards all calls to a delegate. */
public class ForwardingStructReader implements StructReader {

private final StructReader delegate;
private StructReader delegate;

public ForwardingStructReader(StructReader delegate) {
this.delegate = Preconditions.checkNotNull(delegate);
}

/**
* Swaps the {@link StructReader} that this instance forwards its calls to. The caller must make
* sure that the replacement has the same properties and is in the same state as the delegate it
* replaces. Intended for the case where a session or transaction had to be restarted after this
* {@link StructReader} had already been handed to the user.
*
* @param newDelegate the replacement delegate; must not be {@code null}
*/
void replaceDelegate(StructReader newDelegate) {
  // Reject null before touching the field so a failed call leaves the old delegate in place.
  Preconditions.checkNotNull(newDelegate);
  this.delegate = newDelegate;
}

@Override
public Type getType() {
return delegate.getType();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2019 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner;

import javax.annotation.Nullable;

/**
* Exception thrown by Cloud Spanner when an operation detects that the session that is being used
* is no longer valid. This type of error has its own subclass as it is a condition that should
* normally be hidden from the user, and the client library should try to fix this internally.
*/
public class SessionNotFoundException extends SpannerException {
private static final long serialVersionUID = -6395746612598975751L;

/**
* Package-private constructor. Use {@link SpannerExceptionFactory} to create instances.
*
* <p>Always reports {@link ErrorCode#NOT_FOUND} to the superclass.
*
* @param token marker argument required by the superclass constructor
* @param message optional detail message; may be {@code null}
* @param cause optional underlying cause; may be {@code null}
*/
SessionNotFoundException(
DoNotConstructDirectly token, @Nullable String message, @Nullable Throwable cause) {
// NOTE(review): the literal false is presumably the "retryable" flag of SpannerException;
// confirm against the superclass constructor.
super(token, ErrorCode.NOT_FOUND, false, message, cause);
}
}
Loading