Commit: Dpp 584 academy bookmark version handling (#1529)
* add GetCrawler action to permissions

(cherry picked from commit 25bec05)

* add BOOKMARK_CONTEXT job parameter

(cherry picked from commit d5ae51c)

* add BOOKMARK_CONTEXT handling

* formatting

* extract initialise_job to methods

* remove type hints

* accidentally deleted the function

* import logging

* point assert at correct module

* one too many ctrl z

* add _ between job name and bookmark context

* remove 'Job' type

* add logger to job initialisation
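Taken together, these commits add an optional BOOKMARK_CONTEXT job parameter and a shared initialise_job helper that appends the context to the name passed to job.init. Presumably this is so that concurrent runs of the same Glue job (one per Academy department, fanned out by the Step Functions Map state below) keep separate job-bookmark state instead of sharing one bookmark.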
timburke-hackit authored Dec 4, 2023
1 parent 5e97ebc commit 187489b
Showing 5 changed files with 143 additions and 36 deletions.
32 changes: 32 additions & 0 deletions scripts/helpers/helpers.py
@@ -2,6 +2,8 @@
 import re
 import sys
 import unicodedata
+import logging
+from typing import Dict, Optional
 
 import boto3
 from awsglue.utils import getResolvedOptions
@@ -577,3 +579,33 @@ def copy_file(source_bucket, source_path, source_filename, target_bucket, target
     except Exception as error:
         ## do nothing
         print('Error Occured: copy_file', error)


+def initialise_job(args: Dict[str, str], job, logger: Optional[logging.Logger] = None) -> None:
+    """
+    Initialise the AWS Glue job with the supplied arguments.
+
+    Args:
+        args (Dict[str, str]): Arguments for the job initialisation; must contain "JOB_NAME".
+        job (Job): The AWS Glue job instance.
+        logger (Optional[logging.Logger]): Optional logger for logging messages.
+
+    The function initialises the job with "JOB_NAME" and, if provided, "BOOKMARK_CONTEXT"
+    appended after an underscore. If initialisation fails, the error is logged and re-raised.
+    """
+    if logger is None:
+        logger = logging.getLogger(__name__)
+
+    job_name = args.get("JOB_NAME")
+    if not job_name:
+        logger.error("JOB_NAME is required in args")
+        raise ValueError("JOB_NAME is required in args")
+
+    bookmark_context = args.get("BOOKMARK_CONTEXT")
+    full_job_name = job_name + (f"_{bookmark_context}" if bookmark_context else "")
+
+    try:
+        job.init(full_job_name, args)
+    except Exception as e:
+        logger.error(f"Failed to initialise job: {e}")
+        raise
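For reference, a minimal sketch of the naming behaviour as seen by a caller. The job names here are hypothetical, and a MagicMock stands in for awsglue.job.Job (which needs a real GlueContext to construct):

from unittest.mock import MagicMock

from scripts.helpers.helpers import initialise_job

job = MagicMock()  # stand-in for awsglue.job.Job

# With a bookmark context, the context is appended to the job name...
initialise_job({"JOB_NAME": "copy_tables", "BOOKMARK_CONTEXT": "revenues"}, job)
job.init.assert_called_once_with(
    "copy_tables_revenues",
    {"JOB_NAME": "copy_tables", "BOOKMARK_CONTEXT": "revenues"},
)

# ...without one, the name is passed through unchanged.
job.reset_mock()
initialise_job({"JOB_NAME": "copy_tables"}, job)
job.init.assert_called_once_with("copy_tables", {"JOB_NAME": "copy_tables"})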
78 changes: 46 additions & 32 deletions scripts/jobs/copy_tables_landing_to_raw.py
@@ -8,59 +8,73 @@
 from awsglue.dynamicframe import DynamicFrame
 from awsglue.job import Job
 
-from scripts.helpers.helpers import get_glue_env_var, add_timestamp_column, PARTITION_KEYS
+from scripts.helpers.helpers import (
+    add_timestamp_column,
+    get_glue_env_var,
+    initialise_job,
+    PARTITION_KEYS,
+)
 
 ## @params: [JOB_NAME]
-args = getResolvedOptions(sys.argv, ['JOB_NAME'])
+args = getResolvedOptions(sys.argv, ["JOB_NAME"])
 
 sc = SparkContext()
 glue_context = GlueContext(sc)
 spark = glue_context.spark_session
 job = Job(glue_context)
-job.init(args['JOB_NAME'], args)
+logger = glue_context.get_logger()
+
+initialise_job(args, job, logger)
 
-glue_client = boto3.client('glue')
+glue_client = boto3.client("glue")
 database_name_source = get_glue_env_var("glue_database_name_source")
 database_name_target = get_glue_env_var("glue_database_name_target")
 bucket_target = get_glue_env_var("s3_bucket_target")
 prefix = get_glue_env_var("s3_prefix")
 table_filter_expression = get_glue_env_var("table_filter_expression")
-logger = glue_context.get_logger()
 
 
 filtering_pattern = re.compile(table_filter_expression)
-tables_to_copy = [table for table in glue_context.tableNames(dbName=database_name_source) if filtering_pattern.match(table)]
+tables_to_copy = [
+    table
+    for table in glue_context.tableNames(dbName=database_name_source)
+    if filtering_pattern.match(table)
+]
 logger.info(f"Tables to copy: {tables_to_copy}")
 logger.info(f"Number of tables to copy: {len(tables_to_copy)}")
 
 for table_name in tables_to_copy:
-    logger.info(f"Starting to copy table {database_name_source}.{table_name}")
-    table_data_frame = glue_context.create_dynamic_frame.from_catalog(
-        name_space = database_name_source,
-        table_name = table_name,
-        transformation_ctx = "data_source" + table_name,
-        push_down_predicate = "import_date>=date_format(date_sub(current_date, 5), 'yyyyMMdd')"
-    ).toDF()
+    logger.info(f"Starting to copy table {database_name_source}.{table_name}")
+    table_data_frame = glue_context.create_dynamic_frame.from_catalog(
+        name_space=database_name_source,
+        table_name=table_name,
+        transformation_ctx="data_source" + table_name,
+        push_down_predicate="import_date>=date_format(date_sub(current_date, 5), 'yyyyMMdd')",
+    ).toDF()
 
-    if(len(table_data_frame.columns) == 0):
-        logger.info(f"Aborting copy of data for table {database_name_source}.{table_name}, as it has empty columns.")
-        continue
+    if len(table_data_frame.columns) == 0:
+        logger.info(
+            f"Aborting copy of data for table {database_name_source}.{table_name}, as it has empty columns."
+        )
+        continue
 
-    table_with_timestamp = add_timestamp_column(table_data_frame)
+    table_with_timestamp = add_timestamp_column(table_data_frame)
 
-    data_sink = glue_context.getSink(
-        path = "s3://" + bucket_target + "/" + prefix + table_name + "/",
-        connection_type = "s3",
-        updateBehavior = "UPDATE_IN_DATABASE",
-        partitionKeys = PARTITION_KEYS,
-        enableUpdateCatalog = True,
-        transformation_ctx = "data_sink" + table_name
-    )
-    data_sink.setCatalogInfo(
-        catalogDatabase = database_name_target,
-        catalogTableName = table_name
-    )
-    data_sink.setFormat("glueparquet")
-    data_sink.writeFrame(DynamicFrame.fromDF(table_with_timestamp, glue_context, "result_dataframe"))
-    logger.info(f"Finished copying table {database_name_source}.{table_name}")
+    data_sink = glue_context.getSink(
+        path="s3://" + bucket_target + "/" + prefix + table_name + "/",
+        connection_type="s3",
+        updateBehavior="UPDATE_IN_DATABASE",
+        partitionKeys=PARTITION_KEYS,
+        enableUpdateCatalog=True,
+        transformation_ctx="data_sink" + table_name,
+    )
+    data_sink.setCatalogInfo(
+        catalogDatabase=database_name_target, catalogTableName=table_name
+    )
+    data_sink.setFormat("glueparquet")
+    data_sink.writeFrame(
+        DynamicFrame.fromDF(table_with_timestamp, glue_context, "result_dataframe")
+    )
+    logger.info(f"Finished copying table {database_name_source}.{table_name}")
 
 job.commit()
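The only functional change in this file is swapping the direct job.init call for initialise_job (with the logger created before initialisation); the remaining hunks are Black-style reformatting, matching the "formatting" commit above.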
55 changes: 55 additions & 0 deletions scripts/tests/test_initialise_job.py
@@ -0,0 +1,55 @@
+import unittest
+from unittest.mock import patch, MagicMock
+from scripts.helpers.helpers import initialise_job
+
+
+class TestInitialiseJob(unittest.TestCase):
+    def setUp(self):
+        self.mock_job = MagicMock()
+        self.mock_logger = MagicMock()
+        self.args_with_bookmark = {
+            "JOB_NAME": "test_job",
+            "BOOKMARK_CONTEXT": "test_context",
+        }
+        self.args_without_bookmark = {"JOB_NAME": "test_job"}
+
+    def test_initialise_job_with_bookmark_context_and_logger(self):
+        initialise_job(self.args_with_bookmark, self.mock_job, self.mock_logger)
+        self.mock_job.init.assert_called_once_with(
+            "test_job_test_context", self.args_with_bookmark
+        )
+        self.mock_logger.error.assert_not_called()
+
+    def test_initialise_job_without_bookmark_context_and_logger(self):
+        initialise_job(self.args_without_bookmark, self.mock_job, self.mock_logger)
+        self.mock_job.init.assert_called_once_with(
+            "test_job", self.args_without_bookmark
+        )
+        self.mock_logger.error.assert_not_called()
+
+    def test_initialise_job_without_job_name_and_logger(self):
+        with self.assertRaises(ValueError):
+            initialise_job({}, self.mock_job, self.mock_logger)
+        self.mock_logger.error.assert_called_once()
+
+    @patch("scripts.helpers.helpers.logging")
+    def test_initialise_job_with_initialisation_exception_and_default_logger(
+        self, mock_logging
+    ):
+        self.mock_job.init.side_effect = Exception("Test Exception")
+        with self.assertRaises(Exception):
+            initialise_job(self.args_with_bookmark, self.mock_job)
+        mock_logging.getLogger.assert_called_with("scripts.helpers.helpers")
+        self.mock_job.init.assert_called_once()
+
+    def test_initialise_job_with_initialisation_exception_and_custom_logger(self):
+        self.mock_job.init.side_effect = Exception("Test Exception")
+        with self.assertRaises(Exception):
+            initialise_job(self.args_with_bookmark, self.mock_job, self.mock_logger)
+        self.mock_logger.error.assert_called_with(
+            "Failed to initialise job: Test Exception"
+        )
+
+
+if __name__ == "__main__":
+    unittest.main()
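A quick way to exercise these tests locally, assuming the scripts packages are importable from the repository root:

python -m unittest scripts.tests.test_initialise_job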
4 changes: 3 additions & 1 deletion state-machine-definitions/academy_ingestion.asl.json
@@ -121,6 +121,7 @@
"Parameters": {
"JobName": "${AcademyLandingToRawGlueJobName}",
"Arguments": {
"--BOOKMARK_CONTEXT.$": "$.BookmarkContext",
"--table_filter_expression.$": "$.FilterString",
"--s3_prefix.$": "$.S3Prefix",
"--glue_database_name_target": "$.GlueDatabaseTarget"
@@ -134,7 +135,8 @@
"ItemSelector": {
"FilterString.$": "$$.Map.Item.Value.FilterString",
"S3Prefix.$": "$$.Map.Item.Value.S3Prefix",
"GlueDatabaseTarget.$": "$$.Map.Item.Value.GlueDatabaseTarget"
"GlueDatabaseTarget.$": "$$.Map.Item.Value.GlueDatabaseTarget",
"BookmarkContext.$": "$$.Map.Item.Value.BookmarkContext"
},
"End": true,
"ResultPath": "$.copyToRawOutput"
10 changes: 7 additions & 3 deletions terraform/core/13-mssql-ingestion.tf
@@ -111,12 +111,14 @@ locals {
     {
       S3Prefix = "revenues/",
       FilterString = "(^lbhaliverbviews_core_(?!hb).*|^lbhaliverbviews_current_(?!hb).*|^lbhaliverbviews_xdbvw_.*)",
-      GlueDatabaseTarget = module.department_revenues.raw_zone_catalog_database_name
+      GlueDatabaseTarget = module.department_revenues.raw_zone_catalog_database_name,
+      BookmarkContext = "revenues"
     },
     {
       S3Prefix = "benefits-housing-needs/",
       FilterString = "(^lbhaliverbviews_core_hb.*|^lbhaliverbviews_current_hb.*)",
-      GlueDatabaseTarget = module.department_benefits_and_housing_needs.raw_zone_catalog_database_name
+      GlueDatabaseTarget = module.department_benefits_and_housing_needs.raw_zone_catalog_database_name,
+      BookmarkContext = "benefits-housing-needs"
     }
   ]
 }
@@ -145,6 +147,7 @@ module "academy_glue_job" {
   number_of_workers_for_glue_job  = local.number_of_glue_workers
   max_concurrent_runs_of_glue_job = length(local.academy_table_filters)
   job_parameters = {
+    "--BOOKMARK_CONTEXT" = ""
     "--source_data_database" = module.academy_mssql_database_ingestion[0].ingestion_database_name
     "--s3_ingestion_bucket_target" = "s3://${module.landing_zone.bucket_id}/academy/"
     "--s3_ingestion_details_target" = "s3://${module.landing_zone.bucket_id}/academy/ingestion-details/"
@@ -230,7 +233,8 @@ data "aws_iam_policy_document" "academy_step_functions_policy" {
"glue:GetJobRun",
"glue:GetJobRuns",
"glue:BatchStopJobRun",
"glue:StartCrawler"
"glue:StartCrawler",
"glue:GetCrawler"
]
resources = ["*"]
}
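The added glue:GetCrawler permission is presumably needed because the state machine polls crawler status after glue:StartCrawler, which only kicks the crawl off asynchronously.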
