Skip to content
This repository has been archived by the owner on Dec 19, 2022. It is now read-only.

Commit

Permalink
Feat/support oss storage (#147)
Browse files Browse the repository at this point in the history
* Sector Redo: The context passed by the command is released before the thread runs (#142)

Co-authored-by: 一页素书 <[email protected]>

* Fix bugs (#143)

* Fix the bug of SetConfig exception

* Missing some color schemes for sector status

* Repair the pieces info of the cc data modified in the verification deal (#146)

Co-authored-by: 一页素书 <[email protected]>

* use oss storage

* fix cli

Co-authored-by: Susanoo <[email protected]>
Co-authored-by: 一页素书 <[email protected]>
Co-authored-by: 问心 <[email protected]>
  • Loading branch information
4 people committed Dec 8, 2021
1 parent 8007a79 commit d313bbc
Show file tree
Hide file tree
Showing 14 changed files with 144 additions and 63 deletions.
9 changes: 9 additions & 0 deletions api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,15 @@ func GetFullNodeAPIV2(cctx *cli.Context) (FullNode, jsonrpc.ClientCloser, error)
return NewFullNodeRPC(cctx.Context, addr, apiInfo.AuthHeader())
}

func GetFullNodeFromNodeConfig(ctx context.Context, cfg *config.NodeConfig) (FullNode, jsonrpc.ClientCloser, error) {
apiInfo := apiinfo.NewAPIInfo(cfg.Url, cfg.Token)
addr, err := apiInfo.DialArgs("v1")
if err != nil {
return nil, nil, xerrors.Errorf("could not get DialArgs: %w", err)
}
return NewFullNodeRPC(ctx, addr, apiInfo.AuthHeader())
}

func GetFullNodeAPIFromConfig(cctx *cli.Context) (apiinfo.APIInfo, error) {
repoPath := cctx.String("repo")
cfgPath := config.FsConfig(repoPath)
Expand Down
2 changes: 2 additions & 0 deletions api/impl/strageminer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package impl
import (
"context"
"encoding/json"
api2 "github.com/filecoin-project/venus-market/api"
types4 "github.com/filecoin-project/venus-market/types"
"net/http"
"strconv"
"time"
Expand Down
1 change: 1 addition & 0 deletions api/storage_struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package api

import (
"context"
types4 "github.com/filecoin-project/venus-market/types"
"time"

"github.com/google/uuid"
Expand Down
114 changes: 77 additions & 37 deletions app/venus-sealer/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"crypto/rand"
"encoding/json"
"fmt"
"github.com/filecoin-project/venus-market/piecestorage"
"io/ioutil"
"os"
"path/filepath"
Expand Down Expand Up @@ -130,20 +131,28 @@ var initCmd = &cli.Command{
Usage: "gateway token",
},

&cli.StringFlag{
Name: "market-url",
Usage: "market url",
},
&cli.StringFlag{
Name: "market-token",
Usage: "market token",
},

&cli.StringFlag{
Name: "auth-token",
Usage: "auth token",
},

&cli.StringFlag{
Name: "piecestorage",
Usage: "config storage for piece (eg fs:/mnt/piece s3:{access key}:{secret key}:{option token}@{region}host/{bucket}",
},
},
Action: func(cctx *cli.Context) error {
ctx := api.ReqContext(cctx)
log.Info("Initializing venus sealer")

sectorSizeInt, err := units.RAMInBytes(cctx.String("sector-size"))
if err != nil {
return err
}
ssize := abi.SectorSize(sectorSizeInt)

gasPrice, err := types.BigFromString(cctx.String("gas-premium"))
if err != nil {
return xerrors.Errorf("failed to parse gas-price flag: %s", err)
Expand All @@ -160,8 +169,11 @@ var initCmd = &cli.Command{
return err
}

setAuthToken(cctx)
parseFlag(defaultCfg, cctx)
setAuthToken(defaultCfg, cctx)
err = parseFlag(defaultCfg, cctx)
if err != nil {
return err
}
if err := checkURL(defaultCfg); err != nil {
return err
}
Expand All @@ -176,14 +188,9 @@ var initCmd = &cli.Command{
return err
}

ctx := api.ReqContext(cctx)
if err := paramfetch.GetParams(ctx, ps, srs, uint64(ssize)); err != nil {
return xerrors.Errorf("fetching proof parameters: %w", err)
}

log.Info("Trying to connect to full node RPC")

fullNode, closer, err := api.GetFullNodeAPIV2(cctx) // TODO: consider storing full node address in config
fullNode, closer, err := api.GetFullNodeFromNodeConfig(ctx, &defaultCfg.Node) // TODO: consider storing full node address in config
if err != nil {
return err
}
Expand Down Expand Up @@ -282,7 +289,12 @@ var initCmd = &cli.Command{
}
}

if err := storageMinerInit(ctx, cctx, fullNode, messagerClient, defaultCfg, ssize, gasPrice); err != nil {
ssize, err := units.RAMInBytes(cctx.String("sector-size"))
if err != nil {
return fmt.Errorf("failed to parse sector size: %w", err)
}
minerAddr, err := storageMinerInit(ctx, cctx, fullNode, messagerClient, defaultCfg, abi.SectorSize(ssize), gasPrice)
if err != nil {
log.Errorf("Failed to initialize venus-miner: %+v", err)
path, err := homedir.Expand(defaultCfg.DataDir)
if err != nil {
Expand All @@ -295,23 +307,31 @@ var initCmd = &cli.Command{
return xerrors.Errorf("Storage-miner init failed")
}

minerInfo, err := fullNode.StateMinerInfo(ctx, minerAddr, types.EmptyTSK)
if err != nil {
return err
}
// TODO: Point to setting storage price, maybe do it interactively or something
log.Info("Sealer successfully created, you can now start it with 'venus-sealer run'")

if err := paramfetch.GetParams(ctx, ps, srs, uint64(minerInfo.SectorSize)); err != nil {
return xerrors.Errorf("fetching proof parameters: %w", err)
}
return nil
},
}

func setAuthToken(cctx *cli.Context) {
func setAuthToken(cfg *config.StorageMiner, cctx *cli.Context) {
if cctx.IsSet("auth-token") {
authToken := cctx.String("auth-token")
_ = cctx.Set("node-token", authToken)
_ = cctx.Set("messager-token", authToken)
_ = cctx.Set("gateway-token", authToken)
cfg.Node.Token = authToken
cfg.Messager.Token = authToken
cfg.RegisterProof.Token = authToken
cfg.RegisterMarket.Token = authToken
cfg.RegisterMarket.Token = authToken
}
}

func parseFlag(cfg *config.StorageMiner, cctx *cli.Context) {
func parseFlag(cfg *config.StorageMiner, cctx *cli.Context) error {
cfg.DataDir = cctx.String("repo")

if cctx.IsSet("messager-url") {
Expand All @@ -326,6 +346,11 @@ func parseFlag(cfg *config.StorageMiner, cctx *cli.Context) {
cfg.RegisterProof.Urls = cctx.StringSlice("gateway-url")
}

if cctx.IsSet("market-url") {
cfg.RegisterMarket.Urls = []string{cctx.String("market-url")}
cfg.Market.Url = cctx.String("market-url")
}

if cctx.IsSet("node-token") {
cfg.Node.Token = cctx.String("node-token")
}
Expand All @@ -337,6 +362,21 @@ func parseFlag(cfg *config.StorageMiner, cctx *cli.Context) {
if cctx.IsSet("gateway-token") {
cfg.RegisterProof.Token = cctx.String("gateway-token")
}

if cctx.IsSet("market-token") {
cfg.Market.Token = cctx.String("market-token")
cfg.RegisterMarket.Token = cctx.String("market-token")
}

if cctx.IsSet("piecestorage") {
pieceStorage, err := piecestorage.ParserProtocol(cctx.String("piecestorage"))
if err != nil {
return err
}

cfg.PieceStorage = pieceStorage
}
return nil
}

func parseMultiAddr(url string) error {
Expand Down Expand Up @@ -367,76 +407,76 @@ func checkURL(cfg *config.StorageMiner) error {
return nil
}

func storageMinerInit(ctx context.Context, cctx *cli.Context, api api.FullNode, messagerClient api.IMessager, cfg *config.StorageMiner, ssize abi.SectorSize, gasPrice types.BigInt) error {
func storageMinerInit(ctx context.Context, cctx *cli.Context, api api.FullNode, messagerClient api.IMessager, cfg *config.StorageMiner, ssize abi.SectorSize, gasPrice types.BigInt) (address.Address, error) {
log.Info("Initializing libp2p identity")

repo, err := models.SetDataBase(config.HomeDir(cfg.DataDir), &cfg.DB)
if err != nil {
return err
return address.Undef, err
}
err = repo.AutoMigrate()
if err != nil {
return err
return address.Undef, err
}

metaDataService := service.NewMetadataService(repo)
sectorInfoService := service.NewSectorInfoService(repo)
p2pSk, _, err := crypto.GenerateEd25519Key(rand.Reader)
if err != nil {
return xerrors.Errorf("make host key: %w", err)
return address.Undef, xerrors.Errorf("make host key: %w", err)
}

peerid, err := peer.IDFromPrivateKey(p2pSk)
if err != nil {
return xerrors.Errorf("peer ID from private key: %w", err)
return address.Undef, xerrors.Errorf("peer ID from private key: %w", err)
}

var addr address.Address
if act := cctx.String("actor"); act != "" {
a, err := address.NewFromString(act)
if err != nil {
return xerrors.Errorf("failed parsing actor flag value (%q): %w", act, err)
return address.Undef, xerrors.Errorf("failed parsing actor flag value (%q): %w", act, err)
}

if cctx.Bool("genesis-miner") {
if err := metaDataService.SaveMinerAddress(a); err != nil {
return err
return address.Undef, err
}

if pssb := cctx.String("pre-sealed-metadata"); pssb != "" {
pssb, err := homedir.Expand(pssb)
if err != nil {
return err
return address.Undef, err
}

log.Infof("Importing pre-sealed sector metadata for %s", a)

if err := migratePreSealMeta(ctx, api, pssb, a, metaDataService, sectorInfoService); err != nil {
return xerrors.Errorf("migrating presealed sector metadata: %w", err)
return address.Undef, xerrors.Errorf("migrating presealed sector metadata: %w", err)
}
}

return nil
return a, nil
}

if pssb := cctx.String("pre-sealed-metadata"); pssb != "" {
pssb, err := homedir.Expand(pssb)
if err != nil {
return err
return address.Undef, err
}

log.Infof("Importing pre-sealed sector metadata for %s", a)

if err := migratePreSealMeta(ctx, api, pssb, a, metaDataService, sectorInfoService); err != nil {
return xerrors.Errorf("migrating presealed sector metadata: %w", err)
return address.Undef, xerrors.Errorf("migrating presealed sector metadata: %w", err)
}
}

addr = a
} else {
a, err := createStorageMiner(ctx, api, messagerClient, peerid, gasPrice, cctx)
if err != nil {
return xerrors.Errorf("creating miner failed: %w", err)
return address.Undef, xerrors.Errorf("creating miner failed: %w", err)
}

addr = a
Expand All @@ -445,10 +485,10 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api api.FullNode,
log.Infof("Created new miner: %s", addr)

if err := metaDataService.SaveMinerAddress(addr); err != nil {
return err
return address.Undef, err
}

return nil
return addr, nil
}

func createStorageMiner(ctx context.Context, nodeAPI api.FullNode, messagerClient api.IMessager, peerid peer.ID, gasPrice types.BigInt, cctx *cli.Context) (address.Address, error) {
Expand Down
4 changes: 4 additions & 0 deletions builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"github.com/filecoin-project/go-state-types/abi"
storage2 "github.com/filecoin-project/specs-storage/storage"
api2 "github.com/filecoin-project/venus-market/api"
config2 "github.com/filecoin-project/venus-market/config"
"github.com/filecoin-project/venus-market/piecestorage"
"github.com/filecoin-project/venus-sealer/api"
"github.com/filecoin-project/venus-sealer/api/impl"
"github.com/filecoin-project/venus-sealer/config"
Expand Down Expand Up @@ -167,6 +169,7 @@ func Repo(cfg *config.StorageMiner) Option {
Override(new(sectorstorage.SealerConfig), cfg.Storage),
Override(new(*storage.AddressSelector), AddressSelector(&cfg.Addresses)),
Override(new(*config.DbConfig), &cfg.DB),
Override(new(*config2.PieceStorage), &cfg.PieceStorage),
Override(new(*config.StorageMiner), cfg),
Override(new(*config.MessagerConfig), &cfg.Messager),
Override(new(*config.MarketConfig), &cfg.Market),
Expand All @@ -176,6 +179,7 @@ func Repo(cfg *config.StorageMiner) Option {

Override(new(api.IMessager), api.NewMessageRPC),
Override(new(api2.MarketFullNode), api.NewMarketRPC),
Override(new(piecestorage.IPieceStorage), NewPieceStorage),
Override(new(*market_client.MarketEventClient), market_client.NewMarketEventClient),
Override(new(*proof_client.ProofEventClient), proof_client.NewProofEventClient),
Override(new(repo.Repo), models.SetDataBase),
Expand Down
28 changes: 15 additions & 13 deletions config/def.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package config

import (
"encoding"
"github.com/filecoin-project/venus-market/config"
"net/http"
"strings"
"time"
Expand All @@ -25,7 +26,6 @@ const (
RetrievalPricingExternalMode = "external"
)


type HomeDir string

type StorageWorker struct {
Expand All @@ -41,19 +41,21 @@ func (cfg StorageWorker) LocalStorage() *LocalStorage {

// StorageMiner is a miner config
type StorageMiner struct {
DataDir string
API API
Dealmaking DealmakingConfig
Sealing SealingConfig
Storage sectorstorage.SealerConfig
Fees MinerFeeConfig
Addresses MinerAddressConfig
NetParams NetParamsConfig
DB DbConfig
Node NodeConfig
JWT JWTConfig
Messager MessagerConfig
DataDir string
API API
Dealmaking DealmakingConfig
Sealing SealingConfig
Storage sectorstorage.SealerConfig
Fees MinerFeeConfig
Addresses MinerAddressConfig
NetParams NetParamsConfig
DB DbConfig
Node NodeConfig
JWT JWTConfig
Messager MessagerConfig

Market MarketConfig
PieceStorage config.PieceStorage
RegisterProof RegisterProofConfig
RegisterMarket RegisterMarketConfig

Expand Down
Loading

0 comments on commit d313bbc

Please sign in to comment.