Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add successful, pending and failed metrics to HeadRuntime #5374

Merged
merged 16 commits into from
Nov 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion docs/fundamentals/flow/instrumenting-flow.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,13 @@ You can find more information on the different type of metrics in Prometheus [he
|-----------------------------------------|-------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------|
| `jina_receiving_request_seconds` | [Histogram](https://opentelemetry.io/docs/reference/specification/metrics/api/#histogram) | Measures the time elapsed between receiving a request from the Gateway and sending back the response. |
| `jina_sending_request_seconds` | [Histogram](https://opentelemetry.io/docs/reference/specification/metrics/api/#histogram) | Measures the time elapsed between sending a downstream request to an Executor and receiving the response back. |
| `jina_sent_request_bytes` | [Histogram](https://opentelemetry.io/docs/reference/specification/metrics/api/#histogram) | Measures the size in bytes of the request sent by the Head to the Executor. |
| `jina_number_of_pending_requests` | [UpDownCounter](https://opentelemetry.io/docs/reference/specification/metrics/api/#updowncounter)| Counts the number of pending requests. |
| `jina_successful_requests` | [Counter](https://opentelemetry.io/docs/reference/specification/metrics/api/#counter) | Counts the number of successful requests returned by the Head. |
| `jina_failed_requests` | [Counter](https://opentelemetry.io/docs/reference/specification/metrics/api/#counter) | Counts the number of failed requests returned by the Head. |
| `jina_sent_request_bytes` | [Histogram](https://opentelemetry.io/docs/reference/specification/metrics/api/#histogram) | Measures the size in bytes of the request sent by the Head to the Executor. |
| `jina_received_response_bytes` | [Histogram](https://opentelemetry.io/docs/reference/specification/metrics/api/#histogram) | Measures the size in bytes of the response returned by the Executor. |
| `jina_received_request_bytes` | [Histogram](https://opentelemetry.io/docs/reference/specification/metrics/api/#histogram) | Measures the size of the request in bytes received at the Head level. |
| `jina_sent_response_bytes` | [Histogram](https://opentelemetry.io/docs/reference/specification/metrics/api/#histogram) | Measures the size in bytes of the response returned from the Head to the Gateway. |

#### Executor Pods

Expand Down
8 changes: 4 additions & 4 deletions jina/serve/runtimes/gateway/graph/topology_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@
from jina.excepts import InternalNetworkError
from jina.serve.networking import GrpcConnectionPool
from jina.serve.runtimes.helper import _parse_specific_params
from jina.serve.runtimes.request_handlers.worker_request_handler import (
WorkerRequestHandler,
)
from jina.serve.runtimes.worker.request_handling import WorkerRequestHandler
from jina.types.request.data import DataRequest


Expand Down Expand Up @@ -127,7 +125,9 @@ async def _wait_previous_and_send(
request.parameters = _parse_specific_params(
request.parameters, self.name
)
req_to_send = copy.deepcopy(request) if copy_request_at_send else request
req_to_send = (
copy.deepcopy(request) if copy_request_at_send else request
)
self.parts_to_send.append(req_to_send)
# this is a specific needs
if len(self.parts_to_send) == self.number_of_parts:
Expand Down
211 changes: 4 additions & 207 deletions jina/serve/runtimes/gateway/request_handling.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
import asyncio
import copy
import time
from typing import TYPE_CHECKING, Callable, List, Optional, Tuple

import grpc.aio
from docarray import DocumentArray

from docarray import DocumentArray
from jina.excepts import InternalNetworkError
from jina.importer import ImportExtensions
from jina.helper import GATEWAY_NAME
from jina.proto import jina_pb2
from jina.serve.networking import GrpcConnectionPool
from jina.serve.runtimes.gateway.graph.topology_graph import TopologyGraph
from jina.serve.runtimes.helper import _is_param_for_specific_executor
from jina.serve.runtimes.request_handlers.worker_request_handler import WorkerRequestHandler
from jina.serve.runtimes.monitoring import MonitoringRequestMixin
from jina.serve.runtimes.worker.request_handling import WorkerRequestHandler

if TYPE_CHECKING: # pragma: no cover
if TYPE_CHECKING: # pragma: no cover
from asyncio import Future

from opentelemetry.metrics import Meter
Expand All @@ -24,207 +22,6 @@
from jina.types.request import Request


class MonitoringRequestMixin:
    """
    Mixin for the request handling monitoring.

    Every metric is recorded twice when both backends are enabled: once through
    ``prometheus_client`` (when ``metrics_registry`` is given) and once through an
    OpenTelemetry ``Meter`` (when ``meter`` is given). When a backend is disabled its
    instruments are set to ``None`` and the update methods skip them.

    :param metrics_registry: optional metrics registry for prometheus used if we need to expose metrics from the executor or from the data request handler
    :param meter: optional OpenTelemetry meter used to create the histogram/counter instruments
    :param runtime_name: optional runtime_name that will be registered during monitoring
    """

    def __init__(
        self,
        metrics_registry: Optional['CollectorRegistry'] = None,
        meter: Optional['Meter'] = None,
        runtime_name: Optional[str] = None,
    ):
        # request_id -> start timestamp, one map per backend. They are filled in
        # `_update_start_request_metrics` and must be emptied when the request ends.
        self._request_init_time = {} if metrics_registry else None
        self._meter_request_init_time = {} if meter else None

        if metrics_registry:
            with ImportExtensions(
                required=True,
                help_text='You need to install the `prometheus_client` to use the montitoring functionality of jina',
            ):
                from prometheus_client import Counter, Gauge, Summary

                from jina.serve.monitoring import _SummaryDeprecated

            self._receiving_request_metrics = Summary(
                'receiving_request_seconds',
                'Time spent processing successful request',
                registry=metrics_registry,
                namespace='jina',
                labelnames=('runtime_name',),
            ).labels(runtime_name)

            self._pending_requests_metrics = Gauge(
                'number_of_pending_requests',
                'Number of pending requests',
                registry=metrics_registry,
                namespace='jina',
                labelnames=('runtime_name',),
            ).labels(runtime_name)

            self._failed_requests_metrics = Counter(
                'failed_requests',
                'Number of failed requests',
                registry=metrics_registry,
                namespace='jina',
                labelnames=('runtime_name',),
            ).labels(runtime_name)

            self._successful_requests_metrics = Counter(
                'successful_requests',
                'Number of successful requests',
                registry=metrics_registry,
                namespace='jina',
                labelnames=('runtime_name',),
            ).labels(runtime_name)

            # NOTE(review): this description is identical to the one used for
            # `sent_response_bytes` although the metric is fed with the size of the
            # *incoming* request; kept as-is because it is part of the exposed metric.
            self._request_size_metrics = _SummaryDeprecated(
                old_name='request_size_bytes',
                name='received_request_bytes',
                documentation='The size in bytes of the request returned to the client',
                namespace='jina',
                labelnames=('runtime_name',),
                registry=metrics_registry,
            ).labels(runtime_name)

            self._sent_response_bytes = Summary(
                'sent_response_bytes',
                'The size in bytes of the request returned to the client',
                namespace='jina',
                labelnames=('runtime_name',),
                registry=metrics_registry,
            ).labels(runtime_name)

        else:
            self._receiving_request_metrics = None
            self._pending_requests_metrics = None
            self._failed_requests_metrics = None
            self._successful_requests_metrics = None
            self._request_size_metrics = None
            self._sent_response_bytes = None

        if meter:
            self._receiving_request_histogram = meter.create_histogram(
                name='jina_receiving_request_seconds',
                description='Time spent processing successful request',
            )

            self._pending_requests_up_down_counter = meter.create_up_down_counter(
                name='jina_number_of_pending_requests',
                description='Number of pending requests',
            )

            self._failed_requests_counter = meter.create_counter(
                name='jina_failed_requests',
                description='Number of failed requests',
            )

            self._successful_requests_counter = meter.create_counter(
                name='jina_successful_requests',
                description='Number of successful requests',
            )

            self._request_size_histogram = meter.create_histogram(
                name='jina_received_request_bytes',
                description='The size in bytes of the request returned to the client',
            )

            self._sent_response_bytes_histogram = meter.create_histogram(
                name='jina_sent_response_bytes',
                description='The size in bytes of the request returned to the client',
            )
        else:
            self._receiving_request_histogram = None
            self._pending_requests_up_down_counter = None
            self._failed_requests_counter = None
            self._successful_requests_counter = None
            self._request_size_histogram = None
            self._sent_response_bytes_histogram = None

        # attributes attached to every OpenTelemetry measurement
        self._metric_labels = {'runtime_name': runtime_name}

    @staticmethod
    def _pop_init_time(init_times: Optional[dict], request_id: str) -> Optional[float]:
        """
        Remove and return the start timestamp stored for a request.

        :param init_times: the request_id -> timestamp map of one backend, or None when that backend is disabled
        :param request_id: id of the request that has ended
        :return: the stored timestamp, or None when the backend is disabled or the id is unknown
        """
        if init_times is None:
            return None
        return init_times.pop(request_id, None)

    def _update_start_request_metrics(self, request: 'Request'):
        """
        Record the metrics observed when a request arrives.

        Observes the request size, stores the start time of the request and
        increments the pending-requests gauge/counter.

        :param request: the incoming request
        """
        if self._request_size_metrics:
            self._request_size_metrics.observe(request.nbytes)
        if self._request_size_histogram:
            self._request_size_histogram.record(
                request.nbytes, attributes=self._metric_labels
            )

        if self._receiving_request_metrics:
            self._request_init_time[request.request_id] = time.time()
        if self._receiving_request_histogram:
            self._meter_request_init_time[request.request_id] = time.time()

        if self._pending_requests_metrics:
            self._pending_requests_metrics.inc()
        if self._pending_requests_up_down_counter:
            self._pending_requests_up_down_counter.add(
                1, attributes=self._metric_labels
            )

    def _update_end_successful_requests_metrics(self, result: 'Request'):
        """
        Record the metrics observed when a request ends successfully.

        Observes the elapsed time since the request started, decrements the
        pending-requests gauge/counter, increments the successful-requests counter
        and observes the response size.

        :param result: the response that is about to be returned
        """
        # The elapsed time is only observed for successful requests. The start time
        # is popped so that the map does not grow forever; an unknown request id is
        # tolerated so that monitoring never breaks the request flow.
        if self._receiving_request_metrics:
            init_time = self._pop_init_time(self._request_init_time, result.request_id)
            if init_time is not None:
                self._receiving_request_metrics.observe(time.time() - init_time)
        if self._receiving_request_histogram:
            init_time = self._pop_init_time(
                self._meter_request_init_time, result.request_id
            )
            if init_time is not None:
                self._receiving_request_histogram.record(
                    time.time() - init_time, attributes=self._metric_labels
                )

        if self._pending_requests_metrics:
            self._pending_requests_metrics.dec()
        if self._pending_requests_up_down_counter:
            self._pending_requests_up_down_counter.add(
                -1, attributes=self._metric_labels
            )

        if self._successful_requests_metrics:
            self._successful_requests_metrics.inc()
        if self._successful_requests_counter:
            self._successful_requests_counter.add(1, attributes=self._metric_labels)

        if self._sent_response_bytes:
            self._sent_response_bytes.observe(result.nbytes)
        if self._sent_response_bytes_histogram:
            self._sent_response_bytes_histogram.record(
                result.nbytes, attributes=self._metric_labels
            )

    def _update_end_failed_requests_metrics(self, result: Optional['Request'] = None):
        """
        Record the metrics observed when a request ends with a failure.

        Decrements the pending-requests gauge/counter and increments the
        failed-requests counter. No elapsed time is observed for failed requests.

        :param result: optional failed response; when given, the start time stored for
            its request id is discarded so that failed requests do not accumulate in memory
        """
        if result is not None:
            # Discard the start timestamps without observing them: without this,
            # every failed request would leave an entry behind forever.
            self._pop_init_time(self._request_init_time, result.request_id)
            self._pop_init_time(self._meter_request_init_time, result.request_id)

        if self._pending_requests_metrics:
            self._pending_requests_metrics.dec()
        if self._pending_requests_up_down_counter:
            self._pending_requests_up_down_counter.add(
                -1, attributes=self._metric_labels
            )

        if self._failed_requests_metrics:
            self._failed_requests_metrics.inc()
        if self._failed_requests_counter:
            self._failed_requests_counter.add(1, attributes=self._metric_labels)

    def _update_end_request_metrics(self, result: 'Request'):
        """
        Dispatch to the successful or failed end-of-request update based on the status code.

        :param result: the response that is about to be returned
        """
        if result.status.code != jina_pb2.StatusProto.ERROR:
            self._update_end_successful_requests_metrics(result)
        else:
            self._update_end_failed_requests_metrics(result)


class GatewayRequestHandler(MonitoringRequestMixin):
"""
Class that handles the requests arriving to the gateway and the result extracted from the requests future.
Expand Down
Loading