Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

roachtest: ignore duplicated events in fingerprint validator #89332

Merged
merged 2 commits into from
Oct 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion pkg/ccl/changefeedccl/cdctest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ go_library(
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/randutil",
"//pkg/util/retry",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/cdctest/nemeses.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func RunNemesis(f TestFeedFactory, db *gosql.DB, isSinkless bool) (Validator, er
if err != nil {
return nil, err
}
fprintV, err := NewFingerprintValidator(db, `foo`, scratchTableName, foo.Partitions(), ns.maxTestColumnCount, false)
fprintV, err := NewFingerprintValidator(db, `foo`, scratchTableName, foo.Partitions(), ns.maxTestColumnCount)
if err != nil {
return nil, err
}
Expand Down
168 changes: 113 additions & 55 deletions pkg/ccl/changefeedccl/cdctest/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@ import (
"fmt"
"sort"
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -65,8 +63,6 @@ var _ Validator = &orderValidator{}
var _ Validator = &noOpValidator{}
var _ StreamValidator = &orderValidator{}

var retryDuration = 2 * time.Minute

type noOpValidator struct{}

// NoteRow accepts a changed row entry.
Expand Down Expand Up @@ -323,16 +319,23 @@ type validatorRow struct {
updated hlc.Timestamp
}

// fingerprintValidator verifies that recreating a table from its changefeed
// eventKey returns a key that encodes the key and timestamp of a row
// received from a changefeed. Can be used to keep track of which
// updates have been seen before in a validator.
func (row validatorRow) eventKey() string {
return fmt.Sprintf("%s|%s", row.key, row.updated.AsOfSystemTime())
}

// FingerprintValidator verifies that recreating a table from its changefeed
// will fingerprint the same at all "interesting" points in time.
type fingerprintValidator struct {
sqlDB *gosql.DB
type FingerprintValidator struct {
sqlDBFunc func(func(*gosql.DB) error) error
origTable, fprintTable string
primaryKeyCols []string
partitionResolved map[string]hlc.Timestamp
resolved hlc.Timestamp
// It's possible to get a resolved timestamp from before the table even
// exists, which is valid but complicates the way fingerprintValidator works.
// exists, which is valid but complicates the way FingerprintValidator works.
// Don't create a fingerprint earlier than the first seen row.
firstRowTimestamp hlc.Timestamp
// previousRowUpdateTs keeps track of the timestamp of the most recently processed row
Expand All @@ -346,39 +349,40 @@ type fingerprintValidator struct {
fprintOrigColumns int
fprintTestColumns int
buffer []validatorRow

// shouldRetry indicates whether row updates should be retried (for
// a fixed duration). Typically used when the transient errors are
// expected (e.g., if performing an upgrade)
shouldRetry bool
previouslySeen map[string]struct{}

failures []string
}

// defaultSQLDBFunc is the default function passed the FingerprintValidator's
// `sqlDBFunc`. It is sufficient in cases when the database is not expected to
// fail while the validator is using it.
func defaultSQLDBFunc(db *gosql.DB) func(func(*gosql.DB) error) error {
return func(f func(*gosql.DB) error) error {
return f(db)
}
}

// NewFingerprintValidator returns a new FingerprintValidator that uses `fprintTable` as
// scratch space to recreate `origTable`. `fprintTable` must exist before calling this
// constructor. `maxTestColumnCount` indicates the maximum number of columns that can be
// expected in `origTable` due to test-related schema changes. This fingerprint validator
// will modify `fprint`'s schema to add `maxTestColumnCount` columns to avoid having to
// accommodate schema changes on the fly.
func NewFingerprintValidator(
sqlDB *gosql.DB,
origTable, fprintTable string,
partitions []string,
maxTestColumnCount int,
shouldRetry bool,
) (Validator, error) {
db *gosql.DB, origTable, fprintTable string, partitions []string, maxTestColumnCount int,
) (*FingerprintValidator, error) {
// Fetch the primary keys though information_schema schema inspections so we
// can use them to construct the SQL for DELETEs and also so we can verify
// that the key in a message matches what's expected for the value.
primaryKeyCols, err := fetchPrimaryKeyCols(sqlDB, fprintTable)
primaryKeyCols, err := fetchPrimaryKeyCols(db, fprintTable)
if err != nil {
return nil, err
}

// Record the non-test%d columns in `fprint`.
var fprintOrigColumns int
if err := sqlDB.QueryRow(`
if err := db.QueryRow(`
SELECT count(column_name)
FROM information_schema.columns
WHERE table_name=$1
Expand All @@ -396,19 +400,19 @@ func NewFingerprintValidator(
}
fmt.Fprintf(&addColumnStmt, `ADD COLUMN test%d STRING`, i)
}
if _, err := sqlDB.Exec(addColumnStmt.String()); err != nil {
_, err = db.Exec(addColumnStmt.String())
if err != nil {
return nil, err
}
}

v := &fingerprintValidator{
sqlDB: sqlDB,
v := &FingerprintValidator{
sqlDBFunc: defaultSQLDBFunc(db),
origTable: origTable,
fprintTable: fprintTable,
primaryKeyCols: primaryKeyCols,
fprintOrigColumns: fprintOrigColumns,
fprintTestColumns: maxTestColumnCount,
shouldRetry: shouldRetry,
}
v.partitionResolved = make(map[string]hlc.Timestamp)
for _, partition := range partitions {
Expand All @@ -417,25 +421,54 @@ func NewFingerprintValidator(
return v, nil
}

// DBFunc sets the database function used when the validator needs to
// perform database operations (updating the scratch table, computing
// fingerprints, etc)
func (v *FingerprintValidator) DBFunc(
dbFunc func(func(*gosql.DB) error) error,
) *FingerprintValidator {
v.sqlDBFunc = dbFunc
return v
}

// ValidateDuplicatedEvents enables the validation of duplicated
// messages in the fingerprint validator. Whenever a row is received
// with a timestamp lower than the last `resolved` timestamp seen, we
// verify that the event has been seen before (if it hasn't, that
// would be a violation of the changefeed guarantees)
func (v *FingerprintValidator) ValidateDuplicatedEvents() *FingerprintValidator {
v.previouslySeen = make(map[string]struct{})
return v
}

// NoteRow implements the Validator interface.
func (v *fingerprintValidator) NoteRow(
func (v *FingerprintValidator) NoteRow(
ignoredPartition string, key, value string, updated hlc.Timestamp,
) error {
if v.firstRowTimestamp.IsEmpty() || updated.Less(v.firstRowTimestamp) {
v.firstRowTimestamp = updated
}
v.buffer = append(v.buffer, validatorRow{
key: key,
value: value,
updated: updated,
})

row := validatorRow{key: key, value: value, updated: updated}
if err := v.maybeValidateDuplicatedEvent(row); err != nil {
return err
}

// if this row's timestamp is earlier than the last resolved
// timestamp we processed, we can skip it as it is a duplicate
if row.updated.Less(v.resolved) {
return nil
}

v.buffer = append(v.buffer, row)
v.maybeAddSeenEvent(row)
return nil
}

// applyRowUpdate applies the update represented by `row` to the scratch table.
func (v *fingerprintValidator) applyRowUpdate(row validatorRow) (_err error) {
func (v *FingerprintValidator) applyRowUpdate(row validatorRow) (_err error) {
defer func() {
_err = errors.Wrap(_err, "fingerprintValidator failed")
_err = errors.Wrap(_err, "FingerprintValidator failed")
}()

var args []interface{}
Expand Down Expand Up @@ -512,14 +545,14 @@ func (v *fingerprintValidator) applyRowUpdate(row validatorRow) (_err error) {
}
}

return v.maybeRetry(func() error {
_, err := v.sqlDB.Exec(stmtBuf.String(), args...)
return v.sqlDBFunc(func(db *gosql.DB) error {
_, err := db.Exec(stmtBuf.String(), args...)
return err
})
}

// NoteResolved implements the Validator interface.
func (v *fingerprintValidator) NoteResolved(partition string, resolved hlc.Timestamp) error {
func (v *FingerprintValidator) NoteResolved(partition string, resolved hlc.Timestamp) error {
if r, ok := v.partitionResolved[partition]; !ok {
return errors.Errorf(`unknown partition: %s`, partition)
} else if resolved.LessEq(r) {
Expand Down Expand Up @@ -557,8 +590,8 @@ func (v *fingerprintValidator) NoteResolved(partition string, resolved hlc.Times
}
row := v.buffer[0]
// NOTE: changes to the validator's state before `applyRowUpdate`
// are safe because, if the operation can fail, the caller should
// be setting the `shouldRetry` field accordingly
// are safe because if database calls can fail, they should be
// retried by passing a custom function to DBFunction
v.buffer = v.buffer[1:]

// If we've processed all row updates belonging to the previous row's timestamp,
Expand Down Expand Up @@ -592,18 +625,54 @@ func (v *fingerprintValidator) NoteResolved(partition string, resolved hlc.Times
return nil
}

func (v *fingerprintValidator) fingerprint(ts hlc.Timestamp) error {
// maybeAddSeenEvent is a no-op if the caller did not call
// ValidateDuplicatedEvents. Otherwise, we keep a reference to the row
// key and MVCC timestamp for later validation
func (v *FingerprintValidator) maybeAddSeenEvent(row validatorRow) {
if v.previouslySeen == nil {
return
}

v.previouslySeen[row.eventKey()] = struct{}{}
}

// maybeValidateDuplicatedEvent is a no-op if the caller did not call
// ValidateDuplicatedEvents. Otherwise, it returns an error if the
// row's timestamp is earlier than the validator's `resolved`
// timestamp *and* it has not been seen before; that would be a
// violation of the changefeed's guarantees.
func (v *FingerprintValidator) maybeValidateDuplicatedEvent(row validatorRow) error {
if v.previouslySeen == nil {
return nil
}

// row's timestamp is after the last resolved timestamp; no problem
if v.resolved.LessEq(row.updated) {
return nil
}

// row's timestamp is earlier than resolved timestamp *and* it
// hasn't been seen before; that shouldn't happen
if _, seen := v.previouslySeen[row.eventKey()]; !seen {
return fmt.Errorf("unexpected out-of-order event at timestamp %s prior to resolved timestamp %s",
row.updated.AsOfSystemTime(), v.resolved.AsOfSystemTime())
}

return nil
}

func (v *FingerprintValidator) fingerprint(ts hlc.Timestamp) error {
var orig string
if err := v.maybeRetry(func() error {
return v.sqlDB.QueryRow(`SELECT IFNULL(fingerprint, 'EMPTY') FROM [
if err := v.sqlDBFunc(func(db *gosql.DB) error {
return db.QueryRow(`SELECT IFNULL(fingerprint, 'EMPTY') FROM [
SHOW EXPERIMENTAL_FINGERPRINTS FROM TABLE ` + v.origTable + `
] AS OF SYSTEM TIME '` + ts.AsOfSystemTime() + `'`).Scan(&orig)
}); err != nil {
return err
}
var check string
if err := v.maybeRetry(func() error {
return v.sqlDB.QueryRow(`SELECT IFNULL(fingerprint, 'EMPTY') FROM [
if err := v.sqlDBFunc(func(db *gosql.DB) error {
return db.QueryRow(`SELECT IFNULL(fingerprint, 'EMPTY') FROM [
SHOW EXPERIMENTAL_FINGERPRINTS FROM TABLE ` + v.fprintTable + `
]`).Scan(&check)
}); err != nil {
Expand All @@ -617,21 +686,10 @@ func (v *fingerprintValidator) fingerprint(ts hlc.Timestamp) error {
}

// Failures implements the Validator interface.
func (v *fingerprintValidator) Failures() []string {
func (v *FingerprintValidator) Failures() []string {
return v.failures
}

// maybeRetry will retry the function passed if the fingerprint was
// created with `shouldRetry` set to `true`. Every access to `sqlDB`
// should be made my closures passed to this function
func (v *fingerprintValidator) maybeRetry(f func() error) error {
if v.shouldRetry {
return retry.ForDuration(retryDuration, f)
}

return f()
}

// Validators abstracts over running multiple `Validator`s at once on the same
// feed.
type Validators []Validator
Expand Down
Loading