Skip to content

Commit

Permalink
patch: Escaping table identifiers for deduplication and overwrite cas…
Browse files Browse the repository at this point in the history
…es (#38)
  • Loading branch information
emishas authored May 8, 2023
1 parent c4cc54f commit e1b4ee1
Showing 1 changed file with 7 additions and 6 deletions.
13 changes: 7 additions & 6 deletions target_bigquery/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -500,9 +500,9 @@ def clean_up(self) -> None:
if self._is_dedupe_before_upsert_candidate():
# We can't use MERGE with a non-unique key, so we need to dedupe the temp table into
# a _SESSION scoped intermediate table.
tmp = f"{target.name}__tmp"
tmp = f"{self.merge_target.name}__tmp"
dedupe_query = (
f"SELECT * FROM {self.table} "
f"SELECT * FROM {self.table.get_escaped_name()} "
f"QUALIFY ROW_NUMBER() OVER (PARTITION BY {', '.join(self.key_properties)} "
f"ORDER BY COALESCE({', '.join(date_columns)}) DESC) = 1"
)
Expand Down Expand Up @@ -531,10 +531,11 @@ def clean_up(self) -> None:
# Do it in a transaction to avoid partial writes.
target = self.overwrite_target.as_table()
self.client.query(
f"DROP TABLE IF EXISTS {self.overwrite_target.get_escaped_name()}; "
f"CREATE TABLE {self.overwrite_target} LIKE {self.table.get_escaped_name()} AS "
f"SELECT * FROM {self.table.get_escaped_name()};"
f"DROP TABLE IF EXISTS {self.table.get_escaped_name()};"
f"DROP TABLE IF EXISTS {self.overwrite_target.get_escaped_name()}; CREATE TABLE"
f" {self.overwrite_target.get_escaped_name()} LIKE"
f" {self.table.get_escaped_name()} AS SELECT * FROM"
f" {self.table.get_escaped_name()};DROP TABLE IF EXISTS"
f" {self.table.get_escaped_name()};"
).result()
self.table = self.merge_target
self.merge_target = None
Expand Down

0 comments on commit e1b4ee1

Please sign in to comment.