Skip to content

Commit

Permalink
Propagate jobs into JobExecutor::run_jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
jedel1043 committed Jan 14, 2025
1 parent a7c10d3 commit 5dd1972
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 33 deletions.
6 changes: 3 additions & 3 deletions cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use boa_engine::{
optimizer::OptimizerOptions,
script::Script,
vm::flowgraph::{Direction, Graph},
Context, JsError, Source,
Context, JsError, JsResult, Source,
};
use boa_parser::source::ReadChar;
use clap::{Parser, ValueEnum, ValueHint};
Expand Down Expand Up @@ -467,10 +467,10 @@ impl JobExecutor for Executor {
}
}

fn run_jobs(&self, context: &mut Context) {
fn run_jobs(&self, context: &mut Context) -> JsResult<()> {
loop {
if self.promise_jobs.borrow().is_empty() && self.async_jobs.borrow().is_empty() {
return;
return Ok(());
}

let jobs = std::mem::take(&mut *self.promise_jobs.borrow_mut());
Expand Down
34 changes: 19 additions & 15 deletions core/engine/src/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ pub trait JobExecutor {
fn enqueue_job(&self, job: Job, context: &mut Context);

/// Runs all jobs in the executor.
fn run_jobs(&self, context: &mut Context);
fn run_jobs(&self, context: &mut Context) -> JsResult<()>;

/// Asynchronously runs all jobs in the executor.
///
Expand All @@ -395,7 +395,7 @@ pub trait JobExecutor {
fn run_jobs_async<'a, 'b, 'fut>(
&'a self,
context: &'b RefCell<&mut Context>,
) -> Pin<Box<dyn Future<Output = ()> + 'fut>>
) -> Pin<Box<dyn Future<Output = JsResult<()>> + 'fut>>
where
'a: 'fut,
'b: 'fut,
Expand Down Expand Up @@ -427,7 +427,9 @@ pub struct IdleJobExecutor;
impl JobExecutor for IdleJobExecutor {
fn enqueue_job(&self, _: Job, _: &mut Context) {}

fn run_jobs(&self, _: &mut Context) {}
fn run_jobs(&self, _: &mut Context) -> JsResult<()> {
Ok(())
}
}

/// A simple FIFO executor that bails on the first error.
Expand All @@ -438,7 +440,7 @@ impl JobExecutor for IdleJobExecutor {
/// To disable running promise jobs on the engine, see [`IdleJobExecutor`].
#[derive(Default)]
pub struct SimpleJobExecutor {
jobs: RefCell<VecDeque<PromiseJob>>,
promise_jobs: RefCell<VecDeque<PromiseJob>>,
async_jobs: RefCell<VecDeque<NativeAsyncJob>>,
}

Expand All @@ -459,36 +461,38 @@ impl SimpleJobExecutor {
impl JobExecutor for SimpleJobExecutor {
fn enqueue_job(&self, job: Job, _: &mut Context) {
match job {
Job::PromiseJob(p) => self.jobs.borrow_mut().push_back(p),
Job::PromiseJob(p) => self.promise_jobs.borrow_mut().push_back(p),
Job::AsyncJob(a) => self.async_jobs.borrow_mut().push_back(a),
}
}

fn run_jobs(&self, context: &mut Context) {
fn run_jobs(&self, context: &mut Context) -> JsResult<()> {
let context = RefCell::new(context);
loop {
let mut next_job = self.async_jobs.borrow_mut().pop_front();
while let Some(job) = next_job {
if pollster::block_on(job.call(&context)).is_err() {
if let Err(err) = pollster::block_on(job.call(&context)) {
self.async_jobs.borrow_mut().clear();
return;
self.promise_jobs.borrow_mut().clear();
return Err(err);
};
next_job = self.async_jobs.borrow_mut().pop_front();
}

// Yeah, I have no idea why Rust extends the lifetime of a `RefCell` that should be immediately
// dropped after calling `pop_front`.
let mut next_job = self.jobs.borrow_mut().pop_front();
let mut next_job = self.promise_jobs.borrow_mut().pop_front();
while let Some(job) = next_job {
if job.call(&mut context.borrow_mut()).is_err() {
self.jobs.borrow_mut().clear();
return;
if let Err(err) = job.call(&mut context.borrow_mut()) {
self.async_jobs.borrow_mut().clear();
self.promise_jobs.borrow_mut().clear();
return Err(err);
};
next_job = self.jobs.borrow_mut().pop_front();
next_job = self.promise_jobs.borrow_mut().pop_front();
}

if self.async_jobs.borrow().is_empty() && self.jobs.borrow().is_empty() {
return;
if self.async_jobs.borrow().is_empty() && self.promise_jobs.borrow().is_empty() {
return Ok(());
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions examples/src/bin/module_fetch_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,23 +204,23 @@ impl JobExecutor for Queue {
}

// While the sync flavor of `run_jobs` will block the current thread until all the jobs have finished...
fn run_jobs(&self, context: &mut Context) {
smol::block_on(smol::LocalExecutor::new().run(self.run_jobs_async(&RefCell::new(context))));
fn run_jobs(&self, context: &mut Context) -> JsResult<()> {
smol::block_on(smol::LocalExecutor::new().run(self.run_jobs_async(&RefCell::new(context))))
}

// ...the async flavor won't, which allows concurrent execution with external async tasks.
fn run_jobs_async<'a, 'b, 'fut>(
&'a self,
context: &'b RefCell<&mut Context>,
) -> Pin<Box<dyn Future<Output = ()> + 'fut>>
) -> Pin<Box<dyn Future<Output = JsResult<()>> + 'fut>>
where
'a: 'fut,
'b: 'fut,
{
Box::pin(async move {
// Early return in case there were no jobs scheduled.
if self.promise_jobs.borrow().is_empty() && self.async_jobs.borrow().is_empty() {
return;
return Ok(());
}
let mut group = FutureGroup::new();
loop {
Expand All @@ -231,7 +231,7 @@ impl JobExecutor for Queue {
if self.promise_jobs.borrow().is_empty() {
let Some(result) = group.next().await else {
// Both queues are empty. We can exit.
return;
return Ok(());
};

if let Err(err) = result {
Expand Down
10 changes: 5 additions & 5 deletions examples/src/bin/smol_event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,23 +66,23 @@ impl JobExecutor for Queue {
}

// While the sync flavor of `run_jobs` will block the current thread until all the jobs have finished...
fn run_jobs(&self, context: &mut Context) {
smol::block_on(smol::LocalExecutor::new().run(self.run_jobs_async(&RefCell::new(context))));
fn run_jobs(&self, context: &mut Context) -> JsResult<()> {
smol::block_on(smol::LocalExecutor::new().run(self.run_jobs_async(&RefCell::new(context))))
}

// ...the async flavor won't, which allows concurrent execution with external async tasks.
fn run_jobs_async<'a, 'b, 'fut>(
&'a self,
context: &'b RefCell<&mut Context>,
) -> Pin<Box<dyn Future<Output = ()> + 'fut>>
) -> Pin<Box<dyn Future<Output = JsResult<()>> + 'fut>>
where
'a: 'fut,
'b: 'fut,
{
Box::pin(async move {
// Early return in case there were no jobs scheduled.
if self.promise_jobs.borrow().is_empty() && self.async_jobs.borrow().is_empty() {
return;
return Ok(());
}
let mut group = FutureGroup::new();
loop {
Expand All @@ -93,7 +93,7 @@ impl JobExecutor for Queue {
if self.promise_jobs.borrow().is_empty() {
let Some(result) = group.next().await else {
// Both queues are empty. We can exit.
return;
return Ok(());
};

if let Err(err) = result {
Expand Down
10 changes: 5 additions & 5 deletions examples/src/bin/tokio_event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,28 +66,28 @@ impl JobExecutor for Queue {
}

// While the sync flavor of `run_jobs` will block the current thread until all the jobs have finished...
fn run_jobs(&self, context: &mut Context) {
fn run_jobs(&self, context: &mut Context) -> JsResult<()> {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_time()
.build()
.unwrap();

task::LocalSet::default().block_on(&runtime, self.run_jobs_async(&RefCell::new(context)));
task::LocalSet::default().block_on(&runtime, self.run_jobs_async(&RefCell::new(context)))
}

// ...the async flavor won't, which allows concurrent execution with external async tasks.
fn run_jobs_async<'a, 'b, 'fut>(
&'a self,
context: &'b RefCell<&mut Context>,
) -> Pin<Box<dyn Future<Output = ()> + 'fut>>
) -> Pin<Box<dyn Future<Output = JsResult<()>> + 'fut>>
where
'a: 'fut,
'b: 'fut,
{
Box::pin(async move {
// Early return in case there were no jobs scheduled.
if self.promise_jobs.borrow().is_empty() && self.async_jobs.borrow().is_empty() {
return;
return Ok(());
}
let mut group = FutureGroup::new();
loop {
Expand All @@ -98,7 +98,7 @@ impl JobExecutor for Queue {
if self.promise_jobs.borrow().is_empty() {
let Some(result) = group.next().await else {
// Both queues are empty. We can exit.
return;
return Ok(());
};

if let Err(err) = result {
Expand Down

0 comments on commit 5dd1972

Please sign in to comment.