Skip to content

Commit

Permalink
test(rest): add more tests using rest
Browse files Browse the repository at this point in the history
Add more rest tests around replica and the nexus.
Get the correct error from the rest to avoid false negatives.
Deferring the "replica outage" tests as those kind of tests were often
 failing on mayastor already.
  • Loading branch information
tiagolobocastro committed Feb 23, 2021
1 parent c20e45d commit 1aeb34a
Show file tree
Hide file tree
Showing 4 changed files with 407 additions and 74 deletions.
73 changes: 41 additions & 32 deletions control-plane/rest/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,33 +182,32 @@ impl ActixRestClient {
head.headers = headers.clone();
head
};
let body = rest_response.body().await.map_err(|_| {
ClientError::InvalidPayload {
head: head(),
}
let body = rest_response.body().await.context(InvalidPayload {
head: head(),
})?;
if status.is_success() {
match serde_json::from_slice(&body) {
Ok(r) => Ok(r),
Err(_error) => match serde_json::from_slice(&body) {
Ok(r) => Ok(vec![r]),
Err(_error) => Err(ClientError::InvalidBody {
head: head(),
body,
}),
},
Err(_) => {
let result =
serde_json::from_slice(&body).context(InvalidBody {
head: head(),
body,
})?;
Ok(vec![result])
}
}
} else if body.is_empty() {
Err(ClientError::Header {
head: head(),
})
} else {
let error = serde_json::from_slice::<serde_json::Value>(&body)
.map_err(|_| ClientError::InvalidBody {
.context(InvalidBody {
head: head(),
body,
})?;
Err(ClientError::Valid {
Err(ClientError::RestServer {
head: head(),
error,
})
Expand All @@ -229,30 +228,27 @@ impl ActixRestClient {
head.headers = headers.clone();
head
};
let body = rest_response.body().await.map_err(|_| {
ClientError::InvalidPayload {
head: head(),
}
let body = rest_response.body().await.context(InvalidPayload {
head: head(),
})?;
if status.is_success() {
let result = serde_json::from_slice(&body).map_err(|_| {
ClientError::InvalidBody {
let result =
serde_json::from_slice(&body).context(InvalidBody {
head: head(),
body,
}
})?;
})?;
Ok(result)
} else if body.is_empty() {
Err(ClientError::Header {
head: head(),
})
} else {
let error = serde_json::from_slice::<serde_json::Value>(&body)
.map_err(|_| ClientError::InvalidBody {
.context(InvalidBody {
head: head(),
body,
})?;
Err(ClientError::Valid {
Err(ClientError::RestServer {
head: head(),
error,
})
Expand All @@ -267,43 +263,56 @@ pub type ClientResult<T> = Result<T, ClientError>;
/// Rest Client Error
#[derive(Debug, Snafu)]
pub enum ClientError {
/// Failed to send message to the server
/// Failed to send message to the server (details in source)
#[snafu(display("{}, reason: {}", details, source))]
Send {
/// Message
details: String,
/// Source Request Error
source: SendRequestError,
},
/// Invalid Resource Filter
/// Invalid Resource Filter so couldn't send the request
#[snafu(display("Invalid Resource Filter: {}", details))]
InvalidFilter {
/// Message
details: String,
},
/// Invalid Payload
#[snafu(display("Invalid payload, header: {:?}", head))]
/// Response an error code and with an invalid payload
#[snafu(display(
"Invalid payload, header: {:?}, reason: {}",
head,
source
))]
InvalidPayload {
/// http Header
head: ResponseHead,
/// source payload error
source: PayloadError,
},
/// Invalid Body
#[snafu(display("Invalid body, header: {:?}", head))]
/// Response an error code and also with an invalid body
#[snafu(display(
"Invalid body, header: {:?}, body: {:?}, reason: {}",
head,
body,
source
))]
InvalidBody {
/// http Header
head: ResponseHead,
/// http Body
body: Bytes,
/// source json deserialize error
source: serde_json::Error,
},
/// No Body
/// Response an error code and only the header (and so no additional info)
#[snafu(display("No body, header: {:?}", head))]
Header {
/// http Header
head: ResponseHead,
},
/// Body in JSON format
/// Error within the Body in valid JSON format, returned by the Rest Server
#[snafu(display("Http status: {}, error: {}", head.status, error.to_string()))]
Valid {
RestServer {
/// http Header
head: ResponseHead,
/// JSON error
Expand Down
122 changes: 85 additions & 37 deletions control-plane/tests/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use opentelemetry::{
};

use opentelemetry_jaeger::Uninstall;
use rest_client::ClientError;
pub use rest_client::{
versions::v0::{self, RestClient},
ActixRestClient,
Expand Down Expand Up @@ -55,7 +56,7 @@ impl Cluster {
}

/// replica id with index for `pool` index and `replica` index
pub fn replica(pool: u32, replica: u32) -> v0::ReplicaId {
pub fn replica(pool: usize, replica: u32) -> v0::ReplicaId {
let mut uuid = v0::ReplicaId::default().to_string();
let _ = uuid.drain(27 .. uuid.len());
format!("{}{:01x}{:08x}", uuid, pool as u8, replica).into()
Expand All @@ -69,12 +70,17 @@ impl Cluster {
/// New cluster
async fn new(
trace_rest: bool,
timeout_rest: std::time::Duration,
components: Components,
composer: ComposeTest,
jaeger: (Tracer, Uninstall),
) -> Result<Cluster, Error> {
let rest_client =
ActixRestClient::new("https://localhost:8080", trace_rest).unwrap();
let rest_client = ActixRestClient::new_timeout(
"https://localhost:8080",
trace_rest,
timeout_rest,
)
.unwrap();

components
.start_wait(&composer, std::time::Duration::from_secs(10))
Expand Down Expand Up @@ -104,19 +110,26 @@ fn option_str<F: ToString>(input: Option<F>) -> String {
/// string Eg, testing the replica share protocol:
/// test_result(Ok(Nvmf), async move { ... })
/// test_result(Err(NBD), async move { ... })
pub async fn test_result<F, O, E, T, R>(
pub async fn test_result<F, O, E, T>(
expected: &Result<O, E>,
future: F,
) -> Result<(), anyhow::Error>
where
F: std::future::Future<Output = Result<T, R>>,
R: std::fmt::Display,
F: std::future::Future<Output = Result<T, rest_client::ClientError>>,
E: std::fmt::Debug,
O: std::fmt::Debug,
{
match future.await {
Ok(_) if expected.is_ok() => Ok(()),
Err(_) if expected.is_err() => Ok(()),
Err(error) if expected.is_err() => match error {
ClientError::RestServer {
..
} => Ok(()),
_ => {
// not the error we were waiting for
Err(anyhow::anyhow!("Invalid rest response: {}", error))
}
},
Err(error) => Err(anyhow::anyhow!(
"Expected '{:#?}' but failed with '{}'!",
expected,
Expand All @@ -138,22 +151,37 @@ macro_rules! result_either {
};
}

#[derive(Clone)]
enum PoolDisk {
Malloc(u64),
Uri(String),
}

/// Builder for the Cluster
pub struct ClusterBuilder {
opts: StartOptions,
pools: u32,
replicas: (u32, u64, v0::Protocol),
pools: Vec<PoolDisk>,
replicas: Replica,
trace: bool,
timeout: std::time::Duration,
}

#[derive(Default)]
pub struct Replica {
count: u32,
size: u64,
share: v0::Protocol,
}

impl ClusterBuilder {
/// Cluster Builder with default options
pub fn builder() -> Self {
ClusterBuilder {
opts: default_options(),
pools: 0,
replicas: (0, 0, v0::Protocol::Off),
pools: vec![],
replicas: Default::default(),
trace: true,
timeout: std::time::Duration::from_secs(3),
}
}
/// Update the start options
Expand All @@ -169,19 +197,40 @@ impl ClusterBuilder {
self.trace = enabled;
self
}
/// Rest request timeout
pub fn with_rest_timeout(mut self, timeout: std::time::Duration) -> Self {
self.timeout = timeout;
self
}
/// Add `count` malloc pools (100MiB size) to each node
pub fn with_pools(mut self, count: u32) -> Self {
self.pools = count;
for _ in 0 .. count {
self.pools.push(PoolDisk::Malloc(100 * 1024 * 1024));
}
self
}
/// Add pool with `disk` to each node
pub fn with_pool(mut self, disk: &str) -> Self {
self.pools.push(PoolDisk::Uri(disk.to_string()));
self
}
/// Add `count` replicas to each node per pool
/// Specify `count` replicas to add to each node per pool
pub fn with_replicas(
mut self,
count: u32,
size: u64,
share: v0::Protocol,
) -> Self {
self.replicas = (count, size, share);
self.replicas = Replica {
count,
size,
share,
};
self
}
/// Specify `count` mayastors for the cluster
pub fn with_mayastors(mut self, count: u32) -> Self {
self.opts = self.opts.with_mayastors(count);
self
}
/// Build into the resulting Cluster using a composer closure, eg:
Expand Down Expand Up @@ -229,8 +278,14 @@ impl ClusterBuilder {
.unwrap();

let composer = compose_builder.build().await?;
let cluster =
Cluster::new(self.trace, components, composer, jaeger).await?;
let cluster = Cluster::new(
self.trace,
self.timeout,
components,
composer,
jaeger,
)
.await?;

if self.opts.show_info {
for container in cluster.composer.list_cluster_containers().await? {
Expand Down Expand Up @@ -272,23 +327,24 @@ impl ClusterBuilder {
}
fn pools(&self) -> Vec<Pool> {
let mut pools = vec![];
for pool_index in 0 .. self.pools {
for node in 0 .. self.opts.mayastors {

for node in 0 .. self.opts.mayastors {
for pool_index in 0 .. self.pools.len() {
let pool = &self.pools[pool_index];
let mut pool = Pool {
node: Mayastor::name(node, &self.opts),
kind: PoolKind::Malloc,
size_mb: 100,
disk: pool.clone(),
index: (pool_index + 1) as u32,
replicas: vec![],
};
for replica_index in 0 .. self.replicas.0 {
for replica_index in 0 .. self.replicas.count {
pool.replicas.push(v0::CreateReplica {
node: pool.node.clone().into(),
uuid: Cluster::replica(pool_index, replica_index),
pool: pool.id(),
size: self.replicas.1,
size: self.replicas.size,
thin: false,
share: self.replicas.2.clone(),
share: self.replicas.share.clone(),
});
}
pools.push(pool);
Expand All @@ -298,18 +354,9 @@ impl ClusterBuilder {
}
}

#[allow(dead_code)]
enum PoolKind {
Malloc,
Aio,
Uring,
Nvmf,
}

struct Pool {
node: String,
kind: PoolKind,
size_mb: u32,
disk: PoolDisk,
index: u32,
replicas: Vec<v0::CreateReplica>,
}
Expand All @@ -319,11 +366,12 @@ impl Pool {
format!("{}-pool-{}", self.node, self.index).into()
}
fn disk(&self) -> String {
match self.kind {
PoolKind::Malloc => {
format!("malloc:///disk{}?size_mb={}", self.index, self.size_mb)
match &self.disk {
PoolDisk::Malloc(size) => {
let size = size / (1024 * 1024);
format!("malloc:///disk{}?size_mb={}", self.index, size)
}
_ => panic!("kind not supported!"),
PoolDisk::Uri(uri) => uri.clone(),
}
}
}
Loading

0 comments on commit 1aeb34a

Please sign in to comment.