diff --git a/pkg/ccl/changefeedccl/cdctest/BUILD.bazel b/pkg/ccl/changefeedccl/cdctest/BUILD.bazel index bc2b9f8e376d..21273794bb0a 100644 --- a/pkg/ccl/changefeedccl/cdctest/BUILD.bazel +++ b/pkg/ccl/changefeedccl/cdctest/BUILD.bazel @@ -31,7 +31,6 @@ go_library( "//pkg/util/hlc", "//pkg/util/log", "//pkg/util/randutil", - "//pkg/util/retry", "//pkg/util/syncutil", "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", diff --git a/pkg/ccl/changefeedccl/cdctest/nemeses.go b/pkg/ccl/changefeedccl/cdctest/nemeses.go index ca36ce99b908..bb1612fcd2fb 100644 --- a/pkg/ccl/changefeedccl/cdctest/nemeses.go +++ b/pkg/ccl/changefeedccl/cdctest/nemeses.go @@ -173,7 +173,7 @@ func RunNemesis(f TestFeedFactory, db *gosql.DB, isSinkless bool) (Validator, er if err != nil { return nil, err } - fprintV, err := NewFingerprintValidator(db, `foo`, scratchTableName, foo.Partitions(), ns.maxTestColumnCount, false) + fprintV, err := NewFingerprintValidator(db, `foo`, scratchTableName, foo.Partitions(), ns.maxTestColumnCount) if err != nil { return nil, err } diff --git a/pkg/ccl/changefeedccl/cdctest/validator.go b/pkg/ccl/changefeedccl/cdctest/validator.go index 09c584f2f2c1..45997d0adccc 100644 --- a/pkg/ccl/changefeedccl/cdctest/validator.go +++ b/pkg/ccl/changefeedccl/cdctest/validator.go @@ -15,11 +15,9 @@ import ( "fmt" "sort" "strings" - "time" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" - "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/errors" ) @@ -65,8 +63,6 @@ var _ Validator = &orderValidator{} var _ Validator = &noOpValidator{} var _ StreamValidator = &orderValidator{} -var retryDuration = 2 * time.Minute - type noOpValidator struct{} // NoteRow accepts a changed row entry. 
@@ -323,16 +319,23 @@ type validatorRow struct { updated hlc.Timestamp } -// fingerprintValidator verifies that recreating a table from its changefeed +// eventKey returns a key that encodes the key and timestamp of a row +// received from a changefeed. Can be used to keep track of which +// updates have been seen before in a validator. +func (row validatorRow) eventKey() string { + return fmt.Sprintf("%s|%s", row.key, row.updated.AsOfSystemTime()) +} + +// FingerprintValidator verifies that recreating a table from its changefeed // will fingerprint the same at all "interesting" points in time. -type fingerprintValidator struct { - sqlDB *gosql.DB +type FingerprintValidator struct { + sqlDBFunc func(func(*gosql.DB) error) error origTable, fprintTable string primaryKeyCols []string partitionResolved map[string]hlc.Timestamp resolved hlc.Timestamp // It's possible to get a resolved timestamp from before the table even - // exists, which is valid but complicates the way fingerprintValidator works. + // exists, which is valid but complicates the way FingerprintValidator works. // Don't create a fingerprint earlier than the first seen row. firstRowTimestamp hlc.Timestamp // previousRowUpdateTs keeps track of the timestamp of the most recently processed row @@ -346,15 +349,20 @@ type fingerprintValidator struct { fprintOrigColumns int fprintTestColumns int buffer []validatorRow - - // shouldRetry indicates whether row updates should be retried (for - // a fixed duration). Typically used when the transient errors are - // expected (e.g., if performing an upgrade) - shouldRetry bool + previouslySeen map[string]struct{} failures []string } +// defaultSQLDBFunc is the default function used as the FingerprintValidator's +// `sqlDBFunc`. It is sufficient in cases when the database is not expected to +// fail while the validator is using it.
+func defaultSQLDBFunc(db *gosql.DB) func(func(*gosql.DB) error) error { + return func(f func(*gosql.DB) error) error { + return f(db) + } +} + // NewFingerprintValidator returns a new FingerprintValidator that uses `fprintTable` as // scratch space to recreate `origTable`. `fprintTable` must exist before calling this // constructor. `maxTestColumnCount` indicates the maximum number of columns that can be @@ -362,23 +370,19 @@ type fingerprintValidator struct { // will modify `fprint`'s schema to add `maxTestColumnCount` columns to avoid having to // accommodate schema changes on the fly. func NewFingerprintValidator( - sqlDB *gosql.DB, - origTable, fprintTable string, - partitions []string, - maxTestColumnCount int, - shouldRetry bool, -) (Validator, error) { + db *gosql.DB, origTable, fprintTable string, partitions []string, maxTestColumnCount int, +) (*FingerprintValidator, error) { // Fetch the primary keys though information_schema schema inspections so we // can use them to construct the SQL for DELETEs and also so we can verify // that the key in a message matches what's expected for the value. - primaryKeyCols, err := fetchPrimaryKeyCols(sqlDB, fprintTable) + primaryKeyCols, err := fetchPrimaryKeyCols(db, fprintTable) if err != nil { return nil, err } // Record the non-test%d columns in `fprint`. 
var fprintOrigColumns int - if err := sqlDB.QueryRow(` + if err := db.QueryRow(` SELECT count(column_name) FROM information_schema.columns WHERE table_name=$1 @@ -396,19 +400,19 @@ func NewFingerprintValidator( } fmt.Fprintf(&addColumnStmt, `ADD COLUMN test%d STRING`, i) } - if _, err := sqlDB.Exec(addColumnStmt.String()); err != nil { + _, err = db.Exec(addColumnStmt.String()) + if err != nil { return nil, err } } - v := &fingerprintValidator{ - sqlDB: sqlDB, + v := &FingerprintValidator{ + sqlDBFunc: defaultSQLDBFunc(db), origTable: origTable, fprintTable: fprintTable, primaryKeyCols: primaryKeyCols, fprintOrigColumns: fprintOrigColumns, fprintTestColumns: maxTestColumnCount, - shouldRetry: shouldRetry, } v.partitionResolved = make(map[string]hlc.Timestamp) for _, partition := range partitions { @@ -417,25 +421,54 @@ func NewFingerprintValidator( return v, nil } +// DBFunc sets the database function used when the validator needs to +// perform database operations (updating the scratch table, computing +// fingerprints, etc) +func (v *FingerprintValidator) DBFunc( + dbFunc func(func(*gosql.DB) error) error, +) *FingerprintValidator { + v.sqlDBFunc = dbFunc + return v +} + +// ValidateDuplicatedEvents enables the validation of duplicated +// messages in the fingerprint validator. Whenever a row is received +// with a timestamp lower than the last `resolved` timestamp seen, we +// verify that the event has been seen before (if it hasn't, that +// would be a violation of the changefeed guarantees) +func (v *FingerprintValidator) ValidateDuplicatedEvents() *FingerprintValidator { + v.previouslySeen = make(map[string]struct{}) + return v +} + // NoteRow implements the Validator interface. 
-func (v *fingerprintValidator) NoteRow( +func (v *FingerprintValidator) NoteRow( ignoredPartition string, key, value string, updated hlc.Timestamp, ) error { if v.firstRowTimestamp.IsEmpty() || updated.Less(v.firstRowTimestamp) { v.firstRowTimestamp = updated } - v.buffer = append(v.buffer, validatorRow{ - key: key, - value: value, - updated: updated, - }) + + row := validatorRow{key: key, value: value, updated: updated} + if err := v.maybeValidateDuplicatedEvent(row); err != nil { + return err + } + + // if this row's timestamp is earlier than the last resolved + // timestamp we processed, we can skip it as it is a duplicate + if row.updated.Less(v.resolved) { + return nil + } + + v.buffer = append(v.buffer, row) + v.maybeAddSeenEvent(row) return nil } // applyRowUpdate applies the update represented by `row` to the scratch table. -func (v *fingerprintValidator) applyRowUpdate(row validatorRow) (_err error) { +func (v *FingerprintValidator) applyRowUpdate(row validatorRow) (_err error) { defer func() { - _err = errors.Wrap(_err, "fingerprintValidator failed") + _err = errors.Wrap(_err, "FingerprintValidator failed") }() var args []interface{} @@ -512,14 +545,14 @@ func (v *fingerprintValidator) applyRowUpdate(row validatorRow) (_err error) { } } - return v.maybeRetry(func() error { - _, err := v.sqlDB.Exec(stmtBuf.String(), args...) + return v.sqlDBFunc(func(db *gosql.DB) error { + _, err := db.Exec(stmtBuf.String(), args...) return err }) } // NoteResolved implements the Validator interface. 
-func (v *fingerprintValidator) NoteResolved(partition string, resolved hlc.Timestamp) error { +func (v *FingerprintValidator) NoteResolved(partition string, resolved hlc.Timestamp) error { if r, ok := v.partitionResolved[partition]; !ok { return errors.Errorf(`unknown partition: %s`, partition) } else if resolved.LessEq(r) { @@ -557,8 +590,8 @@ func (v *fingerprintValidator) NoteResolved(partition string, resolved hlc.Times } row := v.buffer[0] // NOTE: changes to the validator's state before `applyRowUpdate` - // are safe because, if the operation can fail, the caller should - // be setting the `shouldRetry` field accordingly + // are safe because if database calls can fail, they should be + // retried by passing a custom function to DBFunc v.buffer = v.buffer[1:] // If we've processed all row updates belonging to the previous row's timestamp, @@ -592,18 +625,54 @@ func (v *fingerprintValidator) NoteResolved(partition string, resolved hlc.Times return nil } -func (v *fingerprintValidator) fingerprint(ts hlc.Timestamp) error { +// maybeAddSeenEvent is a no-op if the caller did not call +// ValidateDuplicatedEvents. Otherwise, we keep a reference to the row +// key and MVCC timestamp for later validation +func (v *FingerprintValidator) maybeAddSeenEvent(row validatorRow) { + if v.previouslySeen == nil { + return + } + + v.previouslySeen[row.eventKey()] = struct{}{} +} + +// maybeValidateDuplicatedEvent is a no-op if the caller did not call +// ValidateDuplicatedEvents. Otherwise, it returns an error if the +// row's timestamp is earlier than the validator's `resolved` +// timestamp *and* it has not been seen before; that would be a +// violation of the changefeed's guarantees.
+func (v *FingerprintValidator) maybeValidateDuplicatedEvent(row validatorRow) error { + if v.previouslySeen == nil { + return nil + } + + // row's timestamp is after the last resolved timestamp; no problem + if v.resolved.LessEq(row.updated) { + return nil + } + + // row's timestamp is earlier than resolved timestamp *and* it + // hasn't been seen before; that shouldn't happen + if _, seen := v.previouslySeen[row.eventKey()]; !seen { + return fmt.Errorf("unexpected out-of-order event at timestamp %s prior to resolved timestamp %s", + row.updated.AsOfSystemTime(), v.resolved.AsOfSystemTime()) + } + + return nil +} + +func (v *FingerprintValidator) fingerprint(ts hlc.Timestamp) error { var orig string - if err := v.maybeRetry(func() error { - return v.sqlDB.QueryRow(`SELECT IFNULL(fingerprint, 'EMPTY') FROM [ + if err := v.sqlDBFunc(func(db *gosql.DB) error { + return db.QueryRow(`SELECT IFNULL(fingerprint, 'EMPTY') FROM [ SHOW EXPERIMENTAL_FINGERPRINTS FROM TABLE ` + v.origTable + ` ] AS OF SYSTEM TIME '` + ts.AsOfSystemTime() + `'`).Scan(&orig) }); err != nil { return err } var check string - if err := v.maybeRetry(func() error { - return v.sqlDB.QueryRow(`SELECT IFNULL(fingerprint, 'EMPTY') FROM [ + if err := v.sqlDBFunc(func(db *gosql.DB) error { + return db.QueryRow(`SELECT IFNULL(fingerprint, 'EMPTY') FROM [ SHOW EXPERIMENTAL_FINGERPRINTS FROM TABLE ` + v.fprintTable + ` ]`).Scan(&check) }); err != nil { @@ -617,21 +686,10 @@ func (v *fingerprintValidator) fingerprint(ts hlc.Timestamp) error { } // Failures implements the Validator interface. -func (v *fingerprintValidator) Failures() []string { +func (v *FingerprintValidator) Failures() []string { return v.failures } -// maybeRetry will retry the function passed if the fingerprint was -// created with `shouldRetry` set to `true`. 
Every access to `sqlDB` -// should be made my closures passed to this function -func (v *fingerprintValidator) maybeRetry(f func() error) error { - if v.shouldRetry { - return retry.ForDuration(retryDuration, f) - } - - return f() -} - // Validators abstracts over running multiple `Validator`s at once on the same // feed. type Validators []Validator diff --git a/pkg/ccl/changefeedccl/cdctest/validator_test.go b/pkg/ccl/changefeedccl/cdctest/validator_test.go index f70617c197ea..4589518b96da 100644 --- a/pkg/ccl/changefeedccl/cdctest/validator_test.go +++ b/pkg/ccl/changefeedccl/cdctest/validator_test.go @@ -292,14 +292,14 @@ func TestFingerprintValidator(t *testing.T) { t.Run(`empty`, func(t *testing.T) { sqlDB.Exec(t, createTableStmt(`empty`)) - v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `empty`, []string{`p`}, testColumns, false) + v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `empty`, []string{`p`}, testColumns) require.NoError(t, err) noteResolved(t, v, `p`, ts[0]) assertValidatorFailures(t, v) }) t.Run(`wrong data`, func(t *testing.T) { sqlDB.Exec(t, createTableStmt(`wrong_data`)) - v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `wrong_data`, []string{`p`}, testColumns, false) + v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `wrong_data`, []string{`p`}, testColumns) require.NoError(t, err) noteRow(t, v, ignored, `[1]`, `{"after": {"k":1,"v":10}}`, ts[1]) noteResolved(t, v, `p`, ts[1]) @@ -310,7 +310,7 @@ func TestFingerprintValidator(t *testing.T) { }) t.Run(`all resolved`, func(t *testing.T) { sqlDB.Exec(t, createTableStmt(`all_resolved`)) - v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `all_resolved`, []string{`p`}, testColumns, false) + v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `all_resolved`, []string{`p`}, testColumns) require.NoError(t, err) if err := v.NoteResolved(`p`, ts[0]); err != nil { t.Fatal(err) @@ -329,7 +329,7 @@ func TestFingerprintValidator(t *testing.T) { }) t.Run(`rows unsorted`, func(t *testing.T) { 
sqlDB.Exec(t, createTableStmt(`rows_unsorted`)) - v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `rows_unsorted`, []string{`p`}, testColumns, false) + v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `rows_unsorted`, []string{`p`}, testColumns) require.NoError(t, err) noteRow(t, v, ignored, `[1]`, `{"after": {"k":1,"v":3}}`, ts[3]) noteRow(t, v, ignored, `[1]`, `{"after": {"k":1,"v":2}}`, ts[2]) @@ -341,7 +341,7 @@ func TestFingerprintValidator(t *testing.T) { }) t.Run(`missed initial`, func(t *testing.T) { sqlDB.Exec(t, createTableStmt(`missed_initial`)) - v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `missed_initial`, []string{`p`}, testColumns, false) + v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `missed_initial`, []string{`p`}, testColumns) require.NoError(t, err) noteResolved(t, v, `p`, ts[0]) // Intentionally missing {"k":1,"v":1} at ts[1]. @@ -357,7 +357,7 @@ func TestFingerprintValidator(t *testing.T) { }) t.Run(`missed middle`, func(t *testing.T) { sqlDB.Exec(t, createTableStmt(`missed_middle`)) - v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `missed_middle`, []string{`p`}, testColumns, false) + v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `missed_middle`, []string{`p`}, testColumns) require.NoError(t, err) noteResolved(t, v, `p`, ts[0]) noteRow(t, v, ignored, `[1]`, `{"after": {"k":1,"v":1}}`, ts[1]) @@ -375,7 +375,7 @@ func TestFingerprintValidator(t *testing.T) { }) t.Run(`missed end`, func(t *testing.T) { sqlDB.Exec(t, createTableStmt(`missed_end`)) - v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `missed_end`, []string{`p`}, testColumns, false) + v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `missed_end`, []string{`p`}, testColumns) require.NoError(t, err) noteResolved(t, v, `p`, ts[0]) noteRow(t, v, ignored, `[1]`, `{"after": {"k":1,"v":1}}`, ts[1]) @@ -390,7 +390,7 @@ func TestFingerprintValidator(t *testing.T) { }) t.Run(`initial scan`, func(t *testing.T) { sqlDB.Exec(t, createTableStmt(`initial_scan`)) - v, 
err := NewFingerprintValidator(sqlDBRaw, `foo`, `initial_scan`, []string{`p`}, testColumns, false) + v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `initial_scan`, []string{`p`}, testColumns) require.NoError(t, err) noteRow(t, v, ignored, `[1]`, `{"after": {"k":1,"v":3}}`, ts[3]) noteRow(t, v, ignored, `[2]`, `{"after": {"k":2,"v":2}}`, ts[3]) @@ -399,7 +399,7 @@ func TestFingerprintValidator(t *testing.T) { }) t.Run(`unknown partition`, func(t *testing.T) { sqlDB.Exec(t, createTableStmt(`unknown_partition`)) - v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `unknown_partition`, []string{`p`}, testColumns, false) + v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `unknown_partition`, []string{`p`}, testColumns) require.NoError(t, err) if err := v.NoteResolved(`nope`, ts[1]); !testutils.IsError(err, `unknown partition`) { t.Fatalf(`expected "unknown partition" error got: %+v`, err) @@ -407,7 +407,7 @@ func TestFingerprintValidator(t *testing.T) { }) t.Run(`resolved unsorted`, func(t *testing.T) { sqlDB.Exec(t, createTableStmt(`resolved_unsorted`)) - v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `resolved_unsorted`, []string{`p`}, testColumns, false) + v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `resolved_unsorted`, []string{`p`}, testColumns) require.NoError(t, err) noteRow(t, v, ignored, `[1]`, `{"after": {"k":1,"v":1}}`, ts[1]) noteResolved(t, v, `p`, ts[1]) @@ -417,7 +417,7 @@ func TestFingerprintValidator(t *testing.T) { }) t.Run(`two partitions`, func(t *testing.T) { sqlDB.Exec(t, createTableStmt(`two_partitions`)) - v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `two_partitions`, []string{`p0`, `p1`}, testColumns, false) + v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `two_partitions`, []string{`p0`, `p1`}, testColumns) require.NoError(t, err) noteRow(t, v, ignored, `[1]`, `{"after": {"k":1,"v":1}}`, ts[1]) noteRow(t, v, ignored, `[1]`, `{"after": {"k":1,"v":2}}`, ts[2]) diff --git a/pkg/cmd/roachtest/tests/cdc.go 
b/pkg/cmd/roachtest/tests/cdc.go index f793ebc929c3..949cd7dd4a17 100644 --- a/pkg/cmd/roachtest/tests/cdc.go +++ b/pkg/cmd/roachtest/tests/cdc.go @@ -417,7 +417,7 @@ func runCDCBank(ctx context.Context, t test.Test, c cluster.Cluster) { return errors.Wrap(err, "CREATE TABLE failed") } - fprintV, err := cdctest.NewFingerprintValidator(db, `bank.bank`, `fprint`, tc.partitions, 0, false) + fprintV, err := cdctest.NewFingerprintValidator(db, `bank.bank`, `fprint`, tc.partitions, 0) if err != nil { return errors.Wrap(err, "error creating validator") } diff --git a/pkg/cmd/roachtest/tests/mixed_version_cdc.go b/pkg/cmd/roachtest/tests/mixed_version_cdc.go index 0731fe2b6424..29c9c774c20c 100644 --- a/pkg/cmd/roachtest/tests/mixed_version_cdc.go +++ b/pkg/cmd/roachtest/tests/mixed_version_cdc.go @@ -12,6 +12,7 @@ package tests import ( "context" + gosql "database/sql" "fmt" "strconv" "strings" @@ -45,6 +46,15 @@ var ( targetTable = "bank" timeout = 30 * time.Minute + // set a fixed number of operations to be performed by the + // workload. Since we are validating events emitted by the + // changefeed in this test (ValidateDuplicatedEvents() call), + // enforcing a maximum number of operations sets a boundary on the + // validator's memory usage. The current value represents enough + // operations to allow for the validator to receive the desired + // amount of resolved events at different points of the upgrade + // process while staying within the current timeout. 
+ maxOps = 12000 ) func registerCDCMixedVersions(r registry.Registry) { @@ -74,9 +84,11 @@ type cdcMixedVersionTester struct { syncutil.Mutex C chan struct{} } - kafka kafkaManager - validator *cdctest.CountValidator - cleanup func() + crdbUpgrading syncutil.Mutex + kafka kafkaManager + validator *cdctest.CountValidator + workloadDone bool + cleanup func() } func newCDCMixedVersionTester( @@ -121,10 +133,12 @@ func (cmvt *cdcMixedVersionTester) installAndStartWorkload() versionStep { t.Status("installing and running workload") u.c.Run(ctx, cmvt.workloadNodes, "./workload init bank {pgurl:1}") cmvt.monitor.Go(func(ctx context.Context) error { + defer func() { cmvt.workloadDone = true }() return u.c.RunE( ctx, cmvt.workloadNodes, - fmt.Sprintf("./workload run bank {pgurl%s} --max-rate=10 --tolerate-errors", cmvt.crdbNodes), + fmt.Sprintf("./workload run bank {pgurl%s} --max-rate=10 --max-ops %d --tolerate-errors", + cmvt.crdbNodes, maxOps), ) }) } @@ -165,6 +179,15 @@ func (cmvt *cdcMixedVersionTester) waitForResolvedTimestamps() versionStep { } } +// waitForWorkload waits for the workload to finish +func (cmvt *cdcMixedVersionTester) waitForWorkload() versionStep { + return func(ctx context.Context, t test.Test, u *versionUpgradeTest) { + t.L().Printf("waiting for workload to finish...") + cmvt.monitor.Wait() + t.L().Printf("workload finished") + } +} + // setupVerifier creates a CDC validator to validate that a changefeed // created on the `target` table is able to re-create the table // somewhere else. It also verifies CDC's ordering guarantees. 
This @@ -192,20 +215,22 @@ func (cmvt *cdcMixedVersionTester) setupVerifier(node int) versionStep { t.Fatal(err) } - fprintV, err := cdctest.NewFingerprintValidator(db, tableName, `fprint`, consumer.partitions, 0, true) + getConn := func(node int) *gosql.DB { return u.conn(ctx, t, node) } + fprintV, err := cdctest.NewFingerprintValidator(db, tableName, `fprint`, consumer.partitions, 0) if err != nil { t.Fatal(err) } + fprintV.DBFunc(cmvt.cdcDBConn(getConn)).ValidateDuplicatedEvents() validators := cdctest.Validators{ cdctest.NewOrderValidator(tableName), fprintV, } cmvt.validator = cdctest.MakeCountValidator(validators) - for { + for !cmvt.workloadDone { m := consumer.Next(ctx) if m == nil { - t.L().Printf("end of changefeed") + t.Fatal("unexpected end of changefeed") return nil } @@ -229,6 +254,7 @@ func (cmvt *cdcMixedVersionTester) setupVerifier(node int) versionStep { cmvt.timestampResolved() } } + return nil }) } } @@ -244,6 +270,35 @@ func (cmvt *cdcMixedVersionTester) timestampResolved() { } } +// cdcDBConn is the wrapper passed to the FingerprintValidator. The +// goal is to ensure that database checks by the validator do not +// happen while we are running an upgrade. We used to retry database +// calls in the validator, but that logic adds complexity and does not +// help in testing the changefeed's correctness +func (cmvt *cdcMixedVersionTester) cdcDBConn( + getConn func(int) *gosql.DB, +) func(func(*gosql.DB) error) error { + return func(f func(*gosql.DB) error) error { + cmvt.crdbUpgrading.Lock() + defer cmvt.crdbUpgrading.Unlock() + + node := cmvt.crdbNodes.RandNode()[0] + return f(getConn(node)) + } +} + +// crdbUpgradeStep is a wrapper to steps that upgrade the cockroach +// binary running in the cluster. 
It makes sure we hold exclusive +// access to the `crdbUpgrading` lock while the upgrade is in process +func (cmvt *cdcMixedVersionTester) crdbUpgradeStep(step versionStep) versionStep { + return func(ctx context.Context, t test.Test, u *versionUpgradeTest) { + cmvt.crdbUpgrading.Lock() + defer cmvt.crdbUpgrading.Unlock() + + step(ctx, t, u) + } +} + // assertValid checks if the validator has found any issues at the // time the function is called. func (cmvt *cdcMixedVersionTester) assertValid() versionStep { @@ -312,7 +367,7 @@ func runCDCMixedVersions( tester.waitForResolvedTimestamps(), // Roll the nodes into the new version one by one in random order - binaryUpgradeStep(tester.crdbNodes, mainVersion), + tester.crdbUpgradeStep(binaryUpgradeStep(tester.crdbNodes, mainVersion)), // let the workload run in the new version for a while tester.waitForResolvedTimestamps(), @@ -320,19 +375,20 @@ func runCDCMixedVersions( // Roll back again, which ought to be fine because the cluster upgrade was // not finalized. - binaryUpgradeStep(tester.crdbNodes, predecessorVersion), + tester.crdbUpgradeStep(binaryUpgradeStep(tester.crdbNodes, predecessorVersion)), tester.waitForResolvedTimestamps(), tester.assertValid(), // Roll nodes forward and finalize upgrade. - binaryUpgradeStep(tester.crdbNodes, mainVersion), + tester.crdbUpgradeStep(binaryUpgradeStep(tester.crdbNodes, mainVersion)), // allow cluster version to update allowAutoUpgradeStep(sqlNode()), waitForUpgradeStep(tester.crdbNodes), tester.waitForResolvedTimestamps(), + tester.waitForWorkload(), tester.assertValid(), ).run(ctx, t) }