Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Purge pending incoming payments on disconnection #583

Merged
merged 2 commits into from
Jan 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ data class Syncing(val state: PersistedChannelState, val channelReestablishSent:
// We also need to filter out htlcs that we already settled and signed (the settlement messages are being retransmitted).
val alreadySettled = commitments1.changes.localChanges.signed.filterIsInstance<HtlcSettlementMessage>().map { it.id }.toSet()
val htlcsToReprocess = commitments1.latest.remoteCommit.spec.htlcs.outgoings().filter { !alreadySettled.contains(it.id) }
logger.debug { "re-processing signed IN: $htlcsToReprocess" }
logger.info { "re-processing signed incoming HTLCs: ${htlcsToReprocess.map { it.id }.joinToString(", ")}" }
sendQueue.addAll(htlcsToReprocess.map { ChannelAction.ProcessIncomingHtlc(it) })

return Pair(commitments1, sendQueue)
Expand Down
7 changes: 6 additions & 1 deletion src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -1206,7 +1206,12 @@ class Peer(
_channels = _channels + (key to state1)
processActions(key, peerConnection, actions)
}
incomingPaymentHandler.purgePayToOpenRequests()
// We must purge pending incoming payments: incoming HTLCs that aren't settled yet will be
// re-processed on reconnection, and we must not keep HTLCs pending in the payment handler since
// another instance of the application may resolve them, which would lead to inconsistent
// payment handler state (whereas the channel state is kept consistent thanks to the encrypted
// channel backup).
incomingPaymentHandler.purgePendingPayments()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import fr.acinq.lightning.Lightning.randomBytes32
import fr.acinq.lightning.LiquidityEvents
import fr.acinq.lightning.MilliSatoshi
import fr.acinq.lightning.NodeParams
import fr.acinq.lightning.channel.*
import fr.acinq.lightning.channel.ChannelAction
import fr.acinq.lightning.channel.ChannelCommand
import fr.acinq.lightning.channel.Origin
import fr.acinq.lightning.db.IncomingPayment
import fr.acinq.lightning.db.IncomingPaymentsDb
import fr.acinq.lightning.io.PayToOpenResponseCommand
Expand All @@ -32,12 +32,14 @@ data class HtlcPart(val htlc: UpdateAddHtlc, override val finalPayload: PaymentO
override val amount: MilliSatoshi = htlc.amountMsat
override val totalAmount: MilliSatoshi = finalPayload.totalAmount
override val paymentHash: ByteVector32 = htlc.paymentHash
override fun toString(): String = "htlc(channelId=${htlc.channelId},id=${htlc.id})"
}

data class PayToOpenPart(val payToOpenRequest: PayToOpenRequest, override val finalPayload: PaymentOnion.FinalPayload) : PaymentPart() {
override val amount: MilliSatoshi = payToOpenRequest.amountMsat
override val totalAmount: MilliSatoshi = finalPayload.totalAmount
override val paymentHash: ByteVector32 = payToOpenRequest.paymentHash
override fun toString(): String = "pay-to-open(amount=${payToOpenRequest.amountMsat})"
}

class IncomingPaymentHandler(val nodeParams: NodeParams, val db: IncomingPaymentsDb) {
Expand Down Expand Up @@ -397,19 +399,19 @@ class IncomingPaymentHandler(val nodeParams: NodeParams, val db: IncomingPayment
*/
suspend fun purgeExpiredPayments(fromCreatedAt: Long = 0, toCreatedAt: Long = currentTimestampMillis()): Int {
return db.listExpiredPayments(fromCreatedAt, toCreatedAt).count {
logger.info { "purging unpaid expired payment for paymentHash=${it.paymentHash} from DB" }
db.removeIncomingPayment(it.paymentHash)
}
}

/**
* If we are disconnected, the LSP will forget pending pay-to-open requests. We need to do the same otherwise we
* will accept outdated ones.
* If we are disconnected, we must forget pending payment parts.
* Pay-to-open requests will be forgotten by the LSP, so we need to do the same otherwise we will accept outdated ones.
* Offered HTLCs that haven't been resolved will be re-processed when we reconnect.
*/
fun purgePayToOpenRequests() {
val valuesToReplace = pending.mapValues { entry -> entry.value.copy(parts = entry.value.parts.filter { it !is PayToOpenPart }.toSet()) }
pending.plusAssign(valuesToReplace)
val keysToRemove = pending.filterValues { it.parts.isEmpty() }.keys
pending.minusAssign(keysToRemove)
fun purgePendingPayments() {
pending.forEach { (paymentHash, pending) -> logger.info { "purging pending incoming payments for paymentHash=$paymentHash: ${pending.parts.map { it.toString() }.joinToString(", ")}" } }
pending.clear()
}

companion object {
Expand Down
106 changes: 104 additions & 2 deletions src/commonTest/kotlin/fr/acinq/lightning/io/peer/PeerTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ class PeerTest : LightningTestSuite() {
}

@Test
fun `payment test between two phoenix nodes -- manual mode`() = runSuspendTest {
fun `payment between two nodes -- manual mode`() = runSuspendTest {
val (alice0, bob0) = TestsHelper.reachNormal()
val nodeParams = Pair(alice0.staticParams.nodeParams, bob0.staticParams.nodeParams)
val walletParams = Pair(
Expand Down Expand Up @@ -547,7 +547,108 @@ class PeerTest : LightningTestSuite() {
}

@Test
fun `payment test between two phoenix nodes -- automated messaging`() = runSuspendTest {
fun `payment between two nodes -- with disconnection`() = runSuspendTest {
// We create two channels between Alice and Bob to ensure that the payment is split in two parts.
val (aliceChan1, bobChan1) = TestsHelper.reachNormal(aliceFundingAmount = 100_000.sat, bobFundingAmount = 100_000.sat, alicePushAmount = 0.msat, bobPushAmount = 0.msat)
val (aliceChan2, bobChan2) = TestsHelper.reachNormal(aliceFundingAmount = 100_000.sat, bobFundingAmount = 100_000.sat, alicePushAmount = 0.msat, bobPushAmount = 0.msat)
val nodeParams = Pair(aliceChan1.staticParams.nodeParams, bobChan1.staticParams.nodeParams)
val walletParams = Pair(
// Alice must declare Bob as her trampoline node to enable direct payments.
TestConstants.Alice.walletParams.copy(trampolineNode = NodeUri(nodeParams.second.nodeId, "bob.com", 9735)),
TestConstants.Bob.walletParams
)
// Bob sends a multipart payment to Alice.
val (alice, bob, alice2bob1, bob2alice1) = newPeers(this, nodeParams, walletParams, listOf(aliceChan1 to bobChan1, aliceChan2 to bobChan2), automateMessaging = false)
val invoice = alice.createInvoice(randomBytes32(), 150_000_000.msat, Either.Left("test invoice"), null)
bob.send(SendPayment(UUID.randomUUID(), invoice.amount!!, alice.nodeParams.nodeId, invoice))

// Bob sends one HTLC on each channel.
val htlcs = listOf(
bob2alice1.expect<UpdateAddHtlc>(),
bob2alice1.expect<UpdateAddHtlc>(),
)
assertEquals(2, htlcs.map { it.channelId }.toSet().size)
val commitSigsBob = listOf(
bob2alice1.expect<CommitSig>(),
bob2alice1.expect<CommitSig>(),
)

// We cross-sign the HTLC on the first channel.
run {
val htlc = htlcs.find { it.channelId == aliceChan1.channelId }
assertNotNull(htlc)
alice.forward(htlc)
val commitSigBob = commitSigsBob.find { it.channelId == aliceChan1.channelId }
assertNotNull(commitSigBob)
alice.forward(commitSigBob)
bob.forward(alice2bob1.expect<RevokeAndAck>())
bob.forward(alice2bob1.expect<CommitSig>())
alice.forward(bob2alice1.expect<RevokeAndAck>())
}
// We start cross-signing the HTLC on the second channel.
run {
val htlc = htlcs.find { it.channelId == aliceChan2.channelId }
assertNotNull(htlc)
alice.forward(htlc)
val commitSigBob = commitSigsBob.find { it.channelId == aliceChan2.channelId }
assertNotNull(commitSigBob)
alice.forward(commitSigBob)
bob.forward(alice2bob1.expect<RevokeAndAck>())
bob.forward(alice2bob1.expect<CommitSig>())
bob2alice1.expect<RevokeAndAck>() // Alice doesn't receive Bob's revocation.
}

// We disconnect before Alice receives Bob's revocation on the second channel.
alice.disconnect()
alice.send(Disconnected)
bob.disconnect()
bob.send(Disconnected)

// On reconnection, Bob retransmits its revocation.
val (_, _, alice2bob2, bob2alice2) = connect(this, connectionId = 1, alice, bob, channelsCount = 2, expectChannelReady = false, automateMessaging = false)
alice.forward(bob2alice2.expect<RevokeAndAck>(), connectionId = 1)

// Alice has now received the complete payment and fulfills it.
val fulfills = listOf(
alice2bob2.expect<UpdateFulfillHtlc>(),
alice2bob2.expect<UpdateFulfillHtlc>(),
)
val commitSigsAlice = listOf(
alice2bob2.expect<CommitSig>(),
alice2bob2.expect<CommitSig>(),
)

// We fulfill the first HTLC.
run {
val fulfill = fulfills.find { it.channelId == aliceChan1.channelId }
assertNotNull(fulfill)
bob.forward(fulfill, connectionId = 1)
val commitSigAlice = commitSigsAlice.find { it.channelId == aliceChan1.channelId }
assertNotNull(commitSigAlice)
bob.forward(commitSigAlice, connectionId = 1)
alice.forward(bob2alice2.expect<RevokeAndAck>(), connectionId = 1)
alice.forward(bob2alice2.expect<CommitSig>(), connectionId = 1)
bob.forward(alice2bob2.expect<RevokeAndAck>(), connectionId = 1)
}

// We fulfill the second HTLC.
run {
val fulfill = fulfills.find { it.channelId == aliceChan2.channelId }
assertNotNull(fulfill)
bob.forward(fulfill, connectionId = 1)
val commitSigAlice = commitSigsAlice.find { it.channelId == aliceChan2.channelId }
assertNotNull(commitSigAlice)
bob.forward(commitSigAlice, connectionId = 1)
alice.forward(bob2alice2.expect<RevokeAndAck>(), connectionId = 1)
alice.forward(bob2alice2.expect<CommitSig>(), connectionId = 1)
bob.forward(alice2bob2.expect<RevokeAndAck>(), connectionId = 1)
}

assertEquals(invoice.amount, alice.db.payments.getIncomingPayment(invoice.paymentHash)?.received?.amount)
}

@Test
fun `payment between two nodes -- automated messaging`() = runSuspendTest {
val (alice0, bob0) = TestsHelper.reachNormal()
val nodeParams = Pair(alice0.staticParams.nodeParams, bob0.staticParams.nodeParams)
val walletParams = Pair(
Expand All @@ -563,4 +664,5 @@ class PeerTest : LightningTestSuite() {

alice.expectState<Normal> { commitments.availableBalanceForReceive() > alice0.commitments.availableBalanceForReceive() }
}

}
Loading
Loading