From af2197f8a65a2f5a69d7715c37e68ae4624de1fc Mon Sep 17 00:00:00 2001 From: Lubos Mjachky Date: Mon, 20 Nov 2023 17:42:41 +0100 Subject: [PATCH] Add instrumentation to content-app The instrumentation code from the upstream PR was brought down to pulpcore because of https://github.com/pypi/support/issues/3353. closes #3829 --- CHANGES/3829.feature | 1 + pulpcore/content/__init__.py | 4 +- pulpcore/content/instrumentation.py | 267 ++++++++++++++++++ pulpcore/tests/functional/__init__.py | 5 + .../pulp_file/test_telemetry_collection.py | 62 ++++ pulpcore/tests/functional/api/test_status.py | 5 - .../tests/functional/assets/otel_server.py | 1 + 7 files changed, 339 insertions(+), 6 deletions(-) create mode 100644 CHANGES/3829.feature create mode 100644 pulpcore/content/instrumentation.py create mode 100644 pulpcore/tests/functional/api/pulp_file/test_telemetry_collection.py diff --git a/CHANGES/3829.feature b/CHANGES/3829.feature new file mode 100644 index 00000000000..08bfcccc4a8 --- /dev/null +++ b/CHANGES/3829.feature @@ -0,0 +1 @@ +Added instrumentation to content-app to track telemetry data. diff --git a/pulpcore/content/__init__.py b/pulpcore/content/__init__.py index 4928dfd3df1..992a590db75 100644 --- a/pulpcore/content/__init__.py +++ b/pulpcore/content/__init__.py @@ -8,6 +8,8 @@ from asgiref.sync import sync_to_async from aiohttp import web +from .instrumentation import middleware as instrumentation + import django @@ -29,7 +31,7 @@ log = logging.getLogger(__name__) -app = web.Application(middlewares=[authenticate]) +app = web.Application(middlewares=[authenticate, instrumentation]) CONTENT_MODULE_NAME = "content" diff --git a/pulpcore/content/instrumentation.py b/pulpcore/content/instrumentation.py new file mode 100644 index 00000000000..2782beb9778 --- /dev/null +++ b/pulpcore/content/instrumentation.py @@ -0,0 +1,267 @@ +# Copyright 2020, OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# TODO: This is a copy of https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1800; +# it can be removed once the following issues will be resolved: +# 1. https://github.com/pypi/support/issues/3353 +# 2. https://github.com/open-telemetry/opentelemetry-python-contrib/issues/2053 + +import urllib +from aiohttp import web +from multidict import CIMultiDictProxy +from timeit import default_timer +from typing import Tuple, Dict, List, Union + +from opentelemetry import context, trace, metrics +from opentelemetry.context import _SUPPRESS_HTTP_INSTRUMENTATION_KEY +from opentelemetry.instrumentation.instrumentor import BaseInstrumentor +from opentelemetry.instrumentation.utils import http_status_to_status_code +from opentelemetry.propagators.textmap import Getter +from opentelemetry.propagate import extract +from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.semconv.metrics import MetricInstruments +from opentelemetry.trace.status import Status, StatusCode +from opentelemetry.util.http import get_excluded_urls +from opentelemetry.util.http import remove_url_credentials + +_duration_attrs = [ + SpanAttributes.HTTP_METHOD, + SpanAttributes.HTTP_HOST, + SpanAttributes.HTTP_SCHEME, + SpanAttributes.HTTP_STATUS_CODE, + SpanAttributes.HTTP_FLAVOR, + SpanAttributes.HTTP_SERVER_NAME, + SpanAttributes.NET_HOST_NAME, + SpanAttributes.NET_HOST_PORT, + SpanAttributes.HTTP_ROUTE, +] + +_active_requests_count_attrs = [ + SpanAttributes.HTTP_METHOD, + SpanAttributes.HTTP_HOST, + SpanAttributes.HTTP_SCHEME, + SpanAttributes.HTTP_FLAVOR, + SpanAttributes.HTTP_SERVER_NAME, +] + +__version__ = "0.42b0.dev" +_instruments = ("aiohttp ~= 3.0",) + +tracer = trace.get_tracer(__name__) +meter = metrics.get_meter(__name__, __version__) +_excluded_urls = get_excluded_urls("AIOHTTP_SERVER") + + +def _parse_duration_attrs(req_attrs): + duration_attrs = {} + for attr_key in _duration_attrs: + if req_attrs.get(attr_key) is not None: + duration_attrs[attr_key] = req_attrs[attr_key] + return duration_attrs + + +def _parse_active_request_count_attrs(req_attrs): + active_requests_count_attrs = {} + for attr_key in _active_requests_count_attrs: + if req_attrs.get(attr_key) is not None: + active_requests_count_attrs[attr_key] = req_attrs[attr_key] + return active_requests_count_attrs + + +def get_default_span_details(request: web.Request) -> Tuple[str, dict]: + """Default implementation for get_default_span_details + Args: + request: the request object itself. + Returns: + a tuple of the span name, and any attributes to attach to the span. + """ + span_name = request.path.strip() or f"HTTP {request.method}" + return span_name, {} + + +def _get_view_func(request: web.Request) -> str: + """Returns the name of the request handler. + Args: + request: the request object itself. + Returns: + a string containing the name of the handler function + """ + try: + return request.match_info.handler.__name__ + except AttributeError: + return "unknown" + + +def collect_request_attributes(request: web.Request) -> Dict: + """Collects HTTP request attributes from the ASGI scope and returns a + dictionary to be used as span creation attributes.""" + + server_host, port, http_url = ( + request.url.host, + request.url.port, + str(request.url), + ) + query_string = request.query_string + if query_string and http_url: + if isinstance(query_string, bytes): + query_string = query_string.decode("utf8") + http_url += "?" + urllib.parse.unquote(query_string) + + result = { + SpanAttributes.HTTP_SCHEME: request.scheme, + SpanAttributes.HTTP_HOST: server_host, + SpanAttributes.NET_HOST_PORT: port, + SpanAttributes.HTTP_ROUTE: _get_view_func(request), + SpanAttributes.HTTP_FLAVOR: f"{request.version.major}.{request.version.minor}", + SpanAttributes.HTTP_TARGET: request.path, + SpanAttributes.HTTP_URL: remove_url_credentials(http_url), + } + + http_method = request.method + if http_method: + result[SpanAttributes.HTTP_METHOD] = http_method + + http_host_value_list = [request.host] if type(request.host) is not list else request.host + if http_host_value_list: + result[SpanAttributes.HTTP_SERVER_NAME] = ",".join(http_host_value_list) + http_user_agent = request.headers.get("user-agent") + if http_user_agent: + result[SpanAttributes.HTTP_USER_AGENT] = http_user_agent + + # remove None values + result = {k: v for k, v in result.items() if v is not None} + + return result + + +def set_status_code(span, status_code: int) -> None: + """Adds HTTP response attributes to span using the status_code argument.""" + + try: + status_code = int(status_code) + except ValueError: + span.set_status( + Status( + StatusCode.ERROR, + "Non-integer HTTP status: " + repr(status_code), + ) + ) + else: + span.set_attribute(SpanAttributes.HTTP_STATUS_CODE, status_code) + span.set_status(Status(http_status_to_status_code(status_code, server_span=True))) + + +class AiohttpGetter(Getter): + """Extract current trace from headers""" + + def get(self, carrier, key: str) -> Union[List, None]: + """Getter implementation to retrieve an HTTP header value from the ASGI + scope. + + Args: + carrier: ASGI scope object + key: header name in scope + Returns: + A list of all header values matching the key, or None if the key + does not match any header. + """ + headers: CIMultiDictProxy = carrier.headers + if not headers: + return None + return headers.getall(key, None) + + def keys(self, carrier: Dict) -> List: + return list(carrier.keys()) + + +getter = AiohttpGetter() + + +@web.middleware +async def middleware(request, handler): + """Middleware for aiohttp implementing tracing logic""" + if ( + context.get_value("suppress_instrumentation") + or context.get_value(_SUPPRESS_HTTP_INSTRUMENTATION_KEY) + or _excluded_urls.url_disabled(request.url.path) + ): + return await handler(request) + + span_name, additional_attributes = get_default_span_details(request) + + req_attrs = collect_request_attributes(request) + duration_attrs = _parse_duration_attrs(req_attrs) + active_requests_count_attrs = _parse_active_request_count_attrs(req_attrs) + + duration_histogram = meter.create_histogram( + name=MetricInstruments.HTTP_SERVER_DURATION, + unit="ms", + description="measures the duration of the inbound HTTP request", + ) + + active_requests_counter = meter.create_up_down_counter( + name=MetricInstruments.HTTP_SERVER_ACTIVE_REQUESTS, + unit="requests", + description="measures the number of concurrent HTTP requests those are currently in flight", + ) + + with tracer.start_as_current_span( + span_name, + context=extract(request, getter=getter), + kind=trace.SpanKind.SERVER, + ) as span: + attributes = collect_request_attributes(request) + attributes.update(additional_attributes) + span.set_attributes(attributes) + start = default_timer() + active_requests_counter.add(1, active_requests_count_attrs) + try: + resp = await handler(request) + set_status_code(span, resp.status) + except web.HTTPException as ex: + set_status_code(span, ex.status_code) + raise + finally: + duration = max((default_timer() - start) * 1000, 0) + duration_histogram.record(duration, duration_attrs) + active_requests_counter.add(-1, active_requests_count_attrs) + return resp + + +class _InstrumentedApplication(web.Application): + """Insert tracing middleware""" + + def __init__(self, *args, **kwargs): + middlewares = kwargs.pop("middlewares", []) + middlewares.insert(0, middleware) + kwargs["middlewares"] = middlewares + super().__init__(*args, **kwargs) + + +class AioHttpServerInstrumentor(BaseInstrumentor): + # pylint: disable=protected-access,attribute-defined-outside-init + """An instrumentor for aiohttp.web.Application + + See `BaseInstrumentor` + """ + + def _instrument(self, **kwargs): + self._original_app = web.Application + setattr(web, "Application", _InstrumentedApplication) + + def _uninstrument(self, **kwargs): + setattr(web, "Application", self._original_app) + + def instrumentation_dependencies(self): + return _instruments diff --git a/pulpcore/tests/functional/__init__.py b/pulpcore/tests/functional/__init__.py index 37365716b15..9cb7c292819 100644 --- a/pulpcore/tests/functional/__init__.py +++ b/pulpcore/tests/functional/__init__.py @@ -448,6 +448,11 @@ async def _send_request(): return _received_otel_span +@pytest.fixture +def test_path(): + return os.getenv("PYTEST_CURRENT_TEST").split()[0] + + # Webserver Fixtures diff --git a/pulpcore/tests/functional/api/pulp_file/test_telemetry_collection.py b/pulpcore/tests/functional/api/pulp_file/test_telemetry_collection.py new file mode 100644 index 00000000000..92c0a581914 --- /dev/null +++ b/pulpcore/tests/functional/api/pulp_file/test_telemetry_collection.py @@ -0,0 +1,62 @@ +import requests +import uuid + +from urllib.parse import urljoin + +from pulpcore.client.pulp_file import FileFileDistribution, RepositoryAddRemoveContent + + +def test_get_requests( + file_distribution_api_client, + file_repository_api_client, + file_repo_with_auto_publish, + file_content_unit_with_name_factory, + gen_object_with_cleanup, + monitor_task, + received_otel_span, + test_path, +): + """Test if content-app correctly returns mime-types based on filenames.""" + files = { + "otel_test_file1.tar.gz": file_content_unit_with_name_factory("otel_test_file1.tar.gz"), + "otel_test_file2.xml.gz": file_content_unit_with_name_factory("otel_test_file2.xml.gz"), + "otel_test_file3.txt": file_content_unit_with_name_factory("otel_test_file3.txt"), + } + units_to_add = list(map(lambda f: f.pulp_href, files.values())) + data = RepositoryAddRemoveContent(add_content_units=units_to_add) + monitor_task( + file_repository_api_client.modify(file_repo_with_auto_publish.pulp_href, data).task + ) + + data = FileFileDistribution( + name=str(uuid.uuid4()), + base_path=str(uuid.uuid4()), + repository=file_repo_with_auto_publish.pulp_href, + ) + distribution = gen_object_with_cleanup(file_distribution_api_client, data) + + for file_name, content_unit in files.items(): + url = urljoin(distribution.base_url, content_unit.relative_path) + + s = requests.Session() + s.headers = {"User-Agent": test_path} + + s.get(url) + assert received_otel_span( + { + "http.method": "GET", + "http.target": f"/pulp/content/{distribution.base_path}/{file_name}", + "http.status_code": 200, + "http.user_agent": test_path, + } + ) + + s.head(url) + assert received_otel_span( + { + "http.method": "HEAD", + "http.target": f"/pulp/content/{distribution.base_path}/{file_name}", + "http.status_code": 200, + "http.user_agent": test_path, + } + ) diff --git a/pulpcore/tests/functional/api/test_status.py b/pulpcore/tests/functional/api/test_status.py index a01c3179c8b..2c48f4bbfba 100644 --- a/pulpcore/tests/functional/api/test_status.py +++ b/pulpcore/tests/functional/api/test_status.py @@ -58,11 +58,6 @@ } -@pytest.fixture -def test_path(): - return os.getenv("PYTEST_CURRENT_TEST").split()[0] - - @pytest.mark.parallel def test_get_authenticated(test_path, status_api_client, received_otel_span): """GET the status path with valid credentials. diff --git a/pulpcore/tests/functional/assets/otel_server.py b/pulpcore/tests/functional/assets/otel_server.py index cd19ca35559..ca6600f71e1 100644 --- a/pulpcore/tests/functional/assets/otel_server.py +++ b/pulpcore/tests/functional/assets/otel_server.py @@ -68,6 +68,7 @@ async def _traces_handler(request): for item in span.attributes } spans.append(attrs) + print(attrs) raise web.HTTPOk() async def _test_handler(request):