Skip to content

Commit

Permalink
Add Metadata Store to Extended Peerstore
Browse files Browse the repository at this point in the history
  • Loading branch information
Axel Kingsley authored and Axel Kingsley committed Dec 14, 2023
1 parent 6e371b4 commit f5f57c1
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 12 deletions.
13 changes: 13 additions & 0 deletions op-node/p2p/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/btcsuite/btcd/blockchain"
"github.com/btcsuite/btcd/chaincfg/chainhash"

"github.com/ethereum-optimism/optimism/op-node/p2p/store"
"github.com/ethereum-optimism/optimism/op-node/rollup"
)

Expand Down Expand Up @@ -355,10 +356,22 @@ func (n *NodeP2P) DiscoveryProcess(ctx context.Context, log log.Logger, cfg *rol
if err != nil {
continue
}

// record metadata to the peerstore if it is an extended peerstore
if eps, ok := pstore.(store.ExtendedPeerstore); ok {
_, err := eps.SetPeerMetadata(info.ID, store.PeerMetadata{
ENR: found.String(),
OPStackID: dat.chainID,
})
if err != nil {
log.Warn("failed to set peer metadata", "peer", info.ID, "err", err)
}
}
// We add the addresses to the peerstore, and update the address TTL.
//After that we stop using the address, assuming it may not be valid anymore (until we rediscover the node)
pstore.AddAddrs(info.ID, info.Addrs, discoveredAddrTTL)
_ = pstore.AddPubKey(info.ID, pub)

// Tag the peer, we'd rather have the connection manager prune away old peers,
// or peers on different chains, or anyone we have not seen via discovery.
// There is no tag score decay yet, so just set it to 42.
Expand Down
18 changes: 18 additions & 0 deletions op-node/p2p/host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/ethereum/go-ethereum/rpc"

"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog"
Expand Down Expand Up @@ -333,6 +334,23 @@ func TestDiscovery(t *testing.T) {
peersOfB = append(peersOfB, c.RemotePeer())
}
}

// For each node, check that they have recorded metadata about the other nodes during discovery
for _, n1 := range []*NodeP2P{nodeA, nodeB, nodeC} {
eps, ok := n1.Host().Peerstore().(store.ExtendedPeerstore)
require.True(t, ok)
for _, n2 := range []*NodeP2P{nodeA, nodeB, nodeC} {
if n1 == n2 {
continue
}
md, err := eps.GetPeerMetadata(n2.Host().ID())
require.NoError(t, err)
// we don't scrutinize the ENR itself, just that it exists
require.NotEmpty(t, md.ENR)
require.Equal(t, uint64(901), md.OPStackID)
}
}

}

// Most tests should use mocknets instead of using the actual local host network
Expand Down
16 changes: 4 additions & 12 deletions op-node/p2p/rpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ func dumpPeer(id peer.ID, nw network.Network, pstore peerstore.Peerstore, connMg
if dat, err := eps.GetPeerScores(id); err == nil {
info.PeerScores = dat
}
if md, err := eps.GetPeerMetadata(id); err == nil {
info.ENR = md.ENR
info.ChainID = md.OPStackID
}
}
if dat, err := pstore.Get(id, "ProtocolVersion"); err == nil {
protocolVersion, ok := dat.(string)
Expand All @@ -128,12 +132,6 @@ func dumpPeer(id peer.ID, nw network.Network, pstore peerstore.Peerstore, connMg
info.UserAgent = agentVersion
}
}
if dat, err := pstore.Get(id, "ENR"); err == nil {
enodeData, ok := dat.(*enode.Node)
if ok {
info.ENR = enodeData.String()
}
}
// include the /p2p/ address component in all of the addresses for convenience of the API user.
p2pAddrs, err := peer.AddrInfoToP2pAddrs(&peer.AddrInfo{ID: id, Addrs: pstore.Addrs(id)})
if err == nil {
Expand All @@ -152,12 +150,6 @@ func dumpPeer(id peer.ID, nw network.Network, pstore peerstore.Peerstore, connMg
info.Direction = c.Stat().Direction
break
}
if dat, err := pstore.Get(id, "optimismChainID"); err == nil {
chID, ok := dat.(uint64)
if ok {
info.ChainID = chID
}
}
info.Latency = pstore.LatencyEWMA(id)
if connMgr != nil {
info.Protected = connMgr.IsProtected(id, "")
Expand Down
10 changes: 10 additions & 0 deletions op-node/p2p/store/extended.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type extendedStore struct {
*scoreBook
*peerBanBook
*ipBanBook
*metadataBook
}

func NewExtendedPeerstore(ctx context.Context, logger log.Logger, clock clock.Clock, ps peerstore.Peerstore, store ds.Batching, scoreRetention time.Duration) (ExtendedPeerstore, error) {
Expand All @@ -40,17 +41,26 @@ func NewExtendedPeerstore(ctx context.Context, logger log.Logger, clock clock.Cl
return nil, fmt.Errorf("create IP ban book: %w", err)
}
ib.startGC()
md, err := newMetadataBook(ctx, logger, clock, store)
if err != nil {
return nil, fmt.Errorf("create metadata book: %w", err)
}
md.startGC()
return &extendedStore{
Peerstore: ps,
CertifiedAddrBook: cab,
scoreBook: sb,
peerBanBook: pb,
ipBanBook: ib,
metadataBook: md,
}, nil
}

func (s *extendedStore) Close() error {
s.scoreBook.Close()
s.peerBanBook.Close()
s.ipBanBook.Close()
s.metadataBook.Close()
return s.Peerstore.Close()
}

Expand Down
8 changes: 8 additions & 0 deletions op-node/p2p/store/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,19 @@ type IPBanStore interface {
GetIPBanExpiration(ip net.IP) (time.Time, error)
}

type MetadataStore interface {
// SetPeerMetadata sets the metadata for the specified peer
SetPeerMetadata(id peer.ID, md PeerMetadata) (PeerMetadata, error)
// GetPeerMetadata returns the metadata for the specified peer
GetPeerMetadata(id peer.ID) (PeerMetadata, error)
}

// ExtendedPeerstore defines a type-safe API to work with additional peer metadata based on a libp2p peerstore.Peerstore
type ExtendedPeerstore interface {
peerstore.Peerstore
ScoreDatastore
peerstore.CertifiedAddrBook
PeerBanStore
IPBanStore
MetadataStore
}
98 changes: 98 additions & 0 deletions op-node/p2p/store/mdbook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package store

import (
"context"
"encoding/json"
"sync/atomic"
"time"

"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum/log"
ds "github.com/ipfs/go-datastore"
"github.com/libp2p/go-libp2p/core/peer"
)

const (
mdCacheSize = 100
mdRecordExpiration = time.Hour * 24 * 7
)

var metadataBase = ds.NewKey("/peers/md")

// LastUpdate requires atomic update operations. Use the helper functions SetLastUpdated and LastUpdated to modify and access this field.
type metadataRecord struct {
LastUpdate int64 `json:"lastUpdate"` // unix timestamp in seconds
PeerMetadata PeerMetadata `json:"peerMetadata"`
}

type PeerMetadata struct {
ENR string `json:"enr"`
OPStackID uint64 `json:"opStackID"`
}

func (m *metadataRecord) SetLastUpdated(t time.Time) {
atomic.StoreInt64(&m.LastUpdate, t.Unix())
}

func (m *metadataRecord) LastUpdated() time.Time {
return time.Unix(atomic.LoadInt64(&m.LastUpdate), 0)
}

func (m *metadataRecord) MarshalBinary() (data []byte, err error) {
return json.Marshal(m)
}

func (m *metadataRecord) UnmarshalBinary(data []byte) error {
return json.Unmarshal(data, m)
}

type metadataBook struct {
book *recordsBook[peer.ID, *metadataRecord]
}

func newMetadataRecord() *metadataRecord {
return new(metadataRecord)
}

func newMetadataBook(ctx context.Context, logger log.Logger, clock clock.Clock, store ds.Batching) (*metadataBook, error) {
book, err := newRecordsBook[peer.ID, *metadataRecord](ctx, logger, clock, store, mdCacheSize, mdRecordExpiration, metadataBase, newMetadataRecord, peerIDKey)
if err != nil {
return nil, err
}
return &metadataBook{book: book}, nil
}

func (m *metadataBook) startGC() {
m.book.startGC()
}

func (m *metadataBook) GetPeerMetadata(id peer.ID) (PeerMetadata, error) {
record, err := m.book.getRecord(id)
// If the record is not found, return an empty PeerMetadata
if err == UnknownRecordErr {
return PeerMetadata{}, nil
}
if err != nil {
return PeerMetadata{}, err
}
return record.PeerMetadata, nil
}

// Apply simply overwrites the record with the new one.
// presently, metadata is only collected during peering, so this is fine.
// if in the future this data can be updated or expanded, this function will need to be updated.
func (md *metadataRecord) Apply(rec *metadataRecord) {
*rec = *md
}

func (m *metadataBook) SetPeerMetadata(id peer.ID, md PeerMetadata) (PeerMetadata, error) {
rec := newMetadataRecord()
rec.PeerMetadata = md
rec.SetLastUpdated(m.book.clock.Now())
v, err := m.book.SetRecord(id, rec)
return v.PeerMetadata, err
}

func (m *metadataBook) Close() {
m.book.Close()
}

0 comments on commit f5f57c1

Please sign in to comment.