From 44429cb8a21316013fc15a238b2e68142b32e41f Mon Sep 17 00:00:00 2001 From: Alexey Proshutinskiy Date: Sat, 9 Mar 2024 01:43:23 +0200 Subject: [PATCH 01/13] feat(chain-listener): exit expired deals --- Cargo.lock | 16 +- Cargo.toml | 8 +- crates/chain-connector/Cargo.toml | 1 + crates/chain-connector/src/connector.rs | 56 ++++- .../src/function/current_epoch.rs | 2 +- .../src/function/difficulty.rs | 2 +- .../src/function/epoch_duration.rs | 2 +- .../src/function/get_commitment.rs | 2 +- .../src/function/get_commitment_status.rs | 6 +- .../src/function/get_compute_peer.rs | 2 +- .../src/function/get_compute_units.rs | 2 +- .../src/function/get_deal_status.rs | 23 ++ .../src/function/global_nonce.rs | 2 +- .../src/function/init_timestamp.rs | 2 +- crates/chain-connector/src/function/mod.rs | 6 +- .../src/function/remove_cu_from_deal.rs | 27 +++ .../src/function/submit_proof.rs | 2 +- crates/chain-data/src/function.rs | 8 +- crates/chain-listener/Cargo.toml | 1 + .../chain-listener/src/event/deal_matched.rs | 203 ++++++++++++++++++ crates/chain-listener/src/event/mod.rs | 2 + .../src/event/unit_activated.rs | 7 +- crates/chain-listener/src/lib.rs | 2 +- crates/chain-listener/src/listener.rs | 138 ++++++++++-- crates/chain-types/Cargo.toml | 1 + crates/chain-types/src/compute_unit.rs | 31 ++- crates/chain-types/src/deal_status.rs | 57 +++++ crates/chain-types/src/lib.rs | 4 +- crates/types/Cargo.toml | 2 +- crates/types/src/deal_id.rs | 9 +- 30 files changed, 569 insertions(+), 57 deletions(-) create mode 100644 crates/chain-connector/src/function/get_deal_status.rs create mode 100644 crates/chain-connector/src/function/remove_cu_from_deal.rs create mode 100644 crates/chain-listener/src/event/deal_matched.rs create mode 100644 crates/chain-types/src/deal_status.rs diff --git a/Cargo.lock b/Cargo.lock index ab37beac2b..60fc481b4f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1021,8 +1021,8 @@ dependencies = [ [[package]] name = "ccp-rpc-client" -version = "0.3.0" -source = "git+https://github.com/fluencelabs/capacity-commitment-prover/?branch=main#7eb94c2ce42c5126c35bc34240981a8eb65ba189" +version = "0.4.0" +source = "git+https://github.com/fluencelabs/capacity-commitment-prover/?branch=type_add_ord#5d90da1e32dc165a6fe663032fd53020f948424c" dependencies = [ "ccp-shared", "hex", @@ -1032,8 +1032,8 @@ dependencies = [ [[package]] name = "ccp-shared" -version = "0.3.0" -source = "git+https://github.com/fluencelabs/capacity-commitment-prover/?branch=main#7eb94c2ce42c5126c35bc34240981a8eb65ba189" +version = "0.4.0" +source = "git+https://github.com/fluencelabs/capacity-commitment-prover/?branch=type_add_ord#5d90da1e32dc165a6fe663032fd53020f948424c" dependencies = [ "hex", "newtype_derive", @@ -1095,6 +1095,7 @@ dependencies = [ "thiserror", "tokio", "tracing", + "types", ] [[package]] @@ -1131,6 +1132,7 @@ dependencies = [ "hex", "hex-utils", "jsonrpsee", + "libipld", "libp2p-identity", "log", "log-utils", @@ -1157,6 +1159,7 @@ dependencies = [ "hex", "serde", "tokio", + "types", ] [[package]] @@ -1518,8 +1521,8 @@ dependencies = [ [[package]] name = "cpu-utils" -version = "0.3.0" -source = "git+https://github.com/fluencelabs/capacity-commitment-prover/?branch=main#7eb94c2ce42c5126c35bc34240981a8eb65ba189" +version = "0.4.0" +source = "git+https://github.com/fluencelabs/capacity-commitment-prover/?branch=type_add_ord#5d90da1e32dc165a6fe663032fd53020f948424c" dependencies = [ "ccp-shared", "core_affinity", @@ -8380,6 +8383,7 @@ checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" name = "types" version = "0.1.0" dependencies = [ + "hex", "libp2p-identity", "serde", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index 6286af560f..a80724c987 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,7 +50,7 @@ members = [ "crates/chain-types", "crates/types", "crates/core-manager", - ] +] exclude = [ "nox/tests/tetraplets", ] @@ -173,9 +173,9 @@ enum_dispatch = "0.3.12" serde_with = "3.6.0" mockito = "1.2.0" clarity = "1.3.0" -cpu-utils = { git = "https://github.com/fluencelabs/capacity-commitment-prover/", branch = "main" } -ccp-shared = { git = "https://github.com/fluencelabs/capacity-commitment-prover/", branch = "main" } -ccp-rpc-client = { git = "https://github.com/fluencelabs/capacity-commitment-prover.git", branch = "main" } +cpu-utils = { git = "https://github.com/fluencelabs/capacity-commitment-prover/", branch = "type_add_ord" } +ccp-shared = { git = "https://github.com/fluencelabs/capacity-commitment-prover/", branch = "type_add_ord" } +ccp-rpc-client = { git = "https://github.com/fluencelabs/capacity-commitment-prover.git", branch = "type_add_ord" } [profile.dev] diff --git a/crates/chain-connector/Cargo.toml b/crates/chain-connector/Cargo.toml index 49713ba3b7..3ba9998af7 100644 --- a/crates/chain-connector/Cargo.toml +++ b/crates/chain-connector/Cargo.toml @@ -25,6 +25,7 @@ futures = { workspace = true } ccp-shared = { workspace = true } thiserror = { workspace = true } tracing = { workspace = true } +types = { workspace = true } [dev-dependencies] mockito = { workspace = true } diff --git a/crates/chain-connector/src/connector.rs b/crates/chain-connector/src/connector.rs index ffa04b436d..61330a4279 100644 --- a/crates/chain-connector/src/connector.rs +++ b/crates/chain-connector/src/connector.rs @@ -18,19 +18,23 @@ use tokio::sync::Mutex; use chain_data::ChainDataError::InvalidTokenSize; use chain_data::{next_opt, parse_chain_data, peer_id_to_bytes, ChainFunction}; -use chain_types::{Commitment, CommitmentId, CommitmentStatus, ComputePeer, ComputeUnit}; +use chain_types::{ + Commitment, CommitmentId, CommitmentStatus, ComputePeer, ComputeUnit, DealStatus, +}; use fluence_libp2p::PeerId; use particle_args::{Args, JError}; use particle_builtins::{wrap, CustomService}; use particle_execution::{ParticleParams, ServiceFunction}; use server_config::ChainConfig; +use types::DealId; use crate::error::{process_response, ConnectorError}; -use crate::function::{GetCommitmentFunction, GetStatusFunction, SubmitProofFunction}; +use crate::function::{GetCommitmentFunction, GetCommitmentStatusFunction, SubmitProofFunction}; use crate::ConnectorError::InvalidBaseFeePerGas; use crate::{ CurrentEpochFunction, DifficultyFunction, EpochDurationFunction, GetComputePeerFunction, - GetComputeUnitsFunction, GetGlobalNonceFunction, InitTimestampFunction, + GetComputeUnitsFunction, GetDealStatusFunction, GetGlobalNonceFunction, InitTimestampFunction, + ReturnComputeUnitFromDeal, }; const BASE_FEE_MULTIPLIER: f64 = 0.125; @@ -166,6 +170,7 @@ impl ChainConnector { pub async fn send_tx(&self, data: Vec, to: &str) -> Result { let base_fee_per_gas = self.get_base_fee_per_gas().await?; + tracing::info!(target: "chain-connector", "Estimating gas for tx from {} to {} data {}", self.config.wallet_key.to_address(), to, hex::encode(&data)); let gas_limit = self.estimate_gas_limit(&data, to).await?; let max_priority_fee_per_gas = self.max_priority_fee_per_gas().await?; @@ -236,7 +241,7 @@ impl ChainConnector { &self, commitment_id: CommitmentId, ) -> Result { - let data = GetStatusFunction::data(&[Token::FixedBytes(commitment_id.0)])?; + let data = GetCommitmentStatusFunction::data(&[Token::FixedBytes(commitment_id.0)])?; let resp: String = process_response( self.client .request( @@ -327,7 +332,7 @@ impl ChainConnector { .await, )?; let mut tokens = - parse_chain_data(&resp, &GetComputeUnitsFunction::signature())?.into_iter(); + parse_chain_data(&resp, &GetComputeUnitsFunction::result_signature())?.into_iter(); let units = next_opt(&mut tokens, "units", Token::into_array)?.into_iter(); let compute_units = units .map(ComputeUnit::from_token) @@ -391,6 +396,47 @@ impl ChainConnector { }) } + pub async fn get_deal_statuses<'a, I>( + &self, + deal_ids: I, + ) -> Result>, ConnectorError> + where + I: Iterator, + { + let mut batch = BatchRequestBuilder::new(); + for deal_id in deal_ids { + let data = GetDealStatusFunction::data(&[])?; + batch.insert( + "eth_call", + rpc_params![ + json!({ + "data": data, + "to": deal_id.to_address(), + }), + "latest" + ], + )?; + } + + let resp: BatchResponse = self.client.batch_request(batch).await?; + let mut statuses = vec![]; + + for status in resp.into_iter() { + let status = status + .map(|r| DealStatus::from(&r).map_err(ConnectorError::ParseChainDataFailed)) + .map_err(|e| ConnectorError::RpcError(e.to_owned().into()))?; + statuses.push(status); + } + + Ok(statuses) + } + + pub async fn exit_deal(&self, deal_id: &DealId) -> Result { + let data = ReturnComputeUnitFromDeal::data_bytes(&[Token::FixedBytes(deal_id.to_bytes())])?; + self.send_tx(data, &self.config.market_contract_address) + .await + } + fn difficulty_params(&self) -> eyre::Result { let data = DifficultyFunction::data(&[])?; Ok(rpc_params![ diff --git a/crates/chain-connector/src/function/current_epoch.rs b/crates/chain-connector/src/function/current_epoch.rs index 936ce91eed..1a441acb03 100644 --- a/crates/chain-connector/src/function/current_epoch.rs +++ b/crates/chain-connector/src/function/current_epoch.rs @@ -17,7 +17,7 @@ impl ChainFunction for CurrentEpochFunction { } } - fn signature() -> Vec { + fn result_signature() -> Vec { vec![ParamType::Uint(256)] } } diff --git a/crates/chain-connector/src/function/difficulty.rs b/crates/chain-connector/src/function/difficulty.rs index bfb364457d..4fc5efb306 100644 --- a/crates/chain-connector/src/function/difficulty.rs +++ b/crates/chain-connector/src/function/difficulty.rs @@ -15,7 +15,7 @@ impl ChainFunction for DifficultyFunction { } } - fn signature() -> Vec { + fn result_signature() -> Vec { vec![ParamType::FixedBytes(32)] } } diff --git a/crates/chain-connector/src/function/epoch_duration.rs b/crates/chain-connector/src/function/epoch_duration.rs index 5fb519c733..91ca14783a 100644 --- a/crates/chain-connector/src/function/epoch_duration.rs +++ b/crates/chain-connector/src/function/epoch_duration.rs @@ -18,7 +18,7 @@ impl ChainFunction for EpochDurationFunction { } } - fn signature() -> Vec { + fn result_signature() -> Vec { vec![ParamType::Uint(256)] } } diff --git a/crates/chain-connector/src/function/get_commitment.rs b/crates/chain-connector/src/function/get_commitment.rs index a2d1230330..3024f184b1 100644 --- a/crates/chain-connector/src/function/get_commitment.rs +++ b/crates/chain-connector/src/function/get_commitment.rs @@ -22,7 +22,7 @@ impl ChainFunction for GetCommitmentFunction { state_mutability: ethabi::StateMutability::View, } } - fn signature() -> Vec { + fn result_signature() -> Vec { Commitment::signature() } } diff --git a/crates/chain-connector/src/function/get_commitment_status.rs b/crates/chain-connector/src/function/get_commitment_status.rs index e4c791e953..ae9303f794 100644 --- a/crates/chain-connector/src/function/get_commitment_status.rs +++ b/crates/chain-connector/src/function/get_commitment_status.rs @@ -4,9 +4,9 @@ use chain_data::ChainFunction; /// @param commitmentId Commitment id /// @return status commitment status /// function getStatus(bytes32 commitmentId) external view returns (CCStatus); -pub struct GetStatusFunction; +pub struct GetCommitmentStatusFunction; -impl ChainFunction for GetStatusFunction { +impl ChainFunction for GetCommitmentStatusFunction { fn function() -> ethabi::Function { #[allow(deprecated)] let function = ethabi::Function { @@ -23,7 +23,7 @@ impl ChainFunction for GetStatusFunction { function } - fn signature() -> Vec { + fn result_signature() -> Vec { vec![ethabi::ParamType::FixedBytes(32)] } } diff --git a/crates/chain-connector/src/function/get_compute_peer.rs b/crates/chain-connector/src/function/get_compute_peer.rs index f29ebfc1a1..761c57991d 100644 --- a/crates/chain-connector/src/function/get_compute_peer.rs +++ b/crates/chain-connector/src/function/get_compute_peer.rs @@ -28,7 +28,7 @@ impl ChainFunction for GetComputePeerFunction { } } - fn signature() -> Vec { + fn result_signature() -> Vec { ComputePeer::signature() } } diff --git a/crates/chain-connector/src/function/get_compute_units.rs b/crates/chain-connector/src/function/get_compute_units.rs index 04d70c41fa..ce95fbf5cc 100644 --- a/crates/chain-connector/src/function/get_compute_units.rs +++ b/crates/chain-connector/src/function/get_compute_units.rs @@ -21,7 +21,7 @@ impl ChainFunction for GetComputeUnitsFunction { state_mutability: ethabi::StateMutability::View, } } - fn signature() -> Vec { + fn result_signature() -> Vec { vec![ParamType::Array(Box::new(ParamType::Tuple( ComputeUnit::signature(), )))] diff --git a/crates/chain-connector/src/function/get_deal_status.rs b/crates/chain-connector/src/function/get_deal_status.rs new file mode 100644 index 0000000000..7d0f5eb31a --- /dev/null +++ b/crates/chain-connector/src/function/get_deal_status.rs @@ -0,0 +1,23 @@ +use chain_data::ChainFunction; +use ethabi::ParamType; + +/// function getStatus() public view returns (Status) +pub struct GetDealStatusFunction; + +impl ChainFunction for GetDealStatusFunction { + fn function() -> ethabi::Function { + #[allow(deprecated)] + let function = ethabi::Function { + name: "getStatus".to_string(), + inputs: vec![], + outputs: vec![], + constant: None, + state_mutability: ethabi::StateMutability::View, + }; + function + } + + fn result_signature() -> Vec { + vec![ParamType::FixedBytes(32)] + } +} diff --git a/crates/chain-connector/src/function/global_nonce.rs b/crates/chain-connector/src/function/global_nonce.rs index 0f4932e111..240e4bfad1 100644 --- a/crates/chain-connector/src/function/global_nonce.rs +++ b/crates/chain-connector/src/function/global_nonce.rs @@ -17,7 +17,7 @@ impl ChainFunction for GetGlobalNonceFunction { } } - fn signature() -> Vec { + fn result_signature() -> Vec { vec![ParamType::FixedBytes(32)] } } diff --git a/crates/chain-connector/src/function/init_timestamp.rs b/crates/chain-connector/src/function/init_timestamp.rs index 0b757846d7..3dcd2d5642 100644 --- a/crates/chain-connector/src/function/init_timestamp.rs +++ b/crates/chain-connector/src/function/init_timestamp.rs @@ -20,7 +20,7 @@ impl ChainFunction for InitTimestampFunction { } } - fn signature() -> Vec { + fn result_signature() -> Vec { vec![ParamType::Uint(256)] } } diff --git a/crates/chain-connector/src/function/mod.rs b/crates/chain-connector/src/function/mod.rs index c116865021..0e3c6ddc48 100644 --- a/crates/chain-connector/src/function/mod.rs +++ b/crates/chain-connector/src/function/mod.rs @@ -5,17 +5,21 @@ mod get_commitment; mod get_commitment_status; mod get_compute_peer; mod get_compute_units; +mod get_deal_status; mod global_nonce; mod init_timestamp; +mod remove_cu_from_deal; mod submit_proof; pub use current_epoch::CurrentEpochFunction; pub use difficulty::DifficultyFunction; pub use epoch_duration::EpochDurationFunction; pub use get_commitment::GetCommitmentFunction; -pub use get_commitment_status::GetStatusFunction; +pub use get_commitment_status::GetCommitmentStatusFunction; pub use get_compute_peer::GetComputePeerFunction; pub use get_compute_units::GetComputeUnitsFunction; +pub use get_deal_status::GetDealStatusFunction; pub use global_nonce::GetGlobalNonceFunction; pub use init_timestamp::InitTimestampFunction; +pub use remove_cu_from_deal::ReturnComputeUnitFromDeal; pub use submit_proof::SubmitProofFunction; diff --git a/crates/chain-connector/src/function/remove_cu_from_deal.rs b/crates/chain-connector/src/function/remove_cu_from_deal.rs new file mode 100644 index 0000000000..4a974091c8 --- /dev/null +++ b/crates/chain-connector/src/function/remove_cu_from_deal.rs @@ -0,0 +1,27 @@ +use chain_data::ChainFunction; +use ethabi::{Function, Param, ParamType, StateMutability}; + +/// @dev Return the compute unit from a deal +/// function returnComputeUnitFromDeal(bytes32 unitId) external; +pub struct ReturnComputeUnitFromDeal; + +impl ChainFunction for ReturnComputeUnitFromDeal { + fn function() -> Function { + #[allow(deprecated)] + Function { + name: "returnComputeUnitFromDeal".to_string(), + inputs: vec![Param { + name: "unitId".to_string(), + kind: ParamType::FixedBytes(32), + internal_type: None, + }], + outputs: vec![], + constant: None, + state_mutability: StateMutability::NonPayable, + } + } + + fn result_signature() -> Vec { + vec![] + } +} diff --git a/crates/chain-connector/src/function/submit_proof.rs b/crates/chain-connector/src/function/submit_proof.rs index 4f547d744e..76f52f4b70 100644 --- a/crates/chain-connector/src/function/submit_proof.rs +++ b/crates/chain-connector/src/function/submit_proof.rs @@ -38,7 +38,7 @@ impl ChainFunction for SubmitProofFunction { function } - fn signature() -> Vec { + fn result_signature() -> Vec { vec![ ethabi::ParamType::FixedBytes(32), ethabi::ParamType::FixedBytes(32), diff --git a/crates/chain-data/src/function.rs b/crates/chain-data/src/function.rs index 2c4fbe906a..4581f5a4a3 100644 --- a/crates/chain-data/src/function.rs +++ b/crates/chain-data/src/function.rs @@ -4,7 +4,7 @@ use ethabi::Token; pub trait ChainFunction { fn function() -> ethabi::Function; - fn signature() -> Vec; + fn result_signature() -> Vec; fn data(inputs: &[Token]) -> Result { let function = Self::function(); @@ -18,16 +18,16 @@ pub trait ChainFunction { } fn decode_uint(data: &str) -> Result { - let mut tokens = crate::parse_chain_data(data, &Self::signature())?.into_iter(); + let mut tokens = crate::parse_chain_data(data, &Self::result_signature())?.into_iter(); next_opt(&mut tokens, "uint", Token::into_uint) } fn decode_fixed_bytes(data: &str) -> Result, ChainDataError> { - let mut tokens = crate::parse_chain_data(data, &Self::signature())?.into_iter(); + let mut tokens = crate::parse_chain_data(data, &Self::result_signature())?.into_iter(); next_opt(&mut tokens, "bytes", Token::into_fixed_bytes) } fn decode_tuple(data: &str) -> Result, ChainDataError> { - crate::parse_chain_data(data, &Self::signature()) + crate::parse_chain_data(data, &Self::result_signature()) } } diff --git a/crates/chain-listener/Cargo.toml b/crates/chain-listener/Cargo.toml index 0acc721ebe..079be7ffed 100644 --- a/crates/chain-listener/Cargo.toml +++ b/crates/chain-listener/Cargo.toml @@ -24,6 +24,7 @@ serde_json = { workspace = true } tokio = { workspace = true, features = ["rt"] } server-config = { workspace = true } types = { workspace = true } +libipld = "0.16.0" fluence-libp2p = { workspace = true } tempfile = { workspace = true } diff --git a/crates/chain-listener/src/event/deal_matched.rs b/crates/chain-listener/src/event/deal_matched.rs new file mode 100644 index 0000000000..8d7b45408f --- /dev/null +++ b/crates/chain-listener/src/event/deal_matched.rs @@ -0,0 +1,203 @@ +use ccp_shared::types::CUID; +use chain_data::ChainDataError::InvalidTokenSize; +use chain_data::EventField::{Indexed, NotIndexed}; +use chain_data::{next_opt, parse_peer_id, ChainData, ChainDataError, ChainEvent, EventField}; +use ethabi::ethereum_types::U256; +use ethabi::param_type::ParamType; +use ethabi::Token; +use types::DealId; + +/// Corresponding Solidity type: +/// ```solidity +/// struct CIDV1 { +/// bytes4 prefixes; +/// bytes32 hash; +/// } +/// +/// event ComputeUnitMatched( +/// bytes32 indexed peerId, +/// address deal +/// bytes32 unitId, +/// uint256 dealCreationBlock, +/// CIDV1 appCID +/// ); +/// ``` + +#[allow(dead_code)] +#[derive(Debug, Clone)] +pub struct DealMatchedData { + compute_peer: String, + pub deal_id: DealId, + unit_id: CUID, + deal_creation_block: U256, + app_cid: String, +} + +#[allow(dead_code)] +#[derive(Debug, Clone)] +pub struct DealMatched { + block_number: String, + pub info: DealMatchedData, +} + +impl DealMatched { + pub const EVENT_NAME: &'static str = "ComputeUnitMatched"; +} + +impl ChainData for DealMatchedData { + fn event_name() -> &'static str { + DealMatched::EVENT_NAME + } + + fn signature() -> Vec { + vec![ + // compute_provider + Indexed(ParamType::FixedBytes(32)), + // deal + NotIndexed(ParamType::Address), + // unit_id + NotIndexed(ParamType::FixedBytes(32)), + // deal_creation_block + NotIndexed(ParamType::Uint(256)), + // app_cid + NotIndexed(ParamType::Tuple(vec![ + // prefixes + ParamType::FixedBytes(4), + // hash + ParamType::FixedBytes(32), + ])), + ] + } + + /// Parse data from chain. Accepts data with and without "0x" prefix. + fn parse(data_tokens: &mut impl Iterator) -> Result { + let tokens = &mut data_tokens.into_iter(); + + let compute_peer = next_opt(tokens, "compute_peer", Token::into_fixed_bytes)?; + let compute_peer = parse_peer_id(compute_peer)?.to_string(); + + let deal = next_opt(tokens, "deal", Token::into_address)?; + let unit_id = next_opt(tokens, "unit_id", Token::into_fixed_bytes)?; + let deal_creation_block = next_opt(tokens, "deal_creation_block", Token::into_uint)?; + + let app_cid = &mut next_opt(tokens, "app_cid", Token::into_tuple)?.into_iter(); + let cid_prefixes = next_opt(app_cid, "app_cid.prefixes", Token::into_fixed_bytes)?; + let cid_hash = next_opt(app_cid, "app_cid.cid_hash", Token::into_fixed_bytes)?; + let cid_bytes = [cid_prefixes, cid_hash].concat(); + let app_cid = libipld::Cid::read_bytes(cid_bytes.as_slice()) + .map_err(|_| ChainDataError::InvalidParsedToken("app_cid"))? + .to_string(); + + Ok(DealMatchedData { + compute_peer, + deal_id: format!("{deal:#x}").into(), + unit_id: CUID::new(unit_id.try_into().map_err(|_| InvalidTokenSize)?), + deal_creation_block, + app_cid, + }) + } +} + +impl ChainEvent for DealMatched { + fn new(block_number: String, info: DealMatchedData) -> Self { + Self { block_number, info } + } +} + +#[cfg(test)] +mod tests { + use super::{DealMatched, DealMatchedData}; + use chain_data::{parse_log, parse_peer_id, ChainData, Log}; + use hex_utils::decode_hex; + + #[test] + fn topic() { + assert_eq!( + DealMatchedData::topic(), + String::from("0xb1c5a9179c3104a43de668491f14c45778f00ec34d5deee023af204820483bdb") + ); + } + + #[test] + fn peer_id() { + let bytes = [ + 88, 198, 255, 218, 126, 170, 188, 84, 84, 39, 255, 137, 18, 55, 7, 139, 121, 207, 149, + 42, 196, 115, 102, 160, 4, 47, 227, 62, 7, 53, 189, 15, + ]; + let peer_id = parse_peer_id(bytes.into()).expect("parse peer_id from Token"); + assert_eq!( + peer_id.to_string(), + String::from("12D3KooWFnv3Qc25eKpTDCNBoW1jXHMHHHSzcJoPkHai1b2dHNra") + ); + + let hex = "0x7a82a5feefcaad4a89c689412031e5f87c02b29e3fced583be5f05c7077354b7"; + let bytes = decode_hex(hex).expect("parse peer_id from hex"); + let peer_id = parse_peer_id(bytes).expect("parse peer_id from Token"); + assert_eq!( + peer_id.to_string(), + String::from("12D3KooWJ4bTHirdTFNZpCS72TAzwtdmavTBkkEXtzo6wHL25CtE") + ); + } + + #[test] + fn parse() { + let data1 = "0x000000000000000000000000ffa0611a099ab68ad7c3c67b4ca5bbbee7a58b9900000000000000000000000000000000000000000000000000000000000000a000000000000000000000000000000000000000000000000000000000000000500155122000000000000000000000000000000000000000000000000000000000ae5c519332925f31f747a4edd958fb5b0791b10383ec6d5e77e2264f211e09e300000000000000000000000000000000000000000000000000000000000000036c9d5e8bcc73a422dd6f968f13cd6fc92ccd5609b455cf2c7978cbc694297853fef3b95696986bf289166835e05f723f0fdea97d2bc5fea0ebbbf87b6a866cfa5a5a0f4fa4d41a4f976e799895cce944d5080041dba7d528d30e81c67973bac3".to_string(); + let data2 = "0x00000000000000000000000067b2ad3866429282e16e55b715d12a77f85b7ce800000000000000000000000000000000000000000000000000000000000000a000000000000000000000000000000000000000000000000000000000000000560155122000000000000000000000000000000000000000000000000000000000ae5c519332925f31f747a4edd958fb5b0791b10383ec6d5e77e2264f211e09e300000000000000000000000000000000000000000000000000000000000000036c9d5e8bcc73a422dd6f968f13cd6fc92ccd5609b455cf2c7978cbc694297853fef3b95696986bf289166835e05f723f0fdea97d2bc5fea0ebbbf87b6a866cfa5a5a0f4fa4d41a4f976e799895cce944d5080041dba7d528d30e81c67973bac3".to_string(); + let log1 = Log { + data: data1, + block_number: "0x0".to_string(), + removed: false, + topics: vec![ + DealMatchedData::topic(), + "0x7a82a5feefcaad4a89c689412031e5f87c02b29e3fced583be5f05c7077354b7".to_string(), + ], + }; + let log2 = Log { + data: data2, + block_number: "0x1".to_string(), + removed: false, + topics: vec![ + DealMatchedData::topic(), + "0x7a82a5feefcaad4a89c689412031e5f87c02b29e3fced583be5f05c7077354b7".to_string(), + ], + }; + + let m = + parse_log::(log1).expect("error parsing Match from log"); + assert_eq!(m.block_number, "0x0"); + let m = m.info; + assert_eq!( + m.compute_peer, + "12D3KooWJ4bTHirdTFNZpCS72TAzwtdmavTBkkEXtzo6wHL25CtE" + ); + assert_eq!(m.deal_id, "0xFfA0611a099AB68AD7C3C67B4cA5bbBEE7a58B99"); + assert_eq!( + m.unit_id.to_string(), + "00000000000000000000000000000000000000000000000000000000000000a0" + ); + assert_eq!(m.deal_creation_block, 80.into()); + assert_eq!( + m.app_cid, + "bafkreifolrizgmusl4y7or5e5xmvr623a6i3ca4d5rwv457cezhschqj4m" + ); + + let m = + parse_log::(log2).expect("error parsing Match from log"); + assert_eq!(m.block_number, "0x1"); + let m = m.info; + assert_eq!( + m.compute_peer, + "12D3KooWJ4bTHirdTFNZpCS72TAzwtdmavTBkkEXtzo6wHL25CtE" + ); + assert_eq!(m.deal_id, "0x67b2AD3866429282e16e55B715d12A77F85B7CE8"); + assert_eq!( + m.unit_id.to_string(), + "00000000000000000000000000000000000000000000000000000000000000a0" + ); + assert_eq!(m.deal_creation_block, 86.into()); + assert_eq!( + m.app_cid, + "bafkreifolrizgmusl4y7or5e5xmvr623a6i3ca4d5rwv457cezhschqj4m" + ); + } +} diff --git a/crates/chain-listener/src/event/mod.rs b/crates/chain-listener/src/event/mod.rs index 9868c2de52..150b9a284b 100644 --- a/crates/chain-listener/src/event/mod.rs +++ b/crates/chain-listener/src/event/mod.rs @@ -1,7 +1,9 @@ pub mod cc_activated; +mod deal_matched; mod unit_activated; mod unit_deactivated; pub use cc_activated::CommitmentActivatedData; +pub use deal_matched::{DealMatched, DealMatchedData}; pub use unit_activated::{UnitActivated, UnitActivatedData}; pub use unit_deactivated::{UnitDeactivated, UnitDeactivatedData}; diff --git a/crates/chain-listener/src/event/unit_activated.rs b/crates/chain-listener/src/event/unit_activated.rs index ecda707696..b77ba0adaa 100644 --- a/crates/chain-listener/src/event/unit_activated.rs +++ b/crates/chain-listener/src/event/unit_activated.rs @@ -1,7 +1,7 @@ use chain_data::ChainDataError::InvalidTokenSize; use chain_data::EventField::{Indexed, NotIndexed}; use chain_data::{next_opt, ChainData, ChainDataError, ChainEvent, EventField}; -use chain_types::{CommitmentId, ComputeUnit}; +use chain_types::{CommitmentId, PendingUnit}; use core_manager::CUID; use ethabi::ethereum_types::U256; use ethabi::param_type::ParamType; @@ -73,11 +73,10 @@ impl ChainEvent for UnitActivated { } } -impl From for ComputeUnit { +impl From for PendingUnit { fn from(data: UnitActivatedData) -> Self { - ComputeUnit { + PendingUnit { id: data.unit_id, - deal: None, start_epoch: data.start_epoch, } } diff --git a/crates/chain-listener/src/lib.rs b/crates/chain-listener/src/lib.rs index 6425aa20db..ad1ff206a9 100644 --- a/crates/chain-listener/src/lib.rs +++ b/crates/chain-listener/src/lib.rs @@ -1,7 +1,7 @@ #![feature(assert_matches)] #![feature(try_blocks)] #![feature(extract_if)] -#![feature(hash_extract_if)] +#![feature(btree_extract_if)] pub use listener::ChainListener; diff --git a/crates/chain-listener/src/listener.rs b/crates/chain-listener/src/listener.rs index 76211874cd..47954588a8 100644 --- a/crates/chain-listener/src/listener.rs +++ b/crates/chain-listener/src/listener.rs @@ -1,5 +1,5 @@ use backoff::Error::Permanent; -use std::collections::{HashMap, HashSet}; +use std::collections::{BTreeSet, HashMap}; use std::path::PathBuf; use std::process::exit; use std::sync::Arc; @@ -27,17 +27,19 @@ use tokio_stream::StreamExt; use chain_connector::{CCInitParams, ChainConnector, ConnectorError}; use chain_data::{parse_log, peer_id_to_hex, ChainData, Log}; use chain_types::{ - CommitmentId, CommitmentStatus, ComputeUnit, COMMITMENT_IS_NOT_ACTIVE, PEER_NOT_EXISTS, - TOO_MANY_PROOFS, + CommitmentId, CommitmentStatus, ComputeUnit, DealStatus, PendingUnit, COMMITMENT_IS_NOT_ACTIVE, + PEER_NOT_EXISTS, TOO_MANY_PROOFS, }; use core_manager::manager::{CoreManager, CoreManagerFunctions}; use core_manager::types::{AcquireRequest, WorkType}; use core_manager::CUID; use server_config::{ChainConfig, ChainListenerConfig}; +use types::DealId; use crate::event::cc_activated::CommitmentActivated; use crate::event::{ - CommitmentActivatedData, UnitActivated, UnitActivatedData, UnitDeactivated, UnitDeactivatedData, + CommitmentActivatedData, DealMatched, DealMatchedData, UnitActivated, UnitActivatedData, + UnitDeactivated, UnitDeactivatedData, }; use crate::persistence; @@ -63,8 +65,10 @@ pub struct ChainListener { // proof_counter: HashMap, current_commitment: Option, - active_compute_units: HashSet, - pending_compute_units: HashSet, + active_compute_units: BTreeSet, + pending_compute_units: BTreeSet, + + active_deals: BTreeSet, /// Resets every epoch last_submitted_proof_id: ProofIdx, @@ -74,6 +78,7 @@ pub struct ChainListener { unit_deactivated: Option>, heads: Option>, commitment_activated: Option>, + unit_matched: Option>, } async fn poll_subscription(s: &mut Option>) -> Option> @@ -113,8 +118,8 @@ impl ChainListener { current_epoch: init_params.current_epoch, epoch_duration: init_params.epoch_duration, current_commitment: None, - active_compute_units: HashSet::new(), - pending_compute_units: HashSet::new(), + active_compute_units: BTreeSet::new(), + pending_compute_units: BTreeSet::new(), core_manager, timer_resolution: listener_config.proof_poll_period, ccp_client, @@ -124,6 +129,8 @@ impl ChainListener { unit_deactivated: None, heads: None, commitment_activated: None, + unit_matched: None, + active_deals: BTreeSet::new(), } } @@ -266,6 +273,8 @@ impl ChainListener { })?; Ok(()) }).await?; + + tracing::info!("Utility core {utility_core} successfully reallocated"); } Ok(()) } @@ -289,6 +298,11 @@ impl ChainListener { tracing::error!(target: "chain-listener", "Failed to subscribe to CommitmentActivated event: {err}; Stopping..."); exit(1); } + + if let Err(err) = self.subscribe_deal_matched().await { + tracing::error!(target: "chain-listener", "Failed to subscribe to deal matched: {err}; Stopping..."); + exit(1); + } tracing::info!(target: "chain-listener", "Subscribed successfully"); let setup: eyre::Result<()> = try { @@ -340,6 +354,15 @@ impl ChainListener { } } }, + Some(event) = poll_subscription(&mut self.unit_matched) => { + if let Err(err) = self.process_deal_matched(event) { + tracing::error!(target: "chain-listener", "DealMatched event processing error: {err}"); + if let Err(err) = self.subscribe_deal_matched().await { + tracing::error!(target: "chain-listener", "Failed to resubscribe to DealMatched: {err}; Stopping..."); + exit(1); + } + } + }, _ = timer.next() => { if self.ccp_client.is_some() { if let Err(err) = self.poll_proofs().await { @@ -350,6 +373,10 @@ impl ChainListener { tracing::error!(target: "chain-listener", "Failed to submit mocked proofs: {err}"); } } + + if let Err(err) = self.poll_deal_statuses().await { + tracing::error!(target: "chain-listener", "Failed to poll deal statuses: {err}"); + } } } } @@ -371,17 +398,21 @@ impl ChainListener { } } - /// Returns active and pending compute units - async fn get_compute_units(&self) -> eyre::Result<(Vec, Vec)> { + /// Returns active and pending compute units + /// + async fn get_compute_units(&mut self) -> eyre::Result<(Vec, Vec)> { let mut units = self.chain_connector.get_compute_units().await?; let in_deal: Vec<_> = units.extract_if(|cu| cu.deal.is_some()).collect(); + self.active_deals = in_deal.iter().map(|cu| cu.deal.clone()).flatten().collect(); let (active, pending): (Vec, Vec) = units .into_iter() + .filter(|unit| unit.deal.is_none()) .partition(|unit| unit.start_epoch <= self.current_epoch); let active: Vec<_> = active.into_iter().map(|unit| unit.id).collect(); + let pending: Vec = pending.into_iter().map(PendingUnit::from).collect(); tracing::info!(target: "chain-listener", "Compute units mapping: active {}, pending {}, in deal {}", @@ -503,6 +534,31 @@ impl ChainListener { Ok(()) } + async fn subscribe_deal_matched(&mut self) -> eyre::Result<()> { + let sub = retry(ExponentialBackoff::default(), || async { + let topics = vec![ + DealMatchedData::topic(), + peer_id_to_hex(self.host_id), + ]; + let params = rpc_params![ + "logs", + json!({"address": self.config.market_contract_address, "topics": topics}) + ]; + let subs = self + .ws_client + .subscribe("eth_subscribe", params, "eth_unsubscribe") + .await.map_err(|err| { + tracing::error!(target: "chain-listener", "Failed to subscribe to deal matched: {err}; Retrying..."); + eyre!(err) + })?; + + Ok(subs) + }).await?; + + self.unit_matched = Some(sub); + Ok(()) + } + async fn process_new_header( &mut self, header: Result, @@ -579,7 +635,7 @@ impl ChainListener { } else { self.pending_compute_units = unit_ids .into_iter() - .map(|id| ComputeUnit::new(id, cc_event.info.start_epoch)) + .map(|id| PendingUnit::new(id, cc_event.info.start_epoch)) .collect(); self.stop_commitment().await?; } @@ -627,6 +683,17 @@ impl ChainListener { Ok(()) } + pub fn process_deal_matched(&mut self, event: Result) -> eyre::Result<()> { + let deal_event = parse_log::(event?)?; + tracing::info!(target: "chain-listener", + "Received DealMatched event for deal: {}", + deal_event.info.deal_id + ); + + self.active_deals.insert(deal_event.info.deal_id); + Ok(()) + } + /// Send GlobalNonce, Difficulty and Core<>CUID mapping (full commitment info) to CCP async fn refresh_commitment(&self) -> eyre::Result<()> { if self.active_compute_units.is_empty() { @@ -812,7 +879,7 @@ impl ChainListener { self.active_compute_units.remove(&proof.cu_id); self.pending_compute_units - .insert(ComputeUnit::new(proof.cu_id, self.current_epoch + 1)); + .insert(PendingUnit::new(proof.cu_id, self.current_epoch + 1)); self.refresh_commitment().await?; Ok(()) @@ -858,4 +925,51 @@ impl ChainListener { Ok(U256::from_str_radix(×tamp, 16)?) } + async fn poll_deal_statuses(&mut self) -> eyre::Result<()> { + let statuses = retry(ExponentialBackoff::default(), || async { + let s = self.chain_connector.get_deal_statuses(self.active_deals.iter()).await.map_err(|err| { + tracing::error!(target: "chain-listener", "Failed to poll deal statuses: {err}"); + eyre!("Failed to poll deal statuses: {err}") + })?; + + Ok(s) + }) + .await?; + + for (status, deal_id) in statuses + .into_iter() + .zip(self.active_deals.clone().into_iter()) + { + match status { + Ok(status) => match status { + DealStatus::InsufficientFunds | DealStatus::Ended => { + tracing::info!(target: "chain-listener", "Deal {deal_id} status: {status}"); + self.exit_deal(deal_id).await?; + } + DealStatus::Active + | DealStatus::NotEnoughWorkers + | DealStatus::SmallBalance => {} + }, + Err(err) => { + tracing::error!(target: "chain-listener", "Failed to get deal status for {deal_id}: {err}"); + } + } + } + + Ok(()) + } + async fn exit_deal(&mut self, deal_id: DealId) -> eyre::Result<()> { + tracing::info!(target: "chain-listener", "Exiting deal: {deal_id}"); + retry(ExponentialBackoff::default(), || async { + self.chain_connector.exit_deal(&deal_id).await.map_err(|err| { + tracing::error!(target: "chain-listener", "Failed to exit deal {deal_id}: {err}"); + eyre!("Failed to exit deal {deal_id}: {err}") + })?; + Ok(()) + }) + .await?; + + self.active_deals.remove(&deal_id); + Ok(()) + } } diff --git a/crates/chain-types/Cargo.toml b/crates/chain-types/Cargo.toml index 4daf6d3b8f..28490bd5d6 100644 --- a/crates/chain-types/Cargo.toml +++ b/crates/chain-types/Cargo.toml @@ -12,6 +12,7 @@ eyre = { workspace = true } chain-data = { workspace = true } hex = { workspace = true } ccp-shared = { workspace = true } +types = { workspace = true } [dev-dependencies] tokio = { workspace = true, features = ["rt", "macros"] } diff --git a/crates/chain-types/src/compute_unit.rs b/crates/chain-types/src/compute_unit.rs index 53f7427d49..663481cd21 100644 --- a/crates/chain-types/src/compute_unit.rs +++ b/crates/chain-types/src/compute_unit.rs @@ -3,6 +3,7 @@ use chain_data::ChainDataError::InvalidParsedToken; use chain_data::{next_opt, parse_chain_data, ChainDataError}; use ethabi::ethereum_types::U256; use ethabi::Token; +use types::DealId; /// struct ComputeUnitView { /// bytes32 id; @@ -13,7 +14,7 @@ use ethabi::Token; pub struct ComputeUnit { pub id: CUID, /// if deal is zero-address, it means the unit is not assigned to any deal - pub deal: Option, + pub deal: Option, pub start_epoch: U256, } @@ -33,7 +34,7 @@ impl ComputeUnit { ] } - pub fn from(data: &str) -> Result { + pub fn parse(data: &str) -> Result { let mut tokens = parse_chain_data(data, &Self::signature())?.into_iter(); Self::from_tokens(&mut tokens) } @@ -58,7 +59,7 @@ impl ComputeUnit { let deal = if deal.is_zero() { None } else { - Some(format!("{deal:#x}")) + Some(format!("{deal:#x}").into()) }; let start_epoch = next_opt(data_tokens, "start_epoch", Token::into_uint)?; @@ -74,6 +75,26 @@ impl ComputeUnit { } } +#[derive(Debug, PartialEq, Eq, Hash, Ord, PartialOrd)] +pub struct PendingUnit { + pub id: CUID, + pub start_epoch: U256, +} + +impl PendingUnit { + pub fn new(id: CUID, start_epoch: U256) -> Self { + Self { id, start_epoch } + } +} +impl From for PendingUnit { + fn from(unit: ComputeUnit) -> Self { + Self { + id: unit.id, + start_epoch: unit.start_epoch, + } + } +} + #[cfg(test)] mod tests { use ccp_shared::types::CUID; @@ -82,7 +103,7 @@ mod tests { #[tokio::test] async fn decode_compute_unit() { let data = "aa3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d50000000000000000000000005e3d0fde6f793b3115a9e7f5ebc195bbeed35d6c00000000000000000000000000000000000000000000000000000000000003e8"; - let compute_unit = super::ComputeUnit::from(data); + let compute_unit = super::ComputeUnit::parse(data); assert!(compute_unit.is_ok()); let compute_unit = compute_unit.unwrap(); @@ -102,7 +123,7 @@ mod tests { #[tokio::test] async fn decode_compute_unit_no_deal() { let data = "aa3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d5000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003e8"; - let compute_unit = super::ComputeUnit::from(data); + let compute_unit = super::ComputeUnit::parse(data); assert!(compute_unit.is_ok()); let compute_unit = compute_unit.unwrap(); assert_eq!( diff --git a/crates/chain-types/src/deal_status.rs b/crates/chain-types/src/deal_status.rs new file mode 100644 index 0000000000..ce9d349199 --- /dev/null +++ b/crates/chain-types/src/deal_status.rs @@ -0,0 +1,57 @@ +use std::fmt::Display; + +use chain_data::{next_opt, parse_chain_data, ChainDataError}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum DealStatus { + // Deal does have enough funds to pay for the workers + InsufficientFunds = 0, + Active, + // Deal is stopped + Ended, + // Deal has a balance and waiting for workers + NotEnoughWorkers, + // Deal has balance less than the minimal balance. Min balance: 2 * targetWorkers * pricePerWorkerEpoch + SmallBalance, +} + +impl DealStatus { + pub fn signature() -> Vec { + vec![ethabi::ParamType::Uint(8)] + } + pub fn from_num(num: u8) -> Option { + match num { + 0 => Some(DealStatus::InsufficientFunds), + 1 => Some(DealStatus::Active), + 2 => Some(DealStatus::Ended), + 3 => Some(DealStatus::NotEnoughWorkers), + 4 => Some(DealStatus::SmallBalance), + _ => None, + } + } + + pub fn from_token(token: ethabi::Token) -> Option { + token + .into_uint() + .and_then(|u| Self::from_num(u.as_u64() as u8)) + } + + pub fn from(data: &str) -> Result { + let mut tokens = parse_chain_data(data, &Self::signature())?.into_iter(); + next_opt(&mut tokens, "deal_status", Self::from_token) + } +} + +impl Display for DealStatus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let str = match self { + DealStatus::Active => "Active", + DealStatus::Ended => "Ended", + DealStatus::NotEnoughWorkers => "NotEnoughWorkers", + DealStatus::SmallBalance => "SmallBalance", + DealStatus::InsufficientFunds => "InsufficientFunds", + } + .to_string(); + write!(f, "{}", str) + } +} diff --git a/crates/chain-types/src/lib.rs b/crates/chain-types/src/lib.rs index 76fc647311..a4b958ffe7 100644 --- a/crates/chain-types/src/lib.rs +++ b/crates/chain-types/src/lib.rs @@ -2,12 +2,14 @@ mod commitment; mod commitment_status; mod compute_peer; mod compute_unit; +mod deal_status; mod errors; mod types; pub use commitment::Commitment; pub use commitment_status::CommitmentStatus; pub use compute_peer::ComputePeer; -pub use compute_unit::ComputeUnit; +pub use compute_unit::{ComputeUnit, PendingUnit}; +pub use deal_status::DealStatus; pub use errors::*; pub use types::*; diff --git a/crates/types/Cargo.toml b/crates/types/Cargo.toml index 90b3472fb5..42278c3c84 100644 --- a/crates/types/Cargo.toml +++ b/crates/types/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" [dependencies] libp2p-identity = { workspace = true, features = ["peerid", "ed25519", "rand"] } serde = { workspace = true, features = ["derive"] } - +hex = { workspace = true } [dev-dependencies] serde_json = { workspace = true } diff --git a/crates/types/src/deal_id.rs b/crates/types/src/deal_id.rs index 308532e476..c391488437 100644 --- a/crates/types/src/deal_id.rs +++ b/crates/types/src/deal_id.rs @@ -2,7 +2,7 @@ use serde::{Deserialize, Deserializer, Serialize, Serializer}; use std::borrow::{Borrow, Cow}; use std::fmt::Display; -#[derive(Eq, Clone, Debug, Hash, PartialEq)] +#[derive(Eq, Clone, Debug, Hash, PartialEq, PartialOrd, Ord)] pub struct DealId(String); impl DealId { @@ -13,6 +13,13 @@ impl DealId { pub fn get_contract_address(&self) -> String { format!("0x{}", self.0) } + + pub fn to_address(&self) -> String { + format!("0x{}", self.0) + } + pub fn to_bytes(&self) -> Vec { + hex::decode(&self.0).unwrap() + } } impl<'de> Deserialize<'de> for DealId { From 86bd79bb728103bfe47dca61e4e1180596c2c801 Mon Sep 17 00:00:00 2001 From: Alexey Proshutinskiy Date: Sat, 9 Mar 2024 01:44:31 +0200 Subject: [PATCH 02/13] update --- Cargo.lock | 6 +++--- Cargo.toml | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 60fc481b4f..cc83048a73 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1022,7 +1022,7 @@ dependencies = [ [[package]] name = "ccp-rpc-client" version = "0.4.0" -source = "git+https://github.com/fluencelabs/capacity-commitment-prover/?branch=type_add_ord#5d90da1e32dc165a6fe663032fd53020f948424c" +source = "git+https://github.com/fluencelabs/capacity-commitment-prover/?branch=main#2beb3b065c58e0e792a742e3378a685b275b53e9" dependencies = [ "ccp-shared", "hex", @@ -1033,7 +1033,7 @@ dependencies = [ [[package]] name = "ccp-shared" version = "0.4.0" -source = "git+https://github.com/fluencelabs/capacity-commitment-prover/?branch=type_add_ord#5d90da1e32dc165a6fe663032fd53020f948424c" +source = "git+https://github.com/fluencelabs/capacity-commitment-prover/?branch=main#2beb3b065c58e0e792a742e3378a685b275b53e9" dependencies = [ "hex", "newtype_derive", @@ -1522,7 +1522,7 @@ dependencies = [ [[package]] name = "cpu-utils" version = "0.4.0" -source = "git+https://github.com/fluencelabs/capacity-commitment-prover/?branch=type_add_ord#5d90da1e32dc165a6fe663032fd53020f948424c" +source = "git+https://github.com/fluencelabs/capacity-commitment-prover/?branch=main#2beb3b065c58e0e792a742e3378a685b275b53e9" dependencies = [ "ccp-shared", "core_affinity", diff --git a/Cargo.toml b/Cargo.toml index a80724c987..7150356301 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -173,9 +173,9 @@ enum_dispatch = "0.3.12" serde_with = "3.6.0" mockito = "1.2.0" clarity = "1.3.0" -cpu-utils = { git = "https://github.com/fluencelabs/capacity-commitment-prover/", branch = "type_add_ord" } -ccp-shared = { git = "https://github.com/fluencelabs/capacity-commitment-prover/", branch = "type_add_ord" } -ccp-rpc-client = { git = "https://github.com/fluencelabs/capacity-commitment-prover.git", branch = "type_add_ord" } +cpu-utils = { git = "https://github.com/fluencelabs/capacity-commitment-prover/", branch = "main" } +ccp-shared = { git = "https://github.com/fluencelabs/capacity-commitment-prover/", branch = "main" } +ccp-rpc-client = { git = "https://github.com/fluencelabs/capacity-commitment-prover.git", branch = "main" } [profile.dev] From 87961646f22156ffdcaa5df349f312b55c054790 Mon Sep 17 00:00:00 2001 From: Alexey Proshutinskiy Date: Sat, 9 Mar 2024 01:46:35 +0200 Subject: [PATCH 03/13] fix --- crates/chain-listener/src/listener.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/chain-listener/src/listener.rs b/crates/chain-listener/src/listener.rs index 47954588a8..5849e3937a 100644 --- a/crates/chain-listener/src/listener.rs +++ b/crates/chain-listener/src/listener.rs @@ -399,7 +399,6 @@ impl ChainListener { } /// Returns active and pending compute units - /// async fn get_compute_units(&mut self) -> eyre::Result<(Vec, Vec)> { let mut units = self.chain_connector.get_compute_units().await?; From d990a937db84ccd412a26e1f8140a3735ad89fdf Mon Sep 17 00:00:00 2001 From: Alexey Proshutinskiy Date: Sat, 9 Mar 2024 02:11:34 +0200 Subject: [PATCH 04/13] fix --- crates/chain-listener/src/listener.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/crates/chain-listener/src/listener.rs b/crates/chain-listener/src/listener.rs index 5849e3937a..4810a76a8d 100644 --- a/crates/chain-listener/src/listener.rs +++ b/crates/chain-listener/src/listener.rs @@ -925,6 +925,10 @@ impl ChainListener { Ok(U256::from_str_radix(×tamp, 16)?) } async fn poll_deal_statuses(&mut self) -> eyre::Result<()> { + if self.active_deals.is_empty() { + return Ok(()); + } + let statuses = retry(ExponentialBackoff::default(), || async { let s = self.chain_connector.get_deal_statuses(self.active_deals.iter()).await.map_err(|err| { tracing::error!(target: "chain-listener", "Failed to poll deal statuses: {err}"); From a3d970069711f71d40d584017a44f62b4664016d Mon Sep 17 00:00:00 2001 From: Alexey Proshutinskiy Date: Sat, 9 Mar 2024 02:13:39 +0200 Subject: [PATCH 05/13] fix --- crates/chain-listener/src/listener.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/crates/chain-listener/src/listener.rs b/crates/chain-listener/src/listener.rs index 4810a76a8d..ef183fb700 100644 --- a/crates/chain-listener/src/listener.rs +++ b/crates/chain-listener/src/listener.rs @@ -932,7 +932,7 @@ impl ChainListener { let statuses = retry(ExponentialBackoff::default(), || async { let s = self.chain_connector.get_deal_statuses(self.active_deals.iter()).await.map_err(|err| { tracing::error!(target: "chain-listener", "Failed to poll deal statuses: {err}"); - eyre!("Failed to poll deal statuses: {err}") + eyre!("Failed to poll deal statuses: {err}; Retrying...") })?; Ok(s) @@ -963,10 +963,13 @@ impl ChainListener { } async fn exit_deal(&mut self, deal_id: DealId) -> eyre::Result<()> { tracing::info!(target: "chain-listener", "Exiting deal: {deal_id}"); - retry(ExponentialBackoff::default(), || async { + let mut backoff = ExponentialBackoff::default(); + backoff.max_elapsed_time = Some(Duration::from_secs(3)); + + retry(backoff, || async { self.chain_connector.exit_deal(&deal_id).await.map_err(|err| { tracing::error!(target: "chain-listener", "Failed to exit deal {deal_id}: {err}"); - eyre!("Failed to exit deal {deal_id}: {err}") + eyre!("Failed to exit deal {deal_id}: {err}; Retrying...") })?; Ok(()) }) From 643d017856e6b42bd50c4f9cccc7839f7be1fe4b Mon Sep 17 00:00:00 2001 From: Alexey Proshutinskiy Date: Sat, 9 Mar 2024 02:55:03 +0200 Subject: [PATCH 06/13] fix --- crates/chain-connector/src/connector.rs | 7 +++-- .../chain-listener/src/event/deal_matched.rs | 2 +- crates/chain-listener/src/listener.rs | 30 +++++++++++-------- 3 files changed, 22 insertions(+), 17 deletions(-) diff --git a/crates/chain-connector/src/connector.rs b/crates/chain-connector/src/connector.rs index 61330a4279..3857521ee8 100644 --- a/crates/chain-connector/src/connector.rs +++ b/crates/chain-connector/src/connector.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use std::sync::Arc; use ccp_shared::proof::CCProof; -use ccp_shared::types::{Difficulty, GlobalNonce}; +use ccp_shared::types::{Difficulty, GlobalNonce, CUID}; use clarity::Transaction; use ethabi::ethereum_types::U256; use ethabi::Token; @@ -431,8 +431,9 @@ impl ChainConnector { Ok(statuses) } - pub async fn exit_deal(&self, deal_id: &DealId) -> Result { - let data = ReturnComputeUnitFromDeal::data_bytes(&[Token::FixedBytes(deal_id.to_bytes())])?; + pub async fn exit_deal(&self, cu_id: &CUID) -> Result { + let data = + ReturnComputeUnitFromDeal::data_bytes(&[Token::FixedBytes(cu_id.as_ref().to_vec())])?; self.send_tx(data, &self.config.market_contract_address) .await } diff --git a/crates/chain-listener/src/event/deal_matched.rs b/crates/chain-listener/src/event/deal_matched.rs index 8d7b45408f..ea0898acf3 100644 --- a/crates/chain-listener/src/event/deal_matched.rs +++ b/crates/chain-listener/src/event/deal_matched.rs @@ -28,7 +28,7 @@ use types::DealId; pub struct DealMatchedData { compute_peer: String, pub deal_id: DealId, - unit_id: CUID, + pub(crate) unit_id: CUID, deal_creation_block: U256, app_cid: String, } diff --git a/crates/chain-listener/src/listener.rs b/crates/chain-listener/src/listener.rs index ef183fb700..ff0ca869df 100644 --- a/crates/chain-listener/src/listener.rs +++ b/crates/chain-listener/src/listener.rs @@ -1,5 +1,5 @@ use backoff::Error::Permanent; -use std::collections::{BTreeSet, HashMap}; +use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::path::PathBuf; use std::process::exit; use std::sync::Arc; @@ -68,7 +68,7 @@ pub struct ChainListener { active_compute_units: BTreeSet, pending_compute_units: BTreeSet, - active_deals: BTreeSet, + active_deals: BTreeMap, /// Resets every epoch last_submitted_proof_id: ProofIdx, @@ -130,7 +130,7 @@ impl ChainListener { heads: None, commitment_activated: None, unit_matched: None, - active_deals: BTreeSet::new(), + active_deals: BTreeMap::new(), } } @@ -403,11 +403,9 @@ impl ChainListener { let mut units = self.chain_connector.get_compute_units().await?; let in_deal: Vec<_> = units.extract_if(|cu| cu.deal.is_some()).collect(); - self.active_deals = in_deal.iter().map(|cu| cu.deal.clone()).flatten().collect(); let (active, pending): (Vec, Vec) = units .into_iter() - .filter(|unit| unit.deal.is_none()) .partition(|unit| unit.start_epoch <= self.current_epoch); let active: Vec<_> = active.into_iter().map(|unit| unit.id).collect(); @@ -440,6 +438,12 @@ impl ChainListener { .collect::>() ); + for cu in in_deal { + if let Some(deal) = cu.deal { + self.active_deals.insert(deal, cu.id); + } + } + Ok((active, pending)) } @@ -689,7 +693,8 @@ impl ChainListener { deal_event.info.deal_id ); - self.active_deals.insert(deal_event.info.deal_id); + self.active_deals + .insert(deal_event.info.deal_id, deal_event.info.unit_id); Ok(()) } @@ -930,7 +935,7 @@ impl ChainListener { } let statuses = retry(ExponentialBackoff::default(), || async { - let s = self.chain_connector.get_deal_statuses(self.active_deals.iter()).await.map_err(|err| { + let s = self.chain_connector.get_deal_statuses(self.active_deals.keys()).await.map_err(|err| { tracing::error!(target: "chain-listener", "Failed to poll deal statuses: {err}"); eyre!("Failed to poll deal statuses: {err}; Retrying...") })?; @@ -939,15 +944,15 @@ impl ChainListener { }) .await?; - for (status, deal_id) in statuses + for (status, (deal_id, cu_id)) in statuses .into_iter() .zip(self.active_deals.clone().into_iter()) { match status { Ok(status) => match status { DealStatus::InsufficientFunds | DealStatus::Ended => { - tracing::info!(target: "chain-listener", "Deal {deal_id} status: {status}"); - self.exit_deal(deal_id).await?; + tracing::info!(target: "chain-listener", "Deal {deal_id} status: {status}; Exiting..."); + self.exit_deal(deal_id, cu_id).await?; } DealStatus::Active | DealStatus::NotEnoughWorkers @@ -961,13 +966,12 @@ impl ChainListener { Ok(()) } - async fn exit_deal(&mut self, deal_id: DealId) -> eyre::Result<()> { - tracing::info!(target: "chain-listener", "Exiting deal: {deal_id}"); + async fn exit_deal(&mut self, deal_id: DealId, cu_id: CUID) -> eyre::Result<()> { let mut backoff = ExponentialBackoff::default(); backoff.max_elapsed_time = Some(Duration::from_secs(3)); retry(backoff, || async { - self.chain_connector.exit_deal(&deal_id).await.map_err(|err| { + self.chain_connector.exit_deal(&cu_id).await.map_err(|err| { tracing::error!(target: "chain-listener", "Failed to exit deal {deal_id}: {err}"); eyre!("Failed to exit deal {deal_id}: {err}; Retrying...") })?; From d7314e6242e3382ff9eae26841851ce029528cb3 Mon Sep 17 00:00:00 2001 From: Alexey Proshutinskiy Date: Sat, 9 Mar 2024 03:16:49 +0200 Subject: [PATCH 07/13] subscribe to unit events on start --- crates/chain-listener/src/listener.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/crates/chain-listener/src/listener.rs b/crates/chain-listener/src/listener.rs index ff0ca869df..2d8f7d51bc 100644 --- a/crates/chain-listener/src/listener.rs +++ b/crates/chain-listener/src/listener.rs @@ -163,6 +163,10 @@ impl ChainListener { if let Some(ref c) = self.current_commitment { tracing::info!(target: "chain-listener", "Current commitment id: {}", c); + tracing::info!(target: "chain-listener", "Subscribing to unit events"); + self.subscribe_unit_deactivated().await?; + self.subscribe_unit_activated().await?; + tracing::info!(target: "chain-listener", "Successfully subscribed to unit events"); } else { tracing::info!(target: "chain-listener", "Compute peer has no commitment"); self.stop_commitment().await? @@ -288,7 +292,7 @@ impl ChainListener { exit(1); } - tracing::info!(target: "chain-listener", "Subscribing to newHeads and cc events"); + tracing::info!(target: "chain-listener", "Subscribing to chain events"); if let Err(err) = self.subscribe_new_heads().await { tracing::error!(target: "chain-listener", "Failed to subscribe to newHeads: {err}; Stopping..."); exit(1); @@ -303,6 +307,7 @@ impl ChainListener { tracing::error!(target: "chain-listener", "Failed to subscribe to deal matched: {err}; Stopping..."); exit(1); } + tracing::info!(target: "chain-listener", "Subscribed successfully"); let setup: eyre::Result<()> = try { From 2501e62ea88c27a4ba44f2a450367d9c8e76a713 Mon Sep 17 00:00:00 2001 From: Alexey Proshutinskiy Date: Sat, 9 Mar 2024 16:00:00 +0200 Subject: [PATCH 08/13] don't send exit tx for ended deals --- crates/chain-listener/src/listener.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/crates/chain-listener/src/listener.rs b/crates/chain-listener/src/listener.rs index 2d8f7d51bc..7310dd5102 100644 --- a/crates/chain-listener/src/listener.rs +++ b/crates/chain-listener/src/listener.rs @@ -955,9 +955,13 @@ impl ChainListener { { match status { Ok(status) => match status { - DealStatus::InsufficientFunds | DealStatus::Ended => { + DealStatus::InsufficientFunds => { tracing::info!(target: "chain-listener", "Deal {deal_id} status: {status}; Exiting..."); - self.exit_deal(deal_id, cu_id).await?; + self.exit_deal(&deal_id, cu_id).await?; + tracing::info!(target: "chain-listener", "Exited deal {deal_id} successfully"); + } + DealStatus::Ended => { + self.active_deals.remove(&deal_id); } DealStatus::Active | DealStatus::NotEnoughWorkers @@ -971,7 +975,7 @@ impl ChainListener { Ok(()) } - async fn exit_deal(&mut self, deal_id: DealId, cu_id: CUID) -> eyre::Result<()> { + async fn exit_deal(&mut self, deal_id: &DealId, cu_id: CUID) -> eyre::Result<()> { let mut backoff = ExponentialBackoff::default(); backoff.max_elapsed_time = Some(Duration::from_secs(3)); @@ -984,7 +988,7 @@ impl ChainListener { }) .await?; - self.active_deals.remove(&deal_id); + self.active_deals.remove(deal_id); Ok(()) } } From 124fda572cdc95a7633bf84c13c2a34f48a437aa Mon Sep 17 00:00:00 2001 From: Alexey Proshutinskiy Date: Sat, 9 Mar 2024 16:08:01 +0200 Subject: [PATCH 09/13] fix unit deactivation --- crates/chain-listener/src/listener.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/chain-listener/src/listener.rs b/crates/chain-listener/src/listener.rs index 7310dd5102..f1800fce03 100644 --- a/crates/chain-listener/src/listener.rs +++ b/crates/chain-listener/src/listener.rs @@ -685,7 +685,7 @@ impl ChainListener { ); self.active_compute_units.remove(&unit_event.info.unit_id); self.pending_compute_units - .retain(|cu| cu.id == unit_event.info.unit_id); + .retain(|cu| cu.id != unit_event.info.unit_id); self.refresh_commitment().await?; self.acquire_deal_core(unit_event.info.unit_id)?; Ok(()) From acc2da5445c19e62625b9aa11bb04cf93bfa41cc Mon Sep 17 00:00:00 2001 From: Alexey Proshutinskiy Date: Sat, 9 Mar 2024 16:42:04 +0200 Subject: [PATCH 10/13] exit on deal ended --- crates/chain-listener/src/listener.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/crates/chain-listener/src/listener.rs b/crates/chain-listener/src/listener.rs index f1800fce03..9440f6cd84 100644 --- a/crates/chain-listener/src/listener.rs +++ b/crates/chain-listener/src/listener.rs @@ -426,7 +426,7 @@ impl ChainListener { // TODO: log compute units pretty tracing::info!(target: "chain-listener", "Active compute units: {:?}", - active + active.iter().map(CUID::to_string).collect::>() ); tracing::info!(target: "chain-listener", "Pending compute units: {:?}", @@ -955,14 +955,11 @@ impl ChainListener { { match status { Ok(status) => match status { - DealStatus::InsufficientFunds => { + DealStatus::InsufficientFunds | DealStatus::Ended => { tracing::info!(target: "chain-listener", "Deal {deal_id} status: {status}; Exiting..."); self.exit_deal(&deal_id, cu_id).await?; tracing::info!(target: "chain-listener", "Exited deal {deal_id} successfully"); } - DealStatus::Ended => { - self.active_deals.remove(&deal_id); - } DealStatus::Active | DealStatus::NotEnoughWorkers | DealStatus::SmallBalance => {} From 9efcbd065cc57b51fd96c00e5a499a67037cf4bc Mon Sep 17 00:00:00 2001 From: Alexey Proshutinskiy Date: Sat, 9 Mar 2024 21:11:32 +0200 Subject: [PATCH 11/13] resubscribe on subscription termination --- crates/chain-listener/src/listener.rs | 108 ++++++++++++++++++-------- 1 file changed, 77 insertions(+), 31 deletions(-) diff --git a/crates/chain-listener/src/listener.rs b/crates/chain-listener/src/listener.rs index 9440f6cd84..01bf79f3ad 100644 --- a/crates/chain-listener/src/listener.rs +++ b/crates/chain-listener/src/listener.rs @@ -13,7 +13,7 @@ use ccp_shared::types::{Difficulty, GlobalNonce, LocalNonce, ResultHash}; use cpu_utils::PhysicalCoreId; use ethabi::ethereum_types::U256; use eyre::eyre; -use jsonrpsee::core::client::{Client as WsClient, Subscription, SubscriptionClientT}; +use jsonrpsee::core::client::{Client as WsClient, Error, Subscription, SubscriptionClientT}; use jsonrpsee::core::{client, JsonValue}; use jsonrpsee::rpc_params; use libp2p_identity::PeerId; @@ -323,45 +323,80 @@ impl ChainListener { loop { tokio::select! { - Some(header) = poll_subscription(&mut self.heads) => { - if let Err(err) = self.process_new_header(header).await { - tracing::error!(target: "chain-listener", "newHeads event processing error: {err}"); + event = poll_subscription(&mut self.heads) => { + if let Some(header) = event { + if let Err(err) = self.process_new_header(header).await { + tracing::error!(target: "chain-listener", "newHeads event processing error: {err}"); + if let Err(err) = self.subscribe_new_heads().await { + tracing::error!(target: "chain-listener", "Failed to resubscribe to newHeads: {err}; Stopping..."); + exit(1); + } + } + } else { if let Err(err) = self.subscribe_new_heads().await { - tracing::error!(target: "chain-listener", "Failed to resubscribe to newHeads: {err}; Stopping..."); - exit(1); + tracing::error!(target: "chain-listener", "Failed to resubscribe to newHeads: {err}; Stopping..."); + exit(1); } } }, - Some(cc) = poll_subscription(&mut self.commitment_activated) => { - if let Err(err) = self.process_commitment_activated(cc).await { - tracing::error!(target: "chain-listener", "CommitmentActivated event processing error: {err}"); + event = poll_subscription(&mut self.commitment_activated) => { + if let Some(cc) = event { + if let Err(err) = self.process_commitment_activated(cc).await { + tracing::error!(target: "chain-listener", "CommitmentActivated event processing error: {err}"); + if let Err(err) = self.subscribe_cc_activated().await { + tracing::error!(target: "chain-listener", "Failed to resubscribe to CommitmentActivated event: {err}; Stopping..."); + exit(1); + } + } + } else { if let Err(err) = self.subscribe_cc_activated().await { tracing::error!(target: "chain-listener", "Failed to resubscribe to CommitmentActivated event: {err}; Stopping..."); exit(1); } } }, - Some(event) = poll_subscription(&mut self.unit_activated) => { - if let Err(err) = self.process_unit_activated(event).await { - tracing::error!(target: "chain-listener", "UnitActivated event processing error: {err}"); - if let Err(err) = self.subscribe_unit_activated().await { - tracing::error!(target: "chain-listener", "Failed to resubscribe to UnitActivated: {err}; Stopping..."); - exit(1); + event = poll_subscription(&mut self.unit_activated) => { + if let Some(event) = event { + if let Err(err) = self.process_unit_activated(event).await { + tracing::error!(target: "chain-listener", "UnitActivated event processing error: {err}"); + if let Err(err) = self.subscribe_unit_activated().await { + tracing::error!(target: "chain-listener", "Failed to resubscribe to UnitActivated: {err}; Stopping..."); + exit(1); + } } - } + } else { + if let Err(err) = self.subscribe_unit_activated().await { + tracing::error!(target: "chain-listener", "Failed to resubscribe to UnitActivated: {err}; Stopping..."); + exit(1); + } + } }, - Some(event) = poll_subscription(&mut self.unit_deactivated) => { - if let Err(err) = self.process_unit_deactivated(event).await { - tracing::error!(target: "chain-listener", "UnitDeactivated event processing error: {err}"); + event = poll_subscription(&mut self.unit_deactivated) => { + if let Some(event) = event { + if let Err(err) = self.process_unit_deactivated(event).await { + tracing::error!(target: "chain-listener", "UnitDeactivated event processing error: {err}"); + if let Err(err) = self.subscribe_unit_deactivated().await { + tracing::error!(target: "chain-listener", "Failed to resubscribe to UnitDeactivated: {err}; Stopping..."); + exit(1); + } + } + } else { if let Err(err) = self.subscribe_unit_deactivated().await { tracing::error!(target: "chain-listener", "Failed to resubscribe to UnitDeactivated: {err}; Stopping..."); exit(1); } } }, - Some(event) = poll_subscription(&mut self.unit_matched) => { - if let Err(err) = self.process_deal_matched(event) { - tracing::error!(target: "chain-listener", "DealMatched event processing error: {err}"); + event = poll_subscription(&mut self.unit_matched) => { + if let Some(event) = event { + if let Err(err) = self.process_deal_matched(event) { + tracing::error!(target: "chain-listener", "DealMatched event processing error: {err}"); + if let Err(err) = self.subscribe_deal_matched().await { + tracing::error!(target: "chain-listener", "Failed to resubscribe to DealMatched: {err}; Stopping..."); + exit(1); + } + } + } else { if let Err(err) = self.subscribe_deal_matched().await { tracing::error!(target: "chain-listener", "Failed to resubscribe to DealMatched: {err}; Stopping..."); exit(1); @@ -567,11 +602,9 @@ impl ChainListener { Ok(()) } - async fn process_new_header( - &mut self, - header: Result, - ) -> eyre::Result<()> { - let block_timestamp = Self::parse_timestamp(header?)?; + async fn process_new_header(&mut self, header: Result) -> eyre::Result<()> { + // TODO: add block_number to metrics + let (block_timestamp, _block_number) = Self::parse_block_header(header?)?; // `epoch_number = 1 + (block_timestamp - init_timestamp) / epoch_duration` let epoch_number = @@ -579,6 +612,7 @@ impl ChainListener { let epoch_changed = epoch_number > self.current_epoch; if epoch_changed { + // TODO: add epoch_number to metrics tracing::info!(target: "chain-listener", "Epoch changed, new epoch number: {epoch_number}"); tracing::info!(target: "chain-listener", "Resetting proof id counter"); @@ -924,15 +958,27 @@ impl ChainListener { } } - fn parse_timestamp(header: Value) -> eyre::Result { - let timestamp = header + fn parse_block_header(header: Value) -> eyre::Result<(U256, U256)> { + let header = header .as_object() - .and_then(|o| o.get("timestamp")) + .ok_or(eyre::eyre!("newHeads: header is not an object"))?; + + let timestamp = header + .get("timestamp") .and_then(Value::as_str) .ok_or(eyre::eyre!("newHeads: timestamp field not found"))? .to_string(); - Ok(U256::from_str_radix(×tamp, 16)?) + let block_number = header + .get("number") + .and_then(Value::as_str) + .ok_or(eyre::eyre!("newHeads: number field not found"))? + .to_string(); + + Ok(( + U256::from_str_radix(×tamp, 16)?, + U256::from_str_radix(&block_number, 16)?, + )) } async fn poll_deal_statuses(&mut self) -> eyre::Result<()> { if self.active_deals.is_empty() { From 8b63c14054c13874fde2cc3cce359d8030011aef Mon Sep 17 00:00:00 2001 From: Alexey Proshutinskiy Date: Sat, 9 Mar 2024 21:32:56 +0200 Subject: [PATCH 12/13] log message from newHeads subs --- crates/chain-listener/src/listener.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/crates/chain-listener/src/listener.rs b/crates/chain-listener/src/listener.rs index 01bf79f3ad..97649196b6 100644 --- a/crates/chain-listener/src/listener.rs +++ b/crates/chain-listener/src/listener.rs @@ -602,9 +602,11 @@ impl ChainListener { Ok(()) } - async fn process_new_header(&mut self, header: Result) -> eyre::Result<()> { + async fn process_new_header(&mut self, event: Result) -> eyre::Result<()> { + let header = event?; + tracing::info!(target: "chain-listener", "Received newHeads event: {header}"); // TODO: add block_number to metrics - let (block_timestamp, _block_number) = Self::parse_block_header(header?)?; + let (block_timestamp, _block_number) = Self::parse_block_header(header)?; // `epoch_number = 1 + (block_timestamp - init_timestamp) / epoch_duration` let epoch_number = @@ -650,7 +652,7 @@ impl ChainListener { async fn process_commitment_activated( &mut self, - event: Result, + event: Result, ) -> eyre::Result<()> { let cc_event = parse_log::(event?)?; let unit_ids = cc_event.info.unit_ids; @@ -685,10 +687,7 @@ impl ChainListener { Ok(()) } - async fn process_unit_activated( - &mut self, - event: Result, - ) -> eyre::Result<()> { + async fn process_unit_activated(&mut self, event: Result) -> eyre::Result<()> { let unit_event = parse_log::(event?)?; tracing::info!(target: "chain-listener", "Received UnitActivated event for unit: {}, startEpoch: {}", From 21192d2256c0fa6b125f9ab03e15d4f4a2d47e2c Mon Sep 17 00:00:00 2001 From: Alexey Proshutinskiy Date: Sat, 9 Mar 2024 22:00:38 +0200 Subject: [PATCH 13/13] fix --- crates/chain-listener/src/listener.rs | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/crates/chain-listener/src/listener.rs b/crates/chain-listener/src/listener.rs index 97649196b6..dfc2bb0385 100644 --- a/crates/chain-listener/src/listener.rs +++ b/crates/chain-listener/src/listener.rs @@ -603,10 +603,8 @@ impl ChainListener { } async fn process_new_header(&mut self, event: Result) -> eyre::Result<()> { - let header = event?; - tracing::info!(target: "chain-listener", "Received newHeads event: {header}"); // TODO: add block_number to metrics - let (block_timestamp, _block_number) = Self::parse_block_header(header)?; + let (block_timestamp, _block_number) = Self::parse_block_header(event?)?; // `epoch_number = 1 + (block_timestamp - init_timestamp) / epoch_duration` let epoch_number = @@ -958,20 +956,24 @@ impl ChainListener { } fn parse_block_header(header: Value) -> eyre::Result<(U256, U256)> { - let header = header - .as_object() - .ok_or(eyre::eyre!("newHeads: header is not an object"))?; + let obj = header.as_object().ok_or(eyre::eyre!( + "newHeads: header is not an object; got {header}" + ))?; - let timestamp = header + let timestamp = obj .get("timestamp") .and_then(Value::as_str) - .ok_or(eyre::eyre!("newHeads: timestamp field not found"))? + .ok_or(eyre::eyre!( + "newHeads: timestamp field not found; got {header}" + ))? .to_string(); let block_number = header .get("number") .and_then(Value::as_str) - .ok_or(eyre::eyre!("newHeads: number field not found"))? + .ok_or(eyre::eyre!( + "newHeads: number field not found; got {header}" + ))? .to_string(); Ok((