Reapply "Refactor the Celery Beat integration (#3105)" (#3144) (#3175)
This reverts the revert that was done to mitigate the regression where Crons stopped sending ok/error check-ins. It reapplies the refactoring, fixes the root cause of the regression, and adds integration tests to make sure it does not happen again.
antonpirker authored Jun 18, 2024
1 parent c8fc781 commit 009fa4f
Showing 10 changed files with 463 additions and 145 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/test-integrations-data-processing.yml
@@ -36,6 +36,8 @@ jobs:
- uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Start Redis
uses: supercharge/[email protected]
- name: Setup Test Env
run: |
pip install coverage tox
@@ -108,6 +110,8 @@ jobs:
- uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Start Redis
uses: supercharge/[email protected]
- name: Setup Test Env
run: |
pip install coverage tox
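The new step starts a Redis service for the Celery test group. A minimal sketch of pointing a Celery app at it, assuming Redis listens on the default localhost:6379 (the action's default, not stated in this diff):

```python
# Minimal sketch: point a Celery app at the Redis service the
# workflow step starts. The broker URL is an assumption; the
# diff itself only starts the service.
from celery import Celery

app = Celery("tasks", broker="redis://localhost:6379/0")

@app.task
def ping():
    return "pong"
```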
5 changes: 5 additions & 0 deletions scripts/split-tox-gh-actions/split-tox-gh-actions.py
@@ -35,6 +35,10 @@
"asyncpg",
}

FRAMEWORKS_NEEDING_REDIS = {
"celery",
}

FRAMEWORKS_NEEDING_CLICKHOUSE = {
"clickhouse_driver",
}
@@ -275,6 +279,7 @@ def render_template(group, frameworks, py_versions_pinned, py_versions_latest):
"needs_aws_credentials": bool(set(frameworks) & FRAMEWORKS_NEEDING_AWS),
"needs_clickhouse": bool(set(frameworks) & FRAMEWORKS_NEEDING_CLICKHOUSE),
"needs_postgres": bool(set(frameworks) & FRAMEWORKS_NEEDING_POSTGRES),
"needs_redis": bool(set(frameworks) & FRAMEWORKS_NEEDING_REDIS),
"needs_github_secrets": bool(
set(frameworks) & FRAMEWORKS_NEEDING_GITHUB_SECRETS
),
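The new flag mirrors the existing needs_postgres / needs_clickhouse checks: a set intersection between the group's frameworks and the frameworks that need the service. A standalone sketch of that check:

```python
# Standalone sketch of the "needs_*" flag computation: true if any
# framework in the test group appears in the Redis-needing set.
FRAMEWORKS_NEEDING_REDIS = {"celery"}

def needs_redis(frameworks):
    # type: (list) -> bool
    return bool(set(frameworks) & FRAMEWORKS_NEEDING_REDIS)

print(needs_redis(["celery", "beam"]))  # True
print(needs_redis(["spark"]))           # False
```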
5 changes: 5 additions & 0 deletions scripts/split-tox-gh-actions/templates/test_group.jinja
@@ -53,6 +53,11 @@
- uses: getsentry/action-clickhouse-in-ci@v1
{% endif %}

{% if needs_redis %}
- name: Start Redis
uses: supercharge/[email protected]
{% endif %}

- name: Setup Test Env
run: |
pip install coverage tox
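A minimal sketch of how this conditional renders, using jinja2 directly (an assumption; the actual render pipeline lives in split-tox-gh-actions.py and is not shown in this diff):

```python
# Minimal sketch: render the conditional Redis step with jinja2.
from jinja2 import Template

template = Template("{% if needs_redis %}- name: Start Redis{% endif %}")
print(template.render(needs_redis=True))   # "- name: Start Redis"
print(template.render(needs_redis=False))  # "" (step omitted)
```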
17 changes: 8 additions & 9 deletions sentry_sdk/integrations/celery/__init__.py
@@ -70,10 +70,9 @@ def __init__(
self.monitor_beat_tasks = monitor_beat_tasks
self.exclude_beat_tasks = exclude_beat_tasks

if monitor_beat_tasks:
_patch_beat_apply_entry()
_patch_redbeat_maybe_due()
_setup_celery_beat_signals()
_patch_beat_apply_entry()
_patch_redbeat_maybe_due()
_setup_celery_beat_signals(monitor_beat_tasks)

@staticmethod
def setup_once():
@@ -167,11 +166,11 @@ def _update_celery_task_headers(original_headers, span, monitor_beat_tasks):
"""
updated_headers = original_headers.copy()
with capture_internal_exceptions():
headers = {}
if span is not None:
headers = dict(
Scope.get_current_scope().iter_trace_propagation_headers(span=span)
)
# if span is None (when the task was started by Celery Beat)
# this will return the trace headers from the scope.
headers = dict(
Scope.get_isolation_scope().iter_trace_propagation_headers(span=span)
)

if monitor_beat_tasks:
headers.update(
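This hunk is the root-cause fix: trace headers are now always collected, falling back to the isolation scope when there is no span (the Celery Beat case), instead of being skipped entirely. A simplified, standalone sketch of the merge order (the real function also handles baggage details not shown in this diff):

```python
# Simplified sketch of the merge: trace headers are collected even
# when span is None, then Crons monitor headers are layered on top.
def update_task_headers(original_headers, trace_headers, monitor_headers,
                        monitor_beat_tasks):
    # type: (dict, dict, dict, bool) -> dict
    updated_headers = original_headers.copy()
    updated_headers.update(trace_headers)  # always present now, span or not
    if monitor_beat_tasks:
        updated_headers.update(monitor_headers)  # sentry-monitor-* headers
    return updated_headers
```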
168 changes: 72 additions & 96 deletions sentry_sdk/integrations/celery/beat.py
@@ -113,133 +113,109 @@ def _get_monitor_config(celery_schedule, app, monitor_name):
return monitor_config


def _patch_beat_apply_entry():
# type: () -> None
def _apply_crons_data_to_schedule_entry(scheduler, schedule_entry, integration):
# type: (Any, Any, sentry_sdk.integrations.celery.CeleryIntegration) -> None
"""
Makes sure that the Sentry Crons information is set in the Celery Beat task's
headers so that it is monitored with Sentry Crons.
This is only called by Celery Beat. After apply_entry is called
Celery will call apply_async to put the task in the queue.
Add Sentry Crons information to the schedule_entry headers.
"""
from sentry_sdk.integrations.celery import CeleryIntegration

original_apply_entry = Scheduler.apply_entry

def sentry_apply_entry(*args, **kwargs):
# type: (*Any, **Any) -> None
scheduler, schedule_entry = args
app = scheduler.app

celery_schedule = schedule_entry.schedule
monitor_name = schedule_entry.name

integration = sentry_sdk.get_client().get_integration(CeleryIntegration)
if integration is None:
return original_apply_entry(*args, **kwargs)

if match_regex_list(monitor_name, integration.exclude_beat_tasks):
return original_apply_entry(*args, **kwargs)
if not integration.monitor_beat_tasks:
return

# Tasks started by Celery Beat start a new Trace
scope = Scope.get_isolation_scope()
scope.set_new_propagation_context()
scope._name = "celery-beat"
monitor_name = schedule_entry.name

monitor_config = _get_monitor_config(celery_schedule, app, monitor_name)
task_should_be_excluded = match_regex_list(
monitor_name, integration.exclude_beat_tasks
)
if task_should_be_excluded:
return

is_supported_schedule = bool(monitor_config)
if is_supported_schedule:
headers = schedule_entry.options.pop("headers", {})
headers.update(
{
"sentry-monitor-slug": monitor_name,
"sentry-monitor-config": monitor_config,
}
)
celery_schedule = schedule_entry.schedule
app = scheduler.app

check_in_id = capture_checkin(
monitor_slug=monitor_name,
monitor_config=monitor_config,
status=MonitorStatus.IN_PROGRESS,
)
headers.update({"sentry-monitor-check-in-id": check_in_id})
monitor_config = _get_monitor_config(celery_schedule, app, monitor_name)

# Set the Sentry configuration in the options of the ScheduleEntry.
# Those will be picked up in `apply_async` and added to the headers.
schedule_entry.options["headers"] = headers
is_supported_schedule = bool(monitor_config)
if not is_supported_schedule:
return

return original_apply_entry(*args, **kwargs)
headers = schedule_entry.options.pop("headers", {})
headers.update(
{
"sentry-monitor-slug": monitor_name,
"sentry-monitor-config": monitor_config,
}
)

Scheduler.apply_entry = sentry_apply_entry
check_in_id = capture_checkin(
monitor_slug=monitor_name,
monitor_config=monitor_config,
status=MonitorStatus.IN_PROGRESS,
)
headers.update({"sentry-monitor-check-in-id": check_in_id})

# Set the Sentry configuration in the options of the ScheduleEntry.
# Those will be picked up in `apply_async` and added to the headers.
schedule_entry.options["headers"] = headers

def _patch_redbeat_maybe_due():
# type: () -> None

if RedBeatScheduler is None:
return
def _wrap_beat_scheduler(original_function):
# type: (Callable[..., Any]) -> Callable[..., Any]
"""
Makes sure that:
- a new Sentry trace is started for each task started by Celery Beat and
it is propagated to the task.
- the Sentry Crons information is set in the Celery Beat task's
headers so that it is monitored with Sentry Crons.
After the patched function is called,
Celery Beat will call apply_async to put the task in the queue.
"""
# Patch only once
# Can't use __name__ here, because some of our tests mock original_apply_entry
already_patched = "sentry_patched_scheduler" in str(original_function)
if already_patched:
return original_function

from sentry_sdk.integrations.celery import CeleryIntegration

original_maybe_due = RedBeatScheduler.maybe_due

def sentry_maybe_due(*args, **kwargs):
def sentry_patched_scheduler(*args, **kwargs):
# type: (*Any, **Any) -> None
scheduler, schedule_entry = args
app = scheduler.app

celery_schedule = schedule_entry.schedule
monitor_name = schedule_entry.name

integration = sentry_sdk.get_client().get_integration(CeleryIntegration)
if integration is None:
return original_maybe_due(*args, **kwargs)

task_should_be_excluded = match_regex_list(
monitor_name, integration.exclude_beat_tasks
)
if task_should_be_excluded:
return original_maybe_due(*args, **kwargs)
return original_function(*args, **kwargs)

# Tasks started by Celery Beat start a new Trace
scope = Scope.get_isolation_scope()
scope.set_new_propagation_context()
scope._name = "celery-beat"

monitor_config = _get_monitor_config(celery_schedule, app, monitor_name)

is_supported_schedule = bool(monitor_config)
if is_supported_schedule:
headers = schedule_entry.options.pop("headers", {})
headers.update(
{
"sentry-monitor-slug": monitor_name,
"sentry-monitor-config": monitor_config,
}
)
scheduler, schedule_entry = args
_apply_crons_data_to_schedule_entry(scheduler, schedule_entry, integration)

check_in_id = capture_checkin(
monitor_slug=monitor_name,
monitor_config=monitor_config,
status=MonitorStatus.IN_PROGRESS,
)
headers.update({"sentry-monitor-check-in-id": check_in_id})
return original_function(*args, **kwargs)

# Set the Sentry configuration in the options of the ScheduleEntry.
# Those will be picked up in `apply_async` and added to the headers.
schedule_entry.options["headers"] = headers
return sentry_patched_scheduler

return original_maybe_due(*args, **kwargs)

RedBeatScheduler.maybe_due = sentry_maybe_due
def _patch_beat_apply_entry():
# type: () -> None
Scheduler.apply_entry = _wrap_beat_scheduler(Scheduler.apply_entry)


def _setup_celery_beat_signals():
def _patch_redbeat_maybe_due():
# type: () -> None
task_success.connect(crons_task_success)
task_failure.connect(crons_task_failure)
task_retry.connect(crons_task_retry)
if RedBeatScheduler is None:
return

RedBeatScheduler.maybe_due = _wrap_beat_scheduler(RedBeatScheduler.maybe_due)


def _setup_celery_beat_signals(monitor_beat_tasks):
# type: (bool) -> None
if monitor_beat_tasks:
task_success.connect(crons_task_success)
task_failure.connect(crons_task_failure)
task_retry.connect(crons_task_retry)


def crons_task_success(sender, **kwargs):
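For context, all of the patching above is keyed off the integration's monitor_beat_tasks flag; enabling it in an application looks roughly like this (the DSN and exclude pattern are placeholders):

```python
# Enabling Celery Beat monitoring. monitor_beat_tasks drives the
# scheduler patching above; DSN and exclude pattern are placeholders.
import sentry_sdk
from sentry_sdk.integrations.celery import CeleryIntegration

sentry_sdk.init(
    dsn="https://examplePublicKey@o0.ingest.sentry.io/0",
    integrations=[
        CeleryIntegration(
            monitor_beat_tasks=True,
            exclude_beat_tasks=["unimportant-task.*"],  # optional regex list
        )
    ],
)
```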
7 changes: 4 additions & 3 deletions sentry_sdk/scope.py
@@ -604,9 +604,10 @@ def iter_headers(self):
def iter_trace_propagation_headers(self, *args, **kwargs):
# type: (Any, Any) -> Generator[Tuple[str, str], None, None]
"""
Return HTTP headers which allow propagation of trace data. Data taken
from the span representing the request, if available, or the current
span on the scope if not.
Return HTTP headers which allow propagation of trace data.
If a span is given, the trace data will be taken from the span.
If no span is given, the trace data is taken from the scope.
"""
client = Scope.get_client()
if not client.options.get("propagate_traces"):
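A short sketch of the span-less call shape the updated docstring describes; the yielded pairs are the SDK's standard propagation headers (sentry-trace, and baggage when available):

```python
# Sketch: without a span the headers come from the scope's own
# propagation context; with a span, from that span's trace data.
from sentry_sdk.scope import Scope

scope = Scope.get_isolation_scope()
headers = dict(scope.iter_trace_propagation_headers())  # span=None case
# expected to contain "sentry-trace" (and "baggage" when available),
# or be empty if no client is set up or propagate_traces is disabled
```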
58 changes: 58 additions & 0 deletions tests/integrations/celery/integration_tests/__init__.py
@@ -0,0 +1,58 @@
import os
import signal
import tempfile
import threading
import time

from celery.beat import Scheduler

from sentry_sdk.utils import logger


class ImmediateScheduler(Scheduler):
"""
A custom scheduler that starts tasks immediately after starting Celery beat.
"""

def setup_schedule(self):
super().setup_schedule()
for _, entry in self.schedule.items():
self.apply_entry(entry)

def tick(self):
# Override tick to prevent the normal schedule cycle
return 1


def kill_beat(beat_pid_file, delay_seconds=1):
"""
Terminates Celery Beat after the given `delay_seconds`.
"""
logger.info("Starting Celery Beat killer...")
time.sleep(delay_seconds)
pid = int(open(beat_pid_file, "r").read())
logger.info("Terminating Celery Beat...")
os.kill(pid, signal.SIGTERM)


def run_beat(celery_app, runtime_seconds=1, loglevel="warning", quiet=True):
"""
Run Celery Beat that immediately starts tasks.
The Celery Beat instance is automatically terminated after `runtime_seconds`.
"""
logger.info("Starting Celery Beat...")
pid_file = os.path.join(tempfile.mkdtemp(), f"celery-beat-{os.getpid()}.pid")

t = threading.Thread(
target=kill_beat,
args=(pid_file,),
kwargs={"delay_seconds": runtime_seconds},
)
t.start()

beat_instance = celery_app.Beat(
loglevel=loglevel,
quiet=quiet,
pidfile=pid_file,
)
beat_instance.run()
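A hedged sketch of how these helpers could drive an integration test; the Celery app, broker URL, and task are illustrative assumptions, not part of this file:

```python
# Hypothetical usage of the helpers above: schedule a task, make
# ImmediateScheduler fire it on startup, and let run_beat terminate
# Beat after one second. Broker URL and task name are assumptions.
from celery import Celery

from tests.integrations.celery.integration_tests import run_beat

app = Celery("tasks", broker="redis://localhost:6379/0")

@app.task
def ping():
    return "pong"

app.conf.beat_schedule = {
    "ping-once": {"task": "tasks.ping", "schedule": 60.0},
}
app.conf.beat_scheduler = (
    "tests.integrations.celery.integration_tests:ImmediateScheduler"
)

run_beat(app, runtime_seconds=1)
```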