Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(network): set dial and accept limit in connection gater #1089

Merged
merged 2 commits into from
Feb 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions network/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,3 @@ func (conf *Config) ScaledMaxConns() int {
func (conf *Config) ScaledMinConns() int {
return conf.ScaledMaxConns() / 4
}

func (conf *Config) ConnsThreshold() int {
return conf.ScaledMaxConns() / 8
}
25 changes: 10 additions & 15 deletions network/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,32 +136,27 @@ func TestIsBootstrapper(t *testing.T) {

func TestScaledConns(t *testing.T) {
tests := []struct {
config Config
expectedMax int
expectedMin int
expectedThreshold int
config Config
expectedMax int
expectedMin int
}{
{Config{MaxConns: 1}, 1, 0, 0},
{Config{MaxConns: 8}, 8, 2, 1},
{Config{MaxConns: 30}, 32, 8, 4},
{Config{MaxConns: 1000}, 1024, 256, 128},
{Config{MaxConns: 1}, 1, 0},
{Config{MaxConns: 8}, 8, 2},
{Config{MaxConns: 30}, 32, 8},
{Config{MaxConns: 1000}, 1024, 256},
}

for _, test := range tests {
resultMax := test.config.ScaledMaxConns()
resultMin := test.config.ScaledMinConns()
resultThreshold := test.config.ConnsThreshold()
if resultMax != test.expectedMax ||
resultMin != test.expectedMin ||
resultThreshold != test.expectedThreshold {
resultMin != test.expectedMin {
t.Errorf("For MaxConns %d, "+
"NormedMaxConns() returned %d (expected %d), "+
"NormedMinConns() returned %d (expected %d), "+
"ConnsThreshold() returned %d (expected %d)",
"NormedMinConns() returned %d (expected %d)",
test.config.MaxConns,
resultMax, test.expectedMax,
resultMin, test.expectedMin,
resultThreshold, test.expectedThreshold)
resultMin, test.expectedMin)
}
}
}
Expand Down
48 changes: 31 additions & 17 deletions network/gater.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@
type ConnectionGater struct {
lk sync.RWMutex

filters *multiaddr.Filters
peerMgr *peerMgr
connsLimit int
logger *logger.SubLogger
filters *multiaddr.Filters
peerMgr *peerMgr
acceptLimit int
dialLimit int
logger *logger.SubLogger
}

func NewConnectionGater(conf *Config, log *logger.SubLogger) (*ConnectionGater, error) {
Expand All @@ -29,13 +30,15 @@
filters = SubnetsToFilters(privateSubnets, multiaddr.ActionDeny)
}

connsLimit := conf.ScaledMaxConns() + conf.ConnsThreshold()
log.Info("connection gater created", "connsLimit", connsLimit)
acceptLimit := conf.ScaledMaxConns()
dialLimit := conf.ScaledMaxConns() / 4
log.Info("connection gater created", "listen", acceptLimit, "dial", dialLimit)

return &ConnectionGater{
filters: filters,
connsLimit: connsLimit,
logger: log,
filters: filters,
acceptLimit: acceptLimit,
dialLimit: dialLimit,
logger: log,
}, nil
}

Expand All @@ -46,20 +49,29 @@
g.peerMgr = peerMgr
}

func (g *ConnectionGater) onConnectionLimit() bool {
func (g *ConnectionGater) onDialLimit() bool {
if g.peerMgr == nil {
return false
}

return g.peerMgr.NumOfConnected() > g.connsLimit
return g.peerMgr.NumOutbound() > g.dialLimit
}

func (g *ConnectionGater) onAcceptLimit() bool {
if g.peerMgr == nil {
return false

Check warning on line 62 in network/gater.go

View check run for this annotation

Codecov / codecov/patch

network/gater.go#L62

Added line #L62 was not covered by tests
}

return g.peerMgr.NumInbound() > g.acceptLimit
}

func (g *ConnectionGater) InterceptPeerDial(pid lp2ppeer.ID) bool {
g.lk.RLock()
defer g.lk.RUnlock()

if g.onConnectionLimit() {
g.logger.Info("InterceptPeerDial rejected: many connections", "pid", pid)
if g.onDialLimit() {
g.logger.Info("InterceptPeerDial rejected: many connections",
"pid", pid, "outbound", g.peerMgr.NumOutbound())

return false
}
Expand All @@ -71,8 +83,9 @@
g.lk.RLock()
defer g.lk.RUnlock()

if g.onConnectionLimit() {
g.logger.Info("InterceptAddrDial rejected: many connections", "pid", pid, "ma", ma.String())
if g.onDialLimit() {
g.logger.Info("InterceptAddrDial rejected: many connections",
"pid", pid, "ma", ma.String(), "outbound", g.peerMgr.NumOutbound())

return false
}
Expand All @@ -91,8 +104,9 @@
g.lk.RLock()
defer g.lk.RUnlock()

if g.onConnectionLimit() {
g.logger.Info("InterceptAccept rejected: many connections")
if g.onAcceptLimit() {
g.logger.Info("InterceptAccept rejected: many connections",
"inbound", g.peerMgr.NumInbound())

return false
}
Expand Down
27 changes: 17 additions & 10 deletions network/gater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,31 +61,38 @@ func TestDenyPrivate(t *testing.T) {
func TestMaxConnection(t *testing.T) {
ts := testsuite.NewTestSuite(t)
conf := testConfig()
conf.MaxConns = 8
assert.Equal(t, conf.ScaledMinConns(), 2)
assert.Equal(t, conf.ScaledMaxConns(), 8)
assert.Equal(t, conf.ConnsThreshold(), 1)
conf.MaxConns = 4
assert.Equal(t, conf.ScaledMaxConns(), 4)
net := makeTestNetwork(t, conf, nil)

maPrivate := multiaddr.StringCast("/ip4/127.0.0.1/tcp/1234")
maPublic := multiaddr.StringCast("/ip4/8.8.8.8/tcp/1234")
aMultiAddr := multiaddr.StringCast("/ip4/1.1.1.1/tcp/1234")
cmaPrivate := &mockConnMultiaddrs{remote: maPrivate}
cmaPublic := &mockConnMultiaddrs{remote: maPublic}
pid := ts.RandPeerID()

for i := 0; i < 9; i++ {
net.peerMgr.AddPeer(ts.RandPeerID(),
multiaddr.StringCast("/ip4/1.1.1.1/tcp/1234"), lp2pnetwork.DirInbound)
}
net.peerMgr.AddPeer(ts.RandPeerID(), aMultiAddr, lp2pnetwork.DirOutbound)
net.peerMgr.AddPeer(ts.RandPeerID(), aMultiAddr, lp2pnetwork.DirInbound)
net.peerMgr.AddPeer(ts.RandPeerID(), aMultiAddr, lp2pnetwork.DirInbound)
net.peerMgr.AddPeer(ts.RandPeerID(), aMultiAddr, lp2pnetwork.DirInbound)
net.peerMgr.AddPeer(ts.RandPeerID(), aMultiAddr, lp2pnetwork.DirInbound)

assert.True(t, net.connGater.InterceptPeerDial(pid))
assert.True(t, net.connGater.InterceptAddrDial(pid, maPrivate))
assert.True(t, net.connGater.InterceptAddrDial(pid, maPublic))
assert.True(t, net.connGater.InterceptAccept(cmaPrivate))
assert.True(t, net.connGater.InterceptAccept(cmaPublic))

net.peerMgr.AddPeer(ts.RandPeerID(),
multiaddr.StringCast("/ip4/1.1.1.1/tcp/1234"), lp2pnetwork.DirInbound)
net.peerMgr.AddPeer(ts.RandPeerID(), aMultiAddr, lp2pnetwork.DirOutbound)

assert.False(t, net.connGater.InterceptPeerDial(pid))
assert.False(t, net.connGater.InterceptAddrDial(pid, maPrivate))
assert.False(t, net.connGater.InterceptAddrDial(pid, maPublic))
assert.True(t, net.connGater.InterceptAccept(cmaPrivate))
assert.True(t, net.connGater.InterceptAccept(cmaPublic))

net.peerMgr.AddPeer(ts.RandPeerID(), aMultiAddr, lp2pnetwork.DirInbound)

assert.False(t, net.connGater.InterceptPeerDial(pid))
assert.False(t, net.connGater.InterceptAddrDial(pid, maPrivate))
Expand Down
8 changes: 5 additions & 3 deletions network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,9 @@ func newNetwork(conf *Config, log *logger.SubLogger, opts []lp2p.Option) (*netwo
// The connection manager doesn't reject any connections.
// It just triggers a pruning run once the high watermark is reached (or surpassed).

lowWM := conf.ScaledMinConns() // Low Watermark
highWM := conf.ScaledMaxConns() - conf.ConnsThreshold() // High Watermark
//
lowWM := conf.ScaledMinConns() // Low Watermark, ex: 64 (max)
highWM := conf.ScaledMaxConns() - conf.ScaledMinConns() // High Watermark, ex: 64 (max) - 16 (min) = 48
connMgr, err := lp2pconnmgr.NewConnManager(
lowWM, highWM,
lp2pconnmgr.WithGracePeriod(time.Minute),
Expand Down Expand Up @@ -272,7 +273,8 @@ func newNetwork(conf *Config, log *logger.SubLogger, opts []lp2p.Option) (*netwo
self.logger.Info("network setup", "id", self.host.ID(),
"name", conf.NetworkName,
"address", conf.ListenAddrs(),
"bootstrapper", conf.IsBootstrapper)
"bootstrapper", conf.IsBootstrapper,
"maxConns", conf.MaxConns)

return self, nil
}
Expand Down
56 changes: 44 additions & 12 deletions network/peermgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ type peerMgr struct {
bootstrapAddrs []lp2ppeer.AddrInfo
minConns int
maxConns int
numInbound int
numOutbound int
host lp2phost.Host
peers map[lp2ppeer.ID]*peerInfo
logger *logger.SubLogger
Expand Down Expand Up @@ -70,18 +72,20 @@ func (mgr *peerMgr) Start() {
func (mgr *peerMgr) Stop() {
}

func (mgr *peerMgr) NumOfConnected() int {
mgr.lk.RLock()
defer mgr.lk.RUnlock()

return len(mgr.peers) // TODO: try to keep record of all peers + connected peers
}

func (mgr *peerMgr) AddPeer(pid lp2ppeer.ID, ma multiaddr.Multiaddr, direction lp2pnet.Direction,
func (mgr *peerMgr) AddPeer(pid lp2ppeer.ID, ma multiaddr.Multiaddr,
direction lp2pnet.Direction,
) {
mgr.lk.Lock()
defer mgr.lk.Unlock()

switch direction {
case lp2pnet.DirInbound:
mgr.numInbound++

case lp2pnet.DirOutbound:
mgr.numOutbound++
}

mgr.peers[pid] = &peerInfo{
MultiAddress: ma,
Direction: direction,
Expand All @@ -92,6 +96,21 @@ func (mgr *peerMgr) RemovePeer(pid lp2ppeer.ID) {
mgr.lk.Lock()
defer mgr.lk.Unlock()

peer, ok := mgr.peers[pid]
if !ok {
mgr.logger.Warn("unable to find a peer", "pid", pid)

return
}

switch peer.Direction {
case lp2pnet.DirInbound:
mgr.numInbound--

case lp2pnet.DirOutbound:
mgr.numOutbound--
}

delete(mgr.peers, pid)
}

Expand All @@ -113,9 +132,8 @@ func (mgr *peerMgr) CheckConnectivity() {
mgr.lk.Lock()
defer mgr.lk.Unlock()

net := mgr.host.Network()
connectedPeers := len(net.Peers())
mgr.logger.Debug("check connectivity", "peers", len(mgr.peers), "connected", connectedPeers)
connectedPeers := len(mgr.peers)
mgr.logger.Debug("check connectivity", "peers", connectedPeers)

switch {
case connectedPeers > mgr.maxConns:
Expand All @@ -139,7 +157,7 @@ func (mgr *peerMgr) CheckConnectivity() {
mgr.logger.Debug("try connecting to a bootstrap peer", "peer", ai.String())

// Don't try to connect to an already connected peer.
if net.Connectedness(ai.ID) == lp2pnet.Connected {
if mgr.host.Network().Connectedness(ai.ID) == lp2pnet.Connected {
mgr.logger.Trace("already connected", "peer", ai.String())

continue
Expand All @@ -149,3 +167,17 @@ func (mgr *peerMgr) CheckConnectivity() {
}
}
}

func (mgr *peerMgr) NumInbound() int {
mgr.lk.RLock()
defer mgr.lk.RUnlock()

return mgr.numInbound
}

func (mgr *peerMgr) NumOutbound() int {
mgr.lk.RLock()
defer mgr.lk.RUnlock()

return mgr.numOutbound
}
27 changes: 27 additions & 0 deletions network/peermgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,30 @@ func TestGetMultiAddr(t *testing.T) {
net.peerMgr.RemovePeer(pid)
assert.Nil(t, net.peerMgr.GetMultiAddr(pid))
}

func TestNumInboundOutbound(t *testing.T) {
ts := testsuite.NewTestSuite(t)

conf := testConfig()
net := makeTestNetwork(t, conf, nil)

addr, _ := IPToMultiAddr("1.2.3.4", 1234)

pid1 := ts.RandPeerID()
pid2 := ts.RandPeerID()
pid3 := ts.RandPeerID()

net.peerMgr.AddPeer(pid1, addr, lp2pnet.DirInbound)
net.peerMgr.AddPeer(pid2, addr, lp2pnet.DirOutbound)
net.peerMgr.AddPeer(pid3, addr, lp2pnet.DirOutbound)

assert.Equal(t, 1, net.peerMgr.NumInbound())
assert.Equal(t, 2, net.peerMgr.NumOutbound())

net.peerMgr.RemovePeer(pid1)
net.peerMgr.RemovePeer(pid2)
net.peerMgr.RemovePeer(ts.RandPeerID())

assert.Equal(t, 0, net.peerMgr.NumInbound())
assert.Equal(t, 1, net.peerMgr.NumOutbound())
}
13 changes: 4 additions & 9 deletions network/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net"
"slices"
"time"

lp2pspb "github.com/libp2p/go-libp2p-pubsub/pb"
Expand Down Expand Up @@ -67,13 +68,7 @@ func IPToMultiAddr(ip string, port int) (multiaddr.Multiaddr, error) {

// HasPID checks if a peer ID exists in a list of peer IDs.
func HasPID(pids []lp2ppeer.ID, pid lp2ppeer.ID) bool {
for _, p := range pids {
if p == pid {
return true
}
}

return false
return slices.Contains(pids, pid)
}

func ConnectAsync(ctx context.Context, h lp2phost.Host, addrInfo lp2ppeer.AddrInfo, log *logger.SubLogger) {
Expand Down Expand Up @@ -147,10 +142,10 @@ func BuildConcreteLimitConfig(maxConns int) lp2prcmgr.ConcreteLimitConfig {
maxConnVal := lp2prcmgr.LimitVal(int(float32(maxConns) * coefficient))

limit.ConnsInbound = maxConnVal
limit.ConnsOutbound = maxConnVal / 4
limit.ConnsOutbound = maxConnVal
limit.Conns = maxConnVal
limit.StreamsInbound = maxConnVal * 8
limit.StreamsOutbound = maxConnVal * 2
limit.StreamsOutbound = maxConnVal * 8
limit.Streams = maxConnVal * 8
}

Expand Down
Loading