Skip to content

Commit

Permalink
fix(spanner): use spanner options when initializing monitoring export…
Browse files Browse the repository at this point in the history
…er (#11109)

* fix(spanner): use spanner options when initializing monitoring exporter

* fix test

* incorporate changes

* fix tests

* format struct
  • Loading branch information
rahul2393 authored Nov 12, 2024
1 parent 380e7d2 commit 81413f3
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 27 deletions.
3 changes: 1 addition & 2 deletions spanner/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,8 +507,7 @@ func newClientWithConfig(ctx context.Context, database string, config ClientConf
metricsProvider = noop.NewMeterProvider()
}

// Create a OpenTelemetry metrics configuration
metricsTracerFactory, err := newBuiltinMetricsTracerFactory(ctx, database, metricsProvider)
metricsTracerFactory, err := newBuiltinMetricsTracerFactory(ctx, database, metricsProvider, config.Compression, opts...)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion spanner/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func setupMockedTestServerWithConfigAndGCPMultiendpointPool(t *testing.T, config
if len(token) != 1 {
return status.Errorf(codes.Internal, "unexpected number of api client token headers: %v", len(token))
}
if !strings.HasPrefix(token[0], "gl-go/") {
if !strings.Contains(token[0], "gl-go/") {
return status.Errorf(codes.Internal, "unexpected api client token: %v", token[0])
}
if !strings.Contains(token[0], "gccl/") {
Expand Down
19 changes: 12 additions & 7 deletions spanner/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@ import (
"errors"
"fmt"
"hash/fnv"
"strings"

"log"
"os"
"strconv"
"strings"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -167,7 +166,12 @@ var (
return "global"
}

exporterOpts = []option.ClientOption{}
// GCM exporter should use the same options as Spanner client
// createExporterOptions takes Spanner client options and returns exporter options
// Overwritten in tests
createExporterOptions = func(spannerOpts ...option.ClientOption) []option.ClientOption {
return spannerOpts
}
)

type metricInfo struct {
Expand All @@ -193,7 +197,7 @@ type builtinMetricsTracerFactory struct {
attemptCount metric.Int64Counter // Counter for the number of attempts.
}

func newBuiltinMetricsTracerFactory(ctx context.Context, dbpath string, metricsProvider metric.MeterProvider) (*builtinMetricsTracerFactory, error) {
func newBuiltinMetricsTracerFactory(ctx context.Context, dbpath string, metricsProvider metric.MeterProvider, compression string, opts ...option.ClientOption) (*builtinMetricsTracerFactory, error) {
clientUID, err := generateClientUID()
if err != nil {
log.Printf("built-in metrics: generateClientUID failed: %v. Using empty string in the %v metric atteribute", err, metricLabelKeyClientUID)
Expand Down Expand Up @@ -224,7 +228,7 @@ func newBuiltinMetricsTracerFactory(ctx context.Context, dbpath string, metricsP
var meterProvider *sdkmetric.MeterProvider
if metricsProvider == nil {
// Create default meter provider
mpOptions, err := builtInMeterProviderOptions(project)
mpOptions, err := builtInMeterProviderOptions(project, compression, opts...)
if err != nil {
return tracerFactory, err
}
Expand All @@ -247,8 +251,9 @@ func newBuiltinMetricsTracerFactory(ctx context.Context, dbpath string, metricsP
return tracerFactory, err
}

func builtInMeterProviderOptions(project string) ([]sdkmetric.Option, error) {
defaultExporter, err := newMonitoringExporter(context.Background(), project, exporterOpts...)
func builtInMeterProviderOptions(project, compression string, opts ...option.ClientOption) ([]sdkmetric.Option, error) {
allOpts := createExporterOptions(opts...)
defaultExporter, err := newMonitoringExporter(context.Background(), project, compression, allOpts...)
if err != nil {
return nil, err
}
Expand Down
19 changes: 13 additions & 6 deletions spanner/metrics_monitoring_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (

monitoring "cloud.google.com/go/monitoring/apiv3/v2"
"cloud.google.com/go/monitoring/apiv3/v2/monitoringpb"
"cloud.google.com/go/spanner/internal"
"github.com/googleapis/gax-go/v2/callctx"
"go.opentelemetry.io/otel/attribute"
otelmetric "go.opentelemetry.io/otel/sdk/metric"
otelmetricdata "go.opentelemetry.io/otel/sdk/metric/metricdata"
Expand All @@ -37,6 +39,7 @@ import (
googlemetricpb "google.golang.org/genproto/googleapis/api/metric"
monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
)
Expand Down Expand Up @@ -78,17 +81,19 @@ type monitoringExporter struct {
client *monitoring.MetricClient
shutdownOnce sync.Once
projectID string
compression string
}

func newMonitoringExporter(ctx context.Context, project string, opts ...option.ClientOption) (*monitoringExporter, error) {
func newMonitoringExporter(ctx context.Context, project, compression string, opts ...option.ClientOption) (*monitoringExporter, error) {
client, err := monitoring.NewMetricClient(ctx, opts...)
if err != nil {
return nil, err
}
return &monitoringExporter{
client: client,
shutdown: make(chan struct{}),
projectID: project,
client: client,
shutdown: make(chan struct{}),
projectID: project,
compression: compression,
}, nil
}

Expand Down Expand Up @@ -135,14 +140,16 @@ func (me *monitoringExporter) exportTimeSeries(ctx context.Context, rm *otelmetr
}

name := fmt.Sprintf("projects/%s", me.projectID)

ctx = callctx.SetHeaders(ctx, "x-goog-api-client", "gccl/"+internal.Version)
if me.compression == gzip.Name {
ctx = callctx.SetHeaders(ctx, requestsCompressionHeader, gzip.Name)
}
errs := []error{err}
for i := 0; i < len(tss); i += sendBatchSize {
j := i + sendBatchSize
if j >= len(tss) {
j = len(tss)
}

req := &monitoringpb.CreateTimeSeriesRequest{
Name: name,
TimeSeries: tss[i:j],
Expand Down
16 changes: 10 additions & 6 deletions spanner/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,18 @@ func TestNewBuiltinMetricsTracerFactory(t *testing.T) {
}
go monitoringServer.Serve()
defer monitoringServer.Shutdown()
origExporterOpts := exporterOpts
exporterOpts = []option.ClientOption{
option.WithEndpoint(monitoringServer.Endpoint),
option.WithoutAuthentication(),
option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())),

// Override exporter options
origCreateExporterOptions := createExporterOptions
createExporterOptions = func(opts ...option.ClientOption) []option.ClientOption {
return []option.ClientOption{
option.WithEndpoint(monitoringServer.Endpoint), // Connect to mock
option.WithoutAuthentication(),
option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())),
}
}
defer func() {
exporterOpts = origExporterOpts
createExporterOptions = origCreateExporterOptions
}()

tests := []struct {
Expand Down
2 changes: 1 addition & 1 deletion spanner/read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1778,7 +1778,7 @@ func TestIteratorStopEarly(t *testing.T) {
}

func TestIteratorWithError(t *testing.T) {
metricsTracerFactory, err := newBuiltinMetricsTracerFactory(context.Background(), "projects/my-project/instances/my-instance/databases/my-database", noop.NewMeterProvider())
metricsTracerFactory, err := newBuiltinMetricsTracerFactory(context.Background(), "projects/my-project/instances/my-instance/databases/my-database", noop.NewMeterProvider(), "identity")
if err != nil {
t.Fatalf("failed to create metrics tracer factory: %v", err)
}
Expand Down
5 changes: 1 addition & 4 deletions spanner/spannertest/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,7 @@ func makeClient(t *testing.T) (*spanner.Client, *dbadmin.DatabaseAdminClient, *v
client, _, err = spanner.NewMultiEndpointClient(ctx, dbName(), gmeCfg, opts...)
os.Setenv("SPANNER_EMULATOR_HOST", old)
} else {
opts = append(opts,
option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())),
option.WithoutAuthentication(),
option.WithEndpoint(srv.Addr))
opts = append(opts, option.WithGRPCConn(conn))
client, err = spanner.NewClient(ctx, dbName(), opts...)
}
if err != nil {
Expand Down

0 comments on commit 81413f3

Please sign in to comment.