From 986f67d6b9d5fcfc9df1662610b6a0eb6e1a0b91 Mon Sep 17 00:00:00 2001 From: Mihail Stoykov Date: Wed, 24 Jun 2020 11:53:08 +0300 Subject: [PATCH] Don't leak goroutines everywhere in the ws code Previous to this past running k6 compiled with the race detector and ws code will most definetely leak enough goroutines in 10 seconds that it will stop because of the 8k limit ... After this it seems to not leak anymore :confetti: --- js/modules/k6/ws/ws.go | 35 +++++++++++++++++++++++++++-------- js/modules/k6/ws/ws_test.go | 3 ++- 2 files changed, 29 insertions(+), 9 deletions(-) diff --git a/js/modules/k6/ws/ws.go b/js/modules/k6/ws/ws.go index 078b976ac94..55a973ac18d 100644 --- a/js/modules/k6/ws/ws.go +++ b/js/modules/k6/ws/ws.go @@ -247,7 +247,7 @@ func (*WS) Connect(ctx context.Context, url string, args ...goja.Value) (*WSHTTP readErrChan := make(chan error) // Wraps a couple of channels around conn.ReadMessage - go readPump(conn, readDataChan, readErrChan, readCloseChan) + go socket.readPump(readDataChan, readErrChan, readCloseChan) // This is the main control loop. All JS code (including error handlers) // should only be executed by this thread to avoid race conditions @@ -384,7 +384,11 @@ func (s *Socket) SetTimeout(fn goja.Callable, timeoutMs int) { go func() { select { case <-time.After(time.Duration(timeoutMs) * time.Millisecond): - s.scheduled <- fn + select { + case s.scheduled <- fn: + case <-s.done: + return + } case <-s.done: return @@ -402,7 +406,11 @@ func (s *Socket) SetInterval(fn goja.Callable, intervalMs int) { for { select { case <-ticker.C: - s.scheduled <- fn + select { + case s.scheduled <- fn: + case <-s.done: + return + } case <-s.done: return @@ -450,24 +458,35 @@ func (s *Socket) closeConnection(code int) error { } // Wraps conn.ReadMessage in a channel -func readPump(conn *websocket.Conn, readChan chan []byte, errorChan chan error, closeChan chan int) { +func (s *Socket) readPump(readChan chan []byte, errorChan chan error, closeChan chan int) { for { - _, message, err := conn.ReadMessage() + _, message, err := s.conn.ReadMessage() if err != nil { if websocket.IsUnexpectedCloseError( err, websocket.CloseNormalClosure, websocket.CloseGoingAway) { // Report an unexpected closure - errorChan <- err + select { + case errorChan <- err: + case <-s.done: + return + } } code := websocket.CloseGoingAway if e, ok := err.(*websocket.CloseError); ok { code = e.Code } - closeChan <- code + select { + case closeChan <- code: + case <-s.done: + } return } - readChan <- message + select { + case readChan <- message: + case <-s.done: + return + } } } diff --git a/js/modules/k6/ws/ws_test.go b/js/modules/k6/ws/ws_test.go index 1acf46a90bb..174d270dc7f 100644 --- a/js/modules/k6/ws/ws_test.go +++ b/js/modules/k6/ws/ws_test.go @@ -581,7 +581,8 @@ func TestReadPump(t *testing.T) { msgChan := make(chan []byte) errChan := make(chan error) closeChan := make(chan int) - go readPump(conn, msgChan, errChan, closeChan) + s := &Socket{conn: conn} + go s.readPump(msgChan, errChan, closeChan) readChans: for {