Skip to content

Commit

Permalink
WIP: add cdbChainWithTime
Browse files Browse the repository at this point in the history
  • Loading branch information
nfrisby committed Dec 10, 2024
1 parent b08aa97 commit f40c927
Show file tree
Hide file tree
Showing 13 changed files with 146 additions and 38 deletions.
9 changes: 0 additions & 9 deletions cabal.project
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,3 @@ package ouroboros-network
if(os(windows))
constraints:
bitvec -simd

source-repository-package
type: git
location: https://github.com/IntersectMBO/ouroboros-network
tag: 947f9b8ad41775c9488127189216b76aaab3108a
subdir:
ouroboros-network-api
ouroboros-network
--sha256: sha256-tCmNw5L8w0zqizks7Fa8wlBbQYL4/nsoKDAWpbV+Qvw=
Original file line number Diff line number Diff line change
Expand Up @@ -250,8 +250,8 @@ initNodeKernel args@NodeKernelArgs { registry, cfg, tracers
, GSM.equivalent = (==) `on` (AF.headPoint . fst)
, GSM.getChainSyncStates = fmap cschState <$> readTVar varChainSyncHandles
, GSM.getCurrentSelection = do
headers <- ChainDB.getCurrentChain chainDB
extLedgerState <- ChainDB.getCurrentLedger chainDB
headers <- ChainDB.getCurrentChainWithTime chainDB
extLedgerState <- ChainDB.getCurrentLedger chainDB
return (headers, ledgerState extLedgerState)
, GSM.minCaughtUpDuration = gsmMinCaughtUpDuration
, GSM.setCaughtUpPersistentMark = \upd ->
Expand Down Expand Up @@ -349,7 +349,7 @@ data InternalState m addrNTN addrNTC blk = IS {
, registry :: ResourceRegistry m
, btime :: BlockchainTime m
, chainDB :: ChainDB m blk
, blockFetchInterface :: BlockFetchConsensusInterface (ConnectionId addrNTN) (Header blk) (HeaderWithTime blk) blk m
, blockFetchInterface :: BlockFetchConsensusInterface (ConnectionId addrNTN) (HeaderWithTime blk) blk m
, fetchClientRegistry :: FetchClientRegistry (ConnectionId addrNTN) (HeaderWithTime blk) blk m
, varChainSyncHandles :: StrictTVar m (Map (ConnectionId addrNTN) (ChainSyncClientHandle m blk))
, varGsmState :: StrictTVar m GSM.GsmState
Expand Down Expand Up @@ -397,7 +397,7 @@ initInternalState NodeKernelArgs { tracers, chainDB, registry, cfg
(ChainDB.getCurrentChain chainDB)
getUseBootstrapPeers
(GSM.gsmStateToLedgerJudgement <$> readTVar varGsmState)
blockFetchInterface :: BlockFetchConsensusInterface (ConnectionId addrNTN) (Header blk) (HeaderWithTime blk) blk m
blockFetchInterface :: BlockFetchConsensusInterface (ConnectionId addrNTN) (HeaderWithTime blk) blk m
blockFetchInterface = BlockFetchClientInterface.mkBlockFetchConsensusInterface
(configBlock cfg)
(BlockFetchClientInterface.defaultChainDbView chainDB)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ module Ouroboros.Consensus.Fragment.Diff (
-- * Application
, apply
-- * Manipulation
, Ouroboros.Consensus.Fragment.Diff.map
, append
, mapM
, takeWhileOldest
Expand Down Expand Up @@ -166,6 +167,18 @@ takeWhileOldest ::
takeWhileOldest accept (ChainDiff nbRollback suffix) =
ChainDiff nbRollback (AF.takeWhileOldest accept suffix)

map ::
forall a b.
( HasHeader b
, HeaderHash a ~ HeaderHash b
)
=> (a -> b)
-> ChainDiff a
-> ChainDiff b
map f (ChainDiff rollback suffix) =
ChainDiff rollback
$ AF.mapAnchoredFragment f suffix

mapM ::
forall a b m.
( HasHeader b
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ module Ouroboros.Consensus.HeaderValidation (
, Ticked (..)
-- * Header with time
, HeaderWithTime (..)
, mkHeaderWithTime
) where

import Cardano.Binary (enforceSize)
Expand All @@ -79,6 +80,10 @@ import NoThunks.Class (NoThunks)
import Ouroboros.Consensus.Block
import Ouroboros.Consensus.BlockchainTime (RelativeTime)
import Ouroboros.Consensus.Config
import Ouroboros.Consensus.HardFork.Abstract
(HasHardForkHistory (hardForkSummary))
import qualified Ouroboros.Consensus.HardFork.History.Qry as Qry
import Ouroboros.Consensus.Ledger.Basics
import Ouroboros.Consensus.Protocol.Abstract
import Ouroboros.Consensus.Ticked
import Ouroboros.Consensus.Util (whenJust)
Expand Down Expand Up @@ -544,6 +549,31 @@ instance (Typeable blk, HasHeader (Header blk), Show (HeaderHash blk))
instance HasHeader (Header blk) => GetHeader (HeaderWithTime blk) blk where
getHeader = hwtHeader

-- | Convert 'Header' to 'HeaderWithTime'
--
-- PREREQ: The given ledger must be able to translate the slot of the given
-- header.
--
-- This is INLINEed since the summary can usually be reused.
mkHeaderWithTime ::
( HasHardForkHistory blk
, HasHeader (Header blk)
)
=> LedgerConfig blk
-> LedgerState blk
-> Header blk
-> HeaderWithTime blk
{-# INLINE mkHeaderWithTime #-}
mkHeaderWithTime cfg lst = \hdr ->
let summary = hardForkSummary cfg lst
slot = fromWithOrigin 0 $ pointSlot $ headerPoint hdr
qry = Qry.slotToWallclock slot
(slotTime, _) = Qry.runQueryPure qry summary
in HeaderWithTime {
hwtHeader = hdr
, hwtSlotRelativeTime = slotTime
}

{-------------------------------------------------------------------------------
Serialisation
-------------------------------------------------------------------------------}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,17 @@ import Ouroboros.Network.SizeInBytes

-- | Abstract over the ChainDB
data ChainDbView m blk = ChainDbView {
getCurrentChain :: STM m (AnchoredFragment (Header blk))
getCurrentChain :: STM m (AnchoredFragment (Header blk))
, getCurrentChainWithTime :: STM m (AnchoredFragment (HeaderWithTime blk))
, getIsFetched :: STM m (Point blk -> Bool)
, getMaxSlotNo :: STM m MaxSlotNo
, addBlockWaitWrittenToDisk :: InvalidBlockPunishment m -> blk -> m Bool
}

defaultChainDbView :: IOLike m => ChainDB m blk -> ChainDbView m blk
defaultChainDbView chainDB = ChainDbView {
getCurrentChain = ChainDB.getCurrentChain chainDB
getCurrentChain = ChainDB.getCurrentChain chainDB
, getCurrentChainWithTime = ChainDB.getCurrentChainWithTime chainDB
, getIsFetched = ChainDB.getIsFetched chainDB
, getMaxSlotNo = ChainDB.getMaxSlotNo chainDB
, addBlockWaitWrittenToDisk = ChainDB.addBlockWaitWrittenToDisk chainDB
Expand Down Expand Up @@ -182,7 +184,7 @@ mkBlockFetchConsensusInterface ::
-- ^ Slot forge time, see 'headerForgeUTCTime' and 'blockForgeUTCTime'.
-> STM m FetchMode
-- ^ See 'readFetchMode'.
-> BlockFetchConsensusInterface peer (Header blk) (HeaderWithTime blk) blk m
-> BlockFetchConsensusInterface peer (HeaderWithTime blk) blk m
mkBlockFetchConsensusInterface
bcfg chainDB getCandidates blockFetchSize slotForgeTime readFetchMode =
BlockFetchConsensusInterface {
Expand All @@ -206,8 +208,8 @@ mkBlockFetchConsensusInterface
readCandidateChains :: STM m (Map peer (AnchoredFragment (HeaderWithTime blk)))
readCandidateChains = getCandidates

readCurrentChain :: STM m (AnchoredFragment (Header blk))
readCurrentChain = getCurrentChain chainDB
readCurrentChain :: STM m (AnchoredFragment (HeaderWithTime blk))
readCurrentChain = getCurrentChainWithTime chainDB

readFetchedBlocks :: STM m (Point blk -> Bool)
readFetchedBlocks = getIsFetched chainDB
Expand Down Expand Up @@ -287,7 +289,7 @@ mkBlockFetchConsensusInterface
-- fragment, by the time the block fetch download logic considers the
-- fragment, our current chain might have changed.
plausibleCandidateChain :: HasCallStack
=> AnchoredFragment (Header blk)
=> AnchoredFragment (HeaderWithTime blk)
-> AnchoredFragment (HeaderWithTime blk)
-> Bool
plausibleCandidateChain ours cand
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,14 @@ data ChainDB m blk = ChainDB {
-- fragment will move as the chain grows.
, getCurrentChain :: STM m (AnchoredFragment (Header blk))

-- | Exact same as 'getCurrentChain', except each header is annotated
-- with the 'RelativeTime' of the onset of its slot (translated according
-- to the chain it is on)
--
-- INVARIANT @'hwtHeader' <$> 'getCurrentChainWithTime' = 'getCurrentChain'@
, getCurrentChainWithTime
:: STM m (AnchoredFragment (HeaderWithTime blk))

-- | Return the LedgerDB containing the last @k@ ledger states.
, getLedgerDB :: STM m (LedgerDB' blk)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,11 @@ import qualified Data.Map.Strict as Map
import Data.Maybe.Strict (StrictMaybe (..))
import GHC.Stack (HasCallStack)
import Ouroboros.Consensus.Block
import Ouroboros.Consensus.Config
import qualified Ouroboros.Consensus.Fragment.Validated as VF
import Ouroboros.Consensus.HardFork.Abstract
import Ouroboros.Consensus.HeaderValidation (mkHeaderWithTime)
import Ouroboros.Consensus.Ledger.Extended (ledgerState)
import Ouroboros.Consensus.Ledger.Inspect
import Ouroboros.Consensus.Ledger.SupportsProtocol
import Ouroboros.Consensus.Storage.ChainDB.API (ChainDB)
Expand Down Expand Up @@ -165,8 +168,21 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do
let chain = VF.validatedFragment chainAndLedger
ledger = VF.validatedLedger chainAndLedger

lcfg = configLedger (Args.cdbsTopLevelConfig cdbSpecificArgs)

-- the tip ledger state can translate the slots of the volatile
-- headers
chainWithTime =
AF.mapAnchoredFragment
(mkHeaderWithTime
lcfg
(ledgerState (LgrDB.ledgerDbCurrent ledger))
)
chain

atomically $ LgrDB.setCurrent lgrDB ledger
varChain <- newTVarIO chain
varChainWithTime <- newTVarIO chainWithTime
varTentativeState <- newTVarIO $ initialTentativeHeaderState (Proxy @blk)
varTentativeHeader <- newTVarIO SNothing
varIterators <- newTVarIO Map.empty
Expand All @@ -182,6 +198,7 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do
, cdbVolatileDB = volatileDB
, cdbLgrDB = lgrDB
, cdbChain = varChain
, cdbChainWithTime = varChainWithTime
, cdbTentativeState = varTentativeState
, cdbTentativeHeader = varTentativeHeader
, cdbIterators = varIterators
Expand All @@ -207,6 +224,8 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do
{ addBlockAsync = getEnv2 h ChainSel.addBlockAsync
, chainSelAsync = getEnv h ChainSel.triggerChainSelectionAsync
, getCurrentChain = getEnvSTM h Query.getCurrentChain
, getCurrentChainWithTime
= getEnvSTM h Query.getCurrentChainWithTime
, getLedgerDB = getEnvSTM h Query.getLedgerDB
, getHeaderStateHistory = getEnvSTM h Query.getHeaderStateHistory
, getTipBlock = getEnv h Query.getTipBlock
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,12 +178,15 @@ copyToImmutableDB CDB{..} = electric $ do
removeFromChain :: Point blk -> STM m ()
removeFromChain pt = do
-- The chain might have been extended in the meantime.
curChain <- readTVar cdbChain
case curChain of
hdr :< curChain'
curChain <- readTVar cdbChain
curChainWithTime <- readTVar cdbChainWithTime
case (curChain, curChainWithTime) of
(hdr :< curChain', _hwt :< curChainWithTime')
| headerPoint hdr == pt
-> writeTVar cdbChain curChain'
-- We're the only one removing things from 'curChain', so this cannot
-> do
writeTVar cdbChain curChain'
writeTVar cdbChainWithTime curChainWithTime'
-- We're the only one removing things from 'cdbChain', so this cannot
-- happen if the precondition was satisfied.
_ -> error "header to remove not on the current chain"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ import Ouroboros.Consensus.Fragment.ValidatedDiff
import qualified Ouroboros.Consensus.Fragment.ValidatedDiff as ValidatedDiff
import Ouroboros.Consensus.HardFork.Abstract
import qualified Ouroboros.Consensus.HardFork.History as History
import Ouroboros.Consensus.HeaderValidation (HeaderWithTime (..))
import Ouroboros.Consensus.HeaderValidation (HeaderWithTime (..),
mkHeaderWithTime)
import Ouroboros.Consensus.Ledger.Abstract
import Ouroboros.Consensus.Ledger.Extended
import Ouroboros.Consensus.Ledger.Inspect
Expand Down Expand Up @@ -880,14 +881,32 @@ chainSelectionForBlock cdb@CDB{..} blockCache hdr punish = electric $ do
$ getSuffix
$ getChainDiff vChainDiff
(curChain, newChain, events, prevTentativeHeader) <- atomically $ do
curChain <- readTVar cdbChain -- Not Query.getCurrentChain!
curLedger <- LgrDB.getCurrent cdbLgrDB
curChain <- readTVar cdbChain
-- Not Query.getCurrentChain!
curChainWithTime <- readTVar cdbChainWithTime
curLedger <- LgrDB.getCurrent cdbLgrDB
case Diff.apply curChain chainDiff of
-- Impossible, as described in the docstring
Nothing ->
error "chainDiff doesn't fit onto current chain"
Just newChain -> do
writeTVar cdbChain newChain
let lcfg = configLedger cdbTopLevelConfig
diffWithTime =
-- the new ledger state can translate the slots of the new
-- headers
Diff.map
(mkHeaderWithTime
lcfg
(ledgerState (LgrDB.ledgerDbCurrent newLedger))
)
chainDiff
newChainWithTime =
case Diff.apply curChainWithTime diffWithTime of
Nothing -> error "chainDiff failed for HeaderWithTime"
Just x -> x

writeTVar cdbChain newChain
writeTVar cdbChainWithTime newChainWithTime
LgrDB.setCurrent cdbLgrDB newLedger

-- Inspect the new ledger for potential problems
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl.Query (
-- * Queries
getBlockComponent
, getCurrentChain
, getCurrentChainWithTime
, getHeaderStateHistory
, getIsFetched
, getIsInvalidBlock
Expand All @@ -31,7 +32,8 @@ import Ouroboros.Consensus.Config
import Ouroboros.Consensus.HardFork.Abstract (HasHardForkHistory (..))
import Ouroboros.Consensus.HeaderStateHistory
(HeaderStateHistory (..), mkHeaderStateWithTimeFromSummary)
import Ouroboros.Consensus.HeaderValidation (HasAnnTip)
import Ouroboros.Consensus.HeaderValidation (HasAnnTip,
HeaderWithTime)
import Ouroboros.Consensus.Ledger.Abstract (IsLedger, LedgerState)
import Ouroboros.Consensus.Ledger.Extended
import Ouroboros.Consensus.Protocol.Abstract
Expand Down Expand Up @@ -79,6 +81,20 @@ getCurrentChain CDB{..} =
where
SecurityParam k = configSecurityParam cdbTopLevelConfig

-- | Same as 'getCurrentChain', /mutatis mutandi/.
getCurrentChainWithTime ::
forall m blk.
( IOLike m
, HasHeader (HeaderWithTime blk)
, ConsensusProtocol (BlockProtocol blk)
)
=> ChainDbEnv m blk
-> STM m (AnchoredFragment (HeaderWithTime blk))
getCurrentChainWithTime CDB{..} =
AF.anchorNewest k <$> readTVar cdbChainWithTime
where
SecurityParam k = configSecurityParam cdbTopLevelConfig

getLedgerDB ::
IOLike m
=> ChainDbEnv m blk -> STM m (LgrDB.LedgerDB' blk)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,15 @@ data ChainDbEnv m blk = CDB
--
-- Note that the \"immutable\" block will /never/ be /more/ than @k@
-- blocks back, as opposed to the anchor point of 'cdbChain'.
, cdbChainWithTime :: !(StrictTVar m (AnchoredFragment (HeaderWithTime blk)))
-- ^ INVARIANT @fmap 'hwtHeader' 'cdbChainWithTime' = 'chbChain'@
--
-- This mutable variable is maintained separately --- but exactly in
-- parallel --- for performance reasons and modularity reasons, trading a
-- few thousand pointers to avoid extra allocation per use, more granular
-- interfaces (notably
-- 'Ouroboros.Network.BlockFetch.ConsensusInterface.BlockFetchConsensusInterface'),
-- etc.
, cdbTentativeState :: !(StrictTVar m (TentativeHeaderState blk))
, cdbTentativeHeader :: !(StrictTVar m (StrictMaybe (Header blk)))
-- ^ The tentative header, for diffusion pipelining.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,18 +133,15 @@ compareAnchoredFragments cfg frag1 frag2 =
-- from our tip, although the exact distance does not matter for
-- 'compareAnchoredFragments').
preferAnchoredCandidate ::
forall blk t t'.
forall h blk.
( BlockSupportsProtocol blk
, HasCallStack
, GetHeader (t blk) blk
, HasHeader (t blk)
, GetHeader (t' blk) blk
, HasHeader (t' blk)
, HeaderHash (t blk) ~ HeaderHash (t' blk)
, GetHeader (h blk) blk
, HasHeader (h blk)
)
=> BlockConfig blk
-> AnchoredFragment (t blk) -- ^ Our chain
-> AnchoredFragment (t' blk) -- ^ Candidate
-> AnchoredFragment (h blk) -- ^ Our chain
-> AnchoredFragment (h blk) -- ^ Candidate
-> Bool
preferAnchoredCandidate cfg ours cand =
assertWithMsg (precondition ours cand) $
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ runBlockFetchTest BlockFetchClientTestSetup{..} = withRegistry \registry -> do
let -- Always return the empty chain such that the BlockFetch logic
-- downloads all chains.
getCurrentChain = pure $ AF.Empty AF.AnchorGenesis
getCurrentChainWithTime = pure $ AF.Empty AF.AnchorGenesis
getIsFetched = ChainDB.getIsFetched chainDB
getMaxSlotNo = ChainDB.getMaxSlotNo chainDB
addBlockWaitWrittenToDisk = ChainDB.addBlockWaitWrittenToDisk chainDB
Expand All @@ -280,7 +281,7 @@ runBlockFetchTest BlockFetchClientTestSetup{..} = withRegistry \registry -> do
mkTestBlockFetchConsensusInterface ::
STM m (Map PeerId (AnchoredFragment (HeaderWithTime TestBlock)))
-> BlockFetchClientInterface.ChainDbView m TestBlock
-> BlockFetchConsensusInterface PeerId (Header TestBlock) (HeaderWithTime TestBlock) TestBlock m
-> BlockFetchConsensusInterface PeerId (HeaderWithTime TestBlock) TestBlock m
mkTestBlockFetchConsensusInterface getCandidates chainDbView =
BlockFetchClientInterface.mkBlockFetchConsensusInterface
(TestBlockConfig numCoreNodes)
Expand Down

0 comments on commit f40c927

Please sign in to comment.