-
Notifications
You must be signed in to change notification settings - Fork 267
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Extended API for fine-grained payment tracking #867
Changes from 8 commits
a29b5f3
eb401bd
59da1ac
fe33608
2598e9a
c87239e
fc5006c
8d696f1
25a8866
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -67,6 +67,7 @@ case class NodeParams(keyManager: KeyManager, | |
pendingRelayDb: PendingRelayDb, | ||
paymentsDb: PaymentsDb, | ||
auditDb: AuditDb, | ||
onChainRefundsDb: OnChainRefundsDb, | ||
revocationTimeout: FiniteDuration, | ||
pingInterval: FiniteDuration, | ||
pingTimeout: FiniteDuration, | ||
|
@@ -150,6 +151,7 @@ object NodeParams { | |
val networkDb = new SqliteNetworkDb(sqliteNetwork) | ||
|
||
val sqliteAudit = DriverManager.getConnection(s"jdbc:sqlite:${new File(chaindir, "audit.sqlite")}") | ||
val onChainRefundsDb = new SqliteOnChainRefundsDb(sqliteAudit) | ||
val auditDb = new SqliteAuditDb(sqliteAudit) | ||
|
||
val color = BinaryData(config.getString("node-color")) | ||
|
@@ -179,7 +181,7 @@ object NodeParams { | |
val p = PublicKey(e.getString("nodeid")) | ||
val gf = BinaryData(e.getString("global-features")) | ||
val lf = BinaryData(e.getString("local-features")) | ||
(p -> (gf, lf)) | ||
(p, (gf, lf)) | ||
}.toMap | ||
|
||
val socksProxy_opt = if (config.getBoolean("socks5.enabled")) { | ||
|
@@ -202,7 +204,7 @@ object NodeParams { | |
NodeParams( | ||
keyManager = keyManager, | ||
alias = nodeAlias, | ||
color = Color(color.data(0), color.data(1), color.data(2)), | ||
color = Color(color.data.head, color.data(1), color.data(2)), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same comment as above. For this particular change it hurts readibility more than anything, as previous code was more consistent. |
||
publicAddresses = addresses, | ||
globalFeatures = BinaryData(config.getString("global-features")), | ||
localFeatures = BinaryData(config.getString("local-features")), | ||
|
@@ -226,6 +228,7 @@ object NodeParams { | |
pendingRelayDb = pendingRelayDb, | ||
paymentsDb = paymentsDb, | ||
auditDb = auditDb, | ||
onChainRefundsDb = onChainRefundsDb, | ||
revocationTimeout = FiniteDuration(config.getDuration("revocation-timeout").getSeconds, TimeUnit.SECONDS), | ||
pingInterval = FiniteDuration(config.getDuration("ping-interval").getSeconds, TimeUnit.SECONDS), | ||
pingTimeout = FiniteDuration(config.getDuration("ping-timeout").getSeconds, TimeUnit.SECONDS), | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -45,6 +45,7 @@ import fr.acinq.eclair.wire.{ChannelAnnouncement, ChannelUpdate, NodeAddress, No | |
import fr.acinq.eclair.{Kit, ShortChannelId, feerateByte2Kw} | ||
import grizzled.slf4j.Logging | ||
import org.json4s.JsonAST.{JBool, JInt, JString} | ||
import org.json4s.jackson.Serialization | ||
import org.json4s.{JValue, jackson} | ||
|
||
import scala.concurrent.duration._ | ||
|
@@ -231,6 +232,14 @@ trait Service extends Logging { | |
completeRpcFuture(req.id, (paymentHandler ? ReceivePayment(Some(MilliSatoshi(amountMsat.toLong)), description)).mapTo[PaymentRequest].map(PaymentRequest.write)) | ||
case JInt(amountMsat) :: JString(description) :: JInt(expirySeconds) :: Nil => | ||
completeRpcFuture(req.id, (paymentHandler ? ReceivePayment(Some(MilliSatoshi(amountMsat.toLong)), description, Some(expirySeconds.toLong))).mapTo[PaymentRequest].map(PaymentRequest.write)) | ||
case JString(description) :: JInt(expirySeconds) :: JString(fallbackAddress) :: Nil => | ||
val isFallbackAddressCorrect = Try(fr.acinq.eclair.addressToPublicKeyScript(fallbackAddress, nodeParams.chainHash)).isSuccess | ||
if (!isFallbackAddressCorrect) reject(RpcValidationRejection(req.id, s"invalid fallback address '$fallbackAddress'")) | ||
else completeRpcFuture(req.id, (paymentHandler ? ReceivePayment(None, description, Some(expirySeconds.toLong), Nil, Some(fallbackAddress))).mapTo[PaymentRequest].map(PaymentRequest.write)) | ||
case JInt(amountMsat) :: JString(description) :: JInt(expirySeconds) :: JString(fallbackAddress) :: Nil => | ||
val isFallbackAddressCorrect = Try(fr.acinq.eclair.addressToPublicKeyScript(fallbackAddress, nodeParams.chainHash)).isSuccess | ||
if (!isFallbackAddressCorrect) reject(RpcValidationRejection(req.id, s"invalid fallback address '$fallbackAddress'")) | ||
else completeRpcFuture(req.id, (paymentHandler ? ReceivePayment(Some(MilliSatoshi(amountMsat.toLong)), description, Some(expirySeconds.toLong), Nil, Some(fallbackAddress))).mapTo[PaymentRequest].map(PaymentRequest.write)) | ||
case _ => reject(UnknownParamsRejection(req.id, "[description] or [amount, description] or [amount, description, expiryDuration]")) | ||
} | ||
|
||
|
@@ -300,17 +309,47 @@ trait Service extends Logging { | |
|
||
// check received payments | ||
case "checkpayment" => req.params match { | ||
case JString(identifier) :: Nil => completeRpcFuture(req.id, for { | ||
paymentHash <- Try(PaymentRequest.read(identifier)) match { | ||
case Success(pr) => Future.successful(pr.paymentHash) | ||
case _ => Try(BinaryData(identifier)) match { | ||
case Success(s) => Future.successful(s) | ||
case _ => Future.failed(new IllegalArgumentException("payment identifier must be a payment request or a payment hash")) | ||
case JString(identifier) :: Nil => extractPaymentHash(identifier) match { | ||
case Success(hash) => completeRpcFuture(req.id, (paymentHandler ? CheckPayment(hash)).map(found => new JBool(found.asInstanceOf[Boolean]))) | ||
case _ => completeRpcFuture(req.id, Future.failed(new IllegalArgumentException("payment identifier must be a payment request or a payment hash"))) | ||
} | ||
case _ => reject(UnknownParamsRejection(req.id, "[paymentHash] or [paymentRequest]")) | ||
} | ||
|
||
case "receivedinfo" => | ||
req.params match { | ||
case JString(identifier) :: Nil => | ||
extractPaymentHash(identifier) match { | ||
case Success(hash) => | ||
// We may receive a payment and release a preimage but then a channel breaking may follow. | ||
// Same payment may be reported as lost on chain in local commit and settling on chain in remote commit. | ||
// This means we need to ask for on-chain settling payment first, then check if it has been lost on-chain, and only then check whether it has been received off-chain. | ||
kit.nodeParams.onChainRefundsDb.getSettlingOnChain(hash) orElse kit.nodeParams.onChainRefundsDb.getLostOnChain(hash) orElse kit.nodeParams.auditDb.receivedPaymentInfo(hash) match { | ||
case Some(paymentReceived) => completeRpcFuture(req.id, Future.successful(paymentReceived)) | ||
case None => completeRpcFuture(req.id, Future.failed(new IllegalArgumentException("no such payment received yet"))) | ||
} | ||
case _ => | ||
completeRpcFuture(req.id, Future.failed(new IllegalArgumentException("payment identifier must be a payment request or a payment hash"))) | ||
} | ||
case _ => | ||
reject(UnknownParamsRejection(req.id, "[paymentHash] or [paymentRequest]")) | ||
} | ||
|
||
case "sentinfo" => req.params match { | ||
case JString(identifier) :: Nil => | ||
extractPaymentHash(identifier) match { | ||
case Success(hash) => | ||
// We may send a payment, a channel breaking may follow with `PaymentSettlingOnChain` or `PaymentLostOnChain` following, but then remote peer may fetch a payment on-chain by revealing a preimage | ||
// This means we need to ask if a payment has been received off-chain first, then check whether it is settling on-chain, and only then check if it has been lost on-chain. | ||
kit.nodeParams.auditDb.sentPaymentInfo(hash) orElse kit.nodeParams.onChainRefundsDb.getSettlingOnChain(hash) orElse kit.nodeParams.onChainRefundsDb.getLostOnChain(hash) orElse kit.nodeParams.auditDb.failedPaymentInfo(hash) match { | ||
case Some(paymentSent) => completeRpcFuture(req.id, Future.successful(paymentSent)) | ||
case None => completeRpcFuture(req.id, Future.failed(new IllegalArgumentException("no such payment sent yet"))) | ||
} | ||
case _ => | ||
completeRpcFuture(req.id, Future.failed(new IllegalArgumentException("payment identifier must be a payment request or a payment hash"))) | ||
} | ||
found <- (paymentHandler ? CheckPayment(paymentHash)).map(found => new JBool(found.asInstanceOf[Boolean])) | ||
} yield found) | ||
case _ => reject(UnknownParamsRejection(req.id, "[paymentHash] or [paymentRequest]")) | ||
case _ => | ||
reject(UnknownParamsRejection(req.id, "[paymentHash] or [paymentRequest]")) | ||
} | ||
|
||
// retrieve audit events | ||
|
@@ -352,15 +391,27 @@ trait Service extends Logging { | |
|
||
def getInfoResponse: Future[GetInfoResponse] | ||
|
||
private def extractPaymentHash(identifier: String) = Try(PaymentRequest.read(identifier).paymentHash) orElse Try(BinaryData(identifier)) | ||
|
||
def makeSocketHandler(system: ActorSystem)(implicit materializer: ActorMaterializer): Flow[Message, TextMessage.Strict, NotUsed] = { | ||
|
||
// create a flow transforming a queue of string -> string | ||
val (flowInput, flowOutput) = Source.queue[String](10, OverflowStrategy.dropTail).toMat(BroadcastHub.sink[String])(Keep.both).run() | ||
|
||
// register an actor that feeds the queue when a payment is received | ||
// register an actor that feeds the queue on payment related events | ||
system.actorOf(Props(new Actor { | ||
override def preStart: Unit = context.system.eventStream.subscribe(self, classOf[PaymentReceived]) | ||
def receive: Receive = { case received: PaymentReceived => flowInput.offer(received.paymentHash.toString) } | ||
|
||
override def preStart: Unit = { | ||
context.system.eventStream.subscribe(self, classOf[PaymentFailed]) | ||
context.system.eventStream.subscribe(self, classOf[PaymentEvent]) | ||
} | ||
|
||
def receive: Receive = { | ||
case message: PaymentFailed => flowInput.offer(Serialization write message) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The json serialization should probably be part of the flow instead of duplicating |
||
case message: PaymentEvent => flowInput.offer(Serialization write message) | ||
case other => logger.info(s"Unexpected ws message: $other") | ||
} | ||
|
||
})) | ||
|
||
Flow[Message] | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you following intellij recommendations or some automated code modification suggestions? This makes review more difficult are there are lot of unrelated changes.
Can you revert and submit a separate PR for those?
Also by convention I Iike using
->
for defining tuples in the context of aMap
.