Skip to content

Commit

Permalink
feat(runtime): add unboxed ring buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-stevan-andjelkovic committed Oct 25, 2021
1 parent 4bd4f6d commit ab19884
Show file tree
Hide file tree
Showing 6 changed files with 402 additions and 4 deletions.
77 changes: 77 additions & 0 deletions src/runtime-prototype/bench/disruptor/SPUnboxed.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
{-# LANGUAGE NumericUnderscores #-}

module Main where

import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.MVar
import Control.Monad
import Data.Atomics.Counter
import Data.IORef
import Data.Int
import Data.Time
import Text.Printf

import Disruptor.ConsumerUnboxed
import Disruptor.Producer
import Disruptor.RingBuffer.SingleProducerUnboxed
import Disruptor.SequenceNumber
import StuntDouble.Histogram.SingleProducer

------------------------------------------------------------------------

iTERATIONS :: Int64
iTERATIONS = 70000

main :: IO ()
main = do
n <- getNumCapabilities
printf "%-25.25s%10d\n" "CPU capabilities" n
let ringBufferCapacity = 1024 * 64
rb <- newRingBuffer ringBufferCapacity
-- histo <- newHistogram
-- transactions <- newCounter 0
consumerFinished <- newEmptyMVar

let ep = EventProducer (const (go iTERATIONS)) ()
where
go :: Int64 -> IO ()
go 0 = return ()
go n = do
mSnr <- tryNext rb
case mSnr of
Some snr -> do
-- {-# SCC "transactions+1" #-} incrCounter_ 1 transactions
set rb snr (1 :: Int)
publish rb snr
go (n - 1)
None -> go n

let handler _s _n snr endOfBatch = do
-- t' <- {-# SCC "transactions-1" #-} incrCounter (-1) transactions
-- measureInt_ t' histo
when (endOfBatch && getSequenceNumber snr == iTERATIONS - 1) $
putMVar consumerFinished ()
return ()

ec <- newEventConsumer rb handler () [] (Sleep 1)
setGatingSequences rb [ecSequenceNumber ec]

start <- getCurrentTime
withEventProducer ep $ \aep ->
withEventConsumer ec $ \aec -> do
() <- takeMVar consumerFinished
end <- getCurrentTime
cancel aep
cancel aec
end <- getCurrentTime
printf "%-25.25s%10d\n" "Total number of events" iTERATIONS
printf "%-25.25s%10.2f s\n" "Duration" (realToFrac (diffUTCTime end start) :: Double)
let throughput :: Double
throughput = realToFrac iTERATIONS / realToFrac (diffUTCTime end start)
printf "%-25.25s%10.2f events/s\n" "Throughput" throughput
-- meanTransactions <- hmean histo
-- printf "%-25.25s%10.2f\n" "Mean concurrent txs" meanTransactions
-- Just maxTransactions <- percentile 100.0 histo
-- printf "%-25.25s%10.2f\n" "Max concurrent txs" maxTransactions
-- printf "%-25.25s%10.2f ns\n" "Latency" ((meanTransactions / throughput) * 1000000)
1 change: 0 additions & 1 deletion src/runtime-prototype/cabal.project
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ reject-unconstrained-dependencies: all
constraints: QuickCheck +old-random

package stunt-double
ghc-options: -Wall

allow-older: *
allow-newer: *
89 changes: 89 additions & 0 deletions src/runtime-prototype/src/Disruptor/ConsumerUnboxed.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
{-# LANGUAGE ExistentialQuantification #-} -- XXX

module Disruptor.ConsumerUnboxed where

import Control.Concurrent.Async
import Control.Concurrent
import Control.Concurrent.STM -- XXX
import Data.IORef.Unboxed
import Data.Vector.Unboxed (Unbox)

import Disruptor.SequenceNumber
import Disruptor.RingBuffer.SingleProducerUnboxed

------------------------------------------------------------------------

data EventConsumer s = EventConsumer
{ ecSequenceNumber :: {-# UNPACK #-} !(IORefU SequenceNumber)
, ecWorker :: s -> IO s
, ecInitialState :: s
}

-- NOTE: The `SequenceNumber` can be used for sharding, e.g. one handler handles
-- even and another handles odd numbers.
type EventHandler s e = s -> e -> SequenceNumber -> EndOfBatch -> IO s
type EndOfBatch = Bool

data SequenceBarrier e
= RingBufferBarrier (RingBuffer e)
| forall s. EventConsumerBarrier (EventConsumer s)

data WaitStrategy = Sleep Int

withEventConsumer :: EventConsumer s -> (Async s -> IO a) -> IO a
withEventConsumer ec k = withAsync (ecWorker ec (ecInitialState ec)) $ \a -> do
link a
k a

withEventConsumerOn :: Int -> EventConsumer s -> (Async s -> IO a) -> IO a
withEventConsumerOn capability ec k =
withAsyncOn capability (ecWorker ec (ecInitialState ec)) $ \a -> do
link a
k a

newEventConsumer :: Unbox e => RingBuffer e -> EventHandler s e -> s -> [SequenceBarrier e]
-> WaitStrategy -> IO (EventConsumer s)
newEventConsumer rb handler s0 barriers (Sleep n) = do
snrRef <- newIORefU (-1)

let go s = {-# SCC go #-} do
mySnr <- readIORefU snrRef
mbSnr <- waitFor mySnr rb barriers
case mbSnr of
Nothing -> do
-- XXX: Other wait strategies could be implemented here, e.g. we could
-- try to recurse immediately here, and if there's no work after a
-- couple of tries go into a takeMTVar sleep waiting for a producer to
-- wake us up.
threadDelay n
go s -- SPIN
Just bSnr -> do
-- XXX: what if handler throws exception? https://youtu.be/eTeWxZvlCZ8?t=2271
s' <- {-# SCC go' #-} go' (mySnr + 1) bSnr s
writeIORefU snrRef bSnr
go s'
where
go' lo hi s | lo > hi = return s
| lo <= hi = do
e <- unsafeGet rb lo
s' <- {-# SCC handler #-} handler s e lo (lo == hi)
go' (lo + 1) hi s'

return (EventConsumer snrRef go s0)

waitFor :: SequenceNumber -> RingBuffer e -> [SequenceBarrier e] -> IO (Maybe SequenceNumber)
waitFor snr rb [] = do
minSnr <- readIORefU (rbCursor rb)
if snr < minSnr
then return (Just minSnr)
else return Nothing
waitFor snr rb bs = do
minSnr <- minimum <$> mapM readIORefU (map getSequenceNumberRef bs)
if snr < minSnr
then return (Just minSnr)
else return Nothing
where
getSequenceNumberRef :: SequenceBarrier e -> IORefU SequenceNumber
getSequenceNumberRef (RingBufferBarrier rb) = rbCursor rb
getSequenceNumberRef (EventConsumerBarrier ec) = ecSequenceNumber ec
{-# INLINE waitFor #-}
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
module Disruptor.RingBuffer.SingleProducerUnboxed where

import Control.Exception (assert)
import Control.Monad (when)
import Data.Bits (popCount)
import Data.IORef (IORef, newIORef, readIORef, writeIORef)
import Data.IORef.Unboxed (IORefU, newIORefU, readIORefU, writeIORefU)
import Data.Int (Int64)
import qualified Data.Vector as ImmutableVector
import qualified Data.Vector.Mutable as Boxed
import Data.Vector.Unboxed.Mutable (IOVector, Unbox)
import qualified Data.Vector.Unboxed.Mutable as Vector

import Disruptor.SequenceNumber

------------------------------------------------------------------------

data RingBuffer e = RingBuffer
{
-- | The capacity, or maximum amount of values, of the ring buffer.
rbCapacity :: {-# UNPACK #-} !Int64
-- | The cursor pointing to the head of the ring buffer.
, rbCursor :: {-# UNPACK #-} !(IORefU SequenceNumber)
-- | The values of the ring buffer.
, rbEvents :: !(IOVector e)
-- | References to the last consumers' sequence numbers, used in order to
-- avoid wrapping the buffer and overwriting events that have not been
-- consumed yet.
, rbGatingSequences :: {-# UNPACK #-} !(IORef (Boxed.IOVector (IORefU SequenceNumber)))
-- | Cached value of computing the last consumers' sequence numbers using the
-- above references.
, rbCachedGatingSequence :: {-# UNPACK #-} !(IORefU SequenceNumber)
}

newRingBuffer :: Unbox e => Int -> IO (RingBuffer e)
newRingBuffer capacity
| capacity <= 0 =
error "newRingBuffer: capacity must be greater than 0"
| popCount capacity /= 1 =
-- NOTE: The use of bitwise and (`.&.`) in `index` relies on this.
error "newRingBuffer: capacity must be a power of 2"
| otherwise = do
snr <- newIORefU (-1)
v <- Vector.new capacity
gs <- newIORef =<< Boxed.new 0
cgs <- newIORefU (-1)
return (RingBuffer (fromIntegral capacity) snr v gs cgs)

-- | The capacity, or maximum amount of values, of the ring buffer.
capacity :: RingBuffer e -> Int64
capacity = rbCapacity
{-# INLINE capacity #-}

getCursor :: RingBuffer e -> IO SequenceNumber
getCursor rb = readIORefU (rbCursor rb)
{-# INLINE getCursor #-}

setGatingSequences :: RingBuffer e -> [IORefU SequenceNumber] -> IO ()
setGatingSequences rb gs = do
v <- ImmutableVector.thaw (ImmutableVector.fromList gs)
writeIORef (rbGatingSequences rb) v
{-# INLINE setGatingSequences #-}

getCachedGatingSequence :: RingBuffer e -> IO SequenceNumber
getCachedGatingSequence rb = readIORefU (rbCachedGatingSequence rb)
{-# INLINE getCachedGatingSequence #-}

setCachedGatingSequence :: RingBuffer e -> SequenceNumber -> IO ()
setCachedGatingSequence rb = writeIORefU (rbCachedGatingSequence rb)
{-# INLINE setCachedGatingSequence #-}

minimumSequence :: RingBuffer e -> IO SequenceNumber
minimumSequence rb = do
cursorValue <- getCursor rb
minimumSequence' (rbGatingSequences rb) cursorValue
{-# INLINE minimumSequence #-}

minimumSequence' :: IORef (Boxed.IOVector (IORefU SequenceNumber)) -> SequenceNumber
-> IO SequenceNumber
minimumSequence' gatingSequences cursorValue = do
gs <- readIORef gatingSequences
go gs
where
go :: Boxed.IOVector (IORefU SequenceNumber) -> IO SequenceNumber
go gs = go' 0 cursorValue
where
len :: Int
len = Boxed.length gs - 1

go' :: Int -> SequenceNumber -> IO SequenceNumber
go' ix minSequence | ix > len = return minSequence
| ix <= len = do
g <- readIORefU =<< Boxed.read gs ix
if g < minSequence
then go' (ix + 1) g
else go' (ix + 1) minSequence
{-# INLINE minimumSequence' #-}

-- | Currently available slots to write to.
size :: RingBuffer e -> IO Int64
size rb = do
consumed <- minimumSequence rb
produced <- getCursor rb
return (capacity rb - fromIntegral (produced - consumed))
{-# INLINE size #-}

-- | Claim the next event in sequence for publishing.
next :: RingBuffer e -> IO SequenceNumber
next rb = nextBatch rb 1
{-# INLINE next #-}

-- | Claim the next `n` events in sequence for publishing. This is for batch
-- event producing. Returns the highest claimed sequence number, so using it
-- requires a bit of extra work, e.g.:
--
-- @
-- let n = 10
-- hi <- nextBatch rb n
-- let lo = hi - (n - 1)
-- mapM_ f [lo..hi]
-- publishBatch rb lo hi
-- @
--
nextBatch :: RingBuffer e -> Int -> IO SequenceNumber
nextBatch rb n = assert (n > 0 && fromIntegral n <= capacity rb) $ do
current <- getCursor rb
let nextSequence :: SequenceNumber
nextSequence = current + fromIntegral n

wrapPoint :: SequenceNumber
wrapPoint = nextSequence - fromIntegral (capacity rb)

writeIORefU (rbCursor rb) nextSequence
cachedGatingSequence <- getCachedGatingSequence rb

when (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) $
waitForConsumers wrapPoint

return nextSequence
where
waitForConsumers :: SequenceNumber -> IO ()
waitForConsumers wrapPoint = go
where
go :: IO ()
go = do
gatingSequence <- minimumSequence rb
if wrapPoint > gatingSequence
then go
else setCachedGatingSequence rb gatingSequence
{-# INLINE nextBatch #-}

-- Try to return the next sequence number to write to. If `Nothing` is returned,
-- then the last consumer has not yet processed the event we are about to
-- overwrite (due to the ring buffer wrapping around) -- the callee of `tryNext`
-- should apply back-pressure upstream if this happens.
tryNext :: RingBuffer e -> IO MaybeSequenceNumber
tryNext rb = tryNextBatch rb 1
{-# INLINE tryNext #-}

data MaybeSequenceNumber = None | Some {-# UNPACK #-} !SequenceNumber

tryNextBatch :: RingBuffer e -> Int -> IO MaybeSequenceNumber
tryNextBatch rb n = assert (n > 0) $ do
current <- getCursor rb
let next = current + fromIntegral n
wrapPoint = next - fromIntegral (capacity rb)
cachedGatingSequence <- getCachedGatingSequence rb
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
then do
minSequence <- minimumSequence' (rbGatingSequences rb) current
setCachedGatingSequence rb minSequence
if (wrapPoint > minSequence)
then return None
else return (Some next)
else return (Some next)
{-# INLINE tryNextBatch #-}

set :: Unbox e => RingBuffer e -> SequenceNumber -> e -> IO ()
set rb snr e = Vector.write (rbEvents rb) (index (rbCapacity rb) snr) e
{-# INLINE set #-}

publish :: RingBuffer e -> SequenceNumber -> IO ()
publish rb = writeIORefU (rbCursor rb)
{-# INLINE publish #-}

publishBatch :: RingBuffer e -> SequenceNumber -> SequenceNumber -> IO ()
publishBatch rb _lo hi = writeIORefU (rbCursor rb) hi
{-# INLINE publishBatch #-}

unsafeGet :: Unbox e => RingBuffer e -> SequenceNumber -> IO e
unsafeGet rb current = Vector.read (rbEvents rb) (index (capacity rb) current)
{-# INLINE unsafeGet #-}
5 changes: 4 additions & 1 deletion src/runtime-prototype/src/Disruptor/SequenceNumber.hs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE UnboxedTuples #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}

module Disruptor.SequenceNumber where

import Data.Bits (countLeadingZeros, finiteBitSize, shiftR, (.&.))
import Data.Int (Int64)
import Data.Primitive.Types (Prim)

------------------------------------------------------------------------

newtype SequenceNumber = SequenceNumber { getSequenceNumber :: Int64 }
deriving newtype (Num, Eq, Ord, Real, Enum, Integral, Show, Bounded)
deriving newtype (Num, Eq, Ord, Real, Enum, Integral, Show, Bounded, Prim)
-- ^ NOTE: `(maxBound :: Int64) == 9223372036854775807` so if we write 10M events
-- per second (`10_000_000*60*60*24*365 == 315360000000000) then it would take
-- us `9223372036854775807 / 315360000000000 == 29247.1208677536` years before
Expand Down
Loading

0 comments on commit ab19884

Please sign in to comment.