Skip to content

Commit

Permalink
1. doc_manager only sets the deleting status. The document will be deleted in the worker
Browse files Browse the repository at this point in the history
   thread
2. Add delete_obsolete_files to DocListManager; it finds files with
   status==deleting and reference_count==0 and deletes them
3. Add reference_count: when a document is added to a kbgroup, its
   reference_count is incremented by 1; when it is deleted from a kbgroup,
   it is decremented by 1
4. Add KBOperationLogs; currently only file-deletion operations are
   recorded
5. Modify the worker thread: when the group is the default group, it checks for
   obsolete files to delete
  • Loading branch information
zhangyongchao committed Dec 19, 2024
1 parent d932ad2 commit 7640161
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 161 deletions.
13 changes: 4 additions & 9 deletions lazyllm/tools/rag/doc_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from .smart_embedding_index import SmartEmbeddingIndex
from .doc_node import DocNode
from .data_loaders import DirectoryReader
from .utils import DocListManager, gen_docid_wo_dlm
from .utils import DocListManager, gen_docid
from .global_metadata import GlobalMetadataDesc, RAG_DOC_ID, RAG_DOC_PATH
import threading
import time
Expand Down Expand Up @@ -83,10 +83,7 @@ def _lazy_init(self) -> None:
root_nodes = self._reader.load_data(paths)
for idx, node in enumerate(root_nodes):
node.global_metadata.update(metadatas[idx].copy() if metadatas else {})
if self._dlm:
node.global_metadata[RAG_DOC_ID] = ids[idx] if ids else self._dlm.get_active_docid(paths[idx])
else:
node.global_metadata[RAG_DOC_ID] = ids[idx] if ids else gen_docid_wo_dlm(paths[idx])
node.global_metadata[RAG_DOC_ID] = ids[idx] if ids else gen_docid(paths[idx])
node.global_metadata[RAG_DOC_PATH] = paths[idx]
self.store.update_nodes(root_nodes)
if self._dlm: self._dlm.update_kb_group_file_status(
Expand Down Expand Up @@ -222,6 +219,7 @@ def worker(self):

if self._kb_group_name == DocListManager.DEFAULT_GROUP_NAME:
self._dlm.init_tables()
self._dlm.delete_obsolete_files()
ids, files, metadatas = self._list_files(status=DocListManager.Status.waiting,
upload_status=DocListManager.Status.success)
if files:
Expand Down Expand Up @@ -252,10 +250,7 @@ def _add_files(self, input_files: List[str], ids: Optional[List[str]] = None,
root_nodes = self._reader.load_data(input_files)
for idx, node in enumerate(root_nodes):
node.global_metadata = metadatas[idx].copy() if metadatas else {}
if self._dlm:
node.global_metadata[RAG_DOC_ID] = ids[idx] if ids else self._dlm.get_active_docid(input_files[idx])
else:
node.global_metadata[RAG_DOC_ID] = ids[idx] if ids else gen_docid_wo_dlm(input_files[idx])
node.global_metadata[RAG_DOC_ID] = ids[idx] if ids else gen_docid(input_files[idx])
node.global_metadata[RAG_DOC_PATH] = input_files[idx]
temp_store = self._create_store({"type": "map"})
temp_store.update_nodes(root_nodes)
Expand Down
40 changes: 11 additions & 29 deletions lazyllm/tools/rag/doc_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import lazyllm
from lazyllm import FastapiApp as app
from .utils import DocListManager, BaseResponse
from .doc_impl import gen_docid
from .global_metadata import RAG_DOC_ID, RAG_DOC_PATH


Expand Down Expand Up @@ -73,11 +74,8 @@ def upload_files(self, files: List[UploadFile], override: bool = False, # noqa
file_paths = [os.path.join(self._manager._path, user_path or '', file.filename) for file in files]
file_paths = [gen_unique_filepaths(ele) for ele in file_paths]
print("FILE PATHS", file_paths)
ids, file_paths = self._manager.add_files(
file_paths, metadatas=metadatas, status=DocListManager.Status.working
)
print("AFTER self._manager.add_files")
assert len(files) == len(ids), "len(files) uploaded vs len(ids) recorede"
ids = self._manager.add_files(file_paths, metadatas=metadatas, status=DocListManager.Status.working)
assert len(files) == len(ids), "len(files) uploaded vs len(ids) recored"
results = []
for file, path in zip(files, file_paths):
if os.path.exists(path):
Expand All @@ -96,9 +94,7 @@ def upload_files(self, files: List[UploadFile], override: bool = False, # noqa
except Exception as e:
lazyllm.LOG.error(f'writing file [{path}] to disk failed: [{e}]')
raise e
print("in LOOP, path", path)
file_id = self._manager.get_active_docid(path)
print("after get_active_docid, file_id:", file_id)
file_id = gen_docid(path)
self._manager.update_file_status([file_id], status=DocListManager.Status.success)
results.append('Success')

Expand Down Expand Up @@ -129,7 +125,8 @@ def add_files(self, files: List[str] = Body(...),
exist_id = exists_files_info.get(file, None)
if exist_id:
update_kws = dict(fileid=exist_id, status=DocListManager.Status.success)
if metadatas: update_kws["metadata"] = json.dumps(metadatas[idx])
if metadatas:
update_kws["meta"] = json.dumps(metadatas[idx])
self._manager.update_file_message(**update_kws)
exist_ids.append(exist_id)
id_mapping[file] = exist_id
Expand All @@ -140,9 +137,7 @@ def add_files(self, files: List[str] = Body(...),
else:
id_mapping[file] = None

new_ids, _ = self._manager.add_files(
new_files, metadatas=new_metadatas, status=DocListManager.Status.success
)
new_ids = self._manager.add_files(new_files, metadatas=new_metadatas, status=DocListManager.Status.success)
if group_name:
self._manager.add_files_to_kb_group(new_ids + exist_ids, group=group_name)

Expand Down Expand Up @@ -204,24 +199,11 @@ def delete_files(self, request: FileGroupRequest):
if request.group_name:
return self.delete_files_from_group(request)
else:
safe_delete_ids = set(self._manager.get_safe_delete_files())
print("Input file ids:", request.file_ids, ", safe delete ids:", safe_delete_ids)
tmp = request.file_ids
request.file_ids = []
for file_id in tmp:
if file_id in safe_delete_ids:
request.file_ids.append(file_id)
print("New fildids after safe delete:", request.file_ids)
self._manager.update_kb_group_file_status(
file_ids=request.file_ids, status=DocListManager.Status.deleting)
docs = self._manager.update_file_status(file_ids=request.file_ids, status=DocListManager.Status.deleting)

for doc in docs:
print("DELETE doc:", doc)
if os.path.exists(path := doc[1]):
document_list = self._manager.delete_files(request.file_ids)
for doc in document_list:
print("DELETE doc:", doc.path)
if os.path.exists(path := doc.path):
os.remove(path)

self._manager.update_file_status(file_ids=request.file_ids, status=DocListManager.Status.deleted)
return BaseResponse()
except Exception as e:
return BaseResponse(code=500, msg=str(e), data=None)
Expand Down
Loading

0 comments on commit 7640161

Please sign in to comment.