From 65165b37375721fe2832bd2df39d288c3598d778 Mon Sep 17 00:00:00 2001 From: Stevan Andjelkovic Date: Fri, 1 Oct 2021 10:30:44 +0200 Subject: [PATCH] feat(runtime): add logging of received replies --- src/runtime-prototype/src/Scheduler.hs | 19 ++++---- .../src/StuntDouble/ActorMap.hs | 48 +++++++++++-------- .../src/StuntDouble/Envelope.hs | 14 ++++-- src/runtime-prototype/src/StuntDouble/Log.hs | 19 +++++--- .../src/StuntDouble/LogicalTime.hs | 19 ++++++-- src/runtime-prototype/src/StuntDouble/Time.hs | 4 +- 6 files changed, 77 insertions(+), 46 deletions(-) diff --git a/src/runtime-prototype/src/Scheduler.hs b/src/runtime-prototype/src/Scheduler.hs index b6987384..aa58533e 100644 --- a/src/runtime-prototype/src/Scheduler.hs +++ b/src/runtime-prototype/src/Scheduler.hs @@ -81,7 +81,7 @@ instance ParseRow Agenda where Left err -> error (show err) parseRow x = error (show x) --- echo "{\"tag\":\"InternalMessage'\",\"contents\":[\"CreateTest\",[{\"tag\":\"SInt\",\"contents\":0}]]}" | http POST :3005 +-- echo "{\"tag\":\"InternalMessage'\",\"contents\":[\"CreateTest\",[{\"tag\":\"SInt\",\"contents\":0}]]}" | http POST :3005 && echo "{\"tag\":\"InternalMessage'\",\"contents\":[\"Start\",[]]}" | http POST :3005 fakeScheduler :: RemoteRef -> Message -> Actor SchedulerState fakeScheduler executorRef (ClientRequest' "CreateTest" [SInt tid] cid) = Actor $ do @@ -174,9 +174,9 @@ fakeScheduler executorRef (ClientRequest' "Start" [] cid) = fakeScheduler _ msg = error (show msg) -- XXX: Avoid going to string, not sure if we should use bytestring or text though? -entryToData :: Int -> Int -> UTCTime -> Bool -> LogEntry -> String -entryToData slt rlt rst d (LogSend _from (InternalMessage msg) _to _timestamp) - = addField "sent-logical-time" (show slt) -- XXX: we cannot use _timestamp +entryToData :: Int -> Int -> UTCTime -> Bool -> Timestamped LogEntry -> String +entryToData slt rlt rst d (Timestamped (LogSend _from _to (InternalMessage msg)) _logicalTimestamp _t) + = addField "sent-logical-time" (show slt) -- XXX: we cannot use _logicalTimestamp -- here, because its when the event -- loop sent the message to the -- executor rather than what we @@ -203,12 +203,13 @@ executorCodec = Codec encode decode decode bs = case eitherDecode bs of Right (ExecutorResponse evs corrId) -> Right $ Envelope - { envelopeKind = ResponseKind - , envelopeSender = RemoteRef "executor" 0 + { envelopeKind = ResponseKind + , envelopeSender = RemoteRef "executor" 0 -- XXX: going to sdatatype here seems suboptimal... - , envelopeMessage = InternalMessage' "Events" (map toSDatatype evs) - , envelopeReceiver = RemoteRef "scheduler" 0 - , envelopeCorrelationId = corrId + , envelopeMessage = InternalMessage' "Events" (map toSDatatype evs) + , envelopeReceiver = RemoteRef "scheduler" 0 + , envelopeCorrelationId = corrId + , envelopeLogicalTimestamp = LogicalTimestamp "executor" (-1) } Left err -> error err diff --git a/src/runtime-prototype/src/StuntDouble/ActorMap.hs b/src/runtime-prototype/src/StuntDouble/ActorMap.hs index 94b6f79c..8b887c26 100644 --- a/src/runtime-prototype/src/StuntDouble/ActorMap.hs +++ b/src/runtime-prototype/src/StuntDouble/ActorMap.hs @@ -259,8 +259,8 @@ actorPokeIO ls lref msg = do in ((reply, as, p', seed', l'), am') -logEvent :: EventLoop -> LogEntry -> IO () -logEvent ls e = atomically (modifyTVar (lsLog ls) (\(Log es) -> Log (e : es))) +logEvent :: TVar Log -> LogEntry -> LogicalTimestamp -> Timestamp -> IO () +logEvent l e lt t = atomically (modifyTVar l (appendLog e lt t)) logDump :: EventLoop -> IO String logDump ls = do @@ -349,13 +349,15 @@ act ls as = mapM_ go as go :: Action -> IO () go (SendAction from msg to p@(Promise i)) = do + lt <- timestamp (lsLogicalTime ls) -- XXX: What do we do if `transportSend` fails here? We should probably -- call the failure handler/continuation for this promise, if it exists. -- If it doesn't exist we probably want to crash the sender, i.e. `from`. transportSend (lsTransport ls) - (Envelope RequestKind (localToRemoteRef (lsName ls) from) msg to (CorrelationId i)) - t <- timestamp (lsLogicalTime ls) - logEvent ls (LogSend from msg to t) + (Envelope RequestKind (localToRemoteRef (lsName ls) from) msg to (CorrelationId i) lt) + lt <- timestamp (lsLogicalTime ls) + t <- getCurrentTime (lsTime ls) + logEvent (lsLog ls) (LogSend from to msg) lt t t <- getCurrentTime (lsTime ls) -- XXX: make it possible to specify when a send request should timeout. let timeoutAfter = Time.addUTCTime 60 t @@ -401,46 +403,48 @@ data ReactTask | forall s. Typeable s => ResumeContinuation (Free (ActorF s) ()) LocalRef | AdminSendResponse (TMVar Message) Message -react :: Reaction -> AsyncState -> (ReactTask, AsyncState) +react :: Reaction -> AsyncState -> ((ReactTask, Maybe (TimestampedLogically LogEntry)), AsyncState) react (Receive p e) s = case envelopeKind e of - RequestKind -> (Request e, s) + RequestKind -> ((Request e, Nothing), s) ResponseKind -> case Map.lookup p (asyncStateContinuations s) of Just (ResolutionClosure k, lref) -> - (ResumeContinuation (k (InternalMessageR (envelopeMessage e))) lref, + ((ResumeContinuation (k (InternalMessageR (envelopeMessage e))) lref, + Just (TimestampedLogically (LogResumeContinuation (envelopeSender e) lref (envelopeMessage e)) + (envelopeLogicalTimestamp e))), s { asyncStateContinuations = Map.delete p (asyncStateContinuations s) }) Nothing -> case Map.lookup p (asyncStateAdminSend s) of Nothing -> -- We got a response for something we are not (longer) waiting for. - (NothingToDo, s) + ((NothingToDo, Nothing), s) Just returnVar -> - (AdminSendResponse returnVar (envelopeMessage e), + ((AdminSendResponse returnVar (envelopeMessage e), Nothing), s { asyncStateAdminSend = Map.delete p (asyncStateAdminSend s) }) -react (SendTimeoutReaction a lref) s = (ResumeContinuation a lref, s) +react (SendTimeoutReaction a lref) s = ((ResumeContinuation a lref, Nothing), s) react (AsyncIOFinished p result) s = case Map.lookup p (asyncStateContinuations s) of Nothing -> -- No continuation was registered for this async. - (NothingToDo, s) + ((NothingToDo, Nothing), s) Just (ResolutionClosure k, lref) -> - (ResumeContinuation (k (IOResultR result)) lref, + ((ResumeContinuation (k (IOResultR result)) lref, Nothing), s { asyncStateContinuations = Map.delete p (asyncStateContinuations s) }) react (AsyncIOFailed p exception) s = case Map.lookup p (asyncStateContinuations s) of Nothing -> -- No continuation was registered for this async. - (NothingToDo, s) + ((NothingToDo, Nothing), s) Just (ResolutionClosure k, lref) -> - (ResumeContinuation (k (ExceptionR exception)) lref, + ((ResumeContinuation (k (ExceptionR exception)) lref, Nothing), s { asyncStateContinuations = Map.delete p (asyncStateContinuations s) }) -reactIO :: Reaction -> TVar AsyncState -> IO ReactTask +reactIO :: Reaction -> TVar AsyncState -> IO (ReactTask, Maybe (TimestampedLogically LogEntry)) reactIO r v = atomically (stateTVar v (react r)) ------------------------------------------------------------------------ @@ -792,7 +796,13 @@ handleEvents1 ls = do handleEvent :: Event -> EventLoop -> IO () handleEvent (Action a) ls = act ls [a] handleEvent (Reaction r) ls = do - m <- reactIO r (lsAsyncState ls) + (m, mle) <- reactIO r (lsAsyncState ls) + now <- getCurrentTime (lsTime ls) + case mle of + Just (TimestampedLogically le lt) -> do + lt' <- update (lsLogicalTime ls) lt + logEvent (lsLog ls) le lt' now + Nothing -> return () case m of NothingToDo -> return () Request e -> do @@ -801,7 +811,6 @@ handleEvent (Reaction r) ls = do -- XXX: return more than reply, and log event transportSend (lsTransport ls) (replyEnvelope e reply) ResumeContinuation a lref -> do - now <- getCurrentTime (lsTime ls) as <- atomically $ do am <- readTVar (lsActorMap ls) p <- readTVar (lsNextPromise ls) @@ -826,8 +835,9 @@ handleEvent (Admin cmd) ls = case cmd of atomically (putTMVar returnVar reply) AdminSend rref msg p returnVar -> do let dummyAdminRef = localToRemoteRef (lsName ls) (LocalRef (-1)) + lt <- timestamp (lsLogicalTime ls) transportSend (lsTransport ls) - (Envelope RequestKind dummyAdminRef msg rref (CorrelationId (unPromise p))) + (Envelope RequestKind dummyAdminRef msg rref (CorrelationId (unPromise p)) lt) atomically (modifyTVar' (lsAsyncState ls) (\as -> as { asyncStateAdminSend = Map.insert p returnVar (asyncStateAdminSend as) })) diff --git a/src/runtime-prototype/src/StuntDouble/Envelope.hs b/src/runtime-prototype/src/StuntDouble/Envelope.hs index 4e236161..e7cb7170 100644 --- a/src/runtime-prototype/src/StuntDouble/Envelope.hs +++ b/src/runtime-prototype/src/StuntDouble/Envelope.hs @@ -10,6 +10,7 @@ import Control.Concurrent.Async import StuntDouble.Message import StuntDouble.Reference +import StuntDouble.LogicalTime ------------------------------------------------------------------------ @@ -29,11 +30,14 @@ instance ToJSON EnvelopeKind instance FromJSON EnvelopeKind data Envelope = Envelope - { envelopeKind :: EnvelopeKind - , envelopeSender :: RemoteRef - , envelopeMessage :: Message - , envelopeReceiver :: RemoteRef - , envelopeCorrelationId :: CorrelationId + { envelopeKind :: EnvelopeKind + , envelopeSender :: RemoteRef + , envelopeMessage :: Message + , envelopeReceiver :: RemoteRef + , envelopeCorrelationId :: CorrelationId + , envelopeLogicalTimestamp :: LogicalTimestamp -- XXX: we don't need to send + -- the NodeName part, only the + -- integer part over the wire... } deriving (Generic, Eq, Show, Read) diff --git a/src/runtime-prototype/src/StuntDouble/Log.hs b/src/runtime-prototype/src/StuntDouble/Log.hs index b7891085..21ed8f86 100644 --- a/src/runtime-prototype/src/StuntDouble/Log.hs +++ b/src/runtime-prototype/src/StuntDouble/Log.hs @@ -4,15 +4,23 @@ import Control.Exception import StuntDouble.Message import StuntDouble.Reference +import StuntDouble.Time import StuntDouble.LogicalTime ------------------------------------------------------------------------ -newtype Log = Log [LogEntry] +newtype Log = Log [Timestamped LogEntry] + deriving Show + +data Timestamped a = Timestamped a LogicalTimestamp Timestamp + deriving Show + +data TimestampedLogically a = TimestampedLogically a LogicalTimestamp deriving Show data LogEntry - = LogSend LocalRef Message RemoteRef LogicalTimestamp + = LogSend LocalRef RemoteRef Message + | LogResumeContinuation RemoteRef LocalRef Message deriving Show {- = Spawned LocalRef @@ -40,17 +48,14 @@ data LogLines = YYY emptyLog :: Log emptyLog = Log [] -appendLog :: LogEntry -> Log -> Log -appendLog e (Log es) = Log (e : es) +appendLog :: LogEntry -> LogicalTimestamp -> Timestamp -> Log -> Log +appendLog e lt t (Log es) = Log (Timestamped e lt t : es) -- XXX: Use more efficient data structure to avoid having to reverse. -- XXX: better serialisation than show... getLog :: Log -> String getLog (Log es) = show (Log (reverse es)) -foldMapLog :: Monoid m => (LogEntry -> m) -> Log -> m -foldMapLog f (Log es) = foldMap f (reverse es) - {- type EventLog = [LogEntry] diff --git a/src/runtime-prototype/src/StuntDouble/LogicalTime.hs b/src/runtime-prototype/src/StuntDouble/LogicalTime.hs index 80a2800c..ef05b7c1 100644 --- a/src/runtime-prototype/src/StuntDouble/LogicalTime.hs +++ b/src/runtime-prototype/src/StuntDouble/LogicalTime.hs @@ -1,22 +1,31 @@ {-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE DeriveGeneric #-} -- This module implements logical time via Lamport clocks, we don't need vector -- clocks because we can't have events happening concurrently anyway. module StuntDouble.LogicalTime where +import GHC.Generics (Generic) +import Data.Aeson (FromJSON, ToJSON) import Data.IORef import Data.String (IsString) ------------------------------------------------------------------------ newtype NodeName = NodeName String - deriving (Eq, Ord, IsString, Show) + deriving (Eq, Ord, IsString, Show, Read, Generic) + +instance ToJSON NodeName +instance FromJSON NodeName data LogicalTime = LogicalTime NodeName (IORef Int) data LogicalTimestamp = LogicalTimestamp NodeName Int - deriving Show + deriving (Show, Eq, Read, Generic) + +instance ToJSON LogicalTimestamp +instance FromJSON LogicalTimestamp data Relation = HappenedBeforeOrConcurrently | HappenedAfter @@ -38,6 +47,6 @@ timestamp (LogicalTime n c) = do return (LogicalTimestamp n t) -- Upon receving a timestamped message we should update our clock. -update :: LogicalTime -> LogicalTimestamp -> IO () -update (LogicalTime _n c) (LogicalTimestamp _n' t') = - atomicModifyIORef' c (\t -> (max t t' + 1, ())) +update :: LogicalTime -> LogicalTimestamp -> IO LogicalTimestamp +update (LogicalTime n c) (LogicalTimestamp _n' t') = + atomicModifyIORef' c (\t -> let t'' = max t t' + 1 in (t'', LogicalTimestamp n t'')) diff --git a/src/runtime-prototype/src/StuntDouble/Time.hs b/src/runtime-prototype/src/StuntDouble/Time.hs index 27b8daa0..c4a3ad15 100644 --- a/src/runtime-prototype/src/StuntDouble/Time.hs +++ b/src/runtime-prototype/src/StuntDouble/Time.hs @@ -6,8 +6,10 @@ import qualified Data.Time.Calendar.OrdinalDate as Time ------------------------------------------------------------------------ +type Timestamp = Time.UTCTime + data Time = Time - { getCurrentTime :: IO Time.UTCTime } + { getCurrentTime :: IO Timestamp } realTime :: Time realTime = Time Time.getCurrentTime