feat: add lambda-ddb-mysql-etl-pipeline python service (aws-samples#134)
* Contributing new Python CDK example project
* feat: Add context vars & fix: README update
* feat: added layers, context vars, fixed mysql exmpl & gen refactor
Showing 12 changed files with 703 additions and 0 deletions.
python/lambda-ddb-mysql-etl-pipeline/README.md
@@ -0,0 +1,123 @@
# lambda-ddb-mysql-etl-pipeline
<!--BEGIN STABILITY BANNER-->
---

![Stability: Stable](https://img.shields.io/badge/stability-Stable-success.svg?style=for-the-badge)

> **This is a stable example. It should successfully build out of the box.**
>
> This example is built on Construct Libraries marked "Stable" and does not have any infrastructure prerequisites to build.
---
<!--END STABILITY BANNER-->

This is a CDK Python ETL pipeline example that produces the AWS resources necessary to achieve the following:
1) Dynamically deploy CDK apps to different environments.
2) Make an API request to a NASA asteroid API.
3) Process and write the response content to both .csv and .json files.
4) Upload the files to S3.
5) Trigger an S3 event for object retrieval after the object is put to S3.
6) Process the retrieved object, then dynamically write to either DynamoDB or a MySQL instance.

*The `__doc__` strings are (overly) verbose. Please read them carefully, as exceptions
and considerations have been included to provide a more comprehensive example.
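
For orientation, here is a minimal, hypothetical sketch of what step 2 could look like. It assumes the NASA NeoWs `feed` endpoint and the `NASA_KEY` environment variable that the stack injects; the real handler lives in `lambda/asteroids.py`, and its `__doc__` strings are the authoritative reference.

```
# Hypothetical sketch only -- see lambda/asteroids.py for the project's actual handler.
import os

import requests  # provided to the Lambda via the requests layer


def fetch_asteroids(start_date: str, end_date: str) -> dict:
    """Request near-earth-object data for a date range and return the parsed JSON."""
    params = {
        "start_date": start_date,
        "end_date": end_date,
        "api_key": os.environ["NASA_KEY"],  # injected by the stack's Lambda environment
    }
    resp = requests.get("https://api.nasa.gov/neo/rest/v1/feed", params=params, timeout=30)
    resp.raise_for_status()
    return resp.json()
```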

**Please don't forget to read the 'Important Notes' section at the bottom of this README.
I've also included links to useful documentation there.

## Project Directory Rundown
`README.md` — The introductory README for this project.

`etl_pipeline_cdk` — A Python module directory containing the core stack code.

`etl_pipeline_cdk_stack.py` — A custom CDK stack construct that is the core of the CDK application.
It is where we bring the core stack components together before synthesizing our CloudFormation template.

`requirements.txt` — Pip uses this file to install all of the dependencies for this CDK app.
In this case, it contains only '-e', which tells pip to install the requirements
specified in `setup.py`, where all requirements are listed.
It also tells pip to run `python setup.py develop` to install the code in the `etl_pipeline_cdk` module so that it can be edited in place.

`setup.py` — Defines how this Python package is constructed and what its dependencies are.

`lambda` — Contains all Lambda handler code in the example. See the `__doc__` strings for specifics.

`layers` — Contains the requests layer archive created for this project.

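To make the relationship between `requirements.txt` and `setup.py` concrete, here is a hypothetical sketch of what they might contain; the dependency names below are assumptions for illustration, and this project's own `setup.py` is the source of truth.

```
# requirements.txt (illustrative) holds a single editable-install directive, e.g.:
#     -e .
#
# setup.py -- hypothetical sketch of an equivalent package definition:
import setuptools

setuptools.setup(
    name="etl_pipeline_cdk",
    version="0.0.1",
    packages=setuptools.find_packages(),
    install_requires=[  # assumed CDK v1 dependency list, for illustration only
        "aws-cdk.core",
        "aws-cdk.aws-s3",
        "aws-cdk.aws-s3-notifications",
        "aws-cdk.aws-lambda",
        "aws-cdk.aws-dynamodb",
        "aws-cdk.aws-events",
        "aws-cdk.aws-events-targets",
    ],
)
```
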
## Pre-requisites
#### Keys, Copy & Paste
1) Submit a request for a NASA API key here (it comes quickly!): https://api.nasa.gov/
2) Navigate to the `etl_pipeline_cdk_stack.py` file and replace the text `<nasa_key_here>`
with the NASA key that was emailed to you.**
3) Navigate to the `app.py` file and replace the text `<acct_id>` with your AWS account id
and `<region_id>` with the region you plan to work in, e.g. `us-west-2` for Oregon and `us-east-1` for N. Virginia.
4) Via the macOS CLI, run this command to set the `preprod` env variable: `export AWS_CDK_ENV=preprod`

**Yes, this is not best practice. We should be using Secrets Manager to store these keys.
I have included the required code to extract those, along with some commented notes on how this is achieved.
I just haven't had the time to "plug them in" at the moment--plus it makes this a bit easier to follow.

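As a reference for the Secrets Manager approach mentioned above, here is a minimal, hedged sketch. It assumes the secret id is supplied through the `DB_SECRETS` environment variable (populated from the `DB_SECRETS_REF` context value in the stack) and that the secret's `SecretString` is a JSON document; the commented code in the sample remains the authoritative version.

```
# Minimal sketch, assuming DB_SECRETS holds the secret id and REGION the AWS region.
import json
import os

import boto3


def get_db_credentials() -> dict:
    """Fetch and parse the database credentials stored in AWS Secrets Manager."""
    client = boto3.client("secretsmanager", region_name=os.environ["REGION"])
    response = client.get_secret_value(SecretId=os.environ["DB_SECRETS"])
    return json.loads(response["SecretString"])
```
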
## AWS Instructions for env setup
This project is set up like a standard Python project. The initialization
process also creates a virtualenv within this project, stored under the .env
directory. To create the virtualenv it assumes that there is a `python3`
(or `python` for Windows) executable in your path with access to the `venv`
package. If for any reason the automatic creation of the virtualenv fails,
you can create the virtualenv manually.

To manually create a virtualenv on MacOS and Linux:

```
$ python3 -m venv .env
```

After the init process completes and the virtualenv is created, you can use the following
step to activate your virtualenv.

```
$ source .env/bin/activate
```

If you are on a Windows platform, you would activate the virtualenv like this:

```
% .env\Scripts\activate.bat
```

Once the virtualenv is activated, you can install the required dependencies.
**I've listed all required dependencies in `setup.py`, hence the `-e`.

```
$ pip install -r requirements.txt
```

At this point you can synthesize the CloudFormation template for this code.

```
$ cdk synth
```

To add additional dependencies, for example other CDK libraries, just add
them to your `setup.py` file and rerun the `pip install -r requirements.txt`
command.

## Useful commands

* `cdk ls`      list all stacks in the app
* `cdk synth`   emits the synthesized CloudFormation template
* `cdk deploy`  deploy this stack to your default AWS account/region
* `cdk diff`    compare deployed stack with current state
* `cdk docs`    open CDK documentation

## Important Notes
Destroying Resources:

After you are finished with this app, you can run `cdk destroy` to quickly remove the majority
of the stack's resources. However, some resources will NOT automatically be destroyed and require
some manual intervention. Here is a list of what you must do:
1) S3 bucket: You must first delete all files in the bucket. Changes to the current policy, which forbids
bucket deletion if files are present, are in development and can be tracked here: https://github.com/aws/aws-cdk/issues/3297
2) CloudWatch Log Groups for Lambda logging. Found with the filter: `/aws/lambda/Etl`
3) The S3 CDK folder with your CloudFormation templates. Delete at your discretion.
4) Your bootstrap stack's asset S3 folder will have some assets in it. Delete/save at your discretion.
**Don't delete the bootstrap stack, or the S3 asset bucket, if you plan to continue using CDK.
5) Both Lambdas are set to run at `logging.DEBUG`; switch this if it is too verbose. See CloudWatch for the logs.
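
If you prefer to script step 1, a small boto3 sketch for emptying the bucket before `cdk destroy` might look like the following; the bucket name shown assumes the `preprod` stage and follows the `asteroids-{stage}` naming used in the stack.

```
# Hedged sketch: empty the example bucket so `cdk destroy` can remove it.
import boto3

bucket_name = "asteroids-preprod"  # assumes STAGE=preprod; adjust to your stage
s3 = boto3.resource("s3")
s3.Bucket(bucket_name).objects.all().delete()  # the bucket is unversioned in this stack
```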
python/lambda-ddb-mysql-etl-pipeline/app.py
@@ -0,0 +1,18 @@
#!/usr/bin/env python3

from aws_cdk import core
from etl_pipeline_cdk.etl_pipeline_cdk_stack import EtlPipelineCdkStack

app = core.App()
STAGE = app.node.try_get_context("STAGE")
ENV = {
    "region": app.node.try_get_context("REGION"),
    "account": app.node.try_get_context("ACCTID")
}
stack_name = app.node.try_get_context("stack_name")

EtlPipelineCdkStack(app,
                    stack_name,
                    env=ENV,
                    stage=STAGE)
app.synth()
python/lambda-ddb-mysql-etl-pipeline/cdk.json
@@ -0,0 +1,13 @@
{
  "context": {
    "STAGE": "preprod",
    "REGION": "us-east-1",
    "ACCTID": "<INSERTACCTID>",
    "NASA_KEY": "<INSERTNASAKEY>",
    "SCHEMA": "DB_NAME_PREPROD",
    "DB_SECRETS_REF": "<INSERTSECRETS_MGR_REF>",
    "TOPIC_ARN": "<INSERT_TOPIC_ARN>",
    "stack_name": "EtlPipelineCdkStackPreProd"
  },
  "app": "python3 app.py"
}
Empty file.
python/lambda-ddb-mysql-etl-pipeline/etl_pipeline_cdk/etl_pipeline_cdk_stack.py
142 changes: 142 additions & 0 deletions
@@ -0,0 +1,142 @@
from aws_cdk import (
    core,
    aws_s3 as s3,
    aws_s3_notifications as s3n,
    aws_lambda as _lambda,
    aws_dynamodb as ddb,
    aws_events as events,
    aws_events_targets as targets,
)


class EtlPipelineCdkStack(core.Stack):
    """Define the custom CDK stack construct class that inherits from the cdk.Construct base class.

    Notes:
        This is the meat of our stack that will be built in app.py of the CDK application.
        Lambda inline code ex. => You can use the following in lieu of AssetCode() for inline code:
            with open("lambda/dbwrite.py", encoding="utf8") as fp:
                handler_code = fp.read()
            ...code=_lambda.InlineCode(handler_code)...
        *Please consider char limits for inline code write.
    """

    def __init__(self, scope: core.Construct, id: str, **kwargs) -> None:
        """Invoke the base class constructor via super with the received scope, id, and props.

        Args:
            scope: Defines scope in which this custom construct stack is created.
            id (str): Defines local identity of the construct. Must be unique amongst constructs
                within the same scope, as it's used to formulate the CloudFormation logical id for
                each resource defined in this scope.
            kwargs: Lots of possibilities.
        """
        # Example of passing app.py-level params to the stack class: 'stage' is consumed
        # here and the remaining kwargs are cleared before calling the base constructor.
        self.stage = kwargs['stage']
        kwargs = {}

        super().__init__(scope, id, **kwargs)

        # Resources to create
        s3_bucket = s3.Bucket(
            self, "Bucket",
            bucket_name=f"asteroids-{self.stage}",
            versioned=False,
            removal_policy=core.RemovalPolicy.DESTROY  # NOT recommended for production code
        )

        ddb_asteroids_table = ddb.Table(
            self, "Table",
            table_name="asteroids_table",
            partition_key={
                "name": "id",
                "type": ddb.AttributeType.STRING
            },
            removal_policy=core.RemovalPolicy.DESTROY  # NOT recommended for production code
        )

        # Lambdas and layers
        requests_layer = _lambda.LayerVersion(
            self, "requests",
            code=_lambda.AssetCode('layers/requests.zip'))
        pandas_layer = _lambda.LayerVersion(
            self, "pandas",
            code=_lambda.AssetCode('layers/pandas.zip'))
        pymysql_layer = _lambda.LayerVersion(
            self, "pymysql",
            code=_lambda.AssetCode('layers/pymysql.zip'))

        process_asteroid_data = _lambda.Function(
            self, "ProcessAsteroidsLambda",
            runtime=_lambda.Runtime.PYTHON_3_7,
            code=_lambda.AssetCode("lambda"),
            handler="asteroids.handler",
            layers=[requests_layer],
            environment={
                "S3_BUCKET": s3_bucket.bucket_name,
                "NASA_KEY": self.node.try_get_context("NASA_KEY"),
            }
        )

        db_write = _lambda.Function(
            self, "DbWriteLambda",
            runtime=_lambda.Runtime.PYTHON_3_7,
            handler="dbwrite.handler",
            layers=[pandas_layer, pymysql_layer],
            code=_lambda.Code.asset('lambda'),
            environment={
                "ASTEROIDS_TABLE": ddb_asteroids_table.table_name,
                "S3_BUCKET": s3_bucket.bucket_name,
                "SCHEMA": self.node.try_get_context("SCHEMA"),
                "REGION": self.node.try_get_context("REGION"),
                "DB_SECRETS": self.node.try_get_context("DB_SECRETS_REF"),
                "TOPIC_ARN": self.node.try_get_context("TOPIC_ARN")
            }
        )

        # Rules and Events
        json_rule = events.Rule(
            self, "JSONRule",
            schedule=events.Schedule.cron(
                minute="15",
                hour="*",
                month="*",
                week_day="*",
                year="*"
            )
        )

        csv_rule = events.Rule(
            self, "CSVRule",
            schedule=events.Schedule.cron(
                minute="30",
                hour="*",
                month="*",
                week_day="*",
                year="*"
            )
        )

        # Add the Lambda function target, as well as custom trigger input, to the rules
        json_rule.add_target(
            targets.LambdaFunction(
                process_asteroid_data,
                event=events.RuleTargetInput.from_text("json")
            )
        )
        csv_rule.add_target(
            targets.LambdaFunction(
                process_asteroid_data,
                event=events.RuleTargetInput.from_text("csv")
            )
        )
        # Create the s3 notification for the db_write function
        notify_lambda = s3n.LambdaDestination(db_write)
        # Assign the 'notify_lambda' notification for the 'OBJECT_CREATED' event type
        s3_bucket.add_event_notification(s3.EventType.OBJECT_CREATED, notify_lambda)

        # Permissions
        s3_bucket.grant_read_write(process_asteroid_data)
        s3_bucket.grant_read_write(db_write)
        ddb_asteroids_table.grant_read_write_data(db_write)
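
For context on how the `db_write` function receives the S3 notification configured above, here is a hypothetical sketch of the handler's entry point; the real implementation lives in `lambda/dbwrite.py` and is not shown in this excerpt.

```
# Hypothetical sketch only -- see lambda/dbwrite.py for the project's actual handler.
import json


def handler(event, context):
    """Read the bucket/key from the OBJECT_CREATED notification records."""
    for record in event.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        key = record["s3"]["object"]["key"]
        # The real handler fetches the object and writes to DynamoDB or MySQL;
        # here we only log what was received.
        print(json.dumps({"bucket": bucket, "key": key}))
```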