This project provides samples showing how to load data from either a CSV file or a Parquet table, along with code snippets for processing the loaded data. The code is written in Scala. Apache Spark also lets you write the same code in Python or Java, but this project targets efficiency: Spark itself is implemented in Scala, so Scala code runs on the same JVM as the engine and avoids the extra interpreter layer that Python adds.
Setting up this project requires a few prerequisites. The essential tools for our environment are:
- Cloudera QuickStart VM
  This is the virtual machine where most of our work will be done. Cloudera QuickStart virtual machines (VMs) include everything you need to try CDH, Cloudera Manager, Impala, and Cloudera Search. We will use it to connect to the Hive database later on.
  You can download Cloudera from this link.
- Java Development Kit (JDK)
  This is the software development environment that offers the collection of tools and libraries needed for developing Java applications. This version is specifically required for writing code in Scala.
  You can download it from this link.
- Apache Spark
  This is the main component of our project, which enables us to apply processing operations to our data.
  You can download it from this link.
- IDE
  This is the main cross-platform IDE, which lets us write the Scala code that applies the Spark operations.
  You can download it from this link.
This project uses a randomly selected dataset from Kaggle: a simple collection of credit card transactions. You can access the dataset from this link. It consists of the following fields:
Field Name | Field Description |
---|---|
Account Number | Represents the customer’s bank account number |
Customer ID | Represents a unique number for each bank customer |
Credit Limit | Represents the maximum amount that can be withdrawn |
Available Money | Represents the amount of Debit Balance available |
Transaction Date & Time | Represents the transaction date and time |
Transaction Amount | Represents the total amount of the transaction |
Merchant Name | Represents the name of the account accepting payments |
Merchant Country | Represents the country of the account accepting payments |
Merchant Category | Represents the category of the account accepting payments |
Expiry Date | Represents the expiry date of the Credit Card |
Transaction Type | Represents the type of the credit card transaction |
Is Fraud? | Represents whether the transaction is fraudulent or not |
This section walks through some sample operations used in this project. For further operations along with their output, please check the Technical Report.
- Operation #1: Load the transactions data from a CSV file:
Code Snippet:
val transactions = spark.read.format("csv").option("header", "true").load("/home/transactions.csv")
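With only the `header` option set, Spark reads every CSV column as a string. A minimal sketch of the same load with type inference turned on might look like the following. The `CsvLoadSketch` object, the local `SparkSession`, and the three-column sample file are assumptions made for illustration and are not part of the project.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, SparkSession}

object CsvLoadSketch {
  // Reads a CSV file with a header row and lets Spark infer the column types.
  def load(spark: SparkSession, path: String): DataFrame =
    spark.read
      .format("csv")
      .option("header", "true")      // first line holds the column names
      .option("inferSchema", "true") // extra pass over the data to guess types
      .load(path)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("csv-load-sketch")
      .master("local[*]") // assumption: local run, no cluster
      .getOrCreate()

    // Hypothetical sample file, written to a temp location for illustration.
    val tmp = Files.createTempFile("transactions", ".csv")
    val content = "customerId,transactionAmount,isFraud\n1,10.5,false\n2,99.0,true\n"
    Files.write(tmp, content.getBytes("UTF-8"))

    val df = load(spark, tmp.toString)
    df.printSchema() // shows the inferred type of each column
    spark.stop()
  }
}
```

Inference costs an extra pass over the file, so for a large dataset an explicit schema may be the better choice.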
- Operation #2: Save the data as a Parquet table in Hive and read it back:
Code Snippet:
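import org.apache.spark.sql.SaveMode
// Save as a Parquet-backed table (replacing any earlier copy), then read it back
transactions.write.mode(SaveMode.Overwrite).format("parquet").saveAsTable("bdp.hv_parq")
val df = spark.sql("SELECT * FROM bdp.hv_parq")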
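The snippet above relies on a Hive database named `bdp`. For trying the Parquet round trip without Hive, one option is to write to a plain directory and read it back. The sketch below assumes a local `SparkSession`; the `ParquetRoundTripSketch` object, the sample rows, and the temp directory are hypothetical.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object ParquetRoundTripSketch {
  // Writes the DataFrame as Parquet files under `dir`, then reads them back.
  def roundTrip(spark: SparkSession, df: DataFrame, dir: String): DataFrame = {
    df.write.mode(SaveMode.Overwrite).parquet(dir)
    spark.read.parquet(dir)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("parquet-round-trip-sketch")
      .master("local[*]") // assumption: local run, no Hive metastore
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample rows standing in for the transactions data.
    val sample = Seq((1L, 10.5), (2L, 99.0)).toDF("customerId", "transactionAmount")
    val dir = Files.createTempDirectory("parquet-sketch").resolve("transactions").toString

    val back = roundTrip(spark, sample, dir)
    back.printSchema() // Parquet stores the schema, so column types survive
    back.show()
    spark.stop()
  }
}
```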
- Operation #3: Count fraudulent and non-fraudulent transactions:
Code Snippet:
transactions.groupBy("IsFraud").count().show()
Output:
IsFraud | Count |
---|---|
true | 11302 |
false | 630612 |

[Output chart: transaction counts by IsFraud]
- Operation #4: Find the top 5 merchants by number of transactions:
Code Snippet:
import org.apache.spark.sql.functions.desc
transactions.groupBy("Merchant_Name").count().orderBy(desc("count")).limit(5).show()
Output:
Merchant_Name | Count |
---|---|
Lyft | 25311 |
Uber | 25263 |
gap.com | 13824 |
apple.com | 13607 |
target.com | 13601 |

[Output chart: transaction counts for the top 5 merchants]
- Operation #5: Find how many transaction types there are, ordered by popularity:
Code Snippet:
transactions.groupBy("Trans_Type").count().orderBy(desc("count")).show()
Output:
Trans_Type | Count |
---|---|
PURCHASE | 608685 |
ADDRESS_VERIFICATION | 16478 |
REVERSAL | 16162 |
Others | 589 |

[Output chart: transaction counts by Trans_Type]
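Operations #3 to #5 share one pattern: group by a column, count, and sort by the count. A small sketch of that pattern as a reusable helper is shown below, run on in-memory data so it can be tried without the dataset. The `CountByColumnSketch` object, the `topValues` helper, and the sample rows are hypothetical; the helper assumes the grouped column holds strings.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, desc}

object CountByColumnSketch {
  // Counts rows per distinct value of `column`, most frequent first,
  // and returns at most `n` (value, count) pairs.
  def topValues(df: DataFrame, column: String, n: Int): Array[(String, Long)] =
    df.groupBy(column)
      .count()
      .orderBy(desc("count"), col(column)) // tie-break on the value for a stable order
      .limit(n)
      .collect()
      .map(row => (row.getString(0), row.getLong(1)))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("count-by-column-sketch")
      .master("local[*]") // assumption: local run
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample standing in for the Merchant_Name column.
    val sample = Seq("Lyft", "Uber", "Lyft", "gap.com", "Lyft", "Uber").toDF("Merchant_Name")

    topValues(sample, "Merchant_Name", 2).foreach(println)
    spark.stop()
  }
}
```

Collecting to the driver is only reasonable here because `limit(n)` keeps the result small.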
- Operation #6: Count the customers who have transactions in both the gym and fast food categories:
Code Snippet:
transactions.createOrReplaceTempView("TransactionsTable")
val snippet = spark.sql(
  """SELECT COUNT(*) FROM (
    (SELECT customerId FROM TransactionsTable WHERE merchantCategoryCode='gym' GROUP BY customerId)
    INTERSECT
    (SELECT customerId FROM TransactionsTable WHERE merchantCategoryCode='fastfood' GROUP BY customerId))""")
val results = snippet.collect()
results.foreach(println)
Output:
90
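The same intersection can also be written with the DataFrame API instead of SQL. The following is a sketch under assumptions rather than the project's own code: the `IntersectSketch` object, the `customersInBoth` helper, and the sample rows are hypothetical, while the column names follow the SQL above.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object IntersectSketch {
  // Counts the distinct customers that appear under both merchant categories.
  def customersInBoth(df: DataFrame, first: String, second: String): Long = {
    val inFirst = df.filter(col("merchantCategoryCode") === first).select("customerId")
    val inSecond = df.filter(col("merchantCategoryCode") === second).select("customerId")
    // intersect keeps rows present in both sides and removes duplicates
    inFirst.intersect(inSecond).count()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("intersect-sketch")
      .master("local[*]") // assumption: local run
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample: only customer 1 appears in both categories.
    val sample = Seq(
      (1L, "gym"), (1L, "fastfood"), (1L, "gym"),
      (2L, "gym"), (3L, "fastfood")
    ).toDF("customerId", "merchantCategoryCode")

    println(customersInBoth(sample, "gym", "fastfood"))
    spark.stop()
  }
}
```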
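- Operation #7: Find the customers whose first McDonald's transaction came before their first Hardee's transaction and who never bought from Five Guys: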
Code Snippet:
val mcdonalds = spark.sql(
  """
  SELECT customerId, MIN(transactionDate) AS startDate
  FROM TransactionsTable
  WHERE merchantName LIKE 'McDonalds%'
  GROUP BY customerId
  ORDER BY MIN(transactionDate)
  """)
val hardees = spark.sql(
  """
  SELECT customerId, MIN(transactionDate) AS startDate
  FROM TransactionsTable
  WHERE merchantName LIKE "Hardee's%"
  GROUP BY customerId
  ORDER BY MIN(transactionDate)
  """)
mcdonalds.createOrReplaceTempView("McDonalds_Table")
hardees.createOrReplaceTempView("Hardees_Table")
val Mc_Hardees = spark.sql(
  """
  SELECT McDonalds_Table.customerId
  FROM McDonalds_Table JOIN Hardees_Table
    ON McDonalds_Table.customerId = Hardees_Table.customerId
  WHERE McDonalds_Table.startDate < Hardees_Table.startDate
  """)
Mc_Hardees.createOrReplaceTempView("McHardees")
val result = spark.sql(
  """
  (SELECT customerId FROM McHardees)
  EXCEPT
  (SELECT customerId FROM TransactionsTable WHERE merchantName LIKE 'Five Guys%' GROUP BY customerId)
  """)
result.show()
Output:
customerId |
---|
847174168 |
667315366 |
477081008 |

- Operation #8: Find the top 3 customers by total spending on food, fast food, and food delivery in each month:
Code Snippet:
val foodCustomers = spark.sql(
  """
  SELECT customerId, transactionAmount, MONTH(transactionDate) AS Month
  FROM TransactionsTable
  WHERE merchantCategoryCode='fastfood'
     OR merchantCategoryCode='food'
     OR merchantCategoryCode='food_delivery'
  ORDER BY Month
  """)
foodCustomers.createOrReplaceTempView("FoodCustomers")
val result = spark.sql(
  """
  SELECT Month, customerId, TotalAmount
  FROM (
    SELECT Month, customerId, TotalAmount,
           row_number() OVER (PARTITION BY Month ORDER BY Month, TotalAmount DESC) AS rn
    FROM (
      SELECT Month, customerId, ROUND(SUM(transactionAmount), 2) AS TotalAmount
      FROM FoodCustomers
      GROUP BY Month, customerId) AS Rank) AS Result
  WHERE rn <= 3
  """)
result.show()
Output:
Month | customerId | TotalAmount |
---|---|---|
1 | 314506271 | 43343.01 |
1 | 456044564 | 41831.58 |
1 | 772212779 | 38482.51 |
2 | 314506271 | 36458.69 |
2 | 772212779 | 31741.49 |
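The "top N per group" step in Operation #8 can also be expressed with the DataFrame window API. The sketch below is one possible translation, not the project's code: the `TopSpendersSketch` object, the `topSpenders` helper, and the sample rows are hypothetical, and it assumes the input already has the `Month`, `customerId`, and `transactionAmount` columns.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, round, row_number, sum}

object TopSpendersSketch {
  // Returns the `n` customers with the highest total amount within each month.
  def topSpenders(df: DataFrame, n: Int): DataFrame = {
    // Step 1: total spending per (Month, customerId), rounded to 2 decimals
    val totals = df.groupBy("Month", "customerId")
      .agg(round(sum("transactionAmount"), 2).as("TotalAmount"))
    // Step 2: rank customers inside each month by descending total
    val byMonth = Window.partitionBy("Month").orderBy(col("TotalAmount").desc)
    // Step 3: keep the first n ranks of every month
    totals.withColumn("rn", row_number().over(byMonth))
      .filter(col("rn") <= n)
      .select("Month", "customerId", "TotalAmount")
      .orderBy(col("Month"), col("TotalAmount").desc)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("top-spenders-sketch")
      .master("local[*]") // assumption: local run
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample rows: (Month, customerId, transactionAmount)
    val sample = Seq(
      (1, 100L, 10.0), (1, 100L, 5.0), (1, 200L, 12.0), (1, 300L, 1.0),
      (2, 100L, 3.0)
    ).toDF("Month", "customerId", "transactionAmount")

    topSpenders(sample, 2).show()
    spark.stop()
  }
}
```

`row_number` assigns distinct ranks even when totals tie, so each month yields at most `n` rows; `rank` or `dense_rank` would keep ties instead.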