From d73822bc2442fb13143a9f433c36d25a3c2e165c Mon Sep 17 00:00:00 2001 From: Doug Patti Date: Sun, 12 Jan 2020 20:37:24 -0500 Subject: [PATCH 01/30] refactor request queue mechanics This is a prelude to #159 which introduces upgrade requests, with a few major changes in `Server_connection`. The goals here is to try to make queue management easier to reason about by folding bits of logic from `advance_request_queue_if_necessary` into `next_read_operation` and `next_write_operation` such that we only perform side-effects when the operation in question demands it. One of the ways I tried to make this easier to reason about was to make the `next__operation` functions very parallel. Getting the read operation starts out with a short-circuit for shutting down when the server can no longer make progress (reader is closed and queue is empty). This doesn't feel like it belongs here. Perhaps this check should be part of `advance_request_queue` with some extra logic triggering in `shutdown_reader`? After that, the next-operation functions use some very simple probing of the input/output state of `Reqd` to determine what to do next. Only in the case of `Complete` do we move into a separate function (to make it easier to read): `_final__operation`. In these functions, we decide if we should shutdown the respective reader/writer or consider the `reqd` complete and move it off the queue. What's happening is that we don't know if the write action or read action will be last, so each function checks the state of the other to see if they're both complete. When we do shift it off, we recursively ask for the next operation given the new queue state. In the case of the writer triggering the advancing, before we return the result, we wakeup the reader so that it can evaluate the next operation given the new queue state. Note that in the case of a non-persistent connection, the queue is never advanced and the connection is shut down when both sides are done. Though on the surface, these pieces feel fairly straightforward, there are still a slew of re-entrancy bugs to consider. I think there are two things that we can do to make this drastically easier to manage: 1. We call `t.request_handler` in two places, and this is mostly because we want to keep the invariant that the head of the request queue has already been passed off to the handler. I feel like splitting this up into a simple queue of unhandled requests and a [Reqd.t option] that represents the current request would be easier to manage. 2. It would be nice to schedule calls. Things like waking up the writer before you let the read loop know its next operation just immediately makes my mind fall apart and lose track of state. There's a fairly obvious solution of asking for a `schedule : (unit -> unit) -> unit` function from the runtime that promises to not call the thunk synchronously, but rather waits until it is outside of the read and write loops. But maybe we can solve it using what we have now, like establishing a contract that when the reader/writer is woken up, they must schedule their work for a fresh call stack and not immediately ask for operations. --- lib/reqd.ml | 9 ---- lib/server_connection.ml | 110 +++++++++++++++++++++++---------------- 2 files changed, 64 insertions(+), 55 deletions(-) diff --git a/lib/reqd.ml b/lib/reqd.ml index 106c06f4..8277a637 100644 --- a/lib/reqd.ml +++ b/lib/reqd.ml @@ -243,15 +243,6 @@ let output_state t : Output_state.t = | Waiting -> Waiting ;; -let is_complete t = - match input_state t with - | Ready -> false - | Complete -> - (match output_state t with - | Waiting | Ready -> false - | Complete -> true) -;; - let flush_request_body t = let request_body = request_body t in if Body.has_pending_output request_body diff --git a/lib/server_connection.ml b/lib/server_connection.ml index bf358e42..959d542d 100644 --- a/lib/server_connection.ml +++ b/lib/server_connection.ml @@ -81,7 +81,7 @@ let current_reqd_exn t = let yield_reader t k = if is_closed t - then failwith "on_wakeup_reader on closed conn" + then failwith "yield_reader on closed conn" else if Optional_thunk.is_some t.wakeup_reader then failwith "yield_reader: only one callback can be registered at a time" else t.wakeup_reader <- Optional_thunk.some k @@ -155,6 +155,7 @@ let error_code t = else None let shutdown t = + Queue.clear t.request_queue; shutdown_reader t; shutdown_writer t; wakeup_reader t; @@ -182,53 +183,44 @@ let set_error_and_handle ?request t error = let report_exn t exn = set_error_and_handle t (`Exn exn) -let advance_request_queue_if_necessary t = - if is_active t then begin - let reqd = current_reqd_exn t in - if Reqd.persistent_connection reqd then begin - if Reqd.is_complete reqd then begin - ignore (Queue.take t.request_queue); - if not (Queue.is_empty t.request_queue) - then t.request_handler (current_reqd_exn t); - wakeup_reader t; - end - end else begin - (* Take the head of the queue, close the remaining request bodies, clear - * the queue, and push the head back on. We do not plan on processing any - * more requests after the current one. *) - ignore (Queue.take t.request_queue); - Queue.iter Reqd.close_request_body t.request_queue; - Queue.clear t.request_queue; - Queue.push reqd t.request_queue; - if Reqd.is_complete reqd - then shutdown t - else - match Reqd.input_state reqd with - | Ready -> () - | Complete -> shutdown_reader t - end - end else if Reader.is_closed t.reader - then shutdown t - -let _next_read_operation t = - advance_request_queue_if_necessary t; - if is_active t - then ( +let advance_request_queue t = + ignore (Queue.take t.request_queue); + if not (Queue.is_empty t.request_queue) + then t.request_handler (Queue.peek_exn t.request_queue); +;; + +let rec _next_read_operation t = + if not (is_active t) then ( + if Reader.is_closed t.reader + then shutdown t; + Reader.next t.reader + ) else ( let reqd = current_reqd_exn t in match Reqd.input_state reqd with | Ready -> Reader.next t.reader - | Complete -> - if Reqd.persistent_connection reqd - then `Yield - else ( - shutdown_reader t; - Reader.next t.reader) + | Complete -> _final_read_operation_for t reqd ) - else Reader.next t.reader + +and _final_read_operation_for t reqd = + let next = + if not (Reqd.persistent_connection reqd) then ( + shutdown_reader t; + Reader.next t.reader; + ) else ( + match Reqd.output_state reqd with + | Waiting | Ready -> `Yield + | Complete -> + advance_request_queue t; + _next_read_operation t; + ) + in + wakeup_writer t; + next ;; let next_read_operation t = match _next_read_operation t with + (* XXX(dpatti): These two [`Error _] constructors are never returned *) | `Error (`Parse _) -> set_error_and_handle t `Bad_request; `Close | `Error (`Bad_request request) -> set_error_and_handle ~request t `Bad_request; `Close | (`Read | `Yield | `Close) as operation -> operation @@ -259,13 +251,39 @@ let read t bs ~off ~len = let read_eof t bs ~off ~len = read_with_more t bs ~off ~len Complete -let next_write_operation t = - advance_request_queue_if_necessary t; - if is_active t - then ( +let rec _next_write_operation t = + if not (is_active t) then ( + if Reader.is_closed t.reader + then shutdown t; + Writer.next t.writer + ) else ( let reqd = current_reqd_exn t in - Reqd.flush_response_body reqd); - Writer.next t.writer + match Reqd.output_state reqd with + | Waiting -> `Yield + | Ready -> + Reqd.flush_response_body reqd; + Writer.next t.writer + | Complete -> _final_write_operation_for t reqd + ) + +and _final_write_operation_for t reqd = + let next = + if not (Reqd.persistent_connection reqd) then ( + shutdown_writer t; + Writer.next t.writer; + ) else ( + match Reqd.input_state reqd with + | Ready -> assert false + | Complete -> + advance_request_queue t; + _next_write_operation t; + ) + in + wakeup_reader t; + next +;; + +let next_write_operation t = _next_write_operation t let report_write_result t result = Writer.report_result t.writer result From 94e4480366e61f4d31c6749608f654681d490372 Mon Sep 17 00:00:00 2001 From: Doug Patti Date: Sat, 16 May 2020 15:11:13 -0400 Subject: [PATCH 02/30] refactor-request-queue: read loop no longer needs to wake writer This is because the writer is always woken by the appropriate calls that push chunks onto the body or writer or calls that close the body. Had to import an additional line from a recent band-aid fix regarding setting the flag on non-chunked streaming responses. It feels like we should find an alternative means of maintaining this piece of information. --- lib/server_connection.ml | 31 ++++++++++++++++--------------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/lib/server_connection.ml b/lib/server_connection.ml index 959d542d..db6f08ef 100644 --- a/lib/server_connection.ml +++ b/lib/server_connection.ml @@ -202,20 +202,16 @@ let rec _next_read_operation t = ) and _final_read_operation_for t reqd = - let next = - if not (Reqd.persistent_connection reqd) then ( - shutdown_reader t; - Reader.next t.reader; - ) else ( - match Reqd.output_state reqd with - | Waiting | Ready -> `Yield - | Complete -> - advance_request_queue t; - _next_read_operation t; - ) - in - wakeup_writer t; - next + if not (Reqd.persistent_connection reqd) then ( + shutdown_reader t; + Reader.next t.reader; + ) else ( + match Reqd.output_state reqd with + | Waiting | Ready -> `Yield + | Complete -> + advance_request_queue t; + _next_read_operation t; + ) ;; let next_read_operation t = @@ -259,7 +255,12 @@ let rec _next_write_operation t = ) else ( let reqd = current_reqd_exn t in match Reqd.output_state reqd with - | Waiting -> `Yield + | Waiting -> + (* XXX(dpatti): I don't think we should need to call this, but it is + necessary in the case of a streaming, non-chunked body so that you can + set the appropriate flag. *) + Reqd.flush_response_body reqd; + Writer.next t.writer | Ready -> Reqd.flush_response_body reqd; Writer.next t.writer From cce55fd5e3b3d91e821ffddabbe9bf6e43f58dd2 Mon Sep 17 00:00:00 2001 From: Doug Patti Date: Fri, 2 Apr 2021 15:58:19 -0400 Subject: [PATCH 03/30] refactor-request-queue: fixes We basically never want to call `Queue.clear` because the head of the queue has special semantic meaning. Instead, we never try to clear the queue and rely on the fact that the queue will never be advanced. This is easy to reason about because the only time we advance the request queue is when the current request is not persistent. I added an explicit test of this situation to build confidence. Additionally, there was an incorrect assertion that you couldn't finish a write with reads still pending. A test was added upstream and it no longer fails with this fix. The final change was some subtle but unused code. In the write loop, we have something that decides to shutdown the connection if the reader is closed, parallel to the next read operation. But this felt weird, the reader should always be awake in the case that it is closed, which means that either 1) it will shutdown the connection or 2) it will wait for the writer, which will wake the reader once it's advanced the request queue, and then it will shutdown the connection. --- lib/server_connection.ml | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/lib/server_connection.ml b/lib/server_connection.ml index db6f08ef..f81cd5bd 100644 --- a/lib/server_connection.ml +++ b/lib/server_connection.ml @@ -155,7 +155,6 @@ let error_code t = else None let shutdown t = - Queue.clear t.request_queue; shutdown_reader t; shutdown_writer t; wakeup_reader t; @@ -190,7 +189,8 @@ let advance_request_queue t = ;; let rec _next_read_operation t = - if not (is_active t) then ( + if not (is_active t) + then ( if Reader.is_closed t.reader then shutdown t; Reader.next t.reader @@ -216,7 +216,6 @@ and _final_read_operation_for t reqd = let next_read_operation t = match _next_read_operation t with - (* XXX(dpatti): These two [`Error _] constructors are never returned *) | `Error (`Parse _) -> set_error_and_handle t `Bad_request; `Close | `Error (`Bad_request request) -> set_error_and_handle ~request t `Bad_request; `Close | (`Read | `Yield | `Close) as operation -> operation @@ -248,11 +247,9 @@ let read_eof t bs ~off ~len = read_with_more t bs ~off ~len Complete let rec _next_write_operation t = - if not (is_active t) then ( - if Reader.is_closed t.reader - then shutdown t; - Writer.next t.writer - ) else ( + if not (is_active t) + then Writer.next t.writer + else ( let reqd = current_reqd_exn t in match Reqd.output_state reqd with | Waiting -> @@ -274,7 +271,7 @@ and _final_write_operation_for t reqd = Writer.next t.writer; ) else ( match Reqd.input_state reqd with - | Ready -> assert false + | Ready -> Writer.next t.writer; | Complete -> advance_request_queue t; _next_write_operation t; From 0bab909200786949136500293168ac22aa6161a5 Mon Sep 17 00:00:00 2001 From: David House Date: Mon, 26 Apr 2021 12:49:40 +0100 Subject: [PATCH 04/30] Implement HTTP upgrades. --- lib/httpaf.mli | 5 ++++- lib/reqd.ml | 34 ++++++++++++++++++++++++++++++---- lib/serialize.ml | 2 ++ lib/server_connection.ml | 11 ++++++++++- 4 files changed, 46 insertions(+), 6 deletions(-) diff --git a/lib/httpaf.mli b/lib/httpaf.mli index 6a529033..e6ab4e29 100644 --- a/lib/httpaf.mli +++ b/lib/httpaf.mli @@ -649,6 +649,8 @@ module Reqd : sig val respond_with_bigstring : t -> Response.t -> Bigstringaf.t -> unit val respond_with_streaming : ?flush_headers_immediately:bool -> t -> Response.t -> [`write] Body.t + val respond_with_upgrade : ?reason:string -> t -> Headers.t -> unit + (** {3 Exception Handling} *) val report_exn : t -> exn -> unit @@ -690,7 +692,7 @@ module Server_connection : sig (** [create ?config ?error_handler ~request_handler] creates a connection handler that will service individual requests with [request_handler]. *) - val next_read_operation : t -> [ `Read | `Yield | `Close ] + val next_read_operation : t -> [ `Read | `Yield | `Close | `Upgrade ] (** [next_read_operation t] returns a value describing the next operation that the caller should conduct on behalf of the connection. *) @@ -717,6 +719,7 @@ module Server_connection : sig val next_write_operation : t -> [ | `Write of Bigstringaf.t IOVec.t list | `Yield + | `Upgrade | `Close of int ] (** [next_write_operation t] returns a value describing the next operation that the caller should conduct on behalf of the connection. *) diff --git a/lib/reqd.ml b/lib/reqd.ml index 8277a637..b8d915b3 100644 --- a/lib/reqd.ml +++ b/lib/reqd.ml @@ -37,6 +37,7 @@ type error = module Response_state = struct type t = | Waiting + | Upgrade of Response.t | Fixed of Response.t | Streaming of Response.t * [`write] Body.t end @@ -45,6 +46,7 @@ module Input_state = struct type t = | Ready | Complete + | Upgraded end module Output_state = struct @@ -52,6 +54,7 @@ module Output_state = struct | Waiting | Ready | Complete + | Upgraded end type error_handler = @@ -111,12 +114,14 @@ let response { response_state; _ } = match response_state with | Waiting -> None | Streaming (response, _) + | Upgrade response | Fixed response -> Some response let response_exn { response_state; _ } = match response_state with | Waiting -> failwith "httpaf.Reqd.response_exn: response has not started" | Streaming (response, _) + | Upgrade response | Fixed response -> response let respond_with_string t response str = @@ -133,6 +138,7 @@ let respond_with_string t response str = Writer.wakeup t.writer; | Streaming _ -> failwith "httpaf.Reqd.respond_with_string: response already started" + | Upgrade _ | Fixed _ -> failwith "httpaf.Reqd.respond_with_string: response already complete" @@ -150,6 +156,7 @@ let respond_with_bigstring t response (bstr:Bigstringaf.t) = Writer.wakeup t.writer; | Streaming _ -> failwith "httpaf.Reqd.respond_with_bigstring: response already started" + | Upgrade _ | Fixed _ -> failwith "httpaf.Reqd.respond_with_bigstring: response already complete" @@ -170,6 +177,7 @@ let unsafe_respond_with_streaming ~flush_headers_immediately t response = response_body | Streaming _ -> failwith "httpaf.Reqd.respond_with_streaming: response already started" + | Upgrade _ | Fixed _ -> failwith "httpaf.Reqd.respond_with_streaming: response already complete" @@ -178,6 +186,20 @@ let respond_with_streaming ?(flush_headers_immediately=false) t response = failwith "httpaf.Reqd.respond_with_streaming: invalid state, currently handling error"; unsafe_respond_with_streaming ~flush_headers_immediately t response +let respond_with_upgrade ?reason t headers = + match t.response_state with + | Waiting -> + let response = Response.create ?reason ~headers `Switching_protocols in + t.response_state <- Upgrade response; + Body.close_reader t.request_body; + Writer.write_response t.writer response; + Writer.wakeup t.writer; + | Streaming _ -> + failwith "httpaf.Reqd.respond_with_upgrade: response already started" + | Upgrade _ + | Fixed _ -> + failwith "httpaf.Reqd.respond_with_upgrade: response already complete" + let report_error t error = t.persistent <- false; Body.close_reader t.request_body; @@ -201,7 +223,7 @@ let report_error t error = | Streaming (_response, response_body), `Exn _ -> Body.close_writer response_body; Writer.close_and_drain t.writer - | (Fixed _ | Streaming _ | Waiting) , _ -> + | (Fixed _ | Streaming _ | Waiting | Upgrade _) , _ -> (* XXX(seliopou): Once additional logging support is added, log the error * in case it is not spurious. *) () @@ -226,13 +248,17 @@ let persistent_connection t = t.persistent let input_state t : Input_state.t = - if Body.is_closed t.request_body - then Complete - else Ready + match t.response_state with + | Upgrade _ -> Upgraded + | Waiting | Fixed _ | Streaming _ -> + if Body.is_closed t.request_body + then Complete + else Ready ;; let output_state t : Output_state.t = match t.response_state with + | Upgrade _ -> Upgraded | Fixed _ -> Complete | Streaming (_, response_body) -> if Body.has_pending_output response_body diff --git a/lib/serialize.ml b/lib/serialize.ml index 61c22131..76039b9d 100644 --- a/lib/serialize.ml +++ b/lib/serialize.ml @@ -195,4 +195,6 @@ module Writer = struct | `Close -> `Close (drained_bytes t) | `Yield -> `Yield | `Writev iovecs -> `Write iovecs + + let has_pending_output t = Faraday.has_pending_output t.encoder end diff --git a/lib/server_connection.ml b/lib/server_connection.ml index f81cd5bd..22ad434b 100644 --- a/lib/server_connection.ml +++ b/lib/server_connection.ml @@ -199,6 +199,7 @@ let rec _next_read_operation t = match Reqd.input_state reqd with | Ready -> Reader.next t.reader | Complete -> _final_read_operation_for t reqd + | Upgraded -> `Upgrade ) and _final_read_operation_for t reqd = @@ -208,6 +209,7 @@ and _final_read_operation_for t reqd = ) else ( match Reqd.output_state reqd with | Waiting | Ready -> `Yield + | Upgraded -> `Upgrade | Complete -> advance_request_queue t; _next_read_operation t; @@ -218,7 +220,7 @@ let next_read_operation t = match _next_read_operation t with | `Error (`Parse _) -> set_error_and_handle t `Bad_request; `Close | `Error (`Bad_request request) -> set_error_and_handle ~request t `Bad_request; `Close - | (`Read | `Yield | `Close) as operation -> operation + | (`Read | `Yield | `Close | `Upgrade) as operation -> operation let rec read_with_more t bs ~off ~len more = let call_handler = Queue.is_empty t.request_queue in @@ -262,6 +264,12 @@ let rec _next_write_operation t = Reqd.flush_response_body reqd; Writer.next t.writer | Complete -> _final_write_operation_for t reqd + | Upgraded -> + if Writer.has_pending_output t.writer then + (* Even in the Upgrade case, we're still responsible for writing the response + header, so we might have work to do. *) + Writer.next t.writer + else _final_write_operation_for t reqd ) and _final_write_operation_for t reqd = @@ -272,6 +280,7 @@ and _final_write_operation_for t reqd = ) else ( match Reqd.input_state reqd with | Ready -> Writer.next t.writer; + | Upgraded -> `Upgrade | Complete -> advance_request_queue t; _next_write_operation t; From b761c7ea84604620b58976e42d41847eed172d53 Mon Sep 17 00:00:00 2001 From: David House Date: Mon, 26 Apr 2021 12:49:40 +0100 Subject: [PATCH 05/30] Implement HTTP upgrades. --- async/httpaf_async.ml | 19 +++++++++++++++++- async/httpaf_async.mli | 1 + lib_test/helpers.ml | 8 +++++--- lib_test/test_client_connection.ml | 8 ++++---- lib_test/test_server_connection.ml | 32 ++++++++++++++++++++++++++---- lwt-unix/httpaf_lwt_unix.ml | 25 ++++++++++++++++++++++- lwt-unix/httpaf_lwt_unix.mli | 1 + 7 files changed, 81 insertions(+), 13 deletions(-) diff --git a/async/httpaf_async.ml b/async/httpaf_async.ml index accd3eec..20a7908f 100644 --- a/async/httpaf_async.ml +++ b/async/httpaf_async.ml @@ -90,7 +90,8 @@ let read fd buffer = open Httpaf module Server = struct - let create_connection_handler ?(config=Config.default) ~request_handler ~error_handler = + let create_connection_handler + ?(config=Config.default) ~request_handler ~upgrade_handler ~error_handler = fun client_addr socket -> let fd = Socket.fd socket in let writev = Faraday_async.writev_of_fd fd in @@ -119,6 +120,14 @@ module Server = struct | `Yield -> (* Log.Global.printf "read_yield(%d)%!" (Fd.to_int_exn fd); *) Server_connection.yield_reader conn reader_thread + | `Upgrade -> + (match upgrade_handler with + | None -> failwith "HTTP upgrades not supported" + | Some upgrade_handler -> + upon (upgrade_handler client_addr) (fun () -> + Ivar.fill read_complete (); + if not (Fd.is_closed fd) + then Socket.shutdown socket `Receive)) | `Close -> (* Log.Global.printf "read_close(%d)%!" (Fd.to_int_exn fd); *) Ivar.fill read_complete (); @@ -136,6 +145,14 @@ module Server = struct | `Yield -> (* Log.Global.printf "write_yield(%d)%!" (Fd.to_int_exn fd); *) Server_connection.yield_writer conn writer_thread; + | `Upgrade -> + (match upgrade_handler with + | None -> failwith "HTTP upgrades not supported" + | Some upgrade_handler -> + upon (upgrade_handler client_addr) (fun () -> + Ivar.fill write_complete (); + if not (Fd.is_closed fd) + then Socket.shutdown socket `Send)) | `Close _ -> (* Log.Global.printf "write_close(%d)%!" (Fd.to_int_exn fd); *) Ivar.fill write_complete (); diff --git a/async/httpaf_async.mli b/async/httpaf_async.mli index f120624d..27b2b98e 100644 --- a/async/httpaf_async.mli +++ b/async/httpaf_async.mli @@ -7,6 +7,7 @@ module Server : sig val create_connection_handler : ?config : Config.t -> request_handler : ('a -> Server_connection.request_handler) + -> upgrade_handler : ('a -> unit Deferred.t) option -> error_handler : ('a -> Server_connection.error_handler) -> ([< Socket.Address.t] as 'a) -> ([`Active], 'a) Socket.t diff --git a/lib_test/helpers.ml b/lib_test/helpers.ml index 9ce0be45..29f8dc2f 100644 --- a/lib_test/helpers.ml +++ b/lib_test/helpers.ml @@ -18,7 +18,7 @@ let response_to_string ?body r = Faraday.serialize_to_string f module Read_operation = struct - type t = [ `Read | `Yield | `Close ] + type t = [ `Read | `Yield | `Close | `Upgrade ] let pp_hum fmt (t : t) = let str = @@ -26,13 +26,14 @@ module Read_operation = struct | `Read -> "Read" | `Yield -> "Yield" | `Close -> "Close" + | `Upgrade -> "Upgrade" in Format.pp_print_string fmt str ;; end module Write_operation = struct - type t = [ `Write of Bigstringaf.t IOVec.t list | `Yield | `Close of int ] + type t = [ `Write of Bigstringaf.t IOVec.t list | `Yield | `Close of int | `Upgrade ] let iovecs_to_string iovecs = let len = IOVec.lengthv iovecs in @@ -50,12 +51,13 @@ module Write_operation = struct | `Write iovecs -> Format.fprintf fmt "Write %S" (iovecs_to_string iovecs) | `Yield -> Format.pp_print_string fmt "Yield" | `Close len -> Format.fprintf fmt "Close %i" len + | `Upgrade -> Format.pp_print_string fmt "Upgrade" ;; let to_write_as_string t = match t with | `Write iovecs -> Some (iovecs_to_string iovecs) - | `Close _ | `Yield -> None + | `Close _ | `Yield | `Upgrade -> None ;; end diff --git a/lib_test/test_client_connection.ml b/lib_test/test_client_connection.ml index 4075c590..dae3ba78 100644 --- a/lib_test/test_client_connection.ml +++ b/lib_test/test_client_connection.ml @@ -41,7 +41,7 @@ let read_response t r = let reader_ready t = Alcotest.check read_operation "Reader is ready" - `Read (next_read_operation t :> [`Close | `Read | `Yield]); + `Read (next_read_operation t :> [`Close | `Read | `Yield | `Upgrade]); ;; let write_string ?(msg="output written") t str = @@ -59,17 +59,17 @@ let write_request ?(msg="request written") t r = let writer_yielded t = Alcotest.check write_operation "Writer is in a yield state" - `Yield (next_write_operation t); + `Yield (next_write_operation t :> Write_operation.t); ;; let writer_closed t = Alcotest.check write_operation "Writer is closed" - (`Close 0) (next_write_operation t); + (`Close 0) (next_write_operation t :> Write_operation.t); ;; let connection_is_shutdown t = Alcotest.check read_operation "Reader is closed" - `Close (next_read_operation t :> [`Close | `Read | `Yield]); + `Close (next_read_operation t :> [`Close | `Read | `Yield | `Upgrade]); writer_closed t; ;; diff --git a/lib_test/test_server_connection.ml b/lib_test/test_server_connection.ml index e1b0e696..80ebfec0 100644 --- a/lib_test/test_server_connection.ml +++ b/lib_test/test_server_connection.ml @@ -70,6 +70,9 @@ end = struct | `Close -> trace "reader: Close"; t.read_operation <- `Close + | `Upgrade -> + trace "reader: Upgrade"; + t.read_operation <- `Upgrade ;; let rec write_step t = @@ -89,6 +92,9 @@ end = struct | `Close n -> trace "writer: Close"; t.write_operation <- `Close n + | `Upgrade -> + trace "writer: Upgrade"; + t.write_operation <- `Upgrade ;; let create ?config ?error_handler request_handler = @@ -112,13 +118,13 @@ end = struct let current_read_operation t = match t.read_operation with | `Initial -> assert false - | `Read | `Yield | `Close as op -> op + | `Read | `Yield | `Close | `Upgrade as op -> op ;; let current_write_operation t = match t.write_operation with | `Initial -> assert false - | `Write _ | `Yield | `Close _ as op -> op + | `Write _ | `Yield | `Close _ | `Upgrade as op -> op ;; let do_read t f = @@ -127,7 +133,7 @@ end = struct let res = f t.server_connection in t.read_loop (); res - | `Yield | `Close as op -> + | `Yield | `Close | `Upgrade as op -> Alcotest.failf "Read attempted during operation: %a" Read_operation.pp_hum op ;; @@ -138,7 +144,7 @@ end = struct let res = f t.server_connection bufs in t.write_loop (); res - | `Yield | `Close _ as op -> + | `Yield | `Close _ | `Upgrade as op -> Alcotest.failf "Write attempted during operation: %a" Write_operation.pp_hum op ;; @@ -894,6 +900,23 @@ let test_response_finished_before_body_read () = write_response t response ~body:"done"; ;; +let test_upgrade () = + let upgrade_headers =["Connection", "Upgrade" ; "Upgrade", "foo"] in + let request_handler reqd = + Reqd.respond_with_upgrade reqd + (Headers.of_list upgrade_headers) + in + let t = create request_handler in + read_request t + (Request.create `GET "/" + ~headers:(Headers.of_list (("Content-Length", "0") :: upgrade_headers))); + Alcotest.check read_operation "Reader is `Upgrade" `Upgrade (current_read_operation t); + write_response t + (Response.create `Switching_protocols ~headers:(Headers.of_list upgrade_headers)); + Alcotest.check write_operation "Writer is `Upgrade" `Upgrade (current_write_operation t); +;; + + let tests = [ "initial reader state" , `Quick, test_initial_reader_state ; "shutdown reader closed", `Quick, test_reader_is_closed_after_eof @@ -922,4 +945,5 @@ let tests = ; "multiple requests with connection close", `Quick, test_multiple_requests_in_single_read_with_close ; "parse failure after checkpoint", `Quick, test_parse_failure_after_checkpoint ; "response finished before body read", `Quick, test_response_finished_before_body_read + ; "test upgrades", `Quick, test_upgrade ] diff --git a/lwt-unix/httpaf_lwt_unix.ml b/lwt-unix/httpaf_lwt_unix.ml index 545fbc6a..bdd71178 100644 --- a/lwt-unix/httpaf_lwt_unix.ml +++ b/lwt-unix/httpaf_lwt_unix.ml @@ -106,7 +106,8 @@ let shutdown socket command = module Config = Httpaf.Config module Server = struct - let create_connection_handler ?(config=Config.default) ~request_handler ~error_handler = + let create_connection_handler + ?(config=Config.default) ~request_handler ~upgrade_handler ~error_handler = fun client_addr socket -> let module Server_connection = Httpaf.Server_connection in let connection = @@ -140,6 +141,17 @@ module Server = struct Server_connection.yield_reader connection read_loop; Lwt.return_unit + | `Upgrade -> + (match upgrade_handler with + | None -> failwith "HTTP upgrades not supported" + | Some upgrade_handler -> + upgrade_handler client_addr >>= fun () -> + Lwt.wakeup_later notify_read_loop_exited (); + if not (Lwt_unix.state socket = Lwt_unix.Closed) then begin + shutdown socket Unix.SHUTDOWN_RECEIVE + end; + Lwt.return_unit) + | `Close -> Lwt.wakeup_later notify_read_loop_exited (); if not (Lwt_unix.state socket = Lwt_unix.Closed) then begin @@ -172,6 +184,17 @@ module Server = struct Server_connection.yield_writer connection write_loop; Lwt.return_unit + | `Upgrade -> + (match upgrade_handler with + | None -> failwith "HTTP upgrades not supported" + | Some upgrade_handler -> + upgrade_handler client_addr >>= fun () -> + Lwt.wakeup_later notify_write_loop_exited (); + if not (Lwt_unix.state socket = Lwt_unix.Closed) then begin + shutdown socket Unix.SHUTDOWN_SEND + end; + Lwt.return_unit) + | `Close _ -> Lwt.wakeup_later notify_write_loop_exited (); if not (Lwt_unix.state socket = Lwt_unix.Closed) then begin diff --git a/lwt-unix/httpaf_lwt_unix.mli b/lwt-unix/httpaf_lwt_unix.mli index 6625002c..4f8e3387 100644 --- a/lwt-unix/httpaf_lwt_unix.mli +++ b/lwt-unix/httpaf_lwt_unix.mli @@ -42,6 +42,7 @@ module Server : sig val create_connection_handler : ?config : Config.t -> request_handler : (Unix.sockaddr -> Server_connection.request_handler) + -> upgrade_handler : (Unix.sockaddr -> unit Lwt.t) option -> error_handler : (Unix.sockaddr -> Server_connection.error_handler) -> Unix.sockaddr -> Lwt_unix.file_descr From 124eed89067c2db551bb1fa33581a9d267aefe8e Mon Sep 17 00:00:00 2001 From: David House Date: Thu, 29 Apr 2021 12:04:23 +0100 Subject: [PATCH 06/30] Fix requests where the client requests an upgrade but the server declines. In these cases, the correct behaviour is to send the response back to the client and then close the connection. The client may have sent us bytes after the request header which are not HTTP, so we have no hope of recovering this connection. This is achieved by modifying the parser to stop if it detects that the client has requested an upgrade. This ensures that, regardless of whether or not [Reqd.input_state] is [Upgrade], the parser will not attempt to consume bytes that might not be HTTP. --- lib/message.ml | 2 +- lib/parse.ml | 47 ++++++++++++++++++------------ lib/request.ml | 5 ++++ lib/server_connection.ml | 2 +- lib_test/test_server_connection.ml | 17 ++++++++++- 5 files changed, 51 insertions(+), 22 deletions(-) diff --git a/lib/message.ml b/lib/message.ml index c24ddbe3..acf8b98e 100644 --- a/lib/message.ml +++ b/lib/message.ml @@ -41,7 +41,7 @@ let persistent_connection ?(proxy=false) version headers = (* XXX(seliopou): use proxy argument in the case of HTTP/1.0 as per https://tools.ietf.org/html/rfc7230#section-6.3 *) match Headers.get headers "connection" with - | Some "close" -> false + | Some ("close" | "upgrade") -> false | Some "keep-alive" -> Version.(compare version v1_0) >= 0 | _ -> Version.(compare version v1_1) >= 0 diff --git a/lib/parse.ml b/lib/parse.ml index 664e3c3b..925e5f02 100644 --- a/lib/parse.ml +++ b/lib/parse.ml @@ -212,13 +212,22 @@ module Reader = struct | `Invalid_response_body_length of Response.t | `Parse of string list * string ] + type parser_result = + | Can_continue + | Stop + type 'error parse_state = - | Done + | Done of parser_result | Fail of 'error - | Partial of (Bigstringaf.t -> off:int -> len:int -> AU.more -> (unit, 'error) result AU.state) + | Partial of + (Bigstringaf.t + -> off:int + -> len:int + -> AU.more + -> (parser_result, 'error) result AU.state) type 'error t = - { parser : (unit, 'error) result Angstrom.t + { parser : (parser_result, 'error) result Angstrom.t ; mutable parse_state : 'error parse_state (* The state of the parse for the current request *) ; mutable closed : bool @@ -231,24 +240,25 @@ module Reader = struct let create parser = { parser - ; parse_state = Done + ; parse_state = Done Can_continue ; closed = false } - let ok = return (Ok ()) - let request handler = let parser = request <* commit >>= fun request -> match Request.body_length request with - | `Error `Bad_request -> return (Error (`Bad_request request)) + | `Error `Bad_request -> + return (Error (`Bad_request request)) | `Fixed 0L -> handler request Body.empty; - ok + (* If the client has requested an upgrade, then any bytes after the headers are + likely not HTTP, so we should be careful not to try to parse them. *) + return (Ok (if Request.is_upgrade request then Stop else Can_continue)) | `Fixed _ | `Chunked as encoding -> let request_body = Body.create_reader Bigstringaf.empty in handler request request_body; - body ~encoding request_body *> ok + body ~encoding request_body *> return (Ok Can_continue) in create parser @@ -261,13 +271,13 @@ module Reader = struct | `Error `Internal_server_error -> return (Error (`Invalid_response_body_length response)) | `Fixed 0L -> handler response Body.empty; - ok + return (Ok Can_continue) | `Fixed _ | `Chunked | `Close_delimited as encoding -> (* We do not trust the length provided in the [`Fixed] case, as the client could DOS easily. *) let response_body = Body.create_reader Bigstringaf.empty in handler response response_body; - body ~encoding response_body *> ok + body ~encoding response_body *> return (Ok Can_continue) in create parser ;; @@ -278,8 +288,8 @@ module Reader = struct let transition t state = match state with - | AU.Done(consumed, Ok ()) -> - t.parse_state <- Done; + | AU.Done(consumed, Ok result) -> + t.parse_state <- Done result; consumed | AU.Done(consumed, Error error) -> t.parse_state <- Fail error; @@ -303,8 +313,8 @@ module Reader = struct let rec read_with_more t bs ~off ~len more = let consumed = match t.parse_state with - | Fail _ -> 0 - | Done -> + | Fail _ | Done Stop -> 0 + | Done Can_continue -> start t (AU.parse t.parser); read_with_more t bs ~off ~len more; | Partial continue -> @@ -324,11 +334,10 @@ module Reader = struct let next t = if t.closed then `Close - else ( + else match t.parse_state with | Fail err -> `Error err - | Done -> `Read - | Partial _ -> `Read - ) + | Done Stop -> `Close + | Done Can_continue | Partial _ -> `Read ;; end diff --git a/lib/request.ml b/lib/request.ml index 158fd4c2..204a2000 100644 --- a/lib/request.ml +++ b/lib/request.ml @@ -80,3 +80,8 @@ let persistent_connection ?proxy { version; headers; _ } = let pp_hum fmt { meth; target; version; headers } = Format.fprintf fmt "((method \"%a\") (target %S) (version \"%a\") (headers %a))" Method.pp_hum meth target Version.pp_hum version Headers.pp_hum headers + +let is_upgrade t = + match Headers.get t.headers "Connection" with + | None -> false + | Some header_val -> Headers.ci_equal header_val "upgrade" diff --git a/lib/server_connection.ml b/lib/server_connection.ml index 22ad434b..5a0e584f 100644 --- a/lib/server_connection.ml +++ b/lib/server_connection.ml @@ -269,7 +269,7 @@ let rec _next_write_operation t = (* Even in the Upgrade case, we're still responsible for writing the response header, so we might have work to do. *) Writer.next t.writer - else _final_write_operation_for t reqd + else `Upgrade ) and _final_write_operation_for t reqd = diff --git a/lib_test/test_server_connection.ml b/lib_test/test_server_connection.ml index 80ebfec0..f42b4c64 100644 --- a/lib_test/test_server_connection.ml +++ b/lib_test/test_server_connection.ml @@ -901,7 +901,7 @@ let test_response_finished_before_body_read () = ;; let test_upgrade () = - let upgrade_headers =["Connection", "Upgrade" ; "Upgrade", "foo"] in + let upgrade_headers =["Connection", "upgrade" ; "Upgrade", "foo"] in let request_handler reqd = Reqd.respond_with_upgrade reqd (Headers.of_list upgrade_headers) @@ -916,6 +916,20 @@ let test_upgrade () = Alcotest.check write_operation "Writer is `Upgrade" `Upgrade (current_write_operation t); ;; +let test_upgrade_where_server_does_not_upgrade () = + let upgrade_headers =["Connection", "upgrade" ; "Upgrade", "foo"] in + let response = Response.create `Bad_request ~headers:(Headers.of_list upgrade_headers) in + let request_handler reqd = + Reqd.respond_with_string reqd response "" + in + let t = create request_handler in + read_request t + (Request.create `GET "/" + ~headers:(Headers.of_list (("Content-Length", "0") :: upgrade_headers))); + Alcotest.check read_operation "Reader is `Close" `Close (current_read_operation t); + write_response t response; + Alcotest.check write_operation "Writer is `Close" (`Close 0) (current_write_operation t); +;; let tests = [ "initial reader state" , `Quick, test_initial_reader_state @@ -946,4 +960,5 @@ let tests = ; "parse failure after checkpoint", `Quick, test_parse_failure_after_checkpoint ; "response finished before body read", `Quick, test_response_finished_before_body_read ; "test upgrades", `Quick, test_upgrade + ; "test upgrade where server does not upgrade", `Quick, test_upgrade_where_server_does_not_upgrade ] From 004e128771c0c505669017089acfea50979e489c Mon Sep 17 00:00:00 2001 From: David House Date: Thu, 29 Apr 2021 12:36:26 +0100 Subject: [PATCH 07/30] refactor --- lib/server_connection.ml | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/lib/server_connection.ml b/lib/server_connection.ml index 5a0e584f..5614aef4 100644 --- a/lib/server_connection.ml +++ b/lib/server_connection.ml @@ -263,18 +263,20 @@ let rec _next_write_operation t = | Ready -> Reqd.flush_response_body reqd; Writer.next t.writer - | Complete -> _final_write_operation_for t reqd - | Upgraded -> + | Complete -> _final_write_operation_for t reqd ~upgrade:false + | Upgraded -> _final_write_operation_for t reqd ~upgrade:true + ) + +and _final_write_operation_for t reqd ~upgrade = + let next = + if upgrade then ( if Writer.has_pending_output t.writer then (* Even in the Upgrade case, we're still responsible for writing the response header, so we might have work to do. *) Writer.next t.writer - else `Upgrade - ) - -and _final_write_operation_for t reqd = - let next = - if not (Reqd.persistent_connection reqd) then ( + else + `Upgrade + ) else if not (Reqd.persistent_connection reqd) then ( shutdown_writer t; Writer.next t.writer; ) else ( From d1ff221042c38145557ba93fb945d7364abaaf01 Mon Sep 17 00:00:00 2001 From: David House Date: Thu, 29 Apr 2021 14:55:21 +0100 Subject: [PATCH 08/30] More fixes for when the upgrade request is declined --- lib/reqd.ml | 31 ++++++++++++++++++++++-------- lib/server_connection.ml | 2 ++ lib_test/test_server_connection.ml | 18 ++++++++++++----- 3 files changed, 38 insertions(+), 13 deletions(-) diff --git a/lib/reqd.ml b/lib/reqd.ml index b8d915b3..40c989aa 100644 --- a/lib/reqd.ml +++ b/lib/reqd.ml @@ -44,6 +44,7 @@ end module Input_state = struct type t = + | Waiting | Ready | Complete | Upgraded @@ -189,11 +190,14 @@ let respond_with_streaming ?(flush_headers_immediately=false) t response = let respond_with_upgrade ?reason t headers = match t.response_state with | Waiting -> - let response = Response.create ?reason ~headers `Switching_protocols in - t.response_state <- Upgrade response; - Body.close_reader t.request_body; - Writer.write_response t.writer response; - Writer.wakeup t.writer; + if not (Request.is_upgrade t.request) then + failwith "httpaf.Reqd.respond_with_upgrade: request was not an upgrade request" + else ( + let response = Response.create ?reason ~headers `Switching_protocols in + t.response_state <- Upgrade response; + Body.close_reader t.request_body; + Writer.write_response t.writer response; + Writer.wakeup t.writer); | Streaming _ -> failwith "httpaf.Reqd.respond_with_upgrade: response already started" | Upgrade _ @@ -248,12 +252,23 @@ let persistent_connection t = t.persistent let input_state t : Input_state.t = - match t.response_state with - | Upgrade _ -> Upgraded - | Waiting | Fixed _ | Streaming _ -> + let upgrade_status = + match Request.is_upgrade t.request with + | false -> `Not_upgrading + | true -> + match t.response_state with + | Upgrade _ -> `Finished_upgrading + | Fixed _ | Streaming _ -> `Upgrade_declined + | Waiting -> `Upgrade_in_progress + in + match upgrade_status with + | `Finished_upgrading -> Upgraded + | `Not_upgrading | `Upgrade_declined -> if Body.is_closed t.request_body then Complete else Ready + | `Upgrade_in_progress -> + Waiting ;; let output_state t : Output_state.t = diff --git a/lib/server_connection.ml b/lib/server_connection.ml index 5614aef4..8fca0866 100644 --- a/lib/server_connection.ml +++ b/lib/server_connection.ml @@ -197,6 +197,7 @@ let rec _next_read_operation t = ) else ( let reqd = current_reqd_exn t in match Reqd.input_state reqd with + | Waiting -> `Yield | Ready -> Reader.next t.reader | Complete -> _final_read_operation_for t reqd | Upgraded -> `Upgrade @@ -281,6 +282,7 @@ and _final_write_operation_for t reqd ~upgrade = Writer.next t.writer; ) else ( match Reqd.input_state reqd with + | Waiting -> `Yield | Ready -> Writer.next t.writer; | Upgraded -> `Upgrade | Complete -> diff --git a/lib_test/test_server_connection.ml b/lib_test/test_server_connection.ml index f42b4c64..5141e475 100644 --- a/lib_test/test_server_connection.ml +++ b/lib_test/test_server_connection.ml @@ -918,16 +918,24 @@ let test_upgrade () = let test_upgrade_where_server_does_not_upgrade () = let upgrade_headers =["Connection", "upgrade" ; "Upgrade", "foo"] in - let response = Response.create `Bad_request ~headers:(Headers.of_list upgrade_headers) in - let request_handler reqd = - Reqd.respond_with_string reqd response "" - in + let reqd = ref None in + let request_handler reqd' = reqd := Some reqd' in let t = create request_handler in read_request t (Request.create `GET "/" ~headers:(Headers.of_list (("Content-Length", "0") :: upgrade_headers))); - Alcotest.check read_operation "Reader is `Close" `Close (current_read_operation t); + (* At this point, we don't know if the response handler will call respond_with_upgrade + or not. So we pause the reader until that is determined. *) + Alcotest.check read_operation "Reader is `Yield during upgrade negotiation" + `Yield (current_read_operation t); + + (* Now pretend the user doesn't want to do the upgrade and make sure we close the + connection *) + let reqd = Option.get !reqd in + let response = Response.create `Bad_request ~headers:(Headers.of_list upgrade_headers) in + Reqd.respond_with_string reqd response ""; write_response t response; + Alcotest.check read_operation "Reader is `Close" `Close (current_read_operation t); Alcotest.check write_operation "Writer is `Close" (`Close 0) (current_write_operation t); ;; From 60f137b48bc30d88de3ebad584654a8977857b8b Mon Sep 17 00:00:00 2001 From: David House Date: Fri, 30 Apr 2021 11:14:56 +0100 Subject: [PATCH 09/30] Unwind changes to parser The previous commit asserted that the correct response to the server declining to upgrade was to close the connection. This isn't right. For instance, if the web server is using the Negotiate authentication scheme (https://tools.ietf.org/html/rfc4559), the workflow looks like this: - The client sends a request - The server responds with Not_authorized and 'WWW-Authenticate: Negotiate' - The client re-sends the same request with a 'Authorization: Negotiate ' header. So, if someone wants to do websockets on a kerberos-authenticated server (for instance), then we need to allow requests after a failed upgrade. I guess if the client sends an upgrade request and immediately starts speaking the new protocol without waiting for the response, we'll get a parse error, but there isn't much we can do. This commit achieves this by unwinding the changes to the parser. The declined-upgrade test is flipped round so that it proves subsequent requests *are* possible, rather than *are not* possible. --- lib/message.ml | 2 +- lib/parse.ml | 47 ++++++++++++------------------ lib_test/test_server_connection.ml | 17 +++++++---- 3 files changed, 31 insertions(+), 35 deletions(-) diff --git a/lib/message.ml b/lib/message.ml index acf8b98e..c24ddbe3 100644 --- a/lib/message.ml +++ b/lib/message.ml @@ -41,7 +41,7 @@ let persistent_connection ?(proxy=false) version headers = (* XXX(seliopou): use proxy argument in the case of HTTP/1.0 as per https://tools.ietf.org/html/rfc7230#section-6.3 *) match Headers.get headers "connection" with - | Some ("close" | "upgrade") -> false + | Some "close" -> false | Some "keep-alive" -> Version.(compare version v1_0) >= 0 | _ -> Version.(compare version v1_1) >= 0 diff --git a/lib/parse.ml b/lib/parse.ml index 925e5f02..664e3c3b 100644 --- a/lib/parse.ml +++ b/lib/parse.ml @@ -212,22 +212,13 @@ module Reader = struct | `Invalid_response_body_length of Response.t | `Parse of string list * string ] - type parser_result = - | Can_continue - | Stop - type 'error parse_state = - | Done of parser_result + | Done | Fail of 'error - | Partial of - (Bigstringaf.t - -> off:int - -> len:int - -> AU.more - -> (parser_result, 'error) result AU.state) + | Partial of (Bigstringaf.t -> off:int -> len:int -> AU.more -> (unit, 'error) result AU.state) type 'error t = - { parser : (parser_result, 'error) result Angstrom.t + { parser : (unit, 'error) result Angstrom.t ; mutable parse_state : 'error parse_state (* The state of the parse for the current request *) ; mutable closed : bool @@ -240,25 +231,24 @@ module Reader = struct let create parser = { parser - ; parse_state = Done Can_continue + ; parse_state = Done ; closed = false } + let ok = return (Ok ()) + let request handler = let parser = request <* commit >>= fun request -> match Request.body_length request with - | `Error `Bad_request -> - return (Error (`Bad_request request)) + | `Error `Bad_request -> return (Error (`Bad_request request)) | `Fixed 0L -> handler request Body.empty; - (* If the client has requested an upgrade, then any bytes after the headers are - likely not HTTP, so we should be careful not to try to parse them. *) - return (Ok (if Request.is_upgrade request then Stop else Can_continue)) + ok | `Fixed _ | `Chunked as encoding -> let request_body = Body.create_reader Bigstringaf.empty in handler request request_body; - body ~encoding request_body *> return (Ok Can_continue) + body ~encoding request_body *> ok in create parser @@ -271,13 +261,13 @@ module Reader = struct | `Error `Internal_server_error -> return (Error (`Invalid_response_body_length response)) | `Fixed 0L -> handler response Body.empty; - return (Ok Can_continue) + ok | `Fixed _ | `Chunked | `Close_delimited as encoding -> (* We do not trust the length provided in the [`Fixed] case, as the client could DOS easily. *) let response_body = Body.create_reader Bigstringaf.empty in handler response response_body; - body ~encoding response_body *> return (Ok Can_continue) + body ~encoding response_body *> ok in create parser ;; @@ -288,8 +278,8 @@ module Reader = struct let transition t state = match state with - | AU.Done(consumed, Ok result) -> - t.parse_state <- Done result; + | AU.Done(consumed, Ok ()) -> + t.parse_state <- Done; consumed | AU.Done(consumed, Error error) -> t.parse_state <- Fail error; @@ -313,8 +303,8 @@ module Reader = struct let rec read_with_more t bs ~off ~len more = let consumed = match t.parse_state with - | Fail _ | Done Stop -> 0 - | Done Can_continue -> + | Fail _ -> 0 + | Done -> start t (AU.parse t.parser); read_with_more t bs ~off ~len more; | Partial continue -> @@ -334,10 +324,11 @@ module Reader = struct let next t = if t.closed then `Close - else + else ( match t.parse_state with | Fail err -> `Error err - | Done Stop -> `Close - | Done Can_continue | Partial _ -> `Read + | Done -> `Read + | Partial _ -> `Read + ) ;; end diff --git a/lib_test/test_server_connection.ml b/lib_test/test_server_connection.ml index 5141e475..d97deb7f 100644 --- a/lib_test/test_server_connection.ml +++ b/lib_test/test_server_connection.ml @@ -918,8 +918,8 @@ let test_upgrade () = let test_upgrade_where_server_does_not_upgrade () = let upgrade_headers =["Connection", "upgrade" ; "Upgrade", "foo"] in - let reqd = ref None in - let request_handler reqd' = reqd := Some reqd' in + let reqd_ref = ref None in + let request_handler reqd = reqd_ref := Some reqd in let t = create request_handler in read_request t (Request.create `GET "/" @@ -931,12 +931,17 @@ let test_upgrade_where_server_does_not_upgrade () = (* Now pretend the user doesn't want to do the upgrade and make sure we close the connection *) - let reqd = Option.get !reqd in - let response = Response.create `Bad_request ~headers:(Headers.of_list upgrade_headers) in + let reqd = Option.get !reqd_ref in + let response = Response.create `Bad_request ~headers:(Headers.encoding_fixed 0) in + Reqd.respond_with_string reqd response ""; + write_response t response; + + (* The connection is left healthy and can be used for more requests *) + read_request t (Request.create `GET "/" ~headers:(Headers.encoding_fixed 0)); + let reqd = Option.get !reqd_ref in + let response = Response.create `OK ~headers:(Headers.encoding_fixed 0) in Reqd.respond_with_string reqd response ""; write_response t response; - Alcotest.check read_operation "Reader is `Close" `Close (current_read_operation t); - Alcotest.check write_operation "Writer is `Close" (`Close 0) (current_write_operation t); ;; let tests = From d45b85cb935270f84843108dccbf849cde2d3c02 Mon Sep 17 00:00:00 2001 From: David House Date: Mon, 24 May 2021 12:16:16 +0100 Subject: [PATCH 10/30] Fix compilation of examples --- examples/async/async_echo_post.ml | 5 ++++- examples/lwt/lwt_echo_post.ml | 5 ++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/examples/async/async_echo_post.ml b/examples/async/async_echo_post.ml index 0f524a2a..b0df8db5 100644 --- a/examples/async/async_echo_post.ml +++ b/examples/async/async_echo_post.ml @@ -10,7 +10,10 @@ let main port max_accepts_per_batch () = let where_to_listen = Tcp.Where_to_listen.of_port port in Tcp.(Server.create_sock ~on_handler_error:`Raise ~backlog:10_000 ~max_connections:10_000 ~max_accepts_per_batch where_to_listen) - (Server.create_connection_handler ~request_handler ~error_handler) + (Server.create_connection_handler + ~request_handler + ~error_handler + ~upgrade_handler:None) >>= fun _server -> Stdio.printf "Listening on port %i and echoing POST requests.\n" port; Stdio.printf "To send a POST request, try one of the following\n\n"; diff --git a/examples/lwt/lwt_echo_post.ml b/examples/lwt/lwt_echo_post.ml index 18307107..42d95b9f 100644 --- a/examples/lwt/lwt_echo_post.ml +++ b/examples/lwt/lwt_echo_post.ml @@ -12,7 +12,10 @@ let main port = Lwt.async (fun () -> Lwt_io.establish_server_with_client_socket listen_address - (Server.create_connection_handler ~request_handler ~error_handler) + (Server.create_connection_handler + ~request_handler + ~error_handler + ~upgrade_handler:None) >|= fun _server -> Stdio.printf "Listening on port %i and echoing POST requests.\n" port; Stdio.printf "To send a POST request, try one of the following\n\n"; From 05c797d528061323894654eb9fb0f965a93eb047 Mon Sep 17 00:00:00 2001 From: David House Date: Mon, 26 Apr 2021 12:49:40 +0100 Subject: [PATCH 11/30] Implement HTTP upgrades. --- lib/httpaf.mli | 5 ++++- lib/reqd.ml | 34 ++++++++++++++++++++++++++++++---- lib/serialize.ml | 2 ++ lib/server_connection.ml | 11 ++++++++++- 4 files changed, 46 insertions(+), 6 deletions(-) diff --git a/lib/httpaf.mli b/lib/httpaf.mli index 6a529033..e6ab4e29 100644 --- a/lib/httpaf.mli +++ b/lib/httpaf.mli @@ -649,6 +649,8 @@ module Reqd : sig val respond_with_bigstring : t -> Response.t -> Bigstringaf.t -> unit val respond_with_streaming : ?flush_headers_immediately:bool -> t -> Response.t -> [`write] Body.t + val respond_with_upgrade : ?reason:string -> t -> Headers.t -> unit + (** {3 Exception Handling} *) val report_exn : t -> exn -> unit @@ -690,7 +692,7 @@ module Server_connection : sig (** [create ?config ?error_handler ~request_handler] creates a connection handler that will service individual requests with [request_handler]. *) - val next_read_operation : t -> [ `Read | `Yield | `Close ] + val next_read_operation : t -> [ `Read | `Yield | `Close | `Upgrade ] (** [next_read_operation t] returns a value describing the next operation that the caller should conduct on behalf of the connection. *) @@ -717,6 +719,7 @@ module Server_connection : sig val next_write_operation : t -> [ | `Write of Bigstringaf.t IOVec.t list | `Yield + | `Upgrade | `Close of int ] (** [next_write_operation t] returns a value describing the next operation that the caller should conduct on behalf of the connection. *) diff --git a/lib/reqd.ml b/lib/reqd.ml index 8277a637..b8d915b3 100644 --- a/lib/reqd.ml +++ b/lib/reqd.ml @@ -37,6 +37,7 @@ type error = module Response_state = struct type t = | Waiting + | Upgrade of Response.t | Fixed of Response.t | Streaming of Response.t * [`write] Body.t end @@ -45,6 +46,7 @@ module Input_state = struct type t = | Ready | Complete + | Upgraded end module Output_state = struct @@ -52,6 +54,7 @@ module Output_state = struct | Waiting | Ready | Complete + | Upgraded end type error_handler = @@ -111,12 +114,14 @@ let response { response_state; _ } = match response_state with | Waiting -> None | Streaming (response, _) + | Upgrade response | Fixed response -> Some response let response_exn { response_state; _ } = match response_state with | Waiting -> failwith "httpaf.Reqd.response_exn: response has not started" | Streaming (response, _) + | Upgrade response | Fixed response -> response let respond_with_string t response str = @@ -133,6 +138,7 @@ let respond_with_string t response str = Writer.wakeup t.writer; | Streaming _ -> failwith "httpaf.Reqd.respond_with_string: response already started" + | Upgrade _ | Fixed _ -> failwith "httpaf.Reqd.respond_with_string: response already complete" @@ -150,6 +156,7 @@ let respond_with_bigstring t response (bstr:Bigstringaf.t) = Writer.wakeup t.writer; | Streaming _ -> failwith "httpaf.Reqd.respond_with_bigstring: response already started" + | Upgrade _ | Fixed _ -> failwith "httpaf.Reqd.respond_with_bigstring: response already complete" @@ -170,6 +177,7 @@ let unsafe_respond_with_streaming ~flush_headers_immediately t response = response_body | Streaming _ -> failwith "httpaf.Reqd.respond_with_streaming: response already started" + | Upgrade _ | Fixed _ -> failwith "httpaf.Reqd.respond_with_streaming: response already complete" @@ -178,6 +186,20 @@ let respond_with_streaming ?(flush_headers_immediately=false) t response = failwith "httpaf.Reqd.respond_with_streaming: invalid state, currently handling error"; unsafe_respond_with_streaming ~flush_headers_immediately t response +let respond_with_upgrade ?reason t headers = + match t.response_state with + | Waiting -> + let response = Response.create ?reason ~headers `Switching_protocols in + t.response_state <- Upgrade response; + Body.close_reader t.request_body; + Writer.write_response t.writer response; + Writer.wakeup t.writer; + | Streaming _ -> + failwith "httpaf.Reqd.respond_with_upgrade: response already started" + | Upgrade _ + | Fixed _ -> + failwith "httpaf.Reqd.respond_with_upgrade: response already complete" + let report_error t error = t.persistent <- false; Body.close_reader t.request_body; @@ -201,7 +223,7 @@ let report_error t error = | Streaming (_response, response_body), `Exn _ -> Body.close_writer response_body; Writer.close_and_drain t.writer - | (Fixed _ | Streaming _ | Waiting) , _ -> + | (Fixed _ | Streaming _ | Waiting | Upgrade _) , _ -> (* XXX(seliopou): Once additional logging support is added, log the error * in case it is not spurious. *) () @@ -226,13 +248,17 @@ let persistent_connection t = t.persistent let input_state t : Input_state.t = - if Body.is_closed t.request_body - then Complete - else Ready + match t.response_state with + | Upgrade _ -> Upgraded + | Waiting | Fixed _ | Streaming _ -> + if Body.is_closed t.request_body + then Complete + else Ready ;; let output_state t : Output_state.t = match t.response_state with + | Upgrade _ -> Upgraded | Fixed _ -> Complete | Streaming (_, response_body) -> if Body.has_pending_output response_body diff --git a/lib/serialize.ml b/lib/serialize.ml index 61c22131..76039b9d 100644 --- a/lib/serialize.ml +++ b/lib/serialize.ml @@ -195,4 +195,6 @@ module Writer = struct | `Close -> `Close (drained_bytes t) | `Yield -> `Yield | `Writev iovecs -> `Write iovecs + + let has_pending_output t = Faraday.has_pending_output t.encoder end diff --git a/lib/server_connection.ml b/lib/server_connection.ml index c4b3680e..0501791f 100644 --- a/lib/server_connection.ml +++ b/lib/server_connection.ml @@ -199,6 +199,7 @@ let rec _next_read_operation t = match Reqd.input_state reqd with | Ready -> Reader.next t.reader | Complete -> _final_read_operation_for t reqd + | Upgraded -> `Upgrade ) and _final_read_operation_for t reqd = @@ -220,6 +221,7 @@ and _final_read_operation_for t reqd = if Reader.is_closed t.reader then Reader.next t.reader else `Yield + | Upgraded -> `Upgrade | Complete -> advance_request_queue t; _next_read_operation t; @@ -230,7 +232,7 @@ let next_read_operation t = match _next_read_operation t with | `Error (`Parse _) -> set_error_and_handle t `Bad_request; `Close | `Error (`Bad_request request) -> set_error_and_handle ~request t `Bad_request; `Close - | (`Read | `Yield | `Close) as operation -> operation + | (`Read | `Yield | `Close | `Upgrade) as operation -> operation let rec read_with_more t bs ~off ~len more = let call_handler = Queue.is_empty t.request_queue in @@ -274,6 +276,12 @@ let rec _next_write_operation t = Reqd.flush_response_body reqd; Writer.next t.writer | Complete -> _final_write_operation_for t reqd + | Upgraded -> + if Writer.has_pending_output t.writer then + (* Even in the Upgrade case, we're still responsible for writing the response + header, so we might have work to do. *) + Writer.next t.writer + else _final_write_operation_for t reqd ) and _final_write_operation_for t reqd = @@ -284,6 +292,7 @@ and _final_write_operation_for t reqd = ) else ( match Reqd.input_state reqd with | Ready -> Writer.next t.writer; + | Upgraded -> `Upgrade | Complete -> advance_request_queue t; _next_write_operation t; From e6cda20516edab4efb064a178d5533ed180038c5 Mon Sep 17 00:00:00 2001 From: David House Date: Mon, 26 Apr 2021 12:49:40 +0100 Subject: [PATCH 12/30] Implement HTTP upgrades. --- async/httpaf_async.ml | 19 +++++++++++++++++- async/httpaf_async.mli | 1 + lib_test/helpers.ml | 8 +++++--- lib_test/test_client_connection.ml | 8 ++++---- lib_test/test_server_connection.ml | 31 ++++++++++++++++++++++++++---- lwt-unix/httpaf_lwt_unix.ml | 25 +++++++++++++++++++++++- lwt-unix/httpaf_lwt_unix.mli | 1 + 7 files changed, 80 insertions(+), 13 deletions(-) diff --git a/async/httpaf_async.ml b/async/httpaf_async.ml index accd3eec..20a7908f 100644 --- a/async/httpaf_async.ml +++ b/async/httpaf_async.ml @@ -90,7 +90,8 @@ let read fd buffer = open Httpaf module Server = struct - let create_connection_handler ?(config=Config.default) ~request_handler ~error_handler = + let create_connection_handler + ?(config=Config.default) ~request_handler ~upgrade_handler ~error_handler = fun client_addr socket -> let fd = Socket.fd socket in let writev = Faraday_async.writev_of_fd fd in @@ -119,6 +120,14 @@ module Server = struct | `Yield -> (* Log.Global.printf "read_yield(%d)%!" (Fd.to_int_exn fd); *) Server_connection.yield_reader conn reader_thread + | `Upgrade -> + (match upgrade_handler with + | None -> failwith "HTTP upgrades not supported" + | Some upgrade_handler -> + upon (upgrade_handler client_addr) (fun () -> + Ivar.fill read_complete (); + if not (Fd.is_closed fd) + then Socket.shutdown socket `Receive)) | `Close -> (* Log.Global.printf "read_close(%d)%!" (Fd.to_int_exn fd); *) Ivar.fill read_complete (); @@ -136,6 +145,14 @@ module Server = struct | `Yield -> (* Log.Global.printf "write_yield(%d)%!" (Fd.to_int_exn fd); *) Server_connection.yield_writer conn writer_thread; + | `Upgrade -> + (match upgrade_handler with + | None -> failwith "HTTP upgrades not supported" + | Some upgrade_handler -> + upon (upgrade_handler client_addr) (fun () -> + Ivar.fill write_complete (); + if not (Fd.is_closed fd) + then Socket.shutdown socket `Send)) | `Close _ -> (* Log.Global.printf "write_close(%d)%!" (Fd.to_int_exn fd); *) Ivar.fill write_complete (); diff --git a/async/httpaf_async.mli b/async/httpaf_async.mli index f120624d..27b2b98e 100644 --- a/async/httpaf_async.mli +++ b/async/httpaf_async.mli @@ -7,6 +7,7 @@ module Server : sig val create_connection_handler : ?config : Config.t -> request_handler : ('a -> Server_connection.request_handler) + -> upgrade_handler : ('a -> unit Deferred.t) option -> error_handler : ('a -> Server_connection.error_handler) -> ([< Socket.Address.t] as 'a) -> ([`Active], 'a) Socket.t diff --git a/lib_test/helpers.ml b/lib_test/helpers.ml index 9ce0be45..29f8dc2f 100644 --- a/lib_test/helpers.ml +++ b/lib_test/helpers.ml @@ -18,7 +18,7 @@ let response_to_string ?body r = Faraday.serialize_to_string f module Read_operation = struct - type t = [ `Read | `Yield | `Close ] + type t = [ `Read | `Yield | `Close | `Upgrade ] let pp_hum fmt (t : t) = let str = @@ -26,13 +26,14 @@ module Read_operation = struct | `Read -> "Read" | `Yield -> "Yield" | `Close -> "Close" + | `Upgrade -> "Upgrade" in Format.pp_print_string fmt str ;; end module Write_operation = struct - type t = [ `Write of Bigstringaf.t IOVec.t list | `Yield | `Close of int ] + type t = [ `Write of Bigstringaf.t IOVec.t list | `Yield | `Close of int | `Upgrade ] let iovecs_to_string iovecs = let len = IOVec.lengthv iovecs in @@ -50,12 +51,13 @@ module Write_operation = struct | `Write iovecs -> Format.fprintf fmt "Write %S" (iovecs_to_string iovecs) | `Yield -> Format.pp_print_string fmt "Yield" | `Close len -> Format.fprintf fmt "Close %i" len + | `Upgrade -> Format.pp_print_string fmt "Upgrade" ;; let to_write_as_string t = match t with | `Write iovecs -> Some (iovecs_to_string iovecs) - | `Close _ | `Yield -> None + | `Close _ | `Yield | `Upgrade -> None ;; end diff --git a/lib_test/test_client_connection.ml b/lib_test/test_client_connection.ml index 838c2651..ae0ad927 100644 --- a/lib_test/test_client_connection.ml +++ b/lib_test/test_client_connection.ml @@ -41,7 +41,7 @@ let read_response t r = let reader_ready t = Alcotest.check read_operation "Reader is ready" - `Read (next_read_operation t :> [`Close | `Read | `Yield]); + `Read (next_read_operation t :> [`Close | `Read | `Yield | `Upgrade]); ;; let write_string ?(msg="output written") t str = @@ -59,17 +59,17 @@ let write_request ?(msg="request written") t r = let writer_yielded t = Alcotest.check write_operation "Writer is in a yield state" - `Yield (next_write_operation t); + `Yield (next_write_operation t :> Write_operation.t); ;; let writer_closed t = Alcotest.check write_operation "Writer is closed" - (`Close 0) (next_write_operation t); + (`Close 0) (next_write_operation t :> Write_operation.t); ;; let connection_is_shutdown t = Alcotest.check read_operation "Reader is closed" - `Close (next_read_operation t :> [`Close | `Read | `Yield]); + `Close (next_read_operation t :> [`Close | `Read | `Yield | `Upgrade]); writer_closed t; ;; diff --git a/lib_test/test_server_connection.ml b/lib_test/test_server_connection.ml index 056f85dd..005cc440 100644 --- a/lib_test/test_server_connection.ml +++ b/lib_test/test_server_connection.ml @@ -72,6 +72,9 @@ end = struct | `Close -> trace "reader: Close"; t.read_operation <- `Close + | `Upgrade -> + trace "reader: Upgrade"; + t.read_operation <- `Upgrade ;; let rec write_step t = @@ -91,6 +94,9 @@ end = struct | `Close n -> trace "writer: Close"; t.write_operation <- `Close n + | `Upgrade -> + trace "writer: Upgrade"; + t.write_operation <- `Upgrade ;; let create ?config ?error_handler request_handler = @@ -114,13 +120,13 @@ end = struct let current_read_operation t = match t.read_operation with | `Initial -> assert false - | `Read | `Yield | `Close as op -> op + | `Read | `Yield | `Close | `Upgrade as op -> op ;; let current_write_operation t = match t.write_operation with | `Initial -> assert false - | `Write _ | `Yield | `Close _ as op -> op + | `Write _ | `Yield | `Close _ | `Upgrade as op -> op ;; let do_read t f = @@ -129,7 +135,7 @@ end = struct let res = f t.server_connection in t.read_loop (); res - | `Yield | `Close as op -> + | `Yield | `Close | `Upgrade as op -> Alcotest.failf "Read attempted during operation: %a" Read_operation.pp_hum op ;; @@ -140,7 +146,7 @@ end = struct let res = f t.server_connection bufs in t.write_loop (); res - | `Yield | `Close _ as op -> + | `Yield | `Close _ | `Upgrade as op -> Alcotest.failf "Write attempted during operation: %a" Write_operation.pp_hum op ;; @@ -943,6 +949,22 @@ let test_shutdown_during_asynchronous_request () = writer_closed t ;; +let test_upgrade () = + let upgrade_headers =["Connection", "Upgrade" ; "Upgrade", "foo"] in + let request_handler reqd = + Reqd.respond_with_upgrade reqd + (Headers.of_list upgrade_headers) + in + let t = create request_handler in + read_request t + (Request.create `GET "/" + ~headers:(Headers.of_list (("Content-Length", "0") :: upgrade_headers))); + Alcotest.check read_operation "Reader is `Upgrade" `Upgrade (current_read_operation t); + write_response t + (Response.create `Switching_protocols ~headers:(Headers.of_list upgrade_headers)); + Alcotest.check write_operation "Writer is `Upgrade" `Upgrade (current_write_operation t); +;; + let tests = [ "initial reader state" , `Quick, test_initial_reader_state ; "shutdown reader closed", `Quick, test_reader_is_closed_after_eof @@ -974,4 +996,5 @@ let tests = ; "response finished before body read", `Quick, test_response_finished_before_body_read ; "shutdown in request handler", `Quick, test_shutdown_in_request_handler ; "shutdown during asynchronous request", `Quick, test_shutdown_during_asynchronous_request + ; "test upgrades", `Quick, test_upgrade ] diff --git a/lwt-unix/httpaf_lwt_unix.ml b/lwt-unix/httpaf_lwt_unix.ml index 545fbc6a..bdd71178 100644 --- a/lwt-unix/httpaf_lwt_unix.ml +++ b/lwt-unix/httpaf_lwt_unix.ml @@ -106,7 +106,8 @@ let shutdown socket command = module Config = Httpaf.Config module Server = struct - let create_connection_handler ?(config=Config.default) ~request_handler ~error_handler = + let create_connection_handler + ?(config=Config.default) ~request_handler ~upgrade_handler ~error_handler = fun client_addr socket -> let module Server_connection = Httpaf.Server_connection in let connection = @@ -140,6 +141,17 @@ module Server = struct Server_connection.yield_reader connection read_loop; Lwt.return_unit + | `Upgrade -> + (match upgrade_handler with + | None -> failwith "HTTP upgrades not supported" + | Some upgrade_handler -> + upgrade_handler client_addr >>= fun () -> + Lwt.wakeup_later notify_read_loop_exited (); + if not (Lwt_unix.state socket = Lwt_unix.Closed) then begin + shutdown socket Unix.SHUTDOWN_RECEIVE + end; + Lwt.return_unit) + | `Close -> Lwt.wakeup_later notify_read_loop_exited (); if not (Lwt_unix.state socket = Lwt_unix.Closed) then begin @@ -172,6 +184,17 @@ module Server = struct Server_connection.yield_writer connection write_loop; Lwt.return_unit + | `Upgrade -> + (match upgrade_handler with + | None -> failwith "HTTP upgrades not supported" + | Some upgrade_handler -> + upgrade_handler client_addr >>= fun () -> + Lwt.wakeup_later notify_write_loop_exited (); + if not (Lwt_unix.state socket = Lwt_unix.Closed) then begin + shutdown socket Unix.SHUTDOWN_SEND + end; + Lwt.return_unit) + | `Close _ -> Lwt.wakeup_later notify_write_loop_exited (); if not (Lwt_unix.state socket = Lwt_unix.Closed) then begin diff --git a/lwt-unix/httpaf_lwt_unix.mli b/lwt-unix/httpaf_lwt_unix.mli index 6625002c..4f8e3387 100644 --- a/lwt-unix/httpaf_lwt_unix.mli +++ b/lwt-unix/httpaf_lwt_unix.mli @@ -42,6 +42,7 @@ module Server : sig val create_connection_handler : ?config : Config.t -> request_handler : (Unix.sockaddr -> Server_connection.request_handler) + -> upgrade_handler : (Unix.sockaddr -> unit Lwt.t) option -> error_handler : (Unix.sockaddr -> Server_connection.error_handler) -> Unix.sockaddr -> Lwt_unix.file_descr From 9ccf77b724716ec3eab51cf55c167411ac78081d Mon Sep 17 00:00:00 2001 From: David House Date: Thu, 29 Apr 2021 12:04:23 +0100 Subject: [PATCH 13/30] Fix requests where the client requests an upgrade but the server declines. In these cases, the correct behaviour is to send the response back to the client and then close the connection. The client may have sent us bytes after the request header which are not HTTP, so we have no hope of recovering this connection. This is achieved by modifying the parser to stop if it detects that the client has requested an upgrade. This ensures that, regardless of whether or not [Reqd.input_state] is [Upgrade], the parser will not attempt to consume bytes that might not be HTTP. --- lib/message.ml | 2 +- lib/parse.ml | 47 ++++++++++++++++++------------ lib/request.ml | 5 ++++ lib/server_connection.ml | 2 +- lib_test/test_server_connection.ml | 18 +++++++++++- 5 files changed, 52 insertions(+), 22 deletions(-) diff --git a/lib/message.ml b/lib/message.ml index c24ddbe3..acf8b98e 100644 --- a/lib/message.ml +++ b/lib/message.ml @@ -41,7 +41,7 @@ let persistent_connection ?(proxy=false) version headers = (* XXX(seliopou): use proxy argument in the case of HTTP/1.0 as per https://tools.ietf.org/html/rfc7230#section-6.3 *) match Headers.get headers "connection" with - | Some "close" -> false + | Some ("close" | "upgrade") -> false | Some "keep-alive" -> Version.(compare version v1_0) >= 0 | _ -> Version.(compare version v1_1) >= 0 diff --git a/lib/parse.ml b/lib/parse.ml index 664e3c3b..925e5f02 100644 --- a/lib/parse.ml +++ b/lib/parse.ml @@ -212,13 +212,22 @@ module Reader = struct | `Invalid_response_body_length of Response.t | `Parse of string list * string ] + type parser_result = + | Can_continue + | Stop + type 'error parse_state = - | Done + | Done of parser_result | Fail of 'error - | Partial of (Bigstringaf.t -> off:int -> len:int -> AU.more -> (unit, 'error) result AU.state) + | Partial of + (Bigstringaf.t + -> off:int + -> len:int + -> AU.more + -> (parser_result, 'error) result AU.state) type 'error t = - { parser : (unit, 'error) result Angstrom.t + { parser : (parser_result, 'error) result Angstrom.t ; mutable parse_state : 'error parse_state (* The state of the parse for the current request *) ; mutable closed : bool @@ -231,24 +240,25 @@ module Reader = struct let create parser = { parser - ; parse_state = Done + ; parse_state = Done Can_continue ; closed = false } - let ok = return (Ok ()) - let request handler = let parser = request <* commit >>= fun request -> match Request.body_length request with - | `Error `Bad_request -> return (Error (`Bad_request request)) + | `Error `Bad_request -> + return (Error (`Bad_request request)) | `Fixed 0L -> handler request Body.empty; - ok + (* If the client has requested an upgrade, then any bytes after the headers are + likely not HTTP, so we should be careful not to try to parse them. *) + return (Ok (if Request.is_upgrade request then Stop else Can_continue)) | `Fixed _ | `Chunked as encoding -> let request_body = Body.create_reader Bigstringaf.empty in handler request request_body; - body ~encoding request_body *> ok + body ~encoding request_body *> return (Ok Can_continue) in create parser @@ -261,13 +271,13 @@ module Reader = struct | `Error `Internal_server_error -> return (Error (`Invalid_response_body_length response)) | `Fixed 0L -> handler response Body.empty; - ok + return (Ok Can_continue) | `Fixed _ | `Chunked | `Close_delimited as encoding -> (* We do not trust the length provided in the [`Fixed] case, as the client could DOS easily. *) let response_body = Body.create_reader Bigstringaf.empty in handler response response_body; - body ~encoding response_body *> ok + body ~encoding response_body *> return (Ok Can_continue) in create parser ;; @@ -278,8 +288,8 @@ module Reader = struct let transition t state = match state with - | AU.Done(consumed, Ok ()) -> - t.parse_state <- Done; + | AU.Done(consumed, Ok result) -> + t.parse_state <- Done result; consumed | AU.Done(consumed, Error error) -> t.parse_state <- Fail error; @@ -303,8 +313,8 @@ module Reader = struct let rec read_with_more t bs ~off ~len more = let consumed = match t.parse_state with - | Fail _ -> 0 - | Done -> + | Fail _ | Done Stop -> 0 + | Done Can_continue -> start t (AU.parse t.parser); read_with_more t bs ~off ~len more; | Partial continue -> @@ -324,11 +334,10 @@ module Reader = struct let next t = if t.closed then `Close - else ( + else match t.parse_state with | Fail err -> `Error err - | Done -> `Read - | Partial _ -> `Read - ) + | Done Stop -> `Close + | Done Can_continue | Partial _ -> `Read ;; end diff --git a/lib/request.ml b/lib/request.ml index 158fd4c2..204a2000 100644 --- a/lib/request.ml +++ b/lib/request.ml @@ -80,3 +80,8 @@ let persistent_connection ?proxy { version; headers; _ } = let pp_hum fmt { meth; target; version; headers } = Format.fprintf fmt "((method \"%a\") (target %S) (version \"%a\") (headers %a))" Method.pp_hum meth target Version.pp_hum version Headers.pp_hum headers + +let is_upgrade t = + match Headers.get t.headers "Connection" with + | None -> false + | Some header_val -> Headers.ci_equal header_val "upgrade" diff --git a/lib/server_connection.ml b/lib/server_connection.ml index 0501791f..6baffe3e 100644 --- a/lib/server_connection.ml +++ b/lib/server_connection.ml @@ -281,7 +281,7 @@ let rec _next_write_operation t = (* Even in the Upgrade case, we're still responsible for writing the response header, so we might have work to do. *) Writer.next t.writer - else _final_write_operation_for t reqd + else `Upgrade ) and _final_write_operation_for t reqd = diff --git a/lib_test/test_server_connection.ml b/lib_test/test_server_connection.ml index 005cc440..33064c26 100644 --- a/lib_test/test_server_connection.ml +++ b/lib_test/test_server_connection.ml @@ -950,7 +950,7 @@ let test_shutdown_during_asynchronous_request () = ;; let test_upgrade () = - let upgrade_headers =["Connection", "Upgrade" ; "Upgrade", "foo"] in + let upgrade_headers =["Connection", "upgrade" ; "Upgrade", "foo"] in let request_handler reqd = Reqd.respond_with_upgrade reqd (Headers.of_list upgrade_headers) @@ -965,6 +965,21 @@ let test_upgrade () = Alcotest.check write_operation "Writer is `Upgrade" `Upgrade (current_write_operation t); ;; +let test_upgrade_where_server_does_not_upgrade () = + let upgrade_headers =["Connection", "upgrade" ; "Upgrade", "foo"] in + let response = Response.create `Bad_request ~headers:(Headers.of_list upgrade_headers) in + let request_handler reqd = + Reqd.respond_with_string reqd response "" + in + let t = create request_handler in + read_request t + (Request.create `GET "/" + ~headers:(Headers.of_list (("Content-Length", "0") :: upgrade_headers))); + Alcotest.check read_operation "Reader is `Close" `Close (current_read_operation t); + write_response t response; + Alcotest.check write_operation "Writer is `Close" (`Close 0) (current_write_operation t); +;; + let tests = [ "initial reader state" , `Quick, test_initial_reader_state ; "shutdown reader closed", `Quick, test_reader_is_closed_after_eof @@ -997,4 +1012,5 @@ let tests = ; "shutdown in request handler", `Quick, test_shutdown_in_request_handler ; "shutdown during asynchronous request", `Quick, test_shutdown_during_asynchronous_request ; "test upgrades", `Quick, test_upgrade + ; "test upgrade where server does not upgrade", `Quick, test_upgrade_where_server_does_not_upgrade ] From 40ae33c125599126e66cc8626291e879025fb7d9 Mon Sep 17 00:00:00 2001 From: David House Date: Thu, 29 Apr 2021 12:36:26 +0100 Subject: [PATCH 14/30] refactor --- lib/server_connection.ml | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/lib/server_connection.ml b/lib/server_connection.ml index 6baffe3e..64f4ea80 100644 --- a/lib/server_connection.ml +++ b/lib/server_connection.ml @@ -275,18 +275,20 @@ let rec _next_write_operation t = | Ready -> Reqd.flush_response_body reqd; Writer.next t.writer - | Complete -> _final_write_operation_for t reqd - | Upgraded -> + | Complete -> _final_write_operation_for t reqd ~upgrade:false + | Upgraded -> _final_write_operation_for t reqd ~upgrade:true + ) + +and _final_write_operation_for t reqd ~upgrade = + let next = + if upgrade then ( if Writer.has_pending_output t.writer then (* Even in the Upgrade case, we're still responsible for writing the response header, so we might have work to do. *) Writer.next t.writer - else `Upgrade - ) - -and _final_write_operation_for t reqd = - let next = - if not (Reqd.persistent_connection reqd) then ( + else + `Upgrade + ) else if not (Reqd.persistent_connection reqd) then ( shutdown_writer t; Writer.next t.writer; ) else ( From 5fcb69a5c57fc369b3d92cd65effe6f1f0856b27 Mon Sep 17 00:00:00 2001 From: David House Date: Thu, 29 Apr 2021 14:55:21 +0100 Subject: [PATCH 15/30] More fixes for when the upgrade request is declined --- lib/reqd.ml | 31 ++++++++++++++++++++++-------- lib/server_connection.ml | 2 ++ lib_test/test_server_connection.ml | 18 ++++++++++++----- 3 files changed, 38 insertions(+), 13 deletions(-) diff --git a/lib/reqd.ml b/lib/reqd.ml index b8d915b3..40c989aa 100644 --- a/lib/reqd.ml +++ b/lib/reqd.ml @@ -44,6 +44,7 @@ end module Input_state = struct type t = + | Waiting | Ready | Complete | Upgraded @@ -189,11 +190,14 @@ let respond_with_streaming ?(flush_headers_immediately=false) t response = let respond_with_upgrade ?reason t headers = match t.response_state with | Waiting -> - let response = Response.create ?reason ~headers `Switching_protocols in - t.response_state <- Upgrade response; - Body.close_reader t.request_body; - Writer.write_response t.writer response; - Writer.wakeup t.writer; + if not (Request.is_upgrade t.request) then + failwith "httpaf.Reqd.respond_with_upgrade: request was not an upgrade request" + else ( + let response = Response.create ?reason ~headers `Switching_protocols in + t.response_state <- Upgrade response; + Body.close_reader t.request_body; + Writer.write_response t.writer response; + Writer.wakeup t.writer); | Streaming _ -> failwith "httpaf.Reqd.respond_with_upgrade: response already started" | Upgrade _ @@ -248,12 +252,23 @@ let persistent_connection t = t.persistent let input_state t : Input_state.t = - match t.response_state with - | Upgrade _ -> Upgraded - | Waiting | Fixed _ | Streaming _ -> + let upgrade_status = + match Request.is_upgrade t.request with + | false -> `Not_upgrading + | true -> + match t.response_state with + | Upgrade _ -> `Finished_upgrading + | Fixed _ | Streaming _ -> `Upgrade_declined + | Waiting -> `Upgrade_in_progress + in + match upgrade_status with + | `Finished_upgrading -> Upgraded + | `Not_upgrading | `Upgrade_declined -> if Body.is_closed t.request_body then Complete else Ready + | `Upgrade_in_progress -> + Waiting ;; let output_state t : Output_state.t = diff --git a/lib/server_connection.ml b/lib/server_connection.ml index 64f4ea80..53b308ea 100644 --- a/lib/server_connection.ml +++ b/lib/server_connection.ml @@ -197,6 +197,7 @@ let rec _next_read_operation t = ) else ( let reqd = current_reqd_exn t in match Reqd.input_state reqd with + | Waiting -> `Yield | Ready -> Reader.next t.reader | Complete -> _final_read_operation_for t reqd | Upgraded -> `Upgrade @@ -293,6 +294,7 @@ and _final_write_operation_for t reqd ~upgrade = Writer.next t.writer; ) else ( match Reqd.input_state reqd with + | Waiting -> `Yield | Ready -> Writer.next t.writer; | Upgraded -> `Upgrade | Complete -> diff --git a/lib_test/test_server_connection.ml b/lib_test/test_server_connection.ml index 33064c26..cbe969f4 100644 --- a/lib_test/test_server_connection.ml +++ b/lib_test/test_server_connection.ml @@ -967,16 +967,24 @@ let test_upgrade () = let test_upgrade_where_server_does_not_upgrade () = let upgrade_headers =["Connection", "upgrade" ; "Upgrade", "foo"] in - let response = Response.create `Bad_request ~headers:(Headers.of_list upgrade_headers) in - let request_handler reqd = - Reqd.respond_with_string reqd response "" - in + let reqd = ref None in + let request_handler reqd' = reqd := Some reqd' in let t = create request_handler in read_request t (Request.create `GET "/" ~headers:(Headers.of_list (("Content-Length", "0") :: upgrade_headers))); - Alcotest.check read_operation "Reader is `Close" `Close (current_read_operation t); + (* At this point, we don't know if the response handler will call respond_with_upgrade + or not. So we pause the reader until that is determined. *) + Alcotest.check read_operation "Reader is `Yield during upgrade negotiation" + `Yield (current_read_operation t); + + (* Now pretend the user doesn't want to do the upgrade and make sure we close the + connection *) + let reqd = Option.get !reqd in + let response = Response.create `Bad_request ~headers:(Headers.of_list upgrade_headers) in + Reqd.respond_with_string reqd response ""; write_response t response; + Alcotest.check read_operation "Reader is `Close" `Close (current_read_operation t); Alcotest.check write_operation "Writer is `Close" (`Close 0) (current_write_operation t); ;; From 681d36324fbf477235a43556d03364dcd5d5b041 Mon Sep 17 00:00:00 2001 From: David House Date: Fri, 30 Apr 2021 11:14:56 +0100 Subject: [PATCH 16/30] Unwind changes to parser The previous commit asserted that the correct response to the server declining to upgrade was to close the connection. This isn't right. For instance, if the web server is using the Negotiate authentication scheme (https://tools.ietf.org/html/rfc4559), the workflow looks like this: - The client sends a request - The server responds with Not_authorized and 'WWW-Authenticate: Negotiate' - The client re-sends the same request with a 'Authorization: Negotiate ' header. So, if someone wants to do websockets on a kerberos-authenticated server (for instance), then we need to allow requests after a failed upgrade. I guess if the client sends an upgrade request and immediately starts speaking the new protocol without waiting for the response, we'll get a parse error, but there isn't much we can do. This commit achieves this by unwinding the changes to the parser. The declined-upgrade test is flipped round so that it proves subsequent requests *are* possible, rather than *are not* possible. --- lib/message.ml | 2 +- lib/parse.ml | 47 ++++++++++++------------------ lib_test/test_server_connection.ml | 17 +++++++---- 3 files changed, 31 insertions(+), 35 deletions(-) diff --git a/lib/message.ml b/lib/message.ml index acf8b98e..c24ddbe3 100644 --- a/lib/message.ml +++ b/lib/message.ml @@ -41,7 +41,7 @@ let persistent_connection ?(proxy=false) version headers = (* XXX(seliopou): use proxy argument in the case of HTTP/1.0 as per https://tools.ietf.org/html/rfc7230#section-6.3 *) match Headers.get headers "connection" with - | Some ("close" | "upgrade") -> false + | Some "close" -> false | Some "keep-alive" -> Version.(compare version v1_0) >= 0 | _ -> Version.(compare version v1_1) >= 0 diff --git a/lib/parse.ml b/lib/parse.ml index 925e5f02..664e3c3b 100644 --- a/lib/parse.ml +++ b/lib/parse.ml @@ -212,22 +212,13 @@ module Reader = struct | `Invalid_response_body_length of Response.t | `Parse of string list * string ] - type parser_result = - | Can_continue - | Stop - type 'error parse_state = - | Done of parser_result + | Done | Fail of 'error - | Partial of - (Bigstringaf.t - -> off:int - -> len:int - -> AU.more - -> (parser_result, 'error) result AU.state) + | Partial of (Bigstringaf.t -> off:int -> len:int -> AU.more -> (unit, 'error) result AU.state) type 'error t = - { parser : (parser_result, 'error) result Angstrom.t + { parser : (unit, 'error) result Angstrom.t ; mutable parse_state : 'error parse_state (* The state of the parse for the current request *) ; mutable closed : bool @@ -240,25 +231,24 @@ module Reader = struct let create parser = { parser - ; parse_state = Done Can_continue + ; parse_state = Done ; closed = false } + let ok = return (Ok ()) + let request handler = let parser = request <* commit >>= fun request -> match Request.body_length request with - | `Error `Bad_request -> - return (Error (`Bad_request request)) + | `Error `Bad_request -> return (Error (`Bad_request request)) | `Fixed 0L -> handler request Body.empty; - (* If the client has requested an upgrade, then any bytes after the headers are - likely not HTTP, so we should be careful not to try to parse them. *) - return (Ok (if Request.is_upgrade request then Stop else Can_continue)) + ok | `Fixed _ | `Chunked as encoding -> let request_body = Body.create_reader Bigstringaf.empty in handler request request_body; - body ~encoding request_body *> return (Ok Can_continue) + body ~encoding request_body *> ok in create parser @@ -271,13 +261,13 @@ module Reader = struct | `Error `Internal_server_error -> return (Error (`Invalid_response_body_length response)) | `Fixed 0L -> handler response Body.empty; - return (Ok Can_continue) + ok | `Fixed _ | `Chunked | `Close_delimited as encoding -> (* We do not trust the length provided in the [`Fixed] case, as the client could DOS easily. *) let response_body = Body.create_reader Bigstringaf.empty in handler response response_body; - body ~encoding response_body *> return (Ok Can_continue) + body ~encoding response_body *> ok in create parser ;; @@ -288,8 +278,8 @@ module Reader = struct let transition t state = match state with - | AU.Done(consumed, Ok result) -> - t.parse_state <- Done result; + | AU.Done(consumed, Ok ()) -> + t.parse_state <- Done; consumed | AU.Done(consumed, Error error) -> t.parse_state <- Fail error; @@ -313,8 +303,8 @@ module Reader = struct let rec read_with_more t bs ~off ~len more = let consumed = match t.parse_state with - | Fail _ | Done Stop -> 0 - | Done Can_continue -> + | Fail _ -> 0 + | Done -> start t (AU.parse t.parser); read_with_more t bs ~off ~len more; | Partial continue -> @@ -334,10 +324,11 @@ module Reader = struct let next t = if t.closed then `Close - else + else ( match t.parse_state with | Fail err -> `Error err - | Done Stop -> `Close - | Done Can_continue | Partial _ -> `Read + | Done -> `Read + | Partial _ -> `Read + ) ;; end diff --git a/lib_test/test_server_connection.ml b/lib_test/test_server_connection.ml index cbe969f4..4cd6c085 100644 --- a/lib_test/test_server_connection.ml +++ b/lib_test/test_server_connection.ml @@ -967,8 +967,8 @@ let test_upgrade () = let test_upgrade_where_server_does_not_upgrade () = let upgrade_headers =["Connection", "upgrade" ; "Upgrade", "foo"] in - let reqd = ref None in - let request_handler reqd' = reqd := Some reqd' in + let reqd_ref = ref None in + let request_handler reqd = reqd_ref := Some reqd in let t = create request_handler in read_request t (Request.create `GET "/" @@ -980,12 +980,17 @@ let test_upgrade_where_server_does_not_upgrade () = (* Now pretend the user doesn't want to do the upgrade and make sure we close the connection *) - let reqd = Option.get !reqd in - let response = Response.create `Bad_request ~headers:(Headers.of_list upgrade_headers) in + let reqd = Option.get !reqd_ref in + let response = Response.create `Bad_request ~headers:(Headers.encoding_fixed 0) in + Reqd.respond_with_string reqd response ""; + write_response t response; + + (* The connection is left healthy and can be used for more requests *) + read_request t (Request.create `GET "/" ~headers:(Headers.encoding_fixed 0)); + let reqd = Option.get !reqd_ref in + let response = Response.create `OK ~headers:(Headers.encoding_fixed 0) in Reqd.respond_with_string reqd response ""; write_response t response; - Alcotest.check read_operation "Reader is `Close" `Close (current_read_operation t); - Alcotest.check write_operation "Writer is `Close" (`Close 0) (current_write_operation t); ;; let tests = From 90de89e594e1171059445a982bdec27b3cf389ce Mon Sep 17 00:00:00 2001 From: David House Date: Mon, 24 May 2021 12:16:16 +0100 Subject: [PATCH 17/30] Fix compilation of examples --- examples/async/async_echo_post.ml | 5 ++++- examples/lwt/lwt_echo_post.ml | 5 ++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/examples/async/async_echo_post.ml b/examples/async/async_echo_post.ml index 0f524a2a..b0df8db5 100644 --- a/examples/async/async_echo_post.ml +++ b/examples/async/async_echo_post.ml @@ -10,7 +10,10 @@ let main port max_accepts_per_batch () = let where_to_listen = Tcp.Where_to_listen.of_port port in Tcp.(Server.create_sock ~on_handler_error:`Raise ~backlog:10_000 ~max_connections:10_000 ~max_accepts_per_batch where_to_listen) - (Server.create_connection_handler ~request_handler ~error_handler) + (Server.create_connection_handler + ~request_handler + ~error_handler + ~upgrade_handler:None) >>= fun _server -> Stdio.printf "Listening on port %i and echoing POST requests.\n" port; Stdio.printf "To send a POST request, try one of the following\n\n"; diff --git a/examples/lwt/lwt_echo_post.ml b/examples/lwt/lwt_echo_post.ml index 18307107..42d95b9f 100644 --- a/examples/lwt/lwt_echo_post.ml +++ b/examples/lwt/lwt_echo_post.ml @@ -12,7 +12,10 @@ let main port = Lwt.async (fun () -> Lwt_io.establish_server_with_client_socket listen_address - (Server.create_connection_handler ~request_handler ~error_handler) + (Server.create_connection_handler + ~request_handler + ~error_handler + ~upgrade_handler:None) >|= fun _server -> Stdio.printf "Listening on port %i and echoing POST requests.\n" port; Stdio.printf "To send a POST request, try one of the following\n\n"; From 203f06c3e94e151f17341821e6699e1b7a35ce84 Mon Sep 17 00:00:00 2001 From: Doug Patti Date: Wed, 2 Jun 2021 20:26:45 -0400 Subject: [PATCH 18/30] update reader coercion in client tests --- lib_test/test_client_connection.ml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib_test/test_client_connection.ml b/lib_test/test_client_connection.ml index ae0ad927..70564cd2 100644 --- a/lib_test/test_client_connection.ml +++ b/lib_test/test_client_connection.ml @@ -41,7 +41,7 @@ let read_response t r = let reader_ready t = Alcotest.check read_operation "Reader is ready" - `Read (next_read_operation t :> [`Close | `Read | `Yield | `Upgrade]); + `Read (next_read_operation t :> Read_operation.t); ;; let write_string ?(msg="output written") t str = @@ -69,7 +69,7 @@ let writer_closed t = let connection_is_shutdown t = Alcotest.check read_operation "Reader is closed" - `Close (next_read_operation t :> [`Close | `Read | `Yield | `Upgrade]); + `Close (next_read_operation t :> Read_operation.t); writer_closed t; ;; From cee330cd1b15e01a9f13498ec2406fd820bf6c6d Mon Sep 17 00:00:00 2001 From: Doug Patti Date: Sat, 5 Jun 2021 15:44:42 -0400 Subject: [PATCH 19/30] http-upgrades: require no request body --- lib/parse.ml | 2 ++ lib/reqd.ml | 6 ++++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/lib/parse.ml b/lib/parse.ml index 664e3c3b..a11bc517 100644 --- a/lib/parse.ml +++ b/lib/parse.ml @@ -245,6 +245,8 @@ module Reader = struct | `Fixed 0L -> handler request Body.empty; ok + | `Fixed _ | `Chunked when Request.is_upgrade request -> + return (Error (`Bad_request request)) | `Fixed _ | `Chunked as encoding -> let request_body = Body.create_reader Bigstringaf.empty in handler request request_body; diff --git a/lib/reqd.ml b/lib/reqd.ml index 40c989aa..275597e5 100644 --- a/lib/reqd.ml +++ b/lib/reqd.ml @@ -37,7 +37,7 @@ type error = module Response_state = struct type t = | Waiting - | Upgrade of Response.t + | Upgrade of Response.t | Fixed of Response.t | Streaming of Response.t * [`write] Body.t end @@ -195,7 +195,9 @@ let respond_with_upgrade ?reason t headers = else ( let response = Response.create ?reason ~headers `Switching_protocols in t.response_state <- Upgrade response; - Body.close_reader t.request_body; + (* The parser ensures it only passes empty bodies in the case of an + upgrade request *) + assert (Body.is_closed t.request_body); Writer.write_response t.writer response; Writer.wakeup t.writer); | Streaming _ -> From 81b4808f4df794eb769a604723e6de1e8842d0e5 Mon Sep 17 00:00:00 2001 From: Doug Patti Date: Sat, 5 Jun 2021 15:45:16 -0400 Subject: [PATCH 20/30] http-upgrades: refactor input_state logic The removed indirection lets you trace responses to their conditions more easily. --- lib/reqd.ml | 18 ++++-------------- 1 file changed, 4 insertions(+), 14 deletions(-) diff --git a/lib/reqd.ml b/lib/reqd.ml index 275597e5..e77a4a25 100644 --- a/lib/reqd.ml +++ b/lib/reqd.ml @@ -254,23 +254,13 @@ let persistent_connection t = t.persistent let input_state t : Input_state.t = - let upgrade_status = - match Request.is_upgrade t.request with - | false -> `Not_upgrading - | true -> - match t.response_state with - | Upgrade _ -> `Finished_upgrading - | Fixed _ | Streaming _ -> `Upgrade_declined - | Waiting -> `Upgrade_in_progress - in - match upgrade_status with - | `Finished_upgrading -> Upgraded - | `Not_upgrading | `Upgrade_declined -> + match t.response_state with + | Upgrade _ -> Upgraded + | Waiting when Request.is_upgrade t.request -> Waiting + | _ -> if Body.is_closed t.request_body then Complete else Ready - | `Upgrade_in_progress -> - Waiting ;; let output_state t : Output_state.t = From 9faf49c412e238b6c01d3d736bc7e02b7c9cd6ac Mon Sep 17 00:00:00 2001 From: Doug Patti Date: Sat, 5 Jun 2021 15:46:27 -0400 Subject: [PATCH 21/30] http-upgrades: add tests and fix read loop issue --- lib/server_connection.ml | 9 ++- lib_test/helpers.ml | 1 + lib_test/test_server_connection.ml | 126 ++++++++++++++++++++++------- 3 files changed, 104 insertions(+), 32 deletions(-) diff --git a/lib/server_connection.ml b/lib/server_connection.ml index 53b308ea..83f8d6e9 100644 --- a/lib/server_connection.ml +++ b/lib/server_connection.ml @@ -196,8 +196,15 @@ let rec _next_read_operation t = Reader.next t.reader ) else ( let reqd = current_reqd_exn t in + (* XXX(dpatti): This fails for the same reason as my comment below in the + final_read_operation section. I played around with some alternatives and + believe I have some more improvements to the request queue mechanism that + removes the need for two hacks. *) match Reqd.input_state reqd with - | Waiting -> `Yield + | Waiting -> + if Reader.is_closed t.reader + then Reader.next t.reader + else `Yield | Ready -> Reader.next t.reader | Complete -> _final_read_operation_for t reqd | Upgraded -> `Upgrade diff --git a/lib_test/helpers.ml b/lib_test/helpers.ml index 29f8dc2f..eb4840af 100644 --- a/lib_test/helpers.ml +++ b/lib_test/helpers.ml @@ -72,4 +72,5 @@ module Headers = struct let connection_close = Headers.of_list ["connection", "close"] let encoding_chunked = Headers.of_list ["transfer-encoding", "chunked"] let encoding_fixed n = Headers.of_list ["content-length", string_of_int n] + let upgrade protocol = Headers.of_list ["connection", "upgrade" ; "upgrade", protocol] end diff --git a/lib_test/test_server_connection.ml b/lib_test/test_server_connection.ml index 4cd6c085..05508ec7 100644 --- a/lib_test/test_server_connection.ml +++ b/lib_test/test_server_connection.ml @@ -212,6 +212,11 @@ let reader_closed t = `Close (current_read_operation t); ;; +let reader_upgraded t = + Alcotest.check read_operation "Reader is upgraded" + `Upgrade (current_read_operation t); +;; + (* Checks that the [len] prefixes of expected and the write match, and returns the rest. *) let write_partial_string ?(msg="output written") t expected len = @@ -264,6 +269,11 @@ let writer_closed ?(unread = 0) t = (`Close unread) (current_write_operation t); ;; +let writer_upgraded t = + Alcotest.check write_operation "Writer is upgraded" + `Upgrade (current_write_operation t); +;; + let connection_is_shutdown t = reader_closed t; writer_closed t; @@ -305,6 +315,16 @@ let streaming_handler ?(flush=false) response writes reqd = write (); ;; +let capture_handler () = + let fail _ = failwith "Captured handler was not invoked" in + let capture = ref fail in + let respond reqd f = + capture := fail; + f reqd + in + capture, (fun reqd -> capture := respond reqd) +;; + let synchronous_raise reqd = Reqd.report_exn reqd (Failure "caught this exception") ;; @@ -950,47 +970,87 @@ let test_shutdown_during_asynchronous_request () = ;; let test_upgrade () = - let upgrade_headers =["Connection", "upgrade" ; "Upgrade", "foo"] in - let request_handler reqd = - Reqd.respond_with_upgrade reqd - (Headers.of_list upgrade_headers) - in + let headers = Headers.upgrade "foo" in + let request_handler reqd = Reqd.respond_with_upgrade reqd headers in let t = create request_handler in - read_request t - (Request.create `GET "/" - ~headers:(Headers.of_list (("Content-Length", "0") :: upgrade_headers))); - Alcotest.check read_operation "Reader is `Upgrade" `Upgrade (current_read_operation t); - write_response t - (Response.create `Switching_protocols ~headers:(Headers.of_list upgrade_headers)); - Alcotest.check write_operation "Writer is `Upgrade" `Upgrade (current_write_operation t); + read_request t (Request.create `GET "/" ~headers); + reader_upgraded t; + write_response t (Response.create `Switching_protocols ~headers); + writer_upgraded t; ;; let test_upgrade_where_server_does_not_upgrade () = - let upgrade_headers =["Connection", "upgrade" ; "Upgrade", "foo"] in - let reqd_ref = ref None in - let request_handler reqd = reqd_ref := Some reqd in - let t = create request_handler in - read_request t - (Request.create `GET "/" - ~headers:(Headers.of_list (("Content-Length", "0") :: upgrade_headers))); + let respond, handler = capture_handler () in + let t = create handler in + read_request t (Request.create `GET "/" ~headers:(Headers.upgrade "foo")); (* At this point, we don't know if the response handler will call respond_with_upgrade or not. So we pause the reader until that is determined. *) - Alcotest.check read_operation "Reader is `Yield during upgrade negotiation" - `Yield (current_read_operation t); + reader_yielded t; (* Now pretend the user doesn't want to do the upgrade and make sure we close the connection *) - let reqd = Option.get !reqd_ref in - let response = Response.create `Bad_request ~headers:(Headers.encoding_fixed 0) in - Reqd.respond_with_string reqd response ""; - write_response t response; + !respond (fun reqd -> + let response = Response.create `Bad_request ~headers:(Headers.encoding_fixed 0) in + Reqd.respond_with_string reqd response ""; + write_response t response); (* The connection is left healthy and can be used for more requests *) read_request t (Request.create `GET "/" ~headers:(Headers.encoding_fixed 0)); - let reqd = Option.get !reqd_ref in - let response = Response.create `OK ~headers:(Headers.encoding_fixed 0) in - Reqd.respond_with_string reqd response ""; - write_response t response; + !respond (fun reqd -> + let response = Response.create `OK ~headers:(Headers.encoding_fixed 0) in + Reqd.respond_with_string reqd response ""; + write_response t response); +;; + +let test_upgrade_with_initial_data () = + let headers = Headers.upgrade "foo" in + let request_handler reqd = Reqd.respond_with_upgrade reqd headers in + let t = create request_handler in + let payload = request_to_string (Request.create `GET "/" ~headers) ^ "foo" in + let c = feed_string t payload in + Alcotest.(check int) "read consumes headers" 53 c; + reader_upgraded t; + write_response t (Response.create `Switching_protocols ~headers); + writer_upgraded t; +;; + +let test_upgrade_with_bad_body_length () = + let headers = Headers.upgrade "foo" in + let request_handler reqd = Reqd.respond_with_upgrade reqd headers in + let t = create request_handler in + read_request t + (Request.create `GET "/" ~headers:Headers.(headers @ encoding_fixed 100)); + reader_closed t; + write_response t (Response.create `Bad_request) ~body:"400"; + writer_closed t; +;; + +let test_asynchronous_upgrade () = + let headers = Headers.upgrade "foo" in + let respond, handler = capture_handler () in + let t = create handler in + read_request t (Request.create `GET "/" ~headers); + reader_yielded t; + + !respond (fun reqd -> Reqd.respond_with_upgrade reqd headers); + reader_upgraded t; + write_response t (Response.create `Switching_protocols ~headers); + writer_upgraded t; +;; + +let test_upgrade_interrupted_by_shutdown () = + let headers = Headers.upgrade "foo" in + let respond, handler = capture_handler () in + let t = create handler in + read_request t (Request.create `GET "/" ~headers); + reader_yielded t; + + shutdown t; + (* XXX(dpatti): If we call this, we try to write to the closed writer *) + (* !respond (fun reqd -> Reqd.respond_with_upgrade reqd headers); *) + ignore respond; + reader_closed t; + writer_closed t; ;; let tests = @@ -1024,6 +1084,10 @@ let tests = ; "response finished before body read", `Quick, test_response_finished_before_body_read ; "shutdown in request handler", `Quick, test_shutdown_in_request_handler ; "shutdown during asynchronous request", `Quick, test_shutdown_during_asynchronous_request - ; "test upgrades", `Quick, test_upgrade - ; "test upgrade where server does not upgrade", `Quick, test_upgrade_where_server_does_not_upgrade + ; "upgrade", `Quick, test_upgrade + ; "upgrade where server does not upgrade", `Quick, test_upgrade_where_server_does_not_upgrade + ; "upgrade with initial data", `Quick, test_upgrade_with_initial_data + ; "upgrade with bad body length", `Quick, test_upgrade_with_bad_body_length + ; "asynchronous upgrade", `Quick, test_asynchronous_upgrade + ; "upgrade interrupted by shutdown", `Quick, test_upgrade_interrupted_by_shutdown ] From d3377dd948ad8ee0c5994d42e6f98960e018ea73 Mon Sep 17 00:00:00 2001 From: David House Date: Mon, 7 Jun 2021 10:44:59 +0100 Subject: [PATCH 22/30] fix build --- lib/reqd.ml | 2 +- lib/server_connection.ml | 8 -------- lib_test/test_client_connection.ml | 10 +--------- 3 files changed, 2 insertions(+), 18 deletions(-) diff --git a/lib/reqd.ml b/lib/reqd.ml index d571bbfa..52741d2e 100644 --- a/lib/reqd.ml +++ b/lib/reqd.ml @@ -197,7 +197,7 @@ let respond_with_upgrade ?reason t headers = t.response_state <- Upgrade response; (* The parser ensures it only passes empty bodies in the case of an upgrade request *) - assert (Body.is_closed t.request_body); + assert (Body.Reader.is_closed t.request_body); Writer.write_response t.writer response; Writer.wakeup t.writer); | Streaming _ -> diff --git a/lib/server_connection.ml b/lib/server_connection.ml index 5178ac16..367247fa 100644 --- a/lib/server_connection.ml +++ b/lib/server_connection.ml @@ -201,14 +201,10 @@ let rec _next_read_operation t = believe I have some more improvements to the request queue mechanism that removes the need for two hacks. *) match Reqd.input_state reqd with -<<<<<<< HEAD - | Waiting -> `Yield -======= | Waiting -> if Reader.is_closed t.reader then Reader.next t.reader else `Yield ->>>>>>> origin/http-upgrades | Ready -> Reader.next t.reader | Complete -> _final_read_operation_for t reqd | Upgraded -> `Upgrade @@ -220,9 +216,6 @@ and _final_read_operation_for t reqd = Reader.next t.reader; ) else ( match Reqd.output_state reqd with -<<<<<<< HEAD - | Waiting | Ready -> `Yield -======= | Waiting | Ready -> (* XXX(dpatti): This is a way in which the reader and writer are not parallel -- we tell the writer when it needs to yield but the reader is @@ -236,7 +229,6 @@ and _final_read_operation_for t reqd = if Reader.is_closed t.reader then Reader.next t.reader else `Yield ->>>>>>> origin/http-upgrades | Upgraded -> `Upgrade | Complete -> advance_request_queue t; diff --git a/lib_test/test_client_connection.ml b/lib_test/test_client_connection.ml index 88dace1a..471e5ebe 100644 --- a/lib_test/test_client_connection.ml +++ b/lib_test/test_client_connection.ml @@ -41,16 +41,12 @@ let read_response t r = let reader_ready t = Alcotest.check read_operation "Reader is ready" -<<<<<<< HEAD - `Read (next_read_operation t :> [`Close | `Read | `Yield | `Upgrade]); -======= `Read (next_read_operation t :> Read_operation.t); ->>>>>>> origin/http-upgrades ;; let reader_closed t = Alcotest.check read_operation "Reader is closed" - `Close (next_read_operation t :> [`Close | `Read | `Yield]); + `Close (next_read_operation t :> Read_operation.t); ;; let write_string ?(msg="output written") t str = @@ -78,11 +74,7 @@ let writer_closed t = let connection_is_shutdown t = Alcotest.check read_operation "Reader is closed" -<<<<<<< HEAD - `Close (next_read_operation t :> [`Close | `Read | `Yield | `Upgrade]); -======= `Close (next_read_operation t :> Read_operation.t); ->>>>>>> origin/http-upgrades writer_closed t; ;; From 5264434442e5fe2cb98bdeef96810a34d3e7fa13 Mon Sep 17 00:00:00 2001 From: David House Date: Mon, 7 Jun 2021 10:50:25 +0100 Subject: [PATCH 23/30] improve comment --- lib/httpaf.mli | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/lib/httpaf.mli b/lib/httpaf.mli index 1b90ffa9..cfe10496 100644 --- a/lib/httpaf.mli +++ b/lib/httpaf.mli @@ -660,6 +660,13 @@ module Reqd : sig val respond_with_streaming : ?flush_headers_immediately:bool -> t -> Response.t -> Body.Writer.t val respond_with_upgrade : ?reason:string -> t -> Headers.t -> unit + (** Initiate an HTTP upgrade. [Server_connection.next_write_request] and + [next_read_request] will begin returning [`Upgrade] once the response headers have + been written, which indicates that the runtime should take over direct control of + the socket rather than shuttling bytes through httpaf. + + The headers must indicate a valid upgrade message, e.g. must include "Connection: + upgrade". *) (** {3 Exception Handling} *) From 31a6c1a5adc09ec772c26d9668b9b50ce71e6180 Mon Sep 17 00:00:00 2001 From: Doug Patti Date: Wed, 12 Jan 2022 16:27:36 -0500 Subject: [PATCH 24/30] minor refactoring No intended change to functionality --- lib/reqd.ml | 2 +- lib/server_connection.ml | 29 +++++++++++++++-------------- 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/lib/reqd.ml b/lib/reqd.ml index 1b86f6a4..2d09d3c1 100644 --- a/lib/reqd.ml +++ b/lib/reqd.ml @@ -263,7 +263,7 @@ let input_state t : Input_state.t = match t.response_state with | Upgrade _ -> Upgraded | Waiting when Request.is_upgrade t.request -> Waiting - | _ -> + | Waiting | Fixed _ | Streaming _ -> if Body.Reader.is_closed t.request_body then Complete else Ready diff --git a/lib/server_connection.ml b/lib/server_connection.ml index c178a3f2..a6b7941b 100644 --- a/lib/server_connection.ml +++ b/lib/server_connection.ml @@ -254,7 +254,10 @@ and _final_read_operation_for t reqd = if Reader.is_closed t.reader then Reader.next t.reader else `Yield - | Upgraded -> `Upgrade + | Upgraded -> + (* If the input state is not [Upgraded], the output state cannot be + either. *) + assert false | Complete -> advance_request_queue t; _next_read_operation t; @@ -303,20 +306,18 @@ let rec _next_write_operation t = | Ready -> Reqd.flush_response_body reqd; Writer.next t.writer - | Complete -> _final_write_operation_for t reqd ~upgrade:false - | Upgraded -> _final_write_operation_for t reqd ~upgrade:true - ) - -and _final_write_operation_for t reqd ~upgrade = + | Complete -> _final_write_operation_for t reqd + | Upgraded -> + wakeup_reader t; + (* Even in the Upgrade case, we're still responsible for writing the + response header, so we might have work to do. *) + if Writer.has_pending_output t.writer + then Writer.next t.writer + else `Upgrade) + +and _final_write_operation_for t reqd = let next = - if upgrade then ( - if Writer.has_pending_output t.writer then - (* Even in the Upgrade case, we're still responsible for writing the response - header, so we might have work to do. *) - Writer.next t.writer - else - `Upgrade - ) else if not (Reqd.persistent_connection reqd) then ( + if not (Reqd.persistent_connection reqd) then ( shutdown_writer t; Writer.next t.writer; ) else ( From 0c2ede1aaae51c3f06c0902bd2f887946b21cf63 Mon Sep 17 00:00:00 2001 From: Doug Patti Date: Wed, 12 Jan 2022 18:06:31 -0500 Subject: [PATCH 25/30] expose is_upgrade and fix up documentation --- lib/httpaf.mli | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/lib/httpaf.mli b/lib/httpaf.mli index cfe10496..aea08a1f 100644 --- a/lib/httpaf.mli +++ b/lib/httpaf.mli @@ -564,6 +564,10 @@ module Request : sig more details. *) val pp_hum : Format.formatter -> t -> unit [@@ocaml.toplevel_printer] + + val is_upgrade : t -> bool + (** [is_upgrade t] returns true if the request has the "Connection: upgrade" + header. *) end @@ -661,12 +665,13 @@ module Reqd : sig val respond_with_upgrade : ?reason:string -> t -> Headers.t -> unit (** Initiate an HTTP upgrade. [Server_connection.next_write_request] and - [next_read_request] will begin returning [`Upgrade] once the response headers have - been written, which indicates that the runtime should take over direct control of - the socket rather than shuttling bytes through httpaf. + [next_read_request] will begin returning [`Upgrade] once the response + headers have been written, which indicates that the runtime should take + over direct control of the socket rather than shuttling bytes through + httpaf. - The headers must indicate a valid upgrade message, e.g. must include "Connection: - upgrade". *) + The headers must indicate a valid upgrade message, e.g. must include + "Connection: upgrade". See [Request.is_upgrade]. *) (** {3 Exception Handling} *) From 138405079d32460c2b2cb4ce11512aa29d4e2ef0 Mon Sep 17 00:00:00 2001 From: Doug Patti Date: Mon, 17 Jan 2022 11:59:18 -0500 Subject: [PATCH 26/30] fix httpaf-async upgrade implementation We don't want to invoke the handler separately in the read loop and write loop, we want to wait until both are upgraded and then invoke. I included a script that wraps `nc` as a way of testing. We don't have upgrade support on the clients yet, and we can't use something like `wscat` without also implementing the entire websocket stack, which is more trouble than it seems worth right now. This still has an issue that you can easily demonstrate with the included script -- the "hello" message gets read into the `Buffer` along with the headers and never passed along to the client. I don't know how we should really address this, but we can't put data back onto the `fd` that we intend to pass on to the handler, so we have to surface the buffered data somehow. --- async/httpaf_async.ml | 34 ++++++++++----------- async/httpaf_async.mli | 2 +- examples/async/async_echo_upgrade.ml | 44 ++++++++++++++++++++++++++++ examples/async/dune | 2 +- examples/lib/httpaf_examples.ml | 10 +++++++ examples/script/upgrade-connect | 13 ++++++++ 6 files changed, 85 insertions(+), 20 deletions(-) create mode 100644 examples/async/async_echo_upgrade.ml create mode 100755 examples/script/upgrade-connect diff --git a/async/httpaf_async.ml b/async/httpaf_async.ml index 20a7908f..28df32ff 100644 --- a/async/httpaf_async.ml +++ b/async/httpaf_async.ml @@ -91,7 +91,7 @@ open Httpaf module Server = struct let create_connection_handler - ?(config=Config.default) ~request_handler ~upgrade_handler ~error_handler = + ?(config=Config.default) ~request_handler ~error_handler ~upgrade_handler = fun client_addr socket -> let fd = Socket.fd socket in let writev = Faraday_async.writev_of_fd fd in @@ -99,6 +99,19 @@ module Server = struct let error_handler = error_handler client_addr in let conn = Server_connection.create ~config ~error_handler request_handler in let read_complete = Ivar.create () in + let write_complete = Ivar.create () in + let upgrade_read, upgrade_write = Ivar.create (), Ivar.create () in + upon + (Deferred.both (Ivar.read upgrade_read) (Ivar.read upgrade_write)) + (fun ((), ()) -> + match upgrade_handler with + | None -> failwith "HTTP upgrades not supported" + | Some upgrade_handler -> + upgrade_handler client_addr (Reader.create fd) (Writer.create fd) + >>> fun () -> + if not (Fd.is_closed fd) then Socket.shutdown socket `Both; + Ivar.fill read_complete (); + Ivar.fill write_complete ()); let buffer = Buffer.create config.read_buffer_size in let rec reader_thread () = match Server_connection.next_read_operation conn with @@ -120,21 +133,13 @@ module Server = struct | `Yield -> (* Log.Global.printf "read_yield(%d)%!" (Fd.to_int_exn fd); *) Server_connection.yield_reader conn reader_thread - | `Upgrade -> - (match upgrade_handler with - | None -> failwith "HTTP upgrades not supported" - | Some upgrade_handler -> - upon (upgrade_handler client_addr) (fun () -> - Ivar.fill read_complete (); - if not (Fd.is_closed fd) - then Socket.shutdown socket `Receive)) + | `Upgrade -> Ivar.fill upgrade_read () | `Close -> (* Log.Global.printf "read_close(%d)%!" (Fd.to_int_exn fd); *) Ivar.fill read_complete (); if not (Fd.is_closed fd) then Socket.shutdown socket `Receive in - let write_complete = Ivar.create () in let rec writer_thread () = match Server_connection.next_write_operation conn with | `Write iovecs -> @@ -145,14 +150,7 @@ module Server = struct | `Yield -> (* Log.Global.printf "write_yield(%d)%!" (Fd.to_int_exn fd); *) Server_connection.yield_writer conn writer_thread; - | `Upgrade -> - (match upgrade_handler with - | None -> failwith "HTTP upgrades not supported" - | Some upgrade_handler -> - upon (upgrade_handler client_addr) (fun () -> - Ivar.fill write_complete (); - if not (Fd.is_closed fd) - then Socket.shutdown socket `Send)) + | `Upgrade -> Ivar.fill upgrade_write () | `Close _ -> (* Log.Global.printf "write_close(%d)%!" (Fd.to_int_exn fd); *) Ivar.fill write_complete (); diff --git a/async/httpaf_async.mli b/async/httpaf_async.mli index ee9bad5e..cdd89da7 100644 --- a/async/httpaf_async.mli +++ b/async/httpaf_async.mli @@ -7,8 +7,8 @@ module Server : sig val create_connection_handler : ?config : Config.t -> request_handler : ('a -> Server_connection.request_handler) - -> upgrade_handler : ('a -> unit Deferred.t) option -> error_handler : ('a -> Server_connection.error_handler) + -> upgrade_handler : ('a -> Reader.t -> Writer.t -> unit Deferred.t) option -> ([< Socket.Address.t] as 'a) -> ([`Active], 'a) Socket.t -> unit Deferred.t diff --git a/examples/async/async_echo_upgrade.ml b/examples/async/async_echo_upgrade.ml new file mode 100644 index 00000000..cbf7d910 --- /dev/null +++ b/examples/async/async_echo_upgrade.ml @@ -0,0 +1,44 @@ +open Core +open Async + +open Httpaf_async + +let request_handler (_ : Socket.Address.Inet.t) = Httpaf_examples.Server.upgrade +let error_handler (_ : Socket.Address.Inet.t) = Httpaf_examples.Server.error_handler + +let upgrade_handler (_ : Socket.Address.Inet.t) reader writer = + Reader.read_one_chunk_at_a_time reader ~handle_chunk:(fun bigstring ~pos ~len -> + Writer.write_bigstring writer bigstring ~pos ~len; + return `Continue) + >>| function + | `Eof | `Stopped _ | `Eof_with_unconsumed_data _ -> () +;; + +let main port max_accepts_per_batch () = + let where_to_listen = Tcp.Where_to_listen.of_port port in + Tcp.(Server.create_sock ~on_handler_error:`Raise + ~backlog:10_000 ~max_connections:10_000 ~max_accepts_per_batch where_to_listen) + (Server.create_connection_handler + ~request_handler + ~error_handler + ~upgrade_handler:(Some upgrade_handler)) + >>= fun _server -> + Stdio.printf "Listening on port %i, upgrading, and echoing data.\n" port; + Stdio.printf "To send an interactive upgrade request, try\n\n"; + Stdio.printf " examples/script/upgrade-connect\n%!"; + Deferred.never () +;; + +let () = + Command.async + ~summary:"Echo POST requests" + Command.Param.( + map (both + (flag "-p" (optional_with_default 8080 int) + ~doc:"int Source port to listen on") + (flag "-a" (optional_with_default 1 int) + ~doc:"int Maximum accepts per batch")) + ~f:(fun (port, accepts) -> + (fun () -> main port accepts ()))) + |> Command.run +;; diff --git a/examples/async/dune b/examples/async/dune index 4008a21d..b107d5cf 100644 --- a/examples/async/dune +++ b/examples/async/dune @@ -1,6 +1,6 @@ (executables (libraries httpaf httpaf-async httpaf_examples async core) - (names async_echo_post async_get async_post)) + (names async_echo_post async_echo_upgrade async_get async_post)) (alias (name examples) diff --git a/examples/lib/httpaf_examples.ml b/examples/lib/httpaf_examples.ml index fd049718..0dca1c91 100644 --- a/examples/lib/httpaf_examples.ml +++ b/examples/lib/httpaf_examples.ml @@ -86,4 +86,14 @@ module Server = struct end; Body.Writer.close response_body ;; + + let upgrade reqd = + if Request.is_upgrade (Reqd.request reqd) then ( + let headers = Headers.of_list [ "connection", "upgrade" ] in + Reqd.respond_with_upgrade reqd headers; + ) else ( + let headers = Headers.of_list [ "connection", "close" ] in + Reqd.respond_with_string reqd (Response.create ~headers `Not_found) "" + ) + ;; end diff --git a/examples/script/upgrade-connect b/examples/script/upgrade-connect new file mode 100755 index 00000000..416d30b7 --- /dev/null +++ b/examples/script/upgrade-connect @@ -0,0 +1,13 @@ +#!/usr/bin/env bash +set -euo pipefail + +function headers { + printf "\ +GET / HTTP/1.1\r +Host: localhost\r +Connection: upgrade\r +\r +" +} + +( headers; echo hello; cat; echo bye ) | nc localhost 8080 --close From 529410595ff1367503c1e5363b7a097b9306b9e0 Mon Sep 17 00:00:00 2001 From: Doug Patti Date: Mon, 7 Mar 2022 14:48:56 -0500 Subject: [PATCH 27/30] fix lwt implementation and add upgrade helper Give it the same treatment as the async implementation in the parent commit. --- examples/lwt/dune | 2 +- examples/lwt/lwt_echo_upgrade.ml | 48 ++++++++++++++++++++++++++++++++ lwt-unix/httpaf_lwt_unix.ml | 45 +++++++++++++++++------------- lwt-unix/httpaf_lwt_unix.mli | 2 +- 4 files changed, 76 insertions(+), 21 deletions(-) create mode 100644 examples/lwt/lwt_echo_upgrade.ml diff --git a/examples/lwt/dune b/examples/lwt/dune index fe8f8b0a..7760e3c2 100644 --- a/examples/lwt/dune +++ b/examples/lwt/dune @@ -1,6 +1,6 @@ (executables (libraries httpaf httpaf-lwt-unix httpaf_examples base stdio lwt lwt.unix) - (names lwt_get lwt_post lwt_echo_post)) + (names lwt_get lwt_post lwt_echo_post lwt_echo_upgrade)) (alias (name examples) diff --git a/examples/lwt/lwt_echo_upgrade.ml b/examples/lwt/lwt_echo_upgrade.ml new file mode 100644 index 00000000..447aa869 --- /dev/null +++ b/examples/lwt/lwt_echo_upgrade.ml @@ -0,0 +1,48 @@ +open Base +open Lwt.Infix +module Arg = Caml.Arg + +open Httpaf_lwt_unix + +let request_handler (_ : Unix.sockaddr) = Httpaf_examples.Server.upgrade +let error_handler (_ : Unix.sockaddr) = Httpaf_examples.Server.error_handler + +let upgrade_handler (_ : Unix.sockaddr) (fd : Lwt_unix.file_descr) = + let input = Lwt_io.of_fd fd ~mode:Input in + let output = Lwt_io.of_fd fd ~mode:Output in + let rec loop () = + Lwt_io.read input ~count:4096 + >>= fun data -> + Lwt_io.write output data + >>= fun () -> + loop () + in + loop () +;; + +let main port = + let listen_address = Unix.(ADDR_INET (inet_addr_loopback, port)) in + Lwt.async (fun () -> + Lwt_io.establish_server_with_client_socket + listen_address + (Server.create_connection_handler + ~request_handler + ~error_handler + ~upgrade_handler:(Some upgrade_handler)) + >|= fun _server -> + Stdio.printf "Listening on port %i, upgrading, and echoing data.\n" port; + Stdio.printf "To send an interactive upgrade request, try\n\n"; + Stdio.printf " examples/script/upgrade-connect\n%!"); + let forever, _ = Lwt.wait () in + Lwt_main.run forever +;; + +let () = + let port = ref 8080 in + Arg.parse + ["-p", Arg.Set_int port, " Listening port number (8080 by default)"] + ignore + "Echoes POST requests. Runs forever."; + main !port +;; + diff --git a/lwt-unix/httpaf_lwt_unix.ml b/lwt-unix/httpaf_lwt_unix.ml index bdd71178..ec132ba1 100644 --- a/lwt-unix/httpaf_lwt_unix.ml +++ b/lwt-unix/httpaf_lwt_unix.ml @@ -119,6 +119,28 @@ module Server = struct let read_buffer = Buffer.create config.read_buffer_size in let read_loop_exited, notify_read_loop_exited = Lwt.wait () in + let write_loop_exited, notify_write_loop_exited = Lwt.wait () in + + let (upgrade_read, notify_upgrade_read), (upgrade_write, notify_upgrade_write) = + Lwt.wait (), Lwt.wait () + in + Lwt.async (fun () -> + upgrade_read + >>= fun () -> + upgrade_write + >>= fun () -> + match upgrade_handler with + | None -> Lwt.fail_with "HTTP upgrades not supported" + | Some upgrade_handler -> + upgrade_handler client_addr socket + >>= fun () -> + if (Lwt_unix.state socket = Lwt_unix.Closed) + then Lwt.return_unit + else Lwt_unix.close socket + >>= fun () -> + Lwt.wakeup_later notify_read_loop_exited (); + Lwt.wakeup_later notify_write_loop_exited (); + Lwt.return_unit); let rec read_loop () = let rec read_loop_step () = @@ -142,15 +164,8 @@ module Server = struct Lwt.return_unit | `Upgrade -> - (match upgrade_handler with - | None -> failwith "HTTP upgrades not supported" - | Some upgrade_handler -> - upgrade_handler client_addr >>= fun () -> - Lwt.wakeup_later notify_read_loop_exited (); - if not (Lwt_unix.state socket = Lwt_unix.Closed) then begin - shutdown socket Unix.SHUTDOWN_RECEIVE - end; - Lwt.return_unit) + Lwt.wakeup_later notify_upgrade_read (); + Lwt.return_unit | `Close -> Lwt.wakeup_later notify_read_loop_exited (); @@ -170,7 +185,6 @@ module Server = struct let writev = Faraday_lwt_unix.writev_of_fd socket in - let write_loop_exited, notify_write_loop_exited = Lwt.wait () in let rec write_loop () = let rec write_loop_step () = @@ -185,15 +199,8 @@ module Server = struct Lwt.return_unit | `Upgrade -> - (match upgrade_handler with - | None -> failwith "HTTP upgrades not supported" - | Some upgrade_handler -> - upgrade_handler client_addr >>= fun () -> - Lwt.wakeup_later notify_write_loop_exited (); - if not (Lwt_unix.state socket = Lwt_unix.Closed) then begin - shutdown socket Unix.SHUTDOWN_SEND - end; - Lwt.return_unit) + Lwt.wakeup_later notify_upgrade_write (); + Lwt.return_unit | `Close _ -> Lwt.wakeup_later notify_write_loop_exited (); diff --git a/lwt-unix/httpaf_lwt_unix.mli b/lwt-unix/httpaf_lwt_unix.mli index 90c81dde..d361ea91 100644 --- a/lwt-unix/httpaf_lwt_unix.mli +++ b/lwt-unix/httpaf_lwt_unix.mli @@ -42,7 +42,7 @@ module Server : sig val create_connection_handler : ?config : Config.t -> request_handler : (Unix.sockaddr -> Server_connection.request_handler) - -> upgrade_handler : (Unix.sockaddr -> unit Lwt.t) option + -> upgrade_handler : (Unix.sockaddr -> Lwt_unix.file_descr -> unit Lwt.t) option -> error_handler : (Unix.sockaddr -> Server_connection.error_handler) -> Unix.sockaddr -> Lwt_unix.file_descr From c7eeda536da468490fa08eff8bd32aed30e943ab Mon Sep 17 00:00:00 2001 From: Doug Patti Date: Mon, 7 Mar 2022 14:49:42 -0500 Subject: [PATCH 28/30] switch async upgrade to pass the socket --- async/httpaf_async.ml | 2 +- async/httpaf_async.mli | 2 +- examples/async/async_echo_upgrade.ml | 5 ++++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/async/httpaf_async.ml b/async/httpaf_async.ml index 28df32ff..cd2b2293 100644 --- a/async/httpaf_async.ml +++ b/async/httpaf_async.ml @@ -107,7 +107,7 @@ module Server = struct match upgrade_handler with | None -> failwith "HTTP upgrades not supported" | Some upgrade_handler -> - upgrade_handler client_addr (Reader.create fd) (Writer.create fd) + upgrade_handler client_addr socket >>> fun () -> if not (Fd.is_closed fd) then Socket.shutdown socket `Both; Ivar.fill read_complete (); diff --git a/async/httpaf_async.mli b/async/httpaf_async.mli index cdd89da7..9c3eb82b 100644 --- a/async/httpaf_async.mli +++ b/async/httpaf_async.mli @@ -8,7 +8,7 @@ module Server : sig : ?config : Config.t -> request_handler : ('a -> Server_connection.request_handler) -> error_handler : ('a -> Server_connection.error_handler) - -> upgrade_handler : ('a -> Reader.t -> Writer.t -> unit Deferred.t) option + -> upgrade_handler : ('a -> ([`Active], 'a) Socket.t -> unit Deferred.t) option -> ([< Socket.Address.t] as 'a) -> ([`Active], 'a) Socket.t -> unit Deferred.t diff --git a/examples/async/async_echo_upgrade.ml b/examples/async/async_echo_upgrade.ml index cbf7d910..cb0c9c23 100644 --- a/examples/async/async_echo_upgrade.ml +++ b/examples/async/async_echo_upgrade.ml @@ -6,7 +6,10 @@ open Httpaf_async let request_handler (_ : Socket.Address.Inet.t) = Httpaf_examples.Server.upgrade let error_handler (_ : Socket.Address.Inet.t) = Httpaf_examples.Server.error_handler -let upgrade_handler (_ : Socket.Address.Inet.t) reader writer = +let upgrade_handler (_ : Socket.Address.Inet.t) socket = + let fd = Socket.fd socket in + let reader = Reader.create fd in + let writer = Writer.create fd in Reader.read_one_chunk_at_a_time reader ~handle_chunk:(fun bigstring ~pos ~len -> Writer.write_bigstring writer bigstring ~pos ~len; return `Continue) From 71e04868713170c7f7208ddb23b51f0a686a9700 Mon Sep 17 00:00:00 2001 From: Doug Patti Date: Sat, 12 Mar 2022 12:45:46 -0500 Subject: [PATCH 29/30] fix lwt_echo_upgrade example We weren't handling empty reads correctly, so that meant it would just spin at 100% once it got EOF. --- examples/lwt/lwt_echo_upgrade.ml | 7 +++---- lwt-unix/httpaf_lwt_unix.ml | 5 ++--- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/examples/lwt/lwt_echo_upgrade.ml b/examples/lwt/lwt_echo_upgrade.ml index 447aa869..e0acd5a0 100644 --- a/examples/lwt/lwt_echo_upgrade.ml +++ b/examples/lwt/lwt_echo_upgrade.ml @@ -12,10 +12,9 @@ let upgrade_handler (_ : Unix.sockaddr) (fd : Lwt_unix.file_descr) = let output = Lwt_io.of_fd fd ~mode:Output in let rec loop () = Lwt_io.read input ~count:4096 - >>= fun data -> - Lwt_io.write output data - >>= fun () -> - loop () + >>= function + | "" -> Lwt.return_unit + | data -> Lwt_io.write output data >>= loop in loop () ;; diff --git a/lwt-unix/httpaf_lwt_unix.ml b/lwt-unix/httpaf_lwt_unix.ml index ec132ba1..eec42234 100644 --- a/lwt-unix/httpaf_lwt_unix.ml +++ b/lwt-unix/httpaf_lwt_unix.ml @@ -121,9 +121,8 @@ module Server = struct let read_loop_exited, notify_read_loop_exited = Lwt.wait () in let write_loop_exited, notify_write_loop_exited = Lwt.wait () in - let (upgrade_read, notify_upgrade_read), (upgrade_write, notify_upgrade_write) = - Lwt.wait (), Lwt.wait () - in + let upgrade_read, notify_upgrade_read = Lwt.wait () in + let upgrade_write, notify_upgrade_write = Lwt.wait () in Lwt.async (fun () -> upgrade_read >>= fun () -> From 4944d007f79a6a101f14e1f6fc7a596a96acbda3 Mon Sep 17 00:00:00 2001 From: Doug Patti Date: Wed, 22 Jun 2022 19:28:53 -0400 Subject: [PATCH 30/30] move reader yields to separate function This is purely so that we can attach the big comment that describes why this is weird. The "improvements" that I mentioned in one of the comments are lost to time, but I believe it involved allowing the reader to be put into a yielded state once the parsing was complete and resumed once more input was requested. Ultimately, we won't spend the time trying to implement such a thing. --- lib/server_connection.ml | 38 +++++++++++++++++--------------------- 1 file changed, 17 insertions(+), 21 deletions(-) diff --git a/lib/server_connection.ml b/lib/server_connection.ml index 42183a1a..1689e436 100644 --- a/lib/server_connection.ml +++ b/lib/server_connection.ml @@ -224,15 +224,8 @@ let rec _next_read_operation t = Reader.next t.reader ) else ( let reqd = current_reqd_exn t in - (* XXX(dpatti): This fails for the same reason as my comment below in the - final_read_operation section. I played around with some alternatives and - believe I have some more improvements to the request queue mechanism that - removes the need for two hacks. *) match Reqd.input_state reqd with - | Waiting -> - if Reader.is_closed t.reader - then Reader.next t.reader - else `Yield + | Waiting -> _yield_reader t | Ready -> Reader.next t.reader | Complete -> _final_read_operation_for t reqd | Upgraded -> `Upgrade @@ -244,19 +237,7 @@ and _final_read_operation_for t reqd = Reader.next t.reader; ) else ( match Reqd.output_state reqd with - | Waiting | Ready -> - (* XXX(dpatti): This is a way in which the reader and writer are not - parallel -- we tell the writer when it needs to yield but the reader is - always asking for more data. This is the only branch in either - operation function that does not return `(Reader|Writer).next`, which - means there are surprising states you can get into. For example, we ask - the runtime to yield but then raise when it tries to because the reader - is closed. I don't think checking `is_closed` here makes sense - semantically, but I don't think checking it in `_next_read_operation` - makes sense either. I chose here so I could describe why. *) - if Reader.is_closed t.reader - then Reader.next t.reader - else `Yield + | Waiting | Ready -> _yield_reader t | Upgraded -> (* If the input state is not [Upgraded], the output state cannot be either. *) @@ -265,6 +246,21 @@ and _final_read_operation_for t reqd = advance_request_queue t; _next_read_operation t; ) + +and _yield_reader t = + (* XXX(dpatti): This is a way in which the reader and writer are not + parallel -- we tell the writer when it needs to yield but the reader is + always asking for more data. This is the only branch in either + operation function that does not return `(Reader|Writer).next`, which + means there are surprising states you can get into. For example, we ask + the runtime to yield but then raise when it tries to because the reader + is closed. I think this can be avoided if we allow this module to tell the + reader when it should yield/resume, then we'd just do an inlined + `Reader.next` call instead. I put this function here to describe why this + is subtle. *) + if Reader.is_closed t.reader + then Reader.next t.reader + else `Yield ;; let next_read_operation t =