diff --git a/config/config.go b/config/config.go index 6e2bb78dc..5e41739e0 100644 --- a/config/config.go +++ b/config/config.go @@ -119,8 +119,8 @@ func SaveTestnetConfig(path string, numValidators int) error { "/dns/pactus.nodesync.top/tcp/21777/p2p/12D3KooWP25ejVsd7cL5DvWAPwEu4JTUwnPniHBf4w93tgSezVt8", // NodeSync.Top (lthuan2011@gmail.com) "/ip4/95.217.89.202/tcp/21777/p2p/12D3KooWMsi5oYkbbpyyXctmPXzF8UZu2pCvKPRZGyvymhN9BzTD", // CodeBlockLabs (emailbuatcariduit@gmail.com) } - conf.Network.MinConns = 8 - conf.Network.MaxConns = 16 + conf.Network.MinConns = 16 + conf.Network.MaxConns = 32 conf.Network.EnableNAT = false conf.Network.EnableRelay = false conf.Network.RelayAddrStrings = []string{ diff --git a/config/example_config.toml b/config/example_config.toml index 64c93ed85..0b52b19eb 100644 --- a/config/example_config.toml +++ b/config/example_config.toml @@ -39,12 +39,12 @@ ## bootstrap_addrs = ["/ip4/172.104.46.145/tcp/21777/p2p/12D3KooWNYD4bB82YZRXv6oNyYPwc5ozabx2epv75ATV3D8VD3Mq"] # `min_connections` is the minimum number of connections that the Pactus node should maintain. - # Default is 8 - ## min_connections = 8 + # Default is 16 + ## min_connections = 16 # `max_connections` is the maximum number of connections that the Pactus node should maintain. - # Default is 16 - ## max_connections = 16 + # Default is 32 + ## max_connections = 32 # `enable_nat` indicates whether NAT service should be enabled or not. # NAT service allows many machines to share a single public address. diff --git a/network/config.go b/network/config.go index 1e49bce13..6267703e6 100644 --- a/network/config.go +++ b/network/config.go @@ -53,8 +53,8 @@ func DefaultConfig() *Config { }, RelayAddrStrings: []string{}, BootstrapAddrStrings: bootstrapAddrs, - MinConns: 8, - MaxConns: 16, + MinConns: 16, + MaxConns: 32, EnableNAT: false, EnableRelay: false, EnableMdns: false, diff --git a/network/gater.go b/network/gater.go index 420e569b5..e05648118 100644 --- a/network/gater.go +++ b/network/gater.go @@ -1,11 +1,12 @@ package network import ( + "sync" + lp2pconnmgr "github.com/libp2p/go-libp2p/core/connmgr" lp2pcontrol "github.com/libp2p/go-libp2p/core/control" lp2pnetwork "github.com/libp2p/go-libp2p/core/network" lp2ppeer "github.com/libp2p/go-libp2p/core/peer" - lp2pconngater "github.com/libp2p/go-libp2p/p2p/net/conngater" "github.com/multiformats/go-multiaddr" "github.com/pactus-project/pactus/util/logger" ) @@ -13,70 +14,95 @@ import ( var _ lp2pconnmgr.ConnectionGater = &ConnectionGater{} type ConnectionGater struct { - *lp2pconngater.BasicConnectionGater - logger *logger.SubLogger + lk sync.RWMutex + + filters *multiaddr.Filters + peerMgr *peerMgr + maxConn int + logger *logger.SubLogger } func NewConnectionGater(conf *Config, log *logger.SubLogger) (*ConnectionGater, error) { - connGater, err := lp2pconngater.NewBasicConnectionGater(nil) - if err != nil { - return nil, err - } - + filters := multiaddr.NewFilters() if !conf.ForcePrivateNetwork { privateSubnets := PrivateSubnets() - for _, sn := range privateSubnets { - err := connGater.BlockSubnet(sn) - if err != nil { - return nil, LibP2PError{Err: err} - } - } + filters = SubnetsToFilters(privateSubnets, multiaddr.ActionDeny) } return &ConnectionGater{ - BasicConnectionGater: connGater, - logger: log, + filters: filters, + maxConn: conf.MaxConns, + logger: log, }, nil } -func (g *ConnectionGater) InterceptPeerDial(p lp2ppeer.ID) bool { - allow := g.BasicConnectionGater.InterceptPeerDial(p) - if !allow { - g.logger.Debug("InterceptPeerDial not allowed", "p") +func (g *ConnectionGater) SetPeerManager(peerMgr *peerMgr) { + g.lk.Lock() + defer g.lk.Unlock() + + g.peerMgr = peerMgr +} + +func (g *ConnectionGater) hasMaxConnections() bool { + if g.peerMgr == nil { + return false } - return allow + return g.peerMgr.NumOfConnected() > g.maxConn } -func (g *ConnectionGater) InterceptAddrDial(p lp2ppeer.ID, ma multiaddr.Multiaddr) bool { - allow := g.BasicConnectionGater.InterceptAddrDial(p, ma) - if !allow { - g.logger.Debug("InterceptAddrDial not allowed", "p", p, "ma", ma.String()) +func (g *ConnectionGater) InterceptPeerDial(pid lp2ppeer.ID) bool { + g.lk.RLock() + defer g.lk.RUnlock() + + if g.hasMaxConnections() { + g.logger.Debug("InterceptPeerDial rejected: many connections", "pid", pid) + return false } - return allow + return true } -func (g *ConnectionGater) InterceptAccept(cma lp2pnetwork.ConnMultiaddrs) bool { - allow := g.BasicConnectionGater.InterceptAccept(cma) - if !allow { - g.logger.Debug("InterceptAccept not allowed") +func (g *ConnectionGater) InterceptAddrDial(pid lp2ppeer.ID, ma multiaddr.Multiaddr) bool { + g.lk.RLock() + defer g.lk.RUnlock() + + if g.hasMaxConnections() { + g.logger.Debug("InterceptAddrDial rejected: many connections", "pid", pid, "ma", ma.String()) + return false + } + + deny := g.filters.AddrBlocked(ma) + if deny { + g.logger.Debug("InterceptAddrDial rejected", "pid", pid, "ma", ma.String()) + return false } - return allow + return true } -func (g *ConnectionGater) InterceptSecured(dir lp2pnetwork.Direction, p lp2ppeer.ID, - cma lp2pnetwork.ConnMultiaddrs, -) bool { - allow := g.BasicConnectionGater.InterceptSecured(dir, p, cma) - if !allow { - g.logger.Debug("InterceptSecured not allowed", "p", p) +func (g *ConnectionGater) InterceptAccept(cma lp2pnetwork.ConnMultiaddrs) bool { + g.lk.RLock() + defer g.lk.RUnlock() + + if g.hasMaxConnections() { + g.logger.Debug("InterceptAccept rejected: many connections") + return false } - return allow + deny := g.filters.AddrBlocked(cma.RemoteMultiaddr()) + if deny { + g.logger.Debug("InterceptAccept rejected") + return false + } + + return true +} + +func (g *ConnectionGater) InterceptSecured(_ lp2pnetwork.Direction, _ lp2ppeer.ID, _ lp2pnetwork.ConnMultiaddrs) bool { + return true } -func (g *ConnectionGater) InterceptUpgraded(con lp2pnetwork.Conn) (bool, lp2pcontrol.DisconnectReason) { - return g.BasicConnectionGater.InterceptUpgraded(con) +func (g *ConnectionGater) InterceptUpgraded(_ lp2pnetwork.Conn) (bool, lp2pcontrol.DisconnectReason) { + return true, 0 } diff --git a/network/gater_test.go b/network/gater_test.go index 1ae2e9d50..df2a8c48e 100644 --- a/network/gater_test.go +++ b/network/gater_test.go @@ -1 +1,83 @@ package network + +import ( + "testing" + + lp2pnet "github.com/libp2p/go-libp2p/core/network" + "github.com/multiformats/go-multiaddr" + "github.com/pactus-project/pactus/util/testsuite" + "github.com/stretchr/testify/assert" +) + +type mockConnMultiaddrs struct { + remote multiaddr.Multiaddr +} + +func (cma *mockConnMultiaddrs) LocalMultiaddr() multiaddr.Multiaddr { + return nil +} + +func (cma *mockConnMultiaddrs) RemoteMultiaddr() multiaddr.Multiaddr { + return cma.remote +} + +func TestAllowedConnections(t *testing.T) { + ts := testsuite.NewTestSuite(t) + conf := testConfig() + net := makeTestNetwork(t, conf, nil) + + maPrivate := multiaddr.StringCast("/ip4/127.0.0.1/tcp/1234") + maPublic := multiaddr.StringCast("/ip4/8.8.8.8/tcp/1234") + cmaPrivate := &mockConnMultiaddrs{remote: maPrivate} + cmaPublic := &mockConnMultiaddrs{remote: maPublic} + pid := ts.RandPeerID() + + assert.True(t, net.connGater.InterceptPeerDial(pid)) + assert.True(t, net.connGater.InterceptAddrDial(pid, maPrivate)) + assert.True(t, net.connGater.InterceptAddrDial(pid, maPublic)) + assert.True(t, net.connGater.InterceptAccept(cmaPrivate)) + assert.True(t, net.connGater.InterceptAccept(cmaPublic)) +} + +func TestDenyPrivate(t *testing.T) { + ts := testsuite.NewTestSuite(t) + conf := testConfig() + conf.ForcePrivateNetwork = false + net := makeTestNetwork(t, conf, nil) + + maPrivate := multiaddr.StringCast("/ip4/127.0.0.1/tcp/1234") + maPublic := multiaddr.StringCast("/ip4/8.8.8.8/tcp/1234") + cmaPrivate := &mockConnMultiaddrs{remote: maPrivate} + cmaPublic := &mockConnMultiaddrs{remote: maPublic} + pid := ts.RandPeerID() + + assert.True(t, net.connGater.InterceptPeerDial(pid)) + assert.False(t, net.connGater.InterceptAddrDial(pid, maPrivate)) + assert.True(t, net.connGater.InterceptAddrDial(pid, maPublic)) + assert.False(t, net.connGater.InterceptAccept(cmaPrivate)) + assert.True(t, net.connGater.InterceptAccept(cmaPublic)) +} + +func TestMaxConnection(t *testing.T) { + ts := testsuite.NewTestSuite(t) + conf := testConfig() + conf.MaxConns = 1 + net := makeTestNetwork(t, conf, nil) + + maPrivate := multiaddr.StringCast("/ip4/127.0.0.1/tcp/1234") + maPublic := multiaddr.StringCast("/ip4/8.8.8.8/tcp/1234") + cmaPrivate := &mockConnMultiaddrs{remote: maPrivate} + cmaPublic := &mockConnMultiaddrs{remote: maPublic} + pid := ts.RandPeerID() + + net.peerMgr.AddPeer(ts.RandPeerID(), + multiaddr.StringCast("/ip4/2.2.2.2/tcp/1234"), lp2pnet.DirInbound, nil) + net.peerMgr.AddPeer(ts.RandPeerID(), + multiaddr.StringCast("/ip4/3.3.3.3/tcp/1234"), lp2pnet.DirInbound, nil) + + assert.False(t, net.connGater.InterceptPeerDial(pid)) + assert.False(t, net.connGater.InterceptAddrDial(pid, maPrivate)) + assert.False(t, net.connGater.InterceptAddrDial(pid, maPublic)) + assert.False(t, net.connGater.InterceptAccept(cmaPrivate)) + assert.False(t, net.connGater.InterceptAccept(cmaPublic)) +} diff --git a/network/network.go b/network/network.go index f874679d9..834ca0a5f 100644 --- a/network/network.go +++ b/network/network.go @@ -215,12 +215,13 @@ func newNetwork(networkName string, conf *Config, log *logger.SubLogger, opts [] n.mdns = newMdnsService(ctx, n.host, n.logger) } n.dht = newDHTService(n.ctx, n.host, kadProtocolID, conf, n.logger) - n.peerMgr = newPeerMgr(ctx, host, n.dht.kademlia, conf, n.logger) + n.peerMgr = newPeerMgr(ctx, host, n.dht.kademlia, streamProtocolID, conf, n.logger) n.stream = newStreamService(ctx, n.host, streamProtocolID, n.eventChannel, n.logger) n.gossip = newGossipService(ctx, n.host, n.eventChannel, conf, n.logger) - n.notifee = newNotifeeService(n.host, n.eventChannel, n.logger, streamProtocolID, conf.Bootstrapper) + n.notifee = newNotifeeService(n.host, n.eventChannel, n.peerMgr, streamProtocolID, conf.Bootstrapper, n.logger) n.host.Network().Notify(n.notifee) + n.connGater.SetPeerManager(n.peerMgr) n.logger.Info("network setup", "id", n.host.ID(), "address", conf.ListenAddrStrings, @@ -272,6 +273,7 @@ func (n *network) SelfID() lp2ppeer.ID { } func (n *network) SendTo(msg []byte, pid lp2pcore.PeerID) error { + n.logger.Trace("Sending new message", "to", pid) return n.stream.SendRequest(msg, pid) } diff --git a/network/notifee.go b/network/notifee.go index 02282ba76..2b7c11093 100644 --- a/network/notifee.go +++ b/network/notifee.go @@ -3,6 +3,7 @@ package network import ( "time" + lp2pcore "github.com/libp2p/go-libp2p/core" lp2phost "github.com/libp2p/go-libp2p/core/host" lp2pnetwork "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/protocol" @@ -16,27 +17,30 @@ type NotifeeService struct { eventChannel chan<- Event logger *logger.SubLogger protocolID protocol.ID + peerMgr *peerMgr bootstrapper bool } -func newNotifeeService(host lp2phost.Host, eventChannel chan<- Event, - log *logger.SubLogger, protocolID protocol.ID, bootstrapper bool, +func newNotifeeService(host lp2phost.Host, eventChannel chan<- Event, peerMgr *peerMgr, + protocolID protocol.ID, bootstrapper bool, log *logger.SubLogger, ) *NotifeeService { notifee := &NotifeeService{ host: host, eventChannel: eventChannel, - logger: log, protocolID: protocolID, bootstrapper: bootstrapper, + peerMgr: peerMgr, + logger: log, } host.Network().Notify(notifee) return notifee } func (n *NotifeeService) Connected(lp2pn lp2pnetwork.Network, conn lp2pnetwork.Conn) { - peerID := conn.RemotePeer() - n.logger.Info("connected to peer", "pid", peerID) + pid := conn.RemotePeer() + n.logger.Info("connected to peer", "pid", pid, "direction", conn.Stat().Direction) + var protocols []lp2pcore.ProtocolID go func() { for i := 0; i < 10; i++ { // TODO: better way? @@ -44,33 +48,39 @@ func (n *NotifeeService) Connected(lp2pn lp2pnetwork.Network, conn lp2pnetwork.C time.Sleep(1 * time.Second) peerStore := lp2pn.Peerstore() - protocols, _ := peerStore.GetProtocols(peerID) + protocols, _ = peerStore.GetProtocols(pid) if len(protocols) > 0 { if slices.Contains(protocols, n.protocolID) { n.logger.Debug("peer supports the stream protocol", - "pid", peerID, "protocols", protocols) + "pid", pid, "protocols", protocols) - n.eventChannel <- &ConnectEvent{PeerID: peerID} + n.eventChannel <- &ConnectEvent{PeerID: pid} } else { n.logger.Debug("peer doesn't support the stream protocol", - "pid", peerID, "protocols", protocols) + "pid", pid, "protocols", protocols) } - return + break } } - n.logger.Info("unable to get supported protocols", "pid", peerID) - if !n.bootstrapper { - // Close this connection since we can't send a direct message to this peer. - _ = n.host.Network().ClosePeer(peerID) + if len(protocols) == 0 { + n.logger.Info("unable to get supported protocols", "pid", pid) + if !n.bootstrapper { + // Close this connection since we can't send a direct message to this peer. + _ = n.host.Network().ClosePeer(pid) + } } + + n.peerMgr.AddPeer(pid, conn.RemoteMultiaddr(), conn.Stat().Direction, protocols) }() } func (n *NotifeeService) Disconnected(_ lp2pnetwork.Network, conn lp2pnetwork.Conn) { - peerID := conn.RemotePeer() - n.logger.Info("disconnected from peer", "pid", peerID) - n.eventChannel <- &DisconnectEvent{PeerID: peerID} + pid := conn.RemotePeer() + n.logger.Info("disconnected from peer", "pid", pid) + n.eventChannel <- &DisconnectEvent{PeerID: pid} + + n.peerMgr.RemovePeer(pid) } func (n *NotifeeService) Listen(_ lp2pnetwork.Network, ma multiaddr.Multiaddr) { diff --git a/network/peermgr.go b/network/peermgr.go index a0f6ad97a..83b504684 100644 --- a/network/peermgr.go +++ b/network/peermgr.go @@ -2,50 +2,55 @@ package network import ( "context" + "sync" "time" lp2pdht "github.com/libp2p/go-libp2p-kad-dht" + lp2pcore "github.com/libp2p/go-libp2p/core" lp2phost "github.com/libp2p/go-libp2p/core/host" lp2pnet "github.com/libp2p/go-libp2p/core/network" lp2ppeer "github.com/libp2p/go-libp2p/core/peer" lp2pswarm "github.com/libp2p/go-libp2p/p2p/net/swarm" + "github.com/multiformats/go-multiaddr" "github.com/pactus-project/pactus/util/logger" ) -// peerMgr attempts to keep the p2p host connected to the network -// by keeping a minimum threshold of connections. If the threshold isn't met it -// connects to a random subset of the peerMgr peers. It does not use peer routing -// to discover new peers. To stop a peerMgr cancel the context passed in Start() -// or call Stop(). -type peerMgr struct { - ctx context.Context - bootstrapAddrs []lp2ppeer.AddrInfo - minConns int - maxConns int - - // Dependencies - host lp2phost.Host - dialer lp2pnet.Dialer - dht *lp2pdht.IpfsDHT - - logger *logger.SubLogger +type peerInfo struct { + MultiAddress multiaddr.Multiaddr + Direction lp2pnet.Direction + Protocols []lp2pcore.ProtocolID } -// newPeerMgr creates a new Peer Manager instance. // Peer Manager attempts to establish connections with other nodes when the // number of connections falls below the minimum threshold. +type peerMgr struct { + lk sync.RWMutex + + ctx context.Context + bootstrapAddrs []lp2ppeer.AddrInfo + minConns int + maxConns int + host lp2phost.Host + dht *lp2pdht.IpfsDHT + peers map[lp2ppeer.ID]*peerInfo + streamProtocolID lp2pcore.ProtocolID + logger *logger.SubLogger +} + +// newPeerMgr creates a new Peer Manager instance. func newPeerMgr(ctx context.Context, h lp2phost.Host, dht *lp2pdht.IpfsDHT, - conf *Config, log *logger.SubLogger, + streamProtocolID lp2pcore.ProtocolID, conf *Config, log *logger.SubLogger, ) *peerMgr { b := &peerMgr{ - ctx: ctx, - bootstrapAddrs: conf.BootstrapAddrInfos(), - minConns: conf.MinConns, - maxConns: conf.MaxConns, - host: h, - dialer: h.Network(), - dht: dht, - logger: log, + ctx: ctx, + bootstrapAddrs: conf.BootstrapAddrInfos(), + minConns: conf.MinConns, + maxConns: conf.MaxConns, + streamProtocolID: streamProtocolID, + peers: make(map[lp2ppeer.ID]*peerInfo), + host: h, + dht: dht, + logger: log, } return b @@ -53,7 +58,7 @@ func newPeerMgr(ctx context.Context, h lp2phost.Host, dht *lp2pdht.IpfsDHT, // Start starts the Peer Manager. func (mgr *peerMgr) Start() { - mgr.checkConnectivity() + mgr.CheckConnectivity() go func() { ticker := time.NewTicker(1 * time.Minute) @@ -64,7 +69,7 @@ func (mgr *peerMgr) Start() { case <-mgr.ctx.Done(): return case <-ticker.C: - mgr.checkConnectivity() + mgr.CheckConnectivity() } } }() @@ -75,20 +80,52 @@ func (mgr *peerMgr) Stop() { // TODO: complete me } +func (mgr *peerMgr) NumOfConnected() int { + mgr.lk.RLock() + defer mgr.lk.RUnlock() + + return len(mgr.peers) // TODO: try to keep record of all peers + connected peers +} + +func (mgr *peerMgr) AddPeer(pid lp2ppeer.ID, ma multiaddr.Multiaddr, + direction lp2pnet.Direction, protocols []lp2pcore.ProtocolID, +) { + mgr.lk.Lock() + defer mgr.lk.Unlock() + + mgr.peers[pid] = &peerInfo{ + MultiAddress: ma, + Direction: direction, + Protocols: protocols, + } +} + +func (mgr *peerMgr) RemovePeer(pid lp2ppeer.ID) { + mgr.lk.Lock() + defer mgr.lk.Unlock() + + delete(mgr.peers, pid) +} + // checkConnectivity performs the actual work of maintaining connections. // It ensures that the number of connections stays within the minimum and maximum thresholds. -func (mgr *peerMgr) checkConnectivity() { - currentPeers := mgr.dialer.Peers() - mgr.logger.Debug("check connectivity", "peers", len(currentPeers)) +func (mgr *peerMgr) CheckConnectivity() { + mgr.lk.Lock() + defer mgr.lk.Unlock() + + mgr.logger.Debug("check connectivity", "peers", len(mgr.peers)) + + net := mgr.host.Network() // Let's check if some peers are disconnected var connectedPeers []lp2ppeer.ID - for _, p := range currentPeers { - connectedness := mgr.dialer.Connectedness(p) + for pid := range mgr.peers { + connectedness := net.Connectedness(pid) if connectedness == lp2pnet.Connected { - connectedPeers = append(connectedPeers, p) + connectedPeers = append(connectedPeers, pid) } else { - mgr.logger.Debug("peer is not connected to us", "peer", p) + mgr.logger.Debug("peer is not connected to us", "peer", pid) + delete(mgr.peers, pid) } } diff --git a/network/utils.go b/network/utils.go index 33887a6ee..201dc24f0 100644 --- a/network/utils.go +++ b/network/utils.go @@ -139,18 +139,24 @@ func SubnetsToFilters(subnets []*net.IPNet, action multiaddr.Action) *multiaddr. func MakeScalingLimitConfig(minConns, maxConns int) lp2prcmgr.ScalingLimitConfig { limit := lp2prcmgr.DefaultLimits + limit.SystemBaseLimit.ConnsOutbound = LogScale(maxConns / 2) limit.SystemBaseLimit.ConnsInbound = LogScale(maxConns / 2) limit.SystemBaseLimit.Conns = LogScale(maxConns) + limit.SystemBaseLimit.StreamsOutbound = LogScale(maxConns / 2) limit.SystemBaseLimit.StreamsInbound = LogScale(maxConns / 2) limit.SystemBaseLimit.Streams = LogScale(maxConns) + limit.ServiceLimitIncrease.ConnsOutbound = LogScale(minConns / 2) limit.ServiceLimitIncrease.ConnsInbound = LogScale(minConns / 2) limit.ServiceLimitIncrease.Conns = LogScale(minConns) + limit.ServiceLimitIncrease.StreamsOutbound = LogScale(minConns / 2) limit.ServiceLimitIncrease.StreamsInbound = LogScale(minConns / 2) limit.ServiceLimitIncrease.Streams = LogScale(minConns) + limit.TransientBaseLimit.ConnsOutbound = LogScale(maxConns / 2) limit.TransientBaseLimit.ConnsInbound = LogScale(maxConns / 2) limit.TransientBaseLimit.Conns = LogScale(maxConns) + limit.TransientBaseLimit.StreamsOutbound = LogScale(maxConns / 2) limit.TransientBaseLimit.StreamsInbound = LogScale(maxConns / 2) limit.TransientBaseLimit.Streams = LogScale(maxConns)