forked from mediocregopher/radix
-
Notifications
You must be signed in to change notification settings - Fork 0
/
pubsub_stub_test.go
120 lines (101 loc) · 3.15 KB
/
pubsub_stub_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package radix
import (
"log"
. "testing"
"time"
"github.com/mediocregopher/radix/v3/resp/resp2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestPubSubStub(t *T) {
conn, stubCh := PubSubStub("tcp", "127.0.0.1:6379", func(in []string) interface{} {
return in
})
message := func(channel, val string) {
stubCh <- PubSubMessage{Type: "message", Channel: channel, Message: []byte(val)}
<-conn.(*pubSubStub).mDoneCh
}
pmessage := func(pattern, channel, val string) {
stubCh <- PubSubMessage{Type: "pmessage", Pattern: pattern, Channel: channel, Message: []byte(val)}
<-conn.(*pubSubStub).mDoneCh
}
assertEncode := func(in ...string) {
require.Nil(t, conn.Encode(resp2.Any{I: in}))
}
assertDecode := func(exp ...string) {
var into []string
require.Nil(t, conn.Decode(resp2.Any{I: &into}))
assert.Equal(t, exp, into)
}
assertEncode("foo")
assertDecode("foo")
// shouldn't do anything
message("foo", "a")
assertEncode("SUBSCRIBE", "foo", "bar")
assertDecode("subscribe", "foo", "1")
assertDecode("subscribe", "bar", "2")
// should error because we're in pubsub mode
assertEncode("wat")
assert.Equal(t, errPubSubMode.Error(), conn.Decode(resp2.Any{}).Error())
assertEncode("PING")
assertDecode("pong", "")
message("foo", "b")
message("bar", "c")
message("baz", "c")
assertDecode("message", "foo", "b")
assertDecode("message", "bar", "c")
assertEncode("PSUBSCRIBE", "b*z")
assertDecode("psubscribe", "b*z", "3")
assertEncode("PSUBSCRIBE", "b[au]z")
assertDecode("psubscribe", "b[au]z", "4")
pmessage("b*z", "buz", "d")
pmessage("b[au]z", "buz", "d")
pmessage("b*z", "biz", "e")
assertDecode("pmessage", "b*z", "buz", "d")
assertDecode("pmessage", "b[au]z", "buz", "d")
assertDecode("pmessage", "b*z", "biz", "e")
assertEncode("UNSUBSCRIBE", "foo")
assertDecode("unsubscribe", "foo", "3")
message("foo", "f")
message("bar", "g")
assertDecode("message", "bar", "g")
assertEncode("UNSUBSCRIBE", "bar")
assertDecode("unsubscribe", "bar", "2")
assertEncode("PUNSUBSCRIBE", "b*z")
assertDecode("punsubscribe", "b*z", "1")
assertEncode("PUNSUBSCRIBE", "b[au]z")
assertDecode("punsubscribe", "b[au]z", "0")
// No longer in pubsub mode, normal requests should work again
assertEncode("wat")
assertDecode("wat")
}
func ExamplePubSubStub() {
// Make a pubsub stub conn which will return nil for everything except
// pubsub commands (which will be handled automatically)
stub, stubCh := PubSubStub("tcp", "127.0.0.1:6379", func([]string) interface{} {
return nil
})
// These writes shouldn't do anything, initially, since we haven't
// subscribed to anything
go func() {
for {
stubCh <- PubSubMessage{
Channel: "foo",
Message: []byte("bar"),
}
time.Sleep(1 * time.Second)
}
}()
// Use PubSub to wrap the stub like we would for a normal redis connection
pstub := PubSub(stub)
// Subscribe msgCh to "foo"
msgCh := make(chan PubSubMessage)
if err := pstub.Subscribe(msgCh, "foo"); err != nil {
log.Fatal(err)
}
// now msgCh is subscribed the publishes being made by the go-routine above
// will start being written to it
for m := range msgCh {
log.Printf("read m: %#v", m)
}
}