Skip to content

Commit

Permalink
[Fetch Migration] Monitoring script for Data Prepper
Browse files Browse the repository at this point in the history
This change adds a monitor.py Python script that can monitor the running Data Prepper pipeline and shut it down once the target document count has been reached (as reported by Data Prepper's documentsSuccess Prometheus metric) and an idle pipeline is detected.

Signed-off-by: Kartik Ganesh <[email protected]>
  • Loading branch information
kartg committed Aug 16, 2023
1 parent 2c8e3ab commit 92c8ce7
Show file tree
Hide file tree
Showing 3 changed files with 203 additions and 0 deletions.
95 changes: 95 additions & 0 deletions FetchMigration/index_configuration_tool/monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import argparse
import time
from typing import Optional, List

import requests
from prometheus_client import Metric
from prometheus_client.parser import text_string_to_metric_families

from endpoint_info import EndpointInfo

# Path appended to the Data Prepper endpoint URL when fetching Prometheus-format metrics
__PROMETHEUS_METRICS_ENDPOINT = "/metrics/prometheus"
# Path appended to the Data Prepper endpoint URL when requesting a shutdown (sent as a POST)
__SHUTDOWN_ENDPOINT = "/shutdown"
# The remaining constants are metric-name suffixes: get_metric_value matches them against the
# END of each metric family name, so whatever prefix precedes them does not need to be known.
# Value compared against the target document count
__DOC_SUCCESS_METRIC = "_opensearch_documentsSuccess"
# Value that must be zero for the pipeline to be considered idle
__RECORDS_IN_FLIGHT_METRIC = "_BlockingBuffer_recordsInFlight"
# Value that must have increased since the previous poll for the run to be considered complete
__NO_PARTITIONS_METRIC = "_noPartitionsAcquired"


def shutdown_pipeline(endpoint: EndpointInfo):
    """Request a shutdown of the Data Prepper pipeline behind *endpoint*.

    Sends a POST to the endpoint URL suffixed with the shutdown path, using the
    endpoint's auth and SSL-verification settings. The response is not inspected.
    """
    requests.post(
        endpoint.url + __SHUTDOWN_ENDPOINT,
        auth=endpoint.auth,
        verify=endpoint.verify_ssl,
    )


def fetch_prometheus_metrics(endpoint: EndpointInfo) -> List[Metric]:
    """Fetch the Prometheus metrics exposed at *endpoint* and parse them.

    Performs a GET against the endpoint URL suffixed with the metrics path, using the
    endpoint's auth and SSL-verification settings.

    Returns:
        The parsed metric families, collected into a list.
    """
    response = requests.get(
        endpoint.url + __PROMETHEUS_METRICS_ENDPOINT,
        auth=endpoint.auth,
        verify=endpoint.verify_ssl,
    )
    # UTF-8 is based on the response headers defined in Data Prepper's
    # PrometheusMetricsHandler.java class
    payload = response.content.decode('utf-8')
    # The parser is a generator; materialize it so callers can scan the result repeatedly
    families = text_string_to_metric_families(payload)
    return list(families)


def get_metric_value(metric_families: List, metric_suffix: str) -> Optional[int]:
    """Return the value of the first metric family whose name ends with *metric_suffix*.

    Args:
        metric_families: Parsed metric families; each item must expose a ``name``
            string and a ``samples`` sequence whose items expose ``value``.
        metric_suffix: Suffix to match against the end of each family name.

    Returns:
        The first sample's value of the first matching family that has samples,
        converted with ``int()`` (fractional part truncated), or ``None`` when no
        such family exists.
    """
    for metric_family in metric_families:
        if not metric_family.name.endswith(metric_suffix):
            continue
        # A matching family may carry no samples at all; skip it rather than
        # raising IndexError on samples[0], and keep looking for a usable one.
        if metric_family.samples:
            return int(metric_family.samples[0].value)
    return None


def check_if_complete(doc_count: Optional[int], in_flight: Optional[int], no_part_count: Optional[int],
                      prev_no_part_count: Optional[int], target: int) -> bool:
    """Decide whether the pipeline has reached its target and gone idle.

    Args:
        doc_count: Current successful-document count, or None if unavailable.
        in_flight: Current records-in-flight count, or None if unavailable.
        no_part_count: Current no-partitions count, or None if unavailable.
        prev_no_part_count: No-partitions count from the previous poll. None is
            tolerated and treated as "no previous observation".
        target: Document count that must be reached.

    Returns:
        True only when ``doc_count >= target``, ``in_flight == 0`` and
        ``no_part_count > prev_no_part_count > 0``; False otherwise, including
        whenever any of the optional values is None.
    """
    # Check for target doc_count
    # TODO Add a check for partitionsCompleted = indices
    if doc_count is None or doc_count < target:
        return False
    # Check for idle pipeline
    if in_flight is None or in_flight != 0:
        return False
    # Without both observations the comparison below is meaningless (and would
    # raise TypeError on None), so the pipeline cannot be declared complete yet.
    if no_part_count is None or prev_no_part_count is None:
        return False
    # No-partitions metrics should steadily tick up
    return no_part_count > prev_no_part_count > 0


def run(args: argparse.Namespace, wait_seconds: int) -> None:
    """Poll Data Prepper metrics until the run is complete, then shut the pipeline down.

    Args:
        args: Parsed CLI arguments; reads ``dp_endpoint`` (Data Prepper URL) and
            ``target_count`` (converted with ``int()``).
        wait_seconds: Seconds to sleep between consecutive polls.

    Each iteration fetches the metrics, extracts the three tracked values and hands
    them to ``check_if_complete``. Once that returns True, a shutdown request is sent.
    """
    # TODO Remove hardcoded EndpointInfo
    default_auth = ('admin', 'admin')
    endpoint = EndpointInfo(args.dp_endpoint, default_auth, False)
    prev_no_partitions_count = 0
    terminal = False
    target_doc_count = int(args.target_count)
    while not terminal:
        metrics = fetch_prometheus_metrics(endpoint)
        success_docs = get_metric_value(metrics, __DOC_SUCCESS_METRIC)
        rec_in_flight = get_metric_value(metrics, __RECORDS_IN_FLIGHT_METRIC)
        no_partitions_count = get_metric_value(metrics, __NO_PARTITIONS_METRIC)
        terminal = check_if_complete(success_docs, rec_in_flight, no_partitions_count,
                                     prev_no_partitions_count, target_doc_count)
        if not terminal:
            # Save no_partitions_count, but only when the metric was actually reported:
            # storing None would later be compared against an int in check_if_complete.
            if no_partitions_count is not None:
                prev_no_partitions_count = no_partitions_count
            time.sleep(wait_seconds)
    # Loop terminated, shut down the Data Prepper pipeline
    shutdown_pipeline(endpoint)


if __name__ == '__main__':  # pragma no cover
    # Set up parsing for command line arguments
    arg_parser = argparse.ArgumentParser(
        prog="python monitor.py",
        description="Monitoring process for a running Data Prepper pipeline.\n" +
                    "The first input is the Data Prepper URL endpoint.\n" +
                    "The second input is the target doc_count for termination.",
        formatter_class=argparse.RawTextHelpFormatter
    )
    # Required positional arguments
    arg_parser.add_argument(
        "dp_endpoint",
        help="URL endpoint for the running Data Prepper process"
    )
    # type=int lets argparse reject a non-numeric target with a usage error
    # instead of a ValueError traceback raised later inside run()
    arg_parser.add_argument(
        "target_count",
        type=int,
        help="Target doc_count to reach, after which the Data Prepper pipeline will be terminated"
    )
    cli_args = arg_parser.parse_args()
    print("\n##### Starting monitor tool... #####\n")
    # Poll the pipeline every 30 seconds
    run(cli_args, 30)
    print("\n##### Ending monitor tool... #####\n")
1 change: 1 addition & 0 deletions FetchMigration/index_configuration_tool/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
jsondiff>=2.0.0
prometheus-client>=0.17.1
pyyaml>=6.0
requests>=2.28.2
responses>=0.23.1
107 changes: 107 additions & 0 deletions FetchMigration/index_configuration_tool/tests/test_monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import argparse
import unittest
from unittest.mock import patch, MagicMock, PropertyMock

from prometheus_client.parser import text_string_to_metric_families

import monitor
from endpoint_info import EndpointInfo

# Constants
# Endpoint fixture values shared by the tests below
TEST_ENDPOINT = "test"
TEST_AUTH = ("user", "pass")
TEST_FLAG = False
# A single gauge metric used as parser input
TEST_METRIC_NAME = "test_metric"
TEST_METRIC_VALUE = 123.45
# Prometheus text exposition: HELP line, TYPE line, then one labelled sample
TEST_PROMETHEUS_METRIC_STRING = "\n".join([
    f"# HELP {TEST_METRIC_NAME} Unit Test Metric",
    f"# TYPE {TEST_METRIC_NAME} gauge",
    f'{TEST_METRIC_NAME}{{serviceName="unittest",}} {TEST_METRIC_VALUE}',
])


class TestMonitor(unittest.TestCase):
    """Unit tests for the functions in the monitor module."""

    @patch('requests.post')
    # Note that mock objects are passed bottom-up from the patch order above
    def test_shutdown(self, mock_post: MagicMock):
        """shutdown_pipeline POSTs to the shutdown URL with the endpoint's settings."""
        endpoint = EndpointInfo(TEST_ENDPOINT, TEST_AUTH, TEST_FLAG)
        monitor.shutdown_pipeline(endpoint)
        mock_post.assert_called_once_with(TEST_ENDPOINT + "/shutdown", auth=TEST_AUTH, verify=TEST_FLAG)

    @patch('requests.get')
    # Note that mock objects are passed bottom-up from the patch order above
    def test_fetch_prometheus_metrics(self, mock_get: MagicMock):
        """fetch_prometheus_metrics GETs the metrics URL and parses the response body."""
        # Set up GET response; content is a property, so it is mocked on the response's type
        response = MagicMock()
        type(response).content = PropertyMock(return_value=TEST_PROMETHEUS_METRIC_STRING.encode("utf-8"))
        mock_get.return_value = response
        # Test fetch
        families = monitor.fetch_prometheus_metrics(EndpointInfo(TEST_ENDPOINT))
        mock_get.assert_called_once_with(TEST_ENDPOINT + "/metrics/prometheus", auth=None, verify=True)
        # Exactly one metric family, carrying its name, type and documentation
        self.assertEqual(1, len(families))
        family = families[0]
        self.assertEqual(TEST_METRIC_NAME, family.name)
        self.assertGreater(len(family.type), 0)
        self.assertGreater(len(family.documentation), 0)
        # Exactly one sample, carrying its name, value and labels
        self.assertEqual(1, len(family.samples))
        sample = family.samples[0]
        self.assertEqual(TEST_METRIC_NAME, sample.name)
        self.assertEqual(TEST_METRIC_VALUE, sample.value)
        self.assertGreater(len(sample.labels), 0)

    def test_get_metric_value(self):
        """get_metric_value matches by suffix and returns an int, or None on no match."""
        families = list(text_string_to_metric_families(TEST_PROMETHEUS_METRIC_STRING))
        # Should fetch by suffix; return value is an int
        self.assertEqual(int(TEST_METRIC_VALUE), monitor.get_metric_value(families, "metric"))
        # No matching metric returns None
        self.assertIsNone(monitor.get_metric_value(families, "invalid"))

    @patch('monitor.shutdown_pipeline')
    @patch('time.sleep')
    @patch('monitor.check_if_complete')
    @patch('monitor.get_metric_value')
    @patch('monitor.fetch_prometheus_metrics')
    # Note that mock objects are passed bottom-up from the patch order above
    def test_run(self, mock_fetch: MagicMock, mock_get: MagicMock, mock_check: MagicMock, mock_sleep: MagicMock,
                 mock_shut: MagicMock):
        """run polls until check_if_complete passes, sleeping in between, then shuts down."""
        wait_time = 3
        # The values here don't matter since we've mocked the check method
        cli_args = argparse.Namespace(dp_endpoint="test", target_count=1)
        mock_fetch.return_value = None
        mock_get.return_value = None
        # Check will first fail, then pass
        mock_check.side_effect = [False, True]
        # Run test method
        monitor.run(cli_args, wait_time)
        # Test that fetch was called with the expected EndpointInfo
        expected_endpoint_info = EndpointInfo(cli_args.dp_endpoint, ('admin', 'admin'), False)
        mock_fetch.assert_called_with(expected_endpoint_info)
        # We expect one wait cycle
        mock_sleep.assert_called_once_with(wait_time)
        mock_shut.assert_called_once_with(expected_endpoint_info)

    def test_check_if_complete(self):
        """check_if_complete is True only for the terminal state."""
        # Argument order: (doc_count, in_flight, no_part_count, prev_no_part_count, target)
        incomplete_cases = [
            # If any of the optional values are missing, we are not complete
            (None, 0, 1, 0, 2),
            (2, None, 1, 0, 2),
            (2, 0, None, 0, 2),
            # Target count not reached
            (1, None, None, 0, 2),
            # Target count reached, but has records in flight
            (2, 1, None, 0, 2),
            # Target count reached, no records in flight, but no prev no_part_count
            (2, 0, 1, 0, 2),
        ]
        for case in incomplete_cases:
            self.assertFalse(monitor.check_if_complete(*case))
        # Terminal state
        self.assertTrue(monitor.check_if_complete(2, 0, 2, 1, 2))


# Run the test suite when this module is executed directly
if __name__ == "__main__":
    unittest.main()

0 comments on commit 92c8ce7

Please sign in to comment.