Skip to content

Commit

Permalink
fix doc-manager add file bugs (#329)
Browse files Browse the repository at this point in the history
  • Loading branch information
wzh1994 authored Oct 30, 2024
1 parent c9d7b3d commit 549db38
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 24 deletions.
4 changes: 2 additions & 2 deletions lazyllm/tools/rag/doc_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,8 @@ def worker(self):
continue
time.sleep(10)

def _list_files(self, status: str = DocListManager.Status.all, upload_status: str = DocListManager.Status.all
) -> Tuple[List[str], List[str]]:
def _list_files(self, status: Union[str, List[str]] = DocListManager.Status.all,
upload_status: Union[str, List[str]] = DocListManager.Status.all) -> Tuple[List[str], List[str]]:
if self._doc_files: return None, self._doc_files
ids, paths = [], []
for row in self._dlm.list_kb_group_files(group=self._kb_group_name, status=status,
Expand Down
8 changes: 7 additions & 1 deletion lazyllm/tools/rag/doc_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,13 @@ def delete_files(self, request: FileGroupRequest):
else:
self._manager.update_kb_group_file_status(
file_ids=request.file_ids, status=DocListManager.Status.deleting)
self._manager.update_file_status(file_ids=request.file_ids, status=DocListManager.Status.deleting)
docs = self._manager.update_file_status(file_ids=request.file_ids, status=DocListManager.Status.deleting)

for doc in docs:
if os.path.exists(path := doc[1]):
os.remove(path)

self._manager.update_file_status(file_ids=request.file_ids, status=DocListManager.Status.deleted)
return BaseResponse()
except Exception as e:
return BaseResponse(code=500, msg=str(e), data=None)
Expand Down
2 changes: 1 addition & 1 deletion lazyllm/tools/rag/document.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def __init__(self, dataset_path: str, embed: Optional[Union[Callable, Dict[str,
self._submodules.append(embed)
self._dlm = DocListManager(dataset_path, name).init_tables()
self._kbs = {DocListManager.DEDAULT_GROUP_NAME: DocImpl(embed=self._embed, dlm=self._dlm)}
if manager: self._manager = DocManager(self._dlm)
if manager: self._manager = ServerModule(DocManager(self._dlm))
if server: self._doc = ServerModule(self._doc)

def add_kb_group(self, name):
Expand Down
53 changes: 34 additions & 19 deletions lazyllm/tools/rag/utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os
import shutil
import hashlib
from typing import List, Callable, Generator, Dict, Any, Optional, Union
from typing import List, Callable, Generator, Dict, Any, Optional, Union, Tuple
from abc import ABC, abstractmethod

import pydantic
Expand Down Expand Up @@ -29,7 +29,7 @@ class Status:
success = 'success'
failed = 'failed'
deleting = 'deleting'
deleting = 'deleted'
deleted = 'deleted'

def __init__(self, path, name):
self._path = path
Expand All @@ -52,13 +52,12 @@ def init_tables(self) -> 'DocListManager':
for root, _, files in os.walk(self._path):
files = [os.path.join(root, file_path) for file_path in files]
files_list.extend(files)
ids = self.add_files(files_list, status=DocListManager.Status.success)
self.add_files_to_kb_group(ids, group=DocListManager.DEDAULT_GROUP_NAME)
self.add_files(files_list, status=DocListManager.Status.success)
return self

def delete_files(self, file_ids: List[str]):
def delete_files(self, file_ids: List[str], real: bool = False):
self.update_kb_group_file_status(file_ids=file_ids, status=DocListManager.Status.deleting)
self._delete_files(file_ids)
self._delete_files(file_ids, real)

@abstractmethod
def table_inited(self): pass
Expand All @@ -81,12 +80,18 @@ def add_kb_group(self, name): pass
def list_kb_group_files(self, group: str = None, limit: Optional[int] = None, details: bool = False,
status: Union[str, List[str]] = Status.all,
exclude_status: Optional[Union[str, List[str]]] = None,
upload_status: str = Status.all,
upload_status: Union[str, List[str]] = Status.all,
exclude_upload_status: Optional[Union[str, List[str]]] = None): pass

@abstractmethod
def add_files(self, files: List[str], metadatas: Optional[List] = None,
status: Optional[str] = Status.waiting, batch_size: int = 64) -> List[str]: pass
status: Optional[str] = Status.waiting, batch_size: int = 64) -> List[str]:
ids = self._add_files(files, metadatas, status, batch_size)
self.add_files_to_kb_group(ids, group=DocListManager.DEDAULT_GROUP_NAME)
return ids

@abstractmethod
def _add_files(
    self,
    files: List[str],
    metadatas: Optional[List] = None,
    status: Optional[str] = Status.waiting,
    batch_size: int = 64,
) -> List[str]:
    """Abstract hook behind ``add_files``.

    ``add_files`` forwards ``files``, ``metadatas``, ``status`` and
    ``batch_size`` to this method unchanged, then hands the ids returned
    here to ``add_files_to_kb_group`` for the default group.

    Returns:
        The ids of the added files, as consumed by ``add_files``.
    """

@abstractmethod
def update_file_message(self, fileid: str, **kw): pass
Expand All @@ -95,7 +100,7 @@ def update_file_message(self, fileid: str, **kw): pass
def add_files_to_kb_group(self, file_ids: List[str], group: str): pass

@abstractmethod
def _delete_files(self, file_ids: List[str]): pass
def _delete_files(self, file_ids: List[str], real: bool = False): pass

@abstractmethod
def delete_files_from_kb_group(self, file_ids: List[str], group: str): pass
Expand All @@ -104,7 +109,7 @@ def delete_files_from_kb_group(self, file_ids: List[str], group: str): pass
def get_file_status(self, fileid: str): pass

@abstractmethod
def update_file_status(self, file_ids: List[str], status: str): pass
def update_file_status(self, file_ids: List[str], status: str, batch_size: int = 64) -> List[Tuple[str, str]]: pass

@abstractmethod
def update_kb_group_file_status(self, file_ids: Union[str, List[str]],
Expand Down Expand Up @@ -213,7 +218,7 @@ def add_kb_group(self, name):
def list_kb_group_files(self, group: str = None, limit: Optional[int] = None, details: bool = False,
status: Union[str, List[str]] = DocListManager.Status.all,
exclude_status: Optional[Union[str, List[str]]] = None,
upload_status: str = DocListManager.Status.all,
upload_status: Union[str, List[str]] = DocListManager.Status.all,
exclude_upload_status: Optional[Union[str, List[str]]] = None):
query = """
SELECT documents.doc_id, documents.path, documents.status,
Expand Down Expand Up @@ -251,8 +256,8 @@ def list_kb_group_files(self, group: str = None, limit: Optional[int] = None, de
if not details: return [row[:2] for row in rows]
return rows

def add_files(self, files: List[str], metadatas: Optional[List] = None,
status: Optional[str] = DocListManager.Status.waiting, batch_size: int = 64):
def _add_files(self, files: List[str], metadatas: Optional[List] = None,
status: Optional[str] = DocListManager.Status.waiting, batch_size: int = 64):
results = []
for i in range(0, len(files), batch_size):
batch_files = files[i:i + batch_size]
Expand Down Expand Up @@ -290,7 +295,9 @@ def add_files_to_kb_group(self, file_ids: List[str], group: str):
VALUES (?, ?, ?)
""", (doc_id, group, DocListManager.Status.waiting))

def _delete_files(self, file_ids: List[str]):
def _delete_files(self, file_ids: List[str], real: bool = False):
if not real:
return self.update_file_status(file_ids, DocListManager.Status.deleted)
with self._conn:
for doc_id in file_ids:
self._conn.execute("DELETE FROM documents WHERE doc_id = ?", (doc_id,))
Expand All @@ -305,10 +312,18 @@ def get_file_status(self, fileid: str):
cursor = self._conn.execute("SELECT status FROM documents WHERE doc_id = ?", (fileid,))
return cursor.fetchone()

def update_file_status(self, file_ids: List[str], status: str):
with self._conn:
for fileid in file_ids:
self._conn.execute("UPDATE documents SET status = ? WHERE doc_id = ?", (status, fileid))
def update_file_status(self, file_ids: List[str], status: str, batch_size: int = 64) -> List[Tuple[str, str]]:
    """Set ``status`` on the given documents and report which rows were touched.

    Args:
        file_ids: ``doc_id`` values to update. A single id passed as a bare
            string is treated as a one-element list.
        status: Value written to the ``status`` column of ``documents``.
        batch_size: Maximum number of ids bound into one SQL statement.

    Returns:
        One ``(doc_id, path)`` row for every document that matched; ids with
        no matching row are simply absent from the result.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError(f'batch_size must be a positive integer, got {batch_size}')
    if isinstance(file_ids, str):
        file_ids = [file_ids]
    # Materialize once so tuples/other iterables slice and bind like a list.
    file_ids = list(file_ids)

    updated_files = []
    for i in range(0, len(file_ids), batch_size):
        batch = file_ids[i:i + batch_size]
        placeholders = ', '.join('?' for _ in batch)

        with self._conn:
            # `UPDATE ... RETURNING` needs SQLite >= 3.35. Issuing the UPDATE
            # first and reading the same ids back inside the transaction it
            # opened yields the same rows on any SQLite version.
            self._conn.execute(
                f'UPDATE documents SET status = ? WHERE doc_id IN ({placeholders})', [status, *batch])
            cursor = self._conn.execute(
                f'SELECT doc_id, path FROM documents WHERE doc_id IN ({placeholders})', batch)
            updated_files.extend(cursor.fetchall())
    return updated_files

def update_kb_group_file_status(self, file_ids: Union[str, List[str]], status: str, group: Optional[str] = None):
if isinstance(file_ids, str): file_ids = [file_ids]
Expand Down
5 changes: 4 additions & 1 deletion tests/basic_tests/test_doc_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ def test_delete_files(self):
self.manager.add_files([self.test_file_1, self.test_file_2])
self.manager.delete_files([hashlib.sha256(f'{self.test_file_1}'.encode()).hexdigest()])
files_list = self.manager.list_files(details=True)
assert len(files_list) == 2
files_list = self.manager.list_files(details=True, exclude_status=DocListManager.Status.deleted)
assert len(files_list) == 1
assert not any(self.test_file_1.endswith(row[1]) for row in files_list)

Expand Down Expand Up @@ -204,7 +206,8 @@ def test_upload_files_and_upload_files_to_kb(self):

data = dict(override='false', metadatas=json.dumps([{"key": "value"}, {"key": "value2"}]), user_path='path')
response = requests.post(self.get_url('upload_files', **data), files=files)
assert response.status_code == 200 and len(response.json().get('data')[0]) == 2
assert response.status_code == 200 and response.json().get('code') == 200, response.json()
assert len(response.json().get('data')[0]) == 2

response = requests.get(self.get_url('list_files', details=False))
ids = response.json().get('data')
Expand Down

0 comments on commit 549db38

Please sign in to comment.