diff --git a/CHANGELOG.md b/CHANGELOG.md index 411bbfb..ea537d3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,10 +3,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). -## Unreleased +## [24.11.0] 2024-11-14 ### Changed - Upgrade to Spark 3.5.3 - Upgrade DataStax Spark Cassandra connector to 3.5.1 +### Added +- max-block cli parameter for utxo currencies and eth to test with smaller datasets. ## [24.02.0] 2024-03-04 ### Fixed diff --git a/Makefile b/Makefile index 0a862f4..17e43ab 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -RELEASE := 'v24.02.0' +RELEASE := 'v24.11.0' # RELEASESEM := 'v1.6.2' all: format lint build diff --git a/src/main/scala/org/graphsense/account/eth/Job.scala b/src/main/scala/org/graphsense/account/eth/Job.scala index ab6f77b..b5a0e67 100644 --- a/src/main/scala/org/graphsense/account/eth/Job.scala +++ b/src/main/scala/org/graphsense/account/eth/Job.scala @@ -67,14 +67,21 @@ class EthereumJob( val maxBlockExchangeRates = exchangeRates.select(max(col("blockId"))).first.getInt(0) + + val maxBlockToProcess = + Math.min( + maxBlockExchangeRates, + to.getOrElse(maxBlockExchangeRates) + ) + val blocksFiltered = - blocks.filter(col("blockId") <= maxBlockExchangeRates).persist() + blocks.filter(col("blockId") <= maxBlockToProcess).persist() val transactionsFiltered = - transactions.filter(col("blockId") <= maxBlockExchangeRates).persist() + transactions.filter(col("blockId") <= maxBlockToProcess).persist() val tracesFiltered = - traces.filter(col("blockId") <= maxBlockExchangeRates).persist() + traces.filter(col("blockId") <= maxBlockToProcess).persist() val tokenTransfersFiltered = tokenTransfers - .filter(col("blockId") <= maxBlockExchangeRates) + .filter(col("blockId") <= maxBlockToProcess) .persist() val maxBlock = blocksFiltered @@ -88,11 +95,11 @@ class EthereumJob( val maxBlockDatetime = maxBlock.select(col("maxBlockDatetime")).first.getString(0) - val noBlocks = maxBlockExchangeRates.toLong + 1 + val noBlocks = maxBlockToProcess.toLong + 1 val noTransactions = transactionsFiltered.count() println(s"Max block timestamp: ${maxBlockDatetime}") - println(s"Max block ID: ${maxBlockExchangeRates}") + println(s"Max block ID: ${maxBlockToProcess}") println(s"Max transaction ID: ${noTransactions - 1}") println("Computing transaction IDs") diff --git a/src/main/scala/org/graphsense/utxo/Config.scala b/src/main/scala/org/graphsense/utxo/Config.scala index 0b7d59e..4aee9ee 100644 --- a/src/main/scala/org/graphsense/utxo/Config.scala +++ b/src/main/scala/org/graphsense/utxo/Config.scala @@ -56,5 +56,11 @@ class UtxoConf(arguments: Seq[String]) extends ScallopConf(arguments) { noshort = true, descr = "Spark checkpoint directory (HFDS in non-local mode)" ) + val maxBlock: ScallopOption[Int] = opt[Int]( + "max-block", + default = None, + noshort = true, + descr = "Max block that is used for the transformation job" + ) verify() } diff --git a/src/main/scala/org/graphsense/utxo/TransformationJob.scala b/src/main/scala/org/graphsense/utxo/TransformationJob.scala index f170ed7..0e6a328 100644 --- a/src/main/scala/org/graphsense/utxo/TransformationJob.scala +++ b/src/main/scala/org/graphsense/utxo/TransformationJob.scala @@ -71,11 +71,18 @@ object TransformationJob { val maxBlockExchangeRates = exchangeRates.select(max(col(F.blockId))).first.getInt(0) + + val maxBlockToProcess = + Math.min( + maxBlockExchangeRates, + conf.maxBlock.toOption.getOrElse(maxBlockExchangeRates) + ) + val transactionsFiltered = - transactions.filter(col(F.blockId) <= maxBlockExchangeRates).persist() + transactions.filter(col(F.blockId) <= maxBlockToProcess).persist() val maxBlock = blocks - .filter(col(F.blockId) <= maxBlockExchangeRates) + .filter(col(F.blockId) <= maxBlockToProcess) .select( max(col(F.blockId)).as("maxBlockId"), max(col(F.timestamp)).as("maxBlockTimestamp") @@ -87,11 +94,11 @@ object TransformationJob { maxBlock.select(col("maxBlockDatetime")).first.getString(0) val maxTransactionId = transactionsFiltered.select(max(F.txId)).first.getLong(0) - val noBlocks = maxBlockExchangeRates + 1 + val noBlocks = maxBlockToProcess + 1 val noTransactions = maxTransactionId + 1 println(s"Max block timestamp: ${maxBlockDatetime}") - println(s"Max block ID: ${maxBlockExchangeRates}") + println(s"Max block ID: ${maxBlockToProcess}") println(s"Max transaction ID: ${maxTransactionId}") println("Extracting transaction inputs")