Skip to content

Commit

Permalink
feat(cores)!: core manager [fixes NET-739 NET-755 NET-740] (#2069)
Browse files Browse the repository at this point in the history
* feat(cores): cpu_range set

* feat(cores): core manager

* feat(cores): core manager

* small improvements

* small improvements

* small improvements

* small improvements

* small improvements

* small improvements

* small improvements

* small improvements

* small improvements

* small improvements

* small improvements

* fix dep

* fix dep

* fix dep

* small improvements

* small improvements

* small improvements

* pin system threads

* pin worker threads

* pin worker threads

* compilation fix

* fix tests

* Fix dockerfile

* Fix dockerfile

* Fixes

* Fix tests

* fix order

* fix order

* Fix explore_services_fixed_heavy

* Review fixes

* Review fixes

* Review fixes

* Review fixes

* Review fixes

* Review fixes

* Review fixes

* Review fixes

* Review fixes

* Review fixes

* Review fixes

* Review fixes

* Review fixes

* Review fixes

* Review fixes

* Review fixes

* Review fixes

* Review fixes

* Review fixes

* Review fixes

* Review fixes

* Review fixes

* Review fixes

* Review fixes

* Review fixes

* Review fixes

* Review fixes

* Review fixes

* Review fixes

* Review fixes

* Review fixes

* Review fixes

* Review fixes

* Review fixes

* Review fixes

* Review fixes

* switch to cpu-utils

* fix clippy

* use CUID

* fix hex

* fix hex

* fix clippy

* fix(deps): decider 0.6.0

---------

Co-authored-by: Anatolios Laskaris <[email protected]>
Co-authored-by: folex <[email protected]>
  • Loading branch information
3 people authored Feb 19, 2024
1 parent 26cdc6e commit 7366f84
Show file tree
Hide file tree
Showing 39 changed files with 1,593 additions and 148 deletions.
327 changes: 287 additions & 40 deletions Cargo.lock

Large diffs are not rendered by default.

17 changes: 11 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ members = [
"crates/chain-listener",
"crates/hex-utils",
"crates/chain-data",
"crates/types"
]
"crates/types",
"crates/core-manager"]
exclude = [
"nox/tests/tetraplets",
]
Expand Down Expand Up @@ -94,10 +94,11 @@ particle-execution = { path = "particle-execution" }
system-services = { path = "crates/system-services" }
health = { path = "crates/health" }
subnet-resolver = { path = "crates/subnet-resolver" }
hex-utils = { path = "crates/hex-utils"}
chain-data = { path = "crates/chain-data"}
chain-listener = { path = "crates/chain-listener"}
types = { path = "crates/types"}
hex-utils = { path = "crates/hex-utils" }
chain-data = { path = "crates/chain-data" }
chain-listener = { path = "crates/chain-listener" }
types = { path = "crates/types" }
core-manager = { path = "crates/core-manager" }

# spell
fluence-spell-dtos = "=0.7.0"
Expand Down Expand Up @@ -131,6 +132,7 @@ bs58 = "0.5.0"
fluence-keypair = "0.10.4"
parking_lot = "0.12.1"
tokio = "1.36.0"
async-trait = "0.1.77"
tokio-stream = "0.1.14"
tokio-util = "0.7.10"
uuid = { version = "1.7.0", features = ["v4"] }
Expand Down Expand Up @@ -160,6 +162,9 @@ jsonrpsee = "0.21.0"
blake3 = "1.5.0"
rand = "0.8.5"
futures-util = "0.3.30"
num_cpus = "1.16.0"
enum_dispatch = "0.3.12"
serde_with = "3.6.0"

# Enable a small amount of optimization in debug mode
[profile.dev]
Expand Down
4 changes: 2 additions & 2 deletions aquamarine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ humantime = "2.1.0"
anyhow = "1.0.79"
eyre = { workspace = true }
bytesize = "1.3.0"
async-trait = "0.1.77"
async-trait = { workspace = true }
health = { workspace = true }
config = { version = "0.13.4", features = [] }
enum_dispatch = "0.3.12"
enum_dispatch = { workspace = true }

[dev-dependencies]
tempfile = { workspace = true }
6 changes: 4 additions & 2 deletions aquamarine/src/plumber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ mod tests {
use fluence_keypair::KeyPair;
use fluence_libp2p::RandomPeerId;
use futures::task::noop_waker_ref;
use workers::{KeyStorage, PeerScopes, Workers};
use workers::{DummyCoreManager, KeyStorage, PeerScopes, Workers};

use particle_args::Args;
use particle_execution::{FunctionOutcome, ParticleFunction, ParticleParams, ServiceFunction};
Expand Down Expand Up @@ -537,14 +537,16 @@ mod tests {

let key_storage = Arc::new(key_storage);

let core_manager = Arc::new(DummyCoreManager::default().into());

let scope = PeerScopes::new(
root_key_pair.get_peer_id(),
RandomPeerId::random(),
RandomPeerId::random(),
key_storage.clone(),
);

let workers = Workers::from_path(workers_path.clone(), key_storage.clone())
let workers = Workers::from_path(workers_path.clone(), key_storage.clone(), core_manager)
.await
.expect("Could not load worker registry");

Expand Down
2 changes: 1 addition & 1 deletion crates/connected-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ libp2p = { workspace = true, features = ["identify"] }
libp2p-swarm = { workspace = true }
tokio = { workspace = true }
futures = { workspace = true }
serde = { version = "1.0.196", features = ["derive"] }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
log = { workspace = true }
derivative = { workspace = true }
Expand Down
32 changes: 32 additions & 0 deletions crates/core-manager/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
[package]
name = "core-manager"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
fxhash = "0.2.1"
range-set-blaze = "0.1.14"
cpu-utils = { git = "https://github.com/fluencelabs/capacity-commitment-prover/", branch = "main" }
ccp-shared = { git = "https://github.com/fluencelabs/capacity-commitment-prover/", branch = "main" }
multimap = { version = "0.10.0", features = ["serde"] }
bimap = { version = "0.6.3", features = ["serde"] }
toml = { version = "0.8.10" }
newtype_derive = "0.1.6"

tokio = { workspace = true, features = ["fs", "rt", "sync", "macros", "tracing"] }
async-trait.workspace = true
enum_dispatch.workspace = true
num_cpus.workspace = true
parking_lot.workspace = true
thiserror.workspace = true
serde = { workspace = true, features = ["derive"] }
types.workspace = true
tracing.workspace = true
serde_with.workspace = true


[dev-dependencies]
tempfile = { workspace = true }
hex.workspace = true
266 changes: 266 additions & 0 deletions crates/core-manager/src/core_range.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,266 @@
use range_set_blaze::RangeSetBlaze;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::fmt::{Debug, Display, Formatter};
use std::str::FromStr;
use thiserror::Error;

#[derive(Clone, PartialEq)]
pub struct CoreRange(pub(crate) RangeSetBlaze<usize>);

impl Debug for CoreRange {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str(self.0.to_string().as_str())
}
}

impl Default for CoreRange {
fn default() -> Self {
CoreRange(RangeSetBlaze::from_iter(0..num_cpus::get_physical()))
}
}

impl TryFrom<&[usize]> for CoreRange {
type Error = ParseError;

fn try_from(value: &[usize]) -> Result<Self, Self::Error> {
if value.is_empty() {
return Err(ParseError::EmptyRange);
}
Ok(CoreRange(RangeSetBlaze::from_iter(value)))
}
}

impl FromStr for CoreRange {
type Err = ParseError;

/// Parse CoreRange from string like "1,2-30,31"
fn from_str(s: &str) -> Result<Self, Self::Err> {
let mut result: RangeSetBlaze<usize> = RangeSetBlaze::new();
let trimmed = s.trim();
if trimmed.is_empty() {
return Err(ParseError::EmptyRange);
}

for part in trimmed.split(',') {
let trimmed = part.trim();
// either a single number or a dash range
let range: Vec<&str> = trimmed.split('-').collect();
match range[..] {
[l, r] => {
let l = l
.parse::<usize>()
.map_err(|_| ParseError::WrongRangeFormat {
raw_str: trimmed.to_string(),
})?;
let r = r
.parse::<usize>()
.map_err(|_| ParseError::WrongRangeFormat {
raw_str: trimmed.to_string(),
})?;
// insert the inclusive range
result.ranges_insert(l..=r);
}
[value] => {
let value =
value
.parse::<usize>()
.map_err(|_| ParseError::WrongRangeFormat {
raw_str: trimmed.to_string(),
})?;
result.insert(value);
}
_ => {
return Err(ParseError::WrongRangeFormat {
raw_str: trimmed.to_string(),
});
}
}
}

Ok(CoreRange(result))
}
}

impl Display for CoreRange {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
for (index, range) in self.0.ranges().enumerate() {
if index != 0 {
write!(f, ",")?;
};
let start = range.start();
let end = range.end();
if start == end {
write!(f, "{}", start)?;
} else {
write!(f, "{}-{}", start, end)?;
}
}
Ok(())
}
}

impl Serialize for CoreRange {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(self.to_string().as_str())
}
}

impl<'de> Deserialize<'de> for CoreRange {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let raw_str = String::deserialize(deserializer)?;
CoreRange::from_str(raw_str.as_str()).map_err(|e| {
serde::de::Error::custom(format!("failed to deserialize core range {raw_str} {e:?}"))
})
}
}

#[derive(Debug, Error, PartialEq)]
pub enum ParseError {
#[error("Range can't be an empty")]
EmptyRange,
#[error("Failed to parse: {raw_str}")]
WrongRangeFormat { raw_str: String },
}

#[cfg(test)]
mod tests {
use crate::core_range::{CoreRange, ParseError};

#[test]
fn range_parsing_test() {
let core_range: CoreRange = "0-2".parse().unwrap();
assert!(core_range.0.contains(0));
assert!(core_range.0.contains(1));
assert!(core_range.0.contains(2));
assert!(!core_range.0.contains(3));
}

#[test]
fn values_parsing_test() {
let core_range: CoreRange = "0,1,3".parse().unwrap();
assert!(core_range.0.contains(0));
assert!(core_range.0.contains(1));
assert!(!core_range.0.contains(2));
assert!(core_range.0.contains(3));
}

#[test]
fn wrong_parsing_test() {
let result = "aaaa".parse::<CoreRange>();
assert!(result.is_err());
if let Err(err) = result {
assert_eq!(
err,
ParseError::WrongRangeFormat {
raw_str: "aaaa".to_string()
}
);
assert_eq!(err.to_string(), "Failed to parse: aaaa")
}
}

#[test]
fn wrong_parsing_test_2() {
let result = "1-a".parse::<CoreRange>();
assert!(result.is_err());
if let Err(err) = result {
assert_eq!(
err,
ParseError::WrongRangeFormat {
raw_str: "1-a".to_string()
}
);
assert_eq!(err.to_string(), "Failed to parse: 1-a")
}
}

#[test]
fn wrong_parsing_test_3() {
let result = "a-1".parse::<CoreRange>();
assert!(result.is_err());
if let Err(err) = result {
assert_eq!(
err,
ParseError::WrongRangeFormat {
raw_str: "a-1".to_string()
}
);
assert_eq!(err.to_string(), "Failed to parse: a-1")
}
}

#[test]
fn wrong_parsing_test_4() {
let result = "a-1-2,3".parse::<CoreRange>();
assert!(result.is_err());
if let Err(err) = result {
assert_eq!(
err,
ParseError::WrongRangeFormat {
raw_str: "a-1-2".to_string()
}
);
assert_eq!(err.to_string(), "Failed to parse: a-1-2")
}
}

#[test]
fn empty_parsing_test_3() {
let result = "".parse::<CoreRange>();
assert!(result.is_err());
if let Err(err) = result {
assert_eq!(err, ParseError::EmptyRange);
}
}

#[test]
fn slice_convert() {
let core_range = CoreRange::try_from(&vec![1, 2, 3, 10][..]).unwrap();

assert!(!core_range.0.contains(0));
assert!(core_range.0.contains(1));
assert!(core_range.0.contains(2));
assert!(core_range.0.contains(3));
assert!(core_range.0.contains(10));
assert!(!core_range.0.contains(11));
}

#[test]
fn empty_slice_convert() {
let result = CoreRange::try_from(&vec![][..]);

assert!(result.is_err());
if let Err(err) = result {
assert_eq!(err, ParseError::EmptyRange);
assert_eq!(err.to_string(), "Range can't be an empty")
}
}

#[test]
fn last() {
let core_range: CoreRange = "0-2".parse().unwrap();
assert!(core_range.0.contains(0));
assert!(core_range.0.contains(1));
assert!(core_range.0.contains(2));
assert!(!core_range.0.contains(3));
}

#[test]
fn compare_ranges() {
let core_range_1: CoreRange = "0-2".parse().unwrap();
let core_range_2: CoreRange = "0,1,2".parse().unwrap();
assert_eq!(core_range_1, core_range_2);
}

#[test]
fn fmt() {
let core_range_1: CoreRange = "0-2,5,7-9".parse().unwrap();
assert_eq!(format!("{}", core_range_1), "0-2,5,7-9");
}
}
Loading

0 comments on commit 7366f84

Please sign in to comment.