For this project, I worked with Apache Airflow to orchestrate a workflow of data operators whose inter-dependencies are represented as a DAG (Directed Acyclic Graph). The pipeline extracts data stored in JSON and CSV formats in S3, stages it into tables in Amazon Redshift, loads it into the fact and dimension tables of the data warehouse, and checks the quality of the data after each ETL cycle completes.
A music streaming company, Sparkify, has decided that it is time to introduce more automation and monitoring to their data warehouse ETL pipelines, and has concluded that the best tool to achieve this is Apache Airflow.
The task is to create high-grade data pipelines that are dynamic, built from reusable tasks, can be monitored, and allow easy backfills. They have also noted that data quality plays a big part when analyses are executed on top of the data warehouse, and they want to run tests against their datasets after the ETL steps have been executed to catch any discrepancies.
The source data resides in S3 and needs to be processed in Sparkify's data warehouse in Amazon Redshift. The source datasets consist of CSV logs describing user activity in the application and JSON metadata about the songs the users listen to.
I decided to create custom operators that subclass Apache Airflow's BaseOperator, each performing a specific step in the ETL process delineated below.
- Reading user activity log data stored in CSV format in S3 into a staging table in Redshift.
- Reading song metadata stored in JSON format in S3 into a staging table in Redshift.
For these tasks I created StageToRedshiftOperator, which takes as arguments the S3 location, the type of file to be read (JSON or CSV), and the name of the target table into which the raw data is to be staged. It issues a COPY command to Redshift, which supports reading JSON and CSV files directly from S3 into the designated table. The operator uses AwsHook to retrieve the AWS credentials configured as a connection in the Airflow admin interface, and a Redshift-compatible PostgresHook to execute the command.
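A minimal sketch of what such an operator could look like is shown below. The constructor argument names (redshift_conn_id, aws_credentials_id, s3_path, file_format, target_table) and the Airflow 1.10-era import paths are illustrative assumptions, not the project's exact code.

```python
from airflow.contrib.hooks.aws_hook import AwsHook
from airflow.hooks.postgres_hook import PostgresHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class StageToRedshiftOperator(BaseOperator):
    """Copies CSV or JSON files from S3 into a Redshift staging table."""

    copy_sql = """
        COPY {table}
        FROM '{s3_path}'
        ACCESS_KEY_ID '{access_key}'
        SECRET_ACCESS_KEY '{secret_key}'
        {format_clause}
    """

    @apply_defaults
    def __init__(self, redshift_conn_id="", aws_credentials_id="",
                 target_table="", s3_path="", file_format="JSON",
                 *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.redshift_conn_id = redshift_conn_id
        self.aws_credentials_id = aws_credentials_id
        self.target_table = target_table
        self.s3_path = s3_path
        self.file_format = file_format

    def execute(self, context):
        # Retrieve AWS credentials stored as an Airflow connection.
        credentials = AwsHook(self.aws_credentials_id).get_credentials()
        redshift = PostgresHook(postgres_conn_id=self.redshift_conn_id)

        # Pick the COPY format clause based on the source file type.
        if self.file_format == "JSON":
            format_clause = "FORMAT AS JSON 'auto'"
        else:
            format_clause = "CSV IGNOREHEADER 1"

        redshift.run(StageToRedshiftOperator.copy_sql.format(
            table=self.target_table,
            s3_path=self.s3_path,
            access_key=credentials.access_key,
            secret_key=credentials.secret_key,
            format_clause=format_clause,
        ))
```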
In our case, the songplays table is the fact table, as it stores the timestamps and other details of users listening to songs on the music streaming app. I created LoadFactOperator, which takes as arguments the SQL statement for inserting data and the target table name. For loading the songplays table, the SQL statement passed as an argument joins the songs and user activity log staging tables on matching song attributes such as title, length and artist, and selects the appropriate fields to be inserted into the songplays fact table. The operator executes the SQL statement to load the target fact table using a Redshift-compatible PostgresHook. Since fact tables are usually very large and are only appended with new data during scheduled ETL runs, this operator does not drop or empty (truncate) the fact table before inserting new data into it.
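A hedged sketch of such an append-only fact loader follows; the argument names (redshift_conn_id, target_table, insert_sql) are assumptions for illustration.

```python
from airflow.hooks.postgres_hook import PostgresHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class LoadFactOperator(BaseOperator):
    """Appends rows to a fact table using a provided SELECT statement."""

    @apply_defaults
    def __init__(self, redshift_conn_id="", target_table="", insert_sql="",
                 *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.redshift_conn_id = redshift_conn_id
        self.target_table = target_table
        self.insert_sql = insert_sql

    def execute(self, context):
        redshift = PostgresHook(postgres_conn_id=self.redshift_conn_id)
        # Append-only: no TRUNCATE, the fact table keeps accumulating rows.
        redshift.run(f"INSERT INTO {self.target_table} {self.insert_sql}")
```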
In this scenario, songs, users, artists and time are the dimension tables. To load data from the staging tables into them, I created LoadDimensionOperator. It takes as arguments the SQL statement that selects and inserts the data and the name of the target table to be loaded. As it is usual practice to empty a dimension table before inserting new data into it on every ETL cycle, this operator can also run a TRUNCATE operation on the target table before inserting data into it. This truncation behaviour is controlled through a should_truncate flag passed as an argument to the operator. After the optional truncation step, the operator executes the SQL statement passed in as an argument to insert data into the target dimension table using a Redshift-compatible PostgresHook.
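A sketch of the dimension loader with the optional truncation step is shown below; apart from should_truncate, which is described above, the argument names are illustrative assumptions.

```python
from airflow.hooks.postgres_hook import PostgresHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class LoadDimensionOperator(BaseOperator):
    """Loads a dimension table, optionally truncating it first."""

    @apply_defaults
    def __init__(self, redshift_conn_id="", target_table="", insert_sql="",
                 should_truncate=True, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.redshift_conn_id = redshift_conn_id
        self.target_table = target_table
        self.insert_sql = insert_sql
        self.should_truncate = should_truncate

    def execute(self, context):
        redshift = PostgresHook(postgres_conn_id=self.redshift_conn_id)
        if self.should_truncate:
            # Empty the dimension table before reloading it on this ETL cycle.
            redshift.run(f"TRUNCATE TABLE {self.target_table}")
        redshift.run(f"INSERT INTO {self.target_table} {self.insert_sql}")
```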
As the ETL process is supposed to run automatically at a regular interval, it is essential to test the quality of the data after all the ETL steps of a given cycle are completed. Data quality checks help instil the trust of data consumers in the data generated by the pipeline. They also notify the maintainers of the pipeline when the data generated by the ETL processing does not meet the quality expectations set by data consumers such as analytics or management teams. I created DataQualityOperator, which takes as an argument a list of pairs, each consisting of a SQL statement that returns a single row with a single column named result, and the expected value of that result. The operator runs each SQL statement as a query on Redshift using the compatible PostgresHook and compares the retrieved result value with the expected value. If, for any SQL statement, the result value does not match the expected value, an exception is raised, causing the DataQualityOperator to fail, which the pipeline maintainer can take note of and perform the necessary corrective actions.
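A minimal sketch of such a check operator, assuming the checks are passed as a list of (sql, expected_value) tuples:

```python
from airflow.hooks.postgres_hook import PostgresHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class DataQualityOperator(BaseOperator):
    """Runs SQL checks against Redshift and fails on unexpected results."""

    @apply_defaults
    def __init__(self, redshift_conn_id="", checks=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.redshift_conn_id = redshift_conn_id
        # checks: list of (sql, expected_value) pairs; each query is expected
        # to return a single row with a single column named "result".
        self.checks = checks or []

    def execute(self, context):
        redshift = PostgresHook(postgres_conn_id=self.redshift_conn_id)
        for sql, expected in self.checks:
            records = redshift.get_records(sql)
            if not records or not records[0]:
                raise ValueError(f"Data quality check returned no rows: {sql}")
            actual = records[0][0]
            if actual != expected:
                # Failing the task surfaces the problem to the maintainer.
                raise ValueError(
                    f"Data quality check failed: {sql} "
                    f"returned {actual}, expected {expected}"
                )
```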
After creating the operators, it is essential to organise them according to their inter-task dependencies. In Airflow, this is represented as a DAG (Directed Acyclic Graph), where the tasks are nodes and the directed edges express that one task must complete before another can start. In our case, the inter-dependencies of the tasks can be represented as a DAG and visualised in Airflow's web interface once it successfully reads the DAG we define in Python using the appropriate classes from the airflow package and the dependencies we set between the operators in our code, as sketched below. Two dummy operators are included in the DAG to indicate the beginning and the end of execution of the ETL.
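The sketch below illustrates how such a DAG could be wired up. To keep the snippet self-contained, DummyOperator placeholders stand in for the custom operators described above, and the DAG id, task ids, schedule and default arguments are illustrative assumptions.

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

default_args = {
    "owner": "sparkify",
    "start_date": datetime(2019, 1, 12),
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "depends_on_past": False,
}

dag = DAG(
    "sparkify_etl",
    default_args=default_args,
    description="Load and transform data in Redshift with Airflow",
    schedule_interval="@hourly",
)

# Placeholders: in the real pipeline these would be the custom operators.
start_operator = DummyOperator(task_id="Begin_execution", dag=dag)
stage_events = DummyOperator(task_id="Stage_events", dag=dag)       # StageToRedshiftOperator (CSV logs)
stage_songs = DummyOperator(task_id="Stage_songs", dag=dag)         # StageToRedshiftOperator (JSON metadata)
load_songplays = DummyOperator(task_id="Load_songplays_fact_table", dag=dag)   # LoadFactOperator
load_users = DummyOperator(task_id="Load_user_dim_table", dag=dag)  # LoadDimensionOperator
load_songs = DummyOperator(task_id="Load_song_dim_table", dag=dag)  # LoadDimensionOperator
load_artists = DummyOperator(task_id="Load_artist_dim_table", dag=dag)  # LoadDimensionOperator
load_time = DummyOperator(task_id="Load_time_dim_table", dag=dag)   # LoadDimensionOperator
quality_checks = DummyOperator(task_id="Run_data_quality_checks", dag=dag)     # DataQualityOperator
end_operator = DummyOperator(task_id="Stop_execution", dag=dag)

# Staging tasks can run in parallel; the fact load waits for both, the
# dimension loads wait for the fact load, and the quality checks run last.
start_operator >> [stage_events, stage_songs]
[stage_events, stage_songs] >> load_songplays
load_songplays >> [load_users, load_songs, load_artists, load_time]
[load_users, load_songs, load_artists, load_time] >> quality_checks
quality_checks >> end_operator
```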
Visualization of the DAG consisting of the operations performed in the data pipeline
An Amazon Redshift cluster needs to be created with the desired configuration. The Airflow webserver can be started by executing the following command on a Linux system.
/opt/airflow/start.sh
The Airflow admin interface can be accessed from a browser at port 8080 of the host system. In the interface, a new connection to the Redshift cluster with the appropriate values for its attributes can be added from the Admin > Connections menu on the top navigation bar.
The DAG written in Python becomes visible in Airflow's DAGs section. It can be turned on with the switch located to its left, and it then starts executing according to the start time, schedule interval and end time set during its instantiation in the Python code.
The status of all the DAGs can be seen in the Airflow interface. An individual DAG can be visualised in the Graph view, where the dependencies between the operators, as set in the Python code, become visible.
The status and logs of each operator's past executions can be accessed from the Tree view of the DAG. If an operator shows a yellow or red square in the Tree view, it can be debugged by going through the log file generated by that operator during the specific execution to understand the cause of the error and make the necessary corrections. Upon successful execution, the square representing the operator turns dark green, and the DAG can be considered to be running successfully when all of the squares in a column, which represents one execution cycle of the workflow, have turned dark green.