
compactor: never stop the loop
The compactor had an optimization that would stop the compactor loop
when there was no work to be done.

Unfortunately, the condition that checked for this was incorrect.
Essentially, whenever *any* work had been done, the loop stopped and
would not reawaken until the next incoming suggestion. In the absence of
such a suggestion, the skipped existing suggestions would never be
revisited, and thus never discarded. This created confusion because it
kept the "queued bytes" metric elevated.

Refactor the code so that instead of stopping, the loop waits for the
max age before running again (except if there are further suggestions).
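The resulting loop shape can be sketched as a small standalone program. This is an illustrative model, not the compactor's actual code: it uses the standard library's `time.Timer` instead of CockroachDB's `timeutil.Timer`, and the intervals, channel names, and the `process` callback are invented for the example. The point it demonstrates is that the loop never stops; an idle pass merely re-arms the timer with the max age so skipped suggestions are eventually revisited.

```go
package main

import (
	"fmt"
	"time"
)

// compactLoop models the fixed compactor loop. After a pass that finds no
// immediate work it re-arms the timer with maxAge (so previously skipped
// suggestions are revisited once they expire) instead of stopping. A new
// suggestion switches the timer to the shorter minInterval so that
// "related" suggestions can accumulate briefly before processing.
// All names and durations here are illustrative.
func compactLoop(suggestions <-chan struct{}, stop <-chan struct{}, process func() bool) {
	const (
		minInterval = 2 * time.Millisecond
		maxAge      = 20 * time.Millisecond
	)

	timer := time.NewTimer(0) // wake up immediately on startup
	defer timer.Stop()
	isFast := true

	for {
		select {
		case <-stop:
			return
		case <-suggestions:
			// New work incoming: make sure we wake up soon.
			if !isFast {
				isFast = true
				timer.Reset(minInterval)
			}
		case <-timer.C:
			if done := process(); done {
				// Queue drained or only skipped entries remain: instead of
				// stopping, wait out the max age and look again.
				isFast = false
				timer.Reset(maxAge)
			} else {
				isFast = true
				timer.Reset(minInterval)
			}
		}
	}
}

func main() {
	suggestions := make(chan struct{}, 1)
	stop := make(chan struct{})
	runs := make(chan struct{}, 16)

	go compactLoop(suggestions, stop, func() bool {
		runs <- struct{}{}
		return true
	})

	// Even with no incoming suggestions, the loop keeps waking up.
	n := 0
	for i := 0; i < 3; i++ {
		<-runs
		n++
	}
	close(stop)
	fmt.Println(n)
}
```

With the pre-fix behavior, the goroutine above would have returned `true` from `process` once and then slept forever; here it keeps firing on the `maxAge` cadence.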

It's hard to use timers correctly, so this commit should be scrutinized.
In particular, whether it's correct to call `t.Reset` in a select that
does not read from the timer's channel.

The test change makes `make test PKG=./pkg/storage/compactor` fail
every time on my machine before the fix.

I would suggest not backporting this, at least not until there is a
follow-up bugfix that needs backporting and doesn't apply cleanly on top
of this diff.

Fixes cockroachdb#21824.

Release note (bug fix): Expired compaction suggestions are now dropped
soon after they expire. Previously, this could be delayed indefinitely.
tbg committed May 28, 2018
1 parent 87205b2 commit c5eeca2
Showing 2 changed files with 26 additions and 13 deletions.
34 changes: 22 additions & 12 deletions pkg/storage/compactor/compactor.go
@@ -87,6 +87,8 @@ func (c *Compactor) maxAge() time.Duration {
 	return maxSuggestedCompactionRecordAge.Get(&c.st.SV)
 }
 
+// poke instructs the compactor's main loop to react to new suggestions in a
+// timely manner.
 func (c *Compactor) poke() {
 	select {
 	case c.ch <- struct{}{}:
@@ -98,6 +100,8 @@ func (c *Compactor) poke() {
 // provided stopper indicates. Processing is done with a periodicity of
 // compactionMinInterval, but only if there are compactions pending.
 func (c *Compactor) Start(ctx context.Context, stopper *stop.Stopper) {
+	ctx = log.WithLogTagStr(ctx, "compactor", "")
+
 	// Wake up immediately to examine the queue and set the bytes queued metric.
 	// Note that the compactor may have received suggestions before having been
 	// started (this isn't great, but it's how it is right now).
@@ -106,9 +110,15 @@ func (c *Compactor) Start(ctx context.Context, stopper *stop.Stopper) {
 	stopper.RunWorker(ctx, func(ctx context.Context) {
 		var timer timeutil.Timer
 		defer timer.Stop()
-		var timerSet bool
 
-		ctx = log.WithLogTagStr(ctx, "compactor", "")
+		// The above timer will either be on c.minInterval() or c.maxAge(). The
+		// former applies if we know there are new suggestions waiting to be
+		// inspected: we want to look at them soon, but also want to make sure
+		// "related" suggestions arrive before we start compacting. When no new
+		// suggestions have been made since the last inspection, the expectation
+		// is that all we have to do is clean up any previously skipped ones (at
+		// least after sufficient time has passed), and so we wait out the max age.
+		var isFast bool
 
 		for {
 			select {
@@ -127,9 +137,9 @@ func (c *Compactor) Start(ctx context.Context, stopper *stop.Stopper) {
 					break
 				}
 				// Set the wait timer if not already set.
-				if !timerSet {
+				if !isFast {
+					isFast = true
 					timer.Reset(c.minInterval())
-					timerSet = true
 				}
 
 			case <-timer.C:
@@ -139,15 +149,16 @@ func (c *Compactor) Start(ctx context.Context, stopper *stop.Stopper) {
 					log.Warningf(ctx, "failed processing suggested compactions: %s", err)
 				}
 				if ok {
-					// The queue was processed. Wait for the next suggested
-					// compaction before resetting timer.
-					timerSet = false
+					// The queue was processed, so either it's empty or contains suggestions
+					// that were skipped for now. Revisit when they are certainly expired.
+					isFast = false
+					timer.Reset(c.maxAge())
 					break
 				}
-				// Reset the timer to re-attempt processing after the minimum
-				// compaction interval.
+				// More work to do, revisit after minInterval. Note that basically
+				// `ok == (err == nil)` but this refactor is left for a future commit.
+				isFast = true
 				timer.Reset(c.minInterval())
-				timerSet = true
 			}
 		}
 	})
@@ -190,8 +201,7 @@ func (aggr aggregatedCompaction) String() string {
 // older than maxSuggestedCompactionRecordAge. Returns a boolean
 // indicating whether the queue was successfully processed.
 func (c *Compactor) processSuggestions(ctx context.Context) (bool, error) {
-	var cleanup func()
-	ctx, cleanup = tracing.EnsureContext(ctx, c.st.Tracer, "process suggested compactions")
+	ctx, cleanup := tracing.EnsureContext(ctx, c.st.Tracer, "process suggested compactions")
 	defer cleanup()
 
 	// Collect all suggestions.
5 changes: 4 additions & 1 deletion pkg/storage/compactor/compactor_test.go
@@ -617,7 +617,10 @@ func TestCompactorCleansUpOldRecords(t *testing.T) {
 	}
 	compactor, we, compactionCount, cleanup := testSetup(capacityFn)
 	minInterval.Override(&compactor.st.SV, time.Millisecond)
-	maxSuggestedCompactionRecordAge.Override(&compactor.st.SV, time.Millisecond)
+	// NB: The compactor had a bug where it would never revisit skipped compactions
+	// alone when there wasn't also a new suggestion. Making the max age larger
+	// than the min interval exercises that code path (flakily).
+	maxSuggestedCompactionRecordAge.Override(&compactor.st.SV, 5*time.Millisecond)
 	defer cleanup()
 
 	// Add a suggested compaction that won't get processed because it's
