Skip to content

Commit

Permalink
feat(storage): introduce dp detector based on grpc metrics (#11100)
Browse files Browse the repository at this point in the history
* feat: introduce dp detector based on grpc metrics

* revert changes to go.work.sum

* add context on having to still use an exporter

* update func doc

* rename and fix vet

* adjust where exporter is initialized

* address feedback

* inspect ResourceMetrics instead of using stdoutmetric exporter

* shutdown MeterProvider and ManualReader

* only include provider.Shutdown()

* update docs on return value and xs refactor

* address feedback

* address feedback on error wrapping and bucket prefix

* revert error sentinel
  • Loading branch information
frankyn authored Nov 13, 2024
1 parent c2bcbf2 commit 60c2323
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 0 deletions.
42 changes: 42 additions & 0 deletions storage/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/googleapis/gax-go/v2/apierror"
"go.opentelemetry.io/contrib/detectors/gcp"
"go.opentelemetry.io/otel/sdk/resource"
"golang.org/x/oauth2/google"
"google.golang.org/api/googleapi"
"google.golang.org/api/iterator"
Expand Down Expand Up @@ -324,6 +326,46 @@ var readCases = []readCase{
},
}

func TestIntegration_DetectDirectConnectivity(t *testing.T) {
ctx := skipHTTP("direct connectivity isn't available for json")
multiTransportTest(ctx, t, func(t *testing.T, ctx context.Context, bucket string, prefix string, client *Client) {
h := testHelper{t}
// Using Resoource Detector to detect if test is being ran inside GCE
// if so, the test expects Direct Connectivity to be detected.
// Otherwise, it will only validate that Direct Connectivity was not
// detected.
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/processor/resourcedetectionprocessor/README.md
detectedAttrs, err := resource.New(ctx, resource.WithDetectors(gcp.NewDetector()))
if err != nil {
t.Fatalf("resource.New: %v", err)
}
attrs := detectedAttrs.Set()
if v, exists := attrs.Value("cloud.platform"); exists && v.AsString() == "gcp_compute_engine" {
v, exists = attrs.Value("cloud.region")
if !exists {
t.Fatalf("CheckDirectConnectivitySupported: region not detected")
}
region := v.AsString()
newBucketName := prefix + uidSpace.New()
newBucket := client.Bucket(newBucketName)
h.mustCreate(newBucket, testutil.ProjID(), &BucketAttrs{Location: region, LocationType: "region"})
defer h.mustDeleteBucket(newBucket)
err := CheckDirectConnectivitySupported(ctx, newBucketName)
if err != nil {
t.Fatalf("CheckDirectConnectivitySupported: %v", err)
}
} else {
err = CheckDirectConnectivitySupported(ctx, bucket)
if err == nil {
t.Fatal("CheckDirectConnectivitySupported: expected error but none returned")
}
if err != nil && !strings.Contains(err.Error(), "direct connectivity not detected") {
t.Fatalf("CheckDirectConnectivitySupported: failed on a different error %v", err)
}
}
})
}

func TestIntegration_BucketCreateDelete(t *testing.T) {
ctx := skipJSONReads(context.Background(), "no reads in test")
multiTransportTest(ctx, t, func(t *testing.T, ctx context.Context, _ string, prefix string, client *Client) {
Expand Down
59 changes: 59 additions & 0 deletions storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,18 @@ import (
"cloud.google.com/go/storage/internal"
"cloud.google.com/go/storage/internal/apiv2/storagepb"
"github.com/googleapis/gax-go/v2"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"golang.org/x/oauth2/google"
"google.golang.org/api/googleapi"
"google.golang.org/api/option"
"google.golang.org/api/option/internaloption"
raw "google.golang.org/api/storage/v1"
"google.golang.org/api/transport"
htransport "google.golang.org/api/transport/http"
"google.golang.org/grpc/experimental/stats"
"google.golang.org/grpc/stats/opentelemetry"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/known/fieldmaskpb"
Expand Down Expand Up @@ -233,6 +238,60 @@ func NewGRPCClient(ctx context.Context, opts ...option.ClientOption) (*Client, e
return &Client{tc: tc}, nil
}

// CheckDirectConnectivitySupported checks if gRPC direct connectivity
// is available for a specific bucket from the environment where the client
// is running. A `nil` error represents Direct Connectivity was detected.
// Direct connectivity is expected to be available when running from inside
// GCP and connecting to a bucket in the same region.
//
// You can pass in [option.ClientOption] you plan on passing to [NewGRPCClient]
func CheckDirectConnectivitySupported(ctx context.Context, bucket string, opts ...option.ClientOption) error {
view := metric.NewView(
metric.Instrument{
Name: "grpc.client.attempt.duration",
Kind: metric.InstrumentKindHistogram,
},
metric.Stream{AttributeFilter: attribute.NewAllowKeysFilter("grpc.lb.locality")},
)
mr := metric.NewManualReader()
provider := metric.NewMeterProvider(metric.WithReader(mr), metric.WithView(view))
// Provider handles shutting down ManualReader
defer provider.Shutdown(ctx)
mo := opentelemetry.MetricsOptions{
MeterProvider: provider,
Metrics: stats.NewMetrics("grpc.client.attempt.duration"),
OptionalLabels: []string{"grpc.lb.locality"},
}
combinedOpts := append(opts, WithDisabledClientMetrics(), option.WithGRPCDialOption(opentelemetry.DialOption(opentelemetry.Options{MetricsOptions: mo})))
client, err := NewGRPCClient(ctx, combinedOpts...)
if err != nil {
return fmt.Errorf("storage.NewGRPCClient: %w", err)
}
defer client.Close()
if _, err = client.Bucket(bucket).Attrs(ctx); err != nil {
return fmt.Errorf("Bucket.Attrs: %w", err)
}
// Call manual reader to collect metric
rm := metricdata.ResourceMetrics{}
if err = mr.Collect(context.Background(), &rm); err != nil {
return fmt.Errorf("ManualReader.Collect: %w", err)
}
for _, sm := range rm.ScopeMetrics {
for _, m := range sm.Metrics {
if m.Name == "grpc.client.attempt.duration" {
hist := m.Data.(metricdata.Histogram[float64])
for _, d := range hist.DataPoints {
v, present := d.Attributes.Value("grpc.lb.locality")
if present && v.AsString() != "" {
return nil
}
}
}
}
}
return errors.New("storage: direct connectivity not detected")
}

// Close closes the Client.
//
// Close need not be called at program exit.
Expand Down

0 comments on commit 60c2323

Please sign in to comment.