Use streaming responses
Signed-off-by: fpetkovski <[email protected]>
fpetkovski committed Mar 29, 2022
1 parent d2e07a3 commit 55deaba
Showing 10 changed files with 673 additions and 211 deletions.
2 changes: 1 addition & 1 deletion cmd/thanos/query.go
@@ -677,7 +677,7 @@ func runQuery(
info.WithTargetsInfoFunc(),
)

grpcAPI := apiv1.NewGrpcAPI(time.Now, queryableCreator, engineCreator, instantDefaultMaxSourceResolution)
grpcAPI := apiv1.NewGRPCAPI(time.Now, queryableCreator, engineCreator, instantDefaultMaxSourceResolution)
s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe,
grpcserver.WithServer(apiv1.RegisterQueryServer(grpcAPI)),
grpcserver.WithServer(store.RegisterStoreServer(proxy)),
46 changes: 30 additions & 16 deletions docs/proposals-done/202203-grpc-query-api.md
@@ -1,23 +1,25 @@
## Introduce a Query gRPC API

* **Owners:**
* `@fpetkovski`
* `@mzardab`
* `@fpetkovski`
* `@mzardab`

> TL;DR: Introducing a new gRPC API for `/query` and `/query_range`
## Why

We want to be able to distinguish between gRPC Store APIs and other Queriers in the query path. Currently, Thanos Query implements the gRPC Store API and the root Querier does not distinguish between Store targets and other Queriers that are capable of processing a PromQL expression before returning the result. The new gRPC Query API will allow a querier to fan out query execution, in addition to Store API selects.

This is useful for a few reasons:
This is useful for a few reasons:

* When Queriers register disjoint Store targets, they should be able to deduplicate series and then execute the query without concerns of duplicate data from other queriers. This new API would enable users to effectively partition by Querier, and avoid shipping raw series back from each disjointed Querier to the root Querier.
* If Queriers register conjoint Store targets, users would be able to express a query sharding strategy between Queriers to more effectively distribute query load amongst a fleet of homogenous Queriers.
* The proposed Query API utilizes gRPC instead of HTTP, which would enable gRPC streaming from root Querier all the way to the underlying Store targets (Query API -> Store API) and unlock the performance benefits of gRPC over HTTP.
* If Queriers register Store targets with overlapping series, users would be able to express a query sharding strategy between Queriers to more effectively distribute query load amongst a fleet of homogenous Queriers.
* The proposed Query API utilizes gRPC instead of HTTP, which would enable gRPC streaming from root Querier all the way to the underlying Store targets (Query API -> Store API) and unlock the performance benefits of gRPC over HTTP.
* When there is only one StoreAPI connected to Thanos Query which completely covers the requested range of the original user's query, then it is more optimal to execute the query directly in the store, instead of sending raw samples to the querier. This scenario is not unlikely given query-frontend's sharding capabilities.

### Pitfalls of the current solution

Thanos Query currently allows for `query` and `query_range` operations through HTTP only. Various query strategies can be implemented using the HTTP API, an analogous gRPC API would allow for a more resource efficient and expressive query execution path. The two main reasons are the streaming capabilities that come out of the box with gRPC, statically typed API spec, as well as the lower bandwidth utilization which protobuf enables.
Thanos Query currently allows for `query` and `query_range` operations through HTTP only. Various query strategies can be implemented using the HTTP API, but an analogous gRPC API would allow for a more resource-efficient and expressive query execution path. The main reasons are the streaming capabilities that come out of the box with gRPC, the statically typed API spec, and the lower bandwidth utilization that protobuf enables.

## Goals
* Introduce a gRPC Query API implementation equivalent to the current Querier HTTP API (`query` for instant queries, `query_range` for range queries)
@@ -26,7 +28,7 @@ Thanos Query currently allows for `query` and `query_range` operations through H

* Implementation of potential query sharding strategies described in this proposal.
* Streaming implementations for `query` and `query_range` rpc's, these will be introduced as additional `QueryStream` and `QueryRangeStream` rpc's subsequently.
* Response series ordering equivalent to the current Prometheus Query HTTP API behaviour
* Response series ordering equivalent to the current Prometheus Query HTTP API behaviour

### Audience
* Thanos Maintainers
@@ -35,11 +37,12 @@ Thanos Query currently allows for `query` and `query_range` operations through H
## How

We propose defining the following gRPC API:

```protobuf
service Query {
rpc Query(QueryRequest) returns (QueryResponse);
rpc QueryRange(QueryRangeRequest) returns (QueryRangeResponse);
rpc Query(QueryRequest) returns (stream QueryResponse);
rpc QueryRange(QueryRangeRequest) returns (stream QueryRangeResponse);
}
```
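
Because both rpcs now return a `stream`, the generated Go service interface changes from unary request/response methods to handlers that push messages onto a per-call server stream. A minimal sketch of what the generated interfaces look like under standard protobuf code-generation conventions; the names match those used by `pkg/api/query/grpc.go` in this changeset, but the exact generated shape shown here is an assumption:

```go
package querypb

import "google.golang.org/grpc"

// Sketch of the service interfaces generated for the streaming rpcs above.
// QueryRequest, QueryResponse, etc. come from the generated .pb.go files.
type QueryServer interface {
	Query(*QueryRequest, Query_QueryServer) error
	QueryRange(*QueryRangeRequest, Query_QueryRangeServer) error
}

// Query_QueryServer wraps the underlying grpc.ServerStream; the handler calls
// Send once per response message instead of returning a single response.
type Query_QueryServer interface {
	Send(*QueryResponse) error
	grpc.ServerStream
}

// Query_QueryRangeServer is the analogous per-call stream for QueryRange.
type Query_QueryRangeServer interface {
	Send(*QueryRangeResponse) error
	grpc.ServerStream
}
```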

@@ -64,7 +67,13 @@ message QueryRequest {
}
message QueryResponse {
repeated prometheus_copy.TimeSeries timeseries = 1 [(gogoproto.nullable) = false];
oneof result {
/// warnings are additional messages coming from the PromQL engine.
string warnings = 1;
/// timeseries is one series from the result of the executed query.
prometheus_copy.TimeSeries timeseries = 2;
}
}
message QueryRangeRequest {
@@ -88,21 +97,26 @@ message QueryRangeRequest {
}
message QueryRangeResponse {
repeated prometheus_copy.TimeSeries timeseries = 1 [(gogoproto.nullable) = false];
}
oneof result {
/// warnings are additional messages coming from the PromQL engine.
string warnings = 1;
/// timeseries is one series from the result of the executed query.
prometheus_copy.TimeSeries timeseries = 2;
}
}
```

The `Query` Service will be implemented by the gRPC server which is started via the `thanos query` command.
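
On the consumer side, a call to the streaming rpc yields a sequence of `QueryResponse` messages, each carrying either engine warnings or a single series via the `result` oneof. A minimal client sketch, assuming the generated constructor `querypb.NewQueryClient`, the standard `GetWarnings`/`GetTimeseries` oneof accessors, an assumed import path, and a hypothetical endpoint address; none of these are copied verbatim from this changeset:

```go
package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"time"

	"google.golang.org/grpc"

	"github.com/thanos-io/thanos/pkg/api/query/querypb" // assumed import path
)

func main() {
	// Hypothetical endpoint; dial options depend on the deployment.
	conn, err := grpc.Dial("thanos-query:10901", grpc.WithInsecure())
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	client := querypb.NewQueryClient(conn)
	stream, err := client.Query(context.Background(), &querypb.QueryRequest{
		Query:       `sum(rate(http_requests_total[5m]))`,
		TimeSeconds: time.Now().Unix(),
	})
	if err != nil {
		log.Fatal(err)
	}

	for {
		resp, err := stream.Recv()
		if err == io.EOF {
			break // server closed the stream: the full result has been received
		}
		if err != nil {
			log.Fatal(err)
		}
		// Each message carries either warnings or one series of the result.
		if w := resp.GetWarnings(); w != "" {
			fmt.Println("warning:", w)
			continue
		}
		fmt.Println("series:", resp.GetTimeseries())
	}
}
```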

## Alternatives

The alternative to expressing a gRPC Query API would be to use the HTTP APIs and distinguish Queriers via configuration on startup. This would be suboptimal for the following reasons:
The alternative to expressing a gRPC Query API would be to use the HTTP APIs and distinguish Queriers via configuration on startup. This would be suboptimal for the following reasons:
* No statically typed API definition; we would need to rely on HTTP API versioning to manage changes to an API that is intended to enable advanced query execution strategies.
* HTTP is not as performant as gRPC/HTTP2: gRPC/HTTP2 allows us to use streaming (less connection overhead) and protobuf (smaller response sizes), while the current HTTP API does not.
* Ergonomics: gRPC allows us to express a functional API with parameters, whereas HTTP requires request parameter marshalling/unmarshalling, which is very error-prone.

## Action Plan

* [x] Define the QueryServer gRPC Service
* [x] Implement the QueryServer gRPC Service in the Thanos Query
* [X] Define the QueryServer gRPC Service
* [X] Implement the QueryServer gRPC Service in the Thanos Query
1 change: 0 additions & 1 deletion go.mod
@@ -96,7 +96,6 @@ require (
gopkg.in/fsnotify.v1 v1.4.7
gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
gotest.tools v2.2.0+incompatible
)

require (
1 change: 0 additions & 1 deletion go.sum
@@ -2678,7 +2678,6 @@ gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c/go.mod h1:K4uyk7z7BCEPqu6E+C
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk=
gotest.tools/v3 v3.0.3/go.mod h1:Z7Lb0S5l+klDB31fvDQX8ss/FlKDxtlFlw3Oa8Ymbl8=
83 changes: 46 additions & 37 deletions pkg/api/query/grpc.go
@@ -15,15 +15,15 @@ import (
"google.golang.org/grpc"
)

type GrpcAPI struct {
type GRPCAPI struct {
now func() time.Time
queryableCreate query.QueryableCreator
queryEngine func(int64) *promql.Engine
defaultMaxResolutionSeconds time.Duration
}

func NewGrpcAPI(now func() time.Time, creator query.QueryableCreator, queryEngine func(int64) *promql.Engine, defaultMaxResolutionSeconds time.Duration) *GrpcAPI {
return &GrpcAPI{
func NewGRPCAPI(now func() time.Time, creator query.QueryableCreator, queryEngine func(int64) *promql.Engine, defaultMaxResolutionSeconds time.Duration) *GRPCAPI {
return &GRPCAPI{
now: now,
queryableCreate: creator,
queryEngine: queryEngine,
@@ -37,10 +37,11 @@ func RegisterQueryServer(queryServer querypb.QueryServer) func(*grpc.Server) {
}
}

func (grpcAPI *GrpcAPI) Query(ctx context.Context, request *querypb.QueryRequest) (*querypb.QueryResponse, error) {
func (g *GRPCAPI) Query(request *querypb.QueryRequest, server querypb.Query_QueryServer) error {
ctx := context.Background()
var ts time.Time
if request.TimeSeconds == 0 {
ts = grpcAPI.now()
ts = g.now()
} else {
ts = time.Unix(request.TimeSeconds, 0)
}
@@ -54,16 +55,16 @@ func (grpcAPI *GrpcAPI) Query(ctx context.Context, request *querypb.QueryRequest

maxResolution := request.MaxResolutionSeconds
if request.MaxResolutionSeconds == 0 {
maxResolution = grpcAPI.defaultMaxResolutionSeconds.Milliseconds() / 1000
maxResolution = g.defaultMaxResolutionSeconds.Milliseconds() / 1000
}

storeMatchers, err := querypb.StoreMatchersToLabelMatchers(request.StoreMatchers)
if err != nil {
return nil, err
return err
}

qe := grpcAPI.queryEngine(request.MaxResolutionSeconds)
queryable := grpcAPI.queryableCreate(
qe := g.queryEngine(request.MaxResolutionSeconds)
queryable := g.queryableCreate(
request.EnableDedup,
request.ReplicaLabels,
storeMatchers,
@@ -74,36 +75,41 @@ func (grpcAPI *GrpcAPI) Query(ctx context.Context, request *querypb.QueryRequest
)
qry, err := qe.NewInstantQuery(queryable, request.Query, ts)
if err != nil {
return nil, err
return err
}

result := qry.Exec(ctx)
if err := server.Send(querypb.NewQueryWarningsResponse(result.Warnings)); err != nil {
return err
}

switch vector := result.Value.(type) {
case promql.Scalar:
return &querypb.QueryResponse{
Timeseries: []prompb.TimeSeries{{
Samples: []prompb.Sample{{Value: vector.V, Timestamp: vector.T}},
}},
}, nil
case promql.Vector:
response := &querypb.QueryResponse{
Timeseries: make([]prompb.TimeSeries, 0, len(vector)),
series := &prompb.TimeSeries{
Samples: []prompb.Sample{{Value: vector.V, Timestamp: vector.T}},
}

if err := server.Send(querypb.NewQueryResponse(series)); err != nil {
return err
}
case promql.Vector:
for _, sample := range vector {
response.Timeseries = append(response.Timeseries, prompb.TimeSeries{
series := &prompb.TimeSeries{
Labels: labelpb.ZLabelsFromPromLabels(sample.Metric),
Samples: prompb.SamplesFromPromqlPoints([]promql.Point{sample.Point}),
})
}
if err := server.Send(querypb.NewQueryResponse(series)); err != nil {
return err
}
}

return response, nil
return nil
}

return nil, nil
return nil
}

func (grpcAPI *GrpcAPI) QueryRange(ctx context.Context, request *querypb.QueryRangeRequest) (*querypb.QueryRangeResponse, error) {
func (g *GRPCAPI) QueryRange(request *querypb.QueryRangeRequest, srv querypb.Query_QueryRangeServer) error {
ctx := context.Background()
if request.TimeoutSeconds != 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Duration(request.TimeoutSeconds))
@@ -112,16 +118,16 @@ func (grpcAPI *GrpcAPI) QueryRange(ctx context.Context, request *querypb.QueryRa

maxResolution := request.MaxResolutionSeconds
if request.MaxResolutionSeconds == 0 {
maxResolution = grpcAPI.defaultMaxResolutionSeconds.Milliseconds() / 1000
maxResolution = g.defaultMaxResolutionSeconds.Milliseconds() / 1000
}

storeMatchers, err := querypb.StoreMatchersToLabelMatchers(request.StoreMatchers)
if err != nil {
return nil, err
return err
}

qe := grpcAPI.queryEngine(request.MaxResolutionSeconds)
queryable := grpcAPI.queryableCreate(
qe := g.queryEngine(request.MaxResolutionSeconds)
queryable := g.queryableCreate(
request.EnableDedup,
request.ReplicaLabels,
storeMatchers,
@@ -137,25 +143,28 @@ func (grpcAPI *GrpcAPI) QueryRange(ctx context.Context, request *querypb.QueryRa

qry, err := qe.NewRangeQuery(queryable, request.Query, startTime, endTime, interval)
if err != nil {
return nil, err
return err
}

result := qry.Exec(ctx)
if err := srv.Send(querypb.NewQueryRangeWarningsResponse(result.Warnings)); err != nil {
return err
}

switch matrix := result.Value.(type) {
case promql.Matrix:
response := &querypb.QueryRangeResponse{
Timeseries: make([]prompb.TimeSeries, len(matrix)),
}

for i, series := range matrix {
response.Timeseries[i] = prompb.TimeSeries{
for _, series := range matrix {
series := &prompb.TimeSeries{
Labels: labelpb.ZLabelsFromPromLabels(series.Metric),
Samples: prompb.SamplesFromPromqlPoints(series.Points),
}
if err := srv.Send(querypb.NewQueryRangeResponse(series)); err != nil {
return err
}
}

return response, nil
return nil
}

return nil, nil
return nil
}
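
The handlers above rely on `querypb.NewQueryResponse`, `querypb.NewQueryWarningsResponse`, and their QueryRange counterparts, which belong to this changeset but are not shown on this page. A minimal sketch of what they presumably do, given the `result` oneof in the proto definition; the oneof wrapper type names follow standard protobuf code generation and, like the import path, are assumptions:

```go
package querypb

import (
	"strings"

	"github.com/thanos-io/thanos/pkg/store/storepb/prompb" // assumed import path
)

// NewQueryResponse wraps a single series in the timeseries arm of the oneof.
func NewQueryResponse(series *prompb.TimeSeries) *QueryResponse {
	return &QueryResponse{
		Result: &QueryResponse_Timeseries{Timeseries: series},
	}
}

// NewQueryWarningsResponse flattens PromQL engine warnings into the warnings arm.
func NewQueryWarningsResponse(warnings []error) *QueryResponse {
	msgs := make([]string, 0, len(warnings))
	for _, w := range warnings {
		msgs = append(msgs, w.Error())
	}
	return &QueryResponse{
		Result: &QueryResponse_Warnings{Warnings: strings.Join(msgs, ", ")},
	}
}

// NewQueryRangeResponse and NewQueryRangeWarningsResponse would mirror the two
// constructors above for the QueryRangeResponse oneof.
```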