feat: compaction staging (#2663)
* Compaction staging

* Compaction staging config

* Add help message

* Per-tenant limit
kolesnikovae authored Nov 16, 2023
1 parent 8b93731 commit ce2f102
Showing 16 changed files with 147 additions and 43 deletions.
2 changes: 2 additions & 0 deletions cmd/pyroscope/help-all.txt.tmpl
@@ -131,6 +131,8 @@ Usage of ./pyroscope:
       Minimum time to wait for ring stability at startup. 0 to disable.
   -compactor.split-and-merge-shards int
       The number of shards to use when splitting blocks. 0 to disable splitting.
+  -compactor.split-and-merge-stage-size int
+      Number of stages split shards will be written to. Number of output split shards is controlled by -compactor.split-and-merge-shards.
   -compactor.split-groups int
       Number of groups that blocks for splitting should be grouped into. Each group of blocks is then split separately. Number of output split shards is controlled by -compactor.split-and-merge-shards. (default 1)
   -config.expand-env
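The help text is terse, so a sketch of the idea may help: staging bounds how many of the output split shards are materialized per pass over the source blocks, trading extra read passes for fewer concurrently open shard writers. The sketch below is a hypothetical illustration only — it assumes a shards-per-stage reading of the flag and invents the `stagesFor` helper; the actual partitioning lives in `phlaredb.CompactWithSplitting` and may well differ.

```go
package main

import "fmt"

// stagesFor partitions output shard indices into write stages.
// Hypothetical helper for illustration; not part of this commit.
// A stageSize of 0 (the default) means no staging: one pass writes
// every shard.
func stagesFor(shardCount, stageSize uint64) [][]uint64 {
	if stageSize == 0 || stageSize > shardCount {
		stageSize = shardCount
	}
	var stages [][]uint64
	for start := uint64(0); start < shardCount; start += stageSize {
		end := start + stageSize
		if end > shardCount {
			end = shardCount
		}
		stage := make([]uint64, 0, end-start)
		for shard := start; shard < end; shard++ {
			stage = append(stage, shard)
		}
		stages = append(stages, stage)
	}
	return stages
}

func main() {
	// 8 output split shards written 4 at a time: two passes.
	fmt.Println(stagesFor(8, 4)) // [[0 1 2 3] [4 5 6 7]]
}
```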
2 changes: 2 additions & 0 deletions cmd/pyroscope/help.txt.tmpl
@@ -29,6 +29,8 @@ Usage of ./pyroscope:
       Backend storage to use for the ring. Supported values are: consul, etcd, inmemory, memberlist, multi. (default "memberlist")
   -compactor.split-and-merge-shards int
       The number of shards to use when splitting blocks. 0 to disable splitting.
+  -compactor.split-and-merge-stage-size int
+      Number of stages split shards will be written to. Number of output split shards is controlled by -compactor.split-and-merge-shards.
   -compactor.split-groups int
       Number of groups that blocks for splitting should be grouped into. Each group of blocks is then split separately. Number of output split shards is controlled by -compactor.split-and-merge-shards. (default 1)
   -config.expand-env
@@ -211,6 +211,11 @@ limits:
   # CLI flag: -compactor.split-and-merge-shards
   [compactor_split_and_merge_shards: <int> | default = 0]
 
+  # Number of stages split shards will be written to. Number of output split
+  # shards is controlled by -compactor.split-and-merge-shards.
+  # CLI flag: -compactor.split-and-merge-stage-size
+  [compactor_split_and_merge_stage_size: <int> | default = 0]
+
   # Number of groups that blocks for splitting should be grouped into. Each
   # group of blocks is then split separately. Number of output split shards is
   # controlled by -compactor.split-and-merge-shards.
9 changes: 9 additions & 0 deletions pkg/compactor/blocks_cleaner_test.go
@@ -1021,6 +1021,7 @@ type mockConfigProvider struct {
 	splitAndMergeShards          map[string]int
 	instancesShardSize           map[string]int
 	splitGroups                  map[string]int
+	splitAndMergeStageSize       map[string]int
 	blockUploadEnabled           map[string]bool
 	blockUploadValidationEnabled map[string]bool
 	blockUploadMaxBlockSizeBytes map[string]int64
@@ -1034,6 +1035,7 @@ func newMockConfigProvider() *mockConfigProvider {
 		userRetentionPeriods:         make(map[string]time.Duration),
 		splitAndMergeShards:          make(map[string]int),
 		splitGroups:                  make(map[string]int),
+		splitAndMergeStageSize:       make(map[string]int),
 		blockUploadEnabled:           make(map[string]bool),
 		blockUploadValidationEnabled: make(map[string]bool),
 		blockUploadMaxBlockSizeBytes: make(map[string]int64),
@@ -1057,6 +1059,13 @@ func (m *mockConfigProvider) CompactorSplitAndMergeShards(user string) int {
 	return 0
 }
 
+func (m *mockConfigProvider) CompactorSplitAndMergeStageSize(user string) int {
+	if result, ok := m.splitAndMergeStageSize[user]; ok {
+		return result
+	}
+	return 0
+}
+
 func (m *mockConfigProvider) CompactorSplitGroups(user string) int {
 	if result, ok := m.splitGroups[user]; ok {
 		return result
10 changes: 5 additions & 5 deletions pkg/compactor/bucket_compactor.go
@@ -222,7 +222,7 @@ type Compactor interface {
 	// CompactWithSplitting merges and splits the source blocks into shardCount number of compacted blocks,
 	// and returns slice of block IDs.
 	// If given compacted block has no series, corresponding block ID will not be returned.
-	CompactWithSplitting(ctx context.Context, dst string, dirs []string, shardCount uint64) (result []ulid.ULID, _ error)
+	CompactWithSplitting(ctx context.Context, dst string, dirs []string, shardCount, stageSize uint64) (result []ulid.ULID, _ error)
 }
 
 const (
@@ -316,7 +316,7 @@ func newCompactorMetrics(r prometheus.Registerer) *CompactorMetrics {
 	return m
 }
 
-func (c *BlockCompactor) CompactWithSplitting(ctx context.Context, dest string, dirs []string, shardCount uint64) ([]ulid.ULID, error) {
+func (c *BlockCompactor) CompactWithSplitting(ctx context.Context, dest string, dirs []string, shardCount, stageSize uint64) ([]ulid.ULID, error) {
 	defer func() {
 		if err := recover(); err != nil {
 			level.Error(c.logger).Log("msg", "panic during compaction", "err", err, "dirs", strings.Join(dirs, ","))
@@ -383,7 +383,7 @@ func (c *BlockCompactor) CompactWithSplitting(ctx context.Context, dest string,
 	c.metrics.Ran.WithLabelValues(fmt.Sprintf("%d", currentLevel)).Inc()
 	c.metrics.Split.WithLabelValues(fmt.Sprintf("%d", currentLevel)).Observe(float64(shardCount))
 
-	metas, err := phlaredb.CompactWithSplitting(ctx, readers, shardCount, dest, c.splitBy)
+	metas, err := phlaredb.CompactWithSplitting(ctx, readers, shardCount, stageSize, dest, c.splitBy)
 	if err != nil {
 		return nil, errors.Wrapf(err, "compact blocks %v", dirs)
 	}
@@ -475,9 +475,9 @@ func (c *BucketCompactor) runCompactionJob(ctx context.Context, job *Job) (shoul
 	compactionBegin := time.Now()
 
 	if job.UseSplitting() {
-		compIDs, err = c.comp.CompactWithSplitting(ctx, subDir, blocksToCompactDirs, uint64(job.SplittingShards()))
+		compIDs, err = c.comp.CompactWithSplitting(ctx, subDir, blocksToCompactDirs, uint64(job.SplittingShards()), uint64(job.SplitStageSize()))
 	} else {
-		compIDs, err = c.comp.CompactWithSplitting(ctx, subDir, blocksToCompactDirs, 1)
+		compIDs, err = c.comp.CompactWithSplitting(ctx, subDir, blocksToCompactDirs, 1, 0)
 	}
 	if err != nil {
 		return false, nil, errors.Wrapf(err, "compact blocks %v", blocksToCompactDirs)
6 changes: 3 additions & 3 deletions pkg/compactor/bucket_compactor_e2e_test.go
@@ -154,7 +154,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
 	require.NoError(t, sy.GarbageCollect(ctx))
 
 	// Only the level 3 block, the last source block in both resolutions should be left.
-	grouper := NewSplitAndMergeGrouper("user-1", []int64{2 * time.Hour.Milliseconds()}, 0, 0, log.NewNopLogger())
+	grouper := NewSplitAndMergeGrouper("user-1", []int64{2 * time.Hour.Milliseconds()}, 0, 0, 0, log.NewNopLogger())
 	groups, err := grouper.Groups(sy.Metas())
 	require.NoError(t, err)
 
@@ -226,11 +226,11 @@ func TestGroupCompactE2E(t *testing.T) {
 	require.NoError(t, err)
 
 	planner := NewSplitAndMergePlanner([]int64{1000, 3000})
-	grouper := NewSplitAndMergeGrouper("user-1", []int64{1000, 3000}, 0, 0, logger)
+	grouper := NewSplitAndMergeGrouper("user-1", []int64{1000, 3000}, 0, 0, 0, logger)
 	metrics := NewBucketCompactorMetrics(blocksMarkedForDeletion, prometheus.NewPedanticRegistry())
 	bComp, err := NewBucketCompactor(logger, sy, grouper, planner, &BlockCompactor{
-		splitBy:              phlaredb.SplitByFingerprint,
 		blockOpenConcurrency: 100,
+		splitBy:              phlaredb.SplitByFingerprint,
 		logger:               logger,
 		metrics:              newCompactorMetrics(nil),
 	}, dir, userbkt, 2, ownAllJobs, sortJobsByNewestBlocksFirst, 0, 4, metrics)
12 changes: 6 additions & 6 deletions pkg/compactor/bucket_compactor_test.go
@@ -84,10 +84,10 @@ func TestGroupMaxMinTime(t *testing.T) {
 func TestBucketCompactor_FilterOwnJobs(t *testing.T) {
 	jobsFn := func() []*Job {
 		return []*Job{
-			NewJob("user", "key1", labels.EmptyLabels(), 0, false, 0, ""),
-			NewJob("user", "key2", labels.EmptyLabels(), 0, false, 0, ""),
-			NewJob("user", "key3", labels.EmptyLabels(), 0, false, 0, ""),
-			NewJob("user", "key4", labels.EmptyLabels(), 0, false, 0, ""),
+			NewJob("user", "key1", labels.EmptyLabels(), 0, false, 0, 0, ""),
+			NewJob("user", "key2", labels.EmptyLabels(), 0, false, 0, 0, ""),
+			NewJob("user", "key3", labels.EmptyLabels(), 0, false, 0, 0, ""),
+			NewJob("user", "key4", labels.EmptyLabels(), 0, false, 0, 0, ""),
 		}
 	}
 
@@ -134,13 +134,13 @@ func TestBucketCompactor_FilterOwnJobs(t *testing.T) {
 }
 
 func TestBlockMaxTimeDeltas(t *testing.T) {
-	j1 := NewJob("user", "key1", labels.EmptyLabels(), 0, false, 0, "")
+	j1 := NewJob("user", "key1", labels.EmptyLabels(), 0, false, 0, 0, "")
 	require.NoError(t, j1.AppendMeta(&block.Meta{
 		MinTime: 1500002700159,
 		MaxTime: 1500002800159,
 	}))
 
-	j2 := NewJob("user", "key2", labels.EmptyLabels(), 0, false, 0, "")
+	j2 := NewJob("user", "key2", labels.EmptyLabels(), 0, false, 0, 0, "")
 	require.NoError(t, j2.AppendMeta(&block.Meta{
 		MinTime: 1500002600159,
 		MaxTime: 1500002700159,
3 changes: 3 additions & 0 deletions pkg/compactor/compactor.go
@@ -186,6 +186,9 @@ type ConfigProvider interface {
 	// CompactorSplitAndMergeShards returns the number of shards to use when splitting blocks.
 	CompactorSplitAndMergeShards(userID string) int
 
+	// CompactorSplitAndMergeStageSize returns the number of stages split shards will be written to.
+	CompactorSplitAndMergeStageSize(userID string) int
+
 	// CompactorSplitGroups returns the number of groups that blocks used for splitting should
 	// be grouped into. Different groups are then split by different jobs.
 	CompactorSplitGroups(userID string) int
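Per the commit message, the stage size is a per-tenant limit, resolved through the ConfigProvider interface above. As a minimal sketch of what an implementation might look like — the `staticProvider` type and its fields are invented here for illustration and show only the new method; Pyroscope's real provider is its runtime limits/overrides machinery:

```go
// Hypothetical provider backed by a static per-tenant map with a
// global default. Illustration only; not part of this commit.
type staticProvider struct {
	defaultStageSize int
	perTenant        map[string]int // tenant ID -> stage size override
}

// CompactorSplitAndMergeStageSize satisfies the new ConfigProvider method.
func (p *staticProvider) CompactorSplitAndMergeStageSize(userID string) int {
	if v, ok := p.perTenant[userID]; ok {
		return v
	}
	return p.defaultStageSize
}
```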
4 changes: 2 additions & 2 deletions pkg/compactor/compactor_test.go
@@ -1703,8 +1703,8 @@ type blockCompactorMock struct {
 	mock.Mock
 }
 
-func (m *blockCompactorMock) CompactWithSplitting(ctx context.Context, dest string, dirs []string, shardCount uint64) (result []ulid.ULID, _ error) {
-	args := m.Called(ctx, dest, dirs, shardCount)
+func (m *blockCompactorMock) CompactWithSplitting(ctx context.Context, dest string, dirs []string, shardCount, stageSize uint64) (result []ulid.ULID, _ error) {
+	args := m.Called(ctx, dest, dirs, shardCount, stageSize)
 	return args.Get(0).([]ulid.ULID), args.Error(1)
 }
9 changes: 8 additions & 1 deletion pkg/compactor/job.go
@@ -33,17 +33,19 @@ type Job struct {
 
 	// The number of shards to split compacted block into. Not used if splitting is disabled.
 	splitNumShards uint32
+	splitStageSize uint32
 }
 
 // NewJob returns a new compaction Job.
-func NewJob(userID string, key string, lset labels.Labels, resolution int64, useSplitting bool, splitNumShards uint32, shardingKey string) *Job {
+func NewJob(userID string, key string, lset labels.Labels, resolution int64, useSplitting bool, splitNumShards, splitStageSize uint32, shardingKey string) *Job {
 	return &Job{
 		userID:         userID,
 		key:            key,
 		labels:         lset,
 		resolution:     resolution,
 		useSplitting:   useSplitting,
 		splitNumShards: splitNumShards,
+		splitStageSize: splitStageSize,
 		shardingKey:    shardingKey,
 	}
 }
@@ -145,6 +147,11 @@ func (job *Job) SplittingShards() uint32 {
 	return job.splitNumShards
 }
 
+// SplitStageSize returns the number of stages split shards will be written to.
+func (job *Job) SplitStageSize() uint32 {
+	return job.splitStageSize
+}
+
 // ShardingKey returns the key used to shard this job across multiple instances.
 func (job *Job) ShardingKey() string {
 	return job.shardingKey
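For reference, constructing a job with the widened signature looks like this inside the compactor package (arbitrary example values, mirroring the updated test calls below; `labels` is the Prometheus labels package and `fmt` is assumed imported):

```go
// Arbitrary values; the stage size is the new argument before shardingKey.
job := NewJob(
	"user-1",             // userID
	"group-1",            // key
	labels.EmptyLabels(), // lset
	0,                    // resolution
	true,                 // useSplitting
	4,                    // splitNumShards
	2,                    // splitStageSize (new in this commit)
	"shard-1",            // shardingKey
)
fmt.Println(job.SplittingShards(), job.SplitStageSize()) // 4 2
```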
4 changes: 2 additions & 2 deletions pkg/compactor/job_test.go
@@ -23,7 +23,7 @@ import (
 )
 
 func TestJob_MinCompactionLevel(t *testing.T) {
-	job := NewJob("user-1", "group-1", labels.EmptyLabels(), 0, true, 2, "shard-1")
+	job := NewJob("user-1", "group-1", labels.EmptyLabels(), 0, true, 2, 0, "shard-1")
 	require.NoError(t, job.AppendMeta(&block.Meta{ULID: ulid.MustNew(1, nil), Compaction: block.BlockMetaCompaction{Level: 2}}))
 	assert.Equal(t, 2, job.MinCompactionLevel())
 
@@ -108,7 +108,7 @@ func TestJobWaitPeriodElapsed(t *testing.T) {
 
 	for testName, testData := range tests {
 		t.Run(testName, func(t *testing.T) {
-			job := NewJob("user-1", "group-1", labels.EmptyLabels(), 0, true, 2, "shard-1")
+			job := NewJob("user-1", "group-1", labels.EmptyLabels(), 0, true, 2, 0, "shard-1")
 			for _, b := range testData.jobBlocks {
 				require.NoError(t, job.AppendMeta(b.meta))
 			}
3 changes: 2 additions & 1 deletion pkg/compactor/split_merge_compactor.go
@@ -16,11 +16,12 @@ func splitAndMergeGrouperFactory(_ context.Context, cfg Config, cfgProvider Conf
 		userID,
 		cfg.BlockRanges.ToMilliseconds(),
 		uint32(cfgProvider.CompactorSplitAndMergeShards(userID)),
+		uint32(cfgProvider.CompactorSplitAndMergeStageSize(userID)),
 		uint32(cfgProvider.CompactorSplitGroups(userID)),
 		logger)
 }
 
-func splitAndMergeCompactorFactory(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (Compactor, Planner, error) {
+func splitAndMergeCompactorFactory(_ context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (Compactor, Planner, error) {
 	splitBy := getCompactionSplitBy(cfg.CompactionSplitBy)
 	if splitBy == nil {
 		return nil, nil, errInvalidCompactionSplitBy
6 changes: 6 additions & 0 deletions pkg/compactor/split_merge_grouper.go
@@ -28,6 +28,9 @@ type SplitAndMergeGrouper struct {
 	// Number of shards to split source blocks into.
 	shardCount uint32
 
+	// Number of stages to split shards into.
+	splitStageSize uint32
+
 	// Number of groups that blocks used for splitting are grouped into.
 	splitGroupsCount uint32
 }
@@ -38,13 +41,15 @@ func NewSplitAndMergeGrouper(
 	userID string,
 	ranges []int64,
 	shardCount uint32,
+	splitStageSize uint32,
 	splitGroupsCount uint32,
 	logger log.Logger,
 ) *SplitAndMergeGrouper {
 	return &SplitAndMergeGrouper{
 		userID:           userID,
 		ranges:           ranges,
 		shardCount:       shardCount,
+		splitStageSize:   splitStageSize,
 		splitGroupsCount: splitGroupsCount,
 		logger:           logger,
 	}
@@ -83,6 +88,7 @@ func (g *SplitAndMergeGrouper) Groups(blocks map[ulid.ULID]*block.Meta) (res []*
 			resolution,
 			job.stage == stageSplit,
 			g.shardCount,
+			g.splitStageSize,
 			job.shardingKey(),
 		)
 
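End to end, the stage size flows from the per-tenant ConfigProvider through splitAndMergeGrouperFactory into the grouper, and from there into every split job. With literal values in place of the provider lookups (arbitrary numbers; `time`, the go-kit `log` package, and a `metas` block-metadata map are assumed, as in the tests), the updated constructor is called like this:

```go
// Arbitrary values; splitStageSize is the new fourth argument.
grouper := NewSplitAndMergeGrouper(
	"user-1",                              // userID
	[]int64{2 * time.Hour.Milliseconds()}, // ranges
	8,                                     // shardCount
	4,                                     // splitStageSize
	1,                                     // splitGroupsCount
	log.NewNopLogger(),                    // logger
)
// Each split job produced below now carries SplitStageSize() == 4.
groups, err := grouper.Groups(metas)
```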