Lwt.Canceled support for MultiWorkerLwt
Summary:
This is actually relatively simple. The general idea (a sketch in plain Lwt follows the list):

1. `MultiWorkerLwt.call` runs N worker controllers in parallel using `LwtUtils.iter_all`
2. Someone tries to cancel a thread which is running `MultiWorkerLwt.call`
3. Canceling the `iter_all` propagates the cancellation to each of the N worker controllers automatically
4. A worker controller which is sending a job or merging a job's results fails immediately (its worker process is idle)
5. A worker controller which is waiting for the worker process to finish a job handles the cancellation, signals all workers to cancel, and waits for its worker to finish being cancelled
6. `MultiWorkerLwt.call` waits for all worker controllers to finish being cancelled and then re-enables all the workers
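A minimal sketch of the pattern in steps 3-6 above, written with plain Lwt combinators. The names `spawn_worker` and `resume_workers` are illustrative stand-ins rather than the real MultiWorkerLwt/SharedMem API, and the real code uses `LwtUtils.iter_all` (which fails as soon as any worker thread fails) where this sketch falls back to `Lwt.join`:

(* Illustrative sketch only; the names are stand-ins for the real API. *)
let run_controllers
    (spawn_worker : int -> unit Lwt.t)
    (resume_workers : unit -> unit)
    (ids : int list) : unit Lwt.t =
  let worker_threads = List.map spawn_worker ids in
  Lwt.catch
    (fun () ->
      (* Normal path. The real code uses LwtUtils.iter_all here so the first
       * failure propagates immediately; Lwt.join is just a stand-in. *)
      Lwt.join worker_threads)
    (function
      | Lwt.Canceled ->
        (* Cancellation has already propagated to every worker thread.
         * Wait for all of them to finish handling it, then re-enable the
         * worker processes. Lwt.join re-raises Canceled once they are done. *)
        Lwt.finalize
          (fun () -> Lwt.join worker_threads)
          (fun () -> resume_workers (); Lwt.return_unit)
      | exn -> Lwt.fail exn)

As a hedged usage note for step 2: cancellation typically reaches the thread running `MultiWorkerLwt.call` through a combinator such as `Lwt.pick`, which cancels the losing threads. `do_work` and the timeout below are purely illustrative, not actual Flow call sites:

(* Illustrative only: Lwt.pick cancels whichever threads lose the race,
 * which is what triggers the Lwt.Canceled handling added in this commit. *)
let with_timeout (do_work : unit -> 'a Lwt.t) (seconds : float) : 'a option Lwt.t =
  Lwt.pick
    [
      (let%lwt result = do_work () in Lwt.return (Some result));
      (let%lwt () = Lwt_unix.sleep seconds in Lwt.return None);
    ]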

Reviewed By: avikchaudhuri

Differential Revision: D8161600

fbshipit-source-id: e00fadc690cc73b610f46455260499c121f6087f
gabelevi authored and facebook-github-bot committed Jun 27, 2018
1 parent 9727a70 commit b729d93
Showing 2 changed files with 26 additions and 5 deletions.
16 changes: 12 additions & 4 deletions hack/procs/multiWorkerLwt.ml
@@ -63,7 +63,17 @@ include MultiWorker.CallFunctor (struct
run_worker worker
in

let%lwt () = LwtUtils.iter_all (List.map run_worker workers) in
let%lwt () =
let worker_threads = List.map run_worker workers in
try%lwt
LwtUtils.iter_all worker_threads
with Lwt.Canceled as exn ->
Hh_logger.info ~exn "Cancelling MultiWorkerLwt.multi_threaded_call";
(* For most exceptions, we want to propagate the exception as soon as one worker throws.
* However, for Canceled we want to wait for all the workers to process the Canceled.
* Lwt.join will wait for every thread to finish or fail *)
(Lwt.join worker_threads) [%lwt.finally SharedMem.resume_workers (); Lwt.return_unit]
in

Lwt.return (!acc)
end)
@@ -80,9 +90,7 @@ let call workers ~job ~merge ~neutral ~next =
raise MultiWorkersBusy
else begin
is_busy := true;
let%lwt result = call workers ~job ~merge ~neutral ~next in
is_busy := false;
Lwt.return result
(call workers ~job ~merge ~neutral ~next) [%lwt.finally is_busy := false; Lwt.return_unit]
end

(* A separate abstract type from MultiWorker.worker forces users to always use MultiWorkerLwt *)
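The `[%lwt.finally ...]` extension used in both hunks above comes from lwt_ppx; as a hedged reading, it behaves roughly like `Lwt.finalize`, so the cleanup runs whether the guarded thread returns, fails, or is cancelled. The `is_busy` guard spelled out by hand, with `call_workers` as a hypothetical thunk standing in for the wrapped call:

(* Hand-written rough equivalent of the [%lwt.finally] guard on is_busy;
 * call_workers is a hypothetical stand-in for the wrapped call. *)
let guarded_call (call_workers : unit -> 'a Lwt.t) (is_busy : bool ref) : 'a Lwt.t =
  Lwt.finalize
    call_workers
    (fun () -> is_busy := false; Lwt.return_unit)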
15 changes: 14 additions & 1 deletion hack/procs/workerControllerLwt.ml
@@ -54,6 +54,7 @@ let call w (type a) (type b) (f : a -> b) (x : a) : b Lwt.t =
let _ = Marshal_tools.to_fd_with_preamble ~flags:[Marshal.Closures] outfd request in
Lwt.return_unit
with exn ->
Hh_logger.error ~exn "Failed to send request to worker #%d" (worker_id w);
(* Failed to send the job to the worker. Is it because the worker is dead or is it
* something else? *)
let%lwt pid, status = Lwt_unix.waitpid [Unix.WNOHANG] slave_pid in
@@ -69,7 +70,19 @@ let call w (type a) (type b) (f : a -> b) (x : a) : b Lwt.t =
(* Read in a lwt-unfriendly, blocking manner from the worker *)
(* Due to https://github.com/ocsigen/lwt/issues/564, annotation cannot go on let%lwt node *)
Lwt.return (Marshal_tools.from_fd_with_preamble infd: (b * Measure.record_data))
with exn ->
with
| Lwt.Canceled as exn ->
(* Worker is handling a job but we're cancelling *)
Hh_logger.info ~exn "Stopping running worker #%d" (worker_id w);
(* Each worker might call this but that's ok *)
SharedMem.stop_workers ();
(* Wait for the worker to finish cancelling *)
let%lwt () = Lwt_unix.wait_read infd_lwt in
(* Read the junk from the pipe *)
let _ = Marshal_tools.from_fd_with_preamble infd in
Hh_logger.info ~exn "Finished cancelling running worker #%d" (worker_id w);
raise exn
| exn ->
let%lwt pid, status = Lwt_unix.waitpid [Unix.WNOHANG] slave_pid in
begin match pid, status with
| 0, _ | _, Unix.WEXITED 0 ->
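The `Lwt.Canceled` branch above leans on a wait-then-read idiom: cooperatively wait (so other Lwt threads keep running and a pending cancellation can be observed) until the descriptor is readable, then do the blocking, lwt-unfriendly read. A hedged standalone sketch, where `read_result` is a hypothetical stand-in for `Marshal_tools.from_fd_with_preamble`:

(* Illustrative sketch: wait cooperatively, then do the blocking read.
 * read_result is a hypothetical stand-in for the real unmarshalling call. *)
let read_when_ready (fd : Unix.file_descr) (read_result : Unix.file_descr -> 'a) : 'a Lwt.t =
  let lwt_fd = Lwt_unix.of_unix_file_descr ~blocking:true fd in
  (* The wait is also where a cancellation of this thread surfaces as
   * Lwt.Canceled, before any blocking read has started. *)
  let%lwt () = Lwt_unix.wait_read lwt_fd in
  Lwt.return (read_result fd)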
