From d7765ccd20616d91affe2c27057ebe5204260588 Mon Sep 17 00:00:00 2001 From: Frederic Branczyk Date: Wed, 6 Mar 2019 15:29:32 +0100 Subject: [PATCH] test/e2e: Add spin up test for remote write receiver --- Gopkg.lock | 13 +-------- test/e2e/query_test.go | 50 ++++++++++++++++++++++++++++------- test/e2e/spinup_test.go | 58 ++++++++++++++++++++++++++++++++++++++++- 3 files changed, 98 insertions(+), 23 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 51eac948d3c..3cded3c6b70 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -35,7 +35,7 @@ version = "0.1.7" [[projects]] - digest = "1:5da4d3b3b9949b9043d2fd36c4ff9b208f72ad5260a3dcb6f94267a769ee1899" + digest = "0:" name = "github.com/Azure/azure-storage-blob-go" packages = ["azblob"] pruneopts = "" @@ -96,14 +96,6 @@ revision = "5c37fe3735342a2e0d01c87a907579987c8936cc" version = "v1.0.0" -[[projects]] - branch = "master" - digest = "1:aa8f6e1219c8c1bf569ded1c45485f4cf6900b0a99a2a17e253d8dde6515da43" - name = "github.com/cockroachdb/cmux" - packages = ["."] - pruneopts = "" - revision = "30d10be492927e2dcae0089c374c455d42414fcb" - [[projects]] branch = "master" digest = "1:c367c68b4bf22ef91069ae442422025da1a3a57049b370252a7b4a895c3fdd6b" @@ -623,7 +615,6 @@ "internal/timeseries", "ipv4", "ipv6", - "netutil", "publicsuffix", "trace", ] @@ -832,7 +823,6 @@ "github.com/NYTimes/gziphandler", "github.com/armon/go-metrics", "github.com/armon/go-metrics/prometheus", - "github.com/cockroachdb/cmux", "github.com/fatih/structtag", "github.com/fortytw2/leaktest", "github.com/fsnotify/fsnotify", @@ -892,7 +882,6 @@ "github.com/prometheus/tsdb/labels", "github.com/prometheus/tsdb/testutil", "golang.org/x/net/context", - "golang.org/x/net/netutil", "golang.org/x/sync/errgroup", "golang.org/x/text/language", "golang.org/x/text/message", diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go index b9a345ef71e..35e056ca7f2 100644 --- a/test/e2e/query_test.go +++ b/test/e2e/query_test.go @@ -4,9 +4,11 @@ import ( "context" "fmt" "net/url" + "os" "testing" "time" + "github.com/go-kit/kit/log" "github.com/improbable-eng/thanos/pkg/promclient" "github.com/improbable-eng/thanos/pkg/runutil" "github.com/improbable-eng/thanos/pkg/testutil" @@ -20,28 +22,32 @@ type testConfig struct { } var ( - firstPromPort = promHTTPPort(1) + firstPromPort = promHTTPPort(1) + remoteWriteEndpoint = fmt.Sprintf("http://%s/receive", remoteWriteReceiveHTTP(1)) queryGossipSuite = newSpinupSuite(). Add(scraper(1, defaultPromConfig("prom-"+firstPromPort, 0), true)). Add(scraper(2, defaultPromConfig("prom-ha", 0), true)). Add(scraper(3, defaultPromConfig("prom-ha", 1), true)). Add(querier(1, "replica"), queryCluster(1)). - Add(querier(2, "replica"), queryCluster(2)) + Add(querier(2, "replica"), queryCluster(2)). + Add(receiver(1, defaultPromRemoteWriteConfig(remoteWriteEndpoint), true)) queryStaticFlagsSuite = newSpinupSuite(). Add(scraper(1, defaultPromConfig("prom-"+firstPromPort, 0), false)). Add(scraper(2, defaultPromConfig("prom-ha", 0), false)). Add(scraper(3, defaultPromConfig("prom-ha", 1), false)). - Add(querierWithStoreFlags(1, "replica", sidecarGRPC(1), sidecarGRPC(2), sidecarGRPC(3)), ""). - Add(querierWithStoreFlags(2, "replica", sidecarGRPC(1), sidecarGRPC(2), sidecarGRPC(3)), "") + Add(querierWithStoreFlags(1, "replica", sidecarGRPC(1), sidecarGRPC(2), sidecarGRPC(3), remoteWriteReceiveGRPC(1)), ""). + Add(querierWithStoreFlags(2, "replica", sidecarGRPC(1), sidecarGRPC(2), sidecarGRPC(3), remoteWriteReceiveGRPC(1)), ""). + Add(receiver(1, defaultPromRemoteWriteConfig(remoteWriteEndpoint), true)) queryFileSDSuite = newSpinupSuite(). Add(scraper(1, defaultPromConfig("prom-"+firstPromPort, 0), false)). Add(scraper(2, defaultPromConfig("prom-ha", 0), false)). Add(scraper(3, defaultPromConfig("prom-ha", 1), false)). - Add(querierWithFileSD(1, "replica", sidecarGRPC(1), sidecarGRPC(2), sidecarGRPC(3)), ""). - Add(querierWithFileSD(2, "replica", sidecarGRPC(1), sidecarGRPC(2), sidecarGRPC(3)), "") + Add(querierWithFileSD(1, "replica", sidecarGRPC(1), sidecarGRPC(2), sidecarGRPC(3), remoteWriteReceiveGRPC(1)), ""). + Add(querierWithFileSD(2, "replica", sidecarGRPC(1), sidecarGRPC(2), sidecarGRPC(3), remoteWriteReceiveGRPC(1)), ""). + Add(receiver(1, defaultPromRemoteWriteConfig(remoteWriteEndpoint), true)) ) func TestQuery(t *testing.T) { @@ -84,12 +90,15 @@ func testQuerySimple(t *testing.T, conf testConfig) { var res model.Vector + w := log.NewSyncWriter(os.Stderr) + l := log.NewLogfmtLogger(w) + // Try query without deduplication. - testutil.Ok(t, runutil.Retry(time.Second, ctx.Done(), func() error { + testutil.Ok(t, runutil.RetryWithLog(l, time.Second, ctx.Done(), func() error { select { case <-exit: cancel() - return nil + return errors.Errorf("exiting test, possibly due to timeout") default: } @@ -98,7 +107,7 @@ func testQuerySimple(t *testing.T, conf testConfig) { if err != nil { return err } - if len(res) != 3 { + if len(res) != 4 { return errors.Errorf("unexpected result size %d", len(res)) } return nil @@ -126,6 +135,11 @@ func testQuerySimple(t *testing.T, conf testConfig) { "prometheus": "prom-ha", "replica": model.LabelValue("1"), }, res[2].Metric) + testutil.Equals(t, model.Metric{ + "__name__": "up", + "instance": model.LabelValue("localhost:9100"), + "job": "node", + }, res[3].Metric) // Try query with deduplication. testutil.Ok(t, runutil.Retry(time.Second, ctx.Done(), func() error { @@ -141,7 +155,7 @@ func testQuerySimple(t *testing.T, conf testConfig) { if err != nil { return err } - if len(res) != 2 { + if len(res) != 3 { return errors.Errorf("unexpected result size for query with deduplication %d", len(res)) } @@ -160,6 +174,11 @@ func testQuerySimple(t *testing.T, conf testConfig) { "job": "prometheus", "prometheus": "prom-ha", }, res[1].Metric) + testutil.Equals(t, model.Metric{ + "__name__": "up", + "instance": model.LabelValue("localhost:9100"), + "job": "node", + }, res[2].Metric) } func urlParse(t *testing.T, addr string) *url.URL { @@ -183,3 +202,14 @@ scrape_configs: - "localhost:%s" `, name, replicas, firstPromPort) } + +func defaultPromRemoteWriteConfig(remoteWriteEndpoint string) string { + return fmt.Sprintf(` +scrape_configs: +- job_name: 'node' + static_configs: + - targets: ['localhost:9100'] +remote_write: +- url: "%s" +`, remoteWriteEndpoint) +} diff --git a/test/e2e/spinup_test.go b/test/e2e/spinup_test.go index c2ac1e401a2..165490252fd 100644 --- a/test/e2e/spinup_test.go +++ b/test/e2e/spinup_test.go @@ -25,7 +25,8 @@ var ( promHTTPPort = func(i int) string { return fmt.Sprintf("%d", 9090+i) } // We keep this one with localhost, to have perfect match with what Prometheus will expose in up metric. - promHTTP = func(i int) string { return fmt.Sprintf("localhost:%s", promHTTPPort(i)) } + promHTTP = func(i int) string { return fmt.Sprintf("localhost:%s", promHTTPPort(i)) } + promRemoteWriteHTTP = func(i int) string { return fmt.Sprintf("localhost:%s", promHTTPPort(100+i)) } sidecarGRPC = func(i int) string { return fmt.Sprintf("127.0.0.1:%d", 19090+i) } sidecarHTTP = func(i int) string { return fmt.Sprintf("127.0.0.1:%d", 19190+i) } @@ -39,6 +40,11 @@ var ( rulerHTTP = func(i int) string { return fmt.Sprintf("127.0.0.1:%d", 19890+i) } rulerCluster = func(i int) string { return fmt.Sprintf("127.0.0.1:%d", 19990+i) } + remoteWriteReceiveHTTP = func(i int) string { return fmt.Sprintf("127.0.0.1:%d", 18690+i) } + remoteWriteReceiveGRPC = func(i int) string { return fmt.Sprintf("127.0.0.1:%d", 18790+i) } + remoteWriteReceiveMetricHTTP = func(i int) string { return fmt.Sprintf("127.0.0.1:%d", 18890+i) } + remoteWriteReceiveCluster = func(i int) string { return fmt.Sprintf("127.0.0.1:%d", 18990+i) } + storeGatewayGRPC = func(i int) string { return fmt.Sprintf("127.0.0.1:%d", 20090+i) } storeGatewayHTTP = func(i int) string { return fmt.Sprintf("127.0.0.1:%d", 20190+i) } @@ -115,6 +121,56 @@ func scraper(i int, config string, gossip bool) (cmdScheduleFunc, string) { }, gossipAddress } +func receiver(i int, config string, gossip bool) (cmdScheduleFunc, string) { + gossipAddress := "" + if gossip { + gossipAddress = remoteWriteReceiveCluster(i) + } + + return func(workDir string, clusterPeerFlags []string) ([]*exec.Cmd, error) { + promDir := fmt.Sprintf("%s/data/remote-write-prom%d", workDir, i) + if err := os.MkdirAll(promDir, 0777); err != nil { + return nil, errors.Wrap(err, "create prom dir failed") + } + + if err := ioutil.WriteFile(promDir+"/prometheus.yml", []byte(config), 0666); err != nil { + return nil, errors.Wrap(err, "creating prom config failed") + } + + var cmds []*exec.Cmd + cmds = append(cmds, exec.Command(testutil.PrometheusBinary(), + "--config.file", promDir+"/prometheus.yml", + "--storage.tsdb.path", promDir, + "--log.level", "info", + "--web.listen-address", promRemoteWriteHTTP(i), + )) + args := []string{ + "remote-write-receive", + "--debug.name", fmt.Sprintf("remote-write-receive-%d", i), + "--grpc-address", remoteWriteReceiveGRPC(i), + "--http-address", remoteWriteReceiveMetricHTTP(i), + "--remote-write.address", remoteWriteReceiveHTTP(i), + "--tsdb.path", promDir, + "--cluster.address", remoteWriteReceiveCluster(i), + "--log.level", "debug", + } + + if gossip { + args = append(args, []string{ + "--cluster.advertise-address", remoteWriteReceiveCluster(i), + "--cluster.gossip-interval", "200ms", + "--cluster.pushpull-interval", "200ms", + }...) + args = append(args, clusterPeerFlags...) + } else { + args = append(args, "--cluster.disable") + } + cmds = append(cmds, exec.Command("thanos", args...)) + + return cmds, nil + }, gossipAddress +} + func querier(i int, replicaLabel string, staticStores ...string) cmdScheduleFunc { return func(_ string, clusterPeerFlags []string) ([]*exec.Cmd, error) { args := append(defaultQuerierFlags(i, replicaLabel),