From c5eeca2966d0aa3b449f09a58cf892b1c1fd1f1a Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Mon, 28 May 2018 02:44:18 -0400 Subject: [PATCH] compactor: never stop the loop The compactor had an optimization that would stop the compactor loop when no work was to be done. Unfortunately, the condition to probe this was incorrect. Essentially, whenever *any* work had been done, the loop stopped and would not reawaken until the next incoming suggestion. In the absence of such a suggestion, the skipped existing suggestions would never be revisited, and thus never discarded. This created confusion as it kept the "queued bytes" metric up. Refactor the code so that instead of stopping, the loop waits for the max age before running again (except if there are further suggestions). It's hard to use timers correctly, so this commit should be scrutinized. In particular, whether it's correct to call `t.Reset` in a select that does not read from the timer's channel. The test change makes `make test PKG=./pkg/storage/compactor` fail every time on my machine before the fix. I would suggest not backporting this, at least not until there is a follow-up bugfix that needs backporting and doesn't apply cleanly on top of this diff. Fixes #21824. Release note (bug fix): Expired compaction suggestions are now dropped not too long after they expire. Previously, this could be delayed indefinitely. --- pkg/storage/compactor/compactor.go | 34 ++++++++++++++++--------- pkg/storage/compactor/compactor_test.go | 5 +++- 2 files changed, 26 insertions(+), 13 deletions(-) diff --git a/pkg/storage/compactor/compactor.go b/pkg/storage/compactor/compactor.go index 795f822a84bd..5e099485561e 100644 --- a/pkg/storage/compactor/compactor.go +++ b/pkg/storage/compactor/compactor.go @@ -87,6 +87,8 @@ func (c *Compactor) maxAge() time.Duration { return maxSuggestedCompactionRecordAge.Get(&c.st.SV) } +// poke instructs the compactor's main loop to react to new suggestions in a +// timely manner. 
func (c *Compactor) poke() { select { case c.ch <- struct{}{}: @@ -98,6 +100,8 @@ func (c *Compactor) poke() { // provided stopper indicates. Processing is done with a periodicity of // compactionMinInterval, but only if there are compactions pending. func (c *Compactor) Start(ctx context.Context, stopper *stop.Stopper) { + ctx = log.WithLogTagStr(ctx, "compactor", "") + // Wake up immediately to examine the queue and set the bytes queued metric. // Note that the compactor may have received suggestions before having been // started (this isn't great, but it's how it is right now). @@ -106,9 +110,15 @@ func (c *Compactor) Start(ctx context.Context, stopper *stop.Stopper) { stopper.RunWorker(ctx, func(ctx context.Context) { var timer timeutil.Timer defer timer.Stop() - var timerSet bool - ctx = log.WithLogTagStr(ctx, "compactor", "") + // The above timer will either be on c.minInterval() or c.maxAge(). The + // former applies if we know there are new suggestions waiting to be + // inspected: we want to look at them soon, but also want to make sure + // "related" suggestions arrive before we start compacting. When no new + // suggestions have been made since the last inspection, the expectation + // is that all we have to do is clean up any previously skipped ones (at + // least after sufficient time has passed), and so we wait out the max age. + var isFast bool for { select { @@ -127,9 +137,9 @@ func (c *Compactor) Start(ctx context.Context, stopper *stop.Stopper) { break } // Set the wait timer if not already set. - if !timerSet { + if !isFast { + isFast = true timer.Reset(c.minInterval()) - timerSet = true } case <-timer.C: @@ -139,15 +149,16 @@ func (c *Compactor) Start(ctx context.Context, stopper *stop.Stopper) { log.Warningf(ctx, "failed processing suggested compactions: %s", err) } if ok { - // The queue was processed. Wait for the next suggested - // compaction before resetting timer. 
- timerSet = false + // The queue was processed, so either it's empty or contains suggestions + // that were skipped for now. Revisit when they are certainly expired. + isFast = false + timer.Reset(c.maxAge()) break } - // Reset the timer to re-attempt processing after the minimum - // compaction interval. + // More work to do, revisit after minInterval. Note that basically + // `ok == (err == nil)` but this refactor is left for a future commit. + isFast = true timer.Reset(c.minInterval()) - timerSet = true } } }) @@ -190,8 +201,7 @@ func (aggr aggregatedCompaction) String() string { // older than maxSuggestedCompactionRecordAge. Returns a boolean // indicating whether the queue was successfully processed. func (c *Compactor) processSuggestions(ctx context.Context) (bool, error) { - var cleanup func() - ctx, cleanup = tracing.EnsureContext(ctx, c.st.Tracer, "process suggested compactions") + ctx, cleanup := tracing.EnsureContext(ctx, c.st.Tracer, "process suggested compactions") defer cleanup() // Collect all suggestions. diff --git a/pkg/storage/compactor/compactor_test.go b/pkg/storage/compactor/compactor_test.go index 984d01c1fcc3..9e777d942882 100644 --- a/pkg/storage/compactor/compactor_test.go +++ b/pkg/storage/compactor/compactor_test.go @@ -617,7 +617,10 @@ func TestCompactorCleansUpOldRecords(t *testing.T) { } compactor, we, compactionCount, cleanup := testSetup(capacityFn) minInterval.Override(&compactor.st.SV, time.Millisecond) - maxSuggestedCompactionRecordAge.Override(&compactor.st.SV, time.Millisecond) + // NB: The compactor had a bug where it would never revisit skipped compactions + // alone when there wasn't also a new suggestion. Making the max age larger + // than the min interval exercises that code path (flakily). + maxSuggestedCompactionRecordAge.Override(&compactor.st.SV, 5*time.Millisecond) defer cleanup() // Add a suggested compaction that won't get processed because it's