diff --git a/ext/opentelemetry-ext-grpc/CHANGELOG.md b/ext/opentelemetry-ext-grpc/CHANGELOG.md index fff83ab5a7b..5f272a80f8a 100644 --- a/ext/opentelemetry-ext-grpc/CHANGELOG.md +++ b/ext/opentelemetry-ext-grpc/CHANGELOG.md @@ -11,6 +11,8 @@ Released 2020-07-28 - Add gRPC client and server instrumentors ([788](https://github.com/open-telemetry/opentelemetry-python/pull/788)) +- Add metric recording (bytes in/out, errors, latency) to gRPC client + ## 0.8b0 Released 2020-05-27 diff --git a/ext/opentelemetry-ext-grpc/setup.cfg b/ext/opentelemetry-ext-grpc/setup.cfg index 0a4e4e5b679..c7dac4c3561 100644 --- a/ext/opentelemetry-ext-grpc/setup.cfg +++ b/ext/opentelemetry-ext-grpc/setup.cfg @@ -41,6 +41,7 @@ package_dir= packages=find_namespace: install_requires = opentelemetry-api == 0.12.dev0 + opentelemetry-sdk == 0.12.dev0 grpcio ~= 1.27 [options.extras_require] diff --git a/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/__init__.py b/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/__init__.py index 368ae55f2e6..1a665662818 100644 --- a/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/__init__.py +++ b/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/__init__.py @@ -33,6 +33,10 @@ SimpleExportSpanProcessor, ) + from opentelemetry import metrics + from opentelemetry.sdk.metrics import MeterProvider + from opentelemetry.sdk.metrics.export import ConsoleMetricsExporter + try: from .gen import helloworld_pb2, helloworld_pb2_grpc except ImportError: @@ -42,7 +46,12 @@ trace.get_tracer_provider().add_span_processor( SimpleExportSpanProcessor(ConsoleSpanExporter()) ) - instrumentor = GrpcInstrumentorClient() + + # Set meter provider to opentelemetry-sdk's MeterProvider + metrics.set_meter_provider(MeterProvider()) + + # Optional - export GRPC specific metrics (latency, bytes in/out, errors) by passing an exporter + instrumentor = GrpcInstrumentorClient(exporter=ConsoleMetricsExporter(), interval=10) instrumentor.instrument() def run(): @@ -109,6 +118,7 @@ def serve(): serve() """ from contextlib import contextmanager +from functools import partial import grpc from wrapt import wrap_function_wrapper as _wrap @@ -139,11 +149,21 @@ def wrapper_fn(self, original_func, instance, args, kwargs): class GrpcInstrumentorClient(BaseInstrumentor): def _instrument(self, **kwargs): + exporter = kwargs.get("exporter", None) + interval = kwargs.get("interval", 30) if kwargs.get("channel_type") == "secure": - _wrap("grpc", "secure_channel", self.wrapper_fn) + _wrap( + "grpc", + "secure_channel", + partial(self.wrapper_fn, exporter, interval), + ) else: - _wrap("grpc", "insecure_channel", self.wrapper_fn) + _wrap( + "grpc", + "insecure_channel", + partial(self.wrapper_fn, exporter, interval), + ) def _uninstrument(self, **kwargs): if kwargs.get("channel_type") == "secure": @@ -152,17 +172,28 @@ def _uninstrument(self, **kwargs): else: unwrap(grpc, "insecure_channel") - @contextmanager - def wrapper_fn(self, original_func, instance, args, kwargs): - with original_func(*args, **kwargs) as channel: - yield intercept_channel(channel, client_interceptor()) - - -def client_interceptor(tracer_provider=None): + def wrapper_fn( + self, exporter, interval, original_func, instance, args, kwargs + ): + channel = original_func(*args, **kwargs) + tracer_provider = kwargs.get("tracer_provider") + return intercept_channel( + channel, + client_interceptor( + tracer_provider=tracer_provider, + exporter=exporter, + interval=interval, + ), + ) + + +def client_interceptor(tracer_provider=None, exporter=None, interval=30): """Create a gRPC client channel interceptor. Args: tracer: The tracer to use to create client-side spans. + exporter: The exporter that will receive client metrics + interval: Time between every export call Returns: An invocation-side interceptor object. @@ -171,7 +202,7 @@ def client_interceptor(tracer_provider=None): tracer = trace.get_tracer(__name__, __version__, tracer_provider) - return _client.OpenTelemetryClientInterceptor(tracer) + return _client.OpenTelemetryClientInterceptor(tracer, exporter, interval) def server_interceptor(tracer_provider=None): diff --git a/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/_client.py b/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/_client.py index 373d8f345cf..028804f599c 100644 --- a/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/_client.py +++ b/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/_client.py @@ -24,11 +24,12 @@ import grpc -from opentelemetry import propagators, trace +from opentelemetry import metrics, propagators, trace +from opentelemetry.sdk.metrics.export.controller import PushController from opentelemetry.trace.status import Status, StatusCanonicalCode from . import grpcext -from ._utilities import RpcInfo +from ._utilities import RpcInfo, TimedMetricRecorder class _GuardedSpan: @@ -63,7 +64,7 @@ def append_metadata( propagators.inject(append_metadata, metadata) -def _make_future_done_callback(span, rpc_info): +def _make_future_done_callback(span, rpc_info, client_info, metrics_recorder): def callback(response_future): with span: code = response_future.code() @@ -72,6 +73,10 @@ def callback(response_future): return response = response_future.result() rpc_info.response = response + if "ByteSize" in dir(response): + metrics_recorder.record_bytes_in( + response.ByteSize(), client_info.full_method + ) return callback @@ -79,21 +84,34 @@ def callback(response_future): class OpenTelemetryClientInterceptor( grpcext.UnaryClientInterceptor, grpcext.StreamClientInterceptor ): - def __init__(self, tracer): + def __init__(self, tracer, exporter, interval): self._tracer = tracer + self._meter = None + if exporter and interval: + self._meter = metrics.get_meter(__name__) + self.controller = PushController( + meter=self._meter, exporter=exporter, interval=interval + ) + self._metrics_recorder = TimedMetricRecorder(self._meter, "client") + def _start_span(self, method): return self._tracer.start_as_current_span( name=method, kind=trace.SpanKind.CLIENT ) # pylint:disable=no-self-use - def _trace_result(self, guarded_span, rpc_info, result): + def _trace_result(self, guarded_span, rpc_info, result, client_info): # If the RPC is called asynchronously, release the guard and add a # callback so that the span can be finished once the future is done. if isinstance(result, grpc.Future): result.add_done_callback( - _make_future_done_callback(guarded_span.release(), rpc_info) + _make_future_done_callback( + guarded_span.release(), + rpc_info, + client_info, + self._metrics_recorder, + ) ) return result response = result @@ -104,11 +122,24 @@ def _trace_result(self, guarded_span, rpc_info, result): if isinstance(result, tuple): response = result[0] rpc_info.response = response + + if "ByteSize" in dir(response): + self._metrics_recorder.record_bytes_in( + response.ByteSize(), client_info.full_method + ) return result def _start_guarded_span(self, *args, **kwargs): return _GuardedSpan(self._start_span(*args, **kwargs)) + def _bytes_out_iterator_wrapper(self, iterator, client_info): + for request in iterator: + if "ByteSize" in dir(request): + self._metrics_recorder.record_bytes_out( + request.ByteSize(), client_info.full_method + ) + yield request + def intercept_unary(self, request, metadata, client_info, invoker): if not metadata: mutable_metadata = OrderedDict() @@ -116,25 +147,37 @@ def intercept_unary(self, request, metadata, client_info, invoker): mutable_metadata = OrderedDict(metadata) with self._start_guarded_span(client_info.full_method) as guarded_span: - _inject_span_context(mutable_metadata) - metadata = tuple(mutable_metadata.items()) - - rpc_info = RpcInfo( - full_method=client_info.full_method, - metadata=metadata, - timeout=client_info.timeout, - request=request, - ) - - try: - result = invoker(request, metadata) - except grpc.RpcError as exc: - guarded_span.generated_span.set_status( - Status(StatusCanonicalCode(exc.code().value[0])) + with self._metrics_recorder.record_latency( + client_info.full_method + ): + _inject_span_context(mutable_metadata) + metadata = tuple(mutable_metadata.items()) + + # If protobuf is used, we can record the bytes in/out. Otherwise, we have no way + # to get the size of the request/response properly, so don't record anything + if "ByteSize" in dir(request): + self._metrics_recorder.record_bytes_out( + request.ByteSize(), client_info.full_method + ) + + rpc_info = RpcInfo( + full_method=client_info.full_method, + metadata=metadata, + timeout=client_info.timeout, + request=request, ) - raise - return self._trace_result(guarded_span, rpc_info, result) + try: + result = invoker(request, metadata) + except grpc.RpcError as exc: + guarded_span.generated_span.set_status( + Status(StatusCanonicalCode(exc.code().value[0])) + ) + raise + + return self._trace_result( + guarded_span, rpc_info, result, client_info + ) # For RPCs that stream responses, the result can be a generator. To record # the span across the generated responses and detect any errors, we wrap @@ -148,25 +191,44 @@ def _intercept_server_stream( mutable_metadata = OrderedDict(metadata) with self._start_span(client_info.full_method) as span: - _inject_span_context(mutable_metadata) - metadata = tuple(mutable_metadata.items()) - rpc_info = RpcInfo( - full_method=client_info.full_method, - metadata=metadata, - timeout=client_info.timeout, - ) - if client_info.is_client_stream: - rpc_info.request = request_or_iterator - - try: - result = invoker(request_or_iterator, metadata) - for response in result: - yield response - except grpc.RpcError as exc: - span.set_status( - Status(StatusCanonicalCode(exc.code().value[0])) + with self._metrics_recorder.record_latency( + client_info.full_method + ): + _inject_span_context(mutable_metadata) + metadata = tuple(mutable_metadata.items()) + rpc_info = RpcInfo( + full_method=client_info.full_method, + metadata=metadata, + timeout=client_info.timeout, ) - raise + + if client_info.is_client_stream: + rpc_info.request = request_or_iterator + request_or_iterator = self._bytes_out_iterator_wrapper( + request_or_iterator, client_info + ) + else: + if "ByteSize" in dir(request_or_iterator): + self._metrics_recorder.record_bytes_out( + request_or_iterator.ByteSize(), + client_info.full_method, + ) + + try: + result = invoker(request_or_iterator, metadata) + + # Rewrap the result stream into a generator, and record the bytes received + for response in result: + if "ByteSize" in dir(response): + self._metrics_recorder.record_bytes_in( + response.ByteSize(), client_info.full_method + ) + yield response + except grpc.RpcError as exc: + span.set_status( + Status(StatusCanonicalCode(exc.code().value[0])) + ) + raise def intercept_stream( self, request_or_iterator, metadata, client_info, invoker @@ -182,21 +244,32 @@ def intercept_stream( mutable_metadata = OrderedDict(metadata) with self._start_guarded_span(client_info.full_method) as guarded_span: - _inject_span_context(mutable_metadata) - metadata = tuple(mutable_metadata.items()) - rpc_info = RpcInfo( - full_method=client_info.full_method, - metadata=metadata, - timeout=client_info.timeout, - request=request_or_iterator, - ) + with self._metrics_recorder.record_latency( + client_info.full_method + ): + _inject_span_context(mutable_metadata) + metadata = tuple(mutable_metadata.items()) + rpc_info = RpcInfo( + full_method=client_info.full_method, + metadata=metadata, + timeout=client_info.timeout, + request=request_or_iterator, + ) + + rpc_info.request = request_or_iterator - try: - result = invoker(request_or_iterator, metadata) - except grpc.RpcError as exc: - guarded_span.generated_span.set_status( - Status(StatusCanonicalCode(exc.code().value[0])) + request_or_iterator = self._bytes_out_iterator_wrapper( + request_or_iterator, client_info ) - raise - return self._trace_result(guarded_span, rpc_info, result) + try: + result = invoker(request_or_iterator, metadata) + except grpc.RpcError as exc: + guarded_span.generated_span.set_status( + Status(StatusCanonicalCode(exc.code().value[0])) + ) + raise + + return self._trace_result( + guarded_span, rpc_info, result, client_info + ) diff --git a/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/_utilities.py b/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/_utilities.py index b6ff7d311a4..1dfe31ec995 100644 --- a/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/_utilities.py +++ b/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/_utilities.py @@ -14,6 +14,13 @@ """Internal utilities.""" +from contextlib import contextmanager +from time import time + +import grpc + +from opentelemetry.sdk.metrics import Counter, ValueRecorder + class RpcInfo: def __init__( @@ -31,3 +38,75 @@ def __init__( self.request = request self.response = response self.error = error + + +class TimedMetricRecorder: + def __init__(self, meter, span_kind): + self._meter = meter + service_name = "grpcio" + self._span_kind = span_kind + base_attributes = ["method"] + + if self._meter: + self._duration = self._meter.create_metric( + name="{}/{}/duration".format(service_name, span_kind), + description="Duration of grpc requests to the server", + unit="ms", + value_type=float, + metric_type=ValueRecorder, + label_keys=base_attributes + ["error", "status_code"], + ) + self._error_count = self._meter.create_metric( + name="{}/{}/errors".format(service_name, span_kind), + description="Number of errors that were returned from the server", + unit="1", + value_type=int, + metric_type=Counter, + label_keys=base_attributes + ["status_code"], + ) + self._bytes_in = self._meter.create_metric( + name="{}/{}/bytes_in".format(service_name, span_kind), + description="Number of bytes received from the server", + unit="by", + value_type=int, + metric_type=Counter, + label_keys=base_attributes, + ) + self._bytes_out = self._meter.create_metric( + name="{}/{}/bytes_out".format(service_name, span_kind), + description="Number of bytes sent out through gRPC", + unit="by", + value_type=int, + metric_type=Counter, + label_keys=base_attributes, + ) + + def record_bytes_in(self, bytes_in, method): + if self._meter: + labels = {"method": method} + self._bytes_in.add(bytes_in, labels) + + def record_bytes_out(self, bytes_out, method): + if self._meter: + labels = {"method": method} + self._bytes_out.add(bytes_out, labels) + + @contextmanager + def record_latency(self, method): + start_time = time() + labels = {"method": method, "status_code": grpc.StatusCode.OK} + try: + yield labels + except grpc.RpcError as exc: + if self._meter: + # pylint: disable=no-member + labels["status_code"] = exc.code() + self._error_count.add(1, labels) + labels["error"] = True + raise + finally: + if self._meter: + if "error" not in labels: + labels["error"] = False + elapsed_time = (time() - start_time) * 1000 + self._duration.record(elapsed_time, labels) diff --git a/ext/opentelemetry-ext-grpc/tests/test_client_interceptor.py b/ext/opentelemetry-ext-grpc/tests/test_client_interceptor.py index 47dc9fa0bb6..a668f05ca79 100644 --- a/ext/opentelemetry-ext-grpc/tests/test_client_interceptor.py +++ b/ext/opentelemetry-ext-grpc/tests/test_client_interceptor.py @@ -15,10 +15,12 @@ import grpc import opentelemetry.ext.grpc -from opentelemetry import metrics, trace -from opentelemetry.ext.grpc import client_interceptor -from opentelemetry.ext.grpc.grpcext import intercept_channel -from opentelemetry.sdk.metrics.export.controller import PushController +from opentelemetry import trace +from opentelemetry.ext.grpc import GrpcInstrumentorClient +from opentelemetry.sdk.metrics.export.aggregate import ( + MinMaxSumCountAggregator, + SumAggregator, +) from opentelemetry.test.test_base import TestBase from tests.protobuf import test_server_pb2_grpc @@ -34,24 +36,81 @@ class TestClientProto(TestBase): def setUp(self): super().setUp() + GrpcInstrumentorClient().instrument( + exporter=self.memory_metrics_exporter + ) self.server = create_test_server(25565) self.server.start() - meter = metrics.get_meter(__name__) - interceptor = client_interceptor() - self.channel = intercept_channel( - grpc.insecure_channel("localhost:25565"), interceptor - ) + self.channel = grpc.insecure_channel("localhost:25565") self._stub = test_server_pb2_grpc.GRPCTestServerStub(self.channel) - self._controller = PushController( - meter, self.memory_metrics_exporter, 30 - ) - def tearDown(self): super().tearDown() + GrpcInstrumentorClient().uninstrument() self.memory_metrics_exporter.clear() self.server.stop(None) + def _verify_success_records(self, num_bytes_out, num_bytes_in, method): + # pylint: disable=protected-access,no-member + self.channel._interceptor.controller.tick() + records = self.memory_metrics_exporter.get_exported_metrics() + self.assertEqual(len(records), 3) + + bytes_out = None + bytes_in = None + duration = None + + for record in records: + if record.instrument.name == "grpcio/client/duration": + duration = record + elif record.instrument.name == "grpcio/client/bytes_out": + bytes_out = record + elif record.instrument.name == "grpcio/client/bytes_in": + bytes_in = record + + self.assertIsNotNone(bytes_out) + self.assertEqual(bytes_out.instrument.name, "grpcio/client/bytes_out") + self.assertEqual(bytes_out.labels, (("method", method),)) + + self.assertIsNotNone(bytes_in) + self.assertEqual(bytes_in.instrument.name, "grpcio/client/bytes_in") + self.assertEqual(bytes_in.labels, (("method", method),)) + + self.assertIsNotNone(duration) + self.assertEqual(duration.instrument.name, "grpcio/client/duration") + self.assertEqual( + duration.labels, + ( + ("error", False), + ("method", method), + ("status_code", grpc.StatusCode.OK), + ), + ) + + self.assertEqual(type(bytes_out.aggregator), SumAggregator) + self.assertEqual(type(bytes_in.aggregator), SumAggregator) + self.assertEqual(type(duration.aggregator), MinMaxSumCountAggregator) + + self.assertEqual(bytes_out.aggregator.checkpoint, num_bytes_out) + self.assertEqual(bytes_in.aggregator.checkpoint, num_bytes_in) + + self.assertEqual(duration.aggregator.checkpoint.count, 1) + self.assertGreaterEqual(duration.aggregator.checkpoint.sum, 0) + + def test_unary_unary(self): + simple_method(self._stub) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + self.assertEqual(span.name, "/GRPCTestServer/SimpleMethod") + self.assertIs(span.kind, trace.SpanKind.CLIENT) + + # Check version and name in span's instrumentation info + self.check_span_instrumentation_info(span, opentelemetry.ext.grpc) + + self._verify_success_records(8, 8, "/GRPCTestServer/SimpleMethod") + def test_unary_stream(self): server_streaming_method(self._stub) spans = self.memory_exporter.get_finished_spans() @@ -64,6 +123,10 @@ def test_unary_stream(self): # Check version and name in span's instrumentation info self.check_span_instrumentation_info(span, opentelemetry.ext.grpc) + self._verify_success_records( + 8, 40, "/GRPCTestServer/ServerStreamingMethod" + ) + def test_stream_unary(self): client_streaming_method(self._stub) spans = self.memory_exporter.get_finished_spans() @@ -76,6 +139,10 @@ def test_stream_unary(self): # Check version and name in span's instrumentation info self.check_span_instrumentation_info(span, opentelemetry.ext.grpc) + self._verify_success_records( + 40, 8, "/GRPCTestServer/ClientStreamingMethod" + ) + def test_stream_stream(self): bidirectional_streaming_method(self._stub) spans = self.memory_exporter.get_finished_spans() @@ -90,10 +157,57 @@ def test_stream_stream(self): # Check version and name in span's instrumentation info self.check_span_instrumentation_info(span, opentelemetry.ext.grpc) + self._verify_success_records( + 40, 40, "/GRPCTestServer/BidirectionalStreamingMethod" + ) + + def _verify_error_records(self, method): + # pylint: disable=protected-access,no-member + self.channel._interceptor.controller.tick() + records = self.memory_metrics_exporter.get_exported_metrics() + self.assertEqual(len(records), 3) + + bytes_out = None + errors = None + duration = None + + for record in records: + if record.instrument.name == "grpcio/client/duration": + duration = record + elif record.instrument.name == "grpcio/client/bytes_out": + bytes_out = record + elif record.instrument.name == "grpcio/client/errors": + errors = record + + self.assertIsNotNone(bytes_out) + self.assertIsNotNone(errors) + self.assertIsNotNone(duration) + + self.assertEqual(errors.instrument.name, "grpcio/client/errors") + self.assertEqual( + errors.labels, + ( + ("method", method), + ("status_code", grpc.StatusCode.INVALID_ARGUMENT), + ), + ) + self.assertEqual(errors.aggregator.checkpoint, 1) + + self.assertEqual( + duration.labels, + ( + ("error", True), + ("method", method), + ("status_code", grpc.StatusCode.INVALID_ARGUMENT), + ), + ) + def test_error_simple(self): with self.assertRaises(grpc.RpcError): simple_method(self._stub, error=True) + self._verify_error_records("/GRPCTestServer/SimpleMethod") + spans = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans), 1) span = spans[0] @@ -106,6 +220,7 @@ def test_error_stream_unary(self): with self.assertRaises(grpc.RpcError): client_streaming_method(self._stub, error=True) + self._verify_error_records("/GRPCTestServer/ClientStreamingMethod") spans = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans), 1) span = spans[0] @@ -118,6 +233,8 @@ def test_error_unary_stream(self): with self.assertRaises(grpc.RpcError): server_streaming_method(self._stub, error=True) + self._verify_error_records("/GRPCTestServer/ServerStreamingMethod") + spans = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans), 1) span = spans[0] @@ -130,6 +247,10 @@ def test_error_stream_stream(self): with self.assertRaises(grpc.RpcError): bidirectional_streaming_method(self._stub, error=True) + self._verify_error_records( + "/GRPCTestServer/BidirectionalStreamingMethod" + ) + spans = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans), 1) span = spans[0] @@ -137,3 +258,31 @@ def test_error_stream_stream(self): span.status.canonical_code.value, grpc.StatusCode.INVALID_ARGUMENT.value[0], ) + + +class TestClientNoMetrics(TestBase): + def setUp(self): + super().setUp() + GrpcInstrumentorClient().instrument() + self.server = create_test_server(25565) + self.server.start() + self.channel = grpc.insecure_channel("localhost:25565") + self._stub = test_server_pb2_grpc.GRPCTestServerStub(self.channel) + + def tearDown(self): + super().tearDown() + GrpcInstrumentorClient().uninstrument() + self.memory_metrics_exporter.clear() + self.server.stop(None) + + def test_unary_unary(self): + simple_method(self._stub) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + self.assertEqual(span.name, "/GRPCTestServer/SimpleMethod") + self.assertIs(span.kind, trace.SpanKind.CLIENT) + + # Check version and name in span's instrumentation info + self.check_span_instrumentation_info(span, opentelemetry.ext.grpc)