Skip to content

Commit

Permalink
test/e2e: Add spin up test for remote write receiver
Browse files Browse the repository at this point in the history
  • Loading branch information
brancz committed Mar 6, 2019
1 parent 0a18603 commit d7765cc
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 23 deletions.
13 changes: 1 addition & 12 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

50 changes: 40 additions & 10 deletions test/e2e/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import (
"context"
"fmt"
"net/url"
"os"
"testing"
"time"

"github.com/go-kit/kit/log"
"github.com/improbable-eng/thanos/pkg/promclient"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/improbable-eng/thanos/pkg/testutil"
Expand All @@ -20,28 +22,32 @@ type testConfig struct {
}

var (
firstPromPort = promHTTPPort(1)
firstPromPort = promHTTPPort(1)
remoteWriteEndpoint = fmt.Sprintf("http://%s/receive", remoteWriteReceiveHTTP(1))

queryGossipSuite = newSpinupSuite().
Add(scraper(1, defaultPromConfig("prom-"+firstPromPort, 0), true)).
Add(scraper(2, defaultPromConfig("prom-ha", 0), true)).
Add(scraper(3, defaultPromConfig("prom-ha", 1), true)).
Add(querier(1, "replica"), queryCluster(1)).
Add(querier(2, "replica"), queryCluster(2))
Add(querier(2, "replica"), queryCluster(2)).
Add(receiver(1, defaultPromRemoteWriteConfig(remoteWriteEndpoint), true))

queryStaticFlagsSuite = newSpinupSuite().
Add(scraper(1, defaultPromConfig("prom-"+firstPromPort, 0), false)).
Add(scraper(2, defaultPromConfig("prom-ha", 0), false)).
Add(scraper(3, defaultPromConfig("prom-ha", 1), false)).
Add(querierWithStoreFlags(1, "replica", sidecarGRPC(1), sidecarGRPC(2), sidecarGRPC(3)), "").
Add(querierWithStoreFlags(2, "replica", sidecarGRPC(1), sidecarGRPC(2), sidecarGRPC(3)), "")
Add(querierWithStoreFlags(1, "replica", sidecarGRPC(1), sidecarGRPC(2), sidecarGRPC(3), remoteWriteReceiveGRPC(1)), "").
Add(querierWithStoreFlags(2, "replica", sidecarGRPC(1), sidecarGRPC(2), sidecarGRPC(3), remoteWriteReceiveGRPC(1)), "").
Add(receiver(1, defaultPromRemoteWriteConfig(remoteWriteEndpoint), true))

queryFileSDSuite = newSpinupSuite().
Add(scraper(1, defaultPromConfig("prom-"+firstPromPort, 0), false)).
Add(scraper(2, defaultPromConfig("prom-ha", 0), false)).
Add(scraper(3, defaultPromConfig("prom-ha", 1), false)).
Add(querierWithFileSD(1, "replica", sidecarGRPC(1), sidecarGRPC(2), sidecarGRPC(3)), "").
Add(querierWithFileSD(2, "replica", sidecarGRPC(1), sidecarGRPC(2), sidecarGRPC(3)), "")
Add(querierWithFileSD(1, "replica", sidecarGRPC(1), sidecarGRPC(2), sidecarGRPC(3), remoteWriteReceiveGRPC(1)), "").
Add(querierWithFileSD(2, "replica", sidecarGRPC(1), sidecarGRPC(2), sidecarGRPC(3), remoteWriteReceiveGRPC(1)), "").
Add(receiver(1, defaultPromRemoteWriteConfig(remoteWriteEndpoint), true))
)

func TestQuery(t *testing.T) {
Expand Down Expand Up @@ -84,12 +90,15 @@ func testQuerySimple(t *testing.T, conf testConfig) {

var res model.Vector

w := log.NewSyncWriter(os.Stderr)
l := log.NewLogfmtLogger(w)

// Try query without deduplication.
testutil.Ok(t, runutil.Retry(time.Second, ctx.Done(), func() error {
testutil.Ok(t, runutil.RetryWithLog(l, time.Second, ctx.Done(), func() error {
select {
case <-exit:
cancel()
return nil
return errors.Errorf("exiting test, possibly due to timeout")
default:
}

Expand All @@ -98,7 +107,7 @@ func testQuerySimple(t *testing.T, conf testConfig) {
if err != nil {
return err
}
if len(res) != 3 {
if len(res) != 4 {
return errors.Errorf("unexpected result size %d", len(res))
}
return nil
Expand Down Expand Up @@ -126,6 +135,11 @@ func testQuerySimple(t *testing.T, conf testConfig) {
"prometheus": "prom-ha",
"replica": model.LabelValue("1"),
}, res[2].Metric)
testutil.Equals(t, model.Metric{
"__name__": "up",
"instance": model.LabelValue("localhost:9100"),
"job": "node",
}, res[3].Metric)

// Try query with deduplication.
testutil.Ok(t, runutil.Retry(time.Second, ctx.Done(), func() error {
Expand All @@ -141,7 +155,7 @@ func testQuerySimple(t *testing.T, conf testConfig) {
if err != nil {
return err
}
if len(res) != 2 {
if len(res) != 3 {
return errors.Errorf("unexpected result size for query with deduplication %d", len(res))
}

Expand All @@ -160,6 +174,11 @@ func testQuerySimple(t *testing.T, conf testConfig) {
"job": "prometheus",
"prometheus": "prom-ha",
}, res[1].Metric)
testutil.Equals(t, model.Metric{
"__name__": "up",
"instance": model.LabelValue("localhost:9100"),
"job": "node",
}, res[2].Metric)
}

func urlParse(t *testing.T, addr string) *url.URL {
Expand All @@ -183,3 +202,14 @@ scrape_configs:
- "localhost:%s"
`, name, replicas, firstPromPort)
}

func defaultPromRemoteWriteConfig(remoteWriteEndpoint string) string {
return fmt.Sprintf(`
scrape_configs:
- job_name: 'node'
static_configs:
- targets: ['localhost:9100']
remote_write:
- url: "%s"
`, remoteWriteEndpoint)
}
58 changes: 57 additions & 1 deletion test/e2e/spinup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ var (
promHTTPPort = func(i int) string { return fmt.Sprintf("%d", 9090+i) }

// We keep this one with localhost, to have perfect match with what Prometheus will expose in up metric.
promHTTP = func(i int) string { return fmt.Sprintf("localhost:%s", promHTTPPort(i)) }
promHTTP = func(i int) string { return fmt.Sprintf("localhost:%s", promHTTPPort(i)) }
promRemoteWriteHTTP = func(i int) string { return fmt.Sprintf("localhost:%s", promHTTPPort(100+i)) }

sidecarGRPC = func(i int) string { return fmt.Sprintf("127.0.0.1:%d", 19090+i) }
sidecarHTTP = func(i int) string { return fmt.Sprintf("127.0.0.1:%d", 19190+i) }
Expand All @@ -39,6 +40,11 @@ var (
rulerHTTP = func(i int) string { return fmt.Sprintf("127.0.0.1:%d", 19890+i) }
rulerCluster = func(i int) string { return fmt.Sprintf("127.0.0.1:%d", 19990+i) }

remoteWriteReceiveHTTP = func(i int) string { return fmt.Sprintf("127.0.0.1:%d", 18690+i) }
remoteWriteReceiveGRPC = func(i int) string { return fmt.Sprintf("127.0.0.1:%d", 18790+i) }
remoteWriteReceiveMetricHTTP = func(i int) string { return fmt.Sprintf("127.0.0.1:%d", 18890+i) }
remoteWriteReceiveCluster = func(i int) string { return fmt.Sprintf("127.0.0.1:%d", 18990+i) }

storeGatewayGRPC = func(i int) string { return fmt.Sprintf("127.0.0.1:%d", 20090+i) }
storeGatewayHTTP = func(i int) string { return fmt.Sprintf("127.0.0.1:%d", 20190+i) }

Expand Down Expand Up @@ -115,6 +121,56 @@ func scraper(i int, config string, gossip bool) (cmdScheduleFunc, string) {
}, gossipAddress
}

func receiver(i int, config string, gossip bool) (cmdScheduleFunc, string) {
gossipAddress := ""
if gossip {
gossipAddress = remoteWriteReceiveCluster(i)
}

return func(workDir string, clusterPeerFlags []string) ([]*exec.Cmd, error) {
promDir := fmt.Sprintf("%s/data/remote-write-prom%d", workDir, i)
if err := os.MkdirAll(promDir, 0777); err != nil {
return nil, errors.Wrap(err, "create prom dir failed")
}

if err := ioutil.WriteFile(promDir+"/prometheus.yml", []byte(config), 0666); err != nil {
return nil, errors.Wrap(err, "creating prom config failed")
}

var cmds []*exec.Cmd
cmds = append(cmds, exec.Command(testutil.PrometheusBinary(),
"--config.file", promDir+"/prometheus.yml",
"--storage.tsdb.path", promDir,
"--log.level", "info",
"--web.listen-address", promRemoteWriteHTTP(i),
))
args := []string{
"remote-write-receive",
"--debug.name", fmt.Sprintf("remote-write-receive-%d", i),
"--grpc-address", remoteWriteReceiveGRPC(i),
"--http-address", remoteWriteReceiveMetricHTTP(i),
"--remote-write.address", remoteWriteReceiveHTTP(i),
"--tsdb.path", promDir,
"--cluster.address", remoteWriteReceiveCluster(i),
"--log.level", "debug",
}

if gossip {
args = append(args, []string{
"--cluster.advertise-address", remoteWriteReceiveCluster(i),
"--cluster.gossip-interval", "200ms",
"--cluster.pushpull-interval", "200ms",
}...)
args = append(args, clusterPeerFlags...)
} else {
args = append(args, "--cluster.disable")
}
cmds = append(cmds, exec.Command("thanos", args...))

return cmds, nil
}, gossipAddress
}

func querier(i int, replicaLabel string, staticStores ...string) cmdScheduleFunc {
return func(_ string, clusterPeerFlags []string) ([]*exec.Cmd, error) {
args := append(defaultQuerierFlags(i, replicaLabel),
Expand Down

0 comments on commit d7765cc

Please sign in to comment.