From 0ef2e3fd2c30e9a9b94ca0a0b334b55b3a58b824 Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Mon, 14 Aug 2023 21:35:14 +0000 Subject: [PATCH] roachtest/cdc/mixed-versions: use mixed version framework This change updates the `cdc/mixed-versions` roachtest to use the mixed version testing framework. This mixed version testing framework is better than the previous framework because it offers testing for multiple upgrades. It will also make it easier to maintain and expand this cdc-specific test. Informs: #107451 Epic: None Release note: None --- pkg/cmd/roachtest/tests/cdc.go | 4 +- pkg/cmd/roachtest/tests/mixed_version_cdc.go | 540 ++++++++++--------- 2 files changed, 286 insertions(+), 258 deletions(-) diff --git a/pkg/cmd/roachtest/tests/cdc.go b/pkg/cmd/roachtest/tests/cdc.go index 741979c71be4..056c192430ec 100644 --- a/pkg/cmd/roachtest/tests/cdc.go +++ b/pkg/cmd/roachtest/tests/cdc.go @@ -641,7 +641,7 @@ func runCDCBank(ctx context.Context, t test.Test, c cluster.Cluster) { t.Fatal(err) } - tc, err := kafka.consumer(ctx, "bank") + tc, err := kafka.newConsumer(ctx, "bank") if err != nil { t.Fatal(errors.Wrap(err, "could not create kafka consumer")) } @@ -2242,7 +2242,7 @@ func (k kafkaManager) createTopic(ctx context.Context, topic string) error { }) } -func (k kafkaManager) consumer(ctx context.Context, topic string) (*topicConsumer, error) { +func (k kafkaManager) newConsumer(ctx context.Context, topic string) (*topicConsumer, error) { kafkaAddrs := []string{k.consumerURL(ctx)} config := sarama.NewConfig() // I was seeing "error processing FetchRequest: kafka: error decoding diff --git a/pkg/cmd/roachtest/tests/mixed_version_cdc.go b/pkg/cmd/roachtest/tests/mixed_version_cdc.go index 25f6d72f5ca9..9d9eeadd30e1 100644 --- a/pkg/cmd/roachtest/tests/mixed_version_cdc.go +++ b/pkg/cmd/roachtest/tests/mixed_version_cdc.go @@ -14,23 +14,27 @@ import ( "context" gosql "database/sql" "fmt" + "math/rand" "strconv" "strings" "time" + "github.com/Shopify/sarama" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry" - "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/clusterupgrade" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/mixedversion" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" + "github.com/cockroachdb/cockroach/pkg/roachprod/logger" "github.com/cockroachdb/cockroach/pkg/roachprod/vm" - "github.com/cockroachdb/cockroach/pkg/testutils/release" - "github.com/cockroachdb/cockroach/pkg/util/randutil" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" - "github.com/cockroachdb/cockroach/pkg/util/version" + "github.com/cockroachdb/errors" ) const ( @@ -40,7 +44,11 @@ const ( // resolvedInterval is the value passed to the `resolved` option // when creating the changefeed - resolvedInterval = "10s" + resolvedInterval = "5s" + + // kafkaBufferMessageSize is the number of messages from kafka + // we allow to be 
+	// buffered in memory before validating them.
+	kafkaBufferMessageSize = 8192
 )
 
 var (
@@ -74,7 +82,7 @@ func registerCDCMixedVersions(r registry.Registry) {
 			Timeout:          timeout,
 			RequiresLicense:  true,
 			Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
-				runCDCMixedVersions(ctx, t, c, t.BuildVersion())
+				runCDCMixedVersions(ctx, t, c)
 			},
 		})
 	}
@@ -84,21 +92,43 @@ func registerCDCMixedVersions(r registry.Registry) {
 // events, and run validations that ensure changefeeds work during and
 // after upgrade.
 type cdcMixedVersionTester struct {
-	ctx           context.Context
-	crdbNodes     option.NodeListOption
-	workloadNodes option.NodeListOption
-	kafkaNodes    option.NodeListOption
-	monitor       cluster.Monitor
+	ctx context.Context
+
+	c cluster.Cluster
+
+	crdbNodes     option.NodeListOption
+	workloadNodes option.NodeListOption
+	kafkaNodes    option.NodeListOption
+
+	// This channel is closed after the workload has been initialized.
+	workloadInit chan struct{}
+
 	timestampsResolved struct {
+		latest hlc.Timestamp
 		syncutil.Mutex
-		C chan struct{}
+		C chan hlc.Timestamp
+	}
+
+	kafka struct {
+		manager kafkaManager
+
+		// buf is used to buffer messages received from kafka to be sent
+		// to the validator and validated when the cluster is stable (i.e. no node restarts in progress).
+		//
+		// Typically, messages would be immediately forwarded to the validator,
+		// but we cannot do that right when a message is received because
+		// some nodes may be offline/upgrading at the time (the validator
+		// relies on having a stable DB connection to note the rows and fingerprint).
+		mu struct {
+			syncutil.Mutex
+			buf []*sarama.ConsumerMessage
+		}
+
+		consumer *topicConsumer
 	}
-	crdbUpgrading     syncutil.Mutex
-	kafka             kafkaManager
-	validator         *cdctest.CountValidator
-	testFinished      bool
-	validatorFinished chan struct{}
-	cleanup           func()
+
+	validator *cdctest.CountValidator
+	fprintV   *cdctest.FingerprintValidator
 }
 
 func newCDCMixedVersionTester(
@@ -111,304 +141,302 @@ func newCDCMixedVersionTester(
 	c.Put(ctx, t.DeprecatedWorkload(), "./workload", lastNode)
 
 	return cdcMixedVersionTester{
-		ctx:               ctx,
-		crdbNodes:         crdbNodes,
-		workloadNodes:     lastNode,
-		kafkaNodes:        lastNode,
-		monitor:           c.NewMonitor(ctx, crdbNodes),
-		validatorFinished: make(chan struct{}),
+		ctx:           ctx,
+		c:             c,
+		crdbNodes:     crdbNodes,
+		workloadNodes: lastNode,
+		kafkaNodes:    lastNode,
+		workloadInit:  make(chan struct{}),
 	}
 }
 
 // StartKafka will install and start Kafka on the configured node. It
-// will also create the topic where changefeed events will be sent.
-func (cmvt *cdcMixedVersionTester) StartKafka(t test.Test, c cluster.Cluster) {
+// will also create the topic where changefeed events will be sent and a consumer.
+// Returns a function to clean up the consumer and kafka resources.
+func (cmvt *cdcMixedVersionTester) StartKafka(t test.Test, c cluster.Cluster) (cleanup func()) { t.Status("starting Kafka node") - cmvt.kafka, cmvt.cleanup = setupKafka(cmvt.ctx, t, c, cmvt.kafkaNodes) - if err := cmvt.kafka.createTopic(cmvt.ctx, targetTable); err != nil { + + manager, tearDown := setupKafka(cmvt.ctx, t, c, cmvt.kafkaNodes) + if err := manager.createTopic(cmvt.ctx, targetTable); err != nil { t.Fatal(err) } -} + cmvt.kafka.manager = manager -// Cleanup is supposed to be called at the end of tests that use -// `cdcMixedVersionTester` -func (cmvt *cdcMixedVersionTester) Cleanup() { - if cmvt.cleanup != nil { - cmvt.cleanup() + consumer, err := cmvt.kafka.manager.newConsumer(cmvt.ctx, targetTable) + if err != nil { + t.Fatal(err) } -} -// installAndStartWorkload starts a bank workload asynchronously -func (cmvt *cdcMixedVersionTester) installAndStartWorkload() versionStep { - return func(ctx context.Context, t test.Test, u *versionUpgradeTest) { - t.Status("installing and running workload") - u.c.Run(ctx, cmvt.workloadNodes, "./workload init bank {pgurl:1}") - cmvt.monitor.Go(func(ctx context.Context) error { - return u.c.RunE( - ctx, - cmvt.workloadNodes, - fmt.Sprintf("./workload run bank {pgurl%s} --max-rate=10 --tolerate-errors", - cmvt.crdbNodes), - ) - }) + cmvt.kafka.manager = manager + cmvt.kafka.mu.buf = make([]*sarama.ConsumerMessage, 0, kafkaBufferMessageSize) + cmvt.kafka.consumer = consumer + + return func() { + cmvt.kafka.consumer.Close() + tearDown() } } -// waitForResolvedTimestamps waits for the underlying CDC verifier to -// resolve `resolvedTimestampsPerState` -func (cmvt *cdcMixedVersionTester) waitForResolvedTimestamps() versionStep { - return func(ctx context.Context, t test.Test, u *versionUpgradeTest) { - t.Status(fmt.Sprintf("waiting for %d resolved timestamps", resolvedTimestampsPerState)) - // create a new channel for the resolved timestamps, allowing any - // new resolved timestamps to be captured and account for in the - // loop below - func() { - cmvt.timestampsResolved.Lock() - defer cmvt.timestampsResolved.Unlock() - - cmvt.timestampsResolved.C = make(chan struct{}) - }() - - var resolved int - for resolved < resolvedTimestampsPerState { - select { - case <-cmvt.timestampsResolved.C: - resolved++ - t.L().Printf("%d of %d timestamps resolved", resolved, resolvedTimestampsPerState) - case <-ctx.Done(): - return +// waitAndValidate waits for a few resolved timestamps to assert +// that the changefeed is running and ensure that there is some data available +// to validate. Then, it returns the result of `cmvt.validate`. 
+func (cmvt *cdcMixedVersionTester) waitAndValidate( + ctx context.Context, l *logger.Logger, r *rand.Rand, h *mixedversion.Helper, +) error { + l.Printf("waiting for %d resolved timestamps", resolvedTimestampsPerState) + // create a new channel for the resolved timestamps, allowing any + // new resolved timestamps to be captured and account for in the + // loop below + func() { + cmvt.timestampsResolved.Lock() + defer cmvt.timestampsResolved.Unlock() + + cmvt.timestampsResolved.C = make(chan hlc.Timestamp) + }() + + var numResolved int + for numResolved < resolvedTimestampsPerState { + select { + case resolvedTs := <-cmvt.timestampsResolved.C: + if !cmvt.timestampsResolved.latest.Less(resolvedTs) { + return errors.Newf("expected resolved timestamp %s to be greater than previous "+ + " resolved timestamp %s", resolvedTs, cmvt.timestampsResolved.latest) } + cmvt.timestampsResolved.latest = resolvedTs + numResolved++ + l.Printf("%d of %d timestamps resolved", numResolved, resolvedTimestampsPerState) + case <-ctx.Done(): + return ctx.Err() } + } - // set the resolved timestamps channel back to `nil`; while in - // this state, any new resolved timestamps will be ignored - func() { - cmvt.timestampsResolved.Lock() - defer cmvt.timestampsResolved.Unlock() + // set the resolved timestamps channel back to `nil`; while in + // this state, any new resolved timestamps will be ignored + func() { + cmvt.timestampsResolved.Lock() + defer cmvt.timestampsResolved.Unlock() - cmvt.timestampsResolved.C = nil - }() - } + cmvt.timestampsResolved.C = nil + }() + + return cmvt.validate(ctx, l, r, h) } -// finishTest marks the test as finished which will then prompt the -// changefeed validator loop to return. This step will block until the -// validator has finished; this is done to avoid the possibility of -// the test closing the database connection while the validator is -// running a query. -func (cmvt *cdcMixedVersionTester) finishTest() versionStep { - return func(ctx context.Context, t test.Test, u *versionUpgradeTest) { - t.L().Printf("waiting for background tasks to finish") - cmvt.testFinished = true - <-cmvt.validatorFinished +// setupValidator creates a CDC validator to validate that a changefeed +// created on the target table is able to re-create the table +// somewhere else. It can also verify changefeed ordering guarantees. +func (cmvt *cdcMixedVersionTester) setupValidator( + ctx context.Context, l *logger.Logger, r *rand.Rand, h *mixedversion.Helper, +) error { + tableName := fmt.Sprintf("%s.%s", targetDB, targetTable) + if err := h.Exec(r, "CREATE TABLE fprint (id INT PRIMARY KEY, balance INT, payload STRING)"); err != nil { + return err + } + + // The fingerprint validator will save this db connection and use it + // when we submit rows for validation. This can be changed later using + // `(*FingerprintValidator) DBFunc`. + _, db := h.RandomDB(r, cmvt.crdbNodes) + fprintV, err := cdctest.NewFingerprintValidator(db, tableName, `fprint`, + cmvt.kafka.consumer.partitions, 0) + if err != nil { + return err + } + validators := cdctest.Validators{ + cdctest.NewOrderValidator(tableName), + fprintV, } + + cmvt.validator = cdctest.MakeCountValidator(validators) + cmvt.fprintV = fprintV + return nil } -// setupVerifier creates a CDC validator to validate that a changefeed -// created on the `target` table is able to re-create the table -// somewhere else. It also verifies CDC's ordering guarantees. This -// step will not block, but will start the verifier in a separate Go -// routine. 
-// Use `waitForVerifier` to wait for the verifier to finish.
-func (cmvt *cdcMixedVersionTester) setupVerifier(node int) versionStep {
-	return func(ctx context.Context, t test.Test, u *versionUpgradeTest) {
-		tableName := targetDB + "." + targetTable
-		t.Status(fmt.Sprintf("setting up changefeed verifier for table %s", tableName))
-
-		// we could just return the error here and let `Wait` return the
-		// error. However, calling t.Fatal directly lets us stop the test
-		// earlier
-		cmvt.monitor.Go(func(ctx context.Context) error {
-			defer close(cmvt.validatorFinished)
-			consumer, err := cmvt.kafka.consumer(ctx, targetTable)
+// runKafkaConsumer continuously reads from the kafka consumer and places
+// messages in a buffer. It expects to be run in a goroutine and does not
+// rely on any DB connections.
+func (cmvt *cdcMixedVersionTester) runKafkaConsumer(
+	ctx context.Context, l *logger.Logger, r *rand.Rand, h *mixedversion.Helper,
+) error {
+	everyN := log.Every(30 * time.Second)
+
+	// This runs until the test finishes, which will be signaled via
+	// context cancellation. We rely on consumer.Next() to check
+	// the context.
+	for {
+		m := cmvt.kafka.consumer.Next(ctx)
+		if m == nil {
+			// this is expected to happen once the test has finished and
+			// Kafka is being shut down. If it happens in the middle of
+			// the test, it will eventually time out, and this message
+			// should allow us to see that the validator finished
+			// earlier than it should have
+			l.Printf("end of changefeed")
+			return nil
+		}
+
+		// Forward resolved timestamps as a "heartbeat" showing that the changefeed is running.
+		if len(m.Key) <= 0 {
+			_, resolved, err := cdctest.ParseJSONValueTimestamps(m.Value)
 			if err != nil {
-				t.Fatal(err)
+				return errors.Wrap(err, "failed to parse timestamps from message")
 			}
-			defer consumer.Close()
+			cmvt.timestampResolved(resolved)
 
-			db := u.conn(ctx, t, node)
-			if _, err := db.Exec(
-				"CREATE TABLE fprint (id INT PRIMARY KEY, balance INT, payload STRING)",
-			); err != nil {
-				t.Fatal(err)
+			if everyN.ShouldLog() {
+				l.Printf("latest resolved timestamp %s behind realtime", timeutil.Since(resolved.GoTime()).String())
 			}
+		}
 
-			getConn := func(node int) *gosql.DB { return u.conn(ctx, t, node) }
-			fprintV, err := cdctest.NewFingerprintValidator(db, tableName, `fprint`, consumer.partitions, 0)
-			if err != nil {
-				t.Fatal(err)
+		if err := func() error {
+			cmvt.kafka.mu.Lock()
+			defer cmvt.kafka.mu.Unlock()
+
+			if len(cmvt.kafka.mu.buf) == kafkaBufferMessageSize {
+				return errors.Newf("kafka message buffer limit %d exceeded", kafkaBufferMessageSize)
 			}
-			fprintV.DBFunc(cmvt.cdcDBConn(getConn))
-			validators := cdctest.Validators{
-				cdctest.NewOrderValidator(tableName),
-				fprintV,
+			cmvt.kafka.mu.buf = append(cmvt.kafka.mu.buf, m)
+			return nil
+		}(); err != nil {
+			return err
+		}
+	}
+}
+
+// validate reads rows from the kafka buffer and submits them to the validator
+// to be validated.
+func (cmvt *cdcMixedVersionTester) validate(
+	ctx context.Context, l *logger.Logger, r *rand.Rand, h *mixedversion.Helper,
+) error {
+	// Choose a random node to run the validation on.
+ n, db := h.RandomDB(r, cmvt.crdbNodes) + l.Printf("running validation on node %d", n) + cmvt.fprintV.DBFunc(func(f func(*gosql.DB) error) error { + return f(db) + }) + + cmvt.kafka.mu.Lock() + defer cmvt.kafka.mu.Unlock() + + for _, m := range cmvt.kafka.mu.buf { + updated, resolved, err := cdctest.ParseJSONValueTimestamps(m.Value) + if err != nil { + return errors.Wrap(err, "failed to parse timestamps from message") + } + + partitionStr := strconv.Itoa(int(m.Partition)) + if len(m.Key) > 0 { + if err := cmvt.validator.NoteRow(partitionStr, string(m.Key), string(m.Value), updated); err != nil { + return err } - cmvt.validator = cdctest.MakeCountValidator(validators) - - for !cmvt.testFinished { - m := consumer.Next(ctx) - if m == nil { - // this is expected to happen once the test has finished and - // Kafka is being shut down. If it happens in the middle of - // the test, it will eventually time out, and this message - // should allow us to see that the validator finished - // earlier than it should have - t.L().Printf("end of changefeed") - return nil - } - - updated, resolved, err := cdctest.ParseJSONValueTimestamps(m.Value) - if err != nil { - t.Fatal(err) - } - - partitionStr := strconv.Itoa(int(m.Partition)) - if len(m.Key) > 0 { - if err := cmvt.validator.NoteRow(partitionStr, string(m.Key), string(m.Value), updated); err != nil { - t.Fatal(err) - } - } else { - if err := cmvt.validator.NoteResolved(partitionStr, resolved); err != nil { - t.Fatal(err) - } - - t.L().Printf("%d resolved timestamps validated, latest is %s behind realtime", - cmvt.validator.NumResolvedWithRows, timeutil.Since(resolved.GoTime())) - cmvt.timestampResolved() - } + } else { + if err := cmvt.validator.NoteResolved(partitionStr, resolved); err != nil { + return err } - return nil - }) + l.Printf("%d resolved timestamps validated", cmvt.validator.NumResolvedWithRows) + } + + if failures := cmvt.validator.Failures(); len(failures) > 0 { + return errors.Newf("validator failures:\n%s", strings.Join(failures, "\n")) + } } + + cmvt.kafka.mu.buf = make([]*sarama.ConsumerMessage, 0, kafkaBufferMessageSize) + return nil } // timestampsResolved updates the underlying channel if set (i.e., if // we are waiting for resolved timestamps events) -func (cmvt *cdcMixedVersionTester) timestampResolved() { +func (cmvt *cdcMixedVersionTester) timestampResolved(resolved hlc.Timestamp) { cmvt.timestampsResolved.Lock() defer cmvt.timestampsResolved.Unlock() if cmvt.timestampsResolved.C != nil { - cmvt.timestampsResolved.C <- struct{}{} + cmvt.timestampsResolved.C <- resolved } } -// cdcDBConn is the wrapper passed to the FingerprintValidator. The -// goal is to ensure that database checks by the validator do not -// happen while we are running an upgrade. We used to retry database -// calls in the validator, but that logic adds complexity and does not -// help in testing the changefeed's correctness -func (cmvt *cdcMixedVersionTester) cdcDBConn( - getConn func(int) *gosql.DB, -) func(func(*gosql.DB) error) error { - return func(f func(*gosql.DB) error) error { - cmvt.crdbUpgrading.Lock() - defer cmvt.crdbUpgrading.Unlock() - - node := cmvt.crdbNodes.RandNode()[0] - return f(getConn(node)) +// createChangeFeed issues a call to the given node to create a change +// feed for the target table. +func (cmvt *cdcMixedVersionTester) createChangeFeed( + ctx context.Context, l *logger.Logger, r *rand.Rand, h *mixedversion.Helper, +) error { + // Wait for workload to be initialized so the target table exists. 
+ select { + case <-ctx.Done(): + return ctx.Err() + case <-cmvt.workloadInit: } -} - -// crdbUpgradeStep is a wrapper to steps that upgrade the cockroach -// binary running in the cluster. It makes sure we hold exclusive -// access to the `crdbUpgrading` lock while the upgrade is in process -func (cmvt *cdcMixedVersionTester) crdbUpgradeStep(step versionStep) versionStep { - return func(ctx context.Context, t test.Test, u *versionUpgradeTest) { - cmvt.crdbUpgrading.Lock() - defer cmvt.crdbUpgrading.Unlock() + node, db := h.RandomDB(r, cmvt.crdbNodes) + l.Printf("starting changefeed on node %d", node) - step(ctx, t, u) + options := map[string]string{ + "updated": "", + "resolved": fmt.Sprintf("'%s'", resolvedInterval), } -} -// assertValid checks if the validator has found any issues at the -// time the function is called. -func (cmvt *cdcMixedVersionTester) assertValid() versionStep { - return func(ctx context.Context, t test.Test, u *versionUpgradeTest) { - if failures := cmvt.validator.Failures(); len(failures) > 0 { - t.Fatalf("validator failures:\n%s", strings.Join(failures, "\n")) - } + jobID, err := newChangefeedCreator(db, fmt.Sprintf("%s.%s", targetDB, targetTable), + cmvt.kafka.manager.sinkURL(ctx)). + With(options). + Create() + if err != nil { + return err } + l.Printf("created changefeed job %d", jobID) + return nil } -// createChangeFeed issues a call to the given node to create a change -// feed for the target table. -func (cmvt *cdcMixedVersionTester) createChangeFeed(node int) versionStep { - return func(ctx context.Context, t test.Test, u *versionUpgradeTest) { - t.Status("creating changefeed") - db := u.conn(ctx, t, node) - - options := map[string]string{ - "updated": "", - "resolved": fmt.Sprintf("'%s'", resolvedInterval), - } - _, err := newChangefeedCreator(db, fmt.Sprintf("%s.%s", targetDB, targetTable), cmvt.kafka.sinkURL(ctx)). - With(options). - Create() - if err != nil { - t.Fatal(err) - } - } +// runWorkloadCmd returns the command that runs the workload. +func (cmvt *cdcMixedVersionTester) runWorkloadCmd(r *rand.Rand) *roachtestutil.Command { + return roachtestutil.NewCommand("%s workload run bank", test.DefaultCockroachPath). + // Since all rows are placed in a buffer, setting a low rate of 2 operations / sec + // helps ensure that we don't exceed the buffer capacity. + Flag("max-rate", 2). + Flag("seed", r.Int63()). + Arg("{pgurl%s}", cmvt.crdbNodes). + Option("tolerate-errors") } -func runCDCMixedVersions( - ctx context.Context, t test.Test, c cluster.Cluster, buildVersion *version.Version, -) { - predecessorVersion, err := release.LatestPredecessor(buildVersion) - if err != nil { - t.Fatal(err) - } - - tester := newCDCMixedVersionTester(ctx, t, c) - tester.StartKafka(t, c) - defer tester.Cleanup() +// initWorkload synchronously initializes the workload. +func (cmvt *cdcMixedVersionTester) initWorkload( + ctx context.Context, l *logger.Logger, r *rand.Rand, h *mixedversion.Helper, +) error { + bankInit := roachtestutil.NewCommand("%s workload init bank", test.DefaultCockroachPath). + Flag("seed", r.Int63()). + Arg("{pgurl%s}", cmvt.crdbNodes) - rng, seed := randutil.NewPseudoRand() - t.L().Printf("random seed: %d", seed) - - // sqlNode returns the node to be used when sending SQL statements - // during this test. It is randomized, but the random seed is logged - // above. 
- sqlNode := func() int { - return tester.crdbNodes[rng.Intn(len(tester.crdbNodes))] + if err := cmvt.c.RunE(ctx, option.NodeListOption{h.RandomNode(r, cmvt.workloadNodes)}, bankInit.String()); err != nil { + return err } + close(cmvt.workloadInit) + return nil +} - newVersionUpgradeTest(c, - uploadAndStartFromCheckpointFixture(tester.crdbNodes, predecessorVersion), - tester.setupVerifier(sqlNode()), - tester.installAndStartWorkload(), - waitForUpgradeStep(tester.crdbNodes), - - // NB: at this point, cluster and binary version equal predecessorVersion, - // and auto-upgrades are on. - preventAutoUpgradeStep(sqlNode()), - tester.createChangeFeed(sqlNode()), +func runCDCMixedVersions(ctx context.Context, t test.Test, c cluster.Cluster) { + tester := newCDCMixedVersionTester(ctx, t, c) - tester.waitForResolvedTimestamps(), - // Roll the nodes into the new version one by one in random order - tester.crdbUpgradeStep(binaryUpgradeStep(tester.crdbNodes, clusterupgrade.MainVersion)), - // let the workload run in the new version for a while - tester.waitForResolvedTimestamps(), + // NB: We rely on the testing framework to choose a random predecessor + // to upgrade from. + mvt := mixedversion.NewTest(ctx, t, t.L(), c, tester.crdbNodes) - tester.assertValid(), + cleanupKafka := tester.StartKafka(t, c) + defer cleanupKafka() - // Roll back again, which ought to be fine because the cluster upgrade was - // not finalized. - tester.crdbUpgradeStep(binaryUpgradeStep(tester.crdbNodes, predecessorVersion)), - tester.waitForResolvedTimestamps(), + // Register hooks. + mvt.OnStartup("start changefeed", tester.createChangeFeed) + mvt.OnStartup("create validator", tester.setupValidator) + mvt.OnStartup("init workload", tester.initWorkload) - tester.assertValid(), + runWorkloadCmd := tester.runWorkloadCmd(mvt.RNG()) + _ = mvt.BackgroundCommand("run workload", tester.workloadNodes, runWorkloadCmd) + _ = mvt.BackgroundFunc("run kafka consumer", tester.runKafkaConsumer) - // Roll nodes forward and finalize upgrade. - tester.crdbUpgradeStep(binaryUpgradeStep(tester.crdbNodes, clusterupgrade.MainVersion)), + // NB: mvt.InMixedVersion will run these hooks multiple times at various points during the rolling upgrade, but + // not when any nodes are offline. This is important because the validator relies on a db connection. + mvt.InMixedVersion("wait and validate", tester.waitAndValidate) - // allow cluster version to update - allowAutoUpgradeStep(sqlNode()), - waitForUpgradeStep(tester.crdbNodes), + mvt.AfterUpgradeFinalized("wait and validate", tester.waitAndValidate) - tester.waitForResolvedTimestamps(), - tester.finishTest(), - tester.assertValid(), - ).run(ctx, t) + mvt.Run() }
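
A note on the sizing interplay between the workload rate, the Kafka buffer, and the validation cadence in this patch: with `--max-rate=2` and `kafkaBufferMessageSize = 8192`, the buffer can absorb roughly an hour of bank-workload messages, while `waitAndValidate` drains it after only a handful of resolved timestamps at the 5s `resolved` interval. Below is a minimal back-of-envelope sketch of that arithmetic; it is illustrative only and not part of the patch, the constant names are made up here, and it assumes each bank operation emits roughly one Kafka message.

package main

import "fmt"

func main() {
	const (
		maxRatePerSec          = 2    // --max-rate passed to `workload run bank`
		kafkaBufferMessageSize = 8192 // buffer capacity used by runKafkaConsumer
		resolvedIntervalSec    = 5    // `resolved` option on the changefeed
	)

	// Assuming roughly one Kafka message per bank operation, this is how long
	// the workload can run before the buffer limit would be hit.
	secondsToFill := kafkaBufferMessageSize / maxRatePerSec
	fmt.Printf("buffer absorbs ~%d s (~%d min) of messages\n", secondsToFill, secondsToFill/60)

	// waitAndValidate drains the buffer after a few resolved timestamps, i.e.
	// on the order of tens of seconds at this resolved interval, far below the
	// time it would take to fill the buffer.
	fmt.Printf("resolved timestamps arrive every ~%d s\n", resolvedIntervalSec)
}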