diff --git a/src/runtime-prototype/src/StuntDouble/ActorMap.hs b/src/runtime-prototype/src/StuntDouble/ActorMap.hs index da44adca..0938cc6c 100644 --- a/src/runtime-prototype/src/StuntDouble/ActorMap.hs +++ b/src/runtime-prototype/src/StuntDouble/ActorMap.hs @@ -39,7 +39,7 @@ data ActorF x = Invoke LocalRef Message (Message -> x) | Send RemoteRef Message (Promise -> x) | AsyncIO (IO IOResult) (Promise -> x) - | On Promise (Either IOResult Message -> x) (() -> x) + | On Promise (Either IOResult Message -> Free ActorF ()) (() -> x) | Get (State -> x) | Put State (() -> x) deriving instance Functor ActorF @@ -69,7 +69,7 @@ actorMapSpawn a s (ActorMap m) = data Action = SendAction LocalRef Message RemoteRef Promise | AsyncIOAction (IO IOResult) Promise - | OnAction Promise (Either IOResult Message -> Actor) LocalRef + | OnAction Promise (Either IOResult Message -> Free ActorF ()) LocalRef -- XXX: what about exceptions? transactional in state, but also in actions?! actorMapTurn :: LocalRef -> Message -> ActorMap -> ((Message, ActorMap, [Action]), ActorMap) @@ -79,34 +79,36 @@ actorMapTurn lref0 msg0 am0 = in -- XXX: Promises should not always start from 0, or they will overlap each -- other if more than one turn happens... - (go 0 [] lref0 (unActor (a msg0)) am0, am0) - where - go _pc acc _lref (Pure msg) am = (msg, am, reverse acc) - go pc acc lref (Free op) am = case op of - Invoke lref' msg k -> - let - a' = fst (actorMapUnsafeLookup lref' am) - (reply, am', acc') = go pc acc lref' (unActor (a' msg)) am - in - go pc acc' lref (k reply) am' - Send rref msg k -> - let - p = Promise pc - in - go (pc + 1) (SendAction lref msg rref p : acc) lref (k p) am - AsyncIO io k -> - let - p = Promise pc - in - go (pc + 1) (AsyncIOAction io p : acc) lref (k p) am - On p c k -> - go pc (OnAction p (Actor . c) lref : acc) lref (k ()) am - Get k -> - go pc acc lref (k (snd (actorMapUnsafeLookup lref am))) am - Put s' k -> - case am of - ActorMap m -> - go pc acc lref (k ()) (ActorMap (Map.adjust (\(a, _s) -> (a, s')) lref m)) + (actorMapTurn' 0 [] lref0 (unActor (a msg0)) am0, am0) + +actorMapTurn' :: Int -> [Action] -> LocalRef -> Free ActorF a -> ActorMap + -> (a, ActorMap, [Action]) +actorMapTurn' _pc acc _lref (Pure msg) am = (msg, am, reverse acc) +actorMapTurn' pc acc lref (Free op) am = case op of + Invoke lref' msg k -> + let + a' = fst (actorMapUnsafeLookup lref' am) + (reply, am', acc') = actorMapTurn' pc acc lref' (unActor (a' msg)) am + in + actorMapTurn' pc acc' lref (k reply) am' + Send rref msg k -> + let + p = Promise pc + in + actorMapTurn' (pc + 1) (SendAction lref msg rref p : acc) lref (k p) am + AsyncIO io k -> + let + p = Promise pc + in + actorMapTurn' (pc + 1) (AsyncIOAction io p : acc) lref (k p) am + On p c k -> + actorMapTurn' pc (OnAction p c lref : acc) lref (k ()) am + Get k -> + actorMapTurn' pc acc lref (k (snd (actorMapUnsafeLookup lref am))) am + Put s' k -> + case am of + ActorMap m -> + actorMapTurn' pc acc lref (k ()) (ActorMap (Map.adjust (\(a, _s) -> (a, s')) lref m)) actorMapPeek :: LocalRef -> Message -> ActorMap -> (Message, ActorMap) actorMapPeek lref msg am = @@ -154,7 +156,8 @@ devSend = undefined data AsyncState = AsyncState { asyncStateAsyncIO :: Map (Async IOResult) Promise - , asyncStateContinuations :: Map Promise (Either IOResult Message -> Actor, LocalRef) + , asyncStateContinuations :: Map Promise (Either IOResult Message -> Free ActorF (), + LocalRef) -- , asyncStateDeveloperSend :: Map Promise (TMVar Message) } @@ -193,7 +196,7 @@ data Reaction = Response Promise Message | AsyncIOFinished (Async IOResult) IOResult -react :: Reaction -> AsyncState -> (Maybe (Actor, LocalRef), AsyncState) +react :: Reaction -> AsyncState -> (Maybe (Free ActorF (), LocalRef), AsyncState) react (Response p msg) s = case Map.lookup p (asyncStateContinuations s) of Just (k, lref) -> (Just (k (Right msg), lref), @@ -217,7 +220,7 @@ react (AsyncIOFinished a result) s = , asyncStateContinuations = Map.delete p (asyncStateContinuations s) }) -reactIO :: Reaction -> TVar AsyncState -> IO (Maybe (Actor, LocalRef)) +reactIO :: Reaction -> TVar AsyncState -> IO (Maybe (Free ActorF (), LocalRef)) reactIO r v = atomically (stateTVar v (react r)) ------------------------------------------------------------------------ @@ -227,17 +230,19 @@ data Event | Reaction Reaction data EventLoop = EventLoop - { lsActorMap :: TVar ActorMap + { lsName :: EventLoopName + , lsActorMap :: TVar ActorMap , lsAsyncState :: TVar AsyncState , lsQueue :: TBQueue Event , lsTransport :: Transport IO , lsPids :: TVar [Async ()] } -initLoopState :: Transport IO -> IO EventLoop -initLoopState t = +initLoopState :: EventLoopName -> Transport IO -> IO EventLoop +initLoopState name t = EventLoop - <$> newTVarIO emptyActorMap + <$> pure name + <*> newTVarIO emptyActorMap <*> newTVarIO emptyAsyncState <*> newTBQueueIO 128 <*> pure t @@ -248,7 +253,7 @@ makeEventLoop tk name = do t <- case tk of NamedPipe fp -> namedPipeTransport fp name Http port -> httpTransport port - ls <- initLoopState t + ls <- initLoopState name t aInHandler <- async (handleInbound ls) aAsyncIOHandler <- async (handleAsyncIO ls) aEvHandler <- async (handleEvents ls) @@ -269,7 +274,7 @@ handleAsyncIO ls = forever go where go = atomically $ do -- XXX: Use waitAnyCatchSTM and handle exceptions appropriately here, e.g. - -- by extending `AsyncIODone` with `Fail` and `Info`. + -- by extending `AsyncIOFinished` with `Fail` and `Info`. as <- readTVar (lsAsyncState ls) (a, ioResult) <- waitAnySTM (Map.keys (asyncStateAsyncIO as)) writeTBQueue (lsQueue ls) (Reaction (AsyncIOFinished a ioResult)) @@ -286,11 +291,28 @@ handleEvents ls = forever go putStrLn ("handleEvents: exception: " ++ show ex) handleEvent :: Event -> EventLoop -> IO () -handleEvent (Action a) ls = undefined --- XXX: act :: EventLoopName -> [Action] -> AsyncState -> Transport IO -> IO AsyncState +handleEvent (Action a) ls = do + -- XXX: + -- XXX: Non-atomic update of `lsAsyncState`, should be fixed... + -- XXX + s <- readTVarIO (lsAsyncState ls) + s' <- act (lsName ls) [a] s (lsTransport ls) + atomically (writeTVar (lsAsyncState ls) s') handleEvent (Reaction r) ls = do m <- reactIO r (lsAsyncState ls) case m of Nothing -> return () - Just (a, lref) -> undefined - -- XXX: need a variant of actorMapTurn which doesn't take a message... + Just (a, lref) -> do + as <- atomically $ do + am <- readTVar (lsActorMap ls) + let ((), am', as) = actorMapTurn' 0 [] lref a am -- XXX: promise counter + -- should not always be + -- 0... + writeTVar (lsActorMap ls) am' + return as + -- XXX: + -- XXX: Non-atomic update of `lsAsyncState`, should be fixed... + -- XXX + s <- readTVarIO (lsAsyncState ls) + s' <- act (lsName ls) as s (lsTransport ls) + atomically (writeTVar (lsAsyncState ls) s')