diff --git a/.github/workflows/docker/code-scan.dockerfile b/.github/workflows/docker/code-scan.dockerfile
index 786ec3ad8..129e146b0 100644
--- a/.github/workflows/docker/code-scan.dockerfile
+++ b/.github/workflows/docker/code-scan.dockerfile
@@ -4,7 +4,7 @@ ARG UBUNTU_VER=22.04
 FROM ubuntu:${UBUNTU_VER} as devel
-ENV LANG C.UTF-8
+ENV LANG=C.UTF-8
 RUN apt-get update && apt-get install -y --no-install-recommends --fix-missing \
 aspell \
diff --git a/.github/workflows/docker/ut.dockerfile b/.github/workflows/docker/ut.dockerfile
index 328984ff5..1453b1693 100644
--- a/.github/workflows/docker/ut.dockerfile
+++ b/.github/workflows/docker/ut.dockerfile
@@ -4,7 +4,7 @@ ARG UBUNTU_VER=22.04
 FROM ubuntu:${UBUNTU_VER} as devel
-ENV LANG C.UTF-8
+ENV LANG=C.UTF-8
 RUN apt-get update && apt-get install -y --no-install-recommends --fix-missing \
 aspell \
diff --git a/.github/workflows/scripts/codeScan/bandit.sh b/.github/workflows/scripts/codeScan/bandit.sh
index e0f5137d2..aa5aa93a7 100644
--- a/.github/workflows/scripts/codeScan/bandit.sh
+++ b/.github/workflows/scripts/codeScan/bandit.sh
@@ -6,7 +6,7 @@ source /GenAIComps/.github/workflows/scripts/change_color
 pip install bandit==1.7.8
 log_dir=/GenAIComps/.github/workflows/scripts/codeScan
-python -m bandit -r -lll -iii /GenAIComps 2>&1 | tee ${log_dir}/bandit.log
+python -m bandit -r -lll -iii /GenAIComps > ${log_dir}/bandit.log
 exit_code=$?
 $BOLD_YELLOW && echo " ----------------- Current log file output start --------------------------"
diff --git a/comps/agent/langchain/docker/Dockerfile b/comps/agent/langchain/docker/Dockerfile
index 2540c7bad..62b4ea2bc 100644
--- a/comps/agent/langchain/docker/Dockerfile
+++ b/comps/agent/langchain/docker/Dockerfile
@@ -3,7 +3,7 @@ FROM python:3.11-slim
-ENV LANG C.UTF-8
+ENV LANG=C.UTF-8
 RUN apt-get update -y && apt-get install -y --no-install-recommends --fix-missing \
 build-essential \
diff --git a/comps/chathistory/mongo/docker/Dockerfile b/comps/chathistory/mongo/docker/Dockerfile
index 986aac504..5209af835 100644
--- a/comps/chathistory/mongo/docker/Dockerfile
+++ b/comps/chathistory/mongo/docker/Dockerfile
@@ -3,7 +3,7 @@ FROM python:3.11-slim
-ENV LANG C.UTF-8
+ENV LANG=C.UTF-8
 RUN apt-get update -y && apt-get install -y --no-install-recommends --fix-missing \
 build-essential \
diff --git a/comps/dataprep/redis/langchain_ray/prepare_doc_redis_on_ray.py b/comps/dataprep/redis/langchain_ray/prepare_doc_redis_on_ray.py
index 414807eaa..c55165061 100644
--- a/comps/dataprep/redis/langchain_ray/prepare_doc_redis_on_ray.py
+++ b/comps/dataprep/redis/langchain_ray/prepare_doc_redis_on_ray.py
@@ -75,7 +75,7 @@ def prepare_env(enable_ray=False, pip_requirements=None):
 def generate_log_name(file_list):
 file_set = f"{sorted(file_list)}"
 # print(f"file_set: {file_set}")
- md5_str = hashlib.md5(file_set.encode()).hexdigest()
+ md5_str = hashlib.md5(file_set.encode(), usedforsecurity=False).hexdigest()
 return f"status/status_{md5_str}.log"
diff --git a/comps/dataprep/utils.py b/comps/dataprep/utils.py
index 786366a12..46acc8f5b 100644
--- a/comps/dataprep/utils.py
+++ b/comps/dataprep/utils.py
@@ -11,6 +11,7 @@ import re
 import shutil
 import signal
+import subprocess
 import timeit
 import unicodedata
 import urllib.parse
@@ -157,7 +158,19 @@ def load_doc(doc_path):
 """Load doc file."""
 print("Converting doc file to docx file...")
 docx_path = doc_path + "x"
- os.system(f"libreoffice --headless --invisible --convert-to docx --outdir {os.path.dirname(docx_path)} {doc_path}")
+ subprocess.run(
+ [
+ "libreoffice",
"--headless", + "--invisible", + "--convert-to", + "docx", + "--outdir", + os.path.dirname(docx_path), + doc_path, + ], + check=True, + ) print("Converted doc file to docx file.") text = load_docx(docx_path) os.remove(docx_path) @@ -196,7 +209,19 @@ def load_ppt(ppt_path): """Load ppt file.""" print("Converting ppt file to pptx file...") pptx_path = ppt_path + "x" - os.system(f"libreoffice --headless --invisible --convert-to pptx --outdir {os.path.dirname(pptx_path)} {ppt_path}") + subprocess.run( + [ + "libreoffice", + "--headless", + "--invisible", + "--convert-to", + "docx", + "--outdir", + os.path.dirname(pptx_path), + ppt_path, + ], + check=True, + ) print("Converted ppt file to pptx file.") text = load_pptx(pptx_path) os.remove(pptx_path) diff --git a/comps/guardrails/pii_detection/data_utils.py b/comps/guardrails/pii_detection/data_utils.py index 29e9c4196..8340579fb 100644 --- a/comps/guardrails/pii_detection/data_utils.py +++ b/comps/guardrails/pii_detection/data_utils.py @@ -6,6 +6,7 @@ import multiprocessing import os import re +import subprocess import unicodedata from urllib.parse import urlparse, urlunparse @@ -79,7 +80,8 @@ def load_doc(doc_path): """Load doc file.""" txt_path = doc_path.replace(".doc", ".txt") try: - os.system(f'antiword "{doc_path}" > "{txt_path}"') + with open(txt_path, "w") as outfile: + subprocess.run(["antiword", doc_path], stdout=outfile, check=True) except: raise AssertionError( "antiword failed or not installed, if not installed," diff --git a/comps/guardrails/pii_detection/utils.py b/comps/guardrails/pii_detection/utils.py index 0766bec70..21f402c2a 100644 --- a/comps/guardrails/pii_detection/utils.py +++ b/comps/guardrails/pii_detection/utils.py @@ -74,7 +74,7 @@ def wrapper(*args, **kwargs): def generate_log_name(file_list): file_set = f"{sorted(file_list)}" # print(f"file_set: {file_set}") - md5_str = hashlib.md5(file_set.encode()).hexdigest() + md5_str = hashlib.md5(file_set.encode(), usedforsecurity=False).hexdigest() return f"status/status_{md5_str}.log" diff --git a/comps/llms/text-generation/ray_serve/README.md b/comps/llms/text-generation/ray_serve/README.md deleted file mode 100644 index ce58f6347..000000000 --- a/comps/llms/text-generation/ray_serve/README.md +++ /dev/null @@ -1,78 +0,0 @@ -# Ray-Serve Endpoint Service - -[Ray](https://docs.ray.io/en/latest/serve/index.html) is an LLM serving solution that makes it easy to deploy and manage a variety of open source LLMs, built on [Ray Serve](https://docs.ray.io/en/latest/serve/index.html), has native support for autoscaling and multi-node deployments, which is easy to use for LLM inference serving on Intel Gaudi2 accelerators. The Intel Gaudi2 accelerator supports both training and inference for deep learning models in particular for LLMs. Please visit [Habana AI products](<(https://habana.ai/products)>) for more details. - -## set up environment variables - -```bash -export HUGGINGFACEHUB_API_TOKEN= -export RAY_Serve_ENDPOINT="http://${your_ip}:8008" -export LLM_MODEL="meta-llama/Llama-2-7b-chat-hf" -``` - -For gated models such as `LLAMA-2`, you will have to pass the environment HUGGINGFACEHUB_API_TOKEN. Please follow this link [huggingface token](https://huggingface.co/docs/hub/security-tokens) to get the access token and export `HUGGINGFACEHUB_API_TOKEN` environment with the token. 
- -## Set up Ray Serve Service - -### Build docker - -```bash -bash build_docker_rayserve.sh -``` - -### Launch Ray Serve service - -```bash -bash launch_ray_service.sh -``` - -The `launch_vllm_service.sh` script accepts five parameters: - -- port_number: The port number assigned to the Ray Gaudi endpoint, with the default being 8008. -- model_name: The model name utilized for LLM, with the default set to meta-llama/Llama-2-7b-chat-hf. -- chat_processor: The chat processor for handling the prompts, with the default set to 'ChatModelNoFormat', and the optional selection can be 'ChatModelLlama', 'ChatModelGptJ" and "ChatModelGemma'. -- num_cpus_per_worker: The number of CPUs specifies the number of CPUs per worker process. -- num_hpus_per_worker: The number of HPUs specifies the number of HPUs per worker process. - -If you want to customize the port or model_name, can run: - -```bash -bash ./launch_ray_service.sh ${port_number} ${model_name} ${chat_processor} ${num_cpus_per_worker} ${num_hpus_per_worker} -``` - -### Query the service - -And then you can make requests with the OpenAI-compatible APIs like below to check the service status: - -```bash -curl http://${your_ip}:8008/v1/chat/completions \ - -H "Content-Type: application/json" \ - -d '{"model": "Llama-2-7b-chat-hf", "messages": [{"role": "user", "content": "What is Deep Learning?"}], "max_tokens": 32 }' -``` - -For more information about the OpenAI APIs, you can checkeck the [OpenAI official document](https://platform.openai.com/docs/api-reference/). - -## Set up OPEA microservice - -Then we warp the Ray Serve service into OPEA microcervice. - -### Build docker - -```bash -bash build_docker_microservice.sh -``` - -### Launch the microservice - -```bash -bash launch_microservice.sh -``` - -### Query the microservice - -```bash -curl http://${your_ip}:9000/v1/chat/completions \ - -X POST \ - -d '{"query":"What is Deep Learning?","max_new_tokens":17,"top_k":10,"top_p":0.95,"typical_p":0.95,"temperature":0.01,"repetition_penalty":1.03,"streaming":false}' \ - -H 'Content-Type: application/json' -``` diff --git a/comps/llms/text-generation/ray_serve/__init__.py b/comps/llms/text-generation/ray_serve/__init__.py deleted file mode 100644 index 916f3a44b..000000000 --- a/comps/llms/text-generation/ray_serve/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -# Copyright (C) 2024 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 diff --git a/comps/llms/text-generation/ray_serve/api_openai_backend/__init__.py b/comps/llms/text-generation/ray_serve/api_openai_backend/__init__.py deleted file mode 100644 index 916f3a44b..000000000 --- a/comps/llms/text-generation/ray_serve/api_openai_backend/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -# Copyright (C) 2024 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 diff --git a/comps/llms/text-generation/ray_serve/api_openai_backend/openai_protocol.py b/comps/llms/text-generation/ray_serve/api_openai_backend/openai_protocol.py deleted file mode 100644 index 7135f2be3..000000000 --- a/comps/llms/text-generation/ray_serve/api_openai_backend/openai_protocol.py +++ /dev/null @@ -1,381 +0,0 @@ -# Copyright (C) 2024 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -import time -import uuid -from enum import Enum -from typing import Any, Dict, List, Literal, Optional, Tuple, Type, TypeVar, Union - -import yaml -from pydantic import BaseModel, Field, root_validator - -TModel = TypeVar("TModel", bound="ModelList") -TCompletion = TypeVar("TCompletion", bound="CompletionResponse") -TChatCompletion = 
TypeVar("TChatCompletion", bound="ChatCompletionResponse") -ModelT = TypeVar("ModelT", bound=BaseModel) - - -class ErrorResponse(BaseModel): - object: str = "error" - message: str - internal_message: str - type: str - param: Dict[str, Any] = {} - code: int - - -class ModelCard(BaseModel): - id: str - object: str = "model" - created: int = Field(default_factory=lambda: int(time.time())) - owned_by: str = "llmonray" - root: Optional[str] = None - parent: Optional[str] = None - - -class ModelList(BaseModel): - object: str = "list" - data: List[ModelCard] = [] - - -class UsageInfo(BaseModel): - prompt_tokens: int - total_tokens: int - completion_tokens: Optional[int] = 0 - - @classmethod - def from_response(cls, response: Union["ModelResponse", Dict[str, Any]]) -> "UsageInfo": - if isinstance(response, BaseModel): - response_dict = response.dict() - else: - response_dict = response - return cls( - prompt_tokens=response_dict["num_input_tokens"] or 0, - completion_tokens=response_dict["num_generated_tokens"] or 0, - total_tokens=(response_dict["num_input_tokens"] or 0) + (response_dict["num_generated_tokens"] or 0), - ) - - -class CompletionResponseChoice(BaseModel): - index: int - text: str - logprobs: Optional[int] = None - finish_reason: Optional[str] - - -class CompletionResponse(BaseModel): - id: str = Field(default_factory=lambda: f"cmpl-{str(uuid.uuid4().hex)}") - object: str = "text_completion" - created: int = Field(default_factory=lambda: int(time.time())) - model: str - choices: List[CompletionResponseChoice] - usage: Optional[UsageInfo] - - -class FunctionCall(BaseModel): - name: str - arguments: Optional[str] = None - - -class ToolCall(BaseModel): - function: FunctionCall - type: Literal["function"] - id: str - - def __str__(self): - return str(self.dict()) - - -class ChatMessage(BaseModel): - role: Literal["system", "assistant", "user", "tool"] - content: Optional[Union[str, list]] = None - tool_calls: Optional[List[ToolCall]] = None - tool_call_id: Optional[str] = None - - def __str__(self): - # if tool_calls is not None, then we are passing a tool message - # using get attr instead of just in case the attribute is deleted off of - # the object - if getattr(self, "tool_calls", None): - return str(self.content) - return str(self.dict()) - - -class ChatCompletionResponseChoice(BaseModel): - index: int - message: ChatMessage - finish_reason: Optional[str] - - -class Function(BaseModel): - name: str - description: Optional[str] = None - parameters: Optional[Dict[str, Any]] = None - - -class ToolChoice(BaseModel): - type: Literal["function"] - function: Function - - -class Tool(BaseModel): - type: Literal["function"] - function: Function - - -class DeltaRole(BaseModel): - role: Literal["system", "assistant", "user"] - - def __str__(self): - return self.role - - -class DeltaEOS(BaseModel): - class Config: - extra = "forbid" - - -class DeltaContent(BaseModel): - content: str - tool_calls: Optional[List[ToolCall]] = None - - def __str__(self): - if self.tool_calls: - return str(self.tool_calls) - else: - return str(self.dict()) - - -class DeltaChoices(BaseModel): - delta: Union[DeltaRole, DeltaContent, DeltaEOS] - index: int - finish_reason: Optional[str] - - -class ChatCompletionResponse(BaseModel): - id: str = Field(default_factory=lambda: f"chatcmpl-{str(uuid.uuid4().hex)}") - object: str - created: int = Field(default_factory=lambda: int(time.time())) - model: str - choices: List[Union[ChatCompletionResponseChoice, DeltaChoices]] - usage: Optional[UsageInfo] - - -class 
Prompt(BaseModel): - prompt: Union[str, List[ChatMessage]] - use_prompt_format: bool = True - parameters: Optional[Union[Dict[str, Any], BaseModel]] = None - tools: Optional[List[Tool]] = None - tool_choice: Union[Literal["auto", "none"], ToolChoice] = "auto" - - -class BaseModelExtended(BaseModel): - @classmethod - def parse_yaml(cls: Type[ModelT], file, **kwargs) -> ModelT: - kwargs.setdefault("Loader", yaml.SafeLoader) - dict_args = yaml.load(file, **kwargs) - return cls.parse_obj(dict_args) - - def yaml( - self, - *, - stream=None, - include=None, - exclude=None, - by_alias: bool = False, - skip_defaults: Union[bool, None] = None, - exclude_unset: bool = False, - exclude_defaults: bool = False, - exclude_none: bool = False, - **kwargs, - ): - """Generate a YAML representation of the model, `include` and `exclude` - arguments as per `dict()`.""" - return yaml.dump( - self.dict( - include=include, - exclude=exclude, - by_alias=by_alias, - skip_defaults=skip_defaults, - exclude_unset=exclude_unset, - exclude_defaults=exclude_defaults, - exclude_none=exclude_none, - ), - stream=stream, - **kwargs, - ) - - -class ComputedPropertyMixin: - """Include properties in the dict and json representations of the model.""" - - # Replace with pydantic.computed_field once it's available - @classmethod - def get_properties(cls): - return [prop for prop in dir(cls) if isinstance(getattr(cls, prop), property)] - - def dict(self, *args, **kwargs): - self.__dict__.update({prop: getattr(self, prop) for prop in self.get_properties()}) - return super().dict(*args, **kwargs) # type: ignore - - def json( - self, - *args, - **kwargs, - ) -> str: - self.__dict__.update({prop: getattr(self, prop) for prop in self.get_properties()}) - - return super().json(*args, **kwargs) # type: ignore - - -class ModelResponse(ComputedPropertyMixin, BaseModelExtended): - generated_text: Optional[str] = None - tool_calls: Optional[List[ToolCall]] = None - num_input_tokens: Optional[int] = None - num_input_tokens_batch: Optional[int] = None - num_generated_tokens: Optional[int] = None - num_generated_tokens_batch: Optional[int] = None - preprocessing_time: Optional[float] = None - generation_time: Optional[float] = None - timestamp: Optional[float] = Field(default_factory=time.time) - finish_reason: Optional[str] = None - error: Optional[ErrorResponse] = None - - @root_validator(skip_on_failure=True) - def text_or_error_or_finish_reason(cls, values): - if values.get("generated_text") is None and values.get("error") is None and values.get("finish_reason") is None: - raise ValueError("Either 'generated_text' or 'error' or 'finish_reason' must be set") - return values - - @classmethod - def merge_stream(cls, *responses: "ModelResponse") -> "ModelResponse": - """Merge a stream of responses into a single response. - - The generated text is concatenated. Fields are maxed, except for - num_generated_tokens and generation_time, which are summed. 
- """ - if len(responses) == 1: - return responses[0] - - generated_text = "".join([response.generated_text or "" for response in responses]) - num_input_tokens = [ - response.num_input_tokens for response in responses if response.num_input_tokens is not None - ] - max_num_input_tokens = max(num_input_tokens) if num_input_tokens else None - num_input_tokens_batch = [ - response.num_input_tokens_batch for response in responses if response.num_input_tokens_batch is not None - ] - max_num_input_tokens_batch = max(num_input_tokens_batch) if num_input_tokens_batch else None - num_generated_tokens = [ - response.num_generated_tokens for response in responses if response.num_generated_tokens is not None - ] - total_generated_tokens = sum(num_generated_tokens) if num_generated_tokens else None - num_generated_tokens_batch = [ - response.num_generated_tokens_batch - for response in responses - if response.num_generated_tokens_batch is not None - ] - total_generated_tokens_batch = sum(num_generated_tokens_batch) if num_generated_tokens_batch else None - preprocessing_time = [ - response.preprocessing_time for response in responses if response.preprocessing_time is not None - ] - max_preprocessing_time = max(preprocessing_time) if preprocessing_time else None - generation_time = [response.generation_time for response in responses if response.generation_time is not None] - total_generation_time = sum(generation_time) if generation_time else None - error = next((response.error for response in reversed(responses) if response.error), None) - - return cls( - generated_text=generated_text, - num_input_tokens=max_num_input_tokens, - num_input_tokens_batch=max_num_input_tokens_batch, - num_generated_tokens=total_generated_tokens, - num_generated_tokens_batch=total_generated_tokens_batch, - preprocessing_time=max_preprocessing_time, - generation_time=total_generation_time, - timestamp=responses[-1].timestamp, - finish_reason=responses[-1].finish_reason, - error=error, - ) - - @property - def total_time(self) -> Optional[float]: - if self.generation_time is None and self.preprocessing_time is None: - return None - return (self.preprocessing_time or 0) + (self.generation_time or 0) - - @property - def num_total_tokens(self) -> Optional[float]: - try: - return (self.num_input_tokens or 0) + (self.num_generated_tokens or 0) - except Exception: - return None - - @property - def num_total_tokens_batch(self) -> Optional[float]: - try: - return (self.num_input_tokens_batch or 0) + (self.num_generated_tokens_batch or 0) - except Exception: - return None - - def unpack(self) -> Tuple["ModelResponse", ...]: - return (self,) - - -class CompletionRequest(BaseModel): - model: str - prompt: str - suffix: Optional[str] = None - temperature: Optional[float] = None - top_p: Optional[float] = None - n: int = 1 - max_tokens: Optional[int] = 16 - stop: Optional[List[str]] = None - stream: bool = False - echo: Optional[bool] = False - presence_penalty: Optional[float] = None - frequency_penalty: Optional[float] = None - logprobs: Optional[int] = None - logit_bias: Optional[Dict[str, float]] = None - user: Optional[str] = None - - -class ChatCompletionRequest(BaseModel): - model: str - messages: List[ChatMessage] - temperature: Optional[float] = None - top_p: Optional[float] = None - n: int = 1 - max_tokens: Optional[int] = None - stop: Optional[List[str]] = None - stream: bool = False - presence_penalty: Optional[float] = None - frequency_penalty: Optional[float] = None - logprobs: Optional[int] = None - logit_bias: 
Optional[Dict[str, float]] = None - user: Optional[str] = None - tools: Optional[List[Tool]] = None - tool_choice: Union[Literal["auto", "none"], ToolChoice] = "auto" - ignore_eos: bool = False # used in vllm engine benchmark - - -class FinishReason(str, Enum): - LENGTH = "length" - STOP = "stop" - ERROR = "error" - CANCELLED = "cancelled" - TOOL_CALLS = "tool_calls" - - def __str__(self) -> str: - return self.value - - @classmethod - def from_vllm_finish_reason(cls, finish_reason: Optional[str]) -> Optional["FinishReason"]: - if finish_reason is None: - return None - if finish_reason == "stop": - return cls.STOP - if finish_reason == "length": - return cls.LENGTH - if finish_reason == "abort": - return cls.CANCELLED - return cls.STOP diff --git a/comps/llms/text-generation/ray_serve/api_openai_backend/query_client.py b/comps/llms/text-generation/ray_serve/api_openai_backend/query_client.py deleted file mode 100644 index 66b0c9d09..000000000 --- a/comps/llms/text-generation/ray_serve/api_openai_backend/query_client.py +++ /dev/null @@ -1,59 +0,0 @@ -# Copyright (C) 2024 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -from typing import Dict - -from fastapi import HTTPException -from ray_serve.api_openai_backend.openai_protocol import ModelCard, Prompt -from ray_serve.api_openai_backend.request_handler import handle_request - - -class RouterQueryClient: - def __init__(self, serve_deployments): - self.serve_deployments = serve_deployments - - async def query(self, model: str, prompt: Prompt, request_id: str, streaming_reponse: bool): - if model in self.serve_deployments: - deploy_handle = self.serve_deployments[model] - else: - raise HTTPException(404, f"Could not find model with id {model}") - - request_config = prompt.parameters - temperature = request_config.get("temperature", 1.0) - top_p = request_config.get("top_p", 1.0) - max_new_tokens = request_config.get("max_tokens", None) - gen_config = {"max_new_tokens": max_new_tokens, "temperature": temperature, "top_p": top_p} - if temperature != 1.0 or top_p != 1.0: - gen_config.update({"do_sample": True}) - if request_config.get("ignore_eos", False): - gen_config.update({"ignore_eos": True}) - - async for x in handle_request( - model=model, - prompt=prompt, - request_id=request_id, - async_iterator=deploy_handle.options(stream=True) - .openai_call.options(stream=True, use_new_handle_api=True) - .remote( - prompt.prompt, - gen_config, - streaming_response=streaming_reponse, - tools=prompt.tools, - tool_choice=prompt.tool_choice, - ), - ): - yield x - - async def model(self, model_id: str) -> ModelCard: - """Get configurations for a supported model.""" - return ModelCard( - id=model_id, - root=model_id, - ) - - async def models(self) -> Dict[str, ModelCard]: - """Get configurations for supported models.""" - metadatas = {} - for model_id in self.serve_deployments: - metadatas[model_id] = await self.model(model_id) - return metadatas diff --git a/comps/llms/text-generation/ray_serve/api_openai_backend/request_handler.py b/comps/llms/text-generation/ray_serve/api_openai_backend/request_handler.py deleted file mode 100644 index 60b4aba9a..000000000 --- a/comps/llms/text-generation/ray_serve/api_openai_backend/request_handler.py +++ /dev/null @@ -1,124 +0,0 @@ -# Copyright (C) 2024 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -import asyncio -import traceback -from typing import AsyncIterator, List - -from fastapi import HTTPException, Request, status -from pydantic import ValidationError as PydanticValidationError -from 
ray_serve.api_openai_backend.openai_protocol import ErrorResponse, FinishReason, ModelResponse, Prompt -from starlette.responses import JSONResponse - - -class OpenAIHTTPException(Exception): - def __init__( - self, - status_code: int, - message: str, - type: str = "Unknown", - ) -> None: - self.status_code = status_code - self.message = message - self.type = type - - -def openai_exception_handler(r: Request, exc: OpenAIHTTPException): - assert isinstance(exc, OpenAIHTTPException), f"Unable to handle invalid exception {type(exc)}" - if exc.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR: - message = "Internal Server Error" - internal_message = message - exc_type = "InternalServerError" - else: - internal_message = extract_message_from_exception(exc) - message = exc.message - exc_type = exc.type - err_response = ModelResponse( - error=ErrorResponse( - message=message, - code=exc.status_code, - internal_message=internal_message, - type=exc_type, - ) - ) - return JSONResponse(content=err_response.dict(), status_code=exc.status_code) - - -def extract_message_from_exception(e: Exception) -> str: - # If the exception is a Ray exception, we need to dig through the text to get just - # the exception message without the stack trace - # This also works for normal exceptions (we will just return everything from - # format_exception_only in that case) - message_lines = traceback.format_exception_only(type(e), e)[-1].strip().split("\n") - message = "" - # The stack trace lines will be prefixed with spaces, so we need to start - # from the bottom and stop at the last line before a line with a space - found_last_line_before_stack_trace = False - for line in reversed(message_lines): - if not line.startswith(" "): - found_last_line_before_stack_trace = True - if found_last_line_before_stack_trace and line.startswith(" "): - break - message = line + "\n" + message - message = message.strip() - return message - - -async def handle_request( - model: str, - request_id: str, - prompt: Prompt, - async_iterator: AsyncIterator[ModelResponse], -): - # Handle errors for an ModelResopnse stream. - model_tags = {"model_id": model} - print("handle_request: ", model_tags) - - responses: List[ModelResponse] = [] - try: - async for response in async_iterator: - responses.append(response) - yield response - except asyncio.CancelledError as e: - # The request is cancelled. Try to return a last Model response, then raise - # We raise here because we don't want to interrupt the cancellation - yield _get_response_for_error(e, request_id=request_id) - raise - except Exception as e: - # Something went wrong. - yield _get_response_for_error(e, request_id=request_id) - # DO NOT RAISE. - # We do not raise here because that would cause a disconnection for streaming. 
- - -def _get_response_for_error(e, request_id: str): - """Convert an exception to an ModelResponse object.""" - status_code = status.HTTP_500_INTERNAL_SERVER_ERROR - if isinstance(e, HTTPException): - status_code = e.status_code - elif isinstance(e, OpenAIHTTPException): - status_code = e.status_code - elif isinstance(e, PydanticValidationError): - status_code = 400 - else: - # Try to get the status code attribute - status_code = getattr(e, "status_code", status_code) - - if status_code == status.HTTP_500_INTERNAL_SERVER_ERROR: - message = "Internal Server Error" - exc_type = "InternalServerError" - else: - message = extract_message_from_exception(e) - exc_type = e.__class__.__name__ - - message += f" (Request ID: {request_id})" - - return ModelResponse( - error=ErrorResponse( - message=message, - code=status_code, - internal_message=message, - type=exc_type, - ), - finish_reason=FinishReason.ERROR, - ) diff --git a/comps/llms/text-generation/ray_serve/api_openai_backend/router_app.py b/comps/llms/text-generation/ray_serve/api_openai_backend/router_app.py deleted file mode 100644 index 8e6e946ab..000000000 --- a/comps/llms/text-generation/ray_serve/api_openai_backend/router_app.py +++ /dev/null @@ -1,349 +0,0 @@ -# Copyright (C) 2024 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -import os -import uuid -from typing import AsyncGenerator, List - -import async_timeout -from fastapi import FastAPI -from fastapi import Response as FastAPIResponse -from fastapi import status -from fastapi.middleware.cors import CORSMiddleware -from ray_serve.api_openai_backend.openai_protocol import ( - ChatCompletionRequest, - ChatCompletionResponse, - ChatCompletionResponseChoice, - ChatMessage, - CompletionRequest, - CompletionResponse, - CompletionResponseChoice, - DeltaChoices, - DeltaContent, - DeltaEOS, - DeltaRole, - ModelCard, - ModelList, - ModelResponse, - Prompt, - UsageInfo, -) -from ray_serve.api_openai_backend.query_client import RouterQueryClient -from ray_serve.api_openai_backend.request_handler import OpenAIHTTPException, openai_exception_handler -from starlette.responses import Response, StreamingResponse - -# timeout in 10 minutes. 
Streaming can take longer than 3 min -TIMEOUT = float(os.environ.get("ROUTER_HTTP_TIMEOUT", 1800)) - - -def init() -> FastAPI: - router_app = FastAPI() - router_app.add_exception_handler(OpenAIHTTPException, openai_exception_handler) - router_app.add_middleware( - CORSMiddleware, - allow_origins=["*"], - allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"], - ) - - return router_app - - -router_app = init() - - -async def _completions_wrapper( - completion_id: str, - body: CompletionRequest, - response: Response, - generator: AsyncGenerator[ModelResponse, None], -) -> AsyncGenerator[str, None]: - had_error = False - async with async_timeout.timeout(TIMEOUT): - all_results = [] - async for results in generator: - for subresult in results.unpack(): - all_results.append(subresult) - subresult_dict = subresult.dict() - if subresult_dict.get("error"): - response.status_code = subresult_dict["error"]["code"] - # Drop finish reason as OpenAI doesn't expect it - # for errors in streaming - subresult_dict["finish_reason"] = None - all_results.pop() - had_error = True - yield "data: " + ModelResponse(**subresult_dict).json() + "\n\n" - # Return early in case of an error - break - choices = [ - CompletionResponseChoice( - index=0, - text=subresult_dict["generated_text"] or "", - finish_reason=subresult_dict["finish_reason"], - ) - ] - usage = None - yield "data: " + CompletionResponse( - id=completion_id, - object="text_completion", - model=body.model, - choices=choices, - usage=usage, - ).json() + "\n\n" - if had_error: - # Return early in case of an error - break - if not had_error: - usage = UsageInfo.from_response(ModelResponse.merge_stream(*all_results)) if all_results else None - yield "data: " + CompletionResponse( - id=completion_id, - object="text_completion", - model=body.model, - choices=choices, - usage=usage, - ).json() + "\n\n" - yield "data: [DONE]\n\n" - - -async def _chat_completions_wrapper( - completion_id: str, - body: ChatCompletionRequest, - response: Response, - generator: AsyncGenerator[ModelResponse, None], -) -> AsyncGenerator[str, None]: - had_error = False - async with async_timeout.timeout(TIMEOUT): - finish_reason = None - choices: List[DeltaChoices] = [ - DeltaChoices( - delta=DeltaRole(role="assistant"), - index=0, - finish_reason=None, - ) - ] - chunk = ChatCompletionResponse( - id=completion_id, - object="chat.completion.chunk", - model=body.model, - choices=choices, - usage=None, - ) - data = chunk.json() - yield f"data: {data}\n\n" - - all_results = [] - async for results in generator: - for subresult in results.unpack(): - all_results.append(subresult) - subresult_dict = subresult.dict() - if subresult_dict.get("error"): - response.status_code = subresult_dict["error"]["code"] - # Drop finish reason as OpenAI doesn't expect it - # for errors in streaming - subresult_dict["finish_reason"] = None - all_results.pop() - had_error = True - yield "data: " + ModelResponse(**subresult_dict).json() + "\n\n" - # Return early in case of an error - break - else: - finish_reason = subresult_dict["finish_reason"] - choices = [ - DeltaChoices( - delta=DeltaContent( - content=subresult_dict["generated_text"] or "", - tool_calls=subresult_dict["tool_calls"] or None, - ), - index=0, - finish_reason=None, - ) - ] - chunk = ChatCompletionResponse( - id=completion_id, - object="chat.completion.chunk", - model=body.model, - choices=choices, - usage=None, - ) - # data = chunk.json(exclude_unset=True, ensure_ascii=False) - data = chunk.json() - yield f"data: {data}\n\n" - 
if had_error: - # Return early in case of an error - break - if not had_error: - choices = [ - DeltaChoices( - delta=DeltaEOS(), - index=0, - finish_reason=finish_reason, - ) - ] - usage = UsageInfo.from_response(ModelResponse.merge_stream(*all_results)) if all_results else None - chunk = ChatCompletionResponse( - id=completion_id, - object="chat.completion.result", - model=body.model, - choices=choices, - usage=usage, - ) - data = chunk.json() - yield f"data: {data}\n\n" - yield "data: [DONE]\n\n" - - -class Router: - def __init__( - self, - query_client: RouterQueryClient, - ) -> None: - self.query_client = query_client - - @router_app.get("/v1/models", response_model=ModelList) - async def models(self) -> ModelList: - """OpenAI API-compliant endpoint to get all models.""" - models = await self.query_client.models() - return ModelList(data=list(models.values())) - - # :path allows us to have slashes in the model name - @router_app.get("/v1/models/{model:path}", response_model=ModelCard) - async def model_data(self, model: str) -> ModelCard: - """OpenAI API-compliant endpoint to get one model. - - :param model: The model ID (e.g. "amazon/LightGPT") - """ - model = model.replace("--", "/") - model_data = await self.query_client.model(model) - if model_data is None: - raise OpenAIHTTPException( - message=f"Invalid model '{model}'", - status_code=status.HTTP_400_BAD_REQUEST, - type="InvalidModel", - ) - return model_data - - @router_app.post("/v1/completions") - async def completions( - self, - body: CompletionRequest, - response: FastAPIResponse, - ): - """Given a prompt, the model will return one or more predicted completions, - and can also return the probabilities of alternative tokens at each position. - - Returns: - A response object with completions. - """ - prompt = Prompt( - prompt=body.prompt, - parameters=dict(body), - use_prompt_format=False, - ) - request_id = f"cmpl-{str(uuid.uuid4().hex)}" - - if body.stream: - return StreamingResponse( - _completions_wrapper( - request_id, - body, - response, - self.query_client.query(body.model, prompt, request_id, body.stream), - ), - media_type="text/event-stream", - ) - else: - async with async_timeout.timeout(TIMEOUT): - results_reponse = self.query_client.query(body.model, prompt, request_id, body.stream) - async for results in results_reponse: - if results.error: - raise OpenAIHTTPException( - message=results.error.message, - status_code=results.error.code, - type=results.error.type, - ) - results = results.dict() - - choices = [ - CompletionResponseChoice( - index=0, - text=results["generated_text"] or "", - finish_reason=results["finish_reason"], - ) - ] - usage = UsageInfo.from_response(results) - - return CompletionResponse( - id=request_id, - object="text_completion", - model=body.model, - choices=choices, - usage=usage, - ) - - @router_app.post("/v1/chat/completions") - async def chat( - self, - body: ChatCompletionRequest, - response: FastAPIResponse, - ): - """Given a prompt, the model will return one or more predicted completions, - and can also return the probabilities of alternative tokens at each position. - - Returns: - A response object with completions. 
- """ - prompt = Prompt( - prompt=body.messages, - parameters=dict(body), - tools=body.tools, - tool_choice=body.tool_choice, - ) - request_id = f"chatcmpl-{str(uuid.uuid4().hex)}" - if body.stream: - return StreamingResponse( - _chat_completions_wrapper( - request_id, - body, - response, - self.query_client.query(body.model, prompt, request_id, body.stream), - ), - media_type="text/event-stream", - ) - else: - async with async_timeout.timeout(TIMEOUT): - results_reponse = self.query_client.query(body.model, prompt, request_id, body.stream) - async for results in results_reponse: - if results.error: - raise OpenAIHTTPException( - message=results.error.message, - status_code=results.error.code, - type=results.error.type, - ) - - if results.tool_calls is not None: - msg = ChatMessage(role="assistant", tool_calls=results.tool_calls) - # deleting this fields so that they don't appear in the response - del msg.tool_call_id - else: - msg = ChatMessage(role="assistant", content=results.generated_text or "") - - usage = UsageInfo.from_response(results.dict()) - return ChatCompletionResponse( - id=request_id, - object="chat.completion", - model=body.model, - choices=[ - ChatCompletionResponseChoice( - index=0, - message=msg, - finish_reason=results.finish_reason, - ) - ], - usage=usage, - ) - - @router_app.get("/v1/health_check") - async def health_check(self) -> bool: - """Check if the routher is still running.""" - return True diff --git a/comps/llms/text-generation/ray_serve/api_openai_backend/tools.py b/comps/llms/text-generation/ray_serve/api_openai_backend/tools.py deleted file mode 100644 index e815eb146..000000000 --- a/comps/llms/text-generation/ray_serve/api_openai_backend/tools.py +++ /dev/null @@ -1,180 +0,0 @@ -# Copyright (C) 2024 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -import json -import os -import re -from enum import Enum -from typing import List, Union - -import jinja2 -from ray_serve.api_openai_backend.openai_protocol import ChatMessage, FunctionCall, ToolCall - - -class ToolsCallsTemplateContext(Enum): - """This is used within the template to generate depending on the context.""" - - CALL_TOKEN = 1 - FUNCTIONS_LIST = 2 - FORCE_CALL = 3 - CALLS_NOTIF = 4 - TOOL_RESPONSE = 5 - - -class ToolsCallsTemplate: - def __init__(self, template_path=None): - self.trim_blocks = True - self.lstrip_blocks = True - if template_path is None: - template_path = os.path.dirname(__file__) + "/templates/tools_functions.jinja" - self.environment = jinja2.Environment(loader=jinja2.FileSystemLoader(os.path.dirname(template_path))) - self.template = self.environment.get_template(os.path.basename(template_path)) - self.template.globals["FUNCTIONS_LIST"] = ToolsCallsTemplateContext.FUNCTIONS_LIST - self.template.globals["FORCE_CALL"] = ToolsCallsTemplateContext.FORCE_CALL - self.template.globals["CALL_TOKEN"] = ToolsCallsTemplateContext.CALL_TOKEN - self.template.globals["CALLS_NOTIF"] = ToolsCallsTemplateContext.CALLS_NOTIF - self.template.globals["TOOL_RESPONSE"] = ToolsCallsTemplateContext.TOOL_RESPONSE - - def get_func_call_token(self) -> str: - """Return the special token used to find functions calls.""" - return self.template.render(CONTEXT=ToolsCallsTemplateContext.CALL_TOKEN) - - def render_toolcalls(self, tool_calls: List[ToolCall]): - return self.template.render(CONTEXT=ToolsCallsTemplateContext.CALLS_NOTIF, tool_calls=tool_calls) - - def render_toolmessage(self, message: ChatMessage): - return self.template.render(CONTEXT=ToolsCallsTemplateContext.TOOL_RESPONSE, 
message=message) - - def render_toolslist(self, tool_choice: Union[str, None], tools_list) -> str: - if isinstance(tool_choice, str) and tool_choice == "auto": - tool_choice = None - if tool_choice is not None: - for tool in tools_list: - # Search if the tool_choice is in the tools_list - if tool.type == "function" and tool.function.name == tool_choice: - return self.template.render(CONTEXT=ToolsCallsTemplateContext.FORCE_CALL, tool=tool) - return "" - else: - return self.template.render(CONTEXT=ToolsCallsTemplateContext.FUNCTIONS_LIST, tools_list=tools_list) - - -class OpenAIToolsPrompter: - """ - https://platform.openai.com/docs/assistants/tools - """ - - def __init__(self, template_path=None): - self.template = ToolsCallsTemplate(template_path) - self.call_token_str = self.template.get_func_call_token() - if self.call_token_str is None: - raise ValueError("There is something wrong with the tools template.") - else: - self.call_token_pre = self.call_token_str[0] - - def func_call_token_pre(self) -> str: - return self.call_token_pre - - def func_call_token_size(self) -> int: - return len(self.call_token_str) - - def func_call_token(self) -> str: - return self.call_token_str - - def content_from_assistant(self, message: ChatMessage) -> str: - text = self.template.render_toolcalls(message.tool_calls) - if message.content is None: - return text - else: - return message.content + "\n" + text - - def content_from_tool(self, message: ChatMessage) -> str: - return self.template.render_toolmessage(message) - - def inject_prompt(self, request, tools, tool_choice): - """Generate and inject the prompt for tools calls.""" - if tools is not None and self.call_token_str is not None and len(tools): - select_tool_choice = tool_choice if (tool_choice is not None and tool_choice != "auto") else None - text_inject = self.template.render_toolslist(tool_choice=select_tool_choice, tools_list=tools) - if request[-1].role == "user": - request[-1].content = text_inject + "\n The following is User Question: \n" + request[-1].content - return request - - -class ChatPromptCapture: - def __init__(self): - self.content: str = "" - self.func_call_content: str = "" - self.func_start_pos: int = -1 - self.print_end_pos: int = 0 - self.calls_list = [] - self.call_indx = 0 - - def reset(self): - self.content: str = "" - self.func_call_content: str = "" - self.func_start_pos: int = -1 - self.print_end_pos: int = 0 - self.calls_list = [] - - def make_calls_list(self, call_id: int, func_call_content): - if func_call_content is None: - return - try: - call_dict = json.loads(func_call_content) - call_dict["arguments"] = json.dumps(call_dict["arguments"]) - self.calls_list.append(ToolCall(id=f"call_{call_id}", type="function", function=FunctionCall(**call_dict))) - except Exception: - pass - - def process_full_output(self, output: str, openai_tools_prompter: OpenAIToolsPrompter, original_prompts): - ret_output = "" - # FIXME: for some model, prompt will be append along with answer, need to remove - start_pos = sum([len(prompt) for prompt in original_prompts]) - 6 - - if openai_tools_prompter.func_call_token() in output[start_pos:]: # we found func_call - if is_func := re.findall("(\{(.*)\})", output[start_pos:]): - for idx, found in enumerate(is_func): - func_call_content = found[0] - c1 = func_call_content.count("{") - c2 = func_call_content.count("}") - if c1 == c2: # We have the complete call block - func_call_content = found[0] - self.make_calls_list(idx, func_call_content) - else: - ret_output = output[start_pos:] - - return 
ret_output, self.calls_list - - def process_stream_output(self, output: str, openai_tools_prompter: OpenAIToolsPrompter): - ret_output = "" - self.content += output - - # scenario 1: not reach the length for identifying a func call. - if len(self.content) < openai_tools_prompter.func_call_token_size(): - # wait for possible function call - return ret_output, self.calls_list - - # scenario 2: reach the length for identifying if a func call. - if self.func_start_pos == -1: - if openai_tools_prompter.func_call_token() in self.content: # we found func_call - self.func_start_pos = self.content.index(openai_tools_prompter.func_call_token()) - return ret_output, self.calls_list - else: # unhold self.content - print_start_pos = self.print_end_pos - self.print_end_pos = len(self.content) - ret_output = self.content[print_start_pos : self.print_end_pos] - return ret_output, self.calls_list - - # scenario 3: wait until we can extract the function call - calls_list = [] - if is_func := re.findall("(\{(.*)\})", self.content): - for idx, found in enumerate(is_func): - func_call_content = found[0] - c1 = func_call_content.count("{") - c2 = func_call_content.count("}") - if c1 == c2: # We have the complete call block - self.make_calls_list(self.call_indx, func_call_content) - calls_list = self.calls_list - self.call_indx += 1 - self.reset() - return ret_output, calls_list diff --git a/comps/llms/text-generation/ray_serve/api_server_openai.py b/comps/llms/text-generation/ray_serve/api_server_openai.py deleted file mode 100644 index 002b3a633..000000000 --- a/comps/llms/text-generation/ray_serve/api_server_openai.py +++ /dev/null @@ -1,130 +0,0 @@ -# Copyright (C) 2024 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -import os -import re -import sys -from typing import Any, Dict - -import ray -from easydict import EasyDict as edict -from ray import serve -from ray_serve.api_openai_backend.query_client import RouterQueryClient -from ray_serve.api_openai_backend.router_app import Router, router_app -from ray_serve.serve import LLMServe - - -def router_application(deployments, max_concurrent_queries): - """Create a Router Deployment. - - Router Deployment will point to a Serve Deployment for each specified base model, - and have a client to query each one. 
- """ - merged_client = RouterQueryClient(deployments) - - RouterDeployment = serve.deployment( - route_prefix="/", - max_concurrent_queries=max_concurrent_queries, # Maximum backlog for a single replica - )(serve.ingress(router_app)(Router)) - - return RouterDeployment.bind(merged_client) - - -def openai_serve_run(deployments, host, route_prefix, port, max_concurrent_queries): - router_app = router_application(deployments, max_concurrent_queries) - - serve.start(http_options={"host": host, "port": port}) - serve.run( - router_app, - name="router", - route_prefix=route_prefix, - ).options( - stream=True, - use_new_handle_api=True, - ) - deployment_address = f"http://{host}:{port}{route_prefix}" - print(f"Deployment is ready at `{deployment_address}`.") - return deployment_address - - -def get_deployment_actor_options(hpus_per_worker, ipex_enabled=False): - _ray_env_key = "env_vars" - # OMP_NUM_THREADS will be set by num_cpus, so not set in env - _predictor_runtime_env_ipex = { - "KMP_BLOCKTIME": "1", - "KMP_SETTINGS": "1", - "KMP_AFFINITY": "granularity=fine,compact,1,0", - "MALLOC_CONF": "oversize_threshold:1,background_thread:true,\ - metadata_thp:auto,dirty_decay_ms:9000000000,muzzy_decay_ms:9000000000", - } - runtime_env: Dict[str, Any] = {_ray_env_key: {}} - if ipex_enabled: - runtime_env[_ray_env_key].update(_predictor_runtime_env_ipex) - ray_actor_options: Dict[str, Any] = {"runtime_env": runtime_env} - ray_actor_options["resources"] = {"HPU": hpus_per_worker} - - return ray_actor_options - - -def main(argv=None): - import argparse - - parser = argparse.ArgumentParser(description="Serve LLM models with Ray Serve.", add_help=True) - parser.add_argument("--port_number", default=8080, type=int, help="Port number to serve on.") - parser.add_argument( - "--model_id_or_path", default="meta-llama/Llama-2-7b-chat-hf", type=str, help="Model id or path." - ) - parser.add_argument( - "--chat_processor", default="ChatModelNoFormat", type=str, help="Chat processor for aligning the prompts." 
- ) - parser.add_argument("--max_num_seqs", default=256, type=int, help="Maximum number of sequences to generate.") - parser.add_argument("--max_batch_size", default=8, type=int, help="Maximum batch size.") - parser.add_argument("--num_replicas", default=1, type=int, help="Number of replicas to start.") - parser.add_argument("--num_cpus_per_worker", default=8, type=int, help="Number of CPUs per worker.") - parser.add_argument("--num_hpus_per_worker", default=1, type=int, help="Number of HPUs per worker.") - - if len(sys.argv) == 1: - parser.print_help(sys.stderr) - sys.exit(1) - - args = parser.parse_args(argv) - - ray.init(address="auto") - - host_port = os.environ.get("RAY_Serve_ENDPOINT", "http://0.0.0.0:8080") - host = re.search(r"([\d\.]+)", host_port).group(1) - port = args.port_number - model_name = args.model_id_or_path.split("/")[-1] if args.model_id_or_path else "" - route_prefix = "/" - - infer_conf = {} - infer_conf["use_auth_token"] = os.environ.get("HF_TOKEN", None) - infer_conf["trust_remote_code"] = os.environ.get("TRUST_REMOTE_CODE", None) - infer_conf["model_id_or_path"] = args.model_id_or_path - infer_conf["chat_processor"] = args.chat_processor - infer_conf["max_batch_size"] = args.max_batch_size - infer_conf["max_num_seqs"] = args.max_num_seqs - infer_conf["num_replicas"] = args.num_replicas - infer_conf["num_cpus_per_worker"] = args.num_cpus_per_worker - infer_conf["num_hpus_per_worker"] = args.num_hpus_per_worker - infer_conf["max_concurrent_queries"] = int(os.environ.get("MAX_CONCURRENT_QUERIES", 100)) - infer_conf = edict(infer_conf) - - print(f"infer_conf: {infer_conf}") - - deployment = {} - ray_actor_options = get_deployment_actor_options(infer_conf["num_hpus_per_worker"]) - deployment[model_name] = LLMServe.options( - num_replicas=infer_conf["num_replicas"], - ray_actor_options=ray_actor_options, - max_concurrent_queries=infer_conf["max_concurrent_queries"], - ).bind(infer_conf, infer_conf["max_num_seqs"], infer_conf["max_batch_size"]) - deployment = edict(deployment) - openai_serve_run(deployment, host, route_prefix, port, infer_conf["max_concurrent_queries"]) - # input("Service is deployed successfully.") - while 1: - pass - - -if __name__ == "__main__": - main(sys.argv[1:]) diff --git a/comps/llms/text-generation/ray_serve/build_docker_microservice.sh b/comps/llms/text-generation/ray_serve/build_docker_microservice.sh deleted file mode 100644 index f317f89d4..000000000 --- a/comps/llms/text-generation/ray_serve/build_docker_microservice.sh +++ /dev/null @@ -1,9 +0,0 @@ -# Copyright (C) 2024 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -cd ../../../../ -docker build \ - -t opea/llm-ray:latest \ - --build-arg https_proxy=$https_proxy \ - --build-arg http_proxy=$http_proxy \ - -f comps/llms/text-generation/ray_serve/docker/Dockerfile.microservice . diff --git a/comps/llms/text-generation/ray_serve/build_docker_rayserve.sh b/comps/llms/text-generation/ray_serve/build_docker_rayserve.sh deleted file mode 100755 index 4ea462722..000000000 --- a/comps/llms/text-generation/ray_serve/build_docker_rayserve.sh +++ /dev/null @@ -1,15 +0,0 @@ -#!/bin/bash - - -# Copyright (C) 2024 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -cd ../../../../ - -docker build \ - -f comps/llms/text-generation/ray_serve/docker/Dockerfile.rayserve \ - -t ray_serve:habana \ - --network=host \ - --build-arg http_proxy=${http_proxy} \ - --build-arg https_proxy=${https_proxy} \ - --build-arg no_proxy=${no_proxy} . 
diff --git a/comps/llms/text-generation/ray_serve/docker/Dockerfile.microservice b/comps/llms/text-generation/ray_serve/docker/Dockerfile.microservice deleted file mode 100644 index 39a50ac8a..000000000 --- a/comps/llms/text-generation/ray_serve/docker/Dockerfile.microservice +++ /dev/null @@ -1,37 +0,0 @@ -# Copyright (c) 2024 Intel Corporation -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -FROM langchain/langchain:latest - -RUN apt-get update -y && apt-get install -y --no-install-recommends --fix-missing \ - libgl1-mesa-glx \ - libjemalloc-dev \ - vim - -RUN useradd -m -s /bin/bash user && \ - mkdir -p /home/user && \ - chown -R user /home/user/ - -USER user - -COPY comps /home/user/comps - -RUN pip install --no-cache-dir --upgrade pip && \ - pip install --no-cache-dir -r /home/user/comps/llms/text-generation/ray_serve/requirements.txt - -ENV PYTHONPATH=$PYTHONPATH:/home/user - -WORKDIR /home/user/comps/llms/text-generation/ray_serve - -ENTRYPOINT ["bash", "entrypoint.sh"] diff --git a/comps/llms/text-generation/ray_serve/docker/Dockerfile.rayserve b/comps/llms/text-generation/ray_serve/docker/Dockerfile.rayserve deleted file mode 100644 index 220acc237..000000000 --- a/comps/llms/text-generation/ray_serve/docker/Dockerfile.rayserve +++ /dev/null @@ -1,32 +0,0 @@ -# Copyright (C) 2024 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -# FROM vault.habana.ai/gaudi-docker/1.15.1/ubuntu22.04/habanalabs/pytorch-installer-2.2.0:latest -FROM vault.habana.ai/gaudi-docker/1.16.0/ubuntu22.04/habanalabs/pytorch-installer-2.2.2:latest - -ENV LANG=en_US.UTF-8 - -WORKDIR /root/ray_serve - -# copy the source code to the package directory -COPY comps/llms/text-generation/ray_serve/ /root/ray_serve - -RUN pip install -r /root/ray_serve/docker/requirements.txt && \ - pip install --upgrade-strategy eager optimum[habana] - -RUN sed -i 's/#PermitRootLogin prohibit-password/PermitRootLogin yes/' /etc/ssh/sshd_config && \ - service ssh restart - -ENV no_proxy=localhost,127.0.0.1 -ENV PYTHONPATH=$PYTHONPATH:/root:/root/ray_serve - -# Required by DeepSpeed -ENV RAY_EXPERIMENTAL_NOSET_HABANA_VISIBLE_MODULES=1 - -ENV PT_HPU_LAZY_ACC_PAR_MODE=0 - -ENV PT_HPU_ENABLE_REFINE_DYNAMIC_SHAPES=0 - -ENV PT_HPU_ENABLE_WEIGHT_CPU_PERMUTE=0 - -ENV PT_HPU_ENABLE_LAZY_COLLECTIVES=true \ No newline at end of file diff --git a/comps/llms/text-generation/ray_serve/docker/requirements.txt b/comps/llms/text-generation/ray_serve/docker/requirements.txt deleted file mode 100644 index da4e88e13..000000000 --- a/comps/llms/text-generation/ray_serve/docker/requirements.txt +++ /dev/null @@ -1,10 +0,0 @@ - -async_timeout -easydict -numpy -py-cpuinfo -pydantic-yaml -ray>=2.10 -ray[serve,tune]>=2.10 -typer -typing>=3.7.4.3 diff --git a/comps/llms/text-generation/ray_serve/docker_compose_llm.yaml b/comps/llms/text-generation/ray_serve/docker_compose_llm.yaml deleted file mode 100644 index 0bfed637d..000000000 --- a/comps/llms/text-generation/ray_serve/docker_compose_llm.yaml +++ /dev/null @@ -1,58 +0,0 @@ -# Copyright (c) 2024 
Intel Corporation -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -version: "3.8" - -services: - ray-service: - image: ray_serve:habana - container_name: ray-gaudi-server - ports: - - "8008:80" - volumes: - - "./data:/data" - environment: - no_proxy: ${no_proxy} - http_proxy: ${http_proxy} - https_proxy: ${https_proxy} - HF_TOKEN: ${HUGGINGFACEHUB_API_TOKEN} - HABANA_VISIBLE_DEVICES: all - OMPI_MCA_btl_vader_single_copy_mechanism: none - LLM_MODEL: ${LLM_MODEL} - TRUST_REMOTE_CODE: True - runtime: habana - cap_add: - - SYS_NICE - ipc: host - command: /bin/bash -c "ray start --head && python api_server_openai.py --port_number 80 --model_id_or_path $LLM_MODEL --chat_processor ChatModelLlama --num_cpus_per_worker 8 --num_hpus_per_worker 1" - llm: - image: opea/llm-ray:latest - container_name: llm-ray-gaudi-server - depends_on: - - ray-service - ports: - - "9000:9000" - ipc: host - environment: - no_proxy: ${no_proxy} - http_proxy: ${http_proxy} - https_proxy: ${https_proxy} - RAY_Serve_ENDPOINT: ${RAY_Serve_ENDPOINT} - HUGGINGFACEHUB_API_TOKEN: ${HUGGINGFACEHUB_API_TOKEN} - LLM_MODEL: ${LLM_MODEL} - restart: unless-stopped - -networks: - default: - driver: bridge diff --git a/comps/llms/text-generation/ray_serve/entrypoint.sh b/comps/llms/text-generation/ray_serve/entrypoint.sh deleted file mode 100644 index d60eddd36..000000000 --- a/comps/llms/text-generation/ray_serve/entrypoint.sh +++ /dev/null @@ -1,8 +0,0 @@ -#!/usr/bin/env bash - -# Copyright (C) 2024 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -pip --no-cache-dir install -r requirements-runtime.txt - -python llm.py diff --git a/comps/llms/text-generation/ray_serve/launch_microservice.sh b/comps/llms/text-generation/ray_serve/launch_microservice.sh deleted file mode 100644 index 5037fe62a..000000000 --- a/comps/llms/text-generation/ray_serve/launch_microservice.sh +++ /dev/null @@ -1,13 +0,0 @@ -# Copyright (C) 2024 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -docker run -d --rm \ - --name="llm-ray-server" \ - -p 9000:9000 \ - --ipc=host \ - -e http_proxy=$http_proxy \ - -e https_proxy=$https_proxy \ - -e RAY_Serve_ENDPOINT=$RAY_Serve_ENDPOINT \ - -e HUGGINGFACEHUB_API_TOKEN=$HUGGINGFACEHUB_API_TOKEN \ - -e LLM_MODEL=$LLM_MODEL \ - opea/llm-ray:latest diff --git a/comps/llms/text-generation/ray_serve/launch_ray_service.sh b/comps/llms/text-generation/ray_serve/launch_ray_service.sh deleted file mode 100755 index 7a98acf07..000000000 --- a/comps/llms/text-generation/ray_serve/launch_ray_service.sh +++ /dev/null @@ -1,46 +0,0 @@ -#!/bin/bash - - -# Copyright (C) 2024 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -# Set default values -default_port=8008 -default_model=${LLM_MODEL} -default_chat_processor="ChatModelLlama" -default_num_cpus_per_worker=8 -default_num_hpus_per_worker=1 - -# Assign arguments to variables -port_number=${1:-$default_port} -model_name=${2:-$default_model} -chat_processor=${3:-$default_chat_processor} -num_cpus_per_worker=${4:-$default_num_cpus_per_worker} 
-num_hpus_per_worker=${5:-$default_num_hpus_per_worker} - -# Check if all required arguments are provided -if [ "$#" -lt 0 ] || [ "$#" -gt 5 ]; then - echo "Usage: $0 [port_number] [model_name] [chat_processor] [num_cpus_per_worker] [num_hpus_per_worker]" - echo "Please customize the arguments you want to use. - - port_number: The port number assigned to the Ray Gaudi endpoint, with the default being 8080. - - model_name: The model name utilized for LLM, with the default set to meta-llama/Llama-2-7b-chat-hf. - - chat_processor: The chat processor for handling the prompts, with the default set to 'ChatModelNoFormat', and the optional selection can be 'ChatModelLlama', 'ChatModelGptJ" and "ChatModelGemma'. - - num_cpus_per_worker: The number of CPUs specifies the number of CPUs per worker process. - - num_hpus_per_worker: The number of HPUs specifies the number of HPUs per worker process." - exit 1 -fi - -# Build the Docker run command based on the number of cards -docker run -d --rm \ - --runtime=habana \ - --name="ray-service" \ - -v $PWD/data:/data \ - -e HABANA_VISIBLE_DEVICES=all \ - -e OMPI_MCA_btl_vader_single_copy_mechanism=none \ - --cap-add=sys_nice \ - --ipc=host \ - -p $port_number:80 \ - -e HF_TOKEN=$HUGGINGFACEHUB_API_TOKEN \ - -e TRUST_REMOTE_CODE=True \ - ray_serve:habana \ - /bin/bash -c "ray start --head && python api_server_openai.py --port_number 80 --model_id_or_path $model_name --chat_processor $chat_processor --num_cpus_per_worker $num_cpus_per_worker --num_hpus_per_worker $num_hpus_per_worker" diff --git a/comps/llms/text-generation/ray_serve/llm.py b/comps/llms/text-generation/ray_serve/llm.py deleted file mode 100644 index 5dad1fdd0..000000000 --- a/comps/llms/text-generation/ray_serve/llm.py +++ /dev/null @@ -1,85 +0,0 @@ -# Copyright (c) 2024 Intel Corporation -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os - -from fastapi.responses import StreamingResponse -from langchain_openai import ChatOpenAI -from langsmith import traceable - -from comps import GeneratedDoc, LLMParamsDoc, ServiceType, opea_microservices, register_microservice - - -@traceable(run_type="tool") -def post_process_text(text: str): - if text == " ": - return "data: @#$\n\n" - if text == "\n": - return "data:
\n\n" - if text.isspace(): - return None - new_text = text.replace(" ", "@#$") - return f"data: {new_text}\n\n" - - -@register_microservice( - name="opea_service@llm_ray", - service_type=ServiceType.LLM, - endpoint="/v1/chat/completions", - host="0.0.0.0", - port=9000, -) -@traceable(run_type="llm") -def llm_generate(input: LLMParamsDoc): - llm_endpoint = os.getenv("RAY_Serve_ENDPOINT", "http://localhost:8080") - llm_model = os.getenv("LLM_MODEL", "Llama-2-7b-chat-hf") - if "/" in llm_model: - llm_model = llm_model.split("/")[-1] - llm = ChatOpenAI( - openai_api_base=llm_endpoint + "/v1", - model_name=llm_model, - openai_api_key=os.getenv("OPENAI_API_KEY", "not_needed"), - max_tokens=input.max_new_tokens, - temperature=input.temperature, - streaming=input.streaming, - request_timeout=600, - ) - - if input.streaming: - - async def stream_generator(): - chat_response = "" - async for text in llm.astream(input.query): - text = text.content - chat_response += text - processed_text = post_process_text(text) - if text and processed_text: - if "" in text: - res = text.split("")[0] - if res != "": - yield res - break - yield processed_text - print(f"[llm - chat_stream] stream response: {chat_response}") - yield "data: [DONE]\n\n" - - return StreamingResponse(stream_generator(), media_type="text/event-stream") - else: - response = llm.invoke(input.query) - response = response.content - return GeneratedDoc(text=response, prompt=input.query) - - -if __name__ == "__main__": - opea_microservices["opea_service@llm_ray"].start() diff --git a/comps/llms/text-generation/ray_serve/requirements-runtime.txt b/comps/llms/text-generation/ray_serve/requirements-runtime.txt deleted file mode 100644 index 225adde27..000000000 --- a/comps/llms/text-generation/ray_serve/requirements-runtime.txt +++ /dev/null @@ -1 +0,0 @@ -langserve diff --git a/comps/llms/text-generation/ray_serve/requirements.txt b/comps/llms/text-generation/ray_serve/requirements.txt deleted file mode 100644 index 2f8b2ff4e..000000000 --- a/comps/llms/text-generation/ray_serve/requirements.txt +++ /dev/null @@ -1,14 +0,0 @@ -docarray[full] -fastapi -huggingface_hub -langchain==0.1.16 -langchain_openai -langsmith -openai -opentelemetry-api -opentelemetry-exporter-otlp -opentelemetry-sdk -prometheus-fastapi-instrumentator -ray[serve]>=2.10 -shortuuid -transformers diff --git a/comps/llms/text-generation/ray_serve/serve.py b/comps/llms/text-generation/ray_serve/serve.py deleted file mode 100644 index 5c33dde00..000000000 --- a/comps/llms/text-generation/ray_serve/serve.py +++ /dev/null @@ -1,569 +0,0 @@ -# Copyright (C) 2024 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -import asyncio -import functools -import os -import re -from enum import Enum -from queue import Empty -from typing import Any, AsyncGenerator, Dict, List, Literal, Optional, Tuple, Union - -import ray -import torch -from fastapi import HTTPException -from pydantic import BaseModel -from ray import serve -from ray_serve.api_openai_backend.openai_protocol import ChatMessage, ErrorResponse, ModelResponse -from ray_serve.api_openai_backend.tools import ChatPromptCapture, OpenAIToolsPrompter -from starlette.requests import Request -from starlette.responses import JSONResponse, StreamingResponse -from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer - -DEVICE_CPU = "cpu" -DEVICE_HPU = "hpu" - - -def load_tokenizer(model, tokenizer_name_or_path): - tokenizer = AutoTokenizer.from_pretrained(tokenizer_name_or_path) - if not 
model.config.is_encoder_decoder: - tokenizer.padding_side = "left" - # Some models like GPT2 do not have a PAD token so we have to set it if necessary - if model.config.model_type == "llama": - # unwind broken decapoda-research config - model.generation_config.pad_token_id = 0 - model.generation_config.bos_token_id = 1 - model.generation_config.eos_token_id = 2 - tokenizer.bos_token_id = model.generation_config.bos_token_id - tokenizer.eos_token_id = model.generation_config.eos_token_id - tokenizer.pad_token_id = model.generation_config.pad_token_id - tokenizer.pad_token = tokenizer.decode(tokenizer.pad_token_id) - tokenizer.eos_token = tokenizer.decode(tokenizer.eos_token_id) - tokenizer.bos_token = tokenizer.decode(tokenizer.bos_token_id) - - if tokenizer.pad_token is None: - tokenizer.pad_token = tokenizer.eos_token - model.generation_config.pad_token_id = model.generation_config.eos_token_id - - return tokenizer - - -class PromptFormat(Enum): - CHAT_FORMAT = 1 - PROMPTS_FORMAT = 2 - INVALID_FORMAT = 3 - - -def get_prompt_format(input: Union[List[str], List[dict], List[ChatMessage]]): - chat_format = True - prompts_format = True - for item in input: - if isinstance(item, str) or isinstance(item, list): - chat_format = False - elif isinstance(item, dict) or isinstance(item, ChatMessage): - prompts_format = False - else: - chat_format = False - prompts_format = False - break - if chat_format: - return PromptFormat.CHAT_FORMAT - if prompts_format: - return PromptFormat.PROMPTS_FORMAT - return PromptFormat.INVALID_FORMAT - - -class ChatModel: - human_id = "" - bot_id = "" - unknown_id = "" - MEANINGLESS_WORDS = ["", "", "<|endoftext|>", "
"] - stop_words = [""] - - def __init__(self, intro, human_id, bot_id, stop_words) -> None: - self.intro = intro - self.human_id = human_id - self.bot_id = bot_id - self.stop_words = stop_words - self.MEANINGLESS_WORDS.extend(self.stop_words) - - def prepare_prompt(self, messages: list): - """Prepare prompt from history messages.""" - prompt = "" - for msg in messages: - role, content = msg.role, msg.content - if role == "user": - prompt += f"{self.human_id}: {content}\n" - elif role == "assistant": - prompt += f"{self.bot_id}: {content}\n" - else: - prompt += f"{self.unknown_id}: {content}\n" - prompt += f"{self.bot_id}:" - return prompt - - def convert_output(self, output: str): - """Convert the model output to final answer.""" - human_id = self.human_id.strip() - bot_id = self.bot_id.strip() - if human_id != "": - output = output.split(human_id)[0] - if bot_id != "": - output = output.split(bot_id)[0] - for word in self.MEANINGLESS_WORDS: - output = output.replace(word, "") - text = output - # remove partial human_id or bot id - if "\n" in text and ( - human_id.startswith(text[text.rfind("\n") + 1 :]) or bot_id.startswith(text[text.rfind("\n") + 1]) - ): - text = text[: text.rfind("\n")] - return text - - def get_prompt(self, messages): - """Generate response based on messages.""" - prompt = self.prepare_prompt(messages) - return prompt - - -class ChatModelGptJ(ChatModel): - def __init__(self, intro, human_id, bot_id, stop_words): - super().__init__(intro, human_id, bot_id, stop_words) - - def prepare_prompt(self, messages: list): - """Prepare prompt from history messages.""" - prompt = self.intro - for msg in messages: - msg = dict(msg) - role, content = msg["role"], msg["content"] - if role == "user": - if self.human_id != "": - prompt += f"{self.human_id}:\n{content}\n" - else: - prompt += f"{content}\n" - elif role == "assistant": - if self.bot_id != "": - prompt += f"{self.bot_id}:\n{content}\n" - else: - prompt += f"{content}\n" - else: - prompt += f"### Unknown:\n{content}\n" - if self.bot_id != "": - prompt += f"{self.bot_id}:\n" - return prompt - - -class ChatModelLLama(ChatModel): - def __init__(self, intro="", human_id="[INST] {msg} [/INST]", bot_id="", stop_words=[]): - super().__init__(intro, human_id, bot_id, stop_words) - - def prepare_prompt(self, messages: list): - """Prepare prompt from history messages.""" - prompt = self.intro - for msg in messages: - msg = dict(msg) - role, content = msg["role"], msg["content"] - if role == "user": - if self.human_id != "": - prompt += self.human_id.format(msg=content) - else: - prompt += f"{content}\n" - elif role == "assistant": - prompt += f"{content}\n" - elif role == "tool": - prompt += f"{content}\n" - elif role == "system": - prompt += f"### system:\n{content}\n" - else: - prompt += f"### Unknown:\n{content}\n" - if self.bot_id != "": - prompt += f"{self.bot_id}:\n" - return prompt - - -class ChatModelGemma(ChatModel): - def __init__(self, intro, human_id, bot_id, stop_words): - super().__init__(intro, human_id, bot_id, stop_words) - - def prepare_prompt(self, messages: list): - """Prepare prompt from history messages.""" - prompt = self.intro - for msg in messages: - msg = dict(msg) - role, content = msg["role"], msg["content"] - if role == "user": - if self.human_id != "": - prompt += f"{self.human_id} {content}\n" - else: - prompt += f"{content}\n" - elif role == "assistant": - if self.bot_id != "": - prompt += f"{self.bot_id} {content}\n" - else: - prompt += f"{content}\n" - else: - prompt += f"### Unknown:\n{content}\n" 
- if self.bot_id != "": - prompt += f"{self.bot_id}:\n" - return prompt - - -class ChatModelNoFormat(ChatModel): - def __init__(self, intro, human_id, bot_id, stop_words): - super().__init__(intro, human_id, bot_id, stop_words) - - def prepare_prompt(self, messages: list): - """Prepare prompt from history messages.""" - prompt = "" - for msg in messages: - msg = dict(msg) - prompt += msg["content"] - return prompt - - -class GenerateResult(BaseModel): - text: str = "" - input_length: int = None - generate_length: int = None - - -class Predictor: - def __init__(self, infer_conf: dict) -> None: - model_id_or_path = infer_conf["model_id_or_path"] - use_auth_token = infer_conf["use_auth_token"] - trust_remote_code = infer_conf["trust_remote_code"] - - device = os.environ.get("DEVICE", "hpu") - - self.tokenizer = AutoTokenizer.from_pretrained( - model_id_or_path, use_auth_token=use_auth_token, trust_remote_code=trust_remote_code - ) - self.device = torch.device(device) - # now deepspeed predictor don't have the model - # so configure_tokenizer cannot be called - # this should be solved in the next pr - # where it is also a worker - # This can be removed then - if self.tokenizer.pad_token_id is None: - self.tokenizer.pad_token_id = self.tokenizer.eos_token_id - - self.input_length = None - - def tokenize_inputs(self, text): - input_tokens = self.tokenizer(text, return_tensors="pt", padding=True) - input_ids = input_tokens.input_ids - self.input_length = input_ids.size()[1] - input_ids = input_ids.to(device=self.device) - return input_ids, self.input_length - - def configure_tokenizer(self, model_name): - model = self.model - tokenizer = self.tokenizer - if re.search("llama", model.config.architectures[0], re.IGNORECASE): - # unwind broken decapoda-research config - model.generation_config.pad_token_id = 0 - model.generation_config.bos_token_id = 1 - model.generation_config.eos_token_id = 2 - - if ( - hasattr(model.generation_config, "pad_token_id") - and model.generation_config.pad_token_id is not None - and "chatglm" not in model_name - ): - tokenizer.pad_token_id = model.generation_config.pad_token_id - if ( - hasattr(model.generation_config, "eos_token_id") - and model.generation_config.eos_token_id is not None - and "chatglm" not in model_name - ): - tokenizer.eos_token_id = model.generation_config.eos_token_id - if hasattr(model.generation_config, "bos_token_id") and model.generation_config.bos_token_id is not None: - tokenizer.bos_token_id = model.generation_config.bos_token_id - - if tokenizer.pad_token_id is None: - model.generation_config.pad_token_id = tokenizer.pad_token_id = tokenizer.eos_token_id - - if model.generation_config.eos_token_id is None: - model.generation_config.eos_token_id = tokenizer.eos_token_id - - if not model.config.is_encoder_decoder: - tokenizer.padding_side = "left" - - if tokenizer.pad_token is None and tokenizer.pad_token_id is None: - tokenizer.pad_token = tokenizer.eos_token - model.generation_config.pad_token_id = model.generation_config.eos_token_id - - def generate(self, prompts: Union[str, List[str]], **config) -> Union[GenerateResult, List[GenerateResult], None]: - pass - - async def generate_async(self, prompts: Union[str, List[str]], **config) -> Union[str, List[str]]: - pass - - # output is streamed into streamer - def streaming_generate(self, prompt: str, streamer, **config) -> None: - pass - - def get_streamer(self): - pass - - async def stream_results(self, results_generator) -> AsyncGenerator[str, None]: - pass - - -class 
HPUPredictor(Predictor): - def __init__(self, infer_conf: dict): - super().__init__(infer_conf) - - model_id_or_path = infer_conf["model_id_or_path"] - use_auth_token = infer_conf["use_auth_token"] - trust_remote_code = infer_conf["trust_remote_code"] - self.cpus_per_worker = infer_conf["num_cpus_per_worker"] - self.hpus_per_worker = infer_conf["num_hpus_per_worker"] - # decide correct torch type for loading HF model - self.use_lazy_mode = True - self.use_hpu_graphs = False - # TODO add torch_compile, i.e. hpu specific configs. including quant - # if args.torch_compile and model.config.model_type == "llama": - # self.use_lazy_mode = False - - from optimum.habana.transformers.modeling_utils import adapt_transformers_to_gaudi - - # Tweak transformer to optimize performance on Gaudi - adapt_transformers_to_gaudi() - # Not using DeepSpeed, load model locally - self.device = torch.device("hpu") - model = AutoModelForCausalLM.from_pretrained( - model_id_or_path, use_auth_token=use_auth_token, trust_remote_code=trust_remote_code - ) - self.model = model.eval().to(self.device) - if self.use_hpu_graphs: - from habana_frameworks.torch.hpu import wrap_in_hpu_graph # pylint: disable=E0401 - - self.model = wrap_in_hpu_graph(self.model) - else: - print("Warning: use_hpu_graphs is set to False. This will hurt the performance.") - self.tokenizer = load_tokenizer(model, model_id_or_path) - - # Use dummy streamer to ignore other workers' outputs - def _create_dummy_streamer(self): - class DummyStreamer: - def put(self, value): - pass - - def end(self): - pass - - return DummyStreamer() - - def _process_config(self, config): - config["lazy_mode"] = self.use_lazy_mode - config["hpu_graphs"] = self.use_hpu_graphs - # max_new_tokens is required for hpu - if "max_new_tokens" not in config: - config["max_new_tokens"] = 128 - - def get_streamer(self): - return TextIteratorStreamer(self.tokenizer, skip_prompt=True, timeout=0, skip_special_tokens=True) - - def generate(self, prompt, **config): - self._process_config(config) - - input_ids, input_length = self.tokenize_inputs(prompt) - gen_tokens = self.model.generate(input_ids, **config) - decode_result = self.tokenizer.batch_decode(gen_tokens, skip_special_tokens=True) - if isinstance(decode_result, list) and len(decode_result) > 1: - return decode_result - elif isinstance(decode_result, list) and len(decode_result) == 1: - decode_result = decode_result[0] - return GenerateResult( - text=decode_result, - input_length=input_length, - generate_length=gen_tokens.size()[1] - input_length, - ) - - def streaming_generate(self, prompt, streamer, **config): - self._process_config(config) - input_ids, _ = self.tokenize_inputs(prompt) - self.model.generate( - input_ids, - streamer=streamer, - **config, - ) - - -chat_processor = { - "ChatModelLlama": ChatModelLLama, - "ChatModelGptJ": ChatModelGptJ, - "ChatModelGemma": ChatModelGemma, - "ChatModelNoFormat": ChatModelNoFormat, -} - - -# 1: Define a Ray Serve deployment. 
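Before the Ray Serve deployment that follows, the chat_processor mapping above selects one of the prompt formatters defined earlier in serve.py; the launch scripts default to ChatModelLlama, which wraps each user turn in Llama-2 instruction tags. A self-contained sketch of that formatting behaviour, with an illustrative message list (the real class also carries intro and bot_id state that is empty by default):

# Standalone sketch mirroring ChatModelLLama.prepare_prompt from the deleted serve.py:
# user turns are wrapped in "[INST] ... [/INST]"; assistant and tool turns are appended verbatim.
def prepare_llama_prompt(messages, human_id="[INST] {msg} [/INST]"):
    prompt = ""
    for msg in messages:
        role, content = msg["role"], msg["content"]
        if role == "user":
            prompt += human_id.format(msg=content)
        elif role in ("assistant", "tool"):
            prompt += f"{content}\n"
        elif role == "system":
            prompt += f"### system:\n{content}\n"
        else:
            prompt += f"### Unknown:\n{content}\n"
    return prompt

print(prepare_llama_prompt([
    {"role": "system", "content": "You are a concise assistant."},
    {"role": "user", "content": "What is deep learning?"},
]))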
-@serve.deployment -class LLMServe: - _DEFAULT_MAX_BATCH_SIZE = 8 - _DEFAULT_MAX_NUM_SEQS = 256 - - def __init__( - self, infer_conf: dict, max_batch_size=_DEFAULT_MAX_BATCH_SIZE, max_num_seqs=_DEFAULT_MAX_NUM_SEQS - ) -> None: - # All the initialization code goes here - self.predictor = HPUPredictor(infer_conf) - self.loop = asyncio.get_running_loop() - self.process_tool = chat_processor[infer_conf["chat_processor"]]() - self.use_openai = False - - def consume_streamer(self, streamer): - for text in streamer: - yield text - - async def consume_streamer_async(self, streamer: TextIteratorStreamer): - while True: - try: - for token in streamer: - yield token - break - except Empty: - await asyncio.sleep(0.001) - - async def handle_streaming(self, prompt: Union[str, List[str]], config: Dict[str, Any]): - if isinstance(prompt, List): - error_message = "Streaming response is not supported when multiple prompts are provided." - if not self.use_openai: - yield JSONResponse( - status_code=400, - content=error_message, - ) - else: - yield ModelResponse( - error=ErrorResponse( - message=error_message, - code=400, - internal_message=error_message, - type="InternalServerError", - ) - ) - streamer = self.predictor.get_streamer() - self.loop.run_in_executor( - None, functools.partial(self.predictor.streaming_generate, prompt, streamer, **config) - ) - - if not self.use_openai: - yield StreamingResponse(self.consume_streamer_async(streamer), status_code=200, media_type="text/plain") - else: - async for output in self.consume_streamer_async(streamer): - processed_output = output - tool_call_list = None - if self.tools_capture_texts is not None: - (processed_output, tool_call_list) = self.tools_capture_texts( - output, self.openai_tools_prompter, prompt - ) - model_reponse = ModelResponse( - generated_text=processed_output, - tool_calls=tool_call_list, - num_input_tokens=self.predictor.input_length, - num_generate_tokens=1, - preprocessing_time=0, - ) - yield model_reponse - - async def handle_non_streaming(self, prompts, config) -> Union[JSONResponse, str]: - if isinstance(prompts, list): - return await self.handle_static_batch(prompts, **config) - return await self.handle_dynamic_batch((prompts, config)) - - @serve.batch(max_batch_size=_DEFAULT_MAX_BATCH_SIZE) - async def handle_dynamic_batch(self, requests): - batched_prompts: Dict[str, Tuple[Union[str, List[str]]]] = {} - for i, request in enumerate(requests): - prompt = request[0] - config = request[1] - key = str(dict(sorted(config.items()))) - batched_prompts.setdefault(key, ([], [])) - batched_prompts[key][0].append(prompt) - batched_prompts[key][1].append(i) - - results = [None] * len(requests) - for key, (prompts, indices) in batched_prompts.items(): - config = dict(eval(key)) - batched_results = self.predictor.generate(prompts, **config) - for index, result in zip(indices, batched_results): - results[index] = result - if not self.use_openai: - return results - else: - responses = [] - tool_call_list = None - for result in results: - if self.tools_capture_texts is not None: - result.text, tool_call_list = self.tools_capture_texts.process_full_output( - result.text, self.openai_tools_prompter, prompt - ) - responses.append( - ModelResponse( - generated_text=result[-1], - tool_calls=tool_call_list, - num_input_tokens=self.predictor.input_length, - num_generated_tokens=len(result[-1]), - preprocessing_time=0, - ) - ) - return responses - - async def handle_static_batch(self, prompts: List[str], **config: Dict[str, any]): - results = 
self.predictor.generate(prompts, **config) - if not self.use_openai: - return results - else: - return ModelResponse( - generated_text=results[0].text, - num_input_tokens=results[0].input_length, - num_input_tokens_batch=results[0].input_length, - num_generated_tokens=results[0].generate_length, - preprocessing_time=0, - ) - - def preprocess_prompts(self, input: Union[str, List], tools=None, tool_choice=None): - if isinstance(input, str): - return input - elif isinstance(input, List): - prompts = [] - images = [] - - prompt_format = get_prompt_format(input) - if prompt_format == PromptFormat.CHAT_FORMAT: - # Process the input prompts with tools - self.tool_call_list = None - self.openai_tools_prompter: OpenAIToolsPrompter = OpenAIToolsPrompter() if tools is not None else None - self.tools_capture_texts: ChatPromptCapture = None - if self.openai_tools_prompter is not None: - input = self.openai_tools_prompter.inject_prompt(input, tools, tool_choice) - self.tools_capture_texts = ChatPromptCapture() - for m in input: - if m.tool_calls is not None: # type: ignore - m.content = self.openai_tools_prompter.content_from_assistant(m) # type: ignore - elif m.tool_call_id is not None: # type: ignore - m.content = self.openai_tools_prompter.content_from_tool(m) # type: ignore - # Process the input prompts with MLLM tool - if self.process_tool is not None: - prompt = self.process_tool.get_prompt(input) - return prompt - else: - prompts.extend(input) - elif prompt_format == PromptFormat.PROMPTS_FORMAT: - prompts.extend(input) - else: - raise HTTPException(400, "Invalid prompt format.") - return prompts - else: - raise HTTPException(400, "Invalid prompt format.") - - async def openai_call(self, input: str, config: Dict, streaming_response=True, tools=None, tool_choice=None): - self.use_openai = True - prompts = self.preprocess_prompts(input, tools, tool_choice) - - if streaming_response: - async for result in self.handle_streaming(prompts, config): - yield result - else: - yield await self.handle_non_streaming(prompts, config) diff --git a/comps/prompt_registry/mongo/docker/Dockerfile b/comps/prompt_registry/mongo/docker/Dockerfile index 3438c86fb..db2e9c59d 100644 --- a/comps/prompt_registry/mongo/docker/Dockerfile +++ b/comps/prompt_registry/mongo/docker/Dockerfile @@ -3,7 +3,7 @@ FROM python:3.11-slim -ENV LANG C.UTF-8 +ENV LANG=C.UTF-8 RUN apt-get update -y && apt-get install -y --no-install-recommends --fix-missing \ build-essential \ diff --git a/tests/test_agent_langchain.sh b/tests/test_agent_langchain.sh index db19f6c0f..ad9aae145 100644 --- a/tests/test_agent_langchain.sh +++ b/tests/test_agent_langchain.sh @@ -5,6 +5,7 @@ #set -xe WORKPATH=$(dirname "$PWD") +LOG_PATH="$WORKPATH/tests" ip_address=$(hostname -I | awk '{print $1}') function build_docker_images() {
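The LLMServe deployment deleted above streams tokens by launching the blocking HuggingFace generate() call in an executor and draining a TextIteratorStreamer from the event loop (handle_streaming together with consume_streamer_async). A self-contained CPU sketch of that pattern; the model name and prompt are illustrative, whereas the deleted code loaded LLM_MODEL on Gaudi HPUs:

# Sketch of the streaming pattern used by the deleted LLMServe.handle_streaming:
# generation runs off the event loop while the coroutine drains the streamer queue.
import asyncio
import functools
from queue import Empty

from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer

MODEL_ID = "gpt2"  # illustrative small model for a local test

tokenizer = AutoTokenizer.from_pretrained(MODEL_ID)
model = AutoModelForCausalLM.from_pretrained(MODEL_ID)


async def stream_generate(prompt: str):
    streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, timeout=0, skip_special_tokens=True)
    input_ids = tokenizer(prompt, return_tensors="pt").input_ids
    loop = asyncio.get_running_loop()
    # run the blocking generate() in the default executor, as the deleted deployment did
    loop.run_in_executor(
        None, functools.partial(model.generate, input_ids, streamer=streamer, max_new_tokens=32)
    )
    while True:
        try:
            for token in streamer:  # with timeout=0 the streamer raises Empty when no token is ready
                yield token
            break
        except Empty:
            await asyncio.sleep(0.001)


async def main():
    async for chunk in stream_generate("What is deep learning?"):
        print(chunk, end="", flush=True)


asyncio.run(main())

With timeout=0 the waiting happens in the asyncio sleep rather than inside the queue, which is what kept the Serve replica responsive while tokens were being produced.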