Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-9650] Adding support for ReadAll from BigQuery transform #13170

Merged
merged 29 commits into from
Nov 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
## I/Os
* ReadFromMongoDB can now be used with MongoDB Atlas (Python) ([BEAM-11266](https://issues.apache.org/jira/browse/BEAM-11266).)
* Support for X source added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
* There is a new transform `ReadAllFromBigQuery` that can receive multiple requests to read data from BigQuery at pipeline runtime. See [PR 13170](https://github.com/apache/beam/pull/13170), and [BEAM-9650](https://issues.apache.org/jira/browse/BEAM-9650).

## New Features / Improvements

Expand Down
268 changes: 186 additions & 82 deletions sdks/python/apache_beam/io/gcp/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,44 @@
`ReadFromBigQuery`, you can use the flag `use_json_exports` to export
data as JSON, and receive base64-encoded bytes.

ReadAllFromBigQuery
-------------------
Beam 2.27.0 introduces a new transform called `ReadAllFromBigQuery` which
allows you to define table and query reads from BigQuery at pipeline
runtime.:::

read_requests = p | beam.Create([
ReadFromBigQueryRequest(query='SELECT * FROM mydataset.mytable'),
ReadFromBigQueryRequest(table='myproject.mydataset.mytable')])
results = read_requests | ReadAllFromBigQuery()

A good application for this transform is in streaming pipelines to
refresh a side input coming from BigQuery. This would work like so:::

side_input = (
p
| 'PeriodicImpulse' >> PeriodicImpulse(
first_timestamp, last_timestamp, interval, True)
| 'MapToReadRequest' >> beam.Map(
lambda x: ReadFromBigQueryRequest(table='dataset.table'))
| beam.io.ReadAllFromBigQuery())
main_input = (
p
| 'MpImpulse' >> beam.Create(sample_main_input_elements)
|
'MapMpToTimestamped' >> beam.Map(lambda src: TimestampedValue(src, src))
| 'WindowMpInto' >> beam.WindowInto(
window.FixedWindows(main_input_windowing_interval)))
result = (
main_input
| 'ApplyCrossJoin' >> beam.FlatMap(
cross_join, rights=beam.pvalue.AsIter(side_input)))

**Note**: This transform is supported on Portable and Dataflow v2 runners.

**Note**: This transform does not currently clean up temporary datasets
created for its execution. (BEAM-11359)

Writing Data to BigQuery
========================

Expand Down Expand Up @@ -234,7 +272,6 @@ def compute_table_name(row):
from __future__ import absolute_import

import collections
import decimal
import itertools
import json
import logging
Expand All @@ -243,6 +280,8 @@ def compute_table_name(row):
import uuid
from builtins import object
from builtins import zip
from typing import Dict
from typing import Union

from future.utils import itervalues
from past.builtins import unicode
Expand All @@ -257,12 +296,15 @@ def compute_table_name(row):
from apache_beam.io.filesystems import FileSystems
from apache_beam.io.gcp import bigquery_tools
from apache_beam.io.gcp.bigquery_io_metadata import create_bigquery_io_metadata
from apache_beam.io.gcp.bigquery_read_internal import _BigQueryReadSplit
from apache_beam.io.gcp.bigquery_read_internal import _JsonToDictCoder
from apache_beam.io.gcp.bigquery_read_internal import _PassThroughThenCleanup
from apache_beam.io.gcp.bigquery_read_internal import bigquery_export_destination_uri
from apache_beam.io.gcp.bigquery_tools import RetryStrategy
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.io.iobase import BoundedSource
from apache_beam.io.iobase import RangeTracker
from apache_beam.io.iobase import SDFBoundedSourceReader
from apache_beam.io.iobase import SourceBundle
from apache_beam.io.textio import _TextSource as TextSource
from apache_beam.metrics import Metrics
Expand All @@ -284,6 +326,14 @@ def compute_table_name(row):
from apache_beam.transforms.window import GlobalWindows
from apache_beam.utils import retry
from apache_beam.utils.annotations import deprecated
from apache_beam.utils.annotations import experimental

try:
from apache_beam.io.gcp.internal.clients.bigquery import DatasetReference
from apache_beam.io.gcp.internal.clients.bigquery import TableReference
except ImportError:
DatasetReference = None
TableReference = None

__all__ = [
'TableRowJsonCoder',
Expand All @@ -292,6 +342,8 @@ def compute_table_name(row):
'BigQuerySink',
'WriteToBigQuery',
'ReadFromBigQuery',
'ReadFromBigQueryRequest',
'ReadAllFromBigQuery',
'SCHEMA_AUTODETECT',
]

Expand Down Expand Up @@ -591,84 +643,6 @@ def reader(self, test_bigquery_client=None):
kms_key=self.kms_key)


FieldSchema = collections.namedtuple('FieldSchema', 'fields mode name type')


class _JsonToDictCoder(coders.Coder):
"""A coder for a JSON string to a Python dict."""
def __init__(self, table_schema):
self.fields = self._convert_to_tuple(table_schema.fields)
self._converters = {
'INTEGER': int,
'INT64': int,
'FLOAT': float,
'FLOAT64': float,
'NUMERIC': self._to_decimal,
'BYTES': self._to_bytes,
}

@staticmethod
def _to_decimal(value):
return decimal.Decimal(value)

@staticmethod
def _to_bytes(value):
"""Converts value from str to bytes on Python 3.x. Does nothing on
Python 2.7."""
return value.encode('utf-8')

@classmethod
def _convert_to_tuple(cls, table_field_schemas):
"""Recursively converts the list of TableFieldSchema instances to the
list of tuples to prevent errors when pickling and unpickling
TableFieldSchema instances.
"""
if not table_field_schemas:
return []

return [
FieldSchema(cls._convert_to_tuple(x.fields), x.mode, x.name, x.type)
for x in table_field_schemas
]

def decode(self, value):
value = json.loads(value.decode('utf-8'))
return self._decode_with_schema(value, self.fields)

def _decode_with_schema(self, value, schema_fields):
for field in schema_fields:
if field.name not in value:
# The field exists in the schema, but it doesn't exist in this row.
# It probably means its value was null, as the extract to JSON job
# doesn't preserve null fields
value[field.name] = None
continue

if field.type == 'RECORD':
nested_values = value[field.name]
if field.mode == 'REPEATED':
for i, nested_value in enumerate(nested_values):
nested_values[i] = self._decode_with_schema(
nested_value, field.fields)
else:
value[field.name] = self._decode_with_schema(
nested_values, field.fields)
else:
try:
converter = self._converters[field.type]
value[field.name] = converter(value[field.name])
except KeyError:
# No need to do any conversion
pass
return value

def is_deterministic(self):
return True

def to_type_hint(self):
return dict


class _CustomBigQuerySource(BoundedSource):
def __init__(
self,
Expand Down Expand Up @@ -720,7 +694,7 @@ def __init__(
self.bigquery_job_labels = bigquery_job_labels or {}
self.use_json_exports = use_json_exports
self.temp_dataset = temp_dataset
self._job_name = job_name or 'AUTOMATIC_JOB_NAME'
self._job_name = job_name or 'BQ_EXPORT_JOB'
self._step_name = step_name
self._source_uuid = unique_id

Expand Down Expand Up @@ -1666,7 +1640,7 @@ def _compute_method(self, experiments, is_streaming_pipeline):
def expand(self, pcoll):
p = pcoll.pipeline

if (isinstance(self.table_reference, bigquery.TableReference) and
if (isinstance(self.table_reference, TableReference) and
self.table_reference.projectId is None):
self.table_reference.projectId = pcoll.pipeline.options.view_as(
GoogleCloudOptions).project
Expand Down Expand Up @@ -1878,6 +1852,7 @@ class ReadFromBigQuery(PTransform):
https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro\
#avro_conversions
"""

COUNTER = 0

def __init__(self, gcs_location=None, *args, **kwargs):
Expand All @@ -1897,7 +1872,7 @@ def __init__(self, gcs_location=None, *args, **kwargs):
self._kwargs = kwargs

def expand(self, pcoll):
unique_id = str(uuid.uuid4())[0:10]
# TODO(BEAM-11115): Make ReadFromBQ rely on ReadAllFromBQ implementation.
temp_location = pcoll.pipeline.options.view_as(
GoogleCloudOptions).temp_location
job_name = pcoll.pipeline.options.view_as(GoogleCloudOptions).job_name
Expand Down Expand Up @@ -1931,3 +1906,132 @@ def file_path_to_remove(unused_elm):
*self._args,
**self._kwargs))
| _PassThroughThenCleanup(files_to_remove_pcoll))


class ReadFromBigQueryRequest:
"""
Class that defines data to read from BQ.
"""
def __init__(
self,
query: str = None,
use_standard_sql: bool = True,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about other args here ? https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py#L1823
(probably this can be a followup PR)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

args are split between the read request and the transform. I would say all args exist in either.

table: Union[str, TableReference] = None,
flatten_results: bool = False):
"""
Only one of query or table should be specified.

:param query: SQL query to fetch data.
:param use_standard_sql:
Specifies whether to use BigQuery's standard SQL dialect for this query.
The default value is :data:`True`. If set to :data:`False`,
the query will use BigQuery's legacy SQL dialect.
This parameter is ignored for table inputs.
:param table:
The ID of the table to read. The ID must contain only letters
``a-z``, ``A-Z``, numbers ``0-9``, or underscores ``_``. Table should
define project and dataset (ex.: ``'PROJECT:DATASET.TABLE'``).
:param flatten_results:
Flattens all nested and repeated fields in the query results.
The default value is :data:`False`.
"""
self.flatten_results = flatten_results
self.query = query
self.use_standard_sql = use_standard_sql
self.table = table
self.validate()

# We use this internal object ID to generate BigQuery export directories.
self.obj_id = random.randint(0, 100000)

def validate(self):
if self.table is not None and self.query is not None:
raise ValueError(
'Both a BigQuery table and a query were specified.'
' Please specify only one of these.')
elif self.table is None and self.query is None:
raise ValueError('A BigQuery table or a query must be specified')
if self.table is not None:
if isinstance(self.table, str):
assert self.table.find('.'), (
'Expected a table reference '
'(PROJECT:DATASET.TABLE or DATASET.TABLE) instead of %s'
% self.table)


@experimental()
class ReadAllFromBigQuery(PTransform):
"""Read data from BigQuery.

PTransform:ReadFromBigQueryRequest->Rows

This PTransform uses a BigQuery export job to take a snapshot of the table
on GCS, and then reads from each produced file. Data is exported into
a new subdirectory for each export using UUIDs generated in
`ReadFromBigQueryRequest` objects.

It is recommended not to use this PTransform for streaming jobs on
GlobalWindow, since it will not be able to cleanup snapshots.

Args:
gcs_location (str): The name of the Google Cloud Storage
bucket where the extracted table should be written as a string. If
:data:`None`, then the temp_location parameter is used.
validate (bool): If :data:`True`, various checks will be done when source
gets initialized (e.g., is table present?).
kms_key (str): Experimental. Optional Cloud KMS key name for use when
creating new temporary tables.
"""
COUNTER = 0

def __init__(
self,
gcs_location: Union[str, ValueProvider] = None,
validate: bool = False,
kms_key: str = None,
temp_dataset: Union[str, DatasetReference] = None,
bigquery_job_labels: Dict[str, str] = None):
if gcs_location:
if not isinstance(gcs_location, (str, ValueProvider)):
raise TypeError(
'%s: gcs_location must be of type string'
' or ValueProvider; got %r instead' %
(self.__class__.__name__, type(gcs_location)))

self.gcs_location = gcs_location
self.validate = validate
self.kms_key = kms_key
self.bigquery_job_labels = bigquery_job_labels
self.temp_dataset = temp_dataset

def expand(self, pcoll):
job_name = pcoll.pipeline.options.view_as(GoogleCloudOptions).job_name
project = pcoll.pipeline.options.view_as(GoogleCloudOptions).project
unique_id = str(uuid.uuid4())[0:10]

try:
step_name = self.label
except AttributeError:
step_name = 'ReadAllFromBigQuery_%d' % ReadAllFromBigQuery.COUNTER
ReadAllFromBigQuery.COUNTER += 1

sources_to_read, cleanup_locations = (
pcoll
| beam.ParDo(
_BigQueryReadSplit(
options=pcoll.pipeline.options,
gcs_location=self.gcs_location,
bigquery_job_labels=self.bigquery_job_labels,
job_name=job_name,
step_name=step_name,
unique_id=unique_id,
kms_key=self.kms_key,
project=project,
temp_dataset=self.temp_dataset)).with_outputs(
"location_to_cleanup", main="files_to_read")
)

return (
sources_to_read
| SDFBoundedSourceReader()
| _PassThroughThenCleanup(beam.pvalue.AsIter(cleanup_locations)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might need a reshuffle before this cleanup step to make sure that above read is stable in case of a failure (unless we already have a reshuffle inside one of these transforms).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fusion break comes form the side input from AsIter. The files to cleaned up are passed as a side input to a downstream transform, which only executes after the previous is 'committed'

Loading