Skip to content

Commit

Permalink
refactor: refactor WorkerRequestHandler module
Browse files Browse the repository at this point in the history
  • Loading branch information
JoanFM committed Nov 10, 2022
1 parent b1a3470 commit d910673
Show file tree
Hide file tree
Showing 5 changed files with 9 additions and 9 deletions.
2 changes: 1 addition & 1 deletion jina/serve/runtimes/gateway/graph/topology_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from jina.excepts import InternalNetworkError
from jina.serve.networking import GrpcConnectionPool
from jina.serve.runtimes.helper import _parse_specific_params
from jina.serve.runtimes.worker.worker_request_handler import (
from jina.serve.runtimes.worker.request_handling import (
WorkerRequestHandler,
)
from jina.types.request.data import DataRequest
Expand Down
2 changes: 1 addition & 1 deletion jina/serve/runtimes/gateway/request_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from jina.serve.runtimes.gateway.graph.topology_graph import TopologyGraph
from jina.serve.runtimes.helper import _is_param_for_specific_executor
from jina.serve.runtimes.monitoring import MonitoringRequestMixin
from jina.serve.runtimes.worker.worker_request_handler import WorkerRequestHandler
from jina.serve.runtimes.worker.request_handling import WorkerRequestHandler

if TYPE_CHECKING: # pragma: no cover
from asyncio import Future
Expand Down
2 changes: 1 addition & 1 deletion jina/serve/runtimes/head/request_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple

from jina.serve.runtimes.monitoring import MonitoringRequestMixin
from jina.serve.runtimes.worker.worker_request_handler import WorkerRequestHandler
from jina.serve.runtimes.worker.request_handling import WorkerRequestHandler

if TYPE_CHECKING: # pragma: no cover
from jina.logging.logger import JinaLogger
Expand Down
12 changes: 6 additions & 6 deletions jina/serve/runtimes/worker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from jina.serve.instrumentation import MetricsTimer
from jina.serve.runtimes.asyncio import AsyncNewLoopRuntime
from jina.serve.runtimes.helper import _get_grpc_server_options
from jina.serve.runtimes.worker.worker_request_handler import (
from jina.serve.runtimes.worker.request_handling import (
WorkerRequestHandler,
)
from jina.types.request.data import DataRequest
Expand Down Expand Up @@ -100,7 +100,7 @@ async def async_setup(self):
# Keep this initialization order
# otherwise readiness check is not valid
# The WorkerRequestHandler needs to be started BEFORE the grpc server
self._worker_request_handler = WorkerRequestHandler(
self._request_handler = WorkerRequestHandler(
self.args,
self.logger,
self.metrics_registry,
Expand Down Expand Up @@ -165,7 +165,7 @@ async def async_teardown(self):
"""Close the data request handler"""
self._health_servicer.enter_graceful_shutdown()
await self.async_cancel()
self._worker_request_handler.close()
self._request_handler.close()

async def process_single_data(self, request: DataRequest, context) -> DataRequest:
"""
Expand All @@ -188,7 +188,7 @@ async def endpoint_discovery(self, empty, context) -> jina_pb2.EndpointsProto:
self.logger.debug('got an endpoint discovery request')
endpointsProto = jina_pb2.EndpointsProto()
endpointsProto.endpoints.extend(
list(self._worker_request_handler._executor.requests.keys())
list(self._request_handler._executor.requests.keys())
)
return endpointsProto

Expand Down Expand Up @@ -222,7 +222,7 @@ async def process_data(self, requests: List[DataRequest], context) -> DataReques
tracing_context = self._extract_tracing_context(
context.invocation_metadata()
)
result = await self._worker_request_handler.handle(
result = await self._request_handler.handle(
requests=requests, tracing_context=tracing_context
)
if self._successful_requests_metrics:
Expand All @@ -241,7 +241,7 @@ async def process_data(self, requests: List[DataRequest], context) -> DataReques
exc_info=not self.args.quiet_error,
)

requests[0].add_exception(ex, self._worker_request_handler._executor)
requests[0].add_exception(ex, self._request_handler._executor)
context.set_trailing_metadata((('is-error', 'true'),))
if self._failed_requests_metrics:
self._failed_requests_metrics.inc()
Expand Down

0 comments on commit d910673

Please sign in to comment.