diff --git a/src/journal/src/Journal/Types.hs b/src/journal/src/Journal/Types.hs index b7d724ac..74419cd5 100644 --- a/src/journal/src/Journal/Types.hs +++ b/src/journal/src/Journal/Types.hs @@ -48,8 +48,9 @@ pARTITION_COUNT :: Int pARTITION_COUNT = 3 data Journal' = Journal' - { jTermBuffers :: {-# UNPACK #-} !(Vector ByteBuffer) - , jMetadata :: {-# UNPACK #-} !ByteBuffer + { jTermBuffers :: {-# UNPACK #-} !(Vector ByteBuffer) + , jMetadata :: {-# UNPACK #-} !ByteBuffer + , jPositionLimit :: {-# UNPACK #-} !(IORef Int) -- ??? } data JMetadata = JMetadata @@ -92,8 +93,8 @@ lOG_META_DATA_LENGTH = lOG_PAGE_SIZE_OFFSET ------------------------------------------------------------------------ rawTail :: ByteBuffer -> Int -> IO Int64 -rawTail metadataBuffer partitionIndex = - readInt64OffArrayIx metadataBuffer +rawTail meta partitionIndex = + readInt64OffArrayIx meta (tERM_TAIL_COUNTERS_OFFSET + (sizeOf (8 :: Int64) * partitionIndex)) termId :: Int64 -> Int32 @@ -154,11 +155,57 @@ termLength meta = readInt32OffArrayIx meta lOG_TERM_LENGTH_OFFSET pageSize :: ByteBuffer -> IO Int32 pageSize meta = readInt32OffArrayIx meta lOG_PAGE_SIZE_OFFSET +-- | The number of bits to shift when multiplying or dividing by the term buffer +-- length. +positionBitsToShift :: Int32 -> Int +positionBitsToShift termBufferLength = + case termBufferLength of + 65536 {- 64 * 1024 -} -> 16 + 131072 {- 128 * 1024 -} -> 17 + 262144 {- 256 * 1024 -} -> 18 + 524288 {- 512 * 1024 -} -> 19 + 1048576 {- 1024 * 1024 -} -> 20 + 2097152 {- 2 * 1024 * 1024 -} -> 21 + 4194304 {- 4 * 1024 * 1024 -} -> 22 + 8388608 {- 8 * 1024 * 1024 -} -> 23 + 16777216 {- 16 * 1024 * 1024 -} -> 24 + 33554432 {- 32 * 1024 * 1024 -} -> 25 + 67108864 {- 64 * 1024 * 1024 -} -> 26 + 134217728 {- 128 * 1024 * 1024 -} -> 27 + 268435456 {- 256 * 1024 * 1024 -} -> 28 + 536870912 {- 512 * 1024 * 1024 -} -> 29 + 1073741824 {- 1024 * 1024 * 1024 -} -> 30 + _otherwise -> + error ("positionBitsToShift: invalid term buffer length: " ++ + show (termBufferLength)) + ------------------------------------------------------------------------ +-- | Rotate to the next partition in sequence for the current term id. +nextPartitionIndex :: Int32 -> Int32 +nextPartitionIndex currentIndex = + (currentIndex + 1) `mod` fromIntegral pARTITION_COUNT + +-- | Calculate the partition index to be used given the initial term and active +-- term ids. +indexByTerm :: Int32 -> Int32 -> Int32 +indexByTerm initTermId activeTermId = + (activeTermId - initTermId) `mod` fromIntegral pARTITION_COUNT + +-- | Caluclate the partition index based on number of terms that have passed. indexByTermCount :: Int32 -> Int indexByTermCount termCount = fromIntegral termCount `mod` pARTITION_COUNT +-- | Calculate the partition index given a stream position. +indexByPosition :: Int64 -> Int -> Int +indexByPosition pos posBitsToShift = fromIntegral $ + (pos `shiftR` posBitsToShift) `mod` fromIntegral pARTITION_COUNT + +-- | Compute the current position in absolute number of bytes. +computePosition = undefined + +-- | Compute the current position in absolute number of bytes for the beginning +-- of a term. computeTermBeginPosition :: Int32 -> Int32 -> Int32 -> Int64 computeTermBeginPosition activeTermId posBitsToShift initTermId = let @@ -168,11 +215,31 @@ computeTermBeginPosition activeTermId posBitsToShift initTermId = in termCount `shiftL` fromIntegral posBitsToShift +-- | Compute the term id from a position. +computeTermIdFromPosition = undefined + +-- | Compute the total length of a log file given the term length. +computeLogLength = undefined + +-- | Rotate the log and update the tail counter for the new term. This function +-- is thread safe. rotateLog :: ByteBuffer -> Int32 -> Int32 -> IO Bool rotateLog meta termCount termId0 = do - -- XXX: - undefined - casActiveTermCount meta termCount (termCount + 1) + go + casActiveTermCount meta termCount nextTermCount + where + nextTermId = termId0 + 1 + nextTermCount = termCount + 1 + nextIndex = indexByTermCount nextTermCount + expectedTermId = nextTermId - fromIntegral pARTITION_COUNT + + go = do + rt <- rawTail meta nextIndex + if expectedTermId /= termId rt + then return () + else do + b <- casRawTail meta nextIndex rt (packTail nextTermId 0) + if b then return () else go ------------------------------------------------------------------------