Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(listener): save pending units on start; refresh state on subs drop #2158

Merged
merged 8 commits into from
Mar 10, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 38 additions & 13 deletions crates/chain-listener/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use tokio::time::interval;
use tokio_stream::wrappers::IntervalStream;
use tokio_stream::StreamExt;

use chain_connector::{CCInitParams, ChainConnector, ConnectorError};
use chain_connector::{ChainConnector, ConnectorError};
use chain_data::{parse_log, peer_id_to_hex, ChainData, Log};
use chain_types::{
CommitmentId, CommitmentStatus, ComputeUnit, DealStatus, PendingUnit, COMMITMENT_IS_NOT_ACTIVE,
Expand All @@ -43,7 +43,7 @@ use crate::event::{
};
use crate::persistence;

const PROOF_POLL_LIMIT: usize = 50;
const PROOF_POLL_LIMIT: usize = 10;

pub struct ChainListener {
config: ChainConfig,
Expand Down Expand Up @@ -98,7 +98,6 @@ impl ChainListener {
host_id: PeerId,
chain_connector: Arc<ChainConnector>,
core_manager: Arc<CoreManager>,
init_params: CCInitParams,
ws_client: WsClient,
ccp_client: Option<CCPRpcHttpClient>,
persisted_proof_id_dir: PathBuf,
Expand All @@ -112,11 +111,11 @@ impl ChainListener {
ws_client,
config: chain_config,
host_id,
difficulty: init_params.difficulty,
init_timestamp: init_params.init_timestamp,
global_nonce: init_params.global_nonce,
current_epoch: init_params.current_epoch,
epoch_duration: init_params.epoch_duration,
difficulty: Difficulty::default(),
init_timestamp: U256::zero(),
global_nonce: GlobalNonce::new([0; 32]).into(),
current_epoch: U256::zero(),
epoch_duration: U256::zero(),
current_commitment: None,
active_compute_units: BTreeSet::new(),
pending_compute_units: BTreeSet::new(),
Expand All @@ -134,14 +133,17 @@ impl ChainListener {
}
}

async fn get_current_commitment_id(&self) -> eyre::Result<Option<CommitmentId>> {
async fn refresh_current_commitment_id(&mut self) -> eyre::Result<()> {
match self.chain_connector.get_current_commitment_id().await {
Ok(id) => Ok(id),
Ok(id) => {
self.current_commitment = id;
Ok(())
}
Err(err) => match err {
ConnectorError::RpcCallError { ref data, .. } => {
if data.contains(PEER_NOT_EXISTS) {
tracing::info!("Peer doesn't exist on chain. Waiting for market offer");
Ok(None)
Ok(())
} else {
tracing::error!(target: "chain-listener", "Failed to get current commitment id: {err}");
Err(err.into())
Expand All @@ -155,11 +157,33 @@ impl ChainListener {
}
}

async fn refresh_commitment_params(&mut self) -> eyre::Result<()> {
let init_params =
justprosh marked this conversation as resolved.
Show resolved Hide resolved
self.chain_connector
.get_cc_init_params()
.await
.map_err(|err| {
tracing::info!(target: "chain-listener", "Error getting Commitment initial params: {err}");
err
})?;

self.difficulty = init_params.difficulty;
self.init_timestamp = init_params.init_timestamp;
self.global_nonce = init_params.global_nonce;
self.epoch_duration = init_params.epoch_duration;
self.current_epoch = init_params.current_epoch;

justprosh marked this conversation as resolved.
Show resolved Hide resolved
tracing::info!(target: "chain-listener","Commitment initial params: difficulty {}, global nonce {}, init_timestamp {}, epoch_duration {}, current_epoch {}", init_params.difficulty, init_params.global_nonce, init_params.init_timestamp, init_params.epoch_duration, init_params.current_epoch);
Ok(())
}

async fn refresh_compute_units(&mut self) -> eyre::Result<()> {
loop {
let result: eyre::Result<()> = try {
self.refresh_commitment_params().await?;

let (active, pending) = self.get_compute_units().await?;
self.current_commitment = self.get_current_commitment_id().await?;
self.refresh_current_commitment_id().await?;

if let Some(ref c) = self.current_commitment {
tracing::info!(target: "chain-listener", "Current commitment id: {}", c);
Expand Down Expand Up @@ -189,6 +213,7 @@ impl ChainListener {
}
CommitmentStatus::WaitDelegation | CommitmentStatus::WaitStart => {
tracing::info!(target: "chain-listener", "Waiting for commitment to be activated; Stopping current one");
self.pending_compute_units.extend(pending);
self.stop_commitment().await?
}
}
Expand Down Expand Up @@ -311,8 +336,8 @@ impl ChainListener {
tracing::info!(target: "chain-listener", "Subscribed successfully");

let setup: eyre::Result<()> = try {
self.load_proof_id().await?;
self.refresh_compute_units().await?;
self.load_proof_id().await?;
};
if let Err(err) = setup {
tracing::error!(target: "chain-listener", "ChainListener: compute units refresh error: {err}");
Expand Down
8 changes: 0 additions & 8 deletions nox/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,13 +135,6 @@ async fn setup_listener(

let cc_events_dir = config.dir_config.cc_events_dir.clone();
let host_id = config.root_key_pair.get_peer_id();

let init_params = connector.get_cc_init_params().await.map_err(|err| {
log::error!("Error getting Commitment initial params: {err}");
err
})?;
log::info!("Commitment initial params: difficulty {}, global nonce {}, init_timestamp {}, epoch_duration {}, current_epoch {}", init_params.difficulty, init_params.global_nonce, init_params.init_timestamp, init_params.epoch_duration, init_params.current_epoch);

let ws_client = WsClientBuilder::default()
.build(&listener_config.ws_endpoint)
.await
Expand All @@ -164,7 +157,6 @@ async fn setup_listener(
host_id,
connector,
core_manager,
init_params,
ws_client,
ccp_client,
cc_events_dir,
Expand Down
Loading