update milvus service for dataprep and retriever (opea-project#255)
* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* DataPrep extract info from table in the docs (opea-project#146)

* Add microservice for table extraction

Signed-off-by: Liangyx2 <[email protected]>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* fix

Signed-off-by: Liangyx2 <[email protected]>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* update license copyright

Signed-off-by: Liangyx2 <[email protected]>

* DataPrep extract info from table in the docs

Signed-off-by: Liangyx2 <[email protected]>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* refine

Signed-off-by: Liangyx2 <[email protected]>

* refine

Signed-off-by: Liangyx2 <[email protected]>

* Update prepare_doc_redis.py

* Update prepare_doc_qdrant.py

* Update prepare_doc_milvus.py

---------

Signed-off-by: Liangyx2 <[email protected]>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: chen, suyue <[email protected]>
Co-authored-by: XuhuiRen <[email protected]>
Signed-off-by: jinjunzh <[email protected]>

* Remove sensitive info logs (opea-project#251)

Signed-off-by: Chendi Xue <[email protected]>
Signed-off-by: jinjunzh <[email protected]>

* Added support for extracting info from image in the docs (opea-project#120)

Signed-off-by: Ye, Xinyu <[email protected]>
Signed-off-by: jinjunzh <[email protected]>

* enhance statistics ut coverage (opea-project#252)

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Signed-off-by: jinjunzh <[email protected]>

* bump version (opea-project#253)

Signed-off-by: chensuyue <[email protected]>
Signed-off-by: jinjunzh <[email protected]>

* support file upload feature for milvus service

Signed-off-by: jinjunzh <[email protected]>

* update embedding with MOSEC_EMBEDDING_ENDPOINT

Signed-off-by: jinjunzh <[email protected]>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

Signed-off-by: jinjunzh <[email protected]>

* support file upload feature for milvus service

Signed-off-by: jinjunzh <[email protected]>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

Signed-off-by: jinjunzh <[email protected]>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

Signed-off-by: jinjunzh <[email protected]>

* fix duplicate ci test (opea-project#256)

* fix duplicate test

Signed-off-by: chensuyue <[email protected]>

* for test only

Signed-off-by: chensuyue <[email protected]>

* Revert "for test only"

This reverts commit a7718aa.

---------

Signed-off-by: chensuyue <[email protected]>
Signed-off-by: jinjunzh <[email protected]>

---------

Signed-off-by: Liangyx2 <[email protected]>
Signed-off-by: jinjunzh <[email protected]>
Signed-off-by: Chendi Xue <[email protected]>
Signed-off-by: Ye, Xinyu <[email protected]>
Signed-off-by: chensuyue <[email protected]>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Liangyx2 <[email protected]>
Co-authored-by: chen, suyue <[email protected]>
Co-authored-by: XuhuiRen <[email protected]>
Co-authored-by: Chendi.Xue <[email protected]>
Co-authored-by: XinyuYe-Intel <[email protected]>
Co-authored-by: Sihan Chen <[email protected]>
Signed-off-by: Yogesh Pandey <[email protected]>
8 people authored and yogeshmpandey committed Jul 10, 2024
1 parent 3a1b136 commit 23e4cb6
Showing 9 changed files with 319 additions and 47 deletions.
73 changes: 67 additions & 6 deletions comps/dataprep/milvus/README.md
@@ -24,7 +24,7 @@ export https_proxy=${your_http_proxy}
export MILVUS=${your_milvus_host_ip}
export MILVUS_PORT=19530
export COLLECTION_NAME=${your_collection_name}
export TEI_EMBEDDING_ENDPOINT=${your_tei_endpoint}
export MOSEC_EMBEDDING_ENDPOINT=${your_embedding_endpoint}
```

## Start Document Preparation Microservice for Milvus with Python Script
@@ -41,27 +41,88 @@ python prepare_doc_milvus.py

```bash
cd ../../../../
docker build -t opea/dataprep-milvus:latest --build-arg https_proxy=$https_proxy --build-arg http_proxy=$http_proxy -f comps/dataprep/milvus/docker/Dockerfile .
docker build -t opea/dataprep-milvus:latest --build-arg https_proxy=$https_proxy --build-arg http_proxy=$http_proxy --build-arg no_proxy=$no_proxy -f comps/dataprep/milvus/docker/Dockerfile .
```

## Run Docker with CLI

```bash
docker run -d --name="dataprep-milvus-server" -p 6010:6010 --ipc=host -v /your_document_path/:/home/user/doc -e http_proxy=$http_proxy -e https_proxy=$https_proxy -e TEI_EMBEDDING_ENDPOINT=${your_tei_endpoint} -e MILVUS=${your_milvus_host_ip} opea/dataprep-milvus:latest
docker run -d --name="dataprep-milvus-server" -p 6010:6010 --ipc=host -e http_proxy=$http_proxy -e https_proxy=$https_proxy -e no_proxy=$no_proxy -e MOSEC_EMBEDDING_ENDPOINT=${your_embedding_endpoint} -e MILVUS=${your_milvus_host_ip} opea/dataprep-milvus:latest
```

# Invoke Microservice

Once document preparation microservice for Qdrant is started, user can use below command to invoke the microservice to convert the document to embedding and save to the database.
Once the document preparation microservice for Milvus is started, use the commands below to invoke it. The microservice converts each document to embeddings and saves them to the database.

Make sure the file path after `files=@` is correct.

- Single file upload

```bash
curl -X POST -H "Content-Type: application/json" -d '{"path":"/home/user/doc/your_document_name"}' http://localhost:6010/v1/dataprep
curl -X POST \
-H "Content-Type: multipart/form-data" \
-F "files=@./file.pdf" \
http://localhost:6010/v1/dataprep
```

You can specify chunk_size and chunk_overlap with the following command.

```bash
curl -X POST -H "Content-Type: application/json" -d '{"path":"/home/user/doc/your_document_name","chunk_size":1500,"chunk_overlap":100}' http://localhost:6010/v1/dataprep
curl -X POST \
-H "Content-Type: multipart/form-data" \
-F "files=@./file.pdf" \
-F "chunk_size=1500" \
-F "chunk_overlap=100" \
http://localhost:6010/v1/dataprep
```
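
To make the two parameters concrete: `chunk_size` caps the length of each chunk and `chunk_overlap` is how much of the end of one chunk is repeated at the start of the next. The sketch below is a simplified fixed-window splitter measured in characters. It is not the `RecursiveCharacterTextSplitter` that the service imports, which additionally tries to break on natural separators. The function name `sliding_chunks` is hypothetical and exists only for illustration.

```python
from typing import List


def sliding_chunks(text: str, chunk_size: int = 1500, chunk_overlap: int = 100) -> List[str]:
    """Split text into fixed-size windows that share chunk_overlap characters.

    Illustration only: a simplified stand-in, not the service's splitter.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be in [0, chunk_size)")

    step = chunk_size - chunk_overlap  # how far each window advances
    chunks: List[str] = []
    for start in range(0, len(text), step):
        chunks.append(text[start : start + chunk_size])
        if start + chunk_size >= len(text):
            break  # this window already reached the end of the text
    return chunks


if __name__ == "__main__":
    # Each chunk repeats the last character of the previous one.
    print(sliding_chunks("abcdefghij", chunk_size=4, chunk_overlap=1))
    # ['abcd', 'defg', 'ghij']
```

A larger overlap keeps more shared context between neighbouring chunks at the cost of storing more, partly redundant, embeddings.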

- Multiple file upload

```bash
curl -X POST \
-H "Content-Type: multipart/form-data" \
-F "files=@./file1.pdf" \
-F "files=@./file2.pdf" \
-F "files=@./file3.pdf" \
http://localhost:6010/v1/dataprep
```

- Links upload (not currently supported for llama_index)

```bash
curl -X POST \
-F 'link_list=["https://www.ces.tech/"]' \
http://localhost:6010/v1/dataprep
```

or

```python
import requests
import json

proxies = {"http": ""}
url = "http://localhost:6010/v1/dataprep"
urls = [
    "https://towardsdatascience.com/no-gpu-no-party-fine-tune-bert-for-sentiment-analysis-with-vertex-ai-custom-jobs-d8fc410e908b?source=rss----7f60cf5620c9---4"
]
payload = {"link_list": json.dumps(urls)}

try:
    resp = requests.post(url=url, data=payload, proxies=proxies)
    print(resp.text)
    resp.raise_for_status()  # Raise an exception for unsuccessful HTTP status codes
    print("Request successful!")
except requests.exceptions.RequestException as e:
    print("An error occurred:", e)
```
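
In both variants `link_list` travels as a single form field whose value is a JSON-encoded array, and the service in this commit parses it with `json.loads` and rejects anything that is not a list. The sketch below mirrors that round trip without any network call. The helper names `encode_link_list` and `decode_link_list` are hypothetical, and `ValueError` stands in for the HTTP 400 responses the service returns.

```python
import json
from typing import List


def encode_link_list(urls: List[str]) -> str:
    """Client side: serialize the URLs into the string sent as the form field."""
    return json.dumps(urls)


def decode_link_list(raw: str) -> List[str]:
    """Server side (illustrative): parse the form field and check that it is a list."""
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError("Invalid JSON format for link_list.") from exc
    if not isinstance(parsed, list):
        raise ValueError("link_list should be a list.")
    return parsed


if __name__ == "__main__":
    field = encode_link_list(["https://www.ces.tech/"])
    print(field)
    print(decode_link_list(field))
```

Sending a bare URL string instead of a JSON array is the likely failure mode here: a plain `https://...` value is not valid JSON, and a quoted string parses but is not a list.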

We support table extraction from pdf documents. You can specify process_table and table_strategy by the following commands. "table_strategy" refers to the strategies to understand tables for table retrieval. As the setting progresses from "fast" to "hq" to "llm," the focus shifts towards deeper table understanding at the expense of processing speed. The default strategy is "fast".

Note: If you specify "table_strategy=llm", first start the TGI service (see sections 1.2.1 and 1.3.1 in https://github.com/opea-project/GenAIComps/tree/main/comps/llms/README.md), and then run `export TGI_LLM_ENDPOINT="http://${your_ip}:8008"`.

```bash
curl -X POST \
-H "Content-Type: multipart/form-data" \
-F "files=@./file.pdf" \
-F "process_table=true" \
-F "table_strategy=hq" \
http://localhost:6010/v1/dataprep
```

We support table extraction from pdf documents. You can specify process_table and table_strategy by the following commands. "table_strategy" refers to the strategies to understand tables for table retrieval. As the setting progresses from "fast" to "hq" to "llm," the focus shifts towards deeper table understanding at the expense of processing speed. The default strategy is "fast".
9 changes: 7 additions & 2 deletions comps/dataprep/milvus/config.py
@@ -4,10 +4,15 @@
import os

# Embedding model
EMBED_MODEL = os.getenv("EMBED_MODEL", "maidalun1020/bce-embedding-base_v1")
TEI_EMBEDDING_MODEL = os.getenv("EMBED_MODEL", "maidalun1020/bce-embedding-base_v1")
# Embedding endpoints
EMBEDDING_ENDPOINT = os.getenv("TEI_EMBEDDING_ENDPOINT", "")
TEI_EMBEDDING_ENDPOINT = os.getenv("TEI_EMBEDDING_ENDPOINT", "")
# MILVUS configuration
MILVUS_HOST = os.getenv("MILVUS", "localhost")
MILVUS_PORT = int(os.getenv("MILVUS_PORT", 19530))
COLLECTION_NAME = os.getenv("COLLECTION_NAME", "rag_milvus")

MOSEC_EMBEDDING_MODEL = "/root/bce-embedding-base_v1"
MOSEC_EMBEDDING_ENDPOINT = os.environ.get("MOSEC_EMBEDDING_ENDPOINT", "")
os.environ["OPENAI_API_BASE"] = MOSEC_EMBEDDING_ENDPOINT
os.environ["OPENAI_API_KEY"] = "Dummy key"
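
The settings above feed a fixed precedence in `prepare_doc_milvus.py`: a non-empty `MOSEC_EMBEDDING_ENDPOINT` wins, otherwise a non-empty `TEI_EMBEDDING_ENDPOINT` is used, otherwise the local model is loaded. Note also that this file overwrites `OPENAI_API_BASE` and `OPENAI_API_KEY` in the process environment on import, even when the MOSEC endpoint is empty. The sketch below isolates only the selection order. The function `pick_embedding_backend` and its string return values are hypothetical labels and not part of the commit.

```python
import os
from typing import Mapping


def pick_embedding_backend(env: Mapping[str, str]) -> str:
    """Return which embedding backend would be chosen for the given environment.

    Mirrors the if/elif/else order in the service; labels are illustrative.
    """
    if env.get("MOSEC_EMBEDDING_ENDPOINT", ""):
        return "mosec"  # OpenAI-compatible MOSEC endpoint
    if env.get("TEI_EMBEDDING_ENDPOINT", ""):
        return "tei"  # TEI endpoint service
    return "local"  # local embedding model


if __name__ == "__main__":
    print(pick_embedding_backend(os.environ))
```

Because MOSEC is checked first, setting both endpoints silently ignores the TEI one, which is worth keeping in mind when migrating an existing TEI deployment.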
10 changes: 8 additions & 2 deletions comps/dataprep/milvus/docker/Dockerfile
@@ -12,6 +12,7 @@ RUN apt-get update -y && apt-get install -y --no-install-recommends --fix-missin
build-essential \
libgl1-mesa-glx \
libjemalloc-dev \
default-jre \
vim

RUN useradd -m -s /bin/bash user && \
@@ -22,12 +23,17 @@ USER user

COPY comps /home/user/comps

RUN pip install --no-cache-dir --upgrade pip && \
if [ ${ARCH} = "cpu" ]; then pip install torch --index-url https://download.pytorch.org/whl/cpu; fi && \
RUN pip install --no-cache-dir --upgrade pip setuptools && \
if [ ${ARCH} = "cpu" ]; then pip install torch torchvision --index-url https://download.pytorch.org/whl/cpu; fi && \
pip install --no-cache-dir -r /home/user/comps/dataprep/milvus/requirements.txt

ENV PYTHONPATH=$PYTHONPATH:/home/user

USER root

RUN mkdir -p /home/user/comps/dataprep/milvus/uploaded_files && chown -R user /home/user/comps/dataprep/milvus/uploaded_files

USER user
WORKDIR /home/user/comps/dataprep/milvus

ENTRYPOINT ["python", "prepare_doc_milvus.py"]
206 changes: 181 additions & 25 deletions comps/dataprep/milvus/prepare_doc_milvus.py
@@ -1,36 +1,74 @@
# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

import json
import os
import sys
import uuid
from pathlib import Path
from typing import List, Optional, Union

from config import COLLECTION_NAME, EMBED_MODEL, EMBEDDING_ENDPOINT, MILVUS_HOST, MILVUS_PORT
from config import (
    COLLECTION_NAME,
    MILVUS_HOST,
    MILVUS_PORT,
    MOSEC_EMBEDDING_ENDPOINT,
    MOSEC_EMBEDDING_MODEL,
    TEI_EMBEDDING_ENDPOINT,
    TEI_EMBEDDING_MODEL,
)
from fastapi import File, Form, HTTPException, UploadFile
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import HuggingFaceBgeEmbeddings, HuggingFaceEmbeddings, HuggingFaceHubEmbeddings
from langchain_community.embeddings import HuggingFaceBgeEmbeddings, HuggingFaceHubEmbeddings, OpenAIEmbeddings
from langchain_milvus.vectorstores import Milvus
from langchain_text_splitters import HTMLHeaderTextSplitter
from langsmith import traceable
from pyspark import SparkConf, SparkContext

from comps.cores.mega.micro_service import opea_microservices, register_microservice
from comps.cores.proto.docarray import DocPath
from comps.cores.telemetry.opea_telemetry import opea_telemetry
from comps.dataprep.utils import document_loader, get_tables_result
from comps import DocPath, opea_microservices, register_microservice
from comps.dataprep.utils import document_loader, get_tables_result, parse_html

# current_script_path = os.path.dirname(os.path.abspath(__file__))
# parent_dir = os.path.dirname(current_script_path)
# sys.path.append(parent_dir)
# from utils import document_loader
# workaround notes: cp comps/dataprep/utils.py ./milvus/utils.py
# from utils import document_loader, get_tables_result, parse_html
index_params = {"index_type": "FLAT", "metric_type": "IP", "params": {}}


@register_microservice(
    name="opea_service@prepare_doc_milvus",
    endpoint="/v1/dataprep",
    host="0.0.0.0",
    port=6010,
    input_datatype=DocPath,
    output_datatype=None,
)
# @opea_telemetry
def ingest_documents(doc_path: DocPath):
class MosecEmbeddings(OpenAIEmbeddings):
    def _get_len_safe_embeddings(
        self, texts: List[str], *, engine: str, chunk_size: Optional[int] = None
    ) -> List[List[float]]:
        _chunk_size = chunk_size or self.chunk_size
        batched_embeddings: List[List[float]] = []
        response = self.client.create(input=texts, **self._invocation_params)
        if not isinstance(response, dict):
            response = response.model_dump()
        batched_embeddings.extend(r["embedding"] for r in response["data"])

        _cached_empty_embedding: Optional[List[float]] = None

        def empty_embedding() -> List[float]:
            nonlocal _cached_empty_embedding
            if _cached_empty_embedding is None:
                average_embedded = self.client.create(input="", **self._invocation_params)
                if not isinstance(average_embedded, dict):
                    average_embedded = average_embedded.model_dump()
                _cached_empty_embedding = average_embedded["data"][0]["embedding"]
            return _cached_empty_embedding

        return [e if e is not None else empty_embedding() for e in batched_embeddings]


async def save_file_to_local_disk(save_path: str, file):
    save_path = Path(save_path)
    with save_path.open("wb") as fout:
        try:
            content = await file.read()
            fout.write(content)
        except Exception as e:
            print(f"Write file failed. Exception: {e}")
            raise HTTPException(status_code=500, detail=f"Write file {save_path} failed. Exception: {e}")


def ingest_data_to_milvus(doc_path: DocPath):
    """Ingest document to Milvus."""
    path = doc_path.path
    print(f"Parsing document {path}.")
@@ -53,29 +91,147 @@ def ingest_documents(doc_path: DocPath):
        table_chunks = get_tables_result(path, doc_path.table_strategy)
        chunks = chunks + table_chunks
    print("Done preprocessing. Created ", len(chunks), " chunks of the original pdf")

    # Create vectorstore
    if EMBEDDING_ENDPOINT:
    if MOSEC_EMBEDDING_ENDPOINT:
        # create embeddings using MOSEC endpoint service
        print(f"MOSEC_EMBEDDING_ENDPOINT:{MOSEC_EMBEDDING_ENDPOINT},MOSEC_EMBEDDING_MODEL:{MOSEC_EMBEDDING_MODEL}")
        embedder = MosecEmbeddings(model=MOSEC_EMBEDDING_MODEL)
    elif TEI_EMBEDDING_ENDPOINT:
        # create embeddings using TEI endpoint service
        embedder = HuggingFaceHubEmbeddings(model=EMBEDDING_ENDPOINT)
        print(f"TEI_EMBEDDING_ENDPOINT:{TEI_EMBEDDING_ENDPOINT}")
        embedder = HuggingFaceHubEmbeddings(model=TEI_EMBEDDING_ENDPOINT)
    else:
        # create embeddings using local embedding model
        embedder = HuggingFaceBgeEmbeddings(model_name=EMBED_MODEL)
        print(f"Local TEI_EMBEDDING_MODEL:{TEI_EMBEDDING_MODEL}")
        embedder = HuggingFaceBgeEmbeddings(model_name=TEI_EMBEDDING_MODEL)

    # Batch size
    batch_size = 32
    num_chunks = len(chunks)
    for i in range(0, num_chunks, batch_size):
        batch_chunks = chunks[i : i + batch_size]
        batch_texts = batch_chunks

        _ = Milvus.from_texts(
            texts=batch_texts,
            embedding=embedder,
            collection_name=COLLECTION_NAME,
            connection_args={"host": MILVUS_HOST, "port": MILVUS_PORT},
            index_params=index_params,
        )
        print(f"Processed batch {i//batch_size + 1}/{(num_chunks-1)//batch_size + 1}")

    return True


def ingest_link_to_milvus(link_list: List[str]):
    data_collection = parse_html(link_list)

    texts = []
    metadatas = []
    for data, meta in data_collection:
        doc_id = str(uuid.uuid4())
        metadata = {"source": meta, "identify_id": doc_id}
        texts.append(data)
        metadatas.append(metadata)

    # Create vectorstore
    if MOSEC_EMBEDDING_ENDPOINT:
        # create embeddings using MOSEC endpoint service
        print(f"MOSEC_EMBEDDING_ENDPOINT:{MOSEC_EMBEDDING_ENDPOINT},MOSEC_EMBEDDING_MODEL:{MOSEC_EMBEDDING_MODEL}")
        embedder = MosecEmbeddings(model=MOSEC_EMBEDDING_MODEL)
    elif TEI_EMBEDDING_ENDPOINT:
        # create embeddings using TEI endpoint service
        print(f"TEI_EMBEDDING_ENDPOINT:{TEI_EMBEDDING_ENDPOINT}")
        embedder = HuggingFaceHubEmbeddings(model=TEI_EMBEDDING_ENDPOINT)
    else:
        # create embeddings using local embedding model
        print(f"Local TEI_EMBEDDING_MODEL:{TEI_EMBEDDING_MODEL}")
        embedder = HuggingFaceBgeEmbeddings(model_name=TEI_EMBEDDING_MODEL)

    _ = Milvus.from_texts(
        texts=texts,
        metadatas=metadatas,
        embedding=embedder,
        collection_name=COLLECTION_NAME,
        connection_args={"host": MILVUS_HOST, "port": MILVUS_PORT},
        index_params=index_params,
    )


@register_microservice(name="opea_service@prepare_doc_milvus", endpoint="/v1/dataprep", host="0.0.0.0", port=6010)
@traceable(run_type="tool")
async def ingest_documents(
    files: Optional[Union[UploadFile, List[UploadFile]]] = File(None),
    link_list: Optional[str] = Form(None),
    chunk_size: int = Form(1500),
    chunk_overlap: int = Form(100),
    process_table: bool = Form(False),
    table_strategy: str = Form("fast"),
):
    print(f"files:{files}")
    print(f"link_list:{link_list}")
    if files and link_list:
        raise HTTPException(status_code=400, detail="Provide either a file or a string list, not both.")

    if files:
        if not isinstance(files, list):
            files = [files]
        upload_folder = "./uploaded_files/"
        if not os.path.exists(upload_folder):
            Path(upload_folder).mkdir(parents=True, exist_ok=True)
        uploaded_files = []
        for file in files:
            save_path = upload_folder + file.filename
            await save_file_to_local_disk(save_path, file)
            ingest_data_to_milvus(
                DocPath(
                    path=save_path,
                    chunk_size=chunk_size,
                    chunk_overlap=chunk_overlap,
                    process_table=process_table,
                    table_strategy=table_strategy,
                )
            )
            uploaded_files.append(save_path)
            print(f"Successfully saved file {save_path}")

        def process_files_wrapper(files):
            if not isinstance(files, list):
                files = [files]
            for file in files:
                ingest_data_to_milvus(DocPath(path=file, chunk_size=chunk_size, chunk_overlap=chunk_overlap))

        # Keep a handle so the context can be stopped even if creation fails midway
        sc = None
        try:
            # Create a SparkContext
            conf = SparkConf().setAppName("Parallel-dataprep").setMaster("local[*]")
            sc = SparkContext(conf=conf)
            # Create an RDD with parallel processing
            parallel_num = min(len(uploaded_files), os.cpu_count())
            rdd = sc.parallelize(uploaded_files, parallel_num)
            # Perform a parallel operation
            rdd_trans = rdd.map(process_files_wrapper)
            rdd_trans.collect()
        except Exception as e:
            # Log the failure instead of silently swallowing it
            print(f"Parallel data preparation failed. Exception: {e}")
        finally:
            # Stop the SparkContext only if it was actually created
            if sc is not None:
                sc.stop()
        return {"status": 200, "message": "Data preparation succeeded"}

    if link_list:
        try:
            link_list = json.loads(link_list)  # Parse JSON string to list
            if not isinstance(link_list, list):
                raise HTTPException(status_code=400, detail="link_list should be a list.")
            ingest_link_to_milvus(link_list)
            print(f"Successfully saved link list {link_list}")
            return {"status": 200, "message": "Data preparation succeeded"}
        except json.JSONDecodeError:
            raise HTTPException(status_code=400, detail="Invalid JSON format for link_list.")

    raise HTTPException(status_code=400, detail="Must provide either a file or a string list.")


if __name__ == "__main__":
    opea_microservices["opea_service@prepare_doc_milvus"].start()
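
`ingest_data_to_milvus` in this file inserts chunks in batches of 32 and prints progress as `i//batch_size + 1` out of `(num_chunks-1)//batch_size + 1`. The sketch below isolates only that slicing and numbering so the arithmetic can be checked by itself. The generator `iter_batches` is a hypothetical helper that does not exist in the commit, and it makes no Milvus calls.

```python
from typing import Iterator, List, Sequence, Tuple


def iter_batches(items: Sequence[str], batch_size: int = 32) -> Iterator[Tuple[int, int, List[str]]]:
    """Yield (batch_number, total_batches, batch) using the same arithmetic as the service."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    num_items = len(items)
    # Ceiling division written the same way as in the progress message
    total_batches = (num_items - 1) // batch_size + 1 if num_items else 0
    for i in range(0, num_items, batch_size):
        yield i // batch_size + 1, total_batches, list(items[i : i + batch_size])


if __name__ == "__main__":
    chunks = [f"chunk-{n}" for n in range(70)]
    for number, total, batch in iter_batches(chunks):
        print(f"Processed batch {number}/{total} ({len(batch)} chunks)")
```

With 70 chunks this gives two full batches of 32 and a final batch of 6, so the last progress line reads 3/3.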