Skip to content

Commit

Permalink
fix(runtime): fix bug with put state not being persisted
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-stevan-andjelkovic committed Sep 10, 2021
1 parent 437ebd6 commit 1aec066
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 42 deletions.
24 changes: 16 additions & 8 deletions src/runtime-prototype/src/Scheduler.hs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ initState t s = SchedulerState
data Agenda = Agenda [SchedulerEvent]

instance ParseRow Agenda where
-- XXX: Text -> ByteString -> JSON, seems unnecessary? We only need the `at`
-- field for the heap priority, the rest could remain as a text and sent as
-- such to the executor?
parseRow [FText t] = case eitherDecodeStrict (Text.encodeUtf8 t) of
Right es -> Just (Agenda es)
Left err -> error (show err)
Expand All @@ -57,20 +60,26 @@ fakeScheduler executorRef (ClientRequest' "CreateTest" [SInt tid] cid) = Actor $
p <- asyncIO (IOQuery "SELECT agenda FROM test_info WHERE test_id = :tid" [":tid" := tid])
on p (\(IOResultR (IORows rs)) -> case parseRows rs of
Nothing -> clientResponse cid (InternalMessage "parse error")
Just [Agenda es] -> clientResponse cid (InternalMessage (show es)))
Just [Agenda es] -> do
modify $ \s -> s { heap = Heap.fromList (map (\e -> Entry (at e) e) es) }
clientResponse cid (InternalMessage (show es)))
return (InternalMessage "ok")
fakeScheduler executorRef (ClientRequest "Start" cid) = Actor $ do
fakeScheduler executorRef (ClientRequest' "Start" [] cid) = Actor $ do
-- pop agenda end send to executorRef
r <- Heap.uncons . heap <$> get
case r of
Just (Entry time cmd, heap') -> do
Just (Entry time e, heap') -> do
modify $ \s -> s { heap = heap'
, time = time
}
p <- send executorRef (InternalMessage (prettyCommand cmd))
on p (\(InternalMessageR (InternalMessage "Ack")) -> undefined)
undefined
Nothing -> return (InternalMessage "Done") -- XXX: reply to client id?!
clientResponse cid (InternalMessage (show e))
return (InternalMessage "ok")
-- p <- send executorRef (InternalMessage (prettyCommand cmd))
-- on p (\(InternalMessageR (InternalMessage "Ack")) -> undefined)
-- undefined
Nothing -> do
clientResponse cid (InternalMessage "empty heap")
return (InternalMessage "Done")
where
prettyCommand :: SchedulerEvent -> String
prettyCommand = undefined
Expand All @@ -79,7 +88,6 @@ fakeScheduler executorRef msg@(InternalMessage "Ack") = Actor $ do
-- does executor send back anything else?
-- schedule the responses from the executor back on the agenda

-- XXX: we need to make messages be able to have args/fields/payload
-- cmds <- parseCommands (payload msg)
-- if no cmds and agenda is empty then stop (how do we contact client? need to save cid?)
-- else
Expand Down
68 changes: 34 additions & 34 deletions src/runtime-prototype/src/StuntDouble/ActorMap.hs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ data ActorF s x
| Send RemoteRef Message (Promise -> x)
| AsyncIO IOOp (Promise -> x)
| Typeable s => On Promise (Resolution -> Free (ActorF s) ()) (() -> x)
| Get (s -> x)
| Put s (() -> x)
| Typeable s => Get (s -> x)
| Typeable s => Put s (() -> x)
| GetTime (UTCTime -> x)
| Random (Double -> x)
| SetTimer Time.NominalDiffTime (Promise -> x)
Expand All @@ -93,21 +93,15 @@ asyncIO io = Free (AsyncIO io return)
on :: Typeable s => Promise -> (Resolution -> Free (ActorF s) ()) -> Free (ActorF s) ()
on p k = Free (On p k return)

get :: Free (ActorF s) s
get :: Typeable s => Free (ActorF s) s
get = Free (Get return)

gets :: Text -> Free (ActorF s) Datatype
gets k = undefined -- getField k <$> get

put :: s -> Free (ActorF s) ()
put :: Typeable s => s -> Free (ActorF s) ()
put s' = Free (Put s' return)

modify :: (s -> s) -> Free (ActorF s) ()
modify :: Typeable s => (s -> s) -> Free (ActorF s) ()
modify f = put . f =<< get

update :: Text -> Datatype -> Free (ActorF s) ()
update k v = undefined -- modify (setField k v)

getTime :: Free (ActorF s) UTCTime
getTime = Free (GetTime return)

Expand Down Expand Up @@ -141,6 +135,14 @@ actorMapUnsafeLookup lref am = case actorMapLookup lref am of
Nothing -> error ("actorMapUnsafeLookup: `" ++ show lref ++ "' not in actor map.")
Just v -> v

actorMapUpdateState :: forall s. Typeable s => LocalRef -> s -> ActorMap -> ActorMap
actorMapUpdateState lref s' (ActorMap m) = ActorMap (Map.adjust f lref m)
where
f (ActorData a _s t) =
case cast s' of
Just s -> ActorData a s t
Nothing -> error "actorMapUpdateState: impossible, wrong type of state"

actorMapSpawn :: Typeable s => (Message -> Actor s) -> s -> Time -> ActorMap
-> (LocalRef, ActorMap)
actorMapSpawn a s t (ActorMap m) =
Expand All @@ -161,39 +163,40 @@ actorMapTurn :: LocalRef -> Message -> Promise -> UTCTime -> Seed -> ActorMap
-> ((Message, Promise, Seed, ActorMap, [Action]), ActorMap)
actorMapTurn lref msg p t seed am =
case actorMapUnsafeLookup lref am of
ActorData a s _ ->
(actorMapTurn' p [] lref t seed s (unActor (a msg)) am, am)
ActorData a _ _ ->
(actorMapTurn' p [] lref t seed (unActor (a msg)) am, am)

actorMapTurn' :: Promise -> [Action] -> LocalRef -> UTCTime -> Seed -> s -> Free (ActorF s) a
actorMapTurn' :: Promise -> [Action] -> LocalRef -> UTCTime -> Seed -> Free (ActorF s) a
-> ActorMap -> (a, Promise, Seed, ActorMap, [Action])
actorMapTurn' p acc _lref _t seed s (Pure msg) am = (msg, p, seed, am, reverse acc)
actorMapTurn' p acc lref t seed s (Free op) am = case op of
actorMapTurn' p acc _lref _t seed (Pure msg) am = (msg, p, seed, am, reverse acc)
actorMapTurn' p acc lref t seed (Free op) am = case op of
Invoke lref' msg k ->
case actorMapUnsafeLookup lref' am of
ActorData a' s' _ ->
ActorData a' _ _ ->
let (reply, p', seed', am', acc') =
actorMapTurn' p acc lref' t seed s' (unActor (a' msg)) am
actorMapTurn' p acc lref' t seed (unActor (a' msg)) am
in
actorMapTurn' p' acc' lref t seed' s (k reply) am'
actorMapTurn' p' acc' lref t seed' (k reply) am'
Send rref msg k ->
actorMapTurn' (p + 1) (SendAction lref msg rref p : acc) lref t seed s (k p) am
actorMapTurn' (p + 1) (SendAction lref msg rref p : acc) lref t seed (k p) am
AsyncIO io k ->
actorMapTurn' (p + 1) (AsyncIOAction io p : acc) lref t seed s (k p) am
actorMapTurn' (p + 1) (AsyncIOAction io p : acc) lref t seed (k p) am
On q c k ->
actorMapTurn' p (OnAction q c lref : acc) lref t seed s (k ()) am
Get k -> actorMapTurn' p acc lref t seed s (k s) am
Put s' k -> actorMapTurn' p acc lref t seed s' (k ()) am
actorMapTurn' p (OnAction q c lref : acc) lref t seed (k ()) am
Get k -> let s = fst (actorMapGetState lref am) in
actorMapTurn' p acc lref t seed (k s) am
Put s' k -> actorMapTurn' p acc lref t seed (k ()) (actorMapUpdateState lref s' am)
GetTime k -> do
actorMapTurn' p acc lref t seed s (k t) am
actorMapTurn' p acc lref t seed (k t) am
Random k ->
let
(d, seed') = interval seed
in
actorMapTurn' p acc lref t seed' s (k d) am
actorMapTurn' p acc lref t seed' (k d) am
SetTimer ndt k ->
actorMapTurn' (p + 1) (SetTimerAction ndt p : acc) lref t seed s (k p) am
actorMapTurn' (p + 1) (SetTimerAction ndt p : acc) lref t seed (k p) am
ClientResponse cref msg k ->
actorMapTurn' p (ClientResponseAction cref msg : acc) lref t seed s (k ()) am
actorMapTurn' p (ClientResponseAction cref msg : acc) lref t seed (k ()) am

actorMapPeek :: LocalRef -> Message -> Promise -> UTCTime -> Seed -> ActorMap
-> (Message, ActorMap)
Expand Down Expand Up @@ -257,11 +260,8 @@ actorPokeIO ls lref msg = do
act ls as
return reply

actorMapGetStateSTM :: Typeable s => LocalRef -> TVar ActorMap -> STM s
actorMapGetStateSTM lref am = stateTVar am (actorMapGetState lref)

actorMapGetStateIO :: Typeable s => LocalRef -> TVar ActorMap -> IO s
actorMapGetStateIO lref am = atomically (actorMapGetStateSTM lref am)
actorMapGetStateIO lref am = atomically (stateTVar am (actorMapGetState lref))

getActorState :: Typeable s => EventLoop -> LocalRef -> IO s
getActorState ls lref = actorMapGetStateIO lref (lsActorMap ls)
Expand Down Expand Up @@ -649,6 +649,7 @@ runHandlers seed0 hs = go seed0
(ix, seed') = randomR (0, length hss - 1) seed
in do
sequence_ (hss Vector.! ix)
threadDelay 100
go seed'

handleInbound :: EventLoop -> IO ()
Expand Down Expand Up @@ -772,8 +773,7 @@ handleEvent (Reaction r) ls = do
am <- readTVar (lsActorMap ls)
p <- readTVar (lsNextPromise ls)
seed <- readTVar (lsSeed ls)
s <- actorMapGetStateSTM lref (lsActorMap ls)
let ((), p', seed', am', as) = actorMapTurn' p [] lref now seed s a am
let ((), p', seed', am', as) = actorMapTurn' p [] lref now seed a am
writeTVar (lsActorMap ls) am'
writeTVar (lsNextPromise ls) p'
writeTVar (lsSeed ls) seed'
Expand Down

0 comments on commit 1aec066

Please sign in to comment.