From 3a1d12e8b441121329b316d077a4c92668c0e398 Mon Sep 17 00:00:00 2001 From: genshen Date: Sat, 1 Aug 2020 23:01:24 +0800 Subject: [PATCH 1/4] feat(client): set timeout for sending heartbeat to wssocks server --- client/client.go | 2 +- wss/heart_beat.go | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/client/client.go b/client/client.go index 043b94c..a37c798 100644 --- a/client/client.go +++ b/client/client.go @@ -176,7 +176,7 @@ func (c *client) Run() error { go func() { defer wg.Done() defer once.Do(closeAll) - if err := hdl.hb.Start(hbCtx); err != nil { + if err := hdl.hb.Start(hbCtx, time.Minute); err != nil { log.Info("heartbeat ending", err) } }() diff --git a/wss/heart_beat.go b/wss/heart_beat.go index 30e46b9..fbe2ed1 100644 --- a/wss/heart_beat.go +++ b/wss/heart_beat.go @@ -31,7 +31,7 @@ func (hb *HeartBeat) Close() { } // start sending heart beat to server. -func (hb *HeartBeat) Start(ctx context.Context) error { +func (hb *HeartBeat) Start(ctx context.Context, writeTimeout time.Duration) error { t := time.NewTicker(time.Second * 15) defer t.Stop() for { @@ -44,7 +44,8 @@ func (hb *HeartBeat) Start(ctx context.Context) error { Type: WsTpBeats, Data: nil, } - if err := wsjson.Write(context.TODO(), hb.wsc.WsConn, heartBeats); err != nil { + writeCtx, _ := context.WithTimeout(ctx, writeTimeout) + if err := wsjson.Write(writeCtx, hb.wsc.WsConn, heartBeats); err != nil { return err } } From cf739b89f733a8f430b60d3466d7d37292d7dd49 Mon Sep 17 00:00:00 2001 From: genshen Date: Sun, 2 Aug 2020 01:54:12 +0800 Subject: [PATCH 2/4] refactor(server): rm ctx passed to establish in ProxyEstablish, use timeout ctx in establish instead In establish function of ProxyEstablish interface, we donot pass context to establish, instead, we setup timeout context in establish function for usage. --- wss/proxy_server.go | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/wss/proxy_server.go b/wss/proxy_server.go index 79306a5..b3ea853 100644 --- a/wss/proxy_server.go +++ b/wss/proxy_server.go @@ -23,7 +23,7 @@ type Connector struct { // interface of establishing proxy connection with target type ProxyEstablish interface { - establish(ctx context.Context, hub *Hub, id ksuid.KSUID, proxyType int, addr string, data []byte) error + establish(hub *Hub, id ksuid.KSUID, proxyType int, addr string, data []byte) error // data from client todo data with type onData(data ClientData) error @@ -116,8 +116,7 @@ func establishProxy(hub *Hub, proxyMeta ProxyRegister) { e = &DefaultProxyEst{} } - ctx, _ := context.WithCancel(context.Background()) - err := e.establish(ctx, hub, proxyMeta.id, proxyMeta._type, proxyMeta.addr, proxyMeta.withData) + err := e.establish(hub, proxyMeta.id, proxyMeta._type, proxyMeta.addr, proxyMeta.withData) if err == nil { hub.tellClosed(proxyMeta.id) // tell client to close connection. } else if err != ConnCloseByClient { @@ -153,7 +152,7 @@ func (e *DefaultProxyEst) Close(tell bool) error { } // data: data send in establish step (can be nil). -func (e *DefaultProxyEst) establish(ctx context.Context, hub *Hub, id ksuid.KSUID, proxyType int, addr string, data []byte) error { +func (e *DefaultProxyEst) establish(hub *Hub, id ksuid.KSUID, proxyType int, addr string, data []byte) error { conn, err := net.DialTimeout("tcp", addr, time.Second*8) // todo config timeout if err != nil { return err @@ -168,6 +167,8 @@ func (e *DefaultProxyEst) establish(ctx context.Context, hub *Hub, id ksuid.KSUI hub.register <- &ProxyServer{Id: id, ProxyIns: e} defer hub.RemoveProxy(id) + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() switch proxyType { case ProxyTypeSocks5: if err := hub.WriteProxyMessage(ctx, id, TagData, []byte{0x05, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}); err != nil { @@ -180,7 +181,7 @@ func (e *DefaultProxyEst) establish(ctx context.Context, hub *Hub, id ksuid.KSUI } go func() { - writer := WebSocketWriter{WSC: &hub.ConcurrentWebSocket, Id: id, Ctx: context.TODO()} + writer := WebSocketWriter{WSC: &hub.ConcurrentWebSocket, Id: id, Ctx: context.Background()} if _, err := io.Copy(&writer, conn); err != nil { log.Error("copy error,", err) e.done <- ChanDone{true, err} @@ -212,9 +213,11 @@ func (h *HttpProxyEst) Close(tell bool) error { return h.bodyReadCloser.Close() // close from client } -func (h *HttpProxyEst) establish(ctx context.Context, hub *Hub, id ksuid.KSUID, proxyType int, addr string, header []byte) error { +func (h *HttpProxyEst) establish(hub *Hub, id ksuid.KSUID, proxyType int, addr string, header []byte) error { if header == nil { hub.tellClosed(id) + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() _ = hub.WriteProxyMessage(ctx, id, TagEstErr, nil) return errors.New("http header empty") } @@ -233,7 +236,9 @@ func (h *HttpProxyEst) establish(ctx context.Context, hub *Hub, id ksuid.KSUID, } }() - if err := hub.WriteProxyMessage(ctx, id, TagEstOk, nil); err != nil { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + if err := hub.ConcurrentWebSocket.WriteProxyMessage(ctx, id, TagEstOk, nil); err != nil { return err } @@ -252,7 +257,7 @@ func (h *HttpProxyEst) establish(ctx context.Context, hub *Hub, id ksuid.KSUID, } defer resp.Body.Close() - writer := WebSocketWriter{WSC: &hub.ConcurrentWebSocket, Id: id, Ctx: context.TODO()} + writer := WebSocketWriter{WSC: &hub.ConcurrentWebSocket, Id: id, Ctx: context.Background()} var headerBuffer bytes.Buffer HttpRespHeader(&headerBuffer, resp) writer.Write(headerBuffer.Bytes()) From 1bc664fa382c14fdf7b375c6081aee49bf003a2d Mon Sep 17 00:00:00 2001 From: genshen Date: Sun, 2 Aug 2020 11:05:14 +0800 Subject: [PATCH 3/4] feat(client): use timeout context or cancel context to writing data to server --- wss/websocket_client.go | 5 ++++- wss/wssocks_client.go | 7 +++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/wss/websocket_client.go b/wss/websocket_client.go index 5c600e7..9dfc399 100644 --- a/wss/websocket_client.go +++ b/wss/websocket_client.go @@ -9,6 +9,7 @@ import ( "nhooyr.io/websocket" "nhooyr.io/websocket/wsjson" "sync" + "time" ) // WebSocketClient is a collection of proxy clients. @@ -71,7 +72,9 @@ func (wsc *WebSocketClient) TellClose(id ksuid.KSUID) error { Type: WsTpClose, Data: nil, } - if err := wsjson.Write(context.TODO(), wsc.WsConn, &finish); err != nil { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + if err := wsjson.Write(ctx, wsc.WsConn, &finish); err != nil { return err } return nil diff --git a/wss/wssocks_client.go b/wss/wssocks_client.go index 380184a..4c41b67 100644 --- a/wss/wssocks_client.go +++ b/wss/wssocks_client.go @@ -153,9 +153,12 @@ func (client *Client) transData(wsc *WebSocketClient, conn *net.TCPConn, firstSe } // trans incoming data from proxy client application. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() // cancel data writing go func() { - writer := WebSocketWriter{WSC: &wsc.ConcurrentWebSocket, Id: proxy.Id, Ctx: context.TODO()} - if _, err := io.Copy(&writer, conn); err != nil { + writer := WebSocketWriter{WSC: &wsc.ConcurrentWebSocket, Id: proxy.Id, Ctx: ctx} + _, err := io.Copy(&writer, conn) + if err != nil { log.Error("write error:", err) } done <- Done{true, nil} From 6df52080c159ab5c81c62de600b460005d1acf08 Mon Sep 17 00:00:00 2001 From: genshen Date: Sun, 2 Aug 2020 13:49:52 +0800 Subject: [PATCH 4/4] feat(client): use more specified context in establish and http proxy client to send data to server In establish, use cancel context to send establish message to server; in http proxy client, use cancel context for writing TagNoMore message to server and use background context to copy data from user side(e.g browser, curl) to server. --- wss/proxy_client.go | 16 ++++++++++++---- wss/proxy_client_http.go | 12 ++++++++---- 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/wss/proxy_client.go b/wss/proxy_client.go index 1f94747..b99b1d8 100644 --- a/wss/proxy_client.go +++ b/wss/proxy_client.go @@ -3,6 +3,8 @@ package wss import ( "context" "encoding/base64" + "time" + "github.com/segmentio/ksuid" log "github.com/sirupsen/logrus" "nhooyr.io/websocket/wsjson" @@ -28,8 +30,8 @@ type ServerData struct { Data []byte } -// handel socket dial results processing -// copy income connection data to proxy serve via websocket +// tell wssocks proxy server to establish a proxy connection by sending server +// proxy address, type, initial data. func (p *ProxyClient) Establish(wsc *WebSocketClient, firstSendData []byte, proxyType int, addr string) error { estMsg := ProxyEstMessage{ Type: proxyType, @@ -40,8 +42,14 @@ func (p *ProxyClient) Establish(wsc *WebSocketClient, firstSendData []byte, prox estMsg.WithData = true estMsg.DataBase64 = base64.StdEncoding.EncodeToString(firstSendData) } - addrSend := WebSocketMessage{Type: WsTpEst, Id: p.Id.String(), Data: estMsg} - if err := wsjson.Write(context.TODO(), wsc.WsConn, &addrSend); err != nil { + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + if err := wsjson.Write(ctx, wsc.WsConn, &WebSocketMessage{ + Type: WsTpEst, + Id: p.Id.String(), + Data: estMsg, + }); err != nil { log.Error("json error:", err) return err } diff --git a/wss/proxy_client_http.go b/wss/proxy_client_http.go index 921f5cc..c6ef0e6 100644 --- a/wss/proxy_client_http.go +++ b/wss/proxy_client_http.go @@ -3,12 +3,13 @@ package wss import ( "bytes" "context" - "github.com/segmentio/ksuid" - log "github.com/sirupsen/logrus" "io" "net" "net/http" "net/url" + + "github.com/segmentio/ksuid" + log "github.com/sirupsen/logrus" ) type HttpClient struct { @@ -83,7 +84,7 @@ func (client *HttpClient) ServeHTTP(w http.ResponseWriter, req *http.Request) { } // copy request body data - writer := WebSocketWriter{WSC: &client.wsc.ConcurrentWebSocket, Id: proxy.Id, Ctx: context.TODO()} + writer := WebSocketWriter{WSC: &client.wsc.ConcurrentWebSocket, Id: proxy.Id, Ctx: context.Background()} if _, err := io.Copy(&writer, req.Body); err != nil { log.Error("write body error:", err) client.wsc.RemoveProxy(proxy.Id) @@ -92,7 +93,10 @@ func (client *HttpClient) ServeHTTP(w http.ResponseWriter, req *http.Request) { } return } - if err := client.wsc.WriteProxyMessage(context.TODO(), proxy.Id, TagNoMore, nil); err != nil { + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + if err := client.wsc.WriteProxyMessage(ctx, proxy.Id, TagNoMore, nil); err != nil { log.Error("write body error:", err) client.wsc.RemoveProxy(proxy.Id) if err := client.wsc.TellClose(proxy.Id); err != nil {