Skip to content

Commit

Permalink
pkg/receive: enable forwarding metrics (#1243)
Browse files Browse the repository at this point in the history
* pkg/receive: rename host->node

This commit renames `host` to `node` in the context of the receive
hashring. This is because more often than not, the hashring will deal
with endpoints rather than simply hosts and node is a more generic
term for the operand of a hashring.

* pkg/receive: forward metrics

This commit enables metrics forwarding from one receive node to another.
The receive nodes construct hashrings from the given sd-files and
use these hashrings to select a node to which toforward a given time
series. Time series are batched together to ensure that for any incoming
write-request to a node, at most one outgoing write-request will be made
every other node in the hashring.

* test/e2e: add receiver hashring test
  • Loading branch information
squat authored and brancz committed Jun 20, 2019
1 parent 66fd721 commit 34dcf07
Show file tree
Hide file tree
Showing 10 changed files with 769 additions and 227 deletions.
92 changes: 91 additions & 1 deletion cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"fmt"
"net"
"os"
"strings"
"time"

"github.com/go-kit/kit/log"
Expand Down Expand Up @@ -45,12 +47,42 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application, name stri

retention := modelDuration(cmd.Flag("tsdb.retention", "How long to retain raw samples on local storage. 0d - disables this retention").Default("15d"))

hashringsFile := cmd.Flag("receive.hashrings-file", "Path to file that contains the hashring configuration.").
PlaceHolder("<path>").String()

refreshInterval := modelDuration(cmd.Flag("receive.hashrings-file-refresh-interval", "Refresh interval to re-read the hashring configuration file. (used as a fallback)").
Default("5m"))

local := cmd.Flag("receive.local-endpoint", "Endpoint of local receive node. Used to identify the local node in the hashring configuration.").String()

tenantHeader := cmd.Flag("receive.tenant-header", "HTTP header to determine tenant for write requests.").Default("THANOS-TENANT").String()

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
lset, err := parseFlagLabels(*labelStrs)
if err != nil {
return errors.Wrap(err, "parse labels")
}

var cw *receive.ConfigWatcher
if *hashringsFile != "" {
cw, err = receive.NewConfigWatcher(log.With(logger, "component", "config-watcher"), reg, *hashringsFile, *refreshInterval)
if err != nil {
return err
}
}

// Local is empty, so try to generate a local endpoint
// based on the hostname and the listening port.
if *local == "" {
hostname, err := os.Hostname()
if hostname == "" || err != nil {
return errors.New("--receive.local-endpoint is empty and host could not be determined.")
}
parts := strings.Split(*remoteWriteAddress, ":")
port := parts[len(parts)-1]
*local = fmt.Sprintf("http://%s:%s/api/v1/receive", hostname, port)
}

return runReceive(
g,
logger,
Expand All @@ -66,6 +98,9 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application, name stri
objStoreConfig,
lset,
*retention,
cw,
*local,
*tenantHeader,
)
}
}
Expand All @@ -85,6 +120,9 @@ func runReceive(
objStoreConfig *pathOrContent,
lset labels.Labels,
retention model.Duration,
cw *receive.ConfigWatcher,
endpoint string,
tenantHeader string,
) error {
logger = log.With(logger, "component", "receive")
level.Warn(logger).Log("msg", "setting up receive; the Thanos receive component is EXPERIMENTAL, it may break significantly without notice")
Expand All @@ -103,6 +141,8 @@ func runReceive(
ListenAddress: remoteWriteAddress,
Registry: reg,
ReadyStorage: localStorage,
Endpoint: endpoint,
TenantHeader: tenantHeader,
})

// Start all components while we wait for TSDB to open but only load
Expand All @@ -129,7 +169,7 @@ func runReceive(

startTimeMargin := int64(2 * time.Duration(tsdbCfg.MinBlockDuration).Seconds() * 1000)
localStorage.Set(db, startTimeMargin)
webHandler.Ready()
webHandler.StorageReady()
level.Info(logger).Log("msg", "server is ready to receive web requests.")
close(dbOpen)
<-cancel
Expand All @@ -144,6 +184,56 @@ func runReceive(
)
}

level.Debug(logger).Log("msg", "setting up hashring")
{
updates := make(chan receive.Hashring)
if cw != nil {
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
receive.HashringFromConfig(ctx, updates, cw)
return nil
}, func(error) {
cancel()
close(updates)
})
} else {
cancel := make(chan struct{})
g.Add(func() error {
updates <- receive.SingleNodeHashring(endpoint)
<-cancel
return nil
}, func(error) {
close(cancel)
close(updates)
})
}

cancel := make(chan struct{})
g.Add(
func() error {
select {
case h := <-updates:
webHandler.Hashring(h)
case <-cancel:
return nil
}
select {
// If any new hashring is received, then mark the handler as unready, but keep it alive.
case <-updates:
webHandler.Hashring(nil)
level.Info(logger).Log("msg", "hashring has changed; server is not ready to receive web requests.")
case <-cancel:
return nil
}
<-cancel
return nil
},
func(err error) {
close(cancel)
},
)
}

level.Debug(logger).Log("msg", "setting up metric http listen-group")
if err := metricHTTPListenGroup(g, logger, reg, httpMetricsBindAddr); err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ require (
google.golang.org/api v0.3.2
google.golang.org/grpc v1.19.1
gopkg.in/alecthomas/kingpin.v2 v2.2.6
gopkg.in/fsnotify.v1 v1.4.7
gopkg.in/fsnotify/fsnotify.v1 v1.4.7 // indirect
gopkg.in/yaml.v2 v2.2.2
)
Loading

0 comments on commit 34dcf07

Please sign in to comment.