Skip to content

Commit

Permalink
Merge branch 'main' into bigquery_fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
cmdelrio authored Jan 30, 2024
2 parents b59e64b + 9fe215f commit 748124a
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 8 deletions.
28 changes: 22 additions & 6 deletions parsons/action_kit/action_kit.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import requests
import time
import math

from parsons.etl.table import Table
from parsons.utilities import check_env
Expand Down Expand Up @@ -1307,15 +1308,30 @@ def collect_upload_errors(self, result_array):
for res in result_array:
upload_id = res.get("id")
if upload_id:
# Pend until upload is complete
while True:
upload = self._base_get(endpoint="upload", entity_id=upload_id)
if not upload or upload.get("status") != "new":
if upload.get("is_completed"):
break
else:
time.sleep(1)
error_data = self._base_get(
endpoint="uploaderror", params={"upload": upload_id}
)
logger.debug(f"error collect result: {error_data}")
errors.extend(error_data.get("objects") or [])

# ActionKit limits length of error list returned
# Iterate until all errors are gathered
error_count = upload.get("has_errors")
limit = 20

error_pages = math.ceil(error_count / limit)
for page in range(0, error_pages):
error_data = self._base_get(
endpoint="uploaderror",
params={
"upload": upload_id,
"_limit": limit,
"_offset": page * limit,
},
)
logger.debug(f"error collect result: {error_data}")
errors.extend(error_data.get("objects", []))

return errors
11 changes: 11 additions & 0 deletions parsons/databases/redshift/rs_table_utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,17 @@ def get_columns_list(self, schema, table_name):
`Returns:`
A list of column names.
"""
schema = (
f'"{schema}"'
if not (schema.startswith('"') and schema.endswith('"'))
else schema
)

table_name = (
f'"{table_name}"'
if not (table_name.startswith('"') and table_name.endswith('"'))
else table_name
)

first_row = self.query(f"select * from {schema}.{table_name} limit 1")

Expand Down
27 changes: 25 additions & 2 deletions test/test_action_kit.py
Original file line number Diff line number Diff line change
Expand Up @@ -714,8 +714,31 @@ def test_table_split(self):
)

def test_collect_errors(self):
resp_mock = mock.MagicMock()
type(resp_mock.get()).json = lambda x: {"is_completed": True, "has_errors": 25}
self.actionkit.conn = resp_mock

self.actionkit.collect_upload_errors([{"id": "12345"}])
self.actionkit.conn.get.assert_called_with(

self.actionkit.conn.get.assert_any_call(
"https://domain.actionkit.com/rest/v1/upload/12345/", params=None
)

# With 25 errors, we will view two pages
self.actionkit.conn.get.assert_any_call(
"https://domain.actionkit.com/rest/v1/uploaderror/",
params={"upload": "12345"},
params={"upload": "12345", "_limit": 20, "_offset": 0},
)
self.actionkit.conn.get.assert_any_call(
"https://domain.actionkit.com/rest/v1/uploaderror/",
params={"upload": "12345", "_limit": 20, "_offset": 20},
)

# Assert that we don't attempt to view a third page
assert (
mock.call(
"https://domain.actionkit.com/rest/v1/uploaderror/",
params={"upload": "12345", "_limit": 20, "_offset": 40},
)
not in self.actionkit.conn.get.call_args_list
), "Called with invalid arguments."

0 comments on commit 748124a

Please sign in to comment.