diff --git a/spanner/client.go b/spanner/client.go index a795e0f7a32c..1e21f5934029 100644 --- a/spanner/client.go +++ b/spanner/client.go @@ -507,8 +507,7 @@ func newClientWithConfig(ctx context.Context, database string, config ClientConf metricsProvider = noop.NewMeterProvider() } - // Create a OpenTelemetry metrics configuration - metricsTracerFactory, err := newBuiltinMetricsTracerFactory(ctx, database, metricsProvider) + metricsTracerFactory, err := newBuiltinMetricsTracerFactory(ctx, database, metricsProvider, config.Compression, opts...) if err != nil { return nil, err } diff --git a/spanner/client_test.go b/spanner/client_test.go index 82b2a9a2d493..29b3323fbb32 100644 --- a/spanner/client_test.go +++ b/spanner/client_test.go @@ -76,7 +76,7 @@ func setupMockedTestServerWithConfigAndGCPMultiendpointPool(t *testing.T, config if len(token) != 1 { return status.Errorf(codes.Internal, "unexpected number of api client token headers: %v", len(token)) } - if !strings.HasPrefix(token[0], "gl-go/") { + if !strings.Contains(token[0], "gl-go/") { return status.Errorf(codes.Internal, "unexpected api client token: %v", token[0]) } if !strings.Contains(token[0], "gccl/") { diff --git a/spanner/metrics.go b/spanner/metrics.go index cdc4b3df3d79..a765e004f15a 100644 --- a/spanner/metrics.go +++ b/spanner/metrics.go @@ -21,11 +21,10 @@ import ( "errors" "fmt" "hash/fnv" - "strings" - "log" "os" "strconv" + "strings" "time" "github.com/google/uuid" @@ -167,7 +166,12 @@ var ( return "global" } - exporterOpts = []option.ClientOption{} + // GCM exporter should use the same options as Spanner client + // createExporterOptions takes Spanner client options and returns exporter options + // Overwritten in tests + createExporterOptions = func(spannerOpts ...option.ClientOption) []option.ClientOption { + return spannerOpts + } ) type metricInfo struct { @@ -193,7 +197,7 @@ type builtinMetricsTracerFactory struct { attemptCount metric.Int64Counter // Counter for the number of attempts. } -func newBuiltinMetricsTracerFactory(ctx context.Context, dbpath string, metricsProvider metric.MeterProvider) (*builtinMetricsTracerFactory, error) { +func newBuiltinMetricsTracerFactory(ctx context.Context, dbpath string, metricsProvider metric.MeterProvider, compression string, opts ...option.ClientOption) (*builtinMetricsTracerFactory, error) { clientUID, err := generateClientUID() if err != nil { log.Printf("built-in metrics: generateClientUID failed: %v. Using empty string in the %v metric atteribute", err, metricLabelKeyClientUID) @@ -224,7 +228,7 @@ func newBuiltinMetricsTracerFactory(ctx context.Context, dbpath string, metricsP var meterProvider *sdkmetric.MeterProvider if metricsProvider == nil { // Create default meter provider - mpOptions, err := builtInMeterProviderOptions(project) + mpOptions, err := builtInMeterProviderOptions(project, compression, opts...) if err != nil { return tracerFactory, err } @@ -247,8 +251,9 @@ func newBuiltinMetricsTracerFactory(ctx context.Context, dbpath string, metricsP return tracerFactory, err } -func builtInMeterProviderOptions(project string) ([]sdkmetric.Option, error) { - defaultExporter, err := newMonitoringExporter(context.Background(), project, exporterOpts...) +func builtInMeterProviderOptions(project, compression string, opts ...option.ClientOption) ([]sdkmetric.Option, error) { + allOpts := createExporterOptions(opts...) + defaultExporter, err := newMonitoringExporter(context.Background(), project, compression, allOpts...) if err != nil { return nil, err } diff --git a/spanner/metrics_monitoring_exporter.go b/spanner/metrics_monitoring_exporter.go index b0d098274c51..8eb79323fdd0 100644 --- a/spanner/metrics_monitoring_exporter.go +++ b/spanner/metrics_monitoring_exporter.go @@ -29,6 +29,8 @@ import ( monitoring "cloud.google.com/go/monitoring/apiv3/v2" "cloud.google.com/go/monitoring/apiv3/v2/monitoringpb" + "cloud.google.com/go/spanner/internal" + "github.com/googleapis/gax-go/v2/callctx" "go.opentelemetry.io/otel/attribute" otelmetric "go.opentelemetry.io/otel/sdk/metric" otelmetricdata "go.opentelemetry.io/otel/sdk/metric/metricdata" @@ -37,6 +39,7 @@ import ( googlemetricpb "google.golang.org/genproto/googleapis/api/metric" monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres" "google.golang.org/grpc/codes" + "google.golang.org/grpc/encoding/gzip" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -78,17 +81,19 @@ type monitoringExporter struct { client *monitoring.MetricClient shutdownOnce sync.Once projectID string + compression string } -func newMonitoringExporter(ctx context.Context, project string, opts ...option.ClientOption) (*monitoringExporter, error) { +func newMonitoringExporter(ctx context.Context, project, compression string, opts ...option.ClientOption) (*monitoringExporter, error) { client, err := monitoring.NewMetricClient(ctx, opts...) if err != nil { return nil, err } return &monitoringExporter{ - client: client, - shutdown: make(chan struct{}), - projectID: project, + client: client, + shutdown: make(chan struct{}), + projectID: project, + compression: compression, }, nil } @@ -135,14 +140,16 @@ func (me *monitoringExporter) exportTimeSeries(ctx context.Context, rm *otelmetr } name := fmt.Sprintf("projects/%s", me.projectID) - + ctx = callctx.SetHeaders(ctx, "x-goog-api-client", "gccl/"+internal.Version) + if me.compression == gzip.Name { + ctx = callctx.SetHeaders(ctx, requestsCompressionHeader, gzip.Name) + } errs := []error{err} for i := 0; i < len(tss); i += sendBatchSize { j := i + sendBatchSize if j >= len(tss) { j = len(tss) } - req := &monitoringpb.CreateTimeSeriesRequest{ Name: name, TimeSeries: tss[i:j], diff --git a/spanner/metrics_test.go b/spanner/metrics_test.go index eb6512c7ab02..2fc60e1682ec 100644 --- a/spanner/metrics_test.go +++ b/spanner/metrics_test.go @@ -89,14 +89,18 @@ func TestNewBuiltinMetricsTracerFactory(t *testing.T) { } go monitoringServer.Serve() defer monitoringServer.Shutdown() - origExporterOpts := exporterOpts - exporterOpts = []option.ClientOption{ - option.WithEndpoint(monitoringServer.Endpoint), - option.WithoutAuthentication(), - option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())), + + // Override exporter options + origCreateExporterOptions := createExporterOptions + createExporterOptions = func(opts ...option.ClientOption) []option.ClientOption { + return []option.ClientOption{ + option.WithEndpoint(monitoringServer.Endpoint), // Connect to mock + option.WithoutAuthentication(), + option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())), + } } defer func() { - exporterOpts = origExporterOpts + createExporterOptions = origCreateExporterOptions }() tests := []struct { diff --git a/spanner/read_test.go b/spanner/read_test.go index 99ab2362a1ee..9f6f3602595b 100644 --- a/spanner/read_test.go +++ b/spanner/read_test.go @@ -1778,7 +1778,7 @@ func TestIteratorStopEarly(t *testing.T) { } func TestIteratorWithError(t *testing.T) { - metricsTracerFactory, err := newBuiltinMetricsTracerFactory(context.Background(), "projects/my-project/instances/my-instance/databases/my-database", noop.NewMeterProvider()) + metricsTracerFactory, err := newBuiltinMetricsTracerFactory(context.Background(), "projects/my-project/instances/my-instance/databases/my-database", noop.NewMeterProvider(), "identity") if err != nil { t.Fatalf("failed to create metrics tracer factory: %v", err) } diff --git a/spanner/spannertest/integration_test.go b/spanner/spannertest/integration_test.go index 22165c0ba34d..dbdfa3ac8aeb 100644 --- a/spanner/spannertest/integration_test.go +++ b/spanner/spannertest/integration_test.go @@ -142,10 +142,7 @@ func makeClient(t *testing.T) (*spanner.Client, *dbadmin.DatabaseAdminClient, *v client, _, err = spanner.NewMultiEndpointClient(ctx, dbName(), gmeCfg, opts...) os.Setenv("SPANNER_EMULATOR_HOST", old) } else { - opts = append(opts, - option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())), - option.WithoutAuthentication(), - option.WithEndpoint(srv.Addr)) + opts = append(opts, option.WithGRPCConn(conn)) client, err = spanner.NewClient(ctx, dbName(), opts...) } if err != nil {