runtime: Refactor block_on to use local runtime
leoyvens committed Jan 13, 2020
1 parent 66b3306 commit 39f35c4
Showing 9 changed files with 479 additions and 532 deletions.
675 changes: 352 additions & 323 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion chain/ethereum/tests/network_indexer.rs
@@ -83,7 +83,7 @@ fn run_network_indexer(
.map_err(|_| ())
.forward(event_sink.sink_map_err(|_| ()))
.map(|_| ())
.timeout(timeout)
.timeout(timeout),
);

future::ok((chains, event_stream.collect()))
2 changes: 1 addition & 1 deletion core/src/subgraph/instance_manager.rs
@@ -236,7 +236,7 @@ impl SubgraphInstanceManager {
// Subgraph instance shutdown senders
let instances: SharedInstanceKeepAliveMap = Default::default();

graph::spawn(receiver.compat().try_for_each(move |event| {
graph::spawn_blocking(receiver.compat().try_for_each(move |event| {
use self::SubgraphAssignmentProviderEvent::*;

match event {
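Because the refactored `block_on` helper (see `runtime/wasm/src/host_exports.rs` below) now blocks the thread that calls it, the assignment-event handler moves from `graph::spawn` to `graph::spawn_blocking`. The sketch below illustrates the general spawn-versus-blocking distinction using plain tokio 0.2 calls; it is an assumption about the pattern, not the implementation of the `graph::spawn*` helpers, which this diff does not show.

    // Sketch only, assuming tokio 0.2 (the runtime generation this commit
    // targets); not the implementation of graph::spawn / graph::spawn_blocking.
    use std::time::Duration;

    #[tokio::main]
    async fn main() {
        // Purely async work can run on the core executor.
        let async_task = tokio::spawn(async {
            tokio::time::delay_for(Duration::from_millis(10)).await;
        });

        // Work that may block its thread (as a handler calling the refactored
        // block_on can) belongs on the blocking pool, so it cannot starve the
        // executor's worker threads.
        let blocking_task = tokio::task::spawn_blocking(|| {
            std::thread::sleep(Duration::from_millis(10));
        });

        async_task.await.expect("async task panicked");
        blocking_task.await.expect("blocking task panicked");
    }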
4 changes: 1 addition & 3 deletions graph/src/log/elastic.rs
@@ -6,8 +6,7 @@ use std::sync::{Arc, Mutex};
use std::time::Duration;

use chrono::prelude::{SecondsFormat, Utc};
use futures::Future;
use futures03::{FutureExt, TryFutureExt};
use futures03::TryFutureExt;
use reqwest;
use reqwest::Client;
use serde::ser::Serializer as SerdeSerializer;
@@ -183,7 +182,6 @@ impl ElasticDrain {
}

fn periodically_flush_logs(&self) {
use futures03::compat::Future01CompatExt;
use futures03::stream::StreamExt;

let flush_logger = self.error_logger.clone();
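The removed imports here (`futures::Future`, `FutureExt`, `Future01CompatExt`) suggest that the Elasticsearch drain's periodic flush no longer passes through the futures 0.1 compat layer and instead runs directly on the runtime. A generic sketch of such a periodic task, assuming tokio 0.2; `flush_logs` is a hypothetical stand-in, not the drain's actual method:

    // Sketch of a periodic background task on tokio 0.2 with no futures-0.1
    // compat layer. flush_logs is a hypothetical placeholder for the drain's
    // flush step.
    use std::time::Duration;

    fn flush_logs() {
        // ... send buffered log documents to Elasticsearch ...
    }

    fn start_flusher(flush_interval: Duration) {
        tokio::spawn(async move {
            let mut ticker = tokio::time::interval(flush_interval);
            loop {
                ticker.tick().await;
                flush_logs();
            }
        });
    }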
8 changes: 7 additions & 1 deletion runtime/wasm/src/host.rs
@@ -79,7 +79,13 @@ where
subgraph_id: SubgraphDeploymentId,
metrics: Arc<HostMetrics>,
) -> Result<Sender<Self::Req>, Error> {
crate::mapping::spawn_module(parsed_module, logger, subgraph_id, metrics)
crate::mapping::spawn_module(
parsed_module,
logger,
subgraph_id,
metrics,
tokio::runtime::Handle::current(),
)
}

fn build(
19 changes: 4 additions & 15 deletions runtime/wasm/src/host_exports.rs
@@ -203,7 +203,6 @@ impl HostExports {
/// Returns `Ok(None)` if the call was reverted.
pub(crate) fn ethereum_call(
&self,
task_sink: &mut impl Sink<SinkItem = Box<dyn Future<Item = (), Error = ()> + Send>>,
logger: &Logger,
block: &LightEthereumBlock,
unresolved_call: UnresolvedContractCall,
@@ -349,7 +348,6 @@
pub(crate) fn ipfs_cat(
&self,
logger: &Logger,
task_sink: &mut impl Sink<SinkItem = Box<dyn Future<Item = (), Error = ()> + Send>>,
link: String,
) -> Result<Vec<u8>, HostExportError<impl ExportError>> {
block_on(
@@ -366,28 +364,20 @@
// which is identical to `module` when it was first started. The signature
// of the callback must be `callback(JSONValue, Value)`, and the `userData`
// parameter is passed to the callback without any changes
pub(crate) fn ipfs_map<U>(
pub(crate) fn ipfs_map(
&self,
module: &WasmiModule<U>,
module: &WasmiModule,
link: String,
callback: &str,
user_data: store::Value,
flags: Vec<String>,
) -> Result<Vec<BlockState>, HostExportError<impl ExportError>>
where
U: Sink<SinkItem = Box<dyn Future<Item = (), Error = ()> + Send>>
+ Clone
+ Send
+ Sync
+ 'static,
{
) -> Result<Vec<BlockState>, HostExportError<impl ExportError>> {
const JSON_FLAG: &str = "json";
if !flags.contains(&JSON_FLAG.to_string()) {
return Err(HostExportError(format!("Flags must contain 'json'")));
}

let host_metrics = module.host_metrics.clone();
let task_sink = module.task_sink.clone();
let valid_module = module.valid_module.clone();
let ctx = module.ctx.clone();
let callback = callback.to_owned();
@@ -409,7 +399,6 @@ impl HostExports {
let module = WasmiModule::from_valid_module_with_ctx(
valid_module.clone(),
ctx.clone(),
task_sink.clone(),
host_metrics.clone(),
)?;
let result =
@@ -662,5 +651,5 @@ fn test_string_to_h160_with_0x() {
fn block_on<I: Send + 'static, ER: Send + 'static>(
future: impl Future<Item = I, Error = ER> + Send + 'static,
) -> Result<I, ER> {
futures03::executor::block_on(tokio::spawn(future.compat())).unwrap()
tokio::spawn(future.compat()).compat().wait().unwrap()
}
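The new single-line `block_on` above chains three adapters: `Future01CompatExt::compat` turns the futures 0.1 future into a std future, `tokio::spawn` runs it on the runtime this thread has entered, and `TryFutureExt::compat` plus the 0.1 `wait()` park the dedicated wasmi thread on the resulting `JoinHandle`. The same helper with its imports spelled out; the import paths are inferred (not shown in this diff) from the crate names used elsewhere in the repository:

    // block_on as refactored by this commit, with the imports it relies on made
    // explicit. The imports are inferred assumptions (futures 0.1 as `futures`,
    // futures 0.3 as `futures03`, tokio 0.2) and are not part of the diff.
    use futures::Future;                       // futures 0.1: provides wait()
    use futures03::compat::Future01CompatExt;  // 0.1 future -> std future
    use futures03::TryFutureExt;               // std TryFuture -> 0.1 future

    fn block_on<I: Send + 'static, ER: Send + 'static>(
        future: impl Future<Item = I, Error = ER> + Send + 'static,
    ) -> Result<I, ER> {
        // Spawn on the runtime whose Handle this thread has entered, then block
        // this (non-runtime) thread until the spawned task completes.
        tokio::spawn(future.compat()).compat().wait().unwrap()
    }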
137 changes: 64 additions & 73 deletions runtime/wasm/src/mapping.rs
@@ -14,6 +14,7 @@ pub fn spawn_module(
logger: Logger,
subgraph_id: SubgraphDeploymentId,
host_metrics: Arc<HostMetrics>,
runtime: tokio::runtime::Handle,
) -> Result<mpsc::Sender<MappingRequest>, Error> {
let valid_module = Arc::new(ValidModule::new(parsed_module)?);

@@ -23,86 +24,76 @@
// wasmi modules are not `Send` therefore they cannot be scheduled by
// the regular tokio executor, so we create a dedicated thread.
//
// This thread can spawn tasks on the runtime by sending them to
// `task_receiver`.
let (task_sender, task_receiver) =
mpsc::channel::<Box<dyn Future<Item = (), Error = ()> + Send>>(100);
graph::spawn(task_receiver.compat().try_for_each(|f| {
graph::spawn(f.compat());
futures03::future::ok(())
}));

// Spawn a dedicated thread for the runtime.
//
// In case of failure, this thread may panic or simply terminate,
// dropping the `mapping_request_receiver` which ultimately causes the
// subgraph to fail the next time it tries to handle an event.
let conf =
thread::Builder::new().name(format!("mapping-{}-{}", &subgraph_id, uuid::Uuid::new_v4()));
conf.spawn(move || {
// Pass incoming triggers to the WASM module and return entity changes;
// Stop when canceled because all RuntimeHosts and their senders were dropped.
match mapping_request_receiver
.map_err(|()| unreachable!())
.for_each(move |request| -> Result<(), Error> {
let MappingRequest {
ctx,
trigger,
result_sender,
} = request;

// Start the WASMI module runtime.
let section = host_metrics.stopwatch.start_section("module_init");
let module = WasmiModule::from_valid_module_with_ctx(
valid_module.clone(),
ctx,
task_sender.clone(),
host_metrics.clone(),
)?;
section.end();

let section = host_metrics.stopwatch.start_section("run_handler");
let result = match trigger {
MappingTrigger::Log {
transaction,
log,
params,
handler,
} => module.handle_ethereum_log(
handler.handler.as_str(),
transaction,
log,
params,
),
MappingTrigger::Call {
transaction,
call,
inputs,
outputs,
handler,
} => module.handle_ethereum_call(
handler.handler.as_str(),
transaction,
call,
inputs,
outputs,
),
MappingTrigger::Block { handler } => {
module.handle_ethereum_block(handler.handler.as_str())
}
};
section.end();

result_sender
.send((result, future::ok(Instant::now())))
.map_err(|_| err_msg("WASM module result receiver dropped."))
})
.wait()
{
Ok(()) => debug!(logger, "Subgraph stopped, WASM runtime thread terminated"),
Err(e) => debug!(logger, "WASM runtime thread terminated abnormally";
runtime.enter(|| {
// Pass incoming triggers to the WASM module and return entity changes;
// Stop when canceled because all RuntimeHosts and their senders were dropped.
match mapping_request_receiver
.map_err(|()| unreachable!())
.for_each(move |request| -> Result<(), Error> {
let MappingRequest {
ctx,
trigger,
result_sender,
} = request;

// Start the WASMI module runtime.
let section = host_metrics.stopwatch.start_section("module_init");
let module = WasmiModule::from_valid_module_with_ctx(
valid_module.clone(),
ctx,
host_metrics.clone(),
)?;
section.end();

let section = host_metrics.stopwatch.start_section("run_handler");
let result = match trigger {
MappingTrigger::Log {
transaction,
log,
params,
handler,
} => module.handle_ethereum_log(
handler.handler.as_str(),
transaction,
log,
params,
),
MappingTrigger::Call {
transaction,
call,
inputs,
outputs,
handler,
} => module.handle_ethereum_call(
handler.handler.as_str(),
transaction,
call,
inputs,
outputs,
),
MappingTrigger::Block { handler } => {
module.handle_ethereum_block(handler.handler.as_str())
}
};
section.end();

result_sender
.send((result, future::ok(Instant::now())))
.map_err(|_| err_msg("WASM module result receiver dropped."))
})
.wait()
{
Ok(()) => debug!(logger, "Subgraph stopped, WASM runtime thread terminated"),
Err(e) => debug!(logger, "WASM runtime thread terminated abnormally";
"error" => e.to_string()),
}
}
})
})
.map(|_| ())
.map_err(|e| format_err!("Spawning WASM runtime thread failed: {}", e))?;
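With the task channel gone, the dedicated wasmi thread now receives the runtime `Handle` captured by `Handle::current()` in `runtime/wasm/src/host.rs` above and calls `enter` on it, so that `tokio::spawn` (as used by `block_on`) targets the shared runtime even though the thread is not one of its workers. A minimal sketch of that pattern, assuming the tokio 0.2 API in which `Handle::enter` takes a closure; the worker body is illustrative, not the mapping loop itself:

    // Sketch of handing a runtime Handle to a dedicated (non-runtime) thread,
    // assuming tokio 0.2 where Handle::enter takes a closure. Rc stands in for
    // a !Send value such as a wasmi ModuleRef, which cannot be moved into a
    // regular tokio task.
    use std::rc::Rc;
    use std::thread;

    fn spawn_worker(runtime: tokio::runtime::Handle) -> thread::JoinHandle<()> {
        thread::spawn(move || {
            runtime.enter(|| {
                // A !Send value is fine on a plain OS thread.
                let not_send = Rc::new(42);

                // Inside enter(), tokio::spawn schedules onto the shared
                // runtime instead of panicking for lack of a runtime context.
                tokio::spawn(async { /* async work on the runtime */ });

                println!("worker finished, value = {}", not_send);
            });
        })
    }

    #[tokio::main]
    async fn main() {
        // Handle::current() must be called from within the runtime, e.g. here.
        let handle = tokio::runtime::Handle::current();
        let worker = spawn_worker(handle);
        worker.join().expect("worker thread panicked");
    }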
56 changes: 10 additions & 46 deletions runtime/wasm/src/module/mod.rs
@@ -92,13 +92,12 @@ fn format_wasmi_error(e: Error) -> String {
}

/// A WASM module based on wasmi that powers a subgraph runtime.
pub(crate) struct WasmiModule<U> {
pub(crate) struct WasmiModule {
pub module: ModuleRef,
memory: MemoryRef,

pub ctx: MappingContext,
pub(crate) valid_module: Arc<ValidModule>,
pub(crate) task_sink: U,
pub(crate) host_metrics: Arc<HostMetrics>,

// Time when the current handler began processing.
@@ -118,19 +117,11 @@ pub(crate) struct WasmiModule<U> {
timeout_checkpoint_count: u64,
}

impl<U> WasmiModule<U>
where
U: Sink<SinkItem = Box<dyn Future<Item = (), Error = ()> + Send>>
+ Clone
+ Send
+ Sync
+ 'static,
{
impl WasmiModule {
/// Creates a new wasmi module
pub fn from_valid_module_with_ctx(
valid_module: Arc<ValidModule>,
ctx: MappingContext,
task_sink: U,
host_metrics: Arc<HostMetrics>,
) -> Result<Self, FailureError> {
// Build import resolver
@@ -158,7 +149,6 @@ where
memory,
ctx,
valid_module: valid_module.clone(),
task_sink,
host_metrics,
start_time: Instant::now(),
running_start: true,
@@ -323,14 +313,7 @@
}
}

impl<U> AscHeap for WasmiModule<U>
where
U: Sink<SinkItem = Box<dyn Future<Item = (), Error = ()> + Send>>
+ Clone
+ Send
+ Sync
+ 'static,
{
impl AscHeap for WasmiModule {
fn raw_new(&mut self, bytes: &[u8]) -> Result<u32, Error> {
// We request large chunks from the AssemblyScript allocator to use as arenas that we
// manage directly.
@@ -370,14 +353,7 @@
impl<E> HostError for HostExportError<E> where E: fmt::Debug + fmt::Display + Send + Sync + 'static {}

// Implementation of externals.
impl<U> WasmiModule<U>
where
U: Sink<SinkItem = Box<dyn Future<Item = (), Error = ()> + Send>>
+ Clone
+ Send
+ Sync
+ 'static,
{
impl WasmiModule {
fn gas(&mut self) -> Result<Option<RuntimeValue>, Trap> {
// This function is called so often that the overhead of calling `Instant::now()` every
// time would be significant, so we spread out the checks.
@@ -490,12 +466,10 @@
call_ptr: AscPtr<AscUnresolvedContractCall>,
) -> Result<Option<RuntimeValue>, Trap> {
let call = self.asc_get(call_ptr);
let result = self.ctx.host_exports.ethereum_call(
&mut self.task_sink,
&mut self.ctx.logger,
&self.ctx.block,
call,
)?;
let result =
self.ctx
.host_exports
.ethereum_call(&mut self.ctx.logger, &self.ctx.block, call)?;
Ok(Some(match result {
Some(tokens) => RuntimeValue::from(self.asc_new(tokens.as_slice())),
None => RuntimeValue::from(0),
@@ -581,10 +555,7 @@
/// function ipfs.cat(link: String): Bytes
fn ipfs_cat(&mut self, link_ptr: AscPtr<AscString>) -> Result<Option<RuntimeValue>, Trap> {
let link = self.asc_get(link_ptr);
let ipfs_res = self
.ctx
.host_exports
.ipfs_cat(&self.ctx.logger, &mut self.task_sink, link);
let ipfs_res = self.ctx.host_exports.ipfs_cat(&self.ctx.logger, link);
match ipfs_res {
Ok(bytes) => {
let bytes_obj: AscPtr<Uint8Array> = self.asc_new(&*bytes);
@@ -952,14 +923,7 @@
}
}

impl<U> Externals for WasmiModule<U>
where
U: Sink<SinkItem = Box<dyn Future<Item = (), Error = ()> + Send>>
+ Clone
+ Send
+ Sync
+ 'static,
{
impl Externals for WasmiModule {
fn invoke_index(
&mut self,
index: usize,