diff --git a/qa/L0_backend_python/response_sender/response_sender_test.py b/qa/L0_backend_python/response_sender/response_sender_test.py new file mode 100644 index 0000000000..01d05de6a4 --- /dev/null +++ b/qa/L0_backend_python/response_sender/response_sender_test.py @@ -0,0 +1,293 @@ +# Copyright 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# * Neither the name of NVIDIA CORPORATION nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY +# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY +# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import unittest + +import numpy as np +import tritonclient.grpc as grpcclient + + +class ResponseSenderTest(unittest.TestCase): + def _get_inputs( + self, + number_of_response_before_return, + send_complete_final_flag_before_return, + return_a_response, + number_of_response_after_return, + send_complete_final_flag_after_return, + ): + shape = [1, 1] + inputs = [ + grpcclient.InferInput("NUMBER_OF_RESPONSE_BEFORE_RETURN", shape, "UINT8"), + grpcclient.InferInput( + "SEND_COMPLETE_FINAL_FLAG_BEFORE_RETURN", shape, "BOOL" + ), + grpcclient.InferInput("RETURN_A_RESPONSE", shape, "BOOL"), + grpcclient.InferInput("NUMBER_OF_RESPONSE_AFTER_RETURN", shape, "UINT8"), + grpcclient.InferInput( + "SEND_COMPLETE_FINAL_FLAG_AFTER_RETURN", shape, "BOOL" + ), + ] + inputs[0].set_data_from_numpy( + np.array([[number_of_response_before_return]], np.uint8) + ) + inputs[1].set_data_from_numpy( + np.array([[send_complete_final_flag_before_return]], bool) + ) + inputs[2].set_data_from_numpy(np.array([[return_a_response]], bool)) + inputs[3].set_data_from_numpy( + np.array([[number_of_response_after_return]], np.uint8) + ) + inputs[4].set_data_from_numpy( + np.array([[send_complete_final_flag_after_return]], bool) + ) + return inputs + + def _generate_streaming_callback_and_responses_pair(self): + responses = [] # [{"result": result, "error": error}, ...] 
+ + def callback(result, error): + responses.append({"result": result, "error": error}) + + return callback, responses + + def _infer( + self, + model_name, + number_of_response_before_return, + send_complete_final_flag_before_return, + return_a_response, + number_of_response_after_return, + send_complete_final_flag_after_return, + ): + inputs = self._get_inputs( + number_of_response_before_return, + send_complete_final_flag_before_return, + return_a_response, + number_of_response_after_return, + send_complete_final_flag_after_return, + ) + callback, responses = self._generate_streaming_callback_and_responses_pair() + with grpcclient.InferenceServerClient("localhost:8001") as client: + client.start_stream(callback) + client.async_stream_infer(model_name, inputs) + client.stop_stream() + return responses + + def _assert_responses_valid( + self, + responses, + number_of_response_before_return, + return_a_response, + number_of_response_after_return, + ): + before_return_response_count = 0 + response_returned = False + after_return_response_count = 0 + for response in responses: + result, error = response["result"], response["error"] + self.assertIsNone(error) + result_np = result.as_numpy(name="INDEX") + response_id = result_np.sum() / result_np.shape[0] + if response_id < 1000: + self.assertFalse( + response_returned, + "Expect at most one response returned per request.", + ) + response_returned = True + elif response_id < 2000: + before_return_response_count += 1 + elif response_id < 3000: + after_return_response_count += 1 + else: + raise ValueError(f"Unexpected response_id: {response_id}") + self.assertEqual(number_of_response_before_return, before_return_response_count) + self.assertEqual(return_a_response, response_returned) + self.assertEqual(number_of_response_after_return, after_return_response_count) + + def _assert_decoupled_infer_success( + self, + number_of_response_before_return, + send_complete_final_flag_before_return, + return_a_response, + number_of_response_after_return, + send_complete_final_flag_after_return, + ): + model_name = "response_sender_decoupled" + responses = self._infer( + model_name, + number_of_response_before_return, + send_complete_final_flag_before_return, + return_a_response, + number_of_response_after_return, + send_complete_final_flag_after_return, + ) + self._assert_responses_valid( + responses, + number_of_response_before_return, + return_a_response, + number_of_response_after_return, + ) + # Do NOT group into a for-loop as it hides which model failed. 
+ model_name = "response_sender_decoupled_async" + responses = self._infer( + model_name, + number_of_response_before_return, + send_complete_final_flag_before_return, + return_a_response, + number_of_response_after_return, + send_complete_final_flag_after_return, + ) + self._assert_responses_valid( + responses, + number_of_response_before_return, + return_a_response, + number_of_response_after_return, + ) + + def _assert_non_decoupled_infer_success( + self, + number_of_response_before_return, + send_complete_final_flag_before_return, + return_a_response, + number_of_response_after_return, + send_complete_final_flag_after_return, + ): + model_name = "response_sender" + responses = self._infer( + model_name, + number_of_response_before_return, + send_complete_final_flag_before_return, + return_a_response, + number_of_response_after_return, + send_complete_final_flag_after_return, + ) + self._assert_responses_valid( + responses, + number_of_response_before_return, + return_a_response, + number_of_response_after_return, + ) + # Do NOT group into a for-loop as it hides which model failed. + model_name = "response_sender_async" + responses = self._infer( + model_name, + number_of_response_before_return, + send_complete_final_flag_before_return, + return_a_response, + number_of_response_after_return, + send_complete_final_flag_after_return, + ) + self._assert_responses_valid( + responses, + number_of_response_before_return, + return_a_response, + number_of_response_after_return, + ) + + # Decoupled model send response final flag before request return. + def test_decoupled_zero_response_pre_return(self): + self._assert_decoupled_infer_success( + number_of_response_before_return=0, + send_complete_final_flag_before_return=True, + return_a_response=False, + number_of_response_after_return=0, + send_complete_final_flag_after_return=False, + ) + + # Decoupled model send response final flag after request return. + def test_decoupled_zero_response_post_return(self): + self._assert_decoupled_infer_success( + number_of_response_before_return=0, + send_complete_final_flag_before_return=False, + return_a_response=False, + number_of_response_after_return=0, + send_complete_final_flag_after_return=True, + ) + + # Decoupled model send 1 response before request return. + def test_decoupled_one_response_pre_return(self): + self._assert_decoupled_infer_success( + number_of_response_before_return=1, + send_complete_final_flag_before_return=True, + return_a_response=False, + number_of_response_after_return=0, + send_complete_final_flag_after_return=False, + ) + + # Decoupled model send 1 response after request return. + def test_decoupled_one_response_post_return(self): + self._assert_decoupled_infer_success( + number_of_response_before_return=0, + send_complete_final_flag_before_return=False, + return_a_response=False, + number_of_response_after_return=1, + send_complete_final_flag_after_return=True, + ) + + # Decoupled model send 2 response before request return. + def test_decoupled_two_response_pre_return(self): + self._assert_decoupled_infer_success( + number_of_response_before_return=2, + send_complete_final_flag_before_return=True, + return_a_response=False, + number_of_response_after_return=0, + send_complete_final_flag_after_return=False, + ) + + # Decoupled model send 2 response after request return. 
+ def test_decoupled_two_response_post_return(self): + self._assert_decoupled_infer_success( + number_of_response_before_return=0, + send_complete_final_flag_before_return=False, + return_a_response=False, + number_of_response_after_return=2, + send_complete_final_flag_after_return=True, + ) + + # Decoupled model send 1 and 3 responses before and after return. + def test_decoupled_response_pre_and_post_return(self): + self._assert_decoupled_infer_success( + number_of_response_before_return=1, + send_complete_final_flag_before_return=False, + return_a_response=False, + number_of_response_after_return=3, + send_complete_final_flag_after_return=True, + ) + + # Non-decoupled model send 1 response on return. + def test_non_decoupled_one_response_on_return(self): + self._assert_non_decoupled_infer_success( + number_of_response_before_return=0, + send_complete_final_flag_before_return=False, + return_a_response=True, + number_of_response_after_return=0, + send_complete_final_flag_after_return=False, + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/qa/L0_backend_python/response_sender/test.sh b/qa/L0_backend_python/response_sender/test.sh new file mode 100755 index 0000000000..072b1f34dc --- /dev/null +++ b/qa/L0_backend_python/response_sender/test.sh @@ -0,0 +1,89 @@ +#!/bin/bash +# Copyright 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# * Neither the name of NVIDIA CORPORATION nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY +# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY +# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+
+source ../../common/util.sh
+
+RET=0
+
+#
+# Test response sender under decoupled / non-decoupled
+#
+rm -rf models && mkdir models
+mkdir -p models/response_sender/1 && \
+    cp ../../python_models/response_sender/model_common.py models/response_sender/1 && \
+    cp ../../python_models/response_sender/model.py models/response_sender/1 && \
+    cp ../../python_models/response_sender/config.pbtxt models/response_sender
+mkdir -p models/response_sender_decoupled/1 && \
+    cp ../../python_models/response_sender/model_common.py models/response_sender_decoupled/1 && \
+    cp ../../python_models/response_sender/model.py models/response_sender_decoupled/1 && \
+    cp ../../python_models/response_sender/config.pbtxt models/response_sender_decoupled && \
+    echo "model_transaction_policy { decoupled: True }" >> models/response_sender_decoupled/config.pbtxt
+mkdir -p models/response_sender_async/1 && \
+    cp ../../python_models/response_sender/model_common.py models/response_sender_async/1 && \
+    cp ../../python_models/response_sender/model_async.py models/response_sender_async/1/model.py && \
+    cp ../../python_models/response_sender/config.pbtxt models/response_sender_async
+mkdir -p models/response_sender_decoupled_async/1 && \
+    cp ../../python_models/response_sender/model_common.py models/response_sender_decoupled_async/1 && \
+    cp ../../python_models/response_sender/model_async.py models/response_sender_decoupled_async/1/model.py && \
+    cp ../../python_models/response_sender/config.pbtxt models/response_sender_decoupled_async && \
+    echo "model_transaction_policy { decoupled: True }" >> models/response_sender_decoupled_async/config.pbtxt
+
+TEST_LOG="response_sender_test.log"
+SERVER_LOG="response_sender_test.server.log"
+SERVER_ARGS="--model-repository=${MODELDIR}/response_sender/models --backend-directory=${BACKEND_DIR} --log-verbose=1"
+
+run_server
+if [ "$SERVER_PID" == "0" ]; then
+    echo -e "\n***\n*** Failed to start $SERVER\n***"
+    cat $SERVER_LOG
+    exit 1
+fi
+
+set +e
+SERVER_LOG=$SERVER_LOG python3 -m pytest --junitxml=response_sender_test.report.xml response_sender_test.py > $TEST_LOG 2>&1
+if [ $?
-ne 0 ]; then + echo -e "\n***\n*** response sender test FAILED\n***" + cat $TEST_LOG + RET=1 +fi +set -e + +kill $SERVER_PID +wait $SERVER_PID + +# +# Test async response sender under decoupled / non-decoupled +# + +# TODO + +if [ $RET -eq 1 ]; then + echo -e "\n***\n*** Response sender test FAILED\n***" +else + echo -e "\n***\n*** Response sender test Passed\n***" +fi +exit $RET diff --git a/qa/L0_backend_python/test.sh b/qa/L0_backend_python/test.sh index 98c88f7e9e..bbaabbaf10 100755 --- a/qa/L0_backend_python/test.sh +++ b/qa/L0_backend_python/test.sh @@ -409,7 +409,7 @@ fi # Disable variants test for Jetson since already built without GPU Tensor support # Disable decoupled test because it uses GPU tensors if [ "$TEST_JETSON" == "0" ]; then - SUBTESTS="ensemble bls decoupled" + SUBTESTS="ensemble bls decoupled response_sender" # [DLIS-6093] Disable variants test for Windows since tests are not executed in docker container (cannot apt update/install) # [DLIS-5970] Disable io tests for Windows since GPU Tensors are not supported # [DLIS-6122] Disable model_control & request_rescheduling tests for Windows since they require load/unload diff --git a/qa/python_models/response_sender/config.pbtxt b/qa/python_models/response_sender/config.pbtxt new file mode 100644 index 0000000000..ef0c29e3bf --- /dev/null +++ b/qa/python_models/response_sender/config.pbtxt @@ -0,0 +1,65 @@ +# Copyright 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# * Neither the name of NVIDIA CORPORATION nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY +# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY +# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ +backend: "python" +max_batch_size: 8 + +input [ + { + name: "NUMBER_OF_RESPONSE_BEFORE_RETURN" + data_type: TYPE_UINT8 + dims: [ 1 ] + }, + { + name: "SEND_COMPLETE_FINAL_FLAG_BEFORE_RETURN" + data_type: TYPE_BOOL + dims: [ 1 ] + }, + { + name: "RETURN_A_RESPONSE" + data_type: TYPE_BOOL + dims: [ 1 ] + }, + { + name: "NUMBER_OF_RESPONSE_AFTER_RETURN" + data_type: TYPE_UINT8 + dims: [ 1 ] + }, + { + name: "SEND_COMPLETE_FINAL_FLAG_AFTER_RETURN" + data_type: TYPE_BOOL + dims: [ 1 ] + } +] +output [ + { + name: "INDEX" + data_type: TYPE_UINT16 + dims: [ 1 ] + } +] + +instance_group [{ kind: KIND_CPU }] diff --git a/qa/python_models/response_sender/model.py b/qa/python_models/response_sender/model.py new file mode 100644 index 0000000000..8749b83ee8 --- /dev/null +++ b/qa/python_models/response_sender/model.py @@ -0,0 +1,37 @@ +# Copyright 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# * Neither the name of NVIDIA CORPORATION nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY +# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY +# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + +import triton_python_backend_utils as pb_utils +from model_common import ResponseSenderModelCommon + + +class TritonPythonModel: + def initialize(self, args): + self._common = ResponseSenderModelCommon(pb_utils) + + def execute(self, requests): + return self._common.execute(requests, use_async=False) diff --git a/qa/python_models/response_sender/model_async.py b/qa/python_models/response_sender/model_async.py new file mode 100644 index 0000000000..b12eccef06 --- /dev/null +++ b/qa/python_models/response_sender/model_async.py @@ -0,0 +1,37 @@ +# Copyright 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. 
+# * Neither the name of NVIDIA CORPORATION nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY +# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY +# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + +import triton_python_backend_utils as pb_utils +from model_common import ResponseSenderModelCommon + + +class TritonPythonModel: + def initialize(self, args): + self._common = ResponseSenderModelCommon(pb_utils) + + async def execute(self, requests): + return self._common.execute(requests, use_async=True) diff --git a/qa/python_models/response_sender/model_common.py b/qa/python_models/response_sender/model_common.py new file mode 100644 index 0000000000..5ec670022c --- /dev/null +++ b/qa/python_models/response_sender/model_common.py @@ -0,0 +1,210 @@ +# Copyright 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# * Neither the name of NVIDIA CORPORATION nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY +# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY +# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import asyncio +import threading +import time + +import numpy as np + + +class ResponseSenderModelCommon: + def __init__(self, pb_utils): + self._pb_utils = pb_utils + self._background_tasks = set() + + def _get_instructions_from_request(self, request): + """ + Determine the execution instructions from the inputs. This test tries to examine + all the corner cases with using response sender. + + Assumptions: The request batch size can be larger than one. 
+
+        There are 5 inputs in the model that control the model behavior:
+        * NUMBER_OF_RESPONSE_BEFORE_RETURN (UINT8):
+            Determines the number of responses sent before returning from the
+            `execute` function.
+        * SEND_COMPLETE_FINAL_FLAG_BEFORE_RETURN (BOOL):
+            Determines whether the complete final flag is sent before return.
+        * RETURN_A_RESPONSE (BOOL):
+            Determines whether a response is returned from the `execute` function.
+        * NUMBER_OF_RESPONSE_AFTER_RETURN (UINT8):
+            Determines the number of responses sent after returning from the
+            `execute` function.
+        * SEND_COMPLETE_FINAL_FLAG_AFTER_RETURN (BOOL):
+            Determines whether the complete final flag is sent after return.
+
+        Note:
+        * If the batch size of a request is larger than one, the sum of the values in
+          the batch will be used for determining the value of each input of the
+          request.
+        * The response_id distinguishes between responses sent before `execute`
+          returns, responses returned by `execute`, and responses sent after
+          `execute` returns.
+        """
+        instr = {}
+        return_a_response_np = self._pb_utils.get_input_tensor_by_name(
+            request, "RETURN_A_RESPONSE"
+        ).as_numpy()
+        instr["batch_size"] = return_a_response_np.shape[0]
+        instr["return_a_response"] = bool(return_a_response_np.sum())
+        instr["number_of_pre_return_response"] = (
+            self._pb_utils.get_input_tensor_by_name(
+                request, "NUMBER_OF_RESPONSE_BEFORE_RETURN"
+            )
+            .as_numpy()
+            .sum()
+        )
+        instr["number_of_post_return_response"] = (
+            self._pb_utils.get_input_tensor_by_name(
+                request, "NUMBER_OF_RESPONSE_AFTER_RETURN"
+            )
+            .as_numpy()
+            .sum()
+        )
+        instr["send_complete_final_flag_pre_return"] = bool(
+            self._pb_utils.get_input_tensor_by_name(
+                request, "SEND_COMPLETE_FINAL_FLAG_BEFORE_RETURN"
+            )
+            .as_numpy()
+            .sum()
+        )
+        instr["send_complete_final_flag_post_return"] = bool(
+            self._pb_utils.get_input_tensor_by_name(
+                request, "SEND_COMPLETE_FINAL_FLAG_AFTER_RETURN"
+            )
+            .as_numpy()
+            .sum()
+        )
+        return instr
+
+    def _is_response_sender_needed(self, instr):
+        return (
+            instr["number_of_pre_return_response"] > 0
+            or instr["number_of_post_return_response"] > 0
+            or instr["send_complete_final_flag_pre_return"]
+            or instr["send_complete_final_flag_post_return"]
+        )
+
+    def _create_response(self, batch_size, response_id):
+        output_tensor = self._pb_utils.Tensor(
+            "INDEX", np.array([[response_id] for _ in range(batch_size)], np.uint16)
+        )
+        response = self._pb_utils.InferenceResponse(output_tensors=[output_tensor])
+        return response
+
+    def _send_responses(self, processed_requests, response_id_offset):
+        for request in processed_requests:
+            number_of_response = request["number_of_response"]
+            batch_size = request["batch_size"]
+            response_sender = request["response_sender"]
+            send_complete_final_flag = request["send_complete_final_flag"]
+            for response_id in range(number_of_response):
+                response_sender.send(
+                    self._create_response(
+                        batch_size, response_id=(response_id_offset + response_id)
+                    )
+                )
+            if send_complete_final_flag:
+                response_sender.send(
+                    flags=self._pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL
+                )
+
+    def _send_responses_delayed_threaded(self, processed_requests, response_id_offset):
+        def response_thread(send_responses, processed_requests, response_id_offset):
+            time.sleep(1)  # respond after the requests are released
+            send_responses(processed_requests, response_id_offset)
+
+        thread = threading.Thread(
+            target=response_thread,
+            args=(self._send_responses, processed_requests, response_id_offset),
+        )
+        thread.daemon = True
+        thread.start()
+
+    def _send_responses_delayed_async(self, processed_requests, response_id_offset):
+        async def response_async(
+            send_responses, processed_requests, response_id_offset
+        ):
+            await asyncio.sleep(1)  # respond after the requests are released
+            send_responses(processed_requests, response_id_offset)
+
+        coro = response_async(
+            self._send_responses, processed_requests, response_id_offset
+        )
+        task = asyncio.create_task(coro)
+        self._background_tasks.add(task)
+        task.add_done_callback(self._background_tasks.discard)
+
+    def execute(self, requests, use_async):
+        pre_return_processed_requests = []
+        return_responses = []
+        post_return_processed_requests = []
+
+        for request in requests:
+            instr = self._get_instructions_from_request(request)
+
+            response_sender = None
+            if self._is_response_sender_needed(instr):
+                response_sender = request.get_response_sender()
+
+            pre_return_processed_requests.append(
+                {
+                    "number_of_response": instr["number_of_pre_return_response"],
+                    "batch_size": instr["batch_size"],
+                    "response_sender": response_sender,
+                    "send_complete_final_flag": instr[
+                        "send_complete_final_flag_pre_return"
+                    ],
+                }
+            )
+            post_return_processed_requests.append(
+                {
+                    "number_of_response": instr["number_of_post_return_response"],
+                    "batch_size": instr["batch_size"],
+                    "response_sender": response_sender,
+                    "send_complete_final_flag": instr[
+                        "send_complete_final_flag_post_return"
+                    ],
+                }
+            )
+
+            response = None
+            if instr["return_a_response"]:
+                response = self._create_response(instr["batch_size"], response_id=0)
+            return_responses.append(response)
+
+        self._send_responses(pre_return_processed_requests, response_id_offset=1000)
+
+        if use_async:
+            self._send_responses_delayed_async(
+                post_return_processed_requests, response_id_offset=2000
+            )
+        else:
+            self._send_responses_delayed_threaded(
+                post_return_processed_requests, response_id_offset=2000
+            )
+
+        if return_responses == [None for _ in requests]:
+            return None
+        return return_responses
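
For reference, below is a minimal sketch (not part of this change) of the decoupled response-sender pattern that model_common.py generalizes. The "IN"/"OUT" tensor names and the fixed response count of 2 are made up for illustration; the pb_utils calls are the same ones used in this patch, and such a model would also need the "model_transaction_policy { decoupled: True }" setting that test.sh appends to the copied config.pbtxt.

import triton_python_backend_utils as pb_utils


class TritonPythonModel:
    def execute(self, requests):
        for request in requests:
            # Responses go through the response sender instead of being returned.
            response_sender = request.get_response_sender()
            in_tensor = pb_utils.get_input_tensor_by_name(request, "IN")
            for _ in range(2):
                out_tensor = pb_utils.Tensor("OUT", in_tensor.as_numpy())
                response_sender.send(
                    pb_utils.InferenceResponse(output_tensors=[out_tensor])
                )
            # Signal that no more responses will be sent for this request.
            response_sender.send(flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL)
        # A decoupled model that uses the response sender returns no responses.
        return None

The tests drive this pattern through its corner cases by sending responses before and/or after execute returns and, for the non-decoupled models, returning a response directly. The INDEX output encodes which path produced each response: values below 1000 come from a returned response, 1000-1999 from responses sent before return, and 2000-2999 from responses sent after return. For example, test_decoupled_response_pre_and_post_return expects a single INDEX value of 1000 followed by 2000, 2001, and 2002, with nothing returned from execute.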