[AIRFLOW-461] Support autodetected schemas in BigQuery run_load (#3880)
xnuinside authored and kaxil committed Dec 29, 2018
1 parent bc87ea6 commit ae4d7cb
Showing 4 changed files with 39 additions and 14 deletions.
airflow/contrib/hooks/bigquery_hook.py (14 additions, 3 deletions)

@@ -849,8 +849,8 @@ def run_copy(self,
 
     def run_load(self,
                  destination_project_dataset_table,
-                 schema_fields,
                  source_uris,
+                 schema_fields=None,
                  source_format='CSV',
                  create_disposition='CREATE_IF_NEEDED',
                  skip_leading_rows=0,
@@ -864,7 +864,8 @@ def run_load(self,
                  schema_update_options=(),
                  src_fmt_configs=None,
                  time_partitioning=None,
-                 cluster_fields=None):
+                 cluster_fields=None,
+                 autodetect=False):
         """
         Executes a BigQuery load command to load data from Google Cloud Storage
         to BigQuery. See here:
@@ -882,7 +883,11 @@ def run_load(self,
         :type destination_project_dataset_table: string
         :param schema_fields: The schema field list as defined here:
             https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load
+            Required if autodetect=False; optional if autodetect=True.
         :type schema_fields: list
+        :param autodetect: Attempt to autodetect the schema for CSV and JSON
+            source files.
+        :type autodetect: bool
         :param source_uris: The source Google Cloud
             Storage URI (e.g. gs://some-bucket/some-file.txt). A single wild
             per-object name can be used.
@@ -937,6 +942,11 @@ def run_load(self,
         # if it's not, we raise a ValueError
         # Refer to this link for more details:
         # https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.tableDefinitions.(key).sourceFormat
+
+        if schema_fields is None and not autodetect:
+            raise ValueError(
+                'You must either pass a schema or autodetect=True.')
+
         if src_fmt_configs is None:
             src_fmt_configs = {}
 
@@ -971,6 +981,7 @@ def run_load(self,
 
         configuration = {
             'load': {
+                'autodetect': autodetect,
                 'createDisposition': create_disposition,
                 'destinationTable': {
                     'projectId': destination_project,
@@ -1734,7 +1745,7 @@ def _split_tablename(table_input, default_project_id, var_name=None):
 
     if '.' not in table_input:
         raise ValueError(
-            'Expected deletion_dataset_table name in the format of '
+            'Expected target table name in the format of '
            '<dataset>.<table>. Got: {}'.format(table_input))
 
    if not default_project_id:
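
What the hook change means in practice: schema_fields is now optional, and
autodetect=True asks BigQuery to infer the schema for CSV and JSON sources. A
minimal usage sketch follows; the connection ID, table, and GCS paths are
hypothetical placeholders, not values from the commit.

    # Hedged sketch of the changed run_load API; the names below are made up.
    from airflow.contrib.hooks.bigquery_hook import BigQueryHook

    hook = BigQueryHook(bigquery_conn_id='bigquery_default')
    cursor = hook.get_conn().cursor()

    # Before this commit, schema_fields was a required positional argument.
    # Now the schema can be omitted and inferred server-side:
    cursor.run_load(
        destination_project_dataset_table='my_dataset.my_table',
        source_uris=['gs://my-bucket/exports/part-0.csv'],
        autodetect=True)

    # Omitting both schema_fields and autodetect=True now raises:
    # ValueError: You must either pass a schema or autodetect=True.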
airflow/contrib/operators/bigquery_operator.py (2 additions, 2 deletions)

@@ -306,7 +306,7 @@ def __init__(self,
                  project_id=None,
                  schema_fields=None,
                  gcs_schema_object=None,
-                 time_partitioning={},
+                 time_partitioning=None,
                  bigquery_conn_id='bigquery_default',
                  google_cloud_storage_conn_id='google_cloud_default',
                  delegate_to=None,
@@ -323,7 +323,7 @@ def __init__(self,
         self.bigquery_conn_id = bigquery_conn_id
         self.google_cloud_storage_conn_id = google_cloud_storage_conn_id
         self.delegate_to = delegate_to
-        self.time_partitioning = time_partitioning
+        self.time_partitioning = {} if time_partitioning is None else time_partitioning
         self.labels = labels
 
     def execute(self, context):
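
The time_partitioning change fixes a classic Python pitfall rather than
anything BigQuery-specific: a mutable default argument is evaluated once at
function definition and shared across every call, so one operator instance
could leak partitioning settings into another. The snippet below is a made-up
illustration of the hazard and of the None-sentinel pattern the commit adopts.

    # Made-up example; bad_append/good_append are not Airflow functions.
    def bad_append(value, bucket=[]):       # one list shared by every call
        bucket.append(value)
        return bucket

    assert bad_append(1) == [1]
    assert bad_append(2) == [1, 2]          # state leaked from the first call

    def good_append(value, bucket=None):    # the None-sentinel pattern
        bucket = [] if bucket is None else bucket
        bucket.append(value)
        return bucket

    assert good_append(1) == [1]
    assert good_append(2) == [2]            # fresh list on each call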
airflow/contrib/operators/gcs_to_bq.py (15 additions, 9 deletions)

@@ -152,6 +152,7 @@ def __init__(self,
                  external_table=False,
                  time_partitioning=None,
                  cluster_fields=None,
+                 autodetect=False,
                  *args, **kwargs):
 
         super(GoogleCloudStorageToBigQueryOperator, self).__init__(*args, **kwargs)
@@ -190,20 +191,24 @@ def __init__(self,
         self.src_fmt_configs = src_fmt_configs
         self.time_partitioning = time_partitioning
         self.cluster_fields = cluster_fields
+        self.autodetect = autodetect
 
     def execute(self, context):
         bq_hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
                                delegate_to=self.delegate_to)
 
-        if not self.schema_fields and \
-                self.schema_object and \
-                self.source_format != 'DATASTORE_BACKUP':
-            gcs_hook = GoogleCloudStorageHook(
-                google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
-                delegate_to=self.delegate_to)
-            schema_fields = json.loads(gcs_hook.download(
-                self.bucket,
-                self.schema_object).decode("utf-8"))
+        if not self.schema_fields:
+            if self.schema_object and self.source_format != 'DATASTORE_BACKUP':
+                gcs_hook = GoogleCloudStorageHook(
+                    google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
+                    delegate_to=self.delegate_to)
+                schema_fields = json.loads(gcs_hook.download(
+                    self.bucket,
+                    self.schema_object).decode("utf-8"))
+            elif self.schema_object is None and self.autodetect is False:
+                raise ValueError('At least one of `schema_fields`, `schema_object`, '
+                                 'or `autodetect` must be passed.')
+
         else:
             schema_fields = self.schema_fields
 
@@ -234,6 +239,7 @@ def execute(self, context):
                 schema_fields=schema_fields,
                 source_uris=source_uris,
                 source_format=self.source_format,
+                autodetect=self.autodetect,
                 create_disposition=self.create_disposition,
                 skip_leading_rows=self.skip_leading_rows,
                 write_disposition=self.write_disposition,
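
At the operator level, the new flag removes the need to ship a schema file
alongside the data. A hedged DAG sketch follows; the DAG ID, bucket, object
paths, and table name are placeholders, and only the autodetect argument is
new in this commit.

    # Hypothetical wiring; parameter names follow the operator at this commit.
    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.gcs_to_bq import \
        GoogleCloudStorageToBigQueryOperator

    dag = DAG('gcs_to_bq_autodetect_example',
              start_date=datetime(2018, 12, 1),
              schedule_interval=None)

    load_events = GoogleCloudStorageToBigQueryOperator(
        task_id='load_events',
        bucket='my-bucket',
        source_objects=['exports/events.csv'],
        destination_project_dataset_table='my_dataset.events',
        source_format='CSV',
        autodetect=True,   # no schema_fields or schema_object needed
        dag=dag)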
tests/contrib/hooks/test_bigquery_hook.py (8 additions, 0 deletions)

@@ -443,6 +443,14 @@ def run_with_config(config):
 
         mocked_rwc.assert_called_once()
 
+    @mock.patch.object(hook.BigQueryBaseCursor, 'run_with_configuration')
+    def test_run_with_auto_detect(self, run_with_config):
+        destination_project_dataset_table = "autodetect.table"
+        cursor = hook.BigQueryBaseCursor(mock.Mock(), "project_id")
+        cursor.run_load(destination_project_dataset_table, [], [], autodetect=True)
+        args, kwargs = run_with_config.call_args
+        self.assertIs(args[0]['load']['autodetect'], True)
+
     @mock.patch("airflow.contrib.hooks.bigquery_hook.LoggingMixin")
     @mock.patch("airflow.contrib.hooks.bigquery_hook.time")
     @mock.patch.object(hook.BigQueryBaseCursor, 'run_with_configuration')
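
The committed test covers only the happy path (autodetect=True reaches the
load configuration). Below is a sketch of a companion test for the new
ValueError guard; it is not part of the commit and reuses the file's existing
hook import and mock style.

    # Hypothetical companion test; not in the commit.
    @mock.patch.object(hook.BigQueryBaseCursor, 'run_with_configuration')
    def test_run_load_without_schema_or_autodetect(self, run_with_config):
        cursor = hook.BigQueryBaseCursor(mock.Mock(), "project_id")
        with self.assertRaises(ValueError):
            cursor.run_load("dataset.table", source_uris=[])
        run_with_config.assert_not_called()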
