Skip to content

Commit

Permalink
fix(core-manager): process core persistence in a stream (#2100)
Browse files Browse the repository at this point in the history
* refactor persistance task

* fix
  • Loading branch information
gurinderu authored Feb 21, 2024
1 parent 0bd4d6e commit 7f333bd
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 25 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion crates/core-manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ serde = { workspace = true, features = ["derive"] }
types.workspace = true
tracing.workspace = true
serde_with.workspace = true
tokio-stream.workspace = true
futures.workspace = true


[dev-dependencies]
tempfile = { workspace = true }
hex.workspace = true
hex.workspace = true
50 changes: 26 additions & 24 deletions crates/core-manager/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ use async_trait::async_trait;
use ccp_shared::types::CUID;
use cpu_utils::{CPUTopology, LogicalCoreId, PhysicalCoreId};
use enum_dispatch::enum_dispatch;
use futures::StreamExt;
use fxhash::{FxBuildHasher, FxHasher};
use parking_lot::RwLock;
use range_set_blaze::RangeSetBlaze;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::Receiver;
use tokio_stream::wrappers::ReceiverStream;

use crate::core_range::CoreRange;
use crate::errors::{AcquireError, CreateError, LoadingError, PersistError};
Expand Down Expand Up @@ -249,39 +251,39 @@ pub struct PersistenceTask {
}

impl PersistenceTask {
async fn process_events(core_manager: Arc<CoreManager>, mut receiver: Receiver<()>) {
async fn process_events<Src>(stream: Src, core_manager: Arc<CoreManager>)
where
Src: futures::Stream<Item = ()> + Unpin + Send + Sync + 'static,
{
let core_manager = core_manager.clone();
// We are not interested in the content of the event
// We are waiting for the event to initiate the persistence process
let _ = receiver.recv().await;
tokio::task::spawn_blocking(move || {
let result = core_manager.persist();
match result {
Ok(_) => {
tracing::debug!(target: "core-manager", "Core state was persisted");
}
Err(err) => {
tracing::warn!(target: "core-manager", "Failed to save core state {err}");
}
stream.for_each(move |_| {
let core_manager = core_manager.clone();
async move {
tokio::task::spawn_blocking(move || {
let result = core_manager.persist();
match result {
Ok(_) => {
tracing::debug!(target: "core-manager", "Core state was persisted");
}
Err(err) => {
tracing::warn!(target: "core-manager", "Failed to save core state {err}");
}
}
})
.await
.expect("Could not spawn persist task")
}
})
.await
.expect("Could not spawn persist task")
}).await;
}

async fn persistence_task(self, core_manager: Arc<CoreManager>) {
let persist_task = Self::process_events(core_manager, self.receiver);
tokio::pin!(persist_task);
loop {
tokio::select! {
_ = &mut persist_task => {}
}
}
}
pub async fn run(self, core_manager: Arc<CoreManager>) {
let stream = ReceiverStream::from(self.receiver);

tokio::task::Builder::new()
.name("core-manager-persist")
.spawn(self.persistence_task(core_manager))
.spawn(Self::process_events(stream, core_manager))
.expect("Could not spawn persist task");
}
}
Expand Down

0 comments on commit 7f333bd

Please sign in to comment.