Skip to content
This repository has been archived by the owner on Sep 13, 2022. It is now read-only.

Commit

Permalink
feat(executor): allow cancel execution units through context
Browse files Browse the repository at this point in the history
Execution Flows:

tx hook before units -----> tx unit -----> tx hook after units

Once canceled, current execution state will be reverted, following
units will not be executed. State changed before cacneled will be
saved.
  • Loading branch information
zeroqn committed Jun 28, 2020
1 parent 9f93455 commit ac01dbb
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 54 deletions.
32 changes: 27 additions & 5 deletions framework/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,19 +107,35 @@ impl<DB: TrieDB> CommitHooks<DB> {
}

// bagua kan 101 :)
fn kan<H: FnOnce() -> R, R>(states: Rc<ServiceStateMap<DB>>, hook: H) -> ProtocolResult<()> {
fn kan<H: FnOnce() -> R, R>(
context: ServiceContext,
states: Rc<ServiceStateMap<DB>>,
hook: H,
) -> ProtocolResult<()> {
match panic::catch_unwind(AssertUnwindSafe(hook)) {
Ok(_) => states.stash(),
Ok(_) if !context.canceled() => states.stash(),
Ok(_) => {
// An reason must be passed to cancel
let reason = context.cancel_reason();
debug_assert!(reason.is_some());

states.revert_cache()?;

Err(ExecutorError::Canceled {
service: context.get_service_name().to_owned(),
reason,
}
.into())
}
Err(_) => states.revert_cache(),
}
}
}

impl<DB: TrieDB> TxHooks for CommitHooks<DB> {
// TODO: support abort execution
fn before(&mut self, _context: Context, service_context: ServiceContext) -> ProtocolResult<()> {
for hook in self.inner.iter_mut() {
Self::kan(Rc::clone(&self.states), || {
Self::kan(service_context.clone(), Rc::clone(&self.states), || {
hook.tx_hook_before_(service_context.clone())
})?;
}
Expand All @@ -129,7 +145,7 @@ impl<DB: TrieDB> TxHooks for CommitHooks<DB> {

fn after(&mut self, _context: Context, service_context: ServiceContext) -> ProtocolResult<()> {
for hook in self.inner.iter_mut() {
Self::kan(Rc::clone(&self.states), || {
Self::kan(service_context.clone(), Rc::clone(&self.states), || {
hook.tx_hook_after_(service_context.clone())
})?;
}
Expand Down Expand Up @@ -512,6 +528,12 @@ pub enum ExecutorError {
QueryService(String),
#[display(fmt = "Call service failed: {:?}", _0)]
CallService(String),

#[display(fmt = "service {} canceled {:?}", service, reason)]
Canceled {
service: String,
reason: Option<String>,
},
}

impl std::error::Error for ExecutorError {}
Expand Down
183 changes: 134 additions & 49 deletions framework/src/executor/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,24 +332,11 @@ fn test_commit_tx_hook_use_panic_tx() {
assert!(error_resp.is_err());

let caller = Address::from_hex("0xf8389d774afdad8755ef8e629e5a154fddc6325a").unwrap();
let request = TransactionRequest {
service_name: "test".to_owned(),
method: "test_read".to_owned(),
payload: r#""before""#.to_owned(),
};
let before = executor
.read(&params, &caller, 1, &request)
.expect("read before");

let before = read!(executor, &params, &caller, r#""before""#);
assert_eq!(before.succeed_data, r#""before""#);

let request = TransactionRequest {
service_name: "test".to_owned(),
method: "test_read".to_owned(),
payload: r#""after""#.to_owned(),
};
let after = executor
.read(&params, &caller, 1, &request)
.expect("read after");
let after = read!(executor, &params, &caller, r#""after""#);
assert_eq!(after.succeed_data, r#""after""#);
}

Expand Down Expand Up @@ -394,24 +381,17 @@ fn test_tx_hook_before_panic() {
assert!(error_resp.is_ok());

let caller = Address::from_hex("0xf8389d774afdad8755ef8e629e5a154fddc6325a").unwrap();
let request = TransactionRequest {
service_name: "test".to_owned(),
method: "test_read".to_owned(),
payload: r#""tx_hook_before_panic""#.to_owned(),
};
let before = executor
.read(&params, &caller, 1, &request)
.expect("read tx");
assert_eq!(before.succeed_data, r#""tx_hook_before_panic""#);

let request = TransactionRequest {
service_name: "test".to_owned(),
method: "test_read".to_owned(),
payload: r#""after""#.to_owned(),
};
let after = executor
.read(&params, &caller, 1, &request)
.expect("read after");
let before = read!(executor, &params, &caller, r#""before""#);
assert_eq!(before.succeed_data, r#""""#);

let tx_hook_before_panic = read!(executor, &params, &caller, r#""tx_hook_before_panic""#);
assert_eq!(
tx_hook_before_panic.succeed_data,
r#""tx_hook_before_panic""#
);

let after = read!(executor, &params, &caller, r#""after""#);
assert_eq!(after.succeed_data, r#""after""#);
}

Expand Down Expand Up @@ -456,25 +436,130 @@ fn test_tx_hook_after_panic() {
assert!(error_resp.is_ok());

let caller = Address::from_hex("0xf8389d774afdad8755ef8e629e5a154fddc6325a").unwrap();
let request = TransactionRequest {
service_name: "test".to_owned(),
method: "test_read".to_owned(),
payload: r#""before""#.to_owned(),

let before = read!(executor, &params, &caller, r#""before""#);
assert_eq!(before.succeed_data, r#""before""#);

let tx_hook_after_panic = read!(executor, &params, &caller, r#""tx_hook_after_panic""#);
assert_eq!(tx_hook_after_panic.succeed_data, r#""tx_hook_after_panic""#);

let after = read!(executor, &params, &caller, r#""after""#);
assert_eq!(after.succeed_data, r#""""#);
}

#[test]
fn test_tx_hook_before_cancel() {
let toml_str = include_str!("./genesis_services.toml");
let genesis: Genesis = toml::from_str(toml_str).unwrap();

let db = Arc::new(MemoryDB::new(false));

let root = ServiceExecutor::create_genesis(
genesis.services,
Arc::clone(&db),
Arc::new(MockStorage {}),
Arc::new(MockServiceMapping {}),
)
.unwrap();

let mut executor = ServiceExecutor::with_root(
root.clone(),
Arc::clone(&db),
Arc::new(MockStorage {}),
Arc::new(MockServiceMapping {}),
)
.unwrap();

let params = ExecutorParams {
state_root: root,
height: 1,
timestamp: 0,
cycles_limit: std::u64::MAX,
};
let after = executor
.read(&params, &caller, 1, &request)
.expect("read before");
assert_eq!(after.succeed_data, r#""before""#);

let request = TransactionRequest {
service_name: "test".to_owned(),
method: "test_read".to_owned(),
payload: r#""tx_hook_after_panic""#.to_owned(),
let mut stx = mock_signed_tx();
stx.raw.request.service_name = "test".to_owned();
stx.raw.request.method = "tx_hook_before_cancel".to_owned();
stx.raw.request.payload = r#""""#.to_owned();

let txs = vec![stx];
let error_resp = executor.exec(Context::new(), &params, &txs);
assert!(error_resp.is_err());
assert!(error_resp
.err()
.expect("err")
.to_string()
.contains("Canceled"));

let caller = Address::from_hex("0xf8389d774afdad8755ef8e629e5a154fddc6325a").unwrap();

let before = read!(executor, &params, &caller, r#""before""#);
assert_eq!(before.succeed_data, r#""""#);

let tx_hook_before_cancel = read!(executor, &params, &caller, r#""tx_hook_before_cancel""#);
assert_eq!(tx_hook_before_cancel.succeed_data, r#""""#);

let after = read!(executor, &params, &caller, r#""after""#);
assert_eq!(after.succeed_data, r#""""#);
}

#[test]
fn test_tx_hook_after_cancel() {
let toml_str = include_str!("./genesis_services.toml");
let genesis: Genesis = toml::from_str(toml_str).unwrap();

let db = Arc::new(MemoryDB::new(false));

let root = ServiceExecutor::create_genesis(
genesis.services,
Arc::clone(&db),
Arc::new(MockStorage {}),
Arc::new(MockServiceMapping {}),
)
.unwrap();

let mut executor = ServiceExecutor::with_root(
root.clone(),
Arc::clone(&db),
Arc::new(MockStorage {}),
Arc::new(MockServiceMapping {}),
)
.unwrap();

let params = ExecutorParams {
state_root: root,
height: 1,
timestamp: 0,
cycles_limit: std::u64::MAX,
};
let before = executor
.read(&params, &caller, 1, &request)
.expect("read tx");
assert_eq!(before.succeed_data, r#""tx_hook_after_panic""#);

let mut stx = mock_signed_tx();
stx.raw.request.service_name = "test".to_owned();
stx.raw.request.method = "tx_hook_after_cancel".to_owned();
stx.raw.request.payload = r#""""#.to_owned();

let txs = vec![stx];
let error_resp = executor.exec(Context::new(), &params, &txs);
assert!(error_resp.is_err());
assert!(error_resp
.err()
.expect("err")
.to_string()
.contains("Canceled"));

let caller = Address::from_hex("0xf8389d774afdad8755ef8e629e5a154fddc6325a").unwrap();

let before = read!(executor, &params, &caller, r#""before""#);
assert_eq!(before.succeed_data, r#""before""#);

let tx_hook_after_cancel = read!(executor, &params, &caller, r#""tx_hook_after_cancel""#);
assert_eq!(
tx_hook_after_cancel.succeed_data,
r#""tx_hook_after_cancel""#
);

let after = read!(executor, &params, &caller, r#""after""#);
assert_eq!(after.succeed_data, r#""""#);
}

#[bench]
Expand Down
37 changes: 37 additions & 0 deletions framework/src/executor/tests/test_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,35 @@ impl<SDK: ServiceSDK> TestService<SDK> {
ServiceResponse::from_succeed(())
}

#[cycles(210_00)]
#[write]
fn tx_hook_before_cancel(
&mut self,
ctx: ServiceContext,
_payload: String,
) -> ServiceResponse<()> {
self.sdk.set_value(
"tx_hook_before_cancel".to_owned(),
"tx_hook_before_cancel".to_owned(),
);
ServiceResponse::from_succeed(())
}

#[cycles(210_00)]
#[write]
fn tx_hook_after_cancel(
&mut self,
ctx: ServiceContext,
_payload: String,
) -> ServiceResponse<()> {
self.sdk.set_value(
"tx_hook_after_cancel".to_owned(),
"tx_hook_after_cancel".to_owned(),
);
ctx.cancel("tx_hook_after_cancel".to_owned());
ServiceResponse::from_error(1, "tx_hook_after_cancel".to_owned())
}

#[tx_hook_before]
fn test_tx_hook_before(&mut self, ctx: ServiceContext) {
if ctx.get_service_name() == "test"
Expand All @@ -122,6 +151,10 @@ impl<SDK: ServiceSDK> TestService<SDK> {
panic!("tx hook before");
}

if ctx.get_service_method() == "tx_hook_before_cancel" {
ctx.cancel("tx_hook_before_cancel".to_owned());
}

self.sdk.set_value("before".to_owned(), "before".to_owned());
}

Expand All @@ -137,6 +170,10 @@ impl<SDK: ServiceSDK> TestService<SDK> {
panic!("tx hook before");
}

if ctx.get_service_method() == "tx_hook_after_cancel" {
ctx.cancel("tx_hook_after_cancel".to_owned());
}

self.sdk.set_value("after".to_owned(), "after".to_owned());
}
}
25 changes: 25 additions & 0 deletions protocol/src/types/service_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ pub struct ServiceContextParams {
pub events: Rc<RefCell<Vec<Event>>>,
}

pub type Reason = String;

#[derive(Debug, Clone, PartialEq)]
pub struct ServiceContext {
tx_hash: Option<Hash>,
Expand All @@ -39,6 +41,7 @@ pub struct ServiceContext {
extra: Option<Bytes>,
timestamp: u64,
events: Rc<RefCell<Vec<Event>>>,
canceled: Rc<RefCell<Option<Reason>>>,
}

impl ServiceContext {
Expand All @@ -57,6 +60,7 @@ impl ServiceContext {
extra: params.extra,
timestamp: params.timestamp,
events: params.events,
canceled: Rc::new(RefCell::new(None)),
}
}

Expand All @@ -81,6 +85,7 @@ impl ServiceContext {
extra,
timestamp: context.get_timestamp(),
events: Rc::clone(&context.events),
canceled: Rc::clone(&context.canceled),
}
}

Expand Down Expand Up @@ -145,6 +150,18 @@ impl ServiceContext {
self.timestamp
}

pub fn canceled(&self) -> bool {
self.canceled.borrow().is_some()
}

pub fn cancel_reason(&self) -> Option<Reason> {
self.canceled.borrow().to_owned()
}

pub fn cancel(&self, reason: String) {
*self.canceled.borrow_mut() = Some(reason);
}

pub fn emit_event(&self, message: String) {
self.events.borrow_mut().push(Event {
service: self.service_name.clone(),
Expand Down Expand Up @@ -208,5 +225,13 @@ mod tests {
assert_eq!(ctx.get_service_name(), "service_name");
assert_eq!(ctx.get_service_method(), "service_method");
assert_eq!(ctx.get_payload(), "service_payload");

let bro = ctx.clone();
let reason = "hurry up, bus is about to leave".to_owned();

ctx.cancel(reason.clone());
assert!(ctx.canceled());
assert!(bro.canceled());
assert_eq!(bro.cancel_reason(), Some(reason));
}
}

0 comments on commit ac01dbb

Please sign in to comment.