Skip to content

Commit

Permalink
opentelemetry-instrumentation-celery: don't detach a None token (open…
Browse files Browse the repository at this point in the history
  • Loading branch information
xrmx authored Oct 28, 2024
1 parent 8582da5 commit 7cbe586
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 2 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#2901])(https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2901)
- `opentelemetry-instrumentation-system-metrics` Update metric units to conform to UCUM conventions.
([#2922](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2922))
- `opentelemetry-instrumentation-celery` Don't detach context without a None token
([#2927](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2927))

### Breaking changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,10 @@ def _trace_postrun(self, *args, **kwargs):
self.update_task_duration_time(task_id)
labels = {"task": task.name, "worker": task.request.hostname}
self._record_histograms(task_id, labels)
context_api.detach(token)
# if the process sending the task is not instrumented
# there's no incoming context and no token to detach
if token is not None:
context_api.detach(token)

def _trace_before_publish(self, *args, **kwargs):
task = utils.retrieve_task_from_sender(kwargs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
import threading
import time

from wrapt import wrap_function_wrapper

from opentelemetry import baggage, context
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.instrumentation.celery import CeleryInstrumentor, utils
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.test.test_base import TestBase
from opentelemetry.trace import SpanKind, StatusCode
Expand Down Expand Up @@ -185,6 +188,40 @@ def test_baggage(self):

self.assertEqual(task.result, {"key": "value"})

def test_task_not_instrumented_does_not_raise(self):
def _retrieve_context_wrapper_none_token(
wrapped, instance, args, kwargs
):
ctx = wrapped(*args, **kwargs)
if ctx is None:
return ctx
span, activation, _ = ctx
return span, activation, None

wrap_function_wrapper(
utils,
"retrieve_context",
_retrieve_context_wrapper_none_token,
)

CeleryInstrumentor().instrument()

result = task_add.delay(1, 2)

timeout = time.time() + 60 * 1 # 1 minutes from now
while not result.ready():
if time.time() > timeout:
break
time.sleep(0.05)

spans = self.sorted_spans(self.memory_exporter.get_finished_spans())
self.assertEqual(len(spans), 2)

# TODO: assert we don't have "TypeError: expected an instance of Token, got None" in logs
self.assertTrue(result)

unwrap(utils, "retrieve_context")


class TestCelerySignatureTask(TestBase):
def setUp(self):
Expand Down

0 comments on commit 7cbe586

Please sign in to comment.