diff --git a/lib_eio/process.ml b/lib_eio/process.ml index 4cacd78e9..a09c64385 100644 --- a/lib_eio/process.ml +++ b/lib_eio/process.ml @@ -18,7 +18,7 @@ type status = Exited of int | Signaled of int | Stopped of int class virtual mgr = object (_ : #Generic.t) method probe _ = None - method virtual spawn : sw:Switch.t -> ?cwd:Fs.dir Path.t -> stdin:Flow.source -> stdout:Flow.sink -> stderr:Flow.sink -> string -> string list -> t + method virtual spawn : sw:Switch.t -> ?cwd:Fs.dir Path.t -> ?stdin:Flow.source -> ?stdout:Flow.sink -> ?stderr:Flow.sink -> string -> string list -> t end - let spawn ~sw t ?cwd ~stdin ~stdout ~stderr cmd args = t#spawn ~sw ?cwd ~stdin ~stdout ~stderr cmd args + let spawn ~sw t ?cwd ?stdin ?stdout ?stderr cmd args = t#spawn ~sw ?cwd ?stdin ?stdout ?stderr cmd args diff --git a/lib_eio/process.mli b/lib_eio/process.mli index d34bcb801..5ca28a5ad 100644 --- a/lib_eio/process.mli +++ b/lib_eio/process.mli @@ -21,21 +21,21 @@ type status = Exited of int | Signaled of int | Stopped of int class virtual mgr : object inherit Generic.t - method virtual spawn : - sw:Switch.t -> + method virtual spawn : + sw:Switch.t -> ?cwd:Fs.dir Path.t -> - stdin:Flow.source -> - stdout:Flow.sink -> - stderr:Flow.sink -> + ?stdin:Flow.source -> + ?stdout:Flow.sink -> + ?stderr:Flow.sink -> string -> string list -> t end (** A process manager capable of spawning new processes. *) - val spawn : sw:Switch.t -> #mgr -> ?cwd:Fs.dir Path.t -> stdin:Flow.source -> stdout:Flow.sink -> stderr:Flow.sink -> string -> string list -> t + val spawn : sw:Switch.t -> #mgr -> ?cwd:Fs.dir Path.t -> ?stdin:Flow.source -> ?stdout:Flow.sink -> ?stderr:Flow.sink -> string -> string list -> t (** [spawn ~sw mgr ?cwd ~stdin ~stdout ~stderr cmd args] creates a new subprocess that is connected to the switch [sw]. A process will be stopped when the switch is released. - + You must provide a standard input and outputs that are backed by file descriptors and [cwd] will optionally change the current working directory of the process.*) diff --git a/lib_eio_kqueue/eio_kqueue.ml b/lib_eio_kqueue/eio_kqueue.ml index 6c1d5f708..84cacf695 100644 --- a/lib_eio_kqueue/eio_kqueue.ml +++ b/lib_eio_kqueue/eio_kqueue.ml @@ -154,6 +154,7 @@ type io_job = | Accept : Switch.t * (FD.t * Unix.sockaddr) Suspended.t -> io_job | Connect : unit Suspended.t -> io_job | Clock : unit Suspended.t -> io_job + | Proc_exited : Unix.process_status Suspended.t -> io_job type _ Effect.t += Cancel : (io_job Heap.entry * exn * Kq.Events.t) -> unit Effect.t @@ -202,6 +203,7 @@ let discontinue_io st exn = function | Accept (_, k) -> enqueue_failed_thread st k exn | Connect k -> enqueue_failed_thread st k exn | Clock k -> enqueue_failed_thread st k exn + | Proc_exited k -> enqueue_failed_thread st k exn let with_cancel_hook ~action st fn = match Fiber_context.get_error action.Suspended.fiber with @@ -315,6 +317,50 @@ module Low_level = struct let arc4random { Cstruct.buffer; off; len } = let got = eio_arc4random buffer off len in assert (len = got) + + module Process = struct + type t = { + pid : int; + mutable hook : Switch.hook option; + mutable status : Unix.process_status option; + } + + let await_process_exit pid = + enter @@ fun st action -> + Atomic.incr st.pending_io; + let retry = with_cancel_hook ~action st (fun () -> + let flags = Kqueue.Flag.(add + oneshot) in + let filter = Kqueue.Filter.proc in + let note = Kqueue.Note.exit in + Kq.submit ~note ~flags ~filter ~ident:pid st.kqueue (Proc_exited action) + ) in + if retry then Eio.traceln "TODO: retry" + + let await_exit t = + match t.status with + | Some status -> status + | None -> + let status = await_process_exit t.pid in + Option.iter Switch.remove_hook t.hook; + t.status <- Some status; + status + + let spawn ?env ?cwd ?stdin ?stdout ?stderr ~sw prog argv = + let paths = Option.map (fun v -> String.split_on_char ':' v) (Sys.getenv_opt "PATH") |> Option.value ~default:[ "/usr/bin"; "/usr/local/bin" ] in + let prog = match Eio_unix.Spawn.resolve_program ~paths prog with + | Some prog -> prog + | None -> raise (Eio.Fs.err (Eio.Fs.Not_found (Eio_unix.Unix_error (Unix.ENOENT, "", "")))) + in + let pid = Eio_unix.Spawn.spawn ?env ?cwd ?stdin ?stdout ?stderr ~prog ~argv () in + let t = { pid; hook = None; status = None } in + let hook = Switch.on_release_cancellable sw (fun () -> + Unix.kill pid Sys.sigkill; ignore (await_exit t) + ) in + t.hook <- Some hook; + t + + let send_signal t i = Unix.kill t.pid i + end end let fallback_copy src dst = @@ -371,6 +417,9 @@ let flow_null fd = method unix_fd op = FD.to_unix op fd end +type _ Eio.Generic.ty += FD : FD.t Eio.Generic.ty +let get_fd_opt t = Eio.Generic.probe t FD + let flow fd = object (_ : ) method fd = fd @@ -379,7 +428,7 @@ let flow fd = method stat = FD.fstat fd method probe : type a. a Eio.Generic.ty -> a option = function - (* | FD -> Some fd *) + | FD -> Some fd | Eio_unix.Private.Unix_file_descr op -> Some (FD.to_unix op fd) | _ -> None @@ -561,6 +610,13 @@ and complete_io st ((event, ready) : io_job Kq.ready) = match ready with enqueue_thread st k (); Atomic.decr st.pending_io ) + | Proc_exited k -> ( + Fiber_context.clear_cancel_fn k.fiber; + check_for_error st k event; + let pid = Kq.Events.get_ident event in + enqueue_thread st k (Unix.waitpid [] pid |> snd); + Atomic.decr st.pending_io + ) let listening_socket fd = object inherit Eio.Net.listening_socket @@ -818,6 +874,7 @@ type stdenv = < stdout : Flow.sink; stderr : Flow.sink; net : Eio.Net.t; + process_mgr : Eio.Process.mgr; domain_mgr : Eio.Domain_manager.t; clock : Eio.Time.clock; mono_clock : Eio.Time.Mono.t; @@ -906,6 +963,32 @@ let secure_random = object method read_into buf = Low_level.arc4random buf; Cstruct.length buf end +let get_fd_or_err flow = + match get_fd_opt flow with + | Some fd -> fd + | None -> failwith "TODO: Only flows backed by FDs can be passed to spawn" + +let process_mgr ~stdin ~stdout ~stderr = object + inherit Eio.Process.mgr + + method spawn ~sw ?cwd ?(stdin=stdin) ?(stdout=stdout) ?(stderr=stderr) cmd args = + let stdin = get_fd_or_err stdin |> FD.get_exn "spawn" in + let stdout = get_fd_or_err stdout |> FD.get_exn "spawn" in + let stderr = get_fd_or_err stderr |> FD.get_exn "spawn" in + let cwd = Option.map (fun (_, s) -> Spawn.Working_dir.Path s) cwd in + let t = Low_level.Process.spawn ~sw ?cwd ~stdin ~stdout ~stderr cmd args in + object + inherit Eio.Process.t + method pid = t.pid + method stop = Low_level.Process.send_signal t Sys.sigkill + method await_exit = + match Low_level.Process.await_exit t with + | Unix.WEXITED i -> Eio.Process.Exited i + | Unix.WSIGNALED i -> Eio.Process.Signaled i + | Unix.WSTOPPED i -> Eio.Process.Stopped i + end +end + let stdenv ~run_event_loop = let of_unix fd = FD.of_unix_no_hook ~seekable:(FD.is_seekable fd) ~close_unix:true fd in let stdout = lazy (flow (of_unix Unix.stdout)) in @@ -916,6 +999,7 @@ let stdenv ~run_event_loop = method stdout = (Lazy.force stdout :> Flow.sink) method stderr = (Lazy.force stderr :> Flow.sink) method net = net + method process_mgr = process_mgr ~stdin:(Lazy.force stdin :> Flow.source) ~stdout:(Lazy.force stdout :> Flow.sink) ~stderr:(Lazy.force stderr :> Flow.sink) method domain_mgr = domain_mgr ~run_event_loop method clock = clock method mono_clock = mono_clock diff --git a/lib_eio_kqueue/kq.ml b/lib_eio_kqueue/kq.ml index 64a11e32d..542b79444 100644 --- a/lib_eio_kqueue/kq.ml +++ b/lib_eio_kqueue/kq.ml @@ -30,12 +30,13 @@ let with_id t fn a = with_id_full t fn a ~extra_data:() module Events = struct include Kqueue.Event_list.Event - let singleton ?flags ?filter ?data ident udata = + let singleton ?note ?flags ?filter ?data ident udata = let evs = Kqueue.Event_list.create 1 in let ev = Kqueue.Event_list.get evs 0 in Option.iter (Kqueue.Event_list.Event.set_flags ev) flags; Option.iter (Kqueue.Event_list.Event.set_filter ev) filter; Option.iter (Kqueue.Event_list.Event.set_data ev) data; + Option.iter (Kqueue.Event_list.Event.set_fflags ev) note; Kqueue.Event_list.Event.set_ident ev ident; Kqueue.Event_list.Event.set_udata ev udata; evs @@ -45,10 +46,10 @@ let submit_to_kqueue kq evs = let v : int = Kqueue.kevent kq ~changelist:evs ~eventlist:Kqueue.Event_list.null Kqueue.Timeout.immediate in assert (v = 0) -let submit ?flags ?filter ?data ?ident t user_data = +let submit ?note ?flags ?filter ?data ?ident t user_data = with_id t (fun ptr -> let ident = Option.value ~default:(ptr :> int) ident in - let evs = Events.singleton ?flags ?filter ?data ident (ptr :> int) in + let evs = Events.singleton ?note ?flags ?filter ?data ident (ptr :> int) in let ev = Kqueue.Event_list.get evs 0 in submit_to_kqueue t.kq evs; Some ev) user_data diff --git a/lib_eio_linux/eio_linux.ml b/lib_eio_linux/eio_linux.ml index 3632d255a..c4c7eb818 100644 --- a/lib_eio_linux/eio_linux.ml +++ b/lib_eio_linux/eio_linux.ml @@ -919,7 +919,7 @@ module Low_level = struct |> List.filter_map to_eio_sockaddr_t module Process = struct - external pidfd_open : int -> Unix.file_descr = "caml_eio_pidfd_open" + external pidfd_open : int -> Unix.file_descr = "caml_eio_pidfd_open" type t = { process : FD.t; @@ -937,7 +937,7 @@ module Low_level = struct Option.iter Switch.remove_hook t.hook; t.status <- Some status; status - + let spawn ?env ?cwd ?stdin ?stdout ?stderr ~sw prog argv = let paths = Option.map (fun v -> String.split_on_char ':' v) (Sys.getenv_opt "PATH") |> Option.value ~default:[ "/usr/bin"; "/usr/local/bin" ] in let prog = match Eio_unix.Spawn.resolve_program ~paths prog with @@ -1386,10 +1386,10 @@ let get_fd_or_err flow = | Some fd -> fd | None -> failwith "TODO: Only flows backed by FDs can be passed to spawn" -let process_mgr = object +let process_mgr ~stdout ~stderr ~stdin = object inherit Eio.Process.mgr - method spawn ~sw ?cwd ~stdin ~stdout ~stderr cmd args = + method spawn ~sw ?cwd ?(stdin=stdin) ?(stdout=stdout) ?(stderr=stderr) cmd args = let stdin = get_fd_or_err stdin |> FD.get_exn "spawn" in let stdout = get_fd_or_err stdout |> FD.get_exn "spawn" in let stderr = get_fd_or_err stderr |> FD.get_exn "spawn" in @@ -1399,7 +1399,7 @@ let process_mgr = object inherit Eio.Process.t method pid = t.pid method stop = Low_level.Process.send_signal t Sys.sigkill - method await_exit = + method await_exit = match Low_level.Process.await_exit t with | Unix.WEXITED i -> Eio.Process.Exited i | Unix.WSIGNALED i -> Eio.Process.Signaled i @@ -1419,7 +1419,7 @@ let stdenv ~run_event_loop = method stdout = Lazy.force stdout method stderr = Lazy.force stderr method net = net - method process_mgr = process_mgr + method process_mgr = process_mgr ~stdin:(Lazy.force stdin :> Eio.Flow.source) ~stdout:(Lazy.force stdout :> Eio.Flow.sink) ~stderr:(Lazy.force stderr :> Eio.Flow.sink) method domain_mgr = domain_mgr ~run_event_loop method clock = clock method mono_clock = mono_clock diff --git a/lib_main/dune b/lib_main/dune index eda9a51e3..18b269ab3 100644 --- a/lib_main/dune +++ b/lib_main/dune @@ -1,8 +1,8 @@ (library (name eio_main) (public_name eio_main) - (libraries eio_luv + (libraries (select eio_main.ml from (eio_linux -> eio_main.linux.ml) (eio_kqueue -> eio_main.kqueue.ml) - (_ -> eio_main.default.ml)))) + (eio_luv -> eio_main.default.ml)))) diff --git a/lib_main/eio_main.kqueue.ml b/lib_main/eio_main.kqueue.ml index f1d8abb02..9518fa8f2 100644 --- a/lib_main/eio_main.kqueue.ml +++ b/lib_main/eio_main.kqueue.ml @@ -1,8 +1,3 @@ -let run_luv fn = Eio_luv.run (fun env -> fn (env :> Eio.Stdenv.t)) let run_kqueue fn = Eio_kqueue.run (fun env -> fn (env :> Eio.Stdenv.t)) -let run fn = - match Sys.getenv_opt "EIO_BACKEND" with - | Some "kqueue" -> - run_kqueue fn - | _ -> run_luv fn \ No newline at end of file +let run fn = run_kqueue fn \ No newline at end of file