Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
87763: changefeedccl: mark kv senderrors retryable r=samiskin a=samiskin

Resolves #87300

Changefeeds can encounter send errors during a normal upgrade procedure and should therefore retry. This was previously handled in the kvfeed; however, that is apparently not high-level enough, as a send error was still observed to cause a permanent failure.

This PR moves the senderror checking to the top level IsRetryable check to handle it regardless of its source.

Release justification: low risk important bug fix

Release note (bug fix): Changefeeds will now never permanently error on a "failed to send RPC" error.

89445: opt: assert that inverted scans have inverted constraints r=mgartner a=mgartner

This commit adds an assertion to ensure that inverted index scans have inverted constraints. If they do not, there is likely a bug that can cause incorrect query results (e.g., #88047). This assertion is made in release builds, not just test builds, because it is cheap to perform.

Fixes #89440

Release note: None

89482: roachtests: introduce admission-control/elastic-backup r=irfansharif a=irfansharif

Informs #89208. This test sets up a 3-node CRDB cluster on 8vCPU machines running 1000-warehouse TPC-C with an aggressive (every 20m) full backup schedule. We've observed latency spikes during backups because of their CPU-heavy nature -- they can elevate CPU scheduling latencies, which in turn translates to an increase in foreground latency. In #86638 we introduced admission control mechanisms to dynamically pace such work while maintaining acceptable CPU scheduling latencies (sub-millisecond p99s). This roachtest exercises that machinery. In future commits we'll add libraries to the roachtest package to automatically spit out the degree to which {CPU-scheduler,foreground} latencies are protected.

Release note: None

Co-authored-by: Shiranka Miskin <[email protected]>
Co-authored-by: Marcus Gartner <[email protected]>
Co-authored-by: irfan sharif <[email protected]>
  • Loading branch information
4 people committed Oct 6, 2022
4 parents b198140 + 9aa0c6e + 784b59c + ce950cf commit 516293f
Show file tree
Hide file tree
Showing 12 changed files with 213 additions and 49 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/changefeedbase/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ go_library(
deps = [
"//pkg/jobs/joberror",
"//pkg/jobs/jobspb",
"//pkg/kv/kvclient/kvcoord",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/sql",
Expand Down
17 changes: 13 additions & 4 deletions pkg/ccl/changefeedccl/changefeedbase/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"strings"

"github.com/cockroachdb/cockroach/pkg/jobs/joberror"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
Expand Down Expand Up @@ -108,14 +109,22 @@ func IsRetryableError(err error) bool {
return true
}

// During node shutdown it is possible for all outgoing transports used by
// the kvfeed to expire, producing a SendError that the node is still able
// to propagate to the frontier. This has been known to happen during
// cluster upgrades. This scenario should not fail the changefeed.
if kvcoord.IsSendError(err) {
return true
}

// TODO(knz): this is a bad implementation. Make it go away
// by avoiding string comparisons.

// If a RetryableError occurs on a remote node, DistSQL serializes it such
// that we can't recover the structure and we have to rely on this
// unfortunate string comparison.
errStr := err.Error()
if strings.Contains(errStr, retryableErrorString) {
// If a RetryableError occurs on a remote node, DistSQL serializes it such
// that we can't recover the structure and we have to rely on this
// unfortunate string comparison.
if strings.Contains(errStr, retryableErrorString) || strings.Contains(errStr, kvcoord.SendErrorString) {
return true
}

Expand Down
6 changes: 0 additions & 6 deletions pkg/ccl/changefeedccl/kvfeed/kv_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,12 +494,6 @@ func (f *kvFeed) runUntilTableEvent(
return nil
} else if tErr := (*errEndTimeReached)(nil); errors.As(err, &tErr) {
return err
} else if kvcoord.IsSendError(err) {
// During node shutdown it is possible for all outgoing transports used by
// the kvfeed to expire, producing a SendError that the node is still able
// to propagate to the frontier. This has been known to happen during
// cluster upgrades. This scenario should not fail the changefeed.
return changefeedbase.MarkRetryableError(err)
} else {
return err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
"activerecord.go",
"activerecord_blocklist.go",
"admission_control.go",
"admission_control_elastic_backup.go",
"admission_control_multi_store_overload.go",
"admission_control_snapshot_overload.go",
"admission_control_tpcc_overload.go",
Expand Down
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/tests/admission_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func registerAdmission(r registry.Registry) {
// roachperf. Need to munge with histogram data to compute % test run spent
// over some latency threshold. Will be useful to track over time.

registerElasticControlForBackups(r)
registerMultiStoreOverload(r)
registerSnapshotOverload(r)
registerTPCCOverload(r)
Expand Down
110 changes: 110 additions & 0 deletions pkg/cmd/roachtest/tests/admission_control_elastic_backup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package tests

import (
"context"
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/roachprod/prometheus"
)

// registerElasticControlForBackups registers the
// admission-control/elastic-backup roachtest with r.
//
// The test sets up a 3-node CRDB cluster on 8vCPU machines (plus one extra
// node that drives the workload and hosts prometheus) running 1000-warehouse
// TPC-C with an aggressive (every 20m) full backup schedule. We've observed
// latency spikes during backups because of their CPU-heavy nature -- they can
// elevate CPU scheduling latencies, which in turn translates to an increase in
// foreground latency. In #86638 we introduced admission control mechanisms to
// dynamically pace such work while maintaining acceptable CPU scheduling
// latencies (sub-millisecond p99s). This roachtest exercises that machinery.
//
// When run against a local cluster the test shrinks to a single warehouse and
// a one-minute workload.
//
// TODO(irfansharif): Add libraries to automatically spit out the degree to
// which {CPU-scheduler,foreground} latencies are protected and track this data
// in roachperf.
func registerElasticControlForBackups(r registry.Registry) {
	r.Add(registry.TestSpec{
		Name:  "admission-control/elastic-backup",
		Owner: registry.OwnerAdmissionControl,
		// TODO(irfansharif): After two weeks of nightly baking time, reduce
		// this to a weekly cadence. This is a long-running test and serves only
		// as a coarse-grained benchmark.
		// Tags: []string{`weekly`},
		Cluster: r.MakeClusterSpec(4, spec.CPU(8)),
		Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
			if c.Spec().NodeCount < 4 {
				t.Fatalf("expected at least 4 nodes, found %d", c.Spec().NodeCount)
			}

			// The last node runs the workload (and prometheus); every node
			// before it runs CRDB.
			crdbNodes := c.Spec().NodeCount - 1
			workloadNode := crdbNodes + 1
			numWarehouses, workloadDuration, estimatedSetupTime := 1000, 90*time.Minute, 10*time.Minute
			if c.IsLocal() {
				numWarehouses, workloadDuration, estimatedSetupTime = 1, time.Minute, 2*time.Minute
			}

			promCfg := &prometheus.Config{}
			promCfg.WithPrometheusNode(c.Node(workloadNode).InstallNodes()[0]).
				WithNodeExporter(c.Range(1, crdbNodes).InstallNodes()).
				WithCluster(c.Range(1, crdbNodes).InstallNodes()).
				WithGrafanaDashboard("http://go.crdb.dev/p/backup-admission-control-grafana").
				WithScrapeConfigs(
					prometheus.MakeWorkloadScrapeConfig("workload", "/",
						makeWorkloadScrapeNodes(
							c.Node(workloadNode).InstallNodes()[0],
							[]workloadInstance{{nodes: c.Node(workloadNode)}},
						),
					),
				)

			if t.SkipInit() {
				t.Status(fmt.Sprintf("running tpcc for %s (<%s)", workloadDuration, time.Minute))
			} else {
				// Report the same setup estimate that is handed to runTPCC
				// below so the status stays accurate for local runs too.
				t.Status(fmt.Sprintf("initializing + running tpcc for %s (<%s)", workloadDuration, estimatedSetupTime))
			}

			runTPCC(ctx, t, c, tpccOptions{
				Warehouses:         numWarehouses,
				Duration:           workloadDuration,
				SetupType:          usingImport,
				EstimatedSetupTime: estimatedSetupTime,
				SkipPostRunCheck:   true,
				ExtraSetupArgs:     "--checks=false",
				PrometheusConfig:   promCfg,
				During: func(ctx context.Context) error {
					db := c.Conn(ctx, t.L(), crdbNodes)
					defer db.Close()

					t.Status(fmt.Sprintf("during: enabling admission control (<%s)", 30*time.Second))
					setAdmissionControl(ctx, t, c, true)

					// Create the backup schedule under a monitor over the CRDB
					// nodes and wait for the statement to complete before
					// returning.
					m := c.NewMonitor(ctx, c.Range(1, crdbNodes))
					m.Go(func(ctx context.Context) error {
						t.Status(fmt.Sprintf("during: creating full backup schedule to run every 20m (<%s)", time.Minute))
						_, err := db.ExecContext(ctx,
							`CREATE SCHEDULE FOR BACKUP INTO $1 RECURRING '*/20 * * * *' FULL BACKUP ALWAYS WITH SCHEDULE OPTIONS ignore_existing_backups;`,
							"gs://cockroachdb-backup-testing/"+c.Name()+"?AUTH=implicit",
						)
						return err
					})
					m.Wait()

					t.Status(fmt.Sprintf("during: waiting for workload to finish (<%s)", workloadDuration))
					return nil
				},
			})
		},
	})
}
24 changes: 13 additions & 11 deletions pkg/cmd/roachtest/tests/admission_control_snapshot_overload.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,17 +89,19 @@ func registerSnapshotOverload(r registry.Registry) {
}

t.Status(fmt.Sprintf("setting up prometheus/grafana (<%s)", 2*time.Minute))
promCfg := &prometheus.Config{}
promCfg.WithPrometheusNode(c.Node(workloadNode).InstallNodes()[0])
promCfg.WithNodeExporter(c.Range(1, c.Spec().NodeCount-1).InstallNodes())
promCfg.WithCluster(c.Range(1, c.Spec().NodeCount-1).InstallNodes())
promCfg.ScrapeConfigs = append(promCfg.ScrapeConfigs, prometheus.MakeWorkloadScrapeConfig("workload",
"/", makeWorkloadScrapeNodes(c.Node(workloadNode).InstallNodes()[0], []workloadInstance{
{nodes: c.Node(workloadNode)},
})))
promCfg.WithGrafanaDashboard("http://go.crdb.dev/p/snapshot-admission-control-grafana")
_, cleanupFunc := setupPrometheusForRoachtest(ctx, t, c, promCfg, nil)
defer cleanupFunc()
{
promCfg := &prometheus.Config{}
promCfg.WithPrometheusNode(c.Node(workloadNode).InstallNodes()[0])
promCfg.WithNodeExporter(c.Range(1, c.Spec().NodeCount-1).InstallNodes())
promCfg.WithCluster(c.Range(1, c.Spec().NodeCount-1).InstallNodes())
promCfg.ScrapeConfigs = append(promCfg.ScrapeConfigs, prometheus.MakeWorkloadScrapeConfig("workload",
"/", makeWorkloadScrapeNodes(c.Node(workloadNode).InstallNodes()[0], []workloadInstance{
{nodes: c.Node(workloadNode)},
})))
promCfg.WithGrafanaDashboard("http://go.crdb.dev/p/snapshot-admission-control-grafana")
_, cleanupFunc := setupPrometheusForRoachtest(ctx, t, c, promCfg, nil)
defer cleanupFunc()
}

var constraints []string
for i := 1; i <= crdbNodes; i++ {
Expand Down
73 changes: 49 additions & 24 deletions pkg/cmd/roachtest/tests/tpcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"github.com/cockroachdb/cockroach/pkg/roachprod/prometheus"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
Expand All @@ -52,13 +53,14 @@ const (
)

type tpccOptions struct {
Warehouses int
ExtraRunArgs string
ExtraSetupArgs string
Chaos func() Chaos // for late binding of stopper
During func(context.Context) error // for running a function during the test
Duration time.Duration // if zero, TPCC is not invoked
SetupType tpccSetupType
Warehouses int
ExtraRunArgs string
ExtraSetupArgs string
Chaos func() Chaos // for late binding of stopper
During func(context.Context) error // for running a function during the test
Duration time.Duration // if zero, TPCC is not invoked
SetupType tpccSetupType
EstimatedSetupTime time.Duration
// PrometheusConfig, if set, overwrites the default prometheus config settings.
PrometheusConfig *prometheus.Config
// DisablePrometheus will force prometheus to not start up.
Expand Down Expand Up @@ -86,6 +88,8 @@ type tpccOptions struct {
//
// TODO(tbg): remove this once https://github.com/cockroachdb/cockroach/issues/74705 is completed.
EnableCircuitBreakers bool
// SkipPostRunCheck, if set, skips post TPC-C run checks.
SkipPostRunCheck bool
}

type workloadInstance struct {
Expand Down Expand Up @@ -125,6 +129,7 @@ func setupTPCC(
// Randomize starting with encryption-at-rest enabled.
crdbNodes = c.Range(1, c.Spec().NodeCount-1)
workloadNode = c.Node(c.Spec().NodeCount)

if c.IsLocal() {
opts.Warehouses = 1
}
Expand All @@ -134,10 +139,12 @@ func setupTPCC(
// NB: workloadNode also needs ./cockroach because
// of `./cockroach workload` for usingImport.
c.Put(ctx, t.Cockroach(), "./cockroach", c.All())
// We still use bare workload, though we could likely replace
// those with ./cockroach workload as well.
c.Put(ctx, t.DeprecatedWorkload(), "./workload", workloadNode)
c.Start(ctx, t.L(), option.DefaultStartOpts(), install.MakeClusterSettings(), crdbNodes)
settings := install.MakeClusterSettings()
if c.IsLocal() {
settings.Env = append(settings.Env, "COCKROACH_SCAN_INTERVAL=200ms")
settings.Env = append(settings.Env, "COCKROACH_SCAN_MAX_IDLE_TIME=5ms")
}
c.Start(ctx, t.L(), option.DefaultStartOpts(), settings, crdbNodes)
}
}

Expand All @@ -149,29 +156,39 @@ func setupTPCC(
_, err := db.Exec(`SET CLUSTER SETTING kv.replica_circuit_breaker.slow_replication_threshold = '15s'`)
require.NoError(t, err)
}
err := WaitFor3XReplication(ctx, t, c.Conn(ctx, t.L(), crdbNodes[0]))
require.NoError(t, err)

if t.SkipInit() {
return
}

require.NoError(t, WaitFor3XReplication(ctx, t, c.Conn(ctx, t.L(), crdbNodes[0])))

estimatedSetupTimeStr := ""
if opts.EstimatedSetupTime != 0 {
estimatedSetupTimeStr = fmt.Sprintf(" (<%s)", opts.EstimatedSetupTime)
}

switch opts.SetupType {
case usingExistingData:
// Do nothing.
case usingImport:
t.Status("loading fixture")
t.Status("loading fixture" + estimatedSetupTimeStr)
c.Run(ctx, crdbNodes[:1], tpccImportCmd(opts.Warehouses, opts.ExtraSetupArgs))
case usingInit:
t.Status("initializing tables")
t.Status("initializing tables" + estimatedSetupTimeStr)
extraArgs := opts.ExtraSetupArgs
if !t.BuildVersion().AtLeast(version.MustParse("v20.2.0")) {
extraArgs += " --deprecated-fk-indexes"
}
cmd := fmt.Sprintf(
"./workload init tpcc --warehouses=%d %s {pgurl:1}",
"./cockroach workload init tpcc --warehouses=%d %s {pgurl:1}",
opts.Warehouses, extraArgs,
)
c.Run(ctx, workloadNode, cmd)
default:
t.Fatal("unknown tpcc setup type")
}
t.Status("")
t.Status("finished tpc-c setup")
}()
return crdbNodes, workloadNode
}
Expand Down Expand Up @@ -223,7 +240,6 @@ func runTPCC(ctx context.Context, t test.Test, c cluster.Cluster, opts tpccOptio
rampDuration = 30 * time.Second
}
crdbNodes, workloadNode := setupTPCC(ctx, t, c, opts)
t.Status("waiting")
m := c.NewMonitor(ctx, crdbNodes)
for i := range workloadInstances {
// Make a copy of i for the goroutine.
Expand All @@ -235,7 +251,8 @@ func runTPCC(ctx context.Context, t test.Test, c cluster.Cluster, opts tpccOptio
if len(workloadInstances) > 1 {
statsPrefix = fmt.Sprintf("workload_%d.", i)
}
t.WorkerStatus(fmt.Sprintf("running tpcc idx %d on %s", i, pgURLs[i]))
t.WorkerStatus(fmt.Sprintf("running tpcc worker=%d warehouses=%d ramp=%s duration=%s on %s (<%s)",
i, opts.Warehouses, rampDuration, opts.Duration, pgURLs[i], time.Minute))
cmd := fmt.Sprintf(
"./cockroach workload run tpcc --warehouses=%d --histograms="+t.PerfArtifactsDir()+"/%sstats.json "+
opts.ExtraRunArgs+" --ramp=%s --duration=%s --prometheus-port=%d --pprofport=%d %s %s",
Expand All @@ -260,8 +277,10 @@ func runTPCC(ctx context.Context, t test.Test, c cluster.Cluster, opts tpccOptio
}
m.Wait()

c.Run(ctx, workloadNode, fmt.Sprintf(
"./cockroach workload check tpcc --warehouses=%d {pgurl:1}", opts.Warehouses))
if !opts.SkipPostRunCheck {
c.Run(ctx, workloadNode, fmt.Sprintf(
"./cockroach workload check tpcc --warehouses=%d {pgurl:1}", opts.Warehouses))
}

// Check no errors from metrics.
if ep != nil {
Expand Down Expand Up @@ -1447,15 +1466,21 @@ func setupPrometheusForRoachtest(
}
}
if c.IsLocal() {
t.Skip("skipping test as prometheus is needed, but prometheus does not yet work locally")
t.Status("ignoring prometheus setup given --local was specified")
return nil, func() {}
}

if err := c.StartGrafana(ctx, t.L(), cfg); err != nil {
t.Status(fmt.Sprintf("setting up prometheus/grafana (<%s)", 2*time.Minute))

quietLogger, err := t.L().ChildLogger("start-grafana", logger.QuietStdout, logger.QuietStderr)
if err != nil {
t.Fatal(err)
}
if err := c.StartGrafana(ctx, quietLogger, cfg); err != nil {
t.Fatal(err)
}
cleanupFunc := func() {
if err := c.StopGrafana(ctx, t.L(), t.ArtifactsDir()); err != nil {
if err := c.StopGrafana(ctx, quietLogger, t.ArtifactsDir()); err != nil {
t.L().ErrorfCtx(ctx, "Error(s) shutting down prom/grafana %s", err)
}
}
Expand Down
Loading

0 comments on commit 516293f

Please sign in to comment.