Skip to content

Commit

Permalink
feat: add commit updated_at check
Browse files Browse the repository at this point in the history
  • Loading branch information
hydra-yse committed Nov 13, 2024
1 parent 616affe commit 9a63135
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 28 deletions.
85 changes: 63 additions & 22 deletions lib/core/src/persist/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl Persister {
Ok(sync_state)
}

fn set_sync_state_stmt(con: &Connection) -> Result<Statement, rusqlite::Error> {
fn set_sync_state_stmt(con: &Connection) -> rusqlite::Result<Statement> {
con.prepare(
"
INSERT OR REPLACE INTO sync_state(data_id, record_id, record_revision, is_local)
Expand Down Expand Up @@ -190,41 +190,50 @@ impl Persister {
Ok(())
}

pub(crate) fn get_sync_outgoing_details(
&self,
record_id: &str,
) -> Result<Option<SyncOutgoingDetails>> {
let con = self.get_connection()?;

let mut stmt = con.prepare(
fn get_sync_outgoing_details_stmt(con: &Connection) -> rusqlite::Result<Statement> {
con.prepare(
"
SELECT
record_id,
record_type,
updated_at,
updated_fields_json
FROM sync_outgoing
WHERE record_id = :id
",
)?;
)
}

fn sql_row_to_sync_outgoing_details(row: &Row) -> Result<SyncOutgoingDetails> {
let record_id = row.get(0)?;
let record_type = row.get(1)?;
let updated_at = row.get(2)?;
let updated_fields = match row.get::<_, Option<String>>(3)? {
Some(fields) => Some(serde_json::from_str(&fields)?),
None => None,
};

Ok(SyncOutgoingDetails {
record_id,
record_type,
updated_at,
updated_fields,
})
}

pub(crate) fn get_sync_outgoing_details(
&self,
record_id: &str,
) -> Result<Option<SyncOutgoingDetails>> {
let con = self.get_connection()?;

let mut stmt = Self::get_sync_outgoing_details_stmt(&con)?;
let mut rows = stmt.query(named_params! {
":id": record_id,
})?;

if let Some(row) = rows.next()? {
let maybe_record_type: Option<RecordType> = row.get(0)?;
let updated_at = row.get(1)?;
let updated_fields = match row.get::<_, Option<String>>(2)? {
Some(fields) => Some(serde_json::from_str(&fields)?),
None => None,
};

return Ok(maybe_record_type.map(|record_type| SyncOutgoingDetails {
record_id: record_id.to_string(),
record_type,
updated_at,
updated_fields,
}));
return Ok(Some(Self::sql_row_to_sync_outgoing_details(row)?));
}

Ok(None)
Expand Down Expand Up @@ -267,14 +276,36 @@ impl Persister {
Ok(())
}

fn check_commit_update(con: &Connection, record_id: &str, last_updated_at: u32) -> Result<()> {
let mut stmt = Self::get_sync_outgoing_details_stmt(con)?;
let mut rows = stmt.query(named_params! {
":id": record_id,
})?;

if let Some(row) = rows.next()? {
let sync_outgoing_details = Self::sql_row_to_sync_outgoing_details(row)?;

if sync_outgoing_details.updated_at > last_updated_at {
return Err(anyhow::anyhow!("Record has been updated while pulling"));
}
}

Ok(())
}

pub(crate) fn commit_receive_swap(
&self,
data: &ReceiveSyncData,
sync_state: SyncState,
last_updated_at: Option<u32>,
) -> Result<()> {
let mut con = self.get_connection()?;
let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?;

if let Some(last_updated_at) = last_updated_at {
Self::check_commit_update(&tx, &sync_state.record_id, last_updated_at)?;
}

tx.execute(
"
UPDATE receive_swaps
Expand Down Expand Up @@ -325,10 +356,15 @@ impl Persister {
&self,
data: &SendSyncData,
sync_state: SyncState,
last_updated_at: Option<u32>,
) -> Result<()> {
let mut con = self.get_connection()?;
let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?;

if let Some(last_updated_at) = last_updated_at {
Self::check_commit_update(&tx, &sync_state.record_id, last_updated_at)?;
}

tx.execute(
"
UPDATE send_swaps
Expand Down Expand Up @@ -373,10 +409,15 @@ impl Persister {
&self,
data: &ChainSyncData,
sync_state: SyncState,
last_updated_at: Option<u32>,
) -> Result<()> {
let mut con = self.get_connection()?;
let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?;

if let Some(last_updated_at) = last_updated_at {
Self::check_commit_update(&tx, &sync_state.record_id, last_updated_at)?;
}

tx.execute(
"
UPDATE chain_swaps
Expand Down
19 changes: 13 additions & 6 deletions lib/core/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,21 @@ impl SyncService {
&self,
decrypted_record: &DecryptedRecord,
sync_state: SyncState,
last_updated_at: Option<u32>,
) -> Result<()> {
match decrypted_record.data.clone() {
SyncData::Chain(chain_data) => {
self.persister.commit_chain_swap(&chain_data, sync_state)?
self.persister
.commit_chain_swap(&chain_data, sync_state, last_updated_at)?
}
SyncData::Send(send_data) => {
self.persister
.commit_send_swap(&send_data, sync_state, last_updated_at)?
}
SyncData::Receive(receive_data) => {
self.persister
.commit_receive_swap(&receive_data, sync_state, last_updated_at)?
}
SyncData::Send(send_data) => self.persister.commit_send_swap(&send_data, sync_state)?,
SyncData::Receive(receive_data) => self
.persister
.commit_receive_swap(&receive_data, sync_state)?,
}
Ok(())
}
Expand Down Expand Up @@ -179,8 +185,9 @@ impl SyncService {
.map(|state| state.is_local)
.unwrap_or(false),
};
let last_updated_at = maybe_outgoing_details.map(|details| details.updated_at);
unwrap_or_continue!(
self.commit_record(&decrypted_record, new_sync_state),
self.commit_record(&decrypted_record, new_sync_state, last_updated_at,),
"Could not commit record: {}"
);
}
Expand Down

0 comments on commit 9a63135

Please sign in to comment.