diff --git a/src/journal/src/Journal/Internal/ByteBuffer.hs b/src/journal/src/Journal/Internal/ByteBuffer.hs index 05b4150a..58b113f9 100644 --- a/src/journal/src/Journal/Internal/ByteBuffer.hs +++ b/src/journal/src/Journal/Internal/ByteBuffer.hs @@ -1,4 +1,5 @@ {-# LANGUAGE MagicHash #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE UnboxedTuples #-} module Journal.Internal.ByteBuffer where @@ -8,68 +9,98 @@ import Data.IORef import Foreign import GHC.ForeignPtr import GHC.Exts +import GHC.Types +import GHC.IO import System.Posix.IO (openFd, defaultFileFlags, OpenMode(ReadWrite)) import Journal.Internal.Mmap ------------------------------------------------------------------------ --- * Type +-- * Types + +type MBArray = MutableByteArray# RealWorld data ByteBuffer = ByteBuffer - { bbAddr :: {-# UNPACK #-} !Addr# - , bbCapacity :: {-# UNPACK #-} !Int - , bbLimit :: {-# UNPACK #-} !(IORef Int) - , bbPosition :: {-# UNPACK #-} !(IORef Int) - , bbMark :: {-# UNPACK #-} !(IORef Int) + { bbData :: {-# UNPACK #-} !MBArray + , bbCapacity :: {-# UNPACK #-} !Capacity + , bbLimit :: {-# UNPACK #-} !(IORef Limit) + , bbPosition :: {-# UNPACK #-} !(IORef Position) + , bbMark :: {-# UNPACK #-} !(IORef Position) + , bbSlice :: {-# UNPACK #-} !(IORef Slice) } -newByteBuffer :: Addr# -> Int -> Int -> Int -> IO ByteBuffer -newByteBuffer addr# capa lim pos = - ByteBuffer addr# capa <$> newIORef lim <*> newIORef pos <*> newIORef (-1) +newtype Capacity = Capacity Int + deriving (Num, Integral, Real, Ord, Eq, Enum) + +newtype Limit = Limit Int + deriving (Num, Integral, Real, Ord, Eq, Enum) + +newtype Position = Position Int + deriving (Num, Integral, Real, Ord, Eq, Enum) + +newtype Slice = Slice Int + deriving (Num, Integral, Real, Ord, Eq, Enum) + +------------------------------------------------------------------------ + +newByteBuffer :: MBArray -> Capacity -> Limit -> Position -> Maybe (IORef Slice) + -> IO ByteBuffer +newByteBuffer mba# capa lim pos mSli + = ByteBuffer mba# capa + <$> newIORef lim + <*> newIORef pos + <*> newIORef (-1) + <*> maybe (newIORef 0) return mSli bbPtr :: ByteBuffer -> Ptr a -bbPtr (ByteBuffer addr# _ _ _ _) = Ptr addr# +bbPtr (ByteBuffer mba# _ _ _ _ _) = Ptr (byteArrayContents# (unsafeCoerce# mba#)) {-# INLINE bbPtr #-} -getCapacity :: ByteBuffer -> Int +getCapacity :: ByteBuffer -> Capacity getCapacity = bbCapacity {-# INLINE getCapacity #-} -readLimit :: ByteBuffer -> IO Int +readLimit :: ByteBuffer -> IO Limit readLimit = readIORef . bbLimit {-# INLINE readLimit #-} -writeLimit :: ByteBuffer -> Int -> IO () +writeLimit :: ByteBuffer -> Limit -> IO () writeLimit bb = writeIORef (bbLimit bb) {-# INLINE writeLimit #-} -readPosition :: ByteBuffer -> IO Int +readPosition :: ByteBuffer -> IO Position readPosition = readIORef . bbPosition {-# INLINE readPosition #-} -writePosition :: ByteBuffer -> Int -> IO () +writePosition :: ByteBuffer -> Position -> IO () writePosition bb = writeIORef (bbPosition bb) {-# INLINE writePosition #-} incrPosition :: ByteBuffer -> Int -> IO () -incrPosition bb i = modifyIORef (bbPosition bb) (+ i) +incrPosition bb i = modifyIORef (bbPosition bb) (+ fromIntegral i) {-# INLINE incrPosition #-} -readMark :: ByteBuffer -> IO Int +readMark :: ByteBuffer -> IO Position readMark = readIORef . bbMark {-# INLINE readMark #-} -writeMark :: ByteBuffer -> Int -> IO () +writeMark :: ByteBuffer -> Position -> IO () writeMark bb = writeIORef (bbMark bb) {-# INLINE writeMark #-} +readSlice :: ByteBuffer -> IO Slice +readSlice = readIORef . bbSlice + +writeSlice :: ByteBuffer -> Slice -> IO () +writeSlice bb = writeIORef (bbSlice bb) + ------------------------------------------------------------------------ remaining :: ByteBuffer -> IO Int remaining bb = do lim <- readLimit bb pos <- readPosition bb - return (lim - pos) + return (fromIntegral (lim - fromIntegral pos)) ------------------------------------------------------------------------ -- * Checks @@ -87,23 +118,23 @@ invariant bb = do lim <- readLimit bb let capa = getCapacity bb assert ((mark == (-1) || 0 <= mark) && - mark <= pos && - pos <= lim && - lim <= capa) + mark <= fromIntegral pos && + pos <= fromIntegral lim && + lim <= fromIntegral capa) (return ()) ------------------------------------------------------------------------ -- * Create allocate :: Int -> IO ByteBuffer -allocate capa = do - Ptr addr# <- mallocBytes capa - newByteBuffer addr# capa capa 0 +allocate capa@(I# capa#) = IO $ \s -> + case newPinnedByteArray# capa# s of + (# s', mba# #) -> unIO (newByteBuffer mba# (Capacity capa) (Limit capa) 0 Nothing) s' allocateAligned :: Int -> Int -> IO ByteBuffer -allocateAligned capa align = do - Ptr addr# <- posixMemalign capa align - newByteBuffer addr# capa capa 0 +allocateAligned capa@(I# capa#) align@(I# align#) = IO $ \s -> + case newAlignedPinnedByteArray# capa# align# s of + (# s', mba# #) -> unIO (newByteBuffer mba# (Capacity capa) (Limit capa) 0 Nothing) s' mmapped :: FilePath -> Int -> IO ByteBuffer mmapped fp capa = do @@ -116,52 +147,56 @@ mmapped fp capa = do return bb wrap :: ByteBuffer -> IO ByteBuffer -wrap bb = newByteBuffer (bbAddr bb) size size 0 +wrap bb = newByteBuffer (bbData bb) capa lim (Position 0) (Just (bbSlice bb)) where - size = bbCapacity bb + capa = bbCapacity bb + lim = Limit (fromIntegral capa) wrapPart :: ByteBuffer -> Int -> Int -> IO ByteBuffer -wrapPart bb offset len = newByteBuffer (bbAddr bb) size (offset + len) offset +wrapPart bb offset len = newByteBuffer (bbData bb) capa lim pos (Just (bbSlice bb)) where - size = bbCapacity bb + capa = bbCapacity bb + lim = Limit (fromIntegral offset + fromIntegral len) + pos = Position (fromIntegral offset) slice :: ByteBuffer -> IO ByteBuffer slice bb = do - I# pos# <- readPosition bb + pos <- readPosition bb left <- remaining bb - newByteBuffer (bbAddr bb `plusAddr#` pos#) left left 0 + slice <- newIORef (fromIntegral pos) + newByteBuffer (bbData bb) (Capacity left) (Limit left) (Position 0) (Just slice) duplicate :: ByteBuffer -> IO ByteBuffer duplicate bb = do lim <- readLimit bb pos <- readPosition bb - newByteBuffer (bbAddr bb) (getCapacity bb) lim pos + newByteBuffer (bbData bb) (getCapacity bb) lim pos (Just (bbSlice bb)) ------------------------------------------------------------------------ +mark :: ByteBuffer -> IO () +mark bb = do + pos <- readPosition bb + writeMark bb pos + compact :: ByteBuffer -> IO ByteBuffer compact = undefined ------------------------------------------------------------------------ --- | The position is set to zero, the limit is set to the capacity, and the mark --- is discarded. clear :: ByteBuffer clear = undefined --- | The limit is set to the current position and then the position is set to --- zero. If the mark is defined then it is discarded. flipBB :: ByteBuffer -> IO ByteBuffer flipBB = undefined --- | Rewinds this buffer. The position is set to zero and the mark is discarded. rewind :: ByteBuffer -> IO ByteBuffer rewind = undefined --- | Resets this buffer's position to the previously-marked position. --- Invoking this method neither changes nor discards the mark's value. -reset :: ByteBuffer -reset = undefined +reset :: ByteBuffer -> IO () +reset bb = do + mrk <- readMark bb + writePosition bb mrk ------------------------------------------------------------------------ -- * Single-byte relative and absolute operations @@ -235,6 +270,8 @@ getStorableAt bb ix = do -- indexWord32OffAddr# -- indexWord64OffAddr# +-- writeIntArray# + ------------------------------------------------------------------------ -- * Mapped diff --git a/src/journal/src/Journal/Internal/Metrics.hs b/src/journal/src/Journal/Internal/Metrics.hs index df574c3c..51cba42c 100644 --- a/src/journal/src/Journal/Internal/Metrics.hs +++ b/src/journal/src/Journal/Internal/Metrics.hs @@ -12,11 +12,13 @@ import GHC.ForeignPtr import GHC.Prim import GHC.Types +import Journal.Internal.ByteBuffer + ------------------------------------------------------------------------ data Metrics a = Metrics - { mPtr :: ForeignPtr Word8 - , mOffsets :: Vector Int + { mMetadata :: ByteBuffer + , mBuffer :: ByteBuffer } newtype MetricsSchema a = MetricsSchema @@ -26,16 +28,13 @@ newtype MetricsSchema a = MetricsSchema data MetricsType = Counter | Histogram newMetrics :: (Enum a, Bounded a) => MetricsSchema a -> FilePath -> IO (Metrics a) -newMetrics ms@(MetricsSchema xs) fp = IO $ \s -> - -- XXX: just use mallocForeignPtrBytes :: Int -> IO (ForeignPtr a) - -- XXX: Aligned to avoid possible false sharing? - case newPinnedByteArray# len s of - (# s', arr #) -> case byteArrayContents# (unsafeCoerce# arr) of - addr# -> (# s', Metrics (ForeignPtr addr# (PlainPtr arr)) (calculateOffsets ms) #) - -- ptr' <- mmap ptr ... - -- assertM (ptr' == ptr) +newMetrics ms@(MetricsSchema xs) fp = do + bb <- mmapped fp (sizeOfOffsets + sizeOfMetrics ms) + meta <- wrapPart bb 0 sizeOfOffsets + buf <- wrapPart bb (sizeOfOffsets + 1) (sizeOfOffsets + sizeOfMetrics ms) + return (Metrics meta buf) where - I# len = length xs + sizeOfOffsets = length xs * sizeOf (8 :: Int) -- XXX: how can we avoid `incrCounter` being called on a histogram? incrCounter :: (Enum a, Bounded a) => Metrics a -> a -> Int -> IO () @@ -54,7 +53,7 @@ validSchema :: (Enum a, Bounded a) => MetricsSchema a -> Either SchemaError () validSchema = undefined -- map (fromEnum . fst) . unMetricsSchema lookupOffset :: (Enum a, Bounded a) => Metrics a -> a -> Int -lookupOffset m x = mOffsets m Vector.! fromEnum x +lookupOffset m x = undefined -- mOffsets m Vector.! fromEnum x calculateOffsets :: (Enum a, Bounded a) => MetricsSchema a -> Vector Int calculateOffsets (MetricsSchema xs) @@ -62,13 +61,26 @@ calculateOffsets (MetricsSchema xs) . Vector.fromList . scanl (\ih (_, mty) -> intsOfSpaceNeeded mty + ih) 0 $ xs - where - intsOfSpaceNeeded :: MetricsType -> Int - intsOfSpaceNeeded Counter = 1 -- count - intsOfSpaceNeeded Histogram - = 2 ^ 16 -- buckets - + 1 -- sum - + 1 -- count + +intsOfSpaceNeeded :: MetricsType -> Int +intsOfSpaceNeeded Counter = 1 -- count +intsOfSpaceNeeded Histogram + = 2 ^ 16 -- buckets + + 1 -- sum + + 1 -- count + +sizeOfMetrics :: MetricsSchema a -> Int +sizeOfMetrics + = (* sizeOf (8 :: Int)) + . sum + . map (intsOfSpaceNeeded . snd) + . unMetricsSchema + +addNewCounter :: ByteBuffer -> IO () +addNewCounter bb = undefined + +addNewHistogram :: ByteBuffer -> IO () +addNewHistogram = undefined ------------------------------------------------------------------------ diff --git a/src/journal/src/Journal/Internal/Mmap.hs b/src/journal/src/Journal/Internal/Mmap.hs index e0903b37..a5dce8f3 100644 --- a/src/journal/src/Journal/Internal/Mmap.hs +++ b/src/journal/src/Journal/Internal/Mmap.hs @@ -120,14 +120,15 @@ posixMemalign align size = do peek memPtr sysconfPageSize :: IO Int -sysconfPageSize = fromIntegral <$> c_sysconf 30 +sysconfPageSize = fromIntegral <$> c_sysconf _SC_PAGE_SIZE + where + _SC_PAGE_SIZE = 30 ------------------------------------------------------------------------ - {- main :: IO () main = do - fptr <- posixMemalign 4096 4096 + fptr <- posixMemalignFPtr 4096 4096 withForeignPtr fptr $ \ptr' -> do ptr <- mmap (Just ptr') 16 (PROT (READ :| WRITE)) MAP_SHARED Nothing 0 if ptr /= ptr' @@ -135,5 +136,3 @@ main = do else do msync ptr 16 MS_SYNC False munmap ptr 16 - --}