diff --git a/data_steward/cdr_cleaner/cleaning_rules/convert_pre_post_coordinated_concepts.py b/data_steward/cdr_cleaner/cleaning_rules/convert_pre_post_coordinated_concepts.py index 238451cee..9808801ef 100644 --- a/data_steward/cdr_cleaner/cleaning_rules/convert_pre_post_coordinated_concepts.py +++ b/data_steward/cdr_cleaner/cleaning_rules/convert_pre_post_coordinated_concepts.py @@ -75,8 +75,8 @@ WHERE m1.new_observation_concept_id IS NOT NULL """) -SANDBOX_QUERY = JINJA_ENV.from_string(""" -CREATE OR REPLACE TABLE `{{project}}.{{sandbox_dataset}}.{{sandbox_table}}` AS +SANDBOX_OLD_ROWS_QUERY = JINJA_ENV.from_string(""" +CREATE OR REPLACE TABLE `{{project}}.{{sandbox_dataset}}.{{sandbox_table}}_deleted` AS WITH concepts_with_only_maps_to_value AS ( SELECT cr1.concept_id_1 @@ -97,16 +97,15 @@ AND o.value_source_concept_id NOT IN (SELECT concept_id_1 FROM concepts_with_only_maps_to_value) """) -DELETE_QUERY = JINJA_ENV.from_string(""" +DELETE_OLD_ROWS_QUERY = JINJA_ENV.from_string(""" DELETE FROM `{{project}}.{{dataset}}.observation` WHERE observation_id IN ( - SELECT observation_id FROM `{{project}}.{{sandbox_dataset}}.{{sandbox_table}}` + SELECT observation_id FROM `{{project}}.{{sandbox_dataset}}.{{sandbox_table}}_deleted` ) """) -INSERT_QUERY = JINJA_ENV.from_string(""" -INSERT INTO `{{project}}.{{dataset}}.observation` -({{observation_fields}}) +SANDBOX_NEW_ROWS_QUERY = JINJA_ENV.from_string(""" +CREATE OR REPLACE TABLE `{{project}}.{{sandbox_dataset}}.{{sandbox_table}}_inserted` AS SELECT -- ROW_NUMBER() here can be 1 - 4. So, the newly generated IDs will be -- -- in the range of 100,000,000,000 - 499,999,999,999. -- @@ -136,13 +135,20 @@ o.value_source_concept_id, o.value_source_value, o.questionnaire_response_id -FROM `{{project}}.{{sandbox_dataset}}.{{sandbox_table}}` o +FROM `{{project}}.{{sandbox_dataset}}.{{sandbox_table}}_deleted` o JOIN `{{project}}.{{dataset}}.concept` c ON o.value_source_value = c.concept_code JOIN `{{project}}.{{sandbox_dataset}}.{{sandbox_table}}_mapping` m ON c.concept_id = m.concept_id """) +INSERT_NEW_ROWS_QUERY = JINJA_ENV.from_string(""" +INSERT INTO `{{project}}.{{dataset}}.observation` +({{observation_fields}}) +SELECT * +FROM `{{project}}.{{sandbox_dataset}}.{{sandbox_table}}_inserted` +""") + INSERT_MAPPING_QUERY = JINJA_ENV.from_string(""" INSERT INTO `{{project}}.{{dataset}}._mapping_observation` (observation_id, src_id) @@ -156,7 +162,7 @@ m.new_value_as_concept_id ) * 100000000000 + o.observation_id AS observation_id, om.src_id -FROM `{{project}}.{{sandbox_dataset}}.{{sandbox_table}}` o +FROM `{{project}}.{{sandbox_dataset}}.{{sandbox_table}}_deleted` o JOIN `{{project}}.{{dataset}}.concept` c ON o.value_source_value = c.concept_code JOIN `{{project}}.{{sandbox_dataset}}.{{sandbox_table}}_mapping` m @@ -169,7 +175,7 @@ DELETE FROM `{{project}}.{{dataset}}._mapping_observation` WHERE observation_id IN ( SELECT observation_id - FROM `{{project}}.{{sandbox_dataset}}.{{sandbox_table}}` + FROM `{{project}}.{{sandbox_dataset}}.{{sandbox_table}}_deleted` ) """) @@ -222,27 +228,34 @@ def get_query_specs(self, *args, **keyword_args) -> query_spec_list: 100,000,000,000 * ((n)th record from the mapping table for the value_source_concept_id) + original observation_id. "n" can be 1 - 4. So, the newly generated IDs will be in the range of 100,000,000,000 - 499,999,999,999. """ - sandbox_query_dict = { + sandbox_old_rows_dict = { cdr_consts.QUERY: - SANDBOX_QUERY.render( + SANDBOX_OLD_ROWS_QUERY.render( project=self.project_id, sandbox_dataset=self.sandbox_dataset_id, sandbox_table=self.sandbox_table_for(OBSERVATION), dataset=self.dataset_id) } - delete_query_dict = { + delete_old_rows_dict = { cdr_consts.QUERY: - DELETE_QUERY.render( + DELETE_OLD_ROWS_QUERY.render( project=self.project_id, sandbox_dataset=self.sandbox_dataset_id, sandbox_table=self.sandbox_table_for(OBSERVATION), dataset=self.dataset_id) } - - insert_query_dict = { + sandbox_new_rows_dict = { cdr_consts.QUERY: - INSERT_QUERY.render( + SANDBOX_NEW_ROWS_QUERY.render( + project=self.project_id, + sandbox_dataset=self.sandbox_dataset_id, + sandbox_table=self.sandbox_table_for(OBSERVATION), + dataset=self.dataset_id) + } + insert_new_rows_query_dict = { + cdr_consts.QUERY: + INSERT_NEW_ROWS_QUERY.render( project=self.project_id, sandbox_dataset=self.sandbox_dataset_id, sandbox_table=self.sandbox_table_for(OBSERVATION), @@ -270,8 +283,9 @@ def get_query_specs(self, *args, **keyword_args) -> query_spec_list: } return [ - sandbox_query_dict, insert_query_dict, insert_mapping_query_dict, - delete_query_dict, delete_mapping_query_dict + sandbox_old_rows_dict, sandbox_new_rows_dict, + insert_new_rows_query_dict, insert_mapping_query_dict, + delete_old_rows_dict, delete_mapping_query_dict ] def setup_rule(self, client, *args, **keyword_args): @@ -303,10 +317,10 @@ def validate_rule(self, client): raise NotImplementedError("Please fix me.") def get_sandbox_tablenames(self): - return [ - self.sandbox_table_for(affected_table) - for affected_table in self._affected_tables - ] + old_data_sandbox = f'{self.sandbox_table_for(OBSERVATION)}_deleted' + new_data_sandbox = f'{self.sandbox_table_for(OBSERVATION)}_inserted' + mapping_sandbox = f'{self.sandbox_table_for(OBSERVATION)}_mapping_observation' + return [old_data_sandbox, new_data_sandbox, mapping_sandbox] if __name__ == '__main__': diff --git a/tests/integration_tests/data_steward/cdr_cleaner/cleaning_rules/convert_pre_post_coordinated_concepts_test.py b/tests/integration_tests/data_steward/cdr_cleaner/cleaning_rules/convert_pre_post_coordinated_concepts_test.py index c1b10fd34..da79f2be4 100644 --- a/tests/integration_tests/data_steward/cdr_cleaner/cleaning_rules/convert_pre_post_coordinated_concepts_test.py +++ b/tests/integration_tests/data_steward/cdr_cleaner/cleaning_rules/convert_pre_post_coordinated_concepts_test.py @@ -75,8 +75,10 @@ def setUpClass(cls): # defined in the resource_files folder. It has the columns defined in `create_rdr_snapshot.py` instead. cls.fq_mapping_table_name = f'{cls.project_id}.{cls.dataset_id}.{MAPPING_PREFIX}{OBSERVATION}' + sandbox_tables = cls.rule_instance.get_sandbox_tablenames() cls.fq_sandbox_table_names = [ - f'{cls.project_id}.{cls.sandbox_id}.{cls.rule_instance.sandbox_table_for(OBSERVATION)}' + f'{cls.project_id}.{cls.sandbox_id}.{table}' + for table in sandbox_tables ] cls.up_class = super().setUpClass() @@ -160,4 +162,4 @@ def test_convert_pre_post_coordinated_concepts(self): def tearDown(self): self.client.delete_table(self.fq_mapping_table_name, not_found_ok=True) - super().tearDown() \ No newline at end of file + super().tearDown()