Skip to content

Commit

Permalink
Revert "[fix] better pool management"
Browse files Browse the repository at this point in the history
This reverts commit e89a8d1.
  • Loading branch information
Musixal committed Oct 7, 2024
1 parent 08ec615 commit 5af1ed5
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 36 deletions.
11 changes: 2 additions & 9 deletions internal/client/transport/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ type TcpTransport struct {
usageMonitor *web.Usage
restartMutex sync.Mutex
activeConnections int32
lastRequest time.Time
}
type TcpConfig struct {
RemoteAddr string
Expand Down Expand Up @@ -54,7 +53,6 @@ func NewTCPClient(parentCtx context.Context, config *TcpConfig, logger *logrus.L
controlChannel: nil, // will be set when a control connection is established
usageMonitor: web.NewDataStore(fmt.Sprintf(":%v", config.WebPort), ctx, config.SnifferLog, config.Sniffer, &config.TunnelStatus, logger),
activeConnections: 0,
lastRequest: time.Now(),
}

return client
Expand Down Expand Up @@ -92,7 +90,6 @@ func (c *TcpTransport) Restart() {
c.usageMonitor = web.NewDataStore(fmt.Sprintf(":%v", c.config.WebPort), ctx, c.config.SnifferLog, c.config.Sniffer, &c.config.TunnelStatus, c.logger)
c.config.TunnelStatus = ""
c.activeConnections = 0
c.lastRequest = time.Now()

go c.Start()

Expand Down Expand Up @@ -164,7 +161,7 @@ func (c *TcpTransport) channelDialer() {
}

func (c *TcpTransport) poolMaintainer() {
ticker := time.NewTicker(time.Millisecond * 500)
ticker := time.NewTicker(time.Millisecond * 350)
defer ticker.Stop()

for {
Expand All @@ -173,12 +170,9 @@ func (c *TcpTransport) poolMaintainer() {
return

case <-ticker.C:
if time.Since(c.lastRequest).Milliseconds() < 500 {
continue
}
activeConnections := int(c.activeConnections)
c.logger.Tracef("active connections: %d", c.activeConnections)
if activeConnections < c.config.ConnectionPool {
if activeConnections < c.config.ConnectionPool/2 {
neededConn := c.config.ConnectionPool - activeConnections
for i := 0; i < neededConn; i++ {
go c.tunnelDialer()
Expand Down Expand Up @@ -218,7 +212,6 @@ func (c *TcpTransport) channelHandler() {
switch msg {
case utils.SG_Chan:
c.logger.Debug("channel signal received, initiating tunnel dialer")
c.lastRequest = time.Now()
go c.tunnelDialer()
case utils.SG_HB:
c.logger.Debug("heartbeat signal received successfully")
Expand Down
11 changes: 2 additions & 9 deletions internal/client/transport/tcpmux.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ type TcpMuxTransport struct {
usageMonitor *web.Usage
restartMutex sync.Mutex
activeConnections int32
lastRequest time.Time
}

type TcpMuxConfig struct {
Expand Down Expand Up @@ -69,7 +68,6 @@ func NewMuxClient(parentCtx context.Context, config *TcpMuxConfig, logger *logru
controlChannel: nil, // will be set when a control connection is established
activeConnections: 0,
usageMonitor: web.NewDataStore(fmt.Sprintf(":%v", config.WebPort), ctx, config.SnifferLog, config.Sniffer, &config.TunnelStatus, logger),
lastRequest: time.Now(),
}

return client
Expand Down Expand Up @@ -108,7 +106,6 @@ func (c *TcpMuxTransport) Restart() {
c.usageMonitor = web.NewDataStore(fmt.Sprintf(":%v", c.config.WebPort), ctx, c.config.SnifferLog, c.config.Sniffer, &c.config.TunnelStatus, c.logger)
c.config.TunnelStatus = ""
c.activeConnections = 0
c.lastRequest = time.Now()

go c.Start()

Expand Down Expand Up @@ -179,7 +176,7 @@ func (c *TcpMuxTransport) channelDialer() {
}

func (c *TcpMuxTransport) poolMaintainer() {
ticker := time.NewTicker(time.Millisecond * 500)
ticker := time.NewTicker(time.Millisecond * 350)
defer ticker.Stop()

for {
Expand All @@ -188,12 +185,9 @@ func (c *TcpMuxTransport) poolMaintainer() {
return

case <-ticker.C:
if time.Since(c.lastRequest).Milliseconds() < 500 {
continue
}
activeConnections := int(c.activeConnections)
c.logger.Tracef("active connections: %d", c.activeConnections)
if activeConnections < c.config.ConnectionPool {
if activeConnections < c.config.ConnectionPool/2 {
neededConn := c.config.ConnectionPool - activeConnections
for i := 0; i < neededConn; i++ {
go c.tunnelDialer()
Expand Down Expand Up @@ -233,7 +227,6 @@ func (c *TcpMuxTransport) channelHandler() {
switch msg {
case utils.SG_Chan:
c.logger.Debug("channel signal received, initiating tunnel dialer")
c.lastRequest = time.Now()
go c.tunnelDialer()

case utils.SG_Closed:
Expand Down
11 changes: 2 additions & 9 deletions internal/client/transport/ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ type WsTransport struct {
restartMutex sync.Mutex
usageMonitor *web.Usage
activeConnections int32
lastRequest time.Time
}
type WsConfig struct {
RemoteAddr string
Expand Down Expand Up @@ -57,7 +56,6 @@ func NewWSClient(parentCtx context.Context, config *WsConfig, logger *logrus.Log
controlChannel: nil, // will be set when a control connection is established
usageMonitor: web.NewDataStore(fmt.Sprintf(":%v", config.WebPort), ctx, config.SnifferLog, config.Sniffer, &config.TunnelStatus, logger),
activeConnections: 0,
lastRequest: time.Now(),
}

return client
Expand Down Expand Up @@ -97,7 +95,6 @@ func (c *WsTransport) Restart() {
c.usageMonitor = web.NewDataStore(fmt.Sprintf(":%v", c.config.WebPort), ctx, c.config.SnifferLog, c.config.Sniffer, &c.config.TunnelStatus, c.logger)
c.config.TunnelStatus = ""
c.activeConnections = 0
c.lastRequest = time.Now()

go c.Start()

Expand Down Expand Up @@ -131,7 +128,7 @@ func (c *WsTransport) channelDialer() {
}

func (c *WsTransport) poolMaintainer() {
ticker := time.NewTicker(time.Millisecond * 500)
ticker := time.NewTicker(time.Millisecond * 350)
defer ticker.Stop()

for {
Expand All @@ -140,12 +137,9 @@ func (c *WsTransport) poolMaintainer() {
return

case <-ticker.C:
if time.Since(c.lastRequest).Milliseconds() < 500 {
continue
}
activeConnections := int(c.activeConnections)
c.logger.Tracef("active connections: %d", c.activeConnections)
if activeConnections < c.config.ConnectionPool {
if activeConnections < c.config.ConnectionPool/2 {
neededConn := c.config.ConnectionPool - activeConnections
for i := 0; i < neededConn; i++ {
go c.tunnelDialer()
Expand Down Expand Up @@ -184,7 +178,6 @@ func (c *WsTransport) channelHandler() {
switch msg {
case utils.SG_Chan:
c.logger.Debug("channel signal received, initiating tunnel dialer")
c.lastRequest = time.Now()
go c.tunnelDialer()
case utils.SG_Closed:
c.logger.Info("control channel has been closed by the server")
Expand Down
11 changes: 2 additions & 9 deletions internal/client/transport/wsmux.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ type WsMuxTransport struct {
usageMonitor *web.Usage
restartMutex sync.Mutex
activeConnections int32
lastRequest time.Time
}
type WsMuxConfig struct {
RemoteAddr string
Expand Down Expand Up @@ -70,7 +69,6 @@ func NewWSMuxClient(parentCtx context.Context, config *WsMuxConfig, logger *logr
controlChannel: nil, // will be set when a control connection is established
activeConnections: 0,
usageMonitor: web.NewDataStore(fmt.Sprintf(":%v", config.WebPort), ctx, config.SnifferLog, config.Sniffer, &config.TunnelStatus, logger),
lastRequest: time.Now(),
}

return client
Expand Down Expand Up @@ -109,7 +107,6 @@ func (c *WsMuxTransport) Restart() {
c.usageMonitor = web.NewDataStore(fmt.Sprintf(":%v", c.config.WebPort), ctx, c.config.SnifferLog, c.config.Sniffer, &c.config.TunnelStatus, c.logger)
c.config.TunnelStatus = ""
c.activeConnections = 0
c.lastRequest = time.Now()

go c.Start()

Expand Down Expand Up @@ -144,7 +141,7 @@ func (c *WsMuxTransport) channelDialer() {
}

func (c *WsMuxTransport) poolMaintainer() {
ticker := time.NewTicker(time.Millisecond * 500)
ticker := time.NewTicker(time.Millisecond * 350)
defer ticker.Stop()

for {
Expand All @@ -153,12 +150,9 @@ func (c *WsMuxTransport) poolMaintainer() {
return

case <-ticker.C:
if time.Since(c.lastRequest).Milliseconds() < 500 {
continue
}
activeConnections := int(c.activeConnections)
c.logger.Tracef("active connections: %d", c.activeConnections)
if activeConnections < c.config.ConnectionPool {
if activeConnections < c.config.ConnectionPool/2 {
neededConn := c.config.ConnectionPool - activeConnections
for i := 0; i < neededConn; i++ {
go c.tunnelDialer()
Expand Down Expand Up @@ -198,7 +192,6 @@ func (c *WsMuxTransport) channelHandler() {
switch msg {
case utils.SG_Chan:
c.logger.Debug("channel signal received, initiating tunnel dialer")
c.lastRequest = time.Now()
go c.tunnelDialer()
case utils.SG_HB:
c.logger.Debug("heartbeat received successfully")
Expand Down

0 comments on commit 5af1ed5

Please sign in to comment.