Skip to content

Commit

Permalink
fix(spanner): attempt latency for streaming call should capture the total latency till decoding of protos (#11039)
Browse files Browse the repository at this point in the history

* fix(spanner): attempt latency for streaming call should capture the total latency till decoding of protos.

* fix tests
  • Loading branch information
rahul2393 authored Oct 29, 2024
1 parent e06cb64 commit 255c6bf
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 76 deletions.
25 changes: 7 additions & 18 deletions spanner/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"regexp"
"strconv"
"strings"
"sync"
"time"

"cloud.google.com/go/internal/trace"
Expand Down Expand Up @@ -651,29 +650,19 @@ func metricsInterceptor() grpc.UnaryClientInterceptor {
// wrappedStream wraps the embedded grpc.ClientStream and intercepts its RecvMsg and
// SendMsg method calls.
type wrappedStream struct {
sync.Mutex
isFirstRecv bool
method string
target string
method string
target string
grpc.ClientStream
}

func (w *wrappedStream) RecvMsg(m any) error {
attempt := &attemptTracer{}
attempt.setStartTime(time.Now())
err := w.ClientStream.RecvMsg(m)
statusCode, _ := status.FromError(err)
ctx := w.ClientStream.Context()
mt, ok := ctx.Value(metricsTracerKey).(*builtinMetricsTracer)
if !ok || !w.isFirstRecv {
if !ok {
return err
}
w.Lock()
w.isFirstRecv = false
w.Unlock()
mt.method = w.method
mt.currOp.incrementAttemptCount()
mt.currOp.currAttempt = attempt
if strings.HasPrefix(w.target, "google-c2p") {
mt.currOp.setDirectPathEnabled(true)
}
Expand All @@ -687,9 +676,9 @@ func (w *wrappedStream) RecvMsg(m any) error {
}
}
}
mt.currOp.currAttempt.setStatus(statusCode.Code().String())
mt.currOp.currAttempt.setDirectPathUsed(isDirectPathUsed)
recordAttemptCompletion(mt)
if mt.currOp.currAttempt != nil {
mt.currOp.currAttempt.setDirectPathUsed(isDirectPathUsed)
}
return err
}

Expand All @@ -698,7 +687,7 @@ func (w *wrappedStream) SendMsg(m any) error {
}

func newWrappedStream(s grpc.ClientStream, method, target string) grpc.ClientStream {
return &wrappedStream{ClientStream: s, method: method, target: target, isFirstRecv: true}
return &wrappedStream{ClientStream: s, method: method, target: target}
}

// metricsInterceptor is a gRPC stream client interceptor that records metrics for stream RPCs.
Expand Down
35 changes: 26 additions & 9 deletions spanner/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,21 +193,26 @@ func TestNewBuiltinMetricsTracerFactory(t *testing.T) {
// Get new CreateServiceTimeSeriesRequests
gotCreateTSCalls := monitoringServer.CreateServiceTimeSeriesRequests()
var gotExpectedMethods []string
gotOTELValues := make(map[string]map[string]int64)
gotOTELCountValues := make(map[string]map[string]int64)
gotOTELLatencyValues := make(map[string]map[string]float64)
for _, gotCreateTSCall := range gotCreateTSCalls {
gotMetricTypesPerMethod := make(map[string][]string)
for _, ts := range gotCreateTSCall.TimeSeries {
gotMetricTypesPerMethod[ts.Metric.GetLabels()["method"]] = append(gotMetricTypesPerMethod[ts.Metric.GetLabels()["method"]], ts.Metric.Type)
if _, ok := gotOTELValues[ts.Metric.GetLabels()["method"]]; !ok {
gotOTELValues[ts.Metric.GetLabels()["method"]] = make(map[string]int64)
if _, ok := gotOTELCountValues[ts.Metric.GetLabels()["method"]]; !ok {
gotOTELCountValues[ts.Metric.GetLabels()["method"]] = make(map[string]int64)
gotOTELLatencyValues[ts.Metric.GetLabels()["method"]] = make(map[string]float64)
gotExpectedMethods = append(gotExpectedMethods, ts.Metric.GetLabels()["method"])
}
if ts.MetricKind == metric.MetricDescriptor_CUMULATIVE && ts.GetValueType() == metric.MetricDescriptor_INT64 {
gotOTELValues[ts.Metric.GetLabels()["method"]][ts.Metric.Type] = ts.Points[0].Value.GetInt64Value()
gotOTELCountValues[ts.Metric.GetLabels()["method"]][ts.Metric.Type] = ts.Points[0].Value.GetInt64Value()
} else {
for _, p := range ts.Points {
if p.Value.GetInt64Value() > int64(elapsedTime) {
t.Errorf("Value %v is greater than elapsed time %v", p.Value.GetInt64Value(), elapsedTime)
if _, ok := gotOTELCountValues[ts.Metric.GetLabels()["method"]][ts.Metric.Type]; !ok {
gotOTELLatencyValues[ts.Metric.GetLabels()["method"]][ts.Metric.Type] = p.Value.GetDistributionValue().Mean
} else {
// sum up all attempt latencies
gotOTELLatencyValues[ts.Metric.GetLabels()["method"]][ts.Metric.Type] += p.Value.GetDistributionValue().Mean
}
}
}
Expand All @@ -216,7 +221,7 @@ func TestNewBuiltinMetricsTracerFactory(t *testing.T) {
sort.Strings(gotMetricTypes)
sort.Strings(test.wantOTELMetrics[method])
if !testutil.Equal(gotMetricTypes, test.wantOTELMetrics[method]) {
t.Errorf("Metric types missing in req. %s got: %v, want: %v", method, gotMetricTypes, wantMetricTypesGCM)
t.Errorf("Metric types missing in req. %s got: %v, want: %v", method, gotMetricTypes, test.wantOTELMetrics[method])
}
}
}
Expand All @@ -226,10 +231,22 @@ func TestNewBuiltinMetricsTracerFactory(t *testing.T) {
}
for method, wantOTELValues := range test.wantOTELValue {
for metricName, wantValue := range wantOTELValues {
if gotOTELValues[method][metricName] != wantValue {
t.Errorf("OTEL value for %s, %s: got: %v, want: %v", method, metricName, gotOTELValues[method][metricName], wantValue)
if gotOTELCountValues[method][metricName] != wantValue {
t.Errorf("OTEL value for %s, %s: got: %v, want: %v", method, metricName, gotOTELCountValues[method][metricName], wantValue)
}
}
// For StreamingRead, verify operation latency includes all attempt latencies
opLatency := gotOTELLatencyValues[method][nativeMetricsPrefix+metricNameOperationLatencies]
attemptLatency := gotOTELLatencyValues[method][nativeMetricsPrefix+metricNameAttemptLatencies]
// expect opLatency and attemptLatency to be non-zero
if opLatency == 0 || attemptLatency == 0 {
t.Errorf("Operation and attempt latencies should be non-zero for %s: operation_latency=%v, attempt_latency=%v",
method, opLatency, attemptLatency)
}
if opLatency <= attemptLatency {
t.Errorf("Operation latency should be greater than attempt latency for %s: operation_latency=%v, attempt_latency=%v",
method, opLatency, attemptLatency)
}
}
gotCreateTSCallsCount := len(gotCreateTSCalls)
if gotCreateTSCallsCount < test.wantCreateTSCallsCount {
Expand Down
102 changes: 69 additions & 33 deletions spanner/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/googleapis/gax-go/v2"
"google.golang.org/api/iterator"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
proto3 "google.golang.org/protobuf/types/known/structpb"
)
Expand Down Expand Up @@ -84,12 +85,13 @@ func streamWithReplaceSessionFunc(
ctx, cancel := context.WithCancel(ctx)
ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.RowIterator")
return &RowIterator{
streamd: newResumableStreamDecoder(ctx, logger, meterTracerFactory, rpc, replaceSession),
rowd: &partialResultSetDecoder{},
setTransactionID: setTransactionID,
setTimestamp: setTimestamp,
release: release,
cancel: cancel,
meterTracerFactory: meterTracerFactory,
streamd: newResumableStreamDecoder(ctx, logger, rpc, replaceSession),
rowd: &partialResultSetDecoder{},
setTransactionID: setTransactionID,
setTimestamp: setTimestamp,
release: release,
cancel: cancel,
}
}

Expand Down Expand Up @@ -120,15 +122,17 @@ type RowIterator struct {
// RowIterator.Next() returned an error that is not equal to iterator.Done.
Metadata *sppb.ResultSetMetadata

streamd *resumableStreamDecoder
rowd *partialResultSetDecoder
setTransactionID func(transactionID)
setTimestamp func(time.Time)
release func(error)
cancel func()
err error
rows []*Row
sawStats bool
ctx context.Context
meterTracerFactory *builtinMetricsTracerFactory
streamd *resumableStreamDecoder
rowd *partialResultSetDecoder
setTransactionID func(transactionID)
setTimestamp func(time.Time)
release func(error)
cancel func()
err error
rows []*Row
sawStats bool
}

// this is for safety from future changes to RowIterator making sure that it implements rowIterator interface.
Expand All @@ -138,10 +142,31 @@ var _ rowIterator = (*RowIterator)(nil)
// there are no more results. Once Next returns Done, all subsequent calls
// will return Done.
func (r *RowIterator) Next() (*Row, error) {
mt := r.meterTracerFactory.createBuiltinMetricsTracer(r.ctx)
if r.err != nil {
return nil, r.err
}
for len(r.rows) == 0 && r.streamd.next() {
// Start new attempt
mt.currOp.incrementAttemptCount()
mt.currOp.currAttempt = &attemptTracer{
startTime: time.Now(),
}
defer func() {
// A non-empty mt.method means the RPC was sent to the backend and the native metrics attributes were captured in the interceptor.
if mt.method != "" {
statusCode, _ := convertToGrpcStatusErr(r.err)
// record the attempt completion
mt.currOp.currAttempt.setStatus(statusCode.String())
recordAttemptCompletion(&mt)
mt.currOp.setStatus(statusCode.String())
// Record operation completion.
// The operation latencies metric captures the full picture of all attempts, including retries.
recordOperationCompletion(&mt)
mt.currOp.currAttempt = nil
}
}()

for len(r.rows) == 0 && r.streamd.next(&mt) {
prs := r.streamd.get()
if r.setTransactionID != nil {
// this is when Read/Query is executed using ReadWriteTransaction
Expand Down Expand Up @@ -406,20 +431,17 @@ type resumableStreamDecoder struct {

// backoff is used for the retry settings
backoff gax.Backoff

meterTracerFactory *builtinMetricsTracerFactory
}

// newResumableStreamDecoder creates a new resumableStreamDecoder instance.
// Parameter rpc should be a function that creates a new stream beginning at the
// restartToken if non-nil.
func newResumableStreamDecoder(ctx context.Context, logger *log.Logger, meterTracerFactory *builtinMetricsTracerFactory, rpc func(ct context.Context, restartToken []byte) (streamingReceiver, error), replaceSession func(ctx context.Context) error) *resumableStreamDecoder {
func newResumableStreamDecoder(ctx context.Context, logger *log.Logger, rpc func(ct context.Context, restartToken []byte) (streamingReceiver, error), replaceSession func(ctx context.Context) error) *resumableStreamDecoder {
return &resumableStreamDecoder{
ctx: ctx,
logger: logger,
rpc: rpc,
replaceSessionFunc: replaceSession,
meterTracerFactory: meterTracerFactory,
maxBytesBetweenResumeTokens: atomic.LoadInt32(&maxBytesBetweenResumeTokens),
backoff: DefaultRetryBackoff,
}
Expand Down Expand Up @@ -503,32 +525,32 @@ var (
maxBytesBetweenResumeTokens = int32(128 * 1024 * 1024)
)

func (d *resumableStreamDecoder) next() bool {
mt := d.meterTracerFactory.createBuiltinMetricsTracer(d.ctx)
defer func() {
if mt.method != "" {
statusCode, _ := convertToGrpcStatusErr(d.lastErr())
mt.currOp.setStatus(statusCode.String())
recordOperationCompletion(&mt)
}
}()
func (d *resumableStreamDecoder) next(mt *builtinMetricsTracer) bool {
retryer := onCodes(d.backoff, codes.Unavailable, codes.ResourceExhausted, codes.Internal)
for {
switch d.state {
case unConnected:
// If no gRPC stream is available, try to initiate one.
d.stream, d.err = d.rpc(context.WithValue(d.ctx, metricsTracerKey, &mt), d.resumeToken)
d.stream, d.err = d.rpc(context.WithValue(d.ctx, metricsTracerKey, mt), d.resumeToken)
if d.err == nil {
d.changeState(queueingRetryable)
continue
}

delay, shouldRetry := retryer.Retry(d.err)
if !shouldRetry {
d.changeState(aborted)
continue
}
trace.TracePrintf(d.ctx, nil, "Backing off stream read for %s", delay)
if err := gax.Sleep(d.ctx, delay); err == nil {
// record the attempt completion
mt.currOp.currAttempt.setStatus(status.Code(d.err).String())
recordAttemptCompletion(mt)
mt.currOp.incrementAttemptCount()
mt.currOp.currAttempt = &attemptTracer{
startTime: time.Now(),
}
// Be explicit about state transition, although the
// state doesn't actually change. State transition
// will be triggered only by RPC activity, regardless of
Expand All @@ -549,7 +571,7 @@ func (d *resumableStreamDecoder) next() bool {
// Only the case that receiving queue is empty could cause
// peekLast to return error and in such case, we should try to
// receive from stream.
d.tryRecv(retryer)
d.tryRecv(mt, retryer)
continue
}
if d.isNewResumeToken(last.ResumeToken) {
Expand Down Expand Up @@ -578,7 +600,7 @@ func (d *resumableStreamDecoder) next() bool {
}
// Needs to receive more from gRPC stream till a new resume token
// is observed.
d.tryRecv(retryer)
d.tryRecv(mt, retryer)
continue
case aborted:
// Discard all pending items because none of them should be yielded
Expand All @@ -604,7 +626,7 @@ func (d *resumableStreamDecoder) next() bool {
}

// tryRecv attempts to receive a PartialResultSet from gRPC stream.
func (d *resumableStreamDecoder) tryRecv(retryer gax.Retryer) {
func (d *resumableStreamDecoder) tryRecv(mt *builtinMetricsTracer, retryer gax.Retryer) {
var res *sppb.PartialResultSet
res, d.err = d.stream.Recv()
if d.err == nil {
Expand All @@ -615,12 +637,16 @@ func (d *resumableStreamDecoder) tryRecv(retryer gax.Retryer) {
d.changeState(d.state)
return
}

if d.err == io.EOF {
d.err = nil
d.changeState(finished)
return
}

if d.replaceSessionFunc != nil && isSessionNotFoundError(d.err) && d.resumeToken == nil {
mt.currOp.currAttempt.setStatus(status.Code(d.err).String())
recordAttemptCompletion(mt)
// A 'Session not found' error occurred before we received a resume
// token and a replaceSessionFunc function is defined. Try to restart
// the stream on a new session.
Expand All @@ -629,7 +655,13 @@ func (d *resumableStreamDecoder) tryRecv(retryer gax.Retryer) {
d.changeState(aborted)
return
}
mt.currOp.incrementAttemptCount()
mt.currOp.currAttempt = &attemptTracer{
startTime: time.Now(),
}
} else {
mt.currOp.currAttempt.setStatus(status.Code(d.err).String())
recordAttemptCompletion(mt)
delay, shouldRetry := retryer.Retry(d.err)
if !shouldRetry || d.state != queueingRetryable {
d.changeState(aborted)
Expand All @@ -640,6 +672,10 @@ func (d *resumableStreamDecoder) tryRecv(retryer gax.Retryer) {
d.changeState(aborted)
return
}
mt.currOp.incrementAttemptCount()
mt.currOp.currAttempt = &attemptTracer{
startTime: time.Now(),
}
}
// Clear error and retry the stream.
d.err = nil
Expand Down
Loading

0 comments on commit 255c6bf

Please sign in to comment.