diff --git a/src/main/scala/org/graphsense/account/trx_test/Job.scala b/src/main/scala/org/graphsense/account/trx_test/Job.scala deleted file mode 100644 index 06e8357..0000000 --- a/src/main/scala/org/graphsense/account/trx_test/Job.scala +++ /dev/null @@ -1,554 +0,0 @@ -package org.graphsense.account.trxTest - -import org.apache.spark.sql.Dataset -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.Encoder -import org.graphsense.Job -import org.graphsense.account.config.AccountConfig -import org.graphsense.TransformHelpers -import org.graphsense.Util._ -import org.graphsense.storage.CassandraStorage -import org.graphsense.account.trx.{TrxSource, TrxTransformation} - -class TronExperimentJob( - spark: SparkSession, - source: TrxSource, - cassandra: CassandraStorage, - config: AccountConfig -) extends Job { - import spark.implicits._ - - new TrxTransformation(spark, config.bucketSize()) - - config.debug() - - val base_path = - config.cacheDirectory.toOption.map(dir => dir + config.targetKeyspace()) - - def computeCached[ - R: Encoder - ]( - dataset_name: String - )(block: => Dataset[R]): Dataset[R] = { - TransformHelpers.computeCached(base_path, spark)(dataset_name)(block) - } - - def timeJob[R](title: String)(block: => R): R = { - spark.sparkContext.setJobDescription(title) - time(title)(block) - } - - // def prepareAndLoad(): ( - // Dataset[ExchangeRates], - // Dataset[Block], - // Dataset[Transaction], - // Dataset[TxFee], - // Dataset[Trace], - // Dataset[TokenTransfer], - // Dataset[TokenConfiguration], - // Long, - // Long, - // Int, - // Long - // ) = { - // val exchangeRatesRaw = source.exchangeRates() - // val blocks = source.blocks() - // val tokenConfigurations = source.tokenConfigurations().persist() - - // timeJob("Store configuration") { - // val conf = transformation.configuration( - // config.targetKeyspace(), - // config.bucketSize(), - // config.txPrefixLength(), - // config.addressPrefixLength(), - // TransformHelpers.getFiatCurrencies(exchangeRatesRaw) - // ) - - // cassandra.store(keyspace, "configuration", conf.withColumn("addressHashSeed")) - // } - - // timeJob("Store token configuration") { - // sink.saveTokenConfiguration(tokenConfigurations) - // } - - // val exchangeRates = timeJob("Computing exchange rates") { - // val rates = - // transformation - // .computeExchangeRates(blocks, exchangeRatesRaw) - // .persist() - - // sink.saveExchangeRates(rates) - // rates - // } - - // val minBlockToProcess = config.minBlock.toOption match { - // case None => exchangeRates.select(min(col("blockId"))).first.getInt(0) - // case Some(user_min_block) => - // scala.math.max( - // exchangeRates.select(min(col("blockId"))).first.getInt(0), - // user_min_block - // ) - // } - - // val maxBlockToProcess = config.maxBlock.toOption match { - // case None => exchangeRates.select(max(col("blockId"))).first.getInt(0) - // case Some(user_max_block) => - // scala.math.min( - // exchangeRates.select(max(col("blockId"))).first.getInt(0), - // user_max_block - // ) - // } - - // val ( - // blks, - // txs, - // traces, - // tokenTxs, - // noBlocks, - // noTransactions, - // maxBlockTimestamp, - // maxTxsPerBlock - // ) = timeJob( - // s"Filter source data from ${minBlockToProcess} to ${maxBlockToProcess}" - // ) { - - // val blocksFiltered = - // blocks - // .transform( - // TransformHelpers.filterBlockRange( - // Some(minBlockToProcess), - // Some(maxBlockToProcess) - // ) - // ) - // .persist() - - // val txsFiltered = source - // .transactions() - // .transform( - // TransformHelpers.filterBlockRange( - // Some(minBlockToProcess), - // Some(maxBlockToProcess) - // ) - // ) - // .transform(transformation.onlySuccessfulTxs) - // .persist() - - // val minBlock = blocksFiltered - // .select( - // min(col("blockId")).as("minBlockId"), - // min(col("timestamp")).as("minBlockTimestamp") - // ) - // .withColumn("minBlockDatetime", from_unixtime(col("minBlockTimestamp"))) - // minBlock.select(col("minBlockTimestamp")).first.getInt(0) - // val minBlockDatetime = - // minBlock.select(col("minBlockDatetime")).first.getString(0) - - // val maxBlock = blocksFiltered - // .select( - // max(col("blockId")).as("maxBlockId"), - // max(col("timestamp")).as("maxBlockTimestamp"), - // max(col("transactionCount")).as("maxTxsPerBlock") - // ) - // .withColumn("maxBlockDatetime", from_unixtime(col("maxBlockTimestamp"))) - // val maxBlockTimestamp = - // maxBlock.select(col("maxBlockTimestamp")).first.getInt(0) - // val maxBlockDatetime = - // maxBlock.select(col("maxBlockDatetime")).first.getString(0) - // val maxTxsPerBlock = - // maxBlock.select(col("maxTxsPerBlock")).first.get(0).toString.toLong - - // val noBlocks = blocksFiltered.count() - // val noTransactions = txsFiltered.count() - - // printStat("Min block timestamp", minBlockDatetime) - // printStat("Min block ID", minBlockToProcess) - // printStat("Max block timestamp", maxBlockDatetime) - // printStat("Max block ID", maxBlockToProcess) - // printStat("Transaction count", noTransactions) - // ( - // blocksFiltered, - // txsFiltered, - // source - // .traces() - // .transform( - // TransformHelpers.filterBlockRange( - // Some(minBlockToProcess), - // Some(maxBlockToProcess) - // ) - // ) - // .transform(transformation.onlySuccessfulTrace) - // .persist(), - // source - // .tokenTransfers() - // .transform( - // TransformHelpers.filterBlockRange( - // Some(minBlockToProcess), - // Some(maxBlockToProcess) - // ) - // ) - // .persist(), - // noBlocks, - // noTransactions, - // maxBlockTimestamp, - // maxTxsPerBlock - // ) - // } - - // ( - // exchangeRates, - // blks, - // txs, - // source.txFee(), // For now unfiltered. - // traces, - // tokenTxs, - // tokenConfigurations, - // noBlocks, - // noTransactions, - // maxBlockTimestamp, - // maxTxsPerBlock - // ) - // } - - override def run(from: Option[Int], to: Option[Int]): Unit = { - - // spark.conf.set("spark.sql.ansi.enabled", true) - - println("Running tron specific transformations.") - - // val ( - // exchangeRates, - // blocks, - // txs, - // txFees, - // traces, - // tokenTxs, - // tokenConfigurations, - // noBlocks, - // noTransactions, - // maxBlockTimestamp, - // maxTxsPerBlock - // ) = prepareAndLoad() - - // val (addressIds, noAddresses) = - // timeJob("Computing Address Ids") { - - // val ids = computeCached("addressIds") { - // transformation - // .computeAddressIds( - // traces, - // txs, - // tokenTxs, - // maxTxsPerBlock - // ) - // } - - // val noAddresses = ids.count() - // printStat("#addresses", noAddresses.toString()) - - // printDatasetStats(ids, "addressIds") - - // (ids, noAddresses) - // } - // printStat("#addresses", noAddresses) - - // /* computing and storing balances */ - // timeJob("Computing balances") { - // /* val balances = transformation - // .computeBalancesWithFeesTable( - // blocks, - // txs, - // txFees, - // traces, - // addressIds, - // tokenTxs, - // tokenConfigurations - // ) - // .persist()*/ - - // val balances = transformation - // .computeBalances( - // blocks, - // txs, - // traces, - // addressIds, - // tokenTxs, - // tokenConfigurations - // ) - // .persist() - - // printDatasetStats(balances, "balances") - - // if (debug > 0) { - // println("null balances") - // balances.filter(col("addressId").isNull).show(100) - // printStat("#balances", balances.count()) - // } - - // sink.saveBalances(balances.filter(col("addressId").isNotNull)) - - // balances.unpersist(true) - // txFees.unpersist(true) - // } - - // /* computing and storing address id prefixes */ - // timeJob("Computing and storing addr id lookups") { - // val addressIdsByAddressPrefix = - // addressIds.toDF.transform( - // TransformHelpers.withSortedPrefix[AddressIdByAddressPrefix]( - // "address", - // "addressPrefix", - // config.addressPrefixLength() - // ) - // ) - - // printDatasetStats(addressIdsByAddressPrefix, "addressIdsByAddressPrefix") - - // if (debug > 0) { - // println("null addressIdsByAddressPrefix addresses") - // addressIdsByAddressPrefix.filter(col("addressPrefix").isNull).show(10) - // } - - // sink.saveAddressIdsByPrefix( - // addressIdsByAddressPrefix - // ) - // } - - // val transactionIds = timeJob("Computing transaction IDs") { - // computeCached("transactionIds") { - // transformation.computeTransactionIds(txs) - // } - // } - - // if (debug > 1) { - // val txs = transactionIds.count() - // val blub = - // transactionIds.dropDuplicates("transaction", "transactionId").count() - // printStat("nr txid", txs) - // printStat("nr txid without duplicates", blub) - // } - - // /* computing and storing address id prefixes */ - // timeJob("Computing and storing txid lookups") { - // val transactionIdsByTransactionIdGroup = - // transactionIds.toDF.transform( - // TransformHelpers.withSortedIdGroup[TransactionIdByTransactionIdGroup]( - // "transactionId", - // "transactionIdGroup", - // config.bucketSize() - // ) - // ) - - // sink.saveTransactionIdsByGroup(transactionIdsByTransactionIdGroup) - - // val transactionIdsByTransactionPrefix = - // transactionIds.toDF.transform( - // TransformHelpers.withSortedPrefix[TransactionIdByTransactionPrefix]( - // "transaction", - // "transactionPrefix", - // config.txPrefixLength() - // ) - // ) - // sink.saveTransactionIdsByTxPrefix(transactionIdsByTransactionPrefix) - // } - - // val contracts = timeJob("Computing contracts") { - // computeCached("contracts") { - // transformation.computeContracts(traces, txs, addressIds) - // } - // } - // printDatasetStats(contracts, "contracts") - - // if (debug > 0) { - // printStat("# contracts", contracts.count()) - // } - - // val encodedTransactions = timeJob("Encoding transactions") { - // computeCached("encodedTransactions") { - // transformation - // .computeEncodedTransactions( - // traces, - // transactionIds, - // txs, - // addressIds, - // exchangeRates - // ) - // } - // } - - // printDatasetStats(encodedTransactions, "encodedTransactions") - - // /* - // Caution this is a functional count it allows - // to unpersist tx and traces here since - // dataset is forced to be cached - // */ - // printStat("#encoded Txs", encodedTransactions.count()) - // txs.unpersist(true) - // traces.unpersist(true) - - // val encodedTokenTransfers = - // timeJob("Compute encoded token txs") { - // computeCached("encodedTokenTransfers") { - // transformation - // .computeEncodedTokenTransfers( - // tokenTxs, - // tokenConfigurations, - // transactionIds, - // addressIds, - // exchangeRates - // ) - // } - // } - - // printDatasetStats(encodedTokenTransfers, "encodedTokenTransfers") - - // /* - // Caution this is a functional count it allows - // to unpersist exchangeRates, txids, tokentxs here since - // dataset is forced to be cached - // */ - // printStat("#encoded token Txs", encodedTokenTransfers.count()) - - // exchangeRates.unpersist(true) - // transactionIds.unpersist(true) - // tokenTxs.unpersist(true) - - // timeJob("Computing and storing block transactions") { - // val blockTransactions = - // computeCached("blockTransactions") { - // transformation - // .computeBlockTransactions(blocks, encodedTransactions) - // } - - // printDatasetStats(blockTransactions, "blockTransactions") - // sink.saveBlockTransactionsRelational(blockTransactions) - // blockTransactions.unpersist(true) - // } - - // val addressTransactions = timeJob("Computing address transactions") { - // computeCached("addressTransactions") { - // transformation - // .computeAddressTransactions( - // encodedTransactions, - // encodedTokenTransfers - // ) - // } - // } - // printDatasetStats(addressTransactions, "addressTransactions") - - // if (debug > 0) { - // printStat("#address Txs", addressTransactions.count()) - // } - - // timeJob("Saving address transactions and lookups") { - // sink.saveAddressTransactions(addressTransactions) - - // if (debug > 0) { - // addressTransactions.show(100) - // } - - // val addressTransactionsSecondaryIds = - // TransformHelpers - // .computeSecondaryPartitionIdLookup[AddressTransactionSecondaryIds]( - // addressTransactions.toDF, - // "addressIdGroup", - // "addressIdSecondaryGroup" - // ) - - // sink.saveAddressTransactionBySecondaryId(addressTransactionsSecondaryIds) - // } - - // timeJob("Computing address and storing statistics") { - // val addresses = - // computeCached("addresses") { - // transformation.computeAddresses( - // encodedTransactions, - // encodedTokenTransfers, - // addressTransactions, - // addressIds, - // contracts - // ) - // } - - // printDatasetStats(addresses, "addresses") - - // sink.saveAddresses(addresses) - // addresses.unpersist(true) - // } - - // addressTransactions.unpersist(true) - // addressIds.unpersist(true) - // contracts.unpersist(true) - - // val noAddressRelations = - // timeJob("Computing and storing address relations") { - // val addressRelations = - // computeCached("addressRelations") { - // transformation - // .computeAddressRelations( - // encodedTransactions, - // encodedTokenTransfers - // ) - // } - - // printDatasetStats(addressRelations, "addressRelations") - - // sink.saveAddressIncomingRelations( - // addressRelations.sort( - // "dstAddressIdGroup", - // "dstAddressIdSecondaryGroup" - // ) - // ) - // sink.saveAddressOutgoingRelations( - // addressRelations.sort( - // "srcAddressIdGroup", - // "srcAddressIdSecondaryGroup" - // ) - // ) - - // val addressIncomingRelationsSecondaryIds = - // TransformHelpers - // .computeSecondaryPartitionIdLookup[ - // AddressIncomingRelationSecondaryIds - // ]( - // addressRelations.toDF, - // "dstAddressIdGroup", - // "dstAddressIdSecondaryGroup" - // ) - // val addressOutgoingRelationsSecondaryIds = - // TransformHelpers - // .computeSecondaryPartitionIdLookup[ - // AddressOutgoingRelationSecondaryIds - // ]( - // addressRelations.toDF, - // "srcAddressIdGroup", - // "srcAddressIdSecondaryGroup" - // ) - - // sink.saveAddressIncomingRelationsBySecondaryId( - // addressIncomingRelationsSecondaryIds - // ) - // sink.saveAddressOutgoingRelationsBySecondaryId( - // addressOutgoingRelationsSecondaryIds - // ) - - // addressRelations.count() - // } - - // timeJob("Computing summary statistics") { - // val summaryStatistics = - // transformation.summaryStatistics( - // maxBlockTimestamp, - // noBlocks, - // noTransactions, - // noAddresses, - // noAddressRelations - // ) - // summaryStatistics.show() - - // sink.saveSummaryStatistics(summaryStatistics) - // } - - } - -}