From 255c6bfcdd3e844dcf602a829bfa2ce495bcd72e Mon Sep 17 00:00:00 2001 From: rahul2393 Date: Tue, 29 Oct 2024 08:16:21 +0530 Subject: [PATCH] fix(spanner): attempt latency for streaming call should capture the total latency till decoding of protos (#11039) * fix(spanner): attempt latency for streaming call should capture the total latency till decoding of protos. * fix tests --- spanner/client.go | 25 +++------- spanner/metrics_test.go | 35 ++++++++++---- spanner/read.go | 102 +++++++++++++++++++++++++++------------- spanner/read_test.go | 37 ++++++++++----- spanner/transaction.go | 17 +++++-- 5 files changed, 140 insertions(+), 76 deletions(-) diff --git a/spanner/client.go b/spanner/client.go index 95bb7f7a606d..889fb43aa6da 100644 --- a/spanner/client.go +++ b/spanner/client.go @@ -25,7 +25,6 @@ import ( "regexp" "strconv" "strings" - "sync" "time" "cloud.google.com/go/internal/trace" @@ -651,29 +650,19 @@ func metricsInterceptor() grpc.UnaryClientInterceptor { // wrappedStream wraps around the embedded grpc.ClientStream, and intercepts the RecvMsg and // SendMsg method call. type wrappedStream struct { - sync.Mutex - isFirstRecv bool - method string - target string + method string + target string grpc.ClientStream } func (w *wrappedStream) RecvMsg(m any) error { - attempt := &attemptTracer{} - attempt.setStartTime(time.Now()) err := w.ClientStream.RecvMsg(m) - statusCode, _ := status.FromError(err) ctx := w.ClientStream.Context() mt, ok := ctx.Value(metricsTracerKey).(*builtinMetricsTracer) - if !ok || !w.isFirstRecv { + if !ok { return err } - w.Lock() - w.isFirstRecv = false - w.Unlock() mt.method = w.method - mt.currOp.incrementAttemptCount() - mt.currOp.currAttempt = attempt if strings.HasPrefix(w.target, "google-c2p") { mt.currOp.setDirectPathEnabled(true) } @@ -687,9 +676,9 @@ func (w *wrappedStream) RecvMsg(m any) error { } } } - mt.currOp.currAttempt.setStatus(statusCode.Code().String()) - mt.currOp.currAttempt.setDirectPathUsed(isDirectPathUsed) - recordAttemptCompletion(mt) + if mt.currOp.currAttempt != nil { + mt.currOp.currAttempt.setDirectPathUsed(isDirectPathUsed) + } return err } @@ -698,7 +687,7 @@ func (w *wrappedStream) SendMsg(m any) error { } func newWrappedStream(s grpc.ClientStream, method, target string) grpc.ClientStream { - return &wrappedStream{ClientStream: s, method: method, target: target, isFirstRecv: true} + return &wrappedStream{ClientStream: s, method: method, target: target} } // metricsInterceptor is a gRPC stream client interceptor that records metrics for stream RPCs. diff --git a/spanner/metrics_test.go b/spanner/metrics_test.go index e05cf8cba5be..605fc2501d4f 100644 --- a/spanner/metrics_test.go +++ b/spanner/metrics_test.go @@ -193,21 +193,26 @@ func TestNewBuiltinMetricsTracerFactory(t *testing.T) { // Get new CreateServiceTimeSeriesRequests gotCreateTSCalls := monitoringServer.CreateServiceTimeSeriesRequests() var gotExpectedMethods []string - gotOTELValues := make(map[string]map[string]int64) + gotOTELCountValues := make(map[string]map[string]int64) + gotOTELLatencyValues := make(map[string]map[string]float64) for _, gotCreateTSCall := range gotCreateTSCalls { gotMetricTypesPerMethod := make(map[string][]string) for _, ts := range gotCreateTSCall.TimeSeries { gotMetricTypesPerMethod[ts.Metric.GetLabels()["method"]] = append(gotMetricTypesPerMethod[ts.Metric.GetLabels()["method"]], ts.Metric.Type) - if _, ok := gotOTELValues[ts.Metric.GetLabels()["method"]]; !ok { - gotOTELValues[ts.Metric.GetLabels()["method"]] = make(map[string]int64) + if _, ok := gotOTELCountValues[ts.Metric.GetLabels()["method"]]; !ok { + gotOTELCountValues[ts.Metric.GetLabels()["method"]] = make(map[string]int64) + gotOTELLatencyValues[ts.Metric.GetLabels()["method"]] = make(map[string]float64) gotExpectedMethods = append(gotExpectedMethods, ts.Metric.GetLabels()["method"]) } if ts.MetricKind == metric.MetricDescriptor_CUMULATIVE && ts.GetValueType() == metric.MetricDescriptor_INT64 { - gotOTELValues[ts.Metric.GetLabels()["method"]][ts.Metric.Type] = ts.Points[0].Value.GetInt64Value() + gotOTELCountValues[ts.Metric.GetLabels()["method"]][ts.Metric.Type] = ts.Points[0].Value.GetInt64Value() } else { for _, p := range ts.Points { - if p.Value.GetInt64Value() > int64(elapsedTime) { - t.Errorf("Value %v is greater than elapsed time %v", p.Value.GetInt64Value(), elapsedTime) + if _, ok := gotOTELCountValues[ts.Metric.GetLabels()["method"]][ts.Metric.Type]; !ok { + gotOTELLatencyValues[ts.Metric.GetLabels()["method"]][ts.Metric.Type] = p.Value.GetDistributionValue().Mean + } else { + // sum up all attempt latencies + gotOTELLatencyValues[ts.Metric.GetLabels()["method"]][ts.Metric.Type] += p.Value.GetDistributionValue().Mean } } } @@ -216,7 +221,7 @@ func TestNewBuiltinMetricsTracerFactory(t *testing.T) { sort.Strings(gotMetricTypes) sort.Strings(test.wantOTELMetrics[method]) if !testutil.Equal(gotMetricTypes, test.wantOTELMetrics[method]) { - t.Errorf("Metric types missing in req. %s got: %v, want: %v", method, gotMetricTypes, wantMetricTypesGCM) + t.Errorf("Metric types missing in req. %s got: %v, want: %v", method, gotMetricTypes, test.wantOTELMetrics[method]) } } } @@ -226,10 +231,22 @@ func TestNewBuiltinMetricsTracerFactory(t *testing.T) { } for method, wantOTELValues := range test.wantOTELValue { for metricName, wantValue := range wantOTELValues { - if gotOTELValues[method][metricName] != wantValue { - t.Errorf("OTEL value for %s, %s: got: %v, want: %v", method, metricName, gotOTELValues[method][metricName], wantValue) + if gotOTELCountValues[method][metricName] != wantValue { + t.Errorf("OTEL value for %s, %s: got: %v, want: %v", method, metricName, gotOTELCountValues[method][metricName], wantValue) } } + // For StreamingRead, verify operation latency includes all attempt latencies + opLatency := gotOTELLatencyValues[method][nativeMetricsPrefix+metricNameOperationLatencies] + attemptLatency := gotOTELLatencyValues[method][nativeMetricsPrefix+metricNameAttemptLatencies] + // expect opLatency and attemptLatency to be non-zero + if opLatency == 0 || attemptLatency == 0 { + t.Errorf("Operation and attempt latencies should be non-zero for %s: operation_latency=%v, attempt_latency=%v", + method, opLatency, attemptLatency) + } + if opLatency <= attemptLatency { + t.Errorf("Operation latency should be greater than attempt latency for %s: operation_latency=%v, attempt_latency=%v", + method, opLatency, attemptLatency) + } } gotCreateTSCallsCount := len(gotCreateTSCalls) if gotCreateTSCallsCount < test.wantCreateTSCallsCount { diff --git a/spanner/read.go b/spanner/read.go index 34af289004dc..2752b0ec93e0 100644 --- a/spanner/read.go +++ b/spanner/read.go @@ -30,6 +30,7 @@ import ( "github.com/googleapis/gax-go/v2" "google.golang.org/api/iterator" "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" proto3 "google.golang.org/protobuf/types/known/structpb" ) @@ -84,12 +85,13 @@ func streamWithReplaceSessionFunc( ctx, cancel := context.WithCancel(ctx) ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.RowIterator") return &RowIterator{ - streamd: newResumableStreamDecoder(ctx, logger, meterTracerFactory, rpc, replaceSession), - rowd: &partialResultSetDecoder{}, - setTransactionID: setTransactionID, - setTimestamp: setTimestamp, - release: release, - cancel: cancel, + meterTracerFactory: meterTracerFactory, + streamd: newResumableStreamDecoder(ctx, logger, rpc, replaceSession), + rowd: &partialResultSetDecoder{}, + setTransactionID: setTransactionID, + setTimestamp: setTimestamp, + release: release, + cancel: cancel, } } @@ -120,15 +122,17 @@ type RowIterator struct { // RowIterator.Next() returned an error that is not equal to iterator.Done. Metadata *sppb.ResultSetMetadata - streamd *resumableStreamDecoder - rowd *partialResultSetDecoder - setTransactionID func(transactionID) - setTimestamp func(time.Time) - release func(error) - cancel func() - err error - rows []*Row - sawStats bool + ctx context.Context + meterTracerFactory *builtinMetricsTracerFactory + streamd *resumableStreamDecoder + rowd *partialResultSetDecoder + setTransactionID func(transactionID) + setTimestamp func(time.Time) + release func(error) + cancel func() + err error + rows []*Row + sawStats bool } // this is for safety from future changes to RowIterator making sure that it implements rowIterator interface. @@ -138,10 +142,31 @@ var _ rowIterator = (*RowIterator)(nil) // there are no more results. Once Next returns Done, all subsequent calls // will return Done. func (r *RowIterator) Next() (*Row, error) { + mt := r.meterTracerFactory.createBuiltinMetricsTracer(r.ctx) if r.err != nil { return nil, r.err } - for len(r.rows) == 0 && r.streamd.next() { + // Start new attempt + mt.currOp.incrementAttemptCount() + mt.currOp.currAttempt = &attemptTracer{ + startTime: time.Now(), + } + defer func() { + // when mt method is not empty, it means the RPC was sent to backend and native metrics attributes were captured in interceptor + if mt.method != "" { + statusCode, _ := convertToGrpcStatusErr(r.err) + // record the attempt completion + mt.currOp.currAttempt.setStatus(statusCode.String()) + recordAttemptCompletion(&mt) + mt.currOp.setStatus(statusCode.String()) + // Record operation completion. + // Operational_latencies metric captures the full picture of all attempts including retries. + recordOperationCompletion(&mt) + mt.currOp.currAttempt = nil + } + }() + + for len(r.rows) == 0 && r.streamd.next(&mt) { prs := r.streamd.get() if r.setTransactionID != nil { // this is when Read/Query is executed using ReadWriteTransaction @@ -406,20 +431,17 @@ type resumableStreamDecoder struct { // backoff is used for the retry settings backoff gax.Backoff - - meterTracerFactory *builtinMetricsTracerFactory } // newResumableStreamDecoder creates a new resumeableStreamDecoder instance. // Parameter rpc should be a function that creates a new stream beginning at the // restartToken if non-nil. -func newResumableStreamDecoder(ctx context.Context, logger *log.Logger, meterTracerFactory *builtinMetricsTracerFactory, rpc func(ct context.Context, restartToken []byte) (streamingReceiver, error), replaceSession func(ctx context.Context) error) *resumableStreamDecoder { +func newResumableStreamDecoder(ctx context.Context, logger *log.Logger, rpc func(ct context.Context, restartToken []byte) (streamingReceiver, error), replaceSession func(ctx context.Context) error) *resumableStreamDecoder { return &resumableStreamDecoder{ ctx: ctx, logger: logger, rpc: rpc, replaceSessionFunc: replaceSession, - meterTracerFactory: meterTracerFactory, maxBytesBetweenResumeTokens: atomic.LoadInt32(&maxBytesBetweenResumeTokens), backoff: DefaultRetryBackoff, } @@ -503,25 +525,18 @@ var ( maxBytesBetweenResumeTokens = int32(128 * 1024 * 1024) ) -func (d *resumableStreamDecoder) next() bool { - mt := d.meterTracerFactory.createBuiltinMetricsTracer(d.ctx) - defer func() { - if mt.method != "" { - statusCode, _ := convertToGrpcStatusErr(d.lastErr()) - mt.currOp.setStatus(statusCode.String()) - recordOperationCompletion(&mt) - } - }() +func (d *resumableStreamDecoder) next(mt *builtinMetricsTracer) bool { retryer := onCodes(d.backoff, codes.Unavailable, codes.ResourceExhausted, codes.Internal) for { switch d.state { case unConnected: // If no gRPC stream is available, try to initiate one. - d.stream, d.err = d.rpc(context.WithValue(d.ctx, metricsTracerKey, &mt), d.resumeToken) + d.stream, d.err = d.rpc(context.WithValue(d.ctx, metricsTracerKey, mt), d.resumeToken) if d.err == nil { d.changeState(queueingRetryable) continue } + delay, shouldRetry := retryer.Retry(d.err) if !shouldRetry { d.changeState(aborted) @@ -529,6 +544,13 @@ func (d *resumableStreamDecoder) next() bool { } trace.TracePrintf(d.ctx, nil, "Backing off stream read for %s", delay) if err := gax.Sleep(d.ctx, delay); err == nil { + // record the attempt completion + mt.currOp.currAttempt.setStatus(status.Code(d.err).String()) + recordAttemptCompletion(mt) + mt.currOp.incrementAttemptCount() + mt.currOp.currAttempt = &attemptTracer{ + startTime: time.Now(), + } // Be explicit about state transition, although the // state doesn't actually change. State transition // will be triggered only by RPC activity, regardless of @@ -549,7 +571,7 @@ func (d *resumableStreamDecoder) next() bool { // Only the case that receiving queue is empty could cause // peekLast to return error and in such case, we should try to // receive from stream. - d.tryRecv(retryer) + d.tryRecv(mt, retryer) continue } if d.isNewResumeToken(last.ResumeToken) { @@ -578,7 +600,7 @@ func (d *resumableStreamDecoder) next() bool { } // Needs to receive more from gRPC stream till a new resume token // is observed. - d.tryRecv(retryer) + d.tryRecv(mt, retryer) continue case aborted: // Discard all pending items because none of them should be yield @@ -604,7 +626,7 @@ func (d *resumableStreamDecoder) next() bool { } // tryRecv attempts to receive a PartialResultSet from gRPC stream. -func (d *resumableStreamDecoder) tryRecv(retryer gax.Retryer) { +func (d *resumableStreamDecoder) tryRecv(mt *builtinMetricsTracer, retryer gax.Retryer) { var res *sppb.PartialResultSet res, d.err = d.stream.Recv() if d.err == nil { @@ -615,12 +637,16 @@ func (d *resumableStreamDecoder) tryRecv(retryer gax.Retryer) { d.changeState(d.state) return } + if d.err == io.EOF { d.err = nil d.changeState(finished) return } + if d.replaceSessionFunc != nil && isSessionNotFoundError(d.err) && d.resumeToken == nil { + mt.currOp.currAttempt.setStatus(status.Code(d.err).String()) + recordAttemptCompletion(mt) // A 'Session not found' error occurred before we received a resume // token and a replaceSessionFunc function is defined. Try to restart // the stream on a new session. @@ -629,7 +655,13 @@ func (d *resumableStreamDecoder) tryRecv(retryer gax.Retryer) { d.changeState(aborted) return } + mt.currOp.incrementAttemptCount() + mt.currOp.currAttempt = &attemptTracer{ + startTime: time.Now(), + } } else { + mt.currOp.currAttempt.setStatus(status.Code(d.err).String()) + recordAttemptCompletion(mt) delay, shouldRetry := retryer.Retry(d.err) if !shouldRetry || d.state != queueingRetryable { d.changeState(aborted) @@ -640,6 +672,10 @@ func (d *resumableStreamDecoder) tryRecv(retryer gax.Retryer) { d.changeState(aborted) return } + mt.currOp.incrementAttemptCount() + mt.currOp.currAttempt = &attemptTracer{ + startTime: time.Now(), + } } // Clear error and retry the stream. d.err = nil diff --git a/spanner/read_test.go b/spanner/read_test.go index e15ced784870..99ab2362a1ee 100644 --- a/spanner/read_test.go +++ b/spanner/read_test.go @@ -28,6 +28,7 @@ import ( sppb "cloud.google.com/go/spanner/apiv1/spannerpb" . "cloud.google.com/go/spanner/internal/testutil" "github.com/googleapis/gax-go/v2" + "go.opentelemetry.io/otel/metric/noop" "google.golang.org/api/iterator" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -805,10 +806,10 @@ func TestRsdNonblockingStates(t *testing.T) { } ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() + mt := c.metricsTracerFactory.createBuiltinMetricsTracer(ctx) r := newResumableStreamDecoder( ctx, nil, - c.metricsTracerFactory, test.rpc, nil, ) @@ -882,8 +883,12 @@ func TestRsdNonblockingStates(t *testing.T) { } return } + mt.currOp.incrementAttemptCount() + mt.currOp.currAttempt = &attemptTracer{ + startTime: time.Now(), + } // Receive next decoded item. - if r.next() { + if r.next(&mt) { rs = append(rs, r.get()) } } @@ -1099,10 +1104,10 @@ func TestRsdBlockingStates(t *testing.T) { } ctx, cancel := context.WithCancel(context.Background()) defer cancel() + mt := c.metricsTracerFactory.createBuiltinMetricsTracer(ctx) r := newResumableStreamDecoder( ctx, nil, - c.metricsTracerFactory, test.rpc, nil, ) @@ -1146,8 +1151,12 @@ func TestRsdBlockingStates(t *testing.T) { var rs []*sppb.PartialResultSet rowsFetched := make(chan int) go func() { + mt.currOp.incrementAttemptCount() + mt.currOp.currAttempt = &attemptTracer{ + startTime: time.Now(), + } for { - if !r.next() { + if !r.next(&mt) { // Note that r.Next also exits on context cancel/timeout. close(rowsFetched) return @@ -1261,10 +1270,10 @@ func TestQueueBytes(t *testing.T) { } ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() + mt := c.metricsTracerFactory.createBuiltinMetricsTracer(ctx) decoder := newResumableStreamDecoder( ctx, nil, - c.metricsTracerFactory, func(ct context.Context, resumeToken []byte) (streamingReceiver, error) { r, err := mc.ExecuteStreamingSql(ct, &sppb.ExecuteSqlRequest{ Session: session.Name, @@ -1286,24 +1295,24 @@ func TestQueueBytes(t *testing.T) { ResumeToken: rt1, }) - decoder.next() - decoder.next() - decoder.next() + decoder.next(&mt) + decoder.next(&mt) + decoder.next(&mt) if got, want := decoder.bytesBetweenResumeTokens, int32(2*sizeOfPRS); got != want { t.Errorf("r.bytesBetweenResumeTokens = %v, want %v", got, want) } - decoder.next() + decoder.next(&mt) if decoder.bytesBetweenResumeTokens != 0 { t.Errorf("r.bytesBetweenResumeTokens = %v, want 0", decoder.bytesBetweenResumeTokens) } - decoder.next() + decoder.next(&mt) if got, want := decoder.bytesBetweenResumeTokens, int32(sizeOfPRS); got != want { t.Errorf("r.bytesBetweenResumeTokens = %v, want %v", got, want) } - decoder.next() + decoder.next(&mt) if decoder.bytesBetweenResumeTokens != 0 { t.Errorf("r.bytesBetweenResumeTokens = %v, want 0", decoder.bytesBetweenResumeTokens) } @@ -1769,8 +1778,12 @@ func TestIteratorStopEarly(t *testing.T) { } func TestIteratorWithError(t *testing.T) { + metricsTracerFactory, err := newBuiltinMetricsTracerFactory(context.Background(), "projects/my-project/instances/my-instance/databases/my-database", noop.NewMeterProvider()) + if err != nil { + t.Fatalf("failed to create metrics tracer factory: %v", err) + } injected := errors.New("Failed iterator") - iter := RowIterator{err: injected} + iter := RowIterator{meterTracerFactory: metricsTracerFactory, err: injected} defer iter.Stop() if _, err := iter.Next(); err != injected { t.Fatalf("Expected error: %v, got %v", injected, err) diff --git a/spanner/transaction.go b/spanner/transaction.go index 86fedd7a4d8b..9e3e107d1065 100644 --- a/spanner/transaction.go +++ b/spanner/transaction.go @@ -242,16 +242,22 @@ func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys Key ) kset, err := keys.keySetProto() if err != nil { - return &RowIterator{err: err} + return &RowIterator{ + meterTracerFactory: t.sp.sc.metricsTracerFactory, + err: err} } if sh, ts, err = t.acquire(ctx); err != nil { - return &RowIterator{err: err} + return &RowIterator{ + meterTracerFactory: t.sp.sc.metricsTracerFactory, + err: err} } // Cloud Spanner will return "Session not found" on bad sessions. client := sh.getClient() if client == nil { // Might happen if transaction is closed in the middle of a API call. - return &RowIterator{err: errSessionClosed(sh)} + return &RowIterator{ + meterTracerFactory: t.sp.sc.metricsTracerFactory, + err: errSessionClosed(sh)} } index := t.ro.Index limit := t.ro.Limit @@ -573,7 +579,10 @@ func (t *txReadOnly) query(ctx context.Context, statement Statement, options Que defer func() { trace.EndSpan(ctx, ri.err) }() req, sh, err := t.prepareExecuteSQL(ctx, statement, options) if err != nil { - return &RowIterator{err: err} + return &RowIterator{ + meterTracerFactory: t.sp.sc.metricsTracerFactory, + err: err, + } } var setTransactionID func(transactionID) if _, ok := req.Transaction.GetSelector().(*sppb.TransactionSelector_Begin); ok {