Skip to content

Commit

Permalink
Various fix and improvements in time/timestamp handling (#971)
Browse files Browse the repository at this point in the history
This PR standardizes the way we compute the current time as unix timestamp 

- Scala's Platform is used and the conversion is done via scala's concurrent.duration facilities
- Java's Instant has been replaced due to broken compatibility with android
- AuditDB events use milliseconds (fixes #970)
- PaymentDB events use milliseconds
- Query filters for AuditDB and PaymentDB use seconds
  • Loading branch information
araspitzu authored and pm47 committed Apr 26, 2019
1 parent 9c41ee5 commit 83dc817
Show file tree
Hide file tree
Showing 13 changed files with 63 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
// this is a bit tricky: let's say we shut down eclair right after someone opened a channel to us, and didn't start it up before a very long time
// we don't want the timeout to expire right away, because the watcher could be syncing or be busy, and may only notice the funding tx after some time
// so we always give us 10 minutes before doing anything
val delay = Funding.computeFundingTimeout(Platform.currentTime / 1000, funding.waitingSince, delay = FUNDING_TIMEOUT_FUNDEE, minDelay = 10 minutes)
val delay = Funding.computeFundingTimeout(Platform.currentTime.milliseconds.toSeconds, funding.waitingSince, delay = FUNDING_TIMEOUT_FUNDEE, minDelay = 10 minutes)
context.system.scheduler.scheduleOnce(delay, self, BITCOIN_FUNDING_TIMEOUT)
}
goto(OFFLINE) using data
Expand Down Expand Up @@ -416,7 +416,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
log.info(s"waiting for them to publish the funding tx for channelId=$channelId fundingTxid=${commitInput.outPoint.txid}")
blockchain ! WatchSpent(self, commitInput.outPoint.txid, commitInput.outPoint.index.toInt, commitments.commitInput.txOut.publicKeyScript, BITCOIN_FUNDING_SPENT) // TODO: should we wait for an acknowledgment from the watcher?
blockchain ! WatchConfirmed(self, commitInput.outPoint.txid, commitments.commitInput.txOut.publicKeyScript, nodeParams.minDepthBlocks, BITCOIN_FUNDING_DEPTHOK)
val now = Platform.currentTime / 1000
val now = Platform.currentTime.milliseconds.toSeconds
context.system.scheduler.scheduleOnce(FUNDING_TIMEOUT_FUNDEE, self, BITCOIN_FUNDING_TIMEOUT)
goto(WAIT_FOR_FUNDING_CONFIRMED) using store(DATA_WAIT_FOR_FUNDING_CONFIRMED(commitments, None, now, None, Right(fundingSigned))) sending fundingSigned
}
Expand Down Expand Up @@ -448,7 +448,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
originChannels = Map.empty,
remoteNextCommitInfo = Right(randomKey.publicKey), // we will receive their next per-commitment point in the next message, so we temporarily put a random byte array
commitInput, ShaChain.init, channelId = channelId)
val now = Platform.currentTime / 1000
val now = Platform.currentTime.milliseconds.toSeconds
context.system.eventStream.publish(ChannelSignatureReceived(self, commitments))
log.info(s"publishing funding tx for channelId=$channelId fundingTxid=${commitInput.outPoint.txid}")
// we do this to make sure that the channel state has been written to disk when we publish the funding tx
Expand Down Expand Up @@ -1467,7 +1467,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
}
}
// re-enable the channel
val timestamp = Platform.currentTime / 1000 match {
val timestamp = Platform.currentTime.milliseconds.toSeconds match {
case ts if ts == d.channelUpdate.timestamp => ts + 1 // corner case: in case of quick reconnection, we bump the timestamp of the new channel_update, otherwise it will get ignored by the network
case ts => ts
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,16 @@ package fr.acinq.eclair.db.sqlite

import java.sql.{Connection, Statement}
import java.util.UUID

import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.bitcoin.MilliSatoshi
import fr.acinq.eclair.channel.{AvailableBalanceChanged, Channel, ChannelErrorOccured, NetworkFeePaid}
import fr.acinq.eclair.db.{AuditDb, ChannelLifecycleEvent, NetworkFee, Stats}
import fr.acinq.eclair.payment.{PaymentReceived, PaymentRelayed, PaymentSent}
import fr.acinq.eclair.wire.ChannelCodecs
import grizzled.slf4j.Logging

import scala.collection.immutable.Queue
import scala.compat.Platform
import concurrent.duration._

class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging {

Expand Down Expand Up @@ -164,8 +163,8 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging {

override def listSent(from: Long, to: Long): Seq[PaymentSent] =
using(sqlite.prepareStatement("SELECT * FROM sent WHERE timestamp >= ? AND timestamp < ?")) { statement =>
statement.setLong(1, from)
statement.setLong(2, to)
statement.setLong(1, from.seconds.toMillis)
statement.setLong(2, to.seconds.toMillis)
val rs = statement.executeQuery()
var q: Queue[PaymentSent] = Queue()
while (rs.next()) {
Expand All @@ -183,8 +182,8 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging {

override def listReceived(from: Long, to: Long): Seq[PaymentReceived] =
using(sqlite.prepareStatement("SELECT * FROM received WHERE timestamp >= ? AND timestamp < ?")) { statement =>
statement.setLong(1, from)
statement.setLong(2, to)
statement.setLong(1, from.seconds.toMillis)
statement.setLong(2, to.seconds.toMillis)
val rs = statement.executeQuery()
var q: Queue[PaymentReceived] = Queue()
while (rs.next()) {
Expand All @@ -199,8 +198,8 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging {

override def listRelayed(from: Long, to: Long): Seq[PaymentRelayed] =
using(sqlite.prepareStatement("SELECT * FROM relayed WHERE timestamp >= ? AND timestamp < ?")) { statement =>
statement.setLong(1, from)
statement.setLong(2, to)
statement.setLong(1, from.seconds.toMillis)
statement.setLong(2, to.seconds.toMillis)
val rs = statement.executeQuery()
var q: Queue[PaymentRelayed] = Queue()
while (rs.next()) {
Expand All @@ -217,8 +216,8 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging {

override def listNetworkFees(from: Long, to: Long): Seq[NetworkFee] =
using(sqlite.prepareStatement("SELECT * FROM network_fees WHERE timestamp >= ? AND timestamp < ?")) { statement =>
statement.setLong(1, from)
statement.setLong(2, to)
statement.setLong(1, from.seconds.toMillis)
statement.setLong(2, to.seconds.toMillis)
val rs = statement.executeQuery()
var q: Queue[NetworkFee] = Queue()
while (rs.next()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package fr.acinq.eclair.db.sqlite

import java.sql.Connection
import java.time.Instant
import java.util.UUID
import fr.acinq.bitcoin.ByteVector32
import fr.acinq.eclair.db.sqlite.SqliteUtils._
Expand All @@ -26,6 +25,8 @@ import fr.acinq.eclair.payment.PaymentRequest
import grizzled.slf4j.Logging
import scala.collection.immutable.Queue
import OutgoingPaymentStatus._
import concurrent.duration._
import scala.compat.Platform

class SqlitePaymentsDb(sqlite: Connection) extends PaymentsDb with Logging {

Expand Down Expand Up @@ -58,7 +59,7 @@ class SqlitePaymentsDb(sqlite: Connection) extends PaymentsDb with Logging {
require((newStatus == SUCCEEDED && preimage.isDefined) || (newStatus == FAILED && preimage.isEmpty), "Wrong combination of state/preimage")

using(sqlite.prepareStatement("UPDATE sent_payments SET (completed_at, preimage, status) = (?, ?, ?) WHERE id = ? AND completed_at IS NULL")) { statement =>
statement.setLong(1, Instant.now().getEpochSecond)
statement.setLong(1, Platform.currentTime)
statement.setBytes(2, if (preimage.isEmpty) null else preimage.get.toArray)
statement.setString(3, newStatus.toString)
statement.setString(4, id.toString)
Expand Down Expand Up @@ -135,8 +136,8 @@ class SqlitePaymentsDb(sqlite: Connection) extends PaymentsDb with Logging {
statement.setBytes(1, pr.paymentHash.toArray)
statement.setBytes(2, preimage.toArray)
statement.setString(3, PaymentRequest.write(pr))
statement.setLong(4, pr.timestamp)
pr.expiry.foreach { ex => statement.setLong(5, pr.timestamp + ex) } // we store "when" the invoice will expire
statement.setLong(4, pr.timestamp.seconds.toMillis) // BOLT11 timestamp is in seconds
pr.expiry.foreach { ex => statement.setLong(5, pr.timestamp.seconds.toMillis + ex.seconds.toMillis) } // we store "when" the invoice will expire, in milliseconds
statement.executeUpdate()
}
}
Expand Down Expand Up @@ -178,9 +179,9 @@ class SqlitePaymentsDb(sqlite: Connection) extends PaymentsDb with Logging {
}

using(sqlite.prepareStatement(queryStmt)) { statement =>
statement.setLong(1, from)
statement.setLong(2, to)
if (pendingOnly) statement.setLong(3, Instant.now().getEpochSecond)
statement.setLong(1, from.seconds.toMillis)
statement.setLong(2, to.seconds.toMillis)
if (pendingOnly) statement.setLong(3, Platform.currentTime)

val rs = statement.executeQuery()
var q: Queue[PaymentRequest] = Queue()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import fr.acinq.eclair.db.IncomingPayment
import fr.acinq.eclair.payment.PaymentLifecycle.ReceivePayment
import fr.acinq.eclair.wire._
import fr.acinq.eclair.{Globals, NodeParams, randomBytes32}

import concurrent.duration._
import scala.compat.Platform
import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success, Try}
Expand Down Expand Up @@ -75,7 +75,7 @@ class LocalPaymentHandler(nodeParams: NodeParams) extends Actor with ActorLoggin
case _ =>
log.info(s"received payment for paymentHash=${htlc.paymentHash} amountMsat=${htlc.amountMsat}")
// amount is correct or was not specified in the payment request
nodeParams.db.payments.addIncomingPayment(IncomingPayment(htlc.paymentHash, htlc.amountMsat, Platform.currentTime / 1000))
nodeParams.db.payments.addIncomingPayment(IncomingPayment(htlc.paymentHash, htlc.amountMsat, Platform.currentTime))
sender ! CMD_FULFILL_HTLC(htlc.id, paymentPreimage, commit = true)
context.system.eventStream.publish(PaymentReceived(MilliSatoshi(htlc.amountMsat), htlc.paymentHash, htlc.channelId))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@

package fr.acinq.eclair.payment

import java.time.Instant
import java.util.UUID
import fr.acinq.bitcoin.{ByteVector32, MilliSatoshi}
import scala.compat.Platform

/**
* Created by PM on 01/02/2017.
Expand All @@ -27,10 +27,10 @@ sealed trait PaymentEvent {
val paymentHash: ByteVector32
}

case class PaymentSent(id: UUID, amount: MilliSatoshi, feesPaid: MilliSatoshi, paymentHash: ByteVector32, paymentPreimage: ByteVector32, toChannelId: ByteVector32, timestamp: Long = Instant.now().getEpochSecond) extends PaymentEvent
case class PaymentSent(id: UUID, amount: MilliSatoshi, feesPaid: MilliSatoshi, paymentHash: ByteVector32, paymentPreimage: ByteVector32, toChannelId: ByteVector32, timestamp: Long = Platform.currentTime) extends PaymentEvent

case class PaymentRelayed(amountIn: MilliSatoshi, amountOut: MilliSatoshi, paymentHash: ByteVector32, fromChannelId: ByteVector32, toChannelId: ByteVector32, timestamp: Long = Instant.now().getEpochSecond) extends PaymentEvent
case class PaymentRelayed(amountIn: MilliSatoshi, amountOut: MilliSatoshi, paymentHash: ByteVector32, fromChannelId: ByteVector32, toChannelId: ByteVector32, timestamp: Long = Platform.currentTime) extends PaymentEvent

case class PaymentReceived(amount: MilliSatoshi, paymentHash: ByteVector32, fromChannelId: ByteVector32, timestamp: Long = Instant.now().getEpochSecond) extends PaymentEvent
case class PaymentReceived(amount: MilliSatoshi, paymentHash: ByteVector32, fromChannelId: ByteVector32, timestamp: Long = Platform.currentTime) extends PaymentEvent

case class PaymentSettlingOnChain(id: UUID, amount: MilliSatoshi, paymentHash: ByteVector32, timestamp: Long = Instant.now().getEpochSecond) extends PaymentEvent
case class PaymentSettlingOnChain(id: UUID, amount: MilliSatoshi, paymentHash: ByteVector32, timestamp: Long = Platform.currentTime) extends PaymentEvent
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@

package fr.acinq.eclair.payment

import java.time.Instant
import java.util.UUID

import akka.actor.{ActorRef, FSM, Props, Status}
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.bitcoin.{ByteVector32, MilliSatoshi}
Expand All @@ -33,6 +31,8 @@ import fr.acinq.eclair.router._
import fr.acinq.eclair.wire._
import scodec.Attempt
import scodec.bits.ByteVector
import concurrent.duration._
import scala.compat.Platform
import scala.util.{Failure, Success}

/**
Expand All @@ -47,7 +47,7 @@ class PaymentLifecycle(nodeParams: NodeParams, id: UUID, router: ActorRef, regis
when(WAITING_FOR_REQUEST) {
case Event(c: SendPayment, WaitingForRequest) =>
router ! RouteRequest(nodeParams.nodeId, c.targetNodeId, c.amountMsat, c.assistedRoutes, routeParams = c.routeParams)
paymentsDb.addOutgoingPayment(OutgoingPayment(id, c.paymentHash, None, c.amountMsat, Instant.now().getEpochSecond, None, OutgoingPaymentStatus.PENDING))
paymentsDb.addOutgoingPayment(OutgoingPayment(id, c.paymentHash, None, c.amountMsat, Platform.currentTime, None, OutgoingPaymentStatus.PENDING))
goto(WAITING_FOR_ROUTE) using WaitingForRoute(sender, c, failures = Nil)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,9 @@ import fr.acinq.eclair.wire._
import fr.acinq.eclair.{ShortChannelId, serializationResult}
import scodec.bits.{BitVector, ByteVector}
import shapeless.HNil

import scala.concurrent.duration._
import scala.compat.Platform


/**
* Created by PM on 03/02/2017.
*/
Expand Down Expand Up @@ -73,7 +72,7 @@ object Announcements {
)
}

def makeNodeAnnouncement(nodeSecret: PrivateKey, alias: String, color: Color, nodeAddresses: List[NodeAddress], timestamp: Long = Platform.currentTime / 1000): NodeAnnouncement = {
def makeNodeAnnouncement(nodeSecret: PrivateKey, alias: String, color: Color, nodeAddresses: List[NodeAddress], timestamp: Long = Platform.currentTime.milliseconds.toSeconds): NodeAnnouncement = {
require(alias.size <= 32)
val witness = nodeAnnouncementWitnessEncode(timestamp, nodeSecret.publicKey, color, alias, ByteVector.empty, nodeAddresses)
val sig = Crypto.encodeSignature(Crypto.sign(witness, nodeSecret)) :+ 1.toByte
Expand Down Expand Up @@ -119,7 +118,7 @@ object Announcements {

def makeChannelFlags(isNode1: Boolean, enable: Boolean): Byte = BitVector.bits(!enable :: !isNode1 :: Nil).padLeft(8).toByte()

def makeChannelUpdate(chainHash: ByteVector32, nodeSecret: PrivateKey, remoteNodeId: PublicKey, shortChannelId: ShortChannelId, cltvExpiryDelta: Int, htlcMinimumMsat: Long, feeBaseMsat: Long, feeProportionalMillionths: Long, htlcMaximumMsat: Long, enable: Boolean = true, timestamp: Long = Platform.currentTime / 1000): ChannelUpdate = {
def makeChannelUpdate(chainHash: ByteVector32, nodeSecret: PrivateKey, remoteNodeId: PublicKey, shortChannelId: ShortChannelId, cltvExpiryDelta: Int, htlcMinimumMsat: Long, feeBaseMsat: Long, feeProportionalMillionths: Long, htlcMaximumMsat: Long, enable: Boolean = true, timestamp: Long = Platform.currentTime.milliseconds.toSeconds): ChannelUpdate = {
val messageFlags = makeMessageFlags(hasOptionChannelHtlcMax = true) // NB: we always support option_channel_htlc_max
val channelFlags = makeChannelFlags(isNode1 = isNode1(nodeSecret.publicKey, remoteNodeId), enable = enable)
val htlcMaximumMsatOpt = Some(htlcMaximumMsat)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ class Router(nodeParams: NodeParams, watcher: ActorRef, initialized: Option[Prom
// the first_timestamp field to the current date/time and timestamp_range to the maximum value
// NB: we can't just set firstTimestamp to 0, because in that case peer would send us all past messages matching
// that (i.e. the whole routing table)
val filter = GossipTimestampFilter(nodeParams.chainHash, firstTimestamp = Platform.currentTime / 1000, timestampRange = Int.MaxValue)
val filter = GossipTimestampFilter(nodeParams.chainHash, firstTimestamp = Platform.currentTime.milliseconds.toSeconds, timestampRange = Int.MaxValue)
remote ! filter

// clean our sync state for this peer: we receive a SendChannelQuery just when we connect/reconnect to a peer and
Expand Down Expand Up @@ -738,7 +738,7 @@ object Router {
def toFakeUpdate(extraHop: ExtraHop): ChannelUpdate =
// the `direction` bit in flags will not be accurate but it doesn't matter because it is not used
// what matters is that the `disable` bit is 0 so that this update doesn't get filtered out
ChannelUpdate(signature = ByteVector.empty, chainHash = ByteVector32.Zeroes, extraHop.shortChannelId, Platform.currentTime / 1000, messageFlags = 0, channelFlags = 0, extraHop.cltvExpiryDelta, htlcMinimumMsat = 0L, extraHop.feeBaseMsat, extraHop.feeProportionalMillionths, None)
ChannelUpdate(signature = ByteVector.empty, chainHash = ByteVector32.Zeroes, extraHop.shortChannelId, Platform.currentTime.milliseconds.toSeconds, messageFlags = 0, channelFlags = 0, extraHop.cltvExpiryDelta, htlcMinimumMsat = 0L, extraHop.feeBaseMsat, extraHop.feeProportionalMillionths, None)

def toFakeUpdates(extraRoute: Seq[ExtraHop], targetNodeId: PublicKey): Map[ChannelDesc, ChannelUpdate] = {
// BOLT 11: "For each entry, the pubkey is the node ID of the start of the channel", and the last node is the destination
Expand All @@ -760,7 +760,7 @@ object Router {
def isStale(u: ChannelUpdate): Boolean = {
// BOLT 7: "nodes MAY prune channels should the timestamp of the latest channel_update be older than 2 weeks (1209600 seconds)"
// but we don't want to prune brand new channels for which we didn't yet receive a channel update
val staleThresholdSeconds = Platform.currentTime / 1000 - 1209600
val staleThresholdSeconds = (Platform.currentTime.milliseconds - 14.days).toSeconds
u.timestamp < staleThresholdSeconds
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import grizzled.slf4j.Logging
import scodec.bits.BitVector
import scodec.codecs._
import scodec.{Attempt, Codec}
import scala.concurrent.duration._
import scala.compat.Platform

/**
* Created by PM on 02/06/2017.
Expand Down Expand Up @@ -237,7 +239,7 @@ object ChannelCodecs extends Logging {
val DATA_WAIT_FOR_FUNDING_CONFIRMED_COMPAT_01_Codec: Codec[DATA_WAIT_FOR_FUNDING_CONFIRMED] = (
("commitments" | commitmentsCodec) ::
("fundingTx" | provide[Option[Transaction]](None)) ::
("waitingSince" | provide(compat.Platform.currentTime / 1000)) ::
("waitingSince" | provide(Platform.currentTime.milliseconds.toSeconds)) ::
("deferred" | optional(bool, fundingLockedCodec)) ::
("lastSent" | either(bool, fundingCreatedCodec, fundingSignedCodec))).as[DATA_WAIT_FOR_FUNDING_CONFIRMED].decodeOnly

Expand Down
Loading

0 comments on commit 83dc817

Please sign in to comment.