Fix Dataprep Potential Error in get_file (#540)
* fix get file error & refine logs

Signed-off-by: letonghan <[email protected]>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

---------

Signed-off-by: letonghan <[email protected]>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
letonghan and pre-commit-ci[bot] authored Aug 21, 2024
1 parent 945b9e4 commit 04ff8bf
Showing 1 changed file with 39 additions and 26 deletions.
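The substantive change guards the get-file path: rag_get_file_structure previously ran FT.SEARCH against KEY_INDEX_NAME unconditionally, so a freshly started service with no ingested documents hit RediSearch's "no such index" error and answered with a 500. The patch returns an empty file list when the index is absent, and it also normalizes every log message under a per-endpoint tag ([ ingest data ], [ upload ], [ get ], [ delete ]). The guard relies on the file's existing check_index_existance helper, which is outside this diff; a minimal sketch of one plausible implementation with redis-py, assuming FT.INFO as the probe and an illustrative index name:

import redis
from redis.exceptions import ResponseError


def check_index_existance(search_client) -> bool:
    """Sketch only: FT.INFO raises ResponseError ("Unknown Index name")
    when the index has not been created yet, so treat that as absent."""
    try:
        search_client.info()  # issues FT.INFO <index-name>
        return True
    except ResponseError:
        return False


r = redis.Redis(host="localhost", port=6379)
print(check_index_existance(r.ft("my-index")))  # False until the index exists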
comps/dataprep/redis/langchain/prepare_doc_redis.py
@@ -177,7 +177,7 @@ def ingest_data_to_redis(doc_path: DocPath):
     """Ingest document to Redis."""
     path = doc_path.path
     if logflag:
-        logger.info(f"Parsing document {path}.")
+        logger.info(f"[ ingest data ] Parsing document {path}.")
 
     if path.endswith(".html"):
         headers_to_split_on = [
@@ -195,13 +195,15 @@ def ingest_data_to_redis(doc_path: DocPath):
         )
 
     content = document_loader(path)
+    if logflag:
+        logger.info("[ ingest data ] file content loaded")
 
     chunks = text_splitter.split_text(content)
     if doc_path.process_table and path.endswith(".pdf"):
         table_chunks = get_tables_result(path, doc_path.table_strategy)
         chunks = chunks + table_chunks
     if logflag:
-        logger.info("Done preprocessing. Created ", len(chunks), " chunks of the original pdf")
+        logger.info(f"[ ingest data ] Done preprocessing. Created {len(chunks)} chunks of the original pdf")
 
     file_name = doc_path.path.split("/")[-1]
     return ingest_chunks_to_redis(file_name, chunks)
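The second change in this hunk is a real bug fix, not just a tag cleanup: logging methods are not print-style, so the comma-separated extra arguments in the removed line are treated as %-format parameters. The message contains no placeholders, so the record fails to format when emitted and the log line is lost. A minimal repro of the difference:

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Print-style commas do not concatenate: the extra args become %-format
# parameters, and with no placeholders emitting prints "--- Logging error ---".
logger.info("Done preprocessing. Created ", 42, " chunks of the original pdf")

# Either lazy %-formatting or the f-string the patch adopts works:
logger.info("Done preprocessing. Created %d chunks of the original pdf", 42)
logger.info(f"Done preprocessing. Created {42} chunks of the original pdf")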
@@ -217,8 +219,8 @@ async def ingest_documents(
     table_strategy: str = Form("fast"),
 ):
     if logflag:
-        logger.info(f"files:{files}")
-        logger.info(f"link_list:{link_list}")
+        logger.info(f"[ upload ] files:{files}")
+        logger.info(f"[ upload ] link_list:{link_list}")
 
     r = redis.Redis(connection_pool=redis_pool)
     client = r.ft(KEY_INDEX_NAME)
@@ -231,15 +233,17 @@ async def ingest_documents(
         for file in files:
             encode_file = encode_filename(file.filename)
             doc_id = "file:" + encode_file
+            if logflag:
+                logger.info(f"[ upload ] processing file {doc_id}")
 
             # check whether the file already exists
             key_ids = None
             try:
                 key_ids = search_by_id(client, doc_id).key_ids
                 if logflag:
-                    logger.info(f"[ upload file ] File {file.filename} already exists.")
+                    logger.info(f"[ upload ] File {file.filename} already exists.")
             except Exception as e:
-                logger.info(f"[ upload file ] File {file.filename} does not exist.")
+                logger.info(f"[ upload ] File {file.filename} does not exist.")
             if key_ids:
                 raise HTTPException(
                     status_code=400, detail=f"Uploaded file {file.filename} already exists. Please change file name."
@@ -258,7 +262,7 @@ async def ingest_documents(
             )
             uploaded_files.append(save_path)
             if logflag:
-                logger.info(f"Successfully saved file {save_path}")
+                logger.info(f"[ upload ] Successfully saved file {save_path}")
 
         # def process_files_wrapper(files):
         #     if not isinstance(files, list):
@@ -293,15 +297,17 @@ async def ingest_documents(
         for link in link_list:
             encoded_link = encode_filename(link)
             doc_id = "file:" + encoded_link + ".txt"
+            if logflag:
+                logger.info(f"[ upload ] processing link {doc_id}")
 
             # check whether the link file already exists
             key_ids = None
             try:
                 key_ids = search_by_id(client, doc_id).key_ids
                 if logflag:
-                    logger.info(f"[ upload file ] Link {link} already exists.")
+                    logger.info(f"[ upload ] Link {link} already exists.")
             except Exception as e:
-                logger.info(f"[ upload file ] Link {link} does not exist. Keep storing.")
+                logger.info(f"[ upload ] Link {link} does not exist. Keep storing.")
             if key_ids:
                 raise HTTPException(
                     status_code=400, detail=f"Uploaded link {link} already exists. Please change another link."
@@ -320,8 +326,7 @@ async def ingest_documents(
                 )
             )
         if logflag:
-            logger.info(f"Successfully saved link list {link_list}")
-            logger.info({"status": 200, "message": "Data preparation succeeded"})
+            logger.info(f"[ upload ] Successfully saved link list {link_list}")
         return {"status": 200, "message": "Data preparation succeeded"}
 
     raise HTTPException(status_code=400, detail="Must provide either a file or a string list.")
@@ -332,12 +337,20 @@ async def ingest_documents(
 )
 async def rag_get_file_structure():
     if logflag:
-        logger.info("[ dataprep - get file ] start to get file structure")
+        logger.info("[ get ] start to get file structure")
 
     # define redis client
     r = redis.Redis(connection_pool=redis_pool)
     offset = 0
     file_list = []
 
+    # check index existence
+    res = check_index_existance(r.ft(KEY_INDEX_NAME))
+    if not res:
+        if logflag:
+            logger.info(f"[ get ] index {KEY_INDEX_NAME} does not exist")
+        return file_list
+
     while True:
         response = r.execute_command("FT.SEARCH", KEY_INDEX_NAME, "*", "LIMIT", offset, offset + SEARCH_BATCH_SIZE)
         # no doc retrieved
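This hunk is the fix named in the commit title: FT.SEARCH errors out (rather than returning an empty result) when KEY_INDEX_NAME does not exist, which is exactly the state of a fresh deployment with no uploads, so listing files used to fail with a 500. The early return turns that case into an empty list. For reference, a raw FT.SEARCH reply is laid out as [total, key1, fields1, key2, fields2, ...], which is why the next hunk counts retrieved documents as (len(response) - 1) // 2. A standalone sketch of the same pagination pattern, assuming a local client and illustrative constants (note that LIMIT takes an offset and a count):

import redis

KEY_INDEX_NAME = "my-index"  # illustrative; the service takes this from its config
SEARCH_BATCH_SIZE = 10

r = redis.Redis(host="localhost", port=6379)

offset = 0
file_list = []
while True:
    # Reply without NOCONTENT: [total, key1, fields1, key2, fields2, ...]
    response = r.execute_command("FT.SEARCH", KEY_INDEX_NAME, "*", "LIMIT", offset, SEARCH_BATCH_SIZE)
    num_docs = (len(response) - 1) // 2
    if num_docs == 0:  # no doc retrieved
        break
    file_list.extend(response[1::2])  # the matching keys sit at odd positions
    offset += SEARCH_BATCH_SIZE
    if num_docs < SEARCH_BATCH_SIZE:  # short page: nothing left to fetch
        break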
@@ -349,7 +362,7 @@ async def rag_get_file_structure():
         if (len(response) - 1) // 2 < SEARCH_BATCH_SIZE:
             break
     if logflag:
-        logger.info(file_list)
+        logger.info(f"[get] final file_list: {file_list}")
     return file_list
 
@@ -372,49 +385,49 @@ async def delete_single_file(file_path: str = Body(..., embed=True)):
     # delete all uploaded files
     if file_path == "all":
         if logflag:
-            logger.info("[dataprep - del] delete all files")
+            logger.info("[ delete ] delete all files")
 
         # drop index KEY_INDEX_NAME
         if check_index_existance(client):
             try:
                 assert drop_index(index_name=KEY_INDEX_NAME)
             except Exception as e:
                 if logflag:
-                    logger.info(f"[dataprep - del] {e}. Fail to drop index {KEY_INDEX_NAME}.")
+                    logger.info(f"[ delete ] {e}. Fail to drop index {KEY_INDEX_NAME}.")
                 raise HTTPException(status_code=500, detail=f"Fail to drop index {KEY_INDEX_NAME}.")
         else:
-            logger.info(f"[dataprep - del] Index {KEY_INDEX_NAME} does not exits.")
+            logger.info(f"[ delete ] Index {KEY_INDEX_NAME} does not exits.")
 
         # drop index INDEX_NAME
         if check_index_existance(client2):
             try:
                 assert drop_index(index_name=INDEX_NAME)
             except Exception as e:
                 if logflag:
-                    logger.info(f"[dataprep - del] {e}. Fail to drop index {INDEX_NAME}.")
+                    logger.info(f"[ delete ] {e}. Fail to drop index {INDEX_NAME}.")
                 raise HTTPException(status_code=500, detail=f"Fail to drop index {INDEX_NAME}.")
         else:
             if logflag:
-                logger.info(f"[dataprep - del] Index {INDEX_NAME} does not exits.")
+                logger.info(f"[ delete ] Index {INDEX_NAME} does not exits.")
 
         # delete files on local disk
         try:
             remove_folder_with_ignore(upload_folder)
         except Exception as e:
             if logflag:
-                logger.info(f"[dataprep - del] {e}. Fail to delete {upload_folder}.")
+                logger.info(f"[ delete ] {e}. Fail to delete {upload_folder}.")
             raise HTTPException(status_code=500, detail=f"Fail to delete {upload_folder}.")
 
         if logflag:
-            logger.info("[dataprep - del] successfully delete all files.")
+            logger.info("[ delete ] successfully delete all files.")
         create_upload_folder(upload_folder)
         if logflag:
             logger.info({"status": True})
         return {"status": True}
 
     delete_path = Path(upload_folder + "/" + encode_filename(file_path))
     if logflag:
-        logger.info(f"[dataprep - del] delete_path: {delete_path}")
+        logger.info(f"[ delete ] delete_path: {delete_path}")
 
     # partially delete files
     if delete_path.exists():
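Both index drops above go through the file's drop_index helper, which is also outside this diff. One plausible shape for it, assuming redis-py's FT.DROPINDEX wrapper and an illustrative connection URL:

import redis

REDIS_URL = "redis://localhost:6379"  # illustrative connection string


def drop_index(index_name: str) -> bool:
    """Sketch only: drop a RediSearch index, True on success."""
    client = redis.Redis.from_url(REDIS_URL).ft(index_name)
    try:
        # delete_documents=True removes the indexed hashes as well
        client.dropindex(delete_documents=True)
        return True
    except Exception:
        return False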
@@ -425,7 +438,7 @@ async def delete_single_file(file_path: str = Body(..., embed=True)):
                 key_ids = search_by_id(client, doc_id).key_ids
             except Exception as e:
                 if logflag:
-                    logger.info(f"[dataprep - del] {e}, File {file_path} does not exists.")
+                    logger.info(f"[ delete ] {e}, File {file_path} does not exists.")
                 raise HTTPException(
                     status_code=404, detail=f"File not found in db {KEY_INDEX_NAME}. Please check file_path."
                 )
@@ -438,7 +451,7 @@ async def delete_single_file(file_path: str = Body(..., embed=True)):
                 assert delete_by_id(client, doc_id)
             except Exception as e:
                 if logflag:
-                    logger.info(f"[dataprep - del] {e}. File {file_path} delete failed for db {KEY_INDEX_NAME}.")
+                    logger.info(f"[ delete ] {e}. File {file_path} delete failed for db {KEY_INDEX_NAME}.")
                 raise HTTPException(status_code=500, detail=f"File {file_path} delete failed.")
 
             # delete file content in db INDEX_NAME
@@ -448,7 +461,7 @@ async def delete_single_file(file_path: str = Body(..., embed=True)):
                 content = search_by_id(client2, file_id).content
             except Exception as e:
                 if logflag:
-                    logger.info(f"[dataprep - del] {e}. File {file_path} does not exists.")
+                    logger.info(f"[ delete ] {e}. File {file_path} does not exists.")
                 raise HTTPException(
                     status_code=404, detail=f"File not found in db {INDEX_NAME}. Please check file_path."
                 )
@@ -458,7 +471,7 @@ async def delete_single_file(file_path: str = Body(..., embed=True)):
                 assert delete_by_id(client2, file_id)
             except Exception as e:
                 if logflag:
-                    logger.info(f"[dataprep - del] {e}. File {file_path} delete failed for db {INDEX_NAME}")
+                    logger.info(f"[ delete ] {e}. File {file_path} delete failed for db {INDEX_NAME}")
                 raise HTTPException(status_code=500, detail=f"File {file_path} delete failed.")
 
# delete file on local disk
Expand All @@ -470,7 +483,7 @@ async def delete_single_file(file_path: str = Body(..., embed=True)):
# delete folder
else:
if logflag:
logger.info(f"[dataprep - del] Delete folder {file_path} is not supported for now.")
logger.info(f"[ delete ] Delete folder {file_path} is not supported for now.")
raise HTTPException(status_code=404, detail=f"Delete folder {file_path} is not supported for now.")
else:
raise HTTPException(status_code=404, detail=f"File {file_path} not found. Please check file_path.")
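With the guard in place, the get-file endpoint on a fresh service should answer with an empty list instead of an internal error. A quick smoke test, assuming the dataprep service listens locally on port 6007 and serves rag_get_file_structure at /v1/dataprep/get_file (both illustrative; adjust to your deployment):

import requests

url = "http://localhost:6007/v1/dataprep/get_file"  # assumed host, port, and route

resp = requests.post(url)
print(resp.status_code)  # expected: 200 even before any document is ingested
print(resp.json())       # expected: [] while the Redis index does not exist yet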
