forked from vindalu/go-vindalu-client
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsubscriber.go
64 lines (49 loc) · 1.62 KB
/
subscriber.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package vindalu
import (
log "github.com/euforia/simplelog"
"github.com/nats-io/nats"
"github.com/euforia/vindaloo/events"
)
type VindaluSubscriber struct {
conn *nats.Conn
enConn *nats.EncodedConn
}
func NewVindaluSubscriber(servers []string, logger *log.Logger) (vs *VindaluSubscriber, err error) {
vs = &VindaluSubscriber{}
opts := nats.DefaultOptions
opts.Servers = servers
if vs.conn, err = opts.Connect(); err != nil {
return
}
logger.Debug.Printf("nats client connected to: %v!\n", vs.conn.ConnectedUrl())
vs.conn.Opts.ReconnectedCB = func(nc *nats.Conn) {
logger.Debug.Printf("nats client reconnected to: %v!\n", nc.ConnectedUrl())
}
vs.conn.Opts.DisconnectedCB = func(_ *nats.Conn) {
logger.Debug.Printf("nats client disconnected!\n")
}
vs.enConn, err = nats.NewEncodedConn(vs.conn, nats.JSON_ENCODER)
return
}
func (vs *VindaluSubscriber) Subscribe(topic string) (ch chan *events.Event, err error) {
// Goes no where as we do not want to allow writing (i.e publishing)
//if err = vs.enConn.BindSendChan(topic, make(chan *events.Event)); err != nil {
// return
//}
ch = make(chan *events.Event)
_, err = vs.enConn.BindRecvChan(topic, ch)
return
}
func (vs *VindaluSubscriber) SubscribeQueueGroup(topic, qGroup string) (ch chan *events.Event, err error) {
// Goes no where as we do not want to allow writing (i.e publishing)
//if err = vs.enConn.BindSendChan(topic, make(chan *events.Event)); err != nil {
// return
//}
ch = make(chan *events.Event)
_, err = vs.enConn.BindRecvQueueChan(topic, qGroup, ch)
return
}
func (vs *VindaluSubscriber) Close() {
vs.enConn.Close()
vs.conn.Close()
}