Skip to content

Commit

Permalink
query-scheduler: fix query distribution in SSD mode (#9471)
Browse files Browse the repository at this point in the history
**What this PR does / why we need it**:
When we run the `query-scheduler` in `ring` mode, `queriers` and
`query-frontend` discover the available `query-scheduler` instances
using the ring. However, we have a problem when `query-schedulers` are
not running in the same process as queriers and query-frontend since [we
try to get the ring client interface from the scheduler
instance](https://github.com/grafana/loki/blob/abd6131bba18db7f3575241c5e6dc4eed879fbc0/pkg/loki/modules.go#L358).

This causes queries not to be spread across all the available queriers
when running in SSD mode because [we point querier workers to query
frontend when there is no ring client and scheduler address
configured](https://github.com/grafana/loki/blob/b05f4fced305800b32641ae84e3bed5f1794fa7d/pkg/querier/worker_service.go#L115).

I have fixed this issue by adding a new hidden target to initialize the
ring client in `reader`/`member` mode based on which service is
initializing it. `reader` mode will be used by `queriers` and
`query-frontend` for discovering `query-scheduler` instances from the
ring. `member` mode will be used by `query-schedulers` for registering
themselves in the ring.

I have also made a couple of changes not directly related to the issue
but it fixes some problems:
* [reset metric registry for each integration
test](18c4fe5)
- Previously we were reusing the same registry for all the tests and
just [ignored the attempts to register same
metrics](https://github.com/grafana/loki/blob/01f0ded7fcb57e3a7b26ffc1e8e3abf04a403825/integration/cluster/cluster.go#L113).
This causes the registry to have metrics registered only from the first
test so any updates from subsequent tests won't reflect in the metrics.
metrics was the only reliable way for me to verify that
`query-schedulers` were connected to `queriers` and `query-frontend`
when running in ring mode in the integration test that I added to test
my changes. This should also help with other tests where earlier it was
hard to reliably check the metrics.
* [load config from cli as well before applying dynamic
config](f9e2448)
- Previously we were applying dynamic config considering just the config
from config file. This results in unexpected config changes, for
example, [this config
change](https://github.com/grafana/loki/blob/4148dd2c51cb827ec3889298508b95ec7731e7fd/integration/loki_micro_services_test.go#L66)
was getting ignored and [dynamic config tuning was unexpectedly turning
on ring
mode](https://github.com/grafana/loki/blob/52cd0a39b8266564352c61ab9b845ab597008770/pkg/loki/config_wrapper.go#L94)
in the config. It is better to do any config tuning based on both file
and cli args configs.

**Which issue(s) this PR fixes**:
Fixes #9195
  • Loading branch information
sandeepsukhani authored Jun 6, 2023
1 parent 90ed037 commit 0a5e149
Show file tree
Hide file tree
Showing 12 changed files with 499 additions and 183 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
* [9252](https://github.com/grafana/loki/pull/9252) **jeschkies**: Use un-escaped regex literal for string matching.
* [9176](https://github.com/grafana/loki/pull/9176) **DylanGuedes**: Fix incorrect association of per-stream rate limit when sharding is enabled.
* [9463](https://github.com/grafana/loki/pull/9463) **Totalus**: Fix OpenStack Swift client object listing to fetch all the objects properly.
* [9471](https://github.com/grafana/loki/pull/9471) **sandeepsukhani**: query-scheduler: fix query distribution in SSD mode.
* [9495](https://github.com/grafana/loki/pull/9495) **thampiotr**: Promtail: Fix potential goroutine leak in file tailer.
* [9629](https://github.com/grafana/loki/pull/9629) **periklis**: Fix duplicate label values from ingester streams.

Expand Down
16 changes: 7 additions & 9 deletions integration/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ import (
)

var (
wrapRegistryOnce sync.Once

configTemplate = template.Must(template.New("").Parse(`
auth_enabled: true
Expand Down Expand Up @@ -108,18 +106,18 @@ ruler:
`))
)

func wrapRegistry() {
wrapRegistryOnce.Do(func() {
prometheus.DefaultRegisterer = &wrappedRegisterer{Registerer: prometheus.DefaultRegisterer}
})
func resetMetricRegistry() {
registry := &wrappedRegisterer{Registry: prometheus.NewRegistry()}
prometheus.DefaultRegisterer = registry
prometheus.DefaultGatherer = registry
}

type wrappedRegisterer struct {
prometheus.Registerer
*prometheus.Registry
}

func (w *wrappedRegisterer) Register(collector prometheus.Collector) error {
if err := w.Registerer.Register(collector); err != nil {
if err := w.Registry.Register(collector); err != nil {
var aErr prometheus.AlreadyRegisteredError
if errors.As(err, &aErr) {
return nil
Expand Down Expand Up @@ -151,7 +149,7 @@ func New(logLevel level.Value, opts ...func(*Cluster)) *Cluster {
util_log.Logger = level.NewFilter(log.NewLogfmtLogger(os.Stderr), level.Allow(logLevel))
}

wrapRegistry()
resetMetricRegistry()
sharedPath, err := os.MkdirTemp("", "loki-shared-data")
if err != nil {
panic(err.Error())
Expand Down
7 changes: 6 additions & 1 deletion integration/loki_micro_services_delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,13 @@ func checkUserLabelAndMetricValue(t *testing.T, metricName, metrics, tenantID st
}

func checkMetricValue(t *testing.T, metricName, metrics string, expectedValue float64) {
t.Helper()
require.Equal(t, expectedValue, getMetricValue(t, metricName, metrics))
}

func getMetricValue(t *testing.T, metricName, metrics string) float64 {
t.Helper()
val, _, err := extractMetric(metricName, metrics)
require.NoError(t, err)
require.Equal(t, expectedValue, val)
return val
}
118 changes: 118 additions & 0 deletions integration/loki_micro_services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,3 +259,121 @@ func TestMicroServicesMultipleBucketSingleProvider(t *testing.T) {
})
}
}

func TestSchedulerRing(t *testing.T) {
clu := cluster.New(nil)
defer func() {
assert.NoError(t, clu.Cleanup())
}()

// run initially the compactor, indexgateway, and distributor.
var (
tCompactor = clu.AddComponent(
"compactor",
"-target=compactor",
"-boltdb.shipper.compactor.compaction-interval=1s",
"-boltdb.shipper.compactor.retention-delete-delay=1s",
// By default, a minute is added to the delete request start time. This compensates for that.
"-boltdb.shipper.compactor.delete-request-cancel-period=-60s",
"-compactor.deletion-mode=filter-and-delete",
)
tIndexGateway = clu.AddComponent(
"index-gateway",
"-target=index-gateway",
)
tDistributor = clu.AddComponent(
"distributor",
"-target=distributor",
)
)
require.NoError(t, clu.Run())

// then, run only the ingester and query scheduler.
var (
tIngester = clu.AddComponent(
"ingester",
"-target=ingester",
"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
)
tQueryScheduler = clu.AddComponent(
"query-scheduler",
"-target=query-scheduler",
"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
"-query-scheduler.use-scheduler-ring=true",
)
)
require.NoError(t, clu.Run())

// finally, run the query-frontend and querier.
var (
tQueryFrontend = clu.AddComponent(
"query-frontend",
"-target=query-frontend",
"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
"-common.compactor-address="+tCompactor.HTTPURL(),
"-querier.per-request-limits-enabled=true",
"-query-scheduler.use-scheduler-ring=true",
"-frontend.scheduler-worker-concurrency=5",
)
_ = clu.AddComponent(
"querier",
"-target=querier",
"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
"-common.compactor-address="+tCompactor.HTTPURL(),
"-query-scheduler.use-scheduler-ring=true",
"-querier.max-concurrent=4",
)
)
require.NoError(t, clu.Run())

tenantID := randStringRunes()

now := time.Now()
cliDistributor := client.New(tenantID, "", tDistributor.HTTPURL())
cliDistributor.Now = now
cliIngester := client.New(tenantID, "", tIngester.HTTPURL())
cliIngester.Now = now
cliQueryFrontend := client.New(tenantID, "", tQueryFrontend.HTTPURL())
cliQueryFrontend.Now = now
cliQueryScheduler := client.New(tenantID, "", tQueryScheduler.HTTPURL())
cliQueryScheduler.Now = now

t.Run("verify-scheduler-connections", func(t *testing.T) {
require.Eventually(t, func() bool {
// Check metrics to see if query scheduler is connected with query-frontend
metrics, err := cliQueryScheduler.Metrics()
require.NoError(t, err)
return getMetricValue(t, "cortex_query_scheduler_connected_frontend_clients", metrics) == 5
}, 5*time.Second, 500*time.Millisecond)

require.Eventually(t, func() bool {
// Check metrics to see if query scheduler is connected with query-frontend
metrics, err := cliQueryScheduler.Metrics()
require.NoError(t, err)
return getMetricValue(t, "cortex_query_scheduler_connected_querier_clients", metrics) == 4
}, 5*time.Second, 500*time.Millisecond)
})

t.Run("ingest-logs", func(t *testing.T) {
// ingest some log lines
require.NoError(t, cliDistributor.PushLogLineWithTimestamp("lineA", now.Add(-45*time.Minute), map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLineWithTimestamp("lineB", now.Add(-45*time.Minute), map[string]string{"job": "fake"}))

require.NoError(t, cliDistributor.PushLogLine("lineC", map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLine("lineD", map[string]string{"job": "fake"}))
})

t.Run("query", func(t *testing.T) {
resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
require.NoError(t, err)
assert.Equal(t, "streams", resp.Data.ResultType)

var lines []string
for _, stream := range resp.Data.Stream {
for _, val := range stream.Values {
lines = append(lines, val[1])
}
}
assert.ElementsMatch(t, []string{"lineA", "lineB", "lineC", "lineD"}, lines)
})
}
1 change: 1 addition & 0 deletions integration/loki_rule_eval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func testRuleEval(t *testing.T, mode string) {
// and we have a circular dependency with the backend
"-common.compactor-address=http://fake",
"-legacy-read-mode=false",
"-query-scheduler.use-scheduler-ring=false",
)

require.NoError(t, clu.Run())
Expand Down
65 changes: 34 additions & 31 deletions pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,33 +342,34 @@ type Loki struct {
deps map[string][]string
SignalHandler *signals.Handler

Server *server.Server
InternalServer *server.Server
ring *ring.Ring
Overrides limiter.CombinedLimits
tenantConfigs *runtime.TenantConfigs
TenantLimits validation.TenantLimits
distributor *distributor.Distributor
Ingester ingester.Interface
Querier querier.Querier
cacheGenerationLoader queryrangebase.CacheGenNumberLoader
querierAPI *querier.QuerierAPI
ingesterQuerier *querier.IngesterQuerier
Store storage.Store
tableManager *index.TableManager
frontend Frontend
ruler *base_ruler.Ruler
ruleEvaluator ruler.Evaluator
RulerStorage rulestore.RuleStore
rulerAPI *base_ruler.API
stopper queryrange.Stopper
runtimeConfig *runtimeconfig.Manager
MemberlistKV *memberlist.KVInitService
compactor *compactor.Compactor
QueryFrontEndTripperware basetripper.Tripperware
queryScheduler *scheduler.Scheduler
usageReport *analytics.Reporter
indexGatewayRingManager *indexgateway.RingManager
Server *server.Server
InternalServer *server.Server
ring *ring.Ring
Overrides limiter.CombinedLimits
tenantConfigs *runtime.TenantConfigs
TenantLimits validation.TenantLimits
distributor *distributor.Distributor
Ingester ingester.Interface
Querier querier.Querier
cacheGenerationLoader queryrangebase.CacheGenNumberLoader
querierAPI *querier.QuerierAPI
ingesterQuerier *querier.IngesterQuerier
Store storage.Store
tableManager *index.TableManager
frontend Frontend
ruler *base_ruler.Ruler
ruleEvaluator ruler.Evaluator
RulerStorage rulestore.RuleStore
rulerAPI *base_ruler.API
stopper queryrange.Stopper
runtimeConfig *runtimeconfig.Manager
MemberlistKV *memberlist.KVInitService
compactor *compactor.Compactor
QueryFrontEndTripperware basetripper.Tripperware
queryScheduler *scheduler.Scheduler
querySchedulerRingManager *scheduler.RingManager
usageReport *analytics.Reporter
indexGatewayRingManager *indexgateway.RingManager

clientMetrics storage.ClientMetrics
deleteClientMetrics *deletion.DeleteRequestClientMetrics
Expand Down Expand Up @@ -634,8 +635,9 @@ func (t *Loki) setupModuleManager() error {
mm.RegisterModule(TableManager, t.initTableManager)
mm.RegisterModule(Compactor, t.initCompactor)
mm.RegisterModule(IndexGateway, t.initIndexGateway)
mm.RegisterModule(QueryScheduler, t.initQueryScheduler)
mm.RegisterModule(IndexGatewayRing, t.initIndexGatewayRing, modules.UserInvisibleModule)
mm.RegisterModule(QueryScheduler, t.initQueryScheduler)
mm.RegisterModule(QuerySchedulerRing, t.initQuerySchedulerRing, modules.UserInvisibleModule)
mm.RegisterModule(Analytics, t.initAnalytics)
mm.RegisterModule(CacheGenerationLoader, t.initCacheGenerationLoader)

Expand All @@ -654,16 +656,17 @@ func (t *Loki) setupModuleManager() error {
Distributor: {Ring, Server, Overrides, TenantConfigs, Analytics},
Store: {Overrides, IndexGatewayRing, IngesterQuerier},
Ingester: {Store, Server, MemberlistKV, TenantConfigs, Analytics},
Querier: {Store, Ring, Server, Overrides, Analytics, CacheGenerationLoader},
Querier: {Store, Ring, Server, Overrides, Analytics, CacheGenerationLoader, QuerySchedulerRing},
QueryFrontendTripperware: {Server, Overrides, TenantConfigs},
QueryFrontend: {QueryFrontendTripperware, Analytics, CacheGenerationLoader},
QueryScheduler: {Server, Overrides, MemberlistKV, Analytics},
QueryFrontend: {QueryFrontendTripperware, Analytics, CacheGenerationLoader, QuerySchedulerRing},
QueryScheduler: {Server, Overrides, MemberlistKV, Analytics, QuerySchedulerRing},
Ruler: {Ring, Server, RulerStorage, RuleEvaluator, Overrides, TenantConfigs, Analytics},
RuleEvaluator: {Ring, Server, Store, Overrides, TenantConfigs, Analytics},
TableManager: {Server, Analytics},
Compactor: {Server, Overrides, MemberlistKV, Analytics},
IndexGateway: {Server, Store, Overrides, Analytics, MemberlistKV, IndexGatewayRing},
IngesterQuerier: {Ring},
QuerySchedulerRing: {RuntimeConfig, Server, MemberlistKV},
IndexGatewayRing: {RuntimeConfig, Server, MemberlistKV},
All: {QueryScheduler, QueryFrontend, Querier, Ingester, Distributor, Ruler, Compactor},
Read: {QueryFrontend, Querier},
Expand Down
42 changes: 32 additions & 10 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ const (
IndexGateway string = "index-gateway"
IndexGatewayRing string = "index-gateway-ring"
QueryScheduler string = "query-scheduler"
QuerySchedulerRing string = "query-scheduler-ring"
All string = "all"
Read string = "read"
Write string = "write"
Expand Down Expand Up @@ -361,7 +362,7 @@ func (t *Loki) initQuerier() (services.Service, error) {
QuerierWorkerConfig: &t.Cfg.Worker,
QueryFrontendEnabled: t.Cfg.isModuleEnabled(QueryFrontend),
QuerySchedulerEnabled: t.Cfg.isModuleEnabled(QueryScheduler),
SchedulerRing: scheduler.SafeReadRing(t.queryScheduler),
SchedulerRing: scheduler.SafeReadRing(t.querySchedulerRingManager),
}

toMerge := []middleware.Interface{
Expand Down Expand Up @@ -781,7 +782,7 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
}
roundTripper, frontendV1, frontendV2, err := frontend.InitFrontend(
combinedCfg,
scheduler.SafeReadRing(t.queryScheduler),
scheduler.SafeReadRing(t.querySchedulerRingManager),
disabledShuffleShardingLimits{},
t.Cfg.Server.GRPCListenPort,
util_log.Logger,
Expand Down Expand Up @@ -1227,24 +1228,45 @@ func (t *Loki) initIndexGatewayRing() (_ services.Service, err error) {
}

func (t *Loki) initQueryScheduler() (services.Service, error) {
// Set some config sections from other config sections in the config struct
t.Cfg.QueryScheduler.SchedulerRing.ListenPort = t.Cfg.Server.GRPCListenPort

s, err := scheduler.NewScheduler(t.Cfg.QueryScheduler, t.Overrides, util_log.Logger, prometheus.DefaultRegisterer)
s, err := scheduler.NewScheduler(t.Cfg.QueryScheduler, t.Overrides, util_log.Logger, t.querySchedulerRingManager, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}

schedulerpb.RegisterSchedulerForFrontendServer(t.Server.GRPC, s)
schedulerpb.RegisterSchedulerForQuerierServer(t.Server.GRPC, s)
t.Server.HTTP.Path("/scheduler/ring").Methods("GET", "POST").Handler(s)

t.queryScheduler = s
return s, nil
}

func (t *Loki) initQuerySchedulerRing() (_ services.Service, err error) {
if !t.Cfg.QueryScheduler.UseSchedulerRing {
return
}

// Set some config sections from other config sections in the config struct
t.Cfg.QueryScheduler.SchedulerRing.ListenPort = t.Cfg.Server.GRPCListenPort

managerMode := scheduler.RingManagerModeReader
if t.Cfg.isModuleEnabled(QueryScheduler) || t.Cfg.isModuleEnabled(Backend) || t.Cfg.isModuleEnabled(All) || (t.Cfg.LegacyReadTarget && t.Cfg.isModuleEnabled(Read)) {
managerMode = scheduler.RingManagerModeMember
}
rm, err := scheduler.NewRingManager(managerMode, t.Cfg.QueryScheduler, util_log.Logger, prometheus.DefaultRegisterer)

if err != nil {
return nil, gerrors.Wrap(err, "new scheduler ring manager")
}

t.querySchedulerRingManager = rm

t.Server.HTTP.Path("/scheduler/ring").Methods("GET", "POST").Handler(t.querySchedulerRingManager)

if t.Cfg.InternalServer.Enable {
t.InternalServer.HTTP.Path("/scheduler/ring").Methods("GET").Handler(s)
t.InternalServer.HTTP.Path("/scheduler/ring").Methods("GET").Handler(t.querySchedulerRingManager)
}

t.queryScheduler = s
return s, nil
return t.querySchedulerRingManager, nil
}

func (t *Loki) initQueryLimiter() (services.Service, error) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/querier/worker_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ func InitWorkerService(
externalRouter.Path(route).Methods("GET", "POST").Handler(handlerMiddleware.Wrap(internalRouter))
}

//If no frontend or scheduler address has been configured, then there is no place for the
//If no scheduler ring or frontend or scheduler address has been configured, then there is no place for the
//querier worker to request work from, so no need to start a worker service
if (*cfg.QuerierWorkerConfig).FrontendAddress == "" && (*cfg.QuerierWorkerConfig).SchedulerAddress == "" {
if cfg.SchedulerRing == nil && (*cfg.QuerierWorkerConfig).FrontendAddress == "" && (*cfg.QuerierWorkerConfig).SchedulerAddress == "" {
return nil, nil
}

Expand Down
Loading

0 comments on commit 0a5e149

Please sign in to comment.