Skip to content

Commit

Permalink
chore(spanner): add support of client side native metrics collection …
Browse files Browse the repository at this point in the history
…and export (#10419)

* chore(spanner): add support of client side native metrics collection and export

* fix build

* fix header issue

* fix tests

* fix client_uid, client_name and method signature.

* fix directpath_used var too correct extract peerInfo, and capture streaming native metrics when stream ends,

* remove dep on grpc_go_middleware

* use grpc connection target to check if direct path is enabled.

* refactor as per suggestions

* fix header

* fix tests

* return wrappedStream from stream interceptor to fix peerInfo not available before Send

* fix tests

* fix tests

* skip holding client connection on wrapper

* do not emil metrics if env is not set

* fix tests

* mark env to enable native metrics as experimental

* add resource label client_hash and updated export duration to 1 minutes

* fix tests
  • Loading branch information
rahul2393 authored Sep 25, 2024
1 parent 36248ca commit 6fbd687
Show file tree
Hide file tree
Showing 22 changed files with 1,713 additions and 73 deletions.
1 change: 1 addition & 0 deletions spanner/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,7 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R
return stream(
contextWithOutgoingMetadata(ctx, sh.getMetadata(), t.disableRouteToLeader),
sh.session.logger,
t.sp.sc.metricsTracerFactory,
rpc,
t.setTimestamp,
t.release)
Expand Down
208 changes: 193 additions & 15 deletions spanner/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"os"
"regexp"
"strconv"
"strings"
"sync"
"time"

"cloud.google.com/go/internal/trace"
Expand All @@ -33,6 +35,7 @@ import (
"github.com/googleapis/gax-go/v2"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
"google.golang.org/api/option/internaloption"
Expand All @@ -42,6 +45,8 @@ import (
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"

vkit "cloud.google.com/go/spanner/apiv1"
"cloud.google.com/go/spanner/internal"
Expand Down Expand Up @@ -111,6 +116,7 @@ type Client struct {
disableRouteToLeader bool
dro *sppb.DirectedReadOptions
otConfig *openTelemetryConfig
metricsTracerFactory *builtinMetricsTracerFactory
}

// DatabaseName returns the full name of a database, e.g.,
Expand Down Expand Up @@ -461,6 +467,30 @@ func newClientWithConfig(ctx context.Context, database string, config ClientConf
sc.otConfig = otConfig
sc.mu.Unlock()

var metricsProvider metric.MeterProvider
if emulatorAddr := os.Getenv("SPANNER_EMULATOR_HOST"); emulatorAddr != "" {
// Do not emit native metrics when emulator is being used
metricsProvider = noop.NewMeterProvider()
}

// SPANNER_ENABLE_BUILTIN_METRICS environment variable is used to enable
// native metrics for the Spanner client, which overrides the default.
//
// This is an EXPERIMENTAL feature and may be changed or removed in the future.
if os.Getenv("SPANNER_ENABLE_BUILTIN_METRICS") != "true" {
// Do not emit native metrics when SPANNER_ENABLE_BUILTIN_METRICS is not set to true
metricsProvider = noop.NewMeterProvider()
}

// Create a OpenTelemetry metrics configuration
metricsTracerFactory, err := newBuiltinMetricsTracerFactory(ctx, database, metricsProvider)
if err != nil {
return nil, err
}
sc.mu.Lock()
sc.metricsTracerFactory = metricsTracerFactory
sc.mu.Unlock()

// Create a session pool.
config.SessionPoolConfig.sessionLabels = sessionLabels
sp, err := newSessionPool(sc, config.SessionPoolConfig)
Expand All @@ -482,6 +512,7 @@ func newClientWithConfig(ctx context.Context, database string, config ClientConf
disableRouteToLeader: config.DisableRouteToLeader,
dro: config.DirectedReadOptions,
otConfig: otConfig,
metricsTracerFactory: metricsTracerFactory,
}
return c, nil
}
Expand Down Expand Up @@ -537,6 +568,8 @@ func allClientOpts(numChannels int, compression string, userOpts ...option.Clien
option.WithGRPCConnectionPool(numChannels),
option.WithUserAgent(fmt.Sprintf("spanner-go/v%s", internal.Version)),
internaloption.AllowNonDefaultServiceAccount(true),
option.WithGRPCDialOption(grpc.WithChainUnaryInterceptor(addNativeMetricsInterceptor()...)),
option.WithGRPCDialOption(grpc.WithChainStreamInterceptor(addStreamNativeMetricsInterceptor()...)),
}
if enableDirectPathXds, _ := strconv.ParseBool(os.Getenv("GOOGLE_SPANNER_ENABLE_DIRECT_ACCESS")); enableDirectPathXds {
clientDefaultOpts = append(clientDefaultOpts, internaloption.EnableDirectPath(true), internaloption.EnableDirectPathXds())
Expand All @@ -549,6 +582,136 @@ func allClientOpts(numChannels int, compression string, userOpts ...option.Clien
return append(allDefaultOpts, userOpts...)
}

// metricsInterceptor is a gRPC unary client interceptor that records metrics for unary RPCs.
func metricsInterceptor() grpc.UnaryClientInterceptor {
return func(
ctx context.Context,
method string,
req interface{},
reply interface{},
cc *grpc.ClientConn,
invoker grpc.UnaryInvoker,
opts ...grpc.CallOption,
) error {
mt, ok := ctx.Value(metricsTracerKey).(*builtinMetricsTracer)
if !ok {
return invoker(ctx, method, req, reply, cc, opts...)
}

mt.method = method
mt.currOp.incrementAttemptCount()
mt.currOp.currAttempt = &attemptTracer{}
mt.currOp.currAttempt.setStartTime(time.Now())
if strings.HasPrefix(cc.Target(), "google-c2p") {
mt.currOp.setDirectPathEnabled(true)
}

peerInfo := &peer.Peer{}
opts = append(opts, grpc.Peer(peerInfo))
err := invoker(ctx, method, req, reply, cc, opts...)

statusCode, _ := status.FromError(err)
mt.currOp.currAttempt.setStatus(statusCode.Code().String())

isDirectPathUsed := false
if peerInfo.Addr != nil {
remoteIP := peerInfo.Addr.String()
if strings.HasPrefix(remoteIP, directPathIPV4Prefix) || strings.HasPrefix(remoteIP, directPathIPV6Prefix) {
isDirectPathUsed = true
}
}

mt.currOp.currAttempt.setDirectPathUsed(isDirectPathUsed)
recordAttemptCompletion(mt)
return err
}
}

// wrappedStream wraps around the embedded grpc.ClientStream, and intercepts the RecvMsg and
// SendMsg method call.
type wrappedStream struct {
sync.Mutex
isFirstRecv bool
method string
target string
grpc.ClientStream
}

func (w *wrappedStream) RecvMsg(m any) error {
attempt := &attemptTracer{}
attempt.setStartTime(time.Now())
err := w.ClientStream.RecvMsg(m)
statusCode, _ := status.FromError(err)
ctx := w.ClientStream.Context()
mt, ok := ctx.Value(metricsTracerKey).(*builtinMetricsTracer)
if !ok || !w.isFirstRecv {
return err
}
w.Lock()
w.isFirstRecv = false
w.Unlock()
mt.method = w.method
mt.currOp.incrementAttemptCount()
mt.currOp.currAttempt = attempt
if strings.HasPrefix(w.target, "google-c2p") {
mt.currOp.setDirectPathEnabled(true)
}
isDirectPathUsed := false
peerInfo, ok := peer.FromContext(ctx)
if ok {
if peerInfo.Addr != nil {
remoteIP := peerInfo.Addr.String()
if strings.HasPrefix(remoteIP, directPathIPV4Prefix) || strings.HasPrefix(remoteIP, directPathIPV6Prefix) {
isDirectPathUsed = true
}
}
}
mt.currOp.currAttempt.setStatus(statusCode.Code().String())
mt.currOp.currAttempt.setDirectPathUsed(isDirectPathUsed)
recordAttemptCompletion(mt)
return err
}

func (w *wrappedStream) SendMsg(m any) error {
return w.ClientStream.SendMsg(m)
}

func newWrappedStream(s grpc.ClientStream, method, target string) grpc.ClientStream {
return &wrappedStream{ClientStream: s, method: method, target: target, isFirstRecv: true}
}

// metricsInterceptor is a gRPC stream client interceptor that records metrics for stream RPCs.
func metricsStreamInterceptor() grpc.StreamClientInterceptor {
return func(
ctx context.Context,
desc *grpc.StreamDesc,
cc *grpc.ClientConn,
method string,
streamer grpc.Streamer,
opts ...grpc.CallOption,
) (grpc.ClientStream, error) {
s, err := streamer(ctx, desc, cc, method, opts...)
if err != nil {
return nil, err
}
return newWrappedStream(s, method, cc.Target()), nil
}
}

// AddNativeMetricsInterceptor intercepts unary requests and records metrics for them.
func addNativeMetricsInterceptor() []grpc.UnaryClientInterceptor {
unaryInterceptors := []grpc.UnaryClientInterceptor{}
unaryInterceptors = append(unaryInterceptors, metricsInterceptor())
return unaryInterceptors
}

// AddStreamNativeMetricsInterceptor intercepts stream requests and records metrics for them.
func addStreamNativeMetricsInterceptor() []grpc.StreamClientInterceptor {
streamInterceptors := []grpc.StreamClientInterceptor{}
streamInterceptors = append(streamInterceptors, metricsStreamInterceptor())
return streamInterceptors
}

// getQueryOptions returns the query options overwritten by the environment
// variables if exist. The input parameter is the query options set by users
// via application-level configuration. If the environment variables are set,
Expand All @@ -570,6 +733,9 @@ func getQueryOptions(opts QueryOptions) QueryOptions {

// Close closes the client.
func (c *Client) Close() {
if c.metricsTracerFactory != nil {
c.metricsTracerFactory.shutdown()
}
if c.idleSessions != nil {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand Down Expand Up @@ -703,6 +869,7 @@ func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound
rts: rts,
},
}
t.txReadOnly.sp = c.idleSessions
t.txReadOnly.sh = sh
t.txReadOnly.txReadEnv = t
t.txReadOnly.qo = c.qo
Expand Down Expand Up @@ -738,6 +905,7 @@ func (c *Client) BatchReadOnlyTransactionFromID(tid BatchReadOnlyTransactionID)
},
ID: tid,
}
t.txReadOnly.sp = c.idleSessions
t.txReadOnly.sh = sh
t.txReadOnly.txReadEnv = t
t.txReadOnly.qo = c.qo
Expand Down Expand Up @@ -988,20 +1156,29 @@ func (bwo BatchWriteOptions) merge(opts BatchWriteOptions) BatchWriteOptions {

// BatchWriteResponseIterator is an iterator over BatchWriteResponse structures returned from BatchWrite RPC.
type BatchWriteResponseIterator struct {
ctx context.Context
stream sppb.Spanner_BatchWriteClient
err error
dataReceived bool
replaceSession func(ctx context.Context) error
rpc func(ctx context.Context) (sppb.Spanner_BatchWriteClient, error)
release func(error)
cancel func()
ctx context.Context
stream sppb.Spanner_BatchWriteClient
err error
dataReceived bool
meterTracerFactory *builtinMetricsTracerFactory
replaceSession func(ctx context.Context) error
rpc func(ctx context.Context) (sppb.Spanner_BatchWriteClient, error)
release func(error)
cancel func()
}

// Next returns the next result. Its second return value is iterator.Done if
// there are no more results. Once Next returns Done, all subsequent calls
// will return Done.
func (r *BatchWriteResponseIterator) Next() (*sppb.BatchWriteResponse, error) {
mt := r.meterTracerFactory.createBuiltinMetricsTracer(r.ctx)
defer func() {
if mt.method != "" {
statusCode, _ := convertToGrpcStatusErr(r.err)
mt.currOp.setStatus(statusCode.String())
recordOperationCompletion(&mt)
}
}()
for {
// Stream finished or in error state.
if r.err != nil {
Expand Down Expand Up @@ -1117,13 +1294,13 @@ func (c *Client) BatchWriteWithOptions(ctx context.Context, mgs []*MutationGroup

mgsPb, err := mutationGroupsProto(mgs)
if err != nil {
return &BatchWriteResponseIterator{err: err}
return &BatchWriteResponseIterator{meterTracerFactory: c.metricsTracerFactory, err: err}
}

var sh *sessionHandle
sh, err = c.idleSessions.take(ctx)
if err != nil {
return &BatchWriteResponseIterator{err: err}
return &BatchWriteResponseIterator{meterTracerFactory: c.metricsTracerFactory, err: err}
}

rpc := func(ct context.Context) (sppb.Spanner_BatchWriteClient, error) {
Expand Down Expand Up @@ -1169,11 +1346,12 @@ func (c *Client) BatchWriteWithOptions(ctx context.Context, mgs []*MutationGroup
ctx, cancel := context.WithCancel(ctx)
ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.BatchWriteResponseIterator")
return &BatchWriteResponseIterator{
ctx: ctx,
rpc: rpc,
replaceSession: replaceSession,
release: release,
cancel: cancel,
ctx: ctx,
meterTracerFactory: c.metricsTracerFactory,
rpc: rpc,
replaceSession: replaceSession,
release: release,
cancel: cancel,
}
}

Expand Down
4 changes: 2 additions & 2 deletions spanner/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4447,13 +4447,13 @@ func TestClient_CallOptions(t *testing.T) {

cs := &gax.CallSettings{}
// This is the default retry setting.
c.CallOptions.CreateSession[1].Resolve(cs)
c.CallOptions().CreateSession[1].Resolve(cs)
if got, want := fmt.Sprintf("%v", cs.Retry()), "&{{250000000 32000000000 1.3 0} [14 8]}"; got != want {
t.Fatalf("merged CallOptions is incorrect: got %v, want %v", got, want)
}

// This is the custom retry setting.
c.CallOptions.CreateSession[2].Resolve(cs)
c.CallOptions().CreateSession[2].Resolve(cs)
if got, want := fmt.Sprintf("%v", cs.Retry()), "&{{200000000 30000000000 1.25 0} [14 4]}"; got != want {
t.Fatalf("merged CallOptions is incorrect: got %v, want %v", got, want)
}
Expand Down
6 changes: 5 additions & 1 deletion spanner/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,16 @@ require (
cloud.google.com/go v0.115.1
cloud.google.com/go/iam v1.2.1
cloud.google.com/go/longrunning v0.6.1
cloud.google.com/go/monitoring v1.21.0
github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp v1.5.0
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
github.com/googleapis/gax-go/v2 v2.13.0
go.opencensus.io v0.24.0
go.opentelemetry.io/contrib/detectors/gcp v1.29.0
go.opentelemetry.io/otel v1.29.0
go.opentelemetry.io/otel/metric v1.29.0
go.opentelemetry.io/otel/sdk/metric v1.29.0
golang.org/x/oauth2 v0.23.0
golang.org/x/sync v0.8.0
google.golang.org/api v0.197.0
Expand All @@ -27,6 +31,7 @@ require (
cloud.google.com/go/auth v0.9.3 // indirect
cloud.google.com/go/auth/oauth2adapt v0.2.4 // indirect
cloud.google.com/go/compute/metadata v0.5.0 // indirect
github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.24.1 // indirect
github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cncf/xds/go v0.0.0-20240822171458-6449f94b4d59 // indirect
Expand All @@ -37,7 +42,6 @@ require (
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/google/s2a-go v0.1.8 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 // indirect
Expand Down
Loading

0 comments on commit 6fbd687

Please sign in to comment.