Skip to content

Commit

Permalink
Merge pull request #485 from symbiont-io/journal
Browse files Browse the repository at this point in the history
feat(journal): add assert for concurrent programs
  • Loading branch information
symbiont-stevan-andjelkovic authored Feb 14, 2022
2 parents 75b46f5 + f5fa91b commit 9cb42a2
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 22 deletions.
6 changes: 3 additions & 3 deletions src/journal/src/Journal/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ tryClaim jour len = do

jLog ("tryClaim, position: " ++ show position)
jLog ("tryClaim, limit: " ++ show limit)
if position < fromIntegral limit
if position < limit
then do
mResult <- exclusiveTermAppenderClaim (jMetadata jour) termAppender termId
termOffset len (jLogger jour)
Expand All @@ -81,15 +81,15 @@ tryClaim jour len = do

-- XXX: Save the result in `producerLimit :: AtomicCounter` and update it in a
-- separate process?
calculatePositionLimit :: Journal -> IO Int
calculatePositionLimit :: Journal -> IO Int64
calculatePositionLimit jour = do
minSubscriberPos <- readCounter (jBytesConsumed jour) -- XXX: only one subscriber so far.
maxSubscriberPos <- readCounter (jBytesConsumed jour)
termWindowLen <- termWindowLength (jMetadata jour)
let _consumerPos = maxSubscriberPos
proposedLimit = minSubscriberPos + fromIntegral termWindowLen
cleanBufferTo jour minSubscriberPos
return proposedLimit
return (int2Int64 proposedLimit)
where
termWindowLength :: Metadata -> IO Int32
termWindowLength meta = do
Expand Down
2 changes: 1 addition & 1 deletion src/journal/src/Journal/MP.hs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ tryClaim jour len = do

if unTermCount termCount /= unTermId (termId - initTermId)
then return (Left AdminAction) -- XXX: what does this mean to end up here?
else if position < int2Int64 limit
else if position < limit
then do
eResult <- termAppenderClaim jour len termId
newPosition (jMetadata jour) termCount termOffset termId position eResult
Expand Down
51 changes: 33 additions & 18 deletions src/journal/test/JournalTest.hs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ import Test.QuickCheck.Monadic
import Test.Tasty.HUnit (Assertion, assertBool)

import Journal
import qualified Journal.MP as MP
import Journal.Internal
import Journal.Internal.Logger (ioLogger, nullLogger)
import Journal.Internal.Utils hiding (assert)
import qualified Journal.MP as MP

------------------------------------------------------------------------

Expand Down Expand Up @@ -112,14 +112,16 @@ appendBSFake bs fj@(FakeJournal bss ix termCount) =
hardLimit = readBytes + termLen -- TODO: calculate this correctly

readBytes :: Int
readBytes = sum [ align (hEADER_LENGTH + BS.length bs) fRAME_ALIGNMENT
| bs <- map (bss Vector.!) [0..ix - 1]
]
readBytes = Vector.sum
. Vector.map (\bs -> align (hEADER_LENGTH + BS.length bs) fRAME_ALIGNMENT)
. Vector.take (ix - 1)
$ bss

unreadBytes :: Int
unreadBytes = sum [ align (hEADER_LENGTH + BS.length bs) fRAME_ALIGNMENT
| bs <- map (bss Vector.!) [ix..Vector.length bss - 1]
]
unreadBytes = Vector.sum
. Vector.map (\bs -> align (hEADER_LENGTH + BS.length bs) fRAME_ALIGNMENT)
. Vector.drop ix
$ bss

readJournalFake :: FakeJournal -> (FakeJournal, Maybe ByteString)
readJournalFake fj@(FakeJournal jour ix termCount)
Expand Down Expand Up @@ -396,13 +398,13 @@ runCommands cmds = do
allocateJournal fp testOptions
j <- startJournal fp testOptions
putStrLn ""
b <- go m j cmds []
b <- go m j cmds
dumpJournal j
return b
where
go :: Model -> Journal -> [Command] -> [(Command, Response)] -> IO Bool
go m j [] _hist = putStrLn "\nSuccess!" >> return True
go m j (cmd : cmds) hist = do
go :: Model -> Journal -> [Command] -> IO Bool
go m j [] = putStrLn "\nSuccess!" >> return True
go m j (cmd : cmds) = do
let (m', resp) = step cmd m
putStrLn (prettyFakeJournal m)
putStrLn ""
Expand All @@ -414,7 +416,7 @@ runCommands cmds = do
resp' <- exec cmd j `catch` (return . IOException)
-- is <- checkForInconsistencies (fst j)
if resp == resp' -- && null is
then go m' j cmds ((cmd, resp) : hist)
then go m' j cmds
else do
putStrLn ""
when (resp /= resp') $
Expand All @@ -424,7 +426,6 @@ runCommands cmds = do
putStrLn ""
putStrLn "Journal dump:"
dumpJournal j
-- print (stats (reverse hist))
return False

------------------------------------------------------------------------
Expand Down Expand Up @@ -547,6 +548,10 @@ unit_bug14 = assertProgram ""
[ AppendBS [(32737,'H')], ReadJournal, AppendBS [(9,'D')]
, AppendBS [(32753,'F')], ReadJournal, AppendBS [(1,'Z')]]

unit_bug15 :: Assertion
unit_bug15 = assertConcProgram "" $ ConcProgram
[[AppendBS [(1024,'Z')],AppendBS [(1024,'U')],AppendBS [(1024,'C')],AppendBS [(1024,'Q')]],[AppendBS [(1024,'B')],AppendBS [(1024,'E')],AppendBS [(1024,'P')],ReadJournal,ReadJournal],[ReadJournal,ReadJournal,ReadJournal,ReadJournal],[AppendBS [(1024,'P')],AppendBS [(1024,'I')],ReadJournal],[ReadJournal,AppendBS [(1024,'S')],ReadJournal,AppendBS [(1024,'X')]],[AppendBS [(1024,'V')],AppendBS [(1024,'N')],ReadJournal,AppendBS [(1024,'C')],AppendBS [(1024,'V')]],[AppendBS [(1024,'R')],ReadJournal],[ReadJournal,AppendBS [(1024,'V')]],[ReadJournal,ReadJournal,AppendBS [(1024,'E')]],[ReadJournal,ReadJournal,ReadJournal,ReadJournal],[AppendBS [(1024,'W')],AppendBS [(1024,'O')],AppendBS [(1024,'P')]],[AppendBS [(1024,'B')],AppendBS [(1024,'H')],ReadJournal,ReadJournal,ReadJournal], [ReadJournal,ReadJournal],[AppendBS [(1024,'E')],AppendBS [(1024,'B')],AppendBS [(1024,'I')],AppendBS [(1024,'F')],AppendBS [(1024,'P')]],[AppendBS [(1024,'Q')],ReadJournal,AppendBS [(1024,'C')],ReadJournal,ReadJournal],[ReadJournal,AppendBS [(1024,'H')],ReadJournal,AppendBS [(1024,'P')],AppendBS [(1024,'L')]],[AppendBS [(1024,'X')],ReadJournal,AppendBS [(1024,'Y')],ReadJournal,ReadJournal],[AppendBS [(1024,'H')],ReadJournal],[ReadJournal,ReadJournal,ReadJournal,ReadJournal,AppendBS [(1024,'Q')]],[AppendBS [(1024,'J')],AppendBS [(1024,'N')],AppendBS [(1024,'D')],ReadJournal,AppendBS [(1024,'S')]],[ReadJournal,ReadJournal,AppendBS [(1024,'S')]],[AppendBS [(1024,'Z')],ReadJournal,AppendBS [(1024,'W')],AppendBS [(1024,'E')]],[ReadJournal,ReadJournal,ReadJournal],[ReadJournal,ReadJournal,AppendBS [(1024,'I')],AppendBS [(1024,'W')],AppendBS [(1024,'M')]],[AppendBS [(1024,'F')],AppendBS [(1024,'L')]],[ReadJournal,AppendBS [(1024,'J')],ReadJournal,ReadJournal],[AppendBS [(1024,'R')],ReadJournal],[AppendBS [(1024 ,'A')],ReadJournal,AppendBS [(1024,'N')],AppendBS [(1024,'W')]],[ReadJournal,AppendBS [(1024,'N')],ReadJournal],[ReadJournal,AppendBS [(1024,'C')]],[AppendBS [(1024,'I')],ReadJournal,AppendBS [(1024,'F')] ,AppendBS [(1024,'O')]],[AppendBS [(1024,'A')],ReadJournal,ReadJournal],[AppendBS [(1024,'W')],AppendBS [(1024,'Y')],AppendBS [(1024,'P')]],[ReadJournal,ReadJournal],[AppendBS [(1024,'S')],ReadJournal],[AppendBS [(1024,'L')],ReadJournal],[AppendBS [(1024,'D')],ReadJournal,ReadJournal,ReadJournal,ReadJournal],[ReadJournal,AppendBS [(1024,'P')],AppendBS [(1024,'E')],AppendBS [(1024,'K')]]]

alignedLength :: Int -> Int
alignedLength n = align (hEADER_LENGTH + n) fRAME_ALIGNMENT

Expand All @@ -560,6 +565,19 @@ assertProgram msg cmds = do
b <- runCommands cmds
assertBool msg b

assertConcProgram :: String -> ConcProgram -> Assertion
assertConcProgram msg (ConcProgram cmdss) = do
(fp, jour) <- initJournal
queue <- newTQueueIO
mapM_ (mapConcurrently (concExec queue jour)) cmdss
hist <- History <$> atomically (flushTQueue queue)
removeFile fp
let msg' = msg ++ "\nHistory:\n" ++ prettyHistory hist
assertBool msg' (linearisable (interleavings hist))

prettyHistory :: History -> String
prettyHistory = show

------------------------------------------------------------------------

newtype ConcProgram = ConcProgram { unConcProgram :: [[Command]] }
Expand Down Expand Up @@ -619,9 +637,6 @@ data Operation
toPid :: ThreadId -> Pid
toPid tid = Pid (read (drop (length ("ThreadId " :: String)) (show tid)))

prettyHistory :: History -> String
prettyHistory = show

concExec :: TQueue Operation -> Journal -> Command -> IO ()
concExec queue jour cmd = do
pid <- toPid <$> myThreadId
Expand Down Expand Up @@ -684,7 +699,7 @@ linearisable = any' (go initModel)
any' p xs = any p xs

prop_concurrent :: Property
prop_concurrent = mapSize (max 20) $
prop_concurrent = mapSize (min 20) $ noShrinking $
forAllConcProgram $ \(ConcProgram cmdss) -> monadicIO $ do
monitor (classifyCommandsLength (concat cmdss))
-- Rerun a couple of times, to avoid being lucky with the interleavings.
Expand Down Expand Up @@ -742,7 +757,7 @@ genHistory pids initModel = sized $ \s ->
go (consH (Ok pid resp) conc) linear model size ((pid, DoingNothing):pids')

prop_lineariseIsOkay :: Property
prop_lineariseIsOkay = mapSize (max 20) $
prop_lineariseIsOkay = mapSize (min 20) $
-- ^ Above 20 it starts to take too much time to check.
forAll (genHistory pids initModel) $ \(history, _) ->
linearisable (interleavings history)
Expand Down

0 comments on commit 9cb42a2

Please sign in to comment.