Skip to content

Commit

Permalink
feat: read-only transaction with options
Browse files Browse the repository at this point in the history
Adds a function for starting a read-only transaction with Spanner
options. This function takes a custom Spanner ReadOnlyTransactionOptions
struct as input argument, which in the future can be extended to add
any new options that Spanner might add.

The transaction options that are given for the transaction are only
used for this one transaction, instead of being persisted on the
connection.
  • Loading branch information
olavloite committed Jan 20, 2025
1 parent 5f7b8db commit 10e3e99
Show file tree
Hide file tree
Showing 3 changed files with 192 additions and 2 deletions.
62 changes: 60 additions & 2 deletions driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,42 @@ func runTransactionWithOptions(ctx context.Context, db *sql.DB, opts *sql.TxOpti
}
}

// ReadOnlyTransactionOptions can be used to create a read-only transaction
// on a Spanner connection.
type ReadOnlyTransactionOptions struct {
TimestampBound spanner.TimestampBound
}

// BeginReadOnlyTransaction begins a read-only transaction on a Spanner connection.
// This function can only be called with a connection to a Spanner database.
func BeginReadOnlyTransaction(ctx context.Context, conn *sql.Conn, options ReadOnlyTransactionOptions) (*sql.Tx, error) {
if err := withTempReadOnlyTransactionOptions(conn, &options); err != nil {
return nil, err
}
tx, err := conn.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
if err != nil {
clearTempReadOnlyTransactionOptions(conn)
return nil, err
}
return tx, nil
}

func withTempReadOnlyTransactionOptions(conn *sql.Conn, options *ReadOnlyTransactionOptions) error {
return conn.Raw(func(driverConn any) error {
spannerConn, ok := driverConn.(SpannerConn)
if !ok {
// It is not a Spanner connection.
return spanner.ToSpannerError(status.Error(codes.FailedPrecondition, "This function can only be used with a Spanner connection"))
}
spannerConn.withTempReadOnlyTransactionOptions(options)
return nil
})
}

func clearTempReadOnlyTransactionOptions(conn *sql.Conn) {
_ = withTempReadOnlyTransactionOptions(conn, nil)
}

// SpannerConn is the public interface for the raw Spanner connection for the
// sql driver. This interface can be used with the db.Conn().Raw() method.
type SpannerConn interface {
Expand Down Expand Up @@ -740,9 +776,14 @@ type SpannerConn interface {
// returned.
resetTransactionForRetry(ctx context.Context, errDuringCommit bool) error

// setTransactionOptions sets the TransactionOptions that should be used
// withTransactionOptions sets the TransactionOptions that should be used
// for this transaction.
withTransactionOptions(options spanner.TransactionOptions)

// withTempReadOnlyTransactionOptions sets the options that should be used
// for the next read-only transaction. This method should only be called
// directly before starting a new read-only transaction.
withTempReadOnlyTransactionOptions(options *ReadOnlyTransactionOptions)
}

var _ SpannerConn = &conn{}
Expand Down Expand Up @@ -781,6 +822,10 @@ type conn struct {
// It can be set by passing it in as an argument to ExecContext or QueryContext
// and is cleared after each execution.
execOptions ExecOptions

// tempReadOnlyTransactionOptions are temporarily set right before a read-only
// transaction is started on a Spanner connection.
tempReadOnlyTransactionOptions *ReadOnlyTransactionOptions
}

type batchType int
Expand Down Expand Up @@ -1418,6 +1463,18 @@ func (c *conn) withTransactionOptions(options spanner.TransactionOptions) {
c.execOptions.TransactionOptions = options
}

func (c *conn) withTempReadOnlyTransactionOptions(options *ReadOnlyTransactionOptions) {
c.tempReadOnlyTransactionOptions = options
}

func (c *conn) getReadOnlyTransactionOptions() ReadOnlyTransactionOptions {
if c.tempReadOnlyTransactionOptions != nil {
defer func() { c.tempReadOnlyTransactionOptions = nil }()
return *c.tempReadOnlyTransactionOptions
}
return ReadOnlyTransactionOptions{TimestampBound: c.readOnlyStaleness}
}

type spannerIsolationLevel sql.IsolationLevel

const (
Expand All @@ -1441,6 +1498,7 @@ func (c *conn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, e
c.resetForRetry = false
return c.tx, nil
}
readOnlyTxOpts := c.getReadOnlyTransactionOptions()
if c.inTransaction() {
return nil, spanner.ToSpannerError(status.Errorf(codes.FailedPrecondition, "already in a transaction"))
}
Expand All @@ -1461,7 +1519,7 @@ func (c *conn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, e

if opts.ReadOnly {
logger := c.logger.With("tx", "ro")
ro := c.client.ReadOnlyTransaction().WithTimestampBound(c.readOnlyStaleness)
ro := c.client.ReadOnlyTransaction().WithTimestampBound(readOnlyTxOpts.TimestampBound)
c.tx = &readOnlyTransaction{
roTx: ro,
logger: logger,
Expand Down
63 changes: 63 additions & 0 deletions driver_with_mockserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,69 @@ func TestReadOnlyTransactionWithStaleness(t *testing.T) {
}
}

func TestReadOnlyTransactionWithOptions(t *testing.T) {
t.Parallel()

ctx := context.Background()
db, server, teardown := setupTestDBConnection(t)
defer teardown()

conn, err := db.Conn(ctx)
if err != nil {
t.Fatal(err)
}

tx, err := BeginReadOnlyTransaction(ctx, conn,
ReadOnlyTransactionOptions{TimestampBound: spanner.ExactStaleness(10 * time.Second)})
if err != nil {
t.Fatal(err)
}
useTx := func(tx *sql.Tx) {
rows, err := tx.Query(testutil.SelectFooFromBar)
if err != nil {
t.Fatal(err)
}

for rows.Next() {
}
if rows.Err() != nil {
t.Fatal(rows.Err())
}
_ = rows.Close()
if err := tx.Commit(); err != nil {
t.Fatal(err)
}
}
useTx(tx)

requests := drainRequestsFromServer(server.TestSpanner)
beginReadOnlyRequests := filterBeginReadOnlyRequests(requestsOfType(requests, reflect.TypeOf(&sppb.BeginTransactionRequest{})))
if g, w := len(beginReadOnlyRequests), 1; g != w {
t.Fatalf("begin requests count mismatch\nGot: %v\nWant: %v", g, w)
}
beginReq := beginReadOnlyRequests[0]
if beginReq.GetOptions().GetReadOnly().GetExactStaleness() == nil {
t.Fatalf("missing exact_staleness option on BeginTransaction request")
}

// Verify that the staleness option is not 'sticky' on the connection.
tx, err = conn.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
if err != nil {
t.Fatal(err)
}
useTx(tx)

requests = drainRequestsFromServer(server.TestSpanner)
beginReadOnlyRequests = filterBeginReadOnlyRequests(requestsOfType(requests, reflect.TypeOf(&sppb.BeginTransactionRequest{})))
if g, w := len(beginReadOnlyRequests), 1; g != w {
t.Fatalf("begin requests count mismatch\nGot: %v\nWant: %v", g, w)
}
beginReq = beginReadOnlyRequests[0]
if beginReq.GetOptions().GetReadOnly().GetExactStaleness() != nil {
t.Fatalf("got unexpected exact_staleness option on BeginTransaction request")
}
}

func TestSimpleReadWriteTransaction(t *testing.T) {
t.Parallel()

Expand Down
69 changes: 69 additions & 0 deletions examples/read-only-transaction-with-options/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"context"
"database/sql"
"fmt"
"time"

"cloud.google.com/go/spanner"
spannerdriver "github.com/googleapis/go-sql-spanner"
"github.com/googleapis/go-sql-spanner/examples"
)

// Sample showing how to execute a read-only transaction with specific options on a Spanner database.
//
// Execute the sample with the command `go run main.go` from this directory.
func readOnlyTransactionWithOptions(projectId, instanceId, databaseId string) error {
ctx := context.Background()
db, err := sql.Open("spanner", fmt.Sprintf("projects/%s/instances/%s/databases/%s", projectId, instanceId, databaseId))
if err != nil {
return err
}
defer db.Close()

// Get a connection that should be used for the transaction.
conn, err := db.Conn(ctx)
if err != nil {
return err
}
defer conn.Close()
// Start a read-only transaction on the Spanner database using a staleness option.
tx, err := spannerdriver.BeginReadOnlyTransaction(
ctx, conn, spannerdriver.ReadOnlyTransactionOptions{
TimestampBound: spanner.ExactStaleness(time.Second * 10),
})
if err != nil {
return err
}
fmt.Println("Started a read-only transaction with a staleness option")

// Use the read-only transaction...

// Committing or rolling back a read-only transaction will not execute an actual Commit or Rollback
// on the database, but it is needed in order to release the resources that are held by the read-only
// transaction.
if err := tx.Commit(); err != nil {
return err
}

return nil
}

func main() {
examples.RunSampleOnEmulator(readOnlyTransactionWithOptions)
}

0 comments on commit 10e3e99

Please sign in to comment.