Skip to content

Commit

Permalink
refactor(runtime): start using whatToDo
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-stevan-andjelkovic committed Oct 8, 2021
1 parent 91af53a commit 549d71c
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 39 deletions.
24 changes: 14 additions & 10 deletions src/runtime-prototype/src/Disruptor.hs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import System.Posix.Files
------------------------------------------------------------------------

newtype SequenceNumber = SequenceNumber { getSequenceNumber :: Word64 }
deriving (Num, Eq, Ord, Real, Enum, Integral, Show)
deriving (Num, Eq, Ord, Real, Enum, Integral, Show, Bounded)

-- * Ring-buffer

Expand Down Expand Up @@ -74,7 +74,7 @@ newEventProducer rb p backPressure s0 = do
Just snr -> do
(e, s') <- p s
write rb e snr
-- putStrLn ("wrote to srn: " ++ show (getSequenceNumber snr))
putStrLn ("wrote to srn: " ++ show (getSequenceNumber snr))
publish rb
go s'

Expand Down Expand Up @@ -107,6 +107,8 @@ remainingCapacity rb produced = do
write :: RingBuffer e -> e -> SequenceNumber -> IO ()
write rb e snr = Vector.write (rbEvents rb) (index (rbCapacity rb) snr) e

-- TODO: Non-blocking multi-producer: https://youtu.be/VBnLW9mKMh4?t=1813
-- https://groups.google.com/g/lmax-disruptor/c/UhmRuz_CL6E/m/-hVt86bHvf8J
publish :: RingBuffer e -> IO ()
publish rb = atomicModifyIORef' (rbSequenceNumber rb) (\snr -> (snr + 1, ()))

Expand Down Expand Up @@ -164,15 +166,17 @@ newEventConsumer handler rb barriers (Sleep n) = do
waitFor :: SequenceNumber -> RingBuffer e -> [SequenceBarrier e] -> IO (Maybe SequenceNumber)
waitFor snr rb [] = waitFor snr rb [RingBufferBarrier rb]
waitFor snr rb bs = do
let snrs = concatMap getSequenceNumber bs
minSrn <- minimum <$> mapM readIORef snrs
if snr < minSrn
then return (Just minSrn)
let snrs = map getSequenceNumberRef bs
minSnr <- minimum <$> mapM readIORef snrs
putStrLn ("waitFor: snr = " ++ show (getSequenceNumber snr) ++
", minSrn = " ++ show (getSequenceNumber minSnr))
if (snr == maxBound && minSnr /= maxBound) || snr < minSnr
then return (Just minSnr)
else return Nothing
where
getSequenceNumber :: SequenceBarrier e -> [IORef SequenceNumber]
getSequenceNumber (RingBufferBarrier rb) = [rbSequenceNumber rb]
getSequenceNumber (EventConsumerBarrier ec) = [ecSequenceNumber ec]
getSequenceNumberRef :: SequenceBarrier e -> IORef SequenceNumber
getSequenceNumberRef (RingBufferBarrier rb) = rbSequenceNumber rb
getSequenceNumberRef (EventConsumerBarrier ec) = ecSequenceNumber ec

main :: IO ()
main = do
Expand All @@ -187,7 +191,7 @@ main = do
ec <- newEventConsumer handler rb [] (Sleep 1000000)
setGatingSequences rb [ec]
link (ecAsync ec)
threadDelay 30000000
threadDelay 5000000
cancel (epAsync ep)
cancel (ecAsync ec)
return ()
Expand Down
87 changes: 61 additions & 26 deletions src/runtime-prototype/src/Scheduler.hs
Original file line number Diff line number Diff line change
Expand Up @@ -27,32 +27,29 @@ data SchedulerAction
| TimeoutClient (Time, SchedulerEvent) Time -- XXX: what's the second time?

-- XXX: This will need some handling of faults
whatToDo :: {- RunInfo ref -> -} SchedulerState -> SchedulerAction
whatToDo :: {- RunInfo ref -> -} SchedulerState -> (SchedulerAction, SchedulerState)
whatToDo s0 = go s0
where
-- XXX: this comes from RunInfo
clientTimeout = 20
clientDelay = 20

go :: SchedulerState -> SchedulerAction -- XXX: return the new state also?
-- Otherwise we won't get through
-- the agenda if we call whatToDo in
-- a loop.
go :: SchedulerState -> (SchedulerAction, SchedulerState)
go s =
case Agenda.pop (agenda s) of
Nothing -> Done
Nothing -> (Done, s)
Just (ev@(t, event), agenda') ->
case lookupClient (from event) s of
Nothing ->
-- XXX: check if faults apply here
Execute ev False
(Execute ev False, s { agenda = agenda' })
Just t' ->
let
now :: Time
now = time s
in
if now `afterTime` (t' `addTime` clientTimeout)
then TimeoutClient ev now
then (TimeoutClient ev now, s { agenda = agenda' })
else
-- Update time. XXX: explain why?
go (s { agenda = Agenda.push (t `addTime` clientDelay, event) agenda' })
Expand Down Expand Up @@ -80,25 +77,48 @@ fakeScheduler executorRef (ClientRequest' "CreateTest" [SInt tid] cid) = Actor $
return (InternalMessage "ok")
fakeScheduler executorRef (ClientRequest' "Start" [] cid) =
let
step :: Free (ActorF SchedulerState) ()
step = do
r <- Agenda.pop . agenda <$> get
case r of
Just ((time, e), agenda') -> do
modify $ \s -> s { agenda = agenda'
, time = time
, steps = succ (steps s)
}
p <- send executorRef (InternalMessage (prettyEvent e))
on p (\(InternalMessageR (InternalMessage' "Events" args)) -> do
-- XXX: we should generate an arrival time here using the seed.
-- XXX: with some probability duplicate the event?
let Just evs = sequence (map (fromSDatatype time) args)
evs' = filter (\e -> kind e /= "ok") (concat evs)
agenda' = Agenda.fromList (map (\e -> (at e, e)) evs')
modify $ \s -> s { agenda = agenda s `Agenda.union` agenda' }
step
)
Nothing -> do
sa <- modifys whatToDo
case sa of
Execute (t, ev) dropped
| dropped -> do
undefined -- XXX
-- let now = Agenda.theTime ae
-- Time.advanceTime timeC now Time.BumpLogical
-- lnow <- Time.currentLogicalClock timeC
-- emitEvent traceC clientC testId runId dropped lnow ae
-- step
| otherwise -> do
modify $ \s -> s { time = t
, steps = succ (steps s)
}

p <- send executorRef (InternalMessage (prettyEvent ev))
on p (\(InternalMessageR (InternalMessage' "Events" args)) -> do
-- XXX: we should generate an arrival time here using the seed.
-- XXX: with some probability duplicate the event?
let Just evs = sequence (map (fromSDatatype t) args)
evs' = filter (\e -> kind e /= "ok") (concat evs)
agenda' = Agenda.fromList (map (\e -> (at e, e)) evs')
modify $ \s -> s { agenda = agenda s `Agenda.union` agenda' }
step
)
-- if client request we need to add it to state
-- let now = Agenda.theTime ae
-- Time.advanceTime timeC now Time.BumpLogical
-- currentLogicalTime <- Time.currentLogicalClock timeC
-- emitEvent traceC clientC testId runId dropped currentLogicalTime ae
-- let ref = senderRef runInfo (to $ Agenda.theEvent ae)
-- let ie = toInEvent testId runId currentLogicalTime ae
-- events <- case Executor.kind ie of
-- Executor.KEInternalMessage -> Executor.execute executorC ref ie
-- Executor.KEClient -> Executor.execute executorC ref ie
-- Executor.KETimer -> Executor.timer executorC ref ie
-- (cr, entries) <- partitionOutEvent clientC currentLogicalTime events
-- resolveClientResponses testId runId runInfo timeC cr
-- scheduleEvents randomC agendaC timeC entries
Done -> do
-- The format looks at follows:
-- LogSend _from (InternalMessage "{\"event\":\"write\",\"args\":{\"value\":1},\"at\":\"1970-01-01T00:00:00Z\",\"kind\":\"invoke\",\"to\":\"frontend\",\"from\":\"client:0\",\"meta\":null}") _to
-- For network_trace we need:
Expand Down Expand Up @@ -149,6 +169,21 @@ fakeScheduler executorRef (ClientRequest' "Start" [] cid) =
",\"run_id\":" ++ show (runId s) ++
",\"event_log\":" ++ show l ++
"}"))
TimeoutClient ae now -> do
-- Time.advanceTime timeC now Time.BumpLogical -- should this really bump logical?
-- lnow <- Time.currentLogicalClock timeC
-- emitTimeout traceC clientC testId runId False lnow ae
-- go
undefined
{-
Tick now -> do
Time.advanceTime timeC now Time.KeepLogical
events <- forM (allRefs runInfo) $ \ ref -> Executor.tick executorC ref now
let (cr, entries) = partitionOutEvent (sort $ concat events)
resolveClientResponses testId runId runInfo timeC cr
scheduleEvents randomC agendaC timeC entries
go
-}
in
Actor $ do
step
Expand Down
3 changes: 2 additions & 1 deletion src/runtime-prototype/src/StuntDouble.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,17 @@ import StuntDouble.AdminTransport as X
import StuntDouble.AdminTransport.NamedPipe as X
import StuntDouble.Codec as X
import StuntDouble.Envelope as X
import StuntDouble.FreeMonad as X
import StuntDouble.Frontend.Http as X
import StuntDouble.Histogram as X
import StuntDouble.IO as X
import StuntDouble.Log as X
import StuntDouble.LogicalTime as X
import StuntDouble.Message as X
import StuntDouble.Metrics as X
import StuntDouble.Random as X
import StuntDouble.Reference as X
import StuntDouble.Time as X
import StuntDouble.LogicalTime as X
import StuntDouble.Transport as X
import StuntDouble.Transport.Http as X
import StuntDouble.Transport.HttpSync as X
Expand Down
4 changes: 2 additions & 2 deletions src/runtime-prototype/src/StuntDouble/ActorMap.hs
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,10 @@ put s' = Free (Put s' return)
modify :: Typeable s => (s -> s) -> Free (ActorF s) ()
modify f = put . f =<< get

modifys :: Typeable s => (s -> (s, a)) -> Free (ActorF s) a
modifys :: Typeable s => (s -> (a, s)) -> Free (ActorF s) a
modifys f = do
s <- get
let (s', x) = f s
let (x, s') = f s
put s'
return x

Expand Down

0 comments on commit 549d71c

Please sign in to comment.