feat: tenant settings ruler #3944

Closed · wants to merge 24 commits
4 changes: 4 additions & 0 deletions .mockery.yaml
@@ -53,3 +53,7 @@ packages:
     interfaces:
       RaftNodeServiceClient:
       RaftNodeServiceServer:
+  github.com/grafana/pyroscope/pkg/experiment/metrics:
+    interfaces:
+      Exporter:
+      Ruler:
40 changes: 34 additions & 6 deletions pkg/experiment/block/compaction.go
@@ -52,11 +52,24 @@ func WithCompactionDestination(storage objstore.Bucket) CompactionOption {
     }
 }
 
+func WithSampleObserver(observer SampleObserver) CompactionOption {
+    return func(p *compactionConfig) {
+        p.sampleObserver = observer
+    }
+}
+
 type compactionConfig struct {
-    objectOptions []ObjectOption
-    source        objstore.BucketReader
-    destination   objstore.Bucket
-    tempdir       string
+    objectOptions  []ObjectOption
+    source         objstore.BucketReader
+    destination    objstore.Bucket
+    tempdir        string
+    sampleObserver SampleObserver
 }
 
+type SampleObserver interface {
+    // Observe is called before the compactor appends the entry
+    // to the output block. This method must not modify the entry.
+    Observe(ProfileEntry)
+}
+
 func Compact(
@@ -89,7 +102,7 @@ func Compact(
 
     compacted := make([]*metastorev1.BlockMeta, 0, len(plan))
     for _, p := range plan {
-        md, compactionErr := p.Compact(ctx, c.destination, c.tempdir)
+        md, compactionErr := p.Compact(ctx, c.destination, c.tempdir, c.sampleObserver)
         if compactionErr != nil {
             return nil, compactionErr
         }
@@ -187,7 +200,12 @@ func newBlockCompaction(
     return p
 }
 
-func (b *CompactionPlan) Compact(ctx context.Context, dst objstore.Bucket, tempdir string) (m *metastorev1.BlockMeta, err error) {
+func (b *CompactionPlan) Compact(
+    ctx context.Context,
+    dst objstore.Bucket,
+    tempdir string,
+    observer SampleObserver,
+) (m *metastorev1.BlockMeta, err error) {
     w, err := NewBlockWriter(tempdir)
     if err != nil {
         return nil, fmt.Errorf("creating block writer: %w", err)
@@ -199,6 +217,7 @@ func (b *CompactionPlan) Compact(ctx context.Context, dst objstore.Bucket, tempd
     // Datasets are compacted in a strict order.
     for i, s := range b.datasets {
         b.datasetIndex.setIndex(uint32(i))
+        s.registerSampleObserver(observer)
         if err = s.compact(ctx, w); err != nil {
             return nil, fmt.Errorf("compacting block: %w", err)
         }
@@ -284,6 +303,8 @@ type datasetCompaction struct {
     profiles uint64
 
     flushOnce sync.Once
+
+    observer SampleObserver
 }
 
 func (b *CompactionPlan) newDatasetCompaction(tenant, name int32) *datasetCompaction {
@@ -349,6 +370,10 @@ func (m *datasetCompaction) compact(ctx context.Context, w *Writer) (err error)
     return nil
 }
 
+func (m *datasetCompaction) registerSampleObserver(observer SampleObserver) {
+    m.observer = observer
+}
+
 func (m *datasetCompaction) open(ctx context.Context, w io.Writer) (err error) {
     var estimatedProfileTableSize int64
     for _, ds := range m.datasets {
@@ -416,6 +441,9 @@ func (m *datasetCompaction) writeRow(r ProfileEntry) (err error) {
     if err = m.symbolsRewriter.rewriteRow(r); err != nil {
         return err
     }
+    if m.observer != nil {
+        m.observer.Observe(r)
+    }
     return m.profilesWriter.writeRow(r)
 }
 
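The SampleObserver interface added above is the hook the rest of this PR builds on: writeRow invokes Observe for every profile row that survives compaction, and the nil check means compaction pays nothing when no observer is registered. As a hypothetical illustration (not part of this PR), a minimal observer that only counts rows might look like:

package main

import (
    "fmt"

    "github.com/grafana/pyroscope/pkg/experiment/block"
)

// rowCounter is a hypothetical SampleObserver that counts the rows
// flowing through compaction. Per the interface contract, Observe
// must not modify the entry it receives.
type rowCounter struct {
    rows uint64
}

func (c *rowCounter) Observe(block.ProfileEntry) { c.rows++ }

func main() {
    counter := &rowCounter{}
    counter.Observe(block.ProfileEntry{}) // the compactor calls this once per written row
    // In real use the observer is wired in through the new option:
    //   block.Compact(ctx, blocks, bucket, block.WithSampleObserver(counter))
    fmt.Println("observed rows:", counter.rows)
}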
72 changes: 59 additions & 13 deletions pkg/experiment/compactor/compaction_worker.go
@@ -2,6 +2,7 @@ package compactor
 
 import (
     "context"
+    "encoding/binary"
     "flag"
     "fmt"
     "os"
@@ -13,18 +14,22 @@ import (
     "sync/atomic"
     "time"
 
+    "github.com/cespare/xxhash/v2"
     "github.com/go-kit/log"
     "github.com/go-kit/log/level"
     "github.com/grafana/dskit/services"
+    "github.com/oklog/ulid"
     "github.com/opentracing/opentracing-go"
     "github.com/pkg/errors"
     "github.com/prometheus/client_golang/prometheus"
+    "github.com/prometheus/prometheus/model/labels"
     _ "go.uber.org/automaxprocs"
     "golang.org/x/sync/errgroup"
 
     metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
     "github.com/grafana/pyroscope/pkg/experiment/block"
     "github.com/grafana/pyroscope/pkg/experiment/block/metadata"
+    "github.com/grafana/pyroscope/pkg/experiment/metrics"
     "github.com/grafana/pyroscope/pkg/objstore"
     "github.com/grafana/pyroscope/pkg/util"
 )
@@ -36,7 +41,7 @@ type Worker struct {
     config  Config
     client  MetastoreClient
     storage objstore.Bucket
-    metrics *metrics
+    metrics *compactionWorkerMetrics
 
     jobs  map[string]*compactionJob
     queue chan *compactionJob
@@ -46,14 +51,18 @@ type Worker struct {
     stopped   atomic.Bool
     closeOnce sync.Once
     wg        sync.WaitGroup
+
+    exporter metrics.Exporter
+    ruler    metrics.Ruler
 }
 
 type Config struct {
-    JobConcurrency  int           `yaml:"job_capacity"`
-    JobPollInterval time.Duration `yaml:"job_poll_interval"`
-    SmallObjectSize int           `yaml:"small_object_size_bytes"`
-    TempDir         string        `yaml:"temp_dir"`
-    RequestTimeout  time.Duration `yaml:"request_timeout"`
+    JobConcurrency         int           `yaml:"job_capacity"`
+    JobPollInterval        time.Duration `yaml:"job_poll_interval"`
+    SmallObjectSize        int           `yaml:"small_object_size_bytes"`
+    TempDir                string        `yaml:"temp_dir"`
+    RequestTimeout         time.Duration `yaml:"request_timeout"`
+    MetricsExporterEnabled bool          `yaml:"metrics_exporter_enabled"`
 }
 
 func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
@@ -63,6 +72,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
     f.DurationVar(&cfg.RequestTimeout, prefix+"request-timeout", 5*time.Second, "Job request timeout.")
     f.IntVar(&cfg.SmallObjectSize, prefix+"small-object-size-bytes", 8<<20, "Size of the object that can be loaded in memory.")
     f.StringVar(&cfg.TempDir, prefix+"temp-dir", os.TempDir(), "Temporary directory for compaction jobs.")
+    f.BoolVar(&cfg.MetricsExporterEnabled, prefix+"metrics-exporter.enabled", false, "This parameter specifies whether the metrics exporter is enabled.")
 }
 
 type compactionJob struct {
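The new MetricsExporterEnabled field maps to the metrics_exporter_enabled YAML key, as shown in the sketch below. It uses a trimmed, illustrative copy of the struct and gopkg.in/yaml.v3 rather than Pyroscope's real config loader:

package main

import (
    "fmt"

    "gopkg.in/yaml.v3"
)

// workerConfig is a trimmed copy of the Config above, kept only to
// demonstrate the YAML key names; the real struct has more fields.
type workerConfig struct {
    JobConcurrency         int  `yaml:"job_capacity"`
    MetricsExporterEnabled bool `yaml:"metrics_exporter_enabled"`
}

func main() {
    raw := []byte("job_capacity: 4\nmetrics_exporter_enabled: true\n")
    var cfg workerConfig
    if err := yaml.Unmarshal(raw, &cfg); err != nil {
        panic(err)
    }
    fmt.Printf("%+v\n", cfg) // {JobConcurrency:4 MetricsExporterEnabled:true}
}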
@@ -100,18 +110,22 @@ func New(
     client MetastoreClient,
     storage objstore.Bucket,
     reg prometheus.Registerer,
+    ruler metrics.Ruler,
+    exporter metrics.Exporter,
 ) (*Worker, error) {
     config.TempDir = filepath.Join(filepath.Clean(config.TempDir), "pyroscope-compactor")
     _ = os.RemoveAll(config.TempDir)
     if err := os.MkdirAll(config.TempDir, 0o777); err != nil {
         return nil, fmt.Errorf("failed to create compactor directory: %w", err)
     }
     w := &Worker{
-        config:  config,
-        logger:  logger,
-        client:  client,
-        storage: storage,
-        metrics: newMetrics(reg),
+        config:   config,
+        logger:   logger,
+        client:   client,
+        storage:  storage,
+        metrics:  newMetrics(reg),
+        ruler:    ruler,
+        exporter: exporter,
     }
     w.threads = config.JobConcurrency
     if w.threads < 1 {
@@ -176,6 +190,11 @@ func (w *Worker) running(ctx context.Context) error {
     ticker.Stop()
     close(stopPolling)
     <-pollingDone
+    // Force exporter to send all staged samples (depends on the implementation)
+    // Must be a blocking call.
+    if w.exporter != nil {
+        w.exporter.Flush()
+    }
     return nil
 }
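The shutdown path above depends on Flush blocking until every staged sample is sent. A hypothetical exporter honoring that contract (the staging and send details are assumptions, not this PR's implementation):

package main

import (
    "fmt"
    "sync"
)

// bufferedExporter sketches the Flush contract used by the worker:
// Flush must not return before all staged samples have been handed off.
type bufferedExporter struct {
    mu     sync.Mutex
    staged []string                   // stand-in for staged time series
    send   func(batch []string) error // e.g. a remote-write client (assumed)
}

func (e *bufferedExporter) Flush() {
    e.mu.Lock()
    batch := e.staged
    e.staged = nil
    e.mu.Unlock()
    if len(batch) > 0 {
        // Synchronous send: the worker's shutdown path relies on this
        // call not returning before the batch is durable.
        _ = e.send(batch)
    }
}

func main() {
    e := &bufferedExporter{send: func(batch []string) error {
        fmt.Println("sent", len(batch), "samples")
        return nil
    }}
    e.staged = append(e.staged, "pyroscope_metric_sample")
    e.Flush() // prints: sent 1 samples
}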

@@ -394,12 +413,20 @@ func (w *Worker) runCompaction(job *compactionJob) {
 
     tempdir := filepath.Join(w.config.TempDir, job.Name)
     sourcedir := filepath.Join(tempdir, "source")
-    compacted, err := block.Compact(ctx, job.blocks, w.storage,
+    options := []block.CompactionOption{
         block.WithCompactionTempDir(tempdir),
         block.WithCompactionObjectOptions(
             block.WithObjectMaxSizeLoadInMemory(w.config.SmallObjectSize),
             block.WithObjectDownload(sourcedir),
-        ))
+        ),
+    }
+
+    if observer := w.buildSampleObserver(job.blocks[0]); observer != nil {
+        defer observer.Close()
+        options = append(options, block.WithSampleObserver(observer))
+    }
+
+    compacted, err := block.Compact(ctx, job.blocks, w.storage, options...)
     defer func() {
         if err = os.RemoveAll(tempdir); err != nil {
             level.Warn(logger).Log("msg", "failed to remove compaction directory", "path", tempdir, "err", err)
@@ -458,6 +485,25 @@ func (w *Worker) runCompaction(job *compactionJob) {
     _ = deleteGroup.Wait()
 }
 
+func (w *Worker) buildSampleObserver(md *metastorev1.BlockMeta) *metrics.SampleObserver {
+    if !w.config.MetricsExporterEnabled || md.CompactionLevel > 0 {
+        return nil
+    }
+    recordingTime := int64(ulid.MustParse(md.Id).Time())
+    pyroscopeInstanceLabel := labels.Label{
+        Name:  "pyroscope_instance",
+        Value: pyroscopeInstanceHash(md.Shard, uint32(md.CreatedBy)),
+    }
+    return metrics.NewSampleObserver(recordingTime, w.exporter, w.ruler, pyroscopeInstanceLabel)
+}
+
+func pyroscopeInstanceHash(shard uint32, createdBy uint32) string {
+    buf := make([]byte, 8)
+    binary.BigEndian.PutUint32(buf[0:4], shard)
+    binary.BigEndian.PutUint32(buf[4:8], createdBy)
+    return fmt.Sprintf("%x", xxhash.Sum64(buf))
+}
+
 func (w *Worker) getBlockMetadata(logger log.Logger, job *compactionJob) error {
     ctx, cancel := context.WithTimeout(job.ctx, w.config.RequestTimeout)
     defer cancel()
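pyroscopeInstanceHash above derives a stable per-instance label value: shard and createdBy are packed big-endian into 8 bytes and hashed with xxhash, so the same pair always produces the same hex string. A standalone replica for illustration:

package main

import (
    "encoding/binary"
    "fmt"

    "github.com/cespare/xxhash/v2"
)

// instanceHash mirrors pyroscopeInstanceHash from the diff above.
func instanceHash(shard, createdBy uint32) string {
    buf := make([]byte, 8)
    binary.BigEndian.PutUint32(buf[0:4], shard)
    binary.BigEndian.PutUint32(buf[4:8], createdBy)
    return fmt.Sprintf("%x", xxhash.Sum64(buf))
}

func main() {
    // Deterministic: one compactor instance (shard, createdBy) always maps
    // to the same pyroscope_instance label value across restarts.
    fmt.Println(instanceHash(3, 42))
}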
6 changes: 3 additions & 3 deletions pkg/experiment/compactor/compaction_worker_metrics.go
@@ -8,15 +8,15 @@ import (
     "github.com/grafana/pyroscope/pkg/util"
 )
 
-type metrics struct {
+type compactionWorkerMetrics struct {
     jobsInProgress   *prometheus.GaugeVec
     jobsCompleted    *prometheus.CounterVec
     jobDuration      *prometheus.HistogramVec
     timeToCompaction *prometheus.HistogramVec
 }
 
-func newMetrics(r prometheus.Registerer) *metrics {
-    m := &metrics{
+func newMetrics(r prometheus.Registerer) *compactionWorkerMetrics {
+    m := &compactionWorkerMetrics{
     jobsInProgress: prometheus.NewGaugeVec(prometheus.GaugeOpts{
         Name: "jobs_in_progress",
         Help: "The number of active compaction jobs currently running.",