diff --git a/qa/L0_backend_python/response_sender/response_sender_test.py b/qa/L0_backend_python/response_sender/response_sender_test.py
index 01d05de6a4..59e0701356 100644
--- a/qa/L0_backend_python/response_sender/response_sender_test.py
+++ b/qa/L0_backend_python/response_sender/response_sender_test.py
@@ -28,9 +28,67 @@
 
 import numpy as np
 import tritonclient.grpc as grpcclient
+from tritonclient.utils import InferenceServerException
 
 
 class ResponseSenderTest(unittest.TestCase):
+    _inputs_parameters_zero_response_pre_return = {
+        "number_of_response_before_return": 0,
+        "send_complete_final_flag_before_return": True,
+        "return_a_response": False,
+        "number_of_response_after_return": 0,
+        "send_complete_final_flag_after_return": False,
+    }
+    _inputs_parameters_zero_response_post_return = {
+        "number_of_response_before_return": 0,
+        "send_complete_final_flag_before_return": False,
+        "return_a_response": False,
+        "number_of_response_after_return": 0,
+        "send_complete_final_flag_after_return": True,
+    }
+    _inputs_parameters_one_response_pre_return = {
+        "number_of_response_before_return": 1,
+        "send_complete_final_flag_before_return": True,
+        "return_a_response": False,
+        "number_of_response_after_return": 0,
+        "send_complete_final_flag_after_return": False,
+    }
+    _inputs_parameters_one_response_post_return = {
+        "number_of_response_before_return": 0,
+        "send_complete_final_flag_before_return": False,
+        "return_a_response": False,
+        "number_of_response_after_return": 1,
+        "send_complete_final_flag_after_return": True,
+    }
+    _inputs_parameters_two_response_pre_return = {
+        "number_of_response_before_return": 2,
+        "send_complete_final_flag_before_return": True,
+        "return_a_response": False,
+        "number_of_response_after_return": 0,
+        "send_complete_final_flag_after_return": False,
+    }
+    _inputs_parameters_two_response_post_return = {
+        "number_of_response_before_return": 0,
+        "send_complete_final_flag_before_return": False,
+        "return_a_response": False,
+        "number_of_response_after_return": 2,
+        "send_complete_final_flag_after_return": True,
+    }
+    _inputs_parameters_response_pre_and_post_return = {
+        "number_of_response_before_return": 1,
+        "send_complete_final_flag_before_return": False,
+        "return_a_response": False,
+        "number_of_response_after_return": 3,
+        "send_complete_final_flag_after_return": True,
+    }
+    _inputs_parameters_one_response_on_return = {
+        "number_of_response_before_return": 0,
+        "send_complete_final_flag_before_return": False,
+        "return_a_response": True,
+        "number_of_response_after_return": 0,
+        "send_complete_final_flag_after_return": False,
+    }
+
     def _get_inputs(
         self,
         number_of_response_before_return,
@@ -74,6 +132,15 @@ def callback(result, error):
 
         return callback, responses
 
+    def _infer_parallel(self, model_name, parallel_inputs):
+        callback, responses = self._generate_streaming_callback_and_responses_pair()
+        with grpcclient.InferenceServerClient("localhost:8001") as client:
+            client.start_stream(callback)
+            for inputs in parallel_inputs:
+                client.async_stream_infer(model_name, inputs)
+            client.stop_stream()
+        return responses
+
     def _infer(
         self,
         model_name,
@@ -90,12 +157,7 @@ def _infer(
             number_of_response_after_return,
             send_complete_final_flag_after_return,
         )
-        callback, responses = self._generate_streaming_callback_and_responses_pair()
-        with grpcclient.InferenceServerClient("localhost:8001") as client:
-            client.start_stream(callback)
-            client.async_stream_infer(model_name, inputs)
-            client.stop_stream()
-        return responses
+        return self._infer_parallel(model_name, [inputs])
 
     def _assert_responses_valid(
         self,
@@ -128,6 +190,12 @@ def _assert_responses_valid(
         self.assertEqual(return_a_response, response_returned)
         self.assertEqual(number_of_response_after_return, after_return_response_count)
 
+    def _assert_responses_exception(self, responses, expected_message):
+        for response in responses:
+            self.assertIsNone(response["result"])
+            self.assertIsInstance(response["error"], InferenceServerException)
+            self.assertIn(expected_message, response["error"].message())
+
     def _assert_decoupled_infer_success(
         self,
         number_of_response_before_return,
@@ -211,82 +279,137 @@ def _assert_non_decoupled_infer_success(
     # Decoupled model send response final flag before request return.
     def test_decoupled_zero_response_pre_return(self):
         self._assert_decoupled_infer_success(
-            number_of_response_before_return=0,
-            send_complete_final_flag_before_return=True,
-            return_a_response=False,
-            number_of_response_after_return=0,
-            send_complete_final_flag_after_return=False,
+            **self._inputs_parameters_zero_response_pre_return
         )
 
     # Decoupled model send response final flag after request return.
     def test_decoupled_zero_response_post_return(self):
         self._assert_decoupled_infer_success(
-            number_of_response_before_return=0,
-            send_complete_final_flag_before_return=False,
-            return_a_response=False,
-            number_of_response_after_return=0,
-            send_complete_final_flag_after_return=True,
+            **self._inputs_parameters_zero_response_post_return
         )
 
     # Decoupled model send 1 response before request return.
     def test_decoupled_one_response_pre_return(self):
         self._assert_decoupled_infer_success(
-            number_of_response_before_return=1,
-            send_complete_final_flag_before_return=True,
-            return_a_response=False,
-            number_of_response_after_return=0,
-            send_complete_final_flag_after_return=False,
+            **self._inputs_parameters_one_response_pre_return
         )
 
     # Decoupled model send 1 response after request return.
     def test_decoupled_one_response_post_return(self):
         self._assert_decoupled_infer_success(
-            number_of_response_before_return=0,
-            send_complete_final_flag_before_return=False,
-            return_a_response=False,
-            number_of_response_after_return=1,
-            send_complete_final_flag_after_return=True,
+            **self._inputs_parameters_one_response_post_return
         )
 
     # Decoupled model send 2 response before request return.
     def test_decoupled_two_response_pre_return(self):
         self._assert_decoupled_infer_success(
-            number_of_response_before_return=2,
-            send_complete_final_flag_before_return=True,
-            return_a_response=False,
-            number_of_response_after_return=0,
-            send_complete_final_flag_after_return=False,
+            **self._inputs_parameters_two_response_pre_return
         )
 
     # Decoupled model send 2 response after request return.
     def test_decoupled_two_response_post_return(self):
         self._assert_decoupled_infer_success(
-            number_of_response_before_return=0,
-            send_complete_final_flag_before_return=False,
-            return_a_response=False,
-            number_of_response_after_return=2,
-            send_complete_final_flag_after_return=True,
+            **self._inputs_parameters_two_response_post_return
         )
 
     # Decoupled model send 1 and 3 responses before and after return.
     def test_decoupled_response_pre_and_post_return(self):
         self._assert_decoupled_infer_success(
-            number_of_response_before_return=1,
-            send_complete_final_flag_before_return=False,
-            return_a_response=False,
-            number_of_response_after_return=3,
-            send_complete_final_flag_after_return=True,
+            **self._inputs_parameters_response_pre_and_post_return
         )
 
     # Non-decoupled model send 1 response on return.
     def test_non_decoupled_one_response_on_return(self):
         self._assert_non_decoupled_infer_success(
-            number_of_response_before_return=0,
-            send_complete_final_flag_before_return=False,
-            return_a_response=True,
-            number_of_response_after_return=0,
-            send_complete_final_flag_after_return=False,
+            **self._inputs_parameters_one_response_on_return
+        )
+
+    # Non-decoupled model send 1 response before return.
+    def test_non_decoupled_one_response_pre_return(self):
+        self._assert_non_decoupled_infer_success(
+            **self._inputs_parameters_one_response_pre_return
+        )
+
+    # Non-decoupled model send 1 response after return.
+    def test_non_decoupled_one_response_post_return(self):
+        self._assert_non_decoupled_infer_success(
+            **self._inputs_parameters_one_response_post_return
+        )
+
+    # Decoupled model requests each responding differently.
+    def test_decoupled_multiple_requests(self):
+        parallel_inputs = [
+            self._get_inputs(**self._inputs_parameters_zero_response_pre_return),
+            self._get_inputs(**self._inputs_parameters_zero_response_post_return),
+            self._get_inputs(**self._inputs_parameters_one_response_pre_return),
+            self._get_inputs(**self._inputs_parameters_one_response_post_return),
+            self._get_inputs(**self._inputs_parameters_two_response_pre_return),
+            self._get_inputs(**self._inputs_parameters_two_response_post_return),
+            self._get_inputs(**self._inputs_parameters_response_pre_and_post_return),
+        ]
+        expected_number_of_response_before_return = 4
+        expected_return_a_response = False
+        expected_number_of_response_after_return = 6
+
+        model_name = "response_sender_decoupled_batching"
+        responses = self._infer_parallel(model_name, parallel_inputs)
+        self._assert_responses_valid(
+            responses,
+            expected_number_of_response_before_return,
+            expected_return_a_response,
+            expected_number_of_response_after_return,
+        )
+        # Do NOT group into a for-loop as it hides which model failed.
+        model_name = "response_sender_decoupled_async_batching"
+        responses = self._infer_parallel(model_name, parallel_inputs)
+        self._assert_responses_valid(
+            responses,
+            expected_number_of_response_before_return,
+            expected_return_a_response,
+            expected_number_of_response_after_return,
+        )
+
+    # Non-decoupled model requests each responding differently.
+    def test_non_decoupled_multiple_requests(self):
+        parallel_inputs = [
+            self._get_inputs(**self._inputs_parameters_one_response_on_return),
+            self._get_inputs(**self._inputs_parameters_one_response_pre_return),
+            self._get_inputs(**self._inputs_parameters_one_response_post_return),
+        ]
+        expected_number_of_response_before_return = 1
+        expected_return_a_response = True
+        expected_number_of_response_after_return = 1
+
+        model_name = "response_sender_batching"
+        responses = self._infer_parallel(model_name, parallel_inputs)
+        self._assert_responses_valid(
+            responses,
+            expected_number_of_response_before_return,
+            expected_return_a_response,
+            expected_number_of_response_after_return,
+        )
+        # Do NOT group into a for-loop as it hides which model failed.
+        model_name = "response_sender_async_batching"
+        responses = self._infer_parallel(model_name, parallel_inputs)
+        self._assert_responses_valid(
+            responses,
+            expected_number_of_response_before_return,
+            expected_return_a_response,
+            expected_number_of_response_after_return,
+        )
+
+    # Decoupled model send 1 response on return.
+    def test_decoupled_one_response_on_return(self):
+        responses = self._infer(
+            model_name="response_sender_decoupled",
+            **self._inputs_parameters_one_response_on_return,
+        )
+        self._assert_responses_exception(
+            responses,
+            expected_message="using the decoupled mode and the execute function must return None",
         )
+    # TODO: Test for async decoupled after fixing 'AsyncEventFutureDoneCallback'
+    # using `py_future.result()` with error hangs on exit.
 
 
 if __name__ == "__main__":
diff --git a/qa/L0_backend_python/response_sender/test.sh b/qa/L0_backend_python/response_sender/test.sh
index 072b1f34dc..33db46edbb 100755
--- a/qa/L0_backend_python/response_sender/test.sh
+++ b/qa/L0_backend_python/response_sender/test.sh
@@ -51,6 +51,28 @@ mkdir -p models/response_sender_decoupled_async/1 && \
     cp ../../python_models/response_sender/model_async.py models/response_sender_decoupled_async/1/model.py && \
     cp ../../python_models/response_sender/config.pbtxt models/response_sender_decoupled_async && \
     echo "model_transaction_policy { decoupled: True }" >> models/response_sender_decoupled_async/config.pbtxt
+mkdir -p models/response_sender_batching/1 && \
+    cp ../../python_models/response_sender/model_common.py models/response_sender_batching/1 && \
+    cp ../../python_models/response_sender/model.py models/response_sender_batching/1 && \
+    cp ../../python_models/response_sender/config.pbtxt models/response_sender_batching && \
+    echo "dynamic_batching { max_queue_delay_microseconds: 500000 }" >> models/response_sender_batching/config.pbtxt
+mkdir -p models/response_sender_decoupled_batching/1 && \
+    cp ../../python_models/response_sender/model_common.py models/response_sender_decoupled_batching/1 && \
+    cp ../../python_models/response_sender/model.py models/response_sender_decoupled_batching/1 && \
+    cp ../../python_models/response_sender/config.pbtxt models/response_sender_decoupled_batching && \
+    echo "model_transaction_policy { decoupled: True }" >> models/response_sender_decoupled_batching/config.pbtxt && \
+    echo "dynamic_batching { max_queue_delay_microseconds: 500000 }" >> models/response_sender_decoupled_batching/config.pbtxt
+mkdir -p models/response_sender_async_batching/1 && \
+    cp ../../python_models/response_sender/model_common.py models/response_sender_async_batching/1 && \
+    cp ../../python_models/response_sender/model_async.py models/response_sender_async_batching/1/model.py && \
+    cp ../../python_models/response_sender/config.pbtxt models/response_sender_async_batching && \
+    echo "dynamic_batching { max_queue_delay_microseconds: 500000 }" >> models/response_sender_async_batching/config.pbtxt
+mkdir -p models/response_sender_decoupled_async_batching/1 && \
+    cp ../../python_models/response_sender/model_common.py models/response_sender_decoupled_async_batching/1 && \
+    cp ../../python_models/response_sender/model_async.py models/response_sender_decoupled_async_batching/1/model.py && \
+    cp ../../python_models/response_sender/config.pbtxt models/response_sender_decoupled_async_batching && \
+    echo "model_transaction_policy { decoupled: True }" >> models/response_sender_decoupled_async_batching/config.pbtxt && \
+    echo "dynamic_batching { max_queue_delay_microseconds: 500000 }" >> models/response_sender_decoupled_async_batching/config.pbtxt
 
 TEST_LOG="response_sender_test.log"
 SERVER_LOG="response_sender_test.server.log"
diff --git a/qa/python_models/response_sender/model.py b/qa/python_models/response_sender/model.py
index 8749b83ee8..eb2273b3b7 100644
--- a/qa/python_models/response_sender/model.py
+++ b/qa/python_models/response_sender/model.py
@@ -31,7 +31,7 @@
 
 class TritonPythonModel:
     def initialize(self, args):
-        self._common = ResponseSenderModelCommon(pb_utils)
+        self._common = ResponseSenderModelCommon(pb_utils, args)
 
     def execute(self, requests):
         return self._common.execute(requests, use_async=False)
diff --git a/qa/python_models/response_sender/model_async.py b/qa/python_models/response_sender/model_async.py
index b12eccef06..6ec7ab69c2 100644
--- a/qa/python_models/response_sender/model_async.py
+++ b/qa/python_models/response_sender/model_async.py
@@ -31,7 +31,7 @@
 
 class TritonPythonModel:
     def initialize(self, args):
-        self._common = ResponseSenderModelCommon(pb_utils)
+        self._common = ResponseSenderModelCommon(pb_utils, args)
 
     async def execute(self, requests):
         return self._common.execute(requests, use_async=True)
diff --git a/qa/python_models/response_sender/model_common.py b/qa/python_models/response_sender/model_common.py
index 5ec670022c..bd89457bad 100644
--- a/qa/python_models/response_sender/model_common.py
+++ b/qa/python_models/response_sender/model_common.py
@@ -25,6 +25,7 @@
 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
 import asyncio
+import json
 import threading
 import time
 
@@ -32,8 +33,11 @@
 
 
 class ResponseSenderModelCommon:
-    def __init__(self, pb_utils):
+    def __init__(self, pb_utils, args):
         self._pb_utils = pb_utils
+        self._is_decoupled = pb_utils.using_decoupled_model_transaction_policy(
+            json.loads(args["model_config"])
+        )
         self._background_tasks = set()
 
     def _get_instructions_from_request(self, request):
@@ -119,6 +123,20 @@ def _send_responses(self, processed_requests, response_id_offset):
             batch_size = request["batch_size"]
             response_sender = request["response_sender"]
             send_complete_final_flag = request["send_complete_final_flag"]
+
+            # TODO: gRPC frontend may segfault if non-decoupled model send response
+            # final flag separately from the response.
+            if (
+                not self._is_decoupled
+                and number_of_response == 1
+                and send_complete_final_flag
+            ):
+                response_sender.send(
+                    self._create_response(batch_size, response_id=response_id_offset),
+                    flags=self._pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL,
+                )
+                continue
+
             for response_id in range(number_of_response):
                 response_sender.send(
                     self._create_response(
@@ -132,7 +150,7 @@
 
     def _send_responses_delayed_threaded(self, processed_requests, response_id_offset):
         def response_thread(send_responses, processed_requests, response_id_offset):
-            time.sleep(1)  # response after requests are released
+            time.sleep(0.5)  # response after requests are released
             send_responses(processed_requests, response_id_offset)
 
         thread = threading.Thread(
@@ -146,7 +164,7 @@ def _send_responses_delayed_async(self, processed_requests, response_id_offset):
         async def response_async(
             send_responses, processed_requests, response_id_offset
         ):
-            asyncio.sleep(1)  # response after requests are released
+            await asyncio.sleep(0.5)  # response after requests are released
             send_responses(processed_requests, response_id_offset)
 
         coro = response_async(