Skip to content

Commit

Permalink
Update parking_cycle_hangars_denormalisation.py
Browse files Browse the repository at this point in the history
new email column in cycle hangars + pushdown predicates
  • Loading branch information
LBHSBALLEY authored Sep 28, 2023
1 parent 0c0770f commit e98fa4f
Showing 1 changed file with 25 additions and 7 deletions.
32 changes: 25 additions & 7 deletions scripts/jobs/parking/parking_cycle_hangars_denormalisation.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame

from scripts.helpers.helpers import get_glue_env_var
from scripts.helpers.helpers import get_glue_env_var, create_pushdown_predicate_for_max_date_partition_value
environment = get_glue_env_var("environment")

def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFrame:
Expand Down Expand Up @@ -87,7 +87,8 @@ def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFra
B.ADDRESS2 ,
B.ADDRESS3 ,
B.POSTCODE ,
B.TELEPHONE_NUMBER
B.TELEPHONE_NUMBER,
B.EMAIL_ADDRESS
FROM HangarAlloc as A
LEFT JOIN licence_party as B ON A.party_id = B.business_party_id
WHERE RW = 1 AND Allocation_Status NOT IN ('cancelled', 'key_returned')
Expand Down Expand Up @@ -117,22 +118,39 @@ def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFra
## @args: [database = "dataplatform-" + environment + "-liberator-raw-zone", table_name = "liberator_licence_party", transformation_ctx = "DataSource0"]
## @return: DataSource0
## @inputs: []
DataSource0 = glueContext.create_dynamic_frame.from_catalog(database = "dataplatform-" + environment + "-liberator-raw-zone", table_name = "liberator_licence_party", transformation_ctx = "DataSource0")
# Load liberator_licence_party, restricted to the newest import_date partition.
# mergeSchema is needed so the newly-added EMAIL_ADDRESS column is picked up
# even on partitions written before the column existed.
DataSource0 = glueContext.create_dynamic_frame.from_catalog(
    database="dataplatform-" + environment + "-liberator-raw-zone",
    table_name="liberator_licence_party",
    push_down_predicate=create_pushdown_predicate_for_max_date_partition_value(
        "dataplatform-" + environment + "-liberator-raw-zone",
        "liberator_licence_party",
        "import_date",
    ),
    additional_options={"mergeSchema": "true"},
    transformation_ctx="DataSource0",
)
## @type: DataSource
## @args: [database = "dataplatform-" + environment + "-liberator-raw-zone", table_name = "liberator_hangar_allocations", transformation_ctx = "DataSource3"]
## @return: DataSource3
## @inputs: []
DataSource3 = glueContext.create_dynamic_frame.from_catalog(database = "dataplatform-" + environment + "-liberator-raw-zone", table_name = "liberator_hangar_allocations", transformation_ctx = "DataSource3")
# Load liberator_hangar_allocations; the pushdown predicate keeps the read to
# the latest import_date partition only, so Glue skips all older S3 partitions.
DataSource3 = glueContext.create_dynamic_frame.from_catalog(
    database="dataplatform-" + environment + "-liberator-raw-zone",
    table_name="liberator_hangar_allocations",
    push_down_predicate=create_pushdown_predicate_for_max_date_partition_value(
        "dataplatform-" + environment + "-liberator-raw-zone",
        "liberator_hangar_allocations",
        "import_date",
    ),
    transformation_ctx="DataSource3",
)
## @type: DataSource
## @args: [database = "dataplatform-" + environment + "-liberator-raw-zone", table_name = "liberator_hangar_types", transformation_ctx = "DataSource2"]
## @return: DataSource2
## @inputs: []
DataSource2 = glueContext.create_dynamic_frame.from_catalog(database = "dataplatform-" + environment + "-liberator-raw-zone", table_name = "liberator_hangar_types", transformation_ctx = "DataSource2")
# Load liberator_hangar_types, again limited to the most recent import_date
# partition via the shared pushdown-predicate helper.
DataSource2 = glueContext.create_dynamic_frame.from_catalog(
    database="dataplatform-" + environment + "-liberator-raw-zone",
    table_name="liberator_hangar_types",
    push_down_predicate=create_pushdown_predicate_for_max_date_partition_value(
        "dataplatform-" + environment + "-liberator-raw-zone",
        "liberator_hangar_types",
        "import_date",
    ),
    transformation_ctx="DataSource2",
)
## @type: DataSource
## @args: [database = "dataplatform-" + environment + "-liberator-raw-zone", table_name = "liberator_hangar_details", transformation_ctx = "DataSource1"]
## @return: DataSource1
## @inputs: []
DataSource1 = glueContext.create_dynamic_frame.from_catalog(database = "dataplatform-" + environment + "-liberator-raw-zone", table_name = "liberator_hangar_details", transformation_ctx = "DataSource1")
# Load liberator_hangar_details, limited to the most recent import_date
# partition via the shared pushdown-predicate helper.
# BUG FIX: the original was missing the comma after the push_down_predicate
# keyword argument (before transformation_ctx), which is a SyntaxError and
# prevented the whole script from being parsed/run.
DataSource1 = glueContext.create_dynamic_frame.from_catalog(
    database = "dataplatform-" + environment + "-liberator-raw-zone",
    table_name = "liberator_hangar_details",
    push_down_predicate = create_pushdown_predicate_for_max_date_partition_value("dataplatform-" + environment + "-liberator-raw-zone", "liberator_hangar_details", 'import_date'),
    transformation_ctx = "DataSource1")
## @type: SqlCode
## @args: [sqlAliases = {"liberator_hangar_details": DataSource1, "liberator_hangar_types": DataSource2, "liberator_hangar_allocations": DataSource3, "liberator_licence_party": DataSource0}, sqlName = SqlQuery0, transformation_ctx = "Transform0"]
## @return: Transform0
Expand All @@ -147,4 +165,4 @@ def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFra
DataSink0.setFormat("glueparquet")
DataSink0.writeFrame(Transform0)

job.commit()
job.commit()

0 comments on commit e98fa4f

Please sign in to comment.