Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sidecar: Added matchers support #3940

Merged
merged 16 commits into from
May 7, 2021
Merged
55 changes: 47 additions & 8 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,15 +145,34 @@ func runSidecar(
g.Add(func() error {
// Only check Prometheus's flags when upload is enabled.
if uploads {
// Check prometheus's flags to ensure sane sidecar flags.
// Check prometheus's flags to ensure same sidecar flags.
if err := validatePrometheus(ctx, m.client, logger, conf.shipper.ignoreBlockSize, m); err != nil {
return errors.Wrap(err, "validate Prometheus flags")
}
}

// We retry infinitely until we reach and fetch BuildVersion from our Prometheus.
err := runutil.Retry(2*time.Second, ctx.Done(), func() error {
if err := m.BuildVersion(ctx); err != nil {
level.Warn(logger).Log(
"msg", "failed to fetch prometheus version. Is Prometheus running? Retrying",
"err", err,
)
return err
}

level.Info(logger).Log(
"msg", "successfully loaded prometheus version",
)
return nil
})
if err != nil {
return errors.Wrap(err, "failed to get prometheus version")
}

// Blocking query of external labels before joining as a Source Peer into gossip.
// We retry infinitely until we reach and fetch labels from our Prometheus.
err := runutil.Retry(2*time.Second, ctx.Done(), func() error {
err = runutil.Retry(2*time.Second, ctx.Done(), func() error {
if err := m.UpdateLabels(ctx); err != nil {
level.Warn(logger).Log(
"msg", "failed to fetch initial external labels. Is Prometheus running? Retrying",
Expand Down Expand Up @@ -209,14 +228,13 @@ func runSidecar(
cancel()
})
}

{
t := exthttp.NewTransport()
t.MaxIdleConnsPerHost = conf.connection.maxIdleConnsPerHost
t.MaxIdleConns = conf.connection.maxIdleConns
c := promclient.NewClient(&http.Client{Transport: tracing.HTTPTripperware(logger, t)}, logger, thanoshttp.ThanosUserAgent)

promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps)
promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.Version)
if err != nil {
return errors.Wrap(err, "create Prometheus store")
}
Expand Down Expand Up @@ -346,10 +364,11 @@ func validatePrometheus(ctx context.Context, client *promclient.Client, logger l
type promMetadata struct {
promURL *url.URL

mtx sync.Mutex
mint int64
maxt int64
labels labels.Labels
mtx sync.Mutex
mint int64
maxt int64
labels labels.Labels
promVersion string

limitMinTime thanosmodel.TimeOrDurationValue

Expand Down Expand Up @@ -395,6 +414,26 @@ func (s *promMetadata) Timestamps() (mint int64, maxt int64) {
return s.mint, s.maxt
}

func (s *promMetadata) BuildVersion(ctx context.Context) error {
ver, err := s.client.BuildVersion(ctx, s.promURL)
if err != nil {
return err
}

s.mtx.Lock()
defer s.mtx.Unlock()

s.promVersion = ver
return nil
}

func (s *promMetadata) Version() string {
s.mtx.Lock()
defer s.mtx.Unlock()

return s.promVersion
}

type sidecarConfig struct {
http httpConfig
grpc grpcConfig
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/NYTimes/gziphandler v1.1.1
github.com/alecthomas/units v0.0.0-20210208195552-ff826a37aa15
github.com/aliyun/aliyun-oss-go-sdk v2.0.4+incompatible
github.com/blang/semver/v4 v4.0.0
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
github.com/cespare/xxhash v1.1.0
github.com/chromedp/cdproto v0.0.0-20200424080200-0de008e41fa0
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,10 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k=
github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84=
github.com/blang/semver v3.5.0+incompatible h1:CGxCgetQ64DKk7rdZ++Vfnb1+ogGNnB17OJKJXD2Cfs=
github.com/blang/semver v3.5.0+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk=
github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM=
github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
github.com/bmizerany/pat v0.0.0-20170815010413-6226ea591a40/go.mod h1:8rLXio+WjiTceGBHIoTvn60HIbs7Hm7bcHjyrSqYB9c=
github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
Expand Down
33 changes: 33 additions & 0 deletions pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,39 @@ func (c *Client) AlertmanagerAlerts(ctx context.Context, base *url.URL) ([]*mode
return v.Data, nil
}

// BuildVersion returns Prometheus version from /api/v1/status/buildinfo Prometheus endpoint.
// For Prometheus versions < 2.14.0 it returns "0" as Prometheus version.
func (c *Client) BuildVersion(ctx context.Context, base *url.URL) (string, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/status/buildinfo")

level.Debug(c.logger).Log("msg", "build version", "url", u.String())

span, ctx := tracing.StartSpan(ctx, "/prom_buildversion HTTP[client]")
defer span.Finish()

// We get status code 404 for prometheus versions lower than 2.14.0
body, code, err := c.req2xx(ctx, &u, http.MethodGet)
if err != nil {
if code == http.StatusNotFound {
return "0", nil
}
return "", err
}

var b struct {
Data struct {
Version string `json:"version"`
} `json:"data"`
}

if err = json.Unmarshal(body, &b); err != nil {
return "", errors.Wrap(err, "unmarshal build info API response")
}

return b.Data.Version, nil
Namanl2001 marked this conversation as resolved.
Show resolved Hide resolved
}

func formatTime(t time.Time) string {
return strconv.FormatFloat(float64(t.Unix())+float64(t.Nanosecond())/1e9, 'f', -1, 64)
}
Expand Down
52 changes: 49 additions & 3 deletions pkg/store/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"strings"
"sync"

"github.com/blang/semver/v4"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -50,6 +51,7 @@ type PrometheusStore struct {
component component.StoreAPI
externalLabelsFn func() labels.Labels
timestamps func() (mint int64, maxt int64)
promVersion func() string

remoteReadAcceptableResponses []prompb.ReadRequest_ResponseType

Expand All @@ -69,6 +71,7 @@ func NewPrometheusStore(
component component.StoreAPI,
externalLabelsFn func() labels.Labels,
timestamps func() (mint int64, maxt int64),
promVersion func() string,
) (*PrometheusStore, error) {
if logger == nil {
logger = log.NewNopLogger()
Expand All @@ -80,6 +83,7 @@ func NewPrometheusStore(
component: component,
externalLabelsFn: externalLabelsFn,
timestamps: timestamps,
promVersion: promVersion,
remoteReadAcceptableResponses: []prompb.ReadRequest_ResponseType{prompb.ReadRequest_STREAMED_XOR_CHUNKS, prompb.ReadRequest_SAMPLES},
buffers: sync.Pool{New: func() interface{} {
b := make([]byte, 0, initialBufSize)
Expand Down Expand Up @@ -496,9 +500,51 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue
return &storepb.LabelValuesResponse{Values: []string{l}}, nil
}

vals, err := p.client.LabelValuesInGRPC(ctx, p.base, r.Label, nil, r.Start, r.End)
if err != nil {
return nil, err
var (
sers []map[string]string
err error
)

lvc := false // LabelValuesCall
vals := []string{}
v := p.promVersion()

version, err := semver.Parse(v)
if err == nil {
baseVer, err := semver.Make("2.24.0")
if err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is true(https://github.com/thanos-io/thanos/pull/3940/files#r622405182), then we should not exit early. Then we can remove the error check.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then we can remove the error check.

Sorry, are you saying about the error check at Line 515? As it is for handling error retuned while initializing the baseVer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, if it errors out, we can revert to our original behaviour of -

we can still make a series call instead of returning the error

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this look good?

version, err1 := semver.Parse(v)

baseVer, err2 := semver.Make("2.24.0")

if err1 == nil && err2 == nil && version.Compare(baseVer) == 1 {

    lvc = true

}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apologies for elongating this, but we can simplify things by using this package

I checked their docs and it stated for their Compare function, we can provide two strings, if the promVersion is invalid, the value would be 0, so we dont have to worry about that. So lets use this -

defaultVersion := "2.24.0"
if len(r.Matchers) == 0 || semver.Compare(version, defaultVersion) == 1 {

Our current implementation is making it unnecessarily verbose and by using the above package, we can simplify things 😄

Copy link
Contributor Author

@Namanl2001 Namanl2001 Apr 28, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any thoughts on this @yeya24? 🤗 As you earlier suggested to use a library #3940 (comment)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we already use the semver library then let's use it here as well.

return nil, err
}
if version.Compare(baseVer) == 1 {
lvc = true
}
}

if len(r.Matchers) == 0 || lvc {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
version, err := semver.Parse(v)
if err == nil {
baseVer, err := semver.Make("2.24.0")
if err != nil {
return nil, err
}
if version.Compare(baseVer) == 1 {
lvc = true
}
}
if len(r.Matchers) == 0 || lvc {
// Make sure the prometheus version is a valid semver type.
version, err := semver.Parse(v)
if err != nil {
return nil, err
}
baseVer, err := semver.Make("2.24.0")
if err != nil {
return nil, err
}
if len(r.Matchers) == 0 || version.Compare(baseVer) == 1 {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if the version is malformed we can still make a series call instead of returning the error. Please see If it is malformed, we just use the series way in #3940 (comment)

vals, err = p.client.LabelValuesInGRPC(ctx, p.base, r.Label, r.Matchers, r.Start, r.End)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we making gRPC call?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

YES

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The naming makes me confused, too. We are making HTTP calls but return GRPC error.

if err != nil {
return nil, err
}
} else {
matchers, err := storepb.MatchersToPromMatchers(r.Matchers...)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
sers, err = p.client.SeriesInGRPC(ctx, p.base, matchers, r.Start, r.End)
if err != nil {
return nil, err
}

// Using set to handle duplicate values.
yeya24 marked this conversation as resolved.
Show resolved Hide resolved
labelValuesSet := make(map[string]struct{})
for _, s := range sers {
if val, exists := s[r.Label]; exists {
Namanl2001 marked this conversation as resolved.
Show resolved Hide resolved
labelValuesSet[val] = struct{}{}
}
}
for key := range labelValuesSet {
vals = append(vals, key)
}
}
sort.Strings(vals)
Namanl2001 marked this conversation as resolved.
Show resolved Hide resolved
return &storepb.LabelValuesResponse{Values: vals}, nil
Expand Down
49 changes: 41 additions & 8 deletions pkg/store/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func testPrometheusStoreSeriesE2e(t *testing.T, prefix string) {
limitMinT := int64(0)
proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return limitMinT, -1 }) // Maxt does not matter.
func() (int64, int64) { return limitMinT, -1 }, nil) // Maxt does not matter.
testutil.Ok(t, err)

// Query all three samples except for the first one. Since we round up queried data
Expand Down Expand Up @@ -199,7 +199,7 @@ func TestPrometheusStore_SeriesLabels_e2e(t *testing.T) {

promStore, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return math.MinInt64/1000 + 62135596801, math.MaxInt64/1000 - 62135596801 })
func() (int64, int64) { return math.MinInt64/1000 + 62135596801, math.MaxInt64/1000 - 62135596801 }, nil)
testutil.Ok(t, err)

for _, tcase := range []struct {
Expand Down Expand Up @@ -375,7 +375,7 @@ func TestPrometheusStore_LabelNames_e2e(t *testing.T) {
u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr()))
testutil.Ok(t, err)

proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, getExternalLabels, nil)
proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, getExternalLabels, nil, nil)
testutil.Ok(t, err)

resp, err := proxy.LabelNames(ctx, &storepb.LabelNamesRequest{
Expand Down Expand Up @@ -420,7 +420,11 @@ func TestPrometheusStore_LabelValues_e2e(t *testing.T) {
u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr()))
testutil.Ok(t, err)

proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, getExternalLabels, nil)
version, err := promclient.NewDefaultClient().BuildVersion(ctx, u)
Namanl2001 marked this conversation as resolved.
Show resolved Hide resolved
Namanl2001 marked this conversation as resolved.
Show resolved Hide resolved
testutil.Ok(t, err)

proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, getExternalLabels, nil,
func() string { return version })
testutil.Ok(t, err)

resp, err := proxy.LabelValues(ctx, &storepb.LabelValuesRequest{
Expand All @@ -441,6 +445,32 @@ func TestPrometheusStore_LabelValues_e2e(t *testing.T) {
testutil.Ok(t, err)
testutil.Equals(t, []string(nil), resp.Warnings)
testutil.Equals(t, []string{}, resp.Values)

// check label value matcher
resp, err = proxy.LabelValues(ctx, &storepb.LabelValuesRequest{
Label: "a",
Start: timestamp.FromTime(minTime),
End: timestamp.FromTime(maxTime),
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "a", Value: "b"},
},
})
testutil.Ok(t, err)
testutil.Equals(t, []string(nil), resp.Warnings)
testutil.Equals(t, []string{"b"}, resp.Values)

// check another label value matcher
resp, err = proxy.LabelValues(ctx, &storepb.LabelValuesRequest{
Label: "a",
Start: timestamp.FromTime(minTime),
End: timestamp.FromTime(maxTime),
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "a", Value: "d"},
},
})
testutil.Ok(t, err)
testutil.Equals(t, []string(nil), resp.Warnings)
testutil.Equals(t, []string{}, resp.Values)
}

// Test to check external label values retrieve.
Expand All @@ -466,7 +496,10 @@ func TestPrometheusStore_ExternalLabelValues_e2e(t *testing.T) {
u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr()))
testutil.Ok(t, err)

proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, getExternalLabels, nil)
version, err := promclient.NewDefaultClient().BuildVersion(ctx, u)
testutil.Ok(t, err)

proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, getExternalLabels, nil, func() string { return version })
testutil.Ok(t, err)

resp, err := proxy.LabelValues(ctx, &storepb.LabelValuesRequest{
Expand Down Expand Up @@ -512,7 +545,7 @@ func TestPrometheusStore_Series_MatchExternalLabel_e2e(t *testing.T) {

proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return 0, math.MaxInt64 })
func() (int64, int64) { return 0, math.MaxInt64 }, nil)
testutil.Ok(t, err)
srv := newStoreSeriesServer(ctx)

Expand Down Expand Up @@ -557,7 +590,7 @@ func TestPrometheusStore_Info(t *testing.T) {

proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), nil, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return 123, 456 })
func() (int64, int64) { return 123, 456 }, nil)
testutil.Ok(t, err)

resp, err := proxy.Info(ctx, &storepb.InfoRequest{})
Expand Down Expand Up @@ -635,7 +668,7 @@ func TestPrometheusStore_Series_SplitSamplesIntoChunksWithMaxSizeOf120(t *testin

proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return 0, math.MaxInt64 })
func() (int64, int64) { return 0, math.MaxInt64 }, nil)
testutil.Ok(t, err)

// We build chunks only for SAMPLES method. Make sure we ask for SAMPLES only.
Expand Down