Skip to content

Commit

Permalink
Proper types for UNIX timestamps (#1990)
Browse files Browse the repository at this point in the history
We define `TimestampSecond` and `TimestampMilli` for second and millisecond precision UNIX-style timestamps.

Let me know what you think of the syntaxic sugar, I went for `123456 unixsec` and `123456789 unixms`.

Json serialization is as follows for resp. second and millisecond precision. Note that in both case we display the unix format in second precision, but the iso format is more precise:
```
{
  "iso": "2021-10-04T14:32:41Z",
  "unix": 1633357961
}
{
  "iso": "2021-10-04T14:32:41.456Z",
  "unix": 1633357961
}
```
  • Loading branch information
pm47 authored Oct 18, 2021
1 parent 9057c8e commit b4d285f
Show file tree
Hide file tree
Showing 82 changed files with 793 additions and 711 deletions.
40 changes: 40 additions & 0 deletions docs/release-notes/eclair-vnext.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,46 @@

### API changes

#### Timestamps

All timestamps are now returned as an object with two attributes:
- `iso`: ISO-8601 format with GMT time zone. Precision may be second or millisecond depending on the timestamp.
- `unix`: seconds since epoch formats (seconds since epoch). Precision is always second.

Examples:
- second-precision timestamp:
- before:
```json
{
"timestamp": 1633357961
}
```
- after
```json
{
"timestamp": {
"iso": "2021-10-04T14:32:41Z",
"unix": 1633357961
}
}
```
- milli-second precision timestamp:
- before:
```json
{
"timestamp": 1633357961456
}
```
- after (note how the unix format is in second precision):
```json
{
"timestamp": {
"iso": "2021-10-04T14:32:41.456Z",
"unix": 1633357961
}
}
```

<insert changes>

### Miscellaneous improvements and bug fixes
Expand Down
55 changes: 17 additions & 38 deletions eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import akka.util.Timeout
import com.softwaremill.quicklens.ModifyPimp
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.bitcoin.{ByteVector32, ByteVector64, Crypto, Satoshi}
import fr.acinq.eclair.TimestampQueryFilters._
import fr.acinq.eclair.balance.CheckBalance.GlobalBalance
import fr.acinq.eclair.balance.{BalanceActor, ChannelsListener}
import fr.acinq.eclair.blockchain.OnChainWallet.OnChainBalance
Expand Down Expand Up @@ -57,25 +56,10 @@ case class GetInfoResponse(version: String, nodeId: PublicKey, alias: String, co

case class AuditResponse(sent: Seq[PaymentSent], received: Seq[PaymentReceived], relayed: Seq[PaymentRelayed])

case class TimestampQueryFilters(from: Long, to: Long)

case class SignedMessage(nodeId: PublicKey, message: String, signature: ByteVector)

case class VerifiedMessage(valid: Boolean, publicKey: PublicKey)

object TimestampQueryFilters {
/** We use this in the context of timestamp filtering, when we don't need an upper bound. */
val MaxEpochMilliseconds: Long = Duration.fromNanos(Long.MaxValue).toMillis

def getDefaultTimestampFilters(from_opt: Option[Long], to_opt: Option[Long]): TimestampQueryFilters = {
// NB: we expect callers to use seconds, but internally we use milli-seconds everywhere.
val from = from_opt.getOrElse(0L).seconds.toMillis
val to = to_opt.map(_.seconds.toMillis).getOrElse(MaxEpochMilliseconds)

TimestampQueryFilters(from, to)
}
}

object SignedMessage {
def signedBytes(message: ByteVector): ByteVector32 =
Crypto.hash256(ByteVector("Lightning Signed Message:".getBytes(StandardCharsets.UTF_8)) ++ message)
Expand Down Expand Up @@ -129,19 +113,19 @@ trait Eclair {

def sendToRoute(amount: MilliSatoshi, recipientAmount_opt: Option[MilliSatoshi], externalId_opt: Option[String], parentId_opt: Option[UUID], invoice: PaymentRequest, finalCltvExpiryDelta: CltvExpiryDelta, route: PredefinedRoute, trampolineSecret_opt: Option[ByteVector32] = None, trampolineFees_opt: Option[MilliSatoshi] = None, trampolineExpiryDelta_opt: Option[CltvExpiryDelta] = None, trampolineNodes_opt: Seq[PublicKey] = Nil)(implicit timeout: Timeout): Future[SendPaymentToRouteResponse]

def audit(from_opt: Option[Long], to_opt: Option[Long])(implicit timeout: Timeout): Future[AuditResponse]
def audit(from: TimestampSecond, to: TimestampSecond)(implicit timeout: Timeout): Future[AuditResponse]

def networkFees(from_opt: Option[Long], to_opt: Option[Long])(implicit timeout: Timeout): Future[Seq[NetworkFee]]
def networkFees(from: TimestampSecond, to: TimestampSecond)(implicit timeout: Timeout): Future[Seq[NetworkFee]]

def channelStats(from_opt: Option[Long], to_opt: Option[Long])(implicit timeout: Timeout): Future[Seq[Stats]]
def channelStats(from: TimestampSecond, to: TimestampSecond)(implicit timeout: Timeout): Future[Seq[Stats]]

def networkStats()(implicit timeout: Timeout): Future[Option[NetworkStats]]

def getInvoice(paymentHash: ByteVector32)(implicit timeout: Timeout): Future[Option[PaymentRequest]]

def pendingInvoices(from_opt: Option[Long], to_opt: Option[Long])(implicit timeout: Timeout): Future[Seq[PaymentRequest]]
def pendingInvoices(from: TimestampSecond, to: TimestampSecond)(implicit timeout: Timeout): Future[Seq[PaymentRequest]]

def allInvoices(from_opt: Option[Long], to_opt: Option[Long])(implicit timeout: Timeout): Future[Seq[PaymentRequest]]
def allInvoices(from: TimestampSecond, to: TimestampSecond)(implicit timeout: Timeout): Future[Seq[PaymentRequest]]

def allChannels()(implicit timeout: Timeout): Future[Iterable[ChannelDesc]]

Expand Down Expand Up @@ -380,35 +364,30 @@ class EclairImpl(appKit: Kit) extends Eclair with Logging {
appKit.nodeParams.db.payments.getIncomingPayment(paymentHash)
}

override def audit(from_opt: Option[Long], to_opt: Option[Long])(implicit timeout: Timeout): Future[AuditResponse] = {
val filter = getDefaultTimestampFilters(from_opt, to_opt)
override def audit(from: TimestampSecond, to: TimestampSecond)(implicit timeout: Timeout): Future[AuditResponse] = {
Future(AuditResponse(
sent = appKit.nodeParams.db.audit.listSent(filter.from, filter.to),
received = appKit.nodeParams.db.audit.listReceived(filter.from, filter.to),
relayed = appKit.nodeParams.db.audit.listRelayed(filter.from, filter.to)
sent = appKit.nodeParams.db.audit.listSent(from.toTimestampMilli, to.toTimestampMilli),
received = appKit.nodeParams.db.audit.listReceived(from.toTimestampMilli, to.toTimestampMilli),
relayed = appKit.nodeParams.db.audit.listRelayed(from.toTimestampMilli, to.toTimestampMilli)
))
}

override def networkFees(from_opt: Option[Long], to_opt: Option[Long])(implicit timeout: Timeout): Future[Seq[NetworkFee]] = {
val filter = getDefaultTimestampFilters(from_opt, to_opt)
Future(appKit.nodeParams.db.audit.listNetworkFees(filter.from, filter.to))
override def networkFees(from: TimestampSecond, to: TimestampSecond)(implicit timeout: Timeout): Future[Seq[NetworkFee]] = {
Future(appKit.nodeParams.db.audit.listNetworkFees(from.toTimestampMilli, to.toTimestampMilli))
}

override def channelStats(from_opt: Option[Long], to_opt: Option[Long])(implicit timeout: Timeout): Future[Seq[Stats]] = {
val filter = getDefaultTimestampFilters(from_opt, to_opt)
Future(appKit.nodeParams.db.audit.stats(filter.from, filter.to))
override def channelStats(from: TimestampSecond, to: TimestampSecond)(implicit timeout: Timeout): Future[Seq[Stats]] = {
Future(appKit.nodeParams.db.audit.stats(from.toTimestampMilli, to.toTimestampMilli))
}

override def networkStats()(implicit timeout: Timeout): Future[Option[NetworkStats]] = (appKit.router ? GetNetworkStats).mapTo[GetNetworkStatsResponse].map(_.stats)

override def allInvoices(from_opt: Option[Long], to_opt: Option[Long])(implicit timeout: Timeout): Future[Seq[PaymentRequest]] = Future {
val filter = getDefaultTimestampFilters(from_opt, to_opt)
appKit.nodeParams.db.payments.listIncomingPayments(filter.from, filter.to).map(_.paymentRequest)
override def allInvoices(from: TimestampSecond, to: TimestampSecond)(implicit timeout: Timeout): Future[Seq[PaymentRequest]] = Future {
appKit.nodeParams.db.payments.listIncomingPayments(from.toTimestampMilli, to.toTimestampMilli).map(_.paymentRequest)
}

override def pendingInvoices(from_opt: Option[Long], to_opt: Option[Long])(implicit timeout: Timeout): Future[Seq[PaymentRequest]] = Future {
val filter = getDefaultTimestampFilters(from_opt, to_opt)
appKit.nodeParams.db.payments.listPendingIncomingPayments(filter.from, filter.to).map(_.paymentRequest)
override def pendingInvoices(from: TimestampSecond, to: TimestampSecond)(implicit timeout: Timeout): Future[Seq[PaymentRequest]] = Future {
appKit.nodeParams.db.payments.listPendingIncomingPayments(from.toTimestampMilli, to.toTimestampMilli).map(_.paymentRequest)
}

override def getInvoice(paymentHash: ByteVector32)(implicit timeout: Timeout): Future[Option[PaymentRequest]] = Future {
Expand Down
59 changes: 59 additions & 0 deletions eclair-core/src/main/scala/fr/acinq/eclair/Timestamp.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2019 ACINQ SAS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fr.acinq.eclair

import java.sql
import java.time.Instant
import scala.concurrent.duration.{DurationLong, FiniteDuration}

case class TimestampSecond(private val underlying: Long) extends Ordered[TimestampSecond] {
// @formatter:off
def toLong: Long = underlying
def toTimestampMilli: TimestampMilli = TimestampMilli(underlying * 1000)
def toSqlTimestamp: sql.Timestamp = sql.Timestamp.from(Instant.ofEpochSecond(underlying))
override def toString: String = s"$underlying unixsec"
override def compare(that: TimestampSecond): Int = underlying.compareTo(that.underlying)
def +(x: Long): TimestampSecond = TimestampSecond(underlying + x)
def -(x: Long): TimestampSecond = TimestampSecond(underlying - x)
def +(x: FiniteDuration): TimestampSecond = TimestampSecond(underlying + x.toSeconds)
def -(x: FiniteDuration): TimestampSecond = TimestampSecond(underlying - x.toSeconds)
def -(x: TimestampSecond): FiniteDuration = (underlying - x.underlying).seconds
// @formatter:on
}

object TimestampSecond {
def now(): TimestampSecond = TimestampSecond(System.currentTimeMillis() / 1000)
}

case class TimestampMilli(private val underlying: Long) extends Ordered[TimestampMilli] {
// @formatter:off
def toLong: Long = underlying
def toSqlTimestamp: sql.Timestamp = sql.Timestamp.from(Instant.ofEpochMilli(underlying))
override def toString: String = s"$underlying unixms"
override def compare(that: TimestampMilli): Int = underlying.compareTo(that.underlying)
def +(x: FiniteDuration): TimestampMilli = TimestampMilli(underlying + x.toMillis)
def -(x: FiniteDuration): TimestampMilli = TimestampMilli(underlying - x.toMillis)
def -(x: TimestampMilli): FiniteDuration = (underlying - x.underlying).millis
// @formatter:on
}

object TimestampMilli {
// @formatter:off
def now(): TimestampMilli = TimestampMilli(System.currentTimeMillis())
def fromSqlTimestamp(sqlTs: sql.Timestamp): TimestampMilli = TimestampMilli(sqlTs.getTime)
// @formatter:on
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import fr.acinq.eclair.blockchain._
import fr.acinq.eclair.blockchain.bitcoind.rpc.BitcoinCoreClient
import fr.acinq.eclair.blockchain.watchdogs.BlockchainWatchdog
import fr.acinq.eclair.wire.protocol.ChannelAnnouncement
import fr.acinq.eclair.{KamonExt, NodeParams, ShortChannelId}
import fr.acinq.eclair.{KamonExt, NodeParams, ShortChannelId, TimestampSecond}

import java.util.concurrent.atomic.AtomicLong
import scala.concurrent.duration._
Expand Down Expand Up @@ -68,7 +68,7 @@ object ZmqWatcher {
final case class ValidateResult(c: ChannelAnnouncement, fundingTx: Either[Throwable, (Transaction, UtxoStatus)])

final case class GetTxWithMeta(replyTo: ActorRef[GetTxWithMetaResponse], txid: ByteVector32) extends Command
final case class GetTxWithMetaResponse(txid: ByteVector32, tx_opt: Option[Transaction], lastBlockTimestamp: Long)
final case class GetTxWithMetaResponse(txid: ByteVector32, tx_opt: Option[Transaction], lastBlockTimestamp: TimestampSecond)

sealed trait UtxoStatus
object UtxoStatus {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package fr.acinq.eclair.blockchain.bitcoind.rpc
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.bitcoin._
import fr.acinq.eclair.ShortChannelId.coordinates
import fr.acinq.eclair.TxCoordinates
import fr.acinq.eclair.{TimestampSecond, TxCoordinates}
import fr.acinq.eclair.blockchain.OnChainWallet
import fr.acinq.eclair.blockchain.OnChainWallet.{MakeFundingTxResponse, OnChainBalance}
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{GetTxWithMetaResponse, UtxoStatus, ValidateResult}
Expand Down Expand Up @@ -62,7 +62,7 @@ class BitcoinCoreClient(val rpcClient: BitcoinJsonRPCClient) extends OnChainWall
tx_opt <- getTransaction(txid).map(Some(_)).recover { case _ => None }
blockchainInfo <- rpcClient.invoke("getblockchaininfo")
JInt(timestamp) = blockchainInfo \ "mediantime"
} yield GetTxWithMetaResponse(txid, tx_opt, timestamp.toLong)
} yield GetTxWithMetaResponse(txid, tx_opt, TimestampSecond(timestamp.toLong))

/** Get the number of confirmations of a given transaction. */
def getTxConfirmations(txid: ByteVector32)(implicit ec: ExecutionContext): Future[Option[Int]] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder, remo
originChannels = Map.empty,
remoteNextCommitInfo = Right(randomKey().publicKey), // we will receive their next per-commitment point in the next message, so we temporarily put a random byte array
commitInput, ShaChain.init)
val now = System.currentTimeMillis.milliseconds.toSeconds
val blockHeight = nodeParams.currentBlockHeight
context.system.eventStream.publish(ChannelSignatureReceived(self, commitments))
log.info(s"publishing funding tx for channelId=$channelId fundingTxid=${commitInput.outPoint.txid}")
watchFundingTx(commitments)
Expand All @@ -567,7 +567,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder, remo
}
}

goto(WAIT_FOR_FUNDING_CONFIRMED) using DATA_WAIT_FOR_FUNDING_CONFIRMED(commitments, Some(fundingTx), now, None, Left(fundingCreated)) storing() calling publishFundingTx()
goto(WAIT_FOR_FUNDING_CONFIRMED) using DATA_WAIT_FOR_FUNDING_CONFIRMED(commitments, Some(fundingTx), blockHeight, None, Left(fundingCreated)) storing() calling publishFundingTx()
}

case Event(c: CloseCommand, d: DATA_WAIT_FOR_FUNDING_SIGNED) =>
Expand Down Expand Up @@ -1019,7 +1019,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder, remo
goto(NORMAL) using d.copy(channelUpdate = channelUpdate1) storing()

case Event(BroadcastChannelUpdate(reason), d: DATA_NORMAL) =>
val age = System.currentTimeMillis.milliseconds - d.channelUpdate.timestamp.seconds
val age = TimestampSecond.now() - d.channelUpdate.timestamp
val channelUpdate1 = Announcements.makeChannelUpdate(nodeParams.chainHash, nodeParams.privateKey, remoteNodeId, d.shortChannelId, d.channelUpdate.cltvExpiryDelta, d.channelUpdate.htlcMinimumMsat, d.channelUpdate.feeBaseMsat, d.channelUpdate.feeProportionalMillionths, d.commitments.capacity.toMilliSatoshi, enable = Helpers.aboveReserve(d.commitments))
reason match {
case Reconnected if d.commitments.announceChannel && Announcements.areSame(channelUpdate1, d.channelUpdate) && age < REFRESH_CHANNEL_UPDATE_INTERVAL =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,8 @@ object Helpers {
*
* @return the delay until the next update
*/
def nextChannelUpdateRefresh(currentUpdateTimestamp: Long)(implicit log: DiagnosticLoggingAdapter): FiniteDuration = {
val age = System.currentTimeMillis.milliseconds - currentUpdateTimestamp.seconds
def nextChannelUpdateRefresh(currentUpdateTimestamp: TimestampSecond)(implicit log: DiagnosticLoggingAdapter): FiniteDuration = {
val age = TimestampSecond.now() - currentUpdateTimestamp
val delay = 0.days.max(REFRESH_CHANNEL_UPDATE_INTERVAL - age)
Logs.withMdc(log)(Logs.mdc(category_opt = Some(Logs.LogCategory.CONNECTION))) {
log.debug("current channel_update was created {} days ago, will refresh it in {} days", age.toDays, delay.toDays)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import akka.actor.typed.eventstream.EventStream
import akka.actor.typed.scaladsl.Behaviors
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.bitcoin.{ByteVector32, ByteVector64, Crypto}
import fr.acinq.eclair.TimestampMilli
import fr.acinq.eclair.blockchain.NewBlock
import fr.acinq.eclair.channel.ChannelSignatureReceived
import fr.acinq.eclair.io.PeerConnected
Expand All @@ -47,7 +48,7 @@ object WeakEntropyPool {
sealed trait Command
private case object FlushEntropy extends Command
private case class WrappedNewBlock(blockHash: ByteVector32) extends Command
private case class WrappedPaymentRelayed(paymentHash: ByteVector32, relayedAt: Long) extends Command
private case class WrappedPaymentRelayed(paymentHash: ByteVector32, relayedAt: TimestampMilli) extends Command
private case class WrappedPeerConnected(nodeId: PublicKey) extends Command
private case class WrappedChannelSignature(wtxid: ByteVector32) extends Command
private case class WrappedNodeUpdated(sig: ByteVector64) extends Command
Expand Down Expand Up @@ -80,7 +81,7 @@ object WeakEntropyPool {

case WrappedNewBlock(blockHash) => collecting(collector, collect(entropy_opt, blockHash ++ ByteVector.fromLong(System.currentTimeMillis())))

case WrappedPaymentRelayed(paymentHash, relayedAt) => collecting(collector, collect(entropy_opt, paymentHash ++ ByteVector.fromLong(relayedAt)))
case WrappedPaymentRelayed(paymentHash, relayedAt) => collecting(collector, collect(entropy_opt, paymentHash ++ ByteVector.fromLong(relayedAt.toLong)))

case WrappedPeerConnected(nodeId) => collecting(collector, collect(entropy_opt, nodeId.value ++ ByteVector.fromLong(System.currentTimeMillis())))

Expand Down
13 changes: 7 additions & 6 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/AuditDb.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package fr.acinq.eclair.db

import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.bitcoin.{ByteVector32, Satoshi}
import fr.acinq.eclair.{TimestampMilli, TimestampSecond}
import fr.acinq.eclair.channel._
import fr.acinq.eclair.db.AuditDb.{NetworkFee, Stats}
import fr.acinq.eclair.db.DbEventHandler.ChannelEvent
Expand Down Expand Up @@ -45,21 +46,21 @@ trait AuditDb extends Closeable {

def addPathFindingExperimentMetrics(metrics: PathFindingExperimentMetrics): Unit

def listSent(from: Long, to: Long): Seq[PaymentSent]
def listSent(from: TimestampMilli, to: TimestampMilli): Seq[PaymentSent]

def listReceived(from: Long, to: Long): Seq[PaymentReceived]
def listReceived(from: TimestampMilli, to: TimestampMilli): Seq[PaymentReceived]

def listRelayed(from: Long, to: Long): Seq[PaymentRelayed]
def listRelayed(from: TimestampMilli, to: TimestampMilli): Seq[PaymentRelayed]

def listNetworkFees(from: Long, to: Long): Seq[NetworkFee]
def listNetworkFees(from: TimestampMilli, to: TimestampMilli): Seq[NetworkFee]

def stats(from: Long, to: Long): Seq[Stats]
def stats(from: TimestampMilli, to: TimestampMilli): Seq[Stats]

}

object AuditDb {

case class NetworkFee(remoteNodeId: PublicKey, channelId: ByteVector32, txId: ByteVector32, fee: Satoshi, txType: String, timestamp: Long)
case class NetworkFee(remoteNodeId: PublicKey, channelId: ByteVector32, txId: ByteVector32, fee: Satoshi, txType: String, timestamp: TimestampMilli)

case class Stats(channelId: ByteVector32, direction: String, avgPaymentAmount: Satoshi, paymentCount: Int, relayFee: Satoshi, networkFee: Satoshi)

Expand Down
Loading

0 comments on commit b4d285f

Please sign in to comment.