From eaa433f27bcafe591f8b2ef02e5f93398ab9e629 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Fri, 19 Aug 2016 19:52:49 -0700 Subject: [PATCH 1/4] cmds: implement ipfs dht provide command License: MIT Signed-off-by: Jeromy --- core/commands/dht.go | 111 +++++++++++++++++++++++++++++++++++++++++ routing/dht/routing.go | 5 ++ 2 files changed, 116 insertions(+) diff --git a/core/commands/dht.go b/core/commands/dht.go index 2d1149427c4..a549fddff9f 100644 --- a/core/commands/dht.go +++ b/core/commands/dht.go @@ -31,6 +31,7 @@ var DhtCmd = &cmds.Command{ "findpeer": findPeerDhtCmd, "get": getValueDhtCmd, "put": putValueDhtCmd, + "provide": provideRefDhtCmd, }, } @@ -227,6 +228,116 @@ var findProvidersDhtCmd = &cmds.Command{ Type: notif.QueryEvent{}, } +var provideRefDhtCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "Announce to the network that you are providing given values.", + }, + + Arguments: []cmds.Argument{ + cmds.StringArg("key", true, true, "The key to find providers for.").EnableStdin(), + }, + Options: []cmds.Option{ + cmds.BoolOption("verbose", "v", "Print extra information.").Default(false), + }, + Run: func(req cmds.Request, res cmds.Response) { + n, err := req.InvocContext().GetNode() + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + + dht, ok := n.Routing.(*ipdht.IpfsDHT) + if !ok { + res.SetError(ErrNotDHT, cmds.ErrNormal) + return + } + + var keys []key.Key + for _, arg := range req.Arguments() { + k := key.B58KeyDecode(arg) + if k == "" { + res.SetError(fmt.Errorf("incorrectly formatted key: ", arg), cmds.ErrNormal) + return + } + + has, err := n.Blockstore.Has(k) + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + + if !has { + res.SetError(fmt.Errorf("block %s not found locally, cannot provide", k), cmds.ErrNormal) + return + } + + keys = append(keys, k) + } + + outChan := make(chan interface{}) + res.SetOutput((<-chan interface{})(outChan)) + + events := make(chan *notif.QueryEvent) + ctx := notif.RegisterForQueryEvents(req.Context(), events) + + go func() { + defer close(outChan) + for e := range events { + outChan <- e + } + }() + + go func() { + defer close(outChan) + for _, k := range keys { + err := dht.Provide(ctx, k) + if err != nil { + notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ + Type: notif.QueryError, + Extra: err.Error(), + }) + return + } + } + }() + }, + Marshalers: cmds.MarshalerMap{ + cmds.Text: func(res cmds.Response) (io.Reader, error) { + outChan, ok := res.Output().(<-chan interface{}) + if !ok { + return nil, u.ErrCast() + } + + verbose, _, _ := res.Request().Option("v").Bool() + pfm := pfuncMap{ + notif.FinalPeer: func(obj *notif.QueryEvent, out io.Writer, verbose bool) { + if verbose { + fmt.Fprintf(out, "sending provider record to peer %s\n", obj.ID) + } + }, + } + + marshal := func(v interface{}) (io.Reader, error) { + obj, ok := v.(*notif.QueryEvent) + if !ok { + return nil, u.ErrCast() + } + + buf := new(bytes.Buffer) + printEvent(obj, buf, verbose, pfm) + return buf, nil + } + + return &cmds.ChannelMarshaler{ + Channel: outChan, + Marshaler: marshal, + Res: res, + }, nil + }, + }, + Type: notif.QueryEvent{}, +} + var findPeerDhtCmd = &cmds.Command{ Helptext: cmds.HelpText{ Tagline: "Query the DHT for all of the multiaddresses associated with a Peer ID.", diff --git a/routing/dht/routing.go b/routing/dht/routing.go index b9e547cacb9..4bdb39a8695 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -263,6 +263,10 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key key.Key) error { go func(p peer.ID) { defer wg.Done() log.Debugf("putProvider(%s, %s)", key, p) + notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ + Type: notif.FinalPeer, + ID: p, + }) err := dht.sendMessage(ctx, p, mes) if err != nil { log.Debug(err) @@ -272,6 +276,7 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key key.Key) error { wg.Wait() return nil } + func (dht *IpfsDHT) makeProvRecord(skey key.Key) (*pb.Message, error) { pi := pstore.PeerInfo{ ID: dht.self, From 8aac844641dbf06176943a0d2fcd33030eb55824 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Mon, 22 Aug 2016 20:56:24 -0700 Subject: [PATCH 2/4] add -r flag for dht provide License: MIT Signed-off-by: Jeromy --- core/commands/dht.go | 72 ++++++++++++++++++++++++++++++++++++-------- 1 file changed, 59 insertions(+), 13 deletions(-) diff --git a/core/commands/dht.go b/core/commands/dht.go index a549fddff9f..73740ec99a4 100644 --- a/core/commands/dht.go +++ b/core/commands/dht.go @@ -9,12 +9,15 @@ import ( key "github.com/ipfs/go-ipfs/blocks/key" cmds "github.com/ipfs/go-ipfs/commands" + dag "github.com/ipfs/go-ipfs/merkledag" notif "github.com/ipfs/go-ipfs/notifications" path "github.com/ipfs/go-ipfs/path" + routing "github.com/ipfs/go-ipfs/routing" ipdht "github.com/ipfs/go-ipfs/routing/dht" pstore "gx/ipfs/QmQdnfvZQuhdT93LNc5bos52wAmdr3G2p6G8teLJMEN32P/go-libp2p-peerstore" peer "gx/ipfs/QmRBqJF7hb8ZSpRcMwUt8hNhydWcxGEhtk81HKq6oUwKvs/go-libp2p-peer" u "gx/ipfs/QmZNVWh8LLjAavuQ2JXuFmuYH3C11xo988vSgp7UQrTRj1/go-ipfs-util" + "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context" ) var ErrNotDHT = errors.New("routing service is not a DHT") @@ -238,6 +241,7 @@ var provideRefDhtCmd = &cmds.Command{ }, Options: []cmds.Option{ cmds.BoolOption("verbose", "v", "Print extra information.").Default(false), + cmds.BoolOption("recursive", "r", "Recursively provide entire graph.").Default(false), }, Run: func(req cmds.Request, res cmds.Response) { n, err := req.InvocContext().GetNode() @@ -246,12 +250,13 @@ var provideRefDhtCmd = &cmds.Command{ return } - dht, ok := n.Routing.(*ipdht.IpfsDHT) - if !ok { - res.SetError(ErrNotDHT, cmds.ErrNormal) + if n.Routing == nil { + res.SetError(errNotOnline, cmds.ErrNormal) return } + rec, _, _ := req.Option("recursive").Bool() + var keys []key.Key for _, arg := range req.Arguments() { k := key.B58KeyDecode(arg) @@ -288,16 +293,11 @@ var provideRefDhtCmd = &cmds.Command{ }() go func() { - defer close(outChan) - for _, k := range keys { - err := dht.Provide(ctx, k) - if err != nil { - notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ - Type: notif.QueryError, - Extra: err.Error(), - }) - return - } + defer close(events) + if rec { + provideKeysRec(ctx, n.Routing, n.DAG, keys) + } else { + provideKeys(ctx, n.Routing, keys) } }() }, @@ -338,6 +338,52 @@ var provideRefDhtCmd = &cmds.Command{ Type: notif.QueryEvent{}, } +func provideKeys(ctx context.Context, r routing.IpfsRouting, keys []key.Key) { + for _, k := range keys { + err := r.Provide(ctx, k) + if err != nil { + notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ + Type: notif.QueryError, + Extra: err.Error(), + }) + return + } + } +} + +func provideKeysRec(ctx context.Context, r routing.IpfsRouting, dserv dag.DAGService, keys []key.Key) { + for _, k := range keys { + kset := key.NewKeySet() + node, err := dserv.Get(ctx, k) + if err != nil { + notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ + Type: notif.QueryError, + Extra: err.Error(), + }) + } + + err = dag.EnumerateChildrenAsync(ctx, dserv, node, kset) + if err != nil { + notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ + Type: notif.QueryError, + Extra: err.Error(), + }) + } + + for _, k := range kset.Keys() { + err = r.Provide(ctx, k) + if err != nil { + notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ + Type: notif.QueryError, + Extra: err.Error(), + }) + return + } + } + } + +} + var findPeerDhtCmd = &cmds.Command{ Helptext: cmds.HelpText{ Tagline: "Query the DHT for all of the multiaddresses associated with a Peer ID.", From 798569bb68c100d09f92971923b6c495b4fc5136 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Wed, 24 Aug 2016 10:17:37 -0700 Subject: [PATCH 3/4] don't provide same block twice during call to dht provide License: MIT Signed-off-by: Jeromy --- core/commands/dht.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/core/commands/dht.go b/core/commands/dht.go index 73740ec99a4..8a29f11b48a 100644 --- a/core/commands/dht.go +++ b/core/commands/dht.go @@ -352,6 +352,7 @@ func provideKeys(ctx context.Context, r routing.IpfsRouting, keys []key.Key) { } func provideKeysRec(ctx context.Context, r routing.IpfsRouting, dserv dag.DAGService, keys []key.Key) { + provided := make(map[key.Key]struct{}) for _, k := range keys { kset := key.NewKeySet() node, err := dserv.Get(ctx, k) @@ -371,6 +372,10 @@ func provideKeysRec(ctx context.Context, r routing.IpfsRouting, dserv dag.DAGSer } for _, k := range kset.Keys() { + if _, ok := provided[k]; ok { + continue + } + err = r.Provide(ctx, k) if err != nil { notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ @@ -379,6 +384,7 @@ func provideKeysRec(ctx context.Context, r routing.IpfsRouting, dserv dag.DAGSer }) return } + provided[k] = struct{}{} } } From 0a6ab30212876dfdc35e1777d7585551ff4c33a2 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Wed, 24 Aug 2016 10:32:40 -0700 Subject: [PATCH 4/4] address CR feedback License: MIT Signed-off-by: Jeromy --- core/commands/dht.go | 41 ++++++++++++++++++----------------------- 1 file changed, 18 insertions(+), 23 deletions(-) diff --git a/core/commands/dht.go b/core/commands/dht.go index 8a29f11b48a..b4dc370ff9b 100644 --- a/core/commands/dht.go +++ b/core/commands/dht.go @@ -237,7 +237,7 @@ var provideRefDhtCmd = &cmds.Command{ }, Arguments: []cmds.Argument{ - cmds.StringArg("key", true, true, "The key to find providers for.").EnableStdin(), + cmds.StringArg("key", true, true, "The key[s] to send provide records for.").EnableStdin(), }, Options: []cmds.Option{ cmds.BoolOption("verbose", "v", "Print extra information.").Default(false), @@ -294,10 +294,17 @@ var provideRefDhtCmd = &cmds.Command{ go func() { defer close(events) + var err error if rec { - provideKeysRec(ctx, n.Routing, n.DAG, keys) + err = provideKeysRec(ctx, n.Routing, n.DAG, keys) } else { - provideKeys(ctx, n.Routing, keys) + err = provideKeys(ctx, n.Routing, keys) + } + if err != nil { + notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ + Type: notif.QueryError, + Extra: err.Error(), + }) } }() }, @@ -338,37 +345,28 @@ var provideRefDhtCmd = &cmds.Command{ Type: notif.QueryEvent{}, } -func provideKeys(ctx context.Context, r routing.IpfsRouting, keys []key.Key) { +func provideKeys(ctx context.Context, r routing.IpfsRouting, keys []key.Key) error { for _, k := range keys { err := r.Provide(ctx, k) if err != nil { - notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ - Type: notif.QueryError, - Extra: err.Error(), - }) - return + return err } } + return nil } -func provideKeysRec(ctx context.Context, r routing.IpfsRouting, dserv dag.DAGService, keys []key.Key) { +func provideKeysRec(ctx context.Context, r routing.IpfsRouting, dserv dag.DAGService, keys []key.Key) error { provided := make(map[key.Key]struct{}) for _, k := range keys { kset := key.NewKeySet() node, err := dserv.Get(ctx, k) if err != nil { - notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ - Type: notif.QueryError, - Extra: err.Error(), - }) + return err } err = dag.EnumerateChildrenAsync(ctx, dserv, node, kset) if err != nil { - notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ - Type: notif.QueryError, - Extra: err.Error(), - }) + return err } for _, k := range kset.Keys() { @@ -378,16 +376,13 @@ func provideKeysRec(ctx context.Context, r routing.IpfsRouting, dserv dag.DAGSer err = r.Provide(ctx, k) if err != nil { - notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ - Type: notif.QueryError, - Extra: err.Error(), - }) - return + return err } provided[k] = struct{}{} } } + return nil } var findPeerDhtCmd = &cmds.Command{