Skip to content

Commit

Permalink
fix: correctly apply addrFilters in the dht
Browse files Browse the repository at this point in the history
This still does not do the fullrt client but it wasn't doing it before either.
  • Loading branch information
Jorropo committed Aug 24, 2023
1 parent 2cbe38a commit fe4a592
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 16 deletions.
10 changes: 7 additions & 3 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -925,8 +925,12 @@ func (dht *IpfsDHT) maybeAddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Dura
if p == dht.self || dht.host.Network().Connectedness(p) == network.Connected {
return
}
if dht.addrFilter != nil {
addrs = dht.addrFilter(addrs)
dht.peerstore.AddAddrs(p, dht.filterAddrs(addrs), ttl)
}

func (dht *IpfsDHT) filterAddrs(addrs []ma.Multiaddr) []ma.Multiaddr {
if f := dht.addrFilter; f != nil {
return f(addrs)
}
dht.peerstore.AddAddrs(p, addrs, ttl)
return addrs
}
5 changes: 4 additions & 1 deletion fullrt/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -844,7 +844,10 @@ func (dht *FullRT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err e
}

successes := dht.execOnMany(ctx, func(ctx context.Context, p peer.ID) error {
err := dht.protoMessenger.PutProvider(ctx, p, keyMH, dht.h)
err := dht.protoMessenger.PutProviderAddrs(ctx, p, keyMH, peer.AddrInfo{
ID: dht.self,
Addrs: dht.h.Addrs(),
})
return err
}, peers, true)

Expand Down
5 changes: 4 additions & 1 deletion handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,10 @@ func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.M
continue
}

dht.providerStore.AddProvider(ctx, key, peer.AddrInfo{ID: pi.ID, Addrs: pi.Addrs})
// We run the addrs filter after checking for the length,
// this allows transient nodes with varying /p2p-circuit addresses to still have their anouncement go through.
addrs := dht.filterAddrs(pi.Addrs)
dht.providerStore.AddProvider(ctx, key, peer.AddrInfo{ID: pi.ID, Addrs: addrs})
}

return nil, nil
Expand Down
5 changes: 4 additions & 1 deletion lookup_optim.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,10 @@ func (os *optimisticState) stopFn(qps *qpeerset.QueryPeerset) bool {
}

func (os *optimisticState) putProviderRecord(pid peer.ID) {
err := os.dht.protoMessenger.PutProvider(os.putCtx, pid, []byte(os.key), os.dht.host)
err := os.dht.protoMessenger.PutProviderAddrs(os.putCtx, pid, []byte(os.key), peer.AddrInfo{
ID: os.dht.self,
Addrs: os.dht.filterAddrs(os.dht.host.Addrs()),
})
os.peerStatesLk.Lock()
if err != nil {
os.peerStates[pid] = failure
Expand Down
21 changes: 12 additions & 9 deletions pb/protocol_messenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,16 @@ func (pm *ProtocolMessenger) GetClosestPeers(ctx context.Context, p peer.ID, id
return peers, nil
}

// PutProvider asks a peer to store that we are a provider for the given key.
func (pm *ProtocolMessenger) PutProvider(ctx context.Context, p peer.ID, key multihash.Multihash, host host.Host) (err error) {
// PutProvider is deprecated please use [ProtocolMessenger.PutProviderAddrs].
func (pm *ProtocolMessenger) PutProvider(ctx context.Context, p peer.ID, key multihash.Multihash, h host.Host) error {
return pm.PutProviderAddrs(ctx, p, key, peer.AddrInfo{
ID: h.ID(),
Addrs: h.Addrs(),
})
}

// PutProviderAddrs asks a peer to store that we are a provider for the given key.
func (pm *ProtocolMessenger) PutProviderAddrs(ctx context.Context, p peer.ID, key multihash.Multihash, self peer.AddrInfo) (err error) {
ctx, span := internal.StartSpan(ctx, "ProtocolMessenger.PutProvider")
defer span.End()
if span.IsRecording() {
Expand All @@ -182,19 +190,14 @@ func (pm *ProtocolMessenger) PutProvider(ctx context.Context, p peer.ID, key mul
}()
}

pi := peer.AddrInfo{
ID: host.ID(),
Addrs: host.Addrs(),
}

// TODO: We may want to limit the type of addresses in our provider records
// For example, in a WAN-only DHT prohibit sharing non-WAN addresses (e.g. 192.168.0.100)
if len(pi.Addrs) < 1 {
if len(self.Addrs) < 1 {
return fmt.Errorf("no known addresses for self, cannot put provider")
}

pmes := NewMessage(Message_ADD_PROVIDER, key, 0)
pmes.ProviderPeers = RawPeerInfosToPBPeers([]peer.AddrInfo{pi})
pmes.ProviderPeers = RawPeerInfosToPBPeers([]peer.AddrInfo{self})

return pm.m.SendMessage(ctx, p, pmes)
}
Expand Down
5 changes: 4 additions & 1 deletion routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,10 @@ func (dht *IpfsDHT) classicProvide(ctx context.Context, keyMH multihash.Multihas
go func(p peer.ID) {
defer wg.Done()
logger.Debugf("putProvider(%s, %s)", internal.LoggableProviderRecordBytes(keyMH), p)
err := dht.protoMessenger.PutProvider(ctx, p, keyMH, dht.host)
err := dht.protoMessenger.PutProviderAddrs(ctx, p, keyMH, peer.AddrInfo{
ID: dht.self,
Addrs: dht.filterAddrs(dht.host.Addrs()),
})
if err != nil {
logger.Debug(err)
}
Expand Down

0 comments on commit fe4a592

Please sign in to comment.