storage module: add go docs and minor code quality refactors #6259

Merged · 13 commits · May 20, 2021
11 changes: 11 additions & 0 deletions chain/store/store.go
@@ -853,6 +853,14 @@ func (cs *ChainStore) NearestCommonAncestor(a, b *types.TipSet) (*types.TipSet,
return cs.LoadTipSet(l[len(l)-1].Parents())
}

// ReorgOps takes two tipsets (which can be at different heights), and walks
// their corresponding chains backwards one step at a time until we find
// a common ancestor. It then returns the respective chain segments that fork
// from the identified ancestor, in reverse order, where the first element of
// each slice is the supplied tipset, and the last element is the common
// ancestor.
//
// If an error happens along the way, we return the error with nil slices.
func (cs *ChainStore) ReorgOps(a, b *types.TipSet) ([]*types.TipSet, []*types.TipSet, error) {
return ReorgOps(cs.LoadTipSet, a, b)
}
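For context, a minimal sketch of the walk the new doc comment describes, written as a standalone function taking a loader shaped like ChainStore.LoadTipSet. This is illustrative only; the package-level ReorgOps delegated to above may differ in details such as whether the ancestor itself is included.

func reorgOpsSketch(load func(types.TipSetKey) (*types.TipSet, error), a, b *types.TipSet) ([]*types.TipSet, []*types.TipSet, error) {
	left, right := a, b
	var leftChain, rightChain []*types.TipSet
	for !left.Equals(right) {
		// Step back whichever branch is higher; at equal heights the
		// two branches alternate steps until they converge.
		if left.Height() > right.Height() {
			leftChain = append(leftChain, left)
			par, err := load(left.Parents())
			if err != nil {
				return nil, nil, err
			}
			left = par
		} else {
			rightChain = append(rightChain, right)
			par, err := load(right.Parents())
			if err != nil {
				return nil, nil, err
			}
			right = par
		}
	}
	return leftChain, rightChain, nil
}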
@@ -1235,6 +1243,9 @@ func (cs *ChainStore) ReadMsgMetaCids(mmc cid.Cid) ([]cid.Cid, []cid.Cid, error)
return blscids, secpkcids, nil
}

// GetPath returns the sequence of atomic head change operations that
// need to be applied in order to switch the head of the chain from the `from`
// tipset to the `to` tipset.
func (cs *ChainStore) GetPath(ctx context.Context, from types.TipSetKey, to types.TipSetKey) ([]*api.HeadChange, error) {
fts, err := cs.LoadTipSet(from)
if err != nil {
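The rest of GetPath is folded out of this excerpt, but it can be assembled from ReorgOps: revert the segment leading back from `from`, then apply the segment toward `to` in chronological order. A hedged sketch, assuming fts and tts are the loaded from/to tipsets and using the HCRevert/HCApply head-change types:

revert, apply, err := cs.ReorgOps(fts, tts)
if err != nil {
	return nil, xerrors.Errorf("error getting tipset branches: %w", err)
}
path := make([]*api.HeadChange, len(revert)+len(apply))
for i, ts := range revert {
	path[i] = &api.HeadChange{Type: HCRevert, Val: ts}
}
for j, ts := range apply {
	// apply comes back newest-first, so reverse it into apply order.
	path[len(path)-1-j] = &api.HeadChange{Type: HCApply, Val: ts}
}
return path, nil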
5 changes: 4 additions & 1 deletion extern/storage-sealing/precommit_policy.go
@@ -40,7 +40,10 @@ type BasicPreCommitPolicy struct {
duration abi.ChainEpoch
}

// NewBasicPreCommitPolicy produces a BasicPreCommitPolicy
// NewBasicPreCommitPolicy produces a BasicPreCommitPolicy.
//
// The provided duration is used as the default sector expiry when the sector
// contains no deals. The proving boundary is used to adjust/align the sector's expiration.
func NewBasicPreCommitPolicy(api Chain, duration abi.ChainEpoch, provingBoundary abi.ChainEpoch) BasicPreCommitPolicy {
return BasicPreCommitPolicy{
api: api,
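To illustrate the alignment the new doc comment mentions: a tentative expiration epoch can be pushed forward so it lands one epoch short of the miner's next proving boundary. A hedged sketch of that arithmetic, not the exact policy code:

// alignExpiration bumps end forward to the epoch just before the next
// proving boundary, so the sector expires right at a deadline edge.
// The inputs and formula here are illustrative.
func alignExpiration(end, provingPeriod, provingBoundary abi.ChainEpoch) abi.ChainEpoch {
	return end + provingPeriod - (end % provingPeriod) + provingBoundary - 1
}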
4 changes: 2 additions & 2 deletions storage/adapter_storage_miner.go
@@ -31,10 +31,10 @@ import (
var _ sealing.SealingAPI = new(SealingAPIAdapter)

type SealingAPIAdapter struct {
delegate storageMinerApi
delegate fullNodeFilteredAPI
}

func NewSealingAPIAdapter(api storageMinerApi) SealingAPIAdapter {
func NewSealingAPIAdapter(api fullNodeFilteredAPI) SealingAPIAdapter {
return SealingAPIAdapter{delegate: api}
}

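The rename makes the adapter's role explicit: it wraps the filtered full-node API so it satisfies sealing.SealingAPI. One hedged example of the kind of translation it performs, converting tipset keys into the opaque "tipset tokens" the sealing module works with (the method shape is assumed, not taken from this diff):

func (s SealingAPIAdapter) ChainHead(ctx context.Context) (sealing.TipSetToken, abi.ChainEpoch, error) {
	head, err := s.delegate.ChainHead(ctx)
	if err != nil {
		return nil, 0, err
	}
	// A tipset token is just the serialized tipset key.
	return head.Key().Bytes(), head.Height(), nil
}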
75 changes: 59 additions & 16 deletions storage/miner.go
@@ -5,10 +5,6 @@ import (
"errors"
"time"

"github.com/filecoin-project/go-state-types/network"

"github.com/filecoin-project/go-state-types/dline"

"github.com/filecoin-project/go-bitfield"

"github.com/ipfs/go-cid"
@@ -20,9 +16,13 @@ import (
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/go-state-types/dline"
"github.com/filecoin-project/go-state-types/network"

"github.com/filecoin-project/specs-storage/storage"

sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/specs-storage/storage"

"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/v1api"
@@ -41,8 +41,16 @@

var log = logging.Logger("storageminer")

// Miner is the central miner entrypoint object inside Lotus. It is
// instantiated in the node builder, along with the WindowPoStScheduler.
//
// This object is the owner of the sealing pipeline. Most of the actual logic
// lives in the storage-sealing module (sealing.Sealing), and the Miner object
// exposes it to the rest of the system by proxying calls.
[Contributor comment on lines +47 to +49: This is mostly tech debt, but it's really tricky to untangle storage-sealing from this object, which is why it's here.]
//
// Miner#Run starts the sealing FSM.
type Miner struct {
api storageMinerApi
api fullNodeFilteredAPI
feeCfg config.MinerFeeConfig
h host.Host
sealer sectorstorage.SectorManager
@@ -70,7 +78,9 @@ type SealingStateEvt struct {
Error string
}

type storageMinerApi interface {
// fullNodeFilteredAPI is the subset of the full node API the Miner needs from
// a Lotus full node.
type fullNodeFilteredAPI interface {
// Call a read only method on actors (no interaction with the chain required)
StateCall(context.Context, *types.Message, types.TipSetKey) (*api.InvocResult, error)
StateMinerSectors(context.Context, address.Address, *bitfield.BitField, types.TipSetKey) ([]*miner.SectorOnChainInfo, error)
@@ -116,7 +126,18 @@ type storageMinerApi interface {
WalletHas(context.Context, address.Address) (bool, error)
}
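Because fullNodeFilteredAPI is a strict subset of the full node API, any full-node client satisfies it automatically. A hypothetical compile-time check would make that relationship explicit (not part of this PR):

// Fails to compile if the full node API ever stops covering the subset.
var _ fullNodeFilteredAPI = v1api.FullNode(nil)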

func NewMiner(api storageMinerApi, maddr address.Address, h host.Host, ds datastore.Batching, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingConfigFunc, feeCfg config.MinerFeeConfig, journal journal.Journal, as *AddressSelector) (*Miner, error) {
// NewMiner creates a new Miner object.
func NewMiner(api fullNodeFilteredAPI,
maddr address.Address,
h host.Host,
ds datastore.Batching,
sealer sectorstorage.SectorManager,
sc sealing.SectorIDCounter,
verif ffiwrapper.Verifier,
gsd dtypes.GetSealingConfigFunc,
feeCfg config.MinerFeeConfig,
journal journal.Journal,
as *AddressSelector) (*Miner, error) {
m := &Miner{
api: api,
feeCfg: feeCfg,
@@ -136,6 +157,7 @@ func NewMiner(api storageMinerApi, maddr address.Address, h host.Host, ds datast
return m, nil
}
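A hypothetical wiring sketch (in practice the node builder performs this through dependency injection; the variable names here are placeholders):

m, err := storage.NewMiner(fullNode, maddr, host, ds, sealer, sc, verif, getSealCfg, feeCfg, jrnl, addrSel)
if err != nil {
	return xerrors.Errorf("creating miner: %w", err)
}
// Start the sealing FSM; Run performs the preflight checks first.
if err := m.Run(ctx); err != nil {
	return xerrors.Errorf("starting miner: %w", err)
}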

// Run starts the sealing FSM in the background, running preliminary checks first.
func (m *Miner) Run(ctx context.Context) error {
if err := m.runPreflightChecks(ctx); err != nil {
return xerrors.Errorf("miner preflight checks failed: %w", err)
@@ -152,17 +174,37 @@ func (m *Miner) Run(ctx context.Context) error {
MaxTerminateGasFee: abi.TokenAmount(m.feeCfg.MaxTerminateGasFee),
}

evts := events.NewEvents(ctx, m.api)
adaptedAPI := NewSealingAPIAdapter(m.api)
// TODO: Maybe we update this policy after actor upgrades?
pcp := sealing.NewBasicPreCommitPolicy(adaptedAPI, policy.GetMaxSectorExpirationExtension()-(md.WPoStProvingPeriod*2), md.PeriodStart%md.WPoStProvingPeriod)
var (
// consumer of chain head changes.
evts = events.NewEvents(ctx, m.api)
evtsAdapter = NewEventsAdapter(evts)

as := func(ctx context.Context, mi miner.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) {
return m.addrSel.AddressFor(ctx, m.api, mi, use, goodFunds, minFunds)
}
// Create a shim to glue the API required by the sealing component
// with the API that Lotus is capable of providing.
// The shim translates between "tipset tokens" and tipset keys, and
// provides extra methods.
adaptedAPI = NewSealingAPIAdapter(m.api)

// Instantiate a precommit policy.
defaultDuration = policy.GetMaxSectorExpirationExtension() - (md.WPoStProvingPeriod * 2)
[Contributor comment: Suggest adding a comment to explain this calculation.]
provingBoundary = md.PeriodStart % md.WPoStProvingPeriod

// TODO: Maybe we update this policy after actor upgrades?
pcp = sealing.NewBasicPreCommitPolicy(adaptedAPI, defaultDuration, provingBoundary)

// address selector.
as = func(ctx context.Context, mi miner.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) {
[Contributor comment: Suggest adding a comment explaining what the address selector does.]
return m.addrSel.AddressFor(ctx, m.api, mi, use, goodFunds, minFunds)
}

// sealing configuration.
cfg = sealing.GetSealingConfigFunc(m.getSealConfig)
)

m.sealing = sealing.New(adaptedAPI, fc, NewEventsAdapter(evts), m.maddr, m.ds, m.sealer, m.sc, m.verif, &pcp, sealing.GetSealingConfigFunc(m.getSealConfig), m.handleSealingNotifications, as)
// Instantiate the sealing FSM.
m.sealing = sealing.New(adaptedAPI, fc, evtsAdapter, m.maddr, m.ds, m.sealer, m.sc, m.verif, &pcp, cfg, m.handleSealingNotifications, as)

// Run the sealing FSM.
go m.sealing.Run(ctx) //nolint:errcheck // logged inside the function

return nil
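On the reviewer's suggestion above, a worked example of the defaultDuration calculation with illustrative mainnet-like numbers (30s epochs assumed, values approximate):

// WPoStProvingPeriod ≈ 2880 epochs (one day at 30s epochs), and
// GetMaxSectorExpirationExtension() ≈ 540 days ≈ 1,555,200 epochs, so:
//
//   defaultDuration = 1,555,200 - (2880 * 2) = 1,549,440 epochs
//
// i.e. the default expiry backs off two proving periods from the hard
// maximum, leaving headroom for the boundary alignment the precommit
// policy performs.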
@@ -184,6 +226,7 @@ func (m *Miner) Stop(ctx context.Context) error {
return m.sealing.Stop(ctx)
}

// runPreflightChecks verifies that preconditions to run the miner are satisfied.
func (m *Miner) runPreflightChecks(ctx context.Context) error {
mi, err := m.api.StateMinerInfo(ctx, m.maddr, types.EmptyTSK)
if err != nil {
File renamed without changes.
1 change: 1 addition & 0 deletions storage/wdpost_changehandler.go
@@ -23,6 +23,7 @@ type CompleteSubmitPoSTCb func(err error)

type changeHandlerAPI interface {
StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*dline.Info, error)

startGeneratePoST(ctx context.Context, ts *types.TipSet, deadline *dline.Info, onComplete CompleteGeneratePoSTCb) context.CancelFunc
startSubmitPoST(ctx context.Context, ts *types.TipSet, deadline *dline.Info, posts []miner.SubmitWindowedPoStParams, onComplete CompleteSubmitPoSTCb) context.CancelFunc
onAbort(ts *types.TipSet, deadline *dline.Info)
82 changes: 65 additions & 17 deletions storage/wdpost_run.go
@@ -31,6 +31,7 @@
"github.com/filecoin-project/lotus/chain/types"
)

// failPost records a failure in the journal.
func (s *WindowPoStScheduler) failPost(err error, ts *types.TipSet, deadline *dline.Info) {
s.journal.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} {
c := evtCommon{Error: err}
@@ -167,7 +168,7 @@ func (s *WindowPoStScheduler) runSubmitPoST(
commRand, err := s.api.ChainGetRandomnessFromTickets(ctx, ts.Key(), crypto.DomainSeparationTag_PoStChainCommit, commEpoch, nil)
if err != nil {
err = xerrors.Errorf("failed to get chain randomness from tickets for windowPost (ts=%d; deadline=%d): %w", ts.Height(), commEpoch, err)
log.Errorf("submitPost failed: %+v", err)
log.Errorf("submitPoStMessage failed: %+v", err)

return err
}
@@ -180,7 +181,7 @@
post.ChainCommitRand = commRand

// Submit PoST
sm, submitErr := s.submitPost(ctx, post)
sm, submitErr := s.submitPoStMessage(ctx, post)
if submitErr != nil {
log.Errorf("submit window post failed: %+v", submitErr)
} else {
@@ -233,8 +234,25 @@ func (s *WindowPoStScheduler) checkSectors(ctx context.Context, check bitfield.B
return sbf, nil
}

func (s *WindowPoStScheduler) checkNextRecoveries(ctx context.Context, dlIdx uint64, partitions []api.Partition, tsk types.TipSetKey) ([]miner.RecoveryDeclaration, *types.SignedMessage, error) {
ctx, span := trace.StartSpan(ctx, "storage.checkNextRecoveries")
// declareRecoveries identifies sectors that were previously marked as faulty
// for our miner, but have since recovered (i.e. they are provable again) and
// have not yet been reported as such.
//
// It then reports the recovery on chain via a `DeclareFaultsRecovered`
// message to our miner actor.
//
// This is always invoked ahead of time, before the deadline for the evaluated
// sectors arrives. That way, recoveries are declared in preparation for those
// sectors to be proven.
//
// If a declaration is made, it waits for build.MessageConfidence confirmations
// on chain before returning.
//
// TODO: the waiting should happen in the background. Right now this
// is blocking/delaying the actual generation and submission of WindowPoSts in
// this deadline!
func (s *WindowPoStScheduler) declareRecoveries(ctx context.Context, dlIdx uint64, partitions []api.Partition, tsk types.TipSetKey) ([]miner.RecoveryDeclaration, *types.SignedMessage, error) {
ctx, span := trace.StartSpan(ctx, "storage.declareRecoveries")
defer span.End()

faulty := uint64(0)
@@ -302,7 +320,7 @@ func (s *WindowPoStScheduler) checkNextRecoveries(ctx context.Context, dlIdx uin
Value: types.NewInt(0),
}
spec := &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)}
if err := s.setSender(ctx, msg, spec); err != nil {
if err := s.prepareMessage(ctx, msg, spec); err != nil {
return recoveries, nil, err
}

@@ -325,8 +343,21 @@ func (s *WindowPoStScheduler) checkNextRecoveries(ctx context.Context, dlIdx uin
return recoveries, sm, nil
}
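The core of declareRecoveries is set arithmetic over bitfields, which the folded portion of the function performs per partition. A hedged sketch of that computation (variable usage assumed, building on the checkSectors helper shown above):

// Sectors worth declaring: currently faulty, minus those already
// declared recovered, filtered down to the ones that now prove OK.
unrecovered, err := bitfield.SubtractBitField(partition.FaultySectors, partition.RecoveringSectors)
if err != nil {
	return nil, nil, xerrors.Errorf("subtracting recovered set from fault set: %w", err)
}
// checkSectors keeps only the sectors that are actually provable now.
recovered, err := s.checkSectors(ctx, unrecovered, tsk)
if err != nil {
	return nil, nil, xerrors.Errorf("checking unrecovered sectors: %w", err)
}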

func (s *WindowPoStScheduler) checkNextFaults(ctx context.Context, dlIdx uint64, partitions []api.Partition, tsk types.TipSetKey) ([]miner.FaultDeclaration, *types.SignedMessage, error) {
ctx, span := trace.StartSpan(ctx, "storage.checkNextFaults")
// declareFaults identifies the sectors on the specified proving deadline that
// are faulty, and reports the faults on chain via the `DeclareFaults` message
// to our miner actor.
//
// This is always invoked ahead of time, before the deadline for the evaluated
// sectors arrives. That way, faults are declared before a penalty is accrued.
//
// If a declaration is made, it waits for build.MessageConfidence confirmations
// on chain before returning.
//
// TODO: the waiting should happen in the background. Right now this
// is blocking/delaying the actual generation and submission of WindowPoSts in
// this deadline!
func (s *WindowPoStScheduler) declareFaults(ctx context.Context, dlIdx uint64, partitions []api.Partition, tsk types.TipSetKey) ([]miner.FaultDeclaration, *types.SignedMessage, error) {
ctx, span := trace.StartSpan(ctx, "storage.declareFaults")
defer span.End()

bad := uint64(0)
@@ -387,7 +418,7 @@ func (s *WindowPoStScheduler) checkNextFaults(ctx context.Context, dlIdx uint64,
Value: types.NewInt(0), // TODO: Is there a fee?
}
spec := &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)}
if err := s.setSender(ctx, msg, spec); err != nil {
if err := s.prepareMessage(ctx, msg, spec); err != nil {
return faults, nil, err
}

@@ -410,6 +441,12 @@
return faults, sm, nil
}

// runPost runs a full cycle of the PoSt process:
//
// 1. performs recovery declarations for the next deadline.
// 2. performs fault declarations for the next deadline.
// 3. computes and submits proofs, batching partitions and making sure they
//    don't exceed message capacity (see the batching sketch after this
//    excerpt).
func (s *WindowPoStScheduler) runPost(ctx context.Context, di dline.Info, ts *types.TipSet) ([]miner.SubmitWindowedPoStParams, error) {
ctx, span := trace.StartSpan(ctx, "storage.runPost")
defer span.End()
@@ -443,7 +480,7 @@ func (s *WindowPoStScheduler) runPost(ctx context.Context, di dline.Info, ts *ty
}
)

if recoveries, sigmsg, err = s.checkNextRecoveries(context.TODO(), declDeadline, partitions, ts.Key()); err != nil {
if recoveries, sigmsg, err = s.declareRecoveries(context.TODO(), declDeadline, partitions, ts.Key()); err != nil {
// TODO: This is potentially quite bad, but not even trying to post when this fails is objectively worse
log.Errorf("checking sector recoveries: %v", err)
}
@@ -462,7 +499,7 @@ func (s *WindowPoStScheduler) runPost(ctx context.Context, di dline.Info, ts *ty
return // FORK: declaring faults after ignition upgrade makes no sense
}

if faults, sigmsg, err = s.checkNextFaults(context.TODO(), declDeadline, partitions, ts.Key()); err != nil {
if faults, sigmsg, err = s.declareFaults(context.TODO(), declDeadline, partitions, ts.Key()); err != nil {
// TODO: This is also potentially really bad, but we try to post anyways
log.Errorf("checking sector faults: %v", err)
}
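A hedged sketch of the batching referenced in step 3 of the runPost doc comment. The helper name and cap are illustrative; the real code derives the per-message cap from chain policy:

func batchPartitions(parts []api.Partition, maxPerMsg int) [][]api.Partition {
	// Split the partitions into batches so that no single
	// SubmitWindowedPoSt message exceeds the per-message cap.
	var batches [][]api.Partition
	for len(parts) > maxPerMsg {
		batches = append(batches, parts[:maxPerMsg])
		parts = parts[maxPerMsg:]
	}
	if len(parts) > 0 {
		batches = append(batches, parts)
	}
	return batches
}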
@@ -755,7 +792,10 @@ func (s *WindowPoStScheduler) sectorsForProof(ctx context.Context, goodSectors,
return proofSectors, nil
}

func (s *WindowPoStScheduler) submitPost(ctx context.Context, proof *miner.SubmitWindowedPoStParams) (*types.SignedMessage, error) {
// submitPoStMessage builds a SubmitWindowedPoSt message and submits it to
// the mpool. It doesn't synchronously block on confirmations, but it does
// monitor in the background simply for the purposes of logging.
func (s *WindowPoStScheduler) submitPoStMessage(ctx context.Context, proof *miner.SubmitWindowedPoStParams) (*types.SignedMessage, error) {
ctx, span := trace.StartSpan(ctx, "storage.commitPost")
defer span.End()

@@ -773,13 +813,11 @@ func (s *WindowPoStScheduler) submitPost(ctx context.Context, proof *miner.Submi
Value: types.NewInt(0),
}
spec := &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)}
if err := s.setSender(ctx, msg, spec); err != nil {
if err := s.prepareMessage(ctx, msg, spec); err != nil {
return nil, err
}

// TODO: consider maybe caring about the output
sm, err := s.api.MpoolPushMessage(ctx, msg, spec)

if err != nil {
return nil, xerrors.Errorf("pushing message to mpool: %w", err)
}
@@ -803,22 +841,30 @@ func (s *WindowPoStScheduler) submitPost(ctx context.Context, proof *miner.Submi
return sm, nil
}
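The background monitoring mentioned in the submitPoStMessage doc comment amounts to a fire-and-forget goroutine between the push and the return. A hedged sketch (StateWaitMsg signature simplified to the form this interface exposes):

go func() {
	// Wait for the message to land, log the outcome, never block the caller.
	rec, err := s.api.StateWaitMsg(context.TODO(), sm.Cid(), build.MessageConfidence)
	if err != nil {
		log.Error(err)
		return
	}
	if rec.Receipt.ExitCode == 0 {
		log.Infow("Window post submission successful", "cid", sm.Cid())
		return
	}
	log.Errorf("Submitting window post %s failed: exit %d", sm.Cid(), rec.Receipt.ExitCode)
}()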

func (s *WindowPoStScheduler) setSender(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) error {
// prepareMessage prepares a message before sending it, setting:
//
// * the sender (from the AddressSelector, falling back to the worker address if none set)
// * the right gas parameters
func (s *WindowPoStScheduler) prepareMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) error {
mi, err := s.api.StateMinerInfo(ctx, s.actor, types.EmptyTSK)
if err != nil {
return xerrors.Errorf("error getting miner info: %w", err)
}
// use the worker as a fallback
// set the worker as a fallback
msg.From = mi.Worker

// (optimal) initial estimation with some overestimation that guarantees
// block inclusion within the next 20 tipsets.
gm, err := s.api.GasEstimateMessageGas(ctx, msg, spec, types.EmptyTSK)
if err != nil {
log.Errorw("estimating gas", "error", err)
return nil
}
*msg = *gm

// estimate
// calculate a more frugal estimation; premium is estimated to guarantee
// inclusion within 5 tipsets, and fee cap is estimated for inclusion
// within 4 tipsets.
minGasFeeMsg := *msg

minGasFeeMsg.GasPremium, err = s.api.GasEstimateGasPremium(ctx, 5, msg.From, msg.GasLimit, types.EmptyTSK)
@@ -833,6 +879,8 @@ func (s *WindowPoStScheduler) setSender(ctx context.Context, msg *types.Message,
minGasFeeMsg.GasFeeCap = msg.GasFeeCap
}

// goodFunds = funds needed for optimal inclusion probability.
[Contributor comment: 👍 👍]
// minFunds = funds needed for more speculative inclusion probability.
goodFunds := big.Add(msg.RequiredFunds(), msg.Value)
minFunds := big.Min(big.Add(minGasFeeMsg.RequiredFunds(), minGasFeeMsg.Value), goodFunds)
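The goodFunds/minFunds pair then feeds the address selector (folded out of this excerpt). A hedged sketch of how the chosen sender is applied, using the AddressFor shape shown earlier in miner.go:

// Pick a sender that can cover goodFunds, settling for minFunds if
// nothing better is available, then stamp it on the message.
pa, _, err := s.addrSel.AddressFor(ctx, s.api, mi, api.PoStAddr, goodFunds, minFunds)
if err != nil {
	return xerrors.Errorf("failed to get address: %w", err)
}
msg.From = pa
return nil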
