Skip to content

Commit

Permalink
feat: add pull method
Browse files Browse the repository at this point in the history
  • Loading branch information
hydra-yse committed Nov 16, 2024
1 parent cedef81 commit 7fa6ee2
Show file tree
Hide file tree
Showing 6 changed files with 641 additions and 4 deletions.
1 change: 1 addition & 0 deletions lib/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions lib/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ lwk_wollet = { git = "https://github.com/dangeross/lwk", branch = "savage-full-s
#lwk_wollet = "0.7.0"
rusqlite = { version = "0.31", features = ["backup", "bundled"] }
rusqlite_migration = "1.0"
sdk-common = { git = "https://github.com/breez/breez-sdk", rev = "441a9fd50c32098b2887e960c8a4bcc5956da1af", features = ["liquid"]}
sdk-common = { git = "https://github.com/breez/breez-sdk", rev = "441a9fd50c32098b2887e960c8a4bcc5956da1af", features = [
"liquid",
] }
serde = { version = "1.0.197", features = ["derive"] }
serde_json = "1.0.116"
strum = "0.25"
Expand All @@ -53,9 +55,10 @@ tonic = { version = "0.12.3", features = ["tls"] }
prost = "0.13.3"
ecies = "0.2.7"
uuid = { version = "1.8.0", features = ["v4"] }
semver = "1.0.23"
lazy_static = "1.5.0"

[dev-dependencies]
lazy_static = "1.5.0"
paste = "1.0.15"
tempdir = "0.3.7"
uuid = { version = "1.8.0", features = ["v4"] }
Expand Down
338 changes: 336 additions & 2 deletions lib/core/src/persist/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@ use std::collections::HashMap;
use anyhow::Result;
use rusqlite::{named_params, Connection, OptionalExtension, Row, Statement, TransactionBehavior};

use super::Persister;
use crate::sync::model::{sync::Record, SyncOutgoingDetails, SyncSettings, SyncState};
use super::{PaymentState, Persister};
use crate::sync::model::{
data::{ChainSyncData, ReceiveSyncData, SendSyncData},
sync::Record,
RecordType, SyncOutgoingDetails, SyncSettings, SyncState,
};

impl Persister {
fn select_sync_state_query(where_clauses: Vec<String>) -> String {
Expand Down Expand Up @@ -309,4 +313,334 @@ impl Persister {

Ok(())
}

fn check_commit_update(con: &Connection, record_id: &str, last_commit_time: u32) -> Result<()> {
let query =
Self::select_sync_outgoing_details_query(vec!["record_id = :record_id".to_string()]);
let mut stmt = con.prepare(&query)?;
let mut rows = stmt.query(named_params! {
":record_id": record_id,
})?;

if let Some(row) = rows.next()? {
let sync_outgoing_details = Self::sql_row_to_sync_outgoing_details(row)?;

if sync_outgoing_details.commit_time > last_commit_time {
return Err(anyhow::anyhow!("Record has been updated while pulling"));
}
}

Ok(())
}

pub(crate) fn commit_receive_swap(
&self,
data: &ReceiveSyncData,
sync_state: SyncState,
is_update: bool,
last_commit_time: Option<u32>,
) -> Result<()> {
let mut con = self.get_connection()?;
let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?;

if let Some(last_commit_time) = last_commit_time {
Self::check_commit_update(&tx, &sync_state.record_id, last_commit_time)?;
}

let params = named_params! {
":id": &data.swap_id,
":invoice": &data.invoice,
":preimage": &data.preimage,
":create_response_json": &data.create_response_json,
":claim_fees_sat": &data.claim_fees_sat,
":claim_private_key": &data.claim_private_key,
":payer_amount_sat": &data.payer_amount_sat,
":receiver_amount_sat": &data.receiver_amount_sat,
":mrh_address": &data.mrh_address,
":mrh_script_pubkey": &data.mrh_script_pubkey,
":created_at": &data.created_at,
":payment_hash": &data.payment_hash,
":description": &data.description,
};
match is_update {
true => {
tx.execute(
"
UPDATE receive_swaps
SET
invoice = :invoice,
preimage = :preimage,
create_response_json = :create_response_json,
claim_fees_sat = :claim_fees_sat,
claim_private_key = :claim_private_key,
payer_amount_sat = :payer_amount_sat,
receiver_amount_sat = :receiver_amount_sat,
mrh_address = :mrh_address,
mrh_script_pubkey = :mrh_script_pubkey,
created_at = :created_at,
payment_hash = :payment_hash,
description = :description
WHERE id = :id",
params,
)?;
}
false => {
tx.execute(
"
INSERT INTO receive_swaps(
id,
invoice,
preimage,
create_response_json,
claim_fees_sat,
claim_private_key,
payer_amount_sat,
receiver_amount_sat,
mrh_address,
mrh_script_pubkey,
created_at,
payment_hash,
description,
state
)
VALUES(
:id,
:invoice,
:preimage,
:create_response_json,
:claim_fees_sat,
:claim_private_key,
:payer_amount_sat,
:receiver_amount_sat,
:mrh_address,
:mrh_script_pubkey,
:created_at,
:payment_hash,
:description,
:state
)",
[params, &[(":state", &PaymentState::Created)]]
.concat()
.as_slice(),
)?;
}
}

Self::set_sync_state_stmt(&tx)?.execute(named_params! {
":data_id": &sync_state.data_id,
":record_id": &sync_state.record_id,
":record_revision": &sync_state.record_revision,
":is_local": &sync_state.is_local,
})?;

tx.commit()?;

Ok(())
}

pub(crate) fn commit_send_swap(
&self,
data: &SendSyncData,
sync_state: SyncState,
is_update: bool,
last_commit_time: Option<u32>,
) -> Result<()> {
let mut con = self.get_connection()?;
let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?;

if let Some(last_commit_time) = last_commit_time {
Self::check_commit_update(&tx, &sync_state.record_id, last_commit_time)?;
}

let params = named_params! {
":id": &data.swap_id,
":invoice": &data.invoice,
":create_response_json": &data.create_response_json,
":refund_private_key": &data.refund_private_key,
":payer_amount_sat": &data.payer_amount_sat,
":receiver_amount_sat": &data.receiver_amount_sat,
":created_at": &data.created_at,
":preimage": &data.preimage,
":payment_hash": &data.payment_hash,
":description": &data.description,
};
match is_update {
true => {
tx.execute(
"
UPDATE send_swaps
SET
invoice = :invoice,
create_response_json = :create_response_json,
refund_private_key = :refund_private_key,
payer_amount_sat = :payer_amount_sat,
receiver_amount_sat = :receiver_amount_sat,
created_at = :created_at,
preimage = :preimage,
payment_hash = :payment_hash,
description = :description
WHERE id = :id",
params,
)?;
}
false => {
tx.execute(
"
INSERT INTO send_swaps(
id,
invoice,
create_response_json,
refund_private_key,
payer_amount_sat,
receiver_amount_sat,
created_at,
preimage,
payment_hash,
description,
state
)
VALUES(
:id,
:invoice,
:create_response_json,
:refund_private_key,
:payer_amount_sat,
:receiver_amount_sat,
:created_at,
:preimage,
:payment_hash,
:description,
:state
)",
[params, &[(":state", &PaymentState::Created)]]
.concat()
.as_slice(),
)?;
}
}

Self::set_sync_state_stmt(&tx)?.execute(named_params! {
":data_id": &sync_state.data_id,
":record_id": &sync_state.record_id,
":record_revision": &sync_state.record_revision,
":is_local": &sync_state.is_local,
})?;

tx.commit()?;

Ok(())
}

pub(crate) fn commit_chain_swap(
&self,
data: &ChainSyncData,
sync_state: SyncState,
is_update: bool,
last_commit_time: Option<u32>,
) -> Result<()> {
let mut con = self.get_connection()?;
let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?;

if let Some(last_commit_time) = last_commit_time {
Self::check_commit_update(&tx, &sync_state.record_id, last_commit_time)?;
}

let params = named_params! {
":id": &data.swap_id,
":preimage": &data.preimage,
":create_response_json": &data.create_response_json,
":direction": &data.direction,
":lockup_address": &data.lockup_address,
":claim_fees_sat": &data.claim_fees_sat,
":claim_private_key": &data.claim_private_key,
":refund_private_key": &data.refund_private_key,
":timeout_block_height": &data.timeout_block_height,
":payer_amount_sat": &data.payer_amount_sat,
":receiver_amount_sat": &data.receiver_amount_sat,
":accept_zero_conf": &data.accept_zero_conf,
":created_at": &data.created_at,
":description": &data.description,
":claim_address": &data.claim_address,
};
match is_update {
true => {
tx.execute(
"
UPDATE chain_swaps
SET
preimage = :preimage,
create_response_json = :create_response_json,
direction = :direction,
lockup_address = :lockup_address,
claim_fees_sat = :claim_fees_sat,
claim_private_key = :claim_private_key,
refund_private_key = :refund_private_key,
timeout_block_height = :timeout_block_height,
payer_amount_sat = :payer_amount_sat,
receiver_amount_sat = :receiver_amount_sat,
accept_zero_conf = :accept_zero_conf,
created_at = :created_at,
description = :description,
claim_address = :claim_address
WHERE id = :id",
params,
)?;
}
false => {
tx.execute(
"
INSERT INTO chain_swaps(
id,
preimage,
create_response_json,
direction,
lockup_address,
claim_fees_sat,
claim_private_key,
refund_private_key,
timeout_block_height,
payer_amount_sat,
receiver_amount_sat,
accept_zero_conf,
created_at,
description,
claim_address,
state
)
VALUES(
:id,
:preimage,
:create_response_json,
:direction,
:lockup_address,
:claim_fees_sat,
:claim_private_key,
:refund_private_key,
:timeout_block_height,
:payer_amount_sat,
:receiver_amount_sat,
:accept_zero_conf,
:created_at,
:description,
:claim_address,
:state
)",
[params, &[(":state", &PaymentState::Created)]]
.concat()
.as_slice(),
)?;
}
}

Self::set_sync_state_stmt(&tx)?.execute(named_params! {
":data_id": &sync_state.data_id,
":record_id": &sync_state.record_id,
":record_revision": &sync_state.record_revision,
":is_local": &sync_state.is_local,
})?;

tx.commit()?;

Ok(())
}
}
Loading

0 comments on commit 7fa6ee2

Please sign in to comment.