Skip to content

Commit

Permalink
[DC-3598] Update convert_pre_post_coordinated_concepts to create an INSERT sandbox as well as the DELETE sandbox (#1823)
Browse files Browse the repository at this point in the history

* [DC-3598] Add sandbox creation

* [DC-3598] Update the test sandboxes

* [DC-3598] Update the query run order
  • Loading branch information
brendagutman authored Dec 12, 2023
1 parent 0111830 commit 40538cf
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@
WHERE m1.new_observation_concept_id IS NOT NULL
""")

SANDBOX_QUERY = JINJA_ENV.from_string("""
CREATE OR REPLACE TABLE `{{project}}.{{sandbox_dataset}}.{{sandbox_table}}` AS
SANDBOX_OLD_ROWS_QUERY = JINJA_ENV.from_string("""
CREATE OR REPLACE TABLE `{{project}}.{{sandbox_dataset}}.{{sandbox_table}}_deleted` AS
WITH concepts_with_only_maps_to_value AS (
SELECT cr1.concept_id_1
Expand All @@ -97,16 +97,15 @@
AND o.value_source_concept_id NOT IN (SELECT concept_id_1 FROM concepts_with_only_maps_to_value)
""")

DELETE_QUERY = JINJA_ENV.from_string("""
DELETE_OLD_ROWS_QUERY = JINJA_ENV.from_string("""
DELETE FROM `{{project}}.{{dataset}}.observation`
WHERE observation_id IN (
SELECT observation_id FROM `{{project}}.{{sandbox_dataset}}.{{sandbox_table}}`
SELECT observation_id FROM `{{project}}.{{sandbox_dataset}}.{{sandbox_table}}_deleted`
)
""")

INSERT_QUERY = JINJA_ENV.from_string("""
INSERT INTO `{{project}}.{{dataset}}.observation`
({{observation_fields}})
SANDBOX_NEW_ROWS_QUERY = JINJA_ENV.from_string("""
CREATE OR REPLACE TABLE `{{project}}.{{sandbox_dataset}}.{{sandbox_table}}_inserted` AS
SELECT
-- ROW_NUMBER() here can be 1 - 4. So, the newly generated IDs will be --
-- in the range of 100,000,000,000 - 499,999,999,999. --
Expand Down Expand Up @@ -136,13 +135,20 @@
o.value_source_concept_id,
o.value_source_value,
o.questionnaire_response_id
FROM `{{project}}.{{sandbox_dataset}}.{{sandbox_table}}` o
FROM `{{project}}.{{sandbox_dataset}}.{{sandbox_table}}_deleted` o
JOIN `{{project}}.{{dataset}}.concept` c
ON o.value_source_value = c.concept_code
JOIN `{{project}}.{{sandbox_dataset}}.{{sandbox_table}}_mapping` m
ON c.concept_id = m.concept_id
""")

# Template that appends every row of the `<sandbox_table>_inserted` sandbox
# table to the dataset's `observation` table, with the target column list
# supplied through `observation_fields`.
# NOTE(review): `SELECT *` maps columns by position, so this assumes the
# `_inserted` table's column order matches `observation_fields` — confirm.
INSERT_NEW_ROWS_QUERY = JINJA_ENV.from_string("""
INSERT INTO `{{project}}.{{dataset}}.observation`
({{observation_fields}})
SELECT *
FROM `{{project}}.{{sandbox_dataset}}.{{sandbox_table}}_inserted`
""")

INSERT_MAPPING_QUERY = JINJA_ENV.from_string("""
INSERT INTO `{{project}}.{{dataset}}._mapping_observation`
(observation_id, src_id)
Expand All @@ -156,7 +162,7 @@
m.new_value_as_concept_id
) * 100000000000 + o.observation_id AS observation_id,
om.src_id
FROM `{{project}}.{{sandbox_dataset}}.{{sandbox_table}}` o
FROM `{{project}}.{{sandbox_dataset}}.{{sandbox_table}}_deleted` o
JOIN `{{project}}.{{dataset}}.concept` c
ON o.value_source_value = c.concept_code
JOIN `{{project}}.{{sandbox_dataset}}.{{sandbox_table}}_mapping` m
Expand All @@ -169,7 +175,7 @@
DELETE FROM `{{project}}.{{dataset}}._mapping_observation`
WHERE observation_id IN (
SELECT observation_id
FROM `{{project}}.{{sandbox_dataset}}.{{sandbox_table}}`
FROM `{{project}}.{{sandbox_dataset}}.{{sandbox_table}}_deleted`
)
""")

Expand Down Expand Up @@ -222,27 +228,34 @@ def get_query_specs(self, *args, **keyword_args) -> query_spec_list:
100,000,000,000 * ((n)th record from the mapping table for the value_source_concept_id) + original observation_id.
"n" can be 1 - 4. So, the newly generated IDs will be in the range of 100,000,000,000 - 499,999,999,999.
"""
sandbox_query_dict = {
sandbox_old_rows_dict = {
cdr_consts.QUERY:
SANDBOX_QUERY.render(
SANDBOX_OLD_ROWS_QUERY.render(
project=self.project_id,
sandbox_dataset=self.sandbox_dataset_id,
sandbox_table=self.sandbox_table_for(OBSERVATION),
dataset=self.dataset_id)
}

delete_query_dict = {
delete_old_rows_dict = {
cdr_consts.QUERY:
DELETE_QUERY.render(
DELETE_OLD_ROWS_QUERY.render(
project=self.project_id,
sandbox_dataset=self.sandbox_dataset_id,
sandbox_table=self.sandbox_table_for(OBSERVATION),
dataset=self.dataset_id)
}

insert_query_dict = {
sandbox_new_rows_dict = {
cdr_consts.QUERY:
INSERT_QUERY.render(
SANDBOX_NEW_ROWS_QUERY.render(
project=self.project_id,
sandbox_dataset=self.sandbox_dataset_id,
sandbox_table=self.sandbox_table_for(OBSERVATION),
dataset=self.dataset_id)
}
insert_new_rows_query_dict = {
cdr_consts.QUERY:
INSERT_NEW_ROWS_QUERY.render(
project=self.project_id,
sandbox_dataset=self.sandbox_dataset_id,
sandbox_table=self.sandbox_table_for(OBSERVATION),
Expand Down Expand Up @@ -270,8 +283,9 @@ def get_query_specs(self, *args, **keyword_args) -> query_spec_list:
}

return [
sandbox_query_dict, insert_query_dict, insert_mapping_query_dict,
delete_query_dict, delete_mapping_query_dict
sandbox_old_rows_dict, sandbox_new_rows_dict,
insert_new_rows_query_dict, insert_mapping_query_dict,
delete_old_rows_dict, delete_mapping_query_dict
]

def setup_rule(self, client, *args, **keyword_args):
Expand Down Expand Up @@ -303,10 +317,10 @@ def validate_rule(self, client):
raise NotImplementedError("Please fix me.")

def get_sandbox_tablenames(self):
return [
self.sandbox_table_for(affected_table)
for affected_table in self._affected_tables
]
old_data_sandbox = f'{self.sandbox_table_for(OBSERVATION)}_deleted'
new_data_sandbox = f'{self.sandbox_table_for(OBSERVATION)}_inserted'
mapping_sandbox = f'{self.sandbox_table_for(OBSERVATION)}_mapping_observation'
return [old_data_sandbox, new_data_sandbox, mapping_sandbox]


if __name__ == '__main__':
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,10 @@ def setUpClass(cls):
# defined in the resource_files folder. It has the columns defined in `create_rdr_snapshot.py` instead.
cls.fq_mapping_table_name = f'{cls.project_id}.{cls.dataset_id}.{MAPPING_PREFIX}{OBSERVATION}'

sandbox_tables = cls.rule_instance.get_sandbox_tablenames()
cls.fq_sandbox_table_names = [
f'{cls.project_id}.{cls.sandbox_id}.{cls.rule_instance.sandbox_table_for(OBSERVATION)}'
f'{cls.project_id}.{cls.sandbox_id}.{table}'
for table in sandbox_tables
]

cls.up_class = super().setUpClass()
Expand Down Expand Up @@ -160,4 +162,4 @@ def test_convert_pre_post_coordinated_concepts(self):

def tearDown(self):
self.client.delete_table(self.fq_mapping_table_name, not_found_ok=True)
super().tearDown()
super().tearDown()

0 comments on commit 40538cf

Please sign in to comment.