From 3b0a059e111361efd63f1b6d151f171bda4a277f Mon Sep 17 00:00:00 2001
From: Jayant Shrivastava
Date: Mon, 14 Aug 2023 17:35:14 -0400
Subject: [PATCH] roachtest/cdc/mixed-versions: use mixed version framework

This change updates the `cdc/mixed-versions` roachtest to use the
mixed-version testing framework. The new framework improves on the
previous one by exercising multiple upgrades in a single run, and it
makes this cdc-specific test easier to maintain and extend.

Informs: #107451
Epic: None
Release note: None
---
 .../roachtestutil/mixedversion/helper.go      |   6 +
 .../mixedversion/mixedversion.go              |  20 +-
 pkg/cmd/roachtest/tests/cdc.go                |   4 +-
 pkg/cmd/roachtest/tests/mixed_version_cdc.go  | 487 ++++++++++--------
 4 files changed, 277 insertions(+), 240 deletions(-)

diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/helper.go b/pkg/cmd/roachtest/roachtestutil/mixedversion/helper.go
index 38e2c4990742..90eb9a71ab75 100644
--- a/pkg/cmd/roachtest/roachtestutil/mixedversion/helper.go
+++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/helper.go
@@ -105,6 +105,12 @@ func (h *Helper) BackgroundCommand(cmd string, nodes option.NodeListOption) cont
 	})
 }
 
+// Command runs the command on the provided nodes.
+func (h *Helper) Command(cmd string, nodes option.NodeListOption) error {
+	h.stepLogger.Printf("running command `%s` on nodes %v", cmd, nodes)
+	return h.runner.cluster.RunE(h.ctx, nodes, cmd)
+}
+
 // ExpectDeath alerts the testing infrastructure that a node is
 // expected to die. Regular restarts as part of the mixedversion
 // testing are already taken into account. This function should only
diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/mixedversion.go b/pkg/cmd/roachtest/roachtestutil/mixedversion/mixedversion.go
index f0e349468623..aa561eaf5f23 100644
--- a/pkg/cmd/roachtest/roachtestutil/mixedversion/mixedversion.go
+++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/mixedversion.go
@@ -158,7 +158,7 @@ type (
 		Finalizing bool
 	}
 
-	// userFunc is the signature for user-provided functions that run at
+	// UserFunc is the signature for user-provided functions that run at
 	// various points in the test (synchronously or in the background).
 	// These functions run on the test runner node itself; i.e., any
 	// commands they wish to execute on the cluster need to go through
@@ -167,7 +167,7 @@ type (
 	// error over calling `t.Fatal` directly. The error is handled by
 	// the mixedversion framework and better error messages are produced
 	// as a result.
-	userFunc      func(context.Context, *logger.Logger, *rand.Rand, *Helper) error
+	UserFunc      func(context.Context, *logger.Logger, *rand.Rand, *Helper) error
 	predicateFunc func(Context) bool
 
 	// versionUpgradeHook is a hook that can be called at any time
@@ -179,7 +179,7 @@ type (
 	versionUpgradeHook struct {
 		name      string
 		predicate predicateFunc
-		fn        userFunc
+		fn        UserFunc
 	}
 
 	// testStep is an opaque reference to one step of a mixed-version
@@ -359,7 +359,7 @@ func (t *Test) RNG() *rand.Rand {
 // will be tested in arbitrary mixed-version states. If multiple
 // InMixedVersion hooks are passed, they will be executed
 // concurrently.
-func (t *Test) InMixedVersion(desc string, fn userFunc) {
+func (t *Test) InMixedVersion(desc string, fn UserFunc) {
 	lastFromVersion := "invalid-version"
 	var numUpgradedNodes int
 	predicate := func(testContext Context) bool {
@@ -389,7 +389,7 @@ func (t *Test) InMixedVersion(desc string, fn userFunc) {
 // certain previous version, potentially from existing fixtures). If
 // multiple OnStartup hooks are passed, they will be executed
 // concurrently.
-func (t *Test) OnStartup(desc string, fn userFunc) {
+func (t *Test) OnStartup(desc string, fn UserFunc) {
 	// Since the callbacks here are only referenced in the setup steps
 	// of the planner, there is no need to have a predicate function
 	// gating them.
@@ -400,20 +400,20 @@ func (t *Test) OnStartup(desc string, fn userFunc) {
 // mixed-version test has brought the cluster to the latest version,
 // and allowed the upgrade to finalize successfully. If multiple such
 // hooks are passed, they will be executed concurrently.
-func (t *Test) AfterUpgradeFinalized(desc string, fn userFunc) {
+func (t *Test) AfterUpgradeFinalized(desc string, fn UserFunc) {
 	t.hooks.AddAfterUpgradeFinalized(versionUpgradeHook{name: desc, fn: fn})
 }
 
 // BackgroundFunc runs the function passed as argument in the
 // background during the test. Background functions are kicked off
 // once the cluster has been initialized (i.e., after all startup
-// steps have finished). If the `userFunc` returns an error, it will
+// steps have finished). If the `UserFunc` returns an error, it will
 // cause the test to fail. These functions can run indefinitely but
 // should respect the context passed to them, which will be canceled
 // when the test terminates (successfully or not). Returns a function
 // that can be called to terminate the step, which will cancel the
-// context passed to `userFunc`.
-func (t *Test) BackgroundFunc(desc string, fn userFunc) StopFunc {
+// context passed to `UserFunc`.
+func (t *Test) BackgroundFunc(desc string, fn UserFunc) StopFunc {
 	t.hooks.AddBackground(versionUpgradeHook{name: desc, fn: fn})
 
 	ch := make(shouldStop)
@@ -512,7 +512,7 @@ func (t *Test) buildVersion() *version.Version {
 	return t.rt.BuildVersion()
 }
 
-func (t *Test) runCommandFunc(nodes option.NodeListOption, cmd string) userFunc {
+func (t *Test) runCommandFunc(nodes option.NodeListOption, cmd string) UserFunc {
 	return func(ctx context.Context, l *logger.Logger, rng *rand.Rand, h *Helper) error {
 		l.Printf("running command `%s` on nodes %v", cmd, nodes)
 		return t.cluster.RunE(ctx, nodes, cmd)
diff --git a/pkg/cmd/roachtest/tests/cdc.go b/pkg/cmd/roachtest/tests/cdc.go
index 5b26a7561311..b7a549aaafd5 100644
--- a/pkg/cmd/roachtest/tests/cdc.go
+++ b/pkg/cmd/roachtest/tests/cdc.go
@@ -633,7 +633,7 @@ func runCDCBank(ctx context.Context, t test.Test, c cluster.Cluster) {
 		t.Fatal(err)
 	}
 
-	tc, err := kafka.consumer(ctx, "bank")
+	tc, err := kafka.newConsumer(ctx, "bank")
 	if err != nil {
 		t.Fatal(errors.Wrap(err, "could not create kafka consumer"))
 	}
@@ -2234,7 +2234,7 @@ func (k kafkaManager) createTopic(ctx context.Context, topic string) error {
 	})
 }
 
-func (k kafkaManager) consumer(ctx context.Context, topic string) (*topicConsumer, error) {
+func (k kafkaManager) newConsumer(ctx context.Context, topic string) (*topicConsumer, error) {
 	kafkaAddrs := []string{k.consumerURL(ctx)}
 	config := sarama.NewConfig()
 	// I was seeing "error processing FetchRequest: kafka: error decoding
diff --git a/pkg/cmd/roachtest/tests/mixed_version_cdc.go b/pkg/cmd/roachtest/tests/mixed_version_cdc.go
index 25f6d72f5ca9..96c5e8fbc925 100644
--- a/pkg/cmd/roachtest/tests/mixed_version_cdc.go
+++ b/pkg/cmd/roachtest/tests/mixed_version_cdc.go
@@ -12,25 +12,27 @@ package tests
 
 import (
 	"context"
-	gosql "database/sql"
 	"fmt"
+	"math/rand"
 	"strconv"
 	"strings"
 	"time"
 
+	"github.com/Shopify/sarama"
 	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry" - "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/clusterupgrade" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/mixedversion" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" + "github.com/cockroachdb/cockroach/pkg/roachprod/logger" "github.com/cockroachdb/cockroach/pkg/roachprod/vm" - "github.com/cockroachdb/cockroach/pkg/testutils/release" - "github.com/cockroachdb/cockroach/pkg/util/randutil" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" - "github.com/cockroachdb/cockroach/pkg/util/version" + "github.com/cockroachdb/errors" ) const ( @@ -40,7 +42,11 @@ const ( // resolvedInterval is the value passed to the `resolved` option // when creating the changefeed - resolvedInterval = "10s" + resolvedInterval = "5s" + + // kafkaBufferMessageSize is the number of messages from kafka + // we allow to be buffered in memory before validating them. + kafkaBufferMessageSize = 8192 ) var ( @@ -74,7 +80,7 @@ func registerCDCMixedVersions(r registry.Registry) { Timeout: timeout, RequiresLicense: true, Run: func(ctx context.Context, t test.Test, c cluster.Cluster) { - runCDCMixedVersions(ctx, t, c, t.BuildVersion()) + runCDCMixedVersions(ctx, t, c) }, }) } @@ -84,21 +90,66 @@ func registerCDCMixedVersions(r registry.Registry) { // events, and run validations that ensure changefeeds work during and // after upgrade. type cdcMixedVersionTester struct { - ctx context.Context - crdbNodes option.NodeListOption - workloadNodes option.NodeListOption - kafkaNodes option.NodeListOption - monitor cluster.Monitor + ctx context.Context + crdbNodes option.NodeListOption + workloadNodes option.NodeListOption + kafkaNodes option.NodeListOption + + // This channel is closed after the workload has been initialized. + workloadInit chan struct{} + timestampsResolved struct { syncutil.Mutex C chan struct{} } - crdbUpgrading syncutil.Mutex - kafka kafkaManager - validator *cdctest.CountValidator - testFinished bool - validatorFinished chan struct{} - cleanup func() + + kafka struct { + manager kafkaManager + + // buf is used to buffer messages received from kafka to be sent + // to the validator and validated when the cluster is stable (ie. no node restarts in progress). + // + // Typically, messages would be immediately forwared messages to the validator, + // but we cannot do that right when a message is received because + // some nodes may be offline/upgrading at the time (the validator + // relies on having a stable DB connection to note the rows and fingerprint). + buf []*sarama.ConsumerMessage + + consumer *topicConsumer + } + + validator *validatorWithDBOverride +} + +// validatorWithDBOverride is a *cdctest.CountValidator which contains other +// validators such as *cdctest.FingerPrintValidator. +// +// NB: The fingerprint validator depends on an internal DB connection. Because +// this test performs rolling upgrades, these connections aren't always safe to +// use. 
+// using the validator (hence the "Unsafe" API). A mixed-version state during
+// validation is okay, but connections must not be torn down while validations
+// are running.
+type validatorWithDBOverride struct {
+	unsafeCountV       *cdctest.CountValidator
+	unsafeFingerPrintV *cdctest.FingerprintValidator
+}
+
+// UnsafeGetValidator returns a *cdctest.CountValidator and should not be called
+// during node restarts. See the comment on the receiver type for more details.
+func (v *validatorWithDBOverride) UnsafeGetValidator() *cdctest.CountValidator {
+	return v.unsafeCountV
+}
+
+func newValidator(
+	orderValidator cdctest.Validator, fPrintV *cdctest.FingerprintValidator,
+) *validatorWithDBOverride {
+	validators := cdctest.Validators{
+		orderValidator,
+		fPrintV,
+	}
+	countValidator := cdctest.MakeCountValidator(validators)
+	return &validatorWithDBOverride{unsafeCountV: countValidator, unsafeFingerPrintV: fPrintV}
 }
 
 func newCDCMixedVersionTester(
@@ -111,54 +162,46 @@
 	c.Put(ctx, t.DeprecatedWorkload(), "./workload", lastNode)
 
 	return cdcMixedVersionTester{
-		ctx:               ctx,
-		crdbNodes:         crdbNodes,
-		workloadNodes:     lastNode,
-		kafkaNodes:        lastNode,
-		monitor:           c.NewMonitor(ctx, crdbNodes),
-		validatorFinished: make(chan struct{}),
+		ctx:           ctx,
+		crdbNodes:     crdbNodes,
+		workloadNodes: lastNode,
+		kafkaNodes:    lastNode,
+		workloadInit:  make(chan struct{}),
 	}
 }
 
 // StartKafka will install and start Kafka on the configured node. It
-// will also create the topic where changefeed events will be sent.
-func (cmvt *cdcMixedVersionTester) StartKafka(t test.Test, c cluster.Cluster) {
+// will also create the topic where changefeed events will be sent, as well as a consumer.
+// Returns a function to clean up the consumer and Kafka resources.
+func (cmvt *cdcMixedVersionTester) StartKafka(t test.Test, c cluster.Cluster) (cleanup func()) {
 	t.Status("starting Kafka node")
-	cmvt.kafka, cmvt.cleanup = setupKafka(cmvt.ctx, t, c, cmvt.kafkaNodes)
-	if err := cmvt.kafka.createTopic(cmvt.ctx, targetTable); err != nil {
+
+	manager, tearDown := setupKafka(cmvt.ctx, t, c, cmvt.kafkaNodes)
+	if err := manager.createTopic(cmvt.ctx, targetTable); err != nil {
 		t.Fatal(err)
 	}
-}
+	cmvt.kafka.manager = manager
 
-// Cleanup is supposed to be called at the end of tests that use
-// `cdcMixedVersionTester`
-func (cmvt *cdcMixedVersionTester) Cleanup() {
-	if cmvt.cleanup != nil {
-		cmvt.cleanup()
+	consumer, err := cmvt.kafka.manager.newConsumer(cmvt.ctx, targetTable)
+	if err != nil {
+		t.Fatal(err)
 	}
-}
 
-// installAndStartWorkload starts a bank workload asynchronously
-func (cmvt *cdcMixedVersionTester) installAndStartWorkload() versionStep {
-	return func(ctx context.Context, t test.Test, u *versionUpgradeTest) {
-		t.Status("installing and running workload")
-		u.c.Run(ctx, cmvt.workloadNodes, "./workload init bank {pgurl:1}")
-		cmvt.monitor.Go(func(ctx context.Context) error {
-			return u.c.RunE(
-				ctx,
-				cmvt.workloadNodes,
-				fmt.Sprintf("./workload run bank {pgurl%s} --max-rate=10 --tolerate-errors",
-					cmvt.crdbNodes),
-			)
-		})
+	cmvt.kafka.manager = manager
+	cmvt.kafka.buf = make([]*sarama.ConsumerMessage, 0, kafkaBufferMessageSize)
+	cmvt.kafka.consumer = consumer
+
+	return func() {
+		cmvt.kafka.consumer.Close()
+		tearDown()
 	}
 }
 
-// waitForResolvedTimestamps waits for the underlying CDC verifier to
-// resolve `resolvedTimestampsPerState`
-func (cmvt *cdcMixedVersionTester) waitForResolvedTimestamps() versionStep {
-	return func(ctx context.Context, t test.Test, u *versionUpgradeTest) {
-		t.Status(fmt.Sprintf("waiting for %d resolved timestamps", resolvedTimestampsPerState))
+// waitForResolvedTimestamps waits for a few resolved timestamps to assert
+// that the changefeed is running.
+func (cmvt *cdcMixedVersionTester) waitForResolvedTimestamps() mixedversion.UserFunc {
+	return func(ctx context.Context, l *logger.Logger, r *rand.Rand, h *mixedversion.Helper) error {
+		l.Printf("waiting for %d resolved timestamps", resolvedTimestampsPerState)
 		// create a new channel for the resolved timestamps, allowing any
 		// new resolved timestamps to be captured and account for in the
 		// loop below
@@ -174,9 +217,9 @@ func (cmvt *cdcMixedVersionTester) waitForResolvedTimestamps() versionStep {
 		select {
 		case <-cmvt.timestampsResolved.C:
 			resolved++
-			t.L().Printf("%d of %d timestamps resolved", resolved, resolvedTimestampsPerState)
+			l.Printf("%d of %d timestamps resolved", resolved, resolvedTimestampsPerState)
 		case <-ctx.Done():
-			return
+			return ctx.Err()
 		}
 	}
 
@@ -188,97 +231,110 @@ func (cmvt *cdcMixedVersionTester) waitForResolvedTimestamps() versionStep {
 		cmvt.timestampsResolved.C = nil
 	}()
 
+		return nil
 	}
 }
 
-// finishTest marks the test as finished which will then prompt the
-// changefeed validator loop to return. This step will block until the
-// validator has finished; this is done to avoid the possibility of
-// the test closing the database connection while the validator is
-// running a query.
-func (cmvt *cdcMixedVersionTester) finishTest() versionStep {
-	return func(ctx context.Context, t test.Test, u *versionUpgradeTest) {
-		t.L().Printf("waiting for background tasks to finish")
-		cmvt.testFinished = true
-		<-cmvt.validatorFinished
+// setupValidator creates a CDC validator to validate that a changefeed
+// created on the target table is able to re-create the table
+// somewhere else. It can also verify changefeed ordering guarantees.
+func (cmvt *cdcMixedVersionTester) setupValidator() mixedversion.UserFunc {
+	return func(ctx context.Context, l *logger.Logger, r *rand.Rand, h *mixedversion.Helper) error {
+		tableName := fmt.Sprintf("%s.%s", targetDB, targetTable)
+		_, db := h.RandomDB(r, cmvt.crdbNodes)
+		if _, err := db.Exec(
+			"CREATE TABLE fprint (id INT PRIMARY KEY, balance INT, payload STRING)",
+		); err != nil {
+			return err
+		}
+
+		// NB: The fingerprint validator will save this db connection and use it
+		// when we submit rows for validation.
+		fprintV, err := cdctest.NewFingerprintValidator(db, tableName, `fprint`, cmvt.kafka.consumer.partitions, 0)
+		if err != nil {
+			return err
+		}
+		cmvt.validator = newValidator(cdctest.NewOrderValidator(tableName), fprintV)
+		return nil
 	}
 }
 
-// setupVerifier creates a CDC validator to validate that a changefeed
-// created on the `target` table is able to re-create the table
-// somewhere else. It also verifies CDC's ordering guarantees. This
-// step will not block, but will start the verifier in a separate Go
-// routine. Use `waitForVerifier` to wait for the verifier to finish.
-func (cmvt *cdcMixedVersionTester) setupVerifier(node int) versionStep {
-	return func(ctx context.Context, t test.Test, u *versionUpgradeTest) {
-		tableName := targetDB + "." + targetTable
-		t.Status(fmt.Sprintf("setting up changefeed verifier for table %s", tableName))
-
-		// we could just return the error here and let `Wait` return the
-		// error. However, calling t.Fatal directly lets us stop the test
-		// earlier
-		cmvt.monitor.Go(func(ctx context.Context) error {
-			defer close(cmvt.validatorFinished)
-			consumer, err := cmvt.kafka.consumer(ctx, targetTable)
-			if err != nil {
-				t.Fatal(err)
+// runKafkaConsumer continuously reads from the kafka consumer and places
+// messages in a buffer. It expects to be run in a goroutine and does not
+// rely on any DB connections.
+func (cmvt *cdcMixedVersionTester) runKafkaConsumer() mixedversion.UserFunc {
+	return func(ctx context.Context, l *logger.Logger, r *rand.Rand, h *mixedversion.Helper) error {
+		everyN := log.Every(30 * time.Second)
+
+		// This runs until the test finishes, which will be signaled via
+		// context cancellation. We rely on consumer.Next() to check
+		// the context.
+		for {
+			m := cmvt.kafka.consumer.Next(ctx)
+			if m == nil {
+				// this is expected to happen once the test has finished and
+				// Kafka is being shut down. If it happens in the middle of
+				// the test, it will eventually time out, and this message
+				// should allow us to see that the consumer finished
+				// earlier than it should have
+				l.Printf("end of changefeed")
+				return nil
 			}
-			defer consumer.Close()
-			db := u.conn(ctx, t, node)
-			if _, err := db.Exec(
-				"CREATE TABLE fprint (id INT PRIMARY KEY, balance INT, payload STRING)",
-			); err != nil {
-				t.Fatal(err)
+			// Forward resolved timestamps as a "heartbeat" showing that the changefeed is running.
+			if len(m.Key) <= 0 {
+				_, resolved, err := cdctest.ParseJSONValueTimestamps(m.Value)
+				if err != nil {
+					return err
+				}
+				cmvt.timestampResolved()
+
+				if everyN.ShouldLog() {
+					l.Printf("latest resolved timestamp %s behind realtime", timeutil.Since(resolved.GoTime()))
+				}
 			}
-			getConn := func(node int) *gosql.DB { return u.conn(ctx, t, node) }
-			fprintV, err := cdctest.NewFingerprintValidator(db, tableName, `fprint`, consumer.partitions, 0)
-			if err != nil {
-				t.Fatal(err)
+			if len(cmvt.kafka.buf) == kafkaBufferMessageSize {
+				return errors.Newf("kafka message buffer limit %d exceeded", kafkaBufferMessageSize)
 			}
-			fprintV.DBFunc(cmvt.cdcDBConn(getConn))
-			validators := cdctest.Validators{
-				cdctest.NewOrderValidator(tableName),
-				fprintV,
+			cmvt.kafka.buf = append(cmvt.kafka.buf, m)
+		}
+	}
+}
+
+// validate reads rows from the kafka buffer and submits them to the validator
+// to be validated.
+func (cmvt *cdcMixedVersionTester) validate() mixedversion.UserFunc {
+	return func(ctx context.Context, l *logger.Logger, r *rand.Rand, h *mixedversion.Helper) error {
+		// NB: The caller is responsible for making sure this function only runs
+		// while all nodes are up (no restarts in progress).
+		validator := cmvt.validator.UnsafeGetValidator()
+		for _, m := range cmvt.kafka.buf {
+			updated, resolved, err := cdctest.ParseJSONValueTimestamps(m.Value)
+			if err != nil {
+				return err
 			}
-			cmvt.validator = cdctest.MakeCountValidator(validators)
-
-			for !cmvt.testFinished {
-				m := consumer.Next(ctx)
-				if m == nil {
-					// this is expected to happen once the test has finished and
-					// Kafka is being shut down. If it happens in the middle of
-					// the test, it will eventually time out, and this message
-					// should allow us to see that the validator finished
-					// earlier than it should have
-					t.L().Printf("end of changefeed")
-					return nil
-				}
-				updated, resolved, err := cdctest.ParseJSONValueTimestamps(m.Value)
-				if err != nil {
-					t.Fatal(err)
+			partitionStr := strconv.Itoa(int(m.Partition))
+			if len(m.Key) > 0 {
+				if err := validator.NoteRow(partitionStr, string(m.Key), string(m.Value), updated); err != nil {
+					return err
 				}
-
-				partitionStr := strconv.Itoa(int(m.Partition))
-				if len(m.Key) > 0 {
-					if err := cmvt.validator.NoteRow(partitionStr, string(m.Key), string(m.Value), updated); err != nil {
-						t.Fatal(err)
-					}
-				} else {
-					if err := cmvt.validator.NoteResolved(partitionStr, resolved); err != nil {
-						t.Fatal(err)
-					}
-
-					t.L().Printf("%d resolved timestamps validated, latest is %s behind realtime",
-						cmvt.validator.NumResolvedWithRows, timeutil.Since(resolved.GoTime()))
-					cmvt.timestampResolved()
+			} else {
+				if err := validator.NoteResolved(partitionStr, resolved); err != nil {
+					return err
 				}
+
+				l.Printf("%d resolved timestamps validated", validator.NumResolvedWithRows)
+			}
+
+			if failures := validator.Failures(); len(failures) > 0 {
+				return errors.Newf("validator failures:\n%s", strings.Join(failures, "\n"))
 			}
+		}
 
-			return nil
-		})
+		cmvt.kafka.buf = make([]*sarama.ConsumerMessage, 0, kafkaBufferMessageSize)
+		return nil
 	}
 }
 
@@ -293,122 +349,97 @@ func (cmvt *cdcMixedVersionTester) timestampResolved() {
 	}
 }
 
-// cdcDBConn is the wrapper passed to the FingerprintValidator. The
-// goal is to ensure that database checks by the validator do not
-// happen while we are running an upgrade. We used to retry database
-// calls in the validator, but that logic adds complexity and does not
-// help in testing the changefeed's correctness
-func (cmvt *cdcMixedVersionTester) cdcDBConn(
-	getConn func(int) *gosql.DB,
-) func(func(*gosql.DB) error) error {
-	return func(f func(*gosql.DB) error) error {
-		cmvt.crdbUpgrading.Lock()
-		defer cmvt.crdbUpgrading.Unlock()
-
-		node := cmvt.crdbNodes.RandNode()[0]
-		return f(getConn(node))
-	}
-}
-
-// crdbUpgradeStep is a wrapper to steps that upgrade the cockroach
-// binary running in the cluster. It makes sure we hold exclusive
-// access to the `crdbUpgrading` lock while the upgrade is in process
-func (cmvt *cdcMixedVersionTester) crdbUpgradeStep(step versionStep) versionStep {
-	return func(ctx context.Context, t test.Test, u *versionUpgradeTest) {
-		cmvt.crdbUpgrading.Lock()
-		defer cmvt.crdbUpgrading.Unlock()
-
-		step(ctx, t, u)
-	}
-}
-
-// assertValid checks if the validator has found any issues at the
-// time the function is called.
-func (cmvt *cdcMixedVersionTester) assertValid() versionStep {
-	return func(ctx context.Context, t test.Test, u *versionUpgradeTest) {
-		if failures := cmvt.validator.Failures(); len(failures) > 0 {
-			t.Fatalf("validator failures:\n%s", strings.Join(failures, "\n"))
-		}
-	}
-}
-
 // createChangeFeed issues a call to the given node to create a change
 // feed for the target table.
-func (cmvt *cdcMixedVersionTester) createChangeFeed(node int) versionStep {
-	return func(ctx context.Context, t test.Test, u *versionUpgradeTest) {
-		t.Status("creating changefeed")
-		db := u.conn(ctx, t, node)
+func (cmvt *cdcMixedVersionTester) createChangeFeed() func(context.Context, *logger.Logger, *rand.Rand, *mixedversion.Helper) error {
+	return func(ctx context.Context, l *logger.Logger, r *rand.Rand, h *mixedversion.Helper) error {
+		// Wait for the workload to be initialized so that the target table exists.
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-cmvt.workloadInit:
+		}
+		_, db := h.RandomDB(r, cmvt.crdbNodes)
 		options := map[string]string{
 			"updated":  "",
 			"resolved": fmt.Sprintf("'%s'", resolvedInterval),
 		}
-		_, err := newChangefeedCreator(db, fmt.Sprintf("%s.%s", targetDB, targetTable), cmvt.kafka.sinkURL(ctx)).
+
+		_, err := newChangefeedCreator(db, fmt.Sprintf("%s.%s", targetDB, targetTable), cmvt.kafka.manager.sinkURL(ctx)).
 			With(options).
 			Create()
 		if err != nil {
-			t.Fatal(err)
+			return err
 		}
-	}
-}
 
-func runCDCMixedVersions(
-	ctx context.Context, t test.Test, c cluster.Cluster, buildVersion *version.Version,
-) {
-	predecessorVersion, err := release.LatestPredecessor(buildVersion)
-	if err != nil {
-		t.Fatal(err)
+		return nil
 	}
+}
 
-	tester := newCDCMixedVersionTester(ctx, t, c)
-	tester.StartKafka(t, c)
-	defer tester.Cleanup()
-
-	rng, seed := randutil.NewPseudoRand()
-	t.L().Printf("random seed: %d", seed)
+// runWorkload runs the bank workload in the background.
+func (cmvt *cdcMixedVersionTester) runWorkload() func(context.Context, *logger.Logger, *rand.Rand, *mixedversion.Helper) error {
+	return func(ctx context.Context, l *logger.Logger, r *rand.Rand, h *mixedversion.Helper) error {
+		bankRun := roachtestutil.NewCommand("%s workload run bank", test.DefaultCockroachPath).
+			Flag("max-rate", 10).
+			Arg("{pgurl%s}", cmvt.crdbNodes).
+			Option("tolerate-errors")
+
+		// Wait for the workload to be initialized before running it.
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-cmvt.workloadInit:
+		}
 
-	// sqlNode returns the node to be used when sending SQL statements
-	// during this test. It is randomized, but the random seed is logged
-	// above.
-	sqlNode := func() int {
-		return tester.crdbNodes[rng.Intn(len(tester.crdbNodes))]
+		h.BackgroundCommand(bankRun.String(), option.NodeListOption{h.RandomNode(r, cmvt.crdbNodes)})
+		return nil
 	}
+}
 
-	newVersionUpgradeTest(c,
-		uploadAndStartFromCheckpointFixture(tester.crdbNodes, predecessorVersion),
-		tester.setupVerifier(sqlNode()),
-		tester.installAndStartWorkload(),
-		waitForUpgradeStep(tester.crdbNodes),
-
-		// NB: at this point, cluster and binary version equal predecessorVersion,
-		// and auto-upgrades are on.
-		preventAutoUpgradeStep(sqlNode()),
-		tester.createChangeFeed(sqlNode()),
-
-		tester.waitForResolvedTimestamps(),
-		// Roll the nodes into the new version one by one in random order
-		tester.crdbUpgradeStep(binaryUpgradeStep(tester.crdbNodes, clusterupgrade.MainVersion)),
-		// let the workload run in the new version for a while
-		tester.waitForResolvedTimestamps(),
-
-		tester.assertValid(),
-
-		// Roll back again, which ought to be fine because the cluster upgrade was
-		// not finalized.
-		tester.crdbUpgradeStep(binaryUpgradeStep(tester.crdbNodes, predecessorVersion)),
-		tester.waitForResolvedTimestamps(),
-
-		tester.assertValid(),
+// initWorkload synchronously initializes the workload.
+func (cmvt *cdcMixedVersionTester) initWorkload() func(context.Context, *logger.Logger, *rand.Rand, *mixedversion.Helper) error {
+	return func(ctx context.Context, l *logger.Logger, r *rand.Rand, h *mixedversion.Helper) error {
+		bankInit := roachtestutil.NewCommand("%s workload init bank", test.DefaultCockroachPath).
+			Arg("{pgurl:1}")
 
-		// Roll nodes forward and finalize upgrade.
-		tester.crdbUpgradeStep(binaryUpgradeStep(tester.crdbNodes, clusterupgrade.MainVersion)),
+		// NB: We add 1 because option.NodeListOptions are indexed starting from 1.
+		if err := h.Command(bankInit.String(), option.NodeListOption{h.RandomNode(r, cmvt.crdbNodes) + 1}); err != nil {
+			return err
+		}
+		close(cmvt.workloadInit)
+		return nil
+	}
+}
 
-		// allow cluster version to update
-		allowAutoUpgradeStep(sqlNode()),
-		waitForUpgradeStep(tester.crdbNodes),
+func runCDCMixedVersions(ctx context.Context, t test.Test, c cluster.Cluster) {
+	tester := newCDCMixedVersionTester(ctx, t, c)
 
-		tester.waitForResolvedTimestamps(),
-		tester.finishTest(),
-		tester.assertValid(),
-	).run(ctx, t)
+	// NB: We rely on the testing framework to choose a random predecessor
+	// to upgrade from.
+	mvt := mixedversion.NewTest(ctx, t, t.L(), c, tester.crdbNodes)
+
+	cleanupKafka := tester.StartKafka(t, c)
+	defer cleanupKafka()
+
+	// Register hooks.
+	mvt.OnStartup("start-changefeed", tester.createChangeFeed())
+	mvt.OnStartup("create-validator", tester.setupValidator())
+	mvt.OnStartup("init-workload", tester.initWorkload())
+
+	// mvt should cancel its context when it is finished running, but we still
+	// call the stop functions for safety.
+	stopWorkload := mvt.BackgroundFunc("run-workload", tester.runWorkload())
+	defer stopWorkload()
+	stopKafkaConsumer := mvt.BackgroundFunc("run-kafka-consumer", tester.runKafkaConsumer())
+	defer stopKafkaConsumer()
+	// NB: mvt.InMixedVersion will run these hooks multiple times at various
+	// points during the rolling upgrade, but not when any nodes are offline.
+	mvt.InMixedVersion("wait-for-resolved-timestamps", tester.waitForResolvedTimestamps())
+	mvt.InMixedVersion("validate-messages", tester.validate())
+
+	mvt.AfterUpgradeFinalized("wait-for-resolved-timestamps", tester.waitForResolvedTimestamps())
+	mvt.AfterUpgradeFinalized("validate-messages", tester.validate())
+
+	mvt.Run()
 }