Skip to content

Commit

Permalink
feat(register)!: network only store ops list
Browse files Browse the repository at this point in the history
BREAKING CHANGE!

crdt MerkleReg only stored in client locally
node side only store ops list (signed by owner)
  • Loading branch information
maqi committed Sep 24, 2024
1 parent 43577b7 commit 04916f1
Show file tree
Hide file tree
Showing 12 changed files with 499 additions and 1,142 deletions.
132 changes: 73 additions & 59 deletions autonomi/src/client/registers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ use libp2p::kad::{Quorum, Record};
use sn_client::networking::GetRecordCfg;
use sn_client::networking::NetworkError;
use sn_client::networking::PutRecordCfg;
use sn_client::registers::EntryHash;
use sn_client::registers::Permissions;
use sn_client::registers::Register as ClientRegister;
use sn_client::registers::Register as BaseRegister;
use sn_client::registers::RegisterCrdt;
use sn_client::registers::RegisterOp;
use sn_client::registers::SignedRegister;
use sn_client::transfers::HotWallet;
use sn_protocol::storage::try_deserialize_record;
Expand Down Expand Up @@ -42,27 +43,68 @@ pub enum RegisterError {

#[derive(Clone, Debug)]
pub struct Register {
inner: SignedRegister,
signed_reg: SignedRegister,
crdt_reg: RegisterCrdt,
}

impl Register {
pub fn address(&self) -> &RegisterAddress {
self.inner.address()
self.signed_reg.address()
}

/// Retrieve the current values of the register. There can be multiple values
/// in case a register was updated concurrently. This is because of the nature
/// of registers, which allows for network concurrency.
pub fn values(&self) -> Vec<Bytes> {
self.inner
.clone()
.register()
.expect("register to be valid")
self.crdt_reg
.read()
.into_iter()
.map(|(_hash, value)| value.into())
.collect()
}

pub fn new(
initial_value: Option<Bytes>,
name: XorName,
owner: SecretKey,
) -> Result<Register, RegisterError> {
let pk = owner.public_key();
// Only owner can write to the register.
let permissions = Permissions::new_with([pk]);

let base_register = BaseRegister::new(pk, name, permissions);

let signature = owner.sign(base_register.bytes().map_err(RegisterError::Write)?);
let signed_reg = SignedRegister::new(base_register, signature, BTreeSet::new());

let crdt_reg = RegisterCrdt::new(*signed_reg.address());

let mut register = Register {
signed_reg,
crdt_reg,
};

if let Some(value) = initial_value {
register.write_atop(&value, &owner)?;
}

Ok(register)
}

pub fn write_atop(&mut self, entry: &[u8], owner: &SecretKey) -> Result<(), RegisterError> {
let children: BTreeSet<_> = self.crdt_reg.read().iter().map(|(hash, _)| *hash).collect();

let (_hash, address, crdt_op) = self
.crdt_reg
.write(entry.to_vec(), &children)
.map_err(RegisterError::Write)?;

let op = RegisterOp::new(address, crdt_op, owner);

let _ = self.signed_reg.add_op(op);

Ok(())
}
}

impl Client {
Expand All @@ -74,36 +116,17 @@ impl Client {
owner: SecretKey,
wallet: &mut HotWallet,
) -> Result<Register, RegisterError> {
let pk = owner.public_key();

// Owner can write to the register.
let permissions = Permissions::new_with([pk]);
let mut register = ClientRegister::new(pk, name, permissions);
let address = NetworkAddress::from_register_address(*register.address());

let entries = register
.read()
.into_iter()
.map(|(entry_hash, _value)| entry_hash)
.collect();
register
.write(value.into(), &entries, &owner)
.map_err(RegisterError::Write)?;
let register = Register::new(Some(value), name, owner)?;
let address = register.address();

let _payment_result = self
.pay(std::iter::once(register.address().xorname()), wallet)
.await?;
let _payment_result = self.pay(std::iter::once(address.xorname()), wallet).await?;

let (payment, payee) =
self.get_recent_payment_for_addr(&register.address().xorname(), wallet)?;
let (payment, payee) = self.get_recent_payment_for_addr(&address.xorname(), wallet)?;

let signed_register = register
.clone()
.into_signed(&owner)
.map_err(RegisterError::CouldNotSign)?;
let signed_register = register.signed_reg.clone();

let record = Record {
key: address.to_record_key(),
key: NetworkAddress::from_register_address(*address).to_record_key(),
value: try_serialize_record(
&(payment, &signed_register),
RecordKind::RegisterWithPayment,
Expand All @@ -123,9 +146,7 @@ impl Client {

self.network.put_record(record, &put_cfg).await?;

Ok(Register {
inner: signed_register,
})
Ok(register)
}

/// Fetches a Register from the network.
Expand All @@ -146,44 +167,37 @@ impl Client {

let record = self.network.get_record_from_network(key, &get_cfg).await?;

let register: SignedRegister =
let signed_reg: SignedRegister =
try_deserialize_record(&record).map_err(|_| RegisterError::Serialization)?;

// Make sure the fetched record contains valid CRDT operations
register
signed_reg
.verify()
.map_err(|_| RegisterError::FailedVerification)?;

Ok(Register { inner: register })
let mut crdt_reg = RegisterCrdt::new(*signed_reg.address());
for op in signed_reg.ops() {
if let Err(err) = crdt_reg.apply_op(op.clone()) {
return Err(RegisterError::Write(err));
}
}

Ok(Register {
signed_reg,
crdt_reg,
})
}

/// Updates a Register on the network with a new value. This will overwrite existing value(s).
pub async fn update_register(
&self,
register: Register,
mut register: Register,
new_value: Bytes,
owner: SecretKey,
) -> Result<(), RegisterError> {
// Fetch the current register
let mut signed_register = register.inner;
let mut register = signed_register
.clone()
.register()
.expect("register to be valid")
.clone();

// Get all current branches
let children: BTreeSet<EntryHash> = register.read().into_iter().map(|(e, _)| e).collect();

// Write the new value to all branches
let (_, op) = register
.write(new_value.to_vec(), &children, &owner)
.map_err(RegisterError::Write)?;
register.write_atop(&new_value, &owner)?;

// Apply the operation to the register
signed_register
.add_op(op.clone())
.map_err(RegisterError::Write)?;
let signed_register = register.signed_reg.clone();

// Prepare the record for network storage
let record = Record {
Expand Down
96 changes: 43 additions & 53 deletions sn_client/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,37 +314,6 @@ impl Client {
///
/// [Signature]
///
/// # Example
/// ```no_run
/// use sn_client::{Client, Error};
/// use bls::SecretKey;
///
/// # #[tokio::main]
/// # async fn main() -> Result<(),Error>{
/// use tracing::callsite::register;
/// use xor_name::XorName;
/// use sn_registers::Register;
/// use sn_protocol::messages::RegisterCmd;
/// let client = Client::new(SecretKey::random(), None, None, None).await?;
///
/// // Set up register prerequisites
/// let mut rng = rand::thread_rng();
/// let xorname = XorName::random(&mut rng);
/// let owner_sk = SecretKey::random();
/// let owner_pk = owner_sk.public_key();
///
/// // set up register
/// let mut register = Register::new(owner_pk, xorname, Default::default());
/// let mut register_clone = register.clone();
///
/// // Use of client.sign() with register through RegisterCmd::Create
/// let cmd = RegisterCmd::Create {
/// register,
/// signature: client.sign(register_clone.bytes()?),
/// };
/// # Ok(())
/// # }
/// ```
pub fn sign<T: AsRef<[u8]>>(&self, data: T) -> Signature {
self.signer.sign(data)
}
Expand Down Expand Up @@ -1105,10 +1074,27 @@ fn merge_register_records(
mod tests {
use std::collections::BTreeSet;

use sn_registers::Register;
use sn_registers::{Register, RegisterCrdt, RegisterOp};

use super::*;

fn write_atop(
signed_reg: &mut SignedRegister,
crdt_reg: &mut RegisterCrdt,
entry: &[u8],
owner: &SecretKey,
) -> eyre::Result<()> {
let children: BTreeSet<_> = crdt_reg.read().iter().map(|(hash, _)| *hash).collect();

let (_hash, address, crdt_op) = crdt_reg.write(entry.to_vec(), &children)?;

let op = RegisterOp::new(address, crdt_op, owner);

signed_reg.add_op(op)?;

Ok(())
}

#[test]
fn test_merge_register_records() -> eyre::Result<()> {
let mut rng = rand::thread_rng();
Expand All @@ -1117,28 +1103,32 @@ mod tests {
let owner_pk = owner_sk.public_key();
let address = RegisterAddress::new(meta, owner_pk);

let base_register = Register::new(owner_pk, meta, Default::default());
let signature = owner_sk.sign(base_register.bytes()?);

// prepare registers
let mut register_root = Register::new(owner_pk, meta, Default::default());
let (root_hash, _) =
register_root.write(b"root_entry".to_vec(), &BTreeSet::default(), &owner_sk)?;
let root = BTreeSet::from_iter(vec![root_hash]);
let signed_root = register_root.clone().into_signed(&owner_sk)?;

let mut register1 = register_root.clone();
let (_hash, op1) = register1.write(b"entry1".to_vec(), &root, &owner_sk)?;
let mut signed_register1 = signed_root.clone();
signed_register1.add_op(op1)?;

let mut register2 = register_root.clone();
let (_hash, op2) = register2.write(b"entry2".to_vec(), &root, &owner_sk)?;
let mut signed_register2 = signed_root;
signed_register2.add_op(op2)?;

let mut register_bad = Register::new(owner_pk, meta, Default::default());
let (_hash, _op_bad) =
register_bad.write(b"bad_root".to_vec(), &BTreeSet::default(), &owner_sk)?;
let invalid_sig = register2.sign(&owner_sk)?; // steal sig from something else
let signed_register_bad = SignedRegister::new(register_bad, invalid_sig);
let mut register_root = SignedRegister::new(base_register, signature, BTreeSet::new());
let mut crdt_reg_root = RegisterCrdt::new(address);

write_atop(
&mut register_root,
&mut crdt_reg_root,
b"root_entry",
&owner_sk,
)?;

let mut signed_register1 = register_root.clone();
let mut crdt_reg1 = crdt_reg_root.clone();
write_atop(&mut signed_register1, &mut crdt_reg1, b"entry1", &owner_sk)?;

let mut signed_register2 = register_root.clone();
let mut crdt_reg2 = crdt_reg_root.clone();
write_atop(&mut signed_register2, &mut crdt_reg2, b"entry2", &owner_sk)?;

let base_register_bad = Register::new(owner_pk, meta, Permissions::AnyoneCanWrite);
let signature_bad = owner_sk.sign(base_register_bad.bytes()?);
let signed_register_bad =
SignedRegister::new(base_register_bad, signature_bad, BTreeSet::new());

// prepare records
let record1 = Record {
Expand Down
Loading

0 comments on commit 04916f1

Please sign in to comment.