Skip to content

Commit

Permalink
feat(deployer): new tool to test the control plane
Browse files Browse the repository at this point in the history
Deploys all (configurable) control plane components in a local docker
cluster.
For more details please use the help argument and have a look at the
 README.md.
  • Loading branch information
tiagolobocastro committed Jan 28, 2021
1 parent 211cc12 commit 1767074
Show file tree
Hide file tree
Showing 16 changed files with 1,076 additions and 27 deletions.
9 changes: 9 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

131 changes: 116 additions & 15 deletions composer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ pub struct ContainerSpec {
/// Key-Map of environment variables
/// Starts with RUST_LOG=debug,h2=info
env: HashMap<String, String>,
/// Volume bind dst/source
binds: HashMap<String, String>,
}

impl ContainerSpec {
Expand Down Expand Up @@ -259,6 +261,20 @@ impl ContainerSpec {
}
self
}
/// use a volume binds between host path and container container
pub fn with_bind(mut self, host: &str, container: &str) -> Self {
self.binds.insert(container.to_string(), host.to_string());
self
}

/// List of volume binds with each element as host:container
fn binds(&self) -> Vec<String> {
let mut vec = vec![];
self.binds.iter().for_each(|(container, host)| {
vec.push(format!("{}:{}", host, container));
});
vec
}

/// Environment variables as a vector with each element as:
/// "{key}={value}"
Expand Down Expand Up @@ -312,6 +328,14 @@ impl Default for Builder {
}
}

/// trait to allow extensibility using the Builder pattern
pub trait BuilderConfigure {
fn configure(
&self,
cfg: Builder,
) -> Result<Builder, Box<dyn std::error::Error>>;
}

impl Builder {
/// construct a new builder for `[ComposeTest']
pub fn new() -> Self {
Expand All @@ -327,6 +351,41 @@ impl Builder {
}
}

/// get the name of the experiment
pub fn get_name(&self) -> String {
self.name.clone()
}

/// configure the `Builder` using the `BuilderConfigure` trait
pub fn configure(
self,
cfg: impl BuilderConfigure,
) -> Result<Builder, Box<dyn std::error::Error>> {
cfg.configure(self)
}

/// next ordinal container ip
pub fn next_container_ip(&self) -> Result<String, Error> {
let net: Ipv4Network = self.network.parse().map_err(|error| {
bollard::errors::Error::IOError {
err: std::io::Error::new(
std::io::ErrorKind::InvalidInput,
format!("Invalid network format: {}", error),
),
}
})?;
let ip = net.nth((self.containers.len() + 2) as u32);
match ip {
None => Err(bollard::errors::Error::IOError {
err: std::io::Error::new(
std::io::ErrorKind::AddrNotAvailable,
"No available ip",
),
}),
Some(ip) => Ok(ip.to_string()),
}
}

/// run all containers on build
pub fn autorun(mut self, run: bool) -> Builder {
self.autorun = run;
Expand Down Expand Up @@ -512,7 +571,8 @@ pub struct ComposeTest {
label_prefix: String,
/// automatically clean up the things we have created for this test
clean: bool,
pub prune: bool,
/// remove existing containers upon creation
prune: bool,
/// base image for image-less containers
image: Option<String>,
/// output container logs on panic
Expand Down Expand Up @@ -557,14 +617,15 @@ impl ComposeTest {
/// networking IP and/or subnets
async fn network_create(&mut self) -> Result<NetworkId, Error> {
let mut net = self.network_list_labeled().await?;

if !net.is_empty() {
let first = net.pop().unwrap();
if Some(self.name.clone()) == first.name {
// reuse the same network
self.network_id = first.id.unwrap();
// but clean up the existing containers
self.remove_network_containers(&self.name).await?;
if self.prune {
// but clean up the existing containers
self.remove_network_containers(&self.name).await?;
}
return Ok(self.network_id.clone());
} else {
self.network_remove_labeled().await?;
Expand Down Expand Up @@ -607,7 +668,10 @@ impl ComposeTest {
}

/// remove all containers from the network
async fn remove_network_containers(&self, name: &str) -> Result<(), Error> {
pub async fn remove_network_containers(
&self,
name: &str,
) -> Result<(), Error> {
let containers = self.list_network_containers(name).await?;
for k in &containers {
let name = k.id.clone().unwrap();
Expand Down Expand Up @@ -741,13 +805,14 @@ impl ComposeTest {
)
.await;
}

let mut binds = vec![
format!("{}:{}", self.srcdir, self.srcdir),
"/nix:/nix:ro".into(),
"/dev/hugepages:/dev/hugepages:rw".into(),
];
binds.extend(spec.binds());
let host_config = HostConfig {
binds: Some(vec![
format!("{}:{}", self.srcdir, self.srcdir),
"/nix:/nix:ro".into(),
"/dev/hugepages:/dev/hugepages:rw".into(),
]),
binds: Some(binds),
mounts: Some(vec![
// DPDK needs to have a /tmp
Mount {
Expand Down Expand Up @@ -855,8 +920,13 @@ impl ComposeTest {
/// Pulls the docker image, if one is specified and is not present locally
async fn pull_missing_image(&self, image: &Option<String>) {
if let Some(image) = image {
if !self.image_exists(image).await {
self.pull_image(image).await;
let image = if !image.contains(':') {
format!("{}:latest", image)
} else {
image.clone()
};
if !self.image_exists(&image).await {
self.pull_image(&image).await;
}
}
}
Expand Down Expand Up @@ -893,7 +963,17 @@ impl ComposeTest {

/// start the container
pub async fn start(&self, name: &str) -> Result<(), Error> {
let id = self.containers.get(name).unwrap();
let id = self.containers.get(name).ok_or(
bollard::errors::Error::IOError {
err: std::io::Error::new(
std::io::ErrorKind::NotFound,
format!(
"Can't start container {} as it was not configured",
name
),
),
},
)?;
self.docker
.start_container::<&str>(id.0.as_str(), None)
.await?;
Expand All @@ -904,10 +984,15 @@ impl ComposeTest {
/// stop the container
pub async fn stop(&self, name: &str) -> Result<(), Error> {
let id = self.containers.get(name).unwrap();
self.stop_id(id.0.as_str()).await
}

/// stop the container by its id
pub async fn stop_id(&self, id: &str) -> Result<(), Error> {
if let Err(e) = self
.docker
.stop_container(
id.0.as_str(),
id,
Some(StopContainerOptions {
t: 3,
}),
Expand Down Expand Up @@ -1014,6 +1099,22 @@ impl ComposeTest {
Ok(())
}

/// stop all the containers part of the network
/// returns the last error, if any or Ok
pub async fn stop_network_containers(&self) -> Result<(), Error> {
let mut result = Ok(());
let containers = self.list_network_containers(&self.name).await?;
for container in containers {
if let Some(id) = container.id {
if let Err(e) = self.stop_id(&id).await {
println!("Failed to stop container id {:?}", id);
result = Err(e);
}
}
}
result
}

/// inspect the given container
pub async fn inspect(
&self,
Expand Down
2 changes: 1 addition & 1 deletion nix/pkgs/mayastor/default.nix
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ let
buildProps = rec {
name = "mayastor";
#cargoSha256 = "0000000000000000000000000000000000000000000000000000";
cargoSha256 = "001a92rjffm1jc6pffmq3ci4a7ac3wxz6sbmrps67ir3chh2lv4g";
cargoSha256 = "0jxi2z78kc0knr3bscyk622rg7b5ynjiw205xl6g4v8saychxpbd";
inherit version;
src = whitelistSource ../../../. [
"Cargo.lock"
Expand Down
10 changes: 4 additions & 6 deletions operators/node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@ use tracing::{debug, error, info, instrument};

#[derive(Debug, StructOpt)]
struct CliArgs {
/// The Rest Server hostname to connect to
/// Default: localhost:8080
#[structopt(long, short, default_value = "localhost:8080")]
/// The Rest Server URL to connect to
#[structopt(long, short, default_value = "https://localhost:8080")]
rest: String,

/// Polling period
Expand Down Expand Up @@ -85,9 +84,8 @@ async fn main() -> anyhow::Result<()> {

let polling_period = CliArgs::from_args().period.into();

let rest_url = format!("https://{}", CliArgs::from_args().rest);
let rest_cli = rest_client::ActixRestClient::new(
&rest_url,
&CliArgs::from_args().rest,
CliArgs::from_args().jaeger.is_some(),
)?;

Expand Down Expand Up @@ -122,7 +120,7 @@ async fn polling_work(
) -> anyhow::Result<()> {
// Fetch all nodes as seen by the control plane via REST
let rest_nodes = rest_cli.get_nodes().await?;
println!("Retrieved rest nodes: {:?}", rest_nodes);
debug!("Retrieved rest nodes: {:?}", rest_nodes);

// Fetch all node CRD's from k8s
let kube_nodes = nodes_get_all(&nodes_api).await?;
Expand Down
24 changes: 23 additions & 1 deletion rest/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,21 @@ pub struct ActixRestClient {

impl ActixRestClient {
/// creates a new client which uses the specified `url`
/// uses the rustls connector if the url has the https scheme
pub fn new(url: &str, trace: bool) -> anyhow::Result<Self> {
let url: url::Url = url.parse()?;

match url.scheme() {
"https" => Self::new_https(&url, trace),
"http" => Self::new_http(&url, trace),
invalid => {
let msg = format!("Invalid url scheme: {}", invalid);
Err(anyhow::Error::msg(msg))
}
}
}
/// creates a new secure client
fn new_https(url: &url::Url, trace: bool) -> anyhow::Result<Self> {
let cert_file = &mut BufReader::new(
&std::include_bytes!("../certs/rsa/ca.cert")[..],
);
Expand All @@ -46,7 +60,15 @@ impl ActixRestClient {

Ok(Self {
client: rest_client,
url: url.to_string(),
url: url.to_string().trim_end_matches('/').into(),
trace,
})
}
/// creates a new client
fn new_http(url: &url::Url, trace: bool) -> anyhow::Result<Self> {
Ok(Self {
client: Client::new(),
url: url.to_string().trim_end_matches('/').into(),
trace,
})
}
Expand Down
2 changes: 1 addition & 1 deletion rest/tests/v0_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ async fn client() {
.autorun(false)
// uncomment to leave containers running allowing us access the jaeger
// traces at localhost:16686
//.with_clean(false)
.with_clean(false)
.build()
.await
.unwrap();
Expand Down
11 changes: 8 additions & 3 deletions services/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,17 @@ path = "pool/src/server.rs"
name = "volume"
path = "volume/src/server.rs"

[[bin]]
name = "deployer"
path = "deployer/src/bin.rs"

[lib]
name = "common"
path = "common/src/lib.rs"

[dependencies]
mbus_api = { path = "../mbus-api" }
composer = { path = "../composer" }
nats = "0.8"
structopt = "0.3.15"
tokio = { version = "0.2", features = ["full"] }
Expand All @@ -45,9 +50,9 @@ tracing-futures = "0.2.4"
rpc = { path = "../rpc" }
url = "2.2.0"
http = "0.2.1"

[dev-dependencies]
composer = { path = "../composer" }
strum = "0.19"
strum_macros = "0.19"
paste = "1.0.4"

[dependencies.serde]
features = ["derive"]
Expand Down
Loading

0 comments on commit 1767074

Please sign in to comment.