diff --git a/blockchain/blockchain.go b/blockchain/blockchain.go
index efc32b84b9..49c401342f 100644
--- a/blockchain/blockchain.go
+++ b/blockchain/blockchain.go
@@ -110,6 +110,14 @@ type (
 	BlockBuilderFactory interface {
 		// NewBlockBuilder creates block builder
 		NewBlockBuilder(context.Context, func(action.Envelope) (*action.SealedEnvelope, error)) (*block.Builder, error)
+		// OngoingBlockHeight returns the height of the current ongoing block (not yet committed)
+		OngoingBlockHeight() uint64
+		// PendingBlockHeader returns the header of a pending block (not yet committed)
+		PendingBlockHeader(uint64) (*block.Header, error)
+		// PutBlockHeader stores a pending block header
+		PutBlockHeader(*block.Header)
+		// CancelBlock indicates that producing the block at the given height failed
+		CancelBlock(uint64)
 	}

 	// blockchain implements the Blockchain interface
@@ -287,10 +295,7 @@ func (bc *blockchain) ValidateBlock(blk *block.Block, opts ...BlockValidationOpt
 	if blk == nil {
 		return ErrInvalidBlock
 	}
-	tipHeight, err := bc.dao.Height()
-	if err != nil {
-		return err
-	}
+	tipHeight := blk.Height() - 1
 	tip, err := bc.tipInfo(tipHeight)
 	if err != nil {
 		return err
@@ -412,10 +417,7 @@ func (bc *blockchain) MintNewBlock(timestamp time.Time) (*block.Block, error) {
 	defer bc.mu.RUnlock()
 	mintNewBlockTimer := bc.timerFactory.NewTimer("MintNewBlock")
 	defer mintNewBlockTimer.End()
-	tipHeight, err := bc.dao.Height()
-	if err != nil {
-		return nil, err
-	}
+	tipHeight := bc.bbf.OngoingBlockHeight()
 	newblockHeight := tipHeight + 1
 	ctx, err := bc.context(context.Background(), tipHeight)
 	if err != nil {
@@ -439,6 +441,7 @@ func (bc *blockchain) MintNewBlock(timestamp time.Time) (*block.Block, error) {
 	if err != nil {
 		return nil, errors.Wrapf(err, "failed to create block")
 	}
+	bc.bbf.PutBlockHeader(&blk.Header)
 	_blockMtc.WithLabelValues("MintGas").Set(float64(blk.GasUsed()))
 	_blockMtc.WithLabelValues("MintActions").Set(float64(len(blk.Actions)))
 	return &blk, nil
@@ -478,21 +481,30 @@ func (bc *blockchain) Genesis() genesis.Genesis {
 // private functions
 //=====================================
-func (bc *blockchain) tipInfo(tipHeight uint64) (*protocol.TipInfo, error) {
-	if tipHeight == 0 {
+func (bc *blockchain) tipInfo(height uint64) (*protocol.TipInfo, error) {
+	if height == 0 {
 		return &protocol.TipInfo{
 			Height:    0,
 			Hash:      bc.genesis.Hash(),
 			Timestamp: time.Unix(bc.genesis.Timestamp, 0),
 		}, nil
 	}
-	header, err := bc.dao.HeaderByHeight(tipHeight)
+	tipHeight, err := bc.dao.Height()
+	if err != nil {
+		return nil, err
+	}
+	var header *block.Header
+	if height <= tipHeight {
+		header, err = bc.dao.HeaderByHeight(height)
+	} else {
+		header, err = bc.bbf.PendingBlockHeader(height)
+	}
 	if err != nil {
 		return nil, err
 	}
 	return &protocol.TipInfo{
-		Height:    tipHeight,
+		Height:    height,
 		GasUsed:   header.GasUsed(),
 		Hash:      header.HashBlock(),
 		Timestamp: header.Timestamp(),
diff --git a/blockchain/integrity/integrity_test.go b/blockchain/integrity/integrity_test.go
index a93b494513..bdbfa70689 100644
--- a/blockchain/integrity/integrity_test.go
+++ b/blockchain/integrity/integrity_test.go
@@ -959,6 +959,133 @@ func createChain(cfg config.Config, inMem bool) (blockchain.Blockchain, factory.
return bc, sf, dao, ap, nil } +func TestBlockPipeline(t *testing.T) { + require := require.New(t) + + cfg := config.Default + testIndexPath, err := testutil.PathOfTempFile("index") + require.NoError(err) + + defer func() { + testutil.CleanupPath(testIndexPath) + // clear the gateway + delete(cfg.Plugins, config.GatewayPlugin) + }() + + minGas := big.NewInt(unit.Qev) + cfg.Chain.IndexDBPath = testIndexPath + cfg.Chain.ProducerPrivKey = "a000000000000000000000000000000000000000000000000000000000000000" + cfg.Genesis.EnableGravityChainVoting = false + cfg.Plugins[config.GatewayPlugin] = true + cfg.Chain.EnableAsyncIndexWrite = false + cfg.ActPool.MinGasPriceStr = minGas.String() + cfg.Genesis.PacificBlockHeight = 2 + cfg.Genesis.AleutianBlockHeight = 2 + cfg.Genesis.BeringBlockHeight = 2 + cfg.Genesis.CookBlockHeight = 2 + cfg.Genesis.DaytonaBlockHeight = 2 + cfg.Genesis.DardanellesBlockHeight = 2 + cfg.Genesis.EasterBlockHeight = 2 + cfg.Genesis.FbkMigrationBlockHeight = 2 + cfg.Genesis.FairbankBlockHeight = 2 + cfg.Genesis.GreenlandBlockHeight = 2 + cfg.Genesis.HawaiiBlockHeight = 2 + cfg.Genesis.IcelandBlockHeight = 2 + cfg.Genesis.JutlandBlockHeight = 2 + cfg.Genesis.KamchatkaBlockHeight = 2 + cfg.Genesis.LordHoweBlockHeight = 2 + cfg.Genesis.MidwayBlockHeight = 2 + cfg.Genesis.NewfoundlandBlockHeight = 2 + cfg.Genesis.OkhotskBlockHeight = 2 + cfg.Genesis.PalauBlockHeight = 2 + cfg.Genesis.InitBalanceMap[identityset.Address(27).String()] = unit.ConvertIotxToRau(10000000000).String() + + ctx := genesis.WithGenesisContext(context.Background(), cfg.Genesis) + bc, sf, _, ap, err := createChain(cfg, true) + require.NoError(err) + sk, err := iotexcrypto.HexStringToPrivateKey(cfg.Chain.ProducerPrivKey) + require.NoError(err) + producer := sk.PublicKey().Address() + ctrl := gomock.NewController(t) + pp := mock_poll.NewMockProtocol(ctrl) + pp.EXPECT().CreateGenesisStates(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + pp.EXPECT().Candidates(gomock.Any(), gomock.Any()).Return([]*state.Candidate{ + { + Address: producer.String(), + RewardAddress: producer.String(), + }, + }, nil).AnyTimes() + pp.EXPECT().Register(gomock.Any()).DoAndReturn(func(reg *protocol.Registry) error { + return reg.Register("poll", pp) + }).AnyTimes() + pp.EXPECT().Validate(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + require.NoError(sf.Register(pp)) + require.NoError(bc.Start(ctx)) + defer func() { + require.NoError(bc.Stop(ctx)) + }() + + // Add block 1 + nonce, err := ap.GetPendingNonce(identityset.Address(27).String()) + require.NoError(err) + require.EqualValues(1, nonce) + nonce, err = ap.GetPendingNonce(identityset.Address(25).String()) + require.NoError(err) + require.EqualValues(1, nonce) + priKey0 := identityset.PrivateKey(27) + ex1, err := action.SignedExecution(action.EmptyAddress, priKey0, 1, new(big.Int), 500000, minGas, _constantinopleOpCodeContract) + require.NoError(err) + require.NoError(ap.Add(ctx, ex1)) + tsf1, err := action.SignedTransfer(identityset.Address(25).String(), priKey0, 2, big.NewInt(10000), nil, 500000, minGas) + require.NoError(err) + require.NoError(ap.Add(ctx, tsf1)) + tsf2, err := action.SignedTransfer(identityset.Address(24).String(), priKey0, 3, big.NewInt(20000), nil, 500000, minGas) + require.NoError(err) + require.NoError(ap.Add(ctx, tsf2)) + blockTime := time.Unix(1546329600, 0) + blk, err := bc.MintNewBlock(blockTime) + require.NoError(err) + require.EqualValues(1, blk.Height()) + require.Equal(4, len(blk.Body.Actions)) + 
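// pipeline property under test: block 2 is minted below on top of the
+	// still-uncommitted draft of block 1; both drafts are then validated and
+	// committed in height order
+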
require.NoError(bc.ValidateBlock(blk))
+	// add block 2
+	tsf3, err := action.SignedTransfer(identityset.Address(23).String(), priKey0, 4, big.NewInt(30000), nil, 500000, minGas)
+	require.NoError(err)
+	require.NoError(ap.Add(ctx, tsf3))
+	deterministic, err := address.FromHex("3fab184622dc19b6109349b94811493bf2a45362")
+	require.NoError(err)
+	tsf4, err := action.SignedTransfer(deterministic.String(), priKey0, 5, big.NewInt(10000000000000000), nil, 500000, minGas)
+	require.NoError(err)
+	require.NoError(ap.Add(ctx, tsf4))
+	blk1, err := bc.MintNewBlock(blockTime.Add(time.Second))
+	require.NoError(err)
+	require.EqualValues(2, blk1.Height())
+	require.Equal(3, len(blk1.Body.Actions))
+	require.NoError(bc.ValidateBlock(blk1))
+	require.NoError(bc.CommitBlock(blk))
+	require.NoError(bc.CommitBlock(blk1))
+	// verify accounts
+	for _, v := range []struct {
+		a     address.Address
+		atype int32
+		nonce uint64
+		b     string
+	}{
+		{identityset.Address(27), 0, 6, "9999999999180825999999940000"},
+		{identityset.Address(25), 0, 1, "100000000000000000000010000"},
+		{identityset.Address(24), 0, 1, "100000000000000000000020000"},
+		{identityset.Address(23), 0, 1, "100000000000000000000030000"},
+		{deterministic, 1, 0, "10000000000000000"},
+	} {
+		a, err := accountutil.AccountState(ctx, sf, v.a)
+		require.NoError(err)
+		require.Equal(v.atype, a.AccountType())
+		require.Equal(v.nonce, a.PendingNonce())
+		require.Equal(v.b, a.Balance.String())
+	}
+}
+
 func TestBlockchainHardForkFeatures(t *testing.T) {
 	require := require.New(t)
diff --git a/consensus/consensus.go b/consensus/consensus.go
index 76bcdd4fd2..e6babb5192 100644
--- a/consensus/consensus.go
+++ b/consensus/consensus.go
@@ -136,7 +136,7 @@ func NewConsensus(
 		SetAddr(cfg.Chain.ProducerAddress().String()).
 		SetPriKey(cfg.Chain.ProducerPrivateKey()).
 		SetConfig(cfg).
-		SetChainManager(rolldpos.NewChainManager(bc)).
+		SetChainManager(rolldpos.NewChainManager2(bc, sf)).
 		SetBlockDeserializer(block.NewDeserializer(bc.EvmNetworkID())).
 		SetClock(clock).
 		SetBroadcast(ops.broadcastHandler).
@@ -144,7 +144,8 @@ func NewConsensus(
 		SetProposersByEpochFunc(proposersByEpochFunc).
 		RegisterProtocol(ops.rp)
 	// TODO: explorer dependency deleted here at #1085, need to revive by migrating to api
-	cs.scheme, err = bd.Build()
-	if err != nil {
-		log.Logger("consensus").Panic("Error when constructing RollDPoS.", zap.Error(err))
-	}
+	// NewChainedRollDPoS cannot fail, so the old error check is dropped
+	cs.scheme = rolldpos.NewChainedRollDPoS(bd)
diff --git a/consensus/consensusfsm/chainedfsm.go b/consensus/consensusfsm/chainedfsm.go
new file mode 100644
index 0000000000..06caf32455
--- /dev/null
+++ b/consensus/consensusfsm/chainedfsm.go
@@ -0,0 +1,248 @@
+package consensusfsm
+
+import (
+	"time"
+
+	"github.com/facebookgo/clock"
+	fsm "github.com/iotexproject/go-fsm"
+	"github.com/pkg/errors"
+	"go.uber.org/zap"
+
+	"github.com/iotexproject/iotex-core/v2/pkg/log"
+)
+
+const (
+	// consensus states
+	SFinalized fsm.State = "S_FINALIZED"
+	SInvalid   fsm.State = "S_INVALID"
+)
+
+// ChainedConsensusFSM is a consensus FSM whose rounds end in a terminal state
+// (SFinalized or SInvalid) instead of looping back to prepare, so that rounds
+// for consecutive heights can be chained and run concurrently
+type ChainedConsensusFSM struct {
+	*ConsensusFSM
+}
+
+// NewChainedConsensusFSM creates a chained consensus FSM
+func NewChainedConsensusFSM(ctx Context, clock clock.Clock) (*ChainedConsensusFSM, error) {
+	mm := &ConsensusFSM{
+		evtq:  make(chan *ConsensusEvent, ctx.EventChanSize()),
+		close: make(chan interface{}),
+		ctx:   ctx,
+		clock: clock,
+	}
+	cm := &ChainedConsensusFSM{
+		ConsensusFSM: mm,
+	}
+	b := fsm.NewBuilder().
+		AddInitialState(sPrepare).
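+		// the chained FSM reuses the base consensus states and adds the two
+		// terminal ones: SFinalized (block committed) and SInvalid (round abandoned)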
+ AddStates( + sAcceptBlockProposal, + sAcceptProposalEndorsement, + sAcceptLockEndorsement, + sAcceptPreCommitEndorsement, + SInvalid, + SFinalized, + ). + AddTransition(sPrepare, ePrepare, cm.prepare, []fsm.State{ + sPrepare, + sAcceptBlockProposal, + sAcceptPreCommitEndorsement, + SInvalid, + }). + AddTransition( + sAcceptBlockProposal, + eReceiveBlock, + cm.onReceiveBlock, + []fsm.State{ + sAcceptBlockProposal, // proposed block invalid + sAcceptProposalEndorsement, // receive valid block, jump to next step + }). + AddTransition( + sAcceptBlockProposal, + eFailedToReceiveBlock, + cm.onFailedToReceiveBlock, + []fsm.State{ + sAcceptProposalEndorsement, // no valid block, jump to next step + }). + AddTransition( + sAcceptProposalEndorsement, + eReceiveProposalEndorsement, + cm.onReceiveProposalEndorsementInAcceptProposalEndorsementState, + []fsm.State{ + sAcceptProposalEndorsement, // not enough endorsements + sAcceptLockEndorsement, // enough endorsements + }). + AddTransition( + sAcceptProposalEndorsement, + eReceivePreCommitEndorsement, + cm.onReceiveProposalEndorsementInAcceptProposalEndorsementState, + []fsm.State{ + sAcceptProposalEndorsement, // not enough endorsements + sAcceptLockEndorsement, // enough endorsements + }). + AddTransition( + sAcceptProposalEndorsement, + eStopReceivingProposalEndorsement, + cm.onStopReceivingProposalEndorsement, + []fsm.State{ + sAcceptLockEndorsement, // timeout, jump to next step + }). + AddTransition( + sAcceptLockEndorsement, + eReceiveProposalEndorsement, + cm.onReceiveProposalEndorsementInAcceptLockEndorsementState, + []fsm.State{ + sAcceptLockEndorsement, + }, + ). + AddTransition( + sAcceptLockEndorsement, + eReceiveLockEndorsement, + cm.onReceiveLockEndorsement, + []fsm.State{ + sAcceptLockEndorsement, // not enough endorsements + sAcceptPreCommitEndorsement, // reach commit agreement, jump to next step + }). + AddTransition( + sAcceptLockEndorsement, + eReceivePreCommitEndorsement, + cm.onReceiveLockEndorsement, + []fsm.State{ + sAcceptLockEndorsement, // not enough endorsements + sAcceptPreCommitEndorsement, // reach commit agreement, jump to next step + }). + AddTransition( + sAcceptLockEndorsement, + eStopReceivingLockEndorsement, + cm.onStopReceivingLockEndorsement, + []fsm.State{ + SInvalid, // timeout, invalid + }). + AddTransition( + sAcceptPreCommitEndorsement, + eBroadcastPreCommitEndorsement, + cm.onBroadcastPreCommitEndorsement, + []fsm.State{ + sAcceptPreCommitEndorsement, + }). + AddTransition( + sAcceptPreCommitEndorsement, + eStopReceivingPreCommitEndorsement, + cm.onStopReceivingPreCommitEndorsement, + []fsm.State{ + SInvalid, + }). 
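+		// a pre-commit timeout is terminal for a chained round: it moves to
+		// SInvalid instead of looping back to sPrepare as in the base FSM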
+ AddTransition( + sAcceptPreCommitEndorsement, + eReceivePreCommitEndorsement, + cm.onReceivePreCommitEndorsement, + []fsm.State{ + sAcceptPreCommitEndorsement, + SFinalized, // reach consensus, start next epoch + }) + // Add the backdoor transition so that we could unit test the transition from any given state + for _, state := range consensusStates { + b = b.AddTransition(state, BackdoorEvent, cm.handleBackdoorEvt, consensusStates) + if state != sPrepare { + b = b.AddTransition(state, eCalibrate, cm.calibrate, []fsm.State{sPrepare, state}) + } + } + m, err := b.Build() + if err != nil { + return nil, errors.Wrap(err, "error when building the FSM") + } + cm.fsm = m + return cm, nil +} + +func (m *ChainedConsensusFSM) prepare(evt fsm.Event) (fsm.State, error) { + log.L().Debug("Handle event", zap.String("event", string(evt.Type()))) + defer log.L().Debug("Handled event", zap.String("event", string(evt.Type()))) + if err := m.ctx.Prepare(); err != nil { + m.ctx.Logger().Error("Error during prepare", zap.Error(err), zap.Stack("stack")) + return m.Invalid() + } + m.ctx.Logger().Debug("Start a new round", zap.Stack("stack")) + proposal, err := m.ctx.Proposal() + if err != nil { + m.ctx.Logger().Error("failed to generate block proposal", zap.Error(err)) + return m.BackToPrepare(100 * time.Millisecond) + } + + overtime := m.ctx.WaitUntilRoundStart() + if proposal != nil { + m.ctx.Broadcast(proposal) + } + if !m.ctx.IsDelegate() { + return m.BackToPrepare(0) + } + if proposal != nil { + m.ProduceReceiveBlockEvent(proposal) + } + + var h uint64 + cEvt, ok := evt.(*ConsensusEvent) + if !ok { + m.ctx.Logger().Panic("failed to convert ConsensusEvent in prepare") + } + h = cEvt.Height() + ttl := m.ctx.AcceptBlockTTL(h) - overtime + // Setup timeouts + if preCommitEndorsement := m.ctx.PreCommitEndorsement(); preCommitEndorsement != nil { + cEvt := m.ctx.NewConsensusEvent(eBroadcastPreCommitEndorsement, preCommitEndorsement) + m.produce(cEvt, ttl) + ttl += m.ctx.AcceptProposalEndorsementTTL(cEvt.Height()) + m.produce(cEvt, ttl) + ttl += m.ctx.AcceptLockEndorsementTTL(cEvt.Height()) + m.produce(cEvt, ttl) + ttl += m.ctx.CommitTTL(cEvt.Height()) + m.produceConsensusEvent(eStopReceivingPreCommitEndorsement, ttl) + return sAcceptPreCommitEndorsement, nil + } + m.produceConsensusEvent(eFailedToReceiveBlock, ttl) + ttl += m.ctx.AcceptProposalEndorsementTTL(h) + m.produceConsensusEvent(eStopReceivingProposalEndorsement, ttl) + ttl += m.ctx.AcceptLockEndorsementTTL(h) + m.produceConsensusEvent(eStopReceivingLockEndorsement, ttl) + ttl += m.ctx.CommitTTL(h) + m.produceConsensusEvent(eStopReceivingPreCommitEndorsement, ttl) + return sAcceptBlockProposal, nil +} + +func (m *ChainedConsensusFSM) onStopReceivingLockEndorsement(evt fsm.Event) (fsm.State, error) { + log.L().Debug("Handle event", zap.String("event", string(evt.Type()))) + defer log.L().Debug("Handled event", zap.String("event", string(evt.Type()))) + m.ctx.Logger().Warn("Not enough lock endorsements") + + return m.Invalid() +} + +func (m *ChainedConsensusFSM) onStopReceivingPreCommitEndorsement(evt fsm.Event) (fsm.State, error) { + log.L().Debug("Handle event", zap.String("event", string(evt.Type()))) + defer log.L().Debug("Handled event", zap.String("event", string(evt.Type()))) + m.ctx.Logger().Warn("Not enough pre-commit endorsements") + + return m.Invalid() +} + +func (m *ChainedConsensusFSM) onReceivePreCommitEndorsement(evt fsm.Event) (fsm.State, error) { + log.L().Debug("Handle event", zap.String("event", string(evt.Type()))) + defer 
log.L().Debug("Handled event", zap.String("event", string(evt.Type()))) + cEvt, ok := evt.(*ConsensusEvent) + if !ok { + return sAcceptPreCommitEndorsement, errors.Wrap(ErrEvtCast, "failed to cast to consensus event") + } + committed, err := m.ctx.Commit(cEvt.Data()) + if err != nil || !committed { + return sAcceptPreCommitEndorsement, err + } + return SFinalized, nil +} + +func (m *ChainedConsensusFSM) Finalize() (fsm.State, error) { + m.ctx.Logger().Warn("Finalized") + return SFinalized, nil +} + +func (m *ChainedConsensusFSM) Invalid() (fsm.State, error) { + m.ctx.Logger().Warn("Invalid") + return SInvalid, nil +} diff --git a/consensus/consensusfsm/context.go b/consensus/consensusfsm/context.go index 2acbd6d866..e161b07910 100644 --- a/consensus/consensusfsm/context.go +++ b/consensus/consensusfsm/context.go @@ -25,6 +25,7 @@ type Context interface { Logger() *zap.Logger Height() uint64 + Number() uint32 NewConsensusEvent(fsm.EventType, interface{}) *ConsensusEvent NewBackdoorEvt(fsm.State) *ConsensusEvent diff --git a/consensus/consensusfsm/fsm.go b/consensus/consensusfsm/fsm.go index 7f02c92920..b8aef0aa06 100644 --- a/consensus/consensusfsm/fsm.go +++ b/consensus/consensusfsm/fsm.go @@ -15,6 +15,8 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" + + "github.com/iotexproject/iotex-core/v2/pkg/log" ) /** @@ -305,6 +307,7 @@ func (m *ConsensusFSM) produce(evt *ConsensusEvent, delay time.Duration) { if evt == nil { return } + // m.ctx.Logger().Debug("produce event", zap.Any("event", evt.Type()), zap.Stack("stack")) _consensusEvtsMtc.WithLabelValues(string(evt.Type()), "produced").Inc() if delay > 0 { m.wg.Add(1) @@ -328,7 +331,7 @@ func (m *ConsensusFSM) handle(evt *ConsensusEvent) error { return nil } if m.ctx.IsFutureEvent(evt) { - m.ctx.Logger().Debug("future event", zap.Any("event", evt.Type())) + m.ctx.Logger().Debug("future event", zap.Any("event", evt.Type()), zap.Any("eventHeight", evt.Height()), zap.Any("eventRount", evt.Round())) // TODO: find a more appropriate delay m.produce(evt, m.ctx.UnmatchedEventInterval(evt.Height())) _consensusEvtsMtc.WithLabelValues(string(evt.Type()), "backoff").Inc() @@ -375,6 +378,8 @@ func (m *ConsensusFSM) handle(evt *ConsensusEvent) error { } func (m *ConsensusFSM) calibrate(evt fsm.Event) (fsm.State, error) { + log.L().Debug("Handle event", zap.String("event", string(evt.Type()))) + defer log.L().Debug("Handled event", zap.String("event", string(evt.Type()))) cEvt, ok := evt.(*ConsensusEvent) if !ok { return sPrepare, errors.New("invalid fsm event") @@ -397,14 +402,14 @@ func (m *ConsensusFSM) calibrate(evt fsm.Event) (fsm.State, error) { func (m *ConsensusFSM) prepare(evt fsm.Event) (fsm.State, error) { if err := m.ctx.Prepare(); err != nil { - m.ctx.Logger().Error("Error during prepare", zap.Error(err)) - return m.BackToPrepare(0) + m.ctx.Logger().Error("Error during prepare", zap.Error(err), zap.Stack("stack")) + return m.BackToPrepare(100 * time.Millisecond) } - m.ctx.Logger().Debug("Start a new round") + m.ctx.Logger().Debug("Start a new round", zap.Stack("stack")) proposal, err := m.ctx.Proposal() if err != nil { m.ctx.Logger().Error("failed to generate block proposal", zap.Error(err)) - return m.BackToPrepare(0) + return m.BackToPrepare(100 * time.Millisecond) } overtime := m.ctx.WaitUntilRoundStart() @@ -448,6 +453,8 @@ func (m *ConsensusFSM) prepare(evt fsm.Event) (fsm.State, error) { } func (m *ConsensusFSM) onReceiveBlock(evt fsm.Event) (fsm.State, error) 
{ + log.L().Debug("Handle event", zap.String("event", string(evt.Type()))) + defer log.L().Debug("Handled event", zap.String("event", string(evt.Type()))) m.ctx.Logger().Debug("Receive block") cEvt, ok := evt.(*ConsensusEvent) if !ok { @@ -473,6 +480,8 @@ func (m *ConsensusFSM) processBlock(block interface{}) error { } func (m *ConsensusFSM) onFailedToReceiveBlock(evt fsm.Event) (fsm.State, error) { + log.L().Debug("Handle event", zap.String("event", string(evt.Type()))) + defer log.L().Debug("Handled event", zap.String("event", string(evt.Type()))) m.ctx.Logger().Warn("didn't receive the proposed block before timeout") if err := m.processBlock(nil); err != nil { m.ctx.Logger().Debug("Failed to generate proposal endorsement", zap.Error(err)) @@ -482,10 +491,14 @@ func (m *ConsensusFSM) onFailedToReceiveBlock(evt fsm.Event) (fsm.State, error) } func (m *ConsensusFSM) onReceiveProposalEndorsementInAcceptLockEndorsementState(evt fsm.Event) (fsm.State, error) { + log.L().Debug("Handle event", zap.String("event", string(evt.Type()))) + defer log.L().Debug("Handled event", zap.String("event", string(evt.Type()))) return m.onReceiveProposalEndorsement(evt, sAcceptLockEndorsement) } func (m *ConsensusFSM) onReceiveProposalEndorsementInAcceptProposalEndorsementState(evt fsm.Event) (fsm.State, error) { + log.L().Debug("Handle event", zap.String("event", string(evt.Type()))) + defer log.L().Debug("Handled event", zap.String("event", string(evt.Type()))) return m.onReceiveProposalEndorsement(evt, sAcceptProposalEndorsement) } @@ -509,12 +522,16 @@ func (m *ConsensusFSM) onReceiveProposalEndorsement(evt fsm.Event, currentState } func (m *ConsensusFSM) onStopReceivingProposalEndorsement(evt fsm.Event) (fsm.State, error) { + log.L().Debug("Handle event", zap.String("event", string(evt.Type()))) + defer log.L().Debug("Handled event", zap.String("event", string(evt.Type()))) m.ctx.Logger().Warn("Not enough proposal endorsements") return sAcceptLockEndorsement, nil } func (m *ConsensusFSM) onReceiveLockEndorsement(evt fsm.Event) (fsm.State, error) { + log.L().Debug("Handle event", zap.String("event", string(evt.Type()))) + defer log.L().Debug("Handled event", zap.String("event", string(evt.Type()))) cEvt, ok := evt.(*ConsensusEvent) if !ok { return sAcceptLockEndorsement, errors.Wrap(ErrEvtCast, "failed to cast to consensus event") @@ -533,6 +550,8 @@ func (m *ConsensusFSM) onReceiveLockEndorsement(evt fsm.Event) (fsm.State, error } func (m *ConsensusFSM) onBroadcastPreCommitEndorsement(evt fsm.Event) (fsm.State, error) { + log.L().Debug("Handle event", zap.String("event", string(evt.Type()))) + defer log.L().Debug("Handled event", zap.String("event", string(evt.Type()))) cEvt, ok := evt.(*ConsensusEvent) if !ok { return sAcceptPreCommitEndorsement, errors.Wrap(ErrEvtCast, "failed to cast to consensus event") @@ -569,6 +588,8 @@ func (m *ConsensusFSM) onStopReceivingPreCommitEndorsement(evt fsm.Event) (fsm.S // handleBackdoorEvt takes the dst state from the event and move the FSM into it func (m *ConsensusFSM) handleBackdoorEvt(evt fsm.Event) (fsm.State, error) { + log.L().Debug("Handle event", zap.String("event", string(evt.Type()))) + defer log.L().Debug("Handled event", zap.String("event", string(evt.Type()))) cEvt, ok := evt.(*ConsensusEvent) if !ok { return sPrepare, errors.Wrap(ErrEvtCast, "the event is not a backdoor event") diff --git a/consensus/scheme/rolldpos/chainedrolldpos.go b/consensus/scheme/rolldpos/chainedrolldpos.go new file mode 100644 index 0000000000..08b059d403 --- /dev/null +++ 
b/consensus/scheme/rolldpos/chainedrolldpos.go
@@ -0,0 +1,276 @@
+package rolldpos
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/iotexproject/go-fsm"
+	"github.com/iotexproject/iotex-proto/golang/iotextypes"
+	"go.uber.org/zap"
+
+	"github.com/iotexproject/iotex-core/v2/blockchain/block"
+	"github.com/iotexproject/iotex-core/v2/consensus/consensusfsm"
+	"github.com/iotexproject/iotex-core/v2/consensus/scheme"
+	"github.com/iotexproject/iotex-core/v2/pkg/log"
+)
+
+type (
+	// Round is a single consensus round for one block height
+	Round interface {
+		Start(ctx context.Context) error
+		Stop(ctx context.Context) error
+		Height() uint64
+		RoundNum() uint32
+		Handle(msg *iotextypes.ConsensusMessage)
+		Result() chan int
+		Calibrate(h uint64)
+		State() fsm.State
+	}
+
+	// ChainedRollDPoS pipelines RollDPoS rounds: several rounds for consecutive
+	// heights may be in flight at once, each backed by its own RollDPoS instance
+	ChainedRollDPoS struct {
+		rounds  []Round
+		quit    chan struct{}
+		builder *Builder
+		wg      sync.WaitGroup
+		active  bool
+		mutex   sync.RWMutex
+	}
+)
+
+// NewChainedRollDPoS creates a chained RollDPoS consensus scheme
+func NewChainedRollDPoS(builder *Builder) *ChainedRollDPoS {
+	return &ChainedRollDPoS{
+		rounds:  make([]Round, 0),
+		quit:    make(chan struct{}),
+		builder: builder,
+		active:  true,
+	}
+}
+
+func (cr *ChainedRollDPoS) Start(ctx context.Context) error {
+	// spawn a new RollDPoS round every second, keeping at most 5 in flight
+	cr.wg.Add(1)
+	go func() {
+		defer cr.wg.Done()
+		ticker := time.NewTicker(1 * time.Second)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-cr.quit:
+				return
+			case <-ticker.C:
+				func() {
+					cr.mutex.Lock()
+					defer cr.mutex.Unlock()
+					if len(cr.rounds) >= 5 {
+						return
+					}
+					rs, err := cr.newRound()
+					if err != nil {
+						log.L().Error("Failed to create new round", zap.Error(err))
+						return
+					}
+					// skip if a round for this height is already running
+					for _, r := range cr.rounds {
+						if r.Height() == rs.Height() {
+							log.L().Debug("round already exists", zap.Uint64("height", rs.Height()), zap.Uint32("round", rs.RoundNum()))
+							return
+						}
+					}
+					cr.RunRound(context.Background(), rs)
+					cr.rounds = append(cr.rounds, rs)
+				}()
+			}
+		}
+	}()
+	cr.wg.Add(1)
+	go func() {
+		defer cr.wg.Done()
+		ticker := time.NewTicker(5 * time.Second)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-cr.quit:
+				return
+			case <-ticker.C:
+				cr.mutex.RLock()
+				fmt.Printf("\nrounds size: %d\n", len(cr.rounds))
+				for _, r := range cr.rounds {
+					fmt.Printf("round height: %d, round num: %d, state: %+v\n", r.Height(), r.RoundNum(), r.State())
+				}
+				fmt.Printf("\n")
+				cr.mutex.RUnlock()
+			}
+		}
+	}()
+	return nil
+}
+
+func (cr *ChainedRollDPoS) Stop(ctx context.Context) error {
+	close(cr.quit)
+	cr.wg.Wait()
+	return nil
+}
+
+func (cr *ChainedRollDPoS) HandleConsensusMsg(msg *iotextypes.ConsensusMessage) error {
+	cr.mutex.RLock()
+	defer cr.mutex.RUnlock()
+	for _, r := range cr.rounds {
+		if r.Height() == msg.Height {
+			r.Handle(msg)
+		}
+	}
+	return nil
+}
+
+func (cr *ChainedRollDPoS) Calibrate(h uint64) {
+	cr.mutex.RLock()
+	defer cr.mutex.RUnlock()
+	for _, r := range cr.rounds {
+		r.Calibrate(h)
+	}
+}
+
+func (cr *ChainedRollDPoS) ValidateBlockFooter(*block.Block) error {
+	return nil
+}
+
+func (cr *ChainedRollDPoS) Metrics() (scheme.ConsensusMetrics, error) {
+	return scheme.ConsensusMetrics{}, nil
+}
+
+func (cr *ChainedRollDPoS) Activate(b bool) {
+	cr.active = b
+}
+
+func (cr *ChainedRollDPoS) Active() bool {
+	return cr.active
+}
+
+func (cr *ChainedRollDPoS) removeRound(round Round) {
+	for i, r := range cr.rounds {
+		if r.Height() == round.Height() {
+			cr.rounds = append(cr.rounds[:i], cr.rounds[i+1:]...)
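+			// removing the finished round frees a slot for the spawner loop above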
+ log.L().Debug("round removed", zap.Uint64("height", round.Height()), zap.Uint32("round", round.RoundNum())) + break + } + } +} + +func (cr *ChainedRollDPoS) RunRound(ctx context.Context, r Round) { + // rolldpos round loop, but ternimate when round status is finalized or invalid + err := r.Start(ctx) + if err != nil { + log.L().Error("Failed to start round", zap.Error(err)) + return + } + go func() { + defer func() { + err := r.Stop(ctx) + if err != nil { + log.L().Error("Failed to stop round", zap.Error(err)) + } + cr.mutex.Lock() + cr.removeRound(r) + cr.mutex.Unlock() + }() + + var res int + select { + case res = <-r.Result(): + case <-ctx.Done(): + log.L().Info("round canceled", zap.Error(ctx.Err()), zap.Uint64("height", r.Height()), zap.Uint32("round", r.RoundNum())) + return + } + switch res { + case 1: + log.L().Info("round finished", zap.Uint64("height", r.Height()), zap.Uint32("round", r.RoundNum())) + case 0: + log.L().Info("round invalid", zap.Uint64("height", r.Height()), zap.Uint32("round", r.RoundNum())) + default: + log.L().Info("round terminated", zap.Int("result", res), zap.Uint64("height", r.Height()), zap.Uint32("round", r.RoundNum())) + } + }() +} + +func (cr *ChainedRollDPoS) newRound() (Round, error) { + dpos, err := cr.builder.BuildV2() + if err != nil { + return nil, err + } + return &roundV2{ + dpos: dpos, + res: make(chan int, 1), + quit: make(chan struct{}), + }, nil +} + +type roundV2 struct { + dpos *RollDPoS + res chan int + quit chan struct{} + wg sync.WaitGroup +} + +func (r *roundV2) Start(ctx context.Context) error { + err := r.dpos.Start(ctx) + if err != nil { + return err + } + r.wg.Add(1) + go func() { + defer r.wg.Done() + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + for { + select { + case <-r.quit: + return + case <-ticker.C: + switch r.dpos.cfsm.CurrentState() { + case consensusfsm.SFinalized: + r.res <- 1 + return + case consensusfsm.SInvalid: + r.dpos.ctx.Chain().DropDraftBlock(r.Height()) + r.res <- 0 + return + default: + } + } + } + }() + return nil +} + +func (r *roundV2) Stop(ctx context.Context) error { + close(r.quit) + r.wg.Wait() + return r.dpos.Stop(ctx) +} + +func (r *roundV2) Height() uint64 { + return r.dpos.ctx.Height() +} + +func (r *roundV2) RoundNum() uint32 { + return r.dpos.ctx.Number() +} + +func (r *roundV2) Handle(msg *iotextypes.ConsensusMessage) { + err := r.dpos.HandleConsensusMsg(msg) + if err != nil { + log.L().Error("Failed to handle consensus message", zap.Error(err)) + } +} + +func (r *roundV2) Result() chan int { + return r.res +} + +func (r *roundV2) Calibrate(h uint64) { + r.dpos.Calibrate(h) +} + +func (r *roundV2) State() fsm.State { + return r.dpos.cfsm.CurrentState() +} diff --git a/consensus/scheme/rolldpos/chainedrolldpos_test.go b/consensus/scheme/rolldpos/chainedrolldpos_test.go new file mode 100644 index 0000000000..ac9915a7eb --- /dev/null +++ b/consensus/scheme/rolldpos/chainedrolldpos_test.go @@ -0,0 +1,248 @@ +package rolldpos + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/facebookgo/clock" + "github.com/golang/mock/gomock" + "github.com/pkg/errors" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "google.golang.org/protobuf/proto" + + "github.com/iotexproject/iotex-core/v2/action/protocol/rolldpos" + "github.com/iotexproject/iotex-core/v2/blockchain" + "github.com/iotexproject/iotex-core/v2/blockchain/block" + "github.com/iotexproject/iotex-core/v2/blockchain/genesis" + 
"github.com/iotexproject/iotex-core/v2/consensus/consensusfsm" + "github.com/iotexproject/iotex-core/v2/db" + "github.com/iotexproject/iotex-core/v2/pkg/log" + "github.com/iotexproject/iotex-core/v2/test/identityset" + "github.com/iotexproject/iotex-core/v2/test/mock/mock_blockchain" + "github.com/iotexproject/iotex-core/v2/testutil" +) + +type mockChain struct { + draft uint64 + finalized uint64 + finalizedHash map[string]bool + blocks map[uint64]*block.Block + draftBlocks map[uint64]*block.Header + mutex sync.RWMutex +} + +func (m *mockChain) OngoingBlockHeight() uint64 { + log.L().Debug("OngoingBlockHeight enter") + m.mutex.RLock() + log.L().Debug("OngoingBlockHeight locked") + defer m.mutex.RUnlock() + return m.draft +} + +func (m *mockChain) NewDraft() uint64 { + m.mutex.Lock() + defer m.mutex.Unlock() + m.draft++ + return m.draft +} + +func (m *mockChain) NewDraftBlock(blk *block.Block) { + m.mutex.Lock() + defer m.mutex.Unlock() + m.draftBlocks[blk.Height()] = &blk.Header +} + +func (m *mockChain) Finalized(blkHash []byte) bool { + m.mutex.RLock() + defer m.mutex.RUnlock() + if _, ok := m.finalizedHash[string(blkHash)]; ok { + return true + } + return false +} + +func (m *mockChain) FinalizeBlock(blk *block.Block) error { + m.mutex.Lock() + defer m.mutex.Unlock() + height := blk.Height() + hash := blk.HashBlock() + if height != m.finalized+1 { + return errors.Errorf("cannot finalize block %d, current finalized block is %d", height, m.finalized) + } + m.finalized = height + m.finalizedHash[string(hash[:])] = true + m.blocks[height] = blk + delete(m.draftBlocks, height) + return nil +} + +func (m *mockChain) Block(height uint64) (*block.Block, error) { + m.mutex.RLock() + defer m.mutex.RUnlock() + blk, ok := m.blocks[height] + if !ok { + return nil, errors.Errorf("block %d not found", height) + } + return blk, nil +} + +func (m *mockChain) PendingBlockHeader(height uint64) (*block.Header, error) { + m.mutex.RLock() + defer m.mutex.RUnlock() + var header *block.Header + blk, ok := m.blocks[height] + if !ok { + header, ok = m.draftBlocks[height] + if !ok { + return nil, errors.Errorf("block %d not found", height) + } + } else { + header = &blk.Header + } + return header, nil +} + +func (m *mockChain) TipHeight() uint64 { + m.mutex.RLock() + defer m.mutex.RUnlock() + return m.finalized +} + +func (m *mockChain) CancelBlock(height uint64) { + m.mutex.Lock() + defer m.mutex.Unlock() + log.L().Debug("cancel block", zap.Uint64("height", height), zap.Uint64("draft", m.draft)) + for i := height; i <= m.draft; i++ { + delete(m.draftBlocks, i) + } + if height > 0 { + m.draft = height - 1 + } else { + m.draft = 0 + } +} + +func TestChainedRollDPoS(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + r := require.New(t) + g := genesis.Default + g.NumCandidateDelegates = 4 + g.NumDelegates = 1 + builderCfg := BuilderConfig{ + Chain: blockchain.DefaultConfig, + Consensus: DefaultConfig, + DardanellesUpgrade: consensusfsm.DefaultDardanellesUpgradeConfig, + DB: db.DefaultConfig, + Genesis: g, + SystemActive: true, + } + consensusdb, err := testutil.PathOfTempFile("consensus") + r.NoError(err) + defer func() { + testutil.CleanupPath(consensusdb) + }() + + builderCfg.Consensus.ConsensusDBPath = consensusdb + builderCfg.Consensus.Delay = 400 * time.Millisecond + builderCfg.Consensus.ToleratedOvertime = 400 * time.Millisecond + builderCfg.Consensus.FSM.AcceptBlockTTL = 1 * time.Second + 
builderCfg.Consensus.FSM.AcceptProposalEndorsementTTL = 1 * time.Second + builderCfg.Consensus.FSM.AcceptLockEndorsementTTL = 1 * time.Second + builderCfg.Consensus.FSM.CommitTTL = 1 * time.Second + builderCfg.Consensus.FSM.UnmatchedEventTTL = 1 * time.Second + builderCfg.Consensus.FSM.UnmatchedEventInterval = 100 * time.Millisecond + builderCfg.Consensus.FSM.EventChanSize = 10000 + builderCfg.Genesis.BlockInterval = 5 * time.Second + builderCfg.DardanellesUpgrade.AcceptBlockTTL = 1 * time.Second + builderCfg.DardanellesUpgrade.AcceptLockEndorsementTTL = 1 * time.Second + builderCfg.DardanellesUpgrade.AcceptProposalEndorsementTTL = 1 * time.Second + builderCfg.DardanellesUpgrade.BlockInterval = 5 * time.Second + builderCfg.DardanellesUpgrade.CommitTTL = 1 * time.Second + builderCfg.DardanellesUpgrade.Delay = 400 * time.Millisecond + builderCfg.DardanellesUpgrade.UnmatchedEventInterval = 100 * time.Millisecond + builderCfg.DardanellesUpgrade.UnmatchedEventTTL = 1 * time.Second + + chain := &mockChain{ + finalizedHash: make(map[string]bool), + blocks: make(map[uint64]*block.Block), + draftBlocks: make(map[uint64]*block.Header), + } + bc := mock_blockchain.NewMockBlockchain(ctrl) + bc.EXPECT().EvmNetworkID().Return(uint32(1)).AnyTimes() + bc.EXPECT().Genesis().Return(g).AnyTimes() + bc.EXPECT().MintNewBlock(gomock.Any()).DoAndReturn(func(timestamp time.Time) (*block.Block, error) { + b := block.NewBuilder(block.NewRunnableActionsBuilder().Build()) + b.SetHeight(chain.NewDraft()) + b.SetTimestamp(timestamp) + blk, err := b.SignAndBuild(identityset.PrivateKey(1)) + if err != nil { + return nil, err + } + log.L().Debug("mint new block", zap.Time("ts", timestamp), zap.Uint64("height", blk.Height())) + chain.NewDraftBlock(&blk) + return &blk, nil + }).AnyTimes() + bc.EXPECT().ValidateBlock(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + bc.EXPECT().CommitBlock(gomock.Any()).DoAndReturn(func(blk *block.Block) error { + return chain.FinalizeBlock(blk) + }).AnyTimes() + bc.EXPECT().ChainAddress().Return(builderCfg.Chain.Address).AnyTimes() + bc.EXPECT().BlockFooterByHeight(gomock.Any()).DoAndReturn(func(height uint64) (*block.Footer, error) { + blk, err := chain.Block(height) + if err != nil { + return nil, errors.Wrapf(err, "block %d not found", height) + } + return &blk.Footer, nil + }).AnyTimes() + bc.EXPECT().BlockHeaderByHeight(gomock.Any()).DoAndReturn(func(height uint64) (*block.Header, error) { + blk, err := chain.PendingBlockHeader(height) + if err != nil { + return nil, errors.Wrapf(err, "block %d not found", height) + } + return blk, nil + }).AnyTimes() + bc.EXPECT().TipHeight().DoAndReturn(func() uint64 { + return chain.TipHeight() + }).AnyTimes() + clock := clock.New() + broadcastHandler := func(msg proto.Message) error { return nil } + delegatesByEpochFunc := func(uint64) ([]string, error) { + delegates := []string{} + for i := 1; i <= int(g.NumCandidateDelegates); i++ { + delegates = append(delegates, identityset.Address(i).String()) + } + return delegates, nil + } + proposersByEpochFunc := func(uint64) ([]string, error) { + return []string{ + identityset.Address(1).String(), + }, nil + } + rp := rolldpos.NewProtocol( + g.NumCandidateDelegates, + g.NumDelegates, + g.NumSubEpochs, + ) + bd := NewRollDPoSBuilder(). + SetAddr(identityset.Address(1).String()). + SetPriKey(identityset.PrivateKey(1)). + SetConfig(builderCfg). + SetChainManager(NewChainManager2(bc, chain)). + SetBlockDeserializer(block.NewDeserializer(bc.EvmNetworkID())). + SetClock(clock). 
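+		// NewChainManager2 above also wires in the mock chain as the working set,
+		// exposing draft block headers to the round calculator before commit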
+ SetBroadcast(broadcastHandler). + SetDelegatesByEpochFunc(delegatesByEpochFunc). + SetProposersByEpochFunc(proposersByEpochFunc). + RegisterProtocol(rp) + crdpos := NewChainedRollDPoS(bd) + r.NoError(crdpos.Start(context.Background())) + defer func() { + r.NoError(crdpos.Stop(context.Background())) + }() + + time.Sleep(60 * time.Second) +} diff --git a/consensus/scheme/rolldpos/chainedrolldposctx.go b/consensus/scheme/rolldpos/chainedrolldposctx.go new file mode 100644 index 0000000000..fadbcaa0eb --- /dev/null +++ b/consensus/scheme/rolldpos/chainedrolldposctx.go @@ -0,0 +1,44 @@ +package rolldpos + +import ( + "go.uber.org/zap" +) + +type chainedRollDPoSCtx struct { + *rollDPoSCtx +} + +func NewChainedRollDPoSCtx( + ctx *rollDPoSCtx, +) (RDPoSCtx, error) { + return &chainedRollDPoSCtx{ + rollDPoSCtx: ctx, + }, nil +} + +func (cctx *chainedRollDPoSCtx) Prepare() error { + ctx := cctx.rollDPoSCtx + ctx.logger().Debug("prepare") + ctx.mutex.Lock() + ctx.logger().Debug("prepare lock") + defer ctx.mutex.Unlock() + height := cctx.chain.DraftHeight() + 1 + ctx.logger().Debug("prepare draft height", zap.Uint64("height", height)) + newRound, err := ctx.roundCalc.UpdateRound(ctx.round, height, ctx.BlockInterval(height), ctx.clock.Now(), ctx.toleratedOvertime) + if err != nil { + return err + } + ctx.logger().Debug( + "new round", + zap.Uint64("newheight", newRound.height), + zap.String("newts", ctx.clock.Now().String()), + zap.Uint64("newepoch", newRound.epochNum), + zap.Uint64("newepochStartHeight", newRound.epochStartHeight), + zap.Uint32("newround", newRound.roundNum), + zap.String("newroundStartTime", newRound.roundStartTime.String()), + ) + ctx.round = newRound + _consensusHeightMtc.WithLabelValues().Set(float64(ctx.round.height)) + _timeSlotMtc.WithLabelValues().Set(float64(ctx.round.roundNum)) + return nil +} diff --git a/consensus/scheme/rolldpos/endorsementmanager.go b/consensus/scheme/rolldpos/endorsementmanager.go index f977c038d3..0fba73e66a 100644 --- a/consensus/scheme/rolldpos/endorsementmanager.go +++ b/consensus/scheme/rolldpos/endorsementmanager.go @@ -214,19 +214,35 @@ func (bc *blockEndorsementCollection) Endorsements( type endorsementManager struct { isMajorityFunc EndorsedByMajorityFunc - eManagerDB db.KVStore + eManagerDB *roundStore collections map[string]*blockEndorsementCollection cachedMintedBlk *block.Block + deserializer *block.Deserializer } -func newEndorsementManager(eManagerDB db.KVStore, deserializer *block.Deserializer) (*endorsementManager, error) { +func newEndorsementManager(eManagerDB *roundStore, deserializer *block.Deserializer) (*endorsementManager, error) { if eManagerDB == nil { return &endorsementManager{ eManagerDB: nil, collections: map[string]*blockEndorsementCollection{}, cachedMintedBlk: nil, + deserializer: deserializer, }, nil } + manager := &endorsementManager{ + eManagerDB: eManagerDB, + collections: map[string]*blockEndorsementCollection{}, + cachedMintedBlk: nil, + deserializer: deserializer, + } + if err := manager.load(); err != nil { + return nil, err + } + return manager, nil +} + +func (m *endorsementManager) load() error { + eManagerDB := m.eManagerDB bytes, err := eManagerDB.Get(_eManagerNS, _statusKey) switch errors.Cause(err) { case nil: @@ -234,23 +250,19 @@ func newEndorsementManager(eManagerDB db.KVStore, deserializer *block.Deserializ manager := &endorsementManager{eManagerDB: eManagerDB} managerProto := &endorsementpb.EndorsementManager{} if err = proto.Unmarshal(bytes, managerProto); err != nil { - return nil, err + return 
err
 	}
-		if err = manager.fromProto(managerProto, deserializer); err != nil {
-			return nil, err
+		if err = manager.fromProto(managerProto, m.deserializer); err != nil {
+			return err
 		}
-		manager.eManagerDB = eManagerDB
-		return manager, nil
+		// copy the deserialized state into the receiver; the local manager
+		// above only stages the fromProto result
+		m.collections = manager.collections
+		m.cachedMintedBlk = manager.cachedMintedBlk
+		return nil
 	case db.ErrNotExist:
 		// If DB doesn't have any information
 		log.L().Info("First initializing DB")
-		return &endorsementManager{
-			eManagerDB:      eManagerDB,
-			collections:     map[string]*blockEndorsementCollection{},
-			cachedMintedBlk: nil,
-		}, nil
+		return nil
 	default:
-		return nil, err
+		return err
 	}
 }
@@ -406,6 +418,15 @@ func (m *endorsementManager) Cleanup(timestamp time.Time) error {
 	return nil
 }

+// WithRound repoints the endorsement manager at the given (height, round)
+// and reloads any state persisted for that round
+func (m *endorsementManager) WithRound(height uint64, roundNum uint32) error {
+	if err := m.Cleanup(time.Time{}); err != nil {
+		return err
+	}
+	if m.eManagerDB == nil {
+		return nil
+	}
+	m.eManagerDB.ChangeRound(height, roundNum)
+	return m.load()
+}
+
 func (m *endorsementManager) Log(
 	logger *zap.Logger,
 	delegates []string,
diff --git a/consensus/scheme/rolldpos/rolldpos.go b/consensus/scheme/rolldpos/rolldpos.go
index 5ca1cdf3fe..bfc9c811c9 100644
--- a/consensus/scheme/rolldpos/rolldpos.go
+++ b/consensus/scheme/rolldpos/rolldpos.go
@@ -62,10 +62,35 @@ type (
 		TipHeight() uint64
 		// ChainAddress returns chain address on parent chain, the root chain return empty.
 		ChainAddress() string
+		// DraftHeight returns the height of the latest draft (uncommitted) block
+		DraftHeight() uint64
+		// DropDraftBlock discards the draft block at the given height
+		DropDraftBlock(uint64)
+	}
+
+	// workingSet exposes the draft-block bookkeeping of the state factory
+	workingSet interface {
+		OngoingBlockHeight() uint64
+		PendingBlockHeader(uint64) (*block.Header, error)
+		CancelBlock(uint64)
 	}

 	chainManager struct {
 		bc blockchain.Blockchain
+		ws workingSet
+	}
+
+	consensusFsm interface {
+		Start(context.Context) error
+		Stop(context.Context) error
+		BackToPrepare(time.Duration) (fsm.State, error)
+		ProduceReceiveBlockEvent(any)
+		ProduceReceiveProposalEndorsementEvent(any)
+		ProduceReceiveLockEndorsementEvent(any)
+		ProduceReceivePreCommitEndorsementEvent(any)
+		Calibrate(uint64)
+		NumPendingEvents() int
+		CurrentState() fsm.State
 	}
 )
@@ -92,11 +117,25 @@ func NewChainManager(bc blockchain.Blockchain) ChainManager {
 	}
 }

+// NewChainManager2 creates a chain manager that can also read draft
+// (uncommitted) blocks from the given working set
+func NewChainManager2(bc blockchain.Blockchain, ws workingSet) ChainManager {
+	return &chainManager{
+		bc: bc,
+		ws: ws,
+	}
+}
+
 // BlockProposeTime return propose time by height
 func (cm *chainManager) BlockProposeTime(height uint64) (time.Time, error) {
 	if height == 0 {
 		return time.Unix(cm.bc.Genesis().Timestamp, 0), nil
 	}
+	if cm.ws != nil && height > cm.bc.TipHeight() {
+		log.L().Debug("Get propose time from working set", zap.Uint64("height", height))
+		blk, err := cm.ws.PendingBlockHeader(height)
+		if err == nil {
+			return blk.Timestamp(), nil
+		}
+	}
 	header, err := cm.bc.BlockHeaderByHeight(height)
 	if err != nil {
 		return time.Time{}, errors.Wrapf(
@@ -109,6 +148,12 @@ func (cm *chainManager) BlockProposeTime(height uint64) (time.Time, error) {

 // BlockCommitTime return commit time by height
 func (cm *chainManager) BlockCommitTime(height uint64) (time.Time, error) {
+	if cm.ws != nil && height > cm.bc.TipHeight() {
+		blk, err := cm.ws.PendingBlockHeader(height)
+		if err == nil {
+			return blk.Timestamp(), nil
+		}
+	}
 	footer, err := cm.bc.BlockFooterByHeight(height)
 	if err != nil {
 		return time.Time{}, errors.Wrapf(
@@ -144,9 +189,22 @@ func (cm *chainManager) ChainAddress() string {
 	return cm.bc.ChainAddress()
 }

+func (cm *chainManager) DraftHeight() uint64 {
+	if cm.ws == nil {
+		return cm.TipHeight()
+	}
+	return cm.ws.OngoingBlockHeight()
+}
+
+func (cm
*chainManager) DropDraftBlock(height uint64) {
+	if cm.ws != nil {
+		cm.ws.CancelBlock(height)
+	}
+}
+
 // RollDPoS is Roll-DPoS consensus main entrance
 type RollDPoS struct {
-	cfsm *consensusfsm.ConsensusFSM
+	cfsm       consensusFsm
 	ctx        RDPoSCtx
 	startDelay time.Duration
 	ready      chan interface{}
@@ -456,3 +514,51 @@ func (b *Builder) Build() (*RollDPoS, error) {
 		ready: make(chan interface{}),
 	}, nil
}
+
+// BuildV2 builds a RollDPoS that uses the chained context and chained FSM
+func (b *Builder) BuildV2() (*RollDPoS, error) {
+	if b.chain == nil {
+		return nil, errors.Wrap(ErrNewRollDPoS, "blockchain APIs is nil")
+	}
+	if b.broadcastHandler == nil {
+		return nil, errors.Wrap(ErrNewRollDPoS, "broadcast callback is nil")
+	}
+	if b.clock == nil {
+		b.clock = clock.New()
+	}
+	b.cfg.DB.DbPath = "" // disable consensus db
+	b.cfg.Consensus.Delay = 0
+	ctx, err := NewRollDPoSCtx(
+		consensusfsm.NewConsensusConfig(b.cfg.Consensus.FSM, b.cfg.DardanellesUpgrade, b.cfg.Genesis, b.cfg.Consensus.Delay),
+		b.cfg.DB,
+		b.cfg.SystemActive,
+		b.cfg.Consensus.ToleratedOvertime,
+		b.cfg.Genesis.TimeBasedRotation,
+		b.chain,
+		b.blockDeserializer,
+		b.rp,
+		b.broadcastHandler,
+		b.delegatesByEpochFunc,
+		b.proposersByEpochFunc,
+		b.encodedAddr,
+		b.priKey,
+		b.clock,
+		b.cfg.Genesis.BeringBlockHeight,
+	)
+	if err != nil {
+		return nil, errors.Wrap(err, "error when constructing consensus context")
+	}
+	ctx, err = NewChainedRollDPoSCtx(ctx.(*rollDPoSCtx))
+	if err != nil {
+		return nil, errors.Wrap(err, "error when constructing chained roll dpos context")
+	}
+	cfsm, err := consensusfsm.NewChainedConsensusFSM(ctx, b.clock)
+	if err != nil {
+		return nil, errors.Wrap(err, "error when constructing the consensus FSM")
+	}
+	return &RollDPoS{
+		cfsm:       cfsm,
+		ctx:        ctx,
+		startDelay: b.cfg.Consensus.Delay,
+		ready:      make(chan interface{}),
+	}, nil
+}
diff --git a/consensus/scheme/rolldpos/rolldposctx.go b/consensus/scheme/rolldpos/rolldposctx.go
index 62c9a4e3ef..5a4ba505c3 100644
--- a/consensus/scheme/rolldpos/rolldposctx.go
+++ b/consensus/scheme/rolldpos/rolldposctx.go
@@ -136,16 +136,16 @@ func NewRollDPoSCtx(
 	if proposersByEpochFunc == nil {
 		return nil, errors.New("proposers by epoch function cannot be nil")
 	}
-	if cfg.AcceptBlockTTL(0)+cfg.AcceptProposalEndorsementTTL(0)+cfg.AcceptLockEndorsementTTL(0)+cfg.CommitTTL(0) > cfg.BlockInterval(0) {
-		return nil, errors.Errorf(
-			"invalid ttl config, the sum of ttls should be equal to block interval. acceptBlockTTL %d, acceptProposalEndorsementTTL %d, acceptLockEndorsementTTL %d, commitTTL %d, blockInterval %d",
-			cfg.AcceptBlockTTL(0),
-			cfg.AcceptProposalEndorsementTTL(0),
-			cfg.AcceptLockEndorsementTTL(0),
-			cfg.CommitTTL(0),
-			cfg.BlockInterval(0),
-		)
-	}
+	// the TTL-sum vs. block-interval check is dropped: chained rounds for
+	// consecutive heights overlap, so the sum of the phase TTLs no longer
+	// has to fit within a single block interval
 	var eManagerDB db.KVStore
 	if len(consensusDBConfig.DbPath) > 0 {
 		eManagerDB = db.NewBoltDB(consensusDBConfig)
@@ -179,7 +179,7 @@ func (ctx *rollDPoSCtx) Start(c context.Context) (err error) {
 		if err := ctx.eManagerDB.Start(c); err != nil {
 			return errors.Wrap(err, "Error when starting the collectionDB")
 		}
-		eManager, err = newEndorsementManager(ctx.eManagerDB, ctx.blockDeserializer)
+		eManager, err = newEndorsementManager(newRoundStore(ctx.eManagerDB, 0, 0), ctx.blockDeserializer)
 	}
 	ctx.round, err = ctx.roundCalc.NewRoundWithToleration(0, ctx.BlockInterval(0), ctx.clock.Now(), eManager, ctx.toleratedOvertime)
@@ -376,11 +376,13 @@ func (ctx *rollDPoSCtx) WaitUntilRoundStart() time.Duration {
 	now := ctx.clock.Now()
 	startTime := ctx.round.StartTime()
 	if now.Before(startTime) {
+		ctx.Logger().Debug("wait until round start", zap.Time("start", startTime), zap.Time("now", now))
 		time.Sleep(startTime.Sub(now))
 		return 0
 	}
 	overTime := now.Sub(startTime)
 	if !ctx.isDelegate() && ctx.toleratedOvertime > overTime {
+		ctx.Logger().Debug("tolerating overtime", zap.Duration("toleratedOvertime", ctx.toleratedOvertime), zap.Duration("overTime", overTime))
 		time.Sleep(ctx.toleratedOvertime - overTime)
 		return 0
 	}
@@ -604,6 +606,13 @@ func (ctx *rollDPoSCtx) Height() uint64 {
 	return ctx.round.Height()
 }

+func (ctx *rollDPoSCtx) Number() uint32 {
+	ctx.mutex.RLock()
+	defer ctx.mutex.RUnlock()
+
+	return ctx.round.Number()
+}
+
 func (ctx *rollDPoSCtx) Activate(active bool) {
 	ctx.mutex.Lock()
 	defer ctx.mutex.Unlock()
@@ -660,7 +669,7 @@ func (ctx *rollDPoSCtx) endorseBlockProposal(proposal *blockProposal) (*Endorsed
 }

 func (ctx *rollDPoSCtx) logger() *zap.Logger {
-	return ctx.round.Log(log.Logger("consensus"))
+	return ctx.round.Log(log.Logger("consensus")).With(zap.String("ioAddr", ctx.encodedAddr))
 }

 func (ctx *rollDPoSCtx) newConsensusEvent(
diff --git a/consensus/scheme/rolldpos/roundcalculator.go b/consensus/scheme/rolldpos/roundcalculator.go
index 4fc21ac9a6..c9905382e5 100644
--- a/consensus/scheme/rolldpos/roundcalculator.go
+++ b/consensus/scheme/rolldpos/roundcalculator.go
@@ -9,9 +9,11 @@ import (
 	"time"

 	"github.com/pkg/errors"
+	"go.uber.org/zap"

 	"github.com/iotexproject/iotex-core/v2/action/protocol/rolldpos"
 	"github.com/iotexproject/iotex-core/v2/endorsement"
+	"github.com/iotexproject/iotex-core/v2/pkg/log"
 )

 var errInvalidCurrentTime = errors.New("invalid current time")
@@ -72,7 +74,7 @@ func (c *roundCalculator) UpdateRound(round *roundCtx, height uint64, blockInter
 		blockInLock = round.blockInLock
 		proofOfLock = round.proofOfLock
 	} else {
-		err = round.eManager.Cleanup(time.Time{})
+		err := round.eManager.WithRound(height, roundNum)
 		if err != nil {
 			return nil, err
 		}
@@ -135,12 +137,14 @@ func (c *roundCalculator) roundInfo(
 	now time.Time,
 	toleratedOvertime time.Duration,
 ) (roundNum uint32, roundStartTime time.Time, err error) {
+	// chained rounds: blockProcessDuration keeps the configured interval (the
+	// time budget of one round), while blockInterval is repurposed as the 1s
+	// offset between a draft block and the first round of the next height
+	blockProcessDuration := blockInterval
+	blockInterval = time.Second
 	var lastBlockTime time.Time
 	if lastBlockTime, err = c.chain.BlockProposeTime(0); err != nil {
 		return
 	}
 	if height > 1 {
-		if height >= c.beringHeight {
+		if true { // bering gating removed: always use the previous block's propose time
 			var lastBlkProposeTime time.Time
 			if lastBlkProposeTime, err = c.chain.BlockProposeTime(height - 1); err != nil {
 				return
@@ -157,22
+161,25 @@ func (c *roundCalculator) roundInfo(
 	if !lastBlockTime.Before(now) {
 		// TODO: if this is the case, it is possible that the system time is far behind the time of other nodes.
 		// better error handling may be needed on the caller side
-		err = errors.Wrapf(
-			errInvalidCurrentTime,
-			"last block time %s is after than current time %s",
-			lastBlockTime,
-			now,
-		)
-		return
+		// a draft block may carry a timestamp at or after now; rather than
+		// failing, schedule the next round one interval after it
+		roundStartTime = lastBlockTime.Add(blockInterval)
+		return roundNum, roundStartTime, nil
 	}
 	duration := now.Sub(lastBlockTime)
 	if duration > blockInterval {
-		roundNum = uint32(duration / blockInterval)
-		if toleratedOvertime == 0 || duration%blockInterval < toleratedOvertime {
+		roundNum = 1 + uint32((duration-blockInterval)/blockProcessDuration)
+		if toleratedOvertime == 0 || (duration-blockInterval)%blockProcessDuration < toleratedOvertime {
 			roundNum--
 		}
 	}
-	roundStartTime = lastBlockTime.Add(time.Duration(roundNum+1) * blockInterval)
+	log.L().Debug("round info", zap.Time("lastBlockTime", lastBlockTime), zap.Time("now", now), zap.Uint64("height", height), zap.Duration("duration", duration), zap.Uint32("roundNum", roundNum))
+	roundStartTime = lastBlockTime.Add(time.Duration(roundNum)*blockProcessDuration + blockInterval)
 	return roundNum, roundStartTime, nil
 }

@@ -244,6 +251,7 @@ func (c *roundCalculator) newRound(
 			return nil, err
 		}
 	}
+	if err = eManager.WithRound(height, roundNum); err != nil {
+		return nil, err
+	}
 	round = &roundCtx{
 		epochNum:         epochNum,
 		epochStartHeight: epochStartHeight,
diff --git a/consensus/scheme/rolldpos/roundstore.go b/consensus/scheme/rolldpos/roundstore.go
new file mode 100644
index 0000000000..b1f90d3bb3
--- /dev/null
+++ b/consensus/scheme/rolldpos/roundstore.go
@@ -0,0 +1,40 @@
+package rolldpos
+
+import "github.com/iotexproject/iotex-core/v2/pkg/util/byteutil"
+
+type (
+	// kvStore is the minimal key-value interface required by roundStore
+	kvStore interface {
+		Put(string, []byte, []byte) error
+		Get(string, []byte) ([]byte, error)
+	}
+
+	// roundStore prefixes every key with its (height, round) pair so the
+	// persisted endorsement state of concurrent rounds cannot collide
+	roundStore struct {
+		underlying kvStore
+		height     uint64
+		roundNum   uint32
+	}
+)
+
+func newRoundStore(kv kvStore, height uint64, roundNum uint32) *roundStore {
+	return &roundStore{underlying: kv, height: height, roundNum: roundNum}
+}
+
+func (rs *roundStore) Put(ns string, key []byte, value []byte) error {
+	return rs.underlying.Put(ns, rs.realKey(key), value)
+}
+
+func (rs *roundStore) Get(ns string, key []byte) ([]byte, error) {
+	return rs.underlying.Get(ns, rs.realKey(key))
+}
+
+// ChangeRound repoints the store at another (height, round) key prefix
+func (rs *roundStore) ChangeRound(height uint64, roundNum uint32) {
+	rs.height = height
+	rs.roundNum = roundNum
+}
+
+func (rs *roundStore) realKey(key []byte) []byte {
+	keys := byteutil.Uint64ToBytes(rs.height)
+	keys = append(keys, byteutil.Uint32ToBytes(rs.roundNum)...)
+	keys = append(keys, key...)
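+	// final key layout: 8-byte height | 4-byte round number | original key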
+ return keys +} diff --git a/db/kvstorewithbuffer.go b/db/kvstorewithbuffer.go index 2fe33d1b09..2ecbc6bebe 100644 --- a/db/kvstorewithbuffer.go +++ b/db/kvstorewithbuffer.go @@ -24,6 +24,7 @@ type ( // and transaction with multiple writes KVStoreWithBuffer interface { KVStore + GetDirty(string, []byte) ([]byte, bool) withBuffer } @@ -176,6 +177,13 @@ func (kvb *kvStoreWithBuffer) Get(ns string, key []byte) ([]byte, error) { return value, err } +func (kvb *kvStoreWithBuffer) GetDirty(ns string, key []byte) ([]byte, bool) { + if value, err := kvb.buffer.Get(ns, key); err == nil { + return value, true + } + return nil, false +} + func (kvb *kvStoreWithBuffer) Put(ns string, key, value []byte) error { kvb.buffer.Put(ns, key, value, fmt.Sprintf("faild to put %x in %s", key, ns)) return nil diff --git a/pkg/log/log.go b/pkg/log/log.go index 5df5d489c1..e25753e63e 100644 --- a/pkg/log/log.go +++ b/pkg/log/log.go @@ -39,6 +39,7 @@ func init() { zapCfg := zap.NewDevelopmentConfig() zapCfg.EncoderConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder zapCfg.Level.SetLevel(zap.InfoLevel) + // zapCfg.Level.SetLevel(zap.DebugLevel) l, err := zapCfg.Build() if err != nil { log.Println("Failed to init zap global logger, no zap log will be shown till zap is properly initialized: ", err) diff --git a/state/factory/factory.go b/state/factory/factory.go index 274cc41b7f..73a2fb5f61 100644 --- a/state/factory/factory.go +++ b/state/factory/factory.go @@ -16,7 +16,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" - "github.com/iotexproject/go-pkgs/cache" "github.com/iotexproject/go-pkgs/hash" "github.com/iotexproject/iotex-core/v2/action" @@ -88,6 +87,10 @@ type ( PutBlock(context.Context, *block.Block) error WorkingSet(context.Context) (protocol.StateManager, error) WorkingSetAtHeight(context.Context, uint64, ...*action.SealedEnvelope) (protocol.StateManager, error) + OngoingBlockHeight() uint64 + PendingBlockHeader(uint64) (*block.Header, error) + PutBlockHeader(*block.Header) + CancelBlock(uint64) } // factory implements StateFactory interface, tracks changes to account/contract and batch-commits to DB @@ -101,7 +104,7 @@ type ( twoLayerTrie trie.TwoLayerTrie // global state trie, this is a read only trie dao db.KVStore // the underlying DB for account/contract storage timerFactory *prometheustimer.TimerFactory - workingsets cache.LRUCache // lru cache for workingsets + chamber WorkingSetChamber protocolView protocol.View skipBlockValidationOnPut bool ps *patchStore @@ -157,7 +160,7 @@ func NewFactory(cfg Config, dao db.KVStore, opts ...Option) (Factory, error) { registry: protocol.NewRegistry(), saveHistory: cfg.Chain.EnableArchiveMode, protocolView: protocol.View{}, - workingsets: cache.NewThreadSafeLruCache(int(cfg.Chain.WorkingSetCacheSize)), + chamber: newWorkingsetChamber(int(cfg.Chain.WorkingSetCacheSize)), dao: dao, } @@ -236,7 +239,7 @@ func (sf *factory) Stop(ctx context.Context) error { if err := sf.dao.Stop(ctx); err != nil { return err } - sf.workingsets.Clear() + sf.chamber.Clear() return sf.lifecycle.OnStop(ctx) } @@ -251,6 +254,25 @@ func (sf *factory) Height() (uint64, error) { return byteutil.BytesToUint64(height), nil } +func (sf *factory) OngoingBlockHeight() uint64 { + return sf.chamber.OngoingBlockHeight() +} + +func (sf *factory) PendingBlockHeader(height uint64) (*block.Header, error) { + if h := sf.chamber.GetBlockHeader(height); h != nil { + return h, nil + } + return nil, errors.Errorf("pending block %d not exist", 
+}
+
+func (sf *factory) PutBlockHeader(header *block.Header) {
+	sf.chamber.PutBlockHeader(header)
+}
+
+func (sf *factory) CancelBlock(height uint64) {
+	sf.chamber.AbandonWorkingSets(height)
+}
+
 func (sf *factory) newWorkingSet(ctx context.Context, height uint64) (*workingSet, error) {
 	span := tracer.SpanFromContext(ctx)
 	span.AddEvent("factory.newWorkingSet")
@@ -269,7 +291,11 @@ func (sf *factory) newWorkingSet(ctx context.Context, height uint64) (*workingSe
 	if err != nil {
 		return nil, err
 	}
-	return sf.createSfWorkingSet(ctx, height, store)
+	var parent *workingSet
+	if height > 0 {
+		parent = sf.chamber.GetWorkingSet(height - 1)
+	}
+	return sf.createSfWorkingSet(ctx, height, store, parent)
 }
 
 func (sf *factory) newWorkingSetAtHeight(ctx context.Context, height uint64) (*workingSet, error) {
@@ -290,10 +316,10 @@ func (sf *factory) newWorkingSetAtHeight(ctx context.Context, height uint64) (*w
 	if err != nil {
 		return nil, err
 	}
-	return sf.createSfWorkingSet(ctx, height, store)
+	return sf.createSfWorkingSet(ctx, height, store, nil)
 }
 
-func (sf *factory) createSfWorkingSet(ctx context.Context, height uint64, store workingSetStore) (*workingSet, error) {
+func (sf *factory) createSfWorkingSet(ctx context.Context, height uint64, store workingSetStore, parent *workingSet) (*workingSet, error) {
 	if err := store.Start(ctx); err != nil {
 		return nil, err
 	}
@@ -308,7 +334,7 @@
 			}
 		}
 	}
-	return newWorkingSet(height, store), nil
+	return newWorkingSet(height, store, parent), nil
 }
 
 func (sf *factory) flusherOptions(preEaster bool) []db.KVStoreFlusherOption {
@@ -356,7 +382,7 @@ func (sf *factory) Validate(ctx context.Context, blk *block.Block) error {
 		if err := ws.ValidateBlock(ctx, blk); err != nil {
 			return errors.Wrap(err, "failed to validate block with workingset in factory")
 		}
-		sf.putIntoWorkingSets(key, ws)
+		sf.chamber.PutWorkingSet(key, ws)
 	}
 	receipts, err := ws.Receipts()
 	if err != nil {
@@ -374,7 +400,7 @@ func (sf *factory) NewBlockBuilder(
 ) (*block.Builder, error) {
 	sf.mutex.Lock()
 	ctx = protocol.WithRegistry(ctx, sf.registry)
-	ws, err := sf.newWorkingSet(ctx, sf.currentChainHeight+1)
+	ws, err := sf.newWorkingSet(ctx, sf.chamber.OngoingBlockHeight()+1)
 	sf.mutex.Unlock()
 	if err != nil {
 		return nil, errors.Wrap(err, "Failed to obtain working set from state factory")
@@ -398,7 +424,7 @@
 
 	blkCtx := protocol.MustGetBlockCtx(ctx)
 	key := generateWorkingSetCacheKey(blkBuilder.GetCurrentBlockHeader(), blkCtx.Producer.String())
-	sf.putIntoWorkingSets(key, ws)
+	sf.chamber.PutWorkingSet(key, ws)
 	return blkBuilder, nil
 }
 
@@ -434,19 +460,34 @@ func (sf *factory) WorkingSetAtHeight(ctx context.Context, height uint64, preact
 }
 
 // PutBlock persists all changes in RunActions() into the DB
-func (sf *factory) PutBlock(ctx context.Context, blk *block.Block) error {
+func (sf *factory) PutBlock(ctx context.Context, blk *block.Block) (err error) {
 	timer := sf.timerFactory.NewTimer("Commit")
-	defer timer.End()
+	var (
+		ws      *workingSet
+		isExist bool
+	)
+	defer func() {
+		timer.End()
+		if err != nil && ws != nil {
+			// abandon current workingset, and all pending workingsets beyond current height
+			ws.abandon()
+			sf.chamber.AbandonWorkingSets(ws.height)
+		}
+	}()
 	producer := blk.PublicKey().Address()
 	if producer == nil {
 		return errors.New("failed to get address")
 	}
 	ctx = protocol.WithRegistry(ctx, sf.registry)
 	key := generateWorkingSetCacheKey(blk.Header, blk.Header.ProducerAddress())
-	ws, isExist, err := sf.getFromWorkingSets(ctx, key)
+	ws, isExist, err = sf.getFromWorkingSets(ctx, key)
 	if err != nil {
-		return err
+		return
 	}
+	if err = ws.verifyParent(); err != nil {
+		return
+	}
+	ws.detachParent()
 	if !isExist {
 		// regenerate workingset
 		if !sf.skipBlockValidationOnPut {
@@ -456,14 +497,14 @@
 		}
 		if err != nil {
 			log.L().Error("Failed to update state.", zap.Error(err))
-			return err
+			return
 		}
 	}
 	sf.mutex.Lock()
 	defer sf.mutex.Unlock()
 	receipts, err := ws.Receipts()
 	if err != nil {
-		return err
+		return
 	}
 	blk.Receipts = receipts
 	h, _ := ws.Height()
@@ -475,18 +516,18 @@
 		)
 	}
 
-	if err := ws.Commit(ctx); err != nil {
-		return err
+	if err = ws.Commit(ctx); err != nil {
+		return
 	}
 	rh, err := sf.dao.Get(ArchiveTrieNamespace, []byte(ArchiveTrieRootKey))
 	if err != nil {
-		return err
+		return
 	}
-	if err := sf.twoLayerTrie.SetRootHash(rh); err != nil {
-		return err
+	if err = sf.twoLayerTrie.SetRootHash(rh); err != nil {
+		return
 	}
 	sf.currentChainHeight = h
 	return nil
 }
@@ -578,12 +618,9 @@ func (sf *factory) createGenesisStates(ctx context.Context) error {
 
 func (sf *factory) getFromWorkingSets(ctx context.Context, key hash.Hash256) (*workingSet, bool, error) {
 	sf.mutex.RLock()
 	defer sf.mutex.RUnlock()
-	if data, ok := sf.workingsets.Get(key); ok {
-		if ws, ok := data.(*workingSet); ok {
-			// if it is already validated, return workingset
-			return ws, true, nil
-		}
-		return nil, false, errors.New("type assertion failed to be WorkingSet")
+	if ws := sf.chamber.GetWorkingSet(key); ws != nil {
+		// if it is already validated, return workingset
+		return ws, true, nil
 	}
 	ws, err := sf.newWorkingSet(ctx, sf.currentChainHeight+1)
 	if err != nil {
@@ -591,9 +628,3 @@ func (sf *factory) getFromWorkingSets(ctx context.Context, key hash.Hash256) (*w
 	}
 	return ws, false, nil
 }
-
-func (sf *factory) putIntoWorkingSets(key hash.Hash256, ws *workingSet) {
-	sf.mutex.Lock()
-	defer sf.mutex.Unlock()
-	sf.workingsets.Add(key, ws)
-}
diff --git a/state/factory/minter.go b/state/factory/minter.go
index ef5b301d64..0de4be4740 100644
--- a/state/factory/minter.go
+++ b/state/factory/minter.go
@@ -49,3 +49,19 @@ func (m *minter) NewBlockBuilder(ctx context.Context, sign func(action.Envelope)
 	}
 	return m.f.NewBlockBuilder(ctx, m.ap, sign)
 }
+
+func (m *minter) OngoingBlockHeight() uint64 {
+	return m.f.OngoingBlockHeight()
+}
+
+func (m *minter) PendingBlockHeader(height uint64) (*block.Header, error) {
+	return m.f.PendingBlockHeader(height)
+}
+
+func (m *minter) PutBlockHeader(header *block.Header) {
+	m.f.PutBlockHeader(header)
+}
+
+func (m *minter) CancelBlock(height uint64) {
+	m.f.CancelBlock(height)
+}
diff --git a/state/factory/statedb.go b/state/factory/statedb.go
index 03634eb060..2b394bbbec 100644
--- a/state/factory/statedb.go
+++ b/state/factory/statedb.go
@@ -15,7 +15,6 @@ import (
 	"github.com/pkg/errors"
 	"go.uber.org/zap"
 
-	"github.com/iotexproject/go-pkgs/cache"
 	"github.com/iotexproject/go-pkgs/hash"
 
 	"github.com/iotexproject/iotex-core/v2/action"
@@ -49,7 +48,7 @@ type (
 		registry                 *protocol.Registry
 		dao                      daoRetrofitter
 		timerFactory             *prometheustimer.TimerFactory
-		workingsets              cache.LRUCache // lru cache for workingsets
+		chamber                  WorkingSetChamber
 		protocolView             protocol.View
 		skipBlockValidationOnPut bool
 		ps                       *patchStore
@@ -86,7 +85,7 @@ func SkipBlockValidationStateDBOption() StateDBOption {
 // DisableWorkingSetCacheOption disable workingset cache
 func DisableWorkingSetCacheOption() StateDBOption {
 	return func(sdb *stateDB, cfg *Config) error {
-		sdb.workingsets = cache.NewDummyLruCache()
+		sdb.chamber = newWorkingsetChamber(8)
 		return nil
 	}
 }
@@ -98,7 +97,7 @@ func NewStateDB(cfg Config, dao db.KVStore, opts ...StateDBOption) (Factory, err
 		currentChainHeight: 0,
 		registry:           protocol.NewRegistry(),
 		protocolView:       protocol.View{},
-		workingsets:        cache.NewThreadSafeLruCache(int(cfg.Chain.WorkingSetCacheSize)),
+		chamber:            newWorkingsetChamber(int(cfg.Chain.WorkingSetCacheSize)),
 	}
 	for _, opt := range opts {
 		if err := opt(&sdb, &cfg); err != nil {
@@ -165,7 +164,7 @@ func (sdb *stateDB) Start(ctx context.Context) error {
 func (sdb *stateDB) Stop(ctx context.Context) error {
 	sdb.mutex.Lock()
 	defer sdb.mutex.Unlock()
-	sdb.workingsets.Clear()
+	sdb.chamber.Clear()
 	return sdb.dao.Stop(ctx)
 }
 
@@ -176,6 +175,25 @@ func (sdb *stateDB) Height() (uint64, error) {
 	return sdb.dao.getHeight()
 }
 
+func (sdb *stateDB) OngoingBlockHeight() uint64 {
+	return sdb.chamber.OngoingBlockHeight()
+}
+
+func (sdb *stateDB) PendingBlockHeader(height uint64) (*block.Header, error) {
+	if h := sdb.chamber.GetBlockHeader(height); h != nil {
+		return h, nil
+	}
+	return nil, errors.Errorf("pending block %d does not exist", height)
+}
+
+func (sdb *stateDB) PutBlockHeader(header *block.Header) {
+	sdb.chamber.PutBlockHeader(header)
+}
+
+func (sdb *stateDB) CancelBlock(height uint64) {
+	sdb.chamber.AbandonWorkingSets(height)
+}
+
 func (sdb *stateDB) newWorkingSet(ctx context.Context, height uint64) (*workingSet, error) {
 	g := genesis.MustExtractGenesisContext(ctx)
 	flusher, err := db.NewKVStoreFlusher(
@@ -197,8 +215,11 @@ func (sdb *stateDB) newWorkingSet(ctx context.Context, height uint64) (*workingS
 	if err := store.Start(ctx); err != nil {
 		return nil, err
 	}
-
-	return newWorkingSet(height, store), nil
+	var parent *workingSet
+	if height > 0 {
+		parent = sdb.chamber.GetWorkingSet(height - 1)
+	}
+	return newWorkingSet(height, store, parent), nil
 }
 
 func (sdb *stateDB) Register(p protocol.Protocol) error {
@@ -216,7 +237,7 @@
 		if err = ws.ValidateBlock(ctx, blk); err != nil {
 			return errors.Wrap(err, "failed to validate block with workingset in statedb")
 		}
-		sdb.workingsets.Add(key, ws)
+		sdb.chamber.PutWorkingSet(key, ws)
 	}
 	receipts, err := ws.Receipts()
 	if err != nil {
@@ -234,7 +255,7 @@ func (sdb *stateDB) NewBlockBuilder(
 ) (*block.Builder, error) {
 	ctx = protocol.WithRegistry(ctx, sdb.registry)
 	sdb.mutex.RLock()
-	currHeight := sdb.currentChainHeight
+	currHeight := sdb.chamber.OngoingBlockHeight()
 	sdb.mutex.RUnlock()
 	ws, err := sdb.newWorkingSet(ctx, currHeight+1)
 	if err != nil {
@@ -259,7 +280,7 @@
 
 	blkCtx := protocol.MustGetBlockCtx(ctx)
 	key := generateWorkingSetCacheKey(blkBuilder.GetCurrentBlockHeader(), blkCtx.Producer.String())
-	sdb.workingsets.Add(key, ws)
+	sdb.chamber.PutWorkingSet(key, ws)
 	return blkBuilder, nil
 }
 
@@ -287,21 +308,36 @@
 }
 
 // PutBlock persists all changes in RunActions() into the DB
-func (sdb *stateDB) PutBlock(ctx context.Context, blk *block.Block) error {
+func (sdb *stateDB) PutBlock(ctx context.Context, blk *block.Block) (err error) {
 	sdb.mutex.Lock()
 	timer := sdb.timerFactory.NewTimer("Commit")
 	sdb.mutex.Unlock()
-	defer timer.End()
+	var (
+		ws      *workingSet
+		isExist bool
+	)
+	defer func() {
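+		// err is the named return value of PutBlock, so this deferred cleanup
+		// sees whichever error finally escapes the function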
+		timer.End()
+		if err != nil && ws != nil {
+			// abandon current workingset, and all pending workingsets beyond current height
+			ws.abandon()
+			sdb.chamber.AbandonWorkingSets(ws.height)
+		}
+	}()
 	producer := blk.PublicKey().Address()
 	if producer == nil {
 		return errors.New("failed to get address")
 	}
 	ctx = protocol.WithRegistry(ctx, sdb.registry)
 	key := generateWorkingSetCacheKey(blk.Header, blk.Header.ProducerAddress())
-	ws, isExist, err := sdb.getFromWorkingSets(ctx, key)
+	ws, isExist, err = sdb.getFromWorkingSets(ctx, key)
 	if err != nil {
-		return err
+		return
 	}
+	if err = ws.verifyParent(); err != nil {
+		return
+	}
+	ws.detachParent()
 	if !isExist {
 		if !sdb.skipBlockValidationOnPut {
 			err = ws.ValidateBlock(ctx, blk)
@@ -310,14 +346,14 @@
 		}
 		if err != nil {
 			log.L().Error("Failed to update state.", zap.Error(err))
-			return err
+			return
 		}
 	}
 	sdb.mutex.Lock()
 	defer sdb.mutex.Unlock()
 	receipts, err := ws.Receipts()
 	if err != nil {
-		return err
+		return
 	}
 	blk.Receipts = receipts
 	h, _ := ws.Height()
@@ -329,8 +365,8 @@
 		)
 	}
 
-	if err := ws.Commit(ctx); err != nil {
-		return err
+	if err = ws.Commit(ctx); err != nil {
+		return
 	}
 	sdb.currentChainHeight = h
 	return nil
@@ -430,16 +466,14 @@
 
 // getFromWorkingSets returns (workingset, true) if it exists in a cache, otherwise generates new workingset and return (ws, false)
 func (sdb *stateDB) getFromWorkingSets(ctx context.Context, key hash.Hash256) (*workingSet, bool, error) {
-	if data, ok := sdb.workingsets.Get(key); ok {
-		if ws, ok := data.(*workingSet); ok {
-			// if it is already validated, return workingset
-			return ws, true, nil
-		}
-		return nil, false, errors.New("type assertion failed to be WorkingSet")
+	if ws := sdb.chamber.GetWorkingSet(key); ws != nil {
+		// if it is already validated, return workingset
+		return ws, true, nil
+	}
 	sdb.mutex.RLock()
 	currHeight := sdb.currentChainHeight
 	sdb.mutex.RUnlock()
-	tx, err := sdb.newWorkingSet(ctx, currHeight+1)
-	return tx, false, err
+	ws, err := sdb.newWorkingSet(ctx, currHeight+1)
+	return ws, false, err
 }
diff --git a/state/factory/workingset.go b/state/factory/workingset.go
index 9c9b3f6352..d33d2b8484 100644
--- a/state/factory/workingset.go
+++ b/state/factory/workingset.go
@@ -7,8 +7,10 @@ import (
 	"context"
+	"fmt"
 	"math/big"
 	"sort"
+	"sync/atomic"
 	"time"
 
 	"github.com/ethereum/go-ethereum/common"
@@ -64,17 +66,20 @@
 		height      uint64
 		store       workingSetStore
 		finalized   bool
+		abandoned   atomic.Bool
 		dock        protocol.Dock
 		txValidator *protocol.GenericValidator
 		receipts    []*action.Receipt
+		parent      *workingSet
 	}
 )
 
-func newWorkingSet(height uint64, store workingSetStore) *workingSet {
+func newWorkingSet(height uint64, store workingSetStore, parent *workingSet) *workingSet {
 	ws := &workingSet{
 		height: height,
 		store:  store,
 		dock:   protocol.NewDock(),
+		parent: parent,
 	}
 	ws.txValidator = protocol.NewGenericValidator(ws, accountutil.AccountState)
 	return ws
@@ -114,6 +119,26 @@ func (ws *workingSet) validate(ctx context.Context) error {
 	return nil
 }
 
+func (ws *workingSet) isAbandoned() bool {
+	return ws.abandoned.Load()
+}
+
+func (ws *workingSet) abandon() {
+	ws.abandoned.Store(true)
+}
+
+func (ws *workingSet) verifyParent() error {
+	if ws.parent != nil && ws.parent.isAbandoned() {
+		ws.abandon()
+		return errors.New("workingset abandoned")
+	}
+	return nil
+}
+
+func (ws *workingSet) detachParent() {
+	ws.parent = nil
+}
+
 func withActionCtx(ctx context.Context, selp *action.SealedEnvelope) (context.Context, error) {
 	var actionCtx protocol.ActionCtx
 	var err error
@@ -197,6 +222,7 @@ func (ws *workingSet) runAction(
 			return nil, err
 		}
 	}
+	fmt.Printf("action %x added to block\n", selpHash)
 	return receipt, nil
 }
 }
@@ -292,8 +318,15 @@ func (ws *workingSet) freshAccountConversion(ctx context.Context, actCtx *protoc
 	return nil
 }
 
+func (ws *workingSet) getDirty(ns string, key []byte) ([]byte, bool) {
+	return ws.store.GetDirty(ns, key)
+}
+
 // Commit persists all changes in RunActions() into the DB
 func (ws *workingSet) Commit(ctx context.Context) error {
+	if err := ws.verifyParent(); err != nil {
+		return err
+	}
 	if err := protocolPreCommit(ctx, ws); err != nil {
 		return err
 	}
@@ -318,6 +351,14 @@
 	if cfg.Keys != nil {
 		return 0, errors.Wrap(ErrNotSupported, "Read state with keys option has not been implemented yet")
 	}
+	if ws.parent != nil {
+		if value, dirty := ws.getDirty(cfg.Namespace, cfg.Key); dirty {
+			return ws.height, state.Deserialize(s, value)
+		}
+		if value, dirty := ws.parent.getDirty(cfg.Namespace, cfg.Key); dirty {
+			return ws.height, state.Deserialize(s, value)
+		}
+	}
 	value, err := ws.store.Get(cfg.Namespace, cfg.Key)
 	if err != nil {
 		return ws.height, err
@@ -333,6 +374,7 @@
 	if cfg.Key != nil {
 		return 0, nil, errors.Wrap(ErrNotSupported, "Read states with key option has not been implemented yet")
 	}
+	// TODO: check parent
 	keys, values, err := ws.store.States(cfg.Namespace, cfg.Keys)
 	if err != nil {
 		return 0, nil, err
@@ -480,6 +522,9 @@
 }
 
 func (ws *workingSet) processWithCorrectOrder(ctx context.Context, actions []*action.SealedEnvelope) error {
+	if err := ws.verifyParent(); err != nil {
+		return err
+	}
 	if err := ws.validate(ctx); err != nil {
 		return err
 	}
@@ -643,7 +688,9 @@
 		if err := ws.txValidator.ValidateWithState(ctxWithBlockContext, nextAction); err != nil {
 			log.L().Debug("failed to ValidateWithState", zap.Uint64("height", ws.height), zap.Error(err))
 			ap.DeleteAction(nextAction.SenderAddress())
-			actionIterator.PopAccount()
+			if errors.Cause(err) != action.ErrNonceTooLow {
+				actionIterator.PopAccount()
+			}
 			continue
 		}
 		actionCtx, err := withActionCtx(ctxWithBlockContext, nextAction)
diff --git a/state/factory/workingset_chamber.go b/state/factory/workingset_chamber.go
new file mode 100644
index 0000000000..67c1cf7b76
--- /dev/null
+++ b/state/factory/workingset_chamber.go
@@ -0,0 +1,83 @@
+// Copyright (c) 2024 IoTeX Foundation
+// This source code is provided 'as is' and no warranties are given as to title or non-infringement, merchantability
+// or fitness for purpose and, to the extent permitted by law, all liability for your use of the code is disclaimed.
+// This source code is governed by Apache License 2.0 that can be found in the LICENSE file.
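+
+// The chamber below replaces the former plain LRU workingset cache: besides caching
+// validated workingsets by digest, it also indexes them by height, and keeps pending
+// block headers together with the highest ongoing block height, so the next block
+// can be minted before the previous one is committed.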
+
+package factory
+
+import (
+	"github.com/iotexproject/go-pkgs/cache"
+	"github.com/iotexproject/go-pkgs/hash"
+
+	"github.com/iotexproject/iotex-core/v2/blockchain/block"
+)
+
+type (
+	WorkingSetChamber interface {
+		GetWorkingSet(any) *workingSet
+		PutWorkingSet(hash.Hash256, *workingSet)
+		AbandonWorkingSets(uint64)
+		OngoingBlockHeight() uint64
+		GetBlockHeader(uint64) *block.Header
+		PutBlockHeader(*block.Header)
+		Clear()
+	}
+
+	chamber struct {
+		height uint64
+		ws     cache.LRUCache
+		header cache.LRUCache
+	}
+)
+
+func newWorkingsetChamber(size int) WorkingSetChamber {
+	return &chamber{
+		ws:     cache.NewThreadSafeLruCache(size),
+		header: cache.NewThreadSafeLruCache(size),
+	}
+}
+
+func (cmb *chamber) GetWorkingSet(key any) *workingSet {
+	if data, ok := cmb.ws.Get(key); ok {
+		return data.(*workingSet)
+	}
+	return nil
+}
+
+func (cmb *chamber) PutWorkingSet(key hash.Hash256, ws *workingSet) {
+	cmb.ws.Add(key, ws)
+	cmb.ws.Add(ws.height, ws)
+	if ws.height > cmb.height {
+		cmb.height = ws.height
+	}
+}
+
+func (cmb *chamber) AbandonWorkingSets(h uint64) {
+	for ; ; h++ {
+		if ws := cmb.GetWorkingSet(h); ws != nil {
+			cmb.ws.Remove(h)
+		} else {
+			break
+		}
+	}
+}
+
+func (cmb *chamber) OngoingBlockHeight() uint64 {
+	return cmb.height
+}
+
+func (cmb *chamber) GetBlockHeader(height uint64) *block.Header {
+	if data, ok := cmb.header.Get(height); ok {
+		return data.(*block.Header)
+	}
+	return nil
+}
+
+func (cmb *chamber) PutBlockHeader(h *block.Header) {
+	cmb.header.Add(h.Height(), h)
+}
+
+func (cmb *chamber) Clear() {
+	cmb.ws.Clear()
+	cmb.header.Clear()
+}
diff --git a/state/factory/workingsetstore.go b/state/factory/workingsetstore.go
index a6445df079..a15b17236f 100644
--- a/state/factory/workingsetstore.go
+++ b/state/factory/workingsetstore.go
@@ -15,6 +15,7 @@ type (
 	workingSetStore interface {
 		db.KVStoreBasic
+		GetDirty(string, []byte) ([]byte, bool)
 		Commit() error
 		States(string, [][]byte) ([][]byte, [][]byte, error)
 		Digest() hash.Hash256
@@ -39,6 +40,10 @@ func (store *workingSetStoreCommon) WriteView(name string, value interface{}) er
 	return store.view.Write(name, value)
 }
 
+func (store *workingSetStoreCommon) GetDirty(ns string, key []byte) ([]byte, bool) {
+	return store.flusher.KVStoreWithBuffer().GetDirty(ns, key)
+}
+
 func (store *workingSetStoreCommon) Put(ns string, key []byte, value []byte) error {
 	store.flusher.KVStoreWithBuffer().MustPut(ns, key, value)
 	return nil
diff --git a/tools/minicluster/minicluster.go b/tools/minicluster/minicluster.go
index b9507339d7..2336fa53cd 100644
--- a/tools/minicluster/minicluster.go
+++ b/tools/minicluster/minicluster.go
@@ -124,6 +124,7 @@ func main() {
 	config.System.SystemLogDBPath = systemLogDBPath
 	config.Chain.ContractStakingIndexDBPath = contractStakingIndexDBPath
 	config.Chain.BlobStoreDBPath = blobDBPath
+	config.ActPool.Store = nil
 	if i == 0 {
 		config.Network.BootstrapNodes = []string{}
 		config.Network.MasterKey = "bootnode"
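
For reference, the state-read path introduced above layers the workingset of block N+1 over the possibly uncommitted workingset of block N: reads consult the dirty write buffers before falling back to committed state. Below is a minimal, self-contained sketch of that overlay lookup; dirtyStore and overlaySet are illustrative stand-ins for workingSetStore and workingSet, not identifiers from this diff.

// Illustrative sketch only, mirroring the lookup order added to workingSet.State.
package sketch

type dirtyStore interface {
	// GetDirty reports a value only if the key sits in the uncommitted write buffer.
	GetDirty(ns string, key []byte) ([]byte, bool)
	// Get falls through to committed state.
	Get(ns string, key []byte) ([]byte, error)
}

type overlaySet struct {
	store  dirtyStore
	parent *overlaySet // workingset of the previous, not-yet-committed block, nil if none
}

// get checks this block's dirty buffer, then the pending parent block's dirty
// buffer, and finally the committed store, which is what lets block N+1 be
// built and validated before block N is committed.
func (s *overlaySet) get(ns string, key []byte) ([]byte, error) {
	if v, ok := s.store.GetDirty(ns, key); ok {
		return v, nil
	}
	if s.parent != nil {
		if v, ok := s.parent.store.GetDirty(ns, key); ok {
			return v, nil
		}
	}
	return s.store.Get(ns, key)
}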