Add documentation for sequence request timeout. Add test (#5571)
* Add documentation for sequence timeout

* Add test

* Address comment

* Fix up
GuanLuo authored Mar 31, 2023
1 parent 4c10e0a commit 4383251
Showing 4 changed files with 286 additions and 6 deletions.
29 changes: 25 additions & 4 deletions docs/protocol/extension_schedule_policy.md
@@ -1,5 +1,5 @@
<!--
-# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
+# Copyright 2020-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
@@ -31,15 +31,17 @@
This document describes Triton's schedule policy extension. The
schedule-policy extension allows an inference request to provide
parameters that influence how Triton handles and schedules the
request. Because this extension is supported, Triton reports
"schedule_policy" in the extensions field of its Server Metadata.
Note the policies are specific to [dynamic
batcher](https://github.com/triton-inference-server/server/blob/main/docs/user_guide/model_configuration.md#dynamic-batcher)
-and not [sequence
+and only experimentally supported for the [sequence
batcher](https://github.com/triton-inference-server/server/blob/main/docs/user_guide/model_configuration.md#sequence-batcher)
with the [direct](https://github.com/triton-inference-server/server/blob/main/docs/user_guide/architecture.md#direct)
scheduling strategy.
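
A client can check for this support before relying on the extension; for example, with the Python gRPC client (a minimal sketch, where the server address is an assumption for illustration):

```python
import tritonclient.grpc as grpcclient

# Query Server Metadata and look for the extension in its "extensions"
# field. The address below is a placeholder.
client = grpcclient.InferenceServerClient("localhost:8001")
metadata = client.get_server_metadata()
if "schedule_policy" in metadata.extensions:
    print("schedule-policy extension is supported")
```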

## Dynamic Batcher

The schedule-policy extension uses request parameters to indicate the
policy. The parameters and their type are:

@@ -55,6 +57,25 @@ policy. The parameters and their type are:
the time, Triton will take a model-specific action such as
terminating the request.

-Both parameters are optional and if not specified Triton will handle
+Both parameters are optional and, if not specified, Triton will handle
the request using the default priority and timeout values appropriate
for the model.
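
For illustration, the Python gRPC client surfaces both parameters as keyword arguments on its infer call; a minimal sketch (the model name, address, and values here are hypothetical):

```python
import numpy as np
import tritonclient.grpc as grpcclient

# Sketch only: "my_model", the address, and the parameter values are
# placeholders. "timeout" is in microseconds; "priority" is interpreted
# against the priority levels configured for the model.
client = grpcclient.InferenceServerClient("localhost:8001")
inputs = [grpcclient.InferInput("INPUT0", [1, 1], "FP32")]
inputs[0].set_data_from_numpy(np.ones([1, 1], dtype=np.float32))
result = client.infer("my_model", inputs, priority=1, timeout=3000000)
```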

## Sequence Batcher with Direct Scheduling Strategy

**Note that the schedule policy for the sequence batcher is experimental
and subject to change.**

The schedule-policy extension uses a request parameter to indicate the
policy. The parameter and its type are:

- "timeout" : int64 value indicating the timeout value for the
request, in microseconds. If the request cannot be completed within
the time Triton will terminate the request, as well as the corresponding
sequence and received requests of the sequence. The timeout will only be
applied to requests of the sequences that haven't been allocated a batch slot
for execution, the requests of the sequences that have been allocated batch
slots will not be affected by the timeout setting.

The parameter is optional and, if not specified, Triton will handle
the request and corresponding sequence based on the model configuration.
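
The test added below exercises this parameter through the Python gRPC streaming client; in outline (a sketch using the identity_fp32_timeout model from this commit, with a hypothetical address and sequence id):

```python
import numpy as np
import tritonclient.grpc as grpcclient

# Attach a 3-second (3000000 us) timeout to every request of a sequence.
# If the sequence is still waiting for a batch slot when the timeout
# expires, Triton rejects the sequence's requests.
client = grpcclient.InferenceServerClient("localhost:8001")
inputs = [grpcclient.InferInput("INPUT0", [1, 1], "FP32")]
inputs[0].set_data_from_numpy(np.ones([1, 1], dtype=np.float32))

client.start_stream(callback=lambda result, error: print(result, error))
client.async_stream_infer("identity_fp32_timeout", inputs,
                          sequence_id=1, sequence_start=True,
                          timeout=3000000)
client.async_stream_infer("identity_fp32_timeout", inputs,
                          sequence_id=1, sequence_end=True,
                          timeout=3000000)
client.stop_stream()
```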
56 changes: 56 additions & 0 deletions qa/python_models/identity_fp32_timeout/config.pbtxt
@@ -0,0 +1,56 @@
# Copyright 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of NVIDIA CORPORATION nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

name: "identity_fp32_timeout"
backend: "python"
max_batch_size: 1

input [
  {
    name: "INPUT0"
    data_type: TYPE_FP32
    dims: [ 1 ]
  }
]

output [
  {
    name: "OUTPUT0"
    data_type: TYPE_FP32
    dims: [ 1 ]
  }
]

instance_group [
  {
    count: 1
    kind: KIND_CPU
  }
]

sequence_batching {
  max_sequence_idle_microseconds: 50000000
}
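
The model.py that test.sh copies next to this configuration is not part of this diff. A plausible minimal sketch, assuming a Python backend identity model that sleeps for the roughly 5-second execution time the test comments describe:

```python
import time

import triton_python_backend_utils as pb_utils


class TritonPythonModel:
    # Hypothetical reconstruction of identity_fp32_timeout/1/model.py;
    # the actual file is not shown in this commit.

    def execute(self, requests):
        # Simulate a long-running execution so that short request
        # timeouts expire while sequences wait in the backlog.
        time.sleep(5)
        responses = []
        for request in requests:
            input0 = pb_utils.get_input_tensor_by_name(request, "INPUT0")
            output0 = pb_utils.Tensor("OUTPUT0", input0.as_numpy())
            responses.append(
                pb_utils.InferenceResponse(output_tensors=[output0]))
        return responses
```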
137 changes: 136 additions & 1 deletion qa/L0_sequence_batcher/sequence_batcher_test.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2018-2021, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# Copyright 2018-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
@@ -36,6 +36,8 @@
import numpy as np
import test_util as tu
import sequence_util as su
import tritonclient.grpc as grpcclient
from tritonclient.utils import InferenceServerException

TEST_SYSTEM_SHARED_MEMORY = bool(
    int(os.environ.get('TEST_SYSTEM_SHARED_MEMORY', 0)))
@@ -2849,5 +2851,138 @@ def test_queue_delay_full_min_util(self):
        self.cleanup_shm_regions(precreated_shm1_handles)


class SequenceBatcherRequestTimeoutTest(su.SequenceBatcherTestUtil):

    def setUp(self):
        super(SequenceBatcherRequestTimeoutTest, self).setUp()
        # By default, find tritonserver on "localhost", but can be overridden
        # with TRITONSERVER_IPADDR envvar
        self.server_address_ = os.environ.get('TRITONSERVER_IPADDR',
                                              'localhost') + ":8001"

        self.model_name_ = "identity_fp32_timeout"
        self.tensor_data_ = np.ones(shape=[1, 1], dtype=np.float32)
        self.inputs_ = [grpcclient.InferInput('INPUT0', [1, 1], "FP32")]
        self.inputs_[0].set_data_from_numpy(self.tensor_data_)

    def send_sequence_with_timeout(self,
                                   seq_id,
                                   callback,
                                   timeout_us=3000000,
                                   request_pause_sec=0):
        with grpcclient.InferenceServerClient(
                self.server_address_) as triton_client:
            triton_client.start_stream(callback=callback)
            triton_client.async_stream_infer(self.model_name_,
                                             self.inputs_,
                                             sequence_id=seq_id,
                                             sequence_start=True,
                                             timeout=timeout_us)
            if request_pause_sec != 0:
                time.sleep(request_pause_sec)
            triton_client.async_stream_infer(self.model_name_,
                                             self.inputs_,
                                             sequence_id=seq_id,
                                             timeout=timeout_us)
            if request_pause_sec != 0:
                time.sleep(request_pause_sec)
            triton_client.async_stream_infer(self.model_name_,
                                             self.inputs_,
                                             sequence_id=seq_id,
                                             sequence_end=True,
                                             timeout=timeout_us)

    def test_request_timeout(self):
        # Test a long-running model that receives requests with a shorter
        # timeout. Expect the timeout to expire only for the backlogged
        # sequence, and all requests of that sequence to be rejected once
        # the timeout expires.
        # Two sequences are sent while the model can only process one
        # sequence at a time. Each model execution takes 5 seconds and all
        # requests have a 3-second timeout, so the second sequence will be
        # rejected.

        # correlation IDs are 1-indexed
        seq1_res = []
        seq2_res = []
        seq1_callback = lambda result, error: seq1_res.append((result, error))
        seq2_callback = lambda result, error: seq2_res.append((result, error))

        # Send the sequences with a 1s interval to ensure processing order
        threads = []
        threads.append(
            threading.Thread(target=self.send_sequence_with_timeout,
                             args=(1, seq1_callback)))
        threads.append(
            threading.Thread(target=self.send_sequence_with_timeout,
                             args=(2, seq2_callback)))
        threads[0].start()
        time.sleep(1)
        threads[1].start()
        for t in threads:
            t.join()

        for result, error in seq1_res:
            self.assertIsNone(
                error,
                "Expect successful inference for sequence 1 requests, got error: {}"
                .format(error))
            np.testing.assert_allclose(
                result.as_numpy("OUTPUT0"),
                self.tensor_data_,
                err_msg="Unexpected output tensor, got {}".format(
                    result.as_numpy("OUTPUT0")))

        for _, error in seq2_res:
            self.assertIsNotNone(error, "Expect error for sequence 2 requests")
            with self.assertRaisesRegex(
                    InferenceServerException,
                    "timeout of the corresponding sequence has been expired",
                    msg="Unexpected error: {}".format(error)):
                raise error

    def test_send_request_after_timeout(self):
        # Similar to test_request_timeout, but the sequence to be timed out
        # sends its last request after the sequence has already timed out.
        # Expect the server to return an error about receiving a request
        # for an untracked sequence.

        seq1_res = []
        seq2_res = []
        seq1_callback = lambda result, error: seq1_res.append((result, error))
        seq2_callback = lambda result, error: seq2_res.append((result, error))

        threads = []
        threads.append(
            threading.Thread(target=self.send_sequence_with_timeout,
                             args=(1, seq1_callback)))
        # Each request will be sent with a pause, so the third request
        # will be sent after the sequence has timed out
        threads.append(
            threading.Thread(target=self.send_sequence_with_timeout,
                             args=(2, seq2_callback),
                             kwargs={'request_pause_sec': 2}))
        threads[0].start()
        time.sleep(1)
        threads[1].start()
        for t in threads:
            t.join()

        # Check the error messages of the last request and the earlier
        # requests separately
        for _, error in seq2_res[0:-1]:
            self.assertIsNotNone(error, "Expect error for sequence 2 requests")
            with self.assertRaisesRegex(
                    InferenceServerException,
                    "timeout of the corresponding sequence has been expired",
                    msg="Unexpected error: {}".format(error)):
                raise error
        _, last_err = seq2_res[-1]
        self.assertIsNotNone(last_err, "Expect error for sequence 2 requests")
        with self.assertRaisesRegex(
                InferenceServerException,
                "must specify the START flag on the first request",
                msg="Unexpected error: {}".format(last_err)):
            raise last_err


if __name__ == '__main__':
    unittest.main()
70 changes: 69 additions & 1 deletion qa/L0_sequence_batcher/test.sh
@@ -1,5 +1,5 @@
#!/bin/bash
-# Copyright 2018-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# Copyright 2018-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
@@ -47,6 +47,14 @@ export CUDA_VISIBLE_DEVICES=0
CLIENT_LOG="./client.log"
BATCHER_TEST=sequence_batcher_test.py

if [ -z "$TEST_SYSTEM_SHARED_MEMORY" ]; then
    TEST_SYSTEM_SHARED_MEMORY="0"
fi

if [ -z "$TEST_CUDA_SHARED_MEMORY" ]; then
    TEST_CUDA_SHARED_MEMORY="0"
fi

if [ -z "$TEST_VALGRIND" ]; then
    TEST_VALGRIND="0"
fi
@@ -717,6 +725,66 @@ for i in $QUEUE_DELAY_TESTS ; do
    set -e
done

# Test request timeout with the sequence batcher. Only run the test
# outside the shared memory settings, as the shared memory feature is
# irrelevant here.
if [ "$TEST_SYSTEM_SHARED_MEMORY" -ne 1 ] && [ "$TEST_CUDA_SHARED_MEMORY" -ne 1 ]; then
    export NO_BATCHING=0
    export MODEL_INSTANCES=1
    export BATCHER_TYPE="FIXED"

    TEST_CASE=SequenceBatcherRequestTimeoutTest
    MODEL_PATH=request_timeout_models
    mkdir -p ${MODEL_PATH}/identity_fp32_timeout/1
    cp ../python_models/identity_fp32_timeout/model.py ${MODEL_PATH}/identity_fp32_timeout/1/.

    SERVER_ARGS="--model-repository=$MODELDIR/$MODEL_PATH ${SERVER_ARGS_EXTRA}"
    SERVER_LOG="./$TEST_CASE.$MODEL_PATH.serverlog"

    if [ "$TEST_VALGRIND" -eq 1 ]; then
        LEAKCHECK_LOG="./$TEST_CASE.$MODEL_PATH.valgrind.log"
        LEAKCHECK_ARGS="$LEAKCHECK_ARGS_BASE --log-file=$LEAKCHECK_LOG"
        run_server_leakcheck
    else
        run_server
    fi

if [ "$SERVER_PID" == "0" ]; then
echo -e "\n***\n*** Failed to start $SERVER\n***"
cat $SERVER_LOG
exit 1
fi

echo "Test: $TEST_CASE, repository $MODEL_PATH" >>$CLIENT_LOG

set +e
python3 $BATCHER_TEST $TEST_CASE >>$CLIENT_LOG 2>&1
if [ $? -ne 0 ]; then
echo -e "\n***\n*** Test $TEST_CASE Failed\n***" >>$CLIENT_LOG
echo -e "\n***\n*** Test $TEST_CASE Failed\n***"
RET=1
else
check_test_results $TEST_RESULT_FILE 2
if [ $? -ne 0 ]; then
cat $CLIENT_LOG
echo -e "\n***\n*** Test Result Verification Failed\n***"
RET=1
fi
fi
set -e

kill_server

set +e
if [ "$TEST_VALGRIND" -eq 1 ]; then
python3 ../common/check_valgrind_log.py -f $LEAKCHECK_LOG
if [ $? -ne 0 ]; then
RET=1
fi
fi
set -e
fi

if [ $RET -eq 0 ]; then
    echo -e "\n***\n*** Test Passed\n***"
else
