Merge #30977

30977: roachtest: rewrite the test runner r=andreimatei a=andreimatei

This patch reimplements the roachtest test runner. Some issues with the
previous version:
- roachtest --parallelism is ignored when running a single test
- repeated runs of that test don't reuse clusters
- cluster reuse is achieved through an inflexible subtest mechanism
- there's no way to control the quota used on GCE, only the number of
tests that run in parallel

This patch takes a clean-slate approach to the runner. It introduces a
runner that takes a --cpu-quota (a number of cloud CPUs) and then,
building on the cluster reuse policies introduced in the previous
commit, tries to schedule tests as efficiently and with as much
parallelism as possible while staying under that quota (and also under
--parallelism, which now acts purely to protect against too many tests
thrashing the local machine).
The runner starts --parallelism workers and lets them compete on a
ResourceGovernor which manages the cloud quota.
Tests are seen abstractly as units of work. There are no more subtests,
and --count is supported naturally as more work.
The scheduling policy is not sophisticated: at any point in time, a
worker has a cluster and will continuously select tests that can reuse
that cluster. When no more tests can reuse it, the cluster is destroyed,
new resources are acquired and a new cluster is created. Among the tests
that can all reuse a cluster, the ones that don't soil it are preferred.
Among otherwise equal tests, the ones with more runs remaining are
preferred.

Release note: None

Co-authored-by: Andrei Matei <[email protected]>
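
The scheduling loop described above is easiest to see in code. The following
is a minimal sketch of the idea, not the actual runner: the names
(quotaGovernor, workPool, unit) are illustrative assumptions, and a cluster is
reduced to the CPU count it occupies under the quota.

package main

import (
	"fmt"
	"sync"
)

// quotaGovernor hands out cloud CPUs up to --cpu-quota; workers block while the quota is exhausted.
type quotaGovernor struct {
	mu    sync.Mutex
	cond  *sync.Cond
	avail int
}

func (g *quotaGovernor) acquire(cpus int) {
	g.mu.Lock()
	defer g.mu.Unlock()
	for g.avail < cpus {
		g.cond.Wait()
	}
	g.avail -= cpus
}

func (g *quotaGovernor) release(cpus int) {
	g.mu.Lock()
	g.avail += cpus
	g.mu.Unlock()
	g.cond.Broadcast()
}

// unit is a test with runs remaining; --count simply contributes more runs.
type unit struct {
	name     string
	cpus     int  // cluster size it needs
	soils    bool // a run leaves the cluster unfit for further reuse
	runsLeft int
}

// workPool holds the tests, seen abstractly as units of work.
type workPool struct {
	mu    sync.Mutex
	units []*unit
}

// next returns the best remaining unit for a cluster of clusterCPUs (0 means the worker holds no cluster yet), or nil.
func (p *workPool) next(clusterCPUs int) *unit {
	p.mu.Lock()
	defer p.mu.Unlock()
	var best *unit
	for _, u := range p.units {
		if u.runsLeft == 0 || (clusterCPUs != 0 && u.cpus != clusterCPUs) {
			continue // exhausted, or cannot reuse the worker's current cluster
		}
		if best == nil || (best.soils && !u.soils) || // prefer tests that don't soil the cluster,
			(best.soils == u.soils && u.runsLeft > best.runsLeft) { // then tests with more runs remaining
			best = u
		}
	}
	if best != nil {
		best.runsLeft--
	}
	return best
}

// worker holds one cluster at a time, reusing it until no remaining unit can; it then destroys the cluster, returns the quota and starts over.
func worker(g *quotaGovernor, pool *workPool, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		u := pool.next(0)
		if u == nil {
			return // no work left anywhere
		}
		g.acquire(u.cpus)
		clusterCPUs := u.cpus // stand-in for creating a real cluster
		for u != nil {
			fmt.Printf("running %s on a %d-CPU cluster\n", u.name, clusterCPUs)
			if u.soils {
				break // a soiled cluster is not offered for reuse
			}
			u = pool.next(clusterCPUs)
		}
		g.release(clusterCPUs) // cluster destroyed; its CPUs go back under the quota
	}
}

func main() {
	g := &quotaGovernor{avail: 16} // --cpu-quota=16
	g.cond = sync.NewCond(&g.mu)
	pool := &workPool{units: []*unit{
		{name: "kv/splits", cpus: 4, runsLeft: 3}, // e.g. --count=3
		{name: "clearrange/checks=true", cpus: 8, soils: true, runsLeft: 1},
	}}
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ { // --parallelism=2
		wg.Add(1)
		go worker(g, pool, &wg)
	}
	wg.Wait()
}

The invariant the sketch preserves is the one the commit message relies on: a
worker only holds cloud quota while it holds a cluster, and it only destroys
the cluster (and frees the quota) once no remaining unit of work can reuse it.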
craig[bot] and andreimatei committed Jul 2, 2019
2 parents c5bbcbd + 0a776f4 commit 6919482
Showing 69 changed files with 2,981 additions and 1,835 deletions.
3 changes: 2 additions & 1 deletion build/teamcity-local-roachtest.sh
@@ -24,8 +24,9 @@ tc_start_block "Run local roachtests"
run build/builder.sh env \
COCKROACH_DEV_LICENSE="$COCKROACH_DEV_LICENSE" \
stdbuf -oL -eL \
./bin/roachtest run '(acceptance|kv/splits|cdc/bank)' \
./bin/roachtest run acceptance kv/splits cdc/bank \
--local \
--parallelism=1 \
--cockroach "cockroach" \
--roachprod "bin/roachprod" \
--workload "bin/workload" \
51 changes: 14 additions & 37 deletions pkg/cmd/roachtest/acceptance.go
@@ -12,25 +12,15 @@ package main

import (
"context"
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/util/version"
)

func registerAcceptance(r *registry) {
// The acceptance tests all share a cluster and run sequentially. In
// local mode the acceptance tests should be configured to run within a
// minute or so as these tests are run on every merge to master.

func registerAcceptance(r *testRegistry) {
testCases := []struct {
name string
fn func(ctx context.Context, t *test, c *cluster)
skip string
// roachtest needs to be taught about MinVersion for subtests.
// See https://github.com/cockroachdb/cockroach/issues/36752.
//
// minVersion string
name string
fn func(ctx context.Context, t *test, c *cluster)
skip string
minVersion string
}{
// Sorted. Please keep it that way.
{name: "bank/cluster-recovery", fn: runBankClusterRecovery},
@@ -53,43 +43,30 @@ func registerAcceptance(r *registry) {
{name: "gossip/locality-address", fn: runCheckLocalityIPAddress},
{name: "rapid-restart", fn: runRapidRestart},
{name: "status-server", fn: runStatusServer},
{
name: "version-upgrade",
fn: runVersionUpgrade,
// NB: this is hacked back in below.
// minVersion: "v19.2.0",
},
{name: "version-upgrade", fn: runVersionUpgrade, minVersion: "v19.1.0"},
}
tags := []string{"default", "quick"}
const numNodes = 4
spec := testSpec{
specTemplate := testSpec{
// NB: teamcity-post-failures.py relies on the acceptance tests
// being named acceptance/<testname> and will avoid posting a
// blank issue for the "acceptance" parent test. Make sure to
// teach that script (if it's still used at that point) should
// this naming scheme ever change (or issues such as #33519)
// will be posted.
Name: "acceptance",
Timeout: 10 * time.Minute,
Tags: tags,
Cluster: makeClusterSpec(numNodes),
}

for _, tc := range testCases {
tc := tc
minV := "v19.2.0-0"
if tc.name == "version-upgrade" && !r.buildVersion.AtLeast(version.MustParse(minV)) {
tc.skip = fmt.Sprintf("skipped on %s (want at least %s)", r.buildVersion, minV)
tc := tc // copy for closure
spec := specTemplate
spec.Name = specTemplate.Name + "/" + tc.name
spec.Run = func(ctx context.Context, t *test, c *cluster) {
tc.fn(ctx, t, c)
}
spec.SubTests = append(spec.SubTests, testSpec{
Skip: tc.skip,
Name: tc.name,
Timeout: 10 * time.Minute,
Tags: tags,
Run: func(ctx context.Context, t *test, c *cluster) {
c.Wipe(ctx)
tc.fn(ctx, t, c)
},
})
r.Add(spec)
}
r.Add(spec)
}
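
With SubTests gone, each acceptance case above becomes its own flat spec
stamped out from specTemplate, and a brand-new test would be registered the
same way. The sketch below is hypothetical (the test name and body are
invented) and assumes only the testSpec fields and cluster helpers visible in
this diff, written as code that would live inside pkg/cmd/roachtest.

// Hypothetical registration for illustration only: the "example/ping-all-nodes"
// test does not exist. It uses the package's unexported types (testRegistry,
// testSpec, test, cluster) and helpers as they appear elsewhere in this diff.
func registerExample(r *testRegistry) {
	r.Add(testSpec{
		Name:       "example/ping-all-nodes",
		Timeout:    10 * time.Minute,
		Tags:       []string{"default"},
		MinVersion: "v19.1.0",
		Cluster:    makeClusterSpec(4),
		Run: func(ctx context.Context, t *test, c *cluster) {
			// Each spec is an independent unit of work for the new runner;
			// there is no SubTests nesting and no manual c.Wipe between cases.
			c.Put(ctx, cockroach, "./cockroach")
			c.Start(ctx, t, c.All())
			for i := 1; i <= c.spec.NodeCount; i++ {
				db := c.Conn(ctx, i)
				var one int
				if err := db.QueryRowContext(ctx, `SELECT 1`).Scan(&one); err != nil {
					db.Close()
					t.Fatalf("node %d not serving SQL: %v", i, err)
				}
				db.Close()
			}
		},
	})
}

Because the runner now treats every spec as an independent unit of work,
cluster reuse between such tests is decided by the scheduler (subject to the
spec's reuse policy) rather than by nesting them under a parent test.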
8 changes: 4 additions & 4 deletions pkg/cmd/roachtest/allocator.go
@@ -22,7 +22,7 @@ import (
"github.com/pkg/errors"
)

func registerAllocator(r *registry) {
func registerAllocator(r *testRegistry) {
runAllocator := func(ctx context.Context, t *test, c *cluster, start int, maxStdDev float64) {
const fixturePath = `gs://cockroach-fixtures/workload/tpch/scalefactor=10/backup`
c.Put(ctx, cockroach, "./cockroach")
@@ -45,10 +45,10 @@ func registerAllocator(r *registry) {
m.Wait()

// Start the remaining nodes to kick off upreplication/rebalancing.
c.Start(ctx, t, c.Range(start+1, c.nodes), args)
c.Start(ctx, t, c.Range(start+1, c.spec.NodeCount), args)

c.Run(ctx, c.Node(1), `./workload init kv --drop`)
for node := 1; node <= c.nodes; node++ {
for node := 1; node <= c.spec.NodeCount; node++ {
node := node
// TODO(dan): Ideally, the test would fail if this queryload failed,
// but we can't put it in monitor as-is because the test deadlocks.
@@ -247,7 +247,7 @@ func waitForRebalance(ctx context.Context, l *logger, db *gosql.DB, maxStdDev fl
}

func runWideReplication(ctx context.Context, t *test, c *cluster) {
nodes := c.nodes
nodes := c.spec.NodeCount
if nodes != 9 {
t.Fatalf("9-node cluster required")
}
6 changes: 3 additions & 3 deletions pkg/cmd/roachtest/backup.go
@@ -20,7 +20,7 @@ import (
"github.com/pkg/errors"
)

func registerBackup(r *registry) {
func registerBackup(r *testRegistry) {
backup2TBSpec := makeClusterSpec(10)
r.Add(testSpec{
Name: fmt.Sprintf("backup2TB/%s", backup2TBSpec),
@@ -100,7 +100,7 @@ func registerBackup(r *registry) {
t.Status(`workload initialization`)
cmd := fmt.Sprintf(
"./workload init tpcc --warehouses=%d {pgurl:1-%d}",
warehouses, c.nodes,
warehouses, c.spec.NodeCount,
)
c.Run(ctx, c.Node(1), cmd)

@@ -122,7 +122,7 @@ func registerBackup(r *registry) {
go func() {
cmd := fmt.Sprintf(
"./workload run tpcc --warehouses=%d {pgurl:1-%d}",
warehouses, c.nodes,
warehouses, c.spec.NodeCount,
)

cmdDone <- c.RunE(ctx, c.Node(1), cmd)
32 changes: 16 additions & 16 deletions pkg/cmd/roachtest/bank.go
@@ -217,7 +217,7 @@ func (s *bankState) startChaosMonkey(

// Don't begin the chaos monkey until all nodes are serving SQL connections.
// This ensures that we don't test cluster initialization under chaos.
for i := 1; i <= c.nodes; i++ {
for i := 1; i <= c.spec.NodeCount; i++ {
db := c.Conn(ctx, i)
var res int
err := db.QueryRowContext(ctx, `SELECT 1`).Scan(&res)
@@ -296,9 +296,9 @@ func (s *bankState) startSplitMonkey(ctx context.Context, d time.Duration, c *cl
defer s.waitGroup.Done()

r := newRand()
nodes := make([]string, c.nodes)
nodes := make([]string, c.spec.NodeCount)

for i := 0; i < c.nodes; i++ {
for i := 0; i < c.spec.NodeCount; i++ {
nodes[i] = strconv.Itoa(i + 1)
}

Expand Down Expand Up @@ -413,7 +413,7 @@ func (s *bankState) waitClientsStop(
curRound, strings.Join(strCounts, ", "))
} else {
newOutput = fmt.Sprintf("test finished, waiting for shutdown of %d clients",
c.nodes-doneClients)
c.spec.NodeCount-doneClients)
}
// This just stops the logs from being a bit too spammy.
if newOutput != prevOutput {
Expand All @@ -431,14 +431,14 @@ func runBankClusterRecovery(ctx context.Context, t *test, c *cluster) {
// TODO(peter): Run for longer when !local.
start := timeutil.Now()
s := &bankState{
errChan: make(chan error, c.nodes),
errChan: make(chan error, c.spec.NodeCount),
deadline: start.Add(time.Minute),
clients: make([]bankClient, c.nodes),
clients: make([]bankClient, c.spec.NodeCount),
}
s.initBank(ctx, t, c)
defer s.waitGroup.Wait()

for i := 0; i < c.nodes; i++ {
for i := 0; i < c.spec.NodeCount; i++ {
s.clients[i].Lock()
s.initClient(ctx, c, i+1)
s.clients[i].Unlock()
@@ -449,7 +449,7 @@ func runBankClusterRecovery(ctx context.Context, t *test, c *cluster) {
rnd, seed := randutil.NewPseudoRand()
t.l.Printf("monkey starts (seed %d)\n", seed)
pickNodes := func() []int {
nodes := rnd.Perm(c.nodes)[:rnd.Intn(c.nodes)+1]
nodes := rnd.Perm(c.spec.NodeCount)[:rnd.Intn(c.spec.NodeCount)+1]
for i := range nodes {
nodes[i]++
}
@@ -485,7 +485,7 @@ func runBankNodeRestart(ctx context.Context, t *test, c *cluster) {
s.initBank(ctx, t, c)
defer s.waitGroup.Wait()

clientIdx := c.nodes
clientIdx := c.spec.NodeCount
client := &s.clients[0]
client.db = c.Conn(ctx, clientIdx)

@@ -515,14 +515,14 @@ func runBankNodeZeroSum(ctx context.Context, t *test, c *cluster) {

start := timeutil.Now()
s := &bankState{
errChan: make(chan error, c.nodes),
errChan: make(chan error, c.spec.NodeCount),
deadline: start.Add(time.Minute),
clients: make([]bankClient, c.nodes),
clients: make([]bankClient, c.spec.NodeCount),
}
s.initBank(ctx, t, c)
defer s.waitGroup.Wait()

for i := 0; i < c.nodes; i++ {
for i := 0; i < c.spec.NodeCount; i++ {
s.clients[i].Lock()
s.initClient(ctx, c, i+1)
s.clients[i].Unlock()
@@ -551,14 +551,14 @@ func runBankZeroSumRestart(ctx context.Context, t *test, c *cluster) {

start := timeutil.Now()
s := &bankState{
errChan: make(chan error, c.nodes),
errChan: make(chan error, c.spec.NodeCount),
deadline: start.Add(time.Minute),
clients: make([]bankClient, c.nodes),
clients: make([]bankClient, c.spec.NodeCount),
}
s.initBank(ctx, t, c)
defer s.waitGroup.Wait()

for i := 0; i < c.nodes; i++ {
for i := 0; i < c.spec.NodeCount; i++ {
s.clients[i].Lock()
s.initClient(ctx, c, i+1)
s.clients[i].Unlock()
@@ -568,7 +568,7 @@ func runBankZeroSumRestart(ctx context.Context, t *test, c *cluster) {
rnd, seed := randutil.NewPseudoRand()
c.l.Printf("monkey starts (seed %d)\n", seed)
pickNodes := func() []int {
nodes := rnd.Perm(c.nodes)[:rnd.Intn(c.nodes)+1]
nodes := rnd.Perm(c.spec.NodeCount)[:rnd.Intn(c.spec.NodeCount)+1]
for i := range nodes {
nodes[i]++
}
6 changes: 3 additions & 3 deletions pkg/cmd/roachtest/canary.go
@@ -154,7 +154,7 @@ func repeatRunWithBuffer(
// repeatGitCloneE is the same function as c.GitCloneE but with an automatic
// retry loop.
func repeatGitCloneE(
ctx context.Context, c *cluster, src, dest, branch string, node nodeListOption,
ctx context.Context, l *logger, c *cluster, src, dest, branch string, node nodeListOption,
) error {
var lastError error
for attempt, r := 0, retry.StartWithCtx(ctx, canaryRetryOptions); r.Next(); {
@@ -165,8 +165,8 @@ func repeatGitCloneE(
return fmt.Errorf("test has failed")
}
attempt++
c.l.Printf("attempt %d - clone %s", attempt, src)
lastError = c.GitCloneE(ctx, src, dest, branch, node)
l.Printf("attempt %d - clone %s", attempt, src)
lastError = c.GitClone(ctx, l, src, dest, branch, node)
if lastError != nil {
c.l.Printf("error - retrying: %s", lastError)
continue
2 changes: 1 addition & 1 deletion pkg/cmd/roachtest/cancel.go
@@ -32,7 +32,7 @@ import (
//
// Once DistSQL queries provide more testing knobs, these tests can likely be
// replaced with unit tests.
func registerCancel(r *registry) {
func registerCancel(r *testRegistry) {
runCancel := func(ctx context.Context, t *test, c *cluster,
queries []string, warehouses int, useDistsql bool) {
c.Put(ctx, cockroach, "./cockroach", c.All())
12 changes: 6 additions & 6 deletions pkg/cmd/roachtest/cdc.go
@@ -57,13 +57,13 @@ type cdcTestArgs struct {
func cdcBasicTest(ctx context.Context, t *test, c *cluster, args cdcTestArgs) {
// Skip the poller test on v19.2. After 19.2 is out, we should likely delete
// the test entirely.
if !args.rangefeed && t.registry.buildVersion.Compare(version.MustParse(`v19.1.0-0`)) > 0 {
if !args.rangefeed && t.buildVersion.Compare(version.MustParse(`v19.1.0-0`)) > 0 {
t.Skip("no poller in >= v19.2.0", "")
}

crdbNodes := c.Range(1, c.nodes-1)
workloadNode := c.Node(c.nodes)
kafkaNode := c.Node(c.nodes)
crdbNodes := c.Range(1, c.spec.NodeCount-1)
workloadNode := c.Node(c.spec.NodeCount)
kafkaNode := c.Node(c.spec.NodeCount)
c.Put(ctx, cockroach, "./cockroach")
c.Put(ctx, workload, "./workload", workloadNode)
c.Start(ctx, t, crdbNodes)
@@ -237,7 +237,7 @@ func runCDCBank(ctx context.Context, t *test, c *cluster) {
// spam.
c.Run(ctx, c.All(), `mkdir -p logs`)

crdbNodes, workloadNode, kafkaNode := c.Range(1, c.nodes-1), c.Node(c.nodes), c.Node(c.nodes)
crdbNodes, workloadNode, kafkaNode := c.Range(1, c.spec.NodeCount-1), c.Node(c.spec.NodeCount), c.Node(c.spec.NodeCount)
c.Put(ctx, cockroach, "./cockroach", crdbNodes)
c.Put(ctx, workload, "./workload", workloadNode)
c.Start(ctx, t, crdbNodes)
@@ -460,7 +460,7 @@ func runCDCSchemaRegistry(ctx context.Context, t *test, c *cluster) {
}
}

func registerCDC(r *registry) {
func registerCDC(r *testRegistry) {
useRangeFeed := true
if r.buildVersion.Compare(version.MustParse(`v2.2.0-0`)) < 0 {
// RangeFeed is not production ready in 2.1, so run the tests with the
7 changes: 5 additions & 2 deletions pkg/cmd/roachtest/clearrange.go
@@ -18,7 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

func registerClearRange(r *registry) {
func registerClearRange(r *testRegistry) {
for _, checks := range []bool{true, false} {
checks := checks
r.Add(testSpec{
@@ -27,7 +27,10 @@ func registerClearRange(r *registry) {
// to <3:30h but it varies.
Timeout: 5*time.Hour + 90*time.Minute,
MinVersion: `v2.2.0`,
Cluster: makeClusterSpec(10),
// This test reformats a drive to ZFS, so we don't want it reused.
// TODO(andrei): Can the test itself reuse the cluster (under --count=2)?
// In other words, would a OnlyTagged("clearrange") policy be good?
Cluster: makeClusterSpec(10, reuseNone()),
Run: func(ctx context.Context, t *test, c *cluster) {
runClearRange(ctx, t, c, checks)
},
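
The reuseNone() option above is the hook into the cluster reuse policies the
commit message builds on (they were introduced in the previous commit, outside
this diff). As a rough illustration of the concept only — the real type and
option names may differ — a reuse policy can be modeled as a small interface
the scheduler consults before handing an existing cluster to the next test:

// Illustrative sketch, not the actual roachtest types: the scheduler would ask
// the policy attached to a cluster's spec whether the next candidate test may
// have it. reuseNone() above presumably attaches the "never reuse" policy; the
// tagged variant is the one the TODO in clearrange.go speculates about.
type clusterReusePolicy interface {
	canBeReusedBy(testTag string) bool
}

type reusePolicyAny struct{}                // default: any compatible test may reuse the cluster
type reusePolicyTagged struct{ tag string } // only tests sharing the tag, e.g. "clearrange"
type reusePolicyNone struct{}               // single-use cluster, e.g. after reformatting a drive to ZFS

func (reusePolicyAny) canBeReusedBy(string) bool          { return true }
func (p reusePolicyTagged) canBeReusedBy(tag string) bool { return p.tag == tag }
func (reusePolicyNone) canBeReusedBy(string) bool         { return false }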