Skip to content

Commit

Permalink
depedency cascade (#833)
Browse files Browse the repository at this point in the history
* cascade

* black

---------

Co-authored-by: Elyse Weiss <[email protected]>
  • Loading branch information
elyse-weiss and Elyse Weiss authored Jun 6, 2023
1 parent 52ec230 commit 2d057fa
Showing 1 changed file with 3 additions and 7 deletions.
10 changes: 3 additions & 7 deletions parsons/databases/redshift/redshift.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,6 @@ def query_with_connection(self, sql, connection, parameters=None, commit=True):
# rows in the correct order

with self.cursor(connection) as cursor:

if "credentials" not in sql:
logger.debug(f"SQL Query: {sql}")
cursor.execute(sql, parameters)
Expand All @@ -235,7 +234,6 @@ def query_with_connection(self, sql, connection, parameters=None, commit=True):
return None

else:

# Fetch the data in batches, and "pickle" the rows to a temp file.
# (We pickle rather than writing to, say, a CSV, so that we maintain
# all the type information for each field.)
Expand Down Expand Up @@ -397,7 +395,6 @@ def copy_s3(
"""

with self.connection() as connection:

if self._create_table_precheck(connection, table_name, if_exists):
if template_table:
sql = f"CREATE TABLE {table_name} (LIKE {template_table})"
Expand Down Expand Up @@ -620,7 +617,6 @@ def copy(
cols = None

with self.connection() as connection:

# Check to see if the table exists. If it does not or if_exists = drop, then
# create the new table.
if self._create_table_precheck(connection, table_name, if_exists):
Expand Down Expand Up @@ -859,7 +855,6 @@ def generate_manifest(
# Generate manifest file
manifest = {"entries": []}
for bucket in buckets:

# Retrieve list of files in bucket
key_list = s3.list_keys(bucket, prefix=prefix)
for key in key_list:
Expand Down Expand Up @@ -982,7 +977,6 @@ def upsert(
raise ValueError("Primary key column contains duplicate values.")

with self.connection() as connection:

try:
# Copy to a staging table
logger.info(f"Building staging table: {staging_tbl}")
Expand Down Expand Up @@ -1087,7 +1081,9 @@ def drop_dependencies_for_cols(self, schema, table, cols):
tbl = self.query_with_connection(sql_depend, connection)
dropped_views = [row["table_name"] for row in tbl]
if dropped_views:
sql_drop = "\n".join([f"drop view {view};" for view in dropped_views])
sql_drop = "\n".join(
[f"drop view {view} CASCADE;" for view in dropped_views]
)
tbl = self.query_with_connection(sql_drop, connection)
logger.info(f"Dropped the following views: {dropped_views}")

Expand Down

0 comments on commit 2d057fa

Please sign in to comment.