Skip to content

Commit

Permalink
Add support for persistent connections in the client (#5)
Browse files Browse the repository at this point in the history
  • Loading branch information
anmonteiro committed May 11, 2020
1 parent 6229a06 commit f9339f3
Show file tree
Hide file tree
Showing 24 changed files with 881 additions and 258 deletions.
15 changes: 11 additions & 4 deletions async/httpaf_async.ml
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,12 @@ module Server = struct
end

module Client = struct
let request ?(config=Config.default) socket request ~error_handler ~response_handler =
type t = Client_connection.t

let create_connection ?(config=Config.default) socket =
let fd = Socket.fd socket in
let writev = Faraday_async.writev_of_fd fd in
let request_body, conn =
Client_connection.request request ~error_handler ~response_handler in
let conn = Client_connection.create ~config in
let read_complete = Ivar.create () in
let buffer = Buffer.create config.read_buffer_size in
let rec reader_thread () =
Expand Down Expand Up @@ -211,5 +212,11 @@ module Client = struct
>>| fun () ->
if not (Fd.is_closed fd)
then don't_wait_for (Fd.close fd));
request_body
conn

let request = Client_connection.request

let shutdown = Client_connection.shutdown

let is_closed = Client_connection.is_closed
end
14 changes: 12 additions & 2 deletions async/httpaf_async.mli
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,21 @@ module Server : sig
end

module Client : sig
val request
: ?config : Config.t
type t

val create_connection
: ?config:Config.t
-> ([`Active], [< Socket.Address.t]) Socket.t
-> t

val request
: t
-> Request.t
-> error_handler : Client_connection.error_handler
-> response_handler : Client_connection.response_handler
-> [`write] Body.t

val shutdown : t -> unit

val is_closed : t -> bool
end
5 changes: 3 additions & 2 deletions examples/async/async_get.ml
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ let main port host () =
let finished = Ivar.create () in
let response_handler = Httpaf_examples.Client.print ~on_eof:(Ivar.fill finished) in
let headers = Headers.of_list [ "host", host ] in
let connection = Client.create_connection socket in
let request_body =
Client.request
~error_handler
connection
~response_handler
socket
~error_handler
(Request.create ~headers `GET "/")
in
Body.close_writer request_body;
Expand Down
53 changes: 53 additions & 0 deletions examples/async/async_get_pipelined.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
open! Core
open Async

open Httpaf
open Httpaf_async

let error_handler _ = assert false

let main port host () =
let where_to_connect = Tcp.Where_to_connect.of_host_and_port { host; port } in
Tcp.connect_sock where_to_connect
>>= fun socket ->
let finished = Ivar.create () in
let response_handler = Httpaf_examples.Client.print ~on_eof:(Ivar.fill finished) in
let request_headers =
Request.create ~headers:(Headers.of_list [ "host", host ]) `GET "/"
in
let connection = Client.create_connection socket in
let request_body =
Client.request
connection
~response_handler
~error_handler
request_headers
in
let finished' = Ivar.create () in
let response_handler' =
Httpaf_examples.Client.print ~on_eof:(Ivar.fill finished')
in
let request_body' =
Client.request
connection
~response_handler:response_handler'
~error_handler
request_headers
in
Body.close_writer request_body';
Body.close_writer request_body;
Async.Deferred.all_unit [Ivar.read finished; Ivar.read finished'] >>| fun () ->
Client.shutdown connection
;;

let () =
Command.async
~summary:"Start a hello world Async client"
Command.Param.(
map (both
(flag "-p" (optional_with_default 80 int)
~doc:"int destination port")
(anon ("host" %: string)))
~f:(fun (port, host) ->
(fun () -> main port host ())))
|> Command.run
5 changes: 3 additions & 2 deletions examples/async/async_post.ml
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ let main port host () =
; "host" , host
]
in
let connection = Client.create_connection socket in
let request_body =
Client.request
~error_handler
connection
~response_handler
socket
~error_handler
(Request.create ~headers `POST "/")
in
let stdin = Lazy.force Reader.stdin in
Expand Down
5 changes: 3 additions & 2 deletions examples/async/dune
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
(executables
(libraries httpaf httpaf-async httpaf_examples async core)
(names async_echo_post async_get async_post))
(names async_echo_post async_get async_get_pipelined async_post))

(alias
(name examples)
(deps (glob_files *.exe)))
(deps
(glob_files *.exe)))
3 changes: 2 additions & 1 deletion examples/lwt/dune
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
(executables
(libraries httpaf httpaf-lwt-unix httpaf_examples base stdio lwt lwt.unix)
(names lwt_get lwt_post lwt_echo_post lwt_https_get lwt_https_server))
(names lwt_get lwt_get_pipelined lwt_post lwt_echo_post lwt_https_get
lwt_https_server))

(alias
(name examples)
Expand Down
20 changes: 18 additions & 2 deletions examples/lwt/lwt_get.ml
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,31 @@ let main port host =
Httpaf_examples.Client.print ~on_eof:(Lwt.wakeup_later notify_finished)
in
let headers = Headers.of_list [ "host", host ] in
Client.create_connection socket >>= fun connection ->
let request_body =
Client.request
connection
~response_handler
~error_handler
(Request.create ~headers `GET "/")
in
Body.close_writer request_body;
finished >>= fun () ->
let finished, notify_finished = Lwt.wait () in
let response_handler =
Httpaf_examples.Client.print ~on_eof:(Lwt.wakeup_later notify_finished)
in
let headers = Headers.of_list [ "host", host ] in
let request_body =
Client.request
connection
~response_handler
socket
~error_handler
(Request.create ~headers `GET "/")
in
Body.close_writer request_body;
finished
finished >|= fun () ->
Client.shutdown connection
;;

let () =
Expand Down
61 changes: 61 additions & 0 deletions examples/lwt/lwt_get_pipelined.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
open Base
open Lwt.Infix
module Arg = Caml.Arg

open Httpaf
open Httpaf_lwt_unix

let error_handler _ = assert false

let main port host =
Lwt_unix.getaddrinfo host (Int.to_string port) [Unix.(AI_FAMILY PF_INET)]
>>= fun addresses ->
let socket = Lwt_unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
Lwt_unix.connect socket (List.hd_exn addresses).Unix.ai_addr
>>= fun () ->
let finished, notify_finished = Lwt.wait () in
let response_handler =
Httpaf_examples.Client.print ~on_eof:(Lwt.wakeup_later notify_finished)
in
let request_headers =
Request.create ~headers:(Headers.of_list [ "host", host ]) `GET "/"
in
Client.create_connection socket >>= fun connection ->
let request_body =
Client.request
connection
~response_handler
~error_handler
request_headers
in
let finished', notify_finished' = Lwt.wait () in
let response_handler' =
Httpaf_examples.Client.print ~on_eof:(Lwt.wakeup_later notify_finished')
in
let request_body' =
Client.request
connection
~response_handler:response_handler'
~error_handler
request_headers
in
Body.close_writer request_body';
Body.close_writer request_body;
Lwt.join [finished; finished'] >|= fun () ->
Client.shutdown connection
;;

let () =
let host = ref None in
let port = ref 80 in
Arg.parse
["-p", Set_int port, " Port number (80 by default)"]
(fun host_argument -> host := Some host_argument)
"lwt_get.exe [-p N] HOST";
let host =
match !host with
| None -> failwith "No hostname provided"
| Some host -> host
in
Lwt_main.run (main !port host)
;;
7 changes: 4 additions & 3 deletions examples/lwt/lwt_https_get.ml
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ let main port host =
Httpaf_examples.Client.print ~on_eof:(Lwt.wakeup_later notify_finished)
in
let headers = Headers.of_list [ "host", host ] in
Client.TLS.request
Client.TLS.create_connection socket >>= fun connection ->
let request_body = Client.TLS.request
connection
~error_handler
~response_handler
socket
(Request.create ~headers `GET "/")
>>= fun request_body ->
in
Body.close_writer request_body;
finished
;;
Expand Down
5 changes: 3 additions & 2 deletions examples/lwt/lwt_post.ml
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@ let main port host =
; "host" , host
]
in
Client.create_connection socket >>= fun connection ->
let request_body =
Client.request
~error_handler
connection
~response_handler
socket
~error_handler
(Request.create ~headers `POST "/")
in
Body.write_string request_body body;
Expand Down
1 change: 1 addition & 0 deletions lib/body.ml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ let create buffer =

let create_empty () =
let t = create Bigstringaf.empty in
t.write_final_if_chunked <- false;
Faraday.close t.faraday;
t

Expand Down
Loading

0 comments on commit f9339f3

Please sign in to comment.