From 23e4cb6cc3f8e57cfc00959e4381def501ae3cb2 Mon Sep 17 00:00:00 2001 From: jasperzhu Date: Thu, 4 Jul 2024 19:56:03 +0800 Subject: [PATCH] update milvus service for dataprep and retriever (#255) * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * DataPrep extract info from table in the docs (#146) * Add microservice for table extraction Signed-off-by: Liangyx2 * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * fix Signed-off-by: Liangyx2 * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * update license copyright Signed-off-by: Liangyx2 * DataPrep extract info from table in the docs Signed-off-by: Liangyx2 * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * refine Signed-off-by: Liangyx2 * refine Signed-off-by: Liangyx2 * Update prepare_doc_redis.py * Update prepare_doc_qdrant.py * Update prepare_doc_milvus.py --------- Signed-off-by: Liangyx2 Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: chen, suyue Co-authored-by: XuhuiRen <44249229+XuhuiRen@users.noreply.github.com> Signed-off-by: jinjunzh * Remove sensitive info logs (#251) Signed-off-by: Chendi Xue Signed-off-by: jinjunzh * Added support for extracting info from image in the docs (#120) Signed-off-by: Ye, Xinyu Signed-off-by: jinjunzh * enhance statistics ut coverage (#252) Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Signed-off-by: jinjunzh * bump version (#253) Signed-off-by: chensuyue Signed-off-by: jinjunzh * support file upload feature for milvus service Signed-off-by: jinjunzh * update embedding with MOSEC_EMBEDDING_ENDPOINT Signed-off-by: jinjunzh * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci Signed-off-by: jinjunzh * support file 
upload feature for milvus service Signed-off-by: jinjunzh * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci Signed-off-by: jinjunzh * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci Signed-off-by: jinjunzh * fix duplicate ci test (#256) * fix duplicate test Signed-off-by: chensuyue * for test only Signed-off-by: chensuyue * Revert "for test only" This reverts commit a7718aa6e615c6bc472c2f287735c305c6f7a475. --------- Signed-off-by: chensuyue Signed-off-by: jinjunzh * DataPrep extract info from table in the docs (#146) * Add microservice for table extraction Signed-off-by: Liangyx2 * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * fix Signed-off-by: Liangyx2 * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * update license copyright Signed-off-by: Liangyx2 * DataPrep extract info from table in the docs Signed-off-by: Liangyx2 * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * refine Signed-off-by: Liangyx2 * refine Signed-off-by: Liangyx2 * Update prepare_doc_redis.py * Update prepare_doc_qdrant.py * Update prepare_doc_milvus.py --------- Signed-off-by: Liangyx2 Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: chen, suyue Co-authored-by: XuhuiRen <44249229+XuhuiRen@users.noreply.github.com> --------- Signed-off-by: Liangyx2 Signed-off-by: jinjunzh Signed-off-by: Chendi Xue Signed-off-by: Ye, Xinyu Signed-off-by: chensuyue Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: Liangyx2 Co-authored-by: chen, suyue Co-authored-by: XuhuiRen <44249229+XuhuiRen@users.noreply.github.com> Co-authored-by: Chendi.Xue Co-authored-by: XinyuYe-Intel Co-authored-by: Sihan Chen 
<39623753+Spycsh@users.noreply.github.com> Signed-off-by: Yogesh Pandey --- comps/dataprep/milvus/README.md | 73 ++++++- comps/dataprep/milvus/config.py | 9 +- comps/dataprep/milvus/docker/Dockerfile | 10 +- comps/dataprep/milvus/prepare_doc_milvus.py | 206 +++++++++++++++--- comps/dataprep/milvus/requirements.txt | 11 +- comps/retrievers/langchain/milvus/README.md | 6 +- comps/retrievers/langchain/milvus/config.py | 6 + .../langchain/milvus/requirements.txt | 2 + .../langchain/milvus/retriever_milvus.py | 43 +++- 9 files changed, 319 insertions(+), 47 deletions(-) diff --git a/comps/dataprep/milvus/README.md b/comps/dataprep/milvus/README.md index ddf740f89..738869a82 100644 --- a/comps/dataprep/milvus/README.md +++ b/comps/dataprep/milvus/README.md @@ -24,7 +24,7 @@ export https_proxy=${your_http_proxy} export MILVUS=${your_milvus_host_ip} export MILVUS_PORT=19530 export COLLECTION_NAME=${your_collection_name} -export TEI_EMBEDDING_ENDPOINT=${your_tei_endpoint} +export MOSEC_EMBEDDING_ENDPOINT=${your_embedding_endpoint} ``` ## Start Document Preparation Microservice for Milvus with Python Script @@ -41,27 +41,88 @@ python prepare_doc_milvus.py ```bash cd ../../../../ -docker build -t opea/dataprep-milvus:latest --build-arg https_proxy=$https_proxy --build-arg http_proxy=$http_proxy -f comps/dataprep/milvus/docker/Dockerfile . +docker build -t opea/dataprep-milvus:latest --build-arg https_proxy=$https_proxy --build-arg http_proxy=$http_proxy --build-arg no_proxy=$no_proxy -f comps/dataprep/milvus/docker/Dockerfile . 
``` ## Run Docker with CLI ```bash -docker run -d --name="dataprep-milvus-server" -p 6010:6010 --ipc=host -v /your_document_path/:/home/user/doc -e http_proxy=$http_proxy -e https_proxy=$https_proxy -e TEI_EMBEDDING_ENDPOINT=${your_tei_endpoint} -e MILVUS=${your_milvus_host_ip} opea/dataprep-milvus:latest +docker run -d --name="dataprep-milvus-server" -p 6010:6010 --ipc=host -e http_proxy=$http_proxy -e https_proxy=$https_proxy -e no_proxy=$no_proxy -e MOSEC_EMBEDDING_ENDPOINT=${your_embedding_endpoint} -e MILVUS=${your_milvus_host_ip} opea/dataprep-milvus:latest ``` # Invoke Microservice -Once document preparation microservice for Qdrant is started, user can use below command to invoke the microservice to convert the document to embedding and save to the database. +Once document preparation microservice for Milvus is started, user can use below command to invoke the microservice to convert the document to embedding and save to the database. + +Make sure the file path after `files=@` is correct. + +- Single file upload ```bash -curl -X POST -H "Content-Type: application/json" -d '{"path":"/home/user/doc/your_document_name"}' http://localhost:6010/v1/dataprep +curl -X POST \ + -H "Content-Type: multipart/form-data" \ + -F "files=@./file.pdf" \ + http://localhost:6010/v1/dataprep ``` You can specify chunk_size and chunk_size by the following commands. 
```bash -curl -X POST -H "Content-Type: application/json" -d '{"path":"/home/user/doc/your_document_name","chunk_size":1500,"chunk_overlap":100}' http://localhost:6010/v1/dataprep +curl -X POST \ + -H "Content-Type: multipart/form-data" \ + -F "files=@./file.pdf" \ + -F "chunk_size=1500" \ + -F "chunk_overlap=100" \ + http://localhost:6010/v1/dataprep +``` + +- Multiple file upload + +```bash +curl -X POST \ + -H "Content-Type: multipart/form-data" \ + -F "files=@./file1.pdf" \ + -F "files=@./file2.pdf" \ + -F "files=@./file3.pdf" \ + http://localhost:6010/v1/dataprep +``` + +- Links upload (not supported for llama_index now) + +```bash +curl -X POST \ + -F 'link_list=["https://www.ces.tech/"]' \ + http://localhost:6010/v1/dataprep +``` + +or + +```python +import requests +import json + +proxies = {"http": ""} +url = "http://localhost:6010/v1/dataprep" +urls = [ + "https://towardsdatascience.com/no-gpu-no-party-fine-tune-bert-for-sentiment-analysis-with-vertex-ai-custom-jobs-d8fc410e908b?source=rss----7f60cf5620c9---4" +] +payload = {"link_list": json.dumps(urls)} + +try: + resp = requests.post(url=url, data=payload, proxies=proxies) + print(resp.text) + resp.raise_for_status() # Raise an exception for unsuccessful HTTP status codes + print("Request successful!") +except requests.exceptions.RequestException as e: + print("An error occurred:", e) +``` + +We support table extraction from pdf documents. You can specify process_table and table_strategy by the following commands. "table_strategy" refers to the strategies to understand tables for table retrieval. As the setting progresses from "fast" to "hq" to "llm," the focus shifts towards deeper table understanding at the expense of processing speed. The default strategy is "fast". 
+ +Note: If you specify "table_strategy=llm", you should first start the TGI Service (please refer to 1.2.1, 1.3.1 in https://github.com/opea-project/GenAIComps/tree/main/comps/llms/README.md) and then `export TGI_LLM_ENDPOINT="http://${your_ip}:8008"`. + +```bash +curl -X POST -H "Content-Type: multipart/form-data" -F "files=@./your_file.pdf" -F "process_table=true" -F "table_strategy=hq" http://localhost:6010/v1/dataprep ``` We support table extraction from pdf documents. You can specify process_table and table_strategy by the following commands. "table_strategy" refers to the strategies to understand tables for table retrieval. As the setting progresses from "fast" to "hq" to "llm," the focus shifts towards deeper table understanding at the expense of processing speed. The default strategy is "fast". diff --git a/comps/dataprep/milvus/config.py b/comps/dataprep/milvus/config.py index 474850798..06aa60975 100644 --- a/comps/dataprep/milvus/config.py +++ b/comps/dataprep/milvus/config.py @@ -4,10 +4,15 @@ import os # Embedding model -EMBED_MODEL = os.getenv("EMBED_MODEL", "maidalun1020/bce-embedding-base_v1") +TEI_EMBEDDING_MODEL = os.getenv("EMBED_MODEL", "maidalun1020/bce-embedding-base_v1") # Embedding endpoints -EMBEDDING_ENDPOINT = os.getenv("TEI_EMBEDDING_ENDPOINT", "") +TEI_EMBEDDING_ENDPOINT = os.getenv("TEI_EMBEDDING_ENDPOINT", "") # MILVUS configuration MILVUS_HOST = os.getenv("MILVUS", "localhost") MILVUS_PORT = int(os.getenv("MILVUS_PORT", 19530)) COLLECTION_NAME = os.getenv("COLLECTION_NAME", "rag_milvus") + +MOSEC_EMBEDDING_MODEL = "/root/bce-embedding-base_v1" +MOSEC_EMBEDDING_ENDPOINT = os.environ.get("MOSEC_EMBEDDING_ENDPOINT", "") +os.environ["OPENAI_API_BASE"] = MOSEC_EMBEDDING_ENDPOINT +os.environ["OPENAI_API_KEY"] = "Dummy key" diff --git a/comps/dataprep/milvus/docker/Dockerfile b/comps/dataprep/milvus/docker/Dockerfile index 4b142fc4e..b141579c4 100644 --- a/comps/dataprep/milvus/docker/Dockerfile +++ 
b/comps/dataprep/milvus/docker/Dockerfile @@ -12,6 +12,7 @@ RUN apt-get update -y && apt-get install -y --no-install-recommends --fix-missin build-essential \ libgl1-mesa-glx \ libjemalloc-dev \ + default-jre \ vim RUN useradd -m -s /bin/bash user && \ @@ -22,12 +23,17 @@ USER user COPY comps /home/user/comps -RUN pip install --no-cache-dir --upgrade pip && \ - if [ ${ARCH} = "cpu" ]; then pip install torch --index-url https://download.pytorch.org/whl/cpu; fi && \ +RUN pip install --no-cache-dir --upgrade pip setuptools && \ + if [ ${ARCH} = "cpu" ]; then pip install torch torchvision --index-url https://download.pytorch.org/whl/cpu; fi && \ pip install --no-cache-dir -r /home/user/comps/dataprep/milvus/requirements.txt ENV PYTHONPATH=$PYTHONPATH:/home/user +USER root + +RUN mkdir -p /home/user/comps/dataprep/milvus/uploaded_files && chown -R user /home/user/comps/dataprep/milvus/uploaded_files + +USER user WORKDIR /home/user/comps/dataprep/milvus ENTRYPOINT ["python", "prepare_doc_milvus.py"] diff --git a/comps/dataprep/milvus/prepare_doc_milvus.py b/comps/dataprep/milvus/prepare_doc_milvus.py index 1b19a2f36..c14ba0f5c 100644 --- a/comps/dataprep/milvus/prepare_doc_milvus.py +++ b/comps/dataprep/milvus/prepare_doc_milvus.py @@ -1,36 +1,74 @@ # Copyright (C) 2024 Intel Corporation # SPDX-License-Identifier: Apache-2.0 +import json import os -import sys +import uuid +from pathlib import Path +from typing import List, Optional, Union -from config import COLLECTION_NAME, EMBED_MODEL, EMBEDDING_ENDPOINT, MILVUS_HOST, MILVUS_PORT +from config import ( + COLLECTION_NAME, + MILVUS_HOST, + MILVUS_PORT, + MOSEC_EMBEDDING_ENDPOINT, + MOSEC_EMBEDDING_MODEL, + TEI_EMBEDDING_ENDPOINT, + TEI_EMBEDDING_MODEL, +) +from fastapi import File, Form, HTTPException, UploadFile from langchain.text_splitter import RecursiveCharacterTextSplitter -from langchain_community.embeddings import HuggingFaceBgeEmbeddings, HuggingFaceEmbeddings, HuggingFaceHubEmbeddings +from 
langchain_community.embeddings import HuggingFaceBgeEmbeddings, HuggingFaceHubEmbeddings, OpenAIEmbeddings from langchain_milvus.vectorstores import Milvus from langchain_text_splitters import HTMLHeaderTextSplitter +from langsmith import traceable +from pyspark import SparkConf, SparkContext -from comps.cores.mega.micro_service import opea_microservices, register_microservice -from comps.cores.proto.docarray import DocPath -from comps.cores.telemetry.opea_telemetry import opea_telemetry -from comps.dataprep.utils import document_loader, get_tables_result +from comps import DocPath, opea_microservices, register_microservice +from comps.dataprep.utils import document_loader, get_tables_result, parse_html -# current_script_path = os.path.dirname(os.path.abspath(__file__)) -# parent_dir = os.path.dirname(current_script_path) -# sys.path.append(parent_dir) -# from utils import document_loader +# workaround notes: cp comps/dataprep/utils.py ./milvus/utils.py +# from utils import document_loader, get_tables_result, parse_html +index_params = {"index_type": "FLAT", "metric_type": "IP", "params": {}} -@register_microservice( - name="opea_service@prepare_doc_milvus", - endpoint="/v1/dataprep", - host="0.0.0.0", - port=6010, - input_datatype=DocPath, - output_datatype=None, -) -# @opea_telemetry -def ingest_documents(doc_path: DocPath): +class MosecEmbeddings(OpenAIEmbeddings): + def _get_len_safe_embeddings( + self, texts: List[str], *, engine: str, chunk_size: Optional[int] = None + ) -> List[List[float]]: + _chunk_size = chunk_size or self.chunk_size + batched_embeddings: List[List[float]] = [] + response = self.client.create(input=texts, **self._invocation_params) + if not isinstance(response, dict): + response = response.model_dump() + batched_embeddings.extend(r["embedding"] for r in response["data"]) + + _cached_empty_embedding: Optional[List[float]] = None + + def empty_embedding() -> List[float]: + nonlocal _cached_empty_embedding + if _cached_empty_embedding is 
None: + average_embedded = self.client.create(input="", **self._invocation_params) + if not isinstance(average_embedded, dict): + average_embedded = average_embedded.model_dump() + _cached_empty_embedding = average_embedded["data"][0]["embedding"] + return _cached_empty_embedding + + return [e if e is not None else empty_embedding() for e in batched_embeddings] + + +async def save_file_to_local_disk(save_path: str, file): + save_path = Path(save_path) + with save_path.open("wb") as fout: + try: + content = await file.read() + fout.write(content) + except Exception as e: + print(f"Write file failed. Exception: {e}") + raise HTTPException(status_code=500, detail=f"Write file {save_path} failed. Exception: {e}") + + +def ingest_data_to_milvus(doc_path: DocPath): """Ingest document to Milvus.""" path = doc_path.path print(f"Parsing document {path}.") @@ -53,13 +91,20 @@ def ingest_documents(doc_path: DocPath): table_chunks = get_tables_result(path, doc_path.table_strategy) chunks = chunks + table_chunks print("Done preprocessing. 
Created ", len(chunks), " chunks of the original pdf") + # Create vectorstore - if EMBEDDING_ENDPOINT: + if MOSEC_EMBEDDING_ENDPOINT: + # create embeddings using MOSEC endpoint service + print(f"MOSEC_EMBEDDING_ENDPOINT:{MOSEC_EMBEDDING_ENDPOINT},MOSEC_EMBEDDING_MODEL:{MOSEC_EMBEDDING_MODEL}") + embedder = MosecEmbeddings(model=MOSEC_EMBEDDING_MODEL) + elif TEI_EMBEDDING_ENDPOINT: # create embeddings using TEI endpoint service - embedder = HuggingFaceHubEmbeddings(model=EMBEDDING_ENDPOINT) + print(f"TEI_EMBEDDING_ENDPOINT:{TEI_EMBEDDING_ENDPOINT}") + embedder = HuggingFaceHubEmbeddings(model=TEI_EMBEDDING_ENDPOINT) else: # create embeddings using local embedding model - embedder = HuggingFaceBgeEmbeddings(model_name=EMBED_MODEL) + print(f"Local TEI_EMBEDDING_MODEL:{TEI_EMBEDDING_MODEL}") + embedder = HuggingFaceBgeEmbeddings(model_name=TEI_EMBEDDING_MODEL) # Batch size batch_size = 32 @@ -67,15 +112,126 @@ def ingest_documents(doc_path: DocPath): for i in range(0, num_chunks, batch_size): batch_chunks = chunks[i : i + batch_size] batch_texts = batch_chunks - _ = Milvus.from_texts( texts=batch_texts, embedding=embedder, collection_name=COLLECTION_NAME, connection_args={"host": MILVUS_HOST, "port": MILVUS_PORT}, + index_params=index_params, ) print(f"Processed batch {i//batch_size + 1}/{(num_chunks-1)//batch_size + 1}") + return True + + +def ingest_link_to_milvus(link_list: List[str]): + data_collection = parse_html(link_list) + + texts = [] + metadatas = [] + for data, meta in data_collection: + doc_id = str(uuid.uuid4()) + metadata = {"source": meta, "identify_id": doc_id} + texts.append(data) + metadatas.append(metadata) + + # Create vectorstore + if MOSEC_EMBEDDING_ENDPOINT: + # create embeddings using MOSEC endpoint service + print(f"MOSEC_EMBEDDING_ENDPOINT:{MOSEC_EMBEDDING_ENDPOINT},MOSEC_EMBEDDING_MODEL:{MOSEC_EMBEDDING_MODEL}") + embedder = MosecEmbeddings(model=MOSEC_EMBEDDING_MODEL) + elif TEI_EMBEDDING_ENDPOINT: + # create embeddings using TEI endpoint 
service + print(f"TEI_EMBEDDING_ENDPOINT:{TEI_EMBEDDING_ENDPOINT}") + embedder = HuggingFaceHubEmbeddings(model=TEI_EMBEDDING_ENDPOINT) + else: + # create embeddings using local embedding model + print(f"Local TEI_EMBEDDING_MODEL:{TEI_EMBEDDING_MODEL}") + embedder = HuggingFaceBgeEmbeddings(model_name=TEI_EMBEDDING_MODEL) + + _ = Milvus.from_texts( + texts=texts, + metadatas=metadatas, + embedding=embedder, + collection_name=COLLECTION_NAME, + connection_args={"host": MILVUS_HOST, "port": MILVUS_PORT}, + index_params=index_params, + ) + + +@register_microservice(name="opea_service@prepare_doc_milvus", endpoint="/v1/dataprep", host="0.0.0.0", port=6010) +@traceable(run_type="tool") +async def ingest_documents( + files: Optional[Union[UploadFile, List[UploadFile]]] = File(None), + link_list: Optional[str] = Form(None), + chunk_size: int = Form(1500), + chunk_overlap: int = Form(100), + process_table: bool = Form(False), + table_strategy: str = Form("fast"), +): + print(f"files:{files}") + print(f"link_list:{link_list}") + if files and link_list: + raise HTTPException(status_code=400, detail="Provide either a file or a string list, not both.") + + if files: + if not isinstance(files, list): + files = [files] + upload_folder = "./uploaded_files/" + if not os.path.exists(upload_folder): + Path(upload_folder).mkdir(parents=True, exist_ok=True) + uploaded_files = [] + for file in files: + save_path = upload_folder + file.filename + await save_file_to_local_disk(save_path, file) + ingest_data_to_milvus( + DocPath( + path=save_path, + chunk_size=chunk_size, + chunk_overlap=chunk_overlap, + process_table=process_table, + table_strategy=table_strategy, + ) + ) + uploaded_files.append(save_path) + print(f"Successfully saved file {save_path}") + + def process_files_wrapper(files): + if not isinstance(files, list): + files = [files] + for file in files: + ingest_data_to_milvus(DocPath(path=file, chunk_size=chunk_size, chunk_overlap=chunk_overlap)) + + try: + # Create a 
SparkContext + conf = SparkConf().setAppName("Parallel-dataprep").setMaster("local[*]") + sc = SparkContext(conf=conf) + # Create an RDD with parallel processing + parallel_num = min(len(uploaded_files), os.cpu_count()) + rdd = sc.parallelize(uploaded_files, parallel_num) + # Perform a parallel operation + rdd_trans = rdd.map(process_files_wrapper) + rdd_trans.collect() + # Stop the SparkContext + sc.stop() + except: + # Stop the SparkContext + sc.stop() + return {"status": 200, "message": "Data preparation succeeded"} + + if link_list: + try: + link_list = json.loads(link_list) # Parse JSON string to list + if not isinstance(link_list, list): + raise HTTPException(status_code=400, detail="link_list should be a list.") + ingest_link_to_milvus(link_list) + print(f"Successfully saved link list {link_list}") + return {"status": 200, "message": "Data preparation succeeded"} + except json.JSONDecodeError: + raise HTTPException(status_code=400, detail="Invalid JSON format for link_list.") + + raise HTTPException(status_code=400, detail="Must provide either a file or a string list.") + if __name__ == "__main__": opea_microservices["opea_service@prepare_doc_milvus"].start() diff --git a/comps/dataprep/milvus/requirements.txt b/comps/dataprep/milvus/requirements.txt index 9f71a69d4..c6f5f4fd2 100644 --- a/comps/dataprep/milvus/requirements.txt +++ b/comps/dataprep/milvus/requirements.txt @@ -4,25 +4,26 @@ docarray[full] docx2txt easyocr fastapi -frontend==0.0.3 huggingface_hub langchain langchain-community langchain-text-splitters langchain_milvus +langsmith markdown numpy +openai opentelemetry-api opentelemetry-exporter-otlp opentelemetry-sdk pandas Pillow prometheus-fastapi-instrumentator -pydantic==2.7.3 -pymilvus==2.4.3 -pymupdf==1.24.5 -python-docx==0.8.11 +pymupdf +pyspark +python-docx python-pptx sentence_transformers shortuuid +tiktoken unstructured[all-docs]==0.11.5 diff --git a/comps/retrievers/langchain/milvus/README.md 
b/comps/retrievers/langchain/milvus/README.md index a0c2cfeba..d1bbc80da 100644 --- a/comps/retrievers/langchain/milvus/README.md +++ b/comps/retrievers/langchain/milvus/README.md @@ -21,13 +21,13 @@ export https_proxy=${your_http_proxy} export MILVUS=${your_milvus_host_ip} export MILVUS_PORT=19530 export COLLECTION_NAME=${your_collection_name} -export TEI_EMBEDDING_ENDPOINT=${your_tei_endpoint} +export MOSEC_EMBEDDING_ENDPOINT=${your_embedding_endpoint} ``` ## Start Retriever Service ```bash -export TEI_EMBEDDING_ENDPOINT="http://${your_ip}:6060" +export MOSEC_EMBEDDING_ENDPOINT="http://${your_ip}:6060" python langchain/retriever_redis.py ``` @@ -43,7 +43,7 @@ docker build -t opea/retriever-milvus:latest --build-arg https_proxy=$https_prox ## Run Docker with CLI ```bash -docker run -d --name="retriever-milvus-server" -p 7000:7000 --ipc=host -e http_proxy=$http_proxy -e https_proxy=$https_proxy -e TEI_EMBEDDING_ENDPOINT=${your_tei_endpoint} -e MILVUS=${your_milvus_host_ip} opea/retriever-milvus:latest +docker run -d --name="retriever-milvus-server" -p 7000:7000 --ipc=host -e http_proxy=$http_proxy -e https_proxy=$https_proxy -e MOSEC_EMBEDDING_ENDPOINT=${your_embedding_endpoint} -e MILVUS=${your_milvus_host_ip} opea/retriever-milvus:latest ``` # 🚀3. 
Consume Retriever Service diff --git a/comps/retrievers/langchain/milvus/config.py b/comps/retrievers/langchain/milvus/config.py index e41d08aaa..dcbe167b5 100644 --- a/comps/retrievers/langchain/milvus/config.py +++ b/comps/retrievers/langchain/milvus/config.py @@ -11,3 +11,9 @@ MILVUS_HOST = os.getenv("MILVUS", "localhost") MILVUS_PORT = int(os.getenv("MILVUS_PORT", 19530)) COLLECTION_NAME = os.getenv("COLLECTION_NAME", "rag_milvus") + + +MOSEC_EMBEDDING_ENDPOINT = os.environ.get("MOSEC_EMBEDDING_ENDPOINT", "") +os.environ["OPENAI_API_BASE"] = MOSEC_EMBEDDING_ENDPOINT +os.environ["OPENAI_API_KEY"] = "Dummy key" +MODEL_ID = "/root/bce-embedding-base_v1" diff --git a/comps/retrievers/langchain/milvus/requirements.txt b/comps/retrievers/langchain/milvus/requirements.txt index 8e2b11d20..8aa51936b 100644 --- a/comps/retrievers/langchain/milvus/requirements.txt +++ b/comps/retrievers/langchain/milvus/requirements.txt @@ -8,6 +8,7 @@ langchain langchain-community langchain_milvus numpy +openai opentelemetry-api opentelemetry-exporter-otlp opentelemetry-sdk @@ -20,3 +21,4 @@ pymupdf==1.24.5 python-docx==0.8.11 sentence_transformers shortuuid +tiktoken diff --git a/comps/retrievers/langchain/milvus/retriever_milvus.py b/comps/retrievers/langchain/milvus/retriever_milvus.py index 4f0b14b8a..ba7bb38db 100644 --- a/comps/retrievers/langchain/milvus/retriever_milvus.py +++ b/comps/retrievers/langchain/milvus/retriever_milvus.py @@ -4,9 +4,18 @@ import argparse import os import time +from typing import List, Optional -from config import COLLECTION_NAME, EMBED_ENDPOINT, EMBED_MODEL, MILVUS_HOST, MILVUS_PORT -from langchain_community.embeddings import HuggingFaceBgeEmbeddings, HuggingFaceHubEmbeddings +from config import ( + COLLECTION_NAME, + EMBED_ENDPOINT, + EMBED_MODEL, + MILVUS_HOST, + MILVUS_PORT, + MODEL_ID, + MOSEC_EMBEDDING_ENDPOINT, +) +from langchain_community.embeddings import HuggingFaceBgeEmbeddings, HuggingFaceHubEmbeddings, OpenAIEmbeddings from 
langchain_milvus.vectorstores import Milvus from langsmith import traceable @@ -22,6 +31,31 @@ ) +class MosecEmbeddings(OpenAIEmbeddings): + def _get_len_safe_embeddings( + self, texts: List[str], *, engine: str, chunk_size: Optional[int] = None + ) -> List[List[float]]: + _chunk_size = chunk_size or self.chunk_size + batched_embeddings: List[List[float]] = [] + response = self.client.create(input=texts, **self._invocation_params) + if not isinstance(response, dict): + response = response.model_dump() + batched_embeddings.extend(r["embedding"] for r in response["data"]) + + _cached_empty_embedding: Optional[List[float]] = None + + def empty_embedding() -> List[float]: + nonlocal _cached_empty_embedding + if _cached_empty_embedding is None: + average_embedded = self.client.create(input="", **self._invocation_params) + if not isinstance(average_embedded, dict): + average_embedded = average_embedded.model_dump() + _cached_empty_embedding = average_embedded["data"][0]["embedding"] + return _cached_empty_embedding + + return [e if e is not None else empty_embedding() for e in batched_embeddings] + + @register_microservice( name="opea_service@retriever_milvus", service_type=ServiceType.RETRIEVER, @@ -65,9 +99,10 @@ def retrieve(input: EmbedDoc768) -> SearchedDoc: if __name__ == "__main__": # Create vectorstore - if EMBED_ENDPOINT: + if MOSEC_EMBEDDING_ENDPOINT: # create embeddings using TEI endpoint service - embeddings = HuggingFaceHubEmbeddings(model=EMBED_ENDPOINT) + # embeddings = HuggingFaceHubEmbeddings(model=EMBED_ENDPOINT) + embeddings = MosecEmbeddings(model=MODEL_ID) else: # create embeddings using local embedding model embeddings = HuggingFaceBgeEmbeddings(model_name=EMBED_MODEL)