diff --git a/geth-engine/src/process.rs b/geth-engine/src/process.rs index 2a01e07..412ee0e 100644 --- a/geth-engine/src/process.rs +++ b/geth-engine/src/process.rs @@ -25,6 +25,8 @@ mod echo; pub mod grpc; pub mod indexing; mod messages; +#[cfg(test)] +mod panic; pub mod reading; mod resource; #[cfg(test)] @@ -69,6 +71,8 @@ pub enum Proc { Echo, #[cfg(test)] Sink, + #[cfg(test)] + Panic, } enum Topology { @@ -281,6 +285,9 @@ where #[cfg(test)] Proc::Sink => spawn(options, client, proc, sink::run), + + #[cfg(test)] + Proc::Panic => spawn(options, client, proc, panic::run), }; let proc_id = running_proc.id; diff --git a/geth-engine/src/process/messages.rs b/geth-engine/src/process/messages.rs index 68bd184..e29712b 100644 --- a/geth-engine/src/process/messages.rs +++ b/geth-engine/src/process/messages.rs @@ -10,6 +10,13 @@ pub enum Messages { Responses(Responses), } +#[cfg(test)] +impl Messages { + pub fn is_fatal_error(&self) -> bool { + matches!(self, Messages::Responses(Responses::FatalError)) + } +} + impl From for Messages { fn from(req: IndexRequests) -> Self { Messages::Requests(Requests::Index(req)) diff --git a/geth-engine/src/process/panic.rs b/geth-engine/src/process/panic.rs new file mode 100644 index 0000000..50ab98e --- /dev/null +++ b/geth-engine/src/process/panic.rs @@ -0,0 +1,5 @@ +use super::ProcessEnv; + +pub async fn run(_: ProcessEnv) -> eyre::Result<()> { + panic!("this process panic on purpose"); +} diff --git a/geth-engine/src/process/tests/interactions.rs b/geth-engine/src/process/tests/interactions.rs index f1e0ece..7283f00 100644 --- a/geth-engine/src/process/tests/interactions.rs +++ b/geth-engine/src/process/tests/interactions.rs @@ -11,6 +11,7 @@ fn test_catalog() -> Catalog { Catalog::builder() .register(Proc::Echo) .register(Proc::Sink) + .register(Proc::Panic) .build() } @@ -90,3 +91,31 @@ async fn test_shutdown_reported_properly() -> eyre::Result<()> { Ok(()) } + +#[tokio::test] +async fn test_request_returns_when_proc_panicked() -> eyre::Result<()> { + let manager = start_process_manager_with_catalog(Options::in_mem(), test_catalog()).await?; + let proc_id = manager.wait_for(Proc::Panic).await?; + + let resp = manager + .request(proc_id, TestSinkResponses::Stream(42).into()) + .await?; + + assert!(resp.payload.is_fatal_error()); + + Ok(()) +} + +#[tokio::test] +async fn test_stream_returns_when_proc_panicked() -> eyre::Result<()> { + let manager = start_process_manager_with_catalog(Options::in_mem(), test_catalog()).await?; + let proc_id = manager.wait_for(Proc::Panic).await?; + + let mut resp = manager + .request_stream(proc_id, TestSinkResponses::Stream(42).into()) + .await?; + + assert!(resp.recv().await.is_none()); + + Ok(()) +}