diff --git a/pkg/ccl/workloadccl/allccl/all_test.go b/pkg/ccl/workloadccl/allccl/all_test.go
index 07354012f1bb..9f68f97d9d54 100644
--- a/pkg/ccl/workloadccl/allccl/all_test.go
+++ b/pkg/ccl/workloadccl/allccl/all_test.go
@@ -26,6 +26,7 @@ const (
 	directIngestion  = true
 	oneFilePerNode   = 1
 	noInjectStats    = false
+	noSkipPostLoad   = false
 	skipCSVRoundtrip = ``
 )
 
@@ -84,7 +85,8 @@ func TestAllRegisteredImportFixture(t *testing.T) {
 
 			sqlutils.MakeSQLRunner(db).Exec(t, `CREATE DATABASE d`)
 			if _, err := workloadccl.ImportFixture(
-				ctx, db, gen, `d`, directIngestion, oneFilePerNode, noInjectStats, skipCSVRoundtrip,
+				ctx, db, gen, `d`, directIngestion, oneFilePerNode, noInjectStats, noSkipPostLoad,
+				skipCSVRoundtrip,
 			); err != nil {
 				t.Fatal(err)
 			}
diff --git a/pkg/ccl/workloadccl/bench_test.go b/pkg/ccl/workloadccl/bench_test.go
index cf8741315513..f0095f3cd188 100644
--- a/pkg/ccl/workloadccl/bench_test.go
+++ b/pkg/ccl/workloadccl/bench_test.go
@@ -33,9 +33,9 @@ func benchmarkImportFixture(b *testing.B, gen workload.Generator) {
 
 		b.StartTimer()
 		const filesPerNode = 1
+		const directIngest, noInjectStats, skipPostLoad, csvServer = true, false, true, ``
 		importBytes, err := ImportFixture(
-			ctx, db, gen, `d`, true /* directIngestion */, filesPerNode, false, /* injectStats */
-			``, /* csvServer */
+			ctx, db, gen, `d`, directIngest, filesPerNode, noInjectStats, skipPostLoad, csvServer,
 		)
 		require.NoError(b, err)
 		bytes += importBytes
diff --git a/pkg/ccl/workloadccl/cliccl/fixtures.go b/pkg/ccl/workloadccl/cliccl/fixtures.go
index 279471fe39be..bf0484c6de16 100644
--- a/pkg/ccl/workloadccl/cliccl/fixtures.go
+++ b/pkg/ccl/workloadccl/cliccl/fixtures.go
@@ -333,9 +333,10 @@ func fixturesImport(gen workload.Generator, urls []string, dbName string) error
 	directIngestion := *fixturesImportDirectIngestionTable
 	filesPerNode := *fixturesImportFilesPerNode
 	injectStats := *fixturesImportInjectStats
+	noSkipPostLoad := false
 	csvServer := *fixturesMakeImportCSVServerURL
 	bytes, err := workloadccl.ImportFixture(
-		ctx, sqlDB, gen, dbName, directIngestion, filesPerNode, injectStats, csvServer,
+		ctx, sqlDB, gen, dbName, directIngestion, filesPerNode, injectStats, noSkipPostLoad, csvServer,
 	)
 	if err != nil {
 		return errors.Wrap(err, `importing fixture`)
diff --git a/pkg/ccl/workloadccl/fixture.go b/pkg/ccl/workloadccl/fixture.go
index d6b9cc45d64a..9743b988141d 100644
--- a/pkg/ccl/workloadccl/fixture.go
+++ b/pkg/ccl/workloadccl/fixture.go
@@ -258,8 +258,10 @@ func MakeFixture(
 	if _, err := sqlDB.Exec(`CREATE DATABASE IF NOT EXISTS ` + dbName); err != nil {
 		return Fixture{}, err
 	}
-	const direct, stats, csvServer = false, false, ""
-	if _, err := ImportFixture(ctx, sqlDB, gen, dbName, direct, filesPerNode, stats, csvServer); err != nil {
+	const direct, stats, skipPostLoad, csvServer = false, false, true, ""
+	if _, err := ImportFixture(
+		ctx, sqlDB, gen, dbName, direct, filesPerNode, stats, skipPostLoad, csvServer,
+	); err != nil {
 		return Fixture{}, err
 	}
 	g := ctxgroup.WithContext(ctx)
@@ -293,6 +295,7 @@ func ImportFixture(
 	directIngestion bool,
 	filesPerNode int,
 	injectStats bool,
+	skipPostLoad bool,
 	csvServer string,
 ) (int64, error) {
 	for _, t := range gen.Tables() {
@@ -339,8 +342,10 @@ func ImportFixture(
 	if err := g.Wait(); err != nil {
 		return 0, err
 	}
-	if err := runPostLoadSteps(ctx, sqlDB, gen); err != nil {
-		return 0, err
+	if !skipPostLoad {
+		if err := runPostLoadSteps(ctx, sqlDB, gen); err != nil {
+			return 0, err
+		}
 	}
 	return atomic.LoadInt64(&bytesAtomic), nil
 }
@@ -449,13 +454,12 @@ func RestoreFixture(
 ) (int64, error) {
 	var bytesAtomic int64
 	g := ctxgroup.WithContext(ctx)
+	genName := fixture.Generator.Meta().Name
 	for _, table := range fixture.Tables {
 		table := table
 		g.GoCtx(func(ctx context.Context) error {
-			// The IMPORT ... CSV DATA command generates a backup with the table in
-			// database `csv`.
 			start := timeutil.Now()
-			importStmt := fmt.Sprintf(`RESTORE csv.%s FROM $1 WITH into_db=$2`, table.TableName)
+			importStmt := fmt.Sprintf(`RESTORE %s.%s FROM $1 WITH into_db=$2`, genName, table.TableName)
 			var rows, index, tableBytes int64
 			var discard interface{}
 			if err := sqlDB.QueryRow(importStmt, table.BackupURI, database).Scan(
diff --git a/pkg/ccl/workloadccl/fixture_test.go b/pkg/ccl/workloadccl/fixture_test.go
index 2f3e73aca6e5..97907007f4a0 100644
--- a/pkg/ccl/workloadccl/fixture_test.go
+++ b/pkg/ccl/workloadccl/fixture_test.go
@@ -183,10 +183,11 @@ func TestImportFixture(t *testing.T) {
 	}
 
 	const filesPerNode = 1
+	const noSkipPostLoad = false
 	sqlDB.Exec(t, `CREATE DATABASE distsort`)
 	_, err := ImportFixture(
 		ctx, db, gen, `distsort`, false /* directIngestion */, filesPerNode, true, /* injectStats */
-		``, /* csvServer */
+		noSkipPostLoad, ``, /* csvServer */
 	)
 	require.NoError(t, err)
 	sqlDB.CheckQueryResults(t,
@@ -203,7 +204,7 @@ func TestImportFixture(t *testing.T) {
 	sqlDB.Exec(t, `CREATE DATABASE direct`)
 	_, err = ImportFixture(
 		ctx, db, gen, `direct`, true /* directIngestion */, filesPerNode, false, /* injectStats */
-		``, /* csvServer */
+		noSkipPostLoad, ``, /* csvServer */
 	)
 	require.NoError(t, err)
 	sqlDB.CheckQueryResults(t,
@@ -240,9 +241,10 @@ func TestImportFixtureCSVServer(t *testing.T) {
 	}
 
 	const filesPerNode = 1
+	const noDirectIngest, noInjectStats, noSkipPostLoad = false, false, true
 	sqlDB.Exec(t, `CREATE DATABASE d`)
 	_, err := ImportFixture(
-		ctx, db, gen, `d`, false /* directIngestion */, filesPerNode, false /* injectStats */, ts.URL,
+		ctx, db, gen, `d`, noDirectIngest, filesPerNode, noInjectStats, noSkipPostLoad, ts.URL,
 	)
 	require.NoError(t, err)
 	sqlDB.CheckQueryResults(t,
diff --git a/pkg/cmd/roachtest/cluster.go b/pkg/cmd/roachtest/cluster.go
index 73d14e3e8295..69519d7c54f9 100644
--- a/pkg/cmd/roachtest/cluster.go
+++ b/pkg/cmd/roachtest/cluster.go
@@ -1041,39 +1041,75 @@ func (c *cluster) FailOnDeadNodes(ctx context.Context, t *test) {
 	})
 }
 
-// FailOnReplicaDivergence fails the test if
-// crdb_internal.check_consistency(true, '', '') indicates that any ranges'
-// replicas are inconsistent with each other.
-func (c *cluster) FailOnReplicaDivergence(ctx context.Context, t *test) {
-	if c.nodes < 1 {
-		return // unit tests
-	}
-	// TODO(tbg): n1 isn't necessarily online at this point. Try to sniff out
-	// a node that is.
-	db := c.Conn(ctx, 1)
-	defer db.Close()
-
-	c.l.Printf("running (fast) consistency checks")
+// CheckReplicaDivergenceOnDB runs a fast consistency check of the whole keyspace
+// against the provided db. If an inconsistency is found, it returns it in the
+// error. Note that this will swallow errors returned directly from the consistency
+// check since we know that such spurious errors are possibly without any relation
+// to the check having failed.
+func (c *cluster) CheckReplicaDivergenceOnDB(ctx context.Context, db *gosql.DB) error {
 	rows, err := db.QueryContext(ctx, `
SELECT t.range_id, t.start_key_pretty, t.status, t.detail
FROM crdb_internal.check_consistency(true, '', '') as t
WHERE t.status NOT IN ('RANGE_CONSISTENT', 'RANGE_INDETERMINATE')`)
 	if err != nil {
-		c.l.Printf("%s", err)
-		return
+		// TODO(tbg): the checks can fail for silly reasons like missing gossiped
+		// descriptors, etc. -- not worth failing the test for. Ideally this would
+		// be rock solid.
+		c.l.Printf("consistency check failed with %v; ignoring", err)
+		return nil
 	}
+	var buf bytes.Buffer
 	for rows.Next() {
 		var rangeID int32
 		var prettyKey, status, detail string
 		if err := rows.Scan(&rangeID, &prettyKey, &status, &detail); err != nil {
-			c.l.Printf("%s", err)
-			break
+			return err
 		}
-		t.Fatalf("r%d (%s) is inconsistent: %s %s", rangeID, prettyKey, status, detail)
+		fmt.Fprintf(&buf, "r%d (%s) is inconsistent: %s %s\n", rangeID, prettyKey, status, detail)
 	}
 	if err := rows.Err(); err != nil {
-		c.l.Printf("%s", err)
+		return err
+	}
+
+	msg := buf.String()
+	if msg != "" {
+		return errors.New(msg)
+	}
+	return nil
+}
+
+// FailOnReplicaDivergence fails the test if
+// crdb_internal.check_consistency(true, '', '') indicates that any ranges'
+// replicas are inconsistent with each other. It uses the first node that
+// is up to run the query.
+func (c *cluster) FailOnReplicaDivergence(ctx context.Context, t *test) {
+	if c.nodes < 1 {
+		return // unit tests
+	}
+
+	// Find a live node to run against, if one exists.
+	var db *gosql.DB
+	for i := 1; i <= c.nodes; i++ {
+		db = c.Conn(ctx, i)
+		_, err := db.Exec(`SELECT 1`)
+		if err != nil {
+			_ = db.Close()
+			db = nil
+			continue
+		}
+		c.l.Printf("running (fast) consistency checks on node %d", i)
+		break
+	}
+	if db == nil {
+		c.l.Printf("no live node found, skipping consistency check")
+		return
+	}
+
+	defer db.Close()
+
+	if err := c.CheckReplicaDivergenceOnDB(ctx, db); err != nil {
+		t.Fatal(err)
 	}
 }
 
diff --git a/pkg/cmd/roachtest/version.go b/pkg/cmd/roachtest/version.go
index 4eef75a06855..bf3bfda05c4e 100644
--- a/pkg/cmd/roachtest/version.go
+++ b/pkg/cmd/roachtest/version.go
@@ -24,6 +24,7 @@ import (
 
 	"github.com/cockroachdb/cockroach/pkg/util/binfetcher"
 	_ "github.com/lib/pq"
+	"github.com/pkg/errors"
 )
 
 func registerVersion(r *registry) {
@@ -95,7 +96,7 @@ func registerVersion(r *registry) {
 			// Make sure everyone is still running.
 			for i := 1; i <= nodes; i++ {
 				t.WorkerStatus("checking ", i)
-				db := c.Conn(ctx, 1)
+				db := c.Conn(ctx, i)
 				defer db.Close()
 				rows, err := db.Query(`SHOW DATABASES`)
 				if err != nil {
@@ -104,6 +105,10 @@ func registerVersion(r *registry) {
 				if err := rows.Close(); err != nil {
 					return err
 				}
+				// Regression test for #37425.
+				if err := c.CheckReplicaDivergenceOnDB(ctx, db); err != nil {
+					return errors.Wrapf(err, "node %d", i)
+				}
 			}
 			return nil
 		}
diff --git a/pkg/server/admin.go b/pkg/server/admin.go
index bdecb9bf6c66..2208d8d9354d 100644
--- a/pkg/server/admin.go
+++ b/pkg/server/admin.go
@@ -19,6 +19,7 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"os"
 	"sort"
 	"strconv"
 	"strings"
@@ -1353,12 +1354,12 @@ func (s *adminServer) Drain(req *serverpb.DrainRequest, stream serverpb.Admin_Dr
 		return nil
 	}
 
-	s.server.grpc.Stop()
-
 	go func() {
-		// The explicit closure here allows callers.Lookup() to return something
-		// sensible referring to this file (otherwise it ends up in runtime
-		// internals).
+		// TODO(tbg): why don't we stop the stopper first? Stopping the stopper
+		// first seems more reasonable since grpc.Stop closes the listener right
+		// away (and who knows whether gRPC-goroutines are tied up in some
+		// stopper task somewhere).
+		s.server.grpc.Stop()
 		s.server.stopper.Stop(ctx)
 	}()
 
@@ -1367,6 +1368,23 @@ func (s *adminServer) Drain(req *serverpb.DrainRequest, stream serverpb.Admin_Dr
 		return nil
 	case <-ctx.Done():
 		return ctx.Err()
+	case <-time.After(10 * time.Second):
+		// This is a hack to work around the problem in
+		// https://github.com/cockroachdb/cockroach/issues/37425#issuecomment-494336131
+		//
+		// There appear to be deadlock scenarios in which we don't manage to
+		// fully stop the grpc server (which implies closing the listener, i.e.
+		// seeming dead to the outside world) or don't manage to shut down the
+		// stopper (the evidence in #37425 is inconclusive which one it is).
+		//
+		// Other problems in this area are known, such as
+		// https://github.com/cockroachdb/cockroach/pull/31692
+		//
+		// The signal-based shutdown path uses a similar time-based escape hatch.
+		// Until we spend (potentially lots of time to) understand and fix this
+		// issue, this will serve us well.
+		os.Exit(1)
+		return errors.New("unreachable")
 	}
 }
 
diff --git a/pkg/sql/sem/builtins/generator_builtins.go b/pkg/sql/sem/builtins/generator_builtins.go
index 79f0297908f1..f905e4b3f6ba 100644
--- a/pkg/sql/sem/builtins/generator_builtins.go
+++ b/pkg/sql/sem/builtins/generator_builtins.go
@@ -962,8 +962,10 @@ func (c *checkConsistencyGenerator) Start() error {
 			Key:    c.from,
 			EndKey: c.to,
 		},
-		Mode:     c.mode,
-		WithDiff: true,
+		Mode: c.mode,
+		// No meaningful diff can be created if we're checking the stats only,
+		// so request one only if a full check is run.
+		WithDiff: c.mode == roachpb.ChecksumMode_CHECK_FULL,
 	})
 	if err := c.db.Run(c.ctx, &b); err != nil {
 		return err
diff --git a/pkg/storage/batcheval/cmd_compute_checksum.go b/pkg/storage/batcheval/cmd_compute_checksum.go
index a20eeb5c383a..6d6b6b038e67 100644
--- a/pkg/storage/batcheval/cmd_compute_checksum.go
+++ b/pkg/storage/batcheval/cmd_compute_checksum.go
@@ -42,7 +42,7 @@ func declareKeysComputeChecksum(
 // Version numbers for Replica checksum computation. Requests silently no-op
 // unless the versions are compatible.
 const (
-	ReplicaChecksumVersion    = 3
+	ReplicaChecksumVersion    = 4
 	ReplicaChecksumGCInterval = time.Hour
 )