From 9084a00abfbb7c72e8d4c647d73563faa140671c Mon Sep 17 00:00:00 2001 From: Daniel Gustafsson Date: Mon, 14 Mar 2022 14:19:35 +0100 Subject: [PATCH] feat(sut): Add a `Logger` to dumblog --- src/sut/dumblog/bench/journal/Main.hs | 2 +- src/sut/dumblog/dumblog.cabal | 1 + src/sut/dumblog/src/Dumblog/Journal/Logger.hs | 26 +++++++++++++++++ src/sut/dumblog/src/Dumblog/Journal/Main.hs | 28 +++++++++++++------ .../src/Dumblog/Journal/StateMachine.hs | 14 +++++++--- src/sut/dumblog/src/Dumblog/Journal/Worker.hs | 8 ++++-- 6 files changed, 62 insertions(+), 17 deletions(-) create mode 100644 src/sut/dumblog/src/Dumblog/Journal/Logger.hs diff --git a/src/sut/dumblog/bench/journal/Main.hs b/src/sut/dumblog/bench/journal/Main.hs index 8b3e1e0c..30c719f5 100644 --- a/src/sut/dumblog/bench/journal/Main.hs +++ b/src/sut/dumblog/bench/journal/Main.hs @@ -12,4 +12,4 @@ main :: IO () main = do removePathForcibly dUMBLOG_JOURNAL removePathForcibly dUMBLOG_SNAPSHOT - commonMain "Journal" (journalDumblog Run bUFFER_CAPACITY pORT . Just) + commonMain "Journal" (journalDumblog quietRun bUFFER_CAPACITY pORT . Just) diff --git a/src/sut/dumblog/dumblog.cabal b/src/sut/dumblog/dumblog.cabal index aa3dfd37..8de59622 100644 --- a/src/sut/dumblog/dumblog.cabal +++ b/src/sut/dumblog/dumblog.cabal @@ -67,6 +67,7 @@ library Dumblog.Journal.Blocker Dumblog.Journal.Codec Dumblog.Journal.FrontEnd + Dumblog.Journal.Logger Dumblog.Journal.Main Dumblog.Journal.Metrics Dumblog.Journal.Snapshot diff --git a/src/sut/dumblog/src/Dumblog/Journal/Logger.hs b/src/sut/dumblog/src/Dumblog/Journal/Logger.hs new file mode 100644 index 00000000..f0e91413 --- /dev/null +++ b/src/sut/dumblog/src/Dumblog/Journal/Logger.hs @@ -0,0 +1,26 @@ +module Dumblog.Journal.Logger where + +import Data.Foldable +import Data.Int +import Data.IORef +import Data.Sequence + +type Logger = LogItem -> IO () + +nullLogger :: Logger +nullLogger = const (pure ()) + +ioLogger :: Logger +ioLogger = putStrLn + +type LogItem = String +newtype QueueLogger = QueueLogger (IORef (Seq LogItem)) + +newQueueLogger :: IO QueueLogger +newQueueLogger = QueueLogger <$> newIORef empty + +queueLogger :: QueueLogger -> Logger +queueLogger (QueueLogger ref) = \logItem -> atomicModifyIORef' ref $ \xs -> (xs |> logItem, ()) + +flushQueue :: QueueLogger -> IO [LogItem] +flushQueue (QueueLogger ref) = atomicModifyIORef' ref $ \xs -> (empty, toList xs) diff --git a/src/sut/dumblog/src/Dumblog/Journal/Main.hs b/src/sut/dumblog/src/Dumblog/Journal/Main.hs index 70946421..a8fa2183 100644 --- a/src/sut/dumblog/src/Dumblog/Journal/Main.hs +++ b/src/sut/dumblog/src/Dumblog/Journal/Main.hs @@ -35,6 +35,7 @@ import Options.Generic import Dumblog.Journal.Blocker (emptyBlocker) import Dumblog.Journal.Codec (Envelope(..), decode) import Dumblog.Journal.FrontEnd (FrontEndInfo(..), runFrontEnd) +import qualified Dumblog.Journal.Logger as DLogger import Dumblog.Journal.Metrics (dumblogSchema) import Dumblog.Journal.Snapshot (Snapshot) import qualified Dumblog.Journal.Snapshot as Snapshot @@ -69,21 +70,24 @@ replay [] s = do pure s replay (cmd:cmds) s = do putStrLn $ "[REPLAY] running: " <> show cmd - (s', _) <- runCommand s cmd + (s', _) <- runCommand DLogger.ioLogger s cmd replay cmds s' type DebugFile = Vector InstanceStateRepr -- TODO: merge with `replay` replayDebug :: [Command] -> InMemoryDumblog -> IO DebugFile -replayDebug = go 0 mempty +replayDebug originCommands originState = do + queueLogger <- DLogger.newQueueLogger + go queueLogger 0 mempty originCommands originState where - go _logTime dfile [] _s = do + go _ _logTime dfile [] _s = do putStrLn "[REPLAY-DEBUG] finished!" pure dfile - go logTime dfile (cmd:cmds) s = do + go logger logTime dfile (cmd:cmds) s = do putStrLn $ "[REPLAY-DEBUG] running: " <> show cmd - (s', _) <- runCommand s cmd + (s', _) <- runCommand (DLogger.queueLogger logger) s cmd + logLines <- DLogger.flushQueue logger let (ev, msg) = case cmd of Read i -> ("read", show i) @@ -98,10 +102,10 @@ replayDebug = go 0 mempty is = InstanceStateRepr { state = LText.unpack (LEncoding.decodeUtf8 (Aeson.encode (mergePatch (Aeson.toJSON s) (Aeson.toJSON s')))) , currentEvent = ce - , logs = [] + , logs = logLines , sent = [] } - go (succ logTime) (Vector.snoc dfile is) cmds s' + go logger (succ logTime) (Vector.snoc dfile is) cmds s' collectAll :: Journal -> IO [Command] collectAll jour = do @@ -123,6 +127,7 @@ startingState (Just snap) = Snapshot.ssState snap data DumblogConfig = Run + { quiet :: Bool "Should we suppress program log messages"} | DebugFile { output :: FilePath "Where to output the debug file" } @@ -130,6 +135,9 @@ data DumblogConfig instance ParseRecord DumblogConfig +quietRun :: DumblogConfig +quietRun = Run (Helpful True) + {- Unclear how to: * How to archive the journal @@ -158,7 +166,7 @@ journalDumblog cfg _capacity port mReady = do fps = dUMBLOG_SNAPSHOT untilSnapshot = 1000 case cfg of - Run -> do + Run q -> do mSnapshot <- Snapshot.readFile fps journal <- fetchJournal mSnapshot fpj dumblogOptions metrics <- Metrics.newMetrics dumblogSchema fpm @@ -168,7 +176,9 @@ journalDumblog cfg _capacity port mReady = do let events = length cmds feInfo = FrontEndInfo blocker - wInfo = WorkerInfo blocker fps events untilSnapshot + logger | unHelpful q = DLogger.nullLogger + | otherwise = DLogger.ioLogger + wInfo = WorkerInfo blocker logger fps events untilSnapshot withAsync (worker journal metrics wInfo workerState) $ \a -> do link a runFrontEnd port journal metrics feInfo mReady diff --git a/src/sut/dumblog/src/Dumblog/Journal/StateMachine.hs b/src/sut/dumblog/src/Dumblog/Journal/StateMachine.hs index 6f17cd83..20beef98 100644 --- a/src/sut/dumblog/src/Dumblog/Journal/StateMachine.hs +++ b/src/sut/dumblog/src/Dumblog/Journal/StateMachine.hs @@ -14,6 +14,7 @@ import Data.Text.Encoding (decodeUtf8) import Data.Sequence import GHC.Generics (Generic) +import Dumblog.Journal.Logger import Dumblog.Journal.Types import Dumblog.Journal.Metrics import Journal.Internal.Metrics (incrCounter) @@ -40,10 +41,15 @@ instance ToJSON InMemoryDumblog initState :: InMemoryDumblog initState = InMemoryDumblog empty 0 -runCommand :: InMemoryDumblog -> Command -> IO (InMemoryDumblog, Response) -runCommand state@(InMemoryDumblog appLog ix) cmd = case cmd of - Write bs -> pure (InMemoryDumblog (appLog |> DumblogByteString bs) (ix+1), LBS8.pack (show ix)) +runCommand :: Logger -> InMemoryDumblog -> Command -> IO (InMemoryDumblog, Response) +runCommand logger state@(InMemoryDumblog appLog ix) cmd = case cmd of + Write bs -> do + logger "Performing a write" + pure (InMemoryDumblog (appLog |> DumblogByteString bs) (ix+1), LBS8.pack (show ix)) Read i | i < ix -> pure (state, LBS.fromStrict $ innerByteString (index appLog i)) - | otherwise -> pure (state, "Transaction not in the store!") + | otherwise -> do + logger $ "Oh no, request not in log" + logger $ ("Max index is " ++ show (ix - 1)) + pure (state, "Transaction not in the store!") -- ^ XXX: we probably should really signal failure diff --git a/src/sut/dumblog/src/Dumblog/Journal/Worker.hs b/src/sut/dumblog/src/Dumblog/Journal/Worker.hs index 79a3d85c..56621293 100644 --- a/src/sut/dumblog/src/Dumblog/Journal/Worker.hs +++ b/src/sut/dumblog/src/Dumblog/Journal/Worker.hs @@ -18,6 +18,7 @@ import Journal.Types import Dumblog.Journal.Blocker import Dumblog.Journal.Codec +import Dumblog.Journal.Logger import Dumblog.Journal.Metrics import qualified Dumblog.Journal.Snapshot as Snapshot import Dumblog.Journal.StateMachine @@ -27,6 +28,7 @@ import Dumblog.Journal.Types data WorkerInfo = WorkerInfo { wiBlockers :: Blocker (Either Response Response) + , wiLogger :: Logger , wiSnapshotFile :: FilePath , wiEvents :: Int -- how many events since last snapshot , wiEventsInRound :: Int -- how many events in one snapshot @@ -40,12 +42,12 @@ wakeUpFrontend blocker key resp = do error $ "Frontend never added MVar" worker :: Journal -> DumblogMetrics -> WorkerInfo -> InMemoryDumblog -> IO () -worker journal metrics (WorkerInfo blocker snapshotFile eventCount untilSnapshot) = +worker journal metrics (WorkerInfo blocker logger snapshotFile eventCount untilSnapshot) = go eventCount where go ev s | ev >= untilSnapshot = do - putStrLn $ "[worker] Performing Snapshot" + logger "[worker] Performing Snapshot" bytes <- readBytesConsumed (jMetadata journal) Sub1 Snapshot.toFile (Snapshot.Snapshot bytes s) snapshotFile writeBytesConsumed (jMetadata journal) Sub2 bytes @@ -63,7 +65,7 @@ worker journal metrics (WorkerInfo blocker snapshotFile eventCount untilSnapshot -- -- ^ should be better error message -- !startTime <- getCurrentNanosSinceEpoch - (s', r) <- runCommand s cmd + (s', r) <- runCommand logger s cmd wakeUpFrontend blocker key (Right r) !endTime <- getCurrentNanosSinceEpoch -- Convert from nano s to µs with `* 10^-3`.