Skip to content

Commit

Permalink
ChainDB: let the BlockFetch client add blocks asynchronously
Browse files Browse the repository at this point in the history
Fixes #2487.

Currently, the effective queue size when adding blocks to the ChainDB is 1 (for
why, see #2487). In this commit, we let the BlockFetch client add blocks fully
asynchronously to the ChainDB, which restores the effective queue size to the
configured value again, e.g., 10.

The BlockFetch client will no longer wait until the block has been written to
the VolatileDB (and thus also not until the block has been processed by chain
selection). The BlockFetch client can just hand over the block and continue
downloading with minimum delay. To make this possible, we change the behaviour
of `getIsFetched` and `getMaxSlotNo` to account for the blocks in the queue,
otherwise the BlockFetch client might try to redownload already-fetched blocks.

This is an alternative to #2489, which let the BlockFetch client write blocks to
the VolatileDB synchronously. The problem with that approach is that multiple
threads are writing to the VolatileDB, instead of a single background thread. We
have relied on the latter to simplify the VolatileDB w.r.t. consistency after
incomplete writes.
  • Loading branch information
mrBliss committed Nov 3, 2020
1 parent 95a7cd5 commit aa1413e
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 40 deletions.
6 changes: 3 additions & 3 deletions ouroboros-consensus/src/Ouroboros/Consensus/NodeKernel.hs
Original file line number Diff line number Diff line change
Expand Up @@ -262,10 +262,10 @@ initBlockFetchConsensusInterface cfg chainDB getCandidates blockFetchSize btime
readFetchedBlocks :: STM m (Point blk -> Bool)
readFetchedBlocks = ChainDB.getIsFetched chainDB

-- Waits until the block has been written to disk, but not until chain
-- selection has processed the block.
-- Hand over the block to the ChainDB, but don't wait until it has been
-- written to disk or processed.
addFetchedBlock :: Point blk -> blk -> m ()
addFetchedBlock _pt = void . ChainDB.addBlockWaitWrittenToDisk chainDB
addFetchedBlock _pt = void . ChainDB.addBlockAsync chainDB

readFetchedMaxSlotNo :: STM m MaxSlotNo
readFetchedMaxSlotNo = ChainDB.getMaxSlotNo chainDB
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,12 +253,11 @@ addBlockSync cdb@CDB {..} BlockToAdd { blockToAdd = b, .. } = do
let immBlockNo = AF.anchorBlockNo curChain

-- We follow the steps from section "## Adding a block" in ChainDB.md

-- Note: we call 'chainSelectionForFutureBlocks' in all branches instead
-- of once, before branching, because we want to do it /after/ writing the
-- block to the VolatileDB and delivering the 'varBlockWrittenToDisk'
-- promise, as this is the promise the BlockFetch client waits for.
-- Otherwise, the BlockFetch client would have to wait for
--
-- Note: we call 'chainSelectionForFutureBlocks' in all branches instead of
-- once, before branching, because we want to do it /after/ writing the
-- block to the VolatileDB so that any threads waiting on the
-- 'varBlockWrittenToDisk' promise don't have to wait for the result of
-- 'chainSelectionForFutureBlocks'.

-- ### Ignore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,18 +146,15 @@ getBlockComponent ::
getBlockComponent CDB{..} = getAnyBlockComponent cdbImmutableDB cdbVolatileDB

getIsFetched ::
forall m blk. IOLike m
forall m blk. (IOLike m, HasHeader blk)
=> ChainDbEnv m blk -> STM m (Point blk -> Bool)
getIsFetched CDB{..} = basedOnHash <$> VolatileDB.getIsMember cdbVolatileDB
where
-- The volatile DB indexes by hash only, not by points. However, it should
-- not be possible to have two points with the same hash but different
-- slot numbers.
basedOnHash :: (HeaderHash blk -> Bool) -> Point blk -> Bool
basedOnHash f p =
case pointHash p of
BlockHash hash -> f hash
GenesisHash -> False
getIsFetched CDB{..} = do
checkBlocksToAdd <- memberBlocksToAdd cdbBlocksToAdd
checkVolDb <- VolatileDB.getIsMember cdbVolatileDB
return $ \pt ->
case pointToWithOriginRealPoint pt of
Origin -> False
NotOrigin pt' -> checkBlocksToAdd pt' || checkVolDb (realPointHash pt')

getIsInvalidBlock ::
forall m blk. (IOLike m, HasHeader blk)
Expand Down Expand Up @@ -194,10 +191,13 @@ getMaxSlotNo CDB{..} = do
-- contains block 9'. The ImmutableDB contains blocks 1-10. The max slot
-- of the current chain will be 10 (being the anchor point of the empty
-- current chain), while the max slot of the VolatileDB will be 9.
curChainMaxSlotNo <- maxSlotNoFromWithOrigin . AF.headSlot
<$> readTVar cdbChain
volatileDbMaxSlotNo <- VolatileDB.getMaxSlotNo cdbVolatileDB
return $ curChainMaxSlotNo `max` volatileDbMaxSlotNo
--
-- Moreover, we have to look in 'BlocksToAdd' too.
curChainMaxSlotNo <-
maxSlotNoFromWithOrigin . AF.headSlot <$> readTVar cdbChain
volatileDbMaxSlotNo <- VolatileDB.getMaxSlotNo cdbVolatileDB
blocksToAddMaxSlotNo <- getBlocksToAddMaxSlotNo cdbBlocksToAdd
return $ curChainMaxSlotNo `max` volatileDbMaxSlotNo `max` blocksToAddMaxSlotNo

{-------------------------------------------------------------------------------
Unifying interface over the immutable DB and volatile DB, but independent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,13 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl.Types (
-- * Future blocks
, FutureBlocks
-- * Blocks to add
, BlocksToAdd
, BlocksToAdd -- opaque
, BlockToAdd (..)
, newBlocksToAdd
, addBlockToAdd
, getBlockToAdd
, memberBlocksToAdd
, getBlocksToAddMaxSlotNo
-- * Trace types
, TraceEvent (..)
, NewTipInfo (..)
Expand All @@ -61,6 +63,7 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl.Types (

import Control.Tracer
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import Data.Typeable
import Data.Void (Void)
import Data.Word (Word64)
Expand All @@ -70,6 +73,7 @@ import NoThunks.Class (OnlyCheckWhnfNamed (..))
import Control.Monad.Class.MonadSTM.Strict (newEmptyTMVarIO)

import Ouroboros.Network.AnchoredFragment (AnchoredFragment)
import Ouroboros.Network.Block (MaxSlotNo (..))

import Ouroboros.Consensus.Block
import Ouroboros.Consensus.Config
Expand Down Expand Up @@ -424,23 +428,40 @@ type FutureBlocks blk = Map (HeaderHash blk) (Header blk)
-- | FIFO queue used to add blocks asynchronously to the ChainDB. Blocks are
-- read from this queue by a background thread, which processes the blocks
-- synchronously.
newtype BlocksToAdd m blk = BlocksToAdd (TBQueue m (BlockToAdd m blk))
data BlocksToAdd m blk = BlocksToAdd {
-- TODO use a better data structure, e.g., a heap from the @heaps@
-- package. Wish list:
-- + O(1) pop min value
-- + O(log n) insert
-- + O(n) get all
-- + Bounded in size
--
-- TODO join consecutive blocks into a fragment that can be added at
-- once.
blocksToAddQueue :: !(StrictTVar m (Map (RealPoint blk) (BlockToAdd m blk)))
, blocksToAddCapacity :: !Word
}
deriving NoThunks via OnlyCheckWhnfNamed "BlocksToAdd" (BlocksToAdd m blk)

-- | Entry in the 'BlocksToAdd' queue: a block together with the 'TMVar's used
-- to implement 'AddBlockPromise'.
data BlockToAdd m blk = BlockToAdd
{ blockToAdd :: !blk
, varBlockWrittenToDisk :: !(StrictTMVar m Bool)
-- ^ Used for the 'blockWrittenToDisk' field of 'AddBlockPromise'.
, varBlockProcessed :: !(StrictTMVar m (Point blk))
-- ^ Used for the 'blockProcessed' field of 'AddBlockPromise'.
}
data BlockToAdd m blk = BlockToAdd {
blockToAdd :: !blk
, varBlockWrittenToDisk :: !(StrictTMVar m Bool)
-- ^ Used for the 'blockWrittenToDisk' field of 'AddBlockPromise'.
, varBlockProcessed :: !(StrictTMVar m (Point blk))
-- ^ Used for the 'blockProcessed' field of 'AddBlockPromise'.
}
deriving NoThunks via OnlyCheckWhnfNamed "BlockToAdd" (BlockToAdd m blk)

-- | Create a new 'BlocksToAdd' with the given size.
newBlocksToAdd :: IOLike m => Word -> m (BlocksToAdd m blk)
newBlocksToAdd queueSize = BlocksToAdd <$>
atomically (newTBQueue (fromIntegral queueSize))
newBlocksToAdd :: (IOLike m, HasHeader blk) => Word -> m (BlocksToAdd m blk)
newBlocksToAdd queueCapacity = do
varQueue <- newTVarIO mempty
return BlocksToAdd {
blocksToAddQueue = varQueue
, blocksToAddCapacity = queueCapacity
}

-- | Add a block to the 'BlocksToAdd' queue. Can block when the queue is full.
addBlockToAdd
Expand All @@ -449,7 +470,7 @@ addBlockToAdd
-> BlocksToAdd m blk
-> blk
-> m (AddBlockPromise m blk)
addBlockToAdd tracer (BlocksToAdd queue) blk = do
addBlockToAdd tracer (BlocksToAdd varQueue queueCapacity) blk = do
varBlockWrittenToDisk <- newEmptyTMVarIO
varBlockProcessed <- newEmptyTMVarIO
let !toAdd = BlockToAdd
Expand All @@ -458,8 +479,12 @@ addBlockToAdd tracer (BlocksToAdd queue) blk = do
, varBlockProcessed
}
queueSize <- atomically $ do
writeTBQueue queue toAdd
lengthTBQueue queue
queue <- readTVar varQueue
let queue' = Map.insert (blockRealPoint blk) toAdd queue
queueSize = Map.size queue'
check (fromIntegral queueSize <= queueCapacity)
writeTVar varQueue queue'
return queueSize
traceWith tracer $
AddedBlockToQueue (blockRealPoint blk) (fromIntegral queueSize)
return AddBlockPromise
Expand All @@ -470,7 +495,33 @@ addBlockToAdd tracer (BlocksToAdd queue) blk = do
-- | Get the oldest block from the 'BlocksToAdd' queue. Can block when the
-- queue is empty.
getBlockToAdd :: IOLike m => BlocksToAdd m blk -> m (BlockToAdd m blk)
getBlockToAdd (BlocksToAdd queue) = atomically $ readTBQueue queue
getBlockToAdd (BlocksToAdd varQueue _) = atomically $ do
queue <- readTVar varQueue
case Map.minView queue of
Nothing -> retry
Just (toProcess, queue') -> do
writeTVar varQueue queue'
return toProcess

-- | Return a function to test the membership for the given 'BlocksToAdd'.
memberBlocksToAdd ::
(IOLike m, HasHeader blk)
=> BlocksToAdd m blk
-> STM m (RealPoint blk -> Bool)
memberBlocksToAdd (BlocksToAdd varQueue _) = flip Map.member <$> readTVar varQueue

getBlocksToAddMaxSlotNo ::
IOLike m
=> BlocksToAdd m blk
-> STM m MaxSlotNo
getBlocksToAddMaxSlotNo (BlocksToAdd varQueue _) = aux <$> readTVar varQueue
where
-- | The 'Ord' instance of 'RealPoint' orders by 'SlotNo' first, so the
-- maximal key of the map has the greatest 'SlotNo'.
aux :: Map (RealPoint blk) (BlockToAdd m blk) -> MaxSlotNo
aux queue = case Map.lookupMax queue of
Nothing -> NoMaxSlotNo
Just (RealPoint s _, _) -> MaxSlotNo s

{-------------------------------------------------------------------------------
Trace types
Expand Down

0 comments on commit aa1413e

Please sign in to comment.