diff --git a/bigquery/google/cloud/bigquery/_pandas_helpers.py b/bigquery/google/cloud/bigquery/_pandas_helpers.py index 6a1b2dab910f..5261c2b99efd 100644 --- a/bigquery/google/cloud/bigquery/_pandas_helpers.py +++ b/bigquery/google/cloud/bigquery/_pandas_helpers.py @@ -14,8 +14,22 @@ """Shared helper functions for connecting BigQuery and pandas.""" +import collections +import concurrent.futures import warnings +from six.moves import queue + +try: + from google.cloud import bigquery_storage_v1beta1 +except ImportError: # pragma: NO COVER + bigquery_storage_v1beta1 = None + +try: + import pandas +except ImportError: # pragma: NO COVER + pandas = None + try: import pyarrow import pyarrow.parquet @@ -25,7 +39,23 @@ from google.cloud.bigquery import schema +_NO_BQSTORAGE_ERROR = ( + "The google-cloud-bigquery-storage library is not installed, " + "please install google-cloud-bigquery-storage to use bqstorage features." +) + STRUCT_TYPES = ("RECORD", "STRUCT") +_PROGRESS_INTERVAL = 0.2 # Maximum time between download status checks, in seconds. + + +class _DownloadState(object): + """Flag to indicate that a thread should exit early.""" + + def __init__(self): + # No need for a lock because reading/replacing a variable is defined to + # be an atomic operation in the Python language definition (enforced by + # the global interpreter lock). + self.done = False def pyarrow_datetime(): @@ -123,7 +153,7 @@ def bq_to_arrow_array(series, bq_field): return pyarrow.array(series, type=arrow_type) -def to_arrow(dataframe, bq_schema): +def dataframe_to_arrow(dataframe, bq_schema): """Convert pandas dataframe to Arrow table, using BigQuery schema. Args: @@ -158,7 +188,7 @@ def to_arrow(dataframe, bq_schema): return pyarrow.Table.from_arrays(arrow_arrays, names=arrow_names) -def to_parquet(dataframe, bq_schema, filepath): +def dataframe_to_parquet(dataframe, bq_schema, filepath): """Write dataframe as a Parquet file, according to the desired BQ schema. This function requires the :mod:`pyarrow` package. Arrow is used as an @@ -176,5 +206,165 @@ def to_parquet(dataframe, bq_schema, filepath): if pyarrow is None: raise ValueError("pyarrow is required for BigQuery schema conversion.") - arrow_table = to_arrow(dataframe, bq_schema) + arrow_table = dataframe_to_arrow(dataframe, bq_schema) pyarrow.parquet.write_table(arrow_table, filepath) + + +def _tabledata_list_page_to_dataframe(page, column_names, dtypes): + columns = collections.defaultdict(list) + for row in page: + for column in column_names: + columns[column].append(row[column]) + for column in dtypes: + columns[column] = pandas.Series(columns[column], dtype=dtypes[column]) + return pandas.DataFrame(columns, columns=column_names) + + +def download_dataframe_tabledata_list(pages, schema, dtypes): + """Use (slower, but free) tabledata.list to construct a DataFrame.""" + column_names = [field.name for field in schema] + for page in pages: + yield _tabledata_list_page_to_dataframe(page, column_names, dtypes) + + +def _download_dataframe_bqstorage_stream( + download_state, + bqstorage_client, + column_names, + dtypes, + session, + stream, + worker_queue, +): + position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream) + rowstream = bqstorage_client.read_rows(position).rows(session) + + for page in rowstream.pages: + if download_state.done: + return + # page.to_dataframe() does not preserve column order in some versions + # of google-cloud-bigquery-storage. Access by column name to rearrange. 
+        frame = page.to_dataframe(dtypes=dtypes)[column_names]
+        worker_queue.put(frame)
+
+
+def _nowait(futures):
+    """Separate finished and unfinished threads, much like
+    :func:`concurrent.futures.wait`, but don't wait.
+    """
+    done = []
+    not_done = []
+    for future in futures:
+        if future.done():
+            done.append(future)
+        else:
+            not_done.append(future)
+    return done, not_done
+
+
+def download_dataframe_bqstorage(
+    project_id,
+    table,
+    bqstorage_client,
+    column_names,
+    dtypes,
+    preserve_order=False,
+    selected_fields=None,
+):
+    """Use (faster, but billable) BQ Storage API to construct DataFrame."""
+    if "$" in table.table_id:
+        raise ValueError(
+            "Reading from a specific partition is not currently supported."
+        )
+    if "@" in table.table_id:
+        raise ValueError("Reading from a specific snapshot is not currently supported.")
+
+    read_options = bigquery_storage_v1beta1.types.TableReadOptions()
+    if selected_fields is not None:
+        for field in selected_fields:
+            read_options.selected_fields.append(field.name)
+
+    requested_streams = 0
+    if preserve_order:
+        requested_streams = 1
+
+    session = bqstorage_client.create_read_session(
+        table.to_bqstorage(),
+        "projects/{}".format(project_id),
+        read_options=read_options,
+        requested_streams=requested_streams,
+    )
+
+    # Avoid reading rows from an empty table. pandas.concat will fail on an
+    # empty list.
+    if not session.streams:
+        yield pandas.DataFrame(columns=column_names)
+        return
+
+    total_streams = len(session.streams)
+
+    # Use _DownloadState to notify worker threads when to quit.
+    # See: https://stackoverflow.com/a/29237343/101923
+    download_state = _DownloadState()
+
+    # Create a queue to collect frames as they are created in each thread.
+    worker_queue = queue.Queue()
+
+    with concurrent.futures.ThreadPoolExecutor(max_workers=total_streams) as pool:
+        try:
+            # Manually submit jobs and wait for download to complete rather
+            # than using pool.map because pool.map continues running in the
+            # background even if there is an exception on the main thread.
+            # See: https://github.com/googleapis/google-cloud-python/pull/7698
+            not_done = [
+                pool.submit(
+                    _download_dataframe_bqstorage_stream,
+                    download_state,
+                    bqstorage_client,
+                    column_names,
+                    dtypes,
+                    session,
+                    stream,
+                    worker_queue,
+                )
+                for stream in session.streams
+            ]
+
+            while not_done:
+                # Don't block on the worker threads. For performance reasons,
+                # we want to block on the queue's get method, instead. This
+                # prevents the queue from filling up, because the main thread
+                # has smaller gaps in time between calls to the queue's get
+                # method. For a detailed explanation, see:
+                # https://friendliness.dev/2019/06/18/python-nowait/
+                done, not_done = _nowait(not_done)
+                for future in done:
+                    # Call result() on any finished threads to raise any
+                    # exceptions encountered.
+                    future.result()
+
+                try:
+                    frame = worker_queue.get(timeout=_PROGRESS_INTERVAL)
+                    yield frame
+                except queue.Empty:  # pragma: NO COVER
+                    continue
+
+            # Return any remaining values after the workers finished.
+            while not worker_queue.empty():
+                try:
+                    # Include a timeout because even though the queue is
+                    # non-empty, it doesn't guarantee that a subsequent call to
+                    # get() will not block.
+                    frame = worker_queue.get(timeout=_PROGRESS_INTERVAL)
+                    yield frame
+                except queue.Empty:  # pragma: NO COVER
+                    continue
+        finally:
+            # No need for a lock because reading/replacing a variable is
+            # defined to be an atomic operation in the Python language
+            # definition (enforced by the global interpreter lock).
+ download_state.done = True + + # Shutdown all background threads, now that they should know to + # exit early. + pool.shutdown(wait=True) diff --git a/bigquery/google/cloud/bigquery/client.py b/bigquery/google/cloud/bigquery/client.py index 9fd4c5368efa..65d6915c7ea2 100644 --- a/bigquery/google/cloud/bigquery/client.py +++ b/bigquery/google/cloud/bigquery/client.py @@ -1310,7 +1310,9 @@ def load_table_from_dataframe( try: if pyarrow and job_config.schema: - _pandas_helpers.to_parquet(dataframe, job_config.schema, tmppath) + _pandas_helpers.dataframe_to_parquet( + dataframe, job_config.schema, tmppath + ) else: if job_config.schema: warnings.warn( diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py index 1f9bb5eee3d4..7af3bc6f48b4 100644 --- a/bigquery/google/cloud/bigquery/table.py +++ b/bigquery/google/cloud/bigquery/table.py @@ -16,18 +16,12 @@ from __future__ import absolute_import -import collections -import concurrent.futures import copy import datetime -import json import operator -import threading -import time import warnings import six -from six.moves import queue try: from google.cloud import bigquery_storage_v1beta1 @@ -49,6 +43,7 @@ import google.cloud._helpers from google.cloud.bigquery import _helpers +from google.cloud.bigquery import _pandas_helpers from google.cloud.bigquery.schema import SchemaField from google.cloud.bigquery.schema import _build_schema_resource from google.cloud.bigquery.schema import _parse_schema_resource @@ -68,13 +63,6 @@ "library. Please install tqdm to use the progress bar functionality." ) _TABLE_HAS_NO_SCHEMA = 'Table has no schema: call "client.get_table()"' -_MARKER = object() -_PROGRESS_INTERVAL = 0.2 # Time between download status updates, in seconds. - -# Send multiple updates from the worker threads, so there are at least a few -# waiting next time the prgrogess bar is updated. -_PROGRESS_UPDATES_PER_INTERVAL = 3 -_PROGRESS_WORKER_INTERVAL = _PROGRESS_INTERVAL / _PROGRESS_UPDATES_PER_INTERVAL def _reference_getter(table): @@ -1371,7 +1359,8 @@ def _get_next_page_response(self): @property def schema(self): - """List[google.cloud.bigquery.schema.SchemaField]: Table's schema.""" + """List[google.cloud.bigquery.schema.SchemaField]: The subset of + columns to be read from the table.""" return list(self._schema) @property @@ -1379,214 +1368,6 @@ def total_rows(self): """int: The total number of rows in the table.""" return self._total_rows - def _to_dataframe_dtypes(self, page, column_names, dtypes): - columns = collections.defaultdict(list) - for row in page: - for column in column_names: - columns[column].append(row[column]) - for column in dtypes: - columns[column] = pandas.Series(columns[column], dtype=dtypes[column]) - return pandas.DataFrame(columns, columns=column_names) - - def _to_dataframe_tabledata_list(self, dtypes, progress_bar=None): - """Use (slower, but free) tabledata.list to construct a DataFrame.""" - column_names = [field.name for field in self.schema] - frames = [] - - for page in iter(self.pages): - current_frame = self._to_dataframe_dtypes(page, column_names, dtypes) - frames.append(current_frame) - - if progress_bar is not None: - # In some cases, the number of total rows is not populated - # until the first page of rows is fetched. Update the - # progress bar's total to keep an accurate count. - progress_bar.total = progress_bar.total or self.total_rows - progress_bar.update(len(current_frame)) - - if progress_bar is not None: - # Indicate that the download has finished. 
- progress_bar.close() - - return pandas.concat(frames, ignore_index=True) - - def _to_dataframe_bqstorage_stream( - self, bqstorage_client, dtypes, columns, session, stream, worker_queue - ): - position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream) - rowstream = bqstorage_client.read_rows(position).rows(session) - - frames = [] - for page in rowstream.pages: - if self._to_dataframe_finished: - return - frames.append(page.to_dataframe(dtypes=dtypes)) - - try: - worker_queue.put_nowait(page.num_items) - except queue.Full: - # It's okay if we miss a few progress updates. Don't slow - # down parsing for that. - pass - - # Avoid errors on unlucky streams with no blocks. pandas.concat - # will fail on an empty list. - if not frames: - return pandas.DataFrame(columns=columns) - - # page.to_dataframe() does not preserve column order. Rearrange at - # the end using manually-parsed schema. - return pandas.concat(frames)[columns] - - def _process_worker_updates(self, worker_queue, progress_queue): - last_update_time = time.time() - current_update = 0 - - # Sum all updates in a contant loop. - while True: - try: - current_update += worker_queue.get(timeout=_PROGRESS_INTERVAL) - - # Time to send to the progress bar queue? - current_time = time.time() - elapsed_time = current_time - last_update_time - if elapsed_time > _PROGRESS_WORKER_INTERVAL: - progress_queue.put(current_update) - last_update_time = current_time - current_update = 0 - - except queue.Empty: - # Keep going, unless there probably aren't going to be any - # additional updates. - if self._to_dataframe_finished: - progress_queue.put(current_update) - return - - def _process_progress_updates(self, progress_queue, progress_bar): - if progress_bar is None: - return - - # Output all updates since the last interval. - while True: - try: - next_update = progress_queue.get_nowait() - progress_bar.update(next_update) - except queue.Empty: - break - - if self._to_dataframe_finished: - progress_bar.close() - return - - def _to_dataframe_bqstorage(self, bqstorage_client, dtypes, progress_bar=None): - """Use (faster, but billable) BQ Storage API to construct DataFrame.""" - if bigquery_storage_v1beta1 is None: - raise ValueError(_NO_BQSTORAGE_ERROR) - - if "$" in self._table.table_id: - raise ValueError( - "Reading from a specific partition is not currently supported." - ) - if "@" in self._table.table_id: - raise ValueError( - "Reading from a specific snapshot is not currently supported." - ) - - read_options = bigquery_storage_v1beta1.types.TableReadOptions() - if self._selected_fields is not None: - for field in self._selected_fields: - read_options.selected_fields.append(field.name) - - requested_streams = 0 - if self._preserve_order: - requested_streams = 1 - - session = bqstorage_client.create_read_session( - self._table.to_bqstorage(), - "projects/{}".format(self._project), - read_options=read_options, - requested_streams=requested_streams, - ) - - # We need to parse the schema manually so that we can rearrange the - # columns. - schema = json.loads(session.avro_schema.schema) - columns = [field["name"] for field in schema["fields"]] - - # Avoid reading rows from an empty table. pandas.concat will fail on an - # empty list. - if not session.streams: - return pandas.DataFrame(columns=columns) - - total_streams = len(session.streams) - - # Use _to_dataframe_finished to notify worker threads when to quit. 
- # See: https://stackoverflow.com/a/29237343/101923 - self._to_dataframe_finished = False - - # Create a queue to track progress updates across threads. - worker_queue = _NoopProgressBarQueue() - progress_queue = None - progress_thread = None - if progress_bar is not None: - worker_queue = queue.Queue() - progress_queue = queue.Queue() - progress_thread = threading.Thread( - target=self._process_worker_updates, args=(worker_queue, progress_queue) - ) - progress_thread.start() - - def get_frames(pool): - frames = [] - - # Manually submit jobs and wait for download to complete rather - # than using pool.map because pool.map continues running in the - # background even if there is an exception on the main thread. - # See: https://github.com/googleapis/google-cloud-python/pull/7698 - not_done = [ - pool.submit( - self._to_dataframe_bqstorage_stream, - bqstorage_client, - dtypes, - columns, - session, - stream, - worker_queue, - ) - for stream in session.streams - ] - - while not_done: - done, not_done = concurrent.futures.wait( - not_done, timeout=_PROGRESS_INTERVAL - ) - frames.extend([future.result() for future in done]) - - # The progress bar needs to update on the main thread to avoid - # contention over stdout / stderr. - self._process_progress_updates(progress_queue, progress_bar) - - return frames - - with concurrent.futures.ThreadPoolExecutor(max_workers=total_streams) as pool: - try: - frames = get_frames(pool) - finally: - # No need for a lock because reading/replacing a variable is - # defined to be an atomic operation in the Python language - # definition (enforced by the global interpreter lock). - self._to_dataframe_finished = True - - # Shutdown all background threads, now that they should know to - # exit early. - pool.shutdown(wait=True) - if progress_thread is not None: - progress_thread.join() - - # Update the progress bar one last time to close it. - self._process_progress_updates(progress_queue, progress_bar) - return pandas.concat(frames, ignore_index=True) - def _get_progress_bar(self, progress_bar_type): """Construct a tqdm progress bar object, if tqdm is installed.""" if tqdm is None: @@ -1613,6 +1394,45 @@ def _get_progress_bar(self, progress_bar_type): warnings.warn(_NO_TQDM_ERROR, UserWarning, stacklevel=3) return None + def _to_dataframe_iterable(self, bqstorage_client=None, dtypes=None): + """Create an iterable of pandas DataFrames, to process the table as a stream. + + See ``to_dataframe`` for argument descriptions. + """ + if bqstorage_client is not None: + column_names = [field.name for field in self._schema] + try: + # Iterate over the stream so that read errors are raised (and + # the method can then fallback to tabledata.list). + for frame in _pandas_helpers.download_dataframe_bqstorage( + self._project, + self._table, + bqstorage_client, + column_names, + dtypes, + preserve_order=self._preserve_order, + selected_fields=self._selected_fields, + ): + yield frame + return + except google.api_core.exceptions.Forbidden: + # Don't hide errors such as insufficient permissions to create + # a read session, or the API is not enabled. Both of those are + # clearly problems if the developer has explicitly asked for + # BigQuery Storage API support. + raise + except google.api_core.exceptions.GoogleAPICallError: + # There is a known issue with reading from small anonymous + # query results tables, so some errors are expected. Rather + # than throw those errors, try reading the DataFrame again, but + # with the tabledata.list API. 
+ pass + + for frame in _pandas_helpers.download_dataframe_tabledata_list( + iter(self.pages), self.schema, dtypes + ): + yield frame + def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=None): """Create a pandas DataFrame by loading all pages of a query. @@ -1682,25 +1502,28 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=Non progress_bar = self._get_progress_bar(progress_bar_type) - if bqstorage_client is not None: - try: - return self._to_dataframe_bqstorage( - bqstorage_client, dtypes, progress_bar=progress_bar - ) - except google.api_core.exceptions.Forbidden: - # Don't hide errors such as insufficient permissions to create - # a read session, or the API is not enabled. Both of those are - # clearly problems if the developer has explicitly asked for - # BigQuery Storage API support. - raise - except google.api_core.exceptions.GoogleAPICallError: - # There is a known issue with reading from small anonymous - # query results tables, so some errors are expected. Rather - # than throw those errors, try reading the DataFrame again, but - # with the tabledata.list API. - pass + frames = [] + for frame in self._to_dataframe_iterable( + bqstorage_client=bqstorage_client, dtypes=dtypes + ): + frames.append(frame) + + if progress_bar is not None: + # In some cases, the number of total rows is not populated + # until the first page of rows is fetched. Update the + # progress bar's total to keep an accurate count. + progress_bar.total = progress_bar.total or self.total_rows + progress_bar.update(len(frame)) - return self._to_dataframe_tabledata_list(dtypes, progress_bar=progress_bar) + if progress_bar is not None: + # Indicate that the download has finished. + progress_bar.close() + + # Avoid concatting an empty list. 
+ if not frames: + column_names = [field.name for field in self._schema] + return pandas.DataFrame(columns=column_names) + return pandas.concat(frames, ignore_index=True) class _EmptyRowIterator(object): diff --git a/bigquery/tests/unit/test__pandas_helpers.py b/bigquery/tests/unit/test__pandas_helpers.py index 40b4548dae28..1c95aef0cec9 100644 --- a/bigquery/tests/unit/test__pandas_helpers.py +++ b/bigquery/tests/unit/test__pandas_helpers.py @@ -444,7 +444,7 @@ def test_bq_to_arrow_array_w_special_floats(module_under_test): @pytest.mark.skipIf(pandas is None, "Requires `pandas`") @pytest.mark.skipIf(pyarrow is None, "Requires `pyarrow`") -def test_to_arrow_w_required_fields(module_under_test): +def test_dataframe_to_arrow_w_required_fields(module_under_test): bq_schema = ( schema.SchemaField("field01", "STRING", mode="REQUIRED"), schema.SchemaField("field02", "BYTES", mode="REQUIRED"), @@ -489,7 +489,7 @@ def test_to_arrow_w_required_fields(module_under_test): } ) - arrow_table = module_under_test.to_arrow(dataframe, bq_schema) + arrow_table = module_under_test.dataframe_to_arrow(dataframe, bq_schema) arrow_schema = arrow_table.schema assert len(arrow_schema) == len(bq_schema) @@ -499,7 +499,7 @@ def test_to_arrow_w_required_fields(module_under_test): @pytest.mark.skipIf(pandas is None, "Requires `pandas`") @pytest.mark.skipIf(pyarrow is None, "Requires `pyarrow`") -def test_to_arrow_w_unknown_type(module_under_test): +def test_dataframe_to_arrow_w_unknown_type(module_under_test): bq_schema = ( schema.SchemaField("field00", "UNKNOWN_TYPE"), schema.SchemaField("field01", "STRING"), @@ -516,7 +516,7 @@ def test_to_arrow_w_unknown_type(module_under_test): ) with warnings.catch_warnings(record=True) as warned: - arrow_table = module_under_test.to_arrow(dataframe, bq_schema) + arrow_table = module_under_test.dataframe_to_arrow(dataframe, bq_schema) arrow_schema = arrow_table.schema assert len(warned) == 1 @@ -531,18 +531,18 @@ def test_to_arrow_w_unknown_type(module_under_test): @pytest.mark.skipIf(pandas is None, "Requires `pandas`") -def test_to_parquet_without_pyarrow(module_under_test, monkeypatch): +def test_dataframe_to_parquet_without_pyarrow(module_under_test, monkeypatch): monkeypatch.setattr(module_under_test, "pyarrow", None) with pytest.raises(ValueError) as exc: - module_under_test.to_parquet(pandas.DataFrame(), (), None) + module_under_test.dataframe_to_parquet(pandas.DataFrame(), (), None) assert "pyarrow is required" in str(exc) @pytest.mark.skipIf(pandas is None, "Requires `pandas`") @pytest.mark.skipIf(pyarrow is None, "Requires `pyarrow`") -def test_to_parquet_w_missing_columns(module_under_test, monkeypatch): +def test_dataframe_to_parquet_w_missing_columns(module_under_test, monkeypatch): with pytest.raises(ValueError) as exc: - module_under_test.to_parquet( + module_under_test.dataframe_to_parquet( pandas.DataFrame(), (schema.SchemaField("not_found", "STRING"),), None ) assert "columns in schema must match" in str(exc) diff --git a/bigquery/tests/unit/test_table.py b/bigquery/tests/unit/test_table.py index 378323ce2932..a0ded16173d4 100644 --- a/bigquery/tests/unit/test_table.py +++ b/bigquery/tests/unit/test_table.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import concurrent.futures import itertools import json import time @@ -22,7 +21,6 @@ import mock import pytest import six -from six.moves import queue import google.api_core.exceptions @@ -1859,9 +1857,6 @@ def test_to_dataframe_w_bqstorage_nonempty(self): from google.cloud.bigquery import table as mut from google.cloud.bigquery_storage_v1beta1 import reader - # Speed up testing. - mut._PROGRESS_INTERVAL = 0.01 - bqstorage_client = mock.create_autospec( bigquery_storage_v1beta1.BigQueryStorageClient ) @@ -1893,21 +1888,13 @@ def test_to_dataframe_w_bqstorage_nonempty(self): {"colA": -1, "colB": "def", "colC": 4.0}, ] - def blocking_to_dataframe(*args, **kwargs): - # Sleep for longer than the waiting interval so that we know we're - # only reading one page per loop at most. - time.sleep(2 * mut._PROGRESS_INTERVAL) - return pandas.DataFrame(page_items, columns=["colA", "colB", "colC"]) - mock_page = mock.create_autospec(reader.ReadRowsPage) - mock_page.to_dataframe.side_effect = blocking_to_dataframe + mock_page.to_dataframe.return_value = pandas.DataFrame( + page_items, columns=["colA", "colB", "colC"] + ) mock_pages = (mock_page, mock_page, mock_page) type(mock_rows).pages = mock.PropertyMock(return_value=mock_pages) - # Test that full queue errors are ignored. - mock_queue = mock.create_autospec(mut._NoopProgressBarQueue) - mock_queue().put_nowait.side_effect = queue.Full - schema = [ schema.SchemaField("colA", "IGNORED"), schema.SchemaField("colC", "IGNORED"), @@ -1923,10 +1910,7 @@ def blocking_to_dataframe(*args, **kwargs): selected_fields=schema, ) - with mock.patch.object(mut, "_NoopProgressBarQueue", mock_queue), mock.patch( - "concurrent.futures.wait", wraps=concurrent.futures.wait - ) as mock_wait: - got = row_iterator.to_dataframe(bqstorage_client=bqstorage_client) + got = row_iterator.to_dataframe(bqstorage_client=bqstorage_client) # Are the columns in the expected order? column_names = ["colA", "colC", "colB"] @@ -1937,12 +1921,6 @@ def blocking_to_dataframe(*args, **kwargs): total_rows = len(page_items) * total_pages self.assertEqual(len(got.index), total_rows) - # Make sure that this test looped through multiple progress intervals. - self.assertGreaterEqual(mock_wait.call_count, 2) - - # Make sure that this test pushed to the progress queue. - self.assertEqual(mock_queue().put_nowait.call_count, total_pages) - @unittest.skipIf(pandas is None, "Requires `pandas`") @unittest.skipIf( bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
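The download path added in _pandas_helpers.py is a producer/consumer pipeline: one worker thread per read stream pushes finished DataFrames onto a shared queue.Queue, the download_dataframe_bqstorage generator polls its futures with _nowait (so worker exceptions still surface) while blocking only briefly on the queue's get, and the _DownloadState.done flag set in the finally block tells workers to stop early if the consumer stops iterating. Below is a minimal, standalone sketch of that same shape using only the standard library; fake_stream, download, the per-stream page counts, and the sleep are hypothetical stand-ins for the BigQuery Storage streams and are not part of this change.

import concurrent.futures
import queue
import time

_PROGRESS_INTERVAL = 0.2  # Same polling interval the patch uses.


class _DownloadState(object):
    """Flag to tell worker threads to exit early.

    No lock is needed: replacing a bool is atomic under the GIL, the same
    reasoning as the comment in _pandas_helpers.py.
    """

    def __init__(self):
        self.done = False


def _nowait(futures):
    """Separate finished and unfinished futures without blocking."""
    done, not_done = [], []
    for future in futures:
        (done if future.done() else not_done).append(future)
    return done, not_done


def fake_stream(download_state, stream_id, worker_queue):
    """Hypothetical producer standing in for one BigQuery Storage stream."""
    for page in range(3):  # pretend each stream has three pages
        if download_state.done:
            return
        time.sleep(0.05)  # pretend to download and parse a page
        worker_queue.put("frame-{}-{}".format(stream_id, page))


def download(stream_ids):
    """Generator mirroring the shape of download_dataframe_bqstorage."""
    download_state = _DownloadState()
    worker_queue = queue.Queue()
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(stream_ids)) as pool:
        try:
            not_done = [
                pool.submit(fake_stream, download_state, stream_id, worker_queue)
                for stream_id in stream_ids
            ]
            while not_done:
                # Poll the futures without blocking so exceptions surface,
                # then block (briefly) on the queue, which is where the
                # useful results arrive.
                done, not_done = _nowait(not_done)
                for future in done:
                    future.result()
                try:
                    yield worker_queue.get(timeout=_PROGRESS_INTERVAL)
                except queue.Empty:
                    continue
            # All workers finished; drain whatever is still queued.
            while not worker_queue.empty():
                yield worker_queue.get_nowait()
        finally:
            # Runs even if the consumer abandons the generator early, so
            # workers stop reading and the pool can shut down promptly.
            download_state.done = True
            pool.shutdown(wait=True)


if __name__ == "__main__":
    for frame in download(stream_ids=[1, 2]):
        print(frame)

Blocking on the queue rather than on concurrent.futures.wait keeps the queue drained as fast as frames arrive, which is the rationale spelled out in the while-loop comment in download_dataframe_bqstorage above.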