Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optional thunk #170

Merged
merged 4 commits into from
Apr 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 8 additions & 9 deletions lib/body.ml
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,20 @@ type _ t =
; mutable write_final_if_chunked : bool
; mutable on_eof : unit -> unit
; mutable on_read : Bigstringaf.t -> off:int -> len:int -> unit
; mutable when_ready_to_write : unit -> unit
; mutable when_ready_to_write : Optional_thunk.t
; buffered_bytes : int ref
}

let default_on_eof = Sys.opaque_identity (fun () -> ())
let default_on_read = Sys.opaque_identity (fun _ ~off:_ ~len:_ -> ())
let default_ready_to_write = Sys.opaque_identity (fun () -> ())

let of_faraday faraday =
{ faraday
; read_scheduled = false
; write_final_if_chunked = true
; on_eof = default_on_eof
; on_read = default_on_read
; when_ready_to_write = default_ready_to_write
; when_ready_to_write = Optional_thunk.none
; buffered_bytes = ref 0
}

Expand Down Expand Up @@ -79,8 +78,8 @@ let schedule_bigstring t ?off ?len (b:Bigstringaf.t) =

let ready_to_write t =
let callback = t.when_ready_to_write in
t.when_ready_to_write <- default_ready_to_write;
callback ()
t.when_ready_to_write <- Optional_thunk.none;
Optional_thunk.unchecked_value callback ()

let flush t kontinue =
Faraday.flush t.faraday kontinue;
Expand Down Expand Up @@ -145,11 +144,11 @@ let close_reader t =
;;

let when_ready_to_write t callback =
if not (t.when_ready_to_write == default_ready_to_write)
then failwith "Body.when_ready_to_write: only one callback can be registered at a time"
else if is_closed t
if is_closed t
then callback ()
else t.when_ready_to_write <- callback
else if Optional_thunk.is_some t.when_ready_to_write
then failwith "Body.when_ready_to_write: only one callback can be registered at a time"
else t.when_ready_to_write <- Optional_thunk.some callback

let transfer_to_writer_with_encoding t ~encoding writer =
let faraday = t.faraday in
Expand Down
11 changes: 11 additions & 0 deletions lib/optional_thunk.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
type t = unit -> unit

let none = Sys.opaque_identity (fun () -> ())
let some f =
if f == none
then failwith "Optional_thunk: this function is not representable as a some value";
f

let is_none t = t == none
let is_some t = not (is_none t)
let unchecked_value t = t
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking about this when I was reviewing it and asking myself "should this be raising if t is none?" – but that felt a little burdensome in all of the places we use it safely.

But now I'm thinking about it some more, and I'm wondering: maybe it would make sense if we split this into two functions, kind of like what we talked about before the PR:

(** Return the exposed thunk, raises if called on [none] *)
val value_exn : t -> unit -> unit

(** Invoke the thunk if [some], do nothing if [none] *)
val invoke_if_some : t -> unit

This has two benefits: calling an optional thunk should always be safe, since the default is to just noop by invoking none, and we can use this in all of the current places where we use unchecked_value without is_some. But removing opaqueness is something you have to be a bit more careful about, since presumably you should be able to do Optional_thunk.some (Optional_thunk.value_exn t). And ultimately I rank "descriptive names" below "failing at runtime" and "compiler error" in terms of safety.

9 changes: 9 additions & 0 deletions lib/optional_thunk.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
type t

val none : t
val some : (unit -> unit) -> t

val is_none : t -> bool
val is_some : t -> bool

val unchecked_value : t -> unit -> unit
16 changes: 7 additions & 9 deletions lib/reqd.ml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type error =
[ `Bad_request | `Bad_gateway | `Internal_server_error | `Exn of exn ]

type response_state =
| Waiting of (unit -> unit) ref
| Waiting of Optional_thunk.t ref
| Complete of Response.t
| Streaming of Response.t * [`write] Body.t

Expand Down Expand Up @@ -78,23 +78,21 @@ type t =
; mutable error_code : [`Ok | error ]
}

let default_waiting = Sys.opaque_identity (fun () -> ())

let create error_handler request request_body writer response_body_buffer =
{ request
; request_body
; writer
; response_body_buffer
; error_handler
; persistent = Request.persistent_connection request
; response_state = Waiting (ref default_waiting)
; response_state = Waiting (ref Optional_thunk.none)
; error_code = `Ok
}

let done_waiting when_done_waiting =
let f = !when_done_waiting in
when_done_waiting := default_waiting;
f ()
when_done_waiting := Optional_thunk.none;
Optional_thunk.unchecked_value f ()

let request { request; _ } = request
let request_body { request_body; _ } = request_body
Expand Down Expand Up @@ -213,9 +211,9 @@ let error_code t =
let on_more_output_available t f =
match t.response_state with
| Waiting when_done_waiting ->
if not (!when_done_waiting == default_waiting) then
failwith "httpaf.Reqd.on_more_output_available: only one callback can be registered at a time";
when_done_waiting := f
if Optional_thunk.is_some !when_done_waiting
then failwith "httpaf.Reqd.on_more_output_available: only one callback can be registered at a time";
when_done_waiting := Optional_thunk.some f
| Streaming(_, response_body) ->
Body.when_ready_to_write response_body f
| Complete _ ->
Expand Down
34 changes: 17 additions & 17 deletions lib/server_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,10 @@ type t =
; request_queue : Reqd.t Queue.t
(* invariant: If [request_queue] is not empty, then the head of the queue
has already had [request_handler] called on it. *)
; mutable wakeup_writer : (unit -> unit)
; mutable wakeup_reader : (unit -> unit)
; mutable wakeup_writer : Optional_thunk.t
; mutable wakeup_reader : Optional_thunk.t
}

let default_wakeup = Sys.opaque_identity (fun () -> ())

let is_closed t =
Reader.is_closed t.reader && Writer.is_closed t.writer

Expand All @@ -85,35 +83,37 @@ let current_reqd_exn t =
let yield_reader t k =
if is_closed t
then failwith "on_wakeup_reader on closed conn"
else if not (t.wakeup_reader == default_wakeup);
else if Optional_thunk.is_some t.wakeup_reader
then failwith "yield_reader: only one callback can be registered at a time"
else t.wakeup_reader <- k
else t.wakeup_reader <- Optional_thunk.some k
;;

let wakeup_reader t =
let f = t.wakeup_reader in
t.wakeup_reader <- default_wakeup;
f ()
t.wakeup_reader <- Optional_thunk.none;
Optional_thunk.unchecked_value f ()
;;

let on_wakeup_writer t k =
if is_closed t
then failwith "on_wakeup_writer on closed conn"
else if not (t.wakeup_writer == default_wakeup)
else if Optional_thunk.is_some t.wakeup_writer
then failwith "yield_writer: only one callback can be registered at a time"
else t.wakeup_writer <- k
else t.wakeup_writer <- Optional_thunk.some k
;;

let wakeup_writer t =
let f = t.wakeup_writer in
t.wakeup_writer <- default_wakeup;
f ()
t.wakeup_writer <- Optional_thunk.none;
Optional_thunk.unchecked_value f ()
;;

let transfer_writer_callback t reqd =
let f = t.wakeup_writer in
t.wakeup_writer <- default_wakeup;
Reqd.on_more_output_available reqd f
if Optional_thunk.is_some t.wakeup_writer
then (
let f = t.wakeup_writer in
t.wakeup_writer <- Optional_thunk.none;
Reqd.on_more_output_available reqd (Optional_thunk.unchecked_value f))
;;

let default_error_handler ?request:_ error handle =
Expand Down Expand Up @@ -149,8 +149,8 @@ let create ?(config=Config.default) ?(error_handler=default_error_handler) reque
; request_handler = request_handler
; error_handler = error_handler
; request_queue
; wakeup_writer = default_wakeup
; wakeup_reader = default_wakeup
; wakeup_writer = Optional_thunk.none
; wakeup_reader = Optional_thunk.none
}

let shutdown_reader t =
Expand Down