Skip to content

Commit

Permalink
Merge pull request #422 from libp2p/feat/advertise-multihash
Browse files Browse the repository at this point in the history
Provider records use multihashes instead of CIDs
  • Loading branch information
Stebalien authored Jan 2, 2020
2 parents 6c4cd42 + 5d2e3df commit 95964c0
Show file tree
Hide file tree
Showing 10 changed files with 168 additions and 129 deletions.
8 changes: 4 additions & 4 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/libp2p/go-libp2p-kad-dht/providers"

"github.com/gogo/protobuf/proto"
"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log"
"github.com/jbenet/goprocess"
Expand All @@ -33,6 +32,7 @@ import (
record "github.com/libp2p/go-libp2p-record"
recpb "github.com/libp2p/go-libp2p-record/pb"
"github.com/multiformats/go-base32"
"github.com/multiformats/go-multihash"
)

var logger = logging.Logger("dht")
Expand Down Expand Up @@ -374,11 +374,11 @@ func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (
}
}

func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key cid.Cid) (*pb.Message, error) {
eip := logger.EventBegin(ctx, "findProvidersSingle", p, key)
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key multihash.Multihash) (*pb.Message, error) {
eip := logger.EventBegin(ctx, "findProvidersSingle", p, multihashLoggableKey(key))
defer eip.Done()

pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, key.Bytes(), 0)
pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, key, 0)
resp, err := dht.sendRequest(ctx, p, pmes)
switch err {
case nil:
Expand Down
33 changes: 27 additions & 6 deletions dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package dht
import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"math/rand"
Expand All @@ -15,6 +16,7 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/routing"
"github.com/multiformats/go-multihash"
"github.com/multiformats/go-multistream"

"golang.org/x/xerrors"
Expand Down Expand Up @@ -42,8 +44,26 @@ func init() {
for i := 0; i < 100; i++ {
v := fmt.Sprintf("%d -- value", i)

mhv := u.Hash([]byte(v))
testCaseCids = append(testCaseCids, cid.NewCidV0(mhv))
var newCid cid.Cid
switch i % 3 {
case 0:
mhv := u.Hash([]byte(v))
newCid = cid.NewCidV0(mhv)
case 1:
mhv := u.Hash([]byte(v))
newCid = cid.NewCidV1(cid.DagCBOR, mhv)
case 2:
rawMh := make([]byte, 12)
binary.PutUvarint(rawMh, cid.Raw)
binary.PutUvarint(rawMh[1:], 10)
copy(rawMh[2:], []byte(v)[:10])
_, mhv, err := multihash.MHFromBytes(rawMh)
if err != nil {
panic(err)
}
newCid = cid.NewCidV1(cid.Raw, mhv)
}
testCaseCids = append(testCaseCids, newCid)
}
}

Expand Down Expand Up @@ -633,7 +653,7 @@ func TestLocalProvides(t *testing.T) {

for _, c := range testCaseCids {
for i := 0; i < 3; i++ {
provs := dhts[i].providers.GetProviders(ctx, c)
provs := dhts[i].providers.GetProviders(ctx, c.Hash())
if len(provs) > 0 {
t.Fatal("shouldnt know this")
}
Expand Down Expand Up @@ -1374,7 +1394,7 @@ func TestClientModeConnect(t *testing.T) {

c := testCaseCids[0]
p := peer.ID("TestPeer")
a.providers.AddProvider(ctx, c, p)
a.providers.AddProvider(ctx, c.Hash(), p)
time.Sleep(time.Millisecond * 5) // just in case...

provs, err := b.FindProviders(ctx, c)
Expand Down Expand Up @@ -1548,6 +1568,7 @@ func TestFindClosestPeers(t *testing.T) {

func TestProvideDisabled(t *testing.T) {
k := testCaseCids[0]
kHash := k.Hash()
for i := 0; i < 3; i++ {
enabledA := (i & 0x1) > 0
enabledB := (i & 0x2) > 0
Expand Down Expand Up @@ -1588,7 +1609,7 @@ func TestProvideDisabled(t *testing.T) {
if err != routing.ErrNotSupported {
t.Fatal("get should have failed on node B")
}
provs := dhtB.providers.GetProviders(ctx, k)
provs := dhtB.providers.GetProviders(ctx, kHash)
if len(provs) != 0 {
t.Fatal("node B should not have found local providers")
}
Expand All @@ -1604,7 +1625,7 @@ func TestProvideDisabled(t *testing.T) {
t.Fatal("node A should not have found providers")
}
}
provAddrs := dhtA.providers.GetProviders(ctx, k)
provAddrs := dhtA.providers.GetProviders(ctx, kHash)
if len(provAddrs) != 0 {
t.Fatal("node A should not have found local providers")
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
github.com/multiformats/go-base32 v0.0.3
github.com/multiformats/go-multiaddr v0.2.0
github.com/multiformats/go-multiaddr-dns v0.2.0
github.com/multiformats/go-multihash v0.0.10
github.com/multiformats/go-multistream v0.1.0
github.com/stretchr/testify v1.4.0
go.opencensus.io v0.22.2
Expand Down
6 changes: 0 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,6 @@ github.com/libp2p/go-flow-metrics v0.0.2 h1:U5TvqfoyR6GVRM+bC15Ux1ltar1kbj6Zw6xO
github.com/libp2p/go-flow-metrics v0.0.2/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS27omG0uWU5slZs=
github.com/libp2p/go-flow-metrics v0.0.3 h1:8tAs/hSdNvUiLgtlSy3mxwxWP4I9y/jlkPFT7epKdeM=
github.com/libp2p/go-flow-metrics v0.0.3/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS27omG0uWU5slZs=
github.com/libp2p/go-libp2p v0.4.2 h1:p0cthB0jDNHO4gH2HzS8/nAMMXbfUlFHs0jwZ4U+F2g=
github.com/libp2p/go-libp2p v0.4.2/go.mod h1:MNmgUxUw5pMsdOzMlT0EE7oKjRasl+WyVwM0IBlpKgQ=
github.com/libp2p/go-libp2p v0.5.0 h1:/nnb5mc2TK6TwknECsWIkfCwMTHv0AXbvzxlnVivfeg=
github.com/libp2p/go-libp2p v0.5.0/go.mod h1:Os7a5Z3B+ErF4v7zgIJ7nBHNu2LYt8ZMLkTQUB3G/wA=
github.com/libp2p/go-libp2p-autonat v0.1.1 h1:WLBZcIRsjZlWdAZj9CiBSvU2wQXoUOiS1Zk1tM7DTJI=
Expand All @@ -189,8 +187,6 @@ github.com/libp2p/go-libp2p-core v0.3.0 h1:F7PqduvrztDtFsAa/bcheQ3azmNo+Nq7m8hQY
github.com/libp2p/go-libp2p-core v0.3.0/go.mod h1:ACp3DmS3/N64c2jDzcV429ukDpicbL6+TrrxANBjPGw=
github.com/libp2p/go-libp2p-crypto v0.1.0 h1:k9MFy+o2zGDNGsaoZl0MA3iZ75qXxr9OOoAZF+sD5OQ=
github.com/libp2p/go-libp2p-crypto v0.1.0/go.mod h1:sPUokVISZiy+nNuTTH/TY+leRSxnFj/2GLjtOTW90hI=
github.com/libp2p/go-libp2p-discovery v0.1.0 h1:j+R6cokKcGbnZLf4kcNwpx6mDEUPF3N6SrqMymQhmvs=
github.com/libp2p/go-libp2p-discovery v0.1.0/go.mod h1:4F/x+aldVHjHDHuX85x1zWoFTGElt8HnoDzwkFZm29g=
github.com/libp2p/go-libp2p-discovery v0.2.0 h1:1p3YSOq7VsgaL+xVHPi8XAmtGyas6D2J6rWBEfz/aiY=
github.com/libp2p/go-libp2p-discovery v0.2.0/go.mod h1:s4VGaxYMbw4+4+tsoQTqh7wfxg97AEdo4GYBt6BadWg=
github.com/libp2p/go-libp2p-kbucket v0.2.3 h1:XtNfN4WUy0cfeJoJgWCf1lor4Pp3kBkFJ9vQ+Zs+VUM=
Expand Down Expand Up @@ -268,8 +264,6 @@ github.com/libp2p/go-tcp-transport v0.1.0 h1:IGhowvEqyMFknOar4FWCKSWE0zL36UFKQti
github.com/libp2p/go-tcp-transport v0.1.0/go.mod h1:oJ8I5VXryj493DEJ7OsBieu8fcg2nHGctwtInJVpipc=
github.com/libp2p/go-tcp-transport v0.1.1 h1:yGlqURmqgNA2fvzjSgZNlHcsd/IulAnKM8Ncu+vlqnw=
github.com/libp2p/go-tcp-transport v0.1.1/go.mod h1:3HzGvLbx6etZjnFlERyakbaYPdfjg2pWP97dFZworkY=
github.com/libp2p/go-ws-transport v0.1.2 h1:VnxQcLfSGtqupqPpBNu8fUiCv+IN1RJ2BcVqQEM+z8E=
github.com/libp2p/go-ws-transport v0.1.2/go.mod h1:dsh2Ld8F+XNmzpkaAijmg5Is+e9l6/1tK/6VFOdN69Y=
github.com/libp2p/go-ws-transport v0.2.0 h1:MJCw2OrPA9+76YNRvdo1wMnSOxb9Bivj6sVFY1Xrj6w=
github.com/libp2p/go-ws-transport v0.2.0/go.mod h1:9BHJz/4Q5A9ludYWKoGCFC5gUElzlHoKzu0yY9p/klM=
github.com/libp2p/go-yamux v1.2.2 h1:s6J6o7+ajoQMjHe7BEnq+EynOj5D2EoG8CuQgL3F2vg=
Expand Down
29 changes: 14 additions & 15 deletions handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
pstore "github.com/libp2p/go-libp2p-peerstore"

"github.com/gogo/protobuf/proto"
"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
u "github.com/ipfs/go-ipfs-util"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
Expand Down Expand Up @@ -317,26 +316,26 @@ func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb.
logger.SetTag(ctx, "peer", p)

resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
c, err := cid.Cast([]byte(pmes.GetKey()))
if err != nil {
return nil, err
key := pmes.GetKey()
if len(key) > 80 {
return nil, fmt.Errorf("handleGetProviders key size too large")
}
logger.SetTag(ctx, "key", c)
logger.SetTag(ctx, "key", key)

// debug logging niceness.
reqDesc := fmt.Sprintf("%s handleGetProviders(%s, %s): ", dht.self, p, c)
reqDesc := fmt.Sprintf("%s handleGetProviders(%s, %s): ", dht.self, p, key)
logger.Debugf("%s begin", reqDesc)
defer logger.Debugf("%s end", reqDesc)

// check if we have this value, to add ourselves as provider.
has, err := dht.datastore.Has(convertToDsKey(c.Bytes()))
has, err := dht.datastore.Has(convertToDsKey(key))
if err != nil && err != ds.ErrNotFound {
logger.Debugf("unexpected datastore error: %v\n", err)
has = false
}

// setup providers
providers := dht.providers.GetProviders(ctx, c)
providers := dht.providers.GetProviders(ctx, key)
if has {
providers = append(providers, dht.self)
logger.Debugf("%s have the value. added self as provider", reqDesc)
Expand Down Expand Up @@ -366,13 +365,13 @@ func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.M
defer func() { logger.FinishWithErr(ctx, _err) }()
logger.SetTag(ctx, "peer", p)

c, err := cid.Cast([]byte(pmes.GetKey()))
if err != nil {
return nil, err
key := pmes.GetKey()
if len(key) > 80 {
return nil, fmt.Errorf("handleAddProviders key size too large")
}
logger.SetTag(ctx, "key", c)
logger.SetTag(ctx, "key", key)

logger.Debugf("%s adding %s as a provider for '%s'\n", dht.self, p, c)
logger.Debugf("%s adding %s as a provider for '%s'\n", dht.self, p, key)

// add provider should use the address given in the message
pinfos := pb.PBPeersToPeerInfos(pmes.GetProviderPeers())
Expand All @@ -389,12 +388,12 @@ func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.M
continue
}

logger.Debugf("received provider %s for %s (addrs: %s)", p, c, pi.Addrs)
logger.Debugf("received provider %s for %s (addrs: %s)", p, key, pi.Addrs)
if pi.ID != dht.self { // don't add own addrs.
// add the received addresses to our peerstore.
dht.peerstore.AddAddrs(pi.ID, pi.Addrs, peerstore.ProviderAddrTTL)
}
dht.providers.AddProvider(ctx, c, p)
dht.providers.AddProvider(ctx, key, p)
}

return nil, nil
Expand Down
18 changes: 15 additions & 3 deletions lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
kb "github.com/libp2p/go-libp2p-kbucket"
notif "github.com/libp2p/go-libp2p-routing/notifications"
"github.com/multiformats/go-base32"
"github.com/multiformats/go-multihash"
)

func tryFormatLoggableKey(k string) (string, error) {
Expand All @@ -33,11 +35,15 @@ func tryFormatLoggableKey(k string) (string, error) {
cstr = k
}

var encStr string
c, err := cid.Cast([]byte(cstr))
if err != nil {
return "", fmt.Errorf("loggableKey could not cast key to a CID: %x %v", k, err)
if err == nil {
encStr = c.String()
} else {
encStr = base32.RawStdEncoding.EncodeToString([]byte(cstr))
}
return fmt.Sprintf("/%s/%s", proto, c.String()), nil

return fmt.Sprintf("/%s/%s", proto, encStr), nil
}

func loggableKey(k string) logging.LoggableMap {
Expand All @@ -53,6 +59,12 @@ func loggableKey(k string) logging.LoggableMap {
}
}

func multihashLoggableKey(mh multihash.Multihash) logging.LoggableMap {
return logging.LoggableMap{
"multihash": base32.RawStdEncoding.EncodeToString(mh),
}
}

// Kademlia 'node lookup' operation. Returns a channel of the K closest peers
// to the given key
func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan peer.ID, error) {
Expand Down
8 changes: 7 additions & 1 deletion lookup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,15 @@ func TestLoggableKey(t *testing.T) {
t.Error("expected cid to be formatted as a loggable key")
}

for _, s := range []string{"bla bla", "/bla", "/bla/asdf", ""} {
for _, s := range []string{"/bla", ""} {
if _, err := tryFormatLoggableKey(s); err == nil {
t.Errorf("expected to fail formatting: %s", s)
}
}

for _, s := range []string{"bla bla", "/bla/asdf"} {
if _, err := tryFormatLoggableKey(s); err != nil {
t.Errorf("expected to be formatable: %s", s)
}
}
}
Loading

0 comments on commit 95964c0

Please sign in to comment.