Skip to content

Commit

Permalink
Process API for kqueue
Browse files Browse the repository at this point in the history
  • Loading branch information
patricoferris committed Feb 2, 2023
1 parent f9381d9 commit 9875511
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 27 deletions.
4 changes: 2 additions & 2 deletions lib_eio/process.ml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type status = Exited of int | Signaled of int | Stopped of int

class virtual mgr = object (_ : #Generic.t)
method probe _ = None
method virtual spawn : sw:Switch.t -> ?cwd:Fs.dir Path.t -> stdin:Flow.source -> stdout:Flow.sink -> stderr:Flow.sink -> string -> string list -> t
method virtual spawn : sw:Switch.t -> ?cwd:Fs.dir Path.t -> ?stdin:Flow.source -> ?stdout:Flow.sink -> ?stderr:Flow.sink -> string -> string list -> t
end

let spawn ~sw t ?cwd ~stdin ~stdout ~stderr cmd args = t#spawn ~sw ?cwd ~stdin ~stdout ~stderr cmd args
let spawn ~sw t ?cwd ?stdin ?stdout ?stderr cmd args = t#spawn ~sw ?cwd ?stdin ?stdout ?stderr cmd args
14 changes: 7 additions & 7 deletions lib_eio/process.mli
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,21 @@ type status = Exited of int | Signaled of int | Stopped of int

class virtual mgr : object
inherit Generic.t
method virtual spawn :
sw:Switch.t ->
method virtual spawn :
sw:Switch.t ->
?cwd:Fs.dir Path.t ->
stdin:Flow.source ->
stdout:Flow.sink ->
stderr:Flow.sink ->
?stdin:Flow.source ->
?stdout:Flow.sink ->
?stderr:Flow.sink ->
string ->
string list ->
t
end
(** A process manager capable of spawning new processes. *)

val spawn : sw:Switch.t -> #mgr -> ?cwd:Fs.dir Path.t -> stdin:Flow.source -> stdout:Flow.sink -> stderr:Flow.sink -> string -> string list -> t
val spawn : sw:Switch.t -> #mgr -> ?cwd:Fs.dir Path.t -> ?stdin:Flow.source -> ?stdout:Flow.sink -> ?stderr:Flow.sink -> string -> string list -> t
(** [spawn ~sw mgr ?cwd ~stdin ~stdout ~stderr cmd args] creates a new subprocess that is connected to the
switch [sw]. A process will be stopped when the switch is released.
You must provide a standard input and outputs that are backed by file descriptors and
[cwd] will optionally change the current working directory of the process.*)
86 changes: 85 additions & 1 deletion lib_eio_kqueue/eio_kqueue.ml
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ type io_job =
| Accept : Switch.t * (FD.t * Unix.sockaddr) Suspended.t -> io_job
| Connect : unit Suspended.t -> io_job
| Clock : unit Suspended.t -> io_job
| Proc_exited : Unix.process_status Suspended.t -> io_job

type _ Effect.t += Cancel : (io_job Heap.entry * exn * Kq.Events.t) -> unit Effect.t

Expand Down Expand Up @@ -202,6 +203,7 @@ let discontinue_io st exn = function
| Accept (_, k) -> enqueue_failed_thread st k exn
| Connect k -> enqueue_failed_thread st k exn
| Clock k -> enqueue_failed_thread st k exn
| Proc_exited k -> enqueue_failed_thread st k exn

let with_cancel_hook ~action st fn =
match Fiber_context.get_error action.Suspended.fiber with
Expand Down Expand Up @@ -315,6 +317,50 @@ module Low_level = struct
let arc4random { Cstruct.buffer; off; len } =
let got = eio_arc4random buffer off len in
assert (len = got)

module Process = struct
type t = {
pid : int;
mutable hook : Switch.hook option;
mutable status : Unix.process_status option;
}

let await_process_exit pid =
enter @@ fun st action ->
Atomic.incr st.pending_io;
let retry = with_cancel_hook ~action st (fun () ->
let flags = Kqueue.Flag.(add + oneshot) in
let filter = Kqueue.Filter.proc in
let note = Kqueue.Note.exit in
Kq.submit ~note ~flags ~filter ~ident:pid st.kqueue (Proc_exited action)
) in
if retry then Eio.traceln "TODO: retry"

let await_exit t =
match t.status with
| Some status -> status
| None ->
let status = await_process_exit t.pid in
Option.iter Switch.remove_hook t.hook;
t.status <- Some status;
status

let spawn ?env ?cwd ?stdin ?stdout ?stderr ~sw prog argv =
let paths = Option.map (fun v -> String.split_on_char ':' v) (Sys.getenv_opt "PATH") |> Option.value ~default:[ "/usr/bin"; "/usr/local/bin" ] in
let prog = match Eio_unix.Spawn.resolve_program ~paths prog with
| Some prog -> prog
| None -> raise (Eio.Fs.err (Eio.Fs.Not_found (Eio_unix.Unix_error (Unix.ENOENT, "", ""))))
in
let pid = Eio_unix.Spawn.spawn ?env ?cwd ?stdin ?stdout ?stderr ~prog ~argv () in
let t = { pid; hook = None; status = None } in
let hook = Switch.on_release_cancellable sw (fun () ->
Unix.kill pid Sys.sigkill; ignore (await_exit t)
) in
t.hook <- Some hook;
t

let send_signal t i = Unix.kill t.pid i
end
end

let fallback_copy src dst =
Expand Down Expand Up @@ -371,6 +417,9 @@ let flow_null fd =
method unix_fd op = FD.to_unix op fd
end

type _ Eio.Generic.ty += FD : FD.t Eio.Generic.ty
let get_fd_opt t = Eio.Generic.probe t FD

let flow fd =
object (_ : <Flow.source; Flow.sink; ..>)
method fd = fd
Expand All @@ -379,7 +428,7 @@ let flow fd =
method stat = FD.fstat fd

method probe : type a. a Eio.Generic.ty -> a option = function
(* | FD -> Some fd *)
| FD -> Some fd
| Eio_unix.Private.Unix_file_descr op -> Some (FD.to_unix op fd)
| _ -> None

Expand Down Expand Up @@ -561,6 +610,13 @@ and complete_io st ((event, ready) : io_job Kq.ready) = match ready with
enqueue_thread st k ();
Atomic.decr st.pending_io
)
| Proc_exited k -> (
Fiber_context.clear_cancel_fn k.fiber;
check_for_error st k event;
let pid = Kq.Events.get_ident event in
enqueue_thread st k (Unix.waitpid [] pid |> snd);
Atomic.decr st.pending_io
)

let listening_socket fd = object
inherit Eio.Net.listening_socket
Expand Down Expand Up @@ -818,6 +874,7 @@ type stdenv = <
stdout : Flow.sink;
stderr : Flow.sink;
net : Eio.Net.t;
process_mgr : Eio.Process.mgr;
domain_mgr : Eio.Domain_manager.t;
clock : Eio.Time.clock;
mono_clock : Eio.Time.Mono.t;
Expand Down Expand Up @@ -906,6 +963,32 @@ let secure_random = object
method read_into buf = Low_level.arc4random buf; Cstruct.length buf
end

let get_fd_or_err flow =
match get_fd_opt flow with
| Some fd -> fd
| None -> failwith "TODO: Only flows backed by FDs can be passed to spawn"

let process_mgr ~stdin ~stdout ~stderr = object
inherit Eio.Process.mgr

method spawn ~sw ?cwd ?(stdin=stdin) ?(stdout=stdout) ?(stderr=stderr) cmd args =
let stdin = get_fd_or_err stdin |> FD.get_exn "spawn" in
let stdout = get_fd_or_err stdout |> FD.get_exn "spawn" in
let stderr = get_fd_or_err stderr |> FD.get_exn "spawn" in
let cwd = Option.map (fun (_, s) -> Spawn.Working_dir.Path s) cwd in
let t = Low_level.Process.spawn ~sw ?cwd ~stdin ~stdout ~stderr cmd args in
object
inherit Eio.Process.t
method pid = t.pid
method stop = Low_level.Process.send_signal t Sys.sigkill
method await_exit =
match Low_level.Process.await_exit t with
| Unix.WEXITED i -> Eio.Process.Exited i
| Unix.WSIGNALED i -> Eio.Process.Signaled i
| Unix.WSTOPPED i -> Eio.Process.Stopped i
end
end

let stdenv ~run_event_loop =
let of_unix fd = FD.of_unix_no_hook ~seekable:(FD.is_seekable fd) ~close_unix:true fd in
let stdout = lazy (flow (of_unix Unix.stdout)) in
Expand All @@ -916,6 +999,7 @@ let stdenv ~run_event_loop =
method stdout = (Lazy.force stdout :> Flow.sink)
method stderr = (Lazy.force stderr :> Flow.sink)
method net = net
method process_mgr = process_mgr ~stdin:(Lazy.force stdin :> Flow.source) ~stdout:(Lazy.force stdout :> Flow.sink) ~stderr:(Lazy.force stderr :> Flow.sink)
method domain_mgr = domain_mgr ~run_event_loop
method clock = clock
method mono_clock = mono_clock
Expand Down
7 changes: 4 additions & 3 deletions lib_eio_kqueue/kq.ml
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,13 @@ let with_id t fn a = with_id_full t fn a ~extra_data:()
module Events = struct
include Kqueue.Event_list.Event

let singleton ?flags ?filter ?data ident udata =
let singleton ?note ?flags ?filter ?data ident udata =
let evs = Kqueue.Event_list.create 1 in
let ev = Kqueue.Event_list.get evs 0 in
Option.iter (Kqueue.Event_list.Event.set_flags ev) flags;
Option.iter (Kqueue.Event_list.Event.set_filter ev) filter;
Option.iter (Kqueue.Event_list.Event.set_data ev) data;
Option.iter (Kqueue.Event_list.Event.set_fflags ev) note;
Kqueue.Event_list.Event.set_ident ev ident;
Kqueue.Event_list.Event.set_udata ev udata;
evs
Expand All @@ -45,10 +46,10 @@ let submit_to_kqueue kq evs =
let v : int = Kqueue.kevent kq ~changelist:evs ~eventlist:Kqueue.Event_list.null Kqueue.Timeout.immediate in
assert (v = 0)

let submit ?flags ?filter ?data ?ident t user_data =
let submit ?note ?flags ?filter ?data ?ident t user_data =
with_id t (fun ptr ->
let ident = Option.value ~default:(ptr :> int) ident in
let evs = Events.singleton ?flags ?filter ?data ident (ptr :> int) in
let evs = Events.singleton ?note ?flags ?filter ?data ident (ptr :> int) in
let ev = Kqueue.Event_list.get evs 0 in
submit_to_kqueue t.kq evs;
Some ev) user_data
Expand Down
12 changes: 6 additions & 6 deletions lib_eio_linux/eio_linux.ml
Original file line number Diff line number Diff line change
Expand Up @@ -919,7 +919,7 @@ module Low_level = struct
|> List.filter_map to_eio_sockaddr_t

module Process = struct
external pidfd_open : int -> Unix.file_descr = "caml_eio_pidfd_open"
external pidfd_open : int -> Unix.file_descr = "caml_eio_pidfd_open"

type t = {
process : FD.t;
Expand All @@ -937,7 +937,7 @@ module Low_level = struct
Option.iter Switch.remove_hook t.hook;
t.status <- Some status;
status

let spawn ?env ?cwd ?stdin ?stdout ?stderr ~sw prog argv =
let paths = Option.map (fun v -> String.split_on_char ':' v) (Sys.getenv_opt "PATH") |> Option.value ~default:[ "/usr/bin"; "/usr/local/bin" ] in
let prog = match Eio_unix.Spawn.resolve_program ~paths prog with
Expand Down Expand Up @@ -1386,10 +1386,10 @@ let get_fd_or_err flow =
| Some fd -> fd
| None -> failwith "TODO: Only flows backed by FDs can be passed to spawn"

let process_mgr = object
let process_mgr ~stdout ~stderr ~stdin = object
inherit Eio.Process.mgr

method spawn ~sw ?cwd ~stdin ~stdout ~stderr cmd args =
method spawn ~sw ?cwd ?(stdin=stdin) ?(stdout=stdout) ?(stderr=stderr) cmd args =
let stdin = get_fd_or_err stdin |> FD.get_exn "spawn" in
let stdout = get_fd_or_err stdout |> FD.get_exn "spawn" in
let stderr = get_fd_or_err stderr |> FD.get_exn "spawn" in
Expand All @@ -1399,7 +1399,7 @@ let process_mgr = object
inherit Eio.Process.t
method pid = t.pid
method stop = Low_level.Process.send_signal t Sys.sigkill
method await_exit =
method await_exit =
match Low_level.Process.await_exit t with
| Unix.WEXITED i -> Eio.Process.Exited i
| Unix.WSIGNALED i -> Eio.Process.Signaled i
Expand All @@ -1419,7 +1419,7 @@ let stdenv ~run_event_loop =
method stdout = Lazy.force stdout
method stderr = Lazy.force stderr
method net = net
method process_mgr = process_mgr
method process_mgr = process_mgr ~stdin:(Lazy.force stdin :> Eio.Flow.source) ~stdout:(Lazy.force stdout :> Eio.Flow.sink) ~stderr:(Lazy.force stderr :> Eio.Flow.sink)
method domain_mgr = domain_mgr ~run_event_loop
method clock = clock
method mono_clock = mono_clock
Expand Down
4 changes: 2 additions & 2 deletions lib_main/dune
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
(library
(name eio_main)
(public_name eio_main)
(libraries eio_luv
(libraries
(select eio_main.ml from
(eio_linux -> eio_main.linux.ml)
(eio_kqueue -> eio_main.kqueue.ml)
(_ -> eio_main.default.ml))))
(eio_luv -> eio_main.default.ml))))
7 changes: 1 addition & 6 deletions lib_main/eio_main.kqueue.ml
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
let run_luv fn = Eio_luv.run (fun env -> fn (env :> Eio.Stdenv.t))
let run_kqueue fn = Eio_kqueue.run (fun env -> fn (env :> Eio.Stdenv.t))

let run fn =
match Sys.getenv_opt "EIO_BACKEND" with
| Some "kqueue" ->
run_kqueue fn
| _ -> run_luv fn
let run fn = run_kqueue fn

0 comments on commit 9875511

Please sign in to comment.