Skip to content

Commit

Permalink
Merge pull request #1172 from weaveworks/ws-pipe-fixes
Browse files Browse the repository at this point in the history
Various websocket and pipe fixes.
  • Loading branch information
tomwilkie committed Mar 15, 2016
2 parents fd37851 + 581b711 commit 88c39e2
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 8 deletions.
18 changes: 18 additions & 0 deletions app/pipe_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,16 @@ const (
ProbeEnd
)

func (e End) String() string {
if e == UIEnd {
return "ui"
}
return "probe"
}

// PipeRouter stores pipes and allows you to connect to either end of them.
type PipeRouter interface {
Exists(context.Context, string) (bool, error)
Get(context.Context, string, End) (xfer.Pipe, io.ReadWriter, error)
Release(context.Context, string, End) error
Delete(context.Context, string) error
Expand Down Expand Up @@ -78,6 +86,16 @@ func NewLocalPipeRouter() PipeRouter {
return pipeRouter
}

func (pr *localPipeRouter) Exists(_ context.Context, id string) (bool, error) {
pr.Lock()
defer pr.Unlock()
p, ok := pr.pipes[id]
if !ok {
return true, nil
}
return !p.Closed(), nil
}

func (pr *localPipeRouter) Get(_ context.Context, id string, e End) (xfer.Pipe, io.ReadWriter, error) {
pr.Lock()
defer pr.Unlock()
Expand Down
12 changes: 7 additions & 5 deletions app/pipes.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@ func RegisterPipeRoutes(router *mux.Router, pr PipeRouter) {
func checkPipe(pr PipeRouter, end End) CtxHandlerFunc {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) {
id := mux.Vars(r)["pipeID"]
_, _, err := pr.Get(ctx, id, end)
exists, err := pr.Exists(ctx, id)
if err != nil {
respondWith(w, http.StatusInternalServerError, err.Error())
} else if exists {
w.WriteHeader(http.StatusNoContent)
return
} else {
http.NotFound(w, r)
}
pr.Release(ctx, id, end)
}
}

Expand All @@ -59,7 +61,7 @@ func handlePipeWs(pr PipeRouter, end End) CtxHandlerFunc {
}
defer conn.Close()

log.Infof("Pipe success %s (%d)", id, end)
log.Infof("Success got pipe %s:%s", id, end)
if err := pipe.CopyToWebsocket(endIO, conn); err != nil && !xfer.IsExpectedWSCloseError(err) {
log.Printf("Error copying to pipe %s (%d) websocket: %v", id, end, err)
}
Expand All @@ -69,7 +71,7 @@ func handlePipeWs(pr PipeRouter, end End) CtxHandlerFunc {
func deletePipe(pr PipeRouter) CtxHandlerFunc {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) {
pipeID := mux.Vars(r)["pipeID"]
log.Infof("Closing pipe %s", pipeID)
log.Infof("Deleting pipe %s", pipeID)
if err := pr.Delete(ctx, pipeID); err != nil {
respondWith(w, http.StatusInternalServerError, err.Error())
}
Expand Down
4 changes: 2 additions & 2 deletions client/app/scripts/utils/web-api-utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,8 @@ export function getPipeStatus(pipeId) {
},
success: function(res) {
const status = {
200: 'PIPE_ALIVE',
204: 'PIPE_DELETED'
204: 'PIPE_ALIVE',
404: 'PIPE_DELETED'
}[res.status];

if (!status) {
Expand Down
3 changes: 2 additions & 1 deletion common/xfer/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ type WSDialer interface {
func DialWS(d WSDialer, urlStr string, requestHeader http.Header) (Websocket, *http.Response, error) {
wsConn, resp, err := d.Dial(urlStr, requestHeader)
if err != nil {
return nil, nil, err
return nil, resp, err
}
return Ping(wsConn), resp, nil
}
Expand All @@ -91,6 +91,7 @@ func (p *pingingWebsocket) ping() {
if err := p.conn.WriteControl(websocket.PingMessage, nil, mtime.Now().Add(writeWait)); err != nil {
log.Errorf("websocket ping error: %v", err)
p.Close()
return
}
p.pinger.Reset(pingPeriod)
}
Expand Down

0 comments on commit 88c39e2

Please sign in to comment.