diff --git a/Cargo.lock b/Cargo.lock index 6e29e63056..36996638a1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1367,6 +1367,7 @@ dependencies = [ "ccp-shared", "cpu-utils", "enum_dispatch", + "futures", "fxhash", "hex", "multimap 0.10.0", @@ -1379,6 +1380,7 @@ dependencies = [ "tempfile", "thiserror", "tokio", + "tokio-stream", "toml 0.5.11", "tracing", "types", diff --git a/crates/core-manager/Cargo.toml b/crates/core-manager/Cargo.toml index 8c985362c5..53793ce8f9 100644 --- a/crates/core-manager/Cargo.toml +++ b/crates/core-manager/Cargo.toml @@ -25,8 +25,10 @@ serde = { workspace = true, features = ["derive"] } types.workspace = true tracing.workspace = true serde_with.workspace = true +tokio-stream.workspace = true +futures.workspace = true [dev-dependencies] tempfile = { workspace = true } -hex.workspace = true \ No newline at end of file +hex.workspace = true diff --git a/crates/core-manager/src/manager.rs b/crates/core-manager/src/manager.rs index fb1a445e88..54def08f0b 100644 --- a/crates/core-manager/src/manager.rs +++ b/crates/core-manager/src/manager.rs @@ -10,11 +10,13 @@ use async_trait::async_trait; use ccp_shared::types::CUID; use cpu_utils::{CPUTopology, LogicalCoreId, PhysicalCoreId}; use enum_dispatch::enum_dispatch; +use futures::StreamExt; use fxhash::{FxBuildHasher, FxHasher}; use parking_lot::RwLock; use range_set_blaze::RangeSetBlaze; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc::Receiver; +use tokio_stream::wrappers::ReceiverStream; use crate::core_range::CoreRange; use crate::errors::{AcquireError, CreateError, LoadingError, PersistError}; @@ -249,39 +251,39 @@ pub struct PersistenceTask { } impl PersistenceTask { - async fn process_events(core_manager: Arc, mut receiver: Receiver<()>) { + async fn process_events(stream: Src, core_manager: Arc) + where + Src: futures::Stream + Unpin + Send + Sync + 'static, + { let core_manager = core_manager.clone(); // We are not interested in the content of the event // We are waiting for the event to initiate the persistence process - let _ = receiver.recv().await; - tokio::task::spawn_blocking(move || { - let result = core_manager.persist(); - match result { - Ok(_) => { - tracing::debug!(target: "core-manager", "Core state was persisted"); - } - Err(err) => { - tracing::warn!(target: "core-manager", "Failed to save core state {err}"); - } + stream.for_each(move |_| { + let core_manager = core_manager.clone(); + async move { + tokio::task::spawn_blocking(move || { + let result = core_manager.persist(); + match result { + Ok(_) => { + tracing::debug!(target: "core-manager", "Core state was persisted"); + } + Err(err) => { + tracing::warn!(target: "core-manager", "Failed to save core state {err}"); + } + } + }) + .await + .expect("Could not spawn persist task") } - }) - .await - .expect("Could not spawn persist task") + }).await; } - async fn persistence_task(self, core_manager: Arc) { - let persist_task = Self::process_events(core_manager, self.receiver); - tokio::pin!(persist_task); - loop { - tokio::select! { - _ = &mut persist_task => {} - } - } - } pub async fn run(self, core_manager: Arc) { + let stream = ReceiverStream::from(self.receiver); + tokio::task::Builder::new() .name("core-manager-persist") - .spawn(self.persistence_task(core_manager)) + .spawn(Self::process_events(stream, core_manager)) .expect("Could not spawn persist task"); } }