Make wait_for_first_flush configurable in streaming responses
This is related to #65. I managed to get websockets working by using
Conduit to gain access to the file descriptors. However, because
streaming responses wait for the first flush of the body before writing
the response, I could never deliver the connection upgrade with 1. an
empty response body and 2. the writer left open.

Making `wait_for_first_flush` configurable (which I see was already in
the roadmap) solves this problem.

I've tested this in my code and it's working really well now. Please
let me know if you had thought of another way of configuring this
behavior.
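
For reference, a rough sketch of how a handler can use the new argument
for the upgrade case above. `upgrade_handler` and `take_over_connection`
are just illustrative names (they are not part of httpaf); the latter
stands for whatever mechanism (e.g. Conduit, as mentioned above) hands
the raw connection to the websocket code once the 101 has been written:

    let upgrade_handler ~take_over_connection reqd =
      let headers =
        Httpaf.Headers.of_list [ "Connection", "upgrade"; "Upgrade", "websocket" ]
      in
      let response = Httpaf.Response.create ~headers `Switching_protocols in
      (* With ~wait_for_first_flush:false the response is written right away,
         even though the body stays empty and the writer is never closed. *)
      let (_ : [`write] Httpaf.Body.t) =
        Httpaf.Reqd.respond_with_streaming ~wait_for_first_flush:false reqd response
      in
      take_over_connection ()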
Antonio Nuno Monteiro committed Aug 10, 2018
1 parent 743f19d commit a3a6897
Showing 2 changed files with 7 additions and 10 deletions.
2 changes: 1 addition & 1 deletion lib/httpaf.mli
@@ -681,7 +681,7 @@ module Reqd : sig

val respond_with_string : _ t -> Response.t -> string -> unit
val respond_with_bigstring : _ t -> Response.t -> Bigstring.t -> unit
-val respond_with_streaming : _ t -> Response.t -> [`write] Body.t
+val respond_with_streaming : ?wait_for_first_flush:bool -> _ t -> Response.t -> [`write] Body.t

(** Exception Handling *)

15 changes: 6 additions & 9 deletions lib/reqd.ml
@@ -76,7 +76,7 @@ type 'handle t =
; mutable persistent : bool
; mutable response_state : 'handle response_state
; mutable error_code : [`Ok | error ]
-; wait_for_first_flush : bool
+; mutable wait_for_first_flush : bool
}

let default_waiting = Sys.opaque_identity (fun () -> ())
@@ -90,10 +90,6 @@ let create error_handler request request_body writer response_body_buffer =
; persistent = Request.persistent_connection request
; response_state = Waiting (ref default_waiting)
; error_code = `Ok
-(* XXX(seliopou): Make it configurable whether this callback is fired upon
- * receiving the response, or after the first flush of the streaming body.
- * There's a tradeoff here between time to first byte (latency) and batching
- * (throughput). For now, just wait for the first flush. *)
; wait_for_first_flush = true
}

@@ -151,7 +147,8 @@ let respond_with_bigstring t response (bstr:Bigstring.t) =
| Complete _ ->
failwith "httpaf.Reqd.respond_with_bigstring: response already complete"

-let unsafe_respond_with_streaming t response =
+let unsafe_respond_with_streaming ~wait_for_first_flush t response =
+  t.wait_for_first_flush <- wait_for_first_flush;
match t.response_state with
| Waiting when_done_waiting ->
let response_body = Body.create t.response_body_buffer in
@@ -167,10 +164,10 @@ let unsafe_respond_with_streaming t response =
| Complete _ ->
failwith "httpaf.Reqd.respond_with_streaming: response already complete"

-let respond_with_streaming t response =
+let respond_with_streaming ?(wait_for_first_flush=true) t response =
if t.error_code <> `Ok then
failwith "httpaf.Reqd.respond_with_streaming: invalid state, currently handling error";
-unsafe_respond_with_streaming t response
+unsafe_respond_with_streaming ~wait_for_first_flush t response

let report_error t error =
t.persistent <- false;
@@ -184,7 +181,7 @@ let report_error t error =
| #Status.standard as status -> status
in
t.error_handler ~request:t.request error (fun headers ->
-unsafe_respond_with_streaming t (Response.create ~headers status))
+unsafe_respond_with_streaming ~wait_for_first_flush:false t (Response.create ~headers status))
| Waiting _, `Exn _ ->
(* XXX(seliopou): Decide what to do in this unlikely case. There is an
* outstanding call to the [error_handler], but an intervening exception
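
For comparison, omitting the new argument keeps the previous behavior: the
default is true, so the response still isn't written until the body is
flushed for the first time. A minimal sketch, assuming the Body.write_string
and Body.close_writer functions from httpaf's public API:

    let streaming_handler reqd =
      let headers = Httpaf.Headers.of_list [ "transfer-encoding", "chunked" ] in
      let response = Httpaf.Response.create ~headers `OK in
      (* No ?wait_for_first_flush here, so the default (true) applies and the
         response goes out together with the first flush of the body. *)
      let body = Httpaf.Reqd.respond_with_streaming reqd response in
      Httpaf.Body.write_string body "hello, world";
      Httpaf.Body.close_writer body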
