Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add instrumentor and auto instrumentation support for aiohttp #1075

Merged
merged 4 commits into from
Oct 9, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

- Updating span name to match semantic conventions
([#972](https://github.com/open-telemetry/opentelemetry-python/pull/972))
- Add instrumentor and auto instrumentation support for aiohttp
([#1075](https://github.com/open-telemetry/opentelemetry-python/pull/1075))

## Version 0.12b0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,17 @@ package_dir=
=src
packages=find_namespace:
install_requires =
opentelemetry-api >= 0.12.dev0
opentelemetry-api == 0.13dev0
opentelemetry-instrumentation == 0.13dev0
aiohttp ~= 3.0
wrapt >= 1.0.0, < 2.0.0

[options.packages.find]
where = src

[options.extras_require]
test =

[options.entry_points]
opentelemetry_instrumentor =
aiohttp-client = opentelemetry.instrumentation.aiohttp_client:AioHttpClientInstrumentor
Original file line number Diff line number Diff line change
Expand Up @@ -18,44 +18,73 @@

Usage
-----
Explicitly instrumenting a single client session:

.. code:: python
.. code:: python

import aiohttp
from opentelemetry.instrumentation.aiohttp_client import (
create_trace_config,
url_path_span_name
)
import yarl
import aiohttp
from opentelemetry.instrumentation.aiohttp_client import (
create_trace_config,
url_path_span_name
)
import yarl

def strip_query_params(url: yarl.URL) -> str:
return str(url.with_query(None))
def strip_query_params(url: yarl.URL) -> str:
return str(url.with_query(None))

async with aiohttp.ClientSession(trace_configs=[create_trace_config(
# Remove all query params from the URL attribute on the span.
url_filter=strip_query_params,
# Use the URL's path as the span name.
span_name=url_path_span_name
)]) as session:
async with session.get(url) as response:
await response.text()
async with aiohttp.ClientSession(trace_configs=[create_trace_config(
# Remove all query params from the URL attribute on the span.
url_filter=strip_query_params,
# Use the URL's path as the span name.
span_name=url_path_span_name
)]) as session:
async with session.get(url) as response:
await response.text()

Instrumenting all client sessions:

.. code:: python

import aiohttp
from opentelemetry.instrumentation.aiohttp_client import (
AioHttpClientInstrumentor
)

# Enable instrumentation
AioHttpClientInstrumentor().instrument()

# Create a session and make an HTTP get request
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
await response.text()

API
---
"""

import contextlib
import socket
import types
import typing

import aiohttp
import wrapt

from opentelemetry import context as context_api
from opentelemetry import propagators, trace
from opentelemetry.instrumentation.aiohttp_client.version import __version__
from opentelemetry.instrumentation.utils import http_status_to_canonical_code
from opentelemetry.trace import SpanKind
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import (
http_status_to_canonical_code,
unwrap,
)
from opentelemetry.trace import SpanKind, TracerProvider, get_tracer
from opentelemetry.trace.status import Status, StatusCanonicalCode

_UrlFilterT = typing.Optional[typing.Callable[[str], str]]
_SpanNameT = typing.Optional[
typing.Union[typing.Callable[[aiohttp.TraceRequestStartParams], str], str]
]


def url_path_span_name(params: aiohttp.TraceRequestStartParams) -> str:
"""Extract a span name from the request URL path.
Expand All @@ -73,12 +102,9 @@ def url_path_span_name(params: aiohttp.TraceRequestStartParams) -> str:


def create_trace_config(
url_filter: typing.Optional[typing.Callable[[str], str]] = None,
span_name: typing.Optional[
typing.Union[
typing.Callable[[aiohttp.TraceRequestStartParams], str], str
]
] = None,
url_filter: _UrlFilterT = None,
span_name: _SpanNameT = None,
tracer_provider: TracerProvider = None,
) -> aiohttp.TraceConfig:
"""Create an aiohttp-compatible trace configuration.

Expand All @@ -104,6 +130,7 @@ def create_trace_config(
such as API keys or user personal information.

:param str span_name: Override the default span name.
:param tracer_provider: optional TracerProvider from which to get a Tracer

:return: An object suitable for use with :py:class:`aiohttp.ClientSession`.
:rtype: :py:class:`aiohttp.TraceConfig`
Expand All @@ -113,7 +140,7 @@ def create_trace_config(
# Explicitly specify the type for the `span_name` param and rtype to work
# around this issue.

tracer = trace.get_tracer_provider().get_tracer(__name__, __version__)
tracer = get_tracer(__name__, __version__, tracer_provider)

def _end_trace(trace_config_ctx: types.SimpleNamespace):
context_api.detach(trace_config_ctx.token)
Expand All @@ -124,6 +151,10 @@ async def on_request_start(
trace_config_ctx: types.SimpleNamespace,
params: aiohttp.TraceRequestStartParams,
):
if context_api.get_value("suppress_instrumentation"):
trace_config_ctx.span = None
return

http_method = params.method.upper()
if trace_config_ctx.span_name is None:
request_span_name = "HTTP {}".format(http_method)
Expand Down Expand Up @@ -155,6 +186,9 @@ async def on_request_end(
trace_config_ctx: types.SimpleNamespace,
params: aiohttp.TraceRequestEndParams,
):
if trace_config_ctx.span is None:
return

trace_config_ctx.span.set_status(
Status(http_status_to_canonical_code(int(params.response.status)))
)
Expand All @@ -171,6 +205,9 @@ async def on_request_exception(
trace_config_ctx: types.SimpleNamespace,
params: aiohttp.TraceRequestExceptionParams,
):
if trace_config_ctx.span is None:
return

if isinstance(
params.exception,
(aiohttp.ServerTimeoutError, aiohttp.TooManyRedirects),
Expand All @@ -186,6 +223,7 @@ async def on_request_exception(
status = StatusCanonicalCode.UNAVAILABLE

trace_config_ctx.span.set_status(Status(status))
trace_config_ctx.span.record_exception(params.exception)
_end_trace(trace_config_ctx)

def _trace_config_ctx_factory(**kwargs):
Expand All @@ -203,3 +241,84 @@ def _trace_config_ctx_factory(**kwargs):
trace_config.on_request_exception.append(on_request_exception)

return trace_config


def _instrument(
tracer_provider: TracerProvider = None,
url_filter: _UrlFilterT = None,
span_name: _SpanNameT = None,
):
"""Enables tracing of all ClientSessions

When a ClientSession gets created a TraceConfig is automatically added to
the session's trace_configs.
"""
# pylint:disable=unused-argument
def instrumented_init(wrapped, instance, args, kwargs):
if context_api.get_value("suppress_instrumentation"):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also have this check in instrumented methods. Does this exist to prevent instrumenting libraries in the first place or to prevent generating spans for certain code paths? I think it's the latter. In other words, I think we should still instrument the libraries, but the instrumentation itself should check for this on each request and skip generating spans in that case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once AioHttpClientInstrumentor().instrument() is called, the library is always instrumented. This check just suppresses instrumentation for a single ClientSession.
aiohttp already comes with built-in hooks for instrumenting certain aspects. To make use of these hooks, one has to pass one or more TraceConfig objects to the ClientSession constructor. The AioHttpClientInstrumentor piggybacks on this mechanism by wrapping the ClientSession constructor to automatically add a TraceConfig with the respective hooks every time a ClientSession is created. The suppress_instrumentation check is there to exclude single ClientSessions from being instrumented.

return wrapped(*args, **kwargs)

trace_configs = list(kwargs.get("trace_configs") or ())

trace_config = create_trace_config(
url_filter=url_filter,
span_name=span_name,
tracer_provider=tracer_provider,
)
trace_config.opentelemetry_aiohttp_trace_config = True
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps something like trace_config.opentelemetry_aiohttp_instrumented is more descriptive?

trace_configs.append(trace_config)

kwargs["trace_configs"] = trace_configs
return wrapped(*args, **kwargs)

wrapt.wrap_function_wrapper(
aiohttp.ClientSession, "__init__", instrumented_init
)


def _uninstrument():
    """Stop instrumenting ClientSessions created from now on.

    Calls ``unwrap`` on ``aiohttp.ClientSession.__init__``, the same
    attribute that ``_instrument`` wraps.
    """
    session_cls = aiohttp.ClientSession
    unwrap(session_cls, "__init__")


def _uninstrument_session(client_session: aiohttp.ClientSession):
    """Remove the OpenTelemetry trace configs from one ClientSession.

    Every entry of the session's private ``_trace_configs`` list that carries
    the ``opentelemetry_aiohttp_trace_config`` marker attribute is dropped;
    all other entries are kept in their original order.

    :param client_session: the session whose trace configs are filtered.
    """
    # pylint: disable=protected-access
    marker = "opentelemetry_aiohttp_trace_config"
    kept_configs = []
    for config in client_session._trace_configs:
        # Only configs tagged with the marker attribute are discarded.
        if hasattr(config, marker):
            continue
        kept_configs.append(config)
    client_session._trace_configs = kept_configs


class AioHttpClientInstrumentor(BaseInstrumentor):
    """An instrumentor for aiohttp client sessions

    See `BaseInstrumentor`
    """

    def _instrument(self, **kwargs):
        """Instruments aiohttp ClientSession

        Args:
            **kwargs: Optional arguments
                ``tracer_provider``: a TracerProvider, defaults to global
                ``url_filter``: A callback to process the requested URL prior
                    to adding it as a span attribute. This can be useful to
                    remove sensitive data such as API keys or user personal
                    information.
                ``span_name``: Override the default span name.
        """
        # Forward only the recognised options; absent ones are passed as None.
        option_names = ("tracer_provider", "url_filter", "span_name")
        options = {name: kwargs.get(name) for name in option_names}
        _instrument(**options)

    def _uninstrument(self, **kwargs):
        """Delegate to the module-level ``_uninstrument``; kwargs are unused."""
        _uninstrument()

    @staticmethod
    def uninstrument_session(client_session: aiohttp.ClientSession):
        """Disables instrumentation for the given session"""
        _uninstrument_session(client_session)
Loading