forked from asaskevich/EventBus
-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.go
153 lines (139 loc) · 3.94 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package EventBus
import (
"errors"
"fmt"
"net"
"net/http"
"net/rpc"
"sync"
)
// SubscribeType - how the client intends to subscribe.
// It is carried in SubscribeArg.SubscribeType and selects which bus
// subscription method ServerService.Register uses for the remote handler.
type SubscribeType int
// Supported SubscribeType values.
const (
// Subscribe - subscribe to all events on the topic
// (registered through the bus's Subscribe method)
Subscribe SubscribeType = iota
// SubscribeOnce - subscribe to only one event on the topic
// (registered through the bus's SubscribeOnce method)
SubscribeOnce
)
const (
// RegisterService - Server subscribe service method.
// The value follows the net/rpc "Type.Method" naming for
// ServerService.Register; presumably remote clients pass it as the
// service method when subscribing - confirm against the client code.
RegisterService = "ServerService.Register"
)
// SubscribeArg - object to hold subscribe arguments from remote event handlers.
// Two SubscribeArg values are treated as the same subscription when all of
// their fields are equal (see Server.HasClientSubscribed).
type SubscribeArg struct {
// ClientAddr - TCP address the server dials to reach the remote handler
ClientAddr string
// ClientPath - HTTP path of the remote handler's RPC endpoint
ClientPath string
// ServiceMethod - RPC method invoked on the remote handler for each event
ServiceMethod string
// SubscribeType - whether to subscribe to all events or only one
SubscribeType SubscribeType
// Topic - event bus topic being subscribed to
Topic string
}
// Server - object capable of being subscribed to by remote handlers
type Server struct {
// eventBus - local bus on which remote handlers are subscribed
eventBus Bus
// address - TCP address passed to net.Listen when the server starts
address string
// path - HTTP path under which the RPC service is exposed
path string
// subscribers - registered remote subscriptions, keyed by topic
subscribers map[string][]*SubscribeArg
// service - RPC service object that receives remote Register calls
service *ServerService
}
// NewServer - create a new Server at the address and path.
// The returned server wraps eventBus, starts with no subscribers, and owns
// a ServerService that is not yet started; call Start to begin serving.
func NewServer(address, path string, eventBus Bus) *Server {
	server := &Server{
		eventBus:    eventBus,
		address:     address,
		path:        path,
		subscribers: make(map[string][]*SubscribeArg),
	}
	// The service keeps a back-reference to the server it belongs to.
	server.service = &ServerService{
		server:  server,
		wg:      &sync.WaitGroup{},
		started: false,
	}
	return server
}
// EventBus - returns wrapped event bus, i.e. the Bus held in the server's
// eventBus field (set by NewServer).
func (server *Server) EventBus() Bus {
return server.eventBus
}
// rpcCallback - builds the local bus handler that forwards an event to the
// remote subscriber described by subscribeArg.
//
// Each invocation of the returned function dials the subscriber over
// HTTP-RPC at ClientAddr/ClientPath, calls ServiceMethod with a ClientArg
// carrying the topic and the event arguments, and closes the connection.
//
// The handler signature has no error return, so failures cannot be
// propagated to the publisher; a failed dial or call simply ends the
// delivery attempt for this event.
func (server *Server) rpcCallback(subscribeArg *SubscribeArg) func(args ...interface{}) {
	return func(args ...interface{}) {
		client, connErr := rpc.DialHTTPPath("tcp", subscribeArg.ClientAddr, subscribeArg.ClientPath)
		if connErr != nil {
			// client is nil when the dial fails: there is nothing to close
			// and nothing to call, so stop here instead of dereferencing it.
			return
		}
		// Only schedule Close once we know we hold a live client.
		defer client.Close()

		clientArg := new(ClientArg)
		clientArg.Topic = subscribeArg.Topic
		clientArg.Args = args

		var reply bool
		// The call error is intentionally dropped: this handler has no way
		// to return it to the caller.
		_ = client.Call(subscribeArg.ServiceMethod, clientArg, &reply)
	}
}
// HasClientSubscribed - True if a client subscribed to this server with the same topic.
// A match requires an already-recorded subscription under arg.Topic whose
// fields are all equal to those of arg.
func (server *Server) HasClientSubscribed(arg *SubscribeArg) bool {
	// Looking up a missing topic yields a nil slice, which ranges zero times.
	for _, existing := range server.subscribers[arg.Topic] {
		if *existing == *arg {
			return true
		}
	}
	return false
}
// Start - starts a service for remote clients to subscribe to events.
//
// It registers the server's ServerService on a fresh RPC server, listens on
// the configured TCP address, exposes the RPC handlers at server.path (and
// "/debug"+server.path) on the default HTTP mux, and serves them in a
// background goroutine.
//
// Returns an error if the service is already started, if the RPC service
// cannot be registered, or if listening on the address fails. On any error
// the service is left not started and no HTTP handlers have been
// registered, so Start may be called again.
func (server *Server) Start() error {
	service := server.service
	if service.started {
		return errors.New("Server bus already started")
	}

	rpcServer := rpc.NewServer()
	if err := rpcServer.Register(service); err != nil {
		return fmt.Errorf("register service: %v", err)
	}

	// Listen before touching the global HTTP mux: a failed listen must not
	// leave handlers registered, since registering the same path twice panics.
	l, err := net.Listen("tcp", server.address)
	if err != nil {
		// Do not mark the service started or serve on a nil listener.
		return err
	}

	rpcServer.HandleHTTP(server.path, "/debug"+server.path)
	service.started = true
	service.wg.Add(1)
	// A nil handler makes http.Serve use the default mux, where HandleHTTP
	// registered the RPC handlers.
	go http.Serve(l, nil)
	return nil
}
// Stop - signal for the service to stop serving.
// If the service is marked as started, its wait group is released and the
// started flag is cleared; otherwise the call is a no-op. This method only
// updates that bookkeeping; it does not itself close a listener.
func (server *Server) Stop() {
	svc := server.service
	if !svc.started {
		return
	}
	svc.wg.Done()
	svc.started = false
}
// ServerService - service object to listen to remote subscriptions.
// It is the receiver registered with the RPC server in Server.Start.
type ServerService struct {
// server - the Server whose bus and subscriber list Register updates
server *Server
// wg - incremented by Server.Start and released by Server.Stop
wg *sync.WaitGroup
// started - true between a successful Start and the following Stop
started bool
}
// Register - Registers a remote handler to this event bus
// for a remote subscribe - a given client address only needs to subscribe once
// event will be republished in local event bus.
//
// If an identical subscription (all SubscribeArg fields equal) is already
// recorded, nothing is changed. Otherwise a forwarding callback is
// subscribed on the local bus according to arg.SubscribeType and arg is
// appended to the topic's subscriber list.
//
// On success *success is set to true and nil is returned. An unrecognised
// arg.SubscribeType sets *success to false and returns an error without
// recording the subscription.
func (service *ServerService) Register(arg *SubscribeArg, success *bool) error {
	server := service.server
	if !server.HasClientSubscribed(arg) {
		rpcCallback := server.rpcCallback(arg)
		switch arg.SubscribeType {
		case Subscribe:
			server.eventBus.Subscribe(arg.Topic, rpcCallback)
		case SubscribeOnce:
			server.eventBus.SubscribeOnce(arg.Topic, rpcCallback)
		default:
			// Nothing was subscribed on the bus, so do not record the
			// client or report success.
			*success = false
			return fmt.Errorf("unknown subscribe type: %d", arg.SubscribeType)
		}
		// Append to the existing list (append handles a missing topic's nil
		// slice), so earlier subscribers of the same topic are kept.
		server.subscribers[arg.Topic] = append(server.subscribers[arg.Topic], arg)
	}
	*success = true
	return nil
}