Skip to content

Commit

Permalink
feat:add contexts to all peerstore methods (#2312)
Browse files Browse the repository at this point in the history
  • Loading branch information
chaitanyaprem authored Jun 3, 2023
1 parent e89814c commit 8864d1c
Show file tree
Hide file tree
Showing 61 changed files with 757 additions and 740 deletions.
7 changes: 4 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package config

import (
"context"
"crypto/rand"
"errors"
"fmt"
Expand Down Expand Up @@ -150,10 +151,10 @@ func (cfg *Config) makeSwarm(eventBus event.Bus, enableMetrics bool) (*swarm.Swa
return nil, err
}

if err := cfg.Peerstore.AddPrivKey(pid, cfg.PeerKey); err != nil {
if err := cfg.Peerstore.AddPrivKey(context.Background(), pid, cfg.PeerKey); err != nil {
return nil, err
}
if err := cfg.Peerstore.AddPubKey(pid, cfg.PeerKey.GetPublic()); err != nil {
if err := cfg.Peerstore.AddPubKey(context.Background(), pid, cfg.PeerKey.GetPublic()); err != nil {
return nil, err
}

Expand Down Expand Up @@ -194,7 +195,7 @@ func (cfg *Config) addTransports(h host.Host) error {
fx.Supply(cfg.Muxers),
fx.Supply(h.ID()),
fx.Provide(func() host.Host { return h }),
fx.Provide(func() crypto.PrivKey { return h.Peerstore().PrivKey(h.ID()) }),
fx.Provide(func() crypto.PrivKey { return h.Peerstore().PrivKey(context.Background(), h.ID()) }),
fx.Provide(func() connmgr.ConnectionGater { return cfg.ConnectionGater }),
fx.Provide(func() pnet.PSK { return cfg.PSK }),
fx.Provide(func() network.ResourceManager { return cfg.ResourceManager }),
Expand Down
6 changes: 4 additions & 2 deletions core/peerstore/helpers.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package peerstore

import (
"context"

"github.com/libp2p/go-libp2p/core/peer"
)

// AddrInfos returns an AddrInfo for each specified peer ID, in-order.
func AddrInfos(ps Peerstore, peers []peer.ID) []peer.AddrInfo {
func AddrInfos(ctx context.Context, ps Peerstore, peers []peer.ID) []peer.AddrInfo {
pi := make([]peer.AddrInfo, len(peers))
for i, p := range peers {
pi[i] = ps.PeerInfo(p)
pi[i] = ps.PeerInfo(ctx, p)
}
return pi
}
58 changes: 29 additions & 29 deletions core/peerstore/peerstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ type Peerstore interface {
// PeerInfo returns a peer.PeerInfo struct for given peer.ID.
// This is a small slice of the information Peerstore has on
// that peer, useful to other services.
PeerInfo(peer.ID) peer.AddrInfo
PeerInfo(context.Context, peer.ID) peer.AddrInfo

// Peers returns all of the peer IDs stored across all inner stores.
Peers() peer.IDSlice
Peers(context.Context) peer.IDSlice
}

// PeerMetadata can handle values of any type. Serializing values is
Expand All @@ -77,47 +77,47 @@ type PeerMetadata interface {
// Get / Put is a simple registry for other peer-related key/value pairs.
// If we find something we use often, it should become its own set of
// methods. This is a last resort.
Get(p peer.ID, key string) (interface{}, error)
Put(p peer.ID, key string, val interface{}) error
Get(ctx context.Context, p peer.ID, key string) (interface{}, error)
Put(ctx context.Context, p peer.ID, key string, val interface{}) error

// RemovePeer removes all values stored for a peer.
RemovePeer(peer.ID)
RemovePeer(context.Context, peer.ID)
}

// AddrBook holds the multiaddrs of peers.
type AddrBook interface {
// AddAddr calls AddAddrs(p, []ma.Multiaddr{addr}, ttl)
AddAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration)
AddAddr(context.Context, peer.ID, ma.Multiaddr, time.Duration)

// AddAddrs gives this AddrBook addresses to use, with a given ttl
// (time-to-live), after which the address is no longer valid.
// If the manager has a longer TTL, the operation is a no-op for that address
AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration)
AddAddrs(context.Context, peer.ID, []ma.Multiaddr, time.Duration)

// SetAddr calls mgr.SetAddrs(p, addr, ttl)
SetAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration)
SetAddr(context.Context, peer.ID, ma.Multiaddr, time.Duration)

// SetAddrs sets the ttl on addresses. This clears any TTL there previously.
// This is used when we receive the best estimate of the validity of an address.
SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration)
SetAddrs(context.Context, peer.ID, []ma.Multiaddr, time.Duration)

// UpdateAddrs updates the addresses associated with the given peer that have
// the given oldTTL to have the given newTTL.
UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.Duration)
UpdateAddrs(context.Context, peer.ID, time.Duration, time.Duration)

// Addrs returns all known (and valid) addresses for a given peer.
Addrs(p peer.ID) []ma.Multiaddr
Addrs(context.Context, peer.ID) []ma.Multiaddr

// AddrStream returns a channel that gets all addresses for a given
// peer sent on it. If new addresses are added after the call is made
// they will be sent along through the channel as well.
AddrStream(context.Context, peer.ID) <-chan ma.Multiaddr

// ClearAddresses removes all previously stored addresses.
ClearAddrs(p peer.ID)
ClearAddrs(context.Context, peer.ID)

// PeersWithAddrs returns all of the peer IDs stored in the AddrBook.
PeersWithAddrs() peer.IDSlice
PeersWithAddrs(context.Context) peer.IDSlice
}

// CertifiedAddrBook manages "self-certified" addresses for remote peers.
Expand Down Expand Up @@ -172,12 +172,12 @@ type CertifiedAddrBook interface {
// AddrBook.SetAddrs will be ignored. AddrBook.SetAddrs may still be used
// to update the TTL of certified addresses that have previously been
// added via ConsumePeerRecord.
ConsumePeerRecord(s *record.Envelope, ttl time.Duration) (accepted bool, err error)
ConsumePeerRecord(context.Context, *record.Envelope, time.Duration) (accepted bool, err error)

// GetPeerRecord returns a Envelope containing a PeerRecord for the
// given peer id, if one exists.
// Returns nil if no signed PeerRecord exists for the peer.
GetPeerRecord(p peer.ID) *record.Envelope
GetPeerRecord(context.Context, peer.ID) *record.Envelope
}

// GetCertifiedAddrBook is a helper to "upcast" an AddrBook to a
Expand All @@ -196,24 +196,24 @@ func GetCertifiedAddrBook(ab AddrBook) (cab CertifiedAddrBook, ok bool) {
// KeyBook tracks the keys of Peers.
type KeyBook interface {
// PubKey stores the public key of a peer.
PubKey(peer.ID) ic.PubKey
PubKey(context.Context, peer.ID) ic.PubKey

// AddPubKey stores the public key of a peer.
AddPubKey(peer.ID, ic.PubKey) error
AddPubKey(context.Context, peer.ID, ic.PubKey) error

// PrivKey returns the private key of a peer, if known. Generally this might only be our own
// private key, see
// https://discuss.libp2p.io/t/what-is-the-purpose-of-having-map-peer-id-privatekey-in-peerstore/74.
PrivKey(peer.ID) ic.PrivKey
PrivKey(context.Context, peer.ID) ic.PrivKey

// AddPrivKey stores the private key of a peer.
AddPrivKey(peer.ID, ic.PrivKey) error
AddPrivKey(context.Context, peer.ID, ic.PrivKey) error

// PeersWithKeys returns all the peer IDs stored in the KeyBook.
PeersWithKeys() peer.IDSlice
PeersWithKeys(context.Context) peer.IDSlice

// RemovePeer removes all keys associated with a peer.
RemovePeer(peer.ID)
RemovePeer(context.Context, peer.ID)
}

// Metrics tracks metrics across a set of peers.
Expand All @@ -226,25 +226,25 @@ type Metrics interface {
LatencyEWMA(peer.ID) time.Duration

// RemovePeer removes all metrics stored for a peer.
RemovePeer(peer.ID)
RemovePeer(context.Context, peer.ID)
}

// ProtoBook tracks the protocols supported by peers.
type ProtoBook interface {
GetProtocols(peer.ID) ([]protocol.ID, error)
AddProtocols(peer.ID, ...protocol.ID) error
SetProtocols(peer.ID, ...protocol.ID) error
RemoveProtocols(peer.ID, ...protocol.ID) error
GetProtocols(context.Context, peer.ID) ([]protocol.ID, error)
AddProtocols(context.Context, peer.ID, ...protocol.ID) error
SetProtocols(context.Context, peer.ID, ...protocol.ID) error
RemoveProtocols(context.Context, peer.ID, ...protocol.ID) error

// SupportsProtocols returns the set of protocols the peer supports from among the given protocols.
// If the returned error is not nil, the result is indeterminate.
SupportsProtocols(peer.ID, ...protocol.ID) ([]protocol.ID, error)
SupportsProtocols(context.Context, peer.ID, ...protocol.ID) ([]protocol.ID, error)

// FirstSupportedProtocol returns the first protocol that the peer supports among the given protocols.
// If the peer does not support any of the given protocols, this function will return an empty protocol.ID and a nil error.
// If the returned error is not nil, the result is indeterminate.
FirstSupportedProtocol(peer.ID, ...protocol.ID) (protocol.ID, error)
FirstSupportedProtocol(context.Context, peer.ID, ...protocol.ID) (protocol.ID, error)

// RemovePeer removes all protocols associated with a peer.
RemovePeer(peer.ID)
RemovePeer(context.Context, peer.ID)
}
2 changes: 1 addition & 1 deletion p2p/discovery/backoff/backoffconnector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func getNetHosts(t *testing.T, n int) []host.Host {
func loadCh(peers []host.Host) <-chan peer.AddrInfo {
ch := make(chan peer.AddrInfo, len(peers))
for _, p := range peers {
ch <- p.Peerstore().PeerInfo(p.ID())
ch <- p.Peerstore().PeerInfo(context.Background(), p.ID())
}
close(ch)
return ch
Expand Down
8 changes: 4 additions & 4 deletions p2p/host/autonat/autonat.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func (as *AmbientAutoNAT) background() {
as.confidence--
}
case event.EvtPeerIdentificationCompleted:
if s, err := as.host.Peerstore().SupportsProtocols(e.Peer, AutoNATProto); err == nil && len(s) > 0 {
if s, err := as.host.Peerstore().SupportsProtocols(context.Background(), e.Peer, AutoNATProto); err == nil && len(s) > 0 {
currentStatus := *as.status.Load()
if currentStatus == network.ReachabilityUnknown {
as.tryProbe(e.Peer)
Expand Down Expand Up @@ -367,7 +367,7 @@ func (as *AmbientAutoNAT) tryProbe(p peer.ID) bool {
}
as.cleanupRecentProbes()

info := as.host.Peerstore().PeerInfo(p)
info := as.host.Peerstore().PeerInfo(context.Background(), p)

if !as.config.dialPolicy.skipPeer(info.Addrs) {
as.recentProbes[p] = time.Now()
Expand Down Expand Up @@ -402,9 +402,9 @@ func (as *AmbientAutoNAT) getPeerToProbe() peer.ID {
candidates := make([]peer.ID, 0, len(peers))

for _, p := range peers {
info := as.host.Peerstore().PeerInfo(p)
info := as.host.Peerstore().PeerInfo(context.Background(), p)
// Exclude peers which don't support the autonat protocol.
if proto, err := as.host.Peerstore().SupportsProtocols(p, AutoNATProto); len(proto) == 0 || err != nil {
if proto, err := as.host.Peerstore().SupportsProtocols(context.Background(), p, AutoNATProto); len(proto) == 0 || err != nil {
continue
}

Expand Down
8 changes: 4 additions & 4 deletions p2p/host/autonat/autonat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,17 +94,17 @@ func makeAutoNATServicePublic(t *testing.T) host.Host {

func makeAutoNAT(t *testing.T, ash host.Host) (host.Host, AutoNAT) {
h := bhost.NewBlankHost(swarmt.GenSwarm(t))
h.Peerstore().AddAddrs(ash.ID(), ash.Addrs(), time.Minute)
h.Peerstore().AddProtocols(ash.ID(), AutoNATProto)
h.Peerstore().AddAddrs(context.Background(), ash.ID(), ash.Addrs(), time.Minute)
h.Peerstore().AddProtocols(context.Background(), ash.ID(), AutoNATProto)
a, _ := New(h, WithSchedule(100*time.Millisecond, time.Second), WithoutStartupDelay())
a.(*AmbientAutoNAT).config.dialPolicy.allowSelfDials = true
a.(*AmbientAutoNAT).config.throttlePeerPeriod = 100 * time.Millisecond
return h, a
}

func identifyAsServer(server, recip host.Host) {
recip.Peerstore().AddAddrs(server.ID(), server.Addrs(), time.Minute)
recip.Peerstore().AddProtocols(server.ID(), AutoNATProto)
recip.Peerstore().AddAddrs(context.Background(), server.ID(), server.Addrs(), time.Minute)
recip.Peerstore().AddProtocols(context.Background(), server.ID(), AutoNATProto)

}

Expand Down
8 changes: 4 additions & 4 deletions p2p/host/autonat/svc.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,13 +223,13 @@ func (as *autoNATService) doDial(pi peer.AddrInfo) *pb.Message_DialResponse {
ctx, cancel := context.WithTimeout(context.Background(), as.config.dialTimeout)
defer cancel()

as.config.dialer.Peerstore().ClearAddrs(pi.ID)
as.config.dialer.Peerstore().ClearAddrs(ctx, pi.ID)

as.config.dialer.Peerstore().AddAddrs(pi.ID, pi.Addrs, peerstore.TempAddrTTL)
as.config.dialer.Peerstore().AddAddrs(ctx, pi.ID, pi.Addrs, peerstore.TempAddrTTL)

defer func() {
as.config.dialer.Peerstore().ClearAddrs(pi.ID)
as.config.dialer.Peerstore().RemovePeer(pi.ID)
as.config.dialer.Peerstore().ClearAddrs(ctx, pi.ID)
as.config.dialer.Peerstore().RemovePeer(ctx, pi.ID)
}()

conn, err := as.config.dialer.DialPeer(ctx, pi.ID)
Expand Down
4 changes: 2 additions & 2 deletions p2p/host/autonat/test/autonat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ func TestAutonatRoundtrip(t *testing.T) {
t.Fatal(err)
}

client.Peerstore().AddAddrs(service.ID(), service.Addrs(), time.Hour)
require.NoError(t, client.Connect(context.Background(), service.Peerstore().PeerInfo(service.ID())))
client.Peerstore().AddAddrs(context.Background(), service.ID(), service.Addrs(), time.Hour)
require.NoError(t, client.Connect(context.Background(), service.Peerstore().PeerInfo(context.Background(), service.ID())))

cSub, err := client.EventBus().Subscribe(new(event.EvtLocalReachabilityChanged))
require.NoError(t, err)
Expand Down
4 changes: 2 additions & 2 deletions p2p/host/autorelay/relay_finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ func (rf *relayFinder) tryNode(ctx context.Context, pi peer.AddrInfo) (supportsR
return false, ctx.Err()
}

protos, err := rf.host.Peerstore().SupportsProtocols(pi.ID, protoIDv2)
protos, err := rf.host.Peerstore().SupportsProtocols(ctx, pi.ID, protoIDv2)
if err != nil {
return false, fmt.Errorf("error checking relay protocol support for peer %s: %w", pi.ID, err)
}
Expand Down Expand Up @@ -734,7 +734,7 @@ func (rf *relayFinder) relayAddrs(addrs []ma.Multiaddr) []ma.Multiaddr {
// add relay specific addrs to the list
relayAddrCnt := 0
for p := range rf.relays {
addrs := cleanupAddressSet(rf.host.Peerstore().Addrs(p))
addrs := cleanupAddressSet(rf.host.Peerstore().Addrs(context.Background(), p))
relayAddrCnt += len(addrs)
circuit := ma.StringCast(fmt.Sprintf("/p2p/%s/p2p-circuit", p.Pretty()))
for _, addr := range addrs {
Expand Down
16 changes: 8 additions & 8 deletions p2p/host/basic/basic_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) {
}
h.caBook = cab

h.signKey = h.Peerstore().PrivKey(h.ID())
h.signKey = h.Peerstore().PrivKey(context.Background(), h.ID())
if h.signKey == nil {
return nil, errors.New("unable to access host key")
}
Expand All @@ -221,7 +221,7 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) {
if err != nil {
return nil, fmt.Errorf("failed to create signed record for self: %w", err)
}
if _, err := cab.ConsumePeerRecord(ev, peerstore.PermanentAddrTTL); err != nil {
if _, err := cab.ConsumePeerRecord(context.Background(), ev, peerstore.PermanentAddrTTL); err != nil {
return nil, fmt.Errorf("failed to persist signed record to peerstore: %w", err)
}
}
Expand Down Expand Up @@ -515,7 +515,7 @@ func (h *BasicHost) background() {
changeEvt.SignedPeerRecord = sr

// persist the signed record to the peerstore
if _, err := h.caBook.ConsumePeerRecord(sr, peerstore.PermanentAddrTTL); err != nil {
if _, err := h.caBook.ConsumePeerRecord(context.Background(), sr, peerstore.PermanentAddrTTL); err != nil {
log.Errorf("failed to persist signed peer record in peer store, err=%s", err)
return
}
Expand Down Expand Up @@ -656,7 +656,7 @@ func (h *BasicHost) NewStream(ctx context.Context, p peer.ID, pids ...protocol.I
return nil, ctx.Err()
}

pref, err := h.preferredProtocol(p, pids)
pref, err := h.preferredProtocol(ctx, p, pids)
if err != nil {
_ = s.Reset()
return nil, err
Expand Down Expand Up @@ -692,12 +692,12 @@ func (h *BasicHost) NewStream(ctx context.Context, p peer.ID, pids ...protocol.I
}

s.SetProtocol(selected)
h.Peerstore().AddProtocols(p, selected)
h.Peerstore().AddProtocols(ctx, p, selected)
return s, nil
}

func (h *BasicHost) preferredProtocol(p peer.ID, pids []protocol.ID) (protocol.ID, error) {
supported, err := h.Peerstore().SupportsProtocols(p, pids...)
func (h *BasicHost) preferredProtocol(ctx context.Context, p peer.ID, pids []protocol.ID) (protocol.ID, error) {
supported, err := h.Peerstore().SupportsProtocols(ctx, p, pids...)
if err != nil {
return "", err
}
Expand All @@ -716,7 +716,7 @@ func (h *BasicHost) preferredProtocol(p peer.ID, pids []protocol.ID) (protocol.I
// It will also resolve any /dns4, /dns6, and /dnsaddr addresses.
func (h *BasicHost) Connect(ctx context.Context, pi peer.AddrInfo) error {
// absorb addresses into peerstore
h.Peerstore().AddAddrs(pi.ID, pi.Addrs, peerstore.TempAddrTTL)
h.Peerstore().AddAddrs(ctx, pi.ID, pi.Addrs, peerstore.TempAddrTTL)

forceDirect, _ := network.GetForceDirectDial(ctx)
if !forceDirect {
Expand Down
Loading

0 comments on commit 8864d1c

Please sign in to comment.