This repository has been archived by the owner on Oct 4, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 37
/
redis_queue.go
118 lines (94 loc) · 2.45 KB
/
redis_queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package incus
import "container/list"
import "sync"
import "log"
import "time"
// A queue of redis commands
type RedisQueue struct {
incoming chan RedisCommand
outgoing chan RedisCommand
pending *list.List
stats RuntimeStats
// Used to control access to the pending list
pendingListLock sync.Mutex
// Used to signal to exactly one goroutine that it should wake up
pendingCond *sync.Cond
}
func NewRedisQueue(consumers int, stats RuntimeStats, pool *redisPool) *RedisQueue {
incoming := make(chan RedisCommand)
outgoing := make(chan RedisCommand)
for i := 0; i < consumers; i++ {
outgoingRcv := (<-chan RedisCommand)(outgoing)
NewRedisQueueConsumer(outgoingRcv, pool)
}
pendingList := &RedisQueue{
incoming: incoming,
outgoing: outgoing,
pending: list.New(),
stats: stats,
pendingListLock: sync.Mutex{},
pendingCond: sync.NewCond(&sync.Mutex{}),
}
go pendingList.ReceiveForever()
go pendingList.DispatchForever()
return pendingList
}
func (r *RedisQueue) RunAsyncTimeout(timeout time.Duration, callback RedisCallback) RedisCommandResult {
resultChan := make(chan RedisCommandResult, 1)
job := RedisCommand{
Callback: callback,
Result: resultChan,
}
r.incoming <- job
select {
case result := <-resultChan:
return result
case <-time.After(timeout):
return RedisCommandResult{
Value: nil,
Error: timedOut,
}
}
}
// Receive a command, save/enqueue it, and wake the dispatching goroutine
func (r *RedisQueue) ReceiveForever() {
for {
command := <-r.incoming
r.pendingListLock.Lock()
r.pending.PushBack(command)
r.pendingListLock.Unlock()
r.pendingCond.Signal()
}
}
// Dispatch a command to a goroutine that is blocking on receiving a command.
func (r *RedisQueue) DispatchForever() {
for {
r.pendingCond.L.Lock()
var pendingLength int
var front *list.Element = nil
for {
r.pendingListLock.Lock()
pendingLength = r.pending.Len()
if DEBUG {
log.Printf("There are %d pending commands", pendingLength)
}
r.stats.LogPendingRedisActivityCommandsListLength(pendingLength)
if pendingLength != 0 {
front = r.pending.Front()
r.pending.Remove(front)
r.pendingListLock.Unlock()
break
} else {
r.pendingListLock.Unlock()
r.pendingCond.Wait()
}
}
r.pendingCond.L.Unlock()
go func(value RedisCommand) {
if DEBUG {
log.Printf("Pushed one command to outgoing")
}
r.outgoing <- value
}(front.Value.(RedisCommand))
}
}