From 55deaba4373012e64775b8951bd12e308a3bb854 Mon Sep 17 00:00:00 2001 From: fpetkovski Date: Fri, 25 Mar 2022 11:11:48 +0100 Subject: [PATCH] Use streaming responses Signed-off-by: fpetkovski --- cmd/thanos/query.go | 2 +- docs/proposals-done/202203-grpc-query-api.md | 46 +- go.mod | 1 - go.sum | 1 - pkg/api/query/grpc.go | 83 +-- pkg/api/query/querypb/query.pb.go | 575 ++++++++++++++----- pkg/api/query/querypb/query.proto | 20 +- pkg/api/query/querypb/responses.go | 50 ++ pkg/testutil/testutil.go | 17 + test/e2e/query_test.go | 89 ++- 10 files changed, 673 insertions(+), 211 deletions(-) create mode 100644 pkg/api/query/querypb/responses.go diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 38fc2f9322b..7c65edf77d4 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -677,7 +677,7 @@ func runQuery( info.WithTargetsInfoFunc(), ) - grpcAPI := apiv1.NewGrpcAPI(time.Now, queryableCreator, engineCreator, instantDefaultMaxSourceResolution) + grpcAPI := apiv1.NewGRPCAPI(time.Now, queryableCreator, engineCreator, instantDefaultMaxSourceResolution) s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe, grpcserver.WithServer(apiv1.RegisterQueryServer(grpcAPI)), grpcserver.WithServer(store.RegisterStoreServer(proxy)), diff --git a/docs/proposals-done/202203-grpc-query-api.md b/docs/proposals-done/202203-grpc-query-api.md index 88d478bd98f..d3f6fd6dc14 100644 --- a/docs/proposals-done/202203-grpc-query-api.md +++ b/docs/proposals-done/202203-grpc-query-api.md @@ -1,23 +1,25 @@ ## Introduce a Query gRPC API * **Owners:** - * `@fpetkovski` - * `@mzardab` + * `@fpetkovski` + * `@mzardab` > TL;DR: Introducing a new gRPC API for `/query` and `/query_range` ## Why + We want to be able to distinguish between gRPC Store APIs and other Queriers in the query path. Currently, Thanos Query implements the gRPC Store API and the root Querier does not distinguish between Store targets and other Queriers that are capable of processing a PromQL expression before returning the result. The new gRPC Query API will allow a querier to fan out query execution, in addition to Store API selects. -This is useful for a few reasons: +This is useful for a few reasons: * When Queriers register disjoint Store targets, they should be able to deduplicate series and then execute the query without concerns of duplicate data from other queriers. This new API would enable users to effectively partition by Querier, and avoid shipping raw series back from each disjointed Querier to the root Querier. -* If Queriers register conjoint Store targets, users would be able to express a query sharding strategy between Queriers to more effectively distribute query load amongst a fleet of homogenous Queriers. -* The proposed Query API utilizes gRPC instead of HTTP, which would enable gRPC streaming from root Querier all the way to the underlying Store targets (Query API -> Store API) and unlock the performance benefits of gRPC over HTTP. +* If Queriers register Store targets with overlapping series, users would be able to express a query sharding strategy between Queriers to more effectively distribute query load amongst a fleet of homogenous Queriers. +* The proposed Query API utilizes gRPC instead of HTTP, which would enable gRPC streaming from root Querier all the way to the underlying Store targets (Query API -> Store API) and unlock the performance benefits of gRPC over HTTP. +* When there is only one StoreAPI connected to Thanos Query which completely covers the requested range of the original user's query, then it is more optimal to execute the query directly in the store, instead of sending raw samples to the querier. This scenario is not unlikely given query-frontend's sharding capabilities. ### Pitfalls of the current solution -Thanos Query currently allows for `query` and `query_range` operations through HTTP only. Various query strategies can be implemented using the HTTP API, an analogous gRPC API would allow for a more resource efficient and expressive query execution path. The two main reasons are the streaming capabilities that come out of the box with gRPC, statically typed API spec, as well as the lower bandwidth utilization which protobuf enables. +Thanos Query currently allows for `query` and `query_range` operations through HTTP only. Various query strategies can be implemented using the HTTP API, an analogous gRPC API would allow for a more resource efficient and expressive query execution path. The two main reasons are the streaming capabilities that come out of the box with gRPC, statically typed API spec, as well as the lower bandwidth utilization which protobuf enables. ## Goals * Introduce a gRPC Query API implementation equivalent to the current Querier HTTP API (`query` for instant queries, `query_range` for range queries) @@ -26,7 +28,7 @@ Thanos Query currently allows for `query` and `query_range` operations through H * Implementation of potential query sharding strategies described in this proposal. * Streaming implementations for `query` and `query_range` rpc's, these will be introduced as additional `QueryStream` and `QueryRangeStream` rpc's subsequently. -* Response series ordering equivalent to the current Prometheus Query HTTP API behaviour +* Response series ordering equivalent to the current Prometheus Query HTTP API behaviour ### Audience * Thanos Maintainers @@ -35,11 +37,12 @@ Thanos Query currently allows for `query` and `query_range` operations through H ## How We propose defining the following gRPC API: + ```protobuf service Query { - rpc Query(QueryRequest) returns (QueryResponse); - - rpc QueryRange(QueryRangeRequest) returns (QueryRangeResponse); + rpc Query(QueryRequest) returns (stream QueryResponse); + + rpc QueryRange(QueryRangeRequest) returns (stream QueryRangeResponse); } ``` @@ -64,7 +67,13 @@ message QueryRequest { } message QueryResponse { - repeated prometheus_copy.TimeSeries timeseries = 1 [(gogoproto.nullable) = false]; + oneof result { + /// warnings are additional messages coming from the PromQL engine. + string warnings = 1; + + /// timeseries is one series from the result of the executed query. + prometheus_copy.TimeSeries timeseries = 2; + } } message QueryRangeRequest { @@ -88,21 +97,26 @@ message QueryRangeRequest { } message QueryRangeResponse { - repeated prometheus_copy.TimeSeries timeseries = 1 [(gogoproto.nullable) = false]; -} + oneof result { + /// warnings are additional messages coming from the PromQL engine. + string warnings = 1; + /// timeseries is one series from the result of the executed query. + prometheus_copy.TimeSeries timeseries = 2; + } +} ``` The `Query` Service will be implemented by the gRPC server which is started via the `thanos query` command. ## Alternatives -The alternative to expressing a gRPC Query API would be to use the HTTP APIs and distinguish Queriers via configuration on startup. This would be suboptimal for the following reasons: +The alternative to expressing a gRPC Query API would be to use the HTTP APIs and distinguish Queriers via configuration on startup. This would be suboptimal for the following reasons: * No statically typed API definition, we would need to rely on HTTP API versioning to manage changes to the API that is intended to enable advanced query execution strategies. * HTTP not as performant as gRPC/HTTP2, gRPC/HTTP2 allows us to use streaming(less connection overhead) and protobuf(smaller response sizes), the current HTTP API does not. * Ergonomics, gRPC allows us to express a functional API with parameters, HTTP requires request parameter marshalling/unmarshalling which is very error-prone. ## Action Plan -* [x] Define the QueryServer gRPC Service -* [x] Implement the QueryServer gRPC Service in the Thanos Query +* [X] Define the QueryServer gRPC Service +* [X] Implement the QueryServer gRPC Service in the Thanos Query diff --git a/go.mod b/go.mod index 9f60c2f7382..63401edc0b6 100644 --- a/go.mod +++ b/go.mod @@ -96,7 +96,6 @@ require ( gopkg.in/fsnotify.v1 v1.4.7 gopkg.in/yaml.v2 v2.4.0 gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b - gotest.tools v2.2.0+incompatible ) require ( diff --git a/go.sum b/go.sum index 62e65f3a21c..48fba3f860f 100644 --- a/go.sum +++ b/go.sum @@ -2678,7 +2678,6 @@ gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c/go.mod h1:K4uyk7z7BCEPqu6E+C gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo= gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk= gotest.tools/v3 v3.0.3/go.mod h1:Z7Lb0S5l+klDB31fvDQX8ss/FlKDxtlFlw3Oa8Ymbl8= diff --git a/pkg/api/query/grpc.go b/pkg/api/query/grpc.go index a7b7beb82dc..b3882b5fe50 100644 --- a/pkg/api/query/grpc.go +++ b/pkg/api/query/grpc.go @@ -15,15 +15,15 @@ import ( "google.golang.org/grpc" ) -type GrpcAPI struct { +type GRPCAPI struct { now func() time.Time queryableCreate query.QueryableCreator queryEngine func(int64) *promql.Engine defaultMaxResolutionSeconds time.Duration } -func NewGrpcAPI(now func() time.Time, creator query.QueryableCreator, queryEngine func(int64) *promql.Engine, defaultMaxResolutionSeconds time.Duration) *GrpcAPI { - return &GrpcAPI{ +func NewGRPCAPI(now func() time.Time, creator query.QueryableCreator, queryEngine func(int64) *promql.Engine, defaultMaxResolutionSeconds time.Duration) *GRPCAPI { + return &GRPCAPI{ now: now, queryableCreate: creator, queryEngine: queryEngine, @@ -37,10 +37,11 @@ func RegisterQueryServer(queryServer querypb.QueryServer) func(*grpc.Server) { } } -func (grpcAPI *GrpcAPI) Query(ctx context.Context, request *querypb.QueryRequest) (*querypb.QueryResponse, error) { +func (g *GRPCAPI) Query(request *querypb.QueryRequest, server querypb.Query_QueryServer) error { + ctx := context.Background() var ts time.Time if request.TimeSeconds == 0 { - ts = grpcAPI.now() + ts = g.now() } else { ts = time.Unix(request.TimeSeconds, 0) } @@ -54,16 +55,16 @@ func (grpcAPI *GrpcAPI) Query(ctx context.Context, request *querypb.QueryRequest maxResolution := request.MaxResolutionSeconds if request.MaxResolutionSeconds == 0 { - maxResolution = grpcAPI.defaultMaxResolutionSeconds.Milliseconds() / 1000 + maxResolution = g.defaultMaxResolutionSeconds.Milliseconds() / 1000 } storeMatchers, err := querypb.StoreMatchersToLabelMatchers(request.StoreMatchers) if err != nil { - return nil, err + return err } - qe := grpcAPI.queryEngine(request.MaxResolutionSeconds) - queryable := grpcAPI.queryableCreate( + qe := g.queryEngine(request.MaxResolutionSeconds) + queryable := g.queryableCreate( request.EnableDedup, request.ReplicaLabels, storeMatchers, @@ -74,36 +75,41 @@ func (grpcAPI *GrpcAPI) Query(ctx context.Context, request *querypb.QueryRequest ) qry, err := qe.NewInstantQuery(queryable, request.Query, ts) if err != nil { - return nil, err + return err } result := qry.Exec(ctx) + if err := server.Send(querypb.NewQueryWarningsResponse(result.Warnings)); err != nil { + return nil + } + switch vector := result.Value.(type) { case promql.Scalar: - return &querypb.QueryResponse{ - Timeseries: []prompb.TimeSeries{{ - Samples: []prompb.Sample{{Value: vector.V, Timestamp: vector.T}}, - }}, - }, nil - case promql.Vector: - response := &querypb.QueryResponse{ - Timeseries: make([]prompb.TimeSeries, 0, len(vector)), + series := &prompb.TimeSeries{ + Samples: []prompb.Sample{{Value: vector.V, Timestamp: vector.T}}, } - + if err := server.Send(querypb.NewQueryResponse(series)); err != nil { + return err + } + case promql.Vector: for _, sample := range vector { - response.Timeseries = append(response.Timeseries, prompb.TimeSeries{ + series := &prompb.TimeSeries{ Labels: labelpb.ZLabelsFromPromLabels(sample.Metric), Samples: prompb.SamplesFromPromqlPoints([]promql.Point{sample.Point}), - }) + } + if err := server.Send(querypb.NewQueryResponse(series)); err != nil { + return err + } } - return response, nil + return nil } - return nil, nil + return nil } -func (grpcAPI *GrpcAPI) QueryRange(ctx context.Context, request *querypb.QueryRangeRequest) (*querypb.QueryRangeResponse, error) { +func (g *GRPCAPI) QueryRange(request *querypb.QueryRangeRequest, srv querypb.Query_QueryRangeServer) error { + ctx := context.Background() if request.TimeoutSeconds != 0 { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, time.Duration(request.TimeoutSeconds)) @@ -112,16 +118,16 @@ func (grpcAPI *GrpcAPI) QueryRange(ctx context.Context, request *querypb.QueryRa maxResolution := request.MaxResolutionSeconds if request.MaxResolutionSeconds == 0 { - maxResolution = grpcAPI.defaultMaxResolutionSeconds.Milliseconds() / 1000 + maxResolution = g.defaultMaxResolutionSeconds.Milliseconds() / 1000 } storeMatchers, err := querypb.StoreMatchersToLabelMatchers(request.StoreMatchers) if err != nil { - return nil, err + return err } - qe := grpcAPI.queryEngine(request.MaxResolutionSeconds) - queryable := grpcAPI.queryableCreate( + qe := g.queryEngine(request.MaxResolutionSeconds) + queryable := g.queryableCreate( request.EnableDedup, request.ReplicaLabels, storeMatchers, @@ -137,25 +143,28 @@ func (grpcAPI *GrpcAPI) QueryRange(ctx context.Context, request *querypb.QueryRa qry, err := qe.NewRangeQuery(queryable, request.Query, startTime, endTime, interval) if err != nil { - return nil, err + return err } result := qry.Exec(ctx) + if err := srv.Send(querypb.NewQueryRangeWarningsResponse(result.Warnings)); err != nil { + return err + } + switch matrix := result.Value.(type) { case promql.Matrix: - response := &querypb.QueryRangeResponse{ - Timeseries: make([]prompb.TimeSeries, len(matrix)), - } - - for i, series := range matrix { - response.Timeseries[i] = prompb.TimeSeries{ + for _, series := range matrix { + series := &prompb.TimeSeries{ Labels: labelpb.ZLabelsFromPromLabels(series.Metric), Samples: prompb.SamplesFromPromqlPoints(series.Points), } + if err := srv.Send(querypb.NewQueryRangeResponse(series)); err != nil { + return err + } } - return response, nil + return nil } - return nil, nil + return nil } diff --git a/pkg/api/query/querypb/query.pb.go b/pkg/api/query/querypb/query.pb.go index 274f2a567b2..a7a9711bea3 100644 --- a/pkg/api/query/querypb/query.pb.go +++ b/pkg/api/query/querypb/query.pb.go @@ -114,7 +114,10 @@ func (m *StoreMatchers) XXX_DiscardUnknown() { var xxx_messageInfo_StoreMatchers proto.InternalMessageInfo type QueryResponse struct { - Timeseries []prompb.TimeSeries `protobuf:"bytes,1,rep,name=timeseries,proto3" json:"timeseries"` + // Types that are valid to be assigned to Result: + // *QueryResponse_Warnings + // *QueryResponse_Timeseries + Result isQueryResponse_Result `protobuf_oneof:"result"` } func (m *QueryResponse) Reset() { *m = QueryResponse{} } @@ -150,6 +153,51 @@ func (m *QueryResponse) XXX_DiscardUnknown() { var xxx_messageInfo_QueryResponse proto.InternalMessageInfo +type isQueryResponse_Result interface { + isQueryResponse_Result() + MarshalTo([]byte) (int, error) + Size() int +} + +type QueryResponse_Warnings struct { + Warnings string `protobuf:"bytes,1,opt,name=warnings,proto3,oneof" json:"warnings,omitempty"` +} +type QueryResponse_Timeseries struct { + Timeseries *prompb.TimeSeries `protobuf:"bytes,2,opt,name=timeseries,proto3,oneof" json:"timeseries,omitempty"` +} + +func (*QueryResponse_Warnings) isQueryResponse_Result() {} +func (*QueryResponse_Timeseries) isQueryResponse_Result() {} + +func (m *QueryResponse) GetResult() isQueryResponse_Result { + if m != nil { + return m.Result + } + return nil +} + +func (m *QueryResponse) GetWarnings() string { + if x, ok := m.GetResult().(*QueryResponse_Warnings); ok { + return x.Warnings + } + return "" +} + +func (m *QueryResponse) GetTimeseries() *prompb.TimeSeries { + if x, ok := m.GetResult().(*QueryResponse_Timeseries); ok { + return x.Timeseries + } + return nil +} + +// XXX_OneofWrappers is for the internal use of the proto package. +func (*QueryResponse) XXX_OneofWrappers() []interface{} { + return []interface{}{ + (*QueryResponse_Warnings)(nil), + (*QueryResponse_Timeseries)(nil), + } +} + type QueryRangeRequest struct { Query string `protobuf:"bytes,1,opt,name=query,proto3" json:"query,omitempty"` StartTimeSeconds int64 `protobuf:"varint,2,opt,name=start_time_seconds,json=startTimeSeconds,proto3" json:"start_time_seconds,omitempty"` @@ -199,7 +247,10 @@ func (m *QueryRangeRequest) XXX_DiscardUnknown() { var xxx_messageInfo_QueryRangeRequest proto.InternalMessageInfo type QueryRangeResponse struct { - Timeseries []prompb.TimeSeries `protobuf:"bytes,1,rep,name=timeseries,proto3" json:"timeseries"` + // Types that are valid to be assigned to Result: + // *QueryRangeResponse_Warnings + // *QueryRangeResponse_Timeseries + Result isQueryRangeResponse_Result `protobuf_oneof:"result"` } func (m *QueryRangeResponse) Reset() { *m = QueryRangeResponse{} } @@ -235,6 +286,51 @@ func (m *QueryRangeResponse) XXX_DiscardUnknown() { var xxx_messageInfo_QueryRangeResponse proto.InternalMessageInfo +type isQueryRangeResponse_Result interface { + isQueryRangeResponse_Result() + MarshalTo([]byte) (int, error) + Size() int +} + +type QueryRangeResponse_Warnings struct { + Warnings string `protobuf:"bytes,1,opt,name=warnings,proto3,oneof" json:"warnings,omitempty"` +} +type QueryRangeResponse_Timeseries struct { + Timeseries *prompb.TimeSeries `protobuf:"bytes,2,opt,name=timeseries,proto3,oneof" json:"timeseries,omitempty"` +} + +func (*QueryRangeResponse_Warnings) isQueryRangeResponse_Result() {} +func (*QueryRangeResponse_Timeseries) isQueryRangeResponse_Result() {} + +func (m *QueryRangeResponse) GetResult() isQueryRangeResponse_Result { + if m != nil { + return m.Result + } + return nil +} + +func (m *QueryRangeResponse) GetWarnings() string { + if x, ok := m.GetResult().(*QueryRangeResponse_Warnings); ok { + return x.Warnings + } + return "" +} + +func (m *QueryRangeResponse) GetTimeseries() *prompb.TimeSeries { + if x, ok := m.GetResult().(*QueryRangeResponse_Timeseries); ok { + return x.Timeseries + } + return nil +} + +// XXX_OneofWrappers is for the internal use of the proto package. +func (*QueryRangeResponse) XXX_OneofWrappers() []interface{} { + return []interface{}{ + (*QueryRangeResponse_Warnings)(nil), + (*QueryRangeResponse_Timeseries)(nil), + } +} + func init() { proto.RegisterType((*QueryRequest)(nil), "thanos.QueryRequest") proto.RegisterType((*StoreMatchers)(nil), "thanos.StoreMatchers") @@ -246,45 +342,47 @@ func init() { func init() { proto.RegisterFile("api/query/querypb/query.proto", fileDescriptor_4b2aba43925d729f) } var fileDescriptor_4b2aba43925d729f = []byte{ - // 594 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x94, 0x41, 0x6e, 0xd3, 0x40, - 0x14, 0x86, 0x63, 0xd2, 0xa4, 0xcd, 0x4b, 0xd2, 0x86, 0x21, 0x41, 0x6e, 0x10, 0xc6, 0x44, 0xaa, - 0x30, 0x12, 0x4a, 0x50, 0xc9, 0x01, 0x68, 0xcb, 0xb2, 0x48, 0xad, 0x5b, 0x09, 0x89, 0x4d, 0x34, - 0x49, 0x9e, 0x12, 0xab, 0x8e, 0xc7, 0x9d, 0x19, 0x43, 0xb3, 0xe5, 0x04, 0xdc, 0x80, 0x7b, 0x70, - 0x82, 0x2c, 0xbb, 0x64, 0x85, 0x20, 0xb9, 0x08, 0xf2, 0xd8, 0x0e, 0x76, 0x89, 0xaa, 0x06, 0xd8, - 0xb8, 0x9e, 0xff, 0xff, 0xdf, 0xf3, 0xbc, 0xe9, 0x97, 0x81, 0xc7, 0xd4, 0x77, 0x3a, 0x97, 0x01, - 0xf2, 0x69, 0xf4, 0xf4, 0xfb, 0xd1, 0xdf, 0xb6, 0xcf, 0x99, 0x64, 0xa4, 0x28, 0xc7, 0xd4, 0x63, - 0xa2, 0x59, 0x1f, 0xb1, 0x11, 0x53, 0x52, 0x27, 0x7c, 0x8b, 0xdc, 0xe6, 0xae, 0x90, 0x8c, 0x63, - 0x47, 0x3d, 0xfd, 0x7e, 0x47, 0x4e, 0x7d, 0x14, 0xb1, 0x65, 0x66, 0x2d, 0x9f, 0xb3, 0x49, 0x36, - 0xd1, 0xfa, 0x9a, 0x87, 0xca, 0x69, 0xf8, 0x29, 0x1b, 0x2f, 0x03, 0x14, 0x92, 0xd4, 0xa1, 0xa0, - 0x3e, 0xad, 0x6b, 0xa6, 0x66, 0x95, 0xec, 0x68, 0x41, 0x9e, 0x42, 0x45, 0x3a, 0x13, 0xec, 0x09, - 0x1c, 0x30, 0x6f, 0x28, 0xf4, 0x7b, 0xa6, 0x66, 0xe5, 0xed, 0x72, 0xa8, 0x9d, 0x45, 0x12, 0x79, - 0x06, 0x3b, 0xe1, 0x92, 0x05, 0x72, 0x99, 0xca, 0xab, 0xd4, 0x76, 0x2c, 0x27, 0xc1, 0x2e, 0x3c, - 0x9c, 0xd0, 0xab, 0x1e, 0x47, 0xc1, 0xdc, 0x40, 0x3a, 0xcc, 0x5b, 0xe6, 0x37, 0x54, 0xbe, 0x3e, - 0xa1, 0x57, 0xf6, 0xd2, 0x4c, 0xaa, 0xf6, 0x60, 0x9b, 0xa3, 0xef, 0x3a, 0x03, 0xda, 0x73, 0x69, - 0x1f, 0x5d, 0xa1, 0x17, 0xcc, 0xbc, 0x55, 0xb2, 0xab, 0xb1, 0x7a, 0xac, 0x44, 0x72, 0x00, 0x55, - 0x35, 0xed, 0x5b, 0x2a, 0x07, 0x63, 0xe4, 0x42, 0x2f, 0x9a, 0x79, 0xab, 0xbc, 0xdf, 0x68, 0x47, - 0x47, 0xd8, 0x3e, 0x4b, 0x9b, 0x87, 0x1b, 0xb3, 0xef, 0x4f, 0x72, 0x76, 0xb6, 0x82, 0x98, 0x50, - 0x46, 0x8f, 0xf6, 0x5d, 0x7c, 0x83, 0xc3, 0xc0, 0xd7, 0x37, 0x4d, 0xcd, 0xda, 0xb2, 0xd3, 0x12, - 0xe9, 0x42, 0x23, 0x5a, 0x9e, 0x50, 0x2e, 0x1d, 0xea, 0xda, 0x28, 0x7c, 0xe6, 0x09, 0xd4, 0xb7, - 0x54, 0x76, 0xb5, 0x49, 0x5e, 0xc2, 0x83, 0xc8, 0x50, 0xe7, 0x7d, 0x12, 0x88, 0xf1, 0x90, 0x7d, - 0xf4, 0xf4, 0x92, 0xaa, 0x59, 0x65, 0x11, 0x03, 0x40, 0x5c, 0x38, 0xfe, 0xd1, 0x38, 0xf0, 0x2e, - 0x84, 0x0e, 0x2a, 0x98, 0x52, 0x5a, 0xa7, 0x50, 0xcd, 0xcc, 0x43, 0x5e, 0x43, 0x55, 0x1d, 0xce, - 0x72, 0x7a, 0x4d, 0x4d, 0x5f, 0x4f, 0xa6, 0x3f, 0x4e, 0x99, 0xc9, 0xf0, 0x99, 0x82, 0x96, 0x0d, - 0xd5, 0x18, 0x87, 0x78, 0xd7, 0x07, 0x00, 0xe1, 0xff, 0x4f, 0x20, 0x77, 0x30, 0xe9, 0xf7, 0x28, - 0x84, 0x67, 0x82, 0x72, 0x8c, 0x81, 0xe8, 0x0d, 0x98, 0x3f, 0x6d, 0x9f, 0x2b, 0x10, 0xc2, 0x48, - 0xdc, 0x36, 0x55, 0xd4, 0xfa, 0xb2, 0x01, 0xf7, 0xa3, 0xa6, 0xd4, 0x1b, 0xe1, 0xed, 0xa0, 0xbd, - 0x00, 0x22, 0x24, 0xe5, 0xb2, 0xb7, 0x02, 0xb7, 0x9a, 0x72, 0xce, 0x53, 0xcc, 0x59, 0x50, 0x43, - 0x6f, 0x98, 0xcd, 0xc6, 0xd0, 0xa1, 0x37, 0x4c, 0x27, 0x9f, 0x43, 0xcd, 0xf1, 0x24, 0xf2, 0x0f, - 0xd4, 0xbd, 0x81, 0xdb, 0x4e, 0xa2, 0xdf, 0x02, 0x72, 0x61, 0x4d, 0x90, 0x8b, 0x6b, 0x81, 0xbc, - 0x79, 0x27, 0x90, 0xb7, 0xfe, 0x15, 0xe4, 0xd2, 0x1a, 0x20, 0xc3, 0x5f, 0x80, 0x5c, 0xbe, 0x2b, - 0xc8, 0x95, 0x3f, 0x40, 0x7e, 0x07, 0x24, 0x0d, 0xc8, 0x7f, 0x43, 0x6f, 0xff, 0x93, 0x06, 0x05, - 0xd5, 0x99, 0x74, 0x93, 0x97, 0xe5, 0x8f, 0x21, 0x7d, 0xed, 0x35, 0x1b, 0x37, 0xd4, 0x78, 0x0b, - 0x47, 0x00, 0xbf, 0x37, 0x46, 0x76, 0xb3, 0xa1, 0x14, 0xcd, 0xcd, 0xe6, 0x2a, 0x2b, 0x6a, 0x72, - 0xb8, 0x37, 0xfb, 0x69, 0xe4, 0x66, 0x73, 0x43, 0xbb, 0x9e, 0x1b, 0xda, 0x8f, 0xb9, 0xa1, 0x7d, - 0x5e, 0x18, 0xb9, 0xeb, 0x85, 0x91, 0xfb, 0xb6, 0x30, 0x72, 0xef, 0x37, 0xe3, 0x1b, 0xbf, 0x5f, - 0x54, 0x37, 0xf2, 0xab, 0x5f, 0x01, 0x00, 0x00, 0xff, 0xff, 0x54, 0x79, 0xde, 0xe2, 0x0d, 0x06, - 0x00, 0x00, + // 627 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xbc, 0x95, 0xcf, 0x6e, 0xd3, 0x4c, + 0x14, 0xc5, 0xed, 0x2f, 0x4d, 0x9a, 0xdc, 0x34, 0x6d, 0xbf, 0x21, 0x45, 0x6e, 0x00, 0x63, 0x22, + 0x55, 0x04, 0x09, 0x25, 0x55, 0xa9, 0xd8, 0x21, 0x41, 0x01, 0xa9, 0x8b, 0x22, 0xb5, 0x6e, 0x57, + 0x6c, 0xa2, 0x49, 0x72, 0x95, 0x58, 0x75, 0x66, 0xdc, 0x99, 0x71, 0xff, 0x88, 0x3d, 0x6b, 0xde, + 0x80, 0xf7, 0xe0, 0x09, 0xba, 0xec, 0x92, 0x15, 0x82, 0xf6, 0x45, 0x90, 0xc7, 0x76, 0xb0, 0x4b, + 0x54, 0x35, 0x20, 0xb1, 0x71, 0x3c, 0xe7, 0x9c, 0x9b, 0x3b, 0x77, 0xf2, 0x8b, 0x0d, 0x0f, 0x68, + 0xe0, 0x75, 0x8e, 0x42, 0x14, 0x67, 0xf1, 0x35, 0xe8, 0xc5, 0x9f, 0xed, 0x40, 0x70, 0xc5, 0x49, + 0x49, 0x8d, 0x28, 0xe3, 0xb2, 0x51, 0x1f, 0xf2, 0x21, 0xd7, 0x52, 0x27, 0xba, 0x8b, 0xdd, 0xc6, + 0xaa, 0x54, 0x5c, 0x60, 0x47, 0x5f, 0x83, 0x5e, 0x47, 0x9d, 0x05, 0x28, 0x13, 0xcb, 0xc9, 0x5b, + 0x81, 0xe0, 0xe3, 0x7c, 0xa2, 0xf9, 0xa5, 0x00, 0x0b, 0x7b, 0x51, 0x2b, 0x17, 0x8f, 0x42, 0x94, + 0x8a, 0xd4, 0xa1, 0xa8, 0x5b, 0x5b, 0xa6, 0x63, 0xb6, 0x2a, 0x6e, 0xbc, 0x20, 0x8f, 0x60, 0x41, + 0x79, 0x63, 0xec, 0x4a, 0xec, 0x73, 0x36, 0x90, 0xd6, 0x7f, 0x8e, 0xd9, 0x2a, 0xb8, 0xd5, 0x48, + 0xdb, 0x8f, 0x25, 0xf2, 0x18, 0x96, 0xa2, 0x25, 0x0f, 0xd5, 0x24, 0x55, 0xd0, 0xa9, 0xc5, 0x44, + 0x4e, 0x83, 0x9b, 0x70, 0x77, 0x4c, 0x4f, 0xbb, 0x02, 0x25, 0xf7, 0x43, 0xe5, 0x71, 0x36, 0xc9, + 0xcf, 0xe9, 0x7c, 0x7d, 0x4c, 0x4f, 0xdd, 0x89, 0x99, 0x56, 0xad, 0xc1, 0xa2, 0xc0, 0xc0, 0xf7, + 0xfa, 0xb4, 0xeb, 0xd3, 0x1e, 0xfa, 0xd2, 0x2a, 0x3a, 0x85, 0x56, 0xc5, 0xad, 0x25, 0xea, 0x8e, + 0x16, 0xc9, 0x2b, 0xa8, 0xe9, 0x69, 0xdf, 0x51, 0xd5, 0x1f, 0xa1, 0x90, 0x56, 0xc9, 0x29, 0xb4, + 0xaa, 0x1b, 0x2b, 0xed, 0xf8, 0x08, 0xdb, 0xfb, 0x59, 0x73, 0x6b, 0xee, 0xfc, 0xdb, 0x43, 0xc3, + 0xcd, 0x57, 0x10, 0x07, 0xaa, 0xc8, 0x68, 0xcf, 0xc7, 0x37, 0x38, 0x08, 0x03, 0x6b, 0xde, 0x31, + 0x5b, 0x65, 0x37, 0x2b, 0x91, 0x4d, 0x58, 0x89, 0x97, 0xbb, 0x54, 0x28, 0x8f, 0xfa, 0x2e, 0xca, + 0x80, 0x33, 0x89, 0x56, 0x59, 0x67, 0xa7, 0x9b, 0x64, 0x1d, 0xee, 0xc4, 0x86, 0x3e, 0xef, 0xdd, + 0x50, 0x8e, 0x06, 0xfc, 0x84, 0x59, 0x15, 0x5d, 0x33, 0xcd, 0x22, 0x36, 0x80, 0x3c, 0xf4, 0x82, + 0xd7, 0xa3, 0x90, 0x1d, 0x4a, 0x0b, 0x74, 0x30, 0xa3, 0x34, 0xf7, 0xa0, 0x96, 0x9b, 0x87, 0xbc, + 0x84, 0x9a, 0x3e, 0x9c, 0xc9, 0xf4, 0xa6, 0x9e, 0xbe, 0x9e, 0x4e, 0xbf, 0x93, 0x31, 0xd3, 0xe1, + 0x73, 0x05, 0xcd, 0x63, 0xa8, 0x25, 0x38, 0x24, 0xbb, 0xbe, 0x0f, 0xe5, 0x13, 0x2a, 0x98, 0xc7, + 0x86, 0x32, 0x46, 0x62, 0xdb, 0x70, 0x27, 0x0a, 0x79, 0x01, 0x10, 0xfd, 0xba, 0x12, 0x85, 0x87, + 0x31, 0x15, 0xd5, 0x8d, 0x7b, 0x11, 0x5a, 0x63, 0x54, 0x23, 0x0c, 0x65, 0xb7, 0xcf, 0x83, 0xb3, + 0xf6, 0x81, 0xc6, 0x24, 0x8a, 0x6c, 0x1b, 0x6e, 0xa6, 0x60, 0xab, 0x0c, 0x25, 0x81, 0x32, 0xf4, + 0x55, 0xf3, 0xf3, 0x1c, 0xfc, 0x1f, 0x37, 0xa6, 0x6c, 0x88, 0x37, 0xc3, 0xf8, 0x14, 0x88, 0x54, + 0x54, 0xa8, 0xee, 0x14, 0x24, 0x97, 0xb5, 0x73, 0x90, 0xe1, 0xb2, 0x05, 0xcb, 0xc8, 0x06, 0xf9, + 0x6c, 0x02, 0x26, 0xb2, 0x41, 0x36, 0xf9, 0x04, 0x96, 0x3d, 0xa6, 0x50, 0x1c, 0x53, 0xff, 0x1a, + 0x92, 0x4b, 0xa9, 0x7e, 0x03, 0xec, 0xc5, 0x19, 0x61, 0x2f, 0xcd, 0x04, 0xfb, 0xfc, 0xad, 0x60, + 0x2f, 0xff, 0x2d, 0xec, 0x95, 0x19, 0x60, 0x87, 0x3f, 0x80, 0xbd, 0x7a, 0x5b, 0xd8, 0x17, 0x7e, + 0x83, 0xfd, 0x03, 0x90, 0x2c, 0x20, 0xff, 0x14, 0xcf, 0x8d, 0x8f, 0x26, 0x14, 0x75, 0x77, 0xf2, + 0x3c, 0xbd, 0x99, 0xfc, 0xa9, 0xb2, 0x8f, 0xcf, 0xc6, 0xca, 0x35, 0x35, 0xde, 0xe6, 0xba, 0x49, + 0xde, 0x02, 0xfc, 0xda, 0x3e, 0x59, 0xcd, 0xc7, 0x32, 0xcc, 0x37, 0x1a, 0xd3, 0xac, 0xf4, 0x6b, + 0xb6, 0xd6, 0xce, 0x7f, 0xd8, 0xc6, 0xf9, 0xa5, 0x6d, 0x5e, 0x5c, 0xda, 0xe6, 0xf7, 0x4b, 0xdb, + 0xfc, 0x74, 0x65, 0x1b, 0x17, 0x57, 0xb6, 0xf1, 0xf5, 0xca, 0x36, 0xde, 0xcf, 0x27, 0x6f, 0x8f, + 0x5e, 0x49, 0x3f, 0xdd, 0x9f, 0xfd, 0x0c, 0x00, 0x00, 0xff, 0xff, 0x4f, 0xb6, 0xe9, 0xc9, 0x59, + 0x06, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -299,8 +397,8 @@ const _ = grpc.SupportPackageIsVersion4 // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. type QueryClient interface { - Query(ctx context.Context, in *QueryRequest, opts ...grpc.CallOption) (*QueryResponse, error) - QueryRange(ctx context.Context, in *QueryRangeRequest, opts ...grpc.CallOption) (*QueryRangeResponse, error) + Query(ctx context.Context, in *QueryRequest, opts ...grpc.CallOption) (Query_QueryClient, error) + QueryRange(ctx context.Context, in *QueryRangeRequest, opts ...grpc.CallOption) (Query_QueryRangeClient, error) } type queryClient struct { @@ -311,95 +409,149 @@ func NewQueryClient(cc *grpc.ClientConn) QueryClient { return &queryClient{cc} } -func (c *queryClient) Query(ctx context.Context, in *QueryRequest, opts ...grpc.CallOption) (*QueryResponse, error) { - out := new(QueryResponse) - err := c.cc.Invoke(ctx, "/thanos.Query/Query", in, out, opts...) +func (c *queryClient) Query(ctx context.Context, in *QueryRequest, opts ...grpc.CallOption) (Query_QueryClient, error) { + stream, err := c.cc.NewStream(ctx, &_Query_serviceDesc.Streams[0], "/thanos.Query/Query", opts...) if err != nil { return nil, err } - return out, nil + x := &queryQueryClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Query_QueryClient interface { + Recv() (*QueryResponse, error) + grpc.ClientStream +} + +type queryQueryClient struct { + grpc.ClientStream +} + +func (x *queryQueryClient) Recv() (*QueryResponse, error) { + m := new(QueryResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil } -func (c *queryClient) QueryRange(ctx context.Context, in *QueryRangeRequest, opts ...grpc.CallOption) (*QueryRangeResponse, error) { - out := new(QueryRangeResponse) - err := c.cc.Invoke(ctx, "/thanos.Query/QueryRange", in, out, opts...) +func (c *queryClient) QueryRange(ctx context.Context, in *QueryRangeRequest, opts ...grpc.CallOption) (Query_QueryRangeClient, error) { + stream, err := c.cc.NewStream(ctx, &_Query_serviceDesc.Streams[1], "/thanos.Query/QueryRange", opts...) if err != nil { return nil, err } - return out, nil + x := &queryQueryRangeClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Query_QueryRangeClient interface { + Recv() (*QueryRangeResponse, error) + grpc.ClientStream +} + +type queryQueryRangeClient struct { + grpc.ClientStream +} + +func (x *queryQueryRangeClient) Recv() (*QueryRangeResponse, error) { + m := new(QueryRangeResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil } // QueryServer is the server API for Query service. type QueryServer interface { - Query(context.Context, *QueryRequest) (*QueryResponse, error) - QueryRange(context.Context, *QueryRangeRequest) (*QueryRangeResponse, error) + Query(*QueryRequest, Query_QueryServer) error + QueryRange(*QueryRangeRequest, Query_QueryRangeServer) error } // UnimplementedQueryServer can be embedded to have forward compatible implementations. type UnimplementedQueryServer struct { } -func (*UnimplementedQueryServer) Query(ctx context.Context, req *QueryRequest) (*QueryResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method Query not implemented") +func (*UnimplementedQueryServer) Query(req *QueryRequest, srv Query_QueryServer) error { + return status.Errorf(codes.Unimplemented, "method Query not implemented") } -func (*UnimplementedQueryServer) QueryRange(ctx context.Context, req *QueryRangeRequest) (*QueryRangeResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method QueryRange not implemented") +func (*UnimplementedQueryServer) QueryRange(req *QueryRangeRequest, srv Query_QueryRangeServer) error { + return status.Errorf(codes.Unimplemented, "method QueryRange not implemented") } func RegisterQueryServer(s *grpc.Server, srv QueryServer) { s.RegisterService(&_Query_serviceDesc, srv) } -func _Query_Query_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(QueryRequest) - if err := dec(in); err != nil { - return nil, err +func _Query_Query_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(QueryRequest) + if err := stream.RecvMsg(m); err != nil { + return err } - if interceptor == nil { - return srv.(QueryServer).Query(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/thanos.Query/Query", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(QueryServer).Query(ctx, req.(*QueryRequest)) - } - return interceptor(ctx, in, info, handler) + return srv.(QueryServer).Query(m, &queryQueryServer{stream}) } -func _Query_QueryRange_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(QueryRangeRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(QueryServer).QueryRange(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/thanos.Query/QueryRange", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(QueryServer).QueryRange(ctx, req.(*QueryRangeRequest)) +type Query_QueryServer interface { + Send(*QueryResponse) error + grpc.ServerStream +} + +type queryQueryServer struct { + grpc.ServerStream +} + +func (x *queryQueryServer) Send(m *QueryResponse) error { + return x.ServerStream.SendMsg(m) +} + +func _Query_QueryRange_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(QueryRangeRequest) + if err := stream.RecvMsg(m); err != nil { + return err } - return interceptor(ctx, in, info, handler) + return srv.(QueryServer).QueryRange(m, &queryQueryRangeServer{stream}) +} + +type Query_QueryRangeServer interface { + Send(*QueryRangeResponse) error + grpc.ServerStream +} + +type queryQueryRangeServer struct { + grpc.ServerStream +} + +func (x *queryQueryRangeServer) Send(m *QueryRangeResponse) error { + return x.ServerStream.SendMsg(m) } var _Query_serviceDesc = grpc.ServiceDesc{ ServiceName: "thanos.Query", HandlerType: (*QueryServer)(nil), - Methods: []grpc.MethodDesc{ + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ { - MethodName: "Query", - Handler: _Query_Query_Handler, + StreamName: "Query", + Handler: _Query_Query_Handler, + ServerStreams: true, }, { - MethodName: "QueryRange", - Handler: _Query_QueryRange_Handler, + StreamName: "QueryRange", + Handler: _Query_QueryRange_Handler, + ServerStreams: true, }, }, - Streams: []grpc.StreamDesc{}, Metadata: "api/query/querypb/query.proto", } @@ -568,23 +720,53 @@ func (m *QueryResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l - if len(m.Timeseries) > 0 { - for iNdEx := len(m.Timeseries) - 1; iNdEx >= 0; iNdEx-- { - { - size, err := m.Timeseries[iNdEx].MarshalToSizedBuffer(dAtA[:i]) - if err != nil { - return 0, err - } - i -= size - i = encodeVarintQuery(dAtA, i, uint64(size)) + if m.Result != nil { + { + size := m.Result.Size() + i -= size + if _, err := m.Result.MarshalTo(dAtA[i:]); err != nil { + return 0, err } - i-- - dAtA[i] = 0xa } } return len(dAtA) - i, nil } +func (m *QueryResponse_Warnings) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *QueryResponse_Warnings) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + i -= len(m.Warnings) + copy(dAtA[i:], m.Warnings) + i = encodeVarintQuery(dAtA, i, uint64(len(m.Warnings))) + i-- + dAtA[i] = 0xa + return len(dAtA) - i, nil +} +func (m *QueryResponse_Timeseries) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *QueryResponse_Timeseries) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + if m.Timeseries != nil { + { + size, err := m.Timeseries.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintQuery(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + return len(dAtA) - i, nil +} func (m *QueryRangeRequest) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -723,23 +905,53 @@ func (m *QueryRangeResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l - if len(m.Timeseries) > 0 { - for iNdEx := len(m.Timeseries) - 1; iNdEx >= 0; iNdEx-- { - { - size, err := m.Timeseries[iNdEx].MarshalToSizedBuffer(dAtA[:i]) - if err != nil { - return 0, err - } - i -= size - i = encodeVarintQuery(dAtA, i, uint64(size)) + if m.Result != nil { + { + size := m.Result.Size() + i -= size + if _, err := m.Result.MarshalTo(dAtA[i:]); err != nil { + return 0, err } - i-- - dAtA[i] = 0xa } } return len(dAtA) - i, nil } +func (m *QueryRangeResponse_Warnings) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *QueryRangeResponse_Warnings) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + i -= len(m.Warnings) + copy(dAtA[i:], m.Warnings) + i = encodeVarintQuery(dAtA, i, uint64(len(m.Warnings))) + i-- + dAtA[i] = 0xa + return len(dAtA) - i, nil +} +func (m *QueryRangeResponse_Timeseries) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *QueryRangeResponse_Timeseries) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + if m.Timeseries != nil { + { + size, err := m.Timeseries.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintQuery(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + return len(dAtA) - i, nil +} func encodeVarintQuery(dAtA []byte, offset int, v uint64) int { offset -= sovQuery(v) base := offset @@ -818,15 +1030,34 @@ func (m *QueryResponse) Size() (n int) { } var l int _ = l - if len(m.Timeseries) > 0 { - for _, e := range m.Timeseries { - l = e.Size() - n += 1 + l + sovQuery(uint64(l)) - } + if m.Result != nil { + n += m.Result.Size() } return n } +func (m *QueryResponse_Warnings) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Warnings) + n += 1 + l + sovQuery(uint64(l)) + return n +} +func (m *QueryResponse_Timeseries) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Timeseries != nil { + l = m.Timeseries.Size() + n += 1 + l + sovQuery(uint64(l)) + } + return n +} func (m *QueryRangeRequest) Size() (n int) { if m == nil { return 0 @@ -885,11 +1116,31 @@ func (m *QueryRangeResponse) Size() (n int) { } var l int _ = l - if len(m.Timeseries) > 0 { - for _, e := range m.Timeseries { - l = e.Size() - n += 1 + l + sovQuery(uint64(l)) - } + if m.Result != nil { + n += m.Result.Size() + } + return n +} + +func (m *QueryRangeResponse_Warnings) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Warnings) + n += 1 + l + sovQuery(uint64(l)) + return n +} +func (m *QueryRangeResponse_Timeseries) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Timeseries != nil { + l = m.Timeseries.Size() + n += 1 + l + sovQuery(uint64(l)) } return n } @@ -1299,6 +1550,38 @@ func (m *QueryResponse) Unmarshal(dAtA []byte) error { } switch fieldNum { case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Warnings", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthQuery + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthQuery + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Result = &QueryResponse_Warnings{string(dAtA[iNdEx:postIndex])} + iNdEx = postIndex + case 2: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field Timeseries", wireType) } @@ -1327,10 +1610,11 @@ func (m *QueryResponse) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.Timeseries = append(m.Timeseries, prompb.TimeSeries{}) - if err := m.Timeseries[len(m.Timeseries)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + v := &prompb.TimeSeries{} + if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { return err } + m.Result = &QueryResponse_Timeseries{v} iNdEx = postIndex default: iNdEx = preIndex @@ -1706,6 +1990,38 @@ func (m *QueryRangeResponse) Unmarshal(dAtA []byte) error { } switch fieldNum { case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Warnings", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthQuery + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthQuery + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Result = &QueryRangeResponse_Warnings{string(dAtA[iNdEx:postIndex])} + iNdEx = postIndex + case 2: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field Timeseries", wireType) } @@ -1734,10 +2050,11 @@ func (m *QueryRangeResponse) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.Timeseries = append(m.Timeseries, prompb.TimeSeries{}) - if err := m.Timeseries[len(m.Timeseries)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + v := &prompb.TimeSeries{} + if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { return err } + m.Result = &QueryRangeResponse_Timeseries{v} iNdEx = postIndex default: iNdEx = preIndex diff --git a/pkg/api/query/querypb/query.proto b/pkg/api/query/querypb/query.proto index f2f36f66974..4420b586874 100644 --- a/pkg/api/query/querypb/query.proto +++ b/pkg/api/query/querypb/query.proto @@ -43,7 +43,13 @@ message StoreMatchers { } message QueryResponse { - repeated prometheus_copy.TimeSeries timeseries = 1 [(gogoproto.nullable) = false]; + oneof result { + /// warnings are additional messages coming from the PromQL engine. + string warnings = 1; + + /// timeseries is one series from the result of the executed query. + prometheus_copy.TimeSeries timeseries = 2; + } } message QueryRangeRequest { @@ -67,11 +73,17 @@ message QueryRangeRequest { } message QueryRangeResponse { - repeated prometheus_copy.TimeSeries timeseries = 1 [(gogoproto.nullable) = false]; + oneof result { + /// warnings are additional messages coming from the PromQL engine. + string warnings = 1; + + /// timeseries is one series from the result of the executed query. + prometheus_copy.TimeSeries timeseries = 2; + } } service Query { - rpc Query(QueryRequest) returns (QueryResponse); + rpc Query(QueryRequest) returns (stream QueryResponse); - rpc QueryRange(QueryRangeRequest) returns (QueryRangeResponse); + rpc QueryRange(QueryRangeRequest) returns (stream QueryRangeResponse); } diff --git a/pkg/api/query/querypb/responses.go b/pkg/api/query/querypb/responses.go new file mode 100644 index 00000000000..cd24a5ead8d --- /dev/null +++ b/pkg/api/query/querypb/responses.go @@ -0,0 +1,50 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package querypb + +import ( + "strings" + + "github.com/thanos-io/thanos/pkg/store/storepb/prompb" +) + +func NewQueryResponse(series *prompb.TimeSeries) *QueryResponse { + return &QueryResponse{ + Result: &QueryResponse_Timeseries{ + Timeseries: series, + }, + } +} + +func NewQueryWarningsResponse(errs []error) *QueryResponse { + warnings := make([]string, len(errs)) + for _, err := range errs { + warnings = append(warnings, err.Error()) + } + return &QueryResponse{ + Result: &QueryResponse_Warnings{ + Warnings: strings.Join(warnings, ", "), + }, + } +} + +func NewQueryRangeResponse(series *prompb.TimeSeries) *QueryRangeResponse { + return &QueryRangeResponse{ + Result: &QueryRangeResponse_Timeseries{ + Timeseries: series, + }, + } +} + +func NewQueryRangeWarningsResponse(errs []error) *QueryRangeResponse { + warnings := make([]string, len(errs)) + for _, err := range errs { + warnings = append(warnings, err.Error()) + } + return &QueryRangeResponse{ + Result: &QueryRangeResponse_Warnings{ + Warnings: strings.Join(warnings, ", "), + }, + } +} diff --git a/pkg/testutil/testutil.go b/pkg/testutil/testutil.go index b31caf62405..eb20f6d54cf 100644 --- a/pkg/testutil/testutil.go +++ b/pkg/testutil/testutil.go @@ -13,6 +13,7 @@ import ( "runtime/debug" "sort" "testing" + "time" "github.com/davecgh/go-spew/spew" "github.com/pkg/errors" @@ -317,3 +318,19 @@ func PutOutOfOrderIndex(blockDir string, minTime int64, maxTime int64) error { return iw.Close() } + +func Eventually(interval, timeout time.Duration, f func() error) error { + wait := time.NewTicker(interval).C + done := time.After(timeout) + for { + select { + case <-done: + return fmt.Errorf("failed waiting for condition to become true") + case <-wait: + err := f() + if err == nil { + return nil + } + } + } +} diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go index b387372811d..4efc0438262 100644 --- a/test/e2e/query_test.go +++ b/test/e2e/query_test.go @@ -6,6 +6,7 @@ package e2e_test import ( "context" "fmt" + "io" "net/http" "net/http/httptest" "net/url" @@ -18,15 +19,14 @@ import ( "testing" "time" - "github.com/thanos-io/thanos/pkg/api/query/querypb" - "google.golang.org/grpc" - "gotest.tools/poll" - "github.com/gogo/protobuf/proto" "github.com/golang/snappy" config_util "github.com/prometheus/common/config" "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/storage/remote" + "github.com/thanos-io/thanos/pkg/api/query/querypb" + prompb_copy "github.com/thanos-io/thanos/pkg/store/storepb/prompb" + "google.golang.org/grpc" "github.com/chromedp/cdproto/network" "github.com/chromedp/chromedp" @@ -1364,30 +1364,53 @@ func TestGrpcInstantQuery(t *testing.T) { } for _, query := range queries { - poll.WaitOn(t, func(t poll.LogT) poll.Result { + err := testutil.Eventually(5*time.Second, 1*time.Minute, func() error { result, err := queryClient.Query(ctx, &querypb.QueryRequest{ Query: "my_fake_metric", TimeSeconds: query.time.Unix(), }) if err != nil { - return poll.Error(err) + return err } - if len(result.Timeseries) != 1 { - return poll.Continue("got empty result from querier") + var warnings string + var series []*prompb_copy.TimeSeries + for { + msg, err := result.Recv() + if err == io.EOF { + break + } + + s := msg.GetTimeseries() + if s != nil { + series = append(series, s) + } + w := msg.GetWarnings() + if w != "" { + warnings = w + } } - if len(result.Timeseries[0].Samples) != 1 { - return poll.Continue("got empty timeseries from querier") + if warnings != "" { + return fmt.Errorf("got warnings, expected none") } - if result.Timeseries[0].Samples[0].Value != query.expectedResult { - return poll.Error(errors.New("got invalid result from querier")) + if len(series) != 1 { + return fmt.Errorf("got empty result from querier") } - return poll.Success() - }, poll.WithTimeout(1*time.Minute), poll.WithDelay(5*time.Second)) + if len(series[0].Samples) != 1 { + return fmt.Errorf("got empty timeseries from querier") + } + + if series[0].Samples[0].Value != query.expectedResult { + return fmt.Errorf("got invalid result from querier") + } + + return nil + }) + testutil.Ok(t, err) } } @@ -1447,7 +1470,7 @@ func TestGrpcQueryRange(t *testing.T) { testutil.Ok(t, err) queryClient := querypb.NewQueryClient(grpcConn) - poll.WaitOn(t, func(t poll.LogT) poll.Result { + err = testutil.Eventually(5*time.Second, 1*time.Minute, func() error { result, err := queryClient.QueryRange(ctx, &querypb.QueryRangeRequest{ Query: "my_fake_metric", StartTimeSeconds: now.Unix(), @@ -1456,17 +1479,39 @@ func TestGrpcQueryRange(t *testing.T) { }) if err != nil { - return poll.Error(err) + return err } - if len(result.Timeseries) != 1 { - return poll.Continue("got empty result from querier") + var warnings string + var series []*prompb_copy.TimeSeries + for { + msg, err := result.Recv() + if err == io.EOF { + break + } + + s := msg.GetTimeseries() + if s != nil { + series = append(series, s) + } + w := msg.GetWarnings() + if w != "" { + warnings = w + } + } + if warnings != "" { + return fmt.Errorf("got warnings, expected none") } - if len(result.Timeseries[0].Samples) != 5 { - return poll.Continue("got empty timeseries from querier") + if len(series) != 1 { + return fmt.Errorf("got empty result from querier") } - return poll.Success() - }, poll.WithTimeout(1*time.Minute), poll.WithDelay(5*time.Second)) + if len(series[0].Samples) != 5 { + return fmt.Errorf("got empty timeseries from querier") + } + + return nil + }) + testutil.Ok(t, err) }