Skip to content

Commit

Permalink
refactor(journal): revert back to using bytearray for bytebuffer
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-stevan-andjelkovic committed Dec 21, 2021
1 parent ac30608 commit 930edda
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 68 deletions.
125 changes: 81 additions & 44 deletions src/journal/src/Journal/Internal/ByteBuffer.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{-# LANGUAGE MagicHash #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE UnboxedTuples #-}

module Journal.Internal.ByteBuffer where
Expand All @@ -8,68 +9,98 @@ import Data.IORef
import Foreign
import GHC.ForeignPtr
import GHC.Exts
import GHC.Types
import GHC.IO
import System.Posix.IO (openFd, defaultFileFlags, OpenMode(ReadWrite))

import Journal.Internal.Mmap

------------------------------------------------------------------------
-- * Type
-- * Types

type MBArray = MutableByteArray# RealWorld

data ByteBuffer = ByteBuffer
{ bbAddr :: {-# UNPACK #-} !Addr#
, bbCapacity :: {-# UNPACK #-} !Int
, bbLimit :: {-# UNPACK #-} !(IORef Int)
, bbPosition :: {-# UNPACK #-} !(IORef Int)
, bbMark :: {-# UNPACK #-} !(IORef Int)
{ bbData :: {-# UNPACK #-} !MBArray
, bbCapacity :: {-# UNPACK #-} !Capacity
, bbLimit :: {-# UNPACK #-} !(IORef Limit)
, bbPosition :: {-# UNPACK #-} !(IORef Position)
, bbMark :: {-# UNPACK #-} !(IORef Position)
, bbSlice :: {-# UNPACK #-} !(IORef Slice)
}

newByteBuffer :: Addr# -> Int -> Int -> Int -> IO ByteBuffer
newByteBuffer addr# capa lim pos =
ByteBuffer addr# capa <$> newIORef lim <*> newIORef pos <*> newIORef (-1)
newtype Capacity = Capacity Int
deriving (Num, Integral, Real, Ord, Eq, Enum)

newtype Limit = Limit Int
deriving (Num, Integral, Real, Ord, Eq, Enum)

newtype Position = Position Int
deriving (Num, Integral, Real, Ord, Eq, Enum)

newtype Slice = Slice Int
deriving (Num, Integral, Real, Ord, Eq, Enum)

------------------------------------------------------------------------

newByteBuffer :: MBArray -> Capacity -> Limit -> Position -> Maybe (IORef Slice)
-> IO ByteBuffer
newByteBuffer mba# capa lim pos mSli
= ByteBuffer mba# capa
<$> newIORef lim
<*> newIORef pos
<*> newIORef (-1)
<*> maybe (newIORef 0) return mSli

bbPtr :: ByteBuffer -> Ptr a
bbPtr (ByteBuffer addr# _ _ _ _) = Ptr addr#
bbPtr (ByteBuffer mba# _ _ _ _ _) = Ptr (byteArrayContents# (unsafeCoerce# mba#))
{-# INLINE bbPtr #-}

getCapacity :: ByteBuffer -> Int
getCapacity :: ByteBuffer -> Capacity
getCapacity = bbCapacity
{-# INLINE getCapacity #-}

readLimit :: ByteBuffer -> IO Int
readLimit :: ByteBuffer -> IO Limit
readLimit = readIORef . bbLimit
{-# INLINE readLimit #-}

writeLimit :: ByteBuffer -> Int -> IO ()
writeLimit :: ByteBuffer -> Limit -> IO ()
writeLimit bb = writeIORef (bbLimit bb)
{-# INLINE writeLimit #-}

readPosition :: ByteBuffer -> IO Int
readPosition :: ByteBuffer -> IO Position
readPosition = readIORef . bbPosition
{-# INLINE readPosition #-}

writePosition :: ByteBuffer -> Int -> IO ()
writePosition :: ByteBuffer -> Position -> IO ()
writePosition bb = writeIORef (bbPosition bb)
{-# INLINE writePosition #-}

incrPosition :: ByteBuffer -> Int -> IO ()
incrPosition bb i = modifyIORef (bbPosition bb) (+ i)
incrPosition bb i = modifyIORef (bbPosition bb) (+ fromIntegral i)
{-# INLINE incrPosition #-}

readMark :: ByteBuffer -> IO Int
readMark :: ByteBuffer -> IO Position
readMark = readIORef . bbMark
{-# INLINE readMark #-}

writeMark :: ByteBuffer -> Int -> IO ()
writeMark :: ByteBuffer -> Position -> IO ()
writeMark bb = writeIORef (bbMark bb)
{-# INLINE writeMark #-}

readSlice :: ByteBuffer -> IO Slice
readSlice = readIORef . bbSlice

writeSlice :: ByteBuffer -> Slice -> IO ()
writeSlice bb = writeIORef (bbSlice bb)

------------------------------------------------------------------------

remaining :: ByteBuffer -> IO Int
remaining bb = do
lim <- readLimit bb
pos <- readPosition bb
return (lim - pos)
return (fromIntegral (lim - fromIntegral pos))

------------------------------------------------------------------------
-- * Checks
Expand All @@ -87,23 +118,23 @@ invariant bb = do
lim <- readLimit bb
let capa = getCapacity bb
assert ((mark == (-1) || 0 <= mark) &&
mark <= pos &&
pos <= lim &&
lim <= capa)
mark <= fromIntegral pos &&
pos <= fromIntegral lim &&
lim <= fromIntegral capa)
(return ())

------------------------------------------------------------------------
-- * Create

allocate :: Int -> IO ByteBuffer
allocate capa = do
Ptr addr# <- mallocBytes capa
newByteBuffer addr# capa capa 0
allocate capa@(I# capa#) = IO $ \s ->
case newPinnedByteArray# capa# s of
(# s', mba# #) -> unIO (newByteBuffer mba# (Capacity capa) (Limit capa) 0 Nothing) s'

allocateAligned :: Int -> Int -> IO ByteBuffer
allocateAligned capa align = do
Ptr addr# <- posixMemalign capa align
newByteBuffer addr# capa capa 0
allocateAligned capa@(I# capa#) align@(I# align#) = IO $ \s ->
case newAlignedPinnedByteArray# capa# align# s of
(# s', mba# #) -> unIO (newByteBuffer mba# (Capacity capa) (Limit capa) 0 Nothing) s'

mmapped :: FilePath -> Int -> IO ByteBuffer
mmapped fp capa = do
Expand All @@ -116,52 +147,56 @@ mmapped fp capa = do
return bb

wrap :: ByteBuffer -> IO ByteBuffer
wrap bb = newByteBuffer (bbAddr bb) size size 0
wrap bb = newByteBuffer (bbData bb) capa lim (Position 0) (Just (bbSlice bb))
where
size = bbCapacity bb
capa = bbCapacity bb
lim = Limit (fromIntegral capa)

wrapPart :: ByteBuffer -> Int -> Int -> IO ByteBuffer
wrapPart bb offset len = newByteBuffer (bbAddr bb) size (offset + len) offset
wrapPart bb offset len = newByteBuffer (bbData bb) capa lim pos (Just (bbSlice bb))
where
size = bbCapacity bb
capa = bbCapacity bb
lim = Limit (fromIntegral offset + fromIntegral len)
pos = Position (fromIntegral offset)

slice :: ByteBuffer -> IO ByteBuffer
slice bb = do
I# pos# <- readPosition bb
pos <- readPosition bb
left <- remaining bb
newByteBuffer (bbAddr bb `plusAddr#` pos#) left left 0
slice <- newIORef (fromIntegral pos)
newByteBuffer (bbData bb) (Capacity left) (Limit left) (Position 0) (Just slice)

duplicate :: ByteBuffer -> IO ByteBuffer
duplicate bb = do
lim <- readLimit bb
pos <- readPosition bb
newByteBuffer (bbAddr bb) (getCapacity bb) lim pos
newByteBuffer (bbData bb) (getCapacity bb) lim pos (Just (bbSlice bb))

------------------------------------------------------------------------

mark :: ByteBuffer -> IO ()
mark bb = do
pos <- readPosition bb
writeMark bb pos

compact :: ByteBuffer -> IO ByteBuffer
compact = undefined

------------------------------------------------------------------------

-- | The position is set to zero, the limit is set to the capacity, and the mark
-- is discarded.
clear :: ByteBuffer
clear = undefined

-- | The limit is set to the current position and then the position is set to
-- zero. If the mark is defined then it is discarded.
flipBB :: ByteBuffer -> IO ByteBuffer
flipBB = undefined

-- | Rewinds this buffer. The position is set to zero and the mark is discarded.
rewind :: ByteBuffer -> IO ByteBuffer
rewind = undefined

-- | Resets this buffer's position to the previously-marked position.
-- Invoking this method neither changes nor discards the mark's value.
reset :: ByteBuffer
reset = undefined
reset :: ByteBuffer -> IO ()
reset bb = do
mrk <- readMark bb
writePosition bb mrk

------------------------------------------------------------------------
-- * Single-byte relative and absolute operations
Expand Down Expand Up @@ -235,6 +270,8 @@ getStorableAt bb ix = do
-- indexWord32OffAddr#
-- indexWord64OffAddr#

-- writeIntArray#

------------------------------------------------------------------------
-- * Mapped

Expand Down
50 changes: 31 additions & 19 deletions src/journal/src/Journal/Internal/Metrics.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ import GHC.ForeignPtr
import GHC.Prim
import GHC.Types

import Journal.Internal.ByteBuffer

------------------------------------------------------------------------

data Metrics a = Metrics
{ mPtr :: ForeignPtr Word8
, mOffsets :: Vector Int
{ mMetadata :: ByteBuffer
, mBuffer :: ByteBuffer
}

newtype MetricsSchema a = MetricsSchema
Expand All @@ -26,16 +28,13 @@ newtype MetricsSchema a = MetricsSchema
data MetricsType = Counter | Histogram

newMetrics :: (Enum a, Bounded a) => MetricsSchema a -> FilePath -> IO (Metrics a)
newMetrics ms@(MetricsSchema xs) fp = IO $ \s ->
-- XXX: just use mallocForeignPtrBytes :: Int -> IO (ForeignPtr a)
-- XXX: Aligned to avoid possible false sharing?
case newPinnedByteArray# len s of
(# s', arr #) -> case byteArrayContents# (unsafeCoerce# arr) of
addr# -> (# s', Metrics (ForeignPtr addr# (PlainPtr arr)) (calculateOffsets ms) #)
-- ptr' <- mmap ptr ...
-- assertM (ptr' == ptr)
newMetrics ms@(MetricsSchema xs) fp = do
bb <- mmapped fp (sizeOfOffsets + sizeOfMetrics ms)
meta <- wrapPart bb 0 sizeOfOffsets
buf <- wrapPart bb (sizeOfOffsets + 1) (sizeOfOffsets + sizeOfMetrics ms)
return (Metrics meta buf)
where
I# len = length xs
sizeOfOffsets = length xs * sizeOf (8 :: Int)

-- XXX: how can we avoid `incrCounter` being called on a histogram?
incrCounter :: (Enum a, Bounded a) => Metrics a -> a -> Int -> IO ()
Expand All @@ -54,21 +53,34 @@ validSchema :: (Enum a, Bounded a) => MetricsSchema a -> Either SchemaError ()
validSchema = undefined -- map (fromEnum . fst) . unMetricsSchema

lookupOffset :: (Enum a, Bounded a) => Metrics a -> a -> Int
lookupOffset m x = mOffsets m Vector.! fromEnum x
lookupOffset m x = undefined -- mOffsets m Vector.! fromEnum x

calculateOffsets :: (Enum a, Bounded a) => MetricsSchema a -> Vector Int
calculateOffsets (MetricsSchema xs)
= Vector.take (length xs)
. Vector.fromList
. scanl (\ih (_, mty) -> intsOfSpaceNeeded mty + ih) 0
$ xs
where
intsOfSpaceNeeded :: MetricsType -> Int
intsOfSpaceNeeded Counter = 1 -- count
intsOfSpaceNeeded Histogram
= 2 ^ 16 -- buckets
+ 1 -- sum
+ 1 -- count

intsOfSpaceNeeded :: MetricsType -> Int
intsOfSpaceNeeded Counter = 1 -- count
intsOfSpaceNeeded Histogram
= 2 ^ 16 -- buckets
+ 1 -- sum
+ 1 -- count

sizeOfMetrics :: MetricsSchema a -> Int
sizeOfMetrics
= (* sizeOf (8 :: Int))
. sum
. map (intsOfSpaceNeeded . snd)
. unMetricsSchema

addNewCounter :: ByteBuffer -> IO ()
addNewCounter bb = undefined

addNewHistogram :: ByteBuffer -> IO ()
addNewHistogram = undefined

------------------------------------------------------------------------

Expand Down
9 changes: 4 additions & 5 deletions src/journal/src/Journal/Internal/Mmap.hs
Original file line number Diff line number Diff line change
Expand Up @@ -120,20 +120,19 @@ posixMemalign align size = do
peek memPtr

sysconfPageSize :: IO Int
sysconfPageSize = fromIntegral <$> c_sysconf 30
sysconfPageSize = fromIntegral <$> c_sysconf _SC_PAGE_SIZE
where
_SC_PAGE_SIZE = 30

------------------------------------------------------------------------

{-
main :: IO ()
main = do
fptr <- posixMemalign 4096 4096
fptr <- posixMemalignFPtr 4096 4096
withForeignPtr fptr $ \ptr' -> do
ptr <- mmap (Just ptr') 16 (PROT (READ :| WRITE)) MAP_SHARED Nothing 0
if ptr /= ptr'
then error "not same ptr"
else do
msync ptr 16 MS_SYNC False
munmap ptr 16
-}

0 comments on commit 930edda

Please sign in to comment.