From 0effab01bfc8eb9231f01a04cf9b9f63f3e16acc Mon Sep 17 00:00:00 2001 From: Stevan Andjelkovic Date: Fri, 18 Mar 2022 12:35:10 +0100 Subject: [PATCH] refactor(journal): cache values that don't change when reading --- doc/demo-journal/slides-journal.md | 3 - src/journal/extra/assert/skiprun/Assert.hs | 6 +- src/journal/src/Journal.hs | 106 ++++++++++++++---- .../src/Journal/Internal/ByteBufferPtr.hs | 3 +- src/journal/src/Journal/MP.hs | 33 +++--- src/journal/src/Journal/Types.hs | 31 ++--- src/sut/dumblog/bench/Common.hs | 4 +- src/sut/dumblog/cabal.project | 1 + .../dumblog/src/Dumblog/Journal/FrontEnd.hs | 3 +- src/sut/dumblog/src/Dumblog/Journal/Main.hs | 3 +- 10 files changed, 134 insertions(+), 59 deletions(-) diff --git a/doc/demo-journal/slides-journal.md b/doc/demo-journal/slides-journal.md index 80363f0d..5df3e374 100644 --- a/doc/demo-journal/slides-journal.md +++ b/doc/demo-journal/slides-journal.md @@ -223,9 +223,6 @@ done # Disable turbo boost. echo 1 | sudo tee /sys/devices/system/cpu/intel_pstate/no_turbo -# Allow for more open file descriptors. -ulimit -n unlimited - # The following run is just a (CPU) warm up, the results are discarded. cabal run bench-sqlite diff --git a/src/journal/extra/assert/skiprun/Assert.hs b/src/journal/extra/assert/skiprun/Assert.hs index 2c002110..155705f9 100644 --- a/src/journal/extra/assert/skiprun/Assert.hs +++ b/src/journal/extra/assert/skiprun/Assert.hs @@ -1,4 +1,4 @@ -module Assert (assert, assertM, assertIO) where +module Assert (assert, assertM, assertIO, assertMMsg) where import GHC.Stack (HasCallStack) @@ -13,3 +13,7 @@ assertM _ = pure () {-# INLINE assertIO #-} assertIO :: IO Bool -> IO () assertIO _ = pure () + +{-# INLINE assertMMsg #-} +assertMMsg :: Monad m => String -> Bool -> m () +assertMMsg _msg _b = return () diff --git a/src/journal/src/Journal.hs b/src/journal/src/Journal.hs index ee2ef577..cbf15b9a 100644 --- a/src/journal/src/Journal.hs +++ b/src/journal/src/Journal.hs @@ -126,7 +126,11 @@ startJournal fp (Options termLength logger _maxSub) = do writePosition bb (Position offset) writeLimit bb (Limit (offset + termLength)) slice bb - return (Journal termBuffers (Metadata meta) logger) + + initTermId <- readInitialTermId (Metadata meta) + + return (Journal termBuffers (Metadata meta) logger + (int2Int32 termLength) (positionBitsToShift (int2Int32 termLength)) initTermId) ------------------------------------------------------------------------ @@ -178,12 +182,13 @@ recvBytes bc sock len = withPtr bc $ \ptr -> recvBuf sock ptr len readJournal :: Journal -> Subscriber -> IO (Maybe ByteString) readJournal jour sub = do offset <- readBytesConsumed (jMetadata jour) sub - let jLog = logg (jLogger jour) - jLog ("readJournal, offset: " ++ show offset) + -- let jLog = logg (jLogger jour) + -- jLog ("readJournal, offset: " ++ show offset) - termLen <- readTermLength (jMetadata jour) - let readIndex = indexByPosition (int2Int64 offset) (positionBitsToShift termLen) - jLog ("readJournal, readIndex: " ++ show (unPartitionIndex readIndex)) + let termLen = jTermLength jour + posBitsToShift = jPositionBitsToShift jour + readIndex = indexByPosition (int2Int64 offset) posBitsToShift + -- jLog ("readJournal, readIndex: " ++ show (unPartitionIndex readIndex)) termCount <- activeTermCount (jMetadata jour) let activeTermIndex = indexByTermCount termCount @@ -191,19 +196,19 @@ readJournal jour sub = do let termBuffer = jTermBuffers jour Vector.! unPartitionIndex readIndex activeTermId = rawTailTermId rawTail termOffset = rawTailTermOffset rawTail termLen + initTermId = jInitialTermId jour - jLog ("readJournal, termOffset: " ++ show (unTermOffset termOffset)) - initTermId <- readInitialTermId (jMetadata jour) - jLog ("readJournal, initTermId: " ++ show (unTermId initTermId)) + -- jLog ("readJournal, termOffset: " ++ show (unTermOffset termOffset)) + -- jLog ("readJournal, initTermId: " ++ show (unTermId initTermId)) let position = - computePosition activeTermId termOffset (positionBitsToShift termLen) initTermId + computePosition activeTermId termOffset posBitsToShift initTermId assertM (int2Int64 offset <= position) let readTermCount = - computeTermIdFromPosition (int2Int64 offset) (positionBitsToShift termLen) initTermId - - unTermId initTermId + computeTermIdFromPosition (int2Int64 offset) posBitsToShift initTermId - + unTermId initTermId - jLog ("readJournal, readTermCount: " ++ show readTermCount) + -- jLog ("readJournal, readTermCount: " ++ show readTermCount) if int2Int64 offset == position then return Nothing @@ -211,26 +216,87 @@ readJournal jour sub = do assertM (int2Int64 offset < position) let relativeOffset = int2Int32 (align offset fRAME_ALIGNMENT) - readTermCount * termLen - jLog ("readJournal, relativeOffset: " ++ show relativeOffset) + -- jLog ("readJournal, relativeOffset: " ++ show relativeOffset) tag <- readFrameType termBuffer (TermOffset relativeOffset) - jLog ("readJournal, tag: " ++ show tag) + -- jLog ("readJournal, tag: " ++ show tag) HeaderLength len <- readFrameLength termBuffer (TermOffset relativeOffset) - jLog ("readJournal, len: " ++ show len) + -- jLog ("readJournal, len: " ++ show len) if tag == Padding then do assertM (len >= 0) incrBytesConsumed_ (jMetadata jour) sub (align (int322Int len) fRAME_ALIGNMENT) - jLog "readJournal, skipping padding..." + -- jLog "readJournal, skipping padding..." readJournal jour sub else do assertM (len > 0) - jLog ("readJournal, termCount: " ++ show (unTermCount termCount)) + -- jLog ("readJournal, termCount: " ++ show (unTermCount termCount)) bs <- getByteStringAt termBuffer (int322Int relativeOffset + hEADER_LENGTH) (int322Int len - hEADER_LENGTH) assertM (BS.length bs == int322Int len - hEADER_LENGTH) incrBytesConsumed_ (jMetadata jour) sub (align (int322Int len) fRAME_ALIGNMENT) return (Just bs) +{- + +readManyJournal' :: Journal -> Subscriber -> [ByteString] -> IO [ByteString] +readManyJournal' jour sub acc = do + offset <- readBytesConsumed (jMetadata jour) sub + let jLog = logg (jLogger jour) + jLog ("readJournal, offset: " ++ show offset) + + termLen <- readTermLength (jMetadata jour) + let readIndex = indexByPosition (int2Int64 offset) (positionBitsToShift termLen) + jLog ("readJournal, readIndex: " ++ show (unPartitionIndex readIndex)) + + termCount <- activeTermCount (jMetadata jour) + let activeTermIndex = indexByTermCount termCount + rawTail <- readRawTail (jMetadata jour) activeTermIndex + let termBuffer = jTermBuffers jour Vector.! unPartitionIndex readIndex + activeTermId = rawTailTermId rawTail + termOffset = rawTailTermOffset rawTail termLen + + jLog ("readJournal, termOffset: " ++ show (unTermOffset termOffset)) + initTermId <- readInitialTermId (jMetadata jour) + jLog ("readJournal, initTermId: " ++ show (unTermId initTermId)) + let position = + computePosition activeTermId termOffset (positionBitsToShift termLen) initTermId + assertM (int2Int64 offset <= position) + + let readTermCount = + computeTermIdFromPosition (int2Int64 offset) (positionBitsToShift termLen) initTermId + - unTermId initTermId + + jLog ("readJournal, readTermCount: " ++ show readTermCount) + + go [] offset + where + go offset acc + | int2Int64 offset == position = return (reverse acc) + | otherwise = do + assertM (int2Int64 offset < position) + let relativeOffset = int2Int32 (align offset fRAME_ALIGNMENT) - + readTermCount * termLen + jLog ("readJournal, relativeOffset: " ++ show relativeOffset) + tag <- readFrameType termBuffer (TermOffset relativeOffset) + jLog ("readJournal, tag: " ++ show tag) + HeaderLength len <- readFrameLength termBuffer (TermOffset relativeOffset) + jLog ("readJournal, len: " ++ show len) + if tag == Padding + then do + assertM (len >= 0) + incrBytesConsumed_ (jMetadata jour) sub (align (int322Int len) fRAME_ALIGNMENT) + jLog "readJournal, skipping padding..." + readManyJournal' jour sub acc + else do + assertM (len > 0) + jLog ("readJournal, termCount: " ++ show (unTermCount termCount)) + bs <- getByteStringAt termBuffer + (int322Int relativeOffset + hEADER_LENGTH) + (int322Int len - hEADER_LENGTH) + assertM (BS.length bs == int322Int len - hEADER_LENGTH) + incrBytesConsumed_ (jMetadata jour) sub (align (int322Int len) fRAME_ALIGNMENT) + go (offset + align (int322Int len) fRAME_ALIGNMENT) (bs : acc) +-} ------------------------------------------------------------------------ @@ -289,10 +355,10 @@ metricsBytesWritten :: Journal -> IO Int64 metricsBytesWritten jour = do let meta = jMetadata jour termCount <- activeTermCount meta - initTermId <- readInitialTermId meta termLen <- readTermLength meta let index = indexByTermCount termCount rawTail <- readRawTail meta index let termId = rawTailTermId rawTail termOffset = rawTailTermOffset rawTail termLen - return (computePosition termId termOffset (positionBitsToShift termLen) initTermId) + return (computePosition termId termOffset (jPositionBitsToShift jour) + (jInitialTermId jour)) diff --git a/src/journal/src/Journal/Internal/ByteBufferPtr.hs b/src/journal/src/Journal/Internal/ByteBufferPtr.hs index 3af6a2fb..81b572c5 100644 --- a/src/journal/src/Journal/Internal/ByteBufferPtr.hs +++ b/src/journal/src/Journal/Internal/ByteBufferPtr.hs @@ -141,7 +141,7 @@ checkInRange val (lower, upper) name ] boundCheck :: HasCallStack => ByteBuffer -> Int -> Int -> IO () -boundCheck bb ix size = do +boundCheck bb ix size = return () {- do invariant bb -- XXX: use Word for size? -- XXX: parametrise on build flag and only do these checks if enabled? @@ -169,6 +169,7 @@ invariant bb = do assertM (mark <= pos) assertM (pos <= lim) assertM (lim - slice <= capa) +-} ------------------------------------------------------------------------ -- * Create diff --git a/src/journal/src/Journal/MP.hs b/src/journal/src/Journal/MP.hs index a0761aa0..77ac5edd 100644 --- a/src/journal/src/Journal/MP.hs +++ b/src/journal/src/Journal/MP.hs @@ -49,12 +49,13 @@ recvBytes bc sock len = recvBytesOffset bc sock hEADER_LENGTH len readJournal :: Journal -> Subscriber -> IO (Maybe ByteString) readJournal jour sub = do offset <- readBytesConsumed (jMetadata jour) sub - let jLog = logg (jLogger jour) - jLog ("readJournal, offset: " ++ show offset) + -- let jLog = logg (jLogger jour) + -- jLog ("readJournal, offset: " ++ show offset) - termLen <- readTermLength (jMetadata jour) - let readIndex = indexByPosition (int2Int64 offset) (positionBitsToShift termLen) - jLog ("readJournal, readIndex: " ++ show (unPartitionIndex readIndex)) + let termLen = jTermLength jour + posBitsToShift = jPositionBitsToShift jour + readIndex = indexByPosition (int2Int64 offset) posBitsToShift + -- jLog ("readJournal, readIndex: " ++ show (unPartitionIndex readIndex)) termCount <- activeTermCount (jMetadata jour) let activeTermIndex = indexByTermCount termCount @@ -62,20 +63,20 @@ readJournal jour sub = do let termBuffer = jTermBuffers jour Vector.! unPartitionIndex readIndex activeTermId = rawTailTermId rawTail termOffset = rawTailTermOffset rawTail termLen + initTermId = jInitialTermId jour - jLog ("readJournal, termOffset: " ++ show (unTermOffset termOffset)) - initTermId <- readInitialTermId (jMetadata jour) - jLog ("readJournal, initTermId: " ++ show (unTermId initTermId)) + -- jLog ("readJournal, termOffset: " ++ show (unTermOffset termOffset)) + -- jLog ("readJournal, initTermId: " ++ show (unTermId initTermId)) let position = - computePosition activeTermId termOffset (positionBitsToShift termLen) initTermId + computePosition activeTermId termOffset posBitsToShift initTermId -- putStrLn ("readJournal, offset: " ++ show offset ++ ", position: " ++ show position) -- assertM (int2Int64 offset <= position) let readTermCount = - computeTermIdFromPosition (int2Int64 offset) (positionBitsToShift termLen) initTermId + computeTermIdFromPosition (int2Int64 offset) posBitsToShift initTermId - unTermId initTermId - jLog ("readJournal, readTermCount: " ++ show readTermCount) + -- jLog ("readJournal, readTermCount: " ++ show readTermCount) if int2Int64 offset == position then return Nothing @@ -83,17 +84,17 @@ readJournal jour sub = do -- assertM (int2Int64 offset < position) let relativeOffset = int2Int32 (align offset fRAME_ALIGNMENT) - readTermCount * termLen - jLog ("readJournal, relativeOffset: " ++ show relativeOffset) + -- jLog ("readJournal, relativeOffset: " ++ show relativeOffset) tag <- readFrameType termBuffer (TermOffset relativeOffset) - jLog ("readJournal, tag: " ++ show tag) + -- jLog ("readJournal, tag: " ++ show tag) HeaderLength len <- readFrameLength termBuffer (TermOffset relativeOffset) - jLog ("readJournal, len: " ++ show len) + -- jLog ("readJournal, len: " ++ show len) if tag == Padding then do if len >= 0 then do _success <- casBytesConsumed (jMetadata jour) sub offset (offset + int322Int len) - jLog "readJournal, skipping padding..." + -- jLog "readJournal, skipping padding..." -- If the CAS fails, it just means that some other process incremented the -- counter already. readJournal jour sub @@ -103,7 +104,7 @@ readJournal jour sub = do then readJournal jour sub else do assertMMsg (show len) (len > 0) - jLog ("readJournal, termCount: " ++ show (unTermCount termCount)) + -- jLog ("readJournal, termCount: " ++ show (unTermCount termCount)) -- NOTE: We need to read the bytestring before the CAS, otherwise the -- bytes can be cleaned away before read. In case the CAS fails this -- causes us to do unnecessary work, as we have to throw away the diff --git a/src/journal/src/Journal/Types.hs b/src/journal/src/Journal/Types.hs index 8a6ec9c3..d66602a4 100644 --- a/src/journal/src/Journal/Types.hs +++ b/src/journal/src/Journal/Types.hs @@ -35,20 +35,22 @@ module Journal.Types -- ) where -import Data.Coerce (coerce) -import Control.Concurrent.STM (TVar, atomically, writeTVar, readTVar, newTVarIO) +import Control.Concurrent.STM + (TVar, atomically, newTVarIO, readTVar, writeTVar) import Data.Binary (Binary) import Data.Bits import Data.ByteString (ByteString) +import Data.Coerce (coerce) import Data.IORef (IORef, newIORef, readIORef, writeIORef) -import Data.Vector (Vector) import Data.Int (Int32, Int64) +import Data.Vector (Vector) import Data.Word (Word32, Word64, Word8) import Foreign.Ptr (Ptr, plusPtr) import Foreign.Storable (Storable, sizeOf) import Journal.Internal.ByteBufferPtr import Journal.Internal.Logger (Logger) +import Journal.Internal.Utils import Journal.Types.AtomicCounter ------------------------------------------------------------------------ @@ -59,9 +61,12 @@ pARTITION_COUNT = 3 newtype Metadata = Metadata { unMetadata :: ByteBuffer } data Journal = Journal - { jTermBuffers :: {-# UNPACK #-} !(Vector ByteBuffer) - , jMetadata :: {-# UNPACK #-} !Metadata - , jLogger :: !Logger + { jTermBuffers :: {-# UNPACK #-} !(Vector ByteBuffer) + , jMetadata :: {-# UNPACK #-} !Metadata + , jLogger :: !Logger + , jTermLength :: !Int32 + , jPositionBitsToShift :: !Int32 + , jInitialTermId :: {-# UNPACK #-} !TermId } data JMetadata = JMetadata @@ -240,7 +245,7 @@ casCleanPosition (Metadata meta) = casIntAddr meta lOG_CLEAN_POSITION_OFFSET -- | The number of bits to shift when multiplying or dividing by the term buffer -- length. -positionBitsToShift :: Int32 -> Int +positionBitsToShift :: Int32 -> Int32 positionBitsToShift termBufferLength = case termBufferLength of 65536 {- 64 * 1024 -} -> 16 @@ -281,18 +286,18 @@ indexByTermCount termCount = PartitionIndex $ fromIntegral termCount `mod` pARTITION_COUNT -- | Calculate the partition index given a stream position. -indexByPosition :: Int64 -> Int -> PartitionIndex +indexByPosition :: Int64 -> Int32 -> PartitionIndex indexByPosition pos posBitsToShift = fromIntegral $ - (pos `shiftR` posBitsToShift) `mod` fromIntegral pARTITION_COUNT + (pos `shiftR` int322Int posBitsToShift) `mod` fromIntegral pARTITION_COUNT -- | Compute the current position in absolute number of bytes. -computePosition :: TermId -> TermOffset -> Int -> TermId -> Int64 +computePosition :: TermId -> TermOffset -> Int32 -> TermId -> Int64 computePosition activeTermId termOffset posBitsToShift initTermId = computeTermBeginPosition activeTermId posBitsToShift initTermId + fromIntegral termOffset -- | Compute the current position in absolute number of bytes for the beginning -- of a term. -computeTermBeginPosition :: TermId -> Int -> TermId -> Int64 +computeTermBeginPosition :: TermId -> Int32 -> TermId -> Int64 computeTermBeginPosition activeTermId posBitsToShift initTermId = let termCount :: Int64 @@ -302,9 +307,9 @@ computeTermBeginPosition activeTermId posBitsToShift initTermId = termCount `shiftL` fromIntegral posBitsToShift -- | Compute the term id from a position. -computeTermIdFromPosition :: Int64 -> Int -> TermId -> Int32 +computeTermIdFromPosition :: Int64 -> Int32 -> TermId -> Int32 computeTermIdFromPosition pos posBitsToShift initTermId = fromIntegral $ - (pos `shiftR` posBitsToShift) + fromIntegral initTermId + (pos `shiftR` int322Int posBitsToShift) + fromIntegral initTermId -- | Compute the total length of a log file given the term length. computeLogLength :: Int -> Int -> Int64 diff --git a/src/sut/dumblog/bench/Common.hs b/src/sut/dumblog/bench/Common.hs index d5810db9..f605d0c9 100644 --- a/src/sut/dumblog/bench/Common.hs +++ b/src/sut/dumblog/bench/Common.hs @@ -42,7 +42,7 @@ rEAD_FREQUENCY :: Int rEAD_FREQUENCY = 80 iTERATIONS :: Int -iTERATIONS = 100000 +iTERATIONS = 10000 vALUE_TO_WRITE :: ByteString vALUE_TO_WRITE = LBS.pack "Dumblog" @@ -61,7 +61,7 @@ commonMain variant io = do bracket (commonSetup variant io) commonTeardown (commonBenchmark clients) where nUM_OF_CLIENTS :: Int - nUM_OF_CLIENTS = 1000 + nUM_OF_CLIENTS = 100 commonSetup :: String -> (MVar () -> IO ()) -> IO (Async (), HttpClient) commonSetup msg io = do diff --git a/src/sut/dumblog/cabal.project b/src/sut/dumblog/cabal.project index 252c2edc..329a7618 100644 --- a/src/sut/dumblog/cabal.project +++ b/src/sut/dumblog/cabal.project @@ -8,6 +8,7 @@ with-compiler: ghc-8.10.4 reject-unconstrained-dependencies: all constraints: QuickCheck +old-random +constraints: journal +SkipAssert package dumblog diff --git a/src/sut/dumblog/src/Dumblog/Journal/FrontEnd.hs b/src/sut/dumblog/src/Dumblog/Journal/FrontEnd.hs index 49ff3aeb..8ecdc3d7 100644 --- a/src/sut/dumblog/src/Dumblog/Journal/FrontEnd.hs +++ b/src/sut/dumblog/src/Dumblog/Journal/FrontEnd.hs @@ -1,3 +1,4 @@ +{-# LANGUAGE BangPatterns #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE ScopedTypeVariables #-} @@ -54,7 +55,7 @@ httpFrontend journal metrics (FrontEndInfo blocker cVersion) req respond = do respond $ Wai.responseLBS status400 [] err Right cmd -> do key <- newKey blocker - now <- getCurrentNanosSinceEpoch + !now <- getCurrentNanosSinceEpoch let env = encode (Envelope (sequenceNumber key) cmd cVersion now) res <- Journal.appendBS journal env res' <- case res of diff --git a/src/sut/dumblog/src/Dumblog/Journal/Main.hs b/src/sut/dumblog/src/Dumblog/Journal/Main.hs index bce882c9..eb0a3821 100644 --- a/src/sut/dumblog/src/Dumblog/Journal/Main.hs +++ b/src/sut/dumblog/src/Dumblog/Journal/Main.hs @@ -105,7 +105,6 @@ replayDebug originCommands originState = do (s', r) <- runCommand v (DLogger.queueLogger logger) s cmd logLines <- DLogger.flushQueue logger let - lbsToString = LText.unpack . LEncoding.decodeUtf8 (ev, msg) = case cmd of Read i -> ("read", show i) Write logMsg -> ("write", Text.unpack (decodeUtf8 logMsg)) @@ -191,7 +190,7 @@ journalDumblog cfg _capacity port mReady = do let fpj = dUMBLOG_JOURNAL fpm = dUMBLOG_METRICS fps = dUMBLOG_SNAPSHOT - untilSnapshot = 1000 + untilSnapshot = 10000 case cfg of Run q -> do mSnapshot <- Snapshot.readFile fps