From e0deca03a642f89e10941d12056d8402a06feba0 Mon Sep 17 00:00:00 2001 From: Stevan Andjelkovic Date: Mon, 14 Feb 2022 12:37:18 +0100 Subject: [PATCH] feat(journal): add assert for concurrent programs --- src/journal/src/Journal/Internal.hs | 6 ++-- src/journal/src/Journal/MP.hs | 2 +- src/journal/test/JournalTest.hs | 51 +++++++++++++++++++---------- 3 files changed, 37 insertions(+), 22 deletions(-) diff --git a/src/journal/src/Journal/Internal.hs b/src/journal/src/Journal/Internal.hs index 9b37e1e5..8fb06e81 100644 --- a/src/journal/src/Journal/Internal.hs +++ b/src/journal/src/Journal/Internal.hs @@ -71,7 +71,7 @@ tryClaim jour len = do jLog ("tryClaim, position: " ++ show position) jLog ("tryClaim, limit: " ++ show limit) - if position < fromIntegral limit + if position < limit then do mResult <- exclusiveTermAppenderClaim (jMetadata jour) termAppender termId termOffset len (jLogger jour) @@ -81,7 +81,7 @@ tryClaim jour len = do -- XXX: Save the result in `producerLimit :: AtomicCounter` and update it in a -- separate process? -calculatePositionLimit :: Journal -> IO Int +calculatePositionLimit :: Journal -> IO Int64 calculatePositionLimit jour = do minSubscriberPos <- readCounter (jBytesConsumed jour) -- XXX: only one subscriber so far. maxSubscriberPos <- readCounter (jBytesConsumed jour) @@ -89,7 +89,7 @@ calculatePositionLimit jour = do let _consumerPos = maxSubscriberPos proposedLimit = minSubscriberPos + fromIntegral termWindowLen cleanBufferTo jour minSubscriberPos - return proposedLimit + return (int2Int64 proposedLimit) where termWindowLength :: Metadata -> IO Int32 termWindowLength meta = do diff --git a/src/journal/src/Journal/MP.hs b/src/journal/src/Journal/MP.hs index 8fd883ff..868ca906 100644 --- a/src/journal/src/Journal/MP.hs +++ b/src/journal/src/Journal/MP.hs @@ -128,7 +128,7 @@ tryClaim jour len = do if unTermCount termCount /= unTermId (termId - initTermId) then return (Left AdminAction) -- XXX: what does this mean to end up here? - else if position < int2Int64 limit + else if position < limit then do eResult <- termAppenderClaim jour len termId newPosition (jMetadata jour) termCount termOffset termId position eResult diff --git a/src/journal/test/JournalTest.hs b/src/journal/test/JournalTest.hs index 0d36765f..9ae46d33 100644 --- a/src/journal/test/JournalTest.hs +++ b/src/journal/test/JournalTest.hs @@ -34,10 +34,10 @@ import Test.QuickCheck.Monadic import Test.Tasty.HUnit (Assertion, assertBool) import Journal -import qualified Journal.MP as MP import Journal.Internal import Journal.Internal.Logger (ioLogger, nullLogger) import Journal.Internal.Utils hiding (assert) +import qualified Journal.MP as MP ------------------------------------------------------------------------ @@ -103,14 +103,16 @@ appendBSFake bs fj@(FakeJournal bss ix termCount) = limit = readBytes + termLen `div` 2 readBytes :: Int - readBytes = sum [ align (hEADER_LENGTH + BS.length bs) fRAME_ALIGNMENT - | bs <- map (bss Vector.!) [0..ix - 1] - ] + readBytes = Vector.sum + . Vector.map (\bs -> align (hEADER_LENGTH + BS.length bs) fRAME_ALIGNMENT) + . Vector.slice 0 (max 0 (ix - 1)) + $ bss unreadBytes :: Int - unreadBytes = sum [ align (hEADER_LENGTH + BS.length bs) fRAME_ALIGNMENT - | bs <- map (bss Vector.!) [ix..Vector.length bss - 1] - ] + unreadBytes = Vector.sum + . Vector.map (\bs -> align (hEADER_LENGTH + BS.length bs) fRAME_ALIGNMENT) + . Vector.slice ix (max 0 (Vector.length bss - 1 - ix)) + $ bss readJournalFake :: FakeJournal -> (FakeJournal, Maybe ByteString) readJournalFake fj@(FakeJournal jour ix termCount) @@ -377,13 +379,13 @@ runCommands cmds = do allocateJournal fp testOptions j <- startJournal fp testOptions putStrLn "" - b <- go m j cmds [] + b <- go m j cmds dumpJournal j return b where - go :: Model -> Journal -> [Command] -> [(Command, Response)] -> IO Bool - go m j [] _hist = putStrLn "\nSuccess!" >> return True - go m j (cmd : cmds) hist = do + go :: Model -> Journal -> [Command] -> IO Bool + go m j [] = putStrLn "\nSuccess!" >> return True + go m j (cmd : cmds) = do let (m', resp) = step cmd m putStrLn (prettyFakeJournal m) putStrLn "" @@ -395,7 +397,7 @@ runCommands cmds = do resp' <- exec cmd j `catch` (return . IOException) -- is <- checkForInconsistencies (fst j) if resp == resp' -- && null is - then go m' j cmds ((cmd, resp) : hist) + then go m' j cmds else do putStrLn "" when (resp /= resp') $ @@ -405,7 +407,6 @@ runCommands cmds = do putStrLn "" putStrLn "Journal dump:" dumpJournal j - -- print (stats (reverse hist)) return False ------------------------------------------------------------------------ @@ -528,6 +529,10 @@ unit_bug14 = assertProgram "" [ AppendBS [(32737,'H')], ReadJournal, AppendBS [(9,'D')] , AppendBS [(32753,'F')], ReadJournal, AppendBS [(1,'Z')]] +unit_bug15 :: Assertion +unit_bug15 = assertConcProgram "" $ ConcProgram + [[AppendBS [(1024,'Z')],AppendBS [(1024,'U')],AppendBS [(1024,'C')],AppendBS [(1024,'Q')]],[AppendBS [(1024,'B')],AppendBS [(1024,'E')],AppendBS [(1024,'P')],ReadJournal,ReadJournal],[ReadJournal,ReadJournal,ReadJournal,ReadJournal],[AppendBS [(1024,'P')],AppendBS [(1024,'I')],ReadJournal],[ReadJournal,AppendBS [(1024,'S')],ReadJournal,AppendBS [(1024,'X')]],[AppendBS [(1024,'V')],AppendBS [(1024,'N')],ReadJournal,AppendBS [(1024,'C')],AppendBS [(1024,'V')]],[AppendBS [(1024,'R')],ReadJournal],[ReadJournal,AppendBS [(1024,'V')]],[ReadJournal,ReadJournal,AppendBS [(1024,'E')]],[ReadJournal,ReadJournal,ReadJournal,ReadJournal],[AppendBS [(1024,'W')],AppendBS [(1024,'O')],AppendBS [(1024,'P')]],[AppendBS [(1024,'B')],AppendBS [(1024,'H')],ReadJournal,ReadJournal,ReadJournal], [ReadJournal,ReadJournal],[AppendBS [(1024,'E')],AppendBS [(1024,'B')],AppendBS [(1024,'I')],AppendBS [(1024,'F')],AppendBS [(1024,'P')]],[AppendBS [(1024,'Q')],ReadJournal,AppendBS [(1024,'C')],ReadJournal,ReadJournal],[ReadJournal,AppendBS [(1024,'H')],ReadJournal,AppendBS [(1024,'P')],AppendBS [(1024,'L')]],[AppendBS [(1024,'X')],ReadJournal,AppendBS [(1024,'Y')],ReadJournal,ReadJournal],[AppendBS [(1024,'H')],ReadJournal],[ReadJournal,ReadJournal,ReadJournal,ReadJournal,AppendBS [(1024,'Q')]],[AppendBS [(1024,'J')],AppendBS [(1024,'N')],AppendBS [(1024,'D')],ReadJournal,AppendBS [(1024,'S')]],[ReadJournal,ReadJournal,AppendBS [(1024,'S')]],[AppendBS [(1024,'Z')],ReadJournal,AppendBS [(1024,'W')],AppendBS [(1024,'E')]],[ReadJournal,ReadJournal,ReadJournal],[ReadJournal,ReadJournal,AppendBS [(1024,'I')],AppendBS [(1024,'W')],AppendBS [(1024,'M')]],[AppendBS [(1024,'F')],AppendBS [(1024,'L')]],[ReadJournal,AppendBS [(1024,'J')],ReadJournal,ReadJournal],[AppendBS [(1024,'R')],ReadJournal],[AppendBS [(1024 ,'A')],ReadJournal,AppendBS [(1024,'N')],AppendBS [(1024,'W')]],[ReadJournal,AppendBS [(1024,'N')],ReadJournal],[ReadJournal,AppendBS [(1024,'C')]],[AppendBS [(1024,'I')],ReadJournal,AppendBS [(1024,'F')] ,AppendBS [(1024,'O')]],[AppendBS [(1024,'A')],ReadJournal,ReadJournal],[AppendBS [(1024,'W')],AppendBS [(1024,'Y')],AppendBS [(1024,'P')]],[ReadJournal,ReadJournal],[AppendBS [(1024,'S')],ReadJournal],[AppendBS [(1024,'L')],ReadJournal],[AppendBS [(1024,'D')],ReadJournal,ReadJournal,ReadJournal,ReadJournal],[ReadJournal,AppendBS [(1024,'P')],AppendBS [(1024,'E')],AppendBS [(1024,'K')]]] + alignedLength :: Int -> Int alignedLength n = align (hEADER_LENGTH + n) fRAME_ALIGNMENT @@ -541,6 +546,19 @@ assertProgram msg cmds = do b <- runCommands cmds assertBool msg b +assertConcProgram :: String -> ConcProgram -> Assertion +assertConcProgram msg (ConcProgram cmdss) = do + (fp, jour) <- initJournal + queue <- newTQueueIO + mapM_ (mapConcurrently (concExec queue jour)) cmdss + hist <- History <$> atomically (flushTQueue queue) + removeFile fp + let msg' = msg ++ "\nHistory:\n" ++ prettyHistory hist + assertBool msg' (linearisable (interleavings hist)) + +prettyHistory :: History -> String +prettyHistory = show + ------------------------------------------------------------------------ newtype ConcProgram = ConcProgram { unConcProgram :: [[Command]] } @@ -600,9 +618,6 @@ data Operation toPid :: ThreadId -> Pid toPid tid = Pid (read (drop (length ("ThreadId " :: String)) (show tid))) -prettyHistory :: History -> String -prettyHistory = show - concExec :: TQueue Operation -> Journal -> Command -> IO () concExec queue jour cmd = do pid <- toPid <$> myThreadId @@ -666,7 +681,7 @@ linearisable = any' (go initModel) any' p xs = any p xs prop_concurrent :: Property -prop_concurrent = mapSize (max 20) $ +prop_concurrent = mapSize (min 20) $ noShrinking $ forAllConcProgram $ \(ConcProgram cmdss) -> monadicIO $ do monitor (classifyCommandsLength (concat cmdss)) -- Rerun a couple of times, to avoid being lucky with the interleavings. @@ -724,7 +739,7 @@ genHistory pids initModel = sized $ \s -> go (consH (Ok pid resp) conc) linear model size ((pid, DoingNothing):pids') prop_lineariseIsOkay :: Property -prop_lineariseIsOkay = mapSize (max 20) $ +prop_lineariseIsOkay = mapSize (min 20) $ -- ^ Above 20 it starts to take too much time to check. forAll (genHistory pids initModel) $ \(history, _) -> linearisable (interleavings history)