Skip to content

Commit

Permalink
testutils: add fingerprint utility package
Browse files Browse the repository at this point in the history
This patch adds a couple of helper methods that make it easier to fingerprint a
table, database, or cluster; compare fingerprints; and identify a mismatch at
the table level. All fingerprinting methods call crdb_internal.fingerprint() at
the table span level with the user-provided options.

This patch also integrates the new methods in the backupccl and streamingccl
codebases.

A future patch could add a helper method that identifies mismatched keys within
a mismatched table.

Informs #103072

Release note: None
  • Loading branch information
msbutler committed Jun 13, 2023
1 parent c1a71cf commit bad4864
Show file tree
Hide file tree
Showing 19 changed files with 528 additions and 53 deletions.
4 changes: 4 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,7 @@ ALL_TESTS = [
"//pkg/storage:storage_test",
"//pkg/testutils/docker:docker_test",
"//pkg/testutils/echotest:echotest_test",
"//pkg/testutils/fingerprintutils:fingerprintutils_test",
"//pkg/testutils/floatcmp:floatcmp_test",
"//pkg/testutils/keysutils:keysutils_test",
"//pkg/testutils/lint/passes/errcmp:errcmp_test",
Expand Down Expand Up @@ -2083,6 +2084,8 @@ GO_TARGETS = [
"//pkg/testutils/docker:testutils_docker",
"//pkg/testutils/echotest:echotest",
"//pkg/testutils/echotest:echotest_test",
"//pkg/testutils/fingerprintutils:fingerprintutils",
"//pkg/testutils/fingerprintutils:fingerprintutils_test",
"//pkg/testutils/floatcmp:floatcmp",
"//pkg/testutils/floatcmp:floatcmp_test",
"//pkg/testutils/gossiputil:gossiputil",
Expand Down Expand Up @@ -3286,6 +3289,7 @@ GET_X_DATA_TARGETS = [
"//pkg/testutils/docker:get_x_data",
"//pkg/testutils/docker/docker-fsnotify:get_x_data",
"//pkg/testutils/echotest:get_x_data",
"//pkg/testutils/fingerprintutils:get_x_data",
"//pkg/testutils/floatcmp:get_x_data",
"//pkg/testutils/gossiputil:get_x_data",
"//pkg/testutils/grpcutils:get_x_data",
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ go_test(
"//pkg/testutils",
"//pkg/testutils/datapathutils",
"//pkg/testutils/distsqlutils",
"//pkg/testutils/fingerprintutils",
"//pkg/testutils/jobutils",
"//pkg/testutils/keysutils",
"//pkg/testutils/serverutils",
Expand Down
18 changes: 14 additions & 4 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/fingerprintutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/testutils/keysutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
Expand Down Expand Up @@ -1083,7 +1084,9 @@ SELECT payload FROM "".crdb_internal.system_jobs ORDER BY created DESC LIMIT 10
sqlDB.Exec(t, incBackupQuery, queryArgs...)
}
bankTableID := sqlutils.QueryTableID(t, conn, "data", "public", "bank")
backupTableFingerprint := sqlutils.FingerprintTable(t, sqlDB, bankTableID)
backupTableFingerprint, err := fingerprintutils.FingerprintTable(ctx, conn, bankTableID,
fingerprintutils.Stripped())
require.NoError(t, err)

sqlDB.Exec(t, `DROP DATABASE data CASCADE`)

Expand All @@ -1104,18 +1107,22 @@ SELECT payload FROM "".crdb_internal.system_jobs ORDER BY created DESC LIMIT 10
restoreQuery = fmt.Sprintf("%s WITH kms = %s", restoreQuery, kmsURIFmtString)
}
queryArgs := append(restoreURIArgs, kmsURIArgs...)
verifyRestoreData(t, sqlDB, storageSQLDB, restoreQuery, queryArgs, numAccounts, backupTableFingerprint)
verifyRestoreData(ctx, t, conn, sqlDB, storageSQLDB, restoreQuery, queryArgs, numAccounts,
backupTableFingerprint)
}

func verifyRestoreData(
ctx context.Context,
t *testing.T,
conn *gosql.DB,
sqlDB *sqlutils.SQLRunner,
storageSQLDB *sqlutils.SQLRunner,
restoreQuery string,
restoreURIArgs []interface{},
numAccounts int,
bankStrippedFingerprint int,
bankStrippedFingerprint int64,
) {

var unused string
var restored struct {
rows, idx, bytes int64
Expand Down Expand Up @@ -1166,7 +1173,10 @@ func verifyRestoreData(
}
}
restorebankID := sqlutils.QueryTableID(t, sqlDB.DB, "data", "public", "bank")
require.Equal(t, bankStrippedFingerprint, sqlutils.FingerprintTable(t, sqlDB, restorebankID))
fingerprint, err := fingerprintutils.FingerprintTable(ctx, conn, restorebankID,
fingerprintutils.Stripped())
require.NoError(t, err)
require.Equal(t, bankStrippedFingerprint, fingerprint)
}

func TestBackupRestoreSystemTables(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/backuprand/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ go_test(
"//pkg/sql/randgen",
"//pkg/sql/sem/tree",
"//pkg/testutils",
"//pkg/testutils/fingerprintutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
Expand Down
13 changes: 10 additions & 3 deletions pkg/ccl/backupccl/backuprand/backup_rand_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/randgen"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/fingerprintutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
Expand Down Expand Up @@ -98,13 +99,16 @@ database_name = 'rand' AND schema_name = 'public'`)
}

expectedCreateTableStmt := make(map[string]string)
expectedData := make(map[string]int)
expectedData := make(map[string]int64)
for _, tableName := range tableNames {
expectedCreateTableStmt[tableName] = sqlDB.QueryStr(t,
fmt.Sprintf(`SELECT create_statement FROM [SHOW CREATE TABLE %s]`, tree.NameString(tableName)))[0][0]
if runSchemaOnlyExtension == "" {
var err error
tableID := sqlutils.QueryTableID(t, sqlDB.DB, "rand", "public", tableName)
expectedData[tableName] = sqlutils.FingerprintTable(t, sqlDB, tableID)
expectedData[tableName], err = fingerprintutils.FingerprintTable(ctx, tc.Conns[0], tableID,
fingerprintutils.Stripped())
require.NoError(t, err)
}
}

Expand Down Expand Up @@ -138,7 +142,10 @@ database_name = 'rand' AND schema_name = 'public'`)
"SHOW CREATE %s not equal after RESTORE", tableName)
if runSchemaOnlyExtension == "" {
tableID := sqlutils.QueryTableID(t, sqlDB.DB, "restoredb", "public", tableName)
require.Equal(t, expectedData[tableName], sqlutils.FingerprintTable(t, sqlDB, tableID))
fingerpint, err := fingerprintutils.FingerprintTable(ctx, tc.Conns[0], tableID,
fingerprintutils.Stripped())
require.NoError(t, err)
require.Equal(t, expectedData[tableName], fingerpint)
} else {
sqlDB.CheckQueryResults(t, fmt.Sprintf(`SELECT count(*) FROM %s`, restoreTable),
[][]string{{"0"}})
Expand Down
10 changes: 6 additions & 4 deletions pkg/ccl/streamingccl/replicationtestutils/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,11 @@ type TenantStreamingClusters struct {
SrcURL url.URL
SrcCleanup func()

DestCluster *testcluster.TestCluster
DestSysServer serverutils.TestServerInterface
DestSysSQL *sqlutils.SQLRunner
DestTenantSQL *sqlutils.SQLRunner
DestCluster *testcluster.TestCluster
DestSysServer serverutils.TestServerInterface
DestSysSQL *sqlutils.SQLRunner
DestTenantConn *gosql.DB
DestTenantSQL *sqlutils.SQLRunner
}

// CreateDestTenantSQL creates a dest tenant SQL runner and returns a cleanup
Expand All @@ -112,6 +113,7 @@ type TenantStreamingClusters struct {
func (c *TenantStreamingClusters) CreateDestTenantSQL(ctx context.Context) func() error {
testTenant, destTenantConn := serverutils.StartTenant(c.T, c.DestSysServer,
base.TestTenantArgs{TenantID: c.Args.DestTenantID, DisableCreateTenant: true, SkipTenantCheck: true})
c.DestTenantConn = destTenantConn
c.DestTenantSQL = sqlutils.MakeSQLRunner(destTenantConn)
return func() error {
if err := destTenantConn.Close(); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/ccl/streamingccl/replicationutils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ go_library(
"//pkg/roachpb",
"//pkg/sql/isql",
"//pkg/storage",
"//pkg/testutils/fingerprintutils",
"//pkg/testutils/jobutils",
"//pkg/testutils/sqlutils",
"//pkg/util/ctxgroup",
"//pkg/util/hlc",
"//pkg/util/protoutil",
"//pkg/util/timeutil",
Expand Down
64 changes: 64 additions & 0 deletions pkg/ccl/streamingccl/replicationutils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package replicationutils

import (
"context"
gosql "database/sql"
"fmt"
"testing"

Expand All @@ -21,8 +22,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils/fingerprintutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand Down Expand Up @@ -201,6 +204,67 @@ func LoadIngestionProgress(
return sp.StreamIngest, nil
}

// InvestigateFingerprints compares the src and dst clusters table by table and
// returns an error if they disagree. Two passes are made, in order:
//
//  1. A stripped fingerprint as of cutoverTime, which checks that the latest
//     data matches.
//  2. A revision history fingerprint from startTime, also as of cutoverTime.
//
// The second pass only runs if the first one succeeds. An error from either
// pass is wrapped with a message naming the pass that failed.
func InvestigateFingerprints(
	ctx context.Context, srcConn, dstConn *gosql.DB, startTime,
	cutoverTime hlc.Timestamp,
) error {
	// Pass 1: latest data only.
	strippedErr := fingerprintClustersByTable(
		ctx, srcConn, dstConn,
		fingerprintutils.Stripped(),
		fingerprintutils.AOST(cutoverTime),
	)
	if strippedErr != nil {
		return fmt.Errorf("failed stripped fingerprint: %w", strippedErr)
	}

	// Pass 2: full revision history between startTime and cutoverTime.
	revisionErr := fingerprintClustersByTable(
		ctx, srcConn, dstConn,
		fingerprintutils.RevisionHistory(),
		fingerprintutils.StartTime(startTime),
		fingerprintutils.AOST(cutoverTime),
	)
	if revisionErr != nil {
		return fmt.Errorf("failed revision history fingerprint: %w", revisionErr)
	}
	return nil
}

// fingerprintClustersByTable fingerprints the src and dst clusters
// concurrently with the given options and then compares the results with
// fingerprintutils.CompareClusterFingerprints.
//
// Both fingerprint calls run under the context group's context (via GoCtx)
// rather than the caller's context, so a failure on one cluster cancels the
// fingerprint still in flight on the other instead of letting it run to
// completion.
//
// It returns the first error from either fingerprint call, wrapped with which
// side failed, or otherwise the result of the comparison.
func fingerprintClustersByTable(
	ctx context.Context,
	srcConn, dstConn *gosql.DB,
	optFuncs ...func(*fingerprintutils.FingerprintOption),
) error {
	var srcFingerprints, dstFingerprints map[string]map[string]int64

	// fingerprintInto builds the task for one side of the comparison. Each task
	// writes to its own destination map, which is only read after g.Wait()
	// returns, so no extra synchronization is needed.
	fingerprintInto := func(
		side string, conn *gosql.DB, dest *map[string]map[string]int64,
	) func(context.Context) error {
		return func(ctx context.Context) error {
			// NOTE(review): the boolean argument presumably controls whether system
			// tables are included; confirm against fingerprintutils.FingerprintCluster.
			fingerprints, err := fingerprintutils.FingerprintCluster(ctx, conn, true,
				optFuncs...)
			if err != nil {
				return fmt.Errorf("failed getting %s fingerprint: %w", side, err)
			}
			*dest = fingerprints
			return nil
		}
	}

	g := ctxgroup.WithContext(ctx)
	g.GoCtx(fingerprintInto("src", srcConn, &srcFingerprints))
	g.GoCtx(fingerprintInto("dst", dstConn, &dstFingerprints))
	if err := g.Wait(); err != nil {
		return err
	}
	return fingerprintutils.CompareClusterFingerprints(srcFingerprints,
		dstFingerprints)
}

func GetStreamIngestionStats(
ctx context.Context,
streamIngestionDetails jobspb.StreamIngestionDetails,
Expand Down
28 changes: 24 additions & 4 deletions pkg/ccl/streamingccl/streamingest/datadriven_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationtestutils"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/testutils/datapathutils"
Expand Down Expand Up @@ -55,11 +56,14 @@ import (
// - compare-replication-results: runs the specified SQL query on both the
// "source" and "destination" tenants and asserts that the results are equal
//
// - compare-tenant-fingerprints from=<start-time> to=<end-time> [with_revisions]
// - compare-tenant-fingerprints from=<start-time> to=<end-time> [with_revisions] [table_fingerprints]
// Runs `crdb_internal.fingerprint` on both the "source" and "destination"
// tenants with the provided options and asserts that the generated fingerprints
// are equal.
//
// - the table_fingerprints option conducts another round of fingerprinting over each table in the
// clusters. (Primarily used to test fingerprint helper functions).
//
// - sleep ms=TIME
// Sleep for TIME milliseconds.
//
Expand Down Expand Up @@ -126,9 +130,7 @@ func TestDataDriven(t *testing.T) {
if ok {
replicatedTimeTarget = varValue
}
timestamp, _, err := tree.ParseDTimestamp(nil, replicatedTimeTarget, time.Microsecond)
require.NoError(t, err)
ds.replicationClusters.WaitUntilReplicatedTime(hlc.Timestamp{WallTime: timestamp.UnixNano()},
ds.replicationClusters.WaitUntilReplicatedTime(stringToHLC(t, replicatedTimeTarget),
jobspb.JobID(ds.replicationJobID))
case "start-replicated-tenant":
cleanupTenant := ds.replicationClusters.CreateDestTenantSQL(ctx)
Expand Down Expand Up @@ -213,8 +215,20 @@ func TestDataDriven(t *testing.T) {
ds.replicationClusters.DestSysSQL.QueryRow(t, fmt.Sprintf(fingerprintQuery,
ds.replicationClusters.Args.DestTenantName, from, allRevisions, to)).Scan(&fingerprintDestTenant)
require.NotZero(t, fingerprintDestTenant)
if fingerprintSrcTenant != fingerprintDestTenant {
require.NoError(t, replicationutils.InvestigateFingerprints(ctx,
ds.replicationClusters.SrcTenantConn,
ds.replicationClusters.DestTenantConn, stringToHLC(t, from), stringToHLC(t, to)))
t.Fatalf("tenant level fingerpint mismatch, but table level fingerprints match")
}
require.Equal(t, fingerprintSrcTenant, fingerprintDestTenant)

if d.HasArg("table_fingerprints") {
require.NoError(t, replicationutils.InvestigateFingerprints(ctx,
ds.replicationClusters.SrcTenantConn,
ds.replicationClusters.DestTenantConn, stringToHLC(t, from), stringToHLC(t, to)))
}

case "sleep":
var msStr string
if d.HasArg("ms") {
Expand Down Expand Up @@ -278,6 +292,12 @@ func TestDataDriven(t *testing.T) {
})
}

// stringToHLC converts a timestamp string into an hlc.Timestamp. The string is
// parsed with tree.ParseDTimestamp (passing time.Microsecond as the precision
// argument), and the parsed value's Unix nanoseconds become the WallTime of
// the result. The test fails immediately if parsing returns an error.
func stringToHLC(t *testing.T, timestamp string) hlc.Timestamp {
	dTimestamp, _, parseErr := tree.ParseDTimestamp(nil, timestamp, time.Microsecond)
	require.NoError(t, parseErr)
	wallNanos := dTimestamp.UnixNano()
	return hlc.Timestamp{WallTime: wallNanos}
}

type datadrivenTestState struct {
producerJobID, replicationJobID int
replicationClusters *replicationtestutils.TenantStreamingClusters
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/streamingccl/streamingest/testdata/simple
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ cutover ts=$ts
start-replicated-tenant
----

compare-tenant-fingerprints from=$start to=$ts with_revisions
compare-tenant-fingerprints from=$start to=$ts with_revisions table_fingerprints
----

compare-replication-results
Expand Down
12 changes: 11 additions & 1 deletion pkg/cmd/roachtest/tests/cluster_to_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,9 +606,19 @@ AS OF SYSTEM TIME '%s'`, startTimeDecimal, aost)
rd.t.L().Printf("fingerprinting the destination tenant took %s", fingerprintingDuration)
return nil
})

// If the goroutine gets cancelled or fataled, return before comparing fingerprints.
require.NoError(rd.t, fingerPrintMonitor.WaitE())
if srcFingerprint != destFingerprint {
startHlc := hlc.Timestamp{WallTime: startTime.UnixNano()}
endHlc := hlc.Timestamp{WallTime: endTime.UnixNano()}
rd.t.L().Printf("fingerpint mismatch: conducting table level fingerprints")
srcTenantConn := rd.c.Conn(ctx, rd.t.L(), 1, option.TenantName(rd.setup.src.name))
dstTenantConn := rd.c.Conn(ctx, rd.t.L(), rd.rs.srcNodes+1, option.TenantName(rd.setup.dst.name))
require.NoError(rd.t, replicationutils.InvestigateFingerprints(ctx, srcTenantConn, dstTenantConn,
startHlc,
endHlc))
rd.t.L().Printf("fingerprints by table seem to match")
}
require.Equal(rd.t, srcFingerprint, destFingerprint)
}

Expand Down
1 change: 1 addition & 0 deletions pkg/sql/sem/builtins/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ go_test(
"//pkg/storage",
"//pkg/testutils",
"//pkg/testutils/datapathutils",
"//pkg/testutils/fingerprintutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
Expand Down
11 changes: 6 additions & 5 deletions pkg/sql/sem/builtins/fingerprint_builtin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils/fingerprintutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/storageutils"
Expand Down Expand Up @@ -288,9 +289,9 @@ func TestFingerprintStripped(t *testing.T) {
// Create the same sql rows in a different table, committed at a different timestamp.
db.Exec(t, "CREATE TABLE test.test2 (k PRIMARY KEY) AS SELECT generate_series(1, 10)")

strippedFingerprint := func(tableName string) int {
tableID := sqlutils.QueryTableID(t, sqlDB, "test", "public", tableName)
return sqlutils.FingerprintTable(t, db, tableID)
}
require.Equal(t, strippedFingerprint("test"), strippedFingerprint("test2"))
fingerprints, err := fingerprintutils.FingerprintDatabase(ctx, sqlDB, "test",
fingerprintutils.Stripped())
require.NoError(t, err)

require.Equal(t, fingerprints["test"], fingerprints["test2"])
}
Loading

0 comments on commit bad4864

Please sign in to comment.