Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix dht commands when pubsub routing is enabled #5200

Merged
merged 3 commits into from
Jul 9, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 27 additions & 30 deletions core/commands/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
dag "github.com/ipfs/go-ipfs/merkledag"
path "github.com/ipfs/go-ipfs/path"

ipdht "gx/ipfs/QmNg6M98bwS97SL9ArvrRxKujFps3eV6XvmKgduiYga8Bn/go-libp2p-kad-dht"
routing "gx/ipfs/QmPpdpS9fknTBM3qHDcpayU6nYPZQeVjia2fbNrD8YWDe6/go-libp2p-routing"
notif "gx/ipfs/QmPpdpS9fknTBM3qHDcpayU6nYPZQeVjia2fbNrD8YWDe6/go-libp2p-routing/notifications"
b58 "gx/ipfs/QmWFAMPqsEyUX7gDUsRVmMWz59FxSpJ1b2v6bJ1yYzo7jY/go-base58-fast/base58"
Expand All @@ -26,6 +25,9 @@ import (

var ErrNotDHT = errors.New("routing service is not a DHT")

// TODO: Factor into `ipfs dht` and `ipfs routing`.
// Everything *except `query` goes into `ipfs routing`.

var DhtCmd = &cmds.Command{
Helptext: cmdkit.HelpText{
Tagline: "Issue commands directly through the DHT.",
Expand Down Expand Up @@ -61,8 +63,7 @@ var queryDhtCmd = &cmds.Command{
return
}

dht, ok := n.Routing.(*ipdht.IpfsDHT)
if !ok {
if n.DHT == nil {
res.SetError(ErrNotDHT, cmdkit.ErrNormal)
return
}
Expand All @@ -76,7 +77,7 @@ var queryDhtCmd = &cmds.Command{
return
}

closestPeers, err := dht.GetClosestPeers(ctx, string(id))
closestPeers, err := n.DHT.GetClosestPeers(ctx, string(id))
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
Expand Down Expand Up @@ -140,7 +141,7 @@ var queryDhtCmd = &cmds.Command{

var findProvidersDhtCmd = &cmds.Command{
Helptext: cmdkit.HelpText{
Tagline: "Find peers in the DHT that can provide a specific value, given a key.",
Tagline: "Find peers that can provide a specific value, given a key.",
ShortDescription: "Outputs a list of newline-delimited provider Peer IDs.",
},

Expand All @@ -158,9 +159,8 @@ var findProvidersDhtCmd = &cmds.Command{
return
}

dht, ok := n.Routing.(*ipdht.IpfsDHT)
if !ok {
res.SetError(ErrNotDHT, cmdkit.ErrNormal)
if n.Routing == nil {
res.SetError(errNotOnline, cmdkit.ErrNormal)
return
}

Expand All @@ -186,7 +186,7 @@ var findProvidersDhtCmd = &cmds.Command{
outChan := make(chan interface{})
res.SetOutput((<-chan interface{})(outChan))

pchan := dht.FindProvidersAsync(ctx, c, numProviders)
pchan := n.Routing.FindProvidersAsync(ctx, c, numProviders)
go func() {
defer close(outChan)
for e := range events {
Expand Down Expand Up @@ -406,7 +406,7 @@ func provideKeysRec(ctx context.Context, r routing.IpfsRouting, dserv ipld.DAGSe

var findPeerDhtCmd = &cmds.Command{
Helptext: cmdkit.HelpText{
Tagline: "Query the DHT for all of the multiaddresses associated with a Peer ID.",
Tagline: "Find the multiaddresses associated with a Peer ID.",
ShortDescription: "Outputs a list of newline-delimited multiaddresses.",
},

Expand All @@ -423,9 +423,8 @@ var findPeerDhtCmd = &cmds.Command{
return
}

dht, ok := n.Routing.(*ipdht.IpfsDHT)
if !ok {
res.SetError(ErrNotDHT, cmdkit.ErrNormal)
if n.Routing == nil {
res.SetError(errNotOnline, cmdkit.ErrNormal)
return
}

Expand Down Expand Up @@ -454,7 +453,7 @@ var findPeerDhtCmd = &cmds.Command{

go func() {
defer close(events)
pi, err := dht.FindPeer(ctx, pid)
pi, err := n.Routing.FindPeer(ctx, pid)
if err != nil {
notif.PublishQueryEvent(ctx, &notif.QueryEvent{
Type: notif.QueryError,
Expand Down Expand Up @@ -504,14 +503,14 @@ var findPeerDhtCmd = &cmds.Command{

var getValueDhtCmd = &cmds.Command{
Helptext: cmdkit.HelpText{
Tagline: "Given a key, query the DHT for its best value.",
Tagline: "Given a key, query the routing system for its best value.",
ShortDescription: `
Outputs the best value for the given key.

There may be several different values for a given key stored in the DHT; in
this context 'best' means the record that is most desirable. There is no one
metric for 'best': it depends entirely on the key type. For IPNS, 'best' is
the record that is both valid and has the highest sequence number (freshest).
There may be several different values for a given key stored in the routing
system; in this context 'best' means the record that is most desirable. There is
no one metric for 'best': it depends entirely on the key type. For IPNS, 'best'
is the record that is both valid and has the highest sequence number (freshest).
Different key types can specify other 'best' rules.
`,
},
Expand All @@ -529,9 +528,8 @@ Different key types can specify other 'best' rules.
return
}

dht, ok := n.Routing.(*ipdht.IpfsDHT)
if !ok {
res.SetError(ErrNotDHT, cmdkit.ErrNormal)
if n.Routing == nil {
res.SetError(errNotOnline, cmdkit.ErrNormal)
return
}

Expand Down Expand Up @@ -559,7 +557,7 @@ Different key types can specify other 'best' rules.

go func() {
defer close(events)
val, err := dht.GetValue(ctx, dhtkey)
val, err := n.Routing.GetValue(ctx, dhtkey)
if err != nil {
notif.PublishQueryEvent(ctx, &notif.QueryEvent{
Type: notif.QueryError,
Expand Down Expand Up @@ -610,18 +608,18 @@ Different key types can specify other 'best' rules.

var putValueDhtCmd = &cmds.Command{
Helptext: cmdkit.HelpText{
Tagline: "Write a key/value pair to the DHT.",
Tagline: "Write a key/value pair to the routing system.",
ShortDescription: `
Given a key of the form /foo/bar and a value of any form, this will write that
value to the DHT with that key.
value to the routing system with that key.

Keys have two parts: a keytype (foo) and the key name (bar). IPNS uses the
/ipns keytype, and expects the key name to be a Peer ID. IPNS entries are
specifically formatted (protocol buffer).

You may only use keytypes that are supported in your ipfs binary: currently
this is only /ipns. Unless you have a relatively deep understanding of the
go-ipfs DHT internals, you likely want to be using 'ipfs name publish' instead
go-ipfs routing internals, you likely want to be using 'ipfs name publish' instead
of this.

Value is arbitrary text. Standard input can be used to provide value.
Expand All @@ -644,9 +642,8 @@ NOTE: A value may not exceed 2048 bytes.
return
}

dht, ok := n.Routing.(*ipdht.IpfsDHT)
if !ok {
res.SetError(ErrNotDHT, cmdkit.ErrNormal)
if n.Routing == nil {
res.SetError(errNotOnline, cmdkit.ErrNormal)
return
}

Expand Down Expand Up @@ -677,7 +674,7 @@ NOTE: A value may not exceed 2048 bytes.

go func() {
defer close(events)
err := dht.PutValue(ctx, key, []byte(data))
err := n.Routing.PutValue(ctx, key, []byte(data))
if err != nil {
notif.PublishQueryEvent(ctx, &notif.QueryEvent{
Type: notif.QueryError,
Expand Down
26 changes: 22 additions & 4 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import (
dhtopts "gx/ipfs/QmNg6M98bwS97SL9ArvrRxKujFps3eV6XvmKgduiYga8Bn/go-libp2p-kad-dht/opts"
u "gx/ipfs/QmPdKqUcHGFdeSpvjVoaTRPPstGif9GBZb5Q56RVw9o69A/go-ipfs-util"
routing "gx/ipfs/QmPpdpS9fknTBM3qHDcpayU6nYPZQeVjia2fbNrD8YWDe6/go-libp2p-routing"
psrouter "gx/ipfs/QmPxCZ99jTHMxD93qQV4pN3WJbBHrBLCQiumWDgfJjFQJy/go-libp2p-pubsub-router"
psrouter "gx/ipfs/QmR2hqcem4qjd4DkuyiwSFjfUiCP5eXHdPoM7o7dWKwct9/go-libp2p-pubsub-router"
pnet "gx/ipfs/QmRGvSwDpN4eunxgDNfmQhayZ6Z9F5a2v31V2D7y77osLg/go-libp2p-pnet"
goprocess "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess"
mamask "gx/ipfs/QmSMZwvs3n4GBikZ7hKzT17c3bk65FmyZo2JqtJ16swqCv/multiaddr-filter"
Expand All @@ -65,7 +65,7 @@ import (
ping "gx/ipfs/QmZ86eLPtXkQ1Dfa992Q8NpXArUoWWh3y728JDcWvzRrvC/go-libp2p/p2p/protocol/ping"
mplex "gx/ipfs/QmZHiqdRuNXujvSPNu1ZWxxzV6a2WhoZpfYkesdgyaKF9f/go-smux-multiplex"
pstore "gx/ipfs/QmZR2XWVVBCtbgBWnQhWk2xcQfaR3W8faQPriAiaaj7rsr/go-libp2p-peerstore"
rhelpers "gx/ipfs/QmZw5m4ioaoNmATBtP3o7qC1UERubJgz84RzccT3UEHZKr/go-libp2p-routing-helpers"
rhelpers "gx/ipfs/Qmafsgr3GSDKyGHW8SU9dbe6Vtv4rEgcgJ3WRnS72qtAzv/go-libp2p-routing-helpers"
cid "gx/ipfs/QmapdYm1b22Frv3k17fqrBYTFRxwiaVJkB299Mfn33edeB/go-cid"
p2phost "gx/ipfs/Qmb8T6YBBsjYsVGfrihQLfCJveczZnneSBqBKkYEBWDjge/go-libp2p-host"
logging "gx/ipfs/QmcVVHfdyv15GVPk7NrxdWjh2hLVccXnoD8j2tyQShiXJb/go-log"
Expand Down Expand Up @@ -137,6 +137,7 @@ type IpfsNode struct {

Floodsub *floodsub.PubSub
PSRouter *psrouter.PubsubValueStore
DHT *dht.IpfsDHT
P2P *p2p.P2P

proc goprocess.Process
Expand Down Expand Up @@ -465,6 +466,23 @@ func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context, host p2phost
}
n.Routing = r

// TODO: I'm not a fan of type assertions like this but the
// `RoutingOption` system doesn't currently provide access to the
// IpfsNode.
//
// Ideally, we'd do something like:
//
// 1. Add some fancy method to introspect into tiered routers to extract
// things like the pubsub router or the DHT (complicated, messy,
// probably not worth it).
// 2. Pass the IpfsNode into the RoutingOption (would also remove the
// PSRouter case below.
// 3. Introduce some kind of service manager? (my personal favorite but
// that requires a fair amount of work).
if dht, ok := r.(*dht.IpfsDHT); ok {
n.DHT = dht
}

if ipnsps {
n.PSRouter = psrouter.NewPubsubValueStore(
ctx,
Expand Down Expand Up @@ -601,8 +619,8 @@ func (n *IpfsNode) teardown() error {
closers = append(closers, mount.Closer(n.Mounts.Ipns))
}

if dht, ok := n.Routing.(*dht.IpfsDHT); ok {
closers = append(closers, dht.Process())
if n.DHT != nil {
closers = append(closers, n.DHT.Process())
}

if n.Blocks != nil {
Expand Down
8 changes: 4 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -524,15 +524,15 @@
},
{
"author": "stebalien",
"hash": "QmPxCZ99jTHMxD93qQV4pN3WJbBHrBLCQiumWDgfJjFQJy",
"hash": "QmR2hqcem4qjd4DkuyiwSFjfUiCP5eXHdPoM7o7dWKwct9",
"name": "go-libp2p-pubsub-router",
"version": "0.3.5"
"version": "0.3.6"
},
{
"author": "Stebalien",
"hash": "QmZw5m4ioaoNmATBtP3o7qC1UERubJgz84RzccT3UEHZKr",
"hash": "Qmafsgr3GSDKyGHW8SU9dbe6Vtv4rEgcgJ3WRnS72qtAzv",
"name": "go-libp2p-routing-helpers",
"version": "0.2.4"
"version": "0.2.5"
},
{
"author": "fsnotify",
Expand Down
6 changes: 3 additions & 3 deletions test/sharness/lib/iptb-lib.sh
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ startup_cluster() {
bound=$(expr "$num_nodes" - 1)

if test -n "$other_args"; then
test_expect_success "start up nodes with additional args" '
iptb start --args $other_args
'
test_expect_success "start up nodes with additional args" "
iptb start --args \"${other_args[@]}\"
"
else
test_expect_success "start up nodes" '
iptb start
Expand Down
Loading