-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathdelay_queue.go
137 lines (128 loc) · 2.42 KB
/
delay_queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package kreconciler
import (
"context"
"fmt"
"sync"
"time"
)
// dq is a delay queue: a doubly linked list of pending items kept in
// ascending order of due time and bracketed by two sentinel nodes.
// The embedded mutex guards the list and the size counter.
type dq struct {
	// head and tail are sentinels; real items live strictly between them.
	head, tail *qElt
	// C receives a non-blocking signal whenever the first pending item
	// changes, so that run can re-arm its timer.
	C chan bool
	sync.Mutex
	// resolution is the granularity due times are truncated to when an
	// item is scheduled.
	resolution time.Duration
	// size is the number of pending items; capacity is the limit checked
	// by scheduleOnTime.
	size, capacity int
}
// qElt is a single node of the delay queue's doubly linked list.
type qElt struct {
	// next and prev link the node to its neighbours in the list.
	next, prev *qElt
	// i is the payload returned when the node is dequeued.
	i interface{}
	// end is the time at which the node becomes due.
	end time.Time
}
// newQueue builds an empty delay queue that holds at most size pending
// items and truncates due times to resolution.
func newQueue(size int, resolution time.Duration) *dq {
	q := &dq{
		head:       &qElt{i: "head"},
		tail:       &qElt{i: "tail"},
		C:          make(chan bool, 1),
		resolution: resolution,
		capacity:   size,
	}
	// An empty list is just the two sentinels pointing at each other.
	q.head.next = q.tail
	q.tail.prev = q.head
	return q
}
// run drives the queue until ctx is cancelled. It keeps a timer armed for
// the earliest pending item and, each time the timer fires, removes every
// item that is due and passes it to onItem together with the tick time.
// onItem is invoked on the calling goroutine, after the queue lock has
// been released.
func (q *dq) run(ctx context.Context, onItem func(t time.Time, i interface{})) {
	// arm returns a timer for the earliest pending item, or an idle
	// one-hour timer when the queue is empty.
	arm := func() *time.Timer {
		if t := q.nextTime(); !t.IsZero() {
			return time.NewTimer(time.Until(t))
		}
		return time.NewTimer(time.Hour)
	}

	timer := arm()
	for {
		select {
		case <-ctx.Done():
			timer.Stop()
			return
		case <-q.C:
			// The head of the queue changed: aim the timer at the new head.
			timer.Stop()
			timer = arm()
		case now := <-timer.C:
			elts := q.dequeueIfBefore(now)
			for _, n := range elts {
				onItem(now, n)
			}
			if len(elts) == 0 {
				// Nothing was due (idle tick, tick time equal to the due
				// time, or the wall clock moved backwards). A non-empty
				// dequeue signals q.C and re-arms above; an empty one does
				// not, so re-arm here or the head item would wait until
				// some unrelated insert at the head woke the loop.
				timer = arm()
			}
		}
	}
}
// schedule enqueues i so that it becomes due once delay has elapsed,
// measured from the moment of the call. It returns whatever error
// scheduleOnTime reports.
func (q *dq) schedule(i interface{}, delay time.Duration) error {
	due := time.Now().Add(delay)
	return q.scheduleOnTime(i, due)
}
// scheduleOnTime enqueues i to become due at the given time (passed in the
// parameter named delay). It returns an error, and leaves the queue
// untouched, when the queue already holds capacity items.
func (q *dq) scheduleOnTime(i interface{}, delay time.Time) error {
	q.Lock()
	defer q.Unlock()

	if q.size == q.capacity {
		return fmt.Errorf("delay queue is full with %d items", q.capacity)
	}
	q.size++

	// Truncate the due time to the queue resolution so that items landing
	// in the same slot share one due time instead of each needing its own
	// timer step.
	due := delay.Truncate(q.resolution)

	// Find the first node that is due strictly later; the new node goes
	// right before it, which keeps items with equal due times in FIFO order.
	succ := q.head.next
	for succ != q.tail && !succ.end.After(due) {
		succ = succ.next
	}
	pred := succ.prev
	node := &qElt{i: i, end: due, next: succ, prev: pred}
	pred.next = node
	succ.prev = node

	if pred == q.head {
		// The new node is now the earliest one: nudge run (without
		// blocking) so it re-arms its timer.
		select {
		case q.C <- true:
		default:
		}
	}
	return nil
}
// dequeueIfBefore removes, from the front of the queue, every item whose
// due time is strictly before t and returns their payloads in queue order.
// The result is nil when nothing is due. When at least one item is
// removed, the size counter is reduced and a non-blocking signal is sent
// on q.C because the head of the queue has changed.
func (q *dq) dequeueIfBefore(t time.Time) []interface{} {
	q.Lock()
	defer q.Unlock()

	var due []interface{}
	first := q.head.next
	for first != q.tail && first.end.Before(t) {
		due = append(due, first.i)
		first = first.next
	}
	// Splice out the whole due prefix in one step; this is a no-op when
	// nothing was due.
	q.head.next = first
	first.prev = q.head

	if len(due) == 0 {
		return due
	}
	q.size -= len(due)
	select {
	case q.C <- true:
	default:
	}
	return due
}
// nextTime reports the due time of the earliest pending item, or the zero
// time.Time when the queue is empty.
func (q *dq) nextTime() time.Time {
	q.Lock()
	defer q.Unlock()

	if first := q.head.next; first != q.tail {
		return first.end
	}
	return time.Time{}
}