diff --git a/lazyllm/tools/rag/doc_impl.py b/lazyllm/tools/rag/doc_impl.py index 99abf6f9..25398a31 100644 --- a/lazyllm/tools/rag/doc_impl.py +++ b/lazyllm/tools/rag/doc_impl.py @@ -219,12 +219,14 @@ def worker(self): if self._kb_group_name == DocListManager.DEFAULT_GROUP_NAME: self._dlm.init_tables() + self._dlm.delete_obsolete_files() ids, files, metadatas = self._list_files(status=DocListManager.Status.waiting, upload_status=DocListManager.Status.success) if files: self._dlm.update_kb_group_file_status(ids, DocListManager.Status.working, group=self._kb_group_name) self._add_files(input_files=files, ids=ids, metadatas=metadatas) self._dlm.update_kb_group_file_status(ids, DocListManager.Status.success, group=self._kb_group_name) + time.sleep(3) continue time.sleep(10) diff --git a/lazyllm/tools/rag/doc_manager.py b/lazyllm/tools/rag/doc_manager.py index 568da23e..40f9fdcb 100644 --- a/lazyllm/tools/rag/doc_manager.py +++ b/lazyllm/tools/rag/doc_manager.py @@ -13,6 +13,19 @@ from .global_metadata import RAG_DOC_ID, RAG_DOC_PATH +def gen_unique_filepaths(ori_filepath: str) -> str: + if not os.path.exists(ori_filepath): + return ori_filepath + directory, filename = os.path.split(ori_filepath) + name, ext = os.path.splitext(filename) + ct = 1 + new_filepath = f"{os.path.join(directory, name)}_{ct}{ext}" + while os.path.exists(new_filepath): + ct += 1 + new_filepath = f"{os.path.join(directory, name)}_{ct}{ext}" + return new_filepath + + class DocManager(lazyllm.ModuleBase): def __init__(self, dlm: DocListManager) -> None: super().__init__() @@ -54,29 +67,32 @@ def upload_files(self, files: List[UploadFile], override: bool = False, # noqa return BaseResponse(code=400, msg=f'file [{files[idx].filename}]: {err_msg}', data=None) file_paths = [os.path.join(self._manager._path, user_path or '', file.filename) for file in files] - ids = self._manager.add_files(file_paths, metadatas=metadatas, status=DocListManager.Status.working) - results = [] + file_paths = 
[gen_unique_filepaths(ele) for ele in file_paths]
+            # when adding same file which is deleting, it will be ignored
+            documents = self._manager.add_files(file_paths, metadatas=metadatas, status=DocListManager.Status.working)
+            ids, results = [], []
+            rev_index = {ele.path: i for i, ele in enumerate(documents)}
             for file, path in zip(files, file_paths):
-                if os.path.exists(path):
-                    if not override:
-                        results.append('Duplicated')
-                        continue
-
-                try:
-                    content = file.file.read()
-                    directory = os.path.dirname(path)
-                    if directory:
-                        os.makedirs(directory, exist_ok=True)
-
-                    with open(path, 'wb') as f:
-                        f.write(content)
-                except Exception as e:
-                    lazyllm.LOG.error(f'writing file [{path}] to disk failed: [{e}]')
-                    raise e
-                file_id = gen_docid(path)
-                self._manager.update_file_status([file_id], status=DocListManager.Status.success)
-                results.append('Success')
+                file_id, result = gen_docid(path), "Failed"
+                if path not in rev_index:
+                    result = "Failed, wait for deleting finished"
+                else:
+                    try:
+                        content = file.file.read()
+                        directory = os.path.dirname(path)
+                        if directory:
+                            os.makedirs(directory, exist_ok=True)
+
+                        with open(path, 'wb') as f:
+                            f.write(content)
+                    except Exception as e:
+                        lazyllm.LOG.error(f'writing file [{path}] to disk failed: [{e}]')
+                        raise e
+                    self._manager.update_file_status([file_id], status=DocListManager.Status.success)
+                    result = "Success"
+                ids.append(file_id)
+                results.append(result)

             return BaseResponse(data=[ids, results])
         except Exception as e:
@@ -105,7 +121,7 @@ def add_files(self, files: List[str] = Body(...),
                 exist_id = exists_files_info.get(file, None)
                 if exist_id:
                     update_kws = dict(fileid=exist_id, status=DocListManager.Status.success)
-                    if metadatas: update_kws["metadata"] = json.dumps(metadatas[idx])
+                    if metadatas: update_kws["meta"] = json.dumps(metadatas[idx])
                     self._manager.update_file_message(**update_kws)
                 exist_ids.append(exist_id)
                 id_mapping[file] = exist_id
@@ -178,16 +194,13 @@ def delete_files(self, request: FileGroupRequest):
             if request.group_name:
return self.delete_files_from_group(request) else: - self._manager.update_kb_group_file_status( - file_ids=request.file_ids, status=DocListManager.Status.deleting) - docs = self._manager.update_file_status(file_ids=request.file_ids, status=DocListManager.Status.deleting) - - for doc in docs: - if os.path.exists(path := doc[1]): + documents = self._manager.delete_files(request.file_ids) + deleted_ids = set([ele.doc_id for ele in documents]) + for doc in documents: + if os.path.exists(path := doc.path): os.remove(path) - - self._manager.update_file_status(file_ids=request.file_ids, status=DocListManager.Status.deleted) - return BaseResponse() + results = ["Success" if ele.doc_id in deleted_ids else "Failed" for ele in documents] + return BaseResponse(data=[request.file_ids, results]) except Exception as e: return BaseResponse(code=500, msg=str(e), data=None) diff --git a/lazyllm/tools/rag/utils.py b/lazyllm/tools/rag/utils.py index 26cd036b..94a4c2c4 100644 --- a/lazyllm/tools/rag/utils.py +++ b/lazyllm/tools/rag/utils.py @@ -9,6 +9,10 @@ from .global_metadata import RAG_DOC_PATH, RAG_DOC_ID from lazyllm.common import override from lazyllm.common.queue import sqlite3_check_threadsafety +import sqlalchemy +from sqlalchemy.orm import DeclarativeBase +from sqlalchemy import Column, insert, update, Row +from sqlalchemy.orm import sessionmaker import pydantic import sqlite3 @@ -34,6 +38,50 @@ def gen_docid(file_path: str) -> str: return hashlib.sha256(file_path.encode()).hexdigest() + +class KBDataBase(DeclarativeBase): + pass + + +class KBOperationLogs(KBDataBase): + __tablename__ = "operation_logs" + id = Column(sqlalchemy.Integer, primary_key=True, autoincrement=True) + log = Column(sqlalchemy.Text, nullable=False) + created_at = Column(sqlalchemy.DateTime, default=sqlalchemy.func.now(), nullable=False) + + +DocPartRow = Row +class KBDocument(KBDataBase): + __tablename__ = "documents" + + doc_id = Column(sqlalchemy.Text, primary_key=True) + filename = 
Column(sqlalchemy.Text, nullable=False, index=True) + path = Column(sqlalchemy.Text, nullable=False) + created_at = Column(sqlalchemy.DateTime, default=sqlalchemy.func.now(), nullable=False) + meta = Column(sqlalchemy.Text, nullable=True) + status = Column(sqlalchemy.Text, nullable=False, index=True) + count = Column(sqlalchemy.Integer, default=0) + + +class KBGroup(KBDataBase): + __tablename__ = "document_groups" + + group_id = Column(sqlalchemy.Integer, primary_key=True, autoincrement=True) + group_name = Column(sqlalchemy.String, nullable=False, unique=True) + + +class KBGroupDocuments(KBDataBase): + __tablename__ = "kb_group_documents" + + id = Column(sqlalchemy.Integer, primary_key=True, autoincrement=True) + doc_id = Column(sqlalchemy.String, sqlalchemy.ForeignKey("documents.doc_id"), nullable=False) + group_name = Column(sqlalchemy.String, sqlalchemy.ForeignKey("document_groups.group_name"), nullable=False) + status = Column(sqlalchemy.Text, nullable=True) + log = Column(sqlalchemy.Text, nullable=True) + # unique constraint + __table_args__ = (sqlalchemy.UniqueConstraint('doc_id', 'group_name', name='uq_doc_to_group'),) + + class DocListManager(ABC): DEFAULT_GROUP_NAME = '__default__' __pool__ = dict() @@ -47,6 +95,8 @@ class Status: deleting = 'deleting' deleted = 'deleted' + DELETE_SAFE_STATUS_LIST = [Status.waiting, Status.success, Status.failed] + def __init__(self, path, name): self._path = path self._name = name @@ -57,7 +107,7 @@ def __init__(self, path, name): def __new__(cls, *args, **kw): if cls is not DocListManager: return super().__new__(cls) - return __class__.__pool__[config['default_dlmanager']](*args, **kw) + return super().__new__(__class__.__pool__[config['default_dlmanager']]) def init_tables(self) -> 'DocListManager': if not self.table_inited(): @@ -71,9 +121,13 @@ def init_tables(self) -> 'DocListManager': self.add_files(files_list, status=DocListManager.Status.success) return self - def delete_files(self, file_ids: List[str], real: 
bool = False): - self.update_kb_group_file_status(file_ids=file_ids, status=DocListManager.Status.deleting) - self._delete_files(file_ids, real) + def delete_files(self, file_ids: List[str]) -> List[DocPartRow]: + document_list = self.update_file_status_by_cond( + file_ids, self.DELETE_SAFE_STATUS_LIST, DocListManager.Status.deleting + ) + safe_delete_ids = [doc.doc_id for doc in document_list] + self.update_kb_group_file_status(file_ids=safe_delete_ids, status=DocListManager.Status.deleting) + return document_list @abstractmethod def table_inited(self): pass @@ -99,19 +153,34 @@ def list_kb_group_files(self, group: str = None, limit: Optional[int] = None, de upload_status: Union[str, List[str]] = Status.all, exclude_upload_status: Optional[Union[str, List[str]]] = None): pass - def add_files(self, files: List[str], metadatas: Optional[List[Dict[str, Any]]] = None, - status: Optional[str] = Status.waiting, batch_size: int = 64) -> List[str]: - ids = self._add_files(files, metadatas, status, batch_size) - self.add_files_to_kb_group(ids, group=DocListManager.DEFAULT_GROUP_NAME) - return ids + def add_files( + self, + files: List[str], + metadatas: Optional[List[Dict[str, Any]]] = None, + status: Optional[str] = Status.waiting, + batch_size: int = 64, + ) -> List[DocPartRow]: + documents = self._add_files(files, metadatas, status, batch_size) + if documents: + self.add_files_to_kb_group([doc.doc_id for doc in documents], group=DocListManager.DEFAULT_GROUP_NAME) + return documents @abstractmethod def _add_files(self, files: List[str], metadatas: Optional[List] = None, - status: Optional[str] = Status.waiting, batch_size: int = 64) -> List[str]: pass + status: Optional[str] = Status.waiting, batch_size: int = 64) -> List[DocPartRow]: pass + + @abstractmethod + def delete_obsolete_files(self): pass @abstractmethod def update_file_message(self, fileid: str, **kw): pass + @abstractmethod + def update_file_status_by_cond( + self, file_ids: List[str], cond_status_list: 
Union[None, List[str]], new_status: str + ) -> List[DocPartRow]: + pass + @abstractmethod def add_files_to_kb_group(self, file_ids: List[str], group: str): pass @@ -144,38 +213,13 @@ def __init__(self, path, name): self._db_lock = FileLock(self._db_path + '.lock') # ensure that this connection is not used in another thread when sqlite3 is not threadsafe self._check_same_thread = not sqlite3_check_threadsafety() + self._engine = sqlalchemy.create_engine( + f"sqlite:///{self._db_path}?check_same_thread={self._check_same_thread}" + ) + self._Session = sessionmaker(bind=self._engine) def _init_tables(self): - with self._db_lock, sqlite3.connect(self._db_path, check_same_thread=self._check_same_thread) as conn: - conn.execute(""" - CREATE TABLE IF NOT EXISTS documents ( - doc_id TEXT PRIMARY KEY, - filename TEXT NOT NULL, - path TEXT NOT NULL, - metadata TEXT, - status TEXT, - count INTEGER DEFAULT 0 - ) - """) - conn.execute(""" - CREATE TABLE IF NOT EXISTS document_groups ( - group_id INTEGER PRIMARY KEY AUTOINCREMENT, - group_name TEXT NOT NULL UNIQUE - ) - """) - conn.execute(""" - CREATE TABLE IF NOT EXISTS kb_group_documents ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - doc_id TEXT NOT NULL, - group_name TEXT NOT NULL, - status TEXT, - log TEXT, - UNIQUE (doc_id, group_name), - FOREIGN KEY(doc_id) REFERENCES documents(doc_id), - FOREIGN KEY(group_name) REFERENCES document_groups(group_name) - ) - """) - conn.commit() + KBDataBase.metadata.create_all(bind=self._engine) def table_inited(self): with self._db_lock, sqlite3.connect(self._db_path, check_same_thread=self._check_same_thread) as conn: @@ -238,7 +282,7 @@ def list_kb_group_files(self, group: str = None, limit: Optional[int] = None, de upload_status: Union[str, List[str]] = DocListManager.Status.all, exclude_upload_status: Optional[Union[str, List[str]]] = None): query = """ - SELECT documents.doc_id, documents.path, documents.status, documents.metadata, + SELECT documents.doc_id, documents.path, 
documents.status, documents.meta, kb_group_documents.group_name, kb_group_documents.status, kb_group_documents.log FROM kb_group_documents JOIN documents ON kb_group_documents.doc_id = documents.doc_id @@ -272,35 +316,57 @@ def list_kb_group_files(self, group: str = None, limit: Optional[int] = None, de if not details: return [row[:2] for row in rows] return rows + def delete_obsolete_files(self): + with self._db_lock, self._Session() as session: + docs_to_delete = ( + session.query(KBDocument) + .filter(KBDocument.status == DocListManager.Status.deleting, KBDocument.count == 0) + .all() + ) + for doc in docs_to_delete: + session.delete(doc) + log = KBOperationLogs(log=f"Delete obsolete file: {doc.doc_id}") + session.add(log) + session.commit() + def _add_files(self, files: List[str], metadatas: Optional[List[Dict[str, Any]]] = None, status: Optional[str] = DocListManager.Status.waiting, batch_size: int = 64): - results = [] + documents = [] + if not files: + return [] + for i in range(0, len(files), batch_size): batch_files = files[i:i + batch_size] batch_metadatas = metadatas[i:i + batch_size] if metadatas else None - insert_values, params = [], [] + vals = [] for i, file_path in enumerate(batch_files): - filename = os.path.basename(file_path) doc_id = gen_docid(file_path) metadata = batch_metadatas[i].copy() if batch_metadatas else {} metadata.setdefault(RAG_DOC_ID, doc_id) metadata.setdefault(RAG_DOC_PATH, file_path) - metadata_str = json.dumps(metadata) - insert_values.append("(?, ?, ?, ?, ?, ?)") - params.extend([doc_id, filename, file_path, metadata_str, status, 1]) - - with self._db_lock, sqlite3.connect(self._db_path, check_same_thread=self._check_same_thread) as conn: - query = f""" - INSERT OR IGNORE INTO documents (doc_id, filename, path, metadata, status, count) - VALUES {', '.join(insert_values)} RETURNING doc_id; - """ - cursor = conn.execute(query, params) - results.extend([row[0] for row in cursor.fetchall()]) - conn.commit() - return results + 
vals.append( + { + KBDocument.doc_id.name: doc_id, + KBDocument.filename.name: os.path.basename(file_path), + KBDocument.path.name: file_path, + KBDocument.meta.name: json.dumps(metadata), + KBDocument.status.name: status, + KBDocument.count.name: 0, + } + ) + with self._db_lock, self._Session() as session: + rows = session.execute( + insert(KBDocument) + .values(vals) + .prefix_with('OR IGNORE') + .returning(KBDocument.doc_id, KBDocument.path) + ).fetchall() + session.commit() + documents.extend(rows) + return documents # TODO(wangzhihong): set to metadatas and enable this function def update_file_message(self, fileid: str, **kw): @@ -310,14 +376,43 @@ def update_file_message(self, fileid: str, **kw): conn.execute(f"UPDATE documents SET {set_clause} WHERE doc_id = ?", params) conn.commit() + def update_file_status_by_cond( + self, file_ids: List[str], cond_status_list: Union[None, List[str]], new_status: str + ) -> List[DocPartRow]: + rows = [] + if cond_status_list is None: + sql_cond = KBDocument.doc_id.in_(file_ids) + else: + sql_cond = sqlalchemy.and_(KBDocument.status.in_(cond_status_list), KBDocument.doc_id.in_(file_ids)) + with self._db_lock, self._Session() as session: + stmt = ( + update(KBDocument) + .where(sql_cond) + .values(status=new_status) + .returning(KBDocument.doc_id, KBDocument.path) + ) + rows = session.execute(stmt).fetchall() + session.commit() + return rows + def add_files_to_kb_group(self, file_ids: List[str], group: str): - with self._db_lock, sqlite3.connect(self._db_path, check_same_thread=self._check_same_thread) as conn: + with self._db_lock, self._Session() as session: + vals = [] for doc_id in file_ids: - conn.execute(""" - INSERT OR IGNORE INTO kb_group_documents (doc_id, group_name, status) - VALUES (?, ?, ?) 
- """, (doc_id, group, DocListManager.Status.waiting)) - conn.commit() + vals = { + KBGroupDocuments.doc_id.name: doc_id, + KBGroupDocuments.group_name.name: group, + KBGroupDocuments.status.name: DocListManager.Status.waiting, + } + rows = session.execute( + insert(KBGroupDocuments).values(vals).prefix_with('OR IGNORE').returning(KBGroupDocuments.doc_id) + ).fetchall() + session.commit() + if not rows: + continue + doc = session.query(KBDocument).filter_by(doc_id=rows[0].doc_id).one() + doc.count += 1 + session.commit() def _delete_files(self, file_ids: List[str], real: bool = False): if not real: @@ -328,11 +423,21 @@ def _delete_files(self, file_ids: List[str], real: bool = False): conn.commit() def delete_files_from_kb_group(self, file_ids: List[str], group: str): - with self._db_lock, sqlite3.connect(self._db_path, check_same_thread=self._check_same_thread) as conn: + with self._db_lock, self._Session() as session: for doc_id in file_ids: - conn.execute("UPDATE kb_group_documents SET status = ? WHERE doc_id = ? 
AND group_name = ?", - (DocListManager.Status.deleted, doc_id, group)) - conn.commit() + vals = { + KBGroupDocuments.status.name: DocListManager.Status.deleted, + } + cond = sqlalchemy.and_(KBGroupDocuments.doc_id == doc_id, KBGroupDocuments.group_name == group) + rows = session.execute( + update(KBGroupDocuments).where(cond).values(vals).returning(KBGroupDocuments.doc_id) + ).fetchall() + session.commit() + if not rows: + continue + doc = session.query(KBDocument).filter_by(doc_id=rows[0].doc_id).one() + doc.count = max(0, doc.count - 1) + session.commit() def get_file_status(self, fileid: str): with self._db_lock, sqlite3.connect(self._db_path, check_same_thread=self._check_same_thread) as conn: @@ -369,6 +474,7 @@ def release(self): conn.execute('delete from documents') conn.execute('delete from document_groups') conn.execute('delete from kb_group_documents') + conn.execute('delete from operation_logs') conn.commit() def __reduce__(self): diff --git a/tests/basic_tests/test_doc_manager.py b/tests/basic_tests/test_doc_manager.py index ebb0973a..cac67b2b 100644 --- a/tests/basic_tests/test_doc_manager.py +++ b/tests/basic_tests/test_doc_manager.py @@ -33,6 +33,7 @@ def setUp(self): test_file_1.write("This is a test file 1.") test_file_2.write("This is a test file 2.") self.test_file_1, self.test_file_2 = str(test_file_1), str(test_file_2) + print(f"test_file_1:{test_file_1}, test_file_2:{test_file_2}") self.manager = DocListManager(str(test_dir), "TestManager") @@ -85,19 +86,31 @@ def test_delete_files(self): self.manager.delete_files([hashlib.sha256(f'{self.test_file_1}'.encode()).hexdigest()]) files_list = self.manager.list_files(details=True) assert len(files_list) == 2 - files_list = self.manager.list_files(details=True, exclude_status=DocListManager.Status.deleted) + files_list = self.manager.list_files(details=True, exclude_status=DocListManager.Status.deleting) assert len(files_list) == 1 assert not any(self.test_file_1.endswith(row[1]) for row in 
files_list) + def test_add_deleting_file(self): + self.manager.init_tables() + + self.manager.add_files([self.test_file_1, self.test_file_2]) + self.manager.delete_files([hashlib.sha256(f'{self.test_file_1}'.encode()).hexdigest()]) + files_list = self.manager.list_files(details=True) + assert len(files_list) == 2 + files_list = self.manager.list_files(details=True, status=DocListManager.Status.deleting) + assert len(files_list) == 1 + documents = self.manager.add_files([self.test_file_1]) + assert documents == [] + def test_update_file_message(self): self.manager.init_tables() self.manager.add_files([self.test_file_1]) file_id = hashlib.sha256(f'{self.test_file_1}'.encode()).hexdigest() - self.manager.update_file_message(file_id, metadata="New metadata", status="processed") + self.manager.update_file_message(file_id, meta="New metadata", status="processed") conn = sqlite3.connect(self.manager._db_path) - cursor = conn.execute("SELECT metadata, status FROM documents WHERE doc_id = ?", (file_id,)) + cursor = conn.execute("SELECT meta, status FROM documents WHERE doc_id = ?", (file_id,)) row = cursor.fetchone() conn.close()