Commit
Add response sender test for new behavior (#7257)
kthui authored May 31, 2024
1 parent 59fc4d4 commit 27d5dfd
Showing 5 changed files with 214 additions and 51 deletions.
215 changes: 169 additions & 46 deletions qa/L0_backend_python/response_sender/response_sender_test.py
@@ -28,9 +28,67 @@

import numpy as np
import tritonclient.grpc as grpcclient
from tritonclient.utils import InferenceServerException


class ResponseSenderTest(unittest.TestCase):
_inputs_parameters_zero_response_pre_return = {
"number_of_response_before_return": 0,
"send_complete_final_flag_before_return": True,
"return_a_response": False,
"number_of_response_after_return": 0,
"send_complete_final_flag_after_return": False,
}
_inputs_parameters_zero_response_post_return = {
"number_of_response_before_return": 0,
"send_complete_final_flag_before_return": False,
"return_a_response": False,
"number_of_response_after_return": 0,
"send_complete_final_flag_after_return": True,
}
_inputs_parameters_one_response_pre_return = {
"number_of_response_before_return": 1,
"send_complete_final_flag_before_return": True,
"return_a_response": False,
"number_of_response_after_return": 0,
"send_complete_final_flag_after_return": False,
}
_inputs_parameters_one_response_post_return = {
"number_of_response_before_return": 0,
"send_complete_final_flag_before_return": False,
"return_a_response": False,
"number_of_response_after_return": 1,
"send_complete_final_flag_after_return": True,
}
_inputs_parameters_two_response_pre_return = {
"number_of_response_before_return": 2,
"send_complete_final_flag_before_return": True,
"return_a_response": False,
"number_of_response_after_return": 0,
"send_complete_final_flag_after_return": False,
}
_inputs_parameters_two_response_post_return = {
"number_of_response_before_return": 0,
"send_complete_final_flag_before_return": False,
"return_a_response": False,
"number_of_response_after_return": 2,
"send_complete_final_flag_after_return": True,
}
_inputs_parameters_response_pre_and_post_return = {
"number_of_response_before_return": 1,
"send_complete_final_flag_before_return": False,
"return_a_response": False,
"number_of_response_after_return": 3,
"send_complete_final_flag_after_return": True,
}
_inputs_parameters_one_response_on_return = {
"number_of_response_before_return": 0,
"send_complete_final_flag_before_return": False,
"return_a_response": True,
"number_of_response_after_return": 0,
"send_complete_final_flag_after_return": False,
}

def _get_inputs(
self,
number_of_response_before_return,
@@ -74,6 +132,15 @@ def callback(result, error):

return callback, responses

def _infer_parallel(self, model_name, parallel_inputs):
callback, responses = self._generate_streaming_callback_and_responses_pair()
with grpcclient.InferenceServerClient("localhost:8001") as client:
client.start_stream(callback)
for inputs in parallel_inputs:
client.async_stream_infer(model_name, inputs)
client.stop_stream()
return responses

def _infer(
self,
model_name,
@@ -90,12 +157,7 @@ def _infer(
number_of_response_after_return,
send_complete_final_flag_after_return,
)
callback, responses = self._generate_streaming_callback_and_responses_pair()
with grpcclient.InferenceServerClient("localhost:8001") as client:
client.start_stream(callback)
client.async_stream_infer(model_name, inputs)
client.stop_stream()
return responses
return self._infer_parallel(model_name, [inputs])

def _assert_responses_valid(
self,
@@ -128,6 +190,12 @@ def _assert_responses_valid(
self.assertEqual(return_a_response, response_returned)
self.assertEqual(number_of_response_after_return, after_return_response_count)

def _assert_responses_exception(self, responses, expected_message):
for response in responses:
self.assertIsNone(response["result"])
self.assertIsInstance(response["error"], InferenceServerException)
self.assertIn(expected_message, response["error"].message())

def _assert_decoupled_infer_success(
self,
number_of_response_before_return,
@@ -211,82 +279,137 @@ def _assert_non_decoupled_infer_success(
# Decoupled model sends the response final flag before the request returns.
def test_decoupled_zero_response_pre_return(self):
self._assert_decoupled_infer_success(
number_of_response_before_return=0,
send_complete_final_flag_before_return=True,
return_a_response=False,
number_of_response_after_return=0,
send_complete_final_flag_after_return=False,
**self._inputs_parameters_zero_response_pre_return
)

# Decoupled model sends the response final flag after the request returns.
def test_decoupled_zero_response_post_return(self):
self._assert_decoupled_infer_success(
number_of_response_before_return=0,
send_complete_final_flag_before_return=False,
return_a_response=False,
number_of_response_after_return=0,
send_complete_final_flag_after_return=True,
**self._inputs_parameters_zero_response_post_return
)

# Decoupled model sends 1 response before the request returns.
def test_decoupled_one_response_pre_return(self):
self._assert_decoupled_infer_success(
number_of_response_before_return=1,
send_complete_final_flag_before_return=True,
return_a_response=False,
number_of_response_after_return=0,
send_complete_final_flag_after_return=False,
**self._inputs_parameters_one_response_pre_return
)

# Decoupled model sends 1 response after the request returns.
def test_decoupled_one_response_post_return(self):
self._assert_decoupled_infer_success(
number_of_response_before_return=0,
send_complete_final_flag_before_return=False,
return_a_response=False,
number_of_response_after_return=1,
send_complete_final_flag_after_return=True,
**self._inputs_parameters_one_response_post_return
)

# Decoupled model sends 2 responses before the request returns.
def test_decoupled_two_response_pre_return(self):
self._assert_decoupled_infer_success(
number_of_response_before_return=2,
send_complete_final_flag_before_return=True,
return_a_response=False,
number_of_response_after_return=0,
send_complete_final_flag_after_return=False,
**self._inputs_parameters_two_response_pre_return
)

# Decoupled model sends 2 responses after the request returns.
def test_decoupled_two_response_post_return(self):
self._assert_decoupled_infer_success(
number_of_response_before_return=0,
send_complete_final_flag_before_return=False,
return_a_response=False,
number_of_response_after_return=2,
send_complete_final_flag_after_return=True,
**self._inputs_parameters_two_response_post_return
)

# Decoupled model sends 1 response before and 3 responses after the request returns.
def test_decoupled_response_pre_and_post_return(self):
self._assert_decoupled_infer_success(
number_of_response_before_return=1,
send_complete_final_flag_before_return=False,
return_a_response=False,
number_of_response_after_return=3,
send_complete_final_flag_after_return=True,
**self._inputs_parameters_response_pre_and_post_return
)

# Non-decoupled model sends 1 response on return.
def test_non_decoupled_one_response_on_return(self):
self._assert_non_decoupled_infer_success(
number_of_response_before_return=0,
send_complete_final_flag_before_return=False,
return_a_response=True,
number_of_response_after_return=0,
send_complete_final_flag_after_return=False,
**self._inputs_parameters_one_response_on_return
)

# Non-decoupled model sends 1 response before return.
def test_non_decoupled_one_response_pre_return(self):
self._assert_non_decoupled_infer_success(
**self._inputs_parameters_one_response_pre_return
)

# Non-decoupled model sends 1 response after return.
def test_non_decoupled_one_response_post_return(self):
self._assert_non_decoupled_infer_success(
**self._inputs_parameters_one_response_post_return
)

# Decoupled model with multiple requests, each responding differently.
def test_decoupled_multiple_requests(self):
parallel_inputs = [
self._get_inputs(**self._inputs_parameters_zero_response_pre_return),
self._get_inputs(**self._inputs_parameters_zero_response_post_return),
self._get_inputs(**self._inputs_parameters_one_response_pre_return),
self._get_inputs(**self._inputs_parameters_one_response_post_return),
self._get_inputs(**self._inputs_parameters_two_response_pre_return),
self._get_inputs(**self._inputs_parameters_two_response_post_return),
self._get_inputs(**self._inputs_parameters_response_pre_and_post_return),
]
expected_number_of_response_before_return = 4
expected_return_a_response = False
expected_number_of_response_after_return = 6

model_name = "response_sender_decoupled_batching"
responses = self._infer_parallel(model_name, parallel_inputs)
self._assert_responses_valid(
responses,
expected_number_of_response_before_return,
expected_return_a_response,
expected_number_of_response_after_return,
)
# Do NOT group into a for-loop as it hides which model failed.
model_name = "response_sender_decoupled_async_batching"
responses = self._infer_parallel(model_name, parallel_inputs)
self._assert_responses_valid(
responses,
expected_number_of_response_before_return,
expected_return_a_response,
expected_number_of_response_after_return,
)

# Non-decoupled model with multiple requests, each responding differently.
def test_non_decoupled_multiple_requests(self):
parallel_inputs = [
self._get_inputs(**self._inputs_parameters_one_response_on_return),
self._get_inputs(**self._inputs_parameters_one_response_pre_return),
self._get_inputs(**self._inputs_parameters_one_response_post_return),
]
expected_number_of_response_before_return = 1
expected_return_a_response = True
expected_number_of_response_after_return = 1

model_name = "response_sender_batching"
responses = self._infer_parallel(model_name, parallel_inputs)
self._assert_responses_valid(
responses,
expected_number_of_response_before_return,
expected_return_a_response,
expected_number_of_response_after_return,
)
# Do NOT group into a for-loop as it hides which model failed.
model_name = "response_sender_async_batching"
responses = self._infer_parallel(model_name, parallel_inputs)
self._assert_responses_valid(
responses,
expected_number_of_response_before_return,
expected_return_a_response,
expected_number_of_response_after_return,
)

# Decoupled model sends 1 response on return.
def test_decoupled_one_response_on_return(self):
responses = self._infer(
model_name="response_sender_decoupled",
**self._inputs_parameters_one_response_on_return,
)
self._assert_responses_exception(
responses,
expected_message="using the decoupled mode and the execute function must return None",
)
# TODO: Test async decoupled once 'AsyncEventFutureDoneCallback' is fixed;
# currently, using `py_future.result()` with an error hangs on exit.


if __name__ == "__main__":
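
The diff only partially shows _generate_streaming_callback_and_responses_pair (the callback body is collapsed). A rough sketch of the shape such a helper typically takes is below; the dict keys are assumptions inferred from how _assert_responses_valid and _assert_responses_exception read response["result"] and response["error"], not the verbatim source.

def generate_streaming_callback_and_responses_pair():
    # Hypothetical stand-alone version of the test helper: collect every
    # streamed result/error pair into a shared list.
    responses = []  # each entry: {"result": result or None, "error": exception or None}

    def callback(result, error):
        responses.append({"result": result, "error": error})

    return callback, responses

# Usage mirrors _infer_parallel above:
#   callback, responses = generate_streaming_callback_and_responses_pair()
#   client.start_stream(callback); client.async_stream_infer(model, inputs); client.stop_stream()
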
22 changes: 22 additions & 0 deletions qa/L0_backend_python/response_sender/test.sh
@@ -51,6 +51,28 @@ mkdir -p models/response_sender_decoupled_async/1 && \
cp ../../python_models/response_sender/model_async.py models/response_sender_decoupled_async/1/model.py && \
cp ../../python_models/response_sender/config.pbtxt models/response_sender_decoupled_async && \
echo "model_transaction_policy { decoupled: True }" >> models/response_sender_decoupled_async/config.pbtxt
mkdir -p models/response_sender_batching/1 && \
cp ../../python_models/response_sender/model_common.py models/response_sender_batching/1 && \
cp ../../python_models/response_sender/model.py models/response_sender_batching/1 && \
cp ../../python_models/response_sender/config.pbtxt models/response_sender_batching && \
echo "dynamic_batching { max_queue_delay_microseconds: 500000 }" >> models/response_sender_batching/config.pbtxt
mkdir -p models/response_sender_decoupled_batching/1 && \
cp ../../python_models/response_sender/model_common.py models/response_sender_decoupled_batching/1 && \
cp ../../python_models/response_sender/model.py models/response_sender_decoupled_batching/1 && \
cp ../../python_models/response_sender/config.pbtxt models/response_sender_decoupled_batching && \
echo "model_transaction_policy { decoupled: True }" >> models/response_sender_decoupled_batching/config.pbtxt && \
echo "dynamic_batching { max_queue_delay_microseconds: 500000 }" >> models/response_sender_decoupled_batching/config.pbtxt
mkdir -p models/response_sender_async_batching/1 && \
cp ../../python_models/response_sender/model_common.py models/response_sender_async_batching/1 && \
cp ../../python_models/response_sender/model_async.py models/response_sender_async_batching/1/model.py && \
cp ../../python_models/response_sender/config.pbtxt models/response_sender_async_batching && \
echo "dynamic_batching { max_queue_delay_microseconds: 500000 }" >> models/response_sender_async_batching/config.pbtxt
mkdir -p models/response_sender_decoupled_async_batching/1 && \
cp ../../python_models/response_sender/model_common.py models/response_sender_decoupled_async_batching/1 && \
cp ../../python_models/response_sender/model_async.py models/response_sender_decoupled_async_batching/1/model.py && \
cp ../../python_models/response_sender/config.pbtxt models/response_sender_decoupled_async_batching && \
echo "model_transaction_policy { decoupled: True }" >> models/response_sender_decoupled_async_batching/config.pbtxt && \
echo "dynamic_batching { max_queue_delay_microseconds: 500000 }" >> models/response_sender_decoupled_async_batching/config.pbtxt

TEST_LOG="response_sender_test.log"
SERVER_LOG="response_sender_test.server.log"
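
The five model repositories added above reuse the same model.py, model_async.py, and model_common.py, and differ only in the directives appended to config.pbtxt. As a rough, hypothetical illustration (not part of this commit) of how those two switches surface inside the Python backend, where args["model_config"] arrives as a JSON string, the sketch below reports the same decoupled flag that pb_utils.using_decoupled_model_transaction_policy() is used for in model_common.py further down.

import json


def describe_config(model_config_json):
    # Report the two switches test.sh toggles per model variant.
    config = json.loads(model_config_json)
    decoupled = config.get("model_transaction_policy", {}).get("decoupled", False)
    has_dynamic_batching = "dynamic_batching" in config
    return {"decoupled": decoupled, "dynamic_batching": has_dynamic_batching}


# Example with the fields appended for response_sender_decoupled_batching.
example = json.dumps(
    {
        "model_transaction_policy": {"decoupled": True},
        "dynamic_batching": {"max_queue_delay_microseconds": 500000},
    }
)
print(describe_config(example))  # {'decoupled': True, 'dynamic_batching': True}
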
2 changes: 1 addition & 1 deletion qa/python_models/response_sender/model.py
@@ -31,7 +31,7 @@

class TritonPythonModel:
def initialize(self, args):
self._common = ResponseSenderModelCommon(pb_utils)
self._common = ResponseSenderModelCommon(pb_utils, args)

def execute(self, requests):
return self._common.execute(requests, use_async=False)
2 changes: 1 addition & 1 deletion qa/python_models/response_sender/model_async.py
@@ -31,7 +31,7 @@

class TritonPythonModel:
def initialize(self, args):
self._common = ResponseSenderModelCommon(pb_utils)
self._common = ResponseSenderModelCommon(pb_utils, args)

async def execute(self, requests):
return self._common.execute(requests, use_async=True)
24 changes: 21 additions & 3 deletions qa/python_models/response_sender/model_common.py
@@ -25,15 +25,19 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import asyncio
import json
import threading
import time

import numpy as np


class ResponseSenderModelCommon:
def __init__(self, pb_utils):
def __init__(self, pb_utils, args):
self._pb_utils = pb_utils
self._is_decoupled = pb_utils.using_decoupled_model_transaction_policy(
json.loads(args["model_config"])
)
self._background_tasks = set()

def _get_instructions_from_request(self, request):
@@ -119,6 +123,20 @@ def _send_responses(self, processed_requests, response_id_offset):
batch_size = request["batch_size"]
response_sender = request["response_sender"]
send_complete_final_flag = request["send_complete_final_flag"]

# TODO: The gRPC frontend may segfault if a non-decoupled model sends the
# response final flag separately from the response.
if (
not self._is_decoupled
and number_of_response == 1
and send_complete_final_flag
):
response_sender.send(
self._create_response(batch_size, response_id=response_id_offset),
flags=self._pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL,
)
continue

for response_id in range(number_of_response):
response_sender.send(
self._create_response(
@@ -132,7 +150,7 @@ def _send_responses_delayed_threaded(self, processed_requests, response_id_offset):

def _send_responses_delayed_threaded(self, processed_requests, response_id_offset):
def response_thread(send_responses, processed_requests, response_id_offset):
time.sleep(1) # response after requests are released
time.sleep(0.5) # response after requests are released
send_responses(processed_requests, response_id_offset)

thread = threading.Thread(
@@ -146,7 +164,7 @@ def _send_responses_delayed_async(self, processed_requests, response_id_offset):
async def response_async(
send_responses, processed_requests, response_id_offset
):
asyncio.sleep(1) # response after requests are released
await asyncio.sleep(0.5) # response after requests are released
send_responses(processed_requests, response_id_offset)

coro = response_async(
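
The tail of _send_responses_delayed_async is collapsed above. Judging from the self._background_tasks set initialized in __init__ and the coroutine created here, the likely intent is the standard asyncio fire-and-forget idiom: keep a strong reference to the task so it is not garbage collected before it runs, and drop the reference once it completes. A self-contained sketch of that idiom, offered as an assumption rather than the verbatim elided code:

import asyncio

background_tasks = set()


async def send_later(delay, message):
    # Stand-in for response_async: wait, then perform the delayed send.
    await asyncio.sleep(delay)
    print(message)


async def main():
    task = asyncio.create_task(send_later(0.5, "delayed response sent"))
    background_tasks.add(task)  # keep a strong reference so the task is not collected early
    task.add_done_callback(background_tasks.discard)  # drop the reference when done
    await asyncio.gather(*background_tasks)  # only so this demo exits cleanly


asyncio.run(main())
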
