diff --git a/go.mod b/go.mod
index 0cc3b888c5e4..313df01798db 100644
--- a/go.mod
+++ b/go.mod
@@ -16,7 +16,6 @@ require (
 	github.com/MichaelTJones/walk v0.0.0-20161122175330-4748e29d5718
 	github.com/PuerkitoBio/goquery v1.5.1
 	github.com/Shopify/sarama v1.29.0
-	github.com/Shopify/toxiproxy v2.1.4+incompatible
 	github.com/VividCortex/ewma v1.1.1
 	github.com/abourget/teamcity v0.0.0-00010101000000-000000000000
 	github.com/alessio/shellescape v1.4.1
diff --git a/pkg/cmd/roachtest/tests/BUILD.bazel b/pkg/cmd/roachtest/tests/BUILD.bazel
index 875d2436f566..c162f7b1f5e2 100644
--- a/pkg/cmd/roachtest/tests/BUILD.bazel
+++ b/pkg/cmd/roachtest/tests/BUILD.bazel
@@ -119,7 +119,6 @@ go_library(
         "synctest.go",
         "sysbench.go",
         "tlp.go",
-        "toxiproxy.go",
         "tpc_utils.go",
         "tpcc.go",
         "tpcdsvec.go",
@@ -213,7 +212,6 @@ go_library(
        "@com_github_prometheus_client_golang//api/prometheus/v1:prometheus",
        "@com_github_prometheus_common//model",
        "@com_github_shopify_sarama//:sarama",
-       "@com_github_shopify_toxiproxy//client",
        "@com_github_stretchr_testify//require",
        "@org_golang_google_protobuf//proto",
        "@org_golang_x_exp//rand",
diff --git a/pkg/cmd/roachtest/tests/network.go b/pkg/cmd/roachtest/tests/network.go
index c46bf305bb16..28689ba3643e 100644
--- a/pkg/cmd/roachtest/tests/network.go
+++ b/pkg/cmd/roachtest/tests/network.go
@@ -11,103 +11,25 @@ package tests
 
 import (
-	"bytes"
 	"context"
 	"errors"
 	"fmt"
-	"io/ioutil"
 	"os"
 	"path/filepath"
 	"sync/atomic"
 	"time"
 
-	toxiproxy "github.com/Shopify/toxiproxy/client"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
 	"github.com/cockroachdb/cockroach/pkg/roachprod/install"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
-	"github.com/cockroachdb/cockroach/pkg/util/httputil"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	_ "github.com/lib/pq" // register postgres driver
 	"github.com/stretchr/testify/require"
 )
 
-// runNetworkSanity is just a sanity check to make sure we're setting up toxiproxy
-// correctly. It injects latency between the nodes and verifies that we're not
-// seeing the latency on the client connection running `SELECT 1` on each node.
-func runNetworkSanity(ctx context.Context, t test.Test, origC cluster.Cluster, nodes int) {
-	origC.Put(ctx, t.Cockroach(), "./cockroach", origC.All())
-	c, err := Toxify(ctx, t, origC, origC.All())
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	c.Start(ctx, t.L(), option.DefaultStartOpts(), install.MakeClusterSettings(), c.All())
-
-	db := c.Conn(ctx, t.L(), 1) // unaffected by toxiproxy
-	defer db.Close()
-	err = WaitFor3XReplication(ctx, t, db)
-	require.NoError(t, err)
-
-	// NB: we're generous with latency in this test because we're checking that
-	// the upstream connections aren't affected by latency below, but the fixed
-	// cost of starting the binary and processing the query is already close to
-	// 100ms.
-	const latency = 300 * time.Millisecond
-	for i := 1; i <= nodes; i++ {
-		// NB: note that these latencies only apply to connections *to* the node
-		// on which the toxic is active. That is, if n1 has a (down or upstream)
-		// latency toxic of 100ms, then none of its outbound connections are
-		// affected, but any connections made to it by other nodes will be.
-		// In particular, it's difficult to simulate intricate network partitions
-		// as there's no way to activate toxics only for certain peers.
-		proxy := c.Proxy(i)
-		if _, err := proxy.AddToxic("", "latency", "downstream", 1, toxiproxy.Attributes{
-			"latency": latency / (2 * time.Millisecond), // ms
-		}); err != nil {
-			t.Fatal(err)
-		}
-		if _, err := proxy.AddToxic("", "latency", "upstream", 1, toxiproxy.Attributes{
-			"latency": latency / (2 * time.Millisecond), // ms
-		}); err != nil {
-			t.Fatal(err)
-		}
-	}
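Note on the arithmetic above: toxiproxy's `latency` attribute is a bare millisecond count, and dividing the 300ms `time.Duration` by `2 * time.Millisecond` yields exactly that (150 per direction, i.e. ~300ms round trip). A minimal standalone sketch of the same client calls, assuming a toxiproxy server already running on its default API port 8474 and a cockroach node on 26257; the ports and proxy name are illustrative:

```go
package main

import (
	"fmt"
	"time"

	toxiproxy "github.com/Shopify/toxiproxy/client"
)

func main() {
	// 8474 is toxiproxy's default HTTP API port.
	client := toxiproxy.NewClient("http://localhost:8474")

	// Listen on :36257 and forward to the real node on 26257.
	proxy, err := client.CreateProxy("cockroach", ":36257", "127.0.0.1:26257")
	if err != nil {
		panic(err)
	}

	// Install half the desired round-trip latency in each direction.
	// 300ms / (2 * 1ms) == 150, a unitless millisecond count.
	const latency = 300 * time.Millisecond
	half := int64(latency / (2 * time.Millisecond))
	for _, dir := range []string{"downstream", "upstream"} {
		if _, err := proxy.AddToxic("", "latency", dir, 1, toxiproxy.Attributes{
			"latency": half,
		}); err != nil {
			panic(err)
		}
	}
	fmt.Printf("added %dms of latency in each direction\n", half)
}
```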
-
-	m := c.Cluster.NewMonitor(ctx, c.All())
-	m.Go(func(ctx context.Context) error {
-		c.Measure(ctx, 1, `SET CLUSTER SETTING trace.debug.enable = true`)
-		c.Measure(ctx, 1, "CREATE DATABASE test")
-		c.Measure(ctx, 1, `CREATE TABLE test.commit (a INT, b INT, v INT, PRIMARY KEY (a, b))`)
-
-		for i := 0; i < 10; i++ {
-			duration := c.Measure(ctx, 1, fmt.Sprintf(
-				"BEGIN; INSERT INTO test.commit VALUES (2, %[1]d), (1, %[1]d), (3, %[1]d); COMMIT",
-				i,
-			))
-			t.L().Printf("%s\n", duration)
-		}
-
-		c.Measure(ctx, 1, `
-set tracing=on;
-insert into test.commit values(3,1000), (1,1000), (2,1000);
-select age, message from [ show trace for session ];
-`)
-
-		for i := 1; i <= origC.Spec().NodeCount; i++ {
-			if dur := c.Measure(ctx, i, `SELECT 1`); dur > latency {
-				t.Fatalf("node %d unexpectedly affected by latency: select 1 took %.2fs", i, dur.Seconds())
-			}
-		}
-
-		return nil
-	})
-
-	m.Wait()
-}
-
 // runNetworkAuthentication creates a network black hole to the leaseholder
 // of system.users, and then validates that the time required to create
 // new connections to the cluster afterwards remains under a reasonable limit.
@@ -367,156 +289,8 @@ sudo iptables-save
 	m.Wait()
 }
 
-func runNetworkTPCC(ctx context.Context, t test.Test, origC cluster.Cluster, nodes int) {
-	n := origC.Spec().NodeCount
-	serverNodes, workerNode := origC.Range(1, n-1), origC.Node(n)
-	origC.Put(ctx, t.Cockroach(), "./cockroach", origC.All())
-	origC.Put(ctx, t.DeprecatedWorkload(), "./workload", origC.All())
-
-	c, err := Toxify(ctx, t, origC, serverNodes)
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	const warehouses = 1
-	c.Start(ctx, t.L(), option.DefaultStartOpts(), install.MakeClusterSettings(), serverNodes)
-	c.Run(ctx, c.Node(1), tpccImportCmd(warehouses))
-
-	db := c.Conn(ctx, t.L(), 1)
-	defer db.Close()
-	err = WaitFor3XReplication(ctx, t, db)
-	require.NoError(t, err)
-
-	duration := time.Hour
-	if c.IsLocal() {
-		// NB: this is really just testing the test with this duration; it won't
-		// be able to detect slow goroutine leaks.
-		duration = 5 * time.Minute
-	}
-
-	// Run TPCC, but don't give it the first node (or it basically won't do anything).
-	m := c.NewMonitor(ctx, serverNodes)
-
-	m.Go(func(ctx context.Context) error {
-		t.WorkerStatus("running tpcc")
-
-		cmd := fmt.Sprintf(
-			"./workload run tpcc --warehouses=%d --wait=false"+
-				" --histograms="+t.PerfArtifactsDir()+"/stats.json"+
-				" --duration=%s {pgurl:2-%d}",
-			warehouses, duration, c.Spec().NodeCount-1)
-		return c.RunE(ctx, workerNode, cmd)
-	})
-
-	checkGoroutines := func(ctx context.Context) int {
-		// NB: at the time of writing, the goroutine count would quickly
-		// stabilize near 230 when the network is partitioned, and around 270
-		// when it isn't. Experimentally, a past "slow" goroutine leak leaked ~3
-		// goroutines every minute (though it would likely be more with the tpcc
-		// workload above), which over the duration of an hour would easily push
-		// us over the threshold.
-		const thresh = 350
-
-		uiAddrs, err := c.ExternalAdminUIAddr(ctx, t.L(), serverNodes)
-		if err != nil {
-			t.Fatal(err)
-		}
-		var maxSeen int
-		// The goroutine dump may take a while to generate, maybe more
-		// than the 3 second timeout of the default http client.
-		httpClient := httputil.NewClientWithTimeout(15 * time.Second)
-		for _, addr := range uiAddrs {
-			url := "http://" + addr + "/debug/pprof/goroutine?debug=2"
-			resp, err := httpClient.Get(ctx, url)
-			if err != nil {
-				t.Fatal(err)
-			}
-			content, err := ioutil.ReadAll(resp.Body)
-			resp.Body.Close()
-			if err != nil {
-				t.Fatal(err)
-			}
-			numGoroutines := bytes.Count(content, []byte("goroutine "))
-			if numGoroutines >= thresh {
-				t.Fatalf("%s shows %d goroutines (expected <%d)", url, numGoroutines, thresh)
-			}
-			if maxSeen < numGoroutines {
-				maxSeen = numGoroutines
-			}
-		}
-		return maxSeen
-	}
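The same goroutine-counting technique as checkGoroutines above, sketched as a self-contained program that talks to the pprof endpoint with plain net/http instead of the repo's httputil wrapper; the address is hypothetical:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
)

// countGoroutines scrapes a node's pprof endpoint and counts goroutines.
func countGoroutines(addr string) (int, error) {
	url := "http://" + addr + "/debug/pprof/goroutine?debug=2"
	resp, err := http.Get(url)
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	content, err := io.ReadAll(resp.Body)
	if err != nil {
		return 0, err
	}
	// In debug=2 output, each goroutine's stack begins with "goroutine N [state]:",
	// so counting the prefix approximates the live goroutine count.
	return bytes.Count(content, []byte("goroutine ")), nil
}

func main() {
	n, err := countGoroutines("localhost:8080") // hypothetical Admin UI address
	if err != nil {
		panic(err)
	}
	fmt.Println("goroutines:", n)
}
```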
-
-	m.Go(func(ctx context.Context) error {
-		time.Sleep(10 * time.Second) // give tpcc a head start
-		// Give n1 a network partition from the remainder of the cluster. Note that even though it affects
-		// both the "upstream" and "downstream" directions, this is in fact an asymmetric partition since
-		// it only affects connections *to* the node. n1 itself can connect to the cluster just fine.
-		proxy := c.Proxy(1)
-		t.L().Printf("letting inbound traffic to first node time out")
-		for _, direction := range []string{"upstream", "downstream"} {
-			if _, err := proxy.AddToxic("", "timeout", direction, 1, toxiproxy.Attributes{
-				"timeout": 0, // forever
-			}); err != nil {
-				t.Fatal(err)
-			}
-		}
-
-		t.WorkerStatus("checking goroutines")
-		done := time.After(duration)
-		var maxSeen int
-		for {
-			cur := checkGoroutines(ctx)
-			if maxSeen < cur {
-				t.L().Printf("new goroutine peak: %d", cur)
-				maxSeen = cur
-			}
-
-			select {
-			case <-done:
-				t.L().Printf("done checking goroutines, repairing network")
-				// Repair the network. Note that the TPCC workload would never
-				// finish (despite the duration) without this. In particular,
-				// we don't want to m.Wait() before we do this.
-				toxics, err := proxy.Toxics()
-				if err != nil {
-					t.Fatal(err)
-				}
-				for _, toxic := range toxics {
-					if err := proxy.RemoveToxic(toxic.Name); err != nil {
-						t.Fatal(err)
-					}
-				}
-				t.L().Printf("network is repaired")
-
-				// Verify that goroutine count doesn't spike.
-				for i := 0; i < 20; i++ {
-					nowGoroutines := checkGoroutines(ctx)
-					t.L().Printf("currently at most %d goroutines per node", nowGoroutines)
-					time.Sleep(time.Second)
-				}
-
-				return nil
-			default:
-				time.Sleep(3 * time.Second)
-			}
-		}
-	})
-
-	m.Wait()
-}
-
 func registerNetwork(r registry.Registry) {
 	const numNodes = 4
-
-	r.Add(registry.TestSpec{
-		Name:    fmt.Sprintf("network/sanity/nodes=%d", numNodes),
-		Owner:   registry.OwnerKV,
-		Cluster: r.MakeClusterSpec(numNodes),
-		Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
-			runNetworkSanity(ctx, t, c, numNodes)
-		},
-	})
 	r.Add(registry.TestSpec{
 		Name:  fmt.Sprintf("network/authentication/nodes=%d", numNodes),
 		Owner: registry.OwnerServer,
@@ -525,34 +299,4 @@ func registerNetwork(r registry.Registry) {
 			runNetworkAuthentication(ctx, t, c)
 		},
 	})
-	r.Add(registry.TestSpec{
-		Name:    fmt.Sprintf("network/tpcc/nodes=%d", numNodes),
-		Owner:   registry.OwnerKV,
-		Cluster: r.MakeClusterSpec(numNodes),
-		Skip:    "https://github.com/cockroachdb/cockroach/issues/49901#issuecomment-640666646",
-		SkipDetails: `The ordering of steps in the test is:
-
-- install toxiproxy
-- start cluster, wait for up-replication
-- launch the goroutine that starts the tpcc client command, but do not wait on
-it starting
-- immediately, cause a network partition
-- only then, the goroutine meant to start the tpcc client goes to fetch the
-pg URLs and start workload, but of course this fails because of the network
-partition
-- tpcc fails to start, so the test tears down before it resolves the network partition
-- test tear-down and debug zip fail because the network partition is still active
-
-There are two problems here:
-
-- the tpcc client is not actually started yet when the test sets up the
-network partition. This is a race condition.
-- there should be a defer in there to resolve the partition when the test
-aborts prematurely. (And the command to resolve the partition should not be
-sensitive to the test context's Done() channel, because during a tear-down
-that is closed already.)
-`,
-		Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
-			runNetworkTPCC(ctx, t, c, numNodes)
-		},
-	})
 }
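The skip note above asks for a defer that resolves the partition even when the test aborts early, and that does not depend on the test context's Done() channel. A sketch of what that could look like, written against the ToxiCluster helpers from the file deleted below and assuming its package context; the helper name and shape are hypothetical, not part of this PR:

```go
// partitionNode1 installs the inbound-timeout toxics on n1 and returns a
// cleanup func suitable for `defer`, so the partition is removed even if
// the test fails before repairing the network itself.
func partitionNode1(t test.Test, tc *ToxiCluster) (cleanup func()) {
	proxy := tc.Proxy(1)
	for _, direction := range []string{"upstream", "downstream"} {
		if _, err := proxy.AddToxic("", "timeout", direction, 1, toxiproxy.Attributes{
			"timeout": 0, // forever
		}); err != nil {
			t.Fatal(err)
		}
	}
	return func() {
		// The cleanup talks to toxiproxy's HTTP API directly rather than
		// running a roachprod command, so nothing here consults the
		// (possibly already canceled) test context during teardown.
		toxics, err := proxy.Toxics()
		if err != nil {
			t.L().Printf("listing toxics during cleanup: %v", err)
			return
		}
		for _, toxic := range toxics {
			if err := proxy.RemoveToxic(toxic.Name); err != nil {
				t.L().Printf("removing toxic %s during cleanup: %v", toxic.Name, err)
			}
		}
	}
}
```

A test would call `cleanup := partitionNode1(t, tc)` followed immediately by `defer cleanup()`, before launching the workload goroutine, closing both the race and the teardown hang described above.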
diff --git a/pkg/cmd/roachtest/tests/toxiproxy.go b/pkg/cmd/roachtest/tests/toxiproxy.go
deleted file mode 100644
index ba388411c476..000000000000
--- a/pkg/cmd/roachtest/tests/toxiproxy.go
+++ /dev/null
@@ -1,279 +0,0 @@
-// Copyright 2018 The Cockroach Authors.
-//
-// Use of this software is governed by the Business Source License
-// included in the file licenses/BSL.txt.
-//
-// As of the Change Date specified in that file, in accordance with
-// the Business Source License, use of this software will be governed
-// by the Apache License, Version 2.0, included in the file
-// licenses/APL.txt.
-
-package tests
-
-import (
-	"context"
-	gosql "database/sql"
-	"fmt"
-	"net"
-	"net/url"
-	"regexp"
-	"runtime"
-	"strconv"
-	"time"
-
-	toxiproxy "github.com/Shopify/toxiproxy/client"
-	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
-	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
-	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
-	"github.com/cockroachdb/errors"
-)
-
-// cockroachToxiWrapper replaces the cockroach binary. It modifies the listening
-// port so that the nodes in the cluster will communicate through toxiproxy
-// instead of directly.
-const cockroachToxiWrapper = `#!/usr/bin/env bash
-set -eu
-
-cd "$(dirname "${0}")"
-
-orig_port=""
-
-args=()
-
-if [[ "$1" != "start" ]]; then
-	./cockroach.real "$@"
-	exit $?
-fi
-
-for arg in "$@"; do
-	capture=$(echo "${arg}" | sed -E 's/^--port=([0-9]+)$/\1/')
-	if [[ "${capture}" != "${arg}" ]] && [[ -z "${orig_port}" ]] && [[ -n "${capture}" ]]; then
-		orig_port="${capture}"
-	fi
-	args+=("${arg}")
-done
-
-if [[ -z "${orig_port}" ]]; then
-	orig_port=26257
-fi
-
-args+=("--advertise-port=$((orig_port+10000))")
-
-echo "toxiproxy interception:"
-echo "original args: $@"
-echo "modified args: ${args[@]}"
-./cockroach.real "${args[@]}"
-`
-
-const toxiServerWrapper = `#!/usr/bin/env bash
-set -eu
-
-mkdir -p logs
-./toxiproxy-server -host 0.0.0.0 -port $1 2>&1 > logs/toxiproxy.log &
-until nc -z localhost $1; do sleep 0.1; done # wait until toxiproxy is up
-`
-
-// ToxiCluster wraps a cluster and sets it up for use with toxiproxy.
-// See Toxify() for details.
-type ToxiCluster struct {
-	cluster.Cluster
-
-	t          test.Test
-	toxClients map[int]*toxiproxy.Client
-	toxProxies map[int]*toxiproxy.Proxy
-}
-
-// Toxify takes a cluster and sets it up for use with toxiproxy on the given
-// nodes. On these nodes, the cockroach binary must already have been populated
-// and the cluster must not have been started yet. The returned ToxiCluster
-// wraps the original cluster; the addresses it hands out are intercepted by
-// toxiproxy, while the original addresses remain accessible via the getters
-// prefixed with "External".
-func Toxify(
-	ctx context.Context, t test.Test, c cluster.Cluster, node option.NodeListOption,
-) (*ToxiCluster, error) {
-	toxiURL := "https://github.com/Shopify/toxiproxy/releases/download/v2.1.4/toxiproxy-server-linux-amd64"
-	if c.IsLocal() && runtime.GOOS == "darwin" {
-		toxiURL = "https://github.com/Shopify/toxiproxy/releases/download/v2.1.4/toxiproxy-server-darwin-amd64"
-	}
-	// [elided in this copy: download toxiproxy-server from toxiURL, move the
-	// cockroach binary to cockroach.real, and install cockroachToxiWrapper
-	// and toxiServerWrapper (as ./toxiproxyd) in its place]
-
-	tc := &ToxiCluster{
-		Cluster:    c,
-		t:          t,
-		toxClients: make(map[int]*toxiproxy.Client),
-		toxProxies: make(map[int]*toxiproxy.Proxy),
-	}
-	for _, i := range node {
-		n := c.Node(i)
-
-		toxPort := 8474 + i
-		if err := c.RunE(ctx, n, fmt.Sprintf("./toxiproxyd %d 2>/dev/null >/dev/null < /dev/null", toxPort)); err != nil {
-			return nil, errors.Wrap(err, "toxify")
-		}
-
-		externalAddrs, err := c.ExternalAddr(ctx, t.L(), n)
-		if err != nil {
-			return nil, err
-		}
-		externalAddr, port, err := tc.addrToHostPort(externalAddrs[0])
-		if err != nil {
-			return nil, err
-		}
-		tc.toxClients[i] = toxiproxy.NewClient(fmt.Sprintf("http://%s:%d", externalAddr, toxPort))
-		proxy, err := tc.toxClients[i].CreateProxy("cockroach", fmt.Sprintf(":%d", tc.poisonedPort(port)), fmt.Sprintf("127.0.0.1:%d", port))
-		if err != nil {
-			return nil, errors.Wrap(err, "toxify")
-		}
-		tc.toxProxies[i] = proxy
-	}
-
-	return tc, nil
-}
-
-func (*ToxiCluster) addrToHostPort(addr string) (string, int, error) {
-	host, portStr, err := net.SplitHostPort(addr)
-	if err != nil {
-		return "", 0, err
-	}
-	port, err := strconv.Atoi(portStr)
-	if err != nil {
-		return "", 0, err
-	}
-	return host, port, nil
-}
-
-func (tc *ToxiCluster) poisonedPort(port int) int {
-	// NB: to make a change here, you also have to change the hard-coded
-	// +10000 offset in cockroachToxiWrapper.
-	_ = cockroachToxiWrapper
-	return port + 10000
-}
-
-// Proxy returns the toxiproxy Proxy intercepting the given node's traffic.
-func (tc *ToxiCluster) Proxy(i int) *toxiproxy.Proxy {
-	proxy, found := tc.toxProxies[i]
-	if !found {
-		tc.t.Fatalf("proxy for node %d not found", i)
-	}
-	return proxy
-}
-
-// ExternalAddr gives the external host:port of the node(s), bypassing the
-// toxiproxy interception.
-func (tc *ToxiCluster) ExternalAddr(
-	ctx context.Context, node option.NodeListOption,
-) ([]string, error) {
-	return tc.Cluster.ExternalAddr(ctx, tc.t.L(), node)
-}
-
-// PoisonedExternalAddr gives the external host:port of the toxiproxy process
-// for the given nodes (i.e. the connection will be affected by toxics).
-func (tc *ToxiCluster) PoisonedExternalAddr(
-	ctx context.Context, node option.NodeListOption,
-) ([]string, error) {
-	var out []string
-
-	extAddrs, err := tc.ExternalAddr(ctx, node)
-	if err != nil {
-		return nil, err
-	}
-	for _, addr := range extAddrs {
-		host, port, err := tc.addrToHostPort(addr)
-		if err != nil {
-			return nil, err
-		}
-		out = append(out, fmt.Sprintf("%s:%d", host, tc.poisonedPort(port)))
-	}
-	return out, nil
-}
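To make the +10000 convention concrete: with CockroachDB's default port, the wrapped binary advertises 36257, toxiproxy listens there, and traffic is forwarded to the real listener on 26257. A tiny illustration of the arithmetic (the constant and function names are mine, not from the deleted file):

```go
package main

import "fmt"

// portOffset mirrors the +10000 hard-coded in both cockroachToxiWrapper
// (--advertise-port=$((orig_port+10000))) and poisonedPort above.
const portOffset = 10000

func poisoned(port int) int { return port + portOffset }

func main() {
	realPort := 26257 // what the cockroach process actually listens on
	// Peers dial the advertised (poisoned) port, i.e. the toxiproxy listener,
	// which forwards to 127.0.0.1:<realPort> and applies any active toxics.
	fmt.Printf("advertised :%d -> toxiproxy -> 127.0.0.1:%d\n", poisoned(realPort), realPort)
}
```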
-
-// PoisonedPGAddr gives the external pgwire URL(s) of the given node(s),
-// rewritten so that connections made through them pass through toxiproxy.
-func (tc *ToxiCluster) PoisonedPGAddr(
-	ctx context.Context, node option.NodeListOption,
-) ([]string, error) {
-	var out []string
-
-	urls, err := tc.ExternalPGUrl(ctx, tc.t.L(), node)
-	if err != nil {
-		return nil, err
-	}
-	exts, err := tc.PoisonedExternalAddr(ctx, node)
-	if err != nil {
-		return nil, err
-	}
-	for i, s := range urls {
-		u, err := url.Parse(s)
-		if err != nil {
-			tc.t.Fatal(err)
-		}
-		u.Host = exts[i]
-		out = append(out, u.String())
-	}
-	return out, nil
-}
-
-// PoisonedConn returns an SQL connection to the specified node through toxiproxy.
-func (tc *ToxiCluster) PoisonedConn(ctx context.Context, node int) *gosql.DB {
-	urls, err := tc.PoisonedPGAddr(ctx, tc.Cluster.Node(node))
-	if err != nil {
-		tc.t.Fatal(err)
-	}
-	db, err := gosql.Open("postgres", urls[0])
-	if err != nil {
-		tc.t.Fatal(err)
-	}
-	return db
-}
-
-var _ = (*ToxiCluster)(nil).PoisonedConn
-var _ = (*ToxiCluster)(nil).PoisonedPGAddr
-var _ = (*ToxiCluster)(nil).PoisonedExternalAddr
-
-var measureRE = regexp.MustCompile(`real[^0-9]+([0-9.]+)`)
-
-// Measure runs a statement on the given node (bypassing toxiproxy for the
-// client connection) and measures the duration (including the invocation time
-// of `./cockroach sql`). This is simplistic and does not perform proper
-// escaping. It's not useful for anything but simple sanity checks.
-func (tc *ToxiCluster) Measure(ctx context.Context, fromNode int, stmt string) time.Duration {
-	externalAddrs, err := tc.ExternalAddr(ctx, tc.Node(fromNode))
-	if err != nil {
-		tc.t.Fatal(err)
-	}
-	_, port, err := tc.addrToHostPort(externalAddrs[0])
-	if err != nil {
-		tc.t.Fatal(err)
-	}
-	result, err := tc.Cluster.RunWithDetailsSingleNode(ctx, tc.t.L(), tc.Cluster.Node(fromNode), "time", "-p", "./cockroach", "sql", "--insecure", "--port", strconv.Itoa(port), "-e", "'"+stmt+"'")
-	output := []byte(result.Stdout + result.Stderr)
-	tc.t.L().Printf("%s\n", output)
-	if err != nil {
-		tc.t.Fatal(err)
-	}
-	matches := measureRE.FindSubmatch(output)
-	if len(matches) != 2 {
-		tc.t.Fatalf("unable to extract duration from output: %s", output)
-	}
-	f, err := strconv.ParseFloat(string(matches[1]), 64)
-	if err != nil {
-		tc.t.Fatalf("unable to parse %s as float: %s", output, err)
-	}
-	return time.Duration(f * 1e9)
-}
diff --git a/vendor b/vendor
index 54ccaeee3f91..6cde0a9d2eb4 160000
--- a/vendor
+++ b/vendor
@@ -1 +1 @@
-Subproject commit 54ccaeee3f91daab2f4fd90fd4b947e314aed02f
+Subproject commit 6cde0a9d2eb410e75e3624f7b6da7aefa1021db1
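For reference, here is roughly how the removed helpers composed in a test body, condensed from the deleted runNetworkSanity above; this is illustrative, not a registered test, and the error handling is abbreviated:

```go
func exampleToxiUsage(ctx context.Context, t test.Test, origC cluster.Cluster) {
	// Wrap the cluster so all inter-node traffic flows through toxiproxy.
	tc, err := Toxify(ctx, t, origC, origC.All())
	if err != nil {
		t.Fatal(err)
	}
	tc.Start(ctx, t.L(), option.DefaultStartOpts(), install.MakeClusterSettings(), tc.All())

	// 50ms in each direction ~= 100ms of extra round trip to n1.
	proxy := tc.Proxy(1)
	for _, dir := range []string{"downstream", "upstream"} {
		if _, err := proxy.AddToxic("", "latency", dir, 1, toxiproxy.Attributes{
			"latency": 50, // ms
		}); err != nil {
			t.Fatal(err)
		}
	}

	// Measure bypasses toxiproxy for the client connection, so this times the
	// server-side effect of the toxics only.
	t.L().Printf("SELECT 1 took %s", tc.Measure(ctx, 1, "SELECT 1"))
}
```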