Skip to content

Commit

Permalink
refactor(journal): cache values that don't change when reading
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-stevan-andjelkovic committed Mar 18, 2022
1 parent 16f03d3 commit 0effab0
Show file tree
Hide file tree
Showing 10 changed files with 134 additions and 59 deletions.
3 changes: 0 additions & 3 deletions doc/demo-journal/slides-journal.md
Original file line number Diff line number Diff line change
Expand Up @@ -223,9 +223,6 @@ done
# Disable turbo boost.
echo 1 | sudo tee /sys/devices/system/cpu/intel_pstate/no_turbo

# Allow for more open file descriptors.
ulimit -n unlimited

# The following run is just a (CPU) warm up, the results are discarded.
cabal run bench-sqlite

Expand Down
6 changes: 5 additions & 1 deletion src/journal/extra/assert/skiprun/Assert.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module Assert (assert, assertM, assertIO) where
module Assert (assert, assertM, assertIO, assertMMsg) where

import GHC.Stack (HasCallStack)

Expand All @@ -13,3 +13,7 @@ assertM _ = pure ()
{-# INLINE assertIO #-}
assertIO :: IO Bool -> IO ()
assertIO _ = pure ()

{-# INLINE assertMMsg #-}
assertMMsg :: Monad m => String -> Bool -> m ()
assertMMsg _msg _b = return ()
106 changes: 86 additions & 20 deletions src/journal/src/Journal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,11 @@ startJournal fp (Options termLength logger _maxSub) = do
writePosition bb (Position offset)
writeLimit bb (Limit (offset + termLength))
slice bb
return (Journal termBuffers (Metadata meta) logger)

initTermId <- readInitialTermId (Metadata meta)

return (Journal termBuffers (Metadata meta) logger
(int2Int32 termLength) (positionBitsToShift (int2Int32 termLength)) initTermId)

------------------------------------------------------------------------

Expand Down Expand Up @@ -178,59 +182,121 @@ recvBytes bc sock len = withPtr bc $ \ptr -> recvBuf sock ptr len
readJournal :: Journal -> Subscriber -> IO (Maybe ByteString)
readJournal jour sub = do
offset <- readBytesConsumed (jMetadata jour) sub
let jLog = logg (jLogger jour)
jLog ("readJournal, offset: " ++ show offset)
-- let jLog = logg (jLogger jour)
-- jLog ("readJournal, offset: " ++ show offset)

termLen <- readTermLength (jMetadata jour)
let readIndex = indexByPosition (int2Int64 offset) (positionBitsToShift termLen)
jLog ("readJournal, readIndex: " ++ show (unPartitionIndex readIndex))
let termLen = jTermLength jour
posBitsToShift = jPositionBitsToShift jour
readIndex = indexByPosition (int2Int64 offset) posBitsToShift
-- jLog ("readJournal, readIndex: " ++ show (unPartitionIndex readIndex))

termCount <- activeTermCount (jMetadata jour)
let activeTermIndex = indexByTermCount termCount
rawTail <- readRawTail (jMetadata jour) activeTermIndex
let termBuffer = jTermBuffers jour Vector.! unPartitionIndex readIndex
activeTermId = rawTailTermId rawTail
termOffset = rawTailTermOffset rawTail termLen
initTermId = jInitialTermId jour

jLog ("readJournal, termOffset: " ++ show (unTermOffset termOffset))
initTermId <- readInitialTermId (jMetadata jour)
jLog ("readJournal, initTermId: " ++ show (unTermId initTermId))
-- jLog ("readJournal, termOffset: " ++ show (unTermOffset termOffset))
-- jLog ("readJournal, initTermId: " ++ show (unTermId initTermId))
let position =
computePosition activeTermId termOffset (positionBitsToShift termLen) initTermId
computePosition activeTermId termOffset posBitsToShift initTermId
assertM (int2Int64 offset <= position)

let readTermCount =
computeTermIdFromPosition (int2Int64 offset) (positionBitsToShift termLen) initTermId
- unTermId initTermId
computeTermIdFromPosition (int2Int64 offset) posBitsToShift initTermId -
unTermId initTermId

jLog ("readJournal, readTermCount: " ++ show readTermCount)
-- jLog ("readJournal, readTermCount: " ++ show readTermCount)

if int2Int64 offset == position
then return Nothing
else do
assertM (int2Int64 offset < position)

let relativeOffset = int2Int32 (align offset fRAME_ALIGNMENT) - readTermCount * termLen
jLog ("readJournal, relativeOffset: " ++ show relativeOffset)
-- jLog ("readJournal, relativeOffset: " ++ show relativeOffset)
tag <- readFrameType termBuffer (TermOffset relativeOffset)
jLog ("readJournal, tag: " ++ show tag)
-- jLog ("readJournal, tag: " ++ show tag)
HeaderLength len <- readFrameLength termBuffer (TermOffset relativeOffset)
jLog ("readJournal, len: " ++ show len)
-- jLog ("readJournal, len: " ++ show len)
if tag == Padding
then do
assertM (len >= 0)
incrBytesConsumed_ (jMetadata jour) sub (align (int322Int len) fRAME_ALIGNMENT)
jLog "readJournal, skipping padding..."
-- jLog "readJournal, skipping padding..."
readJournal jour sub
else do
assertM (len > 0)
jLog ("readJournal, termCount: " ++ show (unTermCount termCount))
-- jLog ("readJournal, termCount: " ++ show (unTermCount termCount))
bs <- getByteStringAt termBuffer
(int322Int relativeOffset + hEADER_LENGTH)
(int322Int len - hEADER_LENGTH)
assertM (BS.length bs == int322Int len - hEADER_LENGTH)
incrBytesConsumed_ (jMetadata jour) sub (align (int322Int len) fRAME_ALIGNMENT)
return (Just bs)
{-
readManyJournal' :: Journal -> Subscriber -> [ByteString] -> IO [ByteString]
readManyJournal' jour sub acc = do
offset <- readBytesConsumed (jMetadata jour) sub
let jLog = logg (jLogger jour)
jLog ("readJournal, offset: " ++ show offset)
termLen <- readTermLength (jMetadata jour)
let readIndex = indexByPosition (int2Int64 offset) (positionBitsToShift termLen)
jLog ("readJournal, readIndex: " ++ show (unPartitionIndex readIndex))
termCount <- activeTermCount (jMetadata jour)
let activeTermIndex = indexByTermCount termCount
rawTail <- readRawTail (jMetadata jour) activeTermIndex
let termBuffer = jTermBuffers jour Vector.! unPartitionIndex readIndex
activeTermId = rawTailTermId rawTail
termOffset = rawTailTermOffset rawTail termLen
jLog ("readJournal, termOffset: " ++ show (unTermOffset termOffset))
initTermId <- readInitialTermId (jMetadata jour)
jLog ("readJournal, initTermId: " ++ show (unTermId initTermId))
let position =
computePosition activeTermId termOffset (positionBitsToShift termLen) initTermId
assertM (int2Int64 offset <= position)
let readTermCount =
computeTermIdFromPosition (int2Int64 offset) (positionBitsToShift termLen) initTermId
- unTermId initTermId
jLog ("readJournal, readTermCount: " ++ show readTermCount)
go [] offset
where
go offset acc
| int2Int64 offset == position = return (reverse acc)
| otherwise = do
assertM (int2Int64 offset < position)
let relativeOffset = int2Int32 (align offset fRAME_ALIGNMENT) -
readTermCount * termLen
jLog ("readJournal, relativeOffset: " ++ show relativeOffset)
tag <- readFrameType termBuffer (TermOffset relativeOffset)
jLog ("readJournal, tag: " ++ show tag)
HeaderLength len <- readFrameLength termBuffer (TermOffset relativeOffset)
jLog ("readJournal, len: " ++ show len)
if tag == Padding
then do
assertM (len >= 0)
incrBytesConsumed_ (jMetadata jour) sub (align (int322Int len) fRAME_ALIGNMENT)
jLog "readJournal, skipping padding..."
readManyJournal' jour sub acc
else do
assertM (len > 0)
jLog ("readJournal, termCount: " ++ show (unTermCount termCount))
bs <- getByteStringAt termBuffer
(int322Int relativeOffset + hEADER_LENGTH)
(int322Int len - hEADER_LENGTH)
assertM (BS.length bs == int322Int len - hEADER_LENGTH)
incrBytesConsumed_ (jMetadata jour) sub (align (int322Int len) fRAME_ALIGNMENT)
go (offset + align (int322Int len) fRAME_ALIGNMENT) (bs : acc)
-}

------------------------------------------------------------------------

Expand Down Expand Up @@ -289,10 +355,10 @@ metricsBytesWritten :: Journal -> IO Int64
metricsBytesWritten jour = do
let meta = jMetadata jour
termCount <- activeTermCount meta
initTermId <- readInitialTermId meta
termLen <- readTermLength meta
let index = indexByTermCount termCount
rawTail <- readRawTail meta index
let termId = rawTailTermId rawTail
termOffset = rawTailTermOffset rawTail termLen
return (computePosition termId termOffset (positionBitsToShift termLen) initTermId)
return (computePosition termId termOffset (jPositionBitsToShift jour)
(jInitialTermId jour))
3 changes: 2 additions & 1 deletion src/journal/src/Journal/Internal/ByteBufferPtr.hs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ checkInRange val (lower, upper) name
]

boundCheck :: HasCallStack => ByteBuffer -> Int -> Int -> IO ()
boundCheck bb ix size = do
boundCheck bb ix size = return () {- do
invariant bb
-- XXX: use Word for size?
-- XXX: parametrise on build flag and only do these checks if enabled?
Expand Down Expand Up @@ -169,6 +169,7 @@ invariant bb = do
assertM (mark <= pos)
assertM (pos <= lim)
assertM (lim - slice <= capa)
-}

------------------------------------------------------------------------
-- * Create
Expand Down
33 changes: 17 additions & 16 deletions src/journal/src/Journal/MP.hs
Original file line number Diff line number Diff line change
Expand Up @@ -49,51 +49,52 @@ recvBytes bc sock len = recvBytesOffset bc sock hEADER_LENGTH len
readJournal :: Journal -> Subscriber -> IO (Maybe ByteString)
readJournal jour sub = do
offset <- readBytesConsumed (jMetadata jour) sub
let jLog = logg (jLogger jour)
jLog ("readJournal, offset: " ++ show offset)
-- let jLog = logg (jLogger jour)
-- jLog ("readJournal, offset: " ++ show offset)

termLen <- readTermLength (jMetadata jour)
let readIndex = indexByPosition (int2Int64 offset) (positionBitsToShift termLen)
jLog ("readJournal, readIndex: " ++ show (unPartitionIndex readIndex))
let termLen = jTermLength jour
posBitsToShift = jPositionBitsToShift jour
readIndex = indexByPosition (int2Int64 offset) posBitsToShift
-- jLog ("readJournal, readIndex: " ++ show (unPartitionIndex readIndex))

termCount <- activeTermCount (jMetadata jour)
let activeTermIndex = indexByTermCount termCount
rawTail <- readRawTail (jMetadata jour) activeTermIndex
let termBuffer = jTermBuffers jour Vector.! unPartitionIndex readIndex
activeTermId = rawTailTermId rawTail
termOffset = rawTailTermOffset rawTail termLen
initTermId = jInitialTermId jour

jLog ("readJournal, termOffset: " ++ show (unTermOffset termOffset))
initTermId <- readInitialTermId (jMetadata jour)
jLog ("readJournal, initTermId: " ++ show (unTermId initTermId))
-- jLog ("readJournal, termOffset: " ++ show (unTermOffset termOffset))
-- jLog ("readJournal, initTermId: " ++ show (unTermId initTermId))
let position =
computePosition activeTermId termOffset (positionBitsToShift termLen) initTermId
computePosition activeTermId termOffset posBitsToShift initTermId
-- putStrLn ("readJournal, offset: " ++ show offset ++ ", position: " ++ show position)
-- assertM (int2Int64 offset <= position)

let readTermCount =
computeTermIdFromPosition (int2Int64 offset) (positionBitsToShift termLen) initTermId
computeTermIdFromPosition (int2Int64 offset) posBitsToShift initTermId
- unTermId initTermId

jLog ("readJournal, readTermCount: " ++ show readTermCount)
-- jLog ("readJournal, readTermCount: " ++ show readTermCount)

if int2Int64 offset == position
then return Nothing
else do
-- assertM (int2Int64 offset < position)

let relativeOffset = int2Int32 (align offset fRAME_ALIGNMENT) - readTermCount * termLen
jLog ("readJournal, relativeOffset: " ++ show relativeOffset)
-- jLog ("readJournal, relativeOffset: " ++ show relativeOffset)
tag <- readFrameType termBuffer (TermOffset relativeOffset)
jLog ("readJournal, tag: " ++ show tag)
-- jLog ("readJournal, tag: " ++ show tag)
HeaderLength len <- readFrameLength termBuffer (TermOffset relativeOffset)
jLog ("readJournal, len: " ++ show len)
-- jLog ("readJournal, len: " ++ show len)
if tag == Padding
then do
if len >= 0
then do
_success <- casBytesConsumed (jMetadata jour) sub offset (offset + int322Int len)
jLog "readJournal, skipping padding..."
-- jLog "readJournal, skipping padding..."
-- If the CAS fails, it just means that some other process incremented the
-- counter already.
readJournal jour sub
Expand All @@ -103,7 +104,7 @@ readJournal jour sub = do
then readJournal jour sub
else do
assertMMsg (show len) (len > 0)
jLog ("readJournal, termCount: " ++ show (unTermCount termCount))
-- jLog ("readJournal, termCount: " ++ show (unTermCount termCount))
-- NOTE: We need to read the bytestring before the CAS, otherwise the
-- bytes can be cleaned away before read. In case the CAS fails this
-- causes us to do unnecessary work, as we have to throw away the
Expand Down
31 changes: 18 additions & 13 deletions src/journal/src/Journal/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,22 @@ module Journal.Types
-- )
where

import Data.Coerce (coerce)
import Control.Concurrent.STM (TVar, atomically, writeTVar, readTVar, newTVarIO)
import Control.Concurrent.STM
(TVar, atomically, newTVarIO, readTVar, writeTVar)
import Data.Binary (Binary)
import Data.Bits
import Data.ByteString (ByteString)
import Data.Coerce (coerce)
import Data.IORef (IORef, newIORef, readIORef, writeIORef)
import Data.Vector (Vector)
import Data.Int (Int32, Int64)
import Data.Vector (Vector)
import Data.Word (Word32, Word64, Word8)
import Foreign.Ptr (Ptr, plusPtr)
import Foreign.Storable (Storable, sizeOf)

import Journal.Internal.ByteBufferPtr
import Journal.Internal.Logger (Logger)
import Journal.Internal.Utils
import Journal.Types.AtomicCounter

------------------------------------------------------------------------
Expand All @@ -59,9 +61,12 @@ pARTITION_COUNT = 3
newtype Metadata = Metadata { unMetadata :: ByteBuffer }

data Journal = Journal
{ jTermBuffers :: {-# UNPACK #-} !(Vector ByteBuffer)
, jMetadata :: {-# UNPACK #-} !Metadata
, jLogger :: !Logger
{ jTermBuffers :: {-# UNPACK #-} !(Vector ByteBuffer)
, jMetadata :: {-# UNPACK #-} !Metadata
, jLogger :: !Logger
, jTermLength :: !Int32
, jPositionBitsToShift :: !Int32
, jInitialTermId :: {-# UNPACK #-} !TermId
}

data JMetadata = JMetadata
Expand Down Expand Up @@ -240,7 +245,7 @@ casCleanPosition (Metadata meta) = casIntAddr meta lOG_CLEAN_POSITION_OFFSET

-- | The number of bits to shift when multiplying or dividing by the term buffer
-- length.
positionBitsToShift :: Int32 -> Int
positionBitsToShift :: Int32 -> Int32
positionBitsToShift termBufferLength =
case termBufferLength of
65536 {- 64 * 1024 -} -> 16
Expand Down Expand Up @@ -281,18 +286,18 @@ indexByTermCount termCount = PartitionIndex $
fromIntegral termCount `mod` pARTITION_COUNT

-- | Calculate the partition index given a stream position.
indexByPosition :: Int64 -> Int -> PartitionIndex
indexByPosition :: Int64 -> Int32 -> PartitionIndex
indexByPosition pos posBitsToShift = fromIntegral $
(pos `shiftR` posBitsToShift) `mod` fromIntegral pARTITION_COUNT
(pos `shiftR` int322Int posBitsToShift) `mod` fromIntegral pARTITION_COUNT

-- | Compute the current position in absolute number of bytes.
computePosition :: TermId -> TermOffset -> Int -> TermId -> Int64
computePosition :: TermId -> TermOffset -> Int32 -> TermId -> Int64
computePosition activeTermId termOffset posBitsToShift initTermId =
computeTermBeginPosition activeTermId posBitsToShift initTermId + fromIntegral termOffset

-- | Compute the current position in absolute number of bytes for the beginning
-- of a term.
computeTermBeginPosition :: TermId -> Int -> TermId -> Int64
computeTermBeginPosition :: TermId -> Int32 -> TermId -> Int64
computeTermBeginPosition activeTermId posBitsToShift initTermId =
let
termCount :: Int64
Expand All @@ -302,9 +307,9 @@ computeTermBeginPosition activeTermId posBitsToShift initTermId =
termCount `shiftL` fromIntegral posBitsToShift

-- | Compute the term id from a position.
computeTermIdFromPosition :: Int64 -> Int -> TermId -> Int32
computeTermIdFromPosition :: Int64 -> Int32 -> TermId -> Int32
computeTermIdFromPosition pos posBitsToShift initTermId = fromIntegral $
(pos `shiftR` posBitsToShift) + fromIntegral initTermId
(pos `shiftR` int322Int posBitsToShift) + fromIntegral initTermId

-- | Compute the total length of a log file given the term length.
computeLogLength :: Int -> Int -> Int64
Expand Down
4 changes: 2 additions & 2 deletions src/sut/dumblog/bench/Common.hs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ rEAD_FREQUENCY :: Int
rEAD_FREQUENCY = 80

iTERATIONS :: Int
iTERATIONS = 100000
iTERATIONS = 10000

vALUE_TO_WRITE :: ByteString
vALUE_TO_WRITE = LBS.pack "Dumblog"
Expand All @@ -61,7 +61,7 @@ commonMain variant io = do
bracket (commonSetup variant io) commonTeardown (commonBenchmark clients)
where
nUM_OF_CLIENTS :: Int
nUM_OF_CLIENTS = 1000
nUM_OF_CLIENTS = 100

commonSetup :: String -> (MVar () -> IO ()) -> IO (Async (), HttpClient)
commonSetup msg io = do
Expand Down
1 change: 1 addition & 0 deletions src/sut/dumblog/cabal.project
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ with-compiler: ghc-8.10.4
reject-unconstrained-dependencies: all

constraints: QuickCheck +old-random
constraints: journal +SkipAssert

package dumblog

Expand Down
Loading

0 comments on commit 0effab0

Please sign in to comment.