Skip to content

Commit

Permalink
Merge pull request #1287 from aaronlehmann/scheduler-debouncing
Browse files Browse the repository at this point in the history
scheduler: Debounce commit events
  • Loading branch information
aaronlehmann authored Aug 1, 2016
2 parents 3121444 + 77c62db commit 008c7f6
Showing 1 changed file with 40 additions and 7 deletions.
47 changes: 40 additions & 7 deletions manager/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,31 @@ func (s *Scheduler) Run(ctx context.Context) error {
// Queue all unassigned tasks before processing changes.
s.tick(ctx)

const (
// commitDebounceGap is the amount of time to wait between
// commit events to debounce them.
commitDebounceGap = 50 * time.Millisecond
// maxLatency is a time limit on the debouncing.
maxLatency = time.Second
)
var (
debouncingStarted time.Time
commitDebounceTimer *time.Timer
commitDebounceTimeout <-chan time.Time
)

pendingChanges := 0

schedule := func() {
if len(s.preassignedTasks) > 0 {
s.processPreassignedTasks(ctx)
}
if pendingChanges > 0 {
s.tick(ctx)
pendingChanges = 0
}
}

// Watch for changes.
for {
select {
Expand All @@ -131,15 +154,25 @@ func (s *Scheduler) Run(ctx context.Context) error {
case state.EventDeleteNode:
s.nodeHeap.remove(v.Node.ID)
case state.EventCommit:
if len(s.preassignedTasks) > 0 {
s.processPreassignedTasks(ctx)
}
if pendingChanges > 0 {
s.tick(ctx)
pendingChanges = 0
if commitDebounceTimer != nil {
if time.Since(debouncingStarted) > maxLatency {
commitDebounceTimer.Stop()
commitDebounceTimer = nil
commitDebounceTimeout = nil
schedule()
} else {
commitDebounceTimer.Reset(commitDebounceGap)
}
} else {
commitDebounceTimer = time.NewTimer(commitDebounceGap)
commitDebounceTimeout = commitDebounceTimer.C
debouncingStarted = time.Now()
}
}

case <-commitDebounceTimeout:
schedule()
commitDebounceTimer = nil
commitDebounceTimeout = nil
case <-s.stopChan:
return nil
}
Expand Down

0 comments on commit 008c7f6

Please sign in to comment.