Add support for BigQuery bulk export (to Avro, for now) #136
Merged
Commits (28, all by vinceatbluelabs; the diff below shows the state after the first 6 commits):
e095590  Add support for BigQuery bulk export (to Avro, for now)
0fa5321  Improve test coverage
28f48ab  Improve test coverage
5ebb19b  Improve test coverage
de77caa  Improve test coverage
09ceea5  Improve test coverage
2a721af  Improve test coverage
f9f88ad  Improve test coverage
8b10df9  Ratchet coverage
5c04074  Ratchet
c00afd2  Flake8 fixes
d5c8e84  Old Python fixes
35e7e29  Break up big file
99631b6  Fix known_supported_records_formats_for_load()
ba7d15b  Try with config.use_avro_logical_types
2dcfda0  Set config.use_avro_logical_types on import as well
8898433  Deal with Avro limitations
16b041c  Improve coverage
059b2a7  Unratchet
cba3e2e  Ratchet
f69b823  Bump bigfiles
03f1e28  Drop unneeded import
ebbf53f  Ratchet
0179a71  Reformat array literal
63114b5  Move schema adjustment logic
1568ada  Improve coverage
48a7fdb  Ratchet coverage
e0aab23  Ratchet
@@ -0,0 +1,107 @@

import sqlalchemy
import pprint

from contextlib import contextmanager
from typing import List, Iterator, Optional, Union, Tuple
import logging
from google.cloud.bigquery.dbapi.connection import Connection
from google.cloud.bigquery.client import Client
from google.cloud.bigquery.job import ExtractJobConfig
from records_mover.db.unloader import Unloader
from records_mover.records.records_format import BaseRecordsFormat, AvroRecordsFormat
from records_mover.url.base import BaseDirectoryUrl
from records_mover.url.resolver import UrlResolver
from records_mover.records.unload_plan import RecordsUnloadPlan
from records_mover.records.records_directory import RecordsDirectory
from records_mover.db.errors import NoTemporaryBucketConfiguration


logger = logging.getLogger(__name__)


class BigQueryUnloader(Unloader):
    def __init__(self,
                 db: Union[sqlalchemy.engine.Connection, sqlalchemy.engine.Engine],
                 url_resolver: UrlResolver,
                 gcs_temp_base_loc: Optional[BaseDirectoryUrl]) -> None:
        self.db = db
        self.url_resolver = url_resolver
        self.gcs_temp_base_loc = gcs_temp_base_loc
        super().__init__(db=db)

    def can_unload_format(self, target_records_format: BaseRecordsFormat) -> bool:
        if isinstance(target_records_format, AvroRecordsFormat):
            return True
        return False

    def can_unload_to_scheme(self, scheme: str) -> bool:
        return scheme == 'gs'

    def known_supported_records_formats_for_unload(self) -> List[BaseRecordsFormat]:
        return [AvroRecordsFormat()]

    @contextmanager
    def temporary_unloadable_directory_loc(self) -> Iterator[BaseDirectoryUrl]:
        if self.gcs_temp_base_loc is None:
            raise NoTemporaryBucketConfiguration('Please provide a scratch GCS URL in your config '
                                                 '(e.g., set SCRATCH_GCS_URL to a gs:// URL)')
        else:
            with self.gcs_temp_base_loc.temporary_directory() as temp_loc:
                yield temp_loc

    def _parse_bigquery_schema_name(self, schema: str) -> Tuple[Optional[str], str]:
        # https://github.com/mxmzdlv/pybigquery/blob/master/pybigquery/sqlalchemy_bigquery.py#L320
        dataset = None
        project = None

        schema_split = schema.split('.')
        if len(schema_split) == 1:
            dataset, = schema_split
        elif len(schema_split) == 2:
            project, dataset = schema_split
        else:
            raise ValueError(f"Could not understand schema name {schema}")

        return (project, dataset)

    def _extract_job_config(self, unload_plan: RecordsUnloadPlan) -> ExtractJobConfig:
        config = ExtractJobConfig()
        if isinstance(unload_plan.records_format, AvroRecordsFormat):
            config.destination_format = 'AVRO'
        else:
            raise NotImplementedError(f'Please add support for {unload_plan.records_format}')
        return config

    def unload(self,
               schema: str,
               table: str,
               unload_plan: RecordsUnloadPlan,
               directory: RecordsDirectory) -> Optional[int]:
        if directory.scheme != 'gs':
            # BigQuery can only extract to GCS; stage through a scratch
            # gs:// location and then copy to the requested destination.
            with self.temporary_unloadable_directory_loc() as temp_gcs_loc:
                temp_directory = RecordsDirectory(temp_gcs_loc)
                out = self.unload(schema=schema,
                                  table=table,
                                  unload_plan=unload_plan,
                                  directory=temp_directory)
                temp_directory.copy_to(directory.loc)
                return out
        logger.info("Unloading from BigQuery into records directory")
        # https://googleapis.github.io/google-cloud-python/latest/bigquery/usage/tables.html#creating-a-table
        connection: Connection = self.db.engine.raw_connection().connection
        # https://google-cloud.readthedocs.io/en/latest/bigquery/generated/google.cloud.bigquery.client.Client.html
        client: Client = connection._client
        project_id, dataset_id = self._parse_bigquery_schema_name(schema)
        job_config = self._extract_job_config(unload_plan)

        records_format = unload_plan.records_format
        filename = records_format.generate_filename('output')
        destination_uri = directory.loc.file_in_this_directory(filename)
        job = client.extract_table(f"{schema}.{table}",
                                   destination_uri.url,
                                   # Must match the destination dataset location.
                                   job_config=job_config)
        job.result()  # Waits for the extract job to complete.
        logger.info(f"Unloaded from {dataset_id}:{table} into {filename}")
        directory.save_preliminary_manifest()
        return None
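
For readers unfamiliar with the extract API this class wraps: stripped of the records-mover plumbing, the underlying call sequence looks roughly like the sketch below. The project, dataset, table, and bucket names are invented for illustration, and the use_avro_logical_types flag (added in later commits of this PR) is shown where it would go.

from google.cloud import bigquery

client = bigquery.Client()

job_config = bigquery.ExtractJobConfig()
job_config.destination_format = 'AVRO'
# Added later in this PR: export DATE/TIMESTAMP/etc. as Avro logical
# types rather than raw ints/strings.
job_config.use_avro_logical_types = True

# Table reference and GCS URI are hypothetical.
extract_job = client.extract_table('my-project.my_dataset.my_table',
                                   'gs://my-scratch-bucket/output.avro',
                                   job_config=job_config)
extract_job.result()  # Block until the export finishes.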
Review comment on the "# Must match the destination dataset location." line in unload():

I'm not convinced this was ever needed - it's in the example code, but everything seems to run fine without it, so there must be some inference logic to figure out the dataset location based on the client object.
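
If that inference ever turned out to be unreliable, the location could be pinned explicitly: extract_table() accepts a location keyword argument. A minimal sketch, with 'US' as a purely illustrative value and the same hypothetical names as above:

from google.cloud import bigquery

client = bigquery.Client()
# Pin the job location explicitly instead of relying on the
# client/dataset inference discussed above; 'US' is illustrative.
job = client.extract_table('my-project.my_dataset.my_table',
                           'gs://my-scratch-bucket/output.avro',
                           location='US')
job.result()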