Skip to content

Commit

Permalink
Merge pull request #995 from weaveworks/828-websocket-ping
Browse files Browse the repository at this point in the history
Add ping/pong to websocket protocol
  • Loading branch information
paulbellamy committed Feb 25, 2016
2 parents d7b84ed + 35fe886 commit 1d17a33
Show file tree
Hide file tree
Showing 8 changed files with 172 additions and 63 deletions.
23 changes: 6 additions & 17 deletions app/api_topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"time"

log "github.com/Sirupsen/logrus"
"github.com/gorilla/websocket"
"golang.org/x/net/context"

"github.com/weaveworks/scope/common/xfer"
Expand All @@ -14,8 +13,7 @@ import (
)

const (
websocketLoop = 1 * time.Second
websocketTimeout = 10 * time.Second
websocketLoop = 1 * time.Second
)

// APITopology is returned by the /api/topology/{name} handler.
Expand Down Expand Up @@ -67,10 +65,6 @@ func handleNode(nodeID string) func(context.Context, Reporter, render.Renderer,
}
}

var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
}

func handleWebsocket(
ctx context.Context,
w http.ResponseWriter,
Expand All @@ -79,17 +73,17 @@ func handleWebsocket(
renderer render.Renderer,
loop time.Duration,
) {
conn, err := upgrader.Upgrade(w, r, nil)
conn, err := xfer.Upgrade(w, r, nil)
if err != nil {
// log.Info("Upgrade:", err)
return
}
defer conn.Close()

quit := make(chan struct{})
go func(c *websocket.Conn) {
go func(c xfer.Websocket) {
for { // just discard everything the browser sends
if _, _, err := c.NextReader(); err != nil {
if _, _, err := c.ReadMessage(); err != nil {
if !xfer.IsExpectedWSCloseError(err) {
log.Println("err:", err)
}
Expand All @@ -112,18 +106,13 @@ func handleWebsocket(
diff := render.TopoDiff(previousTopo, newTopo)
previousTopo = newTopo

if err := conn.SetWriteDeadline(time.Now().Add(websocketTimeout)); err != nil {
if err := conn.WriteJSON(diff); err != nil {
if !xfer.IsExpectedWSCloseError(err) {
log.Println("err:", err)
log.Errorf("cannot serialize topology diff: %s", err)
}
return
}

if err := xfer.WriteJSONtoWS(conn, diff); err != nil {
log.Errorf("cannot serialize topology diff: %s", err)
return
}

select {
case <-wait:
case <-tick:
Expand Down
7 changes: 4 additions & 3 deletions app/controls.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package app

import (
"io"
"net/http"
"net/rpc"

Expand Down Expand Up @@ -56,7 +57,7 @@ func handleProbeWS(cr ControlRouter) CtxHandlerFunc {
return
}

conn, err := upgrader.Upgrade(w, r, nil)
conn, err := xfer.Upgrade(w, r, nil)
if err != nil {
log.Printf("Error upgrading control websocket: %v", err)
return
Expand All @@ -79,8 +80,8 @@ func handleProbeWS(cr ControlRouter) CtxHandlerFunc {
return
}
defer cr.Deregister(ctx, probeID, id)
if err := codec.WaitForReadError(); err != nil && !xfer.IsExpectedWSCloseError(err) {
log.Printf("Error reading from probe %s control websocket: %v", probeID, err)
if err := codec.WaitForReadError(); err != nil && err != io.EOF && !xfer.IsExpectedWSCloseError(err) {
log.Errorf("Error on websocket: %v", err)
}
}
}
2 changes: 1 addition & 1 deletion app/pipes.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func handlePipeWs(pr PipeRouter, end End) CtxHandlerFunc {
}
defer pr.Release(ctx, id, end)

conn, err := upgrader.Upgrade(w, r, nil)
conn, err := xfer.Upgrade(w, r, nil)
if err != nil {
log.Errorf("Error upgrading pipe %s (%d) websocket: %v", id, end, err)
return
Expand Down
16 changes: 7 additions & 9 deletions common/xfer/controls.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"fmt"
"net/rpc"
"sync"

"github.com/gorilla/websocket"
)

// ErrInvalidMessage is the error returned when the on-wire message is unexpected.
Expand Down Expand Up @@ -70,12 +68,12 @@ func ResponseError(err error) Response {
// that transmits and receives RPC messages over a websocker, as JSON.
type JSONWebsocketCodec struct {
sync.Mutex
conn *websocket.Conn
conn Websocket
err chan error
}

// NewJSONWebsocketCodec makes a new JSONWebsocketCodec
func NewJSONWebsocketCodec(conn *websocket.Conn) *JSONWebsocketCodec {
func NewJSONWebsocketCodec(conn Websocket) *JSONWebsocketCodec {
return &JSONWebsocketCodec{
conn: conn,
err: make(chan error, 1),
Expand All @@ -93,26 +91,26 @@ func (j *JSONWebsocketCodec) WriteRequest(r *rpc.Request, v interface{}) error {
j.Lock()
defer j.Unlock()

if err := WriteJSONtoWS(j.conn, Message{Request: r}); err != nil {
if err := j.conn.WriteJSON(Message{Request: r}); err != nil {
return err
}
return WriteJSONtoWS(j.conn, Message{Value: v})
return j.conn.WriteJSON(Message{Value: v})
}

// WriteResponse implements rpc.ServerCodec
func (j *JSONWebsocketCodec) WriteResponse(r *rpc.Response, v interface{}) error {
j.Lock()
defer j.Unlock()

if err := WriteJSONtoWS(j.conn, Message{Response: r}); err != nil {
if err := j.conn.WriteJSON(Message{Response: r}); err != nil {
return err
}
return WriteJSONtoWS(j.conn, Message{Value: v})
return j.conn.WriteJSON(Message{Value: v})
}

func (j *JSONWebsocketCodec) readMessage(v interface{}) (*Message, error) {
m := Message{Value: v}
if err := ReadJSONfromWS(j.conn, &m); err != nil {
if err := j.conn.ReadJSON(&m); err != nil {
j.err <- err
close(j.err)
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions common/xfer/pipes.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
// to the UI.
type Pipe interface {
Ends() (io.ReadWriter, io.ReadWriter)
CopyToWebsocket(io.ReadWriter, *websocket.Conn) error
CopyToWebsocket(io.ReadWriter, Websocket) error

Close() error
Closed() bool
Expand Down Expand Up @@ -83,7 +83,7 @@ func (p *pipe) OnClose(f func()) {
}

// CopyToWebsocket copies pipe data to/from a websocket. It blocks.
func (p *pipe) CopyToWebsocket(end io.ReadWriter, conn *websocket.Conn) error {
func (p *pipe) CopyToWebsocket(end io.ReadWriter, conn Websocket) error {
p.mtx.Lock()
if p.closed {
p.mtx.Unlock()
Expand Down
155 changes: 140 additions & 15 deletions common/xfer/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,134 @@ package xfer

import (
"io"
"net/http"
"sync"
"time"

log "github.com/Sirupsen/logrus"
"github.com/gorilla/websocket"
"github.com/ugorji/go/codec"

"github.com/weaveworks/scope/common/mtime"
)

// IsExpectedWSCloseError returns boolean indicating whether the error is a
// clean disconnection.
func IsExpectedWSCloseError(err error) bool {
return websocket.IsCloseError(
err,
websocket.CloseNormalClosure,
websocket.CloseGoingAway,
websocket.CloseNoStatusReceived,
)
const (
// Time allowed to write a message to the peer.
writeWait = 10 * time.Second

// Time allowed to read the next pong message from the peer. Needs to be less
// than the idle timeout on whatever frontend server is proxying the
// websocket connections (e.g. nginx).
pongWait = 60 * time.Second

// Send pings to peer with this period. Must be less than pongWait. The peer
// must respond with a pong in < pongWait. But it may take writeWait for the
// pong to be sent. Therefore we want to allow time for that, and a bit of
// delay/round-trip in case the peer is busy. 1/3 of pongWait seems like a
// reasonable amount of time to respond to a ping.
pingPeriod = ((pongWait - writeWait) * 2 / 3)
)

// Websocket exposes the bits of *websocket.Conn we actually use.
type Websocket interface {
ReadMessage() (messageType int, p []byte, err error)
WriteMessage(messageType int, data []byte) error
ReadJSON(v interface{}) error
WriteJSON(v interface{}) error
Close() error
}

type pingingWebsocket struct {
pinger *time.Timer
readLock sync.Mutex
writeLock sync.Mutex
conn *websocket.Conn
}

var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
}

// Upgrade upgrades the HTTP server connection to the WebSocket protocol.
func Upgrade(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (Websocket, error) {
wsConn, err := upgrader.Upgrade(w, r, responseHeader)
if err != nil {
return nil, err
}
return Ping(wsConn), nil
}

// WSDialer can dial a new websocket
type WSDialer interface {
Dial(urlStr string, requestHeader http.Header) (*websocket.Conn, *http.Response, error)
}

// DialWS creates a new client connection. Use requestHeader to specify the
// origin (Origin), subprotocols (Sec-WebSocket-Protocol) and cookies (Cookie).
// Use the response.Header to get the selected subprotocol
// (Sec-WebSocket-Protocol) and cookies (Set-Cookie).
func DialWS(d WSDialer, urlStr string, requestHeader http.Header) (Websocket, *http.Response, error) {
wsConn, resp, err := d.Dial(urlStr, requestHeader)
if err != nil {
return nil, nil, err
}
return Ping(wsConn), resp, nil
}

// WriteJSONtoWS writes the JSON encoding of v to the connection.
func WriteJSONtoWS(c *websocket.Conn, v interface{}) error {
w, err := c.NextWriter(websocket.TextMessage)
// Ping adds a periodic ping to a websocket connection.
func Ping(c *websocket.Conn) Websocket {
p := &pingingWebsocket{conn: c}
p.conn.SetPongHandler(p.pong)
p.conn.SetReadDeadline(mtime.Now().Add(pongWait))
p.pinger = time.AfterFunc(pingPeriod, p.ping)
return p
}

func (p *pingingWebsocket) ping() {
p.writeLock.Lock()
defer p.writeLock.Unlock()
if err := p.conn.WriteControl(websocket.PingMessage, nil, mtime.Now().Add(writeWait)); err != nil {
log.Errorf("websocket ping error: %v", err)
p.Close()
}
p.pinger.Reset(pingPeriod)
}

func (p *pingingWebsocket) pong(string) error {
p.conn.SetReadDeadline(mtime.Now().Add(pongWait))
return nil
}

// ReadMessage is a helper method for getting a reader using NextReader and
// reading from that reader to a buffer.
func (p *pingingWebsocket) ReadMessage() (int, []byte, error) {
p.readLock.Lock()
defer p.readLock.Unlock()
return p.conn.ReadMessage()
}

// WriteMessage is a helper method for getting a writer using NextWriter,
// writing the message and closing the writer.
func (p *pingingWebsocket) WriteMessage(messageType int, data []byte) error {
p.writeLock.Lock()
defer p.writeLock.Unlock()
if err := p.conn.SetWriteDeadline(mtime.Now().Add(writeWait)); err != nil {
return err
}
return p.conn.WriteMessage(messageType, data)
}

// WriteJSON writes the JSON encoding of v to the connection.
func (p *pingingWebsocket) WriteJSON(v interface{}) error {
p.writeLock.Lock()
defer p.writeLock.Unlock()
w, err := p.conn.NextWriter(websocket.TextMessage)
if err != nil {
return err
}
if err := p.conn.SetWriteDeadline(mtime.Now().Add(writeWait)); err != nil {
return err
}
err1 := codec.NewEncoder(w, &codec.JsonHandle{}).Encode(v)
err2 := w.Close()
if err1 != nil {
Expand All @@ -32,10 +138,12 @@ func WriteJSONtoWS(c *websocket.Conn, v interface{}) error {
return err2
}

// ReadJSONfromWS reads the next JSON-encoded message from the connection and stores
// ReadJSON reads the next JSON-encoded message from the connection and stores
// it in the value pointed to by v.
func ReadJSONfromWS(c *websocket.Conn, v interface{}) error {
_, r, err := c.NextReader()
func (p *pingingWebsocket) ReadJSON(v interface{}) error {
p.readLock.Lock()
defer p.readLock.Unlock()
_, r, err := p.conn.NextReader()
if err != nil {
return err
}
Expand All @@ -46,3 +154,20 @@ func ReadJSONfromWS(c *websocket.Conn, v interface{}) error {
}
return err
}

// Close closes the connection
func (p *pingingWebsocket) Close() error {
p.pinger.Stop()
return p.conn.Close()
}

// IsExpectedWSCloseError returns boolean indicating whether the error is a
// clean disconnection.
func IsExpectedWSCloseError(err error) bool {
return websocket.IsCloseError(
err,
websocket.CloseNormalClosure,
websocket.CloseGoingAway,
websocket.CloseNoStatusReceived,
)
}
Loading

0 comments on commit 1d17a33

Please sign in to comment.