Skip to content

Commit

Permalink
cloud/gcp: add custom retryer for gcs storage, retry on stream INTERN…
Browse files Browse the repository at this point in the history
…AL_ERROR

Currently, errors like
`stream error: stream ID <x>; INTERNAL_ERROR; received from peer`
are not being retried. Create a custom retryer to retry these errors as
suggested by:

googleapis/google-cloud-go#3735
googleapis/google-cloud-go#784

Fixes: cockroachdb#85217, cockroachdb#85216, cockroachdb#85204, cockroachdb#84162

Release note: None
  • Loading branch information
Rui Hu committed Jul 29, 2022
1 parent 1c90519 commit cb92673
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 1 deletion.
3 changes: 2 additions & 1 deletion build/bazelutil/nogo_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@
"cockroach/pkg/sql/pgwire/pgerror/pgcode\\.go$": "invalid direct cast on error object",
"cockroach/pkg/testutils/lint/lint_test\\.go$": "invalid direct cast on error object",
"cockroach/pkg/util/contextutil/timeout_error\\.go$": "invalid direct cast on error object",
"cockroach/pkg/util/sysutil/sysutil_.*": "type can change by system"
"cockroach/pkg/util/sysutil/sysutil_.*": "type can change by system",
"cockroach/pkg/cloud/gcp/gcs_retry\\.go$": "invalid direct cast on error object"
},
"only_files": {
"cockroach/pkg/.*$": "first-party code"
Expand Down
5 changes: 5 additions & 0 deletions pkg/cloud/gcp/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go_library(
name = "gcp",
srcs = [
"gcs_kms.go",
"gcs_retry.go",
"gcs_storage.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/cloud/gcp",
Expand All @@ -23,11 +24,15 @@ go_library(
"@com_github_gogo_protobuf//types",
"@com_google_cloud_go_kms//apiv1",
"@com_google_cloud_go_storage//:storage",
"@org_golang_google_api//googleapi",
"@org_golang_google_api//impersonate",
"@org_golang_google_api//iterator",
"@org_golang_google_api//option",
"@org_golang_google_genproto//googleapis/cloud/kms/v1:kms",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
"@org_golang_google_protobuf//types/known/wrapperspb",
"@org_golang_x_net//http2",
"@org_golang_x_oauth2//:oauth2",
],
)
Expand Down
78 changes: 78 additions & 0 deletions pkg/cloud/gcp/gcs_retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package gcp

import (
"io"
"net"
"net/url"
"strings"

"github.com/cockroachdb/errors"
"google.golang.org/api/googleapi"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// defaultShouldRetry is google-cloud's default predicate for determining
// whether an error can be retried.
//
// TODO(rui): Currently this code is copied as-is from the google-cloud-go SDK
// in order to get the default retry behavior on top of our own customizations.
// There's currently a PR in google-cloud-go that exposes the default retry
// function, so this can be removed when it is merged:
// https://github.com/googleapis/google-cloud-go/pull/6370
func defaultShouldRetry(err error) bool {
if err == nil {
return false
}
if errors.Is(err, io.ErrUnexpectedEOF) {
return true
}

switch e := err.(type) {
case *net.OpError:
if strings.Contains(e.Error(), "use of closed network connection") {
// TODO: check against net.ErrClosed (go 1.16+) instead of string
return true
}
case *googleapi.Error:
// Retry on 408, 429, and 5xx, according to
// https://cloud.google.com/storage/docs/exponential-backoff.
return e.Code == 408 || e.Code == 429 || (e.Code >= 500 && e.Code < 600)
case *url.Error:
// Retry socket-level errors ECONNREFUSED and ECONNRESET (from syscall).
// Unfortunately the error type is unexported, so we resort to string
// matching.
retriable := []string{"connection refused", "connection reset"}
for _, s := range retriable {
if strings.Contains(e.Error(), s) {
return true
}
}
case interface{ Temporary() bool }:
if e.Temporary() {
return true
}
}
// HTTP 429, 502, 503, and 504 all map to gRPC UNAVAILABLE per
// https://grpc.github.io/grpc/core/md_doc_http-grpc-status-mapping.html.
//
// This is only necessary for the experimental gRPC-based media operations.
if st, ok := status.FromError(err); ok && st.Code() == codes.Unavailable {
return true
}
// Unwrap is only supported in go1.13.x+
if e, ok := err.(interface{ Unwrap() error }); ok {
return defaultShouldRetry(e.Unwrap())
}
return false
}
30 changes: 30 additions & 0 deletions pkg/cloud/gcp/gcs_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/gogo/protobuf/types"
"golang.org/x/net/http2"
"golang.org/x/oauth2"
"google.golang.org/api/impersonate"
"google.golang.org/api/iterator"
Expand Down Expand Up @@ -174,6 +175,7 @@ func makeGCSStorage(
if err != nil {
return nil, errors.Wrap(err, "failed to create google cloud client")
}
g.SetRetry(gcs.WithErrorFunc(shouldRetry))
bucket := g.Bucket(conf.Bucket)
if conf.BillingProject != `` {
bucket = bucket.UserProject(conf.BillingProject)
Expand Down Expand Up @@ -343,6 +345,34 @@ func (g *gcsStorage) Close() error {
return g.client.Close()
}

// shouldRetry is the predicate that determines whether a GCS client error
// should be retried. The predicate combines google-cloud-go's default retry
// predicate and some additional predicates when determining whether the error
// is retried. The additional predicates are:
//
// - http2.StreamError error with code http2.ErrCodeInternal: this error has
// been recommended to be retried in several issues in the google-cloud-go repo:
// https://github.com/googleapis/google-cloud-go/issues/3735
// https://github.com/googleapis/google-cloud-go/issues/784
// Remove if this error ever becomes part of the default retry predicate.
func shouldRetry(err error) bool {
if defaultShouldRetry(err) {
return true
}

if e := (http2.StreamError{}); errors.As(err, &e) {
if e.Code == http2.ErrCodeInternal {
return true
}
}

if e := (errors.Wrapper)(nil); errors.As(err, &e) {
return shouldRetry(e.Unwrap())
}

return false
}

func init() {
cloud.RegisterExternalStorageProvider(cloudpb.ExternalStorageProvider_gs,
parseGSURL, makeGCSStorage, cloud.RedactedParams(CredentialsParam, BearerTokenParam), "gs")
Expand Down
2 changes: 2 additions & 0 deletions pkg/testutils/lint/lint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2208,6 +2208,8 @@ func TestLint(t *testing.T) {
stream.GrepNot(`pkg/roachprod/logger/log\.go:.*format argument is not a constant expression`),
// We purposefully produce nil dereferences in this file to test crash conditions
stream.GrepNot(`pkg/util/log/logcrash/crash_reporting_test\.go:.*nil dereference in type assertion`),
// Temporarily copied code from google-cloud-go's retry predicate.
stream.GrepNot(`pkg/cloud/gcp/gcs_retry\.go:.*invalid direct cast on error object`),
// Spawning naked goroutines is ok when it's not as part of the main CRDB
// binary. This is for now - if we use #58164 to introduce more aggressive
// pooling, etc, then test code needs to adhere as well.
Expand Down

0 comments on commit cb92673

Please sign in to comment.