Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Autopeering improvements #758

Merged
merged 19 commits into from
Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
73c12f7
Use a more explicit namespace for DHT discovery to reduce the chance …
alexsporn Feb 20, 2024
a0befb1
Use shrinking map in the p2p manager and clean up unused code
alexsporn Feb 20, 2024
a41223c
Stop AutoPeeringMgr on shutdown
alexsporn Feb 20, 2024
57c97a6
Fixed log format
alexsporn Feb 20, 2024
4676c52
Always create a distinct network name when creating the docker or fea…
alexsporn Feb 20, 2024
c5f1188
Add a boolean to the config to enable/disable autopeering
alexsporn Feb 20, 2024
9de27f3
Use p2p.bootstrapPeers in docker-network instead of p2p.peers since t…
alexsporn Feb 20, 2024
2f559dd
Let nodes wait for node-1 to be healthy before starting up in the doc…
alexsporn Feb 20, 2024
6d6cc9d
Better log formats
alexsporn Feb 20, 2024
12fb8d6
Reduce log spam for already connected peers when they are found using…
alexsporn Feb 20, 2024
af50e3a
Refactored manualpeering and autopeering to inside the P2PManager
alexsporn Feb 21, 2024
55bd202
Limit the amount of autopeering neighbors
alexsporn Feb 21, 2024
5c8af0b
Limit the peers to 3 per node in the docker network
alexsporn Feb 21, 2024
fc9b9f0
Stop advertising autopeering if we have enough peers, so that others …
alexsporn Feb 21, 2024
554b567
Add max peers to the log
alexsporn Feb 21, 2024
54d5b97
Merge branch 'develop' of github.com:iotaledger/iota-core into feat/a…
alexsporn Feb 21, 2024
1e68cf2
run gendoc
alexsporn Feb 21, 2024
47aa25a
Fix typo
muXxer Feb 21, 2024
ee4c2ae
Ignore errors during stream closing in the tests
muXxer Feb 21, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 6 additions & 14 deletions components/dashboard/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/iotaledger/hive.go/ierrors"
"github.com/iotaledger/iota-core/components/metricstracker"
"github.com/iotaledger/iota-core/pkg/daemon"
"github.com/iotaledger/iota-core/pkg/network/p2p"
"github.com/iotaledger/iota-core/pkg/network"
"github.com/iotaledger/iota-core/pkg/protocol"
)

Expand Down Expand Up @@ -49,7 +49,7 @@ type dependencies struct {
Host host.Host
Protocol *protocol.Protocol
AppInfo *app.Info
P2PManager *p2p.Manager
NetworkManager network.Manager
MetricsTracker *metricstracker.MetricsTracker
}

Expand Down Expand Up @@ -171,28 +171,20 @@ func currentNodeStatus() *nodestatus {

func neighborMetrics() []neighbormetric {
var stats []neighbormetric
if deps.P2PManager == nil {
if deps.NetworkManager == nil {
return stats
}

// gossip plugin might be disabled
neighbors := deps.P2PManager.AllNeighbors()
neighbors := deps.NetworkManager.AllNeighbors()
if neighbors == nil {
return stats
}

for _, neighbor := range neighbors {
// origin := "Inbound"
// for _, p := range deps.P2PManager.AllNeighbors() {
// if neighbor.Peer == peer {
// origin = "Outbound"
// break
// }
// }

stats = append(stats, neighbormetric{
ID: neighbor.Peer.ID.String(),
Addresses: fmt.Sprintf("%s", neighbor.Peer.PeerAddresses),
ID: neighbor.Peer().ID.String(),
Addresses: fmt.Sprintf("%s", neighbor.Peer().PeerAddresses),
PacketsRead: neighbor.PacketsRead(),
PacketsWritten: neighbor.PacketsWritten(),
})
Expand Down
128 changes: 38 additions & 90 deletions components/p2p/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ package p2p

import (
"context"
"fmt"
"path/filepath"
"time"

"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/crypto"
Expand All @@ -21,11 +19,8 @@ import (
"github.com/iotaledger/hive.go/ierrors"
"github.com/iotaledger/hive.go/kvstore"
hivedb "github.com/iotaledger/hive.go/kvstore/database"
"github.com/iotaledger/hive.go/runtime/event"
"github.com/iotaledger/iota-core/pkg/daemon"
"github.com/iotaledger/iota-core/pkg/network"
"github.com/iotaledger/iota-core/pkg/network/autopeering"
"github.com/iotaledger/iota-core/pkg/network/manualpeering"
"github.com/iotaledger/iota-core/pkg/network/p2p"
"github.com/iotaledger/iota-core/pkg/protocol"
)
Expand All @@ -51,9 +46,7 @@ type dependencies struct {
dig.In
PeeringConfig *configuration.Configuration `name:"peeringConfig"`
PeeringConfigManager *p2p.ConfigManager
ManualPeeringMgr *manualpeering.Manager
AutoPeeringMgr *autopeering.Manager
P2PManager *p2p.Manager
NetworkManager network.Manager
PeerDB *network.DB
Protocol *protocol.Protocol
PeerDBKVSTore kvstore.KVStore `name:"peerDBKVStore"`
Expand All @@ -79,49 +72,6 @@ func initConfigParams(c *dig.Container) error {
}

func provide(c *dig.Container) error {
type manualPeeringDeps struct {
dig.In

P2PManager *p2p.Manager
}

if err := c.Provide(func(deps manualPeeringDeps) *manualpeering.Manager {
return manualpeering.NewManager(deps.P2PManager, Component.WorkerPool, Component.Logger)
}); err != nil {
return err
}

type autoPeeringDeps struct {
dig.In

Protocol *protocol.Protocol
P2PManager *p2p.Manager
Host host.Host
PeerDB *network.DB
}

if err := c.Provide(func(deps autoPeeringDeps) *autopeering.Manager {
peersMultiAddresses, err := getMultiAddrsFromString(ParamsPeers.BootstrapPeers)
if err != nil {
Component.LogFatalf("Failed to parse bootstrapPeers param: %s", err)
}

for _, multiAddr := range peersMultiAddresses {
bootstrapPeer, err := network.NewPeerFromMultiAddr(multiAddr)
if err != nil {
Component.LogFatalf("Failed to parse bootstrap peer multiaddress: %s", err)
}

if err := deps.PeerDB.UpdatePeer(bootstrapPeer); err != nil {
Component.LogErrorf("Failed to update bootstrap peer: %s", err)
}
}

return autopeering.NewManager(deps.Protocol.LatestAPI().ProtocolParameters().NetworkName(), deps.P2PManager, deps.Host, deps.PeerDB, Component.Logger)
}); err != nil {
return err
}

type peerDatabaseResult struct {
dig.Out

Expand Down Expand Up @@ -251,7 +201,7 @@ func provide(c *dig.Container) error {
connManager, err := connmgr.NewConnManager(
ParamsP2P.ConnectionManager.LowWatermark,
ParamsP2P.ConnectionManager.HighWatermark,
connmgr.WithGracePeriod(time.Minute),
connmgr.WithEmergencyTrim(true),
)
if err != nil {
Component.LogPanicf("unable to initialize connection manager: %s", err)
Expand All @@ -263,6 +213,7 @@ func provide(c *dig.Container) error {
libp2p.Transport(tcp.NewTCPTransport),
libp2p.ConnectionManager(connManager),
libp2p.NATPortMap(),
libp2p.DisableRelay(),
// Define a custom address factory to inject external addresses to the DHT advertisements.
libp2p.AddrsFactory(func() func(addrs []multiaddr.Multiaddr) []multiaddr.Multiaddr {
var externalMultiAddrs []multiaddr.Multiaddr
Expand Down Expand Up @@ -294,8 +245,31 @@ func provide(c *dig.Container) error {
Component.LogPanic(err.Error())
}

return c.Provide(func(host host.Host, peerDB *network.DB) *p2p.Manager {
return p2p.NewManager(host, peerDB, Component.Logger)
type p2pManagerDeps struct {
dig.In
Host host.Host
PeerDB *network.DB
}

return c.Provide(func(inDeps p2pManagerDeps) network.Manager {

peersMultiAddresses, err := getMultiAddrsFromString(ParamsPeers.BootstrapPeers)
if err != nil {
Component.LogFatalf("Failed to parse bootstrapPeers param: %s", err)
}

for _, multiAddr := range peersMultiAddresses {
bootstrapPeer, err := network.NewPeerFromMultiAddr(multiAddr)
if err != nil {
Component.LogFatalf("Failed to parse bootstrap peer multiaddress: %s", err)
}

if err := inDeps.PeerDB.UpdatePeer(bootstrapPeer); err != nil {
Component.LogErrorf("Failed to update bootstrap peer: %s", err)
}
}

return p2p.NewManager(inDeps.Host, inDeps.PeerDB, ParamsP2P.Autopeering.MaxPeers, Component.Logger)
})
}

Expand All @@ -321,53 +295,27 @@ func configure() error {
}

// log the p2p events
deps.P2PManager.Events.NeighborAdded.Hook(func(neighbor *p2p.Neighbor) {
Component.LogInfof("Neighbor added: %s / %s", neighbor.PeerAddresses, neighbor.ID)
}, event.WithWorkerPool(Component.WorkerPool))
deps.NetworkManager.OnNeighborAdded(func(neighbor network.Neighbor) {
Component.LogInfof("neighbor added: %s / %s", neighbor.Peer().PeerAddresses, neighbor.Peer().ID)
})

deps.P2PManager.Events.NeighborRemoved.Hook(func(neighbor *p2p.Neighbor) {
Component.LogInfof("Neighbor removed: %s / %s", neighbor.PeerAddresses, neighbor.ID)
}, event.WithWorkerPool(Component.WorkerPool))
deps.NetworkManager.OnNeighborRemoved(func(neighbor network.Neighbor) {
Component.LogInfof("neighbor removed: %s / %s", neighbor.Peer().PeerAddresses, neighbor.Peer().ID)
})

return nil
}

func run() error {
if err := Component.Daemon().BackgroundWorker(Component.Name, func(ctx context.Context) {
deps.ManualPeeringMgr.Start()

defer func() {
if err := deps.ManualPeeringMgr.Stop(); err != nil {
Component.LogErrorf("Failed to stop the manager: %s", err)
}
}()
defer deps.NetworkManager.Shutdown()

if ParamsP2P.Autopeering.Enabled {
if err := deps.AutoPeeringMgr.Start(ctx); err != nil {
Component.LogFatalf("Failed to start autopeering manager: %s", err)
}

defer func() {
if err := deps.AutoPeeringMgr.Stop(); err != nil {
Component.LogErrorf("Failed to stop autopeering manager: %s", err)
}
}()
if err := deps.NetworkManager.Start(ctx, deps.Protocol.LatestAPI().ProtocolParameters().NetworkName()); err != nil {
Component.LogFatalf("Failed to start p2p manager: %s", err)
}

//nolint:contextcheck // false positive
connectConfigKnownPeers()
<-ctx.Done()
}, daemon.PriorityManualPeering); err != nil {
Component.LogFatalf("Failed to start as daemon: %s", err)
}

if err := Component.Daemon().BackgroundWorker(fmt.Sprintf("%s-P2PManager", Component.Name), func(ctx context.Context) {
defer deps.P2PManager.Shutdown()
defer func() {
if err := deps.P2PManager.P2PHost().Close(); err != nil {
Component.LogWarnf("Failed to close libp2p host: %+v", err)
}
}()

<-ctx.Done()
}, daemon.PriorityP2P); err != nil {
Expand Down Expand Up @@ -405,7 +353,7 @@ func connectConfigKnownPeers() {
Component.LogPanicf("invalid peer address info: %s", err)
}

if err := deps.ManualPeeringMgr.AddPeers(multiAddr); err != nil {
if err := deps.NetworkManager.AddManualPeers(multiAddr); err != nil {
Component.LogInfof("failed to add peer: %s, error: %s", multiAddr.String(), err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion components/p2p/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type ParametersP2P struct {
IdentityPrivateKey string `default:"" usage:"private key used to derive the node identity (optional)"`

Autopeering struct {
Enabled bool `default:"true" usage:"enable or disable autopeering"`
MaxPeers int `default:"5" usage:"the max number of autopeer connections. Set to 0 to disable autopeering."`
}

Database struct {
Expand Down
6 changes: 3 additions & 3 deletions components/protocol/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/iotaledger/hive.go/runtime/workerpool"
"github.com/iotaledger/iota-core/pkg/daemon"
"github.com/iotaledger/iota-core/pkg/model"
"github.com/iotaledger/iota-core/pkg/network/p2p"
"github.com/iotaledger/iota-core/pkg/network"
"github.com/iotaledger/iota-core/pkg/protocol"
"github.com/iotaledger/iota-core/pkg/protocol/engine/attestation/slotattestation"
"github.com/iotaledger/iota-core/pkg/protocol/engine/filter/presolidfilter/presolidblockfilter"
Expand Down Expand Up @@ -115,7 +115,7 @@ func provide(c *dig.Container) error {

DatabaseEngine hivedb.Engine `name:"databaseEngine"`
ProtocolParameters []iotago.ProtocolParameters
P2PManager *p2p.Manager
NetworkManager network.Manager
}

return c.Provide(func(deps protocolDeps) *protocol.Protocol {
Expand All @@ -132,7 +132,7 @@ func provide(c *dig.Container) error {
return protocol.New(
Component.Logger,
workerpool.NewGroup("Protocol"),
deps.P2PManager,
deps.NetworkManager,
protocol.WithBaseDirectory(ParamsDatabase.Path),
protocol.WithStorageOptions(
storage.WithDBEngine(deps.DatabaseEngine),
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/pokt-network/smt v0.9.2 // indirect
github.com/polydawn/refmt v0.89.0 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/client_model v0.6.0 // indirect
github.com/prometheus/common v0.47.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/quic-go/qpack v0.4.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -547,8 +547,8 @@ github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw=
github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI=
github.com/prometheus/client_model v0.6.0 h1:k1v3CzpSRUTrKMppY35TLwPvxHqBu0bYgxZzqGIgaos=
github.com/prometheus/client_model v0.6.0/go.mod h1:NTQHnmxFpouOD0DpvP4XujX3CdOAGQPoaGhyTchlyt8=
github.com/prometheus/common v0.0.0-20180801064454-c7de2306084e/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo=
Expand Down
4 changes: 0 additions & 4 deletions pkg/daemon/shutdown.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,8 @@ package daemon

const (
PriorityCloseDatabase = iota // no dependencies
PriorityPeerDatabase
PriorityP2P
PriorityManualPeering
PriorityProtocol
PriorityBlockIssuer
PriorityActivity // depends on BlockIssuer
PriorityRestAPI
PriorityINX
PriorityDashboardMetrics
Expand Down
16 changes: 16 additions & 0 deletions pkg/network/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package network

import "github.com/iotaledger/hive.go/ierrors"

var (
// ErrNotRunning is returned when a peer is added to a stopped or not yet started network manager.
ErrNotRunning = ierrors.New("manager not running")
// ErrUnknownPeer is returned when the specified peer is not known to the network manager.
ErrUnknownPeer = ierrors.New("unknown neighbor")
// ErrLoopbackPeer is returned when the own peer is added.
ErrLoopbackPeer = ierrors.New("loopback connection not allowed")
// ErrDuplicatePeer is returned when the same peer is added more than once.
ErrDuplicatePeer = ierrors.New("already connected")
// ErrMaxAutopeeringPeersReached is returned when the maximum number of autopeering peers is reached.
ErrMaxAutopeeringPeersReached = ierrors.New("max autopeering peers reached")
)
33 changes: 33 additions & 0 deletions pkg/network/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package network

import (
"context"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"

"github.com/iotaledger/hive.go/runtime/event"
)

type Manager interface {
Endpoint

DialPeer(context.Context, *Peer) error

OnNeighborAdded(func(Neighbor)) *event.Hook[func(Neighbor)]
OnNeighborRemoved(func(Neighbor)) *event.Hook[func(Neighbor)]

AllNeighbors() []Neighbor
AutopeeringNeighborsCount() int

DropNeighbor(peer.ID) error
NeighborExists(peer.ID) bool

P2PHost() host.Host

Start(ctx context.Context, networkID string) error
Shutdown()

AddManualPeers(...multiaddr.Multiaddr) error
}
7 changes: 7 additions & 0 deletions pkg/network/neighbor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package network

type Neighbor interface {
Peer() *Peer
PacketsRead() uint64
PacketsWritten() uint64
}
Loading