diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go
index 58313033ba9..cd519b91ace 100644
--- a/pkg/store/prometheus.go
+++ b/pkg/store/prometheus.go
@@ -12,14 +12,17 @@ import (
 	"net/url"
 	"path"
 	"sort"
+	"strings"
 	"sync"
 
 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
 	"github.com/gogo/protobuf/proto"
 	"github.com/golang/snappy"
+	opentracing "github.com/opentracing/opentracing-go"
 	"github.com/pkg/errors"
 	"github.com/prometheus/prometheus/prompb"
+	"github.com/prometheus/prometheus/storage/remote"
 	"github.com/prometheus/prometheus/tsdb/chunkenc"
 	"github.com/prometheus/prometheus/tsdb/labels"
 	"github.com/thanos-io/thanos/pkg/component"
@@ -80,7 +83,7 @@ func NewPrometheusStore(
 }
 
 // Info returns store information about the Prometheus instance.
-// NOTE(bplotka): MaxTime & MinTime are not accurate nor adjusted dynamically.
+// NOTE(bwplotka): MaxTime & MinTime are not accurate nor adjusted dynamically.
 // This is fine for now, but might be needed in future.
 func (p *PrometheusStore) Info(ctx context.Context, r *storepb.InfoRequest) (*storepb.InfoResponse, error) {
 	lset := p.externalLabels()
@@ -162,12 +165,113 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie
 		q.Matchers = append(q.Matchers, pm)
 	}
 
-	resp, err := p.promSeries(s.Context(), q)
+	queryPrometheusSpan, ctx := tracing.StartSpan(s.Context(), "query_prometheus")
+
+	httpResp, err := p.startPromSeries(ctx, q)
 	if err != nil {
+		queryPrometheusSpan.Finish()
 		return errors.Wrap(err, "query Prometheus")
 	}
 
-	span, _ := tracing.StartSpan(s.Context(), "transform_and_respond")
+	// Negotiate content. We requested the streamed, chunked response type, but we still need to
+	// support old versions of remote read.
+	contentType := httpResp.Header.Get("Content-Type")
+	if strings.HasPrefix(contentType, "application/x-protobuf") {
+		return p.handleSampledPrometheusResponse(s, httpResp, queryPrometheusSpan, externalLabels)
+	}
+
+	if !strings.HasPrefix(contentType, "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse") {
+		return errors.Errorf("not supported remote read content type: %s", contentType)
+	}
+
+	level.Debug(p.logger).Log("msg", "started handling ReadRequest_STREAMED_XOR_CHUNKS streamed read response.")
+
+	framesNum := 0
+	seriesNum := 0
+
+	defer func() {
+		queryPrometheusSpan.SetTag("frames", framesNum)
+		queryPrometheusSpan.SetTag("series", seriesNum)
+		queryPrometheusSpan.Finish()
+	}()
+	defer runutil.CloseWithLogOnErr(p.logger, httpResp.Body, "prom series request body")
+
+	lastSeries := ""
+
+	data := p.getBuffer()
+	defer p.putBuffer(data)
+
+	// TODO(bwplotka): Put read limit as a flag.
+	stream := remote.NewChunkedReader(httpResp.Body, remote.DefaultChunkedReadLimit, *data)
+	for {
+		res := &prompb.ChunkedReadResponse{}
+		err := stream.NextProto(res)
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			return errors.Wrap(err, "next proto")
+		}
+
+		if len(res.ChunkedSeries) != 1 {
+			level.Warn(p.logger).Log("msg", "Prometheus ReadRequest_STREAMED_XOR_CHUNKS returned non 1 series in frame", "series", len(res.ChunkedSeries))
+		}
+
+		framesNum++
+		for _, series := range res.ChunkedSeries {
+			hash := func() string {
+				var m []string
+				for _, l := range series.Labels {
+					m = append(m, l.String())
+				}
+				return strings.Join(m, ";")
+			}()
+			if hash != lastSeries {
+				seriesNum++
+				lastSeries = hash
+			}
+
+			thanosChks := make([]storepb.AggrChunk, len(series.Chunks))
+
+			for i, chk := range series.Chunks {
+				thanosChks[i] = storepb.AggrChunk{
+					MaxTime: chk.MaxTimeMs,
+					MinTime: chk.MinTimeMs,
+					Raw: &storepb.Chunk{
+						Data: chk.Data,
+						// Prometheus's chunk encoding (https://github.com/thanos-io/thanos/blob/master/pkg/store/storepb/types.proto#L19)
+						// differs from ours in one way: Prometheus has Chunk_UNKNOWN Chunk_Encoding = 0, while we start from
+						// XOR as 0. Compensate for that here:
+						Type: storepb.Chunk_Encoding(chk.Type - 1),
+					},
+				}
+				series.Chunks[i].Data = nil
+			}
+
+			if err := s.Send(storepb.NewSeriesResponse(&storepb.Series{
+				Labels: p.translateAndExtendLabels(series.Labels, externalLabels),
+				Chunks: thanosChks,
+			})); err != nil {
+				return err
+			}
+		}
+	}
+	level.Debug(p.logger).Log("msg", "handled ReadRequest_STREAMED_XOR_CHUNKS request.", "frames", framesNum, "series", seriesNum)
+	return nil
+}
+
+func (p *PrometheusStore) handleSampledPrometheusResponse(s storepb.Store_SeriesServer, httpResp *http.Response, querySpan opentracing.Span, externalLabels labels.Labels) error {
+	ctx := s.Context()
+
+	level.Debug(p.logger).Log("msg", "started handling ReadRequest_SAMPLED response type.")
+
+	resp, err := p.fetchSampledResponse(ctx, httpResp)
+	querySpan.Finish()
+	if err != nil {
+		return err
+	}
+
+	span, _ := tracing.StartSpan(ctx, "transform_and_respond")
 	defer span.Finish()
 	span.SetTag("series_count", len(resp.Results[0].Timeseries))
 
@@ -195,17 +299,48 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie
 			return err
 		}
 
-		resp := storepb.NewSeriesResponse(&storepb.Series{
+		if err := s.Send(storepb.NewSeriesResponse(&storepb.Series{
 			Labels: lset,
 			Chunks: aggregatedChunks,
-		})
-		if err := s.Send(resp); err != nil {
+		})); err != nil {
 			return err
 		}
 	}
+	level.Debug(p.logger).Log("msg", "handled ReadRequest_SAMPLED request.", "series", len(resp.Results[0].Timeseries))
 	return nil
 }
 
+func (p *PrometheusStore) fetchSampledResponse(ctx context.Context, resp *http.Response) (*prompb.ReadResponse, error) {
+	defer runutil.ExhaustCloseWithLogOnErr(p.logger, resp.Body, "prom series request body")
+
+	b := p.getBuffer()
+	buf := bytes.NewBuffer(*b)
+	defer p.putBuffer(b)
+	if _, err := io.Copy(buf, resp.Body); err != nil {
+		return nil, errors.Wrap(err, "copy response")
+	}
+	spanSnappyDecode, ctx := tracing.StartSpan(ctx, "decompress_response")
+	sb := p.getBuffer()
+	decomp, err := snappy.Decode(*sb, buf.Bytes())
+	spanSnappyDecode.Finish()
+	defer p.putBuffer(sb)
+	if err != nil {
+		return nil, errors.Wrap(err, "decompress response")
+	}
+
+	var data prompb.ReadResponse
+	spanUnmarshal, _ := tracing.StartSpan(ctx, "unmarshal_response")
+	if err := proto.Unmarshal(decomp, &data); err != nil {
+		return nil, errors.Wrap(err, "unmarshal response")
+	}
+	spanUnmarshal.Finish()
+	if len(data.Results) != 1 {
+		return nil, errors.Errorf("unexpected result size %d", len(data.Results))
+	}
+
+	return &data, nil
+}
+
 func (p *PrometheusStore) chunkSamples(series *prompb.TimeSeries, maxSamplesPerChunk int) (chks []storepb.AggrChunk, err error) {
 	samples := series.Samples
 
@@ -232,11 +367,11 @@ func (p *PrometheusStore) chunkSamples(series *prompb.TimeSeries, maxSamplesPerC
 	return chks, nil
 }
 
-func (p *PrometheusStore) promSeries(ctx context.Context, q *prompb.Query) (*prompb.ReadResponse, error) {
-	span, ctx := tracing.StartSpan(ctx, "query_prometheus")
-	defer span.Finish()
-
-	reqb, err := proto.Marshal(&prompb.ReadRequest{Queries: []*prompb.Query{q}})
+func (p *PrometheusStore) startPromSeries(ctx context.Context, q *prompb.Query) (*http.Response, error) {
+	reqb, err := proto.Marshal(&prompb.ReadRequest{
+		Queries:               []*prompb.Query{q},
+		AcceptedResponseTypes: []prompb.ReadRequest_ResponseType{prompb.ReadRequest_STREAMED_XOR_CHUNKS},
+	})
 	if err != nil {
 		return nil, errors.Wrap(err, "marshal read request")
 	}
@@ -249,8 +384,8 @@ func (p *PrometheusStore) promSeries(ctx context.Context, q *prompb.Query) (*pro
 		return nil, errors.Wrap(err, "unable to create request")
 	}
 	preq.Header.Add("Content-Encoding", "snappy")
-	preq.Header.Set("Content-Type", "application/x-protobuf")
-	preq.Header.Set("X-Prometheus-Remote-Read-Version", "0.1.0")
+	preq.Header.Set("Content-Type", "application/x-stream-protobuf")
+	preq.Header.Set("X-Prometheus-Remote-Read-Version", "0.2.0")
 	spanReqDo, ctx := tracing.StartSpan(ctx, "query_prometheus_request")
 	preq = preq.WithContext(ctx)
 	presp, err := p.client.Do(preq)
@@ -258,38 +393,17 @@ func (p *PrometheusStore) promSeries(ctx context.Context, q *prompb.Query) (*pro
 		return nil, errors.Wrap(err, "send request")
 	}
 	spanReqDo.Finish()
-	defer runutil.ExhaustCloseWithLogOnErr(p.logger, presp.Body, "prom series request body")
-
 	if presp.StatusCode/100 != 2 {
-		return nil, errors.Errorf("request failed with code %s", presp.Status)
-	}
-
-	c := p.getBuffer()
-	buf := bytes.NewBuffer(*c)
-	defer p.putBuffer(c)
-	if _, err := io.Copy(buf, presp.Body); err != nil {
-		return nil, errors.Wrap(err, "copy response")
-	}
-
-	spanSnappyDecode, ctx := tracing.StartSpan(ctx, "decompress_response")
-	sc := p.getBuffer()
-	decomp, err := snappy.Decode(*sc, buf.Bytes())
-	spanSnappyDecode.Finish()
-	defer p.putBuffer(sc)
-	if err != nil {
-		return nil, errors.Wrap(err, "decompress response")
+		// Best effort read.
+		b, err := ioutil.ReadAll(presp.Body)
+		if err != nil {
+			level.Error(p.logger).Log("msg", "failed to read response from non 2XX remote read request", "err", err)
+		}
+		_ = presp.Body.Close()
+		return nil, errors.Errorf("request failed with code %s; msg %s", presp.Status, string(b))
 	}
 
-	var data prompb.ReadResponse
-	spanUnmarshal, _ := tracing.StartSpan(ctx, "unmarshal_response")
-	if err := proto.Unmarshal(decomp, &data); err != nil {
-		return nil, errors.Wrap(err, "unmarshal response")
-	}
-	spanUnmarshal.Finish()
-	if len(data.Results) != 1 {
-		return nil, errors.Errorf("unexpected result size %d", len(data.Results))
-	}
-	return &data, nil
+	return presp, nil
 }
 
 // matchesExternalLabels filters out external labels matching from matcher if exsits as the local storage does not have them.
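
Side note for reviewers: the streamed branch added to Series above boils down to draining length-delimited ChunkedReadResponse frames until a clean EOF. A minimal, self-contained sketch of that consumption pattern follows; countStreamedFrames is an illustrative name (not part of this patch), while remote.NewChunkedReader, remote.DefaultChunkedReadLimit and prompb.ChunkedReadResponse are the actual APIs the patch uses:

package example

import (
	"io"

	"github.com/prometheus/prometheus/prompb"
	"github.com/prometheus/prometheus/storage/remote"
)

// countStreamedFrames drains a streamed remote read body the same way the
// Series handler above does: decode one ChunkedReadResponse frame at a time
// until the reader reports io.EOF, which marks a complete response.
func countStreamedFrames(body io.Reader) (int, error) {
	stream := remote.NewChunkedReader(body, remote.DefaultChunkedReadLimit, nil)
	frames := 0
	for {
		res := &prompb.ChunkedReadResponse{}
		err := stream.NextProto(res)
		if err == io.EOF {
			return frames, nil
		}
		if err != nil {
			return frames, err
		}
		frames++
	}
}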
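
The Type: storepb.Chunk_Encoding(chk.Type - 1) conversion above is easy to misread, so here is the enum offset spelled out as a tiny sketch. It assumes only what the inline comment states: Prometheus counts Chunk_UNKNOWN as 0 and XOR as 1, while the Thanos storepb encoding starts at XOR = 0:

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/prompb"
	"github.com/thanos-io/thanos/pkg/store/storepb"
)

func main() {
	promEnc := prompb.Chunk_XOR // 1 on the Prometheus side; Chunk_UNKNOWN is 0.
	// Shift by one so that Prometheus XOR (1) lands on Thanos XOR (0).
	thanosEnc := storepb.Chunk_Encoding(promEnc - 1)
	fmt.Println(int32(promEnc), int32(thanosEnc)) // Prints: 1 0
}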
diff --git a/pkg/store/prometheus_test.go b/pkg/store/prometheus_test.go
index a2d47fd42cc..d8bb3ed634b 100644
--- a/pkg/store/prometheus_test.go
+++ b/pkg/store/prometheus_test.go
@@ -34,6 +34,7 @@ func testPrometheusStoreSeriesE2e(t *testing.T, prefix string) {
 
 	p, err := testutil.NewPrometheusOnPath(prefix)
 	testutil.Ok(t, err)
+	defer func() { testutil.Ok(t, p.Stop()) }()
 
 	baseT := timestamp.FromTime(time.Now()) / 1000 * 1000
 
@@ -50,7 +51,6 @@ func testPrometheusStoreSeriesE2e(t *testing.T, prefix string) {
 	defer cancel()
 
 	testutil.Ok(t, p.Start())
-	defer func() { testutil.Ok(t, p.Stop()) }()
 
 	u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr()))
 	testutil.Ok(t, err)
@@ -65,15 +65,13 @@ func testPrometheusStoreSeriesE2e(t *testing.T, prefix string) {
 	// Query all three samples except for the first one. Since we round up queried data
 	// to seconds, we can test whether the extra sample gets stripped properly.
 	srv := newStoreSeriesServer(ctx)
-
-	err = proxy.Series(&storepb.SeriesRequest{
+	testutil.Ok(t, proxy.Series(&storepb.SeriesRequest{
 		MinTime: baseT + 101,
 		MaxTime: baseT + 300,
 		Matchers: []storepb.LabelMatcher{
 			{Type: storepb.LabelMatcher_EQ, Name: "a", Value: "b"},
 		},
-	}, srv)
-	testutil.Ok(t, err)
+	}, srv))
 
 	testutil.Equals(t, 1, len(srv.SeriesSet))
 
@@ -134,6 +132,7 @@ func TestPrometheusStore_LabelValues_e2e(t *testing.T) {
 
 	p, err := testutil.NewPrometheus()
 	testutil.Ok(t, err)
+	defer func() { testutil.Ok(t, p.Stop()) }()
 
 	a := p.Appender()
 	_, err = a.Add(labels.FromStrings("a", "b"), 0, 1)
@@ -148,7 +147,6 @@ func TestPrometheusStore_LabelValues_e2e(t *testing.T) {
 	defer cancel()
 
 	testutil.Ok(t, p.Start())
-	defer func() { testutil.Ok(t, p.Stop()) }()
 
 	u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr()))
 	testutil.Ok(t, err)
@@ -170,6 +168,7 @@ func TestPrometheusStore_ExternalLabelValues_e2e(t *testing.T) {
 
 	p, err := testutil.NewPrometheus()
 	testutil.Ok(t, err)
+	defer func() { testutil.Ok(t, p.Stop()) }()
 
 	a := p.Appender()
 	_, err = a.Add(labels.FromStrings("ext_a", "b"), 0, 1)
@@ -182,7 +181,6 @@ func TestPrometheusStore_ExternalLabelValues_e2e(t *testing.T) {
 	defer cancel()
 
 	testutil.Ok(t, p.Start())
-	defer func() { testutil.Ok(t, p.Stop()) }()
 
 	u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr()))
 	testutil.Ok(t, err)
@@ -210,6 +208,7 @@ func TestPrometheusStore_Series_MatchExternalLabel_e2e(t *testing.T) {
 
 	p, err := testutil.NewPrometheus()
 	testutil.Ok(t, err)
+	defer func() { testutil.Ok(t, p.Stop()) }()
 
 	baseT := timestamp.FromTime(time.Now()) / 1000 * 1000
 
@@ -226,7 +225,6 @@ func TestPrometheusStore_Series_MatchExternalLabel_e2e(t *testing.T) {
 	defer cancel()
 
 	testutil.Ok(t, p.Start())
-	defer func() { testutil.Ok(t, p.Stop()) }()
 
 	u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr()))
 	testutil.Ok(t, err)
diff --git a/pkg/store/storepb/rpc.pb.go b/pkg/store/storepb/rpc.pb.go
index 229f81c2bac..11191edf555 100644
--- a/pkg/store/storepb/rpc.pb.go
+++ b/pkg/store/storepb/rpc.pb.go
@@ -712,6 +712,10 @@ type StoreClient interface {
 	/// available.
 	Info(ctx context.Context, in *InfoRequest, opts ...grpc.CallOption) (*InfoResponse, error)
 	/// Series streams each Series (Labels and chunk/downsampling chunk) for given label matchers and time range.
+	///
+	/// Series should strictly stream full series after series, optionally split by time. This means that a single frame can contain
+	/// a partition of a single series, but once a new series is started to be streamed it means that no more data will
+	/// be sent for the previous one.
 	Series(ctx context.Context, in *SeriesRequest, opts ...grpc.CallOption) (Store_SeriesClient, error)
 	/// LabelNames returns all label names that is available.
 	/// Currently unimplemented in all Thanos implementations, because Query API does not implement this either.
@@ -793,6 +797,10 @@ type StoreServer interface {
 	/// available.
 	Info(context.Context, *InfoRequest) (*InfoResponse, error)
 	/// Series streams each Series (Labels and chunk/downsampling chunk) for given label matchers and time range.
+	///
+	/// Series should strictly stream full series after series, optionally split by time. This means that a single frame can contain
+	/// a partition of a single series, but once a new series is started to be streamed it means that no more data will
+	/// be sent for the previous one.
 	Series(*SeriesRequest, Store_SeriesServer) error
 	/// LabelNames returns all label names that is available.
 	/// Currently unimplemented in all Thanos implementations, because Query API does not implement this either.
diff --git a/pkg/store/storepb/rpc.proto b/pkg/store/storepb/rpc.proto
index 35ea8694544..e50077b739d 100644
--- a/pkg/store/storepb/rpc.proto
+++ b/pkg/store/storepb/rpc.proto
@@ -18,6 +18,10 @@ service Store {
   rpc Info(InfoRequest) returns (InfoResponse);
 
   /// Series streams each Series (Labels and chunk/downsampling chunk) for given label matchers and time range.
+  ///
+  /// Series should strictly stream full series after series, optionally split by time. This means that a single frame can contain
+  /// a partition of a single series, but once a new series is started to be streamed it means that no more data will
+  /// be sent for the previous one.
   rpc Series(SeriesRequest) returns (stream SeriesResponse);
 
   /// LabelNames returns all label names that is available.
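
The ordering contract documented above is what keeps client-side handling cheap: because every frame of one series arrives before any frame of the next, a consumer can reassemble time-split series with a single look-back comparison. A hedged sketch of that merge step follows; mergeFrames and labelsEqual are illustrative helpers, not Thanos APIs, while storepb.Series, storepb.Label and storepb.AggrChunk are the real types:

package example

import "github.com/thanos-io/thanos/pkg/store/storepb"

// mergeFrames folds a stream of Series frames back into whole series, relying
// on the guarantee above: once a new series starts, no more frames arrive for
// the previous one, so only the last merged series needs to be inspected.
func mergeFrames(frames []storepb.Series) []storepb.Series {
	var out []storepb.Series
	for _, f := range frames {
		if n := len(out); n > 0 && labelsEqual(out[n-1].Labels, f.Labels) {
			// Same label set as the previous frame: this is a time-split
			// partition of the same series, so append its chunks.
			out[n-1].Chunks = append(out[n-1].Chunks, f.Chunks...)
			continue
		}
		out = append(out, f)
	}
	return out
}

func labelsEqual(a, b []storepb.Label) bool {
	if len(a) != len(b) {
		return false
	}
	for i := range a {
		if a[i].Name != b[i].Name || a[i].Value != b[i].Value {
			return false
		}
	}
	return true
}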