Skip to content

Commit

Permalink
Revert "fix: don't check for exception a second time since send_reque…
Browse files Browse the repository at this point in the history
…sts_once handles the exception"

This reverts commit cf2c75a.
  • Loading branch information
Girish Chandrashekar committed Nov 3, 2022
1 parent 2346d98 commit 651acb9
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 10 deletions.
7 changes: 6 additions & 1 deletion jina/serve/runtimes/gateway/graph/topology_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import grpc.aio
from grpc.aio import AioRpcError

from jina import __default_endpoint__
from jina.excepts import InternalNetworkError
from jina.serve.networking import GrpcConnectionPool
Expand Down Expand Up @@ -159,7 +160,7 @@ async def _wait_previous_and_send(
return request, metadata
# otherwise, send to executor and get response
try:
resp, metadata = await connection_pool.send_requests_once(
result = await connection_pool.send_requests_once(
requests=self.parts_to_send,
deployment=self.name,
metadata=self._metadata,
Expand All @@ -168,6 +169,10 @@ async def _wait_previous_and_send(
timeout=self._timeout_send,
retries=self._retries,
)
if isinstance(result, (AioRpcError, InternalNetworkError)):
raise result
else:
resp, metadata = result
if WorkerRequestHandler._KEY_RESULT in resp.parameters:
# Accumulate results from each Node and then add them to the original
self.result_in_params_returned = resp.parameters[
Expand Down
21 changes: 12 additions & 9 deletions jina/serve/runtimes/head/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,16 +338,18 @@ async def _handle_data_request(

uses_before_metadata = None
if self.uses_before_address:
(
response,
uses_before_metadata,
) = await self.connection_pool.send_requests_once(
uses_before_result = await self.connection_pool.send_requests_once(
requests,
deployment='uses_before',
timeout=self.timeout_send,
retries=self._retries,
)
requests = [response]

if isinstance(uses_before_result, (AioRpcError, InternalNetworkError)):
raise uses_before_result
else:
(response, uses_before_metadata) = uses_before_result
requests = [response]

(
worker_results,
Expand All @@ -369,15 +371,16 @@ async def _handle_data_request(
response_request = worker_results[0]
uses_after_metadata = None
if self.uses_after_address:
(
response_request,
uses_after_metadata,
) = await self.connection_pool.send_requests_once(
uses_after_result = await self.connection_pool.send_requests_once(
worker_results,
deployment='uses_after',
timeout=self.timeout_send,
retries=self._retries,
)
if isinstance(uses_after_result, (AioRpcError, InternalNetworkError)):
raise uses_after_result
else:
(response_request, uses_after_metadata) = uses_after_result
elif len(worker_results) > 1 and self._reduce:
response_request = WorkerRequestHandler.reduce_requests(worker_results)
elif len(worker_results) > 1 and not self._reduce:
Expand Down

0 comments on commit 651acb9

Please sign in to comment.