Skip to content

Commit

Permalink
feat(runtime): towards being able to handle client requests and respo…
Browse files Browse the repository at this point in the history
…nses
  • Loading branch information
symbiont-stevan-andjelkovic committed Jun 3, 2021
1 parent 92e7615 commit 849f2b2
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 8 deletions.
42 changes: 37 additions & 5 deletions src/runtime-prototype/src/StuntDouble/ActorMap.hs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ data ActorF x
| GetTime (UTCTime -> x)
| Random (Double -> x)
| SetTimer Time.NominalDiffTime (Promise -> x)
| ClientResponse ClientRef Message (() -> x)
-- XXX: Log?
-- XXX: Throw?
deriving instance Functor ActorF
Expand Down Expand Up @@ -101,6 +102,9 @@ random = Free (Random return)
setTimer :: Time.NominalDiffTime -> Free ActorF Promise
setTimer ndt = Free (SetTimer ndt return)

clientResponse :: ClientRef -> Message -> Free ActorF ()
clientResponse cref msg = Free (ClientResponse cref msg return)

------------------------------------------------------------------------

newtype ActorMap = ActorMap (Map LocalRef ActorData)
Expand Down Expand Up @@ -134,6 +138,7 @@ data Action
| AsyncIOAction (IO IOResult) Promise
| OnAction Promise (Resolution -> Free ActorF ()) LocalRef
| SetTimerAction Time.NominalDiffTime Promise
| ClientResponseAction ClientRef Message -- XXX: this doesn't really fit into action...

-- XXX: what about exceptions? transactional in state, but also in actions?!
actorMapTurn :: LocalRef -> Message -> Promise -> UTCTime -> Seed -> ActorMap
Expand Down Expand Up @@ -176,6 +181,8 @@ actorMapTurn' p acc lref t seed (Free op) am = case op of
actorMapTurn' p acc lref t seed' (k d) am
SetTimer ndt k ->
actorMapTurn' (p + 1) (SetTimerAction ndt p : acc) lref t seed (k p) am
ClientResponse cref msg k ->
actorMapTurn' p (ClientResponseAction cref msg : acc) lref t seed (k ()) am

actorMapPeek :: LocalRef -> Message -> Promise -> UTCTime -> Seed -> ActorMap
-> (Message, ActorMap)
Expand Down Expand Up @@ -263,6 +270,20 @@ asend ls rref msg = do
atomically (writeTBQueue (lsQueue ls) (Admin (AdminSend rref msg p returnVar)))
async (atomically (takeTMVar returnVar))

clientRequest :: EventLoop -> LocalRef -> Message -> IO (Message, Async Message)
clientRequest ls lref msg = do
cref <- atomically (stateTVar (lsNextPromise ls) (\p -> (ClientRef (unPromise p), p + 1)))
returnVar <- newEmptyTMVarIO
respVar <- newEmptyTMVarIO
-- XXX: assoc cref respVar ls
atomically (modifyTVar' (lsAsyncState ls)
(\s -> s { asyncStateClientResponses =
Map.insert cref respVar (asyncStateClientResponses s) }))
atomically (writeTBQueue (lsQueue ls) (ClientRequestEvent lref msg cref returnVar))
a <- async (atomically (takeTMVar respVar))
reply <- atomically (takeTMVar returnVar)
return (reply, a)

spawn :: EventLoop -> (Message -> Actor) -> State -> IO LocalRef
spawn ls a s = do
returnVar <- newEmptyTMVarIO
Expand All @@ -275,18 +296,19 @@ quit ls = atomically (writeTBQueue (lsQueue ls) (Admin Quit))
------------------------------------------------------------------------

data AsyncState = AsyncState
{ asyncStateAsyncIO :: Map (Async IOResult) Promise
, asyncStateContinuations :: Map Promise (Resolution -> Free ActorF (), LocalRef)
, asyncStateAdminSend :: Map Promise (TMVar Message)
, asyncStateTimeouts :: Heap (Entry UTCTime (TimeoutKind, Promise))
{ asyncStateAsyncIO :: Map (Async IOResult) Promise
, asyncStateContinuations :: Map Promise (Resolution -> Free ActorF (), LocalRef)
, asyncStateAdminSend :: Map Promise (TMVar Message)
, asyncStateTimeouts :: Heap (Entry UTCTime (TimeoutKind, Promise))
, asyncStateClientResponses :: Map ClientRef (TMVar Message)
}

data TimeoutKind
= SendTimeout
| TimerTimeout

emptyAsyncState :: AsyncState
emptyAsyncState = AsyncState Map.empty Map.empty Map.empty Heap.empty
emptyAsyncState = AsyncState Map.empty Map.empty Map.empty Heap.empty Map.empty

madePromises :: [Action] -> Set Int
madePromises = foldMap go
Expand All @@ -295,6 +317,7 @@ madePromises = foldMap go
go (AsyncIOAction _io (Promise i)) = Set.singleton i
go OnAction {} = Set.empty
go (SetTimerAction _ndt (Promise i)) = Set.singleton i
go ClientResponseAction {} = Set.empty

act :: EventLoopName -> [Action] -> Time -> Transport IO -> AsyncState -> IO AsyncState
act name as time transport s0 = foldM go s0 as
Expand Down Expand Up @@ -327,6 +350,11 @@ act name as time transport s0 = foldM go s0 as
let timeoutAfter = Time.addUTCTime ndt t
return s { asyncStateTimeouts =
Heap.insert (Entry timeoutAfter (TimerTimeout, p)) (asyncStateTimeouts s) }
go s (ClientResponseAction cref msg) = do
let respVar = asyncStateClientResponses s Map.! cref -- XXX: partial
atomically (putTMVar respVar msg)
return s
{ asyncStateClientResponses = Map.delete cref (asyncStateClientResponses s)}

data Reaction
= Receive Promise Envelope
Expand Down Expand Up @@ -385,6 +413,7 @@ data Event
= Action Action
| Reaction Reaction
| Admin Command
| ClientRequestEvent LocalRef Message ClientRef (TMVar Message)

data Command
= Spawn (Message -> Actor) State (TMVar LocalRef)
Expand Down Expand Up @@ -557,3 +586,6 @@ handleEvent (Admin cmd) ls = case cmd of
pids <- readTVarIO (lsPids ls)
threadDelay 100000
mapM_ cancel pids
handleEvent (ClientRequestEvent lref msg cref returnVar) ls = do
reply <- actorPokeIO ls lref msg -- XXX: cref needs to be fed in here...
atomically (putTMVar returnVar reply)
4 changes: 2 additions & 2 deletions src/runtime-prototype/src/StuntDouble/Log.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ newtype Log = Log [LogEntry]
data LogEntry
= Spawned LocalRef State
| Turn TurnData
| ClientRequest
| ClientResponse
| ClientRequestEntry
| ClientResponseEntry

data TurnData = TurnData
{ tdActor :: LocalRef
Expand Down
5 changes: 4 additions & 1 deletion src/runtime-prototype/src/StuntDouble/Reference.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import Data.String

------------------------------------------------------------------------

data LocalRef = LocalRef Int
newtype LocalRef = LocalRef Int
deriving (Eq, Ord, Show)

data RemoteRef = RemoteRef
Expand All @@ -23,3 +23,6 @@ remoteToLocalRef = LocalRef . index

newtype EventLoopName = EventLoopName { getEventLoopName :: String }
deriving (Eq, Ord, Show, IsString)

newtype ClientRef = ClientRef Int
deriving (Eq, Ord, Show)

0 comments on commit 849f2b2

Please sign in to comment.