From 784b59ce1d94a8ed7205a9dd31719025661dbc6a Mon Sep 17 00:00:00 2001
From: Marcus Gartner
Date: Wed, 5 Oct 2022 20:54:54 +0000
Subject: [PATCH 1/3] opt: assert that inverted scans have inverted constraints

This commit adds an assertion to ensure that inverted index scans have
inverted constraints. If they do not, there is likely a bug that can
cause incorrect query results (e.g., #88047). This assertion is made in
release builds, not just test builds, because it is cheap to perform.

Fixes #89440

Release note: None
---
 pkg/sql/opt/exec/execbuilder/relational.go | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/pkg/sql/opt/exec/execbuilder/relational.go b/pkg/sql/opt/exec/execbuilder/relational.go
index 9b42a104c97e..25019863f24f 100644
--- a/pkg/sql/opt/exec/execbuilder/relational.go
+++ b/pkg/sql/opt/exec/execbuilder/relational.go
@@ -724,6 +724,12 @@ func (b *Builder) buildScan(scan *memo.ScanExpr) (execPlan, error) {
 		}
 	}
 
+	idx := tab.Index(scan.Index)
+	if idx.IsInverted() && len(scan.InvertedConstraint) == 0 {
+		return execPlan{},
+			errors.AssertionFailedf("expected inverted index scan to have an inverted constraint")
+	}
+
 	// Save if we planned a full table/index scan on the builder so that the
 	// planner can be made aware later. We only do this for non-virtual tables.
 	stats := scan.Relational().Statistics()

From ce950cfb7c455b5e03ab81f4416430f30766f582 Mon Sep 17 00:00:00 2001
From: irfan sharif
Date: Mon, 3 Oct 2022 00:13:46 -0400
Subject: [PATCH 2/3] roachtests: introduce admission-control/elastic-backup

Informs #89208.

This test sets up a 3-node CRDB cluster on 8vCPU machines running
1000-warehouse TPC-C with an aggressive (every 20m) full backup
schedule. We've observed latency spikes during backups because of their
CPU-heavy nature -- they can elevate CPU scheduling latencies, which in
turn translates to an increase in foreground latency. In #86638 we
introduced admission control mechanisms to dynamically pace such work
while maintaining acceptable CPU scheduling latencies (sub-millisecond
p99s). This roachtest exercises that machinery.

In future commits we'll add libraries to the roachtest package to
automatically spit out the degree to which {CPU-scheduler,foreground}
latencies are protected.
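
As a rough illustration (not part of this patch), the knob the pacing
relies on is an ordinary cluster setting; a minimal Go sketch of
flipping it, assuming a database/sql connection to the cluster and a
hypothetical helper name:

    // enableElasticCPUControl toggles the elastic CPU admission setting;
    // the setting name matches the one this patch adds to
    // setAdmissionControl in pkg/cmd/roachtest/tests/util.go.
    func enableElasticCPUControl(ctx context.Context, db *gosql.DB) error {
        _, err := db.ExecContext(ctx,
            "SET CLUSTER SETTING admission.elastic_cpu.enabled = 'true'")
        return err
    }
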
Release note: None
---
 pkg/cmd/roachtest/tests/BUILD.bazel           |   1 +
 pkg/cmd/roachtest/tests/admission_control.go  |   1 +
 .../tests/admission_control_elastic_backup.go | 110 ++++++++++++++++++
 .../admission_control_snapshot_overload.go    |  24 ++--
 pkg/cmd/roachtest/tests/tpcc.go               |  73 ++++++++----
 pkg/cmd/roachtest/tests/util.go               |  10 +-
 pkg/roachprod/prometheus/prometheus.go        |   7 ++
 7 files changed, 188 insertions(+), 38 deletions(-)
 create mode 100644 pkg/cmd/roachtest/tests/admission_control_elastic_backup.go

diff --git a/pkg/cmd/roachtest/tests/BUILD.bazel b/pkg/cmd/roachtest/tests/BUILD.bazel
index fdc508143029..28e66b5fcf1e 100644
--- a/pkg/cmd/roachtest/tests/BUILD.bazel
+++ b/pkg/cmd/roachtest/tests/BUILD.bazel
@@ -9,6 +9,7 @@ go_library(
         "activerecord.go",
         "activerecord_blocklist.go",
         "admission_control.go",
+        "admission_control_elastic_backup.go",
         "admission_control_multi_store_overload.go",
         "admission_control_snapshot_overload.go",
         "admission_control_tpcc_overload.go",
diff --git a/pkg/cmd/roachtest/tests/admission_control.go b/pkg/cmd/roachtest/tests/admission_control.go
index b8e092247826..4d3835f14493 100644
--- a/pkg/cmd/roachtest/tests/admission_control.go
+++ b/pkg/cmd/roachtest/tests/admission_control.go
@@ -28,6 +28,7 @@ func registerAdmission(r registry.Registry) {
 	// roachperf. Need to munge with histogram data to compute % test run spent
 	// over some latency threshold. Will be useful to track over time.
 
+	registerElasticControlForBackups(r)
 	registerMultiStoreOverload(r)
 	registerSnapshotOverload(r)
 	registerTPCCOverload(r)
diff --git a/pkg/cmd/roachtest/tests/admission_control_elastic_backup.go b/pkg/cmd/roachtest/tests/admission_control_elastic_backup.go
new file mode 100644
index 000000000000..77b09c9b7dc6
--- /dev/null
+++ b/pkg/cmd/roachtest/tests/admission_control_elastic_backup.go
@@ -0,0 +1,110 @@
+// Copyright 2022 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package tests
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
+	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
+	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
+	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
+	"github.com/cockroachdb/cockroach/pkg/roachprod/prometheus"
+)
+
+// This test sets up a 3-node CRDB cluster on 8vCPU machines running
+// 1000-warehouse TPC-C with an aggressive (every 20m) full backup schedule.
+// We've observed latency spikes during backups because of their CPU-heavy
+// nature -- they can elevate CPU scheduling latencies, which in turn
+// translates to an increase in foreground latency. In #86638 we introduced
+// admission control mechanisms to dynamically pace such work while
+// maintaining acceptable CPU scheduling latencies (sub-millisecond p99s).
+// This roachtest exercises that machinery.
+//
+// TODO(irfansharif): Add libraries to automatically spit out the degree to
+// which {CPU-scheduler,foreground} latencies are protected and track this data
+// in roachperf.
+func registerElasticControlForBackups(r registry.Registry) {
+	r.Add(registry.TestSpec{
+		Name:  "admission-control/elastic-backup",
+		Owner: registry.OwnerAdmissionControl,
+		// TODO(irfansharif): After two weeks of nightly baking time, reduce
+		// this to a weekly cadence. This is a long-running test and serves only
+		// as a coarse-grained benchmark.
+		// Tags:    []string{`weekly`},
+		Cluster: r.MakeClusterSpec(4, spec.CPU(8)),
+		Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
+			if c.Spec().NodeCount < 4 {
+				t.Fatalf("expected at least 4 nodes, found %d", c.Spec().NodeCount)
+			}
+
+			crdbNodes := c.Spec().NodeCount - 1
+			workloadNode := crdbNodes + 1
+			numWarehouses, workloadDuration, estimatedSetupTime := 1000, 90*time.Minute, 10*time.Minute
+			if c.IsLocal() {
+				numWarehouses, workloadDuration, estimatedSetupTime = 1, time.Minute, 2*time.Minute
+			}
+
+			promCfg := &prometheus.Config{}
+			promCfg.WithPrometheusNode(c.Node(workloadNode).InstallNodes()[0]).
+				WithNodeExporter(c.Range(1, c.Spec().NodeCount-1).InstallNodes()).
+				WithCluster(c.Range(1, c.Spec().NodeCount-1).InstallNodes()).
+				WithGrafanaDashboard("http://go.crdb.dev/p/backup-admission-control-grafana").
+				WithScrapeConfigs(
+					prometheus.MakeWorkloadScrapeConfig("workload", "/",
+						makeWorkloadScrapeNodes(
+							c.Node(workloadNode).InstallNodes()[0],
+							[]workloadInstance{{nodes: c.Node(workloadNode)}},
+						),
+					),
+				)
+
+			if t.SkipInit() {
+				t.Status(fmt.Sprintf("running tpcc for %s (<%s)", workloadDuration, time.Minute))
+			} else {
+				t.Status(fmt.Sprintf("initializing + running tpcc for %s (<%s)", workloadDuration, 10*time.Minute))
+			}
+
+			runTPCC(ctx, t, c, tpccOptions{
+				Warehouses:         numWarehouses,
+				Duration:           workloadDuration,
+				SetupType:          usingImport,
+				EstimatedSetupTime: estimatedSetupTime,
+				SkipPostRunCheck:   true,
+				ExtraSetupArgs:     "--checks=false",
+				PrometheusConfig:   promCfg,
+				During: func(ctx context.Context) error {
+					db := c.Conn(ctx, t.L(), crdbNodes)
+					defer db.Close()
+
+					t.Status(fmt.Sprintf("during: enabling admission control (<%s)", 30*time.Second))
+					setAdmissionControl(ctx, t, c, true)
+
+					m := c.NewMonitor(ctx, c.Range(1, crdbNodes))
+					m.Go(func(ctx context.Context) error {
+						t.Status(fmt.Sprintf("during: creating full backup schedule to run every 20m (<%s)", time.Minute))
+						_, err := db.ExecContext(ctx,
+							`CREATE SCHEDULE FOR BACKUP INTO $1 RECURRING '*/20 * * * *' FULL BACKUP ALWAYS WITH SCHEDULE OPTIONS ignore_existing_backups;`,
+							"gs://cockroachdb-backup-testing/"+c.Name()+"?AUTH=implicit",
+						)
+						return err
+					})
+					m.Wait()
+
+					t.Status(fmt.Sprintf("during: waiting for workload to finish (<%s)", workloadDuration))
+					return nil
+				},
+			})
+		},
+	})
+}
diff --git a/pkg/cmd/roachtest/tests/admission_control_snapshot_overload.go b/pkg/cmd/roachtest/tests/admission_control_snapshot_overload.go
index 216b58fbd26d..d34186c8081c 100644
--- a/pkg/cmd/roachtest/tests/admission_control_snapshot_overload.go
+++ b/pkg/cmd/roachtest/tests/admission_control_snapshot_overload.go
@@ -89,17 +89,19 @@ func registerSnapshotOverload(r registry.Registry) {
 			}
 
 			t.Status(fmt.Sprintf("setting up prometheus/grafana (<%s)", 2*time.Minute))
-			promCfg := &prometheus.Config{}
-			promCfg.WithPrometheusNode(c.Node(workloadNode).InstallNodes()[0])
-			promCfg.WithNodeExporter(c.Range(1, c.Spec().NodeCount-1).InstallNodes())
-			promCfg.WithCluster(c.Range(1, c.Spec().NodeCount-1).InstallNodes())
-			promCfg.ScrapeConfigs = append(promCfg.ScrapeConfigs, prometheus.MakeWorkloadScrapeConfig("workload",
-				"/", makeWorkloadScrapeNodes(c.Node(workloadNode).InstallNodes()[0], []workloadInstance{
-					{nodes: c.Node(workloadNode)},
-				})))
-			promCfg.WithGrafanaDashboard("http://go.crdb.dev/p/snapshot-admission-control-grafana")
-			_, cleanupFunc := setupPrometheusForRoachtest(ctx, t, c, promCfg, nil)
-			defer cleanupFunc()
+			{
+				promCfg := &prometheus.Config{}
+				promCfg.WithPrometheusNode(c.Node(workloadNode).InstallNodes()[0])
+				promCfg.WithNodeExporter(c.Range(1, c.Spec().NodeCount-1).InstallNodes())
+				promCfg.WithCluster(c.Range(1, c.Spec().NodeCount-1).InstallNodes())
+				promCfg.ScrapeConfigs = append(promCfg.ScrapeConfigs, prometheus.MakeWorkloadScrapeConfig("workload",
+					"/", makeWorkloadScrapeNodes(c.Node(workloadNode).InstallNodes()[0], []workloadInstance{
+						{nodes: c.Node(workloadNode)},
+					})))
+				promCfg.WithGrafanaDashboard("http://go.crdb.dev/p/snapshot-admission-control-grafana")
+				_, cleanupFunc := setupPrometheusForRoachtest(ctx, t, c, promCfg, nil)
+				defer cleanupFunc()
+			}
 
 			var constraints []string
 			for i := 1; i <= crdbNodes; i++ {
diff --git a/pkg/cmd/roachtest/tests/tpcc.go b/pkg/cmd/roachtest/tests/tpcc.go
index 3d06ad4fd750..fead65e29d9b 100644
--- a/pkg/cmd/roachtest/tests/tpcc.go
+++ b/pkg/cmd/roachtest/tests/tpcc.go
@@ -27,6 +27,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
 	"github.com/cockroachdb/cockroach/pkg/roachprod/install"
+	"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
 	"github.com/cockroachdb/cockroach/pkg/roachprod/prometheus"
 	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
 	"github.com/cockroachdb/cockroach/pkg/testutils/skip"
@@ -52,13 +53,14 @@ const (
 )
 
 type tpccOptions struct {
-	Warehouses     int
-	ExtraRunArgs   string
-	ExtraSetupArgs string
-	Chaos          func() Chaos                // for late binding of stopper
-	During         func(context.Context) error // for running a function during the test
-	Duration       time.Duration               // if zero, TPCC is not invoked
-	SetupType      tpccSetupType
+	Warehouses         int
+	ExtraRunArgs       string
+	ExtraSetupArgs     string
+	Chaos              func() Chaos                // for late binding of stopper
+	During             func(context.Context) error // for running a function during the test
+	Duration           time.Duration               // if zero, TPCC is not invoked
+	SetupType          tpccSetupType
+	EstimatedSetupTime time.Duration
 	// PrometheusConfig, if set, overwrites the default prometheus config settings.
 	PrometheusConfig *prometheus.Config
 	// DisablePrometheus will force prometheus to not start up.
@@ -86,6 +88,8 @@ type tpccOptions struct {
 	//
 	// TODO(tbg): remove this once https://github.com/cockroachdb/cockroach/issues/74705 is completed.
 	EnableCircuitBreakers bool
+	// SkipPostRunCheck, if set, skips post TPC-C run checks.
+	SkipPostRunCheck bool
 }
 
 type workloadInstance struct {
@@ -125,6 +129,7 @@ func setupTPCC(
 		// Randomize starting with encryption-at-rest enabled.
 		crdbNodes = c.Range(1, c.Spec().NodeCount-1)
 		workloadNode = c.Node(c.Spec().NodeCount)
+
 		if c.IsLocal() {
 			opts.Warehouses = 1
 		}
@@ -134,10 +139,12 @@ func setupTPCC(
 		// NB: workloadNode also needs ./cockroach because
 		// of `./cockroach workload` for usingImport.
 		c.Put(ctx, t.Cockroach(), "./cockroach", c.All())
-		// We still use bare workload, though we could likely replace
-		// those with ./cockroach workload as well.
-		c.Put(ctx, t.DeprecatedWorkload(), "./workload", workloadNode)
-		c.Start(ctx, t.L(), option.DefaultStartOpts(), install.MakeClusterSettings(), crdbNodes)
+		settings := install.MakeClusterSettings()
+		if c.IsLocal() {
+			settings.Env = append(settings.Env, "COCKROACH_SCAN_INTERVAL=200ms")
+			settings.Env = append(settings.Env, "COCKROACH_SCAN_MAX_IDLE_TIME=5ms")
+		}
+		c.Start(ctx, t.L(), option.DefaultStartOpts(), settings, crdbNodes)
 	}
 }
@@ -149,29 +156,39 @@ func setupTPCC(
 			_, err := db.Exec(`SET CLUSTER SETTING kv.replica_circuit_breaker.slow_replication_threshold = '15s'`)
 			require.NoError(t, err)
 		}
-		err := WaitFor3XReplication(ctx, t, c.Conn(ctx, t.L(), crdbNodes[0]))
-		require.NoError(t, err)
+
+		if t.SkipInit() {
+			return
+		}
+
+		require.NoError(t, WaitFor3XReplication(ctx, t, c.Conn(ctx, t.L(), crdbNodes[0])))
+
+		estimatedSetupTimeStr := ""
+		if opts.EstimatedSetupTime != 0 {
+			estimatedSetupTimeStr = fmt.Sprintf(" (<%s)", opts.EstimatedSetupTime)
+		}
+
 		switch opts.SetupType {
 		case usingExistingData:
 			// Do nothing.
 		case usingImport:
-			t.Status("loading fixture")
+			t.Status("loading fixture" + estimatedSetupTimeStr)
 			c.Run(ctx, crdbNodes[:1], tpccImportCmd(opts.Warehouses, opts.ExtraSetupArgs))
 		case usingInit:
-			t.Status("initializing tables")
+			t.Status("initializing tables" + estimatedSetupTimeStr)
 			extraArgs := opts.ExtraSetupArgs
 			if !t.BuildVersion().AtLeast(version.MustParse("v20.2.0")) {
 				extraArgs += " --deprecated-fk-indexes"
 			}
 			cmd := fmt.Sprintf(
-				"./workload init tpcc --warehouses=%d %s {pgurl:1}",
+				"./cockroach workload init tpcc --warehouses=%d %s {pgurl:1}",
 				opts.Warehouses, extraArgs,
 			)
 			c.Run(ctx, workloadNode, cmd)
 		default:
 			t.Fatal("unknown tpcc setup type")
 		}
-		t.Status("")
+		t.Status("finished tpc-c setup")
 	}()
 	return crdbNodes, workloadNode
 }
@@ -223,7 +240,6 @@ func runTPCC(ctx context.Context, t test.Test, c cluster.Cluster, opts tpccOptio
 		rampDuration = 30 * time.Second
 	}
 	crdbNodes, workloadNode := setupTPCC(ctx, t, c, opts)
-	t.Status("waiting")
 	m := c.NewMonitor(ctx, crdbNodes)
 	for i := range workloadInstances {
 		// Make a copy of i for the goroutine.
@@ -235,7 +251,8 @@ func runTPCC(ctx context.Context, t test.Test, c cluster.Cluster, opts tpccOptio
 		if len(workloadInstances) > 1 {
 			statsPrefix = fmt.Sprintf("workload_%d.", i)
 		}
-		t.WorkerStatus(fmt.Sprintf("running tpcc idx %d on %s", i, pgURLs[i]))
+		t.WorkerStatus(fmt.Sprintf("running tpcc worker=%d warehouses=%d ramp=%s duration=%s on %s (<%s)",
+			i, opts.Warehouses, rampDuration, opts.Duration, pgURLs[i], time.Minute))
 		cmd := fmt.Sprintf(
 			"./cockroach workload run tpcc --warehouses=%d --histograms="+t.PerfArtifactsDir()+"/%sstats.json "+
 				opts.ExtraRunArgs+" --ramp=%s --duration=%s --prometheus-port=%d --pprofport=%d %s %s",
@@ -260,8 +277,10 @@ func runTPCC(ctx context.Context, t test.Test, c cluster.Cluster, opts tpccOptio
 	}
 	m.Wait()
 
-	c.Run(ctx, workloadNode, fmt.Sprintf(
-		"./cockroach workload check tpcc --warehouses=%d {pgurl:1}", opts.Warehouses))
+	if !opts.SkipPostRunCheck {
+		c.Run(ctx, workloadNode, fmt.Sprintf(
+			"./cockroach workload check tpcc --warehouses=%d {pgurl:1}", opts.Warehouses))
+	}
 
 	// Check no errors from metrics.
 	if ep != nil {
@@ -1447,15 +1466,21 @@ func setupPrometheusForRoachtest(
 		}
 	}
 	if c.IsLocal() {
-		t.Skip("skipping test as prometheus is needed, but prometheus does not yet work locally")
+		t.Status("ignoring prometheus setup given --local was specified")
 		return nil, func() {}
 	}
 
-	if err := c.StartGrafana(ctx, t.L(), cfg); err != nil {
+	t.Status(fmt.Sprintf("setting up prometheus/grafana (<%s)", 2*time.Minute))
+
+	quietLogger, err := t.L().ChildLogger("start-grafana", logger.QuietStdout, logger.QuietStderr)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if err := c.StartGrafana(ctx, quietLogger, cfg); err != nil {
 		t.Fatal(err)
 	}
 	cleanupFunc := func() {
-		if err := c.StopGrafana(ctx, t.L(), t.ArtifactsDir()); err != nil {
+		if err := c.StopGrafana(ctx, quietLogger, t.ArtifactsDir()); err != nil {
 			t.L().ErrorfCtx(ctx, "Error(s) shutting down prom/grafana %s", err)
 		}
 	}
diff --git a/pkg/cmd/roachtest/tests/util.go b/pkg/cmd/roachtest/tests/util.go
index 71ab5f628b28..99924ef89eeb 100644
--- a/pkg/cmd/roachtest/tests/util.go
+++ b/pkg/cmd/roachtest/tests/util.go
@@ -37,7 +37,7 @@ func WaitFor3XReplication(ctx context.Context, t test.Test, db *gosql.DB) error
 func WaitForReplication(
 	ctx context.Context, t test.Test, db *gosql.DB, replicationFactor int,
 ) error {
-	t.L().Printf("waiting for initial up-replication...")
+	t.L().Printf("waiting for initial up-replication... (<%s)", 2*time.Minute)
 	tStart := timeutil.Now()
 	var oldN int
 	for {
@@ -114,8 +114,12 @@ func setAdmissionControl(ctx context.Context, t test.Test, c cluster.Cluster, en
 	if !enabled {
 		val = "false"
 	}
-	for _, setting := range []string{"admission.kv.enabled", "admission.sql_kv_response.enabled",
-		"admission.sql_sql_response.enabled"} {
+	for _, setting := range []string{
+		"admission.kv.enabled",
+		"admission.sql_kv_response.enabled",
+		"admission.sql_sql_response.enabled",
+		"admission.elastic_cpu.enabled",
+	} {
 		if _, err := db.ExecContext(
 			ctx, "SET CLUSTER SETTING "+setting+" = '"+val+"'"); err != nil {
 			t.Fatalf("failed to set admission control to %t: %v", enabled, err)
diff --git a/pkg/roachprod/prometheus/prometheus.go b/pkg/roachprod/prometheus/prometheus.go
index ab9278586eef..f35597157788 100644
--- a/pkg/roachprod/prometheus/prometheus.go
+++ b/pkg/roachprod/prometheus/prometheus.go
@@ -140,6 +140,13 @@ func (cfg *Config) WithGrafanaDashboard(url string) *Config {
 	return cfg
 }
 
+// WithScrapeConfigs adds scraping configs to the prometheus instance. Chains
+// for convenience.
+func (cfg *Config) WithScrapeConfigs(config ...ScrapeConfig) *Config {
+	cfg.ScrapeConfigs = append(cfg.ScrapeConfigs, config...)
+	return cfg
+}
+
 // WithNodeExporter causes node_exporter to be set up on the specified machines,
 // a separate process that sends hardware metrics to prometheus.
 // For more on the node exporter process, see https://prometheus.io/docs/guides/node-exporter/

From 9aa0c6eb7a7d3acd5e1279cb55cb9cff20c2f764 Mon Sep 17 00:00:00 2001
From: Shiranka Miskin
Date: Sat, 10 Sep 2022 01:03:33 +0000
Subject: [PATCH 3/3] changefeedccl: mark kv senderrors retryable

Resolves #87300

Changefeeds can encounter senderrors during a normal upgrade procedure
and should therefore retry. This was previously handled in the kvfeed,
but apparently not at a high enough level, as a send error was still
observed to cause a permanent failure. This PR moves the senderror
checking into the top-level IsRetryableError check so that it is
handled regardless of its source.
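
To make the resulting control flow concrete, a retry loop around
changefeed work can now classify send errors as transient through the
shared classifier alone. A minimal sketch (the runChangefeedOnce helper
is hypothetical):

    for {
        err := runChangefeedOnce(ctx)
        if err == nil {
            return nil
        }
        if !changefeedbase.IsRetryableError(err) {
            return err // permanent failure
        }
        // Retryable (e.g. a "failed to send RPC" error during a node
        // restart): loop around and retry.
    }
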
Release justification: low-risk, important bug fix

Release note (bug fix): Changefeeds will no longer permanently error on
a "failed to send RPC" error.
---
 .../changefeedccl/changefeedbase/BUILD.bazel   |  1 +
 pkg/ccl/changefeedccl/changefeedbase/errors.go | 17 +++++++++++++----
 pkg/ccl/changefeedccl/kvfeed/kv_feed.go        |  6 ------
 pkg/kv/kvclient/kvcoord/dist_sender.go         |  6 +++++-
 4 files changed, 19 insertions(+), 11 deletions(-)

diff --git a/pkg/ccl/changefeedccl/changefeedbase/BUILD.bazel b/pkg/ccl/changefeedccl/changefeedbase/BUILD.bazel
index e03044faa0bb..4bb125e722a1 100644
--- a/pkg/ccl/changefeedccl/changefeedbase/BUILD.bazel
+++ b/pkg/ccl/changefeedccl/changefeedbase/BUILD.bazel
@@ -15,6 +15,7 @@ go_library(
     deps = [
         "//pkg/jobs/joberror",
         "//pkg/jobs/jobspb",
+        "//pkg/kv/kvclient/kvcoord",
         "//pkg/roachpb",
         "//pkg/settings",
         "//pkg/sql",
diff --git a/pkg/ccl/changefeedccl/changefeedbase/errors.go b/pkg/ccl/changefeedccl/changefeedbase/errors.go
index a10415b4ad4c..22ab91666e03 100644
--- a/pkg/ccl/changefeedccl/changefeedbase/errors.go
+++ b/pkg/ccl/changefeedccl/changefeedbase/errors.go
@@ -14,6 +14,7 @@ import (
 	"strings"
 
 	"github.com/cockroachdb/cockroach/pkg/jobs/joberror"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/sql"
 	"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
@@ -108,14 +109,22 @@ func IsRetryableError(err error) bool {
 		return true
 	}
 
+	// During node shutdown it is possible for all outgoing transports used by
+	// the kvfeed to expire, producing a SendError that the node is still able
+	// to propagate to the frontier. This has been known to happen during
+	// cluster upgrades. This scenario should not fail the changefeed.
+	if kvcoord.IsSendError(err) {
+		return true
+	}
+
 	// TODO(knz): this is a bad implementation. Make it go away
 	// by avoiding string comparisons.
+	// If a RetryableError occurs on a remote node, DistSQL serializes it such
+	// that we can't recover the structure and we have to rely on this
+	// unfortunate string comparison.
 	errStr := err.Error()
-	if strings.Contains(errStr, retryableErrorString) {
-		// If a RetryableError occurs on a remote node, DistSQL serializes it such
-		// that we can't recover the structure and we have to rely on this
-		// unfortunate string comparison.
+	if strings.Contains(errStr, retryableErrorString) || strings.Contains(errStr, kvcoord.SendErrorString) {
 		return true
 	}
diff --git a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go
index f609a133c77a..16619198f866 100644
--- a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go
+++ b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go
@@ -494,12 +494,6 @@ func (f *kvFeed) runUntilTableEvent(
 		return nil
 	} else if tErr := (*errEndTimeReached)(nil); errors.As(err, &tErr) {
 		return err
-	} else if kvcoord.IsSendError(err) {
-		// During node shutdown it is possible for all outgoing transports used by
-		// the kvfeed to expire, producing a SendError that the node is still able
-		// to propagate to the frontier. This has been known to happen during
-		// cluster upgrades. This scenario should not fail the changefeed.
-		return changefeedbase.MarkRetryableError(err)
 	} else {
 		return err
 	}
diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go
index eba885e24220..7f154c63d7ea 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -2388,8 +2388,12 @@ func TestNewSendError(msg string) error {
 	return newSendError(msg)
 }
 
+// SendErrorString is the prefix for all sendErrors, exported in order to
+// perform cross-node error checks.
+const SendErrorString = "failed to send RPC"
+
 func (s sendError) Error() string {
-	return "failed to send RPC: " + s.message
+	return SendErrorString + ": " + s.message
 }
 
 // IsSendError returns true if err is a sendError.
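
For completeness, a usage sketch of the newly exported constant (the
helper below is hypothetical, not part of the patch): once an error has
crossed DistSQL node boundaries its concrete sendError type is lost, so
structured checks like kvcoord.IsSendError return false and callers must
fall back to matching the stable string prefix:

    // isFlattenedSendError reports whether a serialized error originated
    // as a kvcoord sendError, relying only on its string form.
    func isFlattenedSendError(err error) bool {
        return err != nil &&
            strings.Contains(err.Error(), kvcoord.SendErrorString)
    }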