Skip to content


test(runtime): add test for two producers and one consumer (still fails)
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-stevan-andjelkovic committed Oct 14, 2021
1 parent 92fd66e commit ed16243
Show file tree
Hide file tree
Showing 2 changed files with 225 additions and 124 deletions.
230 changes: 122 additions & 108 deletions src/runtime-prototype/src/Disruptor.hs
Original file line number Diff line number Diff line change
@@ -1,20 +1,27 @@
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}

module Disruptor where

import Data.Atomics (casIORef, readForCAS, peekTicket)
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (Async, withAsync, link)
import Control.Concurrent.STM
(TMVar, atomically, isEmptyTMVar, newEmptyTMVarIO, tryPutTMVar)
import Control.Monad (foldM, void, when)
import Data.Atomics (casIORef, peekTicket, readForCAS)
import Data.Bits
import Control.Concurrent
import Control.Concurrent.Async
import Control.Exception
import Control.Monad
(countLeadingZeros, finiteBitSize, popCount, shiftR, (.&.))
import Data.IORef
import Data.Int
( IORef
, atomicModifyIORef'
, atomicWriteIORef
, newIORef
, readIORef
, writeIORef
import Data.Int (Int64)
import Data.Vector.Mutable (IOVector)
import qualified Data.Vector.Mutable as Vector
import System.IO
import System.IO.Error
import System.Posix.Files


Expand All @@ -34,14 +41,10 @@ data RingBuffer e = RingBuffer
, rbCapacity :: Int64
, rbSequenceNumber :: IORef SequenceNumber
, rbEvents :: IOVector e
, rbGatingSequences :: IORef [IORef SequenceNumber] -- ^ References to the
-- last consumers'
-- sequence numbers, used
-- in order to avoid
-- wrapping the buffer and
-- overwriting events that
-- have not been consumed
-- yet.
-- | References to the last consumers' sequence numbers, used in order to
-- avoid wrapping the buffer and overwriting events that have not been
-- consumed yet.
, rbGatingSequences :: IORef [IORef SequenceNumber]
, rbCachedGatingSequence :: IORef SequenceNumber
, rbAvailableBuffer :: IOVector Int
Expand Down Expand Up @@ -73,9 +76,13 @@ getCursor rb = readIORef (rbSequenceNumber rb)
claim :: RingBuffer e -> SequenceNumber -> IO ()
claim rb = atomicWriteIORef (rbSequenceNumber rb)

setGatingSequences :: RingBuffer e -> [EventConsumer] -> IO ()
data Exists f = forall x. Exists (f x)

setGatingSequences :: RingBuffer e -> [Exists EventConsumer] -> IO ()
setGatingSequences rb eps =
atomicWriteIORef (rbGatingSequences rb) (map ecSequenceNumber eps)
atomicWriteIORef (rbGatingSequences rb) (map go eps)
go (Exists ec) = ecSequenceNumber ec

getCachedGatingSequence :: RingBuffer e -> IO SequenceNumber
getCachedGatingSequence rb = readIORef (rbCachedGatingSequence rb)
Expand Down Expand Up @@ -110,28 +117,42 @@ logBase2 w = finiteBitSize w - 1 - countLeadingZeros w

-- * Event producers

data EventProducer = EventProducer
{ epAsync :: Async ()
data EventProducer s = EventProducer
{ epWorker :: s -> IO s
, epInitialState :: s
, epShutdown :: Shutdown

newEventProducer :: RingBuffer e -> (s -> IO (e, s)) -> (s -> IO ()) -> s -> IO EventProducer
newEventProducer :: RingBuffer e -> (s -> IO (e, s)) -> (s -> IO ()) -> s
-> IO (EventProducer s)
newEventProducer rb p backPressure s0 = do
a <- async (go s0)
return (EventProducer a)
go s = do
mSnr <- tryNext rb
case mSnr of
Nothing -> do
putStrLn "producer: consumer is too slow"
backPressure s
go s
Just snr -> do
(e, s') <- p s
set rb snr e
putStrLn ("wrote to srn: " ++ show (getSequenceNumber snr))
publish rb snr
go s'
shutdownVar <- newShutdownVar
let go s = do
mSnr <- tryNext rb
case mSnr of
Nothing -> do
putStrLn "producer: consumer is too slow"
backPressure s
halt <- isItTimeToShutdown shutdownVar
if halt
then return s
else go s
Just snr -> do
(e, s') <- p s
set rb snr e
putStrLn ("wrote to srn: " ++ show (getSequenceNumber snr))
publish rb snr
halt <- isItTimeToShutdown shutdownVar
if halt
then return s'
else go s'

return (EventProducer go s0 shutdownVar)

withEventProducer :: EventProducer s -> (Async s -> IO a) -> IO a
withEventProducer ep k = withAsync (epWorker ep (epInitialState ep)) $ \a -> do
link a
k a

-- | Claim the next event in sequence for publishing.
next :: RingBuffer e -> IO SequenceNumber
Expand Down Expand Up @@ -295,52 +316,81 @@ get rb snr = do

-- * Event consumers

data EventConsumer = EventConsumer
data EventConsumer s = EventConsumer
{ ecSequenceNumber :: IORef SequenceNumber
, ecAsync :: Async ()
, ecWorker :: s -> IO s
, ecInitialState :: s
, ecShutdown :: Shutdown

-- NOTE: The `SequenceNumber` can be used for sharding, e.g. one handler handles
-- even and another handles odd numbers.
type EventHandler e = e -> SequenceNumber -> EndOfBatch -> IO ()
type EventHandler s e = s -> e -> SequenceNumber -> EndOfBatch -> IO s
type EndOfBatch = Bool

data SequenceBarrier e
= RingBufferBarrier (RingBuffer e)
| EventConsumerBarrier EventConsumer
| forall s. EventConsumerBarrier (EventConsumer s)

data WaitStrategy = Sleep Int

newEventConsumer :: EventHandler e -> RingBuffer e -> [SequenceBarrier e] -> WaitStrategy
-> IO EventConsumer
newEventConsumer handler rb barriers (Sleep n) = do
putStrLn "starting consumer"
withEventConsumer :: EventConsumer s -> (Async s -> IO a) -> IO a
withEventConsumer ec k = withAsync (ecWorker ec (ecInitialState ec)) $ \a -> do
link a
k a
-- ^ XXX: Pin to a specific CPU core with `withAsyncOn`?

newtype Shutdown = Shutdown (TMVar ())

newShutdownVar :: IO Shutdown
newShutdownVar = Shutdown <$> newEmptyTMVarIO

tellToShutdown :: Shutdown -> IO ()
tellToShutdown (Shutdown tmvar) = void (atomically (tryPutTMVar tmvar ()))

shutdownProducer :: EventProducer s -> IO ()
shutdownProducer = tellToShutdown . epShutdown

shutdownConsumer :: EventConsumer s -> IO ()
shutdownConsumer = tellToShutdown . ecShutdown

isItTimeToShutdown :: Shutdown -> IO Bool
isItTimeToShutdown (Shutdown tmvar) = not <$> atomically (isEmptyTMVar tmvar)

newEventConsumer :: RingBuffer e -> EventHandler s e -> s -> [SequenceBarrier e]
-> WaitStrategy -> IO (EventConsumer s)
newEventConsumer rb handler s0 barriers (Sleep n) = do
snrRef <- newIORef (-1)
a <- async (go snrRef) -- XXX: Pin to a specific CPU core with `asyncOn`?
return (EventConsumer snrRef a)
go snrRef = do
mySnr <- readIORef snrRef
mbSnr <- waitFor mySnr rb barriers
case mbSnr of
Nothing -> do
-- XXX: Other wait strategies could be implemented here, e.g. we could
-- try to recurse immediately here, and if there's no work after a
-- couple of tries go into a takeMTVar sleep waiting for a producer to
-- wake us up.
threadDelay n
putStrLn ("nothing to do, mySrn = " ++ show (getSequenceNumber mySnr))
-- XXX: Maybe we want to check if a shutdown variable has been set before looping?
go snrRef
Just bSnr -> do
putStrLn ("something to do, mySrn = " ++ show (getSequenceNumber mySnr) ++
", bSnr = " ++ show (getSequenceNumber bSnr))
-- XXX: what if handler throws exception?
mapM_ (\snr -> unsafeGet rb snr >>= \e ->
handler e snr (snr == bSnr)) [mySnr + 1..bSnr]
writeIORef snrRef bSnr
threadDelay 1000000
go snrRef
shutdownVar <- newShutdownVar

let go s = do
mySnr <- readIORef snrRef
mbSnr <- waitFor mySnr rb barriers
case mbSnr of
Nothing -> do
-- XXX: Other wait strategies could be implemented here, e.g. we could
-- try to recurse immediately here, and if there's no work after a
-- couple of tries go into a takeMTVar sleep waiting for a producer to
-- wake us up.
threadDelay n
putStrLn ("nothing to do, mySrn = " ++ show (getSequenceNumber mySnr))
halt <- isItTimeToShutdown shutdownVar
if halt
then return s
else go s
Just bSnr -> do
putStrLn ("something to do, mySrn = " ++ show (getSequenceNumber mySnr) ++
", bSnr = " ++ show (getSequenceNumber bSnr))
-- XXX: what if handler throws exception?
s' <- foldM (\ih snr -> unsafeGet rb snr >>= \e ->
handler ih e snr (snr == bSnr)) s [mySnr + 1..bSnr]
writeIORef snrRef bSnr
halt <- isItTimeToShutdown shutdownVar
if halt
then return s'
else go s'

return (EventConsumer snrRef go s0 shutdownVar)

waitFor :: SequenceNumber -> RingBuffer e -> [SequenceBarrier e] -> IO (Maybe SequenceNumber)
waitFor snr rb [] = waitFor snr rb [RingBufferBarrier rb]
Expand All @@ -349,46 +399,10 @@ waitFor snr rb bs = do
minSnr <- minimum <$> mapM readIORef snrs
putStrLn ("waitFor: snr = " ++ show (getSequenceNumber snr) ++
", minSrn = " ++ show (getSequenceNumber minSnr))
if (snr == maxBound && minSnr /= maxBound) || snr < minSnr
if snr < minSnr
then return (Just minSnr)
else return Nothing
getSequenceNumberRef :: SequenceBarrier e -> IORef SequenceNumber
getSequenceNumberRef (RingBufferBarrier rb) = rbSequenceNumber rb
getSequenceNumberRef (EventConsumerBarrier ec) = ecSequenceNumber ec

main :: IO ()
main = do
rb <- newRingBuffer SingleProducer 128
let pipe = "/tmp/producer-pipe"
safeCreateNamedPipe pipe
h <- openFile pipe ReadWriteMode
hSetBuffering h LineBuffering
let backPressure = const (threadDelay 1000000)
ep <- newEventProducer rb producer backPressure h
link (epAsync ep)
ec <- newEventConsumer handler rb [] (Sleep 1000000)
setGatingSequences rb [ec]
link (ecAsync ec)
threadDelay 5000000
cancel (epAsync ep)
cancel (ecAsync ec)
return ()
handler str snr eob = putStrLn (show (getSequenceNumber snr) ++ ": " ++ str ++
if eob then ";" else "")
producer h = do
l <- hGetLine h
return (l, h)

safeCreateNamedPipe :: FilePath -> IO ()
safeCreateNamedPipe fp =
(\e -> if isAlreadyExistsErrorType (ioeGetErrorType e)
then Just ()
else Nothing)
(createNamedPipe fp
(namedPipeMode `unionFileModes`
ownerReadMode `unionFileModes`

0 comments on commit ed16243

Please sign in to comment.