Initial read_gbq support (#4)
ncclementi authored Sep 23, 2021
1 parent 77d9c6d commit 149d2c0
Showing 13 changed files with 436 additions and 1 deletion.
16 changes: 16 additions & 0 deletions .github/workflows/pre-commit.yml
@@ -0,0 +1,16 @@
name: Linting

on:
  push:
    branches: main
  pull_request:
    branches: main

jobs:
  checks:
    name: "pre-commit hooks"
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - uses: actions/setup-python@v2
      - uses: pre-commit/[email protected]
54 changes: 54 additions & 0 deletions .github/workflows/tests.yml
@@ -0,0 +1,54 @@
name: Tests

on: push

# When this workflow is queued, automatically cancel any previous running
# or pending jobs from the same branch
concurrency:
  group: ${{ github.ref }}
  cancel-in-progress: true

jobs:
  test:
    runs-on: ${{ matrix.os }}
    defaults:
      run:
        shell: bash -l {0}
    strategy:
      fail-fast: false
      matrix:
        os: ["windows-latest", "ubuntu-latest", "macos-latest"]
        python-version: ["3.7", "3.8", "3.9"]

    steps:
      - name: Checkout source
        uses: actions/checkout@v2
        with:
          fetch-depth: 0  # Needed by codecov.io

      - name: Setup Conda Environment
        uses: conda-incubator/setup-miniconda@v2
        with:
          miniforge-variant: Mambaforge
          miniforge-version: latest
          use-mamba: true
          channel-priority: strict
          python-version: ${{ matrix.python-version }}
          environment-file: ci/environment-${{ matrix.python-version }}.yaml
          activate-environment: test-environment
          auto-activate-base: false

      - name: Install dask-bigquery
        run: python -m pip install --no-deps -e .

      - name: Set up Cloud SDK
        uses: google-github-actions/setup-gcloud@master
        with:
          project_id: ${{ secrets.GCP_PROJECT_ID }}
          service_account_key: ${{ secrets.GCP_SA_KEY }}
          export_default_credentials: true

      - name: Run tests
        env:
          DASK_BIGQUERY_PROJECT_ID: "${{ secrets.GCP_PROJECT_ID }}"
        run: pytest -v dask_bigquery
17 changes: 17 additions & 0 deletions .pre-commit-config.yaml
@@ -0,0 +1,17 @@
repos:
  - repo: https://github.com/psf/black
    rev: 20.8b1
    hooks:
      - id: black
        language_version: python3
        exclude: versioneer.py
  - repo: https://gitlab.com/pycqa/flake8
    rev: 3.8.3
    hooks:
      - id: flake8
        language_version: python3
  - repo: https://github.com/pycqa/isort
    rev: 5.8.0
    hooks:
      - id: isort
        language_version: python3
36 changes: 35 additions & 1 deletion README.md
@@ -1 +1,35 @@
# dask-bigquery
# Dask-BigQuery

[![Tests](https://github.com/coiled/dask-bigquery/actions/workflows/tests.yml/badge.svg)](https://github.com/coiled/dask-bigquery/actions/workflows/tests.yml) [![Linting](https://github.com/coiled/dask-bigquery/actions/workflows/pre-commit.yml/badge.svg)](https://github.com/coiled/dask-bigquery/actions/workflows/pre-commit.yml)

Read data from Google BigQuery with Dask

## Installation

## Example

`dask-bigquery` assumes that you are already authenticated.

```python
import dask_bigquery

ddf = dask_bigquery.read_gbq(
    project_id="your_project_id",
    dataset_id="your_dataset",
    table_id="your_table",
)

ddf.head()
```
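
`read_gbq` also accepts an optional `row_filter`, a SQL predicate passed to the Storage API's `row_restriction`, and a `read_kwargs` dict forwarded to `read_rows()`. A minimal sketch — the column name and filter below are placeholders:

```python
import dask_bigquery

# Push a filter down to the BigQuery Storage API (placeholder predicate)
ddf = dask_bigquery.read_gbq(
    project_id="your_project_id",
    dataset_id="your_dataset",
    table_id="your_table",
    row_filter="value > 0",
)

# One Dask partition is created per BigQuery Storage API stream
print(ddf.npartitions)
```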

## History

This project stems from the discussion in
[this Dask issue](https://github.com/dask/dask/issues/3121) and
[this initial implementation](https://gist.github.com/bnaul/4819f045ccbee160b60a530b6cfc0c98#file-dask_bigquery-py)
developed by [Brett Naul](https://github.com/bnaul), [Jacob Hayes](https://github.com/JacobHayes),
and [Steven Soojin Kim](https://github.com/mikss).

## License

[BSD-3](LICENSE)
14 changes: 14 additions & 0 deletions ci/environment-3.7.yaml
@@ -0,0 +1,14 @@
name: test-environment
channels:
  - conda-forge
dependencies:
  - python=3.7
  - dask
  - distributed
  - pandas
  - pyarrow
  - pytest
  - grpcio
  - pandas-gbq
  - google-cloud-bigquery
  - google-cloud-bigquery-storage
14 changes: 14 additions & 0 deletions ci/environment-3.8.yaml
@@ -0,0 +1,14 @@
name: test-environment
channels:
  - conda-forge
dependencies:
  - python=3.8
  - dask
  - distributed
  - pandas
  - pyarrow
  - pytest
  - grpcio
  - pandas-gbq
  - google-cloud-bigquery
  - google-cloud-bigquery-storage
14 changes: 14 additions & 0 deletions ci/environment-3.9.yaml
@@ -0,0 +1,14 @@
name: test-environment
channels:
  - conda-forge
dependencies:
  - python=3.9
  - dask
  - distributed
  - pandas
  - pyarrow
  - pytest
  - grpcio
  - pandas-gbq
  - google-cloud-bigquery
  - google-cloud-bigquery-storage
1 change: 1 addition & 0 deletions dask_bigquery/__init__.py
@@ -0,0 +1 @@
from .core import read_gbq
155 changes: 155 additions & 0 deletions dask_bigquery/core.py
@@ -0,0 +1,155 @@
from __future__ import annotations

from contextlib import contextmanager
from functools import partial

import pandas as pd
import pyarrow
from dask.base import tokenize
from dask.dataframe.core import new_dd_object
from dask.highlevelgraph import HighLevelGraph
from dask.layers import DataFrameIOLayer
from google.cloud import bigquery, bigquery_storage


@contextmanager
def bigquery_clients(project_id):
    """This context manager is a temporary solution until there is an
    upstream solution to handle this.
    See googleapis/google-cloud-python#9457
    and googleapis/gapic-generator-python#575 for reference.
    """
    with bigquery.Client(project_id) as bq_client:
        bq_storage_client = bigquery_storage.BigQueryReadClient(
            credentials=bq_client._credentials
        )
        yield bq_client, bq_storage_client
        bq_storage_client.transport.grpc_channel.close()


def _stream_to_dfs(bqs_client, stream_name, schema, read_kwargs):
    """Given a Storage API client and a stream name, return a list of dataframes."""
    return [
        pyarrow.ipc.read_record_batch(
            pyarrow.py_buffer(message.arrow_record_batch.serialized_record_batch),
            schema,
        ).to_pandas()
        for message in bqs_client.read_rows(name=stream_name, offset=0, **read_kwargs)
    ]


def bigquery_read(
    make_create_read_session_request: callable,
    project_id: str,
    read_kwargs: dict,
    stream_name: str,
) -> pd.DataFrame:
    """Read a single batch of rows via BQ Storage API, in Arrow binary format.

    Parameters
    ----------
    make_create_read_session_request: callable
        Callable that builds the request passed to `bqs_client.create_read_session`.
    project_id: str
        Name of the BigQuery project.
    read_kwargs: dict
        kwargs to pass to read_rows()
    stream_name: str
        BigQuery Storage API Stream "name"
        NOTE: Please set if reading from Storage API without any `row_restriction`.
        https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1beta1#stream
    """
    with bigquery_clients(project_id) as (_, bqs_client):
        session = bqs_client.create_read_session(make_create_read_session_request())
        schema = pyarrow.ipc.read_schema(
            pyarrow.py_buffer(session.arrow_schema.serialized_schema)
        )
        shards = _stream_to_dfs(bqs_client, stream_name, schema, read_kwargs)
        # NOTE: BQ Storage API can return empty streams
        if len(shards) == 0:
            shards = [schema.empty_table().to_pandas()]

    return pd.concat(shards)


def read_gbq(
    project_id: str,
    dataset_id: str,
    table_id: str,
    row_filter="",
    read_kwargs: dict = None,
):
    """Read a table as a Dask DataFrame using the BigQuery Storage API via Arrow format.

    Partitions will be approximately balanced according to BigQuery stream allocation logic.

    Parameters
    ----------
    project_id: str
        Name of the BigQuery project id.
    dataset_id: str
        BigQuery dataset within project
    table_id: str
        BigQuery table within dataset
    row_filter: str
        SQL text filtering statement to pass to `row_restriction`
    read_kwargs: dict
        kwargs to pass to read_rows()

    Returns
    -------
    Dask DataFrame
    """
    read_kwargs = read_kwargs or {}
    with bigquery_clients(project_id) as (bq_client, bqs_client):
        table_ref = bq_client.get_table(f"{dataset_id}.{table_id}")
        if table_ref.table_type == "VIEW":
            raise TypeError("Table type VIEW not supported")

        def make_create_read_session_request(row_filter=""):
            return bigquery_storage.types.CreateReadSessionRequest(
                max_stream_count=100,  # 0 -> use as many streams as BQ Storage will provide
                parent=f"projects/{project_id}",
                read_session=bigquery_storage.types.ReadSession(
                    data_format=bigquery_storage.types.DataFormat.ARROW,
                    read_options=bigquery_storage.types.ReadSession.TableReadOptions(
                        row_restriction=row_filter,
                    ),
                    table=table_ref.to_bqstorage(),
                ),
            )

        # Create a read session in order to detect the schema.
        # Read sessions are light weight and will be auto-deleted after 24 hours.
        session = bqs_client.create_read_session(
            make_create_read_session_request(row_filter=row_filter)
        )
        schema = pyarrow.ipc.read_schema(
            pyarrow.py_buffer(session.arrow_schema.serialized_schema)
        )
        meta = schema.empty_table().to_pandas()

        label = "read-gbq-"
        output_name = label + tokenize(
            project_id,
            dataset_id,
            table_id,
            row_filter,
            read_kwargs,
        )

        layer = DataFrameIOLayer(
            output_name,
            meta.columns,
            [stream.name for stream in session.streams],
            partial(
                bigquery_read,
                make_create_read_session_request,
                project_id,
                read_kwargs,
            ),
            label=label,
        )
        divisions = tuple([None] * (len(session.streams) + 1))

        graph = HighLevelGraph({output_name: layer}, {output_name: set()})
        return new_dd_object(graph, output_name, meta, divisions)
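
For readers unfamiliar with `DataFrameIOLayer`, the graph built above is roughly equivalent to creating one delayed `bigquery_read` call per Storage API stream and assembling the results into a DataFrame — a simplified sketch, not part of this commit, that reuses the names from `read_gbq`'s body (`session`, `meta`, `project_id`, `read_kwargs`):

```python
import dask.dataframe as dd
from dask import delayed

# Hypothetical equivalent of the DataFrameIOLayer: one partition per stream.
parts = [
    delayed(bigquery_read)(
        make_create_read_session_request,
        project_id,
        read_kwargs,
        stream.name,
    )
    for stream in session.streams
]
ddf = dd.from_delayed(parts, meta=meta)
```

The `DataFrameIOLayer` route keeps the work in a high-level graph layer, so the per-stream tasks are only materialized when the graph is optimized and executed.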