Skip to content

Commit

Permalink
eio_linux: add subprocess support
Browse files Browse the repository at this point in the history
Co-authored-by: Patrick Ferris <[email protected]>
  • Loading branch information
talex5 and patricoferris committed Mar 27, 2023
1 parent 8d005ff commit 2528770
Show file tree
Hide file tree
Showing 7 changed files with 321 additions and 7 deletions.
1 change: 1 addition & 0 deletions lib_eio_linux/dune
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@
(foreign_stubs
(language c)
(flags :standard -D_LARGEFILE64_SOURCE)
(include_dirs ../lib_eio/unix/include)
(names eio_stubs))
(libraries eio eio.utils eio.unix uring logs fmt))
16 changes: 9 additions & 7 deletions lib_eio_linux/eio_linux.ml
Original file line number Diff line number Diff line change
Expand Up @@ -454,13 +454,15 @@ let run_event_loop (type a) ?fallback config (main : _ -> a) arg : a =
continue k ((a :> Eio_unix.socket), (b :> Eio_unix.socket))
)
| Eio_unix.Private.Pipe sw -> Some (fun k ->
let r, w = Unix.pipe ~cloexec:true () in
(* See issue #319, PR #327 *)
Unix.set_nonblock r;
Unix.set_nonblock w;
let r = (flow (FD.of_unix ~sw ~seekable:false ~close_unix:true r) :> <Eio.Flow.source; Eio.Flow.close; Eio_unix.unix_fd>) in
let w = (flow (FD.of_unix ~sw ~seekable:false ~close_unix:true w) :> <Eio.Flow.sink; Eio.Flow.close; Eio_unix.unix_fd>) in
continue k (r, w)
match
let r, w = Low_level.pipe ~sw in
let source = (flow r :> <Eio.Flow.source; Eio.Flow.close; Eio_unix.unix_fd>) in
let sink = (flow w :> <Eio.Flow.sink; Eio.Flow.close; Eio_unix.unix_fd>) in
(source, sink)
with
| r -> continue k r
| exception Unix.Unix_error (code, name, arg) ->
discontinue k (Err.wrap code name arg)
)
| _ -> None
} in
Expand Down
42 changes: 42 additions & 0 deletions lib_eio_linux/eio_linux.mli
Original file line number Diff line number Diff line change
Expand Up @@ -248,4 +248,46 @@ module Low_level : sig
(** [getaddrinfo host] returns a list of IP addresses for [host]. [host] is either a domain name or
an ipaddress. *)

(** {1 Processes} *)

module Process : sig
type t
(** A child process. *)

(** Setup actions to perform in the child process. *)
module Fork_action : sig
type t = Eio_unix.Private.Fork_action.t

val execve : string -> argv:string array -> env:string array -> t
(** See execve(2).
This replaces the current executable,
so it only makes sense as the last action to be performed. *)

val chdir : string -> t
(** [chdir path] changes directory to [path]. *)

val fchdir : FD.t -> t
(** [fchdir dir] changes directory to [dir]. *)
end

val spawn : sw:Switch.t -> Fork_action.t list -> t
(** [spawn ~sw actions] forks a child process, which executes [actions].
The last action should be {!Fork_action.execve}.
You will typically want to do [Promise.await (exit_status child)] after this.
@param sw The child will be sent {!Sys.sigkill} if [sw] finishes. *)

val signal : t -> int -> unit
(** [signal t x] sends signal [x] to [t].
This is similar to doing [Unix.kill t.pid x],
except that it ensures no signal is sent after [t] has been reaped. *)

val pid : t -> int

val exit_status : t -> Unix.process_status Promise.t
(** [exit_status t] is a promise for the process's exit status. *)
end

end
45 changes: 45 additions & 0 deletions lib_eio_linux/eio_stubs.c
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
#define _GNU_SOURCE
#include <linux/sched.h>

#include <sys/stat.h>
#include <sys/types.h>
#include <sys/eventfd.h>
Expand All @@ -7,8 +10,11 @@
#include <errno.h>
#include <dirent.h>
#include <fcntl.h>
#include <signal.h>
#include <unistd.h>

// We need caml_convert_signal_number
#define CAML_INTERNALS

#include <caml/mlvalues.h>
#include <caml/memory.h>
Expand All @@ -17,6 +23,8 @@
#include <caml/unixsupport.h>
#include <caml/bigarray.h>

#include "fork_action.h"

// Make sure we have enough space for at least one entry.
#define DIRENT_BUF_SIZE (PATH_MAX + sizeof(struct dirent64))

Expand Down Expand Up @@ -99,3 +107,40 @@ CAMLprim value caml_eio_getdents(value v_fd) {

CAMLreturn(result);
}

static int pidfd_send_signal(int pidfd, int sig, siginfo_t *info, unsigned int flags) {
return syscall(SYS_pidfd_send_signal, pidfd, sig, info, flags);
}

CAMLprim value caml_eio_pidfd_send_signal(value v_pidfd, value v_signal) {
CAMLparam0();
int res;

res = pidfd_send_signal(Int_val(v_pidfd), caml_convert_signal_number(Int_val(v_signal)), NULL, 0);
if (res == -1) uerror("pidfd_send_signal", Nothing);
CAMLreturn(Val_unit);
}

CAMLprim value caml_eio_clone3(value v_errors, value v_actions) {
CAMLparam1(v_actions);
CAMLlocal1(v_result);
pid_t child_pid;
int pidfd = -1; /* Is automatically close-on-exec */
struct clone_args cl_args = {
.flags = CLONE_PIDFD,
.pidfd = (uint64_t) &pidfd,
};

child_pid = syscall(SYS_clone3, &cl_args, sizeof(struct clone_args));
if (child_pid == 0) {
eio_unix_run_fork_actions(Int_val(v_errors), v_actions);
} else if (child_pid < 0) {
uerror("clone3", Nothing);
}

v_result = caml_alloc_tuple(2);
Store_field(v_result, 0, Val_long(child_pid));
Store_field(v_result, 1, Val_long(pidfd));

CAMLreturn(v_result);
}
2 changes: 2 additions & 0 deletions lib_eio_linux/fd.ml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ type t = {
mutable release_hook : Eio.Switch.hook; (* Use this on close to remove switch's [on_release] hook. *)
}

let to_rcfd t = t.fd

let err_closed op = Invalid_argument (op ^ ": file descriptor used after calling close!")

let use t f ~if_closed = Rcfd.use t.fd f ~if_closed
Expand Down
83 changes: 83 additions & 0 deletions lib_eio_linux/low_level.ml
Original file line number Diff line number Diff line change
Expand Up @@ -432,3 +432,86 @@ let getaddrinfo ~service node =
Eio_unix.run_in_systhread @@ fun () ->
Unix.getaddrinfo node service []
|> List.filter_map to_eio_sockaddr_t

let pipe ~sw =
let unix_r, unix_w = Unix.pipe ~cloexec:true () in
(* See issue #319, PR #327 *)
let r = Fd.of_unix ~sw ~seekable:false ~close_unix:true unix_r in
let w = Fd.of_unix ~sw ~seekable:false ~close_unix:true unix_w in
Unix.set_nonblock unix_r;
Unix.set_nonblock unix_w;
(r, w)

let with_pipe fn =
Switch.run @@ fun sw ->
let r, w = pipe ~sw in
fn r w

module Process = struct
module Rcfd = Eio_unix.Private.Rcfd

external eio_spawn :
Unix.file_descr ->
Eio_unix.Private.Fork_action.c_action list ->
int * Unix.file_descr = "caml_eio_clone3"

external pidfd_send_signal : Unix.file_descr -> int -> unit = "caml_eio_pidfd_send_signal"

type t = {
pid : int;
pid_fd : Fd.t;
exit_status : Unix.process_status Promise.t;
}

let exit_status t = t.exit_status
let pid t = t.pid

module Fork_action = struct
type t = Eio_unix.Private.Fork_action.t

let fchdir fd = Eio_unix.Private.Fork_action.fchdir (Fd.to_rcfd fd)
let chdir = Eio_unix.Private.Fork_action.chdir
let execve = Eio_unix.Private.Fork_action.execve
end

(* Read a (typically short) error message from a child process. *)
let rec read_response fd =
let buf = Cstruct.create 256 in
match readv fd [buf] with
| len -> Cstruct.to_string buf ~len ^ read_response fd
| exception End_of_file -> ""

let signal t signum =
Fd.use_exn "signal" t.pid_fd @@ fun pid_fd ->
pidfd_send_signal pid_fd signum

let rec waitpid pid =
match Unix.waitpid [] pid with
| p, status -> assert (p = pid); status
| exception Unix.Unix_error (EINTR, _, _) -> waitpid pid

let spawn ~sw actions =
with_pipe @@ fun errors_r errors_w ->
Eio_unix.Private.Fork_action.with_actions actions @@ fun c_actions ->
Switch.check sw;
let exit_status, set_exit_status = Promise.create () in
let t =
Fd.use_exn "errors-w" errors_w @@ fun errors_w ->
let pid, pid_fd = eio_spawn errors_w c_actions in
let pid_fd = Fd.of_unix ~sw ~seekable:false ~close_unix:true pid_fd in
{ pid; pid_fd; exit_status }
in
Fd.close errors_w;
let hook = Switch.on_release_cancellable sw (fun () -> signal t Sys.sigkill) in
Fiber.fork_daemon ~sw (fun () ->
await_readable t.pid_fd;
Promise.resolve set_exit_status (waitpid t.pid);
Switch.remove_hook hook;
Fd.close t.pid_fd;
`Stop_daemon
);
(* Check for errors starting the process. *)
match read_response errors_r with
| "" -> t (* Success! Execing the child closed [errors_w] and we got EOF. *)
| err -> failwith err
end
139 changes: 139 additions & 0 deletions lib_eio_linux/tests/spawn.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
```ocaml
# #require "eio_linux";;
```

```ocaml
open Eio.Std
module Process = Eio_linux.Low_level.Process
```

## Spawning processes

Setting environment variables:

```ocaml
# Eio_linux.run @@ fun _env ->
Switch.run @@ fun sw ->
let child = Process.spawn ~sw Process.Fork_action.[
execve "/usr/bin/env"
~argv:[| "env" |]
~env:[| "FOO=bar" |];
] in
Promise.await (Process.exit_status child);;
FOO=bar
- : Unix.process_status = Unix.WEXITED 0
```

Changing directory:

```ocaml
# Eio_linux.run @@ fun _env ->
Switch.run @@ fun sw ->
let child = Process.spawn ~sw Process.Fork_action.[
chdir "/";
execve "/usr/bin/env"
~argv:[| "env"; "pwd" |]
~env:(Unix.environment ())
] in
Promise.await (Process.exit_status child);;
/
- : Unix.process_status = Unix.WEXITED 0
```

Changing directory using a file descriptor:

```ocaml
# Eio_linux.run @@ fun _env ->
Switch.run @@ fun sw ->
let root =
Eio_linux.Low_level.openat2 ~sw "/"
~seekable:false
~access:`R
~perm:0
~resolve:Uring.Resolve.empty
~flags:Uring.Open_flags.(cloexec + path + directory)
in
let child = Process.spawn ~sw Process.Fork_action.[
fchdir root;
execve "/usr/bin/env"
~argv:[| "env"; "pwd" |]
~env:(Unix.environment ())
] in
Promise.await (Process.exit_status child);;
/
- : Unix.process_status = Unix.WEXITED 0
```

Exit status:

```ocaml
# Eio_linux.run @@ fun _env ->
Switch.run @@ fun sw ->
let child = Process.spawn ~sw Process.Fork_action.[
execve "/usr/bin/env"
~argv:[| "env"; "false" |]
~env:(Unix.environment ())
] in
Promise.await (Process.exit_status child);;
- : Unix.process_status = Unix.WEXITED 1
```

Failure starting child:

```ocaml
# Eio_linux.run @@ fun _env ->
Switch.run @@ fun sw ->
try
let _child =
Process.spawn ~sw Process.Fork_action.[
chdir "/idontexist";
execve "/usr/bin/env"
~argv:[| "env"; "pwd" |]
~env:(Unix.environment ())
]
in
assert false
with Failure ex ->
String.sub ex 0 7
- : string = "chdir: "
```

Signalling a running child:

```ocaml
# Eio_linux.run @@ fun _env ->
Switch.run @@ fun sw ->
let child =
Process.spawn ~sw Process.Fork_action.[
execve "/usr/bin/env"
~argv:[| "env"; "sleep"; "1000" |]
~env:(Unix.environment ())
]
in
Process.signal child Sys.sigkill;
match Promise.await (Process.exit_status child) with
| Unix.WSIGNALED x when x = Sys.sigkill -> traceln "Child got SIGKILL"
| _ -> assert false;;
+Child got SIGKILL
- : unit = ()
```

Signalling an exited child does nothing:

```ocaml
# Eio_linux.run @@ fun _env ->
Switch.run @@ fun sw ->
let child =
Process.spawn ~sw Process.Fork_action.[
execve "/usr/bin/env"
~argv:[| "env" |]
~env:[| "FOO=bar" |];
]
in
ignore (Promise.await (Process.exit_status child) : Unix.process_status);
Process.signal child Sys.sigkill;;
FOO=bar
Exception:
Invalid_argument "signal: file descriptor used after calling close!".
```

0 comments on commit 2528770

Please sign in to comment.