From bb9ca8a465807866c6fdb9e6fd162cdc92fd1fd5 Mon Sep 17 00:00:00 2001 From: Patrick Ferris Date: Mon, 13 Mar 2023 21:26:09 +0000 Subject: [PATCH] Simplify and document scheduler --- lib_eio_js/browser/dune | 4 - lib_eio_js/browser/eio_browser.ml | 146 +++++++++++++++++++----------- lib_eio_js/browser/runtime.js | 27 ------ lib_eio_js/browser/stubs.c | 5 - lib_eio_js/browser/test/test.ml | 18 +++- 5 files changed, 112 insertions(+), 88 deletions(-) delete mode 100644 lib_eio_js/browser/runtime.js delete mode 100644 lib_eio_js/browser/stubs.c diff --git a/lib_eio_js/browser/dune b/lib_eio_js/browser/dune index 2f1bcd0cd..efe3a256b 100644 --- a/lib_eio_js/browser/dune +++ b/lib_eio_js/browser/dune @@ -1,8 +1,4 @@ (library (name eio_browser) (public_name eio_browser) - (foreign_stubs - (language c) - (names stubs)) - (js_of_ocaml (javascript_files runtime.js)) (libraries eio brr)) \ No newline at end of file diff --git a/lib_eio_js/browser/eio_browser.ml b/lib_eio_js/browser/eio_browser.ml index b31dd93a3..e2e35f3f6 100644 --- a/lib_eio_js/browser/eio_browser.ml +++ b/lib_eio_js/browser/eio_browser.ml @@ -47,10 +47,12 @@ end module Ctf = Eio.Private.Ctf +type suspend = Suspend + module Suspended = struct type 'a t = { fiber : Eio.Private.Fiber_context.t; - k : ('a, unit) Effect.Deep.continuation; + k : ('a, suspend) Effect.Deep.continuation; } let tid t = Eio.Private.Fiber_context.tid t.fiber @@ -64,59 +66,89 @@ module Suspended = struct Effect.Deep.discontinue t.k ex end -(* Resume the next runnable fiber, if any. *) -let rec wakeup run_q = - match Run_queue.pop run_q with - | Some f -> - f (); - wakeup run_q - | None -> () - -(* The Javascript backend scheduler is implemented as an event listener. - We don't need to worry about multiple domains. Here any time something - asynchronously enqueues a task to our queue, it also sends a wakeup event to - the event listener which will run the callback calling the scheduler. *) -module Scheduler = struct - type t = { - run_q : (unit -> unit) Run_queue.t; - mutable idle_callback : Jv.t option; - } +(* Scheduler description: - let v run_q = - let idle_callback = None in - { run_q; idle_callback } + The Javascript, browser scheduler relies on yielding to the Javascript engine + to allow IO tasks to complete. There are a few different corner cases to be aware + of: + + - The [IO] job is used to fix the "busy yielding" problem. + - Calls to [scheduled_wakeup] are coalesced by checking if the timer has already + been set or not. If the timer callback is fired, the timer id is removed. + + When do we need to "schedule a wakeup"? From the code this happens in two places, + whenever we hit the [IO] job or whenever we run one of the [enqueue_] functions + which are called in callbacks that are trying to enqueue a continuation to resume. - external _request_idle_callback : Jv.t -> Jv.t = "requestIdleCallbackShim" - external _cancel_idle_callback : Jv.t -> unit = "cancelIdleCallbackShim" + [IO] case: this is necessary in the "busy yielding" example. - let request_idle_callback cb = - _request_idle_callback (Jv.callback ~arity:1 (fun _ -> cb ())) + [enqueue_] case: this is necessary for IO. Consider calling [Eio_browser.Timeout.sleep ~ms:200]. + This wants to register the continuation to be run whenever the callback passed to [set_timeout] + is run. The path through our scheduler would be: - let wakeup t = - (* No need to schedule a wakeup if the idle_callback is already set. *) - if Option.is_some t.idle_callback then () else begin - let idle_callback = request_idle_callback (fun () -> t.idle_callback <- None; wakeup t.run_q) in - t.idle_callback <- Some idle_callback - end + - The initial IO job is put in the run queue. + - The timeout function is called. + - We are then idle allowing the IO job's wakeup to run, calling the scheduler. + - The run queue is empty so we hit [None] and are idle again. + - The timeout callback function is called enqueuing the continuation but no wakeup + is currently set to actually run the scheduler, so we call wakeup. +*) +module Scheduler = struct + type job = + | Runnable of (unit -> suspend) + | IO + + type t = { + run_q : job Run_queue.t; + mutable io_queued : bool; + mutable scheduled_wakeup : int option; + } - let stop t = - Option.iter _cancel_idle_callback t.idle_callback; - t.idle_callback <- None + let rec next t = match Run_queue.pop t.run_q with + | Some (Runnable fn) -> + if not t.io_queued then begin + Run_queue.push t.run_q IO; + t.io_queued <- true + end; + fn () + | Some IO -> + t.io_queued <- false; + schedule_wakeup t; + Suspend + | None -> Suspend + + and schedule_wakeup t : unit = match t.scheduled_wakeup with + | Some _ -> () + | None -> + (* Calling a timeout with 0 runs the callback "'immediately', + or more accurately, the next event cycle.". Note this will only + work on active tabs, meaning the scheduler will somewhat freeze if your + tab becomes inactive. + + https://developer.mozilla.org/en-US/docs/Web/API/setTimeout *) + let timeout = G.set_timeout ~ms:0 (fun () -> + t.scheduled_wakeup <- None; + let Suspend = next t in () + ) in + t.scheduled_wakeup <- Some timeout + + let v run_q = + { run_q; io_queued = false; scheduled_wakeup = None } let enqueue_thread t k v = - Run_queue.push t.run_q (fun () -> Suspended.continue k v); - wakeup t + Run_queue.push t.run_q (Runnable (fun () -> Suspended.continue k v)); + schedule_wakeup t let enqueue_failed_thread t k v = - Run_queue.push t.run_q (fun () -> Suspended.discontinue k v); - wakeup t + Run_queue.push t.run_q (Runnable (fun () -> Suspended.discontinue k v)); + schedule_wakeup t let enqueue_at_head t k v = - Run_queue.push_head t.run_q (fun () -> Suspended.continue k v); - wakeup t + Run_queue.push_head t.run_q (Runnable (fun () -> Suspended.continue k v)); + schedule_wakeup t end -type _ Effect.t += Enter_unchecked : (Scheduler.t -> 'a Suspended.t -> unit) -> 'a Effect.t +type _ Effect.t += Enter_unchecked : (Scheduler.t -> 'a Suspended.t -> suspend) -> 'a Effect.t let enter_unchecked fn = Effect.perform (Enter_unchecked fn) module Timeout = struct @@ -126,7 +158,11 @@ module Timeout = struct Fiber_context.clear_cancel_fn k.fiber; Scheduler.enqueue_thread st k () ) in - Fiber_context.set_cancel_fn k.fiber (fun exn -> G.stop_timer id; Scheduler.enqueue_failed_thread st k exn); + Fiber_context.set_cancel_fn k.fiber (fun exn -> + G.stop_timer id; + Scheduler.enqueue_failed_thread st k exn + ); + Scheduler.next st end let await fut = @@ -140,7 +176,8 @@ let await fut = Fiber_context.clear_cancel_fn k.fiber; Scheduler.enqueue_thread st k v end - ) + ); + Scheduler.next st let next_event : 'a Brr.Ev.type' -> Brr.Ev.target -> 'a Brr.Ev.t = fun typ target -> let opts = Brr.Ev.listen_opts ~once:true () in @@ -150,28 +187,35 @@ let next_event : 'a Brr.Ev.type' -> Brr.Ev.target -> 'a Brr.Ev.t = fun typ targe will be called and so enqueue_thread will never be called even if another event arrives. *) let v = listen (fun v -> Fiber_context.clear_cancel_fn k.fiber; Scheduler.enqueue_thread st k v) in - Fiber_context.set_cancel_fn k.fiber (fun exn -> Ev.unlisten v; Scheduler.enqueue_failed_thread st k exn) + Fiber_context.set_cancel_fn k.fiber (fun exn -> Ev.unlisten v; Scheduler.enqueue_failed_thread st k exn); + Scheduler.next st (* Largely based on the Eio_mock.Backend event loop. *) let run main = let run_q = Run_queue.create () in let scheduler = Scheduler.v run_q in + Run_queue.push run_q IO; let rec fork ~new_fiber:fiber fn = Effect.Deep.match_with fn () - { retc = (fun () -> Fiber_context.destroy fiber); + { retc = (fun () -> Fiber_context.destroy fiber; Scheduler.next scheduler); exnc = (fun ex -> let bt = Printexc.get_raw_backtrace () in Fiber_context.destroy fiber; Printexc.raise_with_backtrace ex bt ); - effc = fun (type a) (e : a Effect.t) : ((a, unit) Effect.Deep.continuation -> unit) option -> + effc = fun (type a) (e : a Effect.t) : ((a, suspend) Effect.Deep.continuation -> suspend) option -> match e with | Eio.Private.Effects.Suspend f -> Some (fun k -> let k = { Suspended.k; fiber } in f fiber (function - | Ok v -> Scheduler.enqueue_thread scheduler k v - | Error ex -> Scheduler.enqueue_failed_thread scheduler k ex - ) + | Ok v -> ( + Scheduler.enqueue_thread scheduler k v; + ) + | Error ex -> ( + Scheduler.enqueue_failed_thread scheduler k ex; + ) + ); + Scheduler.next scheduler ) | Enter_unchecked fn -> Some (fun k -> fn scheduler { Suspended.k; fiber } @@ -189,5 +233,5 @@ let run main = in let new_fiber = Fiber_context.make_root () in let result, r = Fut.create () in - let () = fork ~new_fiber (fun () -> r (main ())) in - Fut.map (fun v -> Scheduler.stop scheduler; v) result + let Suspend = fork ~new_fiber (fun () -> r (main ())) in + result diff --git a/lib_eio_js/browser/runtime.js b/lib_eio_js/browser/runtime.js deleted file mode 100644 index 44bd41af7..000000000 --- a/lib_eio_js/browser/runtime.js +++ /dev/null @@ -1,27 +0,0 @@ -// A shim for safari: https://developer.chrome.com/blog/using-requestidlecallback/ - -// Provides: requestIdleCallbackShim -function requestIdleCallbackShim (cb) { - if (window.requestIdleCallback) { - window.requestIdleCallback(cb) - } else { - var start = Date.now(); - globalThis.setTimeout(function () { - cb({ - didTimeout: false, - timeRemaining: function () { - return Math.max(0, 50 - (Date.now() - start)); - } - }); - }, 1); - } -} - -// Provides: cancelIdleCallbackShim -function cancelIdleCallbackShim (id) { - if (window.cancelIdleCallback) { - window.cancelIdleCallback(id); - } else { - globalThis.clearTimeout(id); - } -} \ No newline at end of file diff --git a/lib_eio_js/browser/stubs.c b/lib_eio_js/browser/stubs.c deleted file mode 100644 index c19e71549..000000000 --- a/lib_eio_js/browser/stubs.c +++ /dev/null @@ -1,5 +0,0 @@ - -#include -#include -void requestIdleCallbackShim () { fprintf(stderr, "Unimplemented Javascript primitive requestIdleCallbackShim!\n"); exit(1); } -void cancelIdleCallbackShim () { fprintf(stderr, "Unimplemented Javascript primitive cancelIdleCallbackShim!\n"); exit(1); } \ No newline at end of file diff --git a/lib_eio_js/browser/test/test.ml b/lib_eio_js/browser/test/test.ml index f177b3deb..f1efe0a78 100644 --- a/lib_eio_js/browser/test/test.ml +++ b/lib_eio_js/browser/test/test.ml @@ -80,12 +80,28 @@ module Browser_tests = struct in Alcotest.(check (list int)) "timeouts" lst v + let test_busy_yielding () = + let i = ref 0 in + let rec loop () = + Eio.Fiber.yield (); + incr i; + (* An early cut off to prevent browser tab from crashing! *) + if !i > 1000000 then () else loop () + in + Fiber.yield (); + Eio_browser.Timeout.sleep ~ms:10; + Fiber.first + loop + (fun () -> Eio_browser.Timeout.sleep ~ms:10); + if !i > 1000000 then Alcotest.fail "Yielding was not cancelled" + let tests = [ Alcotest.test_case "timeout cancelled" `Quick test_timeout_cancel; Alcotest.test_case "fut await" `Quick test_fut_await; Alcotest.test_case "fut cancelled" `Quick test_fut_cancel; Alcotest.test_case "test timeout" `Quick test_timeout; - Alcotest.test_case "test multiple timeouts" `Quick test_multiple_timeouts + Alcotest.test_case "test multiple timeouts" `Quick test_multiple_timeouts; + Alcotest.test_case "test busy yielding" `Quick test_busy_yielding; ] end