From e98fa4f49d9dd35ce2ed3096efa1b62c72af4125 Mon Sep 17 00:00:00 2001 From: Sandrine Balley <42965897+LBHSBALLEY@users.noreply.github.com> Date: Thu, 28 Sep 2023 14:54:51 +0100 Subject: [PATCH] Update parking_cycle_hangars_denormalisation.py new email column in cycle hangars + pushdown predicates --- .../parking_cycle_hangars_denormalisation.py | 32 +++++++++++++++---- 1 file changed, 25 insertions(+), 7 deletions(-) diff --git a/scripts/jobs/parking/parking_cycle_hangars_denormalisation.py b/scripts/jobs/parking/parking_cycle_hangars_denormalisation.py index 029e1d0b2..35512ba89 100644 --- a/scripts/jobs/parking/parking_cycle_hangars_denormalisation.py +++ b/scripts/jobs/parking/parking_cycle_hangars_denormalisation.py @@ -6,7 +6,7 @@ from awsglue.job import Job from awsglue.dynamicframe import DynamicFrame -from scripts.helpers.helpers import get_glue_env_var +from scripts.helpers.helpers import get_glue_env_var, create_pushdown_predicate_for_max_date_partition_value environment = get_glue_env_var("environment") def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFrame: @@ -87,7 +87,8 @@ def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFra B.ADDRESS2 , B.ADDRESS3 , B.POSTCODE , - B.TELEPHONE_NUMBER + B.TELEPHONE_NUMBER, + B.EMAIL_ADDRESS FROM HangarAlloc as A LEFT JOIN licence_party as B ON A.party_id = B.business_party_id WHERE RW = 1 AND Allocation_Status NOT IN ('cancelled', 'key_returned') @@ -117,22 +118,39 @@ def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFra ## @args: [database = "dataplatform-" + environment + "-liberator-raw-zone", table_name = "liberator_licence_party", transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] -DataSource0 = glueContext.create_dynamic_frame.from_catalog(database = "dataplatform-" + environment + "-liberator-raw-zone", table_name = "liberator_licence_party", transformation_ctx = "DataSource0") +DataSource0 = 
glueContext.create_dynamic_frame.from_catalog( + database = "dataplatform-" + environment + "-liberator-raw-zone", + table_name = "liberator_licence_party", + transformation_ctx = "DataSource0", + push_down_predicate = create_pushdown_predicate_for_max_date_partition_value("dataplatform-" + environment + "-liberator-raw-zone", "liberator_licence_party", 'import_date'), + additional_options = {"mergeSchema": "true"}) ## @type: DataSource ## @args: [database = "dataplatform-" + environment + "-liberator-raw-zone", table_name = "liberator_hangar_allocations", transformation_ctx = "DataSource3"] ## @return: DataSource3 ## @inputs: [] -DataSource3 = glueContext.create_dynamic_frame.from_catalog(database = "dataplatform-" + environment + "-liberator-raw-zone", table_name = "liberator_hangar_allocations", transformation_ctx = "DataSource3") +DataSource3 = glueContext.create_dynamic_frame.from_catalog( + database = "dataplatform-" + environment + "-liberator-raw-zone", + table_name = "liberator_hangar_allocations", + push_down_predicate = create_pushdown_predicate_for_max_date_partition_value("dataplatform-" + environment + "-liberator-raw-zone", "liberator_hangar_allocations", 'import_date'), + transformation_ctx = "DataSource3") ## @type: DataSource ## @args: [database = "dataplatform-" + environment + "-liberator-raw-zone", table_name = "liberator_hangar_types", transformation_ctx = "DataSource2"] ## @return: DataSource2 ## @inputs: [] -DataSource2 = glueContext.create_dynamic_frame.from_catalog(database = "dataplatform-" + environment + "-liberator-raw-zone", table_name = "liberator_hangar_types", transformation_ctx = "DataSource2") +DataSource2 = glueContext.create_dynamic_frame.from_catalog( + database = "dataplatform-" + environment + "-liberator-raw-zone", + table_name = "liberator_hangar_types", + push_down_predicate = create_pushdown_predicate_for_max_date_partition_value("dataplatform-" + environment + "-liberator-raw-zone", "liberator_hangar_types", 
'import_date'), + transformation_ctx = "DataSource2") ## @type: DataSource ## @args: [database = "dataplatform-" + environment + "-liberator-raw-zone", table_name = "liberator_hangar_details", transformation_ctx = "DataSource1"] ## @return: DataSource1 ## @inputs: [] -DataSource1 = glueContext.create_dynamic_frame.from_catalog(database = "dataplatform-" + environment + "-liberator-raw-zone", table_name = "liberator_hangar_details", transformation_ctx = "DataSource1") +DataSource1 = glueContext.create_dynamic_frame.from_catalog( + database = "dataplatform-" + environment + "-liberator-raw-zone", + table_name = "liberator_hangar_details", + push_down_predicate = create_pushdown_predicate_for_max_date_partition_value("dataplatform-" + environment + "-liberator-raw-zone", "liberator_hangar_details", 'import_date'), + transformation_ctx = "DataSource1") ## @type: SqlCode ## @args: [sqlAliases = {"liberator_hangar_details": DataSource1, "liberator_hangar_types": DataSource2, "liberator_hangar_allocations": DataSource3, "liberator_licence_party": DataSource0}, sqlName = SqlQuery0, transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [] @@ -147,4 +165,4 @@ def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFra DataSink0.setFormat("glueparquet") DataSink0.writeFrame(Transform0) -job.commit() \ No newline at end of file +job.commit()