Skip to content

Commit

Permalink
Merge pull request #90429 from cockroachdb/blathers/backport-release-…
Browse files Browse the repository at this point in the history
…22.2.0-89354
  • Loading branch information
stevendanna authored Oct 28, 2022
2 parents 0dc57e1 + 2561370 commit 775d97c
Show file tree
Hide file tree
Showing 8 changed files with 158 additions and 11 deletions.
2 changes: 2 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ ALL_TESTS = [
"//pkg/internal/rsg:rsg_test",
"//pkg/internal/sqlsmith:sqlsmith_test",
"//pkg/internal/team:team_test",
"//pkg/jobs/joberror:joberror_test",
"//pkg/jobs/jobsprotectedts:jobsprotectedts_test",
"//pkg/jobs:jobs_test",
"//pkg/keys:keys_test",
Expand Down Expand Up @@ -1061,6 +1062,7 @@ GO_TARGETS = [
"//pkg/internal/team:team",
"//pkg/internal/team:team_test",
"//pkg/jobs/joberror:joberror",
"//pkg/jobs/joberror:joberror_test",
"//pkg/jobs/jobspb:jobspb",
"//pkg/jobs/jobsprotectedts:jobsprotectedts",
"//pkg/jobs/jobsprotectedts:jobsprotectedts_test",
Expand Down
17 changes: 16 additions & 1 deletion pkg/jobs/joberror/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data")
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "joberror",
Expand All @@ -12,8 +12,23 @@ go_library(
"//pkg/util/circuit",
"//pkg/util/grpcutil",
"//pkg/util/sysutil",
"@com_github_cockroachdb_circuitbreaker//:circuitbreaker",
"@com_github_cockroachdb_errors//:errors",
],
)

# Test target for the joberror package, built from errors_test.go and
# registered in pkg/BUILD.bazel under ALL_TESTS and GO_TARGETS.
go_test(
name = "joberror_test",
srcs = ["errors_test.go"],
args = ["-test.timeout=295s"],
embed = [":joberror"],
deps = [
"//pkg/util/circuit",
"@com_github_cockroachdb_circuitbreaker//:circuitbreaker",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
"@com_github_stretchr_testify//require",
],
)

get_x_data(name = "get_x_data")
6 changes: 5 additions & 1 deletion pkg/jobs/joberror/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package joberror
import (
"strings"

circuitbreaker "github.com/cockroachdb/circuitbreaker"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
"github.com/cockroachdb/cockroach/pkg/util/circuit"
Expand Down Expand Up @@ -41,8 +42,11 @@ func IsDistSQLRetryableError(err error) bool {
}

// isBreakerOpenError returns true if err is, or wraps, the ErrBreakerOpen
// sentinel of either circuit breaker package.
//
// NB: Two packages have ErrBreakerOpen error types. The circuitbreaker package
// is used by the nodedialer. The circuit package is used by kvserver.
func isBreakerOpenError(err error) bool {
	return errors.Is(err, circuit.ErrBreakerOpen) || errors.Is(err, circuitbreaker.ErrBreakerOpen)
}

// IsPermanentBulkJobError returns true if the error results in a permanent
Expand Down
45 changes: 45 additions & 0 deletions pkg/jobs/joberror/errors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package joberror

import (
"fmt"
"testing"

circuitbreaker "github.com/cockroachdb/circuitbreaker"
"github.com/cockroachdb/cockroach/pkg/util/circuit"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
"github.com/stretchr/testify/require"
)

// TestErrBreakerOpenIsRetriable checks that breaker-open errors originating
// from both the circuit and the circuitbreaker packages are not classified as
// permanent bulk job errors.
func TestErrBreakerOpenIsRetriable(t *testing.T) {
	// The probe completes immediately without ever reporting success, so a
	// tripped breaker never untrips.
	neverUntrip := func(_ func(error), done func()) { done() }
	opts := circuit.Options{
		Name:         redact.Sprint("Breaker"),
		AsyncProbe:   neverUntrip,
		EventHandler: &circuit.EventLogger{Log: func(redact.StringBuilder) {}},
	}
	breaker := circuit.NewBreaker(opts)
	breaker.Report(errors.New("test error"))

	retriable := []error{
		// Error exposed by the tripped pkg/util/circuit breaker.
		breaker.Signal().Err(),
		// NB: This matches the error that dial produces.
		errors.Wrapf(circuitbreaker.ErrBreakerOpen, "unable to dial n%d", 9),
	}
	for i := range retriable {
		err := retriable[i]
		t.Run(fmt.Sprintf("%s", err), func(t *testing.T) {
			require.False(t, IsPermanentBulkJobError(err))
		})
	}
}
1 change: 1 addition & 0 deletions pkg/sql/importer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ go_test(
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/roachpb",
"//pkg/rpc",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/security/username",
Expand Down
17 changes: 10 additions & 7 deletions pkg/sql/importer/import_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,18 @@ import (
"github.com/cockroachdb/errors"
)

// importTestingKnobs groups the test-only hooks of an import job so they can
// be handed as a single value from the resumer to ingestWithRetry and
// distImport.
type importTestingKnobs struct {
// afterImport receives a row-count summary; its call site is not visible
// here. NOTE(review): presumably invoked once ingestion completes — confirm.
afterImport func(summary roachpb.RowCount) error
// beforeRunDSP, if non-nil, is called by distImport right before it starts
// the goroutine that runs the plan; a non-nil error aborts distImport.
beforeRunDSP func() error
// alwaysFlushJobProgress makes distImport call updateJobProgress for every
// bulk processor progress update it receives.
alwaysFlushJobProgress bool
}

// importResumer is the jobs.Resumer implementation for IMPORT jobs.
type importResumer struct {
	job      *jobs.Job
	settings *cluster.Settings
	res      roachpb.RowCount

	// testingKnobs is declared exactly once, using the named
	// importTestingKnobs type, so the complete set of knobs can be passed by
	// value to ingestWithRetry and on to distImport.
	testingKnobs importTestingKnobs
}

func (r *importResumer) TestingSetAfterImportKnob(fn func(summary roachpb.RowCount) error) {
Expand Down Expand Up @@ -281,7 +284,7 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error {
procsPerNode := int(processorsPerNode.Get(&p.ExecCfg().Settings.SV))

res, err := ingestWithRetry(ctx, p, r.job, tables, typeDescs, files, format, details.Walltime,
r.testingKnobs.alwaysFlushJobProgress, procsPerNode)
r.testingKnobs, procsPerNode)
if err != nil {
return err
}
Expand Down Expand Up @@ -1260,7 +1263,7 @@ func ingestWithRetry(
from []string,
format roachpb.IOFileFormat,
walltime int64,
alwaysFlushProgress bool,
testingKnobs importTestingKnobs,
procsPerNode int,
) (roachpb.BulkOpSummary, error) {
resumerSpan := tracing.SpanFromContext(ctx)
Expand Down Expand Up @@ -1288,7 +1291,7 @@ func ingestWithRetry(
RetryError: tracing.RedactAndTruncateError(err),
})
res, err = distImport(ctx, execCtx, job, tables, typeDescs, from, format, walltime,
alwaysFlushProgress, procsPerNode)
testingKnobs, procsPerNode)
// Replanning errors should not count towards retry limits.
if err == nil || !errors.Is(err, sql.ErrPlanChanged) {
break
Expand Down
10 changes: 8 additions & 2 deletions pkg/sql/importer/import_processor_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func distImport(
from []string,
format roachpb.IOFileFormat,
walltime int64,
alwaysFlushProgress bool,
testingKnobs importTestingKnobs,
procsPerNode int,
) (roachpb.BulkOpSummary, error) {

Expand Down Expand Up @@ -191,7 +191,7 @@ func distImport(
accumulatedBulkSummary.Add(meta.BulkProcessorProgress.BulkSummary)
accumulatedBulkSummary.Unlock()

if alwaysFlushProgress {
if testingKnobs.alwaysFlushJobProgress {
return updateJobProgress()
}
}
Expand Down Expand Up @@ -255,6 +255,12 @@ func distImport(
}
})

if testingKnobs.beforeRunDSP != nil {
if err := testingKnobs.beforeRunDSP(); err != nil {
return roachpb.BulkOpSummary{}, err
}
}

g.GoCtx(func(ctx context.Context) error {
defer cancelReplanner()
defer close(stopProgress)
Expand Down
71 changes: 71 additions & 0 deletions pkg/sql/importer/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
Expand Down Expand Up @@ -2836,6 +2837,76 @@ func TestExportImportRoundTrip(t *testing.T) {
}
}

// TestImportRetriesBreakerOpenFailure tests that errors resulting from open
// breakers on the coordinator node are retried.
//
// The test pauses the import right before the DistSQL plan is run, trips the
// coordinator's breaker for node 3, lets the first attempt proceed, and then
// expects a second attempt to reach the same pause point and the job to
// succeed.
func TestImportRetriesBreakerOpenFailure(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderShort(t)
skip.UnderRace(t, "takes >1min under race")

const nodes = 3
numFiles := nodes + 2
rowsPerFile := 1

ctx := context.Background()
tc := serverutils.StartNewTestCluster(t, nodes, base.TestClusterArgs{ServerArgs: base.TestServerArgs{
DisableDefaultTestTenant: true,
ExternalIODir: testutils.TestDataPath(t, "csv")}})
defer tc.Stopper().Stop(ctx)

// Both channels are unbuffered, so each send blocks until the other side
// receives: aboutToRunDSP tells the test that an attempt has reached the
// hook, allowRunDSP lets that attempt continue.
aboutToRunDSP := make(chan struct{})
allowRunDSP := make(chan struct{})
// Install the hook on every server's job registry so it applies no matter
// which server creates the import resumer.
for i := 0; i < tc.NumServers(); i++ {
tc.Server(i).JobRegistry().(*jobs.Registry).TestingResumerCreationKnobs = map[jobspb.Type]func(raw jobs.Resumer) jobs.Resumer{
jobspb.TypeImport: func(raw jobs.Resumer) jobs.Resumer {
r := raw.(*importResumer)
r.testingKnobs.beforeRunDSP = func() error {
aboutToRunDSP <- struct{}{}
<-allowRunDSP
return nil
}
return r
},
}
}

sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0))
sqlDB.Exec(t, `CREATE TABLE t (a INT, b STRING)`)
sqlDB.Exec(t, `SET CLUSTER SETTING kv.bulk_ingest.pk_buffer_size = '16MiB'`)
var tableID int64
sqlDB.QueryRow(t, `SELECT id FROM system.namespace WHERE name = 't'`).Scan(&tableID)

// The IMPORT statement runs in its own goroutine because it blocks inside
// beforeRunDSP until the main test goroutine completes the handshake below.
// NOTE(review): sqlDB.Exec and VerifySystemJob receive t here; if they fail
// the test via t.Fatal they would do so off the test goroutine — confirm
// this is acceptable.
g := ctxgroup.WithContext(ctx)
g.GoCtx(func(ctx context.Context) error {
testFiles := makeCSVData(t, numFiles, rowsPerFile, nodes, rowsPerFile)
fileListStr := strings.Join(testFiles.files, ", ")
redactedFileListStr := strings.ReplaceAll(fileListStr, "?AWS_SESSION_TOKEN=secrets", "?AWS_SESSION_TOKEN=redacted")
query := fmt.Sprintf(`IMPORT INTO t (a, b) CSV DATA (%s)`, fileListStr)
sqlDB.Exec(t, query)
return jobutils.VerifySystemJob(t, sqlDB, 0, jobspb.TypeImport, jobs.StatusSucceeded, jobs.Record{
Username: username.RootUserName(),
Description: fmt.Sprintf(`IMPORT INTO defaultdb.public.t(a, b) CSV DATA (%s)`, redactedFileListStr),
DescriptorIDs: []descpb.ID{descpb.ID(tableID)},
})
})

// On the first attempt, we trip the node 3 breaker between distsql planning
// and actually running the plan.
<-aboutToRunDSP
breaker := tc.Server(0).DistSQLServer().(*distsql.ServerImpl).PodNodeDialer.GetCircuitBreaker(roachpb.NodeID(3), rpc.DefaultClass)
breaker.Break()
allowRunDSP <- struct{}{}

// The failure above should be retried. We expect this to succeed even if we
// don't reset the breaker because node 3 should no longer be included in
// the plan.
<-aboutToRunDSP
allowRunDSP <- struct{}{}
require.NoError(t, g.Wait())
}

// TODO(adityamaru): Tests still need to be added incrementally as
// relevant IMPORT INTO logic is added. Some of them include:
// -> FK and constraint violation
Expand Down

0 comments on commit 775d97c

Please sign in to comment.