diff --git a/scripts/helpers/helpers.py b/scripts/helpers/helpers.py
index a3000dc7d..75b6bd88f 100644
--- a/scripts/helpers/helpers.py
+++ b/scripts/helpers/helpers.py
@@ -2,6 +2,8 @@
 import re
 import sys
 import unicodedata
+import logging
+from typing import Dict, Optional
 
 import boto3
 from awsglue.utils import getResolvedOptions
@@ -577,3 +579,33 @@ def copy_file(source_bucket, source_path, source_filename, target_bucket, target
     except Exception as error:
         ## do nothing
         print('Error Occured: copy_file', error)
+
+
+def initialise_job(args: Dict[str, str], job, logger: Optional[logging.Logger] = None) -> None:
+    """
+    Initialise the AWS Glue job with the supplied arguments.
+
+    Args:
+        args (Dict[str, str]): Arguments for the job initialisation; must contain "JOB_NAME".
+        job (Job): The AWS Glue job instance.
+        logger (Optional[logging.Logger]): Optional logger for logging messages.
+
+    The job is initialised under the name "JOB_NAME", suffixed with "_BOOKMARK_CONTEXT" when a bookmark context is provided.
+    If initialisation fails, the error is logged before the exception is re-raised.
+    """
+    if logger is None:
+        logger = logging.getLogger(__name__)
+
+    job_name = args.get("JOB_NAME")
+    if not job_name:
+        logger.error("JOB_NAME is required in args")
+        raise ValueError("JOB_NAME is required in args")
+
+    bookmark_context = args.get("BOOKMARK_CONTEXT")
+    full_job_name = job_name + (f"_{bookmark_context}" if bookmark_context else "")
+
+    try:
+        job.init(full_job_name, args)
+    except Exception as e:
+        logger.error(f"Failed to initialise job: {e}")
+        raise
\ No newline at end of file
diff --git a/scripts/jobs/copy_tables_landing_to_raw.py b/scripts/jobs/copy_tables_landing_to_raw.py
index 2315d8421..76b6cc6a6 100644
--- a/scripts/jobs/copy_tables_landing_to_raw.py
+++ b/scripts/jobs/copy_tables_landing_to_raw.py
@@ -8,59 +8,76 @@
 from awsglue.dynamicframe import DynamicFrame
 from awsglue.job import Job
 
-from scripts.helpers.helpers import get_glue_env_var, add_timestamp_column, PARTITION_KEYS
+from scripts.helpers.helpers import (
+    add_timestamp_column,
+    get_glue_env_var,
+    initialise_job,
+    PARTITION_KEYS,
+)
 
-## @params: [JOB_NAME]
-args = getResolvedOptions(sys.argv, ['JOB_NAME'])
+## @params: [JOB_NAME, BOOKMARK_CONTEXT (optional)]
+if "--BOOKMARK_CONTEXT" in sys.argv:
+    args = getResolvedOptions(sys.argv, ["JOB_NAME", "BOOKMARK_CONTEXT"])
+else:
+    args = getResolvedOptions(sys.argv, ["JOB_NAME"])
 sc = SparkContext()
 glue_context = GlueContext(sc)
 spark = glue_context.spark_session
 job = Job(glue_context)
-job.init(args['JOB_NAME'], args)
+logger = glue_context.get_logger()
+
+initialise_job(args, job, logger)
 
-glue_client = boto3.client('glue')
+glue_client = boto3.client("glue")
 
 database_name_source = get_glue_env_var("glue_database_name_source")
 database_name_target = get_glue_env_var("glue_database_name_target")
 bucket_target = get_glue_env_var("s3_bucket_target")
 prefix = get_glue_env_var("s3_prefix")
 table_filter_expression = get_glue_env_var("table_filter_expression")
-logger = glue_context.get_logger()
+
 filtering_pattern = re.compile(table_filter_expression)
 
-tables_to_copy = [table for table in glue_context.tableNames(dbName=database_name_source) if filtering_pattern.match(table)]
+tables_to_copy = [
+    table
+    for table in glue_context.tableNames(dbName=database_name_source)
+    if filtering_pattern.match(table)
+]
 
 logger.info(f"Tables to copy: {tables_to_copy}")
 logger.info(f"Number of tables to copy: {len(tables_to_copy)}")
 
 for table_name in tables_to_copy:
-    logger.info(f"Starting to copy table {database_name_source}.{table_name}")
-    table_data_frame = glue_context.create_dynamic_frame.from_catalog(
-        name_space = database_name_source,
-        table_name = table_name,
-        transformation_ctx = "data_source" + table_name,
"data_source" + table_name, - push_down_predicate = "import_date>=date_format(date_sub(current_date, 5), 'yyyyMMdd')" - ).toDF() + logger.info(f"Starting to copy table {database_name_source}.{table_name}") + table_data_frame = glue_context.create_dynamic_frame.from_catalog( + name_space=database_name_source, + table_name=table_name, + transformation_ctx="data_source" + table_name, + push_down_predicate="import_date>=date_format(date_sub(current_date, 5), 'yyyyMMdd')", + ).toDF() - if(len(table_data_frame.columns) == 0): - logger.info(f"Aborting copy of data for table {database_name_source}.{table_name}, as it has empty columns.") - continue + if len(table_data_frame.columns) == 0: + logger.info( + f"Aborting copy of data for table {database_name_source}.{table_name}, as it has empty columns." + ) + continue - table_with_timestamp = add_timestamp_column(table_data_frame) + table_with_timestamp = add_timestamp_column(table_data_frame) - data_sink = glue_context.getSink( - path = "s3://" + bucket_target + "/" + prefix + table_name + "/", - connection_type = "s3", - updateBehavior = "UPDATE_IN_DATABASE", - partitionKeys = PARTITION_KEYS, - enableUpdateCatalog = True, - transformation_ctx = "data_sink" + table_name - ) - data_sink.setCatalogInfo( - catalogDatabase = database_name_target, - catalogTableName = table_name - ) - data_sink.setFormat("glueparquet") - data_sink.writeFrame(DynamicFrame.fromDF(table_with_timestamp, glue_context, "result_dataframe")) - logger.info(f"Finished copying table {database_name_source}.{table_name}") + data_sink = glue_context.getSink( + path="s3://" + bucket_target + "/" + prefix + table_name + "/", + connection_type="s3", + updateBehavior="UPDATE_IN_DATABASE", + partitionKeys=PARTITION_KEYS, + enableUpdateCatalog=True, + transformation_ctx="data_sink" + table_name, + ) + data_sink.setCatalogInfo( + catalogDatabase=database_name_target, catalogTableName=table_name + ) + data_sink.setFormat("glueparquet") + data_sink.writeFrame( + DynamicFrame.fromDF(table_with_timestamp, glue_context, "result_dataframe") + ) + logger.info(f"Finished copying table {database_name_source}.{table_name}") job.commit() diff --git a/scripts/tests/test_initialise_job.py b/scripts/tests/test_initialise_job.py new file mode 100644 index 000000000..c995798e9 --- /dev/null +++ b/scripts/tests/test_initialise_job.py @@ -0,0 +1,55 @@ +import unittest +from unittest.mock import patch, MagicMock +from scripts.helpers.helpers import initialise_job + + +class TestInitialiseJob(unittest.TestCase): + def setUp(self): + self.mock_job = MagicMock() + self.mock_logger = MagicMock() + self.args_with_bookmark = { + "JOB_NAME": "test_job", + "BOOKMARK_CONTEXT": "test_context", + } + self.args_without_bookmark = {"JOB_NAME": "test_job"} + + def test_initialise_job_with_bookmark_context_and_logger(self): + initialise_job(self.args_with_bookmark, self.mock_job, self.mock_logger) + self.mock_job.init.assert_called_once_with( + "test_job_test_context", self.args_with_bookmark + ) + self.mock_logger.error.assert_not_called() + + def test_initialise_job_without_bookmark_context_and_logger(self): + initialise_job(self.args_without_bookmark, self.mock_job, self.mock_logger) + self.mock_job.init.assert_called_once_with( + "test_job", self.args_without_bookmark + ) + self.mock_logger.error.assert_not_called() + + def test_initialise_job_without_job_name_and_logger(self): + with self.assertRaises(ValueError): + initialise_job({}, self.mock_job, self.mock_logger) + self.mock_logger.error.assert_called_once() + + 
@patch("scripts.helpers.helpers.logging") + def test_initialise_job_with_initialisation_exception_and_default_logger( + self, mock_logging + ): + self.mock_job.init.side_effect = Exception("Test Exception") + with self.assertRaises(Exception): + initialise_job(self.args_with_bookmark, self.mock_job) + mock_logging.getLogger.assert_called_with("scripts.helpers.helpers") + self.mock_job.init.assert_called_once() + + def test_initialise_job_with_initialisation_exception_and_custom_logger(self): + self.mock_job.init.side_effect = Exception("Test Exception") + with self.assertRaises(Exception): + initialise_job(self.args_with_bookmark, self.mock_job, self.mock_logger) + self.mock_logger.error.assert_called_with( + "Failed to initialise job: Test Exception" + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/state-machine-definitions/academy_ingestion.asl.json b/state-machine-definitions/academy_ingestion.asl.json index 9c9edfdaf..74767b26f 100644 --- a/state-machine-definitions/academy_ingestion.asl.json +++ b/state-machine-definitions/academy_ingestion.asl.json @@ -121,6 +121,7 @@ "Parameters": { "JobName": "${AcademyLandingToRawGlueJobName}", "Arguments": { + "--BOOKMARK_CONTEXT.$": "$.BookmarkContext", "--table_filter_expression.$": "$.FilterString", "--s3_prefix.$": "$.S3Prefix", "--glue_database_name_target": "$.GlueDatabaseTarget" @@ -134,7 +135,8 @@ "ItemSelector": { "FilterString.$": "$$.Map.Item.Value.FilterString", "S3Prefix.$": "$$.Map.Item.Value.S3Prefix", - "GlueDatabaseTarget.$": "$$.Map.Item.Value.GlueDatabaseTarget" + "GlueDatabaseTarget.$": "$$.Map.Item.Value.GlueDatabaseTarget", + "BookmarkContext.$": "$$.Map.Item.Value.BookmarkContext" }, "End": true, "ResultPath": "$.copyToRawOutput" diff --git a/terraform/core/13-mssql-ingestion.tf b/terraform/core/13-mssql-ingestion.tf index fd3906275..b82a09179 100644 --- a/terraform/core/13-mssql-ingestion.tf +++ b/terraform/core/13-mssql-ingestion.tf @@ -111,12 +111,14 @@ locals { { S3Prefix = "revenues/", FilterString = "(^lbhaliverbviews_core_(?!hb).*|^lbhaliverbviews_current_(?!hb).*|^lbhaliverbviews_xdbvw_.*)", - GlueDatabaseTarget = module.department_revenues.raw_zone_catalog_database_name + GlueDatabaseTarget = module.department_revenues.raw_zone_catalog_database_name, + BookmarkContext = "revenues" }, { S3Prefix = "benefits-housing-needs/", FilterString = "(^lbhaliverbviews_core_hb.*|^lbhaliverbviews_current_hb.*)", - GlueDatabaseTarget = module.department_benefits_and_housing_needs.raw_zone_catalog_database_name + GlueDatabaseTarget = module.department_benefits_and_housing_needs.raw_zone_catalog_database_name, + BookmarkContext = "benefits-housing-needs" } ] } @@ -145,6 +147,7 @@ module "academy_glue_job" { number_of_workers_for_glue_job = local.number_of_glue_workers max_concurrent_runs_of_glue_job = length(local.academy_table_filters) job_parameters = { + "--BOOKMARK_CONTEXT" = "" "--source_data_database" = module.academy_mssql_database_ingestion[0].ingestion_database_name "--s3_ingestion_bucket_target" = "s3://${module.landing_zone.bucket_id}/academy/" "--s3_ingestion_details_target" = "s3://${module.landing_zone.bucket_id}/academy/ingestion-details/" @@ -230,7 +233,8 @@ data "aws_iam_policy_document" "academy_step_functions_policy" { "glue:GetJobRun", "glue:GetJobRuns", "glue:BatchStopJobRun", - "glue:StartCrawler" + "glue:StartCrawler", + "glue:GetCrawler" ] resources = ["*"] }