Skip to content

Commit

Permalink
provider record keys can be an arbitrary byte array less than 80 byte…
Browse files Browse the repository at this point in the history
…s instead of only a multihash
  • Loading branch information
aschmahmann committed Dec 12, 2019
1 parent e96d761 commit c315513
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 53 deletions.
10 changes: 5 additions & 5 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,18 +374,18 @@ func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (
}
}

func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, keyCid cid.Cid) (*pb.Message, error) {
eip := logger.EventBegin(ctx, "findProvidersSingle", p, keyCid)
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key cid.Cid) (*pb.Message, error) {
eip := logger.EventBegin(ctx, "findProvidersSingle", p, key)
defer eip.Done()

key := keyCid.Hash()
pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, key, 0)
keyMH := key.Hash()
pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, keyMH, 0)
resp, err := dht.sendRequest(ctx, p, pmes)
switch err {
case nil:
return resp, nil
case ErrReadTimeout:
logger.Warningf("read timeout: %s %s", p.Pretty(), key)
logger.Warningf("read timeout: %s %s", p.Pretty(), keyMH)
fallthrough
default:
eip.SetError(err)
Expand Down
30 changes: 14 additions & 16 deletions handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (
"github.com/libp2p/go-libp2p-core/peerstore"
pstore "github.com/libp2p/go-libp2p-peerstore"

mh "github.com/multiformats/go-multihash"

"github.com/gogo/protobuf/proto"
ds "github.com/ipfs/go-datastore"
u "github.com/ipfs/go-ipfs-util"
Expand Down Expand Up @@ -318,26 +316,26 @@ func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb.
logger.SetTag(ctx, "peer", p)

resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
_, h, err := mh.MHFromBytes(pmes.GetKey())
if err != nil {
return nil, err
key := pmes.GetKey()
if len(key) > 80 {
return nil, fmt.Errorf("handleGetProviders key size too large")
}
logger.SetTag(ctx, "key", h)
logger.SetTag(ctx, "key", key)

// debug logging niceness.
reqDesc := fmt.Sprintf("%s handleGetProviders(%s, %s): ", dht.self, p, h)
reqDesc := fmt.Sprintf("%s handleGetProviders(%s, %s): ", dht.self, p, key)
logger.Debugf("%s begin", reqDesc)
defer logger.Debugf("%s end", reqDesc)

// check if we have this value, to add ourselves as provider.
has, err := dht.datastore.Has(convertToDsKey(h))
has, err := dht.datastore.Has(convertToDsKey(key))
if err != nil && err != ds.ErrNotFound {
logger.Debugf("unexpected datastore error: %v\n", err)
has = false
}

// setup providers
providers := dht.providers.GetProviders(ctx, h)
providers := dht.providers.GetProviders(ctx, key)
if has {
providers = append(providers, dht.self)
logger.Debugf("%s have the value. added self as provider", reqDesc)
Expand Down Expand Up @@ -367,13 +365,13 @@ func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.M
defer func() { logger.FinishWithErr(ctx, _err) }()
logger.SetTag(ctx, "peer", p)

_, h, err := mh.MHFromBytes(pmes.GetKey())
if err != nil {
return nil, err
key := pmes.GetKey()
if len(key) > 80 {
return nil, fmt.Errorf("handleAddProviders key size too large")
}
logger.SetTag(ctx, "key", h)
logger.SetTag(ctx, "key", key)

logger.Debugf("%s adding %s as a provider for '%s'\n", dht.self, p, h)
logger.Debugf("%s adding %s as a provider for '%s'\n", dht.self, p, key)

// add provider should use the address given in the message
pinfos := pb.PBPeersToPeerInfos(pmes.GetProviderPeers())
Expand All @@ -390,12 +388,12 @@ func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.M
continue
}

logger.Debugf("received provider %s for %s (addrs: %s)", p, h, pi.Addrs)
logger.Debugf("received provider %s for %s (addrs: %s)", p, key, pi.Addrs)
if pi.ID != dht.self { // don't add own addrs.
// add the received addresses to our peerstore.
dht.peerstore.AddAddrs(pi.ID, pi.Addrs, peerstore.ProviderAddrTTL)
}
dht.providers.AddProvider(ctx, h, p)
dht.providers.AddProvider(ctx, key, p)
}

return nil, nil
Expand Down
24 changes: 11 additions & 13 deletions providers/providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (

"github.com/libp2p/go-libp2p-core/peer"

mh "github.com/multiformats/go-multihash"

lru "github.com/hashicorp/golang-lru/simplelru"
ds "github.com/ipfs/go-datastore"
autobatch "github.com/ipfs/go-datastore/autobatch"
Expand Down Expand Up @@ -48,12 +46,12 @@ type providerSet struct {
}

type addProv struct {
k mh.Multihash
k []byte
val peer.ID
}

type getProv struct {
k mh.Multihash
k []byte
resp chan []peer.ID
}

Expand All @@ -77,23 +75,23 @@ func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Batching)

const providersKeyPrefix = "/providers/"

func mkProvKey(k mh.Multihash) string {
func mkProvKey(k []byte) string {
return providersKeyPrefix + base32.RawStdEncoding.EncodeToString(k)
}

func (pm *ProviderManager) Process() goprocess.Process {
return pm.proc
}

func (pm *ProviderManager) providersForKey(k mh.Multihash) ([]peer.ID, error) {
func (pm *ProviderManager) providersForKey(k []byte) ([]peer.ID, error) {
pset, err := pm.getProvSet(k)
if err != nil {
return nil, err
}
return pset.providers, nil
}

func (pm *ProviderManager) getProvSet(k mh.Multihash) (*providerSet, error) {
func (pm *ProviderManager) getProvSet(k []byte) (*providerSet, error) {
cached, ok := pm.providers.Get(string(k))
if ok {
return cached.(*providerSet), nil
Expand All @@ -111,7 +109,7 @@ func (pm *ProviderManager) getProvSet(k mh.Multihash) (*providerSet, error) {
return pset, nil
}

func loadProvSet(dstore ds.Datastore, k mh.Multihash) (*providerSet, error) {
func loadProvSet(dstore ds.Datastore, k []byte) (*providerSet, error) {
res, err := dstore.Query(dsq.Query{Prefix: mkProvKey(k)})
if err != nil {
return nil, err
Expand Down Expand Up @@ -175,7 +173,7 @@ func readTimeValue(data []byte) (time.Time, error) {
return time.Unix(0, nsec), nil
}

func (pm *ProviderManager) addProv(k mh.Multihash, p peer.ID) error {
func (pm *ProviderManager) addProv(k []byte, p peer.ID) error {
now := time.Now()
if provs, ok := pm.providers.Get(string(k)); ok {
provs.(*providerSet).setVal(p, now)
Expand All @@ -184,11 +182,11 @@ func (pm *ProviderManager) addProv(k mh.Multihash, p peer.ID) error {
return writeProviderEntry(pm.dstore, k, p, now)
}

func mkProvKeyFor(k mh.Multihash, p peer.ID) string {
func mkProvKeyFor(k []byte, p peer.ID) string {
return mkProvKey(k) + "/" + base32.RawStdEncoding.EncodeToString([]byte(p))
}

func writeProviderEntry(dstore ds.Datastore, k mh.Multihash, p peer.ID, t time.Time) error {
func writeProviderEntry(dstore ds.Datastore, k []byte, p peer.ID, t time.Time) error {
dsk := mkProvKeyFor(k, p)

buf := make([]byte, 16)
Expand Down Expand Up @@ -301,7 +299,7 @@ func (pm *ProviderManager) run(proc goprocess.Process) {
}

// AddProvider adds a provider.
func (pm *ProviderManager) AddProvider(ctx context.Context, k mh.Multihash, val peer.ID) {
func (pm *ProviderManager) AddProvider(ctx context.Context, k []byte, val peer.ID) {
prov := &addProv{
k: k,
val: val,
Expand All @@ -314,7 +312,7 @@ func (pm *ProviderManager) AddProvider(ctx context.Context, k mh.Multihash, val

// GetProviders returns the set of providers for the given key.
// This method _does not_ copy the set. Do not modify it.
func (pm *ProviderManager) GetProviders(ctx context.Context, k mh.Multihash) []peer.ID {
func (pm *ProviderManager) GetProviders(ctx context.Context, k []byte) []peer.ID {
gp := &getProv{
k: k,
resp: make(chan []peer.ID, 1), // buffered to prevent sender from blocking
Expand Down
35 changes: 16 additions & 19 deletions routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import (
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/routing"

mh "github.com/multiformats/go-multihash"

"github.com/ipfs/go-cid"
u "github.com/ipfs/go-ipfs-util"
logging "github.com/ipfs/go-log"
Expand Down Expand Up @@ -414,12 +412,12 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, nvals int) (<-cha
// locations of the value, similarly to Coral and Mainline DHT.

// Provide makes this node announce that it can provide a value for the given key
func (dht *IpfsDHT) Provide(ctx context.Context, keyCid cid.Cid, brdcst bool) (err error) {
func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err error) {
if !dht.enableProviders {
return routing.ErrNotSupported
}
eip := logger.EventBegin(ctx, "Provide", keyCid, logging.LoggableMap{"broadcast": brdcst})
key := keyCid.Hash()
eip := logger.EventBegin(ctx, "Provide", key, logging.LoggableMap{"broadcast": brdcst})
keyMH := key.Hash()
defer func() {
if err != nil {
eip.SetError(err)
Expand All @@ -428,7 +426,7 @@ func (dht *IpfsDHT) Provide(ctx context.Context, keyCid cid.Cid, brdcst bool) (e
}()

// add self locally
dht.providers.AddProvider(ctx, key, dht.self)
dht.providers.AddProvider(ctx, keyMH, dht.self)
if !brdcst {
return nil
}
Expand All @@ -454,12 +452,12 @@ func (dht *IpfsDHT) Provide(ctx context.Context, keyCid cid.Cid, brdcst bool) (e
defer cancel()
}

peers, err := dht.GetClosestPeers(closerCtx, string(key))
peers, err := dht.GetClosestPeers(closerCtx, string(keyMH))
if err != nil {
return err
}

mes, err := dht.makeProvRecord(key)
mes, err := dht.makeProvRecord(keyMH)
if err != nil {
return err
}
Expand All @@ -469,7 +467,7 @@ func (dht *IpfsDHT) Provide(ctx context.Context, keyCid cid.Cid, brdcst bool) (e
wg.Add(1)
go func(p peer.ID) {
defer wg.Done()
logger.Debugf("putProvider(%s, %s)", key, p)
logger.Debugf("putProvider(%s, %s)", keyMH, p)
err := dht.sendMessage(ctx, p, mes)
if err != nil {
logger.Debug(err)
Expand All @@ -479,7 +477,7 @@ func (dht *IpfsDHT) Provide(ctx context.Context, keyCid cid.Cid, brdcst bool) (e
wg.Wait()
return nil
}
func (dht *IpfsDHT) makeProvRecord(key mh.Multihash) (*pb.Message, error) {
func (dht *IpfsDHT) makeProvRecord(key []byte) (*pb.Message, error) {
pi := peer.AddrInfo{
ID: dht.self,
Addrs: dht.host.Addrs(),
Expand Down Expand Up @@ -524,14 +522,13 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count i
return peerOut
}

func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, keyCid cid.Cid, count int, peerOut chan peer.AddrInfo) {
defer logger.EventBegin(ctx, "findProvidersAsync", keyCid).Done()
func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key cid.Cid, count int, peerOut chan peer.AddrInfo) {
defer logger.EventBegin(ctx, "findProvidersAsync", key).Done()
defer close(peerOut)

key := keyCid.Hash()

keyMH := key.Hash()
ps := peer.NewLimitedSet(count)
provs := dht.providers.GetProviders(ctx, key)
provs := dht.providers.GetProviders(ctx, keyMH)
for _, p := range provs {
// NOTE: Assuming that this list of peers is unique
if ps.TryAdd(p) {
Expand All @@ -550,7 +547,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, keyCid cid.Ci
}
}

peers := dht.routingTable.NearestPeers(kb.ConvertKey(string(key)), AlphaValue)
peers := dht.routingTable.NearestPeers(kb.ConvertKey(string(keyMH)), AlphaValue)
if len(peers) == 0 {
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
Type: routing.QueryError,
Expand All @@ -561,12 +558,12 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, keyCid cid.Ci

// setup the Query
parent := ctx
query := dht.newQuery(string(key), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
query := dht.newQuery(string(keyMH), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
routing.PublishQueryEvent(parent, &routing.QueryEvent{
Type: routing.SendingQuery,
ID: p,
})
pmes, err := dht.findProvidersSingle(ctx, p, keyCid)
pmes, err := dht.findProvidersSingle(ctx, p, key)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -629,7 +626,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, keyCid cid.Ci
}

// refresh the k-bucket containing this key after the query is run
dht.routingTable.BucketForID(kb.ConvertKey(string(key))).ResetRefreshedAt(time.Now())
dht.routingTable.BucketForID(kb.ConvertKey(string(keyMH))).ResetRefreshedAt(time.Now())

}

Expand Down

0 comments on commit c315513

Please sign in to comment.