Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): add grpc metrics experimental options #10984

Merged
merged 18 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion storage/experimental/experimental.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,26 @@ import (
"time"

"cloud.google.com/go/storage/internal"
"go.opentelemetry.io/otel/sdk/metric"
"google.golang.org/api/option"
)

// WithReadStallTimeout provides a [ClientOption] that may be passed to [storage.NewClient].
// WithMetricInterval provides a [option.ClientOption] that may be passed to [storage.NewGRPCClient].
// It sets how often to emit metrics [metric.WithInterval] when using
// [metric.NewPeriodicReader]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think removing the minimum setting is good. Maybe suggest time.Minute or higher as an interval in the godoc if exporting to cloud monitoring.

// When using Cloud Monitoring interval must be at minimum 1 [time.Minute].
func WithMetricInterval(metricInterval time.Duration) option.ClientOption {
return internal.WithMetricInterval.(func(time.Duration) option.ClientOption)(metricInterval)
}

// WithMetricExporter provides a [option.ClientOption] that may be passed to [storage.NewGRPCClient].
// Set an alternate client-side metric Exporter to emit metrics through.
// Must implement [metric.Exporter]
func WithMetricExporter(ex *metric.Exporter) option.ClientOption {
return internal.WithMetricExporter.(func(*metric.Exporter) option.ClientOption)(ex)
}

// WithReadStallTimeout provides a [option.ClientOption] that may be passed to [storage.NewClient].
// It enables the client to retry stalled requests when starting a download from
// Cloud Storage. If the timeout elapses with no response from the server, the request
// is automatically retried.
Expand Down
2 changes: 1 addition & 1 deletion storage/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func newGRPCStorageClient(ctx context.Context, opts ...storageOption) (storageCl

if !config.disableClientMetrics {
// Do not fail client creation if enabling metrics fails.
if metricsContext, err := enableClientMetrics(ctx, s); err == nil {
if metricsContext, err := enableClientMetrics(ctx, s, config); err == nil {
s.metricsContext = metricsContext
s.clientOption = append(s.clientOption, metricsContext.clientOpts...)
} else {
Expand Down
72 changes: 39 additions & 33 deletions storage/grpc_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,6 @@ func newPreparedResource(ctx context.Context, project string, resourceOptions []
}

type metricsContext struct {
// project used by exporter
project string
// client options passed to gRPC channels
clientOpts []option.ClientOption
// instance of metric reader used by gRPC client-side metrics
Expand All @@ -154,29 +152,36 @@ func createHistogramView(name string, boundaries []float64) metric.View {
})
}

func newGRPCMetricContext(ctx context.Context, project string) (*metricsContext, error) {
preparedResource, err := newPreparedResource(ctx, project, []resource.Option{resource.WithDetectors(gcp.NewDetector())})
if err != nil {
return nil, err
}
// Implementation requires a project, if one is not determined possibly user
// credentials. Then we will fail stating gRPC Metrics require a project-id.
if project == "" && preparedResource.projectToUse != "" {
return nil, fmt.Errorf("google cloud project is required to start client-side metrics")
}
// If projectTouse isn't the same as project provided to Storage client, then
// emit a log stating which project is being used to emit metrics to.
if project != preparedResource.projectToUse {
log.Printf("The Project ID configured for metrics is %s, but the Project ID of the storage client is %s. Make sure that the service account in use has the required metric writing role (roles/monitoring.metricWriter) in the project projectIdToUse or metrics will not be written.", preparedResource.projectToUse, project)
}
meOpts := []mexporter.Option{
mexporter.WithProjectID(preparedResource.projectToUse),
mexporter.WithMetricDescriptorTypeFormatter(metricFormatter),
mexporter.WithCreateServiceTimeSeries(),
mexporter.WithMonitoredResourceDescription(monitoredResourceName, []string{"project_id", "location", "cloud_platform", "host_id", "instance_id", "api"})}
exporter, err := mexporter.New(meOpts...)
if err != nil {
return nil, err
func newGRPCMetricContext(ctx context.Context, project string, config storageConfig) (*metricsContext, error) {
var exporter metric.Exporter
meterOpts := []metric.Option{}
if config.metricExporter != nil {
exporter = *config.metricExporter
} else {
preparedResource, err := newPreparedResource(ctx, project, []resource.Option{resource.WithDetectors(gcp.NewDetector())})
if err != nil {
return nil, err
}
meterOpts = append(meterOpts, metric.WithResource(preparedResource.resource))
// Implementation requires a project, if one is not determined possibly user
// credentials. Then we will fail stating gRPC Metrics require a project-id.
if project == "" && preparedResource.projectToUse == "" {
return nil, fmt.Errorf("google cloud project is required to start client-side metrics")
}
// If projectTouse isn't the same as project provided to Storage client, then
// emit a log stating which project is being used to emit metrics to.
if project != preparedResource.projectToUse {
log.Printf("The Project ID configured for metrics is %s, but the Project ID of the storage client is %s. Make sure that the service account in use has the required metric writing role (roles/monitoring.metricWriter) in the project projectIdToUse or metrics will not be written.", preparedResource.projectToUse, project)
}
meOpts := []mexporter.Option{
mexporter.WithProjectID(preparedResource.projectToUse),
mexporter.WithMetricDescriptorTypeFormatter(metricFormatter),
mexporter.WithCreateServiceTimeSeries(),
mexporter.WithMonitoredResourceDescription(monitoredResourceName, []string{"project_id", "location", "cloud_platform", "host_id", "instance_id", "api"})}
exporter, err = mexporter.New(meOpts...)
if err != nil {
return nil, err
}
}
// Metric views update histogram boundaries to be relevant to GCS
// otherwise default OTel histogram boundaries are used.
Expand All @@ -185,11 +190,13 @@ func newGRPCMetricContext(ctx context.Context, project string) (*metricsContext,
createHistogramView("grpc.client.attempt.rcvd_total_compressed_message_size", sizeHistogramBoundaries()),
createHistogramView("grpc.client.attempt.sent_total_compressed_message_size", sizeHistogramBoundaries()),
}
provider := metric.NewMeterProvider(
metric.WithReader(metric.NewPeriodicReader(&exporterLogSuppressor{exporter: exporter}, metric.WithInterval(time.Minute))),
metric.WithResource(preparedResource.resource),
metric.WithView(metricViews...),
)
interval := time.Minute
if config.metricInterval > 0 {
interval = config.metricInterval
}
meterOpts = append(meterOpts, metric.WithReader(metric.NewPeriodicReader(&exporterLogSuppressor{exporter: exporter}, metric.WithInterval(interval))),
metric.WithView(metricViews...))
provider := metric.NewMeterProvider(meterOpts...)
mo := opentelemetry.MetricsOptions{
MeterProvider: provider,
Metrics: opentelemetry.DefaultMetrics().Add(
Expand All @@ -209,22 +216,21 @@ func newGRPCMetricContext(ctx context.Context, project string) (*metricsContext,
option.WithGRPCDialOption(grpc.WithDefaultCallOptions(grpc.StaticMethodCallOption{})),
}
context := &metricsContext{
project: preparedResource.projectToUse,
clientOpts: opts,
provider: provider,
close: createShutdown(ctx, provider),
}
return context, nil
}

func enableClientMetrics(ctx context.Context, s *settings) (*metricsContext, error) {
func enableClientMetrics(ctx context.Context, s *settings, config storageConfig) (*metricsContext, error) {
var project string
c, err := transport.Creds(ctx, s.clientOption...)
if err == nil {
project = c.ProjectID
}
// Enable client-side metrics for gRPC
metricsContext, err := newGRPCMetricContext(ctx, project)
metricsContext, err := newGRPCMetricContext(ctx, project, config)
if err != nil {
return nil, fmt.Errorf("gRPC Metrics: %w", err)
}
Expand Down
9 changes: 9 additions & 0 deletions storage/internal/experimental.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,15 @@
package internal

var (
// WithMetricInterval is a function which is implemented by storage package.
// It sets how often to emit metrics when using NewPeriodicReader and must be
// greater than 1 minute.
WithMetricInterval any // func (*time.Duration) option.ClientOption

// WithMetricExporter is a function which is implemented by storage package.
// Set an alternate client-side metric Exporter to emit metrics through.
WithMetricExporter any // func (*metric.Exporter) option.ClientOption

// WithReadStallTimeout is a function which is implemented by storage package.
// It takes ReadStallTimeoutConfig as inputs and returns a option.ClientOption.
WithReadStallTimeout any // func (*ReadStallTimeoutConfig) option.ClientOption
Expand Down
36 changes: 34 additions & 2 deletions storage/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"cloud.google.com/go/storage/experimental"
storageinternal "cloud.google.com/go/storage/internal"
"go.opentelemetry.io/otel/sdk/metric"
"google.golang.org/api/option"
"google.golang.org/api/option/internaloption"
)
Expand All @@ -35,7 +36,9 @@ const (
)

func init() {
// initialize experimental option.
// initialize experimental options
storageinternal.WithMetricExporter = withMetricExporter
storageinternal.WithMetricInterval = withMetricInterval
storageinternal.WithReadStallTimeout = withReadStallTimeout
}

Expand Down Expand Up @@ -69,12 +72,13 @@ func getDynamicReadReqInitialTimeoutSecFromEnv(defaultVal time.Duration) time.Du
return val
}

// storageConfig contains the Storage client option configuration that can be
// set through storageClientOptions.
type storageConfig struct {
useJSONforReads bool
readAPIWasSet bool
disableClientMetrics bool
metricExporter *metric.Exporter
metricInterval time.Duration
readStallTimeoutConfig *experimental.ReadStallTimeoutConfig
}

Expand Down Expand Up @@ -160,6 +164,34 @@ func (w *withDisabledClientMetrics) ApplyStorageOpt(c *storageConfig) {
c.disableClientMetrics = w.disabledClientMetrics
}

type withMeterOptions struct {
internaloption.EmbeddableAdapter
// set sampling interval
interval time.Duration
}

func withMetricInterval(interval time.Duration) option.ClientOption {
return &withMeterOptions{interval: interval}
}

func (w *withMeterOptions) ApplyStorageOpt(c *storageConfig) {
c.metricInterval = w.interval
}

type withMetricExporterConfig struct {
internaloption.EmbeddableAdapter
// exporter override
metricExporter *metric.Exporter
}

func withMetricExporter(ex *metric.Exporter) option.ClientOption {
return &withMetricExporterConfig{metricExporter: ex}
}

func (w *withMetricExporterConfig) ApplyStorageOpt(c *storageConfig) {
c.metricExporter = w.metricExporter
}

// WithReadStallTimeout is an option that may be passed to [NewClient].
// It enables the client to retry the stalled read request, happens as part of
// storage.Reader creation. As the name suggest, timeout is adjusted dynamically
Expand Down
42 changes: 42 additions & 0 deletions storage/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"cloud.google.com/go/storage/experimental"
mexporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric"
"github.com/google/go-cmp/cmp"
"google.golang.org/api/option"
)
Expand All @@ -37,6 +38,8 @@ func TestApplyStorageOpt(t *testing.T) {
useJSONforReads: true,
readAPIWasSet: true,
disableClientMetrics: false,
metricInterval: 0,
metricExporter: nil,
},
},
{
Expand All @@ -46,6 +49,8 @@ func TestApplyStorageOpt(t *testing.T) {
useJSONforReads: false,
readAPIWasSet: true,
disableClientMetrics: false,
metricInterval: 0,
metricExporter: nil,
},
},
{
Expand All @@ -55,6 +60,8 @@ func TestApplyStorageOpt(t *testing.T) {
useJSONforReads: false,
readAPIWasSet: true,
disableClientMetrics: false,
metricInterval: 0,
metricExporter: nil,
},
},
{
Expand All @@ -64,6 +71,8 @@ func TestApplyStorageOpt(t *testing.T) {
useJSONforReads: false,
readAPIWasSet: false,
disableClientMetrics: false,
metricInterval: 0,
metricExporter: nil,
},
},
{
Expand All @@ -73,6 +82,8 @@ func TestApplyStorageOpt(t *testing.T) {
useJSONforReads: false,
readAPIWasSet: false,
disableClientMetrics: false,
metricInterval: 0,
metricExporter: nil,
},
},
{
Expand All @@ -82,6 +93,19 @@ func TestApplyStorageOpt(t *testing.T) {
useJSONforReads: false,
readAPIWasSet: false,
disableClientMetrics: true,
metricInterval: 0,
metricExporter: nil,
},
},
{
desc: "set metrics interval",
opts: []option.ClientOption{experimental.WithMetricInterval(time.Minute * 5)},
want: storageConfig{
useJSONforReads: false,
readAPIWasSet: false,
disableClientMetrics: false,
metricInterval: time.Minute * 5,
metricExporter: nil,
},
},
{
Expand Down Expand Up @@ -128,6 +152,24 @@ func TestApplyStorageOpt(t *testing.T) {
}
}

func TestSetCustomExporter(t *testing.T) {
exporter, err := mexporter.New()
if err != nil {
t.Errorf("TestSetCustomExporter: %v", err)
}
want := storageConfig{
metricExporter: &exporter,
}
var got storageConfig
opt := experimental.WithMetricExporter(&exporter)
if storageOpt, ok := opt.(storageClientOption); ok {
storageOpt.ApplyStorageOpt(&got)
}
if got.metricExporter != want.metricExporter {
t.Errorf("TestSetCustomExpoerter: metricExporter want=%v, got=%v", want.metricExporter, got.metricExporter)
}
}

func TestGetDynamicReadReqInitialTimeoutSecFromEnv(t *testing.T) {
defaultValue := 10 * time.Second

Expand Down
Loading