Skip to content

Commit

Permalink
ensure htlc settlement
Browse files Browse the repository at this point in the history
If paying fails, we have to make sure all payment parts are failed back
before failing back the htlcs to the sender.
  • Loading branch information
JssDWt committed Jul 1, 2024
1 parent 00be234 commit 295b054
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 33 deletions.
108 changes: 80 additions & 28 deletions src/payment_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,19 @@ use anyhow::{anyhow, Result};
use async_trait::async_trait;
use cln_rpc::{
model::{
requests::{PayRequest, WaitsendpayRequest},
requests::{ListsendpaysRequest, ListsendpaysStatus, PayRequest, WaitsendpayRequest},
responses::PayStatus,
},
primitives::Amount,
};
use futures::{stream::FuturesUnordered, StreamExt};
#[cfg(test)]
use mockall::automock;
use secp256k1::hashes::sha256;
use tracing::{instrument, warn};
use tokio::join;
use tracing::{debug, instrument, warn};

use crate::rpc::{ClnRpc, Rpc};
use crate::rpc::{ClnRpc, Rpc, RpcError};

/// The `PaymentProvider` trait exposes a `pay` method.
#[cfg_attr(test, automock)]
Expand Down Expand Up @@ -56,7 +58,7 @@ impl PaymentProvider for PayPaymentProvider {
let label = format!("trampoline-{}-{}", req.bolt11, now);

// TODO: extract the failure reason here?
let resp = self
let resp = match self
.rpc
.pay(&PayRequest {
amount_msat: req.amount_msat.map(Amount::from_msat),
Expand All @@ -73,7 +75,17 @@ impl PaymentProvider for PayPaymentProvider {
maxfee: Some(Amount::from_msat(req.max_fee_msat)),
description: None,
})
.await?;
.await
{
Ok(resp) => resp,
Err(e) => {
debug!("pay returned error {:?}", e);
return match self.wait_payment(req.payment_hash).await? {
Some(preimage) => Ok(preimage),
None => Err(anyhow!("payment failed")),
};
}
};

match resp.status {
// Note there is no need to check warning_partial_completion on
Expand Down Expand Up @@ -104,33 +116,73 @@ impl PaymentProvider for PayPaymentProvider {
/// `wait_payment` waits until a payment is fully resolved and no htlcs for
/// the given payment hash are outgoing anymore.
async fn wait_payment(&self, payment_hash: sha256::Hash) -> Result<Option<Vec<u8>>> {
if let Some(preimage) = match self
.rpc
.waitsendpay(&WaitsendpayRequest {
let completed_req = ListsendpaysRequest {
payment_hash: Some(payment_hash),
bolt11: None,
index: None,
limit: None,
start: None,
status: Some(ListsendpaysStatus::COMPLETE),
};
let completed_payments_fut = self.rpc.listsendpays(&completed_req);
let pending_req = ListsendpaysRequest {
payment_hash: Some(payment_hash),
bolt11: None,
index: None,
limit: None,
start: None,
status: Some(ListsendpaysStatus::PENDING),
};
let pending_payments_fut = self.rpc.listsendpays(&pending_req);
let (completed_payments, pending_payments) =
join!(completed_payments_fut, pending_payments_fut);
let (completed_payments, pending_payments) = (completed_payments?, pending_payments?);

if let Some(preimage) = completed_payments
.payments
.iter()
.filter_map(|p| p.payment_preimage)
.next()
{
return Ok(Some(preimage.to_vec()));
}

let mut tasks = FuturesUnordered::new();

for payment in pending_payments.payments {
tasks.push(self.rpc.waitsendpay(WaitsendpayRequest {
groupid: Some(payment.groupid),
partid: payment.partid,
payment_hash,
timeout: None,
partid: None,
groupid: None,
})
.await
{
Ok(resp) => resp.payment_preimage.map(|p| p.to_vec()),
Err(e) => match e {
crate::rpc::RpcError::Rpc(rpc) => {
if let Some(code) = rpc.code {
match code {
200 => return Err(anyhow!("timeout")),
_ => None,
}
} else {
return Err(rpc.into());
}));
}

while let Some(res) = tasks.next().await {
match res {
Ok(res) => {
if let Some(preimage) = res.payment_preimage {
return Ok(Some(preimage.to_vec()));
}
}
crate::rpc::RpcError::General(e) => return Err(e),
},
} {
return Ok(Some(preimage));
};
Err(e) => match e {
RpcError::Rpc(e) => match e.code {
Some(code) => match code {
-1 => return Err(anyhow!("{:?}", e)),
200 => return Err(anyhow!("timeout")),
202 => {}
203 => {}
204 => {}
208 => {}
209 => {}
_ => return Err(anyhow!("unknown rpc error code {}: {:?}", code, e)),
},
None => return Err(anyhow!("unknown rpc error without code {:?}", e)),
},
RpcError::General(e) => return Err(e),
},
}
}

Ok(None)
}
Expand Down
23 changes: 18 additions & 5 deletions src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ use anyhow::Result;
use async_trait::async_trait;
use cln_rpc::model::{
requests::{
DatastoreRequest, GetinfoRequest, ListdatastoreRequest, PayRequest, WaitsendpayRequest,
DatastoreRequest, GetinfoRequest, ListdatastoreRequest, ListsendpaysRequest, PayRequest,
WaitsendpayRequest,
},
responses::{
DatastoreResponse, GetinfoResponse, ListdatastoreResponse, PayResponse, WaitsendpayResponse,
DatastoreResponse, GetinfoResponse, ListdatastoreResponse, ListsendpaysResponse,
PayResponse, WaitsendpayResponse,
},
};
#[cfg(test)]
Expand All @@ -19,10 +21,14 @@ pub trait ClnRpc {
&self,
request: &ListdatastoreRequest,
) -> Result<ListdatastoreResponse, RpcError>;
async fn listsendpays(
&self,
request: &ListsendpaysRequest,
) -> Result<ListsendpaysResponse, RpcError>;
async fn pay(&self, request: &PayRequest) -> Result<PayResponse, RpcError>;
async fn waitsendpay(
&self,
request: &WaitsendpayRequest,
request: WaitsendpayRequest,
) -> Result<WaitsendpayResponse, RpcError>;
}

Expand Down Expand Up @@ -93,14 +99,21 @@ impl ClnRpc for Rpc {
Ok(self.rpc().await?.call_typed(request).await?)
}

async fn listsendpays(
&self,
request: &ListsendpaysRequest,
) -> Result<ListsendpaysResponse, RpcError> {
Ok(self.rpc().await?.call_typed(request).await?)
}

async fn pay(&self, request: &PayRequest) -> Result<PayResponse, RpcError> {
Ok(self.rpc().await?.call_typed(request).await?)
}

async fn waitsendpay(
&self,
request: &WaitsendpayRequest,
request: WaitsendpayRequest,
) -> Result<WaitsendpayResponse, RpcError> {
Ok(self.rpc().await?.call_typed(request).await?)
Ok(self.rpc().await?.call_typed(&request).await?)
}
}

0 comments on commit 295b054

Please sign in to comment.