Skip to content

Commit

Permalink
fix(network): check connection threshold on gater (#803)
Browse files Browse the repository at this point in the history
  • Loading branch information
b00f authored Nov 7, 2023
1 parent 749116c commit d2c7b65
Show file tree
Hide file tree
Showing 9 changed files with 266 additions and 103 deletions.
4 changes: 2 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ func SaveTestnetConfig(path string, numValidators int) error {
"/dns/pactus.nodesync.top/tcp/21777/p2p/12D3KooWP25ejVsd7cL5DvWAPwEu4JTUwnPniHBf4w93tgSezVt8", // NodeSync.Top ([email protected])
"/ip4/95.217.89.202/tcp/21777/p2p/12D3KooWMsi5oYkbbpyyXctmPXzF8UZu2pCvKPRZGyvymhN9BzTD", // CodeBlockLabs ([email protected])
}
conf.Network.MinConns = 8
conf.Network.MaxConns = 16
conf.Network.MinConns = 16
conf.Network.MaxConns = 32
conf.Network.EnableNAT = false
conf.Network.EnableRelay = false
conf.Network.RelayAddrStrings = []string{
Expand Down
8 changes: 4 additions & 4 deletions config/example_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@
## bootstrap_addrs = ["/ip4/172.104.46.145/tcp/21777/p2p/12D3KooWNYD4bB82YZRXv6oNyYPwc5ozabx2epv75ATV3D8VD3Mq"]

# `min_connections` is the minimum number of connections that the Pactus node should maintain.
# Default is 8
## min_connections = 8
# Default is 16
## min_connections = 16

# `max_connections` is the maximum number of connections that the Pactus node should maintain.
# Default is 16
## max_connections = 16
# Default is 32
## max_connections = 32

# `enable_nat` indicates whether NAT service should be enabled or not.
# NAT service allows many machines to share a single public address.
Expand Down
4 changes: 2 additions & 2 deletions network/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ func DefaultConfig() *Config {
},
RelayAddrStrings: []string{},
BootstrapAddrStrings: bootstrapAddrs,
MinConns: 8,
MaxConns: 16,
MinConns: 16,
MaxConns: 32,
EnableNAT: false,
EnableRelay: false,
EnableMdns: false,
Expand Down
106 changes: 66 additions & 40 deletions network/gater.go
Original file line number Diff line number Diff line change
@@ -1,82 +1,108 @@
package network

import (
"sync"

lp2pconnmgr "github.com/libp2p/go-libp2p/core/connmgr"
lp2pcontrol "github.com/libp2p/go-libp2p/core/control"
lp2pnetwork "github.com/libp2p/go-libp2p/core/network"
lp2ppeer "github.com/libp2p/go-libp2p/core/peer"
lp2pconngater "github.com/libp2p/go-libp2p/p2p/net/conngater"
"github.com/multiformats/go-multiaddr"
"github.com/pactus-project/pactus/util/logger"
)

var _ lp2pconnmgr.ConnectionGater = &ConnectionGater{}

type ConnectionGater struct {
*lp2pconngater.BasicConnectionGater
logger *logger.SubLogger
lk sync.RWMutex

filters *multiaddr.Filters
peerMgr *peerMgr
maxConn int
logger *logger.SubLogger
}

func NewConnectionGater(conf *Config, log *logger.SubLogger) (*ConnectionGater, error) {
connGater, err := lp2pconngater.NewBasicConnectionGater(nil)
if err != nil {
return nil, err
}

filters := multiaddr.NewFilters()
if !conf.ForcePrivateNetwork {
privateSubnets := PrivateSubnets()
for _, sn := range privateSubnets {
err := connGater.BlockSubnet(sn)
if err != nil {
return nil, LibP2PError{Err: err}
}
}
filters = SubnetsToFilters(privateSubnets, multiaddr.ActionDeny)
}

return &ConnectionGater{
BasicConnectionGater: connGater,
logger: log,
filters: filters,
maxConn: conf.MaxConns,
logger: log,
}, nil
}

func (g *ConnectionGater) InterceptPeerDial(p lp2ppeer.ID) bool {
allow := g.BasicConnectionGater.InterceptPeerDial(p)
if !allow {
g.logger.Debug("InterceptPeerDial not allowed", "p")
func (g *ConnectionGater) SetPeerManager(peerMgr *peerMgr) {
g.lk.Lock()
defer g.lk.Unlock()

g.peerMgr = peerMgr
}

func (g *ConnectionGater) hasMaxConnections() bool {
if g.peerMgr == nil {
return false
}

return allow
return g.peerMgr.NumOfConnected() > g.maxConn
}

func (g *ConnectionGater) InterceptAddrDial(p lp2ppeer.ID, ma multiaddr.Multiaddr) bool {
allow := g.BasicConnectionGater.InterceptAddrDial(p, ma)
if !allow {
g.logger.Debug("InterceptAddrDial not allowed", "p", p, "ma", ma.String())
func (g *ConnectionGater) InterceptPeerDial(pid lp2ppeer.ID) bool {
g.lk.RLock()
defer g.lk.RUnlock()

if g.hasMaxConnections() {
g.logger.Debug("InterceptPeerDial rejected: many connections", "pid", pid)
return false
}

return allow
return true
}

func (g *ConnectionGater) InterceptAccept(cma lp2pnetwork.ConnMultiaddrs) bool {
allow := g.BasicConnectionGater.InterceptAccept(cma)
if !allow {
g.logger.Debug("InterceptAccept not allowed")
func (g *ConnectionGater) InterceptAddrDial(pid lp2ppeer.ID, ma multiaddr.Multiaddr) bool {
g.lk.RLock()
defer g.lk.RUnlock()

if g.hasMaxConnections() {
g.logger.Debug("InterceptAddrDial rejected: many connections", "pid", pid, "ma", ma.String())
return false
}

deny := g.filters.AddrBlocked(ma)
if deny {
g.logger.Debug("InterceptAddrDial rejected", "pid", pid, "ma", ma.String())
return false
}

return allow
return true
}

func (g *ConnectionGater) InterceptSecured(dir lp2pnetwork.Direction, p lp2ppeer.ID,
cma lp2pnetwork.ConnMultiaddrs,
) bool {
allow := g.BasicConnectionGater.InterceptSecured(dir, p, cma)
if !allow {
g.logger.Debug("InterceptSecured not allowed", "p", p)
func (g *ConnectionGater) InterceptAccept(cma lp2pnetwork.ConnMultiaddrs) bool {
g.lk.RLock()
defer g.lk.RUnlock()

if g.hasMaxConnections() {
g.logger.Debug("InterceptAccept rejected: many connections")
return false
}

return allow
deny := g.filters.AddrBlocked(cma.RemoteMultiaddr())
if deny {
g.logger.Debug("InterceptAccept rejected")
return false
}

return true
}

func (g *ConnectionGater) InterceptSecured(_ lp2pnetwork.Direction, _ lp2ppeer.ID, _ lp2pnetwork.ConnMultiaddrs) bool {
return true
}

func (g *ConnectionGater) InterceptUpgraded(con lp2pnetwork.Conn) (bool, lp2pcontrol.DisconnectReason) {
return g.BasicConnectionGater.InterceptUpgraded(con)
func (g *ConnectionGater) InterceptUpgraded(_ lp2pnetwork.Conn) (bool, lp2pcontrol.DisconnectReason) {
return true, 0
}
82 changes: 82 additions & 0 deletions network/gater_test.go
Original file line number Diff line number Diff line change
@@ -1 +1,83 @@
package network

import (
"testing"

lp2pnet "github.com/libp2p/go-libp2p/core/network"
"github.com/multiformats/go-multiaddr"
"github.com/pactus-project/pactus/util/testsuite"
"github.com/stretchr/testify/assert"
)

type mockConnMultiaddrs struct {
remote multiaddr.Multiaddr
}

func (cma *mockConnMultiaddrs) LocalMultiaddr() multiaddr.Multiaddr {
return nil
}

func (cma *mockConnMultiaddrs) RemoteMultiaddr() multiaddr.Multiaddr {
return cma.remote
}

func TestAllowedConnections(t *testing.T) {
ts := testsuite.NewTestSuite(t)
conf := testConfig()
net := makeTestNetwork(t, conf, nil)

maPrivate := multiaddr.StringCast("/ip4/127.0.0.1/tcp/1234")
maPublic := multiaddr.StringCast("/ip4/8.8.8.8/tcp/1234")
cmaPrivate := &mockConnMultiaddrs{remote: maPrivate}
cmaPublic := &mockConnMultiaddrs{remote: maPublic}
pid := ts.RandPeerID()

assert.True(t, net.connGater.InterceptPeerDial(pid))
assert.True(t, net.connGater.InterceptAddrDial(pid, maPrivate))
assert.True(t, net.connGater.InterceptAddrDial(pid, maPublic))
assert.True(t, net.connGater.InterceptAccept(cmaPrivate))
assert.True(t, net.connGater.InterceptAccept(cmaPublic))
}

func TestDenyPrivate(t *testing.T) {
ts := testsuite.NewTestSuite(t)
conf := testConfig()
conf.ForcePrivateNetwork = false
net := makeTestNetwork(t, conf, nil)

maPrivate := multiaddr.StringCast("/ip4/127.0.0.1/tcp/1234")
maPublic := multiaddr.StringCast("/ip4/8.8.8.8/tcp/1234")
cmaPrivate := &mockConnMultiaddrs{remote: maPrivate}
cmaPublic := &mockConnMultiaddrs{remote: maPublic}
pid := ts.RandPeerID()

assert.True(t, net.connGater.InterceptPeerDial(pid))
assert.False(t, net.connGater.InterceptAddrDial(pid, maPrivate))
assert.True(t, net.connGater.InterceptAddrDial(pid, maPublic))
assert.False(t, net.connGater.InterceptAccept(cmaPrivate))
assert.True(t, net.connGater.InterceptAccept(cmaPublic))
}

func TestMaxConnection(t *testing.T) {
ts := testsuite.NewTestSuite(t)
conf := testConfig()
conf.MaxConns = 1
net := makeTestNetwork(t, conf, nil)

maPrivate := multiaddr.StringCast("/ip4/127.0.0.1/tcp/1234")
maPublic := multiaddr.StringCast("/ip4/8.8.8.8/tcp/1234")
cmaPrivate := &mockConnMultiaddrs{remote: maPrivate}
cmaPublic := &mockConnMultiaddrs{remote: maPublic}
pid := ts.RandPeerID()

net.peerMgr.AddPeer(ts.RandPeerID(),
multiaddr.StringCast("/ip4/2.2.2.2/tcp/1234"), lp2pnet.DirInbound, nil)
net.peerMgr.AddPeer(ts.RandPeerID(),
multiaddr.StringCast("/ip4/3.3.3.3/tcp/1234"), lp2pnet.DirInbound, nil)

assert.False(t, net.connGater.InterceptPeerDial(pid))
assert.False(t, net.connGater.InterceptAddrDial(pid, maPrivate))
assert.False(t, net.connGater.InterceptAddrDial(pid, maPublic))
assert.False(t, net.connGater.InterceptAccept(cmaPrivate))
assert.False(t, net.connGater.InterceptAccept(cmaPublic))
}
6 changes: 4 additions & 2 deletions network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,12 +215,13 @@ func newNetwork(networkName string, conf *Config, log *logger.SubLogger, opts []
n.mdns = newMdnsService(ctx, n.host, n.logger)
}
n.dht = newDHTService(n.ctx, n.host, kadProtocolID, conf, n.logger)
n.peerMgr = newPeerMgr(ctx, host, n.dht.kademlia, conf, n.logger)
n.peerMgr = newPeerMgr(ctx, host, n.dht.kademlia, streamProtocolID, conf, n.logger)
n.stream = newStreamService(ctx, n.host, streamProtocolID, n.eventChannel, n.logger)
n.gossip = newGossipService(ctx, n.host, n.eventChannel, conf, n.logger)
n.notifee = newNotifeeService(n.host, n.eventChannel, n.logger, streamProtocolID, conf.Bootstrapper)
n.notifee = newNotifeeService(n.host, n.eventChannel, n.peerMgr, streamProtocolID, conf.Bootstrapper, n.logger)

n.host.Network().Notify(n.notifee)
n.connGater.SetPeerManager(n.peerMgr)

n.logger.Info("network setup", "id", n.host.ID(),
"address", conf.ListenAddrStrings,
Expand Down Expand Up @@ -272,6 +273,7 @@ func (n *network) SelfID() lp2ppeer.ID {
}

func (n *network) SendTo(msg []byte, pid lp2pcore.PeerID) error {
n.logger.Trace("Sending new message", "to", pid)
return n.stream.SendRequest(msg, pid)
}

Expand Down
44 changes: 27 additions & 17 deletions network/notifee.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package network
import (
"time"

lp2pcore "github.com/libp2p/go-libp2p/core"
lp2phost "github.com/libp2p/go-libp2p/core/host"
lp2pnetwork "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/protocol"
Expand All @@ -16,61 +17,70 @@ type NotifeeService struct {
eventChannel chan<- Event
logger *logger.SubLogger
protocolID protocol.ID
peerMgr *peerMgr
bootstrapper bool
}

func newNotifeeService(host lp2phost.Host, eventChannel chan<- Event,
log *logger.SubLogger, protocolID protocol.ID, bootstrapper bool,
func newNotifeeService(host lp2phost.Host, eventChannel chan<- Event, peerMgr *peerMgr,
protocolID protocol.ID, bootstrapper bool, log *logger.SubLogger,
) *NotifeeService {
notifee := &NotifeeService{
host: host,
eventChannel: eventChannel,
logger: log,
protocolID: protocolID,
bootstrapper: bootstrapper,
peerMgr: peerMgr,
logger: log,
}
host.Network().Notify(notifee)
return notifee
}

func (n *NotifeeService) Connected(lp2pn lp2pnetwork.Network, conn lp2pnetwork.Conn) {
peerID := conn.RemotePeer()
n.logger.Info("connected to peer", "pid", peerID)
pid := conn.RemotePeer()
n.logger.Info("connected to peer", "pid", pid, "direction", conn.Stat().Direction)

var protocols []lp2pcore.ProtocolID
go func() {
for i := 0; i < 10; i++ {
// TODO: better way?
// Wait to complete libp2p identify
time.Sleep(1 * time.Second)

peerStore := lp2pn.Peerstore()
protocols, _ := peerStore.GetProtocols(peerID)
protocols, _ = peerStore.GetProtocols(pid)
if len(protocols) > 0 {
if slices.Contains(protocols, n.protocolID) {
n.logger.Debug("peer supports the stream protocol",
"pid", peerID, "protocols", protocols)
"pid", pid, "protocols", protocols)

n.eventChannel <- &ConnectEvent{PeerID: peerID}
n.eventChannel <- &ConnectEvent{PeerID: pid}
} else {
n.logger.Debug("peer doesn't support the stream protocol",
"pid", peerID, "protocols", protocols)
"pid", pid, "protocols", protocols)
}
return
break
}
}

n.logger.Info("unable to get supported protocols", "pid", peerID)
if !n.bootstrapper {
// Close this connection since we can't send a direct message to this peer.
_ = n.host.Network().ClosePeer(peerID)
if len(protocols) == 0 {
n.logger.Info("unable to get supported protocols", "pid", pid)
if !n.bootstrapper {
// Close this connection since we can't send a direct message to this peer.
_ = n.host.Network().ClosePeer(pid)
}
}

n.peerMgr.AddPeer(pid, conn.RemoteMultiaddr(), conn.Stat().Direction, protocols)
}()
}

func (n *NotifeeService) Disconnected(_ lp2pnetwork.Network, conn lp2pnetwork.Conn) {
peerID := conn.RemotePeer()
n.logger.Info("disconnected from peer", "pid", peerID)
n.eventChannel <- &DisconnectEvent{PeerID: peerID}
pid := conn.RemotePeer()
n.logger.Info("disconnected from peer", "pid", pid)
n.eventChannel <- &DisconnectEvent{PeerID: pid}

n.peerMgr.RemovePeer(pid)
}

func (n *NotifeeService) Listen(_ lp2pnetwork.Network, ma multiaddr.Multiaddr) {
Expand Down
Loading

0 comments on commit d2c7b65

Please sign in to comment.