Skip to content

Commit

Permalink
Adds Prepared Certificates to ensure Istanbul liveness (#366)
Browse files Browse the repository at this point in the history
  • Loading branch information
Joshua Gutow authored and Asa Oines committed Sep 24, 2019
1 parent 7fbd6f3 commit 0344742
Show file tree
Hide file tree
Showing 26 changed files with 1,639 additions and 500 deletions.
2 changes: 1 addition & 1 deletion consensus/istanbul/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type Backend interface {

// Verify verifies the proposal. If a consensus.ErrFutureBlock error is returned,
// the time difference of the proposal and current time is also returned.
Verify(Proposal, Validator) (time.Duration, error)
Verify(Proposal) (time.Duration, error)

// Sign signs input data with the backend's private key
Sign([]byte) ([]byte, error)
Expand Down
14 changes: 10 additions & 4 deletions consensus/istanbul/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ func (sb *Backend) EventMux() *event.TypeMux {
}

// Verify implements istanbul.Backend.Verify
func (sb *Backend) Verify(proposal istanbul.Proposal, src istanbul.Validator) (time.Duration, error) {
func (sb *Backend) Verify(proposal istanbul.Proposal) (time.Duration, error) {
// Check if the proposal is a valid block
block := &types.Block{}
block, ok := proposal.(*types.Block)
Expand All @@ -303,11 +303,17 @@ func (sb *Backend) Verify(proposal istanbul.Proposal, src istanbul.Validator) (t
return 0, errInvalidUncleHash
}

// verify the header of proposed block
if block.Header().Coinbase != src.Address() {
// The author should be the first person to propose the block to ensure that randomness matches up.
addr, err := sb.Author(block.Header())
if err != nil {
sb.logger.Error("Could not recover orignal author of the block to verify the randomness", "err", err, "func", "Verify")
return 0, errInvalidProposal
} else if addr != block.Header().Coinbase {
sb.logger.Error("Original author of the block does not match the coinbase", "addr", addr, "coinbase", block.Header().Coinbase, "func", "Verify")
return 0, errInvalidCoinbase
}
err := sb.VerifyHeader(sb.chain, block.Header(), false)

err = sb.VerifyHeader(sb.chain, block.Header(), false)

// ignore errEmptyCommittedSeals error because we don't have the committed seals yet
if err != nil && err != errEmptyCommittedSeals {
Expand Down
39 changes: 30 additions & 9 deletions consensus/istanbul/core/backlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,11 @@ func (c *core) checkMessage(msgCode uint64, view *istanbul.View) error {
return errInvalidMessage
}

// Round change messages should be in the same sequence but be >= the desired round
if msgCode == istanbul.MsgRoundChange {
if view.Sequence.Cmp(c.currentView().Sequence) > 0 {
return errFutureMessage
} else if view.Cmp(c.currentView()) < 0 {
} else if view.Round.Cmp(c.current.DesiredRound()) < 0 {
return errOldMessage
}
return nil
Expand All @@ -57,7 +58,8 @@ func (c *core) checkMessage(msgCode uint64, view *istanbul.View) error {
return errOldMessage
}

if c.waitingForRoundChange {
// Round change messages are already let through.
if c.state == StateWaitingForNewRound {
return errFutureMessage
}

Expand All @@ -76,9 +78,14 @@ func (c *core) checkMessage(msgCode uint64, view *istanbul.View) error {
}

func (c *core) storeBacklog(msg *istanbul.Message, src istanbul.Validator) {
logger := c.logger.New("from", src, "state", c.state)
logger := c.logger.New("from", msg.Address, "state", c.state, "func", "storeBacklog")
if c.current != nil {
logger = logger.New("cur_seq", c.current.Sequence(), "cur_round", c.current.Round())
} else {
logger = logger.New("cur_seq", 0, "cur_round", -1)
}

if src.Address() == c.Address() {
if msg.Address == c.Address() {
logger.Warn("Backlog from self")
return
}
Expand All @@ -99,13 +106,20 @@ func (c *core) storeBacklog(msg *istanbul.Message, src istanbul.Validator) {
if err == nil {
backlog.Push(msg, toPriority(msg.Code, p.View))
}
// for istanbul.MsgRoundChange, istanbul.MsgPrepare and istanbul.MsgCommit cases
default:
case istanbul.MsgPrepare:
fallthrough
case istanbul.MsgCommit:
var p *istanbul.Subject
err := msg.Decode(&p)
if err == nil {
backlog.Push(msg, toPriority(msg.Code, p.View))
}
case istanbul.MsgRoundChange:
var p *istanbul.RoundChange
err := msg.Decode(&p)
if err == nil {
backlog.Push(msg, toPriority(msg.Code, p.View))
}
}
c.backlogs[src] = backlog
}
Expand All @@ -119,7 +133,7 @@ func (c *core) processBacklog() {
continue
}

logger := c.logger.New("from", src, "state", c.state)
logger := c.logger.New("from", src, "state", c.state, "cur_round", c.current.Round(), "cur_seq", c.current.Sequence(), "func", "processBacklog")
isFuture := false

// We stop processing if
Expand All @@ -136,13 +150,20 @@ func (c *core) processBacklog() {
if err == nil {
view = m.View
}
// for istanbul.MsgRoundChange, istanbul.MsgPrepare and istanbul.MsgCommit cases
default:
case istanbul.MsgPrepare:
fallthrough
case istanbul.MsgCommit:
var sub *istanbul.Subject
err := msg.Decode(&sub)
if err == nil {
view = sub.View
}
case istanbul.MsgRoundChange:
var rc *istanbul.RoundChange
err := msg.Decode(&rc)
if err == nil {
view = rc.View
}
}
if view == nil {
logger.Debug("Nil view", "msg", msg)
Expand Down
111 changes: 65 additions & 46 deletions consensus/istanbul/core/backlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,18 @@ import (
"github.com/ethereum/go-ethereum/consensus/istanbul"
"github.com/ethereum/go-ethereum/consensus/istanbul/validator"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
elog "github.com/ethereum/go-ethereum/log"
)

func TestCheckMessage(t *testing.T) {
testLogger.SetHandler(elog.StdoutHandler)
c := &core{
state: StateAcceptRequest,
logger: testLogger,
state: StateAcceptRequest,
current: newRoundState(&istanbul.View{
Sequence: big.NewInt(1),
Round: big.NewInt(0),
}, newTestValidatorSet(4), common.Hash{}, nil, nil, nil),
}, newTestValidatorSet(4), nil, nil, istanbul.EmptyPreparedCertificate(), nil),
}

// invalid view format
Expand All @@ -46,7 +48,7 @@ func TestCheckMessage(t *testing.T) {
t.Errorf("error mismatch: have %v, want %v", err, errInvalidMessage)
}

testStates := []State{StateAcceptRequest, StatePreprepared, StatePrepared, StateCommitted}
testStates := []State{StateAcceptRequest, StatePreprepared, StatePrepared, StateCommitted, StateWaitingForNewRound}
testCode := []uint64{istanbul.MsgPreprepare, istanbul.MsgPrepare, istanbul.MsgCommit, istanbul.MsgRoundChange}

// future sequence
Expand Down Expand Up @@ -83,27 +85,6 @@ func TestCheckMessage(t *testing.T) {
}
}

// current view but waiting for round change
v = &istanbul.View{
Sequence: big.NewInt(1),
Round: big.NewInt(0),
}
c.waitingForRoundChange = true
for i := 0; i < len(testStates); i++ {
c.state = testStates[i]
for j := 0; j < len(testCode); j++ {
err := c.checkMessage(testCode[j], v)
if testCode[j] == istanbul.MsgRoundChange {
if err != nil {
t.Errorf("error mismatch: have %v, want nil", err)
}
} else if err != errFutureMessage {
t.Errorf("error mismatch: have %v, want %v", err, errFutureMessage)
}
}
}
c.waitingForRoundChange = false

v = c.currentView()
// current view, state = StateAcceptRequest
c.state = StateAcceptRequest
Expand Down Expand Up @@ -163,11 +144,25 @@ func TestCheckMessage(t *testing.T) {
}
}

// current view, state = StateWaitingForNewRound
c.state = StateWaitingForNewRound
for i := 0; i < len(testCode); i++ {
err := c.checkMessage(testCode[i], v)
if testCode[i] == istanbul.MsgRoundChange {
if err != nil {
t.Errorf("error mismatch: have %v, want nil", err)
}
} else if err != errFutureMessage {
t.Errorf("error mismatch: have %v, want %v", err, errFutureMessage)
}
}

}

func TestStoreBacklog(t *testing.T) {
testLogger.SetHandler(elog.StdoutHandler)
c := &core{
logger: log.New("backend", "test", "id", 0),
logger: testLogger,
backlogs: make(map[istanbul.Validator]*prque.Prque),
backlogsMu: new(sync.Mutex),
}
Expand All @@ -183,8 +178,9 @@ func TestStoreBacklog(t *testing.T) {
}
prepreparePayload, _ := Encode(preprepare)
m := &istanbul.Message{
Code: istanbul.MsgPreprepare,
Msg: prepreparePayload,
Code: istanbul.MsgPreprepare,
Msg: prepreparePayload,
Address: p.Address(),
}
c.storeBacklog(m, p)
msg := c.backlogs[p].PopItem()
Expand All @@ -200,8 +196,9 @@ func TestStoreBacklog(t *testing.T) {
subjectPayload, _ := Encode(subject)

m = &istanbul.Message{
Code: istanbul.MsgPrepare,
Msg: subjectPayload,
Code: istanbul.MsgPrepare,
Msg: subjectPayload,
Address: p.Address(),
}
c.storeBacklog(m, p)
msg = c.backlogs[p].PopItem()
Expand All @@ -211,8 +208,9 @@ func TestStoreBacklog(t *testing.T) {

// push commit msg
m = &istanbul.Message{
Code: istanbul.MsgCommit,
Msg: subjectPayload,
Code: istanbul.MsgCommit,
Msg: subjectPayload,
Address: p.Address(),
}
c.storeBacklog(m, p)
msg = c.backlogs[p].PopItem()
Expand All @@ -221,9 +219,16 @@ func TestStoreBacklog(t *testing.T) {
}

// push roundChange msg
rc := &istanbul.RoundChange{
View: v,
PreparedCertificate: istanbul.EmptyPreparedCertificate(),
}
rcPayload, _ := Encode(rc)

m = &istanbul.Message{
Code: istanbul.MsgRoundChange,
Msg: subjectPayload,
Code: istanbul.MsgRoundChange,
Msg: rcPayload,
Address: p.Address(),
}
c.storeBacklog(m, p)
msg = c.backlogs[p].PopItem()
Expand All @@ -236,15 +241,16 @@ func TestProcessFutureBacklog(t *testing.T) {
backend := &testSystemBackend{
events: new(event.TypeMux),
}
testLogger.SetHandler(elog.StdoutHandler)
c := &core{
logger: log.New("backend", "test", "id", 0),
logger: testLogger,
backlogs: make(map[istanbul.Validator]*prque.Prque),
backlogsMu: new(sync.Mutex),
backend: backend,
current: newRoundState(&istanbul.View{
Sequence: big.NewInt(1),
Round: big.NewInt(0),
}, newTestValidatorSet(4), common.Hash{}, nil, nil, nil),
}, newTestValidatorSet(4), nil, nil, istanbul.EmptyPreparedCertificate(), nil),
state: StateAcceptRequest,
}
c.subscribeEvents()
Expand Down Expand Up @@ -298,22 +304,34 @@ func TestProcessBacklog(t *testing.T) {
}
subjectPayload, _ := Encode(subject)

rc := &istanbul.RoundChange{
View: v,
PreparedCertificate: istanbul.EmptyPreparedCertificate(),
}
rcPayload, _ := Encode(rc)

address := common.BytesToAddress([]byte("0xce10ce10"))

msgs := []*istanbul.Message{
{
Code: istanbul.MsgPreprepare,
Msg: prepreparePayload,
Code: istanbul.MsgPreprepare,
Msg: prepreparePayload,
Address: address,
},
{
Code: istanbul.MsgPrepare,
Msg: subjectPayload,
Code: istanbul.MsgPrepare,
Msg: subjectPayload,
Address: address,
},
{
Code: istanbul.MsgCommit,
Msg: subjectPayload,
Code: istanbul.MsgCommit,
Msg: subjectPayload,
Address: address,
},
{
Code: istanbul.MsgRoundChange,
Msg: subjectPayload,
Code: istanbul.MsgRoundChange,
Msg: rcPayload,
Address: address,
},
}
for i := 0; i < len(msgs); i++ {
Expand All @@ -327,16 +345,17 @@ func testProcessBacklog(t *testing.T, msg *istanbul.Message) {
events: new(event.TypeMux),
peers: vset,
}
testLogger.SetHandler(elog.StdoutHandler)
c := &core{
logger: log.New("backend", "test", "id", 0),
logger: testLogger,
backlogs: make(map[istanbul.Validator]*prque.Prque),
backlogsMu: new(sync.Mutex),
backend: backend,
state: State(msg.Code),
current: newRoundState(&istanbul.View{
Sequence: big.NewInt(1),
Round: big.NewInt(0),
}, newTestValidatorSet(4), common.Hash{}, nil, nil, nil),
}, newTestValidatorSet(4), nil, nil, istanbul.EmptyPreparedCertificate(), nil),
}
c.subscribeEvents()
defer c.unsubscribeEvents()
Expand Down
Loading

0 comments on commit 0344742

Please sign in to comment.