Skip to content

Commit

Permalink
Merge pull request #1447 from LBHackney-IT/LBHSBALLEY-patch-5
Browse files Browse the repository at this point in the history
Update noisework_complaints_refined.py
  • Loading branch information
LBHSBALLEY authored Oct 4, 2023
2 parents aaa7c9f + bf1f5df commit cf50cce
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 25 deletions.
37 changes: 13 additions & 24 deletions scripts/jobs/env_enforcement/noisework_complaints_refined.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,10 @@
from scripts.helpers.coordinates import convert_bng_to_latlon


def drop_null_columns(dataframe):
_df_length = dataframe.count()
null_counts = dataframe.select([f.count(f.when(f.col(c).isNull(), c)).alias(c) for c in dataframe.columns]).collect()[0].asDict()
to_drop = [k for k, v in null_counts.items() if v >= _df_length]
dataframe = dataframe.drop(*to_drop)
return dataframe


def get_latest_snapshot(dataframe):
"""Creates a function that returns only the latest snapshot, not needed if pushdown_predicate is used but it"""
dataframe = dataframe.where(f.col('import_date') == dataframe.select(max('import_date')).first()[0])
return dataframe


if __name__ == "__main__":
# read job parameters
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
source_catalog_table = get_glue_env_var('source_catalog_table', '')
source_catalog_table1 = get_glue_env_var('source_catalog_table1', '')
source_catalog_table2 = get_glue_env_var('source_catalog_table2', '')
source_catalog_database = get_glue_env_var('source_catalog_database', '')
s3_bucket_target = get_glue_env_var('s3_bucket_target', '')
Expand All @@ -42,13 +28,13 @@ def get_latest_snapshot(dataframe):
logger = glueContext.get_logger()
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
logger.info(f'The job is starting. The source tables are {source_catalog_database}.{source_catalog_table},'
logger.info(f'The job is starting. The source tables are {source_catalog_database}.{source_catalog_table1},'
f' {source_catalog_database}.{source_catalog_table2}')

# Load data from glue catalog
data_source = glueContext.create_dynamic_frame.from_catalog(
data_source1 = glueContext.create_dynamic_frame.from_catalog(
name_space=source_catalog_database,
table_name=source_catalog_table,
table_name=source_catalog_table1,
push_down_predicate="import_date=date_format(current_date, 'yyyyMMdd')"
)
data_source2 = glueContext.create_dynamic_frame.from_catalog(
Expand All @@ -58,11 +44,14 @@ def get_latest_snapshot(dataframe):
)

# convert dynamic frame to data frame
df = data_source.toDF()
logger.info(f'Cases table schema before conversion.{data_source1.printSchema()}')
logger.info(f'Complaints table schema before conversion.{data_source2.printSchema()}')

df1 = data_source1.toDF()
df2 = data_source2.toDF()

# filter out the blanks
df = df.filter(df.id != "")
df1 = df1.filter(df1.id != "")
df2 = df2.filter(df2.case_id != "")

# remove the partition dates from complaints
Expand All @@ -71,17 +60,17 @@ def get_latest_snapshot(dataframe):
df2 = df2.withColumnRenamed("id", "complaint_id")

# Create date and time columns
df = df.withColumn('case_created_date', f.date_format("created", "MM/dd/yyyy"))
df = df.withColumn('case_created_time', f.date_format("created", "HH:mm"))
df1 = df1.withColumn('case_created_date', f.date_format("created", "MM/dd/yyyy"))
df1 = df1.withColumn('case_created_time', f.date_format("created", "HH:mm"))
df2 = df2.withColumn('complaint_created_date', f.date_format("created", "MM/dd/yyyy"))
df2 = df2.withColumn('complaint_created_time', f.date_format("created", "HH:mm"))

# rename the created fields
df = df.withColumnRenamed("created", "case_created_datetime")
df1 = df1.withColumnRenamed("created", "case_created_datetime")
df2 = df2.withColumnRenamed("created", "complaint_created_datetime")

# join the tables
output = df.join(df2, df.id == df2.case_id, "left")
output = df1.join(df2, df1.id == df2.case_id, "left")
output = output.drop("id")

# convert to int so suitable for geo df
Expand Down
2 changes: 1 addition & 1 deletion terraform/etl/35-aws-glue-env-enforcement.tf
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ module "noisework_complaints_to_refined" {
"--s3_bucket_target2" = "s3://${module.refined_zone_data_source.bucket_id}/env-enforcement/noisework_complaints_to_geocode"
"--enable-glue-datacatalog" = "true"
"--source_catalog_database" = module.department_env_enforcement_data_source.raw_zone_catalog_database_name
"--source_catalog_table" = "noiseworks_case"
"--source_catalog_table1" = "noiseworks_case"
"--source_catalog_table2" = "noiseworks_complaint"

}
Expand Down

0 comments on commit cf50cce

Please sign in to comment.