Skip to content

Commit

Permalink
[aggregator] CanLead for unflushed window takes BufferPast into accou…
Browse files Browse the repository at this point in the history
…nt (#3328)

* [aggregator] CanLead for unflushed window takes BufferPast into account

* Lint
  • Loading branch information
linasm authored Mar 5, 2021
1 parent 7038cda commit 397b095
Show file tree
Hide file tree
Showing 7 changed files with 192 additions and 105 deletions.
20 changes: 20 additions & 0 deletions src/aggregator/aggregator/flush_mgr_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,12 @@ type FlushManagerOptions interface {

// ForcedFlushWindowSize returns the window size for a forced flush.
ForcedFlushWindowSize() time.Duration

// SetBufferForPastTimedMetric sets the size of the buffer for timed metrics in the past.
SetBufferForPastTimedMetric(value time.Duration) FlushManagerOptions

// BufferForPastTimedMetric returns the size of the buffer for timed metrics in the past.
BufferForPastTimedMetric() time.Duration
}

type flushManagerOptions struct {
Expand All @@ -137,6 +143,8 @@ type flushManagerOptions struct {
flushTimesPersistEvery time.Duration
maxBufferSize time.Duration
forcedFlushWindowSize time.Duration

bufferForPastTimedMetric time.Duration
}

// NewFlushManagerOptions creates a new set of flush manager options.
Expand All @@ -152,6 +160,8 @@ func NewFlushManagerOptions() FlushManagerOptions {
flushTimesPersistEvery: defaultFlushTimesPersistEvery,
maxBufferSize: defaultMaxBufferSize,
forcedFlushWindowSize: defaultForcedFlushWindowSize,

bufferForPastTimedMetric: defaultTimedMetricBuffer,
}
}

Expand Down Expand Up @@ -274,3 +284,13 @@ func (o *flushManagerOptions) SetForcedFlushWindowSize(value time.Duration) Flus
// ForcedFlushWindowSize returns the window size for a forced flush.
func (o *flushManagerOptions) ForcedFlushWindowSize() time.Duration {
return o.forcedFlushWindowSize
}

// SetBufferForPastTimedMetric returns a copy of the options in which the size
// of the buffer for timed metrics in the past is set to value. The receiver
// itself is left unmodified.
func (o *flushManagerOptions) SetBufferForPastTimedMetric(value time.Duration) FlushManagerOptions {
	updated := new(flushManagerOptions)
	*updated = *o
	updated.bufferForPastTimedMetric = value
	return updated
}

// BufferForPastTimedMetric returns the size of the buffer for timed metrics in the past.
func (o *flushManagerOptions) BufferForPastTimedMetric() time.Duration {
return o.bufferForPastTimedMetric
}
70 changes: 38 additions & 32 deletions src/aggregator/aggregator/follower_flush_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ type followerFlushManager struct {
flushTask *followerFlushTask
sleepFn sleepFn
metrics followerFlushManagerMetrics

bufferForPastTimedMetric time.Duration
}

func newFollowerFlushManager(
Expand All @@ -133,22 +135,23 @@ func newFollowerFlushManager(
instrumentOpts := opts.InstrumentOptions()
scope := instrumentOpts.MetricsScope()
mgr := &followerFlushManager{
nowFn: nowFn,
checkEvery: opts.CheckEvery(),
workers: opts.WorkerPool(),
placementManager: opts.PlacementManager(),
electionManager: opts.ElectionManager(),
flushTimesManager: opts.FlushTimesManager(),
maxBufferSize: opts.MaxBufferSize(),
forcedFlushWindowSize: opts.ForcedFlushWindowSize(),
logger: instrumentOpts.Logger(),
scope: scope,
doneCh: doneCh,
flushTimesState: flushTimesUninitialized,
flushMode: unknownFollowerFlush,
lastFlushed: nowFn(),
sleepFn: time.Sleep,
metrics: newFollowerFlushManagerMetrics(scope),
nowFn: nowFn,
checkEvery: opts.CheckEvery(),
workers: opts.WorkerPool(),
placementManager: opts.PlacementManager(),
electionManager: opts.ElectionManager(),
flushTimesManager: opts.FlushTimesManager(),
maxBufferSize: opts.MaxBufferSize(),
forcedFlushWindowSize: opts.ForcedFlushWindowSize(),
bufferForPastTimedMetric: opts.BufferForPastTimedMetric(),
logger: instrumentOpts.Logger(),
scope: scope,
doneCh: doneCh,
flushTimesState: flushTimesUninitialized,
flushMode: unknownFollowerFlush,
lastFlushed: nowFn(),
sleepFn: time.Sleep,
metrics: newFollowerFlushManagerMetrics(scope),
}
mgr.flushTask = &followerFlushTask{mgr: mgr}
return mgr
Expand Down Expand Up @@ -215,12 +218,7 @@ func (mgr *followerFlushManager) OnBucketAdded(int, *flushBucket) {}

// NB(xichen): The follower flush manager flushes data based on the flush times
// stored in kv and does not need to take extra actions when a new flusher is added.
func (mgr *followerFlushManager) OnFlusherAdded(
bucketIdx int,
bucket *flushBucket,
flusher flushingMetricList,
) {
}
// OnFlusherAdded is a no-op: the body is empty and the (unnamed) arguments are ignored.
func (mgr *followerFlushManager) OnFlusherAdded(int, *flushBucket, flushingMetricList) {}

// The follower flush manager may only lead if and only if all the following conditions
// are met:
Expand Down Expand Up @@ -271,7 +269,6 @@ func (mgr *followerFlushManager) CanLead() bool {
now,
int(shardID),
shardFlushTimes.ForwardedByResolution,
mgr.metrics.forwarded,
) {
return false
}
Expand All @@ -294,10 +291,10 @@ func (mgr *followerFlushManager) canLead(
zap.Time("now", now),
zap.Stringer("windowSize", windowSize),
zap.Stringer("flusherType", flusherType),
zap.Int("shardID", int(shardID)))
zap.Int("shardID", shardID))
mgr.metrics.forwarded.windowNeverFlushed.Inc(1)

if mgr.canLeadNotFlushed(now, windowSize) {
if mgr.canLeadNotFlushed(now, windowSize, flusherType) {
continue
} else {
return false
Expand All @@ -320,15 +317,14 @@ func (mgr *followerFlushManager) canLeadForwarded(
now time.Time,
shardID int,
flushTimes map[int64]*schema.ForwardedFlushTimesForResolution,
metrics forwardedFollowerFlusherMetrics,
) bool {
// Check that the forwarded metrics have been flushed past the process start
// time, meaning the forwarded metrics that didn't make it to the process have been
// flushed successfully downstream.
for windowNanos, fbr := range flushTimes {
if fbr == nil {
mgr.logger.Warn("ForwardedByResolution is nil",
zap.Int("shardID", int(shardID)))
zap.Int("shardID", shardID))
mgr.metrics.forwarded.nilForwardedTimes.Inc(1)
return false
}
Expand All @@ -348,11 +344,11 @@ func (mgr *followerFlushManager) canLeadForwarded(
zap.Time("now", now),
zap.Stringer("windowSize", windowSize),
zap.Stringer("flusherType", forwardedMetricListType),
zap.Int("shardID", int(shardID)),
zap.Int("shardID", shardID),
zap.Int32("numForwardedTimes", numForwardedTimes))
mgr.metrics.forwarded.windowNeverFlushed.Inc(1)

if mgr.canLeadNotFlushed(now, windowSize) {
if mgr.canLeadNotFlushed(now, windowSize, forwardedMetricListType) {
continue
} else {
return false
Expand All @@ -371,10 +367,20 @@ func (mgr *followerFlushManager) canLeadForwarded(

// canLeadNotFlushed determines whether the follower can takeover leadership
// for the window that was not (yet) flushed by leader.
// This is case is possible when leader encounters a metric at some resolution
// This case is possible when leader encounters a metric at some resolution
// window for the first time in the shard it owns.
func (mgr *followerFlushManager) canLeadNotFlushed(now time.Time, windowSize time.Duration) bool {
windowStartAt := now.Truncate(windowSize)
func (mgr *followerFlushManager) canLeadNotFlushed(
	now time.Time,
	windowSize time.Duration,
	flusherType metricListType,
) bool {
	// Only timed metric lists shift the reference time back by the configured
	// past buffer; every other list type uses now unchanged.
	var lookback time.Duration
	if flusherType == timedMetricListType {
		lookback = mgr.bufferForPastTimedMetric
	}

	// Start of the resolution window containing the (possibly shifted) time.
	windowStart := now.Add(-lookback).Truncate(windowSize)

	// True only if this manager was opened strictly before that window began.
	return windowStart.After(mgr.openedAt)
}

Expand Down
122 changes: 80 additions & 42 deletions src/aggregator/aggregator/follower_flush_mgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ func TestFollowerFlushManagerOpen(t *testing.T) {
mgr := newFollowerFlushManager(doneCh, opts).(*followerFlushManager)
mgr.Open()

watchable.Update(testFlushTimes)
require.NoError(t, watchable.Update(testFlushTimes))

for {
mgr.RLock()
state := mgr.flushTimesState
Expand Down Expand Up @@ -139,12 +140,12 @@ func TestFollowerFlushManagerCanLeadNotFlushed(t *testing.T) {
window1m := time.Minute
testFlushTimes := &schema.ShardSetFlushTimes{
ByShard: map[uint32]*schema.ShardFlushTimes{
123: &schema.ShardFlushTimes{
123: {
StandardByResolution: map[int64]int64{
window10m.Nanoseconds(): 0,
},
ForwardedByResolution: map[int64]*schema.ForwardedFlushTimesForResolution{
window10m.Nanoseconds(): &schema.ForwardedFlushTimesForResolution{
window10m.Nanoseconds(): {
ByNumForwardedTimes: map[int32]int64{
1: 0,
},
Expand All @@ -154,53 +155,25 @@ func TestFollowerFlushManagerCanLeadNotFlushed(t *testing.T) {
},
}

runTestFn := func(
t *testing.T,
flushTimes *schema.ShardSetFlushTimes,
followerOpenedAt time.Time,
expectedCanLead bool,
) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

doneCh := make(chan struct{})
electionManager := NewMockElectionManager(ctrl)
electionManager.EXPECT().IsCampaigning().Return(true).AnyTimes()
clockOpts := clock.NewOptions().SetNowFn(func() time.Time {
return now
})
opts := NewFlushManagerOptions().
SetElectionManager(electionManager).
SetClockOptions(clockOpts)
mgr := newFollowerFlushManager(doneCh, opts).(*followerFlushManager)

mgr.processed = flushTimes
mgr.openedAt = followerOpenedAt
require.Equal(t, expectedCanLead, mgr.CanLead())
}

t.Run("opened_on_the_window_start", func(t *testing.T) {
followerOpenedAt := now.Truncate(window10m)
expectedCanLead := false
runTestFn(t, testFlushTimes, followerOpenedAt, expectedCanLead)
testCanLeadNotFlushed(t, testFlushTimes, now, followerOpenedAt, 0, false)
})

t.Run("opened_after_the_window_start", func(t *testing.T) {
followerOpenedAt := now.Truncate(window10m).Add(1 * time.Second)
expectedCanLead := false
runTestFn(t, testFlushTimes, followerOpenedAt, expectedCanLead)
testCanLeadNotFlushed(t, testFlushTimes, now, followerOpenedAt, 0, false)
})

t.Run("opened_before_the_window_start", func(t *testing.T) {
followerOpenedAt := now.Truncate(window10m).Add(-1 * time.Second)
expectedCanLead := true
runTestFn(t, testFlushTimes, followerOpenedAt, expectedCanLead)
testCanLeadNotFlushed(t, testFlushTimes, now, followerOpenedAt, 0, true)
})

t.Run("standard_flushed_ok_and_unflushed_bad", func(t *testing.T) {
flushedAndUnflushedTimes := &schema.ShardSetFlushTimes{
ByShard: map[uint32]*schema.ShardFlushTimes{
123: &schema.ShardFlushTimes{
123: {
StandardByResolution: map[int64]int64{
window1m.Nanoseconds(): now.Add(1 * time.Second).UnixNano(),
window10m.Nanoseconds(): 0,
Expand All @@ -210,21 +183,20 @@ func TestFollowerFlushManagerCanLeadNotFlushed(t *testing.T) {
}

followerOpenedAt := now
expectedCanLead := false
runTestFn(t, flushedAndUnflushedTimes, followerOpenedAt, expectedCanLead)
testCanLeadNotFlushed(t, flushedAndUnflushedTimes, now, followerOpenedAt, 0, false)
})

t.Run("forwarded_flushed_ok_and_unflushed_bad", func(t *testing.T) {
flushedAndUnflushedTimes := &schema.ShardSetFlushTimes{
ByShard: map[uint32]*schema.ShardFlushTimes{
123: &schema.ShardFlushTimes{
123: {
ForwardedByResolution: map[int64]*schema.ForwardedFlushTimesForResolution{
window1m.Nanoseconds(): &schema.ForwardedFlushTimesForResolution{
window1m.Nanoseconds(): {
ByNumForwardedTimes: map[int32]int64{
1: now.Truncate(window1m).Add(1 * time.Second).UnixNano(),
},
},
window10m.Nanoseconds(): &schema.ForwardedFlushTimesForResolution{
window10m.Nanoseconds(): {
ByNumForwardedTimes: map[int32]int64{
1: 0,
},
Expand All @@ -235,9 +207,75 @@ func TestFollowerFlushManagerCanLeadNotFlushed(t *testing.T) {
}

followerOpenedAt := now
expectedCanLead := false
runTestFn(t, flushedAndUnflushedTimes, followerOpenedAt, expectedCanLead)
testCanLeadNotFlushed(t, flushedAndUnflushedTimes, now, followerOpenedAt, 0, false)
})
}

// TestFollowerFlushManagerCanLeadTimedNotFlushed checks CanLead for a timed
// 10m resolution whose flush time is recorded as 0, with a 3m past buffer
// configured. The follower open time is placed on, just after, and just
// before the start of the window containing now minus the buffer.
func TestFollowerFlushManagerCanLeadTimedNotFlushed(t *testing.T) {
	now := time.Unix(24*60*60, 0)
	window10m := 10 * time.Minute
	bufferPast := 3 * time.Minute

	// Start of the 10m window that contains now shifted back by bufferPast.
	windowStart := now.Add(-bufferPast).Truncate(window10m)

	flushTimes := &schema.ShardSetFlushTimes{
		ByShard: map[uint32]*schema.ShardFlushTimes{
			123: {
				TimedByResolution: map[int64]int64{
					window10m.Nanoseconds(): 0,
				},
			},
		},
	}

	cases := []struct {
		name     string
		openedAt time.Time
		canLead  bool
	}{
		{name: "opened_on_the_window_start", openedAt: windowStart, canLead: false},
		{name: "opened_after_the_window_start", openedAt: windowStart.Add(time.Second), canLead: false},
		{name: "opened_before_the_window_start", openedAt: windowStart.Add(-time.Second), canLead: true},
	}

	for _, tc := range cases {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			testCanLeadNotFlushed(t, flushTimes, now, tc.openedAt, bufferPast, tc.canLead)
		})
	}
}

// testCanLeadNotFlushed builds a follower flush manager whose clock always
// returns now and whose mocked election manager always reports that it is
// campaigning, installs flushTimes as the processed flush times and
// followerOpenedAt as the open time, and asserts that CanLead returns
// expectedCanLead. The past buffer for timed metrics is overridden only when
// bufferPast is positive; otherwise the options keep their default.
func testCanLeadNotFlushed(
	t *testing.T,
	flushTimes *schema.ShardSetFlushTimes,
	now time.Time,
	followerOpenedAt time.Time,
	bufferPast time.Duration,
	expectedCanLead bool,
) {
	t.Helper()

	mockCtrl := gomock.NewController(t)
	defer mockCtrl.Finish()

	done := make(chan struct{})

	election := NewMockElectionManager(mockCtrl)
	election.EXPECT().IsCampaigning().Return(true).AnyTimes()

	fixedNow := func() time.Time { return now }
	fixedClock := clock.NewOptions().SetNowFn(fixedNow)

	opts := NewFlushManagerOptions().SetElectionManager(election).SetClockOptions(fixedClock)
	if bufferPast > 0 {
		opts = opts.SetBufferForPastTimedMetric(bufferPast)
	}

	follower := newFollowerFlushManager(done, opts).(*followerFlushManager)
	follower.processed = flushTimes
	follower.openedAt = followerOpenedAt

	require.Equal(t, expectedCanLead, follower.CanLead())
}

func TestFollowerFlushManagerCanLeadNoTombstonedShards(t *testing.T) {
Expand Down
Loading

0 comments on commit 397b095

Please sign in to comment.