Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix mpm integration-test hangs in rpc-fork mode, update test case on halley chain #3653

Merged
merged 6 commits into from
Aug 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion vm/starcoin-transactional-test-harness/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ starcoin-storage = {path = "../../storage"}
starcoin-types = {path = "../../types"}
starcoin-vm-runtime = {path = "../../vm/vm-runtime"}
starcoin-vm-types = {path = "../../vm/types"}
stdlib = {path = "../stdlib"}
stdlib = { path = "../stdlib" }

[dev-dependencies]
datatest-stable = "0.1.1"
Expand Down
28 changes: 13 additions & 15 deletions vm/starcoin-transactional-test-harness/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use anyhow::{anyhow, Result};
use futures::executor::block_on;
use starcoin_config::{BuiltinNetworkID, ChainNetwork};
use starcoin_crypto::HashValue;
use starcoin_genesis::Genesis;
Expand All @@ -10,8 +11,8 @@ use starcoin_state_api::{ChainStateReader, ChainStateWriter, StateNodeStore};
use starcoin_statedb::ChainStateDB;
use starcoin_types::write_set::WriteSet;
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use tokio::runtime::Runtime;
use tokio::task::JoinHandle;

use jsonrpc_client_transports::RawClient;
use jsonrpc_core::{IoHandler, Params, Value};
Expand All @@ -25,28 +26,27 @@ use crate::fork_state::{MockChainStateAsyncService, MockStateNodeStore};
use crate::remote_state::RemoteRpcAsyncClient;

pub struct MockServer {
server_handle: JoinHandle<()>,
_server_handle: JoinHandle<()>,
}

impl MockServer {
pub fn create_and_start(
chain_api: MockChainApi,
state_api: impl StateApi,
rt: Arc<Runtime>,
) -> Result<(Self, RawClient)> {
let mut io = IoHandler::new();
io.extend_with(ChainApi::to_delegate(chain_api));
io.extend_with(StateApi::to_delegate(state_api));

let (client, server) = local::connect::<RawClient, _, _>(io);
let server_handle = rt.spawn(async move { server.await.unwrap() });
Ok((Self { server_handle }, client))
}
}
let server_handle = thread::spawn(move || block_on(server).unwrap());

impl Drop for MockServer {
fn drop(&mut self) {
self.server_handle.abort();
Ok((
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

主要是这里导致卡住?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

之前用 tokio Runtime::spawn 启动 local server 的 server 端,然后 server 内部会用 futures::block_on 去执行 remote Client 的请求, 这样的用法会卡住。

server 内部用 Runtime::handle::block_on 去执行 remote Client 的请求,就会出现 nested Runtime 的问题。

现在是改成用 thread::spawn 启动 local server, server 内部用 Runtime::handle::block_on 其执行请求,就没问题了。

这里删掉 Drop 是因为启动 local server 的 thread::JoinHandle 没找到中断线程的方法,也就是没管 local server 的停机问题了。mpm 的场景下用应该也没啥问题,如果有解决方案当然更好。

Self {
_server_handle: server_handle,
},
client,
))
}
}

Expand Down Expand Up @@ -91,10 +91,8 @@ impl ForkContext {
rt.block_on(async { RemoteRpcAsyncClient::from_url(rpc, block_number).await })?,
);
let state_api_client = Arc::new(remote_async_client.get_state_client().clone());
let root_hash = rt
.block_on(async { state_api_client.get_state_root().await })
.map_err(|e| anyhow!("{}", e))?;
let data_store = Arc::new(MockStateNodeStore::new(state_api_client)?);
let root_hash = remote_async_client.get_fork_state_root();
let data_store = Arc::new(MockStateNodeStore::new(state_api_client, rt.clone()));
let state_db = ChainStateDB::new(data_store.clone(), Some(root_hash));

let fork_nubmer = remote_async_client.get_fork_block_number();
Expand All @@ -117,7 +115,7 @@ impl ForkContext {
let chain_api = MockChainApi::new(chain.clone());
let state_svc = MockChainStateAsyncService::new(data_store.clone(), state_root.clone());
let state_api = StateRpcImpl::new(state_svc, data_store);
let (server, client) = MockServer::create_and_start(chain_api, state_api, rt.clone())?;
let (server, client) = MockServer::create_and_start(chain_api, state_api)?;

Ok(Self {
chain,
Expand Down
27 changes: 17 additions & 10 deletions vm/starcoin-transactional-test-harness/src/fork_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,29 @@ use starcoin_statedb::ChainStateDB;
use starcoin_storage::state_node::StateStorage;
use starcoin_storage::storage::{CodecKVStore, CodecWriteBatch, StorageInstance};

use futures::executor::block_on;
use starcoin_rpc_api::state::StateApiClient;
use starcoin_state_tree::StateNode;
use starcoin_types::access_path::AccessPath;
use starcoin_types::account_state::AccountState;
use starcoin_types::state_set::AccountStateSet;
use tokio::runtime::Runtime;

pub struct MockStateNodeStore {
local_storage: StateStorage,
remote: Arc<StateApiClient>,
rt: Arc<Runtime>,
}

impl MockStateNodeStore {
pub fn new(remote: Arc<StateApiClient>) -> Result<Self> {
pub fn new(remote: Arc<StateApiClient>, rt: Arc<Runtime>) -> Self {
let storage_instance = StorageInstance::new_cache_instance();
let storage = StateStorage::new(storage_instance);

Ok(Self {
Self {
local_storage: storage,
remote,
})
rt,
}
}
}

Expand All @@ -43,14 +45,19 @@ impl StateNodeStore for MockStateNodeStore {
match self.local_storage.get(*hash)? {
Some(sn) => Ok(Some(sn)),
None => {
let blob =
block_on(async move { self.remote.get_state_node_by_node_hash(*hash).await })
.map(|res| res.map(StateNode))
.map_err(|e| anyhow!("{}", e))?;
let client = self.remote.clone();
let handle = self.rt.handle().clone();
let hash = *hash;
let blob = // thread::spawn(move || {
handle.block_on(client.get_state_node_by_node_hash(hash))
// })
// .join()
// .expect("Thread getting StateNode from remote panicked")
.map(|res| res.map(StateNode))
.map_err(|e| anyhow!("{}", e))?;

// Put result to local storage to accelerate the following getting.
if let Some(node) = blob.clone() {
self.put(*hash, node)?;
self.put(hash, node)?;
};
Ok(blob)
}
Expand Down
1 change: 1 addition & 0 deletions vm/starcoin-transactional-test-harness/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,7 @@ impl<'a> StarcoinTestAdapter<'a> {
}
}
}

fn handle_new_block(
&mut self,
author: Option<RawAddress>,
Expand Down
23 changes: 5 additions & 18 deletions vm/starcoin-transactional-test-harness/src/remote_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,14 @@
// SPDX-License-Identifier: Apache-2.0

use anyhow::{anyhow, Result};
use jsonrpc_client_transports::{RawClient, RpcChannel};
use jsonrpc_client_transports::RpcChannel;
use move_binary_format::errors::VMError;
use move_core_types::resolver::{ModuleResolver, ResourceResolver};
use serde_json::Value;
use starcoin_crypto::HashValue;

use starcoin_rpc_api::chain::ChainApiClient;
use starcoin_rpc_api::state::StateApiClient;
use starcoin_rpc_api::types::{BlockView, StateWithProofView};
use starcoin_rpc_api::Params;
use starcoin_state_api::ChainStateWriter;
use starcoin_types::access_path::{AccessPath, DataPath};
use starcoin_types::account_address::AccountAddress;
Expand Down Expand Up @@ -190,7 +188,6 @@ where
pub struct RemoteRpcAsyncClient {
state_client: StateApiClient,
chain_client: ChainApiClient,
raw_client: RawClient,
state_root: HashValue,
fork_number: u64,
}
Expand Down Expand Up @@ -219,11 +216,9 @@ impl RemoteRpcAsyncClient {
}
};
let state_client: starcoin_rpc_api::state::StateApiClient = rpc_channel.clone().into();
let raw_client = rpc_channel.clone().into();
Ok(Self {
state_client,
chain_client,
raw_client,
state_root,
fork_number,
})
Expand Down Expand Up @@ -281,13 +276,6 @@ impl RemoteRpcAsyncClient {
Ok(state_with_proof.state.map(|v| v.0))
}

pub async fn call_api(&self, method: &str, params: Params) -> Result<Value> {
self.raw_client
.call_method(method, params)
.await
.map_err(|e| anyhow!(format!("{}", e)))
}

pub fn get_chain_client(&self) -> &ChainApiClient {
&self.chain_client
}
Expand All @@ -299,6 +287,10 @@ impl RemoteRpcAsyncClient {
pub fn get_fork_block_number(&self) -> u64 {
self.fork_number
}

pub fn get_fork_state_root(&self) -> HashValue {
self.state_root
}
}

#[derive(Clone)]
Expand Down Expand Up @@ -337,11 +329,6 @@ impl RemoteViewer {
let handle = self.rt.handle().clone();
handle.block_on(self.svc.get_modules_async(addr))
}

pub fn call_api(&self, method: &str, params: Params) -> Result<Value> {
let handle = self.rt.handle().clone();
handle.block_on(self.svc.call_api(method, params))
}
}

impl ModuleResolver for RemoteViewer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@

//# run --signers creator --args {{$.call-api[0].header.number}}u64 --args {{$.call-api[0].header.block_hash}}
script{
use StarcoinFramework::Vector;
use StarcoinFramework::Vector;
fun main(_sender: signer, block_number: u64, block_hash: vector<u8>){
assert!(block_number == 1, 1000);
assert!(Vector::length(&block_hash) == 32, 1001);
}
assert!(block_number == 1, 1000);
assert!(Vector::length(&block_hash) == 32, 1001);
}
}