diff --git a/pkg/cmd/roachtest/tests/cdc.go b/pkg/cmd/roachtest/tests/cdc.go
index 4d49c595ccf8..b18b11c7de56 100644
--- a/pkg/cmd/roachtest/tests/cdc.go
+++ b/pkg/cmd/roachtest/tests/cdc.go
@@ -641,7 +641,7 @@ func runCDCBank(ctx context.Context, t test.Test, c cluster.Cluster) {
 		t.Fatal(err)
 	}
 
-	tc, err := kafka.consumer(ctx, "bank")
+	tc, err := kafka.newConsumer(ctx, "bank")
 	if err != nil {
 		t.Fatal(errors.Wrap(err, "could not create kafka consumer"))
 	}
@@ -2242,7 +2242,7 @@ func (k kafkaManager) createTopic(ctx context.Context, topic string) error {
 	})
 }
 
-func (k kafkaManager) consumer(ctx context.Context, topic string) (*topicConsumer, error) {
+func (k kafkaManager) newConsumer(ctx context.Context, topic string) (*topicConsumer, error) {
 	kafkaAddrs := []string{k.consumerURL(ctx)}
 	config := sarama.NewConfig()
 	// I was seeing "error processing FetchRequest: kafka: error decoding
diff --git a/pkg/cmd/roachtest/tests/mixed_version_cdc.go b/pkg/cmd/roachtest/tests/mixed_version_cdc.go
index 25f6d72f5ca9..9d9eeadd30e1 100644
--- a/pkg/cmd/roachtest/tests/mixed_version_cdc.go
+++ b/pkg/cmd/roachtest/tests/mixed_version_cdc.go
@@ -14,23 +14,27 @@ import (
 	"context"
 	gosql "database/sql"
 	"fmt"
+	"math/rand"
 	"strconv"
 	"strings"
 	"time"
 
+	"github.com/Shopify/sarama"
 	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
-	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/clusterupgrade"
+	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil"
+	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/mixedversion"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
+	"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
 	"github.com/cockroachdb/cockroach/pkg/roachprod/vm"
-	"github.com/cockroachdb/cockroach/pkg/testutils/release"
-	"github.com/cockroachdb/cockroach/pkg/util/randutil"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
-	"github.com/cockroachdb/cockroach/pkg/util/version"
+	"github.com/cockroachdb/errors"
 )
 
 const (
@@ -40,7 +44,11 @@ const (
 
 	// resolvedInterval is the value passed to the `resolved` option
 	// when creating the changefeed
-	resolvedInterval = "10s"
+	resolvedInterval = "5s"
+
+	// kafkaBufferMessageSize is the number of messages from kafka
+	// we allow to be buffered in memory before validating them.
+	kafkaBufferMessageSize = 8192
 )
 
 var (
@@ -74,7 +82,7 @@ func registerCDCMixedVersions(r registry.Registry) {
 		Timeout:         timeout,
 		RequiresLicense: true,
 		Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
-			runCDCMixedVersions(ctx, t, c, t.BuildVersion())
+			runCDCMixedVersions(ctx, t, c)
 		},
 	})
 }
@@ -84,21 +92,43 @@ func registerCDCMixedVersions(r registry.Registry) {
 // events, and run validations that ensure changefeeds work during and
 // after upgrade.
 type cdcMixedVersionTester struct {
-	ctx           context.Context
-	crdbNodes     option.NodeListOption
-	workloadNodes option.NodeListOption
-	kafkaNodes    option.NodeListOption
-	monitor       cluster.Monitor
+	ctx context.Context
+
+	c cluster.Cluster
+
+	crdbNodes     option.NodeListOption
+	workloadNodes option.NodeListOption
+	kafkaNodes    option.NodeListOption
+
+	// This channel is closed after the workload has been initialized.
+	workloadInit chan struct{}
+
 	timestampsResolved struct {
+		latest hlc.Timestamp
 		syncutil.Mutex
-		C chan struct{}
+		C chan hlc.Timestamp
+	}
+
+	kafka struct {
+		manager kafkaManager
+
+		// buf is used to buffer messages received from kafka to be sent
+		// to the validator and validated when the cluster is stable (i.e. no node restarts in progress).
+		//
+		// Typically, messages would be forwarded immediately to the validator,
+		// but we cannot do that right when a message is received because
+		// some nodes may be offline/upgrading at the time (the validator
+		// relies on having a stable DB connection to note the rows and fingerprint).
+		mu struct {
+			syncutil.Mutex
+			buf []*sarama.ConsumerMessage
+		}
+
+		consumer *topicConsumer
 	}
-	crdbUpgrading     syncutil.Mutex
-	kafka             kafkaManager
-	validator         *cdctest.CountValidator
-	testFinished      bool
-	validatorFinished chan struct{}
-	cleanup           func()
+
+	validator *cdctest.CountValidator
+	fprintV   *cdctest.FingerprintValidator
 }
 
 func newCDCMixedVersionTester(
@@ -111,304 +141,302 @@ func newCDCMixedVersionTester(
 	c.Put(ctx, t.DeprecatedWorkload(), "./workload", lastNode)
 
 	return cdcMixedVersionTester{
-		ctx:               ctx,
-		crdbNodes:         crdbNodes,
-		workloadNodes:     lastNode,
-		kafkaNodes:        lastNode,
-		monitor:           c.NewMonitor(ctx, crdbNodes),
-		validatorFinished: make(chan struct{}),
+		ctx:           ctx,
+		c:             c,
+		crdbNodes:     crdbNodes,
+		workloadNodes: lastNode,
+		kafkaNodes:    lastNode,
+		workloadInit:  make(chan struct{}),
 	}
 }
 
 // StartKafka will install and start Kafka on the configured node. It
-// will also create the topic where changefeed events will be sent.
-func (cmvt *cdcMixedVersionTester) StartKafka(t test.Test, c cluster.Cluster) {
+// will also create the topic where changefeed events will be sent and a consumer.
+// Returns a function to clean up the consumer and kafka resources.
+func (cmvt *cdcMixedVersionTester) StartKafka(t test.Test, c cluster.Cluster) (cleanup func()) {
 	t.Status("starting Kafka node")
-	cmvt.kafka, cmvt.cleanup = setupKafka(cmvt.ctx, t, c, cmvt.kafkaNodes)
-	if err := cmvt.kafka.createTopic(cmvt.ctx, targetTable); err != nil {
+
+	manager, tearDown := setupKafka(cmvt.ctx, t, c, cmvt.kafkaNodes)
+	if err := manager.createTopic(cmvt.ctx, targetTable); err != nil {
 		t.Fatal(err)
 	}
-}
+	cmvt.kafka.manager = manager
 
-// Cleanup is supposed to be called at the end of tests that use
-// `cdcMixedVersionTester`
-func (cmvt *cdcMixedVersionTester) Cleanup() {
-	if cmvt.cleanup != nil {
-		cmvt.cleanup()
+	consumer, err := cmvt.kafka.manager.newConsumer(cmvt.ctx, targetTable)
+	if err != nil {
+		t.Fatal(err)
 	}
-}
 
-// installAndStartWorkload starts a bank workload asynchronously
-func (cmvt *cdcMixedVersionTester) installAndStartWorkload() versionStep {
-	return func(ctx context.Context, t test.Test, u *versionUpgradeTest) {
-		t.Status("installing and running workload")
-		u.c.Run(ctx, cmvt.workloadNodes, "./workload init bank {pgurl:1}")
-		cmvt.monitor.Go(func(ctx context.Context) error {
-			return u.c.RunE(
-				ctx,
-				cmvt.workloadNodes,
-				fmt.Sprintf("./workload run bank {pgurl%s} --max-rate=10 --tolerate-errors",
-					cmvt.crdbNodes),
-			)
-		})
+	cmvt.kafka.manager = manager
+	cmvt.kafka.mu.buf = make([]*sarama.ConsumerMessage, 0, kafkaBufferMessageSize)
+	cmvt.kafka.consumer = consumer
+
+	return func() {
+		cmvt.kafka.consumer.Close()
+		tearDown()
 	}
 }
 
-// waitForResolvedTimestamps waits for the underlying CDC verifier to
-// resolve `resolvedTimestampsPerState`
-func (cmvt *cdcMixedVersionTester) waitForResolvedTimestamps() versionStep {
-	return func(ctx context.Context, t test.Test, u *versionUpgradeTest) {
-		t.Status(fmt.Sprintf("waiting for %d resolved timestamps", resolvedTimestampsPerState))
-		// create a new channel for the resolved timestamps, allowing any
-		// new resolved timestamps to be captured and account for in the
-		// loop below
-		func() {
-			cmvt.timestampsResolved.Lock()
-			defer cmvt.timestampsResolved.Unlock()
-
-			cmvt.timestampsResolved.C = make(chan struct{})
-		}()
-
-		var resolved int
-		for resolved < resolvedTimestampsPerState {
-			select {
-			case <-cmvt.timestampsResolved.C:
-				resolved++
-				t.L().Printf("%d of %d timestamps resolved", resolved, resolvedTimestampsPerState)
-			case <-ctx.Done():
-				return
+// waitAndValidate waits for a few resolved timestamps to assert
+// that the changefeed is running and ensure that there is some data available
+// to validate. Then, it returns the result of `cmvt.validate`.
+func (cmvt *cdcMixedVersionTester) waitAndValidate(
+	ctx context.Context, l *logger.Logger, r *rand.Rand, h *mixedversion.Helper,
+) error {
+	l.Printf("waiting for %d resolved timestamps", resolvedTimestampsPerState)
+	// create a new channel for the resolved timestamps, allowing any
+	// new resolved timestamps to be captured and accounted for in the
+	// loop below
+	func() {
+		cmvt.timestampsResolved.Lock()
+		defer cmvt.timestampsResolved.Unlock()
+
+		cmvt.timestampsResolved.C = make(chan hlc.Timestamp)
+	}()
+
+	var numResolved int
+	for numResolved < resolvedTimestampsPerState {
+		select {
+		case resolvedTs := <-cmvt.timestampsResolved.C:
+			if !cmvt.timestampsResolved.latest.Less(resolvedTs) {
+				return errors.Newf("expected resolved timestamp %s to be greater than previous "+
+					" resolved timestamp %s", resolvedTs, cmvt.timestampsResolved.latest)
 			}
+			cmvt.timestampsResolved.latest = resolvedTs
+			numResolved++
+			l.Printf("%d of %d timestamps resolved", numResolved, resolvedTimestampsPerState)
+		case <-ctx.Done():
+			return ctx.Err()
 		}
+	}
 
-	// set the resolved timestamps channel back to `nil`; while in
-	// this state, any new resolved timestamps will be ignored
-	func() {
-		cmvt.timestampsResolved.Lock()
-		defer cmvt.timestampsResolved.Unlock()
+	// set the resolved timestamps channel back to `nil`; while in
+	// this state, any new resolved timestamps will be ignored
+	func() {
+		cmvt.timestampsResolved.Lock()
+		defer cmvt.timestampsResolved.Unlock()
 
-		cmvt.timestampsResolved.C = nil
-	}()
-	}
+		cmvt.timestampsResolved.C = nil
+	}()
+
+	return cmvt.validate(ctx, l, r, h)
 }
 
-// finishTest marks the test as finished which will then prompt the
-// changefeed validator loop to return. This step will block until the
-// validator has finished; this is done to avoid the possibility of
-// the test closing the database connection while the validator is
-// running a query.
-func (cmvt *cdcMixedVersionTester) finishTest() versionStep {
-	return func(ctx context.Context, t test.Test, u *versionUpgradeTest) {
-		t.L().Printf("waiting for background tasks to finish")
-		cmvt.testFinished = true
-		<-cmvt.validatorFinished
+// setupValidator creates a CDC validator to validate that a changefeed
+// created on the target table is able to re-create the table
+// somewhere else. It can also verify changefeed ordering guarantees.
+func (cmvt *cdcMixedVersionTester) setupValidator(
+	ctx context.Context, l *logger.Logger, r *rand.Rand, h *mixedversion.Helper,
+) error {
+	tableName := fmt.Sprintf("%s.%s", targetDB, targetTable)
+	if err := h.Exec(r, "CREATE TABLE fprint (id INT PRIMARY KEY, balance INT, payload STRING)"); err != nil {
+		return err
+	}
+
+	// The fingerprint validator will save this db connection and use it
+	// when we submit rows for validation. This can be changed later using
+	// `(*FingerprintValidator) DBFunc`.
+	_, db := h.RandomDB(r, cmvt.crdbNodes)
+	fprintV, err := cdctest.NewFingerprintValidator(db, tableName, `fprint`,
+		cmvt.kafka.consumer.partitions, 0)
+	if err != nil {
+		return err
+	}
+	validators := cdctest.Validators{
+		cdctest.NewOrderValidator(tableName),
+		fprintV,
 	}
+
+	cmvt.validator = cdctest.MakeCountValidator(validators)
+	cmvt.fprintV = fprintV
+	return nil
 }
 
-// setupVerifier creates a CDC validator to validate that a changefeed
-// created on the `target` table is able to re-create the table
-// somewhere else. It also verifies CDC's ordering guarantees. This
-// step will not block, but will start the verifier in a separate Go
-// routine. Use `waitForVerifier` to wait for the verifier to finish.
-func (cmvt *cdcMixedVersionTester) setupVerifier(node int) versionStep {
-	return func(ctx context.Context, t test.Test, u *versionUpgradeTest) {
-		tableName := targetDB + "." + targetTable
-		t.Status(fmt.Sprintf("setting up changefeed verifier for table %s", tableName))
-
-		// we could just return the error here and let `Wait` return the
-		// error. However, calling t.Fatal directly lets us stop the test
-		// earlier
-		cmvt.monitor.Go(func(ctx context.Context) error {
-			defer close(cmvt.validatorFinished)
-			consumer, err := cmvt.kafka.consumer(ctx, targetTable)
+// runKafkaConsumer continuously reads from the kafka consumer and places
+// messages in a buffer. It expects to be run in a goroutine and does not
+// rely on any DB connections.
+func (cmvt *cdcMixedVersionTester) runKafkaConsumer(
+	ctx context.Context, l *logger.Logger, r *rand.Rand, h *mixedversion.Helper,
+) error {
+	everyN := log.Every(30 * time.Second)
+
+	// This runs until the test finishes, which will be signaled via
+	// context cancellation. We rely on consumer.Next() to check
+	// the context.
+	for {
+		m := cmvt.kafka.consumer.Next(ctx)
+		if m == nil {
+			// this is expected to happen once the test has finished and
+			// Kafka is being shut down. If it happens in the middle of
+			// the test, it will eventually time out, and this message
+			// should allow us to see that the validator finished
+			// earlier than it should have
+			l.Printf("end of changefeed")
+			return nil
+		}
+
+		// Forward resolved timestamps to "heartbeat" that the changefeed is running.
+		if len(m.Key) <= 0 {
+			_, resolved, err := cdctest.ParseJSONValueTimestamps(m.Value)
 			if err != nil {
-				t.Fatal(err)
+				return errors.Wrap(err, "failed to parse timestamps from message")
 			}
-			defer consumer.Close()
+			cmvt.timestampResolved(resolved)
 
-			db := u.conn(ctx, t, node)
-			if _, err := db.Exec(
-				"CREATE TABLE fprint (id INT PRIMARY KEY, balance INT, payload STRING)",
-			); err != nil {
-				t.Fatal(err)
+			if everyN.ShouldLog() {
+				l.Printf("latest resolved timestamp %s behind realtime", timeutil.Since(resolved.GoTime()).String())
 			}
+		}
 
-			getConn := func(node int) *gosql.DB { return u.conn(ctx, t, node) }
-			fprintV, err := cdctest.NewFingerprintValidator(db, tableName, `fprint`, consumer.partitions, 0)
-			if err != nil {
-				t.Fatal(err)
+		if err := func() error {
+			cmvt.kafka.mu.Lock()
+			defer cmvt.kafka.mu.Unlock()
+
+			if len(cmvt.kafka.mu.buf) == kafkaBufferMessageSize {
+				return errors.Newf("kafka message buffer limit %d exceeded", kafkaBufferMessageSize)
 			}
-			fprintV.DBFunc(cmvt.cdcDBConn(getConn))
-			validators := cdctest.Validators{
-				cdctest.NewOrderValidator(tableName),
-				fprintV,
+			cmvt.kafka.mu.buf = append(cmvt.kafka.mu.buf, m)
+			return nil
+		}(); err != nil {
+			return err
+		}
+	}
+}
+
+// validate reads rows from the kafka buffer and submits them to the validator
+// to be validated.
+func (cmvt *cdcMixedVersionTester) validate(
+	ctx context.Context, l *logger.Logger, r *rand.Rand, h *mixedversion.Helper,
+) error {
+	// Choose a random node to run the validation on.
+	n, db := h.RandomDB(r, cmvt.crdbNodes)
+	l.Printf("running validation on node %d", n)
+	cmvt.fprintV.DBFunc(func(f func(*gosql.DB) error) error {
+		return f(db)
+	})
+
+	cmvt.kafka.mu.Lock()
+	defer cmvt.kafka.mu.Unlock()
+
+	for _, m := range cmvt.kafka.mu.buf {
+		updated, resolved, err := cdctest.ParseJSONValueTimestamps(m.Value)
+		if err != nil {
+			return errors.Wrap(err, "failed to parse timestamps from message")
+		}
+
+		partitionStr := strconv.Itoa(int(m.Partition))
+		if len(m.Key) > 0 {
+			if err := cmvt.validator.NoteRow(partitionStr, string(m.Key), string(m.Value), updated); err != nil {
+				return err
 			}
-			cmvt.validator = cdctest.MakeCountValidator(validators)
-
-			for !cmvt.testFinished {
-				m := consumer.Next(ctx)
-				if m == nil {
-					// this is expected to happen once the test has finished and
-					// Kafka is being shut down. If it happens in the middle of
-					// the test, it will eventually time out, and this message
-					// should allow us to see that the validator finished
-					// earlier than it should have
-					t.L().Printf("end of changefeed")
-					return nil
-				}
-
-				updated, resolved, err := cdctest.ParseJSONValueTimestamps(m.Value)
-				if err != nil {
-					t.Fatal(err)
-				}
-
-				partitionStr := strconv.Itoa(int(m.Partition))
-				if len(m.Key) > 0 {
-					if err := cmvt.validator.NoteRow(partitionStr, string(m.Key), string(m.Value), updated); err != nil {
-						t.Fatal(err)
-					}
-				} else {
-					if err := cmvt.validator.NoteResolved(partitionStr, resolved); err != nil {
-						t.Fatal(err)
-					}
-
-					t.L().Printf("%d resolved timestamps validated, latest is %s behind realtime",
-						cmvt.validator.NumResolvedWithRows, timeutil.Since(resolved.GoTime()))
-					cmvt.timestampResolved()
-				}
+		} else {
+			if err := cmvt.validator.NoteResolved(partitionStr, resolved); err != nil {
+				return err
 			}
-			return nil
-		})
+			l.Printf("%d resolved timestamps validated", cmvt.validator.NumResolvedWithRows)
+		}
+
+		if failures := cmvt.validator.Failures(); len(failures) > 0 {
+			return errors.Newf("validator failures:\n%s", strings.Join(failures, "\n"))
+		}
 	}
+
+	cmvt.kafka.mu.buf = make([]*sarama.ConsumerMessage, 0, kafkaBufferMessageSize)
+	return nil
 }
 
 // timestampsResolved updates the underlying channel if set (i.e., if
 // we are waiting for resolved timestamps events)
-func (cmvt *cdcMixedVersionTester) timestampResolved() {
+func (cmvt *cdcMixedVersionTester) timestampResolved(resolved hlc.Timestamp) {
 	cmvt.timestampsResolved.Lock()
 	defer cmvt.timestampsResolved.Unlock()
 
 	if cmvt.timestampsResolved.C != nil {
-		cmvt.timestampsResolved.C <- struct{}{}
+		cmvt.timestampsResolved.C <- resolved
 	}
 }
 
-// cdcDBConn is the wrapper passed to the FingerprintValidator. The
-// goal is to ensure that database checks by the validator do not
-// happen while we are running an upgrade. We used to retry database
-// calls in the validator, but that logic adds complexity and does not
-// help in testing the changefeed's correctness
-func (cmvt *cdcMixedVersionTester) cdcDBConn(
-	getConn func(int) *gosql.DB,
-) func(func(*gosql.DB) error) error {
-	return func(f func(*gosql.DB) error) error {
-		cmvt.crdbUpgrading.Lock()
-		defer cmvt.crdbUpgrading.Unlock()
-
-		node := cmvt.crdbNodes.RandNode()[0]
-		return f(getConn(node))
+// createChangeFeed issues a call to the given node to create a change
+// feed for the target table.
+func (cmvt *cdcMixedVersionTester) createChangeFeed(
+	ctx context.Context, l *logger.Logger, r *rand.Rand, h *mixedversion.Helper,
+) error {
+	// Wait for workload to be initialized so the target table exists.
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-cmvt.workloadInit:
 	}
-}
-
-// crdbUpgradeStep is a wrapper to steps that upgrade the cockroach
-// binary running in the cluster. It makes sure we hold exclusive
-// access to the `crdbUpgrading` lock while the upgrade is in process
-func (cmvt *cdcMixedVersionTester) crdbUpgradeStep(step versionStep) versionStep {
-	return func(ctx context.Context, t test.Test, u *versionUpgradeTest) {
-		cmvt.crdbUpgrading.Lock()
-		defer cmvt.crdbUpgrading.Unlock()
+	node, db := h.RandomDB(r, cmvt.crdbNodes)
+	l.Printf("starting changefeed on node %d", node)
 
-		step(ctx, t, u)
+	options := map[string]string{
+		"updated":  "",
+		"resolved": fmt.Sprintf("'%s'", resolvedInterval),
 	}
-}
 
-// assertValid checks if the validator has found any issues at the
-// time the function is called.
-func (cmvt *cdcMixedVersionTester) assertValid() versionStep {
-	return func(ctx context.Context, t test.Test, u *versionUpgradeTest) {
-		if failures := cmvt.validator.Failures(); len(failures) > 0 {
-			t.Fatalf("validator failures:\n%s", strings.Join(failures, "\n"))
-		}
+	jobID, err := newChangefeedCreator(db, fmt.Sprintf("%s.%s", targetDB, targetTable),
+		cmvt.kafka.manager.sinkURL(ctx)).
+		With(options).
+		Create()
+	if err != nil {
+		return err
 	}
+	l.Printf("created changefeed job %d", jobID)
+	return nil
 }
 
-// createChangeFeed issues a call to the given node to create a change
-// feed for the target table.
-func (cmvt *cdcMixedVersionTester) createChangeFeed(node int) versionStep {
-	return func(ctx context.Context, t test.Test, u *versionUpgradeTest) {
-		t.Status("creating changefeed")
-		db := u.conn(ctx, t, node)
-
-		options := map[string]string{
-			"updated":  "",
-			"resolved": fmt.Sprintf("'%s'", resolvedInterval),
-		}
-		_, err := newChangefeedCreator(db, fmt.Sprintf("%s.%s", targetDB, targetTable), cmvt.kafka.sinkURL(ctx)).
-			With(options).
-			Create()
-		if err != nil {
-			t.Fatal(err)
-		}
-	}
+// runWorkloadCmd returns the command that runs the workload.
+func (cmvt *cdcMixedVersionTester) runWorkloadCmd(r *rand.Rand) *roachtestutil.Command {
+	return roachtestutil.NewCommand("%s workload run bank", test.DefaultCockroachPath).
+		// Since all rows are placed in a buffer, setting a low rate of 2 operations / sec
+		// helps ensure that we don't exceed the buffer capacity.
+		Flag("max-rate", 2).
+		Flag("seed", r.Int63()).
+		Arg("{pgurl%s}", cmvt.crdbNodes).
+		Option("tolerate-errors")
 }
 
-func runCDCMixedVersions(
-	ctx context.Context, t test.Test, c cluster.Cluster, buildVersion *version.Version,
-) {
-	predecessorVersion, err := release.LatestPredecessor(buildVersion)
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	tester := newCDCMixedVersionTester(ctx, t, c)
-	tester.StartKafka(t, c)
-	defer tester.Cleanup()
+// initWorkload synchronously initializes the workload.
+func (cmvt *cdcMixedVersionTester) initWorkload(
+	ctx context.Context, l *logger.Logger, r *rand.Rand, h *mixedversion.Helper,
+) error {
+	bankInit := roachtestutil.NewCommand("%s workload init bank", test.DefaultCockroachPath).
+		Flag("seed", r.Int63()).
+		Arg("{pgurl%s}", cmvt.crdbNodes)
 
-	rng, seed := randutil.NewPseudoRand()
-	t.L().Printf("random seed: %d", seed)
-
-	// sqlNode returns the node to be used when sending SQL statements
-	// during this test. It is randomized, but the random seed is logged
-	// above.
-	sqlNode := func() int {
-		return tester.crdbNodes[rng.Intn(len(tester.crdbNodes))]
+	if err := cmvt.c.RunE(ctx, option.NodeListOption{h.RandomNode(r, cmvt.workloadNodes)}, bankInit.String()); err != nil {
+		return err
 	}
+	close(cmvt.workloadInit)
+	return nil
+}
 
-	newVersionUpgradeTest(c,
-		uploadAndStartFromCheckpointFixture(tester.crdbNodes, predecessorVersion),
-		tester.setupVerifier(sqlNode()),
-		tester.installAndStartWorkload(),
-		waitForUpgradeStep(tester.crdbNodes),
-
-		// NB: at this point, cluster and binary version equal predecessorVersion,
-		// and auto-upgrades are on.
-		preventAutoUpgradeStep(sqlNode()),
-		tester.createChangeFeed(sqlNode()),
+func runCDCMixedVersions(ctx context.Context, t test.Test, c cluster.Cluster) {
 	tester := newCDCMixedVersionTester(ctx, t, c)
 
-		tester.waitForResolvedTimestamps(),
-		// Roll the nodes into the new version one by one in random order
-		tester.crdbUpgradeStep(binaryUpgradeStep(tester.crdbNodes, clusterupgrade.MainVersion)),
-		// let the workload run in the new version for a while
-		tester.waitForResolvedTimestamps(),
+	// NB: We rely on the testing framework to choose a random predecessor
+	// to upgrade from.
+	mvt := mixedversion.NewTest(ctx, t, t.L(), c, tester.crdbNodes)
 
-		tester.assertValid(),
+	cleanupKafka := tester.StartKafka(t, c)
+	defer cleanupKafka()
 
-		// Roll back again, which ought to be fine because the cluster upgrade was
-		// not finalized.
-		tester.crdbUpgradeStep(binaryUpgradeStep(tester.crdbNodes, predecessorVersion)),
-		tester.waitForResolvedTimestamps(),
+	// Register hooks.
+	mvt.OnStartup("start changefeed", tester.createChangeFeed)
+	mvt.OnStartup("create validator", tester.setupValidator)
+	mvt.OnStartup("init workload", tester.initWorkload)
 
-		tester.assertValid(),
+	runWorkloadCmd := tester.runWorkloadCmd(mvt.RNG())
+	_ = mvt.BackgroundCommand("run workload", tester.workloadNodes, runWorkloadCmd)
+	_ = mvt.BackgroundFunc("run kafka consumer", tester.runKafkaConsumer)
 
-		// Roll nodes forward and finalize upgrade.
-		tester.crdbUpgradeStep(binaryUpgradeStep(tester.crdbNodes, clusterupgrade.MainVersion)),
+	// NB: mvt.InMixedVersion will run these hooks multiple times at various points during the rolling upgrade, but
+	// not when any nodes are offline. This is important because the validator relies on a db connection.
+	mvt.InMixedVersion("wait and validate", tester.waitAndValidate)
 
-		// allow cluster version to update
-		allowAutoUpgradeStep(sqlNode()),
-		waitForUpgradeStep(tester.crdbNodes),
+	mvt.AfterUpgradeFinalized("wait and validate", tester.waitAndValidate)
 
-		tester.waitForResolvedTimestamps(),
-		tester.finishTest(),
-		tester.assertValid(),
-	).run(ctx, t)
+	mvt.Run()
 }
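The core pattern this patch introduces -- consume Kafka messages into a bounded in-memory buffer while nodes may be restarting, then drain the buffer into the validators only when a stable SQL connection is available -- can be read in isolation as the sketch below. This is illustrative only and not part of the patch: message, bufferedValidator, and note are hypothetical stand-ins for *sarama.ConsumerMessage and the cdctest NoteRow/NoteResolved calls.

package main

import (
	"errors"
	"fmt"
	"sync"
)

// message is a hypothetical stand-in for *sarama.ConsumerMessage.
type message struct {
	key, value string
}

// bufferLimit mirrors kafkaBufferMessageSize in the patch above.
const bufferLimit = 8192

// bufferedValidator accumulates messages without touching the database.
type bufferedValidator struct {
	mu  sync.Mutex
	buf []message
}

// add is called from the consumer goroutine; it is safe to run while
// nodes are offline because it only appends to the in-memory buffer.
func (b *bufferedValidator) add(m message) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	if len(b.buf) >= bufferLimit {
		return errors.New("message buffer limit exceeded")
	}
	b.buf = append(b.buf, m)
	return nil
}

// drain is called only when the cluster is stable; note stands in for
// the validator calls that need a live DB connection.
func (b *bufferedValidator) drain(note func(message) error) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	for _, m := range b.buf {
		if err := note(m); err != nil {
			return err
		}
	}
	b.buf = b.buf[:0]
	return nil
}

func main() {
	var bv bufferedValidator
	_ = bv.add(message{key: "1", value: `{"after": {"id": 1, "balance": 50}}`})
	if err := bv.drain(func(m message) error {
		fmt.Printf("validating key=%s value=%s\n", m.key, m.value)
		return nil
	}); err != nil {
		panic(err)
	}
}

Keeping the consumer side free of database work is what lets the background goroutine keep running across node restarts; only the drain step needs a live connection, which is why the test triggers validation from the InMixedVersion and AfterUpgradeFinalized hooks.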