diff --git a/network/config.go b/network/config.go index 16a15f0f4..ec73f0f10 100644 --- a/network/config.go +++ b/network/config.go @@ -145,7 +145,3 @@ func (conf *Config) ScaledMaxConns() int { func (conf *Config) ScaledMinConns() int { return conf.ScaledMaxConns() / 4 } - -func (conf *Config) ConnsThreshold() int { - return conf.ScaledMaxConns() / 8 -} diff --git a/network/config_test.go b/network/config_test.go index ee9506a51..e2b5dfb2c 100644 --- a/network/config_test.go +++ b/network/config_test.go @@ -136,32 +136,27 @@ func TestIsBootstrapper(t *testing.T) { func TestScaledConns(t *testing.T) { tests := []struct { - config Config - expectedMax int - expectedMin int - expectedThreshold int + config Config + expectedMax int + expectedMin int }{ - {Config{MaxConns: 1}, 1, 0, 0}, - {Config{MaxConns: 8}, 8, 2, 1}, - {Config{MaxConns: 30}, 32, 8, 4}, - {Config{MaxConns: 1000}, 1024, 256, 128}, + {Config{MaxConns: 1}, 1, 0}, + {Config{MaxConns: 8}, 8, 2}, + {Config{MaxConns: 30}, 32, 8}, + {Config{MaxConns: 1000}, 1024, 256}, } for _, test := range tests { resultMax := test.config.ScaledMaxConns() resultMin := test.config.ScaledMinConns() - resultThreshold := test.config.ConnsThreshold() if resultMax != test.expectedMax || - resultMin != test.expectedMin || - resultThreshold != test.expectedThreshold { + resultMin != test.expectedMin { t.Errorf("For MaxConns %d, "+ "NormedMaxConns() returned %d (expected %d), "+ - "NormedMinConns() returned %d (expected %d), "+ - "ConnsThreshold() returned %d (expected %d)", + "NormedMinConns() returned %d (expected %d)", test.config.MaxConns, resultMax, test.expectedMax, - resultMin, test.expectedMin, - resultThreshold, test.expectedThreshold) + resultMin, test.expectedMin) } } } diff --git a/network/gater.go b/network/gater.go index cf615309c..2622da78a 100644 --- a/network/gater.go +++ b/network/gater.go @@ -16,10 +16,11 @@ var _ lp2pconnmgr.ConnectionGater = &ConnectionGater{} type ConnectionGater struct { lk sync.RWMutex - filters *multiaddr.Filters - peerMgr *peerMgr - connsLimit int - logger *logger.SubLogger + filters *multiaddr.Filters + peerMgr *peerMgr + acceptLimit int + dialLimit int + logger *logger.SubLogger } func NewConnectionGater(conf *Config, log *logger.SubLogger) (*ConnectionGater, error) { @@ -29,13 +30,15 @@ func NewConnectionGater(conf *Config, log *logger.SubLogger) (*ConnectionGater, filters = SubnetsToFilters(privateSubnets, multiaddr.ActionDeny) } - connsLimit := conf.ScaledMaxConns() + conf.ConnsThreshold() - log.Info("connection gater created", "connsLimit", connsLimit) + acceptLimit := conf.ScaledMaxConns() + dialLimit := conf.ScaledMaxConns() / 4 + log.Info("connection gater created", "listen", acceptLimit, "dial", dialLimit) return &ConnectionGater{ - filters: filters, - connsLimit: connsLimit, - logger: log, + filters: filters, + acceptLimit: acceptLimit, + dialLimit: dialLimit, + logger: log, }, nil } @@ -46,20 +49,29 @@ func (g *ConnectionGater) SetPeerManager(peerMgr *peerMgr) { g.peerMgr = peerMgr } -func (g *ConnectionGater) onConnectionLimit() bool { +func (g *ConnectionGater) onDialLimit() bool { if g.peerMgr == nil { return false } - return g.peerMgr.NumOfConnected() > g.connsLimit + return g.peerMgr.NumOutbound() > g.dialLimit +} + +func (g *ConnectionGater) onAcceptLimit() bool { + if g.peerMgr == nil { + return false + } + + return g.peerMgr.NumInbound() > g.acceptLimit } func (g *ConnectionGater) InterceptPeerDial(pid lp2ppeer.ID) bool { g.lk.RLock() defer g.lk.RUnlock() - if g.onConnectionLimit() { - g.logger.Info("InterceptPeerDial rejected: many connections", "pid", pid) + if g.onDialLimit() { + g.logger.Info("InterceptPeerDial rejected: many connections", + "pid", pid, "outbound", g.peerMgr.NumOutbound()) return false } @@ -71,8 +83,9 @@ func (g *ConnectionGater) InterceptAddrDial(pid lp2ppeer.ID, ma multiaddr.Multia g.lk.RLock() defer g.lk.RUnlock() - if g.onConnectionLimit() { - g.logger.Info("InterceptAddrDial rejected: many connections", "pid", pid, "ma", ma.String()) + if g.onDialLimit() { + g.logger.Info("InterceptAddrDial rejected: many connections", + "pid", pid, "ma", ma.String(), "outbound", g.peerMgr.NumOutbound()) return false } @@ -91,8 +104,9 @@ func (g *ConnectionGater) InterceptAccept(cma lp2pnetwork.ConnMultiaddrs) bool { g.lk.RLock() defer g.lk.RUnlock() - if g.onConnectionLimit() { - g.logger.Info("InterceptAccept rejected: many connections") + if g.onAcceptLimit() { + g.logger.Info("InterceptAccept rejected: many connections", + "inbound", g.peerMgr.NumInbound()) return false } diff --git a/network/gater_test.go b/network/gater_test.go index 4fb25523f..c01bbe0cb 100644 --- a/network/gater_test.go +++ b/network/gater_test.go @@ -61,22 +61,22 @@ func TestDenyPrivate(t *testing.T) { func TestMaxConnection(t *testing.T) { ts := testsuite.NewTestSuite(t) conf := testConfig() - conf.MaxConns = 8 - assert.Equal(t, conf.ScaledMinConns(), 2) - assert.Equal(t, conf.ScaledMaxConns(), 8) - assert.Equal(t, conf.ConnsThreshold(), 1) + conf.MaxConns = 4 + assert.Equal(t, conf.ScaledMaxConns(), 4) net := makeTestNetwork(t, conf, nil) maPrivate := multiaddr.StringCast("/ip4/127.0.0.1/tcp/1234") maPublic := multiaddr.StringCast("/ip4/8.8.8.8/tcp/1234") + aMultiAddr := multiaddr.StringCast("/ip4/1.1.1.1/tcp/1234") cmaPrivate := &mockConnMultiaddrs{remote: maPrivate} cmaPublic := &mockConnMultiaddrs{remote: maPublic} pid := ts.RandPeerID() - for i := 0; i < 9; i++ { - net.peerMgr.AddPeer(ts.RandPeerID(), - multiaddr.StringCast("/ip4/1.1.1.1/tcp/1234"), lp2pnetwork.DirInbound) - } + net.peerMgr.AddPeer(ts.RandPeerID(), aMultiAddr, lp2pnetwork.DirOutbound) + net.peerMgr.AddPeer(ts.RandPeerID(), aMultiAddr, lp2pnetwork.DirInbound) + net.peerMgr.AddPeer(ts.RandPeerID(), aMultiAddr, lp2pnetwork.DirInbound) + net.peerMgr.AddPeer(ts.RandPeerID(), aMultiAddr, lp2pnetwork.DirInbound) + net.peerMgr.AddPeer(ts.RandPeerID(), aMultiAddr, lp2pnetwork.DirInbound) assert.True(t, net.connGater.InterceptPeerDial(pid)) assert.True(t, net.connGater.InterceptAddrDial(pid, maPrivate)) @@ -84,8 +84,15 @@ func TestMaxConnection(t *testing.T) { assert.True(t, net.connGater.InterceptAccept(cmaPrivate)) assert.True(t, net.connGater.InterceptAccept(cmaPublic)) - net.peerMgr.AddPeer(ts.RandPeerID(), - multiaddr.StringCast("/ip4/1.1.1.1/tcp/1234"), lp2pnetwork.DirInbound) + net.peerMgr.AddPeer(ts.RandPeerID(), aMultiAddr, lp2pnetwork.DirOutbound) + + assert.False(t, net.connGater.InterceptPeerDial(pid)) + assert.False(t, net.connGater.InterceptAddrDial(pid, maPrivate)) + assert.False(t, net.connGater.InterceptAddrDial(pid, maPublic)) + assert.True(t, net.connGater.InterceptAccept(cmaPrivate)) + assert.True(t, net.connGater.InterceptAccept(cmaPublic)) + + net.peerMgr.AddPeer(ts.RandPeerID(), aMultiAddr, lp2pnetwork.DirInbound) assert.False(t, net.connGater.InterceptPeerDial(pid)) assert.False(t, net.connGater.InterceptAddrDial(pid, maPrivate)) diff --git a/network/network.go b/network/network.go index 833f2d5ac..0c88da789 100644 --- a/network/network.go +++ b/network/network.go @@ -133,8 +133,9 @@ func newNetwork(conf *Config, log *logger.SubLogger, opts []lp2p.Option) (*netwo // The connection manager doesn't reject any connections. // It just triggers a pruning run once the high watermark is reached (or surpassed). - lowWM := conf.ScaledMinConns() // Low Watermark - highWM := conf.ScaledMaxConns() - conf.ConnsThreshold() // High Watermark + // + lowWM := conf.ScaledMinConns() // Low Watermark, ex: 64 (max) + highWM := conf.ScaledMaxConns() - conf.ScaledMinConns() // High Watermark, ex: 64 (max) - 16 (min) = 48 connMgr, err := lp2pconnmgr.NewConnManager( lowWM, highWM, lp2pconnmgr.WithGracePeriod(time.Minute), @@ -272,7 +273,8 @@ func newNetwork(conf *Config, log *logger.SubLogger, opts []lp2p.Option) (*netwo self.logger.Info("network setup", "id", self.host.ID(), "name", conf.NetworkName, "address", conf.ListenAddrs(), - "bootstrapper", conf.IsBootstrapper) + "bootstrapper", conf.IsBootstrapper, + "maxConns", conf.MaxConns) return self, nil } diff --git a/network/peermgr.go b/network/peermgr.go index 45a84bbb2..b56f6decd 100644 --- a/network/peermgr.go +++ b/network/peermgr.go @@ -26,6 +26,8 @@ type peerMgr struct { bootstrapAddrs []lp2ppeer.AddrInfo minConns int maxConns int + numInbound int + numOutbound int host lp2phost.Host peers map[lp2ppeer.ID]*peerInfo logger *logger.SubLogger @@ -70,18 +72,20 @@ func (mgr *peerMgr) Start() { func (mgr *peerMgr) Stop() { } -func (mgr *peerMgr) NumOfConnected() int { - mgr.lk.RLock() - defer mgr.lk.RUnlock() - - return len(mgr.peers) // TODO: try to keep record of all peers + connected peers -} - -func (mgr *peerMgr) AddPeer(pid lp2ppeer.ID, ma multiaddr.Multiaddr, direction lp2pnet.Direction, +func (mgr *peerMgr) AddPeer(pid lp2ppeer.ID, ma multiaddr.Multiaddr, + direction lp2pnet.Direction, ) { mgr.lk.Lock() defer mgr.lk.Unlock() + switch direction { + case lp2pnet.DirInbound: + mgr.numInbound++ + + case lp2pnet.DirOutbound: + mgr.numOutbound++ + } + mgr.peers[pid] = &peerInfo{ MultiAddress: ma, Direction: direction, @@ -92,6 +96,21 @@ func (mgr *peerMgr) RemovePeer(pid lp2ppeer.ID) { mgr.lk.Lock() defer mgr.lk.Unlock() + peer, ok := mgr.peers[pid] + if !ok { + mgr.logger.Warn("unable to find a peer", "pid", pid) + + return + } + + switch peer.Direction { + case lp2pnet.DirInbound: + mgr.numInbound-- + + case lp2pnet.DirOutbound: + mgr.numOutbound-- + } + delete(mgr.peers, pid) } @@ -113,9 +132,8 @@ func (mgr *peerMgr) CheckConnectivity() { mgr.lk.Lock() defer mgr.lk.Unlock() - net := mgr.host.Network() - connectedPeers := len(net.Peers()) - mgr.logger.Debug("check connectivity", "peers", len(mgr.peers), "connected", connectedPeers) + connectedPeers := len(mgr.peers) + mgr.logger.Debug("check connectivity", "peers", connectedPeers) switch { case connectedPeers > mgr.maxConns: @@ -139,7 +157,7 @@ func (mgr *peerMgr) CheckConnectivity() { mgr.logger.Debug("try connecting to a bootstrap peer", "peer", ai.String()) // Don't try to connect to an already connected peer. - if net.Connectedness(ai.ID) == lp2pnet.Connected { + if mgr.host.Network().Connectedness(ai.ID) == lp2pnet.Connected { mgr.logger.Trace("already connected", "peer", ai.String()) continue @@ -149,3 +167,17 @@ func (mgr *peerMgr) CheckConnectivity() { } } } + +func (mgr *peerMgr) NumInbound() int { + mgr.lk.RLock() + defer mgr.lk.RUnlock() + + return mgr.numInbound +} + +func (mgr *peerMgr) NumOutbound() int { + mgr.lk.RLock() + defer mgr.lk.RUnlock() + + return mgr.numOutbound +} diff --git a/network/peermgr_test.go b/network/peermgr_test.go index e1a262ca5..2f1931597 100644 --- a/network/peermgr_test.go +++ b/network/peermgr_test.go @@ -24,3 +24,30 @@ func TestGetMultiAddr(t *testing.T) { net.peerMgr.RemovePeer(pid) assert.Nil(t, net.peerMgr.GetMultiAddr(pid)) } + +func TestNumInboundOutbound(t *testing.T) { + ts := testsuite.NewTestSuite(t) + + conf := testConfig() + net := makeTestNetwork(t, conf, nil) + + addr, _ := IPToMultiAddr("1.2.3.4", 1234) + + pid1 := ts.RandPeerID() + pid2 := ts.RandPeerID() + pid3 := ts.RandPeerID() + + net.peerMgr.AddPeer(pid1, addr, lp2pnet.DirInbound) + net.peerMgr.AddPeer(pid2, addr, lp2pnet.DirOutbound) + net.peerMgr.AddPeer(pid3, addr, lp2pnet.DirOutbound) + + assert.Equal(t, 1, net.peerMgr.NumInbound()) + assert.Equal(t, 2, net.peerMgr.NumOutbound()) + + net.peerMgr.RemovePeer(pid1) + net.peerMgr.RemovePeer(pid2) + net.peerMgr.RemovePeer(ts.RandPeerID()) + + assert.Equal(t, 0, net.peerMgr.NumInbound()) + assert.Equal(t, 1, net.peerMgr.NumOutbound()) +} diff --git a/network/utils.go b/network/utils.go index c4f19ab2b..b1297f99b 100644 --- a/network/utils.go +++ b/network/utils.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net" + "slices" "time" lp2pspb "github.com/libp2p/go-libp2p-pubsub/pb" @@ -67,13 +68,7 @@ func IPToMultiAddr(ip string, port int) (multiaddr.Multiaddr, error) { // HasPID checks if a peer ID exists in a list of peer IDs. func HasPID(pids []lp2ppeer.ID, pid lp2ppeer.ID) bool { - for _, p := range pids { - if p == pid { - return true - } - } - - return false + return slices.Contains(pids, pid) } func ConnectAsync(ctx context.Context, h lp2phost.Host, addrInfo lp2ppeer.AddrInfo, log *logger.SubLogger) { @@ -147,10 +142,10 @@ func BuildConcreteLimitConfig(maxConns int) lp2prcmgr.ConcreteLimitConfig { maxConnVal := lp2prcmgr.LimitVal(int(float32(maxConns) * coefficient)) limit.ConnsInbound = maxConnVal - limit.ConnsOutbound = maxConnVal / 4 + limit.ConnsOutbound = maxConnVal limit.Conns = maxConnVal limit.StreamsInbound = maxConnVal * 8 - limit.StreamsOutbound = maxConnVal * 2 + limit.StreamsOutbound = maxConnVal * 8 limit.Streams = maxConnVal * 8 }