diff --git a/consensus/consensusfsm/chainedfsm.go b/consensus/consensusfsm/chainedfsm.go new file mode 100644 index 0000000000..c0532cff10 --- /dev/null +++ b/consensus/consensusfsm/chainedfsm.go @@ -0,0 +1,182 @@ +package consensusfsm + +import ( + "github.com/facebookgo/clock" + fsm "github.com/iotexproject/go-fsm" + "github.com/pkg/errors" +) + +const ( + // consensus states + SFinalized fsm.State = "S_FINALIZED" + SInvalid fsm.State = "S_INVALID" +) + +type ChainedConsensusFSM struct { + *ConsensusFSM +} + +func NewChainedConsensusFSM(ctx Context, clock clock.Clock) (*ChainedConsensusFSM, error) { + mm := &ConsensusFSM{ + evtq: make(chan *ConsensusEvent, ctx.EventChanSize()), + close: make(chan interface{}), + ctx: ctx, + clock: clock, + } + cm := &ChainedConsensusFSM{ + ConsensusFSM: mm, + } + b := fsm.NewBuilder(). + AddInitialState(sPrepare). + AddStates( + sAcceptBlockProposal, + sAcceptProposalEndorsement, + sAcceptLockEndorsement, + sAcceptPreCommitEndorsement, + SInvalid, + SFinalized, + ). + AddTransition(sPrepare, ePrepare, cm.prepare, []fsm.State{ + sPrepare, + sAcceptBlockProposal, + sAcceptPreCommitEndorsement, + }). + AddTransition( + sAcceptBlockProposal, + eReceiveBlock, + cm.onReceiveBlock, + []fsm.State{ + sAcceptBlockProposal, // proposed block invalid + sAcceptProposalEndorsement, // receive valid block, jump to next step + }). + AddTransition( + sAcceptBlockProposal, + eFailedToReceiveBlock, + cm.onFailedToReceiveBlock, + []fsm.State{ + sAcceptProposalEndorsement, // no valid block, jump to next step + }). + AddTransition( + sAcceptProposalEndorsement, + eReceiveProposalEndorsement, + cm.onReceiveProposalEndorsementInAcceptProposalEndorsementState, + []fsm.State{ + sAcceptProposalEndorsement, // not enough endorsements + sAcceptLockEndorsement, // enough endorsements + }). + AddTransition( + sAcceptProposalEndorsement, + eReceivePreCommitEndorsement, + cm.onReceiveProposalEndorsementInAcceptProposalEndorsementState, + []fsm.State{ + sAcceptProposalEndorsement, // not enough endorsements + sAcceptLockEndorsement, // enough endorsements + }). + AddTransition( + sAcceptProposalEndorsement, + eStopReceivingProposalEndorsement, + cm.onStopReceivingProposalEndorsement, + []fsm.State{ + sAcceptLockEndorsement, // timeout, jump to next step + }). + AddTransition( + sAcceptLockEndorsement, + eReceiveProposalEndorsement, + cm.onReceiveProposalEndorsementInAcceptLockEndorsementState, + []fsm.State{ + sAcceptLockEndorsement, + }, + ). + AddTransition( + sAcceptLockEndorsement, + eReceiveLockEndorsement, + cm.onReceiveLockEndorsement, + []fsm.State{ + sAcceptLockEndorsement, // not enough endorsements + sAcceptPreCommitEndorsement, // reach commit agreement, jump to next step + }). + AddTransition( + sAcceptLockEndorsement, + eReceivePreCommitEndorsement, + cm.onReceiveLockEndorsement, + []fsm.State{ + sAcceptLockEndorsement, // not enough endorsements + sAcceptPreCommitEndorsement, // reach commit agreement, jump to next step + }). + AddTransition( + sAcceptLockEndorsement, + eStopReceivingLockEndorsement, + cm.onStopReceivingLockEndorsement, + []fsm.State{ + SInvalid, // timeout, invalid + }). + AddTransition( + sAcceptPreCommitEndorsement, + eBroadcastPreCommitEndorsement, + cm.onBroadcastPreCommitEndorsement, + []fsm.State{ + sAcceptPreCommitEndorsement, + }). + AddTransition( + sAcceptPreCommitEndorsement, + eStopReceivingPreCommitEndorsement, + cm.onStopReceivingPreCommitEndorsement, + []fsm.State{ + SInvalid, + }). + AddTransition( + sAcceptPreCommitEndorsement, + eReceivePreCommitEndorsement, + cm.onReceivePreCommitEndorsement, + []fsm.State{ + sAcceptPreCommitEndorsement, + SFinalized, // reach consensus, start next epoch + }) + // Add the backdoor transition so that we could unit test the transition from any given state + for _, state := range consensusStates { + b = b.AddTransition(state, BackdoorEvent, cm.handleBackdoorEvt, consensusStates) + if state != sPrepare { + b = b.AddTransition(state, eCalibrate, cm.calibrate, []fsm.State{sPrepare, state}) + } + } + m, err := b.Build() + if err != nil { + return nil, errors.Wrap(err, "error when building the FSM") + } + cm.fsm = m + return cm, nil +} + +func (m *ChainedConsensusFSM) onStopReceivingLockEndorsement(evt fsm.Event) (fsm.State, error) { + m.ctx.Logger().Warn("Not enough lock endorsements") + + return m.Invalid() +} + +func (m *ChainedConsensusFSM) onStopReceivingPreCommitEndorsement(evt fsm.Event) (fsm.State, error) { + m.ctx.Logger().Warn("Not enough pre-commit endorsements") + + return m.Invalid() +} + +func (m *ChainedConsensusFSM) onReceivePreCommitEndorsement(evt fsm.Event) (fsm.State, error) { + cEvt, ok := evt.(*ConsensusEvent) + if !ok { + return sAcceptPreCommitEndorsement, errors.Wrap(ErrEvtCast, "failed to cast to consensus event") + } + committed, err := m.ctx.Commit(cEvt.Data()) + if err != nil || !committed { + return sAcceptPreCommitEndorsement, err + } + return SFinalized, nil +} + +func (m *ChainedConsensusFSM) Finalize() (fsm.State, error) { + m.ctx.Logger().Warn("Finalized") + return SFinalized, nil +} + +func (m *ChainedConsensusFSM) Invalid() (fsm.State, error) { + m.ctx.Logger().Warn("Invalid") + return SInvalid, nil +} diff --git a/consensus/consensusfsm/context.go b/consensus/consensusfsm/context.go index 2acbd6d866..e161b07910 100644 --- a/consensus/consensusfsm/context.go +++ b/consensus/consensusfsm/context.go @@ -25,6 +25,7 @@ type Context interface { Logger() *zap.Logger Height() uint64 + Number() uint32 NewConsensusEvent(fsm.EventType, interface{}) *ConsensusEvent NewBackdoorEvt(fsm.State) *ConsensusEvent diff --git a/consensus/consensusfsm/fsm.go b/consensus/consensusfsm/fsm.go index 7f02c92920..220faa2c12 100644 --- a/consensus/consensusfsm/fsm.go +++ b/consensus/consensusfsm/fsm.go @@ -305,6 +305,7 @@ func (m *ConsensusFSM) produce(evt *ConsensusEvent, delay time.Duration) { if evt == nil { return } + // m.ctx.Logger().Debug("produce event", zap.Any("event", evt.Type()), zap.Stack("stack")) _consensusEvtsMtc.WithLabelValues(string(evt.Type()), "produced").Inc() if delay > 0 { m.wg.Add(1) @@ -328,7 +329,7 @@ func (m *ConsensusFSM) handle(evt *ConsensusEvent) error { return nil } if m.ctx.IsFutureEvent(evt) { - m.ctx.Logger().Debug("future event", zap.Any("event", evt.Type())) + m.ctx.Logger().Debug("future event", zap.Any("event", evt.Type()), zap.Any("eventHeight", evt.Height()), zap.Any("eventRount", evt.Round())) // TODO: find a more appropriate delay m.produce(evt, m.ctx.UnmatchedEventInterval(evt.Height())) _consensusEvtsMtc.WithLabelValues(string(evt.Type()), "backoff").Inc() @@ -397,10 +398,10 @@ func (m *ConsensusFSM) calibrate(evt fsm.Event) (fsm.State, error) { func (m *ConsensusFSM) prepare(evt fsm.Event) (fsm.State, error) { if err := m.ctx.Prepare(); err != nil { - m.ctx.Logger().Error("Error during prepare", zap.Error(err)) + m.ctx.Logger().Error("Error during prepare", zap.Error(err), zap.Stack("stack")) return m.BackToPrepare(0) } - m.ctx.Logger().Debug("Start a new round") + m.ctx.Logger().Debug("Start a new round", zap.Stack("stack")) proposal, err := m.ctx.Proposal() if err != nil { m.ctx.Logger().Error("failed to generate block proposal", zap.Error(err)) diff --git a/consensus/scheme/rolldpos/chainedrolldpos.go b/consensus/scheme/rolldpos/chainedrolldpos.go new file mode 100644 index 0000000000..6a794c368b --- /dev/null +++ b/consensus/scheme/rolldpos/chainedrolldpos.go @@ -0,0 +1,272 @@ +package rolldpos + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/iotexproject/iotex-proto/golang/iotextypes" + "go.uber.org/zap" + + "github.com/iotexproject/iotex-core/v2/blockchain/block" + "github.com/iotexproject/iotex-core/v2/consensus/consensusfsm" + "github.com/iotexproject/iotex-core/v2/consensus/scheme" + "github.com/iotexproject/iotex-core/v2/pkg/log" +) + +type ( + Round interface { + Start(ctx context.Context) error + Stop(ctx context.Context) error + Height() uint64 + RoundNum() uint32 + Handle(msg *iotextypes.ConsensusMessage) + Result() chan int + Calibrate(h uint64) + } + + ChainedRollDPoS struct { + rounds []Round + quit chan struct{} + builder *Builder + wg sync.WaitGroup + active bool + mutex sync.Mutex + } +) + +func NewChainedRollDPoS(builder *Builder) *ChainedRollDPoS { + return &ChainedRollDPoS{ + rounds: make([]Round, 0), + quit: make(chan struct{}), + builder: builder, + active: true, + } +} + +func (cr *ChainedRollDPoS) Start(ctx context.Context) error { + // create a new RollDPoS instance every 1s + cr.wg.Add(1) + go func() { + defer cr.wg.Done() + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + counter := 0 + for { + select { + case <-cr.quit: + return + case <-ticker.C: + func() { + cr.mutex.Lock() + defer cr.mutex.Unlock() + if len(cr.rounds) < 4 && counter < 1000 { + log.L().Debug("new round started") + defer log.L().Debug("new round finished") + rs, err := cr.newRound() + if err != nil { + log.L().Error("Failed to create new round", zap.Error(err)) + return + } + ctx, cancel := context.WithCancel(context.Background()) + cr.RunRound(ctx, rs) + + for _, r := range cr.rounds { + if r.Height() == rs.Height() && r.RoundNum() == rs.RoundNum() { + log.L().Debug("round already exists", zap.Uint64("height", rs.Height()), zap.Uint32("round", rs.RoundNum())) + cancel() + return + } + } + cr.rounds = append(cr.rounds, rs) + counter++ + } + }() + } + } + }() + cr.wg.Add(1) + go func() { + defer cr.wg.Done() + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + for { + select { + case <-cr.quit: + return + case <-ticker.C: + cr.mutex.Lock() + fmt.Printf("\nrounds size: %d\n", len(cr.rounds)) + // for _, r := range cr.rounds { + // fmt.Printf("round height: %d, round num: %d\n", r.Height(), r.RoundNum()) + // } + fmt.Printf("\n") + cr.mutex.Unlock() + } + } + }() + return nil +} + +func (cr *ChainedRollDPoS) Stop(ctx context.Context) error { + close(cr.quit) + cr.wg.Wait() + return nil +} + +func (cr *ChainedRollDPoS) HandleConsensusMsg(msg *iotextypes.ConsensusMessage) error { + for _, r := range cr.rounds { + if r.Height() == msg.Height { + r.Handle(msg) + } + } + return nil +} + +func (cr *ChainedRollDPoS) Calibrate(h uint64) { + for _, r := range cr.rounds { + r.Calibrate(h) + } +} + +func (cr *ChainedRollDPoS) ValidateBlockFooter(*block.Block) error { + return nil +} + +func (cr *ChainedRollDPoS) Metrics() (scheme.ConsensusMetrics, error) { + return scheme.ConsensusMetrics{}, nil +} + +func (cr *ChainedRollDPoS) Activate(b bool) { + cr.active = b +} + +func (cr *ChainedRollDPoS) Active() bool { + return cr.active +} + +func (cr *ChainedRollDPoS) removeRound(round Round) { + for i, r := range cr.rounds { + if r.Height() == round.Height() && r.RoundNum() == round.RoundNum() { + cr.rounds = append(cr.rounds[:i], cr.rounds[i+1:]...) + log.L().Debug("round removed", zap.Uint64("height", round.Height()), zap.Uint32("round", round.RoundNum())) + break + } + } +} + +func (cr *ChainedRollDPoS) RunRound(ctx context.Context, r Round) { + // rolldpos round loop, but ternimate when round status is finalized or invalid + err := r.Start(ctx) + if err != nil { + log.L().Error("Failed to start round", zap.Error(err)) + return + } + go func() { + defer func() { + err := r.Stop(ctx) + if err != nil { + log.L().Error("Failed to stop round", zap.Error(err)) + } + cr.mutex.Lock() + cr.removeRound(r) + cr.mutex.Unlock() + }() + + var res int + select { + case res = <-r.Result(): + case <-ctx.Done(): + log.L().Info("round canceled", zap.Error(ctx.Err()), zap.Uint64("height", r.Height()), zap.Uint32("round", r.RoundNum())) + return + } + switch res { + case 1: + log.L().Info("round finished", zap.Uint64("height", r.Height()), zap.Uint32("round", r.RoundNum())) + case 0: + log.L().Info("round invalid", zap.Uint64("height", r.Height()), zap.Uint32("round", r.RoundNum())) + default: + log.L().Info("round terminated", zap.Int("result", res), zap.Uint64("height", r.Height()), zap.Uint32("round", r.RoundNum())) + } + }() +} + +func (cr *ChainedRollDPoS) newRound() (Round, error) { + dpos, err := cr.builder.BuildV2() + if err != nil { + return nil, err + } + return &roundV2{ + dpos: dpos, + res: make(chan int, 1), + quit: make(chan struct{}), + }, nil +} + +type roundV2 struct { + dpos *RollDPoS + res chan int + quit chan struct{} + wg sync.WaitGroup +} + +func (r *roundV2) Start(ctx context.Context) error { + err := r.dpos.Start(ctx) + if err != nil { + return err + } + r.wg.Add(1) + go func() { + defer r.wg.Done() + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + for { + select { + case <-r.quit: + return + case <-ticker.C: + switch r.dpos.cfsm.CurrentState() { + case consensusfsm.SFinalized: + r.res <- 1 + return + case consensusfsm.SInvalid: + r.dpos.ctx.Chain().DropDraftBlock(r.Height()) + r.res <- 0 + return + default: + } + } + } + }() + return nil +} + +func (r *roundV2) Stop(ctx context.Context) error { + close(r.quit) + r.wg.Wait() + return r.dpos.Stop(ctx) +} + +func (r *roundV2) Height() uint64 { + return r.dpos.ctx.Height() +} + +func (r *roundV2) RoundNum() uint32 { + return r.dpos.ctx.Number() +} + +func (r *roundV2) Handle(msg *iotextypes.ConsensusMessage) { + err := r.dpos.HandleConsensusMsg(msg) + if err != nil { + log.L().Error("Failed to handle consensus message", zap.Error(err)) + } +} + +func (r *roundV2) Result() chan int { + return r.res +} + +func (r *roundV2) Calibrate(h uint64) { + r.dpos.Calibrate(h) +} diff --git a/consensus/scheme/rolldpos/chainedrolldpos_test.go b/consensus/scheme/rolldpos/chainedrolldpos_test.go new file mode 100644 index 0000000000..1f9263aa23 --- /dev/null +++ b/consensus/scheme/rolldpos/chainedrolldpos_test.go @@ -0,0 +1,228 @@ +package rolldpos + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/facebookgo/clock" + "github.com/golang/mock/gomock" + "github.com/pkg/errors" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "google.golang.org/protobuf/proto" + + "github.com/iotexproject/iotex-core/v2/action/protocol/rolldpos" + "github.com/iotexproject/iotex-core/v2/blockchain" + "github.com/iotexproject/iotex-core/v2/blockchain/block" + "github.com/iotexproject/iotex-core/v2/blockchain/genesis" + "github.com/iotexproject/iotex-core/v2/consensus/consensusfsm" + "github.com/iotexproject/iotex-core/v2/db" + "github.com/iotexproject/iotex-core/v2/pkg/log" + "github.com/iotexproject/iotex-core/v2/test/identityset" + "github.com/iotexproject/iotex-core/v2/test/mock/mock_blockchain" + "github.com/iotexproject/iotex-core/v2/testutil" +) + +type mockChain struct { + draft uint64 + finalized uint64 + finalizedHash map[string]bool + blocks map[uint64]*block.Block + draftBlocks map[uint64]*block.Block + mutex sync.Mutex +} + +func (m *mockChain) Height() uint64 { + m.mutex.Lock() + defer m.mutex.Unlock() + return m.draft +} + +func (m *mockChain) NewDraft() uint64 { + m.mutex.Lock() + defer m.mutex.Unlock() + m.draft++ + return m.draft +} + +func (m *mockChain) NewDraftBlock(blk *block.Block) { + m.mutex.Lock() + defer m.mutex.Unlock() + m.draftBlocks[blk.Height()] = blk +} + +func (m *mockChain) Finalized(blkHash []byte) bool { + m.mutex.Lock() + defer m.mutex.Unlock() + if _, ok := m.finalizedHash[string(blkHash)]; ok { + return true + } + return false +} + +func (m *mockChain) FinalizeBlock(blk *block.Block) error { + m.mutex.Lock() + defer m.mutex.Unlock() + height := blk.Height() + hash := blk.HashBlock() + if height != m.finalized+1 { + return errors.Errorf("cannot finalize block %d, current finalized block is %d", height, m.finalized) + } + m.finalized = height + m.finalizedHash[string(hash[:])] = true + m.blocks[height] = blk + delete(m.draftBlocks, height) + return nil +} + +func (m *mockChain) Block(height uint64) (*block.Block, error) { + m.mutex.Lock() + defer m.mutex.Unlock() + blk, ok := m.blocks[height] + if !ok { + blk, ok = m.draftBlocks[height] + if !ok { + return nil, errors.Errorf("block %d not found", height) + } + } + return blk, nil +} + +func (m *mockChain) TipHeight() uint64 { + m.mutex.Lock() + defer m.mutex.Unlock() + return m.finalized +} + +func (m *mockChain) Cancel(height uint64) { + m.mutex.Lock() + defer m.mutex.Unlock() + for i := height; i <= m.draft; i++ { + delete(m.draftBlocks, i) + } + m.draft = height - 1 +} + +func TestChainedRollDPoS(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + r := require.New(t) + g := genesis.Default + g.NumCandidateDelegates = 1 + g.NumDelegates = 1 + builderCfg := BuilderConfig{ + Chain: blockchain.DefaultConfig, + Consensus: DefaultConfig, + DardanellesUpgrade: consensusfsm.DefaultDardanellesUpgradeConfig, + DB: db.DefaultConfig, + Genesis: g, + SystemActive: true, + } + consensusdb, err := testutil.PathOfTempFile("consensus") + r.NoError(err) + defer func() { + testutil.CleanupPath(consensusdb) + }() + + builderCfg.Consensus.ConsensusDBPath = consensusdb + builderCfg.Consensus.Delay = 400 * time.Millisecond + builderCfg.Consensus.ToleratedOvertime = 400 * time.Millisecond + builderCfg.Consensus.FSM.AcceptBlockTTL = 1 * time.Second + builderCfg.Consensus.FSM.AcceptProposalEndorsementTTL = 1 * time.Second + builderCfg.Consensus.FSM.AcceptLockEndorsementTTL = 1 * time.Second + builderCfg.Consensus.FSM.CommitTTL = 1 * time.Second + builderCfg.Consensus.FSM.UnmatchedEventTTL = 1 * time.Second + builderCfg.Consensus.FSM.UnmatchedEventInterval = 100 * time.Millisecond + builderCfg.Consensus.FSM.EventChanSize = 10000 + builderCfg.Genesis.BlockInterval = 5 * time.Second + builderCfg.DardanellesUpgrade.AcceptBlockTTL = 1 * time.Second + builderCfg.DardanellesUpgrade.AcceptLockEndorsementTTL = 1 * time.Second + builderCfg.DardanellesUpgrade.AcceptProposalEndorsementTTL = 1 * time.Second + builderCfg.DardanellesUpgrade.BlockInterval = 5 * time.Second + builderCfg.DardanellesUpgrade.CommitTTL = 1 * time.Second + builderCfg.DardanellesUpgrade.Delay = 400 * time.Millisecond + builderCfg.DardanellesUpgrade.UnmatchedEventInterval = 100 * time.Millisecond + builderCfg.DardanellesUpgrade.UnmatchedEventTTL = 1 * time.Second + + chain := &mockChain{ + finalizedHash: make(map[string]bool), + blocks: make(map[uint64]*block.Block), + draftBlocks: make(map[uint64]*block.Block), + } + bc := mock_blockchain.NewMockBlockchain(ctrl) + bc.EXPECT().EvmNetworkID().Return(uint32(1)).AnyTimes() + bc.EXPECT().Genesis().Return(g).AnyTimes() + bc.EXPECT().MintNewBlock(gomock.Any()).DoAndReturn(func(timestamp time.Time) (*block.Block, error) { + b := block.NewBuilder(block.NewRunnableActionsBuilder().Build()) + b.SetHeight(chain.NewDraft()) + b.SetTimestamp(timestamp) + blk, err := b.SignAndBuild(identityset.PrivateKey(1)) + if err != nil { + return nil, err + } + log.L().Debug("mint new block", zap.Time("ts", timestamp), zap.Uint64("height", blk.Height())) + chain.NewDraftBlock(&blk) + return &blk, nil + }).AnyTimes() + bc.EXPECT().ValidateBlock(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + bc.EXPECT().CommitBlock(gomock.Any()).DoAndReturn(func(blk *block.Block) error { + return chain.FinalizeBlock(blk) + }).AnyTimes() + bc.EXPECT().ChainAddress().Return(builderCfg.Chain.Address).AnyTimes() + bc.EXPECT().BlockFooterByHeight(gomock.Any()).DoAndReturn(func(height uint64) (*block.Footer, error) { + blk, err := chain.Block(height) + if err != nil { + return nil, errors.Wrapf(err, "block %d not found", height) + } + return &blk.Footer, nil + }).AnyTimes() + bc.EXPECT().BlockHeaderByHeight(gomock.Any()).DoAndReturn(func(height uint64) (*block.Header, error) { + blk, err := chain.Block(height) + if err != nil { + return nil, errors.Wrapf(err, "block %d not found", height) + } + return &blk.Header, nil + }).AnyTimes() + bc.EXPECT().TipHeight().DoAndReturn(func() uint64 { + return chain.TipHeight() + }).AnyTimes() + clock := clock.New() + broadcastHandler := func(msg proto.Message) error { return nil } + delegatesByEpochFunc := func(uint64) ([]string, error) { + delegates := []string{} + for i := 1; i <= int(g.NumCandidateDelegates); i++ { + delegates = append(delegates, identityset.Address(i).String()) + } + return delegates, nil + } + proposersByEpochFunc := func(uint64) ([]string, error) { + return []string{ + identityset.Address(1).String(), + }, nil + } + rp := rolldpos.NewProtocol( + g.NumCandidateDelegates, + g.NumDelegates, + g.NumSubEpochs, + ) + bd := NewRollDPoSBuilder(). + SetAddr(identityset.Address(1).String()). + SetPriKey(identityset.PrivateKey(1)). + SetConfig(builderCfg). + SetChainManager(NewChainManager2(bc, chain)). + SetBlockDeserializer(block.NewDeserializer(bc.EvmNetworkID())). + SetClock(clock). + SetBroadcast(broadcastHandler). + SetDelegatesByEpochFunc(delegatesByEpochFunc). + SetProposersByEpochFunc(proposersByEpochFunc). + RegisterProtocol(rp) + crdpos := NewChainedRollDPoS(bd) + r.NoError(crdpos.Start(context.Background())) + defer func() { + r.NoError(crdpos.Stop(context.Background())) + }() + + time.Sleep(60 * time.Second) +} diff --git a/consensus/scheme/rolldpos/chainedrolldposctx.go b/consensus/scheme/rolldpos/chainedrolldposctx.go new file mode 100644 index 0000000000..3cd5b1cdce --- /dev/null +++ b/consensus/scheme/rolldpos/chainedrolldposctx.go @@ -0,0 +1,41 @@ +package rolldpos + +import ( + "go.uber.org/zap" +) + +type chainedRollDPoSCtx struct { + *rollDPoSCtx +} + +func NewChainedRollDPoSCtx( + ctx *rollDPoSCtx, +) (RDPoSCtx, error) { + return &chainedRollDPoSCtx{ + rollDPoSCtx: ctx, + }, nil +} + +func (cctx *chainedRollDPoSCtx) Prepare() error { + ctx := cctx.rollDPoSCtx + ctx.mutex.Lock() + defer ctx.mutex.Unlock() + height := cctx.chain.DraftHeight() + 1 + newRound, err := ctx.roundCalc.UpdateRound(ctx.round, height, ctx.BlockInterval(height), ctx.clock.Now(), ctx.toleratedOvertime) + if err != nil { + return err + } + ctx.logger().Debug( + "new round", + zap.Uint64("newheight", newRound.height), + zap.String("newts", ctx.clock.Now().String()), + zap.Uint64("newepoch", newRound.epochNum), + zap.Uint64("newepochStartHeight", newRound.epochStartHeight), + zap.Uint32("newround", newRound.roundNum), + zap.String("newroundStartTime", newRound.roundStartTime.String()), + ) + ctx.round = newRound + _consensusHeightMtc.WithLabelValues().Set(float64(ctx.round.height)) + _timeSlotMtc.WithLabelValues().Set(float64(ctx.round.roundNum)) + return nil +} diff --git a/consensus/scheme/rolldpos/endorsementmanager.go b/consensus/scheme/rolldpos/endorsementmanager.go index f977c038d3..0fba73e66a 100644 --- a/consensus/scheme/rolldpos/endorsementmanager.go +++ b/consensus/scheme/rolldpos/endorsementmanager.go @@ -214,19 +214,35 @@ func (bc *blockEndorsementCollection) Endorsements( type endorsementManager struct { isMajorityFunc EndorsedByMajorityFunc - eManagerDB db.KVStore + eManagerDB *roundStore collections map[string]*blockEndorsementCollection cachedMintedBlk *block.Block + deserializer *block.Deserializer } -func newEndorsementManager(eManagerDB db.KVStore, deserializer *block.Deserializer) (*endorsementManager, error) { +func newEndorsementManager(eManagerDB *roundStore, deserializer *block.Deserializer) (*endorsementManager, error) { if eManagerDB == nil { return &endorsementManager{ eManagerDB: nil, collections: map[string]*blockEndorsementCollection{}, cachedMintedBlk: nil, + deserializer: deserializer, }, nil } + manager := &endorsementManager{ + eManagerDB: eManagerDB, + collections: map[string]*blockEndorsementCollection{}, + cachedMintedBlk: nil, + deserializer: deserializer, + } + if err := manager.load(); err != nil { + return nil, err + } + return manager, nil +} + +func (m *endorsementManager) load() error { + eManagerDB := m.eManagerDB bytes, err := eManagerDB.Get(_eManagerNS, _statusKey) switch errors.Cause(err) { case nil: @@ -234,23 +250,19 @@ func newEndorsementManager(eManagerDB db.KVStore, deserializer *block.Deserializ manager := &endorsementManager{eManagerDB: eManagerDB} managerProto := &endorsementpb.EndorsementManager{} if err = proto.Unmarshal(bytes, managerProto); err != nil { - return nil, err + return err } - if err = manager.fromProto(managerProto, deserializer); err != nil { - return nil, err + if err = manager.fromProto(managerProto, m.deserializer); err != nil { + return err } manager.eManagerDB = eManagerDB - return manager, nil + return nil case db.ErrNotExist: // If DB doesn't have any information log.L().Info("First initializing DB") - return &endorsementManager{ - eManagerDB: eManagerDB, - collections: map[string]*blockEndorsementCollection{}, - cachedMintedBlk: nil, - }, nil + return nil default: - return nil, err + return err } } @@ -406,6 +418,15 @@ func (m *endorsementManager) Cleanup(timestamp time.Time) error { return nil } +func (m *endorsementManager) WithRound(height uint64, roundNum uint32) error { + m.Cleanup(time.Time{}) + if m.eManagerDB == nil { + return nil + } + m.eManagerDB.ChangeRound(height, roundNum) + return m.load() +} + func (m *endorsementManager) Log( logger *zap.Logger, delegates []string, diff --git a/consensus/scheme/rolldpos/rolldpos.go b/consensus/scheme/rolldpos/rolldpos.go index 5ca1cdf3fe..029156b271 100644 --- a/consensus/scheme/rolldpos/rolldpos.go +++ b/consensus/scheme/rolldpos/rolldpos.go @@ -62,10 +62,33 @@ type ( TipHeight() uint64 // ChainAddress returns chain address on parent chain, the root chain return empty. ChainAddress() string + // DraftHeight returns draft height + DraftHeight() uint64 + DropDraftBlock(uint64) + } + + workingSet interface { + Height() uint64 + Block(uint64) (*block.Block, error) + Cancel(uint64) } chainManager struct { bc blockchain.Blockchain + ws workingSet + } + + consensusFsm interface { + Start(context.Context) error + Stop(context.Context) error + BackToPrepare(time.Duration) (fsm.State, error) + ProduceReceiveBlockEvent(any) + ProduceReceiveProposalEndorsementEvent(any) + ProduceReceiveLockEndorsementEvent(any) + ProduceReceivePreCommitEndorsementEvent(any) + Calibrate(uint64) + NumPendingEvents() int + CurrentState() fsm.State } ) @@ -92,11 +115,24 @@ func NewChainManager(bc blockchain.Blockchain) ChainManager { } } +func NewChainManager2(bc blockchain.Blockchain, ws workingSet) ChainManager { + return &chainManager{ + bc: bc, + ws: ws, + } +} + // BlockProposeTime return propose time by height func (cm *chainManager) BlockProposeTime(height uint64) (time.Time, error) { if height == 0 { return time.Unix(cm.bc.Genesis().Timestamp, 0), nil } + if cm.ws != nil && height > cm.bc.TipHeight() { + blk, err := cm.ws.Block(height) + if err == nil { + return blk.Timestamp(), nil + } + } header, err := cm.bc.BlockHeaderByHeight(height) if err != nil { return time.Time{}, errors.Wrapf( @@ -109,6 +145,12 @@ func (cm *chainManager) BlockProposeTime(height uint64) (time.Time, error) { // BlockCommitTime return commit time by height func (cm *chainManager) BlockCommitTime(height uint64) (time.Time, error) { + if cm.ws != nil && height > cm.bc.TipHeight() { + blk, err := cm.ws.Block(height) + if err == nil { + return blk.Timestamp(), nil + } + } footer, err := cm.bc.BlockFooterByHeight(height) if err != nil { return time.Time{}, errors.Wrapf( @@ -144,9 +186,22 @@ func (cm *chainManager) ChainAddress() string { return cm.bc.ChainAddress() } +func (cm *chainManager) DraftHeight() uint64 { + if cm.ws == nil { + return cm.TipHeight() + } + return cm.ws.Height() +} + +func (cm *chainManager) DropDraftBlock(height uint64) { + if cm.ws != nil { + cm.ws.Cancel(height) + } +} + // RollDPoS is Roll-DPoS consensus main entrance type RollDPoS struct { - cfsm *consensusfsm.ConsensusFSM + cfsm consensusFsm ctx RDPoSCtx startDelay time.Duration ready chan interface{} @@ -456,3 +511,51 @@ func (b *Builder) Build() (*RollDPoS, error) { ready: make(chan interface{}), }, nil } + +func (b *Builder) BuildV2() (*RollDPoS, error) { + if b.chain == nil { + return nil, errors.Wrap(ErrNewRollDPoS, "blockchain APIs is nil") + } + if b.broadcastHandler == nil { + return nil, errors.Wrap(ErrNewRollDPoS, "broadcast callback is nil") + } + if b.clock == nil { + b.clock = clock.New() + } + b.cfg.DB.DbPath = "" // disable consensus db + b.cfg.Consensus.Delay = 0 + ctx, err := NewRollDPoSCtx( + consensusfsm.NewConsensusConfig(b.cfg.Consensus.FSM, b.cfg.DardanellesUpgrade, b.cfg.Genesis, b.cfg.Consensus.Delay), + b.cfg.DB, + b.cfg.SystemActive, + b.cfg.Consensus.ToleratedOvertime, + b.cfg.Genesis.TimeBasedRotation, + b.chain, + b.blockDeserializer, + b.rp, + b.broadcastHandler, + b.delegatesByEpochFunc, + b.proposersByEpochFunc, + b.encodedAddr, + b.priKey, + b.clock, + b.cfg.Genesis.BeringBlockHeight, + ) + if err != nil { + return nil, errors.Wrap(err, "error when constructing consensus context") + } + ctx, err = NewChainedRollDPoSCtx(ctx.(*rollDPoSCtx)) + if err != nil { + return nil, errors.Wrap(err, "error when constructing chained roll dpos context") + } + cfsm, err := consensusfsm.NewChainedConsensusFSM(ctx, b.clock) + if err != nil { + return nil, errors.Wrap(err, "error when constructing the consensus FSM") + } + return &RollDPoS{ + cfsm: cfsm, + ctx: ctx, + startDelay: b.cfg.Consensus.Delay, + ready: make(chan interface{}), + }, nil +} diff --git a/consensus/scheme/rolldpos/rolldposctx.go b/consensus/scheme/rolldpos/rolldposctx.go index 62c9a4e3ef..648713174f 100644 --- a/consensus/scheme/rolldpos/rolldposctx.go +++ b/consensus/scheme/rolldpos/rolldposctx.go @@ -136,16 +136,16 @@ func NewRollDPoSCtx( if proposersByEpochFunc == nil { return nil, errors.New("proposers by epoch function cannot be nil") } - if cfg.AcceptBlockTTL(0)+cfg.AcceptProposalEndorsementTTL(0)+cfg.AcceptLockEndorsementTTL(0)+cfg.CommitTTL(0) > cfg.BlockInterval(0) { - return nil, errors.Errorf( - "invalid ttl config, the sum of ttls should be equal to block interval. acceptBlockTTL %d, acceptProposalEndorsementTTL %d, acceptLockEndorsementTTL %d, commitTTL %d, blockInterval %d", - cfg.AcceptBlockTTL(0), - cfg.AcceptProposalEndorsementTTL(0), - cfg.AcceptLockEndorsementTTL(0), - cfg.CommitTTL(0), - cfg.BlockInterval(0), - ) - } + // if cfg.AcceptBlockTTL(0)+cfg.AcceptProposalEndorsementTTL(0)+cfg.AcceptLockEndorsementTTL(0)+cfg.CommitTTL(0) > cfg.BlockInterval(0) { + // return nil, errors.Errorf( + // "invalid ttl config, the sum of ttls should be equal to block interval. acceptBlockTTL %d, acceptProposalEndorsementTTL %d, acceptLockEndorsementTTL %d, commitTTL %d, blockInterval %d", + // cfg.AcceptBlockTTL(0), + // cfg.AcceptProposalEndorsementTTL(0), + // cfg.AcceptLockEndorsementTTL(0), + // cfg.CommitTTL(0), + // cfg.BlockInterval(0), + // ) + // } var eManagerDB db.KVStore if len(consensusDBConfig.DbPath) > 0 { eManagerDB = db.NewBoltDB(consensusDBConfig) @@ -179,7 +179,7 @@ func (ctx *rollDPoSCtx) Start(c context.Context) (err error) { if err := ctx.eManagerDB.Start(c); err != nil { return errors.Wrap(err, "Error when starting the collectionDB") } - eManager, err = newEndorsementManager(ctx.eManagerDB, ctx.blockDeserializer) + eManager, err = newEndorsementManager(newRoundStore(ctx.eManagerDB, 0, 0), ctx.blockDeserializer) } ctx.round, err = ctx.roundCalc.NewRoundWithToleration(0, ctx.BlockInterval(0), ctx.clock.Now(), eManager, ctx.toleratedOvertime) @@ -604,6 +604,13 @@ func (ctx *rollDPoSCtx) Height() uint64 { return ctx.round.Height() } +func (ctx *rollDPoSCtx) Number() uint32 { + ctx.mutex.RLock() + defer ctx.mutex.RUnlock() + + return ctx.round.Number() +} + func (ctx *rollDPoSCtx) Activate(active bool) { ctx.mutex.Lock() defer ctx.mutex.Unlock() diff --git a/consensus/scheme/rolldpos/roundcalculator.go b/consensus/scheme/rolldpos/roundcalculator.go index 4fc21ac9a6..c9905382e5 100644 --- a/consensus/scheme/rolldpos/roundcalculator.go +++ b/consensus/scheme/rolldpos/roundcalculator.go @@ -9,9 +9,11 @@ import ( "time" "github.com/pkg/errors" + "go.uber.org/zap" "github.com/iotexproject/iotex-core/v2/action/protocol/rolldpos" "github.com/iotexproject/iotex-core/v2/endorsement" + "github.com/iotexproject/iotex-core/v2/pkg/log" ) var errInvalidCurrentTime = errors.New("invalid current time") @@ -72,7 +74,7 @@ func (c *roundCalculator) UpdateRound(round *roundCtx, height uint64, blockInter blockInLock = round.blockInLock proofOfLock = round.proofOfLock } else { - err = round.eManager.Cleanup(time.Time{}) + err := round.eManager.WithRound(height, roundNum) if err != nil { return nil, err } @@ -135,12 +137,14 @@ func (c *roundCalculator) roundInfo( now time.Time, toleratedOvertime time.Duration, ) (roundNum uint32, roundStartTime time.Time, err error) { + blockProcessDuration := blockInterval + blockInterval = time.Second var lastBlockTime time.Time if lastBlockTime, err = c.chain.BlockProposeTime(0); err != nil { return } if height > 1 { - if height >= c.beringHeight { + if true { var lastBlkProposeTime time.Time if lastBlkProposeTime, err = c.chain.BlockProposeTime(height - 1); err != nil { return @@ -157,22 +161,25 @@ func (c *roundCalculator) roundInfo( if !lastBlockTime.Before(now) { // TODO: if this is the case, it is possible that the system time is far behind the time of other nodes. // better error handling may be needed on the caller side - err = errors.Wrapf( - errInvalidCurrentTime, - "last block time %s is after than current time %s", - lastBlockTime, - now, - ) - return + // err = errors.Wrapf( + // errInvalidCurrentTime, + // "last block time %s is after than current time %s height %d", + // lastBlockTime, + // now, + // height, + // ) + roundStartTime = lastBlockTime.Add(blockInterval) + return roundNum, roundStartTime, nil } duration := now.Sub(lastBlockTime) if duration > blockInterval { - roundNum = uint32(duration / blockInterval) - if toleratedOvertime == 0 || duration%blockInterval < toleratedOvertime { + roundNum = 1 + uint32((duration-blockInterval)/blockProcessDuration) + if toleratedOvertime == 0 || (duration-blockInterval)%blockProcessDuration < toleratedOvertime { roundNum-- } } - roundStartTime = lastBlockTime.Add(time.Duration(roundNum+1) * blockInterval) + log.L().Debug("round info", zap.Time("lastBlockTime", lastBlockTime), zap.Time("now", now), zap.Uint64("height", height), zap.Duration("duration", duration), zap.Uint32("roundNum", roundNum)) + roundStartTime = lastBlockTime.Add(time.Duration(roundNum)*blockProcessDuration + blockInterval) return roundNum, roundStartTime, nil } @@ -244,6 +251,7 @@ func (c *roundCalculator) newRound( return nil, err } } + eManager.WithRound(height, roundNum) round = &roundCtx{ epochNum: epochNum, epochStartHeight: epochStartHeight, diff --git a/consensus/scheme/rolldpos/roundstore.go b/consensus/scheme/rolldpos/roundstore.go new file mode 100644 index 0000000000..b1f90d3bb3 --- /dev/null +++ b/consensus/scheme/rolldpos/roundstore.go @@ -0,0 +1,40 @@ +package rolldpos + +import "github.com/iotexproject/iotex-core/v2/pkg/util/byteutil" + +type ( + kvStore interface { + Put(string, []byte, []byte) error + Get(string, []byte) ([]byte, error) + } + + roundStore struct { + underlying kvStore + height uint64 + roundNum uint32 + } +) + +func newRoundStore(kv kvStore, height uint64, roundNum uint32) *roundStore { + return &roundStore{underlying: kv, height: height, roundNum: roundNum} +} + +func (rs *roundStore) Put(ns string, key []byte, value []byte) error { + return rs.underlying.Put(ns, rs.realKey(key), value) +} + +func (rs *roundStore) Get(ns string, key []byte) ([]byte, error) { + return rs.underlying.Get(ns, rs.realKey(key)) +} + +func (rs *roundStore) ChangeRound(height uint64, roundNum uint32) { + rs.height = height + rs.roundNum = roundNum +} + +func (rs *roundStore) realKey(key []byte) []byte { + keys := byteutil.Uint64ToBytes(rs.height) + keys = append(keys, byteutil.Uint32ToBytes(rs.roundNum)...) + keys = append(keys, key...) + return keys +} diff --git a/pkg/log/log.go b/pkg/log/log.go index 5df5d489c1..9ae33a7d90 100644 --- a/pkg/log/log.go +++ b/pkg/log/log.go @@ -39,6 +39,7 @@ func init() { zapCfg := zap.NewDevelopmentConfig() zapCfg.EncoderConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder zapCfg.Level.SetLevel(zap.InfoLevel) + zapCfg.Level.SetLevel(zap.DebugLevel) l, err := zapCfg.Build() if err != nil { log.Println("Failed to init zap global logger, no zap log will be shown till zap is properly initialized: ", err)