From 6ddd8082913de729eefb12add770bd34652e894f Mon Sep 17 00:00:00 2001 From: kthui <18255193+kthui@users.noreply.github.com> Date: Tue, 10 Oct 2023 18:24:45 -0700 Subject: [PATCH 1/5] Add ability to cancel AsyncIO gRPC stream or non-stream requests --- .../library/tritonclient/grpc/aio/__init__.py | 44 ++++++++++++++----- 1 file changed, 32 insertions(+), 12 deletions(-) diff --git a/src/python/library/tritonclient/grpc/aio/__init__.py b/src/python/library/tritonclient/grpc/aio/__init__.py index fc5eaccdb..37414dacb 100755 --- a/src/python/library/tritonclient/grpc/aio/__init__.py +++ b/src/python/library/tritonclient/grpc/aio/__init__.py @@ -624,7 +624,7 @@ async def infer( except grpc.RpcError as rpc_error: raise_error_grpc(rpc_error) - async def stream_infer( + def stream_infer( self, inputs_iterator, stream_timeout=None, @@ -636,7 +636,7 @@ async def stream_infer( Parameters ---------- - inputs_iterator : async_generator + inputs_iterator : asynchronous iterator Async iterator that yields a dict(s) consists of the input parameters to the async_stream_infer function defined in tritonclient.grpc.InferenceServerClient. @@ -653,9 +653,15 @@ async def stream_infer( Returns ------- - async_generator + asynchronous iterator Yield tuple holding (InferResult, InferenceServerException) objects. + This object can be used to cancel the inference request like below: + ---------- + it = stream_infer(...) + ret = it.cancel() + ---------- + Raises ------ InferenceServerException @@ -708,14 +714,16 @@ async def _request_iterator(inputs_iterator): parameters=inputs["parameters"], ) - try: - response_iterator = self._client_stub.ModelStreamInfer( - _request_iterator(inputs_iterator), - metadata=metadata, - timeout=stream_timeout, - compression=_grpc_compression_type(compression_algorithm), - ) - async for response in response_iterator: + class _ResponseIterator: + def __init__(self, grpc_call, verbose): + self._grpc_call = grpc_call + self._verbose = verbose + + def __aiter__(self): + return self + + async def __anext__(self): + response = await self._grpc_call.__aiter__().__anext__() if self._verbose: print(response) result = error = None @@ -723,6 +731,18 @@ async def _request_iterator(inputs_iterator): error = InferenceServerException(msg=response.error_message) else: result = InferResult(response.infer_response) - yield (result, error) + return result, error + + def cancel(self): + return self._grpc_call.cancel() + + try: + grpc_call = self._client_stub.ModelStreamInfer( + _request_iterator(inputs_iterator), + metadata=metadata, + timeout=stream_timeout, + compression=_grpc_compression_type(compression_algorithm), + ) + return _ResponseIterator(grpc_call, self._verbose) except grpc.RpcError as rpc_error: raise_error_grpc(rpc_error) From 80215a725d3cca11748acfc934da38216b3f1a44 Mon Sep 17 00:00:00 2001 From: kthui <18255193+kthui@users.noreply.github.com> Date: Thu, 12 Oct 2023 13:25:13 -0700 Subject: [PATCH 2/5] Add docs on AsyncIO request cancellation --- README.md | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/README.md b/README.md index d969468bc..c39c800e1 100644 --- a/README.md +++ b/README.md @@ -547,6 +547,23 @@ sent via this stream. client.stop_stream(cancel_requests=True) ``` +For AsyncIO requests, an AsyncIO task wrapping an `infer()` coroutine can be +safely cancelled. + +```python + infer_task = asyncio.create_task(aio_client.infer()) + await something_else + infer_task.cancel() +``` + +For AsyncIO streaming requests, `cancel()` can be called on the asynchronous +iterator returned by `stream_infer()` API. + +```python + responses_iterator = aio_client.stream_infer() + responses_iterator.cancel() +``` + See more details about these APIs in [grpc/\_client.py](src/python/library/tritonclient/grpc/_client.py). From 6d21e88d3508509fcb346a9fd09e7bf3030fac76 Mon Sep 17 00:00:00 2001 From: kthui <18255193+kthui@users.noreply.github.com> Date: Thu, 12 Oct 2023 14:52:37 -0700 Subject: [PATCH 3/5] Improve AsyncIO docs --- README.md | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index c39c800e1..7ebc4ce60 100644 --- a/README.md +++ b/README.md @@ -547,8 +547,11 @@ sent via this stream. client.stop_stream(cancel_requests=True) ``` -For AsyncIO requests, an AsyncIO task wrapping an `infer()` coroutine can be -safely cancelled. +See more details about these APIs in +[grpc/\_client.py](src/python/library/tritonclient/grpc/_client.py). + +For gRPC AsyncIO requests, an AsyncIO task wrapping an `infer()` coroutine can +be safely cancelled. ```python infer_task = asyncio.create_task(aio_client.infer()) @@ -556,8 +559,8 @@ safely cancelled. infer_task.cancel() ``` -For AsyncIO streaming requests, `cancel()` can be called on the asynchronous -iterator returned by `stream_infer()` API. +For gRPC AsyncIO streaming requests, `cancel()` can be called on the +asynchronous iterator returned by `stream_infer()` API. ```python responses_iterator = aio_client.stream_infer() @@ -565,7 +568,7 @@ iterator returned by `stream_infer()` API. ``` See more details about these APIs in -[grpc/\_client.py](src/python/library/tritonclient/grpc/_client.py). +[grpc/\aio/\__init__.py](src/python/library/tritonclient/grpc/aio/__init__.py). See [request_cancellation](https://github.com/triton-inference-server/server/blob/main/docs/user_guide/request_cancellation.md) in the server user-guide to learn about how this is handled on the From 2f7044f93e9e7feeca7b02e131e6c09370436e10 Mon Sep 17 00:00:00 2001 From: Jacky <18255193+kthui@users.noreply.github.com> Date: Thu, 12 Oct 2023 17:46:47 -0700 Subject: [PATCH 4/5] Improve example code styling Co-authored-by: Ryan McCormick --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 7ebc4ce60..b9890d99e 100644 --- a/README.md +++ b/README.md @@ -554,7 +554,7 @@ For gRPC AsyncIO requests, an AsyncIO task wrapping an `infer()` coroutine can be safely cancelled. ```python - infer_task = asyncio.create_task(aio_client.infer()) + infer_task = asyncio.create_task(aio_client.infer(...)) await something_else infer_task.cancel() ``` @@ -563,7 +563,7 @@ For gRPC AsyncIO streaming requests, `cancel()` can be called on the asynchronous iterator returned by `stream_infer()` API. ```python - responses_iterator = aio_client.stream_infer() + responses_iterator = aio_client.stream_infer(...) responses_iterator.cancel() ``` From e6bd54c57d363084a8ec66577e84feb50beeeb84 Mon Sep 17 00:00:00 2001 From: kthui <18255193+kthui@users.noreply.github.com> Date: Thu, 12 Oct 2023 17:51:02 -0700 Subject: [PATCH 5/5] Skip await on documentation --- README.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/README.md b/README.md index b9890d99e..787d21677 100644 --- a/README.md +++ b/README.md @@ -555,7 +555,6 @@ be safely cancelled. ```python infer_task = asyncio.create_task(aio_client.infer(...)) - await something_else infer_task.cancel() ``` @@ -568,7 +567,7 @@ asynchronous iterator returned by `stream_infer()` API. ``` See more details about these APIs in -[grpc/\aio/\__init__.py](src/python/library/tritonclient/grpc/aio/__init__.py). +[grpc/aio/\__init__.py](src/python/library/tritonclient/grpc/aio/__init__.py). See [request_cancellation](https://github.com/triton-inference-server/server/blob/main/docs/user_guide/request_cancellation.md) in the server user-guide to learn about how this is handled on the