From 075c658b398ff683d00c39024bfc1343d937c6c4 Mon Sep 17 00:00:00 2001 From: zhangyongchao Date: Mon, 16 Dec 2024 22:01:17 +0800 Subject: [PATCH 01/10] 1. Support delete and reupload. 2. Implement safe delete --- lazyllm/tools/rag/doc_manager.py | 15 +- lazyllm/tools/rag/utils.py | 279 ++++++++++++++++++-------- tests/basic_tests/test_doc_manager.py | 40 ++-- 3 files changed, 233 insertions(+), 101 deletions(-) diff --git a/lazyllm/tools/rag/doc_manager.py b/lazyllm/tools/rag/doc_manager.py index 568da23e..6a9eb629 100644 --- a/lazyllm/tools/rag/doc_manager.py +++ b/lazyllm/tools/rag/doc_manager.py @@ -54,7 +54,10 @@ def upload_files(self, files: List[UploadFile], override: bool = False, # noqa return BaseResponse(code=400, msg=f'file [{files[idx].filename}]: {err_msg}', data=None) file_paths = [os.path.join(self._manager._path, user_path or '', file.filename) for file in files] - ids = self._manager.add_files(file_paths, metadatas=metadatas, status=DocListManager.Status.working) + ids, file_paths = self._manager.add_files( + file_paths, metadatas=metadatas, status=DocListManager.Status.working + ) + assert len(files) == len(ids), "len(files) uploaded vs len(ids) recorede" results = [] for file, path in zip(files, file_paths): if os.path.exists(path): @@ -116,7 +119,9 @@ def add_files(self, files: List[str] = Body(...), else: id_mapping[file] = None - new_ids = self._manager.add_files(new_files, metadatas=new_metadatas, status=DocListManager.Status.success) + new_ids, _ = self._manager.add_files( + new_files, metadatas=new_metadatas, status=DocListManager.Status.success + ) if group_name: self._manager.add_files_to_kb_group(new_ids + exist_ids, group=group_name) @@ -178,6 +183,12 @@ def delete_files(self, request: FileGroupRequest): if request.group_name: return self.delete_files_from_group(request) else: + safe_delete_ids = set(self._manager.get_safe_delete_files()) + tmp = request.file_ids + request.file_ids = [] + for file_id in tmp: + if file_id in safe_delete_ids: + request.file_ids.append(file_id) self._manager.update_kb_group_file_status( file_ids=request.file_ids, status=DocListManager.Status.deleting) docs = self._manager.update_file_status(file_ids=request.file_ids, status=DocListManager.Status.deleting) diff --git a/lazyllm/tools/rag/utils.py b/lazyllm/tools/rag/utils.py index 26cd036b..5ce654ba 100644 --- a/lazyllm/tools/rag/utils.py +++ b/lazyllm/tools/rag/utils.py @@ -9,6 +9,9 @@ from .global_metadata import RAG_DOC_PATH, RAG_DOC_ID from lazyllm.common import override from lazyllm.common.queue import sqlite3_check_threadsafety +import sqlalchemy +from sqlalchemy.orm import DeclarativeBase +from sqlalchemy import Column, insert, update, select import pydantic import sqlite3 @@ -20,6 +23,7 @@ import lazyllm from lazyllm import config +import uuid # min(32, (os.cpu_count() or 1) + 4) is the default number of workers for ThreadPoolExecutor config.add( @@ -34,6 +38,42 @@ def gen_docid(file_path: str) -> str: return hashlib.sha256(file_path.encode()).hexdigest() + +class KBDataBase(DeclarativeBase): + pass + + +class KBDocument(KBDataBase): + __tablename__ = "documents" + + doc_id = Column(sqlalchemy.Text) + filename = Column(sqlalchemy.Text, nullable=False, index=True) + path = Column(sqlalchemy.Text, nullable=False, index=True, primary_key=True) + created_at = Column(sqlalchemy.DateTime, default=sqlalchemy.func.now(), nullable=False) + meta = Column(sqlalchemy.Text, nullable=True) + status = Column(sqlalchemy.Text, nullable=False) + count = Column(sqlalchemy.Integer, default=0) + + +class KBGroup(KBDataBase): + __tablename__ = "document_groups" + + group_id = Column(sqlalchemy.Integer, primary_key=True, autoincrement=True) + group_name = Column(sqlalchemy.String, nullable=False, unique=True) + + +class KBGroupDocuments(KBDataBase): + __tablename__ = "kb_group_documents" + + id = Column(sqlalchemy.Integer, primary_key=True, autoincrement=True) + doc_id = Column(sqlalchemy.String, sqlalchemy.ForeignKey("documents.doc_id"), nullable=False) + group_name = Column(sqlalchemy.String, sqlalchemy.ForeignKey("document_groups.group_name"), nullable=False) + status = Column(sqlalchemy.Text, nullable=True) + log = Column(sqlalchemy.Text, nullable=True) + # unique constraint + __table_args__ = (sqlalchemy.UniqueConstraint('doc_id', 'group_name', name='uq_doc_to_group'),) + + class DocListManager(ABC): DEFAULT_GROUP_NAME = '__default__' __pool__ = dict() @@ -57,7 +97,7 @@ def __init__(self, path, name): def __new__(cls, *args, **kw): if cls is not DocListManager: return super().__new__(cls) - return __class__.__pool__[config['default_dlmanager']](*args, **kw) + return super().__new__(__class__.__pool__[config['default_dlmanager']]) def init_tables(self) -> 'DocListManager': if not self.table_inited(): @@ -76,63 +116,116 @@ def delete_files(self, file_ids: List[str], real: bool = False): self._delete_files(file_ids, real) @abstractmethod - def table_inited(self): pass + def table_inited(self): + pass @abstractmethod - def _init_tables(self): pass + def _init_tables(self): + pass @abstractmethod - def list_files(self, limit: Optional[int] = None, details: bool = False, - status: Union[str, List[str]] = Status.all, - exclude_status: Optional[Union[str, List[str]]] = None): pass + def list_files( + self, + limit: Optional[int] = None, + details: bool = False, + status: Union[str, List[str]] = Status.all, + exclude_status: Optional[Union[str, List[str]]] = None, + ): + pass @abstractmethod - def list_all_kb_group(self): pass + def list_all_kb_group(self): + pass @abstractmethod - def add_kb_group(self, name): pass + def add_kb_group(self, name): + pass @abstractmethod - def list_kb_group_files(self, group: str = None, limit: Optional[int] = None, details: bool = False, - status: Union[str, List[str]] = Status.all, - exclude_status: Optional[Union[str, List[str]]] = None, - upload_status: Union[str, List[str]] = Status.all, - exclude_upload_status: Optional[Union[str, List[str]]] = None): pass + def list_kb_group_files( + self, + group: str = None, + limit: Optional[int] = None, + details: bool = False, + status: Union[str, List[str]] = Status.all, + exclude_status: Optional[Union[str, List[str]]] = None, + upload_status: Union[str, List[str]] = Status.all, + exclude_upload_status: Optional[Union[str, List[str]]] = None, + ): + pass def add_files(self, files: List[str], metadatas: Optional[List[Dict[str, Any]]] = None, status: Optional[str] = Status.waiting, batch_size: int = 64) -> List[str]: - ids = self._add_files(files, metadatas, status, batch_size) + ids, filepaths = self._add_files(files, metadatas, status, batch_size) self.add_files_to_kb_group(ids, group=DocListManager.DEFAULT_GROUP_NAME) - return ids + return ids, filepaths @abstractmethod - def _add_files(self, files: List[str], metadatas: Optional[List] = None, - status: Optional[str] = Status.waiting, batch_size: int = 64) -> List[str]: pass + def get_filepaths(self, file_ids: List[str]): + pass @abstractmethod - def update_file_message(self, fileid: str, **kw): pass + def _add_files( + self, + files: List[str], + metadatas: Optional[List] = None, + status: Optional[str] = Status.waiting, + batch_size: int = 64, + ) -> List[str]: + pass @abstractmethod - def add_files_to_kb_group(self, file_ids: List[str], group: str): pass + def update_file_message(self, fileid: str, **kw): + pass @abstractmethod - def _delete_files(self, file_ids: List[str], real: bool = False): pass + def get_safe_delete_files(self): + pass @abstractmethod - def delete_files_from_kb_group(self, file_ids: List[str], group: str): pass + def add_files_to_kb_group(self, file_ids: List[str], group: str): + pass @abstractmethod - def get_file_status(self, fileid: str): pass + def _delete_files(self, file_ids: List[str], real: bool = False): + pass @abstractmethod - def update_file_status(self, file_ids: List[str], status: str, batch_size: int = 64) -> List[Tuple[str, str]]: pass + def delete_files_from_kb_group(self, file_ids: List[str], group: str): + pass @abstractmethod - def update_kb_group_file_status(self, file_ids: Union[str, List[str]], - status: str, group: Optional[str] = None): pass + def get_file_status(self, fileid: str): + pass @abstractmethod - def release(self): pass + def update_file_status(self, file_ids: List[str], status: str, batch_size: int = 64) -> List[Tuple[str, str]]: + pass + + @abstractmethod + def update_kb_group_file_status(self, file_ids: Union[str, List[str]], status: str, group: Optional[str] = None): + pass + + @abstractmethod + def release(self): + pass + + +def gen_unique_filepaths(ori_filepath: str) -> str: + """ + 根据传入的 base_filename 查询 KBDocument 表,确保生成唯一的 filename。 + 如果存在冲突,则在文件名后添加计数,直到找到唯一值。 + """ + if not os.path.exists(ori_filepath): + return ori_filepath + directory, filename = os.path.split(ori_filepath) + name, ext = os.path.splitext(filename) + ct = 1 + new_filepath = f"{os.path.join(directory, name)}_{ct}{ext}" + while os.path.exists(new_filepath): + ct += 1 + new_filepath = f"{os.path.join(directory, name)}_{ct}{ext}" + return new_filepath class SqliteDocListManager(DocListManager): @@ -144,38 +237,12 @@ def __init__(self, path, name): self._db_lock = FileLock(self._db_path + '.lock') # ensure that this connection is not used in another thread when sqlite3 is not threadsafe self._check_same_thread = not sqlite3_check_threadsafety() + self._engine = sqlalchemy.create_engine( + f"sqlite:///{self._db_path}?check_same_thread={self._check_same_thread}" + ) def _init_tables(self): - with self._db_lock, sqlite3.connect(self._db_path, check_same_thread=self._check_same_thread) as conn: - conn.execute(""" - CREATE TABLE IF NOT EXISTS documents ( - doc_id TEXT PRIMARY KEY, - filename TEXT NOT NULL, - path TEXT NOT NULL, - metadata TEXT, - status TEXT, - count INTEGER DEFAULT 0 - ) - """) - conn.execute(""" - CREATE TABLE IF NOT EXISTS document_groups ( - group_id INTEGER PRIMARY KEY AUTOINCREMENT, - group_name TEXT NOT NULL UNIQUE - ) - """) - conn.execute(""" - CREATE TABLE IF NOT EXISTS kb_group_documents ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - doc_id TEXT NOT NULL, - group_name TEXT NOT NULL, - status TEXT, - log TEXT, - UNIQUE (doc_id, group_name), - FOREIGN KEY(doc_id) REFERENCES documents(doc_id), - FOREIGN KEY(group_name) REFERENCES document_groups(group_name) - ) - """) - conn.commit() + KBDataBase.metadata.create_all(bind=self._engine) def table_inited(self): with self._db_lock, sqlite3.connect(self._db_path, check_same_thread=self._check_same_thread) as conn: @@ -238,7 +305,7 @@ def list_kb_group_files(self, group: str = None, limit: Optional[int] = None, de upload_status: Union[str, List[str]] = DocListManager.Status.all, exclude_upload_status: Optional[Union[str, List[str]]] = None): query = """ - SELECT documents.doc_id, documents.path, documents.status, documents.metadata, + SELECT documents.doc_id, documents.path, documents.status, documents.meta, kb_group_documents.group_name, kb_group_documents.status, kb_group_documents.log FROM kb_group_documents JOIN documents ON kb_group_documents.doc_id = documents.doc_id @@ -272,35 +339,50 @@ def list_kb_group_files(self, group: str = None, limit: Optional[int] = None, de if not details: return [row[:2] for row in rows] return rows + def get_filepaths(self, file_ids: List[str]) -> List[str]: + pass + def _add_files(self, files: List[str], metadatas: Optional[List[Dict[str, Any]]] = None, status: Optional[str] = DocListManager.Status.waiting, batch_size: int = 64): - results = [] + ids = [] + filepaths = [] for i in range(0, len(files), batch_size): batch_files = files[i:i + batch_size] batch_metadatas = metadatas[i:i + batch_size] if metadatas else None - insert_values, params = [], [] + vals = [] for i, file_path in enumerate(batch_files): - filename = os.path.basename(file_path) - doc_id = gen_docid(file_path) + new_file_path = gen_unique_filepaths(file_path) + doc_id = gen_docid(new_file_path) metadata = batch_metadatas[i].copy() if batch_metadatas else {} metadata.setdefault(RAG_DOC_ID, doc_id) - metadata.setdefault(RAG_DOC_PATH, file_path) - metadata_str = json.dumps(metadata) - - insert_values.append("(?, ?, ?, ?, ?, ?)") - params.extend([doc_id, filename, file_path, metadata_str, status, 1]) - - with self._db_lock, sqlite3.connect(self._db_path, check_same_thread=self._check_same_thread) as conn: - query = f""" - INSERT OR IGNORE INTO documents (doc_id, filename, path, metadata, status, count) - VALUES {', '.join(insert_values)} RETURNING doc_id; - """ - cursor = conn.execute(query, params) - results.extend([row[0] for row in cursor.fetchall()]) + metadata.setdefault(RAG_DOC_PATH, new_file_path) + + vals.append( + { + KBDocument.doc_id.name: doc_id, + KBDocument.filename.name: os.path.basename(file_path), + KBDocument.path.name: new_file_path, + KBDocument.meta.name: json.dumps(metadata), + KBDocument.status.name: status, + KBDocument.count.name: 1, + } + ) + with self._db_lock, self._engine.connect() as conn: + result = conn.execute( + insert(KBDocument) + .values(vals) + .prefix_with('OR IGNORE') + .returning(KBDocument.doc_id, KBDocument.path) + ) + returned_values = result.fetchall() conn.commit() - return results + ids.extend([ele.doc_id for ele in returned_values]) + filepaths.extend([ele.path for ele in returned_values]) + if "conn" in locals(): + conn.close() + return ids, filepaths # TODO(wangzhihong): set to metadatas and enable this function def update_file_message(self, fileid: str, **kw): @@ -310,6 +392,17 @@ def update_file_message(self, fileid: str, **kw): conn.execute(f"UPDATE documents SET {set_clause} WHERE doc_id = ?", params) conn.commit() + def get_safe_delete_files(self): + ids = [] + with self._db_lock, self._engine.connect() as conn: + stmt = select(KBDocument.doc_id).where(KBDocument.status.in_(["success", "failed"])) + result = conn.execute(stmt) + returned_values = result.fetchall() + ids = [ele.doc_id for ele in returned_values] + if "conn" in locals(): + conn.close() + return ids + def add_files_to_kb_group(self, file_ids: List[str], group: str): with self._db_lock, sqlite3.connect(self._db_path, check_same_thread=self._check_same_thread) as conn: for doc_id in file_ids: @@ -341,21 +434,37 @@ def get_file_status(self, fileid: str): def update_file_status(self, file_ids: List[str], status: str, batch_size: int = 64) -> List[Tuple[str, str]]: updated_files = [] - - for i in range(0, len(file_ids), batch_size): - batch = file_ids[i:i + batch_size] - placeholders = ', '.join('?' for _ in batch) - sql = f'UPDATE documents SET status = ? WHERE doc_id IN ({placeholders}) RETURNING doc_id, path' - - with self._db_lock, sqlite3.connect(self._db_path, check_same_thread=self._check_same_thread) as conn: - cursor = conn.execute(sql, [status] + batch) - updated_files.extend(cursor.fetchall()) + with self._db_lock, self._engine.connect() as conn: + for i in range(0, len(file_ids), batch_size): + ids = file_ids[i : i + batch_size] + if status == "deleted": + stmt = ( + update(KBDocument) + .where(KBDocument.doc_id.in_(ids)) + .values(status=status, doc_id="deleted_doc") + .returning(KBDocument.doc_id, KBDocument.path) + ) + else: + stmt = ( + update(KBDocument) + .where(KBDocument.doc_id.in_(ids)) + .values(status=status) + .returning(KBDocument.doc_id, KBDocument.path) + ) + result = conn.execute(stmt) + returned_values = result.fetchall() conn.commit() + updated_files.extend([(row.doc_id, row.path) for row in returned_values]) + if "conn" in locals(): + conn.close() return updated_files def update_kb_group_file_status(self, file_ids: Union[str, List[str]], status: str, group: Optional[str] = None): if isinstance(file_ids, str): file_ids = [file_ids] - query, params = 'UPDATE kb_group_documents SET status = ? WHERE ', [status] + if "status" == "deleted": + query, params = 'UPDATE kb_group_documents SET doc_id = "deleted_doc", status = ? WHERE ', [status] + else: + query, params = 'UPDATE kb_group_documents SET status = ? WHERE ', [status] if group: query += 'group_name = ? AND ' params.append(group) diff --git a/tests/basic_tests/test_doc_manager.py b/tests/basic_tests/test_doc_manager.py index ebb0973a..d46a6729 100644 --- a/tests/basic_tests/test_doc_manager.py +++ b/tests/basic_tests/test_doc_manager.py @@ -30,8 +30,6 @@ def setUp(self): self.test_dir = test_dir = self.tmpdir.mkdir("test_documents") test_file_1, test_file_2 = test_dir.join("test1.txt"), test_dir.join("test2.txt") - test_file_1.write("This is a test file 1.") - test_file_2.write("This is a test file 2.") self.test_file_1, self.test_file_2 = str(test_file_1), str(test_file_2) self.manager = DocListManager(str(test_dir), "TestManager") @@ -40,6 +38,11 @@ def tearDown(self): shutil.rmtree(str(self.test_dir)) self.manager.release() + def mock_upload_file(self): + test_file_1, test_file_2 = self.test_dir.join("test1.txt"), self.test_dir.join("test2.txt") + test_file_1.write("This is a test file 1.") + test_file_2.write("This is a test file 2.") + def test_init_tables(self): self.manager.init_tables() assert self.manager.table_inited() is True @@ -48,6 +51,7 @@ def test_add_files(self): self.manager.init_tables() self.manager.add_files([self.test_file_1, self.test_file_2]) + self.mock_upload_file() files_list = self.manager.list_files(details=True) assert len(files_list) == 2 assert any(self.test_file_1.endswith(row[1]) for row in files_list) @@ -55,6 +59,10 @@ def test_add_files(self): def test_list_kb_group_files(self): self.manager.init_tables() + + self.manager.add_files([self.test_file_1, self.test_file_2]) + self.mock_upload_file() + files_list = self.manager.list_kb_group_files(DocListManager.DEFAULT_GROUP_NAME, details=True) assert len(files_list) == 2 files_list = self.manager.list_kb_group_files('group1', details=True) @@ -82,6 +90,7 @@ def test_delete_files(self): self.manager.init_tables() self.manager.add_files([self.test_file_1, self.test_file_2]) + self.mock_upload_file() self.manager.delete_files([hashlib.sha256(f'{self.test_file_1}'.encode()).hexdigest()]) files_list = self.manager.list_files(details=True) assert len(files_list) == 2 @@ -93,11 +102,12 @@ def test_update_file_message(self): self.manager.init_tables() self.manager.add_files([self.test_file_1]) + self.mock_upload_file() file_id = hashlib.sha256(f'{self.test_file_1}'.encode()).hexdigest() - self.manager.update_file_message(file_id, metadata="New metadata", status="processed") + self.manager.update_file_message(file_id, meta="New metadata", status="processed") conn = sqlite3.connect(self.manager._db_path) - cursor = conn.execute("SELECT metadata, status FROM documents WHERE doc_id = ?", (file_id,)) + cursor = conn.execute("SELECT meta, status FROM documents WHERE doc_id = ?", (file_id,)) row = cursor.fetchone() conn.close() @@ -107,18 +117,17 @@ def test_update_file_message(self): def test_get_and_update_file_status(self): self.manager.init_tables() + self.manager.add_files([self.test_file_1], status=DocListManager.Status.waiting) + self.mock_upload_file() + file_id = hashlib.sha256(f'{self.test_file_1}'.encode()).hexdigest() status = self.manager.get_file_status(file_id) - assert status[0] == DocListManager.Status.success + assert status[0] == DocListManager.Status.waiting - self.manager.add_files([self.test_file_1], status=DocListManager.Status.waiting) + self.manager.update_file_status([file_id], DocListManager.Status.success) status = self.manager.get_file_status(file_id) assert status[0] == DocListManager.Status.success - self.manager.update_file_status([file_id], DocListManager.Status.waiting) - status = self.manager.get_file_status(file_id) - assert status[0] == DocListManager.Status.waiting - def test_add_files_to_kb_group(self): self.manager.init_tables() files_list = self.manager.list_kb_group_files("group1", details=True) @@ -156,12 +165,14 @@ def setup_class(cls): cls.test_dir = test_dir = cls.tmpdir.mkdir("test_server") test_file_1, test_file_2 = test_dir.join("test1.txt"), test_dir.join("test2.txt") - test_file_1.write("This is a test file 1.") - test_file_2.write("This is a test file 2.") + cls.test_file_1, cls.test_file_2 = str(test_file_1), str(test_file_2) cls.manager = DocListManager(str(test_dir), "TestManager") cls.manager.init_tables() + cls.manager.add_files([cls.test_file_1, cls.test_file_2]) + test_file_1.write("This is a test file 1.") + test_file_2.write("This is a test file 2.") cls.manager.add_kb_group('group1') cls.manager.add_kb_group('extra_group') cls.server = lazyllm.ServerModule(DocManager(cls.manager)) @@ -255,12 +266,13 @@ def test_delete_files(self): assert response.status_code == 200 and response.json().get('code') == 200 response = requests.get(self.get_url('list_files')) - assert response.status_code == 200 and len(response.json().get('data')) == 5 + assert response.status_code == 200 and len(response.json().get('data')) == 5, response.json().get('data') response = requests.get(self.get_url('list_files', alive=True)) assert response.status_code == 200 and len(response.json().get('data')) == 4 response = requests.get(self.get_url('list_files_in_group', group_name='group1')) - assert response.status_code == 200 and len(response.json().get('data')) == 3 + # one file is deleted, its docid become "deleted" + assert response.status_code == 200 and len(response.json().get('data')) == 2, response.json().get('data') response = requests.get(self.get_url('list_files_in_group', group_name='group1', alive=True)) assert response.status_code == 200 and len(response.json().get('data')) == 1 From 7bdeed674aa75393ccc52325cf5f0dfce591bb0a Mon Sep 17 00:00:00 2001 From: zhangyongchao Date: Tue, 17 Dec 2024 10:14:42 +0800 Subject: [PATCH 02/10] delete document record in case of unique constraint failing --- lazyllm/tools/rag/utils.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/lazyllm/tools/rag/utils.py b/lazyllm/tools/rag/utils.py index 5ce654ba..e7e256e8 100644 --- a/lazyllm/tools/rag/utils.py +++ b/lazyllm/tools/rag/utils.py @@ -11,7 +11,7 @@ from lazyllm.common.queue import sqlite3_check_threadsafety import sqlalchemy from sqlalchemy.orm import DeclarativeBase -from sqlalchemy import Column, insert, update, select +from sqlalchemy import Column, insert, update, select, delete import pydantic import sqlite3 @@ -23,7 +23,6 @@ import lazyllm from lazyllm import config -import uuid # min(32, (os.cpu_count() or 1) + 4) is the default number of workers for ThreadPoolExecutor config.add( @@ -436,12 +435,11 @@ def update_file_status(self, file_ids: List[str], status: str, batch_size: int = updated_files = [] with self._db_lock, self._engine.connect() as conn: for i in range(0, len(file_ids), batch_size): - ids = file_ids[i : i + batch_size] + ids = file_ids[i: i + batch_size] if status == "deleted": stmt = ( - update(KBDocument) + delete(KBDocument) .where(KBDocument.doc_id.in_(ids)) - .values(status=status, doc_id="deleted_doc") .returning(KBDocument.doc_id, KBDocument.path) ) else: From d932ad2e3bfa842d884407d96f98054b050e46db Mon Sep 17 00:00:00 2001 From: zhangyongchao Date: Thu, 19 Dec 2024 13:53:16 +0800 Subject: [PATCH 03/10] stash current modification --- lazyllm/tools/rag/data_loaders.py | 2 +- lazyllm/tools/rag/doc_impl.py | 15 +++- lazyllm/tools/rag/doc_manager.py | 30 ++++++- lazyllm/tools/rag/document.py | 6 +- lazyllm/tools/rag/utils.py | 110 +++++++++++++++++--------- tests/basic_tests/test_doc_manager.py | 47 +++++------ 6 files changed, 133 insertions(+), 77 deletions(-) diff --git a/lazyllm/tools/rag/data_loaders.py b/lazyllm/tools/rag/data_loaders.py index 3c8c0fcd..7c12a1a4 100644 --- a/lazyllm/tools/rag/data_loaders.py +++ b/lazyllm/tools/rag/data_loaders.py @@ -16,7 +16,7 @@ def load_data(self, input_files: Optional[List[str]] = None) -> List[DocNode]: file_readers = self._local_readers.copy() for key, func in self._global_readers.items(): if key not in file_readers: file_readers[key] = func - LOG.info(f"DirectoryReader loads data, input files: {input_files}") + # LOG.info(f"DirectoryReader loads data, input files: {input_files}") reader = SimpleDirectoryReader(input_files=input_files, file_extractor=file_readers) nodes: List[DocNode] = [] for doc in reader(): diff --git a/lazyllm/tools/rag/doc_impl.py b/lazyllm/tools/rag/doc_impl.py index 99abf6f9..bcc2e5c0 100644 --- a/lazyllm/tools/rag/doc_impl.py +++ b/lazyllm/tools/rag/doc_impl.py @@ -13,7 +13,7 @@ from .smart_embedding_index import SmartEmbeddingIndex from .doc_node import DocNode from .data_loaders import DirectoryReader -from .utils import DocListManager, gen_docid +from .utils import DocListManager, gen_docid_wo_dlm from .global_metadata import GlobalMetadataDesc, RAG_DOC_ID, RAG_DOC_PATH import threading import time @@ -83,7 +83,10 @@ def _lazy_init(self) -> None: root_nodes = self._reader.load_data(paths) for idx, node in enumerate(root_nodes): node.global_metadata.update(metadatas[idx].copy() if metadatas else {}) - node.global_metadata[RAG_DOC_ID] = ids[idx] if ids else gen_docid(paths[idx]) + if self._dlm: + node.global_metadata[RAG_DOC_ID] = ids[idx] if ids else self._dlm.get_active_docid(paths[idx]) + else: + node.global_metadata[RAG_DOC_ID] = ids[idx] if ids else gen_docid_wo_dlm(paths[idx]) node.global_metadata[RAG_DOC_PATH] = paths[idx] self.store.update_nodes(root_nodes) if self._dlm: self._dlm.update_kb_group_file_status( @@ -225,6 +228,7 @@ def worker(self): self._dlm.update_kb_group_file_status(ids, DocListManager.Status.working, group=self._kb_group_name) self._add_files(input_files=files, ids=ids, metadatas=metadatas) self._dlm.update_kb_group_file_status(ids, DocListManager.Status.success, group=self._kb_group_name) + time.sleep(3) continue time.sleep(10) @@ -248,12 +252,15 @@ def _add_files(self, input_files: List[str], ids: Optional[List[str]] = None, root_nodes = self._reader.load_data(input_files) for idx, node in enumerate(root_nodes): node.global_metadata = metadatas[idx].copy() if metadatas else {} - node.global_metadata[RAG_DOC_ID] = ids[idx] if ids else gen_docid(input_files[idx]) + if self._dlm: + node.global_metadata[RAG_DOC_ID] = ids[idx] if ids else self._dlm.get_active_docid(input_files[idx]) + else: + node.global_metadata[RAG_DOC_ID] = ids[idx] if ids else gen_docid_wo_dlm(input_files[idx]) node.global_metadata[RAG_DOC_PATH] = input_files[idx] temp_store = self._create_store({"type": "map"}) temp_store.update_nodes(root_nodes) all_groups = self.store.all_groups() - LOG.info(f"add_files: Trying to merge store with {all_groups}") + # LOG.info(f"add_files: Trying to merge store with {all_groups}") for group in all_groups: if group != LAZY_ROOT_NAME and not self.store.is_group_active(group): continue diff --git a/lazyllm/tools/rag/doc_manager.py b/lazyllm/tools/rag/doc_manager.py index 6a9eb629..a5e348a9 100644 --- a/lazyllm/tools/rag/doc_manager.py +++ b/lazyllm/tools/rag/doc_manager.py @@ -9,10 +9,27 @@ import lazyllm from lazyllm import FastapiApp as app from .utils import DocListManager, BaseResponse -from .doc_impl import gen_docid from .global_metadata import RAG_DOC_ID, RAG_DOC_PATH +def gen_unique_filepaths(ori_filepath: str) -> str: + """ + 根据传入的 base_filename 确保生成唯一的 filepath。 + 如果存在冲突,则在文件名后添加计数,直到找到唯一值。 + """ + assert not os.path.exists(ori_filepath), f"file already exists: {ori_filepath}" + if not os.path.exists(ori_filepath): + return ori_filepath + directory, filename = os.path.split(ori_filepath) + name, ext = os.path.splitext(filename) + ct = 1 + new_filepath = f"{os.path.join(directory, name)}_{ct}{ext}" + while os.path.exists(new_filepath): + ct += 1 + new_filepath = f"{os.path.join(directory, name)}_{ct}{ext}" + return new_filepath + + class DocManager(lazyllm.ModuleBase): def __init__(self, dlm: DocListManager) -> None: super().__init__() @@ -54,9 +71,12 @@ def upload_files(self, files: List[UploadFile], override: bool = False, # noqa return BaseResponse(code=400, msg=f'file [{files[idx].filename}]: {err_msg}', data=None) file_paths = [os.path.join(self._manager._path, user_path or '', file.filename) for file in files] + file_paths = [gen_unique_filepaths(ele) for ele in file_paths] + print("FILE PATHS", file_paths) ids, file_paths = self._manager.add_files( file_paths, metadatas=metadatas, status=DocListManager.Status.working ) + print("AFTER self._manager.add_files") assert len(files) == len(ids), "len(files) uploaded vs len(ids) recorede" results = [] for file, path in zip(files, file_paths): @@ -76,8 +96,9 @@ def upload_files(self, files: List[UploadFile], override: bool = False, # noqa except Exception as e: lazyllm.LOG.error(f'writing file [{path}] to disk failed: [{e}]') raise e - - file_id = gen_docid(path) + print("in LOOP, path", path) + file_id = self._manager.get_active_docid(path) + print("after get_active_docid, file_id:", file_id) self._manager.update_file_status([file_id], status=DocListManager.Status.success) results.append('Success') @@ -184,16 +205,19 @@ def delete_files(self, request: FileGroupRequest): return self.delete_files_from_group(request) else: safe_delete_ids = set(self._manager.get_safe_delete_files()) + print("Input file ids:", request.file_ids, ", safe delete ids:", safe_delete_ids) tmp = request.file_ids request.file_ids = [] for file_id in tmp: if file_id in safe_delete_ids: request.file_ids.append(file_id) + print("New fildids after safe delete:", request.file_ids) self._manager.update_kb_group_file_status( file_ids=request.file_ids, status=DocListManager.Status.deleting) docs = self._manager.update_file_status(file_ids=request.file_ids, status=DocListManager.Status.deleting) for doc in docs: + print("DELETE doc:", doc) if os.path.exists(path := doc[1]): os.remove(path) diff --git a/lazyllm/tools/rag/document.py b/lazyllm/tools/rag/document.py index da93e6c6..00cef393 100644 --- a/lazyllm/tools/rag/document.py +++ b/lazyllm/tools/rag/document.py @@ -88,10 +88,12 @@ def create_kb_group(self, name: str, doc_fields: Optional[Dict[str, DocField]] = return doc @property - def _impl(self): return self._impls.get_doc_by_kb_group(self._curr_group) + def _impl(self): + return self._impls.get_doc_by_kb_group(self._curr_group) @property - def manager(self): return getattr(self._impls, '_manager', None) + def manager(self): + return getattr(self._impls, '_manager', None) @DynamicDescriptor def create_node_group(self, name: str = None, *, transform: Callable, parent: str = LAZY_ROOT_NAME, diff --git a/lazyllm/tools/rag/utils.py b/lazyllm/tools/rag/utils.py index e7e256e8..6341f096 100644 --- a/lazyllm/tools/rag/utils.py +++ b/lazyllm/tools/rag/utils.py @@ -12,6 +12,7 @@ import sqlalchemy from sqlalchemy.orm import DeclarativeBase from sqlalchemy import Column, insert, update, select, delete +import uuid import pydantic import sqlite3 @@ -34,7 +35,8 @@ config.add("default_dlmanager", str, "sqlite", "DEFAULT_DOCLIST_MANAGER") -def gen_docid(file_path: str) -> str: + +def gen_docid_wo_dlm(file_path: str) -> str: return hashlib.sha256(file_path.encode()).hexdigest() @@ -42,15 +44,24 @@ class KBDataBase(DeclarativeBase): pass +class KBPrcessedFile(KBDataBase): + __tablename__ = "processed_file" + + path = Column(sqlalchemy.Text, nullable=False, primary_key=True) + meta = Column(sqlalchemy.Text, nullable=True) + created_at = Column(sqlalchemy.DateTime, default=sqlalchemy.func.now(), nullable=False) + + class KBDocument(KBDataBase): __tablename__ = "documents" - doc_id = Column(sqlalchemy.Text) + doc_id = Column(sqlalchemy.String(36), primary_key=True) filename = Column(sqlalchemy.Text, nullable=False, index=True) - path = Column(sqlalchemy.Text, nullable=False, index=True, primary_key=True) + path = Column(sqlalchemy.Text, nullable=False, index=True) created_at = Column(sqlalchemy.DateTime, default=sqlalchemy.func.now(), nullable=False) meta = Column(sqlalchemy.Text, nullable=True) - status = Column(sqlalchemy.Text, nullable=False) + status = Column(sqlalchemy.Text, nullable=False, index=True) + # active = Column(sqlalchemy.Boolean, default=True) count = Column(sqlalchemy.Integer, default=0) @@ -114,6 +125,10 @@ def delete_files(self, file_ids: List[str], real: bool = False): self.update_kb_group_file_status(file_ids=file_ids, status=DocListManager.Status.deleting) self._delete_files(file_ids, real) + @abstractmethod + def get_active_docid(self, file_path: str) -> str: + pass + @abstractmethod def table_inited(self): pass @@ -210,29 +225,13 @@ def release(self): pass -def gen_unique_filepaths(ori_filepath: str) -> str: - """ - 根据传入的 base_filename 查询 KBDocument 表,确保生成唯一的 filename。 - 如果存在冲突,则在文件名后添加计数,直到找到唯一值。 - """ - if not os.path.exists(ori_filepath): - return ori_filepath - directory, filename = os.path.split(ori_filepath) - name, ext = os.path.splitext(filename) - ct = 1 - new_filepath = f"{os.path.join(directory, name)}_{ct}{ext}" - while os.path.exists(new_filepath): - ct += 1 - new_filepath = f"{os.path.join(directory, name)}_{ct}{ext}" - return new_filepath - - class SqliteDocListManager(DocListManager): def __init__(self, path, name): super().__init__(path, name) root_dir = os.path.expanduser(os.path.join(config['home'], '.dbs')) os.makedirs(root_dir, exist_ok=True) self._db_path = os.path.join(root_dir, f'.lazyllm_dlmanager.{self._id}.db') + print(f"Database path: {self._db_path}") self._db_lock = FileLock(self._db_path + '.lock') # ensure that this connection is not used in another thread when sqlite3 is not threadsafe self._check_same_thread = not sqlite3_check_threadsafety() @@ -243,6 +242,24 @@ def __init__(self, path, name): def _init_tables(self): KBDataBase.metadata.create_all(bind=self._engine) + def get_active_docid(self, file_path: str) -> str: + doc_id = "" + with self._db_lock, self._engine.connect() as conn: + stmt = select(KBDocument.doc_id).where(KBDocument.path == file_path, KBDocument.status != "deleted") + result = conn.execute(stmt) + returned_values = result.fetchall() + if len(returned_values) == 0: + lazyllm.LOG.warning(f"No active docid for {file_path}") + elif len(returned_values) == 1: + print(f"Found 1 active docid for {file_path}: {returned_values[0].doc_id}") + doc_id = returned_values[0].doc_id + else: + doc_id = returned_values[0].doc_id + lazyllm.LOG.warning(f"Get len(returned_values) active docids:") + if "conn" in locals(): + conn.close() + return doc_id + def table_inited(self): with self._db_lock, sqlite3.connect(self._db_path, check_same_thread=self._check_same_thread) as conn: cursor = conn.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='documents'") @@ -345,24 +362,41 @@ def _add_files(self, files: List[str], metadatas: Optional[List[Dict[str, Any]]] status: Optional[str] = DocListManager.Status.waiting, batch_size: int = 64): ids = [] filepaths = [] + if not files: + return ids, filepaths + + newly_added_files = [], meta_str_list = [] + with self._db_lock, self._engine.connect() as conn: + vals = [{KBPrcessedFile.path: ele, KBPrcessedFile.meta} for ele in files] + result = conn.execute( + insert(KBPrcessedFile) + .values(vals) + .prefix_with('OR IGNORE') + .returning(KBPrcessedFile.path, KBPrcessedFile.meta) + ) + conn.commit() + returned_values = result.fetchall() + newly_added_files = [row.path for row in returned_values] + meta_str_list = [row.meta for row in returned_values] + + files = newly_added_files for i in range(0, len(files), batch_size): batch_files = files[i:i + batch_size] batch_metadatas = metadatas[i:i + batch_size] if metadatas else None vals = [] for i, file_path in enumerate(batch_files): - new_file_path = gen_unique_filepaths(file_path) - doc_id = gen_docid(new_file_path) + doc_id = str(uuid.uuid4()) metadata = batch_metadatas[i].copy() if batch_metadatas else {} metadata.setdefault(RAG_DOC_ID, doc_id) - metadata.setdefault(RAG_DOC_PATH, new_file_path) + metadata.setdefault(RAG_DOC_PATH, file_path) vals.append( { KBDocument.doc_id.name: doc_id, KBDocument.filename.name: os.path.basename(file_path), - KBDocument.path.name: new_file_path, + KBDocument.path.name: file_path, KBDocument.meta.name: json.dumps(metadata), KBDocument.status.name: status, KBDocument.count.name: 1, @@ -435,22 +469,19 @@ def update_file_status(self, file_ids: List[str], status: str, batch_size: int = updated_files = [] with self._db_lock, self._engine.connect() as conn: for i in range(0, len(file_ids), batch_size): - ids = file_ids[i: i + batch_size] - if status == "deleted": - stmt = ( - delete(KBDocument) - .where(KBDocument.doc_id.in_(ids)) - .returning(KBDocument.doc_id, KBDocument.path) - ) - else: - stmt = ( - update(KBDocument) - .where(KBDocument.doc_id.in_(ids)) - .values(status=status) - .returning(KBDocument.doc_id, KBDocument.path) - ) + ids = file_ids[i : i + batch_size] + stmt = ( + update(KBDocument) + .where(KBDocument.doc_id.in_(ids)) + .values(status=status) + .returning(KBDocument.doc_id, KBDocument.path) + ) result = conn.execute(stmt) returned_values = result.fetchall() + if status == "deleted": + filepaths = [row.path for row in returned_values] + stmt = delete(KBPrcessedFile).where(KBPrcessedFile.path.in_(filepaths)) + conn.execute(stmt) conn.commit() updated_files.extend([(row.doc_id, row.path) for row in returned_values]) if "conn" in locals(): @@ -476,6 +507,7 @@ def release(self): conn.execute('delete from documents') conn.execute('delete from document_groups') conn.execute('delete from kb_group_documents') + conn.execute('delete from processed_file') conn.commit() def __reduce__(self): diff --git a/tests/basic_tests/test_doc_manager.py b/tests/basic_tests/test_doc_manager.py index d46a6729..61fcb7bd 100644 --- a/tests/basic_tests/test_doc_manager.py +++ b/tests/basic_tests/test_doc_manager.py @@ -30,7 +30,10 @@ def setUp(self): self.test_dir = test_dir = self.tmpdir.mkdir("test_documents") test_file_1, test_file_2 = test_dir.join("test1.txt"), test_dir.join("test2.txt") + test_file_1.write("This is a test file 1.") + test_file_2.write("This is a test file 2.") self.test_file_1, self.test_file_2 = str(test_file_1), str(test_file_2) + print(f"test_file_1:{test_file_1}, test_file_2:{test_file_2}") self.manager = DocListManager(str(test_dir), "TestManager") @@ -38,11 +41,6 @@ def tearDown(self): shutil.rmtree(str(self.test_dir)) self.manager.release() - def mock_upload_file(self): - test_file_1, test_file_2 = self.test_dir.join("test1.txt"), self.test_dir.join("test2.txt") - test_file_1.write("This is a test file 1.") - test_file_2.write("This is a test file 2.") - def test_init_tables(self): self.manager.init_tables() assert self.manager.table_inited() is True @@ -51,7 +49,6 @@ def test_add_files(self): self.manager.init_tables() self.manager.add_files([self.test_file_1, self.test_file_2]) - self.mock_upload_file() files_list = self.manager.list_files(details=True) assert len(files_list) == 2 assert any(self.test_file_1.endswith(row[1]) for row in files_list) @@ -59,10 +56,6 @@ def test_add_files(self): def test_list_kb_group_files(self): self.manager.init_tables() - - self.manager.add_files([self.test_file_1, self.test_file_2]) - self.mock_upload_file() - files_list = self.manager.list_kb_group_files(DocListManager.DEFAULT_GROUP_NAME, details=True) assert len(files_list) == 2 files_list = self.manager.list_kb_group_files('group1', details=True) @@ -90,7 +83,6 @@ def test_delete_files(self): self.manager.init_tables() self.manager.add_files([self.test_file_1, self.test_file_2]) - self.mock_upload_file() self.manager.delete_files([hashlib.sha256(f'{self.test_file_1}'.encode()).hexdigest()]) files_list = self.manager.list_files(details=True) assert len(files_list) == 2 @@ -102,12 +94,11 @@ def test_update_file_message(self): self.manager.init_tables() self.manager.add_files([self.test_file_1]) - self.mock_upload_file() file_id = hashlib.sha256(f'{self.test_file_1}'.encode()).hexdigest() - self.manager.update_file_message(file_id, meta="New metadata", status="processed") + self.manager.update_file_message(file_id, metadata="New metadata", status="processed") conn = sqlite3.connect(self.manager._db_path) - cursor = conn.execute("SELECT meta, status FROM documents WHERE doc_id = ?", (file_id,)) + cursor = conn.execute("SELECT metadata, status FROM documents WHERE doc_id = ?", (file_id,)) row = cursor.fetchone() conn.close() @@ -117,27 +108,30 @@ def test_update_file_message(self): def test_get_and_update_file_status(self): self.manager.init_tables() - self.manager.add_files([self.test_file_1], status=DocListManager.Status.waiting) - self.mock_upload_file() - file_id = hashlib.sha256(f'{self.test_file_1}'.encode()).hexdigest() status = self.manager.get_file_status(file_id) - assert status[0] == DocListManager.Status.waiting + assert status[0] == DocListManager.Status.success - self.manager.update_file_status([file_id], DocListManager.Status.success) + self.manager.add_files([self.test_file_1], status=DocListManager.Status.waiting) status = self.manager.get_file_status(file_id) assert status[0] == DocListManager.Status.success + self.manager.update_file_status([file_id], DocListManager.Status.waiting) + status = self.manager.get_file_status(file_id) + assert status[0] == DocListManager.Status.waiting + def test_add_files_to_kb_group(self): self.manager.init_tables() files_list = self.manager.list_kb_group_files("group1", details=True) assert len(files_list) == 0 - self.manager.add_files([self.test_file_1, self.test_file_2]) + print("\nCall add_files in unittest") + doc_ids = [self.manager.get_active_docid(ele) for ele in [self.test_file_1, self.test_file_2]] + print("doc_ids : ", doc_ids) files_list = self.manager.list_kb_group_files("group1", details=True) assert len(files_list) == 0 - self.manager.add_files_to_kb_group(get_fid([self.test_file_1, self.test_file_2]), group="group1") + self.manager.add_files_to_kb_group(doc_ids, group="group1") files_list = self.manager.list_kb_group_files("group1", details=True) assert len(files_list) == 2 @@ -165,14 +159,12 @@ def setup_class(cls): cls.test_dir = test_dir = cls.tmpdir.mkdir("test_server") test_file_1, test_file_2 = test_dir.join("test1.txt"), test_dir.join("test2.txt") - + test_file_1.write("This is a test file 1.") + test_file_2.write("This is a test file 2.") cls.test_file_1, cls.test_file_2 = str(test_file_1), str(test_file_2) cls.manager = DocListManager(str(test_dir), "TestManager") cls.manager.init_tables() - cls.manager.add_files([cls.test_file_1, cls.test_file_2]) - test_file_1.write("This is a test file 1.") - test_file_2.write("This is a test file 2.") cls.manager.add_kb_group('group1') cls.manager.add_kb_group('extra_group') cls.server = lazyllm.ServerModule(DocManager(cls.manager)) @@ -266,13 +258,12 @@ def test_delete_files(self): assert response.status_code == 200 and response.json().get('code') == 200 response = requests.get(self.get_url('list_files')) - assert response.status_code == 200 and len(response.json().get('data')) == 5, response.json().get('data') + assert response.status_code == 200 and len(response.json().get('data')) == 5 response = requests.get(self.get_url('list_files', alive=True)) assert response.status_code == 200 and len(response.json().get('data')) == 4 response = requests.get(self.get_url('list_files_in_group', group_name='group1')) - # one file is deleted, its docid become "deleted" - assert response.status_code == 200 and len(response.json().get('data')) == 2, response.json().get('data') + assert response.status_code == 200 and len(response.json().get('data')) == 3 response = requests.get(self.get_url('list_files_in_group', group_name='group1', alive=True)) assert response.status_code == 200 and len(response.json().get('data')) == 1 From 764016197c2596b2742efed5a9d547971fe4b3a7 Mon Sep 17 00:00:00 2001 From: zhangyongchao Date: Thu, 19 Dec 2024 20:11:06 +0800 Subject: [PATCH 04/10] 1. doc_manager only set deleting. Document will de deleted in worker thread 2. add delete_obsolete_files in doclistmaager, it will find status==delting and reference_count==0 file to delete 3. Add reference_count, when adding to kbgroup document reference_count will be updated to +1, when deleting from kbgroup it will be updated to -1 4. Add KBOperationLogs, currently only deleting file operation will be recoreder 5. Modifiy worker thread. when group is default, it will check to delete obsolete file --- lazyllm/tools/rag/doc_impl.py | 13 +- lazyllm/tools/rag/doc_manager.py | 40 ++--- lazyllm/tools/rag/utils.py | 237 +++++++++++++------------- tests/basic_tests/test_doc_manager.py | 12 +- 4 files changed, 141 insertions(+), 161 deletions(-) diff --git a/lazyllm/tools/rag/doc_impl.py b/lazyllm/tools/rag/doc_impl.py index bcc2e5c0..5ea593d5 100644 --- a/lazyllm/tools/rag/doc_impl.py +++ b/lazyllm/tools/rag/doc_impl.py @@ -13,7 +13,7 @@ from .smart_embedding_index import SmartEmbeddingIndex from .doc_node import DocNode from .data_loaders import DirectoryReader -from .utils import DocListManager, gen_docid_wo_dlm +from .utils import DocListManager, gen_docid from .global_metadata import GlobalMetadataDesc, RAG_DOC_ID, RAG_DOC_PATH import threading import time @@ -83,10 +83,7 @@ def _lazy_init(self) -> None: root_nodes = self._reader.load_data(paths) for idx, node in enumerate(root_nodes): node.global_metadata.update(metadatas[idx].copy() if metadatas else {}) - if self._dlm: - node.global_metadata[RAG_DOC_ID] = ids[idx] if ids else self._dlm.get_active_docid(paths[idx]) - else: - node.global_metadata[RAG_DOC_ID] = ids[idx] if ids else gen_docid_wo_dlm(paths[idx]) + node.global_metadata[RAG_DOC_ID] = ids[idx] if ids else gen_docid(paths[idx]) node.global_metadata[RAG_DOC_PATH] = paths[idx] self.store.update_nodes(root_nodes) if self._dlm: self._dlm.update_kb_group_file_status( @@ -222,6 +219,7 @@ def worker(self): if self._kb_group_name == DocListManager.DEFAULT_GROUP_NAME: self._dlm.init_tables() + self._dlm.delete_obsolete_files() ids, files, metadatas = self._list_files(status=DocListManager.Status.waiting, upload_status=DocListManager.Status.success) if files: @@ -252,10 +250,7 @@ def _add_files(self, input_files: List[str], ids: Optional[List[str]] = None, root_nodes = self._reader.load_data(input_files) for idx, node in enumerate(root_nodes): node.global_metadata = metadatas[idx].copy() if metadatas else {} - if self._dlm: - node.global_metadata[RAG_DOC_ID] = ids[idx] if ids else self._dlm.get_active_docid(input_files[idx]) - else: - node.global_metadata[RAG_DOC_ID] = ids[idx] if ids else gen_docid_wo_dlm(input_files[idx]) + node.global_metadata[RAG_DOC_ID] = ids[idx] if ids else gen_docid(input_files[idx]) node.global_metadata[RAG_DOC_PATH] = input_files[idx] temp_store = self._create_store({"type": "map"}) temp_store.update_nodes(root_nodes) diff --git a/lazyllm/tools/rag/doc_manager.py b/lazyllm/tools/rag/doc_manager.py index a5e348a9..1446c71f 100644 --- a/lazyllm/tools/rag/doc_manager.py +++ b/lazyllm/tools/rag/doc_manager.py @@ -9,6 +9,7 @@ import lazyllm from lazyllm import FastapiApp as app from .utils import DocListManager, BaseResponse +from .doc_impl import gen_docid from .global_metadata import RAG_DOC_ID, RAG_DOC_PATH @@ -73,11 +74,8 @@ def upload_files(self, files: List[UploadFile], override: bool = False, # noqa file_paths = [os.path.join(self._manager._path, user_path or '', file.filename) for file in files] file_paths = [gen_unique_filepaths(ele) for ele in file_paths] print("FILE PATHS", file_paths) - ids, file_paths = self._manager.add_files( - file_paths, metadatas=metadatas, status=DocListManager.Status.working - ) - print("AFTER self._manager.add_files") - assert len(files) == len(ids), "len(files) uploaded vs len(ids) recorede" + ids = self._manager.add_files(file_paths, metadatas=metadatas, status=DocListManager.Status.working) + assert len(files) == len(ids), "len(files) uploaded vs len(ids) recored" results = [] for file, path in zip(files, file_paths): if os.path.exists(path): @@ -96,9 +94,7 @@ def upload_files(self, files: List[UploadFile], override: bool = False, # noqa except Exception as e: lazyllm.LOG.error(f'writing file [{path}] to disk failed: [{e}]') raise e - print("in LOOP, path", path) - file_id = self._manager.get_active_docid(path) - print("after get_active_docid, file_id:", file_id) + file_id = gen_docid(path) self._manager.update_file_status([file_id], status=DocListManager.Status.success) results.append('Success') @@ -129,7 +125,8 @@ def add_files(self, files: List[str] = Body(...), exist_id = exists_files_info.get(file, None) if exist_id: update_kws = dict(fileid=exist_id, status=DocListManager.Status.success) - if metadatas: update_kws["metadata"] = json.dumps(metadatas[idx]) + if metadatas: + update_kws["meta"] = json.dumps(metadatas[idx]) self._manager.update_file_message(**update_kws) exist_ids.append(exist_id) id_mapping[file] = exist_id @@ -140,9 +137,7 @@ def add_files(self, files: List[str] = Body(...), else: id_mapping[file] = None - new_ids, _ = self._manager.add_files( - new_files, metadatas=new_metadatas, status=DocListManager.Status.success - ) + new_ids = self._manager.add_files(new_files, metadatas=new_metadatas, status=DocListManager.Status.success) if group_name: self._manager.add_files_to_kb_group(new_ids + exist_ids, group=group_name) @@ -204,24 +199,11 @@ def delete_files(self, request: FileGroupRequest): if request.group_name: return self.delete_files_from_group(request) else: - safe_delete_ids = set(self._manager.get_safe_delete_files()) - print("Input file ids:", request.file_ids, ", safe delete ids:", safe_delete_ids) - tmp = request.file_ids - request.file_ids = [] - for file_id in tmp: - if file_id in safe_delete_ids: - request.file_ids.append(file_id) - print("New fildids after safe delete:", request.file_ids) - self._manager.update_kb_group_file_status( - file_ids=request.file_ids, status=DocListManager.Status.deleting) - docs = self._manager.update_file_status(file_ids=request.file_ids, status=DocListManager.Status.deleting) - - for doc in docs: - print("DELETE doc:", doc) - if os.path.exists(path := doc[1]): + document_list = self._manager.delete_files(request.file_ids) + for doc in document_list: + print("DELETE doc:", doc.path) + if os.path.exists(path := doc.path): os.remove(path) - - self._manager.update_file_status(file_ids=request.file_ids, status=DocListManager.Status.deleted) return BaseResponse() except Exception as e: return BaseResponse(code=500, msg=str(e), data=None) diff --git a/lazyllm/tools/rag/utils.py b/lazyllm/tools/rag/utils.py index 6341f096..b6c54e7a 100644 --- a/lazyllm/tools/rag/utils.py +++ b/lazyllm/tools/rag/utils.py @@ -12,6 +12,7 @@ import sqlalchemy from sqlalchemy.orm import DeclarativeBase from sqlalchemy import Column, insert, update, select, delete +from sqlalchemy.orm import sessionmaker import uuid import pydantic @@ -40,28 +41,30 @@ def gen_docid_wo_dlm(file_path: str) -> str: return hashlib.sha256(file_path.encode()).hexdigest() +def gen_docid(file_path: str) -> str: + return hashlib.sha256(file_path.encode()).hexdigest() + + class KBDataBase(DeclarativeBase): pass -class KBPrcessedFile(KBDataBase): - __tablename__ = "processed_file" - - path = Column(sqlalchemy.Text, nullable=False, primary_key=True) - meta = Column(sqlalchemy.Text, nullable=True) +class KBOperationLogs(KBDataBase): + __tablename__ = "operation_logs" + id = Column(sqlalchemy.Integer, primary_key=True, autoincrement=True) + log = Column(sqlalchemy.Text, nullable=False) created_at = Column(sqlalchemy.DateTime, default=sqlalchemy.func.now(), nullable=False) class KBDocument(KBDataBase): __tablename__ = "documents" - doc_id = Column(sqlalchemy.String(36), primary_key=True) + doc_id = Column(sqlalchemy.Text, primary_key=True) filename = Column(sqlalchemy.Text, nullable=False, index=True) - path = Column(sqlalchemy.Text, nullable=False, index=True) + path = Column(sqlalchemy.Text, nullable=False) created_at = Column(sqlalchemy.DateTime, default=sqlalchemy.func.now(), nullable=False) meta = Column(sqlalchemy.Text, nullable=True) status = Column(sqlalchemy.Text, nullable=False, index=True) - # active = Column(sqlalchemy.Boolean, default=True) count = Column(sqlalchemy.Integer, default=0) @@ -97,6 +100,7 @@ class Status: deleting = 'deleting' deleted = 'deleted' + DELETE_SAFE_STATUS_LIST = [Status.waiting, Status.success, Status.failed] def __init__(self, path, name): self._path = path self._name = name @@ -121,13 +125,16 @@ def init_tables(self) -> 'DocListManager': self.add_files(files_list, status=DocListManager.Status.success) return self - def delete_files(self, file_ids: List[str], real: bool = False): - self.update_kb_group_file_status(file_ids=file_ids, status=DocListManager.Status.deleting) - self._delete_files(file_ids, real) - - @abstractmethod - def get_active_docid(self, file_path: str) -> str: - pass + def delete_files(self, file_ids: List[str]) -> List[KBDocument]: + print("Delete files:", file_ids) + document_list = self.update_file_status_by_cond( + file_ids, self.DELETE_SAFE_STATUS_LIST, DocListManager.Status.deleting + ) + print("document_list: ", document_list) + safe_delete_ids = [doc.doc_id for doc in document_list] + print("Input file ids:", file_ids, ", safe delete ids:", safe_delete_ids) + self.update_kb_group_file_status(file_ids=safe_delete_ids, status=DocListManager.Status.deleting) + return document_list @abstractmethod def table_inited(self): @@ -170,14 +177,18 @@ def list_kb_group_files( def add_files(self, files: List[str], metadatas: Optional[List[Dict[str, Any]]] = None, status: Optional[str] = Status.waiting, batch_size: int = 64) -> List[str]: - ids, filepaths = self._add_files(files, metadatas, status, batch_size) + ids = self._add_files(files, metadatas, status, batch_size) self.add_files_to_kb_group(ids, group=DocListManager.DEFAULT_GROUP_NAME) - return ids, filepaths + return ids @abstractmethod def get_filepaths(self, file_ids: List[str]): pass + @abstractmethod + def delete_obsolete_files(self): + pass + @abstractmethod def _add_files( self, @@ -193,7 +204,9 @@ def update_file_message(self, fileid: str, **kw): pass @abstractmethod - def get_safe_delete_files(self): + def update_file_status_by_cond( + self, file_ids: List[str], cond_status_list: Union[None, List[str]], new_status: str + ) -> List[KBDocument]: pass @abstractmethod @@ -238,28 +251,11 @@ def __init__(self, path, name): self._engine = sqlalchemy.create_engine( f"sqlite:///{self._db_path}?check_same_thread={self._check_same_thread}" ) + self._Session = sessionmaker(bind=self._engine) def _init_tables(self): KBDataBase.metadata.create_all(bind=self._engine) - def get_active_docid(self, file_path: str) -> str: - doc_id = "" - with self._db_lock, self._engine.connect() as conn: - stmt = select(KBDocument.doc_id).where(KBDocument.path == file_path, KBDocument.status != "deleted") - result = conn.execute(stmt) - returned_values = result.fetchall() - if len(returned_values) == 0: - lazyllm.LOG.warning(f"No active docid for {file_path}") - elif len(returned_values) == 1: - print(f"Found 1 active docid for {file_path}: {returned_values[0].doc_id}") - doc_id = returned_values[0].doc_id - else: - doc_id = returned_values[0].doc_id - lazyllm.LOG.warning(f"Get len(returned_values) active docids:") - if "conn" in locals(): - conn.close() - return doc_id - def table_inited(self): with self._db_lock, sqlite3.connect(self._db_path, check_same_thread=self._check_same_thread) as conn: cursor = conn.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='documents'") @@ -358,35 +354,32 @@ def list_kb_group_files(self, group: str = None, limit: Optional[int] = None, de def get_filepaths(self, file_ids: List[str]) -> List[str]: pass + def delete_obsolete_files(self): + with self._db_lock, self._Session() as session: + docs_to_delete = ( + session.query(KBDocument) + .filter(KBDocument.status == DocListManager.Status.deleting, KBDocument.count == 0) + .all() + ) + for doc in docs_to_delete: + session.delete(doc) + log = KBOperationLogs(log=f"Delete obsolete file: {doc.doc_id}") + session.add(log) + session.commit() + def _add_files(self, files: List[str], metadatas: Optional[List[Dict[str, Any]]] = None, status: Optional[str] = DocListManager.Status.waiting, batch_size: int = 64): ids = [] - filepaths = [] if not files: - return ids, filepaths - - newly_added_files = [], meta_str_list = [] - with self._db_lock, self._engine.connect() as conn: - vals = [{KBPrcessedFile.path: ele, KBPrcessedFile.meta} for ele in files] - result = conn.execute( - insert(KBPrcessedFile) - .values(vals) - .prefix_with('OR IGNORE') - .returning(KBPrcessedFile.path, KBPrcessedFile.meta) - ) - conn.commit() - returned_values = result.fetchall() - newly_added_files = [row.path for row in returned_values] - meta_str_list = [row.meta for row in returned_values] - - files = newly_added_files + return [] + for i in range(0, len(files), batch_size): batch_files = files[i:i + batch_size] batch_metadatas = metadatas[i:i + batch_size] if metadatas else None vals = [] for i, file_path in enumerate(batch_files): - doc_id = str(uuid.uuid4()) + doc_id = gen_docid(file_path) metadata = batch_metadatas[i].copy() if batch_metadatas else {} metadata.setdefault(RAG_DOC_ID, doc_id) @@ -399,23 +392,17 @@ def _add_files(self, files: List[str], metadatas: Optional[List[Dict[str, Any]]] KBDocument.path.name: file_path, KBDocument.meta.name: json.dumps(metadata), KBDocument.status.name: status, - KBDocument.count.name: 1, + KBDocument.count.name: 0, } ) - with self._db_lock, self._engine.connect() as conn: - result = conn.execute( - insert(KBDocument) - .values(vals) - .prefix_with('OR IGNORE') - .returning(KBDocument.doc_id, KBDocument.path) + with self._db_lock, self._Session() as session: + result = session.execute( + insert(KBDocument).values(vals).prefix_with('OR IGNORE').returning(KBDocument.doc_id) ) returned_values = result.fetchall() - conn.commit() + session.commit() ids.extend([ele.doc_id for ele in returned_values]) - filepaths.extend([ele.path for ele in returned_values]) - if "conn" in locals(): - conn.close() - return ids, filepaths + return ids # TODO(wangzhihong): set to metadatas and enable this function def update_file_message(self, fileid: str, **kw): @@ -425,25 +412,44 @@ def update_file_message(self, fileid: str, **kw): conn.execute(f"UPDATE documents SET {set_clause} WHERE doc_id = ?", params) conn.commit() - def get_safe_delete_files(self): - ids = [] - with self._db_lock, self._engine.connect() as conn: - stmt = select(KBDocument.doc_id).where(KBDocument.status.in_(["success", "failed"])) - result = conn.execute(stmt) - returned_values = result.fetchall() - ids = [ele.doc_id for ele in returned_values] - if "conn" in locals(): - conn.close() - return ids + def update_file_status_by_cond( + self, file_ids: List[str], cond_status_list: Union[None, List[str]], new_status: str + ) -> List[KBDocument]: + rows = [] + if cond_status_list is None: + sql_cond = KBDocument.doc_id.in_(file_ids) + else: + sql_cond = sqlalchemy.and_(KBDocument.status.in_(cond_status_list), KBDocument.doc_id.in_(file_ids)) + with self._db_lock, self._Session() as session: + stmt = ( + update(KBDocument) + .where(sql_cond) + .values(status=new_status) + .returning(KBDocument.doc_id, KBDocument.path) + ) + print("raw sql: ", str(stmt.compile(compile_kwargs={"literal_binds": True}))) + rows = session.execute(stmt).fetchall() + session.commit() + return rows def add_files_to_kb_group(self, file_ids: List[str], group: str): - with self._db_lock, sqlite3.connect(self._db_path, check_same_thread=self._check_same_thread) as conn: + with self._db_lock, self._Session() as session: + vals = [] for doc_id in file_ids: - conn.execute(""" - INSERT OR IGNORE INTO kb_group_documents (doc_id, group_name, status) - VALUES (?, ?, ?) - """, (doc_id, group, DocListManager.Status.waiting)) - conn.commit() + vals = { + KBGroupDocuments.doc_id.name: doc_id, + KBGroupDocuments.group_name.name: group, + KBGroupDocuments.status.name: DocListManager.Status.waiting, + } + rows = session.execute( + insert(KBGroupDocuments).values(vals).prefix_with('OR IGNORE').returning(KBGroupDocuments.doc_id) + ).fetchall() + session.commit() + if not rows: + continue + doc = session.query(KBDocument).filter_by(doc_id=rows[0].doc_id).one() + doc.count += 1 + session.commit() def _delete_files(self, file_ids: List[str], real: bool = False): if not real: @@ -454,46 +460,45 @@ def _delete_files(self, file_ids: List[str], real: bool = False): conn.commit() def delete_files_from_kb_group(self, file_ids: List[str], group: str): - with self._db_lock, sqlite3.connect(self._db_path, check_same_thread=self._check_same_thread) as conn: + with self._db_lock, self._Session() as session: for doc_id in file_ids: - conn.execute("UPDATE kb_group_documents SET status = ? WHERE doc_id = ? AND group_name = ?", - (DocListManager.Status.deleted, doc_id, group)) - conn.commit() - - def get_file_status(self, fileid: str): - with self._db_lock, sqlite3.connect(self._db_path, check_same_thread=self._check_same_thread) as conn: - cursor = conn.execute("SELECT status FROM documents WHERE doc_id = ?", (fileid,)) - return cursor.fetchone() + vals = { + KBGroupDocuments.status.name: DocListManager.Status.deleted, + } + cond = sqlalchemy.and_(KBGroupDocuments.doc_id == doc_id, KBGroupDocuments.group_name == group) + rows = session.execute( + update(KBGroupDocuments).where(cond).values(vals).returning(KBGroupDocuments.doc_id) + ).fetchall() + session.commit() + if not rows: + continue + doc = session.query(KBDocument).filter_by(doc_id=rows[0].doc_id).one() + doc.count = max(0, doc.count - 1) + session.commit() def update_file_status(self, file_ids: List[str], status: str, batch_size: int = 64) -> List[Tuple[str, str]]: updated_files = [] - with self._db_lock, self._engine.connect() as conn: - for i in range(0, len(file_ids), batch_size): - ids = file_ids[i : i + batch_size] - stmt = ( - update(KBDocument) - .where(KBDocument.doc_id.in_(ids)) - .values(status=status) - .returning(KBDocument.doc_id, KBDocument.path) - ) - result = conn.execute(stmt) - returned_values = result.fetchall() - if status == "deleted": - filepaths = [row.path for row in returned_values] - stmt = delete(KBPrcessedFile).where(KBPrcessedFile.path.in_(filepaths)) - conn.execute(stmt) + + for i in range(0, len(file_ids), batch_size): + batch = file_ids[i : i + batch_size] + placeholders = ', '.join('?' for _ in batch) + sql = f'UPDATE documents SET status = ? WHERE doc_id IN ({placeholders}) RETURNING doc_id, path' + + with self._db_lock, sqlite3.connect(self._db_path, check_same_thread=self._check_same_thread) as conn: + cursor = conn.execute(sql, [status] + batch) + updated_files.extend(cursor.fetchall()) conn.commit() - updated_files.extend([(row.doc_id, row.path) for row in returned_values]) - if "conn" in locals(): - conn.close() return updated_files + def get_file_status(self, fileid: str): + with self._db_lock, sqlite3.connect(self._db_path, check_same_thread=self._check_same_thread) as conn: + cursor = conn.execute("SELECT status FROM documents WHERE doc_id = ?", (fileid,)) + return cursor.fetchone() + def update_kb_group_file_status(self, file_ids: Union[str, List[str]], status: str, group: Optional[str] = None): - if isinstance(file_ids, str): file_ids = [file_ids] - if "status" == "deleted": - query, params = 'UPDATE kb_group_documents SET doc_id = "deleted_doc", status = ? WHERE ', [status] - else: - query, params = 'UPDATE kb_group_documents SET status = ? WHERE ', [status] + if isinstance(file_ids, str): + file_ids = [file_ids] + query, params = 'UPDATE kb_group_documents SET status = ? WHERE ', [status] if group: query += 'group_name = ? AND ' params.append(group) @@ -507,7 +512,7 @@ def release(self): conn.execute('delete from documents') conn.execute('delete from document_groups') conn.execute('delete from kb_group_documents') - conn.execute('delete from processed_file') + conn.execute('delete from operation_logs') conn.commit() def __reduce__(self): diff --git a/tests/basic_tests/test_doc_manager.py b/tests/basic_tests/test_doc_manager.py index 61fcb7bd..23ee766a 100644 --- a/tests/basic_tests/test_doc_manager.py +++ b/tests/basic_tests/test_doc_manager.py @@ -86,7 +86,7 @@ def test_delete_files(self): self.manager.delete_files([hashlib.sha256(f'{self.test_file_1}'.encode()).hexdigest()]) files_list = self.manager.list_files(details=True) assert len(files_list) == 2 - files_list = self.manager.list_files(details=True, exclude_status=DocListManager.Status.deleted) + files_list = self.manager.list_files(details=True, exclude_status=DocListManager.Status.deleting) assert len(files_list) == 1 assert not any(self.test_file_1.endswith(row[1]) for row in files_list) @@ -95,10 +95,10 @@ def test_update_file_message(self): self.manager.add_files([self.test_file_1]) file_id = hashlib.sha256(f'{self.test_file_1}'.encode()).hexdigest() - self.manager.update_file_message(file_id, metadata="New metadata", status="processed") + self.manager.update_file_message(file_id, meta="New metadata", status="processed") conn = sqlite3.connect(self.manager._db_path) - cursor = conn.execute("SELECT metadata, status FROM documents WHERE doc_id = ?", (file_id,)) + cursor = conn.execute("SELECT meta, status FROM documents WHERE doc_id = ?", (file_id,)) row = cursor.fetchone() conn.close() @@ -125,13 +125,11 @@ def test_add_files_to_kb_group(self): files_list = self.manager.list_kb_group_files("group1", details=True) assert len(files_list) == 0 - print("\nCall add_files in unittest") - doc_ids = [self.manager.get_active_docid(ele) for ele in [self.test_file_1, self.test_file_2]] - print("doc_ids : ", doc_ids) + self.manager.add_files([self.test_file_1, self.test_file_2]) files_list = self.manager.list_kb_group_files("group1", details=True) assert len(files_list) == 0 - self.manager.add_files_to_kb_group(doc_ids, group="group1") + self.manager.add_files_to_kb_group(get_fid([self.test_file_1, self.test_file_2]), group="group1") files_list = self.manager.list_kb_group_files("group1", details=True) assert len(files_list) == 2 From bf77195c637f63e4e642918c33d860e58a4c48ad Mon Sep 17 00:00:00 2001 From: zhangyongchao Date: Thu, 19 Dec 2024 20:20:03 +0800 Subject: [PATCH 05/10] fix lint error --- lazyllm/tools/rag/doc_impl.py | 2 +- lazyllm/tools/rag/utils.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/lazyllm/tools/rag/doc_impl.py b/lazyllm/tools/rag/doc_impl.py index 5ea593d5..ecdf7215 100644 --- a/lazyllm/tools/rag/doc_impl.py +++ b/lazyllm/tools/rag/doc_impl.py @@ -250,7 +250,7 @@ def _add_files(self, input_files: List[str], ids: Optional[List[str]] = None, root_nodes = self._reader.load_data(input_files) for idx, node in enumerate(root_nodes): node.global_metadata = metadatas[idx].copy() if metadatas else {} - node.global_metadata[RAG_DOC_ID] = ids[idx] if ids else gen_docid(input_files[idx]) + node.global_metadata[RAG_DOC_ID] = ids[idx] if ids else gen_docid(input_files[idx]) node.global_metadata[RAG_DOC_PATH] = input_files[idx] temp_store = self._create_store({"type": "map"}) temp_store.update_nodes(root_nodes) diff --git a/lazyllm/tools/rag/utils.py b/lazyllm/tools/rag/utils.py index b6c54e7a..673ffac6 100644 --- a/lazyllm/tools/rag/utils.py +++ b/lazyllm/tools/rag/utils.py @@ -11,9 +11,8 @@ from lazyllm.common.queue import sqlite3_check_threadsafety import sqlalchemy from sqlalchemy.orm import DeclarativeBase -from sqlalchemy import Column, insert, update, select, delete +from sqlalchemy import Column, insert, update from sqlalchemy.orm import sessionmaker -import uuid import pydantic import sqlite3 @@ -101,6 +100,7 @@ class Status: deleted = 'deleted' DELETE_SAFE_STATUS_LIST = [Status.waiting, Status.success, Status.failed] + def __init__(self, path, name): self._path = path self._name = name @@ -480,7 +480,7 @@ def update_file_status(self, file_ids: List[str], status: str, batch_size: int = updated_files = [] for i in range(0, len(file_ids), batch_size): - batch = file_ids[i : i + batch_size] + batch = file_ids[i: i + batch_size] placeholders = ', '.join('?' for _ in batch) sql = f'UPDATE documents SET status = ? WHERE doc_id IN ({placeholders}) RETURNING doc_id, path' From f44c3bde00cecee6e1f88cddc4e81c2049bdfc92 Mon Sep 17 00:00:00 2001 From: zhangyongchao Date: Thu, 19 Dec 2024 20:28:00 +0800 Subject: [PATCH 06/10] restore old one-line style --- lazyllm/tools/rag/utils.py | 43 +++++++++++++------------------------- 1 file changed, 15 insertions(+), 28 deletions(-) diff --git a/lazyllm/tools/rag/utils.py b/lazyllm/tools/rag/utils.py index 673ffac6..37045432 100644 --- a/lazyllm/tools/rag/utils.py +++ b/lazyllm/tools/rag/utils.py @@ -137,12 +137,10 @@ def delete_files(self, file_ids: List[str]) -> List[KBDocument]: return document_list @abstractmethod - def table_inited(self): - pass + def table_inited(self): pass @abstractmethod - def _init_tables(self): - pass + def _init_tables(self): pass @abstractmethod def list_files( @@ -155,12 +153,10 @@ def list_files( pass @abstractmethod - def list_all_kb_group(self): - pass + def list_all_kb_group(self): pass @abstractmethod - def add_kb_group(self, name): - pass + def add_kb_group(self, name): pass @abstractmethod def list_kb_group_files( @@ -182,12 +178,10 @@ def add_files(self, files: List[str], metadatas: Optional[List[Dict[str, Any]]] return ids @abstractmethod - def get_filepaths(self, file_ids: List[str]): - pass + def get_filepaths(self, file_ids: List[str]): pass @abstractmethod - def delete_obsolete_files(self): - pass + def delete_obsolete_files(self): pass @abstractmethod def _add_files( @@ -200,8 +194,7 @@ def _add_files( pass @abstractmethod - def update_file_message(self, fileid: str, **kw): - pass + def update_file_message(self, fileid: str, **kw): pass @abstractmethod def update_file_status_by_cond( @@ -210,32 +203,26 @@ def update_file_status_by_cond( pass @abstractmethod - def add_files_to_kb_group(self, file_ids: List[str], group: str): - pass + def add_files_to_kb_group(self, file_ids: List[str], group: str): pass @abstractmethod - def _delete_files(self, file_ids: List[str], real: bool = False): - pass + def _delete_files(self, file_ids: List[str], real: bool = False): pass @abstractmethod - def delete_files_from_kb_group(self, file_ids: List[str], group: str): - pass + def delete_files_from_kb_group(self, file_ids: List[str], group: str): pass @abstractmethod - def get_file_status(self, fileid: str): - pass + def get_file_status(self, fileid: str): pass @abstractmethod - def update_file_status(self, file_ids: List[str], status: str, batch_size: int = 64) -> List[Tuple[str, str]]: - pass + def update_file_status(self, file_ids: List[str], status: str, batch_size: int = 64) -> List[Tuple[str, str]]: pass @abstractmethod - def update_kb_group_file_status(self, file_ids: Union[str, List[str]], status: str, group: Optional[str] = None): - pass + def update_kb_group_file_status(self, file_ids: Union[str, List[str]], + status: str, group: Optional[str] = None): pass @abstractmethod - def release(self): - pass + def release(self): pass class SqliteDocListManager(DocListManager): From 134c9d65866556bd3d4816faeb788d0f46a816c8 Mon Sep 17 00:00:00 2001 From: zhangyongchao Date: Thu, 19 Dec 2024 20:32:44 +0800 Subject: [PATCH 07/10] remove function comment --- lazyllm/tools/rag/doc_manager.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/lazyllm/tools/rag/doc_manager.py b/lazyllm/tools/rag/doc_manager.py index 1446c71f..841998fe 100644 --- a/lazyllm/tools/rag/doc_manager.py +++ b/lazyllm/tools/rag/doc_manager.py @@ -14,10 +14,6 @@ def gen_unique_filepaths(ori_filepath: str) -> str: - """ - 根据传入的 base_filename 确保生成唯一的 filepath。 - 如果存在冲突,则在文件名后添加计数,直到找到唯一值。 - """ assert not os.path.exists(ori_filepath), f"file already exists: {ori_filepath}" if not os.path.exists(ori_filepath): return ori_filepath From db63373f0dc2a2c8f85e5a8a60e1842d234f6fea Mon Sep 17 00:00:00 2001 From: zhangyongchao Date: Thu, 19 Dec 2024 20:37:47 +0800 Subject: [PATCH 08/10] remove print --- lazyllm/tools/rag/doc_manager.py | 1 - lazyllm/tools/rag/utils.py | 5 ----- 2 files changed, 6 deletions(-) diff --git a/lazyllm/tools/rag/doc_manager.py b/lazyllm/tools/rag/doc_manager.py index 841998fe..471c95c9 100644 --- a/lazyllm/tools/rag/doc_manager.py +++ b/lazyllm/tools/rag/doc_manager.py @@ -69,7 +69,6 @@ def upload_files(self, files: List[UploadFile], override: bool = False, # noqa file_paths = [os.path.join(self._manager._path, user_path or '', file.filename) for file in files] file_paths = [gen_unique_filepaths(ele) for ele in file_paths] - print("FILE PATHS", file_paths) ids = self._manager.add_files(file_paths, metadatas=metadatas, status=DocListManager.Status.working) assert len(files) == len(ids), "len(files) uploaded vs len(ids) recored" results = [] diff --git a/lazyllm/tools/rag/utils.py b/lazyllm/tools/rag/utils.py index 37045432..fd544259 100644 --- a/lazyllm/tools/rag/utils.py +++ b/lazyllm/tools/rag/utils.py @@ -126,13 +126,10 @@ def init_tables(self) -> 'DocListManager': return self def delete_files(self, file_ids: List[str]) -> List[KBDocument]: - print("Delete files:", file_ids) document_list = self.update_file_status_by_cond( file_ids, self.DELETE_SAFE_STATUS_LIST, DocListManager.Status.deleting ) - print("document_list: ", document_list) safe_delete_ids = [doc.doc_id for doc in document_list] - print("Input file ids:", file_ids, ", safe delete ids:", safe_delete_ids) self.update_kb_group_file_status(file_ids=safe_delete_ids, status=DocListManager.Status.deleting) return document_list @@ -231,7 +228,6 @@ def __init__(self, path, name): root_dir = os.path.expanduser(os.path.join(config['home'], '.dbs')) os.makedirs(root_dir, exist_ok=True) self._db_path = os.path.join(root_dir, f'.lazyllm_dlmanager.{self._id}.db') - print(f"Database path: {self._db_path}") self._db_lock = FileLock(self._db_path + '.lock') # ensure that this connection is not used in another thread when sqlite3 is not threadsafe self._check_same_thread = not sqlite3_check_threadsafety() @@ -414,7 +410,6 @@ def update_file_status_by_cond( .values(status=new_status) .returning(KBDocument.doc_id, KBDocument.path) ) - print("raw sql: ", str(stmt.compile(compile_kwargs={"literal_binds": True}))) rows = session.execute(stmt).fetchall() session.commit() return rows From 7447213979539846ff3284257a4ed24d5fc5f48f Mon Sep 17 00:00:00 2001 From: zhangyongchao Date: Fri, 20 Dec 2024 11:22:38 +0800 Subject: [PATCH 09/10] restore to old-one-line style --- lazyllm/tools/rag/data_loaders.py | 2 +- lazyllm/tools/rag/doc_impl.py | 2 +- lazyllm/tools/rag/doc_manager.py | 4 +- lazyllm/tools/rag/document.py | 6 +-- lazyllm/tools/rag/utils.py | 63 +++++++++---------------------- 5 files changed, 22 insertions(+), 55 deletions(-) diff --git a/lazyllm/tools/rag/data_loaders.py b/lazyllm/tools/rag/data_loaders.py index 7c12a1a4..3c8c0fcd 100644 --- a/lazyllm/tools/rag/data_loaders.py +++ b/lazyllm/tools/rag/data_loaders.py @@ -16,7 +16,7 @@ def load_data(self, input_files: Optional[List[str]] = None) -> List[DocNode]: file_readers = self._local_readers.copy() for key, func in self._global_readers.items(): if key not in file_readers: file_readers[key] = func - # LOG.info(f"DirectoryReader loads data, input files: {input_files}") + LOG.info(f"DirectoryReader loads data, input files: {input_files}") reader = SimpleDirectoryReader(input_files=input_files, file_extractor=file_readers) nodes: List[DocNode] = [] for doc in reader(): diff --git a/lazyllm/tools/rag/doc_impl.py b/lazyllm/tools/rag/doc_impl.py index ecdf7215..25398a31 100644 --- a/lazyllm/tools/rag/doc_impl.py +++ b/lazyllm/tools/rag/doc_impl.py @@ -255,7 +255,7 @@ def _add_files(self, input_files: List[str], ids: Optional[List[str]] = None, temp_store = self._create_store({"type": "map"}) temp_store.update_nodes(root_nodes) all_groups = self.store.all_groups() - # LOG.info(f"add_files: Trying to merge store with {all_groups}") + LOG.info(f"add_files: Trying to merge store with {all_groups}") for group in all_groups: if group != LAZY_ROOT_NAME and not self.store.is_group_active(group): continue diff --git a/lazyllm/tools/rag/doc_manager.py b/lazyllm/tools/rag/doc_manager.py index 471c95c9..4275a40f 100644 --- a/lazyllm/tools/rag/doc_manager.py +++ b/lazyllm/tools/rag/doc_manager.py @@ -70,7 +70,6 @@ def upload_files(self, files: List[UploadFile], override: bool = False, # noqa file_paths = [os.path.join(self._manager._path, user_path or '', file.filename) for file in files] file_paths = [gen_unique_filepaths(ele) for ele in file_paths] ids = self._manager.add_files(file_paths, metadatas=metadatas, status=DocListManager.Status.working) - assert len(files) == len(ids), "len(files) uploaded vs len(ids) recored" results = [] for file, path in zip(files, file_paths): if os.path.exists(path): @@ -120,8 +119,7 @@ def add_files(self, files: List[str] = Body(...), exist_id = exists_files_info.get(file, None) if exist_id: update_kws = dict(fileid=exist_id, status=DocListManager.Status.success) - if metadatas: - update_kws["meta"] = json.dumps(metadatas[idx]) + if metadatas: update_kws["meta"] = json.dumps(metadatas[idx]) self._manager.update_file_message(**update_kws) exist_ids.append(exist_id) id_mapping[file] = exist_id diff --git a/lazyllm/tools/rag/document.py b/lazyllm/tools/rag/document.py index 00cef393..da93e6c6 100644 --- a/lazyllm/tools/rag/document.py +++ b/lazyllm/tools/rag/document.py @@ -88,12 +88,10 @@ def create_kb_group(self, name: str, doc_fields: Optional[Dict[str, DocField]] = return doc @property - def _impl(self): - return self._impls.get_doc_by_kb_group(self._curr_group) + def _impl(self): return self._impls.get_doc_by_kb_group(self._curr_group) @property - def manager(self): - return getattr(self._impls, '_manager', None) + def manager(self): return getattr(self._impls, '_manager', None) @DynamicDescriptor def create_node_group(self, name: str = None, *, transform: Callable, parent: str = LAZY_ROOT_NAME, diff --git a/lazyllm/tools/rag/utils.py b/lazyllm/tools/rag/utils.py index fd544259..a75ad785 100644 --- a/lazyllm/tools/rag/utils.py +++ b/lazyllm/tools/rag/utils.py @@ -35,11 +35,6 @@ config.add("default_dlmanager", str, "sqlite", "DEFAULT_DOCLIST_MANAGER") - -def gen_docid_wo_dlm(file_path: str) -> str: - return hashlib.sha256(file_path.encode()).hexdigest() - - def gen_docid(file_path: str) -> str: return hashlib.sha256(file_path.encode()).hexdigest() @@ -140,14 +135,9 @@ def table_inited(self): pass def _init_tables(self): pass @abstractmethod - def list_files( - self, - limit: Optional[int] = None, - details: bool = False, - status: Union[str, List[str]] = Status.all, - exclude_status: Optional[Union[str, List[str]]] = None, - ): - pass + def list_files(self, limit: Optional[int] = None, details: bool = False, + status: Union[str, List[str]] = Status.all, + exclude_status: Optional[Union[str, List[str]]] = None): pass @abstractmethod def list_all_kb_group(self): pass @@ -156,17 +146,11 @@ def list_all_kb_group(self): pass def add_kb_group(self, name): pass @abstractmethod - def list_kb_group_files( - self, - group: str = None, - limit: Optional[int] = None, - details: bool = False, - status: Union[str, List[str]] = Status.all, - exclude_status: Optional[Union[str, List[str]]] = None, - upload_status: Union[str, List[str]] = Status.all, - exclude_upload_status: Optional[Union[str, List[str]]] = None, - ): - pass + def list_kb_group_files(self, group: str = None, limit: Optional[int] = None, details: bool = False, + status: Union[str, List[str]] = Status.all, + exclude_status: Optional[Union[str, List[str]]] = None, + upload_status: Union[str, List[str]] = Status.all, + exclude_upload_status: Optional[Union[str, List[str]]] = None): pass def add_files(self, files: List[str], metadatas: Optional[List[Dict[str, Any]]] = None, status: Optional[str] = Status.waiting, batch_size: int = 64) -> List[str]: @@ -175,21 +159,12 @@ def add_files(self, files: List[str], metadatas: Optional[List[Dict[str, Any]]] return ids @abstractmethod - def get_filepaths(self, file_ids: List[str]): pass + def _add_files(self, files: List[str], metadatas: Optional[List] = None, + status: Optional[str] = Status.waiting, batch_size: int = 64) -> List[str]: pass @abstractmethod def delete_obsolete_files(self): pass - @abstractmethod - def _add_files( - self, - files: List[str], - metadatas: Optional[List] = None, - status: Optional[str] = Status.waiting, - batch_size: int = 64, - ) -> List[str]: - pass - @abstractmethod def update_file_message(self, fileid: str, **kw): pass @@ -334,9 +309,6 @@ def list_kb_group_files(self, group: str = None, limit: Optional[int] = None, de if not details: return [row[:2] for row in rows] return rows - def get_filepaths(self, file_ids: List[str]) -> List[str]: - pass - def delete_obsolete_files(self): with self._db_lock, self._Session() as session: docs_to_delete = ( @@ -458,11 +430,16 @@ def delete_files_from_kb_group(self, file_ids: List[str], group: str): doc.count = max(0, doc.count - 1) session.commit() + def get_file_status(self, fileid: str): + with self._db_lock, sqlite3.connect(self._db_path, check_same_thread=self._check_same_thread) as conn: + cursor = conn.execute("SELECT status FROM documents WHERE doc_id = ?", (fileid,)) + return cursor.fetchone() + def update_file_status(self, file_ids: List[str], status: str, batch_size: int = 64) -> List[Tuple[str, str]]: updated_files = [] for i in range(0, len(file_ids), batch_size): - batch = file_ids[i: i + batch_size] + batch = file_ids[i:i + batch_size] placeholders = ', '.join('?' for _ in batch) sql = f'UPDATE documents SET status = ? WHERE doc_id IN ({placeholders}) RETURNING doc_id, path' @@ -472,14 +449,8 @@ def update_file_status(self, file_ids: List[str], status: str, batch_size: int = conn.commit() return updated_files - def get_file_status(self, fileid: str): - with self._db_lock, sqlite3.connect(self._db_path, check_same_thread=self._check_same_thread) as conn: - cursor = conn.execute("SELECT status FROM documents WHERE doc_id = ?", (fileid,)) - return cursor.fetchone() - def update_kb_group_file_status(self, file_ids: Union[str, List[str]], status: str, group: Optional[str] = None): - if isinstance(file_ids, str): - file_ids = [file_ids] + if isinstance(file_ids, str): file_ids = [file_ids] query, params = 'UPDATE kb_group_documents SET status = ? WHERE ', [status] if group: query += 'group_name = ? AND ' From 3123e37cfb6fa360cc563534278d1b5562583402 Mon Sep 17 00:00:00 2001 From: zhangyongchao Date: Fri, 20 Dec 2024 17:31:14 +0800 Subject: [PATCH 10/10] Add response detail --- lazyllm/tools/rag/doc_manager.py | 53 ++++++++++++++------------- lazyllm/tools/rag/utils.py | 43 +++++++++++++--------- tests/basic_tests/test_doc_manager.py | 12 ++++++ 3 files changed, 66 insertions(+), 42 deletions(-) diff --git a/lazyllm/tools/rag/doc_manager.py b/lazyllm/tools/rag/doc_manager.py index 4275a40f..40f9fdcb 100644 --- a/lazyllm/tools/rag/doc_manager.py +++ b/lazyllm/tools/rag/doc_manager.py @@ -14,7 +14,6 @@ def gen_unique_filepaths(ori_filepath: str) -> str: - assert not os.path.exists(ori_filepath), f"file already exists: {ori_filepath}" if not os.path.exists(ori_filepath): return ori_filepath directory, filename = os.path.split(ori_filepath) @@ -69,28 +68,31 @@ def upload_files(self, files: List[UploadFile], override: bool = False, # noqa file_paths = [os.path.join(self._manager._path, user_path or '', file.filename) for file in files] file_paths = [gen_unique_filepaths(ele) for ele in file_paths] - ids = self._manager.add_files(file_paths, metadatas=metadatas, status=DocListManager.Status.working) - results = [] + # when adding same file which is deleting, it will be ignored + documents = self._manager.add_files(file_paths, metadatas=metadatas, status=DocListManager.Status.working) + ids, results = [], [] + rev_index = {ele.path: i for i, ele in enumerate(documents)} for file, path in zip(files, file_paths): - if os.path.exists(path): - if not override: - results.append('Duplicated') - continue - - try: - content = file.file.read() - directory = os.path.dirname(path) - if directory: - os.makedirs(directory, exist_ok=True) - - with open(path, 'wb') as f: - f.write(content) - except Exception as e: - lazyllm.LOG.error(f'writing file [{path}] to disk failed: [{e}]') - raise e file_id = gen_docid(path) - self._manager.update_file_status([file_id], status=DocListManager.Status.success) - results.append('Success') + result = "Failed" + if path not in rev_index: + result = "Failed, wait for deleting finished" + else: + try: + content = file.file.read() + directory = os.path.dirname(path) + if directory: + os.makedirs(directory, exist_ok=True) + + with open(path, 'wb') as f: + f.write(content) + except Exception as e: + lazyllm.LOG.error(f'writing file [{path}] to disk failed: [{e}]') + raise e + self._manager.update_file_status([file_id], status=DocListManager.Status.success) + result = "Success" + ids.append(file_id) + results.append(result) return BaseResponse(data=[ids, results]) except Exception as e: @@ -192,12 +194,13 @@ def delete_files(self, request: FileGroupRequest): if request.group_name: return self.delete_files_from_group(request) else: - document_list = self._manager.delete_files(request.file_ids) - for doc in document_list: - print("DELETE doc:", doc.path) + documents = self._manager.delete_files(request.file_ids) + deleted_ids = set([ele.doc_id for ele in documents]) + for doc in documents: if os.path.exists(path := doc.path): os.remove(path) - return BaseResponse() + results = ["Success" if ele.doc_id in deleted_ids else "Failed" for ele in documents] + return BaseResponse(data=[request.file_ids, results]) except Exception as e: return BaseResponse(code=500, msg=str(e), data=None) diff --git a/lazyllm/tools/rag/utils.py b/lazyllm/tools/rag/utils.py index a75ad785..94a4c2c4 100644 --- a/lazyllm/tools/rag/utils.py +++ b/lazyllm/tools/rag/utils.py @@ -11,7 +11,7 @@ from lazyllm.common.queue import sqlite3_check_threadsafety import sqlalchemy from sqlalchemy.orm import DeclarativeBase -from sqlalchemy import Column, insert, update +from sqlalchemy import Column, insert, update, Row from sqlalchemy.orm import sessionmaker import pydantic @@ -50,6 +50,7 @@ class KBOperationLogs(KBDataBase): created_at = Column(sqlalchemy.DateTime, default=sqlalchemy.func.now(), nullable=False) +DocPartRow = Row class KBDocument(KBDataBase): __tablename__ = "documents" @@ -120,7 +121,7 @@ def init_tables(self) -> 'DocListManager': self.add_files(files_list, status=DocListManager.Status.success) return self - def delete_files(self, file_ids: List[str]) -> List[KBDocument]: + def delete_files(self, file_ids: List[str]) -> List[DocPartRow]: document_list = self.update_file_status_by_cond( file_ids, self.DELETE_SAFE_STATUS_LIST, DocListManager.Status.deleting ) @@ -152,15 +153,21 @@ def list_kb_group_files(self, group: str = None, limit: Optional[int] = None, de upload_status: Union[str, List[str]] = Status.all, exclude_upload_status: Optional[Union[str, List[str]]] = None): pass - def add_files(self, files: List[str], metadatas: Optional[List[Dict[str, Any]]] = None, - status: Optional[str] = Status.waiting, batch_size: int = 64) -> List[str]: - ids = self._add_files(files, metadatas, status, batch_size) - self.add_files_to_kb_group(ids, group=DocListManager.DEFAULT_GROUP_NAME) - return ids + def add_files( + self, + files: List[str], + metadatas: Optional[List[Dict[str, Any]]] = None, + status: Optional[str] = Status.waiting, + batch_size: int = 64, + ) -> List[DocPartRow]: + documents = self._add_files(files, metadatas, status, batch_size) + if documents: + self.add_files_to_kb_group([doc.doc_id for doc in documents], group=DocListManager.DEFAULT_GROUP_NAME) + return documents @abstractmethod def _add_files(self, files: List[str], metadatas: Optional[List] = None, - status: Optional[str] = Status.waiting, batch_size: int = 64) -> List[str]: pass + status: Optional[str] = Status.waiting, batch_size: int = 64) -> List[DocPartRow]: pass @abstractmethod def delete_obsolete_files(self): pass @@ -171,7 +178,7 @@ def update_file_message(self, fileid: str, **kw): pass @abstractmethod def update_file_status_by_cond( self, file_ids: List[str], cond_status_list: Union[None, List[str]], new_status: str - ) -> List[KBDocument]: + ) -> List[DocPartRow]: pass @abstractmethod @@ -324,7 +331,7 @@ def delete_obsolete_files(self): def _add_files(self, files: List[str], metadatas: Optional[List[Dict[str, Any]]] = None, status: Optional[str] = DocListManager.Status.waiting, batch_size: int = 64): - ids = [] + documents = [] if not files: return [] @@ -351,13 +358,15 @@ def _add_files(self, files: List[str], metadatas: Optional[List[Dict[str, Any]]] } ) with self._db_lock, self._Session() as session: - result = session.execute( - insert(KBDocument).values(vals).prefix_with('OR IGNORE').returning(KBDocument.doc_id) - ) - returned_values = result.fetchall() + rows = session.execute( + insert(KBDocument) + .values(vals) + .prefix_with('OR IGNORE') + .returning(KBDocument.doc_id, KBDocument.path) + ).fetchall() session.commit() - ids.extend([ele.doc_id for ele in returned_values]) - return ids + documents.extend(rows) + return documents # TODO(wangzhihong): set to metadatas and enable this function def update_file_message(self, fileid: str, **kw): @@ -369,7 +378,7 @@ def update_file_message(self, fileid: str, **kw): def update_file_status_by_cond( self, file_ids: List[str], cond_status_list: Union[None, List[str]], new_status: str - ) -> List[KBDocument]: + ) -> List[DocPartRow]: rows = [] if cond_status_list is None: sql_cond = KBDocument.doc_id.in_(file_ids) diff --git a/tests/basic_tests/test_doc_manager.py b/tests/basic_tests/test_doc_manager.py index 23ee766a..cac67b2b 100644 --- a/tests/basic_tests/test_doc_manager.py +++ b/tests/basic_tests/test_doc_manager.py @@ -90,6 +90,18 @@ def test_delete_files(self): assert len(files_list) == 1 assert not any(self.test_file_1.endswith(row[1]) for row in files_list) + def test_add_deleting_file(self): + self.manager.init_tables() + + self.manager.add_files([self.test_file_1, self.test_file_2]) + self.manager.delete_files([hashlib.sha256(f'{self.test_file_1}'.encode()).hexdigest()]) + files_list = self.manager.list_files(details=True) + assert len(files_list) == 2 + files_list = self.manager.list_files(details=True, status=DocListManager.Status.deleting) + assert len(files_list) == 1 + documents = self.manager.add_files([self.test_file_1]) + assert documents == [] + def test_update_file_message(self): self.manager.init_tables()