Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve host model loading (on-demand background thread) #1477

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions crates/fj-host/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ categories.workspace = true
cargo_metadata = "0.15.2"
crossbeam-channel = "0.5.6"
fj.workspace = true
fj-interop.workspace = true
fj-operations.workspace = true
libloading = "0.7.4"
notify = "5.0.0"
thiserror = "1.0.35"
tracing = "0.1.37"
winit = "0.27.5"
115 changes: 61 additions & 54 deletions crates/fj-host/src/evaluator.rs
Original file line number Diff line number Diff line change
@@ -1,80 +1,87 @@
use std::thread;
use std::thread::{self, JoinHandle};

use crossbeam_channel::{Receiver, SendError, Sender};
use crossbeam_channel::Sender;
use fj_interop::processed_shape::ProcessedShape;
use fj_operations::shape_processor::ShapeProcessor;
use winit::event_loop::{EventLoopClosed, EventLoopProxy};

use crate::{Error, Evaluation, Model};
use crate::{Error, Model};

/// Evaluates a model in a background thread
pub struct Evaluator {
trigger_tx: Sender<TriggerEvaluation>,
event_rx: Receiver<ModelEvent>,
}
/// Start a background thread for evaluating a model
pub fn spawn_evaluator(
model: Model,
shape_processor: ShapeProcessor,
event_loop_proxy: EventLoopProxy<ModelEvent>,
) -> (JoinHandle<()>, Sender<TriggerEvaluation>) {
let (trigger_tx, trigger_rx) = crossbeam_channel::bounded(0);

impl Evaluator {
/// Create an `Evaluator` from a model
pub fn from_model(model: Model) -> Self {
let (event_tx, event_rx) = crossbeam_channel::bounded(0);
let (trigger_tx, trigger_rx) = crossbeam_channel::bounded(0);
let join_handle = thread::Builder::new()
.name("evaluator".to_string())
.spawn(move || {
if let Err(EventLoopClosed(..)) =
event_loop_proxy.send_event(ModelEvent::StartWatching)
{
return;
}
evaluate_model(&model, &shape_processor, &event_loop_proxy);

thread::spawn(move || {
while matches!(trigger_rx.recv(), Ok(TriggerEvaluation)) {
if let Err(SendError(_)) =
event_tx.send(ModelEvent::ChangeDetected)
if let Err(EventLoopClosed(..)) =
event_loop_proxy.send_event(ModelEvent::ChangeDetected)
{
break;
return;
}

let evaluation = match model.evaluate() {
Ok(evaluation) => evaluation,
Err(err) => {
if let Err(SendError(_)) =
event_tx.send(ModelEvent::Error(err))
{
break;
}
continue;
}
};

if let Err(SendError(_)) =
event_tx.send(ModelEvent::Evaluation(evaluation))
{
break;
};
evaluate_model(&model, &shape_processor, &event_loop_proxy);
}
})
.expect("Cannot create thread in evaluator");

// The channel is disconnected, which means this instance of
// `Evaluator`, as well as all `Sender`s created from it, have been
// dropped. We're done.
});
(join_handle, trigger_tx)
}

Self {
trigger_tx,
event_rx,
fn evaluate_model(
model: &Model,
shape_processor: &ShapeProcessor,
event_loop_proxy: &EventLoopProxy<ModelEvent>,
) {
let evaluation = match model.evaluate() {
Ok(evaluation) => evaluation,
Err(err) => {
event_loop_proxy
.send_event(ModelEvent::Error(err))
.expect("Event loop proxy closed");
return;
}
}
};

event_loop_proxy
.send_event(ModelEvent::Evaluated)
.expect("Event loop proxy closed");

/// Access a channel for triggering evaluations
pub fn trigger(&self) -> Sender<TriggerEvaluation> {
self.trigger_tx.clone()
}
let shape = shape_processor.process(&evaluation.shape).unwrap();

/// Access a channel for receiving status updates
pub fn events(&self) -> Receiver<ModelEvent> {
self.event_rx.clone()
}
event_loop_proxy
.send_event(ModelEvent::ProcessedShape(shape))
.expect("Event loop proxy closed");
}

/// Command received by [`Evaluator`] through its channel
/// Command received by an evaluator thread through its channel
pub struct TriggerEvaluation;

/// An event emitted by [`Evaluator`]
/// An event emitted by an evaluator thread
#[derive(Debug)]
pub enum ModelEvent {
/// A new model is being watched
StartWatching,

/// A change in the model has been detected
ChangeDetected,

/// The model has been evaluated
Evaluation(Evaluation),
Evaluated,

/// The model has been processed into a `Shape`
ProcessedShape(ProcessedShape),

/// An error
Error(Error),
Expand Down
53 changes: 43 additions & 10 deletions crates/fj-host/src/host.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use crossbeam_channel::Receiver;
use std::thread::JoinHandle;

use crate::{Error, Evaluator, Model, ModelEvent, Watcher};
use fj_operations::shape_processor::ShapeProcessor;
use winit::event_loop::EventLoopProxy;

use crate::{spawn_evaluator, Error, Model, ModelEvent, Watcher};

/// A Fornjot model host
pub struct Host {
evaluator: Evaluator,
evaluator_thread: Option<JoinHandle<()>>,
_watcher: Watcher,
}

Expand All @@ -13,19 +16,49 @@ impl Host {
///
/// This is only useful, if you want to continuously watch the model for
/// changes. If you don't, just keep using `Model`.
pub fn from_model(model: Model) -> Result<Self, Error> {
pub fn new(
model: Model,
shape_processor: ShapeProcessor,
event_loop_proxy: EventLoopProxy<ModelEvent>,
) -> Result<Self, Error> {
let watch_path = model.watch_path();
let evaluator = Evaluator::from_model(model);
let watcher = Watcher::watch_model(watch_path, &evaluator)?;
let (evaluator_thread, trigger_tx) =
spawn_evaluator(model, shape_processor, event_loop_proxy);
let watcher = Watcher::watch_model(watch_path, trigger_tx)?;

Ok(Self {
evaluator,
evaluator_thread: Some(evaluator_thread),
_watcher: watcher,
})
}

/// Access a channel with evaluation events
pub fn events(&self) -> Receiver<ModelEvent> {
self.evaluator.events()
/// Check if the evaluator thread has exited with a panic.
///
/// # Panics
///
/// Panics if the evaluator thread has panicked.
pub fn propagate_panic(&mut self) {
if self.evaluator_thread.is_none() {
unreachable!("Constructor requires host thread")
}
if let Some(evaluator_thread) = &self.evaluator_thread {
// The host thread should not finish while this handle holds the
// `command_tx` channel open, so an exit means the thread panicked.
if evaluator_thread.is_finished() {
let evaluator_thread = self.evaluator_thread.take().unwrap();
match evaluator_thread.join() {
Ok(()) => {
unreachable!(
"Evaluator thread cannot exit until host handle disconnects"
)
}
// The error value has already been reported by the panic
// in the host thread, so just ignore it here.
Err(_) => {
panic!("Evaluator thread panicked")
}
}
}
}
}
}
2 changes: 1 addition & 1 deletion crates/fj-host/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ mod platform;
mod watcher;

pub use self::{
evaluator::{Evaluator, ModelEvent},
evaluator::{spawn_evaluator, ModelEvent},
host::Host,
model::{Error, Evaluation, Model},
parameters::Parameters,
Expand Down
1 change: 1 addition & 0 deletions crates/fj-host/src/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ impl Model {
/// The result of evaluating a model
///
/// See [`Model::evaluate`].
#[derive(Debug)]
pub struct Evaluation {
/// The shape
pub shape: fj::Shape,
Expand Down
27 changes: 5 additions & 22 deletions crates/fj-host/src/watcher.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::{collections::HashSet, ffi::OsStr, path::Path, thread};
use std::{collections::HashSet, ffi::OsStr, path::Path};

use crossbeam_channel::Sender;
use notify::Watcher as _;

use crate::{evaluator::TriggerEvaluation, Error, Evaluator};
use crate::{evaluator::TriggerEvaluation, Error};

/// Watches a model for changes, reloading it continually
pub struct Watcher {
Expand All @@ -13,13 +14,10 @@ impl Watcher {
/// Watch the provided model for changes
pub fn watch_model(
watch_path: impl AsRef<Path>,
evaluator: &Evaluator,
trigger_tx: Sender<TriggerEvaluation>,
) -> Result<Self, Error> {
let watch_path = watch_path.as_ref();

let watch_tx = evaluator.trigger();
let watch_tx_2 = evaluator.trigger();

let mut watcher = notify::recommended_watcher(
move |event: notify::Result<notify::Event>| {
// Unfortunately the `notify` documentation doesn't say when
Expand Down Expand Up @@ -59,7 +57,7 @@ impl Watcher {
// application is being shut down.
//
// Either way, not much we can do about it here.
watch_tx
trigger_tx
.send(TriggerEvaluation)
.expect("Channel is disconnected");
}
Expand All @@ -68,21 +66,6 @@ impl Watcher {

watcher.watch(watch_path, notify::RecursiveMode::Recursive)?;

// To prevent a race condition between the initial load and the start of
// watching, we'll trigger the initial load here, after having started
// watching.
//
// This happens in a separate thread, because the channel is bounded and
// has no buffer.
//
// Will panic, if the receiving end has panicked. Not much we can do
// about that, if it happened.
thread::spawn(move || {
watch_tx_2
.send(TriggerEvaluation)
.expect("Channel is disconnected");
});

Ok(Self {
_watcher: Box::new(watcher),
})
Expand Down
1 change: 1 addition & 0 deletions crates/fj-operations/src/shape_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use fj_math::Scalar;
use crate::Shape as _;

/// Processes an [`fj::Shape`] into a [`ProcessedShape`]
#[derive(Clone)]
pub struct ShapeProcessor {
/// The tolerance value used for creating the triangle mesh
pub tolerance: Option<Tolerance>,
Expand Down
Loading