diff --git a/scripts/jobs/parking/parking_cycle_hangars_denormalisation.py b/scripts/jobs/parking/parking_cycle_hangars_denormalisation.py index a157d92e9..457d58583 100644 --- a/scripts/jobs/parking/parking_cycle_hangars_denormalisation.py +++ b/scripts/jobs/parking/parking_cycle_hangars_denormalisation.py @@ -8,11 +8,11 @@ from scripts.helpers.helpers import get_glue_env_var, create_pushdown_predicate_for_max_date_partition_value environment = get_glue_env_var("environment") -def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFrame: +def spark_sql_query(glue_context, query, mapping, transformation_ctx) -> DynamicFrame: for alias, frame in mapping.items(): frame.toDF().createOrReplaceTempView(alias) result = spark.sql(query) - return DynamicFrame.fromDF(result, glueContext, transformation_ctx) + return DynamicFrame.fromDF(result, glue_context, transformation_ctx) SqlQuery0 = ''' WITH HangarTypes as ( @@ -109,15 +109,15 @@ def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFra args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() -glueContext = GlueContext(sc) -spark = glueContext.spark_session -job = Job(glueContext) +glue_context = GlueContext(sc) +spark = glue_context.spark_session +job = Job(glue_context) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [database = "dataplatform-" + environment + "-liberator-raw-zone", table_name = "liberator_licence_party", transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] -DataSource0 = glueContext.create_dynamic_frame.from_catalog( +DataSource0 = glue_context.create_dynamic_frame.from_catalog( database = "dataplatform-" + environment + "-liberator-raw-zone", table_name = "liberator_licence_party", transformation_ctx = "DataSource0", @@ -127,7 +127,7 @@ def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFra ## @args: [database = "dataplatform-" + environment + "-liberator-raw-zone", table_name = "liberator_hangar_allocations", transformation_ctx = "DataSource3"] ## @return: DataSource3 ## @inputs: [] -DataSource3 = glueContext.create_dynamic_frame.from_catalog( +DataSource3 = glue_context.create_dynamic_frame.from_catalog( database = "dataplatform-" + environment + "-liberator-raw-zone", table_name = "liberator_hangar_allocations", push_down_predicate = create_pushdown_predicate_for_max_date_partition_value("dataplatform-" + environment + "-liberator-raw-zone", "liberator_hangar_allocations", 'import_date'), @@ -136,7 +136,7 @@ def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFra ## @args: [database = "dataplatform-" + environment + "-liberator-raw-zone", table_name = "liberator_hangar_types", transformation_ctx = "DataSource2"] ## @return: DataSource2 ## @inputs: [] -DataSource2 = glueContext.create_dynamic_frame.from_catalog( +DataSource2 = glue_context.create_dynamic_frame.from_catalog( database = "dataplatform-" + environment + "-liberator-raw-zone", table_name = "liberator_hangar_types", push_down_predicate = create_pushdown_predicate_for_max_date_partition_value("dataplatform-" + environment + "-liberator-raw-zone", "liberator_hangar_types", 'import_date'), @@ -145,7 +145,7 @@ def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFra ## @args: [database = "dataplatform-" + environment + "-liberator-raw-zone", table_name = "liberator_hangar_details", transformation_ctx = "DataSource1"] ## @return: DataSource1 ## @inputs: [] -DataSource1 = glueContext.create_dynamic_frame.from_catalog( +DataSource1 = glue_context.create_dynamic_frame.from_catalog( database = "dataplatform-" + environment + "-liberator-raw-zone", table_name = "liberator_hangar_details", push_down_predicate = create_pushdown_predicate_for_max_date_partition_value("dataplatform-" + environment + "-liberator-raw-zone", "liberator_hangar_details", 'import_date'), @@ -154,12 +154,12 @@ def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFra ## @args: [sqlAliases = {"liberator_hangar_details": DataSource1, "liberator_hangar_types": DataSource2, "liberator_hangar_allocations": DataSource3, "liberator_licence_party": DataSource0}, sqlName = SqlQuery0, transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [dfc = DataSource1,DataSource2,DataSource3,DataSource0] -Transform0 = sparkSqlQuery(glueContext, query = SqlQuery0, mapping = {"liberator_hangar_details": DataSource1, "liberator_hangar_types": DataSource2, "liberator_hangar_allocations": DataSource3, "liberator_licence_party": DataSource0}, transformation_ctx = "Transform0") +Transform0 = spark_sql_query(glue_context, query = SqlQuery0, mapping = {"liberator_hangar_details": DataSource1, "liberator_hangar_types": DataSource2, "liberator_hangar_allocations": DataSource3, "liberator_licence_party": DataSource0}, transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", catalog_database_name = "dataplatform-" + environment + "-liberator-refined-zone", format = "glueparquet", connection_options = {"path": "s3://dataplatform-" + environment + "-refined-zone/parking/liberator/parking_cycle_hangars_denormalisation/", "partitionKeys": ["import_year" ,"import_month" ,"import_day" ,"import_date"], "enableUpdateCatalog":true, "updateBehavior":"UPDATE_IN_DATABASE"}, catalog_table_name = "parking_cycle_hangars_denormalisation", transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] -DataSink0 = glueContext.getSink(path = "s3://dataplatform-" + environment + "-refined-zone/parking/liberator/parking_cycle_hangars_denormalisation/", connection_type = "s3", updateBehavior = "UPDATE_IN_DATABASE", partitionKeys = ["import_year","import_month","import_day","import_date"], enableUpdateCatalog = True, transformation_ctx = "DataSink0") +DataSink0 = glue_context.getSink(path = "s3://dataplatform-" + environment + "-refined-zone/parking/liberator/parking_cycle_hangars_denormalisation/", connection_type = "s3", updateBehavior = "UPDATE_IN_DATABASE", partitionKeys = ["import_year","import_month","import_day","import_date"], enableUpdateCatalog = True, transformation_ctx = "DataSink0") DataSink0.setCatalogInfo(catalogDatabase = "dataplatform-" + environment + "-liberator-refined-zone",catalogTableName = "parking_cycle_hangars_denormalisation") DataSink0.setFormat("glueparquet") DataSink0.writeFrame(Transform0)