Add Python backend request cancellation test (#6364)
* Add cancelled response status test

* Add Python backend request cancellation test

* Add Python backend decoupled request cancellation test

* Simplified response if cancelled

* Test response_sender.send() after closed

* Rollback test response_sender.send() after closed

* Rollback sending any response on cancel for non-decoupled models
kthui authored Oct 6, 2023
1 parent bdf227c commit 2bf543b
Showing 7 changed files with 238 additions and 2 deletions.
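
For context, both new tests drive a Python backend model that sleeps in execute() and polls for cancellation once per second. Below is a minimal sketch of that pattern, assuming the pb_utils API the committed execute_cancel model uses (request.is_cancelled() and TritonError.CANCELLED); it is an illustration, not the committed model:

import time

import triton_python_backend_utils as pb_utils


class TritonPythonModel:
    def execute(self, requests):
        responses = []
        for request in requests:
            error = None
            # Poll the request's cancellation flag once per second for up to 4 s.
            for _ in range(4):
                time.sleep(1)
                if request.is_cancelled():
                    error = pb_utils.TritonError(
                        message="cancelled", code=pb_utils.TritonError.CANCELLED
                    )
                    break
            responses.append(pb_utils.InferenceResponse(error=error))
        return responses
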
33 changes: 33 additions & 0 deletions qa/L0_backend_python/decoupled/decoupled_test.py
@@ -256,6 +256,39 @@ def test_decoupled_send_after_close_error(self):
            "The completed request size must be zero.",
        )

    def test_decoupled_execute_cancel(self):
        model_name = "execute_cancel"
        log_path = "decoupled_server.log"
        execute_delay = 4.0  # seconds
        shape = [1, 1]

        user_data = UserData()
        with grpcclient.InferenceServerClient("localhost:8001") as client:
            client.start_stream(callback=partial(callback, user_data))
            input_data = np.array([[execute_delay]], dtype=np.float32)
            inputs = [
                grpcclient.InferInput(
                    "EXECUTE_DELAY", shape, np_to_triton_dtype(input_data.dtype)
                )
            ]
            inputs[0].set_data_from_numpy(input_data)
            client.async_stream_infer(model_name, inputs)
            time.sleep(2)  # model delay for decoupling request and response sender
            time.sleep(2)  # ensure the request is executing
            client.stop_stream(cancel_requests=True)
            time.sleep(2)  # ensure the cancellation is delivered

        self.assertFalse(user_data._completed_requests.empty())
        while not user_data._completed_requests.empty():
            data_item = user_data._completed_requests.get()
            self.assertIsInstance(data_item, InferenceServerException)
            self.assertEqual(data_item.status(), "StatusCode.CANCELLED")

        with open(log_path, mode="r", encoding="utf-8", errors="strict") as f:
            log_text = f.read()
            self.assertIn("[execute_cancel] Request not cancelled at 1.0 s", log_text)
            self.assertIn("[execute_cancel] Request cancelled at ", log_text)


if __name__ == "__main__":
    unittest.main()
7 changes: 6 additions & 1 deletion qa/L0_backend_python/decoupled/test.sh
@@ -27,7 +27,7 @@

CLIENT_PY=./decoupled_test.py
CLIENT_LOG="./decoupled_client.log"
EXPECTED_NUM_TESTS="5"
EXPECTED_NUM_TESTS="6"
TEST_RESULT_FILE='test_results.txt'
TRITON_DIR=${TRITON_DIR:="/opt/tritonserver"}
SERVER=${TRITON_DIR}/bin/tritonserver
@@ -50,6 +50,11 @@ mkdir -p models/dlpack_add_sub/1/
cp ../../python_models/dlpack_add_sub/model.py models/dlpack_add_sub/1/
cp ../../python_models/dlpack_add_sub/config.pbtxt models/dlpack_add_sub/

mkdir -p models/execute_cancel/1/
cp ../../python_models/execute_cancel/model.py ./models/execute_cancel/1/
cp ../../python_models/execute_cancel/config.pbtxt ./models/execute_cancel/
echo "model_transaction_policy { decoupled: True }" >> ./models/execute_cancel/config.pbtxt

git clone https://github.com/triton-inference-server/python_backend -b $PYTHON_BACKEND_REPO_TAG
mkdir -p models/square_int32/1/
cp python_backend/examples/decoupled/square_model.py models/square_int32/1/model.py
38 changes: 38 additions & 0 deletions qa/L0_backend_python/lifecycle/lifecycle_test.py
@@ -31,6 +31,7 @@
sys.path.append("../../common")

import queue
import time
import unittest
from functools import partial

@@ -70,6 +71,7 @@ def test_error_code(self):
            ("UNAVAILABLE", "[StatusCode.UNAVAILABLE]"),
            ("UNSUPPORTED", "[StatusCode.UNIMPLEMENTED]"),
            ("ALREADY_EXISTS", "[StatusCode.ALREADY_EXISTS]"),
            ("CANCELLED", "[StatusCode.CANCELLED]"),
            ("(default)", "[StatusCode.INTERNAL] unrecognized"),
        ]
        with self._shm_leak_detector.Probe() as shm_probe:
@@ -91,6 +93,42 @@ def test_error_code(self):
                    expected_grpc_error_start + " error code: " + error,
                )

    def test_execute_cancel(self):
        model_name = "execute_cancel"
        log_path = "lifecycle_server.log"
        execute_delay = 4.0  # seconds
        shape = [1, 1]
        response = {"responded": False, "result": None, "error": None}

        def callback(result, error):
            response["responded"] = True
            response["result"] = result
            response["error"] = error

        with self._shm_leak_detector.Probe() as shm_probe:
            with grpcclient.InferenceServerClient("localhost:8001") as client:
                input_data = np.array([[execute_delay]], dtype=np.float32)
                inputs = [
                    grpcclient.InferInput(
                        "EXECUTE_DELAY", shape, np_to_triton_dtype(input_data.dtype)
                    )
                ]
                inputs[0].set_data_from_numpy(input_data)
                exec_future = client.async_infer(model_name, inputs, callback)
                time.sleep(2)  # ensure the request is executing
                self.assertFalse(response["responded"])
                exec_future.cancel()
                time.sleep(2)  # ensure the cancellation is delivered
                self.assertTrue(response["responded"])

        self.assertEqual(response["result"], None)
        self.assertIsInstance(response["error"], InferenceServerException)
        self.assertEqual(response["error"].status(), "StatusCode.CANCELLED")
        with open(log_path, mode="r", encoding="utf-8", errors="strict") as f:
            log_text = f.read()
            self.assertIn("[execute_cancel] Request not cancelled at 1.0 s", log_text)
            self.assertIn("[execute_cancel] Request cancelled at ", log_text)

    def test_batch_error(self):
        # The execute_error model returns an error for the first and third
        # request and successfully processes the second request. This is making
6 changes: 5 additions & 1 deletion qa/L0_backend_python/lifecycle/test.sh
@@ -26,7 +26,7 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

CLIENT_LOG="./lifecycle_client.log"
EXPECTED_NUM_TESTS="4"
EXPECTED_NUM_TESTS="5"
TEST_RESULT_FILE='test_results.txt'
source ../common.sh
source ../../common/util.sh
@@ -44,6 +44,10 @@ mkdir -p models/error_code/1/
cp ../../python_models/error_code/model.py ./models/error_code/1/
cp ../../python_models/error_code/config.pbtxt ./models/error_code/

mkdir -p models/execute_cancel/1/
cp ../../python_models/execute_cancel/model.py ./models/execute_cancel/1/
cp ../../python_models/execute_cancel/config.pbtxt ./models/execute_cancel/

mkdir -p models/execute_error/1/
cp ../../python_models/execute_error/model.py ./models/execute_error/1/
cp ../../python_models/execute_error/config.pbtxt ./models/execute_error/
1 change: 1 addition & 0 deletions qa/python_models/error_code/model.py
@@ -37,6 +37,7 @@ def execute(self, requests):
            "UNAVAILABLE": pb_utils.TritonError.UNAVAILABLE,
            "UNSUPPORTED": pb_utils.TritonError.UNSUPPORTED,
            "ALREADY_EXISTS": pb_utils.TritonError.ALREADY_EXISTS,
            "CANCELLED": pb_utils.TritonError.CANCELLED,
        }

        responses = []
47 changes: 47 additions & 0 deletions qa/python_models/execute_cancel/config.pbtxt
@@ -0,0 +1,47 @@
# Copyright 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of NVIDIA CORPORATION nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

name: "execute_cancel"
backend: "python"
max_batch_size: 1

input [
  {
    name: "EXECUTE_DELAY"
    data_type: TYPE_FP32
    dims: [ 1 ]
  }
]

output [
  {
    name: "DUMMY_OUT"
    data_type: TYPE_FP32
    dims: [ 1 ]
  }
]

instance_group [{ kind: KIND_CPU }]
108 changes: 108 additions & 0 deletions qa/python_models/execute_cancel/model.py
@@ -0,0 +1,108 @@
# Copyright 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of NVIDIA CORPORATION nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import json
import threading
import time

import triton_python_backend_utils as pb_utils


class TritonPythonModel:
    def initialize(self, args):
        self._logger = pb_utils.Logger
        self._model_config = json.loads(args["model_config"])
        self._using_decoupled = pb_utils.using_decoupled_model_transaction_policy(
            self._model_config
        )

    def execute(self, requests):
        processed_requests = []
        for request in requests:
            delay_tensor = pb_utils.get_input_tensor_by_name(
                request, "EXECUTE_DELAY"
            ).as_numpy()
            delay = delay_tensor[0][0]  # seconds
            if self._using_decoupled:
                processed_requests.append(
                    {"response_sender": request.get_response_sender(), "delay": delay}
                )
            else:
                processed_requests.append({"request": request, "delay": delay})
        if self._using_decoupled:
            return self._execute_decoupled(processed_requests)
        return self._execute_processed_requests(processed_requests)

    def _execute_processed_requests(self, processed_requests):
        responses = []
        for processed_request in processed_requests:
            error = pb_utils.TritonError(message="not cancelled")
            object_to_check_cancelled = None
            if "response_sender" in processed_request:
                object_to_check_cancelled = processed_request["response_sender"]
            elif "request" in processed_request:
                object_to_check_cancelled = processed_request["request"]
            delay = processed_request["delay"]  # seconds
            time_elapsed = 0.0  # seconds
            while time_elapsed < delay:
                time.sleep(1)
                time_elapsed += 1.0
                if object_to_check_cancelled.is_cancelled():
                    self._logger.log_info(
                        "[execute_cancel] Request cancelled at "
                        + str(time_elapsed)
                        + " s"
                    )
                    error = pb_utils.TritonError(
                        message="cancelled", code=pb_utils.TritonError.CANCELLED
                    )
                    break
                self._logger.log_info(
                    "[execute_cancel] Request not cancelled at "
                    + str(time_elapsed)
                    + " s"
                )
            responses.append(pb_utils.InferenceResponse(error=error))
        return responses

    def _execute_decoupled(self, processed_requests):
        def response_thread(execute_processed_requests, processed_requests):
            time.sleep(2)  # execute after requests are released
            responses = execute_processed_requests(processed_requests)
            for i in range(len(responses)):  # len(responses) == len(processed_requests)
                response_sender = processed_requests[i]["response_sender"]
                response_sender.send(responses[i])
                response_sender.send(
                    flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL
                )

        thread = threading.Thread(
            target=response_thread,
            args=(self._execute_processed_requests, processed_requests),
        )
        thread.daemon = True
        thread.start()
        return None
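
In the decoupled path above, execute() returns None right away and a daemon thread delivers the responses, so cancellation is observed on the response sender rather than on the request. For completeness, a minimal client-side sketch of cancelling an in-flight request against this model; the endpoint and sleep values are illustrative assumptions, and the cancel() call on the returned future mirrors the lifecycle test above:

import time

import numpy as np
import tritonclient.grpc as grpcclient
from tritonclient.utils import np_to_triton_dtype

with grpcclient.InferenceServerClient("localhost:8001") as client:
    # EXECUTE_DELAY tells the model how long to keep polling for cancellation.
    input_data = np.array([[4.0]], dtype=np.float32)
    inputs = [
        grpcclient.InferInput(
            "EXECUTE_DELAY", [1, 1], np_to_triton_dtype(input_data.dtype)
        )
    ]
    inputs[0].set_data_from_numpy(input_data)
    future = client.async_infer("execute_cancel", inputs, lambda result, error: None)
    time.sleep(2)  # let the request start executing
    future.cancel()  # ask the server to cancel the in-flight request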
