Skip to content

Commit

Permalink
feat: change init logic
Browse files Browse the repository at this point in the history
  • Loading branch information
hunjixin committed Apr 28, 2023
1 parent ad3d0c6 commit cc52aa4
Show file tree
Hide file tree
Showing 10 changed files with 141 additions and 173 deletions.
67 changes: 32 additions & 35 deletions cmd/daemon.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cmd

import (
"errors"
"fmt"
"os"

Expand Down Expand Up @@ -80,7 +81,7 @@ var daemonCmd = &cmds.Command{
return fmt.Errorf("fetching proof parameters: %w", err)
}

exist, err := repo.Exists(repoDir)
exist, err := repo.Exists(repoDir) //The configuration file and devgen are required for the program to start
if err != nil {
return err
}
Expand All @@ -98,7 +99,16 @@ var daemonCmd = &cmds.Command{
if err := re.Emit(repoDir); err != nil {
return err
}
if err := repo.InitFSRepo(repoDir, repo.LatestVersion, config.NewDefaultConfig()); err != nil {

cfg, err := repo.LoadConfig(repoDir) //use exit config, allow user prepare config before
if err != nil {
if errors.Is(err, os.ErrNotExist) {
cfg = config.NewDefaultConfig()
} else {
return err
}
}
if err := repo.InitFSRepo(repoDir, repo.LatestVersion, cfg); err != nil {
return err
}

Expand All @@ -112,7 +122,12 @@ var daemonCmd = &cmds.Command{
}

func initRun(req *cmds.Request) error {
rep, err := getRepo(req)
repoDir, _ := req.Options[OptionRepoDir].(string)
err := repo.WriteVersion(repoDir, repo.LatestVersion)
if err != nil {
return err
}
rep, err := getRepo(repoDir)
if err != nil {
return err
}
Expand Down Expand Up @@ -166,12 +181,22 @@ func initRun(req *cmds.Request) error {
return err
}

// import snapshot argument only work when init
importPath, _ := req.Options[ImportSnapshot].(string)
if len(importPath) != 0 {
err := Import(req.Context, rep, importPath)
if err != nil {
log.Errorf("failed to import snapshot, import path: %s, error: %s", importPath, err.Error())
return err
}
}

return nil
}

func daemonRun(req *cmds.Request, re cmds.ResponseEmitter) error {
// third precedence is config file.
rep, err := getRepo(req)
repoDir, _ := req.Options[OptionRepoDir].(string)
rep, err := getRepo(repoDir)
if err != nil {
return err
}
Expand Down Expand Up @@ -219,7 +244,7 @@ func daemonRun(req *cmds.Request, re cmds.ResponseEmitter) error {
}

if bootPeers, ok := req.Options[BootstrapPeers].([]string); ok && len(bootPeers) > 0 {
config.Bootstrap.Addresses = MergePeers(config.Bootstrap.Addresses, bootPeers)
config.Bootstrap.AddPeers(bootPeers...)
}

opts, err := node.OptionsFromRepo(rep)
Expand All @@ -234,14 +259,6 @@ func daemonRun(req *cmds.Request, re cmds.ResponseEmitter) error {
if isRelay, ok := req.Options[IsRelay].(bool); ok && isRelay {
opts = append(opts, node.IsRelay())
}
importPath, _ := req.Options[ImportSnapshot].(string)
if len(importPath) != 0 {
err := Import(req.Context, rep, importPath)
if err != nil {
log.Errorf("failed to import snapshot, import path: %s, error: %s", importPath, err.Error())
return err
}
}

if password, _ := req.Options[Password].(string); len(password) > 0 {
opts = append(opts, node.SetWalletPassword([]byte(password)))
Expand Down Expand Up @@ -296,8 +313,7 @@ func daemonRun(req *cmds.Request, re cmds.ResponseEmitter) error {
return fcn.RunRPCAndWait(req.Context, RootCmdDaemon, ready)
}

func getRepo(req *cmds.Request) (repo.Repo, error) {
repoDir, _ := req.Options[OptionRepoDir].(string)
func getRepo(repoDir string) (repo.Repo, error) {
repoDir, err := paths.GetRepoPath(repoDir)
if err != nil {
return nil, err
Expand All @@ -307,22 +323,3 @@ func getRepo(req *cmds.Request) (repo.Repo, error) {
}
return repo.OpenFSRepo(repoDir, repo.LatestVersion)
}

func MergePeers(peerSet1 []string, peerSet2 []string) []string {

filter := map[string]struct{}{}
for _, peer := range peerSet1 {
filter[peer] = struct{}{}
}

notInclude := []string{}
for _, peer := range peerSet2 {
_, has := filter[peer]
if has {
continue
}
filter[peer] = struct{}{}
notInclude = append(notInclude, peer)
}
return append(peerSet1, notInclude...)
}
7 changes: 6 additions & 1 deletion fixtures/networks/network_parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"fmt"
"strings"

"github.com/filecoin-project/venus/pkg/util"

"github.com/filecoin-project/venus/pkg/config"
"github.com/filecoin-project/venus/venus-shared/types"
"github.com/filecoin-project/venus/venus-shared/utils"
Expand All @@ -28,8 +30,11 @@ func SetConfigFromOptions(cfg *config.Config, networkName string) error {
if err != nil {
return err
}
cfg.Bootstrap = &netcfg.Bootstrap
cfg.NetworkParams = &netcfg.Network
//merge with config and option
peers := util.MergePeers(cfg.Bootstrap.Addresses, netcfg.Bootstrap.Addresses)
cfg.Bootstrap = &netcfg.Bootstrap
cfg.Bootstrap.AddPeers(peers...)
return nil
}

Expand Down
18 changes: 18 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,24 @@ type BootstrapConfig struct {
Period string `json:"period,omitempty"`
}

func (bsc *BootstrapConfig) AddPeers(peers ...string) {
filter := map[string]struct{}{}
for _, peer := range bsc.Addresses {
filter[peer] = struct{}{}
}

notInclude := []string{}
for _, peer := range peers {
_, has := filter[peer]
if has {
continue
}
filter[peer] = struct{}{}
notInclude = append(notInclude, peer)
}
bsc.Addresses = append(bsc.Addresses, notInclude...)
}

// TODO: provide bootstrap node addresses
func newDefaultBootstrapConfig() *BootstrapConfig {
return &BootstrapConfig{
Expand Down
18 changes: 18 additions & 0 deletions pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,8 @@ func createConfigFile(t *testing.T, content string) (string, error) {
}

func TestDuration(t *testing.T) {
tf.UnitTest(t)

d, err := time.ParseDuration("1h5m")
require.NoError(t, err)

Expand All @@ -277,3 +279,19 @@ func TestDuration(t *testing.T) {
require.NoError(t, json.Unmarshal(data, &res))
require.Equal(t, dd, res)
}

func TestAddBootPeers(t *testing.T) {
tf.UnitTest(t)

boot := &BootstrapConfig{}
boot.AddPeers("a")
assert.Equal(t, []string{"a"}, boot.Addresses)

boot.AddPeers("a")
assert.Equal(t, []string{"a"}, boot.Addresses)

boot.Addresses = []string{"a", "b"}
boot.AddPeers("a", "c")
assert.Equal(t, []string{"a", "b", "c"}, boot.Addresses)

}
85 changes: 16 additions & 69 deletions pkg/repo/fsrepo.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (

badgerds "github.com/ipfs/go-ds-badger2"
lockfile "github.com/ipfs/go-fs-lock"
logging "github.com/ipfs/go-log/v2"
"github.com/mitchellh/go-homedir"
"github.com/pkg/errors"

Expand All @@ -39,14 +38,11 @@ const (
chainDatastorePrefix = "chain"
metaDatastorePrefix = "metadata"
paychDatastorePrefix = "paych"
snapshotStorePrefix = "snapshots"
snapshotFilenamePrefix = "snapshot"
dataTransfer = "data-transfer"
fsSqlite = "sqlite"
)

var log = logging.Logger("repo")

// FSRepo is a repo implementation backed by a filesystem.
type FSRepo struct {
// Path to the repo root directory.
Expand Down Expand Up @@ -88,11 +84,12 @@ func InitFSRepo(targetPath string, version uint, cfg *config.Config) error {
repoPath = "./"
}

exists, err := fileExists(repoPath)
versionFile := filepath.Join(repoPath, versionFilename)
exists, err := fileExists(versionFile)
if err != nil {
return errors.Wrapf(err, "error inspecting repo path %s", repoPath)
} else if exists {
return errors.Errorf("repo at %s, file exists", repoPath)
return errors.Errorf("version file detect at %s, file exists", versionFile)
}

// Create the actual directory and then the link to it.
Expand All @@ -111,14 +108,6 @@ func InitFSRepoDirect(targetPath string, version uint, cfg *config.Config) error
return errors.Wrap(err, "no writable directory")
}

empty, err := isEmptyDir(repoPath)
if err != nil {
return errors.Wrapf(err, "failed to list repo directory %s", repoPath)
}
if !empty {
return fmt.Errorf("refusing to initialize repo in non-empty directory %s", repoPath)
}

if err := WriteVersion(repoPath, version); err != nil {
return errors.Wrap(err, "initializing repo version failed")
}
Expand All @@ -133,16 +122,10 @@ func InitFSRepoDirect(targetPath string, version uint, cfg *config.Config) error
}

func Exists(repoPath string) (bool, error) {
_, err := os.Stat(filepath.Join(repoPath, walletDatastorePrefix))
_, err := os.Stat(filepath.Join(repoPath, versionFilename))
notExist := os.IsNotExist(err)
if notExist {
err = nil

_, err = os.Stat(filepath.Join(repoPath, configFilename))
notExist = os.IsNotExist(err)
if notExist {
err = nil
}
return false, nil
}
return !notExist, err
}
Expand Down Expand Up @@ -264,9 +247,6 @@ func (r *FSRepo) Config() *config.Config {

// ReplaceConfig replaces the current config with the newly passed in one.
func (r *FSRepo) ReplaceConfig(cfg *config.Config) error {
if err := r.SnapshotConfig(r.Config()); err != nil {
log.Warnf("failed to create snapshot: %s", err.Error())
}
r.lk.Lock()
defer r.lk.Unlock()

Expand All @@ -283,20 +263,6 @@ func (r *FSRepo) ReplaceConfig(cfg *config.Config) error {
return os.Rename(tmp, filepath.Join(r.path, configFilename))
}

// SnapshotConfig stores a copy `cfg` in <repo_path>/snapshots/ appending the
// time of snapshot to the filename.
func (r *FSRepo) SnapshotConfig(cfg *config.Config) error {
snapshotFile := filepath.Join(r.path, snapshotStorePrefix, genSnapshotFileName())
exists, err := fileExists(snapshotFile)
if err != nil {
return errors.Wrap(err, "error checking snapshot file")
} else if exists {
// this should never happen
return fmt.Errorf("file already exists: %s", snapshotFile)
}
return cfg.WriteFile(snapshotFile)
}

// Datastore returns the datastore.
func (r *FSRepo) Datastore() blockstoreutil.Blockstore {
return r.ds
Expand All @@ -316,10 +282,6 @@ func (r *FSRepo) MetaDatastore() Datastore {
return r.metaDs
}

/*func (r *FSRepo) MarketDatastore() Datastore {
return r.marketDs
}*/

func (r *FSRepo) PaychDatastore() Datastore {
return r.paychDs
}
Expand Down Expand Up @@ -394,16 +356,19 @@ func hasConfig(p string) (bool, error) {
}
}

func (r *FSRepo) loadConfig() error {
configFile := filepath.Join(r.path, configFilename)
func LoadConfig(p string) (*config.Config, error) {
configFile := filepath.Join(p, configFilename)

cfg, err := config.ReadFile(configFile)
if err != nil {
return errors.Wrapf(err, "failed to read config file at %q", configFile)
return nil, errors.Wrapf(err, "failed to read config file at %q", configFile)
}
return cfg, nil
}

r.cfg = cfg
return nil
func (r *FSRepo) loadConfig() (err error) {
r.cfg, err = LoadConfig(r.path)
return
}

// readVersion reads the repo's version file (but does not change r.version).
Expand Down Expand Up @@ -514,16 +479,11 @@ func initConfig(p string, cfg *config.Config) error {
if err != nil {
return errors.Wrap(err, "error inspecting config file")
} else if exists {
return fmt.Errorf("config file already exists: %s", configFile)
}

if err := cfg.WriteFile(configFile); err != nil {
return err
//config file prepared before
return nil
}

// make the snapshot dir
snapshotDir := filepath.Join(p, snapshotStorePrefix)
return ensureWritableDirectory(snapshotDir)
return cfg.WriteFile(configFile)
}

func initDataTransfer(p string) error {
Expand All @@ -542,10 +502,6 @@ func initDataTransfer(p string) error {
return os.MkdirAll(dataTransferDir, 0o777)
}

func genSnapshotFileName() string {
return fmt.Sprintf("%s-%d.json", snapshotFilenamePrefix, time.Now().UTC().UnixNano())
}

// Ensures that path points to a read/writable directory, creating it if necessary.
func ensureWritableDirectory(path string) error {
// Attempt to create the requested directory, accepting that something might already be there.
Expand All @@ -571,15 +527,6 @@ func ensureWritableDirectory(path string) error {
return nil
}

// Tests whether the directory at path is empty
func isEmptyDir(path string) (bool, error) {
infos, err := os.ReadDir(path)
if err != nil {
return false, err
}
return len(infos) == 0, nil
}

func fileExists(file string) (bool, error) {
_, err := os.Stat(file)
if err == nil {
Expand Down
Loading

0 comments on commit cc52aa4

Please sign in to comment.