Skip to content

Commit

Permalink
feat(runtime): benchmark simple actor, plus fix a couple of bottlenecks
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-stevan-andjelkovic committed Jul 1, 2021
1 parent ee8b3d2 commit bc320dd
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 84 deletions.
7 changes: 6 additions & 1 deletion src/runtime-prototype/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ cabal test

```bash
cabal configure bench \
--enable-profiling \
--ghc-options='-threaded -rtsopts -with-rtsopts=-N'
cabal bench
cabal run bench -p -- +RTS -p -hm -RTS

less bench.prof
hp2ps -c bench.hp
evince bench.ps
```
129 changes: 93 additions & 36 deletions src/runtime-prototype/bench/Main.hs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
{-# LANGUAGE OverloadedStrings #-}

-- This module is heavily inspired by Tyler Neely's sled benchmark:
-- https://github.com/spacejam/sled/blob/main/benchmarks/stress2/src/main.rs

Expand All @@ -7,25 +9,50 @@ import Control.Monad
import Control.Exception
import Control.Concurrent
import Control.Concurrent.Async
import qualified Data.Time.Clock as Clock
import Data.Atomics.Counter
import Data.IORef

import StuntDouble

------------------------------------------------------------------------

client :: AtomicCounter -> IO ()
client total = forever go
scheduler :: Message -> Actor
-- XXX: use http frontend?
-- scheduler (ClientRequest "write" cid) = Actor $ do
scheduler (InternalMessage "write") = Actor $ do
s <- get
let Integer i = getField "i" s
p <- asyncIO (IOAppend (Index (fromInteger i)) (Value "blob"))
put (setField "i" (Integer (succ i)) s)
return (InternalMessage "ack")

------------------------------------------------------------------------

client :: EventLoop -> LocalRef -> AtomicCounter -> AtomicCounter -> IORef Bool
-> IO ()
client el lref total errors shutdown = go
where
go :: IO ()
go = do
-- generate client req
-- execute client req
-- XXX: remove
threadDelay 100000 -- 100ms
incrCounter_ 1 total
b <- readIORef shutdown
if b then return ()
else do
eReply <- try (ainvoke el lref (InternalMessage "write"))
:: IO (Either SomeException Message)
case eReply of
-- XXX: log error for debugging purposes?
Left _err -> incrCounter_ 1 errors
Right _reply -> return ()
incrCounter_ 1 total
go

-- | "Resident set size (RSS) is the portion of memory occupied by a process
-- that is held in main memory (RAM)." --
-- https://en.wikipedia.org/wiki/Resident_set_size
rss :: IO Double
rss = do
-- XXX: This will only work on linux.
ml <- try (readFile "/proc/self/statm")
case ml of
Left err -> do
Expand All @@ -37,31 +64,47 @@ rss = do
in
return (rssPages * 4096)

reporter :: AtomicCounter -> IO ()
reporter total = go 0
reporter :: AtomicCounter -> AtomicCounter -> IORef Bool -> IO ()
reporter total errors shutdown = go 0
where
go :: Int -> IO ()
go last = do
threadDelay 1000000 -- 1s
tot <- readCounter total
b <- rss
putStrLn (concat ["did ", show (tot - last), " ops, ",
show (b / (1024 * 1024)), "mb RSS"])
go tot

before :: Int -> AtomicCounter -> IO [Async ()]
before numberOfClients total =
mapM (\i -> async ((if i == 0 then reporter else client) total))
[0..numberOfClients]
b <- readIORef shutdown
if b then return ()
else do
threadDelay 1000000 -- 1s
tot <- readCounter total
err <- readCounter errors
b <- rss
putStrLn (concat ["did ", show (tot - last), " ops, ",
show (b / (1024 * 1024)), "mb RSS"])
-- XXX: last for errors also?
when (err /= 0) $
putStrLn (show err ++ " errors")
go tot

before :: Int -> AtomicCounter -> AtomicCounter -> IORef Bool
-> IO (EventLoop, LocalRef, [Async ()])
before numberOfClients total errors shutdown = do
el <- makeEventLoopThreaded SingleThreaded NoThreadPool
realTime (makeSeed 0) (Http 3003) (EventLoopName "bench")
lref <- spawn el scheduler (stateFromList [("i", Integer 0)])
workerPids <- forM [0..numberOfClients] $ \i -> do
async ((if i == 0 then reporter else client el lref) total errors shutdown)
return (el, lref, workerPids)

data StoppingCriteria
= MaxDurationInSecs Int
| MaxOperations Int
| WaitForCtrlCSignal

run :: AtomicCounter -> StoppingCriteria -> IO ()
run _total (MaxDurationInSecs s) = threadDelay (s * 1000000)
run total (MaxOperations maxOps) = go
run :: AtomicCounter -> StoppingCriteria -> IORef Bool -> IO ()
run _total (MaxDurationInSecs s) shutdown = do
threadDelay (s * 1000000)
writeIORef shutdown True
run _total WaitForCtrlCSignal shutdown =
threadDelay maxBound `finally` writeIORef shutdown True
run total (MaxOperations maxOps) shutdown = go
where
go :: IO ()
go = do
Expand All @@ -70,24 +113,38 @@ run total (MaxOperations maxOps) = go
then do
threadDelay (50 * 1000) -- 50 ms
go
else return ()
run _total WaitForCtrlCSignal = threadDelay maxBound
else writeIORef shutdown True

after :: AtomicCounter -> [Async ()] -> IO ()
after total pids = do
mapM_ cancel pids
after :: AtomicCounter -> AtomicCounter -> Clock.UTCTime
-> (EventLoop, LocalRef, [Async ()]) -> IO ()
after total errors t0 (el, _lref, pids) = do
now <- Clock.getCurrentTime
let duration = Clock.diffUTCTime now t0
quit el
mapM_ wait pids
tot <- readCounter total
err <- readCounter errors
putStrLn ""
putStrLn ("total ops: " ++ show tot ++ " ops")
putStrLn (concat ["total of ", show tot, " ops in ", show duration,
" (", show (round (realToFrac tot / realToFrac duration)),
" ops/s)"])
when (err /= 0) $
putStrLn ("total errors: " ++ show err)

main :: IO ()
main = do
-- spawn event loop
-- deploy SUT
n <- getNumCapabilities
putStrLn ("CPU capabilities: " ++ show n)
-- XXX: make it possilbe to configure these parameters via command line
-- arguments:
let numberOfClients = 4
stop = MaxOperations 100
total <- newCounter 0
stop = MaxDurationInSecs 10

total <- newCounter 0
errors <- newCounter 0
shutdown <- newIORef False
now <- Clock.getCurrentTime
bracket
(before numberOfClients total)
(after total)
(const (run total stop))
(before numberOfClients total errors shutdown)
(after total errors now)
(const (run total stop shutdown))
27 changes: 15 additions & 12 deletions src/runtime-prototype/src/StuntDouble/ActorMap.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import Data.List
import Data.Foldable (toList)
import Data.Vector (Vector)
import qualified Data.Vector as Vector
import Data.Heap (Entry(Entry), Heap)
import qualified Data.Heap as Heap
import Data.Map (Map)
Expand All @@ -20,6 +23,7 @@ import Data.Set (Set)
import qualified Data.Set as Set
import Data.Time (UTCTime)
import qualified Data.Time as Time
import System.Random

import StuntDouble.Actor.State
import StuntDouble.Envelope
Expand Down Expand Up @@ -605,19 +609,18 @@ withEventLoop name k =
return x

runHandlers :: Seed -> [IO ()] -> IO ()
runHandlers s0 hs = go s0
runHandlers seed0 hs = go seed0
where
go s = do
s' <- stepHandlers s hs
go s'

stepHandlers :: Seed -> [IO ()] -> IO Seed
stepHandlers s hs =
let
(hs', s') = shuffle s hs
in do
sequence_ hs'
return s'
hss :: Vector [IO ()]
hss = Vector.fromList (permutations hs)

go :: Seed -> IO ()
go seed =
let
(ix, seed') = randomR (0, length hss - 1) seed
in do
sequence_ (hss Vector.! ix)
go seed'

handleInbound :: EventLoop -> IO ()
handleInbound = forever . handleInbound1
Expand Down
34 changes: 0 additions & 34 deletions src/runtime-prototype/src/StuntDouble/Random.hs
Original file line number Diff line number Diff line change
Expand Up @@ -28,37 +28,3 @@ list s0 n0 g = go [] s0 n0
(x, s') = g s
in
go (x : acc) s' (n - 1)

shuffle :: Seed -> [a] -> ([a], Seed)
shuffle s [] = ([], s)
shuffle s xs =
let
l = length xs - 1
(rs, s') = make_rs l s
in
(shuffle1 xs rs, s')
where
-- The following code is taken from:
-- http://okmij.org/ftp/Haskell/perfect-shuffle.txt
shuffle1 :: [b] -> [Int] -> [b]
shuffle1 [e] [] = [e]
shuffle1 elements (r:r_others) =
let (b,rest) = extract r elements
in b:(shuffle1 rest r_others)
shuffle1 _ _ = error "shuffle1: impossible"

extract :: Int -> [a] -> (a, [a])
extract 0 (h:t) = (h, t)
extract j0 l = loop j0 l []
where
loop 0 (h:t) accum = (h, accum ++ t)
loop j (h:t) accum = loop (j-1) t (h:accum)
loop _ _ _ = error "loop: impossible"

make_rs :: RandomGen g => Int -> g -> ([Int],g)
make_rs n g = loop [] n g
where
loop acc 0 g = (reverse acc,g)
loop acc n g =
let (r,g') = randomR (0,n) g
in loop (r:acc) (pred n) g'
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,4 @@ cleanUpNamedPipe fp name =
return

hMaybeGetLine :: Handle -> IO (Maybe String)
hMaybeGetLine = timeout 1000 . hGetLine
hMaybeGetLine = timeout 1 . hGetLine
1 change: 1 addition & 0 deletions src/runtime-prototype/stunt-double.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ benchmark bench
, base
, stm
, stunt-double
, time

-- https://ghc.gitlab.haskell.org/ghc/doc/users_guide/using-concurrent.html
-- -with-rtsopts=-qa -with-rtsopts=-qm
Expand Down

0 comments on commit bc320dd

Please sign in to comment.