Skip to content

Commit

Permalink
fix(rt-sync): clean already persisted incoming records
Browse files Browse the repository at this point in the history
  • Loading branch information
hydra-yse committed Dec 12, 2024
1 parent 87cf5c4 commit 35b0a8b
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 11 deletions.
22 changes: 14 additions & 8 deletions lib/core/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ use crate::{
};

use self::client::SyncerClient;
use self::model::SyncOutgoingChanges;
use self::model::{
data::{ChainSyncData, ReceiveSyncData, SendSyncData, SyncData},
sync::ListChangesRequest,
RecordType, SyncState,
};
use self::model::{DecryptionError, SyncOutgoingChanges};

pub(crate) mod client;
pub(crate) mod model;
Expand Down Expand Up @@ -201,22 +201,25 @@ impl SyncService {
Ok(())
}

async fn handle_decryption(&self, new_record: Record) -> Result<DecryptionInfo> {
log::info!(
async fn handle_decryption(
&self,
new_record: Record,
) -> Result<DecryptionInfo, DecryptionError> {
log::debug!(
"Handling decryption for record record_id {}",
&new_record.id
);

// Step 3: Check whether or not record is applicable (from its schema_version)
if !new_record.is_applicable()? {
return Err(anyhow!("Record is not applicable: schema_version too high"));
return Err(DecryptionError::SchemaNotApplicable);
}

// Step 4: Check whether we already have this record, and if the revision is newer
let maybe_sync_state = self.persister.get_sync_state_by_record_id(&new_record.id)?;
if let Some(sync_state) = &maybe_sync_state {
if sync_state.record_revision >= new_record.revision {
return Err(anyhow!("Remote record revision is lower or equal to the persisted one. Skipping update."));
return Err(DecryptionError::AlreadyPersisted);
}
}

Expand Down Expand Up @@ -247,6 +250,8 @@ impl SyncService {
};
let last_commit_time = maybe_outgoing_changes.map(|details| details.commit_time);

log::debug!("Successfully decrypted record {}", &decrypted_record.id);

Ok(DecryptionInfo {
new_sync_state,
record: decrypted_record,
Expand Down Expand Up @@ -291,11 +296,14 @@ impl SyncService {

// Step 3: Decrypt all the records, if possible. Filter those whose revision/schema is not
// applicable
let mut succeded = vec![];
let mut decrypted: Vec<DecryptionInfo> = vec![];
for record in incoming_records {
let record_id = record.id.clone();
match self.handle_decryption(record).await {
Ok(decryption_info) => decrypted.push(decryption_info),
// If we already have this record, it should be cleaned from sync_incoming
Err(DecryptionError::AlreadyPersisted) => succeded.push(record_id),
Err(e) => {
log::debug!(
"Could not handle decryption of incoming record {record_id}: {e:?}",
Expand All @@ -313,8 +321,6 @@ impl SyncService {
.into_iter()
.partition(|result| result.record.data.is_swap());

let mut succeded = vec![];

// Step 5: Recover the swap records' data from onchain, and commit it
for (decryption_info, swap) in self.handle_recovery(decrypted_swap_info).await? {
if let Err(e) = self.commit_record(&decryption_info, Some(swap)) {
Expand Down Expand Up @@ -347,7 +353,7 @@ impl SyncService {
data_id: &str,
record_type: RecordType,
) -> Result<()> {
log::info!("Handling push for record record_id {record_id} data_id {data_id}");
log::debug!("Handling push for record record_id {record_id} data_id {data_id}");

// Step 1: Get the sync state, if it exists, to compute the revision
let maybe_sync_state = self.persister.get_sync_state_by_record_id(record_id)?;
Expand Down
51 changes: 48 additions & 3 deletions lib/core/src/sync/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,46 @@ pub(crate) struct DecryptedRecord {
pub(crate) data: SyncData,
}

#[derive(thiserror::Error, Debug)]
pub(crate) enum DecryptionError {
#[error("Record is not applicable: schema_version too high")]
SchemaNotApplicable,

#[error("Remote record revision is lower or equal to the persisted one. Skipping update.")]
AlreadyPersisted,

#[error("Could not decrypt payload with signer: {err}")]
InvalidPayload { err: String },

#[error("Could not deserialize JSON bytes: {err}")]
DeserializeError { err: String },

#[error("Generic error: {err}")]
Generic { err: String },
}

impl DecryptionError {
pub(crate) fn invalid_payload(err: crate::model::SignerError) -> Self {
Self::InvalidPayload {
err: err.to_string(),
}
}

pub(crate) fn deserialize_error(err: serde_json::Error) -> Self {
Self::DeserializeError {
err: err.to_string(),
}
}
}

impl From<anyhow::Error> for DecryptionError {
fn from(value: anyhow::Error) -> Self {
Self::Generic {
err: value.to_string(),
}
}
}

pub(crate) struct DecryptionInfo {
pub(crate) new_sync_state: SyncState,
pub(crate) record: DecryptedRecord,
Expand Down Expand Up @@ -135,9 +175,14 @@ impl Record {
Ok(CURRENT_SCHEMA_VERSION.major >= record_version.major)
}

pub(crate) fn decrypt(self, signer: Arc<Box<dyn Signer>>) -> Result<DecryptedRecord> {
let dec_data = signer.ecies_decrypt(self.data)?;
let data = serde_json::from_slice(&dec_data)?;
pub(crate) fn decrypt(
self,
signer: Arc<Box<dyn Signer>>,
) -> Result<DecryptedRecord, DecryptionError> {
let dec_data = signer
.ecies_decrypt(self.data)
.map_err(DecryptionError::invalid_payload)?;
let data = serde_json::from_slice(&dec_data).map_err(DecryptionError::deserialize_error)?;
Ok(DecryptedRecord {
id: self.id,
revision: self.revision,
Expand Down

0 comments on commit 35b0a8b

Please sign in to comment.