Update readme #26

Merged · 7 commits · Apr 5, 2024
README.md: 161 changed lines (41 additions, 120 deletions)

# Easy Scalable Production ETL

**This repository is a lightweight, scalable example pipeline that runs large Python jobs on a schedule in the cloud. We hope this example is easy to copy and modify for your own needs.**

## Background

It's common to run regular large-scale Python jobs on the cloud as part of production data pipelines. Modern workflow orchestration systems such as Prefect, Dagster, Airflow, and Argo all work well for running jobs on a regular cadence, but we often see groups struggle with complexity around cloud infrastructure and a lack of scalability.

This repository contains a scalable data pipeline that runs regular jobs on the cloud with [Coiled](https://docs.coiled.io/user_guide/index.html) and Prefect. This approach is:

- **Easy to deploy** on the cloud
- **Scalable** across many cloud machines
- **Cheap to run** on ephemeral VMs and a small always-on VM

![ETL Pipeline](images/excalidraw.png)
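
Under the hood, the pipeline combines Prefect tasks and flows with Coiled for cloud execution. Below is a minimal sketch in the spirit of [pipeline/preprocess.py](pipeline/preprocess.py); it may differ from the current file, and `OUTDIR` and `list_files` are illustrative stand-ins:

```python
# Sketch of pipeline/preprocess.py (illustrative, not verbatim)
from pathlib import Path

import coiled
import pandas as pd
from prefect import flow, task

OUTDIR = Path("data/processed")  # stand-in for the real output location


@task
@coiled.function(region="us-east-2", memory="64 GiB")
def json_to_parquet(filename: str) -> None:
    # Runs remotely on an ephemeral cloud VM provisioned by Coiled
    df = pd.read_json(filename)
    df.to_parquet(OUTDIR / (Path(filename).stem + ".parquet"))


@flow
def convert_all_files():
    files = list_files()  # hypothetical helper listing new JSON files
    json_to_parquet.map(files)  # fan out, one task per file
```

Larger jobs create a temporary Dask cluster on demand, as sketched here after [pipeline/reduce.py](pipeline/reduce.py):

```python
# Sketch of pipeline/reduce.py (illustrative; the query itself is elided)
import coiled
import dask.dataframe as dd
from prefect import task


@task
def query_for_bad_accounts(bucket: str) -> None:
    # A 20-worker cluster appears for this job and is cleaned up afterwards
    with coiled.Cluster(n_workers=20, region="us-east-2") as cluster:
        with cluster.get_client():
            df = dd.read_parquet(bucket)
            ...  # complex multi-table query producing `result`
            # result.to_parquet(...)
```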

## How to run

_You can run this pipeline yourself, either locally or on the cloud._

Make sure you have a [Prefect Cloud](https://www.prefect.io/cloud) account and have authenticated your local machine.
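
If your machine isn't authenticated yet, the Prefect CLI can handle this (it opens a browser or prompts for an API key):

```bash
prefect cloud login
```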

Clone this repository and install dependencies:

```bash
git clone https://github.com/coiled/etl-tpch
cd etl-tpch
mamba env create -f environment.yml
mamba activate etl-tpch
```

### Local

In your terminal run:

```bash
python workflow.py # Run data pipeline locally
```
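
Running `workflow.py` serves each flow as a scheduled Prefect deployment. A rough sketch of its structure; the flow names and intervals here are illustrative:

```python
# Sketch of workflow.py (illustrative names and intervals)
from datetime import timedelta

from prefect import serve

# generate_json, json_to_parquet, and update_model are flows
# defined in this repository's pipeline modules
generate = generate_json.to_deployment(
    name="generate_data",
    interval=timedelta(seconds=30),  # new raw JSON appears frequently
)
preprocess = json_to_parquet.to_deployment(
    name="preprocess",
    interval=timedelta(minutes=1),   # convert JSON to Parquet
)
train = update_model.to_deployment(
    name="train",
    interval=timedelta(hours=6),     # periodic model retraining
)

serve(generate, preprocess, train)
```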

### Cloud

If you haven't already, create a Coiled account and follow the setup guide at [coiled.io/start](https://coiled.io/start).
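
Authenticating the Coiled client is typically a single command (see the setup guide for details):

```bash
coiled login
```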

Next, adjust the [`pipeline/config.yml`](pipeline/config.yml) configuration file by setting `local: false` and `data-dir` to an S3 bucket where you would like data assets to be stored.
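
For example (the bucket name below is a placeholder; use your own):

```yaml
# pipeline/config.yml
local: false                         # run on cloud VMs instead of locally
data-dir: s3://your-bucket/etl-data  # placeholder S3 location for data assets
```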

Set the `AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY` and `AWS_REGION` environment variables, which enable access to your S3 bucket and specify the region the bucket is in, respectively.

```bash
export AWS_ACCESS_KEY_ID=...
export AWS_SECRET_ACCESS_KEY=...
export AWS_REGION=...
```

Finally, in your terminal run:

```bash
# --vm-type: small, always-on VM
# --region:  same region as your data
# -f:        include the dashboard and pipeline files
# -e:        forward credentials for S3 bucket access
coiled prefect serve \
    --vm-type t3.medium \
    --region $AWS_REGION \
    -f dashboard.py -f pipeline \
    -e AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID \
    -e AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY \
    workflow.py
```

Watch things run, both in your terminal and on Prefect Cloud, and then Ctrl-C to shut everything down.
