diff --git a/.travis.yml b/.travis.yml index b45dda65..b66e4d52 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,13 +6,17 @@ install: wget https://raw.githubusercontent.com/ocaml/ocaml-travisci-skeleton/ma script: bash -ex ./.travis-docker.sh env: global: - - PINS="httpaf-async:. httpaf:." + - PINS="httpaf-async:. httpaf-lwt-unix:. httpaf:." matrix: - - PACKAGE="httpaf" DISTRO="ubuntu-16.04" OCAML_VERSION="4.06.0" - - PACKAGE="httpaf-async" DISTRO="ubuntu-16.04" OCAML_VERSION="4.06.0" - - PACKAGE="httpaf" DISTRO="ubuntu-16.04" OCAML_VERSION="4.04.2" - - PACKAGE="httpaf-async" DISTRO="ubuntu-16.04" OCAML_VERSION="4.04.2" - - PACKAGE="httpaf" DISTRO="alpine-3.5" OCAML_VERSION="4.03.0" - - PACKAGE="httpaf-async" DISTRO="alpine-3.5" OCAML_VERSION="4.03.0" - - PACKAGE="httpaf" DISTRO="debian-unstable" OCAML_VERSION="4.03.0" - - PACKAGE="httpaf-async" DISTRO="debian-unstable" OCAML_VERSION="4.03.0" + - PACKAGE="httpaf" DISTRO="ubuntu-16.04" OCAML_VERSION="4.06.0" + - PACKAGE="httpaf-async" DISTRO="ubuntu-16.04" OCAML_VERSION="4.06.0" + - PACKAGE="httpaf-lwt-unix" DISTRO="ubuntu-16.04" OCAML_VERSION="4.06.0" + - PACKAGE="httpaf" DISTRO="ubuntu-16.04" OCAML_VERSION="4.04.2" + - PACKAGE="httpaf-async" DISTRO="ubuntu-16.04" OCAML_VERSION="4.04.2" + - PACKAGE="httpaf-lwt-unix" DISTRO="ubuntu-16.04" OCAML_VERSION="4.04.2" + - PACKAGE="httpaf" DISTRO="alpine-3.5" OCAML_VERSION="4.03.0" + - PACKAGE="httpaf-async" DISTRO="alpine-3.5" OCAML_VERSION="4.03.0" + - PACKAGE="httpaf-lwt-unix" DISTRO="alpine-3.5" OCAML_VERSION="4.03.0" + - PACKAGE="httpaf" DISTRO="debian-unstable" OCAML_VERSION="4.03.0" + - PACKAGE="httpaf-async" DISTRO="debian-unstable" OCAML_VERSION="4.03.0" + - PACKAGE="httpaf-lwt-unix" DISTRO="debian-unstable" OCAML_VERSION="4.03.0" diff --git a/benchmarks/jbuild b/benchmarks/jbuild index a5d21386..9db49969 100644 --- a/benchmarks/jbuild +++ b/benchmarks/jbuild @@ -4,3 +4,8 @@ ((libraries (httpaf httpaf-async async core)) (modules (wrk_async_benchmark)) (names (wrk_async_benchmark)))) + +(executables + ((libraries (httpaf httpaf-lwt-unix lwt lwt.unix)) + (modules (wrk_lwt_benchmark)) + (names (wrk_lwt_benchmark)))) diff --git a/benchmarks/wrk_lwt_benchmark.ml b/benchmarks/wrk_lwt_benchmark.ml new file mode 100644 index 00000000..6468ef8e --- /dev/null +++ b/benchmarks/wrk_lwt_benchmark.ml @@ -0,0 +1,64 @@ +open Lwt.Infix +open Httpaf + +let text = "CHAPTER I. Down the Rabbit-Hole Alice was beginning to get very tired of sitting by her sister on the bank, and of having nothing to do: once or twice she had peeped into the book her sister was reading, but it had no pictures or conversations in it, thought Alice So she was considering in her own mind (as well as she could, for the hot day made her feel very sleepy and stupid), whether the pleasure of making a daisy-chain would be worth the trouble of getting up and picking the daisies, when suddenly a White Rabbit with pink eyes ran close by her. There was nothing so very remarkable in that; nor did Alice think it so very much out of the way to hear the Rabbit say to itself, (when she thought it over afterwards, it occurred to her that she ought to have wondered at this, but at the time it all seemed quite natural); but when the Rabbit actually took a watch out of its waistcoat-pocket, and looked at it, and then hurried on, Alice started to her feet, for it flashed across her mind that she had never before seen a rabbit with either a waistcoat-pocket, or a watch to take out of it, and burning with curiosity, she ran across the field after it, and fortunately was just in time to see it pop down a large rabbit-hole under the hedge. In another moment down went Alice after it, never once considering how in the world she was to get out again. The rabbit-hole went straight on like a tunnel for some way, and then dipped suddenly down, so suddenly that Alice had not a moment to think about stopping herself before she found herself falling down a very deep well. Either the well was very deep, or she fell very slowly, for she had plenty of time as she went down to look about her and to wonder what was going to happen next. First, she tried to look down and make out what she was coming to, but it was too dark to see anything; then she looked at the sides of the well, and noticed that they were filled with cupboards......" + +let text = Bigstring.of_string text + +let headers = Headers.of_list ["content-length", string_of_int (Bigstring.length text)] +let error_handler _ ?request:_ error start_response = + let response_body = start_response Headers.empty in + begin match error with + | `Exn exn -> + Body.write_string response_body (Printexc.to_string exn); + Body.write_string response_body "\n"; + | #Status.standard as error -> + Body.write_string response_body (Status.default_reason_phrase error) + end; + Body.close_writer response_body +;; + +let request_handler _ reqd = + let {Request.target; _} = Reqd.request reqd in + let request_body = Reqd.request_body reqd in + Body.close_reader request_body; + match target with + | "/" -> Reqd.respond_with_bigstring reqd (Response.create ~headers `OK) text; + | _ -> Reqd.respond_with_string reqd (Response.create `Not_found) "Route not found" + +let main port max_accepts_per_batch = + let conn_count = ref 0 in + let sock = Lwt_unix.socket Lwt_unix.PF_INET Lwt_unix.SOCK_STREAM 0 in + let sockaddr = Lwt_unix.ADDR_INET (Unix.inet_addr_loopback, port) in + Lwt_unix.bind sock sockaddr >>= fun () -> + Lwt_unix.listen sock 11_000; + let h = Httpaf_lwt_unix.Server.create_connection_handler ~error_handler ~request_handler in + + let rec monitor () = + Lwt_unix.sleep 0.5 >>= fun () -> + Lwt_io.printlf "conns: %d" !conn_count >>= fun () -> + monitor () in + Lwt.async monitor; + + let rec serve () = + Lwt_unix.accept_n sock max_accepts_per_batch >>= fun (accepts, exn) -> + begin match exn with + | None -> () + | Some exn -> prerr_endline ("Accept failed: " ^ Printexc.to_string exn) + end; + conn_count := !conn_count + List.length accepts; + accepts |> List.iter begin fun (sa, fd) -> + Lwt.async (fun () -> h fd sa >|= fun () -> decr conn_count) + end; + serve () in + serve () + +let () = + let port = ref 8080 in + let max_accepts_per_batch = ref 1 in + Arg.parse + [ "-p", Arg.Set_int port, "int Source port to listen on"; + "-a", Arg.Set_int max_accepts_per_batch, "int Maximum accepts per batch" ] + (fun _ -> raise (Arg.Bad "positional arg")) + "Start a hello world Lwt server"; + Lwt_main.run (main !port !max_accepts_per_batch) diff --git a/examples/jbuild b/examples/jbuild index 0028c1fc..af518fb3 100644 --- a/examples/jbuild +++ b/examples/jbuild @@ -2,3 +2,8 @@ ((libraries (httpaf httpaf-async async core)) (modules (async_echo_post async_get async_post)) (names (async_echo_post async_get async_post)))) + +(executables + ((libraries (httpaf httpaf-lwt-unix lwt lwt.unix)) + (modules (lwt_unix_echo_post lwt_unix_get)) + (names (lwt_unix_echo_post lwt_unix_get)))) diff --git a/examples/lwt_unix_echo_post.ml b/examples/lwt_unix_echo_post.ml new file mode 100644 index 00000000..e4d3d9cd --- /dev/null +++ b/examples/lwt_unix_echo_post.ml @@ -0,0 +1,68 @@ +open Httpaf +open Httpaf_lwt_unix +open Lwt.Infix + +let error_handler _ ?request:_ error start_response = + let response_body = start_response Headers.empty in + match error with + | `Exn exn -> + Body.write_string response_body (Printexc.to_string exn); + Body.write_string response_body "\n" + | #Status.standard as error -> + Body.write_string response_body (Status.default_reason_phrase error); + Body.close_writer response_body + +let request_handler _ reqd = + match Reqd.request reqd with + | {Request.meth = `POST; headers; _} -> + let response = + let content_type = + match Headers.get headers "content-type" with + | None -> "application/octet-stream" + | Some x -> x in + let headers = Headers.of_list [ + "content-type", content_type; + "connection", "close"; + ] in + Response.create ~headers `OK in + let request_body = Reqd.request_body reqd in + let response_body = Reqd.respond_with_streaming reqd response in + let + rec on_read buffer ~off ~len = + Body.write_bigstring response_body buffer ~off ~len; + Body.schedule_read request_body ~on_eof ~on_read + and on_eof () = + print_endline "eof"; + Body.close_writer response_body + in + Body.schedule_read (Reqd.request_body reqd) ~on_eof ~on_read + | _ -> + Reqd.respond_with_string reqd (Response.create `Method_not_allowed) "" + +let main port max_accepts_per_batch = + let sock = Lwt_unix.socket Lwt_unix.PF_INET Lwt_unix.SOCK_STREAM 0 in + let sockaddr = Lwt_unix.ADDR_INET (Unix.inet_addr_loopback, port) in + Lwt_unix.bind sock sockaddr >>= fun () -> + Lwt_unix.listen sock 10_000; + let h = Server.create_connection_handler ~error_handler ~request_handler in + let rec serve () = + Lwt_unix.accept_n sock max_accepts_per_batch >>= fun (accepts, exn) -> + begin match exn with + | None -> () + | Some exn -> prerr_endline ("Accept failed: " ^ Printexc.to_string exn) + end; + List.iter (fun (sa, fd) -> Lwt.async (fun () -> h fd sa)) accepts; + serve () in + serve () + +let () = + let port = ref 8080 in + let batch_capacity = ref 100 in + Arg.parse + ["-p", Arg.Set_int port, " Port number to listen on."; + "-a", Arg.Set_int batch_capacity, " Maximum number of accepts per batch."] + (fun _ -> + prerr_endline "No posititonal arguments accepted."; + exit 64) + "lwt_unix_echo_post [-p PORT] [-a N-ACCEPT-PER-BATCH]"; + Lwt_main.run (main !port !batch_capacity) diff --git a/examples/lwt_unix_get.ml b/examples/lwt_unix_get.ml new file mode 100644 index 00000000..f720b2bc --- /dev/null +++ b/examples/lwt_unix_get.ml @@ -0,0 +1,90 @@ +open Httpaf +open Lwt.Infix + +let string_of_pp f x = + let buf = Buffer.create 128 in + let ppf = Format.formatter_of_buffer buf in + f ppf x; + Format.pp_print_flush ppf (); + Buffer.contents buf + +let report_error = function + | `Malformed_response msg -> + Lwt_io.eprintl msg + | `Invalid_response_body_length _ -> + Lwt_io.eprintl "Invalid response body length." + | `Exn exn -> + Lwt_io.eprintl (Printexc.to_string exn) + +let response_handler finished response response_body = + Lwt.async @@ fun () -> + Lwt_io.printl (string_of_pp Response.pp_hum response) >|= fun () -> + let on_eof () = Lwt.wakeup_later finished (Ok ()) in + let rec on_read bs ~off ~len = + Lwt.async @@ fun () -> + Lwt_io.print (Bigstring.to_string ~off ~len bs) >|= fun () -> + Body.schedule_read response_body ~on_read ~on_eof in + Body.schedule_read response_body ~on_read ~on_eof + +let error_handler finished error = Lwt.wakeup_later finished (Error error) + +let main host port = + let host_entry = Unix.gethostbyname host in + let sock = Lwt_unix.socket host_entry.Unix.h_addrtype Lwt_unix.SOCK_STREAM 0 in + let errors = ref [] in + let process () = + let request_promise, request_resolver = Lwt.wait () in + let headers = Headers.of_list [ + "Host", host; + ] in + let request_body = + Httpaf_lwt_unix.Client.request + ~error_handler:(error_handler request_resolver) + ~response_handler:(response_handler request_resolver) + sock + (Request.create ~headers `GET "/") in + Body.close_writer request_body; + request_promise in + let rec connect_loop i = + if i = Array.length host_entry.h_addr_list then begin + Lwt_io.eprintl "Address unreachable." >>= fun () -> + Lwt_list.iter_s + (fun (inet_addr, msg) -> + Lwt_io.eprintlf "%s: %s" + (Unix.string_of_inet_addr inet_addr) msg) + (List.rev !errors) >>= fun () -> + exit 69 + end else begin + let inet_addr = host_entry.h_addr_list.(i) in + let sockaddr = Lwt_unix.ADDR_INET (inet_addr, port) in + Lwt.catch + (fun () -> + Lwt_unix.connect sock sockaddr >>= fun () -> + process ()) + (function + | Unix.Unix_error (error, _, _) -> + errors := (inet_addr, Unix.error_message error) :: !errors; + connect_loop (i + 1) + | exn -> + Lwt.return_error (`Exn exn)) + end in + connect_loop 0 >>= + (function + | Ok () -> Lwt.return 0 + | Error error -> report_error error >|= fun () -> 69) + +let () = + let host = ref "" in + let port = ref 80 in + Arg.parse + ["-a", Arg.Set_string host, " Address."; + "-p", Arg.Set_int port, " Port number."] + (fun _ -> + prerr_endline "No positional arguments accepted."; + exit 64) + "lwt_unix_get -a ADDRESS [-p PORT]"; + if !host = "" then begin + prerr_endline "The -a option is mandatory unless you just ask for -help."; + exit 64 + end else + exit (Lwt_main.run (main !host !port)) diff --git a/httpaf-lwt-unix.opam b/httpaf-lwt-unix.opam new file mode 100644 index 00000000..dcc2f12c --- /dev/null +++ b/httpaf-lwt-unix.opam @@ -0,0 +1,23 @@ +opam-version: "1.2" +name: "httpaf-lwt-unix" +maintainer: "Spiros Eliopoulos " +authors: [ "Spiros Eliopoulos " ] +license: "BSD-3-clause" +homepage: "https://github.com/inhabitedtype/httpaf" +bug-reports: "https://github.com/inhabitedtype/httpaf/issues" +dev-repo: "https://github.com/inhabitedtype/httpaf.git" +build: [ + ["jbuilder" "subst"] {pinned} + ["jbuilder" "build" "-p" name "-j" jobs] +] +build-test: [ + ["jbuilder" "runtest" "-p" name] +] +depends: [ + "jbuilder" {build & >= "1.0+beta10"} + "angstrom-lwt-unix" + "faraday-lwt-unix" + "httpaf" + "lwt" +] +available: [ ocaml-version >= "4.03.0" ] diff --git a/lwt-unix/httpaf_lwt_unix.ml b/lwt-unix/httpaf_lwt_unix.ml new file mode 100644 index 00000000..fce8a8f9 --- /dev/null +++ b/lwt-unix/httpaf_lwt_unix.ml @@ -0,0 +1,148 @@ +(*---------------------------------------------------------------------------- + Copyright (c) 2018 Inhabited Type LLC. + + All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions + are met: + + 1. Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + + 2. Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + + 3. Neither the name of the author nor the names of his contributors + may be used to endorse or promote products derived from this software + without specific prior written permission. + + THIS SOFTWARE IS PROVIDED BY THE CONTRIBUTORS ``AS IS'' AND ANY EXPRESS + OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE FOR + ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, + STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN + ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + POSSIBILITY OF SUCH DAMAGE. + ----------------------------------------------------------------------------*) + +open Lwt.Infix +open Httpaf + +module Server = struct + let create_connection_handler ?config ~request_handler ~error_handler = + fun client_addr sock -> + + let conn = + Server_connection.create ?config + ~error_handler:(error_handler client_addr) + (request_handler client_addr) in + + let buffer = Lwt_bytes.create 0x1000 in (* TODO: Make configurable. *) + let rec reader_thread avail_off avail_end = + match Server_connection.next_read_operation conn with + | `Read -> + if avail_off < avail_end then + let len = avail_end - avail_off in + let consumed_len = + Server_connection.read conn buffer ~off:avail_off ~len in + reader_thread (avail_off + consumed_len) avail_end + else begin + Lwt_bytes.read sock buffer 0 (Lwt_bytes.length buffer) >>= fun len -> + if len = 0 then begin + Server_connection.shutdown_reader conn; + reader_thread 0 0 + end else begin + let consumed_len = Server_connection.read conn buffer ~off:0 ~len in + reader_thread consumed_len len + end + end + | `Yield -> + let ready_wait, ready_release = Lwt.wait () in + Server_connection.yield_reader conn (Lwt.wakeup_later ready_release); + ready_wait >>= fun () -> reader_thread avail_off avail_end + | `Close -> + Lwt.return_unit in + + let rec writer_thread () = + match Server_connection.next_write_operation conn with + | `Write iovecs -> + Faraday_lwt_unix.writev_of_fd sock iovecs >>= fun result -> + Server_connection.report_write_result conn result; + writer_thread () + | `Yield -> + let ready_wait, ready_release = Lwt.wait () in + Server_connection.yield_writer conn (Lwt.wakeup_later ready_release); + ready_wait >>= writer_thread + | `Close _ -> + Lwt_unix.shutdown sock Lwt_unix.SHUTDOWN_SEND; + Lwt.return_unit in + + Lwt.catch + (fun () -> Lwt.join [writer_thread (); reader_thread 0 0]) + (fun exn -> + Server_connection.report_exn conn exn; + Lwt.return_unit) >>= fun () -> + Lwt_unix.close sock + +end + +module Client = struct + + let request sock request ~error_handler ~response_handler = + + let request_body, conn = + Client_connection.request request ~error_handler ~response_handler in + + let buffer = Lwt_bytes.create 0x1000 in (* TODO: Make configurable. *) + let rec reader_thread avail_off avail_end = + match Client_connection.next_read_operation conn with + | `Read -> + if avail_off < avail_end then + let len = avail_end - avail_off in + let consumed_len = + Client_connection.read conn buffer ~off:avail_off ~len in + reader_thread (avail_off + consumed_len) avail_end + else begin + Lwt_bytes.read sock buffer 0 (Lwt_bytes.length buffer) >>= fun len -> + if len = 0 then begin + Client_connection.shutdown_reader conn; + reader_thread 0 0 + end else begin + let consumed_len = Client_connection.read conn buffer ~off:0 ~len in + reader_thread consumed_len len + end + end + | `Close -> + Lwt.return_unit in + + let rec writer_thread () = + match Client_connection.next_write_operation conn with + | `Write iovecs -> + Faraday_lwt_unix.writev_of_fd sock iovecs >>= fun result -> + Client_connection.report_write_result conn result; + writer_thread () + | `Yield -> + let ready_wait, ready_release = Lwt.wait () in + Client_connection.yield_writer conn (Lwt.wakeup_later ready_release); + ready_wait >>= writer_thread + | `Close _ -> + Lwt_unix.shutdown sock Lwt_unix.SHUTDOWN_SEND; + Lwt.return_unit in + + Lwt.async begin fun () -> + Lwt.catch + (fun () -> Lwt.join [writer_thread (); reader_thread 0 0]) + (fun exn -> + Client_connection.report_exn conn exn; + Lwt.return_unit) >>= fun () -> + Lwt_unix.close sock + end; + request_body + +end diff --git a/lwt-unix/httpaf_lwt_unix.mli b/lwt-unix/httpaf_lwt_unix.mli new file mode 100644 index 00000000..6c6cc86e --- /dev/null +++ b/lwt-unix/httpaf_lwt_unix.mli @@ -0,0 +1,22 @@ +open Httpaf + +module Server : sig + open Server_connection + + val create_connection_handler + : ?config : Config.t + -> request_handler : (Unix.sockaddr -> Lwt_unix.file_descr request_handler) + -> error_handler : (Unix.sockaddr -> error_handler) + -> Unix.sockaddr + -> Lwt_unix.file_descr + -> unit Lwt.t +end + +module Client : sig + val request + : Lwt_unix.file_descr + -> Request.t + -> error_handler : Client_connection.error_handler + -> response_handler : Client_connection.response_handler + -> [`write] Body.t +end diff --git a/lwt-unix/jbuild b/lwt-unix/jbuild new file mode 100644 index 00000000..b982b1fb --- /dev/null +++ b/lwt-unix/jbuild @@ -0,0 +1,12 @@ +(jbuild_version 1) + +(library + ((name httpaf_lwt_unix) + (public_name httpaf-lwt-unix) + (wrapped false) + (libraries + (angstrom angstrom-lwt-unix + faraday faraday-lwt faraday-lwt-unix + httpaf + lwt lwt.unix)) + (flags (:standard -safe-string))))