Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transaction fixes and Client API (atop new quoting) #2522

Merged
4 changes: 3 additions & 1 deletion .github/workflows/benchmark-prs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,13 @@ jobs:
###########################
### Client Mem Analysis ###
###########################
### The peak limit shall be restored back to 50MB,
### Once client side chunking/quoting flow got re-examined.

- name: Check client memory usage
shell: bash
run: |
client_peak_mem_limit_mb="1024" # mb
client_peak_mem_limit_mb="1500" # mb
client_avg_mem_limit_mb="512" # mb

peak_mem_usage=$(
Expand Down
46 changes: 1 addition & 45 deletions ant-networking/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1063,50 +1063,6 @@ impl Network {
send_local_swarm_cmd(self.local_swarm_cmd_sender().clone(), cmd);
}

/// Returns the closest peers to the given `XorName`, sorted by their distance to the xor_name.
/// If `client` is false, then include `self` among the `closest_peers`
pub async fn get_close_group_closest_peers(
&self,
key: &NetworkAddress,
client: bool,
) -> Result<Vec<PeerId>> {
debug!("Getting the closest peers to {key:?}");
let (sender, receiver) = oneshot::channel();
self.send_network_swarm_cmd(NetworkSwarmCmd::GetClosestPeersToAddressFromNetwork {
key: key.clone(),
sender,
});
let k_bucket_peers = receiver.await?;

// Count self in if among the CLOSE_GROUP_SIZE closest and sort the result
let result_len = k_bucket_peers.len();
let mut closest_peers = k_bucket_peers;
// ensure we're not including self here
if client {
// remove our peer id from the calculations here:
closest_peers.retain(|&x| x != self.peer_id());
if result_len != closest_peers.len() {
info!("Remove self client from the closest_peers");
}
}
if tracing::level_enabled!(tracing::Level::DEBUG) {
let close_peers_pretty_print: Vec<_> = closest_peers
.iter()
.map(|peer_id| {
format!(
"{peer_id:?}({:?})",
PrettyPrintKBucketKey(NetworkAddress::from_peer(*peer_id).as_kbucket_key())
)
})
.collect();

debug!("Network knowledge of close peers to {key:?} are: {close_peers_pretty_print:?}");
}

let closest_peers = sort_peers_by_address(&closest_peers, key, CLOSE_GROUP_SIZE)?;
Ok(closest_peers.into_iter().cloned().collect())
}

/// Returns the closest peers to the given `XorName`, sorted by their distance to the xor_name.
/// If `client` is false, then include `self` among the `closest_peers`
///
Expand Down Expand Up @@ -1155,7 +1111,7 @@ impl Network {
);
}

let closest_peers = sort_peers_by_address(&closest_peers, key, CLOSE_GROUP_SIZE)?;
let closest_peers = sort_peers_by_address(&closest_peers, key, CLOSE_GROUP_SIZE + 1)?;
Ok(closest_peers.into_iter().cloned().collect())
}

Expand Down
2 changes: 1 addition & 1 deletion ant-node/src/log_markers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub enum Marker<'a> {
/// Valid paid to us and royalty paid register stored
ValidPaidRegisterPutFromClient(&'a PrettyPrintRecordKey<'a>),
/// Valid transaction stored
ValidSpendPutFromClient(&'a PrettyPrintRecordKey<'a>),
ValidTransactionPutFromClient(&'a PrettyPrintRecordKey<'a>),
/// Valid scratchpad stored
ValidScratchpadRecordPutFromClient(&'a PrettyPrintRecordKey<'a>),

Expand Down
49 changes: 20 additions & 29 deletions ant-node/src/put_validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use std::collections::BTreeSet;

use crate::{node::Node, Error, Marker, Result};
use ant_evm::payment_vault::verify_data_payment;
use ant_evm::{AttoTokens, ProofOfPayment};
Expand Down Expand Up @@ -162,29 +164,11 @@ impl Node {
.await
}
RecordKind::Transaction => {
let record_key = record.key.clone();
let value_to_hash = record.value.clone();
let transactions = try_deserialize_record::<Vec<Transaction>>(&record)?;
let result = self
.validate_merge_and_store_transactions(transactions, &record_key)
.await;
if result.is_ok() {
Marker::ValidSpendPutFromClient(&PrettyPrintRecordKey::from(&record_key)).log();
let content_hash = XorName::from_content(&value_to_hash);
self.replicate_valid_fresh_record(
record_key,
RecordType::NonChunk(content_hash),
);

// Notify replication_fetcher to mark the attempt as completed.
// Send the notification earlier to avoid it got skipped due to:
// the record becomes stored during the fetch because of other interleaved process.
self.network().notify_fetch_completed(
record.key.clone(),
RecordType::NonChunk(content_hash),
);
}
result
// Transactions should always be paid for
error!("Transaction should not be validated at this point");
Err(Error::InvalidPutWithoutPayment(
PrettyPrintRecordKey::from(&record.key).into_owned(),
))
}
RecordKind::TransactionWithPayment => {
let (payment, transaction) =
Expand Down Expand Up @@ -224,6 +208,12 @@ impl Node {
.await;
if res.is_ok() {
let content_hash = XorName::from_content(&record.value);
Marker::ValidTransactionPutFromClient(&PrettyPrintRecordKey::from(&record.key))
.log();
self.replicate_valid_fresh_record(
record.key.clone(),
RecordType::NonChunk(content_hash),
);

// Notify replication_fetcher to mark the attempt as completed.
// Send the notification earlier to avoid it got skipped due to:
Expand Down Expand Up @@ -601,23 +591,24 @@ impl Node {
}

// verify the transactions
let mut validated_transactions: Vec<Transaction> = transactions_for_key
let mut validated_transactions: BTreeSet<Transaction> = transactions_for_key
.into_iter()
.filter(|t| t.verify())
.collect();

// skip if none are valid
let addr = match validated_transactions.as_slice() {
[] => {
let addr = match validated_transactions.first() {
None => {
warn!("Found no validated transactions to store at {pretty_key:?}");
return Ok(());
}
[t, ..] => t.address(),
Some(t) => t.address(),
};

// add local transactions to the validated transactions
// add local transactions to the validated transactions, turn to Vec
let local_txs = self.get_local_transactions(addr).await?;
validated_transactions.extend(local_txs);
validated_transactions.extend(local_txs.into_iter());
let validated_transactions: Vec<Transaction> = validated_transactions.into_iter().collect();

// store the record into the local storage
let record = Record {
Expand Down
49 changes: 39 additions & 10 deletions ant-protocol/src/storage/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// permissions and limitations relating to use of the SAFE Network Software.

use super::address::TransactionAddress;
use bls::SecretKey;
use serde::{Deserialize, Serialize};

// re-exports
Expand All @@ -27,13 +28,15 @@ pub struct Transaction {
}

impl Transaction {
/// Create a new transaction, signing it with the provided secret key.
pub fn new(
owner: PublicKey,
parents: Vec<PublicKey>,
content: TransactionContent,
outputs: Vec<(PublicKey, TransactionContent)>,
signature: Signature,
signing_key: &SecretKey,
) -> Self {
let signature = signing_key.sign(Self::bytes_to_sign(&owner, &parents, &content, &outputs));
Self {
owner,
parents,
Expand All @@ -43,35 +46,61 @@ impl Transaction {
}
}

pub fn address(&self) -> TransactionAddress {
TransactionAddress::from_owner(self.owner)
/// Create a new transaction, with the signature already calculated.
pub fn new_with_signature(
owner: PublicKey,
parents: Vec<PublicKey>,
content: TransactionContent,
outputs: Vec<(PublicKey, TransactionContent)>,
signature: Signature,
) -> Self {
Self {
owner,
parents,
content,
outputs,
signature,
}
}

pub fn bytes_for_signature(&self) -> Vec<u8> {
/// Get the bytes that the signature is calculated from.
pub fn bytes_to_sign(
owner: &PublicKey,
parents: &[PublicKey],
content: &[u8],
outputs: &[(PublicKey, TransactionContent)],
) -> Vec<u8> {
let mut bytes = Vec::new();
bytes.extend_from_slice(&self.owner.to_bytes());
bytes.extend_from_slice(&owner.to_bytes());
bytes.extend_from_slice("parent".as_bytes());
bytes.extend_from_slice(
&self
.parents
&parents
.iter()
.map(|p| p.to_bytes())
.collect::<Vec<_>>()
.concat(),
);
bytes.extend_from_slice("content".as_bytes());
bytes.extend_from_slice(&self.content);
bytes.extend_from_slice(content);
bytes.extend_from_slice("outputs".as_bytes());
bytes.extend_from_slice(
&self
.outputs
&outputs
.iter()
.flat_map(|(p, c)| [&p.to_bytes(), c.as_slice()].concat())
.collect::<Vec<_>>(),
);
bytes
}

pub fn address(&self) -> TransactionAddress {
TransactionAddress::from_owner(self.owner)
}

/// Get the bytes that the signature is calculated from.
pub fn bytes_for_signature(&self) -> Vec<u8> {
Self::bytes_to_sign(&self.owner, &self.parents, &self.content, &self.outputs)
}

pub fn verify(&self) -> bool {
self.owner
.verify(&self.signature, self.bytes_for_signature())
Expand Down
4 changes: 3 additions & 1 deletion autonomi/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ pub mod payment;
pub mod quote;

pub mod data;
pub mod files;
pub mod transactions;

#[cfg(feature = "external-signer")]
#[cfg_attr(docsrs, doc(cfg(feature = "external-signer")))]
pub mod external_signer;
pub mod files;
#[cfg(feature = "registers")]
#[cfg_attr(docsrs, doc(cfg(feature = "registers")))]
pub mod registers;
Expand Down
13 changes: 2 additions & 11 deletions autonomi/src/client/registers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ impl Client {
name: String,
owner: RegisterSecretKey,
) -> Result<AttoTokens, RegisterError> {
info!("Getting cost for register with name: {name}");
trace!("Getting cost for register with name: {name}");
// get register address
let pk = owner.public_key();
let name = XorName::from_content_parts(&[name.as_bytes()]);
Expand Down Expand Up @@ -321,15 +321,6 @@ impl Client {
};

let payees = proof.payees();

if payees.is_empty() {
error!(
"Failed to get payees from payment proof: {:?}",
RegisterError::PayeesMissing
);
return Err(RegisterError::PayeesMissing);
}

let signed_register = register.signed_reg.clone();

let record = Record {
Expand All @@ -356,7 +347,7 @@ impl Client {
put_quorum: Quorum::All,
retry_strategy: None,
use_put_record_to: Some(payees),
verification: Some((VerificationKind::Network, get_cfg)),
verification: Some((VerificationKind::Crdt, get_cfg)),
};

debug!("Storing register at address {address} to the network");
Expand Down
Loading
Loading