diff --git a/data_steward/cdr_cleaner/clean_cdr.py b/data_steward/cdr_cleaner/clean_cdr.py
index 990f1e828..14563ca39 100644
--- a/data_steward/cdr_cleaner/clean_cdr.py
+++ b/data_steward/cdr_cleaner/clean_cdr.py
@@ -19,6 +19,8 @@
 from cdr_cleaner.cleaning_rules.convert_pre_post_coordinated_concepts import ConvertPrePostCoordinatedConcepts
 from cdr_cleaner.cleaning_rules.create_aian_lookup import CreateAIANLookup
 from cdr_cleaner.cleaning_rules.create_expected_ct_list import StoreExpectedCTList
+from cdr_cleaner.cleaning_rules.deid.ct_additional_privacy_suppression import CTAdditionalPrivacyConceptSuppression
+from cdr_cleaner.cleaning_rules.deid.rt_additional_privacy_suppression import RTAdditionalPrivacyConceptSuppression
 from cdr_cleaner.cleaning_rules.domain_alignment import DomainAlignment
 import cdr_cleaner.cleaning_rules.drop_duplicate_states as drop_duplicate_states
 from cdr_cleaner.cleaning_rules.drop_extreme_measurements import DropExtremeMeasurements
@@ -301,6 +303,7 @@
     (RegisteredCopeSurveyQuestionsSuppression,),
     (ExplicitIdentifierSuppression,),
     (CancerConceptSuppression,),
+    (RTAdditionalPrivacyConceptSuppression,),
     (StringFieldsSuppression,),
     (FreeTextSurveyResponseSuppression,),
     (DropOrphanedSurveyConductIds,),
@@ -364,6 +367,7 @@
     (IDFieldSuppression,),  # Should run after any data remapping
     (CancerConceptSuppression,),  # Should run after any data remapping rules
     (SectionParticipationConceptSuppression,),
+    (CTAdditionalPrivacyConceptSuppression,),
     (StringFieldsSuppression,),
     (AggregateZipCodes,),
     (DeidentifyAIANZip3Values,),
diff --git a/data_steward/cdr_cleaner/cleaning_rules/deid/ct_additional_privacy_suppression.py b/data_steward/cdr_cleaner/cleaning_rules/deid/ct_additional_privacy_suppression.py
new file mode 100644
index 000000000..d5fe916c2
--- /dev/null
+++ b/data_steward/cdr_cleaner/cleaning_rules/deid/ct_additional_privacy_suppression.py
@@ -0,0 +1,125 @@
+"""
+Ensures that all of the concepts newly identified in the vocabulary as of 02/29/2024 are suppressed
+in the Controlled Tier dataset and sandboxed in the sandbox dataset.
+
+
+Original Issue: DC-3749
+
+The intent of this cleaning rule is to ensure the concepts to suppress in CT are sandboxed and suppressed.
+"""
+
+# Python imports
+import logging
+import pandas as pd
+
+# Project imports
+from resources import CT_ADDITIONAL_PRIVACY_CONCEPTS_PATH
+from gcloud.bq import bigquery
+from common import AOU_DEATH, CDM_TABLES
+from utils import pipeline_logging
+import constants.cdr_cleaner.clean_cdr as cdr_consts
+from cdr_cleaner.cleaning_rules.deid.concept_suppression import \
+    AbstractBqLookupTableConceptSuppression
+
+# Third party imports
+from google.cloud.exceptions import GoogleCloudError
+
+LOGGER = logging.getLogger(__name__)
+ISSUE_NUMBERS = ['DC3749']
+
+
+class CTAdditionalPrivacyConceptSuppression(
+        AbstractBqLookupTableConceptSuppression):
+
+    def __init__(self,
+                 project_id,
+                 dataset_id,
+                 sandbox_dataset_id,
+                 table_namer=None):
+        """
+        Initialize the class with proper info.
+
+        Set the issue numbers, description and affected datasets. As other
+        tickets may affect this SQL, append them to the list of Jira Issues.
+        DO NOT REMOVE ORIGINAL JIRA ISSUE NUMBERS!
+        """
+        desc = f'Any record with a concept_id equal to any of the values in ' \
+               f'{ISSUE_NUMBERS} will be sandboxed and dropped from the domain tables'
+        ct_additional_privacy_concept_table = f'ct_additional_privacy_{ISSUE_NUMBERS[0]}'
+        super().__init__(
+            issue_numbers=ISSUE_NUMBERS,
+            description=desc,
+            affected_datasets=[cdr_consts.CONTROLLED_TIER_DEID],
+            project_id=project_id,
+            dataset_id=dataset_id,
+            sandbox_dataset_id=sandbox_dataset_id,
+            affected_tables=CDM_TABLES + [AOU_DEATH],
+            concept_suppression_lookup_table=ct_additional_privacy_concept_table,
+            table_namer=table_namer)
+
+    def create_suppression_lookup_table(self, client):
+        df = pd.read_csv(CT_ADDITIONAL_PRIVACY_CONCEPTS_PATH)
+        dataset_ref = bigquery.DatasetReference(self.project_id,
+                                                self.sandbox_dataset_id)
+        table_ref = dataset_ref.table(self.concept_suppression_lookup_table)
+        result = client.load_table_from_dataframe(df, table_ref).result()
+
+        if hasattr(result, 'errors') and result.errors:
+            LOGGER.error(f"Error running job {result.job_id}: {result.errors}")
+            raise GoogleCloudError(
+                f"Error running job {result.job_id}: {result.errors}")
+
+    def validate_rule(self, client, *args, **keyword_args):
+        """
+        Validates the cleaning rule which deletes or updates the data from the tables
+
+        Method to run validation on cleaning rules that will be updating the values.
+        For example:
+        if your class updates all the datetime fields you should be implementing the
+        validation that checks that the datetime values that need to be updated no
+        longer exist in the table.
+
+        if your class deletes a subset of rows in the tables you should be implementing
+        the validation that checks that the final row counts + deleted rows
+        equal the initial row counts of the affected tables.
+
+        Raises RuntimeError if the validation fails.
+        """
+
+        raise NotImplementedError("Please fix me.")
+
+    def setup_validation(self, client, *args, **keyword_args):
+        """
+        Run required steps for validation setup
+
+        Method to run to set up validation on cleaning rules that will be updating or deleting the values.
+        For example:
+        if your class updates all the datetime fields you should be implementing the
+        logic to get the initial list of values which adhere to a condition we are looking for.
+
+        if your class deletes a subset of rows in the tables you should be implementing
+        the logic to get the row counts of the tables prior to applying the cleaning rule
+
+        """
+        raise NotImplementedError("Please fix me.")
+
+
+if __name__ == '__main__':
+    import cdr_cleaner.args_parser as parser
+    import cdr_cleaner.clean_cdr_engine as clean_engine
+
+    ARGS = parser.default_parse_args()
+    pipeline_logging.configure(level=logging.DEBUG, add_console_handler=True)
+
+    if ARGS.list_queries:
+        clean_engine.add_console_logging()
+        query_list = clean_engine.get_query_list(
+            ARGS.project_id, ARGS.dataset_id, ARGS.sandbox_dataset_id,
+            [(CTAdditionalPrivacyConceptSuppression,)])
+        for query in query_list:
+            LOGGER.info(query)
+    else:
+        clean_engine.add_console_logging(ARGS.console_log)
+        clean_engine.clean_dataset(ARGS.project_id, ARGS.dataset_id,
+                                   ARGS.sandbox_dataset_id,
+                                   [(CTAdditionalPrivacyConceptSuppression,)])
diff --git a/data_steward/cdr_cleaner/cleaning_rules/deid/rt_additional_privacy_suppression.py b/data_steward/cdr_cleaner/cleaning_rules/deid/rt_additional_privacy_suppression.py
new file mode 100644
index 000000000..c938388e1
--- /dev/null
+++ b/data_steward/cdr_cleaner/cleaning_rules/deid/rt_additional_privacy_suppression.py
@@ -0,0 +1,125 @@
+"""
+Ensures that all of the concepts newly identified in the vocabulary as of 02/29/2024 are suppressed
+in the Registered Tier dataset and sandboxed in the sandbox dataset.
+
+
+Original Issue: DC-3749
+
+The intent of this cleaning rule is to ensure the concepts to suppress in RT are sandboxed and suppressed.
+"""
+
+# Python imports
+import logging
+import pandas as pd
+
+# Project imports
+from resources import RT_ADDITIONAL_PRIVACY_CONCEPTS_PATH
+from gcloud.bq import bigquery
+from common import AOU_DEATH, CDM_TABLES
+from utils import pipeline_logging
+import constants.cdr_cleaner.clean_cdr as cdr_consts
+from cdr_cleaner.cleaning_rules.deid.concept_suppression import \
+    AbstractBqLookupTableConceptSuppression
+
+# Third party imports
+from google.cloud.exceptions import GoogleCloudError
+
+LOGGER = logging.getLogger(__name__)
+ISSUE_NUMBERS = ['DC3749']
+
+
+class RTAdditionalPrivacyConceptSuppression(
+        AbstractBqLookupTableConceptSuppression):
+
+    def __init__(self,
+                 project_id,
+                 dataset_id,
+                 sandbox_dataset_id,
+                 table_namer=None):
+        """
+        Initialize the class with proper info.
+
+        Set the issue numbers, description and affected datasets. As other
+        tickets may affect this SQL, append them to the list of Jira Issues.
+        DO NOT REMOVE ORIGINAL JIRA ISSUE NUMBERS!
+        """
+        desc = f'Any record with a concept_id equal to any of the values in ' \
+               f'{ISSUE_NUMBERS} will be sandboxed and dropped from the domain tables'
+        rt_additional_privacy_concept_table = f'rt_additional_privacy_{ISSUE_NUMBERS[0]}'
+        super().__init__(
+            issue_numbers=ISSUE_NUMBERS,
+            description=desc,
+            affected_datasets=[cdr_consts.REGISTERED_TIER_DEID],
+            project_id=project_id,
+            dataset_id=dataset_id,
+            sandbox_dataset_id=sandbox_dataset_id,
+            affected_tables=CDM_TABLES + [AOU_DEATH],
+            concept_suppression_lookup_table=rt_additional_privacy_concept_table,
+            table_namer=table_namer)
+
+    def create_suppression_lookup_table(self, client):
+        df = pd.read_csv(RT_ADDITIONAL_PRIVACY_CONCEPTS_PATH)
+        dataset_ref = bigquery.DatasetReference(self.project_id,
+                                                self.sandbox_dataset_id)
+        table_ref = dataset_ref.table(self.concept_suppression_lookup_table)
+        result = client.load_table_from_dataframe(df, table_ref).result()
+
+        if hasattr(result, 'errors') and result.errors:
+            LOGGER.error(f"Error running job {result.job_id}: {result.errors}")
+            raise GoogleCloudError(
+                f"Error running job {result.job_id}: {result.errors}")
+
+    def validate_rule(self, client, *args, **keyword_args):
+        """
+        Validates the cleaning rule which deletes or updates the data from the tables
+
+        Method to run validation on cleaning rules that will be updating the values.
+        For example:
+        if your class updates all the datetime fields you should be implementing the
+        validation that checks that the datetime values that need to be updated no
+        longer exist in the table.
+
+        if your class deletes a subset of rows in the tables you should be implementing
+        the validation that checks that the final row counts + deleted rows
+        equal the initial row counts of the affected tables.
+
+        Raises RuntimeError if the validation fails.
+        """
+
+        raise NotImplementedError("Please fix me.")
+
+    def setup_validation(self, client, *args, **keyword_args):
+        """
+        Run required steps for validation setup
+
+        Method to run to set up validation on cleaning rules that will be updating or deleting the values.
+        For example:
+        if your class updates all the datetime fields you should be implementing the
+        logic to get the initial list of values which adhere to a condition we are looking for.
+
+        if your class deletes a subset of rows in the tables you should be implementing
+        the logic to get the row counts of the tables prior to applying the cleaning rule
+
+        """
+        raise NotImplementedError("Please fix me.")
+
+
+if __name__ == '__main__':
+    import cdr_cleaner.args_parser as parser
+    import cdr_cleaner.clean_cdr_engine as clean_engine
+
+    ARGS = parser.default_parse_args()
+    pipeline_logging.configure(level=logging.DEBUG, add_console_handler=True)
+
+    if ARGS.list_queries:
+        clean_engine.add_console_logging()
+        query_list = clean_engine.get_query_list(
+            ARGS.project_id, ARGS.dataset_id, ARGS.sandbox_dataset_id,
+            [(RTAdditionalPrivacyConceptSuppression,)])
+        for query in query_list:
+            LOGGER.info(query)
+    else:
+        clean_engine.add_console_logging(ARGS.console_log)
+        clean_engine.clean_dataset(ARGS.project_id, ARGS.dataset_id,
+                                   ARGS.sandbox_dataset_id,
+                                   [(RTAdditionalPrivacyConceptSuppression,)])
diff --git a/data_steward/resources.py b/data_steward/resources.py
index 5375c136b..a4e02da23 100644
--- a/data_steward/resources.py
+++ b/data_steward/resources.py
@@ -66,6 +66,10 @@
 CDR_CLEANER_PATH = os.path.join(resource_files_path, 'cdr_cleaner')
 REPLACED_PRIVACY_CONCEPTS_PATH = os.path.join(
     CDR_CLEANER_PATH, 'controlled_tier_replaced_privacy_concepts.csv')
+CT_ADDITIONAL_PRIVACY_CONCEPTS_PATH = os.path.join(
+    CDR_CLEANER_PATH, 'ct_additional_privacy_concepts.csv')
+RT_ADDITIONAL_PRIVACY_CONCEPTS_PATH = os.path.join(
+    CDR_CLEANER_PATH, 'rt_additional_privacy_concepts.csv')
 COPE_SUPPRESSION_PATH = os.path.join(CDR_CLEANER_PATH, 'cope_suppression')
 RT_CT_COPE_SUPPRESSION_CSV_PATH = os.path.join(COPE_SUPPRESSION_PATH,
                                                'rt_ct_cope_suppression.csv')
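
Reviewer note (not part of the patch): both new rules only implement create_suppression_lookup_table, loading the new ct_additional_privacy_concepts.csv / rt_additional_privacy_concepts.csv resources into a sandbox lookup table with pandas and load_table_from_dataframe; the abstract parent class is expected to generate the actual sandbox-and-suppress queries against that table. For anyone wanting to sanity-check one of the new CSVs against a test sandbox outside the pipeline, a minimal sketch follows. It assumes the CSVs expose a concept_id column and that the target project and sandbox dataset already exist; the helper name load_ct_privacy_lookup is illustrative only and does not appear in this change.

# Hypothetical smoke test for the lookup-table load; mirrors the pattern in
# create_suppression_lookup_table but uses google-cloud-bigquery directly.
import pandas as pd
from google.cloud import bigquery

from resources import CT_ADDITIONAL_PRIVACY_CONCEPTS_PATH


def load_ct_privacy_lookup(project_id: str, sandbox_dataset_id: str,
                           table_name: str) -> int:
    """Load the CT suppression CSV into a sandbox table and return its row count."""
    client = bigquery.Client(project=project_id)
    # Assumes the CSV parses cleanly and carries a concept_id column.
    df = pd.read_csv(CT_ADDITIONAL_PRIVACY_CONCEPTS_PATH)
    table_ref = bigquery.DatasetReference(project_id,
                                          sandbox_dataset_id).table(table_name)
    # Block until the load job completes; raises on failure.
    client.load_table_from_dataframe(df, table_ref).result()
    return client.get_table(table_ref).num_rows

The row count returned can then be compared against the number of rows in the CSV to confirm the lookup table was populated as expected.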