diff --git a/spanner/batch.go b/spanner/batch.go index 5c20f66fb0ab..69399d0fecce 100644 --- a/spanner/batch.go +++ b/spanner/batch.go @@ -384,6 +384,7 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R return stream( contextWithOutgoingMetadata(ctx, sh.getMetadata(), t.disableRouteToLeader), sh.session.logger, + t.sp.sc.metricsTracerFactory, rpc, t.setTimestamp, t.release) diff --git a/spanner/client.go b/spanner/client.go index dcd2342df4e6..d34f9838298d 100644 --- a/spanner/client.go +++ b/spanner/client.go @@ -24,6 +24,8 @@ import ( "os" "regexp" "strconv" + "strings" + "sync" "time" "cloud.google.com/go/internal/trace" @@ -33,6 +35,7 @@ import ( "github.com/googleapis/gax-go/v2" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/noop" "google.golang.org/api/iterator" "google.golang.org/api/option" "google.golang.org/api/option/internaloption" @@ -42,6 +45,8 @@ import ( "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/encoding/gzip" "google.golang.org/grpc/metadata" + "google.golang.org/grpc/peer" + "google.golang.org/grpc/status" vkit "cloud.google.com/go/spanner/apiv1" "cloud.google.com/go/spanner/internal" @@ -111,6 +116,7 @@ type Client struct { disableRouteToLeader bool dro *sppb.DirectedReadOptions otConfig *openTelemetryConfig + metricsTracerFactory *builtinMetricsTracerFactory } // DatabaseName returns the full name of a database, e.g., @@ -461,6 +467,30 @@ func newClientWithConfig(ctx context.Context, database string, config ClientConf sc.otConfig = otConfig sc.mu.Unlock() + var metricsProvider metric.MeterProvider + if emulatorAddr := os.Getenv("SPANNER_EMULATOR_HOST"); emulatorAddr != "" { + // Do not emit native metrics when emulator is being used + metricsProvider = noop.NewMeterProvider() + } + + // SPANNER_ENABLE_BUILTIN_METRICS environment variable is used to enable + // native metrics for the Spanner client, which overrides the default. + // + // This is an EXPERIMENTAL feature and may be changed or removed in the future. + if os.Getenv("SPANNER_ENABLE_BUILTIN_METRICS") != "true" { + // Do not emit native metrics when SPANNER_ENABLE_BUILTIN_METRICS is not set to true + metricsProvider = noop.NewMeterProvider() + } + + // Create a OpenTelemetry metrics configuration + metricsTracerFactory, err := newBuiltinMetricsTracerFactory(ctx, database, metricsProvider) + if err != nil { + return nil, err + } + sc.mu.Lock() + sc.metricsTracerFactory = metricsTracerFactory + sc.mu.Unlock() + // Create a session pool. 
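The two environment checks above decide whether the client emits built-in metrics at all: the emulator and a missing `SPANNER_ENABLE_BUILTIN_METRICS=true` both swap in a no-op meter provider. A minimal standalone sketch of that gating decision (the helper name and `main` wrapper are illustrative, not part of this patch):

```go
package main

import (
	"fmt"
	"os"

	"go.opentelemetry.io/otel/metric"
	"go.opentelemetry.io/otel/metric/noop"
)

// chooseMeterProvider returns a no-op provider when metrics must stay off;
// a nil return means "let the client build the default SDK provider".
func chooseMeterProvider() metric.MeterProvider {
	if os.Getenv("SPANNER_EMULATOR_HOST") != "" {
		return noop.NewMeterProvider() // emulator in use: never export
	}
	if os.Getenv("SPANNER_ENABLE_BUILTIN_METRICS") != "true" {
		return noop.NewMeterProvider() // opt-in flag not set: stay silent
	}
	return nil
}

func main() {
	if chooseMeterProvider() == nil {
		fmt.Println("built-in metrics enabled")
	} else {
		fmt.Println("built-in metrics disabled")
	}
}
```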
config.SessionPoolConfig.sessionLabels = sessionLabels sp, err := newSessionPool(sc, config.SessionPoolConfig) @@ -482,6 +512,7 @@ func newClientWithConfig(ctx context.Context, database string, config ClientConf disableRouteToLeader: config.DisableRouteToLeader, dro: config.DirectedReadOptions, otConfig: otConfig, + metricsTracerFactory: metricsTracerFactory, } return c, nil } @@ -537,6 +568,8 @@ func allClientOpts(numChannels int, compression string, userOpts ...option.Clien option.WithGRPCConnectionPool(numChannels), option.WithUserAgent(fmt.Sprintf("spanner-go/v%s", internal.Version)), internaloption.AllowNonDefaultServiceAccount(true), + option.WithGRPCDialOption(grpc.WithChainUnaryInterceptor(addNativeMetricsInterceptor()...)), + option.WithGRPCDialOption(grpc.WithChainStreamInterceptor(addStreamNativeMetricsInterceptor()...)), } if enableDirectPathXds, _ := strconv.ParseBool(os.Getenv("GOOGLE_SPANNER_ENABLE_DIRECT_ACCESS")); enableDirectPathXds { clientDefaultOpts = append(clientDefaultOpts, internaloption.EnableDirectPath(true), internaloption.EnableDirectPathXds()) @@ -549,6 +582,136 @@ func allClientOpts(numChannels int, compression string, userOpts ...option.Clien return append(allDefaultOpts, userOpts...) } +// metricsInterceptor is a gRPC unary client interceptor that records metrics for unary RPCs. +func metricsInterceptor() grpc.UnaryClientInterceptor { + return func( + ctx context.Context, + method string, + req interface{}, + reply interface{}, + cc *grpc.ClientConn, + invoker grpc.UnaryInvoker, + opts ...grpc.CallOption, + ) error { + mt, ok := ctx.Value(metricsTracerKey).(*builtinMetricsTracer) + if !ok { + return invoker(ctx, method, req, reply, cc, opts...) + } + + mt.method = method + mt.currOp.incrementAttemptCount() + mt.currOp.currAttempt = &attemptTracer{} + mt.currOp.currAttempt.setStartTime(time.Now()) + if strings.HasPrefix(cc.Target(), "google-c2p") { + mt.currOp.setDirectPathEnabled(true) + } + + peerInfo := &peer.Peer{} + opts = append(opts, grpc.Peer(peerInfo)) + err := invoker(ctx, method, req, reply, cc, opts...) + + statusCode, _ := status.FromError(err) + mt.currOp.currAttempt.setStatus(statusCode.Code().String()) + + isDirectPathUsed := false + if peerInfo.Addr != nil { + remoteIP := peerInfo.Addr.String() + if strings.HasPrefix(remoteIP, directPathIPV4Prefix) || strings.HasPrefix(remoteIP, directPathIPV6Prefix) { + isDirectPathUsed = true + } + } + + mt.currOp.currAttempt.setDirectPathUsed(isDirectPathUsed) + recordAttemptCompletion(mt) + return err + } +} + +// wrappedStream wraps around the embedded grpc.ClientStream, and intercepts the RecvMsg and +// SendMsg method call. 
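The unary interceptor above only does work when the caller has stored a `*builtinMetricsTracer` in the context under `metricsTracerKey`; otherwise it degrades to a plain pass-through. A stripped-down sketch of that context-keyed handshake, with generic names standing in for the Spanner tracer types:

```go
package interceptorexample

import (
	"context"
	"fmt"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/status"
)

type ctxKey string

const tracerKey ctxKey = "tracer"

type rpcTracer struct {
	start  time.Time
	status string
}

// timingInterceptor is a pared-down analogue of metricsInterceptor: it only
// records when the caller attached a tracer to the context.
func timingInterceptor() grpc.UnaryClientInterceptor {
	return func(ctx context.Context, method string, req, reply interface{},
		cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
		t, ok := ctx.Value(tracerKey).(*rpcTracer)
		if !ok {
			// No tracer attached: behave as a transparent pass-through.
			return invoker(ctx, method, req, reply, cc, opts...)
		}
		t.start = time.Now()
		err := invoker(ctx, method, req, reply, cc, opts...)
		st, _ := status.FromError(err)
		t.status = st.Code().String()
		fmt.Printf("%s finished in %v with status %s\n", method, time.Since(t.start), t.status)
		return err
	}
}
```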
+type wrappedStream struct { + sync.Mutex + isFirstRecv bool + method string + target string + grpc.ClientStream +} + +func (w *wrappedStream) RecvMsg(m any) error { + attempt := &attemptTracer{} + attempt.setStartTime(time.Now()) + err := w.ClientStream.RecvMsg(m) + statusCode, _ := status.FromError(err) + ctx := w.ClientStream.Context() + mt, ok := ctx.Value(metricsTracerKey).(*builtinMetricsTracer) + if !ok || !w.isFirstRecv { + return err + } + w.Lock() + w.isFirstRecv = false + w.Unlock() + mt.method = w.method + mt.currOp.incrementAttemptCount() + mt.currOp.currAttempt = attempt + if strings.HasPrefix(w.target, "google-c2p") { + mt.currOp.setDirectPathEnabled(true) + } + isDirectPathUsed := false + peerInfo, ok := peer.FromContext(ctx) + if ok { + if peerInfo.Addr != nil { + remoteIP := peerInfo.Addr.String() + if strings.HasPrefix(remoteIP, directPathIPV4Prefix) || strings.HasPrefix(remoteIP, directPathIPV6Prefix) { + isDirectPathUsed = true + } + } + } + mt.currOp.currAttempt.setStatus(statusCode.Code().String()) + mt.currOp.currAttempt.setDirectPathUsed(isDirectPathUsed) + recordAttemptCompletion(mt) + return err +} + +func (w *wrappedStream) SendMsg(m any) error { + return w.ClientStream.SendMsg(m) +} + +func newWrappedStream(s grpc.ClientStream, method, target string) grpc.ClientStream { + return &wrappedStream{ClientStream: s, method: method, target: target, isFirstRecv: true} +} + +// metricsInterceptor is a gRPC stream client interceptor that records metrics for stream RPCs. +func metricsStreamInterceptor() grpc.StreamClientInterceptor { + return func( + ctx context.Context, + desc *grpc.StreamDesc, + cc *grpc.ClientConn, + method string, + streamer grpc.Streamer, + opts ...grpc.CallOption, + ) (grpc.ClientStream, error) { + s, err := streamer(ctx, desc, cc, method, opts...) + if err != nil { + return nil, err + } + return newWrappedStream(s, method, cc.Target()), nil + } +} + +// AddNativeMetricsInterceptor intercepts unary requests and records metrics for them. +func addNativeMetricsInterceptor() []grpc.UnaryClientInterceptor { + unaryInterceptors := []grpc.UnaryClientInterceptor{} + unaryInterceptors = append(unaryInterceptors, metricsInterceptor()) + return unaryInterceptors +} + +// AddStreamNativeMetricsInterceptor intercepts stream requests and records metrics for them. +func addStreamNativeMetricsInterceptor() []grpc.StreamClientInterceptor { + streamInterceptors := []grpc.StreamClientInterceptor{} + streamInterceptors = append(streamInterceptors, metricsStreamInterceptor()) + return streamInterceptors +} + // getQueryOptions returns the query options overwritten by the environment // variables if exist. The input parameter is the query options set by users // via application-level configuration. If the environment variables are set, @@ -570,6 +733,9 @@ func getQueryOptions(opts QueryOptions) QueryOptions { // Close closes the client. 
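`wrappedStream` above times only the first `RecvMsg` of each stream, treating the first response as the end of the RPC attempt. Here is the decorator pattern in isolation, with the metrics bookkeeping replaced by a log line (all names are illustrative, not part of the patch):

```go
package streamexample

import (
	"log"
	"time"

	"google.golang.org/grpc"
)

// firstRecvTimer wraps a grpc.ClientStream and logs how long the first
// RecvMsg takes; later messages pass through untouched. This mirrors the
// "isFirstRecv" bookkeeping in wrappedStream, minus the metrics.
type firstRecvTimer struct {
	grpc.ClientStream
	method    string
	started   time.Time
	firstDone bool
}

func (s *firstRecvTimer) RecvMsg(m any) error {
	err := s.ClientStream.RecvMsg(m)
	if !s.firstDone {
		s.firstDone = true
		log.Printf("%s: first response after %v", s.method, time.Since(s.started))
	}
	return err
}

// wrapStream would be called from a grpc.StreamClientInterceptor, e.g.:
//
//	s, err := streamer(ctx, desc, cc, method, opts...)
//	if err == nil { s = wrapStream(s, method) }
func wrapStream(s grpc.ClientStream, method string) grpc.ClientStream {
	return &firstRecvTimer{ClientStream: s, method: method, started: time.Now()}
}
```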
func (c *Client) Close() { + if c.metricsTracerFactory != nil { + c.metricsTracerFactory.shutdown() + } if c.idleSessions != nil { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() @@ -703,6 +869,7 @@ func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound rts: rts, }, } + t.txReadOnly.sp = c.idleSessions t.txReadOnly.sh = sh t.txReadOnly.txReadEnv = t t.txReadOnly.qo = c.qo @@ -738,6 +905,7 @@ func (c *Client) BatchReadOnlyTransactionFromID(tid BatchReadOnlyTransactionID) }, ID: tid, } + t.txReadOnly.sp = c.idleSessions t.txReadOnly.sh = sh t.txReadOnly.txReadEnv = t t.txReadOnly.qo = c.qo @@ -988,20 +1156,29 @@ func (bwo BatchWriteOptions) merge(opts BatchWriteOptions) BatchWriteOptions { // BatchWriteResponseIterator is an iterator over BatchWriteResponse structures returned from BatchWrite RPC. type BatchWriteResponseIterator struct { - ctx context.Context - stream sppb.Spanner_BatchWriteClient - err error - dataReceived bool - replaceSession func(ctx context.Context) error - rpc func(ctx context.Context) (sppb.Spanner_BatchWriteClient, error) - release func(error) - cancel func() + ctx context.Context + stream sppb.Spanner_BatchWriteClient + err error + dataReceived bool + meterTracerFactory *builtinMetricsTracerFactory + replaceSession func(ctx context.Context) error + rpc func(ctx context.Context) (sppb.Spanner_BatchWriteClient, error) + release func(error) + cancel func() } // Next returns the next result. Its second return value is iterator.Done if // there are no more results. Once Next returns Done, all subsequent calls // will return Done. func (r *BatchWriteResponseIterator) Next() (*sppb.BatchWriteResponse, error) { + mt := r.meterTracerFactory.createBuiltinMetricsTracer(r.ctx) + defer func() { + if mt.method != "" { + statusCode, _ := convertToGrpcStatusErr(r.err) + mt.currOp.setStatus(statusCode.String()) + recordOperationCompletion(&mt) + } + }() for { // Stream finished or in error state. if r.err != nil { @@ -1117,13 +1294,13 @@ func (c *Client) BatchWriteWithOptions(ctx context.Context, mgs []*MutationGroup mgsPb, err := mutationGroupsProto(mgs) if err != nil { - return &BatchWriteResponseIterator{err: err} + return &BatchWriteResponseIterator{meterTracerFactory: c.metricsTracerFactory, err: err} } var sh *sessionHandle sh, err = c.idleSessions.take(ctx) if err != nil { - return &BatchWriteResponseIterator{err: err} + return &BatchWriteResponseIterator{meterTracerFactory: c.metricsTracerFactory, err: err} } rpc := func(ct context.Context) (sppb.Spanner_BatchWriteClient, error) { @@ -1169,11 +1346,12 @@ func (c *Client) BatchWriteWithOptions(ctx context.Context, mgs []*MutationGroup ctx, cancel := context.WithCancel(ctx) ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.BatchWriteResponseIterator") return &BatchWriteResponseIterator{ - ctx: ctx, - rpc: rpc, - replaceSession: replaceSession, - release: release, - cancel: cancel, + ctx: ctx, + meterTracerFactory: c.metricsTracerFactory, + rpc: rpc, + replaceSession: replaceSession, + release: release, + cancel: cancel, } } diff --git a/spanner/client_test.go b/spanner/client_test.go index 85555583eb3c..65b6802f5a30 100644 --- a/spanner/client_test.go +++ b/spanner/client_test.go @@ -4447,13 +4447,13 @@ func TestClient_CallOptions(t *testing.T) { cs := &gax.CallSettings{} // This is the default retry setting. 
- c.CallOptions.CreateSession[1].Resolve(cs) + c.CallOptions().CreateSession[1].Resolve(cs) if got, want := fmt.Sprintf("%v", cs.Retry()), "&{{250000000 32000000000 1.3 0} [14 8]}"; got != want { t.Fatalf("merged CallOptions is incorrect: got %v, want %v", got, want) } // This is the custom retry setting. - c.CallOptions.CreateSession[2].Resolve(cs) + c.CallOptions().CreateSession[2].Resolve(cs) if got, want := fmt.Sprintf("%v", cs.Retry()), "&{{200000000 30000000000 1.25 0} [14 4]}"; got != want { t.Fatalf("merged CallOptions is incorrect: got %v, want %v", got, want) } diff --git a/spanner/go.mod b/spanner/go.mod index 1d48bcbdd9fe..4650163d8677 100644 --- a/spanner/go.mod +++ b/spanner/go.mod @@ -6,12 +6,16 @@ require ( cloud.google.com/go v0.115.1 cloud.google.com/go/iam v1.2.1 cloud.google.com/go/longrunning v0.6.1 + cloud.google.com/go/monitoring v1.21.0 github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp v1.5.0 github.com/google/go-cmp v0.6.0 + github.com/google/uuid v1.6.0 github.com/googleapis/gax-go/v2 v2.13.0 go.opencensus.io v0.24.0 + go.opentelemetry.io/contrib/detectors/gcp v1.29.0 go.opentelemetry.io/otel v1.29.0 go.opentelemetry.io/otel/metric v1.29.0 + go.opentelemetry.io/otel/sdk/metric v1.29.0 golang.org/x/oauth2 v0.23.0 golang.org/x/sync v0.8.0 google.golang.org/api v0.197.0 @@ -27,6 +31,7 @@ require ( cloud.google.com/go/auth v0.9.3 // indirect cloud.google.com/go/auth/oauth2adapt v0.2.4 // indirect cloud.google.com/go/compute/metadata v0.5.0 // indirect + github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.24.1 // indirect github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cncf/xds/go v0.0.0-20240822171458-6449f94b4d59 // indirect @@ -37,7 +42,6 @@ require ( github.com/go-logr/stdr v1.2.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/google/s2a-go v0.1.8 // indirect - github.com/google/uuid v1.6.0 // indirect github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 // indirect diff --git a/spanner/go.sum b/spanner/go.sum index 5e90f5155951..1c721d84bd9c 100644 --- a/spanner/go.sum +++ b/spanner/go.sum @@ -378,6 +378,8 @@ cloud.google.com/go/monitoring v1.7.0/go.mod h1:HpYse6kkGo//7p6sT0wsIC6IBDET0RhI cloud.google.com/go/monitoring v1.8.0/go.mod h1:E7PtoMJ1kQXWxPjB6mv2fhC5/15jInuulFdYYtlcvT4= cloud.google.com/go/monitoring v1.12.0/go.mod h1:yx8Jj2fZNEkL/GYZyTLS4ZtZEZN8WtDEiEqG4kLK50w= cloud.google.com/go/monitoring v1.13.0/go.mod h1:k2yMBAB1H9JT/QETjNkgdCGD9bPF712XiLTVr+cBrpw= +cloud.google.com/go/monitoring v1.21.0 h1:EMc0tB+d3lUewT2NzKC/hr8cSR9WsUieVywzIHetGro= +cloud.google.com/go/monitoring v1.21.0/go.mod h1:tuJ+KNDdJbetSsbSGTqnaBvbauS5kr3Q/koy3Up6r+4= cloud.google.com/go/networkconnectivity v1.4.0/go.mod h1:nOl7YL8odKyAOtzNX73/M5/mGZgqqMeryi6UPZTk/rA= cloud.google.com/go/networkconnectivity v1.5.0/go.mod h1:3GzqJx7uhtlM3kln0+x5wyFvuVH1pIBJjhCpjzSt75o= cloud.google.com/go/networkconnectivity v1.6.0/go.mod h1:OJOoEXW+0LAxHh89nXd64uGG+FbQoeH8DtxCHVOMlaM= @@ -613,6 +615,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/xgb 
v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp v1.5.0 h1:oVLqHXhnYtUwM89y9T1fXGaK9wTkXHgNp8/ZNMQzUxE= github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp v1.5.0/go.mod h1:dppbR7CwXD4pgtV9t3wD1812RaLDcBjtblcDF5f1vI0= +github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.24.1 h1:pB2F2JKCj1Znmp2rwxxt1J0Fg0wezTMgWYk5Mpbi1kg= +github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.24.1/go.mod h1:itPGVDKf9cC/ov4MdvJ2QZ0khw4bfoo9jzwTJlaxy2k= github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c/go.mod h1:X0CRv0ky0k6m906ixxpzmDRLvX58TFUKS2eePweuyxk= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/ajstarks/deck v0.0.0-20200831202436-30c9fc6549a9/go.mod h1:JynElWSGnm/4RlzPXRlREEwqTHAN3T56Bv2ITsFT3gY= @@ -903,6 +907,8 @@ go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= +go.opentelemetry.io/contrib/detectors/gcp v1.29.0 h1:TiaiXB4DpGD3sdzNlYQxruQngn5Apwzi1X0DRhuGvDQ= +go.opentelemetry.io/contrib/detectors/gcp v1.29.0/go.mod h1:GW2aWZNwR2ZxDLdv8OyC2G8zkRoQBuURgV7RPQgcPoU= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 h1:r6I7RJCN86bpD/FQwedZ0vSixDpwuWREjW9oRMsmqDc= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0/go.mod h1:B9yO6b04uB80CzjedvewuqDhxJxi11s7/GtiGa8bAjI= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 h1:TT4fX+nBOA/+LUkobKGW1ydGcn+G3vRw9+g5HwCphpk= @@ -913,6 +919,8 @@ go.opentelemetry.io/otel/metric v1.29.0 h1:vPf/HFWTNkPu1aYeIsc98l4ktOQaL6LeSoeV2 go.opentelemetry.io/otel/metric v1.29.0/go.mod h1:auu/QWieFVWx+DmQOUMgj0F8LHWdgalxXqvp7BII/W8= go.opentelemetry.io/otel/sdk v1.29.0 h1:vkqKjk7gwhS8VaWb0POZKmIEDimRCMsopNYnriHyryo= go.opentelemetry.io/otel/sdk v1.29.0/go.mod h1:pM8Dx5WKnvxLCb+8lG1PRNIDxu9g9b9g59Qr7hfAAok= +go.opentelemetry.io/otel/sdk/metric v1.29.0 h1:K2CfmJohnRgvZ9UAj2/FhIf/okdWcNdBwe1m8xFXiSY= +go.opentelemetry.io/otel/sdk/metric v1.29.0/go.mod h1:6zZLdCl2fkauYoZIOn/soQIDSWFmNSRcICarHfuhNJQ= go.opentelemetry.io/otel/trace v1.29.0 h1:J/8ZNK4XgR7a21DZUAsbF8pZ5Jcw1VhACmnYt39JTi4= go.opentelemetry.io/otel/trace v1.29.0/go.mod h1:eHl3w0sp3paPkYstJOmAimxhiFXPg+MMTlEh3nsQgWQ= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= diff --git a/spanner/grpc_client.go b/spanner/grpc_client.go new file mode 100644 index 000000000000..9b7f1bca4ca6 --- /dev/null +++ b/spanner/grpc_client.go @@ -0,0 +1,251 @@ +/* +Copyright 2024 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package spanner + +import ( + "context" + "strings" + + vkit "cloud.google.com/go/spanner/apiv1" + "cloud.google.com/go/spanner/apiv1/spannerpb" + "cloud.google.com/go/spanner/internal" + "github.com/googleapis/gax-go/v2" + "google.golang.org/api/option" + "google.golang.org/grpc" + "google.golang.org/grpc/peer" + "google.golang.org/grpc/status" +) + +const ( + directPathIPV6Prefix = "[2001:4860:8040" + directPathIPV4Prefix = "34.126" +) + +type contextKey string + +const metricsTracerKey contextKey = "metricsTracer" + +// spannerClient is an interface that defines the methods available from Cloud Spanner API. +type spannerClient interface { + CallOptions() *vkit.CallOptions + Close() error + Connection() *grpc.ClientConn + CreateSession(context.Context, *spannerpb.CreateSessionRequest, ...gax.CallOption) (*spannerpb.Session, error) + BatchCreateSessions(context.Context, *spannerpb.BatchCreateSessionsRequest, ...gax.CallOption) (*spannerpb.BatchCreateSessionsResponse, error) + GetSession(context.Context, *spannerpb.GetSessionRequest, ...gax.CallOption) (*spannerpb.Session, error) + ListSessions(context.Context, *spannerpb.ListSessionsRequest, ...gax.CallOption) *vkit.SessionIterator + DeleteSession(context.Context, *spannerpb.DeleteSessionRequest, ...gax.CallOption) error + ExecuteSql(context.Context, *spannerpb.ExecuteSqlRequest, ...gax.CallOption) (*spannerpb.ResultSet, error) + ExecuteStreamingSql(context.Context, *spannerpb.ExecuteSqlRequest, ...gax.CallOption) (spannerpb.Spanner_ExecuteStreamingSqlClient, error) + ExecuteBatchDml(context.Context, *spannerpb.ExecuteBatchDmlRequest, ...gax.CallOption) (*spannerpb.ExecuteBatchDmlResponse, error) + Read(context.Context, *spannerpb.ReadRequest, ...gax.CallOption) (*spannerpb.ResultSet, error) + StreamingRead(context.Context, *spannerpb.ReadRequest, ...gax.CallOption) (spannerpb.Spanner_StreamingReadClient, error) + BeginTransaction(context.Context, *spannerpb.BeginTransactionRequest, ...gax.CallOption) (*spannerpb.Transaction, error) + Commit(context.Context, *spannerpb.CommitRequest, ...gax.CallOption) (*spannerpb.CommitResponse, error) + Rollback(context.Context, *spannerpb.RollbackRequest, ...gax.CallOption) error + PartitionQuery(context.Context, *spannerpb.PartitionQueryRequest, ...gax.CallOption) (*spannerpb.PartitionResponse, error) + PartitionRead(context.Context, *spannerpb.PartitionReadRequest, ...gax.CallOption) (*spannerpb.PartitionResponse, error) + BatchWrite(context.Context, *spannerpb.BatchWriteRequest, ...gax.CallOption) (spannerpb.Spanner_BatchWriteClient, error) +} + +// grpcSpannerClient is the gRPC API implementation of the transport-agnostic +// spannerClient interface. +type grpcSpannerClient struct { + raw *vkit.Client + metricsTracerFactory *builtinMetricsTracerFactory +} + +var ( + // Ensure that grpcSpannerClient implements spannerClient. + _ spannerClient = (*grpcSpannerClient)(nil) +) + +// newGRPCSpannerClient initializes a new spannerClient that uses the gRPC +// Spanner API. +func newGRPCSpannerClient(ctx context.Context, sc *sessionClient, opts ...option.ClientOption) (spannerClient, error) { + raw, err := vkit.NewClient(ctx, opts...) 
+ if err != nil { + return nil, err + } + + g := &grpcSpannerClient{raw: raw, metricsTracerFactory: sc.metricsTracerFactory} + clientInfo := []string{"gccl", internal.Version} + if sc.userAgent != "" { + agentWithVersion := strings.SplitN(sc.userAgent, "/", 2) + if len(agentWithVersion) == 2 { + clientInfo = append(clientInfo, agentWithVersion[0], agentWithVersion[1]) + } + } + raw.SetGoogleClientInfo(clientInfo...) + if sc.callOptions != nil { + raw.CallOptions = mergeCallOptions(raw.CallOptions, sc.callOptions) + } + return g, nil +} + +func (g *grpcSpannerClient) newBuiltinMetricsTracer(ctx context.Context) *builtinMetricsTracer { + mt := g.metricsTracerFactory.createBuiltinMetricsTracer(ctx) + return &mt +} + +func (g *grpcSpannerClient) CallOptions() *vkit.CallOptions { + return g.raw.CallOptions +} + +func (g *grpcSpannerClient) Close() error { + return g.raw.Close() +} + +func (g *grpcSpannerClient) Connection() *grpc.ClientConn { + return g.raw.Connection() +} + +func (g *grpcSpannerClient) CreateSession(ctx context.Context, req *spannerpb.CreateSessionRequest, opts ...gax.CallOption) (*spannerpb.Session, error) { + mt := g.newBuiltinMetricsTracer(ctx) + defer recordOperationCompletion(mt) + ctx = context.WithValue(ctx, metricsTracerKey, mt) + resp, err := g.raw.CreateSession(ctx, req, opts...) + statusCode, _ := status.FromError(err) + mt.currOp.setStatus(statusCode.Code().String()) + return resp, err +} + +func (g *grpcSpannerClient) BatchCreateSessions(ctx context.Context, req *spannerpb.BatchCreateSessionsRequest, opts ...gax.CallOption) (*spannerpb.BatchCreateSessionsResponse, error) { + mt := g.newBuiltinMetricsTracer(ctx) + defer recordOperationCompletion(mt) + ctx = context.WithValue(ctx, metricsTracerKey, mt) + resp, err := g.raw.BatchCreateSessions(ctx, req, opts...) + statusCode, _ := status.FromError(err) + mt.currOp.setStatus(statusCode.Code().String()) + return resp, err +} + +func (g *grpcSpannerClient) GetSession(ctx context.Context, req *spannerpb.GetSessionRequest, opts ...gax.CallOption) (*spannerpb.Session, error) { + mt := g.newBuiltinMetricsTracer(ctx) + defer recordOperationCompletion(mt) + ctx = context.WithValue(ctx, metricsTracerKey, mt) + resp, err := g.raw.GetSession(ctx, req, opts...) + statusCode, _ := status.FromError(err) + mt.currOp.setStatus(statusCode.Code().String()) + return resp, err +} + +func (g *grpcSpannerClient) ListSessions(ctx context.Context, req *spannerpb.ListSessionsRequest, opts ...gax.CallOption) *vkit.SessionIterator { + return g.raw.ListSessions(ctx, req, opts...) +} + +func (g *grpcSpannerClient) DeleteSession(ctx context.Context, req *spannerpb.DeleteSessionRequest, opts ...gax.CallOption) error { + mt := g.newBuiltinMetricsTracer(ctx) + defer recordOperationCompletion(mt) + ctx = context.WithValue(ctx, metricsTracerKey, mt) + err := g.raw.DeleteSession(ctx, req, opts...) + statusCode, _ := status.FromError(err) + mt.currOp.setStatus(statusCode.Code().String()) + return err +} + +func (g *grpcSpannerClient) ExecuteSql(ctx context.Context, req *spannerpb.ExecuteSqlRequest, opts ...gax.CallOption) (*spannerpb.ResultSet, error) { + mt := g.newBuiltinMetricsTracer(ctx) + defer recordOperationCompletion(mt) + ctx = context.WithValue(ctx, metricsTracerKey, mt) + resp, err := g.raw.ExecuteSql(ctx, req, opts...) 
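Every unary method on `grpcSpannerClient` repeats the same four steps: create a tracer, defer the operation record, thread the tracer through the context for the interceptor, and capture the final gRPC code. A hedged sketch of how that boilerplate could be expressed once with a generic helper; this is illustration only, not how the patch factors it:

```go
package rpcexample

import (
	"context"

	"google.golang.org/grpc/status"
)

type ctxKey string

type tracer struct{ op, status string }

func newTracer(op string) *tracer { return &tracer{op: op} }

// record stands in for recordOperationCompletion in the patch.
func record(t *tracer) { /* export t.op / t.status here */ }

func withTracer(ctx context.Context, t *tracer) context.Context {
	return context.WithValue(ctx, ctxKey("tracer"), t)
}

// instrument wraps a unary call: it creates a tracer, threads it through the
// context so interceptors can see it, records completion on return, and
// captures the final gRPC status code.
func instrument[Req, Resp any](ctx context.Context, op string,
	call func(context.Context, Req) (Resp, error), req Req) (Resp, error) {
	t := newTracer(op)
	defer record(t)
	resp, err := call(withTracer(ctx, t), req)
	st, _ := status.FromError(err)
	t.status = st.Code().String()
	return resp, err
}
```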
+ statusCode, _ := status.FromError(err) + mt.currOp.setStatus(statusCode.Code().String()) + return resp, err +} + +func (g *grpcSpannerClient) ExecuteStreamingSql(ctx context.Context, req *spannerpb.ExecuteSqlRequest, opts ...gax.CallOption) (spannerpb.Spanner_ExecuteStreamingSqlClient, error) { + return g.raw.ExecuteStreamingSql(peer.NewContext(ctx, &peer.Peer{}), req, opts...) +} + +func (g *grpcSpannerClient) ExecuteBatchDml(ctx context.Context, req *spannerpb.ExecuteBatchDmlRequest, opts ...gax.CallOption) (*spannerpb.ExecuteBatchDmlResponse, error) { + mt := g.newBuiltinMetricsTracer(ctx) + defer recordOperationCompletion(mt) + ctx = context.WithValue(ctx, metricsTracerKey, mt) + resp, err := g.raw.ExecuteBatchDml(ctx, req, opts...) + statusCode, _ := status.FromError(err) + mt.currOp.setStatus(statusCode.Code().String()) + return resp, err +} + +func (g *grpcSpannerClient) Read(ctx context.Context, req *spannerpb.ReadRequest, opts ...gax.CallOption) (*spannerpb.ResultSet, error) { + mt := g.newBuiltinMetricsTracer(ctx) + defer recordOperationCompletion(mt) + ctx = context.WithValue(ctx, metricsTracerKey, mt) + resp, err := g.raw.Read(ctx, req, opts...) + statusCode, _ := status.FromError(err) + mt.currOp.setStatus(statusCode.Code().String()) + return resp, err +} + +func (g *grpcSpannerClient) StreamingRead(ctx context.Context, req *spannerpb.ReadRequest, opts ...gax.CallOption) (spannerpb.Spanner_StreamingReadClient, error) { + return g.raw.StreamingRead(peer.NewContext(ctx, &peer.Peer{}), req, opts...) +} + +func (g *grpcSpannerClient) BeginTransaction(ctx context.Context, req *spannerpb.BeginTransactionRequest, opts ...gax.CallOption) (*spannerpb.Transaction, error) { + mt := g.newBuiltinMetricsTracer(ctx) + defer recordOperationCompletion(mt) + ctx = context.WithValue(ctx, metricsTracerKey, mt) + resp, err := g.raw.BeginTransaction(ctx, req, opts...) + statusCode, _ := status.FromError(err) + mt.currOp.setStatus(statusCode.Code().String()) + return resp, err +} + +func (g *grpcSpannerClient) Commit(ctx context.Context, req *spannerpb.CommitRequest, opts ...gax.CallOption) (*spannerpb.CommitResponse, error) { + mt := g.newBuiltinMetricsTracer(ctx) + defer recordOperationCompletion(mt) + ctx = context.WithValue(ctx, metricsTracerKey, mt) + resp, err := g.raw.Commit(ctx, req, opts...) + statusCode, _ := status.FromError(err) + mt.currOp.setStatus(statusCode.Code().String()) + return resp, err +} + +func (g *grpcSpannerClient) Rollback(ctx context.Context, req *spannerpb.RollbackRequest, opts ...gax.CallOption) error { + mt := g.newBuiltinMetricsTracer(ctx) + defer recordOperationCompletion(mt) + ctx = context.WithValue(ctx, metricsTracerKey, mt) + err := g.raw.Rollback(ctx, req, opts...) + statusCode, _ := status.FromError(err) + mt.currOp.setStatus(statusCode.Code().String()) + return err +} + +func (g *grpcSpannerClient) PartitionQuery(ctx context.Context, req *spannerpb.PartitionQueryRequest, opts ...gax.CallOption) (*spannerpb.PartitionResponse, error) { + mt := g.newBuiltinMetricsTracer(ctx) + defer recordOperationCompletion(mt) + ctx = context.WithValue(ctx, metricsTracerKey, mt) + resp, err := g.raw.PartitionQuery(ctx, req, opts...) 
+ statusCode, _ := status.FromError(err) + mt.currOp.setStatus(statusCode.Code().String()) + return resp, err +} + +func (g *grpcSpannerClient) PartitionRead(ctx context.Context, req *spannerpb.PartitionReadRequest, opts ...gax.CallOption) (*spannerpb.PartitionResponse, error) { + mt := g.newBuiltinMetricsTracer(ctx) + defer recordOperationCompletion(mt) + ctx = context.WithValue(ctx, metricsTracerKey, mt) + resp, err := g.raw.PartitionRead(ctx, req, opts...) + statusCode, _ := status.FromError(err) + mt.currOp.setStatus(statusCode.Code().String()) + return resp, err +} + +func (g *grpcSpannerClient) BatchWrite(ctx context.Context, req *spannerpb.BatchWriteRequest, opts ...gax.CallOption) (spannerpb.Spanner_BatchWriteClient, error) { + return g.raw.BatchWrite(peer.NewContext(ctx, &peer.Peer{}), req, opts...) +} diff --git a/spanner/integration_test.go b/spanner/integration_test.go index 71c325fb72c0..afcd835de881 100644 --- a/spanner/integration_test.go +++ b/spanner/integration_test.go @@ -63,9 +63,6 @@ import ( ) const ( - directPathIPV6Prefix = "[2001:4860:8040" - directPathIPV4Prefix = "34.126" - singerDDLStatements = "SINGER_DDL_STATEMENTS" simpleDDLStatements = "SIMPLE_DDL_STATEMENTS" readDDLStatements = "READ_DDL_STATEMENTS" diff --git a/spanner/metric_monitoring_exporter_test.go b/spanner/metric_monitoring_exporter_test.go new file mode 100644 index 000000000000..6dea3eff1b08 --- /dev/null +++ b/spanner/metric_monitoring_exporter_test.go @@ -0,0 +1,126 @@ +/* +Copyright 2024 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package spanner + +import ( + "context" + "net" + "strings" + "sync" + "time" + + "cloud.google.com/go/monitoring/apiv3/v2/monitoringpb" + metricpb "google.golang.org/genproto/googleapis/api/metric" + "google.golang.org/grpc" + "google.golang.org/grpc/keepalive" + "google.golang.org/grpc/metadata" + "google.golang.org/protobuf/types/known/emptypb" +) + +type MetricsTestServer struct { + lis net.Listener + srv *grpc.Server + Endpoint string + userAgent string + createMetricDescriptorReqs []*monitoringpb.CreateMetricDescriptorRequest + createServiceTimeSeriesReqs []*monitoringpb.CreateTimeSeriesRequest + RetryCount int + mu sync.Mutex +} + +func (m *MetricsTestServer) Shutdown() { + // this will close mts.lis + m.srv.GracefulStop() +} + +// Pops out the UserAgent from the most recent CreateTimeSeriesRequests or CreateServiceTimeSeriesRequests. +func (m *MetricsTestServer) UserAgent() string { + m.mu.Lock() + defer m.mu.Unlock() + ua := m.userAgent + m.userAgent = "" + return ua +} + +// Pops out the CreateServiceTimeSeriesRequests which the test server has received so far. 
+func (m *MetricsTestServer) CreateServiceTimeSeriesRequests() []*monitoringpb.CreateTimeSeriesRequest { + m.mu.Lock() + defer m.mu.Unlock() + reqs := m.createServiceTimeSeriesReqs + m.createServiceTimeSeriesReqs = nil + return reqs +} + +func (m *MetricsTestServer) appendCreateMetricDescriptorReq(ctx context.Context, req *monitoringpb.CreateMetricDescriptorRequest) { + m.mu.Lock() + defer m.mu.Unlock() + m.createMetricDescriptorReqs = append(m.createMetricDescriptorReqs, req) +} + +func (m *MetricsTestServer) appendCreateServiceTimeSeriesReq(ctx context.Context, req *monitoringpb.CreateTimeSeriesRequest) { + m.mu.Lock() + defer m.mu.Unlock() + m.createServiceTimeSeriesReqs = append(m.createServiceTimeSeriesReqs, req) + if md, ok := metadata.FromIncomingContext(ctx); ok { + m.userAgent = strings.Join(md.Get("User-Agent"), ";") + } +} + +func (m *MetricsTestServer) Serve() error { + return m.srv.Serve(m.lis) +} + +type fakeMetricServiceServer struct { + monitoringpb.UnimplementedMetricServiceServer + metricsTestServer *MetricsTestServer +} + +func (f *fakeMetricServiceServer) CreateServiceTimeSeries( + ctx context.Context, + req *monitoringpb.CreateTimeSeriesRequest, +) (*emptypb.Empty, error) { + f.metricsTestServer.appendCreateServiceTimeSeriesReq(ctx, req) + return &emptypb.Empty{}, nil +} + +func (f *fakeMetricServiceServer) CreateMetricDescriptor( + ctx context.Context, + req *monitoringpb.CreateMetricDescriptorRequest, +) (*metricpb.MetricDescriptor, error) { + f.metricsTestServer.appendCreateMetricDescriptorReq(ctx, req) + return &metricpb.MetricDescriptor{}, nil +} + +func NewMetricTestServer() (*MetricsTestServer, error) { + srv := grpc.NewServer(grpc.KeepaliveParams(keepalive.ServerParameters{Time: 5 * time.Minute})) + lis, err := net.Listen("tcp", "localhost:0") + if err != nil { + return nil, err + } + testServer := &MetricsTestServer{ + Endpoint: lis.Addr().String(), + lis: lis, + srv: srv, + } + + monitoringpb.RegisterMetricServiceServer( + srv, + &fakeMetricServiceServer{metricsTestServer: testServer}, + ) + + return testServer, nil +} diff --git a/spanner/metrics.go b/spanner/metrics.go new file mode 100644 index 000000000000..30c369bf0838 --- /dev/null +++ b/spanner/metrics.go @@ -0,0 +1,473 @@ +/* +Copyright 2024 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
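`NewMetricTestServer` gives tests an in-process stand-in for Cloud Monitoring. A condensed sketch of the wiring, assuming it runs inside a test in this package (the real test at the end of this patch does the equivalent):

```go
// Start the fake Cloud Monitoring server and redirect the built-in exporter
// to it, with authentication disabled and plaintext gRPC.
srv, err := NewMetricTestServer()
if err != nil {
	t.Fatal(err)
}
go srv.Serve()
defer srv.Shutdown()

exporterOpts = []option.ClientOption{
	option.WithEndpoint(srv.Endpoint),
	option.WithoutAuthentication(),
	option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())),
}

// ... exercise a Spanner client, then inspect
// srv.CreateServiceTimeSeriesRequests() to assert on what was exported.
```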
+*/ + +package spanner + +import ( + "context" + "errors" + "fmt" + "strings" + + "log" + "os" + "strconv" + "time" + + "github.com/google/uuid" + "go.opentelemetry.io/contrib/detectors/gcp" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/noop" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + semconv "go.opentelemetry.io/otel/semconv/v1.24.0" + "google.golang.org/api/option" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "cloud.google.com/go/spanner/internal" +) + +const ( + builtInMetricsMeterName = "gax-go" + + nativeMetricsPrefix = "spanner.googleapis.com/internal/client/" + + // Monitored resource labels + monitoredResLabelKeyProject = "project_id" + monitoredResLabelKeyInstance = "instance_id" + monitoredResLabelKeyInstanceConfig = "instance_config" + monitoredResLabelKeyLocation = "location" + monitoredResLabelKeyClientHash = "client_hash" + + // Metric labels + metricLabelKeyClientUID = "client_uid" + metricLabelKeyClientName = "client_name" + metricLabelKeyDatabase = "database" + metricLabelKeyMethod = "method" + metricLabelKeyStatus = "status" + metricLabelKeyDirectPathEnabled = "directpath_enabled" + metricLabelKeyDirectPathUsed = "directpath_used" + + // Metric names + metricNameOperationLatencies = "operation_latencies" + metricNameAttemptLatencies = "attempt_latencies" + metricNameOperationCount = "operation_count" + metricNameAttemptCount = "attempt_count" + + // Metric units + metricUnitMS = "ms" + metricUnitCount = "1" +) + +// These are effectively const, but for testing purposes they are mutable +var ( + // duration between two metric exports + defaultSamplePeriod = 1 * time.Minute + + clientName = fmt.Sprintf("spanner-go/%v", internal.Version) + + bucketBounds = []float64{0.0, 0.5, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, + 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0, 18.0, 19.0, 20.0, + 25.0, 30.0, 40.0, 50.0, 65.0, 80.0, 100.0, 130.0, 160.0, 200.0, 250.0, 300.0, 400.0, 500.0, 650.0, + 800.0, 1000.0, 2000.0, 5000.0, 10000.0, 20000.0, 50000.0, 100000.0, 200000.0, + 400000.0, 800000.0, 1600000.0, 3200000.0} + + // All the built-in metrics have same attributes except 'status' and 'streaming' + // These attributes need to be added to only few of the metrics + metricsDetails = map[string]metricInfo{ + metricNameOperationCount: { + additionalAttrs: []string{ + metricLabelKeyStatus, + }, + recordedPerAttempt: false, + }, + metricNameOperationLatencies: { + additionalAttrs: []string{ + metricLabelKeyStatus, + }, + recordedPerAttempt: false, + }, + metricNameAttemptLatencies: { + additionalAttrs: []string{ + metricLabelKeyStatus, + }, + recordedPerAttempt: true, + }, + metricNameAttemptCount: { + additionalAttrs: []string{ + metricLabelKeyStatus, + }, + recordedPerAttempt: true, + }, + } + + // Generates unique client ID in the format go-@ + generateClientUID = func() (string, error) { + hostname := "localhost" + hostname, err := os.Hostname() + if err != nil { + return "", err + } + return uuid.NewString() + "@" + strconv.FormatInt(int64(os.Getpid()), 10) + "@" + hostname, nil + } + + exporterOpts = []option.ClientOption{} +) + +type metricInfo struct { + additionalAttrs []string + recordedPerAttempt bool +} + +// builtinMetricsTracerFactory is responsible for creating and managing metrics tracers. +type builtinMetricsTracerFactory struct { + enabled bool // Indicates if metrics tracing is enabled. + isDirectPathEnabled bool // Indicates if DirectPath is enabled. 
+ + // shutdown is a function to be called on client close to clean up resources. + shutdown func() + + // clientAttributes are attributes specific to a client instance that do not change across different function calls on the client. + clientAttributes []attribute.KeyValue + + // Metrics instruments + operationLatencies metric.Float64Histogram // Histogram for operation latencies. + attemptLatencies metric.Float64Histogram // Histogram for attempt latencies. + operationCount metric.Int64Counter // Counter for the number of operations. + attemptCount metric.Int64Counter // Counter for the number of attempts. +} + +func detectClientLocation(ctx context.Context) string { + resource, err := gcp.NewDetector().Detect(ctx) + if err != nil { + return "global" + } + for _, attr := range resource.Attributes() { + if attr.Key == semconv.CloudRegionKey { + return attr.Value.AsString() + } + } + // If region is not found, return global + return "global" +} + +func newBuiltinMetricsTracerFactory(ctx context.Context, dbpath string, metricsProvider metric.MeterProvider) (*builtinMetricsTracerFactory, error) { + clientUID, err := generateClientUID() + if err != nil { + log.Printf("built-in metrics: generateClientUID failed: %v. Using empty string in the %v metric atteribute", err, metricLabelKeyClientUID) + } + project, instance, database, err := parseDatabaseName(dbpath) + if err != nil { + return nil, err + } + + tracerFactory := &builtinMetricsTracerFactory{ + enabled: false, + clientAttributes: []attribute.KeyValue{ + attribute.String(monitoredResLabelKeyProject, project), + attribute.String(monitoredResLabelKeyInstance, instance), + attribute.String(metricLabelKeyDatabase, database), + attribute.String(metricLabelKeyClientUID, clientUID), + attribute.String(metricLabelKeyClientName, clientName), + attribute.String(monitoredResLabelKeyClientHash, "cloud_spanner_client_raw_metrics"), + // Skipping instance config until we have a way to get it + attribute.String(monitoredResLabelKeyInstanceConfig, "unknown"), + attribute.String(monitoredResLabelKeyLocation, detectClientLocation(ctx)), + }, + shutdown: func() {}, + } + + tracerFactory.isDirectPathEnabled = false + tracerFactory.enabled = false + var meterProvider *sdkmetric.MeterProvider + if metricsProvider == nil { + // Create default meter provider + mpOptions, err := builtInMeterProviderOptions(project) + if err != nil { + return tracerFactory, err + } + meterProvider = sdkmetric.NewMeterProvider(mpOptions...) + + tracerFactory.enabled = true + tracerFactory.shutdown = func() { meterProvider.Shutdown(ctx) } + } else { + switch metricsProvider.(type) { + case noop.MeterProvider: + return tracerFactory, nil + default: + return tracerFactory, errors.New("unknown MetricsProvider type") + } + } + + // Create meter and instruments + meter := meterProvider.Meter(builtInMetricsMeterName, metric.WithInstrumentationVersion(internal.Version)) + err = tracerFactory.createInstruments(meter) + return tracerFactory, err +} + +func builtInMeterProviderOptions(project string) ([]sdkmetric.Option, error) { + defaultExporter, err := newMonitoringExporter(context.Background(), project, exporterOpts...) 
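`newBuiltinMetricsTracerFactory` accepts three shapes of meter provider: `nil` selects the default SDK provider and enables export, a `noop.MeterProvider` returns a disabled factory, and anything else is rejected. A caller-side sketch of those paths (the database path and error handling are illustrative):

```go
// In-package sketch only.
ctx := context.Background()
db := "projects/p/instances/i/databases/d"

// nil provider: the factory builds the default SDK MeterProvider (the Cloud
// Monitoring exporter needs Application Default Credentials) and enables export.
enabledFactory, err := newBuiltinMetricsTracerFactory(ctx, db, nil)
_ = err
_ = enabledFactory // enabledFactory.enabled == true on success

// noop provider: metrics stay disabled and no exporter is created.
disabledFactory, _ := newBuiltinMetricsTracerFactory(ctx, db, noop.NewMeterProvider())
_ = disabledFactory // disabledFactory.enabled == false

// Any other MeterProvider implementation is rejected with
// "unknown MetricsProvider type".
```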
+ if err != nil { + return nil, err + } + + return []sdkmetric.Option{sdkmetric.WithReader( + sdkmetric.NewPeriodicReader( + defaultExporter, + sdkmetric.WithInterval(defaultSamplePeriod), + ), + )}, nil +} + +func (tf *builtinMetricsTracerFactory) createInstruments(meter metric.Meter) error { + var err error + + // Create operation_latencies + tf.operationLatencies, err = meter.Float64Histogram( + nativeMetricsPrefix+metricNameOperationLatencies, + metric.WithDescription("Total time until final operation success or failure, including retries and backoff."), + metric.WithUnit(metricUnitMS), + metric.WithExplicitBucketBoundaries(bucketBounds...), + ) + if err != nil { + return err + } + + // Create attempt_latencies + tf.attemptLatencies, err = meter.Float64Histogram( + nativeMetricsPrefix+metricNameAttemptLatencies, + metric.WithDescription("Client observed latency per RPC attempt."), + metric.WithUnit(metricUnitMS), + metric.WithExplicitBucketBoundaries(bucketBounds...), + ) + if err != nil { + return err + } + + // Create operation_count + tf.operationCount, err = meter.Int64Counter( + nativeMetricsPrefix+metricNameOperationCount, + metric.WithDescription("The count of database operations."), + metric.WithUnit(metricUnitCount), + ) + + // Create attempt_count + tf.attemptCount, err = meter.Int64Counter( + nativeMetricsPrefix+metricNameAttemptCount, + metric.WithDescription("The number of attempts made for the operation, including the initial attempt."), + metric.WithUnit(metricUnitCount), + ) + return err +} + +// builtinMetricsTracer is created one per operation. +// It is used to store metric instruments, attribute values, and other data required to obtain and record them. +type builtinMetricsTracer struct { + ctx context.Context // Context for the tracer. + builtInEnabled bool // Indicates if built-in metrics are enabled. + + // clientAttributes are attributes specific to a client instance that do not change across different operations on the client. + clientAttributes []attribute.KeyValue + + // Metrics instruments + instrumentOperationLatencies metric.Float64Histogram // Histogram for operation latencies. + instrumentAttemptLatencies metric.Float64Histogram // Histogram for attempt latencies. + instrumentOperationCount metric.Int64Counter // Counter for the number of operations. + instrumentAttemptCount metric.Int64Counter // Counter for the number of attempts. + + method string // The method being traced. + + currOp *opTracer // The current operation tracer. +} + +// opTracer is used to record metrics for the entire operation, including retries. +// An operation is a logical unit that represents a single method invocation on the client. +// The method might require multiple attempts/RPCs and backoff logic to complete. +type opTracer struct { + attemptCount int64 // The number of attempts made for the operation. + startTime time.Time // The start time of the operation. + + // status is the gRPC status code of the last completed attempt. + status string + + directPathEnabled bool // Indicates if DirectPath is enabled for the operation. + + currAttempt *attemptTracer // The current attempt tracer. +} + +// attemptTracer is used to record metrics for a single attempt within an operation. +type attemptTracer struct { + startTime time.Time // The start time of the attempt. + status string // The gRPC status code of the attempt. + + directPathUsed bool // Indicates if DirectPath was used for the attempt. +} + +// setStartTime sets the start time for the operation. 
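The tracer types above are driven in a fixed sequence: one `opTracer` per client operation, one `attemptTracer` per RPC attempt, with the record helpers called at each completion. A sketch of that lifecycle using the names from this patch (`factory` is assumed to be a `*builtinMetricsTracerFactory`):

```go
// Operation starts: one tracer per client call.
mt := factory.createBuiltinMetricsTracer(ctx)
mt.method = "/google.spanner.v1.Spanner/ExecuteSql"

// For every RPC attempt (driven by the gRPC interceptors above):
mt.currOp.incrementAttemptCount()
mt.currOp.currAttempt = &attemptTracer{}
mt.currOp.currAttempt.setStartTime(time.Now())
// ... attempt finishes ...
mt.currOp.currAttempt.setStatus("OK")
recordAttemptCompletion(&mt)

// When the operation (including any retries) is done:
mt.currOp.setStatus("OK")
recordOperationCompletion(&mt)
```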
+func (o *opTracer) setStartTime(t time.Time) { + o.startTime = t +} + +// setStartTime sets the start time for the attempt. +func (a *attemptTracer) setStartTime(t time.Time) { + a.startTime = t +} + +// setStatus sets the status for the operation. +func (o *opTracer) setStatus(s string) { + o.status = s +} + +// setStatus sets the status for the attempt. +func (a *attemptTracer) setStatus(s string) { + a.status = s +} + +// incrementAttemptCount increments the attempt count for the operation. +func (o *opTracer) incrementAttemptCount() { + o.attemptCount++ +} + +// setDirectPathUsed sets whether DirectPath was used for the attempt. +func (a *attemptTracer) setDirectPathUsed(used bool) { + a.directPathUsed = used +} + +// setDirectPathEnabled sets whether DirectPath is enabled for the operation. +func (o *opTracer) setDirectPathEnabled(enabled bool) { + o.directPathEnabled = enabled +} + +func (tf *builtinMetricsTracerFactory) createBuiltinMetricsTracer(ctx context.Context) builtinMetricsTracer { + // Operation has started but not the attempt. + // So, create only operation tracer and not attempt tracer + currOpTracer := opTracer{} + currOpTracer.setStartTime(time.Now()) + currOpTracer.setDirectPathEnabled(tf.isDirectPathEnabled) + + return builtinMetricsTracer{ + ctx: ctx, + builtInEnabled: tf.enabled, + + currOp: &currOpTracer, + clientAttributes: tf.clientAttributes, + + instrumentOperationLatencies: tf.operationLatencies, + instrumentAttemptLatencies: tf.attemptLatencies, + instrumentOperationCount: tf.operationCount, + instrumentAttemptCount: tf.attemptCount, + } +} + +// toOtelMetricAttrs: +// - converts metric attributes values captured throughout the operation / attempt +// to OpenTelemetry attributes format, +// - combines these with common client attributes and returns +func (mt *builtinMetricsTracer) toOtelMetricAttrs(metricName string) ([]attribute.KeyValue, error) { + // Create attribute key value pairs for attributes common to all metricss + attrKeyValues := []attribute.KeyValue{ + attribute.String(metricLabelKeyMethod, strings.ReplaceAll(strings.TrimPrefix(mt.method, "/google.spanner.v1."), "/", ".")), + } + attrKeyValues = append(attrKeyValues, mt.clientAttributes...) 
+ + // Get metric details + mDetails, found := metricsDetails[metricName] + if !found { + return attrKeyValues, fmt.Errorf("unable to create attributes list for unknown metric: %v", metricName) + } + attrKeyValues = append(attrKeyValues, attribute.String(metricLabelKeyDirectPathEnabled, strconv.FormatBool(mt.currOp.directPathEnabled))) + attrKeyValues = append(attrKeyValues, attribute.String(metricLabelKeyDirectPathUsed, strconv.FormatBool(mt.currOp.currAttempt.directPathUsed))) + + rpcStatus := mt.currOp.status + if mDetails.recordedPerAttempt { + rpcStatus = mt.currOp.currAttempt.status + } + + // Add additional attributes to metrics + for _, attrKey := range mDetails.additionalAttrs { + switch attrKey { + case metricLabelKeyStatus: + attrKeyValues = append(attrKeyValues, attribute.String(metricLabelKeyStatus, rpcStatus)) + default: + return attrKeyValues, fmt.Errorf("unknown additional attribute: %v", attrKey) + } + } + + return attrKeyValues, nil +} + +// Convert error to grpc status error +func convertToGrpcStatusErr(err error) (codes.Code, error) { + if err == nil { + return codes.OK, nil + } + + if errStatus, ok := status.FromError(err); ok { + return errStatus.Code(), status.Error(errStatus.Code(), errStatus.Message()) + } + + ctxStatus := status.FromContextError(err) + if ctxStatus.Code() != codes.Unknown { + return ctxStatus.Code(), status.Error(ctxStatus.Code(), ctxStatus.Message()) + } + + return codes.Unknown, err +} + +// recordAttemptCompletion records as many attempt specific metrics as it can +// Ignore errors seen while creating metric attributes since metric can still +// be recorded with rest of the attributes +func recordAttemptCompletion(mt *builtinMetricsTracer) { + if !mt.builtInEnabled { + return + } + + // Calculate elapsed time + elapsedTime := convertToMs(time.Since(mt.currOp.currAttempt.startTime)) + + // Record attempt_latencies + attemptLatAttrs, _ := mt.toOtelMetricAttrs(metricNameAttemptLatencies) + mt.instrumentAttemptLatencies.Record(mt.ctx, elapsedTime, metric.WithAttributes(attemptLatAttrs...)) +} + +// recordOperationCompletion records as many operation specific metrics as it can +// Ignores error seen while creating metric attributes since metric can still +// be recorded with rest of the attributes +func recordOperationCompletion(mt *builtinMetricsTracer) { + if !mt.builtInEnabled { + return + } + + // Calculate elapsed time + elapsedTimeMs := convertToMs(time.Since(mt.currOp.startTime)) + + // Record operation_count + opCntAttrs, _ := mt.toOtelMetricAttrs(metricNameOperationCount) + mt.instrumentOperationCount.Add(mt.ctx, 1, metric.WithAttributes(opCntAttrs...)) + + // Record operation_latencies + opLatAttrs, _ := mt.toOtelMetricAttrs(metricNameOperationLatencies) + mt.instrumentOperationLatencies.Record(mt.ctx, elapsedTimeMs, metric.WithAttributes(opLatAttrs...)) + + // Record attempt_count + attemptCntAttrs, _ := mt.toOtelMetricAttrs(metricNameAttemptCount) + mt.instrumentAttemptCount.Add(mt.ctx, mt.currOp.attemptCount, metric.WithAttributes(attemptCntAttrs...)) +} + +func convertToMs(d time.Duration) float64 { + return float64(d.Nanoseconds()) / float64(time.Millisecond) +} diff --git a/spanner/metrics_monitoring_exporter.go b/spanner/metrics_monitoring_exporter.go new file mode 100644 index 000000000000..61e9a704ec61 --- /dev/null +++ b/spanner/metrics_monitoring_exporter.go @@ -0,0 +1,346 @@ +/* +Copyright 2024 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the 
License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// This is a modified version of https://github.com/GoogleCloudPlatform/opentelemetry-operations-go/blob/exporter/metric/v0.46.0/exporter/metric/metric.go + +package spanner + +import ( + "context" + "errors" + "fmt" + "math" + "reflect" + "sync" + "time" + + monitoring "cloud.google.com/go/monitoring/apiv3/v2" + "cloud.google.com/go/monitoring/apiv3/v2/monitoringpb" + "go.opentelemetry.io/otel/attribute" + otelmetric "go.opentelemetry.io/otel/sdk/metric" + otelmetricdata "go.opentelemetry.io/otel/sdk/metric/metricdata" + "google.golang.org/api/option" + "google.golang.org/genproto/googleapis/api/distribution" + googlemetricpb "google.golang.org/genproto/googleapis/api/metric" + monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres" + "google.golang.org/protobuf/types/known/timestamppb" +) + +const ( + spannerResourceType = "spanner_instance_client" + + // The number of timeserieses to send to Google Cloud Monitoring in a single request. This + // is a hard limit in the GCM API, so we never want to exceed 200. + // https://cloud.google.com/monitoring/api/ref_v3/rest/v3/projects.timeSeries/create + sendBatchSize = 200 +) + +var ( + monitoredResLabelsSet = map[string]bool{ + monitoredResLabelKeyProject: true, + monitoredResLabelKeyInstance: true, + monitoredResLabelKeyInstanceConfig: true, + monitoredResLabelKeyLocation: true, + monitoredResLabelKeyClientHash: true, + } + + errShutdown = fmt.Errorf("exporter is shutdown") +) + +type errUnexpectedAggregationKind struct { + kind string +} + +func (e errUnexpectedAggregationKind) Error() string { + return fmt.Sprintf("the metric kind is unexpected: %v", e.kind) +} + +// monitoringExporter is the implementation of OpenTelemetry metric exporter for +// Google Cloud Monitoring. +// Default exporter for built-in metrics +type monitoringExporter struct { + shutdown chan struct{} + client *monitoring.MetricClient + shutdownOnce sync.Once + projectID string +} + +func newMonitoringExporter(ctx context.Context, project string, opts ...option.ClientOption) (*monitoringExporter, error) { + client, err := monitoring.NewMetricClient(ctx, opts...) + if err != nil { + return nil, err + } + return &monitoringExporter{ + client: client, + shutdown: make(chan struct{}), + projectID: project, + }, nil +} + +// ForceFlush does nothing, the exporter holds no state. +func (e *monitoringExporter) ForceFlush(ctx context.Context) error { return ctx.Err() } + +// Shutdown shuts down the client connections. +func (e *monitoringExporter) Shutdown(ctx context.Context) error { + err := errShutdown + e.shutdownOnce.Do(func() { + close(e.shutdown) + err = errors.Join(ctx.Err(), e.client.Close()) + }) + return err +} + +// Export exports OpenTelemetry Metrics to Google Cloud Monitoring. +func (me *monitoringExporter) Export(ctx context.Context, rm *otelmetricdata.ResourceMetrics) error { + select { + case <-me.shutdown: + return errShutdown + default: + } + + return me.exportTimeSeries(ctx, rm) +} + +// Temporality returns the Temporality to use for an instrument kind. 
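Together with the `Temporality` and `Aggregation` methods that follow, `Export`, `ForceFlush`, and `Shutdown` are exactly the contract the SDK's `PeriodicReader` consumes, which is why the exporter can be passed straight to `sdkmetric.NewPeriodicReader` earlier in this patch. A compile-time assertion (not in the patch) that makes the relationship explicit, using this file's `otelmetric` import alias:

```go
// Ensure monitoringExporter satisfies the OpenTelemetry SDK exporter
// interface expected by sdkmetric.NewPeriodicReader.
var _ otelmetric.Exporter = (*monitoringExporter)(nil)
```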
+func (me *monitoringExporter) Temporality(ik otelmetric.InstrumentKind) otelmetricdata.Temporality { + return otelmetricdata.CumulativeTemporality +} + +// Aggregation returns the Aggregation to use for an instrument kind. +func (me *monitoringExporter) Aggregation(ik otelmetric.InstrumentKind) otelmetric.Aggregation { + return otelmetric.DefaultAggregationSelector(ik) +} + +// exportTimeSeries create TimeSeries from the records in cps. +// res should be the common resource among all TimeSeries, such as instance id, application name and so on. +func (me *monitoringExporter) exportTimeSeries(ctx context.Context, rm *otelmetricdata.ResourceMetrics) error { + tss, err := me.recordsToTimeSeriesPbs(rm) + if len(tss) == 0 { + return err + } + + name := fmt.Sprintf("projects/%s", me.projectID) + + errs := []error{err} + for i := 0; i < len(tss); i += sendBatchSize { + j := i + sendBatchSize + if j >= len(tss) { + j = len(tss) + } + + req := &monitoringpb.CreateTimeSeriesRequest{ + Name: name, + TimeSeries: tss[i:j], + } + errs = append(errs, me.client.CreateServiceTimeSeries(ctx, req)) + } + + return errors.Join(errs...) +} + +// recordToMetricAndMonitoredResourcePbs converts data from records to Metric and Monitored resource proto type for Cloud Monitoring. +func (me *monitoringExporter) recordToMetricAndMonitoredResourcePbs(metrics otelmetricdata.Metrics, attributes attribute.Set) (*googlemetricpb.Metric, *monitoredrespb.MonitoredResource) { + mr := &monitoredrespb.MonitoredResource{ + Type: spannerResourceType, + Labels: map[string]string{}, + } + labels := make(map[string]string) + addAttributes := func(attr *attribute.Set) { + iter := attr.Iter() + for iter.Next() { + kv := iter.Attribute() + labelKey := string(kv.Key) + + if _, isResLabel := monitoredResLabelsSet[labelKey]; isResLabel { + // Add labels to monitored resource + mr.Labels[labelKey] = kv.Value.Emit() + } else { + // Add labels to metric + labels[labelKey] = kv.Value.Emit() + + } + } + } + addAttributes(&attributes) + return &googlemetricpb.Metric{ + Type: metrics.Name, + Labels: labels, + }, mr +} + +func (me *monitoringExporter) recordsToTimeSeriesPbs(rm *otelmetricdata.ResourceMetrics) ([]*monitoringpb.TimeSeries, error) { + var ( + tss []*monitoringpb.TimeSeries + errs []error + ) + for _, scope := range rm.ScopeMetrics { + if scope.Scope.Name != builtInMetricsMeterName { + // Filter out metric data for instruments that are not part of the spanner builtin metrics + continue + } + for _, metrics := range scope.Metrics { + ts, err := me.recordToTimeSeriesPb(metrics) + errs = append(errs, err) + tss = append(tss, ts...) + } + } + + return tss, errors.Join(errs...) +} + +// recordToTimeSeriesPb converts record to TimeSeries proto type with common resource. +// ref. 
https://cloud.google.com/monitoring/api/ref_v3/rest/v3/TimeSeries +func (me *monitoringExporter) recordToTimeSeriesPb(m otelmetricdata.Metrics) ([]*monitoringpb.TimeSeries, error) { + var tss []*monitoringpb.TimeSeries + var errs []error + if m.Data == nil { + return nil, nil + } + switch a := m.Data.(type) { + case otelmetricdata.Histogram[float64]: + for _, point := range a.DataPoints { + metric, mr := me.recordToMetricAndMonitoredResourcePbs(m, point.Attributes) + ts, err := histogramToTimeSeries(point, m, mr) + if err != nil { + errs = append(errs, err) + continue + } + ts.Metric = metric + tss = append(tss, ts) + } + case otelmetricdata.Sum[int64]: + for _, point := range a.DataPoints { + metric, mr := me.recordToMetricAndMonitoredResourcePbs(m, point.Attributes) + var ts *monitoringpb.TimeSeries + var err error + ts, err = sumToTimeSeries[int64](point, m, mr) + if err != nil { + errs = append(errs, err) + continue + } + ts.Metric = metric + tss = append(tss, ts) + } + default: + errs = append(errs, errUnexpectedAggregationKind{kind: reflect.TypeOf(m.Data).String()}) + } + return tss, errors.Join(errs...) +} + +func sumToTimeSeries[N int64 | float64](point otelmetricdata.DataPoint[N], metrics otelmetricdata.Metrics, mr *monitoredrespb.MonitoredResource) (*monitoringpb.TimeSeries, error) { + interval, err := toNonemptyTimeIntervalpb(point.StartTime, point.Time) + if err != nil { + return nil, err + } + value, valueType := numberDataPointToValue[N](point) + return &monitoringpb.TimeSeries{ + Resource: mr, + Unit: string(metrics.Unit), + MetricKind: googlemetricpb.MetricDescriptor_CUMULATIVE, + ValueType: valueType, + Points: []*monitoringpb.Point{{ + Interval: interval, + Value: value, + }}, + }, nil +} + +func histogramToTimeSeries[N int64 | float64](point otelmetricdata.HistogramDataPoint[N], metrics otelmetricdata.Metrics, mr *monitoredrespb.MonitoredResource) (*monitoringpb.TimeSeries, error) { + interval, err := toNonemptyTimeIntervalpb(point.StartTime, point.Time) + if err != nil { + return nil, err + } + distributionValue := histToDistribution(point) + return &monitoringpb.TimeSeries{ + Resource: mr, + Unit: string(metrics.Unit), + MetricKind: googlemetricpb.MetricDescriptor_CUMULATIVE, + ValueType: googlemetricpb.MetricDescriptor_DISTRIBUTION, + Points: []*monitoringpb.Point{{ + Interval: interval, + Value: &monitoringpb.TypedValue{ + Value: &monitoringpb.TypedValue_DistributionValue{ + DistributionValue: distributionValue, + }, + }, + }}, + }, nil +} + +func toNonemptyTimeIntervalpb(start, end time.Time) (*monitoringpb.TimeInterval, error) { + // The end time of a new interval must be at least a millisecond after the end time of the + // previous interval, for all non-gauge types. 
+ // https://cloud.google.com/monitoring/api/ref_v3/rpc/google.monitoring.v3#timeinterval + if end.Sub(start).Milliseconds() <= 1 { + end = start.Add(time.Millisecond) + } + startpb := timestamppb.New(start) + endpb := timestamppb.New(end) + err := errors.Join( + startpb.CheckValid(), + endpb.CheckValid(), + ) + if err != nil { + return nil, err + } + + return &monitoringpb.TimeInterval{ + StartTime: startpb, + EndTime: endpb, + }, nil +} + +func histToDistribution[N int64 | float64](hist otelmetricdata.HistogramDataPoint[N]) *distribution.Distribution { + counts := make([]int64, len(hist.BucketCounts)) + for i, v := range hist.BucketCounts { + counts[i] = int64(v) + } + var mean float64 + if !math.IsNaN(float64(hist.Sum)) && hist.Count > 0 { // Avoid divide-by-zero + mean = float64(hist.Sum) / float64(hist.Count) + } + return &distribution.Distribution{ + Count: int64(hist.Count), + Mean: mean, + BucketCounts: counts, + BucketOptions: &distribution.Distribution_BucketOptions{ + Options: &distribution.Distribution_BucketOptions_ExplicitBuckets{ + ExplicitBuckets: &distribution.Distribution_BucketOptions_Explicit{ + Bounds: hist.Bounds, + }, + }, + }, + } +} + +func numberDataPointToValue[N int64 | float64]( + point otelmetricdata.DataPoint[N], +) (*monitoringpb.TypedValue, googlemetricpb.MetricDescriptor_ValueType) { + switch v := any(point.Value).(type) { + case int64: + return &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_Int64Value{ + Int64Value: v, + }}, + googlemetricpb.MetricDescriptor_INT64 + case float64: + return &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_DoubleValue{ + DoubleValue: v, + }}, + googlemetricpb.MetricDescriptor_DOUBLE + } + // It is impossible to reach this statement + return nil, googlemetricpb.MetricDescriptor_INT64 +} diff --git a/spanner/metrics_test.go b/spanner/metrics_test.go new file mode 100644 index 000000000000..fac8c8ed9601 --- /dev/null +++ b/spanner/metrics_test.go @@ -0,0 +1,237 @@ +/* +Copyright 2024 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package spanner + +import ( + "context" + "os" + "sort" + "testing" + + "time" + + "github.com/google/go-cmp/cmp/cmpopts" + "go.opentelemetry.io/otel/attribute" + "google.golang.org/api/option" + "google.golang.org/genproto/googleapis/api/metric" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" + + "cloud.google.com/go/internal/testutil" + . 
"cloud.google.com/go/spanner/internal/testutil" +) + +func TestNewBuiltinMetricsTracerFactory(t *testing.T) { + os.Setenv("SPANNER_ENABLE_BUILTIN_METRICS", "true") + defer os.Unsetenv("SPANNER_ENABLE_BUILTIN_METRICS") + ctx := context.Background() + project := "test-project" + instance := "test-instance" + clientUID := "test-uid" + createSessionRPC := "Spanner.BatchCreateSessions" + if isMultiplexEnabled { + createSessionRPC = "Spanner.CreateSession" + } + + wantClientAttributes := []attribute.KeyValue{ + attribute.String(monitoredResLabelKeyProject, project), + attribute.String(monitoredResLabelKeyInstance, instance), + attribute.String(metricLabelKeyDatabase, "[DATABASE]"), + attribute.String(metricLabelKeyClientUID, clientUID), + attribute.String(metricLabelKeyClientName, clientName), + attribute.String(monitoredResLabelKeyClientHash, "cloud_spanner_client_raw_metrics"), + attribute.String(monitoredResLabelKeyInstanceConfig, "unknown"), + attribute.String(monitoredResLabelKeyLocation, "global"), + } + wantMetricNamesStdout := []string{metricNameAttemptCount, metricNameAttemptLatencies, metricNameOperationCount, metricNameOperationLatencies} + wantMetricTypesGCM := []string{} + for _, wantMetricName := range wantMetricNamesStdout { + wantMetricTypesGCM = append(wantMetricTypesGCM, nativeMetricsPrefix+wantMetricName) + } + + // Reduce sampling period to reduce test run time + origSamplePeriod := defaultSamplePeriod + defaultSamplePeriod = 5 * time.Second + defer func() { + defaultSamplePeriod = origSamplePeriod + }() + + // return constant client UID instead of random, so that attributes can be compared + origGenerateClientUID := generateClientUID + generateClientUID = func() (string, error) { + return clientUID, nil + } + defer func() { + generateClientUID = origGenerateClientUID + }() + + // Setup mock monitoring server + monitoringServer, err := NewMetricTestServer() + if err != nil { + t.Fatalf("Error setting up metrics test server") + } + go monitoringServer.Serve() + defer monitoringServer.Shutdown() + origExporterOpts := exporterOpts + exporterOpts = []option.ClientOption{ + option.WithEndpoint(monitoringServer.Endpoint), + option.WithoutAuthentication(), + option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())), + } + defer func() { + exporterOpts = origExporterOpts + }() + + tests := []struct { + desc string + config ClientConfig + wantBuiltinEnabled bool + setEmulator bool + wantCreateTSCallsCount int // No. 
of CreateTimeSeries calls + wantMethods []string + wantOTELValue map[string]map[string]int64 + wantOTELMetrics map[string][]string + }{ + { + desc: "should create a new tracer factory with default meter provider", + config: ClientConfig{}, + wantBuiltinEnabled: true, + wantCreateTSCallsCount: 2, + wantMethods: []string{createSessionRPC, "Spanner.StreamingRead"}, + wantOTELValue: map[string]map[string]int64{ + createSessionRPC: { + nativeMetricsPrefix + metricNameAttemptCount: 1, + nativeMetricsPrefix + metricNameOperationCount: 1, + }, + "Spanner.StreamingRead": { + nativeMetricsPrefix + metricNameAttemptCount: 2, + nativeMetricsPrefix + metricNameOperationCount: 1, + }, + }, + wantOTELMetrics: map[string][]string{ + createSessionRPC: wantMetricTypesGCM, + // since operation will be retries once we will have extra attempt latency for this operation + "Spanner.StreamingRead": append(wantMetricTypesGCM, nativeMetricsPrefix+metricNameAttemptLatencies), + }, + }, + { + desc: "should not create instruments when SPANNER_EMULATOR_HOST is set", + config: ClientConfig{}, + setEmulator: true, + }, + } + for _, test := range tests { + t.Run(test.desc, func(t *testing.T) { + if test.setEmulator { + // Set environment variable + t.Setenv("SPANNER_EMULATOR_HOST", "localhost:9010") + } + server, client, teardown := setupMockedTestServerWithConfig(t, test.config) + defer teardown() + server.TestSpanner.PutExecutionTime(MethodStreamingRead, + SimulatedExecutionTime{ + Errors: []error{status.Error(codes.Unavailable, "Temporary unavailable")}, + }) + + if client.metricsTracerFactory.enabled != test.wantBuiltinEnabled { + t.Errorf("builtinEnabled: got: %v, want: %v", client.metricsTracerFactory.enabled, test.wantBuiltinEnabled) + } + + if diff := testutil.Diff(client.metricsTracerFactory.clientAttributes, wantClientAttributes, + cmpopts.IgnoreUnexported(attribute.KeyValue{}, attribute.Value{})); diff != "" { + t.Errorf("clientAttributes: got=-, want=+ \n%v", diff) + } + + // Check instruments + gotNonNilInstruments := client.metricsTracerFactory.operationLatencies != nil && + client.metricsTracerFactory.operationCount != nil && + client.metricsTracerFactory.attemptLatencies != nil && + client.metricsTracerFactory.attemptCount != nil + if test.wantBuiltinEnabled != gotNonNilInstruments { + t.Errorf("NonNilInstruments: got: %v, want: %v", gotNonNilInstruments, test.wantBuiltinEnabled) + } + + // pop out all old requests + // record start time + testStartTime := time.Now() + + // pop out all old requests + monitoringServer.CreateServiceTimeSeriesRequests() + + // Perform single use read-only transaction + _, err = client.Single().ReadRow(ctx, "Albums", Key{"foo"}, []string{"SingerId", "AlbumId", "AlbumTitle"}) + if err != nil { + t.Fatalf("ReadRows failed: %v", err) + } + + // Calculate elapsed time + elapsedTime := time.Since(testStartTime) + if elapsedTime < 3*defaultSamplePeriod { + // Ensure at least 2 datapoints are recorded + time.Sleep(3*defaultSamplePeriod - elapsedTime) + } + + // Get new CreateServiceTimeSeriesRequests + gotCreateTSCalls := monitoringServer.CreateServiceTimeSeriesRequests() + var gotExpectedMethods []string + gotOTELValues := make(map[string]map[string]int64) + for _, gotCreateTSCall := range gotCreateTSCalls { + gotMetricTypesPerMethod := make(map[string][]string) + for _, ts := range gotCreateTSCall.TimeSeries { + gotMetricTypesPerMethod[ts.Metric.GetLabels()["method"]] = append(gotMetricTypesPerMethod[ts.Metric.GetLabels()["method"]], ts.Metric.Type) + if _, ok := 
gotOTELValues[ts.Metric.GetLabels()["method"]]; !ok { + gotOTELValues[ts.Metric.GetLabels()["method"]] = make(map[string]int64) + gotExpectedMethods = append(gotExpectedMethods, ts.Metric.GetLabels()["method"]) + } + if ts.MetricKind == metric.MetricDescriptor_CUMULATIVE && ts.GetValueType() == metric.MetricDescriptor_INT64 { + gotOTELValues[ts.Metric.GetLabels()["method"]][ts.Metric.Type] = ts.Points[0].Value.GetInt64Value() + } else { + for _, p := range ts.Points { + if p.Value.GetInt64Value() > int64(elapsedTime) { + t.Errorf("Value %v is greater than elapsed time %v", p.Value.GetInt64Value(), elapsedTime) + } + } + } + } + for method, gotMetricTypes := range gotMetricTypesPerMethod { + sort.Strings(gotMetricTypes) + sort.Strings(test.wantOTELMetrics[method]) + if !testutil.Equal(gotMetricTypes, test.wantOTELMetrics[method]) { + t.Errorf("Metric types missing in req. %s got: %v, want: %v", method, gotMetricTypes, wantMetricTypesGCM) + } + } + } + sort.Strings(gotExpectedMethods) + if !testutil.Equal(gotExpectedMethods, test.wantMethods) { + t.Errorf("Expected methods missing in req. got: %v, want: %v", gotExpectedMethods, test.wantMethods) + } + for method, wantOTELValues := range test.wantOTELValue { + for metricName, wantValue := range wantOTELValues { + if gotOTELValues[method][metricName] != wantValue { + t.Errorf("OTEL value for %s, %s: got: %v, want: %v", method, metricName, gotOTELValues[method][metricName], wantValue) + } + } + } + gotCreateTSCallsCount := len(gotCreateTSCalls) + if gotCreateTSCallsCount < test.wantCreateTSCallsCount { + t.Errorf("No. of CreateServiceTimeSeriesRequests: got: %v, want: %v", gotCreateTSCalls, test.wantCreateTSCallsCount) + } + }) + } +} diff --git a/spanner/ot_metrics.go b/spanner/ot_metrics.go index 16190860ce77..2002e395ca26 100644 --- a/spanner/ot_metrics.go +++ b/spanner/ot_metrics.go @@ -75,7 +75,6 @@ func createOpenTelemetryConfig(mp metric.MeterProvider, logger *log.Logger, sess config.attributeMapWithoutMultiplexed = append(config.attributeMapWithoutMultiplexed, attributeMap...) 
config.attributeMapWithoutMultiplexed = append(config.attributeMapWithoutMultiplexed, attributeKeyIsMultiplexed.String("false")) - setOpenTelemetryMetricProvider(config, mp, logger) return config, nil } diff --git a/spanner/read.go b/spanner/read.go index 83755722eca9..34af289004dc 100644 --- a/spanner/read.go +++ b/spanner/read.go @@ -51,6 +51,7 @@ func errEarlyReadEnd() error { func stream( ctx context.Context, logger *log.Logger, + meterTracerFactory *builtinMetricsTracerFactory, rpc func(ct context.Context, resumeToken []byte) (streamingReceiver, error), setTimestamp func(time.Time), release func(error), @@ -58,6 +59,7 @@ func stream( return streamWithReplaceSessionFunc( ctx, logger, + meterTracerFactory, rpc, nil, nil, @@ -72,6 +74,7 @@ func stream( func streamWithReplaceSessionFunc( ctx context.Context, logger *log.Logger, + meterTracerFactory *builtinMetricsTracerFactory, rpc func(ct context.Context, resumeToken []byte) (streamingReceiver, error), replaceSession func(ctx context.Context) error, setTransactionID func(transactionID), @@ -81,7 +84,7 @@ func streamWithReplaceSessionFunc( ctx, cancel := context.WithCancel(ctx) ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.RowIterator") return &RowIterator{ - streamd: newResumableStreamDecoder(ctx, logger, rpc, replaceSession), + streamd: newResumableStreamDecoder(ctx, logger, meterTracerFactory, rpc, replaceSession), rowd: &partialResultSetDecoder{}, setTransactionID: setTransactionID, setTimestamp: setTimestamp, @@ -403,17 +406,20 @@ type resumableStreamDecoder struct { // backoff is used for the retry settings backoff gax.Backoff + + meterTracerFactory *builtinMetricsTracerFactory } // newResumableStreamDecoder creates a new resumeableStreamDecoder instance. // Parameter rpc should be a function that creates a new stream beginning at the // restartToken if non-nil. -func newResumableStreamDecoder(ctx context.Context, logger *log.Logger, rpc func(ct context.Context, restartToken []byte) (streamingReceiver, error), replaceSession func(ctx context.Context) error) *resumableStreamDecoder { +func newResumableStreamDecoder(ctx context.Context, logger *log.Logger, meterTracerFactory *builtinMetricsTracerFactory, rpc func(ct context.Context, restartToken []byte) (streamingReceiver, error), replaceSession func(ctx context.Context) error) *resumableStreamDecoder { return &resumableStreamDecoder{ ctx: ctx, logger: logger, rpc: rpc, replaceSessionFunc: replaceSession, + meterTracerFactory: meterTracerFactory, maxBytesBetweenResumeTokens: atomic.LoadInt32(&maxBytesBetweenResumeTokens), backoff: DefaultRetryBackoff, } @@ -498,12 +504,20 @@ var ( ) func (d *resumableStreamDecoder) next() bool { + mt := d.meterTracerFactory.createBuiltinMetricsTracer(d.ctx) + defer func() { + if mt.method != "" { + statusCode, _ := convertToGrpcStatusErr(d.lastErr()) + mt.currOp.setStatus(statusCode.String()) + recordOperationCompletion(&mt) + } + }() retryer := onCodes(d.backoff, codes.Unavailable, codes.ResourceExhausted, codes.Internal) for { switch d.state { case unConnected: // If no gRPC stream is available, try to initiate one. - d.stream, d.err = d.rpc(d.ctx, d.resumeToken) + d.stream, d.err = d.rpc(context.WithValue(d.ctx, metricsTracerKey, &mt), d.resumeToken) if d.err == nil { d.changeState(queueingRetryable) continue @@ -633,6 +647,7 @@ func (d *resumableStreamDecoder) tryRecv(retryer gax.Retryer) { d.q.clear() d.stream = nil d.changeState(unConnected) + return } // get returns the most recent PartialResultSet generated by a call to next. 
diff --git a/spanner/read_test.go b/spanner/read_test.go index 7ef3f550ffe1..e15ced784870 100644 --- a/spanner/read_test.go +++ b/spanner/read_test.go @@ -25,7 +25,6 @@ import ( "testing" "time" - vkit "cloud.google.com/go/spanner/apiv1" sppb "cloud.google.com/go/spanner/apiv1/spannerpb" . "cloud.google.com/go/spanner/internal/testutil" "github.com/googleapis/gax-go/v2" @@ -809,6 +808,7 @@ func TestRsdNonblockingStates(t *testing.T) { r := newResumableStreamDecoder( ctx, nil, + c.metricsTracerFactory, test.rpc, nil, ) @@ -1102,6 +1102,7 @@ func TestRsdBlockingStates(t *testing.T) { r := newResumableStreamDecoder( ctx, nil, + c.metricsTracerFactory, test.rpc, nil, ) @@ -1263,6 +1264,7 @@ func TestQueueBytes(t *testing.T) { decoder := newResumableStreamDecoder( ctx, nil, + c.metricsTracerFactory, func(ct context.Context, resumeToken []byte) (streamingReceiver, error) { r, err := mc.ExecuteStreamingSql(ct, &sppb.ExecuteSqlRequest{ Session: session.Name, @@ -1360,6 +1362,7 @@ func TestResumeToken(t *testing.T) { streaming := func() *RowIterator { return stream(context.Background(), nil, + c.metricsTracerFactory, func(ct context.Context, resumeToken []byte) (streamingReceiver, error) { r, err := mc.ExecuteStreamingSql(ct, &sppb.ExecuteSqlRequest{ Session: session.Name, @@ -1504,7 +1507,7 @@ func TestGrpcReconnect(t *testing.T) { // The retry is counted from the second call. r := -1 // Establish a stream to mock cloud spanner server. - iter := stream(context.Background(), nil, + iter := stream(context.Background(), nil, c.metricsTracerFactory, func(ct context.Context, resumeToken []byte) (streamingReceiver, error) { r++ return mc.ExecuteStreamingSql(ct, &sppb.ExecuteSqlRequest{ @@ -1557,7 +1560,7 @@ func TestCancelTimeout(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) go func() { // Establish a stream to mock cloud spanner server. - iter := stream(ctx, nil, + iter := stream(ctx, nil, c.metricsTracerFactory, func(ct context.Context, resumeToken []byte) (streamingReceiver, error) { return mc.ExecuteStreamingSql(ct, &sppb.ExecuteSqlRequest{ Session: session.Name, @@ -1594,7 +1597,7 @@ func TestCancelTimeout(t *testing.T) { defer cancel() go func() { // Establish a stream to mock cloud spanner server. 
- iter := stream(ctx, nil, + iter := stream(ctx, nil, c.metricsTracerFactory, func(ct context.Context, resumeToken []byte) (streamingReceiver, error) { return mc.ExecuteStreamingSql(ct, &sppb.ExecuteSqlRequest{ Session: session.Name, @@ -1674,7 +1677,7 @@ func TestRowIteratorDo(t *testing.T) { } nRows := 0 - iter := stream(context.Background(), nil, + iter := stream(context.Background(), nil, c.metricsTracerFactory, func(ct context.Context, resumeToken []byte) (streamingReceiver, error) { return mc.ExecuteStreamingSql(ct, &sppb.ExecuteSqlRequest{ Session: session.Name, @@ -1709,7 +1712,7 @@ func TestRowIteratorDoWithError(t *testing.T) { t.Fatalf("failed to create a session") } - iter := stream(context.Background(), nil, + iter := stream(context.Background(), nil, c.metricsTracerFactory, func(ct context.Context, resumeToken []byte) (streamingReceiver, error) { return mc.ExecuteStreamingSql(ct, &sppb.ExecuteSqlRequest{ Session: session.Name, @@ -1743,7 +1746,7 @@ func TestIteratorStopEarly(t *testing.T) { t.Fatalf("failed to create a session") } - iter := stream(ctx, nil, + iter := stream(ctx, nil, c.metricsTracerFactory, func(ct context.Context, resumeToken []byte) (streamingReceiver, error) { return mc.ExecuteStreamingSql(ct, &sppb.ExecuteSqlRequest{ Session: session.Name, @@ -1774,7 +1777,7 @@ func TestIteratorWithError(t *testing.T) { } } -func createSession(client *vkit.Client) (*sppb.Session, error) { +func createSession(client spannerClient) (*sppb.Session, error) { var formattedDatabase string = fmt.Sprintf("projects/%s/instances/%s/databases/%s", "[PROJECT]", "[INSTANCE]", "[DATABASE]") var request = &sppb.CreateSessionRequest{ Database: formattedDatabase, diff --git a/spanner/session.go b/spanner/session.go index ec0046444bbd..b6a0cc9d9538 100644 --- a/spanner/session.go +++ b/spanner/session.go @@ -31,7 +31,6 @@ import ( "time" "cloud.google.com/go/internal/trace" - vkit "cloud.google.com/go/spanner/apiv1" sppb "cloud.google.com/go/spanner/apiv1/spannerpb" "cloud.google.com/go/spanner/internal" "go.opencensus.io/stats" @@ -91,7 +90,7 @@ type sessionHandle struct { // access it directly. session *session // client is the RPC channel to Cloud Spanner. It is set only once during session acquisition. - client *vkit.Client + client spannerClient // checkoutTime is the time the session was checked out of the pool. checkoutTime time.Time // lastUseTime is the time the session was last used after checked out of the pool. @@ -151,7 +150,7 @@ func (sh *sessionHandle) getID() string { // getClient gets the Cloud Spanner RPC client associated with the session ID // in sessionHandle. -func (sh *sessionHandle) getClient() *vkit.Client { +func (sh *sessionHandle) getClient() spannerClient { sh.mu.Lock() defer sh.mu.Unlock() if sh.session == nil { @@ -227,7 +226,7 @@ func (sh *sessionHandle) updateLastUseTime() { type session struct { // client is the RPC channel to Cloud Spanner. It is set only once during // session's creation. - client *vkit.Client + client spannerClient // id is the unique id of the session in Cloud Spanner. It is set only once // during session's creation. id string @@ -603,7 +602,7 @@ type sessionPool struct { // multiplexSessionClientCounter is the counter for the multiplexed session client. multiplexSessionClientCounter int // clientPool is a pool of Cloud Spanner grpc clients. 
- clientPool []*vkit.Client + clientPool []spannerClient // multiplexedSession contains the multiplexed session multiplexedSession *session // mayGetSession is for broadcasting that session retrival/creation may @@ -1084,14 +1083,14 @@ func (p *sessionPool) newSessionHandle(s *session) (sh *sessionHandle) { return sh } -func (p *sessionPool) getRoundRobinClient() *vkit.Client { +func (p *sessionPool) getRoundRobinClient() spannerClient { p.sc.mu.Lock() defer func() { p.multiplexSessionClientCounter++ p.sc.mu.Unlock() }() if len(p.clientPool) == 0 { - p.clientPool = make([]*vkit.Client, p.sc.connPool.Num()) + p.clientPool = make([]spannerClient, p.sc.connPool.Num()) for i := 0; i < p.sc.connPool.Num(); i++ { c, err := p.sc.nextClient() if err != nil { diff --git a/spanner/session_test.go b/spanner/session_test.go index dfd25e7657b8..598bb34fa75e 100644 --- a/spanner/session_test.go +++ b/spanner/session_test.go @@ -2074,7 +2074,8 @@ func getSessionsPerChannel(sp *sessionPool) map[string]int { // Get the pointer to the actual underlying gRPC ClientConn and use // that as the key in the map. val := reflect.ValueOf(s.client).Elem() - internalClient := val.FieldByName("internalClient").Elem().Elem() + rawClient := val.FieldByName("raw").Elem() + internalClient := rawClient.FieldByName("internalClient").Elem().Elem() connPool := internalClient.FieldByName("connPool").Elem().Elem() conn := connPool.Field(0).Pointer() key := fmt.Sprintf("%v", conn) diff --git a/spanner/sessionclient.go b/spanner/sessionclient.go index e9352bd42c5a..7468f21bc722 100644 --- a/spanner/sessionclient.go +++ b/spanner/sessionclient.go @@ -21,7 +21,6 @@ import ( "fmt" "log" "reflect" - "strings" "sync" "time" @@ -90,17 +89,18 @@ type sessionClient struct { closed bool disableRouteToLeader bool - connPool gtransport.ConnPool - database string - id string - userAgent string - sessionLabels map[string]string - databaseRole string - md metadata.MD - batchTimeout time.Duration - logger *log.Logger - callOptions *vkit.CallOptions - otConfig *openTelemetryConfig + connPool gtransport.ConnPool + database string + id string + userAgent string + sessionLabels map[string]string + databaseRole string + md metadata.MD + batchTimeout time.Duration + logger *log.Logger + callOptions *vkit.CallOptions + otConfig *openTelemetryConfig + metricsTracerFactory *builtinMetricsTracerFactory } // newSessionClient creates a session client to use for a database. @@ -249,9 +249,8 @@ func (sc *sessionClient) batchCreateSessions(createSessionCount int32, distribut // executeBatchCreateSessions executes the gRPC call for creating a batch of // sessions. 
-func (sc *sessionClient) executeBatchCreateSessions(client *vkit.Client, createCount int32, labels map[string]string, md metadata.MD, consumer sessionConsumer) { +func (sc *sessionClient) executeBatchCreateSessions(client spannerClient, createCount int32, labels map[string]string, md metadata.MD, consumer sessionConsumer) { defer sc.waitWorkers.Done() - ctx, cancel := context.WithTimeout(context.Background(), sc.batchTimeout) defer cancel() ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.BatchCreateSessions") @@ -325,7 +324,7 @@ func (sc *sessionClient) executeBatchCreateSessions(client *vkit.Client, createC } } -func (sc *sessionClient) executeCreateMultiplexedSession(ctx context.Context, client *vkit.Client, md metadata.MD, consumer sessionConsumer) { +func (sc *sessionClient) executeCreateMultiplexedSession(ctx context.Context, client spannerClient, md metadata.MD, consumer sessionConsumer) { ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.CreateSession") defer func() { trace.EndSpan(ctx, nil) }() trace.TracePrintf(ctx, nil, "Creating a multiplexed session") @@ -395,7 +394,7 @@ func (sc *sessionClient) sessionWithID(id string) (*session, error) { // client is set on the session, and used by all subsequent gRPC calls on the // session. Using the same channel for all gRPC calls for a session ensures the // optimal usage of server side caches. -func (sc *sessionClient) nextClient() (*vkit.Client, error) { +func (sc *sessionClient) nextClient() (spannerClient, error) { var clientOpt option.ClientOption if _, ok := sc.connPool.(*gmeWrapper); ok { // Pass GCPMultiEndpoint as a pool. @@ -404,21 +403,11 @@ func (sc *sessionClient) nextClient() (*vkit.Client, error) { // Pick a grpc.ClientConn from a regular pool. clientOpt = option.WithGRPCConn(sc.connPool.Conn()) } - client, err := vkit.NewClient(context.Background(), clientOpt) + client, err := newGRPCSpannerClient(context.Background(), sc, clientOpt) if err != nil { return nil, err } - clientInfo := []string{"gccl", internal.Version} - if sc.userAgent != "" { - agentWithVersion := strings.SplitN(sc.userAgent, "/", 2) - if len(agentWithVersion) == 2 { - clientInfo = append(clientInfo, agentWithVersion[0], agentWithVersion[1]) - } - } - client.SetGoogleClientInfo(clientInfo...) - if sc.callOptions != nil { - client.CallOptions = mergeCallOptions(client.CallOptions, sc.callOptions) - } + return client, nil } diff --git a/spanner/sessionclient_test.go b/spanner/sessionclient_test.go index d1813754267b..9c3180f991a6 100644 --- a/spanner/sessionclient_test.go +++ b/spanner/sessionclient_test.go @@ -281,7 +281,7 @@ func TestBatchCreateAndCloseSession(t *testing.T) { t.Fatalf("number of sessions created mismatch\ngot: %v\nwant: %v", created, expectedNumSessions) } // Check that all channels are used evenly. - channelCounts := make(map[*vkit.Client]int32) + channelCounts := make(map[spannerClient]int32) for _, s := range consumer.sessions { channelCounts[s.client]++ } diff --git a/spanner/spannertest/integration_test.go b/spanner/spannertest/integration_test.go index dbdfa3ac8aeb..22165c0ba34d 100644 --- a/spanner/spannertest/integration_test.go +++ b/spanner/spannertest/integration_test.go @@ -142,7 +142,10 @@ func makeClient(t *testing.T) (*spanner.Client, *dbadmin.DatabaseAdminClient, *v client, _, err = spanner.NewMultiEndpointClient(ctx, dbName(), gmeCfg, opts...) 
os.Setenv("SPANNER_EMULATOR_HOST", old) } else { - opts = append(opts, option.WithGRPCConn(conn)) + opts = append(opts, + option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())), + option.WithoutAuthentication(), + option.WithEndpoint(srv.Addr)) client, err = spanner.NewClient(ctx, dbName(), opts...) } if err != nil { diff --git a/spanner/test/opentelemetry/test/go.mod b/spanner/test/opentelemetry/test/go.mod index 12daf7ca48bd..dd2f7011e0d7 100644 --- a/spanner/test/opentelemetry/test/go.mod +++ b/spanner/test/opentelemetry/test/go.mod @@ -24,7 +24,9 @@ require ( cloud.google.com/go/compute/metadata v0.5.0 // indirect cloud.google.com/go/iam v1.2.1 // indirect cloud.google.com/go/longrunning v0.6.1 // indirect + cloud.google.com/go/monitoring v1.21.0 // indirect github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp v1.5.0 // indirect + github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.24.1 // indirect github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cncf/xds/go v0.0.0-20240822171458-6449f94b4d59 // indirect @@ -40,6 +42,7 @@ require ( github.com/googleapis/gax-go/v2 v2.13.0 // indirect github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect go.opencensus.io v0.24.0 // indirect + go.opentelemetry.io/contrib/detectors/gcp v1.29.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 // indirect go.opentelemetry.io/otel/metric v1.29.0 // indirect diff --git a/spanner/test/opentelemetry/test/go.sum b/spanner/test/opentelemetry/test/go.sum index e45af471728e..4e34219d1e4c 100644 --- a/spanner/test/opentelemetry/test/go.sum +++ b/spanner/test/opentelemetry/test/go.sum @@ -1292,6 +1292,7 @@ cloud.google.com/go/monitoring v1.20.1/go.mod h1:FYSe/brgfuaXiEzOQFhTjsEsJv+WePy cloud.google.com/go/monitoring v1.20.2/go.mod h1:36rpg/7fdQ7NX5pG5x1FA7cXTVXusOp6Zg9r9e1+oek= cloud.google.com/go/monitoring v1.20.3/go.mod h1:GPIVIdNznIdGqEjtRKQWTLcUeRnPjZW85szouimiczU= cloud.google.com/go/monitoring v1.20.4/go.mod h1:v7F/UcLRw15EX7xq565N7Ae5tnYEE28+Cl717aTXG4c= +cloud.google.com/go/monitoring v1.21.0 h1:EMc0tB+d3lUewT2NzKC/hr8cSR9WsUieVywzIHetGro= cloud.google.com/go/monitoring v1.21.0/go.mod h1:tuJ+KNDdJbetSsbSGTqnaBvbauS5kr3Q/koy3Up6r+4= cloud.google.com/go/networkconnectivity v1.4.0/go.mod h1:nOl7YL8odKyAOtzNX73/M5/mGZgqqMeryi6UPZTk/rA= cloud.google.com/go/networkconnectivity v1.5.0/go.mod h1:3GzqJx7uhtlM3kln0+x5wyFvuVH1pIBJjhCpjzSt75o= @@ -2075,6 +2076,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp v1.5.0 h1:oVLqHXhnYtUwM89y9T1fXGaK9wTkXHgNp8/ZNMQzUxE= github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp v1.5.0/go.mod h1:dppbR7CwXD4pgtV9t3wD1812RaLDcBjtblcDF5f1vI0= +github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.24.1 h1:pB2F2JKCj1Znmp2rwxxt1J0Fg0wezTMgWYk5Mpbi1kg= +github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.24.1/go.mod h1:itPGVDKf9cC/ov4MdvJ2QZ0khw4bfoo9jzwTJlaxy2k= github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c/go.mod 
h1:X0CRv0ky0k6m906ixxpzmDRLvX58TFUKS2eePweuyxk= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/ajstarks/deck v0.0.0-20200831202436-30c9fc6549a9/go.mod h1:JynElWSGnm/4RlzPXRlREEwqTHAN3T56Bv2ITsFT3gY= @@ -2393,6 +2396,7 @@ github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qq github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/ruudk/golang-pdf417 v0.0.0-20181029194003-1af4ab5afa58/go.mod h1:6lfFZQK844Gfx8o5WFuvpxWRwnSoipWe/p622j1v06w= github.com/ruudk/golang-pdf417 v0.0.0-20201230142125-a7e3863a1245/go.mod h1:pQAZKsJ8yyVxGRWYNEm9oFB8ieLgKFnamEyDmSA0BRk= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= @@ -2437,6 +2441,8 @@ go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= +go.opentelemetry.io/contrib/detectors/gcp v1.29.0 h1:TiaiXB4DpGD3sdzNlYQxruQngn5Apwzi1X0DRhuGvDQ= +go.opentelemetry.io/contrib/detectors/gcp v1.29.0/go.mod h1:GW2aWZNwR2ZxDLdv8OyC2G8zkRoQBuURgV7RPQgcPoU= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1/go.mod h1:4UoMYEZOC0yN/sPGH76KPkkU7zgiEWYWL9vwmbnTJPE= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0/go.mod h1:r9vWsPS/3AQItv3OSlEJ/E4mbrhUbbw18meOjArPtKQ= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.48.0/go.mod h1:tIKj3DbO8N9Y2xo52og3irLsPI4GW02DSMtrVgNMgxg= diff --git a/spanner/transaction.go b/spanner/transaction.go index 4c7ca876aad7..86fedd7a4d8b 100644 --- a/spanner/transaction.go +++ b/spanner/transaction.go @@ -32,7 +32,6 @@ import ( "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" - vkit "cloud.google.com/go/spanner/apiv1" durationpb "google.golang.org/protobuf/types/known/durationpb" ) @@ -292,6 +291,7 @@ func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys Key return streamWithReplaceSessionFunc( contextWithOutgoingMetadata(ctx, sh.getMetadata(), t.disableRouteToLeader), sh.session.logger, + t.sp.sc.metricsTracerFactory, func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) { if t.sh != nil { t.sh.updateLastUseTime() @@ -585,6 +585,7 @@ func (t *txReadOnly) query(ctx context.Context, statement Statement, options Que return streamWithReplaceSessionFunc( contextWithOutgoingMetadata(ctx, sh.getMetadata(), t.disableRouteToLeader), sh.session.logger, + t.sp.sc.metricsTracerFactory, func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) { req.ResumeToken = resumeToken req.Session = t.sh.getID() @@ -1477,7 +1478,7 @@ func (t *ReadWriteTransaction) setSessionEligibilityForLongRunning(sh *sessionHa } } -func beginTransaction(ctx context.Context, sid string, client *vkit.Client, opts TransactionOptions) (transactionID, error) { +func beginTransaction(ctx context.Context, sid 
string, client spannerClient, opts TransactionOptions) (transactionID, error) { res, err := client.BeginTransaction(ctx, &sppb.BeginTransactionRequest{ Session: sid, Options: &sppb.TransactionOptions{