Merge pull request #2015 from LBHackney-IT/di-447-update-data-quality-tests

Adds a script and Glue job to generate metadata for the Housing data quality tests once the results table has been produced.
annajgibson authored Dec 9, 2024
2 parents 0fdb5dc + 48b54d5 commit 13d53c8
Showing 2 changed files with 137 additions and 0 deletions.
85 changes: 85 additions & 0 deletions scripts/jobs/housing/housing_gx_dq_metadata.py
@@ -0,0 +1,85 @@
# flake8: noqa: F821

import awswrangler as wr
from datetime import datetime
import logging
import sys

from awsglue.utils import getResolvedOptions
import great_expectations as gx
import pandas as pd
from scripts.helpers.housing_gx_dq_inputs import table_list, partition_keys
import scripts.jobs.housing.housing_person_reshape_gx_suite
import scripts.jobs.housing.housing_tenure_reshape_gx_suite
import scripts.jobs.housing.housing_contacts_reshape_gx_suite
import scripts.jobs.housing.housing_assets_reshape_gx_suite
import scripts.jobs.housing.housing_homeowner_record_sheet_gx_suite
import scripts.jobs.housing.housing_dwellings_list_gx_suite

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

arg_keys = ['region_name', 's3_endpoint', 's3_target_location', 's3_staging_location', 'target_database',
            'target_table']
args = getResolvedOptions(sys.argv, arg_keys)
# At module level locals() is globals(), so this exposes each resolved job
# argument as a module-level name (hence the F821 suppression above).
locals().update(args)


def main():
    # load the file-based GX context stored at the target location
    context = gx.get_context(mode="file", project_root_dir=s3_target_location)

    df_all_suite_list = []

    for table in table_list:

        # get the expectation suite registered for this dataset
        suite = context.suites.get(name=f'{table}_suite')
        expectations = suite.expectations

        # expectation config fields not needed in the metadata table
        cols_to_drop = ['notes', 'result_format', 'catch_exceptions',
                        'rendered_content', 'windows', 'batch_id']

        # flatten each expectation's config into one metadata row
        suite_df = pd.DataFrame()
        for expectation in expectations:
            temp_df = pd.json_normalize(dict(expectation))
            temp_df['expectation_type'] = expectation.expectation_type
            temp_df['dataset_name'] = table
            temp_df = temp_df.drop(columns=cols_to_drop)
            suite_df = pd.concat([suite_df, temp_df])

        df_all_suite_list.append(suite_df)

    df = pd.concat(df_all_suite_list)

    # add expectation_id so each row can be joined to its test results
    df['expectation_id'] = df['expectation_type'] + "_" + df['dataset_name']

    # add the import-date partition columns
    df['import_year'] = datetime.today().year
    df['import_month'] = datetime.today().month
    df['import_day'] = datetime.today().day
    df['import_date'] = datetime.today().strftime('%Y%m%d')

    # set dtypes for Athena, defaulting every column to string
    dtype_dict = {column: 'string' for column in df.columns}

    # write to S3 and register the table in the Glue Data Catalog
    wr.s3.to_parquet(
        df=df,
        path=s3_target_location,
        dataset=True,
        database=target_database,
        table=target_table,
        mode="overwrite",
        partition_cols=partition_keys,
        dtype=dtype_dict
    )

    logger.info(f'GX Data Quality test metadata written to {s3_target_location}')


if __name__ == '__main__':
    main()
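
For context, a minimal sketch of the flattening step above: pd.json_normalize turns one expectation's config dict into a single wide row, with nested keys becoming dotted column names. The column name, expectation type, and dataset name below are hypothetical placeholders, not values taken from the Housing suites.

# Hypothetical illustration of the json_normalize flattening used above;
# the expectation config and names are for demonstration only.
import pandas as pd

expectation_config = {
    "column": "tenure_id",          # hypothetical column under test
    "mostly": 1.0,                  # GX-style tolerance parameter
    "meta": {"owner": "housing"},   # nested dicts become dotted columns
}

row = pd.json_normalize(expectation_config)  # columns: column, mostly, meta.owner
row["expectation_type"] = "expect_column_values_to_not_be_null"
row["dataset_name"] = "housing_tenure_reshape"
row["expectation_id"] = row["expectation_type"] + "_" + row["dataset_name"]

print(row.to_string(index=False))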
52 changes: 52 additions & 0 deletions terraform/etl/54-aws-glue-housing-apply-gx-dq-tests.tf
@@ -33,3 +33,55 @@ module "housing_apply_gx_dq_tests" {

  script_name = "housing_apply_gx_dq_tests"
}

module "housing_gx_dq_metadata" {
  source                    = "../modules/aws-glue-job"
  is_production_environment = local.is_production_environment
  is_live_environment       = local.is_live_environment

  count = local.is_live_environment ? 1 : 0

  department                     = module.department_housing_data_source
  job_name                       = "${local.short_identifier_prefix}Housing GX Data Quality Metadata"
  glue_scripts_bucket_id         = module.glue_scripts_data_source.bucket_id
  glue_temp_bucket_id            = module.glue_temp_storage_data_source.bucket_id
  glue_job_timeout               = 360
  helper_module_key              = data.aws_s3_object.helpers.key
  pydeequ_zip_key                = data.aws_s3_object.pydeequ.key
  spark_ui_output_storage_id     = module.spark_ui_output_storage_data_source.bucket_id
  trigger_enabled                = local.is_production_environment
  number_of_workers_for_glue_job = 2
  job_parameters = {
    "--job-bookmark-option"              = "job-bookmark-enable"
    "--enable-glue-datacatalog"          = "true"
    "--enable-continuous-cloudwatch-log" = "true"
    "--additional-python-modules"        = "great_expectations==1.2.1,numpy==1.26.1,awswrangler==3.10.0"
    "--region_name"                      = data.aws_region.current.name
    "--s3_endpoint"                      = "https://s3.${data.aws_region.current.name}.amazonaws.com"
    "--s3_target_location"               = "s3://${module.raw_zone_data_source.bucket_id}/housing/data-quality-test-metadata/"
    "--s3_staging_location"              = "s3://${module.athena_storage_data_source.bucket_id}/housing/data-quality-test-metadata/"
    "--target_database"                  = "housing-raw-zone"
    "--target_table"                     = "housing_gx_data_quality_test_metadata"
  }

  script_name = "housing_gx_dq_metadata"
}

resource "aws_glue_trigger" "housing_gx_dq_metadata" {
  name    = "${local.short_identifier_prefix}Housing GX Data Quality Metadata Trigger"
  type    = "CONDITIONAL"
  tags    = module.department_housing_data_source.tags
  enabled = local.is_production_environment
  count   = local.is_live_environment ? 1 : 0

  actions {
    job_name = "${local.short_identifier_prefix}Housing GX Data Quality Metadata"
  }

  predicate {
    conditions {
      job_name = "${local.short_identifier_prefix}Housing GX Data Quality Testing"
      state    = "SUCCEEDED"
    }
  }
}
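
The conditional trigger above chains the metadata job to run only after the upstream Housing GX Data Quality Testing job succeeds. A minimal post-deployment sanity check of that wiring with boto3 might look like the sketch below; the trigger name and region are assumptions, since short_identifier_prefix resolves differently per environment.

# Hypothetical post-deployment check of the conditional trigger; the
# trigger name and region below are assumptions, not values from this repo.
import boto3

glue = boto3.client("glue", region_name="eu-west-2")

trigger = glue.get_trigger(
    Name="Housing GX Data Quality Metadata Trigger"
)["Trigger"]

# the predicate should wait on the upstream testing job succeeding
for condition in trigger["Predicate"]["Conditions"]:
    print(condition["JobName"], "->", condition["State"])

# the action should start the metadata job defined above
for action in trigger["Actions"]:
    print("starts:", action["JobName"])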
