add microservice level perf statistics #135

Merged · 3 commits · Jun 11, 2024 (diff below shows changes from 2 commits)
3 changes: 3 additions & 0 deletions comps/__init__.py
@@ -27,3 +27,6 @@

# Telemetry
from comps.cores.telemetry.opea_telemetry import opea_telemetry

# Statistics
from comps.cores.mega.base_statistics import statistics_dict, register_statistics
85 changes: 85 additions & 0 deletions comps/cores/mega/base_statistics.py
@@ -0,0 +1,85 @@
# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

import numpy as np

# name => statistic dict
statistics_dict = {}


class BaseStatistics:
"""Base class to store in-memory statistics of an entity for measurement in one service."""

def __init__(
self,
):
        self.response_times = []  # store response times for all requests
self.first_token_latencies = [] # store first token latencies for all requests

def append_latency(self, latency, first_token_latency=None):
self.response_times.append(latency)
if first_token_latency:
self.first_token_latencies.append(first_token_latency)

def calcuate_statistics(self):
if not self.response_times:
return {
"p50_latency": None,
"p99_latency": None,
"average_latency": None,
}
# Calculate the P50 (median)
p50 = np.percentile(self.response_times, 50)

# Calculate the P99
p99 = np.percentile(self.response_times, 99)

avg = np.average(self.response_times)

return {
"p50_latency": p50,
"p99_latency": p99,
"average_latency": avg,
}

def calcuate_first_token_statistics(self):
if not self.first_token_latencies:
return {
"p50_latency_first_token": None,
"p99_latency_first_token": None,
"average_latency_first_token": None,
}
# Calculate the P50 (median)
p50 = np.percentile(self.first_token_latencies, 50)

# Calculate the P99
p99 = np.percentile(self.first_token_latencies, 99)

avg = np.average(self.first_token_latencies)

return {
"p50_latency_first_token": p50,
"p99_latency_first_token": p99,
"average_latency_first_token": avg,
}


def register_statistics(
names,
):
def decorator(func):
for name in names:
statistics_dict[name] = BaseStatistics()
return func

return decorator


def collect_all_statistics():
results = {}
if statistics_dict:
for name, statistic in statistics_dict.items():
tmp_dict = statistic.calcuate_statistics()
tmp_dict.update(statistic.calcuate_first_token_statistics())
results.update({name: tmp_dict})
return results
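For context, the registration and collection flow added in this module can be exercised end to end with a small sketch like the one below. It is not part of the diff; the service name `opea_service@example` and the simulated workload are made up purely for illustration.

```python
import random
import time

from comps.cores.mega.base_statistics import (
    collect_all_statistics,
    register_statistics,
    statistics_dict,
)


@register_statistics(names=["opea_service@example"])
def handle_request(payload: str) -> str:
    start = time.time()
    time.sleep(random.uniform(0.001, 0.005))  # simulated work with varying latency
    statistics_dict["opea_service@example"].append_latency(time.time() - start, None)
    return payload.upper()


for _ in range(100):
    handle_request("hello")

# Prints {"opea_service@example": {"p50_latency": ..., "p99_latency": ..., "average_latency": ...,
# "p50_latency_first_token": None, ...}} -- the first-token fields stay None because no
# first_token_latency was recorded above.
print(collect_all_statistics())
```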
11 changes: 11 additions & 0 deletions comps/cores/mega/http_service.py
Expand Up @@ -7,6 +7,7 @@
from uvicorn import Config, Server

from .base_service import BaseService
from .base_statistics import collect_all_statistics


class HTTPService(BaseService):
@@ -66,6 +67,16 @@ async def _health_check():
"""Get the health status of this GenAI microservice."""
return {"Service Title": self.title, "Service Description": self.description}

@app.get(
path="/v1/statistics",
summary="Get the statistics of GenAI services",
tags=["Debug"],
)
async def _get_statistics():
"""Get the statistics of GenAI services."""
result = collect_all_statistics()
return result

return app

async def initialize_server(self):
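With the route registered above, any instrumented microservice exposes its numbers over HTTP. A quick smoke test might look like the sketch below; the host and port are illustrative assumptions only (9000 matches the TGI LLM service later in this diff).

```python
import requests

# Assumes an instrumented GenAI microservice listening on localhost:9000 (illustrative).
resp = requests.get("http://localhost:9000/v1/statistics", timeout=5)
resp.raise_for_status()

for name, metrics in resp.json().items():
    print(name, "p50:", metrics["p50_latency"], "p99:", metrics["p99_latency"])
```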
14 changes: 13 additions & 1 deletion comps/embeddings/langchain/embedding_tei_gaudi.py
@@ -2,11 +2,20 @@
# SPDX-License-Identifier: Apache-2.0

import os
import time

from langchain_community.embeddings import HuggingFaceHubEmbeddings
from langsmith import traceable

from comps import EmbedDoc768, ServiceType, TextDoc, opea_microservices, register_microservice
from comps import (
EmbedDoc768,
ServiceType,
TextDoc,
opea_microservices,
register_microservice,
register_statistics,
statistics_dict,
)


@register_microservice(
@@ -19,10 +28,13 @@
output_datatype=EmbedDoc768,
)
@traceable(run_type="embedding")
@register_statistics(names=["opea_service@embedding_tgi_gaudi"])
def embedding(input: TextDoc) -> EmbedDoc768:
start = time.time()
embed_vector = embeddings.embed_query(input.text)
embed_vector = embed_vector[:768] # Keep only the first 768 elements
res = EmbedDoc768(text=input.text, embedding=embed_vector)
statistics_dict["opea_service@embedding_tgi_gaudi"].append_latency(time.time() - start, None)
return res


18 changes: 17 additions & 1 deletion comps/llms/text-generation/tgi/llm.py
@@ -2,12 +2,21 @@
# SPDX-License-Identifier: Apache-2.0

import os
import time

from fastapi.responses import StreamingResponse
from langchain_community.llms import HuggingFaceEndpoint
from langsmith import traceable

from comps import GeneratedDoc, LLMParamsDoc, ServiceType, opea_microservices, register_microservice
from comps import (
GeneratedDoc,
LLMParamsDoc,
ServiceType,
opea_microservices,
register_microservice,
register_statistics,
statistics_dict,
)


@register_microservice(
@@ -18,7 +27,9 @@
port=9000,
)
@traceable(run_type="llm")
@register_statistics(names=["opea_service@llm_tgi"])
def llm_generate(input: LLMParamsDoc):
start = time.time()
llm_endpoint = os.getenv("TGI_LLM_ENDPOINT", "http://localhost:8080")
llm = HuggingFaceEndpoint(
endpoint_url=llm_endpoint,
@@ -34,19 +45,24 @@ def llm_generate(input: LLMParamsDoc):

if input.streaming:

stream_gen_time = []

async def stream_generator():
chat_response = ""
async for text in llm.astream(input.query):
stream_gen_time.append(time.time() - start)
chat_response += text
chunk_repr = repr(text.encode("utf-8"))
print(f"[llm - chat_stream] chunk:{chunk_repr}")
yield f"data: {chunk_repr}\n\n"
print(f"[llm - chat_stream] stream response: {chat_response}")
statistics_dict["opea_service@llm_tgi"].append_latency(stream_gen_time[-1], stream_gen_time[0])
yield "data: [DONE]\n\n"

return StreamingResponse(stream_generator(), media_type="text/event-stream")
else:
response = llm.invoke(input.query)
statistics_dict["opea_service@llm_tgi"].append_latency(time.time() - start, None)
return GeneratedDoc(text=response, prompt=input.query)


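The streaming branch above appends one timestamp per generated chunk, so `stream_gen_time[0]` is the first-token latency and `stream_gen_time[-1]` is the total generation time that gets recorded. A stand-alone sketch of that timing pattern, with a fake token stream substituted for `llm.astream` purely for illustration:

```python
import asyncio
import time


async def fake_token_stream():
    """Stand-in for llm.astream(): yields a few tokens with artificial delays."""
    for token in ["Hello", ",", " world", "!"]:
        await asyncio.sleep(0.05)
        yield token


async def measure() -> None:
    start = time.time()
    stream_gen_time = []
    async for _ in fake_token_stream():
        stream_gen_time.append(time.time() - start)
    # stream_gen_time[0]  -> latency of the first token
    # stream_gen_time[-1] -> latency of the full response
    print(f"first token: {stream_gen_time[0]:.3f}s, total: {stream_gen_time[-1]:.3f}s")


asyncio.run(measure())
```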
14 changes: 13 additions & 1 deletion comps/reranks/langchain/reranking_tei_xeon.py
@@ -3,12 +3,21 @@

import json
import os
import time

import requests
from langchain_core.prompts import ChatPromptTemplate
from langsmith import traceable

from comps import LLMParamsDoc, SearchedDoc, ServiceType, opea_microservices, register_microservice
from comps import (
LLMParamsDoc,
SearchedDoc,
ServiceType,
opea_microservices,
register_microservice,
register_statistics,
statistics_dict,
)


@register_microservice(
@@ -21,7 +30,9 @@
output_datatype=LLMParamsDoc,
)
@traceable(run_type="llm")
@register_statistics(names=["opea_service@reranking_tgi_gaudi"])
def reranking(input: SearchedDoc) -> LLMParamsDoc:
start = time.time()
docs = [doc.text for doc in input.retrieved_docs]
url = tei_reranking_endpoint + "/rerank"
data = {"query": input.initial_query, "texts": docs}
@@ -36,6 +47,7 @@ def reranking(input: SearchedDoc) -> LLMParamsDoc:
prompt = ChatPromptTemplate.from_template(template)
doc = input.retrieved_docs[best_response["index"]]
final_prompt = prompt.format(context=doc.text, question=input.initial_query)
statistics_dict["opea_service@reranking_tgi_gaudi"].append_latency(time.time() - start, None)
return LLMParamsDoc(query=final_prompt.strip())


17 changes: 16 additions & 1 deletion comps/retrievers/langchain/retriever_redis.py
@@ -2,13 +2,23 @@
# SPDX-License-Identifier: Apache-2.0

import os
import time

from langchain_community.embeddings import HuggingFaceBgeEmbeddings, HuggingFaceHubEmbeddings
from langchain_community.vectorstores import Redis
from langsmith import traceable
from redis_config import EMBED_MODEL, INDEX_NAME, INDEX_SCHEMA, REDIS_URL

from comps import EmbedDoc768, SearchedDoc, ServiceType, TextDoc, opea_microservices, register_microservice
from comps import (
EmbedDoc768,
SearchedDoc,
ServiceType,
TextDoc,
opea_microservices,
register_microservice,
register_statistics,
statistics_dict,
)

tei_embedding_endpoint = os.getenv("TEI_EMBEDDING_ENDPOINT")

@@ -21,7 +31,10 @@
port=7000,
)
@traceable(run_type="retriever")
@register_statistics(names=["opea_service@retriever_redis"])
def retrieve(input: EmbedDoc768) -> SearchedDoc:
start = time.time()

# Create vectorstore
if tei_embedding_endpoint:
# create embeddings using TEI endpoint service
@@ -41,6 +54,8 @@ def retrieve(input: EmbedDoc768) -> SearchedDoc:
for r in search_res:
searched_docs.append(TextDoc(text=r.page_content))
result = SearchedDoc(retrieved_docs=searched_docs, initial_query=input.text)
statistics_dict["opea_service@retriever_redis"].append_latency(time.time() - start, None)

return result


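With all five services instrumented, each service's own `/v1/statistics` endpoint reports the names registered in that process. An illustrative (not measured) response from the LLM service would have the shape below, mirroring the keys produced by `calcuate_statistics` and `calcuate_first_token_statistics`; the numbers are made up.

```python
# Illustrative shape only; values are invented for the example.
example_response = {
    "opea_service@llm_tgi": {
        "p50_latency": 1.82,
        "p99_latency": 3.41,
        "average_latency": 1.95,
        "p50_latency_first_token": 0.21,
        "p99_latency_first_token": 0.48,
        "average_latency_first_token": 0.24,
    }
}
```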