feat(block-scheduler): adds service and basic planner support for scheduler #15200

Merged · 9 commits · Dec 3, 2024
20 changes: 20 additions & 0 deletions docs/sources/shared/configuration.md
@@ -188,6 +188,26 @@ block_builder:
# CLI flag: -blockbuilder.backoff..backoff-retries
[max_retries: <int> | default = 10]

block_scheduler:
# Consumer group used by the block scheduler to track the last consumed offset.
# CLI flag: -block-scheduler.consumer-group
[consumer_group: <string> | default = "block-scheduler"]

# How often the scheduler should plan jobs.
# CLI flag: -block-scheduler.interval
[interval: <duration> | default = 5m]

# Period used by the planner to calculate the start and end offset such that
# each job consumes records spanning the target period.
# CLI flag: -block-scheduler.target-records-spanning-period
[target_records_spanning_period: <duration> | default = 1h]

# Lookback period in milliseconds used by the scheduler to plan jobs when the
# consumer group has no commits. -1 consumes from the latest offset. -2
# consumes from the start of the partition.
# CLI flag: -block-scheduler.lookback-period
[lookback_period: <int> | default = -2]

pattern_ingester:
# Whether the pattern ingester is enabled.
# CLI flag: -pattern-ingester.enabled
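The new `block_scheduler` block corresponds to the scheduler's CLI flags. As a rough illustration of how these flags could map onto a Go config, here is a minimal sketch; the field names and `RegisterFlags` layout are assumptions inferred from the documentation above, not the struct actually added by this PR.

```go
package scheduler

import (
	"flag"
	"time"
)

// Config is a sketch of the scheduler configuration implied by the flags above.
// Field names are assumptions; flag names, defaults, and help text come from the docs.
type Config struct {
	ConsumerGroup               string        `yaml:"consumer_group"`
	Interval                    time.Duration `yaml:"interval"`
	TargetRecordsSpanningPeriod time.Duration `yaml:"target_records_spanning_period"`
	LookbackPeriod              int64         `yaml:"lookback_period"`
}

func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
	f.StringVar(&cfg.ConsumerGroup, "block-scheduler.consumer-group", "block-scheduler",
		"Consumer group used by the block scheduler to track the last consumed offset.")
	f.DurationVar(&cfg.Interval, "block-scheduler.interval", 5*time.Minute,
		"How often the scheduler should plan jobs.")
	f.DurationVar(&cfg.TargetRecordsSpanningPeriod, "block-scheduler.target-records-spanning-period", time.Hour,
		"Period used by the planner to calculate the start and end offset for each job.")
	f.Int64Var(&cfg.LookbackPeriod, "block-scheduler.lookback-period", -2,
		"Lookback period in milliseconds used when the consumer group has no commits: -1 = latest offset, -2 = start of the partition.")
}
```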
80 changes: 80 additions & 0 deletions pkg/blockbuilder/scheduler/kafkautil.go
@@ -0,0 +1,80 @@
// SPDX-License-Identifier: AGPL-3.0-only

package scheduler

import (
"context"
"errors"
"fmt"
"sync"

"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kerr"
)

// GetGroupLag is similar to `kadm.Client.Lag` but works when the group doesn't have live participants.
// Similar to `kadm.CalculateGroupLagWithStartOffsets`, it takes into account that the group may not have any commits.
//
// The lag is the difference between the last produced offset (high watermark) and an offset in the "past".
// If the block builder committed an offset for a given partition to the consumer group at least once, then
// the lag is the difference between the last produced offset and the offset committed in the consumer group.
// Otherwise, if the block builder didn't commit an offset for a given partition yet (e.g. block builder is
// running for the first time), then the lag is the difference between the last produced offset and fallbackOffsetMillis.
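// For example, with a high watermark of 10: a partition whose group committed offset 8 has a lag of 2,
// while an uncommitted partition whose first offset at or after fallbackOffsetMillis is 6 has a lag of 4.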
func GetGroupLag(ctx context.Context, admClient *kadm.Client, topic, group string, fallbackOffsetMillis int64) (kadm.GroupLag, error) {
offsets, err := admClient.FetchOffsets(ctx, group)
if err != nil {
if !errors.Is(err, kerr.GroupIDNotFound) {
return nil, fmt.Errorf("fetch offsets: %w", err)
}
}
if err := offsets.Error(); err != nil {
return nil, fmt.Errorf("fetch offsets got error in response: %w", err)
}

startOffsets, err := admClient.ListStartOffsets(ctx, topic)
if err != nil {
return nil, err
}
endOffsets, err := admClient.ListEndOffsets(ctx, topic)
if err != nil {
return nil, err
}

resolveFallbackOffsets := sync.OnceValues(func() (kadm.ListedOffsets, error) {
return admClient.ListOffsetsAfterMilli(ctx, fallbackOffsetMillis, topic)
})
// If a topic-partition in offsets doesn't have a commit, fall back to the offset that fallbackOffsetMillis resolves to.
for topic, pt := range startOffsets.Offsets() {
for partition, startOffset := range pt {
if _, ok := offsets.Lookup(topic, partition); ok {
continue
}
fallbackOffsets, err := resolveFallbackOffsets()
if err != nil {
return nil, fmt.Errorf("resolve fallback offsets: %w", err)
}
o, ok := fallbackOffsets.Lookup(topic, partition)
if !ok {
return nil, fmt.Errorf("partition %d not found in fallback offsets for topic %s", partition, topic)
}
if o.Offset < startOffset.At {
// Skip the resolved fallback offset if it's before the partition's start offset (i.e. before the earliest offset of the partition).
// This should not happen in Kafka, but can happen in Kafka-compatible systems, e.g. Warpstream.
continue
}
offsets.Add(kadm.OffsetResponse{Offset: kadm.Offset{
Topic: o.Topic,
Partition: o.Partition,
At: o.Offset,
LeaderEpoch: o.LeaderEpoch,
}})
}
}

descrGroup := kadm.DescribedGroup{
// "Empty" is the state that indicates that the group doesn't have active consumer members; this is always the case for block-builder,
// because we don't use group consumption.
State: "Empty",
}
return kadm.CalculateGroupLagWithStartOffsets(descrGroup, offsets, startOffsets, endOffsets), nil
}
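For reference, a hedged usage sketch of `GetGroupLag` follows. The surrounding function, broker address, topic, and group names are invented for illustration; only the `GetGroupLag` signature and the `kadm`/`kgo` calls already used in this PR are relied on.

```go
package scheduler

import (
	"context"
	"fmt"
	"time"

	"github.com/twmb/franz-go/pkg/kadm"
	"github.com/twmb/franz-go/pkg/kgo"
)

// exampleGroupLag is a sketch: it measures per-partition lag for a consumer group,
// falling back to "offsets produced within the last hour" for partitions without commits.
func exampleGroupLag(ctx context.Context) error {
	client, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092")) // placeholder broker
	if err != nil {
		return err
	}
	defer client.Close()

	fallback := time.Now().Add(-time.Hour).UnixMilli()
	lag, err := GetGroupLag(ctx, kadm.NewClient(client), "loki", "block-scheduler", fallback)
	if err != nil {
		return err
	}
	for topic, partitions := range lag {
		for partition, l := range partitions {
			// l.Commit.At is the committed (or fallback) offset; l.Lag is the high watermark minus that.
			fmt.Printf("topic=%s partition=%d committed=%d lag=%d\n", topic, partition, l.Commit.At, l.Lag)
		}
	}
	return nil
}
```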
164 changes: 164 additions & 0 deletions pkg/blockbuilder/scheduler/kafkautil_test.go
@@ -0,0 +1,164 @@
// SPDX-License-Identifier: AGPL-3.0-only

package scheduler

import (
"context"
"errors"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"

"github.com/grafana/loki/v3/pkg/kafka/testkafka"
)

const (
testTopic = "test"
testGroup = "testgroup"
)

func TestKafkaGetGroupLag(t *testing.T) {
ctx, cancel := context.WithCancelCause(context.Background())
t.Cleanup(func() { cancel(errors.New("test done")) })

_, addr := testkafka.CreateClusterWithoutCustomConsumerGroupsSupport(t, 3, testTopic)
kafkaClient := mustKafkaClient(t, addr)
admClient := kadm.NewClient(kafkaClient)

const numRecords = 5

var producedRecords []kgo.Record
kafkaTime := time.Now().Add(-12 * time.Hour)
for i := int64(0); i < numRecords; i++ {
kafkaTime = kafkaTime.Add(time.Minute)

// Produce and keep records to partition 0.
res := produceRecords(ctx, t, kafkaClient, kafkaTime, "1", testTopic, 0, []byte(`test value`))
rec, err := res.First()
require.NoError(t, err)
require.NotNil(t, rec)

producedRecords = append(producedRecords, *rec)

// Produce same records to partition 1 (this partition won't have any commits).
produceRecords(ctx, t, kafkaClient, kafkaTime, "1", testTopic, 1, []byte(`test value`))
}
require.Len(t, producedRecords, numRecords)

// Commit last produced record from partition 0.
rec := producedRecords[len(producedRecords)-1]
offsets := make(kadm.Offsets)
offsets.Add(kadm.Offset{
Topic: rec.Topic,
Partition: rec.Partition,
At: rec.Offset + 1,
LeaderEpoch: rec.LeaderEpoch,
})
err := admClient.CommitAllOffsets(ctx, testGroup, offsets)
require.NoError(t, err)

// Truncate partition 1 after the second-to-last record to emulate retention.
// Note that Kafka sets the partition's start offset to the requested offset; any records in the segment before that offset can no longer be read.
// Note the difference between DeleteRecords and DeleteOffsets in the kadm docs.
deleteRecOffsets := make(kadm.Offsets)
deleteRecOffsets.Add(kadm.Offset{
Topic: testTopic,
Partition: 1,
At: numRecords - 2,
})
_, err = admClient.DeleteRecords(ctx, deleteRecOffsets)
require.NoError(t, err)

getTopicPartitionLag := func(t *testing.T, lag kadm.GroupLag, topic string, part int32) int64 {
l, ok := lag.Lookup(topic, part)
require.True(t, ok)
return l.Lag
}

t.Run("fallbackOffset=milliseconds", func(t *testing.T) {
// get the timestamp of the last produced record
rec := producedRecords[len(producedRecords)-1]
fallbackOffset := rec.Timestamp.Add(-time.Millisecond).UnixMilli()
groupLag, err := GetGroupLag(ctx, admClient, testTopic, testGroup, fallbackOffset)
require.NoError(t, err)

require.EqualValues(t, 0, getTopicPartitionLag(t, groupLag, testTopic, 0), "partition 0 must have no lag")
require.EqualValues(t, 1, getTopicPartitionLag(t, groupLag, testTopic, 1), "partition 1 must fall back to known record and get its lag from there")
require.EqualValues(t, 0, getTopicPartitionLag(t, groupLag, testTopic, 2), "partition 2 has no data and must have no lag")
})

t.Run("fallbackOffset=before-earliest", func(t *testing.T) {
// get the timestamp of third to last produced record (record before earliest in partition 1)
rec := producedRecords[len(producedRecords)-3]
fallbackOffset := rec.Timestamp.Add(-time.Millisecond).UnixMilli()
groupLag, err := GetGroupLag(ctx, admClient, testTopic, testGroup, fallbackOffset)
require.NoError(t, err)

require.EqualValues(t, 0, getTopicPartitionLag(t, groupLag, testTopic, 0), "partition 0 must have no lag")
require.EqualValues(t, 2, getTopicPartitionLag(t, groupLag, testTopic, 1), "partition 1 must fall back to earliest and get its lag from there")
require.EqualValues(t, 0, getTopicPartitionLag(t, groupLag, testTopic, 2), "partition 2 has no data and must have no lag")
})

t.Run("fallbackOffset=0", func(t *testing.T) {
groupLag, err := GetGroupLag(ctx, admClient, testTopic, testGroup, 0)
require.NoError(t, err)

require.EqualValues(t, 0, getTopicPartitionLag(t, groupLag, testTopic, 0), "partition 0 must have no lag")
require.EqualValues(t, 2, getTopicPartitionLag(t, groupLag, testTopic, 1), "partition 1 must fall back to the earliest and get its lag from there")
require.EqualValues(t, 0, getTopicPartitionLag(t, groupLag, testTopic, 2), "partition 2 has no data and must have no lag")
})

t.Run("group=unknown", func(t *testing.T) {
groupLag, err := GetGroupLag(ctx, admClient, testTopic, "unknown", 0)
require.NoError(t, err)

// This group doesn't have any commits, so it must calculate its lag from the fallback.
require.EqualValues(t, numRecords, getTopicPartitionLag(t, groupLag, testTopic, 0))
require.EqualValues(t, 2, getTopicPartitionLag(t, groupLag, testTopic, 1), "partition 1 must fall back to the earliest and get its lag from there")
require.EqualValues(t, 0, getTopicPartitionLag(t, groupLag, testTopic, 2), "partition 2 has no data and must have no lag")
})
}

func mustKafkaClient(t *testing.T, addrs ...string) *kgo.Client {
writeClient, err := kgo.NewClient(
kgo.SeedBrokers(addrs...),
kgo.AllowAutoTopicCreation(),
// We will choose the partition of each record.
kgo.RecordPartitioner(kgo.ManualPartitioner()),
)
require.NoError(t, err)
t.Cleanup(writeClient.Close)
return writeClient
}

func produceRecords(
ctx context.Context,
t *testing.T,
kafkaClient *kgo.Client,
ts time.Time,
userID string,
topic string,
part int32,
val []byte,
) kgo.ProduceResults {
rec := &kgo.Record{
Timestamp: ts,
Key: []byte(userID),
Value: val,
Topic: topic,
Partition: part, // partition is chosen explicitly via the manual partitioner
}
produceResult := kafkaClient.ProduceSync(ctx, rec)
require.NoError(t, produceResult.FirstErr())
return produceResult
}

func commitOffset(ctx context.Context, t *testing.T, kafkaClient *kgo.Client, group string, offset kadm.Offset) {
offsets := make(kadm.Offsets)
offsets.Add(offset)
err := kadm.NewClient(kafkaClient).CommitAllOffsets(ctx, group, offsets)
require.NoError(t, err)
}
24 changes: 24 additions & 0 deletions pkg/blockbuilder/scheduler/metrics.go
@@ -0,0 +1,24 @@
package scheduler

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

type Metrics struct {
lag *prometheus.GaugeVec
committedOffset *prometheus.GaugeVec
}

func NewMetrics(reg prometheus.Registerer) *Metrics {
return &Metrics{
lag: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "loki_block_scheduler_group_lag",
Help: "How far behind the block scheduler consumer group is from the latest offset.",
}, []string{"partition"}),
committedOffset: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "loki_block_scheduler_group_committed_offset",
Help: "The current offset the block scheduler consumer group is at.",
}, []string{"partition"}),
}
}
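A hedged sketch of how the scheduler could feed these gauges from the per-partition lag returned by the offset reader; the helper below is invented for illustration, and only the metric vectors above and the `kadm.GroupMemberLag` fields are taken from the PR and the `kadm` package.

```go
package scheduler

import (
	"strconv"

	"github.com/twmb/franz-go/pkg/kadm"
)

// publishLagMetrics is a sketch: it exports the committed offset and lag for each
// partition, using the partition number as the label value.
func publishLagMetrics(m *Metrics, offsets map[int32]kadm.GroupMemberLag) {
	for partition, l := range offsets {
		p := strconv.Itoa(int(partition))
		m.committedOffset.WithLabelValues(p).Set(float64(l.Commit.At))
		m.lag.WithLabelValues(p).Set(float64(l.Lag))
	}
}
```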
62 changes: 62 additions & 0 deletions pkg/blockbuilder/scheduler/offsets_reader.go
@@ -0,0 +1,62 @@
package scheduler

import (
"context"
"errors"
"time"

"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"
)

type offsetReader struct {
topic string
consumerGroup string
fallbackOffsetMillis int64

adminClient *kadm.Client
}

func NewOffsetReader(topic, consumerGroup string, lookbackPeriodInMs int64, client *kgo.Client) OffsetReader {
var fallbackOffsetMillis int64
if lookbackPeriodInMs >= 0 {
fallbackOffsetMillis = time.Now().UnixMilli() - lookbackPeriodInMs
} else {
fallbackOffsetMillis = lookbackPeriodInMs
}

return &offsetReader{
topic: topic,
consumerGroup: consumerGroup,
adminClient: kadm.NewClient(client),
fallbackOffsetMillis: fallbackOffsetMillis,
}
}

func (r *offsetReader) GroupLag(ctx context.Context) (map[int32]kadm.GroupMemberLag, error) {
lag, err := GetGroupLag(ctx, r.adminClient, r.topic, r.consumerGroup, r.fallbackOffsetMillis)
if err != nil {
return nil, err
}

offsets, ok := lag[r.topic]
if !ok {
return nil, errors.New("no lag found for the topic")
}

return offsets, nil
}

func (r *offsetReader) ListOffsetsAfterMilli(ctx context.Context, ts int64) (map[int32]kadm.ListedOffset, error) {
offsets, err := r.adminClient.ListOffsetsAfterMilli(ctx, ts, r.topic)
if err != nil {
return nil, err
}

resp, ok := offsets[r.topic]
if !ok {
return nil, errors.New("no offsets found for the topic")
}

return resp, nil
}
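The constructor passes negative lookback values through unchanged, which lines up with Kafka's special list-offsets timestamps (-1 for the latest offset, -2 for the earliest), while a non-negative lookback becomes an absolute `now - lookback` timestamp in milliseconds. A small wiring sketch follows; the broker, topic, and group are placeholders, and the `OffsetReader` interface is assumed to expose `GroupLag` as implemented above.

```go
package scheduler

import (
	"context"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

// exampleOffsetReader is a sketch: it builds an offset reader with a 1h lookback,
// so partitions without commits are planned from records produced in the last hour.
func exampleOffsetReader(ctx context.Context) error {
	client, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092")) // placeholder broker
	if err != nil {
		return err
	}
	defer client.Close()

	// lookback_period semantics: >= 0 means "now minus that many milliseconds";
	// -1 means the latest offset; -2 means the start of the partition.
	reader := NewOffsetReader("loki", "block-scheduler", time.Hour.Milliseconds(), client)

	// Per-partition lag for the topic, keyed by partition number.
	lag, err := reader.GroupLag(ctx)
	if err != nil {
		return err
	}
	_ = lag
	return nil
}
```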
19 changes: 19 additions & 0 deletions pkg/blockbuilder/scheduler/queue.go
@@ -30,6 +30,25 @@ func NewJobQueue() *JobQueue {
}
}

func (q *JobQueue) Exists(job *types.Job) (types.JobStatus, bool) {
q.mu.RLock()
defer q.mu.RUnlock()

if _, ok := q.inProgress[job.ID]; ok {
return types.JobStatusInProgress, true
}

if _, ok := q.pending[job.ID]; ok {
return types.JobStatusPending, true
}

if _, ok := q.completed[job.ID]; ok {
return types.JobStatusComplete, true
}

return -1, false
}

// Enqueue adds a new job to the pending queue
// This is a naive implementation, intended to be refactored
func (q *JobQueue) Enqueue(job *types.Job) error {
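Finally, a hedged sketch of how a planner loop might use `Exists` together with `Enqueue` to avoid re-queueing jobs it has already planned; the `planJobs` helper and the `types` import path are assumptions, and only `Exists`, `Enqueue`, and the status constants appear in this diff.

```go
package scheduler

import (
	// Import path assumed from the repository layout; not shown in this diff.
	"github.com/grafana/loki/v3/pkg/blockbuilder/types"
)

// planJobs is a sketch: it enqueues only jobs the queue does not already track.
func planJobs(q *JobQueue, jobs []*types.Job) error {
	for _, job := range jobs {
		if _, ok := q.Exists(job); ok {
			// Already pending, in progress, or complete: nothing to do.
			continue
		}
		if err := q.Enqueue(job); err != nil {
			return err
		}
	}
	return nil
}
```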