Skip to content

Commit

Permalink
refactor: Refactor CommunitySummaryKnowledgeGraph batch extraction me…
Browse files Browse the repository at this point in the history
…thod

Co-authored-by: Appointat <[email protected]>
  • Loading branch information
Appointat committed Oct 28, 2024
1 parent fee90cc commit ccd2cdf
Showing 1 changed file with 20 additions and 28 deletions.
48 changes: 20 additions & 28 deletions dbgpt/storage/knowledge_graph/community_summary.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
"""Define the CommunitySummaryKnowledgeGraph."""

import asyncio
import logging
import os
import uuid
Expand Down Expand Up @@ -199,35 +198,28 @@ async def _aload_triplet_graph(self, chunks: List[Chunk]) -> None:
return

document_graph_enabled = self._graph_store.get_config().document_graph_enabled
batch_size = self._triplet_extraction_batch_size

for i in range(0, len(chunks), batch_size):
batch_chunks = chunks[i : i + batch_size]

extraction_tasks = [
self._graph_extractor.extract(chunk.content) for chunk in batch_chunks
]
async_graphs: List[List[MemoryGraph]] = await asyncio.gather(
*extraction_tasks
)
chunk_graph_pairs = await self._graph_extractor.batch_extract(
chunks, batch_size=self._triplet_extraction_batch_size
)

for chunk, graphs in zip(batch_chunks, async_graphs):
for graph in graphs:
if document_graph_enabled:
# append the chunk id to the edge
for edge in graph.edges():
edge.set_prop("_chunk_id", chunk.chunk_id)
graph.append_edge(edge=edge)

# upsert the graph
self._graph_store_apdater.upsert_graph(graph)

# chunk -> include -> entity
if document_graph_enabled:
for vertex in graph.vertices():
self._graph_store_apdater.upsert_chunk_include_entity(
chunk=chunk, entity=vertex
)
for chunk, graphs in chunk_graph_pairs:
for graph in graphs:
if document_graph_enabled:
# Append the chunk id to the edge
for edge in graph.edges():
edge.set_prop("_chunk_id", chunk.chunk_id)
graph.append_edge(edge=edge)

# Upsert the graph
self._graph_store_apdater.upsert_graph(graph)

# chunk -> include -> entity
if document_graph_enabled:
for vertex in graph.vertices():
self._graph_store_apdater.upsert_chunk_include_entity(
chunk=chunk, entity=vertex
)

def _load_chunks(
self, chunks: List[ParagraphChunk]
Expand Down

0 comments on commit ccd2cdf

Please sign in to comment.