diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py
index 0622941272bafb..c4110a2b0290a3 100644
--- a/src/sentry/conf/server.py
+++ b/src/sentry/conf/server.py
@@ -2639,6 +2639,7 @@ def build_cdc_postgres_init_db_volume(settings):
 KAFKA_INGEST_REPLAY_EVENTS = "ingest-replay-events"
 KAFKA_INGEST_REPLAYS_RECORDINGS = "ingest-replay-recordings"
 KAFKA_INGEST_OCCURRENCES = "ingest-occurrences"
+KAFKA_INGEST_MONITORS = "ingest-monitors"
 KAFKA_REGION_TO_CONTROL = "region-to-control"
 KAFKA_EVENTSTREAM_GENERIC = "generic-events"
 
@@ -2687,6 +2688,7 @@ def build_cdc_postgres_init_db_volume(settings):
     KAFKA_INGEST_REPLAY_EVENTS: {"cluster": "default"},
     KAFKA_INGEST_REPLAYS_RECORDINGS: {"cluster": "default"},
     KAFKA_INGEST_OCCURRENCES: {"cluster": "default"},
+    KAFKA_INGEST_MONITORS: {"cluster": "default"},
     # Metrics Testing Topics
     KAFKA_SNUBA_GENERICS_METRICS_CS: {"cluster": "default"},
     # Region to Control Silo messaging - eg UserIp and AuditLog
diff --git a/src/sentry/models/monitorcheckin.py b/src/sentry/models/monitorcheckin.py
index 56a654dd589ced..a83aeb2760eaf9 100644
--- a/src/sentry/models/monitorcheckin.py
+++ b/src/sentry/models/monitorcheckin.py
@@ -59,9 +59,10 @@ class MonitorCheckIn(Model):
     duration = BoundedPositiveIntegerField(null=True)
     date_added = models.DateTimeField(default=timezone.now)
     date_updated = models.DateTimeField(default=timezone.now)
-    objects = BaseManager(cache_fields=("guid",))
     attachment_id = BoundedBigIntegerField(null=True)
 
+    objects = BaseManager(cache_fields=("guid",))
+
     class Meta:
         app_label = "sentry"
         db_table = "sentry_monitorcheckin"
diff --git a/src/sentry/monitors/consumers/__init__.py b/src/sentry/monitors/consumers/__init__.py
new file mode 100644
index 00000000000000..ec56576011d905
--- /dev/null
+++ b/src/sentry/monitors/consumers/__init__.py
@@ -0,0 +1,58 @@
+from __future__ import annotations
+
+from typing import Any, MutableMapping
+
+from arroyo import Topic
+from arroyo.backends.kafka.configuration import build_kafka_consumer_configuration
+from arroyo.backends.kafka.consumer import KafkaConsumer, KafkaPayload
+from arroyo.commit import ONCE_PER_SECOND
+from arroyo.processing.processor import StreamProcessor
+from django.conf import settings
+
+from sentry.monitors.consumers.check_in import StoreMonitorCheckInStrategyFactory
+from sentry.utils import kafka_config
+from sentry.utils.batching_kafka_consumer import create_topics
+
+
+def get_monitor_check_ins_consumer(
+    topic: str,
+    group_id: str,
+    auto_offset_reset: str,
+    strict_offset_reset: bool,
+    force_topic: str | None,
+    force_cluster: str | None,
+) -> StreamProcessor[KafkaPayload]:
+    topic = force_topic or topic
+    consumer_config = get_config(
+        topic,
+        group_id,
+        auto_offset_reset=auto_offset_reset,
+        strict_offset_reset=strict_offset_reset,
+        force_cluster=force_cluster,
+    )
+    consumer = KafkaConsumer(consumer_config)
+    return StreamProcessor(
+        consumer=consumer,
+        topic=Topic(topic),
+        processor_factory=StoreMonitorCheckInStrategyFactory(),
+        commit_policy=ONCE_PER_SECOND,
+    )
+
+
+def get_config(
+    topic: str,
+    group_id: str,
+    auto_offset_reset: str,
+    strict_offset_reset: bool,
+    force_cluster: str | None,
+) -> MutableMapping[str, Any]:
+    cluster_name: str = force_cluster or settings.KAFKA_TOPICS[topic]["cluster"]
+    create_topics(cluster_name, [topic])
+    return build_kafka_consumer_configuration(
+        kafka_config.get_kafka_consumer_cluster_options(
+            cluster_name,
+        ),
+        group_id=group_id,
+        auto_offset_reset=auto_offset_reset,
+        strict_offset_reset=strict_offset_reset,
+    )
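For orientation, this is roughly what a caller of `get_monitor_check_ins_consumer` ends up doing (the `ingest-monitors` CLI command added in `run.py` below supplies these arguments via click options). A minimal sketch; the `group_id` and offset values here are illustrative assumptions, not defaults taken from this patch:

```python
# Sketch: build and run the monitors consumer programmatically.
# group_id and offset settings are assumed values for illustration.
from sentry.monitors.consumers import get_monitor_check_ins_consumer

processor = get_monitor_check_ins_consumer(
    topic="ingest-monitors",
    group_id="ingest-monitors",   # assumed consumer group name
    auto_offset_reset="latest",
    strict_offset_reset=False,
    force_topic=None,
    force_cluster=None,
)
processor.run()  # blocks; arroyo commits offsets at most once per second
```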
diff --git a/src/sentry/monitors/consumers/check_in.py b/src/sentry/monitors/consumers/check_in.py
new file mode 100644
index 00000000000000..0627970002e6dc
--- /dev/null
+++ b/src/sentry/monitors/consumers/check_in.py
@@ -0,0 +1,112 @@
+import datetime
+import logging
+from typing import Mapping
+
+import msgpack
+from arroyo.backends.kafka.consumer import KafkaPayload
+from arroyo.processing.strategies.abstract import ProcessingStrategy, ProcessingStrategyFactory
+from arroyo.processing.strategies.commit import CommitOffsets
+from arroyo.processing.strategies.run_task import RunTask
+from arroyo.types import Commit, Message, Partition
+from django.db import transaction
+
+from sentry.models import CheckInStatus, Monitor, MonitorCheckIn, MonitorStatus, Project
+from sentry.signals import first_cron_checkin_received, first_cron_monitor_created
+from sentry.utils import json
+from sentry.utils.dates import to_datetime
+
+logger = logging.getLogger(__name__)
+
+
+def process_message(message: Message[KafkaPayload]) -> None:
+    wrapper = msgpack.unpackb(message.payload.value)
+
+    params = json.loads(wrapper["payload"])
+    start_time = to_datetime(float(wrapper["start_time"]))
+    project_id = int(wrapper["project_id"])
+
+    # TODO: Same as the check-in endpoints. Keep in sync or factor out.
+    try:
+        with transaction.atomic():
+            try:
+                monitor = Monitor.objects.select_for_update().get(
+                    guid=params["monitor_id"], project_id=project_id
+                )
+            except Monitor.DoesNotExist:
+                logger.debug("monitor does not exist: %s", params["monitor_id"])
+                return
+
+            status = getattr(CheckInStatus, params["status"].upper())
+            duration = int(params["duration"]) if params.get("duration") is not None else None
+
+            try:
+                check_in = MonitorCheckIn.objects.select_for_update().get(
+                    guid=params["check_in_id"],
+                    project_id=project_id,
+                    monitor=monitor,
+                )
+
+                if duration is None:
+                    duration = int((start_time - check_in.date_added).total_seconds() * 1000)
+
+                check_in.update(status=status, duration=duration)
+
+            except MonitorCheckIn.DoesNotExist:
+                # Infer the original start time of the check-in from the duration (ms).
+                # Note that the clock of this worker may be off from what Relay is reporting.
+                date_added = start_time
+                if duration is not None:
+                    date_added -= datetime.timedelta(milliseconds=duration)
+
+                check_in = MonitorCheckIn.objects.create(
+                    project_id=project_id,
+                    monitor=monitor,
+                    guid=params["check_in_id"],
+                    duration=duration,
+                    status=status,
+                    date_added=date_added,
+                    date_updated=start_time,
+                )
+
+                project = Project.objects.get_from_cache(id=project_id)
+                if not project.flags.has_cron_checkins:
+                    # Backfill users that already have cron monitors
+                    if not project.flags.has_cron_monitors:
+                        first_cron_monitor_created.send_robust(
+                            project=project, user=None, sender=Project
+                        )
+
+                    first_cron_checkin_received.send_robust(
+                        project=project, monitor_id=str(monitor.guid), sender=Project
+                    )
+
+            if check_in.status == CheckInStatus.ERROR and monitor.status != MonitorStatus.DISABLED:
+                monitor.mark_failed(start_time)
+                return
+
+            monitor_params = {
+                "last_checkin": start_time,
+                "next_checkin": monitor.get_next_scheduled_checkin(start_time),
+            }
+
+            if check_in.status == CheckInStatus.OK and monitor.status != MonitorStatus.DISABLED:
+                monitor_params["status"] = MonitorStatus.OK
+
+            Monitor.objects.filter(id=monitor.id).exclude(last_checkin__gt=start_time).update(
+                **monitor_params
+            )
+    except Exception:
+        # Skip this message and continue processing in the consumer.
+        logger.exception("Failed to process check-in")
+
+
+class StoreMonitorCheckInStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
+    def create_with_partitions(
+        self,
+        commit: Commit,
+        partitions: Mapping[Partition, int],
+    ) -> ProcessingStrategy[KafkaPayload]:
+        return RunTask(
+            function=process_message,
+            next_step=CommitOffsets(commit),
+        )
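To make the expected message shape concrete: `process_message` unpacks a msgpack envelope whose `payload` field is the JSON check-in body. A hedged sketch of feeding one message through the function directly (e.g. in a test), assuming arroyo's `BrokerValue`/`Message` wrappers; all ids, the project, and the broker coordinates are illustrative placeholders:

```python
# Sketch: drive process_message() with a hand-built message. The envelope
# keys mirror what the function unpacks above; field values are assumed.
import time
import uuid
from datetime import datetime

import msgpack
from arroyo.backends.kafka.consumer import KafkaPayload
from arroyo.types import BrokerValue, Message, Partition, Topic

from sentry.monitors.consumers.check_in import process_message
from sentry.utils import json

check_in = {
    "monitor_id": "...",              # an existing Monitor.guid
    "check_in_id": uuid.uuid4().hex,
    "status": "ok",                   # lower-cased CheckInStatus name
    "duration": 1500,                 # milliseconds, optional
}
wrapper = {
    "payload": json.dumps(check_in),
    "start_time": time.time(),
    "project_id": 1,
}

message = Message(
    BrokerValue(
        KafkaPayload(None, msgpack.packb(wrapper), []),
        Partition(Topic("ingest-monitors"), 0),
        0,               # offset
        datetime.now(),  # broker timestamp
    )
)
process_message(message)
```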
diff --git a/src/sentry/runner/commands/devserver.py b/src/sentry/runner/commands/devserver.py
index 91fd6d29ddbe1b..cb2188feaafc15 100644
--- a/src/sentry/runner/commands/devserver.py
+++ b/src/sentry/runner/commands/devserver.py
@@ -27,6 +27,9 @@
     "--no-strict-offset-reset",
 ]
 
+# NOTE: These do NOT start automatically. Add your daemon to the `daemons` list
+# in `devserver()` like so:
+#     daemons += [_get_daemon("my_new_daemon")]
 _DEFAULT_DAEMONS = {
     "worker": ["sentry", "run", "worker", "-c", "1", "--autoreload"],
     "cron": ["sentry", "run", "cron", "--autoreload"],
@@ -96,6 +99,7 @@
     ],
     "metrics-billing": ["sentry", "run", "billing-metrics-consumer", "--no-strict-offset-reset"],
     "profiles": ["sentry", "run", "ingest-profiles", "--no-strict-offset-reset"],
+    "monitors": ["sentry", "run", "ingest-monitors", "--no-strict-offset-reset"],
 }
 
 
@@ -324,7 +328,7 @@ def devserver(
     ]
 
     if settings.SENTRY_USE_RELAY:
-        daemons += [_get_daemon("ingest")]
+        daemons += [_get_daemon("ingest"), _get_daemon("monitors")]
 
     if settings.SENTRY_USE_PROFILING:
         daemons += [_get_daemon("profiles")]
diff --git a/src/sentry/runner/commands/run.py b/src/sentry/runner/commands/run.py
index 96a193e37b2e8c..c9806ed1dde704 100644
--- a/src/sentry/runner/commands/run.py
+++ b/src/sentry/runner/commands/run.py
@@ -699,6 +699,19 @@ def replays_recordings_consumer(**options):
     run_processor_with_signals(consumer)
 
 
+@run.command("ingest-monitors")
+@log_options()
+@click.option("--topic", default="ingest-monitors", help="Topic to get monitor check-in data from.")
+@kafka_options("ingest-monitors")
+@strict_offset_reset_option()
+@configuration
+def monitors_consumer(**options):
+    from sentry.monitors.consumers import get_monitor_check_ins_consumer
+
+    consumer = get_monitor_check_ins_consumer(**options)
+    run_processor_with_signals(consumer)
+
+
 @run.command("indexer-last-seen-updater")
 @log_options()
 @configuration
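With these pieces in place, the consumer can be run standalone with `sentry run ingest-monitors --no-strict-offset-reset` (the same invocation the new `monitors` devserver daemon uses), and `sentry devserver` starts it automatically alongside the `ingest` daemon whenever `SENTRY_USE_RELAY` is enabled.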