Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Load via INSERT on Redshift when scratch bucket not available #114

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
a66f4db
Load via INSERT on Redshift when scratch bucket not available
vinceatbluelabs Oct 27, 2020
d20d77f
Add TODO
vinceatbluelabs Oct 27, 2020
e4bd0ea
Retire SupportsMoveFromTempLocAfterFillingIt from DataUrlTarget
vinceatbluelabs Oct 27, 2020
f53587a
Merge remote-tracking branch 'origin/retire_protocol' into do_slow_re…
vinceatbluelabs Oct 27, 2020
ef0b8c1
Add comment
vinceatbluelabs Oct 27, 2020
643ee51
Implement for Vertica
vinceatbluelabs Oct 27, 2020
2ec1930
TODONE
vinceatbluelabs Oct 27, 2020
97dc95c
Fix refactored logic
vinceatbluelabs Oct 27, 2020
575a3cd
Revert change
vinceatbluelabs Oct 27, 2020
3314b64
Refactor
vinceatbluelabs Oct 27, 2020
ce9371e
Clean up and drop TODOs
vinceatbluelabs Oct 27, 2020
66a5f2f
Fix tests
vinceatbluelabs Oct 28, 2020
bdd6099
Fix unused import
vinceatbluelabs Oct 28, 2020
7b239cb
Fix flake8 issues
vinceatbluelabs Oct 28, 2020
dc7eb21
Fix coverage slip
vinceatbluelabs Oct 28, 2020
ce539ec
Fix missing special case revealed by Redshift testing
vinceatbluelabs Oct 28, 2020
f45011a
Unratchet a bit
vinceatbluelabs Oct 29, 2020
ad67b78
Revert chars/bytes fix--8 should be valid in UTF-8 for simple times
vinceatbluelabs Oct 29, 2020
4caebd9
Reratchet tests
vinceatbluelabs Oct 29, 2020
9ca1a7a
Clarify semantics of can_load_direct() and fix incorrect definition
vinceatbluelabs Oct 29, 2020
b03f064
Fix test
vinceatbluelabs Oct 29, 2020
504b39b
Clarify comment
vinceatbluelabs Oct 29, 2020
b58f999
Rename method to be more explicit
vinceatbluelabs Oct 29, 2020
b6fff82
Drop commented code
vinceatbluelabs Oct 29, 2020
d9ea8a1
Fix quality issue
vinceatbluelabs Oct 29, 2020
4edebe3
Shorten method names and make consistent
vinceatbluelabs Oct 29, 2020
82b940e
Fix function names in tests
vinceatbluelabs Oct 29, 2020
f2f1f0c
Clarify documentation
vinceatbluelabs Oct 29, 2020
e5a49ea
Merge remote-tracking branch 'origin/retire_protocol' into clarify_mo…
vinceatbluelabs Oct 29, 2020
44e7e68
Merge remote-tracking branch 'origin/clarify_move_algorithm_predicate…
vinceatbluelabs Nov 5, 2020
aa05ef7
Merge remote-tracking branch 'origin/master' into do_slow_redshift_lo…
vinceatbluelabs Nov 5, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion metrics/mypy_high_water_mark
Original file line number Diff line number Diff line change
@@ -1 +1 @@
92.3400
92.3600
3 changes: 3 additions & 0 deletions records_mover/db/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ def temporary_loadable_directory_loc(self) -> Iterator[BaseDirectoryUrl]:
with TemporaryDirectory(prefix='temporary_loadable_directory_loc') as dirname:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Loader is an abstract class made concrete by each of the database types which can load out of a records directory.

yield FilesystemDirectoryUrl(dirname)

def has_temporary_loadable_directory_loc(self) -> bool:
return True

@abstractmethod
def known_supported_records_formats_for_load(self) -> List[BaseRecordsFormat]:
"""Candidates to look through when negotiating a common records format
Expand Down
6 changes: 6 additions & 0 deletions records_mover/db/redshift/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@ class RedshiftLoader(LoaderFromRecordsDirectory):
def __init__(self,
db: Union[sqlalchemy.engine.Engine, sqlalchemy.engine.Connection],
meta: sqlalchemy.MetaData,
s3_temp_base_loc: Optional[BaseDirectoryUrl],
# TODO: Is this silly or?
vinceatbluelabs marked this conversation as resolved.
Show resolved Hide resolved
temporary_s3_directory_loc: Callable[[], ContextManager[BaseDirectoryUrl]])\
-> None:
self.db = db
self.meta = meta
self.s3_temp_base_loc = s3_temp_base_loc
self.temporary_s3_directory_loc = temporary_s3_directory_loc

def load(self,
Expand Down Expand Up @@ -158,3 +161,6 @@ def best_scheme_to_load_from(self) -> str:
def temporary_loadable_directory_loc(self) -> Iterator[BaseDirectoryUrl]:
with self.temporary_s3_directory_loc() as temp_loc:
yield temp_loc

def has_temporary_loadable_directory_loc(self) -> bool:
return self.s3_temp_base_loc is not None
1 change: 1 addition & 0 deletions records_mover/db/redshift/redshift_db_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def __init__(self,
self._redshift_loader =\
RedshiftLoader(db=db,
meta=self.meta,
s3_temp_base_loc=s3_temp_base_loc,
temporary_s3_directory_loc=self.temporary_s3_directory_loc)
self._redshift_unloader =\
RedshiftUnloader(db=db,
Expand Down
3 changes: 3 additions & 0 deletions records_mover/db/vertica/unloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ def temporary_loadable_directory_loc(self) -> Iterator[BaseDirectoryUrl]:
with self.s3_temp_base_loc.temporary_directory() as temp_loc:
yield temp_loc

def has_temporary_loadable_directory_loc(self) -> bool:
raise NotImplementedError # TODO
vinceatbluelabs marked this conversation as resolved.
Show resolved Hide resolved

def aws_creds_sql(self, aws_id: str, aws_secret: str) -> str:
return """
ALTER SESSION SET UDPARAMETER FOR awslib aws_id={aws_id};
Expand Down
7 changes: 4 additions & 3 deletions records_mover/records/mover.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
SupportsToFileobjsSource,
FileobjsSource, SupportsToDataframesSource)
from .targets.base import (RecordsTarget, SupportsMoveFromRecordsDirectory,
SupportsMoveFromTempLocAfterFillingIt,
MightSupportMoveFromTempLocAfterFillingIt,
Copy link
Contributor Author

@vinceatbluelabs vinceatbluelabs Oct 28, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is another one of those interfaces implemented by sources and targets (in this case, targets). I renamed this interface to be more tentative - as now it can tell you at runtime whether it is able to do do what it says on the tin.

Specifically, this interface has a function which tells the target to:

  1. create a temporary location that the target can load data from efficiently (e.g., an S3 bucket directory that Redshift can run a COPY statement from)
  2. tell the source to fill in that temporary location with a records directory (e.g., copy a CSV file from your local disk to that temporary S3 directory and add a manifest that Redshift likes)
  3. load from that temporary location (e.g., run the Redshift COPY command)

MightSupportMoveFromFileobjsSource,
SupportsMoveFromDataframes)
from .sources import base as sources_base
Expand Down Expand Up @@ -123,8 +123,9 @@ def move(records_source: RecordsSource,
as fileobjs_source:
return move(fileobjs_source, records_target, processing_instructions)
elif (isinstance(records_source, SupportsMoveToRecordsDirectory) and
isinstance(records_target, SupportsMoveFromTempLocAfterFillingIt) and
records_source.has_compatible_format(records_target)):
isinstance(records_target, MightSupportMoveFromTempLocAfterFillingIt) and
records_source.has_compatible_format(records_target) and
records_target.can_move_from_temp_loc_after_filling_it()):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a key bit - I'm changing the move() function (the key algorithm for Records Mover) to only use this interface's function if the interface says it's OK.

In the most common case of a Table target, that means that the database in question has a temporary bucket location configured that it can do a bulk export to.

logger.info(f"Mover: copying from {records_source} to {records_target} "
f"by filling in a temporary location...")
return records_target.move_from_temp_loc_after_filling_it(records_source,
Expand Down
11 changes: 10 additions & 1 deletion records_mover/records/targets/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,16 @@ def can_move_from_fileobjs_source(self) -> bool:
pass


class SupportsMoveFromTempLocAfterFillingIt(NegotiatesRecordsFormat, metaclass=ABCMeta):
class MightSupportMoveFromTempLocAfterFillingIt(NegotiatesRecordsFormat, metaclass=ABCMeta):
# TODO: Make sure Redshift and Vertica target is configured as
vinceatbluelabs marked this conversation as resolved.
Show resolved Hide resolved
# well on this. Do we really not do any of this today?
@abstractmethod
def can_move_from_temp_loc_after_filling_it(self) -> bool:
"""Returns True if target as currently configured can be handed a
temporary location and fill it.
"""
pass

@abstractmethod
def move_from_temp_loc_after_filling_it(self,
records_source:
Expand Down
11 changes: 9 additions & 2 deletions records_mover/records/targets/data_url.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from ..results import MoveResult
from ..records_directory import RecordsDirectory
from .base import (SupportsMoveFromDataframes,
SupportsMoveFromTempLocAfterFillingIt,
MightSupportMoveFromTempLocAfterFillingIt,
SupportsMoveToRecordsDirectory,
SupportsMoveFromRecordsDirectory)
from ..processing_instructions import ProcessingInstructions
Expand All @@ -16,7 +16,7 @@


class DataUrlTarget(SupportsMoveFromDataframes,
SupportsMoveFromTempLocAfterFillingIt,
MightSupportMoveFromTempLocAfterFillingIt,
SupportsMoveFromRecordsDirectory):
def __init__(self,
output_loc: BaseFileUrl,
Expand Down Expand Up @@ -63,6 +63,13 @@ def move_from_records_directory(self,
records_format.generate_filename('data'): self.output_loc.url
})

def can_move_from_temp_loc_after_filling_it(self) -> bool:
# we can always create a temporary directory
return True

# TODO: Why even does this implement this protocol? Can I go
vinceatbluelabs marked this conversation as resolved.
Show resolved Hide resolved
# ahead and yank it and still have the tests pass? Try a PR and
# see.
def move_from_temp_loc_after_filling_it(self,
records_source:
SupportsMoveToRecordsDirectory,
Expand Down
24 changes: 14 additions & 10 deletions records_mover/records/targets/table/move_from_dataframes_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,29 @@ def __init__(self,
super().__init__(prep, target_table_details, processing_instructions)

def move(self) -> MoveResult:
if len(self.table_target.known_supported_records_formats()) != 0:
if self.table_target.can_move_from_fileobjs_source():
return self.move_from_dataframes_source_via_fileobjs()
else:
# Some databases, like Redshift, can't load from a
# stream, but can load from files on an object store
# they're pointed to.
return self.move_from_dataframes_source_via_records_directory()
# TODO: Clean up redundant clause
vinceatbluelabs marked this conversation as resolved.
Show resolved Hide resolved
if (len(self.table_target.known_supported_records_formats()) != 0 and
self.table_target.can_move_from_fileobjs_source()):
return self.move_from_dataframes_source_via_fileobjs()
elif (len(self.table_target.known_supported_records_formats()) != 0 and
self.table_target.can_move_from_fileobjs_source() and
self.table_target.can_move_from_temp_loc_after_filling_it()):
# Some databases, like Redshift, can't load from a
# stream, but can load from files on an object store
# they're pointed to.
return self.move_from_dataframes_source_via_temporary_records_directory()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a slightly more esoteric usage of this interface, where the database table target has been told to take a dataframe and load the data inside. There's a bit of a re-creation of a couple of parts of the move() algorithm here. Not sure I'm happy with that re-creation; it might be an area for future refactoring if I can figure out how to make it work as part of the main move() algorithm.

But I digress.

This changes things so that when we are given dataframe (or create one as an intermediary step), we can ingest it into a database even if that database doesn't have a temporary bucket configured.

else:
logger.info("Known formats for target database: "
f"{self.table_target.known_supported_records_formats()}")
logger.info("Table target can move from fileobjs source? "
f"{self.table_target.can_move_from_fileobjs_source()}")
logger.warning("Loading via INSERT statement as this DB "
"driver does not yet support more direct LOAD methods. "
"driver does not yet support or is not configured for "
"more direct load methods. "
"This may be very slow.")
return self.move_from_dataframes_source_via_insert()

def move_from_dataframes_source_via_records_directory(self) -> MoveResult:
def move_from_dataframes_source_via_temporary_records_directory(self) -> MoveResult:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clarify that this function uses a temp directory, as it was a bit of a surprise to me as well by the original name.

records_format = next(iter(self.table_target.known_supported_records_formats()), None)
with self.dfs_source.to_fileobjs_source(self.processing_instructions,
records_format) as fileobjs_source:
Expand Down
11 changes: 9 additions & 2 deletions records_mover/records/targets/table/target.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from records_mover.records.targets.base import (
SupportsMoveFromRecordsDirectory,
SupportsMoveFromTempLocAfterFillingIt,
MightSupportMoveFromTempLocAfterFillingIt,
MightSupportMoveFromFileobjsSource,
SupportsMoveFromDataframes,
)
Expand Down Expand Up @@ -31,7 +31,7 @@


class TableRecordsTarget(SupportsMoveFromRecordsDirectory,
SupportsMoveFromTempLocAfterFillingIt,
MightSupportMoveFromTempLocAfterFillingIt,
MightSupportMoveFromFileobjsSource,
SupportsMoveFromDataframes,
TargetTableDetails):
Expand Down Expand Up @@ -109,6 +109,13 @@ def can_move_from_this_format(self,
return False
return loader.can_load_this_format(source_records_format)

def can_move_from_temp_loc_after_filling_it(self) -> bool:
driver = self.db_driver(self.db_engine)
loader = driver.loader()
if loader is None:
return False
return loader.has_temporary_loadable_directory_loc()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The key implementation of the expanded interface.


def move_from_temp_loc_after_filling_it(self,
records_source:
SupportsMoveToRecordsDirectory,
Expand Down