Skip to content

Commit

Permalink
Add fix for send on closed channel - Issue #298
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Broadhurst <[email protected]>
  • Loading branch information
peterbroadhurst committed Jan 21, 2022
1 parent ecadd8e commit 3930870
Showing 1 changed file with 9 additions and 8 deletions.
17 changes: 9 additions & 8 deletions internal/events/websockets/websocket_connection.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2021 Kaleido, Inc.
// Copyright © 2022 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -44,6 +44,7 @@ type websocketConnection struct {
connID string
sendMessages chan interface{}
senderDone chan struct{}
receiverDone chan struct{}
autoAck bool
started []*websocketStartedSub
inflight []*fftypes.EventDeliveryResponse
Expand All @@ -64,6 +65,7 @@ func newConnection(pCtx context.Context, ws *WebSockets, wsConn *websocket.Conn)
connID: connID,
sendMessages: make(chan interface{}),
senderDone: make(chan struct{}),
receiverDone: make(chan struct{}),
}
go wc.sendLoop()
go wc.receiveLoop()
Expand Down Expand Up @@ -104,11 +106,7 @@ func (wc *websocketConnection) sendLoop() {
defer wc.close()
for {
select {
case msg, ok := <-wc.sendMessages:
if !ok {
l.Debugf("Sender closing")
return
}
case msg := <-wc.sendMessages:
l.Tracef("Sending: %+v", msg)
writer, err := wc.wsConn.NextWriter(websocket.TextMessage)
if err == nil {
Expand All @@ -119,6 +117,9 @@ func (wc *websocketConnection) sendLoop() {
l.Errorf("Write failed on socket: %s", err)
return
}
case <-wc.receiverDone:
l.Debugf("Sender closing - receiver completed")
return
case <-wc.ctx.Done():
l.Debugf("Sender closing - context cancelled")
return
Expand All @@ -128,7 +129,7 @@ func (wc *websocketConnection) sendLoop() {

func (wc *websocketConnection) receiveLoop() {
l := log.L(wc.ctx)
defer close(wc.sendMessages)
defer close(wc.receiverDone)
for {
var msgData []byte
var msgHeader fftypes.WSClientActionBase
Expand Down Expand Up @@ -361,5 +362,5 @@ func (wc *websocketConnection) close() {

func (wc *websocketConnection) waitClose() {
<-wc.senderDone
<-wc.sendMessages
<-wc.receiverDone
}

0 comments on commit 3930870

Please sign in to comment.