Load via INSERT on Redshift when scratch bucket not available #114
Conversation
records_mover/records/targets/table/move_from_dataframes_source.py
It's not needed anymore, presumably due to changes in the move() algorithm.
```diff
@@ -23,11 +23,19 @@ class RedshiftLoader(LoaderFromRecordsDirectory):
     def __init__(self,
                  db: Union[sqlalchemy.engine.Engine, sqlalchemy.engine.Connection],
                  meta: sqlalchemy.MetaData,
-                 temporary_s3_directory_loc: Callable[[], ContextManager[BaseDirectoryUrl]])\
+                 s3_temp_base_loc: Optional[BaseDirectoryUrl])\
```
I was doing a too-clever-by-half thing before: passing in a function that creates a temporary directory in the right place in S3.
Now I need to know things like "do I actually have a bucket in which to create the temporary directory?", so I'm passing in something lower-level (at the price of duplicating a three-line function between two classes with no inheritance relationship).
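For illustration, here's a minimal sketch of what that duplicated three-line helper could look like. This assumes `BaseDirectoryUrl` exposes a `temporary_directory()` context manager, which is my assumption, not something confirmed by this diff:

```python
from contextlib import contextmanager
from typing import Iterator, Optional


@contextmanager
def temporary_s3_directory_loc(s3_temp_base_loc: Optional['BaseDirectoryUrl'])\
        -> Iterator['BaseDirectoryUrl']:
    # Hypothetical helper: fail loudly when no scratch bucket is configured,
    # otherwise hand out a temporary directory underneath it.
    if s3_temp_base_loc is None:
        raise NotImplementedError("Please configure a scratch S3 location")
    with s3_temp_base_loc.temporary_directory() as temp_loc:
        yield temp_loc
```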
```diff
@@ -47,6 +47,12 @@ def temporary_loadable_directory_loc(self) -> Iterator[BaseDirectoryUrl]:
         with TemporaryDirectory(prefix='temporary_loadable_directory_loc') as dirname:
```
Loader is an abstract class made concrete by each of the database types that can load out of a records directory.
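As a rough sketch (my reconstruction from the hunks above, not verbatim source; the `load()` signature in particular is a guess), the abstract class might expose both the temporary-directory context manager and a probe for whether one is actually available:

```python
from abc import ABCMeta, abstractmethod
from typing import ContextManager, Optional


class LoaderFromRecordsDirectory(metaclass=ABCMeta):
    @abstractmethod
    def load(self, schema: str, table: str,
             load_plan: 'RecordsLoadPlan',
             directory: 'RecordsDirectory') -> Optional[int]:
        """Load the records directory's contents into the given table."""
        ...

    @abstractmethod
    def temporary_loadable_directory_loc(self) -> ContextManager['BaseDirectoryUrl']:
        """Yield a directory this database can load from efficiently."""
        ...

    def has_temporary_loadable_directory_loc(self) -> bool:
        # Most loaders can offer one (e.g., a local temp directory),
        # so a permissive default seems plausible here.
        return True
```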
```diff
@@ -2,7 +2,7 @@
                       SupportsToFileobjsSource,
                       FileobjsSource, SupportsToDataframesSource)
 from .targets.base import (RecordsTarget, SupportsMoveFromRecordsDirectory,
-                           SupportsMoveFromTempLocAfterFillingIt,
+                           MightSupportMoveFromTempLocAfterFillingIt,
```
This is another one of those interfaces implemented by sources and targets (in this case, targets). I renamed this interface to be more tentative, as it can now tell you at runtime whether it is able to do what it says on the tin.
Specifically, this interface has a function which tells the target to (see the sketch after this list):
- create a temporary location that the target can load data from efficiently (e.g., an S3 bucket directory that Redshift can run a COPY statement from)
- tell the source to fill in that temporary location with a records directory (e.g., copy a CSV file from your local disk to that temporary S3 directory and add a manifest that Redshift likes)
- load from that temporary location (e.g., run the Redshift COPY command)
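A hedged sketch of the renamed interface: the predicate name comes from the diff below, while `move_from_temp_loc_after_filling_it` and its parameters are my guesses from the class name, not confirmed source.

```python
from abc import ABCMeta, abstractmethod


class MightSupportMoveFromTempLocAfterFillingIt(metaclass=ABCMeta):
    @abstractmethod
    def can_move_from_temp_loc_after_filling_it(self) -> bool:
        """Answer at runtime whether the three steps above can actually run."""
        ...

    @abstractmethod
    def move_from_temp_loc_after_filling_it(self,
                                            records_source,
                                            processing_instructions) -> 'MoveResult':
        """Create the temp location, have the source fill it, then load from it."""
        ...
```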
```diff
-              records_source.has_compatible_format(records_target)):
+              isinstance(records_target, MightSupportMoveFromTempLocAfterFillingIt) and
+              records_source.has_compatible_format(records_target) and
+              records_target.can_move_from_temp_loc_after_filling_it()):
```
This is a key bit: I'm changing the move() function (the central algorithm of Records Mover) to use this interface's function only if the interface says it's OK.
In the most common case of a Table target, that means the database in question has a temporary bucket location configured that it can do a bulk load from.
"This may be very slow.") | ||
return self.move_from_dataframes_source_via_insert() | ||
|
||
def move_from_dataframes_source_via_records_directory(self) -> MoveResult: | ||
def move_from_dataframes_source_via_temporary_records_directory(self) -> MoveResult: |
Clarify that this function uses a temp directory; the original name was a bit of a surprise to me as well.
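To make the resulting control flow concrete, here's a hedged sketch of the fallback. The class name and `self.records_target` are illustrative stand-ins; only the method names and warning text come from the hunks above.

```python
import logging

logger = logging.getLogger(__name__)


class MoveFromDataframesSource:  # hypothetical stand-in for the real target class
    def move(self) -> 'MoveResult':
        if self.records_target.can_move_from_temp_loc_after_filling_it():
            # Fast path: stage a records directory in the scratch location
            # and let the database bulk-load it.
            return self.move_from_dataframes_source_via_temporary_records_directory()
        logger.warning("Loading via INSERT statements, as no scratch bucket "
                       "is available. This may be very slow.")
        return self.move_from_dataframes_source_via_insert()
```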
```python
        loader = driver.loader()
        if loader is None:
            return False
        return loader.has_temporary_loadable_directory_loc()
```
The key implementation of the expanded interface.
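The table target delegates to the driver's loader here. On the loader side, my guess at the Redshift predicate it delegates to, given the new `Optional` constructor argument (a sketch, not verbatim source):

```python
class RedshiftLoader(LoaderFromRecordsDirectory):
    ...

    def has_temporary_loadable_directory_loc(self) -> bool:
        # Redshift bulk-loads (COPY) only from S3, so a temporary loadable
        # directory exists exactly when a scratch location was configured.
        return self.s3_temp_base_loc is not None
```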
As always, thank you for the comments. Very helpful. 👍🏼
This PR teaches Records Mover that it can use INSERT statements to import to Redshift when nothing else is possible. INSERT doesn't require AWS credentials or a temporary S3 bucket.
This is sort of useful on its own, but primarily it sets me up in the next PR (#113) to do something similar for BigQuery, so folks don't need to configure a Google Cloud Storage bucket just to be able to do the imports they're able to do today.
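For anyone curious what the INSERT path amounts to mechanically, here's an illustrative SQLAlchemy snippet (not code from this PR): a multi-row INSERT needs only database credentials, with no staging in object storage.

```python
import sqlalchemy


def load_rows_via_insert(engine: sqlalchemy.engine.Engine,
                         table: sqlalchemy.Table,
                         rows: list) -> None:
    # executemany-style INSERT: slow for large tables, but it works with
    # nothing more than a database connection.
    with engine.begin() as conn:
        conn.execute(table.insert(), rows)
```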