Skip to content

Commit

Permalink
Fix bugs relaed to tcp/tcpmux channel handler
Browse files Browse the repository at this point in the history
  • Loading branch information
Musixal committed Oct 13, 2024
1 parent 24f2967 commit dde7951
Show file tree
Hide file tree
Showing 8 changed files with 71 additions and 61 deletions.
20 changes: 11 additions & 9 deletions internal/client/transport/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,16 @@ func (c *TcpTransport) Restart() {
defer c.restartMutex.Unlock()

c.logger.Info("restarting client...")

if c.cancel != nil {
c.cancel()
}

// close control channel connection
if c.controlChannel != nil {
c.controlChannel.Close()
}

time.Sleep(2 * time.Second)

ctx, cancel := context.WithCancel(c.parentctx)
Expand Down Expand Up @@ -219,7 +225,6 @@ func (c *TcpTransport) poolMaintainer() {

func (c *TcpTransport) channelHandler() {
msgChan := make(chan byte, 1000)
errChan := make(chan error, 1000)

// Goroutine to handle the blocking ReceiveBinaryString
go func() {
Expand All @@ -230,7 +235,8 @@ func (c *TcpTransport) channelHandler() {
default:
msg, err := utils.ReceiveBinaryByte(c.controlChannel)
if err != nil {
errChan <- err
c.logger.Error("failed to read from control channel. ", err)
go c.Restart()
return
}
msgChan <- msg
Expand All @@ -249,6 +255,7 @@ func (c *TcpTransport) channelHandler() {
switch msg {
case utils.SG_Chan:
atomic.AddInt32(&c.loadConnections, 1)

select {
case <-c.controlFlow: // Do nothing

Expand All @@ -261,7 +268,7 @@ func (c *TcpTransport) channelHandler() {
c.logger.Debug("heartbeat signal received successfully")

case utils.SG_Closed:
c.logger.Info("control channel has been closed by the server")
c.logger.Warn("control channel has been closed by the server")
go c.Restart()
return

Expand All @@ -274,15 +281,10 @@ func (c *TcpTransport) channelHandler() {
}

default:
c.logger.Errorf("unexpected response from channel: %v. Restarting client...", msg)
c.logger.Errorf("unexpected response from channel: %v.", msg)
go c.Restart()
return
}
case err := <-errChan:
// Handle errors from the control channel
c.logger.Error("failed to read channel signal, restarting client: ", err)
go c.Restart()
return
}
}
}
Expand Down
30 changes: 17 additions & 13 deletions internal/client/transport/tcpmux.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,16 @@ func (c *TcpMuxTransport) Restart() {
defer c.restartMutex.Unlock()

c.logger.Info("restarting client...")

if c.cancel != nil {
c.cancel()
}

// close control channel connection
if c.controlChannel != nil {
c.controlChannel.Close()
}

time.Sleep(2 * time.Second)

ctx, cancel := context.WithCancel(c.parentctx)
Expand Down Expand Up @@ -182,8 +188,6 @@ func (c *TcpMuxTransport) channelDialer() {

}



func (c *TcpMuxTransport) poolMaintainer() {
for i := 0; i < c.config.ConnPoolSize; i++ { //initial pool filling
go c.tunnelDialer()
Expand Down Expand Up @@ -237,7 +241,6 @@ func (c *TcpMuxTransport) poolMaintainer() {

func (c *TcpMuxTransport) channelHandler() {
msgChan := make(chan byte, 1000)
errChan := make(chan error, 1000)

// Goroutine to handle the blocking ReceiveBinaryString
go func() {
Expand All @@ -248,7 +251,8 @@ func (c *TcpMuxTransport) channelHandler() {
default:
msg, err := utils.ReceiveBinaryByte(c.controlChannel)
if err != nil {
errChan <- err
c.logger.Error("failed to read from control channel. ", err)
go c.Restart()
return
}
msgChan <- msg
Expand All @@ -262,10 +266,12 @@ func (c *TcpMuxTransport) channelHandler() {
case <-c.ctx.Done():
_ = utils.SendBinaryByte(c.controlChannel, utils.SG_Closed)
return

case msg := <-msgChan:
switch msg {
case utils.SG_Chan:
atomic.AddInt32(&c.loadConnections, 1)

select {
case <-c.controlFlow: // Do nothing

Expand All @@ -274,22 +280,20 @@ func (c *TcpMuxTransport) channelHandler() {
go c.tunnelDialer()
}

case utils.SG_HB:
c.logger.Debug("heartbeat signal received successfully")

case utils.SG_Closed:
c.logger.Info("control channel has been closed by the server")
c.logger.Warn("control channel has been closed by the server")
go c.Restart()
return
case utils.SG_HB:
c.logger.Debug("heartbeat signal received successfully")

default:
c.logger.Errorf("unexpected response from channel: %v. Restarting client...", msg)
c.logger.Errorf("unexpected response from channel: %v.", msg)
go c.Restart()
return
}
case err := <-errChan:
// Handle errors from the control channel
c.logger.Error("failed to read channel signal, restarting client: ", err)
go c.Restart()
return

}
}
}
Expand Down
2 changes: 1 addition & 1 deletion internal/client/transport/ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func (c *WsTransport) channelHandler() {
c.logger.Debug("heartbeat signal received successfully")

case utils.SG_Closed:
c.logger.Info("control channel has been closed by the server")
c.logger.Warn("control channel has been closed by the server")
go c.Restart()
return

Expand Down
2 changes: 1 addition & 1 deletion internal/client/transport/wsmux.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func (c *WsMuxTransport) channelHandler() {
c.logger.Debug("heartbeat received successfully")

case utils.SG_Closed:
c.logger.Info("control channel has been closed by the server")
c.logger.Warn("control channel has been closed by the server")
go c.Restart()
return

Expand Down
36 changes: 19 additions & 17 deletions internal/server/transport/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,7 @@ func (s *TcpTransport) channelHandler() {
defer ticker.Stop()

// Channel to receive the message or error
resultChan := make(chan struct {
message byte
err error
})
messageChan := make(chan byte, 1)

go func() {
for {
Expand All @@ -201,10 +198,13 @@ func (s *TcpTransport) channelHandler() {
return
default:
message, err := utils.ReceiveBinaryByte(s.controlChannel)
resultChan <- struct {
message byte
err error
}{message, err}
if err != nil {
s.logger.Error("failed to read from channel connection. ", err)
close(messageChan) // Closing the channel to signal completion
go s.Restart()
return
}
messageChan <- message
}
}
}()
Expand All @@ -227,30 +227,32 @@ func (s *TcpTransport) channelHandler() {
case <-s.reqNewConnChan:
err := utils.SendBinaryByte(s.controlChannel, utils.SG_Chan)
if err != nil {
s.logger.Error("error sending channel signal, attempting to restart server...")
s.logger.Error("failed to send request new connection signal. ", err)
go s.Restart()
return
}

case <-ticker.C:
err := utils.SendBinaryByte(s.controlChannel, utils.SG_HB)
if err != nil {
s.logger.Error("failed to send heartbeat signal, attempting to restart server...")
s.logger.Error("failed to send heartbeat signal")
go s.Restart()
return
}
s.logger.Trace("heartbeat signal sent successfully")

case result := <-resultChan:
if result.err != nil {
s.logger.Errorf("failed to receive message from channel connection: %v", result.err)
go s.Restart()
case message, ok := <-messageChan:
if !ok {
s.logger.Error("channel closed, likely due to an error in TCP read")
return
} else if result.message == utils.SG_Closed {
s.logger.Fatal("control channel has been closed by the client")
}

if message == utils.SG_Closed {
s.logger.Warn("control channel has been closed by the client")
go s.Restart()
return
} else if result.message == utils.SG_RTT {

} else if message == utils.SG_RTT {
measureRTT := time.Since(rtt)
s.rtt = measureRTT.Milliseconds()
s.logger.Infof("Round Trip Time (RTT): %d ms", s.rtt)
Expand Down
34 changes: 18 additions & 16 deletions internal/server/transport/tcpmux.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,48 +205,50 @@ func (s *TcpMuxTransport) channelHandler() {
defer ticker.Stop()

// Channel to receive the message or error
resultChan := make(chan struct {
message byte
err error
})
messageChan := make(chan byte, 1)

go func() {
message, err := utils.ReceiveBinaryByte(s.controlChannel)
resultChan <- struct {
message byte
err error
}{message, err}
if err != nil {
s.logger.Error("failed to read from channel connection. ", err)
close(messageChan) // Closing the channel to signal completion
go s.Restart()
return
}
messageChan <- message
}()

for {
select {
case <-s.ctx.Done():
_ = utils.SendBinaryByte(s.controlChannel, utils.SG_Closed)
return

case <-s.reqNewConnChan:
err := utils.SendBinaryByte(s.controlChannel, utils.SG_Chan)
if err != nil {
s.logger.Error("error sending channel signal, attempting to restart server...")
s.logger.Error("failed to send request new connection signal. ", err)
go s.Restart()
return
}

case <-ticker.C:
err := utils.SendBinaryByte(s.controlChannel, utils.SG_HB)
if err != nil {
s.logger.Error("failed to send heartbeat signal, attempting to restart server...")
s.logger.Error("failed to send heartbeat signal")
go s.Restart()
return
}
s.logger.Trace("heartbeat signal sent successfully")

case result := <-resultChan:
if result.err != nil {
s.logger.Errorf("failed to receive message from channel connection: %v", result.err)
go s.Restart()
case message, ok := <-messageChan:
if !ok {
s.logger.Error("channel closed, likely due to an error in TCP read")
return
}
if result.message == utils.SG_Closed {
s.logger.Info("control channel has been closed by the client")

if message == utils.SG_Closed {
s.logger.Warn("control channel has been closed by the client")
go s.Restart()
return
}
Expand Down
4 changes: 2 additions & 2 deletions internal/server/transport/ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (s *WsTransport) channelHandler() {

// Handle specific control channel messages
if bytes.Equal(message, []byte{utils.SG_Closed}) {
s.logger.Info("control channel has been closed by the client")
s.logger.Warn("control channel has been closed by the client")
s.Restart()
return
}
Expand Down Expand Up @@ -218,7 +218,7 @@ func (s *WsTransport) tunnelListener() {

if r.URL.Path == "/channel" {
if s.controlChannel != nil {
s.logger.Info("new control channel requested. restarting server...")
s.logger.Warn("new control channel requested.")
s.controlChannel.Close()
conn.Close()
go s.Restart()
Expand Down
4 changes: 2 additions & 2 deletions internal/server/transport/wsmux.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (s *WsMuxTransport) channelHandler() {

// Handle specific control channel messages
if bytes.Equal(message, []byte{utils.SG_Closed}) {
s.logger.Info("control channel has been closed by the client")
s.logger.Warn("control channel has been closed by the client")
s.Restart()
return
}
Expand Down Expand Up @@ -234,7 +234,7 @@ func (s *WsMuxTransport) tunnelListener() {

if r.URL.Path == "/channel" {
if s.controlChannel != nil {
s.logger.Info("new control channel requested. restarting server...")
s.logger.Warn("new control channel requested.")
s.controlChannel.Close()
conn.Close()
go s.Restart()
Expand Down

0 comments on commit dde7951

Please sign in to comment.