diff --git a/src/sut/dumblog/dumblog.cabal b/src/sut/dumblog/dumblog.cabal index 8b008cd9..cb4a68ba 100644 --- a/src/sut/dumblog/dumblog.cabal +++ b/src/sut/dumblog/dumblog.cabal @@ -50,6 +50,7 @@ library , stm , text , time + , vector , wai , warp if flag(persistent-sqlite) @@ -75,6 +76,7 @@ library Dumblog.ZeroCopy.HttpServer Dumblog.ZeroCopy.Main Dumblog.ZeroCopy.Sendfile + Dumblog.ZeroCopy.State if flag(persistent-sqlite) exposed-modules: Dumblog.SQLite.DBPersistent diff --git a/src/sut/dumblog/src/Dumblog/ZeroCopy/HttpServer.hs b/src/sut/dumblog/src/Dumblog/ZeroCopy/HttpServer.hs index a06bf6b4..79e49c05 100644 --- a/src/sut/dumblog/src/Dumblog/ZeroCopy/HttpServer.hs +++ b/src/sut/dumblog/src/Dumblog/ZeroCopy/HttpServer.hs @@ -8,17 +8,15 @@ import qualified Data.ByteString.Char8 as BS import Control.Exception (bracketOnError) import Network.Socket import Network.Socket.ByteString (sendAll) -import GHC.IO.Device (SeekMode(AbsoluteSeek)) import GHC.Event import Control.Concurrent -import System.Posix.IO import Journal.Types (Journal, jLogger, hEADER_LENGTH) import Journal.MP import Journal.Internal.BufferClaim import Journal.Internal.ByteBufferPtr -import Dumblog.ZeroCopy.Sendfile +import Dumblog.ZeroCopy.State ------------------------------------------------------------------------ @@ -27,10 +25,11 @@ httpServer jour port = withSocketsDo $ do numCapabilities <- getNumCapabilities putStrLn ("Starting http server on port: " ++ show port) putStrLn ("Capabilities: : " ++ show numCapabilities) + state <- initState 40000 "/tmp/dumblog-zero-copy.journal" sock <- listenOn port mgr <- fromMaybe (error "Compile with -threaded") <$> getSystemEventManager _key <- withFdSocket sock $ \fd -> - registerFd mgr (client jour sock) (fromIntegral fd) evtRead MultiShot + registerFd mgr (client jour state sock) (fromIntegral fd) evtRead MultiShot loop where loop = do @@ -61,49 +60,34 @@ parseOffsetLength bs = do let (headers, _match) = BS.breakSubstring "\r\n\r\n" bs return (BS.length headers + BS.length "POST" + BS.length "\r\n\r\n", len) -client :: Journal -> Socket -> FdKey -> Event -> IO () -client jour sock fdKey event = do - eRes <- tryClaim jour 4096 +client :: Journal -> State -> Socket -> FdKey -> Event -> IO () +client jour state sock fdKey event = do + let bufSize = 4096 - hEADER_LENGTH + eRes <- tryClaim jour bufSize case eRes of Left err -> do putStrLn ("client, err: " ++ show err) - client jour sock fdKey event + client jour state sock fdKey event Right (offset, bufferClaim) -> do (conn, _) <- accept sock - bytesRecv <- recvBytes bufferClaim conn 4096 - putStrLn ("client, bytes received: " ++ show bytesRecv) + bytesRecv <- recvBytes bufferClaim conn bufSize commit bufferClaim (jLogger jour) -- NOTE: The following copies bytes. It would be better to do the parsing at -- the `ByteBuffer` level rather than `ByteString` level... req <- getByteStringAt (bcByteBuffer bufferClaim) hEADER_LENGTH bytesRecv - - print req case parseCommand req of Just (Write offset' len) -> do - putStrLn ("BODY: " ++ BS.unpack (BS.take len (BS.drop offset' req))) - fd <- openFd "/tmp/dumblog-zero-copy.journal" ReadOnly Nothing defaultFileFlags - -- fdSeek fd AbsoluteSeek - -- (fromIntegral offset' + fromIntegral hEADER_LENGTH) - -- (s, _readBytes) <- fdRead fd (fromIntegral len) - sendAll conn (httpHeader len) - -- NOTE: For subsequent requests we need to take `offset` into account also. - _sentBytes <- sendfile conn fd (fromIntegral (offset' + hEADER_LENGTH)) - (fromIntegral len) - return () - - Just (Read _ix) -> do - sendAll conn msg + ix <- writeLocation state offset (Location (fromIntegral offset') (fromIntegral len)) + sendAll conn (response (BS.pack (show ix))) + Just (Read ix) -> readSendfile state conn ix Nothing -> return () close conn -msg :: ByteString -msg = BS.pack "HTTP/1.0 200 OK\r\nContent-Length: 5\r\n\r\nPong!\r\n" - -httpHeader :: Int -> ByteString -httpHeader len = - BS.pack "HTTP/1.0 200 OK\r\nContent-Length: " <> BS.pack (show len) <> "\r\n\r\n" - +response :: ByteString -> ByteString +response body = + BS.pack "HTTP/1.0 200 OK\r\nContent-Length: " <> BS.pack (show (BS.length body)) <> + "\r\n\r\n" <> body listenOn :: Int -> IO Socket listenOn port = do diff --git a/src/sut/dumblog/src/Dumblog/ZeroCopy/State.hs b/src/sut/dumblog/src/Dumblog/ZeroCopy/State.hs new file mode 100644 index 00000000..d6a9e305 --- /dev/null +++ b/src/sut/dumblog/src/Dumblog/ZeroCopy/State.hs @@ -0,0 +1,59 @@ +module Dumblog.ZeroCopy.State where + +import Data.ByteString (ByteString) +import qualified Data.ByteString.Char8 as BS +import Data.Int (Int64) +import Data.Vector.Mutable (IOVector) +import qualified Data.Vector.Mutable as Vector +import Data.Word (Word16, Word64) +import Network.Socket (Socket) +import Network.Socket.ByteString (sendAll) +import System.Posix.IO (OpenMode(ReadOnly), defaultFileFlags, openFd) +import System.Posix.Types (Fd) + +import Dumblog.ZeroCopy.Sendfile +import Journal.Types (hEADER_LENGTH) +import Journal.Types.AtomicCounter + +------------------------------------------------------------------------ + +data State = State + { sLocations :: !(IOVector Location) + , sIndex :: !AtomicCounter + , sFd :: !Fd + } + +initState :: Int -> FilePath -> IO State +initState size fp + = State + <$> Vector.new size + <*> newCounter 0 + <*> openFd fp ReadOnly Nothing defaultFileFlags + +data Location = Location + { lOffset :: !Word64 + , lLength :: !Word16 + } + +writeLocation :: State -> Int64 -> Location -> IO Int +writeLocation s offset loc = do + ix <- getAndIncrCounter 1 (sIndex s) + Vector.write (sLocations s) ix + (loc { lOffset = fromIntegral (offset - 4096) + lOffset loc }) + return ix + +readLocation :: State -> Int -> IO Location +readLocation s ix = Vector.read (sLocations s) ix + +readSendfile :: State -> Socket -> Int -> IO () +readSendfile s sock ix = do + loc <- readLocation s ix + sendAll sock (httpHeader (lLength loc)) + _bytesSent <- sendfile sock (sFd s) + (fromIntegral (lOffset loc) + fromIntegral hEADER_LENGTH) + (fromIntegral (lLength loc)) + return () + +httpHeader :: Word16 -> ByteString +httpHeader len = + BS.pack "HTTP/1.0 200 OK\r\nContent-Length: " <> BS.pack (show len) <> BS.pack "\r\n\r\n"