-
-
Notifications
You must be signed in to change notification settings - Fork 18.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added query_config parameter to read_gbq #14742
Changes from 29 commits
55bf05c
dad9288
9a16a8c
f9fae0c
42dc9e6
c66169d
a96811d
ad35a43
ddb4fd1
94fa514
d69ed7f
834a2ff
640be7a
b849300
a952710
0b365da
c199935
028c8be
ce8ebe4
146f0f3
8fe77b2
c21588a
395c0e9
8a38650
929ad1a
86ed96d
0ac26a2
99521aa
df5dec6
8720b03
ec590af
2e02d76
e2f801f
b97a1be
82f4409
3a238a5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4562,6 +4562,20 @@ destination DataFrame as well as a preferred column order as follows: | |
index_col='index_column_name', | ||
col_order=['col1', 'col2', 'col3'], projectid) | ||
|
||
|
||
You can specify the query config as a parameter | ||
|
||
.. code-block:: python | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. say why this is useful as well. If you have a doc-link to things that you might want to pass here, pls add it. |
||
|
||
config = { | ||
'query': { | ||
"useQueryCache": False | ||
} | ||
} | ||
data_frame = pd.read_gbq('SELECT * FROM test_dataset.test_table', | ||
config=config, projectid) | ||
|
||
|
||
.. note:: | ||
|
||
You can find your project id in the `Google developers console <https://console.developers.google.com>`__. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -51,6 +51,8 @@ Other enhancements | |
|
||
- ``pd.read_excel`` now preserves sheet order when using ``sheetname=None`` (:issue:`9930`) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you add a mini section to the docs and put a pointer here (or is the doc-string enough)? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. add this PR number as the issue number |
||
|
||
- ``pd.read_gbq`` method now allows query configuration preferences (:issue:`14742`) | ||
|
||
- New ``UnsortedIndexError`` (subclass of ``KeyError``) raised when indexing/slicing into an | ||
unsorted MultiIndex (:issue:`11897`). This allows differentiation between errors due to lack | ||
of sorting or an incorrect key. See :ref:`here <advanced.unsorted>` | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -375,7 +375,7 @@ def process_insert_errors(self, insert_errors): | |
|
||
raise StreamingInsertError | ||
|
||
def run_query(self, query): | ||
def run_query(self, query, **kwargs): | ||
try: | ||
from googleapiclient.errors import HttpError | ||
except: | ||
|
@@ -385,16 +385,30 @@ def run_query(self, query): | |
_check_google_client_version() | ||
|
||
job_collection = self.service.jobs() | ||
job_data = { | ||
'configuration': { | ||
'query': { | ||
'query': query, | ||
'useLegacySql': self.dialect == 'legacy' | ||
# 'allowLargeResults', 'createDisposition', | ||
# 'preserveNulls', destinationTable, useQueryCache | ||
} | ||
|
||
job_config = { | ||
'query': { | ||
'query': query, | ||
'useLegacySql': self.dialect == 'legacy' | ||
# 'allowLargeResults', 'createDisposition', | ||
# 'preserveNulls', destinationTable, useQueryCache | ||
} | ||
} | ||
config = kwargs.get('config') | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you add a comment on what you are doing here (and why) |
||
if config is not None: | ||
if 'query' in config: | ||
if 'query' in config['query'] and query is not None: | ||
raise ValueError("Query statement can't be specified " | ||
"inside config while it is specified " | ||
"as parameter") | ||
|
||
job_config['query'].update(config['query']) | ||
else: | ||
raise ValueError("Only 'query' job type is supported") | ||
|
||
job_data = { | ||
'configuration': job_config | ||
} | ||
|
||
self._start_timer() | ||
try: | ||
|
@@ -622,8 +636,9 @@ def _parse_entry(field_value, field_type): | |
|
||
|
||
def read_gbq(query, project_id=None, index_col=None, col_order=None, | ||
reauth=False, verbose=True, private_key=None, dialect='legacy'): | ||
"""Load data from Google BigQuery. | ||
reauth=False, verbose=True, private_key=None, dialect='legacy', | ||
**kwargs): | ||
r"""Load data from Google BigQuery. | ||
|
||
THIS IS AN EXPERIMENTAL LIBRARY | ||
|
||
|
@@ -682,6 +697,17 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None, | |
|
||
.. versionadded:: 0.19.0 | ||
|
||
**kwargs : Arbitrary keyword arguments | ||
config (dict): query config parameters for job processing. | ||
For example: | ||
|
||
config = {'query': {'useQueryCache': False}} | ||
|
||
For more information see `BigQuery SQL Reference | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes this is a good reference, add this above where I indicated |
||
<https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query>`__ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no indentation relative to "For more ..." is needed here (otherwise it will possibly give errors when building the docs) |
||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you put the mini example here |
||
.. versionadded:: 0.20.0 | ||
|
||
Returns | ||
------- | ||
df: DataFrame | ||
|
@@ -698,7 +724,7 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None, | |
connector = GbqConnector(project_id, reauth=reauth, verbose=verbose, | ||
private_key=private_key, | ||
dialect=dialect) | ||
schema, pages = connector.run_query(query) | ||
schema, pages = connector.run_query(query, **kwargs) | ||
dataframe_list = [] | ||
while len(pages) > 0: | ||
page = pages.pop() | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -711,6 +711,91 @@ def test_invalid_option_for_sql_dialect(self): | |
gbq.read_gbq(sql_statement, project_id=_get_project_id(), | ||
dialect='standard', private_key=_get_private_key_path()) | ||
|
||
def test_query_with_parameters(self): | ||
sql_statement = "SELECT @param1 + @param2 as VALID_RESULT" | ||
config = { | ||
'query': { | ||
"useLegacySql": False, | ||
"parameterMode": "named", | ||
"queryParameters": [ | ||
{ | ||
"name": "param1", | ||
"parameterType": { | ||
"type": "INTEGER" | ||
}, | ||
"parameterValue": { | ||
"value": 1 | ||
} | ||
}, | ||
{ | ||
"name": "param2", | ||
"parameterType": { | ||
"type": "INTEGER" | ||
}, | ||
"parameterValue": { | ||
"value": 2 | ||
} | ||
} | ||
] | ||
} | ||
} | ||
# Test that a query that relies on parameters fails | ||
# when parameters are not supplied via configuration | ||
with tm.assertRaises(ValueError): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why is this necessary? I thought There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @jreback Yes, configuration is optional. But this unit test is very special. It processes a query with parameters. And in this case you must pass parameter values in the configuration. I've made 2 unit tests. So if you think this test is very special, I can remove it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it's fine to test. It seems that this test means it's required somehow, though. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @jreback so I don't need to change anything here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we update the comment to better explain why we expect a failure here? For example, |
||
gbq.read_gbq(sql_statement, project_id=_get_project_id(), | ||
private_key=_get_private_key_path()) | ||
|
||
# Test that the query is successful because we have supplied | ||
# the correct query parameters via the 'config' option | ||
df = gbq.read_gbq(sql_statement, project_id=_get_project_id(), | ||
private_key=_get_private_key_path(), | ||
config=config) | ||
tm.assert_frame_equal(df, DataFrame({'VALID_RESULT': [3]})) | ||
|
||
def test_query_inside_configuration(self): | ||
query_no_use = 'SELECT "PI_WRONG" as VALID_STRING' | ||
query = 'SELECT "PI" as VALID_STRING' | ||
config = { | ||
'query': { | ||
"query": query, | ||
"useQueryCache": False, | ||
} | ||
} | ||
# Test that the query can't be passed both | ||
# inside config and as a parameter | ||
with tm.assertRaises(ValueError): | ||
gbq.read_gbq(query_no_use, project_id=_get_project_id(), | ||
private_key=_get_private_key_path(), | ||
config=config) | ||
|
||
df = gbq.read_gbq(None, project_id=_get_project_id(), | ||
private_key=_get_private_key_path(), | ||
config=config) | ||
tm.assert_frame_equal(df, DataFrame({'VALID_STRING': ['PI']})) | ||
|
||
def test_configuration_without_query(self): | ||
sql_statement = 'SELECT 1' | ||
config = { | ||
'copy': { | ||
"sourceTable": { | ||
"projectId": _get_project_id(), | ||
"datasetId": "publicdata:samples", | ||
"tableId": "wikipedia" | ||
}, | ||
"destinationTable": { | ||
"projectId": _get_project_id(), | ||
"datasetId": "publicdata:samples", | ||
"tableId": "wikipedia_copied" | ||
}, | ||
} | ||
} | ||
# Test that only 'query' configurations are supported, | ||
# not 'copy', 'load' or 'extract' | ||
with tm.assertRaises(ValueError): | ||
gbq.read_gbq(sql_statement, project_id=_get_project_id(), | ||
private_key=_get_private_key_path(), | ||
config=config) | ||
|
||
|
||
class TestToGBQIntegration(tm.TestCase): | ||
# Changes to BigQuery table schema may take up to 2 minutes as of May 2015 | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add starting in 0.20.0 (or you can add a versionadded tag)