Skip to content

Commit

Permalink
Implement Query API discovery
Browse files Browse the repository at this point in the history
A recent commit (thanos-io#5250) added a GRPC API to Thanos Query which allows
executing PromQL over GRPC. This API is currently not discoverable
through endpointsets which makes it hard for other Thanos components
to use this it.

This commit extends endpointsets with a GetQueryAPIClients method
which returns Query API clients to all components which support
this API.
  • Loading branch information
fpetkovski committed Apr 21, 2022
1 parent 4b3f555 commit f9eae8b
Show file tree
Hide file tree
Showing 8 changed files with 318 additions and 44 deletions.
3 changes: 2 additions & 1 deletion cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,9 +675,10 @@ func runQuery(
info.WithRulesInfoFunc(),
info.WithMetricMetadataInfoFunc(),
info.WithTargetsInfoFunc(),
info.WithQueryInfoFunc(),
)

grpcAPI := apiv1.NewGRPCAPI(time.Now, queryableCreator, engineCreator, instantDefaultMaxSourceResolution)
grpcAPI := apiv1.NewGRPCAPI(logger, time.Now, queryableCreator, engineCreator, instantDefaultMaxSourceResolution)
s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe,
grpcserver.WithServer(apiv1.RegisterQueryServer(grpcAPI)),
grpcserver.WithServer(store.RegisterStoreServer(proxy)),
Expand Down
16 changes: 15 additions & 1 deletion pkg/api/query/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ import (
"context"
"time"

"github.com/go-kit/log"
"github.com/thanos-io/thanos/pkg/component"

"github.com/thanos-io/thanos/pkg/info/infopb"

"github.com/prometheus/prometheus/promql"
"github.com/thanos-io/thanos/pkg/api/query/querypb"
"github.com/thanos-io/thanos/pkg/query"
Expand All @@ -16,14 +21,23 @@ import (
)

type GRPCAPI struct {
logger log.Logger
now func() time.Time
queryableCreate query.QueryableCreator
queryEngine func(int64) *promql.Engine
defaultMaxResolutionSeconds time.Duration
}

func NewGRPCAPI(now func() time.Time, creator query.QueryableCreator, queryEngine func(int64) *promql.Engine, defaultMaxResolutionSeconds time.Duration) *GRPCAPI {
func (g *GRPCAPI) Info(ctx context.Context, request *infopb.InfoRequest) (*infopb.InfoResponse, error) {
return &infopb.InfoResponse{
ComponentType: component.QueryAPI.String(),
Query: &infopb.QueryInfo{},
}, nil
}

func NewGRPCAPI(logger log.Logger, now func() time.Time, creator query.QueryableCreator, queryEngine func(int64) *promql.Engine, defaultMaxResolutionSeconds time.Duration) *GRPCAPI {
return &GRPCAPI{
logger: logger,
now: now,
queryableCreate: creator,
queryEngine: queryEngine,
Expand Down
3 changes: 3 additions & 0 deletions pkg/component/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ func FromString(storeType string) StoreAPI {
return Receive
case "debug":
return Debug
case "queryAPI":
return QueryAPI
default:
return UnknownStoreAPI
}
Expand All @@ -124,4 +126,5 @@ var (
Store = storeAPI{component: component{name: "store"}}
UnknownStoreAPI = storeAPI{component: component{name: "unknown-store-api"}}
Query = storeAPI{component: component{name: "query"}}
QueryAPI = storeAPI{component: component{name: "queryAPI"}}
)
18 changes: 18 additions & 0 deletions pkg/info/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type InfoServer struct {
getRulesInfo func() *infopb.RulesInfo
getTargetsInfo func() *infopb.TargetsInfo
getMetricMetadataInfo func() *infopb.MetricMetadataInfo
getQueryInfo func() *infopb.QueryInfo
}

// NewInfoServer creates a new server instance for given component
Expand All @@ -42,6 +43,7 @@ func NewInfoServer(
getRulesInfo: func() *infopb.RulesInfo { return nil },
getTargetsInfo: func() *infopb.TargetsInfo { return nil },
getMetricMetadataInfo: func() *infopb.MetricMetadataInfo { return nil },
getQueryInfo: func() *infopb.QueryInfo { return nil },
}

for _, o := range options {
Expand Down Expand Up @@ -144,6 +146,21 @@ func WithMetricMetadataInfoFunc(getMetricMetadataInfo ...func() *infopb.MetricMe
}
}

// WithQueryInfoFunc determines the function that should be executed to obtain
// the query information. If no function is provided, the default empty
// query info is returned. Only the first function from the list is considered.
func WithQueryInfoFunc(queryInfo ...func() *infopb.QueryInfo) ServerOptionFunc {
if len(queryInfo) == 0 {
return func(s *InfoServer) {
s.getQueryInfo = func() *infopb.QueryInfo { return &infopb.QueryInfo{} }
}
}

return func(s *InfoServer) {
s.getQueryInfo = queryInfo[0]
}
}

// RegisterInfoServer registers the info server.
func RegisterInfoServer(infoSrv infopb.InfoServer) func(*grpc.Server) {
return func(s *grpc.Server) {
Expand All @@ -161,5 +178,6 @@ func (srv *InfoServer) Info(ctx context.Context, req *infopb.InfoRequest) (*info
Rules: srv.getRulesInfo(),
Targets: srv.getTargetsInfo(),
MetricMetadata: srv.getMetricMetadataInfo(),
Query: srv.getQueryInfo(),
}, nil
}
Loading

0 comments on commit f9eae8b

Please sign in to comment.