Skip to content

Commit

Permalink
fix(blooms): Minor fixes and improvements for testing in dev (#13341)
Browse files Browse the repository at this point in the history
  • Loading branch information
salvacorts authored Jun 27, 2024
1 parent 5ae5b31 commit d0f56ee
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 28 deletions.
13 changes: 8 additions & 5 deletions pkg/bloombuild/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type Builder struct {
logger log.Logger

tsdbStore common.TSDBStore
bloomStore bloomshipper.StoreBase
bloomStore bloomshipper.Store
chunkLoader ChunkLoader

client protos.PlannerForBuilderClient
Expand All @@ -55,20 +55,23 @@ func New(
storeCfg storage.Config,
storageMetrics storage.ClientMetrics,
fetcherProvider stores.ChunkFetcherProvider,
bloomStore bloomshipper.StoreBase,
bloomStore bloomshipper.Store,
logger log.Logger,
r prometheus.Registerer,
) (*Builder, error) {
utillog.WarnExperimentalUse("Bloom Builder", logger)

builderID := uuid.NewString()
logger = log.With(logger, "builder_id", builderID)

tsdbStore, err := common.NewTSDBStores(schemaCfg, storeCfg, storageMetrics, logger)
if err != nil {
return nil, fmt.Errorf("error creating TSDB store: %w", err)
}

metrics := NewMetrics(r, v1.NewMetrics(r))
metrics := NewMetrics(r, bloomStore.BloomMetrics())
b := &Builder{
ID: uuid.NewString(),
ID: builderID,
cfg: cfg,
limits: limits,
metrics: metrics,
Expand Down Expand Up @@ -341,7 +344,7 @@ func (b *Builder) processTask(
blocksIter,
b.rwFn,
nil, // TODO(salvacorts): Pass reporter or remove when we address tracking
b.metrics,
b.bloomStore.BloomMetrics(),
logger,
)

Expand Down
12 changes: 11 additions & 1 deletion pkg/bloombuild/builder/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (

"github.com/grafana/loki/v3/pkg/bloombuild/protos"
"github.com/grafana/loki/v3/pkg/storage"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/chunk/client/local"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
bloomshipperconfig "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper/config"
"github.com/grafana/loki/v3/pkg/storage/types"
)
Expand Down Expand Up @@ -86,7 +88,7 @@ func Test_BuilderLoop(t *testing.T) {
}
flagext.DefaultValues(&cfg.GrpcConfig)

builder, err := New(cfg, limits, schemaCfg, storageCfg, storage.NewClientMetrics(), nil, nil, logger, prometheus.DefaultRegisterer)
builder, err := New(cfg, limits, schemaCfg, storageCfg, storage.NewClientMetrics(), nil, fakeBloomStore{}, logger, prometheus.DefaultRegisterer)
require.NoError(t, err)
t.Cleanup(func() {
err = services.StopAndAwaitTerminated(context.Background(), builder)
Expand Down Expand Up @@ -240,6 +242,14 @@ func (f fakeLimits) BloomCompactorMaxBloomSize(_ string) int {
panic("implement me")
}

type fakeBloomStore struct {
bloomshipper.Store
}

func (f fakeBloomStore) BloomMetrics() *v1.Metrics {
return nil
}

func parseDayTime(s string) config.DayTime {
t, err := time.Parse("2006-01-02", s)
if err != nil {
Expand Down
4 changes: 1 addition & 3 deletions pkg/bloombuild/builder/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ const (
)

type Metrics struct {
bloomMetrics *v1.Metrics
running prometheus.Gauge
running prometheus.Gauge

taskStarted prometheus.Counter
taskCompleted *prometheus.CounterVec
Expand All @@ -35,7 +34,6 @@ type Metrics struct {

func NewMetrics(r prometheus.Registerer, bloomMetrics *v1.Metrics) *Metrics {

Check warning on line 35 in pkg/bloombuild/builder/metrics.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

unused-parameter: parameter 'bloomMetrics' seems to be unused, consider removing or renaming it as _ (revive)
return &Metrics{
bloomMetrics: bloomMetrics,
running: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Expand Down
14 changes: 7 additions & 7 deletions pkg/bloombuild/builder/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type SimpleBloomGenerator struct {
// options to build blocks with
opts v1.BlockOptions

metrics *Metrics
metrics *v1.Metrics
logger log.Logger

readWriterFn func() (v1.BlockWriter, v1.BlockReader)
Expand All @@ -70,7 +70,7 @@ func NewSimpleBloomGenerator(
blocksIter v1.ResettableIterator[*v1.SeriesWithBlooms],
readWriterFn func() (v1.BlockWriter, v1.BlockReader),
reporter func(model.Fingerprint),
metrics *Metrics,
metrics *v1.Metrics,
logger log.Logger,
) *SimpleBloomGenerator {
return &SimpleBloomGenerator{
Expand All @@ -92,7 +92,7 @@ func NewSimpleBloomGenerator(
opts.Schema.NGramLen(),
opts.Schema.NGramSkip(),
int(opts.UnencodedBlockOptions.MaxBloomSizeBytes),
metrics.bloomMetrics,
metrics,
),
}
}
Expand Down Expand Up @@ -163,7 +163,7 @@ func (s *SimpleBloomGenerator) Generate(ctx context.Context) *LazyBlockBuilderIt
type LazyBlockBuilderIterator struct {
ctx context.Context
opts v1.BlockOptions
metrics *Metrics
metrics *v1.Metrics
populate v1.BloomPopulatorFunc
readWriterFn func() (v1.BlockWriter, v1.BlockReader)
series v1.PeekingIterator[*v1.Series]
Expand All @@ -177,7 +177,7 @@ type LazyBlockBuilderIterator struct {
func NewLazyBlockBuilderIterator(
ctx context.Context,
opts v1.BlockOptions,
metrics *Metrics,
metrics *v1.Metrics,
populate v1.BloomPopulatorFunc,
readWriterFn func() (v1.BlockWriter, v1.BlockReader),
series v1.PeekingIterator[*v1.Series],
Expand Down Expand Up @@ -214,7 +214,7 @@ func (b *LazyBlockBuilderIterator) Next() bool {
return false
}

mergeBuilder := v1.NewMergeBuilder(b.blocks, b.series, b.populate, b.metrics.bloomMetrics)
mergeBuilder := v1.NewMergeBuilder(b.blocks, b.series, b.populate, b.metrics)
writer, reader := b.readWriterFn()
blockBuilder, err := v1.NewBlockBuilder(b.opts, writer)
if err != nil {
Expand All @@ -229,7 +229,7 @@ func (b *LazyBlockBuilderIterator) Next() bool {
return false
}

b.curr = v1.NewBlock(reader, b.metrics.bloomMetrics)
b.curr = v1.NewBlock(reader, b.metrics)
return true
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/bloombuild/builder/spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func dummyBloomGen(t *testing.T, opts v1.BlockOptions, store v1.Iterator[*v1.Ser
return v1.NewMemoryBlockWriter(indexBuf, bloomsBuf), v1.NewByteReader(indexBuf, bloomsBuf)
},
nil,
NewMetrics(nil, v1.NewMetrics(nil)),
v1.NewMetrics(nil),
log.NewNopLogger(),
)
}
Expand Down
9 changes: 1 addition & 8 deletions pkg/bloombuild/planner/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ type Metrics struct {
inflightRequests prometheus.Summary
tasksRequeued prometheus.Counter
taskLost prometheus.Counter
tasksFailed prometheus.Counter

buildStarted prometheus.Counter
buildCompleted *prometheus.CounterVec
Expand Down Expand Up @@ -86,12 +85,6 @@ func NewMetrics(
Name: "tasks_lost_total",
Help: "Total number of tasks lost due to not being picked up by a builder and failed to be requeued.",
}),
tasksFailed: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "tasks_failed_total",
Help: "Total number of tasks that failed to be processed by builders (after the configured retries).",
}),

buildStarted: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Expand Down Expand Up @@ -149,7 +142,7 @@ func NewMetrics(
Subsystem: metricsSubsystem,
Name: "tenant_tasks_completed",
Help: "Number of tasks completed for a tenant during the current build iteration.",
}, []string{"tenant"}),
}, []string{"tenant", "status"}),
}
}

Expand Down
7 changes: 4 additions & 3 deletions pkg/bloombuild/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,8 @@ func (p *Planner) loadTenantWork(
// NOTE(salvacorts): We will reset them multiple times for the same tenant, for each table, but it's not a big deal.
// Alternatively, we can use a Counter instead of a Gauge, but I think a Gauge is easier to reason about.
p.metrics.tenantTasksPlanned.WithLabelValues(tenant).Set(0)
p.metrics.tenantTasksCompleted.WithLabelValues(tenant).Set(0)
p.metrics.tenantTasksCompleted.WithLabelValues(tenant, statusSuccess).Set(0)
p.metrics.tenantTasksCompleted.WithLabelValues(tenant, statusFailure).Set(0)

level.Debug(p.logger).Log("msg", "loading work for tenant", "table", table, "tenant", tenant, "splitFactor", splitFactor)
}
Expand Down Expand Up @@ -799,7 +800,7 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
if err != nil {
maxRetries := p.limits.BloomTaskMaxRetries(task.Tenant)
if maxRetries > 0 && int(task.timesEnqueued.Load()) >= maxRetries {
p.metrics.tasksFailed.Inc()
p.metrics.tenantTasksCompleted.WithLabelValues(task.Tenant, statusFailure).Inc()
p.removePendingTask(task)
level.Error(logger).Log(
"msg", "task failed after max retries",
Expand Down Expand Up @@ -841,7 +842,7 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
"retries", task.timesEnqueued.Load()-1, // -1 because the first enqueue is not a retry
)
p.removePendingTask(task)
p.metrics.tenantTasksCompleted.WithLabelValues(task.Tenant).Inc()
p.metrics.tenantTasksCompleted.WithLabelValues(task.Tenant, statusSuccess).Inc()

// Send the result back to the task. The channel is buffered, so this should not block.
task.resultsChannel <- result
Expand Down

0 comments on commit d0f56ee

Please sign in to comment.