Simplify and document scheduler
patricoferris committed Mar 13, 2023
1 parent 5c5f5ac commit bb9ca8a
Showing 5 changed files with 112 additions and 88 deletions.
4 changes: 0 additions & 4 deletions lib_eio_js/browser/dune
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
(name eio_browser)
(public_name eio_browser)
(language c)
(names stubs))
(js_of_ocaml (javascript_files runtime.js))
(libraries eio brr))
146 changes: 95 additions & 51 deletions lib_eio_js/browser/
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,12 @@ end

module Ctf = Eio.Private.Ctf

type suspend = Suspend

module Suspended = struct
type 'a t = {
fiber : Eio.Private.Fiber_context.t;
k : ('a, unit) Effect.Deep.continuation;
k : ('a, suspend) Effect.Deep.continuation;

let tid t = Eio.Private.Fiber_context.tid t.fiber
Expand All @@ -64,59 +66,89 @@ module Suspended = struct
Effect.Deep.discontinue t.k ex

(* Resume the next runnable fiber, if any. *)
let rec wakeup run_q =
match Run_queue.pop run_q with
| Some f ->
f ();
wakeup run_q
| None -> ()

(* The Javascript backend scheduler is implemented as an event listener.
We don't need to worry about multiple domains. Here any time something
asynchronously enqueues a task to our queue, it also sends a wakeup event to
the event listener which will run the callback calling the scheduler. *)
module Scheduler = struct
type t = {
run_q : (unit -> unit) Run_queue.t;
mutable idle_callback : Jv.t option;
(* Scheduler description:
let v run_q =
let idle_callback = None in
{ run_q; idle_callback }
The Javascript, browser scheduler relies on yielding to the Javascript engine
to allow IO tasks to complete. There are a few different corner cases to be aware
- The [IO] job is used to fix the "busy yielding" problem.
- Calls to [scheduled_wakeup] are coalesced by checking if the timer has already
been set or not. If the timer callback is fired, the timer id is removed.
When do we need to "schedule a wakeup"? From the code this happens in two places,
whenever we hit the [IO] job or whenever we run one of the [enqueue_] functions
which are called in callbacks that are trying to enqueue a continuation to resume.
external _request_idle_callback : Jv.t -> Jv.t = "requestIdleCallbackShim"
external _cancel_idle_callback : Jv.t -> unit = "cancelIdleCallbackShim"
[IO] case: this is necessary in the "busy yielding" example.
let request_idle_callback cb =
_request_idle_callback (Jv.callback ~arity:1 (fun _ -> cb ()))
[enqueue_] case: this is necessary for IO. Consider calling [Eio_browser.Timeout.sleep ~ms:200].
This wants to register the continuation to be run whenever the callback passed to [set_timeout]
is run. The path through our scheduler would be:
let wakeup t =
(* No need to schedule a wakeup if the idle_callback is already set. *)
if Option.is_some t.idle_callback then () else begin
let idle_callback = request_idle_callback (fun () -> t.idle_callback <- None; wakeup t.run_q) in
t.idle_callback <- Some idle_callback
- The initial IO job is put in the run queue.
- The timeout function is called.
- We are then idle allowing the IO job's wakeup to run, calling the scheduler.
- The run queue is empty so we hit [None] and are idle again.
- The timeout callback function is called enqueuing the continuation but no wakeup
is currently set to actually run the scheduler, so we call wakeup.
module Scheduler = struct
type job =
| Runnable of (unit -> suspend)
| IO

type t = {
run_q : job Run_queue.t;
mutable io_queued : bool;
mutable scheduled_wakeup : int option;

let stop t =
Option.iter _cancel_idle_callback t.idle_callback;
t.idle_callback <- None
let rec next t = match Run_queue.pop t.run_q with
| Some (Runnable fn) ->
if not t.io_queued then begin
Run_queue.push t.run_q IO;
t.io_queued <- true
fn ()
| Some IO ->
t.io_queued <- false;
schedule_wakeup t;
| None -> Suspend

and schedule_wakeup t : unit = match t.scheduled_wakeup with
| Some _ -> ()
| None ->
(* Calling a timeout with 0 runs the callback "'immediately',
or more accurately, the next event cycle.". Note this will only
work on active tabs, meaning the scheduler will somewhat freeze if your
tab becomes inactive. *)
let timeout = G.set_timeout ~ms:0 (fun () ->
t.scheduled_wakeup <- None;
let Suspend = next t in ()
) in
t.scheduled_wakeup <- Some timeout

let v run_q =
{ run_q; io_queued = false; scheduled_wakeup = None }

let enqueue_thread t k v =
Run_queue.push t.run_q (fun () -> Suspended.continue k v);
wakeup t
Run_queue.push t.run_q (Runnable (fun () -> Suspended.continue k v));
schedule_wakeup t

let enqueue_failed_thread t k v =
Run_queue.push t.run_q (fun () -> Suspended.discontinue k v);
wakeup t
Run_queue.push t.run_q (Runnable (fun () -> Suspended.discontinue k v));
schedule_wakeup t

let enqueue_at_head t k v =
Run_queue.push_head t.run_q (fun () -> Suspended.continue k v);
wakeup t
Run_queue.push_head t.run_q (Runnable (fun () -> Suspended.continue k v));
schedule_wakeup t

type _ Effect.t += Enter_unchecked : (Scheduler.t -> 'a Suspended.t -> unit) -> 'a Effect.t
type _ Effect.t += Enter_unchecked : (Scheduler.t -> 'a Suspended.t -> suspend) -> 'a Effect.t
let enter_unchecked fn = Effect.perform (Enter_unchecked fn)

module Timeout = struct
Expand All @@ -126,7 +158,11 @@ module Timeout = struct
Fiber_context.clear_cancel_fn k.fiber;
Scheduler.enqueue_thread st k ()
) in
Fiber_context.set_cancel_fn k.fiber (fun exn -> G.stop_timer id; Scheduler.enqueue_failed_thread st k exn);
Fiber_context.set_cancel_fn k.fiber (fun exn ->
G.stop_timer id;
Scheduler.enqueue_failed_thread st k exn
); st

let await fut =
Expand All @@ -140,7 +176,8 @@ let await fut =
Fiber_context.clear_cancel_fn k.fiber;
Scheduler.enqueue_thread st k v
); st

let next_event : 'a Brr.Ev.type' -> -> 'a Brr.Ev.t = fun typ target ->
let opts = Brr.Ev.listen_opts ~once:true () in
Expand All @@ -150,28 +187,35 @@ let next_event : 'a Brr.Ev.type' -> -> 'a Brr.Ev.t = fun typ targe
will be called and so enqueue_thread will never be called even
if another event arrives. *)
let v = listen (fun v -> Fiber_context.clear_cancel_fn k.fiber; Scheduler.enqueue_thread st k v) in
Fiber_context.set_cancel_fn k.fiber (fun exn -> Ev.unlisten v; Scheduler.enqueue_failed_thread st k exn)
Fiber_context.set_cancel_fn k.fiber (fun exn -> Ev.unlisten v; Scheduler.enqueue_failed_thread st k exn); st

(* Largely based on the Eio_mock.Backend event loop. *)
let run main =
let run_q = Run_queue.create () in
let scheduler = Scheduler.v run_q in
Run_queue.push run_q IO;
let rec fork ~new_fiber:fiber fn =
Effect.Deep.match_with fn ()
{ retc = (fun () -> Fiber_context.destroy fiber);
{ retc = (fun () -> Fiber_context.destroy fiber; scheduler);
exnc = (fun ex ->
let bt = Printexc.get_raw_backtrace () in
Fiber_context.destroy fiber;
Printexc.raise_with_backtrace ex bt
effc = fun (type a) (e : a Effect.t) : ((a, unit) Effect.Deep.continuation -> unit) option ->
effc = fun (type a) (e : a Effect.t) : ((a, suspend) Effect.Deep.continuation -> suspend) option ->
match e with
| Eio.Private.Effects.Suspend f -> Some (fun k ->
let k = { Suspended.k; fiber } in
f fiber (function
| Ok v -> Scheduler.enqueue_thread scheduler k v
| Error ex -> Scheduler.enqueue_failed_thread scheduler k ex
| Ok v -> (
Scheduler.enqueue_thread scheduler k v;
| Error ex -> (
Scheduler.enqueue_failed_thread scheduler k ex;
); scheduler
| Enter_unchecked fn -> Some (fun k ->
fn scheduler { Suspended.k; fiber }
Expand All @@ -189,5 +233,5 @@ let run main =
let new_fiber = Fiber_context.make_root () in
let result, r = Fut.create () in
let () = fork ~new_fiber (fun () -> r (main ())) in (fun v -> Scheduler.stop scheduler; v) result
let Suspend = fork ~new_fiber (fun () -> r (main ())) in
27 changes: 0 additions & 27 deletions lib_eio_js/browser/runtime.js

5 changes: 0 additions & 5 deletions lib_eio_js/browser/stubs.c

18 changes: 17 additions & 1 deletion lib_eio_js/browser/test/
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,28 @@ module Browser_tests = struct
Alcotest.(check (list int)) "timeouts" lst v

let test_busy_yielding () =
let i = ref 0 in
let rec loop () =
Eio.Fiber.yield ();
incr i;
(* An early cut off to prevent browser tab from crashing! *)
if !i > 1000000 then () else loop ()
Fiber.yield ();
Eio_browser.Timeout.sleep ~ms:10;
(fun () -> Eio_browser.Timeout.sleep ~ms:10);
if !i > 1000000 then "Yielding was not cancelled"

let tests = [
Alcotest.test_case "timeout cancelled" `Quick test_timeout_cancel;
Alcotest.test_case "fut await" `Quick test_fut_await;
Alcotest.test_case "fut cancelled" `Quick test_fut_cancel;
Alcotest.test_case "test timeout" `Quick test_timeout;
Alcotest.test_case "test multiple timeouts" `Quick test_multiple_timeouts
Alcotest.test_case "test multiple timeouts" `Quick test_multiple_timeouts;
Alcotest.test_case "test busy yielding" `Quick test_busy_yielding;

Expand Down

0 comments on commit bb9ca8a

