diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index d30dce2f7775b..acf11102be511 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -351,6 +351,10 @@ bloom_build: # CLI flag: -bloom-build.planner.max-table-offset [max_table_offset: | default = 2] + # Maximum number of tasks to queue per tenant. + # CLI flag: -bloom-build.planner.max-tasks-per-tenant + [max_queued_tasks_per_tenant: | default = 30000] + builder: # Experimental: The bloom_gateway block configures the Loki bloom gateway @@ -3409,6 +3413,11 @@ shard_streams: # CLI flag: -bloom-build.split-keyspace-by [bloom_split_series_keyspace_by: | default = 256] +# Experimental. Maximum number of builders to use when building blooms. 0 allows +# unlimited builders. +# CLI flag: -bloom-build.max-builders +[bloom_build_max_builders: | default = 0] + # Experimental. Length of the n-grams created when computing blooms from log # lines. # CLI flag: -bloom-compactor.ngram-length diff --git a/pkg/bloombuild/planner/config.go b/pkg/bloombuild/planner/config.go index 47b01c0b286e0..aff25873b12f4 100644 --- a/pkg/bloombuild/planner/config.go +++ b/pkg/bloombuild/planner/config.go @@ -8,9 +8,10 @@ import ( // Config configures the bloom-planner component. type Config struct { - PlanningInterval time.Duration `yaml:"planning_interval"` - MinTableOffset int `yaml:"min_table_offset"` - MaxTableOffset int `yaml:"max_table_offset"` + PlanningInterval time.Duration `yaml:"planning_interval"` + MinTableOffset int `yaml:"min_table_offset"` + MaxTableOffset int `yaml:"max_table_offset"` + MaxQueuedTasksPerTenant int `yaml:"max_queued_tasks_per_tenant"` } // RegisterFlagsWithPrefix registers flags for the bloom-planner configuration. @@ -24,6 +25,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { // dynamically reloaded. // I'm doing it the simple way for now. f.IntVar(&cfg.MaxTableOffset, prefix+".max-table-offset", 2, "Oldest day-table offset (from today, inclusive) to compact. This can be used to lower cost by not trying to compact older data which doesn't change. This can be optimized by aligning it with the maximum `reject_old_samples_max_age` setting of any tenant.") + f.IntVar(&cfg.MaxQueuedTasksPerTenant, prefix+".max-tasks-per-tenant", 30000, "Maximum number of tasks to queue per tenant.") } func (cfg *Config) Validate() error { @@ -37,4 +39,28 @@ func (cfg *Config) Validate() error { type Limits interface { BloomCreationEnabled(tenantID string) bool BloomSplitSeriesKeyspaceBy(tenantID string) int + BloomBuildMaxBuilders(tenantID string) int +} + +type QueueLimits struct { + limits Limits +} + +func NewQueueLimits(limits Limits) *QueueLimits { + return &QueueLimits{limits: limits} +} + +// MaxConsumers is used to compute how many of the available builders are allowed to handle tasks for a given tenant. +// 0 is returned when neither limits are applied. 0 means all builders can be used. +func (c *QueueLimits) MaxConsumers(tenantID string, allConsumers int) int { + if c == nil || c.limits == nil { + return 0 + } + + maxBuilders := c.limits.BloomBuildMaxBuilders(tenantID) + if maxBuilders == 0 { + return 0 + } + + return min(allConsumers, maxBuilders) } diff --git a/pkg/bloombuild/planner/metrics.go b/pkg/bloombuild/planner/metrics.go index c0028237d9b1d..347af1926617b 100644 --- a/pkg/bloombuild/planner/metrics.go +++ b/pkg/bloombuild/planner/metrics.go @@ -1,8 +1,12 @@ package planner import ( + "time" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + + "github.com/grafana/loki/v3/pkg/queue" ) const ( @@ -16,6 +20,11 @@ const ( type Metrics struct { running prometheus.Gauge + // Extra Queue metrics + connectedBuilders prometheus.GaugeFunc + queueDuration prometheus.Histogram + inflightRequests prometheus.Summary + buildStarted prometheus.Counter buildCompleted *prometheus.CounterVec buildTime *prometheus.HistogramVec @@ -23,7 +32,10 @@ type Metrics struct { tenantsDiscovered prometheus.Counter } -func NewMetrics(r prometheus.Registerer) *Metrics { +func NewMetrics( + r prometheus.Registerer, + getConnectedBuilders func() float64, +) *Metrics { return &Metrics{ running: promauto.With(r).NewGauge(prometheus.GaugeOpts{ Namespace: metricsNamespace, @@ -31,6 +43,28 @@ func NewMetrics(r prometheus.Registerer) *Metrics { Name: "running", Help: "Value will be 1 if bloom planner is currently running on this instance", }), + connectedBuilders: promauto.With(r).NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "connected_builders", + Help: "Number of builders currently connected to the planner.", + }, getConnectedBuilders), + queueDuration: promauto.With(r).NewHistogram(prometheus.HistogramOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "queue_duration_seconds", + Help: "Time spend by tasks in queue before getting picked up by a builder.", + Buckets: prometheus.DefBuckets, + }), + inflightRequests: promauto.With(r).NewSummary(prometheus.SummaryOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "inflight_tasks", + Help: "Number of inflight tasks (either queued or processing) sampled at a regular interval. Quantile buckets keep track of inflight tasks over the last 60s.", + Objectives: map[float64]float64{0.5: 0.05, 0.75: 0.02, 0.8: 0.02, 0.9: 0.01, 0.95: 0.01, 0.99: 0.001}, + MaxAge: time.Minute, + AgeBuckets: 6, + }), buildStarted: promauto.With(r).NewCounter(prometheus.CounterOpts{ Namespace: metricsNamespace, @@ -60,3 +94,7 @@ func NewMetrics(r prometheus.Registerer) *Metrics { }), } } + +func NewQueueMetrics(r prometheus.Registerer) *queue.Metrics { + return queue.NewMetrics(r, metricsNamespace, metricsSubsystem) +} diff --git a/pkg/bloombuild/planner/planner.go b/pkg/bloombuild/planner/planner.go index 0be853a2f604a..9a5b9f6dc238e 100644 --- a/pkg/bloombuild/planner/planner.go +++ b/pkg/bloombuild/planner/planner.go @@ -12,16 +12,21 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" + "github.com/grafana/loki/v3/pkg/queue" "github.com/grafana/loki/v3/pkg/storage" v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" "github.com/grafana/loki/v3/pkg/storage/config" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb" + "github.com/grafana/loki/v3/pkg/util" utillog "github.com/grafana/loki/v3/pkg/util/log" ) type Planner struct { services.Service + // Subservices manager. + subservices *services.Manager + subservicesWatcher *services.FailureWatcher cfg Config limits Limits @@ -30,6 +35,9 @@ type Planner struct { tsdbStore TSDBStore bloomStore bloomshipper.Store + tasksQueue *queue.RequestQueue + activeUsers *util.ActiveUsersCleanupService + metrics *Metrics logger log.Logger } @@ -51,15 +59,34 @@ func New( return nil, fmt.Errorf("error creating TSDB store: %w", err) } + // Queue to manage tasks + queueMetrics := NewQueueMetrics(r) + tasksQueue := queue.NewRequestQueue(cfg.MaxQueuedTasksPerTenant, 0, NewQueueLimits(limits), queueMetrics) + + // Clean metrics for inactive users: do not have added tasks to the queue in the last 1 hour + activeUsers := util.NewActiveUsersCleanupService(5*time.Minute, 1*time.Hour, func(user string) { + queueMetrics.Cleanup(user) + }) + p := &Planner{ - cfg: cfg, - limits: limits, - schemaCfg: schemaCfg, - tsdbStore: tsdbStore, - bloomStore: bloomStore, - metrics: NewMetrics(r), - logger: logger, + cfg: cfg, + limits: limits, + schemaCfg: schemaCfg, + tsdbStore: tsdbStore, + bloomStore: bloomStore, + tasksQueue: tasksQueue, + activeUsers: activeUsers, + metrics: NewMetrics(r, tasksQueue.GetConnectedConsumersMetric), + logger: logger, + } + + svcs := []services.Service{p.tasksQueue, p.activeUsers} + p.subservices, err = services.NewManager(svcs...) + if err != nil { + return nil, fmt.Errorf("error creating subservices manager: %w", err) } + p.subservicesWatcher = services.NewFailureWatcher() + p.subservicesWatcher.WatchManager(p.subservices) p.Service = services.NewBasicService(p.starting, p.running, p.stopping) return p, nil @@ -91,6 +118,7 @@ func (p *Planner) running(ctx context.Context) error { return err case <-ticker.C: + level.Info(p.logger).Log("msg", "starting bloom build iteration") if err := p.runOne(ctx); err != nil { level.Error(p.logger).Log("msg", "bloom build iteration failed", "err", err) } @@ -109,44 +137,53 @@ func (p *Planner) runOne(ctx context.Context) error { }() p.metrics.buildStarted.Inc() - level.Info(p.logger).Log("msg", "running bloom build planning") tables := p.tables(time.Now()) level.Debug(p.logger).Log("msg", "loaded tables", "tables", tables.TotalDays()) work, err := p.loadWork(ctx, tables) if err != nil { - level.Error(p.logger).Log("msg", "error loading work", "err", err) return fmt.Errorf("error loading work: %w", err) } - // TODO: Enqueue instead of buffering here - // This is just a placeholder for now - var tasks []Task - + var totalTasks int for _, w := range work { + logger := log.With(p.logger, "tenant", w.tenant, "table", w.table.Addr(), "ownership", w.ownershipRange.String()) + gaps, err := p.findGapsForBounds(ctx, w.tenant, w.table, w.ownershipRange) if err != nil { - level.Error(p.logger).Log("msg", "error finding gaps", "err", err, "tenant", w.tenant, "table", w.table, "ownership", w.ownershipRange.String()) - return fmt.Errorf("error finding gaps for tenant (%s) in table (%s) for bounds (%s): %w", w.tenant, w.table, w.ownershipRange, err) + level.Error(logger).Log("msg", "error finding gaps", "err", err) + continue } + now := time.Now() for _, gap := range gaps { - tasks = append(tasks, Task{ + totalTasks++ + task := Task{ table: w.table.Addr(), tenant: w.tenant, OwnershipBounds: w.ownershipRange, tsdb: gap.tsdb, gaps: gap.gaps, - }) + + queueTime: now, + ctx: ctx, + } + + p.activeUsers.UpdateUserTimestamp(task.tenant, now) + if err := p.tasksQueue.Enqueue(task.tenant, nil, task, nil); err != nil { + level.Error(logger).Log("msg", "error enqueuing task", "err", err) + continue + } } } + level.Debug(p.logger).Log("msg", "planning completed", "tasks", totalTasks) + status = statusSuccess level.Info(p.logger).Log( "msg", "bloom build iteration completed", "duration", time.Since(start).Seconds(), - "tasks", len(tasks), ) return nil } diff --git a/pkg/bloombuild/planner/task.go b/pkg/bloombuild/planner/task.go index 80f730c4fb6dd..bff459fe17643 100644 --- a/pkg/bloombuild/planner/task.go +++ b/pkg/bloombuild/planner/task.go @@ -1,6 +1,9 @@ package planner import ( + "context" + "time" + v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb" @@ -19,4 +22,8 @@ type Task struct { OwnershipBounds v1.FingerprintBounds tsdb tsdb.SingleTenantTSDBIdentifier gaps []GapWithBlocks + + // Tracking + queueTime time.Time + ctx context.Context } diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index b0660686f5c1b..148205b306c19 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -207,6 +207,7 @@ type Limits struct { BloomCreationEnabled bool `yaml:"bloom_creation_enabled" json:"bloom_creation_enabled" category:"experimental"` BloomSplitSeriesKeyspaceBy int `yaml:"bloom_split_series_keyspace_by" json:"bloom_split_series_keyspace_by" category:"experimental"` + BloomBuildMaxBuilders int `yaml:"bloom_build_max_builders" json:"bloom_build_max_builders" category:"experimental"` BloomNGramLength int `yaml:"bloom_ngram_length" json:"bloom_ngram_length" category:"experimental"` BloomNGramSkip int `yaml:"bloom_ngram_skip" json:"bloom_ngram_skip" category:"experimental"` @@ -385,6 +386,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&l.BloomCreationEnabled, "bloom-build.enable", false, "Experimental. Whether to create blooms for the tenant.") f.IntVar(&l.BloomSplitSeriesKeyspaceBy, "bloom-build.split-keyspace-by", 256, "Experimental. Number of splits to create for the series keyspace when building blooms. The series keyspace is split into this many parts to parallelize bloom creation.") + f.IntVar(&l.BloomBuildMaxBuilders, "bloom-build.max-builders", 0, "Experimental. Maximum number of builders to use when building blooms. 0 allows unlimited builders.") _ = l.BloomCompactorMaxBloomSize.Set(defaultBloomCompactorMaxBloomSize) f.Var(&l.BloomCompactorMaxBloomSize, "bloom-compactor.max-bloom-size", @@ -987,6 +989,10 @@ func (o *Overrides) BloomSplitSeriesKeyspaceBy(userID string) int { return o.getOverridesForUser(userID).BloomSplitSeriesKeyspaceBy } +func (o *Overrides) BloomBuildMaxBuilders(userID string) int { + return o.getOverridesForUser(userID).BloomBuildMaxBuilders +} + func (o *Overrides) BloomNGramLength(userID string) int { return o.getOverridesForUser(userID).BloomNGramLength }