Skip to content

Commit

Permalink
Merge pull request #4 from filecoin-project/feat/dynamic-worker-address
Browse files Browse the repository at this point in the history
we should always load worker address from the chain
  • Loading branch information
magik6k authored Apr 9, 2020
2 parents 1c35692 + 7b387ba commit 8fc0fd3
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 24 deletions.
14 changes: 2 additions & 12 deletions checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,7 @@ func checkPieces(ctx context.Context, si SectorInfo, api SealingAPI) error {

// checkPrecommit checks that data commitment generated in the sealing process
// matches pieces, and that the seal ticket isn't expired
func checkPrecommit(ctx context.Context, maddr address.Address, si SectorInfo, api SealingAPI) (err error) {
tok, height, err := api.ChainHead(ctx)
if err != nil {
return &ErrApi{xerrors.Errorf("getting chain head: %w", err)}
}

func checkPrecommit(ctx context.Context, maddr address.Address, si SectorInfo, tok TipSetToken, height abi.ChainEpoch, api SealingAPI) (err error) {
commD, err := api.StateComputeDataCommitment(ctx, maddr, si.SectorType, si.dealIDs(), tok)
if err != nil {
return &ErrApi{xerrors.Errorf("calling StateComputeDataCommitment: %w", err)}
Expand All @@ -89,12 +84,7 @@ func checkPrecommit(ctx context.Context, maddr address.Address, si SectorInfo, a
return nil
}

func (m *Sealing) checkCommit(ctx context.Context, si SectorInfo, proof []byte) (err error) {
tok, _, err := m.api.ChainHead(ctx)
if err != nil {
return &ErrApi{xerrors.Errorf("getting chain head: %w", err)}
}

func (m *Sealing) checkCommit(ctx context.Context, si SectorInfo, proof []byte, tok TipSetToken) (err error) {
if si.SeedEpoch == 0 {
return &ErrBadSeed{xerrors.Errorf("seed epoch was not set")}
}
Expand Down
7 changes: 3 additions & 4 deletions sealing.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type SealingAPI interface {
StateComputeDataCommitment(ctx context.Context, maddr address.Address, sectorType abi.RegisteredProof, deals []abi.DealID, tok TipSetToken) (cid.Cid, error)
StateSectorPreCommitInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok TipSetToken) (*miner.SectorPreCommitOnChainInfo, error)
StateMinerSectorSize(context.Context, address.Address, TipSetToken) (abi.SectorSize, error)
StateMinerWorkerAddress(ctx context.Context, maddr address.Address, tok TipSetToken) (address.Address, error)
StateMarketStorageDeal(context.Context, abi.DealID, TipSetToken) (market.DealProposal, market.DealState, error)
SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, gasPrice big.Int, gasLimit int64, params []byte) (cid.Cid, error)
ChainHead(ctx context.Context) (TipSetToken, abi.ChainEpoch, error)
Expand All @@ -41,8 +42,7 @@ type Sealing struct {
api SealingAPI
events Events

maddr address.Address
worker address.Address
maddr address.Address

sealer sectorstorage.SectorManager
sectors *statemachine.StateGroup
Expand All @@ -53,13 +53,12 @@ type Sealing struct {
pcp PreCommitPolicy
}

func New(api SealingAPI, events Events, maddr address.Address, worker address.Address, ds datastore.Batching, sealer sectorstorage.SectorManager, sc SectorIDCounter, verif ffiwrapper.Verifier, tktFn TicketFn, pcp PreCommitPolicy) *Sealing {
func New(api SealingAPI, events Events, maddr address.Address, ds datastore.Batching, sealer sectorstorage.SectorManager, sc SectorIDCounter, verif ffiwrapper.Verifier, tktFn TicketFn, pcp PreCommitPolicy) *Sealing {
s := &Sealing{
api: api,
events: events,

maddr: maddr,
worker: worker,
sealer: sealer,
sc: sc,
verif: verif,
Expand Down
46 changes: 41 additions & 5 deletions states.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,19 @@ func (m *Sealing) handlePreCommit2(ctx statemachine.Context, sector SectorInfo)
}

func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInfo) error {
if err := checkPrecommit(ctx.Context(), m.Address(), sector, m.api); err != nil {
tok, height, err := m.api.ChainHead(ctx.Context())
if err != nil {
log.Errorf("handlePreCommitting: api error, not proceeding: %+v", err)
return nil
}

waddr, err := m.api.StateMinerWorkerAddress(ctx.Context(), m.maddr, tok)
if err != nil {
log.Errorf("handlePreCommitting: api error, not proceeding: %+v", err)
return nil
}

if err := checkPrecommit(ctx.Context(), m.Address(), sector, tok, height, m.api); err != nil {
switch err.(type) {
case *ErrApi:
log.Errorf("handlePreCommitting: api error, not proceeding: %+v", err)
Expand Down Expand Up @@ -133,7 +145,7 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf
}

log.Info("submitting precommit for sector: ", sector.SectorNumber)
mcid, err := m.api.SendMsg(ctx.Context(), m.worker, m.maddr, builtin.MethodsMiner.PreCommitSector, big.NewInt(0), big.NewInt(1), 1000000, enc.Bytes())
mcid, err := m.api.SendMsg(ctx.Context(), waddr, m.maddr, builtin.MethodsMiner.PreCommitSector, big.NewInt(0), big.NewInt(1), 1000000, enc.Bytes())
if err != nil {
return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)})
}
Expand Down Expand Up @@ -206,7 +218,13 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo)
return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed: %w", err)})
}

if err := m.checkCommit(ctx.Context(), sector, proof); err != nil {
tok, _, err := m.api.ChainHead(ctx.Context())
if err != nil {
log.Errorf("handleCommitting: api error, not proceeding: %+v", err)
return nil
}

if err := m.checkCommit(ctx.Context(), sector, proof, tok); err != nil {
return ctx.Send(SectorCommitFailed{xerrors.Errorf("commit check error: %w", err)})
}

Expand All @@ -222,8 +240,14 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo)
return ctx.Send(SectorCommitFailed{xerrors.Errorf("could not serialize commit sector parameters: %w", err)})
}

waddr, err := m.api.StateMinerWorkerAddress(ctx.Context(), m.maddr, tok)
if err != nil {
log.Errorf("handleCommitting: api error, not proceeding: %+v", err)
return nil
}

// TODO: check seed / ticket are up to date
mcid, err := m.api.SendMsg(ctx.Context(), m.worker, m.maddr, builtin.MethodsMiner.ProveCommitSector, big.NewInt(0), big.NewInt(1), 1000000, enc.Bytes())
mcid, err := m.api.SendMsg(ctx.Context(), waddr, m.maddr, builtin.MethodsMiner.ProveCommitSector, big.NewInt(0), big.NewInt(1), 1000000, enc.Bytes())
if err != nil {
return ctx.Send(SectorCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)})
}
Expand Down Expand Up @@ -279,7 +303,19 @@ func (m *Sealing) handleFaulty(ctx statemachine.Context, sector SectorInfo) erro
return ctx.Send(SectorCommitFailed{xerrors.Errorf("failed to serialize declare fault params: %w", err)})
}

mcid, err := m.api.SendMsg(ctx.Context(), m.worker, m.maddr, builtin.MethodsMiner.DeclareTemporaryFaults, big.NewInt(0), big.NewInt(1), 1000000, enc.Bytes())
tok, _, err := m.api.ChainHead(ctx.Context())
if err != nil {
log.Errorf("handleFaulty: api error, not proceeding: %+v", err)
return nil
}

waddr, err := m.api.StateMinerWorkerAddress(ctx.Context(), m.maddr, tok)
if err != nil {
log.Errorf("handleFaulty: api error, not proceeding: %+v", err)
return nil
}

mcid, err := m.api.SendMsg(ctx.Context(), waddr, m.maddr, builtin.MethodsMiner.DeclareTemporaryFaults, big.NewInt(0), big.NewInt(1), 1000000, enc.Bytes())
if err != nil {
return xerrors.Errorf("failed to push declare faults message to network: %w", err)
}
Expand Down
18 changes: 15 additions & 3 deletions states_failed.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,13 @@ func (m *Sealing) handleSealFailed(ctx statemachine.Context, sector SectorInfo)
}

func (m *Sealing) handlePreCommitFailed(ctx statemachine.Context, sector SectorInfo) error {
if err := checkPrecommit(ctx.Context(), m.Address(), sector, m.api); err != nil {
tok, height, err := m.api.ChainHead(ctx.Context())
if err != nil {
log.Errorf("handlePreCommitFailed: api error, not proceeding: %+v", err)
return nil
}

if err := checkPrecommit(ctx.Context(), m.Address(), sector, tok, height, m.api); err != nil {
switch err.(type) {
case *ErrApi:
log.Errorf("handlePreCommitFailed: api error, not proceeding: %+v", err)
Expand Down Expand Up @@ -114,7 +120,13 @@ func (m *Sealing) handleComputeProofFailed(ctx statemachine.Context, sector Sect
}

func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo) error {
if err := checkPrecommit(ctx.Context(), m.maddr, sector, m.api); err != nil {
tok, height, err := m.api.ChainHead(ctx.Context())
if err != nil {
log.Errorf("handleCommitting: api error, not proceeding: %+v", err)
return nil
}

if err := checkPrecommit(ctx.Context(), m.maddr, sector, tok, height, m.api); err != nil {
switch err.(type) {
case *ErrApi:
log.Errorf("handleCommitFailed: api error, not proceeding: %+v", err)
Expand All @@ -128,7 +140,7 @@ func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo
}
}

if err := m.checkCommit(ctx.Context(), sector, sector.Proof); err != nil {
if err := m.checkCommit(ctx.Context(), sector, sector.Proof, tok); err != nil {
switch err.(type) {
case *ErrApi:
log.Errorf("handleCommitFailed: api error, not proceeding: %+v", err)
Expand Down

0 comments on commit 8fc0fd3

Please sign in to comment.