Merge pull request #178 from onflow/alex/consensus-sealing-halt-on-exec-fork

sealing halt on exec fork
Alexander Hentschel authored Dec 3, 2020
2 parents 9e0f71e + 3ded2bf commit 2d65647
Showing 36 changed files with 1,217 additions and 240 deletions.
1 change: 1 addition & 0 deletions Makefile
@@ -121,6 +121,7 @@ generate-mocks:
GO111MODULE=on mockery -name 'ConnectionFactory' -dir="./engine/access/rpc/backend" -case=underscore -output="./engine/access/rpc/backend/mock" -outpkg="mock"
GO111MODULE=on mockery -name 'IngestRPC' -dir="./engine/execution/ingestion" -case=underscore -tags relic -output="./engine/execution/ingestion/mock" -outpkg="mock"
GO111MODULE=on mockery -name '.*' -dir=model/fingerprint -case=underscore -output="./model/fingerprint/mock" -outpkg="mock"
GO111MODULE=on mockery -name 'ExecForkActor' --structname 'ExecForkActorMock' -dir=module/mempool/consensus/mock/ -case=underscore -output="./module/mempool/consensus/mock/" -outpkg="mock"

# this ensures there is no unused dependency being added by accident
.PHONY: tidy
7 changes: 6 additions & 1 deletion cmd/consensus/main.go
@@ -36,6 +36,7 @@ import (
chmodule "github.com/onflow/flow-go/module/chunks"
finalizer "github.com/onflow/flow-go/module/finalizer/consensus"
"github.com/onflow/flow-go/module/mempool"
consensusMempools "github.com/onflow/flow-go/module/mempool/consensus"
"github.com/onflow/flow-go/module/mempool/ejectors"
"github.com/onflow/flow-go/module/mempool/stdmap"
"github.com/onflow/flow-go/module/metrics"
@@ -135,7 +136,11 @@ func main() {
// use a custom ejector so we don't eject seals that would break
// the chain of seals
ejector := ejectors.NewLatestIncorporatedResultSeal(node.Storage.Headers)
seals = stdmap.NewIncorporatedResultSeals(sealLimit, stdmap.WithEject(ejector.Eject))
resultSeals := stdmap.NewIncorporatedResultSeals(stdmap.WithLimit(sealLimit), stdmap.WithEject(ejector.Eject))
seals, err = consensusMempools.NewExecStateForkSuppressor(consensusMempools.LogForkAndCrash(node.Logger), resultSeals, node.DB, node.Logger)
if err != nil {
return fmt.Errorf("failed to wrap seals mempool into ExecStateForkSuppressor: %w", err)
}
return nil
}).
Module("consensus node metrics", func(node *cmd.FlowNodeBuilder) error {
2 changes: 1 addition & 1 deletion cmd/util/cmd/block_hash_by_height_test.go
@@ -115,7 +115,7 @@ func storeAndIndexHeader(t *testing.T, db *badger.DB, headers *bstorage.Headers,
}

func storeAndIndexSealFor(t *testing.T, db *badger.DB, seals *bstorage.Seals, h *flow.Header) {
seal := unittest.SealFixture()
seal := unittest.Seal.Fixture()
seal.BlockID = h.ID()

err := seals.Store(seal)
2 changes: 1 addition & 1 deletion consensus/integration/nodes_test.go
@@ -187,7 +187,7 @@ func createNode(
guarantees, err := stdmap.NewGuarantees(guaranteeLimit)
require.NoError(t, err)

seals := stdmap.NewIncorporatedResultSeals(sealLimit)
seals := stdmap.NewIncorporatedResultSeals(stdmap.WithLimit(sealLimit))

// initialize the block builder
build := builder.NewBuilder(metrics, db, state, headersDB, sealsDB, indexDB, guarantees, seals, tracer)
2 changes: 1 addition & 1 deletion engine/access/rpc/backend/backend_test.go
@@ -661,7 +661,7 @@ func (suite *Suite) TestGetAccount() {

// setup the latest sealed block
header := unittest.BlockHeaderFixture() // create a mock header
seal := unittest.SealFixture() // create a mock seal
seal := unittest.Seal.Fixture() // create a mock seal
seal.BlockID = header.ID() // make the seal point to the header

suite.snapshot.
15 changes: 13 additions & 2 deletions engine/consensus/matching/engine.go
@@ -210,12 +210,19 @@ func (e *Engine) onReceipt(originID flow.Identifier, receipt *flow.ExecutionRece
Logger()

resultFinalState, ok := receipt.ExecutionResult.FinalStateCommitment()
if !ok { // return
if !ok || len(resultFinalState) < 1 { // discard receipt
log.Error().Msg("execution receipt without FinalStateCommit received")
return engine.NewInvalidInputErrorf("execution receipt without FinalStateCommit: %x", receipt.ID())
}
log = log.With().Hex("final_state", resultFinalState).Logger()

resultInitialState, ok := receipt.ExecutionResult.InitialStateCommit()
if !ok { // discard receipt
log.Error().Msg("execution receipt without InitialStateCommit received")
return engine.NewInvalidInputErrorf("execution receipt without InitialStateCommit: %x", receipt.ID())
}
log = log.With().Hex("initial_state", resultInitialState).Logger()

// CAUTION INCOMPLETE
// For many other messages, we check that the message's origin (as established by the
// networking layer) is equal to the message's creator as reported by the message itself.
@@ -281,6 +288,7 @@ func (e *Engine) onReceipt(originID flow.Identifier, receipt *flow.ExecutionRece
added, err := e.incorporatedResults.Add(flow.NewIncorporatedResult(result.BlockID, result))
if err != nil {
log.Err(err).Msg("error inserting incorporated result in mempool")
return fmt.Errorf("error inserting incorporated result in mempool: %w", err)
}
if !added {
log.Debug().Msg("skipping result already in mempool")
@@ -722,10 +730,13 @@ func (e *Engine) sealResult(incorporatedResult *flow.IncorporatedResult) error {
}

// we don't care if the seal is already in the mempool
_ = e.seals.Add(&flow.IncorporatedResultSeal{
_, err = e.seals.Add(&flow.IncorporatedResultSeal{
IncorporatedResult: incorporatedResult,
Seal: seal,
})
if err != nil {
return fmt.Errorf("failed to store IncorporatedResultSeal in mempool: %w", err)
}

return nil
}
2 changes: 1 addition & 1 deletion engine/testutil/nodes.go
@@ -205,7 +205,7 @@ func ConsensusNode(t *testing.T, hub *stub.Hub, identity *flow.Identity, identit
approvals, err := stdmap.NewApprovals(1000)
require.NoError(t, err)

seals := stdmap.NewIncorporatedResultSeals(1000)
seals := stdmap.NewIncorporatedResultSeals(stdmap.WithLimit(1000))

// receive collections
ingestionEngine, err := consensusingest.New(node.Log, node.Tracer, node.Metrics, node.Metrics, node.Metrics, node.Net, node.State, node.Headers, node.Me, guarantees)
1 change: 1 addition & 0 deletions engine/verification/match/engine_test.go
@@ -837,6 +837,7 @@ func ChunkWithIndex(blockID flow.Identifier, index int) *flow.Chunk {
EventCollection: blockID, // ensure chunks from different blocks with the same index will have different chunk ID
BlockID: blockID,
},
EndState: unittest.StateCommitmentFixture(),
}
return chunk
}
2 changes: 1 addition & 1 deletion model/flow/block_test.go
@@ -81,7 +81,7 @@ func TestNilProducesSameHashAsEmptySlice(t *testing.T) {

func TestOrderingChangesHash(t *testing.T) {

seals := unittest.BlockSealsFixture(5)
seals := unittest.Seal.Fixtures(5)

payload1 := flow.Payload{
Seals: seals,
8 changes: 2 additions & 6 deletions model/flow/execution_result.go
@@ -39,9 +39,7 @@ func (er ExecutionResult) FinalStateCommitment() (StateCommitment, bool) {
return nil, false
}
s := er.Chunks[er.Chunks.Len()-1].EndState
// TODO: empty state commitment should not be considered valid
// return s, len(s) > 0
return s, true
return s, len(s) > 0
}

// InitialStateCommit returns a commitment to the execution state used as input
@@ -56,7 +54,5 @@ func (er ExecutionResult) InitialStateCommit() (StateCommitment, bool) {
return nil, false
}
s := er.Chunks[0].StartState
// TODO: empty state commitment should not be considered valid
// return s, len(s) > 0
return s, true
return s, len(s) > 0
}
37 changes: 1 addition & 36 deletions module/builder/consensus/builder.go
@@ -4,7 +4,6 @@ package consensus

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"time"
@@ -241,43 +240,9 @@ func (b *Builder) BuildOn(parentID flow.Identifier, setter func(*flow.Header) er
// roadmap (https://github.com/dapperlabs/flow-go/issues/4872)

// create a mapping of block to seal for all seals in our pool
// We consider two seals as inconsistent, if they have different start or end states
encounteredInconsistentSealsForSameBlock := false
byBlock := make(map[flow.Identifier]*flow.IncorporatedResultSeal)
for _, irSeal := range b.sealPool.All() {
if len(irSeal.IncorporatedResult.Result.Chunks) < 1 {
return nil, fmt.Errorf("ExecutionResult without chunks: %v", irSeal.IncorporatedResult.Result.ID())
}
if len(irSeal.Seal.FinalState) < 1 {
// respective Execution Result should have been rejected by matching engine
return nil, fmt.Errorf("seal with empty state commitment: %v", irSeal.ID())
}
if irSeal2, found := byBlock[irSeal.Seal.BlockID]; found {
sc1json, err := json.Marshal(irSeal)
if err != nil {
return nil, err
}
sc2json, err := json.Marshal(irSeal2)
if err != nil {
return nil, err
}

// check whether seals are inconsistent:
if !bytes.Equal(irSeal.Seal.FinalState, irSeal2.Seal.FinalState) ||
!bytes.Equal(irSeal.IncorporatedResult.Result.Chunks[0].StartState, irSeal2.IncorporatedResult.Result.Chunks[0].StartState) {
fmt.Printf("ERROR: inconsistent seals for the same block %v: %s and %s", irSeal.Seal.BlockID, string(sc1json), string(sc2json))
encounteredInconsistentSealsForSameBlock = true
} else {
fmt.Printf("WARNING: multiple seals with different IDs for the same block %v: %s and %s", irSeal.Seal.BlockID, string(sc1json), string(sc2json))
}

} else {
byBlock[irSeal.Seal.BlockID] = irSeal
}
}
if encounteredInconsistentSealsForSameBlock {
// in case we find inconsistent seals, do not seal anything
byBlock = make(map[flow.Identifier]*flow.IncorporatedResultSeal)
byBlock[irSeal.Seal.BlockID] = irSeal
}

// get the parent's block seal, which constitutes the beginning of the
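The pairwise consistency check removed above is not dropped but relocated: the seals mempool is now wrapped by an ExecStateForkSuppressor (wired up in cmd/consensus/main.go earlier in this diff; the wrapper's own implementation is not among the hunks shown). A minimal sketch of such a check, reconstructed from the builder code removed above and therefore only an approximation, could look like this:

package consensus

import (
	"bytes"

	"github.com/onflow/flow-go/model/flow"
)

// sealsConsistent is a sketch based on the check removed from the builder:
// two seals for the same block are consistent only if they commit to the same
// initial and final execution state; anything else is evidence of an
// execution fork. The real ExecStateForkSuppressor is not shown in this diff.
func sealsConsistent(a, b *flow.IncorporatedResultSeal) bool {
	if len(a.IncorporatedResult.Result.Chunks) < 1 || len(b.IncorporatedResult.Result.Chunks) < 1 {
		return false // results without chunks should already have been rejected upstream
	}
	sameFinalState := bytes.Equal(a.Seal.FinalState, b.Seal.FinalState)
	sameInitialState := bytes.Equal(
		a.IncorporatedResult.Result.Chunks[0].StartState,
		b.IncorporatedResult.Result.Chunks[0].StartState,
	)
	return sameFinalState && sameInitialState
}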
61 changes: 33 additions & 28 deletions module/builder/consensus/builder_test.go
@@ -112,7 +112,11 @@ func (bs *BuilderSuite) createAndRecordBlock(parentBlock *flow.Block) *flow.Bloc

incorporatedResultForPrevBlock = unittest.IncorporatedResult.Fixture(
unittest.IncorporatedResult.WithResult(previousResult),
unittest.IncorporatedResult.WithIncorporatedBlockID(block.ID()),
unittest.IncorporatedResult.WithIncorporatedBlockID(parentBlock.ID()),
// For sealing phase 2, the value for IncorporatedBlockID is the block the result pertains to (here parentBlock).
// In later development phases, we will change the logic such that IncorporatedBlockID references the
// block which actually incorporates the result:
//unittest.IncorporatedResult.WithIncorporatedBlockID(block.ID()),
)
result := unittest.ExecutionResultFixture(
unittest.WithBlock(&block),
@@ -140,21 +144,11 @@ func (bs *BuilderSuite) createAndRecordBlock(parentBlock *flow.Block) *flow.Bloc
// IncorporatedResultSeal, which ties the seal to the incorporated result it
// seals, is also recorded for future access.
func (bs *BuilderSuite) chainSeal(incorporatedResult *flow.IncorporatedResult) {

finalState, _ := incorporatedResult.Result.FinalStateCommitment()

seal := &flow.Seal{
BlockID: incorporatedResult.Result.BlockID,
ResultID: incorporatedResult.Result.ID(),
FinalState: finalState,
}
bs.chain = append(bs.chain, seal)

incorporatedResultSeal := &flow.IncorporatedResultSeal{
IncorporatedResult: incorporatedResult,
Seal: seal,
}

incorporatedResultSeal := unittest.IncorporatedResultSeal.Fixture(
unittest.IncorporatedResultSeal.WithResult(incorporatedResult.Result),
unittest.IncorporatedResultSeal.WithIncorporatedBlockID(incorporatedResult.IncorporatedBlockID),
)
bs.chain = append(bs.chain, incorporatedResultSeal.Seal)
bs.irsMap[incorporatedResultSeal.ID()] = incorporatedResultSeal
bs.irsList = append(bs.irsList, incorporatedResultSeal)
}
@@ -524,23 +518,34 @@ func (bs *BuilderSuite) TestPayloadSealAllValid() {
bs.Assert().ElementsMatch(bs.chain, bs.assembled.Seals, "should have included valid chain of seals")
}

func (bs *BuilderSuite) TestPayloadSealSomeValid() {

bs.pendingSeals = bs.irsMap

// add some invalid seals to the mempool
// TestPayloadSealOnlyFork checks that the builder only includes seals corresponding
// to blocks on the current fork (and _not_ seals for sealable blocks on other forks)
func (bs *BuilderSuite) TestPayloadSealOnlyFork() {
// in the test setup, we already created a single fork
// [first] <- [F0] <- [F1] <- [F2] <- [F3] <- [A0] <- [A1] <- [A2] <- [A3]
// Where block
// * [first] is sealed and finalized
// * [F0] ... [F3] are finalized but _not_ sealed
// * [A0] ... [A3] are _not_ finalized and _not_ sealed
// We now create an additional fork: [F3] <- [B0] <- [B1] <- ... <- [B7]
var forkHead *flow.Block
forkHead = bs.blocks[bs.finalID]
for i := 0; i < 8; i++ {
invalid := &flow.IncorporatedResultSeal{
IncorporatedResult: unittest.IncorporatedResultFixture(),
Seal: unittest.SealFixture(),
}
bs.pendingSeals[invalid.ID()] = invalid
forkHead = bs.createAndRecordBlock(forkHead)
// Method createAndRecordBlock adds a seal for every block into the mempool.
}

_, err := bs.build.BuildOn(bs.parentID, bs.setter)
bs.pendingSeals = bs.irsMap
_, err := bs.build.BuildOn(forkHead.ID(), bs.setter)
bs.Require().NoError(err)

// expected seals: [F0] <- ... <- [F3] <- [B0] <- ... <- [B7]
// Note: bs.chain contains seals for blocks F0 ... F3 then A0 ... A3 and then B0 ... B7
bs.Assert().Equal(12, len(bs.assembled.Seals), "unexpected number of seals")
bs.Assert().ElementsMatch(bs.chain[:4], bs.assembled.Seals[:4], "should have included only valid chain of seals")
bs.Assert().ElementsMatch(bs.chain[len(bs.chain)-8:], bs.assembled.Seals[4:], "should have included only valid chain of seals")

bs.Assert().Empty(bs.assembled.Guarantees, "should have no guarantees in payload with empty mempool")
bs.Assert().ElementsMatch(bs.chain, bs.assembled.Seals, "should have included only valid chain of seals")
}

func (bs *BuilderSuite) TestPayloadSealCutoffChain() {
8 changes: 8 additions & 0 deletions module/mempool/common.go
@@ -0,0 +1,8 @@
package mempool

import "github.com/onflow/flow-go/model/flow"

// OnEjection is a callback which a mempool executes on ejecting
// one of its elements. The callbacks are executed from within the thread
// that serves the mempool. Implementations should be non-blocking.
type OnEjection func(flow.Entity)
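As a small illustration (hypothetical helper, not part of this commit), an OnEjection callback that merely logs the ejected entity and returns immediately, in line with the non-blocking requirement above, could look like this:

package example

import (
	"github.com/rs/zerolog"

	"github.com/onflow/flow-go/model/flow"
	"github.com/onflow/flow-go/module/mempool"
)

// ejectionLogger is a hypothetical OnEjection implementation: it only emits a
// log line and returns, so it does not block the thread serving the mempool.
// How callbacks are registered with a mempool is not shown in this diff.
func ejectionLogger(log zerolog.Logger) mempool.OnEjection {
	return func(entity flow.Entity) {
		id := entity.ID() // flow.Identifier, a fixed-size byte array
		log.Warn().Hex("ejected_entity_id", id[:]).Msg("entity ejected from mempool")
	}
}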
28 changes: 28 additions & 0 deletions module/mempool/consensus/exec_fork_actor.go
@@ -0,0 +1,28 @@
package consensus

import (
"encoding/json"
"fmt"

"github.com/rs/zerolog"

"github.com/onflow/flow-go/model/flow"
)

type ExecForkActor func([]*flow.IncorporatedResultSeal)

func LogForkAndCrash(log zerolog.Logger) ExecForkActor {
return func(conflictingSeals []*flow.IncorporatedResultSeal) {
l := log.Fatal().Int("number conflicting seals", len(conflictingSeals))
for i, s := range conflictingSeals {
sealAsJson, err := json.Marshal(s)
if err != nil {
err = fmt.Errorf("failed to marshal candidate seal to json: %w", err)
l.Str(fmt.Sprintf("seal_%d", i), err.Error())
continue
}
l = l.Str(fmt.Sprintf("seal_%d", i), string(sealAsJson))
}
l.Msg("inconsistent seals for the same block")
}
}
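For reference, this actor is wired into the consensus node in cmd/consensus/main.go (earlier in this diff). A self-contained sketch of that wiring, with placeholder parameter names, might read:

package example

import (
	"fmt"

	"github.com/dgraph-io/badger/v2"
	"github.com/rs/zerolog"

	"github.com/onflow/flow-go/module/mempool"
	consensusMempools "github.com/onflow/flow-go/module/mempool/consensus"
	"github.com/onflow/flow-go/module/mempool/stdmap"
)

// wrappedSealsMempool mirrors the wiring shown in cmd/consensus/main.go above:
// the IncorporatedResultSeals mempool is wrapped in an ExecStateForkSuppressor,
// which invokes LogForkAndCrash when it detects conflicting seals for the same
// block, i.e. evidence of an execution fork. Parameter names are placeholders.
func wrappedSealsMempool(sealLimit uint, db *badger.DB, log zerolog.Logger) (mempool.IncorporatedResultSeals, error) {
	resultSeals := stdmap.NewIncorporatedResultSeals(stdmap.WithLimit(sealLimit))
	seals, err := consensusMempools.NewExecStateForkSuppressor(consensusMempools.LogForkAndCrash(log), resultSeals, db, log)
	if err != nil {
		return nil, fmt.Errorf("failed to wrap seals mempool into ExecStateForkSuppressor: %w", err)
	}
	return seals, nil
}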