Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provider records use multihashes instead of CIDs #422

Merged
merged 3 commits into from
Jan 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/libp2p/go-libp2p-kad-dht/providers"

"github.com/gogo/protobuf/proto"
"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log"
"github.com/jbenet/goprocess"
Expand All @@ -33,6 +32,7 @@ import (
record "github.com/libp2p/go-libp2p-record"
recpb "github.com/libp2p/go-libp2p-record/pb"
"github.com/multiformats/go-base32"
"github.com/multiformats/go-multihash"
)

var logger = logging.Logger("dht")
Expand Down Expand Up @@ -374,11 +374,11 @@ func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (
}
}

func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key cid.Cid) (*pb.Message, error) {
eip := logger.EventBegin(ctx, "findProvidersSingle", p, key)
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key multihash.Multihash) (*pb.Message, error) {
eip := logger.EventBegin(ctx, "findProvidersSingle", p, multihashLoggableKey(key))
defer eip.Done()

pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, key.Bytes(), 0)
pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, key, 0)
resp, err := dht.sendRequest(ctx, p, pmes)
switch err {
case nil:
Expand Down
33 changes: 27 additions & 6 deletions dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package dht
import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"math/rand"
Expand All @@ -15,6 +16,7 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/routing"
"github.com/multiformats/go-multihash"
"github.com/multiformats/go-multistream"

"golang.org/x/xerrors"
Expand Down Expand Up @@ -42,8 +44,26 @@ func init() {
for i := 0; i < 100; i++ {
v := fmt.Sprintf("%d -- value", i)

mhv := u.Hash([]byte(v))
testCaseCids = append(testCaseCids, cid.NewCidV0(mhv))
var newCid cid.Cid
switch i % 3 {
case 0:
mhv := u.Hash([]byte(v))
newCid = cid.NewCidV0(mhv)
case 1:
mhv := u.Hash([]byte(v))
newCid = cid.NewCidV1(cid.DagCBOR, mhv)
case 2:
rawMh := make([]byte, 12)
binary.PutUvarint(rawMh, cid.Raw)
binary.PutUvarint(rawMh[1:], 10)
copy(rawMh[2:], []byte(v)[:10])
_, mhv, err := multihash.MHFromBytes(rawMh)
if err != nil {
panic(err)
}
newCid = cid.NewCidV1(cid.Raw, mhv)
}
testCaseCids = append(testCaseCids, newCid)
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down Expand Up @@ -633,7 +653,7 @@ func TestLocalProvides(t *testing.T) {

for _, c := range testCaseCids {
for i := 0; i < 3; i++ {
provs := dhts[i].providers.GetProviders(ctx, c)
provs := dhts[i].providers.GetProviders(ctx, c.Hash())
if len(provs) > 0 {
t.Fatal("shouldnt know this")
}
Expand Down Expand Up @@ -1370,7 +1390,7 @@ func TestClientModeConnect(t *testing.T) {

c := testCaseCids[0]
p := peer.ID("TestPeer")
a.providers.AddProvider(ctx, c, p)
a.providers.AddProvider(ctx, c.Hash(), p)
time.Sleep(time.Millisecond * 5) // just in case...

provs, err := b.FindProviders(ctx, c)
Expand Down Expand Up @@ -1544,6 +1564,7 @@ func TestFindClosestPeers(t *testing.T) {

func TestProvideDisabled(t *testing.T) {
k := testCaseCids[0]
kHash := k.Hash()
for i := 0; i < 3; i++ {
enabledA := (i & 0x1) > 0
enabledB := (i & 0x2) > 0
Expand Down Expand Up @@ -1584,7 +1605,7 @@ func TestProvideDisabled(t *testing.T) {
if err != routing.ErrNotSupported {
t.Fatal("get should have failed on node B")
}
provs := dhtB.providers.GetProviders(ctx, k)
provs := dhtB.providers.GetProviders(ctx, kHash)
if len(provs) != 0 {
t.Fatal("node B should not have found local providers")
}
Expand All @@ -1600,7 +1621,7 @@ func TestProvideDisabled(t *testing.T) {
t.Fatal("node A should not have found providers")
}
}
provAddrs := dhtA.providers.GetProviders(ctx, k)
provAddrs := dhtA.providers.GetProviders(ctx, kHash)
if len(provAddrs) != 0 {
t.Fatal("node A should not have found local providers")
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
github.com/multiformats/go-base32 v0.0.3
github.com/multiformats/go-multiaddr v0.2.0
github.com/multiformats/go-multiaddr-dns v0.2.0
github.com/multiformats/go-multihash v0.0.10
github.com/multiformats/go-multistream v0.1.0
github.com/stretchr/testify v1.4.0
go.opencensus.io v0.22.2
Expand Down
6 changes: 0 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,6 @@ github.com/libp2p/go-flow-metrics v0.0.2 h1:U5TvqfoyR6GVRM+bC15Ux1ltar1kbj6Zw6xO
github.com/libp2p/go-flow-metrics v0.0.2/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS27omG0uWU5slZs=
github.com/libp2p/go-flow-metrics v0.0.3 h1:8tAs/hSdNvUiLgtlSy3mxwxWP4I9y/jlkPFT7epKdeM=
github.com/libp2p/go-flow-metrics v0.0.3/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS27omG0uWU5slZs=
github.com/libp2p/go-libp2p v0.4.2 h1:p0cthB0jDNHO4gH2HzS8/nAMMXbfUlFHs0jwZ4U+F2g=
github.com/libp2p/go-libp2p v0.4.2/go.mod h1:MNmgUxUw5pMsdOzMlT0EE7oKjRasl+WyVwM0IBlpKgQ=
github.com/libp2p/go-libp2p v0.5.0 h1:/nnb5mc2TK6TwknECsWIkfCwMTHv0AXbvzxlnVivfeg=
github.com/libp2p/go-libp2p v0.5.0/go.mod h1:Os7a5Z3B+ErF4v7zgIJ7nBHNu2LYt8ZMLkTQUB3G/wA=
github.com/libp2p/go-libp2p-autonat v0.1.1 h1:WLBZcIRsjZlWdAZj9CiBSvU2wQXoUOiS1Zk1tM7DTJI=
Expand All @@ -189,8 +187,6 @@ github.com/libp2p/go-libp2p-core v0.3.0 h1:F7PqduvrztDtFsAa/bcheQ3azmNo+Nq7m8hQY
github.com/libp2p/go-libp2p-core v0.3.0/go.mod h1:ACp3DmS3/N64c2jDzcV429ukDpicbL6+TrrxANBjPGw=
github.com/libp2p/go-libp2p-crypto v0.1.0 h1:k9MFy+o2zGDNGsaoZl0MA3iZ75qXxr9OOoAZF+sD5OQ=
github.com/libp2p/go-libp2p-crypto v0.1.0/go.mod h1:sPUokVISZiy+nNuTTH/TY+leRSxnFj/2GLjtOTW90hI=
github.com/libp2p/go-libp2p-discovery v0.1.0 h1:j+R6cokKcGbnZLf4kcNwpx6mDEUPF3N6SrqMymQhmvs=
github.com/libp2p/go-libp2p-discovery v0.1.0/go.mod h1:4F/x+aldVHjHDHuX85x1zWoFTGElt8HnoDzwkFZm29g=
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
github.com/libp2p/go-libp2p-discovery v0.2.0 h1:1p3YSOq7VsgaL+xVHPi8XAmtGyas6D2J6rWBEfz/aiY=
github.com/libp2p/go-libp2p-discovery v0.2.0/go.mod h1:s4VGaxYMbw4+4+tsoQTqh7wfxg97AEdo4GYBt6BadWg=
github.com/libp2p/go-libp2p-kbucket v0.2.3 h1:XtNfN4WUy0cfeJoJgWCf1lor4Pp3kBkFJ9vQ+Zs+VUM=
Expand Down Expand Up @@ -268,8 +264,6 @@ github.com/libp2p/go-tcp-transport v0.1.0 h1:IGhowvEqyMFknOar4FWCKSWE0zL36UFKQti
github.com/libp2p/go-tcp-transport v0.1.0/go.mod h1:oJ8I5VXryj493DEJ7OsBieu8fcg2nHGctwtInJVpipc=
github.com/libp2p/go-tcp-transport v0.1.1 h1:yGlqURmqgNA2fvzjSgZNlHcsd/IulAnKM8Ncu+vlqnw=
github.com/libp2p/go-tcp-transport v0.1.1/go.mod h1:3HzGvLbx6etZjnFlERyakbaYPdfjg2pWP97dFZworkY=
github.com/libp2p/go-ws-transport v0.1.2 h1:VnxQcLfSGtqupqPpBNu8fUiCv+IN1RJ2BcVqQEM+z8E=
github.com/libp2p/go-ws-transport v0.1.2/go.mod h1:dsh2Ld8F+XNmzpkaAijmg5Is+e9l6/1tK/6VFOdN69Y=
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
github.com/libp2p/go-ws-transport v0.2.0 h1:MJCw2OrPA9+76YNRvdo1wMnSOxb9Bivj6sVFY1Xrj6w=
github.com/libp2p/go-ws-transport v0.2.0/go.mod h1:9BHJz/4Q5A9ludYWKoGCFC5gUElzlHoKzu0yY9p/klM=
github.com/libp2p/go-yamux v1.2.2 h1:s6J6o7+ajoQMjHe7BEnq+EynOj5D2EoG8CuQgL3F2vg=
Expand Down
29 changes: 14 additions & 15 deletions handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
pstore "github.com/libp2p/go-libp2p-peerstore"

"github.com/gogo/protobuf/proto"
"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
u "github.com/ipfs/go-ipfs-util"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
Expand Down Expand Up @@ -317,26 +316,26 @@ func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb.
logger.SetTag(ctx, "peer", p)

resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
c, err := cid.Cast([]byte(pmes.GetKey()))
if err != nil {
return nil, err
key := pmes.GetKey()
if len(key) > 80 {
return nil, fmt.Errorf("handleGetProviders key size too large")
}
logger.SetTag(ctx, "key", c)
logger.SetTag(ctx, "key", key)

// debug logging niceness.
reqDesc := fmt.Sprintf("%s handleGetProviders(%s, %s): ", dht.self, p, c)
reqDesc := fmt.Sprintf("%s handleGetProviders(%s, %s): ", dht.self, p, key)
logger.Debugf("%s begin", reqDesc)
defer logger.Debugf("%s end", reqDesc)

// check if we have this value, to add ourselves as provider.
has, err := dht.datastore.Has(convertToDsKey(c.Bytes()))
has, err := dht.datastore.Has(convertToDsKey(key))
if err != nil && err != ds.ErrNotFound {
logger.Debugf("unexpected datastore error: %v\n", err)
has = false
}

// setup providers
providers := dht.providers.GetProviders(ctx, c)
providers := dht.providers.GetProviders(ctx, key)
if has {
providers = append(providers, dht.self)
logger.Debugf("%s have the value. added self as provider", reqDesc)
Expand Down Expand Up @@ -366,13 +365,13 @@ func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.M
defer func() { logger.FinishWithErr(ctx, _err) }()
logger.SetTag(ctx, "peer", p)

c, err := cid.Cast([]byte(pmes.GetKey()))
if err != nil {
return nil, err
key := pmes.GetKey()
if len(key) > 80 {
return nil, fmt.Errorf("handleAddProviders key size too large")
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
}
logger.SetTag(ctx, "key", c)
logger.SetTag(ctx, "key", key)

logger.Debugf("%s adding %s as a provider for '%s'\n", dht.self, p, c)
logger.Debugf("%s adding %s as a provider for '%s'\n", dht.self, p, key)

// add provider should use the address given in the message
pinfos := pb.PBPeersToPeerInfos(pmes.GetProviderPeers())
Expand All @@ -389,12 +388,12 @@ func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.M
continue
}

logger.Debugf("received provider %s for %s (addrs: %s)", p, c, pi.Addrs)
logger.Debugf("received provider %s for %s (addrs: %s)", p, key, pi.Addrs)
if pi.ID != dht.self { // don't add own addrs.
// add the received addresses to our peerstore.
dht.peerstore.AddAddrs(pi.ID, pi.Addrs, peerstore.ProviderAddrTTL)
}
dht.providers.AddProvider(ctx, c, p)
dht.providers.AddProvider(ctx, key, p)
}

return nil, nil
Expand Down
18 changes: 15 additions & 3 deletions lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
kb "github.com/libp2p/go-libp2p-kbucket"
notif "github.com/libp2p/go-libp2p-routing/notifications"
"github.com/multiformats/go-base32"
"github.com/multiformats/go-multihash"
)

func tryFormatLoggableKey(k string) (string, error) {
Expand All @@ -33,11 +35,15 @@ func tryFormatLoggableKey(k string) (string, error) {
cstr = k
}

var encStr string
c, err := cid.Cast([]byte(cstr))
if err != nil {
return "", fmt.Errorf("loggableKey could not cast key to a CID: %x %v", k, err)
if err == nil {
encStr = c.String()
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
} else {
encStr = base32.RawStdEncoding.EncodeToString([]byte(cstr))
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
}
return fmt.Sprintf("/%s/%s", proto, c.String()), nil

return fmt.Sprintf("/%s/%s", proto, encStr), nil
}

func loggableKey(k string) logging.LoggableMap {
Expand All @@ -53,6 +59,12 @@ func loggableKey(k string) logging.LoggableMap {
}
}

func multihashLoggableKey(mh multihash.Multihash) logging.LoggableMap {
return logging.LoggableMap{
"multihash": base32.RawStdEncoding.EncodeToString(mh),
}
Comment on lines +63 to +65
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is uppercase, but when we encode CIDs using b32 we use lowercase. Should I lowercase this?

}

// Kademlia 'node lookup' operation. Returns a channel of the K closest peers
// to the given key
func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan peer.ID, error) {
Expand Down
8 changes: 7 additions & 1 deletion lookup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,15 @@ func TestLoggableKey(t *testing.T) {
t.Error("expected cid to be formatted as a loggable key")
}

for _, s := range []string{"bla bla", "/bla", "/bla/asdf", ""} {
for _, s := range []string{"/bla", ""} {
if _, err := tryFormatLoggableKey(s); err == nil {
t.Errorf("expected to fail formatting: %s", s)
}
}

for _, s := range []string{"bla bla", "/bla/asdf"} {
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
if _, err := tryFormatLoggableKey(s); err != nil {
t.Errorf("expected to be formatable: %s", s)
}
}
}
Loading