Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Autopeering improvements #758

Merged
merged 19 commits into from
Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
73c12f7
Use a more explicit namespace for DHT discovery to reduce the chance …
alexsporn Feb 20, 2024
a0befb1
Use shrinking map in the p2p manager and clean up unused code
alexsporn Feb 20, 2024
a41223c
Stop AutoPeeringMgr on shutdown
alexsporn Feb 20, 2024
57c97a6
Fixed log format
alexsporn Feb 20, 2024
4676c52
Always create a distinct network name when creating the docker or fea…
alexsporn Feb 20, 2024
c5f1188
Add a boolean to the config to enable/disable autopeering
alexsporn Feb 20, 2024
9de27f3
Use p2p.bootstrapPeers in docker-network instead of p2p.peers since t…
alexsporn Feb 20, 2024
2f559dd
Let nodes wait for node-1 to be healthy before starting up in the doc…
alexsporn Feb 20, 2024
6d6cc9d
Better log formats
alexsporn Feb 20, 2024
12fb8d6
Reduce log spam for already connected peers when they are found using…
alexsporn Feb 20, 2024
af50e3a
Refactored manualpeering and autopeering to inside the P2PManager
alexsporn Feb 21, 2024
55bd202
Limit the amount of autopeering neighbors
alexsporn Feb 21, 2024
5c8af0b
Limit the peers to 3 per node in the docker network
alexsporn Feb 21, 2024
fc9b9f0
Stop advertising autopeering if we have enough peers, so that others …
alexsporn Feb 21, 2024
554b567
Add max peers to the log
alexsporn Feb 21, 2024
54d5b97
Merge branch 'develop' of github.com:iotaledger/iota-core into feat/a…
alexsporn Feb 21, 2024
1e68cf2
run gendoc
alexsporn Feb 21, 2024
47aa25a
Fix typo
muXxer Feb 21, 2024
ee4c2ae
Ignore errors during stream closing in the tests
muXxer Feb 21, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 6 additions & 14 deletions components/dashboard/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/iotaledger/hive.go/ierrors"
"github.com/iotaledger/iota-core/components/metricstracker"
"github.com/iotaledger/iota-core/pkg/daemon"
"github.com/iotaledger/iota-core/pkg/network/p2p"
"github.com/iotaledger/iota-core/pkg/network"
"github.com/iotaledger/iota-core/pkg/protocol"
)

Expand Down Expand Up @@ -49,7 +49,7 @@ type dependencies struct {
Host host.Host
Protocol *protocol.Protocol
AppInfo *app.Info
P2PManager *p2p.Manager
NetworkManager network.Manager
MetricsTracker *metricstracker.MetricsTracker
}

Expand Down Expand Up @@ -171,28 +171,20 @@ func currentNodeStatus() *nodestatus {

func neighborMetrics() []neighbormetric {
var stats []neighbormetric
if deps.P2PManager == nil {
if deps.NetworkManager == nil {
return stats
}

// gossip plugin might be disabled
neighbors := deps.P2PManager.AllNeighbors()
neighbors := deps.NetworkManager.AllNeighbors()
if neighbors == nil {
return stats
}

for _, neighbor := range neighbors {
// origin := "Inbound"
// for _, p := range deps.P2PManager.AllNeighbors() {
// if neighbor.Peer == peer {
// origin = "Outbound"
// break
// }
// }

stats = append(stats, neighbormetric{
ID: neighbor.Peer.ID.String(),
Addresses: fmt.Sprintf("%s", neighbor.Peer.PeerAddresses),
ID: neighbor.Peer().ID.String(),
Addresses: fmt.Sprintf("%s", neighbor.Peer().PeerAddresses),
PacketsRead: neighbor.PacketsRead(),
PacketsWritten: neighbor.PacketsWritten(),
})
Expand Down
120 changes: 39 additions & 81 deletions components/p2p/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ package p2p

import (
"context"
"fmt"
"path/filepath"
"time"

"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/crypto"
Expand All @@ -21,11 +19,8 @@ import (
"github.com/iotaledger/hive.go/ierrors"
"github.com/iotaledger/hive.go/kvstore"
hivedb "github.com/iotaledger/hive.go/kvstore/database"
"github.com/iotaledger/hive.go/runtime/event"
"github.com/iotaledger/iota-core/pkg/daemon"
"github.com/iotaledger/iota-core/pkg/network"
"github.com/iotaledger/iota-core/pkg/network/autopeering"
"github.com/iotaledger/iota-core/pkg/network/manualpeering"
"github.com/iotaledger/iota-core/pkg/network/p2p"
"github.com/iotaledger/iota-core/pkg/protocol"
)
Expand All @@ -51,9 +46,7 @@ type dependencies struct {
dig.In
PeeringConfig *configuration.Configuration `name:"peeringConfig"`
PeeringConfigManager *p2p.ConfigManager
ManualPeeringMgr *manualpeering.Manager
AutoPeeringMgr *autopeering.Manager
P2PManager *p2p.Manager
NetworkManager network.Manager
PeerDB *network.DB
Protocol *protocol.Protocol
PeerDBKVSTore kvstore.KVStore `name:"peerDBKVStore"`
Expand All @@ -79,49 +72,6 @@ func initConfigParams(c *dig.Container) error {
}

func provide(c *dig.Container) error {
type manualPeeringDeps struct {
dig.In

P2PManager *p2p.Manager
}

if err := c.Provide(func(deps manualPeeringDeps) *manualpeering.Manager {
return manualpeering.NewManager(deps.P2PManager, Component.WorkerPool, Component.Logger)
}); err != nil {
return err
}

type autoPeeringDeps struct {
dig.In

Protocol *protocol.Protocol
P2PManager *p2p.Manager
Host host.Host
PeerDB *network.DB
}

if err := c.Provide(func(deps autoPeeringDeps) *autopeering.Manager {
peersMultiAddresses, err := getMultiAddrsFromString(ParamsPeers.BootstrapPeers)
if err != nil {
Component.LogFatalf("Failed to parse bootstrapPeers param: %s", err)
}

for _, multiAddr := range peersMultiAddresses {
bootstrapPeer, err := network.NewPeerFromMultiAddr(multiAddr)
if err != nil {
Component.LogFatalf("Failed to parse bootstrap peer multiaddress: %s", err)
}

if err := deps.PeerDB.UpdatePeer(bootstrapPeer); err != nil {
Component.LogErrorf("Failed to update bootstrap peer: %s", err)
}
}

return autopeering.NewManager(deps.Protocol.LatestAPI().ProtocolParameters().NetworkName(), deps.P2PManager, deps.Host, deps.PeerDB, Component.Logger)
}); err != nil {
return err
}

type peerDatabaseResult struct {
dig.Out

Expand Down Expand Up @@ -251,7 +201,7 @@ func provide(c *dig.Container) error {
connManager, err := connmgr.NewConnManager(
ParamsP2P.ConnectionManager.LowWatermark,
ParamsP2P.ConnectionManager.HighWatermark,
connmgr.WithGracePeriod(time.Minute),
connmgr.WithEmergencyTrim(true),
)
if err != nil {
Component.LogPanicf("unable to initialize connection manager: %s", err)
Expand All @@ -263,6 +213,7 @@ func provide(c *dig.Container) error {
libp2p.Transport(tcp.NewTCPTransport),
libp2p.ConnectionManager(connManager),
libp2p.NATPortMap(),
libp2p.DisableRelay(),
// Define a custom address factory to inject external addresses to the DHT advertisements.
libp2p.AddrsFactory(func() func(addrs []multiaddr.Multiaddr) []multiaddr.Multiaddr {
var externalMultiAddrs []multiaddr.Multiaddr
Expand Down Expand Up @@ -294,8 +245,31 @@ func provide(c *dig.Container) error {
Component.LogPanic(err.Error())
}

return c.Provide(func(host host.Host, peerDB *network.DB) *p2p.Manager {
return p2p.NewManager(host, peerDB, Component.Logger)
type p2pManagerDeps struct {
dig.In
Host host.Host
PeerDB *network.DB
}

return c.Provide(func(inDeps p2pManagerDeps) network.Manager {

peersMultiAddresses, err := getMultiAddrsFromString(ParamsPeers.BootstrapPeers)
if err != nil {
Component.LogFatalf("Failed to parse bootstrapPeers param: %s", err)
}

for _, multiAddr := range peersMultiAddresses {
bootstrapPeer, err := network.NewPeerFromMultiAddr(multiAddr)
if err != nil {
Component.LogFatalf("Failed to parse bootstrap peer multiaddress: %s", err)
}

if err := inDeps.PeerDB.UpdatePeer(bootstrapPeer); err != nil {
Component.LogErrorf("Failed to update bootstrap peer: %s", err)
}
}

return p2p.NewManager(inDeps.Host, inDeps.PeerDB, ParamsP2P.Autopeering.MaxPeers, Component.Logger)
})
}

Expand All @@ -321,43 +295,27 @@ func configure() error {
}

// log the p2p events
deps.P2PManager.Events.NeighborAdded.Hook(func(neighbor *p2p.Neighbor) {
Component.LogInfof("Neighbor added: %s / %s", neighbor.PeerAddresses, neighbor.ID)
}, event.WithWorkerPool(Component.WorkerPool))
deps.NetworkManager.OnNeighborAdded(func(neighbor network.Neighbor) {
Component.LogInfof("neighbor added: %s / %s", neighbor.Peer().PeerAddresses, neighbor.Peer().ID)
})

deps.P2PManager.Events.NeighborRemoved.Hook(func(neighbor *p2p.Neighbor) {
Component.LogInfof("Neighbor removed: %s / %s", neighbor.PeerAddresses, neighbor.ID)
}, event.WithWorkerPool(Component.WorkerPool))
deps.NetworkManager.OnNeighborRemoved(func(neighbor network.Neighbor) {
Component.LogInfof("neighbor removed: %s / %s", neighbor.Peer().PeerAddresses, neighbor.Peer().ID)
})

return nil
}

func run() error {
if err := Component.Daemon().BackgroundWorker(Component.Name, func(ctx context.Context) {
deps.ManualPeeringMgr.Start()
if err := deps.AutoPeeringMgr.Start(ctx); err != nil {
Component.LogFatalf("Failed to start autopeering manager: %s", err)
defer deps.NetworkManager.Shutdown()

if err := deps.NetworkManager.Start(ctx, deps.Protocol.LatestAPI().ProtocolParameters().NetworkName()); err != nil {
Component.LogFatalf("Failed to start p2p manager: %s", err)
}

defer func() {
if err := deps.ManualPeeringMgr.Stop(); err != nil {
Component.LogErrorf("Failed to stop the manager", "err", err)
}
}()
//nolint:contextcheck // false positive
connectConfigKnownPeers()
<-ctx.Done()
}, daemon.PriorityManualPeering); err != nil {
Component.LogFatalf("Failed to start as daemon: %s", err)
}

if err := Component.Daemon().BackgroundWorker(fmt.Sprintf("%s-P2PManager", Component.Name), func(ctx context.Context) {
defer deps.P2PManager.Shutdown()
defer func() {
if err := deps.P2PManager.P2PHost().Close(); err != nil {
Component.LogWarnf("Failed to close libp2p host: %+v", err)
}
}()

<-ctx.Done()
}, daemon.PriorityP2P); err != nil {
Expand Down Expand Up @@ -395,7 +353,7 @@ func connectConfigKnownPeers() {
Component.LogPanicf("invalid peer address info: %s", err)
}

if err := deps.ManualPeeringMgr.AddPeers(multiAddr); err != nil {
if err := deps.NetworkManager.AddManualPeers(multiAddr); err != nil {
Component.LogInfof("failed to add peer: %s, error: %s", multiAddr.String(), err)
}
}
Expand Down
4 changes: 4 additions & 0 deletions components/p2p/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ type ParametersP2P struct {
// Defines the private key used to derive the node identity (optional).
IdentityPrivateKey string `default:"" usage:"private key used to derive the node identity (optional)"`

Autopeering struct {
MaxPeers int `default:"5" usage:"the max number of autopeer connections. Set to 0 to disable autopeering."`
}

Database struct {
// Defines the path to the p2p database.
Path string `default:"testnet/p2pstore" usage:"the path to the p2p database"`
Expand Down
6 changes: 3 additions & 3 deletions components/protocol/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/iotaledger/hive.go/runtime/workerpool"
"github.com/iotaledger/iota-core/pkg/daemon"
"github.com/iotaledger/iota-core/pkg/model"
"github.com/iotaledger/iota-core/pkg/network/p2p"
"github.com/iotaledger/iota-core/pkg/network"
"github.com/iotaledger/iota-core/pkg/protocol"
"github.com/iotaledger/iota-core/pkg/protocol/engine/attestation/slotattestation"
"github.com/iotaledger/iota-core/pkg/protocol/engine/filter/presolidfilter/presolidblockfilter"
Expand Down Expand Up @@ -115,7 +115,7 @@ func provide(c *dig.Container) error {

DatabaseEngine hivedb.Engine `name:"databaseEngine"`
ProtocolParameters []iotago.ProtocolParameters
P2PManager *p2p.Manager
NetworkManager network.Manager
}

return c.Provide(func(deps protocolDeps) *protocol.Protocol {
Expand All @@ -132,7 +132,7 @@ func provide(c *dig.Container) error {
return protocol.New(
Component.Logger,
workerpool.NewGroup("Protocol"),
deps.P2PManager,
deps.NetworkManager,
protocol.WithBaseDirectory(ParamsDatabase.Path),
protocol.WithStorageOptions(
storage.WithDBEngine(deps.DatabaseEngine),
Expand Down
3 changes: 3 additions & 0 deletions config_defaults.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
},
"externalMultiAddresses": [],
"identityPrivateKey": "",
"autopeering": {
"maxPeers": 5
},
"db": {
"path": "testnet/p2pstore"
}
Expand Down
10 changes: 10 additions & 0 deletions documentation/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ Example:
| [connectionManager](#p2p_connectionmanager) | Configuration for connectionManager | object | |
| externalMultiAddresses | External reacheable multi addresses advertised to the network | array | |
| identityPrivateKey | Private key used to derive the node identity (optional) | string | "" |
| [autopeering](#p2p_autopeering) | Configuration for autopeering | object | |
| [db](#p2p_db) | Configuration for db | object | |

### <a id="p2p_connectionmanager"></a> ConnectionManager
Expand All @@ -108,6 +109,12 @@ Example:
| highWatermark | The threshold up on which connections count truncates to the lower watermark | int | 10 |
| lowWatermark | The minimum connections count to hold after the high watermark was reached | int | 5 |

### <a id="p2p_autopeering"></a> Autopeering

| Name | Description | Type | Default value |
| -------- | ------------------------------------------------------------------------ | ---- | ------------- |
| maxPeers | The max number of autopeer connections. Set to 0 to disable autopeering. | int | 5 |

### <a id="p2p_db"></a> Db

| Name | Description | Type | Default value |
Expand All @@ -129,6 +136,9 @@ Example:
},
"externalMultiAddresses": [],
"identityPrivateKey": "",
"autopeering": {
"maxPeers": 5
},
"db": {
"path": "testnet/p2pstore"
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/pokt-network/smt v0.9.2 // indirect
github.com/polydawn/refmt v0.89.0 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/client_model v0.6.0 // indirect
github.com/prometheus/common v0.47.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/quic-go/qpack v0.4.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -547,8 +547,8 @@ github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw=
github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI=
github.com/prometheus/client_model v0.6.0 h1:k1v3CzpSRUTrKMppY35TLwPvxHqBu0bYgxZzqGIgaos=
github.com/prometheus/client_model v0.6.0/go.mod h1:NTQHnmxFpouOD0DpvP4XujX3CdOAGQPoaGhyTchlyt8=
github.com/prometheus/common v0.0.0-20180801064454-c7de2306084e/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo=
Expand Down
4 changes: 0 additions & 4 deletions pkg/daemon/shutdown.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,8 @@ package daemon

const (
PriorityCloseDatabase = iota // no dependencies
PriorityPeerDatabase
PriorityP2P
PriorityManualPeering
PriorityProtocol
PriorityBlockIssuer
PriorityActivity // depends on BlockIssuer
PriorityRestAPI
PriorityINX
PriorityDashboardMetrics
Expand Down
Loading
Loading