From 9192ed91dfd1038d4b2881c3e6e483c8e1bf919b Mon Sep 17 00:00:00 2001 From: Renato Costa Date: Mon, 3 Oct 2022 09:18:20 -0400 Subject: [PATCH] cdctest: remove retry logic from the fingerprint validator To handle mixed-version CDC tests, the fingerprint validator had previously been updated to handle retries internally; this complicates the validator as the retry logic is only ever used in the mixed-version roachtests. This commit removes the retry logic from the validator and instead allows the caller to pass a `DBFunc` to be called whenever a database connection is needed after initialization. By passing a custom DBFunc, tests that need it (such as the mixed-versions roachtest) can pass a function that accounts for temporary unavailability of nodes. Specifically, we pass a function that blocks while nodes are being upgraded, to simplify reasoning of this test's behavior. In order to support that, we also change the signature of `NewFingerprintValidator` to return the actual concrete validator (which is now made public) instead of the `Validator` interface, following the generally-good approach of "accept interfaces, return structs" in Go. Release note: None --- pkg/ccl/changefeedccl/cdctest/BUILD.bazel | 1 - pkg/ccl/changefeedccl/cdctest/nemeses.go | 2 +- pkg/ccl/changefeedccl/cdctest/validator.go | 95 +++++++++---------- .../changefeedccl/cdctest/validator_test.go | 22 ++--- pkg/cmd/roachtest/tests/cdc.go | 2 +- pkg/cmd/roachtest/tests/mixed_version_cdc.go | 72 ++++++++++++-- 6 files changed, 121 insertions(+), 73 deletions(-) diff --git a/pkg/ccl/changefeedccl/cdctest/BUILD.bazel b/pkg/ccl/changefeedccl/cdctest/BUILD.bazel index bc2b9f8e376d..21273794bb0a 100644 --- a/pkg/ccl/changefeedccl/cdctest/BUILD.bazel +++ b/pkg/ccl/changefeedccl/cdctest/BUILD.bazel @@ -31,7 +31,6 @@ go_library( "//pkg/util/hlc", "//pkg/util/log", "//pkg/util/randutil", - "//pkg/util/retry", "//pkg/util/syncutil", "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", diff --git a/pkg/ccl/changefeedccl/cdctest/nemeses.go b/pkg/ccl/changefeedccl/cdctest/nemeses.go index ca36ce99b908..bb1612fcd2fb 100644 --- a/pkg/ccl/changefeedccl/cdctest/nemeses.go +++ b/pkg/ccl/changefeedccl/cdctest/nemeses.go @@ -173,7 +173,7 @@ func RunNemesis(f TestFeedFactory, db *gosql.DB, isSinkless bool) (Validator, er if err != nil { return nil, err } - fprintV, err := NewFingerprintValidator(db, `foo`, scratchTableName, foo.Partitions(), ns.maxTestColumnCount, false) + fprintV, err := NewFingerprintValidator(db, `foo`, scratchTableName, foo.Partitions(), ns.maxTestColumnCount) if err != nil { return nil, err } diff --git a/pkg/ccl/changefeedccl/cdctest/validator.go b/pkg/ccl/changefeedccl/cdctest/validator.go index 09c584f2f2c1..b3f532c1691c 100644 --- a/pkg/ccl/changefeedccl/cdctest/validator.go +++ b/pkg/ccl/changefeedccl/cdctest/validator.go @@ -15,11 +15,9 @@ import ( "fmt" "sort" "strings" - "time" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" - "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/errors" ) @@ -65,8 +63,6 @@ var _ Validator = &orderValidator{} var _ Validator = &noOpValidator{} var _ StreamValidator = &orderValidator{} -var retryDuration = 2 * time.Minute - type noOpValidator struct{} // NoteRow accepts a changed row entry. @@ -323,16 +319,16 @@ type validatorRow struct { updated hlc.Timestamp } -// fingerprintValidator verifies that recreating a table from its changefeed +// FingerprintValidator verifies that recreating a table from its changefeed // will fingerprint the same at all "interesting" points in time. -type fingerprintValidator struct { - sqlDB *gosql.DB +type FingerprintValidator struct { + sqlDBFunc func(func(*gosql.DB) error) error origTable, fprintTable string primaryKeyCols []string partitionResolved map[string]hlc.Timestamp resolved hlc.Timestamp // It's possible to get a resolved timestamp from before the table even - // exists, which is valid but complicates the way fingerprintValidator works. + // exists, which is valid but complicates the way FingerprintValidator works. // Don't create a fingerprint earlier than the first seen row. firstRowTimestamp hlc.Timestamp // previousRowUpdateTs keeps track of the timestamp of the most recently processed row @@ -347,14 +343,18 @@ type fingerprintValidator struct { fprintTestColumns int buffer []validatorRow - // shouldRetry indicates whether row updates should be retried (for - // a fixed duration). Typically used when the transient errors are - // expected (e.g., if performing an upgrade) - shouldRetry bool - failures []string } +// defaultSQLDBFunc is the default function passed the FingerprintValidator's +// `sqlDBFunc`. It is sufficient in cases when the database is not expected to +// fail while the validator is using it. +func defaultSQLDBFunc(db *gosql.DB) func(func(*gosql.DB) error) error { + return func(f func(*gosql.DB) error) error { + return f(db) + } +} + // NewFingerprintValidator returns a new FingerprintValidator that uses `fprintTable` as // scratch space to recreate `origTable`. `fprintTable` must exist before calling this // constructor. `maxTestColumnCount` indicates the maximum number of columns that can be @@ -362,23 +362,19 @@ type fingerprintValidator struct { // will modify `fprint`'s schema to add `maxTestColumnCount` columns to avoid having to // accommodate schema changes on the fly. func NewFingerprintValidator( - sqlDB *gosql.DB, - origTable, fprintTable string, - partitions []string, - maxTestColumnCount int, - shouldRetry bool, -) (Validator, error) { + db *gosql.DB, origTable, fprintTable string, partitions []string, maxTestColumnCount int, +) (*FingerprintValidator, error) { // Fetch the primary keys though information_schema schema inspections so we // can use them to construct the SQL for DELETEs and also so we can verify // that the key in a message matches what's expected for the value. - primaryKeyCols, err := fetchPrimaryKeyCols(sqlDB, fprintTable) + primaryKeyCols, err := fetchPrimaryKeyCols(db, fprintTable) if err != nil { return nil, err } // Record the non-test%d columns in `fprint`. var fprintOrigColumns int - if err := sqlDB.QueryRow(` + if err := db.QueryRow(` SELECT count(column_name) FROM information_schema.columns WHERE table_name=$1 @@ -396,19 +392,19 @@ func NewFingerprintValidator( } fmt.Fprintf(&addColumnStmt, `ADD COLUMN test%d STRING`, i) } - if _, err := sqlDB.Exec(addColumnStmt.String()); err != nil { + _, err = db.Exec(addColumnStmt.String()) + if err != nil { return nil, err } } - v := &fingerprintValidator{ - sqlDB: sqlDB, + v := &FingerprintValidator{ + sqlDBFunc: defaultSQLDBFunc(db), origTable: origTable, fprintTable: fprintTable, primaryKeyCols: primaryKeyCols, fprintOrigColumns: fprintOrigColumns, fprintTestColumns: maxTestColumnCount, - shouldRetry: shouldRetry, } v.partitionResolved = make(map[string]hlc.Timestamp) for _, partition := range partitions { @@ -417,8 +413,18 @@ func NewFingerprintValidator( return v, nil } +// DBFunc sets the database function used when the validator needs to +// perform database operations (updating the scratch table, computing +// fingerprints, etc) +func (v *FingerprintValidator) DBFunc( + dbFunc func(func(*gosql.DB) error) error, +) *FingerprintValidator { + v.sqlDBFunc = dbFunc + return v +} + // NoteRow implements the Validator interface. -func (v *fingerprintValidator) NoteRow( +func (v *FingerprintValidator) NoteRow( ignoredPartition string, key, value string, updated hlc.Timestamp, ) error { if v.firstRowTimestamp.IsEmpty() || updated.Less(v.firstRowTimestamp) { @@ -433,9 +439,9 @@ func (v *fingerprintValidator) NoteRow( } // applyRowUpdate applies the update represented by `row` to the scratch table. -func (v *fingerprintValidator) applyRowUpdate(row validatorRow) (_err error) { +func (v *FingerprintValidator) applyRowUpdate(row validatorRow) (_err error) { defer func() { - _err = errors.Wrap(_err, "fingerprintValidator failed") + _err = errors.Wrap(_err, "FingerprintValidator failed") }() var args []interface{} @@ -512,14 +518,14 @@ func (v *fingerprintValidator) applyRowUpdate(row validatorRow) (_err error) { } } - return v.maybeRetry(func() error { - _, err := v.sqlDB.Exec(stmtBuf.String(), args...) + return v.sqlDBFunc(func(db *gosql.DB) error { + _, err := db.Exec(stmtBuf.String(), args...) return err }) } // NoteResolved implements the Validator interface. -func (v *fingerprintValidator) NoteResolved(partition string, resolved hlc.Timestamp) error { +func (v *FingerprintValidator) NoteResolved(partition string, resolved hlc.Timestamp) error { if r, ok := v.partitionResolved[partition]; !ok { return errors.Errorf(`unknown partition: %s`, partition) } else if resolved.LessEq(r) { @@ -557,8 +563,8 @@ func (v *fingerprintValidator) NoteResolved(partition string, resolved hlc.Times } row := v.buffer[0] // NOTE: changes to the validator's state before `applyRowUpdate` - // are safe because, if the operation can fail, the caller should - // be setting the `shouldRetry` field accordingly + // are safe because if database calls can fail, they should be + // retried by passing a custom function to DBFunction v.buffer = v.buffer[1:] // If we've processed all row updates belonging to the previous row's timestamp, @@ -592,18 +598,18 @@ func (v *fingerprintValidator) NoteResolved(partition string, resolved hlc.Times return nil } -func (v *fingerprintValidator) fingerprint(ts hlc.Timestamp) error { +func (v *FingerprintValidator) fingerprint(ts hlc.Timestamp) error { var orig string - if err := v.maybeRetry(func() error { - return v.sqlDB.QueryRow(`SELECT IFNULL(fingerprint, 'EMPTY') FROM [ + if err := v.sqlDBFunc(func(db *gosql.DB) error { + return db.QueryRow(`SELECT IFNULL(fingerprint, 'EMPTY') FROM [ SHOW EXPERIMENTAL_FINGERPRINTS FROM TABLE ` + v.origTable + ` ] AS OF SYSTEM TIME '` + ts.AsOfSystemTime() + `'`).Scan(&orig) }); err != nil { return err } var check string - if err := v.maybeRetry(func() error { - return v.sqlDB.QueryRow(`SELECT IFNULL(fingerprint, 'EMPTY') FROM [ + if err := v.sqlDBFunc(func(db *gosql.DB) error { + return db.QueryRow(`SELECT IFNULL(fingerprint, 'EMPTY') FROM [ SHOW EXPERIMENTAL_FINGERPRINTS FROM TABLE ` + v.fprintTable + ` ]`).Scan(&check) }); err != nil { @@ -617,21 +623,10 @@ func (v *fingerprintValidator) fingerprint(ts hlc.Timestamp) error { } // Failures implements the Validator interface. -func (v *fingerprintValidator) Failures() []string { +func (v *FingerprintValidator) Failures() []string { return v.failures } -// maybeRetry will retry the function passed if the fingerprint was -// created with `shouldRetry` set to `true`. Every access to `sqlDB` -// should be made my closures passed to this function -func (v *fingerprintValidator) maybeRetry(f func() error) error { - if v.shouldRetry { - return retry.ForDuration(retryDuration, f) - } - - return f() -} - // Validators abstracts over running multiple `Validator`s at once on the same // feed. type Validators []Validator diff --git a/pkg/ccl/changefeedccl/cdctest/validator_test.go b/pkg/ccl/changefeedccl/cdctest/validator_test.go index f70617c197ea..4589518b96da 100644 --- a/pkg/ccl/changefeedccl/cdctest/validator_test.go +++ b/pkg/ccl/changefeedccl/cdctest/validator_test.go @@ -292,14 +292,14 @@ func TestFingerprintValidator(t *testing.T) { t.Run(`empty`, func(t *testing.T) { sqlDB.Exec(t, createTableStmt(`empty`)) - v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `empty`, []string{`p`}, testColumns, false) + v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `empty`, []string{`p`}, testColumns) require.NoError(t, err) noteResolved(t, v, `p`, ts[0]) assertValidatorFailures(t, v) }) t.Run(`wrong data`, func(t *testing.T) { sqlDB.Exec(t, createTableStmt(`wrong_data`)) - v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `wrong_data`, []string{`p`}, testColumns, false) + v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `wrong_data`, []string{`p`}, testColumns) require.NoError(t, err) noteRow(t, v, ignored, `[1]`, `{"after": {"k":1,"v":10}}`, ts[1]) noteResolved(t, v, `p`, ts[1]) @@ -310,7 +310,7 @@ func TestFingerprintValidator(t *testing.T) { }) t.Run(`all resolved`, func(t *testing.T) { sqlDB.Exec(t, createTableStmt(`all_resolved`)) - v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `all_resolved`, []string{`p`}, testColumns, false) + v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `all_resolved`, []string{`p`}, testColumns) require.NoError(t, err) if err := v.NoteResolved(`p`, ts[0]); err != nil { t.Fatal(err) @@ -329,7 +329,7 @@ func TestFingerprintValidator(t *testing.T) { }) t.Run(`rows unsorted`, func(t *testing.T) { sqlDB.Exec(t, createTableStmt(`rows_unsorted`)) - v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `rows_unsorted`, []string{`p`}, testColumns, false) + v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `rows_unsorted`, []string{`p`}, testColumns) require.NoError(t, err) noteRow(t, v, ignored, `[1]`, `{"after": {"k":1,"v":3}}`, ts[3]) noteRow(t, v, ignored, `[1]`, `{"after": {"k":1,"v":2}}`, ts[2]) @@ -341,7 +341,7 @@ func TestFingerprintValidator(t *testing.T) { }) t.Run(`missed initial`, func(t *testing.T) { sqlDB.Exec(t, createTableStmt(`missed_initial`)) - v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `missed_initial`, []string{`p`}, testColumns, false) + v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `missed_initial`, []string{`p`}, testColumns) require.NoError(t, err) noteResolved(t, v, `p`, ts[0]) // Intentionally missing {"k":1,"v":1} at ts[1]. @@ -357,7 +357,7 @@ func TestFingerprintValidator(t *testing.T) { }) t.Run(`missed middle`, func(t *testing.T) { sqlDB.Exec(t, createTableStmt(`missed_middle`)) - v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `missed_middle`, []string{`p`}, testColumns, false) + v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `missed_middle`, []string{`p`}, testColumns) require.NoError(t, err) noteResolved(t, v, `p`, ts[0]) noteRow(t, v, ignored, `[1]`, `{"after": {"k":1,"v":1}}`, ts[1]) @@ -375,7 +375,7 @@ func TestFingerprintValidator(t *testing.T) { }) t.Run(`missed end`, func(t *testing.T) { sqlDB.Exec(t, createTableStmt(`missed_end`)) - v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `missed_end`, []string{`p`}, testColumns, false) + v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `missed_end`, []string{`p`}, testColumns) require.NoError(t, err) noteResolved(t, v, `p`, ts[0]) noteRow(t, v, ignored, `[1]`, `{"after": {"k":1,"v":1}}`, ts[1]) @@ -390,7 +390,7 @@ func TestFingerprintValidator(t *testing.T) { }) t.Run(`initial scan`, func(t *testing.T) { sqlDB.Exec(t, createTableStmt(`initial_scan`)) - v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `initial_scan`, []string{`p`}, testColumns, false) + v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `initial_scan`, []string{`p`}, testColumns) require.NoError(t, err) noteRow(t, v, ignored, `[1]`, `{"after": {"k":1,"v":3}}`, ts[3]) noteRow(t, v, ignored, `[2]`, `{"after": {"k":2,"v":2}}`, ts[3]) @@ -399,7 +399,7 @@ func TestFingerprintValidator(t *testing.T) { }) t.Run(`unknown partition`, func(t *testing.T) { sqlDB.Exec(t, createTableStmt(`unknown_partition`)) - v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `unknown_partition`, []string{`p`}, testColumns, false) + v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `unknown_partition`, []string{`p`}, testColumns) require.NoError(t, err) if err := v.NoteResolved(`nope`, ts[1]); !testutils.IsError(err, `unknown partition`) { t.Fatalf(`expected "unknown partition" error got: %+v`, err) @@ -407,7 +407,7 @@ func TestFingerprintValidator(t *testing.T) { }) t.Run(`resolved unsorted`, func(t *testing.T) { sqlDB.Exec(t, createTableStmt(`resolved_unsorted`)) - v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `resolved_unsorted`, []string{`p`}, testColumns, false) + v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `resolved_unsorted`, []string{`p`}, testColumns) require.NoError(t, err) noteRow(t, v, ignored, `[1]`, `{"after": {"k":1,"v":1}}`, ts[1]) noteResolved(t, v, `p`, ts[1]) @@ -417,7 +417,7 @@ func TestFingerprintValidator(t *testing.T) { }) t.Run(`two partitions`, func(t *testing.T) { sqlDB.Exec(t, createTableStmt(`two_partitions`)) - v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `two_partitions`, []string{`p0`, `p1`}, testColumns, false) + v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `two_partitions`, []string{`p0`, `p1`}, testColumns) require.NoError(t, err) noteRow(t, v, ignored, `[1]`, `{"after": {"k":1,"v":1}}`, ts[1]) noteRow(t, v, ignored, `[1]`, `{"after": {"k":1,"v":2}}`, ts[2]) diff --git a/pkg/cmd/roachtest/tests/cdc.go b/pkg/cmd/roachtest/tests/cdc.go index f793ebc929c3..949cd7dd4a17 100644 --- a/pkg/cmd/roachtest/tests/cdc.go +++ b/pkg/cmd/roachtest/tests/cdc.go @@ -417,7 +417,7 @@ func runCDCBank(ctx context.Context, t test.Test, c cluster.Cluster) { return errors.Wrap(err, "CREATE TABLE failed") } - fprintV, err := cdctest.NewFingerprintValidator(db, `bank.bank`, `fprint`, tc.partitions, 0, false) + fprintV, err := cdctest.NewFingerprintValidator(db, `bank.bank`, `fprint`, tc.partitions, 0) if err != nil { return errors.Wrap(err, "error creating validator") } diff --git a/pkg/cmd/roachtest/tests/mixed_version_cdc.go b/pkg/cmd/roachtest/tests/mixed_version_cdc.go index 0731fe2b6424..00a5340cecf5 100644 --- a/pkg/cmd/roachtest/tests/mixed_version_cdc.go +++ b/pkg/cmd/roachtest/tests/mixed_version_cdc.go @@ -12,6 +12,7 @@ package tests import ( "context" + gosql "database/sql" "fmt" "strconv" "strings" @@ -74,9 +75,13 @@ type cdcMixedVersionTester struct { syncutil.Mutex C chan struct{} } - kafka kafkaManager - validator *cdctest.CountValidator - cleanup func() + crdbUpgrading syncutil.Mutex + kafka kafkaManager + changefeedJobID int + validator *cdctest.CountValidator + validatorDone chan struct{} // validator is no longer waiting for messages + validatorStop bool // used to tell the validator to stop validating messages + cleanup func() } func newCDCMixedVersionTester( @@ -94,6 +99,7 @@ func newCDCMixedVersionTester( workloadNodes: lastNode, kafkaNodes: lastNode, monitor: c.NewMonitor(ctx, crdbNodes), + validatorDone: make(chan struct{}), } } @@ -165,6 +171,17 @@ func (cmvt *cdcMixedVersionTester) waitForResolvedTimestamps() versionStep { } } +// waitForValidator sets the `validatorStop` flag and waits for the +// validator to finish. This should be done right before the test +// finishes, to ensure that we won't try to close the database +// connection while the validator is still trying to use it +func (cmvt *cdcMixedVersionTester) waitForValidator() versionStep { + return func(ctx context.Context, t test.Test, u *versionUpgradeTest) { + cmvt.validatorStop = true + <-cmvt.validatorDone + } +} + // setupVerifier creates a CDC validator to validate that a changefeed // created on the `target` table is able to re-create the table // somewhere else. It also verifies CDC's ordering guarantees. This @@ -192,17 +209,19 @@ func (cmvt *cdcMixedVersionTester) setupVerifier(node int) versionStep { t.Fatal(err) } - fprintV, err := cdctest.NewFingerprintValidator(db, tableName, `fprint`, consumer.partitions, 0, true) + getConn := func(node int) *gosql.DB { return u.conn(ctx, t, node) } + fprintV, err := cdctest.NewFingerprintValidator(db, tableName, `fprint`, consumer.partitions, 0) if err != nil { t.Fatal(err) } + fprintV.DBFunc(cmvt.cdcDBConn(getConn)) validators := cdctest.Validators{ cdctest.NewOrderValidator(tableName), fprintV, } cmvt.validator = cdctest.MakeCountValidator(validators) - for { + for !cmvt.validatorStop { m := consumer.Next(ctx) if m == nil { t.L().Printf("end of changefeed") @@ -229,6 +248,9 @@ func (cmvt *cdcMixedVersionTester) setupVerifier(node int) versionStep { cmvt.timestampResolved() } } + + close(cmvt.validatorDone) + return nil }) } } @@ -244,6 +266,35 @@ func (cmvt *cdcMixedVersionTester) timestampResolved() { } } +// cdcDBConn is the wrapper passed to the FingerprintValidator. The +// goal is to ensure that database checks by the validator do not +// happen while we are running an upgrade. We used to retry database +// calls in the validator, but that logic adds complexity and does not +// help in testing the changefeed's correctness +func (cmvt *cdcMixedVersionTester) cdcDBConn( + getConn func(int) *gosql.DB, +) func(func(*gosql.DB) error) error { + return func(f func(*gosql.DB) error) error { + cmvt.crdbUpgrading.Lock() + defer cmvt.crdbUpgrading.Unlock() + + node := cmvt.crdbNodes.RandNode()[0] + return f(getConn(node)) + } +} + +// crdbUpgradeStep is a wrapper to steps that upgrade the cockroach +// binary running in the cluster. It makes sure we hold exclusive +// access to the `crdbUpgrading` lock while the upgrade is in process +func (cmvt *cdcMixedVersionTester) crdbUpgradeStep(step versionStep) versionStep { + return func(ctx context.Context, t test.Test, u *versionUpgradeTest) { + cmvt.crdbUpgrading.Lock() + defer cmvt.crdbUpgrading.Unlock() + + step(ctx, t, u) + } +} + // assertValid checks if the validator has found any issues at the // time the function is called. func (cmvt *cdcMixedVersionTester) assertValid() versionStep { @@ -265,12 +316,14 @@ func (cmvt *cdcMixedVersionTester) createChangeFeed(node int) versionStep { {"updated", ""}, {"resolved", fmt.Sprintf("'%s'", resolvedInterval)}, } - _, err := newChangefeedCreator(db, fmt.Sprintf("%s.%s", targetDB, targetTable), cmvt.kafka.sinkURL(ctx)). + jobID, err := newChangefeedCreator(db, fmt.Sprintf("%s.%s", targetDB, targetTable), cmvt.kafka.sinkURL(ctx)). With(options...). Create() if err != nil { t.Fatal(err) } + + cmvt.changefeedJobID = jobID } } @@ -312,7 +365,7 @@ func runCDCMixedVersions( tester.waitForResolvedTimestamps(), // Roll the nodes into the new version one by one in random order - binaryUpgradeStep(tester.crdbNodes, mainVersion), + tester.crdbUpgradeStep(binaryUpgradeStep(tester.crdbNodes, mainVersion)), // let the workload run in the new version for a while tester.waitForResolvedTimestamps(), @@ -320,19 +373,20 @@ func runCDCMixedVersions( // Roll back again, which ought to be fine because the cluster upgrade was // not finalized. - binaryUpgradeStep(tester.crdbNodes, predecessorVersion), + tester.crdbUpgradeStep(binaryUpgradeStep(tester.crdbNodes, predecessorVersion)), tester.waitForResolvedTimestamps(), tester.assertValid(), // Roll nodes forward and finalize upgrade. - binaryUpgradeStep(tester.crdbNodes, mainVersion), + tester.crdbUpgradeStep(binaryUpgradeStep(tester.crdbNodes, mainVersion)), // allow cluster version to update allowAutoUpgradeStep(sqlNode()), waitForUpgradeStep(tester.crdbNodes), tester.waitForResolvedTimestamps(), + tester.waitForValidator(), tester.assertValid(), ).run(ctx, t) }