Skip to content

Commit

Permalink
Clean up use of suppress_instrumentation in context and fix httpx bug (
Browse files Browse the repository at this point in the history
…#2061)

* Clean up use of suppress_instrumentation in context and fix httpx bug

* Clean up use of suppress_instrumentation in context and fix httpx bug

* changelog

* fix tests

* fix import

* fmt

* update dep

* lint

* remove unused imports

* apply lint

* Fix version for pika

* Fix lint

---------
  • Loading branch information
adriangb authored Dec 22, 2023
1 parent e5aa74f commit 9563ee7
Show file tree
Hide file tree
Showing 25 changed files with 175 additions and 301 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#2002](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2002))
- `opentelemetry-instrument-grpc` Fix arity of context.abort for AIO RPCs
([#2066](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2066))
- Consolidate instrumentation suppression mechanisms and fix bug in httpx instrumentation
([#2061](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2061))

### Fixed

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ classifiers = [
]
dependencies = [
"opentelemetry-api ~= 1.5",
"opentelemetry-instrumentation == 0.44b0.dev",
"wrapt >= 1.0.0, < 2.0.0",
]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@

from opentelemetry import context, propagate, trace
from opentelemetry.instrumentation.aio_pika.span_builder import SpanBuilder
from opentelemetry.instrumentation.aio_pika.utils import (
is_instrumentation_enabled,
)
from opentelemetry.instrumentation.utils import is_instrumentation_enabled
from opentelemetry.semconv.trace import MessagingOperationValues
from opentelemetry.trace import Span, Tracer

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@

from aio_pika.abc import AbstractChannel, AbstractMessage

from opentelemetry.instrumentation.aio_pika.utils import (
is_instrumentation_enabled,
)
from opentelemetry.instrumentation.utils import is_instrumentation_enabled
from opentelemetry.semconv.trace import (
MessagingOperationValues,
SpanAttributes,
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ def response_hook(span: Span, params: typing.Union[
from opentelemetry.instrumentation.aiohttp_client.version import __version__
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import (
_SUPPRESS_INSTRUMENTATION_KEY,
http_status_to_status_code,
is_instrumentation_enabled,
unwrap,
)
from opentelemetry.propagate import inject
Expand Down Expand Up @@ -179,7 +179,7 @@ async def on_request_start(
trace_config_ctx: types.SimpleNamespace,
params: aiohttp.TraceRequestStartParams,
):
if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY):
if not is_instrumentation_enabled():
trace_config_ctx.span = None
return

Expand Down Expand Up @@ -282,7 +282,7 @@ def _instrument(

# pylint:disable=unused-argument
def instrumented_init(wrapped, instance, args, kwargs):
if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY):
if not is_instrumentation_enabled():
return wrapped(*args, **kwargs)

client_trace_configs = list(kwargs.get("trace_configs") or [])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,12 @@
from http_server_mock import HttpServerMock
from pkg_resources import iter_entry_points

from opentelemetry import context
from opentelemetry import trace as trace_api
from opentelemetry.instrumentation import aiohttp_client
from opentelemetry.instrumentation.aiohttp_client import (
AioHttpClientInstrumentor,
)
from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY
from opentelemetry.instrumentation.utils import suppress_instrumentation
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.test.test_base import TestBase
from opentelemetry.trace import Span, StatusCode
Expand Down Expand Up @@ -512,25 +511,17 @@ async def uninstrument_request(server: aiohttp.test_utils.TestServer):
self.assert_spans(1)

def test_suppress_instrumentation(self):
token = context.attach(
context.set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)
)
try:
with suppress_instrumentation():
run_with_test_server(
self.get_default_request(), self.URL, self.default_handler
)
finally:
context.detach(token)
self.assert_spans(0)

@staticmethod
async def suppressed_request(server: aiohttp.test_utils.TestServer):
async with aiohttp.test_utils.TestClient(server) as client:
token = context.attach(
context.set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)
)
await client.get(TestAioHttpClientInstrumentor.URL)
context.detach(token)
with suppress_instrumentation():
await client.get(TestAioHttpClientInstrumentor.URL)

def test_suppress_instrumentation_after_creation(self):
run_with_test_server(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
from opentelemetry import context, propagate, trace
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import (
_SUPPRESS_INSTRUMENTATION_KEY,
is_instrumentation_enabled,
unwrap,
)
from opentelemetry.propagators.textmap import CarrierT, Getter, Setter
Expand Down Expand Up @@ -218,7 +218,7 @@ def _create_processing_span(

def _wrap_send_message(self, sqs_class: type) -> None:
def send_wrapper(wrapped, instance, args, kwargs):
if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY):
if not is_instrumentation_enabled():
return wrapped(*args, **kwargs)
queue_url = kwargs.get("QueueUrl")
# The method expect QueueUrl and Entries params, so if they are None, we call wrapped to receive the
Expand Down Expand Up @@ -252,7 +252,7 @@ def send_batch_wrapper(wrapped, instance, args, kwargs):
# The method expect QueueUrl and Entries params, so if they are None, we call wrapped to receive the
# original exception
if (
context.get_value(_SUPPRESS_INSTRUMENTATION_KEY)
not is_instrumentation_enabled()
or not queue_url
or not entries
):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,6 @@ def response_hook(span, service_name, operation_name, result):
from botocore.exceptions import ClientError
from wrapt import wrap_function_wrapper

from opentelemetry import context as context_api

# FIXME: fix the importing of this private attribute when the location of the _SUPPRESS_HTTP_INSTRUMENTATION_KEY is defined.
from opentelemetry.context import _SUPPRESS_HTTP_INSTRUMENTATION_KEY
from opentelemetry.instrumentation.botocore.extensions import _find_extension
from opentelemetry.instrumentation.botocore.extensions.types import (
_AwsSdkCallContext,
Expand All @@ -98,7 +94,8 @@ def response_hook(span, service_name, operation_name, result):
from opentelemetry.instrumentation.botocore.version import __version__
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import (
_SUPPRESS_INSTRUMENTATION_KEY,
is_instrumentation_enabled,
suppress_http_instrumentation,
unwrap,
)
from opentelemetry.propagators.aws.aws_xray_propagator import AwsXRayPropagator
Expand Down Expand Up @@ -171,7 +168,7 @@ def _patched_endpoint_prepare_request(

# pylint: disable=too-many-branches
def _patched_api_call(self, original_func, instance, args, kwargs):
if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY):
if not is_instrumentation_enabled():
return original_func(*args, **kwargs)

call_context = _determine_call_context(instance, args)
Expand Down Expand Up @@ -200,25 +197,20 @@ def _patched_api_call(self, original_func, instance, args, kwargs):
_safe_invoke(extension.before_service_call, span)
self._call_request_hook(span, call_context)

token = context_api.attach(
context_api.set_value(_SUPPRESS_HTTP_INSTRUMENTATION_KEY, True)
)

result = None
try:
result = original_func(*args, **kwargs)
except ClientError as error:
result = getattr(error, "response", None)
_apply_response_attributes(span, result)
_safe_invoke(extension.on_error, span, error)
raise
else:
_apply_response_attributes(span, result)
_safe_invoke(extension.on_success, span, result)
with suppress_http_instrumentation():
result = None
try:
result = original_func(*args, **kwargs)
except ClientError as error:
result = getattr(error, "response", None)
_apply_response_attributes(span, result)
_safe_invoke(extension.on_error, span, error)
raise
_apply_response_attributes(span, result)
_safe_invoke(extension.on_success, span, result)
finally:
context_api.detach(token)
_safe_invoke(extension.after_service_call)

self._call_response_hook(span, call_context, result)

return result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,11 @@
)

from opentelemetry import trace as trace_api
from opentelemetry.context import (
_SUPPRESS_HTTP_INSTRUMENTATION_KEY,
attach,
detach,
set_value,
)
from opentelemetry.instrumentation.botocore import BotocoreInstrumentor
from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY
from opentelemetry.instrumentation.utils import (
suppress_http_instrumentation,
suppress_instrumentation,
)
from opentelemetry.propagate import get_global_textmap, set_global_textmap
from opentelemetry.propagators.aws.aws_xray_propagator import TRACE_HEADER_KEY
from opentelemetry.semconv.trace import SpanAttributes
Expand Down Expand Up @@ -341,23 +338,17 @@ def check_headers(**kwargs):
@mock_xray
def test_suppress_instrumentation_xray_client(self):
xray_client = self._make_client("xray")
token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
try:
with suppress_instrumentation():
xray_client.put_trace_segments(TraceSegmentDocuments=["str1"])
xray_client.put_trace_segments(TraceSegmentDocuments=["str2"])
finally:
detach(token)
self.assertEqual(0, len(self.get_finished_spans()))

@mock_xray
def test_suppress_http_instrumentation_xray_client(self):
xray_client = self._make_client("xray")
token = attach(set_value(_SUPPRESS_HTTP_INSTRUMENTATION_KEY, True))
try:
with suppress_http_instrumentation():
xray_client.put_trace_segments(TraceSegmentDocuments=["str1"])
xray_client.put_trace_segments(TraceSegmentDocuments=["str2"])
finally:
detach(token)
self.assertEqual(2, len(self.get_finished_spans()))

@mock_s3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@
import grpc
from grpc.aio import ClientCallDetails

from opentelemetry import context
from opentelemetry.instrumentation.grpc._client import (
OpenTelemetryClientInterceptor,
_carrier_setter,
)
from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY
from opentelemetry.instrumentation.utils import is_instrumentation_enabled
from opentelemetry.propagate import inject
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace.status import Status, StatusCode
Expand Down Expand Up @@ -139,9 +138,10 @@ async def _wrap_stream_response(self, span, call):
span.end()

def tracing_skipped(self, client_call_details):
return context.get_value(
_SUPPRESS_INSTRUMENTATION_KEY
) or not self.rpc_matches_filters(client_call_details)
return (
not is_instrumentation_enabled()
or not self.rpc_matches_filters(client_call_details)
)

def rpc_matches_filters(self, client_call_details):
return self._filter is None or self._filter(client_call_details)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@

import grpc

from opentelemetry import context, trace
from opentelemetry import trace
from opentelemetry.instrumentation.grpc import grpcext
from opentelemetry.instrumentation.grpc._utilities import RpcInfo
from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY
from opentelemetry.instrumentation.utils import is_instrumentation_enabled
from opentelemetry.propagate import inject
from opentelemetry.propagators.textmap import Setter
from opentelemetry.semconv.trace import SpanAttributes
Expand Down Expand Up @@ -123,7 +123,7 @@ def _trace_result(self, span, rpc_info, result):
return result

def _intercept(self, request, metadata, client_info, invoker):
if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY):
if not is_instrumentation_enabled():
return invoker(request, metadata)

if not metadata:
Expand Down Expand Up @@ -219,7 +219,7 @@ def _intercept_server_stream(
def intercept_stream(
self, request_or_iterator, metadata, client_info, invoker
):
if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY):
if not is_instrumentation_enabled():
return invoker(request_or_iterator, metadata)

if self._filter is not None and not self._filter(client_info):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ def run(self, result=None):
import pytest

import opentelemetry.instrumentation.grpc
from opentelemetry import context, trace
from opentelemetry import trace
from opentelemetry.instrumentation.grpc import (
GrpcAioInstrumentorClient,
aio_client_interceptors,
)
from opentelemetry.instrumentation.grpc._aio_client import (
UnaryUnaryAioClientInterceptor,
)
from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY
from opentelemetry.instrumentation.utils import suppress_instrumentation
from opentelemetry.propagate import get_global_textmap, set_global_textmap
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.test.mock_textmap import MockTextMapPropagator
Expand Down Expand Up @@ -314,53 +314,33 @@ async def test_client_interceptor_trace_context_propagation(self):
set_global_textmap(previous_propagator)

async def test_unary_unary_with_suppress_key(self):
token = context.attach(
context.set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)
)
try:
with suppress_instrumentation():
response = await simple_method(self._stub)
assert response.response_data == "data"

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 0)
finally:
context.detach(token)

async def test_unary_stream_with_suppress_key(self):
token = context.attach(
context.set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)
)
try:
with suppress_instrumentation():
async for response in server_streaming_method(self._stub):
self.assertEqual(response.response_data, "data")

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 0)
finally:
context.detach(token)

async def test_stream_unary_with_suppress_key(self):
token = context.attach(
context.set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)
)
try:
with suppress_instrumentation():
response = await client_streaming_method(self._stub)
assert response.response_data == "data"

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 0)
finally:
context.detach(token)

async def test_stream_stream_with_suppress_key(self):
token = context.attach(
context.set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)
)
try:
with suppress_instrumentation():
async for response in bidirectional_streaming_method(self._stub):
self.assertEqual(response.response_data, "data")

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 0)
finally:
context.detach(token)
Loading

0 comments on commit 9563ee7

Please sign in to comment.