Skip to content

Commit

Permalink
Added support for schema auto-detection feature in `LoadTableFromStor…
Browse files Browse the repository at this point in the history
…ageJob` (#3648)
  • Loading branch information
WillianFuks authored and dhermes committed Jul 28, 2017
1 parent 597657e commit b94a326
Show file tree
Hide file tree
Showing 3 changed files with 194 additions and 14 deletions.
45 changes: 38 additions & 7 deletions bigquery/google/cloud/bigquery/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,20 @@ def _error_result_to_exception(error_result):
status_code, error_result.get('message', ''), errors=[error_result])


class AutoDetectSchema(_TypedProperty):
"""Typed Property for ``autodetect`` properties.
:raises ValueError: on ``set`` operation if ``instance.schema``
is already defined.
"""
def __set__(self, instance, value):
self._validate(value)
if instance.schema:
raise ValueError('A schema should not be already defined '
'when using schema auto-detection')
setattr(instance._configuration, self._backing_name, value)


class Compression(_EnumProperty):
"""Pseudo-enum for ``compression`` properties."""
GZIP = 'GZIP'
Expand Down Expand Up @@ -505,6 +519,7 @@ class _LoadConfiguration(object):
"""
_allow_jagged_rows = None
_allow_quoted_newlines = None
_autodetect = None
_create_disposition = None
_encoding = None
_field_delimiter = None
Expand Down Expand Up @@ -544,9 +559,10 @@ def __init__(self, name, destination, source_uris, client, schema=()):
super(LoadTableFromStorageJob, self).__init__(name, client)
self.destination = destination
self.source_uris = source_uris
# Let the @property do validation.
self.schema = schema
self._configuration = _LoadConfiguration()
# Let the @property do validation. This must occur after all other
# attributes have been set.
self.schema = schema

@property
def schema(self):
Expand All @@ -564,12 +580,20 @@ def schema(self, value):
:type value: list of :class:`SchemaField`
:param value: fields describing the schema
:raises: TypeError if 'value' is not a sequence, or ValueError if
any item in the sequence is not a SchemaField
:raises TypeError: If ``value`is not a sequence.
:raises ValueError: If any item in the sequence is not
a ``SchemaField``.
"""
if not all(isinstance(field, SchemaField) for field in value):
raise ValueError('Schema items must be fields')
self._schema = tuple(value)
if not value:
self._schema = ()
else:
if not all(isinstance(field, SchemaField) for field in value):
raise ValueError('Schema items must be fields')
if self.autodetect:
raise ValueError(
'Schema can not be set if `autodetect` property is True')

self._schema = tuple(value)

@property
def input_file_bytes(self):
Expand Down Expand Up @@ -625,6 +649,11 @@ def output_rows(self):
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.allowQuotedNewlines
"""

autodetect = AutoDetectSchema('autodetect', bool)
"""See
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.autodetect
"""

create_disposition = CreateDisposition('create_disposition')
"""See
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.createDisposition
Expand Down Expand Up @@ -676,6 +705,8 @@ def _populate_config_resource(self, configuration):
configuration['allowJaggedRows'] = self.allow_jagged_rows
if self.allow_quoted_newlines is not None:
configuration['allowQuotedNewlines'] = self.allow_quoted_newlines
if self.autodetect is not None:
configuration['autodetect'] = self.autodetect
if self.create_disposition is not None:
configuration['createDisposition'] = self.create_disposition
if self.encoding is not None:
Expand Down
81 changes: 74 additions & 7 deletions bigquery/tests/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import base64
import csv
import datetime
import json
import operator
Expand All @@ -21,6 +22,8 @@
import unittest
import uuid

import six

from google.cloud import bigquery
from google.cloud._helpers import UTC
from google.cloud.bigquery import dbapi
Expand Down Expand Up @@ -290,8 +293,6 @@ def test_update_table(self):

@staticmethod
def _fetch_single_page(table):
import six

iterator = table.fetch_data()
page = six.next(iterator.pages)
return list(page)
Expand Down Expand Up @@ -341,7 +342,6 @@ def test_insert_data_then_dump_table(self):
sorted(ROWS, key=by_age))

def test_load_table_from_local_file_then_dump_table(self):
import csv
from google.cloud._testing import _NamedTemporaryFile

ROWS = [
Expand Down Expand Up @@ -432,7 +432,6 @@ def test_load_table_from_local_avro_file_then_dump_table(self):
sorted(ROWS, key=by_wavelength))

def test_load_table_from_storage_then_dump_table(self):
import csv
from google.cloud._testing import _NamedTemporaryFile
from google.cloud.storage import Client as StorageClient

Expand All @@ -448,11 +447,11 @@ def test_load_table_from_storage_then_dump_table(self):
]
TABLE_NAME = 'test_table'

s_client = StorageClient()
storage_client = StorageClient()

# In the **very** rare case the bucket name is reserved, this
# fails with a ConnectionError.
bucket = s_client.create_bucket(BUCKET_NAME)
bucket = storage_client.create_bucket(BUCKET_NAME)
self.to_delete.append(bucket)

blob = bucket.blob(BLOB_NAME)
Expand Down Expand Up @@ -501,6 +500,75 @@ def test_load_table_from_storage_then_dump_table(self):
self.assertEqual(sorted(rows, key=by_age),
sorted(ROWS, key=by_age))

def test_load_table_from_storage_w_autodetect_schema(self):
from google.cloud._testing import _NamedTemporaryFile
from google.cloud.storage import Client as StorageClient
from google.cloud.bigquery import SchemaField

local_id = unique_resource_id()
bucket_name = 'bq_load_test' + local_id
blob_name = 'person_ages.csv'
gs_url = 'gs://{}/{}'.format(bucket_name, blob_name)
rows = [
('Phred Phlyntstone', 32),
('Bharney Rhubble', 33),
('Wylma Phlyntstone', 29),
('Bhettye Rhubble', 27),
] * 100 # BigQuery internally uses the first 100 rows to detect schema
table_name = 'test_table'

storage_client = StorageClient()

# In the **very** rare case the bucket name is reserved, this
# fails with a ConnectionError.
bucket = storage_client.create_bucket(bucket_name)
self.to_delete.append(bucket)

blob = bucket.blob(blob_name)

with _NamedTemporaryFile() as temp:
with open(temp.name, 'w') as csv_write:
writer = csv.writer(csv_write)
writer.writerow(('Full Name', 'Age'))
writer.writerows(rows)

with open(temp.name, 'rb') as csv_read:
blob.upload_from_file(csv_read, content_type='text/csv')

self.to_delete.insert(0, blob)

dataset = Config.CLIENT.dataset(
_make_dataset_name('load_gcs_then_dump'))

retry_403(dataset.create)()
self.to_delete.append(dataset)

table = dataset.table(table_name)
self.to_delete.insert(0, table)

job = Config.CLIENT.load_table_from_storage(
'bq_load_storage_test_' + local_id, table, gs_url)
job.autodetect = True

job.begin()

# Allow for 90 seconds of "warm up" before rows visible. See
# https://cloud.google.com/bigquery/streaming-data-into-bigquery#dataavailability
# 8 tries -> 1 + 2 + 4 + 8 + 16 + 32 + 64 = 127 seconds
retry = RetryInstanceState(_job_done, max_tries=8)
retry(job.reload)()

table.reload()
field_name = SchemaField(
u'Full_Name', u'string', u'NULLABLE', None, ())
field_age = SchemaField(u'Age', u'integer', u'NULLABLE', None, ())
self.assertEqual(table.schema, [field_name, field_age])

actual_rows = self._fetch_single_page(table)
by_age = operator.itemgetter(1)
self.assertEqual(
sorted(actual_rows, key=by_age), sorted(rows, key=by_age))

def test_job_cancel(self):
DATASET_NAME = _make_dataset_name('job_cancel')
JOB_NAME = 'fetch_' + DATASET_NAME
Expand Down Expand Up @@ -674,7 +742,6 @@ def test_dbapi_w_standard_sql_types(self):
self.assertIsNone(row)

def _load_table_for_dml(self, rows, dataset_name, table_name):
import csv
from google.cloud._testing import _NamedTemporaryFile

dataset = Config.CLIENT.dataset(dataset_name)
Expand Down
82 changes: 82 additions & 0 deletions bigquery/tests/unit/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,11 @@ def _verifyBooleanConfigProperties(self, job, config):
config['allowQuotedNewlines'])
else:
self.assertIsNone(job.allow_quoted_newlines)
if 'autodetect' in config:
self.assertEqual(
job.autodetect, config['autodetect'])
else:
self.assertIsNone(job.autodetect)
if 'ignoreUnknownValues' in config:
self.assertEqual(job.ignore_unknown_values,
config['ignoreUnknownValues'])
Expand Down Expand Up @@ -277,6 +282,7 @@ def test_ctor(self):
# set/read from resource['configuration']['load']
self.assertIsNone(job.allow_jagged_rows)
self.assertIsNone(job.allow_quoted_newlines)
self.assertIsNone(job.autodetect)
self.assertIsNone(job.create_disposition)
self.assertIsNone(job.encoding)
self.assertIsNone(job.field_delimiter)
Expand Down Expand Up @@ -326,6 +332,41 @@ def test_schema_setter(self):
job.schema = [full_name, age]
self.assertEqual(job.schema, [full_name, age])

def test_schema_setter_w_autodetect(self):
from google.cloud.bigquery.schema import SchemaField

client = _Client(self.PROJECT)
table = _Table()
full_name = SchemaField('full_name', 'STRING')
job = self._make_one(self.JOB_NAME, table, [self.SOURCE1], client)
job.autodetect = False
job.schema = [full_name]
self.assertEqual(job.schema, [full_name])

job = self._make_one(self.JOB_NAME, table, [self.SOURCE1], client)
job.autodetect = True
with self.assertRaises(ValueError):
job.schema = [full_name]

def test_autodetect_setter_w_schema(self):
from google.cloud.bigquery.schema import SchemaField

client = _Client(self.PROJECT)
table = _Table()
full_name = SchemaField('full_name', 'STRING')
job = self._make_one(self.JOB_NAME, table, [self.SOURCE1], client)

job.autodetect = True
job.schema = []
self.assertEqual(job.schema, [])

job.autodetect = False
job.schema = [full_name]
self.assertEqual(job.autodetect, False)

with self.assertRaises(ValueError):
job.autodetect = True

def test_props_set_by_server(self):
import datetime
from google.cloud._helpers import UTC
Expand Down Expand Up @@ -491,6 +532,47 @@ def test_begin_w_bound_client(self):
self.assertEqual(req['data'], SENT)
self._verifyResourceProperties(job, RESOURCE)

def test_begin_w_autodetect(self):
path = '/projects/{}/jobs'.format(self.PROJECT)
resource = self._makeResource()
resource['configuration']['load']['autodetect'] = True
# Ensure None for missing server-set props
del resource['statistics']['creationTime']
del resource['etag']
del resource['selfLink']
del resource['user_email']
conn = _Connection(resource)
client = _Client(project=self.PROJECT, connection=conn)
table = _Table()
job = self._make_one(self.JOB_NAME, table, [self.SOURCE1], client)
job.autodetect = True
job.begin()

sent = {
'jobReference': {
'projectId': self.PROJECT,
'jobId': self.JOB_NAME,
},
'configuration': {
'load': {
'sourceUris': [self.SOURCE1],
'destinationTable': {
'projectId': self.PROJECT,
'datasetId': self.DS_NAME,
'tableId': self.TABLE_NAME,
},
'autodetect': True
},
},
}
expected_request = {
'method': 'POST',
'path': path,
'data': sent,
}
self.assertEqual(conn._requested, [expected_request])
self._verifyResourceProperties(job, resource)

def test_begin_w_alternate_client(self):
from google.cloud.bigquery.schema import SchemaField

Expand Down

0 comments on commit b94a326

Please sign in to comment.