Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fetch Migration] Monitoring script for Data Prepper #264

Merged
merged 2 commits into from
Aug 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 103 additions & 0 deletions FetchMigration/index_configuration_tool/monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import argparse
import time
from typing import Optional, List

import requests
from prometheus_client import Metric
from prometheus_client.parser import text_string_to_metric_families

from endpoint_info import EndpointInfo

# Data Prepper core API paths, appended to the pipeline's base URL
__PROMETHEUS_METRICS_ENDPOINT = "/metrics/prometheus"
__SHUTDOWN_ENDPOINT = "/shutdown"
# Suffixes of the Data Prepper metric names used to track migration progress;
# matched via endswith() since the full name is prefixed with the pipeline name
__DOC_SUCCESS_METRIC = "_opensearch_documentsSuccess"
__RECORDS_IN_FLIGHT_METRIC = "_BlockingBuffer_recordsInFlight"
__NO_PARTITIONS_METRIC = "_noPartitionsAcquired"


def shutdown_pipeline(endpoint: EndpointInfo):
    """Request termination of the Data Prepper pipeline via its shutdown API.

    :param endpoint: Connection details (URL, auth, SSL flag) for the pipeline
    """
    requests.post(endpoint.url + __SHUTDOWN_ENDPOINT, auth=endpoint.auth,
                  verify=endpoint.verify_ssl)


def fetch_prometheus_metrics(endpoint: EndpointInfo) -> Optional[List[Metric]]:
    """Fetch and parse the pipeline's Prometheus metrics.

    :param endpoint: Connection details for the Data Prepper process
    :return: Parsed metric families, or None if the request fails for any reason
    """
    metrics_url = endpoint.url + __PROMETHEUS_METRICS_ENDPOINT
    try:
        response = requests.get(metrics_url, auth=endpoint.auth, verify=endpoint.verify_ssl)
        response.raise_for_status()
    except requests.exceptions.RequestException:
        # Treat any transport or HTTP error as "metrics unavailable this cycle"
        return None
    # Body is UTF-8 Prometheus text exposition format, per the response headers
    # set by Data Prepper's PrometheusMetricsHandler.java class
    payload = response.content.decode('utf-8')
    # Materialize the parser's generator into a list for the caller
    return list(text_string_to_metric_families(payload))


def get_metric_value(metric_families: List, metric_suffix: str) -> Optional[int]:
    """Look up a metric family by name suffix and return its first sample value.

    :param metric_families: Parsed Prometheus metric families
    :param metric_suffix: Suffix to match against each family's name
    :return: The first sample's value truncated to int, or None if no name matches
    """
    match = next((family for family in metric_families
                  if family.name.endswith(metric_suffix)), None)
    if match is None:
        return None
    return int(match.samples[0].value)


def check_if_complete(doc_count: Optional[int], in_flight: Optional[int], no_part_count: Optional[int],
                      prev_no_part_count: int, target: int) -> bool:
    """Decide whether the migration has reached a terminal state.

    All three conditions must hold (a missing metric value fails its condition):
    the target doc count is reached, the pipeline buffer is idle, and the
    no-partitions-acquired counter is still ticking upward.

    :param doc_count: Successfully written doc count, or None if unavailable
    :param in_flight: Records currently in the blocking buffer, or None
    :param no_part_count: Current no-partitions-acquired counter, or None
    :param prev_no_part_count: Counter value from the previous polling cycle
    :param target: Target doc count for the migration
    :return: True only when all completion conditions are satisfied
    """
    # TODO Add a check for partitionsCompleted = indices
    target_reached = doc_count is not None and doc_count >= target
    pipeline_idle = in_flight is not None and in_flight == 0
    # No-partitions metric should steadily tick up once work is exhausted
    partitions_exhausted = no_part_count is not None and no_part_count > prev_no_part_count > 0
    return target_reached and pipeline_idle and partitions_exhausted


def run(args: argparse.Namespace, wait_seconds: int) -> None:
    """Poll the Data Prepper metrics endpoint until the migration completes,
    then shut the pipeline down.

    :param args: Parsed CLI args; reads args.dp_endpoint and args.target_count
    :param wait_seconds: Seconds to sleep between polling cycles
    """
    # TODO Remove hardcoded EndpointInfo
    default_auth = ('admin', 'admin')
    endpoint = EndpointInfo(args.dp_endpoint, default_auth, False)
    prev_no_partitions_count = 0
    terminal = False
    # TODO (MIGRATIONS-1303): add a stall timeout so we don't loop forever if
    # doc_count never reaches the target (e.g. some docs repeatedly fail to write)
    while not terminal:
        # If the API call fails, the response is None
        metrics = fetch_prometheus_metrics(endpoint)
        if metrics is not None:
            success_docs = get_metric_value(metrics, __DOC_SUCCESS_METRIC)
            rec_in_flight = get_metric_value(metrics, __RECORDS_IN_FLIGHT_METRIC)
            no_partitions_count = get_metric_value(metrics, __NO_PARTITIONS_METRIC)
            terminal = check_if_complete(success_docs, rec_in_flight, no_partitions_count,
                                         prev_no_partitions_count, args.target_count)
            if not terminal and no_partitions_count is not None:
                # Save no_partitions_count for the next cycle's comparison.
                # Skip None (metric missing from this scrape) so we never feed
                # None into check_if_complete's "int > prev > 0" chain, which
                # would raise TypeError in Python 3.
                prev_no_partitions_count = no_partitions_count

        if not terminal:
            time.sleep(wait_seconds)
    # Loop terminated, shut down the Data Prepper pipeline
    shutdown_pipeline(endpoint)


if __name__ == '__main__':  # pragma: no cover
    # Command-line entry point: parse the two positional args, then poll the
    # pipeline every 30 seconds until the target doc count is reached
    parser = argparse.ArgumentParser(
        prog="python monitor.py",
        description="""Monitoring process for a running Data Prepper pipeline.
        The first input is the Data Prepper URL endpoint.
        The second input is the target doc_count for termination.""",
        formatter_class=argparse.RawTextHelpFormatter
    )
    parser.add_argument(
        "dp_endpoint",
        help="URL endpoint for the running Data Prepper process"
    )
    parser.add_argument(
        "target_count",
        type=int,
        help="Target doc_count to reach, after which the Data Prepper pipeline will be terminated"
    )
    namespace = parser.parse_args()
    print("\n##### Starting monitor tool... #####\n")
    run(namespace, 30)
    print("\n##### Ending monitor tool... #####\n")
1 change: 1 addition & 0 deletions FetchMigration/index_configuration_tool/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
jsondiff>=2.0.0
prometheus-client>=0.17.1
pyyaml>=6.0
requests>=2.28.2
responses>=0.23.1
143 changes: 143 additions & 0 deletions FetchMigration/index_configuration_tool/tests/test_monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
import argparse
import unittest
from unittest.mock import patch, MagicMock, PropertyMock

import requests
import responses
from prometheus_client.parser import text_string_to_metric_families

import monitor
from endpoint_info import EndpointInfo

# Constants shared across the test cases below
TEST_ENDPOINT = "test"
TEST_AUTH = ("user", "pass")
TEST_FLAG = False
TEST_METRIC_NAME = "test_metric"
TEST_METRIC_VALUE = 123.45
# Minimal Prometheus text-exposition payload: HELP + TYPE lines and one gauge sample
TEST_PROMETHEUS_METRIC_STRING = "# HELP " + TEST_METRIC_NAME + " Unit Test Metric\n"\
    + "# TYPE " + TEST_METRIC_NAME + " gauge\n" \
    + TEST_METRIC_NAME + "{serviceName=\"unittest\",} " + str(TEST_METRIC_VALUE)


class TestMonitor(unittest.TestCase):
    """Unit tests for monitor.py: pipeline shutdown, metrics fetch/parse,
    metric lookup by suffix, completion detection, and the polling loop."""

    @patch('requests.post')
    def test_shutdown(self, mock_post: MagicMock):
        """shutdown_pipeline should POST to the endpoint's /shutdown API
        with the endpoint's auth and SSL-verification settings."""
        expected_shutdown_url = TEST_ENDPOINT + "/shutdown"
        test_endpoint = EndpointInfo(TEST_ENDPOINT, TEST_AUTH, TEST_FLAG)
        monitor.shutdown_pipeline(test_endpoint)
        mock_post.assert_called_once_with(expected_shutdown_url, auth=TEST_AUTH, verify=TEST_FLAG)

    @patch('requests.get')
    def test_fetch_prometheus_metrics(self, mock_get: MagicMock):
        """A successful GET of /metrics/prometheus should be parsed into a
        list of metric families with name, type, documentation and samples."""
        expected_url = TEST_ENDPOINT + "/metrics/prometheus"
        # Set up GET response
        mock_response = MagicMock()
        # content is a property, so it must be mocked via PropertyMock on the type
        mock_content = PropertyMock(return_value=bytes(TEST_PROMETHEUS_METRIC_STRING, "utf-8"))
        type(mock_response).content = mock_content
        mock_get.return_value = mock_response
        # Test fetch
        raw_metrics_list = monitor.fetch_prometheus_metrics(EndpointInfo(TEST_ENDPOINT))
        mock_get.assert_called_once_with(expected_url, auth=None, verify=True)
        self.assertEqual(1, len(raw_metrics_list))
        test_metric = raw_metrics_list[0]
        self.assertEqual(TEST_METRIC_NAME, test_metric.name)
        self.assertTrue(len(test_metric.type) > 0)
        self.assertTrue(len(test_metric.documentation) > 0)
        self.assertEqual(1, len(test_metric.samples))
        test_sample = test_metric.samples[0]
        self.assertEqual(TEST_METRIC_NAME, test_sample.name)
        self.assertEqual(TEST_METRIC_VALUE, test_sample.value)
        self.assertTrue(len(test_sample.labels) > 0)

    @responses.activate
    def test_fetch_prometheus_metrics_failure(self):
        """A request exception during fetch should yield None, not propagate."""
        # Set up expected GET call with a mock exception
        expected_url = TEST_ENDPOINT + "/metrics/prometheus"
        responses.get(expected_url, body=requests.Timeout())
        # Test fetch
        result = monitor.fetch_prometheus_metrics(EndpointInfo(TEST_ENDPOINT))
        self.assertIsNone(result)

    def test_get_metric_value(self):
        """get_metric_value should match by name suffix and truncate to int,
        returning None when no family name matches."""
        # Return value is an int
        expected_val = int(TEST_METRIC_VALUE)
        test_input = list(text_string_to_metric_families(TEST_PROMETHEUS_METRIC_STRING))
        # Should fetch by suffix
        val = monitor.get_metric_value(test_input, "metric")
        self.assertEqual(expected_val, val)
        # No matching metric returns None
        val = monitor.get_metric_value(test_input, "invalid")
        self.assertEqual(None, val)

    @patch('monitor.shutdown_pipeline')
    @patch('time.sleep')
    @patch('monitor.check_if_complete')
    @patch('monitor.get_metric_value')
    @patch('monitor.fetch_prometheus_metrics')
    # Note that mock objects are passed bottom-up from the patch order above
    def test_run(self, mock_fetch: MagicMock, mock_get: MagicMock, mock_check: MagicMock, mock_sleep: MagicMock,
                 mock_shut: MagicMock):
        """run() should poll until check_if_complete passes (two cycles here,
        so exactly one sleep), then shut the pipeline down."""
        test_input = argparse.Namespace()
        # The values here don't matter since we've mocked the check method
        test_input.dp_endpoint = "test"
        test_input.target_count = 1
        mock_get.return_value = None
        # Check will first fail, then pass
        mock_check.side_effect = [False, True]
        # Run test method
        wait_time = 3
        monitor.run(test_input, wait_time)
        # Test that fetch was called with the expected EndpointInfo
        expected_endpoint_info = EndpointInfo(test_input.dp_endpoint, ('admin', 'admin'), False)
        self.assertEqual(2, mock_fetch.call_count)
        mock_fetch.assert_called_with(expected_endpoint_info)
        # We expect one wait cycle
        mock_sleep.assert_called_once_with(wait_time)
        mock_shut.assert_called_once_with(expected_endpoint_info)

    @patch('monitor.shutdown_pipeline')
    @patch('time.sleep')
    @patch('monitor.check_if_complete')
    @patch('monitor.get_metric_value')
    @patch('monitor.fetch_prometheus_metrics')
    # Note that mock objects are passed bottom-up from the patch order above
    def test_run_with_fetch_failure(self, mock_fetch: MagicMock, mock_get: MagicMock, mock_check: MagicMock,
                                    mock_sleep: MagicMock, mock_shut: MagicMock):
        """A failed fetch (None) should not terminate the loop: run() retries
        after sleeping, then completes on the next successful cycle."""
        test_input = argparse.Namespace()
        # The values here don't matter since we've mocked the check method
        test_input.dp_endpoint = "test"
        test_input.target_count = 1
        mock_get.return_value = None
        mock_check.return_value = True
        # Fetch call will first fail, then succeed
        mock_fetch.side_effect = [None, MagicMock()]
        # Run test method
        wait_time = 3
        monitor.run(test_input, wait_time)
        # Test that fetch was called with the expected EndpointInfo
        expected_endpoint_info = EndpointInfo(test_input.dp_endpoint, ('admin', 'admin'), False)
        self.assertEqual(2, mock_fetch.call_count)
        mock_fetch.assert_called_with(expected_endpoint_info)
        # We expect one wait cycle
        mock_sleep.assert_called_once_with(wait_time)
        mock_shut.assert_called_once_with(expected_endpoint_info)

    def test_check_if_complete(self):
        """Exhaustive truth table for the three completion conditions."""
        # If any of the optional values are missing, we are not complete
        self.assertFalse(monitor.check_if_complete(None, 0, 1, 0, 2))
        self.assertFalse(monitor.check_if_complete(2, None, 1, 0, 2))
        self.assertFalse(monitor.check_if_complete(2, 0, None, 0, 2))
        # Target count not reached
        self.assertFalse(monitor.check_if_complete(1, None, None, 0, 2))
        # Target count reached, but has records in flight
        self.assertFalse(monitor.check_if_complete(2, 1, None, 0, 2))
        # Target count reached, no records in flight, but no prev no_part_count
        self.assertFalse(monitor.check_if_complete(2, 0, 1, 0, 2))
        # Terminal state
        self.assertTrue(monitor.check_if_complete(2, 0, 2, 1, 2))


# Allow running this test module directly, outside a test runner
if __name__ == '__main__':
    unittest.main()