Skip to content

Commit

Permalink
feat(dht): provider records use multihashes instead of CIDs
Browse files Browse the repository at this point in the history
  • Loading branch information
aschmahmann committed Dec 11, 2019
1 parent 85ccd07 commit db32365
Show file tree
Hide file tree
Showing 8 changed files with 149 additions and 110 deletions.
7 changes: 4 additions & 3 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,11 +363,12 @@ func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (
}
}

func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key cid.Cid) (*pb.Message, error) {
eip := logger.EventBegin(ctx, "findProvidersSingle", p, key)
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, keyCid cid.Cid) (*pb.Message, error) {
eip := logger.EventBegin(ctx, "findProvidersSingle", p, keyCid)
defer eip.Done()

pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, key.Bytes(), 0)
key := keyCid.Hash()
pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, key, 0)
resp, err := dht.sendRequest(ctx, p, pmes)
switch err {
case nil:
Expand Down
28 changes: 24 additions & 4 deletions dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package dht
import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"math/rand"
Expand All @@ -15,6 +16,7 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/routing"
"github.com/multiformats/go-multihash"
"github.com/multiformats/go-multistream"

"golang.org/x/xerrors"
Expand Down Expand Up @@ -42,8 +44,26 @@ func init() {
for i := 0; i < 100; i++ {
v := fmt.Sprintf("%d -- value", i)

mhv := u.Hash([]byte(v))
testCaseCids = append(testCaseCids, cid.NewCidV0(mhv))
var newCid cid.Cid
switch i % 3 {
case 0:
mhv := u.Hash([]byte(v))
newCid = cid.NewCidV0(mhv)
case 1:
mhv := u.Hash([]byte(v))
newCid = cid.NewCidV1(cid.DagCBOR, mhv)
case 2:
rawMh := make([]byte, 12)
binary.PutUvarint(rawMh, cid.Raw)
binary.PutUvarint(rawMh[1:], 10)
copy(rawMh[2:], []byte(v)[:10])
_, mhv, err := multihash.MHFromBytes(rawMh)
if err != nil {
panic(err)
}
newCid = cid.NewCidV1(cid.Raw, mhv)
}
testCaseCids = append(testCaseCids, newCid)
}
}

Expand Down Expand Up @@ -591,7 +611,7 @@ func TestLocalProvides(t *testing.T) {

for _, c := range testCaseCids {
for i := 0; i < 3; i++ {
provs := dhts[i].providers.GetProviders(ctx, c)
provs := dhts[i].providers.GetProviders(ctx, c.Hash())
if len(provs) > 0 {
t.Fatal("shouldnt know this")
}
Expand Down Expand Up @@ -1255,7 +1275,7 @@ func TestClientModeConnect(t *testing.T) {

c := testCaseCids[0]
p := peer.ID("TestPeer")
a.providers.AddProvider(ctx, c, p)
a.providers.AddProvider(ctx, c.Hash(), p)
time.Sleep(time.Millisecond * 5) // just in case...

provs, err := b.FindProviders(ctx, c)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
github.com/multiformats/go-base32 v0.0.3
github.com/multiformats/go-multiaddr v0.2.0
github.com/multiformats/go-multiaddr-dns v0.2.0
github.com/multiformats/go-multihash v0.0.9
github.com/multiformats/go-multistream v0.1.0
github.com/stretchr/testify v1.4.0
go.opencensus.io v0.22.2
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,6 @@ github.com/multiformats/go-multiaddr v0.0.4/go.mod h1:xKVEak1K9cS1VdmPZW3LSIb6lg
github.com/multiformats/go-multiaddr v0.1.0/go.mod h1:xKVEak1K9cS1VdmPZW3LSIb6lgmoS58qz/pzqmAxV44=
github.com/multiformats/go-multiaddr v0.1.1 h1:rVAztJYMhCQ7vEFr8FvxW3mS+HF2eY/oPbOMeS0ZDnE=
github.com/multiformats/go-multiaddr v0.1.1/go.mod h1:aMKBKNEYmzmDmxfX88/vz+J5IU55txyt0p4aiWVohjo=
github.com/multiformats/go-multiaddr v0.1.2 h1:HWYHNSyyllbQopmVIF5K7JKJugiah+L9/kuZKHbmNdQ=
github.com/multiformats/go-multiaddr v0.1.2/go.mod h1:0nO36NvPpyV4QzvTLi/lafl2y95ncPj0vFwVF6k6wJ4=
github.com/multiformats/go-multiaddr v0.2.0 h1:lR52sFwcTCuQb6bTfnXF6zA2XfyYvyd+5a9qECv/J90=
github.com/multiformats/go-multiaddr v0.2.0/go.mod h1:0nO36NvPpyV4QzvTLi/lafl2y95ncPj0vFwVF6k6wJ4=
github.com/multiformats/go-multiaddr-dns v0.0.1/go.mod h1:9kWcqw/Pj6FwxAwW38n/9403szc57zJPs45fmnznu3Q=
Expand Down
23 changes: 12 additions & 11 deletions handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ import (
"github.com/libp2p/go-libp2p-core/peerstore"
pstore "github.com/libp2p/go-libp2p-peerstore"

mh "github.com/multiformats/go-multihash"

"github.com/gogo/protobuf/proto"
"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
u "github.com/ipfs/go-ipfs-util"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
Expand Down Expand Up @@ -307,26 +308,26 @@ func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb.
logger.SetTag(ctx, "peer", p)

resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
c, err := cid.Cast([]byte(pmes.GetKey()))
_, h, err := mh.MHFromBytes(pmes.GetKey())
if err != nil {
return nil, err
}
logger.SetTag(ctx, "key", c)
logger.SetTag(ctx, "key", h)

// debug logging niceness.
reqDesc := fmt.Sprintf("%s handleGetProviders(%s, %s): ", dht.self, p, c)
reqDesc := fmt.Sprintf("%s handleGetProviders(%s, %s): ", dht.self, p, h)
logger.Debugf("%s begin", reqDesc)
defer logger.Debugf("%s end", reqDesc)

// check if we have this value, to add ourselves as provider.
has, err := dht.datastore.Has(convertToDsKey(c.Bytes()))
has, err := dht.datastore.Has(convertToDsKey(h))
if err != nil && err != ds.ErrNotFound {
logger.Debugf("unexpected datastore error: %v\n", err)
has = false
}

// setup providers
providers := dht.providers.GetProviders(ctx, c)
providers := dht.providers.GetProviders(ctx, h)
if has {
providers = append(providers, dht.self)
logger.Debugf("%s have the value. added self as provider", reqDesc)
Expand Down Expand Up @@ -356,13 +357,13 @@ func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.M
defer func() { logger.FinishWithErr(ctx, _err) }()
logger.SetTag(ctx, "peer", p)

c, err := cid.Cast([]byte(pmes.GetKey()))
_, h, err := mh.MHFromBytes(pmes.GetKey())
if err != nil {
return nil, err
}
logger.SetTag(ctx, "key", c)
logger.SetTag(ctx, "key", h)

logger.Debugf("%s adding %s as a provider for '%s'\n", dht.self, p, c)
logger.Debugf("%s adding %s as a provider for '%s'\n", dht.self, p, h)

// add provider should use the address given in the message
pinfos := pb.PBPeersToPeerInfos(pmes.GetProviderPeers())
Expand All @@ -379,12 +380,12 @@ func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.M
continue
}

logger.Debugf("received provider %s for %s (addrs: %s)", p, c, pi.Addrs)
logger.Debugf("received provider %s for %s (addrs: %s)", p, h, pi.Addrs)
if pi.ID != dht.self { // don't add own addrs.
// add the received addresses to our peerstore.
dht.peerstore.AddAddrs(pi.ID, pi.Addrs, peerstore.ProviderAddrTTL)
}
dht.providers.AddProvider(ctx, c, p)
dht.providers.AddProvider(ctx, h, p)
}

return nil, nil
Expand Down
33 changes: 17 additions & 16 deletions providers/providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ import (

"github.com/libp2p/go-libp2p-core/peer"

mh "github.com/multiformats/go-multihash"

lru "github.com/hashicorp/golang-lru/simplelru"
cid "github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
autobatch "github.com/ipfs/go-datastore/autobatch"
dsq "github.com/ipfs/go-datastore/query"
Expand Down Expand Up @@ -47,12 +48,12 @@ type providerSet struct {
}

type addProv struct {
k cid.Cid
k mh.Multihash
val peer.ID
}

type getProv struct {
k cid.Cid
k mh.Multihash
resp chan []peer.ID
}

Expand All @@ -76,24 +77,24 @@ func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Batching)

const providersKeyPrefix = "/providers/"

func mkProvKey(k cid.Cid) string {
return providersKeyPrefix + base32.RawStdEncoding.EncodeToString(k.Bytes())
func mkProvKey(k mh.Multihash) string {
return providersKeyPrefix + base32.RawStdEncoding.EncodeToString(k)
}

func (pm *ProviderManager) Process() goprocess.Process {
return pm.proc
}

func (pm *ProviderManager) providersForKey(k cid.Cid) ([]peer.ID, error) {
func (pm *ProviderManager) providersForKey(k mh.Multihash) ([]peer.ID, error) {
pset, err := pm.getProvSet(k)
if err != nil {
return nil, err
}
return pset.providers, nil
}

func (pm *ProviderManager) getProvSet(k cid.Cid) (*providerSet, error) {
cached, ok := pm.providers.Get(k)
func (pm *ProviderManager) getProvSet(k mh.Multihash) (*providerSet, error) {
cached, ok := pm.providers.Get(string(k))
if ok {
return cached.(*providerSet), nil
}
Expand All @@ -104,13 +105,13 @@ func (pm *ProviderManager) getProvSet(k cid.Cid) (*providerSet, error) {
}

if len(pset.providers) > 0 {
pm.providers.Add(k, pset)
pm.providers.Add(string(k), pset)
}

return pset, nil
}

func loadProvSet(dstore ds.Datastore, k cid.Cid) (*providerSet, error) {
func loadProvSet(dstore ds.Datastore, k mh.Multihash) (*providerSet, error) {
res, err := dstore.Query(dsq.Query{Prefix: mkProvKey(k)})
if err != nil {
return nil, err
Expand Down Expand Up @@ -174,20 +175,20 @@ func readTimeValue(data []byte) (time.Time, error) {
return time.Unix(0, nsec), nil
}

func (pm *ProviderManager) addProv(k cid.Cid, p peer.ID) error {
func (pm *ProviderManager) addProv(k mh.Multihash, p peer.ID) error {
now := time.Now()
if provs, ok := pm.providers.Get(k); ok {
if provs, ok := pm.providers.Get(string(k)); ok {
provs.(*providerSet).setVal(p, now)
} // else not cached, just write through

return writeProviderEntry(pm.dstore, k, p, now)
}

func mkProvKeyFor(k cid.Cid, p peer.ID) string {
func mkProvKeyFor(k mh.Multihash, p peer.ID) string {
return mkProvKey(k) + "/" + base32.RawStdEncoding.EncodeToString([]byte(p))
}

func writeProviderEntry(dstore ds.Datastore, k cid.Cid, p peer.ID, t time.Time) error {
func writeProviderEntry(dstore ds.Datastore, k mh.Multihash, p peer.ID, t time.Time) error {
dsk := mkProvKeyFor(k, p)

buf := make([]byte, 16)
Expand Down Expand Up @@ -300,7 +301,7 @@ func (pm *ProviderManager) run(proc goprocess.Process) {
}

// AddProvider adds a provider.
func (pm *ProviderManager) AddProvider(ctx context.Context, k cid.Cid, val peer.ID) {
func (pm *ProviderManager) AddProvider(ctx context.Context, k mh.Multihash, val peer.ID) {
prov := &addProv{
k: k,
val: val,
Expand All @@ -313,7 +314,7 @@ func (pm *ProviderManager) AddProvider(ctx context.Context, k cid.Cid, val peer.

// GetProviders returns the set of providers for the given key.
// This method _does not_ copy the set. Do not modify it.
func (pm *ProviderManager) GetProviders(ctx context.Context, k cid.Cid) []peer.ID {
func (pm *ProviderManager) GetProviders(ctx context.Context, k mh.Multihash) []peer.ID {
gp := &getProv{
k: k,
resp: make(chan []peer.ID, 1), // buffered to prevent sender from blocking
Expand Down
Loading

0 comments on commit db32365

Please sign in to comment.