Skip to content

Commit

Permalink
feat(runtime): make invoke work
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-stevan-andjelkovic committed Apr 29, 2021
1 parent 47e8ab0 commit a96fe21
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 18 deletions.
1 change: 1 addition & 0 deletions src/runtime-prototype/src/StuntDouble/Actor.hs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ data Cont a

-- XXX: dup
data IOResult = Unit | String String
deriving Show

data ActorF x
= Call LocalRef Message (Message -> x)
Expand Down
41 changes: 28 additions & 13 deletions src/runtime-prototype/src/StuntDouble/EventLoop.hs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE ScopedTypeVariables #-}

module StuntDouble.EventLoop where

import Data.Map (Map)
import qualified Data.Map as Map
import Control.Exception
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Concurrent.STM.TBQueue
Expand Down Expand Up @@ -39,7 +42,6 @@ makeEventLoop fp = do
transport <- namedPipeTransport fp
loopState <- initLoopState transport
aReqHandler <- async (handleRequests loopState)
-- tid' <- forkIO $ forever $ undefined loopState
a <- async (handleEvents loopState)
atomically (putTMVar (loopStateAsync loopState) a)
return (EventLoopRef loopState)
Expand All @@ -49,38 +51,47 @@ handleEvents ls = go
where
go = do
e <- atomically (readTBQueue (loopStateQueue ls))
putStrLn (eventName e)
handleEvent e ls
`catch` \(exception :: SomeException) -> print exception
go

handleEvent :: Event -> LoopState -> IO ()
handleEvent (Command c) ls = handleCommand c ls
handleEvent (Receive r) ls = handleReceive r ls

handleCommand :: Command -> LoopState -> IO ()
handleCommand (Spawn actor respVar) ls = do
undefined
handleCommand (Invoke lr m respVar) ls = do
Just actor <- lookupActor lr (loopStateActors ls)
Now reply <- runActor ls (actor m)
handleCommand (Spawn actor respVar) ls = atomically $ do
actors <- readTVar (loopStateActors ls)
let lref = LocalRef (Map.size actors)
writeTVar (loopStateActors ls) (Map.insert lref actor actors)
putTMVar respVar lref
handleCommand (Invoke lref msg respVar) ls = do
Just actor <- lookupActor lref (loopStateActors ls)
Now reply <- runActor ls (actor msg)
atomically (putTMVar respVar reply)
handleCommand (Send rr m) ls = do
-- a <- async (makeHttpRequest (translateToUrl rr) (seralise m))
-- atomically (modifyTVar (loopStateAsyncs ls) (a :))
undefined
handleCommand Quit ls = do
a <- atomically (takeTMVar (loopStateAsync ls))
threadDelay 100000
cancel a

data ActorNotFound = ActorNotFound RemoteRef
deriving Show

instance Exception ActorNotFound where

handleReceive :: Request -> LoopState -> IO ()
handleReceive :: Receive -> LoopState -> IO ()
handleReceive (Request e) ls = do
mActor <- lookupActor (remoteToLocalRef (envelopeReceiver e)) (loopStateActors ls)
case mActor of
Nothing -> throwIO (ActorNotFound (envelopeReceiver e)) -- XXX: Throw here or just log and continue?
-- XXX: Throw here or just log and continue? Or take care of this at one
-- layer above transport where we authenticate and verify messages are only
-- going to known actors, i.e. that the remote refs are valid.
Nothing -> throwIO (ActorNotFound (envelopeReceiver e))
Just actor -> do
_ <- runActor ls (actor (envelopeMessage e))
return ()
Expand All @@ -90,7 +101,9 @@ runActor ls = iterM go return
where
go :: ActorF (IO a) -> IO a
go (Call lref msg k) = do
undefined
Just actor <- lookupActor lref (loopStateActors ls)
Now reply <- runActor ls (actor msg)
k reply
go (RemoteCall rref msg k) = do
undefined
go (AsyncIO m k) = do
Expand All @@ -107,10 +120,12 @@ quit r = atomically $
writeTBQueue (loopStateQueue (loopRefLoopState r)) (Command Quit)

helper :: EventLoopRef -> (TMVar a -> Command) -> IO a
helper r cmd = atomically $ do
respVar <- newEmptyTMVar
writeTBQueue (loopStateQueue (loopRefLoopState r)) (Command (cmd respVar))
takeTMVar respVar
helper r cmd = do
respVar <- atomically $ do
respVar <- newEmptyTMVar
writeTBQueue (loopStateQueue (loopRefLoopState r)) (Command (cmd respVar))
return respVar
atomically (takeTMVar respVar)

spawn :: EventLoopRef -> (Message -> Actor) -> IO LocalRef
spawn r actor = helper r (Spawn actor)
Expand Down
20 changes: 16 additions & 4 deletions src/runtime-prototype/src/StuntDouble/EventLoop/Event.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,36 @@ import StuntDouble.Reference

------------------------------------------------------------------------

data Event = Command Command | Response Response | Receive Request
data Event
= Command Command
| Response Response
| Receive Receive

eventName :: Event -> String
eventName (Command cmd) = "Command/" ++ commandName cmd

data Command
= Spawn (Message -> Actor) (TMVar LocalRef)
| Invoke LocalRef Message (TMVar Message)
| Send RemoteRef Message
| Quit

commandName :: Command -> String
commandName Spawn {} = "Spawn"
commandName Invoke {} = "Invoke"
commandName Send {} = "Send"
commandName Quit {} = "Quit"

data Response
= IOReady (Async IOResult)
-- Receive (Async Message) Message

data Request = Request Envelope

data Receive
= Request Envelope

data Envelope = Envelope
{ envelopeSender :: RemoteRef
, envelopeMessage :: Message
, envelopeReceiver :: RemoteRef
}
deriving (Eq, Show, Read)

Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ handleRequests ls = forever go
e <- receive (loopStateTransport ls)
atomically (writeTBQueue (loopStateQueue ls) (Receive (Request e)))

handleRequest :: Request -> LoopState -> IO ()
handleRequest :: Receive -> LoopState -> IO ()
handleRequest (Request e) ls = undefined
-- Treat this like a `Send (envelopeReceiver e) (envelopeMessage e)`?

Expand Down

0 comments on commit a96fe21

Please sign in to comment.