feat(spanner): BatchWrite #8652

Merged
merged 10 commits on Oct 27, 2023
190 changes: 190 additions & 0 deletions spanner/client.go
@@ -19,13 +19,16 @@ package spanner
import (
"context"
"fmt"
"io"
"log"
"os"
"regexp"
"time"

"cloud.google.com/go/internal/trace"
sppb "cloud.google.com/go/spanner/apiv1/spannerpb"
"github.com/googleapis/gax-go/v2"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
"google.golang.org/api/option/internaloption"
gtransport "google.golang.org/api/transport/grpc"
@@ -669,6 +672,193 @@ func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption)
return t.applyAtLeastOnce(ctx, ms...)
}

// BatchWriteResponseIterator is an iterator over the BatchWriteResponse
// structures returned from the BatchWrite RPC.
type BatchWriteResponseIterator struct {
	ctx            context.Context
	stream         sppb.Spanner_BatchWriteClient
	err            error // sticky error; set to iterator.Done once the stream is drained
	dataReceived   bool  // true once at least one response has been received
	replaceSession func(ctx context.Context) error // replaces the session on "session not found" errors
	rpc            func(ctx context.Context) (sppb.Spanner_BatchWriteClient, error)
	release        func(error) // returns the session to the pool
	cancel         func()      // cancels the stream's context
}

// Next returns the next result. Its second return value is iterator.Done if
// there are no more results. Once Next returns Done, all subsequent calls
// will return Done.
func (r *BatchWriteResponseIterator) Next() (*sppb.BatchWriteResponse, error) {
for {
// Stream finished or in error state.
if r.err != nil {
return nil, r.err
}

// RPC not made yet.
if r.stream == nil {
r.stream, r.err = r.rpc(r.ctx)
continue
}

// Read from the stream.
var response *sppb.BatchWriteResponse
response, r.err = r.stream.Recv()

// Return an item.
if r.err == nil {
r.dataReceived = true
return response, nil
}

// Stream finished.
if r.err == io.EOF {
r.err = iterator.Done
return nil, r.err
}

// Retry the request on a "session not found" error, but only if no data has been received yet.
if !r.dataReceived && r.replaceSession != nil && isSessionNotFoundError(r.err) {
r.err = r.replaceSession(r.ctx)
r.stream = nil
}
}
}

// Stop terminates the iteration. It should be called after you finish using the
// iterator.
func (r *BatchWriteResponseIterator) Stop() {
if r.stream != nil {
err := r.err
if err == iterator.Done {
err = nil
}
defer trace.EndSpan(r.ctx, err)
}
if r.cancel != nil {
r.cancel()
r.cancel = nil
}
if r.release != nil {
r.release(r.err)
r.release = nil
}
if r.err == nil {
r.err = spannerErrorf(codes.FailedPrecondition, "Next called after Stop")
}
}
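
// The following is an illustrative usage sketch, not part of this change: it
// shows one way a caller could drain the iterator with Next and Stop directly.
// The function name and the "client"/"mgs" inputs are placeholders assumed for
// the example; iterator.Done comes from google.golang.org/api/iterator, which
// is imported above.
func exampleBatchWriteNext(ctx context.Context, client *Client, mgs []*MutationGroup) error {
	iter := client.BatchWrite(ctx, mgs)
	// Stop releases the session and cancels the stream; always call it when done.
	defer iter.Stop()
	for {
		resp, err := iter.Next()
		if err == iterator.Done {
			return nil // Stream drained; all batches have been reported.
		}
		if err != nil {
			return err // RPC or stream error.
		}
		// resp describes one applied batch: which mutation groups it covered
		// and whether they were committed.
		_ = resp
	}
}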

// Do calls the provided function once in sequence for each item in the
// iteration. If the function returns a non-nil error, Do immediately returns
// that error.
//
// If there are no items in the iterator, Do will return nil without calling the
// provided function.
//
// Do always calls Stop on the iterator.
func (r *BatchWriteResponseIterator) Do(f func(r *sppb.BatchWriteResponse) error) error {
defer r.Stop()
for {
resp, err := r.Next()
switch err {
case iterator.Done:
return nil
case nil:
if err = f(resp); err != nil {
return err
}
default:
return err
}
}
}
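
// Illustrative usage sketch, not part of this change: the same drain written
// with Do, which handles the iterator.Done sentinel and calls Stop on behalf
// of the caller. The function name and the "client"/"mgs" inputs are assumed
// placeholders.
func exampleBatchWriteDo(ctx context.Context, client *Client, mgs []*MutationGroup) error {
	return client.BatchWrite(ctx, mgs).Do(func(resp *sppb.BatchWriteResponse) error {
		// Each response covers one applied batch. Returning a non-nil error
		// here makes Do stop the iteration and return that error.
		if s := resp.GetStatus(); s != nil && s.GetCode() != int32(codes.OK) {
			return spannerErrorf(codes.Code(s.GetCode()), "mutation groups %v failed: %s", resp.GetIndexes(), s.GetMessage())
		}
		return nil
	})
}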

// BatchWrite applies a list of mutation groups in a collection of efficient
// transactions. The mutation groups are applied non-atomically in an
// unspecified order and thus, they must be independent of each other. Partial
// failure is possible, i.e., some mutation groups may have been applied
// successfully, while some may have failed. The results of individual batches
// are streamed into the response as the batches are applied.
//
// BatchWrite requests are not replay protected, meaning that each mutation
// group may be applied more than once. Replays of non-idempotent mutations may
// have undesirable effects. For example, replaying an insert mutation may
// produce an already-exists error, or, if you use generated or commit
// timestamp-based keys, may result in additional rows being added to the
// mutation's table. We recommend structuring your mutation groups to be
// idempotent to avoid this issue.
func (c *Client) BatchWrite(ctx context.Context, mgs []*MutationGroup, opts ...ApplyOption) *BatchWriteResponseIterator {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.BatchWrite")

var err error
defer func() {
trace.EndSpan(ctx, err)
}()

ao := &applyOption{}
for _, opt := range c.ao {
opt(ao)
}
for _, opt := range opts {
opt(ao)
}

mgsPb, err := mutationGroupsProto(mgs)
if err != nil {
return &BatchWriteResponseIterator{err: err}
}

var sh *sessionHandle
sh, err = c.idleSessions.take(ctx)
if err != nil {
return &BatchWriteResponseIterator{err: err}
}

rpc := func(ct context.Context) (sppb.Spanner_BatchWriteClient, error) {
var md metadata.MD
stream, rpcErr := sh.getClient().BatchWrite(contextWithOutgoingMetadata(ct, sh.getMetadata(), c.disableRouteToLeader), &sppb.BatchWriteRequest{
Session: sh.getID(),
MutationGroups: mgsPb,
RequestOptions: createRequestOptions(ao.priority, "", ao.transactionTag),
}, gax.WithGRPCOptions(grpc.Header(&md)))

if getGFELatencyMetricsFlag() && md != nil && c.ct != nil {
if metricErr := createContextAndCaptureGFELatencyMetrics(ct, c.ct, md, "BatchWrite"); metricErr != nil {
trace.TracePrintf(ct, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", metricErr)
}
}
return stream, rpcErr
}

replaceSession := func(ct context.Context) error {
if sh != nil {
sh.destroy()
}
var sessionErr error
sh, sessionErr = c.idleSessions.take(ct)
return sessionErr
}

release := func(err error) {
if sh == nil {
return
}
if isSessionNotFoundError(err) {
sh.destroy()
}
sh.recycle()
}

ctx, cancel := context.WithCancel(ctx)
ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.BatchWriteResponseIterator")
return &BatchWriteResponseIterator{
ctx: ctx,
rpc: rpc,
replaceSession: replaceSession,
release: release,
cancel: cancel,
}
}
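
// Illustrative usage sketch, not part of this change: building independent
// mutation groups and applying them with BatchWrite. It assumes the
// MutationGroup type added elsewhere in this PR wraps a []*Mutation in a
// Mutations field, and the "Singers" table and its columns are placeholders.
func exampleBatchWrite(ctx context.Context, client *Client) error {
	mgs := []*MutationGroup{
		{Mutations: []*Mutation{
			Insert("Singers", []string{"SingerId", "Name"}, []interface{}{int64(1), "Marc"}),
		}},
		{Mutations: []*Mutation{
			Insert("Singers", []string{"SingerId", "Name"}, []interface{}{int64(2), "Catalina"}),
		}},
	}
	// Groups are applied non-atomically and may be replayed, so each group
	// should be independent of the others and ideally idempotent (for example,
	// InsertOrUpdate instead of Insert).
	return client.BatchWrite(ctx, mgs).Do(func(resp *sppb.BatchWriteResponse) error {
		// resp.GetIndexes() identifies which of the groups above this batch
		// applied; resp.GetCommitTimestamp() is their commit time.
		return nil
	})
}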

// logf logs the given message to the given logger, or the standard logger if
// the given logger is nil.
func logf(logger *log.Logger, format string, v ...interface{}) {