diff --git a/README.md b/README.md
index 19aae79..e3998bd 100644
--- a/README.md
+++ b/README.md
@@ -154,6 +154,12 @@ sample [target-config.json](/sample_config/target-config-exchange-rates-api.json
 * `incremental`: **Upserting** new rows into the table, using the **primary key** given by the tap connector (if it finds an old row with same key, updates it. Otherwise it inserts the new row)
   - WARNING: We do not recommend using `incremental` option (which uses `MERGE` SQL statement). It might result in loss of production data, because historical records get updated. Instead, we recommend using the `append` replication method, which will preserve historical data.
+- You can also set the optional parameter `deduplication_property` to deduplicate records. Based on this attribute (typically a timestamp), a deduplication query runs against the staged data right before it is merged into BigQuery and removes all duplicates.
+  - In addition, `deduplication_order` lets you choose which duplicate is kept:
+    * **DESC**: Default option. Keeps the row with the largest `deduplication_property` value (ties are broken arbitrarily).
+    * **ASC**: Keeps the row with the smallest `deduplication_property` value (ties are broken arbitrarily).
+
+
 Sample **target-config.json** file:
diff --git a/target_bigquery/__init__.py b/target_bigquery/__init__.py
index 1988bef..540874a 100644
--- a/target_bigquery/__init__.py
+++ b/target_bigquery/__init__.py
@@ -75,6 +75,10 @@ def main():
     validate_records = config.get("validate_records", True)
     add_metadata_columns = config.get("add_metadata_columns", True)
 
+    # deduplication arguments
+    deduplication_property = config.get("deduplication_property", "")
+    deduplication_order = config.get("deduplication_order", "DESC")
+
     # we can pass merge state option via CLI param
     merge_state_messages_cli = flags.merge_state_messages
 
@@ -126,7 +130,9 @@ def main():
         table_suffix=table_suffix,
         add_metadata_columns=add_metadata_columns,
         table_configs=table_configs,
-        max_cache=max_cache
+        max_cache=max_cache,
+        deduplication_property=deduplication_property,
+        deduplication_order=deduplication_order
     )
 
     # write a state file
diff --git a/target_bigquery/processhandler.py b/target_bigquery/processhandler.py
index 16f2e96..51e8a52 100644
--- a/target_bigquery/processhandler.py
+++ b/target_bigquery/processhandler.py
@@ -34,6 +34,11 @@ def __init__(self, logger, **kwargs):
         # LoadJobProcessHandler kwargs
         self.truncate = kwargs.get("truncate", False)
         self.incremental = kwargs.get("incremental", False)
+
+        # deduplication kwargs
+        self.deduplication_property = kwargs.get("deduplication_property", False)
+        self.deduplication_order = kwargs.get("deduplication_order", "DESC")
+
         self.add_metadata_columns = kwargs.get("add_metadata_columns", True)
         self.validate_records = kwargs.get("validate_records", True)
         self.table_configs = kwargs.get("table_configs", {}) or {}
@@ -226,6 +231,27 @@ def primary_key_condition(self, stream):
         return " and ".join(keys)
     #TODO: test it with multiple ids (an array of ids, if there are multiple key_properties in JSON schema)
     #TODO: test it with dupe ids in the data
+    def primary_keys(self, stream):
+        keys = [f"temp.{k}" for k in self.key_properties[stream]]
+        if len(keys) < 1:
+            raise Exception("No primary keys specified from the tap and Incremental option selected")
+        return " , ".join(keys)
+
+    def partition_primary_keys(self, stream):
+        keys = [f"s.{k}" for k in self.key_properties[stream]]
+        if len(keys) < 1:
+            raise Exception("No primary partition keys specified from the tap and Incremental option selected")
+        return " , ".join(keys)
+
+    def duplicate_condition(self):
+        keys = f"and t.{self.deduplication_property}=s.{self.deduplication_property}"
+        return keys
+
+    def first_primary_key(self, stream):
+        keys = self.key_properties[stream]
+        if len(keys) < 1:
+            raise Exception("No first primary key specified from the tap and Incremental option selected")
+        return "t." + keys[0]
 
     def _do_temp_table_based_load(self, rows):
         assert isinstance(rows, dict)
@@ -269,23 +295,58 @@
                 self.logger.warning(f"INCREMENTAL replication method (MERGE SQL statement) is not recommended. It might result in loss of production data, because historical records get updated during the sync operation. Instead, we recommend using the APPEND replication method, which will preserve historical data.")
                 table_id = f"{self.project_id}.{self.dataset.dataset_id}.{self.tables[stream]}"
                 try:
-                    self.client.get_table(table_id)
-                    column_names = [x.name for x in self.bq_schemas[stream]]
-
-                    query ="""MERGE `{table}` t
-                    USING `{temp_table}` s
-                    ON {primary_key_condition}
-                    WHEN MATCHED THEN
-                        UPDATE SET {set_values}
-                    WHEN NOT MATCHED THEN
-                        INSERT ({new_cols}) VALUES ({cols})
-                    """.format(table=table_id,
-                               temp_table=f"{self.project_id}.{self.dataset.dataset_id}.{tmp_table_name}",
-                               primary_key_condition=self.primary_key_condition(stream),
-                               set_values=', '.join(f'{c}=s.{c}' for c in column_names),
-                               new_cols=', '.join(column_names),
-                               cols=', '.join(f's.{c}' for c in column_names))
+                    self.client.get_table(table_id)
+                    column_names = [f"`{x.name}`" for x in self.bq_schemas[stream]]
+
+                    if self.deduplication_property:
+
+                        copy_config = CopyJobConfig()
+                        copy_config.write_disposition = WriteDisposition.WRITE_TRUNCATE
+
+                        query_duplicates = """CREATE OR REPLACE TABLE `{temp_table}` AS
+                        with last_versions as (
+                            select {primary_keys} from `{temp_table}` temp
+                        ), dedup as(
+                            select s.*,
+                                ROW_NUMBER() OVER (PARTITION BY {partition_keys} ORDER BY {partition_deduplication_property} {deduplication_order}) AS finish_rank
+                            from `{temp_table}` s
+                            left join last_versions t on {primary_key_condition}
+                            where {first_primary_key} is not null),
+                        final_table as(
+                            SELECT * FROM dedup WHERE finish_rank = 1)
+                        SELECT * EXCEPT (finish_rank) FROM final_table ;
+                        """.format(
+                            temp_table=f"{self.project_id}.{self.dataset.dataset_id}.{tmp_table_name}",
+                            primary_keys=str(self.primary_keys(stream)),
+                            partition_keys=str(self.partition_primary_keys(stream)),
+                            temp_deduplication_property=str("temp." + str(self.deduplication_property)),
+                            partition_deduplication_property=str("s." + str(self.deduplication_property)),
+                            deduplication_property=str(self.deduplication_property),
+                            primary_key_condition=self.primary_key_condition(stream),
+                            duplicate_condition=self.duplicate_condition(),
+                            deduplication_order=str(self.deduplication_order),
+                            first_primary_key=self.first_primary_key(stream))
+
+                        job_config = QueryJobConfig()
+                        query_job = self.client.query(query_duplicates, job_config=job_config)
+                        query_job.result()
+
+                        self.logger.info(f"Removed duplicates by attribute {self.deduplication_property} in {self.deduplication_order} order.")
+
+                    query = """MERGE `{table}` t
+                    USING `{temp_table}` s
+                    ON {primary_key_condition}
+                    WHEN MATCHED THEN
+                        UPDATE SET {set_values}
+                    WHEN NOT MATCHED THEN
+                        INSERT ({new_cols}) VALUES ({cols})
+                    """.format(table=table_id,
+                               temp_table=f"{self.project_id}.{self.dataset.dataset_id}.{tmp_table_name}",
+                               primary_key_condition=self.primary_key_condition(stream),
+                               set_values=', '.join(f'{c}=s.{c}' for c in column_names),
+                               new_cols=', '.join(column_names),
+                               cols=', '.join(f's.{c}' for c in column_names))
                     job_config = QueryJobConfig()
                     query_job = self.client.query(query, job_config=job_config)
                     query_job.result()
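
For illustration, a `target-config.json` using the new options might look like the sketch below. Only the keys shown are taken from this diff's `config.get(...)` calls; the values, including the `updated_at` column name, are placeholders, and the usual connection settings (project, dataset, credentials) are omitted.

```json
{
  "validate_records": true,
  "add_metadata_columns": true,
  "deduplication_property": "updated_at",
  "deduplication_order": "DESC"
}
```

With `deduplication_order` left at its default of `DESC`, the row with the largest `updated_at` per primary key survives the pre-merge deduplication query; setting it to `ASC` keeps the smallest instead.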