From f1546a97ea4f3f9e31f29d750a0960cc84e77046 Mon Sep 17 00:00:00 2001 From: Tiago Castro Date: Thu, 11 Feb 2021 18:20:04 +0000 Subject: [PATCH] feat(rest-test): new rest test project These are the initial bits of a new test project which leverages the new control plane to test mayastor. It has already revealed a few issues. --- Cargo.lock | 17 ++ Cargo.toml | 3 +- composer/src/lib.rs | 130 ++++++-- control-plane/deployer/bin/src/deployer.rs | 2 +- control-plane/deployer/src/infra/mod.rs | 40 ++- control-plane/deployer/src/lib.rs | 6 +- control-plane/mbus-api/Cargo.toml | 1 + control-plane/mbus-api/src/mbus_nats.rs | 11 +- control-plane/mbus-api/src/v0.rs | 17 +- control-plane/rest/src/lib.rs | 64 ++-- control-plane/tests/Cargo.toml | 22 ++ control-plane/tests/tests/common/mod.rs | 329 +++++++++++++++++++++ control-plane/tests/tests/nexus.rs | 25 ++ control-plane/tests/tests/pools.rs | 218 ++++++++++++++ control-plane/tests/tests/replicas.rs | 131 ++++++++ nix/pkgs/control-plane/cargo-project.nix | 2 +- nix/pkgs/mayastor/default.nix | 2 +- scripts/cargo-test.sh | 6 +- scripts/ctrlp-cargo-test.sh | 22 ++ 19 files changed, 982 insertions(+), 66 deletions(-) create mode 100644 control-plane/tests/Cargo.toml create mode 100644 control-plane/tests/tests/common/mod.rs create mode 100644 control-plane/tests/tests/nexus.rs create mode 100644 control-plane/tests/tests/pools.rs create mode 100644 control-plane/tests/tests/replicas.rs create mode 100755 scripts/ctrlp-cargo-test.sh diff --git a/Cargo.lock b/Cargo.lock index 3c9391f47..c2e2772d3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1288,6 +1288,22 @@ dependencies = [ "sct", ] +[[package]] +name = "ctrlp-tests" +version = "0.1.0" +dependencies = [ + "actix-rt", + "actix-web-opentelemetry", + "anyhow", + "composer", + "deployer", + "opentelemetry", + "opentelemetry-jaeger", + "rest", + "tracing", + "tracing-opentelemetry", +] + [[package]] name = "curve25519-dalek" version = "3.0.0" @@ -2668,6 +2684,7 @@ dependencies = [ "tracing", "tracing-futures", "tracing-subscriber", + "uuid", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 5a901b448..06bdc6388 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,5 +20,6 @@ members = [ "control-plane/rest", "control-plane/operators", "control-plane/macros", - "control-plane/deployer" + "control-plane/deployer", + "control-plane/tests" ] diff --git a/composer/src/lib.rs b/composer/src/lib.rs index 9f1e65e03..b25912103 100644 --- a/composer/src/lib.rs +++ b/composer/src/lib.rs @@ -113,11 +113,11 @@ impl Binary { let path = std::path::PathBuf::from(std::env!("CARGO_MANIFEST_DIR")); let srcdir = path.parent().unwrap().to_string_lossy(); - Self::new(format!("{}/target/debug/{}", srcdir, name), vec![]) + Self::new(&format!("{}/target/debug/{}", srcdir, name), vec![]) } /// Setup nix shell binary from path and arguments pub fn from_nix(name: &str) -> Self { - Self::new(Self::which(name).expect("binary should exist"), vec![]) + Self::new(name, vec![]) } /// Add single argument /// Only one argument can be passed per use. So instead of: @@ -171,11 +171,17 @@ impl Binary { } fn which(name: &str) -> std::io::Result { let output = std::process::Command::new("which").arg(name).output()?; + if !output.status.success() { + return Err(std::io::Error::new( + std::io::ErrorKind::NotFound, + name, + )); + } Ok(String::from_utf8_lossy(&output.stdout).trim().into()) } - fn new(path: String, args: Vec) -> Self { + fn new(path: &str, args: Vec) -> Self { Self { - path, + path: Self::which(path).expect("Binary path should exist!"), arguments: args, ..Default::default() } @@ -310,6 +316,10 @@ pub struct Builder { containers: Vec, /// the network for the tests used network: String, + /// reuse existing containers + reuse: bool, + /// allow cleaning up on a panic (if clean is true) + allow_clean_on_panic: bool, /// delete the container and network when dropped clean: bool, /// destroy existing containers if any @@ -343,6 +353,8 @@ impl Builder { name: TEST_NET_NAME.to_string(), containers: Default::default(), network: "10.1.0.0/16".to_string(), + reuse: false, + allow_clean_on_panic: true, clean: true, prune: true, autorun: true, @@ -420,8 +432,7 @@ impl Builder { } /// add a generic container which runs a local binary - pub fn add_container_bin(self, name: &str, mut bin: Binary) -> Builder { - bin.setup_nats(&self.name); + pub fn add_container_bin(self, name: &str, bin: Binary) -> Builder { self.add_container_spec(ContainerSpec::from_binary(name, bin)) } @@ -430,12 +441,25 @@ impl Builder { self.add_container_spec(ContainerSpec::from_binary(name, image)) } + /// attempt to reuse and restart containers instead of starting new ones + pub fn with_reuse(mut self, reuse: bool) -> Builder { + self.reuse = reuse; + self.prune = !reuse; + self + } + /// clean on drop? pub fn with_clean(mut self, enable: bool) -> Builder { self.clean = enable; self } + /// allow clean on panic if clean is set + pub fn with_clean_on_panic(mut self, enable: bool) -> Builder { + self.allow_clean_on_panic = enable; + self + } + /// prune containers and networks on start pub fn with_prune(mut self, enable: bool) -> Builder { self.prune = enable; @@ -463,14 +487,16 @@ impl Builder { } /// setup tracing for the cargo test code with `filter` + /// ignore when called multiple times pub fn with_tracing(self, filter: &str) -> Self { - if let Ok(filter) = + let builder = if let Ok(filter) = tracing_subscriber::EnvFilter::try_from_default_env() { - tracing_subscriber::fmt().with_env_filter(filter).init(); + tracing_subscriber::fmt().with_env_filter(filter) } else { - tracing_subscriber::fmt().with_env_filter(filter).init(); - } + tracing_subscriber::fmt().with_env_filter(filter) + }; + builder.try_init().ok(); self } @@ -517,6 +543,8 @@ impl Builder { containers: Default::default(), ipam, label_prefix: "io.mayastor.test".to_string(), + reuse: self.reuse, + allow_clean_on_panic: self.allow_clean_on_panic, clean: self.clean, prune: self.prune, image: self.image, @@ -526,14 +554,37 @@ impl Builder { compose.network_id = compose.network_create().await.map_err(|e| e.to_string())?; - // containers are created where the IPs are ordinal - for (i, spec) in self.containers.iter().enumerate() { - compose - .create_container( - spec, - &net.nth((i + 2) as u32).unwrap().to_string(), - ) - .await?; + if self.reuse { + let containers = + compose.list_network_containers(&self.name).await?; + + for container in containers { + let networks = container + .network_settings + .unwrap_or_default() + .networks + .unwrap_or_default(); + if let Some(n) = container.names.unwrap_or_default().first() { + if let Some(endpoint) = networks.get(&self.name) { + if let Some(ip) = endpoint.ip_address.clone() { + compose.containers.insert( + n[1 ..].into(), + (container.id.unwrap_or_default(), ip.parse()?), + ); + } + } + } + } + } else { + // containers are created where the IPs are ordinal + for (i, spec) in self.containers.iter().enumerate() { + compose + .create_container( + spec, + &net.nth((i + 2) as u32).unwrap().to_string(), + ) + .await?; + } } Ok(compose) @@ -569,6 +620,10 @@ pub struct ComposeTest { /// prefix for labels set on containers and networks /// $prefix.name = $name will be created automatically label_prefix: String, + /// reuse existing containers + reuse: bool, + /// allow cleaning up on a panic (if clean is set) + allow_clean_on_panic: bool, /// automatically clean up the things we have created for this test clean: bool, /// remove existing containers upon creation @@ -591,10 +646,10 @@ impl Drop for ComposeTest { }); } - if self.clean { + if self.clean && (!thread::panicking() || self.allow_clean_on_panic) { self.containers.keys().for_each(|c| { std::process::Command::new("docker") - .args(&["stop", c]) + .args(&["kill", c]) .output() .unwrap(); std::process::Command::new("docker") @@ -723,7 +778,7 @@ impl ComposeTest { } /// list containers - pub async fn list_containers( + pub async fn list_cluster_containers( &self, ) -> Result, Error> { self.docker @@ -974,9 +1029,11 @@ impl ComposeTest { ), }, )?; - self.docker - .start_container::<&str>(id.0.as_str(), None) - .await?; + if !self.reuse { + self.docker + .start_container::<&str>(id.0.as_str(), None) + .await?; + } Ok(()) } @@ -1010,11 +1067,16 @@ impl ComposeTest { /// restart the container pub async fn restart(&self, name: &str) -> Result<(), Error> { - let id = self.containers.get(name).unwrap(); + let (id, _) = self.containers.get(name).unwrap(); + self.restart_id(id.as_str()).await + } + + /// restart the container id + pub async fn restart_id(&self, id: &str) -> Result<(), Error> { if let Err(e) = self .docker .restart_container( - id.0.as_str(), + id, Some(RestartContainerOptions { t: 3, }), @@ -1115,6 +1177,22 @@ impl ComposeTest { result } + /// restart all the containers part of the network + /// returns the last error, if any or Ok + pub async fn restart_network_containers(&self) -> Result<(), Error> { + let mut result = Ok(()); + let containers = self.list_network_containers(&self.name).await?; + for container in containers { + if let Some(id) = container.id { + if let Err(e) = self.restart_id(&id).await { + println!("Failed to restart container id {:?}", id); + result = Err(e); + } + } + } + result + } + /// inspect the given container pub async fn inspect( &self, diff --git a/control-plane/deployer/bin/src/deployer.rs b/control-plane/deployer/bin/src/deployer.rs index 052e83960..90841692b 100644 --- a/control-plane/deployer/bin/src/deployer.rs +++ b/control-plane/deployer/bin/src/deployer.rs @@ -6,5 +6,5 @@ async fn main() -> Result<(), Error> { let cli_args = CliArgs::from_args(); println!("Using options: {:?}", &cli_args); - cli_args.act().await + cli_args.execute().await } diff --git a/control-plane/deployer/src/infra/mod.rs b/control-plane/deployer/src/infra/mod.rs index ca5f34203..1d7a3f85b 100644 --- a/control-plane/deployer/src/infra/mod.rs +++ b/control-plane/deployer/src/infra/mod.rs @@ -235,6 +235,20 @@ pub fn build_error(name: &str, status: Option) -> Result<(), Error> { } impl Components { + pub async fn start_wait( + &self, + cfg: &ComposeTest, + timeout: std::time::Duration, + ) -> Result<(), Error> { + match tokio::time::timeout(timeout, self.start_wait_inner(cfg)).await { + Ok(result) => result, + Err(_) => { + let error = format!("Time out of {:?} expired", timeout); + Err(std::io::Error::new(std::io::ErrorKind::TimedOut, error) + .into()) + } + } + } pub async fn start(&self, cfg: &ComposeTest) -> Result<(), Error> { let mut last_done = None; for component in &self.0 { @@ -254,6 +268,30 @@ impl Components { } Ok(()) } + async fn start_wait_inner(&self, cfg: &ComposeTest) -> Result<(), Error> { + let mut last_done = None; + for component in &self.0 { + if let Some(last_done) = last_done { + if component.boot_order() == last_done { + continue; + } + } + let components = self + .0 + .iter() + .filter(|c| c.boot_order() == component.boot_order()) + .collect::>(); + + for component in &components { + component.start(&self.1, &cfg).await?; + } + for component in &components { + component.wait_on(&self.1, &cfg).await?; + } + last_done = Some(component.boot_order()); + } + Ok(()) + } } #[macro_export] @@ -313,7 +351,7 @@ macro_rules! impl_component { match tokio::time::timeout(timeout, self.wait_on_inner(cfg)).await { Ok(result) => result, Err(_) => { - let error = format!("Timed out of {:?} expired", timeout); + let error = format!("Time out of {:?} expired", timeout); Err(std::io::Error::new(std::io::ErrorKind::TimedOut, error).into()) } } diff --git a/control-plane/deployer/src/lib.rs b/control-plane/deployer/src/lib.rs index ea9d17463..dd524f392 100644 --- a/control-plane/deployer/src/lib.rs +++ b/control-plane/deployer/src/lib.rs @@ -150,13 +150,13 @@ impl StartOptions { impl CliArgs { /// Act upon the requested action - pub async fn act(&self) -> Result<(), Error> { - self.action.act().await + pub async fn execute(&self) -> Result<(), Error> { + self.action.execute().await } } impl Action { - async fn act(&self) -> Result<(), Error> { + async fn execute(&self) -> Result<(), Error> { match self { Action::Start(options) => options.start(self).await, Action::Stop(options) => options.stop(self).await, diff --git a/control-plane/mbus-api/Cargo.toml b/control-plane/mbus-api/Cargo.toml index 47b551b32..027b730c2 100644 --- a/control-plane/mbus-api/Cargo.toml +++ b/control-plane/mbus-api/Cargo.toml @@ -23,6 +23,7 @@ tracing-futures = "0.2.4" tracing-subscriber = "0.2.0" paperclip = { version = "0.5.0", features = ["actix3"] } percent-encoding = "2.1.0" +uuid = { version = "0.7", features = ["v4"] } [dev-dependencies] composer = { path = "../../composer" } diff --git a/control-plane/mbus-api/src/mbus_nats.rs b/control-plane/mbus-api/src/mbus_nats.rs index 8c2d79668..340b0a105 100644 --- a/control-plane/mbus-api/src/mbus_nats.rs +++ b/control-plane/mbus-api/src/mbus_nats.rs @@ -31,15 +31,16 @@ pub async fn message_bus_init(server: String) { } /// Initialise the Nats Message Bus with Options +/// IGNORES all but the first initialisation of NATS_MSG_BUS pub async fn message_bus_init_options( server: String, timeouts: TimeoutOptions, ) { - let nc = NatsMessageBus::new(&server, BusOptions::new(), timeouts).await; - NATS_MSG_BUS - .set(nc) - .ok() - .expect("Expect to be initialised only once"); + if NATS_MSG_BUS.get().is_none() { + let nc = + NatsMessageBus::new(&server, BusOptions::new(), timeouts).await; + NATS_MSG_BUS.set(nc).ok(); + } } /// Get the static `NatsMessageBus` as a boxed `MessageBus` diff --git a/control-plane/mbus-api/src/v0.rs b/control-plane/mbus-api/src/v0.rs index 5e754cece..1d299be78 100644 --- a/control-plane/mbus-api/src/v0.rs +++ b/control-plane/mbus-api/src/v0.rs @@ -296,7 +296,7 @@ impl Default for Filter { macro_rules! bus_impl_string_id_inner { ($Name:ident, $Doc:literal) => { #[doc = $Doc] - #[derive(Serialize, Deserialize, Default, Debug, Clone, Eq, PartialEq, Hash, Apiv2Schema)] + #[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Hash, Apiv2Schema)] pub struct $Name(String); impl std::fmt::Display for $Name { @@ -340,11 +340,21 @@ macro_rules! bus_impl_string_id_inner { macro_rules! bus_impl_string_id { ($Name:ident, $Doc:literal) => { bus_impl_string_id_inner!($Name, $Doc); + impl Default for $Name { + /// Generates new blank identifier + fn default() -> Self { + $Name(uuid::Uuid::default().to_string()) + } + } impl $Name { /// Build Self from a string trait id pub fn from>(id: T) -> Self { $Name(id.into()) } + /// Generates new random identifier + pub fn new() -> Self { + $Name(uuid::Uuid::new_v4().to_string()) + } } }; } @@ -352,6 +362,11 @@ macro_rules! bus_impl_string_id { macro_rules! bus_impl_string_id_percent_decoding { ($Name:ident, $Doc:literal) => { bus_impl_string_id_inner!($Name, $Doc); + impl Default for $Name { + fn default() -> Self { + $Name("".to_string()) + } + } impl $Name { /// Build Self from a string trait id pub fn from>(id: T) -> Self { diff --git a/control-plane/rest/src/lib.rs b/control-plane/rest/src/lib.rs index 0153804e5..023420188 100644 --- a/control-plane/rest/src/lib.rs +++ b/control-plane/rest/src/lib.rs @@ -15,8 +15,13 @@ /// expose different versions of the client pub mod versions; -use actix_web::{body::Body, client::Client}; +use actix_web::{ + body::Body, + client::{Client, ClientResponse, PayloadError}, + web::Bytes, +}; use actix_web_opentelemetry::ClientExt; +use futures::Stream; use paperclip::actix::Apiv2Schema; use serde::{Deserialize, Serialize}; use std::{io::BufReader, string::ToString}; @@ -86,7 +91,7 @@ impl ActixRestClient { self.client.get(uri.clone()).send().await }; - let mut rest_response = result.map_err(|error| { + let rest_response = result.map_err(|error| { anyhow::anyhow!( "Failed to get uri '{}' from rest, err={:?}", uri, @@ -94,17 +99,7 @@ impl ActixRestClient { ) })?; - let rest_body = rest_response.body().await?; - if rest_response.status().is_success() { - match serde_json::from_slice(&rest_body) { - Ok(result) => Ok(result), - Err(_) => Ok(vec![serde_json::from_slice::(&rest_body)?]), - } - } else { - let error: serde_json::value::Value = - serde_json::from_slice(&rest_body)?; - Err(anyhow::anyhow!(error.to_string())) - } + Self::rest_vec_result(rest_response).await } async fn put>( &self, @@ -131,7 +126,7 @@ impl ActixRestClient { .await }; - let mut rest_response = result.map_err(|error| { + let rest_response = result.map_err(|error| { anyhow::anyhow!( "Failed to put uri '{}' from rest, err={:?}", uri, @@ -139,14 +134,7 @@ impl ActixRestClient { ) })?; - let rest_body = rest_response.body().await?; - if rest_response.status().is_success() { - Ok(serde_json::from_slice::(&rest_body)?) - } else { - let error: serde_json::value::Value = - serde_json::from_slice(&rest_body)?; - Err(anyhow::anyhow!(error.to_string())) - } + Self::rest_result(rest_response).await } async fn del(&self, urn: String) -> anyhow::Result where @@ -160,7 +148,7 @@ impl ActixRestClient { self.client.delete(uri.clone()).send().await }; - let mut rest_response = result.map_err(|error| { + let rest_response = result.map_err(|error| { anyhow::anyhow!( "Failed to delete uri '{}' from rest, err={:?}", uri, @@ -168,6 +156,36 @@ impl ActixRestClient { ) })?; + Self::rest_result(rest_response).await + } + + async fn rest_vec_result( + mut rest_response: ClientResponse, + ) -> anyhow::Result> + where + S: Stream> + Unpin, + for<'de> R: Deserialize<'de>, + { + let rest_body = rest_response.body().await?; + if rest_response.status().is_success() { + match serde_json::from_slice(&rest_body) { + Ok(result) => Ok(result), + Err(_) => Ok(vec![serde_json::from_slice::(&rest_body)?]), + } + } else { + let error: serde_json::value::Value = + serde_json::from_slice(&rest_body)?; + Err(anyhow::anyhow!(error.to_string())) + } + } + + async fn rest_result( + mut rest_response: ClientResponse, + ) -> anyhow::Result + where + S: Stream> + Unpin, + for<'de> R: Deserialize<'de>, + { let rest_body = rest_response.body().await?; if rest_response.status().is_success() { Ok(serde_json::from_slice::(&rest_body)?) diff --git a/control-plane/tests/Cargo.toml b/control-plane/tests/Cargo.toml new file mode 100644 index 000000000..c26125240 --- /dev/null +++ b/control-plane/tests/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "ctrlp-tests" +version = "0.1.0" +authors = ["Tiago Castro "] +edition = "2018" +description = "Control Plane 'Compose' Tests" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] + +[dev-dependencies] +composer = { path = "../../composer" } +deployer = { path = "../deployer" } +rest = { path = "../rest" } +actix-rt = "1.1.1" +opentelemetry-jaeger = { version = "0.10", features = ["tokio"] } +tracing-opentelemetry = "0.10.0" +tracing = "0.1" +opentelemetry = "0.11.2" +actix-web-opentelemetry = "0.9.0" +anyhow = "1.0.32" \ No newline at end of file diff --git a/control-plane/tests/tests/common/mod.rs b/control-plane/tests/tests/common/mod.rs new file mode 100644 index 000000000..f8ecc399f --- /dev/null +++ b/control-plane/tests/tests/common/mod.rs @@ -0,0 +1,329 @@ +use composer::*; +use deployer_lib::{ + infra::{Components, Error, Mayastor}, + *, +}; +use opentelemetry::{ + global, + sdk::{propagation::TraceContextPropagator, trace::Tracer}, +}; + +use opentelemetry_jaeger::Uninstall; +pub use rest_client::{ + versions::v0::{self, RestClient}, + ActixRestClient, +}; + +#[actix_rt::test] +#[ignore] +async fn smoke_test() { + // make sure the cluster can bootstrap properly + let _cluster = ClusterBuilder::builder() + .build() + .await + .expect("Should bootstrap the cluster!"); +} + +/// Default options to create a cluster +pub fn default_options() -> StartOptions { + StartOptions::default() + .with_agents(default_agents().split(',').collect()) + .with_jaeger(true) + .with_mayastors(1) + .with_show_info(true) + .with_cluster_name("rest_cluster") +} + +/// Cluster with the composer, the rest client and the jaeger pipeline# +#[allow(unused)] +pub struct Cluster { + composer: ComposeTest, + rest_client: ActixRestClient, + jaeger: (Tracer, Uninstall), + builder: ClusterBuilder, +} + +impl Cluster { + /// node id for `index` + pub fn node(&self, index: u32) -> v0::NodeId { + Mayastor::name(index, &self.builder.opts).into() + } + + /// pool id for `pool` index on `node` index + pub fn pool(&self, node: u32, pool: u32) -> v0::PoolId { + format!("{}-pool-{}", self.node(node), pool + 1).into() + } + + /// replica id with index for `pool` index and `replica` index + pub fn replica(pool: u32, replica: u32) -> v0::ReplicaId { + let mut uuid = v0::ReplicaId::default().to_string(); + let _ = uuid.drain(27 .. uuid.len()); + format!("{}{:01x}{:08x}", uuid, pool as u8, replica).into() + } + + /// rest client v0 + pub fn rest_v0(&self) -> impl RestClient { + self.rest_client.v0() + } + + /// New cluster + async fn new( + trace_rest: bool, + components: Components, + composer: ComposeTest, + jaeger: (Tracer, Uninstall), + ) -> Result { + let rest_client = + ActixRestClient::new("https://localhost:8080", trace_rest).unwrap(); + + components + .start_wait(&composer, std::time::Duration::from_secs(10)) + .await?; + + let cluster = Cluster { + composer, + rest_client, + jaeger, + builder: ClusterBuilder::builder(), + }; + + Ok(cluster) + } +} + +fn option_str(input: Option) -> String { + match input { + Some(input) => input.to_string(), + None => "?".into(), + } +} + +/// Run future and compare result with what's expected +/// Expected result should be in the form Result +/// where TestValue is a useful value which will be added to the returned error +/// string Eg, testing the replica share protocol: +/// test_result(Ok(Nvmf), async move { ... }) +/// test_result(Err(NBD), async move { ... }) +pub async fn test_result( + expected: &Result, + future: F, +) -> Result<(), anyhow::Error> +where + F: std::future::Future>, + R: std::fmt::Display, + E: std::fmt::Debug, + O: std::fmt::Debug, +{ + match future.await { + Ok(_) if expected.is_ok() => Ok(()), + Err(_) if expected.is_err() => Ok(()), + Err(error) => Err(anyhow::anyhow!( + "Expected '{:#?}' but failed with '{}'!", + expected, + error + )), + Ok(_) => { + Err(anyhow::anyhow!("Expected '{:#?}' but succeeded!", expected)) + } + } +} + +#[macro_export] +macro_rules! result_either { + ($test:expr) => { + match $test { + Ok(v) => v, + Err(v) => v, + } + }; +} + +/// Builder for the Cluster +pub struct ClusterBuilder { + opts: StartOptions, + pools: u32, + replicas: (u32, u64, v0::Protocol), + trace: bool, +} + +impl ClusterBuilder { + /// Cluster Builder with default options + pub fn builder() -> Self { + ClusterBuilder { + opts: default_options(), + pools: 0, + replicas: (0, 0, v0::Protocol::Off), + trace: true, + } + } + /// Update the start options + pub fn with_options(mut self, set: F) -> Self + where + F: Fn(StartOptions) -> StartOptions, + { + self.opts = set(self.opts); + self + } + /// Enable/Disable jaeger tracing + pub fn with_tracing(mut self, enabled: bool) -> Self { + self.trace = enabled; + self + } + /// Add `count` malloc pools (100MiB size) to each node + pub fn with_pools(mut self, count: u32) -> Self { + self.pools = count; + self + } + /// Add `count` replicas to each node per pool + pub fn with_replicas( + mut self, + count: u32, + size: u64, + share: v0::Protocol, + ) -> Self { + self.replicas = (count, size, share); + self + } + /// Build into the resulting Cluster using a composer closure, eg: + /// .compose_build(|c| c.with_logs(false)) + pub async fn compose_build(self, set: F) -> Result + where + F: Fn(Builder) -> Builder, + { + let (components, composer) = self.build_prepare()?; + let composer = set(composer); + let mut cluster = self.new_cluster(components, composer).await?; + cluster.builder = self; + Ok(cluster) + } + /// Build into the resulting Cluster + pub async fn build(self) -> Result { + let (components, composer) = self.build_prepare()?; + let mut cluster = self.new_cluster(components, composer).await?; + cluster.builder = self; + Ok(cluster) + } + fn build_prepare(&self) -> Result<(Components, Builder), Error> { + let components = Components::new(self.opts.clone()); + let composer = Builder::new() + .name(&self.opts.cluster_name) + .configure(components.clone())? + .with_base_image(self.opts.base_image.clone()) + .autorun(false) + .with_default_tracing() + .with_clean(true) + // test script will clean up containers if ran on CI/CD + .with_clean_on_panic(false) + .with_logs(true); + Ok((components, composer)) + } + async fn new_cluster( + &self, + components: Components, + compose_builder: Builder, + ) -> Result { + global::set_text_map_propagator(TraceContextPropagator::new()); + let jaeger = opentelemetry_jaeger::new_pipeline() + .with_service_name("tests-client") + .install() + .unwrap(); + + let composer = compose_builder.build().await?; + let cluster = + Cluster::new(self.trace, components, composer, jaeger).await?; + + if self.opts.show_info { + for container in cluster.composer.list_cluster_containers().await? { + let networks = + container.network_settings.unwrap().networks.unwrap(); + let ip = networks + .get(&self.opts.cluster_name) + .unwrap() + .ip_address + .clone(); + tracing::debug!( + "{:?} [{}] {}", + container.names.clone().unwrap_or_default(), + ip.clone().unwrap_or_default(), + option_str(container.command.clone()) + ); + } + } + for pool in &self.pools() { + cluster + .rest_v0() + .create_pool(v0::CreatePool { + node: pool.node.clone().into(), + id: pool.id(), + disks: vec![pool.disk()], + }) + .await + .unwrap(); + for replica in &pool.replicas { + cluster + .rest_v0() + .create_replica(replica.clone()) + .await + .unwrap(); + } + } + + Ok(cluster) + } + fn pools(&self) -> Vec { + let mut pools = vec![]; + for pool_index in 0 .. self.pools { + for node in 0 .. self.opts.mayastors { + let mut pool = Pool { + node: Mayastor::name(node, &self.opts), + kind: PoolKind::Malloc, + size_mb: 100, + index: (pool_index + 1) as u32, + replicas: vec![], + }; + for replica_index in 0 .. self.replicas.0 { + pool.replicas.push(v0::CreateReplica { + node: pool.node.clone().into(), + uuid: Cluster::replica(pool_index, replica_index), + pool: pool.id(), + size: self.replicas.1, + thin: false, + share: self.replicas.2.clone(), + }); + } + pools.push(pool); + } + } + pools + } +} + +#[allow(dead_code)] +enum PoolKind { + Malloc, + Aio, + Uring, + Nvmf, +} + +struct Pool { + node: String, + kind: PoolKind, + size_mb: u32, + index: u32, + replicas: Vec, +} + +impl Pool { + fn id(&self) -> v0::PoolId { + format!("{}-pool-{}", self.node, self.index).into() + } + fn disk(&self) -> String { + match self.kind { + PoolKind::Malloc => { + format!("malloc:///disk{}?size_mb={}", self.index, self.size_mb) + } + _ => panic!("kind not supported!"), + } + } +} diff --git a/control-plane/tests/tests/nexus.rs b/control-plane/tests/tests/nexus.rs new file mode 100644 index 000000000..d1497c57a --- /dev/null +++ b/control-plane/tests/tests/nexus.rs @@ -0,0 +1,25 @@ +#![feature(allow_fail)] + +pub mod common; +use common::*; + +#[actix_rt::test] +async fn create_nexus() { + let cluster = ClusterBuilder::builder() + .with_pools(1) + .with_replicas(2, 5 * 1024 * 1024, v0::Protocol::Off) + .build() + .await + .unwrap(); + + cluster + .rest_v0() + .create_nexus(v0::CreateNexus { + node: cluster.node(0), + uuid: v0::NexusId::new(), + size: 10 * 1024 * 1024, + children: vec!["malloc:///disk?size_mb=100".into()], + }) + .await + .unwrap(); +} diff --git a/control-plane/tests/tests/pools.rs b/control-plane/tests/tests/pools.rs new file mode 100644 index 000000000..2268f9ac9 --- /dev/null +++ b/control-plane/tests/tests/pools.rs @@ -0,0 +1,218 @@ +#![feature(allow_fail)] + +pub mod common; +use common::*; + +#[actix_rt::test] +async fn create_pool_malloc() { + let cluster = ClusterBuilder::builder().build().await.unwrap(); + cluster + .rest_v0() + .create_pool(v0::CreatePool { + node: "mayastor".into(), + id: "pooloop".into(), + disks: vec!["malloc:///disk?size_mb=100".into()], + }) + .await + .unwrap(); +} + +#[actix_rt::test] +async fn create_pool_with_missing_disk() { + let cluster = ClusterBuilder::builder().build().await.unwrap(); + + cluster + .rest_v0() + .create_pool(v0::CreatePool { + node: "mayastor".into(), + id: "pooloop".into(), + disks: vec!["/dev/c/3po".into()], + }) + .await + .expect_err("Device should not exist"); +} + +#[actix_rt::test] +async fn create_pool_with_existing_disk() { + let cluster = ClusterBuilder::builder().build().await.unwrap(); + + cluster + .rest_v0() + .create_pool(v0::CreatePool { + node: "mayastor".into(), + id: "pooloop".into(), + disks: vec!["malloc:///disk?size_mb=100".into()], + }) + .await + .unwrap(); + + cluster + .rest_v0() + .create_pool(v0::CreatePool { + node: "mayastor".into(), + id: "pooloop-new".into(), + disks: vec!["malloc:///disk?size_mb=100".into()], + }) + .await + .expect_err("Disk should be used by another pool"); + + cluster + .rest_v0() + .destroy_pool(v0::DestroyPool { + node: "mayastor".into(), + id: "pooloop".into(), + }) + .await + .unwrap(); + + cluster + .rest_v0() + .create_pool(v0::CreatePool { + node: "mayastor".into(), + id: "pooloop-new".into(), + disks: vec!["malloc:///disk?size_mb=100".into()], + }) + .await + .expect("Should now be able to create the new pool"); +} + +#[actix_rt::test] +async fn create_pool_idempotent() { + let cluster = ClusterBuilder::builder().build().await.unwrap(); + + cluster + .rest_v0() + .create_pool(v0::CreatePool { + node: "mayastor".into(), + id: "pooloop".into(), + disks: vec!["malloc:///disk?size_mb=100".into()], + }) + .await + .unwrap(); + + cluster + .rest_v0() + .create_pool(v0::CreatePool { + node: "mayastor".into(), + id: "pooloop".into(), + disks: vec!["malloc:///disk?size_mb=100".into()], + }) + .await + .unwrap(); +} + +/// FIXME: CAS-710 +#[actix_rt::test] +#[allow_fail] +async fn create_pool_idempotent_same_disk_different_query() { + let cluster = ClusterBuilder::builder() + // don't log whilst we have the allow_fail + .compose_build(|c| c.with_logs(false)) + .await + .unwrap(); + + cluster + .rest_v0() + .create_pool(v0::CreatePool { + node: "mayastor".into(), + id: "pooloop".into(), + disks: vec!["malloc:///disk?size_mb=100&blk_size=512".into()], + }) + .await + .unwrap(); + + cluster + .rest_v0() + .create_pool(v0::CreatePool { + node: "mayastor".into(), + id: "pooloop".into(), + disks: vec!["malloc:///disk?size_mb=200&blk_size=4096".into()], + }) + .await + .expect_err("Different query not allowed!"); +} + +#[actix_rt::test] +async fn create_pool_idempotent_different_nvmf_host() { + let cluster = ClusterBuilder::builder() + .with_options(|opts| opts.with_mayastors(3)) + .build() + .await + .unwrap(); + + cluster + .rest_v0() + .create_pool(v0::CreatePool { + node: "mayastor-1".into(), + id: "pooloop-1".into(), + disks: vec!["malloc:///disk?size_mb=100".into()], + }) + .await + .unwrap(); + + let replica1 = cluster + .rest_v0() + .create_replica(v0::CreateReplica { + node: "mayastor-1".into(), + uuid: "0aa4a830-a971-4e96-a97c-15c39dd8f162".into(), + pool: "pooloop-1".into(), + size: 10 * 1024 * 1024, + thin: true, + share: v0::Protocol::Nvmf, + }) + .await + .unwrap(); + + cluster + .rest_v0() + .create_pool(v0::CreatePool { + node: "mayastor-2".into(), + id: "pooloop-2".into(), + disks: vec!["malloc:///disk?size_mb=100".into()], + }) + .await + .unwrap(); + + let replica2 = cluster + .rest_v0() + .create_replica(v0::CreateReplica { + node: "mayastor-2".into(), + uuid: "0aa4a830-a971-4e96-a97c-15c39dd8f162".into(), + pool: "pooloop-2".into(), + size: 10 * 1024 * 1024, + thin: true, + share: v0::Protocol::Nvmf, + }) + .await + .unwrap(); + + cluster + .rest_v0() + .create_pool(v0::CreatePool { + node: "mayastor-3".into(), + id: "pooloop".into(), + disks: vec![replica1.uri.clone()], + }) + .await + .unwrap(); + + cluster + .rest_v0() + .create_pool(v0::CreatePool { + node: "mayastor-3".into(), + id: "pooloop".into(), + disks: vec![replica1.uri], + }) + .await + .unwrap(); + + cluster + .rest_v0() + .create_pool(v0::CreatePool { + node: "mayastor-3".into(), + id: "pooloop".into(), + disks: vec![replica2.uri], + }) + .await + .expect_err("Different host!"); +} diff --git a/control-plane/tests/tests/replicas.rs b/control-plane/tests/tests/replicas.rs new file mode 100644 index 000000000..ebe121116 --- /dev/null +++ b/control-plane/tests/tests/replicas.rs @@ -0,0 +1,131 @@ +#![feature(allow_fail)] + +pub mod common; +use common::*; + +// FIXME: CAS-721 +#[actix_rt::test] +#[allow_fail] +async fn create_replica() { + let cluster = ClusterBuilder::builder() + .with_pools(1) + // don't log whilst we have the allow_fail + .compose_build(|c| c.with_logs(false)) + .await + .unwrap(); + + let replica = v0::CreateReplica { + node: cluster.node(0), + uuid: Default::default(), + pool: cluster.pool(0, 0), + size: 5 * 1024 * 1024, + thin: true, + share: v0::Protocol::Off, + }; + let created_replica = cluster + .rest_v0() + .create_replica(replica.clone()) + .await + .unwrap(); + assert_eq!(created_replica.node, replica.node); + assert_eq!(created_replica.uuid, replica.uuid); + assert_eq!(created_replica.pool, replica.pool); + + // todo: why is this not the same? + // assert_eq!(created_replica.size, replica.size); + // fixme: replicas are always created without thin provisioning + assert_eq!(created_replica.thin, replica.thin); + assert_eq!(created_replica.share, replica.share); +} + +#[actix_rt::test] +async fn create_replica_protocols() { + let cluster = ClusterBuilder::builder() + .with_pools(1) + .build() + .await + .unwrap(); + + let protocols = vec![ + Err(v0::Protocol::Nbd), + Err(v0::Protocol::Iscsi), + Ok(v0::Protocol::Nvmf), + Ok(v0::Protocol::Off), + ]; + + for test in protocols { + let protocol = result_either!(&test); + test_result( + &test, + cluster.rest_v0().create_replica(v0::CreateReplica { + node: cluster.node(0), + uuid: v0::ReplicaId::new(), + pool: cluster.pool(0, 0), + size: 5 * 1024 * 1024, + thin: true, + share: protocol.clone(), + }), + ) + .await + .unwrap(); + } +} + +// FIXME: CAS-731 +#[actix_rt::test] +#[allow_fail] +async fn create_replica_idempotent_different_sizes() { + let cluster = ClusterBuilder::builder() + .with_pools(1) + // don't log whilst we have the allow_fail + .compose_build(|c| c.with_logs(false)) + .await + .unwrap(); + + let uuid = v0::ReplicaId::new(); + let size = 5 * 1024 * 1024; + let replica = cluster + .rest_v0() + .create_replica(v0::CreateReplica { + node: cluster.node(0), + uuid: uuid.clone(), + pool: cluster.pool(0, 0), + size, + thin: false, + share: v0::Protocol::Off, + }) + .await + .unwrap(); + assert_eq!(&replica.uuid, &uuid); + + cluster + .rest_v0() + .create_replica(v0::CreateReplica { + node: cluster.node(0), + uuid: uuid.clone(), + pool: cluster.pool(0, 0), + size, + thin: replica.thin, + share: v0::Protocol::Off, + }) + .await + .unwrap(); + + let sizes = vec![Ok(size), Err(size / 2), Err(size * 2)]; + for test in sizes { + let size = result_either!(test); + test_result( + &test, + cluster.rest_v0().create_replica(v0::CreateReplica { + node: cluster.node(0), + uuid: v0::ReplicaId::new(), + pool: cluster.pool(0, 0), + size, + thin: replica.thin, + share: v0::Protocol::Off, + }), + ) + .await + .unwrap(); + } +} diff --git a/nix/pkgs/control-plane/cargo-project.nix b/nix/pkgs/control-plane/cargo-project.nix index aa700c632..2cef59bc7 100644 --- a/nix/pkgs/control-plane/cargo-project.nix +++ b/nix/pkgs/control-plane/cargo-project.nix @@ -29,7 +29,7 @@ let buildProps = rec { name = "control-plane"; #cargoSha256 = "0000000000000000000000000000000000000000000000000000"; - cargoSha256 = "02qf9pnja4cn31qnzawbrqhny88ja19sqm68zy12ly4vmg6dd3lf"; + cargoSha256 = "1iqmrl8qm8nw1hg219kdyxd1zk9c58p1avymjis3snxnlagafx37"; inherit version; src = whitelistSource ../../../. (pkgs.callPackage ../mayastor { }).src_list; cargoBuildFlags = [ "-p mbus_api" "-p agents" "-p rest" "-p operators" ]; diff --git a/nix/pkgs/mayastor/default.nix b/nix/pkgs/mayastor/default.nix index 94b78b47d..c490e4703 100644 --- a/nix/pkgs/mayastor/default.nix +++ b/nix/pkgs/mayastor/default.nix @@ -56,7 +56,7 @@ let buildProps = rec { name = "mayastor"; #cargoSha256 = "0000000000000000000000000000000000000000000000000000"; - cargoSha256 = "1c5zwaivwsx7gznjvsd0gfhbvjji5q1qbjacdm6vfapqv9i79yfn"; + cargoSha256 = "1ynd6fmdr89f0g9vqsbz2rfl6ld23qv92lqcma5m4xcyhblbv5g0"; inherit version; src = whitelistSource ../../../. src_list; LIBCLANG_PATH = "${llvmPackages.libclang}/lib"; diff --git a/scripts/cargo-test.sh b/scripts/cargo-test.sh index 242149a95..02dba8ed8 100755 --- a/scripts/cargo-test.sh +++ b/scripts/cargo-test.sh @@ -1,5 +1,7 @@ #!/usr/bin/env bash +SCRIPTDIR=$(dirname "$0") + cleanup_handler() { for c in $(docker ps -a --filter "label=io.mayastor.test.name" --format '{{.ID}}') ; do docker kill "$c" || true @@ -19,7 +21,5 @@ export PATH=$PATH:${HOME}/.cargo/bin # test dependencies cargo build --bins ( cd mayastor && cargo test -- --test-threads=1 ) -for test in composer agents rest; do - cargo test -p ${test} -- --test-threads=1 -done ( cd nvmeadm && cargo test ) +"$SCRIPTDIR/ctrlp-cargo-test.sh" diff --git a/scripts/ctrlp-cargo-test.sh b/scripts/ctrlp-cargo-test.sh new file mode 100755 index 000000000..680532303 --- /dev/null +++ b/scripts/ctrlp-cargo-test.sh @@ -0,0 +1,22 @@ +#!/usr/bin/env bash + +cleanup_handler() { + for c in $(docker ps -a --filter "label=io.mayastor.test.name" --format '{{.ID}}') ; do + docker kill "$c" || true + docker rm "$c" || true + done + + for n in $(docker network ls --filter "label=io.mayastor.test.name" --format '{{.ID}}') ; do + docker network rm "$n" || true + done +} + +trap cleanup_handler ERR INT QUIT TERM HUP + +set -euxo pipefail +export PATH=$PATH:${HOME}/.cargo/bin +# test dependencies +cargo build --bins +for test in composer agents rest ctrlp-tests; do + cargo test -p ${test} -- --test-threads=1 +done