Skip to content

Commit

Permalink
fix(core-manager): add range check + clippy fixes (#2250)
Browse files Browse the repository at this point in the history
  • Loading branch information
gurinderu authored May 20, 2024
1 parent 7f96659 commit a873a69
Show file tree
Hide file tree
Showing 9 changed files with 90 additions and 17 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,11 @@ jobs:
cli:
needs:
- nox-snapshot
uses: fluencelabs/cli/.github/workflows/tests.yml@main
uses: fluencelabs/cli/.github/workflows/tests.yml@timeout
with:
nox-image: "${{ needs.nox-snapshot.outputs.nox-image }}"
ref: "timeout"


js-client:
needs:
Expand Down
13 changes: 7 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,9 @@ enum_dispatch = "0.3.12"
serde_with = "3.7.0"
mockito = "1.2.0"
clarity = "1.3.0"
cpu-utils = "0.8.0"
ccp-shared = "0.8.0"
ccp-rpc-client = "0.8.0"
cpu-utils = "0.9.0"
ccp-shared = "0.9.0"
ccp-rpc-client = "0.9.0"
alloy-sol-types = "0.6.4"
alloy-primitives = "0.6.4"
alloy_serde_macro = "0.1.2"
Expand Down
3 changes: 2 additions & 1 deletion crates/core-manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ multimap = { version = "0.10.0", features = ["serde"] }
bimap = { version = "0.6.3", features = ["serde"] }
toml = "0.8.12"
newtype_derive = "0.1.6"
nonempty = "0.9.0"

tokio = { workspace = true, features = ["fs", "rt", "sync", "macros", "tracing"] }
async-trait.workspace = true
Expand All @@ -25,7 +26,7 @@ serde = { workspace = true, features = ["derive"] }
tracing.workspace = true
tokio-stream.workspace = true
futures.workspace = true
rand = "0.8.5"
rand = { workspace = true }
hex.workspace = true
serde_with = { workspace = true }
hex-utils = { workspace = true, features = ["serde_with"] }
Expand Down
10 changes: 10 additions & 0 deletions crates/core-manager/src/core_range.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use ccp_shared::types::PhysicalCoreId;
use nonempty::NonEmpty;
use range_set_blaze::RangeSetBlaze;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::fmt::{Debug, Display, Formatter};
Expand All @@ -6,6 +8,14 @@ use thiserror::Error;

#[derive(Clone, PartialEq)]
pub struct CoreRange(pub(crate) RangeSetBlaze<usize>);
impl CoreRange {
pub fn is_subset(&self, cores: &NonEmpty<PhysicalCoreId>) -> bool {
let range: RangeSetBlaze<usize> =
RangeSetBlaze::from_iter(cores.into_iter().map(|core| <usize>::from(*core)));

self.0.is_subset(&range)
}
}

impl Debug for CoreRange {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
Expand Down
28 changes: 25 additions & 3 deletions crates/core-manager/src/dev.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ impl DevCoreManager {
.physical_cores()
.map_err(|err| CreateError::CollectCoresData { err })?;

if !core_range.is_subset(&physical_cores) {
return Err(CreateError::WrongCpuRange);
}

let mut cores_mapping: MultiMap<PhysicalCoreId, LogicalCoreId> =
MultiMap::with_capacity_and_hasher(available_core_count, FxBuildHasher::default());

Expand Down Expand Up @@ -301,7 +305,7 @@ impl CoreManagerFunctions for DevCoreManager {
fn release(&self, unit_ids: &[CUID]) {
let mut lock = self.state.write();
for unit_id in unit_ids {
if let Some(physical_core_id) = lock.unit_id_core_mapping.remove(&unit_id) {
if let Some(physical_core_id) = lock.unit_id_core_mapping.remove(unit_id) {
let mapping = lock.core_unit_id_mapping.get_vec_mut(&physical_core_id);
if let Some(mapping) = mapping {
let index = mapping.iter().position(|x| x == unit_id).unwrap();
Expand All @@ -310,7 +314,7 @@ impl CoreManagerFunctions for DevCoreManager {
lock.core_unit_id_mapping.remove(&physical_core_id);
}
}
lock.work_type_mapping.remove(&unit_id);
lock.work_type_mapping.remove(unit_id);
}
}
}
Expand Down Expand Up @@ -354,10 +358,11 @@ mod tests {
use ccp_shared::types::CUID;
use hex::FromHex;
use rand::RngCore;
use std::str::FromStr;

use crate::manager::CoreManagerFunctions;
use crate::types::{AcquireRequest, WorkType};
use crate::{CoreRange, DevCoreManager};
use crate::{CoreRange, DevCoreManager, StrictCoreManager};

fn cores_exists() -> bool {
num_cpus::get_physical() >= 4
Expand Down Expand Up @@ -539,4 +544,21 @@ mod tests {
assert_eq!(after_assignment_type_mapping.len(), assignment_count * 2);
}
}

#[test]
fn test_wrong_range() {
if cores_exists() {
let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");

let range = CoreRange::from_str("0-16384").unwrap();

let result = StrictCoreManager::from_path(temp_dir.path().join("test.toml"), 2, range);

assert!(result.is_err());
assert_eq!(
result.err().map(|err| err.to_string()),
Some("The specified CPU range exceeds the available CPU count".to_string())
);
}
}
}
2 changes: 2 additions & 0 deletions crates/core-manager/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ pub enum CreateError {
CreateTopology { err: CPUTopologyError },
#[error("Failed to collect cores data from OS {err:?}")]
CollectCoresData { err: CPUTopologyError },
#[error("The specified CPU range exceeds the available CPU count")]
WrongCpuRange,
}

#[derive(Debug, Error)]
Expand Down
26 changes: 24 additions & 2 deletions crates/core-manager/src/strict.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ impl StrictCoreManager {
.physical_cores()
.map_err(|err| CreateError::CollectCoresData { err })?;

if !core_range.is_subset(&physical_cores) {
return Err(CreateError::WrongCpuRange);
}

let mut cores_mapping: MultiMap<PhysicalCoreId, LogicalCoreId> =
MultiMap::with_capacity_and_hasher(available_core_count, FxBuildHasher::default());

Expand Down Expand Up @@ -302,9 +306,9 @@ impl CoreManagerFunctions for StrictCoreManager {
fn release(&self, unit_ids: &[CUID]) {
let mut lock = self.state.write();
for unit_id in unit_ids {
if let Some((physical_core_id, _)) = lock.unit_id_mapping.remove_by_right(&unit_id) {
if let Some((physical_core_id, _)) = lock.unit_id_mapping.remove_by_right(unit_id) {
lock.available_cores.insert(physical_core_id);
lock.work_type_mapping.remove(&unit_id);
lock.work_type_mapping.remove(unit_id);
}
}
}
Expand Down Expand Up @@ -348,6 +352,7 @@ mod tests {
use ccp_shared::types::{LogicalCoreId, PhysicalCoreId, CUID};
use hex::FromHex;
use std::collections::BTreeSet;
use std::str::FromStr;

use crate::manager::CoreManagerFunctions;
use crate::persistence::PersistentCoreManagerState;
Expand Down Expand Up @@ -536,4 +541,21 @@ mod tests {
assert_eq!(expected, result.unwrap_err().to_string());
}
}

#[test]
fn test_wrong_range() {
if cores_exists() {
let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");

let range = CoreRange::from_str("0-16384").unwrap();

let result = StrictCoreManager::from_path(temp_dir.path().join("test.toml"), 2, range);

assert!(result.is_err());
assert_eq!(
result.err().map(|err| err.to_string()),
Some("The specified CPU range exceeds the available CPU count".to_string())
);
}
}
}
15 changes: 14 additions & 1 deletion crates/workers/src/workers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,21 @@ impl Workers {
.await
.map_err(|_err| WorkersError::FailedToNotifySubsystem { worker_id });
match result {
Ok(_) => Ok(()),
Ok(_) => {
tracing::info!(
target = "worker-registry",
worker_id = worker_id.to_string(),
"Worker created {worker_id}"
);
Ok(())
}
Err(err) => {
tracing::error!(
target = "worker-registry",
worker_id = worker_id.to_string(),
"Failed to notify subsystem for {worker_id}: {}",
err
);
let mut worker_ids = self.worker_ids.write();
let mut worker_infos = self.worker_infos.write();
let mut runtimes = self.runtimes.write();
Expand Down

0 comments on commit a873a69

Please sign in to comment.