From 2291d7bb6e886a51c10894715d9176b5820d5736 Mon Sep 17 00:00:00 2001 From: Antonio Nuno Monteiro Date: Wed, 10 Jul 2019 22:22:26 -0700 Subject: [PATCH] Add support for persistent connections in the client (#5) --- async/httpaf_async.ml | 15 +- async/httpaf_async.mli | 14 +- examples/async/async_get.ml | 5 +- examples/async/async_get_pipelined.ml | 53 ++++ examples/async/async_post.ml | 5 +- examples/async/dune | 5 +- examples/lwt/dune | 3 +- examples/lwt/lwt_get.ml | 20 +- examples/lwt/lwt_get_pipelined.ml | 61 ++++ examples/lwt/lwt_https_get.ml | 7 +- examples/lwt/lwt_post.ml | 5 +- lib/body.ml | 1 + lib/client_connection.ml | 399 +++++++++++++++----------- lib/httpaf.ml | 2 +- lib/httpaf.mli | 16 +- lib/parse.ml | 19 +- lib/respd.ml | 118 ++++++++ lib/server_connection.ml | 13 +- lib_test/test_client_connection.ml | 253 ++++++++++++++-- lwt-unix/httpaf_lwt_unix.ml | 18 +- lwt-unix/httpaf_lwt_unix.mli | 42 ++- lwt/httpaf_lwt.ml | 19 +- lwt/httpaf_lwt.mli | 16 +- mirage/httpaf_mirage.mli | 30 +- 24 files changed, 881 insertions(+), 258 deletions(-) create mode 100644 examples/async/async_get_pipelined.ml create mode 100644 examples/lwt/lwt_get_pipelined.ml create mode 100644 lib/respd.ml diff --git a/async/httpaf_async.ml b/async/httpaf_async.ml index 4db6796..7f6aa3c 100644 --- a/async/httpaf_async.ml +++ b/async/httpaf_async.ml @@ -154,11 +154,12 @@ module Server = struct end module Client = struct - let request ?(config=Config.default) socket request ~error_handler ~response_handler = + type t = Client_connection.t + + let create_connection ?(config=Config.default) socket = let fd = Socket.fd socket in let writev = Faraday_async.writev_of_fd fd in - let request_body, conn = - Client_connection.request request ~error_handler ~response_handler in + let conn = Client_connection.create ~config in let read_complete = Ivar.create () in let buffer = Buffer.create config.read_buffer_size in let rec reader_thread () = @@ -211,5 +212,11 @@ module Client = struct >>| fun () -> if not (Fd.is_closed fd) then don't_wait_for (Fd.close fd)); - request_body + conn + + let request = Client_connection.request + + let shutdown = Client_connection.shutdown + + let is_closed = Client_connection.is_closed end diff --git a/async/httpaf_async.mli b/async/httpaf_async.mli index f120624..670d669 100644 --- a/async/httpaf_async.mli +++ b/async/httpaf_async.mli @@ -14,11 +14,21 @@ module Server : sig end module Client : sig - val request - : ?config : Config.t + type t + + val create_connection + : ?config:Config.t -> ([`Active], [< Socket.Address.t]) Socket.t + -> t + + val request + : t -> Request.t -> error_handler : Client_connection.error_handler -> response_handler : Client_connection.response_handler -> [`write] Body.t + + val shutdown : t -> unit + + val is_closed : t -> bool end diff --git a/examples/async/async_get.ml b/examples/async/async_get.ml index da49267..7a0b537 100644 --- a/examples/async/async_get.ml +++ b/examples/async/async_get.ml @@ -13,11 +13,12 @@ let main port host () = let finished = Ivar.create () in let response_handler = Httpaf_examples.Client.print ~on_eof:(Ivar.fill finished) in let headers = Headers.of_list [ "host", host ] in + let connection = Client.create_connection socket in let request_body = Client.request - ~error_handler + connection ~response_handler - socket + ~error_handler (Request.create ~headers `GET "/") in Body.close_writer request_body; diff --git a/examples/async/async_get_pipelined.ml b/examples/async/async_get_pipelined.ml new file mode 100644 index 0000000..5382a58 --- /dev/null +++ b/examples/async/async_get_pipelined.ml @@ -0,0 +1,53 @@ +open! Core +open Async + +open Httpaf +open Httpaf_async + +let error_handler _ = assert false + +let main port host () = + let where_to_connect = Tcp.Where_to_connect.of_host_and_port { host; port } in + Tcp.connect_sock where_to_connect + >>= fun socket -> + let finished = Ivar.create () in + let response_handler = Httpaf_examples.Client.print ~on_eof:(Ivar.fill finished) in + let request_headers = + Request.create ~headers:(Headers.of_list [ "host", host ]) `GET "/" + in + let connection = Client.create_connection socket in + let request_body = + Client.request + connection + ~response_handler + ~error_handler + request_headers + in + let finished' = Ivar.create () in + let response_handler' = + Httpaf_examples.Client.print ~on_eof:(Ivar.fill finished') + in + let request_body' = + Client.request + connection + ~response_handler:response_handler' + ~error_handler + request_headers + in + Body.close_writer request_body'; + Body.close_writer request_body; + Async.Deferred.all_unit [Ivar.read finished; Ivar.read finished'] >>| fun () -> + Client.shutdown connection +;; + +let () = + Command.async + ~summary:"Start a hello world Async client" + Command.Param.( + map (both + (flag "-p" (optional_with_default 80 int) + ~doc:"int destination port") + (anon ("host" %: string))) + ~f:(fun (port, host) -> + (fun () -> main port host ()))) + |> Command.run diff --git a/examples/async/async_post.ml b/examples/async/async_post.ml index c2e9702..2f34668 100644 --- a/examples/async/async_post.ml +++ b/examples/async/async_post.ml @@ -19,11 +19,12 @@ let main port host () = ; "host" , host ] in + let connection = Client.create_connection socket in let request_body = Client.request - ~error_handler + connection ~response_handler - socket + ~error_handler (Request.create ~headers `POST "/") in let stdin = Lazy.force Reader.stdin in diff --git a/examples/async/dune b/examples/async/dune index 4008a21..868edbe 100644 --- a/examples/async/dune +++ b/examples/async/dune @@ -1,7 +1,8 @@ (executables (libraries httpaf httpaf-async httpaf_examples async core) - (names async_echo_post async_get async_post)) + (names async_echo_post async_get async_get_pipelined async_post)) (alias (name examples) - (deps (glob_files *.exe))) + (deps + (glob_files *.exe))) diff --git a/examples/lwt/dune b/examples/lwt/dune index 1b84bf1..1118a01 100644 --- a/examples/lwt/dune +++ b/examples/lwt/dune @@ -1,6 +1,7 @@ (executables (libraries httpaf httpaf-lwt-unix httpaf_examples base stdio lwt lwt.unix) - (names lwt_get lwt_post lwt_echo_post lwt_https_get lwt_https_server)) + (names lwt_get lwt_get_pipelined lwt_post lwt_echo_post lwt_https_get + lwt_https_server)) (alias (name examples) diff --git a/examples/lwt/lwt_get.ml b/examples/lwt/lwt_get.ml index c109d51..7671c94 100644 --- a/examples/lwt/lwt_get.ml +++ b/examples/lwt/lwt_get.ml @@ -18,15 +18,31 @@ let main port host = Httpaf_examples.Client.print ~on_eof:(Lwt.wakeup_later notify_finished) in let headers = Headers.of_list [ "host", host ] in + Client.create_connection socket >>= fun connection -> let request_body = Client.request + connection + ~response_handler ~error_handler + (Request.create ~headers `GET "/") + in + Body.close_writer request_body; + finished >>= fun () -> + let finished, notify_finished = Lwt.wait () in + let response_handler = + Httpaf_examples.Client.print ~on_eof:(Lwt.wakeup_later notify_finished) + in + let headers = Headers.of_list [ "host", host ] in + let request_body = + Client.request + connection ~response_handler - socket + ~error_handler (Request.create ~headers `GET "/") in Body.close_writer request_body; - finished + finished >|= fun () -> + Client.shutdown connection ;; let () = diff --git a/examples/lwt/lwt_get_pipelined.ml b/examples/lwt/lwt_get_pipelined.ml new file mode 100644 index 0000000..d029a1e --- /dev/null +++ b/examples/lwt/lwt_get_pipelined.ml @@ -0,0 +1,61 @@ +open Base +open Lwt.Infix +module Arg = Caml.Arg + +open Httpaf +open Httpaf_lwt_unix + +let error_handler _ = assert false + +let main port host = + Lwt_unix.getaddrinfo host (Int.to_string port) [Unix.(AI_FAMILY PF_INET)] + >>= fun addresses -> + let socket = Lwt_unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in + Lwt_unix.connect socket (List.hd_exn addresses).Unix.ai_addr + >>= fun () -> + let finished, notify_finished = Lwt.wait () in + let response_handler = + Httpaf_examples.Client.print ~on_eof:(Lwt.wakeup_later notify_finished) + in + let request_headers = + Request.create ~headers:(Headers.of_list [ "host", host ]) `GET "/" + in + Client.create_connection socket >>= fun connection -> + let request_body = + Client.request + connection + ~response_handler + ~error_handler + request_headers + in + let finished', notify_finished' = Lwt.wait () in + let response_handler' = + Httpaf_examples.Client.print ~on_eof:(Lwt.wakeup_later notify_finished') + in + let request_body' = + Client.request + connection + ~response_handler:response_handler' + ~error_handler + request_headers + in + Body.close_writer request_body'; + Body.close_writer request_body; + Lwt.join [finished; finished'] >|= fun () -> + Client.shutdown connection +;; + +let () = + let host = ref None in + let port = ref 80 in + Arg.parse + ["-p", Set_int port, " Port number (80 by default)"] + (fun host_argument -> host := Some host_argument) + "lwt_get.exe [-p N] HOST"; + let host = + match !host with + | None -> failwith "No hostname provided" + | Some host -> host + in + Lwt_main.run (main !port host) +;; diff --git a/examples/lwt/lwt_https_get.ml b/examples/lwt/lwt_https_get.ml index 6546f80..da257c9 100644 --- a/examples/lwt/lwt_https_get.ml +++ b/examples/lwt/lwt_https_get.ml @@ -18,12 +18,13 @@ let main port host = Httpaf_examples.Client.print ~on_eof:(Lwt.wakeup_later notify_finished) in let headers = Headers.of_list [ "host", host ] in - Client.TLS.request + Client.TLS.create_connection socket >>= fun connection -> + let request_body = Client.TLS.request + connection ~error_handler ~response_handler - socket (Request.create ~headers `GET "/") - >>= fun request_body -> + in Body.close_writer request_body; finished ;; diff --git a/examples/lwt/lwt_post.ml b/examples/lwt/lwt_post.ml index 91451ad..6f1af57 100644 --- a/examples/lwt/lwt_post.ml +++ b/examples/lwt/lwt_post.ml @@ -26,11 +26,12 @@ let main port host = ; "host" , host ] in + Client.create_connection socket >>= fun connection -> let request_body = Client.request - ~error_handler + connection ~response_handler - socket + ~error_handler (Request.create ~headers `POST "/") in Body.write_string request_body body; diff --git a/lib/body.ml b/lib/body.ml index 60d3fd3..ff4513e 100644 --- a/lib/body.ml +++ b/lib/body.ml @@ -59,6 +59,7 @@ let create buffer = let create_empty () = let t = create Bigstringaf.empty in + t.write_final_if_chunked <- false; Faraday.close t.faraday; t diff --git a/lib/client_connection.ml b/lib/client_connection.ml index 502f729..d5f215e 100644 --- a/lib/client_connection.ml +++ b/lib/client_connection.ml @@ -1,5 +1,6 @@ (*---------------------------------------------------------------------------- Copyright (c) 2017-2019 Inhabited Type LLC. + Copyright (c) 2019 Antonio Nuno Monteiro. All rights reserved. @@ -34,176 +35,238 @@ module Reader = Parse.Reader module Writer = Serialize.Writer -module Oneshot = struct - type error = - [ `Malformed_response of string | `Invalid_response_body_length of Response.t | `Exn of exn ] - - type response_handler = Response.t -> [`read] Body.t -> unit - type error_handler = error -> unit - - type state = - | Awaiting_response - | Received_response of Response.t * [`read] Body.t - | Closed - - type t = - { request : Request.t - ; request_body : [ `write ] Body.t - ; error_handler : (error -> unit) - ; reader : Reader.response - ; writer : Writer.t - ; state : state ref - ; mutable error_code : [ `Ok | error ] - } - - let request ?(config=Config.default) request ~error_handler ~response_handler = - let state = ref Awaiting_response in - let request_method = request.Request.meth in - let handler response body = - state := Received_response(response, body); - response_handler response body - in - let request_body = Body.create (Bigstringaf.create config.request_body_buffer_size) in - let t = - { request - ; request_body - ; error_handler - ; error_code = `Ok - ; reader = Reader.response ~request_method handler - ; writer = Writer.create () - ; state } - in - Writer.write_request t.writer request; - request_body, t - ;; - - let flush_request_body t = - if Body.has_pending_output t.request_body - then - let encoding = - match Request.body_length t.request with - | `Fixed _ | `Chunked as encoding -> encoding - | `Error _ -> assert false (* XXX(seliopou): This needs to be handled properly *) - in - Body.transfer_to_writer_with_encoding t.request_body ~encoding t.writer - ;; - - let set_error_and_handle_without_shutdown t error = - t.state := Closed; - t.error_code <- (error :> [`Ok | error]); - t.error_handler error; - ;; - - let unexpected_eof t = - set_error_and_handle_without_shutdown t (`Malformed_response "unexpected eof"); - ;; - - let shutdown_reader t = - Reader.force_close t.reader; - begin match !(t.state) with - | Awaiting_response -> unexpected_eof t; - | Closed -> () - | Received_response(_, response_body) -> - Body.close_reader response_body; - Body.execute_read response_body; - end; - ;; - - let shutdown_writer t = - flush_request_body t; - Writer.close t.writer; - Body.close_writer t.request_body; - ;; - - let shutdown t = - shutdown_reader t; - shutdown_writer t; - ;; - - let set_error_and_handle t error = - Reader.force_close t.reader; - begin match !(t.state) with - | Closed -> () - | Awaiting_response -> - set_error_and_handle_without_shutdown t error; - | Received_response(_, response_body) -> - Body.close_reader response_body; - Body.execute_read response_body; - set_error_and_handle_without_shutdown t error; +type error = + [ `Malformed_response of string | `Invalid_response_body_length of Response.t | `Exn of exn ] + +type response_handler = Response.t -> [`read] Body.t -> unit +type error_handler = error -> unit + +type t = + { config : Config.t + ; reader : Reader.response + ; writer : Writer.t + ; request_queue : Respd.t Queue.t + (* invariant: If [request_queue] is not empty, then the head of the queue + has already written the request headers to the wire. *) + ; wakeup_writer : (unit -> unit) list ref + } + +let is_closed t = + Reader.is_closed t.reader && Writer.is_closed t.writer + +let is_waiting t = + not (is_closed t) && Queue.is_empty t.request_queue + +let is_active t = + not (Queue.is_empty t.request_queue) + +let current_respd_exn t = + Queue.peek t.request_queue + +let on_wakeup_writer t k = + if is_closed t + then failwith "on_wakeup_writer on closed conn" + else t.wakeup_writer := k::!(t.wakeup_writer) + +let _wakeup_writer callbacks = + let fs = !callbacks in + callbacks := []; + List.iter (fun f -> f ()) fs + +let wakeup_writer t = + _wakeup_writer t.wakeup_writer + +let[@ocaml.warning "-16"] create ?(config=Config.default) = + let request_queue = Queue.create () in + { config + ; reader = Reader.response request_queue + ; writer = Writer.create () + ; request_queue + ; wakeup_writer = ref [] + } + +let request t request ~error_handler ~response_handler = + let request_body = + Body.create (Bigstringaf.create t.config.request_body_buffer_size) + in + let respd = + Respd.create error_handler request request_body t.writer response_handler in + let handle_now = Queue.is_empty t.request_queue in + Queue.push respd t.request_queue; + if handle_now then + Respd.write_request respd; + (* Not handling the request now means it may be pipelined. + * `advance_request_queue_if_necessary` will take care of it, but we still + * wanna wake up the writer so that the function gets called. *) + _wakeup_writer t.wakeup_writer; + request_body +;; + +let flush_request_body t = + if is_active t then begin + let respd = current_respd_exn t in + Respd.flush_request_body respd + end + +let set_error_and_handle_without_shutdown t error = + if is_active t then begin + let respd = current_respd_exn t in + Respd.report_error respd error + end + (* TODO: not active?! can be because of a closed FD for example. *) +;; + +let unexpected_eof t = + set_error_and_handle_without_shutdown t (`Malformed_response "unexpected eof"); +;; + +let shutdown_reader t = + Reader.force_close t.reader; + if is_active t + then Respd.close_response_body (current_respd_exn t) + +let shutdown_writer t = + flush_request_body t; + Writer.close t.writer; + if is_active t then begin + let respd = current_respd_exn t in + Body.close_writer respd.request_body; + end +;; + +let shutdown t = + shutdown_reader t; + shutdown_writer t; +;; + +(* TODO: Need to check in the RFC if reporting an error, e.g. in a malformed + * response causes the whole connection to shutdown. *) +let set_error_and_handle t error = + shutdown t; + set_error_and_handle_without_shutdown t error; +;; + +let report_exn t exn = + set_error_and_handle t (`Exn exn) +;; + +exception Local + +let maybe_pipeline_queued_requests t = + (* Don't bother trying to pipeline if there aren't multiple requests in the + * queue. *) + if Queue.length t.request_queue > 1 then begin + match Queue.fold (fun prev respd -> + begin match prev with + | None -> () + | Some prev -> + if respd.Respd.state = Uninitialized && not (Respd.requires_output prev) + then Respd.write_request respd + else + (* bail early. If we can't pipeline this request, we can't write + * next ones either. *) + raise Local + end; + Some respd) + None + t.request_queue + with + | _ -> () + | exception Local -> () + end + +let advance_request_queue_if_necessary t = + if is_active t then begin + let respd = current_respd_exn t in + if Respd.persistent_connection respd then begin + if Respd.is_complete respd then begin + ignore (Queue.take t.request_queue); + if not (Queue.is_empty t.request_queue) then begin + (* write request to the wire *) + let respd = current_respd_exn t in + Respd.write_request respd; + end; + wakeup_writer t; + end else if not (Respd.requires_output respd) then + (* From RFC7230ยง6.3.2: + * A client that supports persistent connections MAY "pipeline" its + * requests (i.e., send multiple requests without waiting for each + * response). *) + maybe_pipeline_queued_requests t + end else begin + ignore (Queue.take t.request_queue); + Queue.iter Respd.close_response_body t.request_queue; + Queue.clear t.request_queue; + Queue.push respd t.request_queue; + wakeup_writer t; + if Respd.is_complete respd + then shutdown t + else if not (Respd.requires_output respd) + then shutdown_writer t end - ;; - - let report_exn t exn = - set_error_and_handle t (`Exn exn) - ;; - - let flush_response_body t = - match !(t.state) with - | Awaiting_response | Closed -> () - | Received_response(_, response_body) -> - try Body.execute_read response_body - with exn -> report_exn t exn - ;; - - let _next_read_operation t = - match !(t.state) with - | Awaiting_response | Closed -> Reader.next t.reader - | Received_response(_, response_body) -> - if not (Body.is_closed response_body) - then Reader.next t.reader - else begin - Reader.force_close t.reader; - Reader.next t.reader - end - ;; - - let next_read_operation t = - match _next_read_operation t with - | `Error (`Parse(marks, message)) -> - let message = String.concat "" [ String.concat ">" marks; ": "; message] in - set_error_and_handle t (`Malformed_response message); - `Close - | `Error (`Invalid_response_body_length _ as error) -> - set_error_and_handle t error; - `Close - | (`Read | `Close) as operation -> operation - ;; - - let read_with_more t bs ~off ~len more = - let consumed = Reader.read_with_more t.reader bs ~off ~len more in - flush_response_body t; - consumed - ;; - - let read t bs ~off ~len = - read_with_more t bs ~off ~len Incomplete - - let read_eof t bs ~off ~len = - let bytes_read = read_with_more t bs ~off ~len Complete in - begin match !(t.state) with + end else if Reader.is_closed t.reader + then shutdown t + +let next_read_operation t = + advance_request_queue_if_necessary t; + match Reader.next t.reader with + | `Error (`Parse(marks, message)) -> + let message = String.concat "" [ String.concat ">" marks; ": "; message] in + set_error_and_handle t (`Malformed_response message); + `Close + | `Error (`Invalid_response_body_length _ as error) -> + set_error_and_handle t error; + `Close + | (`Read | `Close) as operation -> operation +;; + +let read_with_more t bs ~off ~len more = + let consumed = Reader.read_with_more t.reader bs ~off ~len more in + if is_active t then + Respd.flush_response_body (current_respd_exn t); + consumed +;; + +let read t bs ~off ~len = + read_with_more t bs ~off ~len Incomplete + +let read_eof t bs ~off ~len = + let bytes_read = read_with_more t bs ~off ~len Complete in + if is_active t then begin + let respd = current_respd_exn t in + (* TODO: could just check for `Respd.requires_input`? *) + match respd.state with + | Uninitialized -> assert false | Received_response _ | Closed -> () - | Awaiting_response -> unexpected_eof t; - end; - bytes_read - ;; - - let next_write_operation t = - flush_request_body t; - if Body.is_closed t.request_body - then Writer.close t.writer; - Writer.next t.writer - ;; - - let yield_writer t k = - if Body.is_closed t.request_body - then begin + | Awaiting_response -> + (* TODO: review this. It makes sense to tear down the connection if an + * unexpected EOF is received. *) + shutdown t; + unexpected_eof t + end; + bytes_read +;; + +let next_write_operation t = + advance_request_queue_if_necessary t; + flush_request_body t; + Writer.next t.writer +;; + +let yield_writer t k = + if is_active t then begin + let respd = current_respd_exn t in + if Respd.requires_output respd then + Respd.on_more_output_available respd k + else if Respd.persistent_connection respd then + on_wakeup_writer t k + else begin + (* TODO: call shutdown? *) Writer.close t.writer; k () - end else - Body.when_ready_to_write t.request_body k - - let report_write_result t result = - Writer.report_result t.writer result + end + end else + on_wakeup_writer t k - let is_closed t = Reader.is_closed t.reader && Writer.is_closed t.writer -end +let report_write_result t result = + Writer.report_result t.writer result diff --git a/lib/httpaf.ml b/lib/httpaf.ml index 6d16dab..da47492 100644 --- a/lib/httpaf.ml +++ b/lib/httpaf.ml @@ -10,7 +10,7 @@ module Body = Body module Config = Config module Server_connection = Server_connection -module Client_connection = Client_connection.Oneshot +module Client_connection = Client_connection module Httpaf_private = struct module Parse = Parse diff --git a/lib/httpaf.mli b/lib/httpaf.mli index b2c341f..3f15231 100644 --- a/lib/httpaf.mli +++ b/lib/httpaf.mli @@ -1,5 +1,6 @@ (*---------------------------------------------------------------------------- Copyright (c) 2017 Inhabited Type LLC. + Copyright (c) 2019 Antonio Nuno Monteiro. All rights reserved. @@ -760,12 +761,14 @@ module Client_connection : sig type error_handler = error -> unit + val create : ?config:Config.t -> t + val request - : ?config:Config.t + : t -> Request.t -> error_handler:error_handler -> response_handler:response_handler - -> [`write] Body.t * t + -> [`write] Body.t val next_read_operation : t -> [ `Read | `Close ] (** [next_read_operation t] returns a value describing the next operation @@ -817,10 +820,15 @@ module Client_connection : sig may call its error handler before terminating the connection. *) val is_closed : t -> bool + (** [is_closed t] is [true] if both the read and write processors have been + shutdown. When this is the case {!next_read_operation} will return + [`Close _] and {!next_write_operation} will return [`Write _] until all + buffered output has been flushed, at which point it will also return + `Close. *) - (**/**) val shutdown : t -> unit - (**/**) + (** [shutdown connection] closes the underlying input and output channels of + the connection, rendering it unusable for any further communication. *) end (**/**) diff --git a/lib/parse.ml b/lib/parse.ml index cf9d0a2..2f870a3 100644 --- a/lib/parse.ml +++ b/lib/parse.ml @@ -252,19 +252,30 @@ module Reader = struct in create parser - let response ~request_method handler = + let response request_queue = let parser = response <* commit >>= fun response -> + assert (not (Queue.is_empty request_queue)); + let exception Local of Respd.t in + let respd = match + (Queue.iter (fun respd -> + if respd.Respd.state = Awaiting_response then + raise (Local respd)) request_queue) + with + | exception Local respd -> respd + | _ -> assert false + in + let request = Respd.request respd in let proxy = false in - match Response.body_length ~request_method response with + match Response.body_length ~request_method:request.meth response with | `Error `Bad_gateway -> assert (not proxy); assert false | `Error `Internal_server_error -> return (Error (`Invalid_response_body_length response)) | `Fixed 0L -> - handler response Body.empty; + respd.response_handler response Body.empty; ok | `Fixed _ | `Chunked | `Close_delimited as encoding -> let response_body = Body.create Bigstringaf.empty in - handler response response_body; + respd.response_handler response response_body; body ~encoding response_body *> ok in create parser diff --git a/lib/respd.ml b/lib/respd.ml new file mode 100644 index 0000000..43e0f9d --- /dev/null +++ b/lib/respd.ml @@ -0,0 +1,118 @@ +module Writer = Serialize.Writer + +type error = + [ `Malformed_response of string + | `Invalid_response_body_length of Response.t + | `Exn of exn ] + +type state = + | Uninitialized + | Awaiting_response + | Received_response of Response.t * [`read] Body.t + | Closed + +type t = + { request : Request.t + ; request_body : [ `write ] Body.t + ; response_handler : (Response.t -> [`read] Body.t -> unit) + ; error_handler : (error -> unit) + ; mutable error_code : [ `Ok | error ] + ; writer : Writer.t + ; mutable state : state + ; mutable persistent : bool + } + +let create error_handler request request_body writer response_handler = + let rec handler response body = + let t = Lazy.force t in + if t.persistent then + t.persistent <- Response.persistent_connection response; + t.state <- Received_response(response, body); + response_handler response body + and t = + lazy + { request + ; request_body + ; response_handler = handler + ; error_handler + ; error_code = `Ok + ; writer + ; state = Uninitialized + ; persistent = Request.persistent_connection request + } + in + Lazy.force t + +let request { request; _ } = request + +let request_body { request_body; _ } = request_body + +let write_request t = + Writer.write_request t.writer t.request; + t.state <- Awaiting_response + +let on_more_output_available { request_body; _ } f = + Body.when_ready_to_write request_body f + +let report_error t error = + (* t.persistent <- false; *) + (* TODO: drain queue? *) + match t.state, t.error_code with + | (Uninitialized | Awaiting_response | Received_response _), `Ok -> + t.state <- Closed; + t.error_code <- (error :> [`Ok | error]); + t.error_handler error + | Uninitialized, `Exn _ -> + (* TODO(anmonteiro): Not entirely sure this is possible in the client. *) + failwith "httpaf.Reqd.report_exn: NYI" + | (Uninitialized | Awaiting_response | Received_response _ | Closed), _ -> + (* XXX(seliopou): Once additional logging support is added, log the error + * in case it is not spurious. *) + () + +let persistent_connection t = + t.persistent + +let close_response_body t = + match t.state with + | Uninitialized + | Awaiting_response + | Closed -> () + | Received_response (_, response_body) -> + Body.close_reader response_body + +let requires_input t = + match t.state with + | Uninitialized -> true + | Awaiting_response -> true + | Received_response (_, response_body) -> + not (Body.is_closed response_body) + | Closed -> false + +let requires_output { request_body; state; _ } = + state = Uninitialized || + not (Body.is_closed request_body) || + Body.has_pending_output request_body + +let is_complete t = + not (requires_input t || requires_output t) + +let flush_request_body { request; request_body; writer; _ } = + if Body.has_pending_output request_body + then + let encoding = + match Request.body_length request with + | `Fixed _ | `Chunked as encoding -> encoding + | `Error _ -> assert false (* XXX(seliopou): This needs to be handled properly *) + in + Body.transfer_to_writer_with_encoding request_body ~encoding writer + +let flush_response_body t = + match t.state with + | Uninitialized | Awaiting_response | Closed -> () + | Received_response(_, response_body) -> + try Body.execute_read response_body + (* TODO: report_exn *) + with _exn -> + Format.eprintf "EXN@." + (* report_exn t exn *) diff --git a/lib/server_connection.ml b/lib/server_connection.ml index 3ab5944..e51cf7e 100644 --- a/lib/server_connection.ml +++ b/lib/server_connection.ml @@ -32,17 +32,6 @@ ----------------------------------------------------------------------------*) -module Queue = struct - include Queue - - let peek_exn = peek - - let peek t = - if is_empty t - then None - else Some (peek_exn t) -end - module Reader = Parse.Reader module Writer = Serialize.Writer @@ -82,7 +71,7 @@ let is_active t = not (Queue.is_empty t.request_queue) let current_reqd_exn t = - Queue.peek_exn t.request_queue + Queue.peek t.request_queue let yield_reader t k = if is_closed t diff --git a/lib_test/test_client_connection.ml b/lib_test/test_client_connection.ml index f170e1b..65cdf88 100644 --- a/lib_test/test_client_connection.ml +++ b/lib_test/test_client_connection.ml @@ -72,30 +72,53 @@ let test_get () = let response = Response.create `OK in (* Single GET *) - let body, t = + let t = create ?config:None in + let body = request + t request' ~response_handler:(default_response_handler response) ~error_handler:no_error_handler in Body.close_writer body; - write_request t request'; - writer_closed t; - read_response t response; + write_request t request'; + read_response t response; + + (* Single GET, request closes the connection. *) + let request_close = + Request.create + ~headers:(Headers.of_list ["connection", "close"]) + `GET "/" + in + let t = create ?config:None in + let body = + request + t + request_close + ~response_handler:(default_response_handler response) + ~error_handler:no_error_handler + in + Body.close_writer body; + write_request t request_close; + writer_closed t; + read_response t response; (* Single GET, response closes connection *) let response = Response.create `OK ~headers:(Headers.of_list [ "connection", "close" ]) in - let body, t = + let t = create ?config:None in + let body = request + t request' ~response_handler:(default_response_handler response) ~error_handler:no_error_handler in Body.close_writer body; - write_request t request'; - read_response t response; + write_request t request'; + read_response t response; + writer_closed t; let c = read_eof t Bigstringaf.empty ~off:0 ~len:0 in Alcotest.(check int) "read_eof with no input returns 0" 0 c; connection_is_shutdown t; @@ -104,16 +127,56 @@ let test_get () = let response = Response.create `OK ~headers:(Headers.of_list [ "transfer-encoding", "chunked" ]) in - let body, t = + let t = create ?config:None in + let body = request + t request' ~response_handler:(default_response_handler response) ~error_handler:no_error_handler in Body.close_writer body; - write_request t request'; - read_response t response; - read_string t "d\r\nHello, world!\r\n0\r\n\r\n"; + write_request t request'; + read_response t response; + read_string t "d\r\nHello, world!\r\n0\r\n\r\n"; +;; + +let test_get_last_close () = + (* Multiple GET requests, the last one closes the connection *) + let request' = Request.create `GET "/" in + let response = + Response.create ~headers:(Headers.of_list ["content-length", "0"]) `OK + in + let t = create ?config:None in + let body = + request + t + request' + ~response_handler:(default_response_handler response) + ~error_handler:no_error_handler + in + Body.close_writer body; + write_request t request'; + read_response t response; + + let request'' = + Request.create ~headers:(Headers.of_list ["connection", "close"]) `GET "/" + in + let body' = + request + t + request'' + ~response_handler:(default_response_handler response) + ~error_handler:no_error_handler + in + Body.close_writer body'; + write_request t request''; + read_response t response; + + writer_closed t; + let c = read_eof t Bigstringaf.empty ~off:0 ~len:0 in + Alcotest.(check int) "read_eof with no input returns 0" 0 c; + connection_is_shutdown t; ;; let test_response_eof () = @@ -121,8 +184,10 @@ let test_response_eof () = let response = Response.create `OK in (* not actually writen to the channel *) let error_message = ref None in - let body, t = + let t = create ?config:None in + let body = request + t request' ~response_handler:(default_response_handler response) ~error_handler:(function @@ -130,8 +195,7 @@ let test_response_eof () = | _ -> assert false) in Body.close_writer body; - write_request t request'; - writer_closed t; + write_request t request'; reader_ready t; let c = read_eof t Bigstringaf.empty ~off:0 ~len:0 in Alcotest.(check int) "read_eof with no input returns 0" 0 c; @@ -141,6 +205,138 @@ let test_response_eof () = !error_message ;; +let test_persistent_connection_requests () = + let request' = Request.create `GET "/" in + let response = + Response.create ~headers:(Headers.of_list [ "content-length", "0" ]) `OK + in + let t = create ?config:None in + let body = + request + t + request' + ~response_handler:(default_response_handler response) + ~error_handler:no_error_handler + in + Body.close_writer body; + write_request t request'; + read_response t response; + writer_yielded t; + reader_ready t; + let body' = + request + t + request' + ~response_handler:(default_response_handler response) + ~error_handler:no_error_handler + in + Body.close_writer body'; + write_request t request'; + read_response t response; +;; + +let test_persistent_connection_requests_pipelining () = + let request' = Request.create `GET "/" in + let response = + Response.create ~headers:(Headers.of_list [ "content-length", "0" ]) `OK + in + let t = create ?config:None in + let body = + request + t + request' + ~response_handler:(default_response_handler response) + ~error_handler:no_error_handler + in + Body.close_writer body; + write_request t request'; + (* send the 2nd request without reading the response *) + let response' = + Response.create ~headers:(Headers.of_list [ "content-length", "0" ]) `Not_found + in + let body' = + request + t + request' + ~response_handler:(fun response body -> + (default_response_handler response' response body)) + ~error_handler:no_error_handler + in + Body.close_writer body'; + write_request t request'; + read_response t response; + read_response t response'; +;; + +let test_persistent_connection_requests_pipelining_send_body () = + let request' = + Request.create ~headers:(Headers.of_list [ "content-length", "8" ]) `GET "/" + in + let response = + Response.create ~headers:(Headers.of_list [ "content-length", "0" ]) `OK + in + let t = create ?config:None in + let body = + request + t + request' + ~response_handler:(default_response_handler response) + ~error_handler:no_error_handler + in + write_request t request'; + (* send the 2nd request without reading the response *) + let request'' = Request.create `GET "/" in + let response' = + Response.create ~headers:(Headers.of_list [ "content-length", "0" ]) `Not_found + in + let body' = + request + t + request'' + ~response_handler:(fun response body -> + (default_response_handler response' response body)) + ~error_handler:no_error_handler + in + Body.close_writer body'; + Body.write_string body "a string"; + Body.close_writer body; + write_string ~msg:"writes the body for the first request" t "a string"; + write_request t request''; + read_response t response; + read_response t response'; +;; + +let test_persistent_connection_requests_body () = + let request' = Request.create `GET "/" in + let request'' = Request.create `GET "/second" in + let response = + Response.create ~headers:(Headers.of_list [ "content-length", "10" ]) `OK + in + let t = create ?config:None in + let body = + request + t + request' + ~response_handler:(default_response_handler response) + ~error_handler:no_error_handler + in + Body.close_writer body; + write_request t request'; + let response' = Response.create `OK in + read_response t response; + read_string t "ten chars."; + let body' = + request + t + request'' + ~response_handler:(default_response_handler response') + ~error_handler:no_error_handler + in + Body.close_writer body'; + write_request t request''; + read_response t response'; +;; + let test_response_header_order () = let request' = Request.create `GET "/" in let headers = @@ -151,15 +347,17 @@ let test_response_header_order () = in let response = Response.create `OK ~headers:(Headers.of_list headers) in let received = ref None in - let body, t = + let t = create ?config:None in + let body = request + t request' ~response_handler:(fun response _ -> received := Some response) ~error_handler:no_error_handler in Body.close_writer body; write_request t request'; - writer_closed t; + writer_yielded t; read_response t response; match !received with | None -> assert false @@ -173,8 +371,10 @@ let test_report_exn () = let response = Response.create `OK in (* not actually writen to the channel *) let error_message = ref None in - let body, t = + let t = create ?config:None in + let body = request + t request' ~response_handler:(default_response_handler response) ~error_handler:(function @@ -183,7 +383,7 @@ let test_report_exn () = in Body.close_writer body; write_request t request'; - writer_closed t; + writer_yielded t; reader_ready t; report_exn t (Failure "something went wrong"); connection_is_shutdown t; @@ -197,8 +397,10 @@ let test_input_shrunk () = let response = Response.create `OK in (* not actually writen to the channel *) let error_message = ref None in - let body, t = + let t = create ?config:None in + let body = request + t request' ~response_handler:(default_response_handler response) ~error_handler:(function @@ -207,7 +409,7 @@ let test_input_shrunk () = in Body.close_writer body; write_request t request'; - writer_closed t; + writer_yielded t; reader_ready t; let c = feed_string t "HTTP/1.1 200 OK\r\nDate" in Alcotest.(check int) "read the status line" c 17; @@ -224,4 +426,15 @@ let tests = ; "Response header order preserved", `Quick, test_response_header_order ; "report_exn" , `Quick, test_report_exn ; "input_shrunk", `Quick, test_input_shrunk + ; "multiple GET, last request closes connection", `Quick, test_get_last_close + ; "Persistent connection, multiple GETs", `Quick, test_persistent_connection_requests + ; "Persistent connection, request pipelining", `Quick, test_persistent_connection_requests_pipelining + ; "Persistent connection, first request includes body", `Quick, test_persistent_connection_requests_pipelining_send_body + ; "Persistent connections, read response body", `Quick, test_persistent_connection_requests_body ] + +(* + * TODO: + * - test client connection error handling + * + *) diff --git a/lwt-unix/httpaf_lwt_unix.ml b/lwt-unix/httpaf_lwt_unix.ml index 28d0547..4b84164 100644 --- a/lwt-unix/httpaf_lwt_unix.ml +++ b/lwt-unix/httpaf_lwt_unix.ml @@ -136,16 +136,22 @@ module Client = struct module TLS = struct include Httpaf_lwt.Client (Tls_io.Io) - let request ?client ?(config=Config.default) socket request_headers ~error_handler ~response_handler = - Tls_io.make_client ?client socket >|= fun tls_client -> - request ~config (socket, tls_client) request_headers ~error_handler ~response_handler + let create_connection ?client ?(config = Config.default) = + let make_tls_client = Tls_io.make_client ?client in + fun socket -> + make_tls_client socket >>= fun tls_client -> + create_connection + ~config + (socket, tls_client) end module SSL = struct include Httpaf_lwt.Client (Ssl_io.Io) - let request ?client ?(config=Config.default) socket request_headers ~error_handler ~response_handler = - Ssl_io.make_client ?client socket >|= fun ssl_client -> - request ~config ssl_client request_headers ~error_handler ~response_handler + let create_connection ?client ?(config = Config.default) = + let make_ssl_client = Ssl_io.make_client ?client in + fun socket -> + make_ssl_client socket >>= fun ssl_client -> + create_connection ~config ssl_client end end diff --git a/lwt-unix/httpaf_lwt_unix.mli b/lwt-unix/httpaf_lwt_unix.mli index f08c718..e48ffbb 100644 --- a/lwt-unix/httpaf_lwt_unix.mli +++ b/lwt-unix/httpaf_lwt_unix.mli @@ -77,33 +77,63 @@ end (* For an example, see [examples/lwt_get.ml]. *) module Client : sig - val request - : ?config : Config.t + type t + + val create_connection + : ?config:Config.t -> Lwt_unix.file_descr + -> t Lwt.t + + val request + : t -> Request.t -> error_handler : Client_connection.error_handler -> response_handler : Client_connection.response_handler -> [`write] Body.t + val shutdown : t -> unit + + val is_closed : t -> bool + module TLS : sig - val request + type t + + val create_connection : ?client : Tls_io.client -> ?config : Config.t -> Lwt_unix.file_descr + -> t Lwt.t + + val request + : t -> Request.t -> error_handler : Client_connection.error_handler -> response_handler : Client_connection.response_handler - -> [`write] Body.t Lwt.t + -> [`write] Body.t + + val shutdown : t -> unit + + val is_closed : t -> bool end module SSL : sig - val request + type t + + val create_connection : ?client : Ssl_io.client -> ?config : Config.t -> Lwt_unix.file_descr + -> t Lwt.t + + val request + : t -> Request.t -> error_handler : Client_connection.error_handler -> response_handler : Client_connection.response_handler - -> [`write] Body.t Lwt.t + -> [`write] Body.t + + val shutdown : t -> unit + + val is_closed : t -> bool end end diff --git a/lwt/httpaf_lwt.ml b/lwt/httpaf_lwt.ml index 75ee1b0..841fe92 100644 --- a/lwt/httpaf_lwt.ml +++ b/lwt/httpaf_lwt.ml @@ -199,10 +199,13 @@ end module Client (Io: IO) = struct - let request ?(config=Config.default) socket request ~error_handler ~response_handler = - let module Client_connection = Httpaf.Client_connection in - let request_body, connection = - Client_connection.request ~config request ~error_handler ~response_handler in + module Client_connection = Httpaf.Client_connection + + type t = Client_connection.t + + let create_connection ?(config=Config.default) socket = + let connection = + Client_connection.create ~config in let read_buffer = Buffer.create config.read_buffer_size in @@ -276,5 +279,11 @@ module Client (Io: IO) = struct Lwt.join [read_loop_exited; write_loop_exited] >>= fun () -> Io.close socket); - request_body + Lwt.return connection + + let request = Client_connection.request + + let shutdown = Client_connection.shutdown + + let is_closed = Client_connection.is_closed end diff --git a/lwt/httpaf_lwt.mli b/lwt/httpaf_lwt.mli index 7ad52e7..2fe00b9 100644 --- a/lwt/httpaf_lwt.mli +++ b/lwt/httpaf_lwt.mli @@ -74,11 +74,21 @@ end (* For an example, see [examples/lwt_get.ml]. *) module Client (Io: IO) : sig - val request - : ?config : Httpaf.Config.t + type t + + val create_connection + : ?config : Config.t -> Io.socket + -> t Lwt.t + + val request + : t -> Request.t -> error_handler : Client_connection.error_handler -> response_handler : Client_connection.response_handler - -> [`write] Httpaf.Body.t + -> [`write] Body.t + + val shutdown: t -> unit + + val is_closed : t -> bool end diff --git a/mirage/httpaf_mirage.mli b/mirage/httpaf_mirage.mli index e038d6d..70cdfaa 100644 --- a/mirage/httpaf_mirage.mli +++ b/mirage/httpaf_mirage.mli @@ -32,13 +32,15 @@ POSSIBILITY OF SUCH DAMAGE. ----------------------------------------------------------------------------*) +open Httpaf + module type Server_intf = sig type flow val create_connection_handler - : ?config : Httpaf.Config.t - -> request_handler : Httpaf.Server_connection.request_handler - -> error_handler : Httpaf.Server_connection.error_handler + : ?config : Config.t + -> request_handler : Server_connection.request_handler + -> error_handler : Server_connection.error_handler -> (flow -> unit Lwt.t) end @@ -56,11 +58,21 @@ module Server_with_conduit : sig end module Client (Flow : Mirage_flow_lwt.S) : sig - val request - : ?config : Httpaf.Config.t + type t + + val create_connection + : ?config : Config.t -> Flow.flow - -> Httpaf.Request.t - -> error_handler : Httpaf.Client_connection.error_handler - -> response_handler : Httpaf.Client_connection.response_handler - -> [`write] Httpaf.Body.t + -> t Lwt.t + + val request + : t + -> Request.t + -> error_handler : Client_connection.error_handler + -> response_handler : Client_connection.response_handler + -> [`write] Body.t + + val shutdown : t -> unit + + val is_closed : t -> bool end