From de17a1d36818702210fa3cedf6092dc2b256b38e Mon Sep 17 00:00:00 2001 From: halulu Date: Mon, 3 Dec 2018 14:25:13 +0800 Subject: [PATCH] fix mux websocket receive message --- client/client.go | 2 +- core/mux/group.go | 7 ++++--- core/mux/websocket.go | 35 +++++++++++++++++++++++++++++------ core/websocket.go | 8 +++++--- 4 files changed, 39 insertions(+), 13 deletions(-) diff --git a/client/client.go b/client/client.go index d4fdff2..6efb4bc 100644 --- a/client/client.go +++ b/client/client.go @@ -107,7 +107,7 @@ func (client *WebSocksClient) HandleConn(conn *net.TCPConn) { } //debug log - log.Printf("created new mux conn: %x", muxConn.ID) + log.Printf("created new mux conn: %x %s", muxConn.ID, host) muxConn.Run(conn) return diff --git a/core/mux/group.go b/core/mux/group.go index 1e2c471..e8a876d 100644 --- a/core/mux/group.go +++ b/core/mux/group.go @@ -54,7 +54,7 @@ func (group *Group) Handle(m *Message) { //debug log err := errors.New(fmt.Sprintf("conn does not exist: %x", m.ConnID)) log.Println(err.Error()) - log.Println(m) + log.Printf("%X %X %X %d", m.Method, m.ConnID, m.MessageID, m.Length) return } @@ -76,7 +76,9 @@ func (group *Group) AddConn(conn *Conn) { } func (group *Group) DeleteConn(id uint32) { + group.connMapMutex.Lock() delete(group.connMap, id) + group.connMapMutex.Unlock() return } @@ -119,12 +121,11 @@ func (group *Group) Listen(muxWS *MuxWebSocket) { for { m, err := muxWS.Receive() if err != nil { - log.Printf(err.Error()) + log.Println(err.Error()) return } go group.Handle(m) } - return }() } diff --git a/core/mux/websocket.go b/core/mux/websocket.go index 0a04aaf..cd06aaa 100644 --- a/core/mux/websocket.go +++ b/core/mux/websocket.go @@ -1,7 +1,9 @@ package mux import ( + "bytes" "io" + "log" "sync" "github.com/lzjluzijie/websocks/core" @@ -27,7 +29,10 @@ func (muxWS *MuxWebSocket) Send(m *Message) (err error) { muxWS.sMutex.Lock() _, err = io.Copy(muxWS, m) if err != nil { - //muxWS.Close() + e := muxWS.Close() + if e != nil { + log.Println(e.Error()) + } return } muxWS.sMutex.Unlock() @@ -39,26 +44,44 @@ func (muxWS *MuxWebSocket) Send(m *Message) (err error) { func (muxWS *MuxWebSocket) Receive() (m *Message, err error) { muxWS.rMutex.Lock() + h := make([]byte, 13) + _, err = muxWS.Read(h) if err != nil { - //muxWS.Close() + e := muxWS.Close() + if e != nil { + log.Println(e.Error()) + } return } + //debug log + //log.Printf("%d %x",n, h) + m = LoadMessage(h) - data := make([]byte, m.Length) + buf := &bytes.Buffer{} + r := io.LimitReader(muxWS, int64(m.Length)) - _, err = muxWS.Read(data) + _, err = io.Copy(buf, r) if err != nil { - //muxWS.Close() + e := muxWS.Close() + if e != nil { + log.Println(e.Error()) + } return } muxWS.rMutex.Unlock() - m.Data = data + m.Data = buf.Bytes() ////debug log //log.Printf("received %#v", m) return } + +func (muxWS *MuxWebSocket) Close() (err error) { + muxWS.group.MuxWSs = nil + err = muxWS.WebSocket.Close() + return +} diff --git a/core/websocket.go b/core/websocket.go index fd9521c..f0b8cdd 100644 --- a/core/websocket.go +++ b/core/websocket.go @@ -7,6 +7,8 @@ import ( "github.com/gorilla/websocket" ) +var ErrWebSocketClosed = errors.New("websocket closed") + type WebSocket struct { conn *websocket.Conn buf []byte @@ -28,7 +30,7 @@ func NewWebSocket(conn *websocket.Conn, stats *Stats) (ws *WebSocket) { func (ws *WebSocket) Read(p []byte) (n int, err error) { if ws.closed == true { - return 0, errors.New("websocket closed") + return 0, ErrWebSocketClosed } if len(ws.buf) == 0 { @@ -51,7 +53,7 @@ func (ws *WebSocket) Read(p []byte) (n int, err error) { func (ws *WebSocket) Write(p []byte) (n int, err error) { if ws.closed == true { - return 0, errors.New("websocket closed") + return 0, ErrWebSocketClosed } err = ws.conn.WriteMessage(websocket.BinaryMessage, p) @@ -68,7 +70,7 @@ func (ws *WebSocket) Write(p []byte) (n int, err error) { } func (ws *WebSocket) Close() (err error) { - ws.conn.Close() ws.closed = true + err = ws.conn.Close() return }