Skip to content

Commit

Permalink
fix(network): set dial and accept limit in connection gater (#1089)
Browse files Browse the repository at this point in the history
  • Loading branch information
b00f authored Feb 9, 2024
1 parent 7c4bb85 commit 2d9dd44
Show file tree
Hide file tree
Showing 8 changed files with 138 additions and 70 deletions.
4 changes: 0 additions & 4 deletions network/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,3 @@ func (conf *Config) ScaledMaxConns() int {
func (conf *Config) ScaledMinConns() int {
return conf.ScaledMaxConns() / 4
}

func (conf *Config) ConnsThreshold() int {
return conf.ScaledMaxConns() / 8
}
25 changes: 10 additions & 15 deletions network/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,32 +136,27 @@ func TestIsBootstrapper(t *testing.T) {

func TestScaledConns(t *testing.T) {
tests := []struct {
config Config
expectedMax int
expectedMin int
expectedThreshold int
config Config
expectedMax int
expectedMin int
}{
{Config{MaxConns: 1}, 1, 0, 0},
{Config{MaxConns: 8}, 8, 2, 1},
{Config{MaxConns: 30}, 32, 8, 4},
{Config{MaxConns: 1000}, 1024, 256, 128},
{Config{MaxConns: 1}, 1, 0},
{Config{MaxConns: 8}, 8, 2},
{Config{MaxConns: 30}, 32, 8},
{Config{MaxConns: 1000}, 1024, 256},
}

for _, test := range tests {
resultMax := test.config.ScaledMaxConns()
resultMin := test.config.ScaledMinConns()
resultThreshold := test.config.ConnsThreshold()
if resultMax != test.expectedMax ||
resultMin != test.expectedMin ||
resultThreshold != test.expectedThreshold {
resultMin != test.expectedMin {
t.Errorf("For MaxConns %d, "+
"NormedMaxConns() returned %d (expected %d), "+
"NormedMinConns() returned %d (expected %d), "+
"ConnsThreshold() returned %d (expected %d)",
"NormedMinConns() returned %d (expected %d)",
test.config.MaxConns,
resultMax, test.expectedMax,
resultMin, test.expectedMin,
resultThreshold, test.expectedThreshold)
resultMin, test.expectedMin)
}
}
}
Expand Down
48 changes: 31 additions & 17 deletions network/gater.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ var _ lp2pconnmgr.ConnectionGater = &ConnectionGater{}
type ConnectionGater struct {
lk sync.RWMutex

filters *multiaddr.Filters
peerMgr *peerMgr
connsLimit int
logger *logger.SubLogger
filters *multiaddr.Filters
peerMgr *peerMgr
acceptLimit int
dialLimit int
logger *logger.SubLogger
}

func NewConnectionGater(conf *Config, log *logger.SubLogger) (*ConnectionGater, error) {
Expand All @@ -29,13 +30,15 @@ func NewConnectionGater(conf *Config, log *logger.SubLogger) (*ConnectionGater,
filters = SubnetsToFilters(privateSubnets, multiaddr.ActionDeny)
}

connsLimit := conf.ScaledMaxConns() + conf.ConnsThreshold()
log.Info("connection gater created", "connsLimit", connsLimit)
acceptLimit := conf.ScaledMaxConns()
dialLimit := conf.ScaledMaxConns() / 4
log.Info("connection gater created", "listen", acceptLimit, "dial", dialLimit)

return &ConnectionGater{
filters: filters,
connsLimit: connsLimit,
logger: log,
filters: filters,
acceptLimit: acceptLimit,
dialLimit: dialLimit,
logger: log,
}, nil
}

Expand All @@ -46,20 +49,29 @@ func (g *ConnectionGater) SetPeerManager(peerMgr *peerMgr) {
g.peerMgr = peerMgr
}

func (g *ConnectionGater) onConnectionLimit() bool {
func (g *ConnectionGater) onDialLimit() bool {
if g.peerMgr == nil {
return false
}

return g.peerMgr.NumOfConnected() > g.connsLimit
return g.peerMgr.NumOutbound() > g.dialLimit
}

func (g *ConnectionGater) onAcceptLimit() bool {
if g.peerMgr == nil {
return false
}

return g.peerMgr.NumInbound() > g.acceptLimit
}

func (g *ConnectionGater) InterceptPeerDial(pid lp2ppeer.ID) bool {
g.lk.RLock()
defer g.lk.RUnlock()

if g.onConnectionLimit() {
g.logger.Info("InterceptPeerDial rejected: many connections", "pid", pid)
if g.onDialLimit() {
g.logger.Info("InterceptPeerDial rejected: many connections",
"pid", pid, "outbound", g.peerMgr.NumOutbound())

return false
}
Expand All @@ -71,8 +83,9 @@ func (g *ConnectionGater) InterceptAddrDial(pid lp2ppeer.ID, ma multiaddr.Multia
g.lk.RLock()
defer g.lk.RUnlock()

if g.onConnectionLimit() {
g.logger.Info("InterceptAddrDial rejected: many connections", "pid", pid, "ma", ma.String())
if g.onDialLimit() {
g.logger.Info("InterceptAddrDial rejected: many connections",
"pid", pid, "ma", ma.String(), "outbound", g.peerMgr.NumOutbound())

return false
}
Expand All @@ -91,8 +104,9 @@ func (g *ConnectionGater) InterceptAccept(cma lp2pnetwork.ConnMultiaddrs) bool {
g.lk.RLock()
defer g.lk.RUnlock()

if g.onConnectionLimit() {
g.logger.Info("InterceptAccept rejected: many connections")
if g.onAcceptLimit() {
g.logger.Info("InterceptAccept rejected: many connections",
"inbound", g.peerMgr.NumInbound())

return false
}
Expand Down
27 changes: 17 additions & 10 deletions network/gater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,31 +61,38 @@ func TestDenyPrivate(t *testing.T) {
func TestMaxConnection(t *testing.T) {
ts := testsuite.NewTestSuite(t)
conf := testConfig()
conf.MaxConns = 8
assert.Equal(t, conf.ScaledMinConns(), 2)
assert.Equal(t, conf.ScaledMaxConns(), 8)
assert.Equal(t, conf.ConnsThreshold(), 1)
conf.MaxConns = 4
assert.Equal(t, conf.ScaledMaxConns(), 4)
net := makeTestNetwork(t, conf, nil)

maPrivate := multiaddr.StringCast("/ip4/127.0.0.1/tcp/1234")
maPublic := multiaddr.StringCast("/ip4/8.8.8.8/tcp/1234")
aMultiAddr := multiaddr.StringCast("/ip4/1.1.1.1/tcp/1234")
cmaPrivate := &mockConnMultiaddrs{remote: maPrivate}
cmaPublic := &mockConnMultiaddrs{remote: maPublic}
pid := ts.RandPeerID()

for i := 0; i < 9; i++ {
net.peerMgr.AddPeer(ts.RandPeerID(),
multiaddr.StringCast("/ip4/1.1.1.1/tcp/1234"), lp2pnetwork.DirInbound)
}
net.peerMgr.AddPeer(ts.RandPeerID(), aMultiAddr, lp2pnetwork.DirOutbound)
net.peerMgr.AddPeer(ts.RandPeerID(), aMultiAddr, lp2pnetwork.DirInbound)
net.peerMgr.AddPeer(ts.RandPeerID(), aMultiAddr, lp2pnetwork.DirInbound)
net.peerMgr.AddPeer(ts.RandPeerID(), aMultiAddr, lp2pnetwork.DirInbound)
net.peerMgr.AddPeer(ts.RandPeerID(), aMultiAddr, lp2pnetwork.DirInbound)

assert.True(t, net.connGater.InterceptPeerDial(pid))
assert.True(t, net.connGater.InterceptAddrDial(pid, maPrivate))
assert.True(t, net.connGater.InterceptAddrDial(pid, maPublic))
assert.True(t, net.connGater.InterceptAccept(cmaPrivate))
assert.True(t, net.connGater.InterceptAccept(cmaPublic))

net.peerMgr.AddPeer(ts.RandPeerID(),
multiaddr.StringCast("/ip4/1.1.1.1/tcp/1234"), lp2pnetwork.DirInbound)
net.peerMgr.AddPeer(ts.RandPeerID(), aMultiAddr, lp2pnetwork.DirOutbound)

assert.False(t, net.connGater.InterceptPeerDial(pid))
assert.False(t, net.connGater.InterceptAddrDial(pid, maPrivate))
assert.False(t, net.connGater.InterceptAddrDial(pid, maPublic))
assert.True(t, net.connGater.InterceptAccept(cmaPrivate))
assert.True(t, net.connGater.InterceptAccept(cmaPublic))

net.peerMgr.AddPeer(ts.RandPeerID(), aMultiAddr, lp2pnetwork.DirInbound)

assert.False(t, net.connGater.InterceptPeerDial(pid))
assert.False(t, net.connGater.InterceptAddrDial(pid, maPrivate))
Expand Down
8 changes: 5 additions & 3 deletions network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,9 @@ func newNetwork(conf *Config, log *logger.SubLogger, opts []lp2p.Option) (*netwo
// The connection manager doesn't reject any connections.
// It just triggers a pruning run once the high watermark is reached (or surpassed).

lowWM := conf.ScaledMinConns() // Low Watermark
highWM := conf.ScaledMaxConns() - conf.ConnsThreshold() // High Watermark
//
lowWM := conf.ScaledMinConns() // Low Watermark, ex: 64 (max)
highWM := conf.ScaledMaxConns() - conf.ScaledMinConns() // High Watermark, ex: 64 (max) - 16 (min) = 48
connMgr, err := lp2pconnmgr.NewConnManager(
lowWM, highWM,
lp2pconnmgr.WithGracePeriod(time.Minute),
Expand Down Expand Up @@ -272,7 +273,8 @@ func newNetwork(conf *Config, log *logger.SubLogger, opts []lp2p.Option) (*netwo
self.logger.Info("network setup", "id", self.host.ID(),
"name", conf.NetworkName,
"address", conf.ListenAddrs(),
"bootstrapper", conf.IsBootstrapper)
"bootstrapper", conf.IsBootstrapper,
"maxConns", conf.MaxConns)

return self, nil
}
Expand Down
56 changes: 44 additions & 12 deletions network/peermgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ type peerMgr struct {
bootstrapAddrs []lp2ppeer.AddrInfo
minConns int
maxConns int
numInbound int
numOutbound int
host lp2phost.Host
peers map[lp2ppeer.ID]*peerInfo
logger *logger.SubLogger
Expand Down Expand Up @@ -70,18 +72,20 @@ func (mgr *peerMgr) Start() {
func (mgr *peerMgr) Stop() {
}

func (mgr *peerMgr) NumOfConnected() int {
mgr.lk.RLock()
defer mgr.lk.RUnlock()

return len(mgr.peers) // TODO: try to keep record of all peers + connected peers
}

func (mgr *peerMgr) AddPeer(pid lp2ppeer.ID, ma multiaddr.Multiaddr, direction lp2pnet.Direction,
func (mgr *peerMgr) AddPeer(pid lp2ppeer.ID, ma multiaddr.Multiaddr,
direction lp2pnet.Direction,
) {
mgr.lk.Lock()
defer mgr.lk.Unlock()

switch direction {
case lp2pnet.DirInbound:
mgr.numInbound++

case lp2pnet.DirOutbound:
mgr.numOutbound++
}

mgr.peers[pid] = &peerInfo{
MultiAddress: ma,
Direction: direction,
Expand All @@ -92,6 +96,21 @@ func (mgr *peerMgr) RemovePeer(pid lp2ppeer.ID) {
mgr.lk.Lock()
defer mgr.lk.Unlock()

peer, ok := mgr.peers[pid]
if !ok {
mgr.logger.Warn("unable to find a peer", "pid", pid)

return
}

switch peer.Direction {
case lp2pnet.DirInbound:
mgr.numInbound--

case lp2pnet.DirOutbound:
mgr.numOutbound--
}

delete(mgr.peers, pid)
}

Expand All @@ -113,9 +132,8 @@ func (mgr *peerMgr) CheckConnectivity() {
mgr.lk.Lock()
defer mgr.lk.Unlock()

net := mgr.host.Network()
connectedPeers := len(net.Peers())
mgr.logger.Debug("check connectivity", "peers", len(mgr.peers), "connected", connectedPeers)
connectedPeers := len(mgr.peers)
mgr.logger.Debug("check connectivity", "peers", connectedPeers)

switch {
case connectedPeers > mgr.maxConns:
Expand All @@ -139,7 +157,7 @@ func (mgr *peerMgr) CheckConnectivity() {
mgr.logger.Debug("try connecting to a bootstrap peer", "peer", ai.String())

// Don't try to connect to an already connected peer.
if net.Connectedness(ai.ID) == lp2pnet.Connected {
if mgr.host.Network().Connectedness(ai.ID) == lp2pnet.Connected {
mgr.logger.Trace("already connected", "peer", ai.String())

continue
Expand All @@ -149,3 +167,17 @@ func (mgr *peerMgr) CheckConnectivity() {
}
}
}

func (mgr *peerMgr) NumInbound() int {
mgr.lk.RLock()
defer mgr.lk.RUnlock()

return mgr.numInbound
}

func (mgr *peerMgr) NumOutbound() int {
mgr.lk.RLock()
defer mgr.lk.RUnlock()

return mgr.numOutbound
}
27 changes: 27 additions & 0 deletions network/peermgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,30 @@ func TestGetMultiAddr(t *testing.T) {
net.peerMgr.RemovePeer(pid)
assert.Nil(t, net.peerMgr.GetMultiAddr(pid))
}

func TestNumInboundOutbound(t *testing.T) {
ts := testsuite.NewTestSuite(t)

conf := testConfig()
net := makeTestNetwork(t, conf, nil)

addr, _ := IPToMultiAddr("1.2.3.4", 1234)

pid1 := ts.RandPeerID()
pid2 := ts.RandPeerID()
pid3 := ts.RandPeerID()

net.peerMgr.AddPeer(pid1, addr, lp2pnet.DirInbound)
net.peerMgr.AddPeer(pid2, addr, lp2pnet.DirOutbound)
net.peerMgr.AddPeer(pid3, addr, lp2pnet.DirOutbound)

assert.Equal(t, 1, net.peerMgr.NumInbound())
assert.Equal(t, 2, net.peerMgr.NumOutbound())

net.peerMgr.RemovePeer(pid1)
net.peerMgr.RemovePeer(pid2)
net.peerMgr.RemovePeer(ts.RandPeerID())

assert.Equal(t, 0, net.peerMgr.NumInbound())
assert.Equal(t, 1, net.peerMgr.NumOutbound())
}
13 changes: 4 additions & 9 deletions network/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net"
"slices"
"time"

lp2pspb "github.com/libp2p/go-libp2p-pubsub/pb"
Expand Down Expand Up @@ -67,13 +68,7 @@ func IPToMultiAddr(ip string, port int) (multiaddr.Multiaddr, error) {

// HasPID checks if a peer ID exists in a list of peer IDs.
func HasPID(pids []lp2ppeer.ID, pid lp2ppeer.ID) bool {
for _, p := range pids {
if p == pid {
return true
}
}

return false
return slices.Contains(pids, pid)
}

func ConnectAsync(ctx context.Context, h lp2phost.Host, addrInfo lp2ppeer.AddrInfo, log *logger.SubLogger) {
Expand Down Expand Up @@ -147,10 +142,10 @@ func BuildConcreteLimitConfig(maxConns int) lp2prcmgr.ConcreteLimitConfig {
maxConnVal := lp2prcmgr.LimitVal(int(float32(maxConns) * coefficient))

limit.ConnsInbound = maxConnVal
limit.ConnsOutbound = maxConnVal / 4
limit.ConnsOutbound = maxConnVal
limit.Conns = maxConnVal
limit.StreamsInbound = maxConnVal * 8
limit.StreamsOutbound = maxConnVal * 2
limit.StreamsOutbound = maxConnVal * 8
limit.Streams = maxConnVal * 8
}

Expand Down

0 comments on commit 2d9dd44

Please sign in to comment.