Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Http upgrades #203

Open
wants to merge 37 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
d73822b
refactor request queue mechanics
dpatti Jan 13, 2020
94e4480
refactor-request-queue: read loop no longer needs to wake writer
dpatti May 16, 2020
cce55fd
refactor-request-queue: fixes
dpatti Apr 2, 2021
0bab909
Implement HTTP upgrades.
dhouse-js Apr 26, 2021
b761c7e
Implement HTTP upgrades.
dhouse-js Apr 26, 2021
a49ea3c
Merge remote-tracking branch 'upstream/refactor-request-queue' into r…
dhouse-js Apr 27, 2021
949d7d9
Merge branch 'refactor-request-queue' into http-upgrades
dhouse-js Apr 27, 2021
124eed8
Fix requests where the client requests an upgrade but the server decl…
dhouse-js Apr 29, 2021
004e128
refactor
dhouse-js Apr 29, 2021
d1ff221
More fixes for when the upgrade request is declined
dhouse-js Apr 29, 2021
60f137b
Unwind changes to parser
dhouse-js Apr 30, 2021
2a36491
Merge branch 'master' into http-upgrades
dhouse-js May 24, 2021
d45b85c
Fix compilation of examples
dhouse-js May 24, 2021
05c797d
Implement HTTP upgrades.
dhouse-js Apr 26, 2021
e6cda20
Implement HTTP upgrades.
dhouse-js Apr 26, 2021
9ccf77b
Fix requests where the client requests an upgrade but the server decl…
dhouse-js Apr 29, 2021
40ae33c
refactor
dhouse-js Apr 29, 2021
5fcb69a
More fixes for when the upgrade request is declined
dhouse-js Apr 29, 2021
681d363
Unwind changes to parser
dhouse-js Apr 30, 2021
90de89e
Fix compilation of examples
dhouse-js May 24, 2021
203f06c
update reader coercion in client tests
dpatti Jun 3, 2021
cee330c
http-upgrades: require no request body
dpatti Jun 5, 2021
81b4808
http-upgrades: refactor input_state logic
dpatti Jun 5, 2021
9faf49c
http-upgrades: add tests and fix read loop issue
dpatti Jun 5, 2021
a978744
resolve conflicts
dhouse-js Jun 7, 2021
6167c51
merge with master
dhouse-js Jun 7, 2021
d3377dd
fix build
dhouse-js Jun 7, 2021
5264434
improve comment
dhouse-js Jun 7, 2021
052bdcf
fix conflicts
dhouse-js Jun 21, 2021
31a6c1a
minor refactoring
dpatti Jan 12, 2022
0c2ede1
expose is_upgrade and fix up documentation
dpatti Jan 12, 2022
0da6f10
Merge remote-tracking branch 'origin/master' into http-upgrades
dpatti Jan 17, 2022
1384050
fix httpaf-async upgrade implementation
dpatti Jan 17, 2022
5294105
fix lwt implementation and add upgrade helper
dpatti Mar 7, 2022
c7eeda5
switch async upgrade to pass the socket
dpatti Mar 7, 2022
71e0486
fix lwt_echo_upgrade example
dpatti Mar 12, 2022
4944d00
move reader yields to separate function
dpatti Jun 22, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion async/httpaf_async.ml
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ let read fd buffer =
open Httpaf

module Server = struct
let create_connection_handler ?(config=Config.default) ~request_handler ~error_handler =
let create_connection_handler
?(config=Config.default) ~request_handler ~upgrade_handler ~error_handler =
fun client_addr socket ->
let fd = Socket.fd socket in
let writev = Faraday_async.writev_of_fd fd in
Expand Down Expand Up @@ -119,6 +120,14 @@ module Server = struct
| `Yield ->
(* Log.Global.printf "read_yield(%d)%!" (Fd.to_int_exn fd); *)
Server_connection.yield_reader conn reader_thread
| `Upgrade ->
(match upgrade_handler with
| None -> failwith "HTTP upgrades not supported"
| Some upgrade_handler ->
upon (upgrade_handler client_addr) (fun () ->
Ivar.fill read_complete ();
if not (Fd.is_closed fd)
then Socket.shutdown socket `Receive))
| `Close ->
(* Log.Global.printf "read_close(%d)%!" (Fd.to_int_exn fd); *)
Ivar.fill read_complete ();
Expand All @@ -136,6 +145,14 @@ module Server = struct
| `Yield ->
(* Log.Global.printf "write_yield(%d)%!" (Fd.to_int_exn fd); *)
Server_connection.yield_writer conn writer_thread;
| `Upgrade ->
(match upgrade_handler with
| None -> failwith "HTTP upgrades not supported"
| Some upgrade_handler ->
upon (upgrade_handler client_addr) (fun () ->
dpatti marked this conversation as resolved.
Show resolved Hide resolved
Ivar.fill write_complete ();
if not (Fd.is_closed fd)
then Socket.shutdown socket `Send))
| `Close _ ->
(* Log.Global.printf "write_close(%d)%!" (Fd.to_int_exn fd); *)
Ivar.fill write_complete ();
Expand Down
1 change: 1 addition & 0 deletions async/httpaf_async.mli
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ module Server : sig
val create_connection_handler
: ?config : Config.t
-> request_handler : ('a -> Server_connection.request_handler)
-> upgrade_handler : ('a -> unit Deferred.t) option
-> error_handler : ('a -> Server_connection.error_handler)
-> ([< Socket.Address.t] as 'a)
-> ([`Active], 'a) Socket.t
Expand Down
5 changes: 4 additions & 1 deletion examples/async/async_echo_post.ml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ let main port max_accepts_per_batch () =
let where_to_listen = Tcp.Where_to_listen.of_port port in
Tcp.(Server.create_sock ~on_handler_error:`Raise
~backlog:10_000 ~max_connections:10_000 ~max_accepts_per_batch where_to_listen)
(Server.create_connection_handler ~request_handler ~error_handler)
(Server.create_connection_handler
~request_handler
~error_handler
~upgrade_handler:None)
>>= fun _server ->
Stdio.printf "Listening on port %i and echoing POST requests.\n" port;
Stdio.printf "To send a POST request, try one of the following\n\n";
Expand Down
5 changes: 4 additions & 1 deletion examples/lwt/lwt_echo_post.ml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ let main port =
Lwt.async (fun () ->
Lwt_io.establish_server_with_client_socket
listen_address
(Server.create_connection_handler ~request_handler ~error_handler)
(Server.create_connection_handler
~request_handler
~error_handler
~upgrade_handler:None)
>|= fun _server ->
Stdio.printf "Listening on port %i and echoing POST requests.\n" port;
Stdio.printf "To send a POST request, try one of the following\n\n";
Expand Down
12 changes: 11 additions & 1 deletion lib/httpaf.mli
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,15 @@ module Reqd : sig
val respond_with_bigstring : t -> Response.t -> Bigstringaf.t -> unit
val respond_with_streaming : ?flush_headers_immediately:bool -> t -> Response.t -> Body.Writer.t

val respond_with_upgrade : ?reason:string -> t -> Headers.t -> unit
(** Initiate an HTTP upgrade. [Server_connection.next_write_request] and
[next_read_request] will begin returning [`Upgrade] once the response headers have
been written, which indicates that the runtime should take over direct control of
the socket rather than shuttling bytes through httpaf.

The headers must indicate a valid upgrade message, e.g. must include "Connection:
upgrade". *)

(** {3 Exception Handling} *)

val report_exn : t -> exn -> unit
Expand Down Expand Up @@ -700,7 +709,7 @@ module Server_connection : sig
(** [create ?config ?error_handler ~request_handler] creates a connection
handler that will service individual requests with [request_handler]. *)

val next_read_operation : t -> [ `Read | `Yield | `Close ]
val next_read_operation : t -> [ `Read | `Yield | `Close | `Upgrade ]
(** [next_read_operation t] returns a value describing the next operation
that the caller should conduct on behalf of the connection. *)

Expand All @@ -727,6 +736,7 @@ module Server_connection : sig
val next_write_operation : t -> [
| `Write of Bigstringaf.t IOVec.t list
| `Yield
| `Upgrade
| `Close of int ]
(** [next_write_operation t] returns a value describing the next operation
that the caller should conduct on behalf of the connection. *)
Expand Down
2 changes: 2 additions & 0 deletions lib/parse.ml
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,8 @@ module Reader = struct
| `Fixed 0L ->
handler request Body.Reader.empty;
ok
| `Fixed _ | `Chunked when Request.is_upgrade request ->
return (Error (`Bad_request request))
| `Fixed _ | `Chunked as encoding ->
let request_body = Body.Reader.create Bigstringaf.empty in
handler request request_body;
Expand Down
41 changes: 37 additions & 4 deletions lib/reqd.ml
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,25 @@ type error =
module Response_state = struct
type t =
| Waiting
| Upgrade of Response.t
| Fixed of Response.t
| Streaming of Response.t * Body.Writer.t
end

module Input_state = struct
type t =
| Waiting
| Ready
| Complete
| Upgraded
end

module Output_state = struct
type t =
| Waiting
| Ready
| Complete
| Upgraded
end

type error_handler =
Expand Down Expand Up @@ -111,12 +115,14 @@ let response { response_state; _ } =
match response_state with
| Waiting -> None
| Streaming (response, _)
| Upgrade response
| Fixed response -> Some response

let response_exn { response_state; _ } =
match response_state with
| Waiting -> failwith "httpaf.Reqd.response_exn: response has not started"
| Streaming (response, _)
| Upgrade response
| Fixed response -> response

let respond_with_string t response str =
Expand All @@ -133,6 +139,7 @@ let respond_with_string t response str =
Writer.wakeup t.writer;
| Streaming _ ->
failwith "httpaf.Reqd.respond_with_string: response already started"
| Upgrade _
| Fixed _ ->
failwith "httpaf.Reqd.respond_with_string: response already complete"

Expand All @@ -150,6 +157,7 @@ let respond_with_bigstring t response (bstr:Bigstringaf.t) =
Writer.wakeup t.writer;
| Streaming _ ->
failwith "httpaf.Reqd.respond_with_bigstring: response already started"
| Upgrade _
| Fixed _ ->
failwith "httpaf.Reqd.respond_with_bigstring: response already complete"

Expand All @@ -175,6 +183,7 @@ let unsafe_respond_with_streaming ~flush_headers_immediately t response =
response_body
| Streaming _ ->
failwith "httpaf.Reqd.respond_with_streaming: response already started"
| Upgrade _
| Fixed _ ->
failwith "httpaf.Reqd.respond_with_streaming: response already complete"

Expand All @@ -183,6 +192,25 @@ let respond_with_streaming ?(flush_headers_immediately=false) t response =
failwith "httpaf.Reqd.respond_with_streaming: invalid state, currently handling error";
unsafe_respond_with_streaming ~flush_headers_immediately t response

let respond_with_upgrade ?reason t headers =
match t.response_state with
| Waiting ->
if not (Request.is_upgrade t.request) then
dpatti marked this conversation as resolved.
Show resolved Hide resolved
failwith "httpaf.Reqd.respond_with_upgrade: request was not an upgrade request"
else (
let response = Response.create ?reason ~headers `Switching_protocols in
t.response_state <- Upgrade response;
(* The parser ensures it only passes empty bodies in the case of an
upgrade request *)
assert (Body.Reader.is_closed t.request_body);
Writer.write_response t.writer response;
Writer.wakeup t.writer);
| Streaming _ ->
failwith "httpaf.Reqd.respond_with_upgrade: response already started"
| Upgrade _
| Fixed _ ->
failwith "httpaf.Reqd.respond_with_upgrade: response already complete"

let report_error t error =
t.persistent <- false;
Body.Reader.close t.request_body;
Expand All @@ -207,7 +235,7 @@ let report_error t error =
| Streaming (_response, response_body), `Exn _ ->
Body.Writer.close response_body;
Writer.close_and_drain t.writer
| (Fixed _ | Streaming _ | Waiting) , _ ->
| (Fixed _ | Streaming _ | Waiting | Upgrade _) , _ ->
(* XXX(seliopou): Once additional logging support is added, log the error
* in case it is not spurious. *)
()
Expand All @@ -232,13 +260,18 @@ let persistent_connection t =
t.persistent

let input_state t : Input_state.t =
if Body.Reader.is_closed t.request_body
then Complete
else Ready
match t.response_state with
| Upgrade _ -> Upgraded
| Waiting when Request.is_upgrade t.request -> Waiting
| Waiting | Fixed _ | Streaming _ ->
if Body.Reader.is_closed t.request_body
then Complete
else Ready
;;

let output_state t : Output_state.t =
match t.response_state with
| Upgrade _ -> Upgraded
| Fixed _ -> Complete
| Streaming (_, response_body) ->
if Body.Writer.has_pending_output response_body
Expand Down
5 changes: 5 additions & 0 deletions lib/request.ml
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,8 @@ let persistent_connection ?proxy { version; headers; _ } =
let pp_hum fmt { meth; target; version; headers } =
Format.fprintf fmt "((method \"%a\") (target %S) (version \"%a\") (headers %a))"
Method.pp_hum meth target Version.pp_hum version Headers.pp_hum headers

let is_upgrade t =
match Headers.get t.headers "Connection" with
| None -> false
| Some header_val -> Headers.ci_equal header_val "upgrade"
2 changes: 2 additions & 0 deletions lib/serialize.ml
Original file line number Diff line number Diff line change
Expand Up @@ -195,4 +195,6 @@ module Writer = struct
| `Close -> `Close (drained_bytes t)
| `Yield -> `Yield
| `Writev iovecs -> `Write iovecs

let has_pending_output t = Faraday.has_pending_output t.encoder
end
25 changes: 23 additions & 2 deletions lib/server_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,18 @@ let rec _next_read_operation t =
Reader.next t.reader
) else (
let reqd = current_reqd_exn t in
(* XXX(dpatti): This fails for the same reason as my comment below in the
final_read_operation section. I played around with some alternatives and
believe I have some more improvements to the request queue mechanism that
removes the need for two hacks. *)
dpatti marked this conversation as resolved.
Show resolved Hide resolved
match Reqd.input_state reqd with
| Waiting ->
if Reader.is_closed t.reader
then Reader.next t.reader
else `Yield
| Ready -> Reader.next t.reader
| Complete -> _final_read_operation_for t reqd
| Upgraded -> `Upgrade
)

and _final_read_operation_for t reqd =
Expand All @@ -245,6 +254,10 @@ and _final_read_operation_for t reqd =
if Reader.is_closed t.reader
then Reader.next t.reader
else `Yield
| Upgraded ->
(* If the input state is not [Upgraded], the output state cannot be
either. *)
assert false
| Complete ->
advance_request_queue t;
_next_read_operation t;
Expand All @@ -255,7 +268,7 @@ let next_read_operation t =
match _next_read_operation t with
| `Error (`Parse _) -> set_error_and_handle t `Bad_request; `Close
| `Error (`Bad_request request) -> set_error_and_handle ~request t `Bad_request; `Close
| (`Read | `Yield | `Close) as operation -> operation
| (`Read | `Yield | `Close | `Upgrade) as operation -> operation

let rec read_with_more t bs ~off ~len more =
let call_handler = Queue.is_empty t.request_queue in
Expand Down Expand Up @@ -294,7 +307,13 @@ let rec _next_write_operation t =
Reqd.flush_response_body reqd;
Writer.next t.writer
| Complete -> _final_write_operation_for t reqd
)
| Upgraded ->
wakeup_reader t;
(* Even in the Upgrade case, we're still responsible for writing the
response header, so we might have work to do. *)
if Writer.has_pending_output t.writer
then Writer.next t.writer
else `Upgrade)

and _final_write_operation_for t reqd =
let next =
Expand All @@ -303,7 +322,9 @@ and _final_write_operation_for t reqd =
Writer.next t.writer;
) else (
match Reqd.input_state reqd with
| Waiting -> `Yield
| Ready -> Writer.next t.writer;
| Upgraded -> `Upgrade
| Complete ->
advance_request_queue t;
_next_write_operation t;
Expand Down
9 changes: 6 additions & 3 deletions lib_test/helpers.ml
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,22 @@ let response_to_string ?body r =
Faraday.serialize_to_string f

module Read_operation = struct
type t = [ `Read | `Yield | `Close ]
type t = [ `Read | `Yield | `Close | `Upgrade ]

let pp_hum fmt (t : t) =
let str =
match t with
| `Read -> "Read"
| `Yield -> "Yield"
| `Close -> "Close"
| `Upgrade -> "Upgrade"
in
Format.pp_print_string fmt str
;;
end

module Write_operation = struct
type t = [ `Write of Bigstringaf.t IOVec.t list | `Yield | `Close of int ]
type t = [ `Write of Bigstringaf.t IOVec.t list | `Yield | `Close of int | `Upgrade ]

let iovecs_to_string iovecs =
let len = IOVec.lengthv iovecs in
Expand All @@ -50,12 +51,13 @@ module Write_operation = struct
| `Write iovecs -> Format.fprintf fmt "Write %S" (iovecs_to_string iovecs)
| `Yield -> Format.pp_print_string fmt "Yield"
| `Close len -> Format.fprintf fmt "Close %i" len
| `Upgrade -> Format.pp_print_string fmt "Upgrade"
;;

let to_write_as_string t =
match t with
| `Write iovecs -> Some (iovecs_to_string iovecs)
| `Close _ | `Yield -> None
| `Close _ | `Yield | `Upgrade -> None
;;
end

Expand All @@ -70,4 +72,5 @@ module Headers = struct
let connection_close = Headers.of_list ["connection", "close"]
let encoding_chunked = Headers.of_list ["transfer-encoding", "chunked"]
let encoding_fixed n = Headers.of_list ["content-length", string_of_int n]
let upgrade protocol = Headers.of_list ["connection", "upgrade" ; "upgrade", protocol]
end
10 changes: 5 additions & 5 deletions lib_test/test_client_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ let read_response t r =

let reader_ready t =
Alcotest.check read_operation "Reader is ready"
`Read (next_read_operation t :> [`Close | `Read | `Yield]);
`Read (next_read_operation t :> Read_operation.t);
;;

let reader_closed t =
Alcotest.check read_operation "Reader is closed"
`Close (next_read_operation t :> [`Close | `Read | `Yield]);
`Close (next_read_operation t :> Read_operation.t);
;;

let write_string ?(msg="output written") t str =
Expand All @@ -64,17 +64,17 @@ let write_request ?(msg="request written") t r =

let writer_yielded t =
Alcotest.check write_operation "Writer is in a yield state"
`Yield (next_write_operation t);
`Yield (next_write_operation t :> Write_operation.t);
;;

let writer_closed t =
Alcotest.check write_operation "Writer is closed"
(`Close 0) (next_write_operation t);
(`Close 0) (next_write_operation t :> Write_operation.t);
;;

let connection_is_shutdown t =
Alcotest.check read_operation "Reader is closed"
`Close (next_read_operation t :> [`Close | `Read | `Yield]);
`Close (next_read_operation t :> Read_operation.t);
writer_closed t;
;;

Expand Down
Loading