diff --git a/openraft/src/engine/engine_impl.rs b/openraft/src/engine/engine_impl.rs index 10e5c6ff8..4c17f693d 100644 --- a/openraft/src/engine/engine_impl.rs +++ b/openraft/src/engine/engine_impl.rs @@ -291,9 +291,7 @@ impl<NID: NodeId> Engine<NID> { // // --- app API --- // // /// Write a new log entry. - // pub(crate) fn write(&mut self) -> Result<Vec<AlgoCmd<NID>>, ForwardToLeader<NID>> { - // todo!() - // } + // pub(crate) fn write(&mut self) -> Result<Vec<AlgoCmd<NID>>, ForwardToLeader<NID>> {} // // // --- raft protocol API --- // diff --git a/openraft/src/error.rs b/openraft/src/error.rs index 5f5145aac..2694e994a 100644 --- a/openraft/src/error.rs +++ b/openraft/src/error.rs @@ -26,6 +26,9 @@ pub enum Fatal<NID: NodeId> { #[error(transparent)] StorageError(#[from] StorageError<NID>), + #[error("panicked")] + Panicked, + #[error("raft stopped")] Stopped, } diff --git a/openraft/src/raft.rs b/openraft/src/raft.rs index b446ebeb2..8ca34549b 100644 --- a/openraft/src/raft.rs +++ b/openraft/src/raft.rs @@ -115,6 +115,9 @@ struct RaftInner<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> tx_shutdown: Mutex<Option<oneshot::Sender<()>>>, marker_n: std::marker::PhantomData<N>, marker_s: std::marker::PhantomData<S>, + + /// The error that caused RaftCore to quit. + core_error: std::sync::Mutex<Option<Fatal<C::NodeId>>>, } /// The Raft API. 
@@ -176,6 +179,8 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Raft<C, N,
             tx_shutdown: Mutex::new(Some(tx_shutdown)),
             marker_n: std::marker::PhantomData,
             marker_s: std::marker::PhantomData,
+
+            core_error: std::sync::Mutex::new(None),
         };
         Self { inner: Arc::new(inner) }
     }
@@ -441,42 +446,68 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Raft<C, N,
         let send_res = self.inner.tx_api.send((mes, span));
 
         if let Err(send_err) = send_res {
-            let last_err = self.inner.rx_metrics.borrow().running_state.clone();
+            let last_err = self.get_core_error().await;
             tracing::error!(%send_err, mes=%sum.unwrap_or_default(), last_error=?last_err, "error send tx to RaftCore");
-
-            let err = match last_err {
-                Ok(_) => {
-                    // normal shutdown, not caused by any error.
-                    Fatal::Stopped
-                }
-                Err(e) => e,
-            };
-
-            return Err(err.into());
+            return Err(last_err.into());
         }
 
         let recv_res = rx.await;
         let res = match recv_res {
             Ok(x) => x,
             Err(e) => {
-                let last_err = self.inner.rx_metrics.borrow().running_state.clone();
+                let last_err = self.get_core_error().await;
                 tracing::error!(%e, mes=%sum.unwrap_or_default(), last_error=?last_err, "error recv rx from RaftCore");
-
-                let err = match last_err {
-                    Ok(_) => {
-                        // normal shutdown, not caused by any error.
-                        Fatal::Stopped
-                    }
-                    Err(e) => e,
-                };
-
-                Err(err.into())
+                Err(last_err.into())
             }
         };
 
         res
     }
 
+    /// Return the fatal condition that made RaftCore stop serving requests.
+    ///
+    /// The first caller joins the RaftCore task and caches the outcome in `core_error`;
+    /// later callers read the cached value. A core that exited without reporting an
+    /// error is reported as `Fatal::Stopped` instead of panicking the caller.
+    async fn get_core_error(&self) -> Fatal<C::NodeId> {
+        // Taking the handle lock first serializes concurrent callers: a caller that loses
+        // the race waits here until the winner has joined the task and cached the result.
+        let mut handle = self.inner.raft_handle.lock().await;
+
+        // If there is an error recorded, return it.
+        {
+            let guard = self.inner.core_error.lock().unwrap();
+            if let Some(x) = &*guard {
+                return x.clone();
+            }
+        }
+
+        let mut fatal: Option<Fatal<C::NodeId>> = None;
+
+        if let Some(h) = handle.take() {
+            let res = h.await;
+            tracing::error!(res=?res, "RaftCore exited");
+
+            if let Err(err) = res {
+                // Any join error other than a panic (i.e. cancellation) is reported as a stop.
+                fatal = Some(if err.is_panic() { Fatal::Panicked } else { Fatal::Stopped });
+            }
+        }
+
+        let fatal = match fatal {
+            Some(e) => e,
+            None => {
+                // RaftCore encountered an un-handleable error: it is published in `running_state`.
+                // `Ok` there means a normal shutdown, not caused by any error.
+                let last_err = self.inner.rx_metrics.borrow().running_state.clone();
+                last_err.err().unwrap_or(Fatal::Stopped)
+            }
+        };
+
+        *self.inner.core_error.lock().unwrap() = Some(fatal.clone());
+        fatal
+    }
+
     /// Send a request to the Raft core loop in a fire-and-forget manner.
     ///
     /// The request functor will be called with a mutable reference to both the state machine