Skip to content

Commit

Permalink
adding grpc integration
Browse files Browse the repository at this point in the history
  • Loading branch information
aravinsiva committed Jun 12, 2020
1 parent 17741ab commit eebd8fd
Showing 1 changed file with 37 additions and 16 deletions.
53 changes: 37 additions & 16 deletions ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,49 +16,70 @@
# pylint:disable=import-self
# pylint:disable=no-name-in-module
# pylint:disable=relative-beyond-top-level
from contextlib import contextmanager

import grpc
from concurrent import futures
from opentelemetry import trace
from opentelemetry.ext.grpc.version import __version__
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.auto_instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.trace import get_tracer
from wrapt import ObjectProxy
from wrapt import wrap_function_wrapper as _wrap
from opentelemetry.ext.grpc.grpcext import intercept_channel, intercept_server

import pdb
from opentelemetry.sdk.trace.export import (
ConsoleSpanExporter,
SimpleExportSpanProcessor,
)


class GrpcInstrumentorServer(BaseInstrumentor):
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer_provider()
server = None

def _instrument(self, **kwargs):
# tracer = self.get_trace(kwargs)

_wrap(grpc, 'server', server_interceptor(tracer_provider=get_tracer_provider(**kwargs)))
_wrap(grpc, 'secure_channel', server_interceptor(tracer_provider=get_tracer_provider(**kwargs)))
self.server = grpc.server(futures.ThreadPoolExecutor())
self.server = intercept_server(self.server, server_interceptor())

def _uninstrument(self, **kwargs):
_unwrap(grpc, 'server')
if hasattr(self.server, 'stop'):
return self.server.stop(kwargs.get('grace'))


class GrpcInstrumentorClient(BaseInstrumentor):
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer_provider()
channel = None

def _instrument(self, **kwargs):
# tracer = get_tracer_provider(kwargs)
_wrap(grpc, 'insecure_channel', client_interceptor(tracer_provider=get_tracer_provider(**kwargs)))
_wrap(grpc, 'secure_channel', client_interceptor(tracer_provider=get_tracer_provider(**kwargs)))
hostport = kwargs.get('hostport')

if kwargs.get('channel_type') == 'secure':
self.channel = secure_channel_wrapper(hostport, kwargs.get("credentials"))

else:
self.channel = insecure_channel_wrapper(hostport)

def _uninstrument(self, **kwargs):
_unwrap(grpc, 'secure_channel')
_unwrap(grpc, 'insecure_channel')

if hasattr(self.channel, 'close'):
return self.channel.close()

def _unwrap(obj, attr):
func = getattr(obj, attr, None)

if func and isinstance(func, ObjectProxy) and hasattr(func, "__wrapped__"):
setattr(obj, attr, func.__wrapped__)
@contextmanager
def insecure_channel_wrapper(hostport):
with grpc.insecure_channel(hostport) as channel:
yield intercept_channel(channel, client_interceptor())


def get_tracer_provider(**kwargs):
return kwargs.get("tracer_provider")
@contextmanager
def secure_channel_wrapper(hostport, credentials):
with grpc.secure_channel(hostport, credentials) as channel:
yield intercept_channel(channel, client_interceptor())


def client_interceptor(tracer_provider=None):
Expand Down

0 comments on commit eebd8fd

Please sign in to comment.