Skip to content

Commit

Permalink
fix(sut): down throw errors in http client
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-stevan-andjelkovic committed Feb 28, 2022
1 parent db06cfa commit 9c59a0a
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 23 deletions.
21 changes: 13 additions & 8 deletions src/sut/dumblog/src/Dumblog/Common/HttpClient.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
module Dumblog.Common.HttpClient where

import Control.Monad (when)
import Control.Exception (try)
import qualified Data.ByteString.Char8 as BSChar8
import Data.ByteString.Lazy.Char8 (ByteString)
import qualified Data.ByteString.Lazy.Char8 as LBSChar8
import Network.HTTP.Client
( Manager
, Request
, RequestBody(RequestBodyLBS)
, HttpException(HttpExceptionRequest)
, defaultManagerSettings
, httpLbs
, httpNoBody
Expand Down Expand Up @@ -53,14 +55,17 @@ newHttpClient host port = do

writeHttp :: HttpClient -> ByteString -> IO Int
writeHttp hc bs = do
resp <- httpLbs (hcWriteReq hc bs) (hcManager hc)
when (responseStatus resp /= ok200) $
return () -- XXX: increment hcErrors
return (read (LBSChar8.unpack (responseBody resp)))
eResp <- try (httpLbs (hcWriteReq hc bs) (hcManager hc))
case eResp of
Left (HttpExceptionRequest _req _exceptCtx) ->
-- XXX: increment hcErrors
return (-1)
Right resp -> return (read (LBSChar8.unpack (responseBody resp)))

readHttp :: HttpClient -> Int -> IO ByteString
readHttp hc ix = do
resp <- httpLbs (hcReadReq hc ix) (hcManager hc)
when (responseStatus resp /= ok200) $
return () -- XXX: increment hcErrors
return (responseBody resp)
eResp <- try (httpLbs (hcReadReq hc ix) (hcManager hc))
case eResp of
Left (HttpExceptionRequest _req _exceptCtx) ->
return "error" -- XXX: increment hcErrors
Right resp -> return (responseBody resp)
35 changes: 28 additions & 7 deletions src/sut/dumblog/src/Dumblog/Journal/FrontEnd.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

module Dumblog.Journal.FrontEnd where

import Control.Monad (when)
import Control.Concurrent.MVar (MVar, putMVar)
import qualified Data.ByteString.Lazy as LBS
import qualified Data.ByteString.Lazy.Char8 as LBS8
Expand All @@ -13,16 +14,19 @@ import Network.HTTP.Types.Method
import Network.HTTP.Types.Status (status200, status400)
import qualified Network.Wai as Wai
import Network.Wai.Handler.Warp
import System.Timeout (timeout)

import Journal (Journal)
import qualified Journal
import qualified Journal.MP as Journal
import Journal.Types.AtomicCounter (AtomicCounter)
import qualified Journal.Types.AtomicCounter as AtomicCounter

import Dumblog.Journal.Blocker
import Dumblog.Journal.Codec
import Dumblog.Journal.Types

------------------------------------------------------------------------

data FrontEndInfo = FrontEndInfo
{ sequenceNumber :: AtomicCounter
, blockers :: Blocker (Either Response Response)
Expand All @@ -48,18 +52,35 @@ httpFrontend journal (FrontEndInfo c blocker) req respond = do
case mmethod of
Left err -> respond $ Wai.responseLBS status400 [] err
Right cmd -> do
Journal.appendBS journal (encode $ Envelope key cmd)
resp <- blockUntil blocker key
-- Journal.dumpJournal journal
case resp of
Left errMsg -> respond $ Wai.responseLBS status400 [] errMsg
Right msg -> respond $ Wai.responseLBS status200 [] msg
res <- Journal.appendBS journal (encode $ Envelope key cmd)
res' <- case res of
Left err -> do
putStrLn ("httpFrontend, append error: " ++ show err)
Journal.appendBS journal (encode $ Envelope key cmd)
Right () -> return (Right ())
case res' of
Left err -> do
putStrLn ("httpFrontend, append error 2: " ++ show err)
respond $ Wai.responseLBS status400 [] (LBS8.pack (show err))
Right () -> do
mResp <- timeout (3*1000*1000) (blockUntil blocker key)
-- Journal.dumpJournal journal
case mResp of
Nothing -> respond $ Wai.responseLBS status400 [] "MVar timeout"
Just (Left errMsg) -> respond $ Wai.responseLBS status400 [] errMsg
Just (Right msg) -> respond $ Wai.responseLBS status200 [] msg

runFrontEnd :: Port -> Journal -> FrontEndInfo -> Maybe (MVar ()) -> IO ()
runFrontEnd port journal feInfo mReady =
runSettings settings (httpFrontend journal feInfo)
where
settings
= setPort port
$ setLogger (\req status _mSize ->
when (status /= status200) $ do
putStrLn ("warp, request: " ++ show req)
putStrLn ("warp, status: " ++ show status)
print =<< Wai.strictRequestBody req)

$ maybe id (\ready -> setBeforeMainLoop (putMVar ready ())) mReady
$ defaultSettings
7 changes: 4 additions & 3 deletions src/sut/dumblog/src/Dumblog/Journal/Main.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module Dumblog.Journal.Main where

import Control.Concurrent.Async (withAsync)
import Control.Concurrent.Async (withAsync, link)
import Control.Concurrent.MVar (MVar)
import Control.Monad (when)
import Journal (Journal)
Expand Down Expand Up @@ -90,7 +90,7 @@ journalDumblog _capacity port mReady = do
fpm = "/tmp/dumblog.metrics"
fps = "/tmp/dumblog.snapshot"
opts = Journal.defaultOptions { Journal.oLogger = Logger.nullLogger }
untilSnapshot = 10
untilSnapshot = 1000000
mSnapshot <- Snapshot.readFile fps
journal <- fetchJournal mSnapshot fpj opts
metrics <- Metrics.newMetrics dumblogSchema fpm
Expand All @@ -99,5 +99,6 @@ journalDumblog _capacity port mReady = do
(state, events) <- fetchState mSnapshot journal
let feInfo = FrontEndInfo counter blocker
wInfo = WorkerInfo blocker fps events untilSnapshot
withAsync (worker journal metrics wInfo state) $ \_async ->
withAsync (worker journal metrics wInfo state) $ \a -> do
link a
runFrontEnd port journal feInfo mReady
17 changes: 12 additions & 5 deletions src/sut/dumblog/src/Dumblog/Journal/Worker.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{-# LANGUAGE OverloadedStrings #-}

module Dumblog.Journal.Worker where

import Control.Concurrent (threadDelay)
Expand All @@ -10,7 +11,8 @@ import qualified Data.Char as Char
import Data.Time (getCurrentTime, diffUTCTime)

import Journal (Journal)
import qualified Journal
import Journal (jBytesConsumed)
import qualified Journal.MP as Journal
import qualified Journal.Internal.Metrics as Metrics
import qualified Journal.Types.AtomicCounter as AtomicCounter

Expand Down Expand Up @@ -38,14 +40,16 @@ timeIt metrics action = do
Metrics.measure metrics ResponseTime (realToFrac . (*1000) $ diffUTCTime endTime startTime)
return result

wakeUpFrontend :: Blocker (Either Response Response) -> Int -> Either Response Response -> IO ()
wakeUpFrontend :: Blocker (Either Response Response) -> Int -> Either Response Response
-> IO ()
wakeUpFrontend blocker key resp = do
b <- wakeUp blocker key resp
unless b $
error $ "Frontend never added MVar"

worker :: Journal -> DumblogMetrics -> WorkerInfo -> InMemoryDumblog -> IO ()
worker journal metrics (WorkerInfo blocker snapshotFile eventCount untilSnapshot) = go eventCount
worker journal metrics (WorkerInfo blocker snapshotFile eventCount untilSnapshot) =
go eventCount
where
go ev s
| ev >= untilSnapshot = do
Expand All @@ -61,9 +65,12 @@ worker journal metrics (WorkerInfo blocker snapshotFile eventCount untilSnapshot
let Envelope key cmd = decode entry
{- // in case of decode error
Metrics.incrCounter metrics ErrorsEncountered 1
wakeUpFrontend blocker key $ Left "Couldn't parse request" -- should be better error message
-}
wakeUpFrontend blocker key $ Left "Couldn't parse request"
-- ^ should be better error message
-}
-- putStrLn ("worker: key: " ++ show key ++ ", cmd: " ++ show cmd)
(s', r) <- runCommand s cmd
-- putStrLn ("worker: key: " ++ show key ++ ", response: " ++ show r)
wakeUpFrontend blocker key (Right r)
return (succ ev, s')
}
Expand Down
57 changes: 57 additions & 0 deletions src/sut/dumblog/src/Dumblog/ZeroCopy/HttpServer.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
module Dumblog.ZeroCopy.HttpServer where

import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as BS8
import Control.Exception (bracketOnError)
import Network.Socket
import Network.Socket.ByteString (sendAll)
import GHC.Event
import Control.Concurrent

------------------------------------------------------------------------

httpServer :: Int -> IO ()
httpServer port = withSocketsDo $ do
putStrLn "Starting http server on port 5002"
sock <- listenOn port
Just mgr <- getSystemEventManager
key <- withFdSocket sock $ \fd ->
registerFd mgr (client sock) (fromIntegral fd) evtRead MultiShot
loop
where
loop = do
threadDelay (10*1000*1000)
loop

client :: Socket -> FdKey -> Event -> IO ()
client sock _ _ = do
(conn, _) <- accept sock
sendAll conn msg
close conn

msg :: ByteString
msg = BS8.pack "HTTP/1.0 200 OK\r\nContent-Length: 5\r\n\r\nPong!\r\n"


listenOn :: Int -> IO Socket
listenOn port = do
addr <- resolve
open addr
where
resolve = do
let hints = defaultHints {
addrFlags = [AI_PASSIVE]
, addrSocketType = Stream
}
head <$> getAddrInfo (Just hints) (Just "localhost") (Just (show port))

open addr = bracketOnError (openSocket addr) (const (return ())) $ \sock -> do
-- close $ \sock -> do
setSocketOption sock ReuseAddr 1
withFdSocket sock setCloseOnExecIfNeeded
bind sock (addrAddress addr)
listen sock 1024
return sock

openSocket :: AddrInfo -> IO Socket
openSocket addr = socket (addrFamily addr) (addrSocketType addr) (addrProtocol addr)
11 changes: 11 additions & 0 deletions src/sut/dumblog/src/Dumblog/ZeroCopy/Main.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
module Dumblog.ZeroCopy.Main where

import Dumblog.ZeroCopy.HttpServer

------------------------------------------------------------------------

zeroCopyDumblog :: Int -> Int -> IO ()
zeroCopyDumblog _capacity port = httpServer port

main :: IO ()
main = zeroCopyDumblog (64 * 1024) 8054

0 comments on commit 9c59a0a

Please sign in to comment.