diff --git a/src/journal/src/Journal.hs b/src/journal/src/Journal.hs index 0839a76a..31283136 100644 --- a/src/journal/src/Journal.hs +++ b/src/journal/src/Journal.hs @@ -208,9 +208,9 @@ readJournal' jour = do HeaderLength len <- readFrameLength termBuffer (TermOffset (int2Int32 offset)) putStrLn ("readJournal, len: " ++ show len) bs <- getByteStringAt termBuffer - (offset + hEADER_LENGTH) (word322Int len - hEADER_LENGTH) - assertM (BS.length bs == word322Int len - hEADER_LENGTH) - incrCounter_ (word322Int len) (jBytesConsumed jour) + (offset + hEADER_LENGTH) (int322Int len - hEADER_LENGTH) + assertM (BS.length bs == int322Int len - hEADER_LENGTH) + incrCounter_ (int322Int len) (jBytesConsumed jour) return (Just bs) readJournal :: JournalConsumer -> IO ByteString @@ -348,6 +348,7 @@ tfl = do HeaderLength headerLen <- readFrameLength bb 0 putStrLn ("headerLength: " ++ show headerLen) - writeFrameLength bb 4 {- (sizeOf (4 :: Word32)) -} 6 - HeaderLength headerLen' <- readFrameLength bb 4 {- (sizeOf (4 :: Word32)) -} + writeFrameLength bb 4 {- (sizeOf (4 :: Int32)) -} (-6) + writeFrameLength bb 4 {- (sizeOf (4 :: Int32)) -} 6 + HeaderLength headerLen' <- readFrameLength bb 4 {- (sizeOf (4 :: Int32)) -} putStrLn ("headerLength': " ++ show headerLen') diff --git a/src/journal/src/Journal/Internal.hs b/src/journal/src/Journal/Internal.hs index 6babb85f..b3870732 100644 --- a/src/journal/src/Journal/Internal.hs +++ b/src/journal/src/Journal/Internal.hs @@ -159,9 +159,11 @@ headerWrite termBuffer termOffset len _termId = do versionFlagsType = fromIntegral cURRENT_VERSION `shiftL` 32 -- XXX: Atomic write? putStrLn ("headerWrite, versionFlagsType: " ++ show versionFlagsType) - putStrLn ("headerWrite, len: " ++ show (unHeaderLength len)) + putStrLn ("headerWrite, len: " ++ show (- unHeaderLength len)) + putStrLn ("headerWrite, value: " ++ show + (versionFlagsType .|. ((- int322Int64 (unHeaderLength len)) .&. 0xFFFF_FFFF))) writeInt64OffAddr termBuffer (fromIntegral termOffset + fRAME_LENGTH_FIELD_OFFSET) - (versionFlagsType .|. ((- fromIntegral len) .&. 0xFFFF_FFFF)) + (versionFlagsType .|. ((- int322Int64 (unHeaderLength len)) .&. 0xFFFF_FFFF)) -- XXX: store termId and offset (only need for replication?) rotateTerm :: Metadata -> IO () diff --git a/src/journal/src/Journal/Internal/BufferClaim.hs b/src/journal/src/Journal/Internal/BufferClaim.hs index 8590cb2e..90ccdd5d 100644 --- a/src/journal/src/Journal/Internal/BufferClaim.hs +++ b/src/journal/src/Journal/Internal/BufferClaim.hs @@ -28,15 +28,12 @@ withPtr (BufferClaim bb) k = do commit :: BufferClaim -> IO () commit (BufferClaim bb) = do - Position offset <- readPosition bb let Capacity frameLen = getCapacity bb - putStrLn ("commit, offset: " ++ show offset) putStrLn ("commit, frameLen: " ++ show frameLen) - writeFrameLength bb (fromIntegral offset) (fromIntegral frameLen) + writeFrameLength bb 0 (HeaderLength (int2Int32 frameLen)) abort :: BufferClaim -> IO () abort (BufferClaim bb) = do - Position offset <- readPosition bb let Capacity frameLen = getCapacity bb - writeFrameType bb (fromIntegral offset) Padding - writeFrameLength bb (fromIntegral offset) (fromIntegral frameLen) + writeFrameType bb 0 Padding + writeFrameLength bb 0 (HeaderLength (int2Int32 frameLen)) diff --git a/src/journal/src/Journal/Internal/ByteBufferPtr.hs b/src/journal/src/Journal/Internal/ByteBufferPtr.hs index 343e28e0..fa6a1387 100644 --- a/src/journal/src/Journal/Internal/ByteBufferPtr.hs +++ b/src/journal/src/Journal/Internal/ByteBufferPtr.hs @@ -248,7 +248,7 @@ reset bb = do writePosition bb mrk ------------------------------------------------------------------------ --- * Single-byte relative and absolute operations +-- * Single-byte relative operations putByte :: ByteBuffer -> Word8 -> IO () putByte = undefined @@ -260,12 +260,6 @@ getByte bb = do writePosition bb (pos + 1) return w8 -putByteAt :: ByteBuffer -> Int -> Word8 -> IO () -putByteAt = undefined - -getByteAt :: ByteBuffer -> Int -> IO Word8 -getByteAt = undefined - ------------------------------------------------------------------------ -- * Multi-byte relative and absolute operations @@ -476,18 +470,20 @@ writeInt32OffAddr = primitiveInt32_ writeInt32OffAddr# writeInt64OffAddr :: ByteBuffer -> Int -> Int64 -> IO () writeInt64OffAddr = primitiveInt64_ writeInt64OffAddr# --- writeWord8OffArray# --- writeWord16OffArray# +writeWord8OffAddr :: ByteBuffer -> Int -> Word8 -> IO () +writeWord8OffAddr = primitiveWord_ writeWord8OffAddr# (\(W8# w#) -> w#) + +writeWord16OffAddr :: ByteBuffer -> Int -> Word16 -> IO () +writeWord16OffAddr = primitiveWord_ writeWord16OffAddr# (\(W16# w#) -> w#) writeWord32OffAddr :: ByteBuffer -> Int -> Word32 -> IO () writeWord32OffAddr = primitiveWord_ writeWord32OffAddr# (\(W32# w#) -> w#) - {- --- writeWord64OffArray# +writeWord64OffAddr :: ByteBuffer -> Int -> Word64 -> IO () +writeWord64OffAddr = primitiveWord_ writeWord64OffAddr# (\(W64# w#) -> w#) -- atomicReadIntArray# -- atomicWriteIntArray# --} -- | Given a bytebuffer, an offset in machine words, the expected old value, and -- the new value, perform an atomic compare and swap i.e. write the new value if diff --git a/src/journal/src/Journal/Internal/Utils.hs b/src/journal/src/Journal/Internal/Utils.hs index b72a4871..39ecb0ec 100644 --- a/src/journal/src/Journal/Internal/Utils.hs +++ b/src/journal/src/Journal/Internal/Utils.hs @@ -62,6 +62,9 @@ int642Int i@(I64# i#) = word322Int :: Word32 -> Int word322Int w@(W32# w#) = assert (W64# w# <= fromIntegral (maxBound :: Int)) (fromIntegral w) +int322Int64 :: Int32 -> Int64 +int322Int64 (I32# i#) = I64# i# + fallocate :: FilePath -> Int -> IO () fallocate fp len = do withRWFd fp $ \fd -> do diff --git a/src/journal/src/Journal/Types.hs b/src/journal/src/Journal/Types.hs index ee13a5e1..1d3ce0e0 100644 --- a/src/journal/src/Journal/Types.hs +++ b/src/journal/src/Journal/Types.hs @@ -292,16 +292,16 @@ rotateLog meta termCount termId = do writeFrameType :: ByteBuffer -> TermOffset -> HeaderTag -> IO () writeFrameType termBuffer termOffset (HeaderTag tag) = - putByteAt termBuffer (fromIntegral termOffset + tAG_FIELD_OFFSET) tag + writeWord8OffAddr termBuffer (fromIntegral termOffset + tAG_FIELD_OFFSET) tag writeFrameLength :: ByteBuffer -> TermOffset -> HeaderLength -> IO () writeFrameLength termBuffer termOffset (HeaderLength len) = - writeWord32OffAddr termBuffer (fromIntegral termOffset + fRAME_LENGTH_FIELD_OFFSET) + writeInt32OffAddr termBuffer (fromIntegral termOffset + fRAME_LENGTH_FIELD_OFFSET) len readFrameLength :: ByteBuffer -> TermOffset -> IO HeaderLength readFrameLength termBuffer termOffset = HeaderLength <$> - readWord32OffAddr termBuffer (fromIntegral termOffset + fRAME_LENGTH_FIELD_OFFSET) + readInt32OffAddr termBuffer (fromIntegral termOffset + fRAME_LENGTH_FIELD_OFFSET) ------------------------------------------------------------------------ @@ -323,7 +323,7 @@ tagString other = "Unknown: " ++ show other newtype HeaderVersion = HeaderVersion Word8 deriving newtype (Eq, Binary, Num, Storable, Integral, Real, Ord, Enum) -newtype HeaderLength = HeaderLength { unHeaderLength :: Word32 } +newtype HeaderLength = HeaderLength { unHeaderLength :: Int32 } deriving newtype (Eq, Ord, Binary, Enum, Real, Integral, Num, Storable) newtype HeaderIndex = HeaderIndex Word32