diff --git a/README.md b/README.md
index f06aa7c..e64877c 100644
--- a/README.md
+++ b/README.md
@@ -1,148 +1,69 @@
-Example Production ETL System - TPC-H
-=====================================
+# Easy Scalable Production ETL
 
-This is an example system that runs regular jobs on a schedule, at scale, on
-the cloud using a variety of technologies:
+**This repository is a lightweight scalable example pipeline that runs large Python jobs on a schedule in the cloud. We hope this example is easy to copy and modify for your own needs.**
 
-- **Prefect:** for job scheduling and workflow management
-- **Coiled::** for cloud hardware provisioning
-- **Dask:** for parallel processing
-- **Parquet** and **Deltalake:** for data storage
-- **XGBoost:** for model training
-- **Streamlit** and **Fast API:** for dashboarding and model hosting
+## Background
 
-Raw data flows in every minute and is transformed through several stages at
-different scales and cadences.
+It’s common to run regular large-scale Python jobs on the cloud as part of production data pipelines. Modern workflow orchestration systems like Prefect, Dagster, Airflow, Argo, etc. all work well for running jobs on a regular cadence, but we often see groups struggle with complexity around cloud infrastructure and lack of scalability.
 
-- **Data Generation:** (*every 30 seconds*) new JSON files appear
-- **Data Preprocessing:** (every minute) JSON gets transformed into Parquet / Delta
-- **Data Compaction:** (every 30 minutes) small Parquet files get repartitioned into larger ones
-- **Business Queries:** (every hour) large scale multi-table analysisqueries run that feed dashboards
-- **Model Training:** (every six hours) with XGBoost
+This repository contains a scalable data pipeline that runs regular jobs on the cloud with [Coiled](https://docs.coiled.io/user_guide/index.html) and Prefect. This approach is:
 
-Additionally we keep Streamlit and FastAPI servers up and running.
-
-It looks kinda like this:
+- **Easy to deploy** on the cloud
+- **Scalable** across many cloud machines
+- **Cheap to run** on ephemeral VMs and a small always-on VM
 
 ![ETL Pipeline](images/excalidraw.png)
 
-How this works
---------------
-
-### Concurrent Flows
-
-The file [workflow.py](workflow.py) defines several Prefect flows operating at
-different cadences:
-
-```python
-# workflow.py
-...
-
-generate = generate_json.to_deployment(
-    name="generate_data",
-    interval=timedelta(seconds=30),
-)
-preprocess = json_to_parquet.to_deployment(
-    name="preprocess",
-    interval=timedelta(minutes=1),
-)
-train = update_model.to_deployment(
-    name="train",
-    interval=timedelta(hours=6),
-)
-
-prefect.serve(
-    generate,
-    preprocess,
-    train,
-    ...
-)
-```
-
-### Coiled Functions + Prefect Tasks
+## How to run
 
-These flows are defined in files like [pipeline/preprocess.py](pipeline/preprocess.py), which combine prefect tasks either with Coiled functions for remote execution like the following:
+_You can run this pipeline yourself, either locally or on the cloud._
 
-```python
-# pipeline/preprocess.py
-
-import coiled
-from prefect import task, flow
-import pandas as pd
+Make sure you have a [Prefect cloud](https://www.prefect.io/cloud) account and have authenticated your local machine.
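+For example, one way to authenticate is with the Prefect CLI (shown as a
+sketch; see the Prefect Cloud docs for the options that fit your setup):
+
+```bash
+# Log in to Prefect Cloud from this machine; prompts for an API key
+# or opens a browser window to complete the login.
+prefect cloud login
+```
+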
-@task
-@coiled.function(region="us-east-2", memory="64 GiB")
-def json_to_parquet(filename):
-    df = pd.read_json(filename)
-    df.to_parquet(OUTFILE / filename.split(".")[-2] + ".parquet")
+Clone this repository and install dependencies:
 
-@flow
-def convert_all_files():
-    files = list_files()
-    json_to_parquet.map(files)
+```bash
+git clone https://github.com/coiled/etl-tpch
+cd etl-tpch
+mamba env create -f environment.yml
+mamba activate etl-tpch
 ```
 
-### Dask Clusters for larger jobs
-
-Or in files like [pipeline/reduce.py](pipeline/reduce.py) which create larger
-clusters on-demand.
-
-```python
-# pipeline/reduce.py
+### Local
 
-import coiled
-from prefect import task, flow
-import dask.dataframe as dd
+In your terminal run:
 
-@task
-def query_for_bad_accounts(bucket):
-    with coiled.Cluster(n_workers=20, region="us-east-2") as cluster:
-        with cluster.get_client() as client:
-            df = dd.read_parquet(bucket)
-
-            ... # complex query
-
-            result.to_parquet(...)
+```bash
+python workflow.py # Run data pipeline locally
 ```
 
-Data Generation
----------------
+### Cloud
 
-In the background we're generating the data ourselves. This data is taken from
-the TPC-H benchmark suite. It's a bunch of tables that simulate orders /
-customers / suppliers, and has a schema that looks like this:
+If you haven't already, create a Coiled account and follow the setup
+guide at [coiled.io/start](https://coiled.io/start).
 
-![TPC-H Schema](https://docs.snowflake.com/en/_images/sample-data-tpch-schema.png)
+Next, adjust the [`pipeline/config.yml`](pipeline/config.yml)
+configuration file by setting `local: false` and `data-dir` to an
+S3 bucket where you would like data assets to be stored.
 
-This gives the system a sense of realism, while still being able to tune up or
-down in scale and run easily as an example.
-
-How to Run Locally
-------------------
-
-Make sure you have a [Prefect cloud](https://www.prefect.io/cloud) account and have authenticated your local machine.
-
-Create a software environment:
+Set `AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY` and `AWS_REGION`
+environment variables that enable access to your S3 bucket and
+specify the region the bucket is in, respectively.
 
 ```bash
-mamba env create -f environment.yml
+export AWS_ACCESS_KEY_ID=...
+export AWS_SECRET_ACCESS_KEY=...
+export AWS_REGION=...
 ```
 
-Then run each command below in separate terminal windows:
+Finally, in your terminal run the command below. It starts the workflow
+on a small, always-on `t3.medium` VM in the same region as your data,
+includes the dashboard and pipeline files, and passes your S3
+credentials through to the deployment:
 
 ```bash
-python serve_model.py # Serve ML model
-```
-```bash
-python workflow.py # Run ETL pipeline
-```
-```bash
-streamlit run dashboard.py # Serve dashboard
+coiled prefect serve \
+    --vm-type t3.medium \
+    --region $AWS_REGION \
+    -f dashboard.py -f pipeline \
+    -e AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID \
+    -e AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY \
+    workflow.py
 ```
-
-Watch things run, both in your terminals and on Prefect cloud, and then ctrl-C to shut everything down.
-
-How to Run in the Cloud
------------------------
-
-This all works, we just haven't documented it well yet.
diff --git a/images/excalidraw.png b/images/excalidraw.png
index 87caab3..f0cfac7 100644
Binary files a/images/excalidraw.png and b/images/excalidraw.png differ
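For reference, the `pipeline/config.yml` edit described in the new Cloud section might look roughly like the sketch below. The exact file layout is an assumption here — the real config may contain additional keys — and the bucket path is a placeholder.

```yaml
# Hypothetical excerpt of pipeline/config.yml for a cloud run:
# `local: false` turns off local mode and `data-dir` points at the
# S3 bucket where data assets should be stored.
local: false
data-dir: s3://your-bucket/etl-tpch
```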