Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spanner): add ResetForRetry method for stmt-based transactions #10956

Merged
merged 6 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 35 additions & 5 deletions spanner/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package spanner

import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -1749,6 +1750,7 @@ type ReadWriteStmtBasedTransaction struct {
// ReadWriteTransaction contains methods for performing transactional reads.
ReadWriteTransaction

client *Client
options TransactionOptions
}

Expand All @@ -1774,23 +1776,35 @@ func NewReadWriteStmtBasedTransaction(ctx context.Context, c *Client) (*ReadWrit
// used by the transaction will not be returned to the pool and cause a session
// leak.
//
// ResetForRetry resets the transaction before a retry attempt. This function
// returns a new transaction that should be used for the retry attempt. The
// transaction that is returned by this function is assigned a higher priority
// than the previous transaction, making it less probable to be aborted by
// Spanner again during the retry.
//
// NewReadWriteStmtBasedTransactionWithOptions is a configurable version of
// NewReadWriteStmtBasedTransaction.
func NewReadWriteStmtBasedTransactionWithOptions(ctx context.Context, c *Client, options TransactionOptions) (*ReadWriteStmtBasedTransaction, error) {
return newReadWriteStmtBasedTransactionWithSessionHandle(ctx, c, options, nil)
}

func newReadWriteStmtBasedTransactionWithSessionHandle(ctx context.Context, c *Client, options TransactionOptions, sh *sessionHandle) (*ReadWriteStmtBasedTransaction, error) {
var (
sh *sessionHandle
err error
t *ReadWriteStmtBasedTransaction
)
sh, err = c.idleSessions.take(ctx)
if err != nil {
// If session retrieval fails, just fail the transaction.
return nil, err
if sh == nil {
sh, err = c.idleSessions.take(ctx)
if err != nil {
// If session retrieval fails, just fail the transaction.
return nil, err
}
}
t = &ReadWriteStmtBasedTransaction{
ReadWriteTransaction: ReadWriteTransaction{
txReadyOrClosed: make(chan struct{}),
},
client: c,
}
t.txReadOnly.sp = c.idleSessions
t.txReadOnly.sh = sh
Expand Down Expand Up @@ -1829,6 +1843,7 @@ func (t *ReadWriteStmtBasedTransaction) CommitWithReturnResp(ctx context.Context
}
if t.sh != nil {
t.sh.recycle()
t.sh = nil
}
return resp, err
}
Expand All @@ -1839,7 +1854,22 @@ func (t *ReadWriteStmtBasedTransaction) Rollback(ctx context.Context) {
t.rollback(ctx)
if t.sh != nil {
t.sh.recycle()
t.sh = nil
}
}

// ResetForRetry resets the transaction before a retry. This should be
// called if the transaction was aborted by Spanner and the application
// wants to retry the transaction.
// It is recommended to use this method above creating a new transaction,
// as this method will give the transaction a higher priority and thus a
// smaller probability of being aborted again by Spanner.
func (t *ReadWriteStmtBasedTransaction) ResetForRetry(ctx context.Context) (*ReadWriteStmtBasedTransaction, error) {
if t.state == txNew || t.state == txInit {
Copy link
Contributor

@harshachinta harshachinta Oct 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add a check whether the previous attempt failed with ABORTED, since

  1. for all the other errors (ex: session not found error) using the same session does not help during retry.
  2. This will prevent customers to use this only in case of ABORTED and not use it as retry mechanism for other errors.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated the PR to incorporate this. This makes the change slightly bigger, as we were not tracking Aborted as an actual transaction state, so this PR now also adds that.

return nil, fmt.Errorf("ResetForRetry should only be called on an active transaction that was aborted by Spanner")
}
// Create a new transaction that re-uses the current session if it is available.
return newReadWriteStmtBasedTransactionWithSessionHandle(ctx, t.client, t.options, t.sh)
}

// writeOnlyTransaction provides the most efficient way of doing write-only
Expand Down
32 changes: 30 additions & 2 deletions spanner/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,8 +470,31 @@ func TestReadWriteStmtBasedTransaction_CommitAborted(t *testing.T) {
}
}

func TestReadWriteStmtBasedTransaction_QueryAborted(t *testing.T) {
t.Parallel()
rowCount, attempts, err := testReadWriteStmtBasedTransaction(t, map[string]SimulatedExecutionTime{
MethodExecuteStreamingSql: {Errors: []error{status.Error(codes.Aborted, "Transaction aborted")}},
})
if err != nil {
t.Fatalf("transaction failed to commit: %v", err)
}
if rowCount != SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount {
t.Fatalf("Row count mismatch, got %v, expected %v", rowCount, SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount)
}
if g, w := attempts, 2; g != w {
t.Fatalf("number of attempts mismatch:\nGot%d\nWant:%d", g, w)
}
}

func testReadWriteStmtBasedTransaction(t *testing.T, executionTimes map[string]SimulatedExecutionTime) (rowCount int64, attempts int, err error) {
server, client, teardown := setupMockedTestServer(t)
// server, client, teardown := setupMockedTestServer(t)
server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{
SessionPoolConfig: SessionPoolConfig{
// Use a session pool with size 1 to ensure that there are no session leaks.
MinOpened: 1,
MaxOpened: 1,
},
})
defer teardown()
for method, exec := range executionTimes {
server.TestSpanner.PutExecutionTime(method, exec)
Expand Down Expand Up @@ -500,9 +523,14 @@ func testReadWriteStmtBasedTransaction(t *testing.T, executionTimes map[string]S
return rowCount, nil
}

var tx *ReadWriteStmtBasedTransaction
for {
attempts++
tx, err := NewReadWriteStmtBasedTransaction(ctx, client)
if attempts > 1 {
tx, err = tx.ResetForRetry(ctx)
} else {
tx, err = NewReadWriteStmtBasedTransaction(ctx, client)
}
if err != nil {
return 0, attempts, fmt.Errorf("failed to begin a transaction: %v", err)
}
Expand Down
Loading