Skip to content

Commit

Permalink
feat(journal): add beginnings of bytebuffer and metrics modules
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-stevan-andjelkovic committed Dec 21, 2021
1 parent a622c32 commit 5691fed
Show file tree
Hide file tree
Showing 3 changed files with 238 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/journal/journal.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ library
Journal.CRC32
Journal.Internal
Journal.Internal.Parse
Journal.Internal.Metrics
Journal.Internal.ByteBuffer
Journal.Types
Journal.Types.AtomicCounter

Expand Down
148 changes: 148 additions & 0 deletions src/journal/src/Journal/Internal/ByteBuffer.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
{-# LANGUAGE MagicHash #-}
{-# LANGUAGE UnboxedTuples #-}

module Journal.Internal.ByteBuffer where

import Control.Exception
import Data.IORef
import GHC.Word
import Foreign
import GHC.Exts
import GHC.Base (unIO)
import GHC.Types

------------------------------------------------------------------------
-- * Type

data ByteBuffer = ByteBuffer
{ bbArray :: {-# UNPACK #-} !(MutableByteArray# RealWorld)
, bbCapacity :: {-# UNPACK #-} !Int
, bbLimit :: {-# UNPACK #-} !(IORef Int)
, bbPosition :: {-# UNPACK #-} !(IORef Int)
}

newByteBuffer :: MutableByteArray# RealWorld -> Int -> Int -> Int -> IO ByteBuffer
newByteBuffer mba# capa lim pos =
ByteBuffer mba# capa <$> newIORef lim <*> newIORef pos

bbPtr :: ByteBuffer -> Ptr a
bbPtr (ByteBuffer mba# _ _ _) = Ptr (byteArrayContents# (unsafeCoerce# mba#))
{-# INLINE bbPtr #-}

getCapacity :: ByteBuffer -> Int
getCapacity = bbCapacity

readLimit :: ByteBuffer -> IO Int
readLimit = readIORef . bbLimit

readPosition :: ByteBuffer -> IO Int
readPosition = readIORef . bbPosition

writePosition :: ByteBuffer -> Int -> IO ()
writePosition bb = writeIORef (bbPosition bb)

incrPosition :: ByteBuffer -> Int -> IO ()
incrPosition bb i = modifyIORef (bbPosition bb) (+ i)

------------------------------------------------------------------------

remaining :: ByteBuffer -> IO Int
remaining bb = undefined


------------------------------------------------------------------------
-- * Checks

boundCheck :: ByteBuffer -> Int -> IO ()
boundCheck bb ix = do
if fromIntegral ix <= getCapacity bb
then return ()
else throwIO (IndexOutOfBounds "XXX")

------------------------------------------------------------------------
-- * Create

allocate :: Int -> IO ByteBuffer
allocate capa@(I# capa#) = IO $ \s ->
case newPinnedByteArray# capa# s of
(# s', mba# #) -> unIO (newByteBuffer mba# capa 0 0) s'

mmapped :: FilePath -> Int -> IO ByteBuffer
mmapped = undefined

wrap :: ByteBuffer -> IO ByteBuffer
wrap = undefined

wrapPart :: ByteBuffer -> Int -> Int -> IO ByteBuffer
wrapPart bb offset len = undefined

slice :: ByteBuffer -> IO ByteBuffer
slice bb@(ByteBuffer mba# _ _ _) = do
pos <- readPosition bb
-- XXX:
newByteBuffer (unsafeCoerce# (bbPtr bb `plusPtr` pos)) undefined undefined 0

duplicate :: ByteBuffer -> IO ByteBuffer
duplicate bb@(ByteBuffer mba# _ _ _) = do
pos <- readPosition bb
lim <- readLimit bb
newByteBuffer mba# (getCapacity bb) pos lim

------------------------------------------------------------------------
-- * Single-byte relative and absolute operations

putByte :: ByteBuffer -> Word8 -> IO ()
putByte = undefined

getByte :: ByteBuffer -> IO Word8
getByte = undefined

putByteAt :: ByteBuffer -> Int -> Word8 -> IO ()
putByteAt = undefined

getByteAt :: ByteBuffer -> Int -> IO Word8
getByteAt = undefined

------------------------------------------------------------------------
-- * Multi-byte operations

putBytes :: ByteBuffer -> [Word8] -> IO ()
putBytes = undefined

getBytes :: ByteBuffer -> IO [Word8]
getBytes = undefined

------------------------------------------------------------------------
-- * Relative operations on `Storable` elements

putStorable :: Storable a => ByteBuffer -> a -> IO ()
putStorable bb x = do
pos <- readPosition bb
putStorableAt bb (fromIntegral pos) x
incrPosition bb (sizeOf x)

getStorable :: Storable a => ByteBuffer -> IO a
getStorable bb = do
pos <- readPosition bb
x <- getStorableAt bb (fromIntegral pos)
incrPosition bb (sizeOf x)
return x

------------------------------------------------------------------------
-- * Absolute operations on `Storable` elements

putStorableAt :: Storable a => ByteBuffer -> Int -> a -> IO ()
putStorableAt bb ix x = do
boundCheck bb ix
pokeByteOff (bbPtr bb) ix x

getStorableAt :: Storable a => ByteBuffer -> Int -> IO a
getStorableAt bb ix = do
boundCheck bb ix
peekByteOff (bbPtr bb) ix

------------------------------------------------------------------------
-- * Mapped

msync :: ByteBuffer -> IO ()
msync = undefined
88 changes: 88 additions & 0 deletions src/journal/src/Journal/Internal/Metrics.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
{-# LANGUAGE MagicHash #-}
{-# LANGUAGE UnboxedTuples #-}

module Journal.Internal.Metrics where

import Data.Vector.Unboxed (Vector)
import qualified Data.Vector.Unboxed as Vector
import Data.Word
import Foreign
import GHC.Exts
import GHC.ForeignPtr
import GHC.Prim
import GHC.Types

------------------------------------------------------------------------

data Metrics a = Metrics
{ mPtr :: ForeignPtr Word8
, mOffsets :: Vector Int
}

newtype MetricsSchema a = MetricsSchema
{ unMetricsSchema :: [(a, MetricsType)]
}

data MetricsType = Counter | Histogram

newMetrics :: (Enum a, Bounded a) => MetricsSchema a -> FilePath -> IO (Metrics a)
newMetrics ms@(MetricsSchema xs) fp = IO $ \s ->
-- XXX: just use mallocForeignPtrBytes :: Int -> IO (ForeignPtr a)
-- XXX: Aligned to avoid possible false sharing?
case newPinnedByteArray# len s of
(# s', arr #) -> case byteArrayContents# (unsafeCoerce# arr) of
addr# -> (# s', Metrics (ForeignPtr addr# (PlainPtr arr)) (calculateOffsets ms) #)
-- ptr' <- mmap ptr ...
-- assertM (ptr' == ptr)
where
I# len = length xs

-- XXX: how can we avoid `incrCounter` being called on a histogram?
incrCounter :: (Enum a, Bounded a) => Metrics a -> a -> Int -> IO ()
incrCounter = undefined

measure :: (Enum a, Bounded a) => Metrics a -> a -> Int -> IO ()
measure = undefined

------------------------------------------------------------------------

-- * Internal

data SchemaError = Duplicate

validSchema :: (Enum a, Bounded a) => MetricsSchema a -> Either SchemaError ()
validSchema = undefined -- map (fromEnum . fst) . unMetricsSchema

lookupOffset :: (Enum a, Bounded a) => Metrics a -> a -> Int
lookupOffset m x = mOffsets m Vector.! fromEnum x

calculateOffsets :: (Enum a, Bounded a) => MetricsSchema a -> Vector Int
calculateOffsets (MetricsSchema xs)
= Vector.take (length xs)
. Vector.fromList
. scanl (\ih (_, mty) -> intsOfSpaceNeeded mty + ih) 0
$ xs
where
intsOfSpaceNeeded :: MetricsType -> Int
intsOfSpaceNeeded Counter = 1 -- count
intsOfSpaceNeeded Histogram
= 2 ^ 16 -- buckets
+ 1 -- sum
+ 1 -- count

------------------------------------------------------------------------

-- * Example

data MyMetrics = Connections | Latency
deriving (Enum, Bounded)

mySchema :: MetricsSchema MyMetrics
mySchema = MetricsSchema [(Connections, Counter), (Latency, Histogram)]

main :: IO ()
main = do
metrics <- newMetrics mySchema "/tmp/test-metrics"
incrCounter metrics Connections 1
measure metrics Latency 200
return ()

0 comments on commit 5691fed

Please sign in to comment.