[DC-3759] Add additional suppression
nishanthpp93 committed Mar 7, 2024
1 parent e6d88ec commit de1ccc6
Showing 4 changed files with 258 additions and 0 deletions.
4 changes: 4 additions & 0 deletions data_steward/cdr_cleaner/clean_cdr.py
@@ -19,6 +19,8 @@
from cdr_cleaner.cleaning_rules.convert_pre_post_coordinated_concepts import ConvertPrePostCoordinatedConcepts
from cdr_cleaner.cleaning_rules.create_aian_lookup import CreateAIANLookup
from cdr_cleaner.cleaning_rules.create_expected_ct_list import StoreExpectedCTList
from cdr_cleaner.cleaning_rules.deid.ct_additional_privacy_suppression import CTAdditionalPrivacyConceptSuppression
from cdr_cleaner.cleaning_rules.deid.rt_additional_privacy_suppression import RTAdditionalPrivacyConceptSuppression
from cdr_cleaner.cleaning_rules.domain_alignment import DomainAlignment
import cdr_cleaner.cleaning_rules.drop_duplicate_states as drop_duplicate_states
from cdr_cleaner.cleaning_rules.drop_extreme_measurements import DropExtremeMeasurements
@@ -301,6 +303,7 @@
(RegisteredCopeSurveyQuestionsSuppression,),
(ExplicitIdentifierSuppression,),
(CancerConceptSuppression,),
(RTAdditionalPrivacyConceptSuppression,),
(StringFieldsSuppression,),
(FreeTextSurveyResponseSuppression,),
(DropOrphanedSurveyConductIds,),
@@ -364,6 +367,7 @@
(IDFieldSuppression,), # Should run after any data remapping
(CancerConceptSuppression,), # Should run after any data remapping rules
(SectionParticipationConceptSuppression,),
(CTAdditionalPrivacyConceptSuppression,),
(StringFieldsSuppression,),
(AggregateZipCodes,),
(DeidentifyAIANZip3Values,),
125 changes: 125 additions & 0 deletions data_steward/cdr_cleaner/cleaning_rules/deid/ct_additional_privacy_suppression.py
@@ -0,0 +1,125 @@
"""
Ensures that all the newly identified concepts as of 02/29/2024 in vocabulary are being suppressed
in the Controlled tier dataset and sandboxed in the sandbox dataset
Original Issue: DC-3749
The intent of this cleaning rule is to ensure the concepts to suppress in CT are sandboxed and suppressed.
"""

# Python imports
import logging
import pandas as pd

# Project imports
from resources import CT_ADDITIONAL_PRIVACY_CONCEPTS_PATH
from gcloud.bq import bigquery
from common import AOU_DEATH, CDM_TABLES
from utils import pipeline_logging
import constants.cdr_cleaner.clean_cdr as cdr_consts
from cdr_cleaner.cleaning_rules.deid.concept_suppression import \
AbstractBqLookupTableConceptSuppression

# Third party imports
from google.cloud.exceptions import GoogleCloudError

LOGGER = logging.getLogger(__name__)
ISSUE_NUMBERS = ['DC3749']


class CTAdditionalPrivacyConceptSuppression(
AbstractBqLookupTableConceptSuppression):

def __init__(self,
project_id,
dataset_id,
sandbox_dataset_id,
table_namer=None):
"""
Initialize the class with proper info.
Set the issue numbers, description and affected datasets. As other
tickets may affect this SQL, append them to the list of Jira Issues.
DO NOT REMOVE ORIGINAL JIRA ISSUE NUMBERS!
"""
        ct_additional_privacy_concept_table = f'ct_additional_privacy_{ISSUE_NUMBERS[0]}'
        desc = (f'Any record with a concept_id matching a value in the '
                f'{ct_additional_privacy_concept_table} lookup table will be '
                f'sandboxed and dropped from the domain tables.')
super().__init__(
issue_numbers=ISSUE_NUMBERS,
description=desc,
affected_datasets=[cdr_consts.CONTROLLED_TIER_DEID],
project_id=project_id,
dataset_id=dataset_id,
sandbox_dataset_id=sandbox_dataset_id,
affected_tables=CDM_TABLES + [AOU_DEATH],
concept_suppression_lookup_table=ct_additional_privacy_concept_table,
table_namer=table_namer)

def create_suppression_lookup_table(self, client):
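        # Load the CSV of concepts to suppress into the lookup table in the
        # sandbox dataset; the AbstractBqLookupTableConceptSuppression parent
        # class uses this lookup table to identify the records to sandbox and
        # drop.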
df = pd.read_csv(CT_ADDITIONAL_PRIVACY_CONCEPTS_PATH)
dataset_ref = bigquery.DatasetReference(self.project_id,
self.sandbox_dataset_id)
table_ref = dataset_ref.table(self.concept_suppression_lookup_table)
result = client.load_table_from_dataframe(df, table_ref).result()

if hasattr(result, 'errors') and result.errors:
LOGGER.error(f"Error running job {result.job_id}: {result.errors}")
raise GoogleCloudError(
f"Error running job {result.job_id}: {result.errors}")

def validate_rule(self, client, *args, **keyword_args):
"""
Validates the cleaning rule which deletes or updates the data from the tables
Method to run validation on cleaning rules that will be updating the values.
For example:
if your class updates all the datetime fields you should be implementing the
validation that checks if the date time values that needs to be updated no
longer exists in the table.
if your class deletes a subset of rows in the tables you should be implementing
the validation that checks if the count of final final row counts + deleted rows
should equals to initial row counts of the affected tables.
Raises RunTimeError if the validation fails.
"""

raise NotImplementedError("Please fix me.")

def setup_validation(self, client, *args, **keyword_args):
"""
Run required steps for validation setup
Method to run to setup validation on cleaning rules that will be updating or deleting the values.
For example:
if your class updates all the datetime fields you should be implementing the
logic to get the initial list of values which adhere to a condition we are looking for.
if your class deletes a subset of rows in the tables you should be implementing
the logic to get the row counts of the tables prior to applying cleaning rule
"""
raise NotImplementedError("Please fix me.")


if __name__ == '__main__':
import cdr_cleaner.args_parser as parser
import cdr_cleaner.clean_cdr_engine as clean_engine

ARGS = parser.default_parse_args()
pipeline_logging.configure(level=logging.DEBUG, add_console_handler=True)
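
    # Example invocation (a sketch; the exact flag names are defined in
    # cdr_cleaner.args_parser and are assumed here):
    #   python ct_additional_privacy_suppression.py --project_id my-project \
    #       --dataset_id my_dataset --sandbox_dataset_id my_sandbox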

if ARGS.list_queries:
clean_engine.add_console_logging()
query_list = clean_engine.get_query_list(
ARGS.project_id, ARGS.dataset_id, ARGS.sandbox_dataset_id,
[(CTAdditionalPrivacyConceptSuppression,)])
for query in query_list:
LOGGER.info(query)
else:
clean_engine.add_console_logging(ARGS.console_log)
clean_engine.clean_dataset(ARGS.project_id, ARGS.dataset_id,
ARGS.sandbox_dataset_id,
[(CTAdditionalPrivacyConceptSuppression,)])
125 changes: 125 additions & 0 deletions data_steward/cdr_cleaner/cleaning_rules/deid/rt_additional_privacy_suppression.py
@@ -0,0 +1,125 @@
"""
Ensures that all the newly identified concepts as of 02/29/2024 in vocabulary are being suppressed
in the Registered tier dataset and sandboxed in the sandbox dataset
Original Issue: DC-3749
The intent of this cleaning rule is to ensure the concepts to suppress in RT are sandboxed and suppressed.
"""

# Python imports
import logging
import pandas as pd

# Project imports
from resources import RT_ADDITIONAL_PRIVACY_CONCEPTS_PATH
from gcloud.bq import bigquery
from common import AOU_DEATH, CDM_TABLES
from utils import pipeline_logging
import constants.cdr_cleaner.clean_cdr as cdr_consts
from cdr_cleaner.cleaning_rules.deid.concept_suppression import \
AbstractBqLookupTableConceptSuppression

# Third party imports
from google.cloud.exceptions import GoogleCloudError

LOGGER = logging.getLogger(__name__)
ISSUE_NUMBERS = ['DC3749']


class RTAdditionalPrivacyConceptSuppression(
AbstractBqLookupTableConceptSuppression):

def __init__(self,
project_id,
dataset_id,
sandbox_dataset_id,
table_namer=None):
"""
Initialize the class with proper info.
Set the issue numbers, description and affected datasets. As other
tickets may affect this SQL, append them to the list of Jira Issues.
DO NOT REMOVE ORIGINAL JIRA ISSUE NUMBERS!
"""
        rt_additional_privacy_concept_table = f'rt_additional_privacy_{ISSUE_NUMBERS[0]}'
        desc = (f'Any record with a concept_id matching a value in the '
                f'{rt_additional_privacy_concept_table} lookup table will be '
                f'sandboxed and dropped from the domain tables.')
super().__init__(
issue_numbers=ISSUE_NUMBERS,
description=desc,
affected_datasets=[cdr_consts.REGISTERED_TIER_DEID],
project_id=project_id,
dataset_id=dataset_id,
sandbox_dataset_id=sandbox_dataset_id,
affected_tables=CDM_TABLES + [AOU_DEATH],
concept_suppression_lookup_table=rt_additional_privacy_concept_table,
table_namer=table_namer)

def create_suppression_lookup_table(self, client):
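        # As in the CT counterpart, load the CSV of concepts to suppress into
        # the sandbox lookup table that drives the suppression queries.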
df = pd.read_csv(RT_ADDITIONAL_PRIVACY_CONCEPTS_PATH)
dataset_ref = bigquery.DatasetReference(self.project_id,
self.sandbox_dataset_id)
table_ref = dataset_ref.table(self.concept_suppression_lookup_table)
result = client.load_table_from_dataframe(df, table_ref).result()

if hasattr(result, 'errors') and result.errors:
LOGGER.error(f"Error running job {result.job_id}: {result.errors}")
raise GoogleCloudError(
f"Error running job {result.job_id}: {result.errors}")

def validate_rule(self, client, *args, **keyword_args):
"""
Validates the cleaning rule which deletes or updates the data from the tables
Method to run validation on cleaning rules that will be updating the values.
For example:
if your class updates all the datetime fields you should be implementing the
validation that checks if the date time values that needs to be updated no
longer exists in the table.
if your class deletes a subset of rows in the tables you should be implementing
the validation that checks if the count of final final row counts + deleted rows
should equals to initial row counts of the affected tables.
Raises RunTimeError if the validation fails.
"""

raise NotImplementedError("Please fix me.")

def setup_validation(self, client, *args, **keyword_args):
"""
Run required steps for validation setup
Method to run to setup validation on cleaning rules that will be updating or deleting the values.
For example:
if your class updates all the datetime fields you should be implementing the
logic to get the initial list of values which adhere to a condition we are looking for.
if your class deletes a subset of rows in the tables you should be implementing
the logic to get the row counts of the tables prior to applying cleaning rule
"""
raise NotImplementedError("Please fix me.")


if __name__ == '__main__':
import cdr_cleaner.args_parser as parser
import cdr_cleaner.clean_cdr_engine as clean_engine

ARGS = parser.default_parse_args()
pipeline_logging.configure(level=logging.DEBUG, add_console_handler=True)

if ARGS.list_queries:
clean_engine.add_console_logging()
query_list = clean_engine.get_query_list(
ARGS.project_id, ARGS.dataset_id, ARGS.sandbox_dataset_id,
[(RTAdditionalPrivacyConceptSuppression,)])
for query in query_list:
LOGGER.info(query)
else:
clean_engine.add_console_logging(ARGS.console_log)
clean_engine.clean_dataset(ARGS.project_id, ARGS.dataset_id,
ARGS.sandbox_dataset_id,
[(RTAdditionalPrivacyConceptSuppression,)])
4 changes: 4 additions & 0 deletions data_steward/resources.py
@@ -66,6 +66,10 @@
CDR_CLEANER_PATH = os.path.join(resource_files_path, 'cdr_cleaner')
REPLACED_PRIVACY_CONCEPTS_PATH = os.path.join(
CDR_CLEANER_PATH, 'controlled_tier_replaced_privacy_concepts.csv')
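# Lookup CSVs for the additional privacy concept suppression rules (DC-3749);
# these presumably list the concept_ids to sandbox and suppress in each tier.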
CT_ADDITIONAL_PRIVACY_CONCEPTS_PATH = os.path.join(
CDR_CLEANER_PATH, 'ct_additional_privacy_concepts.csv')
RT_ADDITIONAL_PRIVACY_CONCEPTS_PATH = os.path.join(
CDR_CLEANER_PATH, 'rt_additional_privacy_concepts.csv')
COPE_SUPPRESSION_PATH = os.path.join(CDR_CLEANER_PATH, 'cope_suppression')
RT_CT_COPE_SUPPRESSION_CSV_PATH = os.path.join(COPE_SUPPRESSION_PATH,
'rt_ct_cope_suppression.csv')
