From ad322c0fc22bbb99128001b81ebb384bd778066c Mon Sep 17 00:00:00 2001 From: Ashwanth Date: Tue, 3 Dec 2024 06:58:55 +0530 Subject: [PATCH] feat(block-scheduler): adds service and basic planner support for scheduler (#15200) --- docs/sources/shared/configuration.md | 20 +++ pkg/blockbuilder/scheduler/kafkautil.go | 80 +++++++++ pkg/blockbuilder/scheduler/kafkautil_test.go | 164 +++++++++++++++++++ pkg/blockbuilder/scheduler/metrics.go | 24 +++ pkg/blockbuilder/scheduler/offsets_reader.go | 62 +++++++ pkg/blockbuilder/scheduler/queue.go | 19 +++ pkg/blockbuilder/scheduler/scheduler.go | 149 +++++++++++++++-- pkg/blockbuilder/scheduler/scheduler_test.go | 7 +- pkg/blockbuilder/scheduler/strategy.go | 142 ++++++++++++++++ pkg/blockbuilder/scheduler/strategy_test.go | 159 ++++++++++++++++++ pkg/loki/loki.go | 9 + pkg/loki/modules.go | 19 +++ 12 files changed, 834 insertions(+), 20 deletions(-) create mode 100644 pkg/blockbuilder/scheduler/kafkautil.go create mode 100644 pkg/blockbuilder/scheduler/kafkautil_test.go create mode 100644 pkg/blockbuilder/scheduler/metrics.go create mode 100644 pkg/blockbuilder/scheduler/offsets_reader.go create mode 100644 pkg/blockbuilder/scheduler/strategy.go create mode 100644 pkg/blockbuilder/scheduler/strategy_test.go diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index 6c8ed01c5c0c7..18c4cdceb649e 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -188,6 +188,26 @@ block_builder: # CLI flag: -blockbuilder.backoff..backoff-retries [max_retries: | default = 10] +block_scheduler: + # Consumer group used by block scheduler to track the last consumed offset. + # CLI flag: -block-scheduler.consumer-group + [consumer_group: | default = "block-scheduler"] + + # How often the scheduler should plan jobs. + # CLI flag: -block-scheduler.interval + [interval: | default = 5m] + + # Period used by the planner to calculate the start and end offset such that + # each job consumes records spanning the target period. + # CLI flag: -block-scheduler.target-records-spanning-period + [target_records_spanning_period: | default = 1h] + + # Lookback period in milliseconds used by the scheduler to plan jobs when the + # consumer group has no commits. -1 consumes from the latest offset. -2 + # consumes from the start of the partition. + # CLI flag: -block-scheduler.lookback-period + [lookback_period: | default = -2] + pattern_ingester: # Whether the pattern ingester is enabled. # CLI flag: -pattern-ingester.enabled diff --git a/pkg/blockbuilder/scheduler/kafkautil.go b/pkg/blockbuilder/scheduler/kafkautil.go new file mode 100644 index 0000000000000..f746f2a9fd4e0 --- /dev/null +++ b/pkg/blockbuilder/scheduler/kafkautil.go @@ -0,0 +1,80 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package scheduler + +import ( + "context" + "errors" + "fmt" + "sync" + + "github.com/twmb/franz-go/pkg/kadm" + "github.com/twmb/franz-go/pkg/kerr" +) + +// GetGroupLag is similar to `kadm.Client.Lag` but works when the group doesn't have live participants. +// Similar to `kadm.CalculateGroupLagWithStartOffsets`, it takes into account that the group may not have any commits. +// +// The lag is the difference between the last produced offset (high watermark) and an offset in the "past". +// If the block builder committed an offset for a given partition to the consumer group at least once, then +// the lag is the difference between the last produced offset and the offset committed in the consumer group. 
+// Otherwise, if the block builder didn't commit an offset for a given partition yet (e.g. block builder is +// running for the first time), then the lag is the difference between the last produced offset and fallbackOffsetMillis. +func GetGroupLag(ctx context.Context, admClient *kadm.Client, topic, group string, fallbackOffsetMillis int64) (kadm.GroupLag, error) { + offsets, err := admClient.FetchOffsets(ctx, group) + if err != nil { + if !errors.Is(err, kerr.GroupIDNotFound) { + return nil, fmt.Errorf("fetch offsets: %w", err) + } + } + if err := offsets.Error(); err != nil { + return nil, fmt.Errorf("fetch offsets got error in response: %w", err) + } + + startOffsets, err := admClient.ListStartOffsets(ctx, topic) + if err != nil { + return nil, err + } + endOffsets, err := admClient.ListEndOffsets(ctx, topic) + if err != nil { + return nil, err + } + + resolveFallbackOffsets := sync.OnceValues(func() (kadm.ListedOffsets, error) { + return admClient.ListOffsetsAfterMilli(ctx, fallbackOffsetMillis, topic) + }) + // If the group-partition in offsets doesn't have a commit, fall back depending on where fallbackOffsetMillis points at. + for topic, pt := range startOffsets.Offsets() { + for partition, startOffset := range pt { + if _, ok := offsets.Lookup(topic, partition); ok { + continue + } + fallbackOffsets, err := resolveFallbackOffsets() + if err != nil { + return nil, fmt.Errorf("resolve fallback offsets: %w", err) + } + o, ok := fallbackOffsets.Lookup(topic, partition) + if !ok { + return nil, fmt.Errorf("partition %d not found in fallback offsets for topic %s", partition, topic) + } + if o.Offset < startOffset.At { + // Skip the resolved fallback offset if it's before the partition's start offset (i.e. before the earliest offset of the partition). + // This should not happen in Kafka, but can happen in Kafka-compatible systems, e.g. Warpstream. + continue + } + offsets.Add(kadm.OffsetResponse{Offset: kadm.Offset{ + Topic: o.Topic, + Partition: o.Partition, + At: o.Offset, + LeaderEpoch: o.LeaderEpoch, + }}) + } + } + + descrGroup := kadm.DescribedGroup{ + // "Empty" is the state that indicates that the group doesn't have active consumer members; this is always the case for block-builder, + // because we don't use group consumption. 
+ State: "Empty", + } + return kadm.CalculateGroupLagWithStartOffsets(descrGroup, offsets, startOffsets, endOffsets), nil +} diff --git a/pkg/blockbuilder/scheduler/kafkautil_test.go b/pkg/blockbuilder/scheduler/kafkautil_test.go new file mode 100644 index 0000000000000..d2a865702a808 --- /dev/null +++ b/pkg/blockbuilder/scheduler/kafkautil_test.go @@ -0,0 +1,164 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package scheduler + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/twmb/franz-go/pkg/kadm" + "github.com/twmb/franz-go/pkg/kgo" + + "github.com/grafana/loki/v3/pkg/kafka/testkafka" +) + +const ( + testTopic = "test" + testGroup = "testgroup" +) + +func TestKafkaGetGroupLag(t *testing.T) { + ctx, cancel := context.WithCancelCause(context.Background()) + t.Cleanup(func() { cancel(errors.New("test done")) }) + + _, addr := testkafka.CreateClusterWithoutCustomConsumerGroupsSupport(t, 3, testTopic) + kafkaClient := mustKafkaClient(t, addr) + admClient := kadm.NewClient(kafkaClient) + + const numRecords = 5 + + var producedRecords []kgo.Record + kafkaTime := time.Now().Add(-12 * time.Hour) + for i := int64(0); i < numRecords; i++ { + kafkaTime = kafkaTime.Add(time.Minute) + + // Produce and keep records to partition 0. + res := produceRecords(ctx, t, kafkaClient, kafkaTime, "1", testTopic, 0, []byte(`test value`)) + rec, err := res.First() + require.NoError(t, err) + require.NotNil(t, rec) + + producedRecords = append(producedRecords, *rec) + + // Produce same records to partition 1 (this partition won't have any commits). + produceRecords(ctx, t, kafkaClient, kafkaTime, "1", testTopic, 1, []byte(`test value`)) + } + require.Len(t, producedRecords, numRecords) + + // Commit last produced record from partition 0. + rec := producedRecords[len(producedRecords)-1] + offsets := make(kadm.Offsets) + offsets.Add(kadm.Offset{ + Topic: rec.Topic, + Partition: rec.Partition, + At: rec.Offset + 1, + LeaderEpoch: rec.LeaderEpoch, + }) + err := admClient.CommitAllOffsets(ctx, testGroup, offsets) + require.NoError(t, err) + + // Truncate partition 1 after second to last record to emulate the retention + // Note Kafka sets partition's start offset to the requested offset. Any records within the segment before the requested offset can no longer be read. + // Note the difference between DeleteRecords and DeleteOffsets in kadm docs. 
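+	// After the DeleteRecords call below, partition 1's start offset becomes 3, leaving two readable
+	// records (offsets 3 and 4); this is why the fallback-to-earliest cases further down expect a lag of 2.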
+ deleteRecOffsets := make(kadm.Offsets) + deleteRecOffsets.Add(kadm.Offset{ + Topic: testTopic, + Partition: 1, + At: numRecords - 2, + }) + _, err = admClient.DeleteRecords(ctx, deleteRecOffsets) + require.NoError(t, err) + + getTopicPartitionLag := func(t *testing.T, lag kadm.GroupLag, topic string, part int32) int64 { + l, ok := lag.Lookup(topic, part) + require.True(t, ok) + return l.Lag + } + + t.Run("fallbackOffset=milliseconds", func(t *testing.T) { + // get the timestamp of the last produced record + rec := producedRecords[len(producedRecords)-1] + fallbackOffset := rec.Timestamp.Add(-time.Millisecond).UnixMilli() + groupLag, err := GetGroupLag(ctx, admClient, testTopic, testGroup, fallbackOffset) + require.NoError(t, err) + + require.EqualValues(t, 0, getTopicPartitionLag(t, groupLag, testTopic, 0), "partition 0 must have no lag") + require.EqualValues(t, 1, getTopicPartitionLag(t, groupLag, testTopic, 1), "partition 1 must fall back to known record and get its lag from there") + require.EqualValues(t, 0, getTopicPartitionLag(t, groupLag, testTopic, 2), "partition 2 has no data and must have no lag") + }) + + t.Run("fallbackOffset=before-earliest", func(t *testing.T) { + // get the timestamp of third to last produced record (record before earliest in partition 1) + rec := producedRecords[len(producedRecords)-3] + fallbackOffset := rec.Timestamp.Add(-time.Millisecond).UnixMilli() + groupLag, err := GetGroupLag(ctx, admClient, testTopic, testGroup, fallbackOffset) + require.NoError(t, err) + + require.EqualValues(t, 0, getTopicPartitionLag(t, groupLag, testTopic, 0), "partition 0 must have no lag") + require.EqualValues(t, 2, getTopicPartitionLag(t, groupLag, testTopic, 1), "partition 1 must fall back to earliest and get its lag from there") + require.EqualValues(t, 0, getTopicPartitionLag(t, groupLag, testTopic, 2), "partition 2 has no data and must have no lag") + }) + + t.Run("fallbackOffset=0", func(t *testing.T) { + groupLag, err := GetGroupLag(ctx, admClient, testTopic, testGroup, 0) + require.NoError(t, err) + + require.EqualValues(t, 0, getTopicPartitionLag(t, groupLag, testTopic, 0), "partition 0 must have no lag") + require.EqualValues(t, 2, getTopicPartitionLag(t, groupLag, testTopic, 1), "partition 1 must fall back to the earliest and get its lag from there") + require.EqualValues(t, 0, getTopicPartitionLag(t, groupLag, testTopic, 2), "partition 2 has no data and must have no lag") + }) + + t.Run("group=unknown", func(t *testing.T) { + groupLag, err := GetGroupLag(ctx, admClient, testTopic, "unknown", 0) + require.NoError(t, err) + + // This group doesn't have any commits, so it must calc its lag from the fallback. + require.EqualValues(t, numRecords, getTopicPartitionLag(t, groupLag, testTopic, 0)) + require.EqualValues(t, 2, getTopicPartitionLag(t, groupLag, testTopic, 1), "partition 1 must fall back to the earliest and get its lag from there") + require.EqualValues(t, 0, getTopicPartitionLag(t, groupLag, testTopic, 2), "partition 2 has no data and must have no lag") + }) +} + +func mustKafkaClient(t *testing.T, addrs ...string) *kgo.Client { + writeClient, err := kgo.NewClient( + kgo.SeedBrokers(addrs...), + kgo.AllowAutoTopicCreation(), + // We will choose the partition of each record. 
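+		// ManualPartitioner sends each record to the partition already set on kgo.Record.Partition,
+		// which produceRecords below fills in explicitly.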
+ kgo.RecordPartitioner(kgo.ManualPartitioner()), + ) + require.NoError(t, err) + t.Cleanup(writeClient.Close) + return writeClient +} + +func produceRecords( + ctx context.Context, + t *testing.T, + kafkaClient *kgo.Client, + ts time.Time, + userID string, + topic string, + part int32, + val []byte, +) kgo.ProduceResults { + rec := &kgo.Record{ + Timestamp: ts, + Key: []byte(userID), + Value: val, + Topic: topic, + Partition: part, // samples in this batch are split between N partitions + } + produceResult := kafkaClient.ProduceSync(ctx, rec) + require.NoError(t, produceResult.FirstErr()) + return produceResult +} + +func commitOffset(ctx context.Context, t *testing.T, kafkaClient *kgo.Client, group string, offset kadm.Offset) { + offsets := make(kadm.Offsets) + offsets.Add(offset) + err := kadm.NewClient(kafkaClient).CommitAllOffsets(ctx, group, offsets) + require.NoError(t, err) +} diff --git a/pkg/blockbuilder/scheduler/metrics.go b/pkg/blockbuilder/scheduler/metrics.go new file mode 100644 index 0000000000000..4e1dbfa2afa1c --- /dev/null +++ b/pkg/blockbuilder/scheduler/metrics.go @@ -0,0 +1,24 @@ +package scheduler + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +type Metrics struct { + lag *prometheus.GaugeVec + committedOffset *prometheus.GaugeVec +} + +func NewMetrics(reg prometheus.Registerer) *Metrics { + return &Metrics{ + lag: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + Name: "loki_block_scheduler_group_lag", + Help: "How far behind the block scheduler consumer group is from the latest offset.", + }, []string{"partition"}), + committedOffset: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + Name: "loki_block_scheduler_group_committed_offset", + Help: "The current offset the block scheduler consumer group is at.", + }, []string{"partition"}), + } +} diff --git a/pkg/blockbuilder/scheduler/offsets_reader.go b/pkg/blockbuilder/scheduler/offsets_reader.go new file mode 100644 index 0000000000000..742185dba817f --- /dev/null +++ b/pkg/blockbuilder/scheduler/offsets_reader.go @@ -0,0 +1,62 @@ +package scheduler + +import ( + "context" + "errors" + "time" + + "github.com/twmb/franz-go/pkg/kadm" + "github.com/twmb/franz-go/pkg/kgo" +) + +type offsetReader struct { + topic string + consumerGroup string + fallbackOffsetMillis int64 + + adminClient *kadm.Client +} + +func NewOffsetReader(topic, consumerGroup string, lookbackPeriodInMs int64, client *kgo.Client) OffsetReader { + var fallbackOffsetMillis int64 + if lookbackPeriodInMs >= 0 { + fallbackOffsetMillis = time.Now().UnixMilli() - lookbackPeriodInMs + } else { + fallbackOffsetMillis = lookbackPeriodInMs + } + + return &offsetReader{ + topic: topic, + consumerGroup: consumerGroup, + adminClient: kadm.NewClient(client), + fallbackOffsetMillis: fallbackOffsetMillis, + } +} + +func (r *offsetReader) GroupLag(ctx context.Context) (map[int32]kadm.GroupMemberLag, error) { + lag, err := GetGroupLag(ctx, r.adminClient, r.topic, r.consumerGroup, r.fallbackOffsetMillis) + if err != nil { + return nil, err + } + + offsets, ok := lag[r.topic] + if !ok { + return nil, errors.New("no lag found for the topic") + } + + return offsets, nil +} + +func (r *offsetReader) ListOffsetsAfterMilli(ctx context.Context, ts int64) (map[int32]kadm.ListedOffset, error) { + offsets, err := r.adminClient.ListOffsetsAfterMilli(ctx, ts, r.topic) + if err != nil { + return nil, err + } + + resp, ok := offsets[r.topic] + if !ok { + return 
nil, errors.New("no offsets found for the topic") + } + + return resp, nil +} diff --git a/pkg/blockbuilder/scheduler/queue.go b/pkg/blockbuilder/scheduler/queue.go index 3e9cf087c6792..e2f125ad70a07 100644 --- a/pkg/blockbuilder/scheduler/queue.go +++ b/pkg/blockbuilder/scheduler/queue.go @@ -30,6 +30,25 @@ func NewJobQueue() *JobQueue { } } +func (q *JobQueue) Exists(job *types.Job) (types.JobStatus, bool) { + q.mu.RLock() + defer q.mu.RUnlock() + + if _, ok := q.inProgress[job.ID]; ok { + return types.JobStatusInProgress, true + } + + if _, ok := q.pending[job.ID]; ok { + return types.JobStatusPending, true + } + + if _, ok := q.completed[job.ID]; ok { + return types.JobStatusComplete, true + } + + return -1, false +} + // Enqueue adds a new job to the pending queue // This is a naive implementation, intended to be refactored func (q *JobQueue) Enqueue(job *types.Job) error { diff --git a/pkg/blockbuilder/scheduler/scheduler.go b/pkg/blockbuilder/scheduler/scheduler.go index 274713b5b1c36..dbf732742de39 100644 --- a/pkg/blockbuilder/scheduler/scheduler.go +++ b/pkg/blockbuilder/scheduler/scheduler.go @@ -2,43 +2,140 @@ package scheduler import ( "context" + "errors" + "flag" + "strconv" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/grafana/dskit/services" + "github.com/prometheus/client_golang/prometheus" + "github.com/twmb/franz-go/pkg/kadm" "github.com/grafana/loki/v3/pkg/blockbuilder/types" ) var ( _ types.Scheduler = unimplementedScheduler{} - _ types.Scheduler = &QueueScheduler{} + _ types.Scheduler = &BlockScheduler{} ) -// unimplementedScheduler provides default implementations that panic. -type unimplementedScheduler struct{} +type Config struct { + ConsumerGroup string `yaml:"consumer_group"` + Interval time.Duration `yaml:"interval"` + TargetRecordConsumptionPeriod time.Duration `yaml:"target_records_spanning_period"` + LookbackPeriod int64 `yaml:"lookback_period"` +} -func (s unimplementedScheduler) HandleGetJob(_ context.Context, _ string) (*types.Job, bool, error) { - panic("unimplemented") +func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + f.DurationVar(&cfg.Interval, prefix+"interval", 5*time.Minute, "How often the scheduler should plan jobs.") + f.DurationVar(&cfg.TargetRecordConsumptionPeriod, prefix+"target-records-spanning-period", time.Hour, "Period used by the planner to calculate the start and end offset such that each job consumes records spanning the target period.") + f.StringVar(&cfg.ConsumerGroup, prefix+"consumer-group", "block-scheduler", "Consumer group used by block scheduler to track the last consumed offset.") + f.Int64Var(&cfg.LookbackPeriod, prefix+"lookback-period", -2, "Lookback period in milliseconds used by the scheduler to plan jobs when the consumer group has no commits. -1 consumes from the latest offset. 
-2 consumes from the start of the partition.") } -func (s unimplementedScheduler) HandleCompleteJob(_ context.Context, _ string, _ *types.Job) error { - panic("unimplemented") +func (cfg *Config) RegisterFlags(f *flag.FlagSet) { + cfg.RegisterFlagsWithPrefix("block-scheduler.", f) } -func (s unimplementedScheduler) HandleSyncJob(_ context.Context, _ string, _ *types.Job) error { - panic("unimplemented") +func (cfg *Config) Validate() error { + if cfg.Interval <= 0 { + return errors.New("interval must be a non-zero value") + } + + if cfg.LookbackPeriod < -2 { + return errors.New("only -1(latest) and -2(earliest) are valid as negative values for lookback_period") + } + + return nil } -// QueueScheduler implements the Scheduler interface -type QueueScheduler struct { - queue *JobQueue +// BlockScheduler implements the Scheduler interface +type BlockScheduler struct { + services.Service + + cfg Config + logger log.Logger + queue *JobQueue + metrics *Metrics + + offsetReader OffsetReader + planner Planner } // NewScheduler creates a new scheduler instance -func NewScheduler(queue *JobQueue) *QueueScheduler { - return &QueueScheduler{ - queue: queue, +func NewScheduler(cfg Config, queue *JobQueue, offsetReader OffsetReader, logger log.Logger, r prometheus.Registerer) *BlockScheduler { + planner := NewTimeRangePlanner(cfg.TargetRecordConsumptionPeriod, offsetReader, func() time.Time { return time.Now().UTC() }, logger) + s := &BlockScheduler{ + cfg: cfg, + planner: planner, + offsetReader: offsetReader, + logger: logger, + metrics: NewMetrics(r), + queue: queue, + } + s.Service = services.NewBasicService(nil, s.running, nil) + return s +} + +func (s *BlockScheduler) running(ctx context.Context) error { + if err := s.runOnce(ctx); err != nil { + level.Error(s.logger).Log("msg", "failed to schedule jobs", "err", err) + } + + ticker := time.NewTicker(s.cfg.Interval) + for { + select { + case <-ticker.C: + if err := s.runOnce(ctx); err != nil { + // TODO: add metrics + level.Error(s.logger).Log("msg", "failed to schedule jobs", "err", err) + } + case <-ctx.Done(): + return nil + } } } -func (s *QueueScheduler) HandleGetJob(ctx context.Context, builderID string) (*types.Job, bool, error) { +func (s *BlockScheduler) runOnce(ctx context.Context) error { + lag, err := s.offsetReader.GroupLag(ctx) + if err != nil { + level.Error(s.logger).Log("msg", "failed to get group lag", "err", err) + return err + } + + s.publishLagMetrics(lag) + + jobs, err := s.planner.Plan(ctx) + if err != nil { + level.Error(s.logger).Log("msg", "failed to plan jobs", "err", err) + } + + for _, job := range jobs { + // TODO: end offset keeps moving each time we plan jobs, maybe we should not use it as part of the job ID + if status, ok := s.queue.Exists(&job); ok { + level.Debug(s.logger).Log("msg", "job already exists", "job", job, "status", status) + continue + } + + if err := s.queue.Enqueue(&job); err != nil { + level.Error(s.logger).Log("msg", "failed to enqueue job", "job", job, "err", err) + } + } + + return nil +} + +func (s *BlockScheduler) publishLagMetrics(lag map[int32]kadm.GroupMemberLag) { + for partition, offsets := range lag { + // useful for scaling builders + s.metrics.lag.WithLabelValues(strconv.Itoa(int(partition))).Set(float64(offsets.Lag)) + s.metrics.committedOffset.WithLabelValues(strconv.Itoa(int(partition))).Set(float64(offsets.Commit.At)) + } +} + +func (s *BlockScheduler) HandleGetJob(ctx context.Context, builderID string) (*types.Job, bool, error) { select { case <-ctx.Done(): return nil, false, 
ctx.Err() @@ -47,10 +144,26 @@ func (s *QueueScheduler) HandleGetJob(ctx context.Context, builderID string) (*t } } -func (s *QueueScheduler) HandleCompleteJob(_ context.Context, builderID string, job *types.Job) error { +func (s *BlockScheduler) HandleCompleteJob(_ context.Context, builderID string, job *types.Job) error { + // TODO: handle commits return s.queue.MarkComplete(job.ID, builderID) } -func (s *QueueScheduler) HandleSyncJob(_ context.Context, builderID string, job *types.Job) error { +func (s *BlockScheduler) HandleSyncJob(_ context.Context, builderID string, job *types.Job) error { return s.queue.SyncJob(job.ID, builderID, job) } + +// unimplementedScheduler provides default implementations that panic. +type unimplementedScheduler struct{} + +func (s unimplementedScheduler) HandleGetJob(_ context.Context, _ string) (*types.Job, bool, error) { + panic("unimplemented") +} + +func (s unimplementedScheduler) HandleCompleteJob(_ context.Context, _ string, _ *types.Job) error { + panic("unimplemented") +} + +func (s unimplementedScheduler) HandleSyncJob(_ context.Context, _ string, _ *types.Job) error { + panic("unimplemented") +} diff --git a/pkg/blockbuilder/scheduler/scheduler_test.go b/pkg/blockbuilder/scheduler/scheduler_test.go index ad6829bc8fe69..bd9e00450dfa7 100644 --- a/pkg/blockbuilder/scheduler/scheduler_test.go +++ b/pkg/blockbuilder/scheduler/scheduler_test.go @@ -5,20 +5,23 @@ import ( "testing" "time" + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/grafana/loki/v3/pkg/blockbuilder/builder" "github.com/grafana/loki/v3/pkg/blockbuilder/types" ) type testEnv struct { queue *JobQueue - scheduler *QueueScheduler + scheduler *BlockScheduler transport *builder.MemoryTransport builder *builder.Worker } func newTestEnv(builderID string) *testEnv { queue := NewJobQueue() - scheduler := NewScheduler(queue) + scheduler := NewScheduler(Config{}, queue, nil, log.NewNopLogger(), prometheus.NewRegistry()) transport := builder.NewMemoryTransport(scheduler) builder := builder.NewWorker(builderID, builder.NewMemoryTransport(scheduler)) diff --git a/pkg/blockbuilder/scheduler/strategy.go b/pkg/blockbuilder/scheduler/strategy.go new file mode 100644 index 0000000000000..5ea1fb6db2d9c --- /dev/null +++ b/pkg/blockbuilder/scheduler/strategy.go @@ -0,0 +1,142 @@ +package scheduler + +import ( + "context" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/twmb/franz-go/pkg/kadm" + + "github.com/grafana/loki/v3/pkg/blockbuilder/types" +) + +// OffsetReader is an interface to list offsets for all partitions of a topic from Kafka. 
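+// ListOffsetsAfterMilli returns, for each partition, the first offset whose record timestamp is at or
+// after the given millisecond timestamp (or the end offset if nothing was produced after it).
+// GroupLag returns the per-partition lag of the scheduler's consumer group, resolving partitions
+// without commits against the configured fallback offset.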
+type OffsetReader interface { + ListOffsetsAfterMilli(context.Context, int64) (map[int32]kadm.ListedOffset, error) + GroupLag(context.Context) (map[int32]kadm.GroupMemberLag, error) +} + +type Planner interface { + Name() string + Plan(ctx context.Context) ([]types.Job, error) +} + +const ( + RecordCountStrategy = "record_count" + TimeRangeStrategy = "time_range" +) + +// tries to consume upto targetRecordCount records per partition +type RecordCountPlanner struct { + targetRecordCount int64 + offsetReader OffsetReader + logger log.Logger +} + +func NewRecordCountPlanner(targetRecordCount int64) *RecordCountPlanner { + return &RecordCountPlanner{ + targetRecordCount: targetRecordCount, + } +} + +func (p *RecordCountPlanner) Name() string { + return RecordCountStrategy +} + +func (p *RecordCountPlanner) Plan(ctx context.Context) ([]types.Job, error) { + offsets, err := p.offsetReader.GroupLag(ctx) + if err != nil { + level.Error(p.logger).Log("msg", "failed to get group lag", "err", err) + return nil, err + } + + jobs := make([]types.Job, 0, len(offsets)) + for _, partition := range offsets { + // kadm.GroupMemberLag contains valid Commit.At even when consumer group never committed any offset. + // no additional validation is needed here + startOffset := partition.Commit.At + 1 + endOffset := min(startOffset+p.targetRecordCount, partition.End.Offset) + + job := types.Job{ + Partition: int(partition.Partition), + Offsets: types.Offsets{ + Min: startOffset, + Max: endOffset, + }, + } + + jobs = append(jobs, job) + } + + return jobs, nil +} + +// Targets consuming records spanning a configured period. +// This is a stateless planner, it is upto the caller to deduplicate or update jobs that are already in queue or progress. +type TimeRangePlanner struct { + offsetReader OffsetReader + + buffer time.Duration + targetPeriod time.Duration + now func() time.Time + + logger log.Logger +} + +func NewTimeRangePlanner(interval time.Duration, offsetReader OffsetReader, now func() time.Time, logger log.Logger) *TimeRangePlanner { + return &TimeRangePlanner{ + targetPeriod: interval, + buffer: interval, + offsetReader: offsetReader, + now: now, + logger: logger, + } +} + +func (p *TimeRangePlanner) Name() string { + return TimeRangeStrategy +} + +func (p *TimeRangePlanner) Plan(ctx context.Context) ([]types.Job, error) { + // truncate to the nearest Interval + consumeUptoTS := p.now().Add(-p.buffer).Truncate(p.targetPeriod) + + // this will return the latest offset in the partition if no records are produced after this ts. + consumeUptoOffsets, err := p.offsetReader.ListOffsetsAfterMilli(ctx, consumeUptoTS.UnixMilli()) + if err != nil { + level.Error(p.logger).Log("msg", "failed to list offsets after timestamp", "err", err) + return nil, err + } + + offsets, err := p.offsetReader.GroupLag(ctx) + if err != nil { + level.Error(p.logger).Log("msg", "failed to get group lag", "err", err) + return nil, err + } + + var jobs []types.Job + for _, partitionOffset := range offsets { + startOffset := partitionOffset.Commit.At + 1 + // TODO: we could further break down the work into Interval sized chunks if this partition has pending records spanning a long time range + // or have the builder consume in chunks and commit the job status back to scheduler. 
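+		// consumeUptoOffsets holds, per partition, the first offset at or after consumeUptoTS (or the
+		// latest offset when nothing newer was produced), so re-planning within the same interval keeps
+		// producing the same end offset instead of chasing the head of the partition.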
+ endOffset := consumeUptoOffsets[partitionOffset.Partition].Offset + + if startOffset >= endOffset { + level.Info(p.logger).Log("msg", "no pending records to process", "partition", partitionOffset.Partition, + "commitOffset", partitionOffset.Commit.At, + "consumeUptoOffset", consumeUptoOffsets[partitionOffset.Partition].Offset) + continue + } + + jobs = append(jobs, types.Job{ + Partition: int(partitionOffset.Partition), + Offsets: types.Offsets{ + Min: startOffset, + Max: endOffset, + }, + }) + } + + return jobs, nil +} diff --git a/pkg/blockbuilder/scheduler/strategy_test.go b/pkg/blockbuilder/scheduler/strategy_test.go new file mode 100644 index 0000000000000..eb4704f268c74 --- /dev/null +++ b/pkg/blockbuilder/scheduler/strategy_test.go @@ -0,0 +1,159 @@ +package scheduler + +import ( + "context" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/stretchr/testify/require" + "github.com/twmb/franz-go/pkg/kadm" + + "github.com/grafana/loki/v3/pkg/blockbuilder/types" +) + +func TestTimeRangePlanner_Plan(t *testing.T) { + interval := 15 * time.Minute + for _, tc := range []struct { + name string + now time.Time + expectedJobs []types.Job + groupLag map[int32]kadm.GroupMemberLag + consumeUpto map[int32]kadm.ListedOffset + }{ + { + // Interval 1 + // now: 00:42:00. consume until 00:15:00 + // last consumed offset 100 with record ts: 00:10:00 + // record offset with ts after 00:15:00 - offset 200 + // resulting jobs: [100, 200] + name: "normal case. schedule first interval", + now: time.Date(0, 0, 0, 0, 42, 0, 0, time.UTC), // 00:42:00 + groupLag: map[int32]kadm.GroupMemberLag{ + 0: { + Commit: kadm.Offset{ + At: 100, + }, + Partition: 0, + }, + }, + consumeUpto: map[int32]kadm.ListedOffset{ + 0: { + Offset: 200, + }, + }, + expectedJobs: []types.Job{ + { + Partition: 0, + Offsets: types.Offsets{Min: 101, Max: 200}, + }, + }, + }, + { + // Interval 2 + // now: 00:46:00. consume until 00:30:00 + // last consumed offset 199 with record ts: 00:11:00 + // record offset with ts after 00:30:00 - offset 300 + // resulting jobs: [200, 300] + name: "normal case. schedule second interval", + now: time.Date(0, 0, 0, 0, 46, 0, 0, time.UTC), // 00:46:00 + groupLag: map[int32]kadm.GroupMemberLag{ + 0: { + Commit: kadm.Offset{ + At: 199, + }, + Partition: 0, + }, + 1: { + Commit: kadm.Offset{ + At: 11, + }, + Partition: 1, + }, + }, + consumeUpto: map[int32]kadm.ListedOffset{ + 0: { + Offset: 300, + }, + 1: { + Offset: 123, + }, + }, + expectedJobs: []types.Job{ + { + Partition: 0, + Offsets: types.Offsets{Min: 200, Max: 300}, + }, + { + Partition: 1, + Offsets: types.Offsets{Min: 12, Max: 123}, + }, + }, + }, + { + // Interval 2 - run scheduling again + // now: 00:48:00. consume until 00:30:00 + // last consumed offset 299 + // record offset with ts after 00:30:00 - offset 300 + // no jobs to schedule for partition 0 + name: "no pending records to consume. schedule second interval once more time", + now: time.Date(0, 0, 0, 0, 48, 0, 0, time.UTC), // 00:48:00 + groupLag: map[int32]kadm.GroupMemberLag{ + 0: { + Commit: kadm.Offset{ + At: 299, + }, + Partition: 0, + }, + 1: { + Commit: kadm.Offset{ + At: 11, + }, + Partition: 1, + }, + }, + consumeUpto: map[int32]kadm.ListedOffset{ + 0: { + Offset: 300, + }, + // still pending. 
assume no builder were assigned + 1: { + Offset: 123, + }, + }, + expectedJobs: []types.Job{ + { + Partition: 1, + Offsets: types.Offsets{Min: 12, Max: 123}, + }, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + mockOffsetReader := &mockOffsetReader{ + offsetsAfterMilli: tc.consumeUpto, + groupLag: tc.groupLag, + } + planner := NewTimeRangePlanner(interval, mockOffsetReader, func() time.Time { return tc.now }, log.NewNopLogger()) + + jobs, err := planner.Plan(context.Background()) + require.NoError(t, err) + + require.Equal(t, len(tc.expectedJobs), len(jobs)) + require.Equal(t, tc.expectedJobs, jobs) + }) + } +} + +type mockOffsetReader struct { + offsetsAfterMilli map[int32]kadm.ListedOffset + groupLag map[int32]kadm.GroupMemberLag +} + +func (m *mockOffsetReader) ListOffsetsAfterMilli(_ context.Context, _ int64) (map[int32]kadm.ListedOffset, error) { + return m.offsetsAfterMilli, nil +} + +func (m *mockOffsetReader) GroupLag(_ context.Context) (map[int32]kadm.GroupMemberLag, error) { + return m.groupLag, nil +} diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index 153387035d6b5..9747a8f231f7e 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -32,6 +32,7 @@ import ( "github.com/grafana/loki/v3/pkg/analytics" blockbuilder "github.com/grafana/loki/v3/pkg/blockbuilder/builder" + blockscheduler "github.com/grafana/loki/v3/pkg/blockbuilder/scheduler" "github.com/grafana/loki/v3/pkg/bloombuild" "github.com/grafana/loki/v3/pkg/bloomgateway" "github.com/grafana/loki/v3/pkg/compactor" @@ -91,6 +92,7 @@ type Config struct { IngesterClient ingester_client.Config `yaml:"ingester_client,omitempty"` Ingester ingester.Config `yaml:"ingester,omitempty"` BlockBuilder blockbuilder.Config `yaml:"block_builder,omitempty"` + BlockScheduler blockscheduler.Config `yaml:"block_scheduler,omitempty"` Pattern pattern.Config `yaml:"pattern_ingester,omitempty"` IndexGateway indexgateway.Config `yaml:"index_gateway"` BloomBuild bloombuild.Config `yaml:"bloom_build,omitempty" category:"experimental"` @@ -186,6 +188,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) { c.Profiling.RegisterFlags(f) c.KafkaConfig.RegisterFlags(f) c.BlockBuilder.RegisterFlags(f) + c.BlockScheduler.RegisterFlags(f) } func (c *Config) registerServerFlagsWithChangedDefaultValues(fs *flag.FlagSet) { @@ -264,6 +267,9 @@ func (c *Config) Validate() error { if err := c.BlockBuilder.Validate(); err != nil { errs = append(errs, errors.Wrap(err, "CONFIG ERROR: invalid block_builder config")) } + if err := c.BlockScheduler.Validate(); err != nil { + errs = append(errs, errors.Wrap(err, "CONFIG ERROR: invalid block_scheduler config")) + } if err := c.LimitsConfig.Validate(); err != nil { errs = append(errs, errors.Wrap(err, "CONFIG ERROR: invalid limits_config config")) } @@ -379,6 +385,7 @@ type Loki struct { partitionRingWatcher *ring.PartitionRingWatcher partitionRing *ring.PartitionInstanceRing blockBuilder *blockbuilder.BlockBuilder + blockScheduler *blockscheduler.BlockScheduler ClientMetrics storage.ClientMetrics deleteClientMetrics *deletion.DeleteRequestClientMetrics @@ -690,6 +697,7 @@ func (t *Loki) setupModuleManager() error { mm.RegisterModule(PatternIngester, t.initPatternIngester) mm.RegisterModule(PartitionRing, t.initPartitionRing, modules.UserInvisibleModule) mm.RegisterModule(BlockBuilder, t.initBlockBuilder) + mm.RegisterModule(BlockScheduler, t.initBlockScheduler) mm.RegisterModule(All, nil) mm.RegisterModule(Read, nil) @@ -728,6 +736,7 @@ func (t *Loki) 
setupModuleManager() error { PartitionRing: {MemberlistKV, Server, Ring}, MemberlistKV: {Server}, BlockBuilder: {PartitionRing, Store, Server}, + BlockScheduler: {Server}, Read: {QueryFrontend, Querier}, Write: {Ingester, Distributor, PatternIngester}, diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 994576076af3e..c4449f3c51134 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -37,6 +37,7 @@ import ( "github.com/grafana/loki/v3/pkg/analytics" blockbuilder "github.com/grafana/loki/v3/pkg/blockbuilder/builder" + blockscheduler "github.com/grafana/loki/v3/pkg/blockbuilder/scheduler" "github.com/grafana/loki/v3/pkg/bloombuild/builder" "github.com/grafana/loki/v3/pkg/bloombuild/planner" bloomprotos "github.com/grafana/loki/v3/pkg/bloombuild/protos" @@ -49,6 +50,7 @@ import ( "github.com/grafana/loki/v3/pkg/distributor" "github.com/grafana/loki/v3/pkg/indexgateway" "github.com/grafana/loki/v3/pkg/ingester" + kclient "github.com/grafana/loki/v3/pkg/kafka/client" "github.com/grafana/loki/v3/pkg/kafka/partition" "github.com/grafana/loki/v3/pkg/kafka/partitionring" "github.com/grafana/loki/v3/pkg/logproto" @@ -139,6 +141,7 @@ const ( InitCodec string = "init-codec" PartitionRing string = "partition-ring" BlockBuilder string = "block-builder" + BlockScheduler string = "block-scheduler" ) const ( @@ -1863,6 +1866,22 @@ func (t *Loki) initBlockBuilder() (services.Service, error) { return t.blockBuilder, nil } +func (t *Loki) initBlockScheduler() (services.Service, error) { + logger := log.With(util_log.Logger, "component", "block_scheduler") + + clientMetrics := kclient.NewReaderClientMetrics("block-scheduler", prometheus.DefaultRegisterer) + c, err := kclient.NewReaderClient( + t.Cfg.KafkaConfig, + clientMetrics, + log.With(logger, "component", "kafka-client"), + ) + if err != nil { + return nil, fmt.Errorf("creating kafka client: %w", err) + } + offsetReader := blockscheduler.NewOffsetReader(t.Cfg.KafkaConfig.Topic, t.Cfg.BlockScheduler.ConsumerGroup, t.Cfg.BlockScheduler.LookbackPeriod, c) + return blockscheduler.NewScheduler(t.Cfg.BlockScheduler, blockscheduler.NewJobQueue(), offsetReader, logger, prometheus.DefaultRegisterer), nil +} + func (t *Loki) deleteRequestsClient(clientType string, limits limiter.CombinedLimits) (deletion.DeleteRequestsClient, error) { if !t.supportIndexDeleteRequest() || !t.Cfg.CompactorConfig.RetentionEnabled { return deletion.NewNoOpDeleteRequestsStore(), nil
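
For reference, a minimal sketch of how the block_scheduler section documented above could look in a Loki configuration file; the values are simply the defaults introduced by this patch, shown for illustration rather than as recommendations:

    block_scheduler:
      # Consumer group used to track the last consumed offset.
      consumer_group: block-scheduler
      # Plan jobs every 5 minutes.
      interval: 5m
      # Each planned job targets records spanning roughly one hour.
      target_records_spanning_period: 1h
      # -2 starts from the earliest offset when the group has no commits; -1 starts from the latest.
      lookback_period: -2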