Commit

Pause (#791)
#712

Add the `--pause` flag, which pauses the target container while clients are connected to the agent.
When the last client disconnects, the container resumes.

Currently, the pause behaviour is determined when the agent is created, so clients from a different run that reuse an existing agent simply get that agent's existing pause behaviour.
t4lz committed Nov 29, 2022
1 parent 27cb60b commit 0498565
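
The behaviour described in the commit message boils down to reference-counting connected clients: pause the target when the count goes from zero to one, resume when it drops back to zero. Below is a minimal, self-contained sketch of that idea; all names here (`PauseTracker`, `add_client`, `remove_client`, the `println!` stand-ins) are illustrative, not the agent's actual types — the real logic added by this commit lives in `State` in `mirrord/agent/src/main.rs` and calls the container runtime instead of printing.

```rust
// Illustrative sketch only: pause the target when the first client connects,
// resume once the last one disconnects.
use std::collections::HashSet;

struct PauseTracker {
    clients: HashSet<u32>,
    should_pause: bool,
}

impl PauseTracker {
    fn new(should_pause: bool) -> Self {
        Self { clients: HashSet::new(), should_pause }
    }

    fn add_client(&mut self, id: u32) {
        // Only the transition from zero to one client triggers a pause.
        if self.clients.insert(id) && self.clients.len() == 1 && self.should_pause {
            println!("first client connected -> pause target container");
        }
    }

    fn remove_client(&mut self, id: u32) {
        // Only the transition back to zero clients triggers a resume.
        if self.clients.remove(&id) && self.clients.is_empty() && self.should_pause {
            println!("last client disconnected -> resume target container");
        }
    }
}

fn main() {
    let mut tracker = PauseTracker::new(true);
    tracker.add_client(1); // pauses here
    tracker.add_client(2); // already paused, nothing happens
    tracker.remove_client(1);
    tracker.remove_client(2); // resumes here
}
```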
Showing 18 changed files with 383 additions and 102 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,10 @@ Check [Keep a Changelog](http://keepachangelog.com/) for recommendations on how

## [Unreleased]

### Added

- `--pause` feature (unstable). See [#712](https://github.com/metalbear-co/mirrord/issues/712).

### Changed

- CI: cancel previous runs of same PR.
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -25,6 +25,7 @@ categories = ["development-tools", "backend", "devtool"]

[workspace.dependencies]
actix-codec = "0.5"
async-trait = "0.1"
bytes = "1"
tokio = { version = "1", features = ["rt", "rt-multi-thread", "net", "macros"] }
tokio-stream = "0.1"
7 changes: 7 additions & 0 deletions mirrord-schema.json
@@ -176,6 +176,13 @@
"null"
]
},
"pause": {
"description": "Controls target pause feature. Unstable.\n\nWith this feature enabled, the remote container is paused while clients are connected to the agent.",
"type": [
"boolean",
"null"
]
},
"startup_timeout": {
"description": "Controls how long to wait for the agent to finish initialization.\n\nIf initialization takes longer than this value, mirrord exits.",
"type": [
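
The `pause` property added to mirrord-schema.json above is a nullable boolean under the agent configuration. The mirrord-config structs backing this schema are not part of the rendered diff; purely as a generic illustration, a nullable boolean like this maps onto an `Option<bool>` field with serde. The struct and field names below are hypothetical, and the sketch assumes the `serde` and `serde_json` crates.

```rust
use serde::Deserialize;

// Hypothetical sketch, not mirrord's actual config type.
#[derive(Debug, Deserialize)]
struct AgentConfigSketch {
    // `None` when the key is absent or null; the pause feature stays off by default.
    #[serde(default)]
    pause: Option<bool>,
}

fn main() {
    let on: AgentConfigSketch = serde_json::from_str(r#"{ "pause": true }"#).unwrap();
    assert_eq!(on.pause, Some(true));

    let unset: AgentConfigSketch = serde_json::from_str("{}").unwrap();
    assert_eq!(unset.pause, None);
}
```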
2 changes: 2 additions & 0 deletions mirrord/agent/Cargo.toml
@@ -43,6 +43,8 @@ regex = "1"
socket2 = "0.4"
rawsocket = {git = "https://github.com/metalbear-co/rawsocket.git"}
wildmatch = "2"
enum_dispatch = "0.3.8"
async-trait.workspace = true

[dev-dependencies]
mockall = "0.11"
4 changes: 4 additions & 0 deletions mirrord/agent/src/cli.rs
@@ -31,6 +31,10 @@ pub struct Args {
/// Inform the agent to use `proc/1/root` as the root directory.
#[arg(short = 'e', long, default_value_t = false)]
pub ephemeral_container: bool,

/// Pause the target container while clients are connected.
#[arg(short = 'p', long, default_value_t = false)]
pub pause: bool,
}

const DEFAULT_RUNTIME: &str = "containerd";
6 changes: 6 additions & 0 deletions mirrord/agent/src/error.rs
@@ -90,6 +90,12 @@ pub enum AgentError {

#[error("DNS response receive failed with `{0}`")]
DnsResponseReceiveError(#[from] tokio::sync::oneshot::error::RecvError),

#[error("Pause was set, but container id or runtime is missing.")]
MissingContainerInfo,

#[error("start_client -> Ran out of connections, dropping new connection")]
ConnectionLimitReached,
}

pub(crate) type Result<T, E = AgentError> = std::result::Result<T, E>;
165 changes: 111 additions & 54 deletions mirrord/agent/src/main.rs
@@ -33,7 +33,8 @@ use tracing::{debug, error, info, trace};
use tracing_subscriber::{fmt::format::FmtSpan, prelude::*};

use crate::{
-runtime::get_container_pid,
+cli::Args,
+runtime::{get_container, Container, ContainerRuntime},
steal::steal_worker,
util::{run_thread, ClientID, IndexAllocator},
};
@@ -51,27 +51,87 @@ mod util;

const CHANNEL_SIZE: usize = 1024;

/// Keeps track of connected clients.
/// If pausing target, also pauses and unpauses when number of clients changes from or to 0.
#[derive(Debug)]
struct State {
-pub clients: HashSet<ClientID>,
+clients: HashSet<ClientID>,
index_allocator: IndexAllocator<ClientID>,
/// Was the pause argument passed? If true, will pause the container when no clients are
/// connected.
should_pause: bool,
/// This is an option because it is acceptable not to pass a container runtime and id if not
/// pausing. When those args are not passed, container is None.
container: Option<Container>,
}

impl State {
-pub fn new() -> State {
-State {
+/// Returns Err if container runtime operations failed or if the `pause` arg was passed, but
+/// the container info (runtime and id) was not.
+pub async fn new(args: &Args) -> Result<State> {
let container =
get_container(args.container_id.as_ref(), args.container_runtime.as_ref()).await?;
if container.is_none() && args.pause {
return Err(AgentError::MissingContainerInfo);
}
Ok(State {
clients: HashSet::new(),
index_allocator: IndexAllocator::new(),
should_pause: args.pause,
container,
})
}

/// Get the external pid of the target container, if container info available.
pub async fn get_container_pid(&self) -> Result<Option<u64>> {
if self.container.is_some() {
let container = self.container.as_ref().unwrap();
let pid = container.get_pid().await?;
Ok(Some(pid))
} else {
Ok(None)
}
}

/// If there are clientIDs left, insert new one and return it.
/// If there were no clients before, and there is a Pauser, start pausing.
/// Propagate container runtime errors.
pub async fn new_client(&mut self) -> Result<ClientID> {
match self.generate_id() {
None => Err(AgentError::ConnectionLimitReached),
Some(new_id) => {
self.clients.insert(new_id.to_owned());
if self.clients.len() == 1 {
// First client after no clients.
if self.should_pause {
self.container.as_ref().unwrap().pause().await?;
}
}
Ok(new_id)
}
}
}

-pub fn generate_id(&mut self) -> Option<ClientID> {
+fn generate_id(&mut self) -> Option<ClientID> {
self.index_allocator.next_index()
}

-pub fn remove_client(&mut self, client_id: ClientID) {
+/// If that was the last client and we are pausing, stop pausing.
+/// Propagate container runtime errors.
+pub async fn remove_client(&mut self, client_id: ClientID) -> Result<()> {
self.clients.remove(&client_id);
-self.index_allocator.free_index(client_id)
+self.index_allocator.free_index(client_id);
if self.clients.is_empty() {
// resume container (stop stopping).
if self.should_pause {
self.container.as_ref().unwrap().unpause().await?;
}
}
Ok(())
}

pub fn no_clients_left(&self) -> bool {
self.clients.is_empty()
}
}

@@ -287,14 +348,8 @@ async fn start_agent() -> Result<()> {
))
.await?;

-let pid = match (args.container_id, args.container_runtime) {
-(Some(container_id), Some(container_runtime)) => {
-Some(get_container_pid(&container_id, &container_runtime).await?)
-}
-_ => None,
-};
-
-let mut state = State::new();
+let mut state = State::new(&args).await?;
+let pid = state.get_container_pid().await?;
let cancellation_token = CancellationToken::new();
// Cancel all other tasks on exit
let cancel_guard = cancellation_token.clone().drop_guard();
@@ -319,52 +374,54 @@
Ok((stream, addr)) = listener.accept() => {
trace!("start -> Connection accepted from {:?}", addr);

-if let Some(client_id) = state.generate_id() {
-
-state.clients.insert(client_id);
-let sniffer_command_tx = sniffer_command_tx.clone();
-let cancellation_token = cancellation_token.clone();
-let dns_sender = dns_sender.clone();
-let client = tokio::spawn(async move {
-match ClientConnectionHandler::new(
-client_id,
-stream,
-pid,
-args.ephemeral_container,
-sniffer_command_tx,
-dns_sender,
-)
-.and_then(|client| client.start(cancellation_token))
-.await
-{
-Ok(_) => {
-trace!(
-"ClientConnectionHandler::start -> Client {} disconnected",
-client_id
-);
-}
-Err(e) => {
-error!(
-"ClientConnectionHandler::start -> Client {} disconnected with error: {}",
-client_id, e
-);
+match state.new_client().await {
+Ok(client_id) => {
+let sniffer_command_tx = sniffer_command_tx.clone();
+let cancellation_token = cancellation_token.clone();
+let dns_sender = dns_sender.clone();
+let client = tokio::spawn(async move {
+match ClientConnectionHandler::new(
+client_id,
+stream,
+pid,
+args.ephemeral_container,
+sniffer_command_tx,
+dns_sender,
+)
+.and_then(|client| client.start(cancellation_token))
+.await
+{
+Ok(_) => {
+trace!(
+"ClientConnectionHandler::start -> Client {} disconnected",
+client_id
+);
+}
+Err(e) => {
+error!(
+"ClientConnectionHandler::start -> Client {} disconnected with error: {}",
+client_id, e
+);
+}
+}
-}
-client_id
-
-});
-clients.push(client);
-} else {
-error!("start_client -> Ran out of connections, dropping new connection");
+client_id
+
+});
+clients.push(client);
+},
+Err(AgentError::ConnectionLimitReached) => {
+error!("start_client -> Ran out of connections, dropping new connection");
+},
+// Propagate all errors that are not ConnectionLimitReached.
+err => { err?; },
+}

},
Some(client) = clients.next() => {
let client_id = client?;
-state.remove_client(client_id);
+state.remove_client(client_id).await?;
},
_ = tokio::time::sleep(std::time::Duration::from_secs(args.communication_timeout.into())) => {
-if state.clients.is_empty() {
+if state.no_clients_left() {
trace!("Main thread timeout, no clients connected.");
break;
}
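
The `Container::pause()` and `unpause()` calls that `State` makes above are implemented in `mirrord/agent/src/runtime.rs`, which is among the files not rendered on this page. Purely as an illustration of what such a call can look like against the Docker Engine API, here is a sketch using the bollard crate; bollard and the `pause_for_session` helper are assumptions for the example only, and the agent's real implementation abstracts over other runtimes (containerd is its default) rather than talking to Docker directly.

```rust
use bollard::Docker;

// Illustration only; not the code from runtime.rs in this commit.
async fn pause_for_session(container_id: &str) -> Result<(), bollard::errors::Error> {
    let docker = Docker::connect_with_local_defaults()?;

    // First client connected: freeze every process in the target container.
    docker.pause_container(container_id).await?;

    // ... serve the connected clients here ...

    // Last client disconnected: let the container resume execution.
    docker.unpause_container(container_id).await?;
    Ok(())
}
```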