Skip to content

Commit

Permalink
Abstract writer wakeups behind optional thunk
Browse files Browse the repository at this point in the history
  • Loading branch information
anmonteiro committed Apr 6, 2020
1 parent 4112a48 commit cffee23
Show file tree
Hide file tree
Showing 7 changed files with 129 additions and 51 deletions.
8 changes: 3 additions & 5 deletions lib/body.ml
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,13 @@ type _ t =
; mutable on_eof : unit -> unit
; mutable on_read : Bigstringaf.t -> off:int -> len:int -> unit
; buffered_bytes : int ref
; ready_to_write : unit -> unit
; ready_to_write : Optional_thunk.t
}

let default_on_eof = Sys.opaque_identity (fun () -> ())

let default_on_read = Sys.opaque_identity (fun _ ~off:_ ~len:_ -> ())

let default_ready_to_write = Sys.opaque_identity (fun () -> ())

let create buffer ready_to_write =
{ faraday = Faraday.of_bigstring buffer
; read_scheduled = false
Expand All @@ -61,13 +59,13 @@ let create buffer ready_to_write =
}

let create_empty () =
let t = create Bigstringaf.empty default_ready_to_write in
let t = create Bigstringaf.empty Optional_thunk.none in
Faraday.close t.faraday;
t

let empty = create_empty ()

let ready_to_write t = t.ready_to_write ()
let ready_to_write t = Optional_thunk.call_if_some t.ready_to_write

let write_char t c =
Faraday.write_char t.faraday c;
Expand Down
38 changes: 14 additions & 24 deletions lib/client_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ type t =
; pending_pings : (unit -> unit) Queue.t
; error_handler : error -> unit
; push_handler : Request.t -> (response_handler, unit) result
; wakeup_writer : (unit -> unit) ref
; wakeup_stream : unit -> unit
; mutable wakeup_writer : Optional_thunk.t
(* From RFC7540§4.3:
* Header compression is stateful. One compression context and one
* decompression context are used for the entire connection. *)
Expand All @@ -84,26 +83,18 @@ type t =

let default_push_handler = Sys.opaque_identity (fun _ -> Ok (fun _ _ -> ()))

let is_shutdown t = Reader.is_closed t.reader && Writer.is_closed t.writer

let is_closed t = Reader.is_closed t.reader && Writer.is_closed t.writer

let on_wakeup_writer t k =
if is_shutdown t then
if is_closed t then
failwith "on_wakeup_writer on closed conn"
else
t.wakeup_writer := k

let default_wakeup_writer = Sys.opaque_identity (fun () -> ())

let _wakeup_writer wakeup_ref =
let f = !wakeup_ref in
wakeup_ref := default_wakeup_writer;
f ()

let wakeup_writer t = _wakeup_writer t.wakeup_writer
t.wakeup_writer <- Optional_thunk.some k

let wakeup_stream t () = wakeup_writer (Lazy.force t)
let wakeup_writer t =
let f = t.wakeup_writer in
t.wakeup_writer <- Optional_thunk.none;
Optional_thunk.call_if_some f

let shutdown_reader t = Reader.force_close t.reader

Expand Down Expand Up @@ -189,7 +180,7 @@ let set_error_and_handle t stream error error_code =
wakeup_writer t

let report_exn t exn =
if not (is_shutdown t) then
if not (is_closed t) then
let additional_debug_data = Printexc.to_string exn in
report_connection_error t ~additional_debug_data Error_code.InternalError

Expand Down Expand Up @@ -254,7 +245,7 @@ let handle_push_promise_headers t respd headers =
, { Respd.request
; request_body
; response_handler
; wakeup_writer = t.wakeup_stream
; wakeup_writer = t.wakeup_writer
} )
| Error _ ->
(* From RFC7540§6.6:
Expand Down Expand Up @@ -1210,9 +1201,8 @@ let create ?(config = Config.default) ?push_handler ~error_handler =
; reader = Reader.client_frames connection_preface_handler frame_handler
; writer = Writer.create settings.max_frame_size
; streams = Scheduler.make_root ()
; wakeup_writer = ref default_wakeup_writer
; wakeup_stream =
wakeup_stream t
; wakeup_writer =
Optional_thunk.none
(* From RFC7540§4.3:
* Header compression is stateful. One compression context and one
* decompression context are used for the entire connection. *)
Expand Down Expand Up @@ -1300,7 +1290,7 @@ let create_h2c
* it is completely sent. *)
; request_body = Body.empty
; response_handler
; wakeup_writer = t.wakeup_stream
; wakeup_writer = t.wakeup_writer
} );
wakeup_writer t;
Ok t
Expand All @@ -1311,7 +1301,7 @@ let request t request ~error_handler ~response_handler =
let max_frame_size = t.settings.max_frame_size in
let respd = create_and_add_stream t ~error_handler in
let request_body =
Body.create (Bigstringaf.create max_frame_size) t.wakeup_stream
Body.create (Bigstringaf.create max_frame_size) t.wakeup_writer
in
let frame_info =
Writer.make_frame_info
Expand All @@ -1327,7 +1317,7 @@ let request t request ~error_handler ~response_handler =
, { request
; request_body
; response_handler
; wakeup_writer = t.wakeup_stream
; wakeup_writer = t.wakeup_writer
} ));
wakeup_writer t;
(* Closing the request body puts the stream in the half-closed (local) state.
Expand Down
51 changes: 51 additions & 0 deletions lib/optional_thunk.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
(*----------------------------------------------------------------------------
* Copyright (c) 2020 Inhabited Type LLC.
* Copyright (c) 2020 Antonio N. Monteiro.
*
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* 3. Neither the name of the author nor the names of his contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE CONTRIBUTORS ``AS IS'' AND ANY EXPRESS
* OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE FOR
* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*---------------------------------------------------------------------------*)

type t = unit -> unit

let none = Sys.opaque_identity (fun () -> ())

let some f =
if f == none then
failwith
"Optional_thunk: this function is not representable as a some value";
f

let is_none t = t == none

let is_some t = not (is_none t)

let call_if_some t = t ()

let unchecked_value t = t
47 changes: 47 additions & 0 deletions lib/optional_thunk.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
(*----------------------------------------------------------------------------
* Copyright (c) 2020 Inhabited Type LLC.
* Copyright (c) 2020 Antonio N. Monteiro.
*
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* 3. Neither the name of the author nor the names of his contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE CONTRIBUTORS ``AS IS'' AND ANY EXPRESS
* OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE FOR
* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*---------------------------------------------------------------------------*)

type t

val none : t

val some : (unit -> unit) -> t

val is_none : t -> bool

val is_some : t -> bool

val call_if_some : t -> unit

val unchecked_value : t -> unit -> unit
8 changes: 4 additions & 4 deletions lib/reqd.ml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type active_stream =
* trailer headers are emitted. *)
; mutable trailers_parser : partial_headers option
; mutable trailers : Headers.t option
; wakeup_writer : unit -> unit
; wakeup_writer : Optional_thunk.t
; create_push_stream :
unit -> (t, [ `Push_disabled | `Stream_ids_exhausted ]) result
}
Expand Down Expand Up @@ -191,7 +191,7 @@ let send_fixed_response t s response data =
s.response_state <- Fixed { response; iovec }
else
s.response_state <- Complete response;
s.wakeup_writer ()
Optional_thunk.call_if_some s.wakeup_writer
| Streaming _ ->
failwith "h2.Reqd.respond_with_*: response already started"
| Fixed _ | Complete _ ->
Expand Down Expand Up @@ -238,7 +238,7 @@ let send_streaming_response ~flush_headers_immediately t s response =
Writer.write_response_headers t.writer s.encoder frame_info response;
if s.wait_for_first_flush then Writer.yield t.writer;
s.response_state <- Streaming (response, response_body);
s.wakeup_writer ();
Optional_thunk.call_if_some s.wakeup_writer;
response_body
| Streaming _ ->
failwith "h2.Reqd.respond_with_streaming: response already started"
Expand Down Expand Up @@ -305,7 +305,7 @@ let start_push_stream t s request =
* might immediately call one of the `respond_with` functions and expect
* the stream to be in the `Reserved` state. *)
promised_reqd.state <- Reserved (request_info, active_stream);
wakeup_writer ();
Optional_thunk.call_if_some wakeup_writer;
Ok promised_reqd
| Error e ->
Error (e :> [ `Push_disabled | `Stream_cant_push | `Stream_ids_exhausted ])
Expand Down
2 changes: 1 addition & 1 deletion lib/respd.ml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type active_request =
{ request : Request.t
; request_body : [ `read ] Body.t
; response_handler : response_handler
; wakeup_writer : unit -> unit
; wakeup_writer : Optional_thunk.t
}

type active_state =
Expand Down
26 changes: 9 additions & 17 deletions lib/server_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,7 @@ type t =
* we haven't eceived an acknowledgment from the client. *)
; mutable unacked_settings : int
; mutable did_send_go_away : bool
; wakeup_writer : (unit -> unit) ref
; wakeup_stream : unit -> unit
; mutable wakeup_writer : Optional_thunk.t
(* From RFC7540§4.3:
* Header compression is stateful. One compression context and one
* decompression context are used for the entire connection. *)
Expand All @@ -90,16 +89,12 @@ let on_wakeup_writer t k =
if is_closed t then
failwith "on_wakeup_writer on closed conn"
else
t.wakeup_writer := k
t.wakeup_writer <- Optional_thunk.some k

let default_wakeup_writer = Sys.opaque_identity (fun () -> ())

let _wakeup_writer wakeup_ref =
let f = !wakeup_ref in
wakeup_ref := default_wakeup_writer;
f ()

let wakeup_writer t = _wakeup_writer t.wakeup_writer
let wakeup_writer t =
let f = t.wakeup_writer in
t.wakeup_writer <- Optional_thunk.none;
Optional_thunk.call_if_some f

let shutdown_reader t = Reader.force_close t.reader

Expand Down Expand Up @@ -443,7 +438,7 @@ let process_first_headers_block t frame_header reqd headers_block =
Reqd.create_active_stream
t.hpack_encoder
t.config.response_body_buffer_size
t.wakeup_stream
t.wakeup_writer
(create_push_stream t)
in
reqd.Stream.state <-
Expand Down Expand Up @@ -1101,8 +1096,6 @@ let settings_from_config
; enable_push = enable_server_push
}

let wakeup_stream t () = wakeup_writer (Lazy.force t)

let write_connection_preface t =
(* Check if the settings for the connection are different than the default
* HTTP/2 settings. In the event that they are, we need to send a non-empty
Expand Down Expand Up @@ -1215,8 +1208,7 @@ let create_generic ~h2c ~config ~error_handler request_handler =
; receiving_headers_for_stream = None
; did_send_go_away = false
; unacked_settings = 0
; wakeup_writer = ref default_wakeup_writer
; wakeup_stream = wakeup_stream t
; wakeup_writer = Optional_thunk.none
; hpack_encoder = Hpack.Encoder.(create settings.header_table_size)
; hpack_decoder = Hpack.Decoder.(create settings.header_table_size)
}
Expand All @@ -1242,7 +1234,7 @@ let handle_h2c_request t headers request_body_iovecs =
Reqd.create_active_stream
t.hpack_encoder
t.config.response_body_buffer_size
(fun () -> wakeup_writer t)
t.wakeup_writer
(create_push_stream t)
in
t.max_client_stream_id <- reqd.Stream.id;
Expand Down

0 comments on commit cffee23

Please sign in to comment.