From 09962ccb04b7f184f88d1740bba2c68c4de4bffd Mon Sep 17 00:00:00 2001 From: yutianwu Date: Wed, 8 Dec 2021 16:42:59 +0800 Subject: [PATCH] add timeout for stopping p2p server --- eth/peerset.go | 36 ++++++++++++++++++++++++++++++++++-- p2p/server.go | 18 +++++++++++++++++- 2 files changed, 51 insertions(+), 3 deletions(-) diff --git a/eth/peerset.go b/eth/peerset.go index 220b01d832..dc7807a109 100644 --- a/eth/peerset.go +++ b/eth/peerset.go @@ -20,6 +20,7 @@ import ( "errors" "math/big" "sync" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/eth/downloader" @@ -38,6 +39,9 @@ var ( // to the peer set, but one with the same id already exists. errPeerAlreadyRegistered = errors.New("peer already registered") + // errPeerWaitTimeout is returned if a peer waits extension for too long + errPeerWaitTimeout = errors.New("peer wait timeout") + // errPeerNotRegistered is returned if a peer is attempted to be removed from // a peer set, but no peer with the given id exists. errPeerNotRegistered = errors.New("peer not registered") @@ -51,6 +55,12 @@ var ( errDiffWithoutEth = errors.New("peer connected on diff without compatible eth support") ) +const ( + // extensionWaitTimeout is the maximum allowed time for the extension wait to + // complete before dropping the connection.= as malicious. + extensionWaitTimeout = 5 * time.Second +) + // peerSet represents the collection of active peers currently participating in // the `eth` protocol, with or without the `snap` extension. type peerSet struct { @@ -169,7 +179,18 @@ func (ps *peerSet) waitSnapExtension(peer *eth.Peer) (*snap.Peer, error) { ps.snapWait[id] = wait ps.lock.Unlock() - return <-wait, nil + select { + case peer := <-wait: + return peer, nil + + case <-time.After(extensionWaitTimeout): + ps.lock.Lock() + if _, ok := ps.snapWait[id]; ok { + delete(ps.snapWait, id) + } + ps.lock.Unlock() + return nil, errPeerWaitTimeout + } } // waitDiffExtension blocks until all satellite protocols are connected and tracked @@ -203,7 +224,18 @@ func (ps *peerSet) waitDiffExtension(peer *eth.Peer) (*diff.Peer, error) { ps.diffWait[id] = wait ps.lock.Unlock() - return <-wait, nil + select { + case peer := <-wait: + return peer, nil + + case <-time.After(extensionWaitTimeout): + ps.lock.Lock() + if _, ok := ps.diffWait[id]; ok { + delete(ps.diffWait, id) + } + ps.lock.Unlock() + return nil, errPeerWaitTimeout + } } func (ps *peerSet) GetDiffPeer(pid string) downloader.IDiffPeer { diff --git a/p2p/server.go b/p2p/server.go index dbaee12ea1..fe58c9f160 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -63,6 +63,9 @@ const ( // Maximum amount of time allowed for writing a complete message. frameWriteTimeout = 20 * time.Second + + // Maximum time to wait before stop the p2p server + stopTimeout = 5 * time.Second ) var errServerStopped = errors.New("server stopped") @@ -403,7 +406,20 @@ func (srv *Server) Stop() { } close(srv.quit) srv.lock.Unlock() - srv.loopWG.Wait() + + stopChan := make(chan struct{}) + defer close(stopChan) + + go func() { + srv.loopWG.Wait() + stopChan <- struct{}{} + }() + + select { + case <-stopChan: + case <-time.After(stopTimeout): + log.Warn("stop p2p server timeout, forcing stop") + } } // sharedUDPConn implements a shared connection. Write sends messages to the underlying connection while read returns