Skip to content

Commit

Permalink
feat(register)!: network only store ops list
Browse files Browse the repository at this point in the history
BREAKING CHANGE!

crdt MerkleReg only stored in client locally
node side only store ops list (signed by owner)
  • Loading branch information
maqi authored and b-zee committed Oct 11, 2024
1 parent 0fb644d commit f4089d6
Show file tree
Hide file tree
Showing 12 changed files with 519 additions and 1,159 deletions.
158 changes: 78 additions & 80 deletions autonomi/src/client/registers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,22 @@
pub use bls::SecretKey as RegisterSecretKey;
use sn_evm::Amount;
use sn_evm::AttoTokens;
use sn_networking::GetRecordError;
use sn_networking::VerificationKind;
use sn_protocol::storage::RetryStrategy;
pub use sn_registers::{Permissions as RegisterPermissions, RegisterAddress};
use tracing::warn;

use crate::client::data::PayError;
use crate::client::Client;
use bytes::Bytes;
use evmlib::wallet::Wallet;
use libp2p::kad::{Quorum, Record};
use sn_networking::GetRecordCfg;
use sn_networking::NetworkError;
use sn_networking::PutRecordCfg;
use sn_networking::{GetRecordCfg, GetRecordError, NetworkError, PutRecordCfg};
use sn_protocol::storage::try_deserialize_record;
use sn_protocol::storage::try_serialize_record;
use sn_protocol::storage::RecordKind;
use sn_protocol::NetworkAddress;
use sn_registers::Register as ClientRegister;
use sn_registers::SignedRegister;
use sn_registers::{EntryHash, Permissions};
use sn_registers::Register as BaseRegister;
use sn_registers::{Permissions, RegisterCrdt, RegisterOp, SignedRegister};
use std::collections::BTreeSet;
use xor_name::XorName;

Expand All @@ -56,27 +51,67 @@ pub enum RegisterError {

#[derive(Clone, Debug)]
pub struct Register {
pub(crate) inner: SignedRegister,
signed_reg: SignedRegister,
crdt_reg: RegisterCrdt,
}

impl Register {
pub fn address(&self) -> &RegisterAddress {
self.inner.address()
self.signed_reg.address()
}

/// Retrieve the current values of the register. There can be multiple values
/// in case a register was updated concurrently. This is because of the nature
/// of registers, which allows for network concurrency.
pub fn values(&self) -> Vec<Bytes> {
self.inner
.clone()
.register()
.expect("register to be valid")
self.crdt_reg
.read()
.into_iter()
.map(|(_hash, value)| value.into())
.collect()
}

fn new(
initial_value: Option<Bytes>,
name: XorName,
owner: RegisterSecretKey,
permissions: RegisterPermissions,
) -> Result<Register, RegisterError> {
let pk = owner.public_key();

let base_register = BaseRegister::new(pk, name, permissions);

let signature = owner.sign(base_register.bytes().map_err(RegisterError::Write)?);
let signed_reg = SignedRegister::new(base_register, signature, BTreeSet::new());

let crdt_reg = RegisterCrdt::new(*signed_reg.address());

let mut register = Register {
signed_reg,
crdt_reg,
};

if let Some(value) = initial_value {
register.write_atop(&value, &owner)?;
}

Ok(register)
}

fn write_atop(&mut self, entry: &[u8], owner: &RegisterSecretKey) -> Result<(), RegisterError> {
let children: BTreeSet<_> = self.crdt_reg.read().iter().map(|(hash, _)| *hash).collect();

let (_hash, address, crdt_op) = self
.crdt_reg
.write(entry.to_vec(), &children)
.map_err(RegisterError::Write)?;

let op = RegisterOp::new(address, crdt_op, owner);

let _ = self.signed_reg.add_op(op);

Ok(())
}
}

impl Client {
Expand All @@ -99,9 +134,11 @@ impl Client {
is_register: true,
};

let register = match self.network.get_record_from_network(key, &get_cfg).await {
let signed_reg = match self.network.get_record_from_network(key, &get_cfg).await {
Ok(record) => {
try_deserialize_record(&record).map_err(|_| RegisterError::Serialization)?
let signed_reg: SignedRegister =
try_deserialize_record(&record).map_err(|_| RegisterError::Serialization)?;
signed_reg
}
// manage forked register case
Err(NetworkError::GetRecordError(GetRecordError::SplitRecord { result_map })) => {
Expand All @@ -128,58 +165,33 @@ impl Client {
};

// Make sure the fetched record contains valid CRDT operations
register.verify().map_err(|err| {
error!("Failed to verify register {address:?} with error: {err}");
RegisterError::FailedVerification
})?;
signed_reg
.verify()
.map_err(|_| RegisterError::FailedVerification)?;

let mut crdt_reg = RegisterCrdt::new(*signed_reg.address());
for op in signed_reg.ops() {
if let Err(err) = crdt_reg.apply_op(op.clone()) {
return Err(RegisterError::Write(err));
}
}

Ok(Register { inner: register })
Ok(Register {
signed_reg,
crdt_reg,
})
}

/// Updates a Register on the network with a new value. This will overwrite existing value(s).
pub async fn register_update(
&self,
register: Register,
mut register: Register,
new_value: Bytes,
owner: RegisterSecretKey,
) -> Result<(), RegisterError> {
// Fetch the current register
let mut signed_register = register.inner;
let mut register = signed_register
.clone()
.register()
.map_err(|err| {
error!(
"Failed to get register from signed register as it failed verification: {err}"
);
RegisterError::FailedVerification
})?
.clone();
register.write_atop(&new_value, &owner)?;

info!("Updating register at addr: {}", register.address());

// Get all current branches
let children: BTreeSet<EntryHash> = register.read().into_iter().map(|(e, _)| e).collect();

// Write the new value to all branches
let (_, op) = register
.write(new_value.into(), &children, &owner)
.map_err(|err| {
error!(
"Failed to write to register at addr: {} : {err}",
register.address()
);
RegisterError::Write(err)
})?;

// Apply the operation to the register
signed_register.add_op(op.clone()).map_err(|err| {
error!(
"Failed to add op to register at addr: {} : {err}",
register.address()
);
RegisterError::Write(err)
})?;
let signed_register = register.signed_reg.clone();

// Prepare the record for network storage
let record = Record {
Expand Down Expand Up @@ -230,7 +242,7 @@ impl Client {
let pk = owner.public_key();
let name = XorName::from_content_parts(&[name.as_bytes()]);
let permissions = Permissions::new_with([pk]);
let register = ClientRegister::new(pk, name, permissions);
let register = Register::new(None, name, owner, permissions)?;
let reg_xor = register.address().xorname();

// get cost to store register
Expand Down Expand Up @@ -281,23 +293,14 @@ impl Client {
permissions: RegisterPermissions,
wallet: &Wallet,
) -> Result<Register, RegisterError> {
let pk = owner.public_key();
info!("Creating register with name: {name}");
let name = XorName::from_content_parts(&[name.as_bytes()]);

// Owner can write to the register.
let mut register = ClientRegister::new(pk, name, permissions);
let address = NetworkAddress::from_register_address(*register.address());

info!("Creating register at address: {address}");
let register = Register::new(Some(value), name, owner, permissions)?;
let address = register.address();

let entries = register
.read()
.into_iter()
.map(|(entry_hash, _value)| entry_hash)
.collect();

let _ = register.write(value.into(), &entries, &owner);
let reg_xor = register.address().xorname();
let reg_xor = address.xorname();
debug!("Paying for register at address: {address}");
let (payment_proofs, _skipped) = self
.pay(std::iter::once(reg_xor), wallet)
Expand All @@ -317,13 +320,10 @@ impl Client {
.to_peer_id_payee()
.ok_or(RegisterError::InvalidQuote)
.inspect_err(|err| error!("Failed to get payee from payment proof: {err}"))?;
let signed_register = register.clone().into_signed(&owner).map_err(|err| {
error!("Failed to sign register at address: {address} : {err}");
RegisterError::CouldNotSign(err)
})?;
let signed_register = register.signed_reg.clone();

let record = Record {
key: address.to_record_key(),
key: NetworkAddress::from_register_address(*address).to_record_key(),
value: try_serialize_record(
&(proof, &signed_register),
RecordKind::RegisterWithPayment,
Expand Down Expand Up @@ -356,8 +356,6 @@ impl Client {
error!("Failed to put record - register {address} to the network: {err}")
})?;

Ok(Register {
inner: signed_register,
})
Ok(register)
}
}
97 changes: 44 additions & 53 deletions sn_client/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,37 +314,6 @@ impl Client {
///
/// [Signature]
///
/// # Example
/// ```no_run
/// use sn_client::{Client, Error};
/// use bls::SecretKey;
///
/// # #[tokio::main]
/// # async fn main() -> Result<(),Error>{
/// use tracing::callsite::register;
/// use xor_name::XorName;
/// use sn_registers::Register;
/// use sn_protocol::messages::RegisterCmd;
/// let client = Client::new(SecretKey::random(), None, None, None).await?;
///
/// // Set up register prerequisites
/// let mut rng = rand::thread_rng();
/// let xorname = XorName::random(&mut rng);
/// let owner_sk = SecretKey::random();
/// let owner_pk = owner_sk.public_key();
///
/// // set up register
/// let mut register = Register::new(owner_pk, xorname, Default::default());
/// let mut register_clone = register.clone();
///
/// // Use of client.sign() with register through RegisterCmd::Create
/// let cmd = RegisterCmd::Create {
/// register,
/// signature: client.sign(register_clone.bytes()?),
/// };
/// # Ok(())
/// # }
/// ```
pub fn sign<T: AsRef<[u8]>>(&self, data: T) -> Signature {
self.signer.sign(data)
}
Expand Down Expand Up @@ -1105,10 +1074,27 @@ fn merge_register_records(
mod tests {
use std::collections::BTreeSet;

use sn_registers::Register;
use sn_registers::{Register, RegisterCrdt, RegisterOp};

use super::*;

fn write_atop(
signed_reg: &mut SignedRegister,
crdt_reg: &mut RegisterCrdt,
entry: &[u8],
owner: &SecretKey,
) -> eyre::Result<()> {
let children: BTreeSet<_> = crdt_reg.read().iter().map(|(hash, _)| *hash).collect();

let (_hash, address, crdt_op) = crdt_reg.write(entry.to_vec(), &children)?;

let op = RegisterOp::new(address, crdt_op, owner);

signed_reg.add_op(op)?;

Ok(())
}

#[test]
fn test_merge_register_records() -> eyre::Result<()> {
let mut rng = rand::thread_rng();
Expand All @@ -1117,28 +1103,33 @@ mod tests {
let owner_pk = owner_sk.public_key();
let address = RegisterAddress::new(meta, owner_pk);

let base_register = Register::new(owner_pk, meta, Default::default());
let signature = owner_sk.sign(base_register.bytes()?);

// prepare registers
let mut register_root = Register::new(owner_pk, meta, Default::default());
let (root_hash, _) =
register_root.write(b"root_entry".to_vec(), &BTreeSet::default(), &owner_sk)?;
let root = BTreeSet::from_iter(vec![root_hash]);
let signed_root = register_root.clone().into_signed(&owner_sk)?;

let mut register1 = register_root.clone();
let (_hash, op1) = register1.write(b"entry1".to_vec(), &root, &owner_sk)?;
let mut signed_register1 = signed_root.clone();
signed_register1.add_op(op1)?;

let mut register2 = register_root.clone();
let (_hash, op2) = register2.write(b"entry2".to_vec(), &root, &owner_sk)?;
let mut signed_register2 = signed_root;
signed_register2.add_op(op2)?;

let mut register_bad = Register::new(owner_pk, meta, Default::default());
let (_hash, _op_bad) =
register_bad.write(b"bad_root".to_vec(), &BTreeSet::default(), &owner_sk)?;
let invalid_sig = register2.sign(&owner_sk)?; // steal sig from something else
let signed_register_bad = SignedRegister::new(register_bad, invalid_sig);
let mut register_root = SignedRegister::new(base_register, signature, BTreeSet::new());
let mut crdt_reg_root = RegisterCrdt::new(address);

write_atop(
&mut register_root,
&mut crdt_reg_root,
b"root_entry",
&owner_sk,
)?;

let mut signed_register1 = register_root.clone();
let mut crdt_reg1 = crdt_reg_root.clone();
write_atop(&mut signed_register1, &mut crdt_reg1, b"entry1", &owner_sk)?;

let mut signed_register2 = register_root.clone();
let mut crdt_reg2 = crdt_reg_root.clone();
write_atop(&mut signed_register2, &mut crdt_reg2, b"entry2", &owner_sk)?;

let base_register_bad = Register::new(owner_pk, meta, Default::default());
let bad_sk = SecretKey::random();
let signature_bad = bad_sk.sign(base_register_bad.bytes()?);
let signed_register_bad =
SignedRegister::new(base_register_bad, signature_bad, BTreeSet::new());

// prepare records
let record1 = Record {
Expand Down
Loading

0 comments on commit f4089d6

Please sign in to comment.