Skip to content

Commit

Permalink
Enhance WebSocket error handling and message queueing logic
Browse files Browse the repository at this point in the history
Introduced panic recovery in readMessages and SendMessage to prevent application crashes due to unexpected panics. Improved handling of closed channels in writeMessages to avoid runtime errors. Added checks for context cancellation (Context.Done) in SendMessage and writeMessages for better connection management.

Used a mutex to ensure thread-safe access to the WebSocket connection when sending messages in writeMessages. Enhanced error handling when the outgoing message buffer is full, ensuring graceful connection closure. Updated SendMessage to clarify behavior during context cancellation and buffer overflow scenarios.
  • Loading branch information
albeebe committed Dec 7, 2024
1 parent 9c59937 commit 835fd65
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 8 deletions.
1 change: 1 addition & 0 deletions types.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type WebSocket struct {
IncomingMessages chan Message // Channel for received messages
outgoingMessages chan Message // Channel for outgoing messages
closeOnce sync.Once // Ensures close logic runs only once
writeMutex sync.Mutex // Synchronizes writes to the WebSocket connection.
conn *websocket.Conn // Underlying WebSocket connection
}

Expand Down
38 changes: 30 additions & 8 deletions websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ package websocket

import (
"context"
"errors"
"fmt"

"github.com/gorilla/websocket"
)

Expand Down Expand Up @@ -64,6 +66,12 @@ func NewWebSocketClient(url string) *WebSocket {
// readMessages continuously reads messages from the WebSocket connection and sends
// them to the IncomingMessages channel.
func (ws *WebSocket) readMessages() {
defer func() {
if r := recover(); r != nil {
ws.closeConnection(fmt.Errorf("panic during read message: %v", r))
}
}()

for {
select {
case <-ws.Context.Done(): // Context cancellation detected
Expand Down Expand Up @@ -98,8 +106,14 @@ func (ws *WebSocket) writeMessages() {
case <-ws.Context.Done(): // Context cancellation detected
ws.closeConnection(nil)
return
case message := <-ws.outgoingMessages: // Get the next message to send
case message, ok := <-ws.outgoingMessages:
if !ok {
// Channel is closed; exit the loop.
return
}
ws.writeMutex.Lock()
err := ws.conn.WriteMessage(int(message.Type), message.Data)
ws.writeMutex.Unlock()
if err != nil {
ws.closeConnection(err)
return
Expand All @@ -108,15 +122,23 @@ func (ws *WebSocket) writeMessages() {
}
}

// SendMessage queues a message to be sent through the WebSocket connection.
// SendMessage queues a message for sending through the WebSocket connection.
// Panics due to channel closure are gracefully handled.
func (ws *WebSocket) SendMessage(messageType MessageType, data []byte) {
defer func() {
if r := recover(); r != nil {
ws.closeConnection(fmt.Errorf("panic during message send: %v", r))
}
}()

select {
case ws.outgoingMessages <- Message{
Type: messageType,
Data: data,
}:
case <-ws.Context.Done(): // Prevent sending messages if context is canceled
ws.closeConnection(nil)
case <-ws.Context.Done():
// Connection context is canceled; message is not sent.
case ws.outgoingMessages <- Message{Type: messageType, Data: data}:
// Message successfully enqueued.
default:
// Buffer is full; close the connection with an error.
ws.closeConnection(errors.New("outgoing message buffer is full"))
}
}

Expand Down

0 comments on commit 835fd65

Please sign in to comment.