Skip to content

Commit

Permalink
Add support for file descriptor passing
Browse files Browse the repository at this point in the history
Signed-off-by: Martin Michaelis <[email protected]>
  • Loading branch information
mgjm committed Aug 15, 2023
1 parent 7214897 commit af3e813
Show file tree
Hide file tree
Showing 15 changed files with 1,370 additions and 225 deletions.
63 changes: 59 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions conmon-rs/common/proto/conmon.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ interface Conmon {
metadata @11 :Metadata; # Standard metadata to carry.
envVars @12 :TextTextMap;
cgroupManager @13 :CgroupManager;
additionalFds @14 :List(UInt64);
leakFds @15 :List(UInt64);
}

struct LogDriver {
Expand Down Expand Up @@ -169,6 +171,18 @@ interface Conmon {

createNamespaces @6 (request: CreateNamespacesRequest) -> (response: CreateNamespacesResponse);

###############################################
# StartFdSocket
struct StartFdSocketRequest {
metadata @0 :Metadata; # Standard metadata to carry.
}

struct StartFdSocketResponse {
path @0 :Text; # The path to the fd socket.
}

startFdSocket @7 (request: StartFdSocketRequest) -> (response: StartFdSocketResponse);

###############################################
# Helper types

Expand Down
2 changes: 2 additions & 0 deletions conmon-rs/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ anyhow = "1.0.72"
capnp = "0.17.2"
capnp-rpc = "0.17.0"
clap = { version = "4.3.8", features = ["color", "cargo", "deprecated", "derive", "deprecated", "env", "string", "unicode", "wrap_help"] }
command-fds = { version = "0.2.2", features = ["tokio"] }
conmon-common = { path = "../common" }
futures = "0.3.28"
getset = "0.1.2"
Expand All @@ -37,6 +38,7 @@ tempfile = "3.7.1"
tokio = { version = "1.31.0", features = ["fs", "io-std", "io-util", "macros", "net", "process", "rt", "rt-multi-thread", "signal", "time"] }
tokio-eventfd = "0.2.0"
tokio-fd = "0.3.0"
tokio-seqpacket = "0.7.0"
tokio-util = { version = "0.7.8", features = ["compat"] }
tracing = "0.1.37"
tracing-opentelemetry = "0.20.0"
Expand Down
33 changes: 26 additions & 7 deletions conmon-rs/server/src/child_reaper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use crate::{
container_io::{ContainerIO, ContainerIOType, SharedContainerIO},
oom_watcher::OOMWatcher,
};
use anyhow::{bail, format_err, Context, Result};
use anyhow::{bail, Context, Result};
use command_fds::{tokio::CommandFdAsyncExt, FdMapping};
use getset::{CopyGetters, Getters, Setters};
use libc::pid_t;
use multimap::MultiMap;
Expand All @@ -19,6 +20,7 @@ use nix::{
use std::{
ffi::OsStr,
fmt::Write,
os::fd::{AsRawFd, OwnedFd, RawFd},
path::{Path, PathBuf},
process::Stdio,
str,
Expand All @@ -41,11 +43,8 @@ pub struct ChildReaper {
grandchildren: Arc<Mutex<MultiMap<String, ReapableChild>>>,
}

macro_rules! lock {
($x:expr) => {
$x.lock().map_err(|e| format_err!("{:#}", e))?
};
}
/// first usable file descriptor after stdin, stdout and stderr
const FIRST_FD_AFTER_STDIO: RawFd = 3;

impl ChildReaper {
pub fn get(&self, id: &str) -> Result<ReapableChild> {
Expand All @@ -56,6 +55,7 @@ impl ChildReaper {
Ok(r)
}

#[allow(clippy::too_many_arguments)]
pub async fn create_child<P, I, S>(
&self,
cmd: P,
Expand All @@ -64,6 +64,7 @@ impl ChildReaper {
container_io: &mut ContainerIO,
pidfile: &Path,
env_vars: Vec<(String, String)>,
additional_fds: Vec<OwnedFd>,
) -> Result<(u32, CancellationToken)>
where
P: AsRef<OsStr>,
Expand All @@ -81,9 +82,22 @@ impl ChildReaper {
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.envs(env_vars)
.fd_mappings(
additional_fds
.iter()
.enumerate()
.map(|(i, fd)| FdMapping {
parent_fd: fd.as_raw_fd(),
child_fd: i as RawFd + FIRST_FD_AFTER_STDIO,
})
.collect(),
)?
.spawn()
.context("spawn child process: {}")?;

// close file descriptors after spawn
drop(additional_fds);

let token = CancellationToken::new();

match container_io.typ_mut() {
Expand Down Expand Up @@ -134,7 +148,11 @@ impl ChildReaper {
Ok((grandchild_pid, token))
}

pub fn watch_grandchild(&self, child: Child) -> Result<Receiver<ExitChannelData>> {
pub fn watch_grandchild(
&self,
child: Child,
leak_fds: Vec<OwnedFd>,
) -> Result<Receiver<ExitChannelData>> {
let locked_grandchildren = &self.grandchildren().clone();
let mut map = lock!(locked_grandchildren);
let mut reapable_grandchild = ReapableChild::from_child(&child);
Expand All @@ -148,6 +166,7 @@ impl ChildReaper {
task::spawn(
async move {
exit_tx.subscribe().recv().await?;
drop(leak_fds);
Self::forget_grandchild(&cleanup_grandchildren, pid)
}
.instrument(debug_span!("watch_grandchild", pid)),
Expand Down
8 changes: 8 additions & 0 deletions conmon-rs/server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ impl Default for Config {
// Sync with `pkg/client/client.go`
const SOCKET: &str = "conmon.sock";
const PIDFILE: &str = "pidfile";
const FD_SOCKET: &str = "conmon-fd.sock";

impl Config {
/// Validate the configuration integrity.
Expand Down Expand Up @@ -354,6 +355,10 @@ impl Config {
fs::remove_file(self.socket())?;
}

if self.fd_socket().exists() {
fs::remove_file(self.fd_socket())?;
}

Ok(())
}
pub fn socket(&self) -> PathBuf {
Expand All @@ -362,4 +367,7 @@ impl Config {
pub fn conmon_pidfile(&self) -> PathBuf {
self.runtime_dir().join(PIDFILE)
}
pub fn fd_socket(&self) -> PathBuf {
self.runtime_dir().join(FD_SOCKET)
}
}
Loading

0 comments on commit af3e813

Please sign in to comment.