From 2f9b272ffa9dd44df7bd1bfdc8ac74e6baaf9267 Mon Sep 17 00:00:00 2001 From: Benno Zeeman Date: Tue, 10 Dec 2024 16:55:56 +0100 Subject: [PATCH 1/9] feat(autonomi): transaction API --- ant-networking/src/error.rs | 3 + ant-protocol/src/storage/transaction.rs | 57 +++++---- autonomi/Cargo.toml | 3 +- autonomi/src/client/mod.rs | 3 + autonomi/src/client/transactions.rs | 155 ++++++++++++++++++++++++ autonomi/tests/transaction.rs | 31 +++++ 6 files changed, 228 insertions(+), 24 deletions(-) create mode 100644 autonomi/src/client/transactions.rs create mode 100644 autonomi/tests/transaction.rs diff --git a/ant-networking/src/error.rs b/ant-networking/src/error.rs index c683ff4432..30dd587ff2 100644 --- a/ant-networking/src/error.rs +++ b/ant-networking/src/error.rs @@ -182,6 +182,9 @@ pub enum NetworkError { #[error("Register already exists at this address")] RegisterAlreadyExists, + + #[error("Transaction already exists at this address")] + TransactionAlreadyExists, } #[cfg(test)] diff --git a/ant-protocol/src/storage/transaction.rs b/ant-protocol/src/storage/transaction.rs index 0045f9e746..30e77c29e8 100644 --- a/ant-protocol/src/storage/transaction.rs +++ b/ant-protocol/src/storage/transaction.rs @@ -7,6 +7,7 @@ // permissions and limitations relating to use of the SAFE Network Software. use super::address::TransactionAddress; +use bls::SecretKey; use serde::{Deserialize, Serialize}; // re-exports @@ -27,13 +28,15 @@ pub struct Transaction { } impl Transaction { + /// Create a new transaction, signing it with the provided secret key. pub fn new( owner: PublicKey, parents: Vec, content: TransactionContent, outputs: Vec<(PublicKey, TransactionContent)>, - signature: Signature, + signing_key: &SecretKey, ) -> Self { + let signature = signing_key.sign(bytes_for_signature(&owner, &parents, &content, &outputs)); Self { owner, parents, @@ -47,29 +50,9 @@ impl Transaction { TransactionAddress::from_owner(self.owner) } + /// Get the bytes that the signature is calculated from. pub fn bytes_for_signature(&self) -> Vec { - let mut bytes = Vec::new(); - bytes.extend_from_slice(&self.owner.to_bytes()); - bytes.extend_from_slice("parent".as_bytes()); - bytes.extend_from_slice( - &self - .parents - .iter() - .map(|p| p.to_bytes()) - .collect::>() - .concat(), - ); - bytes.extend_from_slice("content".as_bytes()); - bytes.extend_from_slice(&self.content); - bytes.extend_from_slice("outputs".as_bytes()); - bytes.extend_from_slice( - &self - .outputs - .iter() - .flat_map(|(p, c)| [&p.to_bytes(), c.as_slice()].concat()) - .collect::>(), - ); - bytes + bytes_for_signature(&self.owner, &self.parents, &self.content, &self.outputs) } pub fn verify(&self) -> bool { @@ -77,3 +60,31 @@ impl Transaction { .verify(&self.signature, self.bytes_for_signature()) } } + +fn bytes_for_signature( + owner: &PublicKey, + parents: &[PublicKey], + content: &[u8], + outputs: &[(PublicKey, TransactionContent)], +) -> Vec { + let mut bytes = Vec::new(); + bytes.extend_from_slice(&owner.to_bytes()); + bytes.extend_from_slice("parent".as_bytes()); + bytes.extend_from_slice( + &parents + .iter() + .map(|p| p.to_bytes()) + .collect::>() + .concat(), + ); + bytes.extend_from_slice("content".as_bytes()); + bytes.extend_from_slice(content); + bytes.extend_from_slice("outputs".as_bytes()); + bytes.extend_from_slice( + &outputs + .iter() + .flat_map(|(p, c)| [&p.to_bytes(), c.as_slice()].concat()) + .collect::>(), + ); + bytes +} diff --git a/autonomi/Cargo.toml b/autonomi/Cargo.toml index d5089d14bc..5dceadbd17 100644 --- a/autonomi/Cargo.toml +++ b/autonomi/Cargo.toml @@ -18,10 +18,11 @@ default = ["vault"] external-signer = ["ant-evm/external-signer"] extension-module = ["pyo3/extension-module"] fs = ["tokio/fs"] -full = ["registers", "vault", "fs"] +full = ["registers", "vault", "fs", "transactions"] local = ["ant-networking/local", "ant-evm/local"] loud = [] registers = [] +transactions = [] vault = ["registers"] [dependencies] diff --git a/autonomi/src/client/mod.rs b/autonomi/src/client/mod.rs index 8a233b8085..244f96bc1a 100644 --- a/autonomi/src/client/mod.rs +++ b/autonomi/src/client/mod.rs @@ -21,6 +21,9 @@ pub mod files; #[cfg(feature = "registers")] #[cfg_attr(docsrs, doc(cfg(feature = "registers")))] pub mod registers; +#[cfg(feature = "transactions")] +#[cfg_attr(docsrs, doc(cfg(feature = "transactions")))] +pub mod transactions; #[cfg(feature = "vault")] #[cfg_attr(docsrs, doc(cfg(feature = "vault")))] pub mod vault; diff --git a/autonomi/src/client/transactions.rs b/autonomi/src/client/transactions.rs new file mode 100644 index 0000000000..20c1b76090 --- /dev/null +++ b/autonomi/src/client/transactions.rs @@ -0,0 +1,155 @@ +// Copyright 2024 MaidSafe.net limited. +// +// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3. +// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed +// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. Please review the Licences for the specific language governing +// permissions and limitations relating to use of the SAFE Network Software. + +use crate::client::data::PayError; +use crate::client::Client; +use crate::client::ClientEvent; +use crate::client::UploadSummary; + +pub use ant_protocol::storage::Transaction; +use ant_protocol::storage::TransactionAddress; +pub use bls::SecretKey as TransactionSecretKey; + +use ant_evm::{EvmWallet, EvmWalletError}; +use ant_networking::{GetRecordCfg, NetworkError, PutRecordCfg, VerificationKind}; +use ant_protocol::{ + storage::{try_serialize_record, RecordKind, RetryStrategy}, + NetworkAddress, +}; +use libp2p::kad::{Quorum, Record}; + +use super::data::CostError; + +#[derive(Debug, thiserror::Error)] +pub enum TransactionError { + #[error("Cost error: {0}")] + Cost(#[from] CostError), + #[error("Network error")] + Network(#[from] NetworkError), + #[error("Serialization error")] + Serialization, + #[error("Transaction could not be verified (corrupt)")] + FailedVerification, + #[error("Payment failure occurred during transaction creation.")] + Pay(#[from] PayError), + #[error("Failed to retrieve wallet payment")] + Wallet(#[from] EvmWalletError), + #[error("Received invalid quote from node, this node is possibly malfunctioning, try another node by trying another transaction name")] + InvalidQuote, +} + +impl Client { + /// Fetches a Transaction from the network. + pub async fn transaction_get( + &self, + address: TransactionAddress, + ) -> Result, TransactionError> { + let transactions = self.network.get_transactions(address).await?; + + Ok(transactions) + } + + pub async fn transaction_put( + &self, + transaction: Transaction, + wallet: &EvmWallet, + ) -> Result<(), TransactionError> { + let address = transaction.address(); + + let xor_name = address.xorname(); + debug!("Paying for transaction at address: {address:?}"); + let (payment_proofs, _skipped) = self + .pay(std::iter::once(*xor_name), wallet) + .await + .inspect_err(|err| { + error!("Failed to pay for transaction at address: {address:?} : {err}") + })?; + let proof = if let Some(proof) = payment_proofs.get(xor_name) { + proof + } else { + // transaction was skipped, meaning it was already paid for + error!("Transaction at address: {address:?} was already paid for"); + return Err(TransactionError::Network( + NetworkError::TransactionAlreadyExists, + )); + }; + let payee = proof + .to_peer_id_payee() + .ok_or(TransactionError::InvalidQuote) + .inspect_err(|err| error!("Failed to get payee from payment proof: {err}"))?; + + let record = Record { + key: NetworkAddress::from_transaction_address(address).to_record_key(), + value: try_serialize_record(&(proof, &transaction), RecordKind::TransactionWithPayment) + .map_err(|_| TransactionError::Serialization)? + .to_vec(), + publisher: None, + expires: None, + }; + + let get_cfg = GetRecordCfg { + get_quorum: Quorum::Majority, + retry_strategy: Some(RetryStrategy::default()), + target_record: None, + expected_holders: Default::default(), + is_register: false, + }; + let put_cfg = PutRecordCfg { + put_quorum: Quorum::All, + retry_strategy: None, + use_put_record_to: Some(vec![payee]), + verification: Some((VerificationKind::Network, get_cfg)), + }; + + debug!("Storing transaction at address {address:?} to the network"); + self.network + .put_record(record, &put_cfg) + .await + .inspect_err(|err| { + error!("Failed to put record - transaction {address:?} to the network: {err}") + })?; + + if let Some(channel) = self.client_event_sender.as_ref() { + let summary = UploadSummary { + record_count: 1, + tokens_spent: proof.quote.cost.as_atto(), + }; + if let Err(err) = channel.send(ClientEvent::UploadComplete(summary)).await { + error!("Failed to send client event: {err}"); + } + } + + Ok(()) + } + + // /// Get the cost to create a transaction + // pub async fn transaction_cost( + // &self, + // name: String, + // owner: TransactionSecretKey, + // ) -> Result { + // info!("Getting cost for transaction with name: {name}"); + // // get transaction address + // let pk = owner.public_key(); + // let name = XorName::from_content_parts(&[name.as_bytes()]); + // let transaction = Transaction::new(None, name, owner, permissions)?; + // let reg_xor = transaction.address().xorname(); + + // // get cost to store transaction + // // NB TODO: transaction should be priced differently from other data + // let cost_map = self.get_store_quotes(std::iter::once(reg_xor)).await?; + // let total_cost = AttoTokens::from_atto( + // cost_map + // .values() + // .map(|quote| quote.2.cost.as_atto()) + // .sum::(), + // ); + // debug!("Calculated the cost to create transaction with name: {name} is {total_cost}"); + // Ok(total_cost) + // } +} diff --git a/autonomi/tests/transaction.rs b/autonomi/tests/transaction.rs new file mode 100644 index 0000000000..10e871f193 --- /dev/null +++ b/autonomi/tests/transaction.rs @@ -0,0 +1,31 @@ +// Copyright 2024 MaidSafe.net limited. +// +// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3. +// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed +// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. Please review the Licences for the specific language governing +// permissions and limitations relating to use of the SAFE Network Software. + +#![cfg(feature = "transactions")] + +use ant_logging::LogBuilder; +use ant_protocol::storage::Transaction; +use autonomi::Client; +use eyre::Result; +use test_utils::{evm::get_funded_wallet, peers_from_env}; + +#[tokio::test] +async fn transaction() -> Result<()> { + let _log_appender_guard = LogBuilder::init_single_threaded_tokio_test("transaction", false); + + let client = Client::connect(&peers_from_env()?).await?; + let wallet = get_funded_wallet(); + + let key = bls::SecretKey::random(); + let content = [0u8; 32]; + let mut transaction = Transaction::new(key.public_key(), vec![], content, vec![], &key); + + client.transaction_put(transaction, &wallet).await?; + + Ok(()) +} From fa1405581ae4fee72878580921edbda9b0dfe1f7 Mon Sep 17 00:00:00 2001 From: Benno Zeeman Date: Tue, 10 Dec 2024 17:03:30 +0100 Subject: [PATCH 2/9] fix(autonomi): fix transaction test --- autonomi/tests/transaction.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/autonomi/tests/transaction.rs b/autonomi/tests/transaction.rs index 10e871f193..34b6b5ea94 100644 --- a/autonomi/tests/transaction.rs +++ b/autonomi/tests/transaction.rs @@ -15,7 +15,7 @@ use eyre::Result; use test_utils::{evm::get_funded_wallet, peers_from_env}; #[tokio::test] -async fn transaction() -> Result<()> { +async fn transaction_put() -> Result<()> { let _log_appender_guard = LogBuilder::init_single_threaded_tokio_test("transaction", false); let client = Client::connect(&peers_from_env()?).await?; @@ -23,7 +23,7 @@ async fn transaction() -> Result<()> { let key = bls::SecretKey::random(); let content = [0u8; 32]; - let mut transaction = Transaction::new(key.public_key(), vec![], content, vec![], &key); + let transaction = Transaction::new(key.public_key(), vec![], content, vec![], &key); client.transaction_put(transaction, &wallet).await?; From 65f5c2b67b141b46cb85639468b0eb917b648bc7 Mon Sep 17 00:00:00 2001 From: grumbach Date: Thu, 12 Dec 2024 14:06:24 +0900 Subject: [PATCH 3/9] feat: wrap up Transaction work --- ant-networking/src/error.rs | 3 -- autonomi/Cargo.toml | 3 +- autonomi/src/client/mod.rs | 7 ++- autonomi/src/client/registers.rs | 11 +--- autonomi/src/client/transactions.rs | 82 ++++++++++++++--------------- autonomi/tests/transaction.rs | 2 - 6 files changed, 45 insertions(+), 63 deletions(-) diff --git a/ant-networking/src/error.rs b/ant-networking/src/error.rs index 30dd587ff2..c683ff4432 100644 --- a/ant-networking/src/error.rs +++ b/ant-networking/src/error.rs @@ -182,9 +182,6 @@ pub enum NetworkError { #[error("Register already exists at this address")] RegisterAlreadyExists, - - #[error("Transaction already exists at this address")] - TransactionAlreadyExists, } #[cfg(test)] diff --git a/autonomi/Cargo.toml b/autonomi/Cargo.toml index 5dceadbd17..d5089d14bc 100644 --- a/autonomi/Cargo.toml +++ b/autonomi/Cargo.toml @@ -18,11 +18,10 @@ default = ["vault"] external-signer = ["ant-evm/external-signer"] extension-module = ["pyo3/extension-module"] fs = ["tokio/fs"] -full = ["registers", "vault", "fs", "transactions"] +full = ["registers", "vault", "fs"] local = ["ant-networking/local", "ant-evm/local"] loud = [] registers = [] -transactions = [] vault = ["registers"] [dependencies] diff --git a/autonomi/src/client/mod.rs b/autonomi/src/client/mod.rs index 244f96bc1a..fae0a87ba8 100644 --- a/autonomi/src/client/mod.rs +++ b/autonomi/src/client/mod.rs @@ -14,16 +14,15 @@ pub mod payment; pub mod quote; pub mod data; +pub mod files; +pub mod transactions; + #[cfg(feature = "external-signer")] #[cfg_attr(docsrs, doc(cfg(feature = "external-signer")))] pub mod external_signer; -pub mod files; #[cfg(feature = "registers")] #[cfg_attr(docsrs, doc(cfg(feature = "registers")))] pub mod registers; -#[cfg(feature = "transactions")] -#[cfg_attr(docsrs, doc(cfg(feature = "transactions")))] -pub mod transactions; #[cfg(feature = "vault")] #[cfg_attr(docsrs, doc(cfg(feature = "vault")))] pub mod vault; diff --git a/autonomi/src/client/registers.rs b/autonomi/src/client/registers.rs index fa353d4873..5d56055a08 100644 --- a/autonomi/src/client/registers.rs +++ b/autonomi/src/client/registers.rs @@ -239,7 +239,7 @@ impl Client { name: String, owner: RegisterSecretKey, ) -> Result { - info!("Getting cost for register with name: {name}"); + trace!("Getting cost for register with name: {name}"); // get register address let pk = owner.public_key(); let name = XorName::from_content_parts(&[name.as_bytes()]); @@ -321,15 +321,6 @@ impl Client { }; let payees = proof.payees(); - - if payees.is_empty() { - error!( - "Failed to get payees from payment proof: {:?}", - RegisterError::PayeesMissing - ); - return Err(RegisterError::PayeesMissing); - } - let signed_register = register.signed_reg.clone(); let record = Record { diff --git a/autonomi/src/client/transactions.rs b/autonomi/src/client/transactions.rs index 20c1b76090..c87a31dd1d 100644 --- a/autonomi/src/client/transactions.rs +++ b/autonomi/src/client/transactions.rs @@ -11,9 +11,11 @@ use crate::client::Client; use crate::client::ClientEvent; use crate::client::UploadSummary; +use ant_evm::Amount; +use ant_evm::AttoTokens; pub use ant_protocol::storage::Transaction; use ant_protocol::storage::TransactionAddress; -pub use bls::SecretKey as TransactionSecretKey; +pub use bls::SecretKey; use ant_evm::{EvmWallet, EvmWalletError}; use ant_networking::{GetRecordCfg, NetworkError, PutRecordCfg, VerificationKind}; @@ -41,6 +43,8 @@ pub enum TransactionError { Wallet(#[from] EvmWalletError), #[error("Received invalid quote from node, this node is possibly malfunctioning, try another node by trying another transaction name")] InvalidQuote, + #[error("Transaction already exists at this address: {0:?}")] + TransactionAlreadyExists(TransactionAddress), } impl Client { @@ -61,28 +65,28 @@ impl Client { ) -> Result<(), TransactionError> { let address = transaction.address(); + // pay for the transaction let xor_name = address.xorname(); debug!("Paying for transaction at address: {address:?}"); - let (payment_proofs, _skipped) = self + let payment_proofs = self .pay(std::iter::once(*xor_name), wallet) .await .inspect_err(|err| { error!("Failed to pay for transaction at address: {address:?} : {err}") })?; - let proof = if let Some(proof) = payment_proofs.get(xor_name) { - proof - } else { - // transaction was skipped, meaning it was already paid for - error!("Transaction at address: {address:?} was already paid for"); - return Err(TransactionError::Network( - NetworkError::TransactionAlreadyExists, - )); + + // make sure the transaction was paid for + let (proof, price) = match payment_proofs.get(xor_name) { + Some((proof, price)) => (proof, price), + None => { + // transaction was skipped, meaning it was already paid for + error!("Transaction at address: {address:?} was already paid for"); + return Err(TransactionError::TransactionAlreadyExists(address)); + } }; - let payee = proof - .to_peer_id_payee() - .ok_or(TransactionError::InvalidQuote) - .inspect_err(|err| error!("Failed to get payee from payment proof: {err}"))?; + // prepare the record for network storage + let payees = proof.payees(); let record = Record { key: NetworkAddress::from_transaction_address(address).to_record_key(), value: try_serialize_record(&(proof, &transaction), RecordKind::TransactionWithPayment) @@ -91,7 +95,6 @@ impl Client { publisher: None, expires: None, }; - let get_cfg = GetRecordCfg { get_quorum: Quorum::Majority, retry_strategy: Some(RetryStrategy::default()), @@ -102,10 +105,11 @@ impl Client { let put_cfg = PutRecordCfg { put_quorum: Quorum::All, retry_strategy: None, - use_put_record_to: Some(vec![payee]), + use_put_record_to: Some(payees), verification: Some((VerificationKind::Network, get_cfg)), }; + // put the record to the network debug!("Storing transaction at address {address:?} to the network"); self.network .put_record(record, &put_cfg) @@ -114,10 +118,11 @@ impl Client { error!("Failed to put record - transaction {address:?} to the network: {err}") })?; + // send client event if let Some(channel) = self.client_event_sender.as_ref() { let summary = UploadSummary { record_count: 1, - tokens_spent: proof.quote.cost.as_atto(), + tokens_spent: price.as_atto(), }; if let Err(err) = channel.send(ClientEvent::UploadComplete(summary)).await { error!("Failed to send client event: {err}"); @@ -127,29 +132,22 @@ impl Client { Ok(()) } - // /// Get the cost to create a transaction - // pub async fn transaction_cost( - // &self, - // name: String, - // owner: TransactionSecretKey, - // ) -> Result { - // info!("Getting cost for transaction with name: {name}"); - // // get transaction address - // let pk = owner.public_key(); - // let name = XorName::from_content_parts(&[name.as_bytes()]); - // let transaction = Transaction::new(None, name, owner, permissions)?; - // let reg_xor = transaction.address().xorname(); - - // // get cost to store transaction - // // NB TODO: transaction should be priced differently from other data - // let cost_map = self.get_store_quotes(std::iter::once(reg_xor)).await?; - // let total_cost = AttoTokens::from_atto( - // cost_map - // .values() - // .map(|quote| quote.2.cost.as_atto()) - // .sum::(), - // ); - // debug!("Calculated the cost to create transaction with name: {name} is {total_cost}"); - // Ok(total_cost) - // } + /// Get the cost to create a transaction + pub async fn transaction_cost(&self, key: SecretKey) -> Result { + let pk = key.public_key(); + trace!("Getting cost for transaction of {pk:?}"); + + let address = TransactionAddress::from_owner(pk); + let xor = *address.xorname(); + let store_quote = self.get_store_quotes(std::iter::once(xor)).await?; + let total_cost = AttoTokens::from_atto( + store_quote + .0 + .values() + .map(|quote| quote.price()) + .sum::(), + ); + debug!("Calculated the cost to create transaction of {pk:?} is {total_cost}"); + Ok(total_cost) + } } diff --git a/autonomi/tests/transaction.rs b/autonomi/tests/transaction.rs index 34b6b5ea94..62e74b83e3 100644 --- a/autonomi/tests/transaction.rs +++ b/autonomi/tests/transaction.rs @@ -6,8 +6,6 @@ // KIND, either express or implied. Please review the Licences for the specific language governing // permissions and limitations relating to use of the SAFE Network Software. -#![cfg(feature = "transactions")] - use ant_logging::LogBuilder; use ant_protocol::storage::Transaction; use autonomi::Client; From 50eba251d235b64d6054078a03849048fe2632a1 Mon Sep 17 00:00:00 2001 From: grumbach Date: Thu, 12 Dec 2024 16:33:07 +0900 Subject: [PATCH 4/9] fix: all tx come with payment fee, client side reject second tx put --- ant-node/src/log_markers.rs | 2 +- ant-node/src/put_validation.rs | 34 ++++------- ant-protocol/src/storage/transaction.rs | 78 +++++++++++++++---------- autonomi/tests/transaction.rs | 23 +++++++- 4 files changed, 81 insertions(+), 56 deletions(-) diff --git a/ant-node/src/log_markers.rs b/ant-node/src/log_markers.rs index d5ef326b63..23f7c0829e 100644 --- a/ant-node/src/log_markers.rs +++ b/ant-node/src/log_markers.rs @@ -51,7 +51,7 @@ pub enum Marker<'a> { /// Valid paid to us and royalty paid register stored ValidPaidRegisterPutFromClient(&'a PrettyPrintRecordKey<'a>), /// Valid transaction stored - ValidSpendPutFromClient(&'a PrettyPrintRecordKey<'a>), + ValidTransactionPutFromClient(&'a PrettyPrintRecordKey<'a>), /// Valid scratchpad stored ValidScratchpadRecordPutFromClient(&'a PrettyPrintRecordKey<'a>), diff --git a/ant-node/src/put_validation.rs b/ant-node/src/put_validation.rs index 9beec8b740..203eeeb733 100644 --- a/ant-node/src/put_validation.rs +++ b/ant-node/src/put_validation.rs @@ -162,29 +162,11 @@ impl Node { .await } RecordKind::Transaction => { - let record_key = record.key.clone(); - let value_to_hash = record.value.clone(); - let transactions = try_deserialize_record::>(&record)?; - let result = self - .validate_merge_and_store_transactions(transactions, &record_key) - .await; - if result.is_ok() { - Marker::ValidSpendPutFromClient(&PrettyPrintRecordKey::from(&record_key)).log(); - let content_hash = XorName::from_content(&value_to_hash); - self.replicate_valid_fresh_record( - record_key, - RecordType::NonChunk(content_hash), - ); - - // Notify replication_fetcher to mark the attempt as completed. - // Send the notification earlier to avoid it got skipped due to: - // the record becomes stored during the fetch because of other interleaved process. - self.network().notify_fetch_completed( - record.key.clone(), - RecordType::NonChunk(content_hash), - ); - } - result + // Transactions should always be paid for + error!("Transaction should not be validated at this point"); + Err(Error::InvalidPutWithoutPayment( + PrettyPrintRecordKey::from(&record.key).into_owned(), + )) } RecordKind::TransactionWithPayment => { let (payment, transaction) = @@ -224,6 +206,12 @@ impl Node { .await; if res.is_ok() { let content_hash = XorName::from_content(&record.value); + Marker::ValidTransactionPutFromClient(&PrettyPrintRecordKey::from(&record.key)) + .log(); + self.replicate_valid_fresh_record( + record.key.clone(), + RecordType::NonChunk(content_hash), + ); // Notify replication_fetcher to mark the attempt as completed. // Send the notification earlier to avoid it got skipped due to: diff --git a/ant-protocol/src/storage/transaction.rs b/ant-protocol/src/storage/transaction.rs index 30e77c29e8..6f7a7a9b11 100644 --- a/ant-protocol/src/storage/transaction.rs +++ b/ant-protocol/src/storage/transaction.rs @@ -36,7 +36,7 @@ impl Transaction { outputs: Vec<(PublicKey, TransactionContent)>, signing_key: &SecretKey, ) -> Self { - let signature = signing_key.sign(bytes_for_signature(&owner, &parents, &content, &outputs)); + let signature = signing_key.sign(Self::bytes_to_sign(&owner, &parents, &content, &outputs)); Self { owner, parents, @@ -46,13 +46,59 @@ impl Transaction { } } + /// Create a new transaction, with the signature already calculated. + pub fn new_with_signature( + owner: PublicKey, + parents: Vec, + content: TransactionContent, + outputs: Vec<(PublicKey, TransactionContent)>, + signature: Signature, + ) -> Self { + Self { + owner, + parents, + content, + outputs, + signature, + } + } + + /// Get the bytes that the signature is calculated from. + pub fn bytes_to_sign( + owner: &PublicKey, + parents: &[PublicKey], + content: &[u8], + outputs: &[(PublicKey, TransactionContent)], + ) -> Vec { + let mut bytes = Vec::new(); + bytes.extend_from_slice(&owner.to_bytes()); + bytes.extend_from_slice("parent".as_bytes()); + bytes.extend_from_slice( + &parents + .iter() + .map(|p| p.to_bytes()) + .collect::>() + .concat(), + ); + bytes.extend_from_slice("content".as_bytes()); + bytes.extend_from_slice(content); + bytes.extend_from_slice("outputs".as_bytes()); + bytes.extend_from_slice( + &outputs + .iter() + .flat_map(|(p, c)| [&p.to_bytes(), c.as_slice()].concat()) + .collect::>(), + ); + bytes + } + pub fn address(&self) -> TransactionAddress { TransactionAddress::from_owner(self.owner) } /// Get the bytes that the signature is calculated from. pub fn bytes_for_signature(&self) -> Vec { - bytes_for_signature(&self.owner, &self.parents, &self.content, &self.outputs) + Self::bytes_to_sign(&self.owner, &self.parents, &self.content, &self.outputs) } pub fn verify(&self) -> bool { @@ -60,31 +106,3 @@ impl Transaction { .verify(&self.signature, self.bytes_for_signature()) } } - -fn bytes_for_signature( - owner: &PublicKey, - parents: &[PublicKey], - content: &[u8], - outputs: &[(PublicKey, TransactionContent)], -) -> Vec { - let mut bytes = Vec::new(); - bytes.extend_from_slice(&owner.to_bytes()); - bytes.extend_from_slice("parent".as_bytes()); - bytes.extend_from_slice( - &parents - .iter() - .map(|p| p.to_bytes()) - .collect::>() - .concat(), - ); - bytes.extend_from_slice("content".as_bytes()); - bytes.extend_from_slice(content); - bytes.extend_from_slice("outputs".as_bytes()); - bytes.extend_from_slice( - &outputs - .iter() - .flat_map(|(p, c)| [&p.to_bytes(), c.as_slice()].concat()) - .collect::>(), - ); - bytes -} diff --git a/autonomi/tests/transaction.rs b/autonomi/tests/transaction.rs index 62e74b83e3..64e7502344 100644 --- a/autonomi/tests/transaction.rs +++ b/autonomi/tests/transaction.rs @@ -8,7 +8,7 @@ use ant_logging::LogBuilder; use ant_protocol::storage::Transaction; -use autonomi::Client; +use autonomi::{client::transactions::TransactionError, Client}; use eyre::Result; use test_utils::{evm::get_funded_wallet, peers_from_env}; @@ -23,7 +23,26 @@ async fn transaction_put() -> Result<()> { let content = [0u8; 32]; let transaction = Transaction::new(key.public_key(), vec![], content, vec![], &key); - client.transaction_put(transaction, &wallet).await?; + client.transaction_put(transaction.clone(), &wallet).await?; + println!("transaction put 1"); + // wait for the transaction to be replicated + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + + // check that the transaction is stored + let txs = client.transaction_get(transaction.address()).await?; + assert_eq!(txs, vec![transaction.clone()]); + println!("transaction got 1"); + + // try put another transaction with the same address + let content2 = [1u8; 32]; + let transaction2 = Transaction::new(key.public_key(), vec![], content2, vec![], &key); + let res = client.transaction_put(transaction2.clone(), &wallet).await; + + assert!(matches!( + res, + Err(TransactionError::TransactionAlreadyExists(address)) + if address == transaction2.address() + )); Ok(()) } From a237c1dd3f38e247ab38fe37a45aadb5dfb7d9d2 Mon Sep 17 00:00:00 2001 From: grumbach Date: Thu, 12 Dec 2024 17:15:22 +0900 Subject: [PATCH 5/9] feat: test transation cost too --- autonomi/tests/transaction.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/autonomi/tests/transaction.rs b/autonomi/tests/transaction.rs index 64e7502344..76f0bd760d 100644 --- a/autonomi/tests/transaction.rs +++ b/autonomi/tests/transaction.rs @@ -23,6 +23,11 @@ async fn transaction_put() -> Result<()> { let content = [0u8; 32]; let transaction = Transaction::new(key.public_key(), vec![], content, vec![], &key); + // estimate the cost of the transaction + let cost = client.transaction_cost(key.clone()).await?; + println!("transaction cost: {cost}"); + + // put the transaction client.transaction_put(transaction.clone(), &wallet).await?; println!("transaction put 1"); From 2ea25b070d3c1f225ab053fd8521a8d74c9a7b59 Mon Sep 17 00:00:00 2001 From: grumbach Date: Thu, 12 Dec 2024 17:34:22 +0900 Subject: [PATCH 6/9] fix: crdt verification for Transaction and Registers put --- autonomi/src/client/registers.rs | 2 +- autonomi/src/client/transactions.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/autonomi/src/client/registers.rs b/autonomi/src/client/registers.rs index 5d56055a08..d2ae5f203a 100644 --- a/autonomi/src/client/registers.rs +++ b/autonomi/src/client/registers.rs @@ -347,7 +347,7 @@ impl Client { put_quorum: Quorum::All, retry_strategy: None, use_put_record_to: Some(payees), - verification: Some((VerificationKind::Network, get_cfg)), + verification: Some((VerificationKind::Crdt, get_cfg)), }; debug!("Storing register at address {address} to the network"); diff --git a/autonomi/src/client/transactions.rs b/autonomi/src/client/transactions.rs index c87a31dd1d..1585709960 100644 --- a/autonomi/src/client/transactions.rs +++ b/autonomi/src/client/transactions.rs @@ -106,7 +106,7 @@ impl Client { put_quorum: Quorum::All, retry_strategy: None, use_put_record_to: Some(payees), - verification: Some((VerificationKind::Network, get_cfg)), + verification: Some((VerificationKind::Crdt, get_cfg)), }; // put the record to the network From f3c5906a47d58d86b1d26d54ace238071f5a42ce Mon Sep 17 00:00:00 2001 From: grumbach Date: Thu, 12 Dec 2024 17:38:03 +0900 Subject: [PATCH 7/9] fix: stop storing duplicate transactions --- ant-node/src/put_validation.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/ant-node/src/put_validation.rs b/ant-node/src/put_validation.rs index 203eeeb733..67a01b275b 100644 --- a/ant-node/src/put_validation.rs +++ b/ant-node/src/put_validation.rs @@ -6,6 +6,8 @@ // KIND, either express or implied. Please review the Licences for the specific language governing // permissions and limitations relating to use of the SAFE Network Software. +use std::collections::BTreeSet; + use crate::{node::Node, Error, Marker, Result}; use ant_evm::payment_vault::verify_data_payment; use ant_evm::{AttoTokens, ProofOfPayment}; @@ -589,23 +591,24 @@ impl Node { } // verify the transactions - let mut validated_transactions: Vec = transactions_for_key + let mut validated_transactions: BTreeSet = transactions_for_key .into_iter() .filter(|t| t.verify()) .collect(); // skip if none are valid - let addr = match validated_transactions.as_slice() { - [] => { + let addr = match validated_transactions.first() { + None => { warn!("Found no validated transactions to store at {pretty_key:?}"); return Ok(()); } - [t, ..] => t.address(), + Some(t) => t.address(), }; - // add local transactions to the validated transactions + // add local transactions to the validated transactions, turn to Vec let local_txs = self.get_local_transactions(addr).await?; - validated_transactions.extend(local_txs); + validated_transactions.extend(local_txs.into_iter()); + let validated_transactions: Vec = validated_transactions.into_iter().collect(); // store the record into the local storage let record = Record { From 8fa3dd9b039102fff8a3116e31655e3158f0c2b4 Mon Sep 17 00:00:00 2001 From: qima Date: Thu, 12 Dec 2024 20:16:40 +0800 Subject: [PATCH 8/9] fix(client): expand quoting range by 1 --- ant-networking/src/lib.rs | 46 +-------------------------------------- 1 file changed, 1 insertion(+), 45 deletions(-) diff --git a/ant-networking/src/lib.rs b/ant-networking/src/lib.rs index a02767594c..c43cdcdf8e 100644 --- a/ant-networking/src/lib.rs +++ b/ant-networking/src/lib.rs @@ -1063,50 +1063,6 @@ impl Network { send_local_swarm_cmd(self.local_swarm_cmd_sender().clone(), cmd); } - /// Returns the closest peers to the given `XorName`, sorted by their distance to the xor_name. - /// If `client` is false, then include `self` among the `closest_peers` - pub async fn get_close_group_closest_peers( - &self, - key: &NetworkAddress, - client: bool, - ) -> Result> { - debug!("Getting the closest peers to {key:?}"); - let (sender, receiver) = oneshot::channel(); - self.send_network_swarm_cmd(NetworkSwarmCmd::GetClosestPeersToAddressFromNetwork { - key: key.clone(), - sender, - }); - let k_bucket_peers = receiver.await?; - - // Count self in if among the CLOSE_GROUP_SIZE closest and sort the result - let result_len = k_bucket_peers.len(); - let mut closest_peers = k_bucket_peers; - // ensure we're not including self here - if client { - // remove our peer id from the calculations here: - closest_peers.retain(|&x| x != self.peer_id()); - if result_len != closest_peers.len() { - info!("Remove self client from the closest_peers"); - } - } - if tracing::level_enabled!(tracing::Level::DEBUG) { - let close_peers_pretty_print: Vec<_> = closest_peers - .iter() - .map(|peer_id| { - format!( - "{peer_id:?}({:?})", - PrettyPrintKBucketKey(NetworkAddress::from_peer(*peer_id).as_kbucket_key()) - ) - }) - .collect(); - - debug!("Network knowledge of close peers to {key:?} are: {close_peers_pretty_print:?}"); - } - - let closest_peers = sort_peers_by_address(&closest_peers, key, CLOSE_GROUP_SIZE)?; - Ok(closest_peers.into_iter().cloned().collect()) - } - /// Returns the closest peers to the given `XorName`, sorted by their distance to the xor_name. /// If `client` is false, then include `self` among the `closest_peers` /// @@ -1155,7 +1111,7 @@ impl Network { ); } - let closest_peers = sort_peers_by_address(&closest_peers, key, CLOSE_GROUP_SIZE)?; + let closest_peers = sort_peers_by_address(&closest_peers, key, CLOSE_GROUP_SIZE + 1)?; Ok(closest_peers.into_iter().cloned().collect()) } From 343a22d0374d93f74358e3d50fdec988d9a32541 Mon Sep 17 00:00:00 2001 From: qima Date: Thu, 12 Dec 2024 20:52:09 +0800 Subject: [PATCH 9/9] chore(CI): raise up the client peak memory usage limit --- .github/workflows/benchmark-prs.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/workflows/benchmark-prs.yml b/.github/workflows/benchmark-prs.yml index 25392240a3..eb27cf7ffc 100644 --- a/.github/workflows/benchmark-prs.yml +++ b/.github/workflows/benchmark-prs.yml @@ -85,11 +85,13 @@ jobs: ########################### ### Client Mem Analysis ### ########################### + ### The peak limit shall be restored back to 50MB, + ### Once client side chunking/quoting flow got re-examined. - name: Check client memory usage shell: bash run: | - client_peak_mem_limit_mb="1024" # mb + client_peak_mem_limit_mb="1500" # mb client_avg_mem_limit_mb="512" # mb peak_mem_usage=$(