diff --git a/comps/cores/mega/base_statistics.py b/comps/cores/mega/base_statistics.py
index e22840c8b..b0285e73f 100644
--- a/comps/cores/mega/base_statistics.py
+++ b/comps/cores/mega/base_statistics.py
@@ -21,7 +21,7 @@ def append_latency(self, latency, first_token_latency=None):
         if first_token_latency:
             self.first_token_latencies.append(first_token_latency)
 
-    def calcuate_statistics(self):
+    def calculate_statistics(self):
         if not self.response_times:
             return {
                 "p50_latency": None,
@@ -42,7 +42,7 @@ def calcuate_statistics(self):
             "average_latency": avg,
         }
 
-    def calcuate_first_token_statistics(self):
+    def calculate_first_token_statistics(self):
         if not self.first_token_latencies:
             return {
                 "p50_latency_first_token": None,
@@ -79,7 +79,7 @@ def collect_all_statistics():
     results = {}
     if statistics_dict:
         for name, statistic in statistics_dict.items():
-            tmp_dict = statistic.calcuate_statistics()
-            tmp_dict.update(statistic.calcuate_first_token_statistics())
+            tmp_dict = statistic.calculate_statistics()
+            tmp_dict.update(statistic.calculate_first_token_statistics())
             results.update({name: tmp_dict})
     return results
diff --git a/comps/cores/mega/gateway.py b/comps/cores/mega/gateway.py
index 3c5792843..148143ba7 100644
--- a/comps/cores/mega/gateway.py
+++ b/comps/cores/mega/gateway.py
@@ -42,6 +42,7 @@ def __init__(
         self.input_datatype = input_datatype
         self.output_datatype = output_datatype
         self.service = MicroService(
+            self.__class__.__name__,
             service_role=ServiceRoleType.MEGASERVICE,
             service_type=ServiceType.GATEWAY,
             host=self.host,
diff --git a/comps/cores/mega/http_service.py b/comps/cores/mega/http_service.py
index 86c5ba8d5..62b9cd5e3 100644
--- a/comps/cores/mega/http_service.py
+++ b/comps/cores/mega/http_service.py
@@ -33,7 +33,16 @@ def __init__(
         self.uvicorn_kwargs = uvicorn_kwargs or {}
         self.cors = cors
         self._app = self._create_app()
-        Instrumentator().instrument(self._app).expose(self._app)
+
+        # remove part before '@', used by register_microservice() callers, and
+        # part after '/', added by MicroService(), to get real service name
+        suffix = self.title.split("/")[0].split("@")[-1].lower()
+        instrumentator = Instrumentator(
+            inprogress_name=f"http_requests_inprogress_{suffix}",
+            should_instrument_requests_inprogress=True,
+            inprogress_labels=True,
+        )
+        instrumentator.instrument(self._app).expose(self._app)
 
     @property
     def app(self):
diff --git a/comps/cores/mega/micro_service.py b/comps/cores/mega/micro_service.py
index 9d707fa68..89e4cd944 100644
--- a/comps/cores/mega/micro_service.py
+++ b/comps/cores/mega/micro_service.py
@@ -17,7 +17,7 @@ class MicroService:
 
     def __init__(
         self,
-        name: Optional[str] = None,
+        name: str,
         service_role: ServiceRoleType = ServiceRoleType.MICROSERVICE,
         service_type: ServiceType = ServiceType.LLM,
         protocol: str = "http",
@@ -141,7 +141,7 @@ def endpoint_path(self):
 
 
 def register_microservice(
-    name: Optional[str] = None,
+    name: str,
     service_role: ServiceRoleType = ServiceRoleType.MICROSERVICE,
     service_type: ServiceType = ServiceType.UNDEFINED,
     protocol: str = "http",
diff --git a/comps/cores/mega/orchestrator.py b/comps/cores/mega/orchestrator.py
index 76c2f1336..72a5aa66f 100644
--- a/comps/cores/mega/orchestrator.py
+++ b/comps/cores/mega/orchestrator.py
@@ -6,11 +6,13 @@
 import json
 import os
 import re
+import time
 from typing import Dict, List
 
 import aiohttp
 import requests
 from fastapi.responses import StreamingResponse
+from prometheus_client import Histogram
 from pydantic import BaseModel
 
 from ..proto.docarray import LLMParams
@@ -22,10 +24,36 @@ LOGFLAG = os.getenv("LOGFLAG", False)
+class OrchestratorMetrics:
+    # Because:
+    # - CI creates several orchestrator instances
+    # - Prometheus requires metrics to be singletons
+    # - Orchestrator instances are not provided their own names
+    # the metrics are class members with a "megaservice" name prefix
+    first_token_latency = Histogram("megaservice_first_token_latency", "First token latency (histogram)")
+    inter_token_latency = Histogram("megaservice_inter_token_latency", "Inter-token latency (histogram)")
+    request_latency = Histogram("megaservice_request_latency", "Whole request/reply latency (histogram)")
+
+    def __init__(self) -> None:
+        pass
+
+    def token_update(self, token_start: float, is_first: bool) -> float:
+        now = time.time()
+        if is_first:
+            self.first_token_latency.observe(now - token_start)
+        else:
+            self.inter_token_latency.observe(now - token_start)
+        return now
+
+    def request_update(self, req_start: float) -> None:
+        self.request_latency.observe(time.time() - req_start)
+
+
 class ServiceOrchestrator(DAG):
     """Manage 1 or N micro services in a DAG through Python API."""
 
     def __init__(self) -> None:
+        self.metrics = OrchestratorMetrics()
         self.services = {}  # all services, id -> service
         super().__init__()
@@ -52,11 +80,12 @@ async def schedule(self, initial_inputs: Dict | BaseModel, llm_parameters: LLMPa
         if LOGFLAG:
             logger.info(initial_inputs)
 
+        req_start = time.time()
         timeout = aiohttp.ClientTimeout(total=1000)
         async with aiohttp.ClientSession(trust_env=True, timeout=timeout) as session:
             pending = {
                 asyncio.create_task(
-                    self.execute(session, node, initial_inputs, runtime_graph, llm_parameters, **kwargs)
+                    self.execute(session, req_start, node, initial_inputs, runtime_graph, llm_parameters, **kwargs)
                 )
                 for node in self.ind_nodes()
             }
@@ -101,7 +130,9 @@ def fake_stream(text):
                         inputs = self.process_outputs(runtime_graph.predecessors(d_node), result_dict)
                         pending.add(
                             asyncio.create_task(
-                                self.execute(session, d_node, inputs, runtime_graph, llm_parameters, **kwargs)
+                                self.execute(
+                                    session, req_start, d_node, inputs, runtime_graph, llm_parameters, **kwargs
+                                )
                             )
                         )
                 nodes_to_keep = []
@@ -128,6 +159,7 @@ def process_outputs(self, prev_nodes: List, result_dict: Dict) -> Dict:
     async def execute(
         self,
         session: aiohttp.client.ClientSession,
+        req_start: float,
         cur_node: str,
         inputs: Dict,
         runtime_graph: DAG,
@@ -144,7 +176,6 @@ async def execute(
             for field, value in llm_parameters_dict.items():
                 if inputs.get(field) != value:
                     inputs[field] = value
-
         # pre-process
         inputs = self.align_inputs(inputs, cur_node, runtime_graph, llm_parameters_dict, **kwargs)
@@ -171,8 +202,11 @@ async def execute(
             downstream_endpoint = self.services[downstream[0]].endpoint_path
 
         def generate():
+            token_start = req_start
             if response:
+                # response.elapsed = time until first headers received
                 buffered_chunk_str = ""
+                is_first = True
                 for chunk in response.iter_content(chunk_size=None):
                     if chunk:
                         if downstream:
@@ -191,9 +225,15 @@ def generate():
                             else:
                                 raise Exception("Other response types not supported yet!")
                             buffered_chunk_str = ""  # clear
-                            yield from self.token_generator(res_txt, is_last=is_last)
+                            yield from self.token_generator(
+                                res_txt, token_start, is_first=is_first, is_last=is_last
+                            )
+                            token_start = time.time()
                         else:
                             yield chunk
+                        token_start = self.metrics.token_update(token_start, is_first)
+                        is_first = False
+            self.metrics.request_update(req_start)
 
         return (
             StreamingResponse(self.align_generator(generate(), **kwargs), media_type="text/event-stream"),
@@ -256,11 +296,12 @@ def extract_chunk_str(self, chunk_str):
             chunk_str = chunk_str[: -len(suffix)]
         return chunk_str
 
-    def token_generator(self, sentence, is_last=False):
+    def token_generator(self, sentence: str, token_start: float, is_first: bool, is_last: bool) -> str:
         prefix = "data: "
         suffix = "\n\n"
         tokens = re.findall(r"\s?\S+\s?", sentence, re.UNICODE)
         for token in tokens:
             yield prefix + repr(token.replace("\\n", "\n").encode("utf-8")) + suffix
+            token_start = self.metrics.token_update(token_start, is_first)
         if is_last:
             yield "data: [DONE]\n\n"
diff --git a/tests/cores/mega/test_service_orchestrator_streaming.py b/tests/cores/mega/test_service_orchestrator_streaming.py
index cd46507a8..d2331dab6 100644
--- a/tests/cores/mega/test_service_orchestrator_streaming.py
+++ b/tests/cores/mega/test_service_orchestrator_streaming.py
@@ -2,6 +2,7 @@
 # SPDX-License-Identifier: Apache-2.0
 
 import json
+import time
 import unittest
 
 from fastapi.responses import StreamingResponse
@@ -66,11 +67,16 @@ def test_extract_chunk_str(self):
         self.assertEqual(res, "example test.")
 
     def test_token_generator(self):
+        start = time.time()
         sentence = "I write an example test."
-        for i in self.service_builder.token_generator(sentence=sentence, is_last=False):
+        for i in self.service_builder.token_generator(
+            sentence=sentence, token_start=start, is_first=True, is_last=False
+        ):
             self.assertTrue(i.startswith("data: b'"))
-        for i in self.service_builder.token_generator(sentence=sentence, is_last=True):
+        for i in self.service_builder.token_generator(
+            sentence=sentence, token_start=start, is_first=False, is_last=True
+        ):
             self.assertTrue(i.startswith("data: "))
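
The OrchestratorMetrics hunk above is the heart of the change: the first observe() records time-to-first-token measured from the request start, each later observe() records the gap since the previous token, and request_update() records the whole round trip. Below is a minimal, self-contained sketch of that timing pattern (not part of the diff); the demo_ metric names are placeholders, and only the prometheus_client package is assumed:

    import time

    from prometheus_client import Histogram, generate_latest

    # Placeholder names; the diff uses a "megaservice_" prefix instead.
    first_token_latency = Histogram("demo_first_token_latency", "First token latency (histogram)")
    inter_token_latency = Histogram("demo_inter_token_latency", "Inter-token latency (histogram)")
    request_latency = Histogram("demo_request_latency", "Whole request/reply latency (histogram)")

    def token_update(token_start: float, is_first: bool) -> float:
        # First token: time-to-first-token from request start;
        # later tokens: gap since the previous token was emitted.
        now = time.time()
        if is_first:
            first_token_latency.observe(now - token_start)
        else:
            inter_token_latency.observe(now - token_start)
        return now

    req_start = time.time()
    token_start = req_start
    for i, _token in enumerate(["a", "stream", "of", "tokens"]):
        token_start = token_update(token_start, is_first=(i == 0))
    request_latency.observe(time.time() - req_start)

    print(generate_latest().decode())  # Prometheus exposition text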
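Similarly, the http_service.py hunk gives every microservice its own in-progress metric name, so several services in one process no longer collide on a single gauge. Here is a sketch of the same Instrumentator configuration against a bare FastAPI app, assuming prometheus-fastapi-instrumentator is installed; the title string is a hypothetical example of the '@' and '/' naming convention the diff's comment describes:

    from fastapi import FastAPI
    from prometheus_fastapi_instrumentator import Instrumentator

    # Hypothetical title: "<caller prefix>@<service name>/<class name>"
    app = FastAPI(title="opea_service@example_llm/MicroService")

    # Same derivation as in http_service.py: keep only the real service name
    suffix = app.title.split("/")[0].split("@")[-1].lower()

    Instrumentator(
        inprogress_name=f"http_requests_inprogress_{suffix}",
        should_instrument_requests_inprogress=True,
        inprogress_labels=True,
    ).instrument(app).expose(app)  # serves the metrics at /metrics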