Skip to content

Commit

Permalink
feat(spanner): add ResetForRetry method for stmt-based transactions
Browse files Browse the repository at this point in the history
Read/write transactions that are aborted should preferably be retried using the
same session as the original attempt. For this, statement-based transactions
should have a ResetForRetry function. This was missing in the Go client library.

This change adds this method, and re-uses the session when possible. If the
aborted error happens during the Commit RPC, the session handle was already
cleaned up by the original implementation. We will not change that now, as
that could lead to breakage in existing code that depends on this. When
the Go client is switched to multiplexed sessions for read/write transactions,
then this implementation should be re-visited, and it should be made sure that
ResetForRetry optimizes the retry attempt for an actual retry.

Updates googleapis/go-sql-spanner#300
  • Loading branch information
olavloite committed Oct 7, 2024
1 parent 3377a3c commit 109f3e3
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 7 deletions.
40 changes: 35 additions & 5 deletions spanner/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package spanner

import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -1749,6 +1750,7 @@ type ReadWriteStmtBasedTransaction struct {
// ReadWriteTransaction contains methods for performing transactional reads.
ReadWriteTransaction

client *Client
options TransactionOptions
}

Expand All @@ -1774,23 +1776,35 @@ func NewReadWriteStmtBasedTransaction(ctx context.Context, c *Client) (*ReadWrit
// used by the transaction will not be returned to the pool and cause a session
// leak.
//
// ResetForRetry resets the transaction before a retry attempt. This function
// returns a new transaction that should be used for the retry attempt. The
// transaction that is returned by this function is assigned a higher priority
// than the previous transaction, making it less probable to be aborted by
// Spanner again during the retry.
//
// NewReadWriteStmtBasedTransactionWithOptions is a configurable version of
// NewReadWriteStmtBasedTransaction.
func NewReadWriteStmtBasedTransactionWithOptions(ctx context.Context, c *Client, options TransactionOptions) (*ReadWriteStmtBasedTransaction, error) {
return newReadWriteStmtBasedTransactionWithSessionHandle(ctx, c, options, nil)
}

func newReadWriteStmtBasedTransactionWithSessionHandle(ctx context.Context, c *Client, options TransactionOptions, sh *sessionHandle) (*ReadWriteStmtBasedTransaction, error) {
var (
sh *sessionHandle
err error
t *ReadWriteStmtBasedTransaction
)
sh, err = c.idleSessions.take(ctx)
if err != nil {
// If session retrieval fails, just fail the transaction.
return nil, err
if sh == nil {
sh, err = c.idleSessions.take(ctx)
if err != nil {
// If session retrieval fails, just fail the transaction.
return nil, err
}
}
t = &ReadWriteStmtBasedTransaction{
ReadWriteTransaction: ReadWriteTransaction{
txReadyOrClosed: make(chan struct{}),
},
client: c,
}
t.txReadOnly.sp = c.idleSessions
t.txReadOnly.sh = sh
Expand Down Expand Up @@ -1829,6 +1843,7 @@ func (t *ReadWriteStmtBasedTransaction) CommitWithReturnResp(ctx context.Context
}
if t.sh != nil {
t.sh.recycle()
t.sh = nil
}
return resp, err
}
Expand All @@ -1839,7 +1854,22 @@ func (t *ReadWriteStmtBasedTransaction) Rollback(ctx context.Context) {
t.rollback(ctx)
if t.sh != nil {
t.sh.recycle()
t.sh = nil
}
}

// ResetForRetry resets the transaction before a retry. This should be
// called if the transaction was aborted by Spanner and the application
// wants to retry the transaction.
// It is recommended to use this method above creating a new transaction,
// as this method will give the transaction a higher priority and thus a
// smaller probability of being aborted again by Spanner.
func (t *ReadWriteStmtBasedTransaction) ResetForRetry(ctx context.Context) (*ReadWriteStmtBasedTransaction, error) {
if t.state == txNew || t.state == txInit {
return nil, fmt.Errorf("ResetForRetry should only be called on an active transaction that was aborted by Spanner")
}
// Create a new transaction that re-uses the current session if it is available.
return newReadWriteStmtBasedTransactionWithSessionHandle(ctx, t.client, t.options, t.sh)
}

// writeOnlyTransaction provides the most efficient way of doing write-only
Expand Down
32 changes: 30 additions & 2 deletions spanner/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,8 +470,31 @@ func TestReadWriteStmtBasedTransaction_CommitAborted(t *testing.T) {
}
}

func TestReadWriteStmtBasedTransaction_QueryAborted(t *testing.T) {
t.Parallel()
rowCount, attempts, err := testReadWriteStmtBasedTransaction(t, map[string]SimulatedExecutionTime{
MethodExecuteStreamingSql: {Errors: []error{status.Error(codes.Aborted, "Transaction aborted")}},
})
if err != nil {
t.Fatalf("transaction failed to commit: %v", err)
}
if rowCount != SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount {
t.Fatalf("Row count mismatch, got %v, expected %v", rowCount, SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount)
}
if g, w := attempts, 2; g != w {
t.Fatalf("number of attempts mismatch:\nGot%d\nWant:%d", g, w)
}
}

func testReadWriteStmtBasedTransaction(t *testing.T, executionTimes map[string]SimulatedExecutionTime) (rowCount int64, attempts int, err error) {
server, client, teardown := setupMockedTestServer(t)
// server, client, teardown := setupMockedTestServer(t)
server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{
SessionPoolConfig: SessionPoolConfig{
// Use a session pool with size 1 to ensure that there are no session leaks.
MinOpened: 1,
MaxOpened: 1,
},
})
defer teardown()
for method, exec := range executionTimes {
server.TestSpanner.PutExecutionTime(method, exec)
Expand Down Expand Up @@ -500,9 +523,14 @@ func testReadWriteStmtBasedTransaction(t *testing.T, executionTimes map[string]S
return rowCount, nil
}

var tx *ReadWriteStmtBasedTransaction
for {
attempts++
tx, err := NewReadWriteStmtBasedTransaction(ctx, client)
if attempts > 1 {
tx, err = tx.ResetForRetry(ctx)
} else {
tx, err = NewReadWriteStmtBasedTransaction(ctx, client)
}
if err != nil {
return 0, attempts, fmt.Errorf("failed to begin a transaction: %v", err)
}
Expand Down

0 comments on commit 109f3e3

Please sign in to comment.