Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add automatic shutdown after inactivity #1590

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 24 additions & 5 deletions conmon-rs/server/src/child_reaper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
use crate::{
child::Child,
container_io::{ContainerIO, ContainerIOType, SharedContainerIO},
inactivity::{Activity, Inactivity},
oom_watcher::OOMWatcher,
};
use anyhow::{bail, Context, Result};
Expand Down Expand Up @@ -37,16 +38,26 @@ use tokio::{
use tokio_util::sync::CancellationToken;
use tracing::{debug, debug_span, error, warn, Instrument};

#[derive(Debug, Default, Getters)]
#[derive(Debug, Getters)]
pub struct ChildReaper {
#[getset(get)]
grandchildren: Arc<Mutex<MultiMap<String, ReapableChild>>>,

#[getset(get)]
inactivity: Inactivity,
}

/// first usable file descriptor after stdin, stdout and stderr
const FIRST_FD_AFTER_STDIO: RawFd = 3;

impl ChildReaper {
pub fn new(inactivity: Inactivity) -> ChildReaper {
Self {
grandchildren: Arc::default(),
inactivity,
}
}

pub fn get(&self, id: &str) -> Result<ReapableChild> {
let locked_grandchildren = &self.grandchildren().clone();
let lock = lock!(locked_grandchildren);
Expand Down Expand Up @@ -155,7 +166,7 @@ impl ChildReaper {
) -> Result<Receiver<ExitChannelData>> {
let locked_grandchildren = &self.grandchildren().clone();
let mut map = lock!(locked_grandchildren);
let mut reapable_grandchild = ReapableChild::from_child(&child);
let mut reapable_grandchild = ReapableChild::from_child(&child, self.inactivity.activity());

let (exit_tx, exit_rx) = reapable_grandchild.watch()?;

Expand Down Expand Up @@ -252,6 +263,8 @@ pub struct ReapableChild {

#[getset(get = "pub")]
cleanup_cmd: Vec<String>,

activity: Activity,
}

#[derive(Clone, CopyGetters, Debug, Getters, Setters)]
Expand All @@ -267,7 +280,7 @@ pub struct ExitChannelData {
}

impl ReapableChild {
pub fn from_child(child: &Child) -> Self {
pub fn from_child(child: &Child, activity: Activity) -> Self {
Self {
exit_paths: child.exit_paths().clone(),
oom_exit_paths: child.oom_exit_paths().clone(),
Expand All @@ -277,6 +290,7 @@ impl ReapableChild {
token: child.token().clone(),
task: None,
cleanup_cmd: child.cleanup_cmd().to_vec(),
activity,
}
}

Expand Down Expand Up @@ -307,6 +321,8 @@ impl ReapableChild {
let stop_token = self.token().clone();
let cleanup_cmd_raw = self.cleanup_cmd().clone();

let activity = self.activity.clone();

let task = task::spawn(
async move {
debug!("Running task");
Expand Down Expand Up @@ -360,14 +376,16 @@ impl ReapableChild {
}

if !cleanup_cmd_raw.is_empty() {
Self::spawn_cleanup_process(&cleanup_cmd_raw).await;
Self::spawn_cleanup_process(&cleanup_cmd_raw, activity.clone()).await;
}

debug!("Sending exit struct to channel: {:?}", exit_channel_data);
if exit_tx_clone.send(exit_channel_data).is_err() {
debug!("Unable to send exit status");
}
debug!("Task done");

activity.stop();
}
.instrument(debug_span!("watch", pid)),
);
Expand All @@ -382,7 +400,7 @@ impl ReapableChild {
Ok((exit_tx, exit_rx))
}

async fn spawn_cleanup_process(raw_cmd: &[String]) {
async fn spawn_cleanup_process(raw_cmd: &[String], activity: Activity) {
let mut cleanup_cmd = Command::new(&raw_cmd[0]);

cleanup_cmd.args(&raw_cmd[1..]);
Expand All @@ -399,6 +417,7 @@ impl ReapableChild {
e
),
}
activity.stop();
});
}

Expand Down
12 changes: 11 additions & 1 deletion conmon-rs/server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ macro_rules! prefix {
};
}

#[derive(CopyGetters, Debug, Deserialize, Eq, Getters, Parser, PartialEq, Serialize, Setters)]
#[derive(CopyGetters, Debug, Deserialize, Getters, Parser, PartialEq, Serialize, Setters)]
#[serde(rename_all = "kebab-case")]
#[command(
after_help("More info at: https://github.com/containers/conmon-rs"),
Expand Down Expand Up @@ -125,6 +125,16 @@ pub struct Config {
)]
/// OpenTelemetry GRPC endpoint to be used for tracing.
tracing_endpoint: String,

#[get_copy = "pub"]
#[arg(
default_value_t,
env(concat!(prefix!(), "SHUTDOWN_DELAY")),
long("shutdown-delay"),
value_name("DELAY"),
)]
/// Automatically stop conmon-rs after DELAY seconds of inactivity. (0 = disabled)
shutdown_delay: f64,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should make this something like Option<humantime::Duration>

Copy link
Contributor Author

@mgjm mgjm Aug 15, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... the string representation of a go time.Duration is not compatible with the humantime::Duration parser. But we could write a custom "to string" method in go. Do we expect the conmon server to be invoked by humans or only by the go client?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO we should only support go client

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And even for humans it is still relatively ergonomic to specify the duration in (fractional) seconds.
1.5 vs 1s 500ms
Especially considering that I expect the most common durations to be in the range of a few full seconds.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is specifying milliseconds really a use case? I'm also fine with using seconds only.

}

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize, Subcommand)]
Expand Down
34 changes: 30 additions & 4 deletions conmon-rs/server/src/fd_socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@
//!
//! An empty request does **not** receive a response.

use crate::listener::{Listener, SeqpacketListener};
use crate::{
inactivity::Inactivity,
listener::{Listener, SeqpacketListener},
};
use anyhow::Result;
use std::{
collections::{hash_map, HashMap},
Expand All @@ -46,10 +49,11 @@ use std::{
os::fd::OwnedFd,
path::PathBuf,
sync::{Arc, Mutex},
time::Duration,
};
use tokio::{runtime::Handle, sync::Mutex as AsyncMutex, task};
use tokio_seqpacket::{ancillary::OwnedAncillaryMessage, UnixSeqpacket};
use tracing::{debug_span, Instrument};
use tracing::{debug, debug_span, error, Instrument};

#[derive(Debug, Default)]
pub struct FdSocket {
Expand Down Expand Up @@ -170,12 +174,34 @@ impl Server {
let mut listener = Listener::<SeqpacketListener>::default().bind_long_path(&server.path)?;
let guard = ListenerGuard(fd_socket);

let inactivity = Inactivity::new();
let timeout = Duration::from_secs(3);

task::spawn(
async move {
while let Ok(conn) = listener.accept().await {
loop {
let conn = tokio::select! {
() = inactivity.wait(timeout) => {
debug!("Stop fd socket after inactivity");
break;
}
conn = listener.accept() => match conn {
Ok(conn) => conn,
Err(err) => {
error!("Unable to accept on fd socket: {err}");
break;
}
},
};
let fd_socket = guard.0.clone();
let activity = inactivity.activity();
task::spawn(
Self::serve(conn, fd_socket).instrument(debug_span!("fd_socket_serve")),
async move {
let result = Self::serve(conn, fd_socket).await;
activity.stop();
result
}
.instrument(debug_span!("fd_socket_serve")),
);
}
drop(guard);
Expand Down
128 changes: 128 additions & 0 deletions conmon-rs/server/src/inactivity.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
use std::{
future, process,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::Duration,
};

use tokio::sync::{futures::Notified, Notify};

/// Track activity and reacto to inactivity.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Track activity and reacto to inactivity.
/// Track activity and react to inactivity.

///
/// Can be used to exit accept loops after inactivity.
#[derive(Debug, Clone)]
pub struct Inactivity(Option<Arc<Inner>>);

impl Inactivity {
/// Create a new inactivity tracker.
pub fn new() -> Self {
Self(Some(Arc::default()))
}

/// Create a disabled inactivity tracker.
///
/// The wait function will never return. There is always activity.
pub const fn disabled() -> Self {
Self(None)
}

/// Start tracking an activity.
pub fn activity(&self) -> Activity {
Activity::new(self.0.as_ref())
}

/// Async "block" until there is no activity and then wait for an additional`timeout`.
pub async fn wait(&self, timeout: Duration) {
if let Some(inner) = &self.0 {
loop {
let changed = inner.changed();
if inner.no_activity() {
let _ = tokio::time::timeout(timeout, changed).await;
if inner.no_activity() {
break;
}
} else {
changed.await
}
}
} else {
future::pending().await
}
}
}

/// Track an activity. Can be cloned to track additional activities.
///
/// The Activity stops on drop.
#[derive(Debug)]
pub struct Activity(Option<Arc<Inner>>);

impl Activity {
fn new(inner: Option<&Arc<Inner>>) -> Self {
Self(inner.map(Inner::increment))
}

/// Explicitly stop an activity. Just a wrapper for `drop(activity)`.
pub fn stop(self) {
// nothing to to, we take self by value
}
}

impl Clone for Activity {
fn clone(&self) -> Self {
Self::new(self.0.as_ref())
}
}

impl Drop for Activity {
fn drop(&mut self) {
if let Some(inner) = &self.0 {
inner.decrement();
}
}
}

#[derive(Debug, Default)]
struct Inner {
// number of current activities
active: AtomicUsize,
// gets notofied whenever the result of `no_activity` changes
notify: Notify,
}

impl Inner {
/// Abort if more then isize::MAX activities are active.
///
/// This prevents integer overflow of the `active` counter in case
/// someone is `mem::forget`ing activities.
///
/// The same logic is applied internally by the `Arc` implementation.
const MAX_ACTIVE: usize = isize::MAX as usize;

fn increment(self: &Arc<Self>) -> Arc<Self> {
match self.active.fetch_add(1, Ordering::Relaxed) {
0 => self.notify.notify_waiters(),
1..=Self::MAX_ACTIVE => {}
_ => process::abort(),
}
self.clone()
}

fn decrement(&self) {
match self.active.fetch_sub(1, Ordering::Relaxed) {
1 => self.notify.notify_waiters(),
2..=Self::MAX_ACTIVE => {}
_ => process::abort(),
}
}

fn no_activity(&self) -> bool {
self.active.load(Ordering::Relaxed) == 0
}

fn changed(&self) -> Notified {
self.notify.notified()
}
}
1 change: 1 addition & 0 deletions conmon-rs/server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ mod container_io;
mod container_log;
mod cri_logger;
mod fd_socket;
mod inactivity;
mod init;
mod journal;
mod listener;
Expand Down
Loading