From ed0cce2ead1fd177fae506042a09dee38b639792 Mon Sep 17 00:00:00 2001 From: "zero.qn" Date: Thu, 11 Jun 2020 16:58:20 +0800 Subject: [PATCH 1/3] feat(executor): allow cancel execution units through context Execution Flows: tx hook before units -----> tx unit -----> tx hook after units Once canceled, current execution state will be reverted, following units will not be executed. State changed before cacneled will be saved. --- framework/src/executor/mod.rs | 32 +++- framework/src/executor/tests/mod.rs | 183 ++++++++++++++----- framework/src/executor/tests/test_service.rs | 37 ++++ protocol/src/types/service_context.rs | 25 +++ 4 files changed, 223 insertions(+), 54 deletions(-) diff --git a/framework/src/executor/mod.rs b/framework/src/executor/mod.rs index 790081056..f47c1cfc3 100644 --- a/framework/src/executor/mod.rs +++ b/framework/src/executor/mod.rs @@ -107,19 +107,35 @@ impl CommitHooks { } // bagua kan 101 :) - fn kan R, R>(states: Rc>, hook: H) -> ProtocolResult<()> { + fn kan R, R>( + context: ServiceContext, + states: Rc>, + hook: H, + ) -> ProtocolResult<()> { match panic::catch_unwind(AssertUnwindSafe(hook)) { - Ok(_) => states.stash(), + Ok(_) if !context.canceled() => states.stash(), + Ok(_) => { + // An reason must be passed to cancel + let reason = context.cancel_reason(); + debug_assert!(reason.is_some()); + + states.revert_cache()?; + + Err(ExecutorError::Canceled { + service: context.get_service_name().to_owned(), + reason, + } + .into()) + } Err(_) => states.revert_cache(), } } } impl TxHooks for CommitHooks { - // TODO: support abort execution fn before(&mut self, _context: Context, service_context: ServiceContext) -> ProtocolResult<()> { for hook in self.inner.iter_mut() { - Self::kan(Rc::clone(&self.states), || { + Self::kan(service_context.clone(), Rc::clone(&self.states), || { hook.tx_hook_before_(service_context.clone()) })?; } @@ -129,7 +145,7 @@ impl TxHooks for CommitHooks { fn after(&mut self, _context: Context, service_context: ServiceContext) -> ProtocolResult<()> { for hook in self.inner.iter_mut() { - Self::kan(Rc::clone(&self.states), || { + Self::kan(service_context.clone(), Rc::clone(&self.states), || { hook.tx_hook_after_(service_context.clone()) })?; } @@ -512,6 +528,12 @@ pub enum ExecutorError { QueryService(String), #[display(fmt = "Call service failed: {:?}", _0)] CallService(String), + + #[display(fmt = "service {} canceled {:?}", service, reason)] + Canceled { + service: String, + reason: Option, + }, } impl std::error::Error for ExecutorError {} diff --git a/framework/src/executor/tests/mod.rs b/framework/src/executor/tests/mod.rs index fdbb91cd6..9724e4347 100644 --- a/framework/src/executor/tests/mod.rs +++ b/framework/src/executor/tests/mod.rs @@ -332,24 +332,11 @@ fn test_commit_tx_hook_use_panic_tx() { assert!(error_resp.is_err()); let caller = Address::from_hex("0xf8389d774afdad8755ef8e629e5a154fddc6325a").unwrap(); - let request = TransactionRequest { - service_name: "test".to_owned(), - method: "test_read".to_owned(), - payload: r#""before""#.to_owned(), - }; - let before = executor - .read(¶ms, &caller, 1, &request) - .expect("read before"); + + let before = read!(executor, ¶ms, &caller, r#""before""#); assert_eq!(before.succeed_data, r#""before""#); - let request = TransactionRequest { - service_name: "test".to_owned(), - method: "test_read".to_owned(), - payload: r#""after""#.to_owned(), - }; - let after = executor - .read(¶ms, &caller, 1, &request) - .expect("read after"); + let after = read!(executor, ¶ms, &caller, r#""after""#); assert_eq!(after.succeed_data, r#""after""#); } @@ -394,24 +381,17 @@ fn test_tx_hook_before_panic() { assert!(error_resp.is_ok()); let caller = Address::from_hex("0xf8389d774afdad8755ef8e629e5a154fddc6325a").unwrap(); - let request = TransactionRequest { - service_name: "test".to_owned(), - method: "test_read".to_owned(), - payload: r#""tx_hook_before_panic""#.to_owned(), - }; - let before = executor - .read(¶ms, &caller, 1, &request) - .expect("read tx"); - assert_eq!(before.succeed_data, r#""tx_hook_before_panic""#); - let request = TransactionRequest { - service_name: "test".to_owned(), - method: "test_read".to_owned(), - payload: r#""after""#.to_owned(), - }; - let after = executor - .read(¶ms, &caller, 1, &request) - .expect("read after"); + let before = read!(executor, ¶ms, &caller, r#""before""#); + assert_eq!(before.succeed_data, r#""""#); + + let tx_hook_before_panic = read!(executor, ¶ms, &caller, r#""tx_hook_before_panic""#); + assert_eq!( + tx_hook_before_panic.succeed_data, + r#""tx_hook_before_panic""# + ); + + let after = read!(executor, ¶ms, &caller, r#""after""#); assert_eq!(after.succeed_data, r#""after""#); } @@ -456,25 +436,130 @@ fn test_tx_hook_after_panic() { assert!(error_resp.is_ok()); let caller = Address::from_hex("0xf8389d774afdad8755ef8e629e5a154fddc6325a").unwrap(); - let request = TransactionRequest { - service_name: "test".to_owned(), - method: "test_read".to_owned(), - payload: r#""before""#.to_owned(), + + let before = read!(executor, ¶ms, &caller, r#""before""#); + assert_eq!(before.succeed_data, r#""before""#); + + let tx_hook_after_panic = read!(executor, ¶ms, &caller, r#""tx_hook_after_panic""#); + assert_eq!(tx_hook_after_panic.succeed_data, r#""tx_hook_after_panic""#); + + let after = read!(executor, ¶ms, &caller, r#""after""#); + assert_eq!(after.succeed_data, r#""""#); +} + +#[test] +fn test_tx_hook_before_cancel() { + let toml_str = include_str!("./genesis_services.toml"); + let genesis: Genesis = toml::from_str(toml_str).unwrap(); + + let db = Arc::new(MemoryDB::new(false)); + + let root = ServiceExecutor::create_genesis( + genesis.services, + Arc::clone(&db), + Arc::new(MockStorage {}), + Arc::new(MockServiceMapping {}), + ) + .unwrap(); + + let mut executor = ServiceExecutor::with_root( + root.clone(), + Arc::clone(&db), + Arc::new(MockStorage {}), + Arc::new(MockServiceMapping {}), + ) + .unwrap(); + + let params = ExecutorParams { + state_root: root, + height: 1, + timestamp: 0, + cycles_limit: std::u64::MAX, }; - let after = executor - .read(¶ms, &caller, 1, &request) - .expect("read before"); - assert_eq!(after.succeed_data, r#""before""#); - let request = TransactionRequest { - service_name: "test".to_owned(), - method: "test_read".to_owned(), - payload: r#""tx_hook_after_panic""#.to_owned(), + let mut stx = mock_signed_tx(); + stx.raw.request.service_name = "test".to_owned(); + stx.raw.request.method = "tx_hook_before_cancel".to_owned(); + stx.raw.request.payload = r#""""#.to_owned(); + + let txs = vec![stx]; + let error_resp = executor.exec(Context::new(), ¶ms, &txs); + assert!(error_resp.is_err()); + assert!(error_resp + .err() + .expect("err") + .to_string() + .contains("Canceled")); + + let caller = Address::from_hex("0xf8389d774afdad8755ef8e629e5a154fddc6325a").unwrap(); + + let before = read!(executor, ¶ms, &caller, r#""before""#); + assert_eq!(before.succeed_data, r#""""#); + + let tx_hook_before_cancel = read!(executor, ¶ms, &caller, r#""tx_hook_before_cancel""#); + assert_eq!(tx_hook_before_cancel.succeed_data, r#""""#); + + let after = read!(executor, ¶ms, &caller, r#""after""#); + assert_eq!(after.succeed_data, r#""""#); +} + +#[test] +fn test_tx_hook_after_cancel() { + let toml_str = include_str!("./genesis_services.toml"); + let genesis: Genesis = toml::from_str(toml_str).unwrap(); + + let db = Arc::new(MemoryDB::new(false)); + + let root = ServiceExecutor::create_genesis( + genesis.services, + Arc::clone(&db), + Arc::new(MockStorage {}), + Arc::new(MockServiceMapping {}), + ) + .unwrap(); + + let mut executor = ServiceExecutor::with_root( + root.clone(), + Arc::clone(&db), + Arc::new(MockStorage {}), + Arc::new(MockServiceMapping {}), + ) + .unwrap(); + + let params = ExecutorParams { + state_root: root, + height: 1, + timestamp: 0, + cycles_limit: std::u64::MAX, }; - let before = executor - .read(¶ms, &caller, 1, &request) - .expect("read tx"); - assert_eq!(before.succeed_data, r#""tx_hook_after_panic""#); + + let mut stx = mock_signed_tx(); + stx.raw.request.service_name = "test".to_owned(); + stx.raw.request.method = "tx_hook_after_cancel".to_owned(); + stx.raw.request.payload = r#""""#.to_owned(); + + let txs = vec![stx]; + let error_resp = executor.exec(Context::new(), ¶ms, &txs); + assert!(error_resp.is_err()); + assert!(error_resp + .err() + .expect("err") + .to_string() + .contains("Canceled")); + + let caller = Address::from_hex("0xf8389d774afdad8755ef8e629e5a154fddc6325a").unwrap(); + + let before = read!(executor, ¶ms, &caller, r#""before""#); + assert_eq!(before.succeed_data, r#""before""#); + + let tx_hook_after_cancel = read!(executor, ¶ms, &caller, r#""tx_hook_after_cancel""#); + assert_eq!( + tx_hook_after_cancel.succeed_data, + r#""tx_hook_after_cancel""# + ); + + let after = read!(executor, ¶ms, &caller, r#""after""#); + assert_eq!(after.succeed_data, r#""""#); } #[bench] diff --git a/framework/src/executor/tests/test_service.rs b/framework/src/executor/tests/test_service.rs index f918a5939..776c60f7b 100644 --- a/framework/src/executor/tests/test_service.rs +++ b/framework/src/executor/tests/test_service.rs @@ -110,6 +110,35 @@ impl TestService { ServiceResponse::from_succeed(()) } + #[cycles(210_00)] + #[write] + fn tx_hook_before_cancel( + &mut self, + ctx: ServiceContext, + _payload: String, + ) -> ServiceResponse<()> { + self.sdk.set_value( + "tx_hook_before_cancel".to_owned(), + "tx_hook_before_cancel".to_owned(), + ); + ServiceResponse::from_succeed(()) + } + + #[cycles(210_00)] + #[write] + fn tx_hook_after_cancel( + &mut self, + ctx: ServiceContext, + _payload: String, + ) -> ServiceResponse<()> { + self.sdk.set_value( + "tx_hook_after_cancel".to_owned(), + "tx_hook_after_cancel".to_owned(), + ); + ctx.cancel("tx_hook_after_cancel".to_owned()); + ServiceResponse::from_error(1, "tx_hook_after_cancel".to_owned()) + } + #[tx_hook_before] fn test_tx_hook_before(&mut self, ctx: ServiceContext) { if ctx.get_service_name() == "test" @@ -122,6 +151,10 @@ impl TestService { panic!("tx hook before"); } + if ctx.get_service_method() == "tx_hook_before_cancel" { + ctx.cancel("tx_hook_before_cancel".to_owned()); + } + self.sdk.set_value("before".to_owned(), "before".to_owned()); } @@ -137,6 +170,10 @@ impl TestService { panic!("tx hook before"); } + if ctx.get_service_method() == "tx_hook_after_cancel" { + ctx.cancel("tx_hook_after_cancel".to_owned()); + } + self.sdk.set_value("after".to_owned(), "after".to_owned()); } } diff --git a/protocol/src/types/service_context.rs b/protocol/src/types/service_context.rs index 081b85b91..39148855d 100644 --- a/protocol/src/types/service_context.rs +++ b/protocol/src/types/service_context.rs @@ -24,6 +24,8 @@ pub struct ServiceContextParams { pub events: Rc>>, } +pub type Reason = String; + #[derive(Debug, Clone, PartialEq)] pub struct ServiceContext { tx_hash: Option, @@ -39,6 +41,7 @@ pub struct ServiceContext { extra: Option, timestamp: u64, events: Rc>>, + canceled: Rc>>, } impl ServiceContext { @@ -57,6 +60,7 @@ impl ServiceContext { extra: params.extra, timestamp: params.timestamp, events: params.events, + canceled: Rc::new(RefCell::new(None)), } } @@ -81,6 +85,7 @@ impl ServiceContext { extra, timestamp: context.get_timestamp(), events: Rc::clone(&context.events), + canceled: Rc::clone(&context.canceled), } } @@ -145,6 +150,18 @@ impl ServiceContext { self.timestamp } + pub fn canceled(&self) -> bool { + self.canceled.borrow().is_some() + } + + pub fn cancel_reason(&self) -> Option { + self.canceled.borrow().to_owned() + } + + pub fn cancel(&self, reason: String) { + *self.canceled.borrow_mut() = Some(reason); + } + pub fn emit_event(&self, message: String) { self.events.borrow_mut().push(Event { service: self.service_name.clone(), @@ -208,5 +225,13 @@ mod tests { assert_eq!(ctx.get_service_name(), "service_name"); assert_eq!(ctx.get_service_method(), "service_method"); assert_eq!(ctx.get_payload(), "service_payload"); + + let bro = ctx.clone(); + let reason = "hurry up, bus is about to leave".to_owned(); + + ctx.cancel(reason.clone()); + assert!(ctx.canceled()); + assert!(bro.canceled()); + assert_eq!(bro.cancel_reason(), Some(reason)); } } From ca311c93c86d63cb471a8cbf0edd93f652f42c3f Mon Sep 17 00:00:00 2001 From: "zero.qn" Date: Thu, 11 Jun 2020 17:16:13 +0800 Subject: [PATCH 2/3] change(executor): cancel will not revert current execution unit changes We can still use panic to revert changes --- framework/src/executor/mod.rs | 4 ++-- framework/src/executor/tests/mod.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/framework/src/executor/mod.rs b/framework/src/executor/mod.rs index f47c1cfc3..88f4c6b51 100644 --- a/framework/src/executor/mod.rs +++ b/framework/src/executor/mod.rs @@ -115,12 +115,12 @@ impl CommitHooks { match panic::catch_unwind(AssertUnwindSafe(hook)) { Ok(_) if !context.canceled() => states.stash(), Ok(_) => { + states.stash()?; + // An reason must be passed to cancel let reason = context.cancel_reason(); debug_assert!(reason.is_some()); - states.revert_cache()?; - Err(ExecutorError::Canceled { service: context.get_service_name().to_owned(), reason, diff --git a/framework/src/executor/tests/mod.rs b/framework/src/executor/tests/mod.rs index 9724e4347..427e94cdc 100644 --- a/framework/src/executor/tests/mod.rs +++ b/framework/src/executor/tests/mod.rs @@ -494,7 +494,7 @@ fn test_tx_hook_before_cancel() { let caller = Address::from_hex("0xf8389d774afdad8755ef8e629e5a154fddc6325a").unwrap(); let before = read!(executor, ¶ms, &caller, r#""before""#); - assert_eq!(before.succeed_data, r#""""#); + assert_eq!(before.succeed_data, r#""before""#); let tx_hook_before_cancel = read!(executor, ¶ms, &caller, r#""tx_hook_before_cancel""#); assert_eq!(tx_hook_before_cancel.succeed_data, r#""""#); From 8c325e8bb1583825e70c24c6107c7bd8cdadcec5 Mon Sep 17 00:00:00 2001 From: "zero.qn" Date: Sun, 28 Jun 2020 15:06:47 +0800 Subject: [PATCH 3/3] fix(executor): test failed after rebase --- framework/src/executor/tests/mod.rs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/framework/src/executor/tests/mod.rs b/framework/src/executor/tests/mod.rs index 427e94cdc..cce85d6c9 100644 --- a/framework/src/executor/tests/mod.rs +++ b/framework/src/executor/tests/mod.rs @@ -25,6 +25,20 @@ use protocol::ProtocolResult; use crate::executor::ServiceExecutor; use test_service::TestService; +macro_rules! read { + ($executor:expr, $params:expr, $caller:expr, $payload:expr) => {{ + let request = TransactionRequest { + service_name: "test".to_owned(), + method: "test_read".to_owned(), + payload: $payload.to_owned(), + }; + + $executor + .read($params, $caller, 1, &request) + .expect(&format!("read {}", $payload)) + }}; +} + pub const PUB_KEY_STR: &str = "031288a6788678c25952eba8693b2f278f66e2187004b64ac09416d07f83f96d5b"; #[test] @@ -475,6 +489,7 @@ fn test_tx_hook_before_cancel() { height: 1, timestamp: 0, cycles_limit: std::u64::MAX, + proposer: Address::from_hash(Hash::from_empty()).unwrap(), }; let mut stx = mock_signed_tx(); @@ -531,6 +546,7 @@ fn test_tx_hook_after_cancel() { height: 1, timestamp: 0, cycles_limit: std::u64::MAX, + proposer: Address::from_hash(Hash::from_empty()).unwrap(), }; let mut stx = mock_signed_tx();