Skip to content

Commit

Permalink
chore(spanner): add support of client side native metrics collection …
Browse files Browse the repository at this point in the history
…and export
  • Loading branch information
rahul2393 committed Aug 22, 2024
1 parent bcb38bd commit 1affbec
Show file tree
Hide file tree
Showing 15 changed files with 1,500 additions and 53 deletions.
8 changes: 8 additions & 0 deletions spanner/apiv1/spanner_client_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,11 @@ import "google.golang.org/api/option"
func DefaultClientOptions() []option.ClientOption {
return defaultGRPCClientOptions()
}

// Returns the default call options used by the generated Spanner client.
//
// This function is only intended for use by the client library, and may be
// removed at any time without any warning.
func DefaultCallOptions() *CallOptions {
return defaultCallOptions()
}
67 changes: 67 additions & 0 deletions spanner/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ import (
"github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp"
grpcgcppb "github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/grpc_gcp"
"github.com/googleapis/gax-go/v2"
go_grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
"google.golang.org/api/option/internaloption"
Expand All @@ -42,6 +44,7 @@ import (
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"

vkit "cloud.google.com/go/spanner/apiv1"
"cloud.google.com/go/spanner/internal"
Expand Down Expand Up @@ -96,6 +99,9 @@ func parseDatabaseName(db string) (project, instance, database string, err error
return matches[1], matches[2], matches[3], nil
}

// PeerKey is a key used to store peer information in a context.
type PeerKey struct{}

// Client is a client for reading and writing data to a Cloud Spanner database.
// A client is safe to use concurrently, except for its Close method.
type Client struct {
Expand All @@ -111,6 +117,7 @@ type Client struct {
disableRouteToLeader bool
dro *sppb.DirectedReadOptions
otConfig *openTelemetryConfig
metricsTracerFactory *builtinMetricsTracerFactory
}

// DatabaseName returns the full name of a database, e.g.,
Expand Down Expand Up @@ -461,6 +468,21 @@ func newClientWithConfig(ctx context.Context, database string, config ClientConf
sc.otConfig = otConfig
sc.mu.Unlock()

metricsProvider := otConfig.meterProvider
if emulatorAddr := os.Getenv("SPANNER_EMULATOR_HOST"); emulatorAddr != "" {
// Do not emit metrics when emulator is being used
metricsProvider = noop.NewMeterProvider()
}

// Create a OpenTelemetry metrics configuration
metricsTracerFactory, err := newBuiltinMetricsTracerFactory(ctx, database, metricsProvider)
if err != nil {
metricsProvider = noop.NewMeterProvider()
}
sc.mu.Lock()
sc.metricsTracerFactory = metricsTracerFactory
sc.mu.Unlock()

// Create a session pool.
config.SessionPoolConfig.sessionLabels = sessionLabels
sp, err := newSessionPool(sc, config.SessionPoolConfig)
Expand All @@ -482,6 +504,7 @@ func newClientWithConfig(ctx context.Context, database string, config ClientConf
disableRouteToLeader: config.DisableRouteToLeader,
dro: config.DirectedReadOptions,
otConfig: otConfig,
metricsTracerFactory: metricsTracerFactory,
}
return c, nil
}
Expand Down Expand Up @@ -540,6 +563,8 @@ func allClientOpts(numChannels int, compression string, userOpts ...option.Clien
}
if enableDirectPathXds, _ := strconv.ParseBool(os.Getenv("GOOGLE_SPANNER_ENABLE_DIRECT_ACCESS")); enableDirectPathXds {
clientDefaultOpts = append(clientDefaultOpts, internaloption.EnableDirectPath(true), internaloption.EnableDirectPathXds())
clientDefaultOpts = append(clientDefaultOpts, option.WithGRPCDialOption(grpc.WithUnaryInterceptor(AddUnaryPeerInterceptor())),
option.WithGRPCDialOption(grpc.WithStreamInterceptor(AddStreamPeerInterceptor())))
}
if compression == "gzip" {
userOpts = append(userOpts, option.WithGRPCDialOption(grpc.WithDefaultCallOptions(
Expand All @@ -549,6 +574,45 @@ func allClientOpts(numChannels int, compression string, userOpts ...option.Clien
return append(allDefaultOpts, userOpts...)
}

func unaryPeerSetter() grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn,
invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
p, ok := ctx.Value(PeerKey{}).(*peer.Peer)
if ok {
opts = append(opts, grpc.Peer(p))
}

return invoker(ctx, method, req, reply, cc, opts...)
}
}

// streamPeerSetter makes the grpc connection include peer information in a context variable keyed by PeerKey{} if it exists.
func streamPeerSetter() grpc.StreamClientInterceptor {
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string,
streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
p, ok := ctx.Value(PeerKey{}).(*peer.Peer)
if ok {
opts = append(opts, grpc.Peer(p))
}

return streamer(ctx, desc, cc, method, opts...)
}
}

// AddUnaryPeerInterceptor intercepts unary requests and add PeerKey.
func AddUnaryPeerInterceptor() grpc.UnaryClientInterceptor {
unaryInterceptors := []grpc.UnaryClientInterceptor{}
unaryInterceptors = append(unaryInterceptors, unaryPeerSetter())
return go_grpc_middleware.ChainUnaryClient(unaryInterceptors...)
}

// AddStreamPeerInterceptor intercepts stream requests and add PeerKey.
func AddStreamPeerInterceptor() grpc.StreamClientInterceptor {
streamInterceptors := []grpc.StreamClientInterceptor{}
streamInterceptors = append(streamInterceptors, streamPeerSetter())
return go_grpc_middleware.ChainStreamClient(streamInterceptors...)
}

// getQueryOptions returns the query options overwritten by the environment
// variables if exist. The input parameter is the query options set by users
// via application-level configuration. If the environment variables are set,
Expand All @@ -570,6 +634,9 @@ func getQueryOptions(opts QueryOptions) QueryOptions {

// Close closes the client.
func (c *Client) Close() {
if c.metricsTracerFactory != nil {
c.metricsTracerFactory.shutdown()
}
if c.idleSessions != nil {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand Down
13 changes: 9 additions & 4 deletions spanner/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,17 @@ require (
cloud.google.com/go v0.115.1
cloud.google.com/go/iam v1.1.13
cloud.google.com/go/longrunning v0.5.12
cloud.google.com/go/monitoring v1.20.3
github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp v1.5.0
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
github.com/googleapis/gax-go/v2 v2.13.0
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0
go.opencensus.io v0.24.0
go.opentelemetry.io/contrib/detectors/gcp v1.28.0
go.opentelemetry.io/otel v1.28.0
go.opentelemetry.io/otel/metric v1.28.0
go.opentelemetry.io/otel/sdk/metric v1.28.0
golang.org/x/oauth2 v0.22.0
golang.org/x/sync v0.8.0
google.golang.org/api v0.193.0
Expand All @@ -27,6 +32,7 @@ require (
cloud.google.com/go/auth v0.9.0 // indirect
cloud.google.com/go/auth/oauth2adapt v0.2.4 // indirect
cloud.google.com/go/compute/metadata v0.5.0 // indirect
github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.24.0 // indirect
github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b // indirect
Expand All @@ -38,15 +44,14 @@ require (
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/s2a-go v0.1.8 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.52.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect
go.opentelemetry.io/otel/sdk v1.28.0 // indirect
go.opentelemetry.io/otel/trace v1.28.0 // indirect
golang.org/x/crypto v0.26.0 // indirect
golang.org/x/net v0.28.0 // indirect
golang.org/x/sys v0.24.0 // indirect
golang.org/x/sys v0.23.0 // indirect
golang.org/x/text v0.17.0 // indirect
golang.org/x/time v0.6.0 // indirect
)
Loading

0 comments on commit 1affbec

Please sign in to comment.