ENH: Added configuration parameter to read_gbq
More complicated queries can now be processed.

Author: Dmitry L <[email protected]>
Author: necnec <[email protected]>
Author: Dmitry <[email protected]>

Closes #14742 from necnec/bigquery-udf-resources and squashes the following commits:

3a238a5 [necnec] config->configuration
82f4409 [necnec] Add some documentation & formatting
b97a1be [Dmitry L] formatting
e2f801f [Dmitry] hotfix
2e02d76 [Dmitry L] Merge remote-tracking branch 'pandas-dev/master' into bigquery-udf-resources
ec590af [Dmitry L] Throw exception if more than 1 job type in config
8720b03 [Dmitry L] Delete trailing whitespaces
df5dec6 [Dmitry L] configuration->config & formatting
99521aa [Dmitry L] Formatting, documentation, new unit test
0ac26a2 [Dmitry L] added pull request number in whitens
86ed96d [Dmitry L] Merge branch 'master' into bigquery-udf-resources
929ad1a [Dmitry L] formatting: delete whitespace
8a38650 [Dmitry L] Added example configuration & job_configuration refactoring
395c0e9 [Dmitry L] fix formatting
c21588a [Dmitry L] Merge remote-tracking branch 'pandas-dev/master' into bigquery-udf-resources
8fe77b2 [necnec] Merge branch 'bigquery-udf-resources'
146f0f3 [necnec] Merge branch 'master' into bigquery-udf-resources
ce8ebe4 [necnec] Merge branch 'bigquery-udf-resources'
028c8be [necnec] Solve formating problems
c199935 [Dmitry L] Make query configuration more general
0b365da [Dmitry L] delete newlines
a952710 [Dmitry L] Move whatsnew BQ Enhancements -> Enhancements
b849300 [Dmitry L] Change whatsnew 0.19.2 -> 0.20.0
640be7a [Dmitry L] Change whatnew 0.19.0->0.19.2
834a2ff [necnec] Merge branch 'bigquery-udf-resources'
d69ed7f [Dmitry L] check tests
94fa514 [necnec] test formatting
ddb4fd1 [necnec] Merge branch 'bigquery-udf-resources'
ad35a43 [necnec] fix whatsnew text
a96811d [necnec] add unit tests read_gbq: query parameters, cache
c66169d [necnec] add read_gbq tests: query parameters and cache
42dc9e6 [necnec] Merge remote-tracking branch 'origin/bigquery-udf-resources'
f9fae0c [necnec] Fix formatting
9a16a8c [necnec] Merge branch 'bigquery-udf-resources'
dad9288 [necnec] Change parameter to kwargs
55bf05c [Dmitry L] Added udf_resource_uri parameter to read_gbq
necnec authored and jreback committed Jan 3, 2017
1 parent 74e20a0 commit ff3c464
Showing 4 changed files with 144 additions and 12 deletions.
16 changes: 16 additions & 0 deletions doc/source/io.rst
@@ -4652,6 +4652,22 @@ destination DataFrame as well as a preferred column order as follows:
                            index_col='index_column_name',
                            col_order=['col1', 'col2', 'col3'], projectid)
Starting with 0.20.0, you can specify a query configuration as a parameter to use additional options for your job.
For more information about query configuration parameters see
`here <https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query>`__.

.. code-block:: python

   configuration = {
       'query': {
           "useQueryCache": False
       }
   }
   data_frame = pd.read_gbq('SELECT * FROM test_dataset.test_table',
                            project_id=projectid,
                            configuration=configuration)
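The configuration dictionary accepts any of the job options from the reference above. As a further sketch, a few of the options named in the ``gbq.py`` source comments can be combined; the project, dataset and table names below are placeholders, not values from this repository:

.. code-block:: python

   # Sketch only: 'my-project', 'my_dataset' and 'big_results' are
   # placeholder names you would replace with your own.
   configuration = {
       'query': {
           'useQueryCache': False,
           'allowLargeResults': True,
           'destinationTable': {
               'projectId': 'my-project',
               'datasetId': 'my_dataset',
               'tableId': 'big_results'
           }
       }
   }
   data_frame = pd.read_gbq('SELECT * FROM test_dataset.test_table',
                            project_id=projectid,
                            configuration=configuration)

In BigQuery's legacy SQL, ``allowLargeResults`` generally requires a ``destinationTable``, which is why the two options appear together here.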
.. note::

   You can find your project id in the `Google developers console <https://console.developers.google.com>`__.
2 changes: 2 additions & 0 deletions doc/source/whatsnew/v0.20.0.txt
@@ -100,6 +100,8 @@ Other enhancements
- ``pd.read_excel`` now preserves sheet order when using ``sheetname=None`` (:issue:`9930`)
- Multiple offset aliases with decimal points are now supported (e.g. '0.5min' is parsed as '30s') (:issue:`8419`)

- ``pd.read_gbq`` method now allows query configuration preferences (:issue:`14742`)

- New ``UnsortedIndexError`` (subclass of ``KeyError``) raised when indexing/slicing into an
unsorted MultiIndex (:issue:`11897`). This allows differentiation between errors due to lack
of sorting or an incorrect key. See :ref:`here <advanced.unsorted>`
53 changes: 41 additions & 12 deletions pandas/io/gbq.py
@@ -375,7 +375,7 @@ def process_insert_errors(self, insert_errors):

        raise StreamingInsertError

    def run_query(self, query):
    def run_query(self, query, **kwargs):
        try:
            from googleapiclient.errors import HttpError
        except:
@@ -385,16 +385,33 @@ def run_query(self, query):
        _check_google_client_version()

        job_collection = self.service.jobs()
        job_data = {
            'configuration': {
                'query': {
                    'query': query,
                    'useLegacySql': self.dialect == 'legacy'
                    # 'allowLargeResults', 'createDisposition',
                    # 'preserveNulls', destinationTable, useQueryCache
                }

        job_config = {
            'query': {
                'query': query,
                'useLegacySql': self.dialect == 'legacy'
                # 'allowLargeResults', 'createDisposition',
                # 'preserveNulls', destinationTable, useQueryCache
            }
        }
        config = kwargs.get('configuration')
        if config is not None:
            if len(config) != 1:
                raise ValueError("Only one job type must be specified, but "
                                 "given {}".format(','.join(config.keys())))
            if 'query' in config:
                if 'query' in config['query'] and query is not None:
                    raise ValueError("Query statement can't be specified "
                                     "inside config while it is specified "
                                     "as parameter")

                job_config['query'].update(config['query'])
            else:
                raise ValueError("Only 'query' job type is supported")

        job_data = {
            'configuration': job_config
        }

        self._start_timer()
        try:
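The merge above is plain dictionary manipulation, so it can be illustrated without touching BigQuery at all. A minimal sketch of what ``run_query`` does with a user-supplied ``'query'`` configuration (the values are illustrative only):

.. code-block:: python

   # Mimics the merge step in run_query: the user's 'query' options are
   # folded into the default job configuration with dict.update().
   job_config = {
       'query': {
           'query': 'SELECT 1',       # the statement passed to read_gbq
           'useLegacySql': True,      # self.dialect == 'legacy'
       }
   }
   user_config = {'query': {'useQueryCache': False}}

   job_config['query'].update(user_config['query'])
   # job_config['query'] is now:
   # {'query': 'SELECT 1', 'useLegacySql': True, 'useQueryCache': False}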
@@ -622,8 +639,9 @@ def _parse_entry(field_value, field_type):


def read_gbq(query, project_id=None, index_col=None, col_order=None,
             reauth=False, verbose=True, private_key=None, dialect='legacy'):
    """Load data from Google BigQuery.
             reauth=False, verbose=True, private_key=None, dialect='legacy',
             **kwargs):
    r"""Load data from Google BigQuery.

    THIS IS AN EXPERIMENTAL LIBRARY
@@ -682,6 +700,17 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None,
    .. versionadded:: 0.19.0

    **kwargs : Arbitrary keyword arguments
        configuration (dict): query config parameters for job processing.
        For example:

            configuration = {'query': {'useQueryCache': False}}

        For more information see `BigQuery SQL Reference
        <https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query>`

        .. versionadded:: 0.20.0

    Returns
    -------
    df: DataFrame
@@ -698,7 +727,7 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None,
    connector = GbqConnector(project_id, reauth=reauth, verbose=verbose,
                             private_key=private_key,
                             dialect=dialect)
    schema, pages = connector.run_query(query)
    schema, pages = connector.run_query(query, **kwargs)
    dataframe_list = []
    while len(pages) > 0:
        page = pages.pop()
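Taken together, the changes to ``run_query`` and ``read_gbq`` mean the query statement may live either in the positional argument or inside the configuration, but not both, and only a single job type may be configured. A sketch of the caller's view (assumes pandas 0.20.0 with this change, working credentials, and a placeholder project id):

.. code-block:: python

   import pandas as pd

   # The whole statement can be supplied inside the configuration;
   # the positional query argument is then passed as None.
   config = {
       'query': {
           'query': 'SELECT "PI" as VALID_STRING',
           'useQueryCache': False,
       }
   }
   df = pd.read_gbq(None, project_id='my-project',  # placeholder project id
                    configuration=config)

   # Supplying the statement both ways raises ValueError, as does passing
   # a configuration with more than one job type or a non-'query' job type.
   try:
       pd.read_gbq('SELECT "PI" as VALID_STRING', project_id='my-project',
                   configuration=config)
   except ValueError as exc:
       print(exc)  # "Query statement can't be specified inside config ..."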
85 changes: 85 additions & 0 deletions pandas/io/tests/test_gbq.py
@@ -711,6 +711,91 @@ def test_invalid_option_for_sql_dialect(self):
        gbq.read_gbq(sql_statement, project_id=_get_project_id(),
                     dialect='standard', private_key=_get_private_key_path())

    def test_query_with_parameters(self):
        sql_statement = "SELECT @param1 + @param2 as VALID_RESULT"
        config = {
            'query': {
                "useLegacySql": False,
                "parameterMode": "named",
                "queryParameters": [
                    {
                        "name": "param1",
                        "parameterType": {
                            "type": "INTEGER"
                        },
                        "parameterValue": {
                            "value": 1
                        }
                    },
                    {
                        "name": "param2",
                        "parameterType": {
                            "type": "INTEGER"
                        },
                        "parameterValue": {
                            "value": 2
                        }
                    }
                ]
            }
        }
        # Test that a query that relies on parameters fails
        # when parameters are not supplied via configuration
        with tm.assertRaises(ValueError):
            gbq.read_gbq(sql_statement, project_id=_get_project_id(),
                         private_key=_get_private_key_path())

        # Test that the query is successful because we have supplied
        # the correct query parameters via the 'config' option
        df = gbq.read_gbq(sql_statement, project_id=_get_project_id(),
                          private_key=_get_private_key_path(),
                          configuration=config)
        tm.assert_frame_equal(df, DataFrame({'VALID_RESULT': [3]}))

    def test_query_inside_configuration(self):
        query_no_use = 'SELECT "PI_WRONG" as VALID_STRING'
        query = 'SELECT "PI" as VALID_STRING'
        config = {
            'query': {
                "query": query,
                "useQueryCache": False,
            }
        }
        # Test that the query can't be passed both
        # inside the config and as a parameter
        with tm.assertRaises(ValueError):
            gbq.read_gbq(query_no_use, project_id=_get_project_id(),
                         private_key=_get_private_key_path(),
                         configuration=config)

        df = gbq.read_gbq(None, project_id=_get_project_id(),
                          private_key=_get_private_key_path(),
                          configuration=config)
        tm.assert_frame_equal(df, DataFrame({'VALID_STRING': ['PI']}))

    def test_configuration_without_query(self):
        sql_statement = 'SELECT 1'
        config = {
            'copy': {
                "sourceTable": {
                    "projectId": _get_project_id(),
                    "datasetId": "publicdata:samples",
                    "tableId": "wikipedia"
                },
                "destinationTable": {
                    "projectId": _get_project_id(),
                    "datasetId": "publicdata:samples",
                    "tableId": "wikipedia_copied"
                },
            }
        }
        # Test that only 'query' job configurations are supported,
        # not 'copy', 'load' or 'extract'
        with tm.assertRaises(ValueError):
            gbq.read_gbq(sql_statement, project_id=_get_project_id(),
                         private_key=_get_private_key_path(),
                         configuration=config)


class TestToGBQIntegration(tm.TestCase):
    # Changes to BigQuery table schema may take up to 2 minutes as of May 2015
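The ``test_query_with_parameters`` case above also doubles as a usage recipe: named query parameters reach ``read_gbq`` through the same ``configuration`` keyword. A sketch from the user's side (placeholder project id; assumes pandas 0.20.0 with this change and working credentials):

.. code-block:: python

   import pandas as pd

   config = {
       'query': {
           'useLegacySql': False,
           'parameterMode': 'named',
           'queryParameters': [
               {'name': 'param1',
                'parameterType': {'type': 'INTEGER'},
                'parameterValue': {'value': 1}},
               {'name': 'param2',
                'parameterType': {'type': 'INTEGER'},
                'parameterValue': {'value': 2}},
           ],
       }
   }
   df = pd.read_gbq('SELECT @param1 + @param2 AS VALID_RESULT',
                    project_id='my-project',  # placeholder project id
                    configuration=config)
   # df holds a single row where VALID_RESULT == 3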
