diff --git a/pkg/cmd/roachtest/cluster.go b/pkg/cmd/roachtest/cluster.go
index 6496a67faaaa..103a6cfcdcf7 100644
--- a/pkg/cmd/roachtest/cluster.go
+++ b/pkg/cmd/roachtest/cluster.go
@@ -22,6 +22,7 @@ import (
 	gosql "database/sql"
 	"fmt"
 	"io"
+	"net"
 	"net/url"
 	"os"
 	"os/exec"
@@ -468,6 +469,22 @@ func newCluster(ctx context.Context, t testI, nodes []nodeSpec) *cluster {
 	return c
 }
 
+// clone creates a new cluster object that refers to the same cluster as the
+// receiver, but is associated with the specified test.
+func (c *cluster) clone(t *test) *cluster {
+	l, err := rootLogger(t.Name())
+	if err != nil {
+		t.Fatal(err)
+	}
+	return &cluster{
+		name:   c.name,
+		nodes:  c.nodes,
+		status: t.Status,
+		t:      t,
+		l:      l,
+	}
+}
+
 // All returns a node list containing all of the nodes in the cluster.
 func (c *cluster) All() nodeListOption {
 	return c.Range(1, c.nodes)
@@ -535,6 +552,16 @@ func (c *cluster) destroy(ctx context.Context) {
 	}
 }
 
+// LoggedCommand runs a command with output redirected to the logs instead of
+// to os.Stdout (which doesn't go anywhere I've been able to find). Don't use
+// this if you're going to call cmd.CombinedOutput or cmd.Output.
+func (c *cluster) LoggedCommand(ctx context.Context, arg0 string, args ...string) *exec.Cmd {
+	cmd := exec.CommandContext(ctx, arg0, args...)
+	cmd.Stdout = c.l.stdout
+	cmd.Stderr = c.l.stderr
+	return cmd
+}
+
 // Put a local file to all of the machines in a cluster.
 func (c *cluster) Put(ctx context.Context, src, dest string, opts ...option) {
 	if c.t.Failed() {
@@ -711,32 +738,39 @@ func (c *cluster) RunWithBuffer(
 // address. In general, inter-cluster communication and should use internal IPs
 // and communication from a test driver to nodes in a cluster should use
 // external IPs.
-func (c *cluster) pgURL(ctx context.Context, node int, external bool) string {
+func (c *cluster) pgURL(ctx context.Context, node nodeListOption, external bool) []string {
 	args := []string{`pgurl`}
 	if external {
 		args = append(args, `--external`)
 	}
-	args = append(args, c.makeNodes(c.Node(node)))
+	args = append(args, c.makeNodes(node))
 	cmd := exec.CommandContext(ctx, `roachprod`, args...)
 	output, err := cmd.CombinedOutput()
 	if err != nil {
 		fmt.Println(strings.Join(cmd.Args, ` `))
 		c.t.Fatal(err)
 	}
-	return strings.Trim(string(output), "' \n")
+	urls := strings.Split(strings.TrimSpace(string(output)), " ")
+	for i := range urls {
+		urls[i] = strings.Trim(urls[i], "'")
+	}
+	return urls
 }
 
-// InternalPGUrl returns the internal Postgres endpoint for the specified node.
-func (c *cluster) InternalPGUrl(ctx context.Context, node int) string {
+// InternalPGUrl returns the internal Postgres endpoint for the specified nodes.
+func (c *cluster) InternalPGUrl(ctx context.Context, node nodeListOption) []string {
 	return c.pgURL(ctx, node, false /* external */)
 }
 
-// ExternalPGUrl returns the external Postgres endpoint for the specified node.
-func (c *cluster) ExternalPGUrl(ctx context.Context, node int) string {
+// Silence unused warning.
+var _ = (&cluster{}).InternalPGUrl
+
+// ExternalPGUrl returns the external Postgres endpoint for the specified nodes.
+func (c *cluster) ExternalPGUrl(ctx context.Context, node nodeListOption) []string {
 	return c.pgURL(ctx, node, true /* external */)
 }
 
-func urlToIP(c *cluster, pgURL string) string {
+func urlToAddr(c *cluster, pgURL string) string {
 	u, err := url.Parse(pgURL)
 	if err != nil {
 		c.t.Fatal(err)
 	}
@@ -744,16 +778,50 @@ func urlToIP(c *cluster, pgURL string) string {
 	return u.Host
 }
 
-// InternalIP returns the internal IP address in the form host:port for the
-// specified node.
-func (c *cluster) InternalIP(ctx context.Context, node int) string {
-	return urlToIP(c, c.InternalPGUrl(ctx, node))
+func addrToIP(c *cluster, addr string) string {
+	host, _, err := net.SplitHostPort(addr)
+	if err != nil {
+		c.t.Fatal(err)
+	}
+	return host
 }
 
-// ExternalIP returns the external IP address in the form host:port for the
+// InternalAddr returns the internal address in the form host:port for the
+// specified nodes.
+func (c *cluster) InternalAddr(ctx context.Context, node nodeListOption) []string {
+	var addrs []string
+	for _, u := range c.pgURL(ctx, node, false /* external */) {
+		addrs = append(addrs, urlToAddr(c, u))
+	}
+	return addrs
+}
+
+// InternalIP returns the internal IP addresses for the specified nodes.
+func (c *cluster) InternalIP(ctx context.Context, node nodeListOption) []string {
+	var ips []string
+	for _, addr := range c.InternalAddr(ctx, node) {
+		ips = append(ips, addrToIP(c, addr))
+	}
+	return ips
+}
+
+// ExternalAddr returns the external address in the form host:port for the
 // specified node.
-func (c *cluster) ExternalIP(ctx context.Context, node int) string {
-	return urlToIP(c, c.ExternalPGUrl(ctx, node))
+func (c *cluster) ExternalAddr(ctx context.Context, node nodeListOption) []string {
+	var addrs []string
+	for _, u := range c.pgURL(ctx, node, true /* external */) {
+		addrs = append(addrs, urlToAddr(c, u))
+	}
+	return addrs
+}
+
+// ExternalIP returns the external IP addresses for the specified nodes.
+func (c *cluster) ExternalIP(ctx context.Context, node nodeListOption) []string {
+	var ips []string
+	for _, addr := range c.ExternalAddr(ctx, node) {
+		ips = append(ips, addrToIP(c, addr))
+	}
+	return ips
 }
 
 // Silence unused warning.
@@ -761,7 +829,7 @@ var _ = (&cluster{}).ExternalIP
 
 // Conn returns a SQL connection to the specified node.
 func (c *cluster) Conn(ctx context.Context, node int) *gosql.DB {
-	url := c.ExternalPGUrl(ctx, node)
+	url := c.ExternalPGUrl(ctx, c.Node(node))[0]
 	db, err := gosql.Open("postgres", url)
 	if err != nil {
 		c.t.Fatal(err)
diff --git a/pkg/cmd/roachtest/decommission.go b/pkg/cmd/roachtest/decommission.go
index be8b02d12344..f62a44c38538 100644
--- a/pkg/cmd/roachtest/decommission.go
+++ b/pkg/cmd/roachtest/decommission.go
@@ -206,7 +206,8 @@ func runDecommission(t *test, c *cluster, nodes int, duration time.Duration) {
 	db := c.Conn(ctx, 1)
 	defer db.Close()
 
-	c.Start(ctx, c.Node(node), startArgs(fmt.Sprintf("-a=--join %s --attrs=node%d", c.InternalIP(ctx, nodes), node)))
+	c.Start(ctx, c.Node(node), startArgs(fmt.Sprintf("-a=--join %s --attrs=node%d",
+		c.InternalAddr(ctx, c.Node(nodes))[0], node)))
 }
 // TODO(tschottdorf): run some ui sanity checks about decommissioned nodes
 // having disappeared. Verify that the workloads don't dip their qps or
diff --git a/pkg/cmd/roachtest/jepsen.go b/pkg/cmd/roachtest/jepsen.go
index 02eb14cbda2c..245f4456e2b6 100644
--- a/pkg/cmd/roachtest/jepsen.go
+++ b/pkg/cmd/roachtest/jepsen.go
@@ -22,7 +22,6 @@ import (
 	"os"
 	"os/exec"
 	"path/filepath"
-	"regexp"
 	"strings"
 	"time"
 
@@ -41,45 +40,48 @@ var jepsenTests = []string{
 	"sets",
 }
 
-var jepsenNemeses = []string{
-	"--nemesis majority-ring",
-	"--nemesis split",
-	"--nemesis start-kill-2",
-	"--nemesis start-stop-2",
-	"--nemesis strobe-skews",
-	"--nemesis subcritical-skews",
-	"--nemesis majority-ring --nemesis2 subcritical-skews",
-	"--nemesis subcritical-skews --nemesis2 start-kill-2",
-	"--nemesis majority-ring --nemesis2 start-kill-2",
-	"--nemesis parts --nemesis2 start-kill-2",
+var jepsenNemeses = []struct {
+	name, config string
+}{
+	{"majority-ring", "--nemesis majority-ring"},
+	{"split", "--nemesis split"},
+	{"start-kill-2", "--nemesis start-kill-2"},
+	{"start-stop-2", "--nemesis start-stop-2"},
+	{"strobe-skews", "--nemesis strobe-skews"},
+	{"subcritical-skews", "--nemesis subcritical-skews"},
+	{"majority-ring-subcritical-skews", "--nemesis majority-ring --nemesis2 subcritical-skews"},
+	{"subcritical-skews-start-kill-2", "--nemesis subcritical-skews --nemesis2 start-kill-2"},
+	{"majority-ring-start-kill-2", "--nemesis majority-ring --nemesis2 start-kill-2"},
+	{"parts-start-kill-2", "--nemesis parts --nemesis2 start-kill-2"},
 }
 
-func runJepsen(ctx context.Context, t *test, c *cluster) {
-	if c.name == "local" {
+func initJepsen(ctx context.Context, t *test, c *cluster) {
+	// NB: comment this out to see the commands jepsen would run locally.
+	if c.isLocal() {
 		t.Fatal("local execution not supported")
 	}
-	controller := c.Node(c.nodes)
-	workers := c.Range(1, c.nodes-1)
-	// Wrap roachtest's primitive logging in something more like util/log
-	logf := func(f string, args ...interface{}) {
-		// This log prefix matches the one (sometimes) used in roachprod
-		c.l.printf(timeutil.Now().Format("2006/01/02 15:04:05 "))
-		c.l.printf(f, args...)
-		c.l.printf("\n")
+	// Check to see if the cluster has already been initialized.
+	if err := c.RunE(ctx, c.Node(1), "test -e jepsen_initialized"); err == nil {
+		c.l.printf("cluster already initialized\n")
+		return
 	}
+	c.l.printf("initializing cluster\n")
+	t.Status("initializing cluster")
+	defer func() {
+		c.Run(ctx, c.Node(1), "touch jepsen_initialized")
+	}()
 
-	// Run a command with output redirected to the logs instead of to
-	// os.Stdout (which doesn't go anywhere I've been able to find)
-	// Don't use this if you're going to call cmd.CombinedOutput or
-	// cmd.Output.
-	loggedCommand := func(ctx context.Context, arg0 string, args ...string) *exec.Cmd {
-		cmd := exec.CommandContext(ctx, arg0, args...)
-		cmd.Stdout = c.l.stdout
-		cmd.Stderr = c.l.stderr
-		return cmd
+	if c.isLocal() {
+		// We can't perform any of the remaining setup locally. While we can't
+		// run jepsen locally, we let the test run to indicate which commands it
+		// would have run remotely.
+		return
 	}
 
+	controller := c.Node(c.nodes)
+	workers := c.Range(1, c.nodes-1)
+
 	// TODO(bdarnell): Does this blanket apt update matter? I just
 	// copied it from the old jepsen scripts. It's slow, so we should
 	// probably either remove it or use a new base image with more of
@@ -107,29 +109,6 @@ func runJepsen(ctx context.Context, t *test, c *cluster) {
 	c.Run(ctx, controller, "test -x lein || (curl -o lein https://raw.githubusercontent.com/technomancy/leiningen/stable/bin/lein && chmod +x lein)")
 	c.GitClone(ctx, "https://github.com/cockroachdb/jepsen", "/mnt/data1/jepsen", "tc-nightly", controller)
 
-	// Get the IP addresses for all our workers.
-	cmd := exec.CommandContext(ctx, "roachprod", "run", c.makeNodes(workers), "--", "hostname", "-I")
-	output, err := cmd.CombinedOutput()
-	if err != nil {
-		t.Fatal(err)
-	}
-	var workerIPs []string
-	var nodeFlags []string
-	lines := strings.Split(string(output), "\n")
-	// TODO(bdarnell): add an option to `roachprod run` for
-	// machine-friendly output (or merge roachprod into roachtest so we
-	// can access it here).
-	lineRE := regexp.MustCompile(`\s*[0-9]+:\s*([0-9.]+)`)
-	for i := range lines {
-		fields := lineRE.FindStringSubmatch(lines[i])
-		if len(fields) == 0 {
-			continue
-		}
-		workerIPs = append(workerIPs, fields[1])
-		nodeFlags = append(nodeFlags, "-n "+fields[1])
-	}
-	nodesStr := strings.Join(nodeFlags, " ")
-
 	// SSH setup: create a key on the controller.
 	tempDir, err := ioutil.TempDir("", "jepsen")
 	if err != nil {
@@ -137,7 +116,7 @@ func runJepsen(ctx context.Context, t *test, c *cluster) {
 	}
 	c.Run(ctx, controller, "sh", "-c", `"test -f .ssh/id_rsa || ssh-keygen -f .ssh/id_rsa -t rsa -N ''"`)
 	pubSSHKey := filepath.Join(tempDir, "id_rsa.pub")
-	cmd = loggedCommand(ctx, "roachprod", "get", c.makeNodes(controller), ".ssh/id_rsa.pub", pubSSHKey)
+	cmd := c.LoggedCommand(ctx, "roachprod", "get", c.makeNodes(controller), ".ssh/id_rsa.pub", pubSSHKey)
 	if err := cmd.Run(); err != nil {
 		t.Fatal(err)
 	}
@@ -146,26 +125,54 @@ func runJepsen(ctx context.Context, t *test, c *cluster) {
 	c.Run(ctx, workers, "sh", "-c", `"cat controller_id_rsa.pub >> .ssh/authorized_keys"`)
 	// Prime the known hosts file, and use the unhashed format to
 	// work around JSCH auth error: https://github.com/jepsen-io/jepsen/blob/master/README.md
-	for _, ip := range workerIPs {
+	for _, ip := range c.InternalIP(ctx, workers) {
 		c.Run(ctx, controller, "sh", "-c", fmt.Sprintf(`"ssh-keyscan -t rsa %s >> .ssh/known_hosts"`, ip))
 	}
+}
 
-	var failures []string
-	testIdx := 0
-	numTests := len(jepsenTests) * len(jepsenNemeses)
-	for _, testName := range jepsenTests {
-		for _, nemesis := range jepsenNemeses {
-			testIdx++
-			testCfg := fmt.Sprintf("%s %s", testName, nemesis)
-			t.Status(fmt.Sprintf("%d/%d: %s (%d failures)", testIdx, numTests, testCfg, len(failures)))
-			logf("%s: running", testCfg)
+func runJepsen(ctx context.Context, t *test, c *cluster, testName, nemesis string) {
+	initJepsen(ctx, t, c)
+
+	controller := c.Node(c.nodes)
+
+	// Get the IP addresses for all our workers.
+	var nodeFlags []string
+	for _, ip := range c.InternalIP(ctx, c.Range(1, c.nodes-1)) {
+		nodeFlags = append(nodeFlags, "-n "+ip)
+	}
+	nodesStr := strings.Join(nodeFlags, " ")
+
+	// Wrap roachtest's primitive logging in something more like util/log
+	logf := func(f string, args ...interface{}) {
+		// This log prefix matches the one (sometimes) used in roachprod
+		c.l.printf(timeutil.Now().Format("2006/01/02 15:04:05 "))
+		c.l.printf(f, args...)
+		c.l.printf("\n")
+	}
+	run := func(c *cluster, ctx context.Context, node nodeListOption, args ...string) {
+		if !c.isLocal() {
+			c.Run(ctx, node, args...)
+			return
+		}
+		args = append([]string{"roachprod", "run", c.makeNodes(node), "--"}, args...)
+		c.l.printf("> %s\n", strings.Join(args, " "))
+	}
+	runE := func(c *cluster, ctx context.Context, node nodeListOption, args ...string) error {
+		if !c.isLocal() {
+			return c.RunE(ctx, node, args...)
+		}
+		args = append([]string{"roachprod", "run", c.makeNodes(node), "--"}, args...)
+		c.l.printf("> %s\n", strings.Join(args, " "))
+		return nil
+	}
 
-			// Reset the "latest" alias for the next run.
-			c.Run(ctx, controller, "rm -f /mnt/data1/jepsen/cockroachdb/store/latest")
+	// Reset the "latest" alias for the next run.
+	t.Status("running")
+	run(c, ctx, controller, "rm -f /mnt/data1/jepsen/cockroachdb/store/latest")
 
-			errCh := make(chan error, 1)
-			go func() {
-				errCh <- c.RunE(ctx, controller, "bash", "-e", "-c", fmt.Sprintf(`"\
+	errCh := make(chan error, 1)
+	go func() {
+		errCh <- runE(c, ctx, controller, "bash", "-e", "-c", fmt.Sprintf(`"\
 cd /mnt/data1/jepsen/cockroachdb && set -eo pipefail && \
 ~/lein run test \
 --tarball file://${PWD}/cockroach.tgz \
@@ -180,93 +187,101 @@ cd /mnt/data1/jepsen/cockroachdb && set -eo pipefail && \
 --test %s %s \
 > invoke.log 2>&1 \
 "`, nodesStr, testName, nemesis))
-			}()
-
-			outputDir := filepath.Join(artifacts, c.t.Name(), testCfg)
-			if err := os.MkdirAll(outputDir, 0777); err != nil {
-				t.Fatal(err)
-			}
-			var failed bool
-			select {
-			case testErr := <-errCh:
-				if testErr == nil {
-					logf("%s: passed, grabbing minimal logs", testCfg)
-				} else {
-					logf("%s: failed: %s", testCfg, testErr)
-					failed = true
-				}
+	}()
 
-			case <-time.After(20 * time.Minute):
-				// Although we run tests of 6 minutes each, we use a timeout
-				// much larger than that. This is because Jepsen for some
-				// tests (e.g. register) runs a potentially long analysis
-				// after the test itself has completed, before determining
-				// whether the test has succeeded or not.
-				//
-				// Try to get any running jvm to log its stack traces for
-				// extra debugging help.
-				c.Run(ctx, controller, "pkill -QUIT java")
-				time.Sleep(10 * time.Second)
-				c.Run(ctx, controller, "pkill java")
-				logf("%s: timed out", testCfg)
-				failed = true
-			}
+	outputDir := filepath.Join(artifacts, t.Name())
+	if err := os.MkdirAll(outputDir, 0777); err != nil {
+		t.Fatal(err)
+	}
+	var testErr error
+	select {
+	case testErr = <-errCh:
+		if testErr == nil {
+			logf("passed, grabbing minimal logs")
+		} else {
+			logf("failed: %s", testErr)
+		}
 
-			if failed {
-				failures = append(failures, testCfg)
+	case <-time.After(20 * time.Minute):
+		// Although we run tests of 6 minutes each, we use a timeout
+		// much larger than that. This is because Jepsen for some
+		// tests (e.g. register) runs a potentially long analysis
+		// after the test itself has completed, before determining
+		// whether the test has succeeded or not.
+		//
+		// Try to get any running jvm to log its stack traces for
+		// extra debugging help.
+		run(c, ctx, controller, "pkill -QUIT java")
+		time.Sleep(10 * time.Second)
+		run(c, ctx, controller, "pkill java")
+		logf("timed out")
+		testErr = fmt.Errorf("timed out")
+	}
 
-				logf("%s: grabbing artifacts from controller. Tail of controller log:", testCfg)
-				c.Run(ctx, controller, "tail -n 100 /mnt/data1/jepsen/cockroachdb/invoke.log")
-				cmd = exec.CommandContext(ctx, "roachprod", "run", c.makeNodes(controller),
-					// -h causes tar to follow symlinks; needed by the "latest" symlink.
-					// -f- sends the output to stdout, we read it and save it to a local file.
- "tar -chj --ignore-failed-read -f- /mnt/data1/jepsen/cockroachdb/store/latest /mnt/data1/jepsen/cockroachdb/invoke.log /var/log/") - output, err := cmd.Output() - if err != nil { - t.Fatal(err) - } - if err := ioutil.WriteFile(filepath.Join(outputDir, "failure-logs.tbz"), output, 0666); err != nil { - t.Fatal(err) - } - } else { - collectFiles := []string{ - "test.fressian", "results.edn", "latency-quantiles.png", "latency-raw.png", "rate.png", - } - anyFailed := false - for _, file := range collectFiles { - cmd = loggedCommand(ctx, "roachprod", "get", c.makeNodes(controller), - "/mnt/data1/jepsen/cockroachdb/store/latest/"+file, - filepath.Join(outputDir, file)) - cmd.Stdout = c.l.stdout - cmd.Stderr = c.l.stderr - if err := cmd.Run(); err != nil { - logf("failed to retrieve %s: %s", file, err) - } - } - if anyFailed { - // Try to figure out why this is so common. - cmd = loggedCommand(ctx, "roachprod", "get", c.makeNodes(controller), - "/mnt/data1/jepsen/cockroachdb/invoke.log", - filepath.Join(outputDir, "invoke.log")) - cmd.Stdout = c.l.stdout - cmd.Stderr = c.l.stderr - if err := cmd.Run(); err != nil { - logf("failed to retrieve invoke.log: %s", err) - } - } + if testErr != nil { + logf("grabbing artifacts from controller. Tail of controller log:") + run(c, ctx, controller, "tail -n 100 /mnt/data1/jepsen/cockroachdb/invoke.log") + cmd := exec.CommandContext(ctx, "roachprod", "run", c.makeNodes(controller), + // -h causes tar to follow symlinks; needed by the "latest" symlink. + // -f- sends the output to stdout, we read it and save it to a local file. + "tar -chj --ignore-failed-read -f- /mnt/data1/jepsen/cockroachdb/store/latest /mnt/data1/jepsen/cockroachdb/invoke.log /var/log/") + output, err := cmd.Output() + if err != nil { + t.Fatal(err) + } + if err := ioutil.WriteFile(filepath.Join(outputDir, "failure-logs.tbz"), output, 0666); err != nil { + t.Fatal(err) + } + t.Fatal(testErr) + } else { + collectFiles := []string{ + "test.fressian", "results.edn", "latency-quantiles.png", "latency-raw.png", "rate.png", + } + anyFailed := false + for _, file := range collectFiles { + cmd := c.LoggedCommand(ctx, "roachprod", "get", c.makeNodes(controller), + "/mnt/data1/jepsen/cockroachdb/store/latest/"+file, + filepath.Join(outputDir, file)) + cmd.Stdout = c.l.stdout + cmd.Stderr = c.l.stderr + if err := cmd.Run(); err != nil { + logf("failed to retrieve %s: %s", file, err) + } + } + if anyFailed { + // Try to figure out why this is so common. 
+			cmd := c.LoggedCommand(ctx, "roachprod", "get", c.makeNodes(controller),
+				"/mnt/data1/jepsen/cockroachdb/invoke.log",
+				filepath.Join(outputDir, "invoke.log"))
+			cmd.Stdout = c.l.stdout
+			cmd.Stderr = c.l.stderr
+			if err := cmd.Run(); err != nil {
+				logf("failed to retrieve invoke.log: %s", err)
 			}
 		}
-	}
-	if len(failures) > 0 {
-		logf("jepsen tests failed: %v", failures)
-		t.Fatalf("jepsen tests failed: %v", failures)
 	}
 }
 
 func registerJepsen(r *registry) {
-	r.Add(testSpec{
+	spec := testSpec{
 		Name:  "jepsen",
 		Nodes: nodes(6),
-		Run:   runJepsen,
-	})
+	}
+
+	for _, testName := range jepsenTests {
+		testName := testName
+		sub := testSpec{Name: testName}
+		for _, nemesis := range jepsenNemeses {
+			nemesis := nemesis
+			sub.SubTests = append(sub.SubTests, testSpec{
+				Name: nemesis.name,
+				Run: func(ctx context.Context, t *test, c *cluster) {
+					runJepsen(ctx, t, c, testName, nemesis.config)
+				},
+			})
+		}
+		spec.SubTests = append(spec.SubTests, sub)
+	}
+
+	r.Add(spec)
 }
diff --git a/pkg/cmd/roachtest/scaledata.go b/pkg/cmd/roachtest/scaledata.go
index a700c1541490..b3b395be01d0 100644
--- a/pkg/cmd/roachtest/scaledata.go
+++ b/pkg/cmd/roachtest/scaledata.go
@@ -83,11 +83,7 @@ func runSqlapp(ctx context.Context, t *test, c *cluster, app, flags string, dur
 
 	// Sqlapps each take a `--cockroach_ip_addresses_csv` flag, which is a
 	// comma-separated list of node IP addresses with optional port specifiers.
-	var addrs []string
-	for i := 1; i <= roachNodeCount; i++ {
-		addrs = append(addrs, c.InternalIP(ctx, i))
-	}
-	addrStr := strings.Join(addrs, ",")
+	addrStr := strings.Join(c.InternalAddr(ctx, c.All()), ",")
 
 	m := newMonitor(ctx, c, roachNodes)
 	m.Go(func(ctx context.Context) error {
diff --git a/pkg/cmd/roachtest/test.go b/pkg/cmd/roachtest/test.go
index 3fb8709827f0..b9c1309db629 100644
--- a/pkg/cmd/roachtest/test.go
+++ b/pkg/cmd/roachtest/test.go
@@ -43,16 +43,47 @@ var (
 	clusterNameRE = regexp.MustCompile(`^[a-z](?:[-a-z0-9]{0,61}[a-z0-9])?$`)
 )
 
+func makeFilterRE(filter []string) *regexp.Regexp {
+	if len(filter) == 0 {
+		return regexp.MustCompile(`.`)
+	}
+	for i := range filter {
+		filter[i] = "(" + filter[i] + ")"
+	}
+	return regexp.MustCompile(strings.Join(filter, "|"))
+}
+
 type testSpec struct {
-	SkippedBecause string // if nonzero, test will be skipped
+	SkippedBecause string // if non-empty, test will be skipped
 	Name           string
 	// Stable indicates whether failure of the test will result in failure of the
 	// test run. Tests should be added initially as unstable, and only converted
 	// to stable once they have passed successfully on multiple nightly (not
 	// local) runs.
 	Stable bool
-	Nodes  []nodeSpec
-	Run    func(ctx context.Context, t *test, c *cluster)
+	// Nodes provides the specification for the cluster to use for the test. Only
+	// a top-level testSpec may contain a nodes specification. The cluster is
+	// shared by all subtests.
+	Nodes []nodeSpec
+	// A testSpec must specify only one of Run or SubTests. A SubTest should not
+	// assume any particular state for the cluster as the SubTest may be run in
+	// isolation.
+	Run      func(ctx context.Context, t *test, c *cluster)
+	SubTests []testSpec
+}
+
+// matchRegex returns true if the regex matches the test's name or any of the
+// subtest names.
+func (t *testSpec) matchRegex(re *regexp.Regexp) bool {
+	if re.MatchString(t.Name) {
+		return true
+	}
+	for i := range t.SubTests {
+		if t.SubTests[i].matchRegex(re) {
+			return true
+		}
+	}
+	return false
 }
 
 type registry struct {
@@ -60,6 +91,13 @@ type registry struct {
 	clusters       map[string]string
 	out            io.Writer
 	statusInterval time.Duration
+
+	status struct {
+		syncutil.Mutex
+		running map[*test]struct{}
+		pass    map[*test]struct{}
+		fail    map[*test]struct{}
+	}
 }
 
 func newRegistry() *registry {
@@ -86,37 +124,48 @@ func (r *registry) verifyClusterName(testName string) error {
 	return nil
 }
 
+func (r *registry) prepareSpec(spec *testSpec, depth int) error {
+	if depth == 0 {
+		// Only top-level tests can create clusters, so those are the only ones for
+		// which we need to verify the cluster name.
+		if err := r.verifyClusterName(spec.Name); err != nil {
+			return err
+		}
+	}
+
+	if (spec.Run != nil) == (len(spec.SubTests) > 0) {
+		return fmt.Errorf("%s: must specify only one of Run or SubTests", spec.Name)
+	}
+
+	if depth > 0 && len(spec.Nodes) > 0 {
+		return fmt.Errorf("%s: subtest may not provide cluster specification", spec.Name)
+	}
+
+	for i := range spec.SubTests {
+		spec.SubTests[i].Name = spec.Name + "/" + spec.SubTests[i].Name
+		if err := r.prepareSpec(&spec.SubTests[i], depth+1); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
 func (r *registry) Add(spec testSpec) {
 	if _, ok := r.m[spec.Name]; ok {
 		fmt.Fprintf(os.Stderr, "test %s already registered\n", spec.Name)
 		os.Exit(1)
 	}
-	r.m[spec.Name] = &spec
-	if err := r.verifyClusterName(spec.Name); err != nil {
+	if err := r.prepareSpec(&spec, 0); err != nil {
 		fmt.Fprintf(os.Stderr, "%s\n", err)
 		os.Exit(1)
 	}
+	r.m[spec.Name] = &spec
 }
 
-func (r *registry) List(filter []string) []*testSpec {
-	if len(filter) == 0 {
-		filter = []string{"."}
-	}
-	re := make([]*regexp.Regexp, len(filter))
-	for i := range filter {
-		re[i] = regexp.MustCompile(filter[i])
-	}
-
+func (r *registry) List(re *regexp.Regexp) []*testSpec {
 	var results []*testSpec
 	for _, t := range r.m {
-		var matched bool
-		for _, r := range re {
-			if r.MatchString(t.Name) {
-				matched = true
-				break
-			}
-		}
-		if matched {
+		if t.matchRegex(re) {
 			results = append(results, t)
 		}
 	}
@@ -128,7 +177,8 @@
 }
 
 func (r *registry) Run(filter []string) int {
-	tests := r.List(filter)
+	filterRE := makeFilterRE(filter)
+	tests := r.List(filterRE)
 
 	wg := &sync.WaitGroup{}
 	wg.Add(count * len(tests))
@@ -143,43 +193,16 @@ func (r *registry) Run(filter []string) int {
 		parallelism = len(tests)
 	}
 
-	var status struct {
-		syncutil.Mutex
-		running map[*test]struct{}
-		pass    map[*test]struct{}
-		fail    map[*test]struct{}
-	}
-	status.running = make(map[*test]struct{})
-	status.pass = make(map[*test]struct{})
-	status.fail = make(map[*test]struct{})
+	r.status.running = make(map[*test]struct{})
+	r.status.pass = make(map[*test]struct{})
+	r.status.fail = make(map[*test]struct{})
 
 	go func() {
 		sem := make(chan struct{}, parallelism)
 		for j := 0; j < count; j++ {
 			for i := range tests {
-				t := &test{spec: tests[i]}
 				sem <- struct{}{}
-				if teamCity {
-					fmt.Printf("##teamcity[testStarted name='%s' flowId='%s']\n", t.Name(), t.Name())
-				} else {
-					stability := ""
-					if !t.spec.Stable {
-						stability = " [unstable]"
-					}
-					fmt.Fprintf(r.out, "=== RUN %s%s\n", t.Name(), stability)
-				}
-				status.Lock()
-				status.running[t] = struct{}{}
-				status.Unlock()
-				t.run(r.out, func(failed bool) {
-					status.Lock()
-					delete(status.running, t)
-					if failed {
-						status.fail[t] = struct{}{}
-					} else {
-						status.pass[t] = struct{}{}
-					}
-					status.Unlock()
+				r.run(tests[i], filterRE, nil, func() {
 					wg.Done()
 					<-sem
 				})
@@ -207,12 +230,12 @@
 	for i := 1; ; i++ {
 		select {
 		case <-done:
-			status.Lock()
-			defer status.Unlock()
-			postSlackReport(status.pass, status.fail, len(r.m)-len(tests))
+			r.status.Lock()
+			defer r.status.Unlock()
+			postSlackReport(r.status.pass, r.status.fail, len(r.m)-len(tests))
 
 			stableFails := 0
-			for t := range status.fail {
+			for t := range r.status.fail {
 				if t.spec.Stable {
 					stableFails++
 				}
@@ -222,16 +245,16 @@
 				return 1
 			}
 			unstableFails := ""
-			if n := len(status.fail) - stableFails; n > 0 {
+			if n := len(r.status.fail) - stableFails; n > 0 {
 				unstableFails = fmt.Sprintf(" (%d unstable FAIL)", n)
 			}
 			fmt.Fprintf(r.out, "PASS%s\n", unstableFails)
 			return 0
 
 		case <-ticker.C:
-			status.Lock()
-			runningTests := make([]*test, 0, len(status.running))
-			for t := range status.running {
+			r.status.Lock()
+			runningTests := make([]*test, 0, len(r.status.running))
+			for t := range r.status.running {
 				runningTests = append(runningTests, t)
 			}
 			sort.Slice(runningTests, func(i, j int) bool {
@@ -239,6 +262,11 @@
 			})
 			var buf bytes.Buffer
 			for _, t := range runningTests {
+				if t.spec.Run == nil {
+					// Ignore tests with subtests.
+					continue
+				}
+
 				t.mu.Lock()
 				done := t.mu.done
 				var status map[int64]testStatus
@@ -294,7 +322,7 @@
 				}
 			}
 			fmt.Fprint(r.out, buf.String())
-			status.Unlock()
+			r.status.Unlock()
 
 		case <-sig:
 			destroyAllClusters()
@@ -455,7 +483,22 @@
 	return failed
 }
 
-func (t *test) run(out io.Writer, done func(failed bool)) {
+func (r *registry) run(spec *testSpec, filter *regexp.Regexp, c *cluster, done func()) {
+	t := &test{spec: spec}
+
+	if teamCity {
+		fmt.Printf("##teamcity[testStarted name='%s' flowId='%s']\n", t.Name(), t.Name())
+	} else {
+		stability := ""
+		if !t.spec.Stable {
+			stability = " [unstable]"
+		}
+		fmt.Fprintf(r.out, "=== RUN %s%s\n", t.Name(), stability)
+	}
+	r.status.Lock()
+	r.status.running[t] = struct{}{}
+	r.status.Unlock()
+
 	callerName := func() string {
 		// Make room for the skip PC.
 		var pc [2]uintptr
@@ -496,34 +539,43 @@
 		if t.Failed() {
 			if teamCity {
 				fmt.Fprintf(
-					out, "##teamcity[testFailed name='%s' details='%s' flowId='%s']\n",
+					r.out, "##teamcity[testFailed name='%s' details='%s' flowId='%s']\n",
 					t.Name(), teamCityEscape(string(t.mu.output)), t.Name(),
 				)
 			}
-			fmt.Fprintf(out, "--- FAIL: %s %s(%s)\n%s", t.Name(), stability, dstr, t.mu.output)
+			fmt.Fprintf(r.out, "--- FAIL: %s %s(%s)\n%s", t.Name(), stability, dstr, t.mu.output)
 		} else if t.spec.SkippedBecause == "" {
-			fmt.Fprintf(out, "--- PASS: %s %s(%s)\n", t.Name(), stability, dstr)
+			fmt.Fprintf(r.out, "--- PASS: %s %s(%s)\n", t.Name(), stability, dstr)
 			// If `##teamcity[testFailed ...]` is not present before `##teamCity[testFinished ...]`,
 			// TeamCity regards the test as successful.
 		} else {
 			if teamCity {
-				fmt.Fprintf(out, "##teamcity[testIgnored name='%s' message='%s']\n",
+				fmt.Fprintf(r.out, "##teamcity[testIgnored name='%s' message='%s']\n",
 					t.Name(), t.spec.SkippedBecause)
 			}
-			fmt.Fprintf(out, "--- SKIP: %s (%s)\n\t%s\n", t.Name(), dstr, t.spec.SkippedBecause)
+			fmt.Fprintf(r.out, "--- SKIP: %s (%s)\n\t%s\n", t.Name(), dstr, t.spec.SkippedBecause)
 		}
 
 		if teamCity {
-			fmt.Fprintf(out, "##teamcity[testFinished name='%s' flowId='%s']\n", t.Name(), t.Name())
+			fmt.Fprintf(r.out, "##teamcity[testFinished name='%s' flowId='%s']\n", t.Name(), t.Name())
 			escapedTestName := teamCityNameEscape(t.Name())
 			artifactsGlobPath := filepath.Join(artifacts, escapedTestName, "**")
 			artifactsSpec := fmt.Sprintf("%s => %s", artifactsGlobPath, escapedTestName)
-			fmt.Fprintf(out, "##teamcity[publishArtifacts '%s']\n", artifactsSpec)
+			fmt.Fprintf(r.out, "##teamcity[publishArtifacts '%s']\n", artifactsSpec)
 		}
 	}
 
-		done(t.Failed())
+		r.status.Lock()
+		delete(r.status.running, t)
+		if t.Failed() {
+			r.status.fail[t] = struct{}{}
+		} else {
+			r.status.pass[t] = struct{}{}
+		}
+		r.status.Unlock()
+
+		done()
 	}()
 
 	t.start = timeutil.Now()
@@ -532,17 +584,39 @@
 		return
 	}
 
+	ctx := context.Background()
 	if !dryrun {
-		ctx := context.Background()
-		c := newCluster(ctx, t, t.spec.Nodes)
-		defer func() {
-			if !debug || !t.Failed() {
-				c.Destroy(ctx)
-			} else {
-				c.l.printf("not destroying cluster to allow debugging\n")
+		if c == nil {
+			c = newCluster(ctx, t, t.spec.Nodes)
+			if c != nil {
+				defer func() {
+					if !debug || !t.Failed() {
+						c.Destroy(ctx)
+					} else {
+						c.l.printf("not destroying cluster to allow debugging\n")
+					}
+				}()
 			}
-		}()
-		t.spec.Run(ctx, t, c)
+		} else {
+			c = c.clone(t)
+		}
+	}
+
+	if t.spec.Run != nil {
+		if !dryrun {
+			t.spec.Run(ctx, t, c)
+		}
+	} else {
+		for i := range t.spec.SubTests {
+			if t.spec.SubTests[i].matchRegex(filter) {
+				var wg sync.WaitGroup
+				wg.Add(1)
+				r.run(&t.spec.SubTests[i], filter, c, func() {
+					wg.Done()
+				})
+				wg.Wait()
+			}
+		}
 	}
 	}()
 }
diff --git a/pkg/cmd/roachtest/test_test.go b/pkg/cmd/roachtest/test_test.go
index 9a9baeb66ab0..5da782baa9f7 100644
--- a/pkg/cmd/roachtest/test_test.go
+++ b/pkg/cmd/roachtest/test_test.go
@@ -21,6 +21,7 @@ import (
 	"fmt"
 	"io/ioutil"
 	"regexp"
+	"sort"
 	"strings"
 	"sync"
 	"testing"
@@ -28,6 +29,7 @@ import (
 
 	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
+	"github.com/kr/pretty"
 )
 
 func TestRegistryRun(t *testing.T) {
@@ -202,3 +204,107 @@ func TestRegistryVerifyClusterName(t *testing.T) {
 		})
 	}
 }
+
+func TestRegistryPrepareSpec(t *testing.T) {
+	dummyRun := func(context.Context, *test, *cluster) {}
+
+	var listTests func(t *testSpec) []string
+	listTests = func(t *testSpec) []string {
+		r := []string{t.Name}
+		for i := range t.SubTests {
+			r = append(r, listTests(&t.SubTests[i])...)
+		}
+		return r
+	}
+
+	testCases := []struct {
+		spec          testSpec
+		expectedErr   string
+		expectedTests []string
+	}{
+		{
+			testSpec{
+				Name: "a",
+				Run:  dummyRun,
+			},
+			"",
+			[]string{"a"},
+		},
+		{
+			testSpec{
+				Name: "a",
+				SubTests: []testSpec{{
+					Name: "b",
+					Run:  dummyRun,
+				}},
+			},
+			"",
+			[]string{"a", "a/b"},
+		},
+		{
+			testSpec{
+				Name: "a",
+				Run:  dummyRun,
+				SubTests: []testSpec{{
+					Name: "b",
+					Run:  dummyRun,
+				}},
+			},
+			"a: must specify only one of Run or SubTests",
+			nil,
+		},
+		{
+			testSpec{
+				Name: "a",
+				SubTests: []testSpec{{
+					Name: "b",
+				}},
+			},
+			"a/b: must specify only one of Run or SubTests",
+			nil,
+		},
+		{
+			testSpec{
+				Name: "a",
+				SubTests: []testSpec{{
+					Name: "b",
+					Run:  dummyRun,
+					SubTests: []testSpec{{
+						Name: "c",
+						Run:  dummyRun,
+					}},
+				}},
+			},
+			"b: must specify only one of Run or SubTests",
+			nil,
+		},
+		{
+			testSpec{
+				Name: "a",
+				SubTests: []testSpec{{
+					Name:  "b",
+					Nodes: nodes(1),
+					Run:   dummyRun,
+				}},
+			},
+			"a/b: subtest may not provide cluster specification",
+			nil,
+		},
+	}
+	for _, c := range testCases {
+		t.Run("", func(t *testing.T) {
+			r := newRegistry()
+			err := r.prepareSpec(&c.spec, 0)
+			if !testutils.IsError(err, c.expectedErr) {
+				t.Fatalf("expected %s, but found %v", c.expectedErr, err)
+			}
+			if c.expectedErr == "" {
+				tests := listTests(&c.spec)
+				sort.Strings(tests)
+				if diff := pretty.Diff(c.expectedTests, tests); len(diff) != 0 {
+					t.Fatalf("unexpected tests:\n%s", strings.Join(diff, "\n"))
+				}
+			}
+		})
+	}
+}