Skip to content

Commit

Permalink
feat(journal): add MP append, move MP read and restore old SP read
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-stevan-andjelkovic committed Feb 11, 2022
1 parent 74d309b commit 23c68df
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 79 deletions.
75 changes: 7 additions & 68 deletions src/journal/src/Journal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -191,31 +191,18 @@ readJournal jour = do
if tag == Padding
then do
assertM (len >= 0)
-- Single-threaded case:
-- incrCounter_ (align (int322Int len) fRAME_ALIGNMENT) (jBytesConsumed jour)
_success <- casCounter (jBytesConsumed jour) offset (offset + int322Int len)
incrCounter_ (align (int322Int len) fRAME_ALIGNMENT) (jBytesConsumed jour)
jLog "readJournal, skipping padding..."
-- If the CAS fails, it just means that some other process incremented the
-- counter already.
readJournal jour
else do
assertM (len > 0)
jLog ("readJournal, termCount: " ++ show (unTermCount termCount))
success <- casCounter (jBytesConsumed jour) offset
(offset + (align (int322Int len) fRAME_ALIGNMENT))
if success
then do
bs <- getByteStringAt termBuffer
(int322Int relativeOffset + hEADER_LENGTH)
(int322Int len - hEADER_LENGTH)
assertM (BS.length bs == int322Int len - hEADER_LENGTH)
-- Single-threaded case:
-- incrCounter_ (align (int322Int len) fRAME_ALIGNMENT) (jBytesConsumed jour)
return (Just bs)
else
-- If the CAS failed it means that another process read what we were
-- about to read, so we retry reading the next item instead.
readJournal jour
bs <- getByteStringAt termBuffer
(int322Int relativeOffset + hEADER_LENGTH)
(int322Int len - hEADER_LENGTH)
assertM (BS.length bs == int322Int len - hEADER_LENGTH)
incrCounter_ (align (int322Int len) fRAME_ALIGNMENT) (jBytesConsumed jour)
return (Just bs)

------------------------------------------------------------------------

Expand Down Expand Up @@ -265,15 +252,6 @@ dumpJournal jour = do
return (rawTailTermOffset rawTail termLen)
Vector.imapM_ dumpTermBuffer (jTermBuffers jour `Vector.zip` termOffsets)
dumpMetadata (jMetadata jour)
{-
limit <- calculatePositionLimit jour
let termAppender = jTermBuffers jour Vector.! unPartitionIndex activePartitionIndex
position = termBeginPosition + fromIntegral termOffset
putStrLn $ "limit: " ++ show limit
putStrLn $ "termBeginPosition = " ++ show termBeginPosition
putStrLn $ "termOffset = " ++ show (unTermOffset termOffset)
-}

------------------------------------------------------------------------

Expand All @@ -290,42 +268,3 @@ metricsBytesWritten jour = do
let termId = rawTailTermId rawTail
termOffset = rawTailTermOffset rawTail termLen
return (computePosition termId termOffset (positionBitsToShift termLen) initTermId)

------------------------------------------------------------------------

tj :: IO ()
tj = do
let fp = "/tmp/journal.txt"
opts = defaultOptions
logger = oLogger opts
allocateJournal fp opts
jour <- startJournal fp opts

Right (offset, claimBuf) <- tryClaim jour 5
putStrLn ("offset: " ++ show offset)
putBS claimBuf hEADER_LENGTH (BSChar8.pack "hello")
commit claimBuf logger
Just bs <- readJournal jour
putStrLn ("read bytestring 1: '" ++ BSChar8.unpack bs ++ "'")

Right (offset', claimBuf') <- tryClaim jour 6
putStrLn ("offset': " ++ show offset')
putBS claimBuf' hEADER_LENGTH (BSChar8.pack "world!")
commit claimBuf' logger
Just bs' <- readJournal jour
putStrLn ("read bytestring 2: '" ++ BSChar8.unpack bs' ++ "'")

dumpMetadata (jMetadata jour)
return ()

tbc :: IO ()
tbc = do
bb <- allocate 16
bc <- newBufferClaim bb 0 5
putBS bc 0 (BSChar8.pack "hello")
bs <- getByteStringAt bb 0 5
putStrLn ("'" ++ BSChar8.unpack bs ++ "'")
bc' <- newBufferClaim bb 5 6
putBS bc' 0 (BSChar8.pack "world!")
bs' <- getByteStringAt bb 5 6
putStrLn ("'" ++ BSChar8.unpack bs' ++ "'")
108 changes: 99 additions & 9 deletions src/journal/src/Journal/MP.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
module Journal.MP where

import Control.Monad (when)
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import Data.Int (Int64)
import qualified Data.Vector as Vector

Expand All @@ -13,15 +15,102 @@ import Journal.Internal
)
import Journal.Internal.BufferClaim
import Journal.Internal.ByteBufferPtr
(ByteBuffer, getCapacity, unCapacity)
(ByteBuffer, getByteStringAt, getCapacity, unCapacity)
import Journal.Internal.Logger (Logger, logg)
import Journal.Internal.Utils
import Journal.Types
import Journal.Types.AtomicCounter

------------------------------------------------------------------------

tryClaim :: Journal -> Int -> Logger -> IO (Either AppendError (Int64, BufferClaim))
tryClaim jour len logger = do
appendBS :: Journal -> ByteString -> IO (Either AppendError ())
appendBS jour bs = do
assertIO $ do
termBufferLen <- int322Int <$> readTermLength (jMetadata jour)
return (0 < BS.length bs && align (hEADER_LENGTH + BS.length bs) fRAME_ALIGNMENT <=
termBufferLen `div` 2)
let len = BS.length bs
eClaim <- tryClaim jour len
case eClaim of
Left err -> return (Left err)
Right (_offset, bufferClaim) -> do
putBS bufferClaim hEADER_LENGTH bs
Right <$> commit bufferClaim (jLogger jour)

readJournal :: Journal -> IO (Maybe ByteString)
readJournal jour = do
offset <- readCounter (jBytesConsumed jour)
let jLog = logg (jLogger jour)
jLog ("readJournal, offset: " ++ show offset)

termLen <- readTermLength (jMetadata jour)
let readIndex = indexByPosition (int2Int64 offset) (positionBitsToShift termLen)
jLog ("readJournal, readIndex: " ++ show (unPartitionIndex readIndex))

termCount <- activeTermCount (jMetadata jour)
let activeTermIndex = indexByTermCount termCount
rawTail <- readRawTail (jMetadata jour) activeTermIndex
let termBuffer = jTermBuffers jour Vector.! unPartitionIndex readIndex
activeTermId = rawTailTermId rawTail
termOffset = rawTailTermOffset rawTail termLen

jLog ("readJournal, termOffset: " ++ show (unTermOffset termOffset))
initTermId <- readInitialTermId (jMetadata jour)
jLog ("readJournal, initTermId: " ++ show (unTermId initTermId))
let position =
computePosition activeTermId termOffset (positionBitsToShift termLen) initTermId
assertM (int2Int64 offset <= position)

let readTermCount =
computeTermIdFromPosition (int2Int64 offset) (positionBitsToShift termLen) initTermId
- unTermId initTermId

jLog ("readJournal, readTermCount: " ++ show readTermCount)

if int2Int64 offset == position
then return Nothing
else do
assertM (int2Int64 offset < position)

let relativeOffset = int2Int32 (align offset fRAME_ALIGNMENT) - readTermCount * termLen
jLog ("readJournal, relativeOffset: " ++ show relativeOffset)
tag <- readFrameType termBuffer (TermOffset relativeOffset)
jLog ("readJournal, tag: " ++ show tag)
HeaderLength len <- readFrameLength termBuffer (TermOffset relativeOffset)
jLog ("readJournal, len: " ++ show len)
if tag == Padding
then do
assertM (len >= 0)
-- Single-threaded case:
-- incrCounter_ (align (int322Int len) fRAME_ALIGNMENT) (jBytesConsumed jour)
_success <- casCounter (jBytesConsumed jour) offset (offset + int322Int len)
jLog "readJournal, skipping padding..."
-- If the CAS fails, it just means that some other process incremented the
-- counter already.
readJournal jour
else do
assertM (len > 0)
jLog ("readJournal, termCount: " ++ show (unTermCount termCount))
success <- casCounter (jBytesConsumed jour) offset
(offset + (align (int322Int len) fRAME_ALIGNMENT))
if success
then do
bs <- getByteStringAt termBuffer
(int322Int relativeOffset + hEADER_LENGTH)
(int322Int len - hEADER_LENGTH)
assertM (BS.length bs == int322Int len - hEADER_LENGTH)
-- Single-threaded case:
-- incrCounter_ (align (int322Int len) fRAME_ALIGNMENT) (jBytesConsumed jour)
return (Just bs)
else
-- If the CAS failed it means that another process read what we were
-- about to read, so we retry reading the next item instead.
readJournal jour

------------------------------------------------------------------------

tryClaim :: Journal -> Int -> IO (Either AppendError (Int64, BufferClaim))
tryClaim jour len = do
-- XXX: checkPayloadLength len

limit <- calculatePositionLimit jour
Expand All @@ -41,10 +130,10 @@ tryClaim jour len logger = do
then return (Left AdminAction) -- XXX: what does this mean to end up here?
else if position < int2Int64 limit
then do
eResult <- termAppenderClaim jour len termId logger
eResult <- termAppenderClaim jour len termId
newPosition (jMetadata jour) termCount termOffset termId position eResult
else
backPressureStatus position len logger
backPressureStatus position len (jLogger jour)

newPosition :: Metadata -> TermCount -> TermOffset -> TermId -> Int64
-> Either AppendError (TermOffset, BufferClaim)
Expand All @@ -60,9 +149,9 @@ newPosition meta termCount (TermOffset termOffset) termId position eResult =
-- ^ XXX: when is this needed?
-- | (position + termOffset) > maxPossiblePosition = return mAX_POSITION_EXCEEDED

termAppenderClaim :: Journal -> Int -> TermId -> Logger
termAppenderClaim :: Journal -> Int -> TermId
-> IO (Either AppendError (TermOffset, BufferClaim))
termAppenderClaim jour len activeTermId logger = do
termAppenderClaim jour len activeTermId = do
let frameLen = len + hEADER_LENGTH
alignedLen = align frameLen fRAME_ALIGNMENT
termCount <- activeTermCount (jMetadata jour)
Expand All @@ -81,11 +170,12 @@ termAppenderClaim jour len activeTermId logger = do

if int322Int (unTermOffset resultingOffset) > termLen
then do
handleEndOfLogCondition termBuffer termOffset termLen termId logger
handleEndOfLogCondition termBuffer termOffset termLen termId (jLogger jour)
return (Left Rotation)
else do
let frameOffset = termOffset
headerWrite termBuffer frameOffset (HeaderLength (int2Int32 frameLen)) termId logger
headerWrite termBuffer frameOffset (HeaderLength (int2Int32 frameLen)) termId
(jLogger jour)
bufClaim <- newBufferClaim termBuffer termOffset frameLen
return (Right (resultingOffset, bufClaim))

Expand Down
5 changes: 3 additions & 2 deletions src/journal/test/JournalTest.hs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ import Test.QuickCheck.Instances.ByteString ()
import Test.QuickCheck.Monadic
import Test.Tasty.HUnit (Assertion, assertBool)

import Journal
import Journal hiding (appendBS, readJournal)
import Journal.MP
import Journal.Internal
import Journal.Internal.Logger (ioLogger, nullLogger)
import Journal.Internal.Utils hiding (assert)
Expand Down Expand Up @@ -557,7 +558,7 @@ genConcProgram m0 = sized (go m0 [])
go :: Model -> [[Command]] -> Int -> Gen ConcProgram
go m acc sz | sz <= 0 = return (ConcProgram (reverse acc))
| otherwise = do
n <- chooseInt (2, 2)
n <- chooseInt (2, 2) -- XXX: change back to (2, 5)
cmds <- vectorOf n genCommand `suchThat` concSafe m
go (advanceModel m cmds) (cmds : acc) (sz - n)

Expand Down

0 comments on commit 23c68df

Please sign in to comment.