importccl: add more errors to transient retryable error list
This change adds a few more errors, exposed when stressing a
previously skipped test, that can occur when a worker node running
the import is shut down. These errors should not cause the job to
fail but should instead trigger a replan.

I ran 500+ iterations of TestImportWorkerFailure under stress.

Release note: None
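
For orientation, here is a minimal, self-contained sketch of the retry pattern this commit extends: the import loop re-plans on errors classified as transient and fails the job on anything else. The names below (the shape of ingestWithRetry, isPermanentBulkJobError, errWorkerShutdown) are simplified stand-ins, not the actual importccl/utilccl implementation.

// Hypothetical sketch of the retry/replan loop; transient failures (e.g. a
// worker node shutting down) trigger another attempt, anything classified as
// permanent fails the job immediately.
package main

import (
	"context"
	"errors"
	"fmt"
)

// errWorkerShutdown stands in for the newly allowlisted transient errors
// (closed gRPC connection, no inbound stream connection, kv send error).
var errWorkerShutdown = errors.New("rpc error: worker node shut down")

// isPermanentBulkJobError mirrors the allowlist shape of
// utilccl.IsPermanentBulkJobError: only known-transient errors are retried.
func isPermanentBulkJobError(err error) bool {
	return err != nil && !errors.Is(err, errWorkerShutdown)
}

func ingestWithRetry(ctx context.Context, maxRetries int, run func(context.Context) error) error {
	var err error
	for attempt := 0; attempt <= maxRetries; attempt++ {
		if err = run(ctx); err == nil {
			return nil
		}
		if isPermanentBulkJobError(err) {
			// Unknown or permanent error: surface it and fail the job.
			return err
		}
		// Transient error: re-plan the distributed flow and try again.
		fmt.Printf("attempt %d hit a transient error, replanning: %v\n", attempt, err)
	}
	return err
}

func main() {
	attempts := 0
	err := ingestWithRetry(context.Background(), 5, func(context.Context) error {
		attempts++
		if attempts < 3 {
			return errWorkerShutdown // early attempts hit a dying worker
		}
		return nil // the replanned import eventually succeeds
	})
	fmt.Println("import result:", err)
}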
adityamaru authored and pbardea committed May 18, 2021
1 parent b91434d commit ecc031c
Showing 5 changed files with 42 additions and 33 deletions.
7 changes: 4 additions & 3 deletions pkg/ccl/importccl/import_stmt.go
@@ -2134,8 +2134,9 @@ func ingestWithRetry(
MaxRetries: 5,
}

// We want to retry a restore if there are transient failures (i.e. worker nodes
// dying), so if we receive a retryable error, re-plan and retry the backup.
// We want to retry an import if there are transient failures (i.e. worker
// nodes dying), so if we receive a retryable error, re-plan and retry the
// import.
var res roachpb.BulkOpSummary
var err error
for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
@@ -2144,7 +2145,7 @@
break
}

if !utilccl.IsDistSQLRetryableError(err) {
if utilccl.IsPermanentBulkJobError(err) {
return roachpb.BulkOpSummary{}, err
}

17 changes: 6 additions & 11 deletions pkg/ccl/importccl/import_stmt_test.go
@@ -4887,17 +4887,12 @@ func TestImportControlJobRBAC(t *testing.T) {
}
}

// TestImportWorkerFailure tests that IMPORT can restart after the failure
// of a worker node.
// TestImportWorkerFailure tests that IMPORT retries after the failure of a
// worker node.
func TestImportWorkerFailure(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// TODO(mjibson): Although this test passes most of the time it still
// sometimes fails because not all kinds of failures caused by shutting a
// node down are detected and retried.
skip.WithIssue(t, 51793, "flaky due to undetected kinds of failures when the node is shutdown")

defer jobs.TestingSetAdoptAndCancelIntervals(10*time.Millisecond, 10*time.Millisecond)()

allowResponse := make(chan struct{})
@@ -4940,13 +4935,13 @@ func TestImportWorkerFailure(t *testing.T) {
var jobID jobspb.JobID
sqlDB.QueryRow(t, `SELECT id FROM system.jobs ORDER BY created DESC LIMIT 1`).Scan(&jobID)

// Shut down a node. This should force LoadCSV to fail in its current
// execution. It should detect this as a context canceled error.
// Shut down a node.
tc.StopServer(1)

close(allowResponse)
// We expect the statement to fail.
if err := <-errCh; !testutils.IsError(err, "node failure") {
// We expect the statement to retry since it should have encountered a
// retryable error.
if err := <-errCh; err != nil {
t.Fatal(err)
}

38 changes: 19 additions & 19 deletions pkg/ccl/utilccl/errors.go
@@ -9,27 +9,12 @@
package utilccl

import (
"fmt"
"strings"
)

const retryableJobsFlowError = "retryable jobs error"

type retryableError struct {
wrapped error
}

// MarkRetryableError wraps the given error, marking it as retryable to
// jobs.
func MarkRetryableError(e error) error {
return &retryableError{wrapped: e}
}

// Error implements the error interface.
func (e *retryableError) Error() string {
return fmt.Sprintf("%s: %s", retryableJobsFlowError, e.wrapped.Error())
}

"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
)

// IsDistSQLRetryableError returns true if the supplied error, or any of its
// parent causes, is an rpc error.
@@ -49,3 +34,18 @@ func IsDistSQLRetryableError(err error) bool {
// `(*DistSQLPlanner).Run`.
return strings.Contains(errStr, `rpc error`)
}

// IsPermanentBulkJobError returns true if the error results in a permanent
// failure of a bulk job (IMPORT, BACKUP, RESTORE). This function is an
// allowlist instead of a blocklist: only known-safe errors are confirmed
// not to be permanent; anything unknown is assumed to be permanent.
func IsPermanentBulkJobError(err error) bool {
if err == nil {
return false
}

return !IsDistSQLRetryableError(err) &&
!grpcutil.IsClosedConnection(err) &&
!flowinfra.IsNoInboundStreamConnectionError(err) &&
!kvcoord.IsSendError(err)
}
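
A brief, hedged illustration of why the allowlist composes well with error wrapping: each predicate above inspects the whole cause chain (errors.Is, errors.HasType), so context added as the error propagates through the DistSQL flow does not change the classification. The predicate below is a local stand-in, not the cockroach helpers.

// Standalone sketch: a hypothetical transient-error predicate, showing that
// cause-chain matching survives wrapping with extra context.
package main

import (
	"errors"
	"fmt"
)

var errNoInboundStream = errors.New("no inbound stream connection")

// isTransient is the negation of a permanent-error check, reduced to a single
// sentinel for illustration.
func isTransient(err error) bool {
	return errors.Is(err, errNoInboundStream)
}

func main() {
	// The flow annotates the sentinel as it bubbles up; %w keeps the cause.
	err := fmt.Errorf("flow setup on node 2: %w", errNoInboundStream)
	fmt.Println(isTransient(err))                        // true: still retryable
	fmt.Println(isTransient(errors.New("syntax error"))) // false: treated as permanent
}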
5 changes: 5 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -2158,3 +2158,8 @@ func newSendError(msg string) error {
func (s sendError) Error() string {
return "failed to send RPC: " + s.message
}

// IsSendError returns true if err is a sendError.
func IsSendError(err error) bool {
return errors.HasType(err, sendError{})
}
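
IsSendError relies on errors.HasType from github.com/cockroachdb/errors, which matches by type anywhere in the wrap chain. Below is a small sketch with a local error type standing in for the unexported sendError (illustrative only, not kvcoord's actual code path).

// Sketch of the errors.HasType idiom, using a local sendError look-alike.
package main

import (
	"fmt"

	"github.com/cockroachdb/errors"
)

type sendError struct{ message string }

func (s sendError) Error() string { return "failed to send RPC: " + s.message }

func main() {
	// Wrapping with extra context does not hide the underlying type.
	err := errors.Wrap(sendError{message: "no replica available"}, "import worker")
	fmt.Println(errors.HasType(err, sendError{}))                 // true
	fmt.Println(errors.HasType(errors.New("other"), sendError{})) // false
}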
8 changes: 8 additions & 0 deletions pkg/sql/flowinfra/flow_registry.go
@@ -28,8 +28,16 @@ import (
"github.com/cockroachdb/redact"
)

// errNoInboundStreamConnection is the error propagated through the flow when
// the timeout to set up the flow is exceeded.
var errNoInboundStreamConnection = errors.New("no inbound stream connection")

// IsNoInboundStreamConnectionError returns true if err's cause is
// errNoInboundStreamConnection.
func IsNoInboundStreamConnectionError(err error) bool {
return errors.Is(err, errNoInboundStreamConnection)
}

// SettingFlowStreamTimeout is a cluster setting that sets the default flow
// stream timeout.
var SettingFlowStreamTimeout = settings.RegisterDurationSetting(