Skip to content

Commit

Permalink
issue #74: bug fix
Browse files Browse the repository at this point in the history
  • Loading branch information
yinqiwen committed Mar 7, 2018
1 parent 7d1791f commit e10e6cc
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 9 deletions.
15 changes: 11 additions & 4 deletions common/channel/direct/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ import (

type directStream struct {
net.Conn
conf *channel.ProxyChannelConfig
addr string
session *directMuxSession
proxyServer string
conf *channel.ProxyChannelConfig
addr string
session *directMuxSession
proxyServer string
latestIOTime time.Time
}

func (tc *directStream) Auth(req *mux.AuthRequest) error {
Expand Down Expand Up @@ -98,16 +99,22 @@ func (tc *directStream) StreamID() uint32 {
return 0
}

func (s *directStream) LatestIOTime() time.Time {
return s.latestIOTime
}

func (tc *directStream) Read(p []byte) (int, error) {
if nil == tc.Conn {
return 0, io.EOF
}
tc.latestIOTime = time.Now()
return tc.Conn.Read(p)
}
func (tc *directStream) Write(p []byte) (int, error) {
if nil == tc.Conn {
return 0, io.EOF
}
tc.latestIOTime = time.Now()
return tc.Conn.Write(p)
}

Expand Down
13 changes: 10 additions & 3 deletions common/channel/ssh/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,14 @@ import (

type sshStream struct {
net.Conn
conf *channel.ProxyChannelConfig
addr string
session *sshMuxSession
conf *channel.ProxyChannelConfig
addr string
session *sshMuxSession
latestIOTime time.Time
}

func (s *sshStream) LatestIOTime() time.Time {
return s.latestIOTime
}

func (tc *sshStream) Auth(req *mux.AuthRequest) error {
Expand Down Expand Up @@ -56,12 +61,14 @@ func (tc *sshStream) Read(p []byte) (int, error) {
if nil == tc.Conn {
return 0, io.EOF
}
tc.latestIOTime = time.Now()
return tc.Conn.Read(p)
}
func (tc *sshStream) Write(p []byte) (int, error) {
if nil == tc.Conn {
return 0, io.EOF
}
tc.latestIOTime = time.Now()
return tc.Conn.Write(p)
}

Expand Down
18 changes: 16 additions & 2 deletions common/mux/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ type MuxStream interface {
StreamID() uint32
SetReadDeadline(t time.Time) error
SetWriteDeadline(t time.Time) error
LatestIOTime() time.Time
}

type MuxSession interface {
Expand All @@ -115,8 +116,21 @@ type MuxSession interface {

type ProxyMuxStream struct {
TimeoutReadWriteCloser
session MuxSession
sessionID int64
session MuxSession
sessionID int64
latestIOTime time.Time
}

func (s *ProxyMuxStream) Read(p []byte) (int, error) {
s.latestIOTime = time.Now()
return s.Read(p)
}
func (s *ProxyMuxStream) Write(p []byte) (int, error) {
s.latestIOTime = time.Now()
return s.Write(p)
}
func (s *ProxyMuxStream) LatestIOTime() time.Time {
return s.latestIOTime
}

func (s *ProxyMuxStream) StreamID() uint32 {
Expand Down
23 changes: 23 additions & 0 deletions local/local_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,25 @@ START:
io.CopyBuffer(localConn, streamReader, buf)
closeCh <- 1
}()

//start task to check stream timeout(if the stream has no read&write action more than 10s)
timeoutTicker := time.NewTicker(2 * time.Second)
stopTicker := make(chan bool, 1)
go func() {
for {
select {
case <-timeoutTicker.C:
if time.Now().Sub(stream.LatestIOTime()) > 10*time.Second {
localConn.Close()
stream.Close()
logger.Error("Close stream[%d] since it's not active since %v ago.", ssid, time.Now().Sub(stream.LatestIOTime()))
return
}
case <-stopTicker:
return
}
}
}()
if (isSocksProxy || isHttpsProxy || isTransparentProxy) && nil == initialHTTPReq {
buf := make([]byte, 128*1024)
io.CopyBuffer(streamWriter, bufconn, buf)
Expand All @@ -250,6 +269,9 @@ START:
//localConn.SetReadDeadline(time.Now().Add(5 * time.Second))
proxyReq, err = http.ReadRequest(bufconn)
if nil != err {
if err, ok := err.(net.Error); ok && err.Timeout() {

}
if err != io.EOF && !strings.Contains(err.Error(), "use of closed network connection") {
logger.Notice("Failed to read proxy http request to %s:%s for reason:%v", remoteHost, remotePort, err)
}
Expand All @@ -263,6 +285,7 @@ START:
}
}
<-closeCh
close(stopTicker)
}

func startLocalProxyServer(proxyIdx int) (*net.TCPListener, error) {
Expand Down

0 comments on commit e10e6cc

Please sign in to comment.