Skip to content

Commit

Permalink
Merge pull request #345 from jrobichaud/queue-info
Browse files Browse the repository at this point in the history
  • Loading branch information
jrobichaud authored Oct 19, 2023
2 parents 4b82f27 + d87db34 commit 5dba808
Show file tree
Hide file tree
Showing 11 changed files with 411 additions and 245 deletions.
10 changes: 6 additions & 4 deletions django_structlog/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ class DjangoStructLogConfig(AppConfig):

def ready(self):
if app_settings.CELERY_ENABLED:
from .celery.receivers import connect_celery_signals
from .celery.receivers import CeleryReceiver

connect_celery_signals()
self._celery_receiver = CeleryReceiver()
self._celery_receiver.connect_signals()

if app_settings.COMMAND_LOGGING_ENABLED:
from .commands import init_command_signals
from .commands import DjangoCommandReceiver

init_command_signals()
self._django_command_receiver = DjangoCommandReceiver()
self._django_command_receiver.connect_signals()
230 changes: 139 additions & 91 deletions django_structlog/celery/receivers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,107 +6,155 @@
logger = structlog.getLogger(__name__)


def receiver_before_task_publish(sender=None, headers=None, body=None, **kwargs):
import celery

if celery.current_app.conf.task_protocol < 2:
return

context = structlog.contextvars.get_merged_contextvars(logger)
if "task_id" in context:
context["parent_task_id"] = context.pop("task_id")

signals.modify_context_before_task_publish.send(
sender=receiver_before_task_publish, context=context
)

headers["__django_structlog__"] = context


def receiver_after_task_publish(sender=None, headers=None, body=None, **kwargs):
logger.info(
"task_enqueued",
child_task_id=headers.get("id") if headers else body.get("id"),
child_task_name=headers.get("task") if headers else body.get("task"),
)


def receiver_task_pre_run(task_id, task, *args, **kwargs):
structlog.contextvars.clear_contextvars()
structlog.contextvars.bind_contextvars(task_id=task_id)
metadata = getattr(task.request, "__django_structlog__", {})
structlog.contextvars.bind_contextvars(**metadata)
signals.bind_extra_task_metadata.send(
sender=receiver_task_pre_run, task=task, logger=logger
)
logger.info("task_started", task=task.name)


def receiver_task_retry(request=None, reason=None, einfo=None, **kwargs):
logger.warning("task_retrying", reason=reason)

class CeleryReceiver:
def __init__(self):
self._priority = None

def receiver_before_task_publish(
self,
sender=None,
headers=None,
body=None,
properties=None,
routing_key=None,
**kwargs,
):
import celery

if celery.current_app.conf.task_protocol < 2:
return

context = structlog.contextvars.get_merged_contextvars(logger)
if "task_id" in context:
context["parent_task_id"] = context.pop("task_id")

signals.modify_context_before_task_publish.send(
sender=self.receiver_before_task_publish,
context=context,
task_routing_key=routing_key,
task_properties=properties,
)
if properties:
self._priority = properties.get("priority", None)

def receiver_task_success(result=None, **kwargs):
signals.pre_task_succeeded.send(
sender=receiver_task_success, logger=logger, result=result
)
logger.info("task_succeeded")
headers["__django_structlog__"] = context

def receiver_after_task_publish(
self, sender=None, headers=None, body=None, routing_key=None, **kwargs
):
properties = {}
if self._priority is not None:
properties["priority"] = self._priority
self._priority = None

def receiver_task_failure(
task_id=None,
exception=None,
traceback=None,
einfo=None,
sender=None,
*args,
**kwargs,
):
throws = getattr(sender, "throws", ())
if isinstance(exception, throws):
logger.info(
"task_failed",
error=str(exception),
"task_enqueued",
child_task_id=headers.get("id") if headers else body.get("id"),
child_task_name=headers.get("task") if headers else body.get("task"),
routing_key=routing_key,
**properties,
)
else:
logger.exception(
"task_failed",
error=str(exception),
exception=exception,
)


def receiver_task_revoked(
request=None, terminated=None, signum=None, expired=None, **kwargs
):
metadata = getattr(request, "__django_structlog__", {}).copy()
metadata["task_id"] = request.id
metadata["task"] = request.task

logger.warning(
"task_revoked",
terminated=terminated,
signum=signum.value if signum is not None else None,
signame=signum.name if signum is not None else None,
expired=expired,
**metadata,
)
def receiver_task_prerun(self, task_id, task, *args, **kwargs):
structlog.contextvars.clear_contextvars()
structlog.contextvars.bind_contextvars(task_id=task_id)
metadata = getattr(task.request, "__django_structlog__", {})
structlog.contextvars.bind_contextvars(**metadata)
signals.bind_extra_task_metadata.send(
sender=self.receiver_task_prerun, task=task, logger=logger
)
logger.info("task_started", task=task.name)

def receiver_task_retry(self, request=None, reason=None, einfo=None, **kwargs):
logger.warning("task_retrying", reason=reason)

def receiver_task_unknown(message=None, exc=None, name=None, id=None, **kwargs):
logger.error(
"task_not_found",
task=name,
task_id=id,
)
def receiver_task_success(self, result=None, **kwargs):
signals.pre_task_succeeded.send(
sender=self.receiver_task_success, logger=logger, result=result
)
logger.info("task_succeeded")

def receiver_task_failure(
self,
task_id=None,
exception=None,
traceback=None,
einfo=None,
sender=None,
*args,
**kwargs,
):
throws = getattr(sender, "throws", ())
if isinstance(exception, throws):
logger.info(
"task_failed",
error=str(exception),
)
else:
logger.exception(
"task_failed",
error=str(exception),
exception=exception,
)

def receiver_task_revoked(
self, request=None, terminated=None, signum=None, expired=None, **kwargs
):
metadata = getattr(request, "__django_structlog__", {}).copy()
metadata["task_id"] = request.id
metadata["task"] = request.task

logger.warning(
"task_revoked",
terminated=terminated,
signum=signum.value if signum is not None else None,
signame=signum.name if signum is not None else None,
expired=expired,
**metadata,
)

def receiver_task_unknown(
self, message=None, exc=None, name=None, id=None, **kwargs
):
logger.error(
"task_not_found",
task=name,
task_id=id,
)

def receiver_task_rejected(message=None, exc=None, **kwargs):
logger.exception("task_rejected", task_id=message.properties.get("correlation_id"))
def receiver_task_rejected(self, message=None, exc=None, **kwargs):
logger.exception(
"task_rejected", task_id=message.properties.get("correlation_id")
)

def connect_signals(self):
from celery.signals import (
before_task_publish,
after_task_publish,
)

def connect_celery_signals():
from celery.signals import before_task_publish, after_task_publish
before_task_publish.connect(self.receiver_before_task_publish)
after_task_publish.connect(self.receiver_after_task_publish)

def connect_worker_signals(self):
from celery.signals import (
before_task_publish,
after_task_publish,
task_prerun,
task_retry,
task_success,
task_failure,
task_revoked,
task_unknown,
task_rejected,
)

before_task_publish.connect(receiver_before_task_publish)
after_task_publish.connect(receiver_after_task_publish)
before_task_publish.connect(self.receiver_before_task_publish)
after_task_publish.connect(self.receiver_after_task_publish)
task_prerun.connect(self.receiver_task_prerun)
task_retry.connect(self.receiver_task_retry)
task_success.connect(self.receiver_task_success)
task_failure.connect(self.receiver_task_failure)
task_revoked.connect(self.receiver_task_revoked)
task_unknown.connect(self.receiver_task_unknown)
task_rejected.connect(self.receiver_task_rejected)
4 changes: 3 additions & 1 deletion django_structlog/celery/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@
""" Signal to modify context passed over to ``celery`` task's context. You must modify the ``context`` dict.
:param context: the context dict that will be passed over to the task runner's logger
:param task_routing_key: routing key of the task
:param task_properties: task's message properties
>>> from django.dispatch import receiver
>>> from django_structlog.celery import signals
>>>
>>> @receiver(signals.modify_context_before_task_publish)
... def receiver_modify_context_before_task_publish(sender, signal, context, **kwargs):
... def receiver_modify_context_before_task_publish(sender, signal, context, task_routing_key=None, task_properties=None, **kwargs):
... keys_to_keep = {"request_id", "parent_task_id"}
... new_dict = {
... key_to_keep: context[key_to_keep]
Expand Down
27 changes: 3 additions & 24 deletions django_structlog/celery/steps.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from celery import bootsteps

from . import receivers
from .receivers import CeleryReceiver


class DjangoStructLogInitStep(bootsteps.Step):
Expand All @@ -16,26 +16,5 @@ class DjangoStructLogInitStep(bootsteps.Step):

def __init__(self, parent, **kwargs):
super().__init__(parent, **kwargs)
import celery
from celery.signals import (
before_task_publish,
after_task_publish,
task_prerun,
task_retry,
task_success,
task_failure,
task_revoked,
)

before_task_publish.connect(receivers.receiver_before_task_publish)
after_task_publish.connect(receivers.receiver_after_task_publish)
task_prerun.connect(receivers.receiver_task_pre_run)
task_retry.connect(receivers.receiver_task_retry)
task_success.connect(receivers.receiver_task_success)
task_failure.connect(receivers.receiver_task_failure)
task_revoked.connect(receivers.receiver_task_revoked)
if celery.VERSION > (4,):
from celery.signals import task_unknown, task_rejected

task_unknown.connect(receivers.receiver_task_unknown)
task_rejected.connect(receivers.receiver_task_rejected)
self.receiver = CeleryReceiver()
self.receiver.connect_worker_signals()
59 changes: 30 additions & 29 deletions django_structlog/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,40 @@
import uuid

logger = structlog.getLogger(__name__)
stack = []


def pre_receiver(sender, *args, **kwargs):
command_id = str(uuid.uuid4())
if len(stack):
parent_command_id, _ = stack[-1]
tokens = structlog.contextvars.bind_contextvars(
parent_command_id=parent_command_id, command_id=command_id
class DjangoCommandReceiver:
def __init__(self):
self.stack = []

def pre_receiver(self, sender, *args, **kwargs):
command_id = str(uuid.uuid4())
if len(self.stack):
parent_command_id, _ = self.stack[-1]
tokens = structlog.contextvars.bind_contextvars(
parent_command_id=parent_command_id, command_id=command_id
)
else:
tokens = structlog.contextvars.bind_contextvars(command_id=command_id)
self.stack.append((command_id, tokens))

logger.info(
"command_started",
command_name=sender.__module__.replace(".management.commands", ""),
)
else:
tokens = structlog.contextvars.bind_contextvars(command_id=command_id)
stack.append((command_id, tokens))

logger.info(
"command_started",
command_name=sender.__module__.replace(".management.commands", ""),
)
def post_receiver(self, sender, outcome, *args, **kwargs):
logger.info("command_finished")

if len(self.stack): # pragma: no branch
command_id, tokens = self.stack.pop()
structlog.contextvars.reset_contextvars(**tokens)

def post_receiver(sender, outcome, *args, **kwargs):
logger.info("command_finished")
def connect_signals(self):
try:
from django_extensions.management.signals import pre_command, post_command
except ModuleNotFoundError: # pragma: no cover
return

if len(stack):
command_id, tokens = stack.pop()
structlog.contextvars.reset_contextvars(**tokens)


def init_command_signals():
try:
from django_extensions.management.signals import pre_command, post_command
except ModuleNotFoundError: # pragma: no cover
return

pre_command.connect(pre_receiver)
post_command.connect(post_receiver)
pre_command.connect(self.pre_receiver)
post_command.connect(self.post_receiver)
2 changes: 1 addition & 1 deletion django_structlog_demo_project/home/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

def enqueue_successful_task(request):
logger.info("Enqueuing successful task")
successful_task.delay(foo="bar")
successful_task.apply_async(foo="bar", priority=5)
return HttpResponse(status=201)


Expand Down
Loading

0 comments on commit 5dba808

Please sign in to comment.