diff --git a/firestore/go.mod b/firestore/go.mod index bcad1877cc7c..b746f7172bc5 100644 --- a/firestore/go.mod +++ b/firestore/go.mod @@ -3,7 +3,7 @@ module cloud.google.com/go/firestore go 1.21 require ( - cloud.google.com/go v0.116.0 + cloud.google.com/go v0.117.0 cloud.google.com/go/longrunning v0.6.2 github.com/google/go-cmp v0.6.0 github.com/googleapis/gax-go/v2 v2.14.0 diff --git a/firestore/go.sum b/firestore/go.sum index 53a5ab8f30ea..4d040bfd5231 100644 --- a/firestore/go.sum +++ b/firestore/go.sum @@ -1,6 +1,6 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= -cloud.google.com/go v0.116.0 h1:B3fRrSDkLRt5qSHWe40ERJvhvnQwdZiHu0bJOpldweE= -cloud.google.com/go v0.116.0/go.mod h1:cEPSRWPzZEswwdr9BxE6ChEn01dWlTaF05LiC2Xs70U= +cloud.google.com/go v0.117.0 h1:Z5TNFfQxj7WG2FgOGX1ekC5RiXrYgms6QscOm32M/4s= +cloud.google.com/go v0.117.0/go.mod h1:ZbwhVTb1DBGt2Iwb3tNO6SEK4q+cplHZmLWH+DelYYc= cloud.google.com/go/auth v0.13.0 h1:8Fu8TZy167JkW8Tj3q7dIkr2v4cndv41ouecJx0PAHs= cloud.google.com/go/auth v0.13.0/go.mod h1:COOjD9gwfKNKz+IIduatIhYJQIc0mG3H102r/EMxX6Q= cloud.google.com/go/auth/oauth2adapt v0.2.6 h1:V6a6XDu2lTwPZWOawrAa9HUK+DB2zfJyTuciBG5hFkU= diff --git a/firestore/integration_test.go b/firestore/integration_test.go index d2a9ab727cb2..674e971f4423 100644 --- a/firestore/integration_test.go +++ b/firestore/integration_test.go @@ -73,6 +73,7 @@ const ( envPrivateKey = "GCLOUD_TESTS_GOLANG_FIRESTORE_KEY" envDatabases = "GCLOUD_TESTS_GOLANG_FIRESTORE_DATABASES" envEmulator = "FIRESTORE_EMULATOR_HOST" + indexBuilding = "index is currently building" ) var ( @@ -260,12 +261,14 @@ func handleCreateIndexResp(ctx context.Context, indexNames []string, wg *sync.Wa // deleteIndexes deletes composite indexes created in createIndexes function func deleteIndexes(ctx context.Context, indexNames []string) { for _, indexName := range indexNames { - err := iAdminClient.DeleteIndex(ctx, &adminpb.DeleteIndexRequest{ - Name: 
indexName, + testutil.RetryWithoutTest(5, 5*time.Second, func(r *testutil.R) { + err := iAdminClient.DeleteIndex(ctx, &adminpb.DeleteIndexRequest{ + Name: indexName, + }) + if err != nil { + r.Errorf("Failed to delete index \"%s\": %+v\n", indexName, err) + } }) - if err != nil { - log.Printf("Failed to delete index \"%s\": %+v\n", indexName, err) - } } } @@ -2437,7 +2440,7 @@ func TestDetectProjectID(t *testing.T) { ts := testutil.ErroringTokenSource{} // Try to use creds without project ID. _, err := NewClient(ctx, DetectProjectID, option.WithTokenSource(ts)) - if err == nil || err.Error() != "firestore: see the docs on DetectProjectID" { + if err == nil || err.Error() != "unable to detect projectID, please refer to docs for DetectProjectID" { t.Errorf("expected an error while using TokenSource that does not have a project ID") } } @@ -2931,28 +2934,39 @@ func TestIntegration_AggregationQueries(t *testing.T) { } for _, tc := range testcases { - var aggResult AggregationResult - var err error - if tc.runInTransaction { - client.RunTransaction(ctx, func(ctx context.Context, tx *Transaction) error { - aggResult, err = tc.aggregationQuery.Transaction(tx).Get(ctx) - return err + t.Run(tc.desc, func(t *testing.T) { + testutil.Retry(t, 5, 5*time.Second, func(r *testutil.R) { + var aggResult AggregationResult + var err error + if tc.runInTransaction { + client.RunTransaction(ctx, func(ctx context.Context, tx *Transaction) error { + aggResult, err = tc.aggregationQuery.Transaction(tx).Get(ctx) + return err + }) + } else { + aggResult, err = tc.aggregationQuery.Get(ctx) + } + + // Retry only if index building is in progress + s, ok := status.FromError(err) + if err != nil && ok && s != nil && s.Code() != codes.FailedPrecondition && + strings.Contains(s.Message(), indexBuilding) { + r.Errorf("Get: %v", err) + return + } + + // Compare expected and actual results + if err != nil && !tc.wantErr { + r.Fatalf("got: %v, want: nil", err) + } + if err == nil && tc.wantErr { + 
r.Fatalf("got: %v, wanted error", err) + } + if !reflect.DeepEqual(aggResult, tc.result) { + r.Fatalf("got: %v, want: %v", aggResult, tc.result) + } }) - } else { - aggResult, err = tc.aggregationQuery.Get(ctx) - } - if err != nil && !tc.wantErr { - t.Errorf("%s: got: %v, want: nil", tc.desc, err) - continue - } - if err == nil && tc.wantErr { - t.Errorf("%s: got: %v, wanted error", tc.desc, err) - continue - } - if !reflect.DeepEqual(aggResult, tc.result) { - t.Errorf("%s: got: %v, want: %v", tc.desc, aggResult, tc.result) - continue - } + }) } } @@ -3313,33 +3327,51 @@ func TestIntegration_FindNearest(t *testing.T) { }, } { t.Run(tc.desc, func(t *testing.T) { - iter := tc.vq.Documents(ctx) - gotDocs, err := iter.GetAll() - if err != nil { - t.Fatalf("GetAll: %+v", err) - } - - if len(gotDocs) != len(tc.wantBeans) { - t.Fatalf("Expected %v results, got %d", len(tc.wantBeans), len(gotDocs)) - } - - for i, doc := range gotDocs { - var gotBean coffeeBean - if len(tc.wantResField) != 0 { - _, ok := doc.Data()[tc.wantResField] - if !ok { - t.Errorf("Expected %v field to exist in %v", tc.wantResField, doc.Data()) - } + testutil.Retry(t, 5, 5*time.Second, func(r *testutil.R) { + // Get all documents + iter := tc.vq.Documents(ctx) + gotDocs, err := iter.GetAll() + + // Retry only if index building is in progress + s, ok := status.FromError(err) + if err != nil && ok && s != nil && s.Code() != codes.FailedPrecondition && + strings.Contains(s.Message(), indexBuilding) { + r.Errorf("GetAll: %v", err) + return } - err := doc.DataTo(&gotBean) + if err != nil { - t.Errorf("#%v: DataTo: %+v", doc.Ref.ID, err) - continue + t.Fatalf("GetAll: %+v", err) + } + + // Compare expected and actual results length + if len(gotDocs) != len(tc.wantBeans) { + t.Fatalf("Expected %v results, got %d", len(tc.wantBeans), len(gotDocs)) } - if tc.wantBeans[i].ID != gotBean.ID { - t.Errorf("#%v: want: %v, got: %v", i, beans[i].ID, gotBean.ID) + + // Compare results + for i, doc := range gotDocs { + 
var gotBean coffeeBean + + // Compare expected and actual result field + if len(tc.wantResField) != 0 { + _, ok := doc.Data()[tc.wantResField] + if !ok { + t.Errorf("Expected %v field to exist in %v", tc.wantResField, doc.Data()) + } + } + + // Compare expected and actual document ID + err := doc.DataTo(&gotBean) + if err != nil { + t.Errorf("#%v: DataTo: %+v", doc.Ref.ID, err) + continue + } + if tc.wantBeans[i].ID != gotBean.ID { + t.Errorf("#%v: want: %v, got: %v", i, beans[i].ID, gotBean.ID) + } } - } + }) }) } } diff --git a/internal/generated/snippets/go.mod b/internal/generated/snippets/go.mod index 413ca9a876e0..977b277f878a 100644 --- a/internal/generated/snippets/go.mod +++ b/internal/generated/snippets/go.mod @@ -5,7 +5,7 @@ go 1.21.13 toolchain go1.23.0 require ( - cloud.google.com/go v0.116.0 + cloud.google.com/go v0.117.0 cloud.google.com/go/accessapproval v1.8.2 cloud.google.com/go/accesscontextmanager v1.9.2 cloud.google.com/go/advisorynotifications v0.0.0-00010101000000-000000000000 diff --git a/logging/go.mod b/logging/go.mod index 4e66c6f30771..90ae203ac04c 100644 --- a/logging/go.mod +++ b/logging/go.mod @@ -3,7 +3,7 @@ module cloud.google.com/go/logging go 1.21 require ( - cloud.google.com/go v0.116.0 + cloud.google.com/go v0.117.0 cloud.google.com/go/compute/metadata v0.6.0 cloud.google.com/go/iam v1.2.2 cloud.google.com/go/longrunning v0.6.2 diff --git a/logging/go.sum b/logging/go.sum index d287b46b123c..1f50deae58a9 100644 --- a/logging/go.sum +++ b/logging/go.sum @@ -1,6 +1,6 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= -cloud.google.com/go v0.116.0 h1:B3fRrSDkLRt5qSHWe40ERJvhvnQwdZiHu0bJOpldweE= -cloud.google.com/go v0.116.0/go.mod h1:cEPSRWPzZEswwdr9BxE6ChEn01dWlTaF05LiC2Xs70U= +cloud.google.com/go v0.117.0 h1:Z5TNFfQxj7WG2FgOGX1ekC5RiXrYgms6QscOm32M/4s= +cloud.google.com/go v0.117.0/go.mod h1:ZbwhVTb1DBGt2Iwb3tNO6SEK4q+cplHZmLWH+DelYYc= cloud.google.com/go/auth v0.13.0 
h1:8Fu8TZy167JkW8Tj3q7dIkr2v4cndv41ouecJx0PAHs= cloud.google.com/go/auth v0.13.0/go.mod h1:COOjD9gwfKNKz+IIduatIhYJQIc0mG3H102r/EMxX6Q= cloud.google.com/go/auth/oauth2adapt v0.2.6 h1:V6a6XDu2lTwPZWOawrAa9HUK+DB2zfJyTuciBG5hFkU= diff --git a/logging/internal/testing/retry.go b/logging/internal/testing/retry.go new file mode 100644 index 000000000000..ae9a7ceed153 --- /dev/null +++ b/logging/internal/testing/retry.go @@ -0,0 +1,110 @@ +/* +Copyright 2024 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + "testing" + "time" + + "cloud.google.com/go/internal/testutil" + "google.golang.org/api/iterator" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +var defaultMaxAttempts = 10 +var defaultSleep = 10 * time.Second +var defaultRetryableCodes = map[codes.Code]bool{ + codes.Unavailable: true, +} + +// Iterator is a wrapper interface type for iterators in the logadmin +// library that have a Next function that gets the next item/error, or returns +// nil/iterator.Done if the object has no next item. +type Iterator[T any] interface { + Next() (*T, error) +} + +// handleError handles the given error for the retry attempt. +func handleError(r *testutil.R, err error) { + if err != nil { + s, ok := status.FromError(err) + + // Throw a fatal error if the error is not retryable or if it cannot be converted into + // a status object. 
+ if ok && !defaultRetryableCodes[s.Code()] { + r.Fatalf("%+v\n", err) + } else if ok { + r.Errorf("%+v\n", err) + } else { + r.Fatalf("%+v\n", err) + } + } +} + +// Retry is a wrapper around testutil.Retry that retries the test function on Unavailable errors, otherwise, Fatalfs. +func Retry(t *testing.T, f func(r *testutil.R) error) bool { + retryFunc := func(r *testutil.R) { + err := f(r) + handleError(r, err) + } + return testutil.Retry(t, defaultMaxAttempts, defaultSleep, retryFunc) +} + +// RetryAndExpectError retries the test function on Unavailable errors, otherwise passes +// if a different error was thrown. If no non-retryable error is returned, fails. +func RetryAndExpectError(t *testing.T, f func(r *testutil.R) error) bool { + retryFunc := func(r *testutil.R) { + err := f(r) + + if err != nil { + s, ok := status.FromError(err) + + // Only retry on retryable errors, otherwise pass. + if ok && defaultRetryableCodes[s.Code()] { + r.Errorf("%+v\n", err) + } + } else { + r.Fatalf("got no error, expected one") + } + } + + return testutil.Retry(t, defaultMaxAttempts, defaultSleep, retryFunc) +} + +// RetryIteratorNext is a wrapper around testutil.Retry that retries the given iterator's Next function +// and returns the next object, retrying if a retryable error is found. If a non-retryable error is found, fail +// the test. 
+func RetryIteratorNext[T any](t *testing.T, it Iterator[T]) (*T, bool) { + var next *T + var err error + retryFunc := func(r *testutil.R) { + next, err = it.Next() + if err != nil { + if err == iterator.Done { + return + } + + handleError(r, err) + } + } + testutil.Retry(t, defaultMaxAttempts, defaultSleep, retryFunc) + if err == iterator.Done { + return nil, true + } + return next, false +} diff --git a/logging/logadmin/metrics_test.go b/logging/logadmin/metrics_test.go index afcbe794ea08..f6906ac91805 100644 --- a/logging/logadmin/metrics_test.go +++ b/logging/logadmin/metrics_test.go @@ -22,6 +22,7 @@ import ( "cloud.google.com/go/internal/testutil" "cloud.google.com/go/internal/uid" + ltest "cloud.google.com/go/logging/internal/testing" "google.golang.org/api/iterator" ) @@ -55,26 +56,32 @@ func TestCreateDeleteMetric(t *testing.T) { Description: "DESC", Filter: "FILTER", } + if err := client.CreateMetric(ctx, metric); err != nil { t.Fatal(err) } defer client.DeleteMetric(ctx, metric.ID) - got, err := client.Metric(ctx, metric.ID) - if err != nil { - t.Fatal(err) - } + var got *Metric + ltest.Retry(t, func(r *testutil.R) error { + var err error + got, err = client.Metric(ctx, metric.ID) + return err + }) if want := metric; !testutil.Equal(got, want) { t.Errorf("got %+v, want %+v", got, want) } - if err := client.DeleteMetric(ctx, metric.ID); err != nil { - t.Fatal(err) - } + ltest.Retry(t, func(r *testutil.R) error { + return client.DeleteMetric(ctx, metric.ID) + }) - if _, err := client.Metric(ctx, metric.ID); err == nil { - t.Fatal("got no error, expected one") - } + // client.Metric should give an error. Test if this is the case, but retry on + // retryable errors. + ltest.RetryAndExpectError(t, func(r *testutil.R) error { + _, err := client.Metric(ctx, metric.ID) + return err + }) } func TestUpdateMetric(t *testing.T) { @@ -86,27 +93,31 @@ func TestUpdateMetric(t *testing.T) { } // Updating a non-existent metric creates a new one. 
- if err := client.UpdateMetric(ctx, metric); err != nil { - t.Fatal(err) - } + ltest.Retry(t, func(r *testutil.R) error { + return client.UpdateMetric(ctx, metric) + }) defer client.DeleteMetric(ctx, metric.ID) - got, err := client.Metric(ctx, metric.ID) - if err != nil { - t.Fatal(err) - } + + var got *Metric + ltest.Retry(t, func(r *testutil.R) error { + var err error + got, err = client.Metric(ctx, metric.ID) + return err + }) if want := metric; !testutil.Equal(got, want) { t.Errorf("got %+v, want %+v", got, want) } // Updating an existing metric changes it. metric.Description = "CHANGED" - if err := client.UpdateMetric(ctx, metric); err != nil { - t.Fatal(err) - } - got, err = client.Metric(ctx, metric.ID) - if err != nil { - t.Fatal(err) - } + ltest.Retry(t, func(r *testutil.R) error { + return client.UpdateMetric(ctx, metric) + }) + ltest.Retry(t, func(r *testutil.R) error { + var err error + got, err = client.Metric(ctx, metric.ID) + return err + }) if want := metric; !testutil.Equal(got, want) { t.Errorf("got %+v, want %+v", got, want) } @@ -127,22 +138,15 @@ func TestListMetrics(t *testing.T) { want[m.ID] = m } for _, m := range metrics { - if err := client.CreateMetric(ctx, m); err != nil { - t.Fatalf("Create(%q): %v", m.ID, err) - } + ltest.Retry(t, func(r *testutil.R) error { + return client.CreateMetric(ctx, m) + }) defer client.DeleteMetric(ctx, m.ID) } got := map[string]*Metric{} it := client.Metrics(ctx) - for { - m, err := it.Next() - if err == iterator.Done { - break - } - if err != nil { - t.Fatal(err) - } + for m, done := ltest.RetryIteratorNext(t, it); !done; m, done = ltest.RetryIteratorNext(t, it) { // If tests run simultaneously, we may have more metrics than we // created. So only check for our own. 
if _, ok := want[m.ID]; ok { diff --git a/logging/logadmin/sinks_test.go b/logging/logadmin/sinks_test.go index 1db528cd52ff..381dc2c1ef30 100644 --- a/logging/logadmin/sinks_test.go +++ b/logging/logadmin/sinks_test.go @@ -189,34 +189,38 @@ func TestCreateSink(t *testing.T) { Filter: testFilter, IncludeChildren: true, } - got, err := client.CreateSink(ctx, sink) - if err != nil { - t.Fatal(err) - } + var got *Sink + ltest.Retry(t, func(r *testutil.R) error { + var err error + got, err = client.CreateSink(ctx, sink) + return err + }) defer client.DeleteSink(ctx, sink.ID) sink.WriterIdentity = ltest.SharedServiceAccount if want := sink; !testutil.Equal(got, want) { t.Errorf("got %+v, want %+v", got, want) } - got, err = client.Sink(ctx, sink.ID) - if err != nil { - t.Fatal(err) - } + ltest.Retry(t, func(r *testutil.R) error { + var err error + got, err = client.Sink(ctx, sink.ID) + return err + }) if want := sink; !testutil.Equal(got, want) { t.Errorf("got %+v, want %+v", got, want) } // UniqueWriterIdentity sink.ID = sinkIDs.New() - got, err = client.CreateSinkOpt(ctx, sink, SinkOptions{UniqueWriterIdentity: true}) - if err != nil { - t.Fatal(err) - } + ltest.Retry(t, func(r *testutil.R) error { + var err error + got, err = client.CreateSinkOpt(ctx, sink, SinkOptions{UniqueWriterIdentity: true}) + return err + }) defer client.DeleteSink(ctx, sink.ID) // Grant destination permissions to sink's writer identity. 
- err = addBucketCreator(testBucket, got.WriterIdentity) + err := addBucketCreator(testBucket, got.WriterIdentity) if err != nil { t.Fatal(err) } @@ -236,23 +240,27 @@ func TestUpdateSink(t *testing.T) { WriterIdentity: ltest.SharedServiceAccount, } - _, err := client.CreateSink(ctx, sink) - if err != nil { - t.Fatal(err) - } + ltest.Retry(t, func(r *testutil.R) error { + _, err := client.CreateSink(ctx, sink) + return err + }) defer client.DeleteSink(ctx, sink.ID) - got, err := client.UpdateSink(ctx, sink) - if err != nil { - t.Fatal(err) - } + var got *Sink + ltest.Retry(t, func(r *testutil.R) error { + var err error + got, err = client.UpdateSink(ctx, sink) + return err + }) if want := sink; !testutil.Equal(got, want) { t.Errorf("got\n%+v\nwant\n%+v", got, want) } - got, err = client.Sink(ctx, sink.ID) - if err != nil { - t.Fatal(err) - } + + ltest.Retry(t, func(r *testutil.R) error { + var err error + got, err = client.Sink(ctx, sink.ID) + return err + }) if want := sink; !testutil.Equal(got, want) { t.Errorf("got\n%+v\nwant\n%+v", got, want) } @@ -260,13 +268,16 @@ func TestUpdateSink(t *testing.T) { // Updating an existing sink changes it. 
sink.Filter = "" sink.IncludeChildren = false - if _, err := client.UpdateSink(ctx, sink); err != nil { - t.Fatal(err) - } - got, err = client.Sink(ctx, sink.ID) - if err != nil { - t.Fatal(err) - } + ltest.Retry(t, func(r *testutil.R) error { + _, err := client.UpdateSink(ctx, sink) + return err + }) + + ltest.Retry(t, func(r *testutil.R) error { + var err error + got, err = client.Sink(ctx, sink.ID) + return err + }) if want := sink; !testutil.Equal(got, want) { t.Errorf("got\n%+v\nwant\n%+v", got, want) } @@ -283,26 +294,29 @@ func TestUpdateSinkOpt(t *testing.T) { WriterIdentity: ltest.SharedServiceAccount, } - _, err := client.CreateSink(ctx, origSink) - if err != nil { - t.Fatal(err) - } + ltest.Retry(t, func(r *testutil.R) error { + _, err := client.CreateSink(ctx, origSink) + return err + }) defer client.DeleteSink(ctx, origSink.ID) // Updating with empty options is an error. - _, err = client.UpdateSinkOpt(ctx, &Sink{ID: id, Destination: testSinkDestination}, SinkOptions{}) - if err == nil { - t.Errorf("got %v, want nil", err) - } + ltest.RetryAndExpectError(t, func(r *testutil.R) error { + _, err := client.UpdateSinkOpt(ctx, &Sink{ID: id, Destination: testSinkDestination}, SinkOptions{}) + return err + }) // Update selected fields. - got, err := client.UpdateSinkOpt(ctx, &Sink{ID: id}, SinkOptions{ - UpdateFilter: true, - UpdateIncludeChildren: true, + var got *Sink + ltest.Retry(t, func(r *testutil.R) error { + var err error + got, err = client.UpdateSinkOpt(ctx, &Sink{ID: id}, SinkOptions{ + UpdateFilter: true, + UpdateIncludeChildren: true, + }) + return err }) - if err != nil { - t.Fatal(err) - } + want := *origSink want.Filter = "" want.IncludeChildren = false @@ -311,13 +325,15 @@ func TestUpdateSinkOpt(t *testing.T) { } // Update writer identity. 
- got, err = client.UpdateSinkOpt(ctx, &Sink{ID: id, Filter: "foo"}, - SinkOptions{UniqueWriterIdentity: true}) - if err != nil { - t.Fatal(err) - } + ltest.Retry(t, func(r *testutil.R) error { + var err error + got, err = client.UpdateSinkOpt(ctx, &Sink{ID: id, Filter: "foo"}, + SinkOptions{UniqueWriterIdentity: true}) + return err + }) + // Grant destination permissions to sink's new writer identity. - err = addBucketCreator(testBucket, got.WriterIdentity) + err := addBucketCreator(testBucket, got.WriterIdentity) if err != nil { t.Fatal(err) } @@ -354,14 +370,7 @@ func TestListSinks(t *testing.T) { got := map[string]*Sink{} it := client.Sinks(ctx) - for { - s, err := it.Next() - if err == iterator.Done { - break - } - if err != nil { - t.Fatal(err) - } + for s, done := ltest.RetryIteratorNext(t, it); !done; s, done = ltest.RetryIteratorNext(t, it) { // If tests run simultaneously, we may have more sinks than we // created. So only check for our own. if _, ok := want[s.ID]; ok { diff --git a/logging/logging_test.go b/logging/logging_test.go index 9988c63c150b..763043caa7bc 100644 --- a/logging/logging_test.go +++ b/logging/logging_test.go @@ -1597,7 +1597,7 @@ func TestWriteLogEntriesSizeLimit(t *testing.T) { } client.OnError = func(e error) { - t.Fatalf(e.Error()) + t.Fatal(e.Error()) } defer client.Close() diff --git a/storage/grpc_client.go b/storage/grpc_client.go index 16aa59dce1d6..4748e269e2d6 100644 --- a/storage/grpc_client.go +++ b/storage/grpc_client.go @@ -35,6 +35,7 @@ import ( "google.golang.org/api/iterator" "google.golang.org/api/option" "google.golang.org/api/option/internaloption" + "google.golang.org/api/transport" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/encoding" @@ -118,6 +119,24 @@ type grpcStorageClient struct { settings *settings } +func enableClientMetrics(ctx context.Context, s *settings, config storageConfig) (*metricsContext, error) { + var project string + // TODO: use new auth client + c, err := 
transport.Creds(ctx, s.clientOption...) + if err == nil { + project = c.ProjectID + } + metricsContext, err := newGRPCMetricContext(ctx, metricsConfig{ + project: project, + interval: config.metricInterval, + manualReader: config.manualReader}, + ) + if err != nil { + return nil, fmt.Errorf("gRPC Metrics: %w", err) + } + return metricsContext, nil +} + // newGRPCStorageClient initializes a new storageClient that uses the gRPC // Storage API. func newGRPCStorageClient(ctx context.Context, opts ...storageOption) (storageClient, error) { diff --git a/storage/grpc_metrics.go b/storage/grpc_metrics.go index 149b37807ed4..f7bebd1defa7 100644 --- a/storage/grpc_metrics.go +++ b/storage/grpc_metrics.go @@ -16,8 +16,8 @@ package storage import ( "context" + "errors" "fmt" - "log" "strings" "time" @@ -29,8 +29,8 @@ import ( "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/resource" "google.golang.org/api/option" - "google.golang.org/api/transport" "google.golang.org/grpc" + "google.golang.org/grpc/experimental/stats" "google.golang.org/grpc/stats/opentelemetry" ) @@ -39,98 +39,78 @@ const ( metricPrefix = "storage.googleapis.com/client/" ) -func latencyHistogramBoundaries() []float64 { - boundaries := []float64{} - boundary := 0.0 - increment := 0.002 - // 2ms buckets for first 100ms, so we can have higher resolution for uploads and downloads in the 100 KiB range - for i := 0; i < 50; i++ { - boundaries = append(boundaries, boundary) - // increment by 2ms - boundary += increment - } - // For the remaining buckets do 10 10ms, 10 20ms, and so on, up until 5 minutes - for i := 0; i < 150 && boundary < 300; i++ { - boundaries = append(boundaries, boundary) - if i != 0 && i%10 == 0 { - increment *= 2 - } - boundary += increment - } - return boundaries +// Added to help with tests +type storageMonitoredResource struct { + project string + api string + location string + instance string + cloudPlatform string + host string + resource *resource.Resource 
} -func sizeHistogramBoundaries() []float64 { - kb := 1024.0 - mb := 1024.0 * kb - gb := 1024.0 * mb - boundaries := []float64{} - boundary := 0.0 - increment := 128 * kb - // 128 KiB increments up to 4MiB, then exponential growth - for len(boundaries) < 200 && boundary <= 16*gb { - boundaries = append(boundaries, boundary) - boundary += increment - if boundary >= 4*mb { - increment *= 2 - } +func (smr *storageMonitoredResource) exporter() (metric.Exporter, error) { + exporter, err := mexporter.New( + mexporter.WithProjectID(smr.project), + mexporter.WithMetricDescriptorTypeFormatter(metricFormatter), + mexporter.WithCreateServiceTimeSeries(), + mexporter.WithMonitoredResourceDescription(monitoredResourceName, []string{"project_id", "location", "cloud_platform", "host_id", "instance_id", "api"}), + ) + if err != nil { + return nil, fmt.Errorf("storage: creating metrics exporter: %w", err) } - return boundaries -} - -func metricFormatter(m metricdata.Metrics) string { - return metricPrefix + strings.ReplaceAll(string(m.Name), ".", "/") -} - -func gcpAttributeExpectedDefaults() []attribute.KeyValue { - return []attribute.KeyValue{ - {Key: "location", Value: attribute.StringValue("global")}, - {Key: "cloud_platform", Value: attribute.StringValue("unknown")}, - {Key: "host_id", Value: attribute.StringValue("unknown")}} -} - -// Added to help with tests -type preparedResource struct { - projectToUse string - resource *resource.Resource + return exporter, nil } -func newPreparedResource(ctx context.Context, project string, resourceOptions []resource.Option) (*preparedResource, error) { - detectedAttrs, err := resource.New(ctx, resourceOptions...) +func newStorageMonitoredResource(ctx context.Context, project, api string, opts ...resource.Option) (*storageMonitoredResource, error) { + detectedAttrs, err := resource.New(ctx, opts...) 
if err != nil { return nil, err } - preparedResource := &preparedResource{} + smr := &storageMonitoredResource{ + instance: uuid.New().String(), + api: api, + project: project, + } s := detectedAttrs.Set() - p, present := s.Value("cloud.account.id") - if present { - preparedResource.projectToUse = p.AsString() + // Attempt to use resource detector project id if project id wasn't + // identified using ADC as a last resort. Otherwise metrics cannot be started. + if p, present := s.Value("cloud.account.id"); present && smr.project == "" { + smr.project = p.AsString() + } else if !present && smr.project == "" { + return nil, errors.New("google cloud project is required to start client-side metrics") + } + if v, ok := s.Value("cloud.region"); ok { + smr.location = v.AsString() } else { - preparedResource.projectToUse = project + smr.location = "global" } - updates := []attribute.KeyValue{} - for _, kv := range gcpAttributeExpectedDefaults() { - if val, present := s.Value(kv.Key); !present || val.AsString() == "" { - updates = append(updates, attribute.KeyValue{Key: kv.Key, Value: kv.Value}) - } + if v, ok := s.Value("cloud.platform"); ok { + smr.cloudPlatform = v.AsString() + } else { + smr.cloudPlatform = "unknown" } - r, err := resource.New( - ctx, - resource.WithAttributes( - attribute.KeyValue{Key: "gcp.resource_type", Value: attribute.StringValue(monitoredResourceName)}, - attribute.KeyValue{Key: "instance_id", Value: attribute.StringValue(uuid.New().String())}, - attribute.KeyValue{Key: "project_id", Value: attribute.StringValue(project)}, - attribute.KeyValue{Key: "api", Value: attribute.StringValue("grpc")}, - ), - resource.WithAttributes(detectedAttrs.Attributes()...), - // Last duplicate key / value wins - resource.WithAttributes(updates...), - ) + if v, ok := s.Value("host.id"); ok { + smr.host = v.AsString() + } else if v, ok := s.Value("faas.id"); ok { + smr.host = v.AsString() + } else { + smr.host = "unknown" + } + smr.resource, err = resource.New(ctx, 
resource.WithAttributes([]attribute.KeyValue{ + {Key: "gcp.resource_type", Value: attribute.StringValue(monitoredResourceName)}, + {Key: "project_id", Value: attribute.StringValue(smr.project)}, + {Key: "api", Value: attribute.StringValue(smr.api)}, + {Key: "instance_id", Value: attribute.StringValue(smr.instance)}, + {Key: "location", Value: attribute.StringValue(smr.location)}, + {Key: "cloud_platform", Value: attribute.StringValue(smr.cloudPlatform)}, + {Key: "host_id", Value: attribute.StringValue(smr.host)}, + }...)) if err != nil { return nil, err } - preparedResource.resource = r - return preparedResource, nil + return smr, nil } type metricsContext struct { @@ -142,64 +122,65 @@ type metricsContext struct { close func() } -func createHistogramView(name string, boundaries []float64) metric.View { - return metric.NewView(metric.Instrument{ - Name: name, - Kind: metric.InstrumentKindHistogram, - }, metric.Stream{ - Name: name, - Aggregation: metric.AggregationExplicitBucketHistogram{Boundaries: boundaries}, - }) +type metricsConfig struct { + project string + interval time.Duration + customExporter *metric.Exporter + manualReader *metric.ManualReader // used by tests + disableExporter bool // used by tests disables exports + resourceOpts []resource.Option // used by tests } -func newGRPCMetricContext(ctx context.Context, project string, config storageConfig) (*metricsContext, error) { +func newGRPCMetricContext(ctx context.Context, cfg metricsConfig) (*metricsContext, error) { var exporter metric.Exporter meterOpts := []metric.Option{} - if config.metricExporter != nil { - exporter = *config.metricExporter - } else { - preparedResource, err := newPreparedResource(ctx, project, []resource.Option{resource.WithDetectors(gcp.NewDetector())}) + if cfg.customExporter == nil { + var ropts []resource.Option + if cfg.resourceOpts != nil { + ropts = cfg.resourceOpts + } else { + ropts = []resource.Option{resource.WithDetectors(gcp.NewDetector())} + } + smr, err := 
newStorageMonitoredResource(ctx, cfg.project, "grpc", ropts...) if err != nil { return nil, err } - meterOpts = append(meterOpts, metric.WithResource(preparedResource.resource)) - // Implementation requires a project, if one is not determined possibly user - // credentials. Then we will fail stating gRPC Metrics require a project-id. - if project == "" && preparedResource.projectToUse == "" { - return nil, fmt.Errorf("google cloud project is required to start client-side metrics") - } - // If projectTouse isn't the same as project provided to Storage client, then - // emit a log stating which project is being used to emit metrics to. - if project != preparedResource.projectToUse { - log.Printf("The Project ID configured for metrics is %s, but the Project ID of the storage client is %s. Make sure that the service account in use has the required metric writing role (roles/monitoring.metricWriter) in the project projectIdToUse or metrics will not be written.", preparedResource.projectToUse, project) - } - meOpts := []mexporter.Option{ - mexporter.WithProjectID(preparedResource.projectToUse), - mexporter.WithMetricDescriptorTypeFormatter(metricFormatter), - mexporter.WithCreateServiceTimeSeries(), - mexporter.WithMonitoredResourceDescription(monitoredResourceName, []string{"project_id", "location", "cloud_platform", "host_id", "instance_id", "api"})} - exporter, err = mexporter.New(meOpts...) + exporter, err = smr.exporter() if err != nil { return nil, err } - } - // Metric views update histogram boundaries to be relevant to GCS - // otherwise default OTel histogram boundaries are used. 
- metricViews := []metric.View{ - createHistogramView("grpc.client.attempt.duration", latencyHistogramBoundaries()), - createHistogramView("grpc.client.attempt.rcvd_total_compressed_message_size", sizeHistogramBoundaries()), - createHistogramView("grpc.client.attempt.sent_total_compressed_message_size", sizeHistogramBoundaries()), + meterOpts = append(meterOpts, metric.WithResource(smr.resource)) + } else { + exporter = *cfg.customExporter } interval := time.Minute - if config.metricInterval > 0 { - interval = config.metricInterval + if cfg.interval > 0 { + interval = cfg.interval + } + meterOpts = append(meterOpts, + // Metric views update histogram boundaries to be relevant to GCS + // otherwise default OTel histogram boundaries are used. + metric.WithView( + createHistogramView("grpc.client.attempt.duration", latencyHistogramBoundaries()), + createHistogramView("grpc.client.attempt.rcvd_total_compressed_message_size", sizeHistogramBoundaries()), + createHistogramView("grpc.client.attempt.sent_total_compressed_message_size", sizeHistogramBoundaries())), + ) + if cfg.manualReader != nil { + meterOpts = append(meterOpts, metric.WithReader(cfg.manualReader)) + } + if !cfg.disableExporter { + meterOpts = append(meterOpts, metric.WithReader( + metric.NewPeriodicReader(&exporterLogSuppressor{Exporter: exporter}, metric.WithInterval(interval)))) } - meterOpts = append(meterOpts, metric.WithReader(metric.NewPeriodicReader(&exporterLogSuppressor{exporter: exporter}, metric.WithInterval(interval))), - metric.WithView(metricViews...)) provider := metric.NewMeterProvider(meterOpts...) 
mo := opentelemetry.MetricsOptions{ MeterProvider: provider, - Metrics: opentelemetry.DefaultMetrics().Add( + Metrics: stats.NewMetrics( + "grpc.client.attempt.started", + "grpc.client.attempt.duration", + "grpc.client.attempt.sent_total_compressed_message_size", + "grpc.client.attempt.rcvd_total_compressed_message_size", + "grpc.client.call.duration", "grpc.lb.wrr.rr_fallback", "grpc.lb.wrr.endpoint_weight_not_yet_usable", "grpc.lb.wrr.endpoint_weight_stale", @@ -208,45 +189,29 @@ func newGRPCMetricContext(ctx context.Context, project string, config storageCon "grpc.lb.rls.cache_size", "grpc.lb.rls.default_target_picks", "grpc.lb.rls.target_picks", - "grpc.lb.rls.failed_picks"), + "grpc.lb.rls.failed_picks", + ), OptionalLabels: []string{"grpc.lb.locality"}, } opts := []option.ClientOption{ - option.WithGRPCDialOption(opentelemetry.DialOption(opentelemetry.Options{MetricsOptions: mo})), - option.WithGRPCDialOption(grpc.WithDefaultCallOptions(grpc.StaticMethodCallOption{})), + option.WithGRPCDialOption( + opentelemetry.DialOption(opentelemetry.Options{MetricsOptions: mo})), + option.WithGRPCDialOption( + grpc.WithDefaultCallOptions(grpc.StaticMethodCallOption{})), } - context := &metricsContext{ + return &metricsContext{ clientOpts: opts, provider: provider, - close: createShutdown(ctx, provider), - } - return context, nil -} - -func enableClientMetrics(ctx context.Context, s *settings, config storageConfig) (*metricsContext, error) { - var project string - c, err := transport.Creds(ctx, s.clientOption...) 
- if err == nil { - project = c.ProjectID - } - // Enable client-side metrics for gRPC - metricsContext, err := newGRPCMetricContext(ctx, project, config) - if err != nil { - return nil, fmt.Errorf("gRPC Metrics: %w", err) - } - return metricsContext, nil -} - -func createShutdown(ctx context.Context, provider *metric.MeterProvider) func() { - return func() { - provider.Shutdown(ctx) - } + close: func() { + provider.Shutdown(ctx) + }, + }, nil } // Silences permission errors after initial error is emitted to prevent // chatty logs. type exporterLogSuppressor struct { - exporter metric.Exporter + metric.Exporter emittedFailure bool } @@ -254,7 +219,7 @@ type exporterLogSuppressor struct { // lack of credentials after initial failure. // https://pkg.go.dev/go.opentelemetry.io/otel/sdk/metric@v1.28.0#Exporter func (e *exporterLogSuppressor) Export(ctx context.Context, rm *metricdata.ResourceMetrics) error { - if err := e.exporter.Export(ctx, rm); err != nil && !e.emittedFailure { + if err := e.Exporter.Export(ctx, rm); err != nil && !e.emittedFailure { if strings.Contains(err.Error(), "PermissionDenied") { e.emittedFailure = true return fmt.Errorf("gRPC metrics failed due permission issue: %w", err) @@ -264,18 +229,55 @@ func (e *exporterLogSuppressor) Export(ctx context.Context, rm *metricdata.Resou return nil } -func (e *exporterLogSuppressor) Temporality(k metric.InstrumentKind) metricdata.Temporality { - return e.exporter.Temporality(k) +func latencyHistogramBoundaries() []float64 { + boundaries := []float64{} + boundary := 0.0 + increment := 0.002 + // 2ms buckets for first 100ms, so we can have higher resolution for uploads and downloads in the 100 KiB range + for i := 0; i < 50; i++ { + boundaries = append(boundaries, boundary) + // increment by 2ms + boundary += increment + } + // For the remaining buckets do 10 10ms, 10 20ms, and so on, up until 5 minutes + for i := 0; i < 150 && boundary < 300; i++ { + boundaries = append(boundaries, boundary) + if i != 0 && 
i%10 == 0 { + increment *= 2 + } + boundary += increment + } + return boundaries } -func (e *exporterLogSuppressor) Aggregation(k metric.InstrumentKind) metric.Aggregation { - return e.exporter.Aggregation(k) +func sizeHistogramBoundaries() []float64 { + kb := 1024.0 + mb := 1024.0 * kb + gb := 1024.0 * mb + boundaries := []float64{} + boundary := 0.0 + increment := 128 * kb + // 128 KiB increments up to 4MiB, then exponential growth + for len(boundaries) < 200 && boundary <= 16*gb { + boundaries = append(boundaries, boundary) + boundary += increment + if boundary >= 4*mb { + increment *= 2 + } + } + return boundaries } -func (e *exporterLogSuppressor) ForceFlush(ctx context.Context) error { - return e.exporter.ForceFlush(ctx) +func createHistogramView(name string, boundaries []float64) metric.View { + return metric.NewView(metric.Instrument{ + Name: name, + Kind: metric.InstrumentKindHistogram, + }, metric.Stream{ + Name: name, + Aggregation: metric.AggregationExplicitBucketHistogram{Boundaries: boundaries}, + }) } -func (e *exporterLogSuppressor) Shutdown(ctx context.Context) error { - return e.exporter.Shutdown(ctx) +func metricFormatter(m metricdata.Metrics) string { + return metricPrefix + strings.ReplaceAll(string(m.Name), ".", "/") } diff --git a/storage/grpc_metrics_test.go b/storage/grpc_metrics_test.go index 23b3cf981e1c..44d5ed89bd03 100644 --- a/storage/grpc_metrics_test.go +++ b/storage/grpc_metrics_test.go @@ -34,7 +34,7 @@ func TestMetricFormatter(t *testing.T) { } } -func TestNewPreparedResource(t *testing.T) { +func TestStorageMonitoredResource(t *testing.T) { ctx := context.Background() for _, test := range []struct { desc string @@ -52,16 +52,22 @@ func TestNewPreparedResource(t *testing.T) { }, attribute.KeyValue{ Key: "host_id", Value: attribute.StringValue("unknown"), + }, attribute.KeyValue{ + Key: "project_id", + Value: attribute.StringValue("project-id"), + }, attribute.KeyValue{ + Key: "api", + Value: attribute.StringValue("grpc"), }), }, 
{ - desc: "use detected values when GCP attributes are detected", + desc: "use detected values when GCE attributes are detected", detectedAttributes: []attribute.KeyValue{ - {Key: "location", + {Key: "cloud.region", Value: attribute.StringValue("us-central1")}, - {Key: "cloud_platform", - Value: attribute.StringValue("gcp")}, - {Key: "host_id", + {Key: "cloud.platform", + Value: attribute.StringValue("gce")}, + {Key: "host.id", Value: attribute.StringValue("gce-instance-id")}, }, wantAttributes: attribute.NewSet(attribute.KeyValue{ @@ -69,48 +75,60 @@ func TestNewPreparedResource(t *testing.T) { Value: attribute.StringValue("us-central1"), }, attribute.KeyValue{ Key: "cloud_platform", - Value: attribute.StringValue("gcp"), + Value: attribute.StringValue("gce"), }, attribute.KeyValue{ Key: "host_id", Value: attribute.StringValue("gce-instance-id"), + }, attribute.KeyValue{ + Key: "project_id", + Value: attribute.StringValue("project-id"), + }, attribute.KeyValue{ + Key: "api", + Value: attribute.StringValue("grpc"), }), - }, { - desc: "use default when value is empty string", + }, + { + desc: "use detected values when FAAS attributes are detected", detectedAttributes: []attribute.KeyValue{ - {Key: "location", + {Key: "cloud.region", Value: attribute.StringValue("us-central1")}, - {Key: "cloud_platform", - Value: attribute.StringValue("")}, - {Key: "host_id", - Value: attribute.StringValue("")}, + {Key: "cloud.platform", + Value: attribute.StringValue("cloud-run")}, + {Key: "faas.id", + Value: attribute.StringValue("run-instance-id")}, }, wantAttributes: attribute.NewSet(attribute.KeyValue{ Key: "location", Value: attribute.StringValue("us-central1"), }, attribute.KeyValue{ Key: "cloud_platform", - Value: attribute.StringValue("unknown"), + Value: attribute.StringValue("cloud-run"), }, attribute.KeyValue{ Key: "host_id", - Value: attribute.StringValue("unknown"), + Value: attribute.StringValue("run-instance-id"), + }, attribute.KeyValue{ + Key: "project_id", + Value: 
attribute.StringValue("project-id"), + }, attribute.KeyValue{ + Key: "api", + Value: attribute.StringValue("grpc"), }), }, } { t.Run(test.desc, func(t *testing.T) { - resourceOptions := []resource.Option{resource.WithAttributes(test.detectedAttributes...)} - result, err := newPreparedResource(ctx, "project", resourceOptions) + smr, err := newStorageMonitoredResource(ctx, "project-id", "grpc", resource.WithAttributes(test.detectedAttributes...)) if err != nil { - t.Errorf("newPreparedResource: %v", err) + t.Errorf("newStorageMonitoredResource: %v", err) } - resultSet := result.resource.Set() + resultSet := smr.resource.Set() for _, want := range test.wantAttributes.ToSlice() { got, exists := resultSet.Value(want.Key) if !exists { - t.Errorf("newPreparedResource: %v not set", want.Key) + t.Errorf("resultSet[%v] not set", want.Key) continue } if got != want.Value { - t.Errorf("newPreparedResource: want[%v] = %v, got: %v", want.Key, want.Value, got) + t.Errorf("want[%v] = %v, got: %v", want.Key, want.Value.AsString(), got.AsString()) continue } } @@ -118,9 +136,56 @@ func TestNewPreparedResource(t *testing.T) { } } +func TestNewGRPCMetricContext(t *testing.T) { + ctx := context.Background() + mr := metric.NewManualReader() + attrs := []attribute.KeyValue{ + {Key: "cloud.region", + Value: attribute.StringValue("us-central1")}, + {Key: "cloud.platform", + Value: attribute.StringValue("gcp")}, + {Key: "host.id", + Value: attribute.StringValue("gce-instance-id")}, + } + cfg := metricsConfig{ + project: "project-id", + manualReader: mr, + disableExporter: true, // disable since this is a unit test + resourceOpts: []resource.Option{resource.WithAttributes(attrs...)}, + } + mc, err := newGRPCMetricContext(ctx, cfg) + if err != nil { + t.Errorf("newGRPCMetricContext: %v", err) + } + defer mc.close() + rm := metricdata.ResourceMetrics{} + if err := mr.Collect(ctx, &rm); err != nil { + t.Errorf("ManualReader.Collect: %v", err) + } + monitoredResourceWant := map[string]string{ + 
"gcp.resource_type": "storage.googleapis.com/Client", + "api": "grpc", + "cloud_platform": "gcp", + "host_id": "gce-instance-id", + "location": "us-central1", + "project_id": "project-id", + "instance_id": "ignore", + } + for _, attr := range rm.Resource.Attributes() { + want := monitoredResourceWant[string(attr.Key)] + if want == "ignore" { + continue + } + got := attr.Value.AsString() + if want != got { + t.Errorf("got: %v want: %v", got, want) + } + } +} + func TestNewExporterLogSuppressor(t *testing.T) { ctx := context.Background() - s := &exporterLogSuppressor{exporter: &failingExporter{}} + s := &exporterLogSuppressor{Exporter: &failingExporter{}} if err := s.Export(ctx, nil); err == nil { t.Errorf("exporterLogSuppressor: did not emit an error when one was expected") } @@ -129,24 +194,10 @@ func TestNewExporterLogSuppressor(t *testing.T) { } } -type failingExporter struct{} +type failingExporter struct { + metric.Exporter +} func (f *failingExporter) Export(ctx context.Context, rm *metricdata.ResourceMetrics) error { return fmt.Errorf("PermissionDenied") } - -func (f *failingExporter) Temporality(m metric.InstrumentKind) metricdata.Temporality { - return metricdata.CumulativeTemporality -} - -func (f *failingExporter) Aggregation(ik metric.InstrumentKind) metric.Aggregation { - return metric.AggregationDefault{} -} - -func (f *failingExporter) ForceFlush(ctx context.Context) error { - return nil -} - -func (f *failingExporter) Shutdown(ctx context.Context) error { - return nil -} diff --git a/storage/integration_test.go b/storage/integration_test.go index 0a19c59e4815..7b6e5ae465a6 100644 --- a/storage/integration_test.go +++ b/storage/integration_test.go @@ -55,6 +55,8 @@ import ( "github.com/googleapis/gax-go/v2/apierror" "go.opentelemetry.io/contrib/detectors/gcp" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/resource" sdktrace 
"go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/sdk/trace/tracetest" @@ -416,6 +418,108 @@ func TestIntegration_DoNotDetectDirectConnectivityWhenDisabled(t *testing.T) { }, internaloption.EnableDirectPath(false)) } +func TestIntegration_MetricsEnablement(t *testing.T) { + ctx := skipHTTP("grpc only test") + mr := metric.NewManualReader() + multiTransportTest(ctx, t, func(t *testing.T, ctx context.Context, bucket string, prefix string, client *Client) { + it := client.Bucket(bucket).Objects(ctx, nil) + _, err := it.Next() + if err != iterator.Done { + t.Errorf("Objects.Next: expected iterator.Done got %v", err) + } + rm := metricdata.ResourceMetrics{} + if err := mr.Collect(ctx, &rm); err != nil { + t.Errorf("ManualReader.Collect: %v", err) + } + metricCheck := map[string]bool{ + "grpc.client.attempt.started": false, + "grpc.client.attempt.duration": false, + "grpc.client.attempt.sent_total_compressed_message_size": false, + "grpc.client.attempt.rcvd_total_compressed_message_size": false, + "grpc.client.call.duration": false, + } + for _, sm := range rm.ScopeMetrics { + for _, m := range sm.Metrics { + metricCheck[m.Name] = true + } + } + for k, v := range metricCheck { + if !v { + t.Errorf("metric %v not found", k) + } + } + }, withTestMetricReader(mr)) +} + +func TestIntegration_MetricsEnablementInGCE(t *testing.T) { + t.Skip("flaky test for rls metrics; other metrics are tested TestIntegration_MetricsEnablement") + ctx := skipHTTP("grpc only test") + mr := metric.NewManualReader() + multiTransportTest(ctx, t, func(t *testing.T, ctx context.Context, bucket string, prefix string, client *Client) { + detectedAttrs, err := resource.New(ctx, resource.WithDetectors(gcp.NewDetector())) + if err != nil { + t.Fatalf("resource.New: %v", err) + } + attrs := detectedAttrs.Set() + if v, exists := attrs.Value("cloud.platform"); !exists || v.AsString() != "gcp_compute_engine" { + t.Skip("only testable in a GCE instance") + } + instance, exists := 
attrs.Value("host.id") + if !exists { + t.Skip("GCE instance id not detected") + } + if v, exists := attrs.Value("cloud.region"); !exists || !strings.Contains(strings.ToLower(v.AsString()), "us-west1") { + t.Skip("inside a GCE instance but region is not us-west1") + } + it := client.Buckets(ctx, testutil.ProjID()) + _, _ = it.Next() + rm := metricdata.ResourceMetrics{} + if err := mr.Collect(ctx, &rm); err != nil { + t.Errorf("ManualReader.Collect: %v", err) + } + + monitoredResourceWant := map[string]string{ + "gcp.resource_type": "storage.googleapis.com/Client", + "api": "grpc", + "cloud_platform": "gcp_compute_engine", + "host_id": instance.AsString(), + "location": "us-west1", + "project_id": testutil.ProjID(), + "instance_id": "ignore", // generated UUID + } + for _, attr := range rm.Resource.Attributes() { + want := monitoredResourceWant[string(attr.Key)] + if want == "ignore" { + continue + } + got := attr.Value.AsString() + if want != got { + t.Errorf("got: %v want: %v", got, want) + } + } + metricCheck := map[string]bool{ + "grpc.client.attempt.started": false, + "grpc.client.attempt.duration": false, + "grpc.client.attempt.sent_total_compressed_message_size": false, + "grpc.client.attempt.rcvd_total_compressed_message_size": false, + "grpc.client.call.duration": false, + "grpc.lb.rls.cache_entries": false, + "grpc.lb.rls.cache_size": false, + "grpc.lb.rls.default_target_picks": false, + } + for _, sm := range rm.ScopeMetrics { + for _, m := range sm.Metrics { + metricCheck[m.Name] = true + } + } + for k, v := range metricCheck { + if !v { + t.Errorf("metric %v not found", k) + } + } + }, withTestMetricReader(mr)) +} + func TestIntegration_BucketCreateDelete(t *testing.T) { ctx := skipJSONReads(context.Background(), "no reads in test") multiTransportTest(ctx, t, func(t *testing.T, ctx context.Context, _ string, prefix string, client *Client) { diff --git a/storage/option.go b/storage/option.go index 3b0cf9e71831..a7474842b78a 100644 --- a/storage/option.go 
+++ b/storage/option.go @@ -79,6 +79,7 @@ type storageConfig struct { disableClientMetrics bool metricExporter *metric.Exporter metricInterval time.Duration + manualReader *metric.ManualReader readStallTimeoutConfig *experimental.ReadStallTimeoutConfig } @@ -192,6 +193,20 @@ func (w *withMetricExporterConfig) ApplyStorageOpt(c *storageConfig) { c.metricExporter = w.metricExporter } +type withTestMetricReaderConfig struct { + internaloption.EmbeddableAdapter + // reader override + metricReader *metric.ManualReader +} + +func withTestMetricReader(ex *metric.ManualReader) option.ClientOption { + return &withTestMetricReaderConfig{metricReader: ex} +} + +func (w *withTestMetricReaderConfig) ApplyStorageOpt(c *storageConfig) { + c.manualReader = w.metricReader +} + // WithReadStallTimeout is an option that may be passed to [NewClient]. // It enables the client to retry the stalled read request, happens as part of // storage.Reader creation. As the name suggest, timeout is adjusted dynamically diff --git a/storage/option_test.go b/storage/option_test.go index dfb30d3667cc..6bdedd3716b8 100644 --- a/storage/option_test.go +++ b/storage/option_test.go @@ -22,6 +22,7 @@ import ( "cloud.google.com/go/storage/experimental" "github.com/google/go-cmp/cmp" "go.opentelemetry.io/otel/exporters/stdout/stdoutmetric" + "go.opentelemetry.io/otel/sdk/metric" "google.golang.org/api/option" ) @@ -171,6 +172,21 @@ func TestSetCustomExporter(t *testing.T) { } } +func TestSetManualReader(t *testing.T) { + manualReader := metric.NewManualReader() + want := storageConfig{ + manualReader: manualReader, + } + var got storageConfig + opt := withTestMetricReader(manualReader) + if storageOpt, ok := opt.(storageClientOption); ok { + storageOpt.ApplyStorageOpt(&got) + } + if got.manualReader != want.manualReader { + t.Errorf("TestSetManualReader: manualReader want=%v, got=%v", want.manualReader, got.manualReader) + } +} + func TestGetDynamicReadReqInitialTimeoutSecFromEnv(t *testing.T) { 
defaultValue := 10 * time.Second