From 3c564f8f82bc01d04b12170a0cb45430e082ce8c Mon Sep 17 00:00:00 2001
From: Benjamin Gonzalez <74670721+benWize@users.noreply.github.com>
Date: Thu, 30 Sep 2021 17:46:14 -0500
Subject: [PATCH] [BEAM-11985] Python Bigtable - Implement IO Request Count metrics (#15562)

Co-authored-by: Alik Rodriguez
---
 .../examples/cookbook/bigtableio_it_test.py   |   4 +-
 .../apache_beam/internal/metrics/metric.py    |  40 +++++
 sdks/python/apache_beam/io/gcp/bigtableio.py  |  58 +++++++-
 .../apache_beam/io/gcp/bigtableio_test.py     | 137 ++++++++++++++++++
 .../io/gcp/resource_identifiers.py            |   5 +
 .../apache_beam/metrics/monitoring_infos.py   |   5 +
 6 files changed, 246 insertions(+), 3 deletions(-)
 create mode 100644 sdks/python/apache_beam/io/gcp/bigtableio_test.py

diff --git a/sdks/python/apache_beam/examples/cookbook/bigtableio_it_test.py b/sdks/python/apache_beam/examples/cookbook/bigtableio_it_test.py
index f12c9457d53f..ffe82caba1ac 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigtableio_it_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigtableio_it_test.py
@@ -179,8 +179,8 @@ def test_bigtable_write(self):
     with beam.Pipeline(options=pipeline_options) as pipeline:
       config_data = {
           'project_id': self.project,
-          'instance_id': self.instance,
-          'table_id': self.table
+          'instance_id': self.instance_id,
+          'table_id': self.table_id
       }
       _ = (
           pipeline
diff --git a/sdks/python/apache_beam/internal/metrics/metric.py b/sdks/python/apache_beam/internal/metrics/metric.py
index 0510674d5431..c149fe1ebebd 100644
--- a/sdks/python/apache_beam/internal/metrics/metric.py
+++ b/sdks/python/apache_beam/internal/metrics/metric.py
@@ -223,3 +223,43 @@ def convert_to_canonical_status_string(self, status):
         http_status_code in http_to_canonical_gcp_status):
       return http_to_canonical_gcp_status[http_status_code]
     return str(http_status_code)
+
+  @staticmethod
+  def bigtable_error_code_to_grpc_status_string(grpc_status_code):
+    # type: (int) -> str
+
+    """
+    Converts the Bigtable error code to a canonical GCP status code string.
+
+    The Bigtable client library does not use the canonical HTTP status code
+    values (i.e. https://cloud.google.com/apis/design/errors).
+    Instead, statuses are numbered with an enum whose values correspond to
+    the codes listed at https://cloud.google.com/bigtable/docs/status-codes
+
+    Args:
+      grpc_status_code: An int that corresponds to an enum of status codes
+
+    Returns:
+      A GCP status code string
+    """
+    grpc_to_canonical_gcp_status = {
+        0: 'ok',
+        1: 'cancelled',
+        2: 'unknown',
+        3: 'invalid_argument',
+        4: 'deadline_exceeded',
+        5: 'not_found',
+        6: 'already_exists',
+        7: 'permission_denied',
+        8: 'resource_exhausted',
+        9: 'failed_precondition',
+        10: 'aborted',
+        11: 'out_of_range',
+        12: 'unimplemented',
+        13: 'internal',
+        14: 'unavailable'
+    }
+    if (grpc_status_code is not None and
+        grpc_status_code in grpc_to_canonical_gcp_status):
+      return grpc_to_canonical_gcp_status[grpc_status_code]
+    return str(grpc_status_code)
diff --git a/sdks/python/apache_beam/io/gcp/bigtableio.py b/sdks/python/apache_beam/io/gcp/bigtableio.py
index 4887c11f5b67..0ae1f7c37f19 100644
--- a/sdks/python/apache_beam/io/gcp/bigtableio.py
+++ b/sdks/python/apache_beam/io/gcp/bigtableio.py
@@ -40,13 +40,38 @@
 import logging
 
 import apache_beam as beam
+from apache_beam.internal.metrics.metric import ServiceCallMetric
+from apache_beam.io.gcp import resource_identifiers
 from apache_beam.metrics import Metrics
+from apache_beam.metrics import monitoring_infos
 from apache_beam.transforms.display import DisplayDataItem
 
 _LOGGER = logging.getLogger(__name__)
 
 try:
   from google.cloud.bigtable import Client
+  from google.cloud.bigtable.batcher import MutationsBatcher
+
+  FLUSH_COUNT = 1000
+  MAX_ROW_BYTES = 5242880  # 5MB
+
+  class _MutationsBatcher(MutationsBatcher):
+    def __init__(
+        self, table, flush_count=FLUSH_COUNT, max_row_bytes=MAX_ROW_BYTES):
+      super(_MutationsBatcher, self).__init__(table, flush_count, max_row_bytes)
+      self.rows = []
+
+    def set_flush_callback(self, callback_fn):
+      self.callback_fn = callback_fn
+
+    def flush(self):
+      if len(self.rows) != 0:
+        rows = self.table.mutate_rows(self.rows)
+        self.callback_fn(rows)
+        self.total_mutation_count = 0
+        self.total_size = 0
+        self.rows = []
+
 except ImportError:
   _LOGGER.warning(
       'ImportError: from google.cloud.bigtable import Client', exc_info=True)
@@ -78,6 +103,7 @@ def __init__(self, project_id, instance_id, table_id):
     }
     self.table = None
     self.batcher = None
+    self.service_call_metric = None
     self.written = Metrics.counter(self.__class__, 'Written Row')
 
   def __getstate__(self):
@@ -87,14 +113,44 @@ def __setstate__(self, options):
     self.beam_options = options
     self.table = None
     self.batcher = None
+    self.service_call_metric = None
     self.written = Metrics.counter(self.__class__, 'Written Row')
 
+  def write_mutate_metrics(self, rows):
+    for status in rows:
+      grpc_status_string = (
+          ServiceCallMetric.bigtable_error_code_to_grpc_status_string(
+              status.code))
+      self.service_call_metric.call(grpc_status_string)
+
+  def start_service_call_metrics(self, project_id, instance_id, table_id):
+    resource = resource_identifiers.BigtableTable(
+        project_id, instance_id, table_id)
+    labels = {
+        monitoring_infos.SERVICE_LABEL: 'BigTable',
+        # TODO(JIRA-11985): Add Ptransform label.
+        monitoring_infos.METHOD_LABEL: 'google.bigtable.v2.MutateRows',
+        monitoring_infos.RESOURCE_LABEL: resource,
+        monitoring_infos.BIGTABLE_PROJECT_ID_LABEL: (
+            self.beam_options['project_id']),
+        monitoring_infos.INSTANCE_ID_LABEL: self.beam_options['instance_id'],
+        monitoring_infos.TABLE_ID_LABEL: self.beam_options['table_id']
+    }
+    return ServiceCallMetric(
+        request_count_urn=monitoring_infos.API_REQUEST_COUNT_URN,
+        base_labels=labels)
+
   def start_bundle(self):
     if self.table is None:
       client = Client(project=self.beam_options['project_id'])
       instance = client.instance(self.beam_options['instance_id'])
       self.table = instance.table(self.beam_options['table_id'])
-    self.batcher = self.table.mutations_batcher()
+    self.service_call_metric = self.start_service_call_metrics(
+        self.beam_options['project_id'],
+        self.beam_options['instance_id'],
+        self.beam_options['table_id'])
+    self.batcher = _MutationsBatcher(self.table)
+    self.batcher.set_flush_callback(self.write_mutate_metrics)
 
   def process(self, row):
     self.written.inc()
diff --git a/sdks/python/apache_beam/io/gcp/bigtableio_test.py b/sdks/python/apache_beam/io/gcp/bigtableio_test.py
new file mode 100644
index 000000000000..29703ea3df5d
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/bigtableio_test.py
@@ -0,0 +1,137 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for BigTable service."""
+
+# pytype: skip-file
+import datetime
+import string
+import unittest
+import uuid
+from random import choice
+
+from mock import MagicMock
+from mock import patch
+
+from apache_beam.internal.metrics.metric import ServiceCallMetric
+from apache_beam.io.gcp import bigtableio
+from apache_beam.io.gcp import resource_identifiers
+from apache_beam.metrics import monitoring_infos
+from apache_beam.metrics.execution import MetricsEnvironment
+
+# Protect against environments where bigtable library is not available.
+try:
+  from google.cloud.bigtable import client, row
+  from google.cloud.bigtable.instance import Instance
+  from google.cloud.bigtable.table import Table
+  from google.rpc.code_pb2 import OK, ALREADY_EXISTS
+  from google.rpc.status_pb2 import Status
+except ImportError as e:
+  client = None
+
+
+@unittest.skipIf(client is None, 'Bigtable dependencies are not installed')
+class TestWriteBigTable(unittest.TestCase):
+  TABLE_PREFIX = "python-test"
+  _PROJECT_ID = TABLE_PREFIX + "-" + str(uuid.uuid4())[:8]
+  _INSTANCE_ID = TABLE_PREFIX + "-" + str(uuid.uuid4())[:8]
+  _TABLE_ID = TABLE_PREFIX + "-" + str(uuid.uuid4())[:8]
+
+  def setUp(self):
+    client = MagicMock()
+    instance = Instance(self._INSTANCE_ID, client)
+    self.table = Table(self._TABLE_ID, instance)
+
+  def test_write_metrics(self):
+    MetricsEnvironment.process_wide_container().reset()
+    write_fn = bigtableio._BigTableWriteFn(
+        self._PROJECT_ID, self._INSTANCE_ID, self._TABLE_ID)
+    write_fn.table = self.table
+    write_fn.start_bundle()
+    number_of_rows = 2
+    error = Status()
+    error.message = 'Entity already exists.'
+    error.code = ALREADY_EXISTS
+    success = Status()
+    success.message = 'Success'
+    success.code = OK
+    rows_response = [error, success] * number_of_rows
+    with patch.object(Table, 'mutate_rows', return_value=rows_response):
+      direct_rows = [self.generate_row(i) for i in range(number_of_rows * 2)]
+      for direct_row in direct_rows:
+        write_fn.process(direct_row)
+      write_fn.finish_bundle()
+      self.verify_write_call_metric(
+          self._PROJECT_ID,
+          self._INSTANCE_ID,
+          self._TABLE_ID,
+          ServiceCallMetric.bigtable_error_code_to_grpc_status_string(
+              ALREADY_EXISTS),
+          2)
+      self.verify_write_call_metric(
+          self._PROJECT_ID,
+          self._INSTANCE_ID,
+          self._TABLE_ID,
+          ServiceCallMetric.bigtable_error_code_to_grpc_status_string(OK),
+          2)
+
+  def generate_row(self, index=0):
+    rand = choice(string.ascii_letters + string.digits)
+    value = ''.join(rand for i in range(100))
+    column_family_id = 'cf1'
+    key = "beam_key%s" % ('{0:07}'.format(index))
+    direct_row = row.DirectRow(row_key=key)
+    for column_id in range(10):
+      direct_row.set_cell(
+          column_family_id, ('field%s' % column_id).encode('utf-8'),
+          value,
+          datetime.datetime.now())
+    return direct_row
+
+  def verify_write_call_metric(
+      self, project_id, instance_id, table_id, status, count):
+    """Check if a metric was recorded for the Bigtable IO write API call."""
+    process_wide_monitoring_infos = list(
+        MetricsEnvironment.process_wide_container().
+        to_runner_api_monitoring_infos(None).values())
+    resource = resource_identifiers.BigtableTable(
+        project_id, instance_id, table_id)
+    labels = {
+        monitoring_infos.SERVICE_LABEL: 'BigTable',
+        monitoring_infos.METHOD_LABEL: 'google.bigtable.v2.MutateRows',
+        monitoring_infos.RESOURCE_LABEL: resource,
+        monitoring_infos.BIGTABLE_PROJECT_ID_LABEL: project_id,
+        monitoring_infos.INSTANCE_ID_LABEL: instance_id,
+        monitoring_infos.TABLE_ID_LABEL: table_id,
+        monitoring_infos.STATUS_LABEL: status
+    }
+    expected_mi = monitoring_infos.int64_counter(
+        monitoring_infos.API_REQUEST_COUNT_URN, count, labels=labels)
+    expected_mi.ClearField("start_time")
+
+    found = False
+    for actual_mi in process_wide_monitoring_infos:
+      actual_mi.ClearField("start_time")
+      if expected_mi == actual_mi:
+        found = True
+        break
+    self.assertTrue(
+        found, "Did not find write call metric with status: %s" % status)
+
+
+if __name__ == '__main__':
+  unittest.main()
diff --git a/sdks/python/apache_beam/io/gcp/resource_identifiers.py b/sdks/python/apache_beam/io/gcp/resource_identifiers.py
index a89a9e17a324..573d8ac5ed1b 100644
--- a/sdks/python/apache_beam/io/gcp/resource_identifiers.py
+++ b/sdks/python/apache_beam/io/gcp/resource_identifiers.py
@@ -42,3 +42,8 @@ def GoogleCloudStorageBucket(bucket_id):
 def DatastoreNamespace(project_id, namespace_id):
   return '//bigtable.googleapis.com/projects/%s/namespaces/%s' % (
       project_id, namespace_id)
+
+
+def BigtableTable(project_id, instance_id, table_id):
+  return '//bigtable.googleapis.com/projects/%s/instances/%s/tables/%s' % (
+      project_id, instance_id, table_id)
diff --git a/sdks/python/apache_beam/metrics/monitoring_infos.py b/sdks/python/apache_beam/metrics/monitoring_infos.py
index 2d8faa8fe535..2c909598f094 100644
--- a/sdks/python/apache_beam/metrics/monitoring_infos.py
+++ b/sdks/python/apache_beam/metrics/monitoring_infos.py
@@ -103,6 +103,11 @@
     common_urns.monitoring_info_labels.DATASTORE_PROJECT.label_props.name)
DATASTORE_NAMESPACE_LABEL = (
     common_urns.monitoring_info_labels.DATASTORE_NAMESPACE.label_props.name)
+BIGTABLE_PROJECT_ID_LABEL = (
+    common_urns.monitoring_info_labels.BIGTABLE_PROJECT_ID.label_props.name)
+INSTANCE_ID_LABEL = (
+    common_urns.monitoring_info_labels.INSTANCE_ID.label_props.name)
+TABLE_ID_LABEL = (common_urns.monitoring_info_labels.TABLE_ID.label_props.name)
 
 
 def extract_counter_value(monitoring_info_proto):
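
To see what the new helper reports, here is a small illustrative sketch (not part of the patch) that exercises the mapping added to ServiceCallMetric in metric.py; to_status is just a local alias used for brevity:

    from apache_beam.internal.metrics.metric import ServiceCallMetric

    # Enum values follow https://cloud.google.com/bigtable/docs/status-codes.
    to_status = ServiceCallMetric.bigtable_error_code_to_grpc_status_string
    assert to_status(0) == 'ok'
    assert to_status(6) == 'already_exists'
    assert to_status(14) == 'unavailable'
    # Codes outside the mapped range fall back to their string form.
    assert to_status(99) == '99'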
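
The monitoring labels recorded by _BigTableWriteFn hang off a resource string built by the new resource_identifiers.BigtableTable helper. A minimal sketch of the value it produces, using placeholder IDs:

    from apache_beam.io.gcp import resource_identifiers

    resource = resource_identifiers.BigtableTable(
        'my-project', 'my-instance', 'my-table')
    # resource is now:
    # '//bigtable.googleapis.com/projects/my-project/instances/my-instance/tables/my-table'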
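
Finally, a rough end-to-end sketch of the write path that now emits these request-count metrics. It assumes the pre-existing bigtableio.WriteToBigTable transform (not touched by this diff) and placeholder project, instance, and table IDs; after each bundle flush, one API_REQUEST_COUNT_URN counter is incremented per MutateRows status, carrying the labels set in start_service_call_metrics:

    import datetime

    import apache_beam as beam
    from apache_beam.io.gcp import bigtableio
    from google.cloud.bigtable import row

    def make_direct_row(index):
      # Mirrors generate_row() in the new bigtableio_test.py.
      direct_row = row.DirectRow(row_key='beam_key%07d' % index)
      direct_row.set_cell(
          'cf1', b'field0', 'value-%d' % index, datetime.datetime.now())
      return direct_row

    with beam.Pipeline() as pipeline:
      _ = (
          pipeline
          | beam.Create([make_direct_row(i) for i in range(100)])
          | bigtableio.WriteToBigTable(
              project_id='my-project',
              instance_id='my-instance',
              table_id='my-table'))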