Skip to content

Commit

Permalink
Adding gRPC instrumentor (#788)
Browse files Browse the repository at this point in the history
  • Loading branch information
aravinsiva authored Jul 22, 2020
1 parent e9527da commit 5ff9600
Show file tree
Hide file tree
Showing 4 changed files with 203 additions and 3 deletions.
2 changes: 2 additions & 0 deletions ext/opentelemetry-ext-grpc/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

- Add status code to gRPC client spans
([896](https://github.com/open-telemetry/opentelemetry-python/pull/896))
- Add gRPC client and server instrumentors
([788](https://github.com/open-telemetry/opentelemetry-python/pull/788))

## 0.8b0

Expand Down
5 changes: 5 additions & 0 deletions ext/opentelemetry-ext-grpc/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,8 @@ test =

[options.packages.find]
where = src

[options.entry_points]
opentelemetry_instrumentor =
grpc_client = opentelemetry.ext.grpc:GrpcInstrumentorClient
grpc_server = opentelemetry.ext.grpc:GrpcInstrumentorServer
141 changes: 139 additions & 2 deletions ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,150 @@
# See the License for the specific language governing permissions and
# limitations under the License.

# pylint:disable=import-outside-toplevel
# pylint:disable=import-self
# pylint:disable=no-name-in-module
# pylint:disable=relative-beyond-top-level
# pylint:disable=import-error
# pylint:disable=no-self-use
"""
Usage Client
------------
.. code-block:: python
import logging
import grpc
from opentelemetry import trace
from opentelemetry.ext.grpc import GrpcInstrumentorClient, client_interceptor
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
ConsoleSpanExporter,
SimpleExportSpanProcessor,
)
try:
from .gen import helloworld_pb2, helloworld_pb2_grpc
except ImportError:
from gen import helloworld_pb2, helloworld_pb2_grpc
trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(
SimpleExportSpanProcessor(ConsoleSpanExporter())
)
instrumentor = GrpcInstrumentorClient()
instrumentor.instrument()
def run():
with grpc.insecure_channel("localhost:50051") as channel:
stub = helloworld_pb2_grpc.GreeterStub(channel)
response = stub.SayHello(helloworld_pb2.HelloRequest(name="YOU"))
print("Greeter client received: " + response.message)
if __name__ == "__main__":
logging.basicConfig()
run()
Usage Server
------------
.. code-block:: python
import logging
from concurrent import futures
import grpc
from opentelemetry import trace
from opentelemetry.ext.grpc import GrpcInstrumentorServer, server_interceptor
from opentelemetry.ext.grpc.grpcext import intercept_server
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
ConsoleSpanExporter,
SimpleExportSpanProcessor,
)
try:
from .gen import helloworld_pb2, helloworld_pb2_grpc
except ImportError:
from gen import helloworld_pb2, helloworld_pb2_grpc
trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(
SimpleExportSpanProcessor(ConsoleSpanExporter())
)
grpc_server_instrumentor = GrpcInstrumentorServer()
grpc_server_instrumentor.instrument()
class Greeter(helloworld_pb2_grpc.GreeterServicer):
def SayHello(self, request, context):
return helloworld_pb2.HelloReply(message="Hello, %s!" % request.name)
def serve():
server = grpc.server(futures.ThreadPoolExecutor())
helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
server.add_insecure_port("[::]:50051")
server.start()
server.wait_for_termination()
if __name__ == "__main__":
logging.basicConfig()
serve()
"""
from contextlib import contextmanager

import grpc
from wrapt import wrap_function_wrapper as _wrap

from opentelemetry import trace
from opentelemetry.ext.grpc.grpcext import intercept_channel, intercept_server
from opentelemetry.ext.grpc.version import __version__
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap

# pylint:disable=import-outside-toplevel
# pylint:disable=import-self
# pylint:disable=unused-argument
# isort:skip


class GrpcInstrumentorServer(BaseInstrumentor):
def _instrument(self, **kwargs):
_wrap("grpc", "server", self.wrapper_fn)

def _uninstrument(self, **kwargs):
unwrap(grpc, "server")

def wrapper_fn(self, original_func, instance, args, kwargs):
server = original_func(*args, **kwargs)
return intercept_server(server, server_interceptor())


class GrpcInstrumentorClient(BaseInstrumentor):
def _instrument(self, **kwargs):
if kwargs.get("channel_type") == "secure":
_wrap("grpc", "secure_channel", self.wrapper_fn)

else:
_wrap("grpc", "insecure_channel", self.wrapper_fn)

def _uninstrument(self, **kwargs):
if kwargs.get("channel_type") == "secure":
unwrap(grpc, "secure_channel")

else:
unwrap(grpc, "insecure_channel")

@contextmanager
def wrapper_fn(self, original_func, instance, args, kwargs):
with original_func(*args, **kwargs) as channel:
yield intercept_channel(channel, client_interceptor())


def client_interceptor(tracer_provider=None):
Expand Down
58 changes: 57 additions & 1 deletion ext/opentelemetry-ext-grpc/tests/test_server_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

import opentelemetry.ext.grpc
from opentelemetry import trace
from opentelemetry.ext.grpc import server_interceptor
from opentelemetry.ext.grpc import GrpcInstrumentorServer, server_interceptor
from opentelemetry.ext.grpc.grpcext import intercept_server
from opentelemetry.sdk import trace as trace_sdk
from opentelemetry.test.test_base import TestBase
Expand All @@ -49,6 +49,62 @@ def service(self, handler_call_details):


class TestOpenTelemetryServerInterceptor(TestBase):
def test_instrumentor(self):
def handler(request, context):
return b""

grpc_server_instrumentor = GrpcInstrumentorServer()
grpc_server_instrumentor.instrument()
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=1),
options=(("grpc.so_reuseport", 0),),
)

server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),))

port = server.add_insecure_port("[::]:0")
channel = grpc.insecure_channel("localhost:{:d}".format(port))

try:
server.start()
channel.unary_unary("test")(b"test")
finally:
server.stop(None)

spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)
span = spans_list[0]
self.assertEqual(span.name, "test")
self.assertIs(span.kind, trace.SpanKind.SERVER)
self.check_span_instrumentation_info(span, opentelemetry.ext.grpc)
grpc_server_instrumentor.uninstrument()

def test_uninstrument(self):
def handler(request, context):
return b""

grpc_server_instrumentor = GrpcInstrumentorServer()
grpc_server_instrumentor.instrument()
grpc_server_instrumentor.uninstrument()
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=1),
options=(("grpc.so_reuseport", 0),),
)

server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),))

port = server.add_insecure_port("[::]:0")
channel = grpc.insecure_channel("localhost:{:d}".format(port))

try:
server.start()
channel.unary_unary("test")(b"test")
finally:
server.stop(None)

spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 0)

def test_create_span(self):
"""Check that the interceptor wraps calls with spans server-side."""

Expand Down

0 comments on commit 5ff9600

Please sign in to comment.