Skip to content

Commit

Permalink
use CreatedBy instead of workerId
Browse files Browse the repository at this point in the history
  • Loading branch information
alsoba13 committed Jan 22, 2025
1 parent d7634c1 commit 1700a83
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 22 deletions.
20 changes: 6 additions & 14 deletions pkg/experiment/block/compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"sync"

"github.com/cespare/xxhash/v2"
"github.com/google/uuid"
"github.com/grafana/dskit/multierror"
"github.com/oklog/ulid"
"github.com/parquet-go/parquet-go"
Expand Down Expand Up @@ -66,7 +65,6 @@ func Compact(
ctx context.Context,
blocks []*metastorev1.BlockMeta,
storage objstore.Bucket,
workerId uuid.UUID,
options ...CompactionOption,
) (m []*metastorev1.BlockMeta, err error) {
c := &compactionConfig{
Expand All @@ -79,7 +77,7 @@ func Compact(
}

objects := ObjectsFromMetas(storage, blocks, c.objectOptions...)
plan, err := PlanCompaction(objects, workerId)
plan, err := PlanCompaction(objects)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -111,7 +109,7 @@ func Compact(
return compacted, nil
}

func PlanCompaction(objects Objects, workerId uuid.UUID) ([]*CompactionPlan, error) {
func PlanCompaction(objects Objects) ([]*CompactionPlan, error) {
if len(objects) == 0 {
// Even if there's just a single object, we still need to rewrite it.
return nil, ErrNoBlocksToMerge
Expand All @@ -138,7 +136,6 @@ func PlanCompaction(objects Objects, workerId uuid.UUID) ([]*CompactionPlan, err
obj.meta.StringTable[s.Tenant],
r.meta.Shard,
level,
workerId,
)
m[obj.meta.StringTable[s.Tenant]] = tm
}
Expand Down Expand Up @@ -170,21 +167,18 @@ type CompactionPlan struct {
meta *metastorev1.BlockMeta
strings *metadata.StringTable
metricsExporter *metrics.Exporter
workerId uuid.UUID
}

func newBlockCompaction(
id string,
tenant string,
shard uint32,
compactionLevel uint32,
workerId uuid.UUID,
) *CompactionPlan {
p := &CompactionPlan{
tenant: tenant,
datasetMap: make(map[int32]*datasetCompaction),
strings: metadata.NewStringTable(),
workerId: workerId,
}
p.path = BuildObjectPath(tenant, shard, compactionLevel, id)
p.meta = &metastorev1.BlockMeta{
Expand Down Expand Up @@ -265,7 +259,6 @@ type datasetCompaction struct {

flushOnce sync.Once

workerId uuid.UUID
metricsRecorder *metrics.Recorder
}

Expand All @@ -285,7 +278,6 @@ func (b *CompactionPlan) newDatasetCompaction(tenant, name int32) *datasetCompac
Size: 0,
Labels: nil,
},
workerId: b.workerId,
}
}

Expand Down Expand Up @@ -344,7 +336,7 @@ func (m *datasetCompaction) open(ctx context.Context, path string) (err error) {
if m.parent.meta.CompactionLevel == 1 {
recordingTime := int64(ulid.MustParse(m.parent.meta.Id).Time())
rules := metrics.RecordingRulesFromTenant(m.parent.tenant)
pyroscopeInstance := pyroscopeInstanceHash(m.parent.meta.Shard, m.workerId)
pyroscopeInstance := pyroscopeInstanceHash(m.parent.meta.Shard, m.parent.meta.CreatedBy)
m.metricsRecorder = metrics.NewRecorder(rules, recordingTime, pyroscopeInstance)
}

Expand All @@ -369,10 +361,10 @@ func (m *datasetCompaction) open(ctx context.Context, path string) (err error) {
return nil
}

func pyroscopeInstanceHash(shard uint32, id uuid.UUID) string {
buf := make([]byte, 0, 40)
func pyroscopeInstanceHash(shard uint32, createdBy int32) string {
buf := make([]byte, 0, 8)
buf = append(buf, byte(shard>>24), byte(shard>>16), byte(shard>>8), byte(shard))
buf = append(buf, id.String()...)
buf = append(buf, byte(createdBy>>24), byte(createdBy>>16), byte(createdBy>>8), byte(createdBy))
return fmt.Sprintf("%x", xxhash.Sum64(buf))
}

Expand Down
3 changes: 1 addition & 2 deletions pkg/experiment/block/compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"path/filepath"
"testing"

"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/encoding/protojson"
Expand All @@ -27,7 +26,7 @@ func Test_CompactBlocks(t *testing.T) {
require.NoError(t, err)

dst, tempdir := testutil.NewFilesystemBucket(t, ctx, t.TempDir())
compactedBlocks, err := Compact(ctx, resp.Blocks, bucket, uuid.New(),
compactedBlocks, err := Compact(ctx, resp.Blocks, bucket,
WithCompactionDestination(dst),
WithCompactionTempDir(tempdir),
WithCompactionObjectOptions(
Expand Down
6 changes: 1 addition & 5 deletions pkg/experiment/compactor/compaction_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/google/uuid"
"github.com/grafana/dskit/services"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
Expand Down Expand Up @@ -47,8 +46,6 @@ type Worker struct {
stopped atomic.Bool
closeOnce sync.Once
wg sync.WaitGroup

id uuid.UUID
}

type Config struct {
Expand Down Expand Up @@ -99,7 +96,6 @@ func New(
client: client,
storage: storage,
metrics: newMetrics(reg),
id: uuid.New(),
}
w.threads = config.JobConcurrency
if w.threads < 1 {
Expand Down Expand Up @@ -367,7 +363,7 @@ func (w *Worker) runCompaction(job *compactionJob) {

tempdir := filepath.Join(w.config.TempDir, job.Name)
sourcedir := filepath.Join(tempdir, "source")
compacted, err := block.Compact(ctx, job.blocks, w.storage, w.id,
compacted, err := block.Compact(ctx, job.blocks, w.storage,
block.WithCompactionTempDir(tempdir),
block.WithCompactionObjectOptions(
block.WithObjectMaxSizeLoadInMemory(w.config.SmallObjectSize),
Expand Down
19 changes: 18 additions & 1 deletion pkg/experiment/metrics/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func RecordingRulesFromTenant(tenant string) []*RecordingRule {
return []*RecordingRule{
{
profileType: "process_cpu:samples:count:cpu:nanoseconds",
metricName: "test_pyroscope_exported",
metricName: "ride_sharing_app_car_cpu_nanoseconds",
matchers: []*labels.Matcher{
{
Type: labels.MatchEqual,
Expand All @@ -31,5 +31,22 @@ func RecordingRulesFromTenant(tenant string) []*RecordingRule {
},
keepLabels: []string{"region"},
},
{
profileType: "process_cpu:samples:count:cpu:nanoseconds",
metricName: "ride_sharing_app_car_all_regions_cpu_nanoseconds",
matchers: []*labels.Matcher{
{
Type: labels.MatchEqual,
Name: "service_name",
Value: "ride-sharing-app",
},
{
Type: labels.MatchEqual,
Name: "vehicle",
Value: "car",
},
},
keepLabels: []string{},
},
}
}

0 comments on commit 1700a83

Please sign in to comment.