Skip to content

Commit

Permalink
Add ChannelAborted event (#2593)
Browse files Browse the repository at this point in the history
That additional event lets subcribers know when a channel was closed
without ever being used.

The possible flows for a channel lifecycle are now:

- ChannelCreated -> ChannelOpened -> ChannelClosed
- ChannelCreated -> ChannelOpened -> ChannelAborted
- ChannelCreated -> ChannelAborted
- ChannelCreated -> ChannelClosed
- ChannelAborted

Co-authored-by: Richard Myers <[email protected]>
  • Loading branch information
t-bast and remyers authored Feb 3, 2023
1 parent a54ae22 commit 6483896
Show file tree
Hide file tree
Showing 15 changed files with 200 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,12 @@ case class ChannelRestored(channel: ActorRef, channelId: ByteVector32, peer: Act

case class ChannelIdAssigned(channel: ActorRef, remoteNodeId: PublicKey, temporaryChannelId: ByteVector32, channelId: ByteVector32) extends ChannelEvent

/**
* This event will be sent whenever a new scid is assigned to the channel, be it a real, local alias or remote alias.
*/
/** This event will be sent whenever a new scid is assigned to the channel, be it a real, local alias or remote alias. */
case class ShortChannelIdAssigned(channel: ActorRef, channelId: ByteVector32, shortIds: ShortIds, remoteNodeId: PublicKey) extends ChannelEvent

/** This event will be sent if a channel was aborted before completing the opening flow. */
case class ChannelAborted(channel: ActorRef, remoteNodeId: PublicKey, channelId: ByteVector32) extends ChannelEvent

/** This event will be sent once a channel has been successfully opened and is ready to process payments. */
case class ChannelOpened(channel: ActorRef, remoteNodeId: PublicKey, channelId: ByteVector32) extends ChannelEvent

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
// NB: order matters!
case closing: DATA_CLOSING if Closing.nothingAtStake(closing) =>
log.info("we have nothing at stake, going straight to CLOSED")
context.system.eventStream.publish(ChannelAborted(self, remoteNodeId, closing.channelId))
goto(CLOSED) using closing
case closing: DATA_CLOSING =>
val isInitiator = closing.metaCommitments.params.localParams.isInitiator
Expand Down Expand Up @@ -1052,7 +1053,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with

case Event(getTxResponse: GetTxWithMetaResponse, d: DATA_CLOSING) if getTxResponse.txid == d.metaCommitments.latest.fundingTxId =>
// NB: waitingSinceBlock contains the block at which closing was initiated, not the block at which funding was initiated.
// That means we're lenient with our peer and give its funding tx more time to confirm, to avoid having to store two distinct
// That means we're lenient with our peer and give its funding tx more time to confirm, to avoid having to store two distinct
// waitingSinceBlock (e.g. closingWaitingSinceBlock and fundingWaitingSinceBlock).
handleGetFundingTx(getTxResponse, d.waitingSince, d.metaCommitments.latest.localFundingStatus.signedTx_opt)

Expand Down Expand Up @@ -1211,7 +1212,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
.collect { case (add, Some(id)) => context.system.eventStream.publish(PaymentSettlingOnChain(id, amount = add.amountMsat, add.paymentHash)) }
// then let's see if any of the possible close scenarios can be considered done
val closingType_opt = Closing.isClosed(d1, Some(tx))
// finally, if one of the unilateral closes is done, we move to CLOSED state, otherwise we stay() (note that we don't store the state)
// finally, if one of the unilateral closes is done, we move to CLOSED state, otherwise we stay()
closingType_opt match {
case Some(closingType) =>
log.info(s"channel closed (type=${closingType_opt.map(c => EventType.Closed(c).label).getOrElse("UnknownYet")})")
Expand Down Expand Up @@ -1685,6 +1686,14 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
case (_: DATA_WAIT_FOR_DUAL_FUNDING_READY, d2: DATA_NORMAL) => maybeEmitChannelUpdateChangedEvent(newUpdate = d2.channelUpdate, oldUpdate_opt = None, d2)
case _ => ()
}

// Notify when the channel was aborted.
(stateData, nextState) match {
case (_: TransientChannelData, CLOSING | CLOSED) => context.system.eventStream.publish(ChannelAborted(self, remoteNodeId, stateData.channelId))
case (_: DATA_WAIT_FOR_FUNDING_CONFIRMED | _: DATA_WAIT_FOR_CHANNEL_READY, CLOSING | CLOSED) => context.system.eventStream.publish(ChannelAborted(self, remoteNodeId, stateData.channelId))
case (_: DATA_WAIT_FOR_DUAL_FUNDING_CONFIRMED | _: DATA_WAIT_FOR_DUAL_FUNDING_READY, CLOSING | CLOSED) => context.system.eventStream.publish(ChannelAborted(self, remoteNodeId, stateData.channelId))
case _ => ()
}
}

/** Metrics */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import scala.concurrent.duration._

class WaitForAcceptChannelStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with ChannelStateTestsBase {

case class FixtureParam(alice: TestFSMRef[ChannelState, ChannelData, Channel], bob: TestFSMRef[ChannelState, ChannelData, Channel], aliceOrigin: TestProbe, alice2bob: TestProbe, bob2alice: TestProbe, alice2blockchain: TestProbe)
case class FixtureParam(alice: TestFSMRef[ChannelState, ChannelData, Channel], bob: TestFSMRef[ChannelState, ChannelData, Channel], aliceOrigin: TestProbe, alice2bob: TestProbe, bob2alice: TestProbe, alice2blockchain: TestProbe, listener: TestProbe)

override def withFixture(test: OneArgTest): Outcome = {
import com.softwaremill.quicklens._
Expand All @@ -62,14 +62,16 @@ class WaitForAcceptChannelStateSpec extends TestKitBaseClass with FixtureAnyFunS
val commitTxFeerate = if (channelType.isInstanceOf[ChannelTypes.AnchorOutputs] || channelType.isInstanceOf[ChannelTypes.AnchorOutputsZeroFeeHtlcTx]) TestConstants.anchorOutputsFeeratePerKw else TestConstants.feeratePerKw
val aliceInit = Init(aliceParams.initFeatures)
val bobInit = Init(bobParams.initFeatures)
val listener = TestProbe()
within(30 seconds) {
alice.underlying.system.eventStream.subscribe(listener.ref, classOf[ChannelAborted])
val fundingAmount = if (test.tags.contains(ChannelStateTestsTags.Wumbo)) Btc(5).toSatoshi else TestConstants.fundingSatoshis
alice ! INPUT_INIT_CHANNEL_INITIATOR(ByteVector32.Zeroes, fundingAmount, dualFunded = false, commitTxFeerate, TestConstants.feeratePerKw, Some(TestConstants.initiatorPushAmount), requireConfirmedInputs = false, aliceParams, alice2bob.ref, bobInit, channelFlags, channelConfig, channelType)
bob ! INPUT_INIT_CHANNEL_NON_INITIATOR(ByteVector32.Zeroes, None, dualFunded = false, None, bobParams, bob2alice.ref, aliceInit, channelConfig, channelType)
alice2bob.expectMsgType[OpenChannel]
alice2bob.forward(bob)
awaitCond(alice.stateName == WAIT_FOR_ACCEPT_CHANNEL)
withFixture(test.toNoArgTest(FixtureParam(alice, bob, aliceOrigin, alice2bob, bob2alice, alice2blockchain)))
withFixture(test.toNoArgTest(FixtureParam(alice, bob, aliceOrigin, alice2bob, bob2alice, alice2blockchain, listener)))
}
}

Expand Down Expand Up @@ -132,6 +134,7 @@ class WaitForAcceptChannelStateSpec extends TestKitBaseClass with FixtureAnyFunS
assert(accept.channelType_opt.contains(ChannelTypes.AnchorOutputsZeroFeeHtlcTx()))
bob2alice.forward(alice, accept.copy(tlvStream = TlvStream.empty))
alice2bob.expectMsg(Error(accept.temporaryChannelId, "option_channel_type was negotiated but channel_type is missing"))
listener.expectMsgType[ChannelAborted]
awaitCond(alice.stateName == CLOSED)
aliceOrigin.expectMsgType[Status.Failure]
}
Expand All @@ -155,6 +158,7 @@ class WaitForAcceptChannelStateSpec extends TestKitBaseClass with FixtureAnyFunS
// type negotiation so Alice needs to abort because the channel types won't match.
bob2alice.forward(alice, accept.copy(tlvStream = TlvStream(ChannelTlv.UpfrontShutdownScriptTlv(ByteVector.empty))))
alice2bob.expectMsg(Error(accept.temporaryChannelId, "invalid channel_type=anchor_outputs_zero_fee_htlc_tx, expected channel_type=standard"))
listener.expectMsgType[ChannelAborted]
awaitCond(alice.stateName == CLOSED)
aliceOrigin.expectMsgType[Status.Failure]
}
Expand Down Expand Up @@ -187,6 +191,7 @@ class WaitForAcceptChannelStateSpec extends TestKitBaseClass with FixtureAnyFunS
val invalidAccept = accept.copy(tlvStream = TlvStream(ChannelTlv.UpfrontShutdownScriptTlv(ByteVector.empty), ChannelTlv.ChannelTypeTlv(ChannelTypes.AnchorOutputs())))
bob2alice.forward(alice, invalidAccept)
alice2bob.expectMsg(Error(accept.temporaryChannelId, "invalid channel_type=anchor_outputs, expected channel_type=standard"))
listener.expectMsgType[ChannelAborted]
awaitCond(alice.stateName == CLOSED)
aliceOrigin.expectMsgType[Status.Failure]
}
Expand All @@ -199,6 +204,7 @@ class WaitForAcceptChannelStateSpec extends TestKitBaseClass with FixtureAnyFunS
alice ! accept.copy(maxAcceptedHtlcs = invalidMaxAcceptedHtlcs)
val error = alice2bob.expectMsgType[Error]
assert(error == Error(accept.temporaryChannelId, InvalidMaxAcceptedHtlcs(accept.temporaryChannelId, invalidMaxAcceptedHtlcs, Channel.MAX_ACCEPTED_HTLCS).getMessage))
listener.expectMsgType[ChannelAborted]
awaitCond(alice.stateName == CLOSED)
aliceOrigin.expectMsgType[Status.Failure]
}
Expand All @@ -211,6 +217,7 @@ class WaitForAcceptChannelStateSpec extends TestKitBaseClass with FixtureAnyFunS
alice ! accept.copy(dustLimitSatoshis = lowDustLimitSatoshis)
val error = alice2bob.expectMsgType[Error]
assert(error == Error(accept.temporaryChannelId, DustLimitTooSmall(accept.temporaryChannelId, lowDustLimitSatoshis, Channel.MIN_DUST_LIMIT).getMessage))
listener.expectMsgType[ChannelAborted]
awaitCond(alice.stateName == CLOSED)
aliceOrigin.expectMsgType[Status.Failure]
}
Expand All @@ -222,6 +229,7 @@ class WaitForAcceptChannelStateSpec extends TestKitBaseClass with FixtureAnyFunS
alice ! accept.copy(dustLimitSatoshis = highDustLimitSatoshis)
val error = alice2bob.expectMsgType[Error]
assert(error == Error(accept.temporaryChannelId, DustLimitTooLarge(accept.temporaryChannelId, highDustLimitSatoshis, Alice.nodeParams.channelConf.maxRemoteDustLimit).getMessage))
listener.expectMsgType[ChannelAborted]
awaitCond(alice.stateName == CLOSED)
aliceOrigin.expectMsgType[Status.Failure]
}
Expand All @@ -233,6 +241,7 @@ class WaitForAcceptChannelStateSpec extends TestKitBaseClass with FixtureAnyFunS
alice ! accept.copy(toSelfDelay = delayTooHigh)
val error = alice2bob.expectMsgType[Error]
assert(error == Error(accept.temporaryChannelId, ToSelfDelayTooHigh(accept.temporaryChannelId, delayTooHigh, Alice.nodeParams.channelConf.maxToLocalDelay).getMessage))
listener.expectMsgType[ChannelAborted]
awaitCond(alice.stateName == CLOSED)
aliceOrigin.expectMsgType[Status.Failure]
}
Expand All @@ -245,6 +254,7 @@ class WaitForAcceptChannelStateSpec extends TestKitBaseClass with FixtureAnyFunS
alice ! accept.copy(channelReserveSatoshis = reserveTooHigh)
val error = alice2bob.expectMsgType[Error]
assert(error == Error(accept.temporaryChannelId, ChannelReserveTooHigh(accept.temporaryChannelId, reserveTooHigh, 0.3, 0.05).getMessage))
listener.expectMsgType[ChannelAborted]
awaitCond(alice.stateName == CLOSED)
aliceOrigin.expectMsgType[Status.Failure]
}
Expand All @@ -256,6 +266,7 @@ class WaitForAcceptChannelStateSpec extends TestKitBaseClass with FixtureAnyFunS
alice ! accept.copy(channelReserveSatoshis = reserveTooSmall)
val error = alice2bob.expectMsgType[Error]
assert(error == Error(accept.temporaryChannelId, DustLimitTooLarge(accept.temporaryChannelId, accept.dustLimitSatoshis, reserveTooSmall).getMessage))
listener.expectMsgType[ChannelAborted]
awaitCond(alice.stateName == CLOSED)
aliceOrigin.expectMsgType[Status.Failure]
}
Expand All @@ -268,6 +279,7 @@ class WaitForAcceptChannelStateSpec extends TestKitBaseClass with FixtureAnyFunS
alice ! accept.copy(channelReserveSatoshis = reserveTooSmall)
val error = alice2bob.expectMsgType[Error]
assert(error == Error(accept.temporaryChannelId, ChannelReserveBelowOurDustLimit(accept.temporaryChannelId, reserveTooSmall, open.dustLimitSatoshis).getMessage))
listener.expectMsgType[ChannelAborted]
awaitCond(alice.stateName == CLOSED)
aliceOrigin.expectMsgType[Status.Failure]
}
Expand All @@ -280,6 +292,7 @@ class WaitForAcceptChannelStateSpec extends TestKitBaseClass with FixtureAnyFunS
alice ! accept.copy(dustLimitSatoshis = dustTooBig)
val error = alice2bob.expectMsgType[Error]
assert(error == Error(accept.temporaryChannelId, DustLimitAboveOurChannelReserve(accept.temporaryChannelId, dustTooBig, open.channelReserveSatoshis).getMessage))
listener.expectMsgType[ChannelAborted]
awaitCond(alice.stateName == CLOSED)
aliceOrigin.expectMsgType[Status.Failure]
}
Expand Down Expand Up @@ -319,13 +332,15 @@ class WaitForAcceptChannelStateSpec extends TestKitBaseClass with FixtureAnyFunS
val accept = bob2alice.expectMsgType[AcceptChannel]
val accept1 = accept.copy(tlvStream = TlvStream(ChannelTlv.UpfrontShutdownScriptTlv(ByteVector.fromValidHex("deadbeef"))))
bob2alice.forward(alice, accept1)
listener.expectMsgType[ChannelAborted]
awaitCond(alice.stateName == CLOSED)
aliceOrigin.expectMsgType[Status.Failure]
}

test("recv Error") { f =>
import f._
alice ! Error(ByteVector32.Zeroes, "oops")
listener.expectMsgType[ChannelAborted]
awaitCond(alice.stateName == CLOSED)
aliceOrigin.expectMsgType[Status.Failure]
}
Expand All @@ -336,20 +351,23 @@ class WaitForAcceptChannelStateSpec extends TestKitBaseClass with FixtureAnyFunS
val c = CMD_CLOSE(sender.ref, None, None)
alice ! c
sender.expectMsg(RES_SUCCESS(c, ByteVector32.Zeroes))
listener.expectMsgType[ChannelAborted]
awaitCond(alice.stateName == CLOSED)
aliceOrigin.expectMsgType[ChannelOpenResponse.ChannelClosed]
}

test("recv INPUT_DISCONNECTED") { f =>
import f._
alice ! INPUT_DISCONNECTED
listener.expectMsgType[ChannelAborted]
awaitCond(alice.stateName == CLOSED)
aliceOrigin.expectMsgType[Status.Failure]
}

test("recv TickChannelOpenTimeout") { f =>
import f._
alice ! TickChannelOpenTimeout
listener.expectMsgType[ChannelAborted]
awaitCond(alice.stateName == CLOSED)
aliceOrigin.expectMsgType[Status.Failure]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class WaitForAcceptDualFundedChannelStateSpec extends TestKitBaseClass with Fixt
val bobRequiresConfirmedInputs = "bob_requires_confirmed_inputs"
val dualFundingContribution = "dual_funding_contribution"

case class FixtureParam(alice: TestFSMRef[ChannelState, ChannelData, Channel], bob: TestFSMRef[ChannelState, ChannelData, Channel], open: OpenDualFundedChannel, aliceOrigin: TestProbe, alice2bob: TestProbe, bob2alice: TestProbe)
case class FixtureParam(alice: TestFSMRef[ChannelState, ChannelData, Channel], bob: TestFSMRef[ChannelState, ChannelData, Channel], open: OpenDualFundedChannel, aliceOrigin: TestProbe, alice2bob: TestProbe, bob2alice: TestProbe, listener: TestProbe)

override def withFixture(test: OneArgTest): Outcome = {
val bobNodeParams = if (test.tags.contains(bobRequiresConfirmedInputs)) {
Expand All @@ -55,13 +55,15 @@ class WaitForAcceptDualFundedChannelStateSpec extends TestKitBaseClass with Fixt
val bobInit = Init(bobParams.initFeatures)
val nonInitiatorContribution = if (test.tags.contains(dualFundingContribution)) Some(TestConstants.nonInitiatorFundingSatoshis) else None
val nonInitiatorPushAmount = if (test.tags.contains(ChannelStateTestsTags.NonInitiatorPushAmount)) Some(TestConstants.nonInitiatorPushAmount) else None
val listener = TestProbe()
within(30 seconds) {
alice.underlying.system.eventStream.subscribe(listener.ref, classOf[ChannelAborted])
alice ! INPUT_INIT_CHANNEL_INITIATOR(ByteVector32.Zeroes, TestConstants.fundingSatoshis, dualFunded = true, TestConstants.anchorOutputsFeeratePerKw, TestConstants.feeratePerKw, None, requireConfirmedInputs = false, aliceParams, alice2bob.ref, bobInit, ChannelFlags.Private, channelConfig, channelType)
bob ! INPUT_INIT_CHANNEL_NON_INITIATOR(ByteVector32.Zeroes, nonInitiatorContribution, dualFunded = true, nonInitiatorPushAmount, bobParams, bob2alice.ref, aliceInit, channelConfig, channelType)
val open = alice2bob.expectMsgType[OpenDualFundedChannel]
alice2bob.forward(bob, open)
awaitCond(alice.stateName == WAIT_FOR_ACCEPT_DUAL_FUNDED_CHANNEL)
withFixture(test.toNoArgTest(FixtureParam(alice, bob, open, aliceOrigin, alice2bob, bob2alice)))
withFixture(test.toNoArgTest(FixtureParam(alice, bob, open, aliceOrigin, alice2bob, bob2alice, listener)))
}
}

Expand Down Expand Up @@ -137,6 +139,7 @@ class WaitForAcceptDualFundedChannelStateSpec extends TestKitBaseClass with Fixt
alice ! accept.copy(fundingAmount = 25_000 sat)
val error = alice2bob.expectMsgType[Error]
assert(error == Error(accept.temporaryChannelId, InvalidPushAmount(accept.temporaryChannelId, TestConstants.nonInitiatorPushAmount, 25_000_000 msat).getMessage))
listener.expectMsgType[ChannelAborted]
awaitCond(alice.stateName == CLOSED)
aliceOrigin.expectMsgType[Status.Failure]
}
Expand All @@ -148,6 +151,7 @@ class WaitForAcceptDualFundedChannelStateSpec extends TestKitBaseClass with Fixt
alice ! accept.copy(maxAcceptedHtlcs = invalidMaxAcceptedHtlcs)
val error = alice2bob.expectMsgType[Error]
assert(error == Error(accept.temporaryChannelId, InvalidMaxAcceptedHtlcs(accept.temporaryChannelId, invalidMaxAcceptedHtlcs, Channel.MAX_ACCEPTED_HTLCS).getMessage))
listener.expectMsgType[ChannelAborted]
awaitCond(alice.stateName == CLOSED)
aliceOrigin.expectMsgType[Status.Failure]
}
Expand All @@ -159,6 +163,7 @@ class WaitForAcceptDualFundedChannelStateSpec extends TestKitBaseClass with Fixt
alice ! accept.copy(dustLimit = lowDustLimit)
val error = alice2bob.expectMsgType[Error]
assert(error == Error(accept.temporaryChannelId, DustLimitTooSmall(accept.temporaryChannelId, lowDustLimit, Channel.MIN_DUST_LIMIT).getMessage))
listener.expectMsgType[ChannelAborted]
awaitCond(alice.stateName == CLOSED)
aliceOrigin.expectMsgType[Status.Failure]
}
Expand All @@ -170,6 +175,7 @@ class WaitForAcceptDualFundedChannelStateSpec extends TestKitBaseClass with Fixt
alice ! accept.copy(dustLimit = highDustLimit)
val error = alice2bob.expectMsgType[Error]
assert(error == Error(accept.temporaryChannelId, DustLimitTooLarge(accept.temporaryChannelId, highDustLimit, Alice.nodeParams.channelConf.maxRemoteDustLimit).getMessage))
listener.expectMsgType[ChannelAborted]
awaitCond(alice.stateName == CLOSED)
aliceOrigin.expectMsgType[Status.Failure]
}
Expand All @@ -181,13 +187,15 @@ class WaitForAcceptDualFundedChannelStateSpec extends TestKitBaseClass with Fixt
alice ! accept.copy(toSelfDelay = delayTooHigh)
val error = alice2bob.expectMsgType[Error]
assert(error == Error(accept.temporaryChannelId, ToSelfDelayTooHigh(accept.temporaryChannelId, delayTooHigh, Alice.nodeParams.channelConf.maxToLocalDelay).getMessage))
listener.expectMsgType[ChannelAborted]
awaitCond(alice.stateName == CLOSED)
aliceOrigin.expectMsgType[Status.Failure]
}

test("recv Error", Tag(ChannelStateTestsTags.DualFunding), Tag(ChannelStateTestsTags.AnchorOutputsZeroFeeHtlcTxs)) { f =>
import f._
alice ! Error(ByteVector32.Zeroes, "dual funding not supported")
listener.expectMsgType[ChannelAborted]
awaitCond(alice.stateName == CLOSED)
aliceOrigin.expectMsgType[Status.Failure]
}
Expand All @@ -198,20 +206,23 @@ class WaitForAcceptDualFundedChannelStateSpec extends TestKitBaseClass with Fixt
val c = CMD_CLOSE(sender.ref, None, None)
alice ! c
sender.expectMsg(RES_SUCCESS(c, ByteVector32.Zeroes))
listener.expectMsgType[ChannelAborted]
awaitCond(alice.stateName == CLOSED)
aliceOrigin.expectMsgType[ChannelOpenResponse.ChannelClosed]
}

test("recv INPUT_DISCONNECTED", Tag(ChannelStateTestsTags.DualFunding), Tag(ChannelStateTestsTags.AnchorOutputsZeroFeeHtlcTxs)) { f =>
import f._
alice ! INPUT_DISCONNECTED
listener.expectMsgType[ChannelAborted]
awaitCond(alice.stateName == CLOSED)
aliceOrigin.expectMsgType[Status.Failure]
}

test("recv TickChannelOpenTimeout", Tag(ChannelStateTestsTags.DualFunding), Tag(ChannelStateTestsTags.AnchorOutputsZeroFeeHtlcTxs)) { f =>
import f._
alice ! TickChannelOpenTimeout
listener.expectMsgType[ChannelAborted]
awaitCond(alice.stateName == CLOSED)
aliceOrigin.expectMsgType[Status.Failure]
}
Expand Down
Loading

0 comments on commit 6483896

Please sign in to comment.