113703: streamingccl: fix cross-cluster replication bug r=adityamaru a=adityamaru

This change does a couple of things:

- It introduces a cluster setting that controls replication
of span configs from the source system tenant to the
destination system tenant. The setting defaults to true.
Span config replication is a new feature in 23.2, so it
seems like a good idea to have a killswitch (see the usage
sketch after this list).

- It special-cases the `pgcode.UndefinedFunction` error that
is thrown when the source cluster is on a pre-23.2 version
that does not recognize `crdb_internal.setup_span_configs_stream($1)`.
On this error, we simply skip replicating the span
configs. This is meant to be a short-term, backportable solution.
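
For context, a minimal usage sketch of the killswitch (not part of the
diff below). It assumes the operator connects to the destination
cluster's system tenant, since the setting is registered as
`settings.SystemOnly`:

```sql
-- Inspect the current value (defaults to true, i.e. enabled).
SHOW CLUSTER SETTING physical_replication.consumer.span_configs.enabled;

-- Disable span config replication on the destination cluster.
SET CLUSTER SETTING physical_replication.consumer.span_configs.enabled = false;

-- Re-enable it once both clusters are on 23.2 or later.
SET CLUSTER SETTING physical_replication.consumer.span_configs.enabled = true;
```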

Fixes: #113682

Release note (bug fix): Fixed a bug where replicating from
a source cluster that is on an older version (< 23.2) than the destination
cluster would fail because of an undefined builtin function.

113706: roachtest: port tpcc/mixed-headroom to the new framework r=renatolabs,RaduBerinde a=DarrylWong

Note that the new framework randomizes how many version upgrades occur. This removes the need for separate single-upgrade and multiple-upgrade tests, so the two have been merged.

Release note: None
Fixes: #110537

Co-authored-by: adityamaru <[email protected]>
Co-authored-by: DarrylWong <[email protected]>
3 people committed Nov 10, 2023
3 parents 9a3b101 + 721641a + 8cdcf77 commit 194151c
Showing 7 changed files with 97 additions and 223 deletions.
11 changes: 11 additions & 0 deletions pkg/ccl/streamingccl/settings.go
@@ -127,3 +127,14 @@ var DumpFrontierEntries = settings.RegisterDurationSetting(
0,
settings.NonNegativeDuration,
)

// ReplicateSpanConfigsEnabled controls whether we replicate span
// configurations from the source system tenant to the destination system
// tenant.
var ReplicateSpanConfigsEnabled = settings.RegisterBoolSetting(
settings.SystemOnly,
"physical_replication.consumer.span_configs.enabled",
"controls whether we replicate span configurations from the source system tenant to the "+
"destination system tenant",
true,
)
2 changes: 2 additions & 0 deletions pkg/ccl/streamingccl/streamclient/BUILD.bazel
@@ -28,6 +28,7 @@ go_library(
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/tabledesc",
"//pkg/sql/isql",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/rowenc",
"//pkg/sql/rowenc/valueside",
"//pkg/sql/sem/tree",
@@ -39,6 +40,7 @@ go_library(
"//pkg/util/timeutil",
"//pkg/util/tracing",
"@com_github_cockroachdb_errors//:errors",
"@com_github_jackc_pgconn//:pgconn",
"@com_github_jackc_pgx_v4//:pgx",
],
)
@@ -269,7 +269,7 @@ var _ Subscription = (*partitionedStreamSubscription)(nil)

// Subscribe implements the Subscription interface.
func (p *partitionedStreamSubscription) Subscribe(ctx context.Context) error {
ctx, sp := tracing.ChildSpan(ctx, "Subscription.Subscribe")
ctx, sp := tracing.ChildSpan(ctx, "partitionedStreamSubscription.Subscribe")
defer sp.Finish()

defer close(p.eventsChan)
14 changes: 13 additions & 1 deletion pkg/ccl/streamingccl/streamclient/span_config_stream_client.go
@@ -15,9 +15,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/jackc/pgconn"
"github.com/jackc/pgx/v4"
)

@@ -129,7 +131,7 @@ var _ Subscription = (*spanConfigStreamSubscription)(nil)

// Subscribe implements the Subscription interface.
func (p *spanConfigStreamSubscription) Subscribe(ctx context.Context) error {
ctx, sp := tracing.ChildSpan(ctx, "Subscription.Subscribe")
ctx, sp := tracing.ChildSpan(ctx, "spanConfigStreamSubscription.Subscribe")
defer sp.Finish()

defer close(p.eventsChan)
@@ -152,6 +154,16 @@ func (p *spanConfigStreamSubscription) Subscribe(ctx context.Context) error {
defer cancelFunc()
rows, err := srcConn.Query(cancelCtx, `SELECT crdb_internal.setup_span_configs_stream($1)`, p.tenantName)
if err != nil {
// TODO(adityamaru): This is a short term fix for https://github.com/cockroachdb/cockroach/issues/113682
// to allow for a < 23.2 source cluster to replicate into a 23.2 destination
// cluster. In the long term we will want to use the source cluster's
// cluster version to gate features on the destination.
if pgErr := (*pgconn.PgError)(nil); errors.As(err, &pgErr) {
if pgcode.MakeCode(pgErr.Code) == pgcode.UndefinedFunction {
log.Warningf(ctx, "source cluster is running a version < 23.2, skipping span config replication: %v", err)
return nil
}
}
return err
}
// For close to return, the ctx passed to the query above must be cancelled.
4 changes: 4 additions & 0 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go
@@ -149,6 +149,10 @@ func startDistIngestion(

spanConfigIngestStopper := make(chan struct{})
streamSpanConfigs := func(ctx context.Context) error {
if !streamingccl.ReplicateSpanConfigsEnabled.Get(&execCtx.ExecCfg().Settings.SV) {
log.Warningf(ctx, "span config replication is disabled")
return nil
}
if knobs := execCtx.ExecCfg().StreamingTestingKnobs; knobs != nil && knobs.SkipSpanConfigReplication {
return nil
}
238 changes: 66 additions & 172 deletions pkg/cmd/roachtest/tests/tpcc.go
@@ -24,16 +24,15 @@ import (
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/clusterupgrade"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/mixedversion"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"github.com/cockroachdb/cockroach/pkg/roachprod/prometheus"
"github.com/cockroachdb/cockroach/pkg/roachprod/vm"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/testutils/release"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/search"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/version"
@@ -351,63 +350,12 @@ func maxSupportedTPCCWarehouses(
return warehouses
}

type backgroundFn func(ctx context.Context, u *versionUpgradeTest) error

// A backgroundStepper is a tool to run long-lived commands while a cluster is
// going through a sequence of version upgrade operations.
// It exposes a `launch` step that launches the method carrying out long-running
// work (in the background) and a `stop` step collecting any errors.
type backgroundStepper struct {
// This is the operation that will be launched in the background. When the
// context gets canceled, it should shut down and return without an error.
// The way to typically get this is:
//
// err := doSomething(ctx)
// ctx.Err() != nil {
// return nil
// }
// return err
run backgroundFn
// When not nil, called with the error within `.stop()`. The interceptor
// gets a chance to ignore the error or produce a different one (via t.Fatal).
onStop func(context.Context, test.Test, *versionUpgradeTest, error)
nodes option.NodeListOption // nodes to monitor, defaults to c.All()

// Internal.
m cluster.Monitor
}

// launch spawns the function the background step was initialized with.
func (s *backgroundStepper) launch(ctx context.Context, t test.Test, u *versionUpgradeTest) {
nodes := s.nodes
if nodes == nil {
nodes = u.c.All()
}
s.m = u.c.NewMonitor(ctx, nodes)
s.m.Go(func(ctx context.Context) error {
return s.run(ctx, u)
})
}

func (s *backgroundStepper) wait(ctx context.Context, t test.Test, u *versionUpgradeTest) {
// We don't care about the workload failing since we only use it to produce a
// few `RESTORE` jobs. And indeed workload will fail because it does not
// tolerate pausing of its jobs.
err := s.m.WaitE()
if s.onStop != nil {
s.onStop(ctx, t, u, err)
} else if err != nil {
t.Fatal(err)
}
}

// runTPCCMixedHeadroom runs a mixed-version test that imports a large
// `bank` dataset, and runs one or multiple database upgrades while a
// TPCC workload is running. The number of database upgrades is
// controlled by the `versionsToUpgrade` parameter.
func runTPCCMixedHeadroom(
ctx context.Context, t test.Test, c cluster.Cluster, versionsToUpgrade int,
) {
// `bank` dataset, and runs multiple database upgrades while a TPCC
// workload is running. The number of database upgrades is randomized
// by the mixed-version framework which chooses a random predecessor version
// and upgrades until it reaches the current version.
func runTPCCMixedHeadroom(ctx context.Context, t test.Test, c cluster.Cluster) {
crdbNodes := c.Range(1, c.Spec().NodeCount-1)
workloadNode := c.Node(c.Spec().NodeCount)

@@ -417,26 +365,6 @@ func runTPCCMixedHeadroom(
headroomWarehouses = 10
}

// We'll need this below.
tpccBackgroundStepper := func(duration time.Duration) backgroundStepper {
return backgroundStepper{
nodes: crdbNodes,
run: func(ctx context.Context, u *versionUpgradeTest) error {
t.L().Printf("running background TPCC workload for %s", duration)
runTPCC(ctx, t, c, tpccOptions{
Warehouses: headroomWarehouses,
Duration: duration,
SetupType: usingExistingData,
Start: func(ctx context.Context, t test.Test, c cluster.Cluster) {
// Noop - we don't let tpcc upload or start binaries in this test.
},
})
return nil
}}
}

randomCRDBNode := func() int { return crdbNodes.RandNode()[0] }

// NB: this results in ~100GB of (actual) disk usage per node once things
// have settled down, and ~7.5k ranges. The import takes ~40 minutes.
// The full 6.5m import ran into out of disk errors (on 250gb machines),
@@ -446,91 +374,74 @@ func runTPCCMixedHeadroom(ctx context.Context, t test.Test, c cluster.Cluster) {
bankRows = 1000
}

rng, seed := randutil.NewLockedPseudoRand()
t.L().Printf("using random seed %d", seed)
history, err := release.RandomPredecessorHistory(rng, t.BuildVersion(), versionsToUpgrade)
if err != nil {
t.Fatal(err)
}
sep := " -> "
t.L().Printf("testing upgrade: %s%scurrent", strings.Join(history, sep), sep)
releases := make([]*clusterupgrade.Version, 0, len(history))
for _, v := range history {
releases = append(releases, clusterupgrade.MustParseVersion(v))
}
releases = append(releases, clusterupgrade.CurrentVersion())
mvt := mixedversion.NewTest(ctx, t, t.L(), c, crdbNodes)

waitForWorkloadToRampUp := sleepStep(rampDuration(c.IsLocal()))
logStep := func(format string, args ...interface{}) versionStep {
return func(ctx context.Context, t test.Test, u *versionUpgradeTest) {
t.L().Printf(format, args...)
}
importTPCC := func(ctx context.Context, l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error {
randomNode := c.Node(h.RandomNode(rng, crdbNodes))
cmd := tpccImportCmdWithCockroachBinary(test.DefaultCockroachPath, headroomWarehouses, fmt.Sprintf("{pgurl%s}", randomNode))
return c.RunE(ctx, randomNode, cmd)
}

oldestVersion := releases[0]
setupSteps := []versionStep{
logStep("starting from fixture at version %s", oldestVersion),
uploadAndStartFromCheckpointFixture(crdbNodes, oldestVersion),
waitForUpgradeStep(crdbNodes), // let oldest version settle (gossip etc)
uploadVersionStep(workloadNode, clusterupgrade.CurrentVersion()), // for tpccBackgroundStepper's workload

// Load TPCC dataset, don't run TPCC yet. We do this while in the
// version we are starting with to load some data and hopefully
// create some state that will need work by long-running
// migrations.
importTPCCStep(oldestVersion, headroomWarehouses, crdbNodes),
// Add a lot of cold data to this cluster. This further stresses the version
// upgrade machinery, in which a) all ranges are touched and b) work proportional
// to the amount data may be carried out.
importLargeBankStep(oldestVersion, bankRows, crdbNodes),
// Add a lot of cold data to this cluster. This further stresses the version
// upgrade machinery, in which a) all ranges are touched and b) work proportional
// to the amount data may be carried out.
importLargeBank := func(ctx context.Context, l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error {
randomNode := c.Node(h.RandomNode(rng, crdbNodes))
cmd := roachtestutil.NewCommand(fmt.Sprintf("%s workload fixtures import bank", test.DefaultCockroachPath)).
Arg("{pgurl%s}", randomNode).
Flag("payload-bytes", 10240).
Flag("rows", bankRows).
Flag("seed", 4).
Flag("db", "bigbank").
String()
return c.RunE(ctx, randomNode, cmd)
}

// upgradeToVersionSteps returns the list of steps to be performed
// when upgrading to the given version.
upgradeToVersionSteps := func(crdbVersion *clusterupgrade.Version) []versionStep {
duration := 10 * time.Minute
if crdbVersion.IsCurrent() {
duration = 100 * time.Minute
}
tpccWorkload := tpccBackgroundStepper(duration)

return []versionStep{
logStep("upgrading to version %q", crdbVersion.String()),
preventAutoUpgradeStep(randomCRDBNode()),
// Upload and restart cluster into the new
// binary (stays at previous cluster version).
binaryUpgradeStep(crdbNodes, crdbVersion),
// Now start running TPCC in the background.
tpccWorkload.launch,
// Wait for the workload to ramp up before attemping to
// upgrade the cluster version. If we start the migrations
// immediately after launching the tpcc workload above, they
// could finish "too quickly", before the workload had a
// chance to pick up the pace (starting all the workers, range
// merge/splits, compactions, etc). By waiting here, we
// increase the concurrency exposed to the upgrade migrations,
// and increase the chances of exposing bugs (such as #83079).
waitForWorkloadToRampUp,
// While tpcc is running in the background, bump the cluster
// version manually. We do this over allowing automatic upgrades
// to get a better idea of what errors come back here, if any.
// This will block until the long-running migrations have run.
allowAutoUpgradeStep(randomCRDBNode()),
waitForUpgradeStep(crdbNodes),
// Wait until TPCC background run terminates
// and fail if it reports an error.
tpccWorkload.wait,
// We don't run this in the background using the Workload() wrapper. We want
// it to block and wait for the workload to ramp up before attempting to upgrade
// the cluster version. If we start the migrations immediately after launching
// the tpcc workload, they could finish "too quickly", before the workload had
// a chance to pick up the pace (starting all the workers, range merge/splits,
// compactions, etc). By waiting here, we increase the concurrency exposed to
// the upgrade migrations, and increase the chances of exposing bugs (such as #83079).
runTPCCWorkload := func(ctx context.Context, l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error {
workloadDur := 10 * time.Minute
rampDur := rampDuration(c.IsLocal())
// If migrations are running we want to ramp up the workload faster in order
// to expose them to more concurrent load. In a similar goal, we also let the
// TPCC workload run longer.
if h.Context().Finalizing && !c.IsLocal() {
rampDur = 1 * time.Minute
if h.Context().ToVersion.IsCurrent() {
workloadDur = 100 * time.Minute
}
}
cmd := roachtestutil.NewCommand("./cockroach workload run tpcc").
Arg("{pgurl%s}", crdbNodes).
Flag("duration", workloadDur).
Flag("warehouses", headroomWarehouses).
Flag("histograms", t.PerfArtifactsDir()+"/stats.json").
Flag("ramp", rampDur).
Flag("prometheus-port", 2112).
Flag("pprofport", workloadPProfStartPort).
String()
return c.RunE(ctx, workloadNode, cmd)
}

// Test steps consist of the setup steps + the upgrade steps for
// each upgrade being carried out here.
testSteps := append([]versionStep{}, setupSteps...)
for _, nextVersion := range releases[1:] {
testSteps = append(testSteps, upgradeToVersionSteps(nextVersion)...)
checkTPCCWorkload := func(ctx context.Context, l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error {
cmd := roachtestutil.NewCommand(fmt.Sprintf("%s workload check tpcc", test.DefaultCockroachPath)).
Arg("{pgurl:1}").
Flag("warehouses", headroomWarehouses).
String()
return c.RunE(ctx, workloadNode, cmd)
}

newVersionUpgradeTest(c, testSteps...).run(ctx, t)
uploadVersion(ctx, t, c, workloadNode, clusterupgrade.CurrentVersion())
mvt.OnStartup("load TPCC dataset", importTPCC)
mvt.OnStartup("load bank dataset", importLargeBank)
mvt.InMixedVersion("TPCC workload", runTPCCWorkload)
mvt.AfterUpgradeFinalized("check TPCC workload", checkTPCCWorkload)
mvt.Run()
}

func registerTPCC(r registry.Registry) {
@@ -576,27 +487,10 @@ func registerTPCC(r registry.Registry) {
Cluster: mixedHeadroomSpec,
EncryptionSupport: registry.EncryptionMetamorphic,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runTPCCMixedHeadroom(ctx, t, c, 1)
runTPCCMixedHeadroom(ctx, t, c)
},
})

// N.B. Multiple upgrades may require a released version < 22.2.x, which wasn't built for ARM64.
mixedHeadroomMultiUpgradesSpec := r.MakeClusterSpec(5, spec.CPU(16), spec.RandomlyUseZfs(), spec.Arch(vm.ArchAMD64))

r.Add(registry.TestSpec{
// run the same mixed-headroom test, but going back two versions
Name: "tpcc/mixed-headroom/multiple-upgrades/" + mixedHeadroomMultiUpgradesSpec.String(),
Timeout: 5 * time.Hour,
Owner: registry.OwnerTestEng,
CompatibleClouds: registry.AllExceptAWS,
Suites: registry.Suites(registry.Nightly),
Tags: registry.Tags(`default`),
Cluster: mixedHeadroomMultiUpgradesSpec,
EncryptionSupport: registry.EncryptionMetamorphic,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runTPCCMixedHeadroom(ctx, t, c, 2)
},
})
r.Add(registry.TestSpec{
Name: "tpcc-nowait/nodes=3/w=1",
Owner: registry.OwnerTestEng,