Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

roachtest: add support for subtests #25408

Merged
merged 6 commits into from
May 11, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 84 additions & 16 deletions pkg/cmd/roachtest/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
gosql "database/sql"
"fmt"
"io"
"net"
"net/url"
"os"
"os/exec"
Expand Down Expand Up @@ -468,6 +469,22 @@ func newCluster(ctx context.Context, t testI, nodes []nodeSpec) *cluster {
return c
}

// clone creates a new cluster object that refers to the same cluster as the
// receiver, but is associated with the specified test.
func (c *cluster) clone(t *test) *cluster {
l, err := rootLogger(t.Name())
if err != nil {
t.Fatal(err)
}
return &cluster{
name: c.name,
nodes: c.nodes,
status: t.Status,
t: t,
l: l,
}
}

// All returns a node list containing all of the nodes in the cluster.
func (c *cluster) All() nodeListOption {
return c.Range(1, c.nodes)
Expand Down Expand Up @@ -535,6 +552,16 @@ func (c *cluster) destroy(ctx context.Context) {
}
}

// Run a command with output redirected to the logs instead of to os.Stdout
// (which doesn't go anywhere I've been able to find) Don't use this if you're
// going to call cmd.CombinedOutput or cmd.Output.
func (c *cluster) LoggedCommand(ctx context.Context, arg0 string, args ...string) *exec.Cmd {
cmd := exec.CommandContext(ctx, arg0, args...)
cmd.Stdout = c.l.stdout
cmd.Stderr = c.l.stderr
return cmd
}

// Put a local file to all of the machines in a cluster.
func (c *cluster) Put(ctx context.Context, src, dest string, opts ...option) {
if c.t.Failed() {
Expand Down Expand Up @@ -711,57 +738,98 @@ func (c *cluster) RunWithBuffer(
// address. In general, inter-cluster communication and should use internal IPs
// and communication from a test driver to nodes in a cluster should use
// external IPs.
func (c *cluster) pgURL(ctx context.Context, node int, external bool) string {
func (c *cluster) pgURL(ctx context.Context, node nodeListOption, external bool) []string {
args := []string{`pgurl`}
if external {
args = append(args, `--external`)
}
args = append(args, c.makeNodes(c.Node(node)))
args = append(args, c.makeNodes(node))
cmd := exec.CommandContext(ctx, `roachprod`, args...)
output, err := cmd.CombinedOutput()
if err != nil {
fmt.Println(strings.Join(cmd.Args, ` `))
c.t.Fatal(err)
}
return strings.Trim(string(output), "' \n")
urls := strings.Split(strings.TrimSpace(string(output)), " ")
for i := range urls {
urls[i] = strings.Trim(urls[i], "'")
}
return urls
}

// InternalPGUrl returns the internal Postgres endpoint for the specified node.
func (c *cluster) InternalPGUrl(ctx context.Context, node int) string {
// InternalPGUrl returns the internal Postgres endpoint for the specified nodes.
func (c *cluster) InternalPGUrl(ctx context.Context, node nodeListOption) []string {
return c.pgURL(ctx, node, false /* external */)
}

// ExternalPGUrl returns the external Postgres endpoint for the specified node.
func (c *cluster) ExternalPGUrl(ctx context.Context, node int) string {
// Silence unused warning.
var _ = (&cluster{}).InternalPGUrl

// ExternalPGUrl returns the external Postgres endpoint for the specified nodes.
func (c *cluster) ExternalPGUrl(ctx context.Context, node nodeListOption) []string {
return c.pgURL(ctx, node, true /* external */)
}

func urlToIP(c *cluster, pgURL string) string {
func urlToAddr(c *cluster, pgURL string) string {
u, err := url.Parse(pgURL)
if err != nil {
c.t.Fatal(err)
}
return u.Host
}

// InternalIP returns the internal IP address in the form host:port for the
// specified node.
func (c *cluster) InternalIP(ctx context.Context, node int) string {
return urlToIP(c, c.InternalPGUrl(ctx, node))
func addrToIP(c *cluster, addr string) string {
host, _, err := net.SplitHostPort(addr)
if err != nil {
c.t.Fatal(err)
}
return host
}

// ExternalIP returns the external IP address in the form host:port for the
// InternalAddr returns the internal address in the form host:port for the
// specified nodes.
func (c *cluster) InternalAddr(ctx context.Context, node nodeListOption) []string {
var addrs []string
for _, u := range c.pgURL(ctx, node, false /* external */) {
addrs = append(addrs, urlToAddr(c, u))
}
return addrs
}

// InternalIP returns the internal IP addresses for the specified nodes.
func (c *cluster) InternalIP(ctx context.Context, node nodeListOption) []string {
var ips []string
for _, addr := range c.InternalAddr(ctx, node) {
ips = append(ips, addrToIP(c, addr))
}
return ips
}

// ExternalAddr returns the external address in the form host:port for the
// specified node.
func (c *cluster) ExternalIP(ctx context.Context, node int) string {
return urlToIP(c, c.ExternalPGUrl(ctx, node))
func (c *cluster) ExternalAddr(ctx context.Context, node nodeListOption) []string {
var addrs []string
for _, u := range c.pgURL(ctx, node, true /* external */) {
addrs = append(addrs, urlToAddr(c, u))
}
return addrs
}

// ExternalIP returns the external IP addresses for the specified node.
func (c *cluster) ExternalIP(ctx context.Context, node nodeListOption) []string {
var ips []string
for _, addr := range c.ExternalAddr(ctx, node) {
ips = append(ips, addrToIP(c, addr))
}
return ips
}

// Silence unused warning.
var _ = (&cluster{}).ExternalIP

// Conn returns a SQL connection to the specified node.
func (c *cluster) Conn(ctx context.Context, node int) *gosql.DB {
url := c.ExternalPGUrl(ctx, node)
url := c.ExternalPGUrl(ctx, c.Node(node))[0]
db, err := gosql.Open("postgres", url)
if err != nil {
c.t.Fatal(err)
Expand Down
3 changes: 2 additions & 1 deletion pkg/cmd/roachtest/decommission.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,8 @@ func runDecommission(t *test, c *cluster, nodes int, duration time.Duration) {
db := c.Conn(ctx, 1)
defer db.Close()

c.Start(ctx, c.Node(node), startArgs(fmt.Sprintf("-a=--join %s --attrs=node%d", c.InternalIP(ctx, nodes), node)))
c.Start(ctx, c.Node(node), startArgs(fmt.Sprintf("-a=--join %s --attrs=node%d",
c.InternalAddr(ctx, c.Node(nodes))[0], node)))
}
// TODO(tschottdorf): run some ui sanity checks about decommissioned nodes
// having disappeared. Verify that the workloads don't dip their qps or
Expand Down
Loading