Add deduplication option #38

Open · wants to merge 3 commits into master
6 changes: 6 additions & 0 deletions README.md
@@ -154,6 +154,12 @@ sample [target-config.json](/sample_config/target-config-exchange-rates-api.json
* `incremental`: **Upserting** new rows into the table, using the **primary key** given by the tap connector
(if it finds an existing row with the same key, it updates it; otherwise it inserts the new row)
- WARNING: We do not recommend using the `incremental` option (which uses a `MERGE` SQL statement). It might result in loss of production data, because historical records get updated. Instead, we recommend the `append` replication method, which preserves historical data.
- There is also an optional parameter, `deduplication_property`, for deduplicating records. Based on this attribute (typically a timestamp), a query runs on the temporary table before the data is merged into BigQuery, removing all duplicates per primary key.
- In addition, `deduplication_order` controls which of the duplicate rows is kept (see the sample snippet after this list):
* **DESC**: Default option. Keeps the row with the largest `deduplication_property` value (or an arbitrary row among those tied for the largest).
* **ASC**: Keeps the row with the smallest `deduplication_property` value (or an arbitrary row among those tied for the smallest).
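
For illustration, a minimal sketch of how the two deduplication options might look in a config file. Only `deduplication_property` and `deduplication_order` are the options documented above; the other keys and values shown here (`project_id`, `dataset_id`, `replication_method`, `updated_at`) are placeholders modeled on the sample config referenced below:

```json
{
  "project_id": "my-gcp-project",
  "dataset_id": "my_dataset",
  "replication_method": "incremental",
  "deduplication_property": "updated_at",
  "deduplication_order": "DESC"
}
```

With these values, rows in the temporary table that share the same primary key are ranked by `updated_at` in descending order, and only the top-ranked row takes part in the subsequent `MERGE`.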



Sample **target-config.json** file:

8 changes: 7 additions & 1 deletion target_bigquery/__init__.py
@@ -75,6 +75,10 @@ def main():
validate_records = config.get("validate_records", True)
add_metadata_columns = config.get("add_metadata_columns", True)

# deduplication arguments
deduplication_property = config.get("deduplication_property", "")
deduplication_order = config.get("deduplication_order", "DESC")

# we can pass merge state option via CLI param
merge_state_messages_cli = flags.merge_state_messages

@@ -126,7 +130,9 @@ def main():
table_suffix=table_suffix,
add_metadata_columns=add_metadata_columns,
table_configs=table_configs,
max_cache=max_cache
max_cache=max_cache,
deduplication_property=deduplication_property,
deduplication_order=deduplication_order
)

# write a state file
93 changes: 77 additions & 16 deletions target_bigquery/processhandler.py
@@ -34,6 +34,11 @@ def __init__(self, logger, **kwargs):
# LoadJobProcessHandler kwargs
self.truncate = kwargs.get("truncate", False)
self.incremental = kwargs.get("incremental", False)

# deduplication kwargs
self.deduplication_property = kwargs.get("deduplication_property", "")  # empty string disables deduplication
self.deduplication_order = kwargs.get("deduplication_order", "DESC")

self.add_metadata_columns = kwargs.get("add_metadata_columns", True)
self.validate_records = kwargs.get("validate_records", True)
self.table_configs = kwargs.get("table_configs", {}) or {}
@@ -226,6 +231,27 @@ def primary_key_condition(self, stream):
return " and ".join(keys)
#TODO: test it with multiple ids (an array of ids, if there are multiple key_properties in JSON schema)
#TODO: test it with dupe ids in the data
def primary_keys(self, stream):
# comma-separated primary key columns of the temp table, e.g. "temp.id, temp.name"
keys = [f"temp.{k}" for k in self.key_properties[stream]]
if len(keys) < 1:
raise Exception("No primary keys specified by the tap while the incremental option is selected")
return ", ".join(keys)

def partition_primary_keys(self, stream):
# primary key columns qualified with the source alias, used in the PARTITION BY clause, e.g. "s.id, s.name"
keys = [f"s.{k}" for k in self.key_properties[stream]]
if len(keys) < 1:
raise Exception("No primary partition keys specified by the tap while the incremental option is selected")
return ", ".join(keys)

def duplicate_condition(self):
# extra join condition on the deduplication property, e.g. "and t.updated_at=s.updated_at"
return f"and t.{self.deduplication_property}=s.{self.deduplication_property}"

def first_primary_key(self, stream):
# first primary key column qualified with the last_versions alias, e.g. "t.id"
if len(self.key_properties[stream]) < 1:
raise Exception("No first primary key specified by the tap while the incremental option is selected")
return "t." + self.key_properties[stream][0]

def _do_temp_table_based_load(self, rows):
assert isinstance(rows, dict)
@@ -269,23 +295,58 @@ def _do_temp_table_based_load(self, rows):
self.logger.warning(f"INCREMENTAL replication method (MERGE SQL statement) is not recommended. It might result in loss of production data, because historical records get updated during the sync operation. Instead, we recommend using the APPEND replication method, which will preserve historical data.")
table_id = f"{self.project_id}.{self.dataset.dataset_id}.{self.tables[stream]}"
try:
self.client.get_table(table_id)
column_names = [x.name for x in self.bq_schemas[stream]]

query ="""MERGE `{table}` t
USING `{temp_table}` s
ON {primary_key_condition}
WHEN MATCHED THEN
UPDATE SET {set_values}
WHEN NOT MATCHED THEN
INSERT ({new_cols}) VALUES ({cols})
""".format(table=table_id,
temp_table=f"{self.project_id}.{self.dataset.dataset_id}.{tmp_table_name}",
primary_key_condition=self.primary_key_condition(stream),
set_values=', '.join(f'{c}=s.{c}' for c in column_names),
new_cols=', '.join(column_names),
cols=', '.join(f's.{c}' for c in column_names))

self.client.get_table(table_id)
column_names = [f"`{x.name}`" for x in self.bq_schemas[stream]]

if self.deduplication_property:

copy_config = CopyJobConfig()
copy_config.write_disposition = WriteDisposition.WRITE_TRUNCATE

query_duplicates = """CREATE OR REPLACE TABLE `{temp_table}` AS
with last_versions as (
select {primary_keys} from `{temp_table}` temp
), dedup as(
select s.*,
ROW_NUMBER() OVER (PARTITION BY {partition_keys} ORDER BY {partition_deduplication_property} {deduplication_order}) AS finish_rank
from `{temp_table}` s
left join last_versions t on {primary_key_condition}
where {first_primary_key} is not null),
final_table as(
SELECT * FROM dedup WHERE finish_rank = 1)
SELECT * EXCEPT (finish_rank) FROM final_table ;
""".format(
temp_table=f"{self.project_id}.{self.dataset.dataset_id}.{tmp_table_name}",
primary_keys=str(self.primary_keys(stream)),
partition_keys=str(self.partition_primary_keys(stream)),
temp_deduplication_property=str("temp." + str(self.deduplication_property)),
partition_deduplication_property=str("s." + str(self.deduplication_property)),
deduplication_property=str(self.deduplication_property),
primary_key_condition=self.primary_key_condition(stream),
duplicate_condition=self.duplicate_condition(),
deduplication_order=str(self.deduplication_order),
first_primary_key=self.first_primary_key(stream))

job_config = QueryJobConfig()
query_job = self.client.query(query_duplicates, job_config=job_config)
query_job.result()

self.logger.info(f"Removed duplicates by attribute: {str(self.deduplication_property)} and {str(self.deduplication_order)} order.")

query = """MERGE `{table}` t
USING `{temp_table}` s
ON {primary_key_condition}
WHEN MATCHED THEN
UPDATE SET {set_values}
WHEN NOT MATCHED THEN
INSERT ({new_cols}) VALUES ({cols})
""".format(table=table_id,
temp_table=f"{self.project_id}.{self.dataset.dataset_id}.{tmp_table_name}",
primary_key_condition=self.primary_key_condition(stream),
set_values=', '.join(f'{c}=s.{c}' for c in column_names),
new_cols=', '.join(column_names),
cols=', '.join(f's.{c}' for c in column_names))
job_config = QueryJobConfig()
query_job = self.client.query(query, job_config=job_config)
query_job.result()