Skip to content

Commit

Permalink
feat(chain-listener): integration tests for the chain listener [fixes N…
Browse files Browse the repository at this point in the history
…ET-824, VM-615] (#2290)
  • Loading branch information
gurinderu authored Aug 13, 2024
1 parent ef100c5 commit 3af4541
Show file tree
Hide file tree
Showing 25 changed files with 1,603 additions and 459 deletions.
421 changes: 152 additions & 269 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -164,20 +164,20 @@ once_cell = "1.19.0"
tempfile = "3.9.0"
hex = "0.4.3"
ethabi = "18.0.0"
jsonrpsee = "0.22.3"
jsonrpsee = "0.23.0"
blake3 = "1.5.0"
rand = "0.8.5"
futures-util = "0.3.30"
num_cpus = "1.16.0"
enum_dispatch = "0.3.12"
serde_with = "3.8.1"
mockito = "1.2.0"
clarity = "1.3.0"
clarity = "1.4.0"
cpu-utils = "0.13.0"
ccp-shared = "0.13.0"
ccp-rpc-client = "0.13.0"
alloy-sol-types = "0.6.4"
alloy-primitives = "0.6.4"
alloy-sol-types = "0.7.5"
alloy-primitives = "0.7.5"
alloy_serde_macro = "0.1.2"
const-hex = "1.11.3"
bytesize = "1.3.0"
Expand Down
2 changes: 1 addition & 1 deletion aquamarine/src/vm_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ impl<RT: AquaRuntime> VmPool<RT> {
let fut = &mut idx_fut.1;
if let Poll::Ready(vm) = fut.poll_unpin(cx) {
// Remove completed future
creating_vms.remove(fut_index);
let _ = creating_vms.remove(fut_index);
if creating_vms.is_empty() {
tracing::info!("All {} AquaVMs created.", self.pool_size)
}
Expand Down
2 changes: 1 addition & 1 deletion crates/chain-connector/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ impl HttpChainConnector {
.for_each(|unit| {
deals
.entry(unit.deal.to_string().into())
.or_insert_with(Vec::new)
.or_default()
.push(CUID::new(unit.id.into()));
});

Expand Down
2 changes: 1 addition & 1 deletion crates/chain-connector/src/function/capacity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ pub fn is_commitment_not_active(data: &str) -> bool {
))
}

#[derive(Clone, Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize, Hash, PartialEq, Eq)]
pub struct CommitmentId(pub [u8; 32]);

impl CommitmentId {
Expand Down
2 changes: 1 addition & 1 deletion crates/chain-connector/src/function/deal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl CIDV1 {

impl Status {
pub fn from_hex(hex: &str) -> Result<Self, ConnectorError> {
let bytes = decode_hex(&hex)?;
let bytes = decode_hex(hex)?;
if bytes.is_empty() {
return Err(ConnectorError::EmptyData(hex.to_string()));
}
Expand Down
5 changes: 3 additions & 2 deletions crates/chain-listener/src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

pub mod cc_activated;
mod cc_activated;
mod compute_unit_matched;
mod unit_activated;
mod unit_deactivated;

pub use compute_unit_matched::ComputeUnitMatched;
pub use cc_activated::CommitmentActivated;
pub use compute_unit_matched::{ComputeUnitMatched, CIDV1};
pub use unit_activated::UnitActivated;
pub use unit_deactivated::UnitDeactivated;
7 changes: 6 additions & 1 deletion crates/chain-listener/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,18 @@
#![feature(extract_if)]
#![feature(btree_extract_if)]
#![feature(result_option_inspect)]

extern crate core;

pub use event::CommitmentActivated;
pub use event::ComputeUnitMatched;
pub use event::UnitActivated;
pub use event::UnitDeactivated;
pub use event::CIDV1;
pub use listener::ChainListener;

mod event;
mod listener;

mod persistence;
mod proof_tracker;
mod types;
52 changes: 16 additions & 36 deletions crates/chain-listener/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ use tokio::task::JoinHandle;
use tokio::time::{interval, Instant};
use tokio_stream::wrappers::IntervalStream;
use tokio_stream::StreamExt;
use tracing::Instrument;

use chain_connector::Offer::ComputeUnit;
use chain_connector::{
Expand All @@ -63,9 +64,10 @@ use peer_metrics::ChainListenerMetrics;
use server_config::{ChainConfig, ChainListenerConfig};
use types::DealId;

use crate::event::cc_activated::CommitmentActivated;
use crate::event::CommitmentActivated;
use crate::event::{ComputeUnitMatched, UnitActivated, UnitDeactivated};
use crate::proof_tracker::ProofTracker;
use crate::types::{CUGroups, PhysicalCoreGroups};

const PROOF_POLL_LIMIT: usize = 50;

Expand Down Expand Up @@ -118,7 +120,9 @@ pub struct ChainListener {
metrics: Option<ChainListenerMetrics>,
}

async fn poll_subscription<T>(s: &mut Option<Subscription<T>>) -> Option<Result<T, client::Error>>
async fn poll_subscription<T>(
s: &mut Option<Subscription<T>>,
) -> Option<Result<T, serde_json::Error>>
where
T: DeserializeOwned + Send,
{
Expand Down Expand Up @@ -277,7 +281,9 @@ impl ChainListener {
}
}
}
})
}
.in_current_span()
)
.expect("Could not spawn task");

result
Expand Down Expand Up @@ -634,7 +640,7 @@ impl ChainListener {

async fn process_new_header(
&mut self,
event: Option<Result<Value, client::Error>>,
event: Option<Result<Value, serde_json::Error>>,
) -> eyre::Result<()> {
let header = event.ok_or(eyre!("Failed to process newHeads event: got None"))?;

Expand Down Expand Up @@ -685,7 +691,7 @@ impl ChainListener {

async fn process_commitment_activated(
&mut self,
event: Option<Result<JsonValue, client::Error>>,
event: Option<Result<JsonValue, serde_json::Error>>,
) -> eyre::Result<()> {
let event = event.ok_or(eyre!(
"Failed to process CommitmentActivated event: got None"
Expand Down Expand Up @@ -735,7 +741,7 @@ impl ChainListener {

async fn process_unit_activated(
&mut self,
event: Option<Result<JsonValue, client::Error>>,
event: Option<Result<JsonValue, serde_json::Error>>,
) -> eyre::Result<()> {
let event = event.ok_or(eyre!("Failed to process UnitActivated event: got None"))??;

Expand Down Expand Up @@ -767,7 +773,7 @@ impl ChainListener {
/// Unit goes to Deal
async fn process_unit_deactivated(
&mut self,
event: Option<Result<JsonValue, client::Error>>,
event: Option<Result<JsonValue, serde_json::Error>>,
) -> eyre::Result<()> {
let event = event.ok_or(eyre!("Failed to process UnitDeactivated event: got None"))??;
let log = serde_json::from_value::<Log>(event.clone()).map_err(|err| {
Expand All @@ -788,7 +794,7 @@ impl ChainListener {

fn process_unit_matched(
&mut self,
event: Option<Result<JsonValue, client::Error>>,
event: Option<Result<JsonValue, serde_json::Error>>,
) -> eyre::Result<()> {
let event = event.ok_or(eyre!("Failed to process DealMatched event: got None"))??;
let log = serde_json::from_value::<Log>(event.clone()).map_err(|err| {
Expand Down Expand Up @@ -865,8 +871,10 @@ impl ChainListener {
};

let cu_groups = self.get_cu_groups();
tracing::trace!(target: "chain-listener", "cu_groups {:?}", cu_groups);

let cc_cores = self.acquire_cores_for_cc(&cu_groups)?;
tracing::trace!(target: "chain-listener", "cc_cores {:?}", cc_cores);

let mut cu_allocation: HashMap<PhysicalCoreId, CUID> = HashMap::new();

Expand Down Expand Up @@ -1397,34 +1405,6 @@ impl ChainListener {
}
}

struct CUGroups {
/// Already started units involved in CC and not having less than MIN_PROOFS_PER_EPOCH proofs in the current epoch
pub priority_units: Vec<CUID>,
/// Already started units involved in CC and found at least MIN_PROOFS_PER_EPOCH proofs,
/// but less that MAX_PROOFS_PER_EPOCH proofs in the current epoch
pub non_priority_units: Vec<CUID>,
/// Units in CC that is not active yet and can't produce proofs in the current epoch
pub pending_units: Vec<CUID>,
/// Already started units involved in CC and having more than MAX_PROOFS_PER_EPOCH proofs in the current epoch
pub finished_units: Vec<CUID>,
}

impl CUGroups {
fn all_min_proofs_found(&self) -> bool {
self.priority_units.is_empty()
}

fn all_max_proofs_found(&self) -> bool {
self.non_priority_units.is_empty()
}
}
struct PhysicalCoreGroups {
pub priority_cores: Vec<PhysicalCoreId>,
pub non_priority_cores: Vec<PhysicalCoreId>,
pub pending_cores: Vec<PhysicalCoreId>,
pub finished_cores: Vec<PhysicalCoreId>,
}

// measure the request execution time and store it in the metrics
async fn measured_request<Fut, R, E>(
metrics: &Option<ChainListenerMetrics>,
Expand Down
50 changes: 50 additions & 0 deletions crates/chain-listener/src/types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Nox Fluence Peer
*
* Copyright (C) 2024 Fluence DAO
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation version 3 of the
* License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use ccp_shared::types::{PhysicalCoreId, CUID};

#[derive(Debug)]
pub struct CUGroups {
/// Already started units involved in CC and not having less than MIN_PROOFS_PER_EPOCH proofs in the current epoch
pub priority_units: Vec<CUID>,
/// Already started units involved in CC and found at least MIN_PROOFS_PER_EPOCH proofs,
/// but less that MAX_PROOFS_PER_EPOCH proofs in the current epoch
pub non_priority_units: Vec<CUID>,
/// Units in CC that is not active yet and can't produce proofs in the current epoch
pub pending_units: Vec<CUID>,
/// Already started units involved in CC and having more than MAX_PROOFS_PER_EPOCH proofs in the current epoch
pub finished_units: Vec<CUID>,
}

impl CUGroups {
pub fn all_min_proofs_found(&self) -> bool {
self.priority_units.is_empty()
}

pub fn all_max_proofs_found(&self) -> bool {
self.non_priority_units.is_empty()
}
}

#[derive(Debug)]
pub struct PhysicalCoreGroups {
pub priority_cores: Vec<PhysicalCoreId>,
pub non_priority_cores: Vec<PhysicalCoreId>,
pub pending_cores: Vec<PhysicalCoreId>,
pub finished_cores: Vec<PhysicalCoreId>,
}
3 changes: 2 additions & 1 deletion crates/created-swarm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ system-services = { workspace = true }
tempfile = { workspace = true }
core-distributor = { workspace = true }
test-utils = { workspace = true }
cpu-utils = { workspace = true }
cpu-utils = { workspace = true, features = ["mockall"] }
cid-utils = { workspace = true }
nonempty = { workspace = true }

fluence-keypair = { workspace = true }
log = { workspace = true }
Expand Down
Loading

0 comments on commit 3af4541

Please sign in to comment.