From 73c12f7da864ae0a01436385075ab096200e72f2 Mon Sep 17 00:00:00 2001 From: Alexander Sporn Date: Tue, 20 Feb 2024 08:17:49 +0100 Subject: [PATCH 01/18] Use a more explicit namespace for DHT discovery to reduce the chance of colliding with other protocols --- pkg/network/autopeering/autopeering.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/pkg/network/autopeering/autopeering.go b/pkg/network/autopeering/autopeering.go index 2dc5e9aa4..c6e53f153 100644 --- a/pkg/network/autopeering/autopeering.go +++ b/pkg/network/autopeering/autopeering.go @@ -2,6 +2,7 @@ package autopeering import ( "context" + "fmt" "sync" "sync/atomic" "time" @@ -19,7 +20,7 @@ import ( ) type Manager struct { - networkID string + namespace string p2pManager *p2p.Manager logger log.Logger host host.Host @@ -35,7 +36,7 @@ type Manager struct { // NewManager creates a new autopeering manager. func NewManager(networkID string, p2pManager *p2p.Manager, host host.Host, peerDB *network.DB, logger log.Logger) *Manager { return &Manager{ - networkID: networkID, + namespace: fmt.Sprintf("/iota/%s/1.0.0", networkID), p2pManager: p2pManager, host: host, peerDB: peerDB, @@ -77,7 +78,7 @@ func (m *Manager) Start(ctx context.Context) (err error) { } m.routingDiscovery = routing.NewRoutingDiscovery(kademliaDHT) - util.Advertise(m.ctx, m.routingDiscovery, m.networkID, discovery.TTL(5*time.Minute)) + util.Advertise(m.ctx, m.routingDiscovery, m.namespace, discovery.TTL(5*time.Minute)) go m.discoveryLoop() @@ -119,8 +120,8 @@ func (m *Manager) discoverAndDialPeers() { tctx, cancel := context.WithTimeout(m.ctx, 10*time.Second) defer cancel() - m.logger.LogDebugf("Discovering peers for network ID %s", m.networkID) - peerChan, err := m.routingDiscovery.FindPeers(tctx, m.networkID) + m.logger.LogDebugf("Discovering peers for namespace %s", m.namespace) + peerChan, err := m.routingDiscovery.FindPeers(tctx, m.namespace) if err != nil { m.logger.LogWarnf("Failed to find peers: %s", err) } From a0befb19e4fea0c11c28ebe9c54c1f15229fa24b Mon Sep 17 00:00:00 2001 From: Alexander Sporn Date: Tue, 20 Feb 2024 08:29:56 +0100 Subject: [PATCH 02/18] Use shrinking map in the p2p manager and clean up unused code --- pkg/network/p2p/manager.go | 90 ++++++++++++-------------------------- 1 file changed, 29 insertions(+), 61 deletions(-) diff --git a/pkg/network/p2p/manager.go b/pkg/network/p2p/manager.go index d6ed84420..679f8a6da 100644 --- a/pkg/network/p2p/manager.go +++ b/pkg/network/p2p/manager.go @@ -8,10 +8,10 @@ import ( p2pnetwork "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" - "github.com/libp2p/go-libp2p/core/protocol" "github.com/multiformats/go-multiaddr" "google.golang.org/protobuf/proto" + "github.com/iotaledger/hive.go/ds/shrinkingmap" "github.com/iotaledger/hive.go/ierrors" "github.com/iotaledger/hive.go/log" "github.com/iotaledger/hive.go/runtime/syncutils" @@ -21,16 +21,6 @@ import ( const ( protocolID = "iota-core/1.0.0" defaultConnectionTimeout = 5 * time.Second // timeout after which the connection must be established. - ioTimeout = 4 * time.Second -) - -var ( - // ErrTimeout is returned when an expected incoming connection was not received in time. - ErrTimeout = ierrors.New("accept timeout") - // ErrDuplicateAccept is returned when the server already registered an accept request for that peer ID. - ErrDuplicateAccept = ierrors.New("accept request for that peer already exists") - // ErrNoP2P means that the given peer does not support the p2p service. - ErrNoP2P = ierrors.New("peer does not have a p2p service") ) // ConnectPeerOption defines an option for the DialPeer and AcceptPeer methods. @@ -76,8 +66,7 @@ type Manager struct { shutdownMutex syncutils.RWMutex isShutdown bool - neighbors map[peer.ID]*Neighbor - neighborsMutex syncutils.RWMutex + neighbors *shrinkingmap.ShrinkingMap[peer.ID, *Neighbor] protocolHandler *ProtocolHandler protocolHandlerMutex syncutils.RWMutex @@ -90,7 +79,7 @@ func NewManager(libp2pHost host.Host, peerDB *network.DB, logger log.Logger) *Ma peerDB: peerDB, logger: logger, Events: NewNeighborEvents(), - neighbors: make(map[peer.ID]*Neighbor), + neighbors: shrinkingmap.New[peer.ID, *Neighbor](), } return m @@ -106,7 +95,7 @@ func (m *Manager) RegisterProtocol(factory func() proto.Message, handler func(pe PacketHandler: handler, } - m.libp2pHost.SetStreamHandler(protocol.ID(protocolID), m.handleStream) + m.libp2pHost.SetStreamHandler(protocolID, m.handleStream) } // UnregisterProtocol unregisters the handler for the protocol. @@ -114,7 +103,7 @@ func (m *Manager) UnregisterProtocol() { m.protocolHandlerMutex.Lock() defer m.protocolHandlerMutex.Unlock() - m.libp2pHost.RemoveStreamHandler(protocol.ID(protocolID)) + m.libp2pHost.RemoveStreamHandler(protocolID) m.protocolHandler = nil } @@ -216,31 +205,18 @@ func (m *Manager) Send(packet proto.Message, to ...peer.ID) { } for _, nbr := range neighbors { - nbr.Enqueue(packet, protocol.ID(protocolID)) + nbr.Enqueue(packet, protocolID) } } // AllNeighbors returns all the neighbors that are currently connected. func (m *Manager) AllNeighbors() []*Neighbor { - m.neighborsMutex.RLock() - defer m.neighborsMutex.RUnlock() - result := make([]*Neighbor, 0, len(m.neighbors)) - for _, n := range m.neighbors { - result = append(result, n) - } - - return result + return m.neighbors.Values() } // AllNeighborsIDs returns all the ids of the neighbors that are currently connected. -func (m *Manager) AllNeighborsIDs() (ids []peer.ID) { - ids = make([]peer.ID, 0) - neighbors := m.AllNeighbors() - for _, nbr := range neighbors { - ids = append(ids, nbr.ID) - } - - return +func (m *Manager) AllNeighborsIDs() []peer.ID { + return m.neighbors.Keys() } // NeighborsByID returns all the neighbors that are currently connected corresponding to the supplied ids. @@ -250,10 +226,8 @@ func (m *Manager) NeighborsByID(ids []peer.ID) []*Neighbor { return result } - m.neighborsMutex.RLock() - defer m.neighborsMutex.RUnlock() for _, id := range ids { - if n, ok := m.neighbors[id]; ok { + if n, ok := m.neighbors.Get(id); ok { result = append(result, n) } } @@ -267,7 +241,7 @@ func (m *Manager) handleStream(stream p2pnetwork.Stream) { if m.protocolHandler == nil { m.logger.LogError("no protocol handler registered") - stream.Close() + _ = stream.Close() return } @@ -284,16 +258,16 @@ func (m *Manager) handleStream(stream p2pnetwork.Stream) { ID: stream.Conn().RemotePeer(), Addrs: []multiaddr.Multiaddr{stream.Conn().RemoteMultiaddr()}, } - peer := network.NewPeerFromAddrInfo(peerAddrInfo) - if err := m.peerDB.UpdatePeer(peer); err != nil { - m.logger.LogErrorf("failed to update peer in peer database, peerID: %s, error: %s", peer.ID, err) + networkPeer := network.NewPeerFromAddrInfo(peerAddrInfo) + if err := m.peerDB.UpdatePeer(networkPeer); err != nil { + m.logger.LogErrorf("failed to update peer in peer database, peerID: %s, error: %s", networkPeer.ID, err) m.closeStream(stream) return } - if err := m.addNeighbor(peer, ps); err != nil { - m.logger.LogErrorf("failed to add neighbor, peerID: %s, error: %s", peer.ID, err) + if err := m.addNeighbor(networkPeer, ps); err != nil { + m.logger.LogErrorf("failed to add neighbor, peerID: %s, error: %s", networkPeer.ID, err) m.closeStream(stream) return @@ -308,10 +282,7 @@ func (m *Manager) closeStream(s p2pnetwork.Stream) { // neighborWithGroup returns neighbor by ID and group. func (m *Manager) neighbor(id peer.ID) (*Neighbor, error) { - m.neighborsMutex.RLock() - defer m.neighborsMutex.RUnlock() - - nbr, ok := m.neighbors[id] + nbr, ok := m.neighbors.Get(id) if !ok { return nil, ErrUnknownNeighbor } @@ -364,28 +335,25 @@ func (m *Manager) addNeighbor(peer *network.Peer, ps *PacketsStream) error { } func (m *Manager) neighborExists(id peer.ID) bool { - m.neighborsMutex.RLock() - defer m.neighborsMutex.RUnlock() - _, exists := m.neighbors[id] - - return exists + return m.neighbors.Has(id) } func (m *Manager) deleteNeighbor(nbr *Neighbor) { - m.neighborsMutex.Lock() - defer m.neighborsMutex.Unlock() - delete(m.neighbors, nbr.ID) + m.neighbors.Delete(nbr.ID) } func (m *Manager) setNeighbor(nbr *Neighbor) error { - m.neighborsMutex.Lock() - defer m.neighborsMutex.Unlock() - if _, exists := m.neighbors[nbr.ID]; exists { - return ierrors.WithStack(ErrDuplicateNeighbor) - } - m.neighbors[nbr.ID] = nbr + var err error + m.neighbors.Compute(nbr.ID, func(currentValue *Neighbor, exists bool) *Neighbor { + if exists { + err = ierrors.WithStack(ErrDuplicateNeighbor) + return currentValue + } - return nil + return nbr + }) + + return err } func (m *Manager) dropAllNeighbors() { From a41223c30f150e4a8fc36844274efc81535e7244 Mon Sep 17 00:00:00 2001 From: Alexander Sporn Date: Tue, 20 Feb 2024 08:49:09 +0100 Subject: [PATCH 03/18] Stop AutoPeeringMgr on shutdown --- components/p2p/component.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/components/p2p/component.go b/components/p2p/component.go index 4dbbea364..71ea6e08c 100644 --- a/components/p2p/component.go +++ b/components/p2p/component.go @@ -341,7 +341,10 @@ func run() error { defer func() { if err := deps.ManualPeeringMgr.Stop(); err != nil { - Component.LogErrorf("Failed to stop the manager", "err", err) + Component.LogErrorf("Failed to stop the manager: %s", err) + } + if err := deps.AutoPeeringMgr.Stop(); err != nil { + Component.LogErrorf("Failed to stop autopeering manager: %s", err) } }() //nolint:contextcheck // false positive From 57c97a69736d2589915d3587f1792569496b6657 Mon Sep 17 00:00:00 2001 From: Alexander Sporn Date: Tue, 20 Feb 2024 09:06:52 +0100 Subject: [PATCH 04/18] Fixed log format --- pkg/network/p2p/manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/network/p2p/manager.go b/pkg/network/p2p/manager.go index 679f8a6da..963be8139 100644 --- a/pkg/network/p2p/manager.go +++ b/pkg/network/p2p/manager.go @@ -328,7 +328,7 @@ func (m *Manager) addNeighbor(peer *network.Peer, ps *PacketsStream) error { } nbr.readLoop() nbr.writeLoop() - nbr.logger.LogInfo("Connection established to %s") + nbr.logger.LogInfof("Connection established to %s", nbr.ID) m.Events.NeighborAdded.Trigger(nbr) return nil From 4676c52586ba084a01d1fad239c5cf45c2976d25 Mon Sep 17 00:00:00 2001 From: Alexander Sporn Date: Tue, 20 Feb 2024 09:07:22 +0100 Subject: [PATCH 05/18] Always create a distinct network name when creating the docker or feature network snapshots --- tools/genesis-snapshot/presets/presets.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tools/genesis-snapshot/presets/presets.go b/tools/genesis-snapshot/presets/presets.go index 357708238..f9128e1c2 100644 --- a/tools/genesis-snapshot/presets/presets.go +++ b/tools/genesis-snapshot/presets/presets.go @@ -1,6 +1,7 @@ package presets import ( + "fmt" "time" "golang.org/x/crypto/blake2b" @@ -23,14 +24,14 @@ var ( // use defaults from iota.go. protocolParamsDocker = iotago.NewV3SnapshotProtocolParameters( - iotago.WithNetworkOptions("docker", iotago.PrefixTestnet), + iotago.WithNetworkOptions(fmt.Sprintf("docker-%d", time.Now().Unix()), iotago.PrefixTestnet), iotago.WithTimeProviderOptions(5, time.Now().Unix(), 10, 13), iotago.WithLivenessOptions(10, 15, 3, 6, 8), ) // use defaults from iota.go. protocolParamsFeature = iotago.NewV3SnapshotProtocolParameters( - iotago.WithNetworkOptions("feature", iotago.PrefixTestnet), + iotago.WithNetworkOptions(fmt.Sprintf("feature-%d", time.Now().Unix()), iotago.PrefixTestnet), iotago.WithTimeProviderOptions(666666, time.Now().Unix()-100_000, 10, 13), ) ) From c5f1188bcaaefbd8427bde5b727a87d9e3c8a410 Mon Sep 17 00:00:00 2001 From: Alexander Sporn Date: Tue, 20 Feb 2024 09:14:34 +0100 Subject: [PATCH 06/18] Add a boolean to the config to enable/disable autopeering --- components/p2p/component.go | 19 +++++++++++++------ components/p2p/params.go | 4 ++++ 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/components/p2p/component.go b/components/p2p/component.go index 71ea6e08c..95558f81d 100644 --- a/components/p2p/component.go +++ b/components/p2p/component.go @@ -335,18 +335,25 @@ func configure() error { func run() error { if err := Component.Daemon().BackgroundWorker(Component.Name, func(ctx context.Context) { deps.ManualPeeringMgr.Start() - if err := deps.AutoPeeringMgr.Start(ctx); err != nil { - Component.LogFatalf("Failed to start autopeering manager: %s", err) - } defer func() { if err := deps.ManualPeeringMgr.Stop(); err != nil { Component.LogErrorf("Failed to stop the manager: %s", err) } - if err := deps.AutoPeeringMgr.Stop(); err != nil { - Component.LogErrorf("Failed to stop autopeering manager: %s", err) - } }() + + if ParamsP2P.Autopeering.Enabled { + if err := deps.AutoPeeringMgr.Start(ctx); err != nil { + Component.LogFatalf("Failed to start autopeering manager: %s", err) + } + + defer func() { + if err := deps.AutoPeeringMgr.Stop(); err != nil { + Component.LogErrorf("Failed to stop autopeering manager: %s", err) + } + }() + } + //nolint:contextcheck // false positive connectConfigKnownPeers() <-ctx.Done() diff --git a/components/p2p/params.go b/components/p2p/params.go index 68acdffbc..faf741207 100644 --- a/components/p2p/params.go +++ b/components/p2p/params.go @@ -28,6 +28,10 @@ type ParametersP2P struct { // Defines the private key used to derive the node identity (optional). IdentityPrivateKey string `default:"" usage:"private key used to derive the node identity (optional)"` + Autopeering struct { + Enabled bool `default:"true" usage:"enable or disable autopeering"` + } + Database struct { // Defines the path to the p2p database. Path string `default:"testnet/p2pstore" usage:"the path to the p2p database"` From 9de27f3d6b1e331180fb260bfc84c8caceb89fbb Mon Sep 17 00:00:00 2001 From: Alexander Sporn Date: Tue, 20 Feb 2024 09:15:26 +0100 Subject: [PATCH 07/18] Use p2p.bootstrapPeers in docker-network instead of p2p.peers since this will default to using autopeering --- tools/docker-network/.env | 4 ++-- tools/docker-network/docker-compose.yml | 11 +++++------ 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/tools/docker-network/.env b/tools/docker-network/.env index 025ff1d9e..ba729e121 100644 --- a/tools/docker-network/.env +++ b/tools/docker-network/.env @@ -9,6 +9,6 @@ COMMON_CONFIG=" --protocol.snapshot.path=/app/data/snapshot.bin " -MANUALPEERING_CONFIG=" ---p2p.peers=/dns/node-1-validator/tcp/15600/p2p/12D3KooWRVt4Engu27jHnF2RjfX48EqiAqJbgLfFdHNt3Vn6BtJK\ +AUTOPEERING_CONFIG=" +--p2p.bootstrapPeers=/dns/node-1-validator/tcp/15600/p2p/12D3KooWRVt4Engu27jHnF2RjfX48EqiAqJbgLfFdHNt3Vn6BtJK\ " diff --git a/tools/docker-network/docker-compose.yml b/tools/docker-network/docker-compose.yml index bd97b4ace..e49804fc5 100644 --- a/tools/docker-network/docker-compose.yml +++ b/tools/docker-network/docker-compose.yml @@ -29,7 +29,6 @@ services: - ./config.json:/app/config.json:ro command: > ${COMMON_CONFIG} - ${MANUALPEERING_CONFIG} --p2p.identityPrivateKey=08735375679f3d8031353e94282ed1d65119e5c288fe56d6639d9184a3f978fee8febfedff11cc376daea0f59c395ae2e9a870a25ac4e36093000fbf4d0e8f18 --inx.enabled=true --inx.bindAddress=0.0.0.0:9029 @@ -56,7 +55,7 @@ services: - ./config.json:/app/config.json:ro command: > ${COMMON_CONFIG} - ${MANUALPEERING_CONFIG} + ${AUTOPEERING_CONFIG} --p2p.identityPrivateKey=ba771419c52132a0dfb2521ed18667813f398da159010a55a0a482af939affb92d3338789ad4a07a7631b91791deb11f82ed5dc612822f24275e9f7a313b691f --inx.enabled=true --inx.bindAddress=0.0.0.0:9029 @@ -83,7 +82,7 @@ services: - ./config.json:/app/config.json:ro command: > ${COMMON_CONFIG} - ${MANUALPEERING_CONFIG} + ${AUTOPEERING_CONFIG} --p2p.identityPrivateKey=a6261ac049755675ff1437654ca9f83b305055f01ff08c4f039209ef5a4a7d96d06fb61df77a8815209a8f4d204226dee593e50d0ec897ec440a2c1fbde77656 --inx.enabled=true --inx.bindAddress=0.0.0.0:9029 @@ -110,7 +109,7 @@ services: - ./config.json:/app/config.json:ro command: > ${COMMON_CONFIG} - ${MANUALPEERING_CONFIG} + ${AUTOPEERING_CONFIG} --p2p.identityPrivateKey=f205f6c4525069f71f9c7e987d72421a16c7900056b494a2b85fdf7942cf906aefbdc580f5d1ce4ae3f86ccfe109c6cd76df9b0e710a437b2aa964358c7b9449 --inx.enabled=true --inx.bindAddress=0.0.0.0:9029 @@ -135,7 +134,7 @@ services: - ./config.json:/app/config.json:ro command: > ${COMMON_CONFIG} - ${MANUALPEERING_CONFIG} + ${AUTOPEERING_CONFIG} --p2p.identityPrivateKey=03feb3bcd25e57f75697bb329e6e0100680431e4c45c85bc013da2aea9e9d0345e08a0c37407dc62369deebc64cb0fb3ea26127d19d141ee7fb8eaa6b92019d7 --inx.enabled=true --inx.bindAddress=0.0.0.0:9029 @@ -162,7 +161,7 @@ services: - ./config.json:/app/config.json:ro command: > ${COMMON_CONFIG} - ${MANUALPEERING_CONFIG} + ${AUTOPEERING_CONFIG} --p2p.identityPrivateKey=7d1491df3ef334dee988d6cdfc4b430b996d520bd63375a01d6754f8cee979b855b200fbea8c936ea1937a27e6ad72a7c9a21c1b17c2bd3c11f1f6994d813446 --inx.enabled=true --inx.bindAddress=0.0.0.0:9029 From 2f559ddb0afb69bf641549f07786b9b79eeac039 Mon Sep 17 00:00:00 2001 From: Alexander Sporn Date: Tue, 20 Feb 2024 09:24:09 +0100 Subject: [PATCH 08/18] Let nodes wait for node-1 to be healthy before starting up in the docker network --- tools/docker-network/docker-compose.yml | 21 ++++++++++++++++++--- tools/docker-network/prometheus.yml | 2 +- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/tools/docker-network/docker-compose.yml b/tools/docker-network/docker-compose.yml index e49804fc5..e570cb26f 100644 --- a/tools/docker-network/docker-compose.yml +++ b/tools/docker-network/docker-compose.yml @@ -39,6 +39,9 @@ services: image: docker-network-node-1-validator:latest stop_grace_period: 1m restart: no + depends_on: + node-1-validator: + condition: service_healthy ulimits: nofile: soft: 16384 @@ -66,6 +69,9 @@ services: image: docker-network-node-1-validator:latest stop_grace_period: 1m restart: no + depends_on: + node-1-validator: + condition: service_healthy ulimits: nofile: soft: 16384 @@ -92,7 +98,10 @@ services: node-4-validator: image: docker-network-node-1-validator:latest stop_grace_period: 1m - restart: unless-stopped + restart: no + depends_on: + node-1-validator: + condition: service_healthy ulimits: nofile: soft: 16384 @@ -114,10 +123,13 @@ services: --inx.enabled=true --inx.bindAddress=0.0.0.0:9029 - node-4: + node-5: image: docker-network-node-1-validator:latest stop_grace_period: 1m restart: no + depends_on: + node-1-validator: + condition: service_healthy ulimits: nofile: soft: 16384 @@ -141,10 +153,13 @@ services: --prometheus.goMetrics=true --prometheus.processMetrics=true - node-5: + node-6: image: docker-network-node-1-validator:latest stop_grace_period: 1m restart: no + depends_on: + node-1-validator: + condition: service_healthy ulimits: nofile: soft: 16384 diff --git a/tools/docker-network/prometheus.yml b/tools/docker-network/prometheus.yml index c6458b27f..6568d8236 100644 --- a/tools/docker-network/prometheus.yml +++ b/tools/docker-network/prometheus.yml @@ -7,8 +7,8 @@ scrape_configs: - node-2-validator:9311 - node-3-validator:9311 - node-4-validator:9311 - - node-4:9311 - node-5:9311 + - node-6:9311 dns_sd_configs: - names: - 'peer_replica' From 6d6cc9dfe7490d16ddd166de42f914e014183bd3 Mon Sep 17 00:00:00 2001 From: Alexander Sporn Date: Tue, 20 Feb 2024 09:50:33 +0100 Subject: [PATCH 09/18] Better log formats --- pkg/network/autopeering/autopeering.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pkg/network/autopeering/autopeering.go b/pkg/network/autopeering/autopeering.go index c6e53f153..29eadf8e1 100644 --- a/pkg/network/autopeering/autopeering.go +++ b/pkg/network/autopeering/autopeering.go @@ -117,13 +117,14 @@ func (m *Manager) discoveryLoop() { } func (m *Manager) discoverAndDialPeers() { - tctx, cancel := context.WithTimeout(m.ctx, 10*time.Second) + findCtx, cancel := context.WithTimeout(m.ctx, 10*time.Second) defer cancel() m.logger.LogDebugf("Discovering peers for namespace %s", m.namespace) - peerChan, err := m.routingDiscovery.FindPeers(tctx, m.namespace) + peerChan, err := m.routingDiscovery.FindPeers(findCtx, m.namespace) if err != nil { m.logger.LogWarnf("Failed to find peers: %s", err) + return } for peerAddrInfo := range peerChan { @@ -137,10 +138,10 @@ func (m *Manager) discoverAndDialPeers() { peer := network.NewPeerFromAddrInfo(&peerAddrInfo) if err := m.p2pManager.DialPeer(m.ctx, peer); err != nil { if ierrors.Is(err, p2p.ErrDuplicateNeighbor) { - m.logger.LogDebugf("Already connected to peer %s", peer) + m.logger.LogDebugf("Already connected to peer %s", peerAddrInfo) continue } - m.logger.LogWarnf("Failed to dial peer %s: %s", peer, err) + m.logger.LogWarnf("Failed to dial peer %s: %s", peerAddrInfo, err) } } } From 12fb8d6478ace82471b8a0088504420a3ddd154d Mon Sep 17 00:00:00 2001 From: Alexander Sporn Date: Tue, 20 Feb 2024 09:55:18 +0100 Subject: [PATCH 10/18] Reduce log spam for already connected peers when they are found using autopeering --- pkg/network/autopeering/autopeering.go | 11 ++++++----- pkg/network/p2p/manager.go | 6 +++--- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/pkg/network/autopeering/autopeering.go b/pkg/network/autopeering/autopeering.go index 29eadf8e1..cdf22ebae 100644 --- a/pkg/network/autopeering/autopeering.go +++ b/pkg/network/autopeering/autopeering.go @@ -120,7 +120,7 @@ func (m *Manager) discoverAndDialPeers() { findCtx, cancel := context.WithTimeout(m.ctx, 10*time.Second) defer cancel() - m.logger.LogDebugf("Discovering peers for namespace %s", m.namespace) + m.logger.LogDebugf("Discovering new peers for namespace %s", m.namespace) peerChan, err := m.routingDiscovery.FindPeers(findCtx, m.namespace) if err != nil { m.logger.LogWarnf("Failed to find peers: %s", err) @@ -133,14 +133,15 @@ func (m *Manager) discoverAndDialPeers() { continue } + // Do not try to dial already connected peers. + if m.p2pManager.NeighborExists(peerAddrInfo.ID) { + continue + } + m.logger.LogDebugf("Found peer: %s", peerAddrInfo) peer := network.NewPeerFromAddrInfo(&peerAddrInfo) if err := m.p2pManager.DialPeer(m.ctx, peer); err != nil { - if ierrors.Is(err, p2p.ErrDuplicateNeighbor) { - m.logger.LogDebugf("Already connected to peer %s", peerAddrInfo) - continue - } m.logger.LogWarnf("Failed to dial peer %s: %s", peerAddrInfo, err) } } diff --git a/pkg/network/p2p/manager.go b/pkg/network/p2p/manager.go index 963be8139..7ae469b91 100644 --- a/pkg/network/p2p/manager.go +++ b/pkg/network/p2p/manager.go @@ -116,7 +116,7 @@ func (m *Manager) DialPeer(ctx context.Context, peer *network.Peer, opts ...Conn return ierrors.New("no protocol handler registered to dial peer") } - if m.neighborExists(peer.ID) { + if m.NeighborExists(peer.ID) { return ierrors.Wrapf(ErrDuplicateNeighbor, "peer %s already exists", peer.ID) } @@ -299,7 +299,7 @@ func (m *Manager) addNeighbor(peer *network.Peer, ps *PacketsStream) error { if m.isShutdown { return ErrNotRunning } - if m.neighborExists(peer.ID) { + if m.NeighborExists(peer.ID) { return ierrors.WithStack(ErrDuplicateNeighbor) } @@ -334,7 +334,7 @@ func (m *Manager) addNeighbor(peer *network.Peer, ps *PacketsStream) error { return nil } -func (m *Manager) neighborExists(id peer.ID) bool { +func (m *Manager) NeighborExists(id peer.ID) bool { return m.neighbors.Has(id) } From af50e3ac4a3476ddb4d3dc643e9b979ce7ca1162 Mon Sep 17 00:00:00 2001 From: Alexander Sporn Date: Wed, 21 Feb 2024 09:12:07 +0100 Subject: [PATCH 11/18] Refactored manualpeering and autopeering to inside the P2PManager Added an interface for the P2PManager --- components/dashboard/component.go | 20 +- components/p2p/component.go | 128 ++++--------- components/p2p/params.go | 2 +- components/protocol/component.go | 6 +- pkg/daemon/shutdown.go | 4 - pkg/network/errors.go | 14 ++ pkg/network/manager.go | 32 ++++ pkg/network/neighbor.go | 7 + .../{ => p2p}/autopeering/autopeering.go | 39 ++-- pkg/network/p2p/errors.go | 16 -- pkg/network/p2p/events.go | 22 --- pkg/network/p2p/manager.go | 179 ++++++++++-------- .../{ => p2p}/manualpeering/manualpeering.go | 55 +++--- pkg/network/p2p/neighbor.go | 48 +++-- pkg/network/p2p/neighbor_test.go | 10 +- 15 files changed, 299 insertions(+), 283 deletions(-) create mode 100644 pkg/network/errors.go create mode 100644 pkg/network/manager.go create mode 100644 pkg/network/neighbor.go rename pkg/network/{ => p2p}/autopeering/autopeering.go (78%) delete mode 100644 pkg/network/p2p/errors.go delete mode 100644 pkg/network/p2p/events.go rename pkg/network/{ => p2p}/manualpeering/manualpeering.go (81%) diff --git a/components/dashboard/component.go b/components/dashboard/component.go index 7f79604e8..34b65a464 100644 --- a/components/dashboard/component.go +++ b/components/dashboard/component.go @@ -17,7 +17,7 @@ import ( "github.com/iotaledger/hive.go/ierrors" "github.com/iotaledger/iota-core/components/metricstracker" "github.com/iotaledger/iota-core/pkg/daemon" - "github.com/iotaledger/iota-core/pkg/network/p2p" + "github.com/iotaledger/iota-core/pkg/network" "github.com/iotaledger/iota-core/pkg/protocol" ) @@ -49,7 +49,7 @@ type dependencies struct { Host host.Host Protocol *protocol.Protocol AppInfo *app.Info - P2PManager *p2p.Manager + NetworkManager network.Manager MetricsTracker *metricstracker.MetricsTracker } @@ -171,28 +171,20 @@ func currentNodeStatus() *nodestatus { func neighborMetrics() []neighbormetric { var stats []neighbormetric - if deps.P2PManager == nil { + if deps.NetworkManager == nil { return stats } // gossip plugin might be disabled - neighbors := deps.P2PManager.AllNeighbors() + neighbors := deps.NetworkManager.AllNeighbors() if neighbors == nil { return stats } for _, neighbor := range neighbors { - // origin := "Inbound" - // for _, p := range deps.P2PManager.AllNeighbors() { - // if neighbor.Peer == peer { - // origin = "Outbound" - // break - // } - // } - stats = append(stats, neighbormetric{ - ID: neighbor.Peer.ID.String(), - Addresses: fmt.Sprintf("%s", neighbor.Peer.PeerAddresses), + ID: neighbor.Peer().ID.String(), + Addresses: fmt.Sprintf("%s", neighbor.Peer().PeerAddresses), PacketsRead: neighbor.PacketsRead(), PacketsWritten: neighbor.PacketsWritten(), }) diff --git a/components/p2p/component.go b/components/p2p/component.go index 95558f81d..4e940ca8c 100644 --- a/components/p2p/component.go +++ b/components/p2p/component.go @@ -2,9 +2,7 @@ package p2p import ( "context" - "fmt" "path/filepath" - "time" "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p/core/crypto" @@ -21,11 +19,8 @@ import ( "github.com/iotaledger/hive.go/ierrors" "github.com/iotaledger/hive.go/kvstore" hivedb "github.com/iotaledger/hive.go/kvstore/database" - "github.com/iotaledger/hive.go/runtime/event" "github.com/iotaledger/iota-core/pkg/daemon" "github.com/iotaledger/iota-core/pkg/network" - "github.com/iotaledger/iota-core/pkg/network/autopeering" - "github.com/iotaledger/iota-core/pkg/network/manualpeering" "github.com/iotaledger/iota-core/pkg/network/p2p" "github.com/iotaledger/iota-core/pkg/protocol" ) @@ -51,9 +46,7 @@ type dependencies struct { dig.In PeeringConfig *configuration.Configuration `name:"peeringConfig"` PeeringConfigManager *p2p.ConfigManager - ManualPeeringMgr *manualpeering.Manager - AutoPeeringMgr *autopeering.Manager - P2PManager *p2p.Manager + NetworkManager network.Manager PeerDB *network.DB Protocol *protocol.Protocol PeerDBKVSTore kvstore.KVStore `name:"peerDBKVStore"` @@ -79,49 +72,6 @@ func initConfigParams(c *dig.Container) error { } func provide(c *dig.Container) error { - type manualPeeringDeps struct { - dig.In - - P2PManager *p2p.Manager - } - - if err := c.Provide(func(deps manualPeeringDeps) *manualpeering.Manager { - return manualpeering.NewManager(deps.P2PManager, Component.WorkerPool, Component.Logger) - }); err != nil { - return err - } - - type autoPeeringDeps struct { - dig.In - - Protocol *protocol.Protocol - P2PManager *p2p.Manager - Host host.Host - PeerDB *network.DB - } - - if err := c.Provide(func(deps autoPeeringDeps) *autopeering.Manager { - peersMultiAddresses, err := getMultiAddrsFromString(ParamsPeers.BootstrapPeers) - if err != nil { - Component.LogFatalf("Failed to parse bootstrapPeers param: %s", err) - } - - for _, multiAddr := range peersMultiAddresses { - bootstrapPeer, err := network.NewPeerFromMultiAddr(multiAddr) - if err != nil { - Component.LogFatalf("Failed to parse bootstrap peer multiaddress: %s", err) - } - - if err := deps.PeerDB.UpdatePeer(bootstrapPeer); err != nil { - Component.LogErrorf("Failed to update bootstrap peer: %s", err) - } - } - - return autopeering.NewManager(deps.Protocol.LatestAPI().ProtocolParameters().NetworkName(), deps.P2PManager, deps.Host, deps.PeerDB, Component.Logger) - }); err != nil { - return err - } - type peerDatabaseResult struct { dig.Out @@ -251,7 +201,7 @@ func provide(c *dig.Container) error { connManager, err := connmgr.NewConnManager( ParamsP2P.ConnectionManager.LowWatermark, ParamsP2P.ConnectionManager.HighWatermark, - connmgr.WithGracePeriod(time.Minute), + connmgr.WithEmergencyTrim(true), ) if err != nil { Component.LogPanicf("unable to initialize connection manager: %s", err) @@ -263,6 +213,7 @@ func provide(c *dig.Container) error { libp2p.Transport(tcp.NewTCPTransport), libp2p.ConnectionManager(connManager), libp2p.NATPortMap(), + libp2p.DisableRelay(), // Define a custom address factory to inject external addresses to the DHT advertisements. libp2p.AddrsFactory(func() func(addrs []multiaddr.Multiaddr) []multiaddr.Multiaddr { var externalMultiAddrs []multiaddr.Multiaddr @@ -294,8 +245,31 @@ func provide(c *dig.Container) error { Component.LogPanic(err.Error()) } - return c.Provide(func(host host.Host, peerDB *network.DB) *p2p.Manager { - return p2p.NewManager(host, peerDB, Component.Logger) + type p2pManagerDeps struct { + dig.In + Host host.Host + PeerDB *network.DB + } + + return c.Provide(func(inDeps p2pManagerDeps) network.Manager { + + peersMultiAddresses, err := getMultiAddrsFromString(ParamsPeers.BootstrapPeers) + if err != nil { + Component.LogFatalf("Failed to parse bootstrapPeers param: %s", err) + } + + for _, multiAddr := range peersMultiAddresses { + bootstrapPeer, err := network.NewPeerFromMultiAddr(multiAddr) + if err != nil { + Component.LogFatalf("Failed to parse bootstrap peer multiaddress: %s", err) + } + + if err := inDeps.PeerDB.UpdatePeer(bootstrapPeer); err != nil { + Component.LogErrorf("Failed to update bootstrap peer: %s", err) + } + } + + return p2p.NewManager(inDeps.Host, inDeps.PeerDB, ParamsP2P.Autopeering.MaxPeers, Component.Logger) }) } @@ -321,53 +295,27 @@ func configure() error { } // log the p2p events - deps.P2PManager.Events.NeighborAdded.Hook(func(neighbor *p2p.Neighbor) { - Component.LogInfof("Neighbor added: %s / %s", neighbor.PeerAddresses, neighbor.ID) - }, event.WithWorkerPool(Component.WorkerPool)) + deps.NetworkManager.OnNeighborAdded(func(neighbor network.Neighbor) { + Component.LogInfof("neighbor added: %s / %s", neighbor.Peer().PeerAddresses, neighbor.Peer().ID) + }) - deps.P2PManager.Events.NeighborRemoved.Hook(func(neighbor *p2p.Neighbor) { - Component.LogInfof("Neighbor removed: %s / %s", neighbor.PeerAddresses, neighbor.ID) - }, event.WithWorkerPool(Component.WorkerPool)) + deps.NetworkManager.OnNeighborRemoved(func(neighbor network.Neighbor) { + Component.LogInfof("neighbor removed: %s / %s", neighbor.Peer().PeerAddresses, neighbor.Peer().ID) + }) return nil } func run() error { if err := Component.Daemon().BackgroundWorker(Component.Name, func(ctx context.Context) { - deps.ManualPeeringMgr.Start() - - defer func() { - if err := deps.ManualPeeringMgr.Stop(); err != nil { - Component.LogErrorf("Failed to stop the manager: %s", err) - } - }() + defer deps.NetworkManager.Shutdown() - if ParamsP2P.Autopeering.Enabled { - if err := deps.AutoPeeringMgr.Start(ctx); err != nil { - Component.LogFatalf("Failed to start autopeering manager: %s", err) - } - - defer func() { - if err := deps.AutoPeeringMgr.Stop(); err != nil { - Component.LogErrorf("Failed to stop autopeering manager: %s", err) - } - }() + if err := deps.NetworkManager.Start(ctx, deps.Protocol.LatestAPI().ProtocolParameters().NetworkName()); err != nil { + Component.LogFatalf("Failed to start p2p manager: %s", err) } //nolint:contextcheck // false positive connectConfigKnownPeers() - <-ctx.Done() - }, daemon.PriorityManualPeering); err != nil { - Component.LogFatalf("Failed to start as daemon: %s", err) - } - - if err := Component.Daemon().BackgroundWorker(fmt.Sprintf("%s-P2PManager", Component.Name), func(ctx context.Context) { - defer deps.P2PManager.Shutdown() - defer func() { - if err := deps.P2PManager.P2PHost().Close(); err != nil { - Component.LogWarnf("Failed to close libp2p host: %+v", err) - } - }() <-ctx.Done() }, daemon.PriorityP2P); err != nil { @@ -405,7 +353,7 @@ func connectConfigKnownPeers() { Component.LogPanicf("invalid peer address info: %s", err) } - if err := deps.ManualPeeringMgr.AddPeers(multiAddr); err != nil { + if err := deps.NetworkManager.AddManualPeers(multiAddr); err != nil { Component.LogInfof("failed to add peer: %s, error: %s", multiAddr.String(), err) } } diff --git a/components/p2p/params.go b/components/p2p/params.go index faf741207..d93d1a08c 100644 --- a/components/p2p/params.go +++ b/components/p2p/params.go @@ -29,7 +29,7 @@ type ParametersP2P struct { IdentityPrivateKey string `default:"" usage:"private key used to derive the node identity (optional)"` Autopeering struct { - Enabled bool `default:"true" usage:"enable or disable autopeering"` + MaxPeers int `default:"2" usage:"the max number of autopeer connections. Set to 0 to disable autopeering."` } Database struct { diff --git a/components/protocol/component.go b/components/protocol/component.go index 798e603bb..17e69f957 100644 --- a/components/protocol/component.go +++ b/components/protocol/component.go @@ -14,7 +14,7 @@ import ( "github.com/iotaledger/hive.go/runtime/workerpool" "github.com/iotaledger/iota-core/pkg/daemon" "github.com/iotaledger/iota-core/pkg/model" - "github.com/iotaledger/iota-core/pkg/network/p2p" + "github.com/iotaledger/iota-core/pkg/network" "github.com/iotaledger/iota-core/pkg/protocol" "github.com/iotaledger/iota-core/pkg/protocol/engine/attestation/slotattestation" "github.com/iotaledger/iota-core/pkg/protocol/engine/filter/presolidfilter/presolidblockfilter" @@ -115,7 +115,7 @@ func provide(c *dig.Container) error { DatabaseEngine hivedb.Engine `name:"databaseEngine"` ProtocolParameters []iotago.ProtocolParameters - P2PManager *p2p.Manager + NetworkManager network.Manager } return c.Provide(func(deps protocolDeps) *protocol.Protocol { @@ -132,7 +132,7 @@ func provide(c *dig.Container) error { return protocol.New( Component.Logger, workerpool.NewGroup("Protocol"), - deps.P2PManager, + deps.NetworkManager, protocol.WithBaseDirectory(ParamsDatabase.Path), protocol.WithStorageOptions( storage.WithDBEngine(deps.DatabaseEngine), diff --git a/pkg/daemon/shutdown.go b/pkg/daemon/shutdown.go index 6bf40fd5e..31b8cbd97 100644 --- a/pkg/daemon/shutdown.go +++ b/pkg/daemon/shutdown.go @@ -5,12 +5,8 @@ package daemon const ( PriorityCloseDatabase = iota // no dependencies - PriorityPeerDatabase PriorityP2P - PriorityManualPeering PriorityProtocol - PriorityBlockIssuer - PriorityActivity // depends on BlockIssuer PriorityRestAPI PriorityINX PriorityDashboardMetrics diff --git a/pkg/network/errors.go b/pkg/network/errors.go new file mode 100644 index 000000000..76cc89265 --- /dev/null +++ b/pkg/network/errors.go @@ -0,0 +1,14 @@ +package network + +import "github.com/iotaledger/hive.go/ierrors" + +var ( + // ErrNotRunning is returned when a peer is added to a stopped or not yet started network manager. + ErrNotRunning = ierrors.New("manager not running") + // ErrUnknownPeer is returned when the specified peer is not known to the network manager. + ErrUnknownPeer = ierrors.New("unknown neighbor") + // ErrLoopbackPeer is returned when the own peer is added. + ErrLoopbackPeer = ierrors.New("loopback connection not allowed") + // ErrDuplicatePeer is returned when the same peer is added more than once. + ErrDuplicatePeer = ierrors.New("already connected") +) diff --git a/pkg/network/manager.go b/pkg/network/manager.go new file mode 100644 index 000000000..f97785615 --- /dev/null +++ b/pkg/network/manager.go @@ -0,0 +1,32 @@ +package network + +import ( + "context" + + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multiaddr" + + "github.com/iotaledger/hive.go/runtime/event" +) + +type Manager interface { + Endpoint + + DialPeer(context.Context, *Peer) error + + OnNeighborAdded(func(Neighbor)) *event.Hook[func(Neighbor)] + OnNeighborRemoved(func(Neighbor)) *event.Hook[func(Neighbor)] + + AllNeighbors() []Neighbor + + DropNeighbor(peer.ID) error + NeighborExists(peer.ID) bool + + P2PHost() host.Host + + Start(ctx context.Context, networkID string) error + Shutdown() + + AddManualPeers(...multiaddr.Multiaddr) error +} diff --git a/pkg/network/neighbor.go b/pkg/network/neighbor.go new file mode 100644 index 000000000..5a4336fde --- /dev/null +++ b/pkg/network/neighbor.go @@ -0,0 +1,7 @@ +package network + +type Neighbor interface { + Peer() *Peer + PacketsRead() uint64 + PacketsWritten() uint64 +} diff --git a/pkg/network/autopeering/autopeering.go b/pkg/network/p2p/autopeering/autopeering.go similarity index 78% rename from pkg/network/autopeering/autopeering.go rename to pkg/network/p2p/autopeering/autopeering.go index cdf22ebae..3695460e4 100644 --- a/pkg/network/autopeering/autopeering.go +++ b/pkg/network/p2p/autopeering/autopeering.go @@ -16,12 +16,12 @@ import ( "github.com/iotaledger/hive.go/ierrors" "github.com/iotaledger/hive.go/log" "github.com/iotaledger/iota-core/pkg/network" - "github.com/iotaledger/iota-core/pkg/network/p2p" ) type Manager struct { namespace string - p2pManager *p2p.Manager + maxPeers int + networkManager network.Manager logger log.Logger host host.Host peerDB *network.DB @@ -34,20 +34,25 @@ type Manager struct { } // NewManager creates a new autopeering manager. -func NewManager(networkID string, p2pManager *p2p.Manager, host host.Host, peerDB *network.DB, logger log.Logger) *Manager { +func NewManager(maxPeers int, networkManager network.Manager, host host.Host, peerDB *network.DB, logger log.Logger) *Manager { return &Manager{ - namespace: fmt.Sprintf("/iota/%s/1.0.0", networkID), - p2pManager: p2pManager, - host: host, - peerDB: peerDB, - logger: logger, + maxPeers: maxPeers, + networkManager: networkManager, + host: host, + peerDB: peerDB, + logger: logger, } } +func (m *Manager) MaxNeighbors() int { + return m.maxPeers +} + // Start starts the autopeering manager. -func (m *Manager) Start(ctx context.Context) (err error) { +func (m *Manager) Start(ctx context.Context, networkID string) (err error) { //nolint:contextcheck m.startOnce.Do(func() { + m.namespace = fmt.Sprintf("/iota/%s/1.0.0", networkID) m.ctx, m.stopFunc = context.WithCancel(ctx) kademliaDHT, innerErr := dht.New(m.ctx, m.host, dht.Mode(dht.ModeServer)) if innerErr != nil { @@ -117,6 +122,12 @@ func (m *Manager) discoveryLoop() { } func (m *Manager) discoverAndDialPeers() { + //peersToFind := m.maxPeers - len(m.p2pManager.AutopeeredNeighbors()) + //if peersToFind <= 0 { + // m.logger.LogDebugf("Enough autopeering peers connected, not discovering new ones") + // return + //} + findCtx, cancel := context.WithTimeout(m.ctx, 10*time.Second) defer cancel() @@ -128,21 +139,27 @@ func (m *Manager) discoverAndDialPeers() { } for peerAddrInfo := range peerChan { + //if peersToFind <= 0 { + // m.logger.LogDebugf("Enough new autopeering peers connected") + // return + //} + // Do not self-dial. if peerAddrInfo.ID == m.host.ID() { continue } // Do not try to dial already connected peers. - if m.p2pManager.NeighborExists(peerAddrInfo.ID) { + if m.networkManager.NeighborExists(peerAddrInfo.ID) { continue } m.logger.LogDebugf("Found peer: %s", peerAddrInfo) peer := network.NewPeerFromAddrInfo(&peerAddrInfo) - if err := m.p2pManager.DialPeer(m.ctx, peer); err != nil { + if err := m.networkManager.DialPeer(m.ctx, peer); err != nil { m.logger.LogWarnf("Failed to dial peer %s: %s", peerAddrInfo, err) } + //peersToFind-- } } diff --git a/pkg/network/p2p/errors.go b/pkg/network/p2p/errors.go deleted file mode 100644 index 1951e6d2b..000000000 --- a/pkg/network/p2p/errors.go +++ /dev/null @@ -1,16 +0,0 @@ -package p2p - -import "github.com/iotaledger/hive.go/ierrors" - -var ( - // ErrNotRunning is returned when a neighbor is added to a stopped or not yet started p2p manager. - ErrNotRunning = ierrors.New("manager not running") - // ErrUnknownNeighbor is returned when the specified neighbor is not known to the p2p manager. - ErrUnknownNeighbor = ierrors.New("unknown neighbor") - // ErrLoopbackNeighbor is returned when the own peer is specified as a neighbor. - ErrLoopbackNeighbor = ierrors.New("loopback connection not allowed") - // ErrDuplicateNeighbor is returned when the same peer is added more than once as a neighbor. - ErrDuplicateNeighbor = ierrors.New("already connected") - // ErrNeighborQueueFull is returned when the send queue is already full. - ErrNeighborQueueFull = ierrors.New("send queue is full") -) diff --git a/pkg/network/p2p/events.go b/pkg/network/p2p/events.go deleted file mode 100644 index 5a1aa6a06..000000000 --- a/pkg/network/p2p/events.go +++ /dev/null @@ -1,22 +0,0 @@ -package p2p - -import ( - "github.com/iotaledger/hive.go/runtime/event" -) - -// NeighborEvents is a collection of events specific for a particular neighbors group, e.g "manual" or "auto". -type NeighborEvents struct { - // Fired when a neighbor connection has been established. - NeighborAdded *event.Event1[*Neighbor] - - // Fired when a neighbor has been removed. - NeighborRemoved *event.Event1[*Neighbor] -} - -// NewNeighborEvents returns a new instance of NeighborGroupEvents. -func NewNeighborEvents() *NeighborEvents { - return &NeighborEvents{ - NeighborAdded: event.New1[*Neighbor](), - NeighborRemoved: event.New1[*Neighbor](), - } -} diff --git a/pkg/network/p2p/manager.go b/pkg/network/p2p/manager.go index 7ae469b91..15aedd5ab 100644 --- a/pkg/network/p2p/manager.go +++ b/pkg/network/p2p/manager.go @@ -2,7 +2,6 @@ package p2p import ( "context" - "time" "github.com/libp2p/go-libp2p/core/host" p2pnetwork "github.com/libp2p/go-libp2p/core/network" @@ -14,49 +13,29 @@ import ( "github.com/iotaledger/hive.go/ds/shrinkingmap" "github.com/iotaledger/hive.go/ierrors" "github.com/iotaledger/hive.go/log" + "github.com/iotaledger/hive.go/runtime/event" "github.com/iotaledger/hive.go/runtime/syncutils" "github.com/iotaledger/iota-core/pkg/network" + "github.com/iotaledger/iota-core/pkg/network/p2p/autopeering" + "github.com/iotaledger/iota-core/pkg/network/p2p/manualpeering" ) const ( - protocolID = "iota-core/1.0.0" - defaultConnectionTimeout = 5 * time.Second // timeout after which the connection must be established. + protocolID = "iota-core/1.0.0" ) -// ConnectPeerOption defines an option for the DialPeer and AcceptPeer methods. -type ConnectPeerOption func(conf *connectPeerConfig) - -type connectPeerConfig struct { - useDefaultTimeout bool -} - // ProtocolHandler holds callbacks to handle a protocol. type ProtocolHandler struct { PacketFactory func() proto.Message PacketHandler func(peer.ID, proto.Message) error } -func buildConnectPeerConfig(opts []ConnectPeerOption) *connectPeerConfig { - conf := &connectPeerConfig{ - useDefaultTimeout: true, - } - for _, o := range opts { - o(conf) - } - - return conf -} - -// WithNoDefaultTimeout returns a ConnectPeerOption that disables the default timeout for dial or accept. -func WithNoDefaultTimeout() ConnectPeerOption { - return func(conf *connectPeerConfig) { - conf.useDefaultTimeout = false - } -} - // The Manager handles the connected neighbors. type Manager struct { - Events *NeighborEvents + // Fired when a neighbor connection has been established. + neighborAdded *event.Event1[network.Neighbor] + // Fired when a neighbor has been removed. + neighborRemoved *event.Event1[network.Neighbor] libp2pHost host.Host peerDB *network.DB @@ -66,22 +45,31 @@ type Manager struct { shutdownMutex syncutils.RWMutex isShutdown bool - neighbors *shrinkingmap.ShrinkingMap[peer.ID, *Neighbor] + neighbors *shrinkingmap.ShrinkingMap[peer.ID, *neighbor] protocolHandler *ProtocolHandler protocolHandlerMutex syncutils.RWMutex + + autoPeering *autopeering.Manager + manualPeering *manualpeering.Manager } +var _ network.Manager = (*Manager)(nil) + // NewManager creates a new Manager. -func NewManager(libp2pHost host.Host, peerDB *network.DB, logger log.Logger) *Manager { +func NewManager(libp2pHost host.Host, peerDB *network.DB, maxAutopeeringPeers int, logger log.Logger) *Manager { m := &Manager{ - libp2pHost: libp2pHost, - peerDB: peerDB, - logger: logger, - Events: NewNeighborEvents(), - neighbors: shrinkingmap.New[peer.ID, *Neighbor](), + libp2pHost: libp2pHost, + peerDB: peerDB, + logger: logger, + neighborAdded: event.New1[network.Neighbor](), + neighborRemoved: event.New1[network.Neighbor](), + neighbors: shrinkingmap.New[peer.ID, *neighbor](), } + m.autoPeering = autopeering.NewManager(maxAutopeeringPeers, m, libp2pHost, peerDB, logger) + m.manualPeering = manualpeering.NewManager(m, logger) + return m } @@ -107,8 +95,16 @@ func (m *Manager) UnregisterProtocol() { m.protocolHandler = nil } +func (m *Manager) OnNeighborAdded(handler func(network.Neighbor)) *event.Hook[func(network.Neighbor)] { + return m.neighborAdded.Hook(handler) +} + +func (m *Manager) OnNeighborRemoved(handler func(network.Neighbor)) *event.Hook[func(network.Neighbor)] { + return m.neighborRemoved.Hook(handler) +} + // DialPeer connects to a peer. -func (m *Manager) DialPeer(ctx context.Context, peer *network.Peer, opts ...ConnectPeerOption) error { +func (m *Manager) DialPeer(ctx context.Context, peer *network.Peer) error { m.protocolHandlerMutex.RLock() defer m.protocolHandlerMutex.RUnlock() @@ -117,19 +113,12 @@ func (m *Manager) DialPeer(ctx context.Context, peer *network.Peer, opts ...Conn } if m.NeighborExists(peer.ID) { - return ierrors.Wrapf(ErrDuplicateNeighbor, "peer %s already exists", peer.ID) + return ierrors.Wrapf(network.ErrDuplicatePeer, "peer %s already exists", peer.ID) } - conf := buildConnectPeerConfig(opts) - // Adds the peer's multiaddresses to the peerstore, so that they can be used for dialing. m.libp2pHost.Peerstore().AddAddrs(peer.ID, peer.PeerAddresses, peerstore.ConnectedAddrTTL) cancelCtx := ctx - if conf.useDefaultTimeout { - var cancel context.CancelFunc - cancelCtx, cancel = context.WithTimeout(ctx, defaultConnectionTimeout) - defer cancel() - } stream, err := m.P2PHost().NewStream(cancelCtx, peer.ID, protocolID) if err != nil { @@ -160,6 +149,17 @@ func (m *Manager) DialPeer(ctx context.Context, peer *network.Peer, opts ...Conn return nil } +// Start starts the manager and initiates manual- and autopeering. +func (m *Manager) Start(ctx context.Context, networkID string) error { + m.manualPeering.Start() + + if m.autoPeering.MaxNeighbors() > 0 { + return m.autoPeering.Start(ctx, networkID) + } + + return nil +} + // Shutdown stops the manager and closes all established connections. func (m *Manager) Shutdown() { m.shutdownMutex.Lock() @@ -169,9 +169,26 @@ func (m *Manager) Shutdown() { return } m.isShutdown = true + + if err := m.autoPeering.Stop(); err != nil { + m.logger.LogErrorf("failed to stop autopeering: %s", err) + } + + if err := m.manualPeering.Stop(); err != nil { + m.logger.LogErrorf("failed to stop manualpeering: %s", err) + } + m.dropAllNeighbors() m.UnregisterProtocol() + + if err := m.libp2pHost.Close(); err != nil { + m.logger.LogErrorf("failed to close libp2p host: %s", err) + } +} + +func (m *Manager) AddManualPeers(peers ...multiaddr.Multiaddr) error { + return m.manualPeering.AddPeers(peers...) } // LocalPeerID returns the local peer ID. @@ -197,11 +214,11 @@ func (m *Manager) DropNeighbor(id peer.ID) error { // Send sends a message with the specific protocol to a set of neighbors. func (m *Manager) Send(packet proto.Message, to ...peer.ID) { - var neighbors []*Neighbor + var neighbors []*neighbor if len(to) == 0 { - neighbors = m.AllNeighbors() + neighbors = m.allNeighbors() } else { - neighbors = m.NeighborsByID(to) + neighbors = m.neighborsByID(to) } for _, nbr := range neighbors { @@ -209,19 +226,29 @@ func (m *Manager) Send(packet proto.Message, to ...peer.ID) { } } -// AllNeighbors returns all the neighbors that are currently connected. -func (m *Manager) AllNeighbors() []*Neighbor { +func (m *Manager) AllNeighbors() []network.Neighbor { + neighbors := m.allNeighbors() + result := make([]network.Neighbor, len(neighbors)) + for i, n := range neighbors { + result[i] = n + } + + return result +} + +// allNeighbors returns all the neighbors that are currently connected. +func (m *Manager) allNeighbors() []*neighbor { return m.neighbors.Values() } -// AllNeighborsIDs returns all the ids of the neighbors that are currently connected. -func (m *Manager) AllNeighborsIDs() []peer.ID { +// allNeighborsIDs returns all the ids of the neighbors that are currently connected. +func (m *Manager) allNeighborsIDs() []peer.ID { return m.neighbors.Keys() } -// NeighborsByID returns all the neighbors that are currently connected corresponding to the supplied ids. -func (m *Manager) NeighborsByID(ids []peer.ID) []*Neighbor { - result := make([]*Neighbor, 0, len(ids)) +// neighborsByID returns all the neighbors that are currently connected corresponding to the supplied ids. +func (m *Manager) neighborsByID(ids []peer.ID) []*neighbor { + result := make([]*neighbor, 0, len(ids)) if len(ids) == 0 { return result } @@ -258,6 +285,7 @@ func (m *Manager) handleStream(stream p2pnetwork.Stream) { ID: stream.Conn().RemotePeer(), Addrs: []multiaddr.Multiaddr{stream.Conn().RemoteMultiaddr()}, } + networkPeer := network.NewPeerFromAddrInfo(peerAddrInfo) if err := m.peerDB.UpdatePeer(networkPeer); err != nil { m.logger.LogErrorf("failed to update peer in peer database, peerID: %s, error: %s", networkPeer.ID, err) @@ -275,16 +303,16 @@ func (m *Manager) handleStream(stream p2pnetwork.Stream) { } func (m *Manager) closeStream(s p2pnetwork.Stream) { - if err := s.Close(); err != nil { + if err := s.Reset(); err != nil { m.logger.LogWarnf("close error, error: %s", err) } } // neighborWithGroup returns neighbor by ID and group. -func (m *Manager) neighbor(id peer.ID) (*Neighbor, error) { +func (m *Manager) neighbor(id peer.ID) (*neighbor, error) { nbr, ok := m.neighbors.Get(id) if !ok { - return nil, ErrUnknownNeighbor + return nil, network.ErrUnknownPeer } return nbr, nil @@ -292,19 +320,19 @@ func (m *Manager) neighbor(id peer.ID) (*Neighbor, error) { func (m *Manager) addNeighbor(peer *network.Peer, ps *PacketsStream) error { if peer.ID == m.libp2pHost.ID() { - return ierrors.WithStack(ErrLoopbackNeighbor) + return ierrors.WithStack(network.ErrLoopbackPeer) } m.shutdownMutex.RLock() defer m.shutdownMutex.RUnlock() if m.isShutdown { - return ErrNotRunning + return network.ErrNotRunning } if m.NeighborExists(peer.ID) { - return ierrors.WithStack(ErrDuplicateNeighbor) + return ierrors.WithStack(network.ErrDuplicatePeer) } // create and add the neighbor - nbr := NewNeighbor(m.logger, peer, ps, func(nbr *Neighbor, packet proto.Message) { + nbr := newNeighbor(m.logger, peer, ps, func(nbr *neighbor, packet proto.Message) { m.protocolHandlerMutex.RLock() defer m.protocolHandlerMutex.RUnlock() @@ -312,15 +340,15 @@ func (m *Manager) addNeighbor(peer *network.Peer, ps *PacketsStream) error { nbr.logger.LogError("Can't handle packet as no protocol is registered") return } - if err := m.protocolHandler.PacketHandler(nbr.ID, packet); err != nil { + if err := m.protocolHandler.PacketHandler(nbr.Peer().ID, packet); err != nil { nbr.logger.LogDebugf("Can't handle packet, error: %s", err) } - }, func(nbr *Neighbor) { + }, func(nbr *neighbor) { m.deleteNeighbor(nbr) - m.Events.NeighborRemoved.Trigger(nbr) + m.neighborRemoved.Trigger(nbr) }) if err := m.setNeighbor(nbr); err != nil { - if resetErr := ps.Close(); resetErr != nil { + if resetErr := ps.Reset(); resetErr != nil { nbr.logger.LogErrorf("error closing stream, error: %s", resetErr) } @@ -328,8 +356,8 @@ func (m *Manager) addNeighbor(peer *network.Peer, ps *PacketsStream) error { } nbr.readLoop() nbr.writeLoop() - nbr.logger.LogInfof("Connection established to %s", nbr.ID) - m.Events.NeighborAdded.Trigger(nbr) + nbr.logger.LogInfof("Connection established to %s", nbr.Peer().ID) + m.neighborAdded.Trigger(nbr) return nil } @@ -338,15 +366,18 @@ func (m *Manager) NeighborExists(id peer.ID) bool { return m.neighbors.Has(id) } -func (m *Manager) deleteNeighbor(nbr *Neighbor) { - m.neighbors.Delete(nbr.ID) +func (m *Manager) deleteNeighbor(nbr *neighbor) { + // Close the connection to the peer. + _ = m.libp2pHost.Network().ClosePeer(nbr.Peer().ID) + + m.neighbors.Delete(nbr.Peer().ID) } -func (m *Manager) setNeighbor(nbr *Neighbor) error { +func (m *Manager) setNeighbor(nbr *neighbor) error { var err error - m.neighbors.Compute(nbr.ID, func(currentValue *Neighbor, exists bool) *Neighbor { + m.neighbors.Compute(nbr.Peer().ID, func(currentValue *neighbor, exists bool) *neighbor { if exists { - err = ierrors.WithStack(ErrDuplicateNeighbor) + err = ierrors.WithStack(network.ErrDuplicatePeer) return currentValue } @@ -357,7 +388,7 @@ func (m *Manager) setNeighbor(nbr *Neighbor) error { } func (m *Manager) dropAllNeighbors() { - neighborsList := m.AllNeighbors() + neighborsList := m.allNeighbors() for _, nbr := range neighborsList { nbr.Close() } diff --git a/pkg/network/manualpeering/manualpeering.go b/pkg/network/p2p/manualpeering/manualpeering.go similarity index 81% rename from pkg/network/manualpeering/manualpeering.go rename to pkg/network/p2p/manualpeering/manualpeering.go index 45c8f8fd9..5244bb01c 100644 --- a/pkg/network/manualpeering/manualpeering.go +++ b/pkg/network/p2p/manualpeering/manualpeering.go @@ -13,9 +13,11 @@ import ( "github.com/iotaledger/hive.go/log" "github.com/iotaledger/hive.go/runtime/event" "github.com/iotaledger/hive.go/runtime/syncutils" - "github.com/iotaledger/hive.go/runtime/workerpool" "github.com/iotaledger/iota-core/pkg/network" - "github.com/iotaledger/iota-core/pkg/network/p2p" +) + +const ( + manualPeerProtectionTag = "manual-peering" ) // Manager is the core entity in the manual peering package. @@ -27,7 +29,7 @@ import ( // manager will make sure gossip drops that connection. // Manager also subscribes to the gossip events and in case the connection with a manual peer fails it will reconnect. type Manager struct { - p2pm *p2p.Manager + networkManager network.Manager logger log.Logger startOnce sync.Once isStarted atomic.Bool @@ -37,20 +39,18 @@ type Manager struct { reconnectInterval time.Duration knownPeersMutex syncutils.RWMutex knownPeers map[peer.ID]*network.Peer - workerPool *workerpool.WorkerPool - onGossipNeighborRemovedHook *event.Hook[func(*p2p.Neighbor)] - onGossipNeighborAddedHook *event.Hook[func(*p2p.Neighbor)] + onGossipNeighborRemovedHook *event.Hook[func(network.Neighbor)] + onGossipNeighborAddedHook *event.Hook[func(network.Neighbor)] } // NewManager initializes a new Manager instance. -func NewManager(p2pm *p2p.Manager, workerPool *workerpool.WorkerPool, logger log.Logger) *Manager { +func NewManager(networkManager network.Manager, logger log.Logger) *Manager { m := &Manager{ - p2pm: p2pm, + networkManager: networkManager, logger: logger, reconnectInterval: network.DefaultReconnectInterval, knownPeers: make(map[peer.ID]*network.Peer), - workerPool: workerPool, } return m @@ -84,7 +84,10 @@ func (m *Manager) RemovePeer(peerID peer.ID) error { m.knownPeersMutex.Unlock() <-kp.DoneCh - if err := m.p2pm.DropNeighbor(peerID); err != nil && !ierrors.Is(err, p2p.ErrUnknownNeighbor) { + + m.networkManager.P2PHost().ConnManager().Unprotect(peerID, manualPeerProtectionTag) + + if err := m.networkManager.DropNeighbor(peerID); err != nil && !ierrors.Is(err, network.ErrUnknownPeer) { return ierrors.Wrapf(err, "failed to drop known peer %s in the gossip layer", peerID) } @@ -149,13 +152,12 @@ func (m *Manager) GetPeers(opts ...GetPeersOption) []*network.PeerDescriptor { // Calling multiple times has no effect. func (m *Manager) Start() { m.startOnce.Do(func() { - m.workerPool.Start() - m.onGossipNeighborRemovedHook = m.p2pm.Events.NeighborRemoved.Hook(func(neighbor *p2p.Neighbor) { + m.onGossipNeighborRemovedHook = m.networkManager.OnNeighborRemoved(func(neighbor network.Neighbor) { m.onGossipNeighborRemoved(neighbor) - }, event.WithWorkerPool(m.workerPool)) - m.onGossipNeighborAddedHook = m.p2pm.Events.NeighborAdded.Hook(func(neighbor *p2p.Neighbor) { + }) + m.onGossipNeighborAddedHook = m.networkManager.OnNeighborAdded(func(neighbor network.Neighbor) { m.onGossipNeighborAdded(neighbor) - }, event.WithWorkerPool(m.workerPool)) + }) m.isStarted.Store(true) }) } @@ -178,6 +180,14 @@ func (m *Manager) Stop() (err error) { return err } +func (m *Manager) IsPeerKnown(id peer.ID) bool { + m.knownPeersMutex.RLock() + defer m.knownPeersMutex.RUnlock() + + _, exists := m.knownPeers[id] + return exists +} + func (m *Manager) addPeer(peerAddr multiaddr.Multiaddr) error { if !m.isStarted.Load() { return ierrors.New("manual peering manager hasn't been started yet") @@ -196,7 +206,7 @@ func (m *Manager) addPeer(peerAddr multiaddr.Multiaddr) error { } // Do not add self - if p.ID == m.p2pm.P2PHost().ID() { + if p.ID == m.networkManager.P2PHost().ID() { return ierrors.New("not adding self to the list of known peers") } @@ -240,7 +250,7 @@ func (m *Manager) keepPeerConnected(peer *network.Peer) { m.logger.LogInfof("Peer is disconnected, calling gossip layer to establish the connection, peerID: %s", peer.ID) var err error - if err = m.p2pm.DialPeer(ctx, peer); err != nil && !ierrors.Is(err, p2p.ErrDuplicateNeighbor) && !ierrors.Is(err, context.Canceled) { + if err = m.networkManager.DialPeer(ctx, peer); err != nil && !ierrors.Is(err, network.ErrDuplicatePeer) && !ierrors.Is(err, context.Canceled) { m.logger.LogErrorf("Failed to connect a neighbor in the gossip layer, peerID: %s, error: %s", peer.ID, err) } } @@ -253,22 +263,23 @@ func (m *Manager) keepPeerConnected(peer *network.Peer) { } } -func (m *Manager) onGossipNeighborRemoved(neighbor *p2p.Neighbor) { +func (m *Manager) onGossipNeighborRemoved(neighbor network.Neighbor) { m.changeNeighborStatus(neighbor, network.ConnStatusDisconnected) } -func (m *Manager) onGossipNeighborAdded(neighbor *p2p.Neighbor) { +func (m *Manager) onGossipNeighborAdded(neighbor network.Neighbor) { m.changeNeighborStatus(neighbor, network.ConnStatusConnected) - m.logger.LogInfof("Gossip layer successfully connected with the peer %s", neighbor.Peer) + m.logger.LogInfof("Gossip layer successfully connected with the peer %s", neighbor.Peer()) } -func (m *Manager) changeNeighborStatus(neighbor *p2p.Neighbor, connStatus network.ConnectionStatus) { +func (m *Manager) changeNeighborStatus(neighbor network.Neighbor, connStatus network.ConnectionStatus) { m.knownPeersMutex.RLock() defer m.knownPeersMutex.RUnlock() - kp, exists := m.knownPeers[neighbor.ID] + kp, exists := m.knownPeers[neighbor.Peer().ID] if !exists { return } kp.SetConnStatus(connStatus) + m.networkManager.P2PHost().ConnManager().Protect(neighbor.Peer().ID, manualPeerProtectionTag) } diff --git a/pkg/network/p2p/neighbor.go b/pkg/network/p2p/neighbor.go index 445826453..8ac7e5b32 100644 --- a/pkg/network/p2p/neighbor.go +++ b/pkg/network/p2p/neighbor.go @@ -23,13 +23,13 @@ type queuedPacket struct { } type ( - PacketReceivedFunc func(neighbor *Neighbor, packet proto.Message) - NeighborDisconnectedFunc func(neighbor *Neighbor) + PacketReceivedFunc func(neighbor *neighbor, packet proto.Message) + NeighborDisconnectedFunc func(neighbor *neighbor) ) -// Neighbor describes the established p2p connection to another peer. -type Neighbor struct { - *network.Peer +// neighbor describes the established p2p connection to another peer. +type neighbor struct { + peer *network.Peer logger log.Logger @@ -47,12 +47,14 @@ type Neighbor struct { sendQueue chan *queuedPacket } -// NewNeighbor creates a new neighbor from the provided peer and connection. -func NewNeighbor(parentLogger log.Logger, p *network.Peer, stream *PacketsStream, packetReceivedCallback PacketReceivedFunc, disconnectedCallback NeighborDisconnectedFunc) *Neighbor { +var _ network.Neighbor = (*neighbor)(nil) + +// newNeighbor creates a new neighbor from the provided peer and connection. +func newNeighbor(parentLogger log.Logger, p *network.Peer, stream *PacketsStream, packetReceivedCallback PacketReceivedFunc, disconnectedCallback NeighborDisconnectedFunc) *neighbor { ctx, cancel := context.WithCancel(context.Background()) - n := &Neighbor{ - Peer: p, + n := &neighbor{ + peer: p, logger: parentLogger.NewChildLogger("peer", true), packetReceivedFunc: packetReceivedCallback, disconnectedFunc: disconnectedCallback, @@ -62,12 +64,16 @@ func NewNeighbor(parentLogger log.Logger, p *network.Peer, stream *PacketsStream sendQueue: make(chan *queuedPacket, NeighborsSendQueueSize), } - n.logger.LogInfo("created", "ID", n.ID) + n.logger.LogInfo("created", "ID", n.Peer().ID) return n } -func (n *Neighbor) Enqueue(packet proto.Message, protocolID protocol.ID) { +func (n *neighbor) Peer() *network.Peer { + return n.peer +} + +func (n *neighbor) Enqueue(packet proto.Message, protocolID protocol.ID) { select { case n.sendQueue <- &queuedPacket{protocolID: protocolID, packet: packet}: default: @@ -76,21 +82,21 @@ func (n *Neighbor) Enqueue(packet proto.Message, protocolID protocol.ID) { } // PacketsRead returns number of packets this neighbor has received. -func (n *Neighbor) PacketsRead() uint64 { +func (n *neighbor) PacketsRead() uint64 { return n.stream.packetsRead.Load() } // PacketsWritten returns number of packets this neighbor has sent. -func (n *Neighbor) PacketsWritten() uint64 { +func (n *neighbor) PacketsWritten() uint64 { return n.stream.packetsWritten.Load() } // ConnectionEstablished returns the connection established. -func (n *Neighbor) ConnectionEstablished() time.Time { +func (n *neighbor) ConnectionEstablished() time.Time { return n.stream.Stat().Opened } -func (n *Neighbor) readLoop() { +func (n *neighbor) readLoop() { n.wg.Add(1) go func(stream *PacketsStream) { defer n.wg.Done() @@ -121,7 +127,7 @@ func (n *Neighbor) readLoop() { }(n.stream) } -func (n *Neighbor) writeLoop() { +func (n *neighbor) writeLoop() { n.wg.Add(1) go func() { defer n.wg.Done() @@ -132,7 +138,7 @@ func (n *Neighbor) writeLoop() { return case sendPacket := <-n.sendQueue: if n.stream == nil { - n.logger.LogWarnf("send error, no stream for protocol, peerID: %s, protocol: %s", n.ID, sendPacket.protocolID) + n.logger.LogWarnf("send error, no stream for protocol, peerID: %s, protocol: %s", n.Peer().ID, sendPacket.protocolID) if disconnectErr := n.disconnect(); disconnectErr != nil { n.logger.LogWarnf("Failed to disconnect, error: %s", disconnectErr) } @@ -140,7 +146,7 @@ func (n *Neighbor) writeLoop() { return } if err := n.stream.WritePacket(sendPacket.packet); err != nil { - n.logger.LogWarnf("send error, peerID: %s, error: %s", n.ID, err) + n.logger.LogWarnf("send error, peerID: %s, error: %s", n.Peer().ID, err) if disconnectErr := n.disconnect(); disconnectErr != nil { n.logger.LogWarnf("Failed to disconnect, error: %s", disconnectErr) } @@ -153,7 +159,7 @@ func (n *Neighbor) writeLoop() { } // Close closes the connection with the neighbor. -func (n *Neighbor) Close() { +func (n *neighbor) Close() { if err := n.disconnect(); err != nil { n.logger.LogErrorf("Failed to disconnect the neighbor, error: %s", err) } @@ -161,13 +167,13 @@ func (n *Neighbor) Close() { n.logger.UnsubscribeFromParentLogger() } -func (n *Neighbor) disconnect() (err error) { +func (n *neighbor) disconnect() (err error) { n.disconnectOnce.Do(func() { // Stop the loops n.loopCtxCancel() // Close all streams - if streamErr := n.stream.Close(); streamErr != nil { + if streamErr := n.stream.Reset(); streamErr != nil { err = ierrors.WithStack(streamErr) } n.logger.LogInfof("Stream closed, protocol: %s", n.stream.Protocol()) diff --git a/pkg/network/p2p/neighbor_test.go b/pkg/network/p2p/neighbor_test.go index 59db5844a..be8fca69a 100644 --- a/pkg/network/p2p/neighbor_test.go +++ b/pkg/network/p2p/neighbor_test.go @@ -51,7 +51,7 @@ func TestNeighborWrite(t *testing.T) { defer teardown() var countA uint32 - neighborA := newTestNeighbor("A", a, func(neighbor *Neighbor, packet proto.Message) { + neighborA := newTestNeighbor("A", a, func(neighbor *neighbor, packet proto.Message) { _ = packet.(*p2pproto.Negotiation) atomic.AddUint32(&countA, 1) }) @@ -59,7 +59,7 @@ func TestNeighborWrite(t *testing.T) { neighborA.readLoop() var countB uint32 - neighborB := newTestNeighbor("B", b, func(neighbor *Neighbor, packet proto.Message) { + neighborB := newTestNeighbor("B", b, func(neighbor *neighbor, packet proto.Message) { _ = packet.(*p2pproto.Negotiation) atomic.AddUint32(&countB, 1) }) @@ -75,15 +75,15 @@ func TestNeighborWrite(t *testing.T) { assert.Eventually(t, func() bool { return atomic.LoadUint32(&countB) == 1 }, time.Second, 10*time.Millisecond) } -func newTestNeighbor(name string, stream p2pnetwork.Stream, packetReceivedFunc ...PacketReceivedFunc) *Neighbor { +func newTestNeighbor(name string, stream p2pnetwork.Stream, packetReceivedFunc ...PacketReceivedFunc) *neighbor { var packetReceived PacketReceivedFunc if len(packetReceivedFunc) > 0 { packetReceived = packetReceivedFunc[0] } else { - packetReceived = func(neighbor *Neighbor, packet proto.Message) {} + packetReceived = func(neighbor *neighbor, packet proto.Message) {} } - return NewNeighbor(lo.Return1(testLogger.NewChildLogger(name)), newTestPeer(name), NewPacketsStream(stream, packetFactory), packetReceived, func(neighbor *Neighbor) {}) + return newNeighbor(lo.Return1(testLogger.NewChildLogger(name)), newTestPeer(name), NewPacketsStream(stream, packetFactory), packetReceived, func(neighbor *neighbor) {}) } func packetFactory() proto.Message { From 55bd20250036c7791ed85f4a5d07c389062cb887 Mon Sep 17 00:00:00 2001 From: Alexander Sporn Date: Wed, 21 Feb 2024 10:25:56 +0100 Subject: [PATCH 12/18] Limit the amount of autopeering neighbors --- components/p2p/params.go | 2 +- go.mod | 2 +- go.sum | 4 +- pkg/network/errors.go | 2 + pkg/network/manager.go | 1 + pkg/network/p2p/autopeering/autopeering.go | 20 +++---- pkg/network/p2p/manager.go | 59 ++++++++++++++++++- .../p2p/manualpeering/manualpeering.go | 9 +-- tools/gendoc/go.mod | 2 +- tools/gendoc/go.sum | 1 + 10 files changed, 80 insertions(+), 22 deletions(-) diff --git a/components/p2p/params.go b/components/p2p/params.go index d93d1a08c..9e3e03cb9 100644 --- a/components/p2p/params.go +++ b/components/p2p/params.go @@ -29,7 +29,7 @@ type ParametersP2P struct { IdentityPrivateKey string `default:"" usage:"private key used to derive the node identity (optional)"` Autopeering struct { - MaxPeers int `default:"2" usage:"the max number of autopeer connections. Set to 0 to disable autopeering."` + MaxPeers int `default:"5" usage:"the max number of autopeer connections. Set to 0 to disable autopeering."` } Database struct { diff --git a/go.mod b/go.mod index 62d74289e..c30e0bb02 100644 --- a/go.mod +++ b/go.mod @@ -144,7 +144,7 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/pokt-network/smt v0.9.2 // indirect github.com/polydawn/refmt v0.89.0 // indirect - github.com/prometheus/client_model v0.5.0 // indirect + github.com/prometheus/client_model v0.6.0 // indirect github.com/prometheus/common v0.47.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect github.com/quic-go/qpack v0.4.0 // indirect diff --git a/go.sum b/go.sum index 1b7425a4e..201ee1bba 100644 --- a/go.sum +++ b/go.sum @@ -547,8 +547,8 @@ github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1: github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= -github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw= -github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI= +github.com/prometheus/client_model v0.6.0 h1:k1v3CzpSRUTrKMppY35TLwPvxHqBu0bYgxZzqGIgaos= +github.com/prometheus/client_model v0.6.0/go.mod h1:NTQHnmxFpouOD0DpvP4XujX3CdOAGQPoaGhyTchlyt8= github.com/prometheus/common v0.0.0-20180801064454-c7de2306084e/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo= diff --git a/pkg/network/errors.go b/pkg/network/errors.go index 76cc89265..25526c773 100644 --- a/pkg/network/errors.go +++ b/pkg/network/errors.go @@ -11,4 +11,6 @@ var ( ErrLoopbackPeer = ierrors.New("loopback connection not allowed") // ErrDuplicatePeer is returned when the same peer is added more than once. ErrDuplicatePeer = ierrors.New("already connected") + // ErrMaxAutopeeringPeersReached is returned when the maximum number of autopeering peers is reached. + ErrMaxAutopeeringPeersReached = ierrors.New("max autopeering peers reached") ) diff --git a/pkg/network/manager.go b/pkg/network/manager.go index f97785615..98ccae228 100644 --- a/pkg/network/manager.go +++ b/pkg/network/manager.go @@ -19,6 +19,7 @@ type Manager interface { OnNeighborRemoved(func(Neighbor)) *event.Hook[func(Neighbor)] AllNeighbors() []Neighbor + AutopeeringNeighborsCount() int DropNeighbor(peer.ID) error NeighborExists(peer.ID) bool diff --git a/pkg/network/p2p/autopeering/autopeering.go b/pkg/network/p2p/autopeering/autopeering.go index 3695460e4..be77e3ae3 100644 --- a/pkg/network/p2p/autopeering/autopeering.go +++ b/pkg/network/p2p/autopeering/autopeering.go @@ -122,11 +122,11 @@ func (m *Manager) discoveryLoop() { } func (m *Manager) discoverAndDialPeers() { - //peersToFind := m.maxPeers - len(m.p2pManager.AutopeeredNeighbors()) - //if peersToFind <= 0 { - // m.logger.LogDebugf("Enough autopeering peers connected, not discovering new ones") - // return - //} + peersToFind := m.maxPeers - m.networkManager.AutopeeringNeighborsCount() + if peersToFind <= 0 { + m.logger.LogDebug("Enough autopeering peers connected, not discovering new ones") + return + } findCtx, cancel := context.WithTimeout(m.ctx, 10*time.Second) defer cancel() @@ -139,10 +139,10 @@ func (m *Manager) discoverAndDialPeers() { } for peerAddrInfo := range peerChan { - //if peersToFind <= 0 { - // m.logger.LogDebugf("Enough new autopeering peers connected") - // return - //} + if peersToFind <= 0 { + m.logger.LogDebug("Enough new autopeering peers connected") + return + } // Do not self-dial. if peerAddrInfo.ID == m.host.ID() { @@ -160,6 +160,6 @@ func (m *Manager) discoverAndDialPeers() { if err := m.networkManager.DialPeer(m.ctx, peer); err != nil { m.logger.LogWarnf("Failed to dial peer %s: %s", peerAddrInfo, err) } - //peersToFind-- + peersToFind-- } } diff --git a/pkg/network/p2p/manager.go b/pkg/network/p2p/manager.go index 15aedd5ab..72081d281 100644 --- a/pkg/network/p2p/manager.go +++ b/pkg/network/p2p/manager.go @@ -12,6 +12,7 @@ import ( "github.com/iotaledger/hive.go/ds/shrinkingmap" "github.com/iotaledger/hive.go/ierrors" + "github.com/iotaledger/hive.go/lo" "github.com/iotaledger/hive.go/log" "github.com/iotaledger/hive.go/runtime/event" "github.com/iotaledger/hive.go/runtime/syncutils" @@ -52,6 +53,8 @@ type Manager struct { autoPeering *autopeering.Manager manualPeering *manualpeering.Manager + + allowPeerMutex syncutils.Mutex } var _ network.Manager = (*Manager)(nil) @@ -116,6 +119,13 @@ func (m *Manager) DialPeer(ctx context.Context, peer *network.Peer) error { return ierrors.Wrapf(network.ErrDuplicatePeer, "peer %s already exists", peer.ID) } + m.allowPeerMutex.Lock() + defer m.allowPeerMutex.Unlock() + + if !m.allowPeer(peer.ID) { + return ierrors.Wrapf(network.ErrMaxAutopeeringPeersReached, "peer %s is not allowed", peer.ID) + } + // Adds the peer's multiaddresses to the peerstore, so that they can be used for dialing. m.libp2pHost.Peerstore().AddAddrs(peer.ID, peer.PeerAddresses, peerstore.ConnectedAddrTTL) cancelCtx := ctx @@ -273,6 +283,18 @@ func (m *Manager) handleStream(stream p2pnetwork.Stream) { return } + m.allowPeerMutex.Lock() + defer m.allowPeerMutex.Unlock() + + peerID := stream.Conn().RemotePeer() + + if !m.allowPeer(peerID) { + m.logger.LogDebugf("peer %s is not allowed", peerID) + m.closeStream(stream) + + return + } + ps := NewPacketsStream(stream, m.protocolHandler.PacketFactory) if err := ps.receiveNegotiation(); err != nil { m.logger.LogError("failed to receive negotiation message") @@ -282,20 +304,20 @@ func (m *Manager) handleStream(stream p2pnetwork.Stream) { } peerAddrInfo := &peer.AddrInfo{ - ID: stream.Conn().RemotePeer(), + ID: peerID, Addrs: []multiaddr.Multiaddr{stream.Conn().RemoteMultiaddr()}, } networkPeer := network.NewPeerFromAddrInfo(peerAddrInfo) if err := m.peerDB.UpdatePeer(networkPeer); err != nil { - m.logger.LogErrorf("failed to update peer in peer database, peerID: %s, error: %s", networkPeer.ID, err) + m.logger.LogErrorf("failed to update peer in peer database, peerID: %s, error: %s", peerID, err) m.closeStream(stream) return } if err := m.addNeighbor(networkPeer, ps); err != nil { - m.logger.LogErrorf("failed to add neighbor, peerID: %s, error: %s", networkPeer.ID, err) + m.logger.LogErrorf("failed to add neighbor, peerID: %s, error: %s", peerID, err) m.closeStream(stream) return @@ -357,6 +379,7 @@ func (m *Manager) addNeighbor(peer *network.Peer, ps *PacketsStream) error { nbr.readLoop() nbr.writeLoop() nbr.logger.LogInfof("Connection established to %s", nbr.Peer().ID) + nbr.Peer().SetConnStatus(network.ConnStatusConnected) m.neighborAdded.Trigger(nbr) return nil @@ -371,6 +394,8 @@ func (m *Manager) deleteNeighbor(nbr *neighbor) { _ = m.libp2pHost.Network().ClosePeer(nbr.Peer().ID) m.neighbors.Delete(nbr.Peer().ID) + + nbr.Peer().SetConnStatus(network.ConnStatusDisconnected) } func (m *Manager) setNeighbor(nbr *neighbor) error { @@ -393,3 +418,31 @@ func (m *Manager) dropAllNeighbors() { nbr.Close() } } + +func (m *Manager) AutopeeringNeighborsCount() int { + return len(lo.Filter(m.allNeighborsIDs(), func(p peer.ID) bool { + return !m.manualPeering.IsPeerKnown(p) + })) +} + +func (m *Manager) allowPeer(id peer.ID) (allow bool) { + // This should always be called from within the allowPeerMutex lock + + // Always allow manual peers + if m.manualPeering.IsPeerKnown(id) { + m.logger.LogDebugf("Allow peer %s because it is a manual peer", id) + return true + } + + // Only allow up to the maximum number of autopeered neighbors + autopeeredNeighbors := m.AutopeeringNeighborsCount() + if autopeeredNeighbors < m.autoPeering.MaxNeighbors() { + m.logger.LogDebugf("Allow peer %s because it is an autopeered peer and the maximum number of autopeered neighbors has not been reached: %d vs %d", id, autopeeredNeighbors, m.autoPeering.MaxNeighbors()) + return true + } + + // Don't allow new peers + m.logger.LogDebugf("Disallow peer %s because it is an autopeered peer and the maximum number of autopeered neighbors has been reached: max %d", id, m.autoPeering.MaxNeighbors()) + + return false +} diff --git a/pkg/network/p2p/manualpeering/manualpeering.go b/pkg/network/p2p/manualpeering/manualpeering.go index 5244bb01c..626b7499a 100644 --- a/pkg/network/p2p/manualpeering/manualpeering.go +++ b/pkg/network/p2p/manualpeering/manualpeering.go @@ -185,6 +185,7 @@ func (m *Manager) IsPeerKnown(id peer.ID) bool { defer m.knownPeersMutex.RUnlock() _, exists := m.knownPeers[id] + return exists } @@ -264,15 +265,15 @@ func (m *Manager) keepPeerConnected(peer *network.Peer) { } func (m *Manager) onGossipNeighborRemoved(neighbor network.Neighbor) { - m.changeNeighborStatus(neighbor, network.ConnStatusDisconnected) + m.changeNeighborStatus(neighbor) } func (m *Manager) onGossipNeighborAdded(neighbor network.Neighbor) { - m.changeNeighborStatus(neighbor, network.ConnStatusConnected) + m.changeNeighborStatus(neighbor) m.logger.LogInfof("Gossip layer successfully connected with the peer %s", neighbor.Peer()) } -func (m *Manager) changeNeighborStatus(neighbor network.Neighbor, connStatus network.ConnectionStatus) { +func (m *Manager) changeNeighborStatus(neighbor network.Neighbor) { m.knownPeersMutex.RLock() defer m.knownPeersMutex.RUnlock() @@ -280,6 +281,6 @@ func (m *Manager) changeNeighborStatus(neighbor network.Neighbor, connStatus net if !exists { return } - kp.SetConnStatus(connStatus) + kp.SetConnStatus(neighbor.Peer().GetConnStatus()) m.networkManager.P2PHost().ConnManager().Protect(neighbor.Peer().ID, manualPeerProtectionTag) } diff --git a/tools/gendoc/go.mod b/tools/gendoc/go.mod index f87691157..f5fd4b460 100644 --- a/tools/gendoc/go.mod +++ b/tools/gendoc/go.mod @@ -135,7 +135,7 @@ require ( github.com/pokt-network/smt v0.9.2 // indirect github.com/polydawn/refmt v0.89.0 // indirect github.com/prometheus/client_golang v1.18.0 // indirect - github.com/prometheus/client_model v0.5.0 // indirect + github.com/prometheus/client_model v0.6.0 // indirect github.com/prometheus/common v0.47.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect github.com/quic-go/qpack v0.4.0 // indirect diff --git a/tools/gendoc/go.sum b/tools/gendoc/go.sum index fc5db226f..bb7457a61 100644 --- a/tools/gendoc/go.sum +++ b/tools/gendoc/go.sum @@ -553,6 +553,7 @@ github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1: github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw= github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI= +github.com/prometheus/client_model v0.6.0/go.mod h1:NTQHnmxFpouOD0DpvP4XujX3CdOAGQPoaGhyTchlyt8= github.com/prometheus/common v0.0.0-20180801064454-c7de2306084e/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo= From 5c8af0b5da0d386ed987c005e45fe1267957fe72 Mon Sep 17 00:00:00 2001 From: Alexander Sporn Date: Wed, 21 Feb 2024 10:26:23 +0100 Subject: [PATCH 13/18] Limit the peers to 3 per node in the docker network --- tools/docker-network/.env | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tools/docker-network/.env b/tools/docker-network/.env index ba729e121..efb1a5b3e 100644 --- a/tools/docker-network/.env +++ b/tools/docker-network/.env @@ -10,5 +10,6 @@ COMMON_CONFIG=" " AUTOPEERING_CONFIG=" ---p2p.bootstrapPeers=/dns/node-1-validator/tcp/15600/p2p/12D3KooWRVt4Engu27jHnF2RjfX48EqiAqJbgLfFdHNt3Vn6BtJK\ +--p2p.bootstrapPeers=/dns/node-1-validator/tcp/15600/p2p/12D3KooWRVt4Engu27jHnF2RjfX48EqiAqJbgLfFdHNt3Vn6BtJK +--p2p.autopeering.maxPeers=3 " From fc9b9f052ee71e066b1213e461a6898b3d54535b Mon Sep 17 00:00:00 2001 From: Alexander Sporn Date: Wed, 21 Feb 2024 12:08:49 +0100 Subject: [PATCH 14/18] =?UTF-8?q?Stop=20advertising=20autopeering=20if=20w?= =?UTF-8?q?e=20have=20enough=20peers,=20so=20that=20others=20don=E2=80=99t?= =?UTF-8?q?=20try=20to=20connect=20to=20us?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/network/endpoint.go | 4 + pkg/network/errors.go | 2 + pkg/network/manager.go | 2 +- pkg/network/p2p/autopeering/autopeering.go | 99 ++++++++++++++++--- pkg/network/p2p/manager.go | 66 +++++++------ .../p2p/manualpeering/manualpeering.go | 2 +- pkg/network/p2p/neighbor.go | 12 ++- pkg/network/p2p/neighbor_test.go | 2 +- 8 files changed, 141 insertions(+), 48 deletions(-) diff --git a/pkg/network/endpoint.go b/pkg/network/endpoint.go index ae08ad0f8..bb1ca2f18 100644 --- a/pkg/network/endpoint.go +++ b/pkg/network/endpoint.go @@ -5,6 +5,10 @@ import ( "google.golang.org/protobuf/proto" ) +const ( + CoreProtocolID = "iota-core/1.0.0" +) + type Endpoint interface { LocalPeerID() peer.ID RegisterProtocol(factory func() proto.Message, handler func(peer.ID, proto.Message) error) diff --git a/pkg/network/errors.go b/pkg/network/errors.go index 25526c773..0d485d990 100644 --- a/pkg/network/errors.go +++ b/pkg/network/errors.go @@ -11,6 +11,8 @@ var ( ErrLoopbackPeer = ierrors.New("loopback connection not allowed") // ErrDuplicatePeer is returned when the same peer is added more than once. ErrDuplicatePeer = ierrors.New("already connected") + // ErrFirstPacketNotReceived is returned when the first packet from a peer is not received. + ErrFistPacketNotReceived = ierrors.New("first packet not received") // ErrMaxAutopeeringPeersReached is returned when the maximum number of autopeering peers is reached. ErrMaxAutopeeringPeersReached = ierrors.New("max autopeering peers reached") ) diff --git a/pkg/network/manager.go b/pkg/network/manager.go index 98ccae228..69fc14344 100644 --- a/pkg/network/manager.go +++ b/pkg/network/manager.go @@ -19,7 +19,7 @@ type Manager interface { OnNeighborRemoved(func(Neighbor)) *event.Hook[func(Neighbor)] AllNeighbors() []Neighbor - AutopeeringNeighborsCount() int + AutopeeringNeighbors() []Neighbor DropNeighbor(peer.ID) error NeighborExists(peer.ID) bool diff --git a/pkg/network/p2p/autopeering/autopeering.go b/pkg/network/p2p/autopeering/autopeering.go index be77e3ae3..efe8d675c 100644 --- a/pkg/network/p2p/autopeering/autopeering.go +++ b/pkg/network/p2p/autopeering/autopeering.go @@ -10,6 +10,7 @@ import ( dht "github.com/libp2p/go-libp2p-kad-dht" "github.com/libp2p/go-libp2p/core/discovery" "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/protocol" "github.com/libp2p/go-libp2p/p2p/discovery/routing" "github.com/libp2p/go-libp2p/p2p/discovery/util" @@ -31,6 +32,10 @@ type Manager struct { ctx context.Context stopFunc context.CancelFunc routingDiscovery *routing.RoutingDiscovery + + advertiseLock sync.Mutex + advertiseCtx context.Context + advertiseCancel context.CancelFunc } // NewManager creates a new autopeering manager. @@ -40,7 +45,7 @@ func NewManager(maxPeers int, networkManager network.Manager, host host.Host, pe networkManager: networkManager, host: host, peerDB: peerDB, - logger: logger, + logger: logger.NewChildLogger("Autopeering"), } } @@ -52,21 +57,31 @@ func (m *Manager) MaxNeighbors() int { func (m *Manager) Start(ctx context.Context, networkID string) (err error) { //nolint:contextcheck m.startOnce.Do(func() { - m.namespace = fmt.Sprintf("/iota/%s/1.0.0", networkID) - m.ctx, m.stopFunc = context.WithCancel(ctx) - kademliaDHT, innerErr := dht.New(m.ctx, m.host, dht.Mode(dht.ModeServer)) + // We will use /iota/networkID/kad/1.0.0 for the DHT protocol. + // And /iota/networkID/iota-core/1.0.0 for the peer discovery. + prefix := protocol.ID("/iota") + extension := protocol.ID(fmt.Sprintf("/%s", networkID)) + m.namespace = fmt.Sprintf("%s%s/%s", prefix, extension, network.CoreProtocolID) + dhtCtx, dhtCancel := context.WithCancel(ctx) + kademliaDHT, innerErr := dht.New(dhtCtx, m.host, dht.Mode(dht.ModeServer), dht.ProtocolPrefix(prefix), dht.ProtocolExtension(extension)) if innerErr != nil { err = innerErr + dhtCancel() + return } // Bootstrap the DHT. In the default configuration, this spawns a Background worker that will keep the // node connected to the bootstrap peers and will disconnect from peers that are not useful. - if innerErr = kademliaDHT.Bootstrap(m.ctx); innerErr != nil { + if innerErr = kademliaDHT.Bootstrap(dhtCtx); innerErr != nil { err = innerErr + dhtCancel() + return } + m.ctx = dhtCtx + for _, seedPeer := range m.peerDB.SeedPeers() { addrInfo := seedPeer.ToAddrInfo() if innerErr := m.host.Connect(ctx, *addrInfo); innerErr != nil { @@ -83,10 +98,23 @@ func (m *Manager) Start(ctx context.Context, networkID string) (err error) { } m.routingDiscovery = routing.NewRoutingDiscovery(kademliaDHT) - util.Advertise(m.ctx, m.routingDiscovery, m.namespace, discovery.TTL(5*time.Minute)) - + m.startAdvertisingIfNeeded() go m.discoveryLoop() + onGossipNeighborRemovedHook := m.networkManager.OnNeighborRemoved(func(_ network.Neighbor) { + m.startAdvertisingIfNeeded() + }) + onGossipNeighborAddedHook := m.networkManager.OnNeighborAdded(func(_ network.Neighbor) { + m.stopAdvertisingItNotNeeded() + }) + + m.stopFunc = func() { + dhtCancel() + onGossipNeighborRemovedHook.Unhook() + onGossipNeighborAddedHook.Unhook() + m.stopAdvertising() + } + m.isStarted.Store(true) }) @@ -100,11 +128,45 @@ func (m *Manager) Stop() error { } m.stopOnce.Do(func() { m.stopFunc() + m.stopAdvertising() }) return nil } +func (m *Manager) startAdvertisingIfNeeded() { + m.advertiseLock.Lock() + defer m.advertiseLock.Unlock() + + if len(m.networkManager.AutopeeringNeighbors()) >= m.maxPeers { + return + } + + if m.advertiseCtx == nil && m.ctx.Err() == nil { + m.logger.LogInfof("Start advertising for namespace %s", m.namespace) + m.advertiseCtx, m.advertiseCancel = context.WithCancel(m.ctx) + util.Advertise(m.advertiseCtx, m.routingDiscovery, m.namespace, discovery.TTL(time.Minute)) + } +} + +func (m *Manager) stopAdvertisingItNotNeeded() { + if len(m.networkManager.AutopeeringNeighbors()) >= m.maxPeers { + m.stopAdvertising() + } +} + +func (m *Manager) stopAdvertising() { + m.advertiseLock.Lock() + defer m.advertiseLock.Unlock() + + if m.advertiseCancel != nil { + m.logger.LogInfof("Stop advertising for namespace %s", m.namespace) + m.advertiseCancel() + m.advertiseCtx = nil + m.advertiseCancel = nil + } +} + func (m *Manager) discoveryLoop() { ticker := time.NewTicker(1 * time.Minute) defer ticker.Stop() @@ -122,16 +184,28 @@ func (m *Manager) discoveryLoop() { } func (m *Manager) discoverAndDialPeers() { - peersToFind := m.maxPeers - m.networkManager.AutopeeringNeighborsCount() - if peersToFind <= 0 { - m.logger.LogDebug("Enough autopeering peers connected, not discovering new ones") + autopeeringNeighbors := m.networkManager.AutopeeringNeighbors() + peersToFind := m.maxPeers - len(autopeeringNeighbors) + if peersToFind == 0 { + m.logger.LogDebugf("%d autopeering peers connected, not discovering new ones", len(autopeeringNeighbors)) + return + } + + if peersToFind < 0 { + m.logger.LogDebugf("Too many autopeering peers connected %d, disconnecting some", -peersToFind) + for i := peersToFind; i < 0; i++ { + if err := m.networkManager.DropNeighbor(autopeeringNeighbors[i].Peer().ID); err != nil { + m.logger.LogDebugf("Failed to disconnect neighbor %s", autopeeringNeighbors[i].Peer().ID) + } + } + return } findCtx, cancel := context.WithTimeout(m.ctx, 10*time.Second) defer cancel() - m.logger.LogDebugf("Discovering new peers for namespace %s", m.namespace) + m.logger.LogDebugf("%d autopeering peers connected. Discovering new peers for namespace %s", len(autopeeringNeighbors), m.namespace) peerChan, err := m.routingDiscovery.FindPeers(findCtx, m.namespace) if err != nil { m.logger.LogWarnf("Failed to find peers: %s", err) @@ -159,7 +233,8 @@ func (m *Manager) discoverAndDialPeers() { peer := network.NewPeerFromAddrInfo(&peerAddrInfo) if err := m.networkManager.DialPeer(m.ctx, peer); err != nil { m.logger.LogWarnf("Failed to dial peer %s: %s", peerAddrInfo, err) + } else { + peersToFind-- } - peersToFind-- } } diff --git a/pkg/network/p2p/manager.go b/pkg/network/p2p/manager.go index 72081d281..94b9ff899 100644 --- a/pkg/network/p2p/manager.go +++ b/pkg/network/p2p/manager.go @@ -2,6 +2,7 @@ package p2p import ( "context" + "time" "github.com/libp2p/go-libp2p/core/host" p2pnetwork "github.com/libp2p/go-libp2p/core/network" @@ -21,10 +22,6 @@ import ( "github.com/iotaledger/iota-core/pkg/network/p2p/manualpeering" ) -const ( - protocolID = "iota-core/1.0.0" -) - // ProtocolHandler holds callbacks to handle a protocol. type ProtocolHandler struct { PacketFactory func() proto.Message @@ -86,7 +83,7 @@ func (m *Manager) RegisterProtocol(factory func() proto.Message, handler func(pe PacketHandler: handler, } - m.libp2pHost.SetStreamHandler(protocolID, m.handleStream) + m.libp2pHost.SetStreamHandler(network.CoreProtocolID, m.handleStream) } // UnregisterProtocol unregisters the handler for the protocol. @@ -94,7 +91,7 @@ func (m *Manager) UnregisterProtocol() { m.protocolHandlerMutex.Lock() defer m.protocolHandlerMutex.Unlock() - m.libp2pHost.RemoveStreamHandler(protocolID) + m.libp2pHost.RemoveStreamHandler(network.CoreProtocolID) m.protocolHandler = nil } @@ -130,19 +127,19 @@ func (m *Manager) DialPeer(ctx context.Context, peer *network.Peer) error { m.libp2pHost.Peerstore().AddAddrs(peer.ID, peer.PeerAddresses, peerstore.ConnectedAddrTTL) cancelCtx := ctx - stream, err := m.P2PHost().NewStream(cancelCtx, peer.ID, protocolID) + stream, err := m.P2PHost().NewStream(cancelCtx, peer.ID, network.CoreProtocolID) if err != nil { - return ierrors.Wrapf(err, "dial %s / %s failed to open stream for proto %s", peer.PeerAddresses, peer.ID, protocolID) + return ierrors.Wrapf(err, "dial %s / %s failed to open stream for proto %s", peer.PeerAddresses, peer.ID, network.CoreProtocolID) } ps := NewPacketsStream(stream, m.protocolHandler.PacketFactory) if err := ps.sendNegotiation(); err != nil { m.closeStream(stream) - return ierrors.Wrapf(err, "dial %s / %s failed to send negotiation for proto %s", peer.PeerAddresses, peer.ID, protocolID) + return ierrors.Wrapf(err, "dial %s / %s failed to send negotiation for proto %s", peer.PeerAddresses, peer.ID, network.CoreProtocolID) } - m.logger.LogDebugf("outgoing stream negotiated, id: %s, addr: %s, proto: %s", peer.ID, ps.Conn().RemoteMultiaddr(), protocolID) + m.logger.LogDebugf("outgoing stream negotiated, id: %s, addr: %s, proto: %s", peer.ID, ps.Conn().RemoteMultiaddr(), network.CoreProtocolID) if err := m.peerDB.UpdatePeer(peer); err != nil { m.closeStream(stream) @@ -150,7 +147,7 @@ func (m *Manager) DialPeer(ctx context.Context, peer *network.Peer) error { return ierrors.Wrapf(err, "failed to update peer %s", peer.ID) } - if err := m.addNeighbor(peer, ps); err != nil { + if err := m.addNeighbor(ctx, peer, ps); err != nil { m.closeStream(stream) return ierrors.Errorf("failed to add neighbor %s: %s", peer.ID, err) @@ -232,7 +229,7 @@ func (m *Manager) Send(packet proto.Message, to ...peer.ID) { } for _, nbr := range neighbors { - nbr.Enqueue(packet, protocolID) + nbr.Enqueue(packet, network.CoreProtocolID) } } @@ -251,9 +248,10 @@ func (m *Manager) allNeighbors() []*neighbor { return m.neighbors.Values() } -// allNeighborsIDs returns all the ids of the neighbors that are currently connected. -func (m *Manager) allNeighborsIDs() []peer.ID { - return m.neighbors.Keys() +func (m *Manager) AutopeeringNeighbors() []network.Neighbor { + return lo.Filter(m.AllNeighbors(), func(n network.Neighbor) bool { + return !m.manualPeering.IsPeerKnown(n.Peer().ID) + }) } // neighborsByID returns all the neighbors that are currently connected corresponding to the supplied ids. @@ -316,7 +314,7 @@ func (m *Manager) handleStream(stream p2pnetwork.Stream) { return } - if err := m.addNeighbor(networkPeer, ps); err != nil { + if err := m.addNeighbor(context.Background(), networkPeer, ps); err != nil { m.logger.LogErrorf("failed to add neighbor, peerID: %s, error: %s", peerID, err) m.closeStream(stream) @@ -340,7 +338,7 @@ func (m *Manager) neighbor(id peer.ID) (*neighbor, error) { return nbr, nil } -func (m *Manager) addNeighbor(peer *network.Peer, ps *PacketsStream) error { +func (m *Manager) addNeighbor(ctx context.Context, peer *network.Peer, ps *PacketsStream) error { if peer.ID == m.libp2pHost.ID() { return ierrors.WithStack(network.ErrLoopbackPeer) } @@ -353,6 +351,7 @@ func (m *Manager) addNeighbor(peer *network.Peer, ps *PacketsStream) error { return ierrors.WithStack(network.ErrDuplicatePeer) } + firstPacketReceivedCtx, firstPacketReceivedCancel := context.WithDeadline(ctx, time.Now().Add(5*time.Second)) // create and add the neighbor nbr := newNeighbor(m.logger, peer, ps, func(nbr *neighbor, packet proto.Message) { m.protocolHandlerMutex.RLock() @@ -365,6 +364,11 @@ func (m *Manager) addNeighbor(peer *network.Peer, ps *PacketsStream) error { if err := m.protocolHandler.PacketHandler(nbr.Peer().ID, packet); err != nil { nbr.logger.LogDebugf("Can't handle packet, error: %s", err) } + }, func(nbr *neighbor) { + nbr.logger.LogInfof("Neighbor connected: %s", nbr.Peer().ID) + nbr.Peer().SetConnStatus(network.ConnStatusConnected) + firstPacketReceivedCancel() + m.neighborAdded.Trigger(nbr) }, func(nbr *neighbor) { m.deleteNeighbor(nbr) m.neighborRemoved.Trigger(nbr) @@ -378,9 +382,15 @@ func (m *Manager) addNeighbor(peer *network.Peer, ps *PacketsStream) error { } nbr.readLoop() nbr.writeLoop() - nbr.logger.LogInfof("Connection established to %s", nbr.Peer().ID) - nbr.Peer().SetConnStatus(network.ConnStatusConnected) - m.neighborAdded.Trigger(nbr) + + <-firstPacketReceivedCtx.Done() + + if ierrors.Is(firstPacketReceivedCtx.Err(), context.DeadlineExceeded) { + nbr.logger.LogErrorf("First packet not received within deadline") + nbr.Close() + + return ierrors.WithStack(network.ErrFistPacketNotReceived) + } return nil } @@ -419,30 +429,24 @@ func (m *Manager) dropAllNeighbors() { } } -func (m *Manager) AutopeeringNeighborsCount() int { - return len(lo.Filter(m.allNeighborsIDs(), func(p peer.ID) bool { - return !m.manualPeering.IsPeerKnown(p) - })) -} - func (m *Manager) allowPeer(id peer.ID) (allow bool) { // This should always be called from within the allowPeerMutex lock // Always allow manual peers if m.manualPeering.IsPeerKnown(id) { - m.logger.LogDebugf("Allow peer %s because it is a manual peer", id) + m.logger.LogDebugf("Allow manual peer %s", id) return true } // Only allow up to the maximum number of autopeered neighbors - autopeeredNeighbors := m.AutopeeringNeighborsCount() - if autopeeredNeighbors < m.autoPeering.MaxNeighbors() { - m.logger.LogDebugf("Allow peer %s because it is an autopeered peer and the maximum number of autopeered neighbors has not been reached: %d vs %d", id, autopeeredNeighbors, m.autoPeering.MaxNeighbors()) + autopeeredNeighborsCount := len(m.AutopeeringNeighbors()) + if autopeeredNeighborsCount < m.autoPeering.MaxNeighbors() { + m.logger.LogDebugf("Allow autopeered peer %s. Max %d has not been reached: %d", id, m.autoPeering.MaxNeighbors(), autopeeredNeighborsCount) return true } // Don't allow new peers - m.logger.LogDebugf("Disallow peer %s because it is an autopeered peer and the maximum number of autopeered neighbors has been reached: max %d", id, m.autoPeering.MaxNeighbors()) + m.logger.LogDebugf("Disallow autopeered peer %s. Max %d has been reached", id, m.autoPeering.MaxNeighbors()) return false } diff --git a/pkg/network/p2p/manualpeering/manualpeering.go b/pkg/network/p2p/manualpeering/manualpeering.go index 626b7499a..672572177 100644 --- a/pkg/network/p2p/manualpeering/manualpeering.go +++ b/pkg/network/p2p/manualpeering/manualpeering.go @@ -48,7 +48,7 @@ type Manager struct { func NewManager(networkManager network.Manager, logger log.Logger) *Manager { m := &Manager{ networkManager: networkManager, - logger: logger, + logger: logger.NewChildLogger("ManualPeering"), reconnectInterval: network.DefaultReconnectInterval, knownPeers: make(map[peer.ID]*network.Peer), } diff --git a/pkg/network/p2p/neighbor.go b/pkg/network/p2p/neighbor.go index 8ac7e5b32..c54e2eb2e 100644 --- a/pkg/network/p2p/neighbor.go +++ b/pkg/network/p2p/neighbor.go @@ -24,6 +24,7 @@ type queuedPacket struct { type ( PacketReceivedFunc func(neighbor *neighbor, packet proto.Message) + NeighborConnectedFunc func(neighbor *neighbor) NeighborDisconnectedFunc func(neighbor *neighbor) ) @@ -34,8 +35,11 @@ type neighbor struct { logger log.Logger packetReceivedFunc PacketReceivedFunc - disconnectedFunc NeighborDisconnectedFunc + connectedFunc NeighborConnectedFunc + disconnectedFunc NeighborDisconnectedFunc + + connectOnce sync.Once disconnectOnce sync.Once wg sync.WaitGroup @@ -50,13 +54,14 @@ type neighbor struct { var _ network.Neighbor = (*neighbor)(nil) // newNeighbor creates a new neighbor from the provided peer and connection. -func newNeighbor(parentLogger log.Logger, p *network.Peer, stream *PacketsStream, packetReceivedCallback PacketReceivedFunc, disconnectedCallback NeighborDisconnectedFunc) *neighbor { +func newNeighbor(parentLogger log.Logger, p *network.Peer, stream *PacketsStream, packetReceivedCallback PacketReceivedFunc, connectedCallback NeighborConnectedFunc, disconnectedCallback NeighborDisconnectedFunc) *neighbor { ctx, cancel := context.WithCancel(context.Background()) n := &neighbor{ peer: p, logger: parentLogger.NewChildLogger("peer", true), packetReceivedFunc: packetReceivedCallback, + connectedFunc: connectedCallback, disconnectedFunc: disconnectedCallback, loopCtx: ctx, loopCtxCancel: cancel, @@ -122,6 +127,9 @@ func (n *neighbor) readLoop() { return } + n.connectOnce.Do(func() { + n.connectedFunc(n) + }) n.packetReceivedFunc(n, packet) } }(n.stream) diff --git a/pkg/network/p2p/neighbor_test.go b/pkg/network/p2p/neighbor_test.go index be8fca69a..c07ddf385 100644 --- a/pkg/network/p2p/neighbor_test.go +++ b/pkg/network/p2p/neighbor_test.go @@ -83,7 +83,7 @@ func newTestNeighbor(name string, stream p2pnetwork.Stream, packetReceivedFunc . packetReceived = func(neighbor *neighbor, packet proto.Message) {} } - return newNeighbor(lo.Return1(testLogger.NewChildLogger(name)), newTestPeer(name), NewPacketsStream(stream, packetFactory), packetReceived, func(neighbor *neighbor) {}) + return newNeighbor(lo.Return1(testLogger.NewChildLogger(name)), newTestPeer(name), NewPacketsStream(stream, packetFactory), packetReceived, func(neighbor *neighbor) {}, func(neighbor *neighbor) {}) } func packetFactory() proto.Message { From 554b567f778d99c57d689795cdd25e0b138c97fa Mon Sep 17 00:00:00 2001 From: Alexander Sporn Date: Wed, 21 Feb 2024 12:22:28 +0100 Subject: [PATCH 15/18] Add max peers to the log --- pkg/network/p2p/autopeering/autopeering.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/network/p2p/autopeering/autopeering.go b/pkg/network/p2p/autopeering/autopeering.go index efe8d675c..62a26db63 100644 --- a/pkg/network/p2p/autopeering/autopeering.go +++ b/pkg/network/p2p/autopeering/autopeering.go @@ -187,7 +187,7 @@ func (m *Manager) discoverAndDialPeers() { autopeeringNeighbors := m.networkManager.AutopeeringNeighbors() peersToFind := m.maxPeers - len(autopeeringNeighbors) if peersToFind == 0 { - m.logger.LogDebugf("%d autopeering peers connected, not discovering new ones", len(autopeeringNeighbors)) + m.logger.LogDebugf("%d autopeering peers connected, not discovering new ones. (max %d)", len(autopeeringNeighbors), m.maxPeers) return } From 1e68cf2b540eb00f68802a5d55aee896e7c126ec Mon Sep 17 00:00:00 2001 From: Alexander Sporn Date: Wed, 21 Feb 2024 12:26:18 +0100 Subject: [PATCH 16/18] run gendoc --- config_defaults.json | 3 +++ documentation/configuration.md | 10 ++++++++++ tools/gendoc/go.sum | 3 +-- 3 files changed, 14 insertions(+), 2 deletions(-) diff --git a/config_defaults.json b/config_defaults.json index 304046cf2..c9c34d305 100644 --- a/config_defaults.json +++ b/config_defaults.json @@ -28,6 +28,9 @@ }, "externalMultiAddresses": [], "identityPrivateKey": "", + "autopeering": { + "maxPeers": 5 + }, "db": { "path": "testnet/p2pstore" } diff --git a/documentation/configuration.md b/documentation/configuration.md index b9bc44daf..90db7b34d 100644 --- a/documentation/configuration.md +++ b/documentation/configuration.md @@ -99,6 +99,7 @@ Example: | [connectionManager](#p2p_connectionmanager) | Configuration for connectionManager | object | | | externalMultiAddresses | External reacheable multi addresses advertised to the network | array | | | identityPrivateKey | Private key used to derive the node identity (optional) | string | "" | +| [autopeering](#p2p_autopeering) | Configuration for autopeering | object | | | [db](#p2p_db) | Configuration for db | object | | ### ConnectionManager @@ -108,6 +109,12 @@ Example: | highWatermark | The threshold up on which connections count truncates to the lower watermark | int | 10 | | lowWatermark | The minimum connections count to hold after the high watermark was reached | int | 5 | +### Autopeering + +| Name | Description | Type | Default value | +| -------- | ------------------------------------------------------------------------ | ---- | ------------- | +| maxPeers | The max number of autopeer connections. Set to 0 to disable autopeering. | int | 5 | + ### Db | Name | Description | Type | Default value | @@ -129,6 +136,9 @@ Example: }, "externalMultiAddresses": [], "identityPrivateKey": "", + "autopeering": { + "maxPeers": 5 + }, "db": { "path": "testnet/p2pstore" } diff --git a/tools/gendoc/go.sum b/tools/gendoc/go.sum index bb7457a61..8d9fbf916 100644 --- a/tools/gendoc/go.sum +++ b/tools/gendoc/go.sum @@ -551,8 +551,7 @@ github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1: github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= -github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw= -github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI= +github.com/prometheus/client_model v0.6.0 h1:k1v3CzpSRUTrKMppY35TLwPvxHqBu0bYgxZzqGIgaos= github.com/prometheus/client_model v0.6.0/go.mod h1:NTQHnmxFpouOD0DpvP4XujX3CdOAGQPoaGhyTchlyt8= github.com/prometheus/common v0.0.0-20180801064454-c7de2306084e/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= From 47aa25a23ff7e2f23e9d1f0e75f82e4d23a0b5f6 Mon Sep 17 00:00:00 2001 From: muXxer Date: Wed, 21 Feb 2024 13:14:28 +0100 Subject: [PATCH 17/18] Fix typo --- pkg/network/errors.go | 2 +- pkg/network/p2p/manager.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/network/errors.go b/pkg/network/errors.go index 0d485d990..c47584efd 100644 --- a/pkg/network/errors.go +++ b/pkg/network/errors.go @@ -12,7 +12,7 @@ var ( // ErrDuplicatePeer is returned when the same peer is added more than once. ErrDuplicatePeer = ierrors.New("already connected") // ErrFirstPacketNotReceived is returned when the first packet from a peer is not received. - ErrFistPacketNotReceived = ierrors.New("first packet not received") + ErrFirstPacketNotReceived = ierrors.New("first packet not received") // ErrMaxAutopeeringPeersReached is returned when the maximum number of autopeering peers is reached. ErrMaxAutopeeringPeersReached = ierrors.New("max autopeering peers reached") ) diff --git a/pkg/network/p2p/manager.go b/pkg/network/p2p/manager.go index 94b9ff899..1bf5ece9e 100644 --- a/pkg/network/p2p/manager.go +++ b/pkg/network/p2p/manager.go @@ -389,7 +389,7 @@ func (m *Manager) addNeighbor(ctx context.Context, peer *network.Peer, ps *Packe nbr.logger.LogErrorf("First packet not received within deadline") nbr.Close() - return ierrors.WithStack(network.ErrFistPacketNotReceived) + return ierrors.WithStack(network.ErrFirstPacketNotReceived) } return nil From ee4c2aea1a875b78a3ae066a47593b9fed144a14 Mon Sep 17 00:00:00 2001 From: muXxer Date: Wed, 21 Feb 2024 13:24:36 +0100 Subject: [PATCH 18/18] Ignore errors during stream closing in the tests --- pkg/network/p2p/neighbor_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/network/p2p/neighbor_test.go b/pkg/network/p2p/neighbor_test.go index c07ddf385..cfe4d7f3d 100644 --- a/pkg/network/p2p/neighbor_test.go +++ b/pkg/network/p2p/neighbor_test.go @@ -129,12 +129,12 @@ func newStreamsPipe(t testing.TB) (p2pnetwork.Stream, p2pnetwork.Stream, func()) _, err = dialStream.Write(nil) require.NoError(t, err) acceptStream := <-acceptStremCh + tearDown := func() { - err2 := dialStream.Close() - require.NoError(t, err2) - err2 = acceptStream.Close() - require.NoError(t, err2) - err2 = host1.Close() + dialStream.Close() + acceptStream.Close() + + err2 := host1.Close() require.NoError(t, err2) err2 = host2.Close() require.NoError(t, err2)