Skip to content

Commit

Permalink
Change how last starvation is recorded
Browse files Browse the repository at this point in the history
Previously, we only recorded the time at which starvation started.
This is in fact not enough: if, after the grace period, the peer still
has not ended our starvation, we won't detect it. Instead, we now record
the last starvation as either ongoing or ended, together with its end time.
  • Loading branch information
Niols committed Jun 28, 2024
1 parent 33e98b5 commit bab4a6a
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ import Ouroboros.Network.AnchoredFragment (AnchoredFragment,
headPoint)
import qualified Ouroboros.Network.AnchoredFragment as AF
import Ouroboros.Network.Block (SlotNo (SlotNo), Tip, castPoint)
import Ouroboros.Network.BlockFetch.ConsensusInterface
(ChainSelStarvation (..))
import Test.Consensus.PointSchedule.NodeState (NodeState)
import Test.Consensus.PointSchedule.Peers (Peer (Peer), PeerId)
import Test.Util.TersePrinting (terseAnchor, terseBlock,
Expand Down Expand Up @@ -369,6 +371,8 @@ traceChainDBEventTestBlockWith tracer = \case
AddedReprocessLoEBlocksToQueue ->
trace $ "Requested ChainSel run"
_ -> pure ()
ChainDB.TraceChainSelStarvation ChainSelStarvationOngoing -> trace "ChainSel starved"
ChainDB.TraceChainSelStarvation (ChainSelStarvationEndedAt time) -> trace $ "ChainSel starvation ended at " ++ prettyTime time
_ -> pure ()
where
trace = traceUnitWith tracer "ChainDB"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ import Ouroboros.Network.AnchoredFragment (AnchoredFragment)
import qualified Ouroboros.Network.AnchoredFragment as AF
import Ouroboros.Network.Block (MaxSlotNo)
import Ouroboros.Network.BlockFetch.ConsensusInterface
(BlockFetchConsensusInterface (..), FetchMode (..),
(BlockFetchConsensusInterface (..),
ChainSelStarvation (..), FetchMode (..),
FromConsensus (..), WhetherReceivingTentativeBlocks (..))
import Ouroboros.Network.PeerSelection.Bootstrap (UseBootstrapPeers,
requiresBootstrapPeers)
Expand All @@ -56,7 +57,7 @@ data ChainDbView m blk = ChainDbView {
, getIsFetched :: STM m (Point blk -> Bool)
, getMaxSlotNo :: STM m MaxSlotNo
, addBlockWaitWrittenToDisk :: InvalidBlockPunishment m -> blk -> m Bool
, getLastTimeStarved :: STM m Time
, getChainSelStarvation :: STM m ChainSelStarvation
}

defaultChainDbView :: IOLike m => ChainDB m blk -> ChainDbView m blk
Expand All @@ -65,7 +66,7 @@ defaultChainDbView chainDB = ChainDbView {
, getIsFetched = ChainDB.getIsFetched chainDB
, getMaxSlotNo = ChainDB.getMaxSlotNo chainDB
, addBlockWaitWrittenToDisk = ChainDB.addBlockWaitWrittenToDisk chainDB
, getLastTimeStarved = ChainDB.getLastTimeStarved chainDB
, getChainSelStarvation = ChainDB.getChainSelStarvation chainDB
}

-- | How to get the wall-clock time of a slot. Note that this is a very
Expand Down Expand Up @@ -351,7 +352,7 @@ mkBlockFetchConsensusInterface
headerForgeUTCTime = slotForgeTime . headerRealPoint . unFromConsensus
blockForgeUTCTime = slotForgeTime . blockRealPoint . unFromConsensus

lastChainSelStarvation = getLastTimeStarved chainDB
readChainSelStarvation = getChainSelStarvation chainDB

demoteCSJDynamo :: peer -> m ()
demoteCSJDynamo = void . atomically . Jumping.rotateDynamo csHandlesCol
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ import qualified Ouroboros.Network.AnchoredFragment as AF
import Ouroboros.Network.Block (ChainUpdate, MaxSlotNo,
Serialised (..))
import qualified Ouroboros.Network.Block as Network
import Ouroboros.Network.BlockFetch.ConsensusInterface
(ChainSelStarvation (..))
import Ouroboros.Network.Mock.Chain (Chain (..))
import qualified Ouroboros.Network.Mock.Chain as Chain
import System.FS.API.Types (FsError)
Expand Down Expand Up @@ -334,9 +336,9 @@ data ChainDB m blk = ChainDB {
-- invalid block is detected. These blocks are likely to be valid.
, getIsInvalidBlock :: STM m (WithFingerprint (HeaderHash blk -> Maybe (InvalidBlockReason blk)))

-- | The last time we starved the ChainSel thread. This is used by the
-- BlockFetch decision logic to demote peers.
, getLastTimeStarved :: STM m Time
-- | Whether ChainSel is currently starved, or the last time at which it
-- stopped being starved.
, getChainSelStarvation :: STM m ChainSelStarvation

, closeDB :: m ()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ import Ouroboros.Consensus.Util.ResourceRegistry (WithTempRegistry,
import Ouroboros.Consensus.Util.STM (Fingerprint (..),
WithFingerprint (..))
import qualified Ouroboros.Network.AnchoredFragment as AF
import Ouroboros.Network.BlockFetch.ConsensusInterface
(ChainSelStarvation (..))

{-------------------------------------------------------------------------------
Initialization
Expand Down Expand Up @@ -176,7 +178,7 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do
copyFuse <- newFuse "copy to immutable db"
chainSelFuse <- newFuse "chain selection"
chainSelQueue <- newChainSelQueue (Args.cdbsBlocksToAddSize cdbSpecificArgs)
varLastTimeStarved <- newTVarIO =<< getMonotonicTime
varChainSelStarvation <- newTVarIO ChainSelStarvationOngoing

let env = CDB { cdbImmutableDB = immutableDB
, cdbVolatileDB = volatileDB
Expand All @@ -201,7 +203,7 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do
, cdbChainSelQueue = chainSelQueue
, cdbFutureBlocks = varFutureBlocks
, cdbLoE = Args.cdbsLoE cdbSpecificArgs
, cdbLastTimeStarved = varLastTimeStarved
, cdbChainSelStarvation = varChainSelStarvation
}
h <- fmap CDBHandle $ newTVarIO $ ChainDbOpen env
let chainDB = API.ChainDB
Expand All @@ -219,7 +221,7 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do
, stream = Iterator.stream h
, newFollower = Follower.newFollower h
, getIsInvalidBlock = getEnvSTM h Query.getIsInvalidBlock
, getLastTimeStarved = getEnvSTM h Query.getLastTimeStarved
, getChainSelStarvation = getEnvSTM h Query.getChainSelStarvation
, closeDB = closeDB h
, isOpen = isOpen h
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ addBlockRunner fuse cdb@CDB{..} = forever $ do
-- exception (or it errored), notify the blocked thread
withFuse fuse $
bracketOnError
(lift $ getChainSelMessage (writeTVar cdbLastTimeStarved) cdbChainSelQueue)
(lift $ getChainSelMessage reportChainSelStarvation cdbChainSelQueue)
(\message -> lift $ atomically $ do
case message of
ChainSelReprocessLoEBlocks -> pure ()
Expand All @@ -541,3 +541,7 @@ addBlockRunner fuse cdb@CDB{..} = forever $ do
trace $ PoppedBlockFromQueue $ FallingEdgeWith $
blockRealPoint blockToAdd
chainSelSync cdb message)
where
reportChainSelStarvation s = do
traceWith cdbTracer $ TraceChainSelStarvation s
atomically $ writeTVar cdbChainSelStarvation s
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl.Query (
, getAnyBlockComponent
, getAnyKnownBlock
, getAnyKnownBlockComponent
, getLastTimeStarved
, getChainSelStarvation
) where

import qualified Data.Map.Strict as Map
Expand All @@ -43,6 +43,8 @@ import Ouroboros.Consensus.Util.STM (WithFingerprint (..))
import Ouroboros.Network.AnchoredFragment (AnchoredFragment)
import qualified Ouroboros.Network.AnchoredFragment as AF
import Ouroboros.Network.Block (MaxSlotNo, maxSlotNoFromWithOrigin)
import Ouroboros.Network.BlockFetch.ConsensusInterface
(ChainSelStarvation (..))

-- | Return the last @k@ headers.
--
Expand Down Expand Up @@ -149,8 +151,11 @@ getIsInvalidBlock ::
getIsInvalidBlock CDB{..} =
fmap (fmap (fmap invalidBlockReason) . flip Map.lookup) <$> readTVar cdbInvalid

-- | Read the current contents of the 'cdbLastTimeStarved' variable of the
-- given ChainDB environment.
getLastTimeStarved :: forall m blk. IOLike m => ChainDbEnv m blk -> STM m Time
getLastTimeStarved CDB{..} = readTVar cdbLastTimeStarved
-- | Read the current contents of the 'cdbChainSelStarvation' variable of the
-- given ChainDB environment.
getChainSelStarvation ::
     forall m blk. IOLike m
  => ChainDbEnv m blk
  -> STM m ChainSelStarvation
getChainSelStarvation env = readTVar (cdbChainSelStarvation env)

getIsValid ::
forall m blk. (IOLike m, HasHeader blk)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl.Types (
, TraceValidationEvent (..)
) where

import Cardano.Prelude (whenM)
import Control.Monad (when)
import Control.Tracer
import Data.Foldable (traverse_)
import Data.Map.Strict (Map)
Expand Down Expand Up @@ -108,6 +108,8 @@ import Ouroboros.Consensus.Util.ResourceRegistry
import Ouroboros.Consensus.Util.STM (WithFingerprint)
import Ouroboros.Network.AnchoredFragment (AnchoredFragment)
import Ouroboros.Network.Block (MaxSlotNo)
import Ouroboros.Network.BlockFetch.ConsensusInterface
(ChainSelStarvation (..))

-- | All the serialisation related constraints needed by the ChainDB.
class ( ImmutableDbSerialiseConstraints blk
Expand Down Expand Up @@ -276,9 +278,9 @@ data ChainDbEnv m blk = CDB
-- switch back to a chain containing it. The fragment is usually anchored at
-- a recent immutable tip; if it does not, it will conservatively be treated
-- as the empty fragment anchored in the current immutable tip.
, cdbLastTimeStarved :: !(StrictTVar m Time)
-- ^ The last time we starved the ChainSel thread. This is used by the
-- BlockFetch decision logic to demote peers.
, cdbChainSelStarvation :: !(StrictTVar m ChainSelStarvation)
-- ^ Information on the last starvation of ChainSel, whether ongoing or
-- ended recently.
} deriving (Generic)

-- | We include @blk@ in 'showTypeOf' because it helps resolving type families
Expand Down Expand Up @@ -513,13 +515,21 @@ addReprocessLoEBlocks tracer (ChainSelQueue queue) = do
atomically $ writeTBQueue queue ChainSelReprocessLoEBlocks

-- | Get the oldest message from the 'ChainSelQueue' queue. Can block when the
-- queue is empty; in that case, reports the current time to the given callback.
getChainSelMessage :: IOLike m => (Time -> STM m ()) -> ChainSelQueue m blk -> m (ChainSelMessage m blk)
getChainSelMessage whenEmpty (ChainSelQueue queue) = do
-- Sample the clock up front; this is the value handed to 'whenEmpty' if the
-- queue turns out to be empty.
time <- getMonotonicTime
-- NOTE: The two following lines are in different `atomically` on purpose.
-- First transaction: if the queue is empty, run the callback with the sampled
-- time.
atomically $ whenM (isEmptyTBQueue queue) (whenEmpty time)
-- Second transaction: take the next message from the queue.
atomically $ readTBQueue queue
-- queue is empty; in that case, reports the starvation (and its end) to the
-- callback.
getChainSelMessage
  :: IOLike m
  => (ChainSelStarvation -> m ())
  -> ChainSelQueue m blk
  -> m (ChainSelMessage m blk)
getChainSelMessage report (ChainSelQueue queue) = do
    -- NOTE: The emptiness probe and the read of the queue are deliberately
    -- performed in separate STM transactions; the starvation report (an
    -- action in @m@) runs in between them.
    queueWasEmpty <- atomically (isEmptyTBQueue queue)
    if queueWasEmpty
      then do
        -- The queue was found empty: signal the ongoing starvation, take the
        -- next message, then signal the end of the starvation with the time
        -- read right after the message was obtained.
        report ChainSelStarvationOngoing
        nextMessage <- atomically (readTBQueue queue)
        endTime <- getMonotonicTime
        report (ChainSelStarvationEndedAt endTime)
        pure nextMessage
      else
        -- The queue was not found empty: nothing is reported.
        atomically (readTBQueue queue)

-- | Flush the 'ChainSelQueue' queue and notify the waiting threads.
--
Expand Down Expand Up @@ -552,6 +562,7 @@ data TraceEvent blk
| TraceLedgerReplayEvent (LgrDB.TraceReplayEvent blk)
| TraceImmutableDBEvent (ImmutableDB.TraceEvent blk)
| TraceVolatileDBEvent (VolatileDB.TraceEvent blk)
| TraceChainSelStarvation ChainSelStarvation
deriving (Generic)


Expand Down

0 comments on commit bab4a6a

Please sign in to comment.