Skip to content

Commit

Permalink
Help user on Redshift debugging, remove red herring error message on …
Browse files Browse the repository at this point in the history
…pure ASCII files (#45)
  • Loading branch information
vinceatbluelabs authored Apr 20, 2020
1 parent 9aecda7 commit 421b3d0
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 3 deletions.
2 changes: 1 addition & 1 deletion metrics/mypy_high_water_mark
Original file line number Diff line number Diff line change
@@ -1 +1 @@
90.1100
90.1200
2 changes: 1 addition & 1 deletion records_mover/db/bigquery/bigquery_db_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class BigQueryDBDriver(DBDriver):
def __init__(self,
db: Union[sqlalchemy.engine.Connection, sqlalchemy.engine.Engine],
url_resolver: UrlResolver,
**kwargs) -> None:
**kwargs: object) -> None:
super().__init__(db)
self._bigquery_loader = BigQueryLoader(db=self.db, url_resolver=url_resolver)

Expand Down
14 changes: 13 additions & 1 deletion records_mover/db/redshift/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,19 @@ def load(self,
empty_as_null=True,
**redshift_options) # type: ignore
logger.info(f"Starting Redshift COPY from {directory}...")
self.db.execute(copy)
redshift_pid: int = self.db.execute("SELECT pg_backend_pid();").scalar()
try:
self.db.execute(copy)
except sqlalchemy.exc.InternalError:
# Upon a load error, we receive:
#
# sqlalchemy.exc.InternalError:
# (psycopg2.errors.InternalError_) Load into table 'tablename'
# failed. Check 'stl_load_errors' system table for details.
logger.warning("Caught load error - "
"for details, run this query: "
f"SELECT * FROM stl_load_errors WHERE session={redshift_pid}")
raise
logger.info("Redshift COPY complete.")
return None # redshift doesn't give reliable info on load results

Expand Down
2 changes: 2 additions & 0 deletions records_mover/records/hints.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ def cant_handle_hint(fail_if_cant_handle_hint: bool, hint_name: str, hints: Reco
# But let's be ready if they change their minds:
'UTF-8': 'UTF8',
'Windows-1252': 'CP1252',
# even if the only data it saw was in ASCII, let's be ready to see more
'ascii': 'UTF8',
}

hint_compression_from_pandas = {
Expand Down
42 changes: 42 additions & 0 deletions tests/unit/db/redshift/test_loader.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import unittest
import sqlalchemy
from records_mover.db.redshift.loader import RedshiftLoader
from records_mover.records.records_format import DelimitedRecordsFormat
from mock import patch, Mock, MagicMock
Expand Down Expand Up @@ -105,3 +106,44 @@ def test_load_non_s3(self,
empty_as_null=True,
abc=123)
self.mock_db.execute.assert_called_with(mock_copy)

@patch('records_mover.db.redshift.loader.CopyCommand')
@patch('records_mover.db.redshift.loader.complain_on_unhandled_hints')
@patch('records_mover.db.redshift.loader.redshift_copy_options')
@patch('records_mover.db.redshift.loader.Table')
def test_load_load_error(self,
                         mock_Table,
                         mock_redshift_copy_options,
                         mock_complain_on_unhandled_hints,
                         mock_CopyCommand):
    """load() must let an InternalError raised by the COPY statement propagate."""
    # Load plan carrying a delimited records format with no hints set.
    records_format = Mock(name='records_format', spec=DelimitedRecordsFormat)
    records_format.hints = {}
    load_plan = Mock(name='load_plan')
    load_plan.records_format = records_format

    # The source directory is not on S3; copy_to() hands back one that is.
    source_directory = Mock(name='directory')
    source_directory.scheme = 'mumble'
    staged_directory = source_directory.copy_to.return_value
    staged_directory.scheme = 's3'
    staged_directory.loc.url = 's3://foo/bar/baz/'

    mock_redshift_copy_options.return_value = {'abc': 123}
    copy_command = mock_CopyCommand.return_value

    def fake_execute(statement):
        # The backend PID lookup succeeds and yields a fixed PID.
        if statement == 'SELECT pg_backend_pid();':
            pid_result = Mock(name='result_proxy')
            pid_result.scalar.return_value = 123
            return pid_result
        # The COPY command itself blows up the way a failed load does.
        if statement == copy_command:
            raise sqlalchemy.exc.InternalError(statement, {}, '')
        # Anything else is unexpected in this test.
        raise NotImplementedError(statement)

    self.mock_db.execute.side_effect = fake_execute

    self.assertRaises(sqlalchemy.exc.InternalError,
                      self.redshift_loader.load,
                      schema=Mock(name='schema'),
                      table=Mock(name='table'),
                      load_plan=load_plan,
                      directory=source_directory)

0 comments on commit 421b3d0

Please sign in to comment.