Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract and rework commands package #3856

Merged
merged 3 commits into from
Nov 18, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions blocks/blockstore/caching.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"

context "context"

"gx/ipfs/QmRg1gKTHzc3CZXSKzem8aR4E3TubFhbgXwfVuWnSK5CC5/go-metrics-interface"
)

Expand Down
15 changes: 10 additions & 5 deletions blocks/blockstore/util/remove.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,17 @@ func FilterPinned(pins pin.Pinner, out chan<- interface{}, cids []*cid.Cid) []*c
return stillOkay
}

// ProcRmOutput takes the channel returned by RmBlocks and writes
// to stdout/stderr according to the RemovedBlock objects received in
// that channel.
func ProcRmOutput(in <-chan interface{}, sout io.Writer, serr io.Writer) error {
// ProcRmOutput takes a function which returns a result from RmBlocks or EOF if there is no input.
// It then writes to stdout/stderr according to the RemovedBlock object returned from the function.
func ProcRmOutput(next func() (interface{}, error), sout io.Writer, serr io.Writer) error {
someFailed := false
for res := range in {
for {
res, err := next()
if err == io.EOF {
break
} else if err != nil {
return err
}
r := res.(*RemovedBlock)
if r.Hash == "" && r.Error != "" {
return fmt.Errorf("aborted: %s", r.Error)
Expand Down
80 changes: 41 additions & 39 deletions cmd/ipfs/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"sort"
"sync"

cmds "github.com/ipfs/go-ipfs/commands"
"github.com/ipfs/go-ipfs/core"
commands "github.com/ipfs/go-ipfs/core/commands"
corehttp "github.com/ipfs/go-ipfs/core/corehttp"
Expand All @@ -20,7 +19,9 @@ import (
fsrepo "github.com/ipfs/go-ipfs/repo/fsrepo"
migrate "github.com/ipfs/go-ipfs/repo/fsrepo/migrations"

"gx/ipfs/QmSNbH2A1evCCbJSDC6u3RV3GGDhgu6pRGbXHvrN89tMKf/go-ipfs-cmdkit"
mprome "gx/ipfs/QmSk46nSD78YiuNojYMS8NW6hSCjH95JajqqzzoychZgef/go-metrics-prometheus"
cmds "gx/ipfs/QmUgr8HrEkQqXfBPtj1A2UEg1V7cvhUhDsmL44wFPCJk5k/go-ipfs-cmds"
"gx/ipfs/QmX3QZ5jHEPidwUrymXV1iSCSUhdGxj15sm2gP4jKMef7B/client_golang/prometheus"
"gx/ipfs/QmX3U3YXCQ6UYBxq2LVWF8dARS1hPUTEYLrSx654Qyxyw6/go-multiaddr-net"
ma "gx/ipfs/QmXY77cVe7rVRQXZZQRioukUM7aRW3BTcAgJe12MCtb3Ji/go-multiaddr"
Expand Down Expand Up @@ -51,7 +52,7 @@ const (
)

var daemonCmd = &cmds.Command{
Helptext: cmds.HelpText{
Helptext: cmdkit.HelpText{
Tagline: "Run a network-connected IPFS node.",
ShortDescription: `
'ipfs daemon' runs a persistent ipfs daemon that can serve commands
Expand Down Expand Up @@ -142,24 +143,25 @@ Headers.
`,
},

Options: []cmds.Option{
cmds.BoolOption(initOptionKwd, "Initialize ipfs with default settings if not already initialized").Default(false),
cmds.StringOption(routingOptionKwd, "Overrides the routing option").Default("dht"),
cmds.BoolOption(mountKwd, "Mounts IPFS to the filesystem").Default(false),
cmds.BoolOption(writableKwd, "Enable writing objects (with POST, PUT and DELETE)").Default(false),
cmds.StringOption(ipfsMountKwd, "Path to the mountpoint for IPFS (if using --mount). Defaults to config setting."),
cmds.StringOption(ipnsMountKwd, "Path to the mountpoint for IPNS (if using --mount). Defaults to config setting."),
cmds.BoolOption(unrestrictedApiAccessKwd, "Allow API access to unlisted hashes").Default(false),
cmds.BoolOption(unencryptTransportKwd, "Disable transport encryption (for debugging protocols)").Default(false),
cmds.BoolOption(enableGCKwd, "Enable automatic periodic repo garbage collection").Default(false),
cmds.BoolOption(adjustFDLimitKwd, "Check and raise file descriptor limits if needed").Default(true),
cmds.BoolOption(offlineKwd, "Run offline. Do not connect to the rest of the network but provide local API.").Default(false),
cmds.BoolOption(migrateKwd, "If true, assume yes at the migrate prompt. If false, assume no."),
cmds.BoolOption(enableFloodSubKwd, "Instantiate the ipfs daemon with the experimental pubsub feature enabled."),
cmds.BoolOption(enableMultiplexKwd, "Add the experimental 'go-multiplex' stream muxer to libp2p on construction.").Default(true),
Options: []cmdkit.Option{
cmdkit.BoolOption(initOptionKwd, "Initialize ipfs with default settings if not already initialized").Default(false),
cmdkit.StringOption(routingOptionKwd, "Overrides the routing option").Default("dht"),
cmdkit.BoolOption(mountKwd, "Mounts IPFS to the filesystem").Default(false),
cmdkit.BoolOption(writableKwd, "Enable writing objects (with POST, PUT and DELETE)").Default(false),
cmdkit.StringOption(ipfsMountKwd, "Path to the mountpoint for IPFS (if using --mount). Defaults to config setting."),
cmdkit.StringOption(ipnsMountKwd, "Path to the mountpoint for IPNS (if using --mount). Defaults to config setting."),
cmdkit.BoolOption(unrestrictedApiAccessKwd, "Allow API access to unlisted hashes").Default(false),
cmdkit.BoolOption(unencryptTransportKwd, "Disable transport encryption (for debugging protocols)").Default(false),
cmdkit.BoolOption(enableGCKwd, "Enable automatic periodic repo garbage collection").Default(false),
cmdkit.BoolOption(adjustFDLimitKwd, "Check and raise file descriptor limits if needed").Default(true),
cmdkit.BoolOption(offlineKwd, "Run offline. Do not connect to the rest of the network but provide local API.").Default(false),
cmdkit.BoolOption(migrateKwd, "If true, assume yes at the migrate prompt. If false, assume no."),
cmdkit.BoolOption(enableFloodSubKwd, "Instantiate the ipfs daemon with the experimental pubsub feature enabled."),
cmdkit.BoolOption(enableMultiplexKwd, "Add the experimental 'go-multiplex' stream muxer to libp2p on construction.").Default(true),

// TODO: add way to override addresses. tricky part: updating the config if also --init.
// cmds.StringOption(apiAddrKwd, "Address for the daemon rpc API (overrides config)"),
// cmds.StringOption(swarmAddrKwd, "Address for the swarm socket (overrides config)"),
// cmdkit.StringOption(apiAddrKwd, "Address for the daemon rpc API (overrides config)"),
// cmdkit.StringOption(swarmAddrKwd, "Address for the swarm socket (overrides config)"),
},
Subcommands: map[string]*cmds.Command{},
Run: daemonFunc,
Expand All @@ -178,7 +180,7 @@ func defaultMux(path string) corehttp.ServeOption {

var fileDescriptorCheck = func() error { return nil }

func daemonFunc(req cmds.Request, res cmds.Response) {
func daemonFunc(req cmds.Request, re cmds.ResponseEmitter) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ooof, changing res to re makes this diff a lot noisier than it already is.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, but it's only in the places where we use the new API. I feel using res would be confusing because there are places we still handle a Response

// Inject metrics before we do anything

err := mprome.Inject()
Expand Down Expand Up @@ -216,7 +218,7 @@ func daemonFunc(req cmds.Request, res cmds.Response) {
// running in an uninitialized state.
initialize, _, err := req.Option(initOptionKwd).Bool()
if err != nil {
res.SetError(err, cmds.ErrNormal)
re.SetError(err, cmdkit.ErrNormal)
return
}

Expand All @@ -226,7 +228,7 @@ func daemonFunc(req cmds.Request, res cmds.Response) {
if !fsrepo.IsInitialized(cfg) {
err := initWithDefaults(os.Stdout, cfg)
if err != nil {
res.SetError(err, cmds.ErrNormal)
re.SetError(err, cmdkit.ErrNormal)
return
}
}
Expand All @@ -237,7 +239,7 @@ func daemonFunc(req cmds.Request, res cmds.Response) {
repo, err := fsrepo.Open(ctx.ConfigRoot)
switch err {
default:
res.SetError(err, cmds.ErrNormal)
re.SetError(err, cmdkit.ErrNormal)
return
case fsrepo.ErrNeedMigration:
domigrate, found, _ := req.Option(migrateKwd).Bool()
Expand All @@ -250,7 +252,7 @@ func daemonFunc(req cmds.Request, res cmds.Response) {
if !domigrate {
fmt.Println("Not running migrations of fs-repo now.")
fmt.Println("Please get fs-repo-migrations from https://dist.ipfs.io")
res.SetError(fmt.Errorf("fs-repo requires migration"), cmds.ErrNormal)
re.SetError(fmt.Errorf("fs-repo requires migration"), cmdkit.ErrNormal)
return
}

Expand All @@ -260,13 +262,13 @@ func daemonFunc(req cmds.Request, res cmds.Response) {
fmt.Printf(" %s\n", err)
fmt.Println("If you think this is a bug, please file an issue and include this whole log output.")
fmt.Println(" https://github.com/ipfs/fs-repo-migrations")
res.SetError(err, cmds.ErrNormal)
re.SetError(err, cmdkit.ErrNormal)
return
}

repo, err = fsrepo.Open(ctx.ConfigRoot)
if err != nil {
res.SetError(err, cmds.ErrNormal)
re.SetError(err, cmdkit.ErrNormal)
return
}
case nil:
Expand All @@ -275,7 +277,7 @@ func daemonFunc(req cmds.Request, res cmds.Response) {

cfg, err := ctx.GetConfig()
if err != nil {
res.SetError(err, cmds.ErrNormal)
re.SetError(err, cmdkit.ErrNormal)
return
}

Expand All @@ -297,12 +299,12 @@ func daemonFunc(req cmds.Request, res cmds.Response) {

routingOption, _, err := req.Option(routingOptionKwd).String()
if err != nil {
res.SetError(err, cmds.ErrNormal)
re.SetError(err, cmdkit.ErrNormal)
return
}
switch routingOption {
case routingOptionSupernodeKwd:
res.SetError(errors.New("supernode routing was never fully implemented and has been removed"), cmds.ErrNormal)
re.SetError(errors.New("supernode routing was never fully implemented and has been removed"), cmdkit.ErrNormal)
return
case routingOptionDHTClientKwd:
ncfg.Routing = core.DHTClientOption
Expand All @@ -311,14 +313,14 @@ func daemonFunc(req cmds.Request, res cmds.Response) {
case routingOptionNoneKwd:
ncfg.Routing = core.NilRouterOption
default:
res.SetError(fmt.Errorf("unrecognized routing option: %s", routingOption), cmds.ErrNormal)
re.SetError(fmt.Errorf("unrecognized routing option: %s", routingOption), cmdkit.ErrNormal)
return
}

node, err := core.NewNode(req.Context(), ncfg)
if err != nil {
log.Error("error from node construction: ", err)
res.SetError(err, cmds.ErrNormal)
re.SetError(err, cmdkit.ErrNormal)
return
}
node.SetLocal(false)
Expand Down Expand Up @@ -349,32 +351,32 @@ func daemonFunc(req cmds.Request, res cmds.Response) {
// construct api endpoint - every time
err, apiErrc := serveHTTPApi(req)
if err != nil {
res.SetError(err, cmds.ErrNormal)
re.SetError(err, cmdkit.ErrNormal)
return
}

// construct fuse mountpoints - if the user provided the --mount flag
mount, _, err := req.Option(mountKwd).Bool()
if err != nil {
res.SetError(err, cmds.ErrNormal)
re.SetError(err, cmdkit.ErrNormal)
return
}
if mount && offline {
res.SetError(errors.New("mount is not currently supported in offline mode"),
cmds.ErrClient)
re.SetError(errors.New("mount is not currently supported in offline mode"),
cmdkit.ErrClient)
return
}
if mount {
if err := mountFuse(req); err != nil {
res.SetError(err, cmds.ErrNormal)
re.SetError(err, cmdkit.ErrNormal)
return
}
}

// repo blockstore GC - if --enable-gc flag is present
err, gcErrc := maybeRunGC(req, node)
if err != nil {
res.SetError(err, cmds.ErrNormal)
re.SetError(err, cmdkit.ErrNormal)
return
}

Expand All @@ -384,7 +386,7 @@ func daemonFunc(req cmds.Request, res cmds.Response) {
var err error
err, gwErrc = serveHTTPGateway(req)
if err != nil {
res.SetError(err, cmds.ErrNormal)
re.SetError(err, cmdkit.ErrNormal)
return
}
}
Expand All @@ -398,7 +400,7 @@ func daemonFunc(req cmds.Request, res cmds.Response) {
for err := range merge(apiErrc, gwErrc, gcErrc) {
if err != nil {
log.Error(err)
res.SetError(err, cmds.ErrNormal)
re.SetError(err, cmdkit.ErrNormal)
}
}
}
Expand Down
37 changes: 21 additions & 16 deletions cmd/ipfs/init.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand All @@ -9,21 +10,22 @@ import (
"path"
"strings"

context "context"
assets "github.com/ipfs/go-ipfs/assets"
cmds "github.com/ipfs/go-ipfs/commands"
core "github.com/ipfs/go-ipfs/core"
namesys "github.com/ipfs/go-ipfs/namesys"
config "github.com/ipfs/go-ipfs/repo/config"
fsrepo "github.com/ipfs/go-ipfs/repo/fsrepo"

"gx/ipfs/QmSNbH2A1evCCbJSDC6u3RV3GGDhgu6pRGbXHvrN89tMKf/go-ipfs-cmdkit"
)

const (
nBitsForKeypairDefault = 2048
)

var initCmd = &cmds.Command{
Helptext: cmds.HelpText{
Helptext: cmdkit.HelpText{
Tagline: "Initializes ipfs config file.",
ShortDescription: `
Initializes ipfs configuration files and generates a new keypair.
Expand All @@ -44,18 +46,18 @@ environment variable:
export IPFS_PATH=/path/to/ipfsrepo
`,
},
Arguments: []cmds.Argument{
cmds.FileArg("default-config", false, false, "Initialize with the given configuration.").EnableStdin(),
Arguments: []cmdkit.Argument{
cmdkit.FileArg("default-config", false, false, "Initialize with the given configuration.").EnableStdin(),
},
Options: []cmds.Option{
cmds.IntOption("bits", "b", "Number of bits to use in the generated RSA private key.").Default(nBitsForKeypairDefault),
cmds.BoolOption("empty-repo", "e", "Don't add and pin help files to the local storage.").Default(false),
cmds.StringOption("profile", "p", "Apply profile settings to config. Multiple profiles can be separated by ','"),
Options: []cmdkit.Option{
cmdkit.IntOption("bits", "b", "Number of bits to use in the generated RSA private key.").Default(nBitsForKeypairDefault),
cmdkit.BoolOption("empty-repo", "e", "Don't add and pin help files to the local storage.").Default(false),
cmdkit.StringOption("profile", "p", "Apply profile settings to config. Multiple profiles can be separated by ','"),

// TODO need to decide whether to expose the override as a file or a
// directory. That is: should we allow the user to also specify the
// name of the file?
// TODO cmds.StringOption("event-logs", "l", "Location for machine-readable event logs."),
// TODO cmdkit.StringOption("event-logs", "l", "Location for machine-readable event logs."),
},
PreRun: func(req cmds.Request) error {
daemonLocked, err := fsrepo.LockedByOtherProcess(req.InvocContext().ConfigRoot)
Expand All @@ -73,20 +75,23 @@ environment variable:
return nil
},
Run: func(req cmds.Request, res cmds.Response) {
// needs to be called at least once
res.SetOutput(nil)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i know i've asked this before, but remind me again why this is?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because res.Output is called concurrently and we need to make sure SetOutput is called before Output reads the value. So there is a channel read at the beginning of Output, and a channel close (guarded by a sync.Once) at the end of SetOutput.
So if we didn't call SetError, the channel wouldn't close and Output would sit there and wait forever.


if req.InvocContext().Online {
res.SetError(errors.New("init must be run offline only!"), cmds.ErrNormal)
res.SetError(errors.New("init must be run offline only!"), cmdkit.ErrNormal)
return
}

empty, _, err := req.Option("e").Bool()
if err != nil {
res.SetError(err, cmds.ErrNormal)
res.SetError(err, cmdkit.ErrNormal)
return
}

nBitsForKeypair, _, err := req.Option("b").Int()
if err != nil {
res.SetError(err, cmds.ErrNormal)
res.SetError(err, cmdkit.ErrNormal)
return
}

Expand All @@ -96,20 +101,20 @@ environment variable:
if f != nil {
confFile, err := f.NextFile()
if err != nil {
res.SetError(err, cmds.ErrNormal)
res.SetError(err, cmdkit.ErrNormal)
return
}

conf = &config.Config{}
if err := json.NewDecoder(confFile).Decode(conf); err != nil {
res.SetError(err, cmds.ErrNormal)
res.SetError(err, cmdkit.ErrNormal)
return
}
}

profile, _, err := req.Option("profile").String()
if err != nil {
res.SetError(err, cmds.ErrNormal)
res.SetError(err, cmdkit.ErrNormal)
return
}

Expand All @@ -119,7 +124,7 @@ environment variable:
}

if err := doInit(os.Stdout, req.InvocContext().ConfigRoot, empty, nBitsForKeypair, profiles, conf); err != nil {
res.SetError(err, cmds.ErrNormal)
res.SetError(err, cmdkit.ErrNormal)
return
}
},
Expand Down
Loading