Skip to content

Commit

Permalink
RAG: use scan instead of notify for file changes and add kb_group [st…
Browse files Browse the repository at this point in the history
…ep 2] (#302)
  • Loading branch information
wzh1994 authored Oct 14, 2024
1 parent 8e0d791 commit bf7a7a7
Show file tree
Hide file tree
Showing 9 changed files with 473 additions and 56 deletions.
8 changes: 7 additions & 1 deletion .github/workflows/macOS_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ env:

jobs:
macos-basic_tests:
if: |
!contains(github.event.head_commit.message, '[skip ci]')
&& !contains(github.event.pull_request.title, '[skip ci]')
runs-on: macos-13

steps:
Expand Down Expand Up @@ -81,11 +84,14 @@ jobs:
pip install -r tests/requirements.txt
export LAZYLLM_DATA_PATH=/tmp/lazyllm/data
python -m pytest -v --reruns=2 tests/basic_tests/
timeout-minutes: 25
timeout-minutes: 30
env :
GITHUB_TOKEN: ${{ secrets.PERSONAL_GITHUB_TOKEN }}

macos-charge_tests:
if: |
!contains(github.event.head_commit.message, '[skip ci]')
&& !contains(github.event.pull_request.title, '[skip ci]')
runs-on: macos-13

steps:
Expand Down
3 changes: 2 additions & 1 deletion lazyllm/common/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from .common import package, kwargs, arguments, LazyLLMCMD, timeout, final, ReadOnlyWrapper, DynamicDescriptor
from .common import FlatList, Identity, ResultCollector, ArgsDict, CaseInsensitiveDict
from .common import ReprRule, make_repr, modify_repr
from .common import once_flag, call_once, once_wrapper
from .common import once_flag, call_once, once_wrapper, singleton
from .option import Option, OptionIter
from .threading import Thread, ThreadPoolExecutor
from .multiprocessing import SpawnProcess, ForkProcess
Expand Down Expand Up @@ -31,6 +31,7 @@
'deprecated',
'compile_func',
'DynamicDescriptor',
'singleton',

# arg parser
'LazyLLMCMD',
Expand Down
9 changes: 9 additions & 0 deletions lazyllm/common/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,3 +366,12 @@ def __init__(self, func):

def __get__(self, instance, owner):
return DynamicDescriptor.Impl(self.__func__, instance, owner)


def singleton(cls):
    """Class decorator that turns ``cls`` into a lazily created singleton.

    The decorated name is rebound to a factory function. The first call
    builds the single instance with the arguments given; every later call
    returns that same instance and silently ignores its arguments.

    Because the name then refers to a function rather than the class,
    ``isinstance`` checks and subclassing must use ``<name>.__wrapped__``.

    Args:
        cls: The class (or any callable) to instantiate at most once.

    Returns:
        A factory function that carries ``cls``'s ``__name__``,
        ``__qualname__``, ``__doc__`` and ``__module__`` and exposes the
        original class as ``__wrapped__``.
    """
    # Function-scope import keeps this block self-contained.
    import functools

    instances = {}

    # updated=() copies only the identifying metadata; merging the class
    # ``__dict__`` into the function's own would leak descriptors onto it.
    @functools.wraps(cls, updated=())
    def get_instance(*args, **kwargs):
        if cls not in instances:
            instances[cls] = cls(*args, **kwargs)
        return instances[cls]
    return get_instance
67 changes: 39 additions & 28 deletions lazyllm/tools/rag/doc_impl.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import ast
from collections import defaultdict
from functools import wraps
from typing import Callable, Dict, List, Optional, Set, Union
from typing import Callable, Dict, List, Optional, Set, Union, Tuple
from lazyllm import LOG, config, once_wrapper
from .transform import (NodeTransform, FuncNodeTransform, SentenceSplitter, LLMParser,
AdaptiveTransform, make_transform, TransformArgs)
from .store import MapStore, DocNode, ChromadbStore, LAZY_ROOT_NAME, BaseStore
from .data_loaders import DirectoryReader
from .index import DefaultIndex
import os
from .utils import DocListManager
import threading
import time

_transmap = dict(function=FuncNodeTransform, sentencesplitter=SentenceSplitter, llm=LLMParser)

Expand All @@ -26,18 +28,17 @@ def wrapper(*args, **kwargs) -> List[float]:


class DocImpl:
DEDAULT_GROUP_NAME = '__default__'
_builtin_node_groups: Dict[str, Dict] = {}
_global_node_groups: Dict[str, Dict] = {}
_registered_file_reader: Dict[str, Callable] = {}

def __init__(self, embed: Dict[str, Callable], dataset_path: Optional[str] = None,
def __init__(self, embed: Dict[str, Callable], dlm: Optional[DocListManager] = None,
doc_files: Optional[str] = None, kb_group_name: str = None):
super().__init__()
assert (dataset_path is None) ^ (doc_files is None), 'Only one of dataset_path or doc_files should be provided'
assert (dlm is None) ^ (doc_files is None), 'Only one of dataset_path or doc_files should be provided'
self._local_file_reader: Dict[str, Callable] = {}
self._kb_group_name = kb_group_name or DocImpl.DEDAULT_GROUP_NAME
self._dataset_path, self._doc_files = dataset_path, doc_files
self._kb_group_name = kb_group_name or DocListManager.DEDAULT_GROUP_NAME
self._dlm, self._doc_files = dlm, doc_files
self._reader = DirectoryReader(None, self._local_file_reader, DocImpl._registered_file_reader)
self.node_groups: Dict[str, Dict] = {LAZY_ROOT_NAME: {}}
self.embed = {k: embed_wrapper(e) for k, e in embed.items()}
Expand All @@ -53,10 +54,17 @@ def _lazy_init(self) -> None:
self.store = self._get_store()
self.index = DefaultIndex(self.embed, self.store)
if not self.store.has_nodes(LAZY_ROOT_NAME):
root_nodes = self._reader.load_data(self._list_files())
ids, pathes = self._list_files()
root_nodes = self._reader.load_data(pathes)
self.store.add_nodes(root_nodes)
if self._dlm: self._dlm.update_kb_group_file_status(self._kb_group_name, ids, 'success')
LOG.debug(f"building {LAZY_ROOT_NAME} nodes: {root_nodes}")

if self._dlm:
self._daemon = threading.Thread(target=self.worker)
self._daemon.daemon = True
self._daemon.start()

def _get_store(self) -> BaseStore:
rag_store_type = config["rag_store_type"]
if rag_store_type == "map":
Expand Down Expand Up @@ -138,26 +146,29 @@ def add_reader(self, pattern: str, func: Optional[Callable] = None):
assert callable(func), 'func for reader should be callable'
self._local_file_reader[pattern] = func

# TODO(wangzhihong): modify here to fit kb-groups
def _list_files(self) -> List[str]:
if self._doc_files: return self._doc_files
if not os.path.isabs(self._dataset_path):
raise ValueError("directory must be an absolute path")

path = (os.path.join(self._dataset_path, self._kb_group_name)
if self._kb_group_name != DocImpl.DEDAULT_GROUP_NAME else self._dataset_path)

try:
files_list = []

for root, _, files in os.walk(path):
files = [os.path.join(root, file_path) for file_path in files]
files_list.extend(files)

return files_list
except Exception as e:
LOG.error(f"Error while listing files in {path}: {e}")
return []
def worker(self):
    """Poll the document list manager forever and apply pending file changes.

    ``_lazy_init`` starts this method as the target of a daemon thread when
    ``self._dlm`` is set.  Each pass of the loop does one of two things:

    * If files are listed with status ``'delete'``: mark them ``'deleting'``,
      remove them through ``_delete_files``, drop them from the kb group and
      start the next pass immediately.
    * Otherwise, if files are listed with status ``'waiting'``: mark them
      ``'processing'``, add them through ``_add_files`` and mark them
      ``'success'``.

    After a pass that did not delete anything the loop sleeps for 10 seconds.

    NOTE(review): the loop has no exit condition and no exception handling,
    so an exception raised by any call below ends the loop and leaves the
    affected files in their intermediate status -- confirm this is intended.
    """
    while True:
        # Deletions are handled first, before any waiting files are added.
        ids, files = self._list_files(status='delete')
        if files:
            self._dlm.update_kb_group_file_status(self._kb_group_name, ids, 'deleting')
            self._delete_files(files)
            self._dlm.delete_files_from_kb_group(ids, self._kb_group_name)
            # Re-scan right away (skipping the sleep) in case more deletions are pending.
            continue

        ids, files = self._list_files(status='waiting')
        if files:
            self._dlm.update_kb_group_file_status(self._kb_group_name, ids, 'processing')
            self._add_files(files)
            self._dlm.update_kb_group_file_status(self._kb_group_name, ids, 'success')
        # Polling interval between scans, in seconds.
        time.sleep(10)

def _list_files(self, status: str = 'all') -> Tuple[Optional[List[str]], List[str]]:
    """Return ``(ids, paths)`` for the files belonging to this kb group.

    Args:
        status: Status filter forwarded to
            ``self._dlm.list_kb_group_files``; ignored when
            ``self._doc_files`` is set.

    Returns:
        ``(None, self._doc_files)`` when ``self._doc_files`` is truthy.
        Otherwise two parallel lists built from the rows returned by the
        document list manager: ``row[0]`` goes into ``ids`` and ``row[1]``
        into ``paths``.
    """
    # An explicit file list has no manager rows, hence no ids.
    if self._doc_files: return None, self._doc_files
    ids, paths = [], []
    # presumably row[0] is the document id and row[1] its path -- verify against DocListManager
    for row in self._dlm.list_kb_group_files(group=self._kb_group_name, status=status, details=True):
        ids.append(row[0])
        paths.append(row[1])
    return ids, paths

def _add_files(self, input_files: List[str]):
if len(input_files) == 0:
Expand Down
46 changes: 38 additions & 8 deletions lazyllm/tools/rag/doc_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,42 +5,72 @@

import lazyllm
from lazyllm import FastapiApp as app
from .utils import DocListManager

from .utils import BaseResponse


class DocManager(lazyllm.ModuleBase):
def __init__(self, root: str) -> None:
def __init__(self, dlm: DocListManager) -> None:
super().__init__()
self._root = root
self._manager = dlm

@app.get("/", response_model=BaseResponse, summary="docs")
def document(self):
return RedirectResponse(url="/docs")

@app.get("/list_kb_groups")
def list_kb_groups(self):
pass
try:
return BaseResponse(data=self._manager.list_all_kb_group())
except Exception as e:
return BaseResponse(code=500, msg=str(e), data=None)

@app.post("/upload_files")
def upload_files(self, files: List[UploadFile], override: bool):
pass
try:
self._manager.add_files(files)
return BaseResponse()
except Exception as e:
return BaseResponse(code=500, msg=str(e), data=None)

@app.get("/list_files")
def list_files(self, group_name: str):
pass
try:
return BaseResponse(data=self._manager.list_files())
except Exception as e:
return BaseResponse(code=500, msg=str(e), data=None)

@app.get("/list_files_in_group")
def list_files_in_group(self, group_name: str):
try:
return BaseResponse(data=self._manager.list_kb_group_files(group_name, details=True))
except Exception as e:
return BaseResponse(code=500, msg=str(e), data=None)

@app.post("/add_files_to_group")
def add_files_to_group(self, files: List[UploadFile], group_name: str):
pass
try:
self._manager.add_files_to_kb_group(files, group_name)
return BaseResponse()
except Exception as e:
return BaseResponse(code=500, msg=str(e), data=None)

@app.post("/delete_files")
def delete_file(self, file_names: str):
pass
try:
self._manager.delete_files(file_names)
return BaseResponse()
except Exception as e:
return BaseResponse(code=500, msg=str(e), data=None)

@app.post("/delete_files_from_group")
def delete_files_from_group(self, group_name: str, file_names: str):
pass
try:
self._manager.delete_files_from_kb_group(file_names, group_name)
return BaseResponse()
except Exception as e:
return BaseResponse(code=500, msg=str(e), data=None)

def __repr__(self):
return lazyllm.make_repr("Module", "DocManager")
21 changes: 14 additions & 7 deletions lazyllm/tools/rag/document.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@
from .doc_manager import DocManager
from .doc_impl import DocImpl
from .store import LAZY_ROOT_NAME, EMBED_DEFAULT_KEY, DocNode
from .utils import DocListManager
import copy


class Document(ModuleBase):
class _Impl(ModuleBase):
def __init__(self, dataset_path: str, embed: Optional[Union[Callable, Dict[str, Callable]]] = None,
manager: bool = False, server: bool = False, launcher=None):
manager: bool = False, server: bool = False, name: Optional[str] = None, launcher=None):
super().__init__()
if not os.path.exists(dataset_path):
defatult_path = os.path.join(lazyllm.config["data_path"], dataset_path)
Expand All @@ -22,23 +23,29 @@ def __init__(self, dataset_path: str, embed: Optional[Union[Callable, Dict[str,
launcher = launcher if launcher else lazyllm.launchers.remote(sync=False)
self._dataset_path = dataset_path
self._embed = embed if isinstance(embed, dict) else {EMBED_DEFAULT_KEY: embed}
self.name = name
for embed in self._embed.values():
if isinstance(embed, ModuleBase):
self._submodules.append(embed)
self._kbs = {DocImpl.DEDAULT_GROUP_NAME: DocImpl(dataset_path=dataset_path, embed=self._embed)}
if manager: self._manager = DocManager(dataset_path)
self._dlm = DocListManager(dataset_path, name).init_tables()
self._kbs = {DocListManager.DEDAULT_GROUP_NAME: DocImpl(embed=self._embed, dlm=self._dlm)}
if manager: self._manager = DocManager(self._dlm)
if server: self._doc = ServerModule(self._doc)

def add_kb_group(self, name): self._kbs[name] = DocImpl(dataset_path=self.dataset_path, embed=self._embed)
def add_kb_group(self, name):
self._kbs[name] = DocImpl(dlm=self._dlm, embed=self._embed, kb_group_name=name)
self._dlm.add_kb_group(name)

def get_doc_by_kb_group(self, name): return self._kbs[name]

def __init__(self, dataset_path: str, embed: Optional[Union[Callable, Dict[str, Callable]]] = None,
create_ui: bool = False, manager: bool = False, server: bool = False, launcher=None):
create_ui: bool = False, manager: bool = False, server: bool = False,
name: Optional[str] = None, launcher=None):
super().__init__()
if create_ui:
lazyllm.LOG.warning('`create_ui` for Document is deprecated, use `manager` instead')
self._impls = Document._Impl(dataset_path, embed, create_ui or manager, server, launcher)
self._curr_group = DocImpl.DEDAULT_GROUP_NAME
self._impls = Document._Impl(dataset_path, embed, create_ui or manager, server, name, launcher)
self._curr_group = DocListManager.DEDAULT_GROUP_NAME

def create_kb_group(self, name: str) -> "Document":
self._impls.add_kb_group(name)
Expand Down
2 changes: 1 addition & 1 deletion lazyllm/tools/rag/retriever.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def __init__(self, target: Optional[str] = None,
def _post_process(self, nodes):
if self._target:
# TODO(wangzhihong): search relationship and add find_child
nodes = self._doc.find_parent(self._target)(nodes)
nodes = DocImpl.find_parent(self._target)(nodes)
if self._output_format == 'content':
nodes = [node.get_content() for node in nodes]
if isinstance(self._join, str): nodes = self._join.join(nodes)
Expand Down
Loading

0 comments on commit bf7a7a7

Please sign in to comment.