Skip to content

Commit

Permalink
coreapi/dht: Review, cleanup
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Łukasz Magiera <[email protected]>
  • Loading branch information
magik6k committed Mar 31, 2018
1 parent 77f6604 commit 6665373
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 136 deletions.
145 changes: 11 additions & 134 deletions core/coreapi/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,7 @@ import (
dag "github.com/ipfs/go-ipfs/merkledag"

routing "gx/ipfs/QmTiWLZ6Fo5j4KcTVutZJ5KWRRJrbxzmxA4td8NfEdrPh7/go-libp2p-routing"
notif "gx/ipfs/QmTiWLZ6Fo5j4KcTVutZJ5KWRRJrbxzmxA4td8NfEdrPh7/go-libp2p-routing/notifications"
ma "gx/ipfs/QmWWQ2Txc2c6tqjsBpzg5Ar652cHPGNsQQp2SejkNmkUMb/go-multiaddr"
pstore "gx/ipfs/QmXauCuJzmzapetmC6W4TuDJLL1yFFrVzSHoWv8YdbmnxH/go-libp2p-peerstore"
ipdht "gx/ipfs/QmY1y2M1aCcVhy8UuTbZJBvuFbegZm47f9cDAdgxiehQfx/go-libp2p-kad-dht"
peer "gx/ipfs/QmZoWKhxUmZ2seW4BzX6fJkNR8hh9PsGModr7q171yq2SS/go-libp2p-peer"
cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
ipld "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
Expand All @@ -23,73 +20,16 @@ var ErrNotDHT = errors.New("routing service is not a DHT")

type DhtAPI CoreAPI

func (api *DhtAPI) FindPeer(ctx context.Context, p peer.ID) (<-chan ma.Multiaddr, error) {
dht, ok := api.node.Routing.(*ipdht.IpfsDHT)
if !ok {
return nil, ErrNotDHT
}

outChan := make(chan ma.Multiaddr)
events := make(chan *notif.QueryEvent)
ctx = notif.RegisterForQueryEvents(ctx, events)

go func() {
defer close(outChan)

sendAddrs := func(responses []*pstore.PeerInfo) error {
for _, response := range responses {
for _, addr := range response.Addrs {
select {
case outChan <- addr:
case <-ctx.Done():
return ctx.Err()
}
}
}
return nil
}

for event := range events {
if event.Type == notif.FinalPeer {
err := sendAddrs(event.Responses)
if err != nil {
return
}
}
}
}()

go func() {
defer close(events)
pi, err := dht.FindPeer(ctx, peer.ID(p))
if err != nil {
notif.PublishQueryEvent(ctx, &notif.QueryEvent{
Type: notif.QueryError,
Extra: err.Error(),
})
return
}

notif.PublishQueryEvent(ctx, &notif.QueryEvent{
Type: notif.FinalPeer,
Responses: []*pstore.PeerInfo{&pi},
})
}()

return outChan, nil
func (api *DhtAPI) FindPeer(ctx context.Context, p peer.ID) (pstore.PeerInfo, error) {
return api.node.Routing.FindPeer(ctx, p)
}

func (api *DhtAPI) FindProviders(ctx context.Context, p coreiface.Path, opts ...caopts.DhtFindProvidersOption) (<-chan peer.ID, error) {
func (api *DhtAPI) FindProviders(ctx context.Context, p coreiface.Path, opts ...caopts.DhtFindProvidersOption) (<-chan pstore.PeerInfo, error) {
settings, err := caopts.DhtFindProvidersOptions(opts...)
if err != nil {
return nil, err
}

dht, ok := api.node.Routing.(*ipdht.IpfsDHT)
if !ok {
return nil, ErrNotDHT
}

p, err = api.core().ResolvePath(ctx, p)
if err != nil {
return nil, err
Expand All @@ -99,50 +39,10 @@ func (api *DhtAPI) FindProviders(ctx context.Context, p coreiface.Path, opts ...

numProviders := settings.NumProviders
if numProviders < 1 {
return nil, fmt.Errorf("number of providers must be greater than 0")
return nil, fmt.Errorf("number of providers to find must be greater than 0")
}

outChan := make(chan peer.ID)
events := make(chan *notif.QueryEvent)
ctx = notif.RegisterForQueryEvents(ctx, events)

pchan := dht.FindProvidersAsync(ctx, c, numProviders)
go func() {
defer close(outChan)

sendProviders := func(responses []*pstore.PeerInfo) error {
for _, response := range responses {
select {
case outChan <- response.ID:
case <-ctx.Done():
return ctx.Err()
}
}
return nil
}

for event := range events {
if event.Type == notif.Provider {
err := sendProviders(event.Responses)
if err != nil {
return
}
}
}
}()

go func() {
defer close(events)
for p := range pchan {
np := p
notif.PublishQueryEvent(ctx, &notif.QueryEvent{
Type: notif.Provider,
Responses: []*pstore.PeerInfo{&np},
})
}
}()

return outChan, nil
return api.node.Routing.FindProvidersAsync(ctx, c, numProviders), nil
}

func (api *DhtAPI) Provide(ctx context.Context, path coreiface.Path, opts ...caopts.DhtProvideOption) error {
Expand All @@ -155,43 +55,19 @@ func (api *DhtAPI) Provide(ctx context.Context, path coreiface.Path, opts ...cao
return errors.New("cannot provide in offline mode")
}

if len(api.node.PeerHost.Network().Conns()) == 0 {
return errors.New("cannot provide, no connected peers")
}

c := path.Cid()

has, err := api.node.Blockstore.Has(c)
has, err := api.node.Blockstore.Has(path.Cid())
if err != nil {
return err
}

if !has {
return fmt.Errorf("block %s not found locally, cannot provide", c)
return fmt.Errorf("block %s not found locally, cannot provide", path.Cid())
}

//TODO: either remove or use
//outChan := make(chan interface{})

//events := make(chan *notif.QueryEvent)
//ctx = notif.RegisterForQueryEvents(ctx, events)

/*go func() {
defer close(outChan)
for range events {
select {
case <-ctx.Done():
return
default:
}
}
}()*/

//defer close(events)
if settings.Recursive {
err = provideKeysRec(ctx, api.node.Routing, api.node.DAG, []*cid.Cid{c})
err = provideKeysRec(ctx, api.node.Routing, api.node.DAG, []*cid.Cid{path.Cid()})
} else {
err = provideKeys(ctx, api.node.Routing, []*cid.Cid{c})
err = provideKeys(ctx, api.node.Routing, []*cid.Cid{path.Cid()})
}
if err != nil {
return err
Expand All @@ -211,10 +87,11 @@ func provideKeys(ctx context.Context, r routing.IpfsRouting, cids []*cid.Cid) er
}

func provideKeysRec(ctx context.Context, r routing.IpfsRouting, dserv ipld.DAGService, cids []*cid.Cid) error {
provided := cid.NewSet()
provided := cid.NewSet() //TODO: Use a bloom filter
for _, c := range cids {
kset := cid.NewSet()

//TODO: After https://github.com/ipfs/go-ipfs/pull/4333 is merged, use n.Provider for this
err := dag.EnumerateChildrenAsync(ctx, dag.GetLinksDirect(dserv), c, kset.Visit)
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions core/coreapi/interface/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ import (

options "github.com/ipfs/go-ipfs/core/coreapi/interface/options"

ma "gx/ipfs/QmWWQ2Txc2c6tqjsBpzg5Ar652cHPGNsQQp2SejkNmkUMb/go-multiaddr"
pstore "gx/ipfs/QmXauCuJzmzapetmC6W4TuDJLL1yFFrVzSHoWv8YdbmnxH/go-libp2p-peerstore"
peer "gx/ipfs/QmZoWKhxUmZ2seW4BzX6fJkNR8hh9PsGModr7q171yq2SS/go-libp2p-peer"
)

// DhtAPI specifies the interface to the DHT
type DhtAPI interface {
// FindPeer queries the DHT for all of the multiaddresses associated with a
// Peer ID
FindPeer(context.Context, peer.ID) (<-chan ma.Multiaddr, error)
FindPeer(context.Context, peer.ID) (pstore.PeerInfo, error)

// FindProviders finds peers in the DHT who can provide a specific value
// given a key.
Expand Down

0 comments on commit 6665373

Please sign in to comment.