Skip to content

Commit

Permalink
RAG: use scan instead of notify for file changes and add kb_group [st…
Browse files Browse the repository at this point in the history
…ep 1] (#296)
  • Loading branch information
wzh1994 authored Oct 11, 2024
1 parent 98cc6f2 commit cfda586
Show file tree
Hide file tree
Showing 12 changed files with 166 additions and 315 deletions.
2 changes: 2 additions & 0 deletions lazyllm/common/bind.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ def __getitem__(self, key):
return self

def __getattr__(self, key):
if key.startswith('__') and key.endswith('__'):
raise AttributeError(f'Args has no attribute {key}')
self._attr_key = key
return self

Expand Down
10 changes: 5 additions & 5 deletions lazyllm/docs/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@
>>> doc1 = Document(dataset_path="your_files_path", create_ui=False)
>>> doc2 = Document(dataset_path="your_files_path", create_ui=False)
>>> files = ["your_yml_files"]
>>> docs1 = doc1._impl._impl.directory_reader.load_data(input_files=files)
>>> docs2 = doc2._impl._impl.directory_reader.load_data(input_files=files)
>>> docs1 = doc1._impl._reader.load_data(input_files=files)
>>> docs2 = doc2._impl._reader.load_data(input_files=files)
>>> print(docs1[0].text == docs2[0].text)
# True
''')
Expand Down Expand Up @@ -207,9 +207,9 @@
# {}
>>> files = ["your_yml_files"]
>>> Document.register_global_reader("**/*.yml", processYml)
>>> doc1._impl._impl.directory_reader.load_data(input_files=files)
>>> doc1._impl._reader.load_data(input_files=files)
# Call the class YmlReader.
>>> doc2._impl._impl.directory_reader.load_data(input_files=files)
>>> doc2._impl._reader.load_data(input_files=files)
# Call the function processYml.
''')

Expand Down Expand Up @@ -251,7 +251,7 @@
...
>>> files = ["your_yml_files"]
>>> doc = Document(dataset_path="your_files_path", create_ui=False)
>>> reader = doc._impl._impl.directory_reader.load_data(input_files=files)
>>> reader = doc._impl._reader.load_data(input_files=files)
# Call the class YmlReader.
''')

Expand Down
2 changes: 1 addition & 1 deletion lazyllm/tools/rag/data_loaders.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from .dataReader import SimpleDirectoryReader

class DirectoryReader:
def __init__(self, input_files: List[str], local_readers: Optional[Dict] = None,
def __init__(self, input_files: Optional[List[str]], local_readers: Optional[Dict] = None,
global_readers: Optional[Dict] = None) -> None:
self._input_files = input_files
self._local_readers = local_readers
Expand Down
74 changes: 57 additions & 17 deletions lazyllm/tools/rag/doc_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from .store import MapStore, DocNode, ChromadbStore, LAZY_ROOT_NAME, BaseStore
from .data_loaders import DirectoryReader
from .index import DefaultIndex
import os

_transmap = dict(function=FuncNodeTransform, sentencesplitter=SentenceSplitter, llm=LLMParser)

Expand All @@ -25,13 +26,19 @@ def wrapper(*args, **kwargs) -> List[float]:


class DocImpl:
_builtin_node_groups = {}
_global_node_groups = {}
DEDAULT_GROUP_NAME = '__default__'
_builtin_node_groups: Dict[str, Dict] = {}
_global_node_groups: Dict[str, Dict] = {}
_registered_file_reader: Dict[str, Callable] = {}

def __init__(self, embed: Dict[str, Callable], doc_files=Optional[List[str]],
local_readers: Optional[Dict] = None, global_readers: Optional[Dict] = None, **kwargs):
def __init__(self, embed: Dict[str, Callable], dataset_path: Optional[str] = None,
doc_files: Optional[str] = None, kb_group_name: str = None):
super().__init__()
self.directory_reader = DirectoryReader(doc_files, local_readers=local_readers, global_readers=global_readers)
assert (dataset_path is None) ^ (doc_files is None), 'Only one of dataset_path or doc_files should be provided'
self._local_file_reader: Dict[str, Callable] = {}
self._kb_group_name = kb_group_name or DocImpl.DEDAULT_GROUP_NAME
self._dataset_path, self._doc_files = dataset_path, doc_files
self._reader = DirectoryReader(None, self._local_file_reader, DocImpl._registered_file_reader)
self.node_groups: Dict[str, Dict] = {LAZY_ROOT_NAME: {}}
self.embed = {k: embed_wrapper(e) for k, e in embed.items()}
self.store = None
Expand All @@ -46,7 +53,7 @@ def _lazy_init(self) -> None:
self.store = self._get_store()
self.index = DefaultIndex(self.embed, self.store)
if not self.store.has_nodes(LAZY_ROOT_NAME):
root_nodes = self.directory_reader.load_data()
root_nodes = self._reader.load_data(self._list_files())
self.store.add_nodes(root_nodes)
LOG.debug(f"building {LAZY_ROOT_NAME} nodes: {root_nodes}")

Expand Down Expand Up @@ -115,11 +122,48 @@ def create_node_group(self, name, transform: Union[str, Callable] = None, parent
DocImpl._create_node_group_impl(self, 'node_groups', name=name, transform=transform, parent=parent,
trans_node=trans_node, num_workers=num_workers, **kwargs)

def add_files(self, input_files: List[str]) -> None:
@classmethod
def register_global_reader(cls, pattern: str, func: Optional[Callable] = None):
if func is not None:
cls._registered_file_reader[pattern] = func
return None

def decorator(klass):
if callable(klass): cls._registered_file_reader[pattern] = klass
else: raise TypeError(f"The registered object {klass} is not a callable object.")
return klass
return decorator

def add_reader(self, pattern: str, func: Optional[Callable] = None):
assert callable(func), 'func for reader should be callable'
self._local_file_reader[pattern] = func

# TODO(wangzhihong): modify here to fit kb-groups
def _list_files(self) -> List[str]:
if self._doc_files: return self._doc_files
if not os.path.isabs(self._dataset_path):
raise ValueError("directory must be an absolute path")

path = (os.path.join(self._dataset_path, self._kb_group_name)
if self._kb_group_name != DocImpl.DEDAULT_GROUP_NAME else self._dataset_path)

try:
files_list = []

for root, _, files in os.walk(path):
files = [os.path.join(root, file_path) for file_path in files]
files_list.extend(files)

return files_list
except Exception as e:
LOG.error(f"Error while listing files in {path}: {e}")
return []

def _add_files(self, input_files: List[str]):
if len(input_files) == 0:
return
self._lazy_init()
root_nodes = self.directory_reader.load_data(input_files)
root_nodes = self._reader.load_data(input_files)
temp_store = self._get_store()
temp_store.add_nodes(root_nodes)
active_groups = self.store.active_groups()
Expand All @@ -130,7 +174,7 @@ def add_files(self, input_files: List[str]) -> None:
self.store.add_nodes(nodes)
LOG.debug(f"Merge {group} with {nodes}")

def delete_files(self, input_files: List[str]) -> None:
def _delete_files(self, input_files: List[str]) -> None:
self._lazy_init()
docs = self.store.get_nodes_by_files(input_files)
LOG.info(f"delete_files: removing documents {input_files} and nodes {docs}")
Expand Down Expand Up @@ -186,7 +230,8 @@ def retrieve(self, query: str, group_name: str, similarity: str, similarity_cut_
query, nodes, similarity, similarity_cut_off, topk, embed_keys, **similarity_kws
)

def find_parent(self, nodes: List[DocNode], group: str) -> List[DocNode]:
@staticmethod
def find_parent(nodes: List[DocNode], group: str) -> List[DocNode]:
def recurse_parents(node: DocNode, visited: Set[DocNode]) -> None:
if node.parent:
if node.parent.group == group:
Expand All @@ -203,13 +248,8 @@ def recurse_parents(node: DocNode, visited: Set[DocNode]) -> None:
LOG.debug(f"Found parent node for {group}: {result}")
return list(result)

def find_children(self, nodes: List[DocNode], group: str) -> List[DocNode]:
active_groups = self.store.active_groups()
if group not in active_groups:
raise ValueError(
f"group {group} not found in active groups {active_groups}, please retrieve the group first."
)

@staticmethod
def find_children(nodes: List[DocNode], group: str) -> List[DocNode]:
def recurse_children(node: DocNode, visited: Set[DocNode]) -> bool:
if group in node.children:
visited.update(node.children[group])
Expand Down
62 changes: 19 additions & 43 deletions lazyllm/tools/rag/doc_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,65 +6,41 @@
import lazyllm
from lazyllm import FastapiApp as app

from .group_doc import DocGroupImpl
from .utils import BaseResponse, save_files_in_threads
from .utils import BaseResponse


class DocManager(lazyllm.ModuleBase):
def __init__(self, doc_impl: DocGroupImpl) -> None:
def __init__(self, root: str) -> None:
super().__init__()
self._impl = doc_impl
self._root = root

@app.get("/", response_model=BaseResponse, summary="docs")
def document(self):
return RedirectResponse(url="/docs")

@app.post("/new_group")
def new_group(self, group_name: str):
self._impl.new_group(group_name)
return BaseResponse(msg=f"create {group_name} success")

@app.post("/delete_group")
def delete_group(self, group_name: str):
self._impl.delete_group(group_name)
return BaseResponse(msg=f"delete {group_name} success")

@app.get("/list_groups")
def list_groups(self):
gourp_list = self._impl.list_groups()
return BaseResponse(data=gourp_list)
@app.get("/list_kb_groups")
def list_kb_groups(self):
pass

@app.post("/upload_files")
def upload_files(self, files: List[UploadFile], group_name: str, override: bool):
already_exist_files, new_add_files, overwritten_files = save_files_in_threads(
files=files,
source_path=self._impl.get_group_source_path(group_name=group_name),
override=override,
)

self._impl.delete_files(group_name, overwritten_files, is_del_source=False)
self._impl.add_files(group_name, new_add_files + overwritten_files)
return BaseResponse(
data={
"already_exist_files": already_exist_files,
"new_add_files": new_add_files,
"overwritten_files": overwritten_files,
}
)
def upload_files(self, files: List[UploadFile], override: bool):
pass

@app.get("/list_files")
def list_files(self, group_name: str):
file_list = self._impl.list_files(group_name)
return BaseResponse(data=file_list)
pass

@app.post("/add_files_to_group")
def add_files_to_group(self, files: List[UploadFile], group_name: str):
pass

@app.post("/delete_file")
def delete_file(self, group_name: str, file_name: str):
self._impl.delete_files(group_name, files=[file_name])
return BaseResponse(msg=f"delete {file_name} success")
@app.post("/delete_files")
def delete_file(self, file_names: str):
pass

def forward(self, *args, **kwargs):
func_name = kwargs.pop("func_name")
return getattr(self._impl, func_name)(*args, **kwargs)
@app.post("/delete_files_from_group")
def delete_files_from_group(self, group_name: str, file_names: str):
pass

def __repr__(self):
return lazyllm.make_repr("Module", "DocManager")
98 changes: 48 additions & 50 deletions lazyllm/tools/rag/document.py
Original file line number Diff line number Diff line change
@@ -1,58 +1,53 @@
from functools import partial
import os

from typing import Callable, Optional, Dict, Union
from typing import Callable, Optional, Dict, Union, List
import lazyllm
from lazyllm import ModuleBase, ServerModule, DynamicDescriptor

from .web import DocWebModule
from .doc_manager import DocManager
from .group_doc import DocGroupImpl, DocImpl
from .store import LAZY_ROOT_NAME, EMBED_DEFAULT_KEY
from .doc_impl import DocImpl
from .store import LAZY_ROOT_NAME, EMBED_DEFAULT_KEY, DocNode
import copy


class Document(ModuleBase):
_registered_file_reader: Dict[str, Callable] = {}
class _Impl(ModuleBase):
def __init__(self, dataset_path: str, embed: Optional[Union[Callable, Dict[str, Callable]]] = None,
manager: bool = False, server: bool = False, launcher=None):
super().__init__()
if not os.path.exists(dataset_path):
defatult_path = os.path.join(lazyllm.config["data_path"], dataset_path)
if os.path.exists(defatult_path):
dataset_path = defatult_path
launcher = launcher if launcher else lazyllm.launchers.remote(sync=False)
self._dataset_path = dataset_path
self._embed = embed if isinstance(embed, dict) else {EMBED_DEFAULT_KEY: embed}
for embed in self._embed.values():
if isinstance(embed, ModuleBase):
self._submodules.append(embed)
self._kbs = {DocImpl.DEDAULT_GROUP_NAME: DocImpl(dataset_path=dataset_path, embed=self._embed)}
if manager: self._manager = DocManager(dataset_path)
if server: self._doc = ServerModule(self._doc)

def add_kb_group(self, name): self._kbs[name] = DocImpl(dataset_path=self.dataset_path, embed=self._embed)
def get_doc_by_kb_group(self, name): return self._kbs[name]

def __init__(self, dataset_path: str, embed: Optional[Union[Callable, Dict[str, Callable]]] = None,
create_ui: bool = False, manager: bool = False, launcher=None):
create_ui: bool = False, manager: bool = False, server: bool = False, launcher=None):
super().__init__()
if not os.path.exists(dataset_path):
defatult_path = os.path.join(lazyllm.config["data_path"], dataset_path)
if os.path.exists(defatult_path):
dataset_path = defatult_path
if create_ui:
lazyllm.LOG.warning('`create_ui` for Document is deprecated, use `manager` instead')
self._manager = create_ui or manager
launcher = launcher if launcher else lazyllm.launchers.remote(sync=False)
self._local_file_reader: Dict[str, Callable] = {}
self._embed = embed if isinstance(embed, dict) else {EMBED_DEFAULT_KEY: embed}
for embed in self._embed.values():
if isinstance(embed, ModuleBase):
self._submodules.append(embed)

self._impl = DocGroupImpl(dataset_path=dataset_path, embed=self._embed, local_readers=self._local_file_reader,
global_readers=self._registered_file_reader)
if self._manager:
doc_manager = DocManager(self._impl)
self.doc_server = ServerModule(doc_manager, launcher=launcher)
self.web = DocWebModule(doc_server=self.doc_server)

def forward(self, func_name: str, *args, **kwargs):
if self._manager:
kwargs["func_name"] = func_name
return self.doc_server.forward(*args, **kwargs)
else:
return getattr(self._impl, func_name)(*args, **kwargs)

def find_parent(self, group: str) -> Callable:
return partial(self.forward, "find_parent", group=group)
self._impls = Document._Impl(dataset_path, embed, create_ui or manager, server, launcher)
self._curr_group = DocImpl.DEDAULT_GROUP_NAME

def find_children(self, group: str) -> Callable:
return partial(self.forward, "find_children", group=group)
def create_kb_group(self, name: str) -> "Document":
self._impls.add_kb_group(name)
doc = copy.copy(self)
doc._curr_group = name
return doc

def __repr__(self):
return lazyllm.make_repr("Module", "Document", manager=self._manager)
@property
def _impl(self): return self._impls.get_doc_by_kb_group(self._curr_group)

@DynamicDescriptor
def create_node_group(self, name: str = None, *, transform: Callable, parent: str = LAZY_ROOT_NAME,
Expand All @@ -67,19 +62,22 @@ def create_node_group(self, name: str = None, *, transform: Callable, parent: st
@DynamicDescriptor
def add_reader(self, pattern: str, func: Optional[Callable] = None):
if isinstance(self, type):
return self.register_global_reader(pattern=pattern, func=func)
return DocImpl.register_global_reader(pattern=pattern, func=func)
else:
assert callable(func), 'func for reader should be callable'
self._local_file_reader[pattern] = func
self._impl.add_reader(pattern, func)

@classmethod
def register_global_reader(cls, pattern: str, func: Optional[Callable] = None):
if func is not None:
cls._registered_file_reader[pattern] = func
return None
return cls.add_reader(pattern, func)

def find_parent(self) -> Callable:
return DocImpl.find_parent

def find_children(self) -> Callable:
return DocImpl.find_children

def decorator(klass):
if callable(klass): cls._registered_file_reader[pattern] = klass
else: raise TypeError(f"The registered object {klass} is not a callable object.")
return klass
return decorator
def forward(self, *args, **kw) -> List[DocNode]:
return self._impl.retrieve(*args, **kw)

def __repr__(self):
return lazyllm.make_repr("Module", "Document", manager=bool(self._manager))
Loading

0 comments on commit cfda586

Please sign in to comment.