From 61164bc30cb9ff05d664c77b8b95837f59f23150 Mon Sep 17 00:00:00 2001 From: Spiros Eliopoulos Date: Thu, 17 Oct 2019 22:30:48 -0400 Subject: [PATCH] wakeup-cleanup: only allow one wakeup calblack per I/O operation --- lib/server_connection.ml | 62 ++++++++++++++++++++-------------------- 1 file changed, 31 insertions(+), 31 deletions(-) diff --git a/lib/server_connection.ml b/lib/server_connection.ml index b662034b..fbaaa8b7 100644 --- a/lib/server_connection.ml +++ b/lib/server_connection.ml @@ -64,10 +64,12 @@ type t = ; request_queue : Reqd.t Queue.t (* invariant: If [request_queue] is not empty, then the head of the queue has already had [request_handler] called on it. *) - ; wakeup_writer : (unit -> unit) list ref - ; wakeup_reader : (unit -> unit) list ref + ; mutable wakeup_writer : (unit -> unit) + ; mutable wakeup_reader : (unit -> unit) } +let default_wakeup = Sys.opaque_identity (fun () -> ()) + let is_closed t = Reader.is_closed t.reader && Writer.is_closed t.writer @@ -80,37 +82,39 @@ let is_active t = let current_reqd_exn t = Queue.peek_exn t.request_queue -let on_wakeup_reader t k = +let yield_reader t k = if is_closed t then failwith "on_wakeup_reader on closed conn" - else t.wakeup_reader := k::!(t.wakeup_reader) + else if not (t.wakeup_reader == default_wakeup); + then failwith "yield_reader: only one callback can be registered at a time" + else t.wakeup_reader <- k +;; + +let wakeup_reader t = + let f = t.wakeup_reader in + t.wakeup_reader <- default_wakeup; + f () +;; let on_wakeup_writer t k = if is_closed t then failwith "on_wakeup_writer on closed conn" - else t.wakeup_writer := k::!(t.wakeup_writer) + else if not (t.wakeup_writer == default_wakeup) + then failwith "yield_writer: only one callback can be registered at a time" + else t.wakeup_writer <- k +;; let wakeup_writer t = - let fs = !(t.wakeup_writer) in - t.wakeup_writer := []; - List.iter (fun f -> f ()) fs - -let rec _transfer_writer_callbacks fs reqd = - match fs with - | [] -> () - | f :: fs -> - Reqd.on_more_output_available reqd f; - _transfer_writer_callbacks fs reqd - -let transfer_writer_callbacks t reqd = - let fs = !(t.wakeup_writer) in - t.wakeup_writer := []; - _transfer_writer_callbacks fs reqd + let f = t.wakeup_writer in + t.wakeup_writer <- default_wakeup; + f () +;; -let wakeup_reader t = - let fs = !(t.wakeup_reader) in - t.wakeup_reader := []; - List.iter (fun f -> f ()) fs +let transfer_writer_callback t reqd = + let f = t.wakeup_writer in + t.wakeup_writer <- default_wakeup; + Reqd.on_more_output_available reqd f +;; let default_error_handler ?request:_ error handle = let message = @@ -145,8 +149,8 @@ let create ?(config=Config.default) ?(error_handler=default_error_handler) reque ; request_handler = request_handler ; error_handler = error_handler ; request_queue - ; wakeup_writer = ref [] - ; wakeup_reader = ref [] + ; wakeup_writer = default_wakeup + ; wakeup_reader = default_wakeup } let shutdown_reader t = @@ -244,7 +248,7 @@ let read_with_more t bs ~off ~len more = let reqd = current_reqd_exn t in if call_handler then ( - transfer_writer_callbacks t reqd; + transfer_writer_callback t reqd; t.request_handler reqd ); Reqd.flush_request_body reqd; @@ -258,10 +262,6 @@ let read t bs ~off ~len = let read_eof t bs ~off ~len = read_with_more t bs ~off ~len Complete -let yield_reader t k = - on_wakeup_reader t k -;; - let flush_response_body t = if is_active t then let reqd = current_reqd_exn t in