Skip to content

Commit

Permalink
Don't leak goroutines everywhere in the ws code
Browse files Browse the repository at this point in the history
Previous to this past running k6 compiled with the race detector and ws
code will most definetely leak enough goroutines in 10 seconds that it
will stop because of the 8k limit ...
After this it seems to not leak anymore :confetti:
  • Loading branch information
mstoykov committed Jun 24, 2020
1 parent 56a03ae commit 986f67d
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 9 deletions.
35 changes: 27 additions & 8 deletions js/modules/k6/ws/ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func (*WS) Connect(ctx context.Context, url string, args ...goja.Value) (*WSHTTP
readErrChan := make(chan error)

// Wraps a couple of channels around conn.ReadMessage
go readPump(conn, readDataChan, readErrChan, readCloseChan)
go socket.readPump(readDataChan, readErrChan, readCloseChan)

// This is the main control loop. All JS code (including error handlers)
// should only be executed by this thread to avoid race conditions
Expand Down Expand Up @@ -384,7 +384,11 @@ func (s *Socket) SetTimeout(fn goja.Callable, timeoutMs int) {
go func() {
select {
case <-time.After(time.Duration(timeoutMs) * time.Millisecond):
s.scheduled <- fn
select {
case s.scheduled <- fn:
case <-s.done:
return
}

case <-s.done:
return
Expand All @@ -402,7 +406,11 @@ func (s *Socket) SetInterval(fn goja.Callable, intervalMs int) {
for {
select {
case <-ticker.C:
s.scheduled <- fn
select {
case s.scheduled <- fn:
case <-s.done:
return
}

case <-s.done:
return
Expand Down Expand Up @@ -450,24 +458,35 @@ func (s *Socket) closeConnection(code int) error {
}

// Wraps conn.ReadMessage in a channel
func readPump(conn *websocket.Conn, readChan chan []byte, errorChan chan error, closeChan chan int) {
func (s *Socket) readPump(readChan chan []byte, errorChan chan error, closeChan chan int) {
for {
_, message, err := conn.ReadMessage()
_, message, err := s.conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(
err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
// Report an unexpected closure
errorChan <- err
select {
case errorChan <- err:
case <-s.done:
return
}
}
code := websocket.CloseGoingAway
if e, ok := err.(*websocket.CloseError); ok {
code = e.Code
}
closeChan <- code
select {
case closeChan <- code:
case <-s.done:
}
return
}

readChan <- message
select {
case readChan <- message:
case <-s.done:
return
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion js/modules/k6/ws/ws_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,8 @@ func TestReadPump(t *testing.T) {
msgChan := make(chan []byte)
errChan := make(chan error)
closeChan := make(chan int)
go readPump(conn, msgChan, errChan, closeChan)
s := &Socket{conn: conn}
go s.readPump(msgChan, errChan, closeChan)

readChans:
for {
Expand Down

0 comments on commit 986f67d

Please sign in to comment.