Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: add more structured logging supports #9644

Merged
merged 5 commits into from
Apr 26, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"

"github.com/coreos/pkg/capnslog"
"github.com/jonboulle/clockwork"
"go.uber.org/zap"
)

var (
Expand Down Expand Up @@ -54,12 +56,19 @@ type RevGetter interface {
Rev() int64
}

func New(mode string, retention time.Duration, rg RevGetter, c Compactable) (Compactor, error) {
// New returns a new Compactor based on given "mode".
func New(
lg *zap.Logger,
mode string,
retention time.Duration,
rg RevGetter,
c Compactable,
) (Compactor, error) {
switch mode {
case ModePeriodic:
return NewPeriodic(retention, rg, c), nil
return newPeriodic(lg, clockwork.NewRealClock(), retention, rg, c), nil
case ModeRevision:
return NewRevision(int64(retention), rg, c), nil
return newRevision(lg, clockwork.NewRealClock(), int64(retention), rg, c), nil
default:
return nil, fmt.Errorf("unsupported compaction mode %s", mode)
}
Expand Down
124 changes: 75 additions & 49 deletions compactor/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ import (
"github.com/coreos/etcd/mvcc"

"github.com/jonboulle/clockwork"
"go.uber.org/zap"
)

// Periodic compacts the log by purging revisions older than
// the configured retention time.
type Periodic struct {
lg *zap.Logger
clock clockwork.Clock
period time.Duration

Expand All @@ -43,22 +45,19 @@ type Periodic struct {
paused bool
}

// NewPeriodic creates a new instance of Periodic compactor that purges
// newPeriodic creates a new instance of Periodic compactor that purges
// the log older than h Duration.
func NewPeriodic(h time.Duration, rg RevGetter, c Compactable) *Periodic {
return newPeriodic(clockwork.NewRealClock(), h, rg, c)
}

func newPeriodic(clock clockwork.Clock, h time.Duration, rg RevGetter, c Compactable) *Periodic {
t := &Periodic{
func newPeriodic(lg *zap.Logger, clock clockwork.Clock, h time.Duration, rg RevGetter, c Compactable) *Periodic {
pc := &Periodic{
lg: lg,
clock: clock,
period: h,
rg: rg,
c: c,
revs: make([]int64, 0),
}
t.ctx, t.cancel = context.WithCancel(context.Background())
return t
pc.ctx, pc.cancel = context.WithCancel(context.Background())
return pc
}

/*
Expand Down Expand Up @@ -96,50 +95,77 @@ Compaction period 5-sec:
*/

// Run runs periodic compactor.
func (t *Periodic) Run() {
compactInterval := t.getCompactInterval()
retryInterval := t.getRetryInterval()
retentions := t.getRetentions()
func (pc *Periodic) Run() {
compactInterval := pc.getCompactInterval()
retryInterval := pc.getRetryInterval()
retentions := pc.getRetentions()

go func() {
lastSuccess := t.clock.Now()
baseInterval := t.period
lastSuccess := pc.clock.Now()
baseInterval := pc.period
for {
t.revs = append(t.revs, t.rg.Rev())
if len(t.revs) > retentions {
t.revs = t.revs[1:] // t.revs[0] is always the rev at t.period ago
pc.revs = append(pc.revs, pc.rg.Rev())
if len(pc.revs) > retentions {
pc.revs = pc.revs[1:] // pc.revs[0] is always the rev at pc.period ago
}

select {
case <-t.ctx.Done():
case <-pc.ctx.Done():
return
case <-t.clock.After(retryInterval):
t.mu.Lock()
p := t.paused
t.mu.Unlock()
case <-pc.clock.After(retryInterval):
pc.mu.Lock()
p := pc.paused
pc.mu.Unlock()
if p {
continue
}
}

if t.clock.Now().Sub(lastSuccess) < baseInterval {
if pc.clock.Now().Sub(lastSuccess) < baseInterval {
continue
}

// wait up to initial given period
if baseInterval == t.period {
if baseInterval == pc.period {
baseInterval = compactInterval
}
rev := t.revs[0]

plog.Noticef("Starting auto-compaction at revision %d (retention: %v)", rev, t.period)
_, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev})
rev := pc.revs[0]

if pc.lg != nil {
pc.lg.Info(
"starting auto periodic compaction",
zap.Int64("revision", rev),
zap.Duration("compact-period", pc.period),
)
} else {
plog.Noticef("Starting auto-compaction at revision %d (retention: %v)", rev, pc.period)
}
_, err := pc.c.Compact(pc.ctx, &pb.CompactionRequest{Revision: rev})
if err == nil || err == mvcc.ErrCompacted {
lastSuccess = t.clock.Now()
plog.Noticef("Finished auto-compaction at revision %d", rev)
if pc.lg != nil {
pc.lg.Info(
"completed auto periodic compaction",
zap.Int64("revision", rev),
zap.Duration("compact-period", pc.period),
zap.Duration("took", time.Since(lastSuccess)),
)
} else {
plog.Noticef("Finished auto-compaction at revision %d", rev)
}
lastSuccess = pc.clock.Now()
} else {
plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err)
plog.Noticef("Retry after %v", retryInterval)
if pc.lg != nil {
pc.lg.Warn(
"failed auto periodic compaction",
zap.Int64("revision", rev),
zap.Duration("compact-period", pc.period),
zap.Duration("retry-interval", retryInterval),
zap.Error(err),
)
} else {
plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err)
plog.Noticef("Retry after %v", retryInterval)
}
}
}
}()
Expand All @@ -149,43 +175,43 @@ func (t *Periodic) Run() {
// (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='10m', then compact every 10-minute)
// if given compaction period x is >1-hour, compact every hour.
// (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='2h', then compact every 1-hour)
func (t *Periodic) getCompactInterval() time.Duration {
itv := t.period
func (pc *Periodic) getCompactInterval() time.Duration {
itv := pc.period
if itv > time.Hour {
itv = time.Hour
}
return itv
}

func (t *Periodic) getRetentions() int {
return int(t.period/t.getRetryInterval()) + 1
func (pc *Periodic) getRetentions() int {
return int(pc.period/pc.getRetryInterval()) + 1
}

const retryDivisor = 10

func (t *Periodic) getRetryInterval() time.Duration {
itv := t.period
func (pc *Periodic) getRetryInterval() time.Duration {
itv := pc.period
if itv > time.Hour {
itv = time.Hour
}
return itv / retryDivisor
}

// Stop stops periodic compactor.
func (t *Periodic) Stop() {
t.cancel()
func (pc *Periodic) Stop() {
pc.cancel()
}

// Pause pauses periodic compactor.
func (t *Periodic) Pause() {
t.mu.Lock()
defer t.mu.Unlock()
t.paused = true
func (pc *Periodic) Pause() {
pc.mu.Lock()
pc.paused = true
pc.mu.Unlock()
}

// Resume resumes periodic compactor.
func (t *Periodic) Resume() {
t.mu.Lock()
defer t.mu.Unlock()
t.paused = false
func (pc *Periodic) Resume() {
pc.mu.Lock()
pc.paused = false
pc.mu.Unlock()
}
7 changes: 4 additions & 3 deletions compactor/periodic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/coreos/etcd/pkg/testutil"

"github.com/jonboulle/clockwork"
"go.uber.org/zap"
)

func TestPeriodicHourly(t *testing.T) {
Expand All @@ -32,7 +33,7 @@ func TestPeriodicHourly(t *testing.T) {
fc := clockwork.NewFakeClock()
rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
compactable := &fakeCompactable{testutil.NewRecorderStream()}
tb := newPeriodic(fc, retentionDuration, rg, compactable)
tb := newPeriodic(zap.NewExample(), fc, retentionDuration, rg, compactable)

tb.Run()
defer tb.Stop()
Expand Down Expand Up @@ -83,7 +84,7 @@ func TestPeriodicMinutes(t *testing.T) {
fc := clockwork.NewFakeClock()
rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
compactable := &fakeCompactable{testutil.NewRecorderStream()}
tb := newPeriodic(fc, retentionDuration, rg, compactable)
tb := newPeriodic(zap.NewExample(), fc, retentionDuration, rg, compactable)

tb.Run()
defer tb.Stop()
Expand Down Expand Up @@ -131,7 +132,7 @@ func TestPeriodicPause(t *testing.T) {
retentionDuration := time.Hour
rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
compactable := &fakeCompactable{testutil.NewRecorderStream()}
tb := newPeriodic(fc, retentionDuration, rg, compactable)
tb := newPeriodic(zap.NewExample(), fc, retentionDuration, rg, compactable)

tb.Run()
tb.Pause()
Expand Down
Loading