Revert "Add E2E Promeheus metrics to applications (#845)"
This reverts commit a6998a1.
Spycsh authored Nov 4, 2024
1 parent a6998a1 commit 43e6824
Showing 6 changed files with 14 additions and 71 deletions.
8 changes: 4 additions & 4 deletions comps/cores/mega/base_statistics.py
@@ -21,7 +21,7 @@ def append_latency(self, latency, first_token_latency=None):
         if first_token_latency:
             self.first_token_latencies.append(first_token_latency)

-    def calculate_statistics(self):
+    def calcuate_statistics(self):
         if not self.response_times:
             return {
                 "p50_latency": None,
@@ -42,7 +42,7 @@ def calculate_statistics(self):
             "average_latency": avg,
         }

-    def calculate_first_token_statistics(self):
+    def calcuate_first_token_statistics(self):
         if not self.first_token_latencies:
             return {
                 "p50_latency_first_token": None,
@@ -79,7 +79,7 @@ def collect_all_statistics():
     results = {}
     if statistics_dict:
         for name, statistic in statistics_dict.items():
-            tmp_dict = statistic.calculate_statistics()
-            tmp_dict.update(statistic.calculate_first_token_statistics())
+            tmp_dict = statistic.calcuate_statistics()
+            tmp_dict.update(statistic.calcuate_first_token_statistics())
             results.update({name: tmp_dict})
     return results
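
Note that these hunks restore the original misspelled method names (calcuate_*), which the reverted commit had corrected in passing. For reference, a minimal sketch of the percentile computation such methods perform — the numpy calls and variable names are illustrative assumptions, not the module's verbatim code:

    import numpy as np

    def calculate_statistics(response_times):
        # Mirror the early return above when no samples were recorded.
        if not response_times:
            return {"p50_latency": None, "p99_latency": None, "average_latency": None}
        times = np.array(response_times)
        return {
            "p50_latency": np.percentile(times, 50),   # median latency
            "p99_latency": np.percentile(times, 99),   # tail latency
            "average_latency": float(times.mean()),
        }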
1 change: 0 additions & 1 deletion comps/cores/mega/gateway.py
@@ -42,7 +42,6 @@ def __init__(
         self.input_datatype = input_datatype
         self.output_datatype = output_datatype
         self.service = MicroService(
-            self.__class__.__name__,
             service_role=ServiceRoleType.MEGASERVICE,
             service_type=ServiceType.GATEWAY,
             host=self.host,
11 changes: 1 addition & 10 deletions comps/cores/mega/http_service.py
@@ -33,16 +33,7 @@ def __init__(
         self.uvicorn_kwargs = uvicorn_kwargs or {}
         self.cors = cors
         self._app = self._create_app()
-
-        # remove part before '@', used by register_microservice() callers, and
-        # part after '/', added by MicroService(), to get real service name
-        suffix = self.title.split("/")[0].split("@")[-1].lower()
-        instrumentator = Instrumentator(
-            inprogress_name=f"http_requests_inprogress_{suffix}",
-            should_instrument_requests_inprogress=True,
-            inprogress_labels=True,
-        )
-        instrumentator.instrument(self._app).expose(self._app)
+        Instrumentator().instrument(self._app).expose(self._app)

     @property
     def app(self):
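
Both sides of this hunk rely on prometheus-fastapi-instrumentator: the deleted code derived a per-service suffix from self.title so each service's in-progress metric got a unique name, while the restored one-liner falls back to the library's shared defaults. A minimal standalone sketch of the restored form, assuming a plain FastAPI app (the app itself is illustrative):

    from fastapi import FastAPI
    from prometheus_fastapi_instrumentator import Instrumentator

    app = FastAPI()

    # Register default HTTP metrics (request counts, latencies)
    # and expose them on the /metrics endpoint.
    Instrumentator().instrument(app).expose(app)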
4 changes: 2 additions & 2 deletions comps/cores/mega/micro_service.py
@@ -17,7 +17,7 @@ class MicroService:

     def __init__(
         self,
-        name: str,
+        name: Optional[str] = None,
         service_role: ServiceRoleType = ServiceRoleType.MICROSERVICE,
         service_type: ServiceType = ServiceType.LLM,
         protocol: str = "http",
@@ -141,7 +141,7 @@ def endpoint_path(self):


 def register_microservice(
-    name: str,
+    name: Optional[str] = None,
     service_role: ServiceRoleType = ServiceRoleType.MICROSERVICE,
     service_type: ServiceType = ServiceType.UNDEFINED,
     protocol: str = "http",
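
These two hunks pair with the gateway.py and http_service.py changes above: the reverted commit made name mandatory (the gateway passed self.__class__.__name__) precisely so http_service.py could derive a unique Prometheus suffix from the resulting title. A sketch of that now-deleted derivation, using a hypothetical title in the format described by the deleted comment ("<name from register_microservice() callers>/<part added by MicroService()>"):

    # Hypothetical title; the real value depends on the registered service name.
    title = "opea_service@llm/MicroService"
    suffix = title.split("/")[0].split("@")[-1].lower()
    print(suffix)  # -> "llm"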
51 changes: 5 additions & 46 deletions comps/cores/mega/orchestrator.py
@@ -6,13 +6,11 @@
 import json
 import os
 import re
-import time
 from typing import Dict, List

 import aiohttp
 import requests
 from fastapi.responses import StreamingResponse
-from prometheus_client import Histogram
 from pydantic import BaseModel

 from ..proto.docarray import LLMParams
@@ -24,36 +22,10 @@
 LOGFLAG = os.getenv("LOGFLAG", False)


-class OrchestratorMetrics:
-    # Because:
-    # - CI creates several orchestrator instances
-    # - Prometheus requires metrics to be singletons
-    # - Oorchestror instances are not provided their own names
-    # Metrics are class members with "megaservice" name prefix
-    first_token_latency = Histogram("megaservice_first_token_latency", "First token latency (histogram)")
-    inter_token_latency = Histogram("megaservice_inter_token_latency", "Inter-token latency (histogram)")
-    request_latency = Histogram("megaservice_request_latency", "Whole request/reply latency (histogram)")
-
-    def __init__(self) -> None:
-        pass
-
-    def token_update(self, token_start: float, is_first: bool) -> float:
-        now = time.time()
-        if is_first:
-            self.first_token_latency.observe(now - token_start)
-        else:
-            self.inter_token_latency.observe(now - token_start)
-        return now
-
-    def request_update(self, req_start: float) -> None:
-        self.request_latency.observe(time.time() - req_start)
-
-
 class ServiceOrchestrator(DAG):
     """Manage 1 or N micro services in a DAG through Python API."""

     def __init__(self) -> None:
-        self.metrics = OrchestratorMetrics()
         self.services = {}  # all services, id -> service
         super().__init__()

Expand All @@ -80,12 +52,11 @@ async def schedule(self, initial_inputs: Dict | BaseModel, llm_parameters: LLMPa
if LOGFLAG:
logger.info(initial_inputs)

req_start = time.time()
timeout = aiohttp.ClientTimeout(total=1000)
async with aiohttp.ClientSession(trust_env=True, timeout=timeout) as session:
pending = {
asyncio.create_task(
self.execute(session, req_start, node, initial_inputs, runtime_graph, llm_parameters, **kwargs)
self.execute(session, node, initial_inputs, runtime_graph, llm_parameters, **kwargs)
)
for node in self.ind_nodes()
}
@@ -130,9 +101,7 @@ def fake_stream(text):
                     inputs = self.process_outputs(runtime_graph.predecessors(d_node), result_dict)
                     pending.add(
                         asyncio.create_task(
-                            self.execute(
-                                session, req_start, d_node, inputs, runtime_graph, llm_parameters, **kwargs
-                            )
+                            self.execute(session, d_node, inputs, runtime_graph, llm_parameters, **kwargs)
                         )
                     )
             nodes_to_keep = []
@@ -159,7 +128,6 @@ def process_outputs(self, prev_nodes: List, result_dict: Dict) -> Dict:
     async def execute(
         self,
         session: aiohttp.client.ClientSession,
-        req_start: float,
         cur_node: str,
         inputs: Dict,
         runtime_graph: DAG,
@@ -176,6 +144,7 @@
         for field, value in llm_parameters_dict.items():
             if inputs.get(field) != value:
                 inputs[field] = value
+
         # pre-process
         inputs = self.align_inputs(inputs, cur_node, runtime_graph, llm_parameters_dict, **kwargs)

@@ -202,11 +171,8 @@
                 downstream_endpoint = self.services[downstream[0]].endpoint_path

             def generate():
-                token_start = req_start
                 if response:
-                    # response.elapsed = time until first headers received
                     buffered_chunk_str = ""
-                    is_first = True
                     for chunk in response.iter_content(chunk_size=None):
                         if chunk:
                             if downstream:
@@ -225,15 +191,9 @@
                                 else:
                                     raise Exception("Other response types not supported yet!")
                                 buffered_chunk_str = ""  # clear
-                                yield from self.token_generator(
-                                    res_txt, token_start, is_first=is_first, is_last=is_last
-                                )
-                                token_start = time.time()
+                                yield from self.token_generator(res_txt, is_last=is_last)
                             else:
                                 yield chunk
-                            token_start = self.metrics.token_update(token_start, is_first)
-                            is_first = False
-                self.metrics.request_update(req_start)

             return (
                 StreamingResponse(self.align_generator(generate(), **kwargs), media_type="text/event-stream"),
@@ -296,12 +256,11 @@ def extract_chunk_str(self, chunk_str):
             chunk_str = chunk_str[: -len(suffix)]
         return chunk_str

-    def token_generator(self, sentence: str, token_start: float, is_first: bool, is_last: bool) -> str:
+    def token_generator(self, sentence, is_last=False):
         prefix = "data: "
         suffix = "\n\n"
         tokens = re.findall(r"\s?\S+\s?", sentence, re.UNICODE)
         for token in tokens:
             yield prefix + repr(token.replace("\\n", "\n").encode("utf-8")) + suffix
-            token_start = self.metrics.token_update(token_start, is_first)
         if is_last:
             yield "data: [DONE]\n\n"
10 changes: 2 additions & 8 deletions tests/cores/mega/test_service_orchestrator_streaming.py
@@ -2,7 +2,6 @@
 # SPDX-License-Identifier: Apache-2.0

 import json
-import time
 import unittest

 from fastapi.responses import StreamingResponse
@@ -67,16 +66,11 @@ def test_extract_chunk_str(self):
         self.assertEqual(res, "example test.")

     def test_token_generator(self):
-        start = time.time()
         sentence = "I write an example test.</s>"
-        for i in self.service_builder.token_generator(
-            sentence=sentence, token_start=start, is_first=True, is_last=False
-        ):
+        for i in self.service_builder.token_generator(sentence=sentence, is_last=False):
             self.assertTrue(i.startswith("data: b'"))

-        for i in self.service_builder.token_generator(
-            sentence=sentence, token_start=start, is_first=False, is_last=True
-        ):
+        for i in self.service_builder.token_generator(sentence=sentence, is_last=True):
             self.assertTrue(i.startswith("data: "))

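For clarity on the two assertions: each token is yielded as "data: " plus the repr() of a bytes object, so token chunks start with "data: b'", while the final [DONE] sentinel only matches the weaker "data: " prefix. A hand-worked example of the restored generator's output (derived from the code above, not from running the repo):

    # token_generator(sentence="Hi there</s>", is_last=True) yields:
    #   "data: b'Hi '\n\n"
    #   "data: b'there</s>'\n\n"
    #   "data: [DONE]\n\n"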
