Add RAG readers to replace llamaindex #239

Merged: 55 commits, Sep 15, 2024

Changes from all commits

Commits (55)
3be037b  add lazyllm before group (Aug 8, 2024)
9337e4c  Merge remote-tracking branch 'upstream/main' (Aug 8, 2024)
67f086c  Merge remote-tracking branch 'upstream/main' (Aug 9, 2024)
dbf3280  Merge remote-tracking branch 'upstream/main' (Aug 12, 2024)
d2421d2  Merge remote-tracking branch 'upstream/main' (Aug 13, 2024)
b1169fe  Merge remote-tracking branch 'upstream/main' (Aug 14, 2024)
2d73df9  Merge remote-tracking branch 'upstream/main' (Aug 14, 2024)
e3193b1  Merge remote-tracking branch 'upstream/main' (Aug 14, 2024)
80fc6c5  Merge remote-tracking branch 'upstream/main' (Aug 14, 2024)
8a1b54b  Merge remote-tracking branch 'upstream/main' (Aug 15, 2024)
ef51ad1  Merge remote-tracking branch 'upstream/main' (Aug 16, 2024)
3d39b4e  Merge remote-tracking branch 'upstream/main' (Aug 19, 2024)
63acf1a  Merge remote-tracking branch 'upstream/main' (Aug 20, 2024)
5d8be06  Merge remote-tracking branch 'upstream/main' (Aug 20, 2024)
a0cdd74  Merge remote-tracking branch 'upstream/main' (Aug 23, 2024)
c44f5ec  Merge remote-tracking branch 'upstream/main' (Aug 23, 2024)
5093af2  Merge remote-tracking branch 'upstream/main' (Aug 24, 2024)
214ec1b  Merge remote-tracking branch 'upstream/main' (Aug 28, 2024)
2224adc  Merge remote-tracking branch 'upstream/main' (Aug 29, 2024)
ec59048  Merge remote-tracking branch 'upstream/main' (Aug 29, 2024)
92e9c6b  Merge remote-tracking branch 'upstream/main' (Aug 30, 2024)
e079af8  Merge remote-tracking branch 'upstream/main' (Aug 30, 2024)
02892f2  Merge remote-tracking branch 'upstream/main' (Aug 30, 2024)
9348f06  Merge remote-tracking branch 'upstream/main' (Aug 30, 2024)
3bd6c6a  Merge remote-tracking branch 'upstream/main' (Sep 2, 2024)
e11d29d  Merge remote-tracking branch 'upstream/main' (Sep 2, 2024)
1494288  Merge remote-tracking branch 'upstream/main' (Sep 3, 2024)
2a9c4fe  Merge remote-tracking branch 'upstream/main' (Sep 3, 2024)
61cfb0c  Merge remote-tracking branch 'upstream/main' (Sep 3, 2024)
4558336  Merge remote-tracking branch 'upstream/main' (Sep 4, 2024)
407b84e  Merge remote-tracking branch 'upstream/main' (Sep 4, 2024)
ce3e287  Merge remote-tracking branch 'upstream/main' (Sep 5, 2024)
4546f52  Merge remote-tracking branch 'upstream/main' (Sep 5, 2024)
cbf4a7e  add readers for rag (Sep 10, 2024)
6357946  Merge remote-tracking branch 'upstream/main' (Sep 10, 2024)
072a2ed  Merge branch 'main' into wj/rag_reader (Sep 10, 2024)
aab9c47  optimizing reader register process (Sep 12, 2024)
306c115  Merge remote-tracking branch 'upstream/main' (Sep 12, 2024)
796d801  merge main branch (Sep 12, 2024)
7b6bc80  optimizing rag readers register (Sep 13, 2024)
9a92680  Merge remote-tracking branch 'upstream/main' (Sep 13, 2024)
a42a4c1  Merge branch 'main' into wj/rag_reader (Sep 13, 2024)
973c023  add two package for readers in unit tests (Sep 13, 2024)
5eca597  modify rag reader file path in unit test (Sep 13, 2024)
c03756b  Optimize the usage priority of local and global registered file readers (Sep 14, 2024)
706a734  Merge remote-tracking branch 'upstream/main' (Sep 14, 2024)
89463f8  Merge branch 'main' into wj/rag_reader (Sep 14, 2024)
e5d70eb  modify reader usage priority (Sep 14, 2024)
ab90f96  Merge remote-tracking branch 'upstream/main' (Sep 14, 2024)
55d5840  Merge branch 'main' into wj/rag_reader (Sep 14, 2024)
0d70ed8  modify pattern in readers of unittests (Sep 14, 2024)
8b56a64  modify reader test bug in unit tests (Sep 14, 2024)
66ef831  Merge remote-tracking branch 'upstream/main' (Sep 14, 2024)
2524a5a  Merge branch 'main' into wj/rag_reader (Sep 14, 2024)
36c967a  remove yaml package in rag reader of unittests (Sep 14, 2024)
1 change: 1 addition & 0 deletions .github/workflows/main.yml
@@ -61,6 +61,7 @@ jobs:
run: |
set -ex
cd ${{ env.CI_PATH }}
pip install -r tests/requirements.txt
realpath .
env | grep '^SCC'
export LAZYLLM_SCO_ENV_NAME=lazyllm
16 changes: 16 additions & 0 deletions lazyllm/tools/rag/__init__.py
@@ -4,6 +4,9 @@
from .transform import SentenceSplitter, LLMParser, NodeTransform
from .index import register_similarity
from .store import DocNode
from .readers import (PDFReader, DocxReader, HWPReader, PPTXReader, ImageReader, IPYNBReader, EpubReader,
MarkdownReader, MboxReader, PandasCSVReader, PandasExcelReader, VideoAudioReader)
from .dataReader import SimpleDirectoryReader


__all__ = [
@@ -16,4 +19,17 @@
"register_similarity",
"register_reranker",
"DocNode",
"PDFReader",
"DocxReader",
"HWPReader",
"PPTXReader",
"ImageReader",
"IPYNBReader",
"EpubReader",
"MarkdownReader",
"MboxReader",
"PandasCSVReader",
"PandasExcelReader",
"VideoAudioReader",
"SimpleDirectoryReader",
]
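
The block above exports every built-in reader from lazyllm.tools.rag. As a quick illustration (not part of this diff), a single reader can be used on its own; the file path below is a hypothetical placeholder, and the call signature mirrors how load_file in dataReader.py invokes readers:

# Sketch only: run one of the newly exported readers on a single file.
# "docs/notes.md" is a hypothetical path; extra_info carries optional metadata, as in load_file.
from lazyllm.tools.rag import MarkdownReader, DocNode

reader = MarkdownReader()
nodes = reader("docs/notes.md", extra_info=None)  # readers return a list of DocNode
assert all(isinstance(n, DocNode) for n in nodes)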
260 changes: 260 additions & 0 deletions lazyllm/tools/rag/dataReader.py
@@ -0,0 +1,260 @@
"""
The overall flow of SimpleDirectoryReader is borrowed from LLAMA_INDEX, with one customization added on top:
users can register custom matching rules instead of dispatching readers only by file suffix.
"""
import os
import mimetypes
import multiprocessing
import fnmatch
from tqdm import tqdm
from datetime import datetime
from functools import reduce
from itertools import repeat
from typing import Dict, Optional, List, Callable, Type
from pathlib import Path, PurePosixPath, PurePath
from fsspec import AbstractFileSystem
from lazyllm import ModuleBase, LOG
from .store import DocNode
from .readers import (ReaderBase, PDFReader, DocxReader, HWPReader, PPTXReader, ImageReader, IPYNBReader,
EpubReader, MarkdownReader, MboxReader, PandasCSVReader, PandasExcelReader, VideoAudioReader,
get_default_fs, is_default_fs)

def _file_timestamp_format(timestamp: float, include_time: bool = False) -> Optional[str]:
try:
if include_time:
return datetime.utcfromtimestamp(timestamp).strftime("%Y-%m-%dT%H:%M:%SZ")
return datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d")
except Exception:
return None

class _DefaultFileMetadataFunc:
def __init__(self, fs: Optional[AbstractFileSystem] = None):
self._fs = fs or get_default_fs()

def __call__(self, file_path: str) -> Dict:
stat_result = self._fs.stat(file_path)

try:
file_name = os.path.basename(str(stat_result['name']))
except Exception:
file_name = os.path.basename(file_path)

creation_date = _file_timestamp_format(stat_result.get("created"))
last_modified_date = _file_timestamp_format(stat_result.get("mtime"))
last_accessed_date = _file_timestamp_format(stat_result.get("atime"))
default_meta = {
"file_path": file_path,
"file_name": file_name,
"file_type": mimetypes.guess_type(file_path)[0],
"file_size": stat_result.get("size"),
"creation_date": creation_date,
"last_modified_date": last_modified_date,
"last_accessed_date": last_accessed_date,
}

return {meta_key: meta_value for meta_key, meta_value in default_meta.items() if meta_value is not None}

class SimpleDirectoryReader(ModuleBase):
default_file_readers: Dict[str, Type[ReaderBase]] = {
"*.pdf": PDFReader,
"*.docx": DocxReader,
"*.hwp": HWPReader,
"*.pptx": PPTXReader,
"*.ppt": PPTXReader,
"*.pptm": PPTXReader,
"*.gif": ImageReader,
"*.jpeg": ImageReader,
"*.jpg": ImageReader,
"*.png": ImageReader,
"*.webp": ImageReader,
"*.ipynb": IPYNBReader,
"*.epub": EpubReader,
"*.md": MarkdownReader,
"*.mbox": MboxReader,
"*.csv": PandasCSVReader,
"*.xls": PandasExcelReader,
"*.xlsx": PandasExcelReader,
"*.mp3": VideoAudioReader,
"*.mp4": VideoAudioReader,
}

def __init__(self, input_dir: Optional[str] = None, input_files: Optional[List] = None,
exclude: Optional[List] = None, exclude_hidden: bool = True, recursive: bool = False,
encoding: str = "utf-8", filename_as_id: bool = False, required_exts: Optional[List[str]] = None,
file_extractor: Optional[Dict[str, Callable]] = None, fs: Optional[AbstractFileSystem] = None,
file_metadata: Optional[Callable[[str], Dict]] = None, num_files_limit: Optional[int] = None,
return_trace: bool = False) -> None:
super().__init__(return_trace=return_trace)

if (not input_dir and not input_files) or (input_dir and input_files):
raise ValueError("Must provide either `input_dir` or `input_files`.")

self._fs = fs or get_default_fs()
self._encoding = encoding

self._exclude = exclude
self._recursive = recursive
self._exclude_hidden = exclude_hidden
self._required_exts = required_exts
self._num_files_limit = num_files_limit
self._Path = Path if is_default_fs(self._fs) else PurePosixPath

if input_files:
self._input_files = []
for path in input_files:
if not self._fs.isfile(path):
raise ValueError(f"File {path} does not exist.")
input_file = self._Path(path)
self._input_files.append(input_file)
elif input_dir:
if not self._fs.isdir(input_dir):
raise ValueError(f"Directory {input_dir} does not exist.")
self._input_dir = self._Path(input_dir)
self._input_files = self._add_files(self._input_dir)

self._file_extractor = file_extractor or {}

self._file_metadata = file_metadata or _DefaultFileMetadataFunc(self._fs)
self._filename_as_id = filename_as_id

def _add_files(self, input_dir: Path) -> List[Path]: # noqa: C901
all_files = set()
rejected_files = set()
rejected_dirs = set()

if self._exclude is not None:
for excluded_pattern in self._exclude:
if self._recursive:
excluded_glob = self._Path(input_dir) / self._Path("**") / excluded_pattern
else:
excluded_glob = self._Path(input_dir) / excluded_pattern
for file in self._fs.glob(str(excluded_glob)):
if self._fs.isdir(file):
rejected_dirs.add(self._Path(file))
else:
rejected_files.add(self._Path(file))

file_refs: List[str] = []
if self._recursive:
file_refs = self._fs.glob(str(input_dir) + "/**/*")
else:
file_refs = self._fs.glob(str(input_dir) + "/*")

for ref in file_refs:
ref = self._Path(ref)
is_dir = self._fs.isdir(ref)
skip_hidden = self._exclude_hidden and self._is_hidden(ref)
skip_bad_exts = (self._required_exts is not None and ref.suffix not in self._required_exts)
skip_excluded = ref in rejected_files
if not skip_excluded:
if is_dir:
ref_parent_dir = ref
else:
ref_parent_dir = self._fs._parent(ref)
for rejected_dir in rejected_dirs:
if str(ref_parent_dir).startswith(str(rejected_dir)):
skip_excluded = True
LOG.warning(f"Skipping {ref} because it in parent dir "
f"{ref_parent_dir} which is in {rejected_dir}.")
break

if is_dir or skip_hidden or skip_bad_exts or skip_excluded:
continue
else:
all_files.add(ref)

new_input_files = sorted(all_files)

if len(new_input_files) == 0:
raise ValueError(f"No files found in {input_dir}.")
if self._num_files_limit is not None and self._num_files_limit > 0:
new_input_files = new_input_files[0: self._num_files_limit]

LOG.debug(f"[SimpleDirectoryReader] Total files add: {len(new_input_files)}")

LOG.info(f"input_files: {new_input_files}")
return new_input_files

def _is_hidden(self, path: Path) -> bool:
return any(part.startswith(".") and part not in [".", ".."] for part in path.parts)

def _exclude_metadata(self, documents: List[DocNode]) -> List[DocNode]:
for doc in documents:
doc.excluded_embed_metadata_keys.extend(
["file_name", "file_type", "file_size", "creation_date",
"last_modified_date", "last_accessed_date"])
doc.excluded_llm_metadata_keys.extend(
["file_name", "file_type", "file_size", "creation_date",
"last_modified_date", "last_accessed_date"])
return documents

@staticmethod
def load_file(input_file: Path, file_metadata: Callable[[str], Dict], file_extractor: Dict[str, Callable],
filename_as_id: bool = False, encoding: str = "utf-8", pathm: PurePath = Path,
fs: Optional[AbstractFileSystem] = None) -> List[DocNode]:
metadata: Optional[dict] = None
documents: List[DocNode] = []

if file_metadata is not None: metadata = file_metadata(str(input_file))

file_reader_patterns = list(file_extractor.keys())

for pattern in file_reader_patterns:
pt = str(pathm(pattern))
match_pattern = pt if pt.startswith("*") else os.path.join(str(pathm.cwd()), pt)
if fnmatch.fnmatch(input_file, match_pattern):
reader = file_extractor[pattern]
reader = reader() if isinstance(reader, type) else reader
kwargs = {"extra_info": metadata}
if fs and not is_default_fs(fs): kwargs['fs'] = fs
docs = reader(input_file, **kwargs)

if filename_as_id:
for i, doc in enumerate(docs):
doc.uid = f"{input_file!s}_index_{i}"
documents.extend(docs)
break
else:
fs = fs or get_default_fs()
with fs.open(input_file, encoding=encoding) as f:
data = f.read().decode(encoding)

doc = DocNode(text=data, metadata=metadata or {})
if filename_as_id: doc.uid = str(input_file)
documents.append(doc)

return documents

def _load_data(self, show_progress: bool = False, num_workers: Optional[int] = None,
fs: Optional[AbstractFileSystem] = None) -> List[DocNode]:
documents = []

fs = fs or self._fs
process_file = self._input_files
file_readers = self._file_extractor.copy()
for key, func in self.default_file_readers.items():
if key not in file_readers: file_readers[key] = func

if num_workers and num_workers >= 1:
if num_workers > multiprocessing.cpu_count():
LOG.warning("Specified num_workers exceed number of CPUs in the system. "
"Setting `num_workers` down to the maximum CPU count.")
with multiprocessing.get_context("spawn").Pool(num_workers) as p:
results = p.starmap(SimpleDirectoryReader.load_file,
zip(process_file, repeat(self._file_metadata), repeat(file_readers),
repeat(self._filename_as_id), repeat(self._encoding), repeat(self._Path),
repeat(self._fs)))
documents = reduce(lambda x, y: x + y, results)
else:
if show_progress:
process_file = tqdm(self._input_files, desc="Loading files", unit="file")
for input_file in process_file:
documents.extend(
SimpleDirectoryReader.load_file(input_file=input_file, file_metadata=self._file_metadata,
file_extractor=file_readers, filename_as_id=self._filename_as_id,
encoding=self._encoding, pathm=self._Path, fs=self._fs))

return self._exclude_metadata(documents)

def forward(self, *args, **kwargs) -> List[DocNode]:
return self._load_data(*args, **kwargs)
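
For orientation, here is a minimal usage sketch of the class defined above; it is not part of the diff. The ./docs directory and my_txt_reader are hypothetical placeholders, and only constructor arguments shown in the code above are relied on:

# Sketch only: SimpleDirectoryReader with a custom pattern-based reader.
from lazyllm.tools.rag import SimpleDirectoryReader, DocNode

def my_txt_reader(path, extra_info=None):
    # Any callable that accepts a file path (plus the extra_info metadata dict)
    # and returns a list of DocNode can be registered under a glob pattern.
    with open(path, encoding="utf-8") as f:
        return [DocNode(text=f.read(), metadata=extra_info or {})]

reader = SimpleDirectoryReader(input_dir="./docs", recursive=True,
                               file_extractor={"*.txt": my_txt_reader})
nodes = reader()  # ModuleBase makes the reader callable; forward() delegates to _load_data()
print(f"loaded {len(nodes)} nodes")

Patterns passed via file_extractor take precedence over entries in default_file_readers, because _load_data only fills in defaults for patterns that are not already registered.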
28 changes: 13 additions & 15 deletions lazyllm/tools/rag/data_loaders.py
@@ -1,28 +1,26 @@
from typing import List, Optional
from typing import List, Optional, Dict
from .store import DocNode, LAZY_ROOT_NAME
from lazyllm import LOG

from .dataReader import SimpleDirectoryReader

class DirectoryReader:
def __init__(self, input_files: List[str]) -> None:
def __init__(self, input_files: List[str], local_readers: Optional[Dict] = None,
global_readers: Optional[Dict] = None) -> None:
self._input_files = input_files
self._local_readers = local_readers
self._global_readers = global_readers

def load_data(self, input_files: Optional[List[str]] = None) -> List[DocNode]:
input_files = input_files or self._input_files
from llama_index.core import SimpleDirectoryReader

file_readers = self._local_readers.copy()
for key, func in self._global_readers.items():
if key not in file_readers: file_readers[key] = func
LOG.info(f"DirectoryReader loads data, input files: {input_files}")
reader = SimpleDirectoryReader(input_files=input_files)
reader = SimpleDirectoryReader(input_files=input_files, file_extractor=file_readers)
nodes: List[DocNode] = []
for doc in reader.load_data():
node = DocNode(
text=doc.text,
group=LAZY_ROOT_NAME,
)
node.metadata = doc.metadata
node.excluded_embed_metadata_keys = doc.excluded_embed_metadata_keys
node.excluded_llm_metadata_keys = doc.excluded_llm_metadata_keys
nodes.append(node)
for doc in reader():
doc.group = LAZY_ROOT_NAME
nodes.append(doc)
if not nodes:
LOG.warning(
f"No nodes load from path {self.input_files}, please check your data path."
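
The copy-then-fill loop in load_data above is what gives locally registered readers priority over global ones. Below is a standalone sketch of that rule, with placeholder callables that are not APIs from this PR:

# Sketch only: local readers win over global readers for the same pattern.
def local_md_reader(path, extra_info=None): ...
def global_md_reader(path, extra_info=None): ...
def global_pdf_reader(path, extra_info=None): ...

local_readers = {"*.md": local_md_reader}
global_readers = {"*.md": global_md_reader, "*.pdf": global_pdf_reader}

file_readers = local_readers.copy()
for key, func in global_readers.items():
    if key not in file_readers:
        file_readers[key] = func

assert file_readers["*.md"] is local_md_reader     # local registration wins
assert file_readers["*.pdf"] is global_pdf_reader  # globals fill remaining patterns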
5 changes: 3 additions & 2 deletions lazyllm/tools/rag/doc_impl.py
@@ -25,9 +25,10 @@ def wrapper(*args, **kwargs) -> List[float]:


class DocImpl:
def __init__(self, embed, doc_files=Optional[List[str]], **kwargs):
def __init__(self, embed, doc_files=Optional[List[str]], local_readers: Optional[Dict] = None,
global_readers: Optional[Dict] = None, **kwargs):
super().__init__()
self.directory_reader = DirectoryReader(doc_files)
self.directory_reader = DirectoryReader(doc_files, local_readers=local_readers, global_readers=global_readers)
self.node_groups: Dict[str, Dict] = {LAZY_ROOT_NAME: {}}
self._create_node_group_default()
self.embed = embed_wrapper(embed)