Skip to content

Commit

Permalink
fix(journal): commit should write header at position zero
Browse files Browse the repository at this point in the history
(Because the write operation already does take slice etc into account.)
  • Loading branch information
symbiont-stevan-andjelkovic committed Jan 17, 2022
1 parent 087c426 commit c31cb31
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 29 deletions.
11 changes: 6 additions & 5 deletions src/journal/src/Journal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,9 @@ readJournal' jour = do
HeaderLength len <- readFrameLength termBuffer (TermOffset (int2Int32 offset))
putStrLn ("readJournal, len: " ++ show len)
bs <- getByteStringAt termBuffer
(offset + hEADER_LENGTH) (word322Int len - hEADER_LENGTH)
assertM (BS.length bs == word322Int len - hEADER_LENGTH)
incrCounter_ (word322Int len) (jBytesConsumed jour)
(offset + hEADER_LENGTH) (int322Int len - hEADER_LENGTH)
assertM (BS.length bs == int322Int len - hEADER_LENGTH)
incrCounter_ (int322Int len) (jBytesConsumed jour)
return (Just bs)

readJournal :: JournalConsumer -> IO ByteString
Expand Down Expand Up @@ -348,6 +348,7 @@ tfl = do
HeaderLength headerLen <- readFrameLength bb 0
putStrLn ("headerLength: " ++ show headerLen)

writeFrameLength bb 4 {- (sizeOf (4 :: Word32)) -} 6
HeaderLength headerLen' <- readFrameLength bb 4 {- (sizeOf (4 :: Word32)) -}
writeFrameLength bb 4 {- (sizeOf (4 :: Int32)) -} (-6)
writeFrameLength bb 4 {- (sizeOf (4 :: Int32)) -} 6
HeaderLength headerLen' <- readFrameLength bb 4 {- (sizeOf (4 :: Int32)) -}
putStrLn ("headerLength': " ++ show headerLen')
6 changes: 4 additions & 2 deletions src/journal/src/Journal/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,11 @@ headerWrite termBuffer termOffset len _termId = do
versionFlagsType = fromIntegral cURRENT_VERSION `shiftL` 32
-- XXX: Atomic write?
putStrLn ("headerWrite, versionFlagsType: " ++ show versionFlagsType)
putStrLn ("headerWrite, len: " ++ show (unHeaderLength len))
putStrLn ("headerWrite, len: " ++ show (- unHeaderLength len))
putStrLn ("headerWrite, value: " ++ show
(versionFlagsType .|. ((- int322Int64 (unHeaderLength len)) .&. 0xFFFF_FFFF)))
writeInt64OffAddr termBuffer (fromIntegral termOffset + fRAME_LENGTH_FIELD_OFFSET)
(versionFlagsType .|. ((- fromIntegral len) .&. 0xFFFF_FFFF))
(versionFlagsType .|. ((- int322Int64 (unHeaderLength len)) .&. 0xFFFF_FFFF))
-- XXX: store termId and offset (only need for replication?)

rotateTerm :: Metadata -> IO ()
Expand Down
9 changes: 3 additions & 6 deletions src/journal/src/Journal/Internal/BufferClaim.hs
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,12 @@ withPtr (BufferClaim bb) k = do

commit :: BufferClaim -> IO ()
commit (BufferClaim bb) = do
Position offset <- readPosition bb
let Capacity frameLen = getCapacity bb
putStrLn ("commit, offset: " ++ show offset)
putStrLn ("commit, frameLen: " ++ show frameLen)
writeFrameLength bb (fromIntegral offset) (fromIntegral frameLen)
writeFrameLength bb 0 (HeaderLength (int2Int32 frameLen))

abort :: BufferClaim -> IO ()
abort (BufferClaim bb) = do
Position offset <- readPosition bb
let Capacity frameLen = getCapacity bb
writeFrameType bb (fromIntegral offset) Padding
writeFrameLength bb (fromIntegral offset) (fromIntegral frameLen)
writeFrameType bb 0 Padding
writeFrameLength bb 0 (HeaderLength (int2Int32 frameLen))
20 changes: 8 additions & 12 deletions src/journal/src/Journal/Internal/ByteBufferPtr.hs
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ reset bb = do
writePosition bb mrk

------------------------------------------------------------------------
-- * Single-byte relative and absolute operations
-- * Single-byte relative operations

putByte :: ByteBuffer -> Word8 -> IO ()
putByte = undefined
Expand All @@ -260,12 +260,6 @@ getByte bb = do
writePosition bb (pos + 1)
return w8

putByteAt :: ByteBuffer -> Int -> Word8 -> IO ()
putByteAt = undefined

getByteAt :: ByteBuffer -> Int -> IO Word8
getByteAt = undefined

------------------------------------------------------------------------
-- * Multi-byte relative and absolute operations

Expand Down Expand Up @@ -476,18 +470,20 @@ writeInt32OffAddr = primitiveInt32_ writeInt32OffAddr#
writeInt64OffAddr :: ByteBuffer -> Int -> Int64 -> IO ()
writeInt64OffAddr = primitiveInt64_ writeInt64OffAddr#

-- writeWord8OffArray#
-- writeWord16OffArray#
writeWord8OffAddr :: ByteBuffer -> Int -> Word8 -> IO ()
writeWord8OffAddr = primitiveWord_ writeWord8OffAddr# (\(W8# w#) -> w#)

writeWord16OffAddr :: ByteBuffer -> Int -> Word16 -> IO ()
writeWord16OffAddr = primitiveWord_ writeWord16OffAddr# (\(W16# w#) -> w#)

writeWord32OffAddr :: ByteBuffer -> Int -> Word32 -> IO ()
writeWord32OffAddr = primitiveWord_ writeWord32OffAddr# (\(W32# w#) -> w#)

{-
-- writeWord64OffArray#
writeWord64OffAddr :: ByteBuffer -> Int -> Word64 -> IO ()
writeWord64OffAddr = primitiveWord_ writeWord64OffAddr# (\(W64# w#) -> w#)

-- atomicReadIntArray#
-- atomicWriteIntArray#
-}

-- | Given a bytebuffer, an offset in machine words, the expected old value, and
-- the new value, perform an atomic compare and swap i.e. write the new value if
Expand Down
3 changes: 3 additions & 0 deletions src/journal/src/Journal/Internal/Utils.hs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ int642Int i@(I64# i#) =
word322Int :: Word32 -> Int
word322Int w@(W32# w#) = assert (W64# w# <= fromIntegral (maxBound :: Int)) (fromIntegral w)

int322Int64 :: Int32 -> Int64
int322Int64 (I32# i#) = I64# i#

fallocate :: FilePath -> Int -> IO ()
fallocate fp len = do
withRWFd fp $ \fd -> do
Expand Down
8 changes: 4 additions & 4 deletions src/journal/src/Journal/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -292,16 +292,16 @@ rotateLog meta termCount termId = do

writeFrameType :: ByteBuffer -> TermOffset -> HeaderTag -> IO ()
writeFrameType termBuffer termOffset (HeaderTag tag) =
putByteAt termBuffer (fromIntegral termOffset + tAG_FIELD_OFFSET) tag
writeWord8OffAddr termBuffer (fromIntegral termOffset + tAG_FIELD_OFFSET) tag

writeFrameLength :: ByteBuffer -> TermOffset -> HeaderLength -> IO ()
writeFrameLength termBuffer termOffset (HeaderLength len) =
writeWord32OffAddr termBuffer (fromIntegral termOffset + fRAME_LENGTH_FIELD_OFFSET)
writeInt32OffAddr termBuffer (fromIntegral termOffset + fRAME_LENGTH_FIELD_OFFSET)
len

readFrameLength :: ByteBuffer -> TermOffset -> IO HeaderLength
readFrameLength termBuffer termOffset = HeaderLength <$>
readWord32OffAddr termBuffer (fromIntegral termOffset + fRAME_LENGTH_FIELD_OFFSET)
readInt32OffAddr termBuffer (fromIntegral termOffset + fRAME_LENGTH_FIELD_OFFSET)

------------------------------------------------------------------------

Expand All @@ -323,7 +323,7 @@ tagString other = "Unknown: " ++ show other
newtype HeaderVersion = HeaderVersion Word8
deriving newtype (Eq, Binary, Num, Storable, Integral, Real, Ord, Enum)

newtype HeaderLength = HeaderLength { unHeaderLength :: Word32 }
newtype HeaderLength = HeaderLength { unHeaderLength :: Int32 }
deriving newtype (Eq, Ord, Binary, Enum, Real, Integral, Num, Storable)

newtype HeaderIndex = HeaderIndex Word32
Expand Down

0 comments on commit c31cb31

Please sign in to comment.