Skip to content

Commit

Permalink
feat(journal): add atomic counter module and start working on file ov…
Browse files Browse the repository at this point in the history
…erflow
  • Loading branch information
symbiont-stevan-andjelkovic committed Nov 30, 2021
1 parent 90cd226 commit 315b40c
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 28 deletions.
2 changes: 2 additions & 0 deletions src/journal/journal.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,15 @@ library

build-depends:
, async
, ghc-prim
, mmap
, network

exposed-modules:
Journal
Journal.Internal
Journal.Types
Journal.Types.AtomicCounter

ghc-options: -O2
default-language: Haskell2010
Expand Down
11 changes: 5 additions & 6 deletions src/journal/src/Journal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ module Journal
, replay_
) where

import Control.Monad.IO.Class (MonadIO)
import Control.Monad (unless)
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
Expand Down Expand Up @@ -53,11 +54,9 @@ startJournal dir (Options maxByteSize) = do
-- XXX: assert max size
bytesProducedCounter <- newCounter offset
ptrRef <- newJournalPtrRef (ptr `plusPtr` offset)
jcPtrRef <- newJournalConsumerPtrRef ptr
fileCounter <- newCounter 0
bytesConsumedCounter <- newCounter 0
return (Journal ptrRef bytesProducedCounter maxByteSize fileCounter,
JournalConsumer jcPtrRef bytesConsumedCounter)
jc <- JournalConsumer <$> newJournalConsumerPtrRef ptr <*> newCounter 0
return (Journal ptrRef bytesProducedCounter maxByteSize fileCounter, jc)

------------------------------------------------------------------------

Expand Down Expand Up @@ -93,7 +92,7 @@ appendRecv jour sock len = do
readJournal :: JournalConsumer -> IO ByteString
readJournal jc = do
ptr <- getJournalConsumerPtr jc
offset <- incrCounter hEADER_SIZE (jcBytesConsumed jc)
offset <- getAndIncrCounter hEADER_SIZE (jcBytesConsumed jc)
len <- waitForHeader ptr offset
fptr <- newForeignPtr_ ptr
let bs = BS.copy (fromForeignPtr fptr (offset + hEADER_SIZE) len)
Expand All @@ -107,7 +106,7 @@ readJournal jc = do
truncateAfterSnapshot :: Journal -> Int -> IO ()
truncateAfterSnapshot jour bytesRead = undefined

replay :: Journal -> (ByteString -> IO a) -> IO [a]
replay :: MonadIO m => Journal -> (ByteString -> m a) -> m [a]
replay = undefined

replay_ :: Journal -> (ByteString -> IO ()) -> IO ()
Expand Down
27 changes: 23 additions & 4 deletions src/journal/src/Journal/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,35 @@ import Foreign.Ptr (Ptr, plusPtr)
import Foreign.Storable (pokeByteOff, peekByteOff)

import Journal.Types
import Journal.Types.AtomicCounter

------------------------------------------------------------------------

-- | The size of the journal entry header in bytes.
hEADER_SIZE :: Int
hEADER_SIZE = 4 -- sizeOf (0 :: Word32)
-- XXX: Some special header start byte?
-- XXX: version?
-- XXX: CRC?

claim :: Journal -> Int -> IO Int
claim jour bytes = incrCounter (bytes + hEADER_SIZE) (jOffset jour)
claim jour len = do
offset <- getAndIncrCounter (len + hEADER_SIZE) (jOffset jour)
-- XXX: mod/.&. maxByteSize?
if offset + len <= jMaxByteSize jour
then return offset -- Fits in current file.
else if offset < jMaxByteSize jour
then do
-- First writer that overflowed the file, the second one
-- would have got an offset higher than `maxBytes`.

-- rotate
undefined
else do
-- `offset >= maxBytes`, so we clearly can't write to the current file.
-- Wait for the first writer that overflowed to rotate the files then
-- write.
undefined

-- XXX:
-- if bytes + offset <= jMaxSize then write to active file
Expand All @@ -45,9 +63,10 @@ writeLBSToPtr bs ptr | LBS.null bs = return ()
go (n - 1)

writeHeader :: Ptr Word8 -> Int -> IO ()
writeHeader ptr len = do
let header = encode (fromIntegral len :: Word32)
writeLBSToPtr header ptr
writeHeader ptr len = writeLBSToPtr header ptr
where
header :: ByteString
header = encode (fromIntegral len :: Word32)

readHeader :: Ptr Word8 -> IO Int
readHeader ptr = do
Expand Down
36 changes: 18 additions & 18 deletions src/journal/src/Journal/Types.hs
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
module Journal.Types where
module Journal.Types
( Journal(Journal)
, jMaxByteSize
, jOffset
, Options(Options)
, JournalConsumer(JournalConsumer)
, jcBytesConsumed
, getJournalPtr
, getJournalConsumerPtr
, newJournalPtrRef
, newJournalConsumerPtrRef
, module Journal.Types.AtomicCounter)
where

import Control.Concurrent.STM
import Control.Concurrent.STM (TVar)
import Data.ByteString (ByteString)
import Control.Concurrent.STM
import Data.IORef (IORef, newIORef, readIORef, atomicModifyIORef')
import Data.Word (Word8, Word32, Word64)
import Data.IORef (IORef, newIORef, readIORef)
import Data.Word (Word32, Word64, Word8)
import Foreign.Ptr (Ptr, plusPtr)

------------------------------------------------------------------------

newtype AtomicCounter = AtomicCounter (IORef Int)
import Journal.Types.AtomicCounter

newCounter :: Int -> IO AtomicCounter
newCounter i = AtomicCounter <$> newIORef i

incrCounter :: Int -> AtomicCounter -> IO Int
incrCounter i (AtomicCounter ref) = atomicModifyIORef' ref (\j -> (i + j, j))

incrCounter_ :: Int -> AtomicCounter -> IO ()
incrCounter_ i (AtomicCounter ref) = atomicModifyIORef' ref (\j -> (i + j, ()))

readCounter :: AtomicCounter -> IO Int
readCounter (AtomicCounter ref) = readIORef ref
------------------------------------------------------------------------

data Journal = Journal
{ jPtr :: !(TVar (Ptr Word8))
Expand Down
107 changes: 107 additions & 0 deletions src/journal/src/Journal/Types/AtomicCounter.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE MagicHash #-}
{-# LANGUAGE UnboxedTuples #-}

-- Inspired by:
-- * https://github.com/jberryman/unagi-chan/blob/master/src/Data/Atomics/Counter/Fat.hs
-- * https://hackage.haskell.org/package/atomic-primops-0.8.4/docs/src/Data.Atomics.Counter.html
-- * https://hackage.haskell.org/package/unboxed-ref-0.4.0.0/docs/src/Data-STRef-Unboxed-Internal.html#STRefU
-- * https://hackage.haskell.org/package/ghc-8.10.2/docs/src/FastMutInt.html#FastMutInt

module Journal.Types.AtomicCounter
( AtomicCounter()
, newCounter
, incrCounter
, incrCounter_
, getAndIncrCounter
, decrCounter
, decrCounter_
, readCounter
, casCounter
) where

import Data.Bits (finiteBitSize)
import GHC.Exts
( MutableByteArray#
, RealWorld
, casIntArray#
, fetchAddIntArray#
, fetchSubIntArray#
, newAlignedPinnedByteArray#
, readIntArray#
, writeIntArray#
, (+#)
, (-#)
, (==#)
, MutVar#
, newMutVar#
, readMutVar#
)
import GHC.Types

------------------------------------------------------------------------

data AtomicCounter = AtomicCounter !(MutableByteArray# RealWorld)

sIZEOF_CACHELINE :: Int
sIZEOF_CACHELINE = 64
{-# INLINE sIZEOF_CACHELINE #-}
-- ^ TODO: See
-- https://github.com/NickStrupat/CacheLineSize/blob/93a57c094f71a2796714f7a28d74dd8776149193/CacheLineSize.c
-- for how to get the cache line size on Windows, MacOS and Linux.

-- | Create a new atomic counter padded with 64-bytes (an x86 cache line) to try
-- to avoid false sharing.
newCounter :: Int -> IO AtomicCounter
newCounter (I# n) = IO $ \s ->
case newAlignedPinnedByteArray# size alignment s of
(# s', arr #) -> case writeIntArray# arr 0# n s' of
s'' -> (# s'', AtomicCounter arr #)
where
!(I# size) = finiteBitSize (0 :: Int)
!(I# alignment) = sIZEOF_CACHELINE
{-# INLINE newCounter #-}

incrCounter :: Int -> AtomicCounter -> IO Int
incrCounter (I# incr) (AtomicCounter arr) = IO $ \s ->
case fetchAddIntArray# arr 0# incr s of
(# s', i #) -> (# s', I# (i +# incr) #)
{-# INLINE incrCounter #-}

getAndIncrCounter :: Int -> AtomicCounter -> IO Int
getAndIncrCounter (I# incr) (AtomicCounter arr) = IO $ \s ->
case fetchAddIntArray# arr 0# incr s of
(# s', i #) -> (# s', I# i #)
{-# INLINE getAndIncrCounter #-}

incrCounter_ :: Int -> AtomicCounter -> IO ()
incrCounter_ (I# incr) (AtomicCounter arr) = IO $ \s ->
case fetchAddIntArray# arr 0# incr s of
(# s', _i #) -> (# s', () #)
{-# INLINE incrCounter_ #-}

decrCounter :: Int -> AtomicCounter -> IO Int
decrCounter (I# decr) (AtomicCounter arr) = IO $ \s ->
case fetchSubIntArray# arr 0# decr s of
(# s', i #) -> (# s', I# (i -# decr) #)
{-# INLINE decrCounter #-}

decrCounter_ :: Int -> AtomicCounter -> IO ()
decrCounter_ (I# decr) (AtomicCounter arr) = IO $ \s ->
case fetchSubIntArray# arr 0# decr s of
(# s', _i #) -> (# s', () #)
{-# INLINE decrCounter_ #-}

readCounter :: AtomicCounter -> IO Int
readCounter (AtomicCounter arr) = IO $ \s ->
case readIntArray# arr 0# s of
(# s', i #) -> (# s', I# i #)
{-# INLINE readCounter #-}

casCounter :: AtomicCounter -> Int -> Int -> IO Bool
casCounter (AtomicCounter arr) (I# old) (I# new) = IO $ \s ->
case casIntArray# arr 0# old new s of
(# s', before #) -> case before ==# old of
1# -> (# s', True #)
0# -> (# s', False #)
{-# INLINE casCounter #-}

0 comments on commit 315b40c

Please sign in to comment.