Skip to content

Commit

Permalink
refactor utils
Browse files Browse the repository at this point in the history
  • Loading branch information
majorgreys committed Jun 16, 2020
1 parent 8d293fd commit 074299f
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 66 deletions.
1 change: 1 addition & 0 deletions ext/opentelemetry-ext-celery/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ package_dir=
packages=find_namespace:
install_requires =
opentelemetry-api == 0.10.dev0
opentelemetry-instrumentation == 0.10.dev0
celery ~= 4.0

[options.extras_require]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,11 @@ def add(x, y):
import logging
import signal

from celery import registry, signals # pylint: disable=no-name-in-module
from celery import signals # pylint: disable=no-name-in-module

from opentelemetry import trace
from opentelemetry.auto_instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.ext.celery.utils import (
attach_span,
detach_span,
retrieve_span,
retrieve_task_id,
set_attributes_from_context,
)
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.ext.celery import utils
from opentelemetry.ext.celery.version import __version__
from opentelemetry.trace.status import Status, StatusCanonicalCode

Expand Down Expand Up @@ -110,58 +104,53 @@ def _uninstrument(self, **kwargs):
signals.task_retry.disconnect(self._trace_retry)

def _trace_prerun(self, *args, **kwargs):
    """Open a CONSUMER span for a task that is about to run.

    The task and its identifier are read from the signal keyword
    arguments through the ``utils`` helpers. If either one is missing,
    nothing is traced.
    """
    task = utils.signal_retrieve_task(kwargs)
    task_id = utils.signal_retrieve_task_id(kwargs)

    if task is None or task_id is None:
        return

    logger.debug("prerun signal start task_id=%s", task_id)

    tracer = self._tracer
    consumer_span = tracer.start_span(
        task.name, kind=trace.SpanKind.CONSUMER
    )

    # The context manager is entered by hand and stored on the task so a
    # later signal handler can retrieve it and call ``__exit__``.
    scope = tracer.use_span(consumer_span, end_on_exit=True)
    scope.__enter__()
    utils.attach_span(task, task_id, (consumer_span, scope))

@staticmethod
def _trace_postrun(*args, **kwargs):
    """Tag and close the span attached to a task once it has run.

    The task and its identifier are read from the signal keyword
    arguments. If either is missing the handler returns silently; if no
    span is stored for the pair, a warning is logged instead.
    """
    task = utils.signal_retrieve_task(kwargs)
    task_id = utils.signal_retrieve_task_id(kwargs)

    if task is None or task_id is None:
        return

    logger.debug("postrun signal task_id=%s", task_id)

    # Look up the (span, activation) pair stored for this task.
    run_span, scope = utils.retrieve_span(task, task_id)
    if run_span is None:
        logger.warning("no existing span found for task_id=%s", task_id)
        return

    # Request context tags: first from the signal arguments, then from
    # the task's own request.
    run_span.set_attribute(_TASK_TAG_KEY, _TASK_RUN)
    for context in (kwargs, task.request):
        utils.set_attributes_from_context(run_span, context)
    run_span.set_attribute(_TASK_NAME_KEY, task.name)

    # Leave the manually entered context manager, then drop the stored pair.
    scope.__exit__(None, None, None)
    utils.detach_span(task, task_id)

def _trace_before_publish(self, *args, **kwargs):
# The `Task` instance **does not** include any information about the current
# execution, so it **must not** be used to retrieve `request` data.
# pylint: disable=no-member
task = registry.tasks.get(kwargs.get("sender"))
task_id = retrieve_task_id(kwargs)
task = utils.signal_retrieve_task_from_sender(kwargs)
task_id = utils.signal_retrieve_task_id_from_message(kwargs)

if task is None or task_id is None:
logger.debug(
"Unable to extract the Task and the task_id. This version of Celery may not be supported."
)
return

span = self._tracer.start_span(task.name, kind=trace.SpanKind.PRODUCER)
Expand All @@ -170,44 +159,39 @@ def _trace_before_publish(self, *args, **kwargs):
span.set_attribute(_TASK_TAG_KEY, _TASK_APPLY_ASYNC)
span.set_attribute(_MESSAGE_ID_ATTRIBUTE_NAME, task_id)
span.set_attribute(_TASK_NAME_KEY, task.name)
set_attributes_from_context(span, kwargs)
utils.set_attributes_from_context(span, kwargs)

activation = self._tracer.use_span(span, end_on_exit=True)
activation.__enter__()
attach_span(task, task_id, (span, activation), is_publish=True)
utils.attach_span(task, task_id, (span, activation), is_publish=True)

@staticmethod
def _trace_after_publish(*args, **kwargs):
    """Close the publish-side span once the task message has been sent.

    The task is resolved from the signal ``sender`` and the identifier
    from the message. If either is missing the handler returns silently;
    if no publish activation is stored, a warning is logged.
    """
    task = utils.signal_retrieve_task_from_sender(kwargs)
    task_id = utils.signal_retrieve_task_id_from_message(kwargs)

    if task is None or task_id is None:
        return

    # Only the activation is needed here; the span half of the pair is unused.
    scope = utils.retrieve_span(task, task_id, is_publish=True)[1]
    if scope is None:
        logger.warning("no existing span found for task_id=%s", task_id)
        return

    # Leave the manually entered context manager, then drop the stored pair.
    scope.__exit__(None, None, None)
    utils.detach_span(task, task_id, is_publish=True)

@staticmethod
def _trace_failure(*args, **kwargs):
task = kwargs.get("sender")
task_id = kwargs.get("task_id")
task = utils.signal_retrieve_task_from_sender(kwargs)
task_id = utils.signal_retrieve_task_id(kwargs)

if task is None or task_id is None:
logger.debug(
"Unable to extract the Task and the task_id. This version of Celery may not be supported."
)
return

# retrieve and pass exception info to activation
span, _ = retrieve_span(task, task_id)
span, _ = utils.retrieve_span(task, task_id)
if span is None:
return

Expand All @@ -226,22 +210,14 @@ def _trace_failure(*args, **kwargs):

@staticmethod
def _trace_retry(*args, **kwargs):
task = kwargs.get("sender")
context = kwargs.get("request")
if task is None or context is None:
logger.debug(
"Unable to extract the Task or the Context. This version of Celery may not be supported."
)
return
task = utils.signal_retrieve_task_from_sender(kwargs)
task_id = utils.signal_retrieve_task_id_from_request(kwargs)
reason = utils.signal_retrieve_reason(kwargs)

reason = kwargs.get("reason")
if not reason:
logger.debug(
"Unable to extract the retry reason. This version of Celery may not be supported."
)
if task is None or task_id is None or reason is None:
return

span, _ = retrieve_span(task, context.id)
span, _ = utils.retrieve_span(task, task_id)
if span is None:
return

Expand Down
67 changes: 63 additions & 4 deletions ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

import celery

logger = logging.getLogger(__name__)

# Celery Context key
CTX_KEY = "__otel_task_span"

Expand Down Expand Up @@ -148,16 +154,69 @@ def retrieve_span(task, task_id, is_publish=False):
return span_dict.get((task_id, is_publish), (None, None))


def retrieve_task_id(context):
def signal_retrieve_task(kwargs):
    """Return the ``task`` entry of the signal arguments.

    A debug message is logged and ``None`` is returned when the entry is
    absent or is ``None``.
    """
    if kwargs.get("task") is None:
        logger.debug("Unable to retrieve task from signal arguments")
        return None
    return kwargs["task"]


def signal_retrieve_task_from_sender(kwargs):
    """Resolve the task object from the ``sender`` signal argument.

    The sender is the task name for the before/after publish signals and
    the task object itself for the retry and failure signals. A name is
    looked up in the Celery task registry.

    Returns the task object, or ``None`` (after a debug log) when there
    is no sender or the name is not registered.
    """
    sender = kwargs.get("sender")
    if sender is None:
        logger.debug("Unable to retrieve the sender from signal arguments")
        return None

    # before and after publish signals sender is the task name
    # for retry and failure signals sender is the task object
    if isinstance(sender, str):
        # Keep the name in its own variable so the failure log reports
        # which task was missing rather than the ``None`` lookup result.
        task_name = sender
        sender = celery.registry.tasks.get(task_name)
        if sender is None:
            logger.debug(
                "Unable to retrieve the task from sender=%s", task_name
            )
            return None

    return sender


def signal_retrieve_task_id(kwargs):
    """Return the ``task_id`` entry of the signal arguments.

    A debug message is logged and ``None`` is returned when the entry is
    absent or is ``None``.
    """
    identifier = kwargs.get("task_id")
    if identifier is not None:
        return identifier

    logger.debug("Unable to retrieve task_id from signal arguments")
    return None


def signal_retrieve_task_id_from_request(kwargs):
    """Return the task identifier carried by the ``request`` signal argument.

    The retry signal does not include ``task_id`` as an argument, so the
    identifier is read from the ``id`` attribute of the request instead.

    Returns ``None`` (after a debug log) when the request is missing or
    has no usable ``id``; it never raises for a missing request.
    """
    request = kwargs.get("request")
    if request is None:
        logger.debug("Unable to retrieve the request from signal arguments")
        # Stop here: reading ``id`` from ``None`` would raise AttributeError.
        return None

    # Default to None so a request without an ``id`` attribute is reported
    # through the log below instead of raising.
    task_id = getattr(request, "id", None)
    if task_id is None:
        logger.debug("Unable to retrieve the task_id from the request")

    return task_id


def signal_retrieve_task_id_from_message(kwargs):
    """Helper to retrieve the `Task` identifier from the message `body`.
    This helper supports Protocol Version 1 and 2. The Protocol is well
    detailed in the official documentation:
    http://docs.celeryproject.org/en/latest/internals/protocol.html

    Non-empty ``headers`` are treated as Protocol Version 2 and the
    identifier is read from them; otherwise it is read from ``body``.
    Returns ``None`` (after a debug log) when neither is available.
    """
    headers = kwargs.get("headers")
    if headers:
        # Protocol Version 2 (default from Celery 4.0)
        return headers.get("id")

    body = kwargs.get("body")
    if body is None:
        # Without this guard a message with neither headers nor body
        # would raise AttributeError on ``None.get``.
        logger.debug("Unable to retrieve the task_id from the message")
        return None

    # Protocol Version 1
    return body.get("id")




def signal_retrieve_reason(kwargs):
    """Return the ``reason`` entry of the signal arguments.

    A debug message is logged when the entry is absent or falsy. The
    value found (possibly ``None`` or empty) is returned unchanged either
    way.
    """
    retry_reason = kwargs.get("reason")
    if retry_reason:
        return retry_reason

    logger.debug("Unable to retrieve the retry reason")
    return retry_reason
3 changes: 1 addition & 2 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ envlist =
pypy3-test-ext-redis

; opentelemetry-ext-celery
py3{4,5,6,7,8}-test-ext-celery
py3{5,6,7,8}-test-ext-celery
pypy3-test-ext-celery

; opentelemetry-ext-system-metrics
Expand Down Expand Up @@ -232,7 +232,6 @@ commands_pre =

getting-started: pip install -e {toxinidir}/opentelemetry-instrumentation -e {toxinidir}/ext/opentelemetry-ext-requests -e {toxinidir}/ext/opentelemetry-ext-wsgi -e {toxinidir}/ext/opentelemetry-ext-flask

celery: pip install {toxinidir}/opentelemetry-instrument
celery: pip install {toxinidir}/ext/opentelemetry-ext-celery[test]

grpc: pip install {toxinidir}/ext/opentelemetry-ext-grpc[test]
Expand Down

0 comments on commit 074299f

Please sign in to comment.