Add Lineage metrics to Python BigQueryIO (#32116)
* Add Lineage metrics to Python BigQueryIO

* Introduce metric.Lineage StringSet wrapper
  Reflect Java SDK #32090

* Direct Read

* Export Read

* ReadAllFromBigQuery

* FILE_LOAD Write

* fix lint; add tests

* Consistent metrics name

* Update sdks/python/apache_beam/metrics/metric.py

Co-authored-by: Danny McCormick <[email protected]>

---------

Co-authored-by: Danny McCormick <[email protected]>
Abacn and damccorm authored Aug 14, 2024
1 parent 7a4850d commit c23e603
Showing 8 changed files with 192 additions and 14 deletions.
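
All of the changes below follow the same pattern: each read or write path reports the fully resolved BigQuery table through the new Lineage wrapper, and the tests read the reported values back from the pipeline's metric results. The sketch below is a minimal, hypothetical illustration of that pattern, not code from this commit; the ReportSourceTable DoFn and the table names are invented for the example, and it assumes a Beam build that already contains this change.

import apache_beam as beam
from apache_beam.metrics.metric import Lineage


class ReportSourceTable(beam.DoFn):
  """Illustrative DoFn that reports a BigQuery table as a lineage source."""
  def process(self, element):
    project, dataset, table = element
    # Mirrors the calls this commit adds to BigQueryIO; write paths use
    # Lineage.sinks().add(...) in the same way.
    Lineage.sources().add('bigquery', project, dataset, table)
    yield element


with beam.Pipeline() as p:
  _ = (
      p
      | beam.Create([('my-project', 'my_dataset', 'my_table')])
      | beam.ParDo(ReportSourceTable()))

# Lineage entries come back from the metric results as flat strings, e.g.
# {'bigquery:my-project.my_dataset.my_table'}, which is what the new tests
# below assert against.
print(Lineage.query(p.result.metrics(), Lineage.SOURCE))
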
11 changes: 11 additions & 0 deletions sdks/python/apache_beam/io/gcp/bigquery.py
@@ -400,6 +400,7 @@ def chain_after(result):
from apache_beam.io.iobase import SourceBundle
from apache_beam.io.textio import _TextSource as TextSource
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import Lineage
from apache_beam.options import value_provider as vp
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
@@ -809,6 +810,11 @@ def split(self, desired_bundle_size, start_position=None, stop_position=None):
self.table_reference.get(), project=self._get_project())
elif not self.table_reference.projectId:
self.table_reference.projectId = self._get_project()
Lineage.sources().add(
'bigquery',
self.table_reference.projectId,
self.table_reference.datasetId,
self.table_reference.tableId)

schema, metadata_list = self._export_files(bq)
self.export_result = _BigQueryExportResult(
@@ -1157,6 +1163,11 @@ def split(self, desired_bundle_size, start_position=None, stop_position=None):
self.table_reference.projectId,
self.table_reference.datasetId,
self.table_reference.tableId)
Lineage.sources().add(
"bigquery",
self.table_reference.projectId,
self.table_reference.datasetId,
self.table_reference.tableId)

if self.use_native_datetime:
requested_session.data_format = bq_storage.types.DataFormat.ARROW
12 changes: 12 additions & 0 deletions sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -40,6 +40,7 @@
from apache_beam.io import filesystems as fs
from apache_beam.io.gcp import bigquery_tools
from apache_beam.io.gcp.bigquery_io_metadata import create_bigquery_io_metadata
from apache_beam.metrics.metric import Lineage
from apache_beam.options import value_provider as vp
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.transforms import trigger
@@ -564,6 +565,11 @@ def process_one(self, element, job_name_prefix):
write_disposition = self.write_disposition
wait_for_job = True
self._observed_tables.add(copy_to_reference.tableId)
Lineage.sinks().add(
'bigquery',
copy_to_reference.projectId,
copy_to_reference.datasetId,
copy_to_reference.tableId)
else:
wait_for_job = False
write_disposition = 'WRITE_APPEND'
@@ -735,6 +741,12 @@ def process(
yield pvalue.TaggedOutput(
TriggerLoadJobs.TEMP_TABLES,
bigquery_tools.get_hashable_destination(table_reference))
else:
Lineage.sinks().add(
'bigquery',
table_reference.projectId,
table_reference.datasetId,
table_reference.tableId)

_LOGGER.info(
'Triggering job %s to load data to BigQuery table %s.'
10 changes: 10 additions & 0 deletions sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
@@ -42,6 +42,7 @@
from apache_beam.io.gcp.internal.clients import bigquery as bigquery_api
from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultMatcher
from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultStreamingMatcher
from apache_beam.metrics.metric import Lineage
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.runners.dataflow.test_dataflow_runner import TestDataflowRunner
@@ -510,6 +511,9 @@ def test_load_job_id_used(self):
| "GetJobs" >> beam.Map(lambda x: x[1])

assert_that(jobs, equal_to([job_reference]), label='CheckJobProjectIds')
self.assertSetEqual(
Lineage.query(p.result.metrics(), Lineage.SINK),
set(["bigquery:project1.dataset1.table1"]))

def test_load_job_id_use_for_copy_job(self):
destination = 'project1:dataset1.table1'
@@ -563,6 +567,9 @@ def test_load_job_id_use_for_copy_job(self):
job_reference
]),
label='CheckCopyJobProjectIds')
self.assertSetEqual(
Lineage.query(p.result.metrics(), Lineage.SINK),
set(["bigquery:project1.dataset1.table1"]))

@mock.patch('time.sleep')
def test_wait_for_load_job_completion(self, sleep_mock):
@@ -725,6 +732,9 @@ def test_multiple_partition_files(self):
copy_jobs | "CountCopyJobs" >> combiners.Count.Globally(),
equal_to([6]),
label='CheckCopyJobCount')
self.assertSetEqual(
Lineage.query(p.result.metrics(), Lineage.SINK),
set(["bigquery:project1.dataset1.table1"]))

@parameterized.expand([
param(write_disposition=BigQueryDisposition.WRITE_TRUNCATE),
7 changes: 7 additions & 0 deletions sdks/python/apache_beam/io/gcp/bigquery_read_internal.py
@@ -44,6 +44,7 @@
from apache_beam.io.gcp.bigquery_io_metadata import create_bigquery_io_metadata
from apache_beam.io.iobase import BoundedSource
from apache_beam.io.textio import _TextSource
from apache_beam.metrics.metric import Lineage
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.value_provider import ValueProvider
@@ -261,6 +262,12 @@ def process(self,
for metadata in metadata_list:
yield self._create_source(metadata.path, schema)

Lineage.sources().add(
'bigquery',
table_reference.projectId,
table_reference.datasetId,
table_reference.tableId)

if element.query is not None:
self.bq._delete_table(
table_reference.projectId,
18 changes: 6 additions & 12 deletions sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py
@@ -136,12 +136,11 @@ def test_bad_schema_public_api_export(self, get_table):
with self.assertRaisesRegex(ValueError,
"Encountered an unsupported type: 'DOUBLE'"):
p = apache_beam.Pipeline()
pipeline = p | apache_beam.io.gcp.bigquery.ReadFromBigQuery(
_ = p | apache_beam.io.gcp.bigquery.ReadFromBigQuery(
table="dataset.sample_table",
method="EXPORT",
project="project",
output_type='BEAM_ROW')
pipeline

@mock.patch.object(BigQueryWrapper, 'get_table')
def test_bad_schema_public_api_direct_read(self, get_table):
@@ -159,21 +158,19 @@ def test_bad_schema_public_api_direct_read(self, get_table):
with self.assertRaisesRegex(ValueError,
"Encountered an unsupported type: 'DOUBLE'"):
p = apache_beam.Pipeline()
pipeline = p | apache_beam.io.gcp.bigquery.ReadFromBigQuery(
_ = p | apache_beam.io.gcp.bigquery.ReadFromBigQuery(
table="dataset.sample_table",
method="DIRECT_READ",
project="project",
output_type='BEAM_ROW')
pipeline

def test_unsupported_value_provider(self):
with self.assertRaisesRegex(TypeError,
'ReadFromBigQuery: table must be of type string'
'; got ValueProvider instead'):
p = apache_beam.Pipeline()
pipeline = p | apache_beam.io.gcp.bigquery.ReadFromBigQuery(
_ = p | apache_beam.io.gcp.bigquery.ReadFromBigQuery(
table=value_provider.ValueProvider(), output_type='BEAM_ROW')
pipeline

def test_unsupported_callable(self):
def filterTable(table):
@@ -185,35 +182,32 @@ def filterTable(table):
'ReadFromBigQuery: table must be of type string'
'; got a callable instead'):
p = apache_beam.Pipeline()
pipeline = p | apache_beam.io.gcp.bigquery.ReadFromBigQuery(
_ = p | apache_beam.io.gcp.bigquery.ReadFromBigQuery(
table=res, output_type='BEAM_ROW')
pipeline

def test_unsupported_query_export(self):
with self.assertRaisesRegex(
ValueError,
"Both a query and an output type of 'BEAM_ROW' were specified. "
"'BEAM_ROW' is not currently supported with queries."):
p = apache_beam.Pipeline()
pipeline = p | apache_beam.io.gcp.bigquery.ReadFromBigQuery(
_ = p | apache_beam.io.gcp.bigquery.ReadFromBigQuery(
table="project:dataset.sample_table",
method="EXPORT",
query='SELECT name FROM dataset.sample_table',
output_type='BEAM_ROW')
pipeline

def test_unsupported_query_direct_read(self):
with self.assertRaisesRegex(
ValueError,
"Both a query and an output type of 'BEAM_ROW' were specified. "
"'BEAM_ROW' is not currently supported with queries."):
p = apache_beam.Pipeline()
pipeline = p | apache_beam.io.gcp.bigquery.ReadFromBigQuery(
_ = p | apache_beam.io.gcp.bigquery.ReadFromBigQuery(
table="project:dataset.sample_table",
method="DIRECT_READ",
query='SELECT name FROM dataset.sample_table',
output_type='BEAM_ROW')
pipeline

if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
43 changes: 43 additions & 0 deletions sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -50,6 +50,7 @@
from apache_beam.io.gcp.bigquery import TableRowJsonCoder
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.io.gcp.bigquery import _StreamToBigQuery
from apache_beam.io.gcp.bigquery_read_internal import _BigQueryReadSplit
from apache_beam.io.gcp.bigquery_read_internal import _JsonToDictCoder
from apache_beam.io.gcp.bigquery_read_internal import bigquery_export_destination_uri
from apache_beam.io.gcp.bigquery_tools import JSON_COMPLIANCE_ERROR
@@ -61,6 +62,7 @@
from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultMatcher
from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultStreamingMatcher
from apache_beam.io.gcp.tests.bigquery_matcher import BigQueryTableMatcher
from apache_beam.metrics.metric import Lineage
from apache_beam.options import value_provider
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
@@ -85,9 +87,11 @@
from apitools.base.py.exceptions import HttpError
from apitools.base.py.exceptions import HttpForbiddenError
from google.cloud import bigquery as gcp_bigquery
from google.cloud import bigquery_storage_v1 as bq_storage
from google.api_core import exceptions
except ImportError:
gcp_bigquery = None
bq_storage = None
HttpError = None
HttpForbiddenError = None
exceptions = None
@@ -460,6 +464,8 @@ def test_create_temp_dataset_exception(self, exception_type, error_message):
self.assertIn(error_message, exc.exception.args[0])

@parameterized.expand([
# read without exception
param(responses=[], expected_retries=0),
# first attempt returns a Http 500 blank error and retries
# second attempt returns a Http 408 blank error and retries,
# third attempt passes
@@ -540,6 +546,9 @@ def store_callback(unused_request):
# metadata (numBytes), and once to retrieve the table's schema
# Any additional calls are retries
self.assertEqual(expected_retries, mock_get_table.call_count - 2)
self.assertSetEqual(
Lineage.query(p.result.metrics(), Lineage.SOURCE),
set(["bigquery:project.dataset.table"]))

@parameterized.expand([
# first attempt returns a Http 429 with transient reason and retries
@@ -719,6 +728,40 @@ def test_read_export_exception(self, exception_type, error_message):
mock_query_job.assert_called()
self.assertIn(error_message, exc.exception.args[0])

def test_read_direct_lineage(self):
with mock.patch.object(bigquery_tools.BigQueryWrapper,
'_bigquery_client'),\
mock.patch.object(bq_storage.BigQueryReadClient,
'create_read_session'),\
beam.Pipeline() as p:

_ = p | ReadFromBigQuery(
method=ReadFromBigQuery.Method.DIRECT_READ,
table='project:dataset.table')
self.assertSetEqual(
Lineage.query(p.result.metrics(), Lineage.SOURCE),
set(["bigquery:project.dataset.table"]))

def test_read_all_lineage(self):
with mock.patch.object(_BigQueryReadSplit, '_export_files') as export, \
beam.Pipeline() as p:

export.return_value = (None, [])

_ = (
p
| beam.Create([
beam.io.ReadFromBigQueryRequest(table='project1:dataset1.table1'),
beam.io.ReadFromBigQueryRequest(table='project2:dataset2.table2')
])
| beam.io.ReadAllFromBigQuery(gcs_location='gs://bucket/tmp'))
self.assertSetEqual(
Lineage.query(p.result.metrics(), Lineage.SOURCE),
set([
'bigquery:project1.dataset1.table1',
'bigquery:project2.dataset2.table2'
]))


@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class TestBigQuerySink(unittest.TestCase):
