Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make wait_for_first_flush configurable in streaming responses #66

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/httpaf.mli
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,7 @@ module Reqd : sig

val respond_with_string : _ t -> Response.t -> string -> unit
val respond_with_bigstring : _ t -> Response.t -> Bigstring.t -> unit
val respond_with_streaming : _ t -> Response.t -> [`write] Body.t
val respond_with_streaming : ?flush_headers_immediately:bool -> _ t -> Response.t -> [`write] Body.t

(** Exception Handling *)

Expand Down
15 changes: 6 additions & 9 deletions lib/reqd.ml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type 'handle t =
; mutable persistent : bool
; mutable response_state : 'handle response_state
; mutable error_code : [`Ok | error ]
; wait_for_first_flush : bool
; mutable wait_for_first_flush : bool
}

let default_waiting = Sys.opaque_identity (fun () -> ())
Expand All @@ -90,10 +90,6 @@ let create error_handler request request_body writer response_body_buffer =
; persistent = Request.persistent_connection request
; response_state = Waiting (ref default_waiting)
; error_code = `Ok
(* XXX(seliopou): Make it configurable whether this callback is fired upon
* receiving the response, or after the first flush of the streaming body.
* There's a tradeoff here between time to first byte (latency) and batching
* (throughput). For now, just wait for the first flush. *)
; wait_for_first_flush = true
}

Expand Down Expand Up @@ -151,7 +147,8 @@ let respond_with_bigstring t response (bstr:Bigstring.t) =
| Complete _ ->
failwith "httpaf.Reqd.respond_with_bigstring: response already complete"

let unsafe_respond_with_streaming t response =
let unsafe_respond_with_streaming ~flush_headers_immediately t response =
t.wait_for_first_flush <- not flush_headers_immediately;
match t.response_state with
| Waiting when_done_waiting ->
let response_body = Body.create t.response_body_buffer in
Expand All @@ -167,10 +164,10 @@ let unsafe_respond_with_streaming t response =
| Complete _ ->
failwith "httpaf.Reqd.respond_with_streaming: response already complete"

let respond_with_streaming t response =
let respond_with_streaming ?(flush_headers_immediately=false) t response =
if t.error_code <> `Ok then
failwith "httpaf.Reqd.respond_with_streaming: invalid state, currently handling error";
unsafe_respond_with_streaming t response
unsafe_respond_with_streaming ~flush_headers_immediately t response

let report_error t error =
t.persistent <- false;
Expand All @@ -184,7 +181,7 @@ let report_error t error =
| #Status.standard as status -> status
in
t.error_handler ~request:t.request error (fun headers ->
unsafe_respond_with_streaming t (Response.create ~headers status))
unsafe_respond_with_streaming ~flush_headers_immediately:true t (Response.create ~headers status))
| Waiting _, `Exn _ ->
(* XXX(seliopou): Decide what to do in this unlikely case. There is an
* outstanding call to the [error_handler], but an intervening exception
Expand Down
14 changes: 13 additions & 1 deletion lib_test/test_httpaf_server.ml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ let single_get =
~handler: (basic_handler "")
~input: [(`Request (Request.create `GET "/")), `Empty]
~output: [(`Response (Response.create `OK) ), `Empty]
; "singel GET, close connection"
; "single GET, close connection"
, `Quick
, Simulator.test_server
~handler: (basic_handler "")
Expand Down Expand Up @@ -97,6 +97,18 @@ let streaming_response =
Body.close_writer body))
~input: [ `Request (Request.create `GET "/"), `Empty ]
~output: [ `Response (Response.create `OK), `Fixed ["Hello,"; " world!"] ]
; "streaming headers, flush immediately"
, `Quick
, Simulator.test_server
~handler: (fun reqd ->
Simulator.debug " > handler called";
let request_body = Reqd.request_body reqd in
Body.close_reader request_body;
let body = Reqd.respond_with_streaming ~flush_headers_immediately:true reqd (Response.create `OK) in
Body.flush body (fun () ->
Body.close_writer body))
~input: [ `Request (Request.create ~headers:Headers.(of_list ["connection", "close"]) `GET "/"), `Empty ]
~output: [ `Response (Response.create `OK), `Fixed [] ]
]

;;
Expand Down