From f4e74b848f05f0aa157f2b901c421e6a3f779c11 Mon Sep 17 00:00:00 2001 From: Renato Costa Date: Mon, 3 Oct 2022 14:03:20 -0400 Subject: [PATCH] roachtest: skip duplicated events in fingerprint validator This commit updates the fingerprint validator (and its use in the `cdc/mixed-versions` test) to ignore duplicated events received by the validator. A previously implicit assumption of the validator is that any events that it receives are either not duplicated, or -- if they are duplicated -- they fall between the previous resolved timestamp and the current resolved timestamp. However, that assumption is not justified by the changefeed guarantees and depends on how frequently `resolved` events are emitted and how often the changefeed checkpoints. In the specific case of the `cdc/mixed-versions` roachtest, it was possible for the changefeed to start from an old checkpoint (older than the last received `resolved` timestamp), causing it to re-emit old events that happened well before the previously observed resolved event. As a consequence, when the validator applies the update associated with that event, there is a mismatch with the state of the original table as of the update's timestamp, as the fingerprint validator relies on the fact that updates are applied in order. To fix the issue, we now skip events that happen before the timestamp of the previous `resolved` event received. In addition, the caller can also tell the validator to verify that such out-of-order messages received by the validator have indeed been previously seen; if not, that would represent a violation of the changefeed's guarantees. Fixes: #87251. 
Release note: None --- pkg/ccl/changefeedccl/cdctest/validator.go | 70 ++++++++++++++++++-- pkg/cmd/roachtest/tests/mixed_version_cdc.go | 34 +++++----- 2 files changed, 83 insertions(+), 21 deletions(-) diff --git a/pkg/ccl/changefeedccl/cdctest/validator.go b/pkg/ccl/changefeedccl/cdctest/validator.go index b3f532c1691c..46eeea303624 100644 --- a/pkg/ccl/changefeedccl/cdctest/validator.go +++ b/pkg/ccl/changefeedccl/cdctest/validator.go @@ -319,6 +319,13 @@ type validatorRow struct { updated hlc.Timestamp } +// eventKey returns a key that encodes the key and timestamp of a row +// received from a changefeed. Can be used to keep track of which +// updates have been seen before in a validator. +func (row validatorRow) eventKey() string { + return fmt.Sprintf("%s|%s", row.key, row.updated.AsOfSystemTime()) +} + // FingerprintValidator verifies that recreating a table from its changefeed // will fingerprint the same at all "interesting" points in time. type FingerprintValidator struct { @@ -342,6 +349,7 @@ type FingerprintValidator struct { fprintOrigColumns int fprintTestColumns int buffer []validatorRow + previouslySeen map[string]struct{} failures []string } @@ -423,6 +431,16 @@ func (v *FingerprintValidator) DBFunc( return v } +// ValidateDuplicatedEvents enables the validation of duplicated +// messages in the fingerprint validator. Whenever a row is received +// with a timestamp lower than the last `resolved` timestamp seen, we +// verify that the event has been seen before (if it hasn't, that +// would be a violation of the changefeed guarantees) +func (v *FingerprintValidator) ValidateDuplicatedEvents() *FingerprintValidator { + v.previouslySeen = make(map[string]struct{}) + return v +} + // NoteRow implements the Validator interface. 
func (v *FingerprintValidator) NoteRow( ignoredPartition string, key, value string, updated hlc.Timestamp, @@ -430,11 +448,20 @@ func (v *FingerprintValidator) NoteRow( if v.firstRowTimestamp.IsEmpty() || updated.Less(v.firstRowTimestamp) { v.firstRowTimestamp = updated } - v.buffer = append(v.buffer, validatorRow{ - key: key, - value: value, - updated: updated, - }) + + row := validatorRow{key: key, value: value, updated: updated} + if err := v.maybeValidateDuplicatedEvent(row); err != nil { + return err + } + + // if this row's timestamp is earlier than the last resolved + // timestamp we processed, we can skip it as it is a duplicate + if row.updated.Less(v.resolved) { + return nil + } + + v.buffer = append(v.buffer, row) + v.maybeAddSeenEvent(row) return nil } @@ -598,6 +625,39 @@ func (v *FingerprintValidator) NoteResolved(partition string, resolved hlc.Times return nil } +// maybeAddSeenEvent is a no-op if the caller did not call +// ValidateDuplicatedEvents. Otherwise, we keep a reference to the row +// key and MVCC timestamp for later validation +func (v *FingerprintValidator) maybeAddSeenEvent(row validatorRow) { + if v.previouslySeen == nil { + return + } + + v.previouslySeen[row.eventKey()] = struct{}{} +} + +// maybeValidateDuplicatedEvent is a no-op if the caller did not call +// ValidateDuplicatedEvents. Otherwise, it returns an error if the +// row's timestamp is older than the `previousResolved` timestamp +// passed *and* it has not been seen before; that would be a violation +// of the changefeed's guarantees. 
+func (v *FingerprintValidator) maybeValidateDuplicatedEvent(row validatorRow) error { + if v.previouslySeen == nil { + return nil + } + + if v.resolved.LessEq(row.updated) { + return nil + } + + if _, seen := v.previouslySeen[row.eventKey()]; !seen { + return fmt.Errorf("unexpected new event at timestamp %s after resolved timestamp %s", + row.updated.AsOfSystemTime(), v.resolved.AsOfSystemTime()) + } + + return nil +} + func (v *FingerprintValidator) fingerprint(ts hlc.Timestamp) error { var orig string if err := v.sqlDBFunc(func(db *gosql.DB) error { diff --git a/pkg/cmd/roachtest/tests/mixed_version_cdc.go b/pkg/cmd/roachtest/tests/mixed_version_cdc.go index 00a5340cecf5..4bc931159c80 100644 --- a/pkg/cmd/roachtest/tests/mixed_version_cdc.go +++ b/pkg/cmd/roachtest/tests/mixed_version_cdc.go @@ -46,6 +46,12 @@ var ( targetTable = "bank" timeout = 30 * time.Minute + // set a fixed number of operations to be performed by the + // workload. Since we are validating events emitted by the + // changefeed in this test (ValidateDuplicatedEvents() call), + // enforcing a maximum number of operations sets a boundary on the + // validator's memory usage + maxOps = 12000 ) func registerCDCMixedVersions(r registry.Registry) { @@ -79,8 +85,7 @@ type cdcMixedVersionTester struct { kafka kafkaManager changefeedJobID int validator *cdctest.CountValidator - validatorDone chan struct{} // validator is no longer waiting for messages - validatorStop bool // used to tell the validator to stop validating messages + workloadDone bool cleanup func() } @@ -99,7 +104,6 @@ func newCDCMixedVersionTester( workloadNodes: lastNode, kafkaNodes: lastNode, monitor: c.NewMonitor(ctx, crdbNodes), - validatorDone: make(chan struct{}), } } @@ -127,10 +131,12 @@ func (cmvt *cdcMixedVersionTester) installAndStartWorkload() versionStep { t.Status("installing and running workload") u.c.Run(ctx, cmvt.workloadNodes, "./workload init bank {pgurl:1}") cmvt.monitor.Go(func(ctx context.Context) error { + defer 
func() { cmvt.workloadDone = true }() return u.c.RunE( ctx, cmvt.workloadNodes, - fmt.Sprintf("./workload run bank {pgurl%s} --max-rate=10 --tolerate-errors", cmvt.crdbNodes), + fmt.Sprintf("./workload run bank {pgurl%s} --max-rate=10 --max-ops %d --tolerate-errors", + cmvt.crdbNodes, maxOps), ) }) } @@ -171,14 +177,12 @@ func (cmvt *cdcMixedVersionTester) waitForResolvedTimestamps() versionStep { } } -// waitForValidator sets the `validatorStop` flag and waits for the -// validator to finish. This should be done right before the test -// finishes, to ensure that we won't try to close the database -// connection while the validator is still trying to use it -func (cmvt *cdcMixedVersionTester) waitForValidator() versionStep { +// waitForWorkload waits for the workload to finish +func (cmvt *cdcMixedVersionTester) waitForWorkload() versionStep { return func(ctx context.Context, t test.Test, u *versionUpgradeTest) { - cmvt.validatorStop = true - <-cmvt.validatorDone + t.L().Printf("waiting for workload to finish...") + cmvt.monitor.Wait() + t.L().Printf("workload finished") } } @@ -214,14 +218,14 @@ func (cmvt *cdcMixedVersionTester) setupVerifier(node int) versionStep { if err != nil { t.Fatal(err) } - fprintV.DBFunc(cmvt.cdcDBConn(getConn)) + fprintV.DBFunc(cmvt.cdcDBConn(getConn)).ValidateDuplicatedEvents() validators := cdctest.Validators{ cdctest.NewOrderValidator(tableName), fprintV, } cmvt.validator = cdctest.MakeCountValidator(validators) - for !cmvt.validatorStop { + for !cmvt.workloadDone { m := consumer.Next(ctx) if m == nil { t.L().Printf("end of changefeed") @@ -248,8 +252,6 @@ func (cmvt *cdcMixedVersionTester) setupVerifier(node int) versionStep { cmvt.timestampResolved() } } - - close(cmvt.validatorDone) return nil }) } @@ -386,7 +388,7 @@ func runCDCMixedVersions( waitForUpgradeStep(tester.crdbNodes), tester.waitForResolvedTimestamps(), - tester.waitForValidator(), + tester.waitForWorkload(), tester.assertValid(), ).run(ctx, t) }