Skip to content

Commit

Permalink
feat(runtime): extend message type with client requests
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-stevan-andjelkovic committed Jun 3, 2021
1 parent 849f2b2 commit d1188a5
Show file tree
Hide file tree
Showing 10 changed files with 66 additions and 192 deletions.
9 changes: 4 additions & 5 deletions src/runtime-prototype/src/StuntDouble/ActorMap.hs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ data Resolution
= TimeoutR
| TimerR
| IOResultR IOResult
| MessageR Message
| InternalMessageR Message
| ExceptionR SomeException

data ActorF x
Expand Down Expand Up @@ -275,7 +275,6 @@ clientRequest ls lref msg = do
cref <- atomically (stateTVar (lsNextPromise ls) (\p -> (ClientRef (unPromise p), p + 1)))
returnVar <- newEmptyTMVarIO
respVar <- newEmptyTMVarIO
-- XXX: assoc cref respVar ls
atomically (modifyTVar' (lsAsyncState ls)
(\s -> s { asyncStateClientResponses =
Map.insert cref respVar (asyncStateClientResponses s) }))
Expand Down Expand Up @@ -354,7 +353,7 @@ act name as time transport s0 = foldM go s0 as
let respVar = asyncStateClientResponses s Map.! cref -- XXX: partial
atomically (putTMVar respVar msg)
return s
{ asyncStateClientResponses = Map.delete cref (asyncStateClientResponses s)}
{ asyncStateClientResponses = Map.delete cref (asyncStateClientResponses s) }

data Reaction
= Receive Promise Envelope
Expand All @@ -374,7 +373,7 @@ react (Receive p e) s =
RequestKind -> (Request e, s)
ResponseKind ->
case Map.lookup p (asyncStateContinuations s) of
Just (k, lref) -> (ResumeContinuation (k (MessageR (envelopeMessage e))) lref,
Just (k, lref) -> (ResumeContinuation (k (InternalMessageR (envelopeMessage e))) lref,
s { asyncStateContinuations =
Map.delete p (asyncStateContinuations s) })
Nothing ->
Expand Down Expand Up @@ -587,5 +586,5 @@ handleEvent (Admin cmd) ls = case cmd of
threadDelay 100000
mapM_ cancel pids
handleEvent (ClientRequestEvent lref msg cref returnVar) ls = do
reply <- actorPokeIO ls lref msg -- XXX: cref needs to be fed in here...
reply <- actorPokeIO ls lref (ClientRequest (getMessage msg) cref)
atomically (putTMVar returnVar reply)
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,7 @@ instance FromJSON RemoteRef
deriving instance Generic CorrelationId
instance ToJSON CorrelationId
instance FromJSON CorrelationId

deriving instance Generic ClientRef
instance ToJSON ClientRef
instance FromJSON ClientRef
11 changes: 9 additions & 2 deletions src/runtime-prototype/src/StuntDouble/Message.hs
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
module StuntDouble.Message where

newtype Message = Message String
import StuntDouble.Reference

------------------------------------------------------------------------

data Message
= InternalMessage String
| ClientRequest String ClientRef
deriving (Eq, Show, Read)

getMessage :: Message -> String
getMessage (Message msg) = msg
getMessage (InternalMessage msg) = msg
getMessage (ClientRequest msg _cref) = msg
2 changes: 1 addition & 1 deletion src/runtime-prototype/src/StuntDouble/Reference.hs
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@ newtype EventLoopName = EventLoopName { getEventLoopName :: String }
deriving (Eq, Ord, Show, IsString)

newtype ClientRef = ClientRef Int
deriving (Eq, Ord, Show)
deriving (Eq, Ord, Show, Read)
1 change: 0 additions & 1 deletion src/runtime-prototype/stunt-double.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ test-suite test
other-modules:
StuntDouble.ActorMapTest
StuntDouble.EventLoop.TransportTest
StuntDouble.EventLoopTest
StuntDouble.SchedulerTest
TastyDiscover
-- TODO(stevan: This doesn't work, because tasty-discovery finds the module
Expand Down
74 changes: 37 additions & 37 deletions src/runtime-prototype/test/StuntDouble/ActorMapTest.hs
Original file line number Diff line number Diff line change
Expand Up @@ -32,39 +32,39 @@ eventLoopA :: String -> EventLoopName
eventLoopA suffix = EventLoopName ("event-loop-actormap-a" ++ "-" ++ suffix)

testActor :: Message -> Actor
testActor (Message "hi") = Actor (return (Message "bye!"))
testActor (InternalMessage "hi") = Actor (return (InternalMessage "bye!"))

------------------------------------------------------------------------

unit_actorMapInvoke :: Assertion
unit_actorMapInvoke = withEventLoop (eventLoopA "invoke") $ \el _h -> do
lref <- spawn el testActor emptyState
reply <- ainvoke el lref (Message "hi")
reply @?= Message "bye!"
reply <- ainvoke el lref (InternalMessage "hi")
reply @?= InternalMessage "bye!"

unit_actorMapSend :: Assertion
unit_actorMapSend = do
let ev = eventLoopA "send"
withEventLoop ev $ \el _h -> do
lref <- spawn el testActor emptyState
let rref = localToRemoteRef ev lref
a <- asend el rref (Message "hi")
a <- asend el rref (InternalMessage "hi")
reply <- wait a
reply @?= Message "bye!"
reply @?= InternalMessage "bye!"

------------------------------------------------------------------------

testActor1 :: Message -> Actor
testActor1 (Message "inc") = Actor (return (Message "ack"))
testActor1 (InternalMessage "inc") = Actor (return (InternalMessage "ack"))

testActor2 :: RemoteRef -> Message -> Actor
testActor2 rref msg@(Message "inc") = Actor $ do
testActor2 rref msg@(InternalMessage "inc") = Actor $ do
p <- send rref msg
on p (\(MessageR (Message "ack")) -> modify (add "x" 1))
return (Message "inced")
testActor2 _rref (Message "sum") = Actor $ do
on p (\(InternalMessageR (InternalMessage "ack")) -> modify (add "x" 1))
return (InternalMessage "inced")
testActor2 _rref (InternalMessage "sum") = Actor $ do
s <- get
return (Message (show (getField "x" s)))
return (InternalMessage (show (getField "x" s)))

eventLoopB :: String -> EventLoopName
eventLoopB suffix = EventLoopName ("event-loop-actormap-b" ++ "-" ++ suffix)
Expand All @@ -81,61 +81,61 @@ unit_actorMapOnAndState = do
lref1 <- spawn elA testActor1 emptyState
let rref1 = localToRemoteRef evA lref1
lref2 <- spawn elB (testActor2 rref1) (stateFromList [("x", Integer 0)])
reply <- ainvoke elB lref2 (Message "inc")
reply @?= Message "inced"
reply <- ainvoke elB lref2 (InternalMessage "inc")
reply @?= InternalMessage "inced"
threadDelay 100000
reply2 <- ainvoke elB lref2 (Message "sum")
reply2 <- ainvoke elB lref2 (InternalMessage "sum")
quit elA
quit elB
return reply2)
(\(e :: SomeException) -> return (Message (show e)))
reply2 @?= Message "Integer 1"
(\(e :: SomeException) -> return (InternalMessage (show e)))
reply2 @?= InternalMessage "Integer 1"

------------------------------------------------------------------------

testActor3 :: Message -> Actor
testActor3 (Message "go") = Actor $ do
testActor3 (InternalMessage "go") = Actor $ do
p <- asyncIO (return (String "io done"))
on p (\(IOResultR (String "io done")) -> modify (add "x" 1))
return (Message "done")
return (InternalMessage "done")

unit_actorMapIO :: Assertion
unit_actorMapIO = withEventLoop (eventLoopA "io") $ \el _h -> do
lref <- spawn el testActor3 (stateFromList [("x", Integer 0)])
_done <- ainvoke el lref (Message "go")
_done <- ainvoke el lref (InternalMessage "go")
threadDelay 100000
s <- getActorState el lref
s @?= stateFromList [("x", Integer 1)]

testActor4 :: Message -> Actor
testActor4 (Message "go") = Actor $ do
testActor4 (InternalMessage "go") = Actor $ do
p <- asyncIO (error "failed")
on p (\(ExceptionR _exception) -> modify (add "x" 1))
return (Message "done")
return (InternalMessage "done")

unit_actorMapIOFail :: Assertion
unit_actorMapIOFail = withEventLoop (eventLoopA "io_fail") $ \el _h -> do
lref <- spawn el testActor4 (stateFromList [("x", Integer 0)])
_done <- ainvoke el lref (Message "go")
_done <- ainvoke el lref (InternalMessage "go")
threadDelay 100000
s <- getActorState el lref
s @?= stateFromList [("x", Integer 1)]

------------------------------------------------------------------------

testActor5 :: RemoteRef -> Message -> Actor
testActor5 rref (Message "go") = Actor $ do
p <- send rref (Message "hi")
testActor5 rref (InternalMessage "go") = Actor $ do
p <- send rref (InternalMessage "hi")
on p (\TimeoutR -> modify (add "x" 1))
return (Message "done")
return (InternalMessage "done")

unit_actorMapSendTimeout :: Assertion
unit_actorMapSendTimeout = do
let ev = eventLoopA "send_timeout"
withEventLoop ev $ \el h -> do
let rref = RemoteRef "doesnt_exist" 0
lref <- spawn el (testActor5 rref) (stateFromList [("x", Integer 0)])
_done <- ainvoke el lref (Message "go")
_done <- ainvoke el lref (InternalMessage "go")
-- Timeout happens after 60 seconds.
advanceFakeTime h 59
threadDelay 100000
Expand All @@ -149,40 +149,40 @@ unit_actorMapSendTimeout = do
------------------------------------------------------------------------

testActor6 :: Message -> Actor
testActor6 (Message "go") = Actor $ do
testActor6 (InternalMessage "go") = Actor $ do
d <- random
t <- getTime
return (Message (show d ++ " " ++ show t))
return (InternalMessage (show d ++ " " ++ show t))

unit_actorMapRandomAndTime :: Assertion
unit_actorMapRandomAndTime = do
let ev = eventLoopA "random_and_time"
withEventLoop ev $ \el h -> do
lref <- spawn el testActor6 emptyState
result <- ainvoke el lref (Message "go")
result @?= Message "0.9871468153391151 1970-01-01 00:00:00 UTC"
result <- ainvoke el lref (InternalMessage "go")
result @?= InternalMessage "0.9871468153391151 1970-01-01 00:00:00 UTC"
advanceFakeTime h 1
result2 <- ainvoke el lref (Message "go")
result2 @?= Message "6.761085639865827e-2 1970-01-01 00:00:01 UTC"
result2 <- ainvoke el lref (InternalMessage "go")
result2 @?= InternalMessage "6.761085639865827e-2 1970-01-01 00:00:01 UTC"

testActor7 :: Message -> Actor
testActor7 (Message "go") = Actor $ do
testActor7 (InternalMessage "go") = Actor $ do
p <- setTimer 10
on p (\TimerR -> modify (add "x" 1))
return (Message "done")
return (InternalMessage "done")

unit_actorMapTimer :: Assertion
unit_actorMapTimer = do
let ev = eventLoopA "timer"
withEventLoop ev $ \el h -> do
lref <- spawn el testActor7 (stateFromList [("x", Integer 0)])
_done <- ainvoke el lref (Message "go")
_done <- ainvoke el lref (InternalMessage "go")
-- Timer happens after 10 seconds.
advanceFakeTime h 9
threadDelay 10000
threadDelay 100000
s <- getActorState el lref
s @?= stateFromList [("x", Integer 0)]
advanceFakeTime h 1
threadDelay 10000
threadDelay 100000
s' <- getActorState el lref
s' @?= stateFromList [("x", Integer 1)]
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ unit_httpSendReceive = do
let port = 3001
url = "http://localhost:" ++ show port
catch (do t <- httpTransport port
let e = Envelope RequestKind (RemoteRef url 0) (Message "msg") (RemoteRef url 1) 0
let e = Envelope RequestKind (RemoteRef url 0) (InternalMessage "msg")
(RemoteRef url 1) 0
-- XXX: add better way to detect when http server is ready...
threadDelay 100000
a <- async (transportSend t e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import StuntDouble
unit_sendReceive :: IO ()
unit_sendReceive = do
t <- namedPipeTransport "/tmp" (EventLoopName "a")
let e = Envelope RequestKind (RemoteRef "from" 0) (Message "msg") (RemoteRef "a" 1) 0
let e = Envelope RequestKind (RemoteRef "from" 0) (InternalMessage "msg")
(RemoteRef "a" 1) 0
a <- async (transportSend t e)
e' <- transportReceive t
cancel a
Expand Down
Loading

0 comments on commit d1188a5

Please sign in to comment.