Skip to content

Commit

Permalink
experimental[patch]: Enhance LLMGraphTransformer with async processin…
Browse files Browse the repository at this point in the history
…g and improved readability (#19205)

- [x] **PR title**: "experimental: Enhance LLMGraphTransformer with
async processing and improved readability"


- [x] **PR message**: 
- **Description:** This pull request refactors the `process_response`
and `convert_to_graph_documents` methods in the LLMGraphTransformer
class to improve code readability and adds async versions of these
methods for concurrent processing.
    The main changes include:
- Simplifying list comprehensions and conditional logic in the
process_response method for better readability.
- Adding async versions aprocess_response and
aconvert_to_graph_documents to enable concurrent processing of
documents.
These enhancements aim to improve the overall efficiency and
maintainability of the `LLMGraphTransformer` class.
  - **Issue:** N/A
  - **Dependencies:** No additional dependencies required.
  - **Twitter handle:** @jjovalle99


- [x] **Add tests and docs**: N/A (This PR does not introduce a new
integration)


- [x] **Lint and test**: Ran make format, make lint, and make test from
the root of the modified package(s). All tests pass successfully.

Additional notes:

- The changes made in this PR are backwards compatible and do not
introduce any breaking changes.
- The PR touches only the `LLMGraphTransformer` class within the
experimental package.

---------

Co-authored-by: Bagatur <[email protected]>
  • Loading branch information
2 people authored and hinthornw committed Apr 26, 2024
1 parent 320d3c7 commit 90199a2
Showing 1 changed file with 64 additions and 29 deletions.
93 changes: 64 additions & 29 deletions libs/experimental/langchain_experimental/graph_transformers/llm.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from typing import Any, List, Optional, Sequence

from langchain_community.graphs.graph_document import GraphDocument, Node, Relationship
Expand Down Expand Up @@ -207,47 +208,35 @@ def process_response(self, document: Document) -> GraphDocument:
"""
text = document.page_content
raw_schema = self.chain.invoke({"input": text})
if raw_schema.nodes:
nodes = [map_to_base_node(node) for node in raw_schema.nodes]
else:
nodes = []
if raw_schema.relationships:
relationships = [
map_to_base_relationship(rel) for rel in raw_schema.relationships
]
else:
relationships = []
nodes = (
[map_to_base_node(node) for node in raw_schema.nodes]
if raw_schema.nodes
else []
)
relationships = (
[map_to_base_relationship(rel) for rel in raw_schema.relationships]
if raw_schema.relationships
else []
)

# Strict mode filtering
if self.strict_mode and (self.allowed_nodes or self.allowed_relationships):
if self.allowed_relationships and self.allowed_nodes:
nodes = [node for node in nodes if node.type in self.allowed_nodes]
relationships = [
rel
for rel in relationships
if rel.type in self.allowed_relationships
and rel.source.type in self.allowed_nodes
and rel.target.type in self.allowed_nodes
]
elif self.allowed_nodes and not self.allowed_relationships:
if self.allowed_nodes:
nodes = [node for node in nodes if node.type in self.allowed_nodes]
relationships = [
rel
for rel in relationships
if rel.source.type in self.allowed_nodes
and rel.target.type in self.allowed_nodes
]
if self.allowed_relationships and not self.allowed_nodes:
if self.allowed_relationships:
relationships = [
rel
for rel in relationships
if rel.type in self.allowed_relationships
]

graph_document = GraphDocument(
nodes=nodes, relationships=relationships, source=document
)
return graph_document
return GraphDocument(nodes=nodes, relationships=relationships, source=document)

def convert_to_graph_documents(
self, documents: Sequence[Document]
Expand All @@ -261,8 +250,54 @@ def convert_to_graph_documents(
Returns:
Sequence[GraphDocument]: The transformed documents as graphs.
"""
results = []
for document in documents:
graph_document = self.process_response(document)
results.append(graph_document)
return [self.process_response(document) for document in documents]

async def aprocess_response(self, document: Document) -> GraphDocument:
"""
Asynchronously processes a single document, transforming it into a
graph document.
"""
text = document.page_content
raw_schema = await self.chain.ainvoke({"input": text})

nodes = (
[map_to_base_node(node) for node in raw_schema.nodes]
if raw_schema.nodes
else []
)
relationships = (
[map_to_base_relationship(rel) for rel in raw_schema.relationships]
if raw_schema.relationships
else []
)

if self.strict_mode and (self.allowed_nodes or self.allowed_relationships):
if self.allowed_nodes:
nodes = [node for node in nodes if node.type in self.allowed_nodes]
relationships = [
rel
for rel in relationships
if rel.source.type in self.allowed_nodes
and rel.target.type in self.allowed_nodes
]
if self.allowed_relationships:
relationships = [
rel
for rel in relationships
if rel.type in self.allowed_relationships
]

return GraphDocument(nodes=nodes, relationships=relationships, source=document)

async def aconvert_to_graph_documents(
self, documents: Sequence[Document]
) -> List[GraphDocument]:
"""
Asynchronously convert a sequence of documents into graph documents.
"""
tasks = [
asyncio.create_task(self.aprocess_response(document))
for document in documents
]
results = await asyncio.gather(*tasks)
return results

0 comments on commit 90199a2

Please sign in to comment.