diff --git a/lib/core/src/sync/mod.rs b/lib/core/src/sync/mod.rs index 710dc7140..fe155cdf8 100644 --- a/lib/core/src/sync/mod.rs +++ b/lib/core/src/sync/mod.rs @@ -18,12 +18,12 @@ use crate::{ }; use self::client::SyncerClient; -use self::model::SyncOutgoingChanges; use self::model::{ data::{ChainSyncData, ReceiveSyncData, SendSyncData, SyncData}, sync::ListChangesRequest, RecordType, SyncState, }; +use self::model::{DecryptionError, SyncOutgoingChanges}; pub(crate) mod client; pub(crate) mod model; @@ -201,22 +201,25 @@ impl SyncService { Ok(()) } - async fn handle_decryption(&self, new_record: Record) -> Result { - log::info!( + async fn handle_decryption( + &self, + new_record: Record, + ) -> Result { + log::debug!( "Handling decryption for record record_id {}", &new_record.id ); // Step 3: Check whether or not record is applicable (from its schema_version) if !new_record.is_applicable()? { - return Err(anyhow!("Record is not applicable: schema_version too high")); + return Err(DecryptionError::SchemaNotApplicable); } // Step 4: Check whether we already have this record, and if the revision is newer let maybe_sync_state = self.persister.get_sync_state_by_record_id(&new_record.id)?; if let Some(sync_state) = &maybe_sync_state { if sync_state.record_revision >= new_record.revision { - return Err(anyhow!("Remote record revision is lower or equal to the persisted one. Skipping update.")); + return Err(DecryptionError::AlreadyPersisted); } } @@ -247,6 +250,8 @@ impl SyncService { }; let last_commit_time = maybe_outgoing_changes.map(|details| details.commit_time); + log::debug!("Successfully decrypted record {}", &decrypted_record.id); + Ok(DecryptionInfo { new_sync_state, record: decrypted_record, @@ -291,11 +296,14 @@ impl SyncService { // Step 3: Decrypt all the records, if possible. Filter those whose revision/schema is not // applicable + let mut succeded = vec![]; let mut decrypted: Vec = vec![]; for record in incoming_records { let record_id = record.id.clone(); match self.handle_decryption(record).await { Ok(decryption_info) => decrypted.push(decryption_info), + // If we already have this record, it should be cleaned from sync_incoming + Err(DecryptionError::AlreadyPersisted) => succeded.push(record_id), Err(e) => { log::debug!( "Could not handle decryption of incoming record {record_id}: {e:?}", @@ -313,8 +321,6 @@ impl SyncService { .into_iter() .partition(|result| result.record.data.is_swap()); - let mut succeded = vec![]; - // Step 5: Recover the swap records' data from onchain, and commit it for (decryption_info, swap) in self.handle_recovery(decrypted_swap_info).await? { if let Err(e) = self.commit_record(&decryption_info, Some(swap)) { @@ -347,7 +353,7 @@ impl SyncService { data_id: &str, record_type: RecordType, ) -> Result<()> { - log::info!("Handling push for record record_id {record_id} data_id {data_id}"); + log::debug!("Handling push for record record_id {record_id} data_id {data_id}"); // Step 1: Get the sync state, if it exists, to compute the revision let maybe_sync_state = self.persister.get_sync_state_by_record_id(record_id)?; diff --git a/lib/core/src/sync/model/mod.rs b/lib/core/src/sync/model/mod.rs index f137d5762..61d795b5b 100644 --- a/lib/core/src/sync/model/mod.rs +++ b/lib/core/src/sync/model/mod.rs @@ -78,6 +78,46 @@ pub(crate) struct DecryptedRecord { pub(crate) data: SyncData, } +#[derive(thiserror::Error, Debug)] +pub(crate) enum DecryptionError { + #[error("Record is not applicable: schema_version too high")] + SchemaNotApplicable, + + #[error("Remote record revision is lower or equal to the persisted one. Skipping update.")] + AlreadyPersisted, + + #[error("Could not decrypt payload with signer: {err}")] + InvalidPayload { err: String }, + + #[error("Could not deserialize JSON bytes: {err}")] + DeserializeError { err: String }, + + #[error("Generic error: {err}")] + Generic { err: String }, +} + +impl DecryptionError { + pub(crate) fn invalid_payload(err: crate::model::SignerError) -> Self { + Self::InvalidPayload { + err: err.to_string(), + } + } + + pub(crate) fn deserialize_error(err: serde_json::Error) -> Self { + Self::DeserializeError { + err: err.to_string(), + } + } +} + +impl From for DecryptionError { + fn from(value: anyhow::Error) -> Self { + Self::Generic { + err: value.to_string(), + } + } +} + pub(crate) struct DecryptionInfo { pub(crate) new_sync_state: SyncState, pub(crate) record: DecryptedRecord, @@ -135,9 +175,14 @@ impl Record { Ok(CURRENT_SCHEMA_VERSION.major >= record_version.major) } - pub(crate) fn decrypt(self, signer: Arc>) -> Result { - let dec_data = signer.ecies_decrypt(self.data)?; - let data = serde_json::from_slice(&dec_data)?; + pub(crate) fn decrypt( + self, + signer: Arc>, + ) -> Result { + let dec_data = signer + .ecies_decrypt(self.data) + .map_err(DecryptionError::invalid_payload)?; + let data = serde_json::from_slice(&dec_data).map_err(DecryptionError::deserialize_error)?; Ok(DecryptedRecord { id: self.id, revision: self.revision,