forked from godbus/dbus
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsequential_handler.go
120 lines (104 loc) · 2.58 KB
/
sequential_handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package dbus
import (
"container/list"
"sync"
)
// NewSequentialSignalHandler creates a SignalHandler that processes signals
// sequentially: for every registered channel, signals are written in the
// same order in which they were received on the DBus connection.
func NewSequentialSignalHandler() SignalHandler {
	return new(sequentialSignalHandler)
}
// sequentialSignalHandler fans incoming signals out to a set of registered
// channels, one sequentialSignalChannelData entry per registration.
type sequentialSignalHandler struct {
	// mu guards closed and signals. DeliverSignal takes it for reading;
	// AddSignal, RemoveSignal and Terminate take it for writing.
	mu sync.RWMutex

	// closed is set by Terminate; once set, every method is a no-op.
	closed bool

	// signals holds one entry per AddSignal call, in registration order.
	signals []*sequentialSignalChannelData
}
// DeliverSignal hands signal to every registered channel entry, in
// registration order. The intf and name arguments are not used by this
// handler. After Terminate the call does nothing.
func (sh *sequentialSignalHandler) DeliverSignal(intf, name string, signal *Signal) {
	sh.mu.RLock()
	defer sh.mu.RUnlock()

	if !sh.closed {
		for i := range sh.signals {
			sh.signals[i].deliver(signal)
		}
	}
}
// Terminate shuts the handler down: it stops all pending deliveries, closes
// every registered channel, and marks the handler closed so that later calls
// to any method are no-ops. Calling Terminate more than once is safe.
//
// AddSignal does not de-duplicate, so the same channel may be registered
// several times. Terminate therefore closes each distinct channel exactly
// once, and only after every entry's delivery goroutines have returned, so
// that no goroutine can send on a channel that has already been closed.
func (sh *sequentialSignalHandler) Terminate() {
	sh.mu.Lock()
	defer sh.mu.Unlock()
	if sh.closed {
		return
	}

	// Phase 1: stop every entry's delivery goroutines before touching any
	// channel. An entry sharing its channel with another entry must be
	// quiescent before that channel is closed.
	for _, scd := range sh.signals {
		scd.close()
	}

	// Phase 2: close each distinct channel once. A linear scan over the
	// earlier entries is used for duplicate detection; the number of
	// registered channels is expected to be small.
	for i, scd := range sh.signals {
		if scd.ch == nil {
			// Closing a nil channel panics; there is nothing to close.
			continue
		}
		alreadyClosed := false
		for _, prev := range sh.signals[:i] {
			if prev.ch == scd.ch {
				alreadyClosed = true
				break
			}
		}
		if !alreadyClosed {
			close(scd.ch)
		}
	}

	sh.closed = true
	sh.signals = nil
}
// AddSignal registers ch to receive every signal delivered from now on.
// Each call creates a new, independent entry with its own queue; no check
// is made for ch having been registered before. After Terminate the call
// does nothing.
func (sh *sequentialSignalHandler) AddSignal(ch chan<- *Signal) {
	sh.mu.Lock()
	defer sh.mu.Unlock()

	if !sh.closed {
		entry := &sequentialSignalChannelData{
			queue: list.New(),
			ch:    ch,
			done:  make(chan struct{}),
		}
		sh.signals = append(sh.signals, entry)
	}
}
// RemoveSignal unregisters every entry whose channel equals ch. Each removed
// entry is shut down (its pending deliveries are abandoned and waited for),
// but ch itself is not closed. After Terminate the call does nothing.
func (sh *sequentialSignalHandler) RemoveSignal(ch chan<- *Signal) {
	sh.mu.Lock()
	defer sh.mu.Unlock()
	if sh.closed {
		return
	}

	// Walk backwards so that removing an element never shifts an index
	// that has not been visited yet.
	for idx := len(sh.signals) - 1; idx >= 0; idx-- {
		entry := sh.signals[idx]
		if entry.ch != ch {
			continue
		}
		entry.close()

		// Shift the tail down by one and clear the vacated slot so the
		// removed entry is no longer referenced by the backing array.
		last := len(sh.signals) - 1
		copy(sh.signals[idx:], sh.signals[idx+1:])
		sh.signals[last] = nil
		sh.signals = sh.signals[:last]
	}
}
// sequentialSignalChannelData is the per-channel state of a
// sequentialSignalHandler: a FIFO of pending signals plus the
// synchronization needed to forward them to ch in order.
type sequentialSignalChannelData struct {
	// stateLock guards queue.
	stateLock sync.Mutex
	// writeLock serializes deferredDeliver goroutines so that only one of
	// them dequeues and sends at a time.
	writeLock sync.Mutex
	// queue holds the *Signal values that have not been dispatched yet,
	// oldest first.
	queue *list.List
	// wg counts the deferredDeliver goroutines that are still running.
	wg sync.WaitGroup
	// ch is the destination channel signals are sent on.
	ch chan<- *Signal
	// done is closed by close() to make pending sends give up.
	done chan struct{}
}
// deliver enqueues signal and starts a goroutine that will forward one
// queued signal to ch. It never blocks on ch itself, so the main DBus
// message processing routine is not held up by a slow receiver.
func (scd *sequentialSignalChannelData) deliver(signal *Signal) {
	// Account for the goroutine before it is started so that close()
	// waits for it.
	scd.wg.Add(1)

	func() {
		scd.stateLock.Lock()
		defer scd.stateLock.Unlock()
		scd.queue.PushBack(signal)
	}()

	go scd.deferredDeliver()
}
// deferredDeliver takes the oldest queued signal and sends it on ch, or
// drops it if done is closed first. One such goroutine is started per
// queued signal by deliver.
func (scd *sequentialSignalChannelData) deferredDeliver() {
	defer scd.wg.Done()

	// Holding writeLock for the whole dequeue-and-send step means only one
	// goroutine is sending at a time, so signals leave in queue order no
	// matter which goroutine gets scheduled first.
	scd.writeLock.Lock()
	defer scd.writeLock.Unlock()

	// Pop the head of the queue; Remove returns the element's value.
	scd.stateLock.Lock()
	head := scd.queue.Remove(scd.queue.Front())
	scd.stateLock.Unlock()

	next := head.(*Signal)
	select {
	case <-scd.done:
		// Shut down: abandon the signal instead of blocking on ch.
	case scd.ch <- next:
	}
}
// close stops delivery for this entry. Closing done releases every
// deferredDeliver goroutine that is blocked sending on ch; close then waits
// until all goroutines started by deliver have returned. It does not close
// ch.
func (scd *sequentialSignalChannelData) close() {
	// Signal shutdown to all pending senders.
	close(scd.done)

	// Block until every spawned goroutine has finished.
	scd.wg.Wait()
}