Skip to content

Commit

Permalink
Allow kwargs to be passed through upsert_rows in db_sync
Browse files Browse the repository at this point in the history
This more accurately copies the behavior of copy_rows and allows
passing arbitrary kwargs to the target db copy method.
  • Loading branch information
austinweisgrau committed Oct 10, 2024
1 parent d310e1c commit 111acf9
Showing 1 changed file with 10 additions and 6 deletions.
16 changes: 10 additions & 6 deletions parsons/databases/db_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ def table_sync_incremental(
dest_max,
updated_at_column,
primary_key,
**kwargs,
)

logger.info("Upserted %s updated rows to %s.", rows_upserted, destination_table)
Expand Down Expand Up @@ -401,6 +402,7 @@ def upsert_rows(
], # Type hint is probably incomplete
updated_at_column: str,
primary_key: str,
**kwargs,
) -> int:
"""
Upsert rows from the source to the destination based on updated_at_column
Expand All @@ -416,6 +418,8 @@ def upsert_rows(
Column which tracks the update timestamp
primary_key:
Column which serves as unique primary key
**kwargs: args
Optional copy arguments for destination database.
`Returns:`
total_rows_written: int
"""
Expand Down Expand Up @@ -448,7 +452,7 @@ def upsert_rows(
if not len(rows) or len(buffer) >= self.write_chunk_size:
logger.debug("Copying %s rows to %s", len(buffer), destination_table_name)
if not self.dest_db.table_exists(destination_table_name):
self.dest_db.copy(buffer, destination_table_name, if_exists="append")
self.dest_db.copy(buffer, destination_table_name, if_exists="append", **kwargs)
else:
# Load buffer to temp table, upsert from temp table
temp_table_name = (
Expand All @@ -460,12 +464,12 @@ def upsert_rows(
# If certain columns in buffer are null, the types may not line up
# in destination db. We need a way to deal with this, but there is
# no unified API for handling schema
if isinstance(self.dest_db, GoogleBigQuery):
kwargs = {
"schema": self.dest_db.client.get_table(destination_table_name).schema
}
else:
if not kwargs:
kwargs = {}
if "schema" not in kwargs and isinstance(self.dest_db, GoogleBigQuery):
kwargs["schema"] = self.dest_db.client.get_table(
destination_table_name
).schema

self.dest_db.copy(buffer, temp_table_name, if_exists="drop", **kwargs)
try:
Expand Down

0 comments on commit 111acf9

Please sign in to comment.