From 6ce875c536029d7be546fc7db5a00c6851d4c02a Mon Sep 17 00:00:00 2001 From: jojohappy Date: Mon, 11 Mar 2019 23:50:12 +0800 Subject: [PATCH 1/9] Feature: add /api/v1/labels support Signed-off-by: jojohappy --- pkg/query/api/v1.go | 34 +++++++++++++++++++++++++++++ pkg/query/querier.go | 15 +++++++++++-- pkg/store/bucket.go | 42 ++++++++++++++++++++++++++++++++++-- pkg/store/prometheus.go | 27 ++++++++++++++++++++++- pkg/store/proxy.go | 47 ++++++++++++++++++++++++++++++++++++++++- pkg/store/proxy_test.go | 45 ++++++++++++++++++++++++++++++++++++++- pkg/store/tsdb.go | 14 ++++++++++-- 7 files changed, 215 insertions(+), 9 deletions(-) diff --git a/pkg/query/api/v1.go b/pkg/query/api/v1.go index b05a109207..55ece3c853 100644 --- a/pkg/query/api/v1.go +++ b/pkg/query/api/v1.go @@ -176,6 +176,8 @@ func (api *API) Register(r *route.Router, tracer opentracing.Tracer, logger log. r.Get("/label/:name/values", instr("label_values", api.labelValues)) r.Get("/series", instr("series", api.series)) + + r.Get("/labels", instr("label_names", api.labelNames)) } type queryData struct { @@ -614,3 +616,35 @@ func parseDuration(s string) (time.Duration, error) { } return 0, fmt.Errorf("cannot parse %q to a valid duration", s) } + +func (api *API) labelNames(r *http.Request) (interface{}, []error, *ApiError) { + ctx := r.Context() + + enablePartialResponse, apiErr := api.parsePartialResponseParam(r) + if apiErr != nil { + return nil, nil, apiErr + } + + var ( + warnmtx sync.Mutex + warnings []error + ) + warningReporter := func(err error) { + warnmtx.Lock() + warnings = append(warnings, err) + warnmtx.Unlock() + } + + q, err := api.queryableCreate(true, 0, enablePartialResponse, warningReporter).Querier(ctx, math.MinInt64, math.MaxInt64) + if err != nil { + return nil, nil, &ApiError{errorExec, err} + } + defer runutil.CloseWithLogOnErr(api.logger, q, "queryable labelNames") + + names, err := q.LabelNames() + if err != nil { + return nil, nil, &ApiError{errorExec, err} + } + + return names, warnings, nil +} diff --git a/pkg/query/querier.go b/pkg/query/querier.go index 819ff3ac2a..07dc3ec71b 100644 --- a/pkg/query/querier.go +++ b/pkg/query/querier.go @@ -265,9 +265,20 @@ func (q *querier) LabelValues(name string) ([]string, error) { } // LabelNames returns all the unique label names present in the block in sorted order. -// TODO(bwplotka): Consider adding labelNames to thanos Query API https://github.com/improbable-eng/thanos/issues/702. func (q *querier) LabelNames() ([]string, error) { - return nil, errors.New("not implemented") + span, ctx := tracing.StartSpan(q.ctx, "querier_label_names") + defer span.Finish() + + resp, err := q.proxy.LabelNames(ctx, &storepb.LabelNamesRequest{PartialResponseDisabled: !q.partialResponse}) + if err != nil { + return nil, errors.Wrap(err, "proxy LabelNames()") + } + + for _, w := range resp.Warnings { + q.warningReporter(errors.New(w)) + } + + return resp.Names, nil } func (q *querier) Close() error { diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 8e6c3533c5..2c722d8c32 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -856,8 +856,37 @@ func chunksSize(chks []storepb.AggrChunk) (size int) { } // LabelNames implements the storepb.StoreServer interface. -func (s *BucketStore) LabelNames(context.Context, *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) { - return nil, status.Error(codes.Unimplemented, "not implemented") +func (s *BucketStore) LabelNames(ctx context.Context, _ *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) { + g, gctx := errgroup.WithContext(ctx) + + s.mtx.RLock() + + var mtx sync.Mutex + var sets [][]string + + for _, b := range s.blocks { + indexr := b.indexReader(gctx) + g.Go(func() error { + defer runutil.CloseWithLogOnErr(s.logger, indexr, "label names") + + res := indexr.LabelNames() + + mtx.Lock() + sets = append(sets, res) + mtx.Unlock() + + return nil + }) + } + + s.mtx.RUnlock() + + if err := g.Wait(); err != nil { + return nil, status.Error(codes.Aborted, err.Error()) + } + return &storepb.LabelNamesResponse{ + Names: strutil.MergeSlices(sets...), + }, nil } // LabelValues implements the storepb.StoreServer interface. @@ -1616,6 +1645,15 @@ func (r *bucketIndexReader) LabelValues(name string) []string { return res } +// LabelNames returns a list of label names. +func (r *bucketIndexReader) LabelNames() []string { + res := make([]string, 0, len(r.block.lvals)) + for ln, _ := range r.block.lvals { + res = append(res, ln) + } + return res +} + // Close released the underlying resources of the reader. func (r *bucketIndexReader) Close() error { r.block.pendingReaders.Done() diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index e8fd677f53..edb791e414 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -339,7 +339,32 @@ func extendLset(lset []storepb.Label, extend labels.Labels) []storepb.Label { func (p *PrometheusStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) ( *storepb.LabelNamesResponse, error, ) { - return nil, status.Error(codes.Unimplemented, "not implemented") + u := *p.base + u.Path = path.Join(u.Path, "/api/v1/labels") + + req, err := http.NewRequest("GET", u.String(), nil) + if err != nil { + return nil, status.Error(codes.Unknown, err.Error()) + } + + span, ctx := tracing.StartSpan(ctx, "/prom_label_names HTTP[client]") + defer span.Finish() + + resp, err := p.client.Do(req.WithContext(ctx)) + if err != nil { + return nil, status.Error(codes.Unknown, err.Error()) + } + defer runutil.CloseWithLogOnErr(p.logger, resp.Body, "label names request body") + + var m struct { + Data []string `json:"data"` + } + if err := json.NewDecoder(resp.Body).Decode(&m); err != nil { + return nil, status.Error(codes.Unknown, err.Error()) + } + sort.Strings(m.Data) + + return &storepb.LabelNamesResponse{Names: m.Data}, nil } // LabelValues returns all known label values for a given label name. diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 9324d62247..49ff212c0c 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -397,7 +397,52 @@ func storeMatches(s Client, mint, maxt int64, matchers ...storepb.LabelMatcher) func (s *ProxyStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) ( *storepb.LabelNamesResponse, error, ) { - return nil, status.Error(codes.Unimplemented, "not implemented") + var ( + warnings []string + names [][]string + mtx sync.Mutex + g, gctx = errgroup.WithContext(ctx) + ) + + stores, err := s.stores(ctx) + if err != nil { + return nil, status.Errorf(codes.Unknown, err.Error()) + } + for _, st := range stores { + st := st + g.Go(func() error { + resp, err := st.LabelNames(gctx, &storepb.LabelNamesRequest{ + PartialResponseDisabled: r.PartialResponseDisabled, + }) + if err != nil { + err = errors.Wrapf(err, "fetch label names from store %s", st) + if r.PartialResponseDisabled { + return err + } + + mtx.Lock() + warnings = append(warnings, errors.Wrap(err, "fetch label names").Error()) + mtx.Unlock() + return nil + } + + mtx.Lock() + warnings = append(warnings, resp.Warnings...) + names = append(names, resp.Names) + mtx.Unlock() + + return nil + }) + } + + if err := g.Wait(); err != nil { + return nil, err + } + + return &storepb.LabelNamesResponse{ + Names: strutil.MergeUnsortedSlices(names...), + Warnings: warnings, + }, nil } // LabelValues returns all known label values for a given label name. diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index ed1c1d3b0a..6344ce8e55 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -687,6 +687,45 @@ func TestProxyStore_LabelValues(t *testing.T) { testutil.Equals(t, 1, len(resp.Warnings)) } +func TestProxyStore_LabelNames(t *testing.T) { + defer leaktest.CheckTimeout(t, 10*time.Second)() + + m1 := &mockedStoreAPI{ + RespLabelNames: &storepb.LabelNamesResponse{ + Names: []string{"a", "b"}, + Warnings: []string{"warning"}, + }, + } + + m2 := &mockedStoreAPI{ + RespLabelNames: &storepb.LabelNamesResponse{ + Names: []string{"a", "c", "d"}, + }, + } + + cls := []Client{ + &testClient{StoreClient: m1}, + &testClient{StoreClient: m2}, + } + + q := NewProxyStore(nil, + func(context.Context) ([]Client, error) { return cls, nil }, + component.Query, + nil, + ) + + ctx := context.Background() + req := &storepb.LabelNamesRequest{ + PartialResponseDisabled: true, + } + resp, err := q.LabelNames(ctx, req) + testutil.Ok(t, err) + testutil.Assert(t, proto.Equal(req, m1.LastLabelNamesReq), "request was not proxied properly to underlying storeAPI: %s vs %s", req, m1.LastLabelNamesReq) + + testutil.Equals(t, []string{"a", "b", "c", "d"}, resp.Names) + testutil.Equals(t, 1, len(resp.Warnings)) +} + type rawSeries struct { lset []storepb.Label samples []sample @@ -830,11 +869,13 @@ func (s *storeSeriesServer) Context() context.Context { type mockedStoreAPI struct { RespSeries []*storepb.SeriesResponse RespLabelValues *storepb.LabelValuesResponse + RespLabelNames *storepb.LabelNamesResponse RespError error RespDuration time.Duration LastSeriesReq *storepb.SeriesRequest LastLabelValuesReq *storepb.LabelValuesRequest + LastLabelNamesReq *storepb.LabelNamesRequest } func (s *mockedStoreAPI) Info(ctx context.Context, req *storepb.InfoRequest, _ ...grpc.CallOption) (*storepb.InfoResponse, error) { @@ -848,7 +889,9 @@ func (s *mockedStoreAPI) Series(ctx context.Context, req *storepb.SeriesRequest, } func (s *mockedStoreAPI) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest, _ ...grpc.CallOption) (*storepb.LabelNamesResponse, error) { - return nil, status.Error(codes.Unimplemented, "not implemented") + s.LastLabelNamesReq = req + + return s.RespLabelNames, s.RespError } func (s *mockedStoreAPI) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest, _ ...grpc.CallOption) (*storepb.LabelValuesResponse, error) { diff --git a/pkg/store/tsdb.go b/pkg/store/tsdb.go index 1a5b5f820b..bf43d1e975 100644 --- a/pkg/store/tsdb.go +++ b/pkg/store/tsdb.go @@ -165,10 +165,20 @@ func (s *TSDBStore) translateAndExtendLabels(m, extend labels.Labels) []storepb. } // LabelNames returns all known label names. -func (s *TSDBStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) ( +func (s *TSDBStore) LabelNames(ctx context.Context, _ *storepb.LabelNamesRequest) ( *storepb.LabelNamesResponse, error, ) { - return nil, status.Error(codes.Unimplemented, "not implemented") + q, err := s.db.Querier(math.MinInt64, math.MaxInt64) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + defer runutil.CloseWithLogOnErr(s.logger, q, "close tsdb querier label names") + + res, err := q.LabelNames() + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + return &storepb.LabelNamesResponse{Names: res}, nil } // LabelValues returns all known label values for a given label name. From 5244e2123f9e5ad4dc4f13b86a72aa906f469244 Mon Sep 17 00:00:00 2001 From: jojohappy Date: Wed, 27 Mar 2019 17:03:02 +0800 Subject: [PATCH 2/9] Addressed review comments Signed-off-by: jojohappy --- pkg/store/prometheus.go | 10 +++++++--- pkg/store/proxy_test.go | 39 +++++++++++++++++++++++++++++++++++++-- 2 files changed, 44 insertions(+), 5 deletions(-) diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index edb791e414..6b65c28848 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -336,7 +336,7 @@ func extendLset(lset []storepb.Label, extend labels.Labels) []storepb.Label { } // LabelNames returns all known label names. -func (p *PrometheusStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) ( +func (p *PrometheusStore) LabelNames(ctx context.Context, _ *storepb.LabelNamesRequest) ( *storepb.LabelNamesResponse, error, ) { u := *p.base @@ -357,13 +357,17 @@ func (p *PrometheusStore) LabelNames(ctx context.Context, r *storepb.LabelNamesR defer runutil.CloseWithLogOnErr(p.logger, resp.Body, "label names request body") var m struct { - Data []string `json:"data"` + Data []string `json:"data"` + Status string `json:"status"` + Error string `json:"error"` } if err := json.NewDecoder(resp.Body).Decode(&m); err != nil { return nil, status.Error(codes.Unknown, err.Error()) } - sort.Strings(m.Data) + if m.Status != "success" { + return nil, status.Error(codes.Unknown, m.Error) + } return &storepb.LabelNamesResponse{Names: m.Data}, nil } diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index 6344ce8e55..d8640e7952 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -692,8 +692,7 @@ func TestProxyStore_LabelNames(t *testing.T) { m1 := &mockedStoreAPI{ RespLabelNames: &storepb.LabelNamesResponse{ - Names: []string{"a", "b"}, - Warnings: []string{"warning"}, + Names: []string{"a", "b"}, }, } @@ -723,6 +722,42 @@ func TestProxyStore_LabelNames(t *testing.T) { testutil.Assert(t, proto.Equal(req, m1.LastLabelNamesReq), "request was not proxied properly to underlying storeAPI: %s vs %s", req, m1.LastLabelNamesReq) testutil.Equals(t, []string{"a", "b", "c", "d"}, resp.Names) + testutil.Equals(t, 0, len(resp.Warnings)) +} + +func TestProxyStore_LabelNames_PartialResponseEnable(t *testing.T) { + defer leaktest.CheckTimeout(t, 10*time.Second)() + + m1 := &mockedStoreAPI{ + RespLabelNames: &storepb.LabelNamesResponse{ + Names: []string{"a", "b"}, + }, + } + + m2 := &mockedStoreAPI{ + RespError: errors.New("error!"), + } + + cls := []Client{ + &testClient{StoreClient: m1}, + &testClient{StoreClient: m2}, + } + + q := NewProxyStore(nil, + func(context.Context) ([]Client, error) { return cls, nil }, + component.Query, + nil, + ) + + ctx := context.Background() + req := &storepb.LabelNamesRequest{ + PartialResponseDisabled: false, + } + resp, err := q.LabelNames(ctx, req) + testutil.Ok(t, err) + testutil.Assert(t, proto.Equal(req, m1.LastLabelNamesReq), "request was not proxied properly to underlying storeAPI: %s vs %s", req, m1.LastLabelNamesReq) + + testutil.Equals(t, []string{"a", "b"}, resp.Names) testutil.Equals(t, 1, len(resp.Warnings)) } From 9c364d6eed70338d48b4bbd134d81b4fe79993ea Mon Sep 17 00:00:00 2001 From: jojohappy Date: Fri, 29 Mar 2019 11:01:23 +0800 Subject: [PATCH 3/9] Address reivew comments Signed-off-by: jojohappy --- pkg/store/proxy.go | 2 +- pkg/store/proxy_test.go | 157 ++++++++++++++++++++++++---------------- 2 files changed, 95 insertions(+), 64 deletions(-) diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 49ff212c0c..9f39fb10d1 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -421,7 +421,7 @@ func (s *ProxyStore) LabelNames(ctx context.Context, r *storepb.LabelNamesReques } mtx.Lock() - warnings = append(warnings, errors.Wrap(err, "fetch label names").Error()) + warnings = append(warnings, err.Error()) mtx.Unlock() return nil } diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index d8640e7952..cab00880a3 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -690,75 +690,106 @@ func TestProxyStore_LabelValues(t *testing.T) { func TestProxyStore_LabelNames(t *testing.T) { defer leaktest.CheckTimeout(t, 10*time.Second)() - m1 := &mockedStoreAPI{ - RespLabelNames: &storepb.LabelNamesResponse{ - Names: []string{"a", "b"}, - }, - } - - m2 := &mockedStoreAPI{ - RespLabelNames: &storepb.LabelNamesResponse{ - Names: []string{"a", "c", "d"}, - }, - } - - cls := []Client{ - &testClient{StoreClient: m1}, - &testClient{StoreClient: m2}, - } - - q := NewProxyStore(nil, - func(context.Context) ([]Client, error) { return cls, nil }, - component.Query, - nil, - ) - - ctx := context.Background() - req := &storepb.LabelNamesRequest{ - PartialResponseDisabled: true, - } - resp, err := q.LabelNames(ctx, req) - testutil.Ok(t, err) - testutil.Assert(t, proto.Equal(req, m1.LastLabelNamesReq), "request was not proxied properly to underlying storeAPI: %s vs %s", req, m1.LastLabelNamesReq) - - testutil.Equals(t, []string{"a", "b", "c", "d"}, resp.Names) - testutil.Equals(t, 0, len(resp.Warnings)) -} + for _, tc := range []struct { + title string + storeAPIs []Client -func TestProxyStore_LabelNames_PartialResponseEnable(t *testing.T) { - defer leaktest.CheckTimeout(t, 10*time.Second)() + req *storepb.LabelNamesRequest - m1 := &mockedStoreAPI{ - RespLabelNames: &storepb.LabelNamesResponse{ - Names: []string{"a", "b"}, + expectedNames []string + expectedErr error + expectedWarningsLen int + }{ + { + title: "label_names partial response disabled", + storeAPIs: []Client{ + &testClient{ + StoreClient: &mockedStoreAPI{ + RespLabelNames: &storepb.LabelNamesResponse{ + Names: []string{"a", "b"}, + }, + }, + }, + &testClient{ + StoreClient: &mockedStoreAPI{ + RespLabelNames: &storepb.LabelNamesResponse{ + Names: []string{"a", "c", "d"}, + }, + }, + }, + }, + req: &storepb.LabelNamesRequest{ + PartialResponseDisabled: true, + }, + expectedNames: []string{"a", "b", "c", "d"}, + expectedWarningsLen: 0, }, - } - - m2 := &mockedStoreAPI{ - RespError: errors.New("error!"), - } - - cls := []Client{ - &testClient{StoreClient: m1}, - &testClient{StoreClient: m2}, - } + { + title: "label_names partial response disabled, but returns error", + storeAPIs: []Client{ + &testClient{ + StoreClient: &mockedStoreAPI{ + RespLabelNames: &storepb.LabelNamesResponse{ + Names: []string{"a", "b"}, + }, + }, + }, + &testClient{ + StoreClient: &mockedStoreAPI{ + RespError: errors.New("error!"), + }, + }, + }, + req: &storepb.LabelNamesRequest{ + PartialResponseDisabled: true, + }, + expectedErr: errors.New("fetch label names from store test: error!"), + }, + { + title: "label_names partial response enabled", + storeAPIs: []Client{ + &testClient{ + StoreClient: &mockedStoreAPI{ + RespLabelNames: &storepb.LabelNamesResponse{ + Names: []string{"a", "b"}, + }, + }, + }, + &testClient{ + StoreClient: &mockedStoreAPI{ + RespError: errors.New("error!"), + }, + }, + }, + req: &storepb.LabelNamesRequest{ + PartialResponseDisabled: false, + }, + expectedNames: []string{"a", "b"}, + expectedWarningsLen: 1, + }, + } { + if ok := t.Run(tc.title, func(t *testing.T) { + q := NewProxyStore(nil, + func(_ context.Context) ([]Client, error) { return tc.storeAPIs, nil }, + component.Query, + nil, + ) - q := NewProxyStore(nil, - func(context.Context) ([]Client, error) { return cls, nil }, - component.Query, - nil, - ) + ctx := context.Background() + resp, err := q.LabelNames(ctx, tc.req) + if tc.expectedErr != nil { + testutil.NotOk(t, err) + testutil.Equals(t, tc.expectedErr.Error(), err.Error()) + return + } + testutil.Ok(t, err) - ctx := context.Background() - req := &storepb.LabelNamesRequest{ - PartialResponseDisabled: false, + testutil.Equals(t, tc.expectedNames, resp.Names) + testutil.Equals(t, tc.expectedWarningsLen, len(resp.Warnings), "got %v", resp.Warnings) + }); !ok { + return + } } - resp, err := q.LabelNames(ctx, req) - testutil.Ok(t, err) - testutil.Assert(t, proto.Equal(req, m1.LastLabelNamesReq), "request was not proxied properly to underlying storeAPI: %s vs %s", req, m1.LastLabelNamesReq) - - testutil.Equals(t, []string{"a", "b"}, resp.Names) - testutil.Equals(t, 1, len(resp.Warnings)) } type rawSeries struct { From a1c8ac305f2a5f34a499d3f24d3f7cb5cafa2e8f Mon Sep 17 00:00:00 2001 From: jojohappy Date: Fri, 29 Mar 2019 11:14:16 +0800 Subject: [PATCH 4/9] Update CHANGELOG Signed-off-by: jojohappy --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1ce8c27c1e..1c3b566ff7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel - [#811](https://github.com/improbable-eng/thanos/pull/811) Remote write receiver - [#798](https://github.com/improbable-eng/thanos/pull/798) Ability to limit the maximum concurrent about of Series() calls in Thanos Store and the maximum amount of samples. - [#910](https://github.com/improbable-eng/thanos/pull/910) Query's stores UI page is now sorted by type and old DNS or File SD stores are removed after 5 minutes (configurable via the new `--store.unhealthy-timeout=5m` flag). +- [#905](https://github.com/improbable-eng/thanos/pull/905) New Query API: /api/v1/labels. Noticed that the API was added in Prometheus v2.6. New options: From e2a781f32562e760cb1b3cfa873d0c69a846ba85 Mon Sep 17 00:00:00 2001 From: jojohappy Date: Fri, 29 Mar 2019 11:59:38 +0800 Subject: [PATCH 5/9] Fixed new proxy store instance Signed-off-by: jojohappy --- pkg/store/proxy.go | 8 ++------ pkg/store/proxy_test.go | 6 ++++-- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 9f39fb10d1..bbcf8fdfa1 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -404,11 +404,7 @@ func (s *ProxyStore) LabelNames(ctx context.Context, r *storepb.LabelNamesReques g, gctx = errgroup.WithContext(ctx) ) - stores, err := s.stores(ctx) - if err != nil { - return nil, status.Errorf(codes.Unknown, err.Error()) - } - for _, st := range stores { + for _, st := range s.stores() { st := st g.Go(func() error { resp, err := st.LabelNames(gctx, &storepb.LabelNamesRequest{ @@ -460,7 +456,7 @@ func (s *ProxyStore) LabelValues(ctx context.Context, r *storepb.LabelValuesRequ store := st g.Go(func() error { resp, err := store.LabelValues(gctx, &storepb.LabelValuesRequest{ - Label: r.Label, + Label: r.Label, PartialResponseDisabled: r.PartialResponseDisabled, }) if err != nil { diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index cab00880a3..caaf6ff8bc 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -769,10 +769,12 @@ func TestProxyStore_LabelNames(t *testing.T) { }, } { if ok := t.Run(tc.title, func(t *testing.T) { - q := NewProxyStore(nil, - func(_ context.Context) ([]Client, error) { return tc.storeAPIs, nil }, + q := NewProxyStore( + nil, + func() []Client { return tc.storeAPIs }, component.Query, nil, + 0*time.Second, ) ctx := context.Background() From 18b0b9dfd9d5dbba172d7034da96f4f4399b8176 Mon Sep 17 00:00:00 2001 From: jojohappy Date: Mon, 8 Apr 2019 18:12:39 +0800 Subject: [PATCH 6/9] To mapping response status code to grpc codes when request failed Signed-off-by: jojohappy --- pkg/store/prometheus.go | 57 +++++++++++++++++++++++++++++++++-------- 1 file changed, 46 insertions(+), 11 deletions(-) diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index 6b65c28848..994cdc0d10 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -29,6 +29,14 @@ import ( "google.golang.org/grpc/status" ) +var statusToCode = map[int]codes.Code{ + http.StatusBadRequest: codes.InvalidArgument, + http.StatusNotFound: codes.NotFound, + http.StatusUnprocessableEntity: codes.Internal, + http.StatusServiceUnavailable: codes.Unavailable, + http.StatusInternalServerError: codes.Internal, +} + // PrometheusStore implements the store node API on top of the Prometheus remote read API. type PrometheusStore struct { logger log.Logger @@ -336,7 +344,7 @@ func extendLset(lset []storepb.Label, extend labels.Labels) []storepb.Label { } // LabelNames returns all known label names. -func (p *PrometheusStore) LabelNames(ctx context.Context, _ *storepb.LabelNamesRequest) ( +func (p *PrometheusStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) ( *storepb.LabelNamesResponse, error, ) { u := *p.base @@ -344,7 +352,7 @@ func (p *PrometheusStore) LabelNames(ctx context.Context, _ *storepb.LabelNamesR req, err := http.NewRequest("GET", u.String(), nil) if err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } span, ctx := tracing.StartSpan(ctx, "/prom_label_names HTTP[client]") @@ -352,7 +360,7 @@ func (p *PrometheusStore) LabelNames(ctx context.Context, _ *storepb.LabelNamesR resp, err := p.client.Do(req.WithContext(ctx)) if err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } defer runutil.CloseWithLogOnErr(p.logger, resp.Body, "label names request body") @@ -362,12 +370,23 @@ func (p *PrometheusStore) LabelNames(ctx context.Context, _ *storepb.LabelNamesR Error string `json:"error"` } if err := json.NewDecoder(resp.Body).Decode(&m); err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) + } + + if resp.StatusCode == http.StatusNoContent { + return &storepb.LabelNamesResponse{Names: []string{}}, nil + } else if m.Status != "success" { + if !r.PartialResponseDisabled { + return &storepb.LabelNamesResponse{Names: m.Data}, nil + } else { + code, exists := statusToCode[resp.StatusCode] + if !exists { + code = codes.Internal + } + return nil, status.Error(code, m.Error) + } } - if m.Status != "success" { - return nil, status.Error(codes.Unknown, m.Error) - } return &storepb.LabelNamesResponse{Names: m.Data}, nil } @@ -385,7 +404,7 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue req, err := http.NewRequest("GET", u.String(), nil) if err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } span, ctx := tracing.StartSpan(ctx, "/prom_label_values HTTP[client]") @@ -393,17 +412,33 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue resp, err := p.client.Do(req.WithContext(ctx)) if err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } defer runutil.CloseWithLogOnErr(p.logger, resp.Body, "label values request body") var m struct { - Data []string `json:"data"` + Data []string `json:"data"` + Status string `json:"status"` + Error string `json:"error"` } if err := json.NewDecoder(resp.Body).Decode(&m); err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } + sort.Strings(m.Data) + if resp.StatusCode == http.StatusNoContent { + return &storepb.LabelValuesResponse{Values: []string{}}, nil + } else if m.Status != "success" { + if !r.PartialResponseDisabled { + return &storepb.LabelValuesResponse{Values: m.Data}, nil + } else { + code, exists := statusToCode[resp.StatusCode] + if !exists { + code = codes.Internal + } + return nil, status.Error(code, m.Error) + } + } return &storepb.LabelValuesResponse{Values: m.Data}, nil } From 8441a1706e1798b088d9e6de6834c62c35d32e76 Mon Sep 17 00:00:00 2001 From: jojohappy Date: Tue, 9 Apr 2019 10:18:16 +0800 Subject: [PATCH 7/9] Simplify code Signed-off-by: jojohappy --- pkg/store/prometheus.go | 33 +++++++++++++++++++-------------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index 994cdc0d10..b96b96c7b0 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -375,16 +375,18 @@ func (p *PrometheusStore) LabelNames(ctx context.Context, r *storepb.LabelNamesR if resp.StatusCode == http.StatusNoContent { return &storepb.LabelNamesResponse{Names: []string{}}, nil - } else if m.Status != "success" { + } + + if m.Status != "success" { if !r.PartialResponseDisabled { return &storepb.LabelNamesResponse{Names: m.Data}, nil - } else { - code, exists := statusToCode[resp.StatusCode] - if !exists { - code = codes.Internal - } - return nil, status.Error(code, m.Error) } + + code, exists := statusToCode[resp.StatusCode] + if !exists { + code = codes.Internal + } + return nil, status.Error(code, m.Error) } return &storepb.LabelNamesResponse{Names: m.Data}, nil @@ -426,18 +428,21 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue } sort.Strings(m.Data) + if resp.StatusCode == http.StatusNoContent { return &storepb.LabelValuesResponse{Values: []string{}}, nil - } else if m.Status != "success" { + } + + if m.Status != "success" { if !r.PartialResponseDisabled { return &storepb.LabelValuesResponse{Values: m.Data}, nil - } else { - code, exists := statusToCode[resp.StatusCode] - if !exists { - code = codes.Internal - } - return nil, status.Error(code, m.Error) } + + code, exists := statusToCode[resp.StatusCode] + if !exists { + code = codes.Internal + } + return nil, status.Error(code, m.Error) } return &storepb.LabelValuesResponse{Values: m.Data}, nil From 63ef9cacd8123c735f341207e96038af4bb2dda6 Mon Sep 17 00:00:00 2001 From: jojohappy Date: Sat, 13 Apr 2019 15:27:54 +0800 Subject: [PATCH 8/9] Addressed review comments Signed-off-by: jojohappy --- go.sum | 2 +- pkg/store/bucket.go | 22 +++++++++++++++------- pkg/store/prometheus.go | 8 ++++---- 3 files changed, 20 insertions(+), 12 deletions(-) diff --git a/go.sum b/go.sum index e8269186b4..40dc119b1b 100644 --- a/go.sum +++ b/go.sum @@ -239,7 +239,7 @@ github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnIn github.com/spf13/viper v1.3.1/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= -github.com/stretchr/testify v1.2.3-0.20181014000028-04af85275a5c/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/tdewolff/minify/v2 v2.3.7/go.mod h1:DD1stRlSx6JsHfl1+E/HVMQeXiec9rD1UQ0epklIZLc= github.com/tdewolff/parse/v2 v2.3.5/go.mod h1:HansaqmN4I/U7L6/tUp0NcwT2tFO0F4EAWYGSDzkYNk= diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 2c722d8c32..ba673687cc 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -856,7 +856,7 @@ func chunksSize(chks []storepb.AggrChunk) (size int) { } // LabelNames implements the storepb.StoreServer interface. -func (s *BucketStore) LabelNames(ctx context.Context, _ *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) { +func (s *BucketStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) { g, gctx := errgroup.WithContext(ctx) s.mtx.RLock() @@ -869,19 +869,24 @@ func (s *BucketStore) LabelNames(ctx context.Context, _ *storepb.LabelNamesReque g.Go(func() error { defer runutil.CloseWithLogOnErr(s.logger, indexr, "label names") - res := indexr.LabelNames() - + res, err := indexr.LabelNames(gctx) + sort.Strings(res) mtx.Lock() sets = append(sets, res) mtx.Unlock() - - return nil + return err }) } s.mtx.RUnlock() if err := g.Wait(); err != nil { + if !r.PartialResponseDisabled { + return &storepb.LabelNamesResponse{ + Names: strutil.MergeSlices(sets...), + Warnings: []string{err.Error()}, + }, nil + } return nil, status.Error(codes.Aborted, err.Error()) } return &storepb.LabelNamesResponse{ @@ -1646,12 +1651,15 @@ func (r *bucketIndexReader) LabelValues(name string) []string { } // LabelNames returns a list of label names. -func (r *bucketIndexReader) LabelNames() []string { +func (r *bucketIndexReader) LabelNames(ctx context.Context) ([]string, error) { res := make([]string, 0, len(r.block.lvals)) for ln, _ := range r.block.lvals { + if ctx.Err() != nil { + return res, ctx.Err() + } res = append(res, ln) } - return res + return res, nil } // Close released the underlying resources of the reader. diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index b96b96c7b0..d8cb1db47b 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -379,12 +379,12 @@ func (p *PrometheusStore) LabelNames(ctx context.Context, r *storepb.LabelNamesR if m.Status != "success" { if !r.PartialResponseDisabled { - return &storepb.LabelNamesResponse{Names: m.Data}, nil + return &storepb.LabelNamesResponse{Names: m.Data, Warnings: []string{m.Error}}, nil } code, exists := statusToCode[resp.StatusCode] if !exists { - code = codes.Internal + return nil, status.Error(codes.Internal, m.Error) } return nil, status.Error(code, m.Error) } @@ -435,12 +435,12 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue if m.Status != "success" { if !r.PartialResponseDisabled { - return &storepb.LabelValuesResponse{Values: m.Data}, nil + return &storepb.LabelValuesResponse{Values: m.Data, Warnings: []string{m.Error}}, nil } code, exists := statusToCode[resp.StatusCode] if !exists { - code = codes.Internal + return nil, status.Error(codes.Internal, m.Error) } return nil, status.Error(code, m.Error) } From 90fb062fe66458da7da9cc25c8bce5389dcb3625 Mon Sep 17 00:00:00 2001 From: jojohappy Date: Tue, 16 Apr 2019 18:26:17 +0800 Subject: [PATCH 9/9] Remove partial response logic in store Signed-off-by: jojohappy --- pkg/store/bucket.go | 23 ++++++++--------------- pkg/store/prometheus.go | 34 +++++++++++++++++----------------- 2 files changed, 25 insertions(+), 32 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index ba673687cc..b75e16e58c 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -856,7 +856,7 @@ func chunksSize(chks []storepb.AggrChunk) (size int) { } // LabelNames implements the storepb.StoreServer interface. -func (s *BucketStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) { +func (s *BucketStore) LabelNames(ctx context.Context, _ *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) { g, gctx := errgroup.WithContext(ctx) s.mtx.RLock() @@ -869,25 +869,21 @@ func (s *BucketStore) LabelNames(ctx context.Context, r *storepb.LabelNamesReque g.Go(func() error { defer runutil.CloseWithLogOnErr(s.logger, indexr, "label names") - res, err := indexr.LabelNames(gctx) + res := indexr.LabelNames() sort.Strings(res) + mtx.Lock() sets = append(sets, res) mtx.Unlock() - return err + + return nil }) } s.mtx.RUnlock() if err := g.Wait(); err != nil { - if !r.PartialResponseDisabled { - return &storepb.LabelNamesResponse{ - Names: strutil.MergeSlices(sets...), - Warnings: []string{err.Error()}, - }, nil - } - return nil, status.Error(codes.Aborted, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } return &storepb.LabelNamesResponse{ Names: strutil.MergeSlices(sets...), @@ -1651,15 +1647,12 @@ func (r *bucketIndexReader) LabelValues(name string) []string { } // LabelNames returns a list of label names. -func (r *bucketIndexReader) LabelNames(ctx context.Context) ([]string, error) { +func (r *bucketIndexReader) LabelNames() []string { res := make([]string, 0, len(r.block.lvals)) for ln, _ := range r.block.lvals { - if ctx.Err() != nil { - return res, ctx.Err() - } res = append(res, ln) } - return res, nil + return res } // Close released the underlying resources of the reader. diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index d8cb1db47b..ea3482a19b 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -344,7 +344,7 @@ func extendLset(lset []storepb.Label, extend labels.Labels) []storepb.Label { } // LabelNames returns all known label names. -func (p *PrometheusStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) ( +func (p *PrometheusStore) LabelNames(ctx context.Context, _ *storepb.LabelNamesRequest) ( *storepb.LabelNamesResponse, error, ) { u := *p.base @@ -364,6 +364,14 @@ func (p *PrometheusStore) LabelNames(ctx context.Context, r *storepb.LabelNamesR } defer runutil.CloseWithLogOnErr(p.logger, resp.Body, "label names request body") + if resp.StatusCode/100 != 2 { + return nil, status.Error(codes.Internal, fmt.Sprintf("request Prometheus server failed, code %s", resp.Status)) + } + + if resp.StatusCode == http.StatusNoContent { + return &storepb.LabelNamesResponse{Names: []string{}}, nil + } + var m struct { Data []string `json:"data"` Status string `json:"status"` @@ -373,15 +381,7 @@ func (p *PrometheusStore) LabelNames(ctx context.Context, r *storepb.LabelNamesR return nil, status.Error(codes.Internal, err.Error()) } - if resp.StatusCode == http.StatusNoContent { - return &storepb.LabelNamesResponse{Names: []string{}}, nil - } - if m.Status != "success" { - if !r.PartialResponseDisabled { - return &storepb.LabelNamesResponse{Names: m.Data, Warnings: []string{m.Error}}, nil - } - code, exists := statusToCode[resp.StatusCode] if !exists { return nil, status.Error(codes.Internal, m.Error) @@ -418,6 +418,14 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue } defer runutil.CloseWithLogOnErr(p.logger, resp.Body, "label values request body") + if resp.StatusCode/100 != 2 { + return nil, status.Error(codes.Internal, fmt.Sprintf("request Prometheus server failed, code %s", resp.Status)) + } + + if resp.StatusCode == http.StatusNoContent { + return &storepb.LabelValuesResponse{Values: []string{}}, nil + } + var m struct { Data []string `json:"data"` Status string `json:"status"` @@ -429,15 +437,7 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue sort.Strings(m.Data) - if resp.StatusCode == http.StatusNoContent { - return &storepb.LabelValuesResponse{Values: []string{}}, nil - } - if m.Status != "success" { - if !r.PartialResponseDisabled { - return &storepb.LabelValuesResponse{Values: m.Data, Warnings: []string{m.Error}}, nil - } - code, exists := statusToCode[resp.StatusCode] if !exists { return nil, status.Error(codes.Internal, m.Error)