Skip to content

Commit

Permalink
feat(runtime): support remote calls that return now
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-stevan-andjelkovic committed Apr 30, 2021
1 parent 62672c0 commit 12d9396
Showing 1 changed file with 40 additions and 5 deletions.
45 changes: 40 additions & 5 deletions src/runtime-prototype/src/StuntDouble/EventLoop.hs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ handleEvent (Response r) ls = handleResponse r ls
handleEvent (Receive r) ls = handleReceive r ls

dummyDeveloperRef :: LoopState -> RemoteRef
dummyDeveloperRef ls = RemoteRef (getEventLoopName (loopStateName ls)) 0
dummyDeveloperRef ls = RemoteRef (getEventLoopName (loopStateName ls)) dummyIndex
where
dummyIndex = -1

handleCommand :: Command -> LoopState -> IO ()
handleCommand (Spawn actor respVar) ls = atomically $ do
Expand Down Expand Up @@ -101,7 +103,9 @@ handleResponse :: Response -> LoopState -> IO ()
handleResponse (Reply respTMVar e) ls
| envelopeReceiver e == dummyDeveloperRef ls =
atomically (putTMVar respTMVar (envelopeMessage e))
| otherwise = undefined
| otherwise = do
-- XXX: Call Later continuation
undefined

data ActorNotFound = ActorNotFound RemoteRef
deriving Show
Expand All @@ -117,8 +121,10 @@ handleReceive (Request e) ls = do
-- going to known actors, i.e. that the remote refs are valid.
Nothing -> throwIO (ActorNotFound (envelopeReceiver e))
Just actor -> do
Now replyMsg <- runActor ls (actor (envelopeMessage e))
transportSend (loopStateTransport ls) (reply e replyMsg)
cont <- runActor ls (actor (envelopeMessage e))
case cont of
Now replyMsg -> transportSend (loopStateTransport ls) (reply e replyMsg)
Later async k -> _

runActor :: LoopState -> Free ActorF a -> IO a
runActor ls = iterM go return
Expand All @@ -129,7 +135,18 @@ runActor ls = iterM go return
Now reply <- runActor ls (actor msg)
k reply
go (RemoteCall rref msg k) = do
undefined
(corrId, respTMVar) <- atomically $ do
corrId <- readTVar (loopStateNextCorrelationId ls)
modifyTVar' (loopStateNextCorrelationId ls) succ
respTMVar <- newEmptyTMVar
modifyTVar' (loopStateResponses ls) (Map.insert corrId respTMVar)
return (corrId, respTMVar)
transportSend (loopStateTransport ls) (Envelope (dummyDeveloperRef ls) msg rref corrId)
a <- async $ atomically $ do
resp <- takeTMVar respTMVar -- XXX: timeout?
modifyTVar' (loopStateResponses ls) (Map.delete corrId)
return resp
k a
go (AsyncIO m k) = do
a <- async m
atomically (modifyTVar' (loopStateIOAsyncs ls) (a :))
Expand Down Expand Up @@ -163,6 +180,11 @@ send r rref msg = helper r (Send rref msg)
testActor :: Message -> Actor
testActor (Message "hi") = return (Now (Message "bye!"))

testActor2 :: RemoteRef -> Message -> Actor
testActor2 rref (Message "init") = do
a <- remoteCall rref (Message "hi")
return (Later a (\reply -> return (Now (Message "done"))))

test :: IO ()
test = do
el1 <- makeEventLoop "/tmp" "a"
Expand All @@ -177,3 +199,16 @@ test = do
threadDelay 10000
quit el1
quit el2

test2 :: IO ()
test2 = do
el1 <- makeEventLoop "/tmp" "a"
el2 <- makeEventLoop "/tmp" "b"
lref1 <- spawn el1 testActor
lref2 <- spawn el2 (testActor2 (localToRemoteRef "a" lref1))
a <- send el2 (localToRemoteRef "b" lref2) (Message "init")
done <- wait a
print done
threadDelay 10000
quit el1
quit el2

0 comments on commit 12d9396

Please sign in to comment.