Skip to content

Commit

Permalink
refactor(runtime): finish handle events
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-stevan-andjelkovic committed May 25, 2021
1 parent b753993 commit c428f84
Showing 1 changed file with 65 additions and 43 deletions.
108 changes: 65 additions & 43 deletions src/runtime-prototype/src/StuntDouble/ActorMap.hs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ data ActorF x
= Invoke LocalRef Message (Message -> x)
| Send RemoteRef Message (Promise -> x)
| AsyncIO (IO IOResult) (Promise -> x)
| On Promise (Either IOResult Message -> x) (() -> x)
| On Promise (Either IOResult Message -> Free ActorF ()) (() -> x)
| Get (State -> x)
| Put State (() -> x)
deriving instance Functor ActorF
Expand Down Expand Up @@ -69,7 +69,7 @@ actorMapSpawn a s (ActorMap m) =
data Action
= SendAction LocalRef Message RemoteRef Promise
| AsyncIOAction (IO IOResult) Promise
| OnAction Promise (Either IOResult Message -> Actor) LocalRef
| OnAction Promise (Either IOResult Message -> Free ActorF ()) LocalRef

-- XXX: what about exceptions? transactional in state, but also in actions?!
actorMapTurn :: LocalRef -> Message -> ActorMap -> ((Message, ActorMap, [Action]), ActorMap)
Expand All @@ -79,34 +79,36 @@ actorMapTurn lref0 msg0 am0 =
in
-- XXX: Promises should not always start from 0, or they will overlap each
-- other if more than one turn happens...
(go 0 [] lref0 (unActor (a msg0)) am0, am0)
where
go _pc acc _lref (Pure msg) am = (msg, am, reverse acc)
go pc acc lref (Free op) am = case op of
Invoke lref' msg k ->
let
a' = fst (actorMapUnsafeLookup lref' am)
(reply, am', acc') = go pc acc lref' (unActor (a' msg)) am
in
go pc acc' lref (k reply) am'
Send rref msg k ->
let
p = Promise pc
in
go (pc + 1) (SendAction lref msg rref p : acc) lref (k p) am
AsyncIO io k ->
let
p = Promise pc
in
go (pc + 1) (AsyncIOAction io p : acc) lref (k p) am
On p c k ->
go pc (OnAction p (Actor . c) lref : acc) lref (k ()) am
Get k ->
go pc acc lref (k (snd (actorMapUnsafeLookup lref am))) am
Put s' k ->
case am of
ActorMap m ->
go pc acc lref (k ()) (ActorMap (Map.adjust (\(a, _s) -> (a, s')) lref m))
(actorMapTurn' 0 [] lref0 (unActor (a msg0)) am0, am0)

actorMapTurn' :: Int -> [Action] -> LocalRef -> Free ActorF a -> ActorMap
-> (a, ActorMap, [Action])
actorMapTurn' _pc acc _lref (Pure msg) am = (msg, am, reverse acc)
actorMapTurn' pc acc lref (Free op) am = case op of
Invoke lref' msg k ->
let
a' = fst (actorMapUnsafeLookup lref' am)
(reply, am', acc') = actorMapTurn' pc acc lref' (unActor (a' msg)) am
in
actorMapTurn' pc acc' lref (k reply) am'
Send rref msg k ->
let
p = Promise pc
in
actorMapTurn' (pc + 1) (SendAction lref msg rref p : acc) lref (k p) am
AsyncIO io k ->
let
p = Promise pc
in
actorMapTurn' (pc + 1) (AsyncIOAction io p : acc) lref (k p) am
On p c k ->
actorMapTurn' pc (OnAction p c lref : acc) lref (k ()) am
Get k ->
actorMapTurn' pc acc lref (k (snd (actorMapUnsafeLookup lref am))) am
Put s' k ->
case am of
ActorMap m ->
actorMapTurn' pc acc lref (k ()) (ActorMap (Map.adjust (\(a, _s) -> (a, s')) lref m))

actorMapPeek :: LocalRef -> Message -> ActorMap -> (Message, ActorMap)
actorMapPeek lref msg am =
Expand Down Expand Up @@ -154,7 +156,8 @@ devSend = undefined

data AsyncState = AsyncState
{ asyncStateAsyncIO :: Map (Async IOResult) Promise
, asyncStateContinuations :: Map Promise (Either IOResult Message -> Actor, LocalRef)
, asyncStateContinuations :: Map Promise (Either IOResult Message -> Free ActorF (),
LocalRef)
-- , asyncStateDeveloperSend :: Map Promise (TMVar Message)
}

Expand Down Expand Up @@ -193,7 +196,7 @@ data Reaction
= Response Promise Message
| AsyncIOFinished (Async IOResult) IOResult

react :: Reaction -> AsyncState -> (Maybe (Actor, LocalRef), AsyncState)
react :: Reaction -> AsyncState -> (Maybe (Free ActorF (), LocalRef), AsyncState)
react (Response p msg) s =
case Map.lookup p (asyncStateContinuations s) of
Just (k, lref) -> (Just (k (Right msg), lref),
Expand All @@ -217,7 +220,7 @@ react (AsyncIOFinished a result) s =
, asyncStateContinuations = Map.delete p (asyncStateContinuations s)
})

reactIO :: Reaction -> TVar AsyncState -> IO (Maybe (Actor, LocalRef))
reactIO :: Reaction -> TVar AsyncState -> IO (Maybe (Free ActorF (), LocalRef))
reactIO r v = atomically (stateTVar v (react r))

------------------------------------------------------------------------
Expand All @@ -227,17 +230,19 @@ data Event
| Reaction Reaction

data EventLoop = EventLoop
{ lsActorMap :: TVar ActorMap
{ lsName :: EventLoopName
, lsActorMap :: TVar ActorMap
, lsAsyncState :: TVar AsyncState
, lsQueue :: TBQueue Event
, lsTransport :: Transport IO
, lsPids :: TVar [Async ()]
}

initLoopState :: Transport IO -> IO EventLoop
initLoopState t =
initLoopState :: EventLoopName -> Transport IO -> IO EventLoop
initLoopState name t =
EventLoop
<$> newTVarIO emptyActorMap
<$> pure name
<*> newTVarIO emptyActorMap
<*> newTVarIO emptyAsyncState
<*> newTBQueueIO 128
<*> pure t
Expand All @@ -248,7 +253,7 @@ makeEventLoop tk name = do
t <- case tk of
NamedPipe fp -> namedPipeTransport fp name
Http port -> httpTransport port
ls <- initLoopState t
ls <- initLoopState name t
aInHandler <- async (handleInbound ls)
aAsyncIOHandler <- async (handleAsyncIO ls)
aEvHandler <- async (handleEvents ls)
Expand All @@ -269,7 +274,7 @@ handleAsyncIO ls = forever go
where
go = atomically $ do
-- XXX: Use waitAnyCatchSTM and handle exceptions appropriately here, e.g.
-- by extending `AsyncIODone` with `Fail` and `Info`.
-- by extending `AsyncIOFinished` with `Fail` and `Info`.
as <- readTVar (lsAsyncState ls)
(a, ioResult) <- waitAnySTM (Map.keys (asyncStateAsyncIO as))
writeTBQueue (lsQueue ls) (Reaction (AsyncIOFinished a ioResult))
Expand All @@ -286,11 +291,28 @@ handleEvents ls = forever go
putStrLn ("handleEvents: exception: " ++ show ex)

handleEvent :: Event -> EventLoop -> IO ()
handleEvent (Action a) ls = undefined
-- XXX: act :: EventLoopName -> [Action] -> AsyncState -> Transport IO -> IO AsyncState
handleEvent (Action a) ls = do
-- XXX:
-- XXX: Non-atomic update of `lsAsyncState`, should be fixed...
-- XXX
s <- readTVarIO (lsAsyncState ls)
s' <- act (lsName ls) [a] s (lsTransport ls)
atomically (writeTVar (lsAsyncState ls) s')
handleEvent (Reaction r) ls = do
m <- reactIO r (lsAsyncState ls)
case m of
Nothing -> return ()
Just (a, lref) -> undefined
-- XXX: need a variant of actorMapTurn which doesn't take a message...
Just (a, lref) -> do
as <- atomically $ do
am <- readTVar (lsActorMap ls)
let ((), am', as) = actorMapTurn' 0 [] lref a am -- XXX: promise counter
-- should not always be
-- 0...
writeTVar (lsActorMap ls) am'
return as
-- XXX:
-- XXX: Non-atomic update of `lsAsyncState`, should be fixed...
-- XXX
s <- readTVarIO (lsAsyncState ls)
s' <- act (lsName ls) as s (lsTransport ls)
atomically (writeTVar (lsAsyncState ls) s')

0 comments on commit c428f84

Please sign in to comment.