Skip to content

Commit

Permalink
[BEAM-11985] Python Bigtable - Implement IO Request Count metrics (#1…
Browse files Browse the repository at this point in the history
…5562)

Co-authored-by: Alik Rodriguez <[email protected]>
  • Loading branch information
benWize and AlikRodriguez authored Sep 30, 2021
1 parent 351f5e7 commit 3c564f8
Show file tree
Hide file tree
Showing 6 changed files with 246 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,8 @@ def test_bigtable_write(self):
with beam.Pipeline(options=pipeline_options) as pipeline:
config_data = {
'project_id': self.project,
'instance_id': self.instance,
'table_id': self.table
'instance_id': self.instance_id,
'table_id': self.table_id
}
_ = (
pipeline
Expand Down
40 changes: 40 additions & 0 deletions sdks/python/apache_beam/internal/metrics/metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,3 +223,43 @@ def convert_to_canonical_status_string(self, status):
http_status_code in http_to_canonical_gcp_status):
return http_to_canonical_gcp_status[http_status_code]
return str(http_status_code)

@staticmethod
def bigtable_error_code_to_grpc_status_string(grpc_status_code):
  # type: (int) -> str

  """Converts a Bigtable error code to a canonical GCP status code string.

  The Bigtable client library does not use the canonical HTTP status code
  values (i.e. https://cloud.google.com/apis/design/errors). Instead the
  codes are numbered using an enum with these values corresponding to each
  status code: https://cloud.google.com/bigtable/docs/status-codes

  Args:
    grpc_status_code: An int that corresponds to an enum of status codes.

  Returns:
    The lower-case canonical status string when the code is known; otherwise
    ``str(grpc_status_code)`` (so ``None`` is reported as ``'None'``).
  """
  grpc_to_canonical_gcp_status = {
      0: 'ok',
      1: 'cancelled',
      2: 'unknown',
      3: 'invalid_argument',
      4: 'deadline_exceeded',
      5: 'not_found',
      6: 'already_exists',
      7: 'permission_denied',
      8: 'resource_exhausted',
      9: 'failed_precondition',
      10: 'aborted',
      11: 'out_of_range',
      12: 'unimplemented',
      13: 'internal',
      14: 'unavailable',
      # The status enum continues past 14; without these two entries the
      # codes were reported as the bare numbers '15' and '16'.
      15: 'data_loss',
      16: 'unauthenticated',
  }
  # None is never a key of the table, so it takes the str() fallback just
  # like any other unknown code.
  return grpc_to_canonical_gcp_status.get(
      grpc_status_code, str(grpc_status_code))
58 changes: 57 additions & 1 deletion sdks/python/apache_beam/io/gcp/bigtableio.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,38 @@
import logging

import apache_beam as beam
from apache_beam.internal.metrics.metric import ServiceCallMetric
from apache_beam.io.gcp import resource_identifiers
from apache_beam.metrics import Metrics
from apache_beam.metrics import monitoring_infos
from apache_beam.transforms.display import DisplayDataItem

_LOGGER = logging.getLogger(__name__)

try:
from google.cloud.bigtable import Client
from google.cloud.bigtable.batcher import MutationsBatcher

FLUSH_COUNT = 1000
MAX_ROW_BYTES = 5242880 # 5MB

class _MutationsBatcher(MutationsBatcher):
  """MutationsBatcher that reports the result of each flush to a callback.

  flush() sends the buffered rows with ``table.mutate_rows`` and passes the
  value returned by that call to the callback registered through
  set_flush_callback().
  """
  def __init__(
      self, table, flush_count=FLUSH_COUNT, max_row_bytes=MAX_ROW_BYTES):
    """Creates the batcher.

    Args:
      table: The table object whose ``mutate_rows`` is called on flush.
      flush_count: Forwarded unchanged to MutationsBatcher.
      max_row_bytes: Forwarded unchanged to MutationsBatcher.
    """
    super(_MutationsBatcher, self).__init__(table, flush_count, max_row_bytes)
    self.rows = []
    # Defined up front so that a flush() issued before set_flush_callback()
    # does not fail with AttributeError.
    self.callback_fn = None

  def set_flush_callback(self, callback_fn):
    """Registers the callable that receives the result of every flush.

    Args:
      callback_fn: Called with the value returned by ``table.mutate_rows``.
    """
    self.callback_fn = callback_fn

  def flush(self):
    """Sends the buffered rows, reports the result and resets the buffer.

    Does nothing when no rows are buffered.
    """
    if self.rows:
      rows = self.table.mutate_rows(self.rows)
      if self.callback_fn is not None:
        self.callback_fn(rows)
      self.total_mutation_count = 0
      self.total_size = 0
      self.rows = []

except ImportError:
_LOGGER.warning(
'ImportError: from google.cloud.bigtable import Client', exc_info=True)
Expand Down Expand Up @@ -78,6 +103,7 @@ def __init__(self, project_id, instance_id, table_id):
}
self.table = None
self.batcher = None
self.service_call_metric = None
self.written = Metrics.counter(self.__class__, 'Written Row')

def __getstate__(self):
Expand All @@ -87,14 +113,44 @@ def __setstate__(self, options):
self.beam_options = options
self.table = None
self.batcher = None
self.service_call_metric = None
self.written = Metrics.counter(self.__class__, 'Written Row')

def write_mutate_metrics(self, rows):
  """Records one service call per status contained in ``rows``.

  Args:
    rows: Iterable of status objects exposing a ``code`` attribute; each
      code is converted to its status string and passed to
      ``self.service_call_metric.call``.
  """
  for mutation_status in rows:
    status_code = mutation_status.code
    self.service_call_metric.call(
        ServiceCallMetric.bigtable_error_code_to_grpc_status_string(
            status_code))

def start_service_call_metrics(self, project_id, instance_id, table_id):
  """Builds the ServiceCallMetric that counts MutateRows requests.

  Args:
    project_id: Project ID used for the resource string and project label.
    instance_id: Instance ID used for the resource string and instance label.
    table_id: Table ID used for the resource string and table label.

  Returns:
    A ServiceCallMetric for ``monitoring_infos.API_REQUEST_COUNT_URN`` whose
    base labels identify the service, method, resource and table.
  """
  resource = resource_identifiers.BigtableTable(
      project_id, instance_id, table_id)
  labels = {
      monitoring_infos.SERVICE_LABEL: 'BigTable',
      # TODO(BEAM-11985): Add Ptransform label.
      monitoring_infos.METHOD_LABEL: 'google.bigtable.v2.MutateRows',
      monitoring_infos.RESOURCE_LABEL: resource,
      # The ID labels come from the same arguments as the resource string so
      # that the two can never disagree.
      monitoring_infos.BIGTABLE_PROJECT_ID_LABEL: project_id,
      monitoring_infos.INSTANCE_ID_LABEL: instance_id,
      monitoring_infos.TABLE_ID_LABEL: table_id
  }
  return ServiceCallMetric(
      request_count_urn=monitoring_infos.API_REQUEST_COUNT_URN,
      base_labels=labels)

def start_bundle(self):
  """Prepares the table handle, the request metric and the batcher.

  The table is only looked up when ``self.table`` has not been set yet. The
  service call metric and the batcher are created on every call, and the
  batcher reports each flush to ``write_mutate_metrics``.
  """
  if self.table is None:
    client = Client(project=self.beam_options['project_id'])
    instance = client.instance(self.beam_options['instance_id'])
    self.table = instance.table(self.beam_options['table_id'])
  self.service_call_metric = self.start_service_call_metrics(
      self.beam_options['project_id'],
      self.beam_options['instance_id'],
      self.beam_options['table_id'])
  # The batcher is created exactly once here; the former
  # self.table.mutations_batcher() assignment was always overwritten by this
  # one and has been dropped.
  self.batcher = _MutationsBatcher(self.table)
  self.batcher.set_flush_callback(self.write_mutate_metrics)

def process(self, row):
self.written.inc()
Expand Down
137 changes: 137 additions & 0 deletions sdks/python/apache_beam/io/gcp/bigtableio_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Unit tests for BigTable service."""

# pytype: skip-file
import datetime
import string
import unittest
import uuid
from random import choice

from mock import MagicMock
from mock import patch

from apache_beam.internal.metrics.metric import ServiceCallMetric
from apache_beam.io.gcp import bigtableio
from apache_beam.io.gcp import resource_identifiers
from apache_beam.metrics import monitoring_infos
from apache_beam.metrics.execution import MetricsEnvironment

# Protect against environments where bigtable library is not available.
try:
from google.cloud.bigtable import client, row
from google.cloud.bigtable.instance import Instance
from google.cloud.bigtable.table import Table
from google.rpc.code_pb2 import OK, ALREADY_EXISTS
from google.rpc.status_pb2 import Status
except ImportError as e:
client = None


@unittest.skipIf(client is None, 'Bigtable dependencies are not installed')
class TestWriteBigTable(unittest.TestCase):
  """Checks the request-count metrics recorded by the Bigtable write DoFn."""
  TABLE_PREFIX = "python-test"
  _PROJECT_ID = TABLE_PREFIX + "-" + str(uuid.uuid4())[:8]
  _INSTANCE_ID = TABLE_PREFIX + "-" + str(uuid.uuid4())[:8]
  _TABLE_ID = TABLE_PREFIX + "-" + str(uuid.uuid4())[:8]

  def setUp(self):
    # A MagicMock stands in for the Bigtable client behind the table.
    mock_client = MagicMock()
    self.table = Table(
        self._TABLE_ID, Instance(self._INSTANCE_ID, mock_client))

  def test_write_metrics(self):
    """Writes four rows and expects two 'already_exists' and two 'ok'."""
    MetricsEnvironment.process_wide_container().reset()
    write_fn = bigtableio._BigTableWriteFn(
        self._PROJECT_ID, self._INSTANCE_ID, self._TABLE_ID)
    write_fn.table = self.table
    write_fn.start_bundle()

    number_of_rows = 2
    error = Status()
    error.message = 'Entity already exists.'
    error.code = ALREADY_EXISTS
    success = Status()
    success.message = 'Success'
    success.code = OK
    # mutate_rows is patched to answer with alternating error/success.
    rows_response = [error, success] * number_of_rows

    with patch.object(Table, 'mutate_rows', return_value=rows_response):
      direct_rows = [
          self.generate_row(index) for index in range(number_of_rows * 2)
      ]
      for direct_row in direct_rows:
        write_fn.process(direct_row)
      write_fn.finish_bundle()

      for status_code in (ALREADY_EXISTS, OK):
        self.verify_write_call_metric(
            self._PROJECT_ID,
            self._INSTANCE_ID,
            self._TABLE_ID,
            ServiceCallMetric.bigtable_error_code_to_grpc_status_string(
                status_code),
            2)

  def generate_row(self, index=0):
    """Returns a DirectRow with ten cells that all hold the same value.

    The value is one randomly chosen alphanumeric character repeated 100
    times; the row key is 'beam_key' followed by the zero-padded index.
    """
    fill_char = choice(string.ascii_letters + string.digits)
    value = fill_char * 100
    key = 'beam_key{0:07}'.format(index)
    direct_row = row.DirectRow(row_key=key)
    for column_id in range(10):
      direct_row.set_cell(
          'cf1', ('field%s' % column_id).encode('utf-8'),
          value,
          datetime.datetime.now())
    return direct_row

  def verify_write_call_metric(
      self, project_id, instance_id, table_id, status, count):
    """Check if a metric was recorded for the Bigtable IO write API call."""
    container = MetricsEnvironment.process_wide_container()
    process_wide_monitoring_infos = list(
        container.to_runner_api_monitoring_infos(None).values())
    labels = {
        monitoring_infos.SERVICE_LABEL: 'BigTable',
        monitoring_infos.METHOD_LABEL: 'google.bigtable.v2.MutateRows',
        monitoring_infos.RESOURCE_LABEL: resource_identifiers.BigtableTable(
            project_id, instance_id, table_id),
        monitoring_infos.BIGTABLE_PROJECT_ID_LABEL: project_id,
        monitoring_infos.INSTANCE_ID_LABEL: instance_id,
        monitoring_infos.TABLE_ID_LABEL: table_id,
        monitoring_infos.STATUS_LABEL: status
    }
    expected_mi = monitoring_infos.int64_counter(
        monitoring_infos.API_REQUEST_COUNT_URN, count, labels=labels)
    expected_mi.ClearField("start_time")

    def matches_expected(actual_mi):
      # start_time is cleared on both sides so it cannot affect equality.
      actual_mi.ClearField("start_time")
      return expected_mi == actual_mi

    # any() stops at the first match, as the explicit loop with break did.
    found = any(
        matches_expected(actual_mi)
        for actual_mi in process_wide_monitoring_infos)
    self.assertTrue(
        found, "Did not find write call metric with status: %s" % status)


if __name__ == '__main__':
unittest.main()
5 changes: 5 additions & 0 deletions sdks/python/apache_beam/io/gcp/resource_identifiers.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,8 @@ def GoogleCloudStorageBucket(bucket_id):
def DatastoreNamespace(project_id, namespace_id):
  """Returns the resource identifier string for a Datastore namespace."""
  # NOTE(review): the host below is bigtable.googleapis.com although this
  # identifies a Datastore namespace; confirm whether that is intentional
  # before changing it, since other code may rely on the exact string.
  template = '//bigtable.googleapis.com/projects/%s/namespaces/%s'
  return template % (project_id, namespace_id)


def BigtableTable(project_id, instance_id, table_id):
  """Returns the resource identifier string for a Bigtable table."""
  template = '//bigtable.googleapis.com/projects/%s/instances/%s/tables/%s'
  return template % (project_id, instance_id, table_id)
5 changes: 5 additions & 0 deletions sdks/python/apache_beam/metrics/monitoring_infos.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@
common_urns.monitoring_info_labels.DATASTORE_PROJECT.label_props.name)
DATASTORE_NAMESPACE_LABEL = (
common_urns.monitoring_info_labels.DATASTORE_NAMESPACE.label_props.name)
# Label names identifying a Bigtable project, instance and table, read from
# the common_urns.monitoring_info_labels definitions.
BIGTABLE_PROJECT_ID_LABEL = (
    common_urns.monitoring_info_labels.BIGTABLE_PROJECT_ID.label_props.name)
INSTANCE_ID_LABEL = (
    common_urns.monitoring_info_labels.INSTANCE_ID.label_props.name)
TABLE_ID_LABEL = (common_urns.monitoring_info_labels.TABLE_ID.label_props.name)


def extract_counter_value(monitoring_info_proto):
Expand Down

0 comments on commit 3c564f8

Please sign in to comment.