Skip to content

Commit

Permalink
Merge pull request #66 from anmonteiro/anmonteiro/wait-for-first-flush
Browse files Browse the repository at this point in the history
Make `wait_for_first_flush` configurable in streaming responses
  • Loading branch information
seliopou authored Nov 25, 2018
2 parents 67d03bd + 661a0f4 commit 4c37916
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 11 deletions.
2 changes: 1 addition & 1 deletion lib/httpaf.mli
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,7 @@ module Reqd : sig

val respond_with_string : _ t -> Response.t -> string -> unit
val respond_with_bigstring : _ t -> Response.t -> Bigstring.t -> unit
val respond_with_streaming : _ t -> Response.t -> [`write] Body.t
val respond_with_streaming : ?flush_headers_immediately:bool -> _ t -> Response.t -> [`write] Body.t

(** Exception Handling *)

Expand Down
15 changes: 6 additions & 9 deletions lib/reqd.ml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type 'handle t =
; mutable persistent : bool
; mutable response_state : 'handle response_state
; mutable error_code : [`Ok | error ]
; wait_for_first_flush : bool
; mutable wait_for_first_flush : bool
}

let default_waiting = Sys.opaque_identity (fun () -> ())
Expand All @@ -90,10 +90,6 @@ let create error_handler request request_body writer response_body_buffer =
; persistent = Request.persistent_connection request
; response_state = Waiting (ref default_waiting)
; error_code = `Ok
(* XXX(seliopou): Make it configurable whether this callback is fired upon
* receiving the response, or after the first flush of the streaming body.
* There's a tradeoff here between time to first byte (latency) and batching
* (throughput). For now, just wait for the first flush. *)
; wait_for_first_flush = true
}

Expand Down Expand Up @@ -151,7 +147,8 @@ let respond_with_bigstring t response (bstr:Bigstring.t) =
| Complete _ ->
failwith "httpaf.Reqd.respond_with_bigstring: response already complete"

let unsafe_respond_with_streaming t response =
let unsafe_respond_with_streaming ~flush_headers_immediately t response =
t.wait_for_first_flush <- not flush_headers_immediately;
match t.response_state with
| Waiting when_done_waiting ->
let response_body = Body.create t.response_body_buffer in
Expand All @@ -167,10 +164,10 @@ let unsafe_respond_with_streaming t response =
| Complete _ ->
failwith "httpaf.Reqd.respond_with_streaming: response already complete"

let respond_with_streaming t response =
let respond_with_streaming ?(flush_headers_immediately=false) t response =
if t.error_code <> `Ok then
failwith "httpaf.Reqd.respond_with_streaming: invalid state, currently handling error";
unsafe_respond_with_streaming t response
unsafe_respond_with_streaming ~flush_headers_immediately t response

let report_error t error =
t.persistent <- false;
Expand All @@ -184,7 +181,7 @@ let report_error t error =
| #Status.standard as status -> status
in
t.error_handler ~request:t.request error (fun headers ->
unsafe_respond_with_streaming t (Response.create ~headers status))
unsafe_respond_with_streaming ~flush_headers_immediately:true t (Response.create ~headers status))
| Waiting _, `Exn _ ->
(* XXX(seliopou): Decide what to do in this unlikely case. There is an
* outstanding call to the [error_handler], but an intervening exception
Expand Down
14 changes: 13 additions & 1 deletion lib_test/test_httpaf_server.ml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ let single_get =
~handler: (basic_handler "")
~input: [(`Request (Request.create `GET "/")), `Empty]
~output: [(`Response (Response.create `OK) ), `Empty]
; "singel GET, close connection"
; "single GET, close connection"
, `Quick
, Simulator.test_server
~handler: (basic_handler "")
Expand Down Expand Up @@ -97,6 +97,18 @@ let streaming_response =
Body.close_writer body))
~input: [ `Request (Request.create `GET "/"), `Empty ]
~output: [ `Response (Response.create `OK), `Fixed ["Hello,"; " world!"] ]
; "streaming headers, flush immediately"
, `Quick
, Simulator.test_server
~handler: (fun reqd ->
Simulator.debug " > handler called";
let request_body = Reqd.request_body reqd in
Body.close_reader request_body;
let body = Reqd.respond_with_streaming ~flush_headers_immediately:true reqd (Response.create `OK) in
Body.flush body (fun () ->
Body.close_writer body))
~input: [ `Request (Request.create ~headers:Headers.(of_list ["connection", "close"]) `GET "/"), `Empty ]
~output: [ `Response (Response.create `OK), `Fixed [] ]
]

;;
Expand Down

0 comments on commit 4c37916

Please sign in to comment.