diff --git a/Makefile b/Makefile index 25622af5f05..6f7f50f55af 100644 --- a/Makefile +++ b/Makefile @@ -121,6 +121,7 @@ generate-mocks: GO111MODULE=on mockery -name 'ConnectionFactory' -dir="./engine/access/rpc/backend" -case=underscore -output="./engine/access/rpc/backend/mock" -outpkg="mock" GO111MODULE=on mockery -name 'IngestRPC' -dir="./engine/execution/ingestion" -case=underscore -tags relic -output="./engine/execution/ingestion/mock" -outpkg="mock" GO111MODULE=on mockery -name '.*' -dir=model/fingerprint -case=underscore -output="./model/fingerprint/mock" -outpkg="mock" + GO111MODULE=on mockery -name 'ExecForkActor' --structname 'ExecForkActorMock' -dir=module/mempool/consensus/mock/ -case=underscore -output="./module/mempool/consensus/mock/" -outpkg="mock" # this ensures there is no unused dependency being added by accident .PHONY: tidy diff --git a/cmd/consensus/main.go b/cmd/consensus/main.go index 9e301618b8a..1c958e9d657 100644 --- a/cmd/consensus/main.go +++ b/cmd/consensus/main.go @@ -36,6 +36,7 @@ import ( chmodule "github.com/onflow/flow-go/module/chunks" finalizer "github.com/onflow/flow-go/module/finalizer/consensus" "github.com/onflow/flow-go/module/mempool" + consensusMempools "github.com/onflow/flow-go/module/mempool/consensus" "github.com/onflow/flow-go/module/mempool/ejectors" "github.com/onflow/flow-go/module/mempool/stdmap" "github.com/onflow/flow-go/module/metrics" @@ -135,7 +136,11 @@ func main() { // use a custom ejector so we don't eject seals that would break // the chain of seals ejector := ejectors.NewLatestIncorporatedResultSeal(node.Storage.Headers) - seals = stdmap.NewIncorporatedResultSeals(sealLimit, stdmap.WithEject(ejector.Eject)) + resultSeals := stdmap.NewIncorporatedResultSeals(stdmap.WithLimit(sealLimit), stdmap.WithEject(ejector.Eject)) + seals, err = consensusMempools.NewExecStateForkSuppressor(consensusMempools.LogForkAndCrash(node.Logger), resultSeals, node.DB, node.Logger) + if err != nil { + return fmt.Errorf("failed to wrap seals mempool into ExecStateForkSuppressor: %w", err) + } return nil }). 
Module("consensus node metrics", func(node *cmd.FlowNodeBuilder) error { diff --git a/cmd/util/cmd/block_hash_by_height_test.go b/cmd/util/cmd/block_hash_by_height_test.go index af43e2388ea..b1827198f9d 100644 --- a/cmd/util/cmd/block_hash_by_height_test.go +++ b/cmd/util/cmd/block_hash_by_height_test.go @@ -115,7 +115,7 @@ func storeAndIndexHeader(t *testing.T, db *badger.DB, headers *bstorage.Headers, } func storeAndIndexSealFor(t *testing.T, db *badger.DB, seals *bstorage.Seals, h *flow.Header) { - seal := unittest.SealFixture() + seal := unittest.Seal.Fixture() seal.BlockID = h.ID() err := seals.Store(seal) diff --git a/consensus/integration/nodes_test.go b/consensus/integration/nodes_test.go index 20e66f706ab..4346b8df2a5 100644 --- a/consensus/integration/nodes_test.go +++ b/consensus/integration/nodes_test.go @@ -187,7 +187,7 @@ func createNode( guarantees, err := stdmap.NewGuarantees(guaranteeLimit) require.NoError(t, err) - seals := stdmap.NewIncorporatedResultSeals(sealLimit) + seals := stdmap.NewIncorporatedResultSeals(stdmap.WithLimit(sealLimit)) // initialize the block builder build := builder.NewBuilder(metrics, db, state, headersDB, sealsDB, indexDB, guarantees, seals, tracer) diff --git a/engine/access/rpc/backend/backend_test.go b/engine/access/rpc/backend/backend_test.go index d686bbd6fb2..3be9e12735e 100644 --- a/engine/access/rpc/backend/backend_test.go +++ b/engine/access/rpc/backend/backend_test.go @@ -661,7 +661,7 @@ func (suite *Suite) TestGetAccount() { // setup the latest sealed block header := unittest.BlockHeaderFixture() // create a mock header - seal := unittest.SealFixture() // create a mock seal + seal := unittest.Seal.Fixture() // create a mock seal seal.BlockID = header.ID() // make the seal point to the header suite.snapshot. diff --git a/engine/consensus/matching/engine.go b/engine/consensus/matching/engine.go index d0afee6c8ad..3e04ef9c329 100644 --- a/engine/consensus/matching/engine.go +++ b/engine/consensus/matching/engine.go @@ -210,12 +210,19 @@ func (e *Engine) onReceipt(originID flow.Identifier, receipt *flow.ExecutionRece Logger() resultFinalState, ok := receipt.ExecutionResult.FinalStateCommitment() - if !ok { // return + if !ok || len(resultFinalState) < 1 { // discard receipt log.Error().Msg("execution receipt without FinalStateCommit received") return engine.NewInvalidInputErrorf("execution receipt without FinalStateCommit: %x", receipt.ID()) } log = log.With().Hex("final_state", resultFinalState).Logger() + resultInitialState, ok := receipt.ExecutionResult.InitialStateCommit() + if !ok { // discard receipt + log.Error().Msg("execution receipt without InitialStateCommit received") + return engine.NewInvalidInputErrorf("execution receipt without InitialStateCommit: %x", receipt.ID()) + } + log = log.With().Hex("initial_state", resultInitialState).Logger() + // CAUTION INCOMPLETE // For many other messages, we check that the message's origin (as established by the // networking layer) is equal to the message's creator as reported by the message itself. 
@@ -281,6 +288,7 @@ func (e *Engine) onReceipt(originID flow.Identifier, receipt *flow.ExecutionRece added, err := e.incorporatedResults.Add(flow.NewIncorporatedResult(result.BlockID, result)) if err != nil { log.Err(err).Msg("error inserting incorporated result in mempool") + return fmt.Errorf("error inserting incorporated result in mempool: %w", err) } if !added { log.Debug().Msg("skipping result already in mempool") @@ -722,10 +730,13 @@ func (e *Engine) sealResult(incorporatedResult *flow.IncorporatedResult) error { } // we don't care if the seal is already in the mempool - _ = e.seals.Add(&flow.IncorporatedResultSeal{ + _, err = e.seals.Add(&flow.IncorporatedResultSeal{ IncorporatedResult: incorporatedResult, Seal: seal, }) + if err != nil { + return fmt.Errorf("failed to store IncorporatedResultSeal in mempool: %w", err) + } return nil } diff --git a/engine/testutil/nodes.go b/engine/testutil/nodes.go index a3be01ee8c4..3ccbef516f0 100644 --- a/engine/testutil/nodes.go +++ b/engine/testutil/nodes.go @@ -205,7 +205,7 @@ func ConsensusNode(t *testing.T, hub *stub.Hub, identity *flow.Identity, identit approvals, err := stdmap.NewApprovals(1000) require.NoError(t, err) - seals := stdmap.NewIncorporatedResultSeals(1000) + seals := stdmap.NewIncorporatedResultSeals(stdmap.WithLimit(1000)) // receive collections ingestionEngine, err := consensusingest.New(node.Log, node.Tracer, node.Metrics, node.Metrics, node.Metrics, node.Net, node.State, node.Headers, node.Me, guarantees) diff --git a/engine/verification/match/engine_test.go b/engine/verification/match/engine_test.go index 1d6b7612ce3..beaf318ac81 100644 --- a/engine/verification/match/engine_test.go +++ b/engine/verification/match/engine_test.go @@ -837,6 +837,7 @@ func ChunkWithIndex(blockID flow.Identifier, index int) *flow.Chunk { EventCollection: blockID, // ensure chunks from different blocks with the same index will have different chunk ID BlockID: blockID, }, + EndState: unittest.StateCommitmentFixture(), } return chunk } diff --git a/model/flow/block_test.go b/model/flow/block_test.go index 8ff6ba3120c..396728b56e3 100644 --- a/model/flow/block_test.go +++ b/model/flow/block_test.go @@ -81,7 +81,7 @@ func TestNilProducesSameHashAsEmptySlice(t *testing.T) { func TestOrderingChangesHash(t *testing.T) { - seals := unittest.BlockSealsFixture(5) + seals := unittest.Seal.Fixtures(5) payload1 := flow.Payload{ Seals: seals, diff --git a/model/flow/execution_result.go b/model/flow/execution_result.go index 3f18c995c39..e697961647b 100644 --- a/model/flow/execution_result.go +++ b/model/flow/execution_result.go @@ -39,9 +39,7 @@ func (er ExecutionResult) FinalStateCommitment() (StateCommitment, bool) { return nil, false } s := er.Chunks[er.Chunks.Len()-1].EndState - // TODO: empty state commitment should not be considered valid - // return s, len(s) > 0 - return s, true + return s, len(s) > 0 } // InitialStateCommit returns a commitment to the execution state used as input @@ -56,7 +54,5 @@ func (er ExecutionResult) InitialStateCommit() (StateCommitment, bool) { return nil, false } s := er.Chunks[0].StartState - // TODO: empty state commitment should not be considered valid - // return s, len(s) > 0 - return s, true + return s, len(s) > 0 } diff --git a/module/builder/consensus/builder.go b/module/builder/consensus/builder.go index cf8a64b2075..503345ab4a3 100644 --- a/module/builder/consensus/builder.go +++ b/module/builder/consensus/builder.go @@ -4,7 +4,6 @@ package consensus import ( "bytes" - "encoding/json" "errors" "fmt" "time" @@ 
-241,43 +240,9 @@ func (b *Builder) BuildOn(parentID flow.Identifier, setter func(*flow.Header) er // roadmap (https://github.com/dapperlabs/flow-go/issues/4872) // create a mapping of block to seal for all seals in our pool - // We consider two seals as inconsistent, if they have different start or end states - encounteredInconsistentSealsForSameBlock := false byBlock := make(map[flow.Identifier]*flow.IncorporatedResultSeal) for _, irSeal := range b.sealPool.All() { - if len(irSeal.IncorporatedResult.Result.Chunks) < 1 { - return nil, fmt.Errorf("ExecutionResult without chunks: %v", irSeal.IncorporatedResult.Result.ID()) - } - if len(irSeal.Seal.FinalState) < 1 { - // respective Execution Result should have been rejected by matching engine - return nil, fmt.Errorf("seal with empty state commitment: %v", irSeal.ID()) - } - if irSeal2, found := byBlock[irSeal.Seal.BlockID]; found { - sc1json, err := json.Marshal(irSeal) - if err != nil { - return nil, err - } - sc2json, err := json.Marshal(irSeal2) - if err != nil { - return nil, err - } - - // check whether seals are inconsistent: - if !bytes.Equal(irSeal.Seal.FinalState, irSeal2.Seal.FinalState) || - !bytes.Equal(irSeal.IncorporatedResult.Result.Chunks[0].StartState, irSeal2.IncorporatedResult.Result.Chunks[0].StartState) { - fmt.Printf("ERROR: inconsistent seals for the same block %v: %s and %s", irSeal.Seal.BlockID, string(sc1json), string(sc2json)) - encounteredInconsistentSealsForSameBlock = true - } else { - fmt.Printf("WARNING: multiple seals with different IDs for the same block %v: %s and %s", irSeal.Seal.BlockID, string(sc1json), string(sc2json)) - } - - } else { - byBlock[irSeal.Seal.BlockID] = irSeal - } - } - if encounteredInconsistentSealsForSameBlock { - // in case we find inconsistent seals, do not seal anything - byBlock = make(map[flow.Identifier]*flow.IncorporatedResultSeal) + byBlock[irSeal.Seal.BlockID] = irSeal } // get the parent's block seal, which constitutes the beginning of the diff --git a/module/builder/consensus/builder_test.go b/module/builder/consensus/builder_test.go index d009f525671..e0581246e0c 100644 --- a/module/builder/consensus/builder_test.go +++ b/module/builder/consensus/builder_test.go @@ -112,7 +112,11 @@ func (bs *BuilderSuite) createAndRecordBlock(parentBlock *flow.Block) *flow.Bloc incorporatedResultForPrevBlock = unittest.IncorporatedResult.Fixture( unittest.IncorporatedResult.WithResult(previousResult), - unittest.IncorporatedResult.WithIncorporatedBlockID(block.ID()), + unittest.IncorporatedResult.WithIncorporatedBlockID(parentBlock.ID()), + // For sealing phase 2, the value for IncorporatedBlockID is the block the result pertains to (here parentBlock). + // In later development phases, we will change the logic such that IncorporatedBlockID references the + // block which actually incorporates the result: + //unittest.IncorporatedResult.WithIncorporatedBlockID(block.ID()), ) result := unittest.ExecutionResultFixture( unittest.WithBlock(&block), @@ -140,21 +144,11 @@ func (bs *BuilderSuite) createAndRecordBlock(parentBlock *flow.Block) *flow.Bloc // IncorporatedResultSeal, which ties the seal to the incorporated result it // seals, is also recorded for future access. 
func (bs *BuilderSuite) chainSeal(incorporatedResult *flow.IncorporatedResult) { - - finalState, _ := incorporatedResult.Result.FinalStateCommitment() - - seal := &flow.Seal{ - BlockID: incorporatedResult.Result.BlockID, - ResultID: incorporatedResult.Result.ID(), - FinalState: finalState, - } - bs.chain = append(bs.chain, seal) - - incorporatedResultSeal := &flow.IncorporatedResultSeal{ - IncorporatedResult: incorporatedResult, - Seal: seal, - } - + incorporatedResultSeal := unittest.IncorporatedResultSeal.Fixture( + unittest.IncorporatedResultSeal.WithResult(incorporatedResult.Result), + unittest.IncorporatedResultSeal.WithIncorporatedBlockID(incorporatedResult.IncorporatedBlockID), + ) + bs.chain = append(bs.chain, incorporatedResultSeal.Seal) bs.irsMap[incorporatedResultSeal.ID()] = incorporatedResultSeal bs.irsList = append(bs.irsList, incorporatedResultSeal) } @@ -524,23 +518,34 @@ func (bs *BuilderSuite) TestPayloadSealAllValid() { bs.Assert().ElementsMatch(bs.chain, bs.assembled.Seals, "should have included valid chain of seals") } -func (bs *BuilderSuite) TestPayloadSealSomeValid() { - - bs.pendingSeals = bs.irsMap - - // add some invalid seals to the mempool +// TestPayloadSealOnlyFork checks that the builder only includes seals corresponding +// to blocks on the current fork (and _not_ seals for sealable blocks on other forks) +func (bs *BuilderSuite) TestPayloadSealOnlyFork() { + // in the test setup, we already created a single fork + // [first] <- [F0] <- [F1] <- [F2] <- [F3] <- [A0] <- [A1] <- [A2] <- [A3] + // Where block + // * [first] is sealed and finalized + // * [F0] ... [F3] are finalized but _not_ sealed + // * [A0] ... [A3] are _not_ finalized and _not_ sealed + // We now create an additional fork: [F3] <- [B0] <- [B1] <- ... <- [B7] + var forkHead *flow.Block + forkHead = bs.blocks[bs.finalID] for i := 0; i < 8; i++ { - invalid := &flow.IncorporatedResultSeal{ - IncorporatedResult: unittest.IncorporatedResultFixture(), - Seal: unittest.SealFixture(), - } - bs.pendingSeals[invalid.ID()] = invalid + forkHead = bs.createAndRecordBlock(forkHead) + // Method createAndRecordBlock adds a seal for every block into the mempool. } - _, err := bs.build.BuildOn(bs.parentID, bs.setter) + bs.pendingSeals = bs.irsMap + _, err := bs.build.BuildOn(forkHead.ID(), bs.setter) bs.Require().NoError(err) + + // expected seals: [F0] <- ... <- [F3] <- [B0] <- ... <- [B7] + // Note: bs.chain contains seals for blocks F0 ... F3 then A0 ... A3 and then B0 ... B7 + bs.Assert().Equal(12, len(bs.assembled.Seals), "unexpected number of seals") + bs.Assert().ElementsMatch(bs.chain[:4], bs.assembled.Seals[:4], "should have included only valid chain of seals") + bs.Assert().ElementsMatch(bs.chain[len(bs.chain)-8:], bs.assembled.Seals[4:], "should have included only valid chain of seals") + bs.Assert().Empty(bs.assembled.Guarantees, "should have no guarantees in payload with empty mempool") - bs.Assert().ElementsMatch(bs.chain, bs.assembled.Seals, "should have included only valid chain of seals") } func (bs *BuilderSuite) TestPayloadSealCutoffChain() { diff --git a/module/mempool/common.go b/module/mempool/common.go new file mode 100644 index 00000000000..7b7bed05f18 --- /dev/null +++ b/module/mempool/common.go @@ -0,0 +1,8 @@ +package mempool + +import "github.com/onflow/flow-go/model/flow" + +// OnEjection is a callback which a mempool executes on ejecting +// one of its elements. The callbacks are executed from within the thread +// that serves the mempool. 
Implementations should be non-blocking. +type OnEjection func(flow.Entity) diff --git a/module/mempool/consensus/exec_fork_actor.go b/module/mempool/consensus/exec_fork_actor.go new file mode 100644 index 00000000000..a2abffb0327 --- /dev/null +++ b/module/mempool/consensus/exec_fork_actor.go @@ -0,0 +1,28 @@ +package consensus + +import ( + "encoding/json" + "fmt" + + "github.com/rs/zerolog" + + "github.com/onflow/flow-go/model/flow" +) + +type ExecForkActor func([]*flow.IncorporatedResultSeal) + +func LogForkAndCrash(log zerolog.Logger) ExecForkActor { + return func(conflictingSeals []*flow.IncorporatedResultSeal) { + l := log.Fatal().Int("number conflicting seals", len(conflictingSeals)) + for i, s := range conflictingSeals { + sealAsJson, err := json.Marshal(s) + if err != nil { + err = fmt.Errorf("failed to marshal candidate seal to json: %w", err) + l.Str(fmt.Sprintf("seal_%d", i), err.Error()) + continue + } + l = l.Str(fmt.Sprintf("seal_%d", i), string(sealAsJson)) + } + l.Msg("inconsistent seals for the same block") + } +} diff --git a/module/mempool/consensus/exec_fork_suppressor.go b/module/mempool/consensus/exec_fork_suppressor.go new file mode 100644 index 00000000000..9f7720367b4 --- /dev/null +++ b/module/mempool/consensus/exec_fork_suppressor.go @@ -0,0 +1,351 @@ +package consensus + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "sync" + + "github.com/dgraph-io/badger/v2" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" + + "github.com/onflow/flow-go/engine" + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module/mempool" + "github.com/onflow/flow-go/storage" + "github.com/onflow/flow-go/storage/badger/operation" +) + +var executionForkErr = fmt.Errorf("forked execution state detected") // sentinel error + +// ExecForkSuppressor is a wrapper around a conventional mempool.IncorporatedResultSeals +// mempool. It implements the following mitigation strategy for execution forks: +// * In case two conflicting results are considered sealable for the same block, +// sealing should halt. Specifically, two results are considered conflicting, +// if they differ in their start or end state. +// * Even after a restart, the sealing should not resume. +// * We rely on human intervention to resolve the conflict. +// The ExecForkSuppressor implements this mitigation strategy as follows: +// * For each candidate seal inserted into the mempool, inspect the state +// transition for the respective block. +// * If this is the first seal for a block, store the seal as an archetype +// for the state transition into the internal map `sealsForBlock`. +// * If the mempool already knows about a state transition for a block, +// and a second seal for the same block is inserted, check whether +// the seal has the same state transition. +// * If conflicting state transitions for the same block are detected, +// ExecForkSuppressor sets an internal flag and thereafter +// reports the mempool as empty, which will lead to the respective +// consensus node not including any more seals. +// * Evidence for an execution fork is stored in a database (persisted across restarts). +// Implementation is concurrency safe.
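For orientation, here is a minimal sketch of how the wrapper is wired up at a call site, mirroring the cmd/consensus/main.go change earlier in this diff. The helper name newSealsMempool and its signature are hypothetical; the constructors and options are the ones introduced by this PR:

package example // hypothetical package, for illustration only

import (
	"fmt"

	"github.com/dgraph-io/badger/v2"
	"github.com/rs/zerolog"

	"github.com/onflow/flow-go/module/mempool"
	consensusMempools "github.com/onflow/flow-go/module/mempool/consensus"
	"github.com/onflow/flow-go/module/mempool/stdmap"
)

func newSealsMempool(sealLimit uint, db *badger.DB, log zerolog.Logger) (mempool.IncorporatedResultSeals, error) {
	// conventional mempool for incorporated result seals, with a capacity limit
	resultSeals := stdmap.NewIncorporatedResultSeals(stdmap.WithLimit(sealLimit))
	// wrap it: on detecting conflicting seals, LogForkAndCrash logs the evidence and crashes the node
	seals, err := consensusMempools.NewExecStateForkSuppressor(consensusMempools.LogForkAndCrash(log), resultSeals, db, log)
	if err != nil {
		return nil, fmt.Errorf("failed to wrap seals mempool into ExecStateForkSuppressor: %w", err)
	}
	return seals, nil
}

The conventional mempool is constructed first and then wrapped, so every insertion passes through the fork check before reaching the underlying pool.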
+type ExecForkSuppressor struct { + mutex sync.RWMutex + seals mempool.IncorporatedResultSeals + sealsForBlock map[flow.Identifier]sealSet // map BlockID -> set of IncorporatedResultSeal + execForkDetected bool + onExecFork ExecForkActor + db *badger.DB + log zerolog.Logger +} + +// sealSet is a set of seals; internally represented as a mapping sealID -> seal +type sealSet map[flow.Identifier]*flow.IncorporatedResultSeal + +func NewExecStateForkSuppressor(onExecFork ExecForkActor, seals mempool.IncorporatedResultSeals, db *badger.DB, log zerolog.Logger) (*ExecForkSuppressor, error) { + conflictingSeals, err := checkExecutionForkEvidence(db) + if err != nil { + return nil, fmt.Errorf("failed to interface with storage: %w", err) + } + execForkDetectedFlag := len(conflictingSeals) != 0 + if execForkDetectedFlag { + onExecFork(conflictingSeals) + } + + wrapper := ExecForkSuppressor{ + mutex: sync.RWMutex{}, + seals: seals, + sealsForBlock: make(map[flow.Identifier]sealSet), + execForkDetected: execForkDetectedFlag, + onExecFork: onExecFork, + db: db, + log: log.With().Str("mempool", "ExecForkSuppressor").Logger(), + } + seals.RegisterEjectionCallbacks(wrapper.onEject) + + return &wrapper, nil +} + +// onEject is the callback which the wrapped mempool calls whenever it ejects an element +func (s *ExecForkSuppressor) onEject(entity flow.Entity) { + // uncaught type assertion; should never panic as mempool.IncorporatedResultSeals only stores IncorporatedResultSeal + irSeal := entity.(*flow.IncorporatedResultSeal) + sealID := irSeal.ID() + blockID := irSeal.Seal.BlockID + log := s.log.With(). + Hex("seal_id", sealID[:]). + Hex("block_id", blockID[:]). + Logger() + + // CAUTION: potential edge case: + // Upon adding a new seal, the ejector of the wrapped mempool might decide to eject the element which was just added. + // In this case, the ejected seal is _not_ yet in the secondary index: either + // (a) we don't have any seals for the respective block stored in the secondary index (yet), or + // (b) the secondary index contains only one seal for the block, which + // is different from the seal just ejected + set, found := s.sealsForBlock[irSeal.Seal.BlockID] + if !found { // case (a) + return + } + delete(set, irSeal.ID()) + if len(set) == 0 { + delete(s.sealsForBlock, irSeal.Seal.BlockID) + } + log.Debug().Msg("ejected seal") +} + +// Add adds the given seal to the mempool. The return value indicates whether or not the seal was added to the mempool.
+// Error returns: +// * engine.InvalidInputError (sentinel error) +// In case a seal fails one of the required consistency checks. +func (s *ExecForkSuppressor) Add(newSeal *flow.IncorporatedResultSeal) (bool, error) { + s.mutex.Lock() + defer s.mutex.Unlock() + + if s.execForkDetected { + return false, nil + } + + // STEP 1: ensure locally that newSeal's start and end state are non-empty values + // This wrapper is a temporary safety layer; we check all conditions that are + // required for its correct functioning locally, to not delegate safety-critical + // implementation aspects to external components + err := s.enforceValidStates(newSeal) + if err != nil { + return false, fmt.Errorf("invalid candidate seal: %w", err) + } + blockID := newSeal.Seal.BlockID + + // STEP 2: enforce that newSeal's state transition does not conflict with other stored seals for the same block + otherSeals, found := s.sealsForBlock[blockID] + if found { + // already other seal for this block in mempool => compare consistency of results' state transitions + otherSeal := getArbitraryElement(otherSeals) // cannot be nil, as otherSeals is guaranteed to always contain at least one element + err := s.enforceConsistentStateTransitions(newSeal, otherSeal) + if errors.Is(err, executionForkErr) { + s.onExecFork([]*flow.IncorporatedResultSeal{newSeal, otherSeal}) + return false, nil + } + if err != nil { + return false, fmt.Errorf("state consistency check failed: %w", err) + } + } // no conflicting state transition for this block known + + // STEP 3: add newSeal to the wrapped mempool + added, err := s.seals.Add(newSeal) // internally de-duplicates + if err != nil { + return added, fmt.Errorf("failed to add seal to wrapped mempool: %w", err) + } + if !added { // if underlying mempool did not accept the seal => nothing to do anymore + return false, nil + } + + // STEP 4: check whether wrapped mempool ejected the newSeal right away; + // important to prevent a memory leak + newSealID := newSeal.ID() + if _, exists := s.seals.ByID(newSealID); !exists { + return added, nil + } + + // STEP 5: add newSeal to the secondary index of this wrapper + // CAUTION: the following edge case needs to be considered: + // * the mempool only holds a single other seal (denoted as `otherSeal`) for this block + // * upon adding the new seal, the mempool might decide to eject otherSeal + // * during the ejection, we will delete the entire set from the `sealsForBlock` + // because at this time, it only held otherSeal, which was ejected + // Therefore, the value for `found` in the line below might + // be different from the value in the earlier call above.
+ blockSeals, found := s.sealsForBlock[blockID] + if !found { + // no other seal for this block was in mempool before => create a set for the seals for this block + blockSeals = make(sealSet) + s.sealsForBlock[blockID] = blockSeals + } + blockSeals[newSealID] = newSeal + return true, nil +} + +// All returns all the IncorporatedResultSeals in the mempool +func (s *ExecForkSuppressor) All() []*flow.IncorporatedResultSeal { + s.mutex.RLock() + defer s.mutex.RUnlock() + return s.seals.All() +} + +// ByID returns an IncorporatedResultSeal by its ID +func (s *ExecForkSuppressor) ByID(identifier flow.Identifier) (*flow.IncorporatedResultSeal, bool) { + s.mutex.RLock() + defer s.mutex.RUnlock() + return s.seals.ByID(identifier) +} + +// Rem removes the IncorporatedResultSeal with id from the mempool +func (s *ExecForkSuppressor) Rem(id flow.Identifier) bool { + s.mutex.Lock() + defer s.mutex.Unlock() + + seal, found := s.seals.ByID(id) + if found { + s.seals.Rem(id) + set, found := s.sealsForBlock[seal.Seal.BlockID] + if !found { + // In the current implementation, this cannot happen, as every entity in the mempool is also contained in sealsForBlock. + // we nevertheless perform this sanity check here, to catch future inconsistent code modifications + s.log.Fatal().Msg("inconsistent state detected: seal not in secondary index") + } + if len(set) > 1 { + delete(set, id) + } else { + delete(s.sealsForBlock, seal.Seal.BlockID) + } + } + return found +} + +// Size returns the number of items in the mempool +func (s *ExecForkSuppressor) Size() uint { + s.mutex.RLock() + defer s.mutex.RUnlock() + return s.seals.Size() +} + +// Limit returns the size limit of the mempool +func (s *ExecForkSuppressor) Limit() uint { + s.mutex.RLock() + defer s.mutex.RUnlock() + return s.seals.Limit() +} + +// Clear removes all entities from the pool. +// The wrapper clears the wrapped mempool as well as its own local (additional) state. +func (s *ExecForkSuppressor) Clear() { + s.mutex.Lock() + defer s.mutex.Unlock() + s.sealsForBlock = make(map[flow.Identifier]sealSet) + s.seals.Clear() +} + +// RegisterEjectionCallbacks adds the provided OnEjection callbacks +func (s *ExecForkSuppressor) RegisterEjectionCallbacks(callbacks ...mempool.OnEjection) { + s.seals.RegisterEjectionCallbacks(callbacks...) } + +// enforceValidStates checks that the seal has valid, non-empty initial and final states. +// In case a seal fails the check, a detailed error message is logged and an +// engine.InvalidInputError (sentinel error) is returned. +func (s *ExecForkSuppressor) enforceValidStates(irSeal *flow.IncorporatedResultSeal) error { + result := irSeal.IncorporatedResult.Result + + if _, ok := result.InitialStateCommit(); !ok { + scjson, err := json.Marshal(irSeal) + if err != nil { + return err + } + s.log.Error(). + Str("seal", string(scjson)). + Msg("seal's execution result has no InitialStateCommit") + return engine.NewInvalidInputErrorf("seal's execution result has no InitialStateCommit: %x", result.ID()) + } + + if _, ok := result.FinalStateCommitment(); !ok { + scjson, err := json.Marshal(irSeal) + if err != nil { + return err + } + s.log.Error(). + Str("seal", string(scjson)). + Msg("seal's execution result has no FinalStateCommit") + return engine.NewInvalidInputErrorf("seal's execution result has no FinalStateCommit: %x", result.ID()) + } + + return nil +} + +// getArbitraryElement picks and returns an arbitrary element from the set. Returns nil if the set is empty.
+func getArbitraryElement(set sealSet) *flow.IncorporatedResultSeal { + for _, seal := range set { + return seal + } + return nil +} + +// enforceConsistentStateTransitions checks whether the execution results in the seals +// have matching state transitions. If a fork in the execution state is detected: +// * the wrapped mempool is cleared +// * the internal execForkDetected flag is set to true +// * the new value of execForkDetected is persisted to the database +// and executionForkErr (sentinel error) is returned +func (s *ExecForkSuppressor) enforceConsistentStateTransitions(irSeal, irSeal2 *flow.IncorporatedResultSeal) error { + if irSeal.IncorporatedResult.Result.ID() == irSeal2.IncorporatedResult.Result.ID() { + // happy case: candidate seals are for the same result + return nil + } + // the results for the seals have different IDs (!) + // => check whether initial and final state match in both seals + + // unsafe: we assume validity of states has been checked before + irSeal1InitialState, _ := irSeal.IncorporatedResult.Result.InitialStateCommit() + irSeal1FinalState, _ := irSeal.IncorporatedResult.Result.FinalStateCommitment() + irSeal2InitialState, _ := irSeal2.IncorporatedResult.Result.InitialStateCommit() + irSeal2FinalState, _ := irSeal2.IncorporatedResult.Result.FinalStateCommitment() + + if !bytes.Equal(irSeal1InitialState, irSeal2InitialState) || !bytes.Equal(irSeal1FinalState, irSeal2FinalState) { + log.Error().Msg("inconsistent seals for the same block") + s.seals.Clear() + s.execForkDetected = true + err := storeExecutionForkEvidence([]*flow.IncorporatedResultSeal{irSeal, irSeal2}, s.db) + if err != nil { + return fmt.Errorf("failed to update execution-fork-detected flag: %w", err) + } + return executionForkErr + } + log.Warn().Msg("seals with different ID but consistent state transition") + return nil +} + +// checkExecutionForkEvidence checks whether evidence +// about an execution fork is stored in the database. Returns the stored evidence. +func checkExecutionForkEvidence(db *badger.DB) ([]*flow.IncorporatedResultSeal, error) { + var conflictingSeals []*flow.IncorporatedResultSeal + err := db.View(func(tx *badger.Txn) error { + err := operation.RetrieveExecutionForkEvidence(&conflictingSeals)(tx) + if errors.Is(err, storage.ErrNotFound) { + return nil // no evidence in database; conflictingSeals is still a nil slice + } + if err != nil { + return fmt.Errorf("failed to load evidence whether or not an execution fork occurred: %w", err) + } + return nil + }) + return conflictingSeals, err +} + +// storeExecutionForkEvidence stores the provided seals in the database +// as evidence for an execution fork.
+func storeExecutionForkEvidence(conflictingSeals []*flow.IncorporatedResultSeal, db *badger.DB) error { + err := operation.RetryOnConflict(db.Update, func(tx *badger.Txn) error { + err := operation.InsertExecutionForkEvidence(conflictingSeals)(tx) + if errors.Is(err, storage.ErrAlreadyExists) { + // some evidence about execution fork already stored; + // we only keep the first evidence => nothing more to do + return nil + } + if err != nil { + return fmt.Errorf("failed to store evidence about execution fork: %w", err) + } + return nil + }) + return err +} diff --git a/module/mempool/consensus/exec_fork_suppressor_test.go b/module/mempool/consensus/exec_fork_suppressor_test.go new file mode 100644 index 00000000000..c54af21dab4 --- /dev/null +++ b/module/mempool/consensus/exec_fork_suppressor_test.go @@ -0,0 +1,408 @@ +package consensus + +import ( + "os" + "testing" + + "github.com/dgraph-io/badger/v2" + + "github.com/rs/zerolog" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/onflow/flow-go/engine" + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module/mempool" + actormock "github.com/onflow/flow-go/module/mempool/consensus/mock" + poolmock "github.com/onflow/flow-go/module/mempool/mock" + "github.com/onflow/flow-go/module/mempool/stdmap" + "github.com/onflow/flow-go/utils/unittest" +) + +// Test_ImplementsInterfaces is a compile-time check: +// verifies that ExecForkSuppressor implements mempool.IncorporatedResultSeals interface +func Test_ImplementsInterfaces(t *testing.T) { + var _ mempool.IncorporatedResultSeals = &ExecForkSuppressor{} +} + +// Test_Construction verifies correctness of the initial size and limit values +func Test_Construction(t *testing.T) { + WithExecStateForkSuppressor(t, func(wrapper *ExecForkSuppressor, wrappedMempool *poolmock.IncorporatedResultSeals, execForkActor *actormock.ExecForkActorMock) { + wrappedMempool.On("Size").Return(uint(0)).Once() + require.Equal(t, uint(0), wrapper.Size()) + wrappedMempool.On("Limit").Return(uint(0)).Once() + require.Equal(t, uint(0), wrapper.Limit()) + wrappedMempool.AssertExpectations(t) + }) +} + +// Test_Size checks that ExecForkSuppressor is reporting the size of the wrapped mempool +func Test_Size(t *testing.T) { + WithExecStateForkSuppressor(t, func(wrapper *ExecForkSuppressor, wrappedMempool *poolmock.IncorporatedResultSeals, execForkActor *actormock.ExecForkActorMock) { + wrappedMempool.On("Size").Return(uint(139)).Once() + require.Equal(t, uint(139), wrapper.Size()) + wrappedMempool.AssertExpectations(t) + }) +} + +// Test_Limit checks that ExecForkSuppressor is reporting the capacity limit of the wrapped mempool +func Test_Limit(t *testing.T) { + WithExecStateForkSuppressor(t, func(wrapper *ExecForkSuppressor, wrappedMempool *poolmock.IncorporatedResultSeals, execForkActor *actormock.ExecForkActorMock) { + wrappedMempool.On("Limit").Return(uint(227)).Once() + require.Equal(t, uint(227), wrapper.Limit()) + wrappedMempool.AssertExpectations(t) + }) +} + +// Test_Clear checks that, when clearing the ExecForkSuppressor: +// * the wrapper also clears the wrapped mempool; +// * the reported mempool size, _after_ clearing should be zero +func Test_Clear(t *testing.T) { + WithExecStateForkSuppressor(t, func(wrapper *ExecForkSuppressor, wrappedMempool *poolmock.IncorporatedResultSeals, execForkActor *actormock.ExecForkActorMock)
{ + wrappedMempool.On("Clear").Return().Once() + + wrapper.Clear() + wrappedMempool.On("Size").Return(uint(0)) + require.Equal(t, uint(0), wrapper.Size()) + wrappedMempool.AssertExpectations(t) + }) +} + +// Test_All checks that ExecForkSuppressor.All() is returning the elements of the wrapped mempool +func Test_All(t *testing.T) { + WithExecStateForkSuppressor(t, func(wrapper *ExecForkSuppressor, wrappedMempool *poolmock.IncorporatedResultSeals, execForkActor *actormock.ExecForkActorMock) { + expectedSeals := unittest.IncorporatedResultSeal.Fixtures(7) + wrappedMempool.On("All").Return(expectedSeals) + retrievedSeals := wrapper.All() + require.Equal(t, len(expectedSeals), len(retrievedSeals)) + for i := 0; i < len(expectedSeals); i++ { + require.Equal(t, expectedSeals[i].ID(), retrievedSeals[i].ID()) + } + }) +} + +// Test_Add adds IncorporatedResultSeals for +// * 2 different blocks +// * for each block, we generate one specific result, +// for which we add 3 IncorporatedResultSeals +// o IncorporatedResultSeal (1): +// incorporated in block B1 +// o IncorporatedResultSeal (2): +// incorporated in block B2 +// o IncorporatedResultSeal (3): +// same result as (1) and incorporated in same block B1; +// should be automatically de-duplicated (irrespective of approvals on the seal). +func Test_Add(t *testing.T) { + WithExecStateForkSuppressor(t, func(wrapper *ExecForkSuppressor, wrappedMempool *poolmock.IncorporatedResultSeals, execForkActor *actormock.ExecForkActorMock) { + for _, block := range unittest.BlockFixtures(2) { + result := unittest.ExecutionResultFixture(unittest.WithBlock(block)) + + // IncorporatedResultSeal (1): + irSeal1 := unittest.IncorporatedResultSeal.Fixture(unittest.IncorporatedResultSeal.WithResult(result)) + wrappedMempool.On("Add", irSeal1).Return(true, nil).Once() + wrappedMempool.On("ByID", irSeal1.ID()).Return(irSeal1, true) + added, err := wrapper.Add(irSeal1) + assert.NoError(t, err) + assert.True(t, added) + wrappedMempool.AssertExpectations(t) + + // IncorporatedResultSeal (2): + // the value for IncorporatedResultSeal.IncorporatedResult.IncorporatedBlockID is randomly + // generated and therefore, will be different than for irSeal1 + irSeal2 := unittest.IncorporatedResultSeal.Fixture(unittest.IncorporatedResultSeal.WithResult(result)) + assert.False(t, irSeal1.ID() == irSeal2.ID()) // incorporated in different block => different seal ID expected + wrappedMempool.On("Add", irSeal2).Return(true, nil).Once() + wrappedMempool.On("ByID", irSeal2.ID()).Return(irSeal2, true) + added, err = wrapper.Add(irSeal2) + assert.NoError(t, err) + assert.True(t, added) + wrappedMempool.AssertExpectations(t) + + // IncorporatedResultSeal (3): + irSeal3 := unittest.IncorporatedResultSeal.Fixture( + unittest.IncorporatedResultSeal.WithResult(result), + unittest.IncorporatedResultSeal.WithIncorporatedBlockID(irSeal1.IncorporatedResult.IncorporatedBlockID), + ) + assert.True(t, irSeal1.ID() == irSeal3.ID()) // same result incorporated same block as (1) => identical ID expected + wrappedMempool.On("Add", irSeal3).Return(false, nil).Once() // deduplicate + wrappedMempool.On("ByID", irSeal3.ID()).Return(nil, false) + added, err = wrapper.Add(irSeal3) + assert.NoError(t, err) + assert.False(t, added) + wrappedMempool.AssertExpectations(t) + } + }) +} + +// Test_Rem checks that ExecForkSuppressor.Rem() +// * delegates the call to the underlying mempool +func Test_Rem(t *testing.T) { + WithExecStateForkSuppressor(t, func(wrapper *ExecForkSuppressor, wrappedMempool 
*poolmock.IncorporatedResultSeals, execForkActor *actormock.ExecForkActorMock) { + // element is in wrapped mempool: Rem should be called + seal := unittest.IncorporatedResultSeal.Fixture() + wrappedMempool.On("Add", seal).Return(true, nil).Once() + wrappedMempool.On("ByID", seal.ID()).Return(seal, true) + added, err := wrapper.Add(seal) + assert.NoError(t, err) + assert.True(t, added) + + wrappedMempool.On("ByID", seal.ID()).Return(seal, true) + wrappedMempool.On("Rem", seal.ID()).Return(true).Once() + removed := wrapper.Rem(seal.ID()) + require.Equal(t, true, removed) + wrappedMempool.AssertExpectations(t) + + // element _not_ in wrapped mempool: Rem might be called + seal = unittest.IncorporatedResultSeal.Fixture() + wrappedMempool.On("ByID", seal.ID()).Return(seal, false) + wrappedMempool.On("Rem", seal.ID()).Return(false).Maybe() + removed = wrapper.Rem(seal.ID()) + require.Equal(t, false, removed) + wrappedMempool.AssertExpectations(t) + }) +} + +// Test_RejectInvalidSeals verifies that ExecForkSuppressor rejects seals whose +// end state is invalid. Specifically, this can happen if: +// * the end state of the last chunk is empty +// * there are no chunks in the result (invalid result, as system chunk is missing) +func Test_RejectInvalidSeals(t *testing.T) { + t.Run("reject seal for result with missing end state", func(t *testing.T) { + WithExecStateForkSuppressor(t, func(wrapper *ExecForkSuppressor, wrappedMempool *poolmock.IncorporatedResultSeals, execForkActor *actormock.ExecForkActorMock) { + irSeal := unittest.IncorporatedResultSeal.Fixture() + chunks := irSeal.IncorporatedResult.Result.Chunks + emptyState := make([]byte, 0) + chunks[len(chunks)-1].EndState = emptyState + irSeal.Seal.FinalState = emptyState + + added, err := wrapper.Add(irSeal) + assert.Error(t, err) + assert.True(t, engine.IsInvalidInputError(err)) + assert.False(t, added) + }) + }) + + t.Run("reject seal without chunks", func(t *testing.T) { + WithExecStateForkSuppressor(t, func(wrapper *ExecForkSuppressor, wrappedMempool *poolmock.IncorporatedResultSeals, execForkActor *actormock.ExecForkActorMock) { + irSeal := unittest.IncorporatedResultSeal.Fixture() + irSeal.IncorporatedResult.Result.Chunks = make(flow.ChunkList, 0) + irSeal.Seal.FinalState = make([]byte, 0) + + added, err := wrapper.Add(irSeal) + assert.Error(t, err) + assert.True(t, engine.IsInvalidInputError(err)) + assert.False(t, added) + }) + }) +} + +// Test_ConflictingResults verifies that ExecForkSuppressor detects a fork in the execution chain. 
+// The expected behaviour is: +// * clear the wrapped mempool +// * reject addition of all further entities (even valid seals) +func Test_ConflictingResults(t *testing.T) { + WithExecStateForkSuppressor(t, func(wrapper *ExecForkSuppressor, wrappedMempool *poolmock.IncorporatedResultSeals, execForkActor *actormock.ExecForkActorMock) { + // add 3 random irSeals + irSeals := unittest.IncorporatedResultSeal.Fixtures(3) + for _, s := range irSeals { + wrappedMempool.On("Add", s).Return(true, nil).Once() + wrappedMempool.On("ByID", s.ID()).Return(s, true) + added, err := wrapper.Add(s) + assert.NoError(t, err) + assert.True(t, added) + } + + // add seal for result that is _conflicting_ with irSeals[1] + result := unittest.ExecutionResultFixture() + result.BlockID = irSeals[1].Seal.BlockID + for _, c := range result.Chunks { + c.BlockID = result.BlockID + } + conflictingSeal := unittest.IncorporatedResultSeal.Fixture(unittest.IncorporatedResultSeal.WithResult(result)) + + wrappedMempool.On("Clear").Return().Once() + execForkActor.On("OnExecFork", []*flow.IncorporatedResultSeal{conflictingSeal, irSeals[1]}).Return().Once() + added, err := wrapper.Add(conflictingSeal) + assert.NoError(t, err) + assert.False(t, added) + wrappedMempool.AssertExpectations(t) + + // mempool should be cleared + wrappedMempool.On("Size").Return(uint(0)) // we asserted that Clear was called on wrappedMempool + assert.Equal(t, uint(0), wrapper.Size()) + + // additional seals should not be accepted anymore + added, err = wrapper.Add(unittest.IncorporatedResultSeal.Fixture()) + assert.NoError(t, err) + assert.False(t, added) + assert.Equal(t, uint(0), wrapper.Size()) + wrappedMempool.AssertExpectations(t) + execForkActor.AssertExpectations(t) + }) +} + +// Test_ForkDetectionPersisted verifies that, when ExecForkSuppressor detects a fork, this information is +// persisted in the database +func Test_ForkDetectionPersisted(t *testing.T) { + unittest.RunWithTempDir(t, func(dir string) { + db := unittest.BadgerDB(t, dir) + defer db.Close() + + // initialize ExecForkSuppressor + wrappedMempool := &poolmock.IncorporatedResultSeals{} + wrappedMempool.On("RegisterEjectionCallbacks", mock.Anything).Return() + execForkActor := &actormock.ExecForkActorMock{} + wrapper, _ := NewExecStateForkSuppressor(execForkActor.OnExecFork, wrappedMempool, db, zerolog.New(os.Stderr)) + + // add seal + block := unittest.BlockFixture() + sealA := unittest.IncorporatedResultSeal.Fixture(unittest.IncorporatedResultSeal.WithResult(unittest.ExecutionResultFixture(unittest.WithBlock(&block)))) + wrappedMempool.On("Add", sealA).Return(true, nil).Once() + wrappedMempool.On("ByID", sealA.ID()).Return(sealA, true) + _, _ = wrapper.Add(sealA) + + // add conflicting seal + sealB := unittest.IncorporatedResultSeal.Fixture(unittest.IncorporatedResultSeal.WithResult(unittest.ExecutionResultFixture(unittest.WithBlock(&block)))) + execForkActor.On("OnExecFork", []*flow.IncorporatedResultSeal{sealB, sealA}).Return().Once() + wrappedMempool.On("Clear").Return().Once() + added, _ := wrapper.Add(sealB) // should be rejected because it is conflicting with sealA + assert.False(t, added) + wrappedMempool.AssertExpectations(t) + execForkActor.AssertExpectations(t) + + // crash => re-initialization + db.Close() + db2 := unittest.BadgerDB(t, dir) + wrappedMempool2 := &poolmock.IncorporatedResultSeals{} + wrappedMempool2.On("RegisterEjectionCallbacks", mock.Anything).Return() + execForkActor2 := &actormock.ExecForkActorMock{} + execForkActor2.On("OnExecFork", mock.Anything).
+ Run(func(args mock.Arguments) { + conflictingSeals := args[0].([]*flow.IncorporatedResultSeal) + assert.Equal(t, 2, len(conflictingSeals)) + assert.Equal(t, sealB.ID(), conflictingSeals[0].ID()) + assert.Equal(t, sealA.ID(), conflictingSeals[1].ID()) + }).Return().Once() + wrapper2, _ := NewExecStateForkSuppressor(execForkActor2.OnExecFork, wrappedMempool2, db2, zerolog.New(os.Stderr)) + + // add another (non-conflicting) seal to ExecForkSuppressor + // fail test if seal is added to wrapped mempool + wrappedMempool2.On("Add", mock.Anything). + Run(func(args mock.Arguments) { assert.Fail(t, "seal was added to wrapped mempool") }). + Return(true, nil).Maybe() + added, _ = wrapper2.Add(unittest.IncorporatedResultSeal.Fixture()) + assert.False(t, added) + wrappedMempool2.On("Size").Return(uint(0)) // we asserted that Clear was called on wrappedMempool + assert.Equal(t, uint(0), wrapper2.Size()) + + wrappedMempool2.AssertExpectations(t) + execForkActor2.AssertExpectations(t) + }) +} + +// Test_EjectorRemovesNewSeal covers the following edge case: +// * upon adding a seal, the ejector of the wrapped mempool decides to eject the element which was just added +// We verify this by inspecting the internal data structure of ExecForkSuppressor +func Test_EjectorRemovesNewSeal(t *testing.T) { + unittest.RunWithBadgerDB(t, func(db *badger.DB) { + wrappedMempool := &poolmock.IncorporatedResultSeals{} + var ejectionCallback mempool.OnEjection + wrappedMempool.On("RegisterEjectionCallbacks", mock.Anything). + Run(func(args mock.Arguments) { ejectionCallback = args[0].(mempool.OnEjection) }). + Return() + execForkActor := &actormock.ExecForkActorMock{} + wrapper, _ := NewExecStateForkSuppressor(execForkActor.OnExecFork, wrappedMempool, db, zerolog.New(os.Stderr)) + + // as soon as a seal is added, the underlying mempool ejects it right away again + seal := unittest.IncorporatedResultSeal.Fixture() + wrappedMempool.On("Add", seal). + Run(func(args mock.Arguments) { ejectionCallback(seal) }). + Return(true, nil) + wrappedMempool.On("ByID", seal.ID()).Return(nil, false) + + added, err := wrapper.Add(seal) + require.NoError(t, err) + assert.True(t, added) + assert.Equal(t, 0, len(wrapper.sealsForBlock)) + }) +} + +// Test_AddRem_SmokeTest tests a real system of stdmap.IncorporatedResultSeals mempool +// which is wrapped in an ExecForkSuppressor. +// We add and remove lots of different seals. 
+func Test_AddRem_SmokeTest(t *testing.T) { + onExecFork := func([]*flow.IncorporatedResultSeal) { + assert.Fail(t, "no call to onExecFork expected") + } + unittest.RunWithBadgerDB(t, func(db *badger.DB) { + wrappedMempool := stdmap.NewIncorporatedResultSeals(stdmap.WithLimit(3)) + wrapper, err := NewExecStateForkSuppressor(onExecFork, wrappedMempool, db, zerolog.New(os.Stderr)) + require.NoError(t, err) + require.NotNil(t, wrapper) + + // register an ejection callback on the wrapper to track ejected elements + var ejected map[flow.Identifier]struct{} + wrapper.RegisterEjectionCallbacks( + func(entity flow.Entity) { + ejected[entity.ID()] = struct{}{} + }) + + // Run 100 experiments of the following kind: + // * add 10 seals to mempool, which should eject 7 seals + // * test that ejected seals are not in mempool anymore + // * remove remaining seals + for i := 100; i > 0; i-- { + ejected = make(map[flow.Identifier]struct{}) + seals := unittest.IncorporatedResultSeal.Fixtures(10) + for _, s := range seals { + added, err := wrapper.Add(s) + require.NoError(t, err) + require.True(t, added) + } + + require.Equal(t, 7, len(ejected)) + require.Equal(t, uint(3), wrappedMempool.Size()) + require.Equal(t, uint(3), wrapper.Size()) + for _, s := range seals { + id := s.ID() + if _, wasEjected := ejected[id]; wasEjected { + _, found := wrapper.ByID(id) + require.False(t, found) + } else { + _, found := wrapper.ByID(id) + require.True(t, found) + wrapper.Rem(id) + + _, found = wrapper.ByID(id) + require.False(t, found) + _, found = wrappedMempool.ByID(id) + require.False(t, found) + } + } + + require.Equal(t, uint(0), wrappedMempool.Size()) + require.Equal(t, uint(0), wrapper.Size()) + require.Equal(t, 0, len(wrapper.sealsForBlock)) + } + + }) +} + +// WithExecStateForkSuppressor +// 1. constructs a mock (aka `wrappedMempool`) of an IncorporatedResultSeals mempool +// 2. wraps `wrappedMempool` in a ExecForkSuppressor +// 3. ensures that initializing the wrapper did not error +// 4. executes the `testLogic` +func WithExecStateForkSuppressor(t testing.TB, testLogic func(wrapper *ExecForkSuppressor, wrappedMempool *poolmock.IncorporatedResultSeals, execForkActor *actormock.ExecForkActorMock)) { + unittest.RunWithBadgerDB(t, func(db *badger.DB) { + wrappedMempool := &poolmock.IncorporatedResultSeals{} + wrappedMempool.On("RegisterEjectionCallbacks", mock.Anything).Return() + + execForkActor := &actormock.ExecForkActorMock{} + wrapper, err := NewExecStateForkSuppressor(execForkActor.OnExecFork, wrappedMempool, db, zerolog.New(os.Stderr)) + require.NoError(t, err) + require.NotNil(t, wrapper) + testLogic(wrapper, wrappedMempool, execForkActor) + }) +} diff --git a/module/mempool/consensus/mock/exec_fork_actor.go b/module/mempool/consensus/mock/exec_fork_actor.go new file mode 100644 index 00000000000..43a8665a126 --- /dev/null +++ b/module/mempool/consensus/mock/exec_fork_actor.go @@ -0,0 +1,18 @@ +// Code generated by mockery v1.0.0. DO NOT EDIT.
+ +package mock + +import ( + flow "github.com/onflow/flow-go/model/flow" + mock "github.com/stretchr/testify/mock" +) + +// ExecForkActorMock is an autogenerated mock type for the ExecForkActor type +type ExecForkActorMock struct { + mock.Mock +} + +// OnExecFork provides a mock function with given fields: _a0 +func (_m *ExecForkActorMock) OnExecFork(_a0 []*flow.IncorporatedResultSeal) { + _m.Called(_a0) +} diff --git a/module/mempool/consensus/mock/mock_actor.go b/module/mempool/consensus/mock/mock_actor.go new file mode 100644 index 00000000000..c20209499ae --- /dev/null +++ b/module/mempool/consensus/mock/mock_actor.go @@ -0,0 +1,8 @@ +package mock + +import "github.com/onflow/flow-go/model/flow" + +// ExecForkActor allows creating a mock for the ExecForkActor callback +type ExecForkActor interface { + OnExecFork([]*flow.IncorporatedResultSeal) +} diff --git a/module/mempool/ejectors/incorporated_result_seals_test.go b/module/mempool/ejectors/incorporated_result_seals_test.go index a48a7a2bb3f..a3a337d3f62 100644 --- a/module/mempool/ejectors/incorporated_result_seals_test.go +++ b/module/mempool/ejectors/incorporated_result_seals_test.go @@ -21,7 +21,7 @@ func TestLatestSealEjector(t *testing.T) { headers := storage.NewHeaders(metrics.NewNoopCollector(), db) ejector := NewLatestIncorporatedResultSeal(headers) - pool := stdmap.NewIncorporatedResultSeals(N, stdmap.WithEject(ejector.Eject)) + pool := stdmap.NewIncorporatedResultSeals(stdmap.WithLimit(N), stdmap.WithEject(ejector.Eject)) var ( maxHeader flow.Header @@ -34,7 +34,7 @@ func TestLatestSealEjector(t *testing.T) { err := headers.Store(&header) require.Nil(t, err) - seal := unittest.SealFixture() + seal := unittest.Seal.Fixture() seal.BlockID = header.ID() er := unittest.ExecutionResultFixture() @@ -46,7 +46,8 @@ func TestLatestSealEjector(t *testing.T) { }, Seal: seal, } - ok := pool.Add(ir) + ok, err := pool.Add(ir) + require.NoError(t, err) assert.True(t, ok) if header.Height >= maxHeader.Height { diff --git a/module/mempool/incorporated_result_seals.go b/module/mempool/incorporated_result_seals.go index 6f4a309bbce..4052779e01e 100644 --- a/module/mempool/incorporated_result_seals.go +++ b/module/mempool/incorporated_result_seals.go @@ -10,7 +10,7 @@ import ( // incorporated result seals type IncorporatedResultSeals interface { // Add adds an IncorporatedResultSeal to the mempool - Add(irSeal *flow.IncorporatedResultSeal) bool + Add(irSeal *flow.IncorporatedResultSeal) (bool, error) // All returns all the IncorporatedResultSeals in the mempool All() []*flow.IncorporatedResultSeal @@ -18,6 +18,9 @@ type IncorporatedResultSeals interface { // ByID returns an IncorporatedResultSeal by ID ByID(flow.Identifier) (*flow.IncorporatedResultSeal, bool) + // RegisterEjectionCallbacks adds the provided OnEjection callbacks + RegisterEjectionCallbacks(callbacks ...OnEjection) + // Limit returns the size limit of the mempool Limit() uint @@ -26,4 +29,7 @@ type IncorporatedResultSeals interface { // Size returns the number of items in the mempool Size() uint + + // Clear removes all entities from the pool.
+ Clear() } diff --git a/module/mempool/mock/incorporated_result_seals.go b/module/mempool/mock/incorporated_result_seals.go index 8828c8a3009..cf4db966e11 100644 --- a/module/mempool/mock/incorporated_result_seals.go +++ b/module/mempool/mock/incorporated_result_seals.go @@ -4,6 +4,7 @@ package mempool import ( flow "github.com/onflow/flow-go/model/flow" + mempool "github.com/onflow/flow-go/module/mempool" mock "github.com/stretchr/testify/mock" ) @@ -14,7 +15,7 @@ type IncorporatedResultSeals struct { } // Add provides a mock function with given fields: irSeal -func (_m *IncorporatedResultSeals) Add(irSeal *flow.IncorporatedResultSeal) bool { +func (_m *IncorporatedResultSeals) Add(irSeal *flow.IncorporatedResultSeal) (bool, error) { ret := _m.Called(irSeal) var r0 bool @@ -24,7 +25,14 @@ func (_m *IncorporatedResultSeals) Add(irSeal *flow.IncorporatedResultSeal) bool r0 = ret.Get(0).(bool) } - return r0 + var r1 error + if rf, ok := ret.Get(1).(func(*flow.IncorporatedResultSeal) error); ok { + r1 = rf(irSeal) + } else { + r1 = ret.Error(1) + } + + return r0, r1 } // All provides a mock function with given fields: @@ -66,6 +74,11 @@ func (_m *IncorporatedResultSeals) ByID(_a0 flow.Identifier) (*flow.Incorporated return r0, r1 } +// Clear provides a mock function with given fields: +func (_m *IncorporatedResultSeals) Clear() { + _m.Called() +} + // Limit provides a mock function with given fields: func (_m *IncorporatedResultSeals) Limit() uint { ret := _m.Called() @@ -80,6 +93,17 @@ func (_m *IncorporatedResultSeals) Limit() uint { return r0 } +// RegisterEjectionCallbacks provides a mock function with given fields: callbacks +func (_m *IncorporatedResultSeals) RegisterEjectionCallbacks(callbacks ...mempool.OnEjection) { + _va := make([]interface{}, len(callbacks)) + for _i := range callbacks { + _va[_i] = callbacks[_i] + } + var _ca []interface{} + _ca = append(_ca, _va...) + _m.Called(_ca...) +} + // Rem provides a mock function with given fields: incorporatedResultID func (_m *IncorporatedResultSeals) Rem(incorporatedResultID flow.Identifier) bool { ret := _m.Called(incorporatedResultID) diff --git a/module/mempool/stdmap/approvals.go b/module/mempool/stdmap/approvals.go index 5a44e3a5c9a..ccbd9d0983d 100644 --- a/module/mempool/stdmap/approvals.go +++ b/module/mempool/stdmap/approvals.go @@ -4,7 +4,6 @@ package stdmap import ( "encoding/binary" - "fmt" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/mempool/model" @@ -46,10 +45,7 @@ func NewApprovals(limit uint, opts ...OptionFunc) (*Approvals, error) { approvalMapEntity := entity.(*model.ApprovalMapEntity) mempool.size -= uint(len(approvalMapEntity.Approvals)) } - err := mempool.backend.RegisterEjectionCallback(adjustSizeOnEjection) - if err != nil { - return nil, fmt.Errorf("failed to register ejection callback with Approvals mempool's backend") - } + mempool.backend.RegisterEjectionCallbacks(adjustSizeOnEjection) return mempool, nil } diff --git a/module/mempool/stdmap/backend.go b/module/mempool/stdmap/backend.go index 5464bcfb54a..0155a0b854f 100644 --- a/module/mempool/stdmap/backend.go +++ b/module/mempool/stdmap/backend.go @@ -3,11 +3,11 @@ package stdmap import ( - "fmt" "math" "sync" "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module/mempool" ) // Backdata implements a generic memory pool backed by a Go map. 
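The Backend hunks that follow replace the previous single-callback registration (which errored on repeated registration) with a slice of mempool.OnEjection callbacks, each invoked on every ejection. A minimal sketch of the new registration pattern under those assumptions; the helper newCountingBackend and its counter are hypothetical:

package example // hypothetical package, for illustration only

import (
	"sync/atomic"

	"github.com/onflow/flow-go/model/flow"
	"github.com/onflow/flow-go/module/mempool/stdmap"
)

func newCountingBackend(limit uint) (*stdmap.Backend, *uint64) {
	pool := stdmap.NewBackend(stdmap.WithLimit(limit))
	var ejections uint64
	// OnEjection callbacks run on the thread serving the mempool and must be non-blocking;
	// an atomic counter satisfies that requirement
	countEjections := func(entity flow.Entity) {
		atomic.AddUint64(&ejections, 1)
	}
	// variadic registration; registering the same callback twice means two calls per ejection
	pool.RegisterEjectionCallbacks(countEjections, countEjections)
	return pool, &ejections
}

Registering the same callback multiple times multiplies the invocations per ejection, which is exactly what TestBackend_Multiple_OnEjectionCallbacks below exercises.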
@@ -101,18 +101,18 @@ type Backend struct { sync.RWMutex Backdata - limit uint - eject EjectFunc - ejectionCallback OnEjection + limit uint + eject EjectFunc + ejectionCallbacks []mempool.OnEjection } // NewBackend creates a new memory pool backend. func NewBackend(options ...OptionFunc) *Backend { b := Backend{ - Backdata: NewBackdata(), - limit: uint(math.MaxUint32), - eject: EjectTrueRandom, - ejectionCallback: nil, + Backdata: NewBackdata(), + limit: uint(math.MaxUint32), + eject: EjectTrueRandom, + ejectionCallbacks: nil, } for _, option := range options { option(&b) @@ -203,16 +203,11 @@ func (b *Backend) Hash() flow.Identifier { return b.Backdata.Hash() } -// RegisterEjectionCallback sets the provided OnEjection callback -// errors if another callback was already registered -func (b *Backend) RegisterEjectionCallback(callback OnEjection) error { +// RegisterEjectionCallbacks adds the provided OnEjection callbacks +func (b *Backend) RegisterEjectionCallbacks(callbacks ...mempool.OnEjection) { b.Lock() defer b.Unlock() - if b.ejectionCallback != nil { - return fmt.Errorf("OnEjection callback already set") - } - b.ejectionCallback = callback - return nil + b.ejectionCallbacks = append(b.ejectionCallbacks, callbacks...) } // reduce will reduce the size of the kept entities until we are within the @@ -235,8 +230,8 @@ func (b *Backend) reduce() { delete(b.entities, key) // notify callback - if b.ejectionCallback != nil { - b.ejectionCallback(entity) + for _, callback := range b.ejectionCallbacks { + callback(entity) } } } diff --git a/module/mempool/stdmap/backend_test.go b/module/mempool/stdmap/backend_test.go index 53324ed05e4..95be3a8ea8c 100644 --- a/module/mempool/stdmap/backend_test.go +++ b/module/mempool/stdmap/backend_test.go @@ -6,6 +6,7 @@ import ( "crypto/sha256" "fmt" "sync" + "sync/atomic" "testing" "time" @@ -101,6 +102,18 @@ func TestAdjust(t *testing.T) { }) } +// Test that the mempool deduplicates entities based on ID +func Test_DeduplicationByID(t *testing.T) { + item1 := fake("A") + item2 := fake("A") // identical ID, but different instance + assert.True(t, item1.ID() == item2.ID()) + + pool := NewBackend() + pool.Add(item1) + pool.Add(item2) + assert.Equal(t, uint(1), pool.Size()) +} + +// TestBackend_RunLimitChecking defines a backend with size limit of `limit`.
It then // starts adding `swarm`-many items concurrently to the backend each on a separate goroutine, // where `swarm` > `limit`, @@ -155,8 +168,7 @@ func TestBackend_RegisterEjectionCallback(t *testing.T) { require.False(t, pool.Has(id)) }() } - err := pool.RegisterEjectionCallback(ensureEntityNotInMempool) - require.NoError(t, err) + pool.RegisterEjectionCallbacks(ensureEntityNotInMempool) wg := sync.WaitGroup{} wg.Add(swarm) @@ -173,13 +185,53 @@ func TestBackend_RegisterEjectionCallback(t *testing.T) { require.Equal(t, uint(limit), pool.Size(), "expected mempool to be at max capacity limit") } -// TestBackend_ErrorOnRepeatedEjectionCallback verifies that the Backend errors is the -// ejection callback is set repeatedly -func TestBackend_ErrorOnRepeatedEjectionCallback(t *testing.T) { - pool := NewBackend() - err := pool.RegisterEjectionCallback(func(entity flow.Entity) {}) - require.NoError(t, err) +// TestBackend_Multiple_OnEjectionCallbacks verifies that the Backend +// handles multiple ejection callbacks correctly +func TestBackend_Multiple_OnEjectionCallbacks(t *testing.T) { + // ejection callback counts number of calls + calls := uint64(0) + callback := func(entity flow.Entity) { + atomic.AddUint64(&calls, 1) + } + + // construct backend + const ( + limit = 3 + ) + pool := NewBackend(WithLimit(limit)) + pool.RegisterEjectionCallbacks(callback, callback) + + t.Run("fill mempool up to limit", func(t *testing.T) { + addRandomEntities(t, pool, limit) + require.Equal(t, uint(limit), pool.Size(), "expected mempool to be at max capacity limit") + require.Equal(t, uint64(0), atomic.LoadUint64(&calls)) + }) + + t.Run("add elements beyond limit", func(t *testing.T) { + addRandomEntities(t, pool, 2) // as we registered callback _twice_, we should receive 2 calls per ejection + require.Equal(t, uint(limit), pool.Size(), "expected mempool to be at max capacity limit") + require.Equal(t, uint64(4), atomic.LoadUint64(&calls)) + }) + + t.Run("re-register callback and overfill mempool", func(t *testing.T) { + atomic.StoreUint64(&calls, uint64(0)) + pool.RegisterEjectionCallbacks(callback) // now we have registered the callback three times + addRandomEntities(t, pool, 7) // => we should receive 3 calls per ejection + require.Equal(t, uint(limit), pool.Size(), "expected mempool to be at max capacity limit") + require.Equal(t, uint64(21), atomic.LoadUint64(&calls)) + }) +} - err = pool.RegisterEjectionCallback(func(entity flow.Entity) {}) - require.Error(t, err) +func addRandomEntities(t *testing.T, backend *Backend, num int) { + // concurrently add num-many fake items to the backend + wg := sync.WaitGroup{} + wg.Add(num) + for ; num > 0; num-- { + go func() { + randID := unittest.IdentifierFixture() + backend.Add(fake(randID[:])) // creates and adds a fake item to the mempool + wg.Done() + }() + } + unittest.RequireReturnsBefore(t, wg.Wait, 1*time.Second, "failed to add elements in time") } diff --git a/module/mempool/stdmap/incorporated_result_seals.go b/module/mempool/stdmap/incorporated_result_seals.go index f6abdd6b1df..3d1fff111b5 100644 --- a/module/mempool/stdmap/incorporated_result_seals.go +++ b/module/mempool/stdmap/incorporated_result_seals.go @@ -1,6 +1,9 @@ package stdmap -import "github.com/onflow/flow-go/model/flow" +import ( + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module/mempool" +) // IncorporatedResultSeals implements the incorporated result seals memory pool // of the consensus nodes, used to store seals that need to be added to blocks.
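RegisterEjectionCallbacks replaces the single-callback RegisterEjectionCallback: registrations now append instead of erroring, and every registered callback fires for each ejected entity. A minimal usage sketch, assuming the Backend API above; the item entity type is hypothetical and defined only for this example:

package main

import (
	"fmt"
	"sync/atomic"

	"github.com/onflow/flow-go/model/flow"
	"github.com/onflow/flow-go/module/mempool/stdmap"
	"github.com/onflow/flow-go/utils/unittest"
)

// item is a hypothetical minimal flow.Entity, used only for this sketch
type item struct{ id flow.Identifier }

func (i item) ID() flow.Identifier       { return i.id }
func (i item) Checksum() flow.Identifier { return i.id }

func main() {
	var ejections uint64

	// register two callbacks at once; both fire for every ejected entity
	pool := stdmap.NewBackend(stdmap.WithLimit(3))
	pool.RegisterEjectionCallbacks(
		func(entity flow.Entity) { fmt.Printf("ejected %x\n", entity.ID()) },
		func(entity flow.Entity) { atomic.AddUint64(&ejections, 1) },
	)
	// a later registration appends rather than erroring, unlike the old API
	pool.RegisterEjectionCallbacks(func(entity flow.Entity) { /* e.g. metrics */ })

	// overfilling the pool beyond its limit triggers ejections
	for i := 0; i < 5; i++ {
		pool.Add(item{id: unittest.IdentifierFixture()})
	}
	fmt.Println("entities ejected:", atomic.LoadUint64(&ejections))
}

The append semantics are what let several layers hook the same backend, e.g. the size-tracking callbacks in the Approvals and IncorporatedResults mempools alongside a wrapper such as the ExecStateForkSuppressor.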
@@ -9,15 +12,15 @@ type IncorporatedResultSeals struct { } -// NewIncorporatedResults creates a mempool for the incorporated result seals -func NewIncorporatedResultSeals(limit uint, opts ...OptionFunc) *IncorporatedResultSeals { +// NewIncorporatedResultSeals creates a mempool for the incorporated result seals +func NewIncorporatedResultSeals(opts ...OptionFunc) *IncorporatedResultSeals { return &IncorporatedResultSeals{ - Backend: NewBackend(append(opts, WithLimit(limit))...), + Backend: NewBackend(opts...), } } // Add adds an IncorporatedResultSeal to the mempool -func (ir *IncorporatedResultSeals) Add(seal *flow.IncorporatedResultSeal) bool { - return ir.Backend.Add(seal) +func (ir *IncorporatedResultSeals) Add(seal *flow.IncorporatedResultSeal) (bool, error) { + return ir.Backend.Add(seal), nil } // All returns all the items in the mempool @@ -42,6 +45,16 @@ func (ir *IncorporatedResultSeals) ByID(id flow.Identifier) (*flow.IncorporatedR } // Rem removes an IncorporatedResultSeal from the mempool -func (ir *IncorporatedResultSeals) Rem(incorporatedResultID flow.Identifier) bool { - return ir.Backend.Rem(incorporatedResultID) +func (ir *IncorporatedResultSeals) Rem(id flow.Identifier) bool { + return ir.Backend.Rem(id) +} + +// Clear removes all entities from the pool. +func (ir *IncorporatedResultSeals) Clear() { + ir.Backend.Clear() +} + +// RegisterEjectionCallbacks adds the provided OnEjection callbacks +func (ir *IncorporatedResultSeals) RegisterEjectionCallbacks(callbacks ...mempool.OnEjection) { + ir.Backend.RegisterEjectionCallbacks(callbacks...) } diff --git a/module/mempool/stdmap/incorporated_results.go b/module/mempool/stdmap/incorporated_results.go index c978cfbe78e..6bd30e58b54 100644 --- a/module/mempool/stdmap/incorporated_results.go +++ b/module/mempool/stdmap/incorporated_results.go @@ -2,7 +2,6 @@ package stdmap import ( "errors" - "fmt" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/mempool/model" @@ -30,10 +29,7 @@ func NewIncorporatedResults(limit uint, opts ...OptionFunc) (*IncorporatedResult incorporatedResultMap := entity.(*model.IncorporatedResultMap) mempool.size -= uint(len(incorporatedResultMap.IncorporatedResults)) } - err := mempool.backend.RegisterEjectionCallback(adjustSizeOnEjection) - if err != nil { - return nil, fmt.Errorf("failed to register ejection callback with IncorporatedResults mempool's backend") - } + mempool.backend.RegisterEjectionCallbacks(adjustSizeOnEjection) return mempool, nil } diff --git a/module/mempool/stdmap/incorporated_results_test.go b/module/mempool/stdmap/incorporated_results_test.go index a19cd028312..fea88066906 100644 --- a/module/mempool/stdmap/incorporated_results_test.go +++ b/module/mempool/stdmap/incorporated_results_test.go @@ -14,7 +14,7 @@ func TestIncorporatedResults(t *testing.T) { pool, err := NewIncorporatedResults(1000) require.NoError(t, err) - ir1 := unittest.IncorporatedResultFixture() + ir1 := unittest.IncorporatedResult.Fixture() t.Run("Adding first incorporated result", func(t *testing.T) { ok, err := pool.Add(ir1) require.True(t, ok) @@ -44,7 +44,7 @@ func TestIncorporatedResults(t *testing.T) { require.Contains(t, incorporatedResults, ir2.IncorporatedBlockID) }) - ir3 := unittest.IncorporatedResultFixture() + ir3 := unittest.IncorporatedResult.Fixture() t.Run("Adding third incorporated result", func(t *testing.T) { ok, err := pool.Add(ir3) require.True(t, ok) @@ -84,7 +84,7 @@ func TestIncorporatedResultsEjectSize(t *testing.T) { // insert 20 items (10 above limit) for i := 0; i < 20; i++ { - _, _ = 
pool.Add(unittest.IncorporatedResultFixture()) + _, _ = pool.Add(unittest.IncorporatedResult.Fixture()) } // 10 items should have been evicted, so size 10 diff --git a/module/mempool/stdmap/options.go b/module/mempool/stdmap/options.go index b7668a72223..309afba7024 100644 --- a/module/mempool/stdmap/options.go +++ b/module/mempool/stdmap/options.go @@ -2,17 +2,10 @@ package stdmap -import "github.com/onflow/flow-go/model/flow" - // OptionFunc is a function that can be provided to the backend on creation in // order to set a certain custom option. type OptionFunc func(*Backend) -// OnEjection is a callback which a stdmap.Backend executes on ejecting -// one of its elements. The callbacks are executed from within the thread -// that serves the stdmap.Backend. Implementations should be non-blocking. -type OnEjection func(flow.Entity) - // WithLimit can be provided to the backend on creation in order to set a custom // maximum limit of entities in the memory pool. func WithLimit(limit uint) OptionFunc { diff --git a/state/protocol/badger/mutator_test.go b/state/protocol/badger/mutator_test.go index 421c7ce9d79..8df4fad40b4 100644 --- a/state/protocol/badger/mutator_test.go +++ b/state/protocol/badger/mutator_test.go @@ -219,7 +219,7 @@ func TestBootstrapWithSeal(t *testing.T) { util.RunWithProtocolState(t, func(db *badger.DB, state *protocol.State) { block := unittest.GenesisFixture(participants) - block.Payload.Seals = []*flow.Seal{unittest.SealFixture()} + block.Payload.Seals = []*flow.Seal{unittest.Seal.Fixture()} block.Header.PayloadHash = block.Payload.Hash() result := unittest.ExecutionResultFixture() @@ -228,7 +228,7 @@ func TestBootstrapWithSeal(t *testing.T) { finalState, ok := result.FinalStateCommitment() require.True(t, ok) - seal := unittest.SealFixture() + seal := unittest.Seal.Fixture() seal.BlockID = block.ID() seal.ResultID = result.ID() seal.FinalState = finalState @@ -646,11 +646,11 @@ func TestExtendHighestSeal(t *testing.T) { require.Nil(t, err) // create seals for block2 and block3 - seal2 := unittest.SealFixture( - unittest.SealWithBlockID(block2.ID()), + seal2 := unittest.Seal.Fixture( + unittest.Seal.WithBlockID(block2.ID()), ) - seal3 := unittest.SealFixture( - unittest.SealWithBlockID(block3.ID()), + seal3 := unittest.Seal.Fixture( + unittest.Seal.WithBlockID(block3.ID()), ) // include the seals in block4 @@ -723,9 +723,9 @@ func TestExtendEpochTransitionValid(t *testing.T) { ) // create the seal referencing block1 and including the setup event - seal1 := unittest.SealFixture( - unittest.SealWithBlockID(block1.ID()), - unittest.WithServiceEvents(epoch2Setup.ServiceEvent()), + seal1 := unittest.Seal.Fixture( + unittest.Seal.WithBlockID(block1.ID()), + unittest.Seal.WithServiceEvents(epoch2Setup.ServiceEvent()), ) // block 2 contains the epoch setup service event @@ -770,9 +770,9 @@ func TestExtendEpochTransitionValid(t *testing.T) { unittest.WithDKGFromParticipants(epoch2Participants), ) - seal2 := unittest.SealFixture( - unittest.SealWithBlockID(block2.ID()), - unittest.WithServiceEvents(epoch2Commit.ServiceEvent()), + seal2 := unittest.Seal.Fixture( + unittest.Seal.WithBlockID(block2.ID()), + unittest.Seal.WithServiceEvents(epoch2Commit.ServiceEvent()), ) // block 3 contains the epoch commit service event @@ -896,15 +896,15 @@ func TestExtendConflictingEpochEvents(t *testing.T) { ) // create one seal containing the first setup event - seal1 := unittest.SealFixture( - unittest.SealWithBlockID(block1.ID()), - 
unittest.WithServiceEvents(nextEpochSetup1.ServiceEvent()), + seal1 := unittest.Seal.Fixture( + unittest.Seal.WithBlockID(block1.ID()), + unittest.Seal.WithServiceEvents(nextEpochSetup1.ServiceEvent()), ) // create another seal containing the second setup event - seal2 := unittest.SealFixture( - unittest.SealWithBlockID(block2.ID()), - unittest.WithServiceEvents(nextEpochSetup2.ServiceEvent()), + seal2 := unittest.Seal.Fixture( + unittest.Seal.WithBlockID(block2.ID()), + unittest.Seal.WithServiceEvents(nextEpochSetup2.ServiceEvent()), ) // block 3 builds on block 1, contains setup event 1 @@ -965,9 +965,9 @@ func TestExtendEpochSetupInvalid(t *testing.T) { unittest.SetupWithCounter(epoch1Setup.Counter+1), unittest.WithFinalView(epoch1Setup.FinalView+1000), ) - seal := unittest.SealFixture( - unittest.SealWithBlockID(block1.ID()), - unittest.WithServiceEvents(setup.ServiceEvent()), + seal := unittest.Seal.Fixture( + unittest.Seal.WithBlockID(block1.ID()), + unittest.Seal.WithServiceEvents(setup.ServiceEvent()), ) return setup, seal } @@ -1047,9 +1047,9 @@ func TestExtendEpochCommitInvalid(t *testing.T) { unittest.SetupWithCounter(epoch1Setup.Counter+1), unittest.WithFinalView(epoch1Setup.FinalView+1000), ) - seal := unittest.SealFixture( - unittest.SealWithBlockID(block1.ID()), - unittest.WithServiceEvents(setup.ServiceEvent()), + seal := unittest.Seal.Fixture( + unittest.Seal.WithBlockID(block1.ID()), + unittest.Seal.WithServiceEvents(setup.ServiceEvent()), ) return setup, seal } @@ -1060,9 +1060,9 @@ func TestExtendEpochCommitInvalid(t *testing.T) { unittest.WithDKGFromParticipants(epoch2Participants), ) - seal := unittest.SealFixture( - unittest.SealWithBlockID(refBlockID), - unittest.WithServiceEvents(commit.ServiceEvent()), + seal := unittest.Seal.Fixture( + unittest.Seal.WithBlockID(refBlockID), + unittest.Seal.WithServiceEvents(commit.ServiceEvent()), ) return commit, seal } @@ -1181,9 +1181,9 @@ func TestExtendEpochTransitionWithoutCommit(t *testing.T) { ) // create the seal referencing block1 and including the setup event - seal1 := unittest.SealFixture( - unittest.SealWithBlockID(block1.ID()), - unittest.WithServiceEvents(epoch2Setup.ServiceEvent()), + seal1 := unittest.Seal.Fixture( + unittest.Seal.WithBlockID(block1.ID()), + unittest.Seal.WithServiceEvents(epoch2Setup.ServiceEvent()), ) // block 2 contains the epoch setup service event @@ -1369,11 +1369,11 @@ func TestHeaderExtendHighestSeal(t *testing.T) { require.Nil(t, err) // create seals for block2 and block3 - seal2 := unittest.SealFixture( - unittest.SealWithBlockID(block2.ID()), + seal2 := unittest.Seal.Fixture( + unittest.Seal.WithBlockID(block2.ID()), ) - seal3 := unittest.SealFixture( - unittest.SealWithBlockID(block3.ID()), + seal3 := unittest.Seal.Fixture( + unittest.Seal.WithBlockID(block3.ID()), ) // include the seals in block4 diff --git a/storage/badger/operation/prefix.go b/storage/badger/operation/prefix.go index a24fa1c1b5b..4b56656b2d1 100644 --- a/storage/badger/operation/prefix.go +++ b/storage/badger/operation/prefix.go @@ -65,6 +65,9 @@ const ( codeIndexCollection = 200 codeIndexExecutionResultByBlock = 202 codeIndexCollectionByTransaction = 203 + + // internal failure information that should be preserved across restarts + codeExecutionFork = 254 ) func makePrefix(code byte, keys ...interface{}) []byte { diff --git a/storage/badger/operation/seals.go b/storage/badger/operation/seals.go index 61facb7c457..d694c73705d 100644 --- a/storage/badger/operation/seals.go +++ b/storage/badger/operation/seals.go @@ 
-29,3 +29,18 @@ func IndexBlockSeal(blockID flow.Identifier, sealID flow.Identifier) func(*badge func LookupBlockSeal(blockID flow.Identifier, sealID *flow.Identifier) func(*badger.Txn) error { return retrieve(makePrefix(codeBlockToSeal, blockID), &sealID) } + +// InsertExecutionForkEvidence stores the given conflicting seals as evidence of an execution fork. +func InsertExecutionForkEvidence(conflictingSeals []*flow.IncorporatedResultSeal) func(*badger.Txn) error { + return insert(makePrefix(codeExecutionFork), conflictingSeals) +} + +// RemoveExecutionForkEvidence deletes any previously stored execution fork evidence. +func RemoveExecutionForkEvidence() func(*badger.Txn) error { + return remove(makePrefix(codeExecutionFork)) +} + +// RetrieveExecutionForkEvidence reads the stored execution fork evidence (if any) into conflictingSeals. +func RetrieveExecutionForkEvidence(conflictingSeals *[]*flow.IncorporatedResultSeal) func(*badger.Txn) error { + return retrieve(makePrefix(codeExecutionFork), conflictingSeals) +} diff --git a/storage/badger/operation/seals_test.go b/storage/badger/operation/seals_test.go index 16c6042f73a..90e1211a5d3 100644 --- a/storage/badger/operation/seals_test.go +++ b/storage/badger/operation/seals_test.go @@ -15,7 +15,7 @@ import ( func TestSealInsertCheckRetrieve(t *testing.T) { unittest.RunWithBadgerDB(t, func(db *badger.DB) { - expected := unittest.SealFixture() + expected := unittest.Seal.Fixture() err := db.Update(InsertSeal(expected.ID(), expected)) require.Nil(t, err) @@ -30,8 +30,8 @@ func TestSealInsertCheckRetrieve(t *testing.T) { func TestSealIndexAndLookup(t *testing.T) { unittest.RunWithBadgerDB(t, func(db *badger.DB) { - seal1 := unittest.SealFixture() - seal2 := unittest.SealFixture() + seal1 := unittest.Seal.Fixture() + seal2 := unittest.Seal.Fixture() seals := []*flow.Seal{seal1, seal2} diff --git a/utils/unittest/epoch_builder.go b/utils/unittest/epoch_builder.go index 71641838f0e..82ca90f3d48 100644 --- a/utils/unittest/epoch_builder.go +++ b/utils/unittest/epoch_builder.go @@ -86,7 +86,7 @@ func (builder *EpochBuilder) BuildEpoch() *EpochBuilder { for height := sealed.Height + 1; height <= A.Height; height++ { next, err := builder.state.AtHeight(height).Head() require.Nil(builder.t, err) - seals = append(seals, SealFixture(SealWithBlockID(next.ID()))) + seals = append(seals, Seal.Fixture(Seal.WithBlockID(next.ID()))) } // build block B, sealing up to and including block A @@ -109,9 +109,9 @@ func (builder *EpochBuilder) BuildEpoch() *EpochBuilder { // C contains a seal for block B and the EpochSetup event setup := EpochSetupFixture(append(setupDefaults, builder.setupOpts...)...) C := BlockWithParentFixture(B.Header) - sealForB := SealFixture( - SealWithBlockID(B.ID()), - WithServiceEvents(setup.ServiceEvent()), + sealForB := Seal.Fixture( + Seal.WithBlockID(B.ID()), + Seal.WithServiceEvents(setup.ServiceEvent()), ) C.SetPayload(flow.Payload{ Seals: []*flow.Seal{sealForB}, @@ -132,9 +132,9 @@ func (builder *EpochBuilder) BuildEpoch() *EpochBuilder { // D contains a seal for block C and the EpochCommit event commit := EpochCommitFixture(append(commitDefaults, builder.commitOpts...)...)
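The InsertExecutionForkEvidence, RetrieveExecutionForkEvidence, and RemoveExecutionForkEvidence operations above persist conflicting seals under the dedicated codeExecutionFork prefix, so the evidence survives node restarts. A sketch of the intended round-trip, assuming the badger test helpers used elsewhere in this diff (the test name is illustrative):

package operation

import (
	"testing"

	"github.com/dgraph-io/badger/v2"
	"github.com/stretchr/testify/require"

	"github.com/onflow/flow-go/model/flow"
	"github.com/onflow/flow-go/utils/unittest"
)

func TestExecutionForkEvidenceRoundTrip(t *testing.T) {
	unittest.RunWithBadgerDB(t, func(db *badger.DB) {
		conflicting := unittest.IncorporatedResultSeal.Fixtures(2)

		// persist the evidence
		err := db.Update(InsertExecutionForkEvidence(conflicting))
		require.NoError(t, err)

		// read it back, e.g. after a restart
		var retrieved []*flow.IncorporatedResultSeal
		err = db.View(RetrieveExecutionForkEvidence(&retrieved))
		require.NoError(t, err)
		require.Equal(t, conflicting, retrieved)

		// the evidence can be cleared again, e.g. by an operator tool
		err = db.Update(RemoveExecutionForkEvidence())
		require.NoError(t, err)
	})
}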
D := BlockWithParentFixture(C.Header) - sealForC := SealFixture( - SealWithBlockID(C.ID()), - WithServiceEvents(commit.ServiceEvent()), + sealForC := Seal.Fixture( + Seal.WithBlockID(C.ID()), + Seal.WithServiceEvents(commit.ServiceEvent()), ) D.SetPayload(flow.Payload{ Seals: []*flow.Seal{sealForC}, diff --git a/utils/unittest/fixtures.go b/utils/unittest/fixtures.go index d670283a1d3..c304489201b 100644 --- a/utils/unittest/fixtures.go +++ b/utils/unittest/fixtures.go @@ -105,6 +105,15 @@ func BlockFixture() flow.Block { return BlockWithParentFixture(&header) } +func BlockFixtures(number int) []*flow.Block { + blocks := make([]*flow.Block, 0, number) + for ; number > 0; number-- { + block := BlockFixture() + blocks = append(blocks, &block) + } + return blocks +} + func ProposalFixture() *messages.BlockProposal { block := BlockFixture() return ProposalFromBlock(&block) @@ -140,7 +149,7 @@ func StateDeltaFixture() *messages.ExecutionStateDelta { func PayloadFixture(options ...func(*flow.Payload)) *flow.Payload { payload := flow.Payload{ Guarantees: CollectionGuaranteesFixture(16), - Seals: BlockSealsFixture(16), + Seals: Seal.Fixtures(16), } for _, option := range options { option(&payload) @@ -184,8 +193,8 @@ func BlockWithParentAndSeal( if sealed != nil { payload.Seals = []*flow.Seal{ - SealFixture( - SealWithBlockID(sealed.ID()), + Seal.Fixture( + Seal.WithBlockID(sealed.ID()), ), } } @@ -329,43 +338,10 @@ func CollectionGuaranteesFixture(n int, options ...func(*flow.CollectionGuarante return guarantees } -func SealFromResult(result *flow.ExecutionResult) func(*flow.Seal) { - return func(seal *flow.Seal) { - finalState, _ := result.FinalStateCommitment() - seal.ResultID = result.ID() - seal.BlockID = result.BlockID - seal.FinalState = finalState - } -} - -func SealWithBlockID(blockID flow.Identifier) func(*flow.Seal) { - return func(seal *flow.Seal) { - seal.BlockID = blockID - } -} - -func WithServiceEvents(events ...flow.ServiceEvent) func(*flow.Seal) { - return func(seal *flow.Seal) { - seal.ServiceEvents = events - } -} - -func SealFixture(opts ...func(*flow.Seal)) *flow.Seal { - seal := &flow.Seal{ - BlockID: IdentifierFixture(), - ResultID: IdentifierFixture(), - FinalState: StateCommitmentFixture(), - } - for _, apply := range opts { - apply(seal) - } - return seal -} - func BlockSealsFixture(n int) []*flow.Seal { seals := make([]*flow.Seal, 0, n) for i := 0; i < n; i++ { - seal := SealFixture() + seal := Seal.Fixture() seals = append(seals, seal) } return seals @@ -492,25 +468,6 @@ func ExecutionResultFixture(opts ...func(*flow.ExecutionResult)) *flow.Execution return result } -// TODO replace by usage unittest.IncorporatedResult -func IncorporatedResultFixture(opts ...func(*flow.IncorporatedResult)) *flow.IncorporatedResult { - result := ExecutionResultFixture() - incorporatedBlockID := IdentifierFixture() - ir := flow.NewIncorporatedResult(incorporatedBlockID, result) - - for _, apply := range opts { - apply(ir) - } - return ir -} - -// TODO replace by usage unittest.IncorporatedResult -func IncorporatedResultForBlockFixture(block *flow.Block) *flow.IncorporatedResult { - result := ExecutionResultFixture(WithBlock(block)) - incorporatedBlockID := IdentifierFixture() - return flow.NewIncorporatedResult(incorporatedBlockID, result) -} - func WithExecutionResultID(id flow.Identifier) func(*flow.ResultApproval) { return func(ra *flow.ResultApproval) { ra.Body.ExecutionResultID = id @@ -1081,7 +1038,10 @@ func BootstrapFixture(participants flow.IdentityList, opts 
...func(*flow.Block)) WithFinalView(root.Header.View+1000), ) commit := EpochCommitFixture(WithDKGFromParticipants(participants), CommitWithCounter(counter)) - seal := SealFixture(SealFromResult(result), WithServiceEvents(setup.ServiceEvent(), commit.ServiceEvent())) + seal := Seal.Fixture( + Seal.WithResult(result), + Seal.WithServiceEvents(setup.ServiceEvent(), commit.ServiceEvent()), + ) return root, result, seal } diff --git a/utils/unittest/incorporated_results_seals.go b/utils/unittest/incorporated_results_seals.go new file mode 100644 index 00000000000..0f792089173 --- /dev/null +++ b/utils/unittest/incorporated_results_seals.go @@ -0,0 +1,49 @@ +package unittest + +import "github.com/onflow/flow-go/model/flow" + +var IncorporatedResultSeal incorporatedResultSealFactory + +type incorporatedResultSealFactory struct{} + +func (f *incorporatedResultSealFactory) Fixture(opts ...func(*flow.IncorporatedResultSeal)) *flow.IncorporatedResultSeal { + result := ExecutionResultFixture() + incorporatedBlockID := IdentifierFixture() + + ir := IncorporatedResult.Fixture( + IncorporatedResult.WithResult(result), + IncorporatedResult.WithIncorporatedBlockID(incorporatedBlockID), + ) + seal := Seal.Fixture(Seal.WithResult(result)) + + irSeal := &flow.IncorporatedResultSeal{ + IncorporatedResult: ir, + Seal: seal, + } + + for _, apply := range opts { + apply(irSeal) + } + return irSeal +} + +func (f *incorporatedResultSealFactory) Fixtures(n int) []*flow.IncorporatedResultSeal { + seals := make([]*flow.IncorporatedResultSeal, 0, n) + for i := 0; i < n; i++ { + seals = append(seals, IncorporatedResultSeal.Fixture()) + } + return seals +} + +func (f *incorporatedResultSealFactory) WithResult(result *flow.ExecutionResult) func(*flow.IncorporatedResultSeal) { + return func(irSeal *flow.IncorporatedResultSeal) { + IncorporatedResult.WithResult(result)(irSeal.IncorporatedResult) + Seal.WithResult(result)(irSeal.Seal) + } +} + +func (f *incorporatedResultSealFactory) WithIncorporatedBlockID(id flow.Identifier) func(*flow.IncorporatedResultSeal) { + return func(irSeal *flow.IncorporatedResultSeal) { + IncorporatedResult.WithIncorporatedBlockID(id)(irSeal.IncorporatedResult) + } +} diff --git a/utils/unittest/seals.go b/utils/unittest/seals.go new file mode 100644 index 00000000000..55aa5d5500b --- /dev/null +++ b/utils/unittest/seals.go @@ -0,0 +1,72 @@ +package unittest + +import "github.com/onflow/flow-go/model/flow" + +var Seal sealFactory + +type sealFactory struct{} + +func (f *sealFactory) Fixture(opts ...func(*flow.Seal)) *flow.Seal { + seal := &flow.Seal{ + BlockID: IdentifierFixture(), + ResultID: IdentifierFixture(), + FinalState: StateCommitmentFixture(), + AggregatedApprovalSigs: Seal.AggregatedSignatureFixtures(3), // 3 chunks + } + for _, apply := range opts { + apply(seal) + } + return seal +} + +func (f *sealFactory) Fixtures(n int) []*flow.Seal { + seals := make([]*flow.Seal, 0, n) + for i := 0; i < n; i++ { + seal := Seal.Fixture() + seals = append(seals, seal) + } + return seals +} + +func (f *sealFactory) WithResult(result *flow.ExecutionResult) func(*flow.Seal) { + return func(seal *flow.Seal) { + finalState, _ := result.FinalStateCommitment() + seal.ResultID = result.ID() + seal.BlockID = result.BlockID + seal.FinalState = finalState + seal.AggregatedApprovalSigs = Seal.AggregatedSignatureFixtures(len(result.Chunks)) + } +} + +func (f *sealFactory) WithBlockID(blockID flow.Identifier) func(*flow.Seal) { + return func(seal *flow.Seal) { + seal.BlockID = blockID + } +} + 
+func (f *sealFactory) WithBlock(block *flow.Header) func(*flow.Seal) { + return func(seal *flow.Seal) { + seal.BlockID = block.ID() + } +} + +func (f *sealFactory) WithServiceEvents(events ...flow.ServiceEvent) func(*flow.Seal) { + return func(seal *flow.Seal) { + seal.ServiceEvents = events + } +} + +func (f *sealFactory) AggregatedSignatureFixtures(number int) []flow.AggregatedSignature { + sigs := make([]flow.AggregatedSignature, 0, number) + for ; number > 0; number-- { + sigs = append(sigs, Seal.AggregatedSignatureFixture()) + } + return sigs +} + +func (f *sealFactory) AggregatedSignatureFixture() flow.AggregatedSignature { + return flow.AggregatedSignature{ + VerifierSignatures: SignaturesFixture(7), + SignerIDs: IdentifierListFixture(7), + } +}
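Taken together, the namespaced Seal and IncorporatedResultSeal factories replace the free-standing helpers removed from fixtures.go, and they keep all fields of an IncorporatedResultSeal consistent with one execution result. A short usage sketch, assuming only the factory functions introduced above:

package main

import (
	"fmt"

	"github.com/onflow/flow-go/utils/unittest"
)

func main() {
	// a seal whose BlockID, ResultID, FinalState and approval signatures
	// are all derived from the same execution result
	result := unittest.ExecutionResultFixture()
	seal := unittest.Seal.Fixture(unittest.Seal.WithResult(result))

	// an incorporated result seal that shares the same underlying result
	irSeal := unittest.IncorporatedResultSeal.Fixture(
		unittest.IncorporatedResultSeal.WithResult(result),
	)

	fmt.Println(seal.ResultID == irSeal.Seal.ResultID) // prints: true
}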