From c4e0fe46b775bf95f167524f627a51192c9d6808 Mon Sep 17 00:00:00 2001 From: Kemal Akkoyun Date: Wed, 18 Sep 2019 14:32:07 +0200 Subject: [PATCH 1/5] Add prober to receive Signed-off-by: Kemal Akkoyun --- cmd/thanos/compact.go | 2 +- cmd/thanos/main.go | 2 +- cmd/thanos/query.go | 2 +- cmd/thanos/receive.go | 26 ++++++++++++++++---------- cmd/thanos/sidecar.go | 2 +- go.mod | 2 ++ 6 files changed, 22 insertions(+), 14 deletions(-) diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index 5b2e277fc6..8f0f4a235e 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -170,7 +170,7 @@ func runCompact( statusProber := prober.NewProber(component, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg)) // Initiate default HTTP listener providing metrics endpoint and readiness/liveness probes. if err := scheduleHTTPServer(g, logger, reg, statusProber, httpBindAddr, nil, component); err != nil { - return errors.Wrap(err, "create default HTTP server with readiness prober") + return errors.Wrap(err, "schedule default HTTP server with probes") } confContentYaml, err := objStoreConfig.Content() diff --git a/cmd/thanos/main.go b/cmd/thanos/main.go index a5143be371..a7a583915f 100644 --- a/cmd/thanos/main.go +++ b/cmd/thanos/main.go @@ -80,7 +80,7 @@ func main() { registerCompact(cmds, app) registerBucket(cmds, app, "bucket") registerDownsample(cmds, app, "downsample") - registerReceive(cmds, app, "receive") + registerReceive(cmds, app) registerChecks(cmds, app, "check") cmd, err := app.Parse(os.Args[1:]) diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 515c57de61..c7652243d2 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -416,7 +416,7 @@ func runQuery( // Initiate default HTTP listener providing metrics endpoint and readiness/liveness probes. if err := scheduleHTTPServer(g, logger, reg, statusProber, httpBindAddr, router, comp); err != nil { - return errors.Wrap(err, "create default HTTP server with readiness prober") + return errors.Wrap(err, "schedule default HTTP server with probes") } } // Start query (proxy) gRPC StoreAPI. diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index ed6115425c..c7e7b3b3dc 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -20,6 +20,7 @@ import ( "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/objstore/client" + "github.com/thanos-io/thanos/pkg/prober" "github.com/thanos-io/thanos/pkg/receive" "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/shipper" @@ -28,11 +29,12 @@ import ( kingpin "gopkg.in/alecthomas/kingpin.v2" ) -func registerReceive(m map[string]setupFunc, app *kingpin.Application, name string) { - cmd := app.Command(name, "Accept Prometheus remote write API requests and write to local tsdb (EXPERIMENTAL, this may change drastically without notice)") +func registerReceive(m map[string]setupFunc, app *kingpin.Application) { + comp := component.Receive + cmd := app.Command(comp.String(), "Accept Prometheus remote write API requests and write to local tsdb (EXPERIMENTAL, this may change drastically without notice)") grpcBindAddr, cert, key, clientCA := regGRPCFlags(cmd) - httpMetricsBindAddr := regHTTPAddrFlag(cmd) + httpBindAddr := regHTTPAddrFlag(cmd) remoteWriteAddress := cmd.Flag("remote-write.address", "Address to listen on for remote write requests."). Default("0.0.0.0:19291").String() @@ -62,7 +64,7 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application, name stri tsdbBlockDuration := modelDuration(cmd.Flag("tsdb.block-duration", "Duration for local TSDB blocks").Default("2h").Hidden()) - m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error { + m[comp.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error { lset, err := parseFlagLabels(*labelStrs) if err != nil { return errors.Wrap(err, "parse labels") @@ -97,7 +99,7 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application, name stri *cert, *key, *clientCA, - *httpMetricsBindAddr, + *httpBindAddr, *remoteWriteAddress, *dataDir, objStoreConfig, @@ -109,6 +111,7 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application, name stri *replicaHeader, *replicationFactor, *tsdbBlockDuration, + comp, ) } } @@ -122,7 +125,7 @@ func runReceive( cert string, key string, clientCA string, - httpMetricsBindAddr string, + httpBindAddr string, remoteWriteAddress string, dataDir string, objStoreConfig *pathOrContent, @@ -134,6 +137,7 @@ func runReceive( replicaHeader string, replicationFactor uint64, tsdbBlockDuration model.Duration, + comp component.Component, ) error { logger = log.With(logger, "component", "receive") level.Warn(logger).Log("msg", "setting up receive; the Thanos receive component is EXPERIMENTAL, it may break significantly without notice") @@ -248,9 +252,11 @@ func runReceive( ) } - level.Debug(logger).Log("msg", "setting up metric http listen-group") - if err := metricHTTPListenGroup(g, logger, reg, httpMetricsBindAddr); err != nil { - return err + level.Debug(logger).Log("msg", "setting up default http server") + statusProber := prober.NewProber(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg)) + // Initiate default HTTP listener providing metrics endpoint and readiness/liveness probes. + if err := scheduleHTTPServer(g, logger, reg, statusProber, httpBindAddr, nil, comp); err != nil { + return errors.Wrap(err, "schedule default HTTP server with probes") } level.Debug(logger).Log("msg", "setting up grpc server") @@ -278,6 +284,7 @@ func runReceive( s := newStoreGRPCServer(logger, reg, tracer, tsdbStore, opts) level.Info(logger).Log("msg", "listening for StoreAPI gRPC", "address", grpcBindAddr) + statusProber.SetReady() return errors.Wrap(s.Serve(l), "serve gRPC") }, func(error) { if s != nil { @@ -343,6 +350,5 @@ func runReceive( } level.Info(logger).Log("msg", "starting receiver") - return nil } diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index 68afd666e4..d2b192de58 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -122,7 +122,7 @@ func runSidecar( statusProber := prober.NewProber(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg)) // Initiate default HTTP listener providing metrics endpoint and readiness/liveness probes. if err := scheduleHTTPServer(g, logger, reg, statusProber, httpBindAddr, nil, comp); err != nil { - return errors.Wrap(err, "create default HTTP server with readiness prober") + return errors.Wrap(err, "schedule default HTTP server with probes") } // Setup all the concurrent groups. diff --git a/go.mod b/go.mod index b5d592a939..01910aa94e 100644 --- a/go.mod +++ b/go.mod @@ -65,3 +65,5 @@ replace ( k8s.io/klog => k8s.io/klog v0.3.1 k8s.io/kube-openapi => k8s.io/kube-openapi v0.0.0-20190228160746-b3a7cee44a30 ) + +go 1.13 From 1cee31d1571fe686a92a25172e1facfe1f49a484 Mon Sep 17 00:00:00 2001 From: Kemal Akkoyun Date: Wed, 18 Sep 2019 14:38:04 +0200 Subject: [PATCH 2/5] Add changelog entries Signed-off-by: Kemal Akkoyun --- CHANGELOG.md | 2 ++ go.mod | 2 -- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 25ffcf972c..468cab5f7e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,8 @@ We use *breaking* word for marking changes that are not backward compatible (rel ## Added - [#1538](https://github.com/thanos-io/thanos/pull/1538) Added `/-/ready` and `/-/healthy` endpoints to Thanos Rule. +- [#XXX](https://github.com/thanos-io/thanos/pull/XXX) Added `/-/ready` and `/-/healthy` endpoints to Thanos Receive. +- [#1534](https://github.com/thanos-io/thanos/pull/1534) Added `/-/ready` endpoint to Thanos Query. -[1533](https://github.com/thanos-io/thanos/pull/1533) Thanos inspect now supports the timeout flag. ### Fixed diff --git a/go.mod b/go.mod index 01910aa94e..b5d592a939 100644 --- a/go.mod +++ b/go.mod @@ -65,5 +65,3 @@ replace ( k8s.io/klog => k8s.io/klog v0.3.1 k8s.io/kube-openapi => k8s.io/kube-openapi v0.0.0-20190228160746-b3a7cee44a30 ) - -go 1.13 From 023a1ea063dc1d262f1dfc10980966c1fce72650 Mon Sep 17 00:00:00 2001 From: Kemal Akkoyun Date: Wed, 18 Sep 2019 14:40:49 +0200 Subject: [PATCH 3/5] Update README Signed-off-by: Kemal Akkoyun --- CHANGELOG.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 468cab5f7e..825efa655f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,14 +13,14 @@ We use *breaking* word for marking changes that are not backward compatible (rel ## Added - [#1538](https://github.com/thanos-io/thanos/pull/1538) Added `/-/ready` and `/-/healthy` endpoints to Thanos Rule. -- [#XXX](https://github.com/thanos-io/thanos/pull/XXX) Added `/-/ready` and `/-/healthy` endpoints to Thanos Receive. +- [#1537](https://github.com/thanos-io/thanos/pull/1537) Added `/-/ready` and `/-/healthy` endpoints to Thanos Receive. - [#1534](https://github.com/thanos-io/thanos/pull/1534) Added `/-/ready` endpoint to Thanos Query. --[1533](https://github.com/thanos-io/thanos/pull/1533) Thanos inspect now supports the timeout flag. +- [#1533](https://github.com/thanos-io/thanos/pull/1533) Thanos inspect now supports the timeout flag. ### Fixed --[#1525](https://github.com/thanos-io/thanos/pull/1525) Thanos now deletes block's file in correct order allowing to detect partial blocks without problems. --[#1505](https://github.com/thanos-io/thanos/pull/1505) Thanos store now removes invalid local cache blocks. +- [#1525](https://github.com/thanos-io/thanos/pull/1525) Thanos now deletes block's file in correct order allowing to detect partial blocks without problems. +- [#1505](https://github.com/thanos-io/thanos/pull/1505) Thanos store now removes invalid local cache blocks. ## v0.7.0 - 2019.09.02 From 0bf982c16cb48c4716b6f81df3db45e5607fcc3a Mon Sep 17 00:00:00 2001 From: Kemal Akkoyun Date: Wed, 18 Sep 2019 16:23:23 +0200 Subject: [PATCH 4/5] Remove default Signed-off-by: Kemal Akkoyun --- cmd/thanos/compact.go | 4 ++-- cmd/thanos/query.go | 4 ++-- cmd/thanos/receive.go | 6 +++--- cmd/thanos/sidecar.go | 4 ++-- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index 8f0f4a235e..01347caba2 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -168,9 +168,9 @@ func runCompact( downsampleMetrics := newDownsampleMetrics(reg) statusProber := prober.NewProber(component, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg)) - // Initiate default HTTP listener providing metrics endpoint and readiness/liveness probes. + // Initiate HTTP listener providing metrics endpoint and readiness/liveness probes. if err := scheduleHTTPServer(g, logger, reg, statusProber, httpBindAddr, nil, component); err != nil { - return errors.Wrap(err, "schedule default HTTP server with probes") + return errors.Wrap(err, "schedule HTTP server with probes") } confContentYaml, err := objStoreConfig.Content() diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index c7652243d2..594ac90de3 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -414,9 +414,9 @@ func runQuery( api.Register(router.WithPrefix(path.Join(webRoutePrefix, "/api/v1")), tracer, logger, ins) - // Initiate default HTTP listener providing metrics endpoint and readiness/liveness probes. + // Initiate HTTP listener providing metrics endpoint and readiness/liveness probes. if err := scheduleHTTPServer(g, logger, reg, statusProber, httpBindAddr, router, comp); err != nil { - return errors.Wrap(err, "schedule default HTTP server with probes") + return errors.Wrap(err, "schedule HTTP server with probes") } } // Start query (proxy) gRPC StoreAPI. diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index c7e7b3b3dc..8b19beb654 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -252,11 +252,11 @@ func runReceive( ) } - level.Debug(logger).Log("msg", "setting up default http server") + level.Debug(logger).Log("msg", "setting up http server") statusProber := prober.NewProber(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg)) - // Initiate default HTTP listener providing metrics endpoint and readiness/liveness probes. + // Initiate HTTP listener providing metrics endpoint and readiness/liveness probes. if err := scheduleHTTPServer(g, logger, reg, statusProber, httpBindAddr, nil, comp); err != nil { - return errors.Wrap(err, "schedule default HTTP server with probes") + return errors.Wrap(err, "schedule HTTP server with probes") } level.Debug(logger).Log("msg", "setting up grpc server") diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index d2b192de58..aab6be8e55 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -120,9 +120,9 @@ func runSidecar( } statusProber := prober.NewProber(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg)) - // Initiate default HTTP listener providing metrics endpoint and readiness/liveness probes. + // Initiate HTTP listener providing metrics endpoint and readiness/liveness probes. if err := scheduleHTTPServer(g, logger, reg, statusProber, httpBindAddr, nil, comp); err != nil { - return errors.Wrap(err, "schedule default HTTP server with probes") + return errors.Wrap(err, "schedule HTTP server with probes") } // Setup all the concurrent groups. From c4279bbeead97b207e2db0150c0128b99025faef Mon Sep 17 00:00:00 2001 From: Kemal Akkoyun Date: Thu, 19 Sep 2019 11:18:37 +0200 Subject: [PATCH 5/5] Wait hashring to be ready Signed-off-by: Kemal Akkoyun --- cmd/thanos/receive.go | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 8b19beb654..72dd8752b0 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -163,6 +163,8 @@ func runReceive( ReplicationFactor: replicationFactor, }) + statusProber := prober.NewProber(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg)) + // Start all components while we wait for TSDB to open but only load // initial config and mark ourselves as ready after it completed. dbOpen := make(chan struct{}) @@ -202,6 +204,7 @@ func runReceive( ) } + hashringReady := make(chan struct{}) level.Debug(logger).Log("msg", "setting up hashring") { updates := make(chan receive.Hashring) @@ -231,15 +234,20 @@ func runReceive( func() error { select { case h := <-updates: + close(hashringReady) webHandler.Hashring(h) + statusProber.SetReady() case <-cancel: + close(hashringReady) return nil } select { // If any new hashring is received, then mark the handler as unready, but keep it alive. case <-updates: + msg := "hashring has changed; server is not ready to receive web requests." webHandler.Hashring(nil) - level.Info(logger).Log("msg", "hashring has changed; server is not ready to receive web requests.") + statusProber.SetNotReady(errors.New(msg)) + level.Info(logger).Log("msg", msg) case <-cancel: return nil } @@ -253,7 +261,6 @@ func runReceive( } level.Debug(logger).Log("msg", "setting up http server") - statusProber := prober.NewProber(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg)) // Initiate HTTP listener providing metrics endpoint and readiness/liveness probes. if err := scheduleHTTPServer(g, logger, reg, statusProber, httpBindAddr, nil, comp); err != nil { return errors.Wrap(err, "schedule HTTP server with probes") @@ -283,8 +290,9 @@ func runReceive( } s := newStoreGRPCServer(logger, reg, tracer, tsdbStore, opts) + // Wait hashring to be ready before start serving metrics + <-hashringReady level.Info(logger).Log("msg", "listening for StoreAPI gRPC", "address", grpcBindAddr) - statusProber.SetReady() return errors.Wrap(s.Serve(l), "serve gRPC") }, func(error) { if s != nil {