Add test for decoupled returning response and dynamic batched requests
kthui committed May 22, 2024
1 parent 5f045d3 commit c324d80
Showing 5 changed files with 212 additions and 12 deletions.
174 changes: 167 additions & 7 deletions qa/L0_backend_python/response_sender/response_sender_test.py
@@ -29,6 +29,7 @@

import numpy as np
import tritonclient.grpc as grpcclient
from tritonclient.utils import InferenceServerException


class ResponseSenderTest(unittest.TestCase):
@@ -67,6 +68,94 @@ def _get_inputs(
)
return inputs

def _get_parallel_inputs_and_response_types_decoupled(self):
return {
"parallel_inputs": [
self._get_inputs(
number_of_response_before_return=0,
send_complete_final_flag_before_return=True,
return_a_response=False,
number_of_response_after_return=0,
send_complete_final_flag_after_return=False,
),
self._get_inputs(
number_of_response_before_return=0,
send_complete_final_flag_before_return=False,
return_a_response=False,
number_of_response_after_return=0,
send_complete_final_flag_after_return=True,
),
self._get_inputs(
number_of_response_before_return=1,
send_complete_final_flag_before_return=True,
return_a_response=False,
number_of_response_after_return=0,
send_complete_final_flag_after_return=False,
),
self._get_inputs(
number_of_response_before_return=0,
send_complete_final_flag_before_return=False,
return_a_response=False,
number_of_response_after_return=1,
send_complete_final_flag_after_return=True,
),
self._get_inputs(
number_of_response_before_return=2,
send_complete_final_flag_before_return=True,
return_a_response=False,
number_of_response_after_return=0,
send_complete_final_flag_after_return=False,
),
self._get_inputs(
number_of_response_before_return=0,
send_complete_final_flag_before_return=False,
return_a_response=False,
number_of_response_after_return=2,
send_complete_final_flag_after_return=True,
),
self._get_inputs(
number_of_response_before_return=1,
send_complete_final_flag_before_return=False,
return_a_response=False,
number_of_response_after_return=3,
send_complete_final_flag_after_return=True,
),
],
"number_of_response_before_return": 4,
"return_a_response": 0,
"number_of_response_after_return": 6,
}

def _get_parallel_inputs_and_response_types_non_decoupled(self):
return {
"parallel_inputs": [
self._get_inputs(
number_of_response_before_return=0,
send_complete_final_flag_before_return=False,
return_a_response=True,
number_of_response_after_return=0,
send_complete_final_flag_after_return=False,
),
self._get_inputs(
number_of_response_before_return=1,
send_complete_final_flag_before_return=True,
return_a_response=False,
number_of_response_after_return=0,
send_complete_final_flag_after_return=False,
),
self._get_inputs(
number_of_response_before_return=0,
send_complete_final_flag_before_return=False,
return_a_response=False,
number_of_response_after_return=1,
send_complete_final_flag_after_return=True,
),
],
"number_of_response_before_return": 1,
"return_a_response": 1,
"number_of_response_after_return": 1,
}

def _generate_streaming_callback_and_responses_pair(self):
responses = [] # [{"result": result, "error": error}, ...]

@@ -75,6 +164,16 @@ def callback(result, error):

return callback, responses

def _infer_parallel(self, model_name, parallel_inputs):
callback, responses = self._generate_streaming_callback_and_responses_pair()
with grpcclient.InferenceServerClient("localhost:8001") as client:
client.start_stream(callback)
for inputs in parallel_inputs:
client.async_stream_infer(model_name, inputs)
time.sleep(2) # to collect all responses
client.stop_stream()
return responses

def _infer(
self,
model_name,
@@ -91,13 +190,7 @@ def _infer(
number_of_response_after_return,
send_complete_final_flag_after_return,
)
callback, responses = self._generate_streaming_callback_and_responses_pair()
with grpcclient.InferenceServerClient("localhost:8001") as client:
client.start_stream(callback)
client.async_stream_infer(model_name, inputs)
time.sleep(2) # to collect all responses
client.stop_stream()
return responses
return self._infer_parallel(model_name, [inputs])

def _assert_responses_valid(
self,
@@ -128,6 +221,12 @@ def _assert_responses_valid(
self.assertEqual(return_a_response, response_returned)
self.assertEqual(number_of_response_after_return, after_return_response_count)

def _assert_responses_exception(self, responses, expected_message):
for response in responses:
self.assertIsNone(response["result"])
self.assertIsInstance(response["error"], InferenceServerException)
self.assertIn(expected_message, response["error"].message())

def _assert_decoupled_infer_success(
self,
number_of_response_before_return,
@@ -308,6 +407,67 @@ def test_non_decoupled_one_response_post_return(self):
send_complete_final_flag_after_return=True,
)

    # Decoupled model requests, each responding differently.
def test_decoupled_multiple_requests(self):
model_name = "response_sender_decoupled_batching"
ios = self._get_parallel_inputs_and_response_types_decoupled()
responses = self._infer_parallel(model_name, ios["parallel_inputs"])
self._assert_responses_valid(
responses,
ios["number_of_response_before_return"],
ios["return_a_response"],
ios["number_of_response_after_return"],
)
# Do NOT group into a for-loop as it hides which model failed.
model_name = "response_sender_decoupled_async_batching"
ios = self._get_parallel_inputs_and_response_types_decoupled()
responses = self._infer_parallel(model_name, ios["parallel_inputs"])
self._assert_responses_valid(
responses,
ios["number_of_response_before_return"],
ios["return_a_response"],
ios["number_of_response_after_return"],
)

    # Non-decoupled model requests, each responding differently.
def test_non_decoupled_multiple_requests(self):
model_name = "response_sender_batching"
ios = self._get_parallel_inputs_and_response_types_non_decoupled()
responses = self._infer_parallel(model_name, ios["parallel_inputs"])
self._assert_responses_valid(
responses,
ios["number_of_response_before_return"],
ios["return_a_response"],
ios["number_of_response_after_return"],
)
# Do NOT group into a for-loop as it hides which model failed.
model_name = "response_sender_async_batching"
ios = self._get_parallel_inputs_and_response_types_non_decoupled()
responses = self._infer_parallel(model_name, ios["parallel_inputs"])
self._assert_responses_valid(
responses,
ios["number_of_response_before_return"],
ios["return_a_response"],
ios["number_of_response_after_return"],
)

    # Decoupled model sends 1 response on return.
def test_decoupled_one_response_on_return(self):
responses = self._infer(
model_name="response_sender_decoupled",
number_of_response_before_return=0,
send_complete_final_flag_before_return=False,
return_a_response=True,
number_of_response_after_return=0,
send_complete_final_flag_after_return=False,
)
self._assert_responses_exception(
responses,
expected_message="using the decoupled mode and the execute function must return None",
)
        # TODO: Test the async decoupled model once 'AsyncEventFutureDoneCallback'
        # is fixed; calling `py_future.result()` on an errored future currently
        # hangs on exit.


if __name__ == "__main__":
unittest.main()
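As context for the new parallel tests above, here is a minimal standalone sketch (not part of the commit) of the same streaming pattern. The model name "my_model", the INPUT tensor, and its shape are hypothetical placeholders; the endpoint localhost:8001 matches the test helpers. Sending all requests on one gRPC stream before waiting is what lets the server's dynamic batcher group them when a max queue delay is configured.

import time

import numpy as np
import tritonclient.grpc as grpcclient

responses = []

def callback(result, error):
    # Collect both successful results and errors for later inspection.
    responses.append({"result": result, "error": error})

with grpcclient.InferenceServerClient("localhost:8001") as client:
    client.start_stream(callback)
    for _ in range(4):
        # "my_model" and the INPUT tensor below are hypothetical placeholders.
        data = np.ones([1, 8], dtype=np.float32)
        inputs = [grpcclient.InferInput("INPUT", list(data.shape), "FP32")]
        inputs[0].set_data_from_numpy(data)
        client.async_stream_infer("my_model", inputs)
    time.sleep(2)  # allow all (possibly decoupled) responses to arrive
    client.stop_stream()

print(len(responses))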
22 changes: 22 additions & 0 deletions qa/L0_backend_python/response_sender/test.sh
@@ -51,6 +51,28 @@ mkdir -p models/response_sender_decoupled_async/1 && \
cp ../../python_models/response_sender/model_async.py models/response_sender_decoupled_async/1/model.py && \
cp ../../python_models/response_sender/config.pbtxt models/response_sender_decoupled_async && \
echo "model_transaction_policy { decoupled: True }" >> models/response_sender_decoupled_async/config.pbtxt
mkdir -p models/response_sender_batching/1 && \
cp ../../python_models/response_sender/model_common.py models/response_sender_batching/1 && \
cp ../../python_models/response_sender/model.py models/response_sender_batching/1 && \
cp ../../python_models/response_sender/config.pbtxt models/response_sender_batching && \
echo "dynamic_batching { max_queue_delay_microseconds: 500000 }" >> models/response_sender_batching/config.pbtxt
mkdir -p models/response_sender_decoupled_batching/1 && \
cp ../../python_models/response_sender/model_common.py models/response_sender_decoupled_batching/1 && \
cp ../../python_models/response_sender/model.py models/response_sender_decoupled_batching/1 && \
cp ../../python_models/response_sender/config.pbtxt models/response_sender_decoupled_batching && \
echo "model_transaction_policy { decoupled: True }" >> models/response_sender_decoupled_batching/config.pbtxt && \
echo "dynamic_batching { max_queue_delay_microseconds: 500000 }" >> models/response_sender_decoupled_batching/config.pbtxt
mkdir -p models/response_sender_async_batching/1 && \
cp ../../python_models/response_sender/model_common.py models/response_sender_async_batching/1 && \
cp ../../python_models/response_sender/model_async.py models/response_sender_async_batching/1/model.py && \
cp ../../python_models/response_sender/config.pbtxt models/response_sender_async_batching && \
echo "dynamic_batching { max_queue_delay_microseconds: 500000 }" >> models/response_sender_async_batching/config.pbtxt
mkdir -p models/response_sender_decoupled_async_batching/1 && \
cp ../../python_models/response_sender/model_common.py models/response_sender_decoupled_async_batching/1 && \
cp ../../python_models/response_sender/model_async.py models/response_sender_decoupled_async_batching/1/model.py && \
cp ../../python_models/response_sender/config.pbtxt models/response_sender_decoupled_async_batching && \
echo "model_transaction_policy { decoupled: True }" >> models/response_sender_decoupled_async_batching/config.pbtxt && \
echo "dynamic_batching { max_queue_delay_microseconds: 500000 }" >> models/response_sender_decoupled_async_batching/config.pbtxt

TEST_LOG="response_sender_test.log"
SERVER_LOG="response_sender_test.server.log"
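For clarity, assuming the copied base config.pbtxt does not already set these fields, the echo lines above leave, for example, models/response_sender_decoupled_batching/config.pbtxt ending with the two appended settings:

model_transaction_policy { decoupled: True }
dynamic_batching { max_queue_delay_microseconds: 500000 }

The 500 ms maximum queue delay gives requests streamed back-to-back (as _infer_parallel does above) time to accumulate in the scheduler queue, so the dynamic batcher can group them before handing them to the model.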
2 changes: 1 addition & 1 deletion qa/python_models/response_sender/model.py
@@ -31,7 +31,7 @@

class TritonPythonModel:
def initialize(self, args):
self._common = ResponseSenderModelCommon(pb_utils)
self._common = ResponseSenderModelCommon(pb_utils, args)

def execute(self, requests):
return self._common.execute(requests, use_async=False)
2 changes: 1 addition & 1 deletion qa/python_models/response_sender/model_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

class TritonPythonModel:
def initialize(self, args):
self._common = ResponseSenderModelCommon(pb_utils)
self._common = ResponseSenderModelCommon(pb_utils, args)

async def execute(self, requests):
return self._common.execute(requests, use_async=True)
24 changes: 21 additions & 3 deletions qa/python_models/response_sender/model_common.py
@@ -25,15 +25,19 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import asyncio
import json
import threading
import time

import numpy as np


class ResponseSenderModelCommon:
def __init__(self, pb_utils):
def __init__(self, pb_utils, args):
self._pb_utils = pb_utils
self._is_decoupled = pb_utils.using_decoupled_model_transaction_policy(
json.loads(args["model_config"])
)
self._background_tasks = set()

def _get_instructions_from_request(self, request):
@@ -94,6 +98,20 @@ def _send_responses(self, processed_requests, response_id_offset):
batch_size = request["batch_size"]
response_sender = request["response_sender"]
send_complete_final_flag = request["send_complete_final_flag"]

            # TODO: The gRPC frontend may segfault if a non-decoupled model sends the
            # response final flag separately from the response.
if (
not self._is_decoupled
and number_of_response == 1
and send_complete_final_flag
):
response_sender.send(
self._create_response(batch_size, response_id=response_id_offset),
flags=self._pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL,
)
continue

for response_id in range(number_of_response):
response_sender.send(
self._create_response(
@@ -107,7 +125,7 @@

def _send_responses_delayed_threaded(self, processed_requests, response_id_offset):
def response_thread(send_responses, processed_requests, response_id_offset):
time.sleep(1) # response after requests are released
time.sleep(0.5) # response after requests are released
send_responses(processed_requests, response_id_offset)

thread = threading.Thread(
@@ -121,7 +139,7 @@ def _send_responses_delayed_async(self, processed_requests, response_id_offset):
async def response_async(
send_responses, processed_requests, response_id_offset
):
asyncio.sleep(1) # response after requests are released
await asyncio.sleep(0.5) # response after requests are released
send_responses(processed_requests, response_id_offset)

coro = response_async(
