Skip to content


feat(sut): finish zero copy dumblog variant
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-stevan-andjelkovic committed Mar 3, 2022
1 parent 42b3161 commit 48e3431
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 32 deletions.
2 changes: 2 additions & 0 deletions src/sut/dumblog/dumblog.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ library
, stm
, text
, time
, vector
, wai
, warp
if flag(persistent-sqlite)
Expand All @@ -75,6 +76,7 @@ library
if flag(persistent-sqlite)
exposed-modules: Dumblog.SQLite.DBPersistent

Expand Down
48 changes: 16 additions & 32 deletions src/sut/dumblog/src/Dumblog/ZeroCopy/HttpServer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,15 @@ import qualified Data.ByteString.Char8 as BS
import Control.Exception (bracketOnError)
import Network.Socket
import Network.Socket.ByteString (sendAll)
import GHC.IO.Device (SeekMode(AbsoluteSeek))
import GHC.Event
import Control.Concurrent
import System.Posix.IO

import Journal.Types (Journal, jLogger, hEADER_LENGTH)
import Journal.MP
import Journal.Internal.BufferClaim
import Journal.Internal.ByteBufferPtr

import Dumblog.ZeroCopy.Sendfile
import Dumblog.ZeroCopy.State


Expand All @@ -27,10 +25,11 @@ httpServer jour port = withSocketsDo $ do
numCapabilities <- getNumCapabilities
putStrLn ("Starting http server on port: " ++ show port)
putStrLn ("Capabilities: : " ++ show numCapabilities)
state <- initState 40000 "/tmp/dumblog-zero-copy.journal"
sock <- listenOn port
mgr <- fromMaybe (error "Compile with -threaded") <$> getSystemEventManager
_key <- withFdSocket sock $ \fd ->
registerFd mgr (client jour sock) (fromIntegral fd) evtRead MultiShot
registerFd mgr (client jour state sock) (fromIntegral fd) evtRead MultiShot
loop = do
Expand Down Expand Up @@ -61,49 +60,34 @@ parseOffsetLength bs = do
let (headers, _match) = BS.breakSubstring "\r\n\r\n" bs
return (BS.length headers + BS.length "POST" + BS.length "\r\n\r\n", len)

client :: Journal -> Socket -> FdKey -> Event -> IO ()
client jour sock fdKey event = do
eRes <- tryClaim jour 4096
client :: Journal -> State -> Socket -> FdKey -> Event -> IO ()
client jour state sock fdKey event = do
let bufSize = 4096 - hEADER_LENGTH
eRes <- tryClaim jour bufSize
case eRes of
Left err -> do
putStrLn ("client, err: " ++ show err)
client jour sock fdKey event
client jour state sock fdKey event
Right (offset, bufferClaim) -> do
(conn, _) <- accept sock
bytesRecv <- recvBytes bufferClaim conn 4096
putStrLn ("client, bytes received: " ++ show bytesRecv)
bytesRecv <- recvBytes bufferClaim conn bufSize
commit bufferClaim (jLogger jour)

-- NOTE: The following copies bytes. It would be better to do the parsing at
-- the `ByteBuffer` level rather than `ByteString` level...
req <- getByteStringAt (bcByteBuffer bufferClaim) hEADER_LENGTH bytesRecv

print req
case parseCommand req of
Just (Write offset' len) -> do
putStrLn ("BODY: " ++ BS.unpack (BS.take len (BS.drop offset' req)))
fd <- openFd "/tmp/dumblog-zero-copy.journal" ReadOnly Nothing defaultFileFlags
-- fdSeek fd AbsoluteSeek
-- (fromIntegral offset' + fromIntegral hEADER_LENGTH)
-- (s, _readBytes) <- fdRead fd (fromIntegral len)
sendAll conn (httpHeader len)
-- NOTE: For subsequent requests we need to take `offset` into account also.
_sentBytes <- sendfile conn fd (fromIntegral (offset' + hEADER_LENGTH))
(fromIntegral len)
return ()

Just (Read _ix) -> do
sendAll conn msg
ix <- writeLocation state offset (Location (fromIntegral offset') (fromIntegral len))
sendAll conn (response (BS.pack (show ix)))
Just (Read ix) -> readSendfile state conn ix
Nothing -> return ()
close conn

msg :: ByteString
msg = BS.pack "HTTP/1.0 200 OK\r\nContent-Length: 5\r\n\r\nPong!\r\n"

httpHeader :: Int -> ByteString
httpHeader len =
BS.pack "HTTP/1.0 200 OK\r\nContent-Length: " <> BS.pack (show len) <> "\r\n\r\n"

response :: ByteString -> ByteString
response body =
BS.pack "HTTP/1.0 200 OK\r\nContent-Length: " <> BS.pack (show (BS.length body)) <>
"\r\n\r\n" <> body

listenOn :: Int -> IO Socket
listenOn port = do
Expand Down
59 changes: 59 additions & 0 deletions src/sut/dumblog/src/Dumblog/ZeroCopy/State.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
module Dumblog.ZeroCopy.State where

import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as BS
import Data.Int (Int64)
import Data.Vector.Mutable (IOVector)
import qualified Data.Vector.Mutable as Vector
import Data.Word (Word16, Word64)
import Network.Socket (Socket)
import Network.Socket.ByteString (sendAll)
import System.Posix.IO (OpenMode(ReadOnly), defaultFileFlags, openFd)
import System.Posix.Types (Fd)

import Dumblog.ZeroCopy.Sendfile
import Journal.Types (hEADER_LENGTH)
import Journal.Types.AtomicCounter


data State = State
{ sLocations :: !(IOVector Location)
, sIndex :: !AtomicCounter
, sFd :: !Fd

initState :: Int -> FilePath -> IO State
initState size fp
= State
<$> size
<*> newCounter 0
<*> openFd fp ReadOnly Nothing defaultFileFlags

data Location = Location
{ lOffset :: !Word64
, lLength :: !Word16

writeLocation :: State -> Int64 -> Location -> IO Int
writeLocation s offset loc = do
ix <- getAndIncrCounter 1 (sIndex s)
Vector.write (sLocations s) ix
(loc { lOffset = fromIntegral (offset - 4096) + lOffset loc })
return ix

readLocation :: State -> Int -> IO Location
readLocation s ix = (sLocations s) ix

readSendfile :: State -> Socket -> Int -> IO ()
readSendfile s sock ix = do
loc <- readLocation s ix
sendAll sock (httpHeader (lLength loc))
_bytesSent <- sendfile sock (sFd s)
(fromIntegral (lOffset loc) + fromIntegral hEADER_LENGTH)
(fromIntegral (lLength loc))
return ()

httpHeader :: Word16 -> ByteString
httpHeader len =
BS.pack "HTTP/1.0 200 OK\r\nContent-Length: " <> BS.pack (show len) <> BS.pack "\r\n\r\n"

0 comments on commit 48e3431

Please sign in to comment.