Skip to content

Commit

Permalink
feat(runtime): add send doesn't work yet, because we always send to o…
Browse files Browse the repository at this point in the history
…urselves
  • Loading branch information
symbiont-stevan-andjelkovic committed Apr 29, 2021
1 parent 46567c1 commit 98ab4bc
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 92 deletions.
77 changes: 55 additions & 22 deletions src/runtime-prototype/src/StuntDouble/EventLoop.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}

module StuntDouble.EventLoop where
Expand All @@ -18,7 +19,7 @@ import StuntDouble.EventLoop.Transport
import StuntDouble.FreeMonad
import StuntDouble.EventLoop.State
import StuntDouble.EventLoop.Event
import StuntDouble.EventLoop.RequestHandler
import StuntDouble.EventLoop.InboundHandler

------------------------------------------------------------------------

Expand All @@ -27,38 +28,46 @@ newtype EventLoopRef = EventLoopRef

------------------------------------------------------------------------

initLoopState :: Transport IO -> IO LoopState
initLoopState transport =
initLoopState :: EventLoopName -> Transport IO -> IO LoopState
initLoopState name transport =
LoopState
<$> newTVarIO []
<$> pure name
<*> newTVarIO []
<*> newTBQueueIO 128
<*> newTVarIO Map.empty
<*> newTVarIO Map.empty
<*> newTVarIO []
<*> pure transport
<*> newTVarIO 0
<*> newTVarIO Map.empty

makeEventLoop :: FilePath -> IO EventLoopRef
makeEventLoop fp = do
makeEventLoop :: EventLoopName -> FilePath -> IO EventLoopRef
makeEventLoop name fp = do
transport <- namedPipeTransport fp
ls <- initLoopState transport
aReqHandler <- async (handleRequests ls)
ls <- initLoopState name transport
aInHandler <- async (handleInbound ls)
aEvHandler <- async (handleEvents ls)
atomically (modifyTVar' (loopStatePids ls) ([aReqHandler, aEvHandler] ++ ))
atomically (modifyTVar' (loopStatePids ls) ([aInHandler, aEvHandler] ++ ))
return (EventLoopRef ls)

handleEvents :: LoopState -> IO ()
handleEvents ls = go
where
go = do
e <- atomically (readTBQueue (loopStateQueue ls))
putStr (getEventLoopName (loopStateName ls) ++ "> ")
putStrLn (eventName e)
handleEvent e ls
`catch` \(exception :: SomeException) -> print exception
go

handleEvent :: Event -> LoopState -> IO ()
handleEvent (Command c) ls = handleCommand c ls
handleEvent (Receive r) ls = handleReceive r ls
handleEvent (Command c) ls = handleCommand c ls
handleEvent (Response r) ls = handleResponse r ls
handleEvent (Receive r) ls = handleReceive r ls

dummyDeveloperRef :: RemoteRef
dummyDeveloperRef = RemoteRef "dev" 0

handleCommand :: Command -> LoopState -> IO ()
handleCommand (Spawn actor respVar) ls = atomically $ do
Expand All @@ -70,15 +79,30 @@ handleCommand (Invoke lref msg respVar) ls = do
Just actor <- lookupActor lref (loopStateActors ls)
Now reply <- runActor ls (actor msg)
atomically (putTMVar respVar reply)
handleCommand (Send rr m) ls = do
-- a <- async (makeHttpRequest (translateToUrl rr) (seralise m))
-- atomically (modifyTVar (loopStateAsyncs ls) (a :))
undefined
handleCommand (Send rref msg respVar) ls = do
(corrId, respTMVar) <- atomically $ do
corrId <- readTVar (loopStateNextCorrelationId ls)
modifyTVar' (loopStateNextCorrelationId ls) succ
respTMVar <- newEmptyTMVar
modifyTVar' (loopStateResponses ls) (Map.insert corrId respTMVar)
return (corrId, respTMVar)
transportSend (loopStateTransport ls) (Envelope dummyDeveloperRef msg rref corrId)
a <- async $ atomically $ do
resp <- takeTMVar respTMVar -- XXX: timeout?
modifyTVar' (loopStateResponses ls) (Map.delete corrId)
return resp
atomically (putTMVar respVar a)
handleCommand Quit ls = do
pids <- atomically (readTVar (loopStatePids ls))
threadDelay 100000
mapM_ cancel pids

handleResponse :: Response -> LoopState -> IO ()
handleResponse (Reply respTMVar e) ls
| envelopeSender e == dummyDeveloperRef =
atomically (putTMVar respTMVar (envelopeMessage e))
| otherwise = undefined

data ActorNotFound = ActorNotFound RemoteRef
deriving Show

Expand All @@ -93,8 +117,8 @@ handleReceive (Request e) ls = do
-- going to known actors, i.e. that the remote refs are valid.
Nothing -> throwIO (ActorNotFound (envelopeReceiver e))
Just actor -> do
_ <- runActor ls (actor (envelopeMessage e))
return ()
Now replyMsg <- runActor ls (actor (envelopeMessage e))
transportSend (loopStateTransport ls) (reply e replyMsg)

runActor :: LoopState -> Free ActorF a -> IO a
runActor ls = iterM go return
Expand All @@ -108,7 +132,7 @@ runActor ls = iterM go return
undefined
go (AsyncIO m k) = do
a <- async m
atomically (modifyTVar (loopStateIOAsyncs ls) (a :))
atomically (modifyTVar' (loopStateIOAsyncs ls) (a :))
k a
go (Get k) = do
undefined
Expand All @@ -133,14 +157,23 @@ spawn r actor = helper r (Spawn actor)
invoke :: EventLoopRef -> LocalRef -> Message -> IO Message
invoke r lref msg = helper r (Invoke lref msg)

send :: EventLoopRef -> RemoteRef -> Message -> IO (Async Message)
send r rref msg = helper r (Send rref msg)

testActor :: Message -> Actor
testActor (Message "hi") = return (Now (Message "bye!"))

test :: IO ()
test = do
r <- makeEventLoop "/tmp/a"
lref <- spawn r testActor
el1 <- makeEventLoop "a" "/tmp/a"
el2 <- makeEventLoop "b" "/tmp/b"
lref <- spawn el1 testActor
let msg = Message "hi"
reply <- invoke r lref msg
reply <- invoke el1 lref msg
print reply
quit r
a <- send el2 (localToRemoteRef "dummy" lref) msg
reply' <- wait a
print reply'
threadDelay 10000
quit el1
quit el2
36 changes: 27 additions & 9 deletions src/runtime-prototype/src/StuntDouble/EventLoop/Event.hs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
{-# LANGUAGE GeneralizedNewtypeDeriving #-}

module StuntDouble.EventLoop.Event where

import Control.Concurrent.STM
Expand All @@ -15,30 +17,46 @@ data Event
| Receive Receive

eventName :: Event -> String
eventName (Command cmd) = "Command/" ++ commandName cmd
eventName (Command cmd) = "Command/" ++ commandName cmd
eventName (Response resp) = "Response/" ++ responseName resp

data Command
= Spawn (Message -> Actor) (TMVar LocalRef)
| Invoke LocalRef Message (TMVar Message)
| Send RemoteRef Message
| Send RemoteRef Message (TMVar (Async Message))
| Quit

commandName :: Command -> String
commandName Spawn {} = "Spawn"
commandName Spawn {} = "Spawn"
commandName Invoke {} = "Invoke"
commandName Send {} = "Send"
commandName Quit {} = "Quit"
commandName Send {} = "Send"
commandName Quit {} = "Quit"

data Response
= IOReady (Async IOResult)
-- Receive (Async Message) Message
| Reply (TMVar Message) Envelope

responseName :: Response -> String
responseName IOReady {} = "IOReady"
responseName Reply {} = "Reply"

data Receive
= Request Envelope

newtype CorrelationId = CorrelationId Int
deriving (Eq, Ord, Show, Read, Num, Enum)

data Envelope = Envelope
{ envelopeSender :: RemoteRef
, envelopeMessage :: Message
, envelopeReceiver :: RemoteRef
{ envelopeSender :: RemoteRef
, envelopeMessage :: Message
, envelopeReceiver :: RemoteRef
, envelopeCorrelationId :: CorrelationId
}
deriving (Eq, Show, Read)

reply :: Envelope -> Message -> Envelope
reply e reply =
e { envelopeSender = envelopeReceiver e
, envelopeMessage = reply
, envelopeReceiver = envelopeSender e
}
28 changes: 28 additions & 0 deletions src/runtime-prototype/src/StuntDouble/EventLoop/InboundHandler.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
module StuntDouble.EventLoop.InboundHandler where

import qualified Data.Map as Map
import Control.Monad
import Control.Exception
import Control.Concurrent.Async
import Control.Concurrent.STM

import StuntDouble.EventLoop.State
import StuntDouble.EventLoop.Event
import StuntDouble.EventLoop.Transport
import StuntDouble.Reference
import StuntDouble.Message

------------------------------------------------------------------------

handleInbound :: LoopState -> IO ()
handleInbound ls = forever go
where
go = do
e <- transportReceive (loopStateTransport ls)
atomically $ do
responses <- readTVar (loopStateResponses ls)
case Map.lookup (envelopeCorrelationId e) responses of
Nothing ->
writeTBQueue (loopStateQueue ls) (Receive (Request e))
Just respTMVar ->
writeTBQueue (loopStateQueue ls) (Response (Reply respTMVar e))
57 changes: 0 additions & 57 deletions src/runtime-prototype/src/StuntDouble/EventLoop/RequestHandler.hs

This file was deleted.

10 changes: 9 additions & 1 deletion src/runtime-prototype/src/StuntDouble/EventLoop/State.hs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
{-# LANGUAGE GeneralizedNewtypeDeriving #-}

module StuntDouble.EventLoop.State where

import Data.String
import Data.Map (Map)
import qualified Data.Map as Map
import Control.Concurrent.STM
Expand All @@ -13,16 +16,21 @@ import StuntDouble.Message

------------------------------------------------------------------------

newtype EventLoopName = EventLoopName { getEventLoopName :: String }
deriving IsString

data LoopState = LoopState
{ loopStatePids :: TVar [Async ()] -- | Holds the `Async`s (or PIDs) of the
{ loopStateName :: EventLoopName
, loopStatePids :: TVar [Async ()] -- | Holds the `Async`s (or PIDs) of the
-- event loop itself.
, loopStateQueue :: TBQueue Event
, loopStateActors :: TVar (Map LocalRef (Message -> Actor)) -- XXX: Only changed by main loop, so no need for STM?
-- , loopStateHandlers :: TVar (Map (Async Message) (Message -> Actor))
, loopStateIOHandlers :: TVar (Map (Async IOResult) (LocalRef, IOResult -> Actor))
, loopStateIOAsyncs :: TVar [Async IOResult]
, loopStateTransport :: Transport IO -- Will not change once created, so doesn't need STM?
, loopStateNextCorrelationId :: TVar CorrelationId
, loopStateResponses :: TVar (Map CorrelationId (TMVar Message))
}


Expand Down
41 changes: 39 additions & 2 deletions src/runtime-prototype/src/StuntDouble/EventLoop/Transport.hs
Original file line number Diff line number Diff line change
@@ -1,10 +1,47 @@
module StuntDouble.EventLoop.Transport where

import Control.Exception
import Control.Concurrent.Async
import System.IO
import System.IO.Error
import System.Posix.Files

import StuntDouble.EventLoop.Event
import StuntDouble.Reference
import StuntDouble.Message

------------------------------------------------------------------------

data Transport m = Transport
{ send :: Envelope -> m ()
, receive :: m Envelope
{ transportSend :: Envelope -> m ()
, transportReceive :: m Envelope
}

------------------------------------------------------------------------

namedPipeTransport :: FilePath -> IO (Transport IO)
namedPipeTransport fp = do
catchJust
(\e -> if isAlreadyExistsErrorType (ioeGetErrorType e)
then Just ()
else Nothing)
(createNamedPipe fp (namedPipeMode `unionFileModes`
ownerReadMode `unionFileModes`
ownerWriteMode))
return
h <- openFile fp ReadWriteMode
hSetBuffering h LineBuffering
return Transport { transportSend = hPutStrLn h . show
, transportReceive = fmap read (hGetLine h)
}

------------------------------------------------------------------------

test :: IO ()
test = do
t <- namedPipeTransport "/tmp/test_request.pipe"
let e = Envelope (RemoteRef "from" 0) (Message "msg") (RemoteRef "to" 1) 0
a <- async (transportSend t e)
e' <- transportReceive t
cancel a
assert (e' == e) (return ())
2 changes: 1 addition & 1 deletion src/runtime-prototype/stunt-double.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ library
StuntDouble.EventLoop
StuntDouble.EventLoop.AsyncHandler
StuntDouble.EventLoop.Event
StuntDouble.EventLoop.RequestHandler
StuntDouble.EventLoop.InboundHandler
StuntDouble.EventLoop.State
StuntDouble.EventLoop.Transport
StuntDouble.FreeMonad
Expand Down

0 comments on commit 98ab4bc

Please sign in to comment.