Initial read_gbq support #4

Merged · 46 commits · Sep 23, 2021

Commits

94c41f6
add precommit config
ncclementi Aug 5, 2021
48becdb
add read_gbq
ncclementi Aug 5, 2021
a934259
add setup and req
ncclementi Aug 5, 2021
04bdd80
modifications suggested by bnaul
ncclementi Aug 6, 2021
ab16a32
raise error when table type is VIEW
ncclementi Aug 6, 2021
455f749
add linting github actions
ncclementi Aug 6, 2021
c417d5f
add comment on context manager related to possible upstream solution
ncclementi Aug 6, 2021
4839bbb
avoid scanning table when creating partitions
ncclementi Aug 11, 2021
774e79b
add first read_gbq test
ncclementi Aug 17, 2021
7bdd66a
add partitioning test
ncclementi Aug 17, 2021
31a1253
use pytest fixtures
ncclementi Aug 18, 2021
db4edb4
use context manager on test
ncclementi Aug 18, 2021
be1efbd
ignore bare except for now
ncclementi Aug 18, 2021
35cbdc6
remove prefix from delayed kwargs
ncclementi Aug 18, 2021
40de1ea
make dataset name random, remove annotate
ncclementi Aug 18, 2021
45e0004
better name for delayed _read_rows_arrow
ncclementi Aug 18, 2021
de93e88
implementation of HLG - wip
ncclementi Aug 19, 2021
3070ae3
Slight refactor
jrbourbeau Aug 20, 2021
b43daf6
Minor test tweaks
jrbourbeau Aug 20, 2021
50f3c6a
Update requirements.txt
ncclementi Sep 16, 2021
f8a578c
use context manager for bq client
ncclementi Sep 17, 2021
a91c73c
remove with_storage_api since it is always true
ncclementi Sep 17, 2021
548f2fb
remove partition fields option
ncclementi Sep 17, 2021
d3ffa79
add test github actions setup
ncclementi Sep 17, 2021
44096a1
add ci environments
ncclementi Sep 17, 2021
b19dca4
trigger ci
ncclementi Sep 17, 2021
982a5f5
trigger ci again
ncclementi Sep 17, 2021
4292ac3
add pytest to envs
ncclementi Sep 17, 2021
14ba56c
Only run CI on push events
jrbourbeau Sep 20, 2021
32b6686
Minor cleanup
jrbourbeau Sep 20, 2021
97b5d21
Use mamba
jrbourbeau Sep 20, 2021
e03e731
update docstrings
ncclementi Sep 21, 2021
d73b686
missing docstring
ncclementi Sep 21, 2021
3f8e397
trigger ci - testing workflow
ncclementi Sep 21, 2021
64fe0ec
use env variable for project id
ncclementi Sep 21, 2021
6f94825
add test for read with row_filter
ncclementi Sep 21, 2021
1a51981
add test for read with kwargs
ncclementi Sep 21, 2021
acb404e
Update dask_bigquery/tests/test_core.py
ncclementi Sep 21, 2021
d78c2a9
Update dask_bigquery/tests/test_core.py
ncclementi Sep 21, 2021
2b46c4f
Update dask_bigquery/tests/test_core.py
ncclementi Sep 21, 2021
5ac1358
Update dask_bigquery/tests/test_core.py
ncclementi Sep 21, 2021
216a4e7
Update dask_bigquery/tests/test_core.py
ncclementi Sep 21, 2021
46e4923
Update dask_bigquery/tests/test_core.py
ncclementi Sep 21, 2021
3204bc2
tweak on docstrings
ncclementi Sep 22, 2021
f17cfb8
add readme content
ncclementi Sep 22, 2021
d1398c2
Minor updates
jrbourbeau Sep 23, 2021
16 changes: 16 additions & 0 deletions .github/workflows/pre-commit.yml
name: Linting

on:
  push:
    branches: main
  pull_request:
    branches: main

jobs:
  checks:
    name: "pre-commit hooks"
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - uses: actions/setup-python@v2
      - uses: pre-commit/[email protected]
54 changes: 54 additions & 0 deletions .github/workflows/tests.yml
name: Tests

on: push

# When this workflow is queued, automatically cancel any previous running
# or pending jobs from the same branch
concurrency:
  group: ${{ github.ref }}
  cancel-in-progress: true

jobs:
  test:
    runs-on: ${{ matrix.os }}
    defaults:
      run:
        shell: bash -l {0}
    strategy:
      fail-fast: false
      matrix:
        os: ["windows-latest", "ubuntu-latest", "macos-latest"]
        python-version: ["3.7", "3.8", "3.9"]

    steps:
      - name: Checkout source
        uses: actions/checkout@v2
        with:
          fetch-depth: 0 # Needed by codecov.io

      - name: Setup Conda Environment
        uses: conda-incubator/setup-miniconda@v2
        with:
          miniforge-variant: Mambaforge
          miniforge-version: latest
          use-mamba: true
          channel-priority: strict
          python-version: ${{ matrix.python-version }}
          environment-file: ci/environment-${{ matrix.python-version }}.yaml
          activate-environment: test-environment
          auto-activate-base: false

      - name: Install dask-bigquery
        run: python -m pip install --no-deps -e .

      - name: Set up Cloud SDK
        uses: google-github-actions/setup-gcloud@master
        with:
          project_id: ${{ secrets.GCP_PROJECT_ID }}
          service_account_key: ${{ secrets.GCP_SA_KEY }}
          export_default_credentials: true

      - name: Run tests
        env:
          DASK_BIGQUERY_PROJECT_ID: "${{ secrets.GCP_PROJECT_ID }}"
        run: pytest -v dask_bigquery
17 changes: 17 additions & 0 deletions .pre-commit-config.yaml
repos:
  - repo: https://github.com/psf/black
    rev: 20.8b1
    hooks:
      - id: black
        language_version: python3
        exclude: versioneer.py
  - repo: https://gitlab.com/pycqa/flake8
    rev: 3.8.3
    hooks:
      - id: flake8
        language_version: python3
  - repo: https://github.com/pycqa/isort
    rev: 5.8.0
    hooks:
      - id: isort
        language_version: python3
36 changes: 35 additions & 1 deletion README.md
# Dask-BigQuery

[![Tests](https://github.com/coiled/dask-bigquery/actions/workflows/tests.yml/badge.svg)](https://github.com/coiled/dask-bigquery/actions/workflows/tests.yml) [![Linting](https://github.com/coiled/dask-bigquery/actions/workflows/pre-commit.yml/badge.svg)](https://github.com/coiled/dask-bigquery/actions/workflows/pre-commit.yml)

Read data from Google BigQuery with Dask

## Installation

## Example

`dask-bigquery` assumes that you are already authenticated.

```python
import dask_bigquery

ddf = dask_bigquery.read_gbq(
    project_id="your_project_id",
    dataset_id="your_dataset",
    table_id="your_table",
)

ddf.head()
```
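
`read_gbq` also accepts an optional `row_filter`, which is passed to the BigQuery Storage API's `row_restriction` so filtering happens server-side, plus `read_kwargs` that are forwarded to `read_rows()`. A minimal sketch (the column name and filter below are purely illustrative):

```python
import dask_bigquery

ddf = dask_bigquery.read_gbq(
    project_id="your_project_id",
    dataset_id="your_dataset",
    table_id="your_table",
    row_filter="value > 0",  # hypothetical column; the filter runs inside BigQuery
)
```

Each BigQuery Storage read stream becomes one partition of the resulting Dask DataFrame, so `ddf.npartitions` matches the number of streams in the read session.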

## History

This project stems from the discussion in
[this Dask issue](https://github.com/dask/dask/issues/3121) and
[this initial implementation](https://gist.github.com/bnaul/4819f045ccbee160b60a530b6cfc0c98#file-dask_bigquery-py)
developed by [Brett Naul](https://github.com/bnaul), [Jacob Hayes](https://github.com/JacobHayes),
and [Steven Soojin Kim](https://github.com/mikss).

## License

[BSD-3](LICENSE)
14 changes: 14 additions & 0 deletions ci/environment-3.7.yaml
name: test-environment
channels:
- conda-forge
dependencies:
- python=3.7
- dask
- distributed
- pandas
- pyarrow
- pytest
- grpcio
- pandas-gbq
- google-cloud-bigquery
- google-cloud-bigquery-storage
14 changes: 14 additions & 0 deletions ci/environment-3.8.yaml
name: test-environment
channels:
- conda-forge
dependencies:
- python=3.8
- dask
- distributed
- pandas
- pyarrow
- pytest
- grpcio
- pandas-gbq
- google-cloud-bigquery
- google-cloud-bigquery-storage
14 changes: 14 additions & 0 deletions ci/environment-3.9.yaml
name: test-environment
channels:
- conda-forge
dependencies:
- python=3.9
- dask
- distributed
- pandas
- pyarrow
- pytest
- grpcio
- pandas-gbq
- google-cloud-bigquery
- google-cloud-bigquery-storage
1 change: 1 addition & 0 deletions dask_bigquery/__init__.py
from .core import read_gbq
155 changes: 155 additions & 0 deletions dask_bigquery/core.py
from __future__ import annotations

from contextlib import contextmanager
from functools import partial

import pandas as pd
import pyarrow
from dask.base import tokenize
from dask.dataframe.core import new_dd_object
from dask.highlevelgraph import HighLevelGraph
from dask.layers import DataFrameIOLayer
from google.cloud import bigquery, bigquery_storage


@contextmanager
def bigquery_clients(project_id):
"""This context manager is a temporary solution until there is an
upstream solution to handle this.
See googleapis/google-cloud-python#9457
and googleapis/gapic-generator-python#575 for reference.
"""
with bigquery.Client(project_id) as bq_client:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't have to be this PR, but it would be really helpful if we could attribute these requests to Dask/Dask-BigQuery. #6

bq_storage_client = bigquery_storage.BigQueryReadClient(
credentials=bq_client._credentials
)
yield bq_client, bq_storage_client
bq_storage_client.transport.grpc_channel.close()


def _stream_to_dfs(bqs_client, stream_name, schema, read_kwargs):
    """Given a Storage API client and a stream name, return a list of dataframes."""
    return [
        pyarrow.ipc.read_record_batch(
            pyarrow.py_buffer(message.arrow_record_batch.serialized_record_batch),
            schema,
        ).to_pandas()
        for message in bqs_client.read_rows(name=stream_name, offset=0, **read_kwargs)
    ]


def bigquery_read(
    make_create_read_session_request: callable,
    project_id: str,
    read_kwargs: dict,
    stream_name: str,
) -> pd.DataFrame:
    """Read a single batch of rows via BQ Storage API, in Arrow binary format.

    Parameters
    ----------
    make_create_read_session_request: callable
        kwargs to pass to `bqs_client.create_read_session` as `request`
    project_id: str
        Name of the BigQuery project.
    read_kwargs: dict
        kwargs to pass to read_rows()
    stream_name: str
        BigQuery Storage API Stream "name"
        NOTE: Please set if reading from Storage API without any `row_restriction`.
        https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1beta1#stream
    """
    with bigquery_clients(project_id) as (_, bqs_client):
        session = bqs_client.create_read_session(make_create_read_session_request())
        schema = pyarrow.ipc.read_schema(
            pyarrow.py_buffer(session.arrow_schema.serialized_schema)
        )
        shards = _stream_to_dfs(bqs_client, stream_name, schema, read_kwargs)
        # NOTE: BQ Storage API can return empty streams
        if len(shards) == 0:
            shards = [schema.empty_table().to_pandas()]

    return pd.concat(shards)


def read_gbq(
    project_id: str,
    dataset_id: str,
    table_id: str,
    row_filter="",
    read_kwargs: dict = None,
):
    """Read a BigQuery table as a Dask DataFrame using the BigQuery Storage API via Arrow format.
    Partitions will be approximately balanced according to BigQuery stream allocation logic.

    Parameters
    ----------
    project_id: str
        ID of the BigQuery project.
    dataset_id: str
        BigQuery dataset within the project.
    table_id: str
        BigQuery table within the dataset.
    row_filter: str
        SQL text filtering statement to pass to `row_restriction`
    read_kwargs: dict
        kwargs to pass to read_rows()

    Returns
    -------
    Dask DataFrame
    """
    read_kwargs = read_kwargs or {}
    with bigquery_clients(project_id) as (bq_client, bqs_client):
        table_ref = bq_client.get_table(f"{dataset_id}.{table_id}")
        if table_ref.table_type == "VIEW":
            raise TypeError("Table type VIEW not supported")

        def make_create_read_session_request(row_filter=""):
            return bigquery_storage.types.CreateReadSessionRequest(
                max_stream_count=100,  # 0 -> use as many streams as BQ Storage will provide
                parent=f"projects/{project_id}",

Review comment (Member): Should projects be hard coded here?
Nvm, looking at the docstring for CreateReadSessionRequest, it appears the answer is "yes, projects should be hardcoded".

                read_session=bigquery_storage.types.ReadSession(
                    data_format=bigquery_storage.types.DataFormat.ARROW,
                    read_options=bigquery_storage.types.ReadSession.TableReadOptions(
                        row_restriction=row_filter,

Review comment (Contributor Author): @jrbourbeau I'm not quite sure this works if we don't use the default; should we remove this, since we removed the partition field options? We seem to always leave it as "".
Here is some documentation to review: https://googleapis.dev/python/bigquerystorage/latest/bigquery_storage_v1beta2/types.html#google.cloud.bigquery_storage_v1beta2.types.ReadSession.TableReadOptions

Review comment (Contributor): row_filter is quite useful as it performs the filtering server-side and can avoid a lot of extraneous IO. I don't see any reason to remove it, but it should probably be made into a more generic TableReadOptions object, like the one linked above, so that it can be used for column selection as well.
One other small note: the doc you linked is for the beta API; v1 has been released since our original implementation: https://googleapis.dev/python/bigquerystorage/latest/bigquery_storage_v1/types.html#google.cloud.bigquery_storage_v1.types.ReadSession.TableReadOptions. There are a couple of other references to the beta docs throughout as well.
(A hypothetical sketch of such a TableReadOptions appears after this file listing.)

                    ),
                    table=table_ref.to_bqstorage(),
                ),
            )

        # Create a read session in order to detect the schema.
        # Read sessions are lightweight and will be auto-deleted after 24 hours.
        session = bqs_client.create_read_session(
            make_create_read_session_request(row_filter=row_filter)
        )

Review comment (Member), on lines +123 to +125: Does session have a close method, or some other cleanup method, we should call?

        schema = pyarrow.ipc.read_schema(
            pyarrow.py_buffer(session.arrow_schema.serialized_schema)
        )
        meta = schema.empty_table().to_pandas()

        label = "read-gbq-"
        output_name = label + tokenize(
            project_id,
            dataset_id,
            table_id,
            row_filter,
            read_kwargs,
        )

        layer = DataFrameIOLayer(
            output_name,
            meta.columns,
            [stream.name for stream in session.streams],
            partial(
                bigquery_read,
                make_create_read_session_request,
                project_id,
                read_kwargs,
            ),
            label=label,
        )
        divisions = tuple([None] * (len(session.streams) + 1))

        graph = HighLevelGraph({output_name: layer}, {output_name: set()})
        return new_dd_object(graph, output_name, meta, divisions)
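
Regarding the review comment above about attributing requests to Dask/Dask-BigQuery (#6): below is a minimal, hypothetical sketch of attaching a custom user agent to both clients. This is not part of this PR; the `"dask-bigquery"` user-agent string and its placement are assumptions.

```python
# Hypothetical sketch (not part of this PR): attach a custom user agent so that
# BigQuery requests can be attributed to dask-bigquery, as suggested in #6.
from google.api_core.client_info import ClientInfo
from google.api_core.gapic_v1.client_info import ClientInfo as GapicClientInfo
from google.cloud import bigquery, bigquery_storage

user_agent = "dask-bigquery"  # illustrative; a real value might include a version

bq_client = bigquery.Client(
    project="your_project_id",
    client_info=ClientInfo(user_agent=user_agent),
)
bqs_client = bigquery_storage.BigQueryReadClient(
    credentials=bq_client._credentials,
    client_info=GapicClientInfo(user_agent=user_agent),
)
```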
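
For the TableReadOptions discussion above: a minimal sketch of the more generic v1 `TableReadOptions` the reviewer suggests, which supports column selection alongside the row filter. The column names and filter are illustrative, and this is not what the PR constructs verbatim.

```python
# Hypothetical sketch of the reviewer's suggestion: build a full v1
# TableReadOptions so callers can select columns as well as filter rows,
# then pass it through as ReadSession(read_options=...).
from google.cloud import bigquery_storage

read_options = bigquery_storage.types.ReadSession.TableReadOptions(
    selected_fields=["name", "number"],  # illustrative column names
    row_restriction="number > 0",        # illustrative server-side filter
)
```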