From d21429ef5ed873b38e976ae8ce8357b66db77236 Mon Sep 17 00:00:00 2001 From: Chris Cotter Date: Mon, 16 Oct 2023 15:44:17 -0400 Subject: [PATCH] chore(storage): add invocation ID to gRPC calls (#8698) This adds x-goog-gcs-idempotency-token and x-goog-api-client header keys to all gRPC calls. It also refactors how this is done for both HTTP and gRPC to take advantage of passing in headers via the callctx package rather than by mutating requests directly. --- storage/grpc_client.go | 140 ++++++++++++++++++----------------- storage/hmac.go | 5 +- storage/http_client.go | 164 ++++++++++++++++++++--------------------- storage/invoke.go | 46 +++++------- storage/invoke_test.go | 18 +++-- storage/reader.go | 10 --- 6 files changed, 182 insertions(+), 201 deletions(-) diff --git a/storage/grpc_client.go b/storage/grpc_client.go index daaaf71420d0..d8f70b77a465 100644 --- a/storage/grpc_client.go +++ b/storage/grpc_client.go @@ -141,11 +141,11 @@ func (c *grpcStorageClient) GetServiceAccount(ctx context.Context, project strin Project: toProjectResource(project), } var resp *storagepb.ServiceAccount - err := run(ctx, func() error { + err := run(ctx, func(ctx context.Context) error { var err error resp, err = c.raw.GetServiceAccount(ctx, req, s.gax...) return err - }, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)) + }, s.retry, s.idempotent) if err != nil { return "", err } @@ -173,13 +173,13 @@ func (c *grpcStorageClient) CreateBucket(ctx context.Context, project, bucket st } var battrs *BucketAttrs - err := run(ctx, func() error { + err := run(ctx, func(ctx context.Context) error { res, err := c.raw.CreateBucket(ctx, req, s.gax...) battrs = newBucketFromProto(res) return err - }, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)) + }, s.retry, s.idempotent) return battrs, err } @@ -193,26 +193,26 @@ func (c *grpcStorageClient) ListBuckets(ctx context.Context, project string, opt var gitr *gapic.BucketIterator fetch := func(pageSize int, pageToken string) (token string, err error) { - // Initialize GAPIC-based iterator when pageToken is empty, which - // indicates that this fetch call is attempting to get the first page. - // - // Note: Initializing the GAPIC-based iterator lazily is necessary to - // capture the BucketIterator.Prefix set by the user *after* the - // BucketIterator is returned to them from the veneer. - if pageToken == "" { - req := &storagepb.ListBucketsRequest{ - Parent: toProjectResource(it.projectID), - Prefix: it.Prefix, - } - gitr = c.raw.ListBuckets(it.ctx, req, s.gax...) - } var buckets []*storagepb.Bucket var next string - err = run(it.ctx, func() error { + err = run(it.ctx, func(ctx context.Context) error { + // Initialize GAPIC-based iterator when pageToken is empty, which + // indicates that this fetch call is attempting to get the first page. + // + // Note: Initializing the GAPIC-based iterator lazily is necessary to + // capture the BucketIterator.Prefix set by the user *after* the + // BucketIterator is returned to them from the veneer. + if pageToken == "" { + req := &storagepb.ListBucketsRequest{ + Parent: toProjectResource(it.projectID), + Prefix: it.Prefix, + } + gitr = c.raw.ListBuckets(ctx, req, s.gax...) + } buckets, next, err = gitr.InternalFetch(pageSize, pageToken) return err - }, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)) + }, s.retry, s.idempotent) if err != nil { return "", err } @@ -246,9 +246,9 @@ func (c *grpcStorageClient) DeleteBucket(ctx context.Context, bucket string, con ctx = setUserProjectMetadata(ctx, s.userProject) } - return run(ctx, func() error { + return run(ctx, func(ctx context.Context) error { return c.raw.DeleteBucket(ctx, req, s.gax...) - }, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)) + }, s.retry, s.idempotent) } func (c *grpcStorageClient) GetBucket(ctx context.Context, bucket string, conds *BucketConditions, opts ...storageOption) (*BucketAttrs, error) { @@ -265,13 +265,13 @@ func (c *grpcStorageClient) GetBucket(ctx context.Context, bucket string, conds } var battrs *BucketAttrs - err := run(ctx, func() error { + err := run(ctx, func(ctx context.Context) error { res, err := c.raw.GetBucket(ctx, req, s.gax...) battrs = newBucketFromProto(res) return err - }, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)) + }, s.retry, s.idempotent) if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound { return nil, ErrBucketNotExist @@ -369,11 +369,11 @@ func (c *grpcStorageClient) UpdateBucket(ctx context.Context, bucket string, uat req.UpdateMask = fieldMask var battrs *BucketAttrs - err := run(ctx, func() error { + err := run(ctx, func(ctx context.Context) error { res, err := c.raw.UpdateBucket(ctx, req, s.gax...) battrs = newBucketFromProto(res) return err - }, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)) + }, s.retry, s.idempotent) return battrs, err } @@ -386,10 +386,10 @@ func (c *grpcStorageClient) LockBucketRetentionPolicy(ctx context.Context, bucke return err } - return run(ctx, func() error { + return run(ctx, func(ctx context.Context) error { _, err := c.raw.LockBucketRetentionPolicy(ctx, req, s.gax...) return err - }, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)) + }, s.retry, s.idempotent) } func (c *grpcStorageClient) ListObjects(ctx context.Context, bucket string, q *Query, opts ...storageOption) *ObjectIterator { @@ -413,7 +413,6 @@ func (c *grpcStorageClient) ListObjects(ctx context.Context, bucket string, q *Q if s.userProject != "" { ctx = setUserProjectMetadata(ctx, s.userProject) } - gitr := c.raw.ListObjects(it.ctx, req, s.gax...) fetch := func(pageSize int, pageToken string) (token string, err error) { // MatchGlob not yet supported for gRPC. // TODO: add support when b/287306063 resolved. @@ -421,10 +420,13 @@ func (c *grpcStorageClient) ListObjects(ctx context.Context, bucket string, q *Q return "", status.Errorf(codes.Unimplemented, "MatchGlob is not supported for gRPC") } var objects []*storagepb.Object - err = run(it.ctx, func() error { + var gitr *gapic.ObjectIterator + err = run(it.ctx, func(ctx context.Context) error { + gitr = c.raw.ListObjects(ctx, req, s.gax...) + it.ctx = ctx objects, token, err = gitr.InternalFetch(pageSize, pageToken) return err - }, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)) + }, s.retry, s.idempotent) if err != nil { if st, ok := status.FromError(err); ok && st.Code() == codes.NotFound { err = ErrBucketNotExist @@ -467,9 +469,9 @@ func (c *grpcStorageClient) DeleteObject(ctx context.Context, bucket, object str if s.userProject != "" { ctx = setUserProjectMetadata(ctx, s.userProject) } - err := run(ctx, func() error { + err := run(ctx, func(ctx context.Context) error { return c.raw.DeleteObject(ctx, req, s.gax...) - }, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)) + }, s.retry, s.idempotent) if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound { return ErrObjectNotExist } @@ -495,12 +497,12 @@ func (c *grpcStorageClient) GetObject(ctx context.Context, bucket, object string } var attrs *ObjectAttrs - err := run(ctx, func() error { + err := run(ctx, func(ctx context.Context) error { res, err := c.raw.GetObject(ctx, req, s.gax...) attrs = newObjectFromProto(res) return err - }, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)) + }, s.retry, s.idempotent) if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound { return nil, ErrObjectNotExist @@ -577,11 +579,11 @@ func (c *grpcStorageClient) UpdateObject(ctx context.Context, bucket, object str req.UpdateMask = fieldMask var attrs *ObjectAttrs - err := run(ctx, func() error { + err := run(ctx, func(ctx context.Context) error { res, err := c.raw.UpdateObject(ctx, req, s.gax...) attrs = newObjectFromProto(res) return err - }, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)) + }, s.retry, s.idempotent) if e, ok := status.FromError(err); ok && e.Code() == codes.NotFound { return nil, ErrObjectNotExist } @@ -820,10 +822,10 @@ func (c *grpcStorageClient) ComposeObject(ctx context.Context, req *composeObjec var obj *storagepb.Object var err error - if err := run(ctx, func() error { + if err := run(ctx, func(ctx context.Context) error { obj, err = c.raw.ComposeObject(ctx, rawReq, s.gax...) return err - }, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)); err != nil { + }, s.retry, s.idempotent); err != nil { return nil, err } @@ -870,9 +872,9 @@ func (c *grpcStorageClient) RewriteObject(ctx context.Context, req *rewriteObjec var res *storagepb.RewriteResponse var err error - retryCall := func() error { res, err = c.raw.RewriteObject(ctx, call, s.gax...); return err } + retryCall := func(ctx context.Context) error { res, err = c.raw.RewriteObject(ctx, call, s.gax...); return err } - if err := run(ctx, retryCall, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)); err != nil { + if err := run(ctx, retryCall, s.retry, s.idempotent); err != nil { return nil, err } @@ -936,7 +938,7 @@ func (c *grpcStorageClient) NewRangeReader(ctx context.Context, params *newRange var msg *storagepb.ReadObjectResponse var err error - err = run(cc, func() error { + err = run(cc, func(ctx context.Context) error { stream, err = c.raw.ReadObject(cc, req, s.gax...) if err != nil { return err @@ -950,7 +952,7 @@ func (c *grpcStorageClient) NewRangeReader(ctx context.Context, params *newRange } return err - }, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)) + }, s.retry, s.idempotent) if err != nil { // Close the stream context we just created to ensure we don't leak // resources. @@ -1112,11 +1114,11 @@ func (c *grpcStorageClient) GetIamPolicy(ctx context.Context, resource string, v }, } var rp *iampb.Policy - err := run(ctx, func() error { + err := run(ctx, func(ctx context.Context) error { var err error rp, err = c.raw.GetIamPolicy(ctx, req, s.gax...) return err - }, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)) + }, s.retry, s.idempotent) return rp, err } @@ -1130,10 +1132,10 @@ func (c *grpcStorageClient) SetIamPolicy(ctx context.Context, resource string, p Policy: policy, } - return run(ctx, func() error { + return run(ctx, func(ctx context.Context) error { _, err := c.raw.SetIamPolicy(ctx, req, s.gax...) return err - }, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)) + }, s.retry, s.idempotent) } func (c *grpcStorageClient) TestIamPermissions(ctx context.Context, resource string, permissions []string, opts ...storageOption) ([]string, error) { @@ -1144,11 +1146,11 @@ func (c *grpcStorageClient) TestIamPermissions(ctx context.Context, resource str Permissions: permissions, } var res *iampb.TestIamPermissionsResponse - err := run(ctx, func() error { + err := run(ctx, func(ctx context.Context) error { var err error res, err = c.raw.TestIamPermissions(ctx, req, s.gax...) return err - }, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)) + }, s.retry, s.idempotent) if err != nil { return nil, err } @@ -1167,11 +1169,11 @@ func (c *grpcStorageClient) GetHMACKey(ctx context.Context, project, accessID st ctx = setUserProjectMetadata(ctx, s.userProject) } var metadata *storagepb.HmacKeyMetadata - err := run(ctx, func() error { + err := run(ctx, func(ctx context.Context) error { var err error metadata, err = c.raw.GetHmacKey(ctx, req, s.gax...) return err - }, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)) + }, s.retry, s.idempotent) if err != nil { return nil, err } @@ -1193,13 +1195,13 @@ func (c *grpcStorageClient) ListHMACKeys(ctx context.Context, project, serviceAc projectID: project, retry: s.retry, } - gitr := c.raw.ListHmacKeys(it.ctx, req, s.gax...) fetch := func(pageSize int, pageToken string) (token string, err error) { var hmacKeys []*storagepb.HmacKeyMetadata - err = run(it.ctx, func() error { + err = run(it.ctx, func(ctx context.Context) error { + gitr := c.raw.ListHmacKeys(ctx, req, s.gax...) hmacKeys, token, err = gitr.InternalFetch(pageSize, pageToken) return err - }, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)) + }, s.retry, s.idempotent) if err != nil { return "", err } @@ -1246,11 +1248,11 @@ func (c *grpcStorageClient) UpdateHMACKey(ctx context.Context, project, serviceA ctx = setUserProjectMetadata(ctx, s.userProject) } var metadata *storagepb.HmacKeyMetadata - err := run(ctx, func() error { + err := run(ctx, func(ctx context.Context) error { var err error metadata, err = c.raw.UpdateHmacKey(ctx, req, s.gax...) return err - }, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)) + }, s.retry, s.idempotent) if err != nil { return nil, err } @@ -1267,11 +1269,11 @@ func (c *grpcStorageClient) CreateHMACKey(ctx context.Context, project, serviceA ctx = setUserProjectMetadata(ctx, s.userProject) } var res *storagepb.CreateHmacKeyResponse - err := run(ctx, func() error { + err := run(ctx, func(ctx context.Context) error { var err error res, err = c.raw.CreateHmacKey(ctx, req, s.gax...) return err - }, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)) + }, s.retry, s.idempotent) if err != nil { return nil, err } @@ -1290,9 +1292,9 @@ func (c *grpcStorageClient) DeleteHMACKey(ctx context.Context, project string, a if s.userProject != "" { ctx = setUserProjectMetadata(ctx, s.userProject) } - return run(ctx, func() error { + return run(ctx, func(ctx context.Context) error { return c.raw.DeleteHmacKey(ctx, req, s.gax...) - }, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)) + }, s.retry, s.idempotent) } // Notification methods. @@ -1309,7 +1311,7 @@ func (c *grpcStorageClient) ListNotifications(ctx context.Context, bucket string Parent: bucketResourceName(globalProjectAlias, bucket), } var notifications []*storagepb.NotificationConfig - err = run(ctx, func() error { + err = run(ctx, func(ctx context.Context) error { gitr := c.raw.ListNotificationConfigs(ctx, req, s.gax...) for { // PageSize is not set and fallbacks to the API default pageSize of 100. @@ -1324,7 +1326,7 @@ func (c *grpcStorageClient) ListNotifications(ctx context.Context, bucket string } req.PageToken = nextPageToken } - }, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)) + }, s.retry, s.idempotent) if err != nil { return nil, err } @@ -1342,11 +1344,11 @@ func (c *grpcStorageClient) CreateNotification(ctx context.Context, bucket strin NotificationConfig: toProtoNotification(n), } var pbn *storagepb.NotificationConfig - err = run(ctx, func() error { + err = run(ctx, func(ctx context.Context) error { var err error pbn, err = c.raw.CreateNotificationConfig(ctx, req, s.gax...) return err - }, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)) + }, s.retry, s.idempotent) if err != nil { return nil, err } @@ -1359,9 +1361,9 @@ func (c *grpcStorageClient) DeleteNotification(ctx context.Context, bucket strin s := callSettings(c.settings, opts...) req := &storagepb.DeleteNotificationConfigRequest{Name: id} - return run(ctx, func() error { + return run(ctx, func(ctx context.Context) error { return c.raw.DeleteNotificationConfig(ctx, req, s.gax...) - }, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)) + }, s.retry, s.idempotent) } // setUserProjectMetadata appends a project ID to the outgoing Context metadata @@ -1560,24 +1562,24 @@ func (w *gRPCWriter) startResumableUpload() error { // the upload, but in the future, we must also support sending it // on the *last* message of the stream. req.ObjectChecksums = toProtoChecksums(w.sendCRC32C, w.attrs) - return run(w.ctx, func() error { + return run(w.ctx, func(ctx context.Context) error { upres, err := w.c.raw.StartResumableWrite(w.ctx, req) w.upid = upres.GetUploadId() return err - }, w.settings.retry, w.settings.idempotent, setRetryHeaderGRPC(w.ctx)) + }, w.settings.retry, w.settings.idempotent) } // queryProgress is a helper that queries the status of the resumable upload // associated with the given upload ID. func (w *gRPCWriter) queryProgress() (int64, error) { var persistedSize int64 - err := run(w.ctx, func() error { + err := run(w.ctx, func(ctx context.Context) error { q, err := w.c.raw.QueryWriteStatus(w.ctx, &storagepb.QueryWriteStatusRequest{ UploadId: w.upid, }) persistedSize = q.GetPersistedSize() return err - }, w.settings.retry, true, setRetryHeaderGRPC(w.ctx)) + }, w.settings.retry, true) // q.GetCommittedSize() will return 0 if q is nil. return persistedSize, err diff --git a/storage/hmac.go b/storage/hmac.go index 30b67f43770d..1b9fbe9dd202 100644 --- a/storage/hmac.go +++ b/storage/hmac.go @@ -289,12 +289,11 @@ func (it *HMACKeysIterator) fetch(pageSize int, pageToken string) (token string, call = call.MaxResults(int64(pageSize)) } - ctx := it.ctx var resp *raw.HmacKeysMetadata - err = run(it.ctx, func() error { + err = run(it.ctx, func(ctx context.Context) error { resp, err = call.Context(ctx).Do() return err - }, it.retry, true, setRetryHeaderHTTP(call)) + }, it.retry, true) if err != nil { return "", err } diff --git a/storage/http_client.go b/storage/http_client.go index c8feb03fa6d1..b62f009da1d2 100644 --- a/storage/http_client.go +++ b/storage/http_client.go @@ -148,11 +148,11 @@ func (c *httpStorageClient) GetServiceAccount(ctx context.Context, project strin s := callSettings(c.settings, opts...) call := c.raw.Projects.ServiceAccount.Get(project) var res *raw.ServiceAccount - err := run(ctx, func() error { + err := run(ctx, func(ctx context.Context) error { var err error res, err = call.Context(ctx).Do() return err - }, s.retry, s.idempotent, setRetryHeaderHTTP(call)) + }, s.retry, s.idempotent) if err != nil { return "", err } @@ -182,14 +182,14 @@ func (c *httpStorageClient) CreateBucket(ctx context.Context, project, bucket st req.PredefinedDefaultObjectAcl(attrs.PredefinedDefaultObjectACL) } var battrs *BucketAttrs - err := run(ctx, func() error { + err := run(ctx, func(ctx context.Context) error { b, err := req.Context(ctx).Do() if err != nil { return err } battrs, err = newBucket(b) return err - }, s.retry, s.idempotent, setRetryHeaderHTTP(req)) + }, s.retry, s.idempotent) return battrs, err } @@ -210,10 +210,10 @@ func (c *httpStorageClient) ListBuckets(ctx context.Context, project string, opt req.MaxResults(int64(pageSize)) } var resp *raw.Buckets - err = run(it.ctx, func() error { - resp, err = req.Context(it.ctx).Do() + err = run(it.ctx, func(ctx context.Context) error { + resp, err = req.Context(ctx).Do() return err - }, s.retry, s.idempotent, setRetryHeaderHTTP(req)) + }, s.retry, s.idempotent) if err != nil { return "", err } @@ -248,7 +248,7 @@ func (c *httpStorageClient) DeleteBucket(ctx context.Context, bucket string, con req.UserProject(s.userProject) } - return run(ctx, func() error { return req.Context(ctx).Do() }, s.retry, s.idempotent, setRetryHeaderHTTP(req)) + return run(ctx, func(ctx context.Context) error { return req.Context(ctx).Do() }, s.retry, s.idempotent) } func (c *httpStorageClient) GetBucket(ctx context.Context, bucket string, conds *BucketConditions, opts ...storageOption) (*BucketAttrs, error) { @@ -264,10 +264,10 @@ func (c *httpStorageClient) GetBucket(ctx context.Context, bucket string, conds } var resp *raw.Bucket - err = run(ctx, func() error { + err = run(ctx, func(ctx context.Context) error { resp, err = req.Context(ctx).Do() return err - }, s.retry, s.idempotent, setRetryHeaderHTTP(req)) + }, s.retry, s.idempotent) var e *googleapi.Error if ok := errors.As(err, &e); ok && e.Code == http.StatusNotFound { @@ -298,10 +298,10 @@ func (c *httpStorageClient) UpdateBucket(ctx context.Context, bucket string, uat } var rawBucket *raw.Bucket - err = run(ctx, func() error { + err = run(ctx, func(ctx context.Context) error { rawBucket, err = req.Context(ctx).Do() return err - }, s.retry, s.idempotent, setRetryHeaderHTTP(req)) + }, s.retry, s.idempotent) if err != nil { return nil, err } @@ -317,10 +317,10 @@ func (c *httpStorageClient) LockBucketRetentionPolicy(ctx context.Context, bucke } req := c.raw.Buckets.LockRetentionPolicy(bucket, metageneration) - return run(ctx, func() error { + return run(ctx, func(ctx context.Context) error { _, err := req.Context(ctx).Do() return err - }, s.retry, s.idempotent, setRetryHeaderHTTP(req)) + }, s.retry, s.idempotent) } func (c *httpStorageClient) ListObjects(ctx context.Context, bucket string, q *Query, opts ...storageOption) *ObjectIterator { s := callSettings(c.settings, opts...) @@ -357,10 +357,10 @@ func (c *httpStorageClient) ListObjects(ctx context.Context, bucket string, q *Q } var resp *raw.Objects var err error - err = run(it.ctx, func() error { - resp, err = req.Context(it.ctx).Do() + err = run(it.ctx, func(ctx context.Context) error { + resp, err = req.Context(ctx).Do() return err - }, s.retry, s.idempotent, setRetryHeaderHTTP(req)) + }, s.retry, s.idempotent) if err != nil { var e *googleapi.Error if ok := errors.As(err, &e); ok && e.Code == http.StatusNotFound { @@ -395,7 +395,7 @@ func (c *httpStorageClient) DeleteObject(ctx context.Context, bucket, object str if s.userProject != "" { req.UserProject(s.userProject) } - err := run(ctx, func() error { return req.Context(ctx).Do() }, s.retry, s.idempotent, setRetryHeaderHTTP(req)) + err := run(ctx, func(ctx context.Context) error { return req.Context(ctx).Do() }, s.retry, s.idempotent) var e *googleapi.Error if ok := errors.As(err, &e); ok && e.Code == http.StatusNotFound { return ErrObjectNotExist @@ -417,10 +417,10 @@ func (c *httpStorageClient) GetObject(ctx context.Context, bucket, object string } var obj *raw.Object var err error - err = run(ctx, func() error { + err = run(ctx, func(ctx context.Context) error { obj, err = req.Context(ctx).Do() return err - }, s.retry, s.idempotent, setRetryHeaderHTTP(req)) + }, s.retry, s.idempotent) var e *googleapi.Error if ok := errors.As(err, &e); ok && e.Code == http.StatusNotFound { return nil, ErrObjectNotExist @@ -499,7 +499,7 @@ func (c *httpStorageClient) UpdateObject(ctx context.Context, bucket, object str rawObj := attrs.toRawObject(bucket) rawObj.ForceSendFields = forceSendFields rawObj.NullFields = nullFields - call := c.raw.Objects.Patch(bucket, object, rawObj).Projection("full").Context(ctx) + call := c.raw.Objects.Patch(bucket, object, rawObj).Projection("full") if err := applyConds("Update", gen, conds, call); err != nil { return nil, err } @@ -514,7 +514,7 @@ func (c *httpStorageClient) UpdateObject(ctx context.Context, bucket, object str } var obj *raw.Object var err error - err = run(ctx, func() error { obj, err = call.Do(); return err }, s.retry, s.idempotent, setRetryHeaderHTTP(call)) + err = run(ctx, func(ctx context.Context) error { obj, err = call.Context(ctx).Do(); return err }, s.retry, s.idempotent) var e *googleapi.Error if errors.As(err, &e) && e.Code == http.StatusNotFound { return nil, ErrObjectNotExist @@ -531,7 +531,7 @@ func (c *httpStorageClient) DeleteDefaultObjectACL(ctx context.Context, bucket s s := callSettings(c.settings, opts...) req := c.raw.DefaultObjectAccessControls.Delete(bucket, string(entity)) configureACLCall(ctx, s.userProject, req) - return run(ctx, func() error { return req.Context(ctx).Do() }, s.retry, s.idempotent, setRetryHeaderHTTP(req)) + return run(ctx, func(ctx context.Context) error { return req.Context(ctx).Do() }, s.retry, s.idempotent) } func (c *httpStorageClient) ListDefaultObjectACLs(ctx context.Context, bucket string, opts ...storageOption) ([]ACLRule, error) { @@ -540,10 +540,10 @@ func (c *httpStorageClient) ListDefaultObjectACLs(ctx context.Context, bucket st var err error req := c.raw.DefaultObjectAccessControls.List(bucket) configureACLCall(ctx, s.userProject, req) - err = run(ctx, func() error { - acls, err = req.Do() + err = run(ctx, func(ctx context.Context) error { + acls, err = req.Context(ctx).Do() return err - }, s.retry, true, setRetryHeaderHTTP(req)) + }, s.retry, true) if err != nil { return nil, err } @@ -560,14 +560,13 @@ func (c *httpStorageClient) UpdateDefaultObjectACL(ctx context.Context, bucket s Entity: string(entity), Role: string(role), } - var req setRequest var err error - req = c.raw.DefaultObjectAccessControls.Update(bucket, string(entity), acl) + req := c.raw.DefaultObjectAccessControls.Update(bucket, string(entity), acl) configureACLCall(ctx, s.userProject, req) - return run(ctx, func() error { - _, err = req.Do() + return run(ctx, func(ctx context.Context) error { + _, err = req.Context(ctx).Do() return err - }, s.retry, s.idempotent, setRetryHeaderHTTP(req)) + }, s.retry, s.idempotent) } // Bucket ACL methods. @@ -576,7 +575,7 @@ func (c *httpStorageClient) DeleteBucketACL(ctx context.Context, bucket string, s := callSettings(c.settings, opts...) req := c.raw.BucketAccessControls.Delete(bucket, string(entity)) configureACLCall(ctx, s.userProject, req) - return run(ctx, func() error { return req.Context(ctx).Do() }, s.retry, s.idempotent, setRetryHeaderHTTP(req)) + return run(ctx, func(ctx context.Context) error { return req.Context(ctx).Do() }, s.retry, s.idempotent) } func (c *httpStorageClient) ListBucketACLs(ctx context.Context, bucket string, opts ...storageOption) ([]ACLRule, error) { @@ -585,10 +584,10 @@ func (c *httpStorageClient) ListBucketACLs(ctx context.Context, bucket string, o var err error req := c.raw.BucketAccessControls.List(bucket) configureACLCall(ctx, s.userProject, req) - err = run(ctx, func() error { - acls, err = req.Do() + err = run(ctx, func(ctx context.Context) error { + acls, err = req.Context(ctx).Do() return err - }, s.retry, true, setRetryHeaderHTTP(req)) + }, s.retry, true) if err != nil { return nil, err } @@ -605,10 +604,10 @@ func (c *httpStorageClient) UpdateBucketACL(ctx context.Context, bucket string, req := c.raw.BucketAccessControls.Update(bucket, string(entity), acl) configureACLCall(ctx, s.userProject, req) var err error - return run(ctx, func() error { - _, err = req.Do() + return run(ctx, func(ctx context.Context) error { + _, err = req.Context(ctx).Do() return err - }, s.retry, s.idempotent, setRetryHeaderHTTP(req)) + }, s.retry, s.idempotent) } // configureACLCall sets the context, user project and headers on the apiary library call. @@ -628,7 +627,7 @@ func (c *httpStorageClient) DeleteObjectACL(ctx context.Context, bucket, object s := callSettings(c.settings, opts...) req := c.raw.ObjectAccessControls.Delete(bucket, object, string(entity)) configureACLCall(ctx, s.userProject, req) - return run(ctx, func() error { return req.Context(ctx).Do() }, s.retry, s.idempotent, setRetryHeaderHTTP(req)) + return run(ctx, func(ctx context.Context) error { return req.Context(ctx).Do() }, s.retry, s.idempotent) } // ListObjectACLs retrieves object ACL entries. By default, it operates on the latest generation of this object. @@ -639,10 +638,10 @@ func (c *httpStorageClient) ListObjectACLs(ctx context.Context, bucket, object s var err error req := c.raw.ObjectAccessControls.List(bucket, object) configureACLCall(ctx, s.userProject, req) - err = run(ctx, func() error { - acls, err = req.Do() + err = run(ctx, func(ctx context.Context) error { + acls, err = req.Context(ctx).Do() return err - }, s.retry, s.idempotent, setRetryHeaderHTTP(req)) + }, s.retry, s.idempotent) if err != nil { return nil, err } @@ -661,14 +660,13 @@ func (c *httpStorageClient) UpdateObjectACL(ctx context.Context, bucket, object Entity: string(entity), Role: string(role), } - var req setRequest var err error - req = c.raw.ObjectAccessControls.Update(bucket, object, string(entity), acl) + req := c.raw.ObjectAccessControls.Update(bucket, object, string(entity), acl) configureACLCall(ctx, s.userProject, req) - return run(ctx, func() error { - _, err = req.Do() + return run(ctx, func(ctx context.Context) error { + _, err = req.Context(ctx).Do() return err - }, s.retry, s.idempotent, setRetryHeaderHTTP(req)) + }, s.retry, s.idempotent) } // Media operations. @@ -692,7 +690,7 @@ func (c *httpStorageClient) ComposeObject(ctx context.Context, req *composeObjec rawReq.SourceObjects = append(rawReq.SourceObjects, srcObj) } - call := c.raw.Objects.Compose(req.dstBucket, req.dstObject.name, rawReq).Context(ctx) + call := c.raw.Objects.Compose(req.dstBucket, req.dstObject.name, rawReq) if err := applyConds("ComposeFrom destination", defaultGen, req.dstObject.conds, call); err != nil { return nil, err } @@ -709,9 +707,9 @@ func (c *httpStorageClient) ComposeObject(ctx context.Context, req *composeObjec setClientHeader(call.Header()) var err error - retryCall := func() error { obj, err = call.Do(); return err } + retryCall := func(ctx context.Context) error { obj, err = call.Context(ctx).Do(); return err } - if err := run(ctx, retryCall, s.retry, s.idempotent, setRetryHeaderHTTP(call)); err != nil { + if err := run(ctx, retryCall, s.retry, s.idempotent); err != nil { return nil, err } return newObject(obj), nil @@ -721,7 +719,7 @@ func (c *httpStorageClient) RewriteObject(ctx context.Context, req *rewriteObjec rawObject := req.dstObject.attrs.toRawObject("") call := c.raw.Objects.Rewrite(req.srcObject.bucket, req.srcObject.name, req.dstObject.bucket, req.dstObject.name, rawObject) - call.Context(ctx).Projection("full") + call.Projection("full") if req.token != "" { call.RewriteToken(req.token) } @@ -757,9 +755,9 @@ func (c *httpStorageClient) RewriteObject(ctx context.Context, req *rewriteObjec var err error setClientHeader(call.Header()) - retryCall := func() error { res, err = call.Do(); return err } + retryCall := func(ctx context.Context) error { res, err = call.Context(ctx).Do(); return err } - if err := run(ctx, retryCall, s.retry, s.idempotent, setRetryHeaderHTTP(call)); err != nil { + if err := run(ctx, retryCall, s.retry, s.idempotent); err != nil { return nil, err } @@ -801,7 +799,6 @@ func (c *httpStorageClient) newRangeReaderXML(ctx context.Context, params *newRa if err != nil { return nil, err } - req = req.WithContext(ctx) if s.userProject != "" { req.Header.Set("X-Goog-User-Project", s.userProject) @@ -821,7 +818,7 @@ func (c *httpStorageClient) newRangeReaderXML(ctx context.Context, params *newRa } reopen := readerReopen(ctx, req.Header, params, s, - func() (*http.Response, error) { return c.hc.Do(req) }, + func(ctx context.Context) (*http.Response, error) { return c.hc.Do(req.WithContext(ctx)) }, func() error { return setConditionsHeaders(req.Header, params.conds) }, func() { req.URL.RawQuery = fmt.Sprintf("generation=%d", params.gen) }) @@ -836,7 +833,6 @@ func (c *httpStorageClient) newRangeReaderJSON(ctx context.Context, params *newR call := c.raw.Objects.Get(params.bucket, params.object) setClientHeader(call.Header()) - call.Context(ctx) call.Projection("full") if s.userProject != "" { @@ -847,7 +843,7 @@ func (c *httpStorageClient) newRangeReaderJSON(ctx context.Context, params *newR return nil, err } - reopen := readerReopen(ctx, call.Header(), params, s, func() (*http.Response, error) { return call.Download() }, + reopen := readerReopen(ctx, call.Header(), params, s, func(ctx context.Context) (*http.Response, error) { return call.Context(ctx).Download() }, func() error { return applyConds("NewReader", params.gen, params.conds, call) }, func() { call.Generation(params.gen) }) @@ -957,11 +953,11 @@ func (c *httpStorageClient) GetIamPolicy(ctx context.Context, resource string, v call.UserProject(s.userProject) } var rp *raw.Policy - err := run(ctx, func() error { + err := run(ctx, func(ctx context.Context) error { var err error rp, err = call.Context(ctx).Do() return err - }, s.retry, s.idempotent, setRetryHeaderHTTP(call)) + }, s.retry, s.idempotent) if err != nil { return nil, err } @@ -978,10 +974,10 @@ func (c *httpStorageClient) SetIamPolicy(ctx context.Context, resource string, p call.UserProject(s.userProject) } - return run(ctx, func() error { + return run(ctx, func(ctx context.Context) error { _, err := call.Context(ctx).Do() return err - }, s.retry, s.idempotent, setRetryHeaderHTTP(call)) + }, s.retry, s.idempotent) } func (c *httpStorageClient) TestIamPermissions(ctx context.Context, resource string, permissions []string, opts ...storageOption) ([]string, error) { @@ -992,11 +988,11 @@ func (c *httpStorageClient) TestIamPermissions(ctx context.Context, resource str call.UserProject(s.userProject) } var res *raw.TestIamPermissionsResponse - err := run(ctx, func() error { + err := run(ctx, func(ctx context.Context) error { var err error res, err = call.Context(ctx).Do() return err - }, s.retry, s.idempotent, setRetryHeaderHTTP(call)) + }, s.retry, s.idempotent) if err != nil { return nil, err } @@ -1014,10 +1010,10 @@ func (c *httpStorageClient) GetHMACKey(ctx context.Context, project, accessID st var metadata *raw.HmacKeyMetadata var err error - if err := run(ctx, func() error { + if err := run(ctx, func(ctx context.Context) error { metadata, err = call.Context(ctx).Do() return err - }, s.retry, s.idempotent, setRetryHeaderHTTP(call)); err != nil { + }, s.retry, s.idempotent); err != nil { return nil, err } hk := &raw.HmacKey{ @@ -1054,10 +1050,10 @@ func (c *httpStorageClient) ListHMACKeys(ctx context.Context, project, serviceAc } var resp *raw.HmacKeysMetadata - err = run(it.ctx, func() error { - resp, err = call.Context(it.ctx).Do() + err = run(it.ctx, func(ctx context.Context) error { + resp, err = call.Context(ctx).Do() return err - }, s.retry, s.idempotent, setRetryHeaderHTTP(call)) + }, s.retry, s.idempotent) if err != nil { return "", err } @@ -1099,10 +1095,10 @@ func (c *httpStorageClient) UpdateHMACKey(ctx context.Context, project, serviceA var metadata *raw.HmacKeyMetadata var err error - if err := run(ctx, func() error { + if err := run(ctx, func(ctx context.Context) error { metadata, err = call.Context(ctx).Do() return err - }, s.retry, s.idempotent, setRetryHeaderHTTP(call)); err != nil { + }, s.retry, s.idempotent); err != nil { return nil, err } hk := &raw.HmacKey{ @@ -1119,11 +1115,11 @@ func (c *httpStorageClient) CreateHMACKey(ctx context.Context, project, serviceA } var hk *raw.HmacKey - if err := run(ctx, func() error { + if err := run(ctx, func(ctx context.Context) error { h, err := call.Context(ctx).Do() hk = h return err - }, s.retry, s.idempotent, setRetryHeaderHTTP(call)); err != nil { + }, s.retry, s.idempotent); err != nil { return nil, err } return toHMACKeyFromRaw(hk, true) @@ -1135,9 +1131,9 @@ func (c *httpStorageClient) DeleteHMACKey(ctx context.Context, project string, a if s.userProject != "" { call = call.UserProject(s.userProject) } - return run(ctx, func() error { + return run(ctx, func(ctx context.Context) error { return call.Context(ctx).Do() - }, s.retry, s.idempotent, setRetryHeaderHTTP(call)) + }, s.retry, s.idempotent) } // Notification methods. @@ -1156,10 +1152,10 @@ func (c *httpStorageClient) ListNotifications(ctx context.Context, bucket string call.UserProject(s.userProject) } var res *raw.Notifications - err = run(ctx, func() error { + err = run(ctx, func(ctx context.Context) error { res, err = call.Context(ctx).Do() return err - }, s.retry, true, setRetryHeaderHTTP(call)) + }, s.retry, true) if err != nil { return nil, err } @@ -1176,10 +1172,10 @@ func (c *httpStorageClient) CreateNotification(ctx context.Context, bucket strin call.UserProject(s.userProject) } var rn *raw.Notification - err = run(ctx, func() error { + err = run(ctx, func(ctx context.Context) error { rn, err = call.Context(ctx).Do() return err - }, s.retry, s.idempotent, setRetryHeaderHTTP(call)) + }, s.retry, s.idempotent) if err != nil { return nil, err } @@ -1195,9 +1191,9 @@ func (c *httpStorageClient) DeleteNotification(ctx context.Context, bucket strin if s.userProject != "" { call.UserProject(s.userProject) } - return run(ctx, func() error { + return run(ctx, func(ctx context.Context) error { return call.Context(ctx).Do() - }, s.retry, s.idempotent, setRetryHeaderHTTP(call)) + }, s.retry, s.idempotent) } type httpReader struct { @@ -1246,7 +1242,7 @@ func setRangeReaderHeaders(h http.Header, params *newRangeReaderParams) error { // readerReopen initiates a Read with offset and length, assuming we // have already read seen bytes. func readerReopen(ctx context.Context, header http.Header, params *newRangeReaderParams, s *settings, - doDownload func() (*http.Response, error), applyConditions func() error, setGeneration func()) func(int64) (*http.Response, error) { + doDownload func(context.Context) (*http.Response, error), applyConditions func() error, setGeneration func()) func(int64) (*http.Response, error) { return func(seen int64) (*http.Response, error) { // If the context has already expired, return immediately without making a // call. @@ -1273,8 +1269,8 @@ func readerReopen(ctx context.Context, header http.Header, params *newRangeReade var err error var res *http.Response - err = run(ctx, func() error { - res, err = doDownload() + err = run(ctx, func(ctx context.Context) error { + res, err = doDownload(ctx) if err != nil { var e *googleapi.Error if errors.As(err, &e) { @@ -1328,7 +1324,7 @@ func readerReopen(ctx context.Context, header http.Header, params *newRangeReade params.gen = gen64 } return nil - }, s.retry, s.idempotent, setRetryHeaderHTTP(nil)) + }, s.retry, s.idempotent) if err != nil { return nil, err } diff --git a/storage/invoke.go b/storage/invoke.go index d8f5a6d4a6f6..dc79fd88bbc4 100644 --- a/storage/invoke.go +++ b/storage/invoke.go @@ -20,7 +20,6 @@ import ( "fmt" "io" "net" - "net/http" "net/url" "strings" @@ -29,6 +28,7 @@ import ( sinternal "cloud.google.com/go/storage/internal" "github.com/google/uuid" gax "github.com/googleapis/gax-go/v2" + "github.com/googleapis/gax-go/v2/callctx" "google.golang.org/api/googleapi" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -37,10 +37,15 @@ import ( var defaultRetry *retryConfig = &retryConfig{} var xGoogDefaultHeader = fmt.Sprintf("gl-go/%s gccl/%s", version.Go(), sinternal.Version) +const ( + xGoogHeaderKey = "x-goog-api-client" + idempotencyHeaderKey = "x-goog-gcs-idempotency-token" +) + // run determines whether a retry is necessary based on the config and // idempotency information. It then calls the function with or without retries // as appropriate, using the configured settings. -func run(ctx context.Context, call func() error, retry *retryConfig, isIdempotent bool, setHeader func(string, int)) error { +func run(ctx context.Context, call func(ctx context.Context) error, retry *retryConfig, isIdempotent bool) error { attempts := 1 invocationID := uuid.New().String() @@ -48,8 +53,8 @@ func run(ctx context.Context, call func() error, retry *retryConfig, isIdempoten retry = defaultRetry } if (retry.policy == RetryIdempotent && !isIdempotent) || retry.policy == RetryNever { - setHeader(invocationID, attempts) - return call() + ctxWithHeaders := setInvocationHeaders(ctx, invocationID, attempts) + return call(ctxWithHeaders) } bo := gax.Backoff{} if retry.backoff != nil { @@ -63,35 +68,22 @@ func run(ctx context.Context, call func() error, retry *retryConfig, isIdempoten } return internal.Retry(ctx, bo, func() (stop bool, err error) { - setHeader(invocationID, attempts) - err = call() + ctxWithHeaders := setInvocationHeaders(ctx, invocationID, attempts) + err = call(ctxWithHeaders) attempts++ return !errorFunc(err), err }) } -func setRetryHeaderHTTP(req interface{ Header() http.Header }) func(string, int) { - return func(invocationID string, attempts int) { - if req == nil { - return - } - header := req.Header() - // TODO(b/274504690): Consider dropping gccl-invocation-id key since it - // duplicates the X-Goog-Gcs-Idempotency-Token header (added in v1.31.0). - invocationHeader := fmt.Sprintf("gccl-invocation-id/%v gccl-attempt-count/%v", invocationID, attempts) - xGoogHeader := strings.Join([]string{invocationHeader, xGoogDefaultHeader}, " ") - header.Set("x-goog-api-client", xGoogHeader) - // Also use the invocationID for the idempotency token header, which will - // enable idempotent retries for more operations. - header.Set("x-goog-gcs-idempotency-token", invocationID) - } -} +// Sets invocation ID headers on the context which will be propagated as +// headers in the call to the service (for both gRPC and HTTP). +func setInvocationHeaders(ctx context.Context, invocationID string, attempts int) context.Context { + invocationHeader := fmt.Sprintf("gccl-invocation-id/%v gccl-attempt-count/%v", invocationID, attempts) + xGoogHeader := strings.Join([]string{invocationHeader, xGoogDefaultHeader}, " ") -// TODO: Implement method setting header via context for gRPC -func setRetryHeaderGRPC(_ context.Context) func(string, int) { - return func(_ string, _ int) { - return - } + ctx = callctx.SetHeaders(ctx, xGoogHeaderKey, xGoogHeader) + ctx = callctx.SetHeaders(ctx, idempotencyHeaderKey, invocationID) + return ctx } // ShouldRetry returns true if an error is retryable, based on best practice diff --git a/storage/invoke_test.go b/storage/invoke_test.go index 58f608cb7cd5..005cff117109 100644 --- a/storage/invoke_test.go +++ b/storage/invoke_test.go @@ -26,8 +26,8 @@ import ( "strings" "testing" + "github.com/googleapis/gax-go/v2/callctx" "golang.org/x/xerrors" - "google.golang.org/api/googleapi" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -176,27 +176,29 @@ func TestInvoke(t *testing.T) { } { t.Run(test.desc, func(s *testing.T) { counter := 0 - req := &fakeApiaryRequest{header: http.Header{}} var initialClientHeader, initialIdempotencyHeader string - call := func() error { + var gotClientHeader, gotIdempotencyHeader string + call := func(ctx context.Context) error { if counter == 0 { - initialClientHeader = req.Header()["X-Goog-Api-Client"][0] - initialIdempotencyHeader = req.Header()["X-Goog-Gcs-Idempotency-Token"][0] + headers := callctx.HeadersFromContext(ctx) + initialClientHeader = headers["x-goog-api-client"][0] + initialIdempotencyHeader = headers["x-goog-gcs-idempotency-token"][0] } counter++ + headers := callctx.HeadersFromContext(ctx) + gotClientHeader = headers["x-goog-api-client"][0] + gotIdempotencyHeader = headers["x-goog-gcs-idempotency-token"][0] if counter <= test.count { return test.initialErr } return test.finalErr } - got := run(ctx, call, test.retry, test.isIdempotentValue, setRetryHeaderHTTP(req)) + got := run(ctx, call, test.retry, test.isIdempotentValue) if test.expectFinalErr && got != test.finalErr { s.Errorf("got %v, want %v", got, test.finalErr) } else if !test.expectFinalErr && got != test.initialErr { s.Errorf("got %v, want %v", got, test.initialErr) } - gotClientHeader := req.Header()["X-Goog-Api-Client"][0] - gotIdempotencyHeader := req.Header()["X-Goog-Gcs-Idempotency-Token"][0] wantAttempts := 1 + test.count if !test.expectFinalErr { wantAttempts = 1 diff --git a/storage/reader.go b/storage/reader.go index 580353053acd..4673a68d0789 100644 --- a/storage/reader.go +++ b/storage/reader.go @@ -187,16 +187,6 @@ func setConditionsHeaders(headers http.Header, conds *Conditions) error { return nil } -// Wrap a request to look similar to an apiary library request, in order to -// be used by run(). -type readerRequestWrapper struct { - req *http.Request -} - -func (w *readerRequestWrapper) Header() http.Header { - return w.req.Header -} - var emptyBody = ioutil.NopCloser(strings.NewReader("")) // Reader reads a Cloud Storage object.