forked from mabunixda/wattpilot
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpubsub.go
58 lines (48 loc) · 941 Bytes
/
pubsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package wattpilot
import "sync"
type Pubsub struct {
mu sync.RWMutex
subs map[string][]chan interface{}
closed bool
}
func NewPubsub() *Pubsub {
ps := &Pubsub{}
ps.subs = make(map[string][]chan interface{})
return ps
}
func (ps *Pubsub) IsEmpty() bool {
ps.mu.Lock()
defer ps.mu.Unlock()
return len(ps.subs) == 0
}
func (ps *Pubsub) Subscribe(topic string) <-chan interface{} {
ps.mu.Lock()
defer ps.mu.Unlock()
ch := make(chan interface{}, 1)
ps.subs[topic] = append(ps.subs[topic], ch)
return ch
}
func (ps *Pubsub) Publish(topic string, msg interface{}) {
ps.mu.RLock()
defer ps.mu.RUnlock()
if ps.closed {
return
}
for _, ch := range ps.subs[topic] {
go func(ch chan interface{}) {
ch <- msg
}(ch)
}
}
func (ps *Pubsub) Close() {
ps.mu.Lock()
defer ps.mu.Unlock()
if !ps.closed {
ps.closed = true
for _, subs := range ps.subs {
for _, ch := range subs {
close(ch)
}
}
}
}