-
Notifications
You must be signed in to change notification settings - Fork 215
/
watch.go
144 lines (123 loc) · 3.65 KB
/
watch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package memdb
import (
"context"
"time"
)
// WatchSet is a collection of watch channels.
type WatchSet map[<-chan struct{}]struct{}
// NewWatchSet constructs a new watch set.
func NewWatchSet() WatchSet {
return make(map[<-chan struct{}]struct{})
}
// Add appends a watchCh to the WatchSet if non-nil.
func (w WatchSet) Add(watchCh <-chan struct{}) {
if w == nil {
return
}
if _, ok := w[watchCh]; !ok {
w[watchCh] = struct{}{}
}
}
// AddWithLimit appends a watchCh to the WatchSet if non-nil, and if the given
// softLimit hasn't been exceeded. Otherwise, it will watch the given alternate
// channel. It's expected that the altCh will be the same on many calls to this
// function, so you will exceed the soft limit a little bit if you hit this, but
// not by much.
//
// This is useful if you want to track individual items up to some limit, after
// which you watch a higher-level channel (usually a channel from start start of
// an iterator higher up in the radix tree) that will watch a superset of items.
func (w WatchSet) AddWithLimit(softLimit int, watchCh <-chan struct{}, altCh <-chan struct{}) {
// This is safe for a nil WatchSet so we don't need to check that here.
if len(w) < softLimit {
w.Add(watchCh)
} else {
w.Add(altCh)
}
}
// Watch is used to wait for either the watch set to trigger or a timeout.
// Returns true on timeout.
func (w WatchSet) Watch(timeoutCh <-chan time.Time) bool {
if w == nil {
return false
}
// Create a context that gets cancelled when the timeout is triggered
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
select {
case <-timeoutCh:
cancel()
case <-ctx.Done():
}
}()
return w.WatchCtx(ctx) == context.Canceled
}
// WatchCtx is used to wait for either the watch set to trigger or for the
// context to be cancelled. Watch with a timeout channel can be mimicked by
// creating a context with a deadline. WatchCtx should be preferred over Watch.
func (w WatchSet) WatchCtx(ctx context.Context) error {
if w == nil {
return nil
}
if n := len(w); n <= aFew {
idx := 0
chunk := make([]<-chan struct{}, aFew)
for watchCh := range w {
chunk[idx] = watchCh
idx++
}
return watchFew(ctx, chunk)
}
return w.watchMany(ctx)
}
// watchMany is used if there are many watchers.
func (w WatchSet) watchMany(ctx context.Context) error {
// Set up a goroutine for each watcher.
triggerCh := make(chan struct{}, 1)
watcher := func(chunk []<-chan struct{}) {
if err := watchFew(ctx, chunk); err == nil {
select {
case triggerCh <- struct{}{}:
default:
}
}
}
// Apportion the watch channels into chunks we can feed into the
// watchFew helper.
idx := 0
chunk := make([]<-chan struct{}, aFew)
for watchCh := range w {
subIdx := idx % aFew
chunk[subIdx] = watchCh
idx++
// Fire off this chunk and start a fresh one.
if idx%aFew == 0 {
go watcher(chunk)
chunk = make([]<-chan struct{}, aFew)
}
}
// Make sure to watch any residual channels in the last chunk.
if idx%aFew != 0 {
go watcher(chunk)
}
// Wait for a channel to trigger or timeout.
select {
case <-triggerCh:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
// WatchCh returns a channel that is used to wait for either the watch set to trigger
// or for the context to be cancelled. WatchCh creates a new goroutine each call, so
// callers may need to cache the returned channel to avoid creating extra goroutines.
func (w WatchSet) WatchCh(ctx context.Context) <-chan error {
// Create the outgoing channel
triggerCh := make(chan error, 1)
// Create a goroutine to collect the error from WatchCtx
go func() {
triggerCh <- w.WatchCtx(ctx)
}()
return triggerCh
}