From 0811c018c9456f788d280874cfc1b92030cbe62b Mon Sep 17 00:00:00 2001 From: Tanmay Verma Date: Wed, 8 Nov 2023 15:16:46 -0800 Subject: [PATCH] Enhance testing for pending request count (#6532) * Enhance testing for pending request count * Improve the documentation * Add more documentation --- qa/L0_metrics/metrics_queue_size_test.py | 54 ++++++++++++++++++++---- qa/L0_metrics/test.sh | 6 ++- 2 files changed, 51 insertions(+), 9 deletions(-) diff --git a/qa/L0_metrics/metrics_queue_size_test.py b/qa/L0_metrics/metrics_queue_size_test.py index 18a601d01b..0554274109 100755 --- a/qa/L0_metrics/metrics_queue_size_test.py +++ b/qa/L0_metrics/metrics_queue_size_test.py @@ -83,7 +83,10 @@ def setUp(self): url=self.server_url, concurrency=self.concurrency ) - def _validate_model_config(self, model_name): + # Test specific configurations + self.max_queue_size = 0 + + def _validate_model_config(self, model_name, max_queue_size=0): config = self.client.get_model_config(model_name) print(config) params = config.get("parameters", {}) @@ -91,6 +94,13 @@ def _validate_model_config(self, model_name): max_batch_size = config.get("max_batch_size") self.assertEqual(delay_ms, self.delay_ms) self.assertEqual(max_batch_size, self.max_batch_size) + + dynamic_batching = config.get("dynamic_batching", {}) + default_queue_policy = dynamic_batching.get("default_queue_policy", {}) + self.max_queue_size = default_queue_policy.get("max_queue_size", 0) + + self.assertEqual(self.max_queue_size, max_queue_size) + return config def _get_metrics(self): @@ -148,8 +158,10 @@ def _send_async_requests_sequence(self, num_seq_slots, model_name, inputs, futur ) num_sent += 1 - def _test_helper(self, model_name, batch_size, send_requests_func): - self._validate_model_config(model_name) + def _test_helper( + self, model_name, batch_size, send_requests_func, max_queue_size=0 + ): + self._validate_model_config(model_name, max_queue_size=max_queue_size) queue_size = QUEUE_METRIC_TEMPLATE.format(model_name=model_name) infer_count = INFER_METRIC_TEMPLATE.format(model_name=model_name) @@ -162,9 +174,16 @@ def _test_helper(self, model_name, batch_size, send_requests_func): # Give Triton a second to load all requests into queues time.sleep(1) - starting_queue_size = self.num_requests - batch_size # Start from (num_requests-batch_size) because 1 batch should be executing, # and the rest of the requests should be queued. + # If max_queue_size is specified then the queued requests would be capped + # at max_queue_size. + if max_queue_size != 0: + self._assert_metric_equals(queue_size, max_queue_size) + starting_queue_size = max_queue_size + else: + starting_queue_size = self.num_requests - batch_size + for expected_queue_size in range(starting_queue_size, 0, -1 * batch_size): self._assert_metric_equals(queue_size, expected_queue_size) time.sleep(self.delay_sec) @@ -174,13 +193,21 @@ def _test_helper(self, model_name, batch_size, send_requests_func): time.sleep(self.delay_sec) # All requests should've been executed without any batching - self._assert_metric_equals(infer_count, self.num_requests) - expected_exec_count = math.ceil(self.num_requests / batch_size) + expected_infer_count = starting_queue_size + batch_size + self._assert_metric_equals(infer_count, expected_infer_count) + expected_exec_count = math.ceil(expected_infer_count / batch_size) self._assert_metric_equals(exec_count, expected_exec_count) - # Verify no inference exceptions were raised + failed_count = 0 for future in futures: - future.get_result() + try: + future.get_result() + except Exception as e: + failed_count = failed_count + 1 + + self.assertEqual( + failed_count, self.num_requests - batch_size - starting_queue_size + ) def test_default_scheduler(self): model_name = "default" @@ -194,6 +221,17 @@ def test_dynamic_batch_scheduler(self): batch_size = self.max_batch_size self._test_helper(model_name, batch_size, self._send_async_requests) + def test_fail_max_queue_size(self): + model_name = "max_queue_size" + # This test checks whether metrics are properly accounts for requests + # that fail to enqueue on the server. The test sets the max_queue_size + # and any additional requests beyond the specified queue size should fail + # instead of waiting for execution. + batch_size = self.max_batch_size + self._test_helper( + model_name, batch_size, self._send_async_requests, max_queue_size=4 + ) + def test_sequence_batch_scheduler_direct(self): model_name = "sequence_direct" # With sufficient queue delay and minimum_slot_utilization set, we diff --git a/qa/L0_metrics/test.sh b/qa/L0_metrics/test.sh index 9b0f634b82..dea1c62041 100755 --- a/qa/L0_metrics/test.sh +++ b/qa/L0_metrics/test.sh @@ -325,6 +325,10 @@ DYNAMIC_MODEL="${MODELDIR}/dynamic" cp -r "${DEFAULT_MODEL}" "${DYNAMIC_MODEL}" echo -e "\ndynamic_batching { max_queue_delay_microseconds: ${MAX_QUEUE_DELAY_US} }\n" >> "${DYNAMIC_MODEL}/config.pbtxt" +MAX_QUEUE_SIZE_MODEL="${MODELDIR}/max_queue_size" +cp -r "${DEFAULT_MODEL}" "${MAX_QUEUE_SIZE_MODEL}" +echo -e "\ndynamic_batching { max_queue_delay_microseconds: ${MAX_QUEUE_DELAY_US} default_queue_policy { max_queue_size: 4 } }\n" >> "${MAX_QUEUE_SIZE_MODEL}/config.pbtxt" + SEQUENCE_DIRECT_MODEL="${MODELDIR}/sequence_direct" cp -r "${DEFAULT_MODEL}" "${SEQUENCE_DIRECT_MODEL}" echo -e "\nsequence_batching { direct { max_queue_delay_microseconds: ${MAX_QUEUE_DELAY_US}, minimum_slot_utilization: 1.0 } }\n" >> "${SEQUENCE_DIRECT_MODEL}/config.pbtxt" @@ -347,7 +351,7 @@ run_and_check_server python3 ${PYTHON_TEST} 2>&1 | tee ${CLIENT_LOG} kill $SERVER_PID wait $SERVER_PID -expected_tests=5 +expected_tests=6 check_unit_test "${expected_tests}" if [ $RET -eq 0 ]; then