Initial read_gbq support #4

Merged Sep 23, 2021 (46 commits)
Changes shown below are from 37 commits.

Commits
94c41f6  add precommit config (ncclementi, Aug 5, 2021)
48becdb  add read_gbq (ncclementi, Aug 5, 2021)
a934259  add setup and req (ncclementi, Aug 5, 2021)
04bdd80  modifications suggested by bnaul (ncclementi, Aug 6, 2021)
ab16a32  raise error when table type is VIEW (ncclementi, Aug 6, 2021)
455f749  add linting github actions (ncclementi, Aug 6, 2021)
c417d5f  add comment on context manager related to possible upstram solution (ncclementi, Aug 6, 2021)
4839bbb  avoid scanning table when creating partitions (ncclementi, Aug 11, 2021)
774e79b  add first read_gbq test (ncclementi, Aug 17, 2021)
7bdd66a  add partitioning test (ncclementi, Aug 17, 2021)
31a1253  use pytest fixtures (ncclementi, Aug 18, 2021)
db4edb4  use context manager on test (ncclementi, Aug 18, 2021)
be1efbd  ignore bare except for now (ncclementi, Aug 18, 2021)
35cbdc6  remove prefix from delayed kwargs (ncclementi, Aug 18, 2021)
40de1ea  make dataset name random, remove annotate (ncclementi, Aug 18, 2021)
45e0004  better name for delayed _read_rows_arrow (ncclementi, Aug 18, 2021)
de93e88  implementation of HLG - wip (ncclementi, Aug 19, 2021)
3070ae3  Slight refactor (jrbourbeau, Aug 20, 2021)
b43daf6  Minor test tweaks (jrbourbeau, Aug 20, 2021)
50f3c6a  Update requirements.txt (ncclementi, Sep 16, 2021)
f8a578c  use context manager for bq client (ncclementi, Sep 17, 2021)
a91c73c  remove with_storage_api since it is always true (ncclementi, Sep 17, 2021)
548f2fb  remove partition fields option (ncclementi, Sep 17, 2021)
d3ffa79  add test github actions setup (ncclementi, Sep 17, 2021)
44096a1  add ci environments (ncclementi, Sep 17, 2021)
b19dca4  trigger ci (ncclementi, Sep 17, 2021)
982a5f5  trigger ci again (ncclementi, Sep 17, 2021)
4292ac3  add pytest to envs (ncclementi, Sep 17, 2021)
14ba56c  Only run CI on push events (jrbourbeau, Sep 20, 2021)
32b6686  Minor cleanup (jrbourbeau, Sep 20, 2021)
97b5d21  Use mamba (jrbourbeau, Sep 20, 2021)
e03e731  update docstrings (ncclementi, Sep 21, 2021)
d73b686  missing docstring (ncclementi, Sep 21, 2021)
3f8e397  trigger ci - testing workflow (ncclementi, Sep 21, 2021)
64fe0ec  use env variable for project id (ncclementi, Sep 21, 2021)
6f94825  add test for read with row_filter (ncclementi, Sep 21, 2021)
1a51981  add test for read with kwargs (ncclementi, Sep 21, 2021)
acb404e  Update dask_bigquery/tests/test_core.py (ncclementi, Sep 21, 2021)
d78c2a9  Update dask_bigquery/tests/test_core.py (ncclementi, Sep 21, 2021)
2b46c4f  Update dask_bigquery/tests/test_core.py (ncclementi, Sep 21, 2021)
5ac1358  Update dask_bigquery/tests/test_core.py (ncclementi, Sep 21, 2021)
216a4e7  Update dask_bigquery/tests/test_core.py (ncclementi, Sep 21, 2021)
46e4923  Update dask_bigquery/tests/test_core.py (ncclementi, Sep 21, 2021)
3204bc2  tweak on docstrings (ncclementi, Sep 22, 2021)
f17cfb8  add readme content (ncclementi, Sep 22, 2021)
d1398c2  Minor updates (jrbourbeau, Sep 23, 2021)
16 changes: 16 additions & 0 deletions .github/workflows/pre-commit.yml
@@ -0,0 +1,16 @@
name: Linting

on:
push:
branches: main
pull_request:
branches: main

jobs:
checks:
name: "pre-commit hooks"
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/setup-python@v2
- uses: pre-commit/[email protected]
54 changes: 54 additions & 0 deletions .github/workflows/tests.yml
@@ -0,0 +1,54 @@
name: Tests

on: push

# When this workflow is queued, automatically cancel any previous running
# or pending jobs from the same branch
concurrency:
group: ${{ github.ref }}
cancel-in-progress: true

jobs:
test:
runs-on: ${{ matrix.os }}
defaults:
run:
shell: bash -l {0}
strategy:
fail-fast: false
matrix:
os: ["windows-latest", "ubuntu-latest", "macos-latest"]
python-version: ["3.7", "3.8", "3.9"]

steps:
- name: Checkout source
uses: actions/checkout@v2
with:
fetch-depth: 0 # Needed by codecov.io

- name: Setup Conda Environment
uses: conda-incubator/setup-miniconda@v2
with:
miniforge-variant: Mambaforge
miniforge-version: latest
use-mamba: true
channel-priority: strict
python-version: ${{ matrix.python-version }}
environment-file: ci/environment-${{ matrix.python-version }}.yaml
activate-environment: test-environment
auto-activate-base: false

- name: Install dask-bigquery
run: python -m pip install --no-deps -e .

- name: Set up Cloud SDK
uses: google-github-actions/setup-gcloud@master
with:
project_id: ${{ secrets.GCP_PROJECT_ID }}
service_account_key: ${{ secrets.GCP_SA_KEY }}
export_default_credentials: true

- name: Run tests
env:
DASK_BIGQUERY_PROJECT_ID: "${{ secrets.GCP_PROJECT_ID }}"
run: pytest -v dask_bigquery
17 changes: 17 additions & 0 deletions .pre-commit-config.yaml
@@ -0,0 +1,17 @@
repos:
- repo: https://github.com/psf/black
rev: 20.8b1
hooks:
- id: black
language_version: python3
exclude: versioneer.py
- repo: https://gitlab.com/pycqa/flake8
rev: 3.8.3
hooks:
- id: flake8
language_version: python3
- repo: https://github.com/pycqa/isort
rev: 5.8.0
hooks:
- id: isort
language_version: python3
14 changes: 14 additions & 0 deletions ci/environment-3.7.yaml
@@ -0,0 +1,14 @@
name: test-environment
channels:
- conda-forge
dependencies:
- python=3.7
- dask
- distributed
- pandas
- pyarrow
- pytest
- grpcio
- pandas-gbq
- google-cloud-bigquery
- google-cloud-bigquery-storage
14 changes: 14 additions & 0 deletions ci/environment-3.8.yaml
@@ -0,0 +1,14 @@
name: test-environment
channels:
- conda-forge
dependencies:
- python=3.8
- dask
- distributed
- pandas
- pyarrow
- pytest
- grpcio
- pandas-gbq
- google-cloud-bigquery
- google-cloud-bigquery-storage
14 changes: 14 additions & 0 deletions ci/environment-3.9.yaml
@@ -0,0 +1,14 @@
name: test-environment
channels:
- conda-forge
dependencies:
- python=3.9
- dask
- distributed
- pandas
- pyarrow
- pytest
- grpcio
- pandas-gbq
- google-cloud-bigquery
- google-cloud-bigquery-storage
1 change: 1 addition & 0 deletions dask_bigquery/__init__.py
@@ -0,0 +1 @@
from .core import read_gbq
155 changes: 155 additions & 0 deletions dask_bigquery/core.py
@@ -0,0 +1,155 @@
from __future__ import annotations

from contextlib import contextmanager
from functools import partial

import pandas as pd
import pyarrow
from dask.base import tokenize
from dask.dataframe.core import new_dd_object
from dask.highlevelgraph import HighLevelGraph
from dask.layers import DataFrameIOLayer
from google.cloud import bigquery, bigquery_storage


@contextmanager
def bigquery_client(project_id=None):
[Review comment, Contributor] now that this always returns both BQ and BQS, should it be called bigquery_clients? 🙂

"""This context manager is a temporary solution until there is an
upstream solution to handle this.
See googleapis/google-cloud-python#9457
and googleapis/gapic-generator-python#575 for reference.
"""
with bigquery.Client(project_id) as bq_client:
[Review comment] Doesn't have to be this PR, but it would be really helpful if we could attribute these requests to Dask/Dask-BigQuery. #6

bq_storage_client = bigquery_storage.BigQueryReadClient(
credentials=bq_client._credentials
)
yield bq_client, bq_storage_client
bq_storage_client.transport.grpc_channel.close()


def _stream_to_dfs(bqs_client, stream_name, schema, read_kwargs):
"""Given a Storage API client and a stream name, yield all dataframes."""
return [
pyarrow.ipc.read_record_batch(
pyarrow.py_buffer(message.arrow_record_batch.serialized_record_batch),
schema,
).to_pandas()
for message in bqs_client.read_rows(name=stream_name, offset=0, **read_kwargs)
]


def bigquery_read(
make_create_read_session_request: callable,
project_id: str,
read_kwargs: dict,
stream_name: str,
) -> pd.DataFrame:
"""Read a single batch of rows via BQ Storage API, in Arrow binary format.

Parameters
----------
make_create_read_session_request: callable
kwargs to pass to `bqs_client.create_read_session` as `request`
project_id: str
Name of the BigQuery project.
read_kwargs: dict
kwargs to pass to read_rows()
stream_name: str
BigQuery Storage API Stream "name"
NOTE: Please set if reading from Storage API without any `row_restriction`.
https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1beta1#stream
"""
with bigquery_client(project_id) as (bq_client, bqs_client):
session = bqs_client.create_read_session(make_create_read_session_request())
schema = pyarrow.ipc.read_schema(
pyarrow.py_buffer(session.arrow_schema.serialized_schema)
)
shards = _stream_to_dfs(bqs_client, stream_name, schema, read_kwargs)
# NOTE: BQ Storage API can return empty streams
if len(shards) == 0:
shards = [schema.empty_table().to_pandas()]

return pd.concat(shards)


def read_gbq(
project_id: str,
dataset_id: str,
table_id: str,
row_filter="",
read_kwargs: dict = None,
):
"""Read table as dask dataframe using BigQuery Storage API via Arrow format.
Partitions will be approximately balanced according to BigQuery stream allocation logic.

Parameters
----------
project_id: str
Name of the BigQuery project.
dataset_id: str
BigQuery dataset within project
table_id: str
BigQuery table within dataset
row_filter: str
SQL text filtering statement to pass to `row_restriction`
read_kwargs: dict
kwargs to pass to read_rows()

Returns
-------
Dask DataFrame
"""
read_kwargs = read_kwargs or {}
with bigquery_client(project_id) as (bq_client, bqs_client):
table_ref = bq_client.get_table(f"{dataset_id}.{table_id}")
if table_ref.table_type == "VIEW":
raise TypeError("Table type VIEW not supported")

def make_create_read_session_request(row_filter=""):
return bigquery_storage.types.CreateReadSessionRequest(
max_stream_count=100, # 0 -> use as many streams as BQ Storage will provide
parent=f"projects/{project_id}",
[Review comment, Member] Should projects be hard coded here? Nvm, looking at the docstring for CreateReadSessionRequest, it appears the answer is "yes, projects should be hardcoded".

read_session=bigquery_storage.types.ReadSession(
data_format=bigquery_storage.types.DataFormat.ARROW,
read_options=bigquery_storage.types.ReadSession.TableReadOptions(
row_restriction=row_filter,
[Review comment, Contributor Author] @jrbourbeau I'm not quite sure if this works if we don't use the default; should we remove this, since we removed the partition field options? We seem to always leave it as "".
Here is some documentation to review: https://googleapis.dev/python/bigquerystorage/latest/bigquery_storage_v1beta2/types.html#google.cloud.bigquery_storage_v1beta2.types.ReadSession.TableReadOptions

[Review comment, Contributor] row_filter is quite useful as it performs the filtering server-side and can avoid a lot of extraneous IO. I don't see any reason to remove it, but it should probably be made into a more generic TableReadOptions object, like the one linked above, so that it can also be used for column selection (a usage sketch appears after this file's diff).
One other small note: the linked doc is for the beta API; v1 has been released since our original implementation: https://googleapis.dev/python/bigquerystorage/latest/bigquery_storage_v1/types.html#google.cloud.bigquery_storage_v1.types.ReadSession.TableReadOptions. There are a couple of other references to the beta docs throughout as well.

),
table=table_ref.to_bqstorage(),
),
)

# Create a read session in order to detect the schema.
# Read sessions are lightweight and will be auto-deleted after 24 hours.
session = bqs_client.create_read_session(
make_create_read_session_request(row_filter=row_filter)
)
[Review comment on lines +123 to +125, Member] Does session have a close method, or some other cleanup method, we should call?
schema = pyarrow.ipc.read_schema(
pyarrow.py_buffer(session.arrow_schema.serialized_schema)
)
meta = schema.empty_table().to_pandas()

label = "read-gbq-"
output_name = label + tokenize(
project_id,
dataset_id,
table_id,
row_filter,
read_kwargs,
)

layer = DataFrameIOLayer(
output_name,
meta.columns,
[stream.name for stream in session.streams],
partial(
bigquery_read,
make_create_read_session_request,
project_id,
read_kwargs,
),
label=label,
)
divisions = tuple([None] * (len(session.streams) + 1))

graph = HighLevelGraph({output_name: layer}, {output_name: set()})
return new_dd_object(graph, output_name, meta, divisions)
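
For reference, a minimal usage sketch of the read_gbq function added above (not part of this PR's diff). The project, dataset, and table identifiers are placeholders, and the row_filter value is only an example; as discussed in the review thread, the filter string is applied server-side through ReadSession.TableReadOptions.row_restriction.

from dask_bigquery import read_gbq

# Placeholder identifiers; substitute a real project, dataset, and table.
ddf = read_gbq(
    project_id="my-project",
    dataset_id="my_dataset",
    table_id="my_table",
    row_filter="idx < 5",           # forwarded to TableReadOptions.row_restriction (server-side filter)
    read_kwargs={"timeout": 3600},  # extra kwargs passed through to BigQueryReadClient.read_rows()
)

df = ddf.compute()  # materialize the lazy Dask DataFrame as a pandas DataFrame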
89 changes: 89 additions & 0 deletions dask_bigquery/tests/test_core.py
@@ -0,0 +1,89 @@
import os
import random
import uuid

import pandas as pd
import pytest
from dask.dataframe.utils import assert_eq
from distributed.utils_test import cluster_fixture # noqa: F401
from distributed.utils_test import client, loop # noqa: F401
from google.cloud import bigquery

from dask_bigquery import read_gbq


@pytest.fixture
def df():
records = [
{
"name": random.choice(["fred", "wilma", "barney", "betty"]),
"number": random.randint(0, 100),
"idx": i,
}
for i in range(10)
]

yield pd.DataFrame(records)


@pytest.fixture
def dataset(df):
"Push some data to BigQuery using pandas gbq"
project_id = os.environ.get("DASK_BIGQUERY_PROJECT_ID", "dask-bigquery")
dataset_id = uuid.uuid4().hex
table_id = "table_test"
# push data to gbq
pd.DataFrame.to_gbq(
df,
destination_table=f"{dataset_id}.{table_id}",
project_id=project_id,
chunksize=5,
if_exists="append",
)
yield (project_id, dataset_id, table_id)

with bigquery.Client() as bq_client:
bq_client.delete_dataset(
dataset=f"{project_id}.{dataset_id}",
delete_contents=True,
)


# test simple read
def test_read_gbq(df, dataset, client):
"""Test simple read of data pushed to BigQuery using pandas-gbq"""
project_id, dataset_id, table_id = dataset
ddf = read_gbq(project_id=project_id, dataset_id=dataset_id, table_id=table_id)

assert list(ddf.columns) == ["name", "number", "idx"]
assert ddf.npartitions == 2
assert assert_eq(ddf.set_index("idx"), df.set_index("idx"))


def test_read_row_filter(df, dataset, client):
"Test read data with a row restriction providing `row_filter`"
project_id, dataset_id, table_id = dataset
ddf = read_gbq(
project_id=project_id,
dataset_id=dataset_id,
table_id=table_id,
row_filter="idx < 5",
)

assert list(ddf.columns) == ["name", "number", "idx"]
assert ddf.npartitions == 2
assert assert_eq(ddf.set_index("idx").loc[:4], df.set_index("idx").loc[:4])


def test_read_kwargs(df, dataset, client):
"Test read data with a `read_kwargs`"
project_id, dataset_id, table_id = dataset
ddf = read_gbq(
project_id=project_id,
dataset_id=dataset_id,
table_id=table_id,
read_kwargs={"timeout": 1e-12},
)

with pytest.raises(Exception, match="504 Deadline Exceeded"):
[Review comment, Member] Nice use of match=! I'm curious if we can catch a more specific exception here (e.g. ValueError)? What type of error is raised when a timeout occurs?

[Review comment, Contributor Author] This was the traceback of the error I got; it's an actual Exception. Do you think we can write this better?

distributed.worker - WARNING - Compute Failed
Function:  subgraph_callable-4f693566-2e95-419b-8783-b36b04b3
args:      ('projects/dask-bigquery/locations/us/sessions/CAISDFRjM3NzSmxoZm1BMxoCanEaAmpk/streams/GgJqcRoCamQoAg')
kwargs:    {}
Exception: Exception('504 Deadline Exceeded')

---------------------------------------------------------------------------
Exception                                 Traceback (most recent call last)
/var/folders/1y/ydztfpnd11b6qmvbb8_x56jh0000gn/T/ipykernel_51820/2349744101.py in <module>
----> 1 test_kwargs.compute()

~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/dask/base.py in compute(self, **kwargs)
    284         dask.base.compute
    285         """
--> 286         (result,) = compute(self, traverse=False, **kwargs)
    287         return result
    288 

~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
    566         postcomputes.append(x.__dask_postcompute__())
    567 
--> 568     results = schedule(dsk, keys, **kwargs)
    569     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    570 

~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/distributed/client.py in get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2669                     should_rejoin = False
   2670             try:
-> 2671                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2672             finally:
   2673                 for f in futures.values():

~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1946             else:
   1947                 local_worker = None
-> 1948             return self.sync(
   1949                 self._gather,
   1950                 futures,

~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    843             return future
    844         else:
--> 845             return sync(
    846                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    847             )

~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    323     if error[0]:
    324         typ, exc, tb = error[0]
--> 325         raise exc.with_traceback(tb)
    326     else:
    327         return result[0]

~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/distributed/utils.py in f()
    306             if callback_timeout is not None:
    307                 future = asyncio.wait_for(future, callback_timeout)
--> 308             result[0] = yield future
    309         except Exception:
    310             error[0] = sys.exc_info()

~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/tornado/gen.py in run(self)
    760 
    761                     try:
--> 762                         value = future.result()
    763                     except Exception:
    764                         exc_info = sys.exc_info()

~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1811                             exc = CancelledError(key)
   1812                         else:
-> 1813                             raise exception.with_traceback(traceback)
   1814                         raise exc
   1815                     if errors == "skip":

~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/dask/optimization.py in __call__()
    967         if not len(args) == len(self.inkeys):
    968             raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 969         return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
    970 
    971     def __reduce__(self):

~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/dask/core.py in get()
    149     for key in toposort(dsk):
    150         task = dsk[key]
--> 151         result = _execute_task(task, cache)
    152         cache[key] = result
    153     result = _execute_task(out, cache)

~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/dask/core.py in _execute_task()
    119         # temporaries by their reference count and can execute certain
    120         # operations in-place.
--> 121         return func(*(_execute_task(a, cache) for a in args))
    122     elif not ishashable(arg):
    123         return arg

~/Documents/git/dask-bigquery/dask_bigquery/core.py in bigquery_read()
     65             pyarrow.py_buffer(session.arrow_schema.serialized_schema)
     66         )
---> 67         shards = _stream_to_dfs(bqs_client, stream_name, schema, read_kwargs)
     68         # NOTE: BQ Storage API can return empty streams
     69         if len(shards) == 0:

~/Documents/git/dask-bigquery/dask_bigquery/core.py in _stream_to_dfs()
     35             schema,
     36         ).to_pandas()
---> 37         for message in bqs_client.read_rows(name=stream_name, offset=0, **read_kwargs)
     38     ]
     39 

~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/google/cloud/bigquery_storage_v1/client.py in read_rows()
    122         """
    123         gapic_client = super(BigQueryReadClient, self)
--> 124         stream = gapic_client.read_rows(
    125             read_stream=name,
    126             offset=offset,

~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/google/cloud/bigquery_storage_v1/services/big_query_read/client.py in read_rows()
    596 
    597         # Send the request.
--> 598         response = rpc(request, retry=retry, timeout=timeout, metadata=metadata,)
    599 
    600         # Done; return the response.

~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/google/api_core/gapic_v1/method.py in __call__()
    143             kwargs["metadata"] = metadata
    144 
--> 145         return wrapped_func(*args, **kwargs)
    146 
    147 

~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/google/api_core/retry.py in retry_wrapped_func()
    284                 self._initial, self._maximum, multiplier=self._multiplier
    285             )
--> 286             return retry_target(
    287                 target,
    288                 self._predicate,

~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/google/api_core/retry.py in retry_target()
    187     for sleep in sleep_generator:
    188         try:
--> 189             return target()
    190 
    191         # pylint: disable=broad-except

~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/google/api_core/timeout.py in func_with_timeout()
    100             """Wrapped function that adds timeout."""
    101             kwargs["timeout"] = self._timeout
--> 102             return func(*args, **kwargs)
    103 
    104         return func_with_timeout

~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/google/api_core/grpc_helpers.py in error_remapped_callable()
    162             )
    163         except grpc.RpcError as exc:
--> 164             six.raise_from(exceptions.from_grpc_error(exc), exc)
    165 
    166     return error_remapped_callable

~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/six.py in raise_from()

Exception: 504 Deadline Exceeded

[Review comment, Member] Ah, I see. Since an Exception is being raised, I don't think we can do any better than what you've already got here.

ddf.compute()
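
For context on the thread above, a hedged sketch that is not part of this PR: the gRPC layer remaps a timed-out read to google.api_core.exceptions.DeadlineExceeded, so a variant of the test that runs on Dask's local threaded scheduler, where the original exception type is usually preserved, could match the more specific class. The fixture names mirror the ones in this file; the local-scheduler behavior is an assumption, not something verified in this PR.

import pytest
from google.api_core.exceptions import DeadlineExceeded

from dask_bigquery import read_gbq


def test_read_kwargs_timeout_local_scheduler(df, dataset):
    """Hypothetical variant of test_read_kwargs that bypasses the distributed client."""
    project_id, dataset_id, table_id = dataset
    ddf = read_gbq(
        project_id=project_id,
        dataset_id=dataset_id,
        table_id=table_id,
        read_kwargs={"timeout": 1e-12},  # force an immediate deadline on read_rows()
    )
    # Assumption: with the local threaded scheduler the original exception
    # type (DeadlineExceeded) propagates instead of a re-raised generic Exception.
    with pytest.raises(DeadlineExceeded, match="Deadline Exceeded"):
        ddf.compute(scheduler="threads")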
7 changes: 7 additions & 0 deletions requirements.txt
@@ -0,0 +1,7 @@
dask
distributed
google-cloud-bigquery >= 2.11.0
google-cloud-bigquery-storage
pandas
pandas-gbq
pyarrow