Skip to content

Commit

Permalink
fix(journal): account for slice in more places
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-stevan-andjelkovic committed Jan 17, 2022
1 parent c988b8c commit 085e4fa
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 17 deletions.
4 changes: 2 additions & 2 deletions src/journal/src/Journal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -312,9 +312,9 @@ tj = do
Just bs <- readJournal' jour
putStrLn ("read bytestring 1: '" ++ BSChar8.unpack bs ++ "'")

Just (offset', claimBuf') <- tryClaim jour 5
Just (offset', claimBuf') <- tryClaim jour 6
putStrLn ("offset': " ++ show offset')
putBS claimBuf' hEADER_LENGTH (BSChar8.pack "world")
putBS claimBuf' hEADER_LENGTH (BSChar8.pack "world!")
commit claimBuf'
Just bs' <- readJournal' jour
putStrLn ("read bytestring 2: '" ++ BSChar8.unpack bs' ++ "'")
Expand Down
5 changes: 5 additions & 0 deletions src/journal/src/Journal/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ tryClaim jour len = do
termBeginPosition =
computeTermBeginPosition termId (positionBitsToShift termLen) initTermId

putStrLn ("tryClaim, termOffset: " ++ show (unTermOffset termOffset))
limit <- calculatePositionLimit jour
let termAppender = jTermBuffers jour Vector.! unPartitionIndex activePartitionIndex
position = termBeginPosition + fromIntegral termOffset
Expand Down Expand Up @@ -125,12 +126,16 @@ termAppenderClaim meta termBuffer termId termOffset len = do
termLength = getCapacity termBuffer
termCount <- activeTermCount meta
let activePartitionIndex = indexByTermCount termCount
putStrLn ("termAppenderClaim, resultingOffset: " ++
show (unTermOffset resultingOffset))
writeRawTail meta termId resultingOffset activePartitionIndex
if resultingOffset > fromIntegral termLength
then do
handleEndOfLogCondition termBuffer termOffset termLength termId
return Nothing
else do
putStrLn ("termAppenderClaim, frameLength: " ++
show frameLength)
headerWrite termBuffer termOffset (fromIntegral frameLength) termId
bufClaim <- newBufferClaim termBuffer termOffset frameLength
return (Just (resultingOffset, bufClaim))
Expand Down
34 changes: 19 additions & 15 deletions src/journal/src/Journal/Internal/ByteBufferPtr.hs
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,12 @@ remaining bb = do

boundCheck :: HasCallStack => ByteBuffer -> Int -> IO ()
boundCheck bb ix = do
invariant bb
-- XXX: parametrise on build flag and only do these checks if enabled?
Slice slice <- readIORef (bbSlice bb)
if ix - slice < fromIntegral (getCapacity bb) &&
0 <= ix + slice
Limit limit <- readIORef (bbLimit bb)
if 0 <= ix + slice &&
ix - slice < limit - slice
then return ()
else throwIO (IndexOutOfBounds errMsg)
where
Expand All @@ -130,17 +132,17 @@ boundCheck bb ix = do
, "(", show ix, ",", show (unCapacity (getCapacity bb)), ")"
]

invariant :: ByteBuffer -> IO ()
invariant :: HasCallStack => ByteBuffer -> IO ()
invariant bb = do
mark <- readMark bb
pos <- readPosition bb
lim <- readLimit bb
let capa = getCapacity bb
assert ((mark == (-1) || 0 <= mark) &&
mark <= fromIntegral pos &&
pos <= fromIntegral lim &&
lim <= fromIntegral capa)
(return ())
Position mark <- readMark bb
Position pos <- readPosition bb
Slice slice <- readIORef (bbSlice bb)
Limit lim <- readLimit bb
let Capacity capa = getCapacity bb
assertM (mark == (-1) || 0 <= mark)
assertM (mark <= pos)
assertM (pos <= lim)
assertM (lim - slice <= capa)

------------------------------------------------------------------------
-- * Create
Expand Down Expand Up @@ -175,8 +177,9 @@ wrap bb = newByteBuffer (bbData bb) capa lim (Position 0) (Just (bbSlice bb))

wrapPart :: ByteBuffer -> Int -> Int -> IO ByteBuffer
wrapPart bb offset len = do
slice <- newIORef (Slice offset)
newByteBuffer (bbData bb) capa lim pos (Just slice)
Slice slice <- readIORef (bbSlice bb)
slice' <- newIORef (Slice (slice + offset))
newByteBuffer (bbData bb) capa lim pos (Just slice')
where
capa = Capacity len
lim = Limit (fromIntegral offset + fromIntegral len)
Expand Down Expand Up @@ -283,11 +286,12 @@ getBytes bb offset len = undefined

putByteStringAt :: ByteBuffer -> Int -> BS.ByteString -> IO ()
putByteStringAt bb doffset bs = do
Slice slice <- readIORef (bbSlice bb)
let (fptr, soffset, len) = BS.toForeignPtr bs
boundCheck bb (doffset + len - 1)
withForeignPtr fptr $ \sptr ->
withForeignPtr (bbData bb) $ \dptr ->
copyBytes (dptr `plusPtr` doffset) (sptr `plusPtr` soffset) len
copyBytes (dptr `plusPtr` (slice + doffset)) (sptr `plusPtr` soffset) len
{-
putLazyByteString :: ByteBuffer -> LBS.ByteString -> IO ()
putLazyByteString bb lbs = do
Expand Down

0 comments on commit 085e4fa

Please sign in to comment.