Skip to content

Commit

Permalink
Merge #37668 #37701
Browse files Browse the repository at this point in the history
37668: storage: fix and test a bogus source of replica divergence errors r=nvanbenschoten a=tbg

An incompatibility in the consistency checks was introduced between v2.1 and v19.1.
See individual commit messages and #37425 for details.

Release note (bug fix): Fixed a potential source of (faux) replica
inconsistencies that can be reported while running a mixed v19.1 / v2.1
cluster. This error (in that situation only) is benign and can be
resolved by upgrading to the latest v19.1 patch release. Every time this
error occurs a "checkpoint" is created which will occupy a large amount
of disk space and which needs to be removed manually (see <store
directory>/auxiliary/checkpoints).

Release note (bug fix): Fixed a case in which `./cockroach quit` would
return success even though the server process was still running in a
severely degraded state.

37701: workloadcccl: fix two regressions in fixtures make/load r=nvanbenschoten a=danhhz

The SQL database for all the tables in the BACKUPs created by `fixtures
make` used to be "csv" (an artifact of the way we made them), but as
of #37343 it's the name of the generator. This seems better so change
`fixtures load` to match.

The same PR also (accidentally) started adding foreign keys in the
BACKUPs, but since there's one table per BACKUP (another artifact of the
way we used to make fixtures), we can't restore the foreign keys. It'd
be nice to switch to one BACKUP with all tables and get the foreign
keys, but the UX of the postLoad hook becomes tricky and I don't have
time right now to sort it all out. So, revert to the previous behavior
(no fks in fixtures) for now.

Release note: None

Co-authored-by: Tobias Schottdorf <[email protected]>
Co-authored-by: Daniel Harrison <[email protected]>
  • Loading branch information
3 people committed May 21, 2019
3 parents 5141c2c + e3ae436 + 407a4de commit 5da9204
Show file tree
Hide file tree
Showing 10 changed files with 112 additions and 42 deletions.
4 changes: 3 additions & 1 deletion pkg/ccl/workloadccl/allccl/all_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const (
directIngestion = true
oneFilePerNode = 1
noInjectStats = false
noSkipPostLoad = false
skipCSVRoundtrip = ``
)

Expand Down Expand Up @@ -84,7 +85,8 @@ func TestAllRegisteredImportFixture(t *testing.T) {
sqlutils.MakeSQLRunner(db).Exec(t, `CREATE DATABASE d`)

if _, err := workloadccl.ImportFixture(
ctx, db, gen, `d`, directIngestion, oneFilePerNode, noInjectStats, skipCSVRoundtrip,
ctx, db, gen, `d`, directIngestion, oneFilePerNode, noInjectStats, noSkipPostLoad,
skipCSVRoundtrip,
); err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/workloadccl/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ func benchmarkImportFixture(b *testing.B, gen workload.Generator) {

b.StartTimer()
const filesPerNode = 1
const directIngest, noInjectStats, skipPostLoad, csvServer = true, false, true, ``
importBytes, err := ImportFixture(
ctx, db, gen, `d`, true /* directIngestion */, filesPerNode, false, /* injectStats */
``, /* csvServer */
ctx, db, gen, `d`, directIngest, filesPerNode, noInjectStats, skipPostLoad, csvServer,
)
require.NoError(b, err)
bytes += importBytes
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/workloadccl/cliccl/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,9 +333,10 @@ func fixturesImport(gen workload.Generator, urls []string, dbName string) error
directIngestion := *fixturesImportDirectIngestionTable
filesPerNode := *fixturesImportFilesPerNode
injectStats := *fixturesImportInjectStats
noSkipPostLoad := false
csvServer := *fixturesMakeImportCSVServerURL
bytes, err := workloadccl.ImportFixture(
ctx, sqlDB, gen, dbName, directIngestion, filesPerNode, injectStats, csvServer,
ctx, sqlDB, gen, dbName, directIngestion, filesPerNode, injectStats, noSkipPostLoad, csvServer,
)
if err != nil {
return errors.Wrap(err, `importing fixture`)
Expand Down
18 changes: 11 additions & 7 deletions pkg/ccl/workloadccl/fixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,10 @@ func MakeFixture(
if _, err := sqlDB.Exec(`CREATE DATABASE IF NOT EXISTS ` + dbName); err != nil {
return Fixture{}, err
}
const direct, stats, csvServer = false, false, ""
if _, err := ImportFixture(ctx, sqlDB, gen, dbName, direct, filesPerNode, stats, csvServer); err != nil {
const direct, stats, skipPostLoad, csvServer = false, false, true, ""
if _, err := ImportFixture(
ctx, sqlDB, gen, dbName, direct, filesPerNode, stats, skipPostLoad, csvServer,
); err != nil {
return Fixture{}, err
}
g := ctxgroup.WithContext(ctx)
Expand Down Expand Up @@ -293,6 +295,7 @@ func ImportFixture(
directIngestion bool,
filesPerNode int,
injectStats bool,
skipPostLoad bool,
csvServer string,
) (int64, error) {
for _, t := range gen.Tables() {
Expand Down Expand Up @@ -339,8 +342,10 @@ func ImportFixture(
if err := g.Wait(); err != nil {
return 0, err
}
if err := runPostLoadSteps(ctx, sqlDB, gen); err != nil {
return 0, err
if !skipPostLoad {
if err := runPostLoadSteps(ctx, sqlDB, gen); err != nil {
return 0, err
}
}
return atomic.LoadInt64(&bytesAtomic), nil
}
Expand Down Expand Up @@ -449,13 +454,12 @@ func RestoreFixture(
) (int64, error) {
var bytesAtomic int64
g := ctxgroup.WithContext(ctx)
genName := fixture.Generator.Meta().Name
for _, table := range fixture.Tables {
table := table
g.GoCtx(func(ctx context.Context) error {
// The IMPORT ... CSV DATA command generates a backup with the table in
// database `csv`.
start := timeutil.Now()
importStmt := fmt.Sprintf(`RESTORE csv.%s FROM $1 WITH into_db=$2`, table.TableName)
importStmt := fmt.Sprintf(`RESTORE %s.%s FROM $1 WITH into_db=$2`, genName, table.TableName)
var rows, index, tableBytes int64
var discard interface{}
if err := sqlDB.QueryRow(importStmt, table.BackupURI, database).Scan(
Expand Down
8 changes: 5 additions & 3 deletions pkg/ccl/workloadccl/fixture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,11 @@ func TestImportFixture(t *testing.T) {
}

const filesPerNode = 1
const noSkipPostLoad = false
sqlDB.Exec(t, `CREATE DATABASE distsort`)
_, err := ImportFixture(
ctx, db, gen, `distsort`, false /* directIngestion */, filesPerNode, true, /* injectStats */
``, /* csvServer */
noSkipPostLoad, ``, /* csvServer */
)
require.NoError(t, err)
sqlDB.CheckQueryResults(t,
Expand All @@ -203,7 +204,7 @@ func TestImportFixture(t *testing.T) {
sqlDB.Exec(t, `CREATE DATABASE direct`)
_, err = ImportFixture(
ctx, db, gen, `direct`, true /* directIngestion */, filesPerNode, false, /* injectStats */
``, /* csvServer */
noSkipPostLoad, ``, /* csvServer */
)
require.NoError(t, err)
sqlDB.CheckQueryResults(t,
Expand Down Expand Up @@ -240,9 +241,10 @@ func TestImportFixtureCSVServer(t *testing.T) {
}

const filesPerNode = 1
const noDirectIngest, noInjectStats, noSkipPostLoad = false, false, true
sqlDB.Exec(t, `CREATE DATABASE d`)
_, err := ImportFixture(
ctx, db, gen, `d`, false /* directIngestion */, filesPerNode, false /* injectStats */, ts.URL,
ctx, db, gen, `d`, noDirectIngest, filesPerNode, noInjectStats, noSkipPostLoad, ts.URL,
)
require.NoError(t, err)
sqlDB.CheckQueryResults(t,
Expand Down
74 changes: 55 additions & 19 deletions pkg/cmd/roachtest/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1041,39 +1041,75 @@ func (c *cluster) FailOnDeadNodes(ctx context.Context, t *test) {
})
}

// FailOnReplicaDivergence fails the test if
// crdb_internal.check_consistency(true, '', '') indicates that any ranges'
// replicas are inconsistent with each other.
func (c *cluster) FailOnReplicaDivergence(ctx context.Context, t *test) {
if c.nodes < 1 {
return // unit tests
}
// TODO(tbg): n1 isn't necessarily online at this point. Try to sniff out
// a node that is.
db := c.Conn(ctx, 1)
defer db.Close()

c.l.Printf("running (fast) consistency checks")
// CheckReplicaDivergenceOnDB runs a fast consistency check of the whole keyspace
// against the provided db. If an inconsistency is found, it returns it in the
// error. Note that this will swallow errors returned directly from the consistency
// check since we know that such spurious errors are possibly without any relation
// to the check having failed.
func (c *cluster) CheckReplicaDivergenceOnDB(ctx context.Context, db *gosql.DB) error {
rows, err := db.QueryContext(ctx, `
SELECT t.range_id, t.start_key_pretty, t.status, t.detail
FROM
crdb_internal.check_consistency(true, '', '') as t
WHERE t.status NOT IN ('RANGE_CONSISTENT', 'RANGE_INDETERMINATE')`)
if err != nil {
c.l.Printf("%s", err)
return
// TODO(tbg): the checks can fail for silly reasons like missing gossiped
// descriptors, etc. -- not worth failing the test for. Ideally this would
// be rock solid.
c.l.Printf("consistency check failed with %v; ignoring", err)
return nil
}
var buf bytes.Buffer
for rows.Next() {
var rangeID int32
var prettyKey, status, detail string
if err := rows.Scan(&rangeID, &prettyKey, &status, &detail); err != nil {
c.l.Printf("%s", err)
break
return err
}
t.Fatalf("r%d (%s) is inconsistent: %s %s", rangeID, prettyKey, status, detail)
fmt.Fprintf(&buf, "r%d (%s) is inconsistent: %s %s\n", rangeID, prettyKey, status, detail)
}
if err := rows.Err(); err != nil {
c.l.Printf("%s", err)
return err
}

msg := buf.String()
if msg != "" {
return errors.New(msg)
}
return nil
}

// FailOnReplicaDivergence fails the test if
// crdb_internal.check_consistency(true, '', '') indicates that any ranges'
// replicas are inconsistent with each other. It uses the first node that
// is up to run the query.
func (c *cluster) FailOnReplicaDivergence(ctx context.Context, t *test) {
if c.nodes < 1 {
return // unit tests
}

// Find a live node to run against, if one exists.
var db *gosql.DB
for i := 1; i <= c.nodes; i++ {
db = c.Conn(ctx, i)
_, err := db.Exec(`SELECT 1`)
if err != nil {
_ = db.Close()
db = nil
continue
}
c.l.Printf("running (fast) consistency checks on node %d", i)
break
}
if db == nil {
c.l.Printf("no live node found, skipping consistency check")
return
}

defer db.Close()

if err := c.CheckReplicaDivergenceOnDB(ctx, db); err != nil {
t.Fatal(err)
}
}

Expand Down
7 changes: 6 additions & 1 deletion pkg/cmd/roachtest/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/util/binfetcher"
_ "github.com/lib/pq"
"github.com/pkg/errors"
)

func registerVersion(r *registry) {
Expand Down Expand Up @@ -95,7 +96,7 @@ func registerVersion(r *registry) {
// Make sure everyone is still running.
for i := 1; i <= nodes; i++ {
t.WorkerStatus("checking ", i)
db := c.Conn(ctx, 1)
db := c.Conn(ctx, i)
defer db.Close()
rows, err := db.Query(`SHOW DATABASES`)
if err != nil {
Expand All @@ -104,6 +105,10 @@ func registerVersion(r *registry) {
if err := rows.Close(); err != nil {
return err
}
// Regression test for #37425.
if err := c.CheckReplicaDivergenceOnDB(ctx, db); err != nil {
return errors.Wrapf(err, "node %d", i)
}
}
return nil
}
Expand Down
28 changes: 23 additions & 5 deletions pkg/server/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"encoding/json"
"fmt"
"os"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -1353,12 +1354,12 @@ func (s *adminServer) Drain(req *serverpb.DrainRequest, stream serverpb.Admin_Dr
return nil
}

s.server.grpc.Stop()

go func() {
// The explicit closure here allows callers.Lookup() to return something
// sensible referring to this file (otherwise it ends up in runtime
// internals).
// TODO(tbg): why don't we stop the stopper first? Stopping the stopper
// first seems more reasonable since grpc.Stop closes the listener right
// away (and who knows whether gRPC-goroutines are tied up in some
// stopper task somewhere).
s.server.grpc.Stop()
s.server.stopper.Stop(ctx)
}()

Expand All @@ -1367,6 +1368,23 @@ func (s *adminServer) Drain(req *serverpb.DrainRequest, stream serverpb.Admin_Dr
return nil
case <-ctx.Done():
return ctx.Err()
case <-time.After(10 * time.Second):
// This is a hack to work around the problem in
// https://github.com/cockroachdb/cockroach/issues/37425#issuecomment-494336131
//
// There appear to be deadlock scenarios in which we don't manage to
// fully stop the grpc server (which implies closing the listener, i.e.
// seeming dead to the outside world) or don't manage to shut down the
// stopper (the evidence in #37425 is inconclusive which one it is).
//
// Other problems in this area are known, such as
// https://github.com/cockroachdb/cockroach/pull/31692
//
// The signal-based shutdown path uses a similar time-based escape hatch.
// Until we spend (potentially lots of time to) understand and fix this
// issue, this will serve us well.
os.Exit(1)
return errors.New("unreachable")
}
}

Expand Down
6 changes: 4 additions & 2 deletions pkg/sql/sem/builtins/generator_builtins.go
Original file line number Diff line number Diff line change
Expand Up @@ -962,8 +962,10 @@ func (c *checkConsistencyGenerator) Start() error {
Key: c.from,
EndKey: c.to,
},
Mode: c.mode,
WithDiff: true,
Mode: c.mode,
// No meaningful diff can be created if we're checking the stats only,
// so request one only if a full check is run.
WithDiff: c.mode == roachpb.ChecksumMode_CHECK_FULL,
})
if err := c.db.Run(c.ctx, &b); err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/batcheval/cmd_compute_checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func declareKeysComputeChecksum(
// Version numbers for Replica checksum computation. Requests silently no-op
// unless the versions are compatible.
const (
ReplicaChecksumVersion = 3
ReplicaChecksumVersion = 4
ReplicaChecksumGCInterval = time.Hour
)

Expand Down

0 comments on commit 5da9204

Please sign in to comment.