Skip to content

Commit

Permalink
[Pubsub] Adding a pubsub package for server/client connections (#120)
Browse files Browse the repository at this point in the history
* from avalanchego

* server test

* pubsub test lock

* check connection ends on client close

* lint

* pubsub stripped down, and server test added

* server callback function

* callback function in server and test

* use connections in server

* server comments

* github lint

* lint + comments + cleanup

* read callback to connection

* change callback function type

* callback function change

* server tests multiple connects + callback

* fixed race condition in tests

* partial PR comments

* consts, and server start + shutdown

* review

* naming and  pr comments
  • Loading branch information
samliok authored Apr 3, 2023
1 parent 13ea6b2 commit 74cbe95
Show file tree
Hide file tree
Showing 7 changed files with 696 additions and 0 deletions.
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ require (
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/btree v1.1.2 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.12.0 // indirect
github.com/holiman/bloomfilter/v2 v2.0.3 // indirect
github.com/klauspost/compress v1.15.15 // indirect
github.com/kr/pretty v0.2.1 // indirect
github.com/kr/text v0.2.0 // indirect
Expand All @@ -50,6 +52,7 @@ require (
github.com/prometheus/client_model v0.2.1-0.20210607210712-147c58e9608a // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/supranational/blst v0.3.11-0.20220920110316-f72618070295 // indirect
github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a // indirect
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.11.2 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,8 @@ github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORR
github.com/gorilla/rpc v1.2.0 h1:WvvdC2lNeT1SP32zrIce5l0ECBfbAlmrmSBsuc57wfk=
github.com/gorilla/rpc v1.2.0/go.mod h1:V4h9r+4sF5HnzqbwIez0fKSpANP0zlYd3qR7p36jkTQ=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0/go.mod h1:hgWBS7lorOAVIJEQMi4ZsPv9hVvWI6+ch50m39Pf2Ks=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.12.0 h1:kr3j8iIMR4ywO/O0rvksXaJvauGGCMg2zAZIiNZ9uIQ=
Expand All @@ -259,6 +261,8 @@ github.com/hashicorp/go-version v1.2.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/holiman/bloomfilter/v2 v2.0.3 h1:73e0e/V0tCydx14a0SCYS/EWCxgwLZ18CZcZKVu0fao=
github.com/holiman/bloomfilter/v2 v2.0.3/go.mod h1:zpoh+gs7qcpqrHr3dB55AMiJwo0iURXE7ZOP9L9hSkA=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/hydrogen18/memlistener v0.0.0-20141126152155-54553eb933fb/go.mod h1:qEIFzExnS6016fRpRfxrExeVn2gbClQA99gQhnIcdhE=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
Expand Down Expand Up @@ -413,6 +417,8 @@ github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrf
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/spf13/cobra v0.0.5/go.mod h1:3K3wKZymM7VvHMDS9+Akkh4K60UwM26emMESw8tLCHU=
Expand Down
163 changes: 163 additions & 0 deletions pubsub/connection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
// Copyright (C) 2019-2022, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package pubsub

import (
"errors"
"io"
"sync/atomic"
"time"

"github.com/gorilla/websocket"
"go.uber.org/zap"
)

var (
ErrFilterNotInitialized = errors.New("filter not initialized")
ErrAddressLimit = errors.New("address limit exceeded")
ErrInvalidFilterParam = errors.New("invalid bloom filter params")
ErrInvalidCommand = errors.New("invalid command")
)

// Callback type is used as a callback function for the
// WebSocket server to process incoming messages.
// Accepts a byte message, the connection and any additional information.
type Callback func([]byte, *Connection) []byte

// connection is a representation of the websocket connection.
type Connection struct {
s *Server

// The websocket connection.
conn *websocket.Conn

// Buffered channel of outbound messages.
send chan []byte

// Represents if the connection can receive new messages.
active atomic.Bool
}

// isActive returns whether the connection is active
func (c *Connection) isActive() bool {
return c.active.Load()
}

// deactivate deactivates the connection.
func (c *Connection) deactivate() {
c.active.Store(false)
}

// Send sends [msg] to c's send channel and returns whether the message was sent.
func (c *Connection) Send(msg []byte) bool {
if !c.isActive() {
return false
}
select {
case c.send <- msg:
return true
default:
c.s.log.Debug("msg was dropped")
}
return false
}

// readPump pumps messages from the websocket connection to the hub.
//
// The application runs readPump in a per-connection goroutine. The application
// ensures that there is at most one reader on a connection by executing all
// reads from this goroutine.
func (c *Connection) readPump() {
defer func() {
c.deactivate()
c.s.removeConnection(c)

// close is called by both the writePump and the readPump so one of them
// will always error
_ = c.conn.Close()
}()

c.conn.SetReadLimit(c.s.config.MaxMessageSize)
// SetReadDeadline returns an error if the connection is corrupted
if err := c.conn.SetReadDeadline(time.Now().Add(c.s.config.PongWait)); err != nil {
return
}
c.conn.SetPongHandler(func(string) error {
return c.conn.SetReadDeadline(time.Now().Add(c.s.config.PongWait))
})
for {
_, reader, err := c.conn.NextReader()
if err != nil {
if websocket.IsUnexpectedCloseError(
err,
websocket.CloseGoingAway,
websocket.CloseAbnormalClosure,
) {
c.s.log.Debug("unexpected close in websockets",
zap.Error(err),
)
}
break
}
if c.s.callback != nil {
responseBytes, err := io.ReadAll(reader)
if err == nil {
c.s.log.Debug("unexpected error reading bytes from websockets",
zap.Error(err),
)
}
c.Send(c.s.callback(responseBytes, c))
}
}
}

// writePump pumps messages from the hub to the websocket connection.
//
// A goroutine running writePump is started for each connection. The
// application ensures that there is at most one writer to a connection by
// executing all writes from this goroutine.
func (c *Connection) writePump() {
ticker := time.NewTicker(c.s.config.PingPeriod)
defer func() {
c.deactivate()
ticker.Stop()
c.s.removeConnection(c)

// close is called by both the writePump and the readPump so one of them
// will always error
_ = c.conn.Close()
}()
for {
select {
case message, ok := <-c.send:
if err := c.conn.SetWriteDeadline(time.Now().Add(c.s.config.WriteWait)); err != nil {
c.s.log.Debug("closing the connection",
zap.String("reason", "failed to set the write deadline"),
zap.Error(err),
)
return
}
if !ok {
// The hub closed the channel. Attempt to close the connection
// gracefully.
_ = c.conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
if err := c.conn.WriteMessage(websocket.BinaryMessage, message); err != nil {
return
}
case <-ticker.C:
if err := c.conn.SetWriteDeadline(time.Now().Add(c.s.config.WriteWait)); err != nil {
c.s.log.Debug("closing the connection",
zap.String("reason", "failed to set the write deadline"),
zap.Error(err),
)
return
}
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
return
}
}
}
}
60 changes: 60 additions & 0 deletions pubsub/connections.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright (C) 2019-2022, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package pubsub

import (
"sync"

"github.com/ava-labs/avalanchego/utils/set"
)

// connections represents a collection of connections to clients.
type Connections struct {
lock sync.RWMutex
conns set.Set[*Connection]
}

// NewConnections returns a new Connections instance.
func NewConnections() *Connections {
return &Connections{}
}

// Conns returns a list of all connections in [c].
func (c *Connections) Conns() []*Connection {
c.lock.RLock()
defer c.lock.RUnlock()
return c.conns.List()
}

// Has returns if the connection [conn] is in [c].
func (c *Connections) Has(conn *Connection) bool {
c.lock.RLock()
defer c.lock.RUnlock()

return c.conns.Contains(conn)
}

// Remove removes [conn] from [c].
func (c *Connections) Remove(conn *Connection) {
c.lock.Lock()
defer c.lock.Unlock()

c.conns.Remove(conn)
}

// Add adds [conn] to the [c].
func (c *Connections) Add(conn *Connection) {
c.lock.Lock()
defer c.lock.Unlock()

c.conns.Add(conn)
}

// Len returns the number of connections in [c].
func (c *Connections) Len() int {
c.lock.RLock()
defer c.lock.RUnlock()

return c.conns.Len()
}
21 changes: 21 additions & 0 deletions pubsub/consts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright (C) 2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package pubsub

import (
"time"

"github.com/ava-labs/avalanchego/utils/units"
)

const (
ReadBufferSize = units.KiB
WriteBufferSize = units.KiB
WriteWait = 10 * time.Second
PongWait = 60 * time.Second
PingPeriod = (PongWait * 9) / 10
MaxMessageSize = 10 * units.KiB // bytes
MaxPendingMessages = 1024
ReadHeaderTimeout = 5 * time.Second
)
Loading

0 comments on commit 74cbe95

Please sign in to comment.