From e1a407668555fc2e9134d29714d52b209e5fee22 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Fri, 24 Apr 2020 19:11:23 -0700 Subject: [PATCH 1/2] fix: re-validate peers whenever their state changes 1. When a peer changes their listening addresses, we need to re-run our routing table filters, possibly removing them. 2. When a peer _starts_ supporting the DHT protocol, we need to add them to our routing table. Previously, we'd only do the inverse. --- subscriber_notifee.go | 35 +++++++---------------------------- 1 file changed, 7 insertions(+), 28 deletions(-) diff --git a/subscriber_notifee.go b/subscriber_notifee.go index f1a6a3efd..ed643864a 100644 --- a/subscriber_notifee.go +++ b/subscriber_notifee.go @@ -92,9 +92,9 @@ func (nn *subscriberNotifee) subscribe(proc goprocess.Process) { default: } case event.EvtPeerProtocolsUpdated: - handlePeerProtocolsUpdatedEvent(dht, evt) + handlePeerChangeEvent(dht, evt.Peer) case event.EvtPeerIdentificationCompleted: - handlePeerIdentificationCompletedEvent(dht, evt) + handlePeerChangeEvent(dht, evt.Peer) case event.EvtLocalReachabilityChanged: if dht.auto == ModeAuto || dht.auto == ModeAutoServer { handleLocalReachabilityChangedEvent(dht, evt) @@ -112,40 +112,19 @@ func (nn *subscriberNotifee) subscribe(proc goprocess.Process) { } } -func handlePeerIdentificationCompletedEvent(dht *IpfsDHT, e event.EvtPeerIdentificationCompleted) { - dht.plk.Lock() - defer dht.plk.Unlock() - if dht.host.Network().Connectedness(e.Peer) != network.Connected { - return - } - - // if the peer supports the DHT protocol, add it to our RT and kick a refresh if needed - valid, err := dht.validRTPeer(e.Peer) +func handlePeerChangeEvent(dht *IpfsDHT, p peer.ID) { + valid, err := dht.validRTPeer(p) if err != nil { logger.Errorf("could not check peerstore for protocol support: err: %s", err) return } else if valid { - dht.peerFound(dht.ctx, e.Peer, false) + dht.peerFound(dht.ctx, p, false) dht.fixRTIfNeeded() + } else { + dht.peerStoppedDHT(dht.ctx, p) } } -func handlePeerProtocolsUpdatedEvent(dht *IpfsDHT, e event.EvtPeerProtocolsUpdated) { - valid, err := dht.validRTPeer(e.Peer) - if err != nil { - logger.Errorf("could not check peerstore for protocol support: err: %s", err) - return - } - - if !valid { - dht.peerStoppedDHT(dht.ctx, e.Peer) - return - } - - // we just might have discovered a peer that supports the DHT protocol - dht.fixRTIfNeeded() -} - func handleLocalReachabilityChangedEvent(dht *IpfsDHT, e event.EvtLocalReachabilityChanged) { var target mode From 7f60a9d42afe9f20c49880d48b11b634d6a12967 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Fri, 24 Apr 2020 19:25:41 -0700 Subject: [PATCH 2/2] fix: don't add addresses for connected peers They've told us better ones via identify. --- dht.go | 9 +++++++++ query.go | 2 +- routing.go | 4 +--- 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/dht.go b/dht.go index 1525c64c7..7d5fd61bb 100644 --- a/dht.go +++ b/dht.go @@ -32,6 +32,7 @@ import ( "github.com/jbenet/goprocess" goprocessctx "github.com/jbenet/goprocess/context" "github.com/multiformats/go-base32" + ma "github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multihash" ) @@ -702,3 +703,11 @@ func (dht *IpfsDHT) newContextWithLocalTags(ctx context.Context, extraTags ...ta ) // ignoring error as it is unrelated to the actual function of this code. return ctx } + +func (dht *IpfsDHT) maybeAddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) { + // Don't add addresses for self or our connected peers. We have better ones. + if p == dht.self || dht.host.Network().Connectedness(p) == network.Connected { + return + } + dht.peerstore.AddAddrs(p, addrs, ttl) +} diff --git a/query.go b/query.go index fcf8d8456..43eb808ec 100644 --- a/query.go +++ b/query.go @@ -416,7 +416,7 @@ func (q *query) queryPeer(ctx context.Context, ch chan<- *queryUpdate, p peer.ID // add their addresses to the dialer's peerstore if q.dht.queryPeerFilter(q.dht, *next) { - q.dht.peerstore.AddAddrs(next.ID, next.Addrs, pstore.TempAddrTTL) + q.dht.maybeAddAddrs(next.ID, next.Addrs, pstore.TempAddrTTL) saw = append(saw, next.ID) } } diff --git a/routing.go b/routing.go index 4d0f077c5..b57e0ae84 100644 --- a/routing.go +++ b/routing.go @@ -575,9 +575,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash // Add unique providers from request, up to 'count' for _, prov := range provs { - if prov.ID != dht.self { - dht.peerstore.AddAddrs(prov.ID, prov.Addrs, peerstore.TempAddrTTL) - } + dht.maybeAddAddrs(prov.ID, prov.Addrs, peerstore.TempAddrTTL) logger.Debugf("got provider: %s", prov) if ps.TryAdd(prov.ID) { logger.Debugf("using provider: %s", prov)