Skip to content

Commit

Permalink
p2psim: Add network simulation for DHT
Browse files Browse the repository at this point in the history
  • Loading branch information
sonhv0212 committed Dec 20, 2024
1 parent 5e305f1 commit 7ad48b5
Show file tree
Hide file tree
Showing 10 changed files with 1,016 additions and 33 deletions.
433 changes: 433 additions & 0 deletions cmd/p2psim/main.go

Large diffs are not rendered by default.

14 changes: 14 additions & 0 deletions p2p/discover/v4_udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,20 @@ func ListenV4(c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) {
return t, nil
}

func (t *UDPv4) NodesInDHT() [][]enode.Node {
if t == nil || t.tab == nil {
return nil
}
nodes := make([][]enode.Node, len(t.tab.buckets))
for i, bucket := range t.tab.buckets {
nodes[i] = make([]enode.Node, len(bucket.entries))
for j, entry := range bucket.entries {
nodes[i][j] = entry.Node
}
}
return nodes
}

// Self returns the local node.
func (t *UDPv4) Self() *enode.Node {
return t.localNode.Node()
Expand Down
9 changes: 9 additions & 0 deletions p2p/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,15 @@ func (c *conn) set(f connFlag, val bool) {
}
}

// SetListenFunc sets the function used to accept inbound connections (testing only)
func (srv *Server) SetListenFunc(f func(network, addr string) (net.Listener, error)) {
srv.listenFunc = f
}

func (srv *Server) UDPv4() *discover.UDPv4 {
return srv.ntab
}

// LocalNode returns the local node record.
func (srv *Server) LocalNode() *enode.LocalNode {
return srv.localnode
Expand Down
15 changes: 15 additions & 0 deletions p2p/simulations/adapters/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,21 @@ func (n *ExecNode) Snapshots() (map[string][]byte, error) {
return snapshots, n.client.Call(&snapshots, "simulation_snapshot")
}

// Empty PeerStats
func (n *ExecNode) PeerStats() *PeerStats {
return &PeerStats{}
}

// Empty DHT
func (n *ExecNode) NodesInDHT() [][]enode.Node {
return nil
}

// Empty PeersInfo
func (n *ExecNode) PeersInfo() []*p2p.PeerInfo {
return nil
}

// execNodeConfig is used to serialize the node configuration so it can be
// passed to the child process as a JSON encoded environment variable
type execNodeConfig struct {
Expand Down
210 changes: 202 additions & 8 deletions p2p/simulations/adapters/inproc.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,21 @@ import (
"context"
"errors"
"fmt"
"math"
"math/rand"
"net"
"strings"
"sync"
"time"

"github.com/ethereum/go-ethereum/core/forkid"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/ethereum/go-ethereum/p2p/simulations/pipes"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
"github.com/gorilla/websocket"
)
Expand Down Expand Up @@ -91,28 +96,80 @@ func (s *SimAdapter) NewNode(config *NodeConfig) (Node, error) {
return nil, err
}

p2pCfg := p2p.Config{
PrivateKey: config.PrivateKey,
MaxPeers: config.MaxPeers,
NoDiscovery: config.NoDiscovery,
EnableMsgEvents: config.EnableMsgEvents,
DHTBucketSize: config.DHTBucketSize,
}
if !config.DisableTCPListener {
p2pCfg.ListenAddr = fmt.Sprintf(":%d", config.Port)
} else {
p2pCfg.ListenAddr = ""
}
if len(config.BootstrapNodeURLs) > 0 {
for _, url := range strings.Split(config.BootstrapNodeURLs, ",") {
if len(url) == 0 {
continue
}
n, err := enode.Parse(enode.ValidSchemes, url)
if err != nil {
log.Warn("invalid bootstrap node URL", "url", url, "err", err)
continue
}
p2pCfg.BootstrapNodes = append(p2pCfg.BootstrapNodes, n)
}
}

n, err := node.New(&node.Config{
P2P: p2p.Config{
PrivateKey: config.PrivateKey,
MaxPeers: math.MaxInt32,
NoDiscovery: true,
Dialer: s,
EnableMsgEvents: config.EnableMsgEvents,
},
P2P: p2pCfg,
ExternalSigner: config.ExternalSigner,
Logger: log.New("node.id", id.String()),
})
if err != nil {
return nil, err
}

if config.UseFakeIPListener {
n.Server().SetListenFunc(listenFakeAddrFunc)
}

simNode := &SimNode{
ID: id,
config: config,
node: n,
adapter: s,
running: make(map[string]node.Lifecycle),
}
if !config.UseTCPDialer {
n.Server().Dialer = s
} else {
simNode.dialer = &wrapTCPDialerStats{
d: &net.Dialer{Timeout: 15 * time.Second},
resultCh: make(chan resultDial, 10000),
}
n.Server().Dialer = simNode.dialer
}

if config.EnableENRFilter {
n.Server().SetFilter(func(id forkid.ID) error {
var eth struct {
ForkID forkid.ID
Rest []rlp.RawValue `rlp:"tail"`
}
if err := n.Server().Self().Record().Load(enr.WithEntry("eth", &eth)); err != nil {
log.Warn("failed to load eth entry", "err", err)
return err
}

if id == eth.ForkID {
return nil
}
return forkid.ErrLocalIncompatibleOrStale
})
}

s.nodes[id] = simNode
return simNode, nil
}
Expand Down Expand Up @@ -162,6 +219,8 @@ func (s *SimAdapter) GetNode(id enode.ID) (*SimNode, bool) {
// net.Pipe (see SimAdapter.Dial), running devp2p protocols directly over that
// pipe
type SimNode struct {
ctx context.Context
cancel context.CancelFunc
lock sync.RWMutex
ID enode.ID
config *NodeConfig
Expand All @@ -170,6 +229,11 @@ type SimNode struct {
running map[string]node.Lifecycle
client *rpc.Client
registerOnce sync.Once
dialer *wrapTCPDialerStats

// Track different nodes discovered by the node
discoveredNodes sync.Map
differentNodeCount int
}

// Close closes the underlaying node.Node to release
Expand Down Expand Up @@ -240,6 +304,15 @@ func (sn *SimNode) Snapshots() (map[string][]byte, error) {

// Start registers the services and starts the underlying devp2p node
func (sn *SimNode) Start(snapshots map[string][]byte) error {
sn.lock.Lock()
if sn.cancel != nil {
sn.lock.Unlock()
return errors.New("node already started")
}

sn.ctx, sn.cancel = context.WithCancel(context.Background())
sn.lock.Unlock()

// ensure we only register the services once in the case of the node
// being stopped and then started again
var regErr error
Expand Down Expand Up @@ -282,6 +355,8 @@ func (sn *SimNode) Start(snapshots map[string][]byte) error {
sn.client = client
sn.lock.Unlock()

go sn.trackDiscoveredNode()

return nil
}

Expand All @@ -292,6 +367,10 @@ func (sn *SimNode) Stop() error {
sn.client.Close()
sn.client = nil
}
if sn.cancel != nil {
sn.cancel()
sn.cancel = nil
}
sn.lock.Unlock()
return sn.node.Close()
}
Expand Down Expand Up @@ -351,3 +430,118 @@ func (sn *SimNode) NodeInfo() *p2p.NodeInfo {
}
return server.NodeInfo()
}

// PeerStats returns statistics about the node's peers
func (sn *SimNode) PeerStats() *PeerStats {
if sn.dialer == nil || sn.node.Server() == nil || sn.node.Server().UDPv4() == nil {
return &PeerStats{}
}

nodesCount := 0
sn.discoveredNodes.Range(func(_, _ interface{}) bool {
nodesCount++
return true
})
buckets := sn.node.Server().UDPv4().NodesInDHT()
bucketSizes := make([]int, len(buckets))
for i, bucket := range buckets {
bucketSizes[i] = len(bucket)
}
return &PeerStats{
PeerCount: sn.node.Server().PeerCount(),
Failed: sn.dialer.failed,
Tried: sn.dialer.tried,
DifferentNodesDiscovered: nodesCount,
DHTBuckets: bucketSizes,
}
}

// NodesInDHT returns the nodes in the DHT buckets
func (sn *SimNode) NodesInDHT() [][]enode.Node {
if sn.node.Server() == nil || sn.node.Server().UDPv4() == nil {
return nil
}
return sn.node.Server().UDPv4().NodesInDHT()
}

// PeersInfo returns information about the node's peers
func (sn *SimNode) PeersInfo() []*p2p.PeerInfo {
if sn.node.Server() == nil {
return nil
}
return sn.node.Server().PeersInfo()
}

func (sn *SimNode) trackDiscoveredNode() {
if sn.dialer == nil {
return
}

for {
select {
case <-sn.ctx.Done():
return
case r := <-sn.dialer.resultCh:
if _, ok := sn.discoveredNodes.LoadOrStore(r.node, struct{}{}); !ok {
sn.differentNodeCount++
}
if r.err != nil {
log.Info("dial failed", "node", r.node, "err", r.err)
sn.dialer.failed++
}
log.Info("dial tried", "from", sn.ID, "to", r.node)
sn.dialer.tried++
}
}
}

func listenFakeAddrFunc(network, laddr string) (net.Listener, error) {
l, err := net.Listen(network, laddr)
if err != nil {
return nil, err
}
fakeAddr := &net.TCPAddr{IP: net.IP{byte(rand.Intn(255)), byte(rand.Intn(255)), byte(rand.Intn(255)), byte(rand.Intn(255))}, Port: rand.Intn(65535)}
return &fakeAddrListener{l, fakeAddr}, nil
}

// fakeAddrListener is a listener that creates connections with a mocked remote address.
type fakeAddrListener struct {
net.Listener
remoteAddr net.Addr
}

type fakeAddrConn struct {
net.Conn
remoteAddr net.Addr
}

func (l *fakeAddrListener) Accept() (net.Conn, error) {
c, err := l.Listener.Accept()
if err != nil {
return nil, err
}
return &fakeAddrConn{c, l.remoteAddr}, nil
}

func (c *fakeAddrConn) RemoteAddr() net.Addr {
return c.remoteAddr
}

type wrapTCPDialerStats struct {
d *net.Dialer
failed int
tried int
resultCh chan resultDial
}

type resultDial struct {
err error
node enode.ID
}

func (d wrapTCPDialerStats) Dial(ctx context.Context, dest *enode.Node) (net.Conn, error) {
nodeAddr := &net.TCPAddr{IP: dest.IP(), Port: dest.TCP()}
conn, err := d.d.DialContext(ctx, "tcp", nodeAddr.String())
d.resultCh <- resultDial{err, dest.ID()}
return conn, err
}
Loading

0 comments on commit 7ad48b5

Please sign in to comment.