Skip to content

Commit

Permalink
Merge pull request #6259 from filecoin-project/raulk/docs-storage
Browse files Browse the repository at this point in the history
`storage` module: add go docs and minor code quality refactors
  • Loading branch information
magik6k authored May 20, 2021
2 parents 9ddeb53 + 996feda commit 5f2e163
Show file tree
Hide file tree
Showing 10 changed files with 195 additions and 70 deletions.
11 changes: 11 additions & 0 deletions chain/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -853,6 +853,14 @@ func (cs *ChainStore) NearestCommonAncestor(a, b *types.TipSet) (*types.TipSet,
return cs.LoadTipSet(l[len(l)-1].Parents())
}

// ReorgOps takes two tipsets (which can be at different heights), and walks
// their corresponding chains backwards one step at a time until we find
// a common ancestor. It then returns the respective chain segments that fork
// from the identified ancestor, in reverse order, where the first element of
// each slice is the supplied tipset, and the last element is the common
// ancestor.
//
// If an error happens along the way, we return the error with nil slices.
func (cs *ChainStore) ReorgOps(a, b *types.TipSet) ([]*types.TipSet, []*types.TipSet, error) {
return ReorgOps(cs.LoadTipSet, a, b)
}
Expand Down Expand Up @@ -1235,6 +1243,9 @@ func (cs *ChainStore) ReadMsgMetaCids(mmc cid.Cid) ([]cid.Cid, []cid.Cid, error)
return blscids, secpkcids, nil
}

// GetPath returns the sequence of atomic head change operations that
// need to be applied in order to switch the head of the chain from the `from`
// tipset to the `to` tipset.
func (cs *ChainStore) GetPath(ctx context.Context, from types.TipSetKey, to types.TipSetKey) ([]*api.HeadChange, error) {
fts, err := cs.LoadTipSet(from)
if err != nil {
Expand Down
5 changes: 4 additions & 1 deletion extern/storage-sealing/precommit_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ type BasicPreCommitPolicy struct {
duration abi.ChainEpoch
}

// NewBasicPreCommitPolicy produces a BasicPreCommitPolicy
// NewBasicPreCommitPolicy produces a BasicPreCommitPolicy.
//
// The provided duration is used as the default sector expiry when the sector
// contains no deals. The proving boundary is used to adjust/align the sector's expiration.
func NewBasicPreCommitPolicy(api Chain, duration abi.ChainEpoch, provingBoundary abi.ChainEpoch) BasicPreCommitPolicy {
return BasicPreCommitPolicy{
api: api,
Expand Down
4 changes: 2 additions & 2 deletions storage/adapter_storage_miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ import (
var _ sealing.SealingAPI = new(SealingAPIAdapter)

type SealingAPIAdapter struct {
delegate storageMinerApi
delegate fullNodeFilteredAPI
}

func NewSealingAPIAdapter(api storageMinerApi) SealingAPIAdapter {
func NewSealingAPIAdapter(api fullNodeFilteredAPI) SealingAPIAdapter {
return SealingAPIAdapter{delegate: api}
}

Expand Down
75 changes: 59 additions & 16 deletions storage/miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@ import (
"errors"
"time"

"github.com/filecoin-project/go-state-types/network"

"github.com/filecoin-project/go-state-types/dline"

"github.com/filecoin-project/go-bitfield"

"github.com/ipfs/go-cid"
Expand All @@ -20,9 +16,13 @@ import (
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/go-state-types/dline"
"github.com/filecoin-project/go-state-types/network"

"github.com/filecoin-project/specs-storage/storage"

sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/specs-storage/storage"

"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/v1api"
Expand All @@ -41,8 +41,16 @@ import (

var log = logging.Logger("storageminer")

// Miner is the central miner entrypoint object inside Lotus. It is
// instantiated in the node builder, along with the WindowPoStScheduler.
//
// This object is the owner of the sealing pipeline. Most of the actual logic
// lives in the storage-sealing module (sealing.Sealing), and the Miner object
// exposes it to the rest of the system by proxying calls.
//
// Miner#Run starts the sealing FSM.
type Miner struct {
api storageMinerApi
api fullNodeFilteredAPI
feeCfg config.MinerFeeConfig
h host.Host
sealer sectorstorage.SectorManager
Expand Down Expand Up @@ -70,7 +78,9 @@ type SealingStateEvt struct {
Error string
}

type storageMinerApi interface {
// fullNodeFilteredAPI is the subset of the full node API the Miner needs from
// a Lotus full node.
type fullNodeFilteredAPI interface {
// Call a read only method on actors (no interaction with the chain required)
StateCall(context.Context, *types.Message, types.TipSetKey) (*api.InvocResult, error)
StateMinerSectors(context.Context, address.Address, *bitfield.BitField, types.TipSetKey) ([]*miner.SectorOnChainInfo, error)
Expand Down Expand Up @@ -116,7 +126,18 @@ type storageMinerApi interface {
WalletHas(context.Context, address.Address) (bool, error)
}

func NewMiner(api storageMinerApi, maddr address.Address, h host.Host, ds datastore.Batching, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingConfigFunc, feeCfg config.MinerFeeConfig, journal journal.Journal, as *AddressSelector) (*Miner, error) {
// NewMiner creates a new Miner object.
func NewMiner(api fullNodeFilteredAPI,
maddr address.Address,
h host.Host,
ds datastore.Batching,
sealer sectorstorage.SectorManager,
sc sealing.SectorIDCounter,
verif ffiwrapper.Verifier,
gsd dtypes.GetSealingConfigFunc,
feeCfg config.MinerFeeConfig,
journal journal.Journal,
as *AddressSelector) (*Miner, error) {
m := &Miner{
api: api,
feeCfg: feeCfg,
Expand All @@ -136,6 +157,7 @@ func NewMiner(api storageMinerApi, maddr address.Address, h host.Host, ds datast
return m, nil
}

// Run starts the sealing FSM in the background, running preliminary checks first.
func (m *Miner) Run(ctx context.Context) error {
if err := m.runPreflightChecks(ctx); err != nil {
return xerrors.Errorf("miner preflight checks failed: %w", err)
Expand All @@ -152,17 +174,37 @@ func (m *Miner) Run(ctx context.Context) error {
MaxTerminateGasFee: abi.TokenAmount(m.feeCfg.MaxTerminateGasFee),
}

evts := events.NewEvents(ctx, m.api)
adaptedAPI := NewSealingAPIAdapter(m.api)
// TODO: Maybe we update this policy after actor upgrades?
pcp := sealing.NewBasicPreCommitPolicy(adaptedAPI, policy.GetMaxSectorExpirationExtension()-(md.WPoStProvingPeriod*2), md.PeriodStart%md.WPoStProvingPeriod)
var (
// consumer of chain head changes.
evts = events.NewEvents(ctx, m.api)
evtsAdapter = NewEventsAdapter(evts)

as := func(ctx context.Context, mi miner.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) {
return m.addrSel.AddressFor(ctx, m.api, mi, use, goodFunds, minFunds)
}
// Create a shim to glue the API required by the sealing component
// with the API that Lotus is capable of providing.
// The shim translates between "tipset tokens" and tipset keys, and
// provides extra methods.
adaptedAPI = NewSealingAPIAdapter(m.api)

// Instantiate a precommit policy.
defaultDuration = policy.GetMaxSectorExpirationExtension() - (md.WPoStProvingPeriod * 2)
provingBoundary = md.PeriodStart % md.WPoStProvingPeriod

// TODO: Maybe we update this policy after actor upgrades?
pcp = sealing.NewBasicPreCommitPolicy(adaptedAPI, defaultDuration, provingBoundary)

// address selector.
as = func(ctx context.Context, mi miner.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) {
return m.addrSel.AddressFor(ctx, m.api, mi, use, goodFunds, minFunds)
}

// sealing configuration.
cfg = sealing.GetSealingConfigFunc(m.getSealConfig)
)

m.sealing = sealing.New(adaptedAPI, fc, NewEventsAdapter(evts), m.maddr, m.ds, m.sealer, m.sc, m.verif, &pcp, sealing.GetSealingConfigFunc(m.getSealConfig), m.handleSealingNotifications, as)
// Instantiate the sealing FSM.
m.sealing = sealing.New(adaptedAPI, fc, evtsAdapter, m.maddr, m.ds, m.sealer, m.sc, m.verif, &pcp, cfg, m.handleSealingNotifications, as)

// Run the sealing FSM.
go m.sealing.Run(ctx) //nolint:errcheck // logged intside the function

return nil
Expand All @@ -184,6 +226,7 @@ func (m *Miner) Stop(ctx context.Context) error {
return m.sealing.Stop(ctx)
}

// runPreflightChecks verifies that preconditions to run the miner are satisfied.
func (m *Miner) runPreflightChecks(ctx context.Context) error {
mi, err := m.api.StateMinerInfo(ctx, m.maddr, types.EmptyTSK)
if err != nil {
Expand Down
File renamed without changes.
23 changes: 13 additions & 10 deletions storage/wdpost_changehandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,25 @@ const (
type CompleteGeneratePoSTCb func(posts []miner.SubmitWindowedPoStParams, err error)
type CompleteSubmitPoSTCb func(err error)

type changeHandlerAPI interface {
// wdPoStCommands is the subset of the WindowPoStScheduler + full node APIs used
// by the changeHandler to execute actions and query state.
type wdPoStCommands interface {
StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*dline.Info, error)

startGeneratePoST(ctx context.Context, ts *types.TipSet, deadline *dline.Info, onComplete CompleteGeneratePoSTCb) context.CancelFunc
startSubmitPoST(ctx context.Context, ts *types.TipSet, deadline *dline.Info, posts []miner.SubmitWindowedPoStParams, onComplete CompleteSubmitPoSTCb) context.CancelFunc
onAbort(ts *types.TipSet, deadline *dline.Info)
failPost(err error, ts *types.TipSet, deadline *dline.Info)
recordPoStFailure(err error, ts *types.TipSet, deadline *dline.Info)
}

type changeHandler struct {
api changeHandlerAPI
api wdPoStCommands
actor address.Address
proveHdlr *proveHandler
submitHdlr *submitHandler
}

func newChangeHandler(api changeHandlerAPI, actor address.Address) *changeHandler {
func newChangeHandler(api wdPoStCommands, actor address.Address) *changeHandler {
posts := newPostsCache()
p := newProver(api, posts)
s := newSubmitter(api, posts)
Expand Down Expand Up @@ -146,7 +149,7 @@ type postResult struct {

// proveHandler generates proofs
type proveHandler struct {
api changeHandlerAPI
api wdPoStCommands
posts *postsCache

postResults chan *postResult
Expand All @@ -163,7 +166,7 @@ type proveHandler struct {
}

func newProver(
api changeHandlerAPI,
api wdPoStCommands,
posts *postsCache,
) *proveHandler {
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -248,7 +251,7 @@ func (p *proveHandler) processPostResult(res *postResult) {
di := res.currPost.di
if res.err != nil {
// Proving failed so inform the API
p.api.failPost(res.err, res.ts, di)
p.api.recordPoStFailure(res.err, res.ts, di)
log.Warnf("Aborted window post Proving (Deadline: %+v)", di)
p.api.onAbort(res.ts, di)

Expand Down Expand Up @@ -295,7 +298,7 @@ type postInfo struct {

// submitHandler submits proofs on-chain
type submitHandler struct {
api changeHandlerAPI
api wdPoStCommands
posts *postsCache

submitResults chan *submitResult
Expand All @@ -319,7 +322,7 @@ type submitHandler struct {
}

func newSubmitter(
api changeHandlerAPI,
api wdPoStCommands,
posts *postsCache,
) *submitHandler {
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -488,7 +491,7 @@ func (s *submitHandler) submitIfReady(ctx context.Context, advance *types.TipSet
func (s *submitHandler) processSubmitResult(res *submitResult) {
if res.err != nil {
// Submit failed so inform the API and go back to the start state
s.api.failPost(res.err, res.pw.ts, res.pw.di)
s.api.recordPoStFailure(res.err, res.pw.ts, res.pw.di)
log.Warnf("Aborted window post Submitting (Deadline: %+v)", res.pw.di)
s.api.onAbort(res.pw.ts, res.pw.di)

Expand Down
2 changes: 1 addition & 1 deletion storage/wdpost_changehandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (m *mockAPI) wasAbortCalled() bool {
return m.abortCalled
}

func (m *mockAPI) failPost(err error, ts *types.TipSet, deadline *dline.Info) {
func (m *mockAPI) recordPoStFailure(err error, ts *types.TipSet, deadline *dline.Info) {
}

func (m *mockAPI) setChangeHandler(ch *changeHandler) {
Expand Down
Loading

0 comments on commit 5f2e163

Please sign in to comment.