Skip to content

Commit

Permalink
Merge pull request #462 from symbiont-io/journal
Browse files Browse the repository at this point in the history
fix(journal): in the model backpressure check if position is below the limit
  • Loading branch information
symbiont-stevan-andjelkovic authored Feb 4, 2022
2 parents 4d66ebb + a85fd53 commit 5c1aa35
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 17 deletions.
1 change: 1 addition & 0 deletions src/journal/src/Journal/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ tryClaim jour len = do
putStrLn ("tryClaim, termCount: " ++ show (unTermCount termCount))
putStrLn ("tryClaim, activePartitionIndex: " ++ show (unPartitionIndex activePartitionIndex))
putStrLn ("tryClaim, termOffset: " ++ show (unTermOffset termOffset))
putStrLn ("tryClaim, termBeginPosition: " ++ show termBeginPosition)
limit <- calculatePositionLimit jour
let termAppender = jTermBuffers jour Vector.! unPartitionIndex activePartitionIndex
position = termBeginPosition + fromIntegral termOffset
Expand Down
98 changes: 81 additions & 17 deletions src/journal/test/JournalTest.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

module JournalTest where

import Debug.Trace (trace)
import Control.Arrow ((&&&))
import Control.Exception (IOException, catch, displayException)
import Control.Monad (unless, when)
Expand Down Expand Up @@ -44,36 +45,72 @@ startJournalFake = FakeJournal Vector.empty 0

appendBSFake :: ByteString -> FakeJournal -> (FakeJournal, Either AppendError ())
appendBSFake bs fj@(FakeJournal bss ix)
| unreadBytes < limit = (FakeJournal (Vector.snoc bss bs) ix, Right ())
| position < limit =
trace (unlines [ "TRACE"
, "ix: " ++ show ix
, "readBytes: " ++ show readBytes
, "unreadBytes: " ++ show unreadBytes
, "readPadBytes: " ++ show readPadBytes
, "unreadPadBytes: " ++ show unreadPadBytes
, "position: " ++ show position
, "limit: " ++ show limit
])
(FakeJournal (Vector.snoc bss bs) ix, Right ())
| otherwise = (fj, Left BackPressure)
where
termLen :: Int
termLen = oTermBufferLength testOptions

position = readBytes + unreadBytes

limit :: Int
limit = termLen `div` 2
limit = readBytes + termLen `div` 2

readBytes :: Int
readBytes = (+ readPadBytes)
. Vector.sum
. Vector.map (\bs -> hEADER_LENGTH + BS.length bs)
$ Vector.slice 0 (max 0 (ix - 1)) bss

unreadBytes :: Int
unreadBytes = sum [ hEADER_LENGTH + BS.length bs
| bs <- map (bss Vector.!) [ix..Vector.length bss - 1]
]
+ padBytes
+ unreadPadBytes

-- XXX: figure out why this doesn't work:
-- unreadBytes :: Int
-- unreadBytes = (+ unreadPadBytes)
-- . Vector.sum
-- . Vector.map (\bs -> hEADER_LENGTH + BS.length bs)
-- $ Vector.slice ix (max 0 (Vector.length bss - 1)) bss


padBytes :: Int
padBytes =
readPadBytes, unreadPadBytes :: Int
(readPadBytes, unreadPadBytes) =
let
(lbss, rbss) = Vector.splitAt ix bss
(lacc, lpad) = padding 0 0 (Vector.toList (Vector.map BS.length lbss))
(racc, rpad) = padding lacc 0 (Vector.toList (Vector.map BS.length rbss))
(lbss, rbss) = Vector.splitAt ix (Vector.snoc bss bs)
(lacc, lpad) = padding 0 0 (Vector.map BS.length lbss)
(racc, rpad) = padding lacc 0 (Vector.map BS.length rbss)
in
rpad

padding :: Int -> Int -> [Int] -> (Int, Int)
padding acc pad [] = (acc, pad)
padding acc pad (l : ls)
| acc + l + hEADER_LENGTH <= termLen = padding (acc + l + hEADER_LENGTH) pad ls
| otherwise = padding (l + hEADER_LENGTH)
(pad + (termLen - acc)) ls
trace (unlines [ "PAD"
, "ix: " ++ show ix
, "lbss: " ++ show (Vector.map encodeRunLength lbss)
, "rbss: " ++ show (Vector.map encodeRunLength rbss)
, "lacc: " ++ show lacc
, "lpad: " ++ show lpad
, "racc: " ++ show racc
, "rpad: " ++ show rpad
])
(lpad, rpad)

padding :: Int -> Int -> Vector Int -> (Int, Int)
padding acc pad ls = case Vector.uncons ls of
Nothing -> (acc, pad)
Just (l, ls)
| acc + l + hEADER_LENGTH <= termLen -> padding (acc + l + hEADER_LENGTH) pad ls
| otherwise -> padding (l + hEADER_LENGTH)
(pad + (termLen - acc)) ls

readJournalFake :: FakeJournal -> (FakeJournal, Maybe ByteString)
readJournalFake fj@(FakeJournal jour ix) =
Expand Down Expand Up @@ -353,14 +390,41 @@ unit_bug3 = assertProgram ""
, ReadJournal
, AppendBS [(32756,'N')]
, ReadJournal
, AppendBS [(32756,'W')]
, AppendBS [(32756,'W')] -- 7+6 + 32756+6 + 32756+6 = 65537 which won't fit in
-- the first term, so padding is written, term
-- rotation happens, and the Ws get written to the
-- next term. The padding will be pretty big:
-- 65536-(7+6 + 32756+6) = 32761, so after the Ws are
-- written there's 32761 + 32756+6 = 65523 unread
-- bytes. 7+6+32756+6 = 32775 bytes have been read,
-- so the limit is 32775 + (64*1024 / 2) = 65543.
, AppendBS [(32756,'Q')]
]

unit_bug4 :: Assertion
unit_bug4 = assertProgram ""
[AppendBS [(1, 'A')], AppendBS [(32755,'Q')], AppendBS [(1,'D')]]

unit_bug5 :: Assertion
unit_bug5 = assertProgram ""
[AppendBS [(1,'K')], AppendBS [(32757,'Q')], ReadJournal, AppendBS [(32761,'R')]]
-- ^ Before writing the Rs, we have posiiton: 1+6+32757+6 = 32770 and limit:
-- 1+6+(64*1024/2) = 32775, so that's fine, however 32770+32761+6 = 65537 so it
-- doesn't fit in the current term. 65536-(1+6+32757+6) = 32766 bytes of padding
-- is written instead and then term rotation happens before the 32761+6 bytes
-- related to the append of Rs, this gives us a position of 32770+32767 = 65537
-- which is over the limit and so backpressure should happen.

unit_bug6 :: Assertion
unit_bug6 = assertProgram ""
[ AppendBS [(1,'J')], ReadJournal, AppendBS [(32757,'K')], AppendBS [(32761,'H')]
, AppendBS [(1,'F')]]

unit_bug7 :: Assertion
unit_bug7 = assertProgram ""
[AppendBS [(32756,'Y')], AppendBS [(32761,'A')], ReadJournal, AppendBS [(1,'J')]]


------------------------------------------------------------------------

assertProgram :: String -> [Command] -> Assertion
Expand Down

0 comments on commit 5c1aa35

Please sign in to comment.