Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: commit batch: Always go through commit batcher #11704

Merged
merged 2 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions storage/pipeline/commit_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes,
return nil, xerrors.Errorf("getting config: %w", err)
}

if notif && total < cfg.MaxCommitBatch {
if notif && total < cfg.MaxCommitBatch && cfg.AggregateCommits {
return nil, nil
}

Expand All @@ -233,7 +233,7 @@ func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes,
return false
}

individual := (total < cfg.MinCommitBatch) || (total < miner.MinAggregatedSectors) || blackedOut()
individual := (total < cfg.MinCommitBatch) || (total < miner.MinAggregatedSectors) || blackedOut() || !cfg.AggregateCommits

if !individual && !cfg.AggregateAboveBaseFee.Equals(big.Zero()) {
if ts.MinTicketBlock().ParentBaseFee.LessThan(cfg.AggregateAboveBaseFee) {
Expand Down Expand Up @@ -444,7 +444,7 @@ func (b *CommitBatcher) processBatchV2(cfg sealiface.Config, sectors []abi.Secto
enc := new(bytes.Buffer)
if err := params.MarshalCBOR(enc); err != nil {
res.Error = err.Error()
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("couldn't serialize ProveCommitSectors2Params: %w", err)
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("couldn't serialize ProveCommitSectors3Params: %w", err)
}

_, err = simulateMsgGas(b.mctx, b.api, from, b.maddr, builtin.MethodsMiner.ProveCommitSectors3, needFunds, maxFee, enc.Bytes())
Expand Down Expand Up @@ -474,7 +474,7 @@ func (b *CommitBatcher) processBatchV2(cfg sealiface.Config, sectors []abi.Secto

res.Msg = &mcid

log.Infow("Sent ProveCommitSectors2 message", "cid", mcid, "from", from, "todo", total, "sectors", len(infos))
log.Infow("Sent ProveCommitSectors3 message", "cid", mcid, "from", from, "todo", total, "sectors", len(infos))

return []sealiface.CommitBatchRes{res}, nil
}
Expand Down
6 changes: 3 additions & 3 deletions storage/pipeline/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
),
Committing: planCommitting,
CommitFinalize: planOne(
on(SectorFinalized{}, SubmitCommit),
on(SectorFinalizedAvailable{}, SubmitCommit),
on(SectorFinalized{}, SubmitCommitAggregate),
on(SectorFinalizedAvailable{}, SubmitCommitAggregate),
on(SectorFinalizeFailed{}, CommitFinalizeFailed),
),
SubmitCommit: planOne(
Expand Down Expand Up @@ -674,7 +674,7 @@ func planCommitting(events []statemachine.Event, state *SectorInfo) (uint64, err
}
case SectorCommitted: // the normal case
e.apply(state)
state.State = SubmitCommit
state.State = SubmitCommitAggregate
case SectorProofReady: // early finalize
e.apply(state)
state.State = CommitFinalize
Expand Down
23 changes: 10 additions & 13 deletions storage/pipeline/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,18 +70,18 @@ func TestHappyPath(t *testing.T) {
require.Equal(m.t, m.state.State, Committing)

m.planSingle(SectorCommitted{})
require.Equal(m.t, m.state.State, SubmitCommit)
require.Equal(m.t, m.state.State, SubmitCommitAggregate)

m.planSingle(SectorCommitSubmitted{})
require.Equal(m.t, m.state.State, CommitWait)
m.planSingle(SectorCommitAggregateSent{})
require.Equal(m.t, m.state.State, CommitAggregateWait)

m.planSingle(SectorProving{})
require.Equal(m.t, m.state.State, FinalizeSector)

m.planSingle(SectorFinalized{})
require.Equal(m.t, m.state.State, Proving)

expected := []SectorState{Packing, GetTicket, PreCommit1, PreCommit2, SubmitPreCommitBatch, PreCommitBatchWait, WaitSeed, Committing, SubmitCommit, CommitWait, FinalizeSector, Proving}
expected := []SectorState{Packing, GetTicket, PreCommit1, PreCommit2, SubmitPreCommitBatch, PreCommitBatchWait, WaitSeed, Committing, SubmitCommitAggregate, CommitAggregateWait, FinalizeSector, Proving}
for i, n := range notif {
if n.before.State != expected[i] {
t.Fatalf("expected before state: %s, got: %s", expected[i], n.before.State)
Expand Down Expand Up @@ -135,9 +135,6 @@ func TestHappyPathFinalizeEarly(t *testing.T) {
require.Equal(m.t, m.state.State, CommitFinalize)

m.planSingle(SectorFinalized{})
require.Equal(m.t, m.state.State, SubmitCommit)

m.planSingle(SectorSubmitCommitAggregate{})
require.Equal(m.t, m.state.State, SubmitCommitAggregate)

m.planSingle(SectorCommitAggregateSent{})
Expand All @@ -149,7 +146,7 @@ func TestHappyPathFinalizeEarly(t *testing.T) {
m.planSingle(SectorFinalized{})
require.Equal(m.t, m.state.State, Proving)

expected := []SectorState{Packing, GetTicket, PreCommit1, PreCommit2, SubmitPreCommitBatch, PreCommitBatchWait, WaitSeed, Committing, CommitFinalize, SubmitCommit, SubmitCommitAggregate, CommitAggregateWait, FinalizeSector, Proving}
expected := []SectorState{Packing, GetTicket, PreCommit1, PreCommit2, SubmitPreCommitBatch, PreCommitBatchWait, WaitSeed, Committing, CommitFinalize, SubmitCommitAggregate, CommitAggregateWait, FinalizeSector, Proving}
for i, n := range notif {
if n.before.State != expected[i] {
t.Fatalf("expected before state: %s, got: %s", expected[i], n.before.State)
Expand Down Expand Up @@ -188,9 +185,9 @@ func TestCommitFinalizeFailed(t *testing.T) {
require.Equal(m.t, m.state.State, CommitFinalize)

m.planSingle(SectorFinalized{})
require.Equal(m.t, m.state.State, SubmitCommit)
require.Equal(m.t, m.state.State, SubmitCommitAggregate)

expected := []SectorState{Committing, CommitFinalize, CommitFinalizeFailed, CommitFinalize, SubmitCommit}
expected := []SectorState{Committing, CommitFinalize, CommitFinalizeFailed, CommitFinalize, SubmitCommitAggregate}
for i, n := range notif {
if n.before.State != expected[i] {
t.Fatalf("expected before state: %s, got: %s", expected[i], n.before.State)
Expand Down Expand Up @@ -242,10 +239,10 @@ func TestSeedRevert(t *testing.T) {
// not changing the seed this time
_, _, err = m.s.plan([]statemachine.Event{{User: SectorSeedReady{SeedValue: nil, SeedEpoch: 5}}, {User: SectorCommitted{}}}, m.state)
require.NoError(t, err)
require.Equal(m.t, m.state.State, SubmitCommit)
require.Equal(m.t, m.state.State, SubmitCommitAggregate)

m.planSingle(SectorCommitSubmitted{})
require.Equal(m.t, m.state.State, CommitWait)
m.planSingle(SectorCommitAggregateSent{})
require.Equal(m.t, m.state.State, CommitAggregateWait)

m.planSingle(SectorProving{})
require.Equal(m.t, m.state.State, FinalizeSector)
Expand Down
2 changes: 1 addition & 1 deletion storage/pipeline/sector_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ const (
CommitFinalizeFailed SectorState = "CommitFinalizeFailed"

// single commit
SubmitCommit SectorState = "SubmitCommit" // send commit message to the chain
SubmitCommit SectorState = "SubmitCommit" // send commit message to the chain (deprecated)
CommitWait SectorState = "CommitWait" // wait for the commit message to land on chain

SubmitCommitAggregate SectorState = "SubmitCommitAggregate"
Expand Down
88 changes: 4 additions & 84 deletions storage/pipeline/states_sealing.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/filecoin-project/go-state-types/abi"
actorstypes "github.com/filecoin-project/go-state-types/actors"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-state-types/builtin"
miner2 "github.com/filecoin-project/go-state-types/builtin/v13/miner"
verifreg13 "github.com/filecoin-project/go-state-types/builtin/v13/verifreg"
"github.com/filecoin-project/go-state-types/builtin/v9/verifreg"
Expand Down Expand Up @@ -740,89 +739,10 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo)
}

func (m *Sealing) handleSubmitCommit(ctx statemachine.Context, sector SectorInfo) error {
// TODO: Deprecate this path, always go through batcher, just respect the AggregateCommits config in there

cfg, err := m.getConfig()
if err != nil {
return xerrors.Errorf("getting config: %w", err)
}

if cfg.AggregateCommits {
nv, err := m.Api.StateNetworkVersion(ctx.Context(), types.EmptyTSK)
if err != nil {
return xerrors.Errorf("getting network version: %w", err)
}

if nv >= network.Version13 {
return ctx.Send(SectorSubmitCommitAggregate{})
}
}

ts, err := m.Api.ChainHead(ctx.Context())
if err != nil {
log.Errorf("handleSubmitCommit: api error, not proceeding: %+v", err)
return nil
}

if err := m.checkCommit(ctx.Context(), sector, sector.Proof, ts.Key()); err != nil {
return ctx.Send(SectorCommitFailed{xerrors.Errorf("commit check error: %w", err)})
}

enc := new(bytes.Buffer)
params := &miner.ProveCommitSectorParams{
SectorNumber: sector.SectorNumber,
Proof: sector.Proof,
}

if err := params.MarshalCBOR(enc); err != nil {
return ctx.Send(SectorCommitFailed{xerrors.Errorf("could not serialize commit sector parameters: %w", err)})
}

mi, err := m.Api.StateMinerInfo(ctx.Context(), m.maddr, ts.Key())
if err != nil {
log.Errorf("handleCommitting: api error, not proceeding: %+v", err)
return nil
}

pci, err := m.Api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, ts.Key())
if err != nil {
return xerrors.Errorf("getting precommit info: %w", err)
}
if pci == nil {
return ctx.Send(SectorCommitFailed{error: xerrors.Errorf("precommit info not found on chain")})
}

collateral, err := m.Api.StateMinerInitialPledgeCollateral(ctx.Context(), m.maddr, pci.Info, ts.Key())
if err != nil {
return xerrors.Errorf("getting initial pledge collateral: %w", err)
}

collateral = big.Sub(collateral, pci.PreCommitDeposit)
if collateral.LessThan(big.Zero()) {
collateral = big.Zero()
}

collateral, err = collateralSendAmount(ctx.Context(), m.Api, m.maddr, cfg, collateral)
if err != nil {
return err
}

goodFunds := big.Add(collateral, big.Int(m.feeCfg.MaxCommitGasFee))

from, _, err := m.addrSel.AddressFor(ctx.Context(), m.Api, mi, api.CommitAddr, goodFunds, collateral)
if err != nil {
return ctx.Send(SectorCommitFailed{xerrors.Errorf("no good address to send commit message from: %w", err)})
}

// TODO: check seed / ticket / deals are up to date
mcid, err := sendMsg(ctx.Context(), m.Api, from, m.maddr, builtin.MethodsMiner.ProveCommitSector, collateral, big.Int(m.feeCfg.MaxCommitGasFee), enc.Bytes())
if err != nil {
return ctx.Send(SectorCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)})
}

return ctx.Send(SectorCommitSubmitted{
Message: mcid,
})
// like precommit this is a deprecated state, but we keep it around for
// existing state machines
// todo: drop after nv21
return ctx.Send(SectorSubmitCommitAggregate{})
}

// processPieces returns either:
Expand Down