From 8033722f1f625b55482996ba251973e8b2a87ebe Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Thu, 18 Oct 2018 10:49:22 +0100 Subject: [PATCH] feat(commands): --stream option for ls Convert LS Command to use current cmds lib Update LS Command to support streaming Rebase fixes License: MIT Signed-off-by: hannahhoward --- core/commands/ls.go | 287 ++++++++++++++++++++++++++---------------- core/commands/root.go | 4 +- 2 files changed, 184 insertions(+), 107 deletions(-) diff --git a/core/commands/ls.go b/core/commands/ls.go index a911d2d701b8..760426c24148 100644 --- a/core/commands/ls.go +++ b/core/commands/ls.go @@ -1,17 +1,17 @@ package commands import ( - "bytes" "fmt" "io" "text/tabwriter" - cmds "github.com/ipfs/go-ipfs/commands" + cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv" e "github.com/ipfs/go-ipfs/core/commands/e" iface "github.com/ipfs/go-ipfs/core/coreapi/interface" cid "gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid" ipld "gx/ipfs/QmR7TcHkR9nxkUorfi8XMTAMLUK7GiP64TWWBzY3aacc1o/go-ipld-format" + cmds "gx/ipfs/QmSXUokcP4TJpFfqozT69AVAYRtzXVMUjzQVkYX41R9Svs/go-ipfs-cmds" merkledag "gx/ipfs/QmSei8kFMfqdJq7Q68d2LMnHbTWKKg2daA29ezUYFAUNgc/go-merkledag" offline "gx/ipfs/QmT6dHGp3UYd3vUMpy7rzX2CXQv7HLcj42Vtq8qwwjgASb/go-ipfs-exchange-offline" unixfs "gx/ipfs/QmUaZkqxmKvUX16F8XeAAk9LVvmNMktvbhcx4PG4s8SqDG/go-unixfs" @@ -21,24 +21,34 @@ import ( "gx/ipfs/Qmde5VP1qUkyQXKCfmEUA7bP64V2HAptbJ7phuPp7jXWwg/go-ipfs-cmdkit" ) +// LsLink contains printable data for a single ipld link in ls output type LsLink struct { Name, Hash string Size uint64 Type unixfspb.Data_DataType } +// LsObject is an element of LsOutput +// It can represent a whole directory, a directory header, one or more links, +// Or a the end of a directory type LsObject struct { - Hash string - Links []LsLink + Hash string + Links []LsLink + HasHeader bool + HasLinks bool + HasFooter bool } +// LsObject is a set of printable data for directories type LsOutput struct { - Objects []LsObject + MultipleFolders bool + Objects []LsObject } const ( lsHeadersOptionNameTime = "headers" lsResolveTypeOptionName = "resolve-type" + lsStreamOptionName = "stream" ) var LsCmd = &cmds.Command{ @@ -60,32 +70,20 @@ The JSON output contains type information. Options: []cmdkit.Option{ cmdkit.BoolOption(lsHeadersOptionNameTime, "v", "Print table headers (Hash, Size, Name)."), cmdkit.BoolOption(lsResolveTypeOptionName, "Resolve linked objects to find out their types.").WithDefault(true), + cmdkit.BoolOption(lsStreamOptionName, "s", "Stream directory entries as they are found."), }, - Run: func(req cmds.Request, res cmds.Response) { - nd, err := req.InvocContext().GetNode() + Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + nd, err := cmdenv.GetNode(env) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } - api, err := req.InvocContext().GetApi() + api, err := cmdenv.GetApi(env) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return - } - - // get options early -> exit early in case of error - if _, _, err := req.Option(lsHeadersOptionNameTime).Bool(); err != nil { - res.SetError(err, cmdkit.ErrNormal) - return - } - - resolve, _, err := req.Option(lsResolveTypeOptionName).Bool() - if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } + resolve, _ := req.Options[lsResolveTypeOptionName].(bool) dserv := nd.DAG if !resolve { offlineexch := offline.Exchange(nd.Blockstore) @@ -93,125 +91,204 @@ The JSON output contains type information. dserv = merkledag.NewDAGService(bserv) } - paths := req.Arguments() + err = req.ParseBodyArgs() + if err != nil { + return err + } + + paths := req.Arguments var dagnodes []ipld.Node for _, fpath := range paths { p, err := iface.ParsePath(fpath) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } - dagnode, err := api.ResolveNode(req.Context(), p) + dagnode, err := api.ResolveNode(req.Context, p) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } dagnodes = append(dagnodes, dagnode) } - - output := make([]LsObject, len(req.Arguments())) - ng := merkledag.NewSession(req.Context(), nd.DAG) + ng := merkledag.NewSession(req.Context, nd.DAG) ro := merkledag.NewReadOnlyDagService(ng) + stream, _ := req.Options[lsStreamOptionName].(bool) + multipleFolders := len(req.Arguments) > 1 + if !stream { + output := make([]LsObject, len(req.Arguments)) + + for i, dagnode := range dagnodes { + dir, err := uio.NewDirectoryFromNode(ro, dagnode) + if err != nil && err != uio.ErrNotADir { + return fmt.Errorf("the data in %s (at %q) is not a UnixFS directory: %s", dagnode.Cid(), paths[i], err) + } + + var links []*ipld.Link + if dir == nil { + links = dagnode.Links() + } else { + links, err = dir.Links(req.Context) + if err != nil { + return err + } + } + outputLinks := make([]LsLink, len(links)) + for j, link := range links { + lsLink, err := makeLsLink(req, dserv, resolve, link) + if err != nil { + return err + } + outputLinks[j] = *lsLink + } + output[i] = newFullDirectoryLsObject(paths[i], outputLinks) + } + + return cmds.EmitOnce(res, &LsOutput{multipleFolders, output}) + } + for i, dagnode := range dagnodes { dir, err := uio.NewDirectoryFromNode(ro, dagnode) if err != nil && err != uio.ErrNotADir { - res.SetError(fmt.Errorf("the data in %s (at %q) is not a UnixFS directory: %s", dagnode.Cid(), paths[i], err), cmdkit.ErrNormal) - return + return fmt.Errorf("the data in %s (at %q) is not a UnixFS directory: %s", dagnode.Cid(), paths[i], err) } - var links []*ipld.Link + var linkResults <-chan unixfs.LinkResult if dir == nil { - links = dagnode.Links() + linkResults = makeDagNodeLinkResults(req, dagnode) } else { - links, err = dir.Links(req.Context()) - if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return - } + linkResults = dir.EnumLinksAsync(req.Context) } - output[i] = LsObject{ - Hash: paths[i], - Links: make([]LsLink, len(links)), - } - - for j, link := range links { - t := unixfspb.Data_DataType(-1) - - switch link.Cid.Type() { - case cid.Raw: - // No need to check with raw leaves - t = unixfs.TFile - case cid.DagProtobuf: - linkNode, err := link.GetNode(req.Context(), dserv) - if err == ipld.ErrNotFound && !resolve { - // not an error - linkNode = nil - } else if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return - } + output := make([]LsObject, 1) + outputLinks := make([]LsLink, 1) - if pn, ok := linkNode.(*merkledag.ProtoNode); ok { - d, err := unixfs.FSNodeFromBytes(pn.Data()) - if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return - } - t = d.Type() - } + output[0] = newDirectoryHeaderLsObject(paths[i]) + if err = res.Emit(&LsOutput{multipleFolders, output}); err != nil { + return nil + } + for linkResult := range linkResults { + if linkResult.Err != nil { + return linkResult.Err + } + link := linkResult.Link + lsLink, err := makeLsLink(req, dserv, resolve, link) + if err != nil { + return err } - output[i].Links[j] = LsLink{ - Name: link.Name, - Hash: link.Cid.String(), - Size: link.Size, - Type: t, + outputLinks[0] = *lsLink + output[0] = newDirectoryLinksLsObject(outputLinks) + if err = res.Emit(&LsOutput{multipleFolders, output}); err != nil { + return err } } + output[0] = newDirectoryFooterLsObject() + if err = res.Emit(&LsOutput{multipleFolders, output}); err != nil { + return err + } } - - res.SetOutput(&LsOutput{output}) + return nil }, - Marshalers: cmds.MarshalerMap{ - cmds.Text: func(res cmds.Response) (io.Reader, error) { - - v, err := unwrapOutput(res.Output()) - if err != nil { - return nil, err - } - - headers, _, _ := res.Request().Option(lsHeadersOptionNameTime).Bool() + Encoders: cmds.EncoderMap{ + cmds.Text: cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error { + headers, _ := req.Options[lsHeadersOptionNameTime].(bool) output, ok := v.(*LsOutput) if !ok { - return nil, e.TypeErr(output, v) + return e.TypeErr(output, v) } - buf := new(bytes.Buffer) - w := tabwriter.NewWriter(buf, 1, 2, 1, ' ', 0) + tw := tabwriter.NewWriter(w, 1, 2, 1, ' ', 0) for _, object := range output.Objects { - if len(output.Objects) > 1 { - fmt.Fprintf(w, "%s:\n", object.Hash) - } - if headers { - fmt.Fprintln(w, "Hash\tSize\tName") + if object.HasHeader { + if output.MultipleFolders { + fmt.Fprintf(tw, "%s:\n", object.Hash) + } + if headers { + fmt.Fprintln(tw, "Hash\tSize\tName") + } } - for _, link := range object.Links { - if link.Type == unixfs.TDirectory { - link.Name += "/" + if object.HasLinks { + for _, link := range object.Links { + if link.Type == unixfs.TDirectory { + link.Name += "/" + } + + fmt.Fprintf(tw, "%s\t%v\t%s\n", link.Hash, link.Size, link.Name) } - fmt.Fprintf(w, "%s\t%v\t%s\n", link.Hash, link.Size, link.Name) } - if len(output.Objects) > 1 { - fmt.Fprintln(w) + if object.HasFooter { + if output.MultipleFolders { + fmt.Fprintln(tw) + } } } - w.Flush() - - return buf, nil - }, + tw.Flush() + return nil + }), }, Type: LsOutput{}, } + +func makeDagNodeLinkResults(req *cmds.Request, dagnode ipld.Node) <-chan unixfs.LinkResult { + linkResults := make(chan unixfs.LinkResult) + go func() { + defer close(linkResults) + for _, l := range dagnode.Links() { + select { + case linkResults <- unixfs.LinkResult{ + Link: l, + Err: nil, + }: + case <-req.Context.Done(): + return + } + } + }() + return linkResults +} + +func newFullDirectoryLsObject(hash string, links []LsLink) LsObject { + return LsObject{hash, links, true, true, true} +} +func newDirectoryHeaderLsObject(hash string) LsObject { + return LsObject{hash, nil, true, false, false} +} +func newDirectoryLinksLsObject(links []LsLink) LsObject { + return LsObject{"", links, false, true, false} +} +func newDirectoryFooterLsObject() LsObject { + return LsObject{"", nil, false, false, true} +} + +func makeLsLink(req *cmds.Request, dserv ipld.DAGService, resolve bool, link *ipld.Link) (*LsLink, error) { + t := unixfspb.Data_DataType(-1) + + switch link.Cid.Type() { + case cid.Raw: + // No need to check with raw leaves + t = unixfs.TFile + case cid.DagProtobuf: + linkNode, err := link.GetNode(req.Context, dserv) + if err == ipld.ErrNotFound && !resolve { + // not an error + linkNode = nil + } else if err != nil { + return nil, err + } + + if pn, ok := linkNode.(*merkledag.ProtoNode); ok { + d, err := unixfs.FSNodeFromBytes(pn.Data()) + if err != nil { + return nil, err + } + t = d.Type() + } + } + return &LsLink{ + Name: link.Name, + Hash: link.Cid.String(), + Size: link.Size, + Type: t, + }, nil +} diff --git a/core/commands/root.go b/core/commands/root.go index 9ddd227ed6c3..a3a9f20b6d3d 100644 --- a/core/commands/root.go +++ b/core/commands/root.go @@ -131,7 +131,7 @@ var rootSubcommands = map[string]*cmds.Command{ "id": IDCmd, "key": KeyCmd, "log": lgc.NewCommand(LogCmd), - "ls": lgc.NewCommand(LsCmd), + "ls": LsCmd, "mount": lgc.NewCommand(MountCmd), "name": name.NameCmd, "object": ocmd.ObjectCmd, @@ -168,7 +168,7 @@ var rootROSubcommands = map[string]*cmds.Command{ }, "get": GetCmd, "dns": DNSCmd, - "ls": lgc.NewCommand(LsCmd), + "ls": LsCmd, "name": { Subcommands: map[string]*cmds.Command{ "resolve": name.IpnsCmd,