Commit
Support delete for Milvus vector db in Dataprep (#368)
* support delete for milvus

Signed-off-by: letonghan <[email protected]>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

---------

Signed-off-by: letonghan <[email protected]>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
letonghan and pre-commit-ci[bot] authored Jul 31, 2024
1 parent 55b457b commit 767a14c
Showing 3 changed files with 300 additions and 64 deletions.
233 changes: 170 additions & 63 deletions comps/dataprep/milvus/prepare_doc_milvus.py
@@ -16,20 +16,33 @@
TEI_EMBEDDING_ENDPOINT,
TEI_EMBEDDING_MODEL,
)
from fastapi import File, Form, HTTPException, UploadFile
from fastapi import Body, File, Form, HTTPException, UploadFile
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import HuggingFaceBgeEmbeddings, HuggingFaceHubEmbeddings, OpenAIEmbeddings
from langchain_core.documents import Document
from langchain_milvus.vectorstores import Milvus
from langchain_text_splitters import HTMLHeaderTextSplitter
from langsmith import traceable
from pyspark import SparkConf, SparkContext

from comps import DocPath, opea_microservices, register_microservice
from comps.dataprep.utils import document_loader, get_separators, get_tables_result, parse_html
from comps.dataprep.utils import (
create_upload_folder,
document_loader,
encode_filename,
get_file_structure,
get_separators,
get_tables_result,
parse_html,
remove_folder_with_ignore,
save_content_to_local_disk,
)

# workaround notes: cp comps/dataprep/utils.py ./milvus/utils.py
# from utils import document_loader, get_tables_result, parse_html
index_params = {"index_type": "FLAT", "metric_type": "IP", "params": {}}
partition_field_name = "filename"
upload_folder = "./uploaded_files/"


class MosecEmbeddings(OpenAIEmbeddings):
@@ -57,21 +70,11 @@ def empty_embedding() -> List[float]:
return [e if e is not None else empty_embedding() for e in batched_embeddings]


async def save_file_to_local_disk(save_path: str, file):
save_path = Path(save_path)
with save_path.open("wb") as fout:
try:
content = await file.read()
fout.write(content)
except Exception as e:
print(f"Write file failed. Exception: {e}")
raise HTTPException(status_code=500, detail=f"Write file {save_path} failed. Exception: {e}")


def ingest_data_to_milvus(doc_path: DocPath):
"""Ingest document to Milvus."""
path = doc_path.path
print(f"Parsing document {path}.")
file_name = path.split("/")[-1]
print(f"[ ingest data ] Parsing document {path}, file name: {file_name}.")

if path.endswith(".html"):
headers_to_split_on = [
@@ -90,51 +93,47 @@ def ingest_data_to_milvus(doc_path: DocPath):
if doc_path.process_table and path.endswith(".pdf"):
table_chunks = get_tables_result(path, doc_path.table_strategy)
chunks = chunks + table_chunks
print("Done preprocessing. Created ", len(chunks), " chunks of the original pdf")
print("[ ingest data ] Done preprocessing. Created ", len(chunks), " chunks of the original pdf")

# Create vectorstore
if MOSEC_EMBEDDING_ENDPOINT:
# create embeddings using MOSEC endpoint service
print(f"MOSEC_EMBEDDING_ENDPOINT:{MOSEC_EMBEDDING_ENDPOINT},MOSEC_EMBEDDING_MODEL:{MOSEC_EMBEDDING_MODEL}")
print(
f"[ ingest data ] MOSEC_EMBEDDING_ENDPOINT:{MOSEC_EMBEDDING_ENDPOINT}, MOSEC_EMBEDDING_MODEL:{MOSEC_EMBEDDING_MODEL}"
)
embedder = MosecEmbeddings(model=MOSEC_EMBEDDING_MODEL)
elif TEI_EMBEDDING_ENDPOINT:
# create embeddings using TEI endpoint service
print(f"TEI_EMBEDDING_ENDPOINT:{TEI_EMBEDDING_ENDPOINT}")
print(f"[ ingest data ] TEI_EMBEDDING_ENDPOINT:{TEI_EMBEDDING_ENDPOINT}")
embedder = HuggingFaceHubEmbeddings(model=TEI_EMBEDDING_ENDPOINT)
else:
# create embeddings using local embedding model
print(f"Local TEI_EMBEDDING_MODEL:{TEI_EMBEDDING_MODEL}")
print(f"[ ingest data ] Local TEI_EMBEDDING_MODEL:{TEI_EMBEDDING_MODEL}")
embedder = HuggingFaceBgeEmbeddings(model_name=TEI_EMBEDDING_MODEL)

# Batch size
batch_size = 32
num_chunks = len(chunks)
for i in range(0, num_chunks, batch_size):
batch_chunks = chunks[i : i + batch_size]
batch_texts = batch_chunks
_ = Milvus.from_texts(
texts=batch_texts,
embedding=embedder,
# insert documents to Milvus
insert_docs = []
for chunk in chunks:
insert_docs.append(Document(page_content=chunk, metadata={partition_field_name: file_name}))

try:
_ = Milvus.from_documents(
insert_docs,
embedder,
collection_name=COLLECTION_NAME,
connection_args={"host": MILVUS_HOST, "port": MILVUS_PORT},
index_params=index_params,
partition_key_field=partition_field_name,
)
print(f"Processed batch {i//batch_size + 1}/{(num_chunks-1)//batch_size + 1}")

return True
except Exception as e:
print(f"[ ingest data ] fail to ingest data into Milvus. error: {e}")
return False

print(f"[ ingest data ] Docs ingested from {path} to Milvus collection {COLLECTION_NAME}.")

def ingest_link_to_milvus(link_list: List[str]):
data_collection = parse_html(link_list)
return True

texts = []
metadatas = []
for data, meta in data_collection:
doc_id = str(uuid.uuid4())
metadata = {"source": meta, "identify_id": doc_id}
texts.append(data)
metadatas.append(metadata)

async def ingest_link_to_milvus(link_list: List[str]):
# Create vectorstore
if MOSEC_EMBEDDING_ENDPOINT:
# create embeddings using MOSEC endpoint service
Expand All @@ -149,14 +148,22 @@ def ingest_link_to_milvus(link_list: List[str]):
print(f"Local TEI_EMBEDDING_MODEL:{TEI_EMBEDDING_MODEL}")
embedder = HuggingFaceBgeEmbeddings(model_name=TEI_EMBEDDING_MODEL)

_ = Milvus.from_texts(
texts=texts,
metadatas=metadatas,
embedding=embedder,
collection_name=COLLECTION_NAME,
connection_args={"host": MILVUS_HOST, "port": MILVUS_PORT},
index_params=index_params,
)
for link in link_list:
content = parse_html([link])[0][0]
print(f"[ ingest link ] link: {link} content: {content}")
encoded_link = encode_filename(link)
save_path = upload_folder + encoded_link + ".txt"
print(f"[ ingest link ] save_path: {save_path}")
await save_content_to_local_disk(save_path, content)

document = Document(page_content=content, metadata={partition_field_name: encoded_link + ".txt"})
_ = Milvus.from_documents(
document,
embedder,
collection_name=COLLECTION_NAME,
connection_args={"host": MILVUS_HOST, "port": MILVUS_PORT},
partition_key_field=partition_field_name,
)


@register_microservice(name="opea_service@prepare_doc_milvus", endpoint="/v1/dataprep", host="0.0.0.0", port=6010)
@@ -177,30 +184,26 @@ async def ingest_documents(
if files:
if not isinstance(files, list):
files = [files]
upload_folder = "./uploaded_files/"
if not os.path.exists(upload_folder):
Path(upload_folder).mkdir(parents=True, exist_ok=True)
uploaded_files = []
for file in files:
save_path = upload_folder + file.filename
await save_file_to_local_disk(save_path, file)
ingest_data_to_milvus(
DocPath(
path=save_path,
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
process_table=process_table,
table_strategy=table_strategy,
)
)
await save_content_to_local_disk(save_path, file)
uploaded_files.append(save_path)
print(f"Successfully saved file {save_path}")

def process_files_wrapper(files):
if not isinstance(files, list):
files = [files]
for file in files:
ingest_data_to_milvus(DocPath(path=file, chunk_size=chunk_size, chunk_overlap=chunk_overlap))
assert ingest_data_to_milvus(
DocPath(
path=file,
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
process_table=process_table,
table_strategy=table_strategy,
)
)

try:
# Create a SparkContext
@@ -224,7 +227,7 @@
link_list = json.loads(link_list) # Parse JSON string to list
if not isinstance(link_list, list):
raise HTTPException(status_code=400, detail="link_list should be a list.")
ingest_link_to_milvus(link_list)
await ingest_link_to_milvus(link_list)
print(f"Successfully saved link list {link_list}")
return {"status": 200, "message": "Data preparation succeeded"}
except json.JSONDecodeError:
@@ -233,5 +236,109 @@ def process_files_wrapper(files):
raise HTTPException(status_code=400, detail="Must provide either a file or a string list.")


@register_microservice(
name="opea_service@prepare_doc_milvus_file", endpoint="/v1/dataprep/get_file", host="0.0.0.0", port=6011
)
@traceable(run_type="tool")
async def rag_get_file_structure():
print("[ dataprep - get file ] start to get file structure")

if not Path(upload_folder).exists():
print("No file uploaded, return empty list.")
return []

file_content = get_file_structure(upload_folder)
return file_content


def delete_all_data(my_milvus):
print("[ delete ] deleting all data in milvus")
my_milvus.delete(expr="pk >= 0")
my_milvus.col.flush()
print("[ delete ] delete success: all data")


def delete_by_partition_field(my_milvus, partition_field):
print(f"[ delete ] deleting {partition_field_name} {partition_field}")
pks = my_milvus.get_pks(f'{partition_field_name} == "{partition_field}"')
print(f"[ delete ] target pks: {pks}")
res = my_milvus.delete(pks)
my_milvus.col.flush()
print(f"[ delete ] delete success: {res}")


@register_microservice(
name="opea_service@prepare_doc_milvus_del", endpoint="/v1/dataprep/delete_file", host="0.0.0.0", port=6012
)
@traceable(run_type="tool")
async def delete_single_file(file_path: str = Body(..., embed=True)):
"""Delete file according to `file_path`.
`file_path`:
- file/link path (e.g. /path/to/file.txt)
- "all": delete all files uploaded
"""
# create embedder obj
if MOSEC_EMBEDDING_ENDPOINT:
# create embeddings using MOSEC endpoint service
print(
f"[ dataprep - del ] MOSEC_EMBEDDING_ENDPOINT:{MOSEC_EMBEDDING_ENDPOINT},MOSEC_EMBEDDING_MODEL:{MOSEC_EMBEDDING_MODEL}"
)
embedder = MosecEmbeddings(model=MOSEC_EMBEDDING_MODEL)
elif TEI_EMBEDDING_ENDPOINT:
# create embeddings using TEI endpoint service
print(f"[ dataprep - del ] TEI_EMBEDDING_ENDPOINT:{TEI_EMBEDDING_ENDPOINT}")
embedder = HuggingFaceHubEmbeddings(model=TEI_EMBEDDING_ENDPOINT)
else:
# create embeddings using local embedding model
print(f"[ dataprep - del ] Local TEI_EMBEDDING_MODEL:{TEI_EMBEDDING_MODEL}")
embedder = HuggingFaceBgeEmbeddings(model_name=TEI_EMBEDDING_MODEL)

# define Milvus obj
my_milvus = Milvus(
embedding_function=embedder,
collection_name=COLLECTION_NAME,
connection_args={"host": MILVUS_HOST, "port": MILVUS_PORT},
index_params=index_params,
auto_id=True,
)

# delete all uploaded files
if file_path == "all":
print("[ dataprep - del ] deleting all files")
delete_all_data(my_milvus)
remove_folder_with_ignore(upload_folder)
print("[ dataprep - del ] successfully delete all files.")
create_upload_folder(upload_folder)
return {"status": True}

encode_file_name = encode_filename(file_path)
delete_path = Path(upload_folder + "/" + encode_file_name)
print(f"[dataprep - del] delete_path: {delete_path}")

# partially delete files
if delete_path.exists():
# file
if delete_path.is_file():
print(f"[dataprep - del] deleting file {encode_file_name}")
try:
delete_by_partition_field(my_milvus, encode_file_name)
delete_path.unlink()
print(f"[dataprep - del] file {encode_file_name} deleted")
return {"status": True}
except Exception as e:
print(f"[dataprep - del] fail to delete file {delete_path}: {e}")
return {"status": False}
# folder
else:
print("[dataprep - del] delete folder is not supported for now.")
return {"status": False}
else:
raise HTTPException(status_code=404, detail="File/folder not found. Please check del_path.")


if __name__ == "__main__":
create_upload_folder(upload_folder)
opea_microservices["opea_service@prepare_doc_milvus"].start()
opea_microservices["opea_service@prepare_doc_milvus_file"].start()
opea_microservices["opea_service@prepare_doc_milvus_del"].start()