Skip to content

Commit

Permalink
fix: add executor before measuring sent measure
Browse files Browse the repository at this point in the history
  • Loading branch information
JoanFM committed Nov 10, 2022
1 parent 7c18ccc commit 7e3bff3
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def __init__(
metrics_registry: Optional['CollectorRegistry'] = None,
tracer_provider: Optional['trace.TracerProvider'] = None,
meter_provider: Optional['metrics.MeterProvider'] = None,
deployment_name: str = '',
**kwargs,
):
"""Initialize private parameters and execute private loading functions.
Expand All @@ -39,6 +40,7 @@ def __init__(
:param metrics_registry: optional metrics registry for prometheus used if we need to expose metrics from the executor of from the data request handler
:param tracer_provider: Optional tracer_provider that will be provided to the executor for tracing
:param meter_provider: Optional meter_provider that will be provided to the executor for metrics
:param deployment_name: name of the deployment to use as Executor name to set in requests
:param kwargs: extra keyword arguments
"""
super().__init__()
Expand All @@ -56,6 +58,7 @@ def __init__(
else None
)
self._init_monitoring(metrics_registry, meter)
self.deployment_name = deployment_name

def _init_monitoring(
self,
Expand Down Expand Up @@ -296,6 +299,9 @@ async def handle(

docs = self._set_result(requests, return_data, docs)

for req in requests:
req.add_executor(self.deployment_name)

self._record_docs_processed_monitoring(requests, docs)
self._record_response_size_monitoring(requests)

Expand Down
13 changes: 6 additions & 7 deletions jina/serve/runtimes/worker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ def __init__(
"""
self._health_servicer = health.HealthServicer(experimental_non_blocking=True)
super().__init__(args, **kwargs)
self.deployment_name = self.name.split('/')[0]

async def async_setup(self):
"""
Expand Down Expand Up @@ -102,11 +101,12 @@ async def async_setup(self):
# otherwise readiness check is not valid
# The WorkerRequestHandler needs to be started BEFORE the grpc server
self._worker_request_handler = WorkerRequestHandler(
self.args,
self.logger,
self.metrics_registry,
self.tracer_provider,
self.meter_provider,
args=self.args,
logger=self.logger,
metrics_registry=self.metrics_registry,
tracer_provider=self.tracer_provider,
meter_provider=self.meter_provider,
deployment_name=self.name.split('/')[0]
)
await self._async_setup_grpc_server()

Expand Down Expand Up @@ -226,7 +226,6 @@ async def process_data(self, requests: List[DataRequest], context) -> DataReques
result = await self._worker_request_handler.handle(
requests=requests, tracing_context=tracing_context
)
result.add_executor(self.deployment_name)
if self._successful_requests_metrics:
self._successful_requests_metrics.inc()
if self._successful_requests_counter:
Expand Down

0 comments on commit 7e3bff3

Please sign in to comment.