diff --git a/compactor/periodic.go b/compactor/periodic.go index 9e83191faf4..9d9164e9c5c 100644 --- a/compactor/periodic.go +++ b/compactor/periodic.go @@ -61,20 +61,59 @@ func newPeriodic(clock clockwork.Clock, h time.Duration, rg RevGetter, c Compact return t } -// periodDivisor divides Periodic.period in into checkCompactInterval duration -const periodDivisor = 10 +/* +Compaction period 1-hour: + 1. compute compaction period, which is 1-hour + 2. record revisions for every 1/10 of 1-hour (6-minute) + 3. keep recording revisions with no compaction for first 1-hour + 4. do compact with revs[0] + - success? contiue on for-loop and move sliding window; revs = revs[1:] + - failure? update revs, and retry after 1/10 of 1-hour (6-minute) + +Compaction period 24-hour: + 1. compute compaction period, which is 1-hour + 2. record revisions for every 1/10 of 1-hour (6-minute) + 3. keep recording revisions with no compaction for first 24-hour + 4. do compact with revs[0] + - success? contiue on for-loop and move sliding window; revs = revs[1:] + - failure? update revs, and retry after 1/10 of 1-hour (6-minute) + +Compaction period 59-min: + 1. compute compaction period, which is 59-min + 2. record revisions for every 1/10 of 59-min (5.9-min) + 3. keep recording revisions with no compaction for first 59-min + 4. do compact with revs[0] + - success? contiue on for-loop and move sliding window; revs = revs[1:] + - failure? update revs, and retry after 1/10 of 59-min (5.9-min) + +Compaction period 5-sec: + 1. compute compaction period, which is 5-sec + 2. record revisions for every 1/10 of 5-sec (0.5-sec) + 3. keep recording revisions with no compaction for first 5-sec + 4. do compact with revs[0] + - success? contiue on for-loop and move sliding window; revs = revs[1:] + - failure? update revs, and retry after 1/10 of 5-sec (0.5-sec) +*/ // Run runs periodic compactor. func (t *Periodic) Run() { - interval := t.period / time.Duration(periodDivisor) + compactInterval := t.getCompactInterval() + retryInterval := t.getRetryInterval() + retentions := t.getRetentions() + go func() { - initialWait := t.clock.Now() + lastSuccess := t.clock.Now() + baseInterval := t.period for { t.revs = append(t.revs, t.rg.Rev()) + if len(t.revs) > retentions { + t.revs = t.revs[1:] // t.revs[0] is always the rev at t.period ago + } + select { case <-t.ctx.Done(): return - case <-t.clock.After(interval): + case <-t.clock.After(retryInterval): t.mu.Lock() p := t.paused t.mu.Unlock() @@ -83,30 +122,55 @@ func (t *Periodic) Run() { } } - // wait up to initial given period - if t.clock.Now().Sub(initialWait) < t.period { + if t.clock.Now().Sub(lastSuccess) < baseInterval { continue } - rev, remaining := t.getRev() - if rev < 0 { - continue + // wait up to initial given period + if baseInterval == t.period { + baseInterval = compactInterval } + rev := t.revs[0] plog.Noticef("Starting auto-compaction at revision %d (retention: %v)", rev, t.period) _, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev}) if err == nil || err == mvcc.ErrCompacted { - // move to next sliding window - t.revs = remaining + lastSuccess = t.clock.Now() plog.Noticef("Finished auto-compaction at revision %d", rev) } else { plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err) - plog.Noticef("Retry after %v", interval) + plog.Noticef("Retry after %v", retryInterval) } } }() } +// if given compaction period x is <1-hour, compact every x duration. +// (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='10m', then compact every 10-minute) +// if given compaction period x is >1-hour, compact every hour. +// (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='2h', then compact every 1-hour) +func (t *Periodic) getCompactInterval() time.Duration { + itv := t.period + if itv > time.Hour { + itv = time.Hour + } + return itv +} + +func (t *Periodic) getRetentions() int { + return int(t.period/t.getRetryInterval()) + 1 +} + +const retryDivisor = 10 + +func (t *Periodic) getRetryInterval() time.Duration { + itv := t.period + if itv > time.Hour { + itv = time.Hour + } + return itv / retryDivisor +} + // Stop stops periodic compactor. func (t *Periodic) Stop() { t.cancel() @@ -125,11 +189,3 @@ func (t *Periodic) Resume() { defer t.mu.Unlock() t.paused = false } - -func (t *Periodic) getRev() (int64, []int64) { - i := len(t.revs) - periodDivisor - if i < 0 { - return -1, t.revs - } - return t.revs[i], t.revs[i+1:] -} diff --git a/compactor/periodic_test.go b/compactor/periodic_test.go index f039a8a7617..21e539e765d 100644 --- a/compactor/periodic_test.go +++ b/compactor/periodic_test.go @@ -25,7 +25,7 @@ import ( "github.com/jonboulle/clockwork" ) -func TestPeriodic(t *testing.T) { +func TestPeriodicHourly(t *testing.T) { retentionHours := 2 retentionDuration := time.Duration(retentionHours) * time.Hour @@ -36,32 +36,94 @@ func TestPeriodic(t *testing.T) { tb.Run() defer tb.Stop() - checkCompactInterval := retentionDuration / time.Duration(periodDivisor) - n := periodDivisor - // simulate 5 hours worth of intervals. - for i := 0; i < n/retentionHours*5; i++ { + + initialIntervals, intervalsPerPeriod := tb.getRetentions(), 10 + + // compaction doesn't happen til 2 hours elapse + for i := 0; i < initialIntervals; i++ { rg.Wait(1) - fc.Advance(checkCompactInterval) - // compaction doesn't happen til 2 hours elapses. - if i < n { - continue + fc.Advance(tb.getRetryInterval()) + } + + // very first compaction + a, err := compactable.Wait(1) + if err != nil { + t.Fatal(err) + } + expectedRevision := int64(1) + if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) { + t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) + } + + // simulate 3 hours + // now compactor kicks in, every hour + for i := 0; i < 3; i++ { + // advance one hour, one revision for each interval + for j := 0; j < intervalsPerPeriod; j++ { + rg.Wait(1) + fc.Advance(tb.getRetryInterval()) } - // after 2 hours, compaction happens at every checkCompactInterval. - a, err := compactable.Wait(1) + + a, err = compactable.Wait(1) if err != nil { t.Fatal(err) } - expectedRevision := int64(i + 1 - n) + + expectedRevision = int64((i + 1) * 10) if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) { t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) } } +} + +func TestPeriodicMinutes(t *testing.T) { + retentionMinutes := 5 + retentionDuration := time.Duration(retentionMinutes) * time.Minute + + fc := clockwork.NewFakeClock() + rg := &fakeRevGetter{testutil.NewRecorderStream(), 0} + compactable := &fakeCompactable{testutil.NewRecorderStream()} + tb := newPeriodic(fc, retentionDuration, rg, compactable) + + tb.Run() + defer tb.Stop() + + initialIntervals, intervalsPerPeriod := tb.getRetentions(), 10 + + // compaction doesn't happen til 5 minutes elapse + for i := 0; i < initialIntervals; i++ { + rg.Wait(1) + fc.Advance(tb.getRetryInterval()) + } - // unblock the rev getter, so we can stop the compactor routine. - _, err := rg.Wait(1) + // very first compaction + a, err := compactable.Wait(1) if err != nil { t.Fatal(err) } + expectedRevision := int64(1) + if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) { + t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) + } + + // compaction happens at every interval + for i := 0; i < 5; i++ { + // advance 5-minute, one revision for each interval + for j := 0; j < intervalsPerPeriod; j++ { + rg.Wait(1) + fc.Advance(tb.getRetryInterval()) + } + + a, err := compactable.Wait(1) + if err != nil { + t.Fatal(err) + } + + expectedRevision = int64((i + 1) * 10) + if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) { + t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) + } + } } func TestPeriodicPause(t *testing.T) { @@ -74,14 +136,14 @@ func TestPeriodicPause(t *testing.T) { tb.Run() tb.Pause() + n := tb.getRetentions() + // tb will collect 3 hours of revisions but not compact since paused - checkCompactInterval := retentionDuration / time.Duration(periodDivisor) - n := periodDivisor - for i := 0; i < 3*n; i++ { + for i := 0; i < n*3; i++ { rg.Wait(1) - fc.Advance(checkCompactInterval) + fc.Advance(tb.getRetryInterval()) } - // tb ends up waiting for the clock + // t.revs = [21 22 23 24 25 26 27 28 29 30] select { case a := <-compactable.Chan(): @@ -91,14 +153,17 @@ func TestPeriodicPause(t *testing.T) { // tb resumes to being blocked on the clock tb.Resume() - - // unblock clock, will kick off a compaction at hour 3:06 rg.Wait(1) - fc.Advance(checkCompactInterval) + + // unblock clock, will kick off a compaction at T=3h6m by retry + fc.Advance(tb.getRetryInterval()) + + // T=3h6m a, err := compactable.Wait(1) if err != nil { t.Fatal(err) } + // compact the revision from hour 2:06 wreq := &pb.CompactionRequest{Revision: int64(1 + 2*n + 1)} if !reflect.DeepEqual(a[0].Params[0], wreq) {