Skip to content

Commit

Permalink
Add singleton AsyncPaymentTriggerer to monitor when the receiver of…
Browse files Browse the repository at this point in the history
… an async payment reconnects (#2491)

This actor spawns only a single `PeerReadyNotifier` actor for each peer that is watched to prevent multiple actors from redundantly polling for when the same peer reconnects.

This actor can be extended to watch for onion messages from async payment receivers, instead of only watching for local peer re-connections.
  • Loading branch information
remyers authored Jan 2, 2023
1 parent f0d12eb commit 33ca262
Show file tree
Hide file tree
Showing 11 changed files with 404 additions and 62 deletions.
6 changes: 4 additions & 2 deletions eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import fr.acinq.eclair.db.{Databases, DbEventHandler, FileBackupHandler}
import fr.acinq.eclair.io.{ClientSpawner, Peer, Server, Switchboard}
import fr.acinq.eclair.message.Postman
import fr.acinq.eclair.payment.receive.PaymentHandler
import fr.acinq.eclair.payment.relay.Relayer
import fr.acinq.eclair.payment.relay.{AsyncPaymentTriggerer, Relayer}
import fr.acinq.eclair.payment.send.{Autoprobe, PaymentInitiator}
import fr.acinq.eclair.router._
import fr.acinq.eclair.tor.{Controller, TorProtocolHandler}
Expand Down Expand Up @@ -296,7 +296,8 @@ class Setup(val datadir: File,
dbEventHandler = system.actorOf(SimpleSupervisor.props(DbEventHandler.props(nodeParams), "db-event-handler", SupervisorStrategy.Resume))
register = system.actorOf(SimpleSupervisor.props(Register.props(), "register", SupervisorStrategy.Resume))
paymentHandler = system.actorOf(SimpleSupervisor.props(PaymentHandler.props(nodeParams, register), "payment-handler", SupervisorStrategy.Resume))
relayer = system.actorOf(SimpleSupervisor.props(Relayer.props(nodeParams, router, register, paymentHandler, Some(postRestartCleanUpInitialized)), "relayer", SupervisorStrategy.Resume))
triggerer = system.spawn(Behaviors.supervise(AsyncPaymentTriggerer()).onFailure(typed.SupervisorStrategy.resume), name = "async-payment-triggerer")
relayer = system.actorOf(SimpleSupervisor.props(Relayer.props(nodeParams, router, register, paymentHandler, triggerer, Some(postRestartCleanUpInitialized)), "relayer", SupervisorStrategy.Resume))
// Before initializing the switchboard (which re-connects us to the network) and the user-facing parts of the system,
// we want to make sure the handler for post-restart broken HTLCs has finished initializing.
_ <- postRestartCleanUpInitialized.future
Expand All @@ -311,6 +312,7 @@ class Setup(val datadir: File,
paymentInitiator = system.actorOf(SimpleSupervisor.props(PaymentInitiator.props(nodeParams, PaymentInitiator.SimplePaymentFactory(nodeParams, router, register)), "payment-initiator", SupervisorStrategy.Restart))
_ = for (i <- 0 until config.getInt("autoprobe-count")) yield system.actorOf(SimpleSupervisor.props(Autoprobe.props(nodeParams, router, paymentInitiator), s"payment-autoprobe-$i", SupervisorStrategy.Restart))

_ = triggerer ! AsyncPaymentTriggerer.Start(switchboard.toTyped)
balanceActor = system.spawn(BalanceActor(nodeParams.db, bitcoinClient, channelsListener, nodeParams.balanceCheckInterval), name = "balance-actor")

postman = system.spawn(Behaviors.supervise(Postman(switchboard.toTyped)).onFailure(typed.SupervisorStrategy.restart), name = "postman")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Copyright 2022 ACINQ SAS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fr.acinq.eclair.payment.relay

import akka.actor.typed.ActorRef.ActorRefOps
import akka.actor.typed.eventstream.EventStream
import akka.actor.typed.scaladsl.{ActorContext, Behaviors}
import akka.actor.typed.{ActorRef, Behavior, SupervisorStrategy}
import fr.acinq.bitcoin.scalacompat.ByteVector32
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.eclair.Logs.LogCategory
import fr.acinq.eclair.blockchain.CurrentBlockHeight
import fr.acinq.eclair.io.PeerReadyNotifier.{NotifyWhenPeerReady, PeerUnavailable}
import fr.acinq.eclair.io.{PeerReadyNotifier, Switchboard}
import fr.acinq.eclair.payment.relay.AsyncPaymentTriggerer.Command
import fr.acinq.eclair.{BlockHeight, Logs}

/**
* This actor waits for an async payment receiver to become ready to receive a payment or for a block timeout to expire.
* If the receiver of the payment is a connected peer, spawn a PeerReadyNotifier actor.
*/
object AsyncPaymentTriggerer {
// @formatter:off
sealed trait Command
case class Start(switchboard: ActorRef[Switchboard.GetPeerInfo]) extends Command
case class Watch(replyTo: ActorRef[Result], remoteNodeId: PublicKey, paymentHash: ByteVector32, timeout: BlockHeight) extends Command
case class Cancel(paymentHash: ByteVector32) extends Command
private[relay] case class NotifierStopped(remoteNodeId: PublicKey) extends Command
private case class WrappedPeerReadyResult(result: PeerReadyNotifier.Result) extends Command
private case class WrappedCurrentBlockHeight(currentBlockHeight: CurrentBlockHeight) extends Command

sealed trait Result
case object AsyncPaymentTriggered extends Result
case object AsyncPaymentTimeout extends Result
case object AsyncPaymentCanceled extends Result
// @formatter:on

def apply(): Behavior[Command] = Behaviors.setup { context =>
Behaviors.withMdc(Logs.mdc(category_opt = Some(LogCategory.PAYMENT))) {
Behaviors.receiveMessagePartial {
case Start(switchboard) => new AsyncPaymentTriggerer(switchboard, context).start()
}
}
}
}

private class AsyncPaymentTriggerer(switchboard: ActorRef[Switchboard.GetPeerInfo], context: ActorContext[Command]) {

import AsyncPaymentTriggerer._

case class Payment(replyTo: ActorRef[Result], timeout: BlockHeight, paymentHash: ByteVector32) {
def expired(currentBlockHeight: BlockHeight): Boolean = timeout <= currentBlockHeight
}

case class PeerPayments(notifier: ActorRef[PeerReadyNotifier.Command], pendingPayments: Set[Payment]) {
def update(currentBlockHeight: BlockHeight): Option[PeerPayments] = {
val expiredPayments = pendingPayments.filter(_.expired(currentBlockHeight))
expiredPayments.foreach(e => e.replyTo ! AsyncPaymentTimeout)
updatePaymentsOrStop(pendingPayments.removedAll(expiredPayments))
}

def cancel(paymentHash: ByteVector32): Option[PeerPayments] = {
val canceledPayments = pendingPayments.filter(_.paymentHash == paymentHash)
canceledPayments.foreach(_.replyTo ! AsyncPaymentCanceled)
updatePaymentsOrStop(pendingPayments.removedAll(canceledPayments))
}

private def updatePaymentsOrStop(pendingPayments: Set[Payment]): Option[PeerPayments] = {
if (pendingPayments.isEmpty) {
context.stop(notifier)
None
} else {
Some(PeerPayments(notifier, pendingPayments))
}
}

def trigger(): Unit = pendingPayments.foreach(e => e.replyTo ! AsyncPaymentTriggered)
def cancel(): Unit = pendingPayments.foreach(e => e.replyTo ! AsyncPaymentCanceled)
}

def start(): Behavior[Command] = {
context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[CurrentBlockHeight](WrappedCurrentBlockHeight))
watching(Map.empty)
}

private def watching(peers: Map[PublicKey, PeerPayments]): Behavior[Command] = {
Behaviors.receiveMessagePartial {
case Watch(replyTo, remoteNodeId, paymentHash, timeout) =>
peers.get(remoteNodeId) match {
case None =>
val notifier = context.spawnAnonymous(Behaviors.supervise(PeerReadyNotifier(remoteNodeId, switchboard, timeout_opt = None)).onFailure(SupervisorStrategy.stop))
context.watchWith(notifier, NotifierStopped(remoteNodeId))
notifier ! NotifyWhenPeerReady(context.messageAdapter[PeerReadyNotifier.Result](WrappedPeerReadyResult))
val peer = PeerPayments(notifier, Set(Payment(replyTo, timeout, paymentHash)))
watching(peers + (remoteNodeId -> peer))
case Some(peer) =>
val peer1 = PeerPayments(peer.notifier, peer.pendingPayments + Payment(replyTo, timeout, paymentHash))
watching(peers + (remoteNodeId -> peer1))
}
case Cancel(paymentHash) =>
val peers1 = peers.flatMap {
case (remoteNodeId, peer) => peer.cancel(paymentHash).map(peer1 => remoteNodeId -> peer1)
}
watching(peers1)
case WrappedCurrentBlockHeight(CurrentBlockHeight(currentBlockHeight)) =>
val peers1 = peers.flatMap {
case (remoteNodeId, peer) => peer.update(currentBlockHeight).map(peer1 => remoteNodeId -> peer1)
}
watching(peers1)
case WrappedPeerReadyResult(result) => result match {
case PeerReadyNotifier.PeerReady(remoteNodeId, _) =>
// notify watcher that destination peer is ready to receive async payments; PeerReadyNotifier will stop itself
peers.get(remoteNodeId).foreach(_.trigger())
watching(peers - remoteNodeId)
case PeerUnavailable(_) =>
// only use PeerReadyNotifier to signal when the peer connects, not for timeouts
Behaviors.same
}
case NotifierStopped(remoteNodeId) =>
peers.get(remoteNodeId) match {
case None => Behaviors.same
case Some(peer) =>
context.log.error(s"PeerReadyNotifier stopped unexpectedly while watching node $remoteNodeId.")
peer.cancel()
watching(peers - remoteNodeId)
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@

package fr.acinq.eclair.payment.relay

import akka.actor.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.eventstream.EventStream
import akka.actor.typed.scaladsl.adapter.{TypedActorContextOps, TypedActorRefOps}
import akka.actor.typed.scaladsl.{ActorContext, Behaviors}
import akka.actor.{ActorRef, typed}
import com.softwaremill.quicklens.ModifyPimp
import fr.acinq.bitcoin.scalacompat.ByteVector32
import fr.acinq.eclair.blockchain.CurrentBlockHeight
import fr.acinq.eclair.channel.{CMD_FAIL_HTLC, CMD_FULFILL_HTLC}
import fr.acinq.eclair.db.PendingCommandsDb
import fr.acinq.eclair.payment.IncomingPaymentPacket.NodeRelayPacket
Expand All @@ -40,7 +39,7 @@ import fr.acinq.eclair.router.Router.RouteParams
import fr.acinq.eclair.router.{BalanceTooLow, RouteNotFound}
import fr.acinq.eclair.wire.protocol.PaymentOnion.IntermediatePayload
import fr.acinq.eclair.wire.protocol._
import fr.acinq.eclair.{BlockHeight, CltvExpiry, Features, Logs, MilliSatoshi, NodeParams, UInt64, nodeFee, randomBytes32}
import fr.acinq.eclair.{CltvExpiry, Features, Logs, MilliSatoshi, NodeParams, UInt64, nodeFee, randomBytes32}

import java.util.UUID
import scala.collection.immutable.Queue
Expand All @@ -55,15 +54,13 @@ object NodeRelay {
sealed trait Command
case class Relay(nodeRelayPacket: IncomingPaymentPacket.NodeRelayPacket) extends Command
case object Stop extends Command
case object RelayAsyncPayment extends Command
case object CancelAsyncPayment extends Command
private case class WrappedMultiPartExtraPaymentReceived(mppExtraReceived: MultiPartPaymentFSM.ExtraPaymentReceived[HtlcPart]) extends Command
private case class WrappedMultiPartPaymentFailed(mppFailed: MultiPartPaymentFSM.MultiPartPaymentFailed) extends Command
private case class WrappedMultiPartPaymentSucceeded(mppSucceeded: MultiPartPaymentFSM.MultiPartPaymentSucceeded) extends Command
private case class WrappedPreimageReceived(preimageReceived: PreimageReceived) extends Command
private case class WrappedPaymentSent(paymentSent: PaymentSent) extends Command
private case class WrappedPaymentFailed(paymentFailed: PaymentFailed) extends Command
private case class WrappedCurrentBlockHeight(currentBlockHeight: BlockHeight) extends Command
private[relay] case class WrappedPeerReadyResult(result: AsyncPaymentTriggerer.Result) extends Command
// @formatter:on

trait OutgoingPaymentFactory {
Expand All @@ -87,7 +84,8 @@ object NodeRelay {
register: ActorRef,
relayId: UUID,
nodeRelayPacket: NodeRelayPacket,
outgoingPaymentFactory: OutgoingPaymentFactory): Behavior[Command] =
outgoingPaymentFactory: OutgoingPaymentFactory,
triggerer: typed.ActorRef[AsyncPaymentTriggerer.Command]): Behavior[Command] =
Behaviors.setup { context =>
val paymentHash = nodeRelayPacket.add.paymentHash
val totalAmountIn = nodeRelayPacket.outerPayload.totalAmount
Expand All @@ -102,7 +100,7 @@ object NodeRelay {
context.messageAdapter[MultiPartPaymentFSM.MultiPartPaymentSucceeded](WrappedMultiPartPaymentSucceeded)
}.toClassic
val incomingPaymentHandler = context.actorOf(MultiPartPaymentFSM.props(nodeParams, paymentHash, totalAmountIn, mppFsmAdapters))
new NodeRelay(nodeParams, parent, register, relayId, paymentHash, nodeRelayPacket.outerPayload.paymentSecret, context, outgoingPaymentFactory)
new NodeRelay(nodeParams, parent, register, relayId, paymentHash, nodeRelayPacket.outerPayload.paymentSecret, context, outgoingPaymentFactory, triggerer)
.receiving(Queue.empty, nodeRelayPacket.innerPayload, nodeRelayPacket.nextPacket, incomingPaymentHandler)
}
}
Expand Down Expand Up @@ -170,7 +168,8 @@ class NodeRelay private(nodeParams: NodeParams,
paymentHash: ByteVector32,
paymentSecret: ByteVector32,
context: ActorContext[NodeRelay.Command],
outgoingPaymentFactory: NodeRelay.OutgoingPaymentFactory) {
outgoingPaymentFactory: NodeRelay.OutgoingPaymentFactory,
triggerer: typed.ActorRef[AsyncPaymentTriggerer.Command]) {

import NodeRelay._

Expand Down Expand Up @@ -214,31 +213,24 @@ class NodeRelay private(nodeParams: NodeParams,

private def waitForTrigger(upstream: Upstream.Trampoline, nextPayload: IntermediatePayload.NodeRelay.Standard, nextPacket: OnionRoutingPacket): Behavior[Command] = {
context.log.info(s"waiting for async payment to trigger before relaying trampoline payment (amountIn=${upstream.amountIn} expiryIn=${upstream.expiryIn} amountOut=${nextPayload.amountToForward} expiryOut=${nextPayload.outgoingCltv}, asyncPaymentsParams=${nodeParams.relayParams.asyncPaymentsParams})")
// a trigger must be received before waiting more than `holdTimeoutBlocks`
val timeoutBlock: BlockHeight = nodeParams.currentBlockHeight + nodeParams.relayParams.asyncPaymentsParams.holdTimeoutBlocks
// a trigger must be received `cancelSafetyBeforeTimeoutBlocks` before the incoming payment cltv expiry
val safetyBlock: BlockHeight = (upstream.expiryIn - nodeParams.relayParams.asyncPaymentsParams.cancelSafetyBeforeTimeout).blockHeight
val messageAdapter = context.messageAdapter[CurrentBlockHeight](cbc => WrappedCurrentBlockHeight(cbc.blockHeight))
context.system.eventStream ! EventStream.Subscribe[CurrentBlockHeight](messageAdapter)
val timeoutBlock = nodeParams.currentBlockHeight + nodeParams.relayParams.asyncPaymentsParams.holdTimeoutBlocks
val safetyBlock = (upstream.expiryIn - nodeParams.relayParams.asyncPaymentsParams.cancelSafetyBeforeTimeout).blockHeight
// wait for notification until which ever occurs first: the hold timeout block or the safety block
val notifierTimeout = Seq(timeoutBlock, safetyBlock).min
val peerReadyResultAdapter = context.messageAdapter[AsyncPaymentTriggerer.Result](WrappedPeerReadyResult)

// TODO: send the WaitingToRelayPayment message to an actor that watches for the payment receiver to come back online before sending the RelayAsyncPayment message
triggerer ! AsyncPaymentTriggerer.Watch(peerReadyResultAdapter, nextPayload.outgoingNodeId, paymentHash, notifierTimeout)
context.system.eventStream ! EventStream.Publish(WaitingToRelayPayment(nextPayload.outgoingNodeId, paymentHash))
Behaviors.receiveMessagePartial {
case WrappedCurrentBlockHeight(blockHeight) if blockHeight >= safetyBlock =>
context.log.warn(s"rejecting async payment at block $blockHeight; was not triggered ${nodeParams.relayParams.asyncPaymentsParams.cancelSafetyBeforeTimeout} safety blocks before upstream cltv expiry at ${upstream.expiryIn}")
case WrappedPeerReadyResult(AsyncPaymentTriggerer.AsyncPaymentTimeout) =>
context.log.warn("rejecting async payment; was not triggered before block {}", notifierTimeout)
rejectPayment(upstream, Some(TemporaryNodeFailure)) // TODO: replace failure type when async payment spec is finalized
stopping()
case WrappedCurrentBlockHeight(blockHeight) if blockHeight >= timeoutBlock =>
context.log.warn(s"rejecting async payment at block $blockHeight; was not triggered after waiting ${nodeParams.relayParams.asyncPaymentsParams.holdTimeoutBlocks} blocks")
rejectPayment(upstream, Some(TemporaryNodeFailure)) // TODO: replace failure type when async payment spec is finalized
stopping()
case WrappedCurrentBlockHeight(_) =>
Behaviors.same
case CancelAsyncPayment =>
case WrappedPeerReadyResult(AsyncPaymentTriggerer.AsyncPaymentCanceled) =>
context.log.warn(s"payment sender canceled a waiting async payment")
rejectPayment(upstream, Some(TemporaryNodeFailure)) // TODO: replace failure type when async payment spec is finalized
stopping()
case RelayAsyncPayment =>
case WrappedPeerReadyResult(AsyncPaymentTriggerer.AsyncPaymentTriggered) =>
doSend(upstream, nextPayload, nextPacket)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package fr.acinq.eclair.payment.relay

import akka.actor.typed
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, Behavior}
import fr.acinq.bitcoin.scalacompat.ByteVector32
Expand Down Expand Up @@ -56,7 +57,7 @@ object NodeRelayer {
* NB: the payment secret used here is different from the invoice's payment secret and ensures we can
* group together HTLCs that the previous trampoline node sent in the same MPP.
*/
def apply(nodeParams: NodeParams, register: akka.actor.ActorRef, outgoingPaymentFactory: NodeRelay.OutgoingPaymentFactory, children: Map[PaymentKey, ActorRef[NodeRelay.Command]] = Map.empty): Behavior[Command] =
def apply(nodeParams: NodeParams, register: akka.actor.ActorRef, outgoingPaymentFactory: NodeRelay.OutgoingPaymentFactory, triggerer: typed.ActorRef[AsyncPaymentTriggerer.Command], children: Map[PaymentKey, ActorRef[NodeRelay.Command]] = Map.empty): Behavior[Command] =
Behaviors.setup { context =>
Behaviors.withMdc(Logs.mdc(category_opt = Some(Logs.LogCategory.PAYMENT)), mdc) {
Behaviors.receiveMessage {
Expand All @@ -71,15 +72,15 @@ object NodeRelayer {
case None =>
val relayId = UUID.randomUUID()
context.log.debug(s"spawning a new handler with relayId=$relayId")
val handler = context.spawn(NodeRelay.apply(nodeParams, context.self, register, relayId, nodeRelayPacket, outgoingPaymentFactory), relayId.toString)
val handler = context.spawn(NodeRelay.apply(nodeParams, context.self, register, relayId, nodeRelayPacket, outgoingPaymentFactory, triggerer), relayId.toString)
context.log.debug("forwarding incoming htlc #{} from channel {} to new handler", htlcIn.id, htlcIn.channelId)
handler ! NodeRelay.Relay(nodeRelayPacket)
apply(nodeParams, register, outgoingPaymentFactory, children + (childKey -> handler))
apply(nodeParams, register, outgoingPaymentFactory, triggerer, children + (childKey -> handler))
}
case RelayComplete(childHandler, paymentHash, paymentSecret) =>
// we do a back-and-forth between parent and child before stopping the child to prevent a race condition
childHandler ! NodeRelay.Stop
apply(nodeParams, register, outgoingPaymentFactory, children - PaymentKey(paymentHash, paymentSecret))
apply(nodeParams, register, outgoingPaymentFactory, triggerer, children - PaymentKey(paymentHash, paymentSecret))
case GetPendingPayments(replyTo) =>
replyTo ! children
Behaviors.same
Expand Down
Loading

0 comments on commit 33ca262

Please sign in to comment.