Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
frankyn committed Aug 15, 2024
1 parent ddf0d41 commit 210280d
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 71 deletions.
15 changes: 11 additions & 4 deletions storage/grpc_client.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
// go:build grpc
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
Expand Down Expand Up @@ -34,6 +35,7 @@ import (
"google.golang.org/api/iterator"
"google.golang.org/api/option"
"google.golang.org/api/option/internaloption"
"google.golang.org/api/transport"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/encoding"
Expand Down Expand Up @@ -74,10 +76,10 @@ const (
// defaultGRPCOptions returns a set of the default client options
// for gRPC client initialization.
func defaultGRPCOptions() []option.ClientOption {

defaults := []option.ClientOption{
option.WithGRPCConnectionPool(defaultConnPoolSize),
}

// Set emulator options for gRPC if an emulator was specified. Note that in a
// hybrid client, STORAGE_EMULATOR_HOST will set the host to use for HTTP and
// STORAGE_EMULATOR_HOST_GRPC will set the host to use for gRPC (when using a
Expand Down Expand Up @@ -119,15 +121,20 @@ func newGRPCStorageClient(ctx context.Context, opts ...storageOption) (storageCl
s.clientOption = append(defaultGRPCOptions(), s.clientOption...)
// Disable all gax-level retries in favor of retry logic in the veneer client.
s.gax = append(s.gax, gax.WithRetry(nil))

config := newStorageConfig(s.clientOption...)

if config.readAPIWasSet {
return nil, errors.New("storage: GRPC is incompatible with any option that specifies an API for reads")
}
if !config.disableClientMetrics {
project := ""
c, err := transport.Creds(ctx, s.clientOption...)
if err == nil {
project = c.ProjectID
}
// TODO: detect project id
log.Println("Testing using gRPC Metrics")
project := "spec-test-ruby-samples"
log.Printf("Using project: %v", project)

// Enable client-side metrics for gRPC
// TODO: detect endpoint
Expand Down
174 changes: 123 additions & 51 deletions storage/grpc_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ package storage

import (
"context"
"log"
"strings"
"time"

mexporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric"
"github.com/google/uuid"
Expand All @@ -32,42 +34,60 @@ import (
)

const (
monitored_resource_name = "storage.googleapis.com/Client"
metric_format_prefix = "storage.googleapis.com/client/"
metric_lb_locality_label = "grpc.lb.locality"
monitored_resource_name = "storage.googleapis.com/Client"
)

func getMetricsToEnable() []estats.Metric {
return []estats.Metric{"grpc.lb.wrr.rr_fallback",
"grpc.lb.wrr.endpoint_weight_not_yet_usable",
"grpc.lb.wrr.endpoint_weight_stale",
"grpc.lb.wrr.endpoint_weights",
"grpc.lb.rls.cache_entries",
"grpc.lb.rls.cache_size",
"grpc.lb.rls.default_target_picks",
"grpc.lb.rls.target_picks",
"grpc.lb.rls.failed_picks",
// "grpc.xds_client.connected",
// "grpc.xds_client.server_failure",
// "grpc.xds_client.resource_updates_valid",
// "grpc.xds_client.resource_updates_invalid",
// "grpc.xds_client.resources",
func latencyHistogramBoundaries() []float64 {
boundaries := []float64{}
boundary := 0.0
increment := 0.002
// 2ms buckets for first 100ms, so we can have higher resolution for uploads and downloads in the 100 KiB range
for i := 0; i < 50; i += 1 {
boundaries = append(boundaries, boundary)
// increment by 2ms
boundary += increment
}
// For the remaining buckets do 10 10ms, 10 20ms, and so on, up until 5 minutes
for i := 0; i < 150 && boundary < 300; i += 1 {
boundaries = append(boundaries, boundary)
if i != 0 && i%10 == 0 {
increment *= 2
}
boundary += increment
}
return boundaries
}

func getMetricsEnabledByDefault() []estats.Metric {
return []estats.Metric{
"grpc.client.attempt.sent_total_compressed_message_size",
"grpc.client.attempt.rcvd_total_compressed_message_size",
"grpc.client.attempt.started",
"grpc.client.attempt.duration",
"grpc.client.call.duration",
func sizeHistogramBoundaries() []float64 {
kb := 1024.0
mb := 1024.0 * kb
gb := 1024.0 * mb

boundaries := []float64{}
boundary := 0.0
increment := 128 * kb

for len(boundaries) < 200 && boundary <= 16*gb {
boundaries = append(boundaries, boundary)
boundary += increment
if boundary >= 4*mb {
increment *= 2
}
}
return boundaries
}

func getViewMasks(metrics []estats.Metric) []sdkmetric.View {
func getViewMasks(defaultMetrics map[estats.Metric]bool, additionalMetrics []estats.Metric) []sdkmetric.View {
views := []sdkmetric.View{}
for _, m := range metrics {
for k, include := range defaultMetrics {
if !include {
continue
}
views = append(views, sdkmetric.NewView(sdkmetric.Instrument{
Name: string(k),
}, sdkmetric.Stream{Name: strings.ReplaceAll(string(k), ".", "/")}))
}
for _, m := range additionalMetrics {
views = append(views, sdkmetric.NewView(sdkmetric.Instrument{
Name: string(m),
}, sdkmetric.Stream{Name: strings.ReplaceAll(string(m), ".", "/")}))
Expand All @@ -76,7 +96,7 @@ func getViewMasks(metrics []estats.Metric) []sdkmetric.View {
}

func metricFormatter(m metricdata.Metrics) string {
return metric_format_prefix + m.Name
return "storage.googleapis.com/client/" + m.Name
}

func gcpAttributeExpectedDefaults() []attribute.KeyValue {
Expand All @@ -86,25 +106,33 @@ func gcpAttributeExpectedDefaults() []attribute.KeyValue {
{Key: "host_id", Value: attribute.StringValue("unknown")}}
}

func getPreparedResourceUsingGCPDetector(ctx context.Context, project string) (*resource.Resource, error) {
gcpDetector := []resource.Option{resource.WithDetectors(gcp.NewDetector())}
return getPreparedResource(ctx, project, gcpDetector)
// Added to help with tests
type preparedResource struct {
projectToUse string
resource *resource.Resource
}

// Added to help with tests
func getPreparedResource(ctx context.Context, project string, resourceOptions []resource.Option) (*resource.Resource, error) {
func createPreparedResource(ctx context.Context, project string, resourceOptions []resource.Option) (*preparedResource, error) {
detectedAttrs, err := resource.New(ctx, resourceOptions...)
if err != nil {
return nil, err
}
preparedResource := &preparedResource{}

s := detectedAttrs.Set()
p, present := s.Value("cloud.account.id")
if present {
preparedResource.projectToUse = p.AsString()
} else {
preparedResource.projectToUse = project
}
updates := []attribute.KeyValue{}
for _, kv := range gcpAttributeExpectedDefaults() {
if val, present := s.Value(kv.Key); !present || val.AsString() == "" {
updates = append(updates, attribute.KeyValue{Key: kv.Key, Value: kv.Value})
}
}
return resource.New(
r, err := resource.New(
ctx,
resource.WithAttributes(
attribute.KeyValue{Key: "gcp.resource_type", Value: attribute.StringValue(monitored_resource_name)},
Expand All @@ -116,6 +144,11 @@ func getPreparedResource(ctx context.Context, project string, resourceOptions []
// Last duplicate key / value wins
resource.WithAttributes(updates...),
)
if err != nil {
return nil, err
}
preparedResource.resource = r
return preparedResource, nil
}

type internalMetricsConfig struct {
Expand All @@ -134,38 +167,77 @@ type internalMetricsContext struct {

// TODO: format errors emitted here.
func gRPCMetricProvider(ctx context.Context, config internalMetricsConfig) (*internalMetricsContext, error) {
preparedResource, err := createPreparedResource(ctx, config.project, []resource.Option{resource.WithDetectors(gcp.NewDetector())})
if err != nil {
return nil, err
}
if config.project != preparedResource.projectToUse {
log.Printf("The Project ID configured for metrics is %s, but the Project ID of the storage client is %s. Make sure that the service account in use has the required metric writing role (roles/monitoring.metricWriter) in the project projectIdToUse or metrics will not be written.", preparedResource.projectToUse, config.project)
}
metricsToEnable := []estats.Metric{
"grpc.lb.wrr.rr_fallback",
"grpc.lb.wrr.endpoint_weight_not_yet_usable",
"grpc.lb.wrr.endpoint_weight_stale",
"grpc.lb.wrr.endpoint_weights",
"grpc.lb.rls.cache_entries",
"grpc.lb.rls.cache_size",
"grpc.lb.rls.default_target_picks",
"grpc.lb.rls.target_picks",
"grpc.lb.rls.failed_picks",
}
defaultMetrics := opentelemetry.DefaultMetrics()
metricViews := getViewMasks(defaultMetrics.Metrics(), metricsToEnable)
metricViews = append(metricViews, []sdkmetric.View{
sdkmetric.NewView(sdkmetric.Instrument{
Name: "grpc/client/attempt/duration",
Kind: sdkmetric.InstrumentKindHistogram,
}, sdkmetric.Stream{
Name: "grpc/client/attempt/duration",
Description: "A view of grpc/client/attempt/duration with histogram boundaries more appropriate for Google Cloud Storage RPCs",
Unit: "s",
Aggregation: sdkmetric.AggregationExplicitBucketHistogram{Boundaries: latencyHistogramBoundaries()},
}),
sdkmetric.NewView(sdkmetric.Instrument{
Name: "grpc/client/attempt/rcvd_total_compressed_message_size",
Kind: sdkmetric.InstrumentKindHistogram,
}, sdkmetric.Stream{
Name: "grpc/client/attempt/rcvd_total_compressed_message_size",
Description: "A view of grpc/client/attempt/rcvd_total_compressed_message_size with histogram boundaries more appropriate for Google Cloud Storage RPCs",
Unit: "By",
Aggregation: sdkmetric.AggregationExplicitBucketHistogram{Boundaries: sizeHistogramBoundaries()},
}),
sdkmetric.NewView(sdkmetric.Instrument{
Name: "grpc/client/attempt/rcvd_total_compressed_message_size",
Kind: sdkmetric.InstrumentKindHistogram,
}, sdkmetric.Stream{
Name: "grpc/client/attempt/sent_total_compressed_message_size",
Description: "A view of grpc/client/attempt/sent_total_compressed_message_size with histogram boundaries more appropriate for Google Cloud Storage RPCs",
Unit: "By",
Aggregation: sdkmetric.AggregationExplicitBucketHistogram{Boundaries: sizeHistogramBoundaries()},
}),
}...)
exporter, err := mexporter.New(
// mexporter.WithMonitoringClientOptions(option.WithEndpoint(config.host)),
mexporter.WithProjectID(config.project),
mexporter.WithProjectID(preparedResource.projectToUse),
mexporter.WithMetricDescriptorTypeFormatter(metricFormatter),
mexporter.WithCreateServiceTimeSeries(),
mexporter.WithMonitoredResourceDescription(monitored_resource_name, []string{"project_id", "location", "cloud_platform", "host_id", "instance_id", "api"}))
if err != nil {
return nil, err
}
preparedResource, err := getPreparedResourceUsingGCPDetector(ctx, config.project)
if err != nil {
return nil, err
}
allMetrics := append(getMetricsEnabledByDefault(), getMetricsToEnable()...)
metricViews := getViewMasks(allMetrics)
provider := sdkmetric.NewMeterProvider(
sdkmetric.WithReader(sdkmetric.NewPeriodicReader(exporter)),
sdkmetric.WithResource(preparedResource),
sdkmetric.WithReader(sdkmetric.NewPeriodicReader(exporter, sdkmetric.WithInterval(time.Second*60))),
sdkmetric.WithResource(preparedResource.resource),
sdkmetric.WithView(metricViews...),
)
context := &internalMetricsContext{[]option.ClientOption{togRPCDialOption(provider)}, provider, createShutdown(ctx, provider)}
return context, nil
}

func togRPCDialOption(provider *sdkmetric.MeterProvider) option.ClientOption {
mo := opentelemetry.MetricsOptions{
MeterProvider: provider,
Metrics: opentelemetry.DefaultMetrics().Add(getMetricsToEnable()...),
OptionalLabels: []string{metric_lb_locality_label},
Metrics: defaultMetrics.Add(metricsToEnable...),
OptionalLabels: []string{"grpc.lb.locality"},
}
do := option.WithGRPCDialOption(opentelemetry.DialOption(opentelemetry.Options{MetricsOptions: mo}))
return do
context := &internalMetricsContext{[]option.ClientOption{do}, provider, createShutdown(ctx, provider)}
return context, nil
}

func createShutdown(ctx context.Context, provider *sdkmetric.MeterProvider) func() {
Expand Down
47 changes: 31 additions & 16 deletions storage/grpc_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestMetrics(t *testing.T) {
log.Fatalf("Error setting up gRPC client: %v", err)
}
defer grpcClient.Close()
bucket := grpcClient.Bucket("-frank")
bucket := grpcClient.Bucket("anima-frank-gcs-grpc-team-test-central1")
it := bucket.Objects(ctx, nil)
for {
_, err := it.Next()
Expand All @@ -48,30 +48,45 @@ func TestMetrics(t *testing.T) {
}

func TestGetViewMasks(t *testing.T) {
testMetricName := estats.Metric("test.metric.name")
views := getViewMasks([]estats.Metric{testMetricName})
for _, mask := range views {
got, ok := mask(sdkmetric.Instrument{Name: string(testMetricName)})
if !ok {
t.Errorf("getViewMasks: did not find %v", testMetricName)
}
want := "test/metric/name"
if got.Name != want {
t.Errorf("got: %v, want: %v\n", got, want)
testDefaultMetrics := map[estats.Metric]bool{
estats.Metric("default.metric.name"): true,
}
testAdditionalMetrics := []estats.Metric{"test.metric.name"}
views := getViewMasks(testDefaultMetrics, testAdditionalMetrics)
wantSlice := []struct {
inputFormat string
outputFormat string
}{
{
inputFormat: "default.metric.name",
outputFormat: "default/metric/name",
},
{
inputFormat: "test.metric.name",
outputFormat: "test/metric/name",
},
}
// Order matters for the wantSlice and views slice
for idx, want := range wantSlice {
stream, b := views[idx](sdkmetric.Instrument{
Name: want.inputFormat,
})
if !b || stream.Name != want.outputFormat {
t.Errorf("getViewMasks: For metric: %v got=%v, want=%v", want.inputFormat, stream.Name, want.outputFormat)
}
}
}

func TestMetricFormatter(t *testing.T) {
want := "storage.googleapis.com/client/t"
s := metricdata.Metrics{Name: "t", Description: "", Unit: "", Data: nil}
want := "storage.googleapis.com/client/metric"
s := metricdata.Metrics{Name: "metric", Description: "", Unit: "", Data: nil}
got := metricFormatter(s)
if want != got {
t.Errorf("got: %v, want %v", got, want)
}
}

func TestGetPreparedResource(t *testing.T) {
func TestCreatePreparedResource(t *testing.T) {
ctx := context.Background()
for _, test := range []struct {
desc string
Expand Down Expand Up @@ -135,11 +150,11 @@ func TestGetPreparedResource(t *testing.T) {
} {
t.Run(test.desc, func(t *testing.T) {
resourceOptions := []resource.Option{resource.WithAttributes(test.detectedAttributes...)}
result, err := getPreparedResource(ctx, "project", resourceOptions)
result, err := createPreparedResource(ctx, "project", resourceOptions)
if err != nil {
t.Errorf("getPreparedResource: %v", err)
}
resultSet := result.Set()
resultSet := result.resource.Set()
for _, want := range test.wantAttributes.ToSlice() {
got, exists := resultSet.Value(want.Key)
if !exists {
Expand Down

0 comments on commit 210280d

Please sign in to comment.