Skip to content

Commit

Permalink
feat(journal): add rotateLog and a few other metadata helper functions
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-stevan-andjelkovic committed Dec 28, 2021
1 parent 3a3c4a5 commit e2bcc39
Showing 1 changed file with 74 additions and 7 deletions.
81 changes: 74 additions & 7 deletions src/journal/src/Journal/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ pARTITION_COUNT :: Int
pARTITION_COUNT = 3

data Journal' = Journal'
{ jTermBuffers :: {-# UNPACK #-} !(Vector ByteBuffer)
, jMetadata :: {-# UNPACK #-} !ByteBuffer
{ jTermBuffers :: {-# UNPACK #-} !(Vector ByteBuffer)
, jMetadata :: {-# UNPACK #-} !ByteBuffer
, jPositionLimit :: {-# UNPACK #-} !(IORef Int) -- ???
}

data JMetadata = JMetadata
Expand Down Expand Up @@ -92,8 +93,8 @@ lOG_META_DATA_LENGTH = lOG_PAGE_SIZE_OFFSET
------------------------------------------------------------------------

rawTail :: ByteBuffer -> Int -> IO Int64
rawTail metadataBuffer partitionIndex =
readInt64OffArrayIx metadataBuffer
rawTail meta partitionIndex =
readInt64OffArrayIx meta
(tERM_TAIL_COUNTERS_OFFSET + (sizeOf (8 :: Int64) * partitionIndex))

termId :: Int64 -> Int32
Expand Down Expand Up @@ -154,11 +155,57 @@ termLength meta = readInt32OffArrayIx meta lOG_TERM_LENGTH_OFFSET
pageSize :: ByteBuffer -> IO Int32
pageSize meta = readInt32OffArrayIx meta lOG_PAGE_SIZE_OFFSET

-- | The number of bits to shift when multiplying or dividing by the term buffer
-- length.
positionBitsToShift :: Int32 -> Int
positionBitsToShift termBufferLength =
case termBufferLength of
65536 {- 64 * 1024 -} -> 16
131072 {- 128 * 1024 -} -> 17
262144 {- 256 * 1024 -} -> 18
524288 {- 512 * 1024 -} -> 19
1048576 {- 1024 * 1024 -} -> 20
2097152 {- 2 * 1024 * 1024 -} -> 21
4194304 {- 4 * 1024 * 1024 -} -> 22
8388608 {- 8 * 1024 * 1024 -} -> 23
16777216 {- 16 * 1024 * 1024 -} -> 24
33554432 {- 32 * 1024 * 1024 -} -> 25
67108864 {- 64 * 1024 * 1024 -} -> 26
134217728 {- 128 * 1024 * 1024 -} -> 27
268435456 {- 256 * 1024 * 1024 -} -> 28
536870912 {- 512 * 1024 * 1024 -} -> 29
1073741824 {- 1024 * 1024 * 1024 -} -> 30
_otherwise ->
error ("positionBitsToShift: invalid term buffer length: " ++
show (termBufferLength))

------------------------------------------------------------------------

-- | Rotate to the next partition in sequence for the current term id.
nextPartitionIndex :: Int32 -> Int32
nextPartitionIndex currentIndex =
(currentIndex + 1) `mod` fromIntegral pARTITION_COUNT

-- | Calculate the partition index to be used given the initial term and active
-- term ids.
indexByTerm :: Int32 -> Int32 -> Int32
indexByTerm initTermId activeTermId =
(activeTermId - initTermId) `mod` fromIntegral pARTITION_COUNT

-- | Caluclate the partition index based on number of terms that have passed.
indexByTermCount :: Int32 -> Int
indexByTermCount termCount = fromIntegral termCount `mod` pARTITION_COUNT

-- | Calculate the partition index given a stream position.
indexByPosition :: Int64 -> Int -> Int
indexByPosition pos posBitsToShift = fromIntegral $
(pos `shiftR` posBitsToShift) `mod` fromIntegral pARTITION_COUNT

-- | Compute the current position in absolute number of bytes.
computePosition = undefined

-- | Compute the current position in absolute number of bytes for the beginning
-- of a term.
computeTermBeginPosition :: Int32 -> Int32 -> Int32 -> Int64
computeTermBeginPosition activeTermId posBitsToShift initTermId =
let
Expand All @@ -168,11 +215,31 @@ computeTermBeginPosition activeTermId posBitsToShift initTermId =
in
termCount `shiftL` fromIntegral posBitsToShift

-- | Compute the term id from a position.
computeTermIdFromPosition = undefined

-- | Compute the total length of a log file given the term length.
computeLogLength = undefined

-- | Rotate the log and update the tail counter for the new term. This function
-- is thread safe.
rotateLog :: ByteBuffer -> Int32 -> Int32 -> IO Bool
rotateLog meta termCount termId0 = do
-- XXX:
undefined
casActiveTermCount meta termCount (termCount + 1)
go
casActiveTermCount meta termCount nextTermCount
where
nextTermId = termId0 + 1
nextTermCount = termCount + 1
nextIndex = indexByTermCount nextTermCount
expectedTermId = nextTermId - fromIntegral pARTITION_COUNT

go = do
rt <- rawTail meta nextIndex
if expectedTermId /= termId rt
then return ()
else do
b <- casRawTail meta nextIndex rt (packTail nextTermId 0)
if b then return () else go

------------------------------------------------------------------------

Expand Down

0 comments on commit e2bcc39

Please sign in to comment.