Skip to content

Commit

Permalink
Merge pull request #1768 from weaveworks/1202-silence-abnormal-close
Browse files Browse the repository at this point in the history
silence abnormal websocket close

Fixes #1202.
  • Loading branch information
rade authored Aug 12, 2016
2 parents 21c188d + 7480654 commit 6334836
Show file tree
Hide file tree
Showing 6 changed files with 16 additions and 16 deletions.
3 changes: 1 addition & 2 deletions app/controls.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package app

import (
"io"
"net/http"
"net/rpc"

Expand Down Expand Up @@ -84,7 +83,7 @@ func handleProbeWS(cr ControlRouter) CtxHandlerFunc {
return
}
defer cr.Deregister(ctx, probeID, id)
if err := codec.WaitForReadError(); err != nil && err != io.EOF && !xfer.IsExpectedWSCloseError(err) {
if err := codec.WaitForReadError(); err != nil && !xfer.IsExpectedWSCloseError(err) {
log.Errorf("Error on websocket: %v", err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion app/pipe_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (pr *localPipeRouter) Get(_ context.Context, id string, e End) (xfer.Pipe,
defer pr.Unlock()
p, ok := pr.pipes[id]
if !ok {
log.Infof("Creating pipe id %s", id)
log.Debugf("Creating pipe id %s", id)
p = &pipe{
ui: end{lastUsedTime: mtime.Now()},
probe: end{lastUsedTime: mtime.Now()},
Expand Down
8 changes: 4 additions & 4 deletions app/pipes.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ func handlePipeWs(pr PipeRouter, end End) CtxHandlerFunc {
id := mux.Vars(r)["pipeID"]
pipe, endIO, err := pr.Get(ctx, id, end)
if err != nil {
log.Errorf("Error getting pipe %s: %v", id, err)
// this usually means the pipe has been closed
log.Debugf("Error getting pipe %s: %v", id, err)
http.NotFound(w, r)
return
}
Expand All @@ -65,17 +66,16 @@ func handlePipeWs(pr PipeRouter, end End) CtxHandlerFunc {
}
defer conn.Close()

log.Infof("Success got pipe %s:%s", id, end)
if err := pipe.CopyToWebsocket(endIO, conn); err != nil && !xfer.IsExpectedWSCloseError(err) {
log.Printf("Error copying to pipe %s (%d) websocket: %v", id, end, err)
log.Errorf("Error copying to pipe %s (%d) websocket: %v", id, end, err)
}
}
}

func deletePipe(pr PipeRouter) CtxHandlerFunc {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) {
pipeID := mux.Vars(r)["pipeID"]
log.Infof("Deleting pipe %s", pipeID)
log.Debugf("Deleting pipe %s", pipeID)
if err := pr.Delete(ctx, pipeID); err != nil {
respondWith(w, http.StatusInternalServerError, err)
}
Expand Down
4 changes: 2 additions & 2 deletions common/xfer/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,10 +167,10 @@ func (p *pingingWebsocket) Close() error {
// IsExpectedWSCloseError returns boolean indicating whether the error is a
// clean disconnection.
func IsExpectedWSCloseError(err error) bool {
return websocket.IsCloseError(
err,
return err == io.EOF || err == io.ErrClosedPipe || websocket.IsCloseError(err,
websocket.CloseNormalClosure,
websocket.CloseGoingAway,
websocket.CloseNoStatusReceived,
websocket.CloseAbnormalClosure,
)
}
5 changes: 4 additions & 1 deletion probe/appclient/app_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,10 @@ func (c *appClient) pipeConnection(id string, pipe xfer.Pipe) (bool, error) {
defer c.closeConn(id)

_, remote := pipe.Ends()
return false, pipe.CopyToWebsocket(remote, conn)
if err := pipe.CopyToWebsocket(remote, conn); err != nil && !xfer.IsExpectedWSCloseError(err) {
return false, err
}
return false, nil
}

func (c *appClient) PipeConnection(id string, pipe xfer.Pipe) {
Expand Down
10 changes: 4 additions & 6 deletions probe/docker/controls.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,13 @@ func (r *registry) attachContainer(containerID string, req xfer.Request) xfer.Re
}
pipe.OnClose(func() {
if err := cw.Close(); err != nil {
log.Errorf("Error closing attachment: %v", err)
log.Errorf("Error closing attachment to container %s: %v", containerID, err)
return
}
log.Infof("Attachment to container %s closed.", containerID)
})
go func() {
if err := cw.Wait(); err != nil {
log.Errorf("Error waiting on exec: %v", err)
log.Errorf("Error waiting on attachment to container %s: %v", containerID, err)
}
pipe.Close()
}()
Expand Down Expand Up @@ -136,14 +135,13 @@ func (r *registry) execContainer(containerID string, req xfer.Request) xfer.Resp
}
pipe.OnClose(func() {
if err := cw.Close(); err != nil {
log.Errorf("Error closing exec: %v", err)
log.Errorf("Error closing exec in container %s: %v", containerID, err)
return
}
log.Infof("Exec on container %s closed.", containerID)
})
go func() {
if err := cw.Wait(); err != nil {
log.Errorf("Error waiting on exec: %v", err)
log.Errorf("Error waiting on exec in container %s: %v", containerID, err)
}
pipe.Close()
}()
Expand Down

0 comments on commit 6334836

Please sign in to comment.