diff --git a/comps/cores/common/component.py b/comps/cores/common/component.py index 32f098bd1c..d90403f267 100644 --- a/comps/cores/common/component.py +++ b/comps/cores/common/component.py @@ -2,10 +2,12 @@ # SPDX-License-Identifier: Apache-2.0 from abc import ABC, abstractmethod + from ..mega.logger import CustomLogger logger = CustomLogger("OpeaComponent") + class OpeaComponent(ABC): """The OpeaComponent class serves as the base class for all components in the GenAIComps. It provides a unified interface and foundational attributes that every derived component inherits and extends. diff --git a/comps/dataprep/src/integrations/config.py b/comps/dataprep/src/integrations/config.py index 24c9ab0040..4bcb5dce5a 100644 --- a/comps/dataprep/src/integrations/config.py +++ b/comps/dataprep/src/integrations/config.py @@ -32,6 +32,7 @@ def get_boolean_env_var(var_name, default_value=False): else: return default_value + # Embedding model EMBED_MODEL = os.getenv("EMBED_MODEL", "BAAI/bge-base-en-v1.5") # TEI Embedding endpoints @@ -44,7 +45,6 @@ def get_boolean_env_var(var_name, default_value=False): SEARCH_BATCH_SIZE = int(os.getenv("SEARCH_BATCH_SIZE", 10)) - ####################################################### # Redis # ####################################################### @@ -52,6 +52,7 @@ def get_boolean_env_var(var_name, default_value=False): REDIS_HOST = os.getenv("REDIS_HOST", "localhost") REDIS_PORT = int(os.getenv("REDIS_PORT", 6379)) + def format_redis_conn_from_env(): redis_url = os.getenv("REDIS_URL", None) if redis_url: @@ -68,8 +69,8 @@ def format_redis_conn_from_env(): return start + f"{REDIS_HOST}:{REDIS_PORT}" -REDIS_URL = format_redis_conn_from_env() +REDIS_URL = format_redis_conn_from_env() ####################################################### diff --git a/comps/dataprep/src/integrations/milvus_dataprep.py b/comps/dataprep/src/integrations/milvus_dataprep.py index 511ba5d6a2..5cfb994903 100644 --- a/comps/dataprep/src/integrations/milvus_dataprep.py +++ b/comps/dataprep/src/integrations/milvus_dataprep.py @@ -2,26 +2,19 @@ # SPDX-License-Identifier: Apache-2.0 -import os import json +import os from pathlib import Path from typing import List, Optional, Union -from fastapi import Body, File, Form, UploadFile, HTTPException -from .config import ( - COLLECTION_NAME, - LOCAL_EMBEDDING_MODEL, - MILVUS_URI, - INDEX_PARAMS, - MOSEC_EMBEDDING_ENDPOINT, - MOSEC_EMBEDDING_MODEL, - TEI_EMBEDDING_ENDPOINT, -) + +from fastapi import Body, File, Form, HTTPException, UploadFile from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain_community.embeddings import HuggingFaceBgeEmbeddings, HuggingFaceHubEmbeddings, OpenAIEmbeddings from langchain_core.documents import Document from langchain_milvus.vectorstores import Milvus from langchain_text_splitters import HTMLHeaderTextSplitter -from comps import OpeaComponent, CustomLogger, DocPath, ServiceType + +from comps import CustomLogger, DocPath, OpeaComponent, ServiceType from comps.dataprep.src.utils import ( create_upload_folder, document_loader, @@ -34,6 +27,15 @@ save_content_to_local_disk, ) +from .config import ( + COLLECTION_NAME, + INDEX_PARAMS, + LOCAL_EMBEDDING_MODEL, + MILVUS_URI, + MOSEC_EMBEDDING_ENDPOINT, + MOSEC_EMBEDDING_MODEL, + TEI_EMBEDDING_ENDPOINT, +) logger = CustomLogger("milvus_dataprep") logflag = os.getenv("LOGFLAG", False) @@ -187,8 +189,8 @@ def delete_by_partition_field(my_milvus, partition_field): class OpeaMilvusDataprep(OpeaComponent): - """ - A specialized dataprep component derived from OpeaComponent for milvus dataprep services. + """A specialized dataprep component derived from OpeaComponent for milvus dataprep services. + Attributes: client (Milvus): An instance of the milvus client for vector database operations. """ @@ -199,7 +201,7 @@ def __init__(self, name: str, description: str, config: dict = None): def _initialize_embedder(self): if logflag: - logger.info(f"[ initialize embedder ] initializing milvus embedder...") + logger.info("[ initialize embedder ] initializing milvus embedder...") # Define embeddings according to server type (TEI, MOSEC, or local) if MOSEC_EMBEDDING_ENDPOINT: # create embeddings using MOSEC endpoint service @@ -219,15 +221,15 @@ def _initialize_embedder(self): logger.info(f"[ milvus embedding ] LOCAL_EMBEDDING_MODEL:{LOCAL_EMBEDDING_MODEL}") embeddings = HuggingFaceBgeEmbeddings(model_name=LOCAL_EMBEDDING_MODEL) return embeddings - + def check_health(self) -> bool: - """ - Checks the health of the dataprep service. + """Checks the health of the dataprep service. + Returns: bool: True if the service is reachable and healthy, False otherwise. """ if logflag: - logger.info(f"[ health check ] start to check health of milvus") + logger.info("[ health check ] start to check health of milvus") try: client = Milvus( embedding_function=self.embedder, @@ -255,8 +257,8 @@ async def ingest_files( process_table: bool = Form(False), table_strategy: str = Form("fast"), ): - """ - Ingest files/links content into milvus database. + """Ingest files/links content into milvus database. + Save in the format of vector[], the vector length depends on the emedding model type. Returns '{"status": 200, "message": "Data preparation succeeded"}' if successful. Args: @@ -315,7 +317,7 @@ async def ingest_files( process_table=process_table, table_strategy=table_strategy, ), - self.embedder + self.embedder, ) uploaded_files.append(save_path) if logflag: @@ -340,7 +342,9 @@ async def ingest_files( try: search_res = search_by_file(my_milvus.col, encoded_link + ".txt") except Exception as e: - raise HTTPException(status_code=500, detail=f"Failed when searching in Milvus db for link {link}.") + raise HTTPException( + status_code=500, detail=f"Failed when searching in Milvus db for link {link}." + ) if len(search_res) > 0: if logflag: logger.info(f"[ milvus ingest ] Link {link} already exists.") @@ -359,7 +363,7 @@ async def ingest_files( process_table=process_table, table_strategy=table_strategy, ), - self.embedder + self.embedder, ) if logflag: logger.info(f"[ milvus ingest] Successfully saved link list {link_list}") @@ -368,15 +372,13 @@ async def ingest_files( raise HTTPException(status_code=400, detail="Must provide either a file or a string list.") async def get_files(self): - """ - Get file structure from milvus database in the format of - { - "name": "File Name", - "id": "File Name", - "type": "File", - "parent": "", - } - """ + """Get file structure from milvus database in the format of + { + "name": "File Name", + "id": "File Name", + "type": "File", + "parent": "", + }""" if logflag: logger.info("[ milvus get ] start to get file structure") diff --git a/comps/dataprep/src/integrations/redis_dataprep.py b/comps/dataprep/src/integrations/redis_dataprep.py index 180d28387f..9db2e7d9fc 100644 --- a/comps/dataprep/src/integrations/redis_dataprep.py +++ b/comps/dataprep/src/integrations/redis_dataprep.py @@ -2,16 +2,13 @@ # SPDX-License-Identifier: Apache-2.0 -import os import json -import redis +import os from pathlib import Path from typing import List, Optional, Union -from fastapi import Body, File, Form, UploadFile, HTTPException -from .config import ( - EMBED_MODEL, TEI_EMBEDDING_ENDPOINT, INDEX_NAME, - KEY_INDEX_NAME, REDIS_URL, SEARCH_BATCH_SIZE -) + +import redis +from fastapi import Body, File, Form, HTTPException, UploadFile from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain_community.embeddings import HuggingFaceBgeEmbeddings from langchain_community.vectorstores import Redis @@ -19,7 +16,8 @@ from langchain_text_splitters import HTMLHeaderTextSplitter from redis.commands.search.field import TextField from redis.commands.search.indexDefinition import IndexDefinition, IndexType -from comps import OpeaComponent, CustomLogger, DocPath, ServiceType + +from comps import CustomLogger, DocPath, OpeaComponent, ServiceType from comps.dataprep.src.utils import ( create_upload_folder, document_loader, @@ -32,6 +30,7 @@ save_content_to_local_disk, ) +from .config import EMBED_MODEL, INDEX_NAME, KEY_INDEX_NAME, REDIS_URL, SEARCH_BATCH_SIZE, TEI_EMBEDDING_ENDPOINT logger = CustomLogger("redis_dataprep") logflag = os.getenv("LOGFLAG", False) @@ -216,8 +215,8 @@ def ingest_data_to_redis(doc_path: DocPath): class OpeaRedisDataprep(OpeaComponent): - """ - A specialized dataprep component derived from OpeaComponent for redis dataprep services. + """A specialized dataprep component derived from OpeaComponent for redis dataprep services. + Attributes: client (redis.Redis): An instance of the redis client for vector database operations. """ @@ -230,24 +229,24 @@ def __init__(self, name: str, description: str, config: dict = None): def _initialize_client(self) -> redis.Redis: if logflag: - logger.info(f"[ initialize client ] initializing redis client...") + logger.info("[ initialize client ] initializing redis client...") """Initializes the redis client.""" - try: + try: client = redis.Redis(connection_pool=redis_pool) return client except Exception as e: logger.error(f"fail to initialize redis client: {e}") return None - + def check_health(self) -> bool: - """ - Checks the health of the dataprep service. + """Checks the health of the dataprep service. + Returns: bool: True if the service is reachable and healthy, False otherwise. """ if logflag: - logger.info(f"[ health check ] start to check health of redis") + logger.info("[ health check ] start to check health of redis") try: if self.client.ping(): logger.info("[ health check ] Successfully connected to Redis!") @@ -268,8 +267,8 @@ async def ingest_files( process_table: bool = Form(False), table_strategy: str = Form("fast"), ): - """ - Ingest files/links content into redis database. + """Ingest files/links content into redis database. + Save in the format of vector[768]. Returns '{"status": 200, "message": "Data preparation succeeded"}' if successful. Args: @@ -305,7 +304,8 @@ async def ingest_files( logger.info(f"[ redis ingest] File {file.filename} does not exist.") if key_ids: raise HTTPException( - status_code=400, detail=f"Uploaded file {file.filename} already exists. Please change file name." + status_code=400, + detail=f"Uploaded file {file.filename} already exists. Please change file name.", ) save_path = upload_folder + encode_file @@ -370,15 +370,13 @@ async def ingest_files( raise HTTPException(status_code=400, detail="Must provide either a file or a string list.") async def get_files(self): - """ - Get file structure from redis database in the format of - { - "name": "File Name", - "id": "File Name", - "type": "File", - "parent": "", - } - """ + """Get file structure from redis database in the format of + { + "name": "File Name", + "id": "File Name", + "type": "File", + "parent": "", + }""" if logflag: logger.info("[ redis get ] start to get file structure") @@ -394,7 +392,9 @@ async def get_files(self): return file_list while True: - response = self.client.execute_command("FT.SEARCH", KEY_INDEX_NAME, "*", "LIMIT", offset, offset + SEARCH_BATCH_SIZE) + response = self.client.execute_command( + "FT.SEARCH", KEY_INDEX_NAME, "*", "LIMIT", offset, offset + SEARCH_BATCH_SIZE + ) # no doc retrieved if len(response) < 2: break @@ -474,7 +474,9 @@ async def delete_files(self, file_path: str = Body(..., embed=True)): except Exception as e: if logflag: logger.info(f"[ redis delete ] {e}, File {file_path} does not exists.") - raise HTTPException(status_code=404, detail=f"File not found in db {KEY_INDEX_NAME}. Please check file_path.") + raise HTTPException( + status_code=404, detail=f"File not found in db {KEY_INDEX_NAME}. Please check file_path." + ) file_ids = key_ids.split("#") # delete file keys id in db KEY_INDEX_NAME @@ -493,7 +495,9 @@ async def delete_files(self, file_path: str = Body(..., embed=True)): except Exception as e: if logflag: logger.info(f"[ redis delete ] {e}. File {file_path} does not exists.") - raise HTTPException(status_code=404, detail=f"File not found in db {INDEX_NAME}. Please check file_path.") + raise HTTPException( + status_code=404, detail=f"File not found in db {INDEX_NAME}. Please check file_path." + ) # delete file content try: diff --git a/comps/dataprep/src/opea_dataprep_controller.py b/comps/dataprep/src/opea_dataprep_controller.py index 39d9fa03b9..f879776b5e 100644 --- a/comps/dataprep/src/opea_dataprep_controller.py +++ b/comps/dataprep/src/opea_dataprep_controller.py @@ -3,8 +3,8 @@ import os -from comps import CustomLogger, OpeaComponentController +from comps import CustomLogger, OpeaComponentController logger = CustomLogger("opea_dataprep_controller") logflag = os.getenv("LOGFLAG", False) @@ -19,16 +19,15 @@ def invoke(self, *args, **kwargs): async def ingest_files(self, *args, **kwargs): if logflag: - logger.info(f"[ dataprep controller ] ingest files") + logger.info("[ dataprep controller ] ingest files") return await self.active_component.ingest_files(*args, **kwargs) - + async def get_files(self, *args, **kwargs): if logflag: - logger.info(f"[ dataprep controller ] get files") + logger.info("[ dataprep controller ] get files") return await self.active_component.get_files(*args, **kwargs) - + async def delete_files(self, *args, **kwargs): if logflag: - logger.info(f"[ dataprep controller ] delete files") + logger.info("[ dataprep controller ] delete files") return await self.active_component.delete_files(*args, **kwargs) - diff --git a/comps/dataprep/src/opea_dataprep_microservice.py b/comps/dataprep/src/opea_dataprep_microservice.py index ac2b83ab45..92753acb83 100644 --- a/comps/dataprep/src/opea_dataprep_microservice.py +++ b/comps/dataprep/src/opea_dataprep_microservice.py @@ -5,10 +5,12 @@ import os import time from typing import List, Optional, Union + from fastapi import Body, File, Form, UploadFile -from integrations.redis_dataprep import OpeaRedisDataprep from integrations.milvus_dataprep import OpeaMilvusDataprep +from integrations.redis_dataprep import OpeaRedisDataprep from opea_dataprep_controller import OpeaDataprepController + from comps import ( CustomLogger, ServiceType, @@ -19,7 +21,6 @@ ) from comps.dataprep.src.utils import create_upload_folder - logger = CustomLogger("opea_dataprep_microservice") logflag = os.getenv("LOGFLAG", False) upload_folder = "./uploaded_files/" @@ -98,7 +99,7 @@ async def get_files(): start = time.time() if logflag: - logger.info(f"[ get ] start to get ingested files") + logger.info("[ get ] start to get ingested files") try: # Use the controller to invoke the active component @@ -126,7 +127,7 @@ async def delete_files(file_path: str = Body(..., embed=True)): start = time.time() if logflag: - logger.info(f"[ delete ] start to delete ingested files") + logger.info("[ delete ] start to delete ingested files") try: # Use the controller to invoke the active component diff --git a/comps/embeddings/src/integrations/dependency/bridgetower/README.md b/comps/embeddings/src/integrations/dependency/bridgetower/README.md index 3d8d4819c9..f7cdeb5787 100644 --- a/comps/embeddings/src/integrations/dependency/bridgetower/README.md +++ b/comps/embeddings/src/integrations/dependency/bridgetower/README.md @@ -29,7 +29,6 @@ docker compose -f compose_intel_hpu.yaml up -d ## 🚀2. Start MMEI on Xeon CPU - - Xeon CPU ```bash diff --git a/comps/embeddings/src/integrations/dependency/clip/README.md b/comps/embeddings/src/integrations/dependency/clip/README.md index 79822c8e66..af880af3a8 100644 --- a/comps/embeddings/src/integrations/dependency/clip/README.md +++ b/comps/embeddings/src/integrations/dependency/clip/README.md @@ -75,4 +75,3 @@ curl http://localhost:6990/v1/embeddings\ -d '{"input":["Hello, world!","How are you?"], "dimensions":100}' \ -H 'Content-Type: application/json' ``` - diff --git a/comps/embeddings/src/integrations/dependency/clip/__init__.py b/comps/embeddings/src/integrations/dependency/clip/__init__.py index 41ff853e9e..916f3a44b2 100644 --- a/comps/embeddings/src/integrations/dependency/clip/__init__.py +++ b/comps/embeddings/src/integrations/dependency/clip/__init__.py @@ -1,4 +1,2 @@ # Copyright (C) 2024 Intel Corporation # SPDX-License-Identifier: Apache-2.0 - - diff --git a/comps/embeddings/src/integrations/dependency/clip/clip_embedding.py b/comps/embeddings/src/integrations/dependency/clip/clip_embedding.py index ba9251bc38..f010245dd2 100644 --- a/comps/embeddings/src/integrations/dependency/clip/clip_embedding.py +++ b/comps/embeddings/src/integrations/dependency/clip/clip_embedding.py @@ -49,4 +49,3 @@ def get_video_embeddings(self, frames_batch): video_embeddings = video_embeddings / video_embeddings.norm(dim=-1, keepdim=True) vid_embs.append(video_embeddings) return torch.cat(vid_embs, dim=0) - diff --git a/comps/embeddings/src/integrations/dependency/clip/clip_server.py b/comps/embeddings/src/integrations/dependency/clip/clip_server.py index 1cf29f51e4..d74605cdc3 100644 --- a/comps/embeddings/src/integrations/dependency/clip/clip_server.py +++ b/comps/embeddings/src/integrations/dependency/clip/clip_server.py @@ -6,8 +6,8 @@ import time from typing import List, Union -from dateparser.search import search_dates from clip_embedding import vCLIP +from dateparser.search import search_dates from comps import ( CustomLogger, @@ -122,4 +122,3 @@ async def get_embeddings(text: Union[str, List[str]]) -> List[List[float]]: if __name__ == "__main__": embeddings = vCLIP({"model_name": "openai/clip-vit-base-patch32", "num_frm": 4}) opea_microservices["opea_service@embedding_multimodal_clip"].start() - diff --git a/comps/embeddings/src/integrations/dependency/clip/compose_intel_cpu.yaml b/comps/embeddings/src/integrations/dependency/clip/compose_intel_cpu.yaml index b8a4b9b1f3..112804aa42 100644 --- a/comps/embeddings/src/integrations/dependency/clip/compose_intel_cpu.yaml +++ b/comps/embeddings/src/integrations/dependency/clip/compose_intel_cpu.yaml @@ -19,4 +19,3 @@ services: networks: default: driver: bridge - diff --git a/comps/embeddings/src/integrations/opea_mosec_embedding.py b/comps/embeddings/src/integrations/opea_mosec_embedding.py index 6d15acd164..e4539e3e6d 100644 --- a/comps/embeddings/src/integrations/opea_mosec_embedding.py +++ b/comps/embeddings/src/integrations/opea_mosec_embedding.py @@ -1,24 +1,22 @@ # Copyright (C) 2024 Intel Corporation # SPDX-License-Identifier: Apache-2.0 -import requests import os + +import requests from openai import AsyncClient -from comps import OpeaComponent, CustomLogger, ServiceType -from comps.cores.proto.api_protocol import ( - EmbeddingRequest, - EmbeddingResponse, -) +from comps import CustomLogger, OpeaComponent, ServiceType +from comps.cores.proto.api_protocol import EmbeddingRequest, EmbeddingResponse logger = CustomLogger("opea_mosec_embedding") logflag = os.getenv("LOGFLAG", False) DEFAULT_MODEL = "/home/user/bge-large-zh-v1.5/" + class OpeaMosecEmbedding(OpeaComponent): - """ - A specialized embedding component derived from OpeaComponent for TEI embedding services. + """A specialized embedding component derived from OpeaComponent for TEI embedding services. Attributes: client (AsyncInferenceClient): An instance of the async client for embedding generation. @@ -31,8 +29,7 @@ def __init__(self, name: str, description: str, config: dict = None): self.client = AsyncClient(api_key="fake", base_url=self.base_url) async def invoke(self, input: EmbeddingRequest) -> EmbeddingResponse: - """ - Invokes the embedding service to generate embeddings for the provided input. + """Invokes the embedding service to generate embeddings for the provided input. Args: input (EmbeddingRequest): The input in OpenAI embedding format, including text(s) and optional parameters like model. @@ -58,8 +55,7 @@ async def invoke(self, input: EmbeddingRequest) -> EmbeddingResponse: return embeddings def check_health(self) -> bool: - """ - Checks the health of the embedding service. + """Checks the health of the embedding service. Returns: bool: True if the service is reachable and healthy, False otherwise. diff --git a/comps/embeddings/src/integrations/opea_multimodal_embedding_bridgetower.py b/comps/embeddings/src/integrations/opea_multimodal_embedding_bridgetower.py index aea220426c..9d3e468642 100644 --- a/comps/embeddings/src/integrations/opea_multimodal_embedding_bridgetower.py +++ b/comps/embeddings/src/integrations/opea_multimodal_embedding_bridgetower.py @@ -4,26 +4,17 @@ import asyncio import base64 import os -import requests -from comps import OpeaComponent, CustomLogger, ServiceType +import requests -from comps import ( - CustomLogger, - EmbedMultimodalDoc, - MultimodalDoc, - ServiceType, - TextDoc, - TextImageDoc, -) +from comps import CustomLogger, EmbedMultimodalDoc, MultimodalDoc, OpeaComponent, ServiceType, TextDoc, TextImageDoc logger = CustomLogger("opea_multimodal_embedding_bridgetower") logflag = os.getenv("LOGFLAG", False) class OpeaMultimodalEmbeddingBrigeTower(OpeaComponent): - """ - A specialized embedding component derived from OpeaComponent for local deployed BrigeTower multimodal embedding services. + """A specialized embedding component derived from OpeaComponent for local deployed BrigeTower multimodal embedding services. Attributes: model_name (str): The name of the embedding model used. @@ -34,8 +25,7 @@ def __init__(self, name: str, description: str, config: dict = None): self.base_url = os.getenv("MMEI_EMBEDDING_ENDPOINT", "http://localhost:8080") async def invoke(self, input: MultimodalDoc) -> EmbedMultimodalDoc: - """ - Invokes the embedding service to generate embeddings for the provided input. + """Invokes the embedding service to generate embeddings for the provided input. Args: input (Union[str, List[str]]): The input text(s) for which embeddings are to be generated. @@ -58,7 +48,9 @@ async def invoke(self, input: MultimodalDoc) -> EmbedMultimodalDoc: "Please verify the input type and try again." ) - response = await asyncio.to_thread(requests.post, f"{self.base_url}/v1/encode", headers={"Content-Type": "application/json"}, json=json) + response = await asyncio.to_thread( + requests.post, f"{self.base_url}/v1/encode", headers={"Content-Type": "application/json"}, json=json + ) response_json = response.json() embed_vector = response_json["embedding"] if isinstance(input, TextDoc): diff --git a/comps/embeddings/src/integrations/opea_tei_embedding.py b/comps/embeddings/src/integrations/opea_tei_embedding.py index 3439caa9bb..1bfc708fea 100644 --- a/comps/embeddings/src/integrations/opea_tei_embedding.py +++ b/comps/embeddings/src/integrations/opea_tei_embedding.py @@ -1,18 +1,16 @@ # Copyright (C) 2024 Intel Corporation # SPDX-License-Identifier: Apache-2.0 -import requests -from typing import List, Union -import os import json +import os +from typing import List, Union + +import requests from huggingface_hub import AsyncInferenceClient +from comps import CustomLogger, OpeaComponent, ServiceType from comps.cores.mega.utils import get_access_token -from comps import OpeaComponent, CustomLogger, ServiceType -from comps.cores.proto.api_protocol import ( - EmbeddingRequest, - EmbeddingResponse, -) +from comps.cores.proto.api_protocol import EmbeddingRequest, EmbeddingResponse logger = CustomLogger("opea_tei_embedding") logflag = os.getenv("LOGFLAG", False) @@ -20,9 +18,9 @@ CLIENTID = os.getenv("CLIENTID") CLIENT_SECRET = os.getenv("CLIENT_SECRET") + class OpeaTEIEmbedding(OpeaComponent): - """ - A specialized embedding component derived from OpeaComponent for TEI embedding services. + """A specialized embedding component derived from OpeaComponent for TEI embedding services. Attributes: client (AsyncInferenceClient): An instance of the async client for embedding generation. @@ -36,7 +34,9 @@ def __init__(self, name: str, description: str, config: dict = None): def _initialize_client(self) -> AsyncInferenceClient: """Initializes the AsyncInferenceClient.""" - access_token = (get_access_token(TOKEN_URL, CLIENTID, CLIENT_SECRET) if TOKEN_URL and CLIENTID and CLIENT_SECRET else None) + access_token = ( + get_access_token(TOKEN_URL, CLIENTID, CLIENT_SECRET) if TOKEN_URL and CLIENTID and CLIENT_SECRET else None + ) headers = {"Authorization": f"Bearer {access_token}"} if access_token else {} return AsyncInferenceClient( model=f"{self.base_url}/v1/embeddings", @@ -45,8 +45,7 @@ def _initialize_client(self) -> AsyncInferenceClient: ) async def invoke(self, input: EmbeddingRequest) -> EmbeddingResponse: - """ - Invokes the embedding service to generate embeddings for the provided input. + """Invokes the embedding service to generate embeddings for the provided input. Args: input (EmbeddingRequest): The input in OpenAI embedding format, including text(s) and optional parameters like model. @@ -64,14 +63,15 @@ async def invoke(self, input: EmbeddingRequest) -> EmbeddingResponse: raise ValueError("Invalid input format: Only string or list of strings are supported.") else: raise TypeError("Unsupported input type: input must be a string or list of strings.") - response = await self.client.post(json={"input": texts, "encoding_format": input.encoding_format, "model": input.model, "user": input.user}, - task="text-embedding") + response = await self.client.post( + json={"input": texts, "encoding_format": input.encoding_format, "model": input.model, "user": input.user}, + task="text-embedding", + ) embeddings = json.loads(response.decode()) return EmbeddingResponse(**embeddings) def check_health(self) -> bool: - """ - Checks the health of the embedding service. + """Checks the health of the embedding service. Returns: bool: True if the service is reachable and healthy, False otherwise. diff --git a/comps/embeddings/src/integrations/predictionguard_embedding.py b/comps/embeddings/src/integrations/predictionguard_embedding.py index f2c4d77194..a1fa0991ab 100644 --- a/comps/embeddings/src/integrations/predictionguard_embedding.py +++ b/comps/embeddings/src/integrations/predictionguard_embedding.py @@ -5,16 +5,14 @@ import os from predictionguard import PredictionGuard -from comps import OpeaComponent, CustomLogger, ServiceType -from comps.cores.proto.api_protocol import ( - EmbeddingRequest, - EmbeddingResponse, - EmbeddingResponseData -) + +from comps import CustomLogger, OpeaComponent, ServiceType +from comps.cores.proto.api_protocol import EmbeddingRequest, EmbeddingResponse, EmbeddingResponseData logger = CustomLogger("predictionguard_embedding") logflag = os.getenv("LOGFLAG", False) + class PredictionguardEmbedding(OpeaComponent): """A specialized embedding component derived from OpeaComponent for interacting with Prediction Guard services. @@ -49,9 +47,9 @@ def check_health(self) -> bool: response = self.client.embeddings.create(model="bridgetower-large-itm-mlm-itc", input=[{"text": "hello"}]) # Check if the response is a valid dictionary and contains the expected 'model' key - if isinstance(response, dict) and 'model' in response: + if isinstance(response, dict) and "model" in response: # Check if the model matches the expected model name - if response['model'] == self.model_name: + if response["model"] == self.model_name: return True else: return False @@ -64,10 +62,8 @@ def check_health(self) -> bool: logger.error(f"Health check failed due to an exception: {e}") return False - async def invoke(self, input: EmbeddingRequest) -> EmbeddingResponse: - """ - Invokes the embedding service to generate embeddings for the provided input. + """Invokes the embedding service to generate embeddings for the provided input. Args: input (EmbeddingRequest): The input in OpenAI embedding format, including text(s) and optional parameters like model. diff --git a/comps/embeddings/src/opea_embedding_microservice.py b/comps/embeddings/src/opea_embedding_microservice.py index 05ab7156c7..d86461b296 100644 --- a/comps/embeddings/src/opea_embedding_microservice.py +++ b/comps/embeddings/src/opea_embedding_microservice.py @@ -4,6 +4,10 @@ import os import time +from integrations.opea_mosec_embedding import OpeaMosecEmbedding +from integrations.opea_tei_embedding import OpeaTEIEmbedding +from integrations.predictionguard_embedding import PredictionguardEmbedding + from comps import ( CustomLogger, OpeaComponentController, @@ -13,13 +17,7 @@ register_statistics, statistics_dict, ) -from comps.cores.proto.api_protocol import ( - EmbeddingRequest, - EmbeddingResponse, -) -from integrations.opea_tei_embedding import OpeaTEIEmbedding -from integrations.predictionguard_embedding import PredictionguardEmbedding -from integrations.opea_mosec_embedding import OpeaMosecEmbedding +from comps.cores.proto.api_protocol import EmbeddingRequest, EmbeddingResponse logger = CustomLogger("opea_embedding_microservice") logflag = os.getenv("LOGFLAG", False) diff --git a/comps/embeddings/src/opea_multimodal_embedding_microservice.py b/comps/embeddings/src/opea_multimodal_embedding_microservice.py index 3ca1ca6fd7..6f0ba8a78b 100644 --- a/comps/embeddings/src/opea_multimodal_embedding_microservice.py +++ b/comps/embeddings/src/opea_multimodal_embedding_microservice.py @@ -4,8 +4,12 @@ import os import time +from integrations.opea_multimodal_embedding_bridgetower import OpeaMultimodalEmbeddingBrigeTower + from comps import ( CustomLogger, + EmbedMultimodalDoc, + MultimodalDoc, OpeaComponentController, ServiceType, opea_microservices, @@ -13,13 +17,6 @@ register_statistics, statistics_dict, ) -from comps import ( - EmbedMultimodalDoc, - MultimodalDoc, -) - -from integrations.opea_multimodal_embedding_bridgetower import OpeaMultimodalEmbeddingBrigeTower - logger = CustomLogger("opea_multimodal_embedding_microservice") logflag = os.getenv("LOGFLAG", False) @@ -44,6 +41,7 @@ port = int(os.getenv("MM_EMBEDDING_PORT_MICROSERVICE", 6000)) + @register_microservice( name="opea_service@multimodal_embedding", service_type=ServiceType.EMBEDDING, diff --git a/comps/retrievers/src/integrations/__init__.py b/comps/retrievers/src/integrations/__init__.py index 4582b4f9aa..916f3a44b2 100644 --- a/comps/retrievers/src/integrations/__init__.py +++ b/comps/retrievers/src/integrations/__init__.py @@ -1,2 +1,2 @@ # Copyright (C) 2024 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 \ No newline at end of file +# SPDX-License-Identifier: Apache-2.0 diff --git a/comps/retrievers/src/integrations/config.py b/comps/retrievers/src/integrations/config.py index d84fbd6fb5..fa6f2ba3b1 100644 --- a/comps/retrievers/src/integrations/config.py +++ b/comps/retrievers/src/integrations/config.py @@ -38,6 +38,7 @@ def get_boolean_env_var(var_name, default_value=False): # Set DEBUG env var to "true" if you wish to enable LC debugging module if DEBUG: import langchain + langchain.debug = True # Embedding model @@ -50,7 +51,6 @@ def get_boolean_env_var(var_name, default_value=False): parent_dir = os.path.dirname(current_file_path) - ####################################################### # Redis # ####################################################### @@ -58,6 +58,7 @@ def get_boolean_env_var(var_name, default_value=False): REDIS_HOST = os.getenv("REDIS_HOST", "localhost") REDIS_PORT = int(os.getenv("REDIS_PORT", 6379)) + def format_redis_conn_from_env(): redis_url = os.getenv("REDIS_URL", None) if redis_url: @@ -74,8 +75,8 @@ def format_redis_conn_from_env(): return start + f"{REDIS_HOST}:{REDIS_PORT}" -REDIS_URL = format_redis_conn_from_env() +REDIS_URL = format_redis_conn_from_env() ####################################################### diff --git a/comps/retrievers/src/integrations/milvus_retrievers.py b/comps/retrievers/src/integrations/milvus_retrievers.py index 1a2f328898..d855eb4449 100644 --- a/comps/retrievers/src/integrations/milvus_retrievers.py +++ b/comps/retrievers/src/integrations/milvus_retrievers.py @@ -4,26 +4,21 @@ import os from typing import List, Optional + +from langchain_community.embeddings import HuggingFaceBgeEmbeddings, HuggingFaceHubEmbeddings, OpenAIEmbeddings +from langchain_milvus.vectorstores import Milvus + +from comps import CustomLogger, EmbedDoc, OpeaComponent, SearchedDoc, ServiceType, TextDoc + from .config import ( COLLECTION_NAME, + INDEX_PARAMS, LOCAL_EMBEDDING_MODEL, MILVUS_URI, - INDEX_PARAMS, MOSEC_EMBEDDING_ENDPOINT, MOSEC_EMBEDDING_MODEL, TEI_EMBEDDING_ENDPOINT, ) -from langchain_community.embeddings import HuggingFaceBgeEmbeddings, HuggingFaceHubEmbeddings, OpenAIEmbeddings -from langchain_milvus.vectorstores import Milvus -from comps import ( - OpeaComponent, - CustomLogger, - EmbedDoc, - SearchedDoc, - ServiceType, - TextDoc, -) - logger = CustomLogger("milvus_retrievers") logflag = os.getenv("LOGFLAG", False) @@ -55,8 +50,8 @@ def empty_embedding() -> List[float]: class OpeaMilvusRetriever(OpeaComponent): - """ - A specialized retriever component derived from OpeaComponent for milvus retriever services. + """A specialized retriever component derived from OpeaComponent for milvus retriever services. + Attributes: client (Milvus): An instance of the milvus client for vector database operations. """ @@ -99,15 +94,15 @@ def _initialize_client(self) -> Milvus: except Exception as e: logger.error(f"fail to initialize milvus client: {e}") return None - + def check_health(self) -> bool: - """ - Checks the health of the retriever service. + """Checks the health of the retriever service. + Returns: bool: True if the service is reachable and healthy, False otherwise. """ if logflag: - logger.info(f"[ check health ] start to check health of milvus") + logger.info("[ check health ] start to check health of milvus") try: _ = self.client.client.list_collections() logger.info("[ check health ] Successfully connected to Milvus!") @@ -117,8 +112,8 @@ def check_health(self) -> bool: return False async def invoke(self, input: EmbedDoc) -> SearchedDoc: - """ - Search the Milvus index for the most similar documents to the input query. + """Search the Milvus index for the most similar documents to the input query. + Args: input (EmbedDoc): The input query to search for. Output: diff --git a/comps/retrievers/src/integrations/redis_retrievers.py b/comps/retrievers/src/integrations/redis_retrievers.py index 5cf299e7a3..dd610f852b 100644 --- a/comps/retrievers/src/integrations/redis_retrievers.py +++ b/comps/retrievers/src/integrations/redis_retrievers.py @@ -4,32 +4,23 @@ import os from typing import Union -from .config import EMBED_MODEL, INDEX_NAME, REDIS_URL, TEI_EMBEDDING_ENDPOINT + from langchain_community.embeddings import HuggingFaceBgeEmbeddings from langchain_community.vectorstores import Redis from langchain_huggingface import HuggingFaceEndpointEmbeddings -from comps import ( - OpeaComponent, - CustomLogger, - EmbedDoc, - SearchedDoc, - ServiceType, -) -from comps.cores.proto.api_protocol import ( - ChatCompletionRequest, - EmbeddingResponse, - RetrievalRequest, - RetrievalResponse, -) +from comps import CustomLogger, EmbedDoc, OpeaComponent, SearchedDoc, ServiceType +from comps.cores.proto.api_protocol import ChatCompletionRequest, EmbeddingResponse, RetrievalRequest, RetrievalResponse + +from .config import EMBED_MODEL, INDEX_NAME, REDIS_URL, TEI_EMBEDDING_ENDPOINT logger = CustomLogger("redis_retrievers") logflag = os.getenv("LOGFLAG", False) class OpeaRedisRetriever(OpeaComponent): - """ - A specialized retriever component derived from OpeaComponent for redis retriever services. + """A specialized retriever component derived from OpeaComponent for redis retriever services. + Attributes: client (redis.Redis): An instance of the redis client for vector database operations. """ @@ -48,21 +39,21 @@ def __init__(self, name: str, description: str, config: dict = None): def _initialize_client(self) -> Redis: """Initializes the redis client.""" - try: + try: client = Redis(embedding=self.embeddings, index_name=INDEX_NAME, redis_url=REDIS_URL) return client except Exception as e: logger.error(f"fail to initialize redis client: {e}") return None - + def check_health(self) -> bool: - """ - Checks the health of the retriever service. + """Checks the health of the retriever service. + Returns: bool: True if the service is reachable and healthy, False otherwise. """ if logflag: - logger.info(f"[ health check ] start to check health of redis") + logger.info("[ health check ] start to check health of redis") try: if self.client.client.ping(): logger.info("[ health check ] Successfully connected to Redis!") @@ -71,11 +62,11 @@ def check_health(self) -> bool: logger.info(f"[ health check ] Failed to connect to Redis: {e}") return False - async def invoke(self, - input: Union[EmbedDoc, RetrievalRequest, ChatCompletionRequest] - ) -> Union[SearchedDoc, RetrievalResponse, ChatCompletionRequest]: - """ - Search the Redis index for the most similar documents to the input query. + async def invoke( + self, input: Union[EmbedDoc, RetrievalRequest, ChatCompletionRequest] + ) -> Union[SearchedDoc, RetrievalResponse, ChatCompletionRequest]: + """Search the Redis index for the most similar documents to the input query. + Args: input (Union[EmbedDoc, RetrievalRequest, ChatCompletionRequest]): The input query to search for. Output: @@ -105,7 +96,9 @@ async def invoke(self, search_res = await self.client.asimilarity_search_by_vector(embedding=embedding_data_input, k=input.k) elif input.search_type == "similarity_distance_threshold": if input.distance_threshold is None: - raise ValueError("distance_threshold must be provided for " + "similarity_distance_threshold retriever") + raise ValueError( + "distance_threshold must be provided for " + "similarity_distance_threshold retriever" + ) search_res = await self.client.asimilarity_search_by_vector( embedding=input.embedding, k=input.k, distance_threshold=input.distance_threshold ) diff --git a/comps/retrievers/src/opea_retrievers_microservice.py b/comps/retrievers/src/opea_retrievers_microservice.py index f7b43662ed..04004b2524 100644 --- a/comps/retrievers/src/opea_retrievers_microservice.py +++ b/comps/retrievers/src/opea_retrievers_microservice.py @@ -5,14 +5,16 @@ import os import time from typing import Union -from integrations.redis_retrievers import OpeaRedisRetriever + from integrations.milvus_retrievers import OpeaMilvusRetriever +from integrations.redis_retrievers import OpeaRedisRetriever + from comps import ( CustomLogger, - ServiceType, - OpeaComponentController, EmbedDoc, + OpeaComponentController, SearchedDoc, + ServiceType, TextDoc, opea_microservices, register_microservice, @@ -26,7 +28,6 @@ RetrievalResponseData, ) - logger = CustomLogger("opea_retrievers_microservice") logflag = os.getenv("LOGFLAG", False) # Initialize Controller @@ -64,7 +65,7 @@ @register_statistics(names=["opea_service@retrievers"]) async def ingest_files( input: Union[EmbedDoc, RetrievalRequest, ChatCompletionRequest] - ) -> Union[SearchedDoc, RetrievalResponse, ChatCompletionRequest]: +) -> Union[SearchedDoc, RetrievalResponse, ChatCompletionRequest]: start = time.time() if logflag: @@ -95,9 +96,9 @@ async def ingest_files( if logflag: logger.info(f"[ retrieval ] Output generated: {response}") - + return result - + except Exception as e: logger.error(f"[ retrieval ] Error during retrieval invocation: {e}") raise diff --git a/tests/dataprep/test_dataprep_milvus_langchain.sh b/tests/dataprep/test_dataprep_milvus_langchain.sh index a12791588b..de736fdd32 100644 --- a/tests/dataprep/test_dataprep_milvus_langchain.sh +++ b/tests/dataprep/test_dataprep_milvus_langchain.sh @@ -70,7 +70,7 @@ function validate_service() { # check response status if [ "$HTTP_STATUS" -ne "200" ]; then echo "[ $SERVICE_NAME ] HTTP status is not 200. Received status was $HTTP_STATUS" - + if [[ $SERVICE_NAME == *"dataprep_upload_link"* ]]; then docker logs test-comps-dataprep-milvus-mosec-server >> ${LOG_PATH}/mosec-embedding.log fi diff --git a/tests/dataprep/test_dataprep_redis_langchain.sh b/tests/dataprep/test_dataprep_redis_langchain.sh index f6eaf9125d..bd8685c3c6 100644 --- a/tests/dataprep/test_dataprep_redis_langchain.sh +++ b/tests/dataprep/test_dataprep_redis_langchain.sh @@ -23,7 +23,7 @@ function build_docker_images() { function start_service() { REDIS_PORT=6380 docker run -d --name="test-comps-dataprep-redis-langchain" -e http_proxy=$http_proxy -e https_proxy=$https_proxy -p $REDIS_PORT:6379 -p 8002:8001 --ipc=host redis/redis-stack:7.2.0-v9 - + embed_port=5439 embed_model="BAAI/bge-base-en-v1.5" docker run -d -p $embed_port:80 -v ./data:/data --name test-comps-dataprep-redis-langchain-tei-server -e http_proxy=$http_proxy -e https_proxy=$https_proxy --pull always ghcr.io/huggingface/text-embeddings-inference:cpu-1.5 --model-id $embed_model