Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(spanner): reset buffer after abort on first SQL statement #8440

Merged
merged 2 commits into from
Aug 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions spanner/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,7 @@ func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *Rea
t.txReadOnly.qo = c.qo
t.txReadOnly.ro = c.ro
t.txReadOnly.disableRouteToLeader = c.disableRouteToLeader
t.wb = []*Mutation{}
t.txOpts = c.txo.merge(options)
t.ct = c.ct

Expand Down
302 changes: 302 additions & 0 deletions spanner/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -811,6 +811,308 @@ func TestClient_ReadWriteTransactionCommitAborted(t *testing.T) {
}
}

func TestClient_ReadWriteTransaction_BufferedWriteBeforeAbortedFirstSqlStatement(t *testing.T) {
ctx := context.Background()
server, client, teardown := setupMockedTestServer(t)
defer teardown()
server.TestSpanner.PutExecutionTime(MethodExecuteSql, SimulatedExecutionTime{Errors: []error{status.Error(codes.Aborted, "Transaction aborted")}})

var attempts int
expectedAttempts := 2
_, err := client.ReadWriteTransaction(
ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
attempts++
// Buffer mutations before executing a SQL statement.
if err := tx.BufferWrite([]*Mutation{
Insert("foo", []string{"col1"}, []interface{}{"key1"}),
}); err != nil {
return err
}
// Then execute a SQL statement that will return Aborted from the backend.
// This will force a retry of the transaction with an explicit BeginTransaction RPC.
c, err := tx.Update(ctx, NewStatement(UpdateBarSetFoo))
if err != nil {
return err
}
if g, w := c, int64(UpdateBarSetFooRowCount); g != w {
return fmt.Errorf("update count mismatch\nGot: %v\nWant: %v", g, w)
}
return nil
})
if err != nil {
t.Fatal(err)
}
if expectedAttempts != attempts {
t.Fatalf("unexpected number of attempts: %d, expected %d", attempts, expectedAttempts)
}
requests := drainRequestsFromServer(server.TestSpanner)
if err := compareRequests([]interface{}{
&sppb.BatchCreateSessionsRequest{},
&sppb.ExecuteSqlRequest{},
&sppb.BeginTransactionRequest{},
&sppb.ExecuteSqlRequest{},
&sppb.CommitRequest{},
}, requests); err != nil {
t.Fatal(err)
}
commit := requests[len(requests)-1].(*sppb.CommitRequest)
g, w := len(commit.Mutations), 1
if g != w {
t.Fatalf("mutations count mismatch\nGot: %v\nWant: %v", g, w)
}
}

func TestClient_ReadWriteTransaction_BufferedWriteBeforeAbortedFirstSqlStatementTwice(t *testing.T) {
ctx := context.Background()
server, client, teardown := setupMockedTestServer(t)
defer teardown()
server.TestSpanner.PutExecutionTime(MethodExecuteSql, SimulatedExecutionTime{Errors: []error{
status.Error(codes.Aborted, "Transaction aborted"),
status.Error(codes.Aborted, "Transaction aborted"),
}})

var attempts int
expectedAttempts := 3
_, err := client.ReadWriteTransaction(
ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
attempts++
// Buffer mutations before executing a SQL statement.
if err := tx.BufferWrite([]*Mutation{
Insert("foo", []string{"col1"}, []interface{}{"key1"}),
}); err != nil {
return err
}
// Then execute a SQL statement that will return Aborted from the backend.
// This will force a retry of the transaction with an explicit BeginTransaction RPC.
c, err := tx.Update(ctx, NewStatement(UpdateBarSetFoo))
if err != nil {
return err
}
if g, w := c, int64(UpdateBarSetFooRowCount); g != w {
return fmt.Errorf("update count mismatch\nGot: %v\nWant: %v", g, w)
}
return nil
})
if err != nil {
t.Fatal(err)
}
if expectedAttempts != attempts {
t.Fatalf("unexpected number of attempts: %d, expected %d", attempts, expectedAttempts)
}
requests := drainRequestsFromServer(server.TestSpanner)
if err := compareRequests([]interface{}{
&sppb.BatchCreateSessionsRequest{},
&sppb.ExecuteSqlRequest{},
&sppb.BeginTransactionRequest{},
&sppb.ExecuteSqlRequest{},
&sppb.ExecuteSqlRequest{},
&sppb.CommitRequest{},
}, requests); err != nil {
t.Fatal(err)
}
commit := requests[len(requests)-1].(*sppb.CommitRequest)
g, w := len(commit.Mutations), 1
if g != w {
t.Fatalf("mutations count mismatch\nGot: %v\nWant: %v", g, w)
}
}

func TestClient_ReadWriteTransaction_BufferedWriteBeforeSqlStatementWithError(t *testing.T) {
ctx := context.Background()
server, client, teardown := setupMockedTestServer(t)
defer teardown()
server.TestSpanner.PutExecutionTime(MethodExecuteSql, SimulatedExecutionTime{Errors: []error{
status.Error(codes.InvalidArgument, "Invalid"),
status.Error(codes.InvalidArgument, "Invalid"),
}})

var attempts int
expectedAttempts := 2
_, err := client.ReadWriteTransaction(
ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
attempts++
// Buffer mutations before executing a SQL statement.
if err := tx.BufferWrite([]*Mutation{
Insert("foo", []string{"col1"}, []interface{}{"key1"}),
}); err != nil {
return err
}
// Then execute a SQL statement that will return InvalidArgument from the backend.
// This will initially force a retry of the transaction with an explicit BeginTransaction RPC.
// We ignore the error and proceed to commit the transaction.
_, err := tx.Update(ctx, NewStatement(UpdateBarSetFoo))
if err == nil {
return fmt.Errorf("missing expected InvalidArgument error")
}
return nil
})
if err != nil {
t.Fatal(err)
}
if expectedAttempts != attempts {
t.Fatalf("unexpected number of attempts: %d, expected %d", attempts, expectedAttempts)
}
requests := drainRequestsFromServer(server.TestSpanner)
if err := compareRequests([]interface{}{
&sppb.BatchCreateSessionsRequest{},
&sppb.ExecuteSqlRequest{},
&sppb.BeginTransactionRequest{},
&sppb.ExecuteSqlRequest{},
&sppb.CommitRequest{},
}, requests); err != nil {
t.Fatal(err)
}
commit := requests[len(requests)-1].(*sppb.CommitRequest)
g, w := len(commit.Mutations), 1
if g != w {
t.Fatalf("mutations count mismatch\nGot: %v\nWant: %v", g, w)
}
}

func TestClient_ReadWriteTransaction_BufferedWriteBeforeSqlStatementWithErrorThatGoesAway(t *testing.T) {
ctx := context.Background()
server, client, teardown := setupMockedTestServer(t)
defer teardown()
server.TestSpanner.PutExecutionTime(MethodExecuteSql, SimulatedExecutionTime{Errors: []error{
status.Error(codes.AlreadyExists, "Row already exists"),
}})

var attempts int
expectedAttempts := 2
_, err := client.ReadWriteTransaction(
ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
attempts++
// Buffer mutations before executing a SQL statement.
if err := tx.BufferWrite([]*Mutation{
Insert("foo", []string{"col1"}, []interface{}{"key1"}),
}); err != nil {
return err
}
// Then execute a SQL statement that will return InvalidArgument from the backend.
// This will initially force a retry of the transaction with an explicit BeginTransaction RPC.
// The error does not occur during the retry and the transaction is allowed to continue.
_, err := tx.Update(ctx, NewStatement(UpdateBarSetFoo))
if err != nil {
return err
}
return nil
})
if err != nil {
t.Fatal(err)
}
if expectedAttempts != attempts {
t.Fatalf("unexpected number of attempts: %d, expected %d", attempts, expectedAttempts)
}
requests := drainRequestsFromServer(server.TestSpanner)
if err := compareRequests([]interface{}{
&sppb.BatchCreateSessionsRequest{},
&sppb.ExecuteSqlRequest{},
&sppb.BeginTransactionRequest{},
&sppb.ExecuteSqlRequest{},
&sppb.CommitRequest{},
}, requests); err != nil {
t.Fatal(err)
}
commit := requests[len(requests)-1].(*sppb.CommitRequest)
g, w := len(commit.Mutations), 1
if g != w {
t.Fatalf("mutations count mismatch\nGot: %v\nWant: %v", g, w)
}
}

func TestClient_ReadWriteTransaction_OnlyBufferWritesDuringInitialAttempt(t *testing.T) {
ctx := context.Background()
server, client, teardown := setupMockedTestServer(t)
defer teardown()
server.TestSpanner.PutExecutionTime(MethodExecuteSql, SimulatedExecutionTime{Errors: []error{
status.Error(codes.AlreadyExists, "Row already exists"),
}})

expectedAttempts := 2
var attempts int
_, err := client.ReadWriteTransaction(
ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
attempts++
if attempts == 1 {
// Only do a blind write if it is not a retry of the transaction.
if err := tx.BufferWrite([]*Mutation{
Delete("foo", AllKeys()),
}); err != nil {
return err
}
}
// Then execute a SQL statement that will return InvalidArgument from the backend.
// This will initially force a retry of the transaction with an explicit BeginTransaction RPC.
// The error does not occur during the retry and the transaction is allowed to continue.
_, err := tx.Update(ctx, NewStatement(UpdateBarSetFoo))
if err != nil {
return err
}
return nil
})
if err != nil {
t.Fatal(err)
}
if expectedAttempts != attempts {
t.Fatalf("unexpected number of attempts: %d, expected %d", attempts, expectedAttempts)
}
requests := drainRequestsFromServer(server.TestSpanner)
if err := compareRequests([]interface{}{
&sppb.BatchCreateSessionsRequest{},
&sppb.ExecuteSqlRequest{},
&sppb.BeginTransactionRequest{},
&sppb.ExecuteSqlRequest{},
&sppb.CommitRequest{},
}, requests); err != nil {
t.Fatal(err)
}
commit := requests[len(requests)-1].(*sppb.CommitRequest)
g, w := len(commit.Mutations), 0
if g != w {
t.Fatalf("mutations count mismatch\nGot: %v\nWant: %v", g, w)
}
}

func TestClient_ReadWriteTransaction_BlindWriteWithAbortedCommit(t *testing.T) {
ctx := context.Background()
server, client, teardown := setupMockedTestServer(t)
defer teardown()
server.TestSpanner.PutExecutionTime(MethodCommitTransaction, SimulatedExecutionTime{Errors: []error{status.Error(codes.Aborted, "Transaction aborted")}})

var attempts int
expectedAttempts := 2
_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
attempts++
// Do a blind write and then commit. The CommitRequest will be aborted and cause the transaction to retry.
if err := tx.BufferWrite([]*Mutation{Insert("foo", []string{"col1"}, []interface{}{"key1"})}); err != nil {
return err
}
return nil
})
if err != nil {
t.Fatal(err)
}
if expectedAttempts != attempts {
t.Fatalf("unexpected number of attempts: %d, expected %d", attempts, expectedAttempts)
}
requests := drainRequestsFromServer(server.TestSpanner)
if err := compareRequests([]interface{}{
&sppb.BatchCreateSessionsRequest{},
&sppb.BeginTransactionRequest{},
&sppb.CommitRequest{},
&sppb.BeginTransactionRequest{},
&sppb.CommitRequest{},
}, requests); err != nil {
t.Fatal(err)
}
commit := requests[len(requests)-1].(*sppb.CommitRequest)
// TODO: Update to 1 when the bug is fixed
g, w := len(commit.Mutations), 1
if g != w {
t.Fatalf("mutations count mismatch\nGot: %v\nWant: %v", g, w)
}
}

func TestClient_ReadWriteTransaction_SessionNotFoundOnCommit(t *testing.T) {
t.Parallel()
if err := testReadWriteTransaction(t, map[string]SimulatedExecutionTime{
Expand Down