-
Notifications
You must be signed in to change notification settings - Fork 514
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(arq): add arq integration (#1872)
Initial integration for arq
- Loading branch information
Showing
8 changed files
with
452 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
name: Test arq | ||
|
||
on: | ||
push: | ||
branches: | ||
- master | ||
- release/** | ||
|
||
pull_request: | ||
|
||
# Cancel in progress workflows on pull_requests. | ||
# https://docs.github.com/en/actions/using-jobs/using-concurrency#example-using-a-fallback-value | ||
concurrency: | ||
group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }} | ||
cancel-in-progress: true | ||
|
||
permissions: | ||
contents: read | ||
|
||
env: | ||
BUILD_CACHE_KEY: ${{ github.sha }} | ||
CACHED_BUILD_PATHS: | | ||
${{ github.workspace }}/dist-serverless | ||
jobs: | ||
test: | ||
name: arq, python ${{ matrix.python-version }}, ${{ matrix.os }} | ||
runs-on: ${{ matrix.os }} | ||
timeout-minutes: 45 | ||
|
||
strategy: | ||
fail-fast: false | ||
matrix: | ||
python-version: ["3.7","3.8","3.9","3.10","3.11"] | ||
# python3.6 reached EOL and is no longer being supported on | ||
# new versions of hosted runners on Github Actions | ||
# ubuntu-20.04 is the last version that supported python3.6 | ||
# see https://github.com/actions/setup-python/issues/544#issuecomment-1332535877 | ||
os: [ubuntu-20.04] | ||
|
||
steps: | ||
- uses: actions/checkout@v3 | ||
- uses: actions/setup-python@v4 | ||
with: | ||
python-version: ${{ matrix.python-version }} | ||
|
||
- name: Setup Test Env | ||
run: | | ||
pip install codecov "tox>=3,<4" | ||
- name: Test arq | ||
timeout-minutes: 45 | ||
shell: bash | ||
run: | | ||
set -x # print commands that are executed | ||
coverage erase | ||
./scripts/runtox.sh "${{ matrix.python-version }}-arq" --cov=tests --cov=sentry_sdk --cov-report= --cov-branch | ||
coverage combine .coverage* | ||
coverage xml -i | ||
codecov --file coverage.xml | ||
check_required_tests: | ||
name: All arq tests passed or skipped | ||
needs: test | ||
# Always run this, even if a dependent job failed | ||
if: always() | ||
runs-on: ubuntu-20.04 | ||
steps: | ||
- name: Check for failures | ||
if: contains(needs.test.result, 'failure') | ||
run: | | ||
echo "One of the dependent jobs have failed. You may need to re-run it." && exit 1 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,203 @@ | ||
from __future__ import absolute_import | ||
|
||
import sys | ||
|
||
from sentry_sdk._compat import reraise | ||
from sentry_sdk._types import MYPY | ||
from sentry_sdk import Hub | ||
from sentry_sdk.consts import OP | ||
from sentry_sdk.hub import _should_send_default_pii | ||
from sentry_sdk.integrations import DidNotEnable, Integration | ||
from sentry_sdk.integrations.logging import ignore_logger | ||
from sentry_sdk.tracing import Transaction, TRANSACTION_SOURCE_TASK | ||
from sentry_sdk.utils import ( | ||
capture_internal_exceptions, | ||
event_from_exception, | ||
SENSITIVE_DATA_SUBSTITUTE, | ||
) | ||
|
||
try: | ||
import arq.worker | ||
from arq.version import VERSION as ARQ_VERSION | ||
from arq.connections import ArqRedis | ||
from arq.worker import JobExecutionFailed, Retry, RetryJob, Worker | ||
except ImportError: | ||
raise DidNotEnable("Arq is not installed") | ||
|
||
if MYPY: | ||
from typing import Any, Dict, Optional | ||
|
||
from sentry_sdk._types import EventProcessor, Event, ExcInfo, Hint | ||
|
||
from arq.jobs import Job | ||
from arq.typing import WorkerCoroutine | ||
from arq.worker import Function | ||
|
||
ARQ_CONTROL_FLOW_EXCEPTIONS = (JobExecutionFailed, Retry, RetryJob) | ||
|
||
|
||
class ArqIntegration(Integration): | ||
identifier = "arq" | ||
|
||
@staticmethod | ||
def setup_once(): | ||
# type: () -> None | ||
|
||
try: | ||
if isinstance(ARQ_VERSION, str): | ||
version = tuple(map(int, ARQ_VERSION.split(".")[:2])) | ||
else: | ||
version = ARQ_VERSION.version[:2] | ||
except (TypeError, ValueError): | ||
raise DidNotEnable("arq version unparsable: {}".format(ARQ_VERSION)) | ||
|
||
if version < (0, 23): | ||
raise DidNotEnable("arq 0.23 or newer required.") | ||
|
||
patch_enqueue_job() | ||
patch_run_job() | ||
patch_func() | ||
|
||
ignore_logger("arq.worker") | ||
|
||
|
||
def patch_enqueue_job(): | ||
# type: () -> None | ||
old_enqueue_job = ArqRedis.enqueue_job | ||
|
||
async def _sentry_enqueue_job(self, function, *args, **kwargs): | ||
# type: (ArqRedis, str, *Any, **Any) -> Optional[Job] | ||
hub = Hub.current | ||
|
||
if hub.get_integration(ArqIntegration) is None: | ||
return await old_enqueue_job(self, function, *args, **kwargs) | ||
|
||
with hub.start_span(op=OP.QUEUE_SUBMIT_ARQ, description=function): | ||
return await old_enqueue_job(self, function, *args, **kwargs) | ||
|
||
ArqRedis.enqueue_job = _sentry_enqueue_job | ||
|
||
|
||
def patch_run_job(): | ||
# type: () -> None | ||
old_run_job = Worker.run_job | ||
|
||
async def _sentry_run_job(self, job_id, score): | ||
# type: (Worker, str, int) -> None | ||
hub = Hub(Hub.current) | ||
|
||
if hub.get_integration(ArqIntegration) is None: | ||
return await old_run_job(self, job_id, score) | ||
|
||
with hub.push_scope() as scope: | ||
scope._name = "arq" | ||
scope.clear_breadcrumbs() | ||
|
||
transaction = Transaction( | ||
name="unknown arq task", | ||
status="ok", | ||
op=OP.QUEUE_TASK_ARQ, | ||
source=TRANSACTION_SOURCE_TASK, | ||
) | ||
|
||
with hub.start_transaction(transaction): | ||
return await old_run_job(self, job_id, score) | ||
|
||
Worker.run_job = _sentry_run_job | ||
|
||
|
||
def _capture_exception(exc_info): | ||
# type: (ExcInfo) -> None | ||
hub = Hub.current | ||
|
||
if hub.scope.transaction is not None: | ||
if exc_info[0] in ARQ_CONTROL_FLOW_EXCEPTIONS: | ||
hub.scope.transaction.set_status("aborted") | ||
return | ||
|
||
hub.scope.transaction.set_status("internal_error") | ||
|
||
event, hint = event_from_exception( | ||
exc_info, | ||
client_options=hub.client.options if hub.client else None, | ||
mechanism={"type": ArqIntegration.identifier, "handled": False}, | ||
) | ||
hub.capture_event(event, hint=hint) | ||
|
||
|
||
def _make_event_processor(ctx, *args, **kwargs): | ||
# type: (Dict[Any, Any], *Any, **Any) -> EventProcessor | ||
def event_processor(event, hint): | ||
# type: (Event, Hint) -> Optional[Event] | ||
|
||
hub = Hub.current | ||
|
||
with capture_internal_exceptions(): | ||
if hub.scope.transaction is not None: | ||
hub.scope.transaction.name = ctx["job_name"] | ||
event["transaction"] = ctx["job_name"] | ||
|
||
tags = event.setdefault("tags", {}) | ||
tags["arq_task_id"] = ctx["job_id"] | ||
tags["arq_task_retry"] = ctx["job_try"] > 1 | ||
extra = event.setdefault("extra", {}) | ||
extra["arq-job"] = { | ||
"task": ctx["job_name"], | ||
"args": args | ||
if _should_send_default_pii() | ||
else SENSITIVE_DATA_SUBSTITUTE, | ||
"kwargs": kwargs | ||
if _should_send_default_pii() | ||
else SENSITIVE_DATA_SUBSTITUTE, | ||
"retry": ctx["job_try"], | ||
} | ||
|
||
return event | ||
|
||
return event_processor | ||
|
||
|
||
def _wrap_coroutine(name, coroutine): | ||
# type: (str, WorkerCoroutine) -> WorkerCoroutine | ||
async def _sentry_coroutine(ctx, *args, **kwargs): | ||
# type: (Dict[Any, Any], *Any, **Any) -> Any | ||
hub = Hub.current | ||
if hub.get_integration(ArqIntegration) is None: | ||
return await coroutine(*args, **kwargs) | ||
|
||
hub.scope.add_event_processor( | ||
_make_event_processor({**ctx, "job_name": name}, *args, **kwargs) | ||
) | ||
|
||
try: | ||
result = await coroutine(ctx, *args, **kwargs) | ||
except Exception: | ||
exc_info = sys.exc_info() | ||
_capture_exception(exc_info) | ||
reraise(*exc_info) | ||
|
||
return result | ||
|
||
return _sentry_coroutine | ||
|
||
|
||
def patch_func(): | ||
# type: () -> None | ||
old_func = arq.worker.func | ||
|
||
def _sentry_func(*args, **kwargs): | ||
# type: (*Any, **Any) -> Function | ||
hub = Hub.current | ||
|
||
if hub.get_integration(ArqIntegration) is None: | ||
return old_func(*args, **kwargs) | ||
|
||
func = old_func(*args, **kwargs) | ||
|
||
if not getattr(func, "_sentry_is_patched", False): | ||
func.coroutine = _wrap_coroutine(func.name, func.coroutine) | ||
func._sentry_is_patched = True | ||
|
||
return func | ||
|
||
arq.worker.func = _sentry_func |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
import pytest | ||
|
||
pytest.importorskip("arq") |
Oops, something went wrong.