Skip to content

Commit

Permalink
feat(sut): parametrise paths by port
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-stevan-andjelkovic committed Mar 25, 2022
1 parent 06827f2 commit 0f13e90
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 42 deletions.
4 changes: 2 additions & 2 deletions src/sut/dumblog/bench/journal/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ import Common

main :: IO ()
main = do
removePathForcibly dUMBLOG_JOURNAL
removePathForcibly dUMBLOG_SNAPSHOT
removePathForcibly (dumblogJournalPath dUMBLOG_PORT)
removePathForcibly (dumblogSnapshotPath dUMBLOG_PORT)
commonMain "Journal" (journalDumblog quietRun bUFFER_CAPACITY . Just)
4 changes: 2 additions & 2 deletions src/sut/dumblog/src/Dumblog/Common/Metrics.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import qualified Journal.Internal.Metrics as Metrics

------------------------------------------------------------------------

dUMBLOG_METRICS :: FilePath
dUMBLOG_METRICS = "/tmp/dumblog.metrics"
dumblogMetricsPath :: Int -> FilePath
dumblogMetricsPath port = "/tmp/dumblog-" ++ show port ++ ".metrics"

------------------------------------------------------------------------

Expand Down
64 changes: 33 additions & 31 deletions src/sut/dumblog/src/Dumblog/Journal/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ import Options.Generic
import System.Directory (copyFile, getTemporaryDirectory, removeFile)
import System.FilePath ((<.>), (</>))

import Dumblog.Common.Metrics (dUMBLOG_METRICS, dumblogSchema)
import Dumblog.Common.Metrics (dumblogSchema, dumblogMetricsPath)
import Dumblog.Journal.Blocker (emptyBlocker)
import Dumblog.Journal.Codec (Envelope(..))
import Dumblog.Journal.FrontEnd (FrontEndInfo(..), runFrontEnd)
Expand Down Expand Up @@ -176,26 +176,23 @@ dumblogOptions = defaultOptions
, oTermBufferLength = 512 * 1024 * 1024
}

dUMBLOG_JOURNAL :: FilePath
dUMBLOG_JOURNAL = "/tmp/dumblog.journal"

dUMBLOG_SNAPSHOT :: FilePath
dUMBLOG_SNAPSHOT = "/tmp/dumblog.snapshot"

dUMBLOG_PORT :: Int
dUMBLOG_PORT = 8054

dumblogJournalPath :: Int -> FilePath
dumblogJournalPath port = "/tmp/dumblog-" ++ show port ++ ".journal"

dumblogSnapshotPath :: Int -> FilePath
dumblogSnapshotPath port = "/tmp/dumblog-" ++ show port ++ ".snapshot"

journalDumblog :: DumblogConfig -> Int -> Maybe (MVar ()) -> IO ()
journalDumblog cfg _capacity mReady = do
let fpj = dUMBLOG_JOURNAL
fpm = dUMBLOG_METRICS
fps = dUMBLOG_SNAPSHOT
untilSnapshot = 1000
case cfg of
Run q mPort -> do
mSnapshot <- Snapshot.readFile fps
journal <- fetchJournal mSnapshot fpj dumblogOptions
metrics <- Metrics.newMetrics dumblogSchema fpm
let port = fromMaybe dUMBLOG_PORT mPort
mSnapshot <- Snapshot.readFile (dumblogSnapshotPath port)
journal <- fetchJournal mSnapshot (dumblogJournalPath port) dumblogOptions
metrics <- Metrics.newMetrics dumblogSchema (dumblogMetricsPath port)
blocker <- emptyBlocker 0 -- it is okay to start over
cmds <- collectAll journal
workerState <- replay cmds (startingState mSnapshot)
Expand All @@ -204,20 +201,24 @@ journalDumblog cfg _capacity mReady = do
feInfo = FrontEndInfo blocker dUMBLOG_CURRENT_VERSION
logger | unHelpful q = DLogger.nullLogger
| otherwise = DLogger.ioLogger
wInfo = WorkerInfo blocker logger fps dUMBLOG_CURRENT_VERSION events untilSnapshot
untilSnapshot = 1000
wInfo = WorkerInfo blocker logger (dumblogSnapshotPath port)
dUMBLOG_CURRENT_VERSION events untilSnapshot
withAsync (worker journal metrics wInfo workerState) $ \a -> do
link a
runFrontEnd (fromMaybe dUMBLOG_PORT mPort) journal metrics feInfo mReady
DebugFile fp -> debugFile (unHelpful fp)
DebugFileWatch fp -> do
runFrontEnd port journal metrics feInfo mReady
DebugFile debugFile -> genDebugFile (dumblogJournalPath dUMBLOG_PORT) -- XXX: port is hardcoded.
(dumblogSnapshotPath dUMBLOG_PORT)
(unHelpful debugFile)
DebugFileWatch debugFile -> do
putStrLn "[journal]: waiting for journal changes..."
watch (unHelpful fp)
watch (dumblogJournalPath dUMBLOG_PORT) (dumblogSnapshotPath dUMBLOG_PORT) (unHelpful debugFile)

watch :: FilePath -> IO ()
watch fp = go 0
watch :: FilePath -> FilePath -> FilePath -> IO ()
watch journalFile snapshotFile debugFile = go 0
where
go lastBytesProduced = do
eMeta <- journalMetadata dUMBLOG_JOURNAL dumblogOptions
eMeta <- journalMetadata journalFile dumblogOptions
case eMeta of
Left err -> do
putStrLn ("[watch] error: " ++ show err)
Expand All @@ -228,7 +229,7 @@ watch fp = go 0
if bytesProduced /= lastBytesProduced
then do
putStrLn "[watch] journal has changed!"
debugFile fp
genDebugFile journalFile snapshotFile debugFile
go bytesProduced
else threadDelay 10000 >> go lastBytesProduced

Expand All @@ -249,14 +250,15 @@ watch fp = go 0
produced = termBeginPosition + fromIntegral termOffset
return produced

debugFile :: FilePath -> IO ()
debugFile fp = withTempCopy dUMBLOG_JOURNAL $ \fpjCopy -> do
mSnapshot <- Snapshot.readFile dUMBLOG_SNAPSHOT
journal <- fetchJournal mSnapshot fpjCopy dumblogOptions
cmds <- collectAll journal
debugFileContents <- replayDebug cmds (startingState mSnapshot)
Aeson.encodeFile fp debugFileContents
putStrLn "Generated Debug-file"
genDebugFile :: FilePath -> FilePath -> FilePath -> IO ()
genDebugFile journalFile snapshotFile debugFile =
withTempCopy journalFile $ \journalCopy -> do
mSnapshot <- Snapshot.readFile snapshotFile
journal <- fetchJournal mSnapshot journalCopy dumblogOptions
cmds <- collectAll journal
debugFileContents <- replayDebug cmds (startingState mSnapshot)
Aeson.encodeFile debugFile debugFileContents
putStrLn "Generated Debug-file"

withTempCopy :: FilePath -> (FilePath -> IO a) -> IO a
withTempCopy fp k = do
Expand Down
2 changes: 1 addition & 1 deletion src/sut/dumblog/src/Dumblog/Journal/Worker.hs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ worker journal metrics wi = go (wiEvents wi)
Metrics.measure metrics (case input of
ClientRequest (Write {}) -> ServiceTimeWrites
ClientRequest (Read {}) -> ServiceTimeReads
_othrwise -> error "impossible") serviceTime
_otherwise -> error "impossible") serviceTime
Metrics.measure metrics ResponseTime (latency + serviceTime)
case input of
ClientRequest (Write bs) -> Metrics.measure metrics WriteSize (realToFrac (LBS.length bs))
Expand Down
10 changes: 5 additions & 5 deletions src/sut/dumblog/src/Dumblog/Metrics/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,17 @@ initThroughputState = do
throughputAvg :: ThroughputState -> Double
throughputAvg ts = tsSum ts / realToFrac (tsIterations ts)

metricsMain :: IO ()
metricsMain = do
metricsMain :: Int -> IO ()
metricsMain port = do
setLocaleEncoding utf8 -- Otherwise we can't print µ...
removePathForcibly dUMBLOG_METRICS
removePathForcibly (dumblogMetricsPath port)
ts <- initThroughputState
go ts
where
go :: ThroughputState -> IO ()
go ts = do
metrics <- newMetrics dumblogSchema dUMBLOG_METRICS
eMeta <- journalMetadata dUMBLOG_JOURNAL dumblogOptions
metrics <- newMetrics dumblogSchema (dumblogMetricsPath port)
eMeta <- journalMetadata (dumblogJournalPath port) dumblogOptions

-- Only needed on MacOS it seems.
msyncMetrics metrics
Expand Down
2 changes: 1 addition & 1 deletion src/sut/dumblog/src/Dumblog/SQLite/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import Journal.Internal.Metrics
sqliteDumblog :: Int -> Int -> Maybe (MVar ()) -> IO ()
sqliteDumblog capacity port mReady = do
queue <- newTBQueueIO (fromIntegral capacity)
metrics <- newMetrics dumblogSchema dUMBLOG_METRICS
metrics <- newMetrics dumblogSchema (dumblogMetricsPath port)
bracket initDB closeDB $ \conn ->
withAsync (worker queue metrics conn) $ \_async ->
runFrontEnd queue metrics port mReady
Expand Down

0 comments on commit 0f13e90

Please sign in to comment.