From 4ad89fefdf6060319d594d4a0fd6f37d6daf43ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ivan=20Schre=CC=81ter?= Date: Wed, 15 Nov 2023 12:57:30 +0100 Subject: [PATCH] Fix: For single-threaded execution, there is no need for `Send`/`Sync` bounds. Some preliminary implementation with `OptionalSend`/`OptionalSync` was done previously, it was not complete, though. Currently, `Send`/`Sync` bounds are required for `AppData`, `RaftEntry`, async runtime wrappers and other types, but they are only relevant for multi-threaded access. If `openraft` is configured to use single-threaded runtime, then they don't have to be `Send`/`Sync`. Consequently replace `Send`/`Sync` bounds with `OptionalSend`/`OptionalSync` bounds to clean it up. Even if there are no `Send`/`Sync` bounds required, the `Raft` object can still be `Send`/`Sync` capable. It is only an API object sending requests to the Raft main loop over a channel. As long as the involved data types are `Send` (and some of them, which are used in `RaftInner`, also `Sync`), we can declare `Raft` as such `Send`/`Sync`. This change also fixes `timeout.rs`, which seems to be unused so far, but didn't properly use `AsyncRuntime` abstraction. Two points for the future: - We should add a generic test invocation to invoke tests on `LocalSet` for tests with `singlethreaded` feature. Currently, there is a single test only, in `timeout_test.rs`. - Later, even `Arc` could be replaced with a single-threaded counterpart to prevent the need for atomic refcounting. --- openraft/src/async_runtime.rs | 6 +-- openraft/src/core/raft_msg/mod.rs | 8 +++- openraft/src/defensive.rs | 3 +- .../src/docs/feature_flags/feature-flags.md | 4 +- openraft/src/entry/traits.rs | 3 +- openraft/src/instant.rs | 7 ++- openraft/src/lib.rs | 8 ++-- openraft/src/network/backoff.rs | 7 ++- openraft/src/network/factory.rs | 4 +- openraft/src/network/network.rs | 3 +- openraft/src/node.rs | 30 ++++++++++--- openraft/src/raft/mod.rs | 44 ++++++++++++++++++- openraft/src/storage/adapter.rs | 3 +- openraft/src/storage/log_store_ext.rs | 4 +- openraft/src/storage/mod.rs | 9 ++-- openraft/src/storage/v2.rs | 5 ++- openraft/src/timer/timeout.rs | 38 +++++++++------- openraft/src/timer/timeout_test.rs | 21 +++++++-- openraft/src/type_config.rs | 5 ++- 19 files changed, 159 insertions(+), 53 deletions(-) diff --git a/openraft/src/async_runtime.rs b/openraft/src/async_runtime.rs index 4356eb2f3..13d99b96a 100644 --- a/openraft/src/async_runtime.rs +++ b/openraft/src/async_runtime.rs @@ -16,9 +16,9 @@ use crate::TokioInstant; /// ## Note /// /// The default asynchronous runtime is `tokio`. -pub trait AsyncRuntime: Debug + Default + Send + Sync + 'static { +pub trait AsyncRuntime: Debug + Default + OptionalSend + OptionalSync + 'static { /// The error type of [`Self::JoinHandle`]. - type JoinError: Debug + Display + Send; + type JoinError: Debug + Display + OptionalSend; /// The return type of [`Self::spawn`]. type JoinHandle: Future> @@ -33,7 +33,7 @@ pub trait AsyncRuntime: Debug + Default + Send + Sync + 'static { type Instant: Instant; /// The timeout error type. - type TimeoutError: Debug + Display + Send; + type TimeoutError: Debug + Display + OptionalSend; /// The timeout type used by [`Self::timeout`] and [`Self::timeout_at`] that enables the user /// to await the outcome of a [`Future`]. diff --git a/openraft/src/core/raft_msg/mod.rs b/openraft/src/core/raft_msg/mod.rs index e4804b733..a18aa70ea 100644 --- a/openraft/src/core/raft_msg/mod.rs +++ b/openraft/src/core/raft_msg/mod.rs @@ -89,13 +89,19 @@ where tx: ResultSender, ClientWriteError>, }, + #[allow(clippy::type_complexity)] ExternalRequest { - #[allow(clippy::type_complexity)] + #[cfg(not(feature = "singlethreaded"))] req: Box< dyn FnOnce(&RaftState::Instant>, &mut LS, &mut N) + Send + 'static, >, + #[cfg(feature = "singlethreaded")] + req: Box< + dyn FnOnce(&RaftState::Instant>, &mut LS, &mut N) + + 'static, + >, }, ExternalCommand { diff --git a/openraft/src/defensive.rs b/openraft/src/defensive.rs index 383492e8c..507e6eb34 100644 --- a/openraft/src/defensive.rs +++ b/openraft/src/defensive.rs @@ -5,11 +5,12 @@ use std::ops::RangeBounds; use crate::log_id::RaftLogId; use crate::DefensiveError; use crate::ErrorSubject; +use crate::OptionalSend; use crate::RaftTypeConfig; use crate::StorageError; use crate::Violation; -pub fn check_range_matches_entries + Debug + Send>( +pub fn check_range_matches_entries + Debug + OptionalSend>( range: RB, entries: &[C::Entry], ) -> Result<(), StorageError> { diff --git a/openraft/src/docs/feature_flags/feature-flags.md b/openraft/src/docs/feature_flags/feature-flags.md index 9ab45c970..dd399cedd 100644 --- a/openraft/src/docs/feature_flags/feature-flags.md +++ b/openraft/src/docs/feature_flags/feature-flags.md @@ -38,8 +38,8 @@ By default openraft enables no features. V2 storage separates log store and state machine store so that log IO and state machine IO can be parallelized naturally.

-- `singlethreaded`: removes `Send` bounds from `AppData`, `AppDataResponse`, `RaftEntry`, and `SnapshotData` to force the - asynchronous runtime to spawn any tasks in the current thread. +- `singlethreaded`: removes `Send` and `Sync` bounds from `AppData`, `AppDataResponse`, `RaftEntry`, `SnapshotData` + and other types to force the asynchronous runtime to spawn any tasks in the current thread. This is for any single-threaded application that never allows a raft instance to be shared among multiple threads. In order to use the feature, `AsyncRuntime::spawn` should invoke `tokio::task::spawn_local` or equivalents.

diff --git a/openraft/src/entry/traits.rs b/openraft/src/entry/traits.rs index fafcf303e..baad89b4f 100644 --- a/openraft/src/entry/traits.rs +++ b/openraft/src/entry/traits.rs @@ -8,6 +8,7 @@ use crate::Node; use crate::NodeId; use crate::OptionalSend; use crate::OptionalSerde; +use crate::OptionalSync; /// Defines operations on an entry payload. pub trait RaftPayload @@ -27,7 +28,7 @@ pub trait RaftEntry: RaftPayload + RaftLogId where N: Node, NID: NodeId, - Self: OptionalSerde + Debug + Display + OptionalSend + Sync, + Self: OptionalSerde + Debug + Display + OptionalSend + OptionalSync, { /// Create a new blank log entry. /// diff --git a/openraft/src/instant.rs b/openraft/src/instant.rs index 815cfb168..78c6d440c 100644 --- a/openraft/src/instant.rs +++ b/openraft/src/instant.rs @@ -7,6 +7,9 @@ use std::panic::RefUnwindSafe; use std::panic::UnwindSafe; use std::time::Duration; +use crate::OptionalSend; +use crate::OptionalSync; + /// A measurement of a monotonically non-decreasing clock. pub trait Instant: Add @@ -19,11 +22,11 @@ pub trait Instant: + PartialEq + PartialOrd + RefUnwindSafe - + Send + + OptionalSend + Sub + Sub + SubAssign - + Sync + + OptionalSync + Unpin + UnwindSafe + 'static diff --git a/openraft/src/lib.rs b/openraft/src/lib.rs index b0e4c89ea..5fe24843b 100644 --- a/openraft/src/lib.rs +++ b/openraft/src/lib.rs @@ -189,9 +189,9 @@ impl OptionalSync for T {} /// ## Note /// /// The trait is automatically implemented for all types which satisfy its supertraits. -pub trait AppData: OptionalSend + Sync + 'static + OptionalSerde {} +pub trait AppData: OptionalSend + OptionalSync + 'static + OptionalSerde {} -impl AppData for T where T: OptionalSend + Sync + 'static + OptionalSerde {} +impl AppData for T where T: OptionalSend + OptionalSync + 'static + OptionalSerde {} /// A trait defining application specific response data. /// @@ -210,6 +210,6 @@ impl AppData for T where T: OptionalSend + Sync + 'static + OptionalSerde {} /// ## Note /// /// The trait is automatically implemented for all types which satisfy its supertraits. -pub trait AppDataResponse: OptionalSend + Sync + 'static + OptionalSerde {} +pub trait AppDataResponse: OptionalSend + OptionalSync + 'static + OptionalSerde {} -impl AppDataResponse for T where T: OptionalSend + Sync + 'static + OptionalSerde {} +impl AppDataResponse for T where T: OptionalSend + OptionalSync + 'static + OptionalSerde {} diff --git a/openraft/src/network/backoff.rs b/openraft/src/network/backoff.rs index b77626fd1..66b96c895 100644 --- a/openraft/src/network/backoff.rs +++ b/openraft/src/network/backoff.rs @@ -1,13 +1,18 @@ use std::time::Duration; +use crate::OptionalSend; + /// A backoff instance that is an infinite iterator of durations to sleep before next retry, when a /// [`Unreachable`](`crate::error::Unreachable`) occurs. pub struct Backoff { + #[cfg(not(feature = "singlethreaded"))] inner: Box + Send + 'static>, + #[cfg(feature = "singlethreaded")] + inner: Box + 'static>, } impl Backoff { - pub fn new(iter: impl Iterator + Send + 'static) -> Self { + pub fn new(iter: impl Iterator + OptionalSend + 'static) -> Self { Self { inner: Box::new(iter) } } } diff --git a/openraft/src/network/factory.rs b/openraft/src/network/factory.rs index 4bc13ae7d..29ef48b2a 100644 --- a/openraft/src/network/factory.rs +++ b/openraft/src/network/factory.rs @@ -1,6 +1,8 @@ use macros::add_async_trait; use crate::network::RaftNetwork; +use crate::OptionalSend; +use crate::OptionalSync; use crate::RaftTypeConfig; /// A trait defining the interface for a Raft network factory to create connections between cluster @@ -12,7 +14,7 @@ use crate::RaftTypeConfig; /// Typically, the network implementation as such will be hidden behind a `Box` or `Arc` and /// this interface implemented on the `Box` or `Arc`. #[add_async_trait] -pub trait RaftNetworkFactory: Send + Sync + 'static +pub trait RaftNetworkFactory: OptionalSend + OptionalSync + 'static where C: RaftTypeConfig { /// Actual type of the network handling a single connection. diff --git a/openraft/src/network/network.rs b/openraft/src/network/network.rs index 22b90511a..b0b8fe100 100644 --- a/openraft/src/network/network.rs +++ b/openraft/src/network/network.rs @@ -14,6 +14,7 @@ use crate::raft::InstallSnapshotResponse; use crate::raft::VoteRequest; use crate::raft::VoteResponse; use crate::OptionalSend; +use crate::OptionalSync; use crate::RaftTypeConfig; /// A trait defining the interface for a Raft network between cluster members. @@ -36,7 +37,7 @@ use crate::RaftTypeConfig; /// /// - Implementing the new APIs will disable the old APIs. #[add_async_trait] -pub trait RaftNetwork: OptionalSend + Sync + 'static +pub trait RaftNetwork: OptionalSend + OptionalSync + 'static where C: RaftTypeConfig { /// Send an AppendEntries RPC to the target. diff --git a/openraft/src/node.rs b/openraft/src/node.rs index dbf9eea08..c280df842 100644 --- a/openraft/src/node.rs +++ b/openraft/src/node.rs @@ -3,16 +3,32 @@ use std::fmt::Display; use std::fmt::Formatter; use std::hash::Hash; +use crate::OptionalSend; +use crate::OptionalSync; + /// Essential trait bound for node-id, except serde. #[doc(hidden)] pub trait NodeIdEssential: - Sized + Send + Sync + Eq + PartialEq + Ord + PartialOrd + Debug + Display + Hash + Copy + Clone + Default + 'static + Sized + + OptionalSend + + OptionalSync + + Eq + + PartialEq + + Ord + + PartialOrd + + Debug + + Display + + Hash + + Copy + + Clone + + Default + + 'static { } impl NodeIdEssential for T where T: Sized - + Send - + Sync + + OptionalSend + + OptionalSync + Eq + PartialEq + Ord @@ -43,8 +59,12 @@ pub trait NodeId: NodeIdEssential {} impl NodeId for T where T: NodeIdEssential {} /// Essential trait bound for application level node-data, except serde. -pub trait NodeEssential: Sized + Send + Sync + Eq + PartialEq + Debug + Clone + Default + 'static {} -impl NodeEssential for T where T: Sized + Send + Sync + Eq + PartialEq + Debug + Clone + Default + 'static {} +pub trait NodeEssential: + Sized + OptionalSend + OptionalSync + Eq + PartialEq + Debug + Clone + Default + 'static +{ +} +impl NodeEssential for T where T: Sized + OptionalSend + OptionalSync + Eq + PartialEq + Debug + Clone + Default + 'static +{} /// A Raft `Node`, this trait holds all relevant node information. /// diff --git a/openraft/src/raft/mod.rs b/openraft/src/raft/mod.rs index 3c3ae1ef8..6203f2150 100644 --- a/openraft/src/raft/mod.rs +++ b/openraft/src/raft/mod.rs @@ -59,6 +59,7 @@ use crate::ChangeMembers; use crate::LogId; use crate::LogIdOptionExt; use crate::MessageSummary; +use crate::OptionalSend; use crate::RaftState; pub use crate::RaftTypeConfig; use crate::StorageHelper; @@ -149,6 +150,47 @@ where } } +#[cfg(feature = "singlethreaded")] +// SAFETY: Even for a single-threaded Raft, the API object is MT-capable. +// +// The API object just sends the requests to the Raft loop over a channel. If all the relevant +// types in the type config are `Send`, then it's safe to send the request across threads over +// the channel. +// +// Notably, the state machine, log storage and network factory DO NOT have to be `Send`, those +// are only used within Raft task(s) on a single thread. +unsafe impl Send for Raft +where + C: RaftTypeConfig, + N: RaftNetworkFactory, + LS: RaftLogStorage, + SM: RaftStateMachine, + C::D: Send, + C::Entry: Send, + C::Node: Send + Sync, + C::NodeId: Send + Sync, + C::R: Send, +{ +} + +#[cfg(feature = "singlethreaded")] +// SAFETY: Even for a single-threaded Raft, the API object is MT-capable. +// +// See above for details. +unsafe impl Sync for Raft +where + C: RaftTypeConfig + Send, + N: RaftNetworkFactory, + LS: RaftLogStorage, + SM: RaftStateMachine, + C::D: Send, + C::Entry: Send, + C::Node: Send + Sync, + C::NodeId: Send + Sync, + C::R: Send, +{ +} + impl Raft where C: RaftTypeConfig, @@ -698,7 +740,7 @@ where /// destroyed right away and not called at all. pub fn external_request< F: FnOnce(&RaftState::Instant>, &mut LS, &mut N) - + Send + + OptionalSend + 'static, >( &self, diff --git a/openraft/src/storage/adapter.rs b/openraft/src/storage/adapter.rs index 525fedebb..669c83f82 100644 --- a/openraft/src/storage/adapter.rs +++ b/openraft/src/storage/adapter.rs @@ -16,6 +16,7 @@ use crate::storage::RaftStateMachine; use crate::LogId; use crate::LogState; use crate::OptionalSend; +use crate::OptionalSync; use crate::RaftLogReader; use crate::RaftStorage; use crate::RaftTypeConfig; @@ -103,7 +104,7 @@ where C: RaftTypeConfig, S: RaftStorage, { - async fn try_get_log_entries + Clone + Debug + Send + Sync>( + async fn try_get_log_entries + Clone + Debug + OptionalSend + OptionalSync>( &mut self, range: RB, ) -> Result, StorageError> { diff --git a/openraft/src/storage/log_store_ext.rs b/openraft/src/storage/log_store_ext.rs index 30a71a16d..a7649a07e 100644 --- a/openraft/src/storage/log_store_ext.rs +++ b/openraft/src/storage/log_store_ext.rs @@ -5,6 +5,8 @@ use macros::add_async_trait; use crate::defensive::check_range_matches_entries; use crate::LogId; +use crate::OptionalSend; +use crate::OptionalSync; use crate::RaftLogId; use crate::RaftLogReader; use crate::RaftTypeConfig; @@ -26,7 +28,7 @@ where C: RaftTypeConfig /// /// Similar to `try_get_log_entries` except an error will be returned if there is an entry not /// found in the specified range. - async fn get_log_entries + Clone + Debug + Send + Sync>( + async fn get_log_entries + Clone + Debug + OptionalSend + OptionalSync>( &mut self, range: RB, ) -> Result, StorageError> { diff --git a/openraft/src/storage/mod.rs b/openraft/src/storage/mod.rs index 36156af04..891148757 100644 --- a/openraft/src/storage/mod.rs +++ b/openraft/src/storage/mod.rs @@ -28,6 +28,7 @@ use crate::LogId; use crate::MessageSummary; use crate::NodeId; use crate::OptionalSend; +use crate::OptionalSync; use crate::RaftTypeConfig; use crate::StorageError; use crate::StoredMembership; @@ -138,7 +139,7 @@ pub struct LogState { /// this interface implemented on the `Arc`. It can be co-implemented with [`RaftStorage`] /// interface on the same cloneable object, if the underlying state machine is anyway synchronized. #[add_async_trait] -pub trait RaftLogReader: Send + Sync + 'static +pub trait RaftLogReader: OptionalSend + OptionalSync + 'static where C: RaftTypeConfig { /// Get a series of log entries from storage. @@ -147,7 +148,7 @@ where C: RaftTypeConfig /// stop)`. /// /// Entry that is not found is allowed. - async fn try_get_log_entries + Clone + Debug + Send + Sync>( + async fn try_get_log_entries + Clone + Debug + OptionalSend + OptionalSync>( &mut self, range: RB, ) -> Result, StorageError>; @@ -162,7 +163,7 @@ where C: RaftTypeConfig /// co-implemented with [`RaftStorage`] interface on the same cloneable object, if the underlying /// state machine is anyway synchronized. #[add_async_trait] -pub trait RaftSnapshotBuilder: Send + Sync + 'static +pub trait RaftSnapshotBuilder: OptionalSend + OptionalSync + 'static where C: RaftTypeConfig { /// Build snapshot @@ -192,7 +193,7 @@ where C: RaftTypeConfig /// The implementation of the API has to cope with (infrequent) concurrent access from these two /// components. #[add_async_trait] -pub trait RaftStorage: RaftLogReader + Send + Sync + 'static +pub trait RaftStorage: RaftLogReader + OptionalSend + OptionalSync + 'static where C: RaftTypeConfig { /// Log reader type. diff --git a/openraft/src/storage/v2.rs b/openraft/src/storage/v2.rs index e69d830a4..d637181aa 100644 --- a/openraft/src/storage/v2.rs +++ b/openraft/src/storage/v2.rs @@ -9,6 +9,7 @@ use crate::storage::v2::sealed::Sealed; use crate::LogId; use crate::LogState; use crate::OptionalSend; +use crate::OptionalSync; use crate::RaftLogReader; use crate::RaftSnapshotBuilder; use crate::RaftTypeConfig; @@ -43,7 +44,7 @@ pub(crate) mod sealed { /// write request before a former write request is completed. This rule applies to both `vote` and /// `log` IO. E.g., Saving a vote and appending a log entry must be serialized too. #[add_async_trait] -pub trait RaftLogStorage: Sealed + RaftLogReader + Send + Sync + 'static +pub trait RaftLogStorage: Sealed + RaftLogReader + OptionalSend + OptionalSync + 'static where C: RaftTypeConfig { /// Log reader type. @@ -141,7 +142,7 @@ where C: RaftTypeConfig /// Snapshot is part of the state machine, because usually a snapshot is the persisted state of the /// state machine. #[add_async_trait] -pub trait RaftStateMachine: Sealed + Send + Sync + 'static +pub trait RaftStateMachine: Sealed + OptionalSend + OptionalSync + 'static where C: RaftTypeConfig { /// Snapshot builder type. diff --git a/openraft/src/timer/timeout.rs b/openraft/src/timer/timeout.rs index 7a18cd0d5..c9bdd7fd6 100644 --- a/openraft/src/timer/timeout.rs +++ b/openraft/src/timer/timeout.rs @@ -8,14 +8,16 @@ use futures::future::Either; use tokio::sync::oneshot; use tokio::sync::oneshot::Receiver; use tokio::sync::oneshot::Sender; -use tokio::time::sleep_until; -use tokio::time::Instant; use tracing::trace_span; use tracing::Instrument; -pub(crate) trait RaftTimer { +use crate::AsyncRuntime; +use crate::Instant; +use crate::OptionalSend; + +pub(crate) trait RaftTimer { /// Create a new instance that will call `callback` after `timeout`. - fn new(callback: F, timeout: Duration) -> Self; + fn new(callback: F, timeout: Duration) -> Self; /// Update the timeout to a duration since now. fn update_timeout(&self, timeout: Duration); @@ -28,33 +30,33 @@ pub(crate) trait RaftTimer { /// /// The deadline can be updated to a higher value then the old deadline won't trigger the /// `callback`. -pub(crate) struct Timeout { +pub(crate) struct Timeout { /// A guard to notify the inner-task to quit when it is dropped. // tx is not explicitly used. #[allow(dead_code)] tx: Sender<()>, /// Shared state for running the sleep-notify task. - inner: Arc, + inner: Arc>, } -pub(crate) struct TimeoutInner { +pub(crate) struct TimeoutInner { /// The time when this Timeout is created. /// /// The `relative_deadline` stores timeout deadline relative to `init` in micro second. /// Thus a `u64` is enough for it to run for years. - init: Instant, + init: RT::Instant, /// The micro seconds since `init` after which the callback will be triggered. relative_deadline: AtomicU64, } -impl RaftTimer for Timeout { - fn new(callback: F, timeout: Duration) -> Self { +impl RaftTimer for Timeout { + fn new(callback: F, timeout: Duration) -> Self { let (tx, rx) = oneshot::channel(); let inner = TimeoutInner { - init: Instant::now(), + init: RT::Instant::now(), relative_deadline: AtomicU64::new(timeout.as_micros() as u64), }; @@ -65,13 +67,13 @@ impl RaftTimer for Timeout { inner: inner.clone(), }; - tokio::spawn(inner.sleep_loop(rx, callback).instrument(trace_span!("timeout-loop").or_current())); + RT::spawn(inner.sleep_loop(rx, callback).instrument(trace_span!("timeout-loop").or_current())); t } fn update_timeout(&self, timeout: Duration) { - let since_init = Instant::now() + timeout - self.inner.init; + let since_init = RT::Instant::now() + timeout - self.inner.init; let new_at = since_init.as_micros() as u64; @@ -79,11 +81,15 @@ impl RaftTimer for Timeout { } } -impl TimeoutInner { +impl TimeoutInner { /// Sleep until the deadline and send callback if the deadline is not changed. /// Otherwise, sleep again. #[tracing::instrument(level = "debug", skip_all)] - pub(crate) async fn sleep_loop(self: Arc, rx: Receiver<()>, callback: F) { + pub(crate) async fn sleep_loop( + self: Arc, + rx: Receiver<()>, + callback: F, + ) { let mut wake_up_at = None; let mut rx = rx; @@ -102,7 +108,7 @@ impl TimeoutInner { let deadline = self.init + Duration::from_micros(curr_deadline); - let either = select(Box::pin(sleep_until(deadline)), rx).await; + let either = select(Box::pin(RT::sleep_until(deadline)), rx).await; rx = match either { Either::Left((_sleep_res, rx)) => { tracing::debug!("sleep returned, continue to check if deadline changed"); diff --git a/openraft/src/timer/timeout_test.rs b/openraft/src/timer/timeout_test.rs index ab178a044..98de77cc9 100644 --- a/openraft/src/timer/timeout_test.rs +++ b/openraft/src/timer/timeout_test.rs @@ -6,14 +6,27 @@ use tokio::time::Instant; use crate::timer::timeout::RaftTimer; use crate::timer::Timeout; +use crate::TokioRuntime; +#[cfg(not(feature = "singlethreaded"))] #[async_entry::test(worker_threads = 3)] async fn test_timeout() -> anyhow::Result<()> { + test_timeout_inner().await +} + +#[cfg(feature = "singlethreaded")] +#[test] +fn test_timeout() -> anyhow::Result<()> { + let rt = tokio::runtime::Builder::new_current_thread().enable_time().build().unwrap(); + tokio::task::LocalSet::new().block_on(&rt, test_timeout_inner()) +} + +async fn test_timeout_inner() -> anyhow::Result<()> { tracing::info!("--- set timeout, recv result"); { let (tx, rx) = oneshot::channel(); let now = Instant::now(); - let _t = Timeout::new( + let _t = Timeout::::new( || { let _ = tx.send(1u64); }, @@ -32,7 +45,7 @@ async fn test_timeout() -> anyhow::Result<()> { { let (tx, rx) = oneshot::channel(); let now = Instant::now(); - let t = Timeout::new( + let t = Timeout::::new( || { let _ = tx.send(1u64); }, @@ -54,7 +67,7 @@ async fn test_timeout() -> anyhow::Result<()> { { let (tx, rx) = oneshot::channel(); let now = Instant::now(); - let t = Timeout::new( + let t = Timeout::::new( || { let _ = tx.send(1u64); }, @@ -76,7 +89,7 @@ async fn test_timeout() -> anyhow::Result<()> { { let (tx, rx) = oneshot::channel(); let now = Instant::now(); - let t = Timeout::new( + let t = Timeout::::new( || { let _ = tx.send(1u64); }, diff --git a/openraft/src/type_config.rs b/openraft/src/type_config.rs index 6753a5eb3..9436a9ae4 100644 --- a/openraft/src/type_config.rs +++ b/openraft/src/type_config.rs @@ -12,6 +12,7 @@ use crate::AsyncRuntime; use crate::Node; use crate::NodeId; use crate::OptionalSend; +use crate::OptionalSync; /// Configuration of types used by the [`Raft`] core engine. /// @@ -40,7 +41,7 @@ use crate::OptionalSend; /// ``` /// [`Raft`]: crate::Raft pub trait RaftTypeConfig: - Sized + Send + Sync + Debug + Clone + Copy + Default + Eq + PartialEq + Ord + PartialOrd + 'static + Sized + OptionalSend + OptionalSync + Debug + Clone + Copy + Default + Eq + PartialEq + Ord + PartialOrd + 'static { /// Application-specific request data passed to the state machine. type D: AppData; @@ -61,7 +62,7 @@ pub trait RaftTypeConfig: /// /// See the [storage chapter of the guide](https://datafuselabs.github.io/openraft/getting-started.html#implement-raftstorage) /// for details on where and how this is used. - type SnapshotData: AsyncRead + AsyncWrite + AsyncSeek + OptionalSend + Sync + Unpin + 'static; + type SnapshotData: AsyncRead + AsyncWrite + AsyncSeek + OptionalSend + OptionalSync + Unpin + 'static; /// Asynchronous runtime type. type AsyncRuntime: AsyncRuntime;