Merge #26039
26039: compactor: never stop the loop r=bdarnell a=tschottdorf

The compactor had an optimization that stopped its processing loop when
there was no work left to do.

Unfortunately, the condition used to detect this was incorrect. Essentially,
whenever *any* work had been done, the loop stopped and would not reawaken
until the next incoming suggestion. In the absence of such a suggestion,
existing suggestions that had been skipped were never revisited, and thus
never discarded. This created confusion because it kept the "queued bytes"
metric elevated.

Refactor the code so that, instead of stopping, the loop waits out the max
age before running again (unless further suggestions arrive sooner).

It's hard to use timers correctly, so this commit should be scrutinized. In
particular, reviewers should check whether it's correct to call `t.Reset`
from a select branch that does not read from the timer's channel.
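
For background on that concern: with a raw standard-library `time.Timer`,
calling `Reset` on a timer that may have fired, without first draining its
channel, can leave a stale tick buffered and cause a spurious early wakeup.
Below is a minimal sketch of the stdlib-safe idiom, for comparison only; the
code in this commit instead uses `timeutil.Timer`, which tracks via its
`Read` field whether the channel was consumed.

```go
// Reset a *time.Timer from a select branch that did not receive from
// t.C: stop it first, and if it had already fired, drain the buffered
// tick so the next receive reflects the new deadline.
if !t.Stop() {
	select {
	case <-t.C: // discard the stale tick
	default: // tick was already consumed elsewhere
	}
}
t.Reset(d)
```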

Before the fix, the test change makes `make test PKG=./pkg/storage/compactor`
fail every time on my machine.

I would suggest not backporting this, at least not until there is a
follow-up bugfix that needs backporting and doesn't apply cleanly on top of
this diff.

Fixes #21824.

Release note (bug fix): Expired compaction suggestions are now dropped
shortly after they expire. Previously, dropping them could be delayed
indefinitely.

Co-authored-by: Tobias Schottdorf <[email protected]>
craig[bot] and tbg committed May 28, 2018
2 parents fd1f514 + c5eeca2 commit 0530ad4
Showing 3 changed files with 47 additions and 30 deletions.
63 changes: 39 additions & 24 deletions pkg/storage/compactor/compactor.go
@@ -19,7 +19,6 @@ import (
"fmt"
"time"

opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"

"github.com/cockroachdb/cockroach/pkg/keys"
@@ -88,21 +87,38 @@ func (c *Compactor) maxAge() time.Duration {
return maxSuggestedCompactionRecordAge.Get(&c.st.SV)
}

// Start launches a compaction processing goroutine and exits when the
// provided stopper indicates. Processing is done with a periodicity of
// compactionMinInterval, but only if there are compactions pending.
func (c *Compactor) Start(ctx context.Context, tracer opentracing.Tracer, stopper *stop.Stopper) {
// Wake up immediately to examine the queue and set the bytes queued metric.
// poke instructs the compactor's main loop to react to new suggestions in a
// timely manner.
func (c *Compactor) poke() {
select {
// The compactor can already have compactions waiting on it, so don't try to block here.
case c.ch <- struct{}{}:
default:
}
}

// Start launches a compaction processing goroutine and exits when the
// provided stopper indicates. Processing is done with a periodicity of
// compactionMinInterval, but only if there are compactions pending.
func (c *Compactor) Start(ctx context.Context, stopper *stop.Stopper) {
ctx = log.WithLogTagStr(ctx, "compactor", "")

// Wake up immediately to examine the queue and set the bytes queued metric.
// Note that the compactor may have received suggestions before having been
// started (this isn't great, but it's how it is right now).
c.poke()

stopper.RunWorker(ctx, func(ctx context.Context) {
var timer timeutil.Timer
defer timer.Stop()
var timerSet bool

// The above timer will either be on c.minInterval() or c.maxAge(). The
// former applies if we know there are new suggestions waiting to be
// inspected: we want to look at them soon, but also want to make sure
// "related" suggestions arrive before we start compacting. When no new
// suggestions have been made since the last inspection, the expectation
// is that all we have to do is clean up any previously skipped ones (at
// least after sufficient time has passed), and so we wait out the max age.
var isFast bool

for {
select {
@@ -121,29 +137,28 @@ func (c *Compactor) Start(ctx context.Context, tracer opentracing.Tracer, stopper *stop.Stopper) {
break
}
// Set the wait timer if not already set.
if !timerSet {
if !isFast {
isFast = true
timer.Reset(c.minInterval())
timerSet = true
}

case <-timer.C:
timer.Read = true
spanCtx, cleanup := tracing.EnsureContext(ctx, tracer, "process suggested compactions")
ok, err := c.processSuggestions(spanCtx)
ok, err := c.processSuggestions(ctx)
if err != nil {
log.Warningf(spanCtx, "failed processing suggested compactions: %s", err)
log.Warningf(ctx, "failed processing suggested compactions: %s", err)
}
cleanup()
if ok {
// The queue was processed. Wait for the next suggested
// compaction before resetting timer.
timerSet = false
// The queue was processed, so either it's empty or contains suggestions
// that were skipped for now. Revisit when they are certainly expired.
isFast = false
timer.Reset(c.maxAge())
break
}
// Reset the timer to re-attempt processing after the minimum
// compaction interval.
// More work to do, revisit after minInterval. Note that basically
// `ok == (err == nil)` but this refactor is left for a future commit.
isFast = true
timer.Reset(c.minInterval())
timerSet = true
}
}
})
@@ -186,6 +201,9 @@ func (aggr aggregatedCompaction) String() string {
// older than maxSuggestedCompactionRecordAge. Returns a boolean
// indicating whether the queue was successfully processed.
func (c *Compactor) processSuggestions(ctx context.Context) (bool, error) {
ctx, cleanup := tracing.EnsureContext(ctx, c.st.Tracer, "process suggested compactions")
defer cleanup()

// Collect all suggestions.
var suggestions []storagebase.SuggestedCompaction
var totalBytes int64
@@ -427,8 +445,5 @@ func (c *Compactor) Suggest(ctx context.Context, sc storagebase.SuggestedCompaction) {

// Poke the compactor goroutine to reconsider compaction in light of
// this new suggested compaction.
select {
case c.ch <- struct{}{}:
default:
}
c.poke()
}
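
For readers following along outside the diff, here is a self-contained sketch
of the resulting pattern: a non-blocking poke channel combined with a timer
that ticks fast (`minInterval`) while suggestions are pending and slow
(`maxAge`) otherwise, so the loop never stops entirely. The names and
intervals are illustrative, not the actual compactor code, and it uses a
plain `time.Timer` with explicit draining rather than `timeutil.Timer`:

```go
package main

import (
	"fmt"
	"time"
)

const (
	minInterval = 10 * time.Millisecond  // quick revisit while work is pending
	maxAge      = 100 * time.Millisecond // slow sweep that drops expired leftovers
)

// resetTimer stops, drains, and re-arms t so a stale tick cannot fire early.
func resetTimer(t *time.Timer, d time.Duration) {
	if !t.Stop() {
		select {
		case <-t.C:
		default:
		}
	}
	t.Reset(d)
}

// process stands in for inspecting the suggestion queue; it reports
// whether the queue was fully processed.
func process() bool {
	fmt.Println("processing suggestions")
	return true
}

func main() {
	ch := make(chan struct{}, 1)
	poke := func() {
		select {
		case ch <- struct{}{}: // wake the loop
		default: // a wakeup is already pending; dropping this one is fine
		}
	}

	poke() // simulate a suggestion arriving before the loop starts

	t := time.NewTimer(maxAge)
	defer t.Stop()
	isFast := false
	deadline := time.After(300 * time.Millisecond) // bound the demo

	for {
		select {
		case <-ch:
			// New suggestions: look soon, but leave a little room for
			// "related" suggestions to arrive before compacting.
			if !isFast {
				isFast = true
				resetTimer(t, minInterval)
			}
		case <-t.C:
			// The tick was just consumed, so a plain Reset is safe here.
			if ok := process(); ok {
				// Queue is empty or holds only skipped suggestions;
				// revisit once they are certainly expired.
				isFast = false
				t.Reset(maxAge)
			} else {
				isFast = true
				t.Reset(minInterval)
			}
		case <-deadline:
			return
		}
	}
}
```

The key property is that the timer is always armed: rather than parking when
the queue looks clean, the loop falls back to the `maxAge` cadence, so
previously skipped suggestions are revisited and discarded once they have
certainly expired.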
12 changes: 7 additions & 5 deletions pkg/storage/compactor/compactor_test.go
@@ -33,7 +33,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/pkg/errors"
)

@@ -98,7 +97,7 @@ func testSetup(capFn storeCapacityFunc) (*Compactor, *wrappedEngine, *int32, func()) {
doneFn := func(_ context.Context) { atomic.AddInt32(compactionCount, 1) }
st := cluster.MakeTestingClusterSettings()
compactor := NewCompactor(st, eng, capFn, doneFn)
compactor.Start(context.Background(), tracing.NewTracer(), stopper)
compactor.Start(context.Background(), stopper)
return compactor, eng, compactionCount, func() {
stopper.Stop(context.Background())
}
@@ -557,7 +556,7 @@ func TestCompactorDeadlockOnStart(t *testing.T) {

compactor.ch <- struct{}{}

compactor.Start(context.Background(), tracing.NewTracer(), stopper)
compactor.Start(context.Background(), stopper)
}

// TestCompactorProcessingInitialization verifies that a compactor gets
@@ -589,7 +588,7 @@ func TestCompactorProcessingInitialization(t *testing.T) {
st := cluster.MakeTestingClusterSettings()
fastCompactor := NewCompactor(st, we, capacityFn, doneFn)
minInterval.Override(&fastCompactor.st.SV, time.Millisecond)
fastCompactor.Start(context.Background(), tracing.NewTracer(), stopper)
fastCompactor.Start(context.Background(), stopper)
defer stopper.Stop(context.Background())

testutils.SucceedsSoon(t, func() error {
@@ -618,7 +617,10 @@ func TestCompactorCleansUpOldRecords(t *testing.T) {
}
compactor, we, compactionCount, cleanup := testSetup(capacityFn)
minInterval.Override(&compactor.st.SV, time.Millisecond)
maxSuggestedCompactionRecordAge.Override(&compactor.st.SV, time.Millisecond)
// NB: The compactor had a bug where it would never revisit skipped compactions
// alone when there wasn't also a new suggestion. Making the max age larger
// than the min interval exercises that code path (flakily).
maxSuggestedCompactionRecordAge.Override(&compactor.st.SV, 5*time.Millisecond)
defer cleanup()

// Add a suggested compaction that won't get processed because it's
2 changes: 1 addition & 1 deletion pkg/storage/store.go
@@ -1458,7 +1458,7 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {

// Start the storage engine compactor.
if envutil.EnvOrDefaultBool("COCKROACH_ENABLE_COMPACTOR", true) {
s.compactor.Start(s.AnnotateCtx(context.Background()), s.Tracer(), s.stopper)
s.compactor.Start(s.AnnotateCtx(context.Background()), s.stopper)
}

// Set the started flag (for unittests).
