diff --git a/pkg/ccl/changefeedccl/cdctest/validator.go b/pkg/ccl/changefeedccl/cdctest/validator.go index b3f532c1691c..45997d0adccc 100644 --- a/pkg/ccl/changefeedccl/cdctest/validator.go +++ b/pkg/ccl/changefeedccl/cdctest/validator.go @@ -319,6 +319,13 @@ type validatorRow struct { updated hlc.Timestamp } +// eventKey returns a key that encodes the key and timestamp of a row +// received from a changefeed. Can be used to keep track of which +// updates have been seen before in a validator. +func (row validatorRow) eventKey() string { + return fmt.Sprintf("%s|%s", row.key, row.updated.AsOfSystemTime()) +} + // FingerprintValidator verifies that recreating a table from its changefeed // will fingerprint the same at all "interesting" points in time. type FingerprintValidator struct { @@ -342,6 +349,7 @@ type FingerprintValidator struct { fprintOrigColumns int fprintTestColumns int buffer []validatorRow + previouslySeen map[string]struct{} failures []string } @@ -423,6 +431,16 @@ func (v *FingerprintValidator) DBFunc( return v } +// ValidateDuplicatedEvents enables the validation of duplicated +// messages in the fingerprint validator. Whenever a row is received +// with a timestamp lower than the last `resolved` timestamp seen, we +// verify that the event has been seen before (if it hasn't, that +// would be a violation of the changefeed guarantees) +func (v *FingerprintValidator) ValidateDuplicatedEvents() *FingerprintValidator { + v.previouslySeen = make(map[string]struct{}) + return v +} + // NoteRow implements the Validator interface. func (v *FingerprintValidator) NoteRow( ignoredPartition string, key, value string, updated hlc.Timestamp, @@ -430,11 +448,20 @@ func (v *FingerprintValidator) NoteRow( if v.firstRowTimestamp.IsEmpty() || updated.Less(v.firstRowTimestamp) { v.firstRowTimestamp = updated } - v.buffer = append(v.buffer, validatorRow{ - key: key, - value: value, - updated: updated, - }) + + row := validatorRow{key: key, value: value, updated: updated} + if err := v.maybeValidateDuplicatedEvent(row); err != nil { + return err + } + + // if this row's timestamp is earlier than the last resolved + // timestamp we processed, we can skip it as it is a duplicate + if row.updated.Less(v.resolved) { + return nil + } + + v.buffer = append(v.buffer, row) + v.maybeAddSeenEvent(row) return nil } @@ -598,6 +625,42 @@ func (v *FingerprintValidator) NoteResolved(partition string, resolved hlc.Times return nil } +// maybeAddSeenEvent is a no-op if the caller did not call +// ValidateDuplicatedEvents. Otherwise, we keep a reference to the row +// key and MVCC timestamp for later validation +func (v *FingerprintValidator) maybeAddSeenEvent(row validatorRow) { + if v.previouslySeen == nil { + return + } + + v.previouslySeen[row.eventKey()] = struct{}{} +} + +// maybeValidateDuplicatedEvent is a no-op if the caller did not call +// ValidateDuplicatedEvents. Otherwise, it returns an error if the +// row's timestamp is earlier than the validator's `resolved` +// timestamp *and* it has not been seen before; that would be a +// violation of the changefeed's guarantees. +func (v *FingerprintValidator) maybeValidateDuplicatedEvent(row validatorRow) error { + if v.previouslySeen == nil { + return nil + } + + // row's timestamp is after the last resolved timestamp; no problem + if v.resolved.LessEq(row.updated) { + return nil + } + + // row's timestamp is earlier than resolved timestamp *and* it + // hasn't been seen before; that shouldn't happen + if _, seen := v.previouslySeen[row.eventKey()]; !seen { + return fmt.Errorf("unexpected out-of-order event at timestamp %s prior to resolved timestamp %s", + row.updated.AsOfSystemTime(), v.resolved.AsOfSystemTime()) + } + + return nil +} + func (v *FingerprintValidator) fingerprint(ts hlc.Timestamp) error { var orig string if err := v.sqlDBFunc(func(db *gosql.DB) error { diff --git a/pkg/cmd/roachtest/tests/mixed_version_cdc.go b/pkg/cmd/roachtest/tests/mixed_version_cdc.go index dcfe7b1b0f77..29c9c774c20c 100644 --- a/pkg/cmd/roachtest/tests/mixed_version_cdc.go +++ b/pkg/cmd/roachtest/tests/mixed_version_cdc.go @@ -46,6 +46,15 @@ var ( targetTable = "bank" timeout = 30 * time.Minute + // set a fixed number of operations to be performed by the + // workload. Since we are validating events emitted by the + // changefeed in this test (ValidateDuplicatedEvents() call), + // enforcing a maximum number of operations sets a boundary on the + // validator's memory usage. The current value represents enough + // operations to allow for the validator to receive the desired + // amount of resolved events at different points of the upgrade + // process while staying within the current timeout. + maxOps = 12000 ) func registerCDCMixedVersions(r registry.Registry) { @@ -78,8 +87,7 @@ type cdcMixedVersionTester struct { crdbUpgrading syncutil.Mutex kafka kafkaManager validator *cdctest.CountValidator - validatorDone chan struct{} // validator is no longer waiting for messages - validatorStop bool // used to tell the validator to stop validating messages + workloadDone bool cleanup func() } @@ -98,7 +106,6 @@ func newCDCMixedVersionTester( workloadNodes: lastNode, kafkaNodes: lastNode, monitor: c.NewMonitor(ctx, crdbNodes), - validatorDone: make(chan struct{}), } } @@ -126,10 +133,12 @@ func (cmvt *cdcMixedVersionTester) installAndStartWorkload() versionStep { t.Status("installing and running workload") u.c.Run(ctx, cmvt.workloadNodes, "./workload init bank {pgurl:1}") cmvt.monitor.Go(func(ctx context.Context) error { + defer func() { cmvt.workloadDone = true }() return u.c.RunE( ctx, cmvt.workloadNodes, - fmt.Sprintf("./workload run bank {pgurl%s} --max-rate=10 --tolerate-errors", cmvt.crdbNodes), + fmt.Sprintf("./workload run bank {pgurl%s} --max-rate=10 --max-ops %d --tolerate-errors", + cmvt.crdbNodes, maxOps), ) }) } @@ -170,14 +179,12 @@ func (cmvt *cdcMixedVersionTester) waitForResolvedTimestamps() versionStep { } } -// waitForValidator sets the `validatorStop` flag and waits for the -// validator to finish. This should be done right before the test -// finishes, to ensure that we won't try to close the database -// connection while the validator is still trying to use it -func (cmvt *cdcMixedVersionTester) waitForValidator() versionStep { +// waitForWorkload waits for the workload to finish +func (cmvt *cdcMixedVersionTester) waitForWorkload() versionStep { return func(ctx context.Context, t test.Test, u *versionUpgradeTest) { - cmvt.validatorStop = true - <-cmvt.validatorDone + t.L().Printf("waiting for workload to finish...") + cmvt.monitor.Wait() + t.L().Printf("workload finished") } } @@ -213,17 +220,17 @@ func (cmvt *cdcMixedVersionTester) setupVerifier(node int) versionStep { if err != nil { t.Fatal(err) } - fprintV.DBFunc(cmvt.cdcDBConn(getConn)) + fprintV.DBFunc(cmvt.cdcDBConn(getConn)).ValidateDuplicatedEvents() validators := cdctest.Validators{ cdctest.NewOrderValidator(tableName), fprintV, } cmvt.validator = cdctest.MakeCountValidator(validators) - for !cmvt.validatorStop { + for !cmvt.workloadDone { m := consumer.Next(ctx) if m == nil { - t.L().Printf("end of changefeed") + t.Fatal("unexpected end of changefeed") return nil } @@ -247,8 +254,6 @@ func (cmvt *cdcMixedVersionTester) setupVerifier(node int) versionStep { cmvt.timestampResolved() } } - - close(cmvt.validatorDone) return nil }) } @@ -383,7 +388,7 @@ func runCDCMixedVersions( waitForUpgradeStep(tester.crdbNodes), tester.waitForResolvedTimestamps(), - tester.waitForValidator(), + tester.waitForWorkload(), tester.assertValid(), ).run(ctx, t) }