diff --git a/sentry_sdk/hub.py b/sentry_sdk/hub.py
index df9de10fe4..6757b24b77 100644
--- a/sentry_sdk/hub.py
+++ b/sentry_sdk/hub.py
@@ -8,6 +8,7 @@
 from sentry_sdk.consts import INSTRUMENTER
 from sentry_sdk.scope import Scope
 from sentry_sdk.client import Client
+from sentry_sdk.profiler import Profile
 from sentry_sdk.tracing import NoOpSpan, Span, Transaction
 from sentry_sdk.session import Session
 from sentry_sdk.utils import (
@@ -548,6 +549,9 @@ def start_transaction(
         sampling_context.update(custom_sampling_context)
         transaction._set_initial_sampling_decision(sampling_context=sampling_context)
 
+        profile = Profile(transaction, hub=self)
+        profile._set_initial_sampling_decision(sampling_context=sampling_context)
+
         # we don't bother to keep spans if we already know we're not going to
         # send the transaction
         if transaction.sampled:
diff --git a/sentry_sdk/integrations/asgi.py b/sentry_sdk/integrations/asgi.py
index c84e5ba454..6952957618 100644
--- a/sentry_sdk/integrations/asgi.py
+++ b/sentry_sdk/integrations/asgi.py
@@ -14,7 +14,6 @@
 from sentry_sdk.hub import Hub, _should_send_default_pii
 from sentry_sdk.integrations._wsgi_common import _filter_headers
 from sentry_sdk.integrations.modules import _get_installed_modules
-from sentry_sdk.profiler import start_profiling
 from sentry_sdk.sessions import auto_session_tracking
 from sentry_sdk.tracing import (
     SOURCE_FOR_STYLE,
@@ -176,7 +175,7 @@ async def _run_app(self, scope, callback):
 
                 with hub.start_transaction(
                     transaction, custom_sampling_context={"asgi_scope": scope}
-                ), start_profiling(transaction, hub):
+                ):
                     # XXX: Would be cool to have correct span status, but we
                     # would have to wrap send(). That is a bit hard to do with
                     # the current abstraction over ASGI 2/3.
diff --git a/sentry_sdk/integrations/django/asgi.py b/sentry_sdk/integrations/django/asgi.py
index 955d8d19e8..721b2444cf 100644
--- a/sentry_sdk/integrations/django/asgi.py
+++ b/sentry_sdk/integrations/django/asgi.py
@@ -7,7 +7,6 @@
 """
 
 import asyncio
-import threading
 
 from sentry_sdk import Hub, _functools
 from sentry_sdk._types import MYPY
@@ -92,7 +91,7 @@ async def sentry_wrapped_callback(request, *args, **kwargs):
 
         with hub.configure_scope() as sentry_scope:
             if sentry_scope.profile is not None:
-                sentry_scope.profile.active_thread_id = threading.current_thread().ident
+                sentry_scope.profile.update_active_thread_id()
 
         with hub.start_span(
             op=OP.VIEW_RENDER, description=request.resolver_match.view_name
diff --git a/sentry_sdk/integrations/django/views.py b/sentry_sdk/integrations/django/views.py
index 735822aa72..6c03b33edb 100644
--- a/sentry_sdk/integrations/django/views.py
+++ b/sentry_sdk/integrations/django/views.py
@@ -1,5 +1,3 @@
-import threading
-
 from sentry_sdk.consts import OP
 from sentry_sdk.hub import Hub
 from sentry_sdk._types import MYPY
@@ -79,7 +77,7 @@ def sentry_wrapped_callback(request, *args, **kwargs):
             # set the active thread id to the handler thread for sync views
             # this isn't necessary for async views since that runs on main
             if sentry_scope.profile is not None:
-                sentry_scope.profile.active_thread_id = threading.current_thread().ident
+                sentry_scope.profile.update_active_thread_id()
 
         with hub.start_span(
             op=OP.VIEW_RENDER, description=request.resolver_match.view_name
diff --git a/sentry_sdk/integrations/fastapi.py b/sentry_sdk/integrations/fastapi.py
index 8bbf32eeff..32c511d74a 100644
--- a/sentry_sdk/integrations/fastapi.py
+++ b/sentry_sdk/integrations/fastapi.py
@@ -1,5 +1,4 @@
 import asyncio
-import threading
 
 from sentry_sdk._types import MYPY
 from sentry_sdk.hub import Hub, _should_send_default_pii
@@ -78,9 +77,7 @@ def _sentry_call(*args, **kwargs):
             hub = Hub.current
             with hub.configure_scope() as sentry_scope:
                 if sentry_scope.profile is not None:
-                    sentry_scope.profile.active_thread_id = (
-                        threading.current_thread().ident
-                    )
+                    sentry_scope.profile.update_active_thread_id()
             return old_call(*args, **kwargs)
 
         dependant.call = _sentry_call
diff --git a/sentry_sdk/integrations/starlette.py b/sentry_sdk/integrations/starlette.py
index aec194a779..7b213f186b 100644
--- a/sentry_sdk/integrations/starlette.py
+++ b/sentry_sdk/integrations/starlette.py
@@ -2,7 +2,6 @@
 
 import asyncio
 import functools
-import threading
 
 from sentry_sdk._compat import iteritems
 from sentry_sdk._types import MYPY
@@ -413,9 +412,7 @@ def _sentry_sync_func(*args, **kwargs):
 
                 with hub.configure_scope() as sentry_scope:
                     if sentry_scope.profile is not None:
-                        sentry_scope.profile.active_thread_id = (
-                            threading.current_thread().ident
-                        )
+                        sentry_scope.profile.update_active_thread_id()
 
                     request = args[0]
diff --git a/sentry_sdk/integrations/wsgi.py b/sentry_sdk/integrations/wsgi.py
index 03ce665489..f8b41dc12c 100644
--- a/sentry_sdk/integrations/wsgi.py
+++ b/sentry_sdk/integrations/wsgi.py
@@ -12,7 +12,6 @@
 from sentry_sdk.tracing import Transaction, TRANSACTION_SOURCE_ROUTE
 from sentry_sdk.sessions import auto_session_tracking
 from sentry_sdk.integrations._wsgi_common import _filter_headers
-from sentry_sdk.profiler import start_profiling
 
 from sentry_sdk._types import MYPY
@@ -132,7 +131,7 @@ def __call__(self, environ, start_response):
 
             with hub.start_transaction(
                 transaction, custom_sampling_context={"wsgi_environ": environ}
-            ), start_profiling(transaction, hub):
+            ):
                 try:
                     rv = self.app(
                         environ,
diff --git a/sentry_sdk/profiler.py b/sentry_sdk/profiler.py
index 0ce44a031b..3277cebde4 100644
--- a/sentry_sdk/profiler.py
+++ b/sentry_sdk/profiler.py
@@ -21,7 +21,6 @@
 import time
 import uuid
 from collections import deque
-from contextlib import contextmanager
 
 import sentry_sdk
 from sentry_sdk._compat import PY33, PY311
@@ -39,14 +38,15 @@
     from typing import Callable
     from typing import Deque
     from typing import Dict
-    from typing import Generator
     from typing import List
     from typing import Optional
     from typing import Set
     from typing import Sequence
     from typing import Tuple
     from typing_extensions import TypedDict
+
     import sentry_sdk.tracing
+    from sentry_sdk._types import SamplingContext
 
     ThreadId = str
@@ -108,6 +108,7 @@
     {"profile_id": str},
 )
 
+
 try:
     from gevent.monkey import is_module_patched  # type: ignore
 except ImportError:
@@ -118,12 +119,25 @@ def is_module_patched(*args, **kwargs):
         return False
 
 
+try:
+    from gevent import get_hub as get_gevent_hub  # type: ignore
+except ImportError:
+
+    def get_gevent_hub():
+        # type: () -> Any
+        return None
+
+
+def is_gevent():
+    # type: () -> bool
+    return is_module_patched("threading") or is_module_patched("_thread")
+
+
 _scheduler = None  # type: Optional[Scheduler]
 
 
 def setup_profiler(options):
     # type: (Dict[str, Any]) -> None
-
     """
     `buffer_secs` determines the max time a sample will be buffered for
     `frequency` determines the number of samples to take per second (Hz)
@@ -141,7 +155,7 @@ def setup_profiler(options):
 
     frequency = 101
 
-    if is_module_patched("threading") or is_module_patched("_thread"):
+    if is_gevent():
         # If gevent has patched the threading modules then we cannot rely on
         # them to spawn a native thread for sampling.
         # Instead we default to the GeventScheduler which is capable of
@@ -333,22 +347,80 @@ def get_frame_name(frame):
 MAX_PROFILE_DURATION_NS = int(3e10)  # 30 seconds
 
 
+def get_current_thread_id(thread=None):
+    # type: (Optional[threading.Thread]) -> Optional[int]
+    """
+    Try to get the id of the current thread, with various fallbacks.
+    """
+
+    # if a thread is specified, that takes priority
+    if thread is not None:
+        try:
+            thread_id = thread.ident
+            if thread_id is not None:
+                return thread_id
+        except AttributeError:
+            pass
+
+    # if the app is using gevent, we should look at the gevent hub first
+    # as the id there differs from what the threading module reports
+    if is_gevent():
+        gevent_hub = get_gevent_hub()
+        if gevent_hub is not None:
+            try:
+                # this is undocumented, so wrap it in try except to be safe
+                return gevent_hub.thread_ident
+            except AttributeError:
+                pass
+
+    # use the current thread's id if possible
+    try:
+        current_thread_id = threading.current_thread().ident
+        if current_thread_id is not None:
+            return current_thread_id
+    except AttributeError:
+        pass
+
+    # if we can't get the current thread id, fall back to the main thread id
+    try:
+        main_thread_id = threading.main_thread().ident
+        if main_thread_id is not None:
+            return main_thread_id
+    except AttributeError:
+        pass
+
+    # we've tried everything, time to give up
+    return None
+
+
 class Profile(object):
     def __init__(
         self,
-        scheduler,  # type: Scheduler
         transaction,  # type: sentry_sdk.tracing.Transaction
         hub=None,  # type: Optional[sentry_sdk.Hub]
+        scheduler=None,  # type: Optional[Scheduler]
     ):
         # type: (...) -> None
-        self.scheduler = scheduler
-        self.transaction = transaction
+        self.scheduler = _scheduler if scheduler is None else scheduler
         self.hub = hub
+
+        self.event_id = uuid.uuid4().hex  # type: str
+
+        # Here, we assume that the sampling decision on the transaction has been finalized.
+        #
+        # We cannot keep a reference to the transaction around here because it'll create
+        # a reference cycle. So we opt to pull out just the necessary attributes.
+        self._transaction_sampled = transaction.sampled  # type: Optional[bool]
+        self.sampled = None  # type: Optional[bool]
+
+        # Various framework integrations are capable of overwriting the active thread id.
+        # If it is set to `None` at the end of the profile, we fall back to the default.
+        self._default_active_thread_id = get_current_thread_id() or 0  # type: int
         self.active_thread_id = None  # type: Optional[int]
+
         self.start_ns = 0  # type: int
         self.stop_ns = 0  # type: int
         self.active = False  # type: bool
-        self.event_id = uuid.uuid4().hex  # type: str
 
         self.indexed_frames = {}  # type: Dict[RawFrame, int]
         self.indexed_stacks = {}  # type: Dict[RawStackId, int]
@@ -358,12 +430,79 @@ def __init__(
 
         transaction._profile = self
 
+    def update_active_thread_id(self):
+        # type: () -> None
+        self.active_thread_id = get_current_thread_id()
+
+    def _set_initial_sampling_decision(self, sampling_context):
+        # type: (SamplingContext) -> None
+        """
+        Sets the profile's sampling decision according to the following
+        precedence rules:
+
+        1. If the transaction to be profiled is not sampled, that decision
+        will be used, regardless of anything else.
+
+        2. Use `profiles_sample_rate` to decide.
+        """
+
+        # The corresponding transaction was not sampled,
+        # so don't generate a profile for it.
+        if not self._transaction_sampled:
+            self.sampled = False
+            return
+
+        # The profiler hasn't been properly initialized.
+        if self.scheduler is None:
+            self.sampled = False
+            return
+
+        hub = self.hub or sentry_sdk.Hub.current
+        client = hub.client
+
+        # The client is None, so we can't get the sample rate.
+        if client is None:
+            self.sampled = False
+            return
+
+        options = client.options
+        sample_rate = options["_experiments"].get("profiles_sample_rate")
+
+        # The profiles_sample_rate option was not set, so profiling
+        # was never enabled.
+        if sample_rate is None:
+            self.sampled = False
+            return
+
+        # Now we roll the dice. random.random is inclusive of 0, but not of 1,
+        # so strict < is safe here. In case sample_rate is a boolean, cast it
+        # to a float (True becomes 1.0 and False becomes 0.0)
+        self.sampled = random.random() < float(sample_rate)
+
     def get_profile_context(self):
         # type: () -> ProfileContext
         return {"profile_id": self.event_id}
 
-    def __enter__(self):
+    def start(self):
         # type: () -> None
+        if not self.sampled:
+            return
+
+        assert self.scheduler, "No scheduler specified"
+        self.start_ns = nanosecond_time()
+        self.scheduler.start_profiling(self)
+
+    def stop(self):
+        # type: () -> None
+        if not self.sampled:
+            return
+
+        assert self.scheduler, "No scheduler specified"
+        self.scheduler.stop_profiling(self)
+        self.stop_ns = nanosecond_time()
+
+    def __enter__(self):
+        # type: () -> Profile
         hub = self.hub or sentry_sdk.Hub.current
 
         _, scope = hub._stack[-1]
@@ -372,13 +511,13 @@ def __enter__(self):
 
         self._context_manager_state = (hub, scope, old_profile)
 
-        self.start_ns = nanosecond_time()
-        self.scheduler.start_profiling(self)
+        self.start()
+
+        return self
 
     def __exit__(self, ty, value, tb):
         # type: (Optional[Any], Optional[Any], Optional[Any]) -> None
-        self.scheduler.stop_profiling(self)
-        self.stop_ns = nanosecond_time()
+        self.stop()
 
         _, scope, old_profile = self._context_manager_state
         del self._context_manager_state
@@ -477,7 +616,7 @@ def to_json(self, event_opt, options):
             "transactions": [
                 {
                     "id": event_opt["event_id"],
-                    "name": self.transaction.name,
+                    "name": event_opt["transaction"],
                     # we start the transaction before the profile and this is
                     # the transaction start time relative to the profile, so we
                     # hardcode it to 0 until we can start the profile before
@@ -485,9 +624,9 @@
                     # use the duration of the profile instead of the transaction
                     # because we end the transaction after the profile
                     "relative_end_ns": str(self.stop_ns - self.start_ns),
-                    "trace_id": self.transaction.trace_id,
+                    "trace_id": event_opt["contexts"]["trace"]["trace_id"],
                     "active_thread_id": str(
-                        self.transaction._active_thread_id
+                        self._default_active_thread_id
                         if self.active_thread_id is None
                         else self.active_thread_id
                     ),
@@ -725,46 +864,3 @@ def run(self):
             # after sleeping, make sure to take the current
             # timestamp so we can use it next iteration
             last = time.perf_counter()
-
-
-def _should_profile(transaction, hub):
-    # type: (sentry_sdk.tracing.Transaction, sentry_sdk.Hub) -> bool
-
-    # The corresponding transaction was not sampled,
-    # so don't generate a profile for it.
-    if not transaction.sampled:
-        return False
-
-    # The profiler hasn't been properly initialized.
-    if _scheduler is None:
-        return False
-
-    client = hub.client
-
-    # The client is None, so we can't get the sample rate.
-    if client is None:
-        return False
-
-    options = client.options
-    profiles_sample_rate = options["_experiments"].get("profiles_sample_rate")
-
-    # The profiles_sample_rate option was not set, so profiling
-    # was never enabled.
-    if profiles_sample_rate is None:
-        return False
-
-    return random.random() < float(profiles_sample_rate)
-
-
-@contextmanager
-def start_profiling(transaction, hub=None):
-    # type: (sentry_sdk.tracing.Transaction, Optional[sentry_sdk.Hub]) -> Generator[None, None, None]
-    hub = hub or sentry_sdk.Hub.current
-
-    # if profiling was not enabled, this should be a noop
-    if _should_profile(transaction, hub):
-        assert _scheduler is not None
-        with Profile(_scheduler, transaction, hub):
-            yield
-    else:
-        yield
diff --git a/sentry_sdk/tracing.py b/sentry_sdk/tracing.py
index 61c6a7190b..0e3cb97036 100644
--- a/sentry_sdk/tracing.py
+++ b/sentry_sdk/tracing.py
@@ -1,6 +1,5 @@
 import uuid
 import random
-import threading
 import time
 
 from datetime import datetime, timedelta
@@ -567,7 +566,6 @@ class Transaction(Span):
         "_contexts",
         "_profile",
         "_baggage",
-        "_active_thread_id",
     )
 
     def __init__(
@@ -606,11 +604,6 @@ def __init__(
         self._contexts = {}  # type: Dict[str, Any]
         self._profile = None  # type: Optional[sentry_sdk.profiler.Profile]
        self._baggage = baggage
-        # for profiling, we want to know on which thread a transaction is started
-        # to accurately show the active thread in the UI
-        self._active_thread_id = (
-            threading.current_thread().ident
-        )  # used by profiling.py
 
     def __repr__(self):
         # type: () -> str
@@ -628,6 +621,22 @@ def __repr__(self):
             )
         )
 
+    def __enter__(self):
+        # type: () -> Transaction
+        super(Transaction, self).__enter__()
+
+        if self._profile is not None:
+            self._profile.__enter__()
+
+        return self
+
+    def __exit__(self, ty, value, tb):
+        # type: (Optional[Any], Optional[Any], Optional[Any]) -> None
+        if self._profile is not None:
+            self._profile.__exit__(ty, value, tb)
+
+        super(Transaction, self).__exit__(ty, value, tb)
+
     @property
     def containing_transaction(self):
         # type: () -> Transaction
@@ -707,9 +716,10 @@ def finish(self, hub=None, end_timestamp=None):
             "spans": finished_spans,
         }  # type: Event
 
-        if hub.client is not None and self._profile is not None:
+        if self._profile is not None and self._profile.sampled:
             event["profile"] = self._profile
             contexts.update({"profile": self._profile.get_profile_context()})
+            self._profile = None
 
         if has_custom_measurements_enabled():
             event["measurements"] = self._measurements
diff --git a/tests/integrations/django/asgi/test_asgi.py b/tests/integrations/django/asgi/test_asgi.py
index 0652a5fdcb..3e8a79b763 100644
--- a/tests/integrations/django/asgi/test_asgi.py
+++ b/tests/integrations/django/asgi/test_asgi.py
@@ -78,7 +78,9 @@ async def test_async_views(sentry_init, capture_events, application):
 @pytest.mark.skipif(
     django.VERSION < (3, 1), reason="async views have been introduced in Django 3.1"
 )
-async def test_active_thread_id(sentry_init, capture_envelopes, endpoint, application):
+async def test_active_thread_id(
+    sentry_init, capture_envelopes, teardown_profiling, endpoint, application
+):
     sentry_init(
         integrations=[DjangoIntegration()],
         traces_sample_rate=1.0,
diff --git a/tests/integrations/fastapi/test_fastapi.py b/tests/integrations/fastapi/test_fastapi.py
index 9c24ce2e44..7d3aa3ffbd 100644
--- a/tests/integrations/fastapi/test_fastapi.py
+++ b/tests/integrations/fastapi/test_fastapi.py
@@ -155,7 +155,7 @@ def test_legacy_setup(
 
 @pytest.mark.parametrize("endpoint", ["/sync/thread_ids", "/async/thread_ids"])
-def test_active_thread_id(sentry_init, capture_envelopes, endpoint):
+def test_active_thread_id(sentry_init, capture_envelopes, teardown_profiling, endpoint):
     sentry_init(
         traces_sample_rate=1.0,
_experiments={"profiles_sample_rate": 1.0}, diff --git a/tests/integrations/starlette/test_starlette.py b/tests/integrations/starlette/test_starlette.py index a279142995..5e4b071235 100644 --- a/tests/integrations/starlette/test_starlette.py +++ b/tests/integrations/starlette/test_starlette.py @@ -846,7 +846,7 @@ def test_legacy_setup( @pytest.mark.parametrize("endpoint", ["/sync/thread_ids", "/async/thread_ids"]) -def test_active_thread_id(sentry_init, capture_envelopes, endpoint): +def test_active_thread_id(sentry_init, capture_envelopes, teardown_profiling, endpoint): sentry_init( traces_sample_rate=1.0, _experiments={"profiles_sample_rate": 1.0}, diff --git a/tests/test_profiler.py b/tests/test_profiler.py index f0613c9c65..52f3d6d7c8 100644 --- a/tests/test_profiler.py +++ b/tests/test_profiler.py @@ -1,20 +1,25 @@ import inspect +import mock import os import sys import threading import pytest +from collections import Counter +from sentry_sdk import start_transaction from sentry_sdk.profiler import ( GeventScheduler, Profile, ThreadScheduler, extract_frame, extract_stack, + get_current_thread_id, get_frame_name, setup_profiler, ) from sentry_sdk.tracing import Transaction +from sentry_sdk._queue import Queue try: import gevent @@ -64,6 +69,40 @@ def test_profiler_valid_mode(mode, teardown_profiling): setup_profiler({"_experiments": {"profiler_mode": mode}}) +@pytest.mark.parametrize( + ("profiles_sample_rate", "profile_count"), + [ + pytest.param(1.0, 1, id="100%"), + pytest.param(0.0, 0, id="0%"), + pytest.param(None, 0, id="disabled"), + ], +) +def test_profiled_transaction( + sentry_init, + capture_envelopes, + teardown_profiling, + profiles_sample_rate, + profile_count, +): + sentry_init( + traces_sample_rate=1.0, + _experiments={"profiles_sample_rate": profiles_sample_rate}, + ) + + envelopes = capture_envelopes() + + with start_transaction(name="profiling"): + pass + + count_item_types = Counter() + for envelope in envelopes: + for item in envelope.items: + count_item_types[item.type] += 1 + + assert count_item_types["transaction"] == 1 + assert count_item_types["profile"] == profile_count + + def get_frame(depth=1): """ This function is not exactly true to its name. 
Depending on @@ -282,6 +321,70 @@ def test_extract_stack_with_cache(): assert frame1 is frame2, i +def test_get_current_thread_id_explicit_thread(): + results = Queue(maxsize=1) + + def target1(): + pass + + def target2(): + results.put(get_current_thread_id(thread1)) + + thread1 = threading.Thread(target=target1) + thread1.start() + + thread2 = threading.Thread(target=target2) + thread2.start() + + thread2.join() + thread1.join() + + assert thread1.ident == results.get(timeout=1) + + +@requires_gevent +def test_get_current_thread_id_gevent_in_thread(): + results = Queue(maxsize=1) + + def target(): + job = gevent.spawn(get_current_thread_id) + job.join() + results.put(job.value) + + thread = threading.Thread(target=target) + thread.start() + thread.join() + assert thread.ident == results.get(timeout=1) + + +def test_get_current_thread_id_running_thread(): + results = Queue(maxsize=1) + + def target(): + results.put(get_current_thread_id()) + + thread = threading.Thread(target=target) + thread.start() + thread.join() + assert thread.ident == results.get(timeout=1) + + +def test_get_current_thread_id_main_thread(): + results = Queue(maxsize=1) + + def target(): + # mock that somehow the current thread doesn't exist + with mock.patch("threading.current_thread", side_effect=[None]): + results.put(get_current_thread_id()) + + thread_id = threading.main_thread().ident if sys.version_info >= (3, 4) else None + + thread = threading.Thread(target=target) + thread.start() + thread.join() + assert thread_id == results.get(timeout=1) + + def get_scheduler_threads(scheduler): return [thread for thread in threading.enumerate() if thread.name == scheduler.name] @@ -635,7 +738,7 @@ def test_profile_processing( ): with scheduler_class(frequency=1000) as scheduler: transaction = Transaction() - profile = Profile(scheduler, transaction) + profile = Profile(transaction, scheduler=scheduler) profile.start_ns = start_ns for ts, sample in samples: profile.write(ts, process_test_sample(sample))
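
For reference, a minimal usage sketch of the profile lifecycle after this change, assuming only the API visible in the diff above: sentry_sdk.init with the experimental profiles_sample_rate option and start_transaction. The Profile is now created and sampled inside Hub.start_transaction, entered and exited through Transaction.__enter__/__exit__, and attached to the event in Transaction.finish, so the removed start_profiling context manager is no longer needed. The DSN and the workload function below are placeholders, not part of the patch.

import sentry_sdk

sentry_sdk.init(
    dsn="https://examplePublicKey@o0.ingest.sentry.io/0",  # placeholder DSN
    traces_sample_rate=1.0,
    # experimental option read by Profile._set_initial_sampling_decision
    _experiments={"profiles_sample_rate": 1.0},
)


def handle_request():
    # hypothetical workload; whatever runs inside the transaction gets sampled
    pass


# Entering the transaction also enters its Profile (Transaction.__enter__);
# Transaction.finish() attaches the profile to the event when it was sampled.
with sentry_sdk.start_transaction(name="index"):
    handle_request()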