Skip to content

Commit

Permalink
Merge pull request #49 from anmonteiro/anmonteiro/merge-upstream-2020…
Browse files Browse the repository at this point in the history
…-04-05

Merge upstream changes as of 2020-04-05.

Useful to get the alternative upstream fix related to Optional_thunk
  • Loading branch information
anmonteiro authored Apr 5, 2020
2 parents d1bcda9 + 51d30e6 commit c4dcd6b
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 45 deletions.
17 changes: 8 additions & 9 deletions lib/body.ml
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,20 @@ type _ t =
; mutable write_final_if_chunked : bool
; mutable on_eof : unit -> unit
; mutable on_read : Bigstringaf.t -> off:int -> len:int -> unit
; mutable when_ready_to_write : unit -> unit
; mutable when_ready_to_write : Optional_thunk.t
; buffered_bytes : int ref
}

let default_on_eof = Sys.opaque_identity (fun () -> ())
let default_on_read = Sys.opaque_identity (fun _ ~off:_ ~len:_ -> ())
let default_ready_to_write = Sys.opaque_identity (fun () -> ())

let of_faraday faraday =
{ faraday
; read_scheduled = false
; write_final_if_chunked = true
; on_eof = default_on_eof
; on_read = default_on_read
; when_ready_to_write = default_ready_to_write
; when_ready_to_write = Optional_thunk.none
; buffered_bytes = ref 0
}

Expand Down Expand Up @@ -83,8 +82,8 @@ let schedule_bigstring t ?off ?len (b:Bigstringaf.t) =

let ready_to_write t =
let callback = t.when_ready_to_write in
t.when_ready_to_write <- default_ready_to_write;
callback ()
t.when_ready_to_write <- Optional_thunk.none;
Optional_thunk.call_if_some callback

let flush t kontinue =
Faraday.flush t.faraday kontinue;
Expand Down Expand Up @@ -149,11 +148,11 @@ let close_reader t =
;;

let when_ready_to_write t callback =
if not (t.when_ready_to_write == default_ready_to_write)
then failwith "Body.when_ready_to_write: only one callback can be registered at a time"
else if is_closed t
if is_closed t
then callback ()
else t.when_ready_to_write <- callback
else if Optional_thunk.is_some t.when_ready_to_write
then failwith "Body.when_ready_to_write: only one callback can be registered at a time"
else t.when_ready_to_write <- Optional_thunk.some callback

let transfer_to_writer_with_encoding t ~encoding writer =
let faraday = t.faraday in
Expand Down
6 changes: 3 additions & 3 deletions lib/headers.ml
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,9 @@ let replace t name value =
if CI.equal needle name
then (
if seen
then loop t name nv true
else nv::loop t name nv true)
else nv'::loop t name nv seen
then loop t needle nv true
else nv::loop t needle nv true)
else nv'::loop t needle nv seen
in
try loop t name (name,value) false
with Local -> t
Expand Down
12 changes: 12 additions & 0 deletions lib/optional_thunk.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
type t = unit -> unit

let none = Sys.opaque_identity (fun () -> ())
let some f =
if f == none
then failwith "Optional_thunk: this function is not representable as a some value";
f

let is_none t = t == none
let is_some t = not (is_none t)
let call_if_some t = t ()
let unchecked_value t = t
10 changes: 10 additions & 0 deletions lib/optional_thunk.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
type t

val none : t
val some : (unit -> unit) -> t

val is_none : t -> bool
val is_some : t -> bool

val call_if_some : t -> unit
val unchecked_value : t -> unit -> unit
16 changes: 7 additions & 9 deletions lib/reqd.ml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type error =
[ `Bad_request | `Bad_gateway | `Internal_server_error | `Exn of exn ]

type ('handle, 'io) response_state =
| Waiting of (unit -> unit) ref
| Waiting of Optional_thunk.t ref
| Complete of Response.t
| Streaming of Response.t * [`write] Body.t
| Upgrade of Response.t * ('handle -> 'io)
Expand Down Expand Up @@ -79,23 +79,21 @@ type ('handle, 'io) t =
; mutable error_code : [`Ok | error ]
}

let default_waiting = Sys.opaque_identity (fun () -> ())

let create error_handler request request_body writer response_body_buffer =
{ request
; request_body
; writer
; response_body_buffer
; error_handler
; persistent = Request.persistent_connection request
; response_state = Waiting (ref default_waiting)
; response_state = Waiting (ref Optional_thunk.none)
; error_code = `Ok
}

let done_waiting when_done_waiting =
let f = !when_done_waiting in
when_done_waiting := default_waiting;
f ()
when_done_waiting := Optional_thunk.none;
Optional_thunk.call_if_some f

let request { request; _ } = request
let request_body { request_body; _ } = request_body
Expand Down Expand Up @@ -242,9 +240,9 @@ let error_code t =
let on_more_output_available t f =
match t.response_state with
| Waiting when_done_waiting ->
if not (!when_done_waiting == default_waiting) then
failwith "httpaf.Reqd.on_more_output_available: only one callback can be registered at a time";
when_done_waiting := f
if Optional_thunk.is_some !when_done_waiting
then failwith "httpaf.Reqd.on_more_output_available: only one callback can be registered at a time";
when_done_waiting := Optional_thunk.some f
| Streaming(_, response_body) ->
Body.when_ready_to_write response_body f
| Complete _ ->
Expand Down
40 changes: 16 additions & 24 deletions lib/server_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,14 @@ type ('fd, 'io) t =
; request_queue : ('fd, 'io) Reqd.t Queue.t
(* invariant: If [request_queue] is not empty, then the head of the queue
has already had [request_handler] called on it. *)
; mutable wakeup_writer : (unit -> unit)
; mutable wakeup_reader : (unit -> unit)
; mutable wakeup_writer : Optional_thunk.t
; mutable wakeup_reader : Optional_thunk.t
(* Represents an unrecoverable error that will cause the connection to
* shutdown. Holds on to the response body created by the error handler
* that might be streaming to the client. *)
; mutable error_code : [`Ok | `Error of [`write] Body.t ]
}

let default_wakeup = Sys.opaque_identity (fun () -> ())

let is_closed t =
Reader.is_closed t.reader && Writer.is_closed t.writer

Expand All @@ -78,43 +76,37 @@ let current_reqd_exn t =
let yield_reader t k =
if is_closed t
then failwith "on_wakeup_reader on closed conn"
else if not (t.wakeup_reader == default_wakeup);
else if Optional_thunk.is_some t.wakeup_reader
then failwith "yield_reader: only one callback can be registered at a time"
else t.wakeup_reader <- k
else t.wakeup_reader <- Optional_thunk.some k
;;

let wakeup_reader t =
let f = t.wakeup_reader in
t.wakeup_reader <- default_wakeup;
f ()
t.wakeup_reader <- Optional_thunk.none;
Optional_thunk.call_if_some f
;;

let on_wakeup_writer t k =
if is_closed t
then failwith "on_wakeup_writer on closed conn"
else if not (t.wakeup_writer == default_wakeup)
else if Optional_thunk.is_some t.wakeup_writer
then failwith "yield_writer: only one callback can be registered at a time"
else t.wakeup_writer <- k
else t.wakeup_writer <- Optional_thunk.some k
;;

let wakeup_writer t =
let f = t.wakeup_writer in
t.wakeup_writer <- default_wakeup;
f ()
t.wakeup_writer <- Optional_thunk.none;
Optional_thunk.call_if_some f
;;

let transfer_writer_callback t reqd =
(* Note: it's important that we don't call `Reqd.on_more_output_available` if
* no `wakeup_writer` callback has been registered. In the current
* implementation, `Reqd` performs its own bookkeeping by performing its own
* physical equality check. If `Server_connection` registers its dummy
* callback, `Reqd`'s physical equality check with _its own_ dummy callback
* will fail and cause weird bugs. *)
if not (t.wakeup_writer == default_wakeup) then begin
if Optional_thunk.is_some t.wakeup_writer
then (
let f = t.wakeup_writer in
t.wakeup_writer <- default_wakeup;
Reqd.on_more_output_available reqd f
end
t.wakeup_writer <- Optional_thunk.none;
Reqd.on_more_output_available reqd (Optional_thunk.unchecked_value f))
;;

let default_error_handler ?request:_ error handle =
Expand Down Expand Up @@ -150,8 +142,8 @@ let create ?(config=Config.default) ?(error_handler=default_error_handler) reque
; request_handler = request_handler
; error_handler = error_handler
; request_queue
; wakeup_writer = default_wakeup
; wakeup_reader = default_wakeup
; wakeup_writer = Optional_thunk.none
; wakeup_reader = Optional_thunk.none
; error_code = `Ok
}

Expand Down
7 changes: 7 additions & 0 deletions lib_test/test_httpaf.ml
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,13 @@ module Headers = struct
"a"
"d");

check "replace middle element"
~expect:["e", "f"; "c", "z"; "a", "b"]
(Headers.replace
(Headers.of_list ["e", "f"; "c", "d"; "a", "b"])
"c"
"z");

check "remove multiple trailing elements"
~expect:["c", "d"; "a", "d"]
(Headers.replace
Expand Down

0 comments on commit c4dcd6b

Please sign in to comment.