Skip to content

Commit

Permalink
Merge #1642
Browse files Browse the repository at this point in the history
1642: refactor: add local pool builder r=tiagolobocastro a=tiagolobocastro

todo: extend to allow for Lvm pools as well

Co-authored-by: Tiago Castro <[email protected]>
  • Loading branch information
mayastor-bors and tiagolobocastro committed Apr 30, 2024
2 parents 400ab0e + ac3a8e9 commit 5374ae7
Show file tree
Hide file tree
Showing 2 changed files with 200 additions and 51 deletions.
212 changes: 191 additions & 21 deletions io-engine-tests/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,78 +8,188 @@ use super::{
},
generate_uuid,
};
use io_engine::{core::Reactor, lvs, pool_backend::PoolArgs};
use std::ops::Deref;
use tonic::Code;

#[derive(Clone)]
pub struct PoolBuilder {
rpc: SharedRpcHandle,
#[derive(Clone, Default)]
pub struct PoolBuilderOpts {
name: Option<String>,
uuid: Option<String>,
bdev: Option<String>,
}

impl PoolBuilder {
pub type PoolBuilder = PoolBuilderRpc;
#[derive(Clone)]
pub struct PoolBuilderRpc {
rpc: SharedRpcHandle,
builder: PoolBuilderOpts,
}

#[derive(Clone, Default)]
pub struct PoolBuilderLocal {
builder: PoolBuilderOpts,
}

#[async_trait::async_trait(?Send)]
pub trait PoolOps {
type Replica;
async fn get_replicas(&self) -> Result<Vec<Self::Replica>, Status>;
async fn create_repl(
&self,
name: &str,
size: u64,
uuid: Option<&str>,
thin: bool,
entity_id: Option<String>,
) -> Result<Self::Replica, Status>;
async fn destroy(self) -> Result<(), Status>;
}

#[async_trait::async_trait(?Send)]
impl PoolOps for PoolLocal {
type Replica = lvs::Lvol;

async fn get_replicas(&self) -> Result<Vec<Self::Replica>, Status> {
Err(Status::unimplemented(""))
}

async fn create_repl(
&self,
name: &str,
size: u64,
uuid: Option<&str>,
thin: bool,
entity_id: Option<String>,
) -> Result<Self::Replica, Status> {
let Some(lvs) = self.lvs.as_ref() else {
return Err(Status::internal("deleted"));
};
let lvol = lvs.create_lvol(name, size, uuid, thin, entity_id).await?;
Ok(lvol)
}

async fn destroy(mut self) -> Result<(), Status> {
if let Some(pool) = self.lvs.take() {
pool.destroy().await?;
}
Ok(())
}
}

pub struct PoolLocal {
lvs: Option<lvs::Lvs>,
cleanup: bool,
}

impl Deref for PoolBuilderRpc {
type Target = PoolBuilderOpts;
fn deref(&self) -> &Self::Target {
&self.builder
}
}
impl Deref for PoolBuilderLocal {
type Target = PoolBuilderOpts;
fn deref(&self) -> &Self::Target {
&self.builder
}
}

impl PoolBuilderRpc {
pub fn new(rpc: SharedRpcHandle) -> Self {
Self {
rpc,
name: None,
uuid: None,
bdev: None,
builder: PoolBuilderOpts::default(),
}
}
}

pub fn with_name(mut self, name: &str) -> Self {
impl PoolBuilderOpts {
pub fn with_name(&mut self, name: &str) -> &mut Self {
self.name = Some(name.to_owned());
self
}

pub fn with_uuid(mut self, uuid: &str) -> Self {
pub fn with_uuid(&mut self, uuid: &str) -> &mut Self {
self.uuid = Some(uuid.to_owned());
self
}

pub fn with_new_uuid(self) -> Self {
pub fn with_new_uuid(&mut self) -> &mut Self {
self.with_uuid(&generate_uuid())
}

pub fn with_bdev(mut self, bdev: &str) -> Self {
pub fn with_bdev(&mut self, bdev: &str) -> &mut Self {
self.bdev = Some(bdev.to_owned());
self
}

pub fn with_malloc(self, bdev_name: &str, size_mb: u64) -> Self {
pub fn with_malloc(&mut self, bdev_name: &str, size_mb: u64) -> &mut Self {
let bdev = format!("malloc:///{bdev_name}?size_mb={size_mb}");
self.with_bdev(&bdev)
}

pub fn with_malloc_blk_size(
self,
&mut self,
bdev_name: &str,
size_mb: u64,
blk_size: u64,
) -> Self {
) -> &mut Self {
let bdev = format!(
"malloc:///{bdev_name}?size_mb={size_mb}&blk_size={blk_size}"
);
self.with_bdev(&bdev)
}

pub fn rpc(&self) -> SharedRpcHandle {
self.rpc.clone()
}

pub fn name(&self) -> String {
self.name.as_ref().expect("Pool name must be set").clone()
}

pub fn uuid(&self) -> String {
self.uuid.as_ref().expect("Pool UUID must be set").clone()
}

pub fn bdev(&self) -> String {
self.bdev.as_ref().expect("Pool Bdev must be set").clone()
}
}

impl PoolBuilderRpc {
pub fn rpc(&self) -> SharedRpcHandle {
self.rpc.clone()
}
pub fn with_name(mut self, name: &str) -> Self {
self.builder.with_name(name);
self
}

pub fn with_uuid(mut self, uuid: &str) -> Self {
self.builder.with_uuid(uuid);
self
}

pub fn with_new_uuid(self) -> Self {
self.with_uuid(&generate_uuid())
}

pub fn with_bdev(mut self, bdev: &str) -> Self {
self.builder.with_bdev(bdev);
self
}

pub fn with_malloc(mut self, bdev_name: &str, size_mb: u64) -> Self {
self.builder.with_malloc(bdev_name, size_mb);
self
}

pub fn with_malloc_blk_size(
mut self,
bdev_name: &str,
size_mb: u64,
blk_size: u64,
) -> Self {
self.builder
.with_malloc_blk_size(bdev_name, size_mb, blk_size);
self
}
pub async fn create(&mut self) -> Result<Pool, Status> {
self.rpc()
.lock()
Expand Down Expand Up @@ -125,6 +235,66 @@ impl PoolBuilder {
}
}

impl PoolBuilderLocal {
pub async fn malloc(name: &str, size_mb: u64) -> Result<PoolLocal, Status> {
let lvs = PoolBuilderLocal::default()
.with_builder(|b| {
b.with_name(name).with_new_uuid().with_malloc(name, size_mb)
})
.create()
.await?;
Ok(PoolLocal {
lvs: Some(lvs),
cleanup: true,
})
}

pub fn with_builder<F>(&mut self, builder: F) -> &mut Self
where
F: Fn(&mut PoolBuilderOpts) -> &mut PoolBuilderOpts,
{
builder(&mut self.builder);
self
}

pub async fn create(&mut self) -> Result<lvs::Lvs, Status> {
let lvs = lvs::Lvs::create_or_import(PoolArgs {
name: self.name(),
uuid: Some(self.uuid()),
disks: vec![self.bdev.as_ref().unwrap().clone()],
cluster_size: None,
backend: Default::default(),
})
.await?;
Ok(lvs)
}

pub async fn destroy(&mut self) -> Result<(), Status> {
let pool = self.get_pool().await?;
pool.destroy().await?;
Ok(())
}

pub async fn get_pool(&self) -> Result<lvs::Lvs, Status> {
let uuid = self.uuid();
lvs::Lvs::lookup_by_uuid(&uuid).ok_or_else(|| {
Status::new(Code::NotFound, format!("Pool '{uuid}' not found"))
})
}
}

impl Drop for PoolLocal {
fn drop(&mut self) {
if self.cleanup {
if let Some(lvs) = self.lvs.take() {
Reactor::block_on(async move {
lvs.destroy().await.ok();
});
}
}
}
}

pub async fn list_pools(rpc: SharedRpcHandle) -> Result<Vec<Pool>, Status> {
rpc.lock()
.await
Expand All @@ -139,7 +309,7 @@ pub async fn list_pools(rpc: SharedRpcHandle) -> Result<Vec<Pool>, Status> {
}

/// Tests that all given pools report the same usage statistics.
pub async fn validate_pools_used_space(pools: &[PoolBuilder]) {
pub async fn validate_pools_used_space(pools: &[PoolBuilderRpc]) {
let mut used_space: Option<u64> = None;
for p in pools {
let pool = p.get_pool().await.unwrap();
Expand Down
39 changes: 9 additions & 30 deletions io-engine/tests/snapshot_rebuild.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use nix::errno::Errno;
use once_cell::sync::OnceCell;
use std::time::Duration;

Expand All @@ -11,12 +10,12 @@ use io_engine::{
pub mod common;
use common::compose::MayastorTest;
use io_engine::{
core::{LogicalVolume, ReadOptions, Share, ToErrno},
lvs::{Lvol, Lvs, LvsLvol},
pool_backend::PoolArgs,
core::{LogicalVolume, ReadOptions, Share},
lvs::{Lvol, LvsLvol},
rebuild::{RebuildJobOptions, SnapshotRebuildJob},
sleep::mayastor_sleep,
};
use io_engine_tests::pool::{PoolBuilderLocal, PoolLocal, PoolOps};

static MAYASTOR: OnceCell<MayastorTest> = OnceCell::new();

Expand All @@ -28,29 +27,8 @@ fn get_ms() -> &'static MayastorTest<'static> {
})
}

async fn create_pool() -> Result<Lvs, String> {
match Lvs::create_or_import(PoolArgs {
name: "tpool".into(),
disks: vec!["malloc:///md?size_mb=100".into()],
uuid: None,
cluster_size: None,
backend: Default::default(),
})
.await
{
Err(error) => {
let err_str = error.to_string();
if error.to_errno() == Errno::EEXIST {
Lvs::lookup("tpool").ok_or("Failed to lookup".into())
} else {
Err(err_str)
}
}
Ok(pool) => Ok(pool),
}
}
async fn create_replica(pool: &Lvs, uuid: &str) -> Result<Lvol, String> {
pool.create_lvol(uuid, SIZE_MB * 1024 * 1024, Some(uuid), true, None)
async fn create_replica(pool: &PoolLocal, uuid: &str) -> Result<Lvol, String> {
pool.create_repl(uuid, SIZE_MB * 1024 * 1024, Some(uuid), true, None)
.await
.map_err(|error| error.to_string())
}
Expand All @@ -67,6 +45,7 @@ fn mb_to_blocks(mb: u64) -> u64 {
(mb * 1024 * 1024) / BLOCK_SIZE
}
const SIZE_MB: u64 = 32;
const POOL_SZ_MB: u64 = SIZE_MB * 3;

#[tokio::test]
async fn malloc_to_malloc() {
Expand Down Expand Up @@ -115,7 +94,7 @@ async fn malloc_to_replica() {
ms.spawn(async move {
let src_uri = format!("malloc:///d?size_mb={SIZE_MB}");

let pool = create_pool().await.unwrap();
let pool = PoolBuilderLocal::malloc("md", POOL_SZ_MB).await.unwrap();
let replica =
create_replica(&pool, "3be1219f-682b-4672-b88b-8b9d07e8104a")
.await
Expand Down Expand Up @@ -149,7 +128,7 @@ async fn replica_to_rebuild_full() {
let ms = get_ms();

ms.spawn(async move {
let pool = create_pool().await.unwrap();
let pool = PoolBuilderLocal::malloc("md", POOL_SZ_MB).await.unwrap();
let replica_src =
create_replica(&pool, "2be1219f-682b-4672-b88b-8b9d07e8104a")
.await
Expand Down Expand Up @@ -191,7 +170,7 @@ async fn replica_to_rebuild_partial() {
let ms = get_ms();

ms.spawn(async move {
let pool = create_pool().await.unwrap();
let pool = PoolBuilderLocal::malloc("md", POOL_SZ_MB).await.unwrap();
let replica_src =
create_replica(&pool, "2be1219f-682b-4672-b88b-8b9d07e8104a")
.await
Expand Down

0 comments on commit 5374ae7

Please sign in to comment.