Do slow redshift unload via SELECT when bucket unload not available #117
Conversation
It's not needed anymore, presumably due to changes in the move() algorithm.
@@ -1 +1 @@
-93.6900
+93.7300
I tested the bulk of the new code added, and threw in some unrelated tests as well to get this number up.
@@ -42,6 +42,9 @@ def can_load_this_format(self, source_records_format: BaseRecordsFormat) -> bool
         method"""
         ...

+    def temporary_loadable_directory_scheme(self) -> str:
+        return 'file'
As a refresher:
- Scheme/URL scheme: the "s3" in s3://bucket/directory/file.csv
- DBDriver abstract classes: a wrapper on top of SQLAlchemy's database dialects that describes how to load/unload data and also provides some extra information about things like the numeric ranges of types.
- Loader/Unloader abstract classes: the interfaces extended for each database to show how to do bulk imports/exports.
- source/target: sources and targets are things like files on disk, directories in S3, tables in databases, etc. - you move data with Records Mover from a source to a target.
- Source/Target abstract classes: abstract interfaces that show what capabilities a given source or target has. The table source and table target of course use the DBDriver interface extensively to know what capabilities a specific database has.
- move() algorithm: uses Source and Target objects, inspecting which particular Source/Target abstract classes they implement, to figure out the fastest way to move data from a source to a target.
So this is an additional exported bit of info on the Loader interface: it tells us where a temporary directory would be created, if we had to create one to load from. We'll see where that's used later in this PR.
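To make that concrete, here's a minimal sketch (illustrative class names, not the actual records-mover source) of the default shown in the diff above and how a loader that can only bulk-load from S3 might override it:

from abc import ABCMeta

class LoaderSketch(metaclass=ABCMeta):
    """Hypothetical stand-in for the Loader interface discussed above."""

    def temporary_loadable_directory_scheme(self) -> str:
        # Default from the diff: if this loader needed a scratch
        # directory to load from, it would live on the local filesystem.
        return 'file'

class S3ScratchLoaderSketch(LoaderSketch):
    def temporary_loadable_directory_scheme(self) -> str:
        # A database whose bulk load can only read from S3 would want
        # its scratch directory created in an S3 bucket instead.
        return 's3'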
    def can_unload_to_scheme(self, scheme: str) -> bool:
        # Unloading is done via streams, so it is scheme-independent
        # and requires no scratch buckets.
        return True
In one movement scenario, we ask a source database to export into a records directory (basically a pile of CSVs with a JSON file for metadata). This is how we can find out if the source database can export into the right kind of place for the target database (or whatever) to import.
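For contrast, here's a hedged sketch of the scheme-dependent case (names are illustrative; the real Redshift unloader is more involved). It matches the behavior the unit tests later in this PR check for: 's3' is always fine, but any other scheme needs a scratch bucket to stage through.

from typing import Optional

class S3OnlyUnloaderSketch:
    """Illustrative: a Redshift-style unloader whose bulk export lands in S3."""

    def __init__(self, s3_temp_base_loc: Optional[str]) -> None:
        self.s3_temp_base_loc = s3_temp_base_loc

    def can_unload_to_scheme(self, scheme: str) -> bool:
        if scheme == 's3':
            # We can always unload straight to an S3 destination.
            return True
        # For any other scheme we'd need an S3 scratch bucket to stage
        # the export in before copying it onward.
        return self.s3_temp_base_loc is not None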
@@ -47,7 +47,7 @@ def unload(self,
                table: str,
                unload_plan: RecordsUnloadPlan,
                directory: RecordsDirectory) -> Optional[int]:
-        if not self.s3_available():
+        if not self.s3_export_available():
I'm separating out the concepts of "does the database have the right add-ons installed to export to S3" and "do we have a temporary S3 bucket available".
Even if we don't have a temporary S3 bucket available, we may be able to export straight to the destination S3 bucket!
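In sketch form (hypothetical class and second method; only s3_export_available appears in the diff above), the two separate questions look like:

from typing import Optional

class UnloadAvailabilitySketch:
    """Illustrative only - separates the two availability questions."""

    def __init__(self, s3_temp_base_loc: Optional[str]) -> None:
        self.s3_temp_base_loc = s3_temp_base_loc

    def s3_export_available(self) -> bool:
        # Question 1: does the database itself have the right add-ons
        # installed to export directly to S3?
        raise NotImplementedError  # would probe the database

    def s3_temp_bucket_available(self) -> bool:
        # Question 2: do we happen to have a scratch bucket configured?
        return self.s3_temp_base_loc is not None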
-            return [DelimitedRecordsFormat(variant='vertica')]
-        else:
-            return []
+        return [DelimitedRecordsFormat(variant='vertica')]
This was an existing kludge to try to accomplish some of the "negotiation" happening with this PR. Now that we have it more fully baked at the move() algorithm level, we can tear down this hack, as it's no longer needed.
@@ -126,6 +127,8 @@ def move(records_source: RecordsSource,
     elif (isinstance(records_source, SupportsMoveToRecordsDirectory) and
           isinstance(records_target, MightSupportMoveFromTempLocAfterFillingIt) and
           records_source.has_compatible_format(records_target) and
+          records_source.
+          can_move_to_scheme(records_target.temporary_loadable_directory_scheme()) and
These three added lines are the meat of this PR. They keep us from attempting bulk exports when we'd need a temporary bucket and don't have one. Instead of dying with an exception that asks the user to provide a temporary bucket, we move on and try a less efficient approach.
The first clause is generally about database bulk exports - e.g., table2url, table2file, table2recordsdir, etc.
The second clause is used when the target isn't a file - e.g., table2table. We need a temporary place to dump all the CSV files, and need to make sure that the temporary place is somewhere the source can actually export to. A condensed sketch of this negotiation follows below.
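Pulled out into a standalone predicate (a simplification of the diff above, not the real move() code), the negotiation reads roughly like this:

def can_take_bulk_export_path(records_source, records_target) -> bool:
    # Only try the fast bulk-export path if the source can write to
    # wherever the target would place its temporary directory.
    scheme = records_target.temporary_loadable_directory_scheme()
    return (records_source.has_compatible_format(records_target) and
            records_source.can_move_to_scheme(scheme))

# When this is False, move() falls through to a slower strategy
# (e.g., unloading via SELECT) instead of raising an exception.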
        expensive when data is large and/or network bandwidth is
        limited.
        """
        pass
This is one of the abstract classes that show what capabilities a Source has.
        loader = driver.loader()
        if loader is None:
            raise TypeError("Please check can_move_from_temp_loc_after_filling_it() "
                            "before calling this")
This isn't ideal from a type-safety perspective, but I looked at a few alternatives and couldn't come up with a better approach without making the move() method much uglier.
With the understanding that this isn't shown in the external API, and is mostly a signal to fix logic elsewhere, I see no reason to belabor this point.
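For what it's worth, the usual alternatives trade away the explicit error message. A quick illustration (hypothetical names, not from the codebase):

from typing import Optional, cast

class Loader:  # stand-in for the real Loader interface
    pass

def narrow_with_assert(maybe_loader: Optional[Loader]) -> Loader:
    # mypy narrows Optional[Loader] to Loader here, but a logic bug
    # elsewhere surfaces as a bare AssertionError at runtime.
    assert maybe_loader is not None
    return maybe_loader

def narrow_with_cast(maybe_loader: Optional[Loader]) -> Loader:
    # cast() silences the type checker but performs no runtime check
    # at all - a None would blow up later, far from the cause.
    return cast(Loader, maybe_loader)

Raising TypeError with a pointer to can_move_from_temp_loc_after_filling_it(), as the diff does, at least keeps the failure actionable.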
        with TemporaryDirectory(prefix='temporary_loadable_directory_loc') as dirname:
            yield FilesystemDirectoryUrl(dirname)

    def has_temporary_loadable_directory_loc(self) -> bool:
        """Returns True if a temporary directory can be provided by
        temporary_loadable_directory_loc()"""
Be more consistent with documentation in this key class.
                                             s3_temp_base_loc=None)
        self.assertTrue(redshift_unloader.can_unload_to_scheme('s3'))

    def test_can_unload_to_scheme_file_with_temp_bucket_True(self):
The most pedantic comment, but we're here.... 'True' is capitalized in this function name but not in the others.
One comment about capitalizing True in a function name, but otherwise LGTM.
                                             s3_temp_base_loc=None)
        self.assertFalse(redshift_unloader.can_unload_to_scheme('file'))

    def test_can_unload_to_scheme_file_with_temp_bucket_true(self):
F811 redefinition of unused 'test_can_unload_to_scheme_file_with_temp_bucket_true' from line 47
This allows Redshift to export data even when we don't have a scratch bucket to copy it to first.
The approach is similar to the fix done for importing data (#114): add some predicates to the Source/Target and the underlying DBDriver interfaces so that the move() algorithm can figure out whether it can do a bulk export for a given URL scheme, or whether it should fall back to doing things via SELECT.
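For readers new to the codebase, here's a hedged sketch of what "via SELECT" means in practice: stream rows out with an ordinary query and write them as CSV, instead of issuing a bulk UNLOAD. The function name and signature below are illustrative, not the records-mover implementation.

import csv
import sqlalchemy

def slow_unload_via_select(engine: sqlalchemy.engine.Engine,
                           schema: str, table: str, out_path: str) -> int:
    """Row-by-row export: slow, but works with no scratch bucket at all."""
    rows_written = 0
    # Assumes schema/table come from trusted config, not user input.
    query = sqlalchemy.text(f'SELECT * FROM {schema}.{table}')
    with engine.connect() as conn, open(out_path, 'w', newline='') as f:
        result = conn.execute(query)
        writer = csv.writer(f)
        writer.writerow(result.keys())  # header row from column names
        for row in result:
            writer.writerow(row)
            rows_written += 1
    return rows_written

This is exactly the "less efficient approach" the move() algorithm now falls back to when the bulk-export predicates above return False.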