Skip to content

Commit

Permalink
discard timeouted (after 3000ms) local connections to improve tunnel …
Browse files Browse the repository at this point in the history
…stabillity
  • Loading branch information
Musixal committed Oct 30, 2024
1 parent 3ca7658 commit d057cd7
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 13 deletions.
5 changes: 3 additions & 2 deletions internal/server/transport/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ type TunnelChannel struct { // for websocket
}

type LocalTCPConn struct {
conn net.Conn
remoteAddr string
conn net.Conn
remoteAddr string
timeCreated int64
}

type LocalAcceptUDPConn struct {
Expand Down
8 changes: 7 additions & 1 deletion internal/server/transport/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ func (s *TcpTransport) acceptLocalConn(listener net.Listener, remoteAddr string)
}

select {
case s.localChannel <- LocalTCPConn{conn: conn, remoteAddr: remoteAddr}:
case s.localChannel <- LocalTCPConn{conn: conn, remoteAddr: remoteAddr, timeCreated: time.Now().UnixMilli()}:

select {
case s.reqNewConnChan <- struct{}{}:
Expand Down Expand Up @@ -521,6 +521,12 @@ func (s *TcpTransport) handleLoop() {
case localConn := <-s.localChannel:
loop:
for {
if time.Now().UnixMilli()-localConn.timeCreated > 3000 { // 3000ms
s.logger.Debugf("timeouted local connection: %d ms", time.Now().UnixMilli()-localConn.timeCreated)
localConn.conn.Close()
break loop
}

select {
case <-s.ctx.Done():
return
Expand Down
14 changes: 10 additions & 4 deletions internal/server/transport/tcpmux.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ func (s *TcpMuxTransport) acceptLocalConn(listener net.Listener, remoteAddr stri
}

select {
case s.localChannel <- LocalTCPConn{conn: conn, remoteAddr: remoteAddr}:
case s.localChannel <- LocalTCPConn{conn: conn, remoteAddr: remoteAddr, timeCreated: time.Now().UnixMilli()}:
s.logger.Debugf("accepted incoming TCP connection from %s", tcpConn.RemoteAddr().String())

default: // channel is full, discard the connection
Expand Down Expand Up @@ -549,15 +549,21 @@ func (s *TcpMuxTransport) handleSession(session *smux.Session, next chan struct{
}
s.logger.Tracef("stream counter: %v, session counter: %v", atomic.LoadInt32(&s.streamCounter), atomic.LoadInt32(&s.sessionCounter))

// +1 for Muxed connections counter
done <- struct{}{}

select {
case <-s.ctx.Done():
session.Close()
return

case incomingConn := <-s.localChannel:
if time.Now().UnixMilli()-incomingConn.timeCreated > 3000 { // 3000ms
s.logger.Debugf("timeouted local connection: %d ms", time.Now().UnixMilli()-incomingConn.timeCreated)
incomingConn.conn.Close()
continue
}

// +1 for mux connection counter
done <- struct{}{}

// +1 for stream counter
atomic.AddInt32(&s.streamCounter, 1)

Expand Down
7 changes: 6 additions & 1 deletion internal/server/transport/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ func (s *UdpTransport) localListener(localAddr, remoteAddr string) {

// Build the UDP connection object
newUDPConn := LocalUDPConn{
timeCreated: time.Now().UnixNano(), // Just for debugging
timeCreated: time.Now().UnixMilli(), // Just for debugging
payload: payloadChan,
remoteAddr: remoteAddr,
listener: listener,
Expand Down Expand Up @@ -568,6 +568,11 @@ func (s *UdpTransport) handleLoop(udpChan chan *LocalUDPConn, activeConnections
case <-s.ctx.Done():
return
case localConn := <-udpChan:
if time.Now().UnixMilli()-localConn.timeCreated > 3000 { // 3000ms
s.logger.Debugf("timeouted local connection: %d ms", time.Now().UnixMilli()-localConn.timeCreated)
continue
}

loop:
for {
select {
Expand Down
8 changes: 7 additions & 1 deletion internal/server/transport/ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ func (s *WsTransport) acceptLocalConn(listener net.Listener, remoteAddr string)
}

select {
case s.localChannel <- LocalTCPConn{conn: conn, remoteAddr: remoteAddr}:
case s.localChannel <- LocalTCPConn{conn: conn, remoteAddr: remoteAddr, timeCreated: time.Now().UnixMilli()}:

select {
case s.reqNewConnChan <- struct{}{}:
Expand Down Expand Up @@ -482,6 +482,12 @@ func (s *WsTransport) handleLoop() {
case localConn := <-s.localChannel:
loop:
for {
if time.Now().UnixMilli()-localConn.timeCreated > 3000 { // 3000ms
s.logger.Debugf("timeouted local connection: %d ms", time.Now().UnixMilli()-localConn.timeCreated)
localConn.conn.Close()
break loop
}

select {
case <-s.ctx.Done():
return
Expand Down
14 changes: 10 additions & 4 deletions internal/server/transport/wsmux.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ func (s *WsMuxTransport) acceptLocalConn(listener net.Listener, remoteAddr strin
}

select {
case s.localChannel <- LocalTCPConn{conn: conn, remoteAddr: remoteAddr}:
case s.localChannel <- LocalTCPConn{conn: conn, remoteAddr: remoteAddr, timeCreated: time.Now().UnixMilli()}:
s.logger.Debugf("accepted incoming TCP connection from %s", tcpConn.RemoteAddr().String())

default: // channel is full, discard the connection
Expand Down Expand Up @@ -522,15 +522,21 @@ func (s *WsMuxTransport) handleSession(session *smux.Session, next chan struct{}
}
s.logger.Tracef("stream counter: %v, session counter: %v", atomic.LoadInt32(&s.streamCounter), atomic.LoadInt32(&s.sessionCounter))

// +1 for Muxed connections counter
done <- struct{}{}

select {
case <-s.ctx.Done():
session.Close()
return

case incomingConn := <-s.localChannel:
if time.Now().UnixMilli()-incomingConn.timeCreated > 3000 { // 3000ms
s.logger.Debugf("timeouted local connection: %d ms", time.Now().UnixMilli()-incomingConn.timeCreated)
incomingConn.conn.Close()
continue
}

// +1 for Muxed connections counter
done <- struct{}{}

// +1 for stream counter
atomic.AddInt32(&s.streamCounter, 1)

Expand Down

0 comments on commit d057cd7

Please sign in to comment.