Skip to content

Commit

Permalink
fix(runtime): fix bug in get in the MP case
Browse files Browse the repository at this point in the history
Spin lock without yield or sleep...
  • Loading branch information
symbiont-stevan-andjelkovic committed Oct 27, 2021
1 parent a630761 commit 68bcbcb
Show file tree
Hide file tree
Showing 8 changed files with 256 additions and 71 deletions.
88 changes: 88 additions & 0 deletions src/runtime-prototype/bench/disruptor/MP.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
{-# LANGUAGE NumericUnderscores #-}

module Main where

import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.MVar
import Control.Monad
import Data.IORef
import Data.Int
import Data.Time
import System.Mem (performGC)
import Text.Printf

import Disruptor.MP.Consumer
import Disruptor.MP.Producer
import Disruptor.MP.RingBuffer
import Disruptor.SequenceNumber
import StuntDouble.AtomicCounterPadded
import StuntDouble.Histogram.SingleProducer

------------------------------------------------------------------------

iTERATIONS :: Int64
iTERATIONS = 100_000_000

main :: IO ()
main = do
n <- getNumCapabilities
printf "%-25.25s%10d\n" "CPU capabilities" n
printf "%-25.25s%10d\n" "Total number of events" iTERATIONS
mapM_ (\i -> printf "%s %d:\n" "Run" i >> once) [(0 :: Int)..7]

once :: IO ()
once = do
let ringBufferCapacity = 1024 * 64
rb <- newRingBuffer ringBufferCapacity
-- histo <- newHistogram
-- transactions <- newCounter 0
consumerFinished <- newEmptyMVar

let ep = EventProducer (const (go iTERATIONS)) ()
where
go :: Int64 -> IO ()
go 0 = return ()
go n = do
mSnr <- tryNext rb
case mSnr of
Some snr -> do
-- {-# SCC "transactions+1" #-} incrCounter_ 1 transactions
set rb snr (1 :: Int)
publish rb snr
go (n - 1)
None -> do
threadDelay 1
go n

let handler _s _n snr endOfBatch = do
-- t' <- {-# SCC "transactions-1" #-} decrCounter 1 transactions
-- measureInt_ t' histo
when (endOfBatch && getSequenceNumber snr == iTERATIONS - 1) $
putMVar consumerFinished ()
return ()

ec <- newEventConsumer rb handler () [] (Sleep 1)
setGatingSequences rb [ecSequenceNumber ec]

performGC
start <- getCurrentTime
withEventProducer ep $ \aep ->
withEventConsumer ec $ \aec -> do
() <- takeMVar consumerFinished
end <- getCurrentTime
cancel aep
cancel aec
let duration :: Double
duration = realToFrac (diffUTCTime end start)

throughput :: Double
throughput = realToFrac iTERATIONS / duration
printf "%-25.25s%10.2f events/s\n" "Throughput" throughput
printf "%-25.25s%10.2f s\n" "Duration" duration
-- XXX: prettyPrintHistogram histo
-- meanTransactions <- hmean histo
-- printf "%-25.25s%10.2f\n" "Mean concurrent txs" meanTransactions
-- Just maxTransactions <- percentile 100.0 histo
-- printf "%-25.25s%10.2f\n" "Max concurrent txs" maxTransactions
-- printf "%-25.25s%10.2f ns\n" "Latency" ((meanTransactions / throughput) * 1000000)
49 changes: 24 additions & 25 deletions src/runtime-prototype/benchmark.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,11 @@ set -euo pipefail
# Inspired by: https://sled.rs/perf.html#experimental-design

BENCHMARK_WORKLOAD1="bench-disruptor-sp"
BENCHMARK_GITHASH1="XXX: NOT USED YET"
BENCHMARK_WORKLOAD2="bench-disruptor-unagi-chan"
BENCHMARK_GITHASH2="XXX: NOT USED YET"
BENCHMARK_WORKLOAD2="bench-disruptor-sp-unboxed"
BENCHMARK_NUMBER_OF_RUNS=5
BENCHMARK_GHC_OPTS=""

BENCHMARK_BIN1="/tmp/${BENCHMARK_GITHASH1}-${BENCHMARK_WORKLOAD1}"
BENCHMARK_BIN2="/tmp/${BENCHMARK_GITHASH2}-${BENCHMARK_WORKLOAD2}"
BENCHMARK_CABAL_BUILD_OPTS=("--disable-profiling" "-O2")
BENCHMARK_CABAL_RUN_OPTS=("-O2")
BENCHMARK_PERF_EVENTS="L1-dcache-loads,L1-dcache-load-misses,LLC-loads,LLC-load-misses,dTLB-loads,dTLB-load-misses"

# Save info about current hardware and OS setup.
uname --kernel-name --kernel-release --kernel-version --machine --operating-system
Expand Down Expand Up @@ -46,36 +43,37 @@ for policy in /sys/devices/system/cpu/cpufreq/policy*; do
done

# Compile workloads.
cabal configure \
--disable-profiling \
--ghc-options='-O2 -threaded -rtsopts -with-rtsopts=-N -fproc-alignment=64
-fexcess-precision -fasm'

# XXX: enable benchmarking against old versions of same test
# BENCHMARK_GITHASH1="XXX: NOT USED YET"
# BENCHMARK_GITHASH2="XXX: NOT USED YET"
# BENCHMARK_BIN1="/tmp/${BENCHMARK_GITHASH1}-${BENCHMARK_WORKLOAD1}"
# BENCHMARK_BIN2="/tmp/${BENCHMARK_GITHASH2}-${BENCHMARK_WORKLOAD2}"
# if [ -n "${BENCHMARK_GITHASH1}" ] && [ ! -f "${BENCHMARK_BIN1}" ] ; then
# git checkout "${BENCHMARK_GITHASH1}"
# cabal build -O2 "${BENCHMARK_WORKLOAD1}"
# cp $(cabal list-bin "${BENCHMARK_WORKLOAD1}") "${BENCHMARK_BIN1}"
# fi

cabal build -O2 "${BENCHMARK_WORKLOAD2}"
cabal build "${BENCHMARK_CABAL_BUILD_OPTS[@]}" "${BENCHMARK_WORKLOAD2}"

# Disable turbo boost.
echo 1 | sudo tee /sys/devices/system/cpu/intel_pstate/no_turbo

# The following run is just a (CPU) warm up, the results are discarded.
cabal run -O2 ${BENCHMARK_WORKLOAD2}
cabal run "${BENCHMARK_CABAL_RUN_OPTS[@]}" "${BENCHMARK_WORKLOAD2}"

# Run the benchmarks. By running workloads interleaved with each other, we
# reduce the risk of having particular transient system-wide effects impact only
# a single measurement.
for i in $(seq ${BENCHMARK_NUMBER_OF_RUNS}); do
echo "Running benchmark run ${i}"
perf stat -e cache-misses,branch-misses,dTLB-load-misses,iTLB-load-misses \
cabal run -O2 "${BENCHMARK_WORKLOAD1}" \
&>> /tmp/${BENCHMARK_WORKLOAD1}.txt
perf stat -e cache-misses,branch-misses,dTLB-load-misses,iTLB-load-misses \
cabal run -O2 "${BENCHMARK_WORKLOAD2}" \
&>> /tmp/${BENCHMARK_WORKLOAD2}.txt
perf stat --event="${BENCHMARK_PERF_EVENTS}" \
cabal run "${BENCHMARK_CABAL_RUN_OPTS[@]}" "${BENCHMARK_WORKLOAD1}" \
&>> "/tmp/${BENCHMARK_WORKLOAD1}.txt"
perf stat --event="${BENCHMARK_PERF_EVENTS}" \
cabal run "${BENCHMARK_CABAL_RUN_OPTS[@]}" "${BENCHMARK_WORKLOAD2}" \
&>> "/tmp/${BENCHMARK_WORKLOAD2}.txt"

# XXX: Can't get the below to work, ${BENCHMARK_WORKLOAD} env var doesn't
# get interpolated correctly into the string?
Expand All @@ -102,12 +100,13 @@ done
R_FILE="/tmp/${BENCHMARK_WORKLOAD1}-${BENCHMARK_WORKLOAD2}.r"

echo 'Input=("' > "${R_FILE}"
echo "Workload Throughput" >> "${R_FILE}"
awk -v wl1="${BENCHMARK_WORKLOAD1}" \
'/Throughput/ { print wl1, $2}' "/tmp/${BENCHMARK_WORKLOAD1}.txt" >> "${R_FILE}"
awk -v wl2="${BENCHMARK_WORKLOAD2}" \
'/Throughput/ { print wl2, $2}' "/tmp/${BENCHMARK_WORKLOAD2}.txt" >> "${R_FILE}"
echo '")' >> "${R_FILE}"
{ echo "Workload Throughput";
awk -v wl1="${BENCHMARK_WORKLOAD1}" \
'/Throughput/ { print wl1, $2 }' "/tmp/${BENCHMARK_WORKLOAD1}.txt";
awk -v wl2="${BENCHMARK_WORKLOAD2}" \
'/Throughput/ { print wl2, $2 }' "/tmp/${BENCHMARK_WORKLOAD2}.txt";
echo '")'
} >> "${R_FILE}"

cat << EOF >> "${R_FILE}"
Data = read.table(textConnection(Input),header=TRUE)
Expand Down
8 changes: 4 additions & 4 deletions src/runtime-prototype/src/Disruptor/MP/Consumer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ newEventConsumer rb handler s0 _barriers (Sleep n) = do
where
go' lo hi s | lo > hi = return s
| lo <= hi = do
e <- unsafeGet rb lo
e <- get rb lo
s' <- {-# SCC handler #-} handler s e lo (lo == hi)
go' (lo + 1) hi s'

Expand All @@ -69,9 +69,9 @@ waitFor consumed rb = go
if consumed < produced
then return produced
else do
yield -- NOTE: removing this or the sleep seems to cause
-- non-termination... XXX: Why though? the consumer should be
-- running on its own thread?
-- NOTE: Removing the sleep seems to cause non-termination... XXX: Why
-- though? the consumer should be running on its own thread?
yield
-- threadDelay 1
go -- SPIN
-- ^ XXX: waitStrategy should be passed in and acted on here.
Expand Down
49 changes: 30 additions & 19 deletions src/runtime-prototype/src/Disruptor/MP/RingBuffer.hs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
module Disruptor.MP.RingBuffer where

import Control.Concurrent (threadDelay, yield)
import Control.Exception (assert)
import Control.Monad (when)
import Data.Atomics (casIORef, peekTicket, readForCAS)
import Data.IORef (IORef, newIORef, atomicModifyIORef', readIORef, writeIORef)
import Data.Int (Int64)
import Data.Bits (popCount)
import Data.IORef
(IORef, atomicModifyIORef', newIORef, readIORef, writeIORef)
import Data.Int (Int64)
import Data.Vector.Mutable (IOVector)
import qualified Data.Vector.Mutable as Vector

Expand Down Expand Up @@ -87,8 +89,8 @@ setCachedGatingSequence rb = writeIORef (rbCachedGatingSequence rb)
setAvailable :: RingBuffer e -> SequenceNumber -> IO ()
setAvailable rb snr = Vector.write
(rbAvailableBuffer rb)
(index (rbCapacity rb) snr)
(availabilityFlag (rbCapacity rb) snr)
(index (capacity rb) snr)
(availabilityFlag (capacity rb) snr)
{-# INLINE setAvailable #-}

getAvailable :: RingBuffer e -> Int -> IO Int
Expand Down Expand Up @@ -157,7 +159,10 @@ nextBatch rb n = assert (n > 0 && fromIntegral n <= capacity rb) $ do
go = do
gatingSequence <- minimumSequence rb
if wrapPoint > gatingSequence
then go
then do
yield
-- threadDelay 1
go -- SPIN
else setCachedGatingSequence rb gatingSequence
{-# INLINABLE nextBatch #-}

Expand All @@ -183,8 +188,11 @@ tryNextBatch rb n = assert (n > 0) go
(success, _current') <- casIORef (rbCursor rb) current next
if success
then return (Some next)
else go -- SPIN
{-# INLINE tryNextBatch #-}
else do
threadDelay 1
-- yield
go -- SPIN
{-# INLINABLE tryNextBatch #-}

hasCapacity :: RingBuffer e -> Int -> SequenceNumber -> IO Bool
hasCapacity rb requiredCapacity cursorValue = do
Expand All @@ -202,7 +210,7 @@ hasCapacity rb requiredCapacity cursorValue = do
{-# INLINE hasCapacity #-}

set :: RingBuffer e -> SequenceNumber -> e -> IO ()
set rb snr e = Vector.write (rbEvents rb) (index (rbCapacity rb) snr) e
set rb snr e = Vector.unsafeWrite (rbEvents rb) (index (capacity rb) snr) e
{-# INLINE set #-}

publish :: RingBuffer e -> SequenceNumber -> IO ()
Expand All @@ -214,32 +222,35 @@ publishBatch :: RingBuffer e -> SequenceNumber -> SequenceNumber -> IO ()
publishBatch rb lo hi = mapM_ (setAvailable rb) [lo..hi]
{-# INLINE publishBatch #-}

unsafeGet :: RingBuffer e -> SequenceNumber -> IO e
unsafeGet rb current = go
-- |
get :: RingBuffer e -> SequenceNumber -> IO e
get rb current = go
where
availableValue :: Int
availableValue = availabilityFlag (rbCapacity rb) current
availableValue = availabilityFlag (capacity rb) current

ix :: Int
ix = index (rbCapacity rb) current
ix = index (capacity rb) current

go = do
v <- getAvailable rb ix
if v /= availableValue
then go
else Vector.read (rbEvents rb) ix
then do
threadDelay 1
go -- SPIN
else Vector.unsafeRead (rbEvents rb) ix

get :: RingBuffer e -> SequenceNumber -> IO (Maybe e)
get rb want = do
tryGet :: RingBuffer e -> SequenceNumber -> IO (Maybe e)
tryGet rb want = do
produced <- getCursor rb
if want <= produced
then Just <$> unsafeGet rb want
then Just <$> get rb want
else return Nothing
{-# INLINE get #-}
{-# INLINE tryGet #-}

isAvailable :: RingBuffer e -> SequenceNumber -> IO Bool
isAvailable rb snr =
(==) <$> Vector.read (rbAvailableBuffer rb) (index capacity snr)
(==) <$> Vector.unsafeRead (rbAvailableBuffer rb) (index capacity snr)
<*> pure (availabilityFlag capacity snr)
where
capacity = rbCapacity rb
Expand Down
7 changes: 3 additions & 4 deletions src/runtime-prototype/src/Disruptor/SP/Consumer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,9 @@ waitFor consumed rb = go
if consumed < produced
then return produced
else do
yield -- NOTE: removing this or the sleep seems to cause
-- non-termination... XXX: Why though? the consumer should be
-- running on its own thread?
-- threadDelay 1
threadDelay 1 -- NOTE: removing the sleep seems to cause
-- non-termination... XXX: Why though? the consumer should be
-- running on its own thread?
go -- SPIN
-- ^ XXX: waitStrategy should be passed in and acted on here.
--
Expand Down
7 changes: 3 additions & 4 deletions src/runtime-prototype/src/Disruptor/SP/Unboxed/Consumer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,9 @@ waitFor consumed rb = go
if consumed < produced
then return produced
else do
yield -- NOTE: removing this or the sleep seems to cause
-- non-termination... XXX: Why though? the consumer should be
-- running on its own thread?
-- threadDelay 1
-- NOTE: Removing the sleep seems to cause non-termination... XXX: Why
-- though? the consumer should be running on its own thread?
threadDelay 1
go -- SPIN
-- ^ XXX: waitStrategy should be passed in and acted on here.
--
Expand Down
21 changes: 21 additions & 0 deletions src/runtime-prototype/stunt-double.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -303,4 +303,25 @@ benchmark bench-disruptor-unagi-chan
-optc-ffast-math
-- -threaded -eventlog -rtsopts -with-rtsopts=-N
-threaded -rtsopts -with-rtsopts=-N
default-language: Haskell2010

benchmark bench-disruptor-mp
type: exitcode-stdio-1.0
hs-source-dirs: bench/disruptor
main-is: MP.hs
build-depends:
async
, atomic-primops
, base
, stunt-double
, time
ghc-options:
-O2
-fproc-alignment=64
-fexcess-precision
-fasm
-optc-O3
-optc-ffast-math
-- -threaded -eventlog -rtsopts -with-rtsopts=-N
-threaded -rtsopts -with-rtsopts=-N
default-language: Haskell2010
Loading

0 comments on commit 68bcbcb

Please sign in to comment.