Skip to content

Commit

Permalink
Add singleton to trigger forward of async payments when receiver reco…
Browse files Browse the repository at this point in the history
…nnects
  • Loading branch information
remyers committed Nov 16, 2022
1 parent 3d31786 commit a1bd1b4
Show file tree
Hide file tree
Showing 11 changed files with 352 additions and 51 deletions.
6 changes: 4 additions & 2 deletions eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import fr.acinq.eclair.db.{Databases, DbEventHandler, FileBackupHandler}
import fr.acinq.eclair.io.{ClientSpawner, Peer, Server, Switchboard}
import fr.acinq.eclair.message.Postman
import fr.acinq.eclair.payment.receive.PaymentHandler
import fr.acinq.eclair.payment.relay.Relayer
import fr.acinq.eclair.payment.relay.{AsyncPaymentTriggerer, Relayer}
import fr.acinq.eclair.payment.send.{Autoprobe, PaymentInitiator}
import fr.acinq.eclair.router._
import fr.acinq.eclair.tor.{Controller, TorProtocolHandler}
Expand Down Expand Up @@ -296,7 +296,8 @@ class Setup(val datadir: File,
dbEventHandler = system.actorOf(SimpleSupervisor.props(DbEventHandler.props(nodeParams), "db-event-handler", SupervisorStrategy.Resume))
register = system.actorOf(SimpleSupervisor.props(Register.props(), "register", SupervisorStrategy.Resume))
paymentHandler = system.actorOf(SimpleSupervisor.props(PaymentHandler.props(nodeParams, register), "payment-handler", SupervisorStrategy.Resume))
relayer = system.actorOf(SimpleSupervisor.props(Relayer.props(nodeParams, router, register, paymentHandler, Some(postRestartCleanUpInitialized)), "relayer", SupervisorStrategy.Resume))
triggerer = system.spawn(Behaviors.supervise(AsyncPaymentTriggerer()).onFailure(typed.SupervisorStrategy.resume), name = "peer-watcher")
relayer = system.actorOf(SimpleSupervisor.props(Relayer.props(nodeParams, router, register, paymentHandler, triggerer, Some(postRestartCleanUpInitialized)), "relayer", SupervisorStrategy.Resume))
// Before initializing the switchboard (which re-connects us to the network) and the user-facing parts of the system,
// we want to make sure the handler for post-restart broken HTLCs has finished initializing.
_ <- postRestartCleanUpInitialized.future
Expand All @@ -311,6 +312,7 @@ class Setup(val datadir: File,
paymentInitiator = system.actorOf(SimpleSupervisor.props(PaymentInitiator.props(nodeParams, PaymentInitiator.SimplePaymentFactory(nodeParams, router, register)), "payment-initiator", SupervisorStrategy.Restart))
_ = for (i <- 0 until config.getInt("autoprobe-count")) yield system.actorOf(SimpleSupervisor.props(Autoprobe.props(nodeParams, router, paymentInitiator), s"payment-autoprobe-$i", SupervisorStrategy.Restart))

_ = triggerer ! AsyncPaymentTriggerer.Start(switchboard.toTyped)
balanceActor = system.spawn(BalanceActor(nodeParams.db, bitcoinClient, channelsListener, nodeParams.balanceCheckInterval), name = "balance-actor")

postman = system.spawn(Behaviors.supervise(Postman(switchboard.toTyped)).onFailure(typed.SupervisorStrategy.restart), name = "postman")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Copyright 2022 ACINQ SAS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fr.acinq.eclair.payment.relay

import akka.actor.typed.ActorRef.ActorRefOps
import akka.actor.typed.eventstream.EventStream
import akka.actor.typed.scaladsl.{ActorContext, Behaviors}
import akka.actor.typed.{ActorRef, Behavior, SupervisorStrategy}
import fr.acinq.bitcoin.scalacompat.ByteVector32
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.eclair.BlockHeight
import fr.acinq.eclair.blockchain.CurrentBlockHeight
import fr.acinq.eclair.io.PeerReadyNotifier.NotifyWhenPeerReady
import fr.acinq.eclair.io.{PeerReadyNotifier, Switchboard}
import fr.acinq.eclair.payment.relay.AsyncPaymentTriggerer.Command

import scala.concurrent.duration.Duration

/**
* This actor waits for an async payment receiver to become ready to receive a payment or for a block timeout to expire.
* If the receiver of the payment is a connected peer, spawn a PeerReadyNotifier actor.
* TODO: If the receiver is not a connected peer, wait for a `ReceiverReady` onion message containing the specified paymentHash.
*/

object AsyncPaymentTriggerer {
// @formatter:off
sealed trait Command
case class Start(switchboard: ActorRef[Switchboard.GetPeerInfo]) extends Command
case class Watch(replyTo: ActorRef[Result], remoteNodeId: PublicKey, paymentHash: ByteVector32, timeout: BlockHeight) extends Command
private case class WrappedPeerReadyResult(result: PeerReadyNotifier.Result) extends Command
private case class WrappedCurrentBlockHeight(currentBlockHeight: CurrentBlockHeight) extends Command

sealed trait Result
case object AsyncPaymentTriggered extends Result
case object AsyncPaymentTimeout extends Result
// @formatter:on

def apply(): Behavior[Command] = Behaviors.setup { context =>
new AsyncPaymentTriggerer(context).initializing()
}
}

private class AsyncPaymentTriggerer(context: ActorContext[Command]) {
import AsyncPaymentTriggerer._

case class Watcher(replyTo: ActorRef[Result], timeout: BlockHeight, paymentHash: ByteVector32) {
def expired(currentBlockHeight: BlockHeight): Boolean = timeout <= currentBlockHeight
}
case class AsyncPaymentTrigger(notifier: ActorRef[PeerReadyNotifier.Command], watchers: Set[Watcher]) {
def update(currentBlockHeight: BlockHeight): Option[AsyncPaymentTrigger] = {
// notify watchers that timeout occurred before offline peer reconnected
val expiredWatchers = watchers.filter(_.expired(currentBlockHeight))
expiredWatchers.foreach(e => e.replyTo ! AsyncPaymentTimeout)
// remove timed out watchers from set
val updatedWatchers: Set[Watcher] = watchers.removedAll(expiredWatchers)
if (updatedWatchers.isEmpty) {
// stop notifier for offline peer when all watchers time out
context.stop(notifier)
None
} else {
Some(AsyncPaymentTrigger(notifier, updatedWatchers))
}
}
def trigger(): Unit = watchers.foreach(e => e.replyTo ! AsyncPaymentTriggered)
}

private def initializing(): Behavior[Command] = {
Behaviors.receiveMessage[Command] {
case Start(switchboard) => watching(switchboard, Map())
case m => context.log.error(s"received unhandled message ${m.getClass.getSimpleName} before Start received.")
Behaviors.same
}
}

private def watching(switchboard: ActorRef[Switchboard.GetPeerInfo], triggers: Map[PublicKey, AsyncPaymentTrigger]): Behavior[Command] = {
val peerReadyResultAdapter = context.messageAdapter[PeerReadyNotifier.Result](WrappedPeerReadyResult)
context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[CurrentBlockHeight](WrappedCurrentBlockHeight))

Behaviors.receiveMessage[Command] {
case Watch(replyTo, remoteNodeId, paymentHash, timeout) =>
triggers.get(remoteNodeId) match {
case None =>
// add a new trigger
val notifier = context.spawn(Behaviors.supervise(PeerReadyNotifier(remoteNodeId, switchboard, Left(Duration.Inf)))
.onFailure(SupervisorStrategy.restart), s"peer-ready-notifier-$remoteNodeId-$timeout")
notifier ! NotifyWhenPeerReady(peerReadyResultAdapter)
val newTrigger = AsyncPaymentTrigger(notifier, Set(Watcher(replyTo, timeout, paymentHash)))
watching(switchboard, triggers + (remoteNodeId -> newTrigger))
case Some(trigger) =>
// add a new watcher to an existing trigger
val updatedTrigger = AsyncPaymentTrigger(trigger.notifier, trigger.watchers + Watcher(replyTo, timeout, paymentHash))
watching(switchboard, triggers + (remoteNodeId -> updatedTrigger))
}
case WrappedCurrentBlockHeight(CurrentBlockHeight(currentBlockHeight)) =>
// update watchers, and remove triggers with no more active watchers
val newTriggers = triggers.collect(m => m._2.update(currentBlockHeight) match {
case Some(t) => m._1 -> t
})
watching(switchboard, newTriggers)
case WrappedPeerReadyResult(PeerReadyNotifier.PeerReady(remoteNodeId, _)) =>
// notify watcher that destination peer is ready to receive async payments; PeerReadyNotifier will stop itself
triggers(remoteNodeId).trigger()
watching(switchboard, triggers - remoteNodeId)
case m => context.log.error(s"received unhandled message ${m.getClass.getSimpleName} after Start received.")
Behaviors.same
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@

package fr.acinq.eclair.payment.relay

import akka.actor.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.eventstream.EventStream
import akka.actor.typed.scaladsl.adapter.{TypedActorContextOps, TypedActorRefOps}
import akka.actor.typed.scaladsl.{ActorContext, Behaviors}
import akka.actor.{ActorRef, typed}
import com.softwaremill.quicklens.ModifyPimp
import fr.acinq.bitcoin.scalacompat.ByteVector32
import fr.acinq.eclair.blockchain.CurrentBlockHeight
import fr.acinq.eclair.channel.{CMD_FAIL_HTLC, CMD_FULFILL_HTLC}
import fr.acinq.eclair.db.PendingCommandsDb
import fr.acinq.eclair.payment.IncomingPaymentPacket.NodeRelayPacket
Expand Down Expand Up @@ -63,7 +62,7 @@ object NodeRelay {
private case class WrappedPreimageReceived(preimageReceived: PreimageReceived) extends Command
private case class WrappedPaymentSent(paymentSent: PaymentSent) extends Command
private case class WrappedPaymentFailed(paymentFailed: PaymentFailed) extends Command
private case class WrappedCurrentBlockHeight(currentBlockHeight: BlockHeight) extends Command
private case class WrappedPeerReadyResult(result: AsyncPaymentTriggerer.Result) extends Command
// @formatter:on

trait OutgoingPaymentFactory {
Expand All @@ -87,7 +86,8 @@ object NodeRelay {
register: ActorRef,
relayId: UUID,
nodeRelayPacket: NodeRelayPacket,
outgoingPaymentFactory: OutgoingPaymentFactory): Behavior[Command] =
outgoingPaymentFactory: OutgoingPaymentFactory,
triggerer: typed.ActorRef[AsyncPaymentTriggerer.Command]): Behavior[Command] =
Behaviors.setup { context =>
val paymentHash = nodeRelayPacket.add.paymentHash
val totalAmountIn = nodeRelayPacket.outerPayload.totalAmount
Expand All @@ -101,8 +101,8 @@ object NodeRelay {
context.messageAdapter[MultiPartPaymentFSM.MultiPartPaymentFailed](WrappedMultiPartPaymentFailed)
context.messageAdapter[MultiPartPaymentFSM.MultiPartPaymentSucceeded](WrappedMultiPartPaymentSucceeded)
}.toClassic
val incomingPaymentHandler = context.actorOf(MultiPartPaymentFSM.props(nodeParams, paymentHash, totalAmountIn, mppFsmAdapters))
new NodeRelay(nodeParams, parent, register, relayId, paymentHash, nodeRelayPacket.outerPayload.paymentSecret, context, outgoingPaymentFactory)
val incomingPaymentHandler: ActorRef = context.actorOf(MultiPartPaymentFSM.props(nodeParams, paymentHash, totalAmountIn, mppFsmAdapters))
new NodeRelay(nodeParams, parent, register, relayId, paymentHash, nodeRelayPacket.outerPayload.paymentSecret, context, outgoingPaymentFactory, triggerer)
.receiving(Queue.empty, nodeRelayPacket.innerPayload, nodeRelayPacket.nextPacket, incomingPaymentHandler)
}
}
Expand Down Expand Up @@ -170,7 +170,8 @@ class NodeRelay private(nodeParams: NodeParams,
paymentHash: ByteVector32,
paymentSecret: ByteVector32,
context: ActorContext[NodeRelay.Command],
outgoingPaymentFactory: NodeRelay.OutgoingPaymentFactory) {
outgoingPaymentFactory: NodeRelay.OutgoingPaymentFactory,
triggerer: typed.ActorRef[AsyncPaymentTriggerer.Command]) {

import NodeRelay._

Expand Down Expand Up @@ -218,27 +219,26 @@ class NodeRelay private(nodeParams: NodeParams,
val timeoutBlock: BlockHeight = nodeParams.currentBlockHeight + nodeParams.relayParams.asyncPaymentsParams.holdTimeoutBlocks
// a trigger must be received `cancelSafetyBeforeTimeoutBlocks` before the incoming payment cltv expiry
val safetyBlock: BlockHeight = (upstream.expiryIn - nodeParams.relayParams.asyncPaymentsParams.cancelSafetyBeforeTimeout).blockHeight
val messageAdapter = context.messageAdapter[CurrentBlockHeight](cbc => WrappedCurrentBlockHeight(cbc.blockHeight))
context.system.eventStream ! EventStream.Subscribe[CurrentBlockHeight](messageAdapter)
// wait for notification until which ever occurs first: the hold timeout block or the safety block
val notifierTimeout: BlockHeight = Seq(timeoutBlock, safetyBlock).min
val peerReadyResultAdapter = context.messageAdapter[AsyncPaymentTriggerer.Result](WrappedPeerReadyResult)

// TODO: send the WaitingToRelayPayment message to an actor that watches for the payment receiver to come back online before sending the RelayAsyncPayment message
triggerer ! AsyncPaymentTriggerer.Watch(peerReadyResultAdapter, nextPayload.outgoingNodeId, paymentHash, notifierTimeout)
context.system.eventStream ! EventStream.Publish(WaitingToRelayPayment(nextPayload.outgoingNodeId, paymentHash))
Behaviors.receiveMessagePartial {
case WrappedCurrentBlockHeight(blockHeight) if blockHeight >= safetyBlock =>
context.log.warn(s"rejecting async payment at block $blockHeight; was not triggered ${nodeParams.relayParams.asyncPaymentsParams.cancelSafetyBeforeTimeout} safety blocks before upstream cltv expiry at ${upstream.expiryIn}")
rejectPayment(upstream, Some(TemporaryNodeFailure)) // TODO: replace failure type when async payment spec is finalized
stopping()
case WrappedCurrentBlockHeight(blockHeight) if blockHeight >= timeoutBlock =>
context.log.warn(s"rejecting async payment at block $blockHeight; was not triggered after waiting ${nodeParams.relayParams.asyncPaymentsParams.holdTimeoutBlocks} blocks")
case WrappedPeerReadyResult(AsyncPaymentTriggerer.AsyncPaymentTimeout) =>
if (safetyBlock < timeoutBlock) {
context.log.warn(s"rejecting async payment; was not triggered ${nodeParams.relayParams.asyncPaymentsParams.cancelSafetyBeforeTimeout} safety blocks before upstream cltv expiry of ${upstream.expiryIn}")
} else {
context.log.warn(s"rejecting async payment; was not triggered after waiting ${nodeParams.relayParams.asyncPaymentsParams.holdTimeoutBlocks} blocks")
}
rejectPayment(upstream, Some(TemporaryNodeFailure)) // TODO: replace failure type when async payment spec is finalized
stopping()
case WrappedCurrentBlockHeight(blockHeight) =>
Behaviors.same
case CancelAsyncPayment =>
context.log.warn(s"payment sender canceled a waiting async payment")
rejectPayment(upstream, Some(TemporaryNodeFailure)) // TODO: replace failure type when async payment spec is finalized
stopping()
case RelayAsyncPayment =>
case WrappedPeerReadyResult(AsyncPaymentTriggerer.AsyncPaymentTriggered) =>
doSend(upstream, nextPayload, nextPacket)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package fr.acinq.eclair.payment.relay

import akka.actor.typed
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, Behavior}
import fr.acinq.bitcoin.scalacompat.ByteVector32
Expand Down Expand Up @@ -56,7 +57,7 @@ object NodeRelayer {
* NB: the payment secret used here is different from the invoice's payment secret and ensures we can
* group together HTLCs that the previous trampoline node sent in the same MPP.
*/
def apply(nodeParams: NodeParams, register: akka.actor.ActorRef, outgoingPaymentFactory: NodeRelay.OutgoingPaymentFactory, children: Map[PaymentKey, ActorRef[NodeRelay.Command]] = Map.empty): Behavior[Command] =
def apply(nodeParams: NodeParams, register: akka.actor.ActorRef, outgoingPaymentFactory: NodeRelay.OutgoingPaymentFactory, triggerer: typed.ActorRef[AsyncPaymentTriggerer.Command], children: Map[PaymentKey, ActorRef[NodeRelay.Command]] = Map.empty): Behavior[Command] =
Behaviors.setup { context =>
Behaviors.withMdc(Logs.mdc(category_opt = Some(Logs.LogCategory.PAYMENT)), mdc) {
Behaviors.receiveMessage {
Expand All @@ -71,15 +72,15 @@ object NodeRelayer {
case None =>
val relayId = UUID.randomUUID()
context.log.debug(s"spawning a new handler with relayId=$relayId")
val handler = context.spawn(NodeRelay.apply(nodeParams, context.self, register, relayId, nodeRelayPacket, outgoingPaymentFactory), relayId.toString)
val handler = context.spawn(NodeRelay.apply(nodeParams, context.self, register, relayId, nodeRelayPacket, outgoingPaymentFactory, triggerer), relayId.toString)
context.log.debug("forwarding incoming htlc #{} from channel {} to new handler", htlcIn.id, htlcIn.channelId)
handler ! NodeRelay.Relay(nodeRelayPacket)
apply(nodeParams, register, outgoingPaymentFactory, children + (childKey -> handler))
apply(nodeParams, register, outgoingPaymentFactory, triggerer, children + (childKey -> handler))
}
case RelayComplete(childHandler, paymentHash, paymentSecret) =>
// we do a back-and-forth between parent and child before stopping the child to prevent a race condition
childHandler ! NodeRelay.Stop
apply(nodeParams, register, outgoingPaymentFactory, children - PaymentKey(paymentHash, paymentSecret))
apply(nodeParams, register, outgoingPaymentFactory, triggerer, children - PaymentKey(paymentHash, paymentSecret))
case GetPendingPayments(replyTo) =>
replyTo ! children
Behaviors.same
Expand Down
Loading

0 comments on commit a1bd1b4

Please sign in to comment.