Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Querier: Split gRPC client into two. #12726

Merged
merged 18 commits into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -3557,10 +3557,23 @@ The `frontend_worker` configures the worker - running within the Loki querier -
# CLI flag: -querier.id
[id: <string> | default = ""]

# The grpc_client block configures the gRPC client used to communicate between a
# client and server component in Loki.
# Configures the gRPC client used to communicate between the querier and the
salvacorts marked this conversation as resolved.
Show resolved Hide resolved
# query-frontend. If 'use_separated_clients' is false, this config is used for
# communicating with both frontend and scheduler.
# The CLI flags prefix for this block configuration is: querier.frontend-client
[grpc_client_config: <grpc_client>]

# Configures the gRPC client used to communicate between the querier and the
# query-scheduler. If 'use_separated_clients' is false, this config is ignored.
# The CLI flags prefix for this block configuration is: querier.scheduler-client
[query_scheduler_grpc_client_config: <grpc_client>]

# If set to true, querier will use 'query_scheduler_grpc_client_config' to
# communicate with the scheduler. Otherwise,
# 'query_scheduler_grpc_client_config' is ignored and 'grpc_client_config' is
# used instead.
# CLI flag: -querier.use-separated-grpc-clients
[uses_separated_clients: <boolean> | default = false]
```

### table_manager
Expand Down Expand Up @@ -4543,6 +4556,7 @@ The `grpc_client` block configures the gRPC client used to communicate between a
- `ingester.client`
- `pattern-ingester.client`
- `querier.frontend-client`
- `querier.scheduler-client`
- `query-scheduler.grpc-client-config`
- `ruler.client`
- `tsdb.shipper.index-gateway-client.grpc`
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/worker/frontend_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func newFrontendProcessor(cfg Config, handler RequestHandler, log log.Logger, co
log: log,
handler: handler,
codec: codec,
maxMessageSize: cfg.GRPCClientConfig.MaxSendMsgSize,
maxMessageSize: cfg.QueryFrontendGRPCClientConfig.MaxSendMsgSize,
querierID: cfg.QuerierID,
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/querier/worker/scheduler_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ func newSchedulerProcessor(cfg Config, handler RequestHandler, log log.Logger, m
log: log,
handler: handler,
codec: codec,
maxMessageSize: cfg.GRPCClientConfig.MaxSendMsgSize,
maxMessageSize: cfg.QuerySchedulerGRPCClientConfig.MaxSendMsgSize,
querierID: cfg.QuerierID,
grpcConfig: cfg.GRPCClientConfig,
grpcConfig: cfg.QuerySchedulerGRPCClientConfig,
schedulerClientFactory: func(conn *grpc.ClientConn) schedulerpb.SchedulerForQuerierClient {
return schedulerpb.NewSchedulerForQuerierClient(conn)
},
Expand Down
58 changes: 41 additions & 17 deletions pkg/querier/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,33 @@ type Config struct {

QuerierID string `yaml:"id"`

GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"`
QueryFrontendGRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=Configures the gRPC client used to communicate between the querier and the query-frontend. If 'use_separated_clients' is false, this config is used for communicating with both frontend and scheduler."`

QuerySchedulerGRPCClientConfig grpcclient.Config `yaml:"query_scheduler_grpc_client_config" doc:"description=Configures the gRPC client used to communicate between the querier and the query-scheduler. If 'use_separated_clients' is false, this config is ignored."`

UseSeparatedGRPCClients bool `yaml:"uses_separated_clients" doc:"description=If set to true, querier will use 'query_scheduler_grpc_client_config' to communicate with the scheduler. Otherwise, 'query_scheduler_grpc_client_config' is ignored and 'grpc_client_config' is used instead."`
}

func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.SchedulerAddress, "querier.scheduler-address", "", "Hostname (and port) of scheduler that querier will periodically resolve, connect to and receive queries from. Only one of -querier.frontend-address or -querier.scheduler-address can be set. If neither is set, queries are only received via HTTP endpoint.")
f.StringVar(&cfg.FrontendAddress, "querier.frontend-address", "", "Address of query frontend service, in host:port format. If -querier.scheduler-address is set as well, querier will use scheduler instead. Only one of -querier.frontend-address or -querier.scheduler-address can be set. If neither is set, queries are only received via HTTP endpoint.")
f.DurationVar(&cfg.DNSLookupPeriod, "querier.dns-lookup-period", 3*time.Second, "How often to query DNS for query-frontend or query-scheduler address. Also used to determine how often to poll the scheduler-ring for addresses if the scheduler-ring is configured.")
f.StringVar(&cfg.QuerierID, "querier.id", "", "Querier ID, sent to frontend service to identify requests from the same querier. Defaults to hostname.")
f.BoolVar(&cfg.UseSeparatedGRPCClients, "querier.use-separated-grpc-clients", false, "Whether to use separated clients for frontend and scheduler. If set to true, querier will use separated clients for frontend and scheduler. If set to false, querier will use the same client for both frontend and scheduler.")

cfg.GRPCClientConfig.RegisterFlagsWithPrefix("querier.frontend-client", f)
cfg.QueryFrontendGRPCClientConfig.RegisterFlagsWithPrefix("querier.frontend-client", f)
cfg.QuerySchedulerGRPCClientConfig.RegisterFlagsWithPrefix("querier.scheduler-client", f)
}

func (cfg *Config) Validate() error {
if cfg.FrontendAddress != "" && cfg.SchedulerAddress != "" {
return errors.New("frontend address and scheduler address are mutually exclusive, please use only one")
}
return cfg.GRPCClientConfig.Validate()
if err := cfg.QueryFrontendGRPCClientConfig.Validate(); err != nil {
return err
}

return cfg.QuerySchedulerGRPCClientConfig.Validate()
}

// Handler for HTTP requests wrapped in protobuf messages.
Expand Down Expand Up @@ -80,7 +90,6 @@ type processor interface {
type querierWorker struct {
*services.BasicService

cfg Config
logger log.Logger

processor processor
Expand All @@ -92,9 +101,19 @@ type querierWorker struct {
managers map[string]*processorManager

metrics *Metrics

grpcClientConfig grpcclient.Config
maxConcurrentRequests int
}

func NewQuerierWorker(cfg Config, rng ring.ReadRing, handler RequestHandler, logger log.Logger, reg prometheus.Registerer, codec RequestCodec) (services.Service, error) {
if !cfg.UseSeparatedGRPCClients {
level.Warn(logger).Log("msg", "Using the same client for frontend and scheduler. This is deprecated and will be removed in the future. Please use separated clients for frontend and scheduler.")
salvacorts marked this conversation as resolved.
Show resolved Hide resolved
// The frontend client config is the older config and it points to "grpc_client_config" so we
// reuse it for the scheduler client config.
cfg.QuerySchedulerGRPCClientConfig = cfg.QueryFrontendGRPCClientConfig
}

if cfg.QuerierID == "" {
hostname, err := os.Hostname()
if err != nil {
Expand All @@ -105,43 +124,48 @@ func NewQuerierWorker(cfg Config, rng ring.ReadRing, handler RequestHandler, log

metrics := NewMetrics(cfg, reg)
var processor processor
var grpcCfg grpcclient.Config
var servs []services.Service
var address string

switch {
case rng != nil:
level.Info(logger).Log("msg", "Starting querier worker using query-scheduler and scheduler ring for addresses")
grpcCfg = cfg.QuerySchedulerGRPCClientConfig
processor, servs = newSchedulerProcessor(cfg, handler, logger, metrics, codec)
case cfg.SchedulerAddress != "":
level.Info(logger).Log("msg", "Starting querier worker connected to query-scheduler", "scheduler", cfg.SchedulerAddress)

grpcCfg = cfg.QuerySchedulerGRPCClientConfig
address = cfg.SchedulerAddress
processor, servs = newSchedulerProcessor(cfg, handler, logger, metrics, codec)

case cfg.FrontendAddress != "":
level.Info(logger).Log("msg", "Starting querier worker connected to query-frontend", "frontend", cfg.FrontendAddress)

address = cfg.FrontendAddress
grpcCfg = cfg.QueryFrontendGRPCClientConfig
processor = newFrontendProcessor(cfg, handler, logger, codec)
default:
return nil, errors.New("unable to start the querier worker, need to configure one of frontend_address, scheduler_address, or a ring config in the query_scheduler config block")
}

return newQuerierWorkerWithProcessor(cfg, metrics, logger, processor, address, rng, servs)
return newQuerierWorkerWithProcessor(grpcCfg, cfg.MaxConcurrent, cfg.DNSLookupPeriod, metrics, logger, processor, address, rng, servs)
}

func newQuerierWorkerWithProcessor(cfg Config, metrics *Metrics, logger log.Logger, processor processor, address string, ring ring.ReadRing, servs []services.Service) (*querierWorker, error) {
func newQuerierWorkerWithProcessor(grpcCfg grpcclient.Config, maxConcReq int, dnsLookupPeriod time.Duration, metrics *Metrics, logger log.Logger, processor processor, address string, ring ring.ReadRing, servs []services.Service) (*querierWorker, error) {
f := &querierWorker{
cfg: cfg,
logger: logger,
managers: map[string]*processorManager{},
processor: processor,
metrics: metrics,
maxConcurrentRequests: maxConcReq,
grpcClientConfig: grpcCfg,
logger: logger,
managers: map[string]*processorManager{},
processor: processor,
metrics: metrics,
}

// Empty address is only used in tests, where individual targets are added manually.
if address != "" {
w, err := util.NewDNSWatcher(address, cfg.DNSLookupPeriod, f)
w, err := util.NewDNSWatcher(address, dnsLookupPeriod, f)
if err != nil {
return nil, err
}
Expand All @@ -150,7 +174,7 @@ func newQuerierWorkerWithProcessor(cfg Config, metrics *Metrics, logger log.Logg
}

if ring != nil {
w, err := util.NewRingWatcher(log.With(logger, "component", "querier-scheduler-worker"), ring, cfg.DNSLookupPeriod, f)
w, err := util.NewRingWatcher(log.With(logger, "component", "querier-scheduler-worker"), ring, dnsLookupPeriod, f)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -245,17 +269,17 @@ func (w *querierWorker) resetConcurrency() {
}()

for _, m := range w.managers {
concurrency := w.cfg.MaxConcurrent / len(w.managers)
concurrency := w.maxConcurrentRequests / len(w.managers)

// If max concurrency does not evenly divide into our frontends a subset will be chosen
// to receive an extra connection. Frontend addresses were shuffled above so this will be a
// random selection of frontends.
if index < w.cfg.MaxConcurrent%len(w.managers) {
if index < w.maxConcurrentRequests%len(w.managers) {
level.Warn(w.logger).Log("msg", "max concurrency is not evenly divisible across targets, adding an extra connection", "addr", m.address)
concurrency++
}

// If concurrency is 0 then MaxConcurrentRequests is less than the total number of
// If concurrency is 0 then maxConcurrentRequests is less than the total number of
// frontends/schedulers. In order to prevent accidentally starving a frontend or scheduler we are just going to
// always connect once to every target. This is dangerous b/c we may start exceeding LogQL
// max concurrency.
Expand All @@ -271,7 +295,7 @@ func (w *querierWorker) resetConcurrency() {

func (w *querierWorker) connect(ctx context.Context, address string) (*grpc.ClientConn, error) {
// Because we only use single long-running method, it doesn't make sense to inject user ID, send over tracing or add metrics.
opts, err := w.cfg.GRPCClientConfig.DialOption(nil, nil)
opts, err := w.grpcClientConfig.DialOption(nil, nil)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestResetConcurrency(t *testing.T) {
MaxConcurrent: tt.maxConcurrent,
}

w, err := newQuerierWorkerWithProcessor(cfg, NewMetrics(cfg, nil), log.NewNopLogger(), &mockProcessor{}, "", nil, nil)
w, err := newQuerierWorkerWithProcessor(cfg.QuerySchedulerGRPCClientConfig, cfg.MaxConcurrent, cfg.DNSLookupPeriod, NewMetrics(cfg, nil), log.NewNopLogger(), &mockProcessor{}, "", nil, nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), w))

Expand Down
Loading