diff --git a/components/dashboard/component.go b/components/dashboard/component.go
index 7f79604e8..34b65a464 100644
--- a/components/dashboard/component.go
+++ b/components/dashboard/component.go
@@ -17,7 +17,7 @@ import (
"github.com/iotaledger/hive.go/ierrors"
"github.com/iotaledger/iota-core/components/metricstracker"
"github.com/iotaledger/iota-core/pkg/daemon"
- "github.com/iotaledger/iota-core/pkg/network/p2p"
+ "github.com/iotaledger/iota-core/pkg/network"
"github.com/iotaledger/iota-core/pkg/protocol"
)
@@ -49,7 +49,7 @@ type dependencies struct {
Host host.Host
Protocol *protocol.Protocol
AppInfo *app.Info
- P2PManager *p2p.Manager
+ NetworkManager network.Manager
MetricsTracker *metricstracker.MetricsTracker
}
@@ -171,28 +171,20 @@ func currentNodeStatus() *nodestatus {
func neighborMetrics() []neighbormetric {
var stats []neighbormetric
- if deps.P2PManager == nil {
+ if deps.NetworkManager == nil {
return stats
}
// gossip plugin might be disabled
- neighbors := deps.P2PManager.AllNeighbors()
+ neighbors := deps.NetworkManager.AllNeighbors()
if neighbors == nil {
return stats
}
for _, neighbor := range neighbors {
- // origin := "Inbound"
- // for _, p := range deps.P2PManager.AllNeighbors() {
- // if neighbor.Peer == peer {
- // origin = "Outbound"
- // break
- // }
- // }
-
stats = append(stats, neighbormetric{
- ID: neighbor.Peer.ID.String(),
- Addresses: fmt.Sprintf("%s", neighbor.Peer.PeerAddresses),
+ ID: neighbor.Peer().ID.String(),
+ Addresses: fmt.Sprintf("%s", neighbor.Peer().PeerAddresses),
PacketsRead: neighbor.PacketsRead(),
PacketsWritten: neighbor.PacketsWritten(),
})
diff --git a/components/p2p/component.go b/components/p2p/component.go
index 4dbbea364..4e940ca8c 100644
--- a/components/p2p/component.go
+++ b/components/p2p/component.go
@@ -2,9 +2,7 @@ package p2p
import (
"context"
- "fmt"
"path/filepath"
- "time"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/crypto"
@@ -21,11 +19,8 @@ import (
"github.com/iotaledger/hive.go/ierrors"
"github.com/iotaledger/hive.go/kvstore"
hivedb "github.com/iotaledger/hive.go/kvstore/database"
- "github.com/iotaledger/hive.go/runtime/event"
"github.com/iotaledger/iota-core/pkg/daemon"
"github.com/iotaledger/iota-core/pkg/network"
- "github.com/iotaledger/iota-core/pkg/network/autopeering"
- "github.com/iotaledger/iota-core/pkg/network/manualpeering"
"github.com/iotaledger/iota-core/pkg/network/p2p"
"github.com/iotaledger/iota-core/pkg/protocol"
)
@@ -51,9 +46,7 @@ type dependencies struct {
dig.In
PeeringConfig *configuration.Configuration `name:"peeringConfig"`
PeeringConfigManager *p2p.ConfigManager
- ManualPeeringMgr *manualpeering.Manager
- AutoPeeringMgr *autopeering.Manager
- P2PManager *p2p.Manager
+ NetworkManager network.Manager
PeerDB *network.DB
Protocol *protocol.Protocol
PeerDBKVSTore kvstore.KVStore `name:"peerDBKVStore"`
@@ -79,49 +72,6 @@ func initConfigParams(c *dig.Container) error {
}
func provide(c *dig.Container) error {
- type manualPeeringDeps struct {
- dig.In
-
- P2PManager *p2p.Manager
- }
-
- if err := c.Provide(func(deps manualPeeringDeps) *manualpeering.Manager {
- return manualpeering.NewManager(deps.P2PManager, Component.WorkerPool, Component.Logger)
- }); err != nil {
- return err
- }
-
- type autoPeeringDeps struct {
- dig.In
-
- Protocol *protocol.Protocol
- P2PManager *p2p.Manager
- Host host.Host
- PeerDB *network.DB
- }
-
- if err := c.Provide(func(deps autoPeeringDeps) *autopeering.Manager {
- peersMultiAddresses, err := getMultiAddrsFromString(ParamsPeers.BootstrapPeers)
- if err != nil {
- Component.LogFatalf("Failed to parse bootstrapPeers param: %s", err)
- }
-
- for _, multiAddr := range peersMultiAddresses {
- bootstrapPeer, err := network.NewPeerFromMultiAddr(multiAddr)
- if err != nil {
- Component.LogFatalf("Failed to parse bootstrap peer multiaddress: %s", err)
- }
-
- if err := deps.PeerDB.UpdatePeer(bootstrapPeer); err != nil {
- Component.LogErrorf("Failed to update bootstrap peer: %s", err)
- }
- }
-
- return autopeering.NewManager(deps.Protocol.LatestAPI().ProtocolParameters().NetworkName(), deps.P2PManager, deps.Host, deps.PeerDB, Component.Logger)
- }); err != nil {
- return err
- }
-
type peerDatabaseResult struct {
dig.Out
@@ -251,7 +201,7 @@ func provide(c *dig.Container) error {
connManager, err := connmgr.NewConnManager(
ParamsP2P.ConnectionManager.LowWatermark,
ParamsP2P.ConnectionManager.HighWatermark,
- connmgr.WithGracePeriod(time.Minute),
+ connmgr.WithEmergencyTrim(true),
)
if err != nil {
Component.LogPanicf("unable to initialize connection manager: %s", err)
@@ -263,6 +213,7 @@ func provide(c *dig.Container) error {
libp2p.Transport(tcp.NewTCPTransport),
libp2p.ConnectionManager(connManager),
libp2p.NATPortMap(),
+ libp2p.DisableRelay(),
// Define a custom address factory to inject external addresses to the DHT advertisements.
libp2p.AddrsFactory(func() func(addrs []multiaddr.Multiaddr) []multiaddr.Multiaddr {
var externalMultiAddrs []multiaddr.Multiaddr
@@ -294,8 +245,31 @@ func provide(c *dig.Container) error {
Component.LogPanic(err.Error())
}
- return c.Provide(func(host host.Host, peerDB *network.DB) *p2p.Manager {
- return p2p.NewManager(host, peerDB, Component.Logger)
+ type p2pManagerDeps struct {
+ dig.In
+ Host host.Host
+ PeerDB *network.DB
+ }
+
+ return c.Provide(func(inDeps p2pManagerDeps) network.Manager {
+
+ peersMultiAddresses, err := getMultiAddrsFromString(ParamsPeers.BootstrapPeers)
+ if err != nil {
+ Component.LogFatalf("Failed to parse bootstrapPeers param: %s", err)
+ }
+
+ for _, multiAddr := range peersMultiAddresses {
+ bootstrapPeer, err := network.NewPeerFromMultiAddr(multiAddr)
+ if err != nil {
+ Component.LogFatalf("Failed to parse bootstrap peer multiaddress: %s", err)
+ }
+
+ if err := inDeps.PeerDB.UpdatePeer(bootstrapPeer); err != nil {
+ Component.LogErrorf("Failed to update bootstrap peer: %s", err)
+ }
+ }
+
+ return p2p.NewManager(inDeps.Host, inDeps.PeerDB, ParamsP2P.Autopeering.MaxPeers, Component.Logger)
})
}
@@ -321,43 +295,27 @@ func configure() error {
}
// log the p2p events
- deps.P2PManager.Events.NeighborAdded.Hook(func(neighbor *p2p.Neighbor) {
- Component.LogInfof("Neighbor added: %s / %s", neighbor.PeerAddresses, neighbor.ID)
- }, event.WithWorkerPool(Component.WorkerPool))
+ deps.NetworkManager.OnNeighborAdded(func(neighbor network.Neighbor) {
+ Component.LogInfof("neighbor added: %s / %s", neighbor.Peer().PeerAddresses, neighbor.Peer().ID)
+ })
- deps.P2PManager.Events.NeighborRemoved.Hook(func(neighbor *p2p.Neighbor) {
- Component.LogInfof("Neighbor removed: %s / %s", neighbor.PeerAddresses, neighbor.ID)
- }, event.WithWorkerPool(Component.WorkerPool))
+ deps.NetworkManager.OnNeighborRemoved(func(neighbor network.Neighbor) {
+ Component.LogInfof("neighbor removed: %s / %s", neighbor.Peer().PeerAddresses, neighbor.Peer().ID)
+ })
return nil
}
func run() error {
if err := Component.Daemon().BackgroundWorker(Component.Name, func(ctx context.Context) {
- deps.ManualPeeringMgr.Start()
- if err := deps.AutoPeeringMgr.Start(ctx); err != nil {
- Component.LogFatalf("Failed to start autopeering manager: %s", err)
+ defer deps.NetworkManager.Shutdown()
+
+ if err := deps.NetworkManager.Start(ctx, deps.Protocol.LatestAPI().ProtocolParameters().NetworkName()); err != nil {
+ Component.LogFatalf("Failed to start p2p manager: %s", err)
}
- defer func() {
- if err := deps.ManualPeeringMgr.Stop(); err != nil {
- Component.LogErrorf("Failed to stop the manager", "err", err)
- }
- }()
//nolint:contextcheck // false positive
connectConfigKnownPeers()
- <-ctx.Done()
- }, daemon.PriorityManualPeering); err != nil {
- Component.LogFatalf("Failed to start as daemon: %s", err)
- }
-
- if err := Component.Daemon().BackgroundWorker(fmt.Sprintf("%s-P2PManager", Component.Name), func(ctx context.Context) {
- defer deps.P2PManager.Shutdown()
- defer func() {
- if err := deps.P2PManager.P2PHost().Close(); err != nil {
- Component.LogWarnf("Failed to close libp2p host: %+v", err)
- }
- }()
<-ctx.Done()
}, daemon.PriorityP2P); err != nil {
@@ -395,7 +353,7 @@ func connectConfigKnownPeers() {
Component.LogPanicf("invalid peer address info: %s", err)
}
- if err := deps.ManualPeeringMgr.AddPeers(multiAddr); err != nil {
+ if err := deps.NetworkManager.AddManualPeers(multiAddr); err != nil {
Component.LogInfof("failed to add peer: %s, error: %s", multiAddr.String(), err)
}
}
diff --git a/components/p2p/params.go b/components/p2p/params.go
index 68acdffbc..9e3e03cb9 100644
--- a/components/p2p/params.go
+++ b/components/p2p/params.go
@@ -28,6 +28,10 @@ type ParametersP2P struct {
// Defines the private key used to derive the node identity (optional).
IdentityPrivateKey string `default:"" usage:"private key used to derive the node identity (optional)"`
+ Autopeering struct {
+ MaxPeers int `default:"5" usage:"the max number of autopeer connections. Set to 0 to disable autopeering."`
+ }
+
Database struct {
// Defines the path to the p2p database.
Path string `default:"testnet/p2pstore" usage:"the path to the p2p database"`
diff --git a/components/protocol/component.go b/components/protocol/component.go
index 798e603bb..17e69f957 100644
--- a/components/protocol/component.go
+++ b/components/protocol/component.go
@@ -14,7 +14,7 @@ import (
"github.com/iotaledger/hive.go/runtime/workerpool"
"github.com/iotaledger/iota-core/pkg/daemon"
"github.com/iotaledger/iota-core/pkg/model"
- "github.com/iotaledger/iota-core/pkg/network/p2p"
+ "github.com/iotaledger/iota-core/pkg/network"
"github.com/iotaledger/iota-core/pkg/protocol"
"github.com/iotaledger/iota-core/pkg/protocol/engine/attestation/slotattestation"
"github.com/iotaledger/iota-core/pkg/protocol/engine/filter/presolidfilter/presolidblockfilter"
@@ -115,7 +115,7 @@ func provide(c *dig.Container) error {
DatabaseEngine hivedb.Engine `name:"databaseEngine"`
ProtocolParameters []iotago.ProtocolParameters
- P2PManager *p2p.Manager
+ NetworkManager network.Manager
}
return c.Provide(func(deps protocolDeps) *protocol.Protocol {
@@ -132,7 +132,7 @@ func provide(c *dig.Container) error {
return protocol.New(
Component.Logger,
workerpool.NewGroup("Protocol"),
- deps.P2PManager,
+ deps.NetworkManager,
protocol.WithBaseDirectory(ParamsDatabase.Path),
protocol.WithStorageOptions(
storage.WithDBEngine(deps.DatabaseEngine),
diff --git a/config_defaults.json b/config_defaults.json
index 304046cf2..c9c34d305 100644
--- a/config_defaults.json
+++ b/config_defaults.json
@@ -28,6 +28,9 @@
},
"externalMultiAddresses": [],
"identityPrivateKey": "",
+ "autopeering": {
+ "maxPeers": 5
+ },
"db": {
"path": "testnet/p2pstore"
}
diff --git a/documentation/configuration.md b/documentation/configuration.md
index b9bc44daf..90db7b34d 100644
--- a/documentation/configuration.md
+++ b/documentation/configuration.md
@@ -99,6 +99,7 @@ Example:
| [connectionManager](#p2p_connectionmanager) | Configuration for connectionManager | object | |
| externalMultiAddresses | External reacheable multi addresses advertised to the network | array | |
| identityPrivateKey | Private key used to derive the node identity (optional) | string | "" |
+| [autopeering](#p2p_autopeering) | Configuration for autopeering | object | |
| [db](#p2p_db) | Configuration for db | object | |
### ConnectionManager
@@ -108,6 +109,12 @@ Example:
| highWatermark | The threshold up on which connections count truncates to the lower watermark | int | 10 |
| lowWatermark | The minimum connections count to hold after the high watermark was reached | int | 5 |
+### Autopeering
+
+| Name | Description | Type | Default value |
+| -------- | ------------------------------------------------------------------------ | ---- | ------------- |
+| maxPeers | The max number of autopeer connections. Set to 0 to disable autopeering. | int | 5 |
+
### Db
| Name | Description | Type | Default value |
@@ -129,6 +136,9 @@ Example:
},
"externalMultiAddresses": [],
"identityPrivateKey": "",
+ "autopeering": {
+ "maxPeers": 5
+ },
"db": {
"path": "testnet/p2pstore"
}
diff --git a/go.mod b/go.mod
index 62d74289e..c30e0bb02 100644
--- a/go.mod
+++ b/go.mod
@@ -144,7 +144,7 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/pokt-network/smt v0.9.2 // indirect
github.com/polydawn/refmt v0.89.0 // indirect
- github.com/prometheus/client_model v0.5.0 // indirect
+ github.com/prometheus/client_model v0.6.0 // indirect
github.com/prometheus/common v0.47.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/quic-go/qpack v0.4.0 // indirect
diff --git a/go.sum b/go.sum
index 1b7425a4e..201ee1bba 100644
--- a/go.sum
+++ b/go.sum
@@ -547,8 +547,8 @@ github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
-github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw=
-github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI=
+github.com/prometheus/client_model v0.6.0 h1:k1v3CzpSRUTrKMppY35TLwPvxHqBu0bYgxZzqGIgaos=
+github.com/prometheus/client_model v0.6.0/go.mod h1:NTQHnmxFpouOD0DpvP4XujX3CdOAGQPoaGhyTchlyt8=
github.com/prometheus/common v0.0.0-20180801064454-c7de2306084e/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo=
diff --git a/pkg/daemon/shutdown.go b/pkg/daemon/shutdown.go
index 6bf40fd5e..31b8cbd97 100644
--- a/pkg/daemon/shutdown.go
+++ b/pkg/daemon/shutdown.go
@@ -5,12 +5,8 @@ package daemon
const (
PriorityCloseDatabase = iota // no dependencies
- PriorityPeerDatabase
PriorityP2P
- PriorityManualPeering
PriorityProtocol
- PriorityBlockIssuer
- PriorityActivity // depends on BlockIssuer
PriorityRestAPI
PriorityINX
PriorityDashboardMetrics
diff --git a/pkg/network/autopeering/autopeering.go b/pkg/network/autopeering/autopeering.go
deleted file mode 100644
index 2dc5e9aa4..000000000
--- a/pkg/network/autopeering/autopeering.go
+++ /dev/null
@@ -1,145 +0,0 @@
-package autopeering
-
-import (
- "context"
- "sync"
- "sync/atomic"
- "time"
-
- dht "github.com/libp2p/go-libp2p-kad-dht"
- "github.com/libp2p/go-libp2p/core/discovery"
- "github.com/libp2p/go-libp2p/core/host"
- "github.com/libp2p/go-libp2p/p2p/discovery/routing"
- "github.com/libp2p/go-libp2p/p2p/discovery/util"
-
- "github.com/iotaledger/hive.go/ierrors"
- "github.com/iotaledger/hive.go/log"
- "github.com/iotaledger/iota-core/pkg/network"
- "github.com/iotaledger/iota-core/pkg/network/p2p"
-)
-
-type Manager struct {
- networkID string
- p2pManager *p2p.Manager
- logger log.Logger
- host host.Host
- peerDB *network.DB
- startOnce sync.Once
- isStarted atomic.Bool
- stopOnce sync.Once
- ctx context.Context
- stopFunc context.CancelFunc
- routingDiscovery *routing.RoutingDiscovery
-}
-
-// NewManager creates a new autopeering manager.
-func NewManager(networkID string, p2pManager *p2p.Manager, host host.Host, peerDB *network.DB, logger log.Logger) *Manager {
- return &Manager{
- networkID: networkID,
- p2pManager: p2pManager,
- host: host,
- peerDB: peerDB,
- logger: logger,
- }
-}
-
-// Start starts the autopeering manager.
-func (m *Manager) Start(ctx context.Context) (err error) {
- //nolint:contextcheck
- m.startOnce.Do(func() {
- m.ctx, m.stopFunc = context.WithCancel(ctx)
- kademliaDHT, innerErr := dht.New(m.ctx, m.host, dht.Mode(dht.ModeServer))
- if innerErr != nil {
- err = innerErr
- return
- }
-
- // Bootstrap the DHT. In the default configuration, this spawns a Background worker that will keep the
- // node connected to the bootstrap peers and will disconnect from peers that are not useful.
- if innerErr = kademliaDHT.Bootstrap(m.ctx); innerErr != nil {
- err = innerErr
- return
- }
-
- for _, seedPeer := range m.peerDB.SeedPeers() {
- addrInfo := seedPeer.ToAddrInfo()
- if innerErr := m.host.Connect(ctx, *addrInfo); innerErr != nil {
- m.logger.LogInfof("Failed to connect to bootstrap node, peer: %s, error: %s", seedPeer, innerErr)
- continue
- }
-
- if _, innerErr := kademliaDHT.RoutingTable().TryAddPeer(addrInfo.ID, true, true); innerErr != nil {
- m.logger.LogWarnf("Failed to add bootstrap node to routing table, error: %s", innerErr)
- continue
- }
-
- m.logger.LogDebugf("Connected to bootstrap node, peer: %s", seedPeer)
- }
-
- m.routingDiscovery = routing.NewRoutingDiscovery(kademliaDHT)
- util.Advertise(m.ctx, m.routingDiscovery, m.networkID, discovery.TTL(5*time.Minute))
-
- go m.discoveryLoop()
-
- m.isStarted.Store(true)
- })
-
- return err
-}
-
-// Stop terminates internal background workers. Calling multiple times has no effect.
-func (m *Manager) Stop() error {
- if !m.isStarted.Load() {
- return ierrors.New("can't stop the manager: it hasn't been started yet")
- }
- m.stopOnce.Do(func() {
- m.stopFunc()
- })
-
- return nil
-}
-
-func (m *Manager) discoveryLoop() {
- ticker := time.NewTicker(1 * time.Minute)
- defer ticker.Stop()
-
- m.discoverAndDialPeers()
-
- for {
- select {
- case <-ticker.C:
- m.discoverAndDialPeers()
- case <-m.ctx.Done():
- return
- }
- }
-}
-
-func (m *Manager) discoverAndDialPeers() {
- tctx, cancel := context.WithTimeout(m.ctx, 10*time.Second)
- defer cancel()
-
- m.logger.LogDebugf("Discovering peers for network ID %s", m.networkID)
- peerChan, err := m.routingDiscovery.FindPeers(tctx, m.networkID)
- if err != nil {
- m.logger.LogWarnf("Failed to find peers: %s", err)
- }
-
- for peerAddrInfo := range peerChan {
- // Do not self-dial.
- if peerAddrInfo.ID == m.host.ID() {
- continue
- }
-
- m.logger.LogDebugf("Found peer: %s", peerAddrInfo)
-
- peer := network.NewPeerFromAddrInfo(&peerAddrInfo)
- if err := m.p2pManager.DialPeer(m.ctx, peer); err != nil {
- if ierrors.Is(err, p2p.ErrDuplicateNeighbor) {
- m.logger.LogDebugf("Already connected to peer %s", peer)
- continue
- }
- m.logger.LogWarnf("Failed to dial peer %s: %s", peer, err)
- }
- }
-}
diff --git a/pkg/network/endpoint.go b/pkg/network/endpoint.go
index ae08ad0f8..bb1ca2f18 100644
--- a/pkg/network/endpoint.go
+++ b/pkg/network/endpoint.go
@@ -5,6 +5,10 @@ import (
"google.golang.org/protobuf/proto"
)
+const (
+ CoreProtocolID = "iota-core/1.0.0"
+)
+
type Endpoint interface {
LocalPeerID() peer.ID
RegisterProtocol(factory func() proto.Message, handler func(peer.ID, proto.Message) error)
diff --git a/pkg/network/errors.go b/pkg/network/errors.go
new file mode 100644
index 000000000..c47584efd
--- /dev/null
+++ b/pkg/network/errors.go
@@ -0,0 +1,18 @@
+package network
+
+import "github.com/iotaledger/hive.go/ierrors"
+
+var (
+ // ErrNotRunning is returned when a peer is added to a stopped or not yet started network manager.
+ ErrNotRunning = ierrors.New("manager not running")
+ // ErrUnknownPeer is returned when the specified peer is not known to the network manager.
+	ErrUnknownPeer = ierrors.New("unknown peer")
+ // ErrLoopbackPeer is returned when the own peer is added.
+ ErrLoopbackPeer = ierrors.New("loopback connection not allowed")
+ // ErrDuplicatePeer is returned when the same peer is added more than once.
+ ErrDuplicatePeer = ierrors.New("already connected")
+ // ErrFirstPacketNotReceived is returned when the first packet from a peer is not received.
+ ErrFirstPacketNotReceived = ierrors.New("first packet not received")
+ // ErrMaxAutopeeringPeersReached is returned when the maximum number of autopeering peers is reached.
+ ErrMaxAutopeeringPeersReached = ierrors.New("max autopeering peers reached")
+)
diff --git a/pkg/network/manager.go b/pkg/network/manager.go
new file mode 100644
index 000000000..69fc14344
--- /dev/null
+++ b/pkg/network/manager.go
@@ -0,0 +1,33 @@
+package network
+
+import (
+ "context"
+
+ "github.com/libp2p/go-libp2p/core/host"
+ "github.com/libp2p/go-libp2p/core/peer"
+ "github.com/multiformats/go-multiaddr"
+
+ "github.com/iotaledger/hive.go/runtime/event"
+)
+
+type Manager interface {
+ Endpoint
+
+ DialPeer(context.Context, *Peer) error
+
+ OnNeighborAdded(func(Neighbor)) *event.Hook[func(Neighbor)]
+ OnNeighborRemoved(func(Neighbor)) *event.Hook[func(Neighbor)]
+
+ AllNeighbors() []Neighbor
+ AutopeeringNeighbors() []Neighbor
+
+ DropNeighbor(peer.ID) error
+ NeighborExists(peer.ID) bool
+
+ P2PHost() host.Host
+
+ Start(ctx context.Context, networkID string) error
+ Shutdown()
+
+ AddManualPeers(...multiaddr.Multiaddr) error
+}
diff --git a/pkg/network/neighbor.go b/pkg/network/neighbor.go
new file mode 100644
index 000000000..5a4336fde
--- /dev/null
+++ b/pkg/network/neighbor.go
@@ -0,0 +1,7 @@
+package network
+
+type Neighbor interface {
+ Peer() *Peer
+ PacketsRead() uint64
+ PacketsWritten() uint64
+}
diff --git a/pkg/network/p2p/autopeering/autopeering.go b/pkg/network/p2p/autopeering/autopeering.go
new file mode 100644
index 000000000..62a26db63
--- /dev/null
+++ b/pkg/network/p2p/autopeering/autopeering.go
@@ -0,0 +1,240 @@
+package autopeering
+
+import (
+ "context"
+ "fmt"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ dht "github.com/libp2p/go-libp2p-kad-dht"
+ "github.com/libp2p/go-libp2p/core/discovery"
+ "github.com/libp2p/go-libp2p/core/host"
+ "github.com/libp2p/go-libp2p/core/protocol"
+ "github.com/libp2p/go-libp2p/p2p/discovery/routing"
+ "github.com/libp2p/go-libp2p/p2p/discovery/util"
+
+ "github.com/iotaledger/hive.go/ierrors"
+ "github.com/iotaledger/hive.go/log"
+ "github.com/iotaledger/iota-core/pkg/network"
+)
+
+type Manager struct {
+ namespace string
+ maxPeers int
+ networkManager network.Manager
+ logger log.Logger
+ host host.Host
+ peerDB *network.DB
+ startOnce sync.Once
+ isStarted atomic.Bool
+ stopOnce sync.Once
+ ctx context.Context
+ stopFunc context.CancelFunc
+ routingDiscovery *routing.RoutingDiscovery
+
+ advertiseLock sync.Mutex
+ advertiseCtx context.Context
+ advertiseCancel context.CancelFunc
+}
+
+// NewManager creates a new autopeering manager.
+func NewManager(maxPeers int, networkManager network.Manager, host host.Host, peerDB *network.DB, logger log.Logger) *Manager {
+ return &Manager{
+ maxPeers: maxPeers,
+ networkManager: networkManager,
+ host: host,
+ peerDB: peerDB,
+ logger: logger.NewChildLogger("Autopeering"),
+ }
+}
+
+func (m *Manager) MaxNeighbors() int {
+ return m.maxPeers
+}
+
+// Start starts the autopeering manager.
+func (m *Manager) Start(ctx context.Context, networkID string) (err error) {
+ //nolint:contextcheck
+ m.startOnce.Do(func() {
+ // We will use /iota/networkID/kad/1.0.0 for the DHT protocol.
+ // And /iota/networkID/iota-core/1.0.0 for the peer discovery.
+ prefix := protocol.ID("/iota")
+ extension := protocol.ID(fmt.Sprintf("/%s", networkID))
+ m.namespace = fmt.Sprintf("%s%s/%s", prefix, extension, network.CoreProtocolID)
+ dhtCtx, dhtCancel := context.WithCancel(ctx)
+ kademliaDHT, innerErr := dht.New(dhtCtx, m.host, dht.Mode(dht.ModeServer), dht.ProtocolPrefix(prefix), dht.ProtocolExtension(extension))
+ if innerErr != nil {
+ err = innerErr
+ dhtCancel()
+
+ return
+ }
+
+ // Bootstrap the DHT. In the default configuration, this spawns a Background worker that will keep the
+ // node connected to the bootstrap peers and will disconnect from peers that are not useful.
+ if innerErr = kademliaDHT.Bootstrap(dhtCtx); innerErr != nil {
+ err = innerErr
+ dhtCancel()
+
+ return
+ }
+
+ m.ctx = dhtCtx
+
+ for _, seedPeer := range m.peerDB.SeedPeers() {
+ addrInfo := seedPeer.ToAddrInfo()
+ if innerErr := m.host.Connect(ctx, *addrInfo); innerErr != nil {
+ m.logger.LogInfof("Failed to connect to bootstrap node, peer: %s, error: %s", seedPeer, innerErr)
+ continue
+ }
+
+ if _, innerErr := kademliaDHT.RoutingTable().TryAddPeer(addrInfo.ID, true, true); innerErr != nil {
+ m.logger.LogWarnf("Failed to add bootstrap node to routing table, error: %s", innerErr)
+ continue
+ }
+
+ m.logger.LogDebugf("Connected to bootstrap node, peer: %s", seedPeer)
+ }
+
+ m.routingDiscovery = routing.NewRoutingDiscovery(kademliaDHT)
+ m.startAdvertisingIfNeeded()
+ go m.discoveryLoop()
+
+ onGossipNeighborRemovedHook := m.networkManager.OnNeighborRemoved(func(_ network.Neighbor) {
+ m.startAdvertisingIfNeeded()
+ })
+ onGossipNeighborAddedHook := m.networkManager.OnNeighborAdded(func(_ network.Neighbor) {
+			m.stopAdvertisingIfNotNeeded()
+		})
+
+		m.stopFunc = func() {
+			dhtCancel()
+			onGossipNeighborRemovedHook.Unhook()
+			onGossipNeighborAddedHook.Unhook()
+			m.stopAdvertising()
+		}
+
+		m.isStarted.Store(true)
+	})
+
+	return err
+}
+
+// Stop terminates internal background workers. Calling multiple times has no effect.
+func (m *Manager) Stop() error {
+	if !m.isStarted.Load() {
+		return ierrors.New("can't stop the manager: it hasn't been started yet")
+	}
+	m.stopOnce.Do(func() {
+		m.stopFunc()
+		// m.stopFunc already stops advertising and unhooks the neighbor events.
+	})
+
+	return nil
+}
+
+func (m *Manager) startAdvertisingIfNeeded() {
+	m.advertiseLock.Lock()
+	defer m.advertiseLock.Unlock()
+
+	if len(m.networkManager.AutopeeringNeighbors()) >= m.maxPeers {
+		return
+	}
+
+	if m.advertiseCtx == nil && m.ctx.Err() == nil {
+		m.logger.LogInfof("Start advertising for namespace %s", m.namespace)
+		m.advertiseCtx, m.advertiseCancel = context.WithCancel(m.ctx)
+		util.Advertise(m.advertiseCtx, m.routingDiscovery, m.namespace, discovery.TTL(time.Minute))
+	}
+}
+
+func (m *Manager) stopAdvertisingIfNotNeeded() {
+	if len(m.networkManager.AutopeeringNeighbors()) >= m.maxPeers {
+		m.stopAdvertising()
+	}
+}
+
+func (m *Manager) stopAdvertising() {
+ m.advertiseLock.Lock()
+ defer m.advertiseLock.Unlock()
+
+ if m.advertiseCancel != nil {
+ m.logger.LogInfof("Stop advertising for namespace %s", m.namespace)
+ m.advertiseCancel()
+ m.advertiseCtx = nil
+ m.advertiseCancel = nil
+ }
+}
+
+func (m *Manager) discoveryLoop() {
+ ticker := time.NewTicker(1 * time.Minute)
+ defer ticker.Stop()
+
+ m.discoverAndDialPeers()
+
+ for {
+ select {
+ case <-ticker.C:
+ m.discoverAndDialPeers()
+ case <-m.ctx.Done():
+ return
+ }
+ }
+}
+
+func (m *Manager) discoverAndDialPeers() {
+ autopeeringNeighbors := m.networkManager.AutopeeringNeighbors()
+ peersToFind := m.maxPeers - len(autopeeringNeighbors)
+ if peersToFind == 0 {
+ m.logger.LogDebugf("%d autopeering peers connected, not discovering new ones. (max %d)", len(autopeeringNeighbors), m.maxPeers)
+ return
+ }
+
+ if peersToFind < 0 {
+ m.logger.LogDebugf("Too many autopeering peers connected %d, disconnecting some", -peersToFind)
+		// Drop the surplus neighbors beyond maxPeers. Note: the previous loop
+		// `for i := peersToFind; i < 0; i++` indexed the slice with a NEGATIVE
+		// index, which panics at runtime in Go.
+		for _, extraNeighbor := range autopeeringNeighbors[m.maxPeers:] {
+			if err := m.networkManager.DropNeighbor(extraNeighbor.Peer().ID); err != nil {
+				m.logger.LogDebugf("Failed to disconnect neighbor %s", extraNeighbor.Peer().ID)
+			}
+ }
+
+ return
+ }
+
+ findCtx, cancel := context.WithTimeout(m.ctx, 10*time.Second)
+ defer cancel()
+
+ m.logger.LogDebugf("%d autopeering peers connected. Discovering new peers for namespace %s", len(autopeeringNeighbors), m.namespace)
+ peerChan, err := m.routingDiscovery.FindPeers(findCtx, m.namespace)
+ if err != nil {
+ m.logger.LogWarnf("Failed to find peers: %s", err)
+ return
+ }
+
+ for peerAddrInfo := range peerChan {
+ if peersToFind <= 0 {
+ m.logger.LogDebug("Enough new autopeering peers connected")
+ return
+ }
+
+ // Do not self-dial.
+ if peerAddrInfo.ID == m.host.ID() {
+ continue
+ }
+
+ // Do not try to dial already connected peers.
+ if m.networkManager.NeighborExists(peerAddrInfo.ID) {
+ continue
+ }
+
+ m.logger.LogDebugf("Found peer: %s", peerAddrInfo)
+
+ peer := network.NewPeerFromAddrInfo(&peerAddrInfo)
+ if err := m.networkManager.DialPeer(m.ctx, peer); err != nil {
+ m.logger.LogWarnf("Failed to dial peer %s: %s", peerAddrInfo, err)
+ } else {
+ peersToFind--
+ }
+ }
+}
diff --git a/pkg/network/p2p/errors.go b/pkg/network/p2p/errors.go
deleted file mode 100644
index 1951e6d2b..000000000
--- a/pkg/network/p2p/errors.go
+++ /dev/null
@@ -1,16 +0,0 @@
-package p2p
-
-import "github.com/iotaledger/hive.go/ierrors"
-
-var (
- // ErrNotRunning is returned when a neighbor is added to a stopped or not yet started p2p manager.
- ErrNotRunning = ierrors.New("manager not running")
- // ErrUnknownNeighbor is returned when the specified neighbor is not known to the p2p manager.
- ErrUnknownNeighbor = ierrors.New("unknown neighbor")
- // ErrLoopbackNeighbor is returned when the own peer is specified as a neighbor.
- ErrLoopbackNeighbor = ierrors.New("loopback connection not allowed")
- // ErrDuplicateNeighbor is returned when the same peer is added more than once as a neighbor.
- ErrDuplicateNeighbor = ierrors.New("already connected")
- // ErrNeighborQueueFull is returned when the send queue is already full.
- ErrNeighborQueueFull = ierrors.New("send queue is full")
-)
diff --git a/pkg/network/p2p/events.go b/pkg/network/p2p/events.go
deleted file mode 100644
index 5a1aa6a06..000000000
--- a/pkg/network/p2p/events.go
+++ /dev/null
@@ -1,22 +0,0 @@
-package p2p
-
-import (
- "github.com/iotaledger/hive.go/runtime/event"
-)
-
-// NeighborEvents is a collection of events specific for a particular neighbors group, e.g "manual" or "auto".
-type NeighborEvents struct {
- // Fired when a neighbor connection has been established.
- NeighborAdded *event.Event1[*Neighbor]
-
- // Fired when a neighbor has been removed.
- NeighborRemoved *event.Event1[*Neighbor]
-}
-
-// NewNeighborEvents returns a new instance of NeighborGroupEvents.
-func NewNeighborEvents() *NeighborEvents {
- return &NeighborEvents{
- NeighborAdded: event.New1[*Neighbor](),
- NeighborRemoved: event.New1[*Neighbor](),
- }
-}
diff --git a/pkg/network/p2p/manager.go b/pkg/network/p2p/manager.go
index d6ed84420..1bf5ece9e 100644
--- a/pkg/network/p2p/manager.go
+++ b/pkg/network/p2p/manager.go
@@ -8,65 +8,32 @@ import (
p2pnetwork "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
- "github.com/libp2p/go-libp2p/core/protocol"
"github.com/multiformats/go-multiaddr"
"google.golang.org/protobuf/proto"
+ "github.com/iotaledger/hive.go/ds/shrinkingmap"
"github.com/iotaledger/hive.go/ierrors"
+ "github.com/iotaledger/hive.go/lo"
"github.com/iotaledger/hive.go/log"
+ "github.com/iotaledger/hive.go/runtime/event"
"github.com/iotaledger/hive.go/runtime/syncutils"
"github.com/iotaledger/iota-core/pkg/network"
+ "github.com/iotaledger/iota-core/pkg/network/p2p/autopeering"
+ "github.com/iotaledger/iota-core/pkg/network/p2p/manualpeering"
)
-const (
- protocolID = "iota-core/1.0.0"
- defaultConnectionTimeout = 5 * time.Second // timeout after which the connection must be established.
- ioTimeout = 4 * time.Second
-)
-
-var (
- // ErrTimeout is returned when an expected incoming connection was not received in time.
- ErrTimeout = ierrors.New("accept timeout")
- // ErrDuplicateAccept is returned when the server already registered an accept request for that peer ID.
- ErrDuplicateAccept = ierrors.New("accept request for that peer already exists")
- // ErrNoP2P means that the given peer does not support the p2p service.
- ErrNoP2P = ierrors.New("peer does not have a p2p service")
-)
-
-// ConnectPeerOption defines an option for the DialPeer and AcceptPeer methods.
-type ConnectPeerOption func(conf *connectPeerConfig)
-
-type connectPeerConfig struct {
- useDefaultTimeout bool
-}
-
// ProtocolHandler holds callbacks to handle a protocol.
type ProtocolHandler struct {
PacketFactory func() proto.Message
PacketHandler func(peer.ID, proto.Message) error
}
-func buildConnectPeerConfig(opts []ConnectPeerOption) *connectPeerConfig {
- conf := &connectPeerConfig{
- useDefaultTimeout: true,
- }
- for _, o := range opts {
- o(conf)
- }
-
- return conf
-}
-
-// WithNoDefaultTimeout returns a ConnectPeerOption that disables the default timeout for dial or accept.
-func WithNoDefaultTimeout() ConnectPeerOption {
- return func(conf *connectPeerConfig) {
- conf.useDefaultTimeout = false
- }
-}
-
// The Manager handles the connected neighbors.
type Manager struct {
- Events *NeighborEvents
+ // Fired when a neighbor connection has been established.
+ neighborAdded *event.Event1[network.Neighbor]
+ // Fired when a neighbor has been removed.
+ neighborRemoved *event.Event1[network.Neighbor]
libp2pHost host.Host
peerDB *network.DB
@@ -76,23 +43,33 @@ type Manager struct {
shutdownMutex syncutils.RWMutex
isShutdown bool
- neighbors map[peer.ID]*Neighbor
- neighborsMutex syncutils.RWMutex
+ neighbors *shrinkingmap.ShrinkingMap[peer.ID, *neighbor]
protocolHandler *ProtocolHandler
protocolHandlerMutex syncutils.RWMutex
+
+ autoPeering *autopeering.Manager
+ manualPeering *manualpeering.Manager
+
+ allowPeerMutex syncutils.Mutex
}
+var _ network.Manager = (*Manager)(nil)
+
// NewManager creates a new Manager.
-func NewManager(libp2pHost host.Host, peerDB *network.DB, logger log.Logger) *Manager {
+func NewManager(libp2pHost host.Host, peerDB *network.DB, maxAutopeeringPeers int, logger log.Logger) *Manager {
m := &Manager{
- libp2pHost: libp2pHost,
- peerDB: peerDB,
- logger: logger,
- Events: NewNeighborEvents(),
- neighbors: make(map[peer.ID]*Neighbor),
+ libp2pHost: libp2pHost,
+ peerDB: peerDB,
+ logger: logger,
+ neighborAdded: event.New1[network.Neighbor](),
+ neighborRemoved: event.New1[network.Neighbor](),
+ neighbors: shrinkingmap.New[peer.ID, *neighbor](),
}
+ m.autoPeering = autopeering.NewManager(maxAutopeeringPeers, m, libp2pHost, peerDB, logger)
+ m.manualPeering = manualpeering.NewManager(m, logger)
+
return m
}
@@ -106,7 +83,7 @@ func (m *Manager) RegisterProtocol(factory func() proto.Message, handler func(pe
PacketHandler: handler,
}
- m.libp2pHost.SetStreamHandler(protocol.ID(protocolID), m.handleStream)
+ m.libp2pHost.SetStreamHandler(network.CoreProtocolID, m.handleStream)
}
// UnregisterProtocol unregisters the handler for the protocol.
@@ -114,12 +91,20 @@ func (m *Manager) UnregisterProtocol() {
m.protocolHandlerMutex.Lock()
defer m.protocolHandlerMutex.Unlock()
- m.libp2pHost.RemoveStreamHandler(protocol.ID(protocolID))
+ m.libp2pHost.RemoveStreamHandler(network.CoreProtocolID)
m.protocolHandler = nil
}
+func (m *Manager) OnNeighborAdded(handler func(network.Neighbor)) *event.Hook[func(network.Neighbor)] {
+ return m.neighborAdded.Hook(handler)
+}
+
+func (m *Manager) OnNeighborRemoved(handler func(network.Neighbor)) *event.Hook[func(network.Neighbor)] {
+ return m.neighborRemoved.Hook(handler)
+}
+
// DialPeer connects to a peer.
-func (m *Manager) DialPeer(ctx context.Context, peer *network.Peer, opts ...ConnectPeerOption) error {
+func (m *Manager) DialPeer(ctx context.Context, peer *network.Peer) error {
m.protocolHandlerMutex.RLock()
defer m.protocolHandlerMutex.RUnlock()
@@ -127,34 +112,34 @@ func (m *Manager) DialPeer(ctx context.Context, peer *network.Peer, opts ...Conn
return ierrors.New("no protocol handler registered to dial peer")
}
- if m.neighborExists(peer.ID) {
- return ierrors.Wrapf(ErrDuplicateNeighbor, "peer %s already exists", peer.ID)
+ if m.NeighborExists(peer.ID) {
+ return ierrors.Wrapf(network.ErrDuplicatePeer, "peer %s already exists", peer.ID)
}
- conf := buildConnectPeerConfig(opts)
+ m.allowPeerMutex.Lock()
+ defer m.allowPeerMutex.Unlock()
+
+ if !m.allowPeer(peer.ID) {
+ return ierrors.Wrapf(network.ErrMaxAutopeeringPeersReached, "peer %s is not allowed", peer.ID)
+ }
// Adds the peer's multiaddresses to the peerstore, so that they can be used for dialing.
m.libp2pHost.Peerstore().AddAddrs(peer.ID, peer.PeerAddresses, peerstore.ConnectedAddrTTL)
cancelCtx := ctx
- if conf.useDefaultTimeout {
- var cancel context.CancelFunc
- cancelCtx, cancel = context.WithTimeout(ctx, defaultConnectionTimeout)
- defer cancel()
- }
- stream, err := m.P2PHost().NewStream(cancelCtx, peer.ID, protocolID)
+ stream, err := m.P2PHost().NewStream(cancelCtx, peer.ID, network.CoreProtocolID)
if err != nil {
- return ierrors.Wrapf(err, "dial %s / %s failed to open stream for proto %s", peer.PeerAddresses, peer.ID, protocolID)
+ return ierrors.Wrapf(err, "dial %s / %s failed to open stream for proto %s", peer.PeerAddresses, peer.ID, network.CoreProtocolID)
}
ps := NewPacketsStream(stream, m.protocolHandler.PacketFactory)
if err := ps.sendNegotiation(); err != nil {
m.closeStream(stream)
- return ierrors.Wrapf(err, "dial %s / %s failed to send negotiation for proto %s", peer.PeerAddresses, peer.ID, protocolID)
+ return ierrors.Wrapf(err, "dial %s / %s failed to send negotiation for proto %s", peer.PeerAddresses, peer.ID, network.CoreProtocolID)
}
- m.logger.LogDebugf("outgoing stream negotiated, id: %s, addr: %s, proto: %s", peer.ID, ps.Conn().RemoteMultiaddr(), protocolID)
+ m.logger.LogDebugf("outgoing stream negotiated, id: %s, addr: %s, proto: %s", peer.ID, ps.Conn().RemoteMultiaddr(), network.CoreProtocolID)
if err := m.peerDB.UpdatePeer(peer); err != nil {
m.closeStream(stream)
@@ -162,7 +147,7 @@ func (m *Manager) DialPeer(ctx context.Context, peer *network.Peer, opts ...Conn
return ierrors.Wrapf(err, "failed to update peer %s", peer.ID)
}
- if err := m.addNeighbor(peer, ps); err != nil {
+ if err := m.addNeighbor(ctx, peer, ps); err != nil {
m.closeStream(stream)
return ierrors.Errorf("failed to add neighbor %s: %s", peer.ID, err)
@@ -171,6 +156,17 @@ func (m *Manager) DialPeer(ctx context.Context, peer *network.Peer, opts ...Conn
return nil
}
+// Start starts the manager and initiates manual- and autopeering.
+func (m *Manager) Start(ctx context.Context, networkID string) error {
+ m.manualPeering.Start()
+
+ if m.autoPeering.MaxNeighbors() > 0 {
+ return m.autoPeering.Start(ctx, networkID)
+ }
+
+ return nil
+}
+
// Shutdown stops the manager and closes all established connections.
func (m *Manager) Shutdown() {
m.shutdownMutex.Lock()
@@ -180,9 +176,26 @@ func (m *Manager) Shutdown() {
return
}
m.isShutdown = true
+
+ if err := m.autoPeering.Stop(); err != nil {
+ m.logger.LogErrorf("failed to stop autopeering: %s", err)
+ }
+
+ if err := m.manualPeering.Stop(); err != nil {
+ m.logger.LogErrorf("failed to stop manualpeering: %s", err)
+ }
+
m.dropAllNeighbors()
m.UnregisterProtocol()
+
+ if err := m.libp2pHost.Close(); err != nil {
+ m.logger.LogErrorf("failed to close libp2p host: %s", err)
+ }
+}
+
+func (m *Manager) AddManualPeers(peers ...multiaddr.Multiaddr) error {
+ return m.manualPeering.AddPeers(peers...)
}
// LocalPeerID returns the local peer ID.
@@ -208,52 +221,48 @@ func (m *Manager) DropNeighbor(id peer.ID) error {
// Send sends a message with the specific protocol to a set of neighbors.
func (m *Manager) Send(packet proto.Message, to ...peer.ID) {
- var neighbors []*Neighbor
+ var neighbors []*neighbor
if len(to) == 0 {
- neighbors = m.AllNeighbors()
+ neighbors = m.allNeighbors()
} else {
- neighbors = m.NeighborsByID(to)
+ neighbors = m.neighborsByID(to)
}
for _, nbr := range neighbors {
- nbr.Enqueue(packet, protocol.ID(protocolID))
+ nbr.Enqueue(packet, network.CoreProtocolID)
}
}
-// AllNeighbors returns all the neighbors that are currently connected.
-func (m *Manager) AllNeighbors() []*Neighbor {
- m.neighborsMutex.RLock()
- defer m.neighborsMutex.RUnlock()
- result := make([]*Neighbor, 0, len(m.neighbors))
- for _, n := range m.neighbors {
- result = append(result, n)
+func (m *Manager) AllNeighbors() []network.Neighbor {
+ neighbors := m.allNeighbors()
+ result := make([]network.Neighbor, len(neighbors))
+ for i, n := range neighbors {
+ result[i] = n
}
return result
}
-// AllNeighborsIDs returns all the ids of the neighbors that are currently connected.
-func (m *Manager) AllNeighborsIDs() (ids []peer.ID) {
- ids = make([]peer.ID, 0)
- neighbors := m.AllNeighbors()
- for _, nbr := range neighbors {
- ids = append(ids, nbr.ID)
- }
+// allNeighbors returns all the neighbors that are currently connected.
+func (m *Manager) allNeighbors() []*neighbor {
+ return m.neighbors.Values()
+}
- return
+func (m *Manager) AutopeeringNeighbors() []network.Neighbor {
+ return lo.Filter(m.AllNeighbors(), func(n network.Neighbor) bool {
+ return !m.manualPeering.IsPeerKnown(n.Peer().ID)
+ })
}
-// NeighborsByID returns all the neighbors that are currently connected corresponding to the supplied ids.
-func (m *Manager) NeighborsByID(ids []peer.ID) []*Neighbor {
- result := make([]*Neighbor, 0, len(ids))
+// neighborsByID returns all the neighbors that are currently connected corresponding to the supplied ids.
+func (m *Manager) neighborsByID(ids []peer.ID) []*neighbor {
+ result := make([]*neighbor, 0, len(ids))
if len(ids) == 0 {
return result
}
- m.neighborsMutex.RLock()
- defer m.neighborsMutex.RUnlock()
for _, id := range ids {
- if n, ok := m.neighbors[id]; ok {
+ if n, ok := m.neighbors.Get(id); ok {
result = append(result, n)
}
}
@@ -267,7 +276,19 @@ func (m *Manager) handleStream(stream p2pnetwork.Stream) {
if m.protocolHandler == nil {
m.logger.LogError("no protocol handler registered")
- stream.Close()
+ _ = stream.Close()
+
+ return
+ }
+
+ m.allowPeerMutex.Lock()
+ defer m.allowPeerMutex.Unlock()
+
+ peerID := stream.Conn().RemotePeer()
+
+ if !m.allowPeer(peerID) {
+ m.logger.LogDebugf("peer %s is not allowed", peerID)
+ m.closeStream(stream)
return
}
@@ -281,19 +302,20 @@ func (m *Manager) handleStream(stream p2pnetwork.Stream) {
}
peerAddrInfo := &peer.AddrInfo{
- ID: stream.Conn().RemotePeer(),
+ ID: peerID,
Addrs: []multiaddr.Multiaddr{stream.Conn().RemoteMultiaddr()},
}
- peer := network.NewPeerFromAddrInfo(peerAddrInfo)
- if err := m.peerDB.UpdatePeer(peer); err != nil {
- m.logger.LogErrorf("failed to update peer in peer database, peerID: %s, error: %s", peer.ID, err)
+
+ networkPeer := network.NewPeerFromAddrInfo(peerAddrInfo)
+ if err := m.peerDB.UpdatePeer(networkPeer); err != nil {
+ m.logger.LogErrorf("failed to update peer in peer database, peerID: %s, error: %s", peerID, err)
m.closeStream(stream)
return
}
- if err := m.addNeighbor(peer, ps); err != nil {
- m.logger.LogErrorf("failed to add neighbor, peerID: %s, error: %s", peer.ID, err)
+ if err := m.addNeighbor(context.Background(), networkPeer, ps); err != nil {
+ m.logger.LogErrorf("failed to add neighbor, peerID: %s, error: %s", peerID, err)
m.closeStream(stream)
return
@@ -301,39 +323,37 @@ func (m *Manager) handleStream(stream p2pnetwork.Stream) {
}
func (m *Manager) closeStream(s p2pnetwork.Stream) {
- if err := s.Close(); err != nil {
+ if err := s.Reset(); err != nil {
m.logger.LogWarnf("close error, error: %s", err)
}
}
// neighborWithGroup returns neighbor by ID and group.
-func (m *Manager) neighbor(id peer.ID) (*Neighbor, error) {
- m.neighborsMutex.RLock()
- defer m.neighborsMutex.RUnlock()
-
- nbr, ok := m.neighbors[id]
+func (m *Manager) neighbor(id peer.ID) (*neighbor, error) {
+ nbr, ok := m.neighbors.Get(id)
if !ok {
- return nil, ErrUnknownNeighbor
+ return nil, network.ErrUnknownPeer
}
return nbr, nil
}
-func (m *Manager) addNeighbor(peer *network.Peer, ps *PacketsStream) error {
+func (m *Manager) addNeighbor(ctx context.Context, peer *network.Peer, ps *PacketsStream) error {
if peer.ID == m.libp2pHost.ID() {
- return ierrors.WithStack(ErrLoopbackNeighbor)
+ return ierrors.WithStack(network.ErrLoopbackPeer)
}
m.shutdownMutex.RLock()
defer m.shutdownMutex.RUnlock()
if m.isShutdown {
- return ErrNotRunning
+ return network.ErrNotRunning
}
- if m.neighborExists(peer.ID) {
- return ierrors.WithStack(ErrDuplicateNeighbor)
+ if m.NeighborExists(peer.ID) {
+ return ierrors.WithStack(network.ErrDuplicatePeer)
}
+ firstPacketReceivedCtx, firstPacketReceivedCancel := context.WithDeadline(ctx, time.Now().Add(5*time.Second))
// create and add the neighbor
- nbr := NewNeighbor(m.logger, peer, ps, func(nbr *Neighbor, packet proto.Message) {
+ nbr := newNeighbor(m.logger, peer, ps, func(nbr *neighbor, packet proto.Message) {
m.protocolHandlerMutex.RLock()
defer m.protocolHandlerMutex.RUnlock()
@@ -341,15 +361,20 @@ func (m *Manager) addNeighbor(peer *network.Peer, ps *PacketsStream) error {
nbr.logger.LogError("Can't handle packet as no protocol is registered")
return
}
- if err := m.protocolHandler.PacketHandler(nbr.ID, packet); err != nil {
+ if err := m.protocolHandler.PacketHandler(nbr.Peer().ID, packet); err != nil {
nbr.logger.LogDebugf("Can't handle packet, error: %s", err)
}
- }, func(nbr *Neighbor) {
+ }, func(nbr *neighbor) {
+ nbr.logger.LogInfof("Neighbor connected: %s", nbr.Peer().ID)
+ nbr.Peer().SetConnStatus(network.ConnStatusConnected)
+ firstPacketReceivedCancel()
+ m.neighborAdded.Trigger(nbr)
+ }, func(nbr *neighbor) {
m.deleteNeighbor(nbr)
- m.Events.NeighborRemoved.Trigger(nbr)
+ m.neighborRemoved.Trigger(nbr)
})
if err := m.setNeighbor(nbr); err != nil {
- if resetErr := ps.Close(); resetErr != nil {
+ if resetErr := ps.Reset(); resetErr != nil {
nbr.logger.LogErrorf("error closing stream, error: %s", resetErr)
}
@@ -357,40 +382,71 @@ func (m *Manager) addNeighbor(peer *network.Peer, ps *PacketsStream) error {
}
nbr.readLoop()
nbr.writeLoop()
- nbr.logger.LogInfo("Connection established to %s")
- m.Events.NeighborAdded.Trigger(nbr)
+
+ <-firstPacketReceivedCtx.Done()
+
+ if ierrors.Is(firstPacketReceivedCtx.Err(), context.DeadlineExceeded) {
+ nbr.logger.LogErrorf("First packet not received within deadline")
+ nbr.Close()
+
+ return ierrors.WithStack(network.ErrFirstPacketNotReceived)
+ }
return nil
}
-func (m *Manager) neighborExists(id peer.ID) bool {
- m.neighborsMutex.RLock()
- defer m.neighborsMutex.RUnlock()
- _, exists := m.neighbors[id]
-
- return exists
+func (m *Manager) NeighborExists(id peer.ID) bool {
+ return m.neighbors.Has(id)
}
-func (m *Manager) deleteNeighbor(nbr *Neighbor) {
- m.neighborsMutex.Lock()
- defer m.neighborsMutex.Unlock()
- delete(m.neighbors, nbr.ID)
+func (m *Manager) deleteNeighbor(nbr *neighbor) {
+ // Close the connection to the peer.
+ _ = m.libp2pHost.Network().ClosePeer(nbr.Peer().ID)
+
+ m.neighbors.Delete(nbr.Peer().ID)
+
+ nbr.Peer().SetConnStatus(network.ConnStatusDisconnected)
}
-func (m *Manager) setNeighbor(nbr *Neighbor) error {
- m.neighborsMutex.Lock()
- defer m.neighborsMutex.Unlock()
- if _, exists := m.neighbors[nbr.ID]; exists {
- return ierrors.WithStack(ErrDuplicateNeighbor)
- }
- m.neighbors[nbr.ID] = nbr
+func (m *Manager) setNeighbor(nbr *neighbor) error {
+ var err error
+ m.neighbors.Compute(nbr.Peer().ID, func(currentValue *neighbor, exists bool) *neighbor {
+ if exists {
+ err = ierrors.WithStack(network.ErrDuplicatePeer)
+ return currentValue
+ }
- return nil
+ return nbr
+ })
+
+ return err
}
func (m *Manager) dropAllNeighbors() {
- neighborsList := m.AllNeighbors()
+ neighborsList := m.allNeighbors()
for _, nbr := range neighborsList {
nbr.Close()
}
}
+
+func (m *Manager) allowPeer(id peer.ID) (allow bool) {
+ // This should always be called from within the allowPeerMutex lock
+
+ // Always allow manual peers
+ if m.manualPeering.IsPeerKnown(id) {
+ m.logger.LogDebugf("Allow manual peer %s", id)
+ return true
+ }
+
+ // Only allow up to the maximum number of autopeered neighbors
+ autopeeredNeighborsCount := len(m.AutopeeringNeighbors())
+ if autopeeredNeighborsCount < m.autoPeering.MaxNeighbors() {
+ m.logger.LogDebugf("Allow autopeered peer %s. Max %d has not been reached: %d", id, m.autoPeering.MaxNeighbors(), autopeeredNeighborsCount)
+ return true
+ }
+
+ // Don't allow new peers
+ m.logger.LogDebugf("Disallow autopeered peer %s. Max %d has been reached", id, m.autoPeering.MaxNeighbors())
+
+ return false
+}
diff --git a/pkg/network/manualpeering/manualpeering.go b/pkg/network/p2p/manualpeering/manualpeering.go
similarity index 79%
rename from pkg/network/manualpeering/manualpeering.go
rename to pkg/network/p2p/manualpeering/manualpeering.go
index 45c8f8fd9..672572177 100644
--- a/pkg/network/manualpeering/manualpeering.go
+++ b/pkg/network/p2p/manualpeering/manualpeering.go
@@ -13,9 +13,11 @@ import (
"github.com/iotaledger/hive.go/log"
"github.com/iotaledger/hive.go/runtime/event"
"github.com/iotaledger/hive.go/runtime/syncutils"
- "github.com/iotaledger/hive.go/runtime/workerpool"
"github.com/iotaledger/iota-core/pkg/network"
- "github.com/iotaledger/iota-core/pkg/network/p2p"
+)
+
+const (
+ manualPeerProtectionTag = "manual-peering"
)
// Manager is the core entity in the manual peering package.
@@ -27,7 +29,7 @@ import (
// manager will make sure gossip drops that connection.
// Manager also subscribes to the gossip events and in case the connection with a manual peer fails it will reconnect.
type Manager struct {
- p2pm *p2p.Manager
+ networkManager network.Manager
logger log.Logger
startOnce sync.Once
isStarted atomic.Bool
@@ -37,20 +39,18 @@ type Manager struct {
reconnectInterval time.Duration
knownPeersMutex syncutils.RWMutex
knownPeers map[peer.ID]*network.Peer
- workerPool *workerpool.WorkerPool
- onGossipNeighborRemovedHook *event.Hook[func(*p2p.Neighbor)]
- onGossipNeighborAddedHook *event.Hook[func(*p2p.Neighbor)]
+ onGossipNeighborRemovedHook *event.Hook[func(network.Neighbor)]
+ onGossipNeighborAddedHook *event.Hook[func(network.Neighbor)]
}
// NewManager initializes a new Manager instance.
-func NewManager(p2pm *p2p.Manager, workerPool *workerpool.WorkerPool, logger log.Logger) *Manager {
+func NewManager(networkManager network.Manager, logger log.Logger) *Manager {
m := &Manager{
- p2pm: p2pm,
- logger: logger,
+ networkManager: networkManager,
+ logger: logger.NewChildLogger("ManualPeering"),
reconnectInterval: network.DefaultReconnectInterval,
knownPeers: make(map[peer.ID]*network.Peer),
- workerPool: workerPool,
}
return m
@@ -84,7 +84,10 @@ func (m *Manager) RemovePeer(peerID peer.ID) error {
m.knownPeersMutex.Unlock()
<-kp.DoneCh
- if err := m.p2pm.DropNeighbor(peerID); err != nil && !ierrors.Is(err, p2p.ErrUnknownNeighbor) {
+
+ m.networkManager.P2PHost().ConnManager().Unprotect(peerID, manualPeerProtectionTag)
+
+ if err := m.networkManager.DropNeighbor(peerID); err != nil && !ierrors.Is(err, network.ErrUnknownPeer) {
return ierrors.Wrapf(err, "failed to drop known peer %s in the gossip layer", peerID)
}
@@ -149,13 +152,12 @@ func (m *Manager) GetPeers(opts ...GetPeersOption) []*network.PeerDescriptor {
// Calling multiple times has no effect.
func (m *Manager) Start() {
m.startOnce.Do(func() {
- m.workerPool.Start()
- m.onGossipNeighborRemovedHook = m.p2pm.Events.NeighborRemoved.Hook(func(neighbor *p2p.Neighbor) {
+ m.onGossipNeighborRemovedHook = m.networkManager.OnNeighborRemoved(func(neighbor network.Neighbor) {
m.onGossipNeighborRemoved(neighbor)
- }, event.WithWorkerPool(m.workerPool))
- m.onGossipNeighborAddedHook = m.p2pm.Events.NeighborAdded.Hook(func(neighbor *p2p.Neighbor) {
+ })
+ m.onGossipNeighborAddedHook = m.networkManager.OnNeighborAdded(func(neighbor network.Neighbor) {
m.onGossipNeighborAdded(neighbor)
- }, event.WithWorkerPool(m.workerPool))
+ })
m.isStarted.Store(true)
})
}
@@ -178,6 +180,15 @@ func (m *Manager) Stop() (err error) {
return err
}
+func (m *Manager) IsPeerKnown(id peer.ID) bool {
+ m.knownPeersMutex.RLock()
+ defer m.knownPeersMutex.RUnlock()
+
+ _, exists := m.knownPeers[id]
+
+ return exists
+}
+
func (m *Manager) addPeer(peerAddr multiaddr.Multiaddr) error {
if !m.isStarted.Load() {
return ierrors.New("manual peering manager hasn't been started yet")
@@ -196,7 +207,7 @@ func (m *Manager) addPeer(peerAddr multiaddr.Multiaddr) error {
}
// Do not add self
- if p.ID == m.p2pm.P2PHost().ID() {
+ if p.ID == m.networkManager.P2PHost().ID() {
return ierrors.New("not adding self to the list of known peers")
}
@@ -240,7 +251,7 @@ func (m *Manager) keepPeerConnected(peer *network.Peer) {
m.logger.LogInfof("Peer is disconnected, calling gossip layer to establish the connection, peerID: %s", peer.ID)
var err error
- if err = m.p2pm.DialPeer(ctx, peer); err != nil && !ierrors.Is(err, p2p.ErrDuplicateNeighbor) && !ierrors.Is(err, context.Canceled) {
+ if err = m.networkManager.DialPeer(ctx, peer); err != nil && !ierrors.Is(err, network.ErrDuplicatePeer) && !ierrors.Is(err, context.Canceled) {
m.logger.LogErrorf("Failed to connect a neighbor in the gossip layer, peerID: %s, error: %s", peer.ID, err)
}
}
@@ -253,22 +264,23 @@ func (m *Manager) keepPeerConnected(peer *network.Peer) {
}
}
-func (m *Manager) onGossipNeighborRemoved(neighbor *p2p.Neighbor) {
- m.changeNeighborStatus(neighbor, network.ConnStatusDisconnected)
+func (m *Manager) onGossipNeighborRemoved(neighbor network.Neighbor) {
+ m.changeNeighborStatus(neighbor)
}
-func (m *Manager) onGossipNeighborAdded(neighbor *p2p.Neighbor) {
- m.changeNeighborStatus(neighbor, network.ConnStatusConnected)
- m.logger.LogInfof("Gossip layer successfully connected with the peer %s", neighbor.Peer)
+func (m *Manager) onGossipNeighborAdded(neighbor network.Neighbor) {
+ m.changeNeighborStatus(neighbor)
+ m.logger.LogInfof("Gossip layer successfully connected with the peer %s", neighbor.Peer())
}
-func (m *Manager) changeNeighborStatus(neighbor *p2p.Neighbor, connStatus network.ConnectionStatus) {
+func (m *Manager) changeNeighborStatus(neighbor network.Neighbor) {
m.knownPeersMutex.RLock()
defer m.knownPeersMutex.RUnlock()
- kp, exists := m.knownPeers[neighbor.ID]
+ kp, exists := m.knownPeers[neighbor.Peer().ID]
if !exists {
return
}
- kp.SetConnStatus(connStatus)
+ kp.SetConnStatus(neighbor.Peer().GetConnStatus())
+ m.networkManager.P2PHost().ConnManager().Protect(neighbor.Peer().ID, manualPeerProtectionTag)
}
diff --git a/pkg/network/p2p/neighbor.go b/pkg/network/p2p/neighbor.go
index 445826453..c54e2eb2e 100644
--- a/pkg/network/p2p/neighbor.go
+++ b/pkg/network/p2p/neighbor.go
@@ -23,19 +23,23 @@ type queuedPacket struct {
}
type (
- PacketReceivedFunc func(neighbor *Neighbor, packet proto.Message)
- NeighborDisconnectedFunc func(neighbor *Neighbor)
+ PacketReceivedFunc func(neighbor *neighbor, packet proto.Message)
+ NeighborConnectedFunc func(neighbor *neighbor)
+ NeighborDisconnectedFunc func(neighbor *neighbor)
)
-// Neighbor describes the established p2p connection to another peer.
-type Neighbor struct {
- *network.Peer
+// neighbor describes the established p2p connection to another peer.
+type neighbor struct {
+ peer *network.Peer
logger log.Logger
packetReceivedFunc PacketReceivedFunc
- disconnectedFunc NeighborDisconnectedFunc
+ connectedFunc NeighborConnectedFunc
+ disconnectedFunc NeighborDisconnectedFunc
+
+ connectOnce sync.Once
disconnectOnce sync.Once
wg sync.WaitGroup
@@ -47,14 +51,17 @@ type Neighbor struct {
sendQueue chan *queuedPacket
}
-// NewNeighbor creates a new neighbor from the provided peer and connection.
-func NewNeighbor(parentLogger log.Logger, p *network.Peer, stream *PacketsStream, packetReceivedCallback PacketReceivedFunc, disconnectedCallback NeighborDisconnectedFunc) *Neighbor {
+var _ network.Neighbor = (*neighbor)(nil)
+
+// newNeighbor creates a new neighbor from the provided peer and connection.
+func newNeighbor(parentLogger log.Logger, p *network.Peer, stream *PacketsStream, packetReceivedCallback PacketReceivedFunc, connectedCallback NeighborConnectedFunc, disconnectedCallback NeighborDisconnectedFunc) *neighbor {
ctx, cancel := context.WithCancel(context.Background())
- n := &Neighbor{
- Peer: p,
+ n := &neighbor{
+ peer: p,
logger: parentLogger.NewChildLogger("peer", true),
packetReceivedFunc: packetReceivedCallback,
+ connectedFunc: connectedCallback,
disconnectedFunc: disconnectedCallback,
loopCtx: ctx,
loopCtxCancel: cancel,
@@ -62,12 +69,16 @@ func NewNeighbor(parentLogger log.Logger, p *network.Peer, stream *PacketsStream
sendQueue: make(chan *queuedPacket, NeighborsSendQueueSize),
}
- n.logger.LogInfo("created", "ID", n.ID)
+ n.logger.LogInfo("created", "ID", n.Peer().ID)
return n
}
-func (n *Neighbor) Enqueue(packet proto.Message, protocolID protocol.ID) {
+func (n *neighbor) Peer() *network.Peer {
+ return n.peer
+}
+
+func (n *neighbor) Enqueue(packet proto.Message, protocolID protocol.ID) {
select {
case n.sendQueue <- &queuedPacket{protocolID: protocolID, packet: packet}:
default:
@@ -76,21 +87,21 @@ func (n *Neighbor) Enqueue(packet proto.Message, protocolID protocol.ID) {
}
// PacketsRead returns number of packets this neighbor has received.
-func (n *Neighbor) PacketsRead() uint64 {
+func (n *neighbor) PacketsRead() uint64 {
return n.stream.packetsRead.Load()
}
// PacketsWritten returns number of packets this neighbor has sent.
-func (n *Neighbor) PacketsWritten() uint64 {
+func (n *neighbor) PacketsWritten() uint64 {
return n.stream.packetsWritten.Load()
}
// ConnectionEstablished returns the connection established.
-func (n *Neighbor) ConnectionEstablished() time.Time {
+func (n *neighbor) ConnectionEstablished() time.Time {
return n.stream.Stat().Opened
}
-func (n *Neighbor) readLoop() {
+func (n *neighbor) readLoop() {
n.wg.Add(1)
go func(stream *PacketsStream) {
defer n.wg.Done()
@@ -116,12 +127,15 @@ func (n *Neighbor) readLoop() {
return
}
+ n.connectOnce.Do(func() {
+ n.connectedFunc(n)
+ })
n.packetReceivedFunc(n, packet)
}
}(n.stream)
}
-func (n *Neighbor) writeLoop() {
+func (n *neighbor) writeLoop() {
n.wg.Add(1)
go func() {
defer n.wg.Done()
@@ -132,7 +146,7 @@ func (n *Neighbor) writeLoop() {
return
case sendPacket := <-n.sendQueue:
if n.stream == nil {
- n.logger.LogWarnf("send error, no stream for protocol, peerID: %s, protocol: %s", n.ID, sendPacket.protocolID)
+ n.logger.LogWarnf("send error, no stream for protocol, peerID: %s, protocol: %s", n.Peer().ID, sendPacket.protocolID)
if disconnectErr := n.disconnect(); disconnectErr != nil {
n.logger.LogWarnf("Failed to disconnect, error: %s", disconnectErr)
}
@@ -140,7 +154,7 @@ func (n *Neighbor) writeLoop() {
return
}
if err := n.stream.WritePacket(sendPacket.packet); err != nil {
- n.logger.LogWarnf("send error, peerID: %s, error: %s", n.ID, err)
+ n.logger.LogWarnf("send error, peerID: %s, error: %s", n.Peer().ID, err)
if disconnectErr := n.disconnect(); disconnectErr != nil {
n.logger.LogWarnf("Failed to disconnect, error: %s", disconnectErr)
}
@@ -153,7 +167,7 @@ func (n *Neighbor) writeLoop() {
}
// Close closes the connection with the neighbor.
-func (n *Neighbor) Close() {
+func (n *neighbor) Close() {
if err := n.disconnect(); err != nil {
n.logger.LogErrorf("Failed to disconnect the neighbor, error: %s", err)
}
@@ -161,13 +175,13 @@ func (n *Neighbor) Close() {
n.logger.UnsubscribeFromParentLogger()
}
-func (n *Neighbor) disconnect() (err error) {
+func (n *neighbor) disconnect() (err error) {
n.disconnectOnce.Do(func() {
// Stop the loops
n.loopCtxCancel()
// Close all streams
- if streamErr := n.stream.Close(); streamErr != nil {
+ if streamErr := n.stream.Reset(); streamErr != nil {
err = ierrors.WithStack(streamErr)
}
n.logger.LogInfof("Stream closed, protocol: %s", n.stream.Protocol())
diff --git a/pkg/network/p2p/neighbor_test.go b/pkg/network/p2p/neighbor_test.go
index 59db5844a..cfe4d7f3d 100644
--- a/pkg/network/p2p/neighbor_test.go
+++ b/pkg/network/p2p/neighbor_test.go
@@ -51,7 +51,7 @@ func TestNeighborWrite(t *testing.T) {
defer teardown()
var countA uint32
- neighborA := newTestNeighbor("A", a, func(neighbor *Neighbor, packet proto.Message) {
+ neighborA := newTestNeighbor("A", a, func(neighbor *neighbor, packet proto.Message) {
_ = packet.(*p2pproto.Negotiation)
atomic.AddUint32(&countA, 1)
})
@@ -59,7 +59,7 @@ func TestNeighborWrite(t *testing.T) {
neighborA.readLoop()
var countB uint32
- neighborB := newTestNeighbor("B", b, func(neighbor *Neighbor, packet proto.Message) {
+ neighborB := newTestNeighbor("B", b, func(neighbor *neighbor, packet proto.Message) {
_ = packet.(*p2pproto.Negotiation)
atomic.AddUint32(&countB, 1)
})
@@ -75,15 +75,15 @@ func TestNeighborWrite(t *testing.T) {
assert.Eventually(t, func() bool { return atomic.LoadUint32(&countB) == 1 }, time.Second, 10*time.Millisecond)
}
-func newTestNeighbor(name string, stream p2pnetwork.Stream, packetReceivedFunc ...PacketReceivedFunc) *Neighbor {
+func newTestNeighbor(name string, stream p2pnetwork.Stream, packetReceivedFunc ...PacketReceivedFunc) *neighbor {
var packetReceived PacketReceivedFunc
if len(packetReceivedFunc) > 0 {
packetReceived = packetReceivedFunc[0]
} else {
- packetReceived = func(neighbor *Neighbor, packet proto.Message) {}
+ packetReceived = func(neighbor *neighbor, packet proto.Message) {}
}
- return NewNeighbor(lo.Return1(testLogger.NewChildLogger(name)), newTestPeer(name), NewPacketsStream(stream, packetFactory), packetReceived, func(neighbor *Neighbor) {})
+ return newNeighbor(lo.Return1(testLogger.NewChildLogger(name)), newTestPeer(name), NewPacketsStream(stream, packetFactory), packetReceived, func(neighbor *neighbor) {}, func(neighbor *neighbor) {})
}
func packetFactory() proto.Message {
@@ -129,12 +129,12 @@ func newStreamsPipe(t testing.TB) (p2pnetwork.Stream, p2pnetwork.Stream, func())
_, err = dialStream.Write(nil)
require.NoError(t, err)
acceptStream := <-acceptStremCh
+
tearDown := func() {
- err2 := dialStream.Close()
- require.NoError(t, err2)
- err2 = acceptStream.Close()
- require.NoError(t, err2)
- err2 = host1.Close()
+ dialStream.Close()
+ acceptStream.Close()
+
+ err2 := host1.Close()
require.NoError(t, err2)
err2 = host2.Close()
require.NoError(t, err2)
diff --git a/tools/docker-network/.env b/tools/docker-network/.env
index 025ff1d9e..efb1a5b3e 100644
--- a/tools/docker-network/.env
+++ b/tools/docker-network/.env
@@ -9,6 +9,7 @@ COMMON_CONFIG="
--protocol.snapshot.path=/app/data/snapshot.bin
"
-MANUALPEERING_CONFIG="
---p2p.peers=/dns/node-1-validator/tcp/15600/p2p/12D3KooWRVt4Engu27jHnF2RjfX48EqiAqJbgLfFdHNt3Vn6BtJK\
+AUTOPEERING_CONFIG="
+--p2p.bootstrapPeers=/dns/node-1-validator/tcp/15600/p2p/12D3KooWRVt4Engu27jHnF2RjfX48EqiAqJbgLfFdHNt3Vn6BtJK
+--p2p.autopeering.maxPeers=3
"
diff --git a/tools/docker-network/docker-compose.yml b/tools/docker-network/docker-compose.yml
index a487caa67..b69def761 100644
--- a/tools/docker-network/docker-compose.yml
+++ b/tools/docker-network/docker-compose.yml
@@ -29,7 +29,6 @@ services:
- ./config.json:/app/config.json:ro
command: >
${COMMON_CONFIG}
- ${MANUALPEERING_CONFIG}
--p2p.identityPrivateKey=08735375679f3d8031353e94282ed1d65119e5c288fe56d6639d9184a3f978fee8febfedff11cc376daea0f59c395ae2e9a870a25ac4e36093000fbf4d0e8f18
--inx.enabled=true
--inx.bindAddress=0.0.0.0:9029
@@ -40,6 +39,9 @@ services:
image: docker-network-node-1-validator:latest
stop_grace_period: 1m
restart: no
+ depends_on:
+ node-1-validator:
+ condition: service_healthy
ulimits:
nofile:
soft: 16384
@@ -56,7 +58,7 @@ services:
- ./config.json:/app/config.json:ro
command: >
${COMMON_CONFIG}
- ${MANUALPEERING_CONFIG}
+ ${AUTOPEERING_CONFIG}
--p2p.identityPrivateKey=ba771419c52132a0dfb2521ed18667813f398da159010a55a0a482af939affb92d3338789ad4a07a7631b91791deb11f82ed5dc612822f24275e9f7a313b691f
--inx.enabled=true
--inx.bindAddress=0.0.0.0:9029
@@ -67,6 +69,9 @@ services:
image: docker-network-node-1-validator:latest
stop_grace_period: 1m
restart: no
+ depends_on:
+ node-1-validator:
+ condition: service_healthy
ulimits:
nofile:
soft: 16384
@@ -83,7 +88,7 @@ services:
- ./config.json:/app/config.json:ro
command: >
${COMMON_CONFIG}
- ${MANUALPEERING_CONFIG}
+ ${AUTOPEERING_CONFIG}
--p2p.identityPrivateKey=a6261ac049755675ff1437654ca9f83b305055f01ff08c4f039209ef5a4a7d96d06fb61df77a8815209a8f4d204226dee593e50d0ec897ec440a2c1fbde77656
--inx.enabled=true
--inx.bindAddress=0.0.0.0:9029
@@ -93,7 +98,10 @@ services:
node-4-validator:
image: docker-network-node-1-validator:latest
stop_grace_period: 1m
- restart: unless-stopped
+ restart: no
+ depends_on:
+ node-1-validator:
+ condition: service_healthy
ulimits:
nofile:
soft: 16384
@@ -110,15 +118,18 @@ services:
- ./config.json:/app/config.json:ro
command: >
${COMMON_CONFIG}
- ${MANUALPEERING_CONFIG}
+ ${AUTOPEERING_CONFIG}
--p2p.identityPrivateKey=f205f6c4525069f71f9c7e987d72421a16c7900056b494a2b85fdf7942cf906aefbdc580f5d1ce4ae3f86ccfe109c6cd76df9b0e710a437b2aa964358c7b9449
--inx.enabled=true
--inx.bindAddress=0.0.0.0:9029
- node-4:
+ node-5:
image: docker-network-node-1-validator:latest
stop_grace_period: 1m
restart: no
+ depends_on:
+ node-1-validator:
+ condition: service_healthy
ulimits:
nofile:
soft: 16384
@@ -135,17 +146,20 @@ services:
- ./config.json:/app/config.json:ro
command: >
${COMMON_CONFIG}
- ${MANUALPEERING_CONFIG}
+ ${AUTOPEERING_CONFIG}
--p2p.identityPrivateKey=03feb3bcd25e57f75697bb329e6e0100680431e4c45c85bc013da2aea9e9d0345e08a0c37407dc62369deebc64cb0fb3ea26127d19d141ee7fb8eaa6b92019d7
--inx.enabled=true
--inx.bindAddress=0.0.0.0:9029
--prometheus.goMetrics=true
--prometheus.processMetrics=true
- node-5:
+ node-6:
image: docker-network-node-1-validator:latest
stop_grace_period: 1m
restart: no
+ depends_on:
+ node-1-validator:
+ condition: service_healthy
ulimits:
nofile:
soft: 16384
@@ -162,7 +176,7 @@ services:
- ./config.json:/app/config.json:ro
command: >
${COMMON_CONFIG}
- ${MANUALPEERING_CONFIG}
+ ${AUTOPEERING_CONFIG}
--p2p.identityPrivateKey=7d1491df3ef334dee988d6cdfc4b430b996d520bd63375a01d6754f8cee979b855b200fbea8c936ea1937a27e6ad72a7c9a21c1b17c2bd3c11f1f6994d813446
--inx.enabled=true
--inx.bindAddress=0.0.0.0:9029
diff --git a/tools/docker-network/prometheus.yml b/tools/docker-network/prometheus.yml
index c6458b27f..6568d8236 100644
--- a/tools/docker-network/prometheus.yml
+++ b/tools/docker-network/prometheus.yml
@@ -7,8 +7,8 @@ scrape_configs:
- node-2-validator:9311
- node-3-validator:9311
- node-4-validator:9311
- - node-4:9311
- node-5:9311
+ - node-6:9311
dns_sd_configs:
- names:
- 'peer_replica'
diff --git a/tools/gendoc/go.mod b/tools/gendoc/go.mod
index f87691157..f5fd4b460 100644
--- a/tools/gendoc/go.mod
+++ b/tools/gendoc/go.mod
@@ -135,7 +135,7 @@ require (
github.com/pokt-network/smt v0.9.2 // indirect
github.com/polydawn/refmt v0.89.0 // indirect
github.com/prometheus/client_golang v1.18.0 // indirect
- github.com/prometheus/client_model v0.5.0 // indirect
+ github.com/prometheus/client_model v0.6.0 // indirect
github.com/prometheus/common v0.47.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/quic-go/qpack v0.4.0 // indirect
diff --git a/tools/gendoc/go.sum b/tools/gendoc/go.sum
index fc5db226f..8d9fbf916 100644
--- a/tools/gendoc/go.sum
+++ b/tools/gendoc/go.sum
@@ -551,8 +551,8 @@ github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
-github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw=
-github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI=
+github.com/prometheus/client_model v0.6.0 h1:k1v3CzpSRUTrKMppY35TLwPvxHqBu0bYgxZzqGIgaos=
+github.com/prometheus/client_model v0.6.0/go.mod h1:NTQHnmxFpouOD0DpvP4XujX3CdOAGQPoaGhyTchlyt8=
github.com/prometheus/common v0.0.0-20180801064454-c7de2306084e/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo=
diff --git a/tools/genesis-snapshot/presets/presets.go b/tools/genesis-snapshot/presets/presets.go
index 8e01bd6ce..e6f6ccbe8 100644
--- a/tools/genesis-snapshot/presets/presets.go
+++ b/tools/genesis-snapshot/presets/presets.go
@@ -1,6 +1,7 @@
package presets
import (
+ "fmt"
"time"
"golang.org/x/crypto/blake2b"
@@ -23,14 +24,14 @@ var (
// use defaults from iota.go.
ProtocolParamsDocker = iotago.NewV3SnapshotProtocolParameters(
- iotago.WithNetworkOptions("docker", iotago.PrefixTestnet),
+ iotago.WithNetworkOptions(fmt.Sprintf("docker-%d", time.Now().Unix()), iotago.PrefixTestnet),
iotago.WithTimeProviderOptions(5, time.Now().Unix(), 10, 13),
iotago.WithLivenessOptions(10, 15, 3, 6, 8),
)
// use defaults from iota.go.
ProtocolParamsFeature = iotago.NewV3SnapshotProtocolParameters(
- iotago.WithNetworkOptions("feature", iotago.PrefixTestnet),
+ iotago.WithNetworkOptions(fmt.Sprintf("feature-%d", time.Now().Unix()), iotago.PrefixTestnet),
iotago.WithTimeProviderOptions(666666, time.Now().Unix()-100_000, 10, 13),
)
)