Skip to content

Commit

Permalink
Merge branch 'refactor_comps' of https://github.com/opea-project/GenAIComps into refactor_comps
Browse files Browse the repository at this point in the history

Signed-off-by: lvliang-intel <[email protected]>
  • Loading branch information
lvliang-intel committed Dec 19, 2024
2 parents 7d7fb58 + ee3b687 commit 910f77d
Show file tree
Hide file tree
Showing 25 changed files with 185 additions and 213 deletions.
2 changes: 2 additions & 0 deletions comps/cores/common/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
# SPDX-License-Identifier: Apache-2.0

from abc import ABC, abstractmethod

from ..mega.logger import CustomLogger

logger = CustomLogger("OpeaComponent")


class OpeaComponent(ABC):
"""The OpeaComponent class serves as the base class for all components in the GenAIComps.
It provides a unified interface and foundational attributes that every derived component inherits and extends.
Expand Down
5 changes: 3 additions & 2 deletions comps/dataprep/src/integrations/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def get_boolean_env_var(var_name, default_value=False):
else:
return default_value


# Embedding model
EMBED_MODEL = os.getenv("EMBED_MODEL", "BAAI/bge-base-en-v1.5")
# TEI Embedding endpoints
Expand All @@ -44,14 +45,14 @@ def get_boolean_env_var(var_name, default_value=False):
SEARCH_BATCH_SIZE = int(os.getenv("SEARCH_BATCH_SIZE", 10))



#######################################################
# Redis #
#######################################################
# Redis Connection Information
REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))


def format_redis_conn_from_env():
redis_url = os.getenv("REDIS_URL", None)
if redis_url:
Expand All @@ -68,8 +69,8 @@ def format_redis_conn_from_env():

return start + f"{REDIS_HOST}:{REDIS_PORT}"

REDIS_URL = format_redis_conn_from_env()

REDIS_URL = format_redis_conn_from_env()


#######################################################
Expand Down
68 changes: 35 additions & 33 deletions comps/dataprep/src/integrations/milvus_dataprep.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,19 @@
# SPDX-License-Identifier: Apache-2.0


import os
import json
import os
from pathlib import Path
from typing import List, Optional, Union
from fastapi import Body, File, Form, UploadFile, HTTPException
from .config import (
COLLECTION_NAME,
LOCAL_EMBEDDING_MODEL,
MILVUS_URI,
INDEX_PARAMS,
MOSEC_EMBEDDING_ENDPOINT,
MOSEC_EMBEDDING_MODEL,
TEI_EMBEDDING_ENDPOINT,
)

from fastapi import Body, File, Form, HTTPException, UploadFile
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import HuggingFaceBgeEmbeddings, HuggingFaceHubEmbeddings, OpenAIEmbeddings
from langchain_core.documents import Document
from langchain_milvus.vectorstores import Milvus
from langchain_text_splitters import HTMLHeaderTextSplitter
from comps import OpeaComponent, CustomLogger, DocPath, ServiceType

from comps import CustomLogger, DocPath, OpeaComponent, ServiceType
from comps.dataprep.src.utils import (
create_upload_folder,
document_loader,
Expand All @@ -34,6 +27,15 @@
save_content_to_local_disk,
)

from .config import (
COLLECTION_NAME,
INDEX_PARAMS,
LOCAL_EMBEDDING_MODEL,
MILVUS_URI,
MOSEC_EMBEDDING_ENDPOINT,
MOSEC_EMBEDDING_MODEL,
TEI_EMBEDDING_ENDPOINT,
)

logger = CustomLogger("milvus_dataprep")
logflag = os.getenv("LOGFLAG", False)
Expand Down Expand Up @@ -187,8 +189,8 @@ def delete_by_partition_field(my_milvus, partition_field):


class OpeaMilvusDataprep(OpeaComponent):
"""
A specialized dataprep component derived from OpeaComponent for milvus dataprep services.
"""A specialized dataprep component derived from OpeaComponent for milvus dataprep services.
Attributes:
client (Milvus): An instance of the milvus client for vector database operations.
"""
Expand All @@ -199,7 +201,7 @@ def __init__(self, name: str, description: str, config: dict = None):

def _initialize_embedder(self):
if logflag:
logger.info(f"[ initialize embedder ] initializing milvus embedder...")
logger.info("[ initialize embedder ] initializing milvus embedder...")
# Define embeddings according to server type (TEI, MOSEC, or local)
if MOSEC_EMBEDDING_ENDPOINT:
# create embeddings using MOSEC endpoint service
Expand All @@ -219,15 +221,15 @@ def _initialize_embedder(self):
logger.info(f"[ milvus embedding ] LOCAL_EMBEDDING_MODEL:{LOCAL_EMBEDDING_MODEL}")
embeddings = HuggingFaceBgeEmbeddings(model_name=LOCAL_EMBEDDING_MODEL)
return embeddings

def check_health(self) -> bool:
"""
Checks the health of the dataprep service.
"""Checks the health of the dataprep service.
Returns:
bool: True if the service is reachable and healthy, False otherwise.
"""
if logflag:
logger.info(f"[ health check ] start to check health of milvus")
logger.info("[ health check ] start to check health of milvus")
try:
client = Milvus(
embedding_function=self.embedder,
Expand Down Expand Up @@ -255,8 +257,8 @@ async def ingest_files(
process_table: bool = Form(False),
table_strategy: str = Form("fast"),
):
"""
Ingest files/links content into milvus database.
"""Ingest files/links content into milvus database.
Save in the format of vector[], the vector length depends on the embedding model type.
Returns '{"status": 200, "message": "Data preparation succeeded"}' if successful.
Args:
Expand Down Expand Up @@ -315,7 +317,7 @@ async def ingest_files(
process_table=process_table,
table_strategy=table_strategy,
),
self.embedder
self.embedder,
)
uploaded_files.append(save_path)
if logflag:
Expand All @@ -340,7 +342,9 @@ async def ingest_files(
try:
search_res = search_by_file(my_milvus.col, encoded_link + ".txt")
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed when searching in Milvus db for link {link}.")
raise HTTPException(
status_code=500, detail=f"Failed when searching in Milvus db for link {link}."
)
if len(search_res) > 0:
if logflag:
logger.info(f"[ milvus ingest ] Link {link} already exists.")
Expand All @@ -359,7 +363,7 @@ async def ingest_files(
process_table=process_table,
table_strategy=table_strategy,
),
self.embedder
self.embedder,
)
if logflag:
logger.info(f"[ milvus ingest] Successfully saved link list {link_list}")
Expand All @@ -368,15 +372,13 @@ async def ingest_files(
raise HTTPException(status_code=400, detail="Must provide either a file or a string list.")

async def get_files(self):
"""
Get file structure from milvus database in the format of
{
"name": "File Name",
"id": "File Name",
"type": "File",
"parent": "",
}
"""
"""Get file structure from milvus database in the format of
{
"name": "File Name",
"id": "File Name",
"type": "File",
"parent": "",
}"""

if logflag:
logger.info("[ milvus get ] start to get file structure")
Expand Down
66 changes: 35 additions & 31 deletions comps/dataprep/src/integrations/redis_dataprep.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,22 @@
# SPDX-License-Identifier: Apache-2.0


import os
import json
import redis
import os
from pathlib import Path
from typing import List, Optional, Union
from fastapi import Body, File, Form, UploadFile, HTTPException
from .config import (
EMBED_MODEL, TEI_EMBEDDING_ENDPOINT, INDEX_NAME,
KEY_INDEX_NAME, REDIS_URL, SEARCH_BATCH_SIZE
)

import redis
from fastapi import Body, File, Form, HTTPException, UploadFile
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import HuggingFaceBgeEmbeddings
from langchain_community.vectorstores import Redis
from langchain_huggingface import HuggingFaceEndpointEmbeddings
from langchain_text_splitters import HTMLHeaderTextSplitter
from redis.commands.search.field import TextField
from redis.commands.search.indexDefinition import IndexDefinition, IndexType
from comps import OpeaComponent, CustomLogger, DocPath, ServiceType

from comps import CustomLogger, DocPath, OpeaComponent, ServiceType
from comps.dataprep.src.utils import (
create_upload_folder,
document_loader,
Expand All @@ -32,6 +30,7 @@
save_content_to_local_disk,
)

from .config import EMBED_MODEL, INDEX_NAME, KEY_INDEX_NAME, REDIS_URL, SEARCH_BATCH_SIZE, TEI_EMBEDDING_ENDPOINT

logger = CustomLogger("redis_dataprep")
logflag = os.getenv("LOGFLAG", False)
Expand Down Expand Up @@ -216,8 +215,8 @@ def ingest_data_to_redis(doc_path: DocPath):


class OpeaRedisDataprep(OpeaComponent):
"""
A specialized dataprep component derived from OpeaComponent for redis dataprep services.
"""A specialized dataprep component derived from OpeaComponent for redis dataprep services.
Attributes:
client (redis.Redis): An instance of the redis client for vector database operations.
"""
Expand All @@ -230,24 +229,24 @@ def __init__(self, name: str, description: str, config: dict = None):

def _initialize_client(self) -> redis.Redis:
if logflag:
logger.info(f"[ initialize client ] initializing redis client...")
logger.info("[ initialize client ] initializing redis client...")

"""Initializes the redis client."""
try:
try:
client = redis.Redis(connection_pool=redis_pool)
return client
except Exception as e:
logger.error(f"fail to initialize redis client: {e}")
return None

def check_health(self) -> bool:
"""
Checks the health of the dataprep service.
"""Checks the health of the dataprep service.
Returns:
bool: True if the service is reachable and healthy, False otherwise.
"""
if logflag:
logger.info(f"[ health check ] start to check health of redis")
logger.info("[ health check ] start to check health of redis")
try:
if self.client.ping():
logger.info("[ health check ] Successfully connected to Redis!")
Expand All @@ -268,8 +267,8 @@ async def ingest_files(
process_table: bool = Form(False),
table_strategy: str = Form("fast"),
):
"""
Ingest files/links content into redis database.
"""Ingest files/links content into redis database.
Save in the format of vector[768].
Returns '{"status": 200, "message": "Data preparation succeeded"}' if successful.
Args:
Expand Down Expand Up @@ -305,7 +304,8 @@ async def ingest_files(
logger.info(f"[ redis ingest] File {file.filename} does not exist.")
if key_ids:
raise HTTPException(
status_code=400, detail=f"Uploaded file {file.filename} already exists. Please change file name."
status_code=400,
detail=f"Uploaded file {file.filename} already exists. Please change file name.",
)

save_path = upload_folder + encode_file
Expand Down Expand Up @@ -370,15 +370,13 @@ async def ingest_files(
raise HTTPException(status_code=400, detail="Must provide either a file or a string list.")

async def get_files(self):
"""
Get file structure from redis database in the format of
{
"name": "File Name",
"id": "File Name",
"type": "File",
"parent": "",
}
"""
"""Get file structure from redis database in the format of
{
"name": "File Name",
"id": "File Name",
"type": "File",
"parent": "",
}"""

if logflag:
logger.info("[ redis get ] start to get file structure")
Expand All @@ -394,7 +392,9 @@ async def get_files(self):
return file_list

while True:
response = self.client.execute_command("FT.SEARCH", KEY_INDEX_NAME, "*", "LIMIT", offset, offset + SEARCH_BATCH_SIZE)
response = self.client.execute_command(
"FT.SEARCH", KEY_INDEX_NAME, "*", "LIMIT", offset, offset + SEARCH_BATCH_SIZE
)
# no doc retrieved
if len(response) < 2:
break
Expand Down Expand Up @@ -474,7 +474,9 @@ async def delete_files(self, file_path: str = Body(..., embed=True)):
except Exception as e:
if logflag:
logger.info(f"[ redis delete ] {e}, File {file_path} does not exists.")
raise HTTPException(status_code=404, detail=f"File not found in db {KEY_INDEX_NAME}. Please check file_path.")
raise HTTPException(
status_code=404, detail=f"File not found in db {KEY_INDEX_NAME}. Please check file_path."
)
file_ids = key_ids.split("#")

# delete file keys id in db KEY_INDEX_NAME
Expand All @@ -493,7 +495,9 @@ async def delete_files(self, file_path: str = Body(..., embed=True)):
except Exception as e:
if logflag:
logger.info(f"[ redis delete ] {e}. File {file_path} does not exists.")
raise HTTPException(status_code=404, detail=f"File not found in db {INDEX_NAME}. Please check file_path.")
raise HTTPException(
status_code=404, detail=f"File not found in db {INDEX_NAME}. Please check file_path."
)

# delete file content
try:
Expand Down
13 changes: 6 additions & 7 deletions comps/dataprep/src/opea_dataprep_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@


import os
from comps import CustomLogger, OpeaComponentController

from comps import CustomLogger, OpeaComponentController

logger = CustomLogger("opea_dataprep_controller")
logflag = os.getenv("LOGFLAG", False)
Expand All @@ -19,16 +19,15 @@ def invoke(self, *args, **kwargs):

async def ingest_files(self, *args, **kwargs):
if logflag:
logger.info(f"[ dataprep controller ] ingest files")
logger.info("[ dataprep controller ] ingest files")
return await self.active_component.ingest_files(*args, **kwargs)

async def get_files(self, *args, **kwargs):
if logflag:
logger.info(f"[ dataprep controller ] get files")
logger.info("[ dataprep controller ] get files")
return await self.active_component.get_files(*args, **kwargs)

async def delete_files(self, *args, **kwargs):
if logflag:
logger.info(f"[ dataprep controller ] delete files")
logger.info("[ dataprep controller ] delete files")
return await self.active_component.delete_files(*args, **kwargs)

Loading

0 comments on commit 910f77d

Please sign in to comment.