feat(monitors): Add basic ingestion via Kafka #45079
Merged
Commits (12)
3950352 feat(cron): Add basic ingestion via Kafka (jan-auer)
48ef92e meta(devserver): Add crons to devserver (jan-auer)
a720de7 ref: Update with relay schema (jan-auer)
a916db1 ref(monitors): Rename all components (jan-auer)
e265a4e fix(settings): Fix kafka topic and consumer configuration (jan-auer)
d1c6a5a fix(monitors): Create topics in dev env (jan-auer)
ef4f440 Merge branch 'master' into feat/crons-ingest-kafka (jan-auer)
741f37f ref(monitors): Update consumer bootstrap (jan-auer)
717a0aa fix(devserver): Start monitors with ingest (jan-auer)
6e92ff2 Merge branch 'master' into feat/crons-ingest-kafka (jan-auer)
0fbfcea fix(monitors): Use monitor GUID for updates (jan-auer)
7b7fab7 feat(monitors): Support check-in creation via Kafka (jan-auer)
@@ -0,0 +1,58 @@
from __future__ import annotations

from typing import Any, MutableMapping

from arroyo import Topic
from arroyo.backends.kafka.configuration import build_kafka_consumer_configuration
from arroyo.backends.kafka.consumer import KafkaConsumer, KafkaPayload
from arroyo.commit import ONCE_PER_SECOND
from arroyo.processing.processor import StreamProcessor
from django.conf import settings

from sentry.monitors.consumers.check_in import StoreMonitorCheckInStrategyFactory
from sentry.utils import kafka_config
from sentry.utils.batching_kafka_consumer import create_topics


def get_monitor_check_ins_consumer(
    topic: str,
    group_id: str,
    auto_offset_reset: str,
    strict_offset_reset: bool,
    force_topic: str | None,
    force_cluster: str | None,
) -> StreamProcessor[KafkaPayload]:
    topic = force_topic or topic
    consumer_config = get_config(
        topic,
        group_id,
        auto_offset_reset=auto_offset_reset,
        strict_offset_reset=strict_offset_reset,
        force_cluster=force_cluster,
    )
    consumer = KafkaConsumer(consumer_config)
    return StreamProcessor(
        consumer=consumer,
        topic=Topic(topic),
        processor_factory=StoreMonitorCheckInStrategyFactory(),
        commit_policy=ONCE_PER_SECOND,
    )


def get_config(
    topic: str,
    group_id: str,
    auto_offset_reset: str,
    strict_offset_reset: bool,
    force_cluster: str | None,
) -> MutableMapping[str, Any]:
    # Resolve the cluster from settings unless explicitly overridden.
    cluster_name: str = force_cluster or settings.KAFKA_TOPICS[topic]["cluster"]
    # Ensure the topic exists before the consumer subscribes (used in dev).
    create_topics(cluster_name, [topic])
    return build_kafka_consumer_configuration(
        kafka_config.get_kafka_consumer_cluster_options(
            cluster_name,
        ),
        group_id=group_id,
        auto_offset_reset=auto_offset_reset,
        strict_offset_reset=strict_offset_reset,
    )
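For orientation, here is a minimal sketch of how this factory could be driven. The topic name, group id, and offset settings below are illustrative assumptions, not values taken from this diff:

```python
# Illustrative bootstrap only -- topic and group id are assumed values.
processor = get_monitor_check_ins_consumer(
    topic="ingest-monitors",        # assumed topic name
    group_id="monitor-consumers",   # assumed consumer group
    auto_offset_reset="latest",
    strict_offset_reset=False,
    force_topic=None,
    force_cluster=None,
)
processor.run()  # blocks; polls, processes, and commits until shutdown
```

`StreamProcessor.run` handles polling and rebalancing, and the `ONCE_PER_SECOND` commit policy bounds offset commits to at most one per second.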
@@ -0,0 +1,112 @@
import datetime
import logging
from typing import Mapping

import msgpack
from arroyo.backends.kafka.consumer import KafkaPayload
from arroyo.processing.strategies.abstract import ProcessingStrategy, ProcessingStrategyFactory
from arroyo.processing.strategies.commit import CommitOffsets
from arroyo.processing.strategies.run_task import RunTask
from arroyo.types import Commit, Message, Partition
from django.db import transaction

from sentry.models import CheckInStatus, Monitor, MonitorCheckIn, MonitorStatus, Project
from sentry.signals import first_cron_checkin_received, first_cron_monitor_created
from sentry.utils import json
from sentry.utils.dates import to_datetime

logger = logging.getLogger(__name__)


def process_message(message: Message[KafkaPayload]) -> None:
    # The Kafka message is a msgpack envelope wrapping a JSON check-in payload.
    wrapper = msgpack.unpackb(message.payload.value)

    params = json.loads(wrapper["payload"])
    start_time = to_datetime(float(wrapper["start_time"]))
    project_id = int(wrapper["project_id"])

    # TODO: Same as the check-in endpoints. Keep in sync or factor out.
    try:
        with transaction.atomic():
            try:
                monitor = Monitor.objects.select_for_update().get(
                    guid=params["monitor_id"], project_id=project_id
                )
            except Monitor.DoesNotExist:
                logger.debug("monitor does not exist: %s", params["monitor_id"])
                return

            # Map the textual status ("ok", "error", ...) onto CheckInStatus.
            status = getattr(CheckInStatus, params["status"].upper())
            duration = int(params["duration"]) if params.get("duration") is not None else None

            try:
                check_in = MonitorCheckIn.objects.select_for_update().get(
                    guid=params["check_in_id"],
                    project_id=project_id,
                    monitor=monitor,
                )

                if duration is None:
                    duration = int((start_time - check_in.date_added).total_seconds() * 1000)

                check_in.update(status=status, duration=duration)

            except MonitorCheckIn.DoesNotExist:
                # Infer the original start time of the check-in from the duration.
                # Note that the clock of this worker may be off from what Relay is reporting.
                date_added = start_time
                if duration is not None:
                    # duration is in milliseconds (see the computation above).
                    date_added -= datetime.timedelta(milliseconds=duration)

                check_in = MonitorCheckIn.objects.create(
                    project_id=project_id,
                    monitor=monitor,
                    guid=params["check_in_id"],
                    duration=duration,
                    status=status,
                    date_added=date_added,
                    date_updated=start_time,
                )

            project = Project.objects.get_from_cache(id=project_id)
            if not project.flags.has_cron_checkins:
                # Backfill users that already have cron monitors
                if not project.flags.has_cron_monitors:
                    first_cron_monitor_created.send_robust(
                        project=project, user=None, sender=Project
                    )

                first_cron_checkin_received.send_robust(
                    project=project, monitor_id=str(monitor.guid), sender=Project
                )

            if check_in.status == CheckInStatus.ERROR and monitor.status != MonitorStatus.DISABLED:
                monitor.mark_failed(start_time)
                return

            monitor_params = {
                "last_checkin": start_time,
                "next_checkin": monitor.get_next_scheduled_checkin(start_time),
            }

            if check_in.status == CheckInStatus.OK and monitor.status != MonitorStatus.DISABLED:
                monitor_params["status"] = MonitorStatus.OK

            # Guard against out-of-order delivery: never move last_checkin backwards.
            Monitor.objects.filter(id=monitor.id).exclude(last_checkin__gt=start_time).update(
                **monitor_params
            )
    except Exception:
        # Skip this message and continue processing in the consumer.
        logger.exception("Failed to process check-in")


class StoreMonitorCheckInStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
    def create_with_partitions(
        self,
        commit: Commit,
        partitions: Mapping[Partition, int],
    ) -> ProcessingStrategy[KafkaPayload]:
        return RunTask(
            function=process_message,
            next_step=CommitOffsets(commit),
        )
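To make the expected wire format concrete, this is a sketch of a message that `process_message` would accept, inferred from the unpacking logic above; the field values are made up:

```python
import time
import uuid

import msgpack

from sentry.utils import json

check_in = {
    "monitor_id": str(uuid.uuid4()),   # must match an existing Monitor.guid
    "check_in_id": str(uuid.uuid4()),  # new or existing MonitorCheckIn.guid
    "status": "ok",                    # upper-cased and looked up on CheckInStatus
    "duration": 1500,                  # milliseconds; optional
}
value = msgpack.packb(
    {
        "payload": json.dumps(check_in),
        "start_time": time.time(),
        "project_id": 42,  # assumed project id
    }
)
# `value` is the KafkaPayload.value this consumer expects on the check-ins topic.
```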
Is there any way we could add types for this btw?
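One option (a sketch, not part of this PR) would be `TypedDict` definitions mirroring the fields that `process_message` actually reads:

```python
from typing import Optional, TypedDict


class CheckInMessage(TypedDict):
    """Shape of the msgpack envelope consumed by process_message."""

    payload: str       # JSON-encoded check-in, see CheckInPayload
    start_time: float  # unix timestamp
    project_id: int


class CheckInPayload(TypedDict, total=False):
    """Shape of the JSON check-in payload inside the envelope."""

    monitor_id: str
    check_in_id: str
    status: str              # "ok", "error", ...
    duration: Optional[int]  # milliseconds
```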