Skip to content

Commit

Permalink
add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
wedamija committed Dec 14, 2024
1 parent 7e170d1 commit b9d2073
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 6 deletions.
4 changes: 2 additions & 2 deletions src/sentry/remote_subscriptions/consumers/result_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import logging
from collections import defaultdict
from collections.abc import Generator, Mapping
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, wait
from concurrent.futures import ThreadPoolExecutor, wait
from typing import Generic, Literal, TypeVar

import sentry_sdk
Expand Down Expand Up @@ -87,7 +87,7 @@ def __init__(
self.mode = mode
if mode == "parallel":
self.parallel = True
self.parallel_executor = ProcessPoolExecutor(max_workers=max_workers)
self.parallel_executor = ThreadPoolExecutor(max_workers=max_workers)

if max_batch_size is not None:
self.max_batch_size = max_batch_size
Expand Down
100 changes: 96 additions & 4 deletions tests/sentry/uptime/consumers/test_results_consumers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import pytest
from arroyo import Message
from arroyo.backends.kafka import KafkaPayload
from arroyo.processing.strategies import ProcessingStrategy
from arroyo.types import BrokerValue, Partition, Topic
from sentry_kafka_schemas.schema_types.uptime_results_v1 import (
CHECKSTATUS_FAILURE,
Expand Down Expand Up @@ -50,7 +51,9 @@ def setUp(self):
owner=self.user,
)

def send_result(self, result: CheckResult):
def send_result(
self, result: CheckResult, consumer: ProcessingStrategy[KafkaPayload] | None = None
):
codec = kafka_definition.get_topic_codec(kafka_definition.Topic.UPTIME_RESULTS)
message = Message(
BrokerValue(
Expand All @@ -61,10 +64,11 @@ def send_result(self, result: CheckResult):
)
)
with self.feature(UptimeDomainCheckFailure.build_ingest_feature_name()):
factory = UptimeResultsStrategyFactory()
commit = mock.Mock()
if consumer is None:
factory = UptimeResultsStrategyFactory()
commit = mock.Mock()
consumer = factory.create_with_partitions(commit, {self.partition: 0})

consumer = factory.create_with_partitions(commit, {self.partition: 0})
consumer.submit(message)

def test(self):
Expand Down Expand Up @@ -607,3 +611,91 @@ def test_onboarding_success_graduate(self):
AUTO_DETECTED_ACTIVE_SUBSCRIPTION_INTERVAL.total_seconds()
)
assert uptime_subscription.url == new_uptime_subscription.url

def test_parallel(self) -> None:
    """
    Validates that the consumer in parallel mode buffers incoming results
    into batches of ``max_batch_size`` and only dispatches the buffered
    batch once a further message arrives after the batch is full; each
    buffered result is then processed exactly once, in order.
    """

    # max_batch_size=3: the third send fills the batch, and the fourth
    # send triggers the flush of the previously filled batch.
    factory = UptimeResultsStrategyFactory(mode="parallel", max_batch_size=3, max_workers=1)
    consumer = factory.create_with_partitions(mock.Mock(), {self.partition: 0})
    # Patch __call__ on the result processor's *type* so that every
    # invocation of the processor instance is recorded on the mock.
    with mock.patch.object(type(factory.result_processor), "__call__") as mock_processor_call:
        # Second subscription so the batch spans more than one subscription.
        subscription_2 = self.create_uptime_subscription(
            subscription_id=uuid.uuid4().hex, interval_seconds=300, url="http://santry.io"
        )

        result_1 = self.create_uptime_result(
            self.subscription.subscription_id,
            scheduled_check_time=datetime.now() - timedelta(minutes=5),
        )

        self.send_result(result_1, consumer=consumer)
        result_2 = self.create_uptime_result(
            self.subscription.subscription_id,
            scheduled_check_time=datetime.now() - timedelta(minutes=4),
        )

        self.send_result(result_2, consumer=consumer)
        # This will fill the batch
        result_3 = self.create_uptime_result(
            subscription_2.subscription_id,
            scheduled_check_time=datetime.now() - timedelta(minutes=4),
        )
        self.send_result(result_3, consumer=consumer)
        # Should be no calls yet, since we didn't send the batch
        assert mock_processor_call.call_count == 0
        # One more causes the previous batch to send
        self.send_result(
            self.create_uptime_result(
                subscription_2.subscription_id,
                scheduled_check_time=datetime.now() - timedelta(minutes=3),
            ),
            consumer=consumer,
        )

        # Exactly the three buffered results were processed, in send order.
        assert mock_processor_call.call_count == 3
        mock_processor_call.assert_has_calls([call(result_1), call(result_2), call(result_3)])

@mock.patch(
    "sentry.remote_subscriptions.consumers.result_consumer.ResultsStrategyFactory.process_group"
)
def test_parallel_grouping(self, mock_process_group) -> None:
    """
    Validates that when a full batch is flushed in parallel mode, the
    buffered results are partitioned into groups — results for the same
    subscription land in the same group — and ``process_group`` is
    invoked once per group.
    """

    # max_batch_size=3: three sends fill the batch, a fourth flushes it.
    factory = UptimeResultsStrategyFactory(mode="parallel", max_batch_size=3, max_workers=1)
    consumer = factory.create_with_partitions(mock.Mock(), {self.partition: 0})
    # Second subscription so the flushed batch contains two distinct groups.
    subscription_2 = self.create_uptime_subscription(
        subscription_id=uuid.uuid4().hex, interval_seconds=300, url="http://santry.io"
    )

    result_1 = self.create_uptime_result(
        self.subscription.subscription_id,
        scheduled_check_time=datetime.now() - timedelta(minutes=5),
    )

    self.send_result(result_1, consumer=consumer)
    result_2 = self.create_uptime_result(
        self.subscription.subscription_id,
        scheduled_check_time=datetime.now() - timedelta(minutes=4),
    )

    self.send_result(result_2, consumer=consumer)
    # This will fill the batch
    result_3 = self.create_uptime_result(
        subscription_2.subscription_id,
        scheduled_check_time=datetime.now() - timedelta(minutes=4),
    )
    self.send_result(result_3, consumer=consumer)
    # Should be no calls yet, since we didn't send the batch
    assert mock_process_group.call_count == 0
    # One more causes the previous batch to send
    # (re-sending result_3 is only used as the trigger here; this copy
    # goes into the *next* batch, not the one being flushed)
    self.send_result(result_3, consumer=consumer)
    # One process_group call per group: results sharing a subscription
    # were grouped together, the other subscription's result on its own.
    assert mock_process_group.call_count == 2
    group_1 = mock_process_group.mock_calls[0].args[0]
    group_2 = mock_process_group.mock_calls[1].args[0]
    assert group_1 == [result_1, result_2]
    assert group_2 == [result_3]

0 comments on commit b9d2073

Please sign in to comment.