Skip to content

Commit

Permalink
feat(journal): add and use buffer claim datatype
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-stevan-andjelkovic committed Dec 30, 2021
1 parent 0e8a33d commit 8058946
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 93 deletions.
1 change: 1 addition & 0 deletions src/journal/journal.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ library
Journal.Internal.Parse
Journal.Internal.Metrics
Journal.Internal.ByteBuffer
Journal.Internal.BufferClaim
Journal.Internal.Mmap
Journal.Types
Journal.Types.AtomicCounter
Expand Down
4 changes: 4 additions & 0 deletions src/journal/src/Journal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import System.FilePath (takeDirectory, (</>))

import Journal.Internal
import Journal.Internal.ByteBuffer
import Journal.Internal.BufferClaim
import Journal.Types
import Journal.Types.AtomicCounter

Expand Down Expand Up @@ -143,6 +144,9 @@ tee jour sock len = do
fptr <- newForeignPtr_ buf
return (BS.copy (fromForeignPtr fptr (offset + hEADER_LENGTH) len))

recvBytes :: BufferClaim -> Socket -> Int -> IO Int
recvBytes bc sock len = withPtr bc $ \ptr -> recvBuf sock ptr len

appendRecv :: Journal -> Socket -> Int -> IO Int
appendRecv jour sock len = do
assertM (0 < len && hEADER_LENGTH + len + fOOTER_LENGTH <= jMaxByteSize jour)
Expand Down
100 changes: 12 additions & 88 deletions src/journal/src/Journal/Internal.hs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Journal.Internal where

Expand Down Expand Up @@ -32,61 +30,14 @@ import Journal.Internal.Parse
import Journal.Internal.ByteBuffer
import Journal.Types
import Journal.Types.AtomicCounter

------------------------------------------------------------------------

newtype HeaderTag = HeaderTag { unHeaderTag :: Word8 }
deriving newtype (Eq, Binary, Bits, Show, Num, Storable)

newtype HeaderVersion = HeaderVersion Word8
deriving newtype (Eq, Binary, Num, Storable, Integral, Real, Ord, Enum)

newtype HeaderLength = HeaderLength Word32
deriving newtype (Eq, Ord, Binary, Enum, Real, Integral, Num, Storable)

newtype HeaderIndex = HeaderIndex Word32
deriving newtype (Eq, Binary, Num, Storable)

------------------------------------------------------------------------

-- * Constants

-- | The length of the journal entry header in bytes.
hEADER_LENGTH :: Int
hEADER_LENGTH
= sizeOf (1 :: HeaderTag)
+ sizeOf (1 :: HeaderVersion)
+ sizeOf (4 :: HeaderLength)
-- + sizeOf (4 :: HeaderIndex)
-- XXX: CRC?

fOOTER_LENGTH :: Int
fOOTER_LENGTH = hEADER_LENGTH

cURRENT_VERSION :: HeaderVersion
cURRENT_VERSION = 0

aCTIVE_FILE :: FilePath
aCTIVE_FILE = "active"

dIRTY_FILE :: FilePath
dIRTY_FILE = "dirty"

cLEAN_FILE :: FilePath
cLEAN_FILE = "clean"

sNAPSHOT_FILE :: FilePath
sNAPSHOT_FILE = "snapshot"

aRCHIVE_FILE :: FilePath
aRCHIVE_FILE = "archive"
import Journal.Internal.BufferClaim

------------------------------------------------------------------------

offer :: Ptr Word8 -> Int -> Int -> IO Int
offer buf offset len = undefined

tryClaim :: Journal' -> Int -> IO (Maybe Int64)
tryClaim :: Journal' -> Int -> IO (Maybe (Int64, BufferClaim))
tryClaim jour len = do
-- checkPayloadLength len
termCount <- activeTermCount (jMetadata jour)
Expand All @@ -107,18 +58,18 @@ tryClaim jour len = do
position = termBeginPosition + fromIntegral termOffset
if position < limit
then do
result <- termAppenderClaim (jMetadata jour) termAppender termId termOffset len
newPosition (jMetadata jour) result
mResult <- termAppenderClaim (jMetadata jour) termAppender termId termOffset len
newPosition (jMetadata jour) mResult
else
return (backPressureStatus position len)

calculatePositionLimit = undefined
backPressureStatus = undefined

newPosition :: Metadata -> Maybe TermOffset -> IO (Maybe Int64)
newPosition meta mResultingOffset =
case mResultingOffset of
Just resultingOffset -> do
newPosition :: Metadata -> Maybe (TermOffset, BufferClaim) -> IO (Maybe (Int64, BufferClaim))
newPosition meta mResult =
case mResult of
Just (resultingOffset, bufClaim) -> do
-- XXX: cache
-- termOffset := resultingOffset
termCount <- activeTermCount meta
Expand All @@ -131,7 +82,7 @@ newPosition meta mResultingOffset =
termOffset = rawTailTermOffset rt termLen
termBeginPosition =
computeTermBeginPosition termId (positionBitsToShift termLen) initTermId
return (Just (termBeginPosition + fromIntegral resultingOffset))
return (Just (termBeginPosition + fromIntegral resultingOffset, bufClaim))
Nothing -> do
-- XXX:
-- if termBeginPosition + termBufferLength >= maxPossiblePosition
Expand All @@ -144,7 +95,7 @@ fRAME_ALIGNMENT :: Int
fRAME_ALIGNMENT = 32

termAppenderClaim :: Metadata -> ByteBuffer -> TermId -> TermOffset -> Int
-> IO (Maybe TermOffset)
-> IO (Maybe (TermOffset, BufferClaim))
termAppenderClaim meta termBuffer termId termOffset len = do
let
frameLength = len + hEADER_LENGTH
Expand All @@ -160,8 +111,8 @@ termAppenderClaim meta termBuffer termId termOffset len = do
return Nothing
else do
headerWrite termBuffer termOffset (fromIntegral frameLength) termId
-- claimBuffer <- wrapPart termBuffer termOffset frameLength
return (Just resultingOffset)
bufClaim <- newBufferClaim termBuffer termOffset frameLength
return (Just (resultingOffset, bufClaim))

handleEndOfLogCondition :: ByteBuffer -> TermOffset -> Capacity -> TermId -> IO ()
handleEndOfLogCondition termBuffer termOffset (Capacity termLen) termId = do
Expand All @@ -174,12 +125,6 @@ handleEndOfLogCondition termBuffer termOffset (Capacity termLen) termId = do
writeFrameType termBuffer termOffset Padding
writeFrameLength termBuffer termOffset paddingLength

fRAME_LENGTH_FIELD_OFFSET :: Int
fRAME_LENGTH_FIELD_OFFSET = 0

tAG_FIELD_OFFSET :: Int
tAG_FIELD_OFFSET = 6

headerWrite :: ByteBuffer -> TermOffset -> HeaderLength -> TermId -> IO ()
headerWrite termBuffer termOffset len _termId = do
let versionFlagsType :: Int64
Expand All @@ -189,15 +134,6 @@ headerWrite termBuffer termOffset len _termId = do
(versionFlagsType .|. ((- fromIntegral len) .&. 0xFFFF_FFFF))
-- XXX: store termId and offset (only need for replication?)

writeFrameType :: ByteBuffer -> TermOffset -> HeaderTag -> IO ()
writeFrameType termBuffer termOffset (HeaderTag tag) = do
putByteAt termBuffer (fromIntegral termOffset + tAG_FIELD_OFFSET) tag

writeFrameLength :: ByteBuffer -> TermOffset -> HeaderLength -> IO ()
writeFrameLength termBuffer termOffset (HeaderLength len) = do
writeWord32OffArrayIx termBuffer (fromIntegral termOffset + fRAME_LENGTH_FIELD_OFFSET)
len

rotateTerm :: Metadata -> IO ()
rotateTerm meta = do
termCount <- activeTermCount meta
Expand Down Expand Up @@ -315,18 +251,6 @@ data JournalHeaderV0 = JournalHeaderV0
-- , jhChecksum :: !Word32 -- V1
}

pattern Empty = 0 :: HeaderTag
pattern Valid = 1 :: HeaderTag
pattern Invalid = 2 :: HeaderTag
pattern Padding = 4 :: HeaderTag

tagString :: HeaderTag -> String
tagString Empty = "Empty"
tagString Valid = "Valid"
tagString Invalid = "Invalid"
tagString Padding = "Padding"
tagString other = "Unknown: " ++ show other

newHeader :: HeaderTag -> HeaderVersion -> HeaderLength -> JournalHeader
newHeader = JournalHeaderV0

Expand Down
39 changes: 39 additions & 0 deletions src/journal/src/Journal/Internal/BufferClaim.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
module Journal.Internal.BufferClaim where

import Data.ByteString (ByteString)
import Data.Word (Word8)
import Foreign.Ptr (Ptr, plusPtr)

import Journal.Internal.ByteBuffer
import Journal.Types

------------------------------------------------------------------------

newtype BufferClaim = BufferClaim ByteBuffer

newBufferClaim :: ByteBuffer -> TermOffset -> Int -> IO BufferClaim
newBufferClaim src offset len = BufferClaim <$>
wrapPart src (fromIntegral offset) len

putBytes :: BufferClaim -> ByteString -> IO ()
putBytes (BufferClaim bb) bs = putByteString bb bs

-- NOTE: The underlying @ByteBuffer@ must have been pinned, otherwise we cannot
-- guarantee to get a pointer to it that isn't moved around.
withPtr :: BufferClaim -> (Ptr Word8 -> IO a) -> IO a
withPtr (BufferClaim bb) k = do
Position offset <- readPosition bb
k (bbPtr bb `plusPtr` offset)

commit :: BufferClaim -> IO ()
commit (BufferClaim bb) = do
Position offset <- readPosition bb
let Capacity frameLen = getCapacity bb
writeFrameLength bb (fromIntegral offset) (fromIntegral frameLen)

abort :: BufferClaim -> IO ()
abort (BufferClaim bb) = do
Position offset <- readPosition bb
let Capacity frameLen = getCapacity bb
writeFrameType bb (fromIntegral offset) Padding
writeFrameLength bb (fromIntegral offset) (fromIntegral frameLen)
4 changes: 3 additions & 1 deletion src/journal/src/Journal/Internal/ByteBuffer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ wrap bb = newByteBuffer (bbData bb) capa lim (Position 0) (Just (bbSlice bb))
wrapPart :: ByteBuffer -> Int -> Int -> IO ByteBuffer
wrapPart bb offset len = newByteBuffer (bbData bb) capa lim pos (Just (bbSlice bb))
where
capa = bbCapacity bb
capa = Capacity len
lim = Limit (fromIntegral offset + fromIntegral len)
pos = Position (fromIntegral offset)

Expand Down Expand Up @@ -374,6 +374,8 @@ readWord8OffArrayIx bb offset@(I# offset#) = do

-- writeCharOffArray#
-- writeWideCharOffArray#
writeInt = writeIntOffArrayIx

writeIntOffArrayIx :: ByteBuffer -> Int -> Int -> IO ()
writeIntOffArrayIx bb ix@(I# ix#) (I# value#) = do
boundCheck bb ix
Expand Down
88 changes: 84 additions & 4 deletions src/journal/src/Journal/Types.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Journal.Types
-- ( Journal'(Journal')
Expand Down Expand Up @@ -35,13 +37,14 @@ module Journal.Types

import Control.Concurrent.STM
import Control.Concurrent.STM (TVar)
import Data.Binary (Binary)
import Data.Bits
import Data.ByteString (ByteString)
import Data.IORef (IORef, newIORef, readIORef, writeIORef)
import Data.Bits
import Data.Vector (Vector)
import Data.Word (Word32, Word64, Word8)
import Foreign.Ptr (Ptr, plusPtr)
import Foreign.Storable (sizeOf)
import Foreign.Storable (Storable, sizeOf)

import Journal.Internal.ByteBuffer
import Journal.Types.AtomicCounter
Expand Down Expand Up @@ -294,6 +297,83 @@ rotateLog meta termCount termId = do

------------------------------------------------------------------------

writeFrameType :: ByteBuffer -> TermOffset -> HeaderTag -> IO ()
writeFrameType termBuffer termOffset (HeaderTag tag) =
putByteAt termBuffer (fromIntegral termOffset + tAG_FIELD_OFFSET) tag

writeFrameLength :: ByteBuffer -> TermOffset -> HeaderLength -> IO ()
writeFrameLength termBuffer termOffset (HeaderLength len) =
writeWord32OffArrayIx termBuffer (fromIntegral termOffset + fRAME_LENGTH_FIELD_OFFSET)
len

------------------------------------------------------------------------

newtype HeaderTag = HeaderTag { unHeaderTag :: Word8 }
deriving newtype (Eq, Binary, Bits, Show, Num, Storable)

pattern Empty = 0 :: HeaderTag
pattern Valid = 1 :: HeaderTag
pattern Invalid = 2 :: HeaderTag
pattern Padding = 4 :: HeaderTag

tagString :: HeaderTag -> String
tagString Empty = "Empty"
tagString Valid = "Valid"
tagString Invalid = "Invalid"
tagString Padding = "Padding"
tagString other = "Unknown: " ++ show other

newtype HeaderVersion = HeaderVersion Word8
deriving newtype (Eq, Binary, Num, Storable, Integral, Real, Ord, Enum)

newtype HeaderLength = HeaderLength Word32
deriving newtype (Eq, Ord, Binary, Enum, Real, Integral, Num, Storable)

newtype HeaderIndex = HeaderIndex Word32
deriving newtype (Eq, Binary, Num, Storable)

------------------------------------------------------------------------

-- * Constants

-- | The length of the journal entry header in bytes.
hEADER_LENGTH :: Int
hEADER_LENGTH
= sizeOf (1 :: HeaderTag)
+ sizeOf (1 :: HeaderVersion)
+ sizeOf (4 :: HeaderLength)
-- + sizeOf (4 :: HeaderIndex)
-- XXX: CRC?

fOOTER_LENGTH :: Int
fOOTER_LENGTH = hEADER_LENGTH

cURRENT_VERSION :: HeaderVersion
cURRENT_VERSION = 0

aCTIVE_FILE :: FilePath
aCTIVE_FILE = "active"

dIRTY_FILE :: FilePath
dIRTY_FILE = "dirty"

cLEAN_FILE :: FilePath
cLEAN_FILE = "clean"

sNAPSHOT_FILE :: FilePath
sNAPSHOT_FILE = "snapshot"

aRCHIVE_FILE :: FilePath
aRCHIVE_FILE = "archive"

fRAME_LENGTH_FIELD_OFFSET :: Int
fRAME_LENGTH_FIELD_OFFSET = 0

tAG_FIELD_OFFSET :: Int
tAG_FIELD_OFFSET = 6

------------------------------------------------------------------------

data Journal = Journal
{ jPtr :: {-# UNPACK #-} !(TVar (Ptr Word8))
, jOffset :: {-# UNPACK #-} !AtomicCounter
Expand Down

0 comments on commit 8058946

Please sign in to comment.