diff --git a/scripts/jobs/env_enforcement/noisework_complaints_refined.py b/scripts/jobs/env_enforcement/noisework_complaints_refined.py index 8f5f6b87a..e91f47068 100644 --- a/scripts/jobs/env_enforcement/noisework_complaints_refined.py +++ b/scripts/jobs/env_enforcement/noisework_complaints_refined.py @@ -14,24 +14,10 @@ from scripts.helpers.coordinates import convert_bng_to_latlon -def drop_null_columns(dataframe): - _df_length = dataframe.count() - null_counts = dataframe.select([f.count(f.when(f.col(c).isNull(), c)).alias(c) for c in dataframe.columns]).collect()[0].asDict() - to_drop = [k for k, v in null_counts.items() if v >= _df_length] - dataframe = dataframe.drop(*to_drop) - return dataframe - - -def get_latest_snapshot(dataframe): - """Creates a function that returns only the latest snapshot, not needed if pushdown_predicate is used but it""" - dataframe = dataframe.where(f.col('import_date') == dataframe.select(max('import_date')).first()[0]) - return dataframe - - if __name__ == "__main__": # read job parameters args = getResolvedOptions(sys.argv, ['JOB_NAME']) - source_catalog_table = get_glue_env_var('source_catalog_table', '') + source_catalog_table1 = get_glue_env_var('source_catalog_table1', '') source_catalog_table2 = get_glue_env_var('source_catalog_table2', '') source_catalog_database = get_glue_env_var('source_catalog_database', '') s3_bucket_target = get_glue_env_var('s3_bucket_target', '') @@ -42,13 +28,13 @@ def get_latest_snapshot(dataframe): logger = glueContext.get_logger() job = Job(glueContext) job.init(args['JOB_NAME'], args) - logger.info(f'The job is starting. The source tables are {source_catalog_database}.{source_catalog_table},' + logger.info(f'The job is starting. The source tables are {source_catalog_database}.{source_catalog_table1},' f' {source_catalog_database}.{source_catalog_table2}') # Load data from glue catalog - data_source = glueContext.create_dynamic_frame.from_catalog( + data_source1 = glueContext.create_dynamic_frame.from_catalog( name_space=source_catalog_database, - table_name=source_catalog_table, + table_name=source_catalog_table1, push_down_predicate="import_date=date_format(current_date, 'yyyyMMdd')" ) data_source2 = glueContext.create_dynamic_frame.from_catalog( @@ -58,11 +44,14 @@ def get_latest_snapshot(dataframe): ) # convert dynamic frame to data frame - df = data_source.toDF() + logger.info(f'Cases table schema before conversion.{data_source1.printSchema()}') + logger.info(f'Complaints table schema before conversion.{data_source2.printSchema()}') + + df1 = data_source1.toDF() df2 = data_source2.toDF() # filter out the blanks - df = df.filter(df.id != "") + df1 = df1.filter(df1.id != "") df2 = df2.filter(df2.case_id != "") # remove the partition dates from complaints @@ -71,17 +60,17 @@ def get_latest_snapshot(dataframe): df2 = df2.withColumnRenamed("id", "complaint_id") # Create date and time columns - df = df.withColumn('case_created_date', f.date_format("created", "MM/dd/yyyy")) - df = df.withColumn('case_created_time', f.date_format("created", "HH:mm")) + df1 = df1.withColumn('case_created_date', f.date_format("created", "MM/dd/yyyy")) + df1 = df1.withColumn('case_created_time', f.date_format("created", "HH:mm")) df2 = df2.withColumn('complaint_created_date', f.date_format("created", "MM/dd/yyyy")) df2 = df2.withColumn('complaint_created_time', f.date_format("created", "HH:mm")) # rename the created fields - df = df.withColumnRenamed("created", "case_created_datetime") + df1 = df1.withColumnRenamed("created", "case_created_datetime") df2 = df2.withColumnRenamed("created", "complaint_created_datetime") # join the tables - output = df.join(df2, df.id == df2.case_id, "left") + output = df1.join(df2, df1.id == df2.case_id, "left") output = output.drop("id") # convert to int so suitable for geo df diff --git a/terraform/etl/35-aws-glue-env-enforcement.tf b/terraform/etl/35-aws-glue-env-enforcement.tf index 924405e78..7d36d76b2 100644 --- a/terraform/etl/35-aws-glue-env-enforcement.tf +++ b/terraform/etl/35-aws-glue-env-enforcement.tf @@ -51,7 +51,7 @@ module "noisework_complaints_to_refined" { "--s3_bucket_target2" = "s3://${module.refined_zone_data_source.bucket_id}/env-enforcement/noisework_complaints_to_geocode" "--enable-glue-datacatalog" = "true" "--source_catalog_database" = module.department_env_enforcement_data_source.raw_zone_catalog_database_name - "--source_catalog_table" = "noiseworks_case" + "--source_catalog_table1" = "noiseworks_case" "--source_catalog_table2" = "noiseworks_complaint" }