diff --git a/python/lambda-ddb-mysql-etl-pipeline/README.md b/python/lambda-ddb-mysql-etl-pipeline/README.md
new file mode 100644
index 000000000..b77b26d9f
--- /dev/null
+++ b/python/lambda-ddb-mysql-etl-pipeline/README.md
@@ -0,0 +1,123 @@
+# lambda-ddb-mysql-etl-pipeline
+
+---
+
+![Stability: Stable](https://img.shields.io/badge/stability-Stable-success.svg?style=for-the-badge)
+
+> **This is a stable example. It should successfully build out of the box.**
+>
+> This example is built on Construct Libraries marked "Stable" and does not have any infrastructure prerequisites to build.
+
+---
+
+This is a CDK Python ETL pipeline example that produces the AWS resources necessary to achieve the following:
+1) Dynamically deploy CDK apps to different environments.
+2) Make an API request to a NASA asteroid API.
+3) Process and write the response content to both .csv and .json files.
+4) Upload the files to S3.
+5) Trigger an S3 event for object retrieval once the object has been put to S3.
+6) Process the retrieved object, then dynamically write to either DynamoDB or a MySQL instance.
+
+*The `__doc__` strings are intentionally verbose. Please read them carefully, as exceptions
+and considerations have been included to provide a more comprehensive example.
+
+**Please don't forget to read the 'Important Notes' section at the bottom of this README.
+I've also included links to useful documentation there.
+
+## Project Directory Rundown
+`README.md` — The introductory README for this project.
+
+`etl_pipeline_cdk` — A Python module directory containing the core stack code.
+
+`etl_pipeline_cdk_stack.py` — A custom CDK stack construct that is the core of the CDK application.
+It is where we bring the core stack components together before synthesizing our CloudFormation template.
+
+`requirements.txt` — Pip uses this file to install all of the dependencies for this CDK app.
+In this case, it contains only '-e', which tells pip to install the requirements
+specified in `setup.py`, where all requirements are listed.
+It also tells pip to run `python setup.py develop` to install the code in the `etl_pipeline_cdk` module so that it can be edited in place.
+
+`setup.py` — Defines how this Python package is constructed and what its dependencies are.
+
+`lambda` — Contains all Lambda handler code in the example. See the `__doc__` strings for specifics.
+
+`layers` — Contains the Lambda layer archives (requests, pandas, pymysql) created for this project.
+
+## Pre-requisites
+#### Keys, Copy & Paste
+1) Submit a request for a NASA API key here (it comes quick!): https://api.nasa.gov/
+2) Open `cdk.json` and set the empty `NASA_KEY` context value to the NASA key that was emailed to you.**
+3) Still in `cdk.json`, set `ACCTID` to your AWS account id and `REGION` to the region you plan to work in--e.g. `us-west-2` for Oregon or `us-east-1` for N. Virginia.
+4) The deployment stage and stack name are also context values in `cdk.json` (`STAGE`, `stack_name`); the defaults synthesize a `preprod` stack named `EtlPipelineCdkStackPreProd`.
+
+**Yes, this is not best practice. We should be using Secrets Manager to store these keys.
+I have included the code required to extract them, along with some commented notes on how this is achieved.
+I just haven't had the time to "plug them in" yet--plus it makes this example a bit easier to follow.
+
+## AWS Instructions for env setup
+This project is set up like a standard Python project. The initialization
+process also creates a virtualenv within this project, stored under the .env
+directory.
+To create the virtualenv, it assumes that there is a `python3`
+(or `python` for Windows) executable in your path with access to the `venv`
+package. If for any reason the automatic creation of the virtualenv fails,
+you can create the virtualenv manually.
+
+To manually create a virtualenv on MacOS and Linux:
+
+```
+$ python3 -m venv .env
+```
+
+After the init process completes and the virtualenv is created, you can use the following
+step to activate your virtualenv.
+
+```
+$ source .env/bin/activate
+```
+
+If you are on a Windows platform, you activate the virtualenv like this:
+
+```
+% .env\Scripts\activate.bat
+```
+
+Once the virtualenv is activated, you can install the required dependencies.
+**I've listed all required dependencies in `setup.py`, hence the `-e` below.
+
+```
+$ pip install -r requirements.txt
+```
+
+At this point you can now synthesize the CloudFormation template for this code.
+
+```
+$ cdk synth
+```
+
+To add additional dependencies, for example other CDK libraries, just add
+them to your `setup.py` file and rerun the `pip install -r requirements.txt`
+command.
+
+# Useful commands
+
+ * `cdk ls`          list all stacks in the app
+ * `cdk synth`       emits the synthesized CloudFormation template
+ * `cdk deploy`      deploy this stack to your default AWS account/region
+ * `cdk diff`        compare deployed stack with current state
+ * `cdk docs`        open CDK documentation
+
+# Important Notes:
+Destroying Resources:
+
+After you are finished with this app, you can run `cdk destroy` to quickly remove the majority
+of the stack's resources. However, some resources will NOT automatically be destroyed and require
+manual intervention. Here is a list of what you must do:
+1) S3 bucket: You must first delete all files in the bucket (a sample command is shown after this list). Changes to the current policy, which forbids bucket deletion if files are present, are in development and can be tracked here: https://github.com/aws/aws-cdk/issues/3297
+2) CloudWatch Log Groups for Lambda logging: found with the filter `/aws/lambda/Etl`.
+3) The s3 CDK folder with your CloudFormation templates. Delete at your discretion.
+4) Your bootstrap stack asset s3 folder will have some assets in it. Delete/save at your discretion.
+**Don't delete the bootstrap stack, nor the s3 asset bucket, if you plan to continue using CDK.
+5) Both Lambdas are set to log at `logging.DEBUG`; switch this if it is too verbose. See CloudWatch Logs for output.
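+
+For example, to empty the bucket before running `cdk destroy`, something like the following should work
+(a minimal sketch: the bucket name follows the stack's `asteroids-<stage>` pattern, so `asteroids-preprod`
+for the default `preprod` stage--adjust to your stage):
+
+```
+$ aws s3 rm s3://asteroids-preprod --recursive
+```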
diff --git a/python/lambda-ddb-mysql-etl-pipeline/app.py b/python/lambda-ddb-mysql-etl-pipeline/app.py
new file mode 100644
index 000000000..87eee6a96
--- /dev/null
+++ b/python/lambda-ddb-mysql-etl-pipeline/app.py
@@ -0,0 +1,18 @@
+#!/usr/bin/env python3
+
+from aws_cdk import core
+from etl_pipeline_cdk.etl_pipeline_cdk_stack import EtlPipelineCdkStack
+
+app = core.App()
+STAGE = app.node.try_get_context("STAGE")
+ENV={
+    "region": app.node.try_get_context("REGION"),
+    "account": app.node.try_get_context("ACCTID")
+}
+stack_name = app.node.try_get_context("stack_name")
+
+EtlPipelineCdkStack(app,
+                    stack_name,
+                    env=ENV,
+                    stage=STAGE)
+app.synth()
diff --git a/python/lambda-ddb-mysql-etl-pipeline/cdk.json b/python/lambda-ddb-mysql-etl-pipeline/cdk.json
new file mode 100644
index 000000000..101c952b7
--- /dev/null
+++ b/python/lambda-ddb-mysql-etl-pipeline/cdk.json
@@ -0,0 +1,13 @@
+{
+  "context": {
+    "STAGE":"preprod",
+    "REGION":"us-east-1",
+    "ACCTID":"",
+    "NASA_KEY":"",
+    "SCHEMA":"DB_NAME_PREPROD",
+    "DB_SECRETS_REF":"",
+    "TOPIC_ARN":"",
+    "stack_name":"EtlPipelineCdkStackPreProd"
+  },
+  "app": "python3 app.py"
+}
diff --git a/python/lambda-ddb-mysql-etl-pipeline/etl_pipeline_cdk/__init__.py b/python/lambda-ddb-mysql-etl-pipeline/etl_pipeline_cdk/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/python/lambda-ddb-mysql-etl-pipeline/etl_pipeline_cdk/etl_pipeline_cdk_stack.py b/python/lambda-ddb-mysql-etl-pipeline/etl_pipeline_cdk/etl_pipeline_cdk_stack.py
new file mode 100644
index 000000000..8b9fb5796
--- /dev/null
+++ b/python/lambda-ddb-mysql-etl-pipeline/etl_pipeline_cdk/etl_pipeline_cdk_stack.py
@@ -0,0 +1,142 @@
+from aws_cdk import (
+    core,
+    aws_s3 as s3,
+    aws_s3_notifications as s3n,
+    aws_lambda as _lambda,
+    aws_dynamodb as ddb,
+    aws_events as events,
+    aws_events_targets as targets,
+)
+
+class EtlPipelineCdkStack(core.Stack):
+    """Define the custom CDK stack construct class that inherits from the core.Stack base class.
+
+    Notes:
+        This is the meat of our stack that will be built in app.py of the CDK application.
+
+        Lambda inline code ex. => You can use the following in lieu of AssetCode() for inline code:
+            with open("lambda/dbwrite.py", encoding="utf8") as fp:
+                handler_code = fp.read()
+            ...code=_lambda.InlineCode(handler_code)...
+            *Please consider char limits for inline code writes.
+    """
+
+    def __init__(self, scope: core.Construct, id: str, **kwargs) -> None:
+        """Invoke the base class constructor via super with the received scope, id, and kwargs.
+
+        Args:
+            scope: Defines the scope in which this custom construct stack is created.
+            id (str): Defines the local identity of the construct. Must be unique amongst constructs
+                within the same scope, as it's used to formulate the CloudFormation logical id for each resource
+                defined in this scope.
+            kwargs: Additional keyword arguments; `stage` is consumed here and the rest
+                (e.g. `env`) are passed through to the base Stack constructor.
+        """
+
+        # Example of passing app.py-level params to the stack class: pop `stage` off
+        # so the remaining kwargs (such as `env`) still reach the base constructor.
+        self.stage = kwargs.pop('stage')
+
+        super().__init__(scope, id, **kwargs)
+
+        # Resources to create
+        s3_bucket = s3.Bucket(
+            self, "Bucket",
+            bucket_name=f"asteroids-{self.stage}",
+            versioned=False,
+            removal_policy=core.RemovalPolicy.DESTROY  # NOT recommended for production code
+        )
+
+        ddb_asteroids_table = ddb.Table(
+            self, "Table",
+            table_name="asteroids_table",
+            partition_key={
+                "name": "id",
+                "type": ddb.AttributeType.STRING
+            },
+            removal_policy=core.RemovalPolicy.DESTROY  # NOT recommended for production code
+        )
+
+        # Lambdas and layers
+        requests_layer = _lambda.LayerVersion(
+            self, "requests",
+            code=_lambda.AssetCode('layers/requests.zip'))
+        pandas_layer = _lambda.LayerVersion(
+            self, "pandas",
+            code=_lambda.AssetCode('layers/pandas.zip'))
+        pymysql_layer = _lambda.LayerVersion(
+            self, "pymysql",
+            code=_lambda.AssetCode('layers/pymysql.zip'))
+
+        process_asteroid_data = _lambda.Function(
+            self, "ProcessAsteroidsLambda",
+            runtime=_lambda.Runtime.PYTHON_3_7,
+            code=_lambda.AssetCode("lambda"),
+            handler="asteroids.handler",
+            layers=[requests_layer],
+            environment={
+                "S3_BUCKET": s3_bucket.bucket_name,
+                "NASA_KEY": self.node.try_get_context("NASA_KEY"),
+            }
+        )
+
+        db_write = _lambda.Function(
+            self, "DbWriteLambda",
+            runtime=_lambda.Runtime.PYTHON_3_7,
+            handler="dbwrite.handler",
+            layers=[pandas_layer, pymysql_layer],
+            code=_lambda.AssetCode('lambda'),
+            environment={
+                "ASTEROIDS_TABLE": ddb_asteroids_table.table_name,
+                "S3_BUCKET": s3_bucket.bucket_name,
+                "SCHEMA": self.node.try_get_context("SCHEMA"),
+                "REGION": self.node.try_get_context("REGION"),
+                "DB_SECRETS": self.node.try_get_context("DB_SECRETS_REF"),
+                "TOPIC_ARN": self.node.try_get_context("TOPIC_ARN")
+            }
+        )
+
+        # Rules and Events
+        json_rule = events.Rule(
+            self, "JSONRule",
+            schedule=events.Schedule.cron(
+                minute="15",
+                hour="*",
+                month="*",
+                week_day="*",
+                year="*"
+            )
+        )
+
+        csv_rule = events.Rule(
+            self, "CSVRule",
+            schedule=events.Schedule.cron(
+                minute="30",
+                hour="*",
+                month="*",
+                week_day="*",
+                year="*"
+            )
+        )
+
+        # Add the Lambda function target, as well as a custom trigger input, to each rule
+        json_rule.add_target(
+            targets.LambdaFunction(
+                process_asteroid_data,
+                event=events.RuleTargetInput.from_text("json")
+            )
+        )
+        csv_rule.add_target(
+            targets.LambdaFunction(
+                process_asteroid_data,
+                event=events.RuleTargetInput.from_text("csv")
+            )
+        )
+        # Create the s3 notification for the db_write function
+        notify_lambda = s3n.LambdaDestination(db_write)
+        # Assign the 'notify_lambda' notification to the 'OBJECT_CREATED' event type
+        s3_bucket.add_event_notification(s3.EventType.OBJECT_CREATED, notify_lambda)
+
+        # Permissions
+        s3_bucket.grant_read_write(process_asteroid_data)
+        s3_bucket.grant_read_write(db_write)
+        ddb_asteroids_table.grant_read_write_data(db_write)
diff --git a/python/lambda-ddb-mysql-etl-pipeline/lambda/asteroids.py b/python/lambda-ddb-mysql-etl-pipeline/lambda/asteroids.py
new file mode 100644
index 000000000..a6afcf3b4
--- /dev/null
+++ b/python/lambda-ddb-mysql-etl-pipeline/lambda/asteroids.py
@@ -0,0 +1,155 @@
+import requests
+import json
+from requests.exceptions import Timeout
+from requests.exceptions import HTTPError
+from botocore.exceptions import ClientError
+from datetime import date
+from base64 import b64decode  # used by get_secret() when the secret is stored as binary
+import csv
+import os
+import boto3
+import logging
+
+logger = logging.getLogger()
+logger.setLevel(logging.DEBUG)
+
+class Asteroids:
+    """Client to NASA API and
execution interface to branch data processing by file type. + + Notes: + This class doesn't look like a normal class. It is a simple example of how one might + workaround AWS Lambda's limitations of class use in handlers. It also allows for + better organization of code to simplify this example. If one planned to add + other NASA endpoints or process larger amounts of Asteroid data for both .csv and .json formats, + asteroids_json and asteroids_csv should be modularized and divided into separate lambdas + where stepfunction orchestration is implemented for a more comprehensive workflow. + However, for the sake of this demo I'm keeping it lean and easy. + """ + + def execute(self, format): + """Serves as logical interface to assign class attributes and execute class methods + + Raises: + Exception: If file format is not of .json or .csv file types. + Notes: + Have fun! + """ + self.file_format=format + self.today=date.today().strftime('%Y-%m-%d') + # method call below used when Secrets Manager integrated. See get_secret.__doc__ for more. + # self.api_key=get_secret('nasa_api_key') + self.api_key=os.environ["NASA_KEY"] + self.endpoint=f"""https://api.nasa.gov/neo/rest/v1/feed?start_date={self.today}&end_date={self.today}&api_key={self.api_key}""" + self.response_object=self.nasa_client(self.endpoint) + self.processed_response=self.process_asteroids(self.response_object) + if self.file_format == "json": + self.asteroids_json(self.processed_response) + elif self.file_format == "csv": + self.asteroids_csv(self.processed_response) + else: + raise Exception("FILE FORMAT NOT RECOGNIZED") + self.write_to_s3() + + def nasa_client(self, endpoint): + """Client component for API call to NASA endpoint. + + Args: + endpoint (str): Parameterized url for API call. + Raises: + Timeout: If connection not made in 5s and/or data not retrieved in 15s. + HTTPError & Exception: Self-explanatory + Notes: + See Cloudwatch logs for debugging. + """ + try: + response = requests.get(endpoint, timeout=(5, 15)) + except Timeout as timeout: + print(f"NASA GET request timed out: {timeout}") + except HTTPError as http_err: + print(f"HTTP error occurred: {http_err}") + except Exception as err: + print(f'Other error occurred: {err}') + else: + return json.loads(response.content) + + def process_asteroids(self, payload): + """Process old, and create new, data object with content from response. + + Args: + payload (b'str'): Binary string of asteroid data to be processed. + """ + near_earth_objects = payload["near_earth_objects"][f"{self.today}"] + asteroids = [] + for neo in near_earth_objects: + asteroid_object = { + "id" : neo['id'], + "name" : neo['name'], + "hazard_potential" : neo['is_potentially_hazardous_asteroid'], + "est_diameter_min_ft": neo['estimated_diameter']['feet']['estimated_diameter_min'], + "est_diameter_max_ft": neo['estimated_diameter']['feet']['estimated_diameter_max'], + "miss_distance_miles": [item['miss_distance']['miles'] for item in neo['close_approach_data']], + "close_approach_exact_time": [item['close_approach_date_full'] for item in neo['close_approach_data']] + } + asteroids.append(asteroid_object) + + return asteroids + + def asteroids_json(self, payload): + """Creates json object from payload content then writes to .json file. + + Args: + payload (b'str'): Binary string of asteroid data to be processed. 
+ """ + json_file = open(f"/tmp/asteroids_{self.today}.json",'w') + json_file.write(json.dumps(payload, indent=4)) + json_file.close() + + def asteroids_csv(self, payload): + """Creates .csv object from payload content then writes to .csv file. + """ + csv_file=open(f"/tmp/asteroids_{self.today}.csv",'w', newline='\n') + fields=list(payload[0].keys()) + writer=csv.DictWriter(csv_file, fieldnames=fields) + writer.writeheader() + writer.writerows(payload) + csv_file.close() + + def get_secret(self): + """Gets secret from AWS Secrets Manager + + Notes: + Not necessary for CDK example but required in regular envs + thus leaving for use + """ + try: + session = boto3.session.Session() + client = session.client(service_name='secretsmanager' + , region_name=os.environ['REGION']) + SECRET = client.get_secret_value(SecretId=os.environ['LAMBDA_DWR_SECRET']) + if 'SecretString' in SECRET: + SECRETS = json.loads(SECRET['SecretString']) + else: + SECRETS = json.loads(b64decode(SECRET['SecretBinary'])) + except Exception: + logger.error("ERROR: Unable to GET/Process DWR Secret") + + return SECRETS + + def write_to_s3(self): + """Uploads both .json and .csv files to s3 + """ + s3 = boto3.client('s3') + s3.upload_file(f"/tmp/asteroids_{self.today}.{self.file_format}", os.environ['S3_BUCKET'], f"asteroid_data/asteroids_{self.today}.{self.file_format}") + + +def handler(event, context): + """Instantiates class and triggers execution method. + + Args: + event (dict): Lists a custom dict that determines interface control flow--i.e. `csv` or `json`. + context (obj): Provides methods and properties that contain invocation, function and + execution environment information. + *Not used herein. + """ + asteroids = Asteroids() + asteroids.execute(event) + \ No newline at end of file diff --git a/python/lambda-ddb-mysql-etl-pipeline/lambda/dbwrite.py b/python/lambda-ddb-mysql-etl-pipeline/lambda/dbwrite.py new file mode 100644 index 000000000..2b675e4ad --- /dev/null +++ b/python/lambda-ddb-mysql-etl-pipeline/lambda/dbwrite.py @@ -0,0 +1,200 @@ +import json +import os +import decimal +import mimetypes +import boto3 +import logging +import pymysql +import pandas + +logger = logging.getLogger() +logger.setLevel(logging.DEBUG) + +ddb = boto3.resource('dynamodb') +s3 = boto3.client('s3') +ddb_table = ddb.Table(os.environ['ASTEROIDS_TABLE']) +_lambda = boto3.client('lambda') +db_conn = None +SCHEMA = os.environ['SCHEMA'] + +def handler(event, context): + """Retrieve s3 object and write to dynamo or mysql table(not-functional in this example, thus comments) + + Args: + event (dict, list, str, int, float, NoneType): Provides event specific data to the handler. It + can also be of type list, str, int, float, or NoneType. + context (obj): Provides methods and properties that contain invocation, function and + execution environment information. + Raises: + Exception: If file format is not of .json or .csv file types. + + Notes: + A call to `write_to_mysql()` will require that you setup a DB Connection + and set `db_conn` herein to `True`. Otherwise, the current example will + only print results to CloudWatch Logs. Pandas & PyMySQL archives are + included as layers in the project. 
+ """ + s3_key=event['Records'][0]['s3']['object']['key'] + stream_obj=s3.get_object(Bucket=os.environ['S3_BUCKET'], Key=s3_key) + data = stream_obj['Body'].read().decode('utf-8') + if mimetypes.guess_type(s3_key)[0] == "application/json": + asteroid_list=json.loads(data, parse_float=decimal.Decimal) + write_to_ddb(asteroid_list) + elif mimetypes.guess_type(s3_key)[0] == "text/csv": + write_to_mysql(data, 'asteroids') + else: + raise Exception("FILE FORMAT NOT RECOGNIZED") + +def write_to_ddb(ast_list): + """Puts/Writes each JSON record to DynamoDB. + + Args: + ast_list (obj): A JSON list of objects that contain Asteroid data. + """ + for index in ast_list: + ddb_table.put_item(Item=index) + +def write_to_mysql(ast_list, t_suffix): + mysql_attrs=get_mysql_attrs(t_suffix) + dynamic_mysql_crud_ops(ast_list, t_suffix, mysql_attrs) + +def get_mysql_attrs(t_suffix): + """Gets table attributes from Information Schema & creates new staging table, if necessary + + Args: + t_suffix (str): The table name suffix of the write target table. + Returns: + attribute_list (list): Array of column names in the specified db table. + """ + if db_conn is None: + return "`mysql_attrs` ref from `get_mysql_attrs()`" + else: + # Gets DB Creds from AWS Secrets Manager + try: + session = boto3.session.Session() + client = session.client(service_name='secretsmanager' + , region_name=os.environ['REGION']) + SECRET = client.get_secret_value(SecretId=os.environ['DB_SECRETS_REF']) + if 'SecretString' in SECRET: + SECRETS = json.loads(SECRET['SecretString']) + else: + SECRETS = json.loads(b64decode(SECRET['SecretBinary'])) + except Exception: + logger.error("ERROR: Unable to GET DB Credentials from Secrets Manager") + + try: + connection = pymysql.connect(host=SECRETS['MYSQL_ENDPOINT'], port=3306 + , user=SECRETS['MYSQL_USER'], password=SECRETS['MYSQL_PASSWD'] + , autocommit=True, connect_timeout=5) + except pymysql.MySQLError: + logger.error("MySQLError: MySQL Connection Issue") + cursor_obj = connection.cursor() + table_check = f""" + SELECT + COUNT(*) + FROM information_schema.tables + WHERE TABLE_SCHEMA = '{SCHEMA}' + AND TABLE_NAME = 'table_{t_suffix}' + """ + cursor_obj.execute(table_check) + if cursor_obj.fetchone()[0] == 0: + logger.info('No table exists!!!') + sns = boto3.client('sns') + response = sns.publish( + TopicArn=os.environ['TOPIC_ARN'], + Message=f"""A new t_suffix has been detected. A new staging table has been created with + relevant tuples loaded to : {SCHEMA}.table_{t_suffix}""" + ) + + create_table = f""" + CREATE TABLE IF NOT EXISTS {SCHEMA}.table_{t_suffix} ( + id VARCHAR(7), + `name` VARCHAR(12), + created TIMESTAMP DEFAULT CURRENT_TIMESTAMP + PRIMARY KEY (id), + UNIQUE KEY comp_idx_id_name (id,`name`) + )""" + cursor_obj.execute(create_table) + + get_attrs = f""" + SELECT + COLUMN_NAME + FROM INFORMATION_SCHEMA.COLUMNS + WHERE TABLE_SCHEMA = '{SCHEMA}' AND TABLE_NAME = 'table_{t_suffix}' + """ + cursor_obj.execute(get_attrs) + + return [column[0] for column in dwr_cursor.fetchall()] + + else: + get_attrs = f""" + SELECT + COLUMN_NAME + FROM INFORMATION_SCHEMA.COLUMNS + WHERE TABLE_SCHEMA = '{SCHEMA}' AND TABLE_NAME = 'table_{t_suffix}'""" + cursor_obj.execute(get_attrs) + return [column[0] for column in cursor_obj.fetchall()] + +def dynamic_mysql_crud_ops(ast_list, t_suffix, mysql_attrs): + """Backs dataset attrs against table attrs then runs CRUD ops dynamically. + + Args: + ast_list (str): String of s3 body data. + t_suffix (str): The table name suffix of the write target table. 
+        mysql_attrs (list): Array of column names from the DB table in question.
+
+    Notes:
+        With `db_conn` left as None, results are only printed to CloudWatch Logs.
+        The MySQL branch runs once DB credentials are included and the `None`
+        flag on `db_conn` is removed herein.
+    """
+    if db_conn is None:
+        logger.info(f"VALUES for ast_list, t_suffix, mysql_attrs respectively: \n {ast_list} | {t_suffix} | {mysql_attrs}")
+        return {
+            "asteroid_list": ast_list,
+            "table_suffix": t_suffix,
+            "table_attributes": mysql_attrs
+        }
+    else:
+        # `cursor_obj` is the module-level cursor opened in get_mysql_attrs().
+        # header=None so pandas returns the header row as data; the upstream .csv
+        # produced by asteroids.py is comma-delimited.
+        df_attrs = pandas.read_csv(io.StringIO(ast_list), sep=',', header=None, index_col=False,
+                                   dtype=str, keep_default_na=False, nrows=1)
+        # Creates final tuple of attributes from dataframe attributes
+        final_attrs = [tuple(x) for x in df_attrs.values]
+        final_attrs = final_attrs[0]
+        # Extract rows from the file and create tuples
+        df_tuples = pandas.read_csv(io.StringIO(ast_list), sep=',', na_values=None,
+                                    keep_default_na=False, dtype=str)
+        df_tuples = df_tuples.apply(tuple, axis=1)
+
+        # Compare DB table attrs against s3 attrs to determine ALTER statement requirement and run, if necessary
+        for attr in final_attrs:
+            if attr in mysql_attrs:
+                logger.info('No new attribute to record')
+            else:
+                logger.info(attr)
+                alter_attr_statement = f"""
+                    ALTER TABLE {SCHEMA}.table_{t_suffix} ADD `{attr}` VARCHAR(50)"""
+                cursor_obj.execute(alter_attr_statement)
+
+        # Strip empty quotes in df_tuples, transform to None, and build the final tuples array for INSERT
+        tuples_nullified = []
+        for tup in df_tuples:
+            tuples_nullified.append(tuple(None if elem == '' else elem for elem in tup))
+
+        # Build dynamic attribute strings for INSERT
+        attr_str_insert = ""
+        attr_var_insert = ""
+        for attr in final_attrs:
+            if attr != final_attrs[len(final_attrs) - 1]:
+                attr_str_insert += "`" + attr + "`, "
+                attr_var_insert += "%s,"
+            else:
+                attr_str_insert += "`" + attr + "`"
+                attr_var_insert += "%s"
+
+        # Dynamic Insert
+        replace_statement = f"""
+            REPLACE INTO {SCHEMA}.table_{t_suffix}
+            ({attr_str_insert}) VALUES ({attr_var_insert})"""
+        cursor_obj.executemany(replace_statement, tuples_nullified)
+        logger.info(f"{cursor_obj.rowcount} record(s) inserted successfully into table_{t_suffix}")
+
+        return f"Record(s) inserted successfully into table_{t_suffix}"
\ No newline at end of file
diff --git a/python/lambda-ddb-mysql-etl-pipeline/layers/pandas.zip b/python/lambda-ddb-mysql-etl-pipeline/layers/pandas.zip
new file mode 100644
index 000000000..2c2bb8d64
Binary files /dev/null and b/python/lambda-ddb-mysql-etl-pipeline/layers/pandas.zip differ
diff --git a/python/lambda-ddb-mysql-etl-pipeline/layers/pymysql.zip b/python/lambda-ddb-mysql-etl-pipeline/layers/pymysql.zip
new file mode 100644
index 000000000..5b1cfeff7
Binary files /dev/null and b/python/lambda-ddb-mysql-etl-pipeline/layers/pymysql.zip differ
diff --git a/python/lambda-ddb-mysql-etl-pipeline/layers/requests.zip b/python/lambda-ddb-mysql-etl-pipeline/layers/requests.zip
new file mode 100644
index 000000000..8133bf34a
Binary files /dev/null and b/python/lambda-ddb-mysql-etl-pipeline/layers/requests.zip differ
diff --git a/python/lambda-ddb-mysql-etl-pipeline/requirements.txt b/python/lambda-ddb-mysql-etl-pipeline/requirements.txt
new file mode 100644
index 000000000..ecf975e2f
--- /dev/null
+++ b/python/lambda-ddb-mysql-etl-pipeline/requirements.txt
@@ -0,0 +1 @@
+-e .
\ No newline at end of file diff --git a/python/lambda-ddb-mysql-etl-pipeline/setup.py b/python/lambda-ddb-mysql-etl-pipeline/setup.py new file mode 100644 index 000000000..b07428dd3 --- /dev/null +++ b/python/lambda-ddb-mysql-etl-pipeline/setup.py @@ -0,0 +1,51 @@ +import setuptools + + +with open("README.md") as fp: + long_description = fp.read() + + +setuptools.setup( + name="etl_pipeline_cdk", + version="0.0.1", + + description="A CDK Python App that defines a .csv & .json file processing pipeline", + long_description=long_description, + long_description_content_type="text/markdown", + + author="Benjamin E. Farr", + + package_dir={"": "etl_pipeline_cdk"}, + packages=setuptools.find_packages(where="etl_pipeline_cdk"), + + install_requires=[ + "aws-cdk.core", + "aws-cdk.aws-s3", + "aws-cdk.aws-s3-notifications", + "aws-cdk.aws-lambda", + "aws-cdk.aws-dynamodb", + "aws-cdk.aws-events", + "aws-cdk.aws-events-targets", + ], + + python_requires=">=3.6", + + classifiers=[ + "Development Status :: 4 - Beta", + + "Intended Audience :: Developers", + + "License :: OSI Approved :: Apache Software License", + + "Programming Language :: JavaScript", + "Programming Language :: Python :: 3 :: Only", + "Programming Language :: Python :: 3.6", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", + + "Topic :: Software Development :: Code Generators", + "Topic :: Utilities", + + "Typing :: Typed", + ], +)