Skip to content

Commit

Permalink
fix: commit batch: Always go through commit batcher
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Mar 11, 2024
1 parent fc6229a commit 05f7bcd
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 91 deletions.
8 changes: 4 additions & 4 deletions storage/pipeline/commit_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes,
return nil, xerrors.Errorf("getting config: %w", err)
}

if notif && total < cfg.MaxCommitBatch {
if notif && total < cfg.MaxCommitBatch && cfg.AggregateCommits {
return nil, nil
}

Expand All @@ -233,7 +233,7 @@ func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes,
return false
}

individual := (total < cfg.MinCommitBatch) || (total < miner.MinAggregatedSectors) || blackedOut()
individual := (total < cfg.MinCommitBatch) || (total < miner.MinAggregatedSectors) || blackedOut() || !cfg.AggregateCommits

if !individual && !cfg.AggregateAboveBaseFee.Equals(big.Zero()) {
if ts.MinTicketBlock().ParentBaseFee.LessThan(cfg.AggregateAboveBaseFee) {
Expand Down Expand Up @@ -444,7 +444,7 @@ func (b *CommitBatcher) processBatchV2(cfg sealiface.Config, sectors []abi.Secto
enc := new(bytes.Buffer)
if err := params.MarshalCBOR(enc); err != nil {
res.Error = err.Error()
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("couldn't serialize ProveCommitSectors2Params: %w", err)
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("couldn't serialize ProveCommitSectors3Params: %w", err)
}

_, err = simulateMsgGas(b.mctx, b.api, from, b.maddr, builtin.MethodsMiner.ProveCommitSectors3, needFunds, maxFee, enc.Bytes())
Expand Down Expand Up @@ -474,7 +474,7 @@ func (b *CommitBatcher) processBatchV2(cfg sealiface.Config, sectors []abi.Secto

res.Msg = &mcid

log.Infow("Sent ProveCommitSectors2 message", "cid", mcid, "from", from, "todo", total, "sectors", len(infos))
log.Infow("Sent ProveCommitSectors3 message", "cid", mcid, "from", from, "todo", total, "sectors", len(infos))

return []sealiface.CommitBatchRes{res}, nil
}
Expand Down
4 changes: 2 additions & 2 deletions storage/pipeline/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
),
Committing: planCommitting,
CommitFinalize: planOne(
on(SectorFinalized{}, SubmitCommit),
on(SectorFinalizedAvailable{}, SubmitCommit),
on(SectorFinalized{}, SubmitCommitAggregate),
on(SectorFinalizedAvailable{}, SubmitCommitAggregate),
on(SectorFinalizeFailed{}, CommitFinalizeFailed),
),
SubmitCommit: planOne(
Expand Down
2 changes: 1 addition & 1 deletion storage/pipeline/sector_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ const (
CommitFinalizeFailed SectorState = "CommitFinalizeFailed"

// single commit
SubmitCommit SectorState = "SubmitCommit" // send commit message to the chain
SubmitCommit SectorState = "SubmitCommit" // send commit message to the chain (deprecated)
CommitWait SectorState = "CommitWait" // wait for the commit message to land on chain

SubmitCommitAggregate SectorState = "SubmitCommitAggregate"
Expand Down
88 changes: 4 additions & 84 deletions storage/pipeline/states_sealing.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/filecoin-project/go-state-types/builtin/v9/verifreg"
"github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/go-state-types/exitcode"
"github.com/filecoin-project/go-state-types/network"
"github.com/filecoin-project/go-state-types/proof"
"github.com/filecoin-project/go-statemachine"

Expand Down Expand Up @@ -740,89 +739,10 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo)
}

func (m *Sealing) handleSubmitCommit(ctx statemachine.Context, sector SectorInfo) error {
// TODO: Deprecate this path, always go through batcher, just respect the AggregateCommits config in there

cfg, err := m.getConfig()
if err != nil {
return xerrors.Errorf("getting config: %w", err)
}

if cfg.AggregateCommits {
nv, err := m.Api.StateNetworkVersion(ctx.Context(), types.EmptyTSK)
if err != nil {
return xerrors.Errorf("getting network version: %w", err)
}

if nv >= network.Version13 {
return ctx.Send(SectorSubmitCommitAggregate{})
}
}

ts, err := m.Api.ChainHead(ctx.Context())
if err != nil {
log.Errorf("handleSubmitCommit: api error, not proceeding: %+v", err)
return nil
}

if err := m.checkCommit(ctx.Context(), sector, sector.Proof, ts.Key()); err != nil {
return ctx.Send(SectorCommitFailed{xerrors.Errorf("commit check error: %w", err)})
}

enc := new(bytes.Buffer)
params := &miner.ProveCommitSectorParams{
SectorNumber: sector.SectorNumber,
Proof: sector.Proof,
}

if err := params.MarshalCBOR(enc); err != nil {
return ctx.Send(SectorCommitFailed{xerrors.Errorf("could not serialize commit sector parameters: %w", err)})
}

mi, err := m.Api.StateMinerInfo(ctx.Context(), m.maddr, ts.Key())
if err != nil {
log.Errorf("handleCommitting: api error, not proceeding: %+v", err)
return nil
}

pci, err := m.Api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, ts.Key())
if err != nil {
return xerrors.Errorf("getting precommit info: %w", err)
}
if pci == nil {
return ctx.Send(SectorCommitFailed{error: xerrors.Errorf("precommit info not found on chain")})
}

collateral, err := m.Api.StateMinerInitialPledgeCollateral(ctx.Context(), m.maddr, pci.Info, ts.Key())
if err != nil {
return xerrors.Errorf("getting initial pledge collateral: %w", err)
}

collateral = big.Sub(collateral, pci.PreCommitDeposit)
if collateral.LessThan(big.Zero()) {
collateral = big.Zero()
}

collateral, err = collateralSendAmount(ctx.Context(), m.Api, m.maddr, cfg, collateral)
if err != nil {
return err
}

goodFunds := big.Add(collateral, big.Int(m.feeCfg.MaxCommitGasFee))

from, _, err := m.addrSel.AddressFor(ctx.Context(), m.Api, mi, api.CommitAddr, goodFunds, collateral)
if err != nil {
return ctx.Send(SectorCommitFailed{xerrors.Errorf("no good address to send commit message from: %w", err)})
}

// TODO: check seed / ticket / deals are up to date
mcid, err := sendMsg(ctx.Context(), m.Api, from, m.maddr, builtin.MethodsMiner.ProveCommitSector, collateral, big.Int(m.feeCfg.MaxCommitGasFee), enc.Bytes())
if err != nil {
return ctx.Send(SectorCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)})
}

return ctx.Send(SectorCommitSubmitted{
Message: mcid,
})
// like precommit this is a deprecated state, but we keep it around for
// existing state machines
// todo: drop after nv21
return ctx.Send(SectorSubmitCommitAggregate{})
}

// processPieces returns either:
Expand Down

0 comments on commit 05f7bcd

Please sign in to comment.