Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(io-engine): add persistent store transaction API #1791

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file removed io-engine/local-randrw-0-verify.state
Binary file not shown.
25 changes: 24 additions & 1 deletion io-engine/src/persistent_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@ use crate::{
core::Reactor,
store::{
etcd::Etcd,
store_defs::{DeleteWait, GetWait, PutWait, Store, StoreError, StoreKey, StoreValue},
store_defs::{
DeleteWait, GetWait, PutWait, Store, StoreError, StoreKey, StoreValue, TxnWait,
},
},
};
use etcd_client::{Compare, TxnOp, TxnResponse};
use futures::channel::oneshot;
use once_cell::sync::OnceCell;
use parking_lot::Mutex;
Expand Down Expand Up @@ -193,6 +196,26 @@ impl PersistentStore {
})?
}

/// Executes a transaction for the given key.
pub async fn txn(
key: &impl StoreKey,
cmps: Vec<Compare>,
ops_success: Vec<TxnOp>,
ops_failure: Option<Vec<TxnOp>>,
) -> Result<TxnResponse, StoreError> {
let key_string = key.to_string();
let rx = Self::execute_store_op(async move {
info!("Executing transaction for key {}.", key_string);
Self::backing_store()
.txn_kv(&key_string, cmps, ops_success, ops_failure)
.await
});

rx.await.context(TxnWait {
key: key.to_string(),
})?
}

/// Retrieves a value, with the given key, from the store.
pub async fn get(key: &impl StoreKey) -> Result<Value, StoreError> {
let key_string = key.to_string();
Expand Down
23 changes: 21 additions & 2 deletions io-engine/src/store/etcd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

use crate::store::store_defs::{
Connect, Delete, DeserialiseValue, Get, Put, SerialiseValue, Store, StoreError,
StoreError::MissingEntry, StoreKey, StoreValue, ValueString,
StoreError::MissingEntry, StoreKey, StoreValue, Txn as TxnErr, ValueString,
};
use async_trait::async_trait;
use etcd_client::Client;
use etcd_client::{Client, Compare, Txn, TxnOp, TxnResponse};
use serde_json::Value;
use snafu::ResultExt;

Expand Down Expand Up @@ -49,6 +49,25 @@ impl Store for Etcd {
Ok(())
}

/// Executes a transaction for the given key. If the compares succeed, then
/// ops_success will be executed atomically, otherwise ops_failure will be
/// executed atomically.
async fn txn_kv<K: StoreKey>(
&mut self,
key: &K,
cmps: Vec<Compare>,
ops_success: Vec<TxnOp>,
ops_failure: Option<Vec<TxnOp>>,
) -> Result<TxnResponse, StoreError> {
let fops = ops_failure.map_or(vec![], |v| v);
self.0
.txn(Txn::new().when(cmps).and_then(ops_success).or_else(fops))
.await
.context(TxnErr {
key: key.to_string(),
})
}

/// 'Get' the value for the given key from etcd.
async fn get_kv<K: StoreKey>(&mut self, key: &K) -> Result<Value, StoreError> {
let resp = self.0.get(key.to_string(), None).await.context(Get {
Expand Down
22 changes: 21 additions & 1 deletion io-engine/src/store/store_defs.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Definition of a trait for a key-value store together with its error codes.

use async_trait::async_trait;
use etcd_client::Error;
use etcd_client::{Compare, Error, TxnOp, TxnResponse};
use serde_json::{Error as SerdeError, Value};
use snafu::Snafu;

Expand Down Expand Up @@ -56,6 +56,18 @@ pub enum StoreError {
key: String,
source: futures::channel::oneshot::Canceled,
},
/// Failed to complete a 'transaction'.
#[snafu(display("Failed to do 'transaction' for key {}. Error {}", key, source))]
Txn { key: String, source: Error },
/// Failed to wait for 'transaction' operations.
#[snafu(display(
"Failed to wait for 'transaction' operations to complete for key {}.",
key,
))]
TxnWait {
key: String,
source: futures::channel::oneshot::Canceled,
},
/// Failed to 'watch' an entry in the store.
#[snafu(display("Failed to 'watch' entry with key {}. Error {}", key, source))]
Watch { key: String, source: Error },
Expand Down Expand Up @@ -93,6 +105,14 @@ pub trait Store: Sync + Send + Clone {
value: &V,
) -> Result<(), StoreError>;

async fn txn_kv<K: StoreKey>(
&mut self,
key: &K,
cmps: Vec<Compare>,
ops_success: Vec<TxnOp>,
ops_failure: Option<Vec<TxnOp>>,
) -> Result<TxnResponse, StoreError>;

/// Get an entry from the store.
async fn get_kv<K: StoreKey>(&mut self, key: &K) -> Result<Value, StoreError>;

Expand Down
93 changes: 89 additions & 4 deletions io-engine/tests/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,22 @@ use common::compose::{
},
GrpcConnect, RpcHandle,
},
Binary, Builder, ComposeTest, ContainerSpec,
Binary, Builder, ComposeTest, ContainerSpec, MayastorTest,
};
use etcd_client::Client;

use io_engine::bdev::nexus::{ChildInfo, NexusInfo};
use etcd_client::{Client, Compare, CompareOp, TxnOp};

use io_engine::{
bdev::nexus::{ChildInfo, NexusInfo},
core::MayastorCliArgs,
persistent_store::{PersistentStore, PersistentStoreBuilder},
};
use std::{convert::TryFrom, thread::sleep, time::Duration};
use url::Url;

pub mod common;
use once_cell::sync::OnceCell;

static MAYASTOR: OnceCell<MayastorTest> = OnceCell::new();
static ETCD_ENDPOINT: &str = "0.0.0.0:2379";
static CHILD1_UUID: &str = "d61b2fdf-1be8-457a-a481-70a42d0a2223";
static CHILD2_UUID: &str = "094ae8c6-46aa-4139-b4f2-550d39645db3";
Expand Down Expand Up @@ -519,3 +524,83 @@ pub(crate) fn uuid(uri: &str) -> String {
}
panic!("URI does not contain a uuid query parameter.");
}

// This test does a successful etcd transaction that upon successful CompareOp, modifies
// an existing key and adds a new key. Then does another transaction which is
// fails the CompareOp and hence that transaction is expected to fail.
#[tokio::test]
async fn pstor_txn_api() {
let dummy_key1 = "dummy_key_1".to_string();
let dummy_key2 = "dummy_key_2".to_string();
let pre_txn_value_k1 = "pre_txn_value_key1".to_string();
let post_txn_value_k1 = "post_txn_value_key1".to_string();
let post_txn_value_k2 = "post_txn_value_key2".to_string();
common::composer_init();

let _test = Builder::new()
.name("etcd_txn_test")
.add_container_spec(
ContainerSpec::from_binary(
"etcd",
Binary::from_path(env!("ETCD_BIN")).with_args(vec![
"--data-dir",
"/tmp/etcd-data",
"--advertise-client-urls",
"http://0.0.0.0:2379",
"--listen-client-urls",
"http://0.0.0.0:2379",
]),
)
.with_portmap("2379", "2379")
.with_portmap("2380", "2380"),
)
.build()
.await
.unwrap();

PersistentStoreBuilder::new()
.with_endpoint(ETCD_ENDPOINT)
.connect()
.await;

// create the mayastor test instance
let ms = MayastorTest::new(MayastorCliArgs {
log_components: vec!["all".into()],
reactor_mask: "0x3".to_string(),
no_pci: true,
..Default::default()
});

let _ms = MAYASTOR.get_or_init(|| ms);
MAYASTOR
.get()
.unwrap()
.spawn(async move {
let _ = PersistentStore::put(&dummy_key1, &pre_txn_value_k1).await;
let expect = serde_json::to_vec(&pre_txn_value_k1).unwrap();

// Transaction - expected to succeed.
let cmp = vec![Compare::value(
dummy_key1.to_string(),
CompareOp::Equal,
expect,
)];
let ops = vec![
TxnOp::put(dummy_key1.to_string(), post_txn_value_k1, None),
TxnOp::put(dummy_key2.to_string(), post_txn_value_k2, None),
];
let txn_resp = PersistentStore::txn(&dummy_key1, cmp.clone(), ops, None)
.await
.unwrap();
assert!(txn_resp.succeeded());

// A new transaction - expected to fail. Execute fops upon compare failure.
let ops = vec![TxnOp::delete(dummy_key1.to_string(), None)];
let fops = vec![TxnOp::delete(dummy_key2.to_string(), None)];
let txn_resp = PersistentStore::txn(&dummy_key1, cmp, ops, Some(fops))
.await
.unwrap();
assert!(!txn_resp.succeeded());
})
.await;
}
Loading