Skip to content

Commit

Permalink
Implement GRPC query API
Browse files Browse the repository at this point in the history
With the current GRPC APIs, layering Thanos Queriers results in
the root querier getting all of the samples and executing the query
in memory. As a result, the intermediary Queriers do not do any
intensive work and merely transport samples from the Stores to the
root Querier.

When data is perfectly sharded, users can implement a pattern where
the root Querier instructs the intermediary ones to execute the queries
from their stores and return back results. The results can then be
concatenated by the root querier and returned to the user.

In order to support this use case, this commit implements a GRPC API
in the Querier which is analogous to the HTTP Query API exposed
by Prometheus.

Signed-off-by: fpetkovski <[email protected]>
  • Loading branch information
fpetkovski committed Mar 29, 2022
1 parent 2b87a36 commit f1887a6
Show file tree
Hide file tree
Showing 13 changed files with 2,878 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
### Added

- [#5220](https://github.com/thanos-io/thanos/pull/5220) Query Frontend: Add `--query-frontend.forward-header` flag, forward headers to downstream querier.
- [#5250](https://github.com/thanos-io/thanos/pull/5250/files) Querier: Expose Query and QueryRange APIs through GRPC.

### Changed

Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ else ifeq ($(arch), armv8)
else ifeq ($(arch), arm64)
# arm64
BASE_DOCKER_SHA=${arm64}
else ifeq ($(arch), aarch64)
# arm64
BASE_DOCKER_SHA=${arm64}
else
echo >&2 "only support amd64 or arm64 arch" && exit 1
endif
Expand Down
9 changes: 6 additions & 3 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"

v1 "github.com/thanos-io/thanos/pkg/api/query"
apiv1 "github.com/thanos-io/thanos/pkg/api/query"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/discovery/cache"
Expand Down Expand Up @@ -574,6 +574,7 @@ func runQuery(
grpcProbe,
prober.NewInstrumentation(comp, logger, extprom.WrapRegistererWithPrefix("thanos_", reg)),
)
engineCreator := engineFactory(promql.NewEngine, engineOpts, dynamicLookbackDelta)

// Start query API + UI HTTP server.
{
Expand All @@ -600,10 +601,10 @@ func runQuery(
// TODO(bplotka in PR #513 review): pass all flags, not only the flags needed by prefix rewriting.
ui.NewQueryUI(logger, endpoints, webExternalPrefix, webPrefixHeaderName, alertQueryURL).Register(router, ins)

api := v1.NewQueryAPI(
api := apiv1.NewQueryAPI(
logger,
endpoints.GetEndpointStatus,
engineFactory(promql.NewEngine, engineOpts, dynamicLookbackDelta),
engineCreator,
queryableCreator,
// NOTE: Will share the same replica label as the query for now.
rules.NewGRPCClientWithDedup(rulesProxy, queryReplicaLabels),
Expand Down Expand Up @@ -676,7 +677,9 @@ func runQuery(
info.WithTargetsInfoFunc(),
)

grpcAPI := apiv1.NewGRPCAPI(time.Now, queryableCreator, engineCreator, instantDefaultMaxSourceResolution)
s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe,
grpcserver.WithServer(apiv1.RegisterQueryServer(grpcAPI)),
grpcserver.WithServer(store.RegisterStoreServer(proxy)),
grpcserver.WithServer(rules.RegisterRulesServer(rulesProxy)),
grpcserver.WithServer(targets.RegisterTargetsServer(targetsProxy)),
Expand Down
122 changes: 122 additions & 0 deletions docs/proposals-done/202203-grpc-query-api.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
## Introduce a Query gRPC API

* **Owners:**
* `@fpetkovski`
* `@mzardab`

> TL;DR: Introducing a new gRPC API for `/query` and `/query_range`
## Why

We want to be able to distinguish between gRPC Store APIs and other Queriers in the query path. Currently, Thanos Query implements the gRPC Store API and the root Querier does not distinguish between Store targets and other Queriers that are capable of processing a PromQL expression before returning the result. The new gRPC Query API will allow a querier to fan out query execution, in addition to Store API selects.

This is useful for a few reasons:

* When Queriers register disjoint Store targets, they should be able to deduplicate series and then execute the query without concerns of duplicate data from other queriers. This new API would enable users to effectively partition by Querier, and avoid shipping raw series back from each disjointed Querier to the root Querier.
* If Queriers register Store targets with overlapping series, users would be able to express a query sharding strategy between Queriers to more effectively distribute query load amongst a fleet of homogenous Queriers.
* The proposed Query API utilizes gRPC instead of HTTP, which would enable gRPC streaming from root Querier all the way to the underlying Store targets (Query API -> Store API) and unlock the performance benefits of gRPC over HTTP.
* When there is only one StoreAPI connected to Thanos Query which completely covers the requested range of the original user's query, then it is more optimal to execute the query directly in the store, instead of sending raw samples to the querier. This scenario is not unlikely given query-frontend's sharding capabilities.

### Pitfalls of the current solution

Thanos Query currently allows for `query` and `query_range` operations through HTTP only. Various query strategies can be implemented using the HTTP API, an analogous gRPC API would allow for a more resource efficient and expressive query execution path. The two main reasons are the streaming capabilities that come out of the box with gRPC, statically typed API spec, as well as the lower bandwidth utilization which protobuf enables.

## Goals
* Introduce a gRPC Query API implementation equivalent to the current Querier HTTP API (`query` for instant queries, `query_range` for range queries)

## Non-Goals

* Implementation of potential query sharding strategies described in this proposal.
* Streaming implementations for `query` and `query_range` rpc's, these will be introduced as additional `QueryStream` and `QueryRangeStream` rpc's subsequently.
* Response series ordering equivalent to the current Prometheus Query HTTP API behaviour

### Audience
* Thanos Maintainers
* Thanos Users

## How

We propose defining the following gRPC API:

```protobuf
service Query {
rpc Query(QueryRequest) returns (stream QueryResponse);
rpc QueryRange(QueryRangeRequest) returns (stream QueryRangeResponse);
}
```

Where the `QueryRequest`, `QueryResponse`, `QueryRangeRequest` and `Query RangeResponse` are defined as follows:

```protobuf
message QueryRequest {
string query = 1;
int64 time_seconds = 2;
int64 timeout_seconds = 3;
int64 max_resolution_seconds = 4;
repeated string replica_labels = 5;
repeated StoreMatchers storeMatchers = 6 [(gogoproto.nullable) = false];
bool enableDedup = 7;
bool enablePartialResponse = 8;
bool enableQueryPushdown = 9;
bool skipChunks = 10;
}
message QueryResponse {
oneof result {
/// warnings are additional messages coming from the PromQL engine.
string warnings = 1;
/// timeseries is one series from the result of the executed query.
prometheus_copy.TimeSeries timeseries = 2;
}
}
message QueryRangeRequest {
string query = 1;
int64 start_time_seconds = 2;
int64 end_time_seconds = 3;
int64 interval_seconds = 4;
int64 timeout_seconds = 5;
int64 max_resolution_seconds = 6;
repeated string replica_labels = 7;
repeated StoreMatchers storeMatchers = 8 [(gogoproto.nullable) = false];
bool enableDedup = 9;
bool enablePartialResponse = 10;
bool enableQueryPushdown = 11;
bool skipChunks = 12;
}
message QueryRangeResponse {
oneof result {
/// warnings are additional messages coming from the PromQL engine.
string warnings = 1;
/// timeseries is one series from the result of the executed query.
prometheus_copy.TimeSeries timeseries = 2;
}
}
```

The `Query` Service will be implemented by the gRPC server which is started via the `thanos query` command.

## Alternatives

The alternative to expressing a gRPC Query API would be to use the HTTP APIs and distinguish Queriers via configuration on startup. This would be suboptimal for the following reasons:
* No statically typed API definition, we would need to rely on HTTP API versioning to manage changes to the API that is intended to enable advanced query execution strategies.
* HTTP not as performant as gRPC/HTTP2, gRPC/HTTP2 allows us to use streaming(less connection overhead) and protobuf(smaller response sizes), the current HTTP API does not.
* Ergonomics, gRPC allows us to express a functional API with parameters, HTTP requires request parameter marshalling/unmarshalling which is very error-prone.

## Action Plan

* [X] Define the QueryServer gRPC Service
* [X] Implement the QueryServer gRPC Service in the Thanos Query
170 changes: 170 additions & 0 deletions pkg/api/query/grpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package v1

import (
"context"
"time"

"github.com/prometheus/prometheus/promql"
"github.com/thanos-io/thanos/pkg/api/query/querypb"
"github.com/thanos-io/thanos/pkg/query"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb/prompb"
"google.golang.org/grpc"
)

type GRPCAPI struct {
now func() time.Time
queryableCreate query.QueryableCreator
queryEngine func(int64) *promql.Engine
defaultMaxResolutionSeconds time.Duration
}

func NewGRPCAPI(now func() time.Time, creator query.QueryableCreator, queryEngine func(int64) *promql.Engine, defaultMaxResolutionSeconds time.Duration) *GRPCAPI {
return &GRPCAPI{
now: now,
queryableCreate: creator,
queryEngine: queryEngine,
defaultMaxResolutionSeconds: defaultMaxResolutionSeconds,
}
}

func RegisterQueryServer(queryServer querypb.QueryServer) func(*grpc.Server) {
return func(s *grpc.Server) {
querypb.RegisterQueryServer(s, queryServer)
}
}

func (g *GRPCAPI) Query(request *querypb.QueryRequest, server querypb.Query_QueryServer) error {
ctx := context.Background()
var ts time.Time
if request.TimeSeconds == 0 {
ts = g.now()
} else {
ts = time.Unix(request.TimeSeconds, 0)
}

if request.TimeoutSeconds != 0 {
var cancel context.CancelFunc
timeout := time.Duration(request.TimeoutSeconds) * time.Second
ctx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
}

maxResolution := request.MaxResolutionSeconds
if request.MaxResolutionSeconds == 0 {
maxResolution = g.defaultMaxResolutionSeconds.Milliseconds() / 1000
}

storeMatchers, err := querypb.StoreMatchersToLabelMatchers(request.StoreMatchers)
if err != nil {
return err
}

qe := g.queryEngine(request.MaxResolutionSeconds)
queryable := g.queryableCreate(
request.EnableDedup,
request.ReplicaLabels,
storeMatchers,
maxResolution,
request.EnablePartialResponse,
request.EnableQueryPushdown,
false,
)
qry, err := qe.NewInstantQuery(queryable, request.Query, ts)
if err != nil {
return err
}

result := qry.Exec(ctx)
if err := server.Send(querypb.NewQueryWarningsResponse(result.Warnings)); err != nil {
return nil
}

switch vector := result.Value.(type) {
case promql.Scalar:
series := &prompb.TimeSeries{
Samples: []prompb.Sample{{Value: vector.V, Timestamp: vector.T}},
}
if err := server.Send(querypb.NewQueryResponse(series)); err != nil {
return err
}
case promql.Vector:
for _, sample := range vector {
series := &prompb.TimeSeries{
Labels: labelpb.ZLabelsFromPromLabels(sample.Metric),
Samples: prompb.SamplesFromPromqlPoints([]promql.Point{sample.Point}),
}
if err := server.Send(querypb.NewQueryResponse(series)); err != nil {
return err
}
}

return nil
}

return nil
}

func (g *GRPCAPI) QueryRange(request *querypb.QueryRangeRequest, srv querypb.Query_QueryRangeServer) error {
ctx := context.Background()
if request.TimeoutSeconds != 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Duration(request.TimeoutSeconds))
defer cancel()
}

maxResolution := request.MaxResolutionSeconds
if request.MaxResolutionSeconds == 0 {
maxResolution = g.defaultMaxResolutionSeconds.Milliseconds() / 1000
}

storeMatchers, err := querypb.StoreMatchersToLabelMatchers(request.StoreMatchers)
if err != nil {
return err
}

qe := g.queryEngine(request.MaxResolutionSeconds)
queryable := g.queryableCreate(
request.EnableDedup,
request.ReplicaLabels,
storeMatchers,
maxResolution,
request.EnablePartialResponse,
request.EnableQueryPushdown,
false,
)

startTime := time.Unix(request.StartTimeSeconds, 0)
endTime := time.Unix(request.EndTimeSeconds, 0)
interval := time.Duration(request.IntervalSeconds) * time.Second

qry, err := qe.NewRangeQuery(queryable, request.Query, startTime, endTime, interval)
if err != nil {
return err
}

result := qry.Exec(ctx)
if err := srv.Send(querypb.NewQueryRangeWarningsResponse(result.Warnings)); err != nil {
return err
}

switch matrix := result.Value.(type) {
case promql.Matrix:
for _, series := range matrix {
series := &prompb.TimeSeries{
Labels: labelpb.ZLabelsFromPromLabels(series.Metric),
Samples: prompb.SamplesFromPromqlPoints(series.Points),
}
if err := srv.Send(querypb.NewQueryRangeResponse(series)); err != nil {
return err
}
}

return nil
}

return nil
}
Loading

0 comments on commit f1887a6

Please sign in to comment.