Skip to content

Commit

Permalink
feat: Add baseline rf1 querier implementation (#13639)
Browse files Browse the repository at this point in the history
Co-authored-by: Cyril Tovena <[email protected]>
  • Loading branch information
benclive and cyriltovena authored Jul 24, 2024
1 parent 7ed63ea commit 3a99b69
Show file tree
Hide file tree
Showing 8 changed files with 1,496 additions and 7 deletions.
25 changes: 25 additions & 0 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,31 @@ Pass the `-config.expand-env` flag at the command line to enable this way of set
# querier.
[querier: <querier>]

querier_rf1:
# Enable the RF1 querier. If set, replaces the usual querier with a RF-1
# querier when using 'ALL' target.
# CLI flag: -querier-rf1.enabled
[enabled: <boolean> | default = false]

# Time to wait before sending more than the minimum successful query requests.
# CLI flag: -querier-rf1.extra-query-delay
[extra_query_delay: <duration> | default = 0s]

engine:
# The maximum amount of time to look back for log lines. Used only for
# instant log queries.
# CLI flag: -querier-rf1.engine.max-lookback-period
[max_look_back_period: <duration> | default = 30s]

# The maximum number of queries that can be simultaneously processed by the
# querier.
# CLI flag: -querier-rf1.max-concurrent
[max_concurrent: <int> | default = 4]

# When true, querier limits sent via a header are enforced.
# CLI flag: -querier-rf1.per-request-limits-enabled
[per_request_limits_enabled: <boolean> | default = false]

# The query_scheduler block configures the Loki query scheduler. When configured
# it separates the tenant query queues from the query-frontend.
[query_scheduler: <query_scheduler>]
Expand Down
3 changes: 3 additions & 0 deletions pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import (
"github.com/grafana/loki/v3/pkg/lokifrontend/frontend/transport"
"github.com/grafana/loki/v3/pkg/pattern"
"github.com/grafana/loki/v3/pkg/querier"
querierrf1 "github.com/grafana/loki/v3/pkg/querier-rf1"
"github.com/grafana/loki/v3/pkg/querier/queryrange"
"github.com/grafana/loki/v3/pkg/querier/queryrange/queryrangebase"
"github.com/grafana/loki/v3/pkg/querier/worker"
Expand Down Expand Up @@ -86,6 +87,7 @@ type Config struct {
InternalServer internalserver.Config `yaml:"internal_server,omitempty" doc:"hidden"`
Distributor distributor.Config `yaml:"distributor,omitempty"`
Querier querier.Config `yaml:"querier,omitempty"`
QuerierRF1 querierrf1.Config `yaml:"querier_rf1,omitempty"`
QueryScheduler scheduler.Config `yaml:"query_scheduler"`
Frontend lokifrontend.Config `yaml:"frontend,omitempty"`
QueryRange queryrange.Config `yaml:"query_range,omitempty"`
Expand Down Expand Up @@ -164,6 +166,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
c.Common.RegisterFlags(f)
c.Distributor.RegisterFlags(f)
c.Querier.RegisterFlags(f)
c.QuerierRF1.RegisterFlags(f)
c.CompactorHTTPClient.RegisterFlags(f)
c.CompactorGRPCClient.RegisterFlags(f)
c.IngesterClient.RegisterFlags(f)
Expand Down
23 changes: 16 additions & 7 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ import (
"github.com/grafana/loki/v3/pkg/lokifrontend/frontend/v2/frontendv2pb"
"github.com/grafana/loki/v3/pkg/pattern"
"github.com/grafana/loki/v3/pkg/querier"
querierrf1 "github.com/grafana/loki/v3/pkg/querier-rf1"
"github.com/grafana/loki/v3/pkg/querier/queryrange"
"github.com/grafana/loki/v3/pkg/querier/queryrange/queryrangebase"
"github.com/grafana/loki/v3/pkg/ruler"
Expand Down Expand Up @@ -412,21 +413,29 @@ func (t *Loki) initQuerier() (services.Service, error) {
return nil, err
}

q, err := querier.New(t.Cfg.Querier, t.Store, t.ingesterQuerier, t.Overrides, deleteStore, prometheus.DefaultRegisterer, logger)
if err != nil {
return nil, err
if t.Cfg.QuerierRF1.Enabled {
logger.Log("Using RF-1 querier implementation")
t.Querier, err = querierrf1.New(t.Cfg.QuerierRF1, t.Store, t.Overrides, deleteStore, logger)
if err != nil {
return nil, err
}
} else {
t.Querier, err = querier.New(t.Cfg.Querier, t.Store, t.ingesterQuerier, t.Overrides, deleteStore, prometheus.DefaultRegisterer, logger)
if err != nil {
return nil, err
}
}

if t.Cfg.Pattern.Enabled {
patternQuerier, err := pattern.NewIngesterQuerier(t.Cfg.Pattern, t.PatternRingClient, t.Cfg.MetricsNamespace, prometheus.DefaultRegisterer, util_log.Logger)
if err != nil {
return nil, err
}
q.WithPatternQuerier(patternQuerier)
t.Querier.WithPatternQuerier(patternQuerier)
}

if t.Cfg.Querier.MultiTenantQueriesEnabled {
t.Querier = querier.NewMultiTenantQuerier(q, util_log.Logger)
} else {
t.Querier = q
t.Querier = querier.NewMultiTenantQuerier(t.Querier, util_log.Logger)
}

querierWorkerServiceConfig := querier.WorkerServiceConfig{
Expand Down
149 changes: 149 additions & 0 deletions pkg/querier-rf1/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package querierrf1

import (
"context"
"fmt"
"net/http"

"github.com/grafana/loki/v3/pkg/loghttp"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/querier/queryrange"
"github.com/grafana/loki/v3/pkg/querier/queryrange/queryrangebase"
)

type Handler struct {
api *QuerierAPI
}

func NewQuerierHandler(api *QuerierAPI) *Handler {
return &Handler{
api: api,
}
}

func (h *Handler) Do(ctx context.Context, req queryrangebase.Request) (queryrangebase.Response, error) {

switch concrete := req.(type) {
case *queryrange.LokiRequest:
res, err := h.api.RangeQueryHandler(ctx, concrete)
if err != nil {
return nil, err
}

params, err := queryrange.ParamsFromRequest(req)
if err != nil {
return nil, err
}

return queryrange.ResultToResponse(res, params)
case *queryrange.LokiInstantRequest:
res, err := h.api.InstantQueryHandler(ctx, concrete)
if err != nil {
return nil, err
}

params, err := queryrange.ParamsFromRequest(req)
if err != nil {
return nil, err
}

return queryrange.ResultToResponse(res, params)
case *queryrange.LokiSeriesRequest:
request := &logproto.SeriesRequest{
Start: concrete.StartTs,
End: concrete.EndTs,
Groups: concrete.Match,
Shards: concrete.Shards,
}
result, statResult, err := h.api.SeriesHandler(ctx, request)
if err != nil {
return nil, err
}

return &queryrange.LokiSeriesResponse{
Status: "success",
Version: uint32(loghttp.VersionV1),
Data: result.Series,
Statistics: statResult,
}, nil
case *queryrange.LabelRequest:
res, err := h.api.LabelHandler(ctx, &concrete.LabelRequest)
if err != nil {
return nil, err
}

return &queryrange.LokiLabelNamesResponse{
Status: "success",
Version: uint32(loghttp.VersionV1),
Data: res.Values,
}, nil
case *logproto.IndexStatsRequest:
request := loghttp.NewRangeQueryWithDefaults()
request.Start = concrete.From.Time()
request.End = concrete.Through.Time()
request.Query = concrete.GetQuery()
request.UpdateStep()

result, err := h.api.IndexStatsHandler(ctx, request)
if err != nil {
return nil, err
}
return &queryrange.IndexStatsResponse{Response: result}, nil
case *logproto.ShardsRequest:
request := loghttp.NewRangeQueryWithDefaults()
request.Start = concrete.From.Time()
request.End = concrete.Through.Time()
request.Query = concrete.GetQuery()
request.UpdateStep()
result, err := h.api.IndexShardsHandler(ctx, request, concrete.TargetBytesPerShard)
if err != nil {
return nil, err
}
return &queryrange.ShardsResponse{Response: result}, nil

case *logproto.VolumeRequest:
result, err := h.api.VolumeHandler(ctx, concrete)
if err != nil {
return nil, err
}
return &queryrange.VolumeResponse{Response: result}, nil
case *queryrange.DetectedFieldsRequest:
result, err := h.api.DetectedFieldsHandler(ctx, &concrete.DetectedFieldsRequest)
if err != nil {
return nil, err
}

return &queryrange.DetectedFieldsResponse{
Response: result,
}, nil
case *logproto.QueryPatternsRequest:
result, err := h.api.PatternsHandler(ctx, concrete)
if err != nil {
return nil, err
}
return &queryrange.QueryPatternsResponse{
Response: result,
}, nil
case *queryrange.DetectedLabelsRequest:
result, err := h.api.DetectedLabelsHandler(ctx, &concrete.DetectedLabelsRequest)
if err != nil {
return nil, err
}

return &queryrange.DetectedLabelsResponse{Response: result}, nil
case *logproto.QuerySamplesRequest:
result, err := h.api.SamplesHandler(ctx, concrete)
if err != nil {
return nil, err
}
return &queryrange.QuerySamplesResponse{
Response: result,
}, nil
default:
return nil, fmt.Errorf("unsupported query type %T", req)
}
}

func NewQuerierHTTPHandler(h *Handler) http.Handler {
return queryrange.NewSerializeHTTPHandler(h, queryrange.DefaultCodec)
}
Loading

0 comments on commit 3a99b69

Please sign in to comment.