Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigquery): add create_bqstorage_client param to to_dataframe and to_arrow #9573

Merged
merged 6 commits into from
Dec 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions bigquery/google/cloud/bigquery/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,19 @@ def dataset(self, dataset_id, project=None):

return DatasetReference(project, dataset_id)

def _create_bqstorage_client(self):
    """Create a BigQuery Storage API client using this client's credentials.

    The import is deferred so that ``google-cloud-bigquery-storage`` remains
    an optional dependency; an ImportError surfaces only when this helper is
    actually used.

    Returns:
        google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient:
            A BigQuery Storage API client sharing this client's credentials.
    """
    from google.cloud import bigquery_storage_v1beta1

    return bigquery_storage_v1beta1.BigQueryStorageClient(
        credentials=self._credentials
    )

def create_dataset(self, dataset, exists_ok=False, retry=DEFAULT_RETRY):
"""API call: create the dataset via a POST request.

Expand Down
40 changes: 37 additions & 3 deletions bigquery/google/cloud/bigquery/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -3152,7 +3152,12 @@ def result(

# If changing the signature of this method, make sure to apply the same
# changes to table.RowIterator.to_arrow()
def to_arrow(self, progress_bar_type=None, bqstorage_client=None):
def to_arrow(
self,
progress_bar_type=None,
bqstorage_client=None,
create_bqstorage_client=False,
):
"""[Beta] Create a class:`pyarrow.Table` by loading all pages of a
table or query.

Expand Down Expand Up @@ -3185,6 +3190,16 @@ def to_arrow(self, progress_bar_type=None, bqstorage_client=None):

Reading from a specific partition or snapshot is not
currently supported by this method.
create_bqstorage_client (bool):
**Beta Feature** Optional. If ``True``, create a BigQuery
Storage API client using the default API settings. The
BigQuery Storage API is a faster way to fetch rows from
BigQuery. See the ``bqstorage_client`` parameter for more
information.

This argument does nothing if ``bqstorage_client`` is supplied.

..versionadded:: 1.24.0

Returns:
pyarrow.Table
Expand All @@ -3199,12 +3214,20 @@ def to_arrow(self, progress_bar_type=None, bqstorage_client=None):
..versionadded:: 1.17.0
"""
return self.result().to_arrow(
progress_bar_type=progress_bar_type, bqstorage_client=bqstorage_client
progress_bar_type=progress_bar_type,
bqstorage_client=bqstorage_client,
create_bqstorage_client=create_bqstorage_client,
)

# If changing the signature of this method, make sure to apply the same
# changes to table.RowIterator.to_dataframe()
def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=None):
def to_dataframe(
self,
bqstorage_client=None,
dtypes=None,
progress_bar_type=None,
create_bqstorage_client=False,
):
"""Return a pandas DataFrame from a QueryJob

Args:
Expand Down Expand Up @@ -3237,6 +3260,16 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=Non
for details.

..versionadded:: 1.11.0
create_bqstorage_client (bool):
**Beta Feature** Optional. If ``True``, create a BigQuery
Storage API client using the default API settings. The
BigQuery Storage API is a faster way to fetch rows from
BigQuery. See the ``bqstorage_client`` parameter for more
information.

This argument does nothing if ``bqstorage_client`` is supplied.

..versionadded:: 1.24.0

Returns:
A :class:`~pandas.DataFrame` populated with row data and column
Expand All @@ -3250,6 +3283,7 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=Non
bqstorage_client=bqstorage_client,
dtypes=dtypes,
progress_bar_type=progress_bar_type,
create_bqstorage_client=create_bqstorage_client,
)

def __iter__(self):
Expand Down
136 changes: 102 additions & 34 deletions bigquery/google/cloud/bigquery/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -1456,7 +1456,12 @@ def _to_arrow_iterable(self, bqstorage_client=None):

# If changing the signature of this method, make sure to apply the same
# changes to job.QueryJob.to_arrow()
def to_arrow(self, progress_bar_type=None, bqstorage_client=None):
def to_arrow(
self,
progress_bar_type=None,
bqstorage_client=None,
create_bqstorage_client=False,
):
"""[Beta] Create a class:`pyarrow.Table` by loading all pages of a
table or query.

Expand Down Expand Up @@ -1489,6 +1494,16 @@ def to_arrow(self, progress_bar_type=None, bqstorage_client=None):

Reading from a specific partition or snapshot is not
currently supported by this method.
create_bqstorage_client (bool):
**Beta Feature** Optional. If ``True``, create a BigQuery
Storage API client using the default API settings. The
BigQuery Storage API is a faster way to fetch rows from
BigQuery. See the ``bqstorage_client`` parameter for more
information.

This argument does nothing if ``bqstorage_client`` is supplied.

..versionadded:: 1.24.0

Returns:
pyarrow.Table
Expand All @@ -1504,22 +1519,33 @@ def to_arrow(self, progress_bar_type=None, bqstorage_client=None):
if pyarrow is None:
raise ValueError(_NO_PYARROW_ERROR)

progress_bar = self._get_progress_bar(progress_bar_type)
owns_bqstorage_client = False
if not bqstorage_client and create_bqstorage_client:
owns_bqstorage_client = True
bqstorage_client = self.client._create_bqstorage_client()

record_batches = []
for record_batch in self._to_arrow_iterable(bqstorage_client=bqstorage_client):
record_batches.append(record_batch)
try:
progress_bar = self._get_progress_bar(progress_bar_type)

if progress_bar is not None:
# In some cases, the number of total rows is not populated
# until the first page of rows is fetched. Update the
# progress bar's total to keep an accurate count.
progress_bar.total = progress_bar.total or self.total_rows
progress_bar.update(record_batch.num_rows)
record_batches = []
for record_batch in self._to_arrow_iterable(
bqstorage_client=bqstorage_client
):
record_batches.append(record_batch)

if progress_bar is not None:
# Indicate that the download has finished.
progress_bar.close()
if progress_bar is not None:
# In some cases, the number of total rows is not populated
# until the first page of rows is fetched. Update the
# progress bar's total to keep an accurate count.
progress_bar.total = progress_bar.total or self.total_rows
progress_bar.update(record_batch.num_rows)

if progress_bar is not None:
# Indicate that the download has finished.
progress_bar.close()
finally:
if owns_bqstorage_client:
bqstorage_client.transport.channel.close()

if record_batches:
return pyarrow.Table.from_batches(record_batches)
Expand Down Expand Up @@ -1558,14 +1584,20 @@ def _to_dataframe_iterable(self, bqstorage_client=None, dtypes=None):

# If changing the signature of this method, make sure to apply the same
# changes to job.QueryJob.to_dataframe()
def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=None):
def to_dataframe(
self,
bqstorage_client=None,
dtypes=None,
progress_bar_type=None,
create_bqstorage_client=False,
):
"""Create a pandas DataFrame by loading all pages of a query.

Args:
bqstorage_client (google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient):
**Beta Feature** Optional. A BigQuery Storage API client. If
supplied, use the faster BigQuery Storage API to fetch rows
from BigQuery. This API is a billable API.
from BigQuery.

This method requires the ``pyarrow`` and
``google-cloud-bigquery-storage`` libraries.
Expand Down Expand Up @@ -1602,6 +1634,16 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=Non
progress bar as a graphical dialog box.

..versionadded:: 1.11.0
create_bqstorage_client (bool):
**Beta Feature** Optional. If ``True``, create a BigQuery
Storage API client using the default API settings. The
BigQuery Storage API is a faster way to fetch rows from
BigQuery. See the ``bqstorage_client`` parameter for more
information.

This argument does nothing if ``bqstorage_client`` is supplied.

..versionadded:: 1.24.0

Returns:
pandas.DataFrame:
Expand All @@ -1621,32 +1663,44 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=Non
if dtypes is None:
dtypes = {}

if bqstorage_client and self.max_results is not None:
if (
bqstorage_client or create_bqstorage_client
) and self.max_results is not None:
warnings.warn(
"Cannot use bqstorage_client if max_results is set, "
"reverting to fetching data with the tabledata.list endpoint.",
stacklevel=2,
)
create_bqstorage_client = False
bqstorage_client = None

progress_bar = self._get_progress_bar(progress_bar_type)
owns_bqstorage_client = False
if not bqstorage_client and create_bqstorage_client:
owns_bqstorage_client = True
bqstorage_client = self.client._create_bqstorage_client()

frames = []
for frame in self._to_dataframe_iterable(
bqstorage_client=bqstorage_client, dtypes=dtypes
):
frames.append(frame)
try:
progress_bar = self._get_progress_bar(progress_bar_type)

if progress_bar is not None:
# In some cases, the number of total rows is not populated
# until the first page of rows is fetched. Update the
# progress bar's total to keep an accurate count.
progress_bar.total = progress_bar.total or self.total_rows
progress_bar.update(len(frame))
frames = []
for frame in self._to_dataframe_iterable(
bqstorage_client=bqstorage_client, dtypes=dtypes
):
frames.append(frame)

if progress_bar is not None:
# In some cases, the number of total rows is not populated
# until the first page of rows is fetched. Update the
# progress bar's total to keep an accurate count.
progress_bar.total = progress_bar.total or self.total_rows
progress_bar.update(len(frame))

if progress_bar is not None:
# Indicate that the download has finished.
progress_bar.close()
if progress_bar is not None:
# Indicate that the download has finished.
progress_bar.close()
finally:
if owns_bqstorage_client:
bqstorage_client.transport.channel.close()

# Avoid concatting an empty list.
if not frames:
Expand All @@ -1667,11 +1721,18 @@ class _EmptyRowIterator(object):
pages = ()
total_rows = 0

def to_arrow(self, progress_bar_type=None):
def to_arrow(
self,
progress_bar_type=None,
bqstorage_client=None,
create_bqstorage_client=False,
):
"""[Beta] Create an empty class:`pyarrow.Table`.

Args:
progress_bar_type (Optional[str]): Ignored. Added for compatibility with RowIterator.
bqstorage_client (Any): Ignored. Added for compatibility with RowIterator.
create_bqstorage_client (bool): Ignored. Added for compatibility with RowIterator.

Returns:
pyarrow.Table: An empty :class:`pyarrow.Table`.
Expand All @@ -1680,13 +1741,20 @@ def to_arrow(self, progress_bar_type=None):
raise ValueError(_NO_PYARROW_ERROR)
return pyarrow.Table.from_arrays(())

def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=None):
def to_dataframe(
self,
bqstorage_client=None,
dtypes=None,
progress_bar_type=None,
create_bqstorage_client=False,
):
"""Create an empty dataframe.

Args:
bqstorage_client (Any): Ignored. Added for compatibility with RowIterator.
dtypes (Any): Ignored. Added for compatibility with RowIterator.
progress_bar_type (Any): Ignored. Added for compatibility with RowIterator.
create_bqstorage_client (bool): Ignored. Added for compatibility with RowIterator.

Returns:
pandas.DataFrame: An empty :class:`~pandas.DataFrame`.
Expand Down
33 changes: 33 additions & 0 deletions bigquery/samples/download_public_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


def download_public_data(client):
    """Sample: load an entire public BigQuery table into a pandas DataFrame.

    Args:
        client: An initialized ``google.cloud.bigquery.Client``.
    """

    # [START bigquery_pandas_public_data]
    # TODO(developer): Import the client library.
    # from google.cloud import bigquery

    # TODO(developer): Construct a BigQuery client object.
    # client = bigquery.Client()

    # TODO(developer): Set table_id to the fully-qualified table ID in standard
    # SQL format, including the project ID and dataset ID.
    table_id = "bigquery-public-data.usa_names.usa_1910_current"

    # Use the BigQuery Storage API to speed-up downloads of large tables.
    rows = client.list_rows(table_id)
    dataframe = rows.to_dataframe(create_bqstorage_client=True)

    print(dataframe.info())
    # [END bigquery_pandas_public_data]
34 changes: 34 additions & 0 deletions bigquery/samples/download_public_data_sandbox.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


def download_public_data_sandbox(client):
    """Sample: download public query results into a pandas DataFrame.

    Args:
        client: An initialized ``google.cloud.bigquery.Client``.
    """

    # [START bigquery_pandas_public_data_sandbox]
    # TODO(developer): Import the client library.
    # from google.cloud import bigquery

    # TODO(developer): Construct a BigQuery client object.
    # client = bigquery.Client()

    # `SELECT *` is an anti-pattern in BigQuery because it is cheaper and
    # faster to use the BigQuery Storage API directly, but BigQuery Sandbox
    # users can only use the BigQuery Storage API to download query results.
    query_string = "SELECT * FROM `bigquery-public-data.usa_names.usa_1910_current`"

    # Use the BigQuery Storage API to speed-up downloads of large tables.
    query_job = client.query(query_string)
    dataframe = query_job.to_dataframe(create_bqstorage_client=True)

    print(dataframe.info())
    # [END bigquery_pandas_public_data_sandbox]
Loading