Skip to content

Commit

Permalink
[Refactor] update the argument type of run_func_with_timeout and `r…
Browse files Browse the repository at this point in the history
…un_func_async_with_timeout` (WasmEdge#76)

* [Feat] run func timeout use std::time::Duration

Signed-off-by: csh <[email protected]>

* [Doc]: update api docs

Signed-off-by: csh <[email protected]>

---------

Signed-off-by: csh <[email protected]>
(cherry picked from commit 07e84b7)
  • Loading branch information
L-jasmine committed Oct 27, 2023
1 parent 9d07f6c commit b299c09
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 25 deletions.
21 changes: 16 additions & 5 deletions crates/wasmedge-sys/src/async/fiber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ impl<'a> FiberFuture<'a> {
///
/// # Arguments
///
/// * `async_state` - Used to store asynchronous state at run time.
///
/// * `func` - The function to execute.
///
/// # Error
Expand Down Expand Up @@ -106,7 +108,7 @@ pub(crate) struct TimeoutFiberFuture<'a> {
fiber: Fiber<'a, Result<(), ()>, (), Result<(), ()>>,
current_suspend: *mut *const Suspend<Result<(), ()>, (), Result<(), ()>>,
current_poll_cx: *mut *mut Context<'static>,
timeout_sec: u64,
deadline: std::time::SystemTime,
}

impl<'a> TimeoutFiberFuture<'a> {
Expand All @@ -116,15 +118,17 @@ impl<'a> TimeoutFiberFuture<'a> {
///
/// * `func` - The function to execute.
///
/// * `timeout_sec` - The maximum execution time in seconds for the function instance.
/// * `async_state` - Used to store asynchronous state at run time.
///
/// * `deadline` - The deadline the function to be run.
///
/// # Error
///
/// If fail to create the fiber stack or the fiber fail to resume, then an error is returned.
pub(crate) async fn on_fiber<R>(
async_state: &AsyncState,
func: impl FnOnce() -> R + Send,
timeout_sec: u64,
deadline: std::time::SystemTime,
) -> Result<R, ()> {
let mut slot = None;

Expand All @@ -150,7 +154,7 @@ impl<'a> TimeoutFiberFuture<'a> {
fiber,
current_suspend,
current_poll_cx,
timeout_sec,
deadline,
}
};

Expand Down Expand Up @@ -185,8 +189,15 @@ impl<'a> Future for TimeoutFiberFuture<'a> {
if libc::timer_create(libc::CLOCK_REALTIME, &mut sev, &mut timerid) < 0 {
return Poll::Ready(Err(()));
}

let timeout = match self.deadline.duration_since(std::time::SystemTime::now()) {
Ok(timeout) => timeout.max(std::time::Duration::from_millis(100)),
Err(_) => return Poll::Ready(Err(())),
};

let mut value: libc::itimerspec = std::mem::zeroed();
value.it_value.tv_sec = self.timeout_sec as i64;
value.it_value.tv_sec = timeout.as_secs() as _;
value.it_value.tv_nsec = timeout.subsec_nanos() as _;
if libc::timer_settime(timerid, 0, &value, std::ptr::null_mut()) < 0 {
libc::timer_delete(timerid);
return Poll::Ready(Err(()));
Expand Down
20 changes: 14 additions & 6 deletions crates/wasmedge-sys/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ impl Executor {
///
/// * `params` - The arguments to pass to the function.
///
/// * `timeout` - The maximum execution time (in seconds) of the function to be run.
/// * `timeout` - The maximum execution time of the function to be run.
///
/// # Errors
///
Expand All @@ -201,7 +201,7 @@ impl Executor {
&self,
func: &Function,
params: impl IntoIterator<Item = WasmValue>,
timeout: u64,
timeout: std::time::Duration,
) -> WasmEdgeResult<Vec<WasmValue>> {
use wasmedge_types::error;

Expand Down Expand Up @@ -231,7 +231,8 @@ impl Executor {
)));
}
let mut value: libc::itimerspec = std::mem::zeroed();
value.it_value.tv_sec = timeout as i64;
value.it_value.tv_sec = timeout.as_secs() as _;
value.it_value.tv_nsec = timeout.subsec_nanos() as _;
if libc::timer_settime(timerid, 0, &value, std::ptr::null_mut()) < 0 {
libc::timer_delete(timerid);
return Err(Box::new(error::WasmEdgeError::Operation(
Expand Down Expand Up @@ -267,6 +268,8 @@ impl Executor {
///
/// # Arguments
///
/// * `async_state` - Used to store asynchronous state at run time.
///
/// * `func` - The function instance to run.
///
/// * `params` - The arguments to pass to the function.
Expand All @@ -291,11 +294,13 @@ impl Executor {
///
/// # Arguments
///
/// * `async_state` - Used to store asynchronous state at run time.
///
/// * `func` - The function instance to run.
///
/// * `params` - The arguments to pass to the function.
///
/// * `timeout` - The maximum execution time (in seconds) of the function to be run.
/// * `timeout` - The maximum execution time of the function to be run.
///
/// # Errors
///
Expand All @@ -307,10 +312,11 @@ impl Executor {
async_state: &AsyncState,
func: &mut Function,
params: impl IntoIterator<Item = WasmValue> + Send,
timeout: u64,
timeout: std::time::Duration,
) -> WasmEdgeResult<Vec<WasmValue>> {
use wasmedge_types::error;
TimeoutFiberFuture::on_fiber(async_state, || self.call_func(func, params), timeout)
let ldd = std::time::SystemTime::now() + timeout;
TimeoutFiberFuture::on_fiber(async_state, || self.call_func(func, params), ldd)
.await
.map_err(|_| Box::new(error::WasmEdgeError::ExecuteTimeout))?
}
Expand Down Expand Up @@ -357,6 +363,8 @@ impl Executor {
///
/// # Arguments
///
/// * `async_state` - Used to store asynchronous state at run time.
///
/// * `func_ref` - The function reference instance to run.
///
/// * `params` - The arguments to pass to the function.
Expand Down
4 changes: 2 additions & 2 deletions src/async/vm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ impl<'inst, T: ?Sized + Send + AsyncInst> Vm<'inst, T> {
///
/// * `args` - The arguments to be passed to the target wasm function.
///
/// * `timeout` - The maximum execution time (in seconds) of the function to be run.
/// * `timeout` - The maximum execution time of the function to be run.
///
/// # Error
///
Expand All @@ -196,7 +196,7 @@ impl<'inst, T: ?Sized + Send + AsyncInst> Vm<'inst, T> {
mod_name: Option<&str>,
func_name: impl AsRef<str>,
args: impl IntoIterator<Item = WasmValue> + Send,
timeout: u64,
timeout: std::time::Duration,
) -> WasmEdgeResult<Vec<WasmValue>> {
let (mut func, executor) = match mod_name {
Some(mod_name) => {
Expand Down
18 changes: 8 additions & 10 deletions src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ impl Executor {
///
/// * `params` - The arguments to pass to the function.
///
/// * `timeout` - The maximum execution time (in seconds) of the function to be run.
/// * `timeout` - The maximum execution time of the function to be run.
///
/// # Errors
///
Expand All @@ -78,14 +78,10 @@ impl Executor {
&self,
func: &Func,
params: impl IntoIterator<Item = WasmValue>,
timeout: u64,
timeout: std::time::Duration,
) -> WasmEdgeResult<Vec<WasmValue>> {
if timeout > 0 {
self.inner
.call_func_with_timeout(&func.inner, params, timeout)
} else {
self.inner.call_func(&func.inner, params)
}
self.inner
.call_func_with_timeout(&func.inner, params, timeout)
}

/// Asynchronously runs a host function instance and returns the results.
Expand Down Expand Up @@ -116,11 +112,13 @@ impl Executor {
///
/// # Arguments
///
/// * `async_state` - Used to store asynchronous state at run time.
///
/// * `func` - The function instance to run.
///
/// * `params` - The arguments to pass to the function.
///
/// * `timeout` - The maximum execution time (in seconds) of the function to be run.
/// * `timeout` - The maximum execution time of the function to be run.
///
/// # Errors
///
Expand All @@ -132,7 +130,7 @@ impl Executor {
async_state: &AsyncState,
func: &Func,
params: impl IntoIterator<Item = WasmValue> + Send,
timeout: u64,
timeout: std::time::Duration,
) -> WasmEdgeResult<Vec<WasmValue>> {
self.inner
.call_func_async_with_timeout(async_state, &func.inner, params, timeout)
Expand Down
4 changes: 2 additions & 2 deletions src/vm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ impl<'inst, T: ?Sized + SyncInst> Vm<'inst, T> {
///
/// * `args` - The arguments to be passed to the target wasm function.
///
/// * `timeout` - The maximum execution time (in seconds) of the function to be run.
/// * `timeout` - The maximum execution time of the function to be run.
///
/// # Error
///
Expand All @@ -188,7 +188,7 @@ impl<'inst, T: ?Sized + SyncInst> Vm<'inst, T> {
mod_name: Option<&str>,
func_name: impl AsRef<str>,
args: impl IntoIterator<Item = WasmValue>,
timeout: u64,
timeout: std::time::Duration,
) -> WasmEdgeResult<Vec<WasmValue>> {
let (mut func, executor) = match mod_name {
Some(mod_name) => {
Expand Down

0 comments on commit b299c09

Please sign in to comment.