Skip to content

Commit

Permalink
feat(runtime): add logging of received replies
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-stevan-andjelkovic committed Oct 1, 2021
1 parent 2232b9a commit 65165b3
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 46 deletions.
19 changes: 10 additions & 9 deletions src/runtime-prototype/src/Scheduler.hs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ instance ParseRow Agenda where
Left err -> error (show err)
parseRow x = error (show x)

-- echo "{\"tag\":\"InternalMessage'\",\"contents\":[\"CreateTest\",[{\"tag\":\"SInt\",\"contents\":0}]]}" | http POST :3005
-- echo "{\"tag\":\"InternalMessage'\",\"contents\":[\"CreateTest\",[{\"tag\":\"SInt\",\"contents\":0}]]}" | http POST :3005 && echo "{\"tag\":\"InternalMessage'\",\"contents\":[\"Start\",[]]}" | http POST :3005

fakeScheduler :: RemoteRef -> Message -> Actor SchedulerState
fakeScheduler executorRef (ClientRequest' "CreateTest" [SInt tid] cid) = Actor $ do
Expand Down Expand Up @@ -174,9 +174,9 @@ fakeScheduler executorRef (ClientRequest' "Start" [] cid) =
fakeScheduler _ msg = error (show msg)

-- XXX: Avoid going to string, not sure if we should use bytestring or text though?
entryToData :: Int -> Int -> UTCTime -> Bool -> LogEntry -> String
entryToData slt rlt rst d (LogSend _from (InternalMessage msg) _to _timestamp)
= addField "sent-logical-time" (show slt) -- XXX: we cannot use _timestamp
entryToData :: Int -> Int -> UTCTime -> Bool -> Timestamped LogEntry -> String
entryToData slt rlt rst d (Timestamped (LogSend _from _to (InternalMessage msg)) _logicalTimestamp _t)
= addField "sent-logical-time" (show slt) -- XXX: we cannot use _logicalTimestamp
-- here, because its when the event
-- loop sent the message to the
-- executor rather than what we
Expand All @@ -203,12 +203,13 @@ executorCodec = Codec encode decode
decode bs = case eitherDecode bs of
Right (ExecutorResponse evs corrId) -> Right $
Envelope
{ envelopeKind = ResponseKind
, envelopeSender = RemoteRef "executor" 0
{ envelopeKind = ResponseKind
, envelopeSender = RemoteRef "executor" 0
-- XXX: going to sdatatype here seems suboptimal...
, envelopeMessage = InternalMessage' "Events" (map toSDatatype evs)
, envelopeReceiver = RemoteRef "scheduler" 0
, envelopeCorrelationId = corrId
, envelopeMessage = InternalMessage' "Events" (map toSDatatype evs)
, envelopeReceiver = RemoteRef "scheduler" 0
, envelopeCorrelationId = corrId
, envelopeLogicalTimestamp = LogicalTimestamp "executor" (-1)
}
Left err -> error err

Expand Down
48 changes: 29 additions & 19 deletions src/runtime-prototype/src/StuntDouble/ActorMap.hs
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,8 @@ actorPokeIO ls lref msg = do
in
((reply, as, p', seed', l'), am')

logEvent :: EventLoop -> LogEntry -> IO ()
logEvent ls e = atomically (modifyTVar (lsLog ls) (\(Log es) -> Log (e : es)))
logEvent :: TVar Log -> LogEntry -> LogicalTimestamp -> Timestamp -> IO ()
logEvent l e lt t = atomically (modifyTVar l (appendLog e lt t))

logDump :: EventLoop -> IO String
logDump ls = do
Expand Down Expand Up @@ -349,13 +349,15 @@ act ls as = mapM_ go as

go :: Action -> IO ()
go (SendAction from msg to p@(Promise i)) = do
lt <- timestamp (lsLogicalTime ls)
-- XXX: What do we do if `transportSend` fails here? We should probably
-- call the failure handler/continuation for this promise, if it exists.
-- If it doesn't exist we probably want to crash the sender, i.e. `from`.
transportSend (lsTransport ls)
(Envelope RequestKind (localToRemoteRef (lsName ls) from) msg to (CorrelationId i))
t <- timestamp (lsLogicalTime ls)
logEvent ls (LogSend from msg to t)
(Envelope RequestKind (localToRemoteRef (lsName ls) from) msg to (CorrelationId i) lt)
lt <- timestamp (lsLogicalTime ls)
t <- getCurrentTime (lsTime ls)
logEvent (lsLog ls) (LogSend from to msg) lt t
t <- getCurrentTime (lsTime ls)
-- XXX: make it possible to specify when a send request should timeout.
let timeoutAfter = Time.addUTCTime 60 t
Expand Down Expand Up @@ -401,46 +403,48 @@ data ReactTask
| forall s. Typeable s => ResumeContinuation (Free (ActorF s) ()) LocalRef
| AdminSendResponse (TMVar Message) Message

react :: Reaction -> AsyncState -> (ReactTask, AsyncState)
react :: Reaction -> AsyncState -> ((ReactTask, Maybe (TimestampedLogically LogEntry)), AsyncState)
react (Receive p e) s =
case envelopeKind e of
RequestKind -> (Request e, s)
RequestKind -> ((Request e, Nothing), s)
ResponseKind ->
case Map.lookup p (asyncStateContinuations s) of
Just (ResolutionClosure k, lref) ->
(ResumeContinuation (k (InternalMessageR (envelopeMessage e))) lref,
((ResumeContinuation (k (InternalMessageR (envelopeMessage e))) lref,
Just (TimestampedLogically (LogResumeContinuation (envelopeSender e) lref (envelopeMessage e))
(envelopeLogicalTimestamp e))),
s { asyncStateContinuations =
Map.delete p (asyncStateContinuations s) })
Nothing ->
case Map.lookup p (asyncStateAdminSend s) of
Nothing ->
-- We got a response for something we are not (longer) waiting for.
(NothingToDo, s)
((NothingToDo, Nothing), s)
Just returnVar ->
(AdminSendResponse returnVar (envelopeMessage e),
((AdminSendResponse returnVar (envelopeMessage e), Nothing),
s { asyncStateAdminSend =
Map.delete p (asyncStateAdminSend s) })
react (SendTimeoutReaction a lref) s = (ResumeContinuation a lref, s)
react (SendTimeoutReaction a lref) s = ((ResumeContinuation a lref, Nothing), s)
react (AsyncIOFinished p result) s =
case Map.lookup p (asyncStateContinuations s) of
Nothing ->
-- No continuation was registered for this async.
(NothingToDo, s)
((NothingToDo, Nothing), s)
Just (ResolutionClosure k, lref) ->
(ResumeContinuation (k (IOResultR result)) lref,
((ResumeContinuation (k (IOResultR result)) lref, Nothing),
s { asyncStateContinuations =
Map.delete p (asyncStateContinuations s) })
react (AsyncIOFailed p exception) s =
case Map.lookup p (asyncStateContinuations s) of
Nothing ->
-- No continuation was registered for this async.
(NothingToDo, s)
((NothingToDo, Nothing), s)
Just (ResolutionClosure k, lref) ->
(ResumeContinuation (k (ExceptionR exception)) lref,
((ResumeContinuation (k (ExceptionR exception)) lref, Nothing),
s { asyncStateContinuations =
Map.delete p (asyncStateContinuations s) })

reactIO :: Reaction -> TVar AsyncState -> IO ReactTask
reactIO :: Reaction -> TVar AsyncState -> IO (ReactTask, Maybe (TimestampedLogically LogEntry))
reactIO r v = atomically (stateTVar v (react r))

------------------------------------------------------------------------
Expand Down Expand Up @@ -792,7 +796,13 @@ handleEvents1 ls = do
handleEvent :: Event -> EventLoop -> IO ()
handleEvent (Action a) ls = act ls [a]
handleEvent (Reaction r) ls = do
m <- reactIO r (lsAsyncState ls)
(m, mle) <- reactIO r (lsAsyncState ls)
now <- getCurrentTime (lsTime ls)
case mle of
Just (TimestampedLogically le lt) -> do
lt' <- update (lsLogicalTime ls) lt
logEvent (lsLog ls) le lt' now
Nothing -> return ()
case m of
NothingToDo -> return ()
Request e -> do
Expand All @@ -801,7 +811,6 @@ handleEvent (Reaction r) ls = do
-- XXX: return more than reply, and log event
transportSend (lsTransport ls) (replyEnvelope e reply)
ResumeContinuation a lref -> do
now <- getCurrentTime (lsTime ls)
as <- atomically $ do
am <- readTVar (lsActorMap ls)
p <- readTVar (lsNextPromise ls)
Expand All @@ -826,8 +835,9 @@ handleEvent (Admin cmd) ls = case cmd of
atomically (putTMVar returnVar reply)
AdminSend rref msg p returnVar -> do
let dummyAdminRef = localToRemoteRef (lsName ls) (LocalRef (-1))
lt <- timestamp (lsLogicalTime ls)
transportSend (lsTransport ls)
(Envelope RequestKind dummyAdminRef msg rref (CorrelationId (unPromise p)))
(Envelope RequestKind dummyAdminRef msg rref (CorrelationId (unPromise p)) lt)
atomically (modifyTVar' (lsAsyncState ls)
(\as -> as { asyncStateAdminSend =
Map.insert p returnVar (asyncStateAdminSend as) }))
Expand Down
14 changes: 9 additions & 5 deletions src/runtime-prototype/src/StuntDouble/Envelope.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import Control.Concurrent.Async

import StuntDouble.Message
import StuntDouble.Reference
import StuntDouble.LogicalTime

------------------------------------------------------------------------

Expand All @@ -29,11 +30,14 @@ instance ToJSON EnvelopeKind
instance FromJSON EnvelopeKind

data Envelope = Envelope
{ envelopeKind :: EnvelopeKind
, envelopeSender :: RemoteRef
, envelopeMessage :: Message
, envelopeReceiver :: RemoteRef
, envelopeCorrelationId :: CorrelationId
{ envelopeKind :: EnvelopeKind
, envelopeSender :: RemoteRef
, envelopeMessage :: Message
, envelopeReceiver :: RemoteRef
, envelopeCorrelationId :: CorrelationId
, envelopeLogicalTimestamp :: LogicalTimestamp -- XXX: we don't need to send
-- the NodeName part, only the
-- integer part over the wire...
}
deriving (Generic, Eq, Show, Read)

Expand Down
19 changes: 12 additions & 7 deletions src/runtime-prototype/src/StuntDouble/Log.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,23 @@ import Control.Exception

import StuntDouble.Message
import StuntDouble.Reference
import StuntDouble.Time
import StuntDouble.LogicalTime

------------------------------------------------------------------------

newtype Log = Log [LogEntry]
newtype Log = Log [Timestamped LogEntry]
deriving Show

data Timestamped a = Timestamped a LogicalTimestamp Timestamp
deriving Show

data TimestampedLogically a = TimestampedLogically a LogicalTimestamp
deriving Show

data LogEntry
= LogSend LocalRef Message RemoteRef LogicalTimestamp
= LogSend LocalRef RemoteRef Message
| LogResumeContinuation RemoteRef LocalRef Message
deriving Show
{-
= Spawned LocalRef
Expand Down Expand Up @@ -40,17 +48,14 @@ data LogLines = YYY
emptyLog :: Log
emptyLog = Log []

appendLog :: LogEntry -> Log -> Log
appendLog e (Log es) = Log (e : es)
appendLog :: LogEntry -> LogicalTimestamp -> Timestamp -> Log -> Log
appendLog e lt t (Log es) = Log (Timestamped e lt t : es)

-- XXX: Use more efficient data structure to avoid having to reverse.
-- XXX: better serialisation than show...
getLog :: Log -> String
getLog (Log es) = show (Log (reverse es))

foldMapLog :: Monoid m => (LogEntry -> m) -> Log -> m
foldMapLog f (Log es) = foldMap f (reverse es)

{-
type EventLog = [LogEntry]
Expand Down
19 changes: 14 additions & 5 deletions src/runtime-prototype/src/StuntDouble/LogicalTime.hs
Original file line number Diff line number Diff line change
@@ -1,22 +1,31 @@
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE DeriveGeneric #-}

-- This module implements logical time via Lamport clocks, we don't need vector
-- clocks because we can't have events happening concurrently anyway.

module StuntDouble.LogicalTime where

import GHC.Generics (Generic)
import Data.Aeson (FromJSON, ToJSON)
import Data.IORef
import Data.String (IsString)

------------------------------------------------------------------------

newtype NodeName = NodeName String
deriving (Eq, Ord, IsString, Show)
deriving (Eq, Ord, IsString, Show, Read, Generic)

instance ToJSON NodeName
instance FromJSON NodeName

data LogicalTime = LogicalTime NodeName (IORef Int)

data LogicalTimestamp = LogicalTimestamp NodeName Int
deriving Show
deriving (Show, Eq, Read, Generic)

instance ToJSON LogicalTimestamp
instance FromJSON LogicalTimestamp

data Relation = HappenedBeforeOrConcurrently | HappenedAfter

Expand All @@ -38,6 +47,6 @@ timestamp (LogicalTime n c) = do
return (LogicalTimestamp n t)

-- Upon receving a timestamped message we should update our clock.
update :: LogicalTime -> LogicalTimestamp -> IO ()
update (LogicalTime _n c) (LogicalTimestamp _n' t') =
atomicModifyIORef' c (\t -> (max t t' + 1, ()))
update :: LogicalTime -> LogicalTimestamp -> IO LogicalTimestamp
update (LogicalTime n c) (LogicalTimestamp _n' t') =
atomicModifyIORef' c (\t -> let t'' = max t t' + 1 in (t'', LogicalTimestamp n t''))
4 changes: 3 additions & 1 deletion src/runtime-prototype/src/StuntDouble/Time.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import qualified Data.Time.Calendar.OrdinalDate as Time

------------------------------------------------------------------------

type Timestamp = Time.UTCTime

data Time = Time
{ getCurrentTime :: IO Time.UTCTime }
{ getCurrentTime :: IO Timestamp }

realTime :: Time
realTime = Time Time.getCurrentTime
Expand Down

0 comments on commit 65165b3

Please sign in to comment.