Skip to content

Commit

Permalink
refactor(runtime): use atomic counter instead of ioref sequence number
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-stevan-andjelkovic committed Oct 28, 2021
1 parent 38932e9 commit 573487b
Show file tree
Hide file tree
Showing 12 changed files with 145 additions and 110 deletions.
8 changes: 4 additions & 4 deletions src/runtime-prototype/bench/disruptor/MP.hs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ import Disruptor.MP.Consumer
import Disruptor.MP.Producer
import Disruptor.MP.RingBuffer
import Disruptor.SequenceNumber
import StuntDouble.AtomicCounterPadded
import Disruptor.AtomicCounterPadded
import StuntDouble.Histogram.SingleProducer

------------------------------------------------------------------------

iTERATIONS :: Int64
iTERATIONS = 10_000_000
iTERATIONS = 100_000_000

main :: IO ()
main = do
Expand All @@ -48,8 +48,8 @@ once = do
blocking n = do
snr <- next rb
-- {-# SCC "transactions+1" #-} incrCounter_ 1 transactions
set rb snr (1 :: Int)
publish rb snr
{-# SCC set #-} set rb snr (1 :: Int)
{-# SCC publish #-} publish rb snr
go (n - 1)

nonBlocking n = do
Expand Down
4 changes: 2 additions & 2 deletions src/runtime-prototype/bench/disruptor/MPUnagiChan.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

module Main where

import Control.Concurrent (yield)
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (wait, withAsync)
import Control.Monad (replicateM_)
import Data.Time (getCurrentTime, diffUTCTime)
Expand All @@ -21,7 +21,7 @@ main = do
(i, o) <- newChan

let producer = replicateM_ iTERATIONS (writeChan i (1 :: Int))
consumer = replicateM_ (iTERATIONS * 2) (readChan yield o)
consumer = replicateM_ (iTERATIONS * 2) (readChan (threadDelay 1) o)

performGC
start <- getCurrentTime
Expand Down
2 changes: 1 addition & 1 deletion src/runtime-prototype/bench/disruptor/SP.hs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import Disruptor.SP.Consumer
import Disruptor.SP.Producer
import Disruptor.SP.RingBuffer
import Disruptor.SequenceNumber
import StuntDouble.AtomicCounterPadded
import Disruptor.AtomicCounterPadded
import StuntDouble.Histogram.SingleProducer

------------------------------------------------------------------------
Expand Down
4 changes: 2 additions & 2 deletions src/runtime-prototype/bench/disruptor/SPUnagiChan.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

module Main where

import Control.Concurrent (yield)
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (wait, withAsync)
import Control.Monad (replicateM_)
import Data.Time (getCurrentTime, diffUTCTime)
Expand All @@ -21,7 +21,7 @@ main = do
(i, o) <- newChan

let producer = replicateM_ iTERATIONS (writeChan i (1 :: Int))
consumer = replicateM_ iTERATIONS (readChan yield o)
consumer = replicateM_ iTERATIONS (readChan (threadDelay 1) o)

performGC
start <- getCurrentTime
Expand Down
2 changes: 1 addition & 1 deletion src/runtime-prototype/bench/disruptor/SPUnboxed.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ import Data.Time
import System.Mem (performGC)
import Text.Printf

import Disruptor.AtomicCounterPadded
import Disruptor.SP.Unboxed.Consumer
import Disruptor.SP.Unboxed.Producer
import Disruptor.SP.Unboxed.RingBuffer
import Disruptor.SequenceNumber
import StuntDouble.Histogram.SingleProducer
import StuntDouble.AtomicCounterPadded

------------------------------------------------------------------------

Expand Down
2 changes: 1 addition & 1 deletion src/runtime-prototype/bench/disruptor/SingleOps.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import Data.IORef
import System.CPUTime

import StuntDouble.Histogram
import qualified StuntDouble.AtomicCounterPadded as Padded
import qualified Disruptor.AtomicCounterPadded as Padded

------------------------------------------------------------------------

Expand Down
2 changes: 1 addition & 1 deletion src/runtime-prototype/bench/disruptor/TBQueue.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import Data.Time
import Text.Printf

import StuntDouble.Histogram.SingleProducer
import StuntDouble.AtomicCounterPadded
import Disruptor.AtomicCounterPadded

------------------------------------------------------------------------

Expand Down
86 changes: 86 additions & 0 deletions src/runtime-prototype/src/Disruptor/AtomicCounterPadded.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE MagicHash #-}
{-# LANGUAGE UnboxedTuples #-}

-- Inspired by:
-- * https://github.com/jberryman/unagi-chan/blob/master/src/Data/Atomics/Counter/Fat.hs
-- * https://hackage.haskell.org/package/atomic-primops-0.8.4/docs/src/Data.Atomics.Counter.html
-- * https://hackage.haskell.org/package/unboxed-ref-0.4.0.0/docs/src/Data-STRef-Unboxed-Internal.html#STRefU
-- * https://hackage.haskell.org/package/ghc-8.10.2/docs/src/FastMutInt.html#FastMutInt

module Disruptor.AtomicCounterPadded
( AtomicCounter()
, newCounter
, incrCounter
, incrCounter_
, decrCounter
, decrCounter_
, readCounter
, casCounter
) where

import Control.Monad.Primitive (RealWorld)
import Data.Bits (finiteBitSize)
import GHC.Exts
import GHC.Types

------------------------------------------------------------------------

-- | A counter whose value is kept in the first 'Int'-sized slot (index @0#@)
-- of a mutable byte array. The constructor is not exported; values are
-- created with 'newCounter', which decides how the array is allocated.
data AtomicCounter = AtomicCounter !(MutableByteArray# RealWorld)

-- | The cache line size, hard-coded to 64 (bytes, the size of an x86 cache
-- line). 'newCounter' uses it as the alignment of the counter's byte array.
--
-- TODO: See
-- https://github.com/NickStrupat/CacheLineSize/blob/93a57c094f71a2796714f7a28d74dd8776149193/CacheLineSize.c
-- for how to get the cache line size on Windows, MacOS and Linux.
sIZEOF_CACHELINE :: Int
sIZEOF_CACHELINE = 64
{-# INLINE sIZEOF_CACHELINE #-}

-- | Create a new atomic counter holding the given initial value.
--
-- The counter lives in a pinned byte array that is both sized and aligned to
-- 'sIZEOF_CACHELINE' bytes (64, an x86 cache line) to try to avoid false
-- sharing. Only the first 'Int'-sized slot is initialised and used; the rest
-- of the array is padding.
--
-- NOTE: 'newAlignedPinnedByteArray#' takes its size in /bytes/. The size used
-- to be @finiteBitSize (0 :: Int)@, i.e. a number of /bits/, which only
-- happens to equal the cache line size on 64-bit platforms (and would
-- allocate half a cache line where 'Int' is 32 bits wide).
newCounter :: Int -> IO AtomicCounter
newCounter (I# n) = IO $ \s ->
  case newAlignedPinnedByteArray# cacheLine cacheLine s of
    (# s', arr #) -> case writeIntArray# arr 0# n s' of
      s'' -> (# s'', AtomicCounter arr #)
  where
    -- The same value serves as the allocation size and as the alignment.
    !(I# cacheLine) = sIZEOF_CACHELINE
{-# INLINE newCounter #-}

-- | Atomically add the given amount to the counter and return the resulting
-- value, computed as the value handed back by 'fetchAddIntArray#' plus the
-- amount that was added.
incrCounter :: Int -> AtomicCounter -> IO Int
incrCounter (I# delta) (AtomicCounter mba) = IO bump
  where
    bump st0 =
      case fetchAddIntArray# mba 0# delta st0 of
        (# st1, before #) -> (# st1, I# (before +# delta) #)
{-# INLINE incrCounter #-}

-- | Atomically add the given amount to the counter, discarding the value
-- handed back by 'fetchAddIntArray#'.
incrCounter_ :: Int -> AtomicCounter -> IO ()
incrCounter_ (I# delta) (AtomicCounter mba) = IO bump
  where
    bump st0 =
      case fetchAddIntArray# mba 0# delta st0 of
        (# st1, _ #) -> (# st1, () #)
{-# INLINE incrCounter_ #-}

-- | Atomically subtract the given amount from the counter and return the
-- resulting value, computed as the value handed back by 'fetchSubIntArray#'
-- minus the amount that was subtracted.
decrCounter :: Int -> AtomicCounter -> IO Int
decrCounter (I# delta) (AtomicCounter mba) = IO drop_
  where
    drop_ st0 =
      case fetchSubIntArray# mba 0# delta st0 of
        (# st1, before #) -> (# st1, I# (before -# delta) #)
{-# INLINE decrCounter #-}

-- | Atomically subtract the given amount from the counter, discarding the
-- value handed back by 'fetchSubIntArray#'.
decrCounter_ :: Int -> AtomicCounter -> IO ()
decrCounter_ (I# delta) (AtomicCounter mba) = IO drop_
  where
    drop_ st0 =
      case fetchSubIntArray# mba 0# delta st0 of
        (# st1, _ #) -> (# st1, () #)
{-# INLINE decrCounter_ #-}

-- | Read the counter's current value from slot 0 of the underlying array.
--
-- NOTE(review): this goes through 'readIntArray#' rather than an atomic read
-- primop -- confirm that is sufficient for the callers' ordering needs.
readCounter :: AtomicCounter -> IO Int
readCounter (AtomicCounter mba) = IO load
  where
    load st0 =
      case readIntArray# mba 0# st0 of
        (# st1, current #) -> (# st1, I# current #)
{-# INLINE readCounter #-}

-- | Compare-and-swap on the counter: @casCounter counter old new@ asks
-- 'casIntArray#' to replace the counter's value with @new@ provided it
-- currently equals @old@.
--
-- Returns 'True' when the value reported back by 'casIntArray#' equals @old@
-- (i.e. the swap went through) and 'False' otherwise.
casCounter :: AtomicCounter -> Int -> Int -> IO Bool
casCounter (AtomicCounter arr) (I# old) (I# new) = IO $ \s ->
  case casIntArray# arr 0# old new s of
    -- 'isTrue#' turns the unboxed comparison result into a 'Bool' without a
    -- partial match on the @1#@ / @0#@ literals.
    (# s', before #) -> (# s', isTrue# (before ==# old) #)
{-# INLINE casCounter #-}
10 changes: 3 additions & 7 deletions src/runtime-prototype/src/Disruptor/MP/Consumer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -65,23 +65,19 @@ waitFor :: SequenceNumber -> RingBuffer e -> IO SequenceNumber
waitFor consumed rb = go
where
go = do
produced <- readIORef (rbCursor rb)
produced <- getCursor rb
if consumed < produced
then return produced
else do
-- NOTE: Removing the sleep seems to cause non-termination... XXX: Why
-- though? the consumer should be running on its own thread?
yield
-- threadDelay 1
-- yield
threadDelay 1
go -- SPIN
-- ^ XXX: waitStrategy should be passed in and acted on here.
--
-- XXX: Other wait strategies could be implemented here, e.g. we could
-- try to recurse immediately here, and if there's no work after a
-- couple of tries go into a takeMTVar sleep waiting for a producer to
-- wake us up.

_getSequenceNumberRef :: SequenceBarrier e -> IORef SequenceNumber
_getSequenceNumberRef (RingBufferBarrier rb) = rbCursor rb
_getSequenceNumberRef (EventConsumerBarrier ec) = ecSequenceNumber ec
{-# INLINE waitFor #-}
63 changes: 43 additions & 20 deletions src/runtime-prototype/src/Disruptor/MP/RingBuffer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import Data.IORef
import Data.Int (Int64)
import Data.Vector.Mutable (IOVector)
import qualified Data.Vector.Mutable as Vector
import qualified Data.Vector.Unboxed.Mutable as Unboxed

import Disruptor.SequenceNumber
import Disruptor.AtomicCounterPadded

------------------------------------------------------------------------

Expand All @@ -34,7 +36,7 @@ data RingBuffer e = RingBuffer
-- | The capacity, or maximum amount of values, of the ring buffer.
rbCapacity :: {-# UNPACK #-} !Int64
-- | The cursor pointing to the head of the ring buffer.
, rbCursor :: {-# UNPACK #-} !(IORef SequenceNumber)
, rbCursor :: {-# UNPACK #-} !AtomicCounter
-- | The values of the ring buffer.
, rbEvents :: {-# UNPACK #-} !(IOVector e)
-- | References to the last consumers' sequence numbers, used in order to
Expand All @@ -45,7 +47,7 @@ data RingBuffer e = RingBuffer
-- above references.
, rbCachedGatingSequence :: {-# UNPACK #-} !(IORef SequenceNumber)
-- | Used to keep track of what has been published in the multi-producer case.
, rbAvailableBuffer :: {-# UNPACK #-} !(IOVector Int)
, rbAvailableBuffer :: {-# UNPACK #-} !(Unboxed.IOVector Int)
}

newRingBuffer :: Int -> IO (RingBuffer e)
Expand All @@ -56,12 +58,12 @@ newRingBuffer capacity
-- NOTE: The use of bitwise and (`.&.`) in `index` relies on this.
error "newRingBuffer: capacity must be a power of 2"
| otherwise = do
snr <- newIORef (-1)
snr <- newCounter (-1)
v <- Vector.new capacity
gs <- newIORef []
cgs <- newIORef (-1)
ab <- Vector.new capacity
Vector.set ab (-1)
ab <- Unboxed.new capacity
Unboxed.set ab (-1)
return (RingBuffer (fromIntegral capacity) snr v gs cgs ab)
{-# INLINABLE newRingBuffer #-}

Expand All @@ -71,7 +73,7 @@ capacity = rbCapacity
{-# INLINE capacity #-}

getCursor :: RingBuffer e -> IO SequenceNumber
getCursor rb = readIORef (rbCursor rb)
getCursor rb = fromIntegral <$> readCounter (rbCursor rb)
{-# INLINE getCursor #-}

setGatingSequences :: RingBuffer e -> [IORef SequenceNumber] -> IO ()
Expand All @@ -87,14 +89,14 @@ setCachedGatingSequence rb = writeIORef (rbCachedGatingSequence rb)
{-# INLINE setCachedGatingSequence #-}

setAvailable :: RingBuffer e -> SequenceNumber -> IO ()
setAvailable rb snr = Vector.unsafeWrite
setAvailable rb snr = Unboxed.unsafeWrite
(rbAvailableBuffer rb)
(index (capacity rb) snr)
(availabilityFlag (capacity rb) snr)
{-# INLINE setAvailable #-}

getAvailable :: RingBuffer e -> Int -> IO Int
getAvailable rb ix = Vector.unsafeRead (rbAvailableBuffer rb) ix
getAvailable rb ix = Unboxed.unsafeRead (rbAvailableBuffer rb) ix
{-# INLINE getAvailable #-}

minimumSequence :: RingBuffer e -> IO SequenceNumber
Expand Down Expand Up @@ -136,17 +138,21 @@ next rb = nextBatch rb 1
--
nextBatch :: RingBuffer e -> Int -> IO SequenceNumber
nextBatch rb n = assert (n > 0 && fromIntegral n <= capacity rb) $ do
(current, nextSequence) <- {-# SCC "atomicModifyIORef'" #-}
-- XXX: The atomic takes 60% of the time of
-- `nextBatch`... Try using `AtomicCounter` instead
-- of `IORef SequenceNumber`.
atomicModifyIORef' (rbCursor rb) $ \current ->
let
nextSequence = current + fromIntegral n
in
(nextSequence, (current, nextSequence))

let wrapPoint :: SequenceNumber
-- (current, nextSequence) <- -- {-# SCC "atomicModifyIORef'" #-}
-- -- XXX: The atomic takes 60% of the time of
-- -- `nextBatch`... Try using `AtomicCounter` instead
-- -- of `IORef SequenceNumber`.
-- atomicModifyIORef' (rbCursor rb) $ \current ->
-- let
-- nextSequence = current + fromIntegral n
-- in
-- (nextSequence, (current, nextSequence))
nextSequence <- fromIntegral <$> {-# SCC incrCounter #-} incrCounter n (rbCursor rb)

let current :: SequenceNumber
current = nextSequence - fromIntegral n

wrapPoint :: SequenceNumber
wrapPoint = nextSequence - fromIntegral (capacity rb)

cachedGatingSequence <- getCachedGatingSequence rb
Expand Down Expand Up @@ -182,6 +188,22 @@ tryNextBatch :: RingBuffer e -> Int -> IO MaybeSequenceNumber
tryNextBatch rb n = assert (n > 0) go
where
go = do
current_ <- {-# SCC "readCounter" #-} readCounter (rbCursor rb)
let current = fromIntegral current_
next_ = current_ + n
next = fromIntegral next_
b <- {-# SCC "hasCapacity" #-} hasCapacity rb n current
if not b
then return None
else do
success <- {-# SCC casCounter #-} casCounter (rbCursor rb) current_ next_
if success
then return (Some next)
else do
{-# SCC "threadDelay" #-} threadDelay 1
-- yield
go -- SPIN
{--
current <- {-# SCC "readForCas" #-} readForCAS (rbCursor rb)
let current_ = peekTicket current
next = current_ + fromIntegral n
Expand All @@ -196,6 +218,7 @@ tryNextBatch rb n = assert (n > 0) go
{-# SCC "threadDelay" #-}threadDelay 1
-- yield
go -- SPIN
-}
{-# INLINABLE tryNextBatch #-}

hasCapacity :: RingBuffer e -> Int -> SequenceNumber -> IO Bool
Expand Down Expand Up @@ -254,7 +277,7 @@ tryGet rb want = do

isAvailable :: RingBuffer e -> SequenceNumber -> IO Bool
isAvailable rb snr =
(==) <$> Vector.unsafeRead (rbAvailableBuffer rb) (index capacity snr)
(==) <$> Unboxed.unsafeRead (rbAvailableBuffer rb) (index capacity snr)
<*> pure (availabilityFlag capacity snr)
where
capacity = rbCapacity rb
Expand Down
Loading

0 comments on commit 573487b

Please sign in to comment.