Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework channel reestablish #2036

Merged
merged 3 commits into from
Oct 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 71 additions & 115 deletions eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher._
import fr.acinq.eclair.blockchain.bitcoind.rpc.BitcoinCoreClient
import fr.acinq.eclair.channel.Commitments.PostRevocationAction
import fr.acinq.eclair.channel.Helpers.{Closing, Funding, getRelayFees}
import fr.acinq.eclair.channel.Helpers.Syncing.SyncResult
import fr.acinq.eclair.channel.Helpers.{Closing, Funding, Syncing, getRelayFees}
import fr.acinq.eclair.channel.Monitoring.Metrics.ProcessMessage
import fr.acinq.eclair.channel.Monitoring.{Metrics, Tags}
import fr.acinq.eclair.channel.publish.TxPublisher
Expand Down Expand Up @@ -1662,44 +1663,38 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder, remo
goto(WAIT_FOR_FUNDING_LOCKED) sending fundingLocked

case Event(channelReestablish: ChannelReestablish, d: DATA_NORMAL) =>
var sendQueue = Queue.empty[LightningMessage]
val channelKeyPath = keyManager.keyPath(d.commitments.localParams, d.commitments.channelConfig)
channelReestablish match {
case ChannelReestablish(_, _, nextRemoteRevocationNumber, yourLastPerCommitmentSecret, _, _) if !Helpers.checkLocalCommit(d, nextRemoteRevocationNumber) =>
// if next_remote_revocation_number is greater than our local commitment index, it means that either we are using an outdated commitment, or they are lying
// but first we need to make sure that the last per_commitment_secret that they claim to have received from us is correct for that next_remote_revocation_number minus 1
if (keyManager.commitmentSecret(channelKeyPath, nextRemoteRevocationNumber - 1) == yourLastPerCommitmentSecret) {
log.warning(s"counterparty proved that we have an outdated (revoked) local commitment!!! ourCommitmentNumber=${d.commitments.localCommit.index} theirCommitmentNumber=$nextRemoteRevocationNumber")
// their data checks out, we indeed seem to be using an old revoked commitment, and must absolutely *NOT* publish it, because that would be a cheating attempt and they
// would punish us by taking all the funds in the channel
val exc = PleasePublishYourCommitment(d.channelId)
val error = Error(d.channelId, exc.getMessage)
goto(WAIT_FOR_REMOTE_PUBLISH_FUTURE_COMMITMENT) using DATA_WAIT_FOR_REMOTE_PUBLISH_FUTURE_COMMITMENT(d.commitments, channelReestablish) storing() sending error
} else {
// they lied! the last per_commitment_secret they claimed to have received from us is invalid
throw InvalidRevokedCommitProof(d.channelId, d.commitments.localCommit.index, nextRemoteRevocationNumber, yourLastPerCommitmentSecret)
}
case ChannelReestablish(_, nextLocalCommitmentNumber, _, _, _, _) if !Helpers.checkRemoteCommit(d, nextLocalCommitmentNumber) =>
// if next_local_commit_number is more than one above our remote commitment index, it means that either we are using an outdated commitment, or they are lying
log.warning(s"counterparty says that they have a more recent commitment than the one we know of!!! ourCommitmentNumber=${d.commitments.remoteNextCommitInfo.left.toOption.map(_.nextRemoteCommit.index).getOrElse(d.commitments.remoteCommit.index)} theirCommitmentNumber=$nextLocalCommitmentNumber")
// there is no way to make sure that they are telling the truth, the best thing to do is ask them to publish their commitment right now
// maybe they will publish their commitment, in that case we need to remember their commitment point in order to be able to claim our outputs
// note that if they don't comply, we could publish our own commitment (it is not stale, otherwise we would be in the case above)
val exc = PleasePublishYourCommitment(d.channelId)
val error = Error(d.channelId, exc.getMessage)
goto(WAIT_FOR_REMOTE_PUBLISH_FUTURE_COMMITMENT) using DATA_WAIT_FOR_REMOTE_PUBLISH_FUTURE_COMMITMENT(d.commitments, channelReestablish) storing() sending error
case _ =>
Syncing.checkSync(keyManager, d, channelReestablish) match {
case syncFailure: SyncResult.Failure =>
handleSyncFailure(channelReestablish, syncFailure, d)
case syncSuccess: SyncResult.Success =>
var sendQueue = Queue.empty[LightningMessage]
// normal case, our data is up-to-date
if (channelReestablish.nextLocalCommitmentNumber == 1 && d.commitments.localCommit.index == 0) {
// If next_local_commitment_number is 1 in both the channel_reestablish it sent and received, then the node MUST retransmit funding_locked, otherwise it MUST NOT
log.debug("re-sending fundingLocked")
val channelKeyPath = keyManager.keyPath(d.commitments.localParams, d.commitments.channelConfig)
val nextPerCommitmentPoint = keyManager.commitmentPoint(channelKeyPath, 1)
val fundingLocked = FundingLocked(d.commitments.channelId, nextPerCommitmentPoint)
sendQueue = sendQueue :+ fundingLocked
}

val (commitments1, sendQueue1) = handleSync(channelReestablish, d)
sendQueue = sendQueue ++ sendQueue1
// we may need to retransmit updates and/or commit_sig and/or revocation
sendQueue = sendQueue ++ syncSuccess.retransmit

// then we clean up unsigned updates
val commitments1 = Commitments.discardUnsignedUpdates(d.commitments)

commitments1.remoteNextCommitInfo match {
case Left(_) =>
// we expect them to (re-)send the revocation immediately
startSingleTimer(RevocationTimeout.toString, RevocationTimeout(commitments1.remoteCommit.index, peer), nodeParams.revocationTimeout)
case _ => ()
}

// do I have something to sign?
if (Commitments.localHasChanges(commitments1)) {
self ! CMD_SIGN()
}

// BOLT 2: A node if it has sent a previous shutdown MUST retransmit shutdown.
d.localShutdown.foreach {
Expand Down Expand Up @@ -1756,11 +1751,15 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder, remo
case Event(c: CMD_ADD_HTLC, d: DATA_NORMAL) => handleAddDisconnected(c, d)

case Event(channelReestablish: ChannelReestablish, d: DATA_SHUTDOWN) =>
var sendQueue = Queue.empty[LightningMessage]
val (commitments1, sendQueue1) = handleSync(channelReestablish, d)
sendQueue = sendQueue ++ sendQueue1 :+ d.localShutdown
// BOLT 2: A node if it has sent a previous shutdown MUST retransmit shutdown.
goto(SHUTDOWN) using d.copy(commitments = commitments1) sending sendQueue
Syncing.checkSync(keyManager, d, channelReestablish) match {
case syncFailure: SyncResult.Failure =>
handleSyncFailure(channelReestablish, syncFailure, d)
case syncSuccess: SyncResult.Success =>
val commitments1 = Commitments.discardUnsignedUpdates(d.commitments)
val sendQueue = Queue.empty[LightningMessage] ++ syncSuccess.retransmit :+ d.localShutdown
// BOLT 2: A node if it has sent a previous shutdown MUST retransmit shutdown.
goto(SHUTDOWN) using d.copy(commitments = commitments1) sending sendQueue
}

case Event(_: ChannelReestablish, d: DATA_NEGOTIATING) =>
// BOLT 2: A node if it has sent a previous shutdown MUST retransmit shutdown.
Expand Down Expand Up @@ -2229,6 +2228,29 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder, remo
stay() using d.copy(channelUpdate = channelUpdate1) storing()
}

/**
 * Reacts to a failed synchronization check performed on a received channel_reestablish.
 *
 * Every failure is logged at error level, then:
 *  - `LocalLateProven` and `LocalLateUnproven` are delegated to `handleOutdatedCommitment`;
 *  - `RemoteLying` is reported through `handleLocalError` with an `InvalidRevokedCommitProof`;
 *  - `RemoteLate` leaves the channel in its current state.
 *
 * @param channelReestablish the message received from the counterparty
 * @param syncFailure        the kind of mismatch that was detected
 * @param d                  current channel data
 * @return the state transition selected for this failure
 */
private def handleSyncFailure(channelReestablish: ChannelReestablish, syncFailure: SyncResult.Failure, d: HasCommitments) = syncFailure match {
  case proven: SyncResult.LocalLateProven =>
    log.error(s"counterparty proved that we have an outdated (revoked) local commitment!!! ourLocalCommitmentNumber=${proven.ourLocalCommitmentNumber} theirRemoteCommitmentNumber=${proven.theirRemoteCommitmentNumber}")
    // Their proof is valid: our commitment really is an old, revoked one. Publishing it would be
    // treated as a cheating attempt and they would punish us by taking all the funds in the
    // channel, so we must absolutely *NOT* publish it.
    handleOutdatedCommitment(channelReestablish, d)

  case unproven: SyncResult.LocalLateUnproven =>
    log.error(s"our local commitment is in sync, but counterparty says that they have a more recent remote commitment than the one we know of (they could be lying)!!! ourRemoteCommitmentNumber=${unproven.ourRemoteCommitmentNumber} theirCommitmentNumber=${unproven.theirLocalCommitmentNumber}")
    // We cannot verify that they are telling the truth, so we "call their bluff" and ask them to
    // publish their commitment right away. If they were honest and do publish it, we need to
    // remember their commitment point in order to be able to claim our outputs.
    handleOutdatedCommitment(channelReestablish, d)

  case lie: SyncResult.RemoteLying =>
    log.error(s"counterparty is lying about us having an outdated commitment!!! ourLocalCommitmentNumber=${lie.ourLocalCommitmentNumber} theirRemoteCommitmentNumber=${lie.theirRemoteCommitmentNumber}")
    // They are deliberately trying to make us believe that our commitment is late.
    val invalidProof = InvalidRevokedCommitProof(d.channelId, lie.ourLocalCommitmentNumber, lie.theirRemoteCommitmentNumber, lie.invalidPerCommitmentSecret)
    handleLocalError(invalidProof, d, Some(channelReestablish))

  case SyncResult.RemoteLate =>
    log.error("counterparty appears to be using an outdated commitment, they may request a force-close, standing by...")
    stay()
}

private def maybeEmitChannelUpdateChangedEvent(newUpdate: ChannelUpdate, oldUpdate_opt: Option[ChannelUpdate], d: DATA_NORMAL): Unit = {
if (oldUpdate_opt.isEmpty || !Announcements.areSameIgnoreFlags(newUpdate, oldUpdate_opt.get)) {
context.system.eventStream.publish(ChannelUpdateParametersChanged(self, d.channelId, newUpdate.shortChannelId, d.commitments.remoteNodeId, newUpdate))
Expand Down Expand Up @@ -2267,13 +2289,16 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder, remo
private def handleLocalError(cause: Throwable, d: ChannelData, msg: Option[Any]) = {
cause match {
case _: ForcedLocalCommit => log.warning(s"force-closing channel at user request")
case _ if stateName == WAIT_FOR_OPEN_CHANNEL => log.warning(s"${cause.getMessage} while processing msg=${msg.getOrElse("n/a").getClass.getSimpleName} in state=$stateName")
case _ => log.error(s"${cause.getMessage} while processing msg=${msg.getOrElse("n/a").getClass.getSimpleName} in state=$stateName")
}
cause match {
case _: ChannelException => ()
case _ => log.error(cause, s"msg=${msg.getOrElse("n/a")} stateData=$stateData")
case _ if msg.exists(_.isInstanceOf[OpenChannel]) || msg.exists(_.isInstanceOf[AcceptChannel]) =>
// invalid remote channel parameters are logged as warning
log.warning(s"${cause.getMessage} while processing msg=${msg.getOrElse("n/a").getClass.getSimpleName} in state=$stateName")
case _: ChannelException =>
log.error(s"${cause.getMessage} while processing msg=${msg.getOrElse("n/a").getClass.getSimpleName} in state=$stateName")
case _ =>
// unhandled error: we dump the channel data, and print the stack trace
log.error(cause, s"msg=${msg.getOrElse("n/a")} stateData=$stateData:")
}

val error = Error(d.channelId, cause.getMessage)
context.system.eventStream.publish(ChannelErrorOccurred(self, stateData.channelId, remoteNodeId, stateData, LocalError(cause), isFatal = true))

Expand Down Expand Up @@ -2535,79 +2560,10 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder, remo
goto(ERR_INFORMATION_LEAK) calling doPublish(localCommitPublished, d.commitments) sending error
}

private def handleSync(channelReestablish: ChannelReestablish, d: HasCommitments): (Commitments, Queue[LightningMessage]) = {
var sendQueue = Queue.empty[LightningMessage]
// first we clean up unacknowledged updates
log.debug("discarding proposed OUT: {}", d.commitments.localChanges.proposed.map(Commitments.msg2String(_)).mkString(","))
log.debug("discarding proposed IN: {}", d.commitments.remoteChanges.proposed.map(Commitments.msg2String(_)).mkString(","))
val commitments1 = d.commitments.copy(
localChanges = d.commitments.localChanges.copy(proposed = Nil),
remoteChanges = d.commitments.remoteChanges.copy(proposed = Nil),
localNextHtlcId = d.commitments.localNextHtlcId - d.commitments.localChanges.proposed.collect { case u: UpdateAddHtlc => u }.size,
remoteNextHtlcId = d.commitments.remoteNextHtlcId - d.commitments.remoteChanges.proposed.collect { case u: UpdateAddHtlc => u }.size)
log.debug(s"localNextHtlcId=${d.commitments.localNextHtlcId}->${commitments1.localNextHtlcId}")
log.debug(s"remoteNextHtlcId=${d.commitments.remoteNextHtlcId}->${commitments1.remoteNextHtlcId}")

def resendRevocation(): Unit = {
// let's see the state of remote sigs
if (commitments1.localCommit.index == channelReestablish.nextRemoteRevocationNumber) {
// nothing to do
} else if (commitments1.localCommit.index == channelReestablish.nextRemoteRevocationNumber + 1) {
// our last revocation got lost, let's resend it
log.debug("re-sending last revocation")
val channelKeyPath = keyManager.keyPath(d.commitments.localParams, d.commitments.channelConfig)
val localPerCommitmentSecret = keyManager.commitmentSecret(channelKeyPath, d.commitments.localCommit.index - 1)
val localNextPerCommitmentPoint = keyManager.commitmentPoint(channelKeyPath, d.commitments.localCommit.index + 1)
val revocation = RevokeAndAck(
channelId = commitments1.channelId,
perCommitmentSecret = localPerCommitmentSecret,
nextPerCommitmentPoint = localNextPerCommitmentPoint
)
sendQueue = sendQueue :+ revocation
} else throw RevocationSyncError(d.channelId)
}

// re-sending sig/rev (in the right order)
commitments1.remoteNextCommitInfo match {
case Left(waitingForRevocation) if waitingForRevocation.nextRemoteCommit.index + 1 == channelReestablish.nextLocalCommitmentNumber =>
// we had sent a new sig and were waiting for their revocation
// they had received the new sig but their revocation was lost during the disconnection
// they will send us the revocation, nothing to do here
log.debug("waiting for them to re-send their last revocation")
resendRevocation()
case Left(waitingForRevocation) if waitingForRevocation.nextRemoteCommit.index == channelReestablish.nextLocalCommitmentNumber =>
// we had sent a new sig and were waiting for their revocation
// they didn't receive the new sig because of the disconnection
// we just resend the same updates and the same sig

val revWasSentLast = commitments1.localCommit.index > waitingForRevocation.sentAfterLocalCommitIndex
if (!revWasSentLast) resendRevocation()

log.debug("re-sending previously local signed changes: {}", commitments1.localChanges.signed.map(Commitments.msg2String(_)).mkString(","))
commitments1.localChanges.signed.foreach(revocation => sendQueue = sendQueue :+ revocation)
log.debug("re-sending the exact same previous sig")
sendQueue = sendQueue :+ waitingForRevocation.sent

if (revWasSentLast) resendRevocation()
case Right(_) if commitments1.remoteCommit.index + 1 == channelReestablish.nextLocalCommitmentNumber =>
// there wasn't any sig in-flight when the disconnection occurred
resendRevocation()
case _ => throw CommitmentSyncError(d.channelId)
}

commitments1.remoteNextCommitInfo match {
case Left(_) =>
// we expect them to (re-)send the revocation immediately
startSingleTimer(RevocationTimeout.toString, RevocationTimeout(commitments1.remoteCommit.index, peer), nodeParams.revocationTimeout)
case _ => ()
}

// have I something to sign?
if (Commitments.localHasChanges(commitments1)) {
self ! CMD_SIGN()
}

(commitments1, sendQueue)
/**
 * Asks the counterparty to publish their commitment.
 *
 * Builds an `Error` carrying the `PleasePublishYourCommitment` message, moves to
 * WAIT_FOR_REMOTE_PUBLISH_FUTURE_COMMITMENT while keeping the current commitments together with
 * the received channel_reestablish, stores the new data and sends the error.
 *
 * @param channelReestablish the message received from the counterparty, kept in the next state data
 * @param d                  current channel data
 * @return the transition to WAIT_FOR_REMOTE_PUBLISH_FUTURE_COMMITMENT
 */
private def handleOutdatedCommitment(channelReestablish: ChannelReestablish, d: HasCommitments) = {
  val error = Error(d.channelId, PleasePublishYourCommitment(d.channelId).getMessage)
  val nextData = DATA_WAIT_FOR_REMOTE_PUBLISH_FUTURE_COMMITMENT(d.commitments, channelReestablish)
  goto(WAIT_FOR_REMOTE_PUBLISH_FUTURE_COMMITMENT) using nextData storing() sending error
}

/**
Expand Down Expand Up @@ -2696,7 +2652,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder, remo
}

// we let the peer decide what to do
override val supervisorStrategy = OneForOneStrategy(loggingEnabled = true) { case _ => SupervisorStrategy.Escalate }
override val supervisorStrategy: OneForOneStrategy = OneForOneStrategy(loggingEnabled = true) { case _ => SupervisorStrategy.Escalate }

override def aroundReceive(receive: Actor.Receive, msg: Any): Unit = {
KamonExt.time(ProcessMessage.withTag("MessageType", msg.getClass.getSimpleName)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,7 @@ case class CannotSignWithoutChanges (override val channelId: Byte
case class CannotSignBeforeRevocation (override val channelId: ByteVector32) extends ChannelException(channelId, "cannot sign until next revocation hash is received")
case class UnexpectedRevocation (override val channelId: ByteVector32) extends ChannelException(channelId, "received unexpected RevokeAndAck message")
case class InvalidRevocation (override val channelId: ByteVector32) extends ChannelException(channelId, "invalid revocation")
case class InvalidRevokedCommitProof (override val channelId: ByteVector32, ourCommitmentNumber: Long, theirCommitmentNumber: Long, perCommitmentSecret: PrivateKey) extends ChannelException(channelId, s"counterparty claimed that we have a revoked commit but their proof doesn't check out: ourCommitmentNumber=$ourCommitmentNumber theirCommitmentNumber=$theirCommitmentNumber perCommitmentSecret=$perCommitmentSecret")
case class CommitmentSyncError (override val channelId: ByteVector32) extends ChannelException(channelId, "commitment sync error")
case class RevocationSyncError (override val channelId: ByteVector32) extends ChannelException(channelId, "revocation sync error")
case class InvalidRevokedCommitProof (override val channelId: ByteVector32, ourLocalCommitmentNumber: Long, theirRemoteCommitmentNumber: Long, invalidPerCommitmentSecret: PrivateKey) extends ChannelException(channelId, s"counterparty claimed that we have a revoked commit but their proof doesn't check out: ourCommitmentNumber=$ourLocalCommitmentNumber theirCommitmentNumber=$theirRemoteCommitmentNumber perCommitmentSecret=$invalidPerCommitmentSecret")
case class InvalidFailureCode (override val channelId: ByteVector32) extends ChannelException(channelId, "UpdateFailMalformedHtlc message doesn't have BADONION bit set")
case class PleasePublishYourCommitment (override val channelId: ByteVector32) extends ChannelException(channelId, "please publish your local commitment")
case class CommandUnavailableInThisState (override val channelId: ByteVector32, command: String, state: ChannelState) extends ChannelException(channelId, s"cannot execute command=$command in state=$state")
Expand Down
Loading