Skip to content

Commit

Permalink
feat(sut): use recv bytes from journal in zero copy variant
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-stevan-andjelkovic committed Mar 2, 2022
1 parent 65cf9c1 commit 26889c2
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 21 deletions.
6 changes: 3 additions & 3 deletions src/journal/src/Journal/Internal/BufferClaim.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import Journal.Internal.Utils

------------------------------------------------------------------------

newtype BufferClaim = BufferClaim ByteBuffer
newtype BufferClaim = BufferClaim { bcByteBuffer :: ByteBuffer }

newBufferClaim :: ByteBuffer -> TermOffset -> Int -> IO BufferClaim
newBufferClaim src (TermOffset offset) len = BufferClaim <$>
Expand All @@ -23,9 +23,9 @@ putBS (BufferClaim bb) offset bs = putByteStringAt bb offset bs

withPtr :: BufferClaim -> (Ptr Word8 -> IO a) -> IO a
withPtr (BufferClaim bb) k = do
Position offset <- readPosition bb
Slice slice <- readSlice bb
-- XXX: boundcheck?
withForeignPtr (bbData bb `plusForeignPtr` offset) k
withForeignPtr (bbData bb `plusForeignPtr` slice) k

commit :: BufferClaim -> Logger -> IO ()
commit (BufferClaim bb) logger = do
Expand Down
5 changes: 5 additions & 0 deletions src/journal/src/Journal/MP.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import Data.Int (Int64)
import qualified Data.Vector as Vector
import Network.Socket (Socket, recvBuf)
import Foreign (plusPtr)

import Journal.Internal
( AppendError(..)
Expand Down Expand Up @@ -37,6 +39,9 @@ appendBS jour bs = do
putBS bufferClaim hEADER_LENGTH bs
Right <$> commit bufferClaim (jLogger jour)

recvBytes :: BufferClaim -> Socket -> Int -> IO Int
recvBytes bc sock len = withPtr bc $ \ptr -> recvBuf sock (ptr `plusPtr` hEADER_LENGTH) len

readJournal :: Journal -> IO (Maybe ByteString)
readJournal jour = do
offset <- readCounter (jBytesConsumed jour)
Expand Down
51 changes: 36 additions & 15 deletions src/sut/dumblog/src/Dumblog/ZeroCopy/HttpServer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,22 @@ import Network.Socket.ByteString (sendAll, recv)
import GHC.Event
import Control.Concurrent

import Journal.Types (Journal, jLogger, hEADER_LENGTH)
import Journal.MP
import Journal.Internal.BufferClaim
import Journal.Internal.ByteBufferPtr

------------------------------------------------------------------------

httpServer :: Int -> IO ()
httpServer port = withSocketsDo $ do
httpServer :: Journal -> Int -> IO ()
httpServer jour port = withSocketsDo $ do
numCapabilities <- getNumCapabilities
putStrLn ("Starting http server on port: " ++ show port)
putStrLn ("Capabilities: : " ++ show numCapabilities)
sock <- listenOn port
mgr <- fromMaybe (error "Compile with -threaded") <$> getSystemEventManager
_key <- withFdSocket sock $ \fd ->
registerFd mgr (client sock) (fromIntegral fd) evtRead MultiShot
registerFd mgr (client jour sock) (fromIntegral fd) evtRead MultiShot
loop
where
loop = do
Expand Down Expand Up @@ -52,18 +57,31 @@ parseOffsetLength bs = do
let (headers, _match) = BS.breakSubstring "\r\n\r\n" bs
return (BS.length headers + BS.length "POST" + BS.length "\r\n\r\n", len)

client :: Socket -> FdKey -> Event -> IO ()
client sock _ _ = do
(conn, _) <- accept sock
req <- recv conn 4096
print (parseCommand req)
case parseCommand req of
Just (Write offset len) ->
putStrLn ("BODY: " ++ BS.unpack (BS.take len (BS.drop offset req)))
Just (Read _ix) -> return ()
Nothing -> return ()
sendAll conn msg
close conn
client :: Journal -> Socket -> FdKey -> Event -> IO ()
client jour sock fdKey event = do
eRes <- tryClaim jour 4096
case eRes of
Left err -> do
putStrLn ("client, err: " ++ show err)
client jour sock fdKey event
Right (offset, bufferClaim) -> do
(conn, _) <- accept sock
bytesRecv <- recvBytes bufferClaim conn 4096
putStrLn ("client, bytes received: " ++ show bytesRecv)
commit bufferClaim (jLogger jour)

-- NOTE: The following copies bytes. It would be better to do the parsing at
-- the `ByteBuffer` level rather than `ByteString` level...
req <- getByteStringAt (bcByteBuffer bufferClaim) hEADER_LENGTH bytesRecv

print req
case parseCommand req of
Just (Write offset len) ->
putStrLn ("BODY: " ++ BS.unpack (BS.take len (BS.drop offset req)))
Just (Read _ix) -> return ()
Nothing -> return ()
sendAll conn msg
close conn

msg :: ByteString
msg = BS.pack "HTTP/1.0 200 OK\r\nContent-Length: 5\r\n\r\nPong!\r\n"
Expand Down Expand Up @@ -91,3 +109,6 @@ listenOn port = do

openSocket :: AddrInfo -> IO Socket
openSocket addr = socket (addrFamily addr) (addrSocketType addr) (addrProtocol addr)

-- foreign import ccall unsafe "sys/sendfile.h sendfile"
-- c_sendfile :: Fd -> Fd -> Ptr Int64 -> Word64 -> IO Int64
14 changes: 11 additions & 3 deletions src/sut/dumblog/src/Dumblog/ZeroCopy/Main.hs
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
module Dumblog.ZeroCopy.Main where

import Journal.Types (Journal)
import Journal (defaultOptions, allocateJournal, startJournal)

import Dumblog.ZeroCopy.HttpServer

------------------------------------------------------------------------

zeroCopyDumblog :: Int -> Int -> IO ()
zeroCopyDumblog _capacity port = httpServer port
zeroCopyDumblog :: Journal -> Int -> IO ()
zeroCopyDumblog jour port = httpServer jour port

main :: IO ()
main = zeroCopyDumblog (64 * 1024) 8054
main = do
let fp = "/tmp/dumblog-zero-copy.journal"
opts = defaultOptions
allocateJournal fp opts
jour <- startJournal fp opts
zeroCopyDumblog jour 8054

0 comments on commit 26889c2

Please sign in to comment.