From aa1413e33c63a099d1e710f02782a4064d028cc9 Mon Sep 17 00:00:00 2001 From: Thomas Winant <thomas@well-typed.com> Date: Mon, 2 Nov 2020 11:30:37 +0100 Subject: [PATCH] ChainDB: let the BlockFetch client add blocks asynchronously Fixes #2487. Currently, the effective queue size when adding blocks to the ChainDB is 1 (for why, see #2487). In this commit, we let the BlockFetch client add blocks fully asynchronously to the ChainDB, which restores the effective queue size to the configured value again, e.g., 10. The BlockFetch client will no longer wait until the block has been written to the VolatileDB (and thus also not until the block has been processed by chain selection). The BlockFetch client can just hand over the block and continue downloading with minimum delay. To make this possible, we change the behaviour of `getIsFetched` and `getMaxSlotNo` to account for the blocks in the queue, otherwise the BlockFetch client might try to redownload already-fetched blocks. This is an alternative to #2489, which let the BlockFetch client write blocks to the VolatileDB synchronously. The problem with that approach is that multiple threads are writing to the VolatileDB, instead of a single background thread. We have relied on the latter to simplify the VolatileDB w.r.t. consistency after incomplete writes. --- .../src/Ouroboros/Consensus/NodeKernel.hs | 6 +- .../Storage/ChainDB/Impl/ChainSel.hs | 11 ++- .../Consensus/Storage/ChainDB/Impl/Query.hs | 30 +++---- .../Consensus/Storage/ChainDB/Impl/Types.hs | 83 +++++++++++++++---- 4 files changed, 90 insertions(+), 40 deletions(-) diff --git a/ouroboros-consensus/src/Ouroboros/Consensus/NodeKernel.hs b/ouroboros-consensus/src/Ouroboros/Consensus/NodeKernel.hs index eee49e55632..ab05160ac36 100644 --- a/ouroboros-consensus/src/Ouroboros/Consensus/NodeKernel.hs +++ b/ouroboros-consensus/src/Ouroboros/Consensus/NodeKernel.hs @@ -262,10 +262,10 @@ initBlockFetchConsensusInterface cfg chainDB getCandidates blockFetchSize btime readFetchedBlocks :: STM m (Point blk -> Bool) readFetchedBlocks = ChainDB.getIsFetched chainDB - -- Waits until the block has been written to disk, but not until chain - -- selection has processed the block. + -- Hand over the block to the ChainDB, but don't wait until it has been + -- written to disk or processed. addFetchedBlock :: Point blk -> blk -> m () - addFetchedBlock _pt = void . ChainDB.addBlockWaitWrittenToDisk chainDB + addFetchedBlock _pt = void . ChainDB.addBlockAsync chainDB readFetchedMaxSlotNo :: STM m MaxSlotNo readFetchedMaxSlotNo = ChainDB.getMaxSlotNo chainDB diff --git a/ouroboros-consensus/src/Ouroboros/Consensus/Storage/ChainDB/Impl/ChainSel.hs b/ouroboros-consensus/src/Ouroboros/Consensus/Storage/ChainDB/Impl/ChainSel.hs index c383755c38d..188015ec881 100644 --- a/ouroboros-consensus/src/Ouroboros/Consensus/Storage/ChainDB/Impl/ChainSel.hs +++ b/ouroboros-consensus/src/Ouroboros/Consensus/Storage/ChainDB/Impl/ChainSel.hs @@ -253,12 +253,11 @@ addBlockSync cdb@CDB {..} BlockToAdd { blockToAdd = b, .. } = do let immBlockNo = AF.anchorBlockNo curChain -- We follow the steps from section "## Adding a block" in ChainDB.md - - -- Note: we call 'chainSelectionForFutureBlocks' in all branches instead - -- of once, before branching, because we want to do it /after/ writing the - -- block to the VolatileDB and delivering the 'varBlockWrittenToDisk' - -- promise, as this is the promise the BlockFetch client waits for. - -- Otherwise, the BlockFetch client would have to wait for + -- + -- Note: we call 'chainSelectionForFutureBlocks' in all branches instead of + -- once, before branching, because we want to do it /after/ writing the + -- block to the VolatileDB so that any threads waiting on the + -- 'varBlockWrittenToDisk' promise don't have to wait for the result of -- 'chainSelectionForFutureBlocks'. -- ### Ignore diff --git a/ouroboros-consensus/src/Ouroboros/Consensus/Storage/ChainDB/Impl/Query.hs b/ouroboros-consensus/src/Ouroboros/Consensus/Storage/ChainDB/Impl/Query.hs index 403dff098b4..9520eaac233 100644 --- a/ouroboros-consensus/src/Ouroboros/Consensus/Storage/ChainDB/Impl/Query.hs +++ b/ouroboros-consensus/src/Ouroboros/Consensus/Storage/ChainDB/Impl/Query.hs @@ -146,18 +146,15 @@ getBlockComponent :: getBlockComponent CDB{..} = getAnyBlockComponent cdbImmutableDB cdbVolatileDB getIsFetched :: - forall m blk. IOLike m + forall m blk. (IOLike m, HasHeader blk) => ChainDbEnv m blk -> STM m (Point blk -> Bool) -getIsFetched CDB{..} = basedOnHash <$> VolatileDB.getIsMember cdbVolatileDB - where - -- The volatile DB indexes by hash only, not by points. However, it should - -- not be possible to have two points with the same hash but different - -- slot numbers. - basedOnHash :: (HeaderHash blk -> Bool) -> Point blk -> Bool - basedOnHash f p = - case pointHash p of - BlockHash hash -> f hash - GenesisHash -> False +getIsFetched CDB{..} = do + checkBlocksToAdd <- memberBlocksToAdd cdbBlocksToAdd + checkVolDb <- VolatileDB.getIsMember cdbVolatileDB + return $ \pt -> + case pointToWithOriginRealPoint pt of + Origin -> False + NotOrigin pt' -> checkBlocksToAdd pt' || checkVolDb (realPointHash pt') getIsInvalidBlock :: forall m blk. (IOLike m, HasHeader blk) @@ -194,10 +191,13 @@ getMaxSlotNo CDB{..} = do -- contains block 9'. The ImmutableDB contains blocks 1-10. The max slot -- of the current chain will be 10 (being the anchor point of the empty -- current chain), while the max slot of the VolatileDB will be 9. - curChainMaxSlotNo <- maxSlotNoFromWithOrigin . AF.headSlot - <$> readTVar cdbChain - volatileDbMaxSlotNo <- VolatileDB.getMaxSlotNo cdbVolatileDB - return $ curChainMaxSlotNo `max` volatileDbMaxSlotNo + -- + -- Moreover, we have to look in 'BlocksToAdd' too. + curChainMaxSlotNo <- + maxSlotNoFromWithOrigin . AF.headSlot <$> readTVar cdbChain + volatileDbMaxSlotNo <- VolatileDB.getMaxSlotNo cdbVolatileDB + blocksToAddMaxSlotNo <- getBlocksToAddMaxSlotNo cdbBlocksToAdd + return $ curChainMaxSlotNo `max` volatileDbMaxSlotNo `max` blocksToAddMaxSlotNo {------------------------------------------------------------------------------- Unifying interface over the immutable DB and volatile DB, but independent diff --git a/ouroboros-consensus/src/Ouroboros/Consensus/Storage/ChainDB/Impl/Types.hs b/ouroboros-consensus/src/Ouroboros/Consensus/Storage/ChainDB/Impl/Types.hs index 50c5cf9c6ce..d18df50b627 100644 --- a/ouroboros-consensus/src/Ouroboros/Consensus/Storage/ChainDB/Impl/Types.hs +++ b/ouroboros-consensus/src/Ouroboros/Consensus/Storage/ChainDB/Impl/Types.hs @@ -41,11 +41,13 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl.Types ( -- * Future blocks , FutureBlocks -- * Blocks to add - , BlocksToAdd + , BlocksToAdd -- opaque , BlockToAdd (..) , newBlocksToAdd , addBlockToAdd , getBlockToAdd + , memberBlocksToAdd + , getBlocksToAddMaxSlotNo -- * Trace types , TraceEvent (..) , NewTipInfo (..) @@ -61,6 +63,7 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl.Types ( import Control.Tracer import Data.Map.Strict (Map) +import qualified Data.Map.Strict as Map import Data.Typeable import Data.Void (Void) import Data.Word (Word64) @@ -70,6 +73,7 @@ import NoThunks.Class (OnlyCheckWhnfNamed (..)) import Control.Monad.Class.MonadSTM.Strict (newEmptyTMVarIO) import Ouroboros.Network.AnchoredFragment (AnchoredFragment) +import Ouroboros.Network.Block (MaxSlotNo (..)) import Ouroboros.Consensus.Block import Ouroboros.Consensus.Config @@ -424,23 +428,40 @@ type FutureBlocks blk = Map (HeaderHash blk) (Header blk) -- | FIFO queue used to add blocks asynchronously to the ChainDB. Blocks are -- read from this queue by a background thread, which processes the blocks -- synchronously. -newtype BlocksToAdd m blk = BlocksToAdd (TBQueue m (BlockToAdd m blk)) +data BlocksToAdd m blk = BlocksToAdd { + -- TODO use a better data structure, e.g., a heap from the @heaps@ + -- package. Wish list: + -- + O(1) pop min value + -- + O(log n) insert + -- + O(n) get all + -- + Bounded in size + -- + -- TODO join consecutive blocks into a fragment that can be added at + -- once. + blocksToAddQueue :: !(StrictTVar m (Map (RealPoint blk) (BlockToAdd m blk))) + , blocksToAddCapacity :: !Word + } deriving NoThunks via OnlyCheckWhnfNamed "BlocksToAdd" (BlocksToAdd m blk) -- | Entry in the 'BlocksToAdd' queue: a block together with the 'TMVar's used -- to implement 'AddBlockPromise'. -data BlockToAdd m blk = BlockToAdd - { blockToAdd :: !blk - , varBlockWrittenToDisk :: !(StrictTMVar m Bool) - -- ^ Used for the 'blockWrittenToDisk' field of 'AddBlockPromise'. - , varBlockProcessed :: !(StrictTMVar m (Point blk)) - -- ^ Used for the 'blockProcessed' field of 'AddBlockPromise'. - } +data BlockToAdd m blk = BlockToAdd { + blockToAdd :: !blk + , varBlockWrittenToDisk :: !(StrictTMVar m Bool) + -- ^ Used for the 'blockWrittenToDisk' field of 'AddBlockPromise'. + , varBlockProcessed :: !(StrictTMVar m (Point blk)) + -- ^ Used for the 'blockProcessed' field of 'AddBlockPromise'. + } + deriving NoThunks via OnlyCheckWhnfNamed "BlockToAdd" (BlockToAdd m blk) -- | Create a new 'BlocksToAdd' with the given size. -newBlocksToAdd :: IOLike m => Word -> m (BlocksToAdd m blk) -newBlocksToAdd queueSize = BlocksToAdd <$> - atomically (newTBQueue (fromIntegral queueSize)) +newBlocksToAdd :: (IOLike m, HasHeader blk) => Word -> m (BlocksToAdd m blk) +newBlocksToAdd queueCapacity = do + varQueue <- newTVarIO mempty + return BlocksToAdd { + blocksToAddQueue = varQueue + , blocksToAddCapacity = queueCapacity + } -- | Add a block to the 'BlocksToAdd' queue. Can block when the queue is full. addBlockToAdd @@ -449,7 +470,7 @@ addBlockToAdd -> BlocksToAdd m blk -> blk -> m (AddBlockPromise m blk) -addBlockToAdd tracer (BlocksToAdd queue) blk = do +addBlockToAdd tracer (BlocksToAdd varQueue queueCapacity) blk = do varBlockWrittenToDisk <- newEmptyTMVarIO varBlockProcessed <- newEmptyTMVarIO let !toAdd = BlockToAdd @@ -458,8 +479,12 @@ addBlockToAdd tracer (BlocksToAdd queue) blk = do , varBlockProcessed } queueSize <- atomically $ do - writeTBQueue queue toAdd - lengthTBQueue queue + queue <- readTVar varQueue + let queue' = Map.insert (blockRealPoint blk) toAdd queue + queueSize = Map.size queue' + check (fromIntegral queueSize <= queueCapacity) + writeTVar varQueue queue' + return queueSize traceWith tracer $ AddedBlockToQueue (blockRealPoint blk) (fromIntegral queueSize) return AddBlockPromise @@ -470,7 +495,33 @@ addBlockToAdd tracer (BlocksToAdd queue) blk = do -- | Get the oldest block from the 'BlocksToAdd' queue. Can block when the -- queue is empty. getBlockToAdd :: IOLike m => BlocksToAdd m blk -> m (BlockToAdd m blk) -getBlockToAdd (BlocksToAdd queue) = atomically $ readTBQueue queue +getBlockToAdd (BlocksToAdd varQueue _) = atomically $ do + queue <- readTVar varQueue + case Map.minView queue of + Nothing -> retry + Just (toProcess, queue') -> do + writeTVar varQueue queue' + return toProcess + +-- | Return a function to test the membership for the given 'BlocksToAdd'. +memberBlocksToAdd :: + (IOLike m, HasHeader blk) + => BlocksToAdd m blk + -> STM m (RealPoint blk -> Bool) +memberBlocksToAdd (BlocksToAdd varQueue _) = flip Map.member <$> readTVar varQueue + +getBlocksToAddMaxSlotNo :: + IOLike m + => BlocksToAdd m blk + -> STM m MaxSlotNo +getBlocksToAddMaxSlotNo (BlocksToAdd varQueue _) = aux <$> readTVar varQueue + where + -- | The 'Ord' instance of 'RealPoint' orders by 'SlotNo' first, so the + -- maximal key of the map has the greatest 'SlotNo'. + aux :: Map (RealPoint blk) (BlockToAdd m blk) -> MaxSlotNo + aux queue = case Map.lookupMax queue of + Nothing -> NoMaxSlotNo + Just (RealPoint s _, _) -> MaxSlotNo s {------------------------------------------------------------------------------- Trace types