diff --git a/_examples/redis/main.go b/_examples/redis/main.go index c1cce47..4a86b63 100644 --- a/_examples/redis/main.go +++ b/_examples/redis/main.go @@ -161,7 +161,13 @@ func startServer() { if err != nil { panic(err) } - server.StackExchange = exc + server.UseStackExchange(exc) + // The server.StackExchange field is also exported + // so users can directly use or/and test their registered + // implementations all together. + // This is possible because a wrapper is in-place + // when you register more than one stack exchanges + // on the same neffos server instance. server.IDGenerator = func(w http.ResponseWriter, r *http.Request) string { if userID := r.Header.Get("X-Username"); userID != "" { diff --git a/conn.go b/conn.go index 012f2e1..fa26406 100644 --- a/conn.go +++ b/conn.go @@ -572,18 +572,14 @@ func (c *Conn) notifyNamespaceConnected(ns *NSConn, connectMsg Message) { connectMsg.Event = OnNamespaceConnected ns.events.fireEvent(ns, connectMsg) // omit error, it's connected. - if !c.IsClient() { - if c.server.StackExchange != nil { - c.server.StackExchange.Subscribe(c, ns.namespace) - } + if !c.IsClient() && c.server.usesStackExchange() { + c.server.StackExchange.Subscribe(c, ns.namespace) } } func (c *Conn) notifyNamespaceDisconnect(ns *NSConn, disconnectMsg Message) { - if !c.IsClient() { - if c.server.StackExchange != nil { - c.server.StackExchange.Unsubscribe(c, disconnectMsg.Namespace) - } + if !c.IsClient() && c.server.usesStackExchange() { + c.server.StackExchange.Unsubscribe(c, disconnectMsg.Namespace) } } diff --git a/go.sum b/go.sum index ca40807..36d8015 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,3 @@ -github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee h1:s+21KNqlpePfkah2I+gwHF8xmJWRjooY+5248k6m4A0= github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo= github.com/gobwas/pool v0.2.0 h1:QEmUOlnSjWtnpRGHF3SauEiOsy82Cup83Vf2LcMlnc8= @@ -9,8 +8,5 @@ github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/iris-contrib/go.uuid v2.0.0+incompatible h1:XZubAYg61/JwnJNbZilGjf3b3pB80+OQg2qf6c8BfWE= github.com/iris-contrib/go.uuid v2.0.0+incompatible/go.mod h1:iz2lgM/1UnEf1kP0L/+fafWORmlnuysV2EMP8MW+qe0= -github.com/mediocregopher/mediocre-go-lib v0.0.0-20181029021733-cb65787f37ed/go.mod h1:dSsfyI2zABAdhcbvkXqgxOxrCsbYeHCPgrZkku60dSg= github.com/mediocregopher/radix/v3 v3.3.0 h1:oacPXPKHJg0hcngVVrdtTnfGJiS+PtwoQwTBZGFlV4k= github.com/mediocregopher/radix/v3 v3.3.0/go.mod h1:EmfVyvspXz1uZEyPBMyGK+kjWiKQGvsUt6O3Pj+LDCQ= -github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= diff --git a/server.go b/server.go index f62806a..09afa5a 100644 --- a/server.go +++ b/server.go @@ -108,6 +108,32 @@ func New(upgrader Upgrader, connHandler ConnHandler) *Server { return s } +// UseStackExchange can be used to add one or more StackExchange +// to the server. +// Returns a non-nil error when "exc" +// completes the `StackExchangeInitializer` interface and its `Init` failed. +// +// Read more at the `StackExchange` type's docs. +func (s *Server) UseStackExchange(exc StackExchange) error { + if err := stackExchangeInit(exc, s.namespaces); err != nil { + return err + } + + if s.usesStackExchange() { + s.StackExchange = wrapStackExchanges(s.StackExchange, exc) + } else { + s.StackExchange = exc + } + + return nil +} + +// usesStackExchange reports whether this server +// uses one or more `StackExchange`s. +func (s *Server) usesStackExchange() bool { + return s.StackExchange != nil +} + func (s *Server) start() { atomic.StoreUint32(&s.closed, 0) @@ -130,7 +156,7 @@ func (s *Server) start() { s.OnDisconnect(c) } - if s.StackExchange != nil { + if s.usesStackExchange() { s.StackExchange.OnDisconnect(c) } } @@ -275,10 +301,13 @@ func (s *Server) Upgrade( c.ReconnectTries, _ = strconv.Atoi(retriesHeaderValue) } - go func(c *Conn) { - for s.waitMessage(c) { - } - }(c) + if !s.usesStackExchange() { + // fire neffos broadcaster when no exchangers are registered. + go func(c *Conn) { + for s.waitMessage(c) { + } + }(c) + } s.connect <- c @@ -317,7 +346,7 @@ func (s *Server) Upgrade( } } - if s.StackExchange != nil { + if s.usesStackExchange() { if err := s.StackExchange.OnConnect(c); err != nil { c.readiness.unwait(err) return nil, err @@ -443,7 +472,7 @@ func (s *Server) Broadcast(exceptSender fmt.Stringer, msg Message) { // s.broadcastCond.Broadcast() - if s.StackExchange != nil { + if s.usesStackExchange() { s.StackExchange.Publish(msg) return } diff --git a/stackexchange.go b/stackexchange.go index 1d59d78..bf4592c 100644 --- a/stackexchange.go +++ b/stackexchange.go @@ -29,3 +29,71 @@ type StackExchange interface { // it's called automatically on neffos namespace disconnect. Unsubscribe(c *Conn, namespace string) // should close the subscriber. } + +// StackExchangeInitializer is an optional interface for a `StackExchange`. +// It contains a single `Init` method which accepts +// the registered server namespaces and returns error. +// It does not called on manual `Server.StackExchange` field set, +// use the `Server.UseStackExchange` to make sure that this implementation is respected. +type StackExchangeInitializer interface { + // Init should initialize a stackexchange, it's optional. + Init(Namespaces) error +} + +func stackExchangeInit(s StackExchange, namespaces Namespaces) error { + if s != nil { + if sinit, ok := s.(StackExchangeInitializer); ok { + return sinit.Init(namespaces) + } + } + + return nil +} + +// internal use only when more than one stack exchanges are registered. +type stackExchangeWrapper struct { + // read-only fields. + parent StackExchange + current StackExchange +} + +func wrapStackExchanges(existingExc StackExchange, newExc StackExchange) StackExchange { + return &stackExchangeWrapper{ + parent: existingExc, + current: newExc, + } +} + +func (s *stackExchangeWrapper) OnConnect(c *Conn) error { + // return on first error, do not wrap errors, + // the server should NOT run if at least one is errored. + err := s.parent.OnConnect(c) + if err != nil { + return err + } + + return s.current.OnConnect(c) +} + +func (s *stackExchangeWrapper) OnDisconnect(c *Conn) { + s.parent.OnDisconnect(c) + s.current.OnDisconnect(c) +} + +func (s *stackExchangeWrapper) Publish(msg Message) bool { + // keep try on the next but return false on any failure. + okParent := s.parent.Publish(msg) + okCurrent := s.current.Publish(msg) + + return okParent && okCurrent +} + +func (s *stackExchangeWrapper) Subscribe(c *Conn, namespace string) { + s.parent.Subscribe(c, namespace) + s.current.Subscribe(c, namespace) +} + +func (s *stackExchangeWrapper) Unsubscribe(c *Conn, namespace string) { + s.parent.Unsubscribe(c, namespace) + s.current.Unsubscribe(c, namespace) +}