diff --git a/comps/__init__.py b/comps/__init__.py index 624260e99..6b9bd0d9f 100644 --- a/comps/__init__.py +++ b/comps/__init__.py @@ -29,3 +29,6 @@ # Telemetry from comps.cores.telemetry.opea_telemetry import opea_telemetry + +# Statistics +from comps.cores.mega.base_statistics import statistics_dict, register_statistics diff --git a/comps/cores/mega/base_statistics.py b/comps/cores/mega/base_statistics.py new file mode 100644 index 000000000..e22840c8b --- /dev/null +++ b/comps/cores/mega/base_statistics.py @@ -0,0 +1,85 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import numpy as np + +# name => statistic dict +statistics_dict = {} + + +class BaseStatistics: + """Base class to store in-memory statistics of an entity for measurement in one service.""" + + def __init__( + self, + ): + self.response_times = [] # store responses time for all requests + self.first_token_latencies = [] # store first token latencies for all requests + + def append_latency(self, latency, first_token_latency=None): + self.response_times.append(latency) + if first_token_latency: + self.first_token_latencies.append(first_token_latency) + + def calcuate_statistics(self): + if not self.response_times: + return { + "p50_latency": None, + "p99_latency": None, + "average_latency": None, + } + # Calculate the P50 (median) + p50 = np.percentile(self.response_times, 50) + + # Calculate the P99 + p99 = np.percentile(self.response_times, 99) + + avg = np.average(self.response_times) + + return { + "p50_latency": p50, + "p99_latency": p99, + "average_latency": avg, + } + + def calcuate_first_token_statistics(self): + if not self.first_token_latencies: + return { + "p50_latency_first_token": None, + "p99_latency_first_token": None, + "average_latency_first_token": None, + } + # Calculate the P50 (median) + p50 = np.percentile(self.first_token_latencies, 50) + + # Calculate the P99 + p99 = np.percentile(self.first_token_latencies, 99) + + avg = np.average(self.first_token_latencies) + + return { + "p50_latency_first_token": p50, + "p99_latency_first_token": p99, + "average_latency_first_token": avg, + } + + +def register_statistics( + names, +): + def decorator(func): + for name in names: + statistics_dict[name] = BaseStatistics() + return func + + return decorator + + +def collect_all_statistics(): + results = {} + if statistics_dict: + for name, statistic in statistics_dict.items(): + tmp_dict = statistic.calcuate_statistics() + tmp_dict.update(statistic.calcuate_first_token_statistics()) + results.update({name: tmp_dict}) + return results diff --git a/comps/cores/mega/http_service.py b/comps/cores/mega/http_service.py index ee2d453ee..470059b82 100644 --- a/comps/cores/mega/http_service.py +++ b/comps/cores/mega/http_service.py @@ -7,6 +7,7 @@ from uvicorn import Config, Server from .base_service import BaseService +from .base_statistics import collect_all_statistics class HTTPService(BaseService): @@ -66,6 +67,16 @@ async def _health_check(): """Get the health status of this GenAI microservice.""" return {"Service Title": self.title, "Service Description": self.description} + @app.get( + path="/v1/statistics", + summary="Get the statistics of GenAI services", + tags=["Debug"], + ) + async def _get_statistics(): + """Get the statistics of GenAI services.""" + result = collect_all_statistics() + return result + return app async def initialize_server(self): diff --git a/comps/embeddings/langchain/embedding_tei_gaudi.py b/comps/embeddings/langchain/embedding_tei_gaudi.py index c1e379ace..9c148edb5 100644 --- a/comps/embeddings/langchain/embedding_tei_gaudi.py +++ b/comps/embeddings/langchain/embedding_tei_gaudi.py @@ -2,11 +2,20 @@ # SPDX-License-Identifier: Apache-2.0 import os +import time from langchain_community.embeddings import HuggingFaceHubEmbeddings from langsmith import traceable -from comps import EmbedDoc768, ServiceType, TextDoc, opea_microservices, register_microservice +from comps import ( + EmbedDoc768, + ServiceType, + TextDoc, + opea_microservices, + register_microservice, + register_statistics, + statistics_dict, +) @register_microservice( @@ -19,10 +28,13 @@ output_datatype=EmbedDoc768, ) @traceable(run_type="embedding") +@register_statistics(names=["opea_service@embedding_tgi_gaudi"]) def embedding(input: TextDoc) -> EmbedDoc768: + start = time.time() embed_vector = embeddings.embed_query(input.text) embed_vector = embed_vector[:768] # Keep only the first 768 elements res = EmbedDoc768(text=input.text, embedding=embed_vector) + statistics_dict["opea_service@embedding_tgi_gaudi"].append_latency(time.time() - start, None) return res diff --git a/comps/llms/text-generation/tgi/llm.py b/comps/llms/text-generation/tgi/llm.py index ff1b2bb92..3d9079a69 100644 --- a/comps/llms/text-generation/tgi/llm.py +++ b/comps/llms/text-generation/tgi/llm.py @@ -2,12 +2,21 @@ # SPDX-License-Identifier: Apache-2.0 import os +import time from fastapi.responses import StreamingResponse from langchain_community.llms import HuggingFaceEndpoint from langsmith import traceable -from comps import GeneratedDoc, LLMParamsDoc, ServiceType, opea_microservices, register_microservice +from comps import ( + GeneratedDoc, + LLMParamsDoc, + ServiceType, + opea_microservices, + register_microservice, + register_statistics, + statistics_dict, +) @register_microservice( @@ -18,7 +27,9 @@ port=9000, ) @traceable(run_type="llm") +@register_statistics(names=["opea_service@llm_tgi"]) def llm_generate(input: LLMParamsDoc): + start = time.time() llm_endpoint = os.getenv("TGI_LLM_ENDPOINT", "http://localhost:8080") llm = HuggingFaceEndpoint( endpoint_url=llm_endpoint, @@ -34,19 +45,24 @@ def llm_generate(input: LLMParamsDoc): if input.streaming: + stream_gen_time = [] + async def stream_generator(): chat_response = "" async for text in llm.astream(input.query): + stream_gen_time.append(time.time() - start) chat_response += text chunk_repr = repr(text.encode("utf-8")) print(f"[llm - chat_stream] chunk:{chunk_repr}") yield f"data: {chunk_repr}\n\n" print(f"[llm - chat_stream] stream response: {chat_response}") + statistics_dict["opea_service@llm_tgi"].append_latency(stream_gen_time[-1], stream_gen_time[0]) yield "data: [DONE]\n\n" return StreamingResponse(stream_generator(), media_type="text/event-stream") else: response = llm.invoke(input.query) + statistics_dict["opea_service@llm_tgi"].append_latency(time.time() - start, None) return GeneratedDoc(text=response, prompt=input.query) diff --git a/comps/reranks/langchain/reranking_tei_xeon.py b/comps/reranks/langchain/reranking_tei_xeon.py index 0b7a0016f..394264743 100644 --- a/comps/reranks/langchain/reranking_tei_xeon.py +++ b/comps/reranks/langchain/reranking_tei_xeon.py @@ -3,12 +3,21 @@ import json import os +import time import requests from langchain_core.prompts import ChatPromptTemplate from langsmith import traceable -from comps import LLMParamsDoc, SearchedDoc, ServiceType, opea_microservices, register_microservice +from comps import ( + LLMParamsDoc, + SearchedDoc, + ServiceType, + opea_microservices, + register_microservice, + register_statistics, + statistics_dict, +) @register_microservice( @@ -21,7 +30,9 @@ output_datatype=LLMParamsDoc, ) @traceable(run_type="llm") +@register_statistics(names=["opea_service@reranking_tgi_gaudi"]) def reranking(input: SearchedDoc) -> LLMParamsDoc: + start = time.time() docs = [doc.text for doc in input.retrieved_docs] url = tei_reranking_endpoint + "/rerank" data = {"query": input.initial_query, "texts": docs} @@ -36,6 +47,7 @@ def reranking(input: SearchedDoc) -> LLMParamsDoc: prompt = ChatPromptTemplate.from_template(template) doc = input.retrieved_docs[best_response["index"]] final_prompt = prompt.format(context=doc.text, question=input.initial_query) + statistics_dict["opea_service@reranking_tgi_gaudi"].append_latency(time.time() - start, None) return LLMParamsDoc(query=final_prompt.strip()) diff --git a/comps/retrievers/langchain/retriever_redis.py b/comps/retrievers/langchain/retriever_redis.py index 15bf65add..50b461d34 100644 --- a/comps/retrievers/langchain/retriever_redis.py +++ b/comps/retrievers/langchain/retriever_redis.py @@ -2,13 +2,23 @@ # SPDX-License-Identifier: Apache-2.0 import os +import time from langchain_community.embeddings import HuggingFaceBgeEmbeddings, HuggingFaceHubEmbeddings from langchain_community.vectorstores import Redis from langsmith import traceable from redis_config import EMBED_MODEL, INDEX_NAME, INDEX_SCHEMA, REDIS_URL -from comps import EmbedDoc768, SearchedDoc, ServiceType, TextDoc, opea_microservices, register_microservice +from comps import ( + EmbedDoc768, + SearchedDoc, + ServiceType, + TextDoc, + opea_microservices, + register_microservice, + register_statistics, + statistics_dict, +) tei_embedding_endpoint = os.getenv("TEI_EMBEDDING_ENDPOINT") @@ -21,12 +31,15 @@ port=7000, ) @traceable(run_type="retriever") +@register_statistics(names=["opea_service@retriever_redis"]) def retrieve(input: EmbedDoc768) -> SearchedDoc: + start = time.time() search_res = vector_db.similarity_search_by_vector(embedding=input.embedding) searched_docs = [] for r in search_res: searched_docs.append(TextDoc(text=r.page_content)) result = SearchedDoc(retrieved_docs=searched_docs, initial_query=input.text) + statistics_dict["opea_service@retriever_redis"].append_latency(time.time() - start, None) return result