Skip to content

Commit

Permalink
handle race condition
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangchiqing committed Jan 24, 2024
1 parent 96f0f62 commit 0b6a5ba
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 13 deletions.
45 changes: 34 additions & 11 deletions engine/execution/ingestion/block_queue/block_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/rs/zerolog"
)

var ErrMissingParent = fmt.Errorf("missing parent block")

// BlockQueue keeps track of state of blocks and determines which blocks are executable
// A block becomes executable when all the following conditions are met:
// 1. the block has been validated by consensus algorithm
Expand All @@ -18,8 +20,14 @@ type BlockQueue struct {
sync.Mutex
log zerolog.Logger

// when receiving a new block, adding it to the map, and add missing collections to the map
// if a block still exists in this map, it means either some of its collection is missing,
// or its parent block has not been executed.
// if a block's StartState is not nil, it means its parent block has been executed, and
// its parent block must have been removed from this map
// if a block's StartState is nil, it means its parent block has not been executed yet.
// and its parent must be found in the this map as well
blocks map[flow.Identifier]*entity.ExecutableBlock

// a collection could be included in multiple blocks,
// when a missing block is received, it might trigger multiple blocks to be executable, which
// can be looked up by the map
Expand All @@ -30,8 +38,8 @@ type BlockQueue struct {

// blockIDsByHeight is used to find next executable block.
// when a block is executed, the next executable block must be a block with height = current block height + 1
// the following map allows us to find the next executable block by height
blockIDsByHeight map[uint64]map[flow.Identifier]*entity.ExecutableBlock // for finding next executable block
// the following map allows us to find the next executable block by height and their parent block ID
blockIDsByHeight map[uint64]map[flow.Identifier]*entity.ExecutableBlock
}

type MissingCollection struct {
Expand Down Expand Up @@ -62,11 +70,13 @@ func NewBlockQueue(logger zerolog.Logger) *BlockQueue {
}
}

// OnBlock is called when a new block is received, and its parent is not executed.
// It returns a list of missing collections and a list of executable blocks
// Note: caller must ensure when OnBlock is called with a block,
// if its parent is not executed, then the parent must be added to the queue first.
// if its parent is executed, then the parent's finalState must be passed in.
// OnBlock is called when a new block is received, the parentFinalState indicates
// whether its parent block has been executed.
// Caller must ensure:
// 1. blocks are passsed in order, i.e. parent block is passed in before its child block
// 2. if a block's parent is not executed, then the parent block must be passed in first
// 3. if a block's parent is executed, then the parent's finalState must be passed in
// It returns (nil, nil, nil) if this block is a duplication
func (q *BlockQueue) OnBlock(block *flow.Block, parentFinalState *flow.StateCommitment) (
[]*MissingCollection, // missing collections
[]*entity.ExecutableBlock, // blocks ready to execute
Expand Down Expand Up @@ -125,8 +135,9 @@ func (q *BlockQueue) OnBlock(block *flow.Block, parentFinalState *flow.StateComm
if parentFinalState == nil {
_, parentExists := q.blocks[block.Header.ParentID]
if !parentExists {
return nil, nil, fmt.Errorf("parent block %s of block %s is not in the queue",
block.Header.ParentID, blockID)
return nil, nil,
fmt.Errorf("block %s has no parent commitment, but its parent block %s does not exist in the queue: %w",
blockID, block.Header.ParentID, ErrMissingParent)
}
}

Expand Down Expand Up @@ -235,6 +246,8 @@ func (q *BlockQueue) OnCollection(collection *flow.Collection) ([]*entity.Execut

// OnBlockExecuted is called when a block is executed
// It returns a list of executable blocks (usually its child blocks)
// The caller has to ensure OnBlockExecuted is not called in a wrong order, such as
// OnBlockExecuted(childBlock) being called before OnBlockExecuted(parentBlock).
func (q *BlockQueue) OnBlockExecuted(
blockID flow.Identifier,
commit flow.StateCommitment,
Expand All @@ -258,6 +271,15 @@ func (q *BlockQueue) onBlockExecuted(
return nil, nil
}

// sanity check
// if a block exists in the queue and is executed, then its parent block
// must not exist in the queue, otherwise the state is inconsistent
_, parentExists := q.blocks[block.Block.Header.ParentID]
if parentExists {
return nil, fmt.Errorf("parent block %s of block %s is in the queue",
block.Block.Header.ParentID, blockID)
}

delete(q.blocks, blockID)

// remove height index
Expand Down Expand Up @@ -326,7 +348,8 @@ func (q *BlockQueue) checkIfChildBlockBecomeExecutable(
return executables, nil
}

// GetMissingCollections returns the missing collections and the start state
// GetMissingCollections returns the missing collections and the start state for the given block
// Useful for debugging what is missing for the next unexecuted block to become executable.
// It returns an error if the block is not found
func (q *BlockQueue) GetMissingCollections(blockID flow.Identifier) (
[]*MissingCollection, *flow.StateCommitment, error) {
Expand Down
24 changes: 22 additions & 2 deletions engine/execution/ingestion/block_queue/block_queue_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package block_queue

import (
"errors"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -281,7 +282,7 @@ func TestOnBlockWithMissingParentCommit(t *testing.T) {
require.Empty(t, executables)
requireCollectionHas(t, missing, c1)

// block A is executable
// block A has all the collections and become executable
executables, err = q.OnCollection(c1)
require.NoError(t, err)
requireExecutableHas(t, executables, blockA)
Expand All @@ -294,11 +295,30 @@ func TestOnBlockWithMissingParentCommit(t *testing.T) {
requireExecutableHas(t, executables)
requireQueueIsEmpty(t, q)

missing, executables, err = q.OnBlock(blockB, nil)
// verify when race condition happens, ErrMissingParent will be returned
_, _, err = q.OnBlock(blockB, nil)
require.True(t, errors.Is(err, ErrMissingParent), err)

// verify if called again with parent commit, it will be successful
missing, executables, err = q.OnBlock(blockB, commitFor("A"))
require.NoError(t, err)
require.Empty(t, executables)
requireCollectionHas(t, missing, c2, c3)

// verify after receiving all collections, B becomes executable
executables, err = q.OnCollection(c2)
require.NoError(t, err)
require.Empty(t, executables)

executables, err = q.OnCollection(c3)
require.NoError(t, err)
requireExecutableHas(t, executables, blockB)

// verify after B is executed, the queue is empty
executables, err = q.OnBlockExecuted(blockB.ID(), *commitFor("B"))
require.NoError(t, err)
requireExecutableHas(t, executables)
requireQueueIsEmpty(t, q)
}

/* ==== Test utils ==== */
Expand Down

0 comments on commit 0b6a5ba

Please sign in to comment.