roachtest: deflake decommission/nodes=4 test
Fixes cockroachdb#51713. Previously this test decommissioned+wiped node 1,
which interacted badly with cockroachdb#51329, where node 1 is tasked with
initializing the cluster (and no-ops the attempt when it finds a persisted
file). Because this test wiped away node 1's store dir in its entirety, the
node then tried to re-initialize the cluster and naturally failed to do so.
As a short-term workaround we now decommission+wipe nodes from the end of
the cluster list (so node 4) instead of the beginning. While here, clean up
the test code a bit.

Release note: None.
irfansharif committed Jul 28, 2020
1 parent 3edbe4a commit 30aaf6d
Showing 1 changed file with 70 additions and 56 deletions.
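
To make the workaround concrete, here is a minimal, standalone Go sketch (not part of the actual commit) of the node-selection arithmetic introduced in the diff below: decommission targets are cycled from the tail of the node list, so the pinned node 1, which initializes the cluster, is never decommissioned or wiped. The constants mirror the nodes=4 configuration this test uses; the wrapping in a main function is only for illustration.

package main

import "fmt"

func main() {
	const (
		nodes                    = 4 // cluster size used by decommission/nodes=4
		defaultReplicationFactor = 3
		pinnedNode               = 1 // never decommissioned/restarted; runs the workload
	)
	// Only ever touch a fixed minority of nodes so quorum is never lost.
	numDecom := (defaultReplicationFactor - 1) / 2

	node := nodes
	for i := 0; i < 6; i++ { // a few iterations of the test's main loop
		// Cycle through the last numDecom nodes, as in the diff below.
		node = nodes - (node % numDecom)
		if node == pinnedNode {
			panic("programming error: not expecting to decommission/wipe node 1")
		}
		fmt.Printf("iteration %d: decommission+wipe node %d\n", i+1, node)
	}
	// With nodes=4 and a replication factor of 3, numDecom is 1, so only
	// node 4 is ever decommissioned and wiped.
}
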
pkg/cmd/roachtest/decommission.go
@@ -36,15 +36,16 @@ func registerDecommission(r *testRegistry) {
duration := time.Hour

r.Add(testSpec{
Name: fmt.Sprintf("decommission/nodes=%d/duration=%s", numNodes, duration),
Owner: OwnerKV,
Cluster: makeClusterSpec(4),
Name: fmt.Sprintf("decommission/nodes=%d/duration=%s", numNodes, duration),
Owner: OwnerKV,
MinVersion: "v20.2.0",
Cluster: makeClusterSpec(4),
Run: func(ctx context.Context, t *test, c *cluster) {
if local {
duration = 3 * time.Minute
duration = 5 * time.Minute
t.l.Printf("running with duration=%s in local mode\n", duration)
}
runDecommission(t, c, numNodes, duration)
runDecommission(ctx, t, c, numNodes, duration)
},
})
}
@@ -63,14 +64,20 @@ func registerDecommission(r *testRegistry) {
}
}

// runDecommission decommissions and wipes nodes in a cluster repeatedly,
// alternating between the node being shut down gracefully before and after the
// decommissioning operation, while some light load is running against the
// cluster (to manually verify that the qps don't dip too much).
//
// TODO(tschottdorf): verify that the logs don't contain the messages
// that would spam the log before #23605. I wonder if we should really
// start grepping the logs. An alternative is to introduce a metric
// that would have signaled this and check that instead.
func runDecommission(t *test, c *cluster, nodes int, duration time.Duration) {
ctx := context.Background()

func runDecommission(ctx context.Context, t *test, c *cluster, nodes int, duration time.Duration) {
const defaultReplicationFactor = 3
if defaultReplicationFactor > nodes {
t.Fatal("improper configuration: replication factor greater than number of nodes in the test")
}
// The number of nodes we're going to cycle through. Since we're sometimes
// killing the nodes and then removing them, this means having to be careful
// with loss of quorum. So only ever touch a fixed minority of nodes and
@@ -79,18 +86,19 @@ func runDecommission(t *test, c *cluster, nodes int, duration time.Duration) {
// at some point.
numDecom := (defaultReplicationFactor - 1) / 2

c.Put(ctx, workload, "./workload", c.Node(nodes))
// node1 is kept pinned (i.e. not decommissioned/restarted), and is the node
// through which we run the workload and other queries.
pinnedNode := 1
c.Put(ctx, cockroach, "./cockroach", c.All())
c.Put(ctx, workload, "./workload", c.Node(pinnedNode))

for i := 1; i <= numDecom; i++ {
for i := 1; i <= nodes; i++ {
c.Start(ctx, t, c.Node(i), startArgs(fmt.Sprintf("-a=--attrs=node%d", i)))
}
c.Run(ctx, c.Node(pinnedNode), `./workload init kv --drop`)

c.Start(ctx, t, c.Range(numDecom+1, nodes))
c.Run(ctx, c.Node(nodes), `./workload init kv --drop`)

waitReplicatedAwayFrom := func(downNodeID string) error {
db := c.Conn(ctx, nodes)
waitReplicatedAwayFrom := func(downNodeID int) error {
db := c.Conn(ctx, pinnedNode)
defer func() {
_ = db.Close()
}()
@@ -121,52 +129,47 @@ func runDecommission(t *test, c *cluster, nodes int, duration time.Duration) {
return nil
}

waitUpReplicated := func(targetNodeID string) error {
db := c.Conn(ctx, nodes)
waitUpReplicated := func(targetNode, targetNodeID int) error {
db := c.Conn(ctx, pinnedNode)
defer func() {
_ = db.Close()
}()

for ok := false; !ok; {
var count int
for {
// Check to see that there are no ranges where the target node is
// not part of the replica set.
stmtReplicaCount := fmt.Sprintf(
`SELECT count(*) = 0 FROM crdb_internal.ranges WHERE array_position(replicas, %s) IS NULL and database_name = 'kv';`, targetNodeID)
t.Status(stmtReplicaCount)
if err := db.QueryRow(stmtReplicaCount).Scan(&ok); err != nil {
`SELECT count(*) FROM crdb_internal.ranges WHERE array_position(replicas, %d) IS NULL and database_name = 'kv';`, targetNodeID)
if err := db.QueryRow(stmtReplicaCount).Scan(&count); err != nil {
return err
}
t.Status(fmt.Sprintf("node%d missing %d replica(s)", targetNode, count))
if count == 0 {
break
}
time.Sleep(time.Second)
}
return nil
}

if err := waitReplicatedAwayFrom("0" /* no down node */); err != nil {
if err := waitReplicatedAwayFrom(0 /* no down node */); err != nil {
t.Fatal(err)
}

loadDuration := " --duration=" + duration.String()

workloads := []string{
// TODO(tschottdorf): in remote mode, the ui shows that we consistently write
// at 330 qps (despite asking for 500 below). Locally we get 500qps (and a lot
// more without rate limiting). Check what's up with that.
"./workload run kv --max-rate 500 --tolerate-errors" + loadDuration + " {pgurl:1-%d}",
fmt.Sprintf("./workload run kv --max-rate 500 --tolerate-errors --duration=%s {pgurl:1-%d}", duration.String(), nodes),
}

run := func(stmtStr string) {
db := c.Conn(ctx, nodes)
run := func(stmt string) {
db := c.Conn(ctx, pinnedNode)
defer db.Close()
stmt := fmt.Sprintf(stmtStr, "", "=")
// We are removing the EXPERIMENTAL keyword in 2.1. For compatibility
// with 2.0 clusters we still need to try with it if the
// syntax without EXPERIMENTAL fails.
// TODO(knz): Remove this in 2.2.

t.Status(stmt)
_, err := db.ExecContext(ctx, stmt)
if err != nil && strings.Contains(err.Error(), "syntax error") {
stmt = fmt.Sprintf(stmtStr, "EXPERIMENTAL", "")
t.Status(stmt)
_, err = db.ExecContext(ctx, stmt)
}
if err != nil {
t.Fatal(err)
}
@@ -177,20 +180,19 @@ func runDecommission(t *test, c *cluster, nodes int, duration time.Duration) {
m, ctx = errgroup.WithContext(ctx)
for _, cmd := range workloads {
cmd := cmd // copy is important for goroutine

cmd = fmt.Sprintf(cmd, nodes)
m.Go(func() error {
return c.RunE(ctx, c.Node(nodes), cmd)
return c.RunE(ctx, c.Node(pinnedNode), cmd)
})
}

m.Go(func() error {
nodeID := func(node int) (string, error) {
getNodeID := func(node int) (int, error) {
dbNode := c.Conn(ctx, node)
defer dbNode.Close()
var nodeID string

var nodeID int
if err := dbNode.QueryRow(`SELECT node_id FROM crdb_internal.node_runtime_info LIMIT 1`).Scan(&nodeID); err != nil {
return "", err
return 0, err
}
return nodeID, nil
}
@@ -201,21 +203,32 @@ func runDecommission(t *test, c *cluster, nodes int, duration time.Duration) {
return c.RunE(ctx, c.Node(node), "./cockroach quit --insecure --host=:"+port)
}

decom := func(id string) error {
port := fmt.Sprintf("{pgport:%d}", nodes) // always use last node
t.Status("decommissioning node", id)
return c.RunE(ctx, c.Node(nodes), "./cockroach node decommission --insecure --wait=all --host=:"+port+" "+id)
decom := func(id int) error {
port := fmt.Sprintf("{pgport:%d}", pinnedNode) // always use the pinned node
t.Status(fmt.Sprintf("decommissioning node %d", id))
return c.RunE(ctx, c.Node(pinnedNode), fmt.Sprintf("./cockroach node decommission --insecure --wait=all --host=:%s %d", port, id))
}

for tBegin, whileDown, node := timeutil.Now(), true, 1; timeutil.Since(tBegin) <= duration; whileDown, node = !whileDown, (node%numDecom)+1 {
tBegin, whileDown := timeutil.Now(), true
node := nodes
for timeutil.Since(tBegin) <= duration {
// Alternate between the node being shut down gracefully before and
// after the decommissioning operation.
whileDown = !whileDown
// Cycle through the last numDecom nodes.
node = nodes - (node % numDecom)
if node == pinnedNode {
t.Fatalf("programming error: not expecting to decommission/wipe node%d", pinnedNode)
}

t.Status(fmt.Sprintf("decommissioning %d (down=%t)", node, whileDown))
id, err := nodeID(node)
nodeID, err := getNodeID(node)
if err != nil {
return err
}
run(fmt.Sprintf(`ALTER RANGE default %%[1]s CONFIGURE ZONE %%[2]s 'constraints: {"+node%d"}'`, node))

if err := waitUpReplicated(id); err != nil {
run(fmt.Sprintf(`ALTER RANGE default CONFIGURE ZONE = 'constraints: {"+node%d"}'`, node))
if err := waitUpReplicated(node, nodeID); err != nil {
return err
}

@@ -225,13 +238,13 @@ func runDecommission(t *test, c *cluster, nodes int, duration time.Duration) {
}
}

run(fmt.Sprintf(`ALTER RANGE default %%[1]s CONFIGURE ZONE %%[2]s 'constraints: {"-node%d"}'`, node))
run(fmt.Sprintf(`ALTER RANGE default CONFIGURE ZONE = 'constraints: {"-node%d"}'`, node))

if err := decom(id); err != nil {
if err := decom(nodeID); err != nil {
return err
}

if err := waitReplicatedAwayFrom(id); err != nil {
if err := waitReplicatedAwayFrom(nodeID); err != nil {
return err
}

@@ -241,14 +254,15 @@ func runDecommission(t *test, c *cluster, nodes int, duration time.Duration) {
}
}

// Wipe the node and re-add to cluster with a new node ID.
if err := c.RunE(ctx, c.Node(node), "rm -rf {store-dir}"); err != nil {
return err
}

db := c.Conn(ctx, 1)
db := c.Conn(ctx, pinnedNode)
defer db.Close()

sArgs := startArgs(fmt.Sprintf("-a=--join %s --attrs=node%d", c.InternalAddr(ctx, c.Node(nodes))[0], node))
sArgs := startArgs(fmt.Sprintf("-a=--join %s --attrs=node%d", c.InternalAddr(ctx, c.Node(pinnedNode))[0], node))
if err := c.StartE(ctx, c.Node(node), sArgs); err != nil {
return err
}
