Skip to content

Commit

Permalink
Remove obsolete and deprecated bigquery native read. (#23557)
Browse files Browse the repository at this point in the history
Defers to builtin read with a warning rather than raising an exception.
  • Loading branch information
robertwb authored Oct 11, 2022
1 parent 5595f40 commit 3ffdf8d
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 768 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
* Decreased TextSource CPU utilization by 2.3x (Java) ([#23193](https://github.com/apache/beam/issues/23193)).
* Fixed bug when using SpannerIO with RuntimeValueProvider options (Java) ([#22146](https://github.com/apache/beam/issues/22146)).
* Fixed issue for unicode rendering on WriteToBigQuery ([#10785](https://github.com/apache/beam/issues/10785))
* Remove obsolete variant of BigQuery Read ([#23559](https://github.com/apache/beam/issues/23559)).
* Bumped google-cloud-spanner dependency version to 3.x for Python SDK ([#21198](https://github.com/apache/beam/issues/21198)).

## New Features / Improvements
Expand Down
173 changes: 20 additions & 153 deletions sdks/python/apache_beam/io/gcp/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,7 @@ def chain_after(result):
import random
import time
import uuid
import warnings
from dataclasses import dataclass
from typing import Dict
from typing import List
Expand Down Expand Up @@ -482,11 +483,6 @@ def RowAsDictJsonCoder(*args, **kwargs):
return bigquery_tools.RowAsDictJsonCoder(*args, **kwargs)


@deprecated(since='2.11.0', current="bigquery_tools.BigQueryReader")
def BigQueryReader(*args, **kwargs):
return bigquery_tools.BigQueryReader(*args, **kwargs)


@deprecated(since='2.11.0', current="bigquery_tools.BigQueryWriter")
def BigQueryWriter(*args, **kwargs):
return bigquery_tools.BigQueryWriter(*args, **kwargs)
Expand Down Expand Up @@ -591,158 +587,29 @@ def BigQuerySource(
kms_key=None,
use_dataflow_native_source=False):
if use_dataflow_native_source:
return _BigQuerySource(
table,
dataset,
project,
query,
validate,
coder,
use_standard_sql,
flatten_results,
kms_key)
else:
return ReadFromBigQuery(
table=table,
dataset=dataset,
project=project,
query=query,
validate=validate,
coder=coder,
use_standard_sql=use_standard_sql,
flatten_results=flatten_results,
use_json_exports=True,
kms_key=kms_key)
warnings.warn(
"Native sources no longer implemented; "
"falling back to standard Beam source.")
return ReadFromBigQuery(
table=table,
dataset=dataset,
project=project,
query=query,
validate=validate,
coder=coder,
use_standard_sql=use_standard_sql,
flatten_results=flatten_results,
use_json_exports=True,
kms_key=kms_key)


@deprecated(since='2.25.0', current="ReadFromBigQuery")
class _BigQuerySource(dataflow_io.NativeSource):
def _BigQuerySource(*args, **kwargs):
"""A source based on a BigQuery table."""
def __init__(
self,
table=None,
dataset=None,
project=None,
query=None,
validate=False,
coder=None,
use_standard_sql=False,
flatten_results=True,
kms_key=None,
temp_dataset=None):
"""Initialize a :class:`BigQuerySource`.
Args:
table (str): The ID of a BigQuery table. If specified all data of the
table will be used as input of the current source. The ID must contain
only letters ``a-z``, ``A-Z``, numbers ``0-9``, or underscores
``_``. If dataset and query arguments are :data:`None` then the table
argument must contain the entire table reference specified as:
``'DATASET.TABLE'`` or ``'PROJECT:DATASET.TABLE'``.
dataset (str): The ID of the dataset containing this table or
:data:`None` if the table reference is specified entirely by the table
argument or a query is specified.
project (str): The ID of the project containing this table or
:data:`None` if the table reference is specified entirely by the table
argument or a query is specified.
query (str): A query to be used instead of arguments table, dataset, and
project.
validate (bool): If :data:`True`, various checks will be done when source
gets initialized (e.g., is table present?). This should be
:data:`True` for most scenarios in order to catch errors as early as
possible (pipeline construction instead of pipeline execution). It
should be :data:`False` if the table is created during pipeline
execution by a previous step.
coder (~apache_beam.coders.coders.Coder): The coder for the table
rows if serialized to disk. If :data:`None`, then the default coder is
:class:`~apache_beam.io.gcp.bigquery_tools.RowAsDictJsonCoder`,
which will interpret every line in a file as a JSON serialized
dictionary. This argument needs a value only in special cases when
returning table rows as dictionaries is not desirable.
use_standard_sql (bool): Specifies whether to use BigQuery's standard SQL
dialect for this query. The default value is :data:`False`.
If set to :data:`True`, the query will use BigQuery's updated SQL
dialect with improved standards compliance.
This parameter is ignored for table inputs.
flatten_results (bool): Flattens all nested and repeated fields in the
query results. The default value is :data:`True`.
kms_key (str): Optional Cloud KMS key name for use when creating new
tables.
temp_dataset (``google.cloud.bigquery.dataset.DatasetReference``):
The dataset in which to create temporary tables when performing file
loads. By default, a new dataset is created in the execution project for
temporary tables.
Raises:
ValueError: if any of the following is true:
1) the table reference as a string does not match the expected format
2) neither a table nor a query is specified
3) both a table and a query is specified.
"""

# Import here to avoid adding the dependency for local running scenarios.
try:
# pylint: disable=wrong-import-order, wrong-import-position
from apitools.base import py # pylint: disable=unused-import
except ImportError:
raise ImportError(
'Google Cloud IO not available, '
'please install apache_beam[gcp]')

if table is not None and query is not None:
raise ValueError(
'Both a BigQuery table and a query were specified.'
' Please specify only one of these.')
elif table is None and query is None:
raise ValueError('A BigQuery table or a query must be specified')
elif table is not None:
self.table_reference = bigquery_tools.parse_table_reference(
table, dataset, project)
self.query = None
self.use_legacy_sql = True
else:
self.query = query
# TODO(BEAM-1082): Change the internal flag to be standard_sql
self.use_legacy_sql = not use_standard_sql
self.table_reference = None

self.validate = validate
self.flatten_results = flatten_results
self.coder = coder or bigquery_tools.RowAsDictJsonCoder()
self.kms_key = kms_key
self.temp_dataset = temp_dataset

def display_data(self):
if self.query is not None:
res = {'query': DisplayDataItem(self.query, label='Query')}
else:
if self.table_reference.projectId is not None:
tableSpec = '{}:{}.{}'.format(
self.table_reference.projectId,
self.table_reference.datasetId,
self.table_reference.tableId)
else:
tableSpec = '{}.{}'.format(
self.table_reference.datasetId, self.table_reference.tableId)
res = {'table': DisplayDataItem(tableSpec, label='Table')}

res['validation'] = DisplayDataItem(
self.validate, label='Validation Enabled')
return res

@property
def format(self):
"""Source format name required for remote execution."""
return 'bigquery'

def reader(self, test_bigquery_client=None):
return bigquery_tools.BigQueryReader(
source=self,
test_bigquery_client=test_bigquery_client,
use_legacy_sql=self.use_legacy_sql,
flatten_results=self.flatten_results,
kms_key=self.kms_key)
warnings.warn(
"Native sources no longer implemented; "
"falling back to standard Beam source.")
return ReadFromBigQuery(*args, **kwargs)


# TODO(https://github.com/apache/beam/issues/21622): remove the serialization
Expand Down
114 changes: 0 additions & 114 deletions sdks/python/apache_beam/io/gcp/bigquery_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,120 +187,6 @@ def test_invalid_json_neg_inf(self):
self.json_compliance_exception(float('-inf'))


@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class TestBigQuerySource(unittest.TestCase):
def test_display_data_item_on_validate_true(self):
source = beam.io.BigQuerySource(
'dataset.table', validate=True, use_dataflow_native_source=True)

dd = DisplayData.create_from(source)
expected_items = [
DisplayDataItemMatcher('validation', True),
DisplayDataItemMatcher('table', 'dataset.table')
]
hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))

def test_table_reference_display_data(self):
source = beam.io.BigQuerySource(
'dataset.table', use_dataflow_native_source=True)
dd = DisplayData.create_from(source)
expected_items = [
DisplayDataItemMatcher('validation', False),
DisplayDataItemMatcher('table', 'dataset.table')
]
hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))

source = beam.io.BigQuerySource(
'project:dataset.table', use_dataflow_native_source=True)
dd = DisplayData.create_from(source)
expected_items = [
DisplayDataItemMatcher('validation', False),
DisplayDataItemMatcher('table', 'project:dataset.table')
]
hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))

source = beam.io.BigQuerySource(
'xyz.com:project:dataset.table', use_dataflow_native_source=True)
dd = DisplayData.create_from(source)
expected_items = [
DisplayDataItemMatcher('validation', False),
DisplayDataItemMatcher('table', 'xyz.com:project:dataset.table')
]
hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))

def test_parse_table_reference(self):
source = beam.io.BigQuerySource(
'dataset.table', use_dataflow_native_source=True)
self.assertEqual(source.table_reference.datasetId, 'dataset')
self.assertEqual(source.table_reference.tableId, 'table')

source = beam.io.BigQuerySource(
'project:dataset.table', use_dataflow_native_source=True)
self.assertEqual(source.table_reference.projectId, 'project')
self.assertEqual(source.table_reference.datasetId, 'dataset')
self.assertEqual(source.table_reference.tableId, 'table')

source = beam.io.BigQuerySource(
'xyz.com:project:dataset.table', use_dataflow_native_source=True)
self.assertEqual(source.table_reference.projectId, 'xyz.com:project')
self.assertEqual(source.table_reference.datasetId, 'dataset')
self.assertEqual(source.table_reference.tableId, 'table')

source = beam.io.BigQuerySource(
query='my_query', use_dataflow_native_source=True)
self.assertEqual(source.query, 'my_query')
self.assertIsNone(source.table_reference)
self.assertTrue(source.use_legacy_sql)

def test_query_only_display_data(self):
source = beam.io.BigQuerySource(
query='my_query', use_dataflow_native_source=True)
dd = DisplayData.create_from(source)
expected_items = [
DisplayDataItemMatcher('validation', False),
DisplayDataItemMatcher('query', 'my_query')
]
hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))

def test_specify_query_sql_format(self):
source = beam.io.BigQuerySource(
query='my_query',
use_standard_sql=True,
use_dataflow_native_source=True)
self.assertEqual(source.query, 'my_query')
self.assertFalse(source.use_legacy_sql)

def test_specify_query_flattened_records(self):
source = beam.io.BigQuerySource(
query='my_query',
flatten_results=False,
use_dataflow_native_source=True)
self.assertFalse(source.flatten_results)

def test_specify_query_unflattened_records(self):
source = beam.io.BigQuerySource(
query='my_query', flatten_results=True, use_dataflow_native_source=True)
self.assertTrue(source.flatten_results)

def test_specify_query_without_table(self):
source = beam.io.BigQuerySource(
query='my_query', use_dataflow_native_source=True)
self.assertEqual(source.query, 'my_query')
self.assertIsNone(source.table_reference)

def test_date_partitioned_table_name(self):
source = beam.io.BigQuerySource(
'dataset.table$20030102',
validate=True,
use_dataflow_native_source=True)
dd = DisplayData.create_from(source)
expected_items = [
DisplayDataItemMatcher('validation', True),
DisplayDataItemMatcher('table', 'dataset.table$20030102')
]
hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))


@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class TestJsonToDictCoder(unittest.TestCase):
@staticmethod
Expand Down
Loading

0 comments on commit 3ffdf8d

Please sign in to comment.