Skip to content

Commit

Permalink
fix parallel transform bug of document (LazyAGI#282)
Browse files Browse the repository at this point in the history
  • Loading branch information
wzh1994 authored Sep 27, 2024
1 parent c0cc809 commit 74438ee
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 7 deletions.
1 change: 1 addition & 0 deletions lazyllm/tools/rag/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def _parallel_do_embedding(self, nodes: List[DocNode]) -> List[DocNode]:
if node.has_missing_embedding(k):
future = executor.submit(node.do_embedding, {k: self.embed[k]}) \
if k not in node.embedding_state else executor.submit(node.check_embedding_state, k)
node.embedding_state.add(k)
futures.append(future)
if len(futures) > 0:
for future in concurrent.futures.as_completed(futures):
Expand Down
16 changes: 9 additions & 7 deletions lazyllm/tools/rag/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,15 @@ def batch_forward(
) -> List[DocNode]:
documents: List[DocNode] = documents if isinstance(documents, (tuple, list)) else [documents]

def impl(node):
splits = self(node, **kwargs)
for s in splits:
s.parent = node
s.group = node_group
node.children[node_group] = splits
return splits
def impl(node: DocNode):
with node._lock:
if node_group in node.children: return []
splits = self(node, **kwargs)
for s in splits:
s.parent = node
s.group = node_group
node.children[node_group] = splits
return splits

if getattr(self, '_number_workers', 0) > 0:
pool = ThreadPoolExecutor(max_workers=self._number_workers)
Expand Down

0 comments on commit 74438ee

Please sign in to comment.