Skip to content
This repository has been archived by the owner on Sep 13, 2022. It is now read-only.

feat(executor): allow cancel execution units through context #317

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 27 additions & 5 deletions framework/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,19 +107,35 @@ impl<DB: TrieDB> CommitHooks<DB> {
}

// bagua kan 101 :)
fn kan<H: FnOnce() -> R, R>(states: Rc<ServiceStateMap<DB>>, hook: H) -> ProtocolResult<()> {
fn kan<H: FnOnce() -> R, R>(
context: ServiceContext,
states: Rc<ServiceStateMap<DB>>,
hook: H,
) -> ProtocolResult<()> {
match panic::catch_unwind(AssertUnwindSafe(hook)) {
Ok(_) => states.stash(),
Ok(_) if !context.canceled() => states.stash(),
Ok(_) => {
states.stash()?;

// An reason must be passed to cancel
let reason = context.cancel_reason();
debug_assert!(reason.is_some());

Err(ExecutorError::Canceled {
service: context.get_service_name().to_owned(),
reason,
}
.into())
}
Err(_) => states.revert_cache(),
}
}
}

impl<DB: TrieDB> TxHooks for CommitHooks<DB> {
// TODO: support abort execution
fn before(&mut self, _context: Context, service_context: ServiceContext) -> ProtocolResult<()> {
for hook in self.inner.iter_mut() {
Self::kan(Rc::clone(&self.states), || {
Self::kan(service_context.clone(), Rc::clone(&self.states), || {
hook.tx_hook_before_(service_context.clone())
})?;
}
Expand All @@ -129,7 +145,7 @@ impl<DB: TrieDB> TxHooks for CommitHooks<DB> {

fn after(&mut self, _context: Context, service_context: ServiceContext) -> ProtocolResult<()> {
for hook in self.inner.iter_mut() {
Self::kan(Rc::clone(&self.states), || {
Self::kan(service_context.clone(), Rc::clone(&self.states), || {
hook.tx_hook_after_(service_context.clone())
})?;
}
Expand Down Expand Up @@ -512,6 +528,12 @@ pub enum ExecutorError {
QueryService(String),
#[display(fmt = "Call service failed: {:?}", _0)]
CallService(String),

#[display(fmt = "service {} canceled {:?}", service, reason)]
Canceled {
service: String,
reason: Option<String>,
},
}

impl std::error::Error for ExecutorError {}
Expand Down
199 changes: 150 additions & 49 deletions framework/src/executor/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,20 @@ use protocol::ProtocolResult;
use crate::executor::ServiceExecutor;
use test_service::TestService;

macro_rules! read {
($executor:expr, $params:expr, $caller:expr, $payload:expr) => {{
let request = TransactionRequest {
service_name: "test".to_owned(),
method: "test_read".to_owned(),
payload: $payload.to_owned(),
};

$executor
.read($params, $caller, 1, &request)
.expect(&format!("read {}", $payload))
}};
}

pub const PUB_KEY_STR: &str = "031288a6788678c25952eba8693b2f278f66e2187004b64ac09416d07f83f96d5b";

#[test]
Expand Down Expand Up @@ -332,24 +346,11 @@ fn test_commit_tx_hook_use_panic_tx() {
assert!(error_resp.is_err());

let caller = Address::from_hex("0xf8389d774afdad8755ef8e629e5a154fddc6325a").unwrap();
let request = TransactionRequest {
service_name: "test".to_owned(),
method: "test_read".to_owned(),
payload: r#""before""#.to_owned(),
};
let before = executor
.read(&params, &caller, 1, &request)
.expect("read before");

let before = read!(executor, &params, &caller, r#""before""#);
assert_eq!(before.succeed_data, r#""before""#);

let request = TransactionRequest {
service_name: "test".to_owned(),
method: "test_read".to_owned(),
payload: r#""after""#.to_owned(),
};
let after = executor
.read(&params, &caller, 1, &request)
.expect("read after");
let after = read!(executor, &params, &caller, r#""after""#);
assert_eq!(after.succeed_data, r#""after""#);
}

Expand Down Expand Up @@ -394,24 +395,17 @@ fn test_tx_hook_before_panic() {
assert!(error_resp.is_ok());

let caller = Address::from_hex("0xf8389d774afdad8755ef8e629e5a154fddc6325a").unwrap();
let request = TransactionRequest {
service_name: "test".to_owned(),
method: "test_read".to_owned(),
payload: r#""tx_hook_before_panic""#.to_owned(),
};
let before = executor
.read(&params, &caller, 1, &request)
.expect("read tx");
assert_eq!(before.succeed_data, r#""tx_hook_before_panic""#);

let request = TransactionRequest {
service_name: "test".to_owned(),
method: "test_read".to_owned(),
payload: r#""after""#.to_owned(),
};
let after = executor
.read(&params, &caller, 1, &request)
.expect("read after");
let before = read!(executor, &params, &caller, r#""before""#);
assert_eq!(before.succeed_data, r#""""#);

let tx_hook_before_panic = read!(executor, &params, &caller, r#""tx_hook_before_panic""#);
assert_eq!(
tx_hook_before_panic.succeed_data,
r#""tx_hook_before_panic""#
);

let after = read!(executor, &params, &caller, r#""after""#);
assert_eq!(after.succeed_data, r#""after""#);
}

Expand Down Expand Up @@ -456,25 +450,132 @@ fn test_tx_hook_after_panic() {
assert!(error_resp.is_ok());

let caller = Address::from_hex("0xf8389d774afdad8755ef8e629e5a154fddc6325a").unwrap();
let request = TransactionRequest {
service_name: "test".to_owned(),
method: "test_read".to_owned(),
payload: r#""before""#.to_owned(),

let before = read!(executor, &params, &caller, r#""before""#);
assert_eq!(before.succeed_data, r#""before""#);

let tx_hook_after_panic = read!(executor, &params, &caller, r#""tx_hook_after_panic""#);
assert_eq!(tx_hook_after_panic.succeed_data, r#""tx_hook_after_panic""#);

let after = read!(executor, &params, &caller, r#""after""#);
assert_eq!(after.succeed_data, r#""""#);
}

#[test]
fn test_tx_hook_before_cancel() {
let toml_str = include_str!("./genesis_services.toml");
let genesis: Genesis = toml::from_str(toml_str).unwrap();

let db = Arc::new(MemoryDB::new(false));

let root = ServiceExecutor::create_genesis(
genesis.services,
Arc::clone(&db),
Arc::new(MockStorage {}),
Arc::new(MockServiceMapping {}),
)
.unwrap();

let mut executor = ServiceExecutor::with_root(
root.clone(),
Arc::clone(&db),
Arc::new(MockStorage {}),
Arc::new(MockServiceMapping {}),
)
.unwrap();

let params = ExecutorParams {
state_root: root,
height: 1,
timestamp: 0,
cycles_limit: std::u64::MAX,
proposer: Address::from_hash(Hash::from_empty()).unwrap(),
};
let after = executor
.read(&params, &caller, 1, &request)
.expect("read before");
assert_eq!(after.succeed_data, r#""before""#);

let request = TransactionRequest {
service_name: "test".to_owned(),
method: "test_read".to_owned(),
payload: r#""tx_hook_after_panic""#.to_owned(),
let mut stx = mock_signed_tx();
stx.raw.request.service_name = "test".to_owned();
stx.raw.request.method = "tx_hook_before_cancel".to_owned();
stx.raw.request.payload = r#""""#.to_owned();

let txs = vec![stx];
let error_resp = executor.exec(Context::new(), &params, &txs);
assert!(error_resp.is_err());
assert!(error_resp
.err()
.expect("err")
.to_string()
.contains("Canceled"));

let caller = Address::from_hex("0xf8389d774afdad8755ef8e629e5a154fddc6325a").unwrap();

let before = read!(executor, &params, &caller, r#""before""#);
assert_eq!(before.succeed_data, r#""before""#);

let tx_hook_before_cancel = read!(executor, &params, &caller, r#""tx_hook_before_cancel""#);
assert_eq!(tx_hook_before_cancel.succeed_data, r#""""#);

let after = read!(executor, &params, &caller, r#""after""#);
assert_eq!(after.succeed_data, r#""""#);
}

#[test]
fn test_tx_hook_after_cancel() {
let toml_str = include_str!("./genesis_services.toml");
let genesis: Genesis = toml::from_str(toml_str).unwrap();

let db = Arc::new(MemoryDB::new(false));

let root = ServiceExecutor::create_genesis(
genesis.services,
Arc::clone(&db),
Arc::new(MockStorage {}),
Arc::new(MockServiceMapping {}),
)
.unwrap();

let mut executor = ServiceExecutor::with_root(
root.clone(),
Arc::clone(&db),
Arc::new(MockStorage {}),
Arc::new(MockServiceMapping {}),
)
.unwrap();

let params = ExecutorParams {
state_root: root,
height: 1,
timestamp: 0,
cycles_limit: std::u64::MAX,
proposer: Address::from_hash(Hash::from_empty()).unwrap(),
};
let before = executor
.read(&params, &caller, 1, &request)
.expect("read tx");
assert_eq!(before.succeed_data, r#""tx_hook_after_panic""#);

let mut stx = mock_signed_tx();
stx.raw.request.service_name = "test".to_owned();
stx.raw.request.method = "tx_hook_after_cancel".to_owned();
stx.raw.request.payload = r#""""#.to_owned();

let txs = vec![stx];
let error_resp = executor.exec(Context::new(), &params, &txs);
assert!(error_resp.is_err());
assert!(error_resp
.err()
.expect("err")
.to_string()
.contains("Canceled"));

let caller = Address::from_hex("0xf8389d774afdad8755ef8e629e5a154fddc6325a").unwrap();

let before = read!(executor, &params, &caller, r#""before""#);
assert_eq!(before.succeed_data, r#""before""#);

let tx_hook_after_cancel = read!(executor, &params, &caller, r#""tx_hook_after_cancel""#);
assert_eq!(
tx_hook_after_cancel.succeed_data,
r#""tx_hook_after_cancel""#
);

let after = read!(executor, &params, &caller, r#""after""#);
assert_eq!(after.succeed_data, r#""""#);
}

#[bench]
Expand Down
37 changes: 37 additions & 0 deletions framework/src/executor/tests/test_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,35 @@ impl<SDK: ServiceSDK> TestService<SDK> {
ServiceResponse::from_succeed(())
}

#[cycles(210_00)]
#[write]
fn tx_hook_before_cancel(
&mut self,
ctx: ServiceContext,
_payload: String,
) -> ServiceResponse<()> {
self.sdk.set_value(
"tx_hook_before_cancel".to_owned(),
"tx_hook_before_cancel".to_owned(),
);
ServiceResponse::from_succeed(())
}

#[cycles(210_00)]
#[write]
fn tx_hook_after_cancel(
&mut self,
ctx: ServiceContext,
_payload: String,
) -> ServiceResponse<()> {
self.sdk.set_value(
"tx_hook_after_cancel".to_owned(),
"tx_hook_after_cancel".to_owned(),
);
ctx.cancel("tx_hook_after_cancel".to_owned());
ServiceResponse::from_error(1, "tx_hook_after_cancel".to_owned())
}

#[tx_hook_before]
fn test_tx_hook_before(&mut self, ctx: ServiceContext) {
if ctx.get_service_name() == "test"
Expand All @@ -122,6 +151,10 @@ impl<SDK: ServiceSDK> TestService<SDK> {
panic!("tx hook before");
}

if ctx.get_service_method() == "tx_hook_before_cancel" {
ctx.cancel("tx_hook_before_cancel".to_owned());
}

self.sdk.set_value("before".to_owned(), "before".to_owned());
}

Expand All @@ -137,6 +170,10 @@ impl<SDK: ServiceSDK> TestService<SDK> {
panic!("tx hook before");
}

if ctx.get_service_method() == "tx_hook_after_cancel" {
ctx.cancel("tx_hook_after_cancel".to_owned());
}

self.sdk.set_value("after".to_owned(), "after".to_owned());
}
}
Loading