Skip to content

Commit

Permalink
roachtest: skip duplicated events in fingerprint validator
Browse files Browse the repository at this point in the history
This commit updates the fingerprint validator (and its use in the
`cdc/mixed-versions` test) to ignore duplicated events received by the
validator.

A previously implicit assumption of the validator is that any events
that it receives are either not duplicated, or -- if they are
duplicated -- they are within the previous resolved timestamp and the
current resolved timestamp. However, that assumption is not justified
by the changefeed guarantees and depends on how frequently `resolved`
events are emitted and how often the changefeed checkpoints.

In the specific case of the `cdc/mixed-versions` roachtest, it was
possible for the changefeed to start from an old checkpoint (older
than the last received `resolved` timestamp), causing it to re-emit
old events that happened way before the previously observed resolved
event. As a consequence, when the validator applies the update
associated with that event, there is a mismatch with state of the
original table as of the update's timestamp, as the fingerprint
validator relies on the fact that updates are applied in order.

To fix the issue, we now skip events that happen before the timestamp
of the previous `resolved` event received. In addition, the caller can
also tell the validator to verify that such out-of-order messages
received by the validator have indeed been previously seen; if not,
that would represent a violation of the changefeed's guarantees.

Fixes: #87251.

Release note: None
  • Loading branch information
renatolabs committed Oct 4, 2022
1 parent 9192ed9 commit f4e74b8
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 21 deletions.
70 changes: 65 additions & 5 deletions pkg/ccl/changefeedccl/cdctest/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,13 @@ type validatorRow struct {
updated hlc.Timestamp
}

// eventKey returns a key that encodes the key and timestamp of a row
// received from a changefeed. Can be used to keep track of which
// updates have been seen before in a validator.
func (row validatorRow) eventKey() string {
return fmt.Sprintf("%s|%s", row.key, row.updated.AsOfSystemTime())
}

// FingerprintValidator verifies that recreating a table from its changefeed
// will fingerprint the same at all "interesting" points in time.
type FingerprintValidator struct {
Expand All @@ -342,6 +349,7 @@ type FingerprintValidator struct {
fprintOrigColumns int
fprintTestColumns int
buffer []validatorRow
previouslySeen map[string]struct{}

failures []string
}
Expand Down Expand Up @@ -423,18 +431,37 @@ func (v *FingerprintValidator) DBFunc(
return v
}

// ValidateDuplicatedEvents enables the validation of duplicated
// messages in the fingerprint validator. Whenever a row is received
// with a timestamp lower than the last `resolved` timestamp seen, we
// verify that the event has been seen before (if it hasn't, that
// would be a violation of the changefeed guarantees)
func (v *FingerprintValidator) ValidateDuplicatedEvents() *FingerprintValidator {
v.previouslySeen = make(map[string]struct{})
return v
}

// NoteRow implements the Validator interface.
func (v *FingerprintValidator) NoteRow(
ignoredPartition string, key, value string, updated hlc.Timestamp,
) error {
if v.firstRowTimestamp.IsEmpty() || updated.Less(v.firstRowTimestamp) {
v.firstRowTimestamp = updated
}
v.buffer = append(v.buffer, validatorRow{
key: key,
value: value,
updated: updated,
})

row := validatorRow{key: key, value: value, updated: updated}
if err := v.maybeValidateDuplicatedEvent(row); err != nil {
return err
}

// if this row's timestamp is earlier than the last resolved
// timestamp we processed, we can skip it as it is a duplicate
if row.updated.Less(v.resolved) {
return nil
}

v.buffer = append(v.buffer, row)
v.maybeAddSeenEvent(row)
return nil
}

Expand Down Expand Up @@ -598,6 +625,39 @@ func (v *FingerprintValidator) NoteResolved(partition string, resolved hlc.Times
return nil
}

// maybeAddSeenEvent is a no-op if the caller did not call
// ValidateDuplicatedEvents. Otherwise, we keep a reference to the row
// key and MVCC timestamp for later validation
func (v *FingerprintValidator) maybeAddSeenEvent(row validatorRow) {
if v.previouslySeen == nil {
return
}

v.previouslySeen[row.eventKey()] = struct{}{}
}

// maybeValidateDuplicatedEvent is a no-op if the caller did not call
// ValidateDuplicatedEvents. Otherwise, it returns an error if the
// row's timestamp is older than the `previousResolved` timestamp
// passed *and* it has not been seen before; that would be a violation
// of the changefeed's guarantees.
func (v *FingerprintValidator) maybeValidateDuplicatedEvent(row validatorRow) error {
if v.previouslySeen == nil {
return nil
}

if v.resolved.LessEq(row.updated) {
return nil
}

if _, seen := v.previouslySeen[row.eventKey()]; !seen {
return fmt.Errorf("unexpected new event at timestamp %s after resolved timestamp %s",
row.updated.AsOfSystemTime(), v.resolved.AsOfSystemTime())
}

return nil
}

func (v *FingerprintValidator) fingerprint(ts hlc.Timestamp) error {
var orig string
if err := v.sqlDBFunc(func(db *gosql.DB) error {
Expand Down
34 changes: 18 additions & 16 deletions pkg/cmd/roachtest/tests/mixed_version_cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ var (
targetTable = "bank"

timeout = 30 * time.Minute
// set a fixed number of operations to be performed by the
// workload. Since we are validating events emitted by the
// changefeed in this test (ValidateDuplicatedEvents() call),
// enforcing a maximum number of operations sets a boundary on the
// validator's memory usage
maxOps = 12000
)

func registerCDCMixedVersions(r registry.Registry) {
Expand Down Expand Up @@ -79,8 +85,7 @@ type cdcMixedVersionTester struct {
kafka kafkaManager
changefeedJobID int
validator *cdctest.CountValidator
validatorDone chan struct{} // validator is no longer waiting for messages
validatorStop bool // used to tell the validator to stop validating messages
workloadDone bool
cleanup func()
}

Expand All @@ -99,7 +104,6 @@ func newCDCMixedVersionTester(
workloadNodes: lastNode,
kafkaNodes: lastNode,
monitor: c.NewMonitor(ctx, crdbNodes),
validatorDone: make(chan struct{}),
}
}

Expand Down Expand Up @@ -127,10 +131,12 @@ func (cmvt *cdcMixedVersionTester) installAndStartWorkload() versionStep {
t.Status("installing and running workload")
u.c.Run(ctx, cmvt.workloadNodes, "./workload init bank {pgurl:1}")
cmvt.monitor.Go(func(ctx context.Context) error {
defer func() { cmvt.workloadDone = true }()
return u.c.RunE(
ctx,
cmvt.workloadNodes,
fmt.Sprintf("./workload run bank {pgurl%s} --max-rate=10 --tolerate-errors", cmvt.crdbNodes),
fmt.Sprintf("./workload run bank {pgurl%s} --max-rate=10 --max-ops %d --tolerate-errors",
cmvt.crdbNodes, maxOps),
)
})
}
Expand Down Expand Up @@ -171,14 +177,12 @@ func (cmvt *cdcMixedVersionTester) waitForResolvedTimestamps() versionStep {
}
}

// waitForValidator sets the `validatorStop` flag and waits for the
// validator to finish. This should be done right before the test
// finishes, to ensure that we won't try to close the database
// connection while the validator is still trying to use it
func (cmvt *cdcMixedVersionTester) waitForValidator() versionStep {
// waitForWorkload waits for the workload to finish
func (cmvt *cdcMixedVersionTester) waitForWorkload() versionStep {
return func(ctx context.Context, t test.Test, u *versionUpgradeTest) {
cmvt.validatorStop = true
<-cmvt.validatorDone
t.L().Printf("waiting for workload to finish...")
cmvt.monitor.Wait()
t.L().Printf("workload finished")
}
}

Expand Down Expand Up @@ -214,14 +218,14 @@ func (cmvt *cdcMixedVersionTester) setupVerifier(node int) versionStep {
if err != nil {
t.Fatal(err)
}
fprintV.DBFunc(cmvt.cdcDBConn(getConn))
fprintV.DBFunc(cmvt.cdcDBConn(getConn)).ValidateDuplicatedEvents()
validators := cdctest.Validators{
cdctest.NewOrderValidator(tableName),
fprintV,
}
cmvt.validator = cdctest.MakeCountValidator(validators)

for !cmvt.validatorStop {
for !cmvt.workloadDone {
m := consumer.Next(ctx)
if m == nil {
t.L().Printf("end of changefeed")
Expand All @@ -248,8 +252,6 @@ func (cmvt *cdcMixedVersionTester) setupVerifier(node int) versionStep {
cmvt.timestampResolved()
}
}

close(cmvt.validatorDone)
return nil
})
}
Expand Down Expand Up @@ -386,7 +388,7 @@ func runCDCMixedVersions(
waitForUpgradeStep(tester.crdbNodes),

tester.waitForResolvedTimestamps(),
tester.waitForValidator(),
tester.waitForWorkload(),
tester.assertValid(),
).run(ctx, t)
}

0 comments on commit f4e74b8

Please sign in to comment.