From ba6e77b1b021776e1aa530b889d112a771c59cd4 Mon Sep 17 00:00:00 2001 From: Flier Lu Date: Wed, 16 Dec 2015 18:14:24 +0800 Subject: [PATCH 1/2] add TimerWheel --- gocron.go | 48 +++++++---- gocron_test.go | 3 +- tm.go | 224 +++++++++++++++++++++++++++++++++++++++++++++++++ tm_test.go | 88 +++++++++++++++++++ 4 files changed, 346 insertions(+), 17 deletions(-) create mode 100644 tm.go create mode 100644 tm_test.go diff --git a/gocron.go b/gocron.go index 6569de5..3bc14d3 100644 --- a/gocron.go +++ b/gocron.go @@ -20,6 +20,7 @@ package gocron import ( "errors" + "fmt" "reflect" "runtime" "sort" @@ -64,16 +65,20 @@ type Job struct { // Specific day of the week to start on startDay time.Weekday + + onSchedule func() interface{} + + userData interface{} } // Create a new job with the time interval. func NewJob(intervel uint64) *Job { - return &Job{intervel, "", "", "", time.Unix(0, 0), time.Unix(0, 0), 0, time.Sunday} + return &Job{interval: intervel, lastRun: time.Unix(0, 0)} } // True if the job should be run now func (j *Job) shouldRun() bool { - return time.Now().After(j.nextRun) + return time.Now().Sub(j.nextRun) < j.period } //Run the job and immdiately reschedulei it @@ -161,27 +166,32 @@ func (j *Job) scheduleNextRun() { } } - if j.period != 0 { - // translate all the units to the Seconds - j.nextRun = j.lastRun.Add(j.period * time.Second) - } else { + if j.period == 0 { switch j.unit { case "minutes": - j.period = time.Duration(j.interval * 60) - break + j.period = time.Duration(j.interval) * time.Minute + case "hours": - j.period = time.Duration(j.interval * 60 * 60) - break + j.period = time.Duration(j.interval) * time.Hour + case "days": - j.period = time.Duration(j.interval * 60 * 60 * 24) - break + j.period = time.Duration(j.interval) * 24 * time.Hour + case "weeks": - j.period = time.Duration(j.interval * 60 * 60 * 24 * 7) - break + j.period = time.Duration(j.interval) * 24 * 7 * time.Hour + case "seconds": - j.period = time.Duration(j.interval) + j.period = time.Duration(j.interval) * time.Second + + case "milliseconds": + j.period = time.Duration(j.interval) * time.Millisecond } - j.nextRun = j.lastRun.Add(j.period * time.Second) + } + + j.nextRun = j.lastRun.Add(j.period) + + if j.onSchedule != nil { + j.userData = j.onSchedule() } } @@ -202,6 +212,12 @@ func (j *Job) Seconds() (job *Job) { return j } +// Set the unit with seconds +func (j *Job) Milliseconds() (job *Job) { + j.unit = "milliseconds" + return j +} + // Set the unit with minute, which interval is 1 func (j *Job) Minute() (job *Job) { if j.interval != 1 { diff --git a/gocron_test.go b/gocron_test.go index fea96fa..72c8541 100644 --- a/gocron_test.go +++ b/gocron_test.go @@ -2,9 +2,9 @@ package gocron import ( + "fmt" "testing" "time" - "fmt" ) var err = 1 @@ -22,4 +22,5 @@ func TestSecond(*testing.T) { defaultScheduler.Every(1).Second().Do(taskWithParams, 1, "hello") defaultScheduler.Start() time.Sleep(10 * time.Second) + defaultScheduler.Clear() } diff --git a/tm.go b/tm.go new file mode 100644 index 0000000..a4184c9 --- /dev/null +++ b/tm.go @@ -0,0 +1,224 @@ +package gocron + +import ( + "math" + "sort" + "sync" + "time" +) + +type TimerWheel interface { + // Delete all scheduled jobs + Clear() + + // Create a new periodic job + Every(interval uint64) *Job + + // Create a one time job + After(interval uint64) *Job + + // Remove specific job j + Cancel(*Job) bool + + // Run all the jobs that are scheduled to run. + RunPending() + + // Start all the pending jobs Add seconds ticker + Start() chan bool +} + +type TimerWheelConfig struct { + TimeSlotCount uint + SlotInterval time.Duration + SequenceCall bool + Logf func(format string, v ...interface{}) +} + +var DefaultTimerWheelConfig = &TimerWheelConfig{ + TimeSlotCount: 60 * 60, + SlotInterval: 1 * time.Second, + SequenceCall: false, +} + +type timeSlot struct { + lock sync.Mutex + jobs []*Job +} + +type timerWheel struct { + *TimerWheelConfig + slots []timeSlot + currentSlot int +} + +func NewTimerWheel(cfg *TimerWheelConfig) TimerWheel { + if cfg == nil { + cfg = DefaultTimerWheelConfig + } + + return &timerWheel{ + TimerWheelConfig: cfg, + slots: make([]timeSlot, cfg.TimeSlotCount), + } +} + +func (tw *timerWheel) Clear() { + for _, slot := range tw.slots { + slot.lock.Lock() + slot.jobs = nil + slot.lock.Unlock() + } +} + +func (tw *timerWheel) Every(interval uint64) *Job { + job := NewJob(interval) + job.onSchedule = func() interface{} { + return tw.scheduleJob(job) + } + + return job +} + +func (tw *timerWheel) After(interval uint64) *Job { + job := NewJob(interval) + job.onSchedule = func() interface{} { + job.onSchedule = nil + + return tw.scheduleJob(job) + } + + return job +} + +func (tw *timerWheel) Cancel(job *Job) bool { + if n, ok := job.userData.(int); ok { + slot := tw.slots[n] + + slot.lock.Lock() + defer slot.lock.Unlock() + + for i, j := range slot.jobs { + if j == job { + if tw.Logf != nil { + tw.Logf("remove job %p @ slot #%d:%d", job, n, i) + } + + slot.jobs = append(slot.jobs[:i], slot.jobs[i+1:]...) + + return true + } + } + } + + return false +} + +func (tw *timerWheel) RunPending() { + jobs := tw.getPending(tw.currentSlot) + + go func() { + for _, job := range jobs { + if tw.SequenceCall { + tw.runJob(job) + } else { + go tw.runJob(job) + } + } + }() +} + +func (tw *timerWheel) getPending(n int) (jobs []*Job) { + slot := tw.slots[n] + + if len(slot.jobs) == 0 { + return + } + + slot.lock.Lock() + + if slot.jobs[len(slot.jobs)-1].shouldRun() { + // Fast Path O(1) + jobs = slot.jobs + slot.jobs = nil + } else { + // Binary Search O(log n) + pending := sort.Search(len(slot.jobs), func(i int) bool { + return !slot.jobs[i].shouldRun() + }) + + jobs = slot.jobs[:pending] + slot.jobs = slot.jobs[pending:] + } + + slot.lock.Unlock() + + if tw.Logf != nil { + tw.Logf("found %d pending jobs @ slot #%d @ %s, %s", len(jobs), tw.currentSlot, time.Now(), jobs) + } + + return +} + +func (tw *timerWheel) Start() chan bool { + stopped := make(chan bool, 1) + ticker := time.NewTicker(tw.SlotInterval) + + go func() { + for { + select { + case <-ticker.C: + tw.currentSlot = (tw.currentSlot + 1) % len(tw.slots) + tw.RunPending() + + case <-stopped: + return + } + } + }() + + return stopped +} + +func (tw *timerWheel) runJob(job *Job) { + defer func() { + if r := recover(); r != nil { + tw.scheduleJob(job) + } + }() + + job.run() +} + +func (tw *timerWheel) nextSlot(job *Job) (int, *timeSlot) { + nextSlot := (tw.currentSlot + int(math.Ceil(float64(job.nextRun.Sub(time.Now()))/float64(tw.SlotInterval)))) % len(tw.slots) + + if tw.Logf != nil { + tw.Logf("%s was mapped to slot #%d", job.String(), nextSlot) + } + + return nextSlot, &tw.slots[nextSlot] +} + +func (tw *timerWheel) scheduleJob(job *Job) int { + n, slot := tw.nextSlot(job) + + slot.lock.Lock() + + if len(slot.jobs) == 0 || job.nextRun.After(slot.jobs[len(slot.jobs)-1].nextRun) { + // Fast Path O(1) + slot.jobs = append(slot.jobs, job) + } else { + // Binary Search O(log n) + i := sort.Search(len(slot.jobs), func(i int) bool { + return slot.jobs[i].nextRun.After(job.nextRun) + }) + + jobs := append(slot.jobs[:i], job) + + slot.jobs = append(jobs, slot.jobs[i:]...) + } + + slot.lock.Unlock() + + return n +} diff --git a/tm_test.go b/tm_test.go new file mode 100644 index 0000000..fce99f0 --- /dev/null +++ b/tm_test.go @@ -0,0 +1,88 @@ +package gocron + +import ( + "math/rand" + "testing" + "time" +) + +func TestTimer(t *testing.T) { + scheduler := NewTimerWheel(nil) + + scheduler.Every(1).Second().Do(task) + scheduler.Every(1).Second().Do(taskWithParams, 1, "hello") + stopped := scheduler.Start() + + time.Sleep(10 * time.Second) + + scheduler.Clear() + + stopped <- true +} + +func TestOneTimeTimer(t *testing.T) { + scheduler := NewTimerWheel(nil) + + scheduler.After(1).Second().Do(task) + scheduler.After(1).Second().Do(taskWithParams, 1, "hello") + stopped := scheduler.Start() + + time.Sleep(10 * time.Second) + + scheduler.Clear() + + stopped <- true +} + +func BenchmarkSchedule(b *testing.B) { + cfg := *DefaultTimerWheelConfig + + scheduler := NewTimerWheel(&cfg) + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + scheduler.Every(uint64(rand.Intn(int(cfg.TimeSlotCount)))).Seconds().Do(task) + } + }) +} + +func BenchmarkPending(b *testing.B) { + cfg := *DefaultTimerWheelConfig + + scheduler := NewTimerWheel(&cfg).(*timerWheel) + + jobs := make([]*Job, b.N) + + for i := 0; i < b.N; i++ { + jobs[i] = scheduler.Every(uint64(rand.Intn(int(cfg.TimeSlotCount)))) + jobs[i].Seconds().Do(task) + } + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + scheduler.getPending(i % len(scheduler.slots)) + } +} + +func BenchmarkCancel(b *testing.B) { + cfg := *DefaultTimerWheelConfig + + scheduler := NewTimerWheel(&cfg).(*timerWheel) + + jobs := make([]*Job, b.N) + + for i := 0; i < b.N; i++ { + jobs[i] = scheduler.Every(uint64(rand.Intn(int(cfg.TimeSlotCount)))) + jobs[i].Seconds().Do(task) + } + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + if !scheduler.Cancel(jobs[i]) { + b.Logf("missing %s @ slot #%v, %v", jobs[i], jobs[i].userData, scheduler.slots[jobs[i].userData.(int)].jobs) + b.Fail() + } + } +} From 786174d787e01f22fa354926b754875efede800f Mon Sep 17 00:00:00 2001 From: Flier Lu Date: Wed, 16 Dec 2015 18:22:01 +0800 Subject: [PATCH 2/2] fix spell/log error --- gocron.go | 3 +-- tm.go | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/gocron.go b/gocron.go index 3bc14d3..f9e8fde 100644 --- a/gocron.go +++ b/gocron.go @@ -20,7 +20,6 @@ package gocron import ( "errors" - "fmt" "reflect" "runtime" "sort" @@ -212,7 +211,7 @@ func (j *Job) Seconds() (job *Job) { return j } -// Set the unit with seconds +// Set the unit with milliseconds func (j *Job) Milliseconds() (job *Job) { j.unit = "milliseconds" return j diff --git a/tm.go b/tm.go index a4184c9..70accb8 100644 --- a/tm.go +++ b/tm.go @@ -193,7 +193,7 @@ func (tw *timerWheel) nextSlot(job *Job) (int, *timeSlot) { nextSlot := (tw.currentSlot + int(math.Ceil(float64(job.nextRun.Sub(time.Now()))/float64(tw.SlotInterval)))) % len(tw.slots) if tw.Logf != nil { - tw.Logf("%s was mapped to slot #%d", job.String(), nextSlot) + tw.Logf("%p was mapped to slot #%d", job, nextSlot) } return nextSlot, &tw.slots[nextSlot]