Skip to content

Commit

Permalink
chained rolldpos scheme
Browse files Browse the repository at this point in the history
  • Loading branch information
envestcc committed Jan 23, 2025
1 parent 58df69e commit 9088583
Show file tree
Hide file tree
Showing 12 changed files with 944 additions and 39 deletions.
182 changes: 182 additions & 0 deletions consensus/consensusfsm/chainedfsm.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
package consensusfsm

import (
"github.com/facebookgo/clock"
fsm "github.com/iotexproject/go-fsm"
"github.com/pkg/errors"
)

const (
// consensus states
SFinalized fsm.State = "S_FINALIZED"
SInvalid fsm.State = "S_INVALID"
)

type ChainedConsensusFSM struct {
*ConsensusFSM
}

func NewChainedConsensusFSM(ctx Context, clock clock.Clock) (*ChainedConsensusFSM, error) {
mm := &ConsensusFSM{
evtq: make(chan *ConsensusEvent, ctx.EventChanSize()),
close: make(chan interface{}),
ctx: ctx,
clock: clock,
}
cm := &ChainedConsensusFSM{
ConsensusFSM: mm,
}
b := fsm.NewBuilder().
AddInitialState(sPrepare).
AddStates(
sAcceptBlockProposal,
sAcceptProposalEndorsement,
sAcceptLockEndorsement,
sAcceptPreCommitEndorsement,
SInvalid,
SFinalized,
).
AddTransition(sPrepare, ePrepare, cm.prepare, []fsm.State{
sPrepare,
sAcceptBlockProposal,
sAcceptPreCommitEndorsement,
}).
AddTransition(
sAcceptBlockProposal,
eReceiveBlock,
cm.onReceiveBlock,
[]fsm.State{
sAcceptBlockProposal, // proposed block invalid
sAcceptProposalEndorsement, // receive valid block, jump to next step
}).
AddTransition(
sAcceptBlockProposal,
eFailedToReceiveBlock,
cm.onFailedToReceiveBlock,
[]fsm.State{
sAcceptProposalEndorsement, // no valid block, jump to next step
}).
AddTransition(
sAcceptProposalEndorsement,
eReceiveProposalEndorsement,
cm.onReceiveProposalEndorsementInAcceptProposalEndorsementState,
[]fsm.State{
sAcceptProposalEndorsement, // not enough endorsements
sAcceptLockEndorsement, // enough endorsements
}).
AddTransition(
sAcceptProposalEndorsement,
eReceivePreCommitEndorsement,
cm.onReceiveProposalEndorsementInAcceptProposalEndorsementState,
[]fsm.State{
sAcceptProposalEndorsement, // not enough endorsements
sAcceptLockEndorsement, // enough endorsements
}).
AddTransition(
sAcceptProposalEndorsement,
eStopReceivingProposalEndorsement,
cm.onStopReceivingProposalEndorsement,
[]fsm.State{
sAcceptLockEndorsement, // timeout, jump to next step
}).
AddTransition(
sAcceptLockEndorsement,
eReceiveProposalEndorsement,
cm.onReceiveProposalEndorsementInAcceptLockEndorsementState,
[]fsm.State{
sAcceptLockEndorsement,
},
).
AddTransition(
sAcceptLockEndorsement,
eReceiveLockEndorsement,
cm.onReceiveLockEndorsement,
[]fsm.State{
sAcceptLockEndorsement, // not enough endorsements
sAcceptPreCommitEndorsement, // reach commit agreement, jump to next step
}).
AddTransition(
sAcceptLockEndorsement,
eReceivePreCommitEndorsement,
cm.onReceiveLockEndorsement,
[]fsm.State{
sAcceptLockEndorsement, // not enough endorsements
sAcceptPreCommitEndorsement, // reach commit agreement, jump to next step
}).
AddTransition(
sAcceptLockEndorsement,
eStopReceivingLockEndorsement,
cm.onStopReceivingLockEndorsement,
[]fsm.State{
SInvalid, // timeout, invalid
}).
AddTransition(
sAcceptPreCommitEndorsement,
eBroadcastPreCommitEndorsement,
cm.onBroadcastPreCommitEndorsement,
[]fsm.State{
sAcceptPreCommitEndorsement,
}).
AddTransition(
sAcceptPreCommitEndorsement,
eStopReceivingPreCommitEndorsement,
cm.onStopReceivingPreCommitEndorsement,
[]fsm.State{
SInvalid,
}).
AddTransition(
sAcceptPreCommitEndorsement,
eReceivePreCommitEndorsement,
cm.onReceivePreCommitEndorsement,
[]fsm.State{
sAcceptPreCommitEndorsement,
SFinalized, // reach consensus, start next epoch
})
// Add the backdoor transition so that we could unit test the transition from any given state
for _, state := range consensusStates {
b = b.AddTransition(state, BackdoorEvent, cm.handleBackdoorEvt, consensusStates)
if state != sPrepare {
b = b.AddTransition(state, eCalibrate, cm.calibrate, []fsm.State{sPrepare, state})
}
}
m, err := b.Build()
if err != nil {
return nil, errors.Wrap(err, "error when building the FSM")
}
cm.fsm = m
return cm, nil
}

func (m *ChainedConsensusFSM) onStopReceivingLockEndorsement(evt fsm.Event) (fsm.State, error) {
m.ctx.Logger().Warn("Not enough lock endorsements")

return m.Invalid()
}

func (m *ChainedConsensusFSM) onStopReceivingPreCommitEndorsement(evt fsm.Event) (fsm.State, error) {
m.ctx.Logger().Warn("Not enough pre-commit endorsements")

return m.Invalid()
}

func (m *ChainedConsensusFSM) onReceivePreCommitEndorsement(evt fsm.Event) (fsm.State, error) {
cEvt, ok := evt.(*ConsensusEvent)
if !ok {
return sAcceptPreCommitEndorsement, errors.Wrap(ErrEvtCast, "failed to cast to consensus event")
}
committed, err := m.ctx.Commit(cEvt.Data())
if err != nil || !committed {
return sAcceptPreCommitEndorsement, err
}
return SFinalized, nil
}

func (m *ChainedConsensusFSM) Finalize() (fsm.State, error) {
m.ctx.Logger().Warn("Finalized")
return SFinalized, nil
}

func (m *ChainedConsensusFSM) Invalid() (fsm.State, error) {
m.ctx.Logger().Warn("Invalid")
return SInvalid, nil
}
1 change: 1 addition & 0 deletions consensus/consensusfsm/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Context interface {

Logger() *zap.Logger
Height() uint64
Number() uint32

NewConsensusEvent(fsm.EventType, interface{}) *ConsensusEvent
NewBackdoorEvt(fsm.State) *ConsensusEvent
Expand Down
7 changes: 4 additions & 3 deletions consensus/consensusfsm/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ func (m *ConsensusFSM) produce(evt *ConsensusEvent, delay time.Duration) {
if evt == nil {
return
}
// m.ctx.Logger().Debug("produce event", zap.Any("event", evt.Type()), zap.Stack("stack"))
_consensusEvtsMtc.WithLabelValues(string(evt.Type()), "produced").Inc()
if delay > 0 {
m.wg.Add(1)
Expand All @@ -328,7 +329,7 @@ func (m *ConsensusFSM) handle(evt *ConsensusEvent) error {
return nil
}
if m.ctx.IsFutureEvent(evt) {
m.ctx.Logger().Debug("future event", zap.Any("event", evt.Type()))
m.ctx.Logger().Debug("future event", zap.Any("event", evt.Type()), zap.Any("eventHeight", evt.Height()), zap.Any("eventRount", evt.Round()))
// TODO: find a more appropriate delay
m.produce(evt, m.ctx.UnmatchedEventInterval(evt.Height()))
_consensusEvtsMtc.WithLabelValues(string(evt.Type()), "backoff").Inc()
Expand Down Expand Up @@ -397,10 +398,10 @@ func (m *ConsensusFSM) calibrate(evt fsm.Event) (fsm.State, error) {

func (m *ConsensusFSM) prepare(evt fsm.Event) (fsm.State, error) {
if err := m.ctx.Prepare(); err != nil {
m.ctx.Logger().Error("Error during prepare", zap.Error(err))
m.ctx.Logger().Error("Error during prepare", zap.Error(err), zap.Stack("stack"))
return m.BackToPrepare(0)
}
m.ctx.Logger().Debug("Start a new round")
m.ctx.Logger().Debug("Start a new round", zap.Stack("stack"))
proposal, err := m.ctx.Proposal()
if err != nil {
m.ctx.Logger().Error("failed to generate block proposal", zap.Error(err))
Expand Down
Loading

0 comments on commit 9088583

Please sign in to comment.