From fc25ef039a1471719dedca090c482a778f011a9a Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Mon, 14 Aug 2023 17:35:14 -0400 Subject: [PATCH] roachtest/cdc/mixed-versions: use mixed version framework This change updates the `cdc/mixed-versions` roachtest to use the mixed version testing framework. This mixed version testing framework is better than the previous framework because it offers testing for multiple upgrades. It will also make it easier to maintain and expand this cdc-specific test. Informs: #107451 Epic: None Release note: None --- .../roachtestutil/mixedversion/BUILD.bazel | 1 + .../roachtestutil/mixedversion/helper.go | 23 + .../mixedversion/mixedversion.go | 10 + .../roachtestutil/mixedversion/runner.go | 10 + pkg/cmd/roachtest/tests/cdc.go | 4 +- pkg/cmd/roachtest/tests/mixed_version_cdc.go | 548 ++++++++++-------- 6 files changed, 339 insertions(+), 257 deletions(-) diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/BUILD.bazel b/pkg/cmd/roachtest/roachtestutil/mixedversion/BUILD.bazel index d8108eb91d90..3c71bc18a784 100644 --- a/pkg/cmd/roachtest/roachtestutil/mixedversion/BUILD.bazel +++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/BUILD.bazel @@ -25,6 +25,7 @@ go_library( "//pkg/util/syncutil", "//pkg/util/timeutil", "//pkg/util/version", + "@com_github_cockroachdb_errors//:errors", "@com_github_pkg_errors//:errors", ], ) diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/helper.go b/pkg/cmd/roachtest/roachtestutil/mixedversion/helper.go index 38e2c4990742..60f37597c6bc 100644 --- a/pkg/cmd/roachtest/roachtestutil/mixedversion/helper.go +++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/helper.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" "github.com/cockroachdb/cockroach/pkg/roachprod/logger" + "github.com/cockroachdb/errors" ) func (h *Helper) RandomNode(prng *rand.Rand, nodes option.NodeListOption) int { @@ -105,6 +106,28 @@ func (h *Helper) BackgroundCommand(cmd string, nodes option.NodeListOption) cont }) } +// ExpectNoRestarts acts as an assertion that ensures no restarts happen inside +// some critical section. Note that this does not block restarts, but will cause +// them to fail if they run concurrently to the critical section (eventually +// failing the test because this assertion failed). The expected usage is: +// +// done, err := h.ExpectNoRestarts() +// if err != nil { +// return err +// } +// defer done() +// { +// ... critical section +// } +func (h *Helper) ExpectNoRestarts() (done func(), err error) { + if !h.upgradeFlag.TryLock() { + return nil, errors.New("hook expected no restarts but a restart was detected") + } + return func() { + h.upgradeFlag.Unlock() + }, nil +} + // ExpectDeath alerts the testing infrastructure that a node is // expected to die. Regular restarts as part of the mixedversion // testing are already taken into account. This function should only diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/mixedversion.go b/pkg/cmd/roachtest/roachtestutil/mixedversion/mixedversion.go index f0e349468623..826587544b77 100644 --- a/pkg/cmd/roachtest/roachtestutil/mixedversion/mixedversion.go +++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/mixedversion.go @@ -654,6 +654,16 @@ func (s restartWithNewBinaryStep) Run( ctx context.Context, l *logger.Logger, c cluster.Cluster, h *Helper, ) error { h.ExpectDeath() + + // If a hook set this flag before we did, the hook expects there to + // be no upgrades while it is holding the lock. 
+	// Return an error to indicate this.
+	if !h.upgradeFlag.TryLock() {
+		return errors.New("attempted to restart when a hook expected" +
+			" no restarts to occur")
+	}
+	defer h.upgradeFlag.Unlock()
+
 	return clusterupgrade.RestartNodesWithNewBinary(
 		ctx,
 		s.rt,
diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/runner.go b/pkg/cmd/roachtest/roachtestutil/mixedversion/runner.go
index 4784bc509497..d634c2f67c05 100644
--- a/pkg/cmd/roachtest/roachtestutil/mixedversion/runner.go
+++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/runner.go
@@ -47,6 +47,16 @@ type (
 		bgCount    int64
 		runner     *testRunner
 		stepLogger *logger.Logger
+
+		// upgradeFlag is a mutex held during node restarts. It signals to a
+		// hook that a restart is taking place.
+		// The expected usage is:
+		//
+		// if !upgradeFlag.TryLock() {
+		//  return errors.New()
+		// }
+		// defer upgradeFlag.Unlock()
+		upgradeFlag sync.Mutex
 	}
 
 	// backgroundEvent is the struct sent by background steps when they
diff --git a/pkg/cmd/roachtest/tests/cdc.go b/pkg/cmd/roachtest/tests/cdc.go
index 5b26a7561311..b7a549aaafd5 100644
--- a/pkg/cmd/roachtest/tests/cdc.go
+++ b/pkg/cmd/roachtest/tests/cdc.go
@@ -633,7 +633,7 @@ func runCDCBank(ctx context.Context, t test.Test, c cluster.Cluster) {
 		t.Fatal(err)
 	}
 
-	tc, err := kafka.consumer(ctx, "bank")
+	tc, err := kafka.newConsumer(ctx, "bank")
 	if err != nil {
 		t.Fatal(errors.Wrap(err, "could not create kafka consumer"))
 	}
@@ -2234,7 +2234,7 @@ func (k kafkaManager) createTopic(ctx context.Context, topic string) error {
 	})
 }
 
-func (k kafkaManager) consumer(ctx context.Context, topic string) (*topicConsumer, error) {
+func (k kafkaManager) newConsumer(ctx context.Context, topic string) (*topicConsumer, error) {
 	kafkaAddrs := []string{k.consumerURL(ctx)}
 	config := sarama.NewConfig()
 	// I was seeing "error processing FetchRequest: kafka: error decoding
diff --git a/pkg/cmd/roachtest/tests/mixed_version_cdc.go b/pkg/cmd/roachtest/tests/mixed_version_cdc.go
index 25f6d72f5ca9..b1a769ebc1c8 100644
--- a/pkg/cmd/roachtest/tests/mixed_version_cdc.go
+++ b/pkg/cmd/roachtest/tests/mixed_version_cdc.go
@@ -14,23 +14,27 @@ import (
 	"context"
 	gosql "database/sql"
 	"fmt"
+	"math/rand"
 	"strconv"
 	"strings"
 	"time"
 
+	"github.com/Shopify/sarama"
 	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
-	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/clusterupgrade"
+	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil"
+	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/mixedversion"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
+	"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
 	"github.com/cockroachdb/cockroach/pkg/roachprod/vm"
-	"github.com/cockroachdb/cockroach/pkg/testutils/release"
-	"github.com/cockroachdb/cockroach/pkg/util/randutil"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
-	"github.com/cockroachdb/cockroach/pkg/util/version"
+	"github.com/cockroachdb/errors"
 )
 
 const (
@@ -40,7 +44,11 @@ const (
 
 	// resolvedInterval is the value passed to the `resolved` option
 	// when creating the changefeed
-	resolvedInterval = "10s"
+	resolvedInterval = "5s"
+
+	// kafkaBufferMessageSize is the number of messages from kafka
+	// we allow to be buffered in memory before validating them.
+	kafkaBufferMessageSize = 8192
 )
 
 var (
@@ -74,7 +82,7 @@ func registerCDCMixedVersions(r registry.Registry) {
 		Timeout:         timeout,
 		RequiresLicense: true,
 		Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
-			runCDCMixedVersions(ctx, t, c, t.BuildVersion())
+			runCDCMixedVersions(ctx, t, c)
 		},
 	})
 }
@@ -84,21 +92,43 @@ func registerCDCMixedVersions(r registry.Registry) {
 // events, and run validations that ensure changefeeds work during and
 // after upgrade.
 type cdcMixedVersionTester struct {
-	ctx           context.Context
-	crdbNodes     option.NodeListOption
-	workloadNodes option.NodeListOption
-	kafkaNodes    option.NodeListOption
-	monitor       cluster.Monitor
+	ctx context.Context
+
+	c cluster.Cluster
+
+	crdbNodes     option.NodeListOption
+	workloadNodes option.NodeListOption
+	kafkaNodes    option.NodeListOption
+
+	// This channel is closed after the workload has been initialized.
+	workloadInit chan struct{}
+
 	timestampsResolved struct {
+		latest hlc.Timestamp
 		syncutil.Mutex
-		C chan struct{}
+		C chan hlc.Timestamp
+	}
+
+	kafka struct {
+		manager kafkaManager
+
+		// buf is used to buffer messages received from kafka until they are
+		// sent to the validator when the cluster is stable (i.e., no node restarts in progress).
+		//
+		// Typically, messages would be forwarded to the validator as soon as they
+		// are received, but we cannot do that here because
+		// some nodes may be offline/upgrading at the time (the validator
+		// relies on having a stable DB connection to note the rows and fingerprint).
+		mu struct {
+			syncutil.Mutex
+			buf []*sarama.ConsumerMessage
+		}
+
+		consumer *topicConsumer
 	}
-	crdbUpgrading     syncutil.Mutex
-	kafka             kafkaManager
-	validator         *cdctest.CountValidator
-	testFinished      bool
-	validatorFinished chan struct{}
-	cleanup           func()
+
+	validator *cdctest.CountValidator
+	fprintV   *cdctest.FingerprintValidator
 }
 
 func newCDCMixedVersionTester(
@@ -111,304 +141,312 @@ func newCDCMixedVersionTester(
 	c.Put(ctx, t.DeprecatedWorkload(), "./workload", lastNode)
 
 	return cdcMixedVersionTester{
-		ctx:               ctx,
-		crdbNodes:         crdbNodes,
-		workloadNodes:     lastNode,
-		kafkaNodes:        lastNode,
-		monitor:           c.NewMonitor(ctx, crdbNodes),
-		validatorFinished: make(chan struct{}),
+		ctx:           ctx,
+		c:             c,
+		crdbNodes:     crdbNodes,
+		workloadNodes: lastNode,
+		kafkaNodes:    lastNode,
+		workloadInit:  make(chan struct{}),
 	}
 }
 
 // StartKafka will install and start Kafka on the configured node. It
-// will also create the topic where changefeed events will be sent.
-func (cmvt *cdcMixedVersionTester) StartKafka(t test.Test, c cluster.Cluster) {
+// will also create the topic where changefeed events will be sent, as well as a consumer for it.
+// It returns a function that cleans up the consumer and the Kafka resources.
+func (cmvt *cdcMixedVersionTester) StartKafka(t test.Test, c cluster.Cluster) (cleanup func()) { t.Status("starting Kafka node") - cmvt.kafka, cmvt.cleanup = setupKafka(cmvt.ctx, t, c, cmvt.kafkaNodes) - if err := cmvt.kafka.createTopic(cmvt.ctx, targetTable); err != nil { + + manager, tearDown := setupKafka(cmvt.ctx, t, c, cmvt.kafkaNodes) + if err := manager.createTopic(cmvt.ctx, targetTable); err != nil { t.Fatal(err) } -} + cmvt.kafka.manager = manager -// Cleanup is supposed to be called at the end of tests that use -// `cdcMixedVersionTester` -func (cmvt *cdcMixedVersionTester) Cleanup() { - if cmvt.cleanup != nil { - cmvt.cleanup() + consumer, err := cmvt.kafka.manager.newConsumer(cmvt.ctx, targetTable) + if err != nil { + t.Fatal(err) } -} -// installAndStartWorkload starts a bank workload asynchronously -func (cmvt *cdcMixedVersionTester) installAndStartWorkload() versionStep { - return func(ctx context.Context, t test.Test, u *versionUpgradeTest) { - t.Status("installing and running workload") - u.c.Run(ctx, cmvt.workloadNodes, "./workload init bank {pgurl:1}") - cmvt.monitor.Go(func(ctx context.Context) error { - return u.c.RunE( - ctx, - cmvt.workloadNodes, - fmt.Sprintf("./workload run bank {pgurl%s} --max-rate=10 --tolerate-errors", - cmvt.crdbNodes), - ) - }) + cmvt.kafka.manager = manager + cmvt.kafka.mu.buf = make([]*sarama.ConsumerMessage, 0, kafkaBufferMessageSize) + cmvt.kafka.consumer = consumer + + return func() { + cmvt.kafka.consumer.Close() + tearDown() } } -// waitForResolvedTimestamps waits for the underlying CDC verifier to -// resolve `resolvedTimestampsPerState` -func (cmvt *cdcMixedVersionTester) waitForResolvedTimestamps() versionStep { - return func(ctx context.Context, t test.Test, u *versionUpgradeTest) { - t.Status(fmt.Sprintf("waiting for %d resolved timestamps", resolvedTimestampsPerState)) - // create a new channel for the resolved timestamps, allowing any - // new resolved timestamps to be captured and account for in the - // loop below - func() { - cmvt.timestampsResolved.Lock() - defer cmvt.timestampsResolved.Unlock() - - cmvt.timestampsResolved.C = make(chan struct{}) - }() - - var resolved int - for resolved < resolvedTimestampsPerState { - select { - case <-cmvt.timestampsResolved.C: - resolved++ - t.L().Printf("%d of %d timestamps resolved", resolved, resolvedTimestampsPerState) - case <-ctx.Done(): - return +// waitForResolvedTimestamps waits for a few resolved timestamps to assert +// that the changefeed is running. 
+func (cmvt *cdcMixedVersionTester) waitForResolvedTimestamps( + ctx context.Context, l *logger.Logger, r *rand.Rand, h *mixedversion.Helper, +) error { + l.Printf("waiting for %d resolved timestamps", resolvedTimestampsPerState) + // create a new channel for the resolved timestamps, allowing any + // new resolved timestamps to be captured and account for in the + // loop below + func() { + cmvt.timestampsResolved.Lock() + defer cmvt.timestampsResolved.Unlock() + + cmvt.timestampsResolved.C = make(chan hlc.Timestamp) + }() + + var numResolved int + for numResolved < resolvedTimestampsPerState { + select { + case resolvedTs := <-cmvt.timestampsResolved.C: + if !cmvt.timestampsResolved.latest.Less(resolvedTs) { + return errors.Newf("expected resolved timestamp %s to be greater than previous "+ + " resolved timestamp %s", resolvedTs, cmvt.timestampsResolved.latest) } + cmvt.timestampsResolved.latest = resolvedTs + numResolved++ + l.Printf("%d of %d timestamps resolved", numResolved, resolvedTimestampsPerState) + case <-ctx.Done(): + return ctx.Err() } + } - // set the resolved timestamps channel back to `nil`; while in - // this state, any new resolved timestamps will be ignored - func() { - cmvt.timestampsResolved.Lock() - defer cmvt.timestampsResolved.Unlock() + // set the resolved timestamps channel back to `nil`; while in + // this state, any new resolved timestamps will be ignored + func() { + cmvt.timestampsResolved.Lock() + defer cmvt.timestampsResolved.Unlock() - cmvt.timestampsResolved.C = nil - }() - } + cmvt.timestampsResolved.C = nil + }() + return nil } -// finishTest marks the test as finished which will then prompt the -// changefeed validator loop to return. This step will block until the -// validator has finished; this is done to avoid the possibility of -// the test closing the database connection while the validator is -// running a query. -func (cmvt *cdcMixedVersionTester) finishTest() versionStep { - return func(ctx context.Context, t test.Test, u *versionUpgradeTest) { - t.L().Printf("waiting for background tasks to finish") - cmvt.testFinished = true - <-cmvt.validatorFinished +// setupValidator creates a CDC validator to validate that a changefeed +// created on the target table is able to re-create the table +// somewhere else. It can also verify changefeed ordering guarantees. +func (cmvt *cdcMixedVersionTester) setupValidator( + ctx context.Context, l *logger.Logger, r *rand.Rand, h *mixedversion.Helper, +) error { + tableName := fmt.Sprintf("%s.%s", targetDB, targetTable) + if err := h.Exec(r, "CREATE TABLE fprint (id INT PRIMARY KEY, balance INT, payload STRING)"); err != nil { + return err } + + // The fingerprint validator will save this db connection and use it + // when we submit rows for validation. This can be changed later using + // `(*FingerprintValidator) DBFunc`. + _, db := h.RandomDB(r, cmvt.crdbNodes) + fprintV, err := cdctest.NewFingerprintValidator(db, tableName, `fprint`, + cmvt.kafka.consumer.partitions, 0) + if err != nil { + return err + } + validators := cdctest.Validators{ + cdctest.NewOrderValidator(tableName), + fprintV, + } + + cmvt.validator = cdctest.MakeCountValidator(validators) + cmvt.fprintV = fprintV + return nil } -// setupVerifier creates a CDC validator to validate that a changefeed -// created on the `target` table is able to re-create the table -// somewhere else. It also verifies CDC's ordering guarantees. This -// step will not block, but will start the verifier in a separate Go -// routine. 
-// Use `waitForVerifier` to wait for the verifier to finish.
-func (cmvt *cdcMixedVersionTester) setupVerifier(node int) versionStep {
-	return func(ctx context.Context, t test.Test, u *versionUpgradeTest) {
-		tableName := targetDB + "." + targetTable
-		t.Status(fmt.Sprintf("setting up changefeed verifier for table %s", tableName))
-
-		// we could just return the error here and let `Wait` return the
-		// error. However, calling t.Fatal directly lets us stop the test
-		// earlier
-		cmvt.monitor.Go(func(ctx context.Context) error {
-			defer close(cmvt.validatorFinished)
-			consumer, err := cmvt.kafka.consumer(ctx, targetTable)
+// runKafkaConsumer continuously reads from the kafka consumer and places
+// messages in a buffer. It expects to be run in a goroutine and does not
+// rely on any DB connections.
+func (cmvt *cdcMixedVersionTester) runKafkaConsumer(
+	ctx context.Context, l *logger.Logger, r *rand.Rand, h *mixedversion.Helper,
+) error {
+	everyN := log.Every(30 * time.Second)
+
+	// This runs until the test finishes, which will be signaled via
+	// context cancellation. We rely on consumer.Next() to check
+	// the context.
+	for {
+		m := cmvt.kafka.consumer.Next(ctx)
+		if m == nil {
+			// this is expected to happen once the test has finished and
+			// Kafka is being shut down. If it happens in the middle of
+			// the test, it will eventually time out, and this message
+			// should allow us to see that the validator finished
+			// earlier than it should have
+			l.Printf("end of changefeed")
+			return nil
+		}
+
+		// Forward resolved timestamps to "heartbeat" that the changefeed is running.
+		if len(m.Key) <= 0 {
+			_, resolved, err := cdctest.ParseJSONValueTimestamps(m.Value)
 			if err != nil {
-				t.Fatal(err)
+				return err
 			}
-			defer consumer.Close()
+			cmvt.timestampResolved(resolved)
 
-			db := u.conn(ctx, t, node)
-			if _, err := db.Exec(
-				"CREATE TABLE fprint (id INT PRIMARY KEY, balance INT, payload STRING)",
-			); err != nil {
-				t.Fatal(err)
+			if everyN.ShouldLog() {
+				l.Printf("latest resolved timestamp %s behind realtime", timeutil.Since(resolved.GoTime()).String())
 			}
+		}
 
-			getConn := func(node int) *gosql.DB { return u.conn(ctx, t, node) }
-			fprintV, err := cdctest.NewFingerprintValidator(db, tableName, `fprint`, consumer.partitions, 0)
-			if err != nil {
-				t.Fatal(err)
+		if err := func() error {
+			cmvt.kafka.mu.Lock()
+			defer cmvt.kafka.mu.Unlock()
+
+			if len(cmvt.kafka.mu.buf) == kafkaBufferMessageSize {
+				return errors.Newf("kafka message buffer limit %d exceeded", kafkaBufferMessageSize)
 			}
-			fprintV.DBFunc(cmvt.cdcDBConn(getConn))
-			validators := cdctest.Validators{
-				cdctest.NewOrderValidator(tableName),
-				fprintV,
+			cmvt.kafka.mu.buf = append(cmvt.kafka.mu.buf, m)
+			return nil
+		}(); err != nil {
+			return err
+		}
+	}
+}
+
+// validate reads rows from the kafka buffer and submits them to the validator
+// to be validated.
+func (cmvt *cdcMixedVersionTester) validate(
+	ctx context.Context, l *logger.Logger, r *rand.Rand, h *mixedversion.Helper,
+) error {
+	// Assert that no restarts take place during validation because the fingerprint validator relies on a db
+	// connection. We expect no restarts because `InMixedVersion` hooks run synchronously between node restarts.
+	reset, err := h.ExpectNoRestarts()
+	if err != nil {
+		return err
+	}
+	defer reset()
+
+	// Choose a random node to run the validation on.
+ n, db := h.RandomDB(r, cmvt.crdbNodes) + l.Printf("running validation on node %d", n) + cmvt.fprintV.DBFunc(func(f func(*gosql.DB) error) error { + return f(db) + }) + + cmvt.kafka.mu.Lock() + defer cmvt.kafka.mu.Unlock() + + for _, m := range cmvt.kafka.mu.buf { + updated, resolved, err := cdctest.ParseJSONValueTimestamps(m.Value) + if err != nil { + return err + } + + partitionStr := strconv.Itoa(int(m.Partition)) + if len(m.Key) > 0 { + if err := cmvt.validator.NoteRow(partitionStr, string(m.Key), string(m.Value), updated); err != nil { + return err } - cmvt.validator = cdctest.MakeCountValidator(validators) - - for !cmvt.testFinished { - m := consumer.Next(ctx) - if m == nil { - // this is expected to happen once the test has finished and - // Kafka is being shut down. If it happens in the middle of - // the test, it will eventually time out, and this message - // should allow us to see that the validator finished - // earlier than it should have - t.L().Printf("end of changefeed") - return nil - } - - updated, resolved, err := cdctest.ParseJSONValueTimestamps(m.Value) - if err != nil { - t.Fatal(err) - } - - partitionStr := strconv.Itoa(int(m.Partition)) - if len(m.Key) > 0 { - if err := cmvt.validator.NoteRow(partitionStr, string(m.Key), string(m.Value), updated); err != nil { - t.Fatal(err) - } - } else { - if err := cmvt.validator.NoteResolved(partitionStr, resolved); err != nil { - t.Fatal(err) - } - - t.L().Printf("%d resolved timestamps validated, latest is %s behind realtime", - cmvt.validator.NumResolvedWithRows, timeutil.Since(resolved.GoTime())) - cmvt.timestampResolved() - } + } else { + if err := cmvt.validator.NoteResolved(partitionStr, resolved); err != nil { + return err } - return nil - }) + l.Printf("%d resolved timestamps validated", cmvt.validator.NumResolvedWithRows) + } + + if failures := cmvt.validator.Failures(); len(failures) > 0 { + return errors.Newf("validator failures:\n%s", strings.Join(failures, "\n")) + } } + + cmvt.kafka.mu.buf = make([]*sarama.ConsumerMessage, 0, kafkaBufferMessageSize) + return nil } // timestampsResolved updates the underlying channel if set (i.e., if // we are waiting for resolved timestamps events) -func (cmvt *cdcMixedVersionTester) timestampResolved() { +func (cmvt *cdcMixedVersionTester) timestampResolved(resolved hlc.Timestamp) { cmvt.timestampsResolved.Lock() defer cmvt.timestampsResolved.Unlock() if cmvt.timestampsResolved.C != nil { - cmvt.timestampsResolved.C <- struct{}{} + cmvt.timestampsResolved.C <- resolved } } -// cdcDBConn is the wrapper passed to the FingerprintValidator. The -// goal is to ensure that database checks by the validator do not -// happen while we are running an upgrade. We used to retry database -// calls in the validator, but that logic adds complexity and does not -// help in testing the changefeed's correctness -func (cmvt *cdcMixedVersionTester) cdcDBConn( - getConn func(int) *gosql.DB, -) func(func(*gosql.DB) error) error { - return func(f func(*gosql.DB) error) error { - cmvt.crdbUpgrading.Lock() - defer cmvt.crdbUpgrading.Unlock() - - node := cmvt.crdbNodes.RandNode()[0] - return f(getConn(node)) +// createChangeFeed issues a call to the given node to create a change +// feed for the target table. +func (cmvt *cdcMixedVersionTester) createChangeFeed( + ctx context.Context, l *logger.Logger, r *rand.Rand, h *mixedversion.Helper, +) error { + // Wait for workload to be initialized so the target table exists. 
+ select { + case <-ctx.Done(): + return ctx.Err() + case <-cmvt.workloadInit: } -} + node, db := h.RandomDB(r, cmvt.crdbNodes) + l.Printf("starting changefeed on node %d", node) -// crdbUpgradeStep is a wrapper to steps that upgrade the cockroach -// binary running in the cluster. It makes sure we hold exclusive -// access to the `crdbUpgrading` lock while the upgrade is in process -func (cmvt *cdcMixedVersionTester) crdbUpgradeStep(step versionStep) versionStep { - return func(ctx context.Context, t test.Test, u *versionUpgradeTest) { - cmvt.crdbUpgrading.Lock() - defer cmvt.crdbUpgrading.Unlock() - - step(ctx, t, u) + options := map[string]string{ + "updated": "", + "resolved": fmt.Sprintf("'%s'", resolvedInterval), } -} -// assertValid checks if the validator has found any issues at the -// time the function is called. -func (cmvt *cdcMixedVersionTester) assertValid() versionStep { - return func(ctx context.Context, t test.Test, u *versionUpgradeTest) { - if failures := cmvt.validator.Failures(); len(failures) > 0 { - t.Fatalf("validator failures:\n%s", strings.Join(failures, "\n")) - } + jobID, err := newChangefeedCreator(db, fmt.Sprintf("%s.%s", targetDB, targetTable), + cmvt.kafka.manager.sinkURL(ctx)). + With(options). + Create() + if err != nil { + return err } + l.Printf("created changefeed job %d", jobID) + return nil } -// createChangeFeed issues a call to the given node to create a change -// feed for the target table. -func (cmvt *cdcMixedVersionTester) createChangeFeed(node int) versionStep { - return func(ctx context.Context, t test.Test, u *versionUpgradeTest) { - t.Status("creating changefeed") - db := u.conn(ctx, t, node) - - options := map[string]string{ - "updated": "", - "resolved": fmt.Sprintf("'%s'", resolvedInterval), - } - _, err := newChangefeedCreator(db, fmt.Sprintf("%s.%s", targetDB, targetTable), cmvt.kafka.sinkURL(ctx)). - With(options). - Create() - if err != nil { - t.Fatal(err) - } +// runWorkload runs the bank workload in the background. +func (cmvt *cdcMixedVersionTester) runWorkload( + ctx context.Context, l *logger.Logger, r *rand.Rand, h *mixedversion.Helper, +) error { + bankRun := roachtestutil.NewCommand("%s workload run bank", test.DefaultCockroachPath). + Flag("max-rate", 10). + Arg("{pgurl%s}", cmvt.crdbNodes). + Option("tolerate-errors") + + if err := cmvt.c.RunE(ctx, option.NodeListOption{h.RandomNode(r, cmvt.crdbNodes)}, bankRun.String()); err != nil { + return err } + return nil } -func runCDCMixedVersions( - ctx context.Context, t test.Test, c cluster.Cluster, buildVersion *version.Version, -) { - predecessorVersion, err := release.LatestPredecessor(buildVersion) - if err != nil { - t.Fatal(err) - } - - tester := newCDCMixedVersionTester(ctx, t, c) - tester.StartKafka(t, c) - defer tester.Cleanup() +// initWorkload synchronously initializes the workload. +func (cmvt *cdcMixedVersionTester) initWorkload( + ctx context.Context, l *logger.Logger, r *rand.Rand, h *mixedversion.Helper, +) error { + bankInit := roachtestutil.NewCommand("%s workload init bank", test.DefaultCockroachPath). + Arg("{pgurl:1}") - rng, seed := randutil.NewPseudoRand() - t.L().Printf("random seed: %d", seed) - - // sqlNode returns the node to be used when sending SQL statements - // during this test. It is randomized, but the random seed is logged - // above. 
-	sqlNode := func() int {
-		return tester.crdbNodes[rng.Intn(len(tester.crdbNodes))]
+	if err := cmvt.c.RunE(ctx, option.NodeListOption{h.RandomNode(r, cmvt.crdbNodes)}, bankInit.String()); err != nil {
+		return err
 	}
+	close(cmvt.workloadInit)
+	return nil
+}
 
-	newVersionUpgradeTest(c,
-		uploadAndStartFromCheckpointFixture(tester.crdbNodes, predecessorVersion),
-		tester.setupVerifier(sqlNode()),
-		tester.installAndStartWorkload(),
-		waitForUpgradeStep(tester.crdbNodes),
-
-		// NB: at this point, cluster and binary version equal predecessorVersion,
-		// and auto-upgrades are on.
-		preventAutoUpgradeStep(sqlNode()),
-		tester.createChangeFeed(sqlNode()),
+func runCDCMixedVersions(ctx context.Context, t test.Test, c cluster.Cluster) {
+	tester := newCDCMixedVersionTester(ctx, t, c)
 
-		tester.waitForResolvedTimestamps(),
-		// Roll the nodes into the new version one by one in random order
-		tester.crdbUpgradeStep(binaryUpgradeStep(tester.crdbNodes, clusterupgrade.MainVersion)),
-		// let the workload run in the new version for a while
-		tester.waitForResolvedTimestamps(),
+	// NB: We rely on the testing framework to choose a random predecessor
+	// to upgrade from.
+	mvt := mixedversion.NewTest(ctx, t, t.L(), c, tester.crdbNodes)
 
-		tester.assertValid(),
+	cleanupKafka := tester.StartKafka(t, c)
+	defer cleanupKafka()
 
-		// Roll back again, which ought to be fine because the cluster upgrade was
-		// not finalized.
-		tester.crdbUpgradeStep(binaryUpgradeStep(tester.crdbNodes, predecessorVersion)),
-		tester.waitForResolvedTimestamps(),
+	// Register hooks.
+	mvt.OnStartup("start changefeed", tester.createChangeFeed)
+	mvt.OnStartup("create validator", tester.setupValidator)
+	mvt.OnStartup("init workload", tester.initWorkload)
 
-		tester.assertValid(),
+	_ = mvt.BackgroundFunc("run workload", tester.runWorkload)
+	_ = mvt.BackgroundFunc("run kafka consumer", tester.runKafkaConsumer)
 
-		// Roll nodes forward and finalize upgrade.
-		tester.crdbUpgradeStep(binaryUpgradeStep(tester.crdbNodes, clusterupgrade.MainVersion)),
+	// NB: mvt.InMixedVersion will run these hooks multiple times at various points during the rolling
+	// upgrade, but never while a node is offline. This is important because the validator relies on a db connection.
+	mvt.InMixedVersion("wait for resolved timestamps", tester.waitForResolvedTimestamps)
+	mvt.InMixedVersion("validate messages", tester.validate)
 
-		// allow cluster version to update
-		allowAutoUpgradeStep(sqlNode()),
-		waitForUpgradeStep(tester.crdbNodes),
+	mvt.AfterUpgradeFinalized("wait for resolved timestamps", tester.waitForResolvedTimestamps)
+	mvt.AfterUpgradeFinalized("validate messages", tester.validate)
 
-		tester.waitForResolvedTimestamps(),
-		tester.finishTest(),
-		tester.assertValid(),
-	).run(ctx, t)
+	mvt.Run()
 }