Skip to content

Commit

Permalink
cdctest: remove retry logic from the fingerprint validator
Browse files Browse the repository at this point in the history
To handle mixed-version CDC tests, the fingerprint validator had
previously been updated to handle retries internally; this complicates
the validator as the retry logic is only ever used in the
mixed-version roachtests.

This commit removes the retry logic from the validator and instead
allows the caller to pass a `DBFunc` to be called whenever a database
connection is needed after initialization. By passing a custom DBFunc,
tests that need it (such as the mixed-versions roachtest) can pass a
function that accounts for temporary unavailability of nodes.
Specifically, we pass a function that blocks while nodes are being
upgraded, to simplify reasoning of this test's behavior.

In order to support that, we also change the signature of
`NewFingerprintValidator` to return the actual concrete
validator (which is now made public) instead of the `Validator`
interface, following the generally-good approach of "accept
interfaces, return structs" in Go.

Release note: None
  • Loading branch information
renatolabs committed Oct 4, 2022
1 parent d270005 commit 9192ed9
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 73 deletions.
1 change: 0 additions & 1 deletion pkg/ccl/changefeedccl/cdctest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ go_library(
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/randutil",
"//pkg/util/retry",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/cdctest/nemeses.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func RunNemesis(f TestFeedFactory, db *gosql.DB, isSinkless bool) (Validator, er
if err != nil {
return nil, err
}
fprintV, err := NewFingerprintValidator(db, `foo`, scratchTableName, foo.Partitions(), ns.maxTestColumnCount, false)
fprintV, err := NewFingerprintValidator(db, `foo`, scratchTableName, foo.Partitions(), ns.maxTestColumnCount)
if err != nil {
return nil, err
}
Expand Down
95 changes: 45 additions & 50 deletions pkg/ccl/changefeedccl/cdctest/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@ import (
"fmt"
"sort"
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -65,8 +63,6 @@ var _ Validator = &orderValidator{}
var _ Validator = &noOpValidator{}
var _ StreamValidator = &orderValidator{}

var retryDuration = 2 * time.Minute

type noOpValidator struct{}

// NoteRow accepts a changed row entry.
Expand Down Expand Up @@ -323,16 +319,16 @@ type validatorRow struct {
updated hlc.Timestamp
}

// fingerprintValidator verifies that recreating a table from its changefeed
// FingerprintValidator verifies that recreating a table from its changefeed
// will fingerprint the same at all "interesting" points in time.
type fingerprintValidator struct {
sqlDB *gosql.DB
type FingerprintValidator struct {
sqlDBFunc func(func(*gosql.DB) error) error
origTable, fprintTable string
primaryKeyCols []string
partitionResolved map[string]hlc.Timestamp
resolved hlc.Timestamp
// It's possible to get a resolved timestamp from before the table even
// exists, which is valid but complicates the way fingerprintValidator works.
// exists, which is valid but complicates the way FingerprintValidator works.
// Don't create a fingerprint earlier than the first seen row.
firstRowTimestamp hlc.Timestamp
// previousRowUpdateTs keeps track of the timestamp of the most recently processed row
Expand All @@ -347,38 +343,38 @@ type fingerprintValidator struct {
fprintTestColumns int
buffer []validatorRow

// shouldRetry indicates whether row updates should be retried (for
// a fixed duration). Typically used when the transient errors are
// expected (e.g., if performing an upgrade)
shouldRetry bool

failures []string
}

// defaultSQLDBFunc is the default function passed the FingerprintValidator's
// `sqlDBFunc`. It is sufficient in cases when the database is not expected to
// fail while the validator is using it.
func defaultSQLDBFunc(db *gosql.DB) func(func(*gosql.DB) error) error {
return func(f func(*gosql.DB) error) error {
return f(db)
}
}

// NewFingerprintValidator returns a new FingerprintValidator that uses `fprintTable` as
// scratch space to recreate `origTable`. `fprintTable` must exist before calling this
// constructor. `maxTestColumnCount` indicates the maximum number of columns that can be
// expected in `origTable` due to test-related schema changes. This fingerprint validator
// will modify `fprint`'s schema to add `maxTestColumnCount` columns to avoid having to
// accommodate schema changes on the fly.
func NewFingerprintValidator(
sqlDB *gosql.DB,
origTable, fprintTable string,
partitions []string,
maxTestColumnCount int,
shouldRetry bool,
) (Validator, error) {
db *gosql.DB, origTable, fprintTable string, partitions []string, maxTestColumnCount int,
) (*FingerprintValidator, error) {
// Fetch the primary keys though information_schema schema inspections so we
// can use them to construct the SQL for DELETEs and also so we can verify
// that the key in a message matches what's expected for the value.
primaryKeyCols, err := fetchPrimaryKeyCols(sqlDB, fprintTable)
primaryKeyCols, err := fetchPrimaryKeyCols(db, fprintTable)
if err != nil {
return nil, err
}

// Record the non-test%d columns in `fprint`.
var fprintOrigColumns int
if err := sqlDB.QueryRow(`
if err := db.QueryRow(`
SELECT count(column_name)
FROM information_schema.columns
WHERE table_name=$1
Expand All @@ -396,19 +392,19 @@ func NewFingerprintValidator(
}
fmt.Fprintf(&addColumnStmt, `ADD COLUMN test%d STRING`, i)
}
if _, err := sqlDB.Exec(addColumnStmt.String()); err != nil {
_, err = db.Exec(addColumnStmt.String())
if err != nil {
return nil, err
}
}

v := &fingerprintValidator{
sqlDB: sqlDB,
v := &FingerprintValidator{
sqlDBFunc: defaultSQLDBFunc(db),
origTable: origTable,
fprintTable: fprintTable,
primaryKeyCols: primaryKeyCols,
fprintOrigColumns: fprintOrigColumns,
fprintTestColumns: maxTestColumnCount,
shouldRetry: shouldRetry,
}
v.partitionResolved = make(map[string]hlc.Timestamp)
for _, partition := range partitions {
Expand All @@ -417,8 +413,18 @@ func NewFingerprintValidator(
return v, nil
}

// DBFunc sets the database function used when the validator needs to
// perform database operations (updating the scratch table, computing
// fingerprints, etc)
func (v *FingerprintValidator) DBFunc(
dbFunc func(func(*gosql.DB) error) error,
) *FingerprintValidator {
v.sqlDBFunc = dbFunc
return v
}

// NoteRow implements the Validator interface.
func (v *fingerprintValidator) NoteRow(
func (v *FingerprintValidator) NoteRow(
ignoredPartition string, key, value string, updated hlc.Timestamp,
) error {
if v.firstRowTimestamp.IsEmpty() || updated.Less(v.firstRowTimestamp) {
Expand All @@ -433,9 +439,9 @@ func (v *fingerprintValidator) NoteRow(
}

// applyRowUpdate applies the update represented by `row` to the scratch table.
func (v *fingerprintValidator) applyRowUpdate(row validatorRow) (_err error) {
func (v *FingerprintValidator) applyRowUpdate(row validatorRow) (_err error) {
defer func() {
_err = errors.Wrap(_err, "fingerprintValidator failed")
_err = errors.Wrap(_err, "FingerprintValidator failed")
}()

var args []interface{}
Expand Down Expand Up @@ -512,14 +518,14 @@ func (v *fingerprintValidator) applyRowUpdate(row validatorRow) (_err error) {
}
}

return v.maybeRetry(func() error {
_, err := v.sqlDB.Exec(stmtBuf.String(), args...)
return v.sqlDBFunc(func(db *gosql.DB) error {
_, err := db.Exec(stmtBuf.String(), args...)
return err
})
}

// NoteResolved implements the Validator interface.
func (v *fingerprintValidator) NoteResolved(partition string, resolved hlc.Timestamp) error {
func (v *FingerprintValidator) NoteResolved(partition string, resolved hlc.Timestamp) error {
if r, ok := v.partitionResolved[partition]; !ok {
return errors.Errorf(`unknown partition: %s`, partition)
} else if resolved.LessEq(r) {
Expand Down Expand Up @@ -557,8 +563,8 @@ func (v *fingerprintValidator) NoteResolved(partition string, resolved hlc.Times
}
row := v.buffer[0]
// NOTE: changes to the validator's state before `applyRowUpdate`
// are safe because, if the operation can fail, the caller should
// be setting the `shouldRetry` field accordingly
// are safe because if database calls can fail, they should be
// retried by passing a custom function to DBFunction
v.buffer = v.buffer[1:]

// If we've processed all row updates belonging to the previous row's timestamp,
Expand Down Expand Up @@ -592,18 +598,18 @@ func (v *fingerprintValidator) NoteResolved(partition string, resolved hlc.Times
return nil
}

func (v *fingerprintValidator) fingerprint(ts hlc.Timestamp) error {
func (v *FingerprintValidator) fingerprint(ts hlc.Timestamp) error {
var orig string
if err := v.maybeRetry(func() error {
return v.sqlDB.QueryRow(`SELECT IFNULL(fingerprint, 'EMPTY') FROM [
if err := v.sqlDBFunc(func(db *gosql.DB) error {
return db.QueryRow(`SELECT IFNULL(fingerprint, 'EMPTY') FROM [
SHOW EXPERIMENTAL_FINGERPRINTS FROM TABLE ` + v.origTable + `
] AS OF SYSTEM TIME '` + ts.AsOfSystemTime() + `'`).Scan(&orig)
}); err != nil {
return err
}
var check string
if err := v.maybeRetry(func() error {
return v.sqlDB.QueryRow(`SELECT IFNULL(fingerprint, 'EMPTY') FROM [
if err := v.sqlDBFunc(func(db *gosql.DB) error {
return db.QueryRow(`SELECT IFNULL(fingerprint, 'EMPTY') FROM [
SHOW EXPERIMENTAL_FINGERPRINTS FROM TABLE ` + v.fprintTable + `
]`).Scan(&check)
}); err != nil {
Expand All @@ -617,21 +623,10 @@ func (v *fingerprintValidator) fingerprint(ts hlc.Timestamp) error {
}

// Failures implements the Validator interface.
func (v *fingerprintValidator) Failures() []string {
func (v *FingerprintValidator) Failures() []string {
return v.failures
}

// maybeRetry will retry the function passed if the fingerprint was
// created with `shouldRetry` set to `true`. Every access to `sqlDB`
// should be made my closures passed to this function
func (v *fingerprintValidator) maybeRetry(f func() error) error {
if v.shouldRetry {
return retry.ForDuration(retryDuration, f)
}

return f()
}

// Validators abstracts over running multiple `Validator`s at once on the same
// feed.
type Validators []Validator
Expand Down
22 changes: 11 additions & 11 deletions pkg/ccl/changefeedccl/cdctest/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,14 +292,14 @@ func TestFingerprintValidator(t *testing.T) {

t.Run(`empty`, func(t *testing.T) {
sqlDB.Exec(t, createTableStmt(`empty`))
v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `empty`, []string{`p`}, testColumns, false)
v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `empty`, []string{`p`}, testColumns)
require.NoError(t, err)
noteResolved(t, v, `p`, ts[0])
assertValidatorFailures(t, v)
})
t.Run(`wrong data`, func(t *testing.T) {
sqlDB.Exec(t, createTableStmt(`wrong_data`))
v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `wrong_data`, []string{`p`}, testColumns, false)
v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `wrong_data`, []string{`p`}, testColumns)
require.NoError(t, err)
noteRow(t, v, ignored, `[1]`, `{"after": {"k":1,"v":10}}`, ts[1])
noteResolved(t, v, `p`, ts[1])
Expand All @@ -310,7 +310,7 @@ func TestFingerprintValidator(t *testing.T) {
})
t.Run(`all resolved`, func(t *testing.T) {
sqlDB.Exec(t, createTableStmt(`all_resolved`))
v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `all_resolved`, []string{`p`}, testColumns, false)
v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `all_resolved`, []string{`p`}, testColumns)
require.NoError(t, err)
if err := v.NoteResolved(`p`, ts[0]); err != nil {
t.Fatal(err)
Expand All @@ -329,7 +329,7 @@ func TestFingerprintValidator(t *testing.T) {
})
t.Run(`rows unsorted`, func(t *testing.T) {
sqlDB.Exec(t, createTableStmt(`rows_unsorted`))
v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `rows_unsorted`, []string{`p`}, testColumns, false)
v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `rows_unsorted`, []string{`p`}, testColumns)
require.NoError(t, err)
noteRow(t, v, ignored, `[1]`, `{"after": {"k":1,"v":3}}`, ts[3])
noteRow(t, v, ignored, `[1]`, `{"after": {"k":1,"v":2}}`, ts[2])
Expand All @@ -341,7 +341,7 @@ func TestFingerprintValidator(t *testing.T) {
})
t.Run(`missed initial`, func(t *testing.T) {
sqlDB.Exec(t, createTableStmt(`missed_initial`))
v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `missed_initial`, []string{`p`}, testColumns, false)
v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `missed_initial`, []string{`p`}, testColumns)
require.NoError(t, err)
noteResolved(t, v, `p`, ts[0])
// Intentionally missing {"k":1,"v":1} at ts[1].
Expand All @@ -357,7 +357,7 @@ func TestFingerprintValidator(t *testing.T) {
})
t.Run(`missed middle`, func(t *testing.T) {
sqlDB.Exec(t, createTableStmt(`missed_middle`))
v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `missed_middle`, []string{`p`}, testColumns, false)
v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `missed_middle`, []string{`p`}, testColumns)
require.NoError(t, err)
noteResolved(t, v, `p`, ts[0])
noteRow(t, v, ignored, `[1]`, `{"after": {"k":1,"v":1}}`, ts[1])
Expand All @@ -375,7 +375,7 @@ func TestFingerprintValidator(t *testing.T) {
})
t.Run(`missed end`, func(t *testing.T) {
sqlDB.Exec(t, createTableStmt(`missed_end`))
v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `missed_end`, []string{`p`}, testColumns, false)
v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `missed_end`, []string{`p`}, testColumns)
require.NoError(t, err)
noteResolved(t, v, `p`, ts[0])
noteRow(t, v, ignored, `[1]`, `{"after": {"k":1,"v":1}}`, ts[1])
Expand All @@ -390,7 +390,7 @@ func TestFingerprintValidator(t *testing.T) {
})
t.Run(`initial scan`, func(t *testing.T) {
sqlDB.Exec(t, createTableStmt(`initial_scan`))
v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `initial_scan`, []string{`p`}, testColumns, false)
v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `initial_scan`, []string{`p`}, testColumns)
require.NoError(t, err)
noteRow(t, v, ignored, `[1]`, `{"after": {"k":1,"v":3}}`, ts[3])
noteRow(t, v, ignored, `[2]`, `{"after": {"k":2,"v":2}}`, ts[3])
Expand All @@ -399,15 +399,15 @@ func TestFingerprintValidator(t *testing.T) {
})
t.Run(`unknown partition`, func(t *testing.T) {
sqlDB.Exec(t, createTableStmt(`unknown_partition`))
v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `unknown_partition`, []string{`p`}, testColumns, false)
v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `unknown_partition`, []string{`p`}, testColumns)
require.NoError(t, err)
if err := v.NoteResolved(`nope`, ts[1]); !testutils.IsError(err, `unknown partition`) {
t.Fatalf(`expected "unknown partition" error got: %+v`, err)
}
})
t.Run(`resolved unsorted`, func(t *testing.T) {
sqlDB.Exec(t, createTableStmt(`resolved_unsorted`))
v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `resolved_unsorted`, []string{`p`}, testColumns, false)
v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `resolved_unsorted`, []string{`p`}, testColumns)
require.NoError(t, err)
noteRow(t, v, ignored, `[1]`, `{"after": {"k":1,"v":1}}`, ts[1])
noteResolved(t, v, `p`, ts[1])
Expand All @@ -417,7 +417,7 @@ func TestFingerprintValidator(t *testing.T) {
})
t.Run(`two partitions`, func(t *testing.T) {
sqlDB.Exec(t, createTableStmt(`two_partitions`))
v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `two_partitions`, []string{`p0`, `p1`}, testColumns, false)
v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `two_partitions`, []string{`p0`, `p1`}, testColumns)
require.NoError(t, err)
noteRow(t, v, ignored, `[1]`, `{"after": {"k":1,"v":1}}`, ts[1])
noteRow(t, v, ignored, `[1]`, `{"after": {"k":1,"v":2}}`, ts[2])
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/roachtest/tests/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ func runCDCBank(ctx context.Context, t test.Test, c cluster.Cluster) {
return errors.Wrap(err, "CREATE TABLE failed")
}

fprintV, err := cdctest.NewFingerprintValidator(db, `bank.bank`, `fprint`, tc.partitions, 0, false)
fprintV, err := cdctest.NewFingerprintValidator(db, `bank.bank`, `fprint`, tc.partitions, 0)
if err != nil {
return errors.Wrap(err, "error creating validator")
}
Expand Down
Loading

0 comments on commit 9192ed9

Please sign in to comment.