diff --git a/lib/core/src/persist/sync.rs b/lib/core/src/persist/sync.rs index 63b29712c..8d8a9c62e 100644 --- a/lib/core/src/persist/sync.rs +++ b/lib/core/src/persist/sync.rs @@ -57,7 +57,7 @@ impl Persister { Ok(sync_state) } - fn set_sync_state_stmt(con: &Connection) -> Result { + fn set_sync_state_stmt(con: &Connection) -> rusqlite::Result { con.prepare( " INSERT OR REPLACE INTO sync_state(data_id, record_id, record_revision, is_local) @@ -190,41 +190,50 @@ impl Persister { Ok(()) } - pub(crate) fn get_sync_outgoing_details( - &self, - record_id: &str, - ) -> Result> { - let con = self.get_connection()?; - - let mut stmt = con.prepare( + fn get_sync_outgoing_details_stmt(con: &Connection) -> rusqlite::Result { + con.prepare( " SELECT + record_id, record_type, updated_at, updated_fields_json FROM sync_outgoing WHERE record_id = :id ", - )?; + ) + } + + fn sql_row_to_sync_outgoing_details(row: &Row) -> Result { + let record_id = row.get(0)?; + let record_type = row.get(1)?; + let updated_at = row.get(2)?; + let updated_fields = match row.get::<_, Option>(3)? { + Some(fields) => Some(serde_json::from_str(&fields)?), + None => None, + }; + Ok(SyncOutgoingDetails { + record_id, + record_type, + updated_at, + updated_fields, + }) + } + + pub(crate) fn get_sync_outgoing_details( + &self, + record_id: &str, + ) -> Result> { + let con = self.get_connection()?; + + let mut stmt = Self::get_sync_outgoing_details_stmt(&con)?; let mut rows = stmt.query(named_params! { ":id": record_id, })?; if let Some(row) = rows.next()? { - let maybe_record_type: Option = row.get(0)?; - let updated_at = row.get(1)?; - let updated_fields = match row.get::<_, Option>(2)? { - Some(fields) => Some(serde_json::from_str(&fields)?), - None => None, - }; - - return Ok(maybe_record_type.map(|record_type| SyncOutgoingDetails { - record_id: record_id.to_string(), - record_type, - updated_at, - updated_fields, - })); + return Ok(Some(Self::sql_row_to_sync_outgoing_details(row)?)); } Ok(None) @@ -267,14 +276,36 @@ impl Persister { Ok(()) } + fn check_commit_update(con: &Connection, record_id: &str, last_updated_at: u32) -> Result<()> { + let mut stmt = Self::get_sync_outgoing_details_stmt(con)?; + let mut rows = stmt.query(named_params! { + ":id": record_id, + })?; + + if let Some(row) = rows.next()? { + let sync_outgoing_details = Self::sql_row_to_sync_outgoing_details(row)?; + + if sync_outgoing_details.updated_at > last_updated_at { + return Err(anyhow::anyhow!("Record has been updated while pulling")); + } + } + + Ok(()) + } + pub(crate) fn commit_receive_swap( &self, data: &ReceiveSyncData, sync_state: SyncState, + last_updated_at: Option, ) -> Result<()> { let mut con = self.get_connection()?; let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?; + if let Some(last_updated_at) = last_updated_at { + Self::check_commit_update(&tx, &sync_state.record_id, last_updated_at)?; + } + tx.execute( " UPDATE receive_swaps @@ -325,10 +356,15 @@ impl Persister { &self, data: &SendSyncData, sync_state: SyncState, + last_updated_at: Option, ) -> Result<()> { let mut con = self.get_connection()?; let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?; + if let Some(last_updated_at) = last_updated_at { + Self::check_commit_update(&tx, &sync_state.record_id, last_updated_at)?; + } + tx.execute( " UPDATE send_swaps @@ -373,10 +409,15 @@ impl Persister { &self, data: &ChainSyncData, sync_state: SyncState, + last_updated_at: Option, ) -> Result<()> { let mut con = self.get_connection()?; let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?; + if let Some(last_updated_at) = last_updated_at { + Self::check_commit_update(&tx, &sync_state.record_id, last_updated_at)?; + } + tx.execute( " UPDATE chain_swaps diff --git a/lib/core/src/sync/mod.rs b/lib/core/src/sync/mod.rs index 23c64f572..6a91eaa26 100644 --- a/lib/core/src/sync/mod.rs +++ b/lib/core/src/sync/mod.rs @@ -54,15 +54,21 @@ impl SyncService { &self, decrypted_record: &DecryptedRecord, sync_state: SyncState, + last_updated_at: Option, ) -> Result<()> { match decrypted_record.data.clone() { SyncData::Chain(chain_data) => { - self.persister.commit_chain_swap(&chain_data, sync_state)? + self.persister + .commit_chain_swap(&chain_data, sync_state, last_updated_at)? + } + SyncData::Send(send_data) => { + self.persister + .commit_send_swap(&send_data, sync_state, last_updated_at)? + } + SyncData::Receive(receive_data) => { + self.persister + .commit_receive_swap(&receive_data, sync_state, last_updated_at)? } - SyncData::Send(send_data) => self.persister.commit_send_swap(&send_data, sync_state)?, - SyncData::Receive(receive_data) => self - .persister - .commit_receive_swap(&receive_data, sync_state)?, } Ok(()) } @@ -179,8 +185,9 @@ impl SyncService { .map(|state| state.is_local) .unwrap_or(false), }; + let last_updated_at = maybe_outgoing_details.map(|details| details.updated_at); unwrap_or_continue!( - self.commit_record(&decrypted_record, new_sync_state), + self.commit_record(&decrypted_record, new_sync_state, last_updated_at,), "Could not commit record: {}" ); }