diff --git a/src/journal/src/Journal/Internal/ByteBuffer.hs b/src/journal/src/Journal/Internal/ByteBuffer.hs index 58b113f9..56934519 100644 --- a/src/journal/src/Journal/Internal/ByteBuffer.hs +++ b/src/journal/src/Journal/Internal/ByteBuffer.hs @@ -29,7 +29,7 @@ data ByteBuffer = ByteBuffer , bbSlice :: {-# UNPACK #-} !(IORef Slice) } -newtype Capacity = Capacity Int +newtype Capacity = Capacity { unCapacity :: Int } deriving (Num, Integral, Real, Ord, Eq, Enum) newtype Limit = Limit Int @@ -105,11 +105,17 @@ remaining bb = do ------------------------------------------------------------------------ -- * Checks -boundCheck :: ByteBuffer -> Int -> IO () -boundCheck bb ix = do - if fromIntegral ix <= getCapacity bb +boundCheck :: ByteBuffer -> String -> Int -> IO () +boundCheck bb ctx ix = do + -- XXX: parametrise on build flag and only do these checks if enabled? + if ix < fromIntegral (getCapacity bb) then return () - else throwIO (IndexOutOfBounds "XXX") + else throwIO (IndexOutOfBounds errMsg) + where + errMsg = concat + [ ctx, ": index out of bounds " + , "(", show ix, ",", show (unCapacity (getCapacity bb)), ")" + ] invariant :: ByteBuffer -> IO () invariant bb = do @@ -243,41 +249,111 @@ getStorable bb = do putStorableAt :: Storable a => ByteBuffer -> Int -> a -> IO () putStorableAt bb ix x = do - boundCheck bb ix + boundCheck bb "putStorableAt" ix pokeByteOff (bbPtr bb) ix x getStorableAt :: Storable a => ByteBuffer -> Int -> IO a getStorableAt bb ix = do - boundCheck bb ix + boundCheck bb "getStorableAt" ix peekByteOff (bbPtr bb) ix ------------------------------------------------------------------------ --- indexCharOffAddr# --- indexWideCharOffAddr# --- indexIntOffAddr# --- indexWordOffAddr# --- indexAddrOffAddr# --- indexFloatOffAddr# --- indexDoubleOffAddr# --- indexStablePtrOffAddr# --- indexInt8OffAddr# --- indexInt16OffAddr# --- indexInt32OffAddr# --- indexInt64OffAddr# --- indexWord8OffAddr# --- indexWord16OffAddr# --- indexWord32OffAddr# --- 
indexWord64OffAddr# - --- writeIntArray# +-- readCharOffArray# +-- readWideCharOffArray# +-- readIntOffArray# +-- readWordOffArray# +-- readArrayOffAddr# +-- readFloatOffArray# +-- readDoubleOffArray# +-- readStablePtrOffArray# +-- readInt8OffArray# +-- readInt16OffArray# +readInt32OffArrayIx :: ByteBuffer -> Int -> IO Int32 +readInt32OffArrayIx bb (I# ix#) = IO $ \s -> + case readInt32Array# (bbData bb) ix# s of + (# s', i #) -> (# s', fromIntegral (I# i) #) + +readInt64OffArrayIx :: ByteBuffer -> Int -> IO Int64 +readInt64OffArrayIx bb (I# ix#) = IO $ \s -> + case readInt64Array# (bbData bb) ix# s of + (# s', i #) -> (# s', fromIntegral (I# i) #) +-- readWord8OffArray# +-- readWord16OffArray# +-- readWord32OffArray# +-- readWord64OffArray# + +-- writeCharOffArray# +-- writeWideCharOffArray# +-- writeIntOffArray# +-- writeWordOffArray# +-- writeArrayOffAddr# +-- writeFloatOffArray# +-- writeDoubleOffArray# +-- writeStablePtrOffArray# +-- writeInt8OffArray# +-- writeInt16OffArray# + +writeInt32OffArrayIx :: ByteBuffer -> Int -> Int32 -> IO () +writeInt32OffArrayIx bb (I# ix#) value = IO $ \s -> + case writeInt32Array# (bbData bb) ix# value# s of + s' -> (# s', () #) + where + I# value# = fromIntegral value + +writeInt64OffArrayIx :: ByteBuffer -> Int -> Int64 -> IO () +writeInt64OffArrayIx bb (I# ix#) value = IO $ \s -> + case writeInt64Array# (bbData bb) ix# value# s of + s' -> (# s', () #) + where + I# value# = fromIntegral value + +-- writeWord8OffArray# +-- writeWord16OffArray# +-- writeWord32OffArray# +-- writeWord64OffArray# + +-- | Given a bytebuffer, an offset in machine words, the expected old value, and +-- the new value, perform an atomic compare and swap i.e. write the new value if +-- the current value matches the provided old value. Returns a boolean +-- indicating whether the compare and swap succeeded or not. Implies a full +-- memory barrier. 
+casIntArray :: ByteBuffer -> Int -> Int -> Int -> IO Bool +casIntArray bb (I# offset#) (I# old#) (I# new#) = IO $ \s -> + case casIntArray# (bbData bb) offset# old# new# s of + (# s', before# #) -> case before# ==# old# of + 1# -> (# s', True #) + 0# -> (# s', False #) + +-- | Given a bytebuffer, an offset in machine words, and a value to add, +-- atomically add the value to the element. Returns the value of the element +-- before the operation. Implies a full memory barrier. +fetchAddIntArray :: ByteBuffer -> Int -> Int -> IO Int +fetchAddIntArray bb (I# offset#) (I# incr#) = IO $ \s -> + case fetchAddIntArray# (bbData bb) offset# incr# s of + (# s', before# #) -> (# s', I# before# #) + +-- | Given a bytebuffer, an offset in machine words, and a value to add, +-- atomically add the value to the element. Implies a full memory barrier. +fetchAddIntArray_ :: ByteBuffer -> Int -> Int -> IO () +fetchAddIntArray_ bb (I# offset#) (I# incr#) = IO $ \s -> + case fetchAddIntArray# (bbData bb) offset# incr# s of + (# s', _before# #) -> (# s', () #) + +-- | Given a bytebuffer, an offset in machine words, and a value to add, +-- atomically add the value to the element. Returns the value of the element +-- after the operation. Implies a full memory barrier. +fetchAddIntArray' :: ByteBuffer -> Int -> Int -> IO Int +fetchAddIntArray' bb (I# offset#) (I# incr#) = IO $ \s -> + case fetchAddIntArray# (bbData bb) offset# incr# s of + (# s', before# #) -> (# s', I# (before# +# incr#) #) ------------------------------------------------------------------------ -- * Mapped -- | Calls `msync` which forces the data in memory to be synced to disk. 
force :: ByteBuffer -> IO () -force = undefined +force bb = msync (bbPtr bb) (fromIntegral (bbCapacity bb)) MS_SYNC False ------------------------------------------------------------------------ diff --git a/src/journal/src/Journal/Types.hs b/src/journal/src/Journal/Types.hs index b50bed38..b7d724ac 100644 --- a/src/journal/src/Journal/Types.hs +++ b/src/journal/src/Journal/Types.hs @@ -1,3 +1,5 @@ +{-# LANGUAGE NumericUnderscores #-} + module Journal.Types ( Journal(Journal) , jMaxByteSize @@ -19,20 +21,161 @@ module Journal.Types , getMaxByteSize , readFileCount , bumpFileCount - , module Journal.Types.AtomicCounter) + , module Journal.Types.AtomicCounter + , packTail + , termId + , termOffset + ) where import Control.Concurrent.STM import Control.Concurrent.STM (TVar) import Data.ByteString (ByteString) import Data.IORef (IORef, newIORef, readIORef, writeIORef) +import Data.Int +import Data.Bits +import Data.Vector (Vector) import Data.Word (Word32, Word64, Word8) import Foreign.Ptr (Ptr, plusPtr) +import Foreign.Storable (sizeOf) +import Journal.Internal.ByteBuffer import Journal.Types.AtomicCounter ------------------------------------------------------------------------ +pARTITION_COUNT :: Int +pARTITION_COUNT = 3 + +data Journal' = Journal' + { jTermBuffers :: {-# UNPACK #-} !(Vector ByteBuffer) + , jMetadata :: {-# UNPACK #-} !ByteBuffer + } + +data JMetadata = JMetadata + { mdRawTail0 :: Int64 + , mdRawTail1 :: Int64 + , mdRawTail2 :: Int64 + , mdActiveCount :: Int32 + -- padding + , mdInitialTermId :: Int32 + -- mdDefaultFrameHeaderLength :: Int32? + -- mdMTULength :: Int32, only needed if we want to fragment large messages... + , mdTermLength :: Int32 + , mdPageSize :: Int32 + -- padding + -- , mdDefaultFrameHeader :: Bytestring??? 
+ } + +tERM_TAIL_COUNTERS_OFFSET :: Int +tERM_TAIL_COUNTERS_OFFSET = 0 + +lOG_ACTIVE_TERM_COUNT_OFFSET :: Int +lOG_ACTIVE_TERM_COUNT_OFFSET = tERM_TAIL_COUNTERS_OFFSET + + sizeOf (8 :: Int64) * pARTITION_COUNT + +lOG_INITIAL_TERM_ID_OFFSET :: Int +lOG_INITIAL_TERM_ID_OFFSET = lOG_ACTIVE_TERM_COUNT_OFFSET + + sizeOf (4 :: Int32) + +lOG_TERM_LENGTH_OFFSET :: Int +lOG_TERM_LENGTH_OFFSET = lOG_INITIAL_TERM_ID_OFFSET + + sizeOf (4 :: Int32) + +lOG_PAGE_SIZE_OFFSET :: Int +lOG_PAGE_SIZE_OFFSET = lOG_TERM_LENGTH_OFFSET + + sizeOf (4 :: Int32) + +lOG_META_DATA_LENGTH :: Int +lOG_META_DATA_LENGTH = lOG_PAGE_SIZE_OFFSET + +------------------------------------------------------------------------ + +rawTail :: ByteBuffer -> Int -> IO Int64 +rawTail metadataBuffer partitionIndex = + readInt64OffArrayIx metadataBuffer + (tERM_TAIL_COUNTERS_OFFSET + (sizeOf (8 :: Int64) * partitionIndex)) + +termId :: Int64 -> Int32 +termId = fromIntegral . (`shiftR` 32) + +termOffset :: Int64 -> Int64 -> Int32 +termOffset rawTail0 termLen = + fromIntegral (min (rawTail0 .&. 0xFFFF_FFFF) termLen) + +packTail :: Int32 -> Int32 -> Int64 +packTail termId0 termOffset0 = + (fromIntegral termId0 `shiftL` 32) .|. (fromIntegral termOffset0 .&. 0xFFFF_FFFF); + +setRawTail :: ByteBuffer -> Int32 -> Int32 -> Int -> IO () +setRawTail meta termId0 termOffset0 partitionIndex = + writeInt64OffArrayIx meta + (tERM_TAIL_COUNTERS_OFFSET + (sizeOf (8 :: Int64) * partitionIndex)) + (packTail termId0 termOffset0) + +casRawTail :: ByteBuffer -> Int -> Int64 -> Int64 -> IO Bool +casRawTail meta partitionIndex expectedRawTail newRawTail = + casIntArray meta + (tERM_TAIL_COUNTERS_OFFSET + (sizeOf (8 :: Int64) * partitionIndex)) + (fromIntegral expectedRawTail) (fromIntegral newRawTail) -- XXX: 32-bit systems? 
+ +initialiseTailWithTermId :: ByteBuffer -> Int -> Int32 -> IO () +initialiseTailWithTermId meta partitionIndex termId0 = + setRawTail meta termId0 0 partitionIndex + +activeTermCount :: ByteBuffer -> IO Int32 +activeTermCount meta = readInt32OffArrayIx meta lOG_ACTIVE_TERM_COUNT_OFFSET + +setActiveTermCount :: ByteBuffer -> Int32 -> IO () +setActiveTermCount meta = writeInt32OffArrayIx meta lOG_ACTIVE_TERM_COUNT_OFFSET + +casActiveTermCount :: ByteBuffer -> Int32 -> Int32 -> IO Bool +casActiveTermCount meta expectedTermCount newTermCount = + undefined + -- casIntArray only works on `Int`, does it mean we need to change all `Int32` + -- to `Int`? Or can we keep `Int32` and use casIntArray + fromIntegral? + + -- casIntArray meta lOG_ACTIVE_TERM_COUNT_OFFSET expectedTermCount newTermCount + +initialTermId :: ByteBuffer -> IO Int32 +initialTermId meta = readInt32OffArrayIx meta lOG_INITIAL_TERM_ID_OFFSET + +-- should never be changed? +-- setInitialTermId :: ByteBuffer -> Int32 -> IO () +-- setInitialTermId meta = writeInt32OffArrayIx meta lOG_INITIAL_TERM_ID_OFFSET + +termLength :: ByteBuffer -> IO Int32 +termLength meta = readInt32OffArrayIx meta lOG_TERM_LENGTH_OFFSET + +-- should never be changed? +-- setTermLength :: ByteBuffer -> Int32 -> IO () +-- setTermLength meta = writeInt32OffArrayIx meta lOG_TERM_LENGTH_OFFSET + +pageSize :: ByteBuffer -> IO Int32 +pageSize meta = readInt32OffArrayIx meta lOG_PAGE_SIZE_OFFSET + +------------------------------------------------------------------------ + +indexByTermCount :: Int32 -> Int +indexByTermCount termCount = fromIntegral termCount `mod` pARTITION_COUNT + +computeTermBeginPosition :: Int32 -> Int32 -> Int32 -> Int64 +computeTermBeginPosition activeTermId posBitsToShift initTermId = + let + termCount :: Int64 + -- Copes with negative `activeTermId` on rollover. 
+ termCount = fromIntegral (activeTermId - initTermId) + in + termCount `shiftL` fromIntegral posBitsToShift + +rotateLog :: ByteBuffer -> Int32 -> Int32 -> IO Bool +rotateLog meta termCount termId0 = do + -- XXX: + undefined + casActiveTermCount meta termCount (termCount + 1) + +------------------------------------------------------------------------ + data Journal = Journal { jPtr :: {-# UNPACK #-} !(TVar (Ptr Word8)) , jOffset :: {-# UNPACK #-} !AtomicCounter @@ -76,6 +219,7 @@ data Options = Options -- max disk space in total? multiple of maxSize? -- checksum? none, crc32 or sha256? -- wait strategy? + -- page size? (for prefetching (see ghc-prim) and buffering writes?) data JournalConsumer = JournalConsumer { jcPtr :: {-# UNPACK #-} !(IORef (Ptr Word8))