diff --git a/src/journal/src/Journal/Internal.hs b/src/journal/src/Journal/Internal.hs index 1ff2c08e..20038bf7 100644 --- a/src/journal/src/Journal/Internal.hs +++ b/src/journal/src/Journal/Internal.hs @@ -62,6 +62,7 @@ tryClaim jour len = do putStrLn ("tryClaim, termCount: " ++ show (unTermCount termCount)) putStrLn ("tryClaim, activePartitionIndex: " ++ show (unPartitionIndex activePartitionIndex)) putStrLn ("tryClaim, termOffset: " ++ show (unTermOffset termOffset)) + putStrLn ("tryClaim, termBeginPosition: " ++ show termBeginPosition) limit <- calculatePositionLimit jour let termAppender = jTermBuffers jour Vector.! unPartitionIndex activePartitionIndex position = termBeginPosition + fromIntegral termOffset diff --git a/src/journal/test/JournalTest.hs b/src/journal/test/JournalTest.hs index 25a34c02..1ce05a0c 100644 --- a/src/journal/test/JournalTest.hs +++ b/src/journal/test/JournalTest.hs @@ -3,6 +3,7 @@ module JournalTest where +import Debug.Trace (trace) import Control.Arrow ((&&&)) import Control.Exception (IOException, catch, displayException) import Control.Monad (unless, when) @@ -44,36 +45,72 @@ startJournalFake = FakeJournal Vector.empty 0 appendBSFake :: ByteString -> FakeJournal -> (FakeJournal, Either AppendError ()) appendBSFake bs fj@(FakeJournal bss ix) - | unreadBytes < limit = (FakeJournal (Vector.snoc bss bs) ix, Right ()) + | position < limit = + trace (unlines [ "TRACE" + , "ix: " ++ show ix + , "readBytes: " ++ show readBytes + , "unreadBytes: " ++ show unreadBytes + , "readPadBytes: " ++ show readPadBytes + , "unreadPadBytes: " ++ show unreadPadBytes + , "position: " ++ show position + , "limit: " ++ show limit + ]) + (FakeJournal (Vector.snoc bss bs) ix, Right ()) | otherwise = (fj, Left BackPressure) where termLen :: Int termLen = oTermBufferLength testOptions + position = readBytes + unreadBytes + limit :: Int - limit = termLen `div` 2 + limit = readBytes + termLen `div` 2 + + readBytes :: Int + readBytes = (+ readPadBytes) + . Vector.sum + . Vector.map (\bs -> hEADER_LENGTH + BS.length bs) + $ Vector.slice 0 (max 0 (ix - 1)) bss unreadBytes :: Int unreadBytes = sum [ hEADER_LENGTH + BS.length bs | bs <- map (bss Vector.!) [ix..Vector.length bss - 1] ] - + padBytes + + unreadPadBytes + + -- XXX: figure out why this doesn't work: + -- unreadBytes :: Int + -- unreadBytes = (+ unreadPadBytes) + -- . Vector.sum + -- . Vector.map (\bs -> hEADER_LENGTH + BS.length bs) + -- $ Vector.slice ix (max 0 (Vector.length bss - 1)) bss + - padBytes :: Int - padBytes = + readPadBytes, unreadPadBytes :: Int + (readPadBytes, unreadPadBytes) = let - (lbss, rbss) = Vector.splitAt ix bss - (lacc, lpad) = padding 0 0 (Vector.toList (Vector.map BS.length lbss)) - (racc, rpad) = padding lacc 0 (Vector.toList (Vector.map BS.length rbss)) + (lbss, rbss) = Vector.splitAt ix (Vector.snoc bss bs) + (lacc, lpad) = padding 0 0 (Vector.map BS.length lbss) + (racc, rpad) = padding lacc 0 (Vector.map BS.length rbss) in - rpad - - padding :: Int -> Int -> [Int] -> (Int, Int) - padding acc pad [] = (acc, pad) - padding acc pad (l : ls) - | acc + l + hEADER_LENGTH <= termLen = padding (acc + l + hEADER_LENGTH) pad ls - | otherwise = padding (l + hEADER_LENGTH) - (pad + (termLen - acc)) ls + trace (unlines [ "PAD" + , "ix: " ++ show ix + , "lbss: " ++ show (Vector.map encodeRunLength lbss) + , "rbss: " ++ show (Vector.map encodeRunLength rbss) + , "lacc: " ++ show lacc + , "lpad: " ++ show lpad + , "racc: " ++ show racc + , "rpad: " ++ show rpad + ]) + (lpad, rpad) + + padding :: Int -> Int -> Vector Int -> (Int, Int) + padding acc pad ls = case Vector.uncons ls of + Nothing -> (acc, pad) + Just (l, ls) + | acc + l + hEADER_LENGTH <= termLen -> padding (acc + l + hEADER_LENGTH) pad ls + | otherwise -> padding (l + hEADER_LENGTH) + (pad + (termLen - acc)) ls readJournalFake :: FakeJournal -> (FakeJournal, Maybe ByteString) readJournalFake fj@(FakeJournal jour ix) = @@ -353,7 +390,14 @@ unit_bug3 = assertProgram "" , ReadJournal , AppendBS [(32756,'N')] , ReadJournal - , AppendBS [(32756,'W')] + , AppendBS [(32756,'W')] -- 7+6 + 32756+6 + 32756+6 = 65537 which won't fit in + -- the first term, so padding is written, term + -- rotation happens, and the Ws get written to the + -- next term. The padding will be pretty big: + -- 65536-(7+6 + 32756+6) = 32761, so after the Ws are + -- written there's 32761 + 32756+6 = 65523 unread + -- bytes. 7+6+32756+6 = 32775 bytes have been read, + -- so the limit is 32775 + (64*1024 / 2) = 65543. , AppendBS [(32756,'Q')] ] @@ -361,6 +405,26 @@ unit_bug4 :: Assertion unit_bug4 = assertProgram "" [AppendBS [(1, 'A')], AppendBS [(32755,'Q')], AppendBS [(1,'D')]] +unit_bug5 :: Assertion +unit_bug5 = assertProgram "" + [AppendBS [(1,'K')], AppendBS [(32757,'Q')], ReadJournal, AppendBS [(32761,'R')]] +-- ^ Before writing the Rs, we have posiiton: 1+6+32757+6 = 32770 and limit: +-- 1+6+(64*1024/2) = 32775, so that's fine, however 32770+32761+6 = 65537 so it +-- doesn't fit in the current term. 65536-(1+6+32757+6) = 32766 bytes of padding +-- is written instead and then term rotation happens before the 32761+6 bytes +-- related to the append of Rs, this gives us a position of 32770+32767 = 65537 +-- which is over the limit and so backpressure should happen. + +unit_bug6 :: Assertion +unit_bug6 = assertProgram "" + [ AppendBS [(1,'J')], ReadJournal, AppendBS [(32757,'K')], AppendBS [(32761,'H')] + , AppendBS [(1,'F')]] + +unit_bug7 :: Assertion +unit_bug7 = assertProgram "" + [AppendBS [(32756,'Y')], AppendBS [(32761,'A')], ReadJournal, AppendBS [(1,'J')]] + + ------------------------------------------------------------------------ assertProgram :: String -> [Command] -> Assertion