Skip to content

Commit

Permalink
feat(sut): Dumblog add versions in the journal of which state-machine…
Browse files Browse the repository at this point in the history
… to run
  • Loading branch information
symbiont-daniel-gustafsson committed Mar 17, 2022
1 parent 0a248a3 commit a00ca8b
Show file tree
Hide file tree
Showing 9 changed files with 53 additions and 32 deletions.
1 change: 1 addition & 0 deletions src/sut/dumblog/dumblog.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ library
Dumblog.Journal.Snapshot
Dumblog.Journal.StateMachine
Dumblog.Journal.Types
Dumblog.Journal.Versions.Codec
Dumblog.Journal.Worker
Dumblog.Metrics.Main
Dumblog.SQLite.Command
Expand Down
1 change: 1 addition & 0 deletions src/sut/dumblog/src/Dumblog/Journal/Codec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import GHC.Generics (Generic)
data Envelope a = Envelope
{ eLength :: !Int
, eContent :: !a
, eVersion :: !Int64
, eArrival :: !Int64 -- Nano seconds since epoch.
} deriving stock (Functor, Generic)

Expand Down
7 changes: 4 additions & 3 deletions src/sut/dumblog/src/Dumblog/Journal/FrontEnd.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
module Dumblog.Journal.FrontEnd where

import Control.Concurrent.MVar (MVar, putMVar)
import Control.Monad (when)
import qualified Data.ByteString.Lazy as LBS
import qualified Data.ByteString.Lazy.Char8 as LBS8
import Data.Int (Int64)
import qualified Data.Text as Text
import qualified Data.Text.Encoding as Text
import qualified Data.Text.Read as TextReader
Expand All @@ -29,10 +29,11 @@ import Dumblog.Journal.Types

data FrontEndInfo = FrontEndInfo
{ blockers :: Blocker (Either Response Response)
, currentVersion :: Int64
}

httpFrontend :: Journal -> DumblogMetrics -> FrontEndInfo -> Wai.Application
httpFrontend journal metrics (FrontEndInfo blocker) req respond = do
httpFrontend journal metrics (FrontEndInfo blocker cVersion) req respond = do
body <- Wai.strictRequestBody req
let mmethod = case parseMethod $ Wai.requestMethod req of
Left err -> Left $ LBS.fromStrict err
Expand All @@ -54,7 +55,7 @@ httpFrontend journal metrics (FrontEndInfo blocker) req respond = do
Right cmd -> do
key <- newKey blocker
now <- getCurrentNanosSinceEpoch
let env = encode (Envelope (sequenceNumber key) cmd now)
let env = encode (Envelope (sequenceNumber key) cmd cVersion now)
res <- Journal.appendBS journal env
res' <- case res of
Left err -> do
Expand Down
1 change: 0 additions & 1 deletion src/sut/dumblog/src/Dumblog/Journal/Logger.hs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
module Dumblog.Journal.Logger where

import Data.Foldable
import Data.Int
import Data.IORef
import Data.Sequence

Expand Down
30 changes: 17 additions & 13 deletions src/sut/dumblog/src/Dumblog/Journal/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import Control.Concurrent.Async (link, withAsync)
import Control.Concurrent.MVar (MVar)
import Control.Exception (bracket_)
import qualified Data.Aeson as Aeson
import Data.Int (Int64)
import qualified Data.Text as Text
import Data.Text.Encoding (decodeUtf8)
import qualified Data.Text.Lazy as LText
Expand Down Expand Up @@ -55,8 +56,9 @@ import qualified Dumblog.Journal.Logger as DLogger
import Dumblog.Journal.Snapshot (Snapshot)
import qualified Dumblog.Journal.Snapshot as Snapshot
import Dumblog.Journal.StateMachine
(InMemoryDumblog, initState, runCommand)
(InMemoryDumblog, initState)
import Dumblog.Journal.Types (Command(..))
import Dumblog.Journal.Versions.Codec (runCommand)
import Dumblog.Journal.Worker (WorkerInfo(..), worker)

------------------------------------------------------------------------
Expand All @@ -78,30 +80,29 @@ fetchJournal mSnapshot fpj opts = do
writeBytesConsumed (jMetadata journal) Sub1 bytes
pure journal

-- this can be pure when `runCommand` gets pure
replay :: [Command] -> InMemoryDumblog -> IO InMemoryDumblog
replay :: [(Int64, Command)] -> InMemoryDumblog -> IO InMemoryDumblog
replay [] s = do
putStrLn "[REPLAY] finished!"
pure s
replay (cmd:cmds) s = do
replay ((v, cmd):cmds) s = do
putStrLn $ "[REPLAY] running: " <> show cmd
(s', _) <- runCommand DLogger.ioLogger s cmd
(s', _) <- runCommand v DLogger.ioLogger s cmd
replay cmds s'

type DebugFile = Vector InstanceStateRepr

-- TODO: merge with `replay`
replayDebug :: [Command] -> InMemoryDumblog -> IO DebugFile
replayDebug :: [(Int64, Command)] -> InMemoryDumblog -> IO DebugFile
replayDebug originCommands originState = do
queueLogger <- DLogger.newQueueLogger
go queueLogger 0 mempty originCommands originState
where
go _ _logTime dfile [] _s = do
putStrLn "[REPLAY-DEBUG] finished!"
pure dfile
go logger logTime dfile (cmd:cmds) s = do
go logger logTime dfile ((v, cmd):cmds) s = do
putStrLn $ "[REPLAY-DEBUG] running: " <> show cmd
(s', r) <- runCommand (DLogger.queueLogger logger) s cmd
(s', r) <- runCommand v (DLogger.queueLogger logger) s cmd
logLines <- DLogger.flushQueue logger
let
lbsToString = LText.unpack . LEncoding.decodeUtf8
Expand Down Expand Up @@ -130,7 +131,7 @@ replayDebug originCommands originState = do
}
go logger (succ logTime) (Vector.snoc dfile is) cmds s'

collectAll :: Journal -> IO [Command]
collectAll :: Journal -> IO [(Int64, Command)]
collectAll jour = do
putStrLn "[collect] Checking journal for old-entries"
val <- Journal.readJournal jour Sub1
Expand All @@ -140,9 +141,9 @@ collectAll jour = do
pure []
Just entry -> do
putStrLn "[collect] Found an entry"
let Envelope _key cmd _arrivalTime = decode entry
let Envelope _key cmd version _arrivalTime = decode entry
cmds <- collectAll jour
pure $ cmd : cmds
pure $ (version, cmd) : cmds

startingState :: Maybe Snapshot -> InMemoryDumblog
startingState Nothing = initState
Expand Down Expand Up @@ -182,6 +183,9 @@ dUMBLOG_JOURNAL = "/tmp/dumblog.journal"
dUMBLOG_SNAPSHOT :: FilePath
dUMBLOG_SNAPSHOT = "/tmp/dumblog.snapshot"

dUMBLOG_CURRENT_VERSION :: Int64
dUMBLOG_CURRENT_VERSION = 1 -- 1 has bug, 2 fixes it

journalDumblog :: DumblogConfig -> Int -> Int -> Maybe (MVar ()) -> IO ()
journalDumblog cfg _capacity port mReady = do
let fpj = dUMBLOG_JOURNAL
Expand All @@ -198,10 +202,10 @@ journalDumblog cfg _capacity port mReady = do
workerState <- replay cmds (startingState mSnapshot)
let
events = length cmds
feInfo = FrontEndInfo blocker
feInfo = FrontEndInfo blocker dUMBLOG_CURRENT_VERSION
logger | unHelpful q = DLogger.nullLogger
| otherwise = DLogger.ioLogger
wInfo = WorkerInfo blocker logger fps events untilSnapshot
wInfo = WorkerInfo blocker logger fps dUMBLOG_CURRENT_VERSION events untilSnapshot
withAsync (worker journal metrics wInfo workerState) $ \a -> do
link a
runFrontEnd port journal metrics feInfo mReady
Expand Down
2 changes: 2 additions & 0 deletions src/sut/dumblog/src/Dumblog/Journal/Snapshot.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ module Dumblog.Journal.Snapshot where
import Data.Binary (Binary)
import qualified Data.Binary as Binary
import qualified Data.ByteString.Lazy as LBS
import Data.Int (Int64)
import GHC.Generics (Generic)

import qualified System.Directory as Dir
Expand All @@ -14,6 +15,7 @@ import Dumblog.Journal.StateMachine

data Snapshot = Snapshot
{ ssBytesInJournal :: Int
, ssVersion :: Int64
, ssState :: InMemoryDumblog
} deriving Generic

Expand Down
15 changes: 6 additions & 9 deletions src/sut/dumblog/src/Dumblog/Journal/StateMachine.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,12 @@ import Data.Binary (Binary)
import Data.ByteString (ByteString)
import qualified Data.ByteString.Lazy as LBS
import qualified Data.ByteString.Lazy.Char8 as LBS8
import Data.Text.Encoding (decodeUtf8)
import Data.TreeDiff (ToExpr)
import Data.Sequence
import GHC.Generics (Generic)

import Dumblog.Journal.Logger
import Dumblog.Journal.Types
import Journal.Internal.Metrics (incrCounter)

------------------------------------------------------------------------

Expand All @@ -24,25 +22,24 @@ import Journal.Internal.Metrics (incrCounter)
data InMemoryDumblog = InMemoryDumblog
{ theLog :: Seq ByteString -- not very memory efficient, but not the point
, nextIx :: Int
, hasBug :: Bool
} deriving stock Generic

instance Binary InMemoryDumblog
instance ToExpr InMemoryDumblog

initState :: InMemoryDumblog
initState = InMemoryDumblog empty 0 False
initState = InMemoryDumblog empty 0

runCommand :: Logger -> InMemoryDumblog -> Command -> IO (InMemoryDumblog, Response)
runCommand logger state@(InMemoryDumblog appLog ix hasBug) cmd = case cmd of
runCommand :: Bool -> Logger -> InMemoryDumblog -> Command -> IO (InMemoryDumblog, Response)
runCommand hasBug logger state@(InMemoryDumblog appLog ix) cmd = case cmd of
Write bs -> do
logger "Performing a write"
pure (InMemoryDumblog (appLog |> {- DumblogByteString-} bs) (ix+1) hasBug, LBS8.pack (show ix))
pure (InMemoryDumblog (appLog |> bs) (ix+1), LBS8.pack (show ix))
Read i
| hasBug && ix == 3 -> do
logger "Weird reset happend"
pure (InMemoryDumblog empty 0 hasBug, LBS8.pack "Dumblog!")
| i < ix -> pure (state, LBS.fromStrict $ {-innerByteString-} (index appLog i))
pure (InMemoryDumblog empty 0, LBS8.pack "Dumblog!")
| i < ix -> pure (state, LBS.fromStrict (index appLog i))
| otherwise -> do
logger $ "Oh no, request not in log"
logger $ ("Max index is " ++ show (ix - 1))
Expand Down
14 changes: 14 additions & 0 deletions src/sut/dumblog/src/Dumblog/Journal/Versions/Codec.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
module Dumblog.Journal.Versions.Codec where

import Data.Int(Int64)

import Dumblog.Journal.Logger (Logger)
import Dumblog.Journal.Types (Command, Response)
import Dumblog.Journal.StateMachine(InMemoryDumblog)
import qualified Dumblog.Journal.StateMachine as SM

runCommand :: Int64 -> Logger -> InMemoryDumblog -> Command -> IO (InMemoryDumblog, Response)
runCommand 1 = SM.runCommand True
-- this line will be added in demo!
-- runCommand 2 = SM.runCommand False
runCommand v = \_ _ cmd -> error ("Don't know how to run the command: " <> show cmd <> " in version: " <> show v)
14 changes: 8 additions & 6 deletions src/sut/dumblog/src/Dumblog/Journal/Worker.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ module Dumblog.Journal.Worker where

import Control.Concurrent (threadDelay)
import Control.Monad (unless)

import qualified Data.ByteString as BS
import Data.Int (Int64)
import qualified Journal.Internal.Metrics as Metrics
import qualified Journal.MP as Journal
import Journal.Types
Expand All @@ -22,15 +22,17 @@ import Dumblog.Journal.Blocker
import Dumblog.Journal.Codec
import Dumblog.Journal.Logger
import qualified Dumblog.Journal.Snapshot as Snapshot
import Dumblog.Journal.StateMachine
import Dumblog.Journal.StateMachine hiding (runCommand)
import Dumblog.Journal.Types
import Dumblog.Journal.Versions.Codec (runCommand)

------------------------------------------------------------------------

data WorkerInfo = WorkerInfo
{ wiBlockers :: Blocker (Either Response Response)
, wiLogger :: Logger
, wiSnapshotFile :: FilePath
, wiCurrentVersion :: Int64
, wiEvents :: Int -- how many events since last snapshot
, wiEventsInRound :: Int -- how many events in one snapshot
}
Expand All @@ -43,14 +45,14 @@ wakeUpFrontend blocker key resp = do
error $ "Frontend never added MVar"

worker :: Journal -> DumblogMetrics -> WorkerInfo -> InMemoryDumblog -> IO ()
worker journal metrics (WorkerInfo blocker logger snapshotFile eventCount untilSnapshot) =
worker journal metrics (WorkerInfo blocker logger snapshotFile currentVersion eventCount untilSnapshot) =
go eventCount
where
go ev s
| ev >= untilSnapshot = do
logger "[worker] Performing Snapshot"
bytes <- readBytesConsumed (jMetadata journal) Sub1
Snapshot.toFile (Snapshot.Snapshot bytes s) snapshotFile
Snapshot.toFile (Snapshot.Snapshot bytes currentVersion s) snapshotFile
writeBytesConsumed (jMetadata journal) Sub2 bytes
go 0 s
go ev s = do
Expand All @@ -59,14 +61,14 @@ worker journal metrics (WorkerInfo blocker logger snapshotFile eventCount untilS
{ Nothing -> return (ev, s)
; Just entry -> do
Metrics.decrCounter_ metrics QueueDepth 1
let Envelope key cmd arrivalTime = decode entry
let Envelope key cmd version arrivalTime = decode entry
-- XXX: In case of decode error:
-- Metrics.incrCounter metrics ErrorsEncountered 1
-- wakeUpFrontend blocker key $ Left "Couldn't parse request"
-- -- ^ should be better error message
--
!startTime <- getCurrentNanosSinceEpoch
(s', r) <- runCommand logger s cmd
(s', r) <- runCommand version logger s cmd
wakeUpFrontend blocker key (Right r)
!endTime <- getCurrentNanosSinceEpoch
-- Convert from nano s to µs with `* 10^-3`.
Expand Down

0 comments on commit a00ca8b

Please sign in to comment.