Skip to content

Commit

Permalink
refactor(blooms): Limit task retries in bloom planner (#13139)
Browse files Browse the repository at this point in the history
  • Loading branch information
salvacorts authored Jun 5, 2024
1 parent 30df31e commit f788726
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 12 deletions.
6 changes: 6 additions & 0 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -3439,6 +3439,12 @@ shard_streams:
# CLI flag: -bloom-build.builder-response-timeout
[bloom_build_builder_response_timeout: <duration> | default = 0s]

# Experimental. Maximum number of retries for a failed task. If a task fails
# more than this number of times, it is considered failed and will not be
# retried. A value of 0 disables this limit.
# CLI flag: -bloom-build.task-max-retries
[bloom_build_task_max_retries: <int> | default = 3]

# Experimental. Length of the n-grams created when computing blooms from log
# lines.
# CLI flag: -bloom-compactor.ngram-length
Expand Down
1 change: 1 addition & 0 deletions pkg/bloombuild/planner/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type Limits interface {
BloomSplitSeriesKeyspaceBy(tenantID string) int
BloomBuildMaxBuilders(tenantID string) int
BuilderResponseTimeout(tenantID string) time.Duration
BloomTaskMaxRetries(tenantID string) int
}

type QueueLimits struct {
Expand Down
7 changes: 7 additions & 0 deletions pkg/bloombuild/planner/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type Metrics struct {
inflightRequests prometheus.Summary
tasksRequeued prometheus.Counter
taskLost prometheus.Counter
tasksFailed prometheus.Counter

buildStarted prometheus.Counter
buildCompleted *prometheus.CounterVec
Expand Down Expand Up @@ -79,6 +80,12 @@ func NewMetrics(
Name: "tasks_lost_total",
Help: "Total number of tasks lost due to not being picked up by a builder and failed to be requeued.",
}),
tasksFailed: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "tasks_failed_total",
Help: "Total number of tasks that failed to be processed by builders (after the configured retries).",
}),

buildStarted: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Expand Down
15 changes: 15 additions & 0 deletions pkg/bloombuild/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,7 @@ func (p *Planner) totalPendingTasks() (total int) {
func (p *Planner) enqueueTask(task *Task) error {
p.activeUsers.UpdateUserTimestamp(task.Tenant, time.Now())
return p.tasksQueue.Enqueue(task.Tenant, nil, task, func() {
task.timesEnqueued++
p.addPendingTask(task)
})
}
Expand Down Expand Up @@ -582,9 +583,23 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
}

if err := p.forwardTaskToBuilder(builder, builderID, task); err != nil {
maxRetries := p.limits.BloomTaskMaxRetries(task.Tenant)
if maxRetries > 0 && task.timesEnqueued >= maxRetries {
p.metrics.tasksFailed.Inc()
p.removePendingTask(task)
level.Error(logger).Log(
"msg", "task failed after max retries",
"retries", task.timesEnqueued,
"maxRetries", maxRetries,
"err", err,
)
continue
}

// Re-queue the task if the builder is failing to process the tasks
if err := p.enqueueTask(task); err != nil {
p.metrics.taskLost.Inc()
p.removePendingTask(task)
level.Error(logger).Log("msg", "error re-enqueuing task. this task will be lost", "err", err)
continue
}
Expand Down
46 changes: 36 additions & 10 deletions pkg/bloombuild/planner/planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,9 @@ func Test_BuilderLoop(t *testing.T) {
expectedBuilderLoopError error

// modifyBuilder should leave the builder in a state where it will not return or return an error
modifyBuilder func(builder *fakeBuilder)
modifyBuilder func(builder *fakeBuilder)
shouldConsumeAfterModify bool

// resetBuilder should reset the builder to a state where it will return no errors
resetBuilder func(builder *fakeBuilder)
}{
Expand Down Expand Up @@ -434,6 +436,15 @@ func Test_BuilderLoop(t *testing.T) {
builder.SetReturnErrorMsg(false)
},
},
{
name: "exceed max retries",
limits: &fakeLimits{maxRetries: 1},
expectedBuilderLoopError: errPlannerIsNotRunning,
modifyBuilder: func(builder *fakeBuilder) {
builder.SetReturnError(true)
},
shouldConsumeAfterModify: true,
},
{
name: "timeout",
limits: &fakeLimits{
Expand Down Expand Up @@ -518,14 +529,24 @@ func Test_BuilderLoop(t *testing.T) {
require.NoError(t, err)
}

// Tasks should not be consumed
require.Neverf(
t, func() bool {
return planner.totalPendingTasks() == 0
},
5*time.Second, 10*time.Millisecond,
"all tasks were consumed but they should not be",
)
if tc.shouldConsumeAfterModify {
require.Eventuallyf(
t, func() bool {
return planner.totalPendingTasks() == 0
},
5*time.Second, 10*time.Millisecond,
"tasks not consumed, pending: %d", planner.totalPendingTasks(),
)
} else {
require.Neverf(
t, func() bool {
return planner.totalPendingTasks() == 0
},
5*time.Second, 10*time.Millisecond,
"all tasks were consumed but they should not be",
)
}

}

if tc.resetBuilder != nil {
Expand Down Expand Up @@ -655,7 +676,8 @@ func (f *fakeBuilder) Recv() (*protos.BuilderToPlanner, error) {
}

type fakeLimits struct {
timeout time.Duration
timeout time.Duration
maxRetries int
}

func (f *fakeLimits) BuilderResponseTimeout(_ string) time.Duration {
Expand All @@ -674,6 +696,10 @@ func (f *fakeLimits) BloomBuildMaxBuilders(_ string) int {
return 0
}

func (f *fakeLimits) BloomTaskMaxRetries(_ string) int {
return f.maxRetries
}

func parseDayTime(s string) config.DayTime {
t, err := time.Parse("2006-01-02", s)
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions pkg/bloombuild/planner/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ type Task struct {
*protos.Task

// Tracking
queueTime time.Time
ctx context.Context
timesEnqueued int
queueTime time.Time
ctx context.Context
}

func NewTask(ctx context.Context, queueTime time.Time, task *protos.Task) *Task {
Expand Down
6 changes: 6 additions & 0 deletions pkg/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ type Limits struct {
BloomSplitSeriesKeyspaceBy int `yaml:"bloom_split_series_keyspace_by" json:"bloom_split_series_keyspace_by" category:"experimental"`
BloomBuildMaxBuilders int `yaml:"bloom_build_max_builders" json:"bloom_build_max_builders" category:"experimental"`
BuilderResponseTimeout time.Duration `yaml:"bloom_build_builder_response_timeout" json:"bloom_build_builder_response_timeout" category:"experimental"`
BloomTaskMaxRetries int `yaml:"bloom_build_task_max_retries" json:"bloom_build_task_max_retries" category:"experimental"`

BloomNGramLength int `yaml:"bloom_ngram_length" json:"bloom_ngram_length" category:"experimental"`
BloomNGramSkip int `yaml:"bloom_ngram_skip" json:"bloom_ngram_skip" category:"experimental"`
Expand Down Expand Up @@ -391,6 +392,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&l.BloomSplitSeriesKeyspaceBy, "bloom-build.split-keyspace-by", 256, "Experimental. Number of splits to create for the series keyspace when building blooms. The series keyspace is split into this many parts to parallelize bloom creation.")
f.IntVar(&l.BloomBuildMaxBuilders, "bloom-build.max-builders", 0, "Experimental. Maximum number of builders to use when building blooms. 0 allows unlimited builders.")
f.DurationVar(&l.BuilderResponseTimeout, "bloom-build.builder-response-timeout", 0, "Experimental. Timeout for a builder to finish a task. If a builder does not respond within this time, it is considered failed and the task will be requeued. 0 disables the timeout.")
f.IntVar(&l.BloomTaskMaxRetries, "bloom-build.task-max-retries", 3, "Experimental. Maximum number of retries for a failed task. If a task fails more than this number of times, it is considered failed and will not be retried. A value of 0 disables this limit.")

_ = l.BloomCompactorMaxBloomSize.Set(defaultBloomCompactorMaxBloomSize)
f.Var(&l.BloomCompactorMaxBloomSize, "bloom-compactor.max-bloom-size",
Expand Down Expand Up @@ -1005,6 +1007,10 @@ func (o *Overrides) BuilderResponseTimeout(userID string) time.Duration {
return o.getOverridesForUser(userID).BuilderResponseTimeout
}

func (o *Overrides) BloomTaskMaxRetries(userID string) int {
return o.getOverridesForUser(userID).BloomTaskMaxRetries
}

func (o *Overrides) BloomNGramLength(userID string) int {
return o.getOverridesForUser(userID).BloomNGramLength
}
Expand Down

0 comments on commit f788726

Please sign in to comment.