Skip to content

Commit

Permalink
fix: refactor get_graph_from_model to return nodes and edges correctly (#257)

Browse files Browse the repository at this point in the history

* fix: handle rate limit error coming from llm model

* fix: fixes lost edges and nodes in get_graph_from_model

* fix: fixes database pruning issue in pgvector (#261)

* fix: cognee_demo notebook pipeline is not saving summaries

---------

Co-authored-by: hajdul88 <[email protected]>
  • Loading branch information
borisarzentar and hajdul88 authored Dec 6, 2024
1 parent 351ce92 commit 348610e
Show file tree
Hide file tree
Showing 22 changed files with 233 additions and 151 deletions.
2 changes: 1 addition & 1 deletion cognee/api/v1/cognify/cognify_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,13 @@ async def run_cognify_pipeline(dataset: Dataset, user: User):
Task(classify_documents),
Task(check_permissions_on_documents, user = user, permissions = ["write"]),
Task(extract_chunks_from_documents), # Extract text chunks based on the document type.
Task(add_data_points, task_config = { "batch_size": 10 }),
Task(extract_graph_from_data, graph_model = KnowledgeGraph, task_config = { "batch_size": 10 }), # Generate knowledge graphs from the document chunks.
Task(
summarize_text,
summarization_model = cognee_config.summarization_model,
task_config = { "batch_size": 10 }
),
Task(add_data_points, task_config = { "batch_size": 10 }),
]

pipeline = run_tasks(tasks, data_documents, "cognify_pipeline")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,14 @@ def __init__(
self.model = model
self.dimensions = dimensions

MAX_RETRIES = 5
retry_count = 0

async def embed_text(self, text: List[str]) -> List[List[float]]:
async def exponential_backoff(attempt):
wait_time = min(10 * (2 ** attempt), 60) # Max 60 seconds
await asyncio.sleep(wait_time)

try:
response = await litellm.aembedding(
self.model,
Expand All @@ -38,23 +45,40 @@ async def embed_text(self, text: List[str]) -> List[List[float]]:
api_base = self.endpoint,
api_version = self.api_version
)

self.retry_count = 0

return [data["embedding"] for data in response.data]

except litellm.exceptions.ContextWindowExceededError as error:
if isinstance(text, list):
parts = [text[0:math.ceil(len(text)/2)], text[math.ceil(len(text)/2):]]
if len(text) == 1:
parts = [text]
else:
parts = [text[0:math.ceil(len(text)/2)], text[math.ceil(len(text)/2):]]

parts_futures = [self.embed_text(part) for part in parts]
embeddings = await asyncio.gather(*parts_futures)

all_embeddings = []
for embeddings_part in embeddings:
all_embeddings.extend(embeddings_part)

return [data["embedding"] for data in all_embeddings]
return all_embeddings

logger.error("Context window exceeded for embedding text: %s", str(error))
raise error

except litellm.exceptions.RateLimitError:
if self.retry_count >= self.MAX_RETRIES:
raise Exception(f"Rate limit exceeded and no more retries left.")

await exponential_backoff(self.retry_count)

self.retry_count += 1

return await self.embed_text(text)

except Exception as error:
logger.error("Error embedding text: %s", str(error))
raise error
Expand Down
3 changes: 3 additions & 0 deletions cognee/modules/chunking/TextChunker.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def read(self):
is_part_of = self.document,
chunk_index = self.chunk_index,
cut_type = chunk_data["cut_type"],
contains = [],
_metadata = {
"index_fields": ["text"],
"metadata_id": self.document.metadata_id
Expand All @@ -52,6 +53,7 @@ def read(self):
is_part_of = self.document,
chunk_index = self.chunk_index,
cut_type = paragraph_chunks[len(paragraph_chunks) - 1]["cut_type"],
contains = [],
_metadata = {
"index_fields": ["text"],
"metadata_id": self.document.metadata_id
Expand All @@ -73,6 +75,7 @@ def read(self):
is_part_of = self.document,
chunk_index = self.chunk_index,
cut_type = paragraph_chunks[len(paragraph_chunks) - 1]["cut_type"],
contains = [],
_metadata = {
"index_fields": ["text"],
"metadata_id": self.document.metadata_id
Expand Down
4 changes: 3 additions & 1 deletion cognee/modules/chunking/models/DocumentChunk.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Optional
from typing import List, Optional
from cognee.infrastructure.engine import DataPoint
from cognee.modules.data.processing.document_types import Document
from cognee.modules.engine.models import Entity

class DocumentChunk(DataPoint):
__tablename__ = "document_chunk"
Expand All @@ -9,6 +10,7 @@ class DocumentChunk(DataPoint):
chunk_index: int
cut_type: str
is_part_of: Document
contains: List[Entity] = None

_metadata: Optional[dict] = {
"index_fields": ["text"],
Expand Down
1 change: 1 addition & 0 deletions cognee/modules/chunking/models/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .DocumentChunk import DocumentChunk
2 changes: 0 additions & 2 deletions cognee/modules/engine/models/Entity.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from cognee.infrastructure.engine import DataPoint
from cognee.modules.chunking.models.DocumentChunk import DocumentChunk
from cognee.modules.engine.models.EntityType import EntityType


Expand All @@ -8,7 +7,6 @@ class Entity(DataPoint):
name: str
is_a: EntityType
description: str
mentioned_in: DocumentChunk

_metadata: dict = {
"index_fields": ["name"],
Expand Down
3 changes: 0 additions & 3 deletions cognee/modules/engine/models/EntityType.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
from cognee.infrastructure.engine import DataPoint
from cognee.modules.chunking.models.DocumentChunk import DocumentChunk


class EntityType(DataPoint):
__tablename__ = "entity_type"
name: str
type: str
description: str
exists_in: DocumentChunk

_metadata: dict = {
"index_fields": ["name"],
Expand Down
21 changes: 13 additions & 8 deletions cognee/modules/graph/utils/expand_with_nodes_and_edges.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import Optional

from cognee.infrastructure.engine import DataPoint
from cognee.modules.chunking.models import DocumentChunk
from cognee.modules.engine.models import Entity, EntityType
from cognee.modules.engine.utils import (
generate_edge_name,
Expand All @@ -11,17 +11,19 @@


def expand_with_nodes_and_edges(
graph_node_index: list[tuple[DataPoint, KnowledgeGraph]],
data_chunks: list[DocumentChunk],
chunk_graphs: list[KnowledgeGraph],
existing_edges_map: Optional[dict[str, bool]] = None,
):
if existing_edges_map is None:
existing_edges_map = {}

added_nodes_map = {}
relationships = []
data_points = []

for graph_source, graph in graph_node_index:
for index, data_chunk in enumerate(data_chunks):
graph = chunk_graphs[index]

if graph is None:
continue

Expand All @@ -38,7 +40,6 @@ def expand_with_nodes_and_edges(
name = type_node_name,
type = type_node_name,
description = type_node_name,
exists_in = graph_source,
)
added_nodes_map[f"{str(type_node_id)}_type"] = type_node
else:
Expand All @@ -50,9 +51,13 @@ def expand_with_nodes_and_edges(
name = node_name,
is_a = type_node,
description = node.description,
mentioned_in = graph_source,
)
data_points.append(entity_node)

if data_chunk.contains is None:
data_chunk.contains = []

data_chunk.contains.append(entity_node)

added_nodes_map[f"{str(node_id)}_entity"] = entity_node

# Add relationship that came from graphs.
Expand Down Expand Up @@ -80,4 +85,4 @@ def expand_with_nodes_and_edges(
)
existing_edges_map[edge_key] = True

return (data_points, relationships)
return (data_chunks, relationships)
Loading

0 comments on commit 348610e

Please sign in to comment.