From 458eeace78608c144fe72a6447143fb2d8a1103e Mon Sep 17 00:00:00 2001 From: Rita Aleksziev Date: Tue, 28 Jan 2025 12:11:05 +0100 Subject: [PATCH 01/15] Count the number of tokens in documents --- cognee/modules/chunking/TextChunker.py | 3 +++ cognee/modules/chunking/models/DocumentChunk.py | 1 + cognee/modules/data/processing/document_types/Document.py | 1 + cognee/tasks/documents/extract_chunks_from_documents.py | 3 +++ 4 files changed, 8 insertions(+) diff --git a/cognee/modules/chunking/TextChunker.py b/cognee/modules/chunking/TextChunker.py index cd71dd128..3f40d281c 100644 --- a/cognee/modules/chunking/TextChunker.py +++ b/cognee/modules/chunking/TextChunker.py @@ -48,6 +48,7 @@ def read(self): id=chunk_data["chunk_id"], text=chunk_data["text"], word_count=chunk_data["word_count"], + token_count=chunk_data["token_count"], is_part_of=self.document, chunk_index=self.chunk_index, cut_type=chunk_data["cut_type"], @@ -67,6 +68,7 @@ def read(self): ), text=chunk_text, word_count=self.chunk_size, + token_count=self.token_count, is_part_of=self.document, chunk_index=self.chunk_index, cut_type=paragraph_chunks[len(paragraph_chunks) - 1]["cut_type"], @@ -89,6 +91,7 @@ def read(self): id=uuid5(NAMESPACE_OID, f"{str(self.document.id)}-{self.chunk_index}"), text=" ".join(chunk["text"] for chunk in paragraph_chunks), word_count=self.chunk_size, + token_count=self.token_count, is_part_of=self.document, chunk_index=self.chunk_index, cut_type=paragraph_chunks[len(paragraph_chunks) - 1]["cut_type"], diff --git a/cognee/modules/chunking/models/DocumentChunk.py b/cognee/modules/chunking/models/DocumentChunk.py index a232d50a1..894a810d2 100644 --- a/cognee/modules/chunking/models/DocumentChunk.py +++ b/cognee/modules/chunking/models/DocumentChunk.py @@ -9,6 +9,7 @@ class DocumentChunk(DataPoint): __tablename__ = "document_chunk" text: str word_count: int + token_count: int chunk_index: int cut_type: str is_part_of: Document diff --git a/cognee/modules/data/processing/document_types/Document.py b/cognee/modules/data/processing/document_types/Document.py index 4d9f3bf72..396afa264 100644 --- a/cognee/modules/data/processing/document_types/Document.py +++ b/cognee/modules/data/processing/document_types/Document.py @@ -9,6 +9,7 @@ class Document(DataPoint): raw_data_location: str external_metadata: Optional[str] mime_type: str + token_count: Optional[int] = None _metadata: dict = {"index_fields": ["name"], "type": "Document"} def read(self, chunk_size: int, chunker=str, max_tokens: Optional[int] = None) -> str: diff --git a/cognee/tasks/documents/extract_chunks_from_documents.py b/cognee/tasks/documents/extract_chunks_from_documents.py index ecdd6817d..5434be421 100644 --- a/cognee/tasks/documents/extract_chunks_from_documents.py +++ b/cognee/tasks/documents/extract_chunks_from_documents.py @@ -17,7 +17,10 @@ async def extract_chunks_from_documents( - The `chunker` parameter determines the chunking logic and should align with the document type. 
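    - Illustrative call pattern (the chunker identifier and document setup below are
      assumptions for illustration, not part of this patch; use whichever chunker
      name your deployment registers):

          async for chunk in extract_chunks_from_documents(
              documents, chunk_size=1024, chunker="text_chunker", max_tokens=None
          ):
              print(chunk.word_count, chunk.token_count)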
""" for document in documents: + document_token_count = 0 for document_chunk in document.read( chunk_size=chunk_size, chunker=chunker, max_tokens=max_tokens ): + document_token_count += document_chunk.token_count yield document_chunk + document.token_count = document_token_count From f6663ab66bee830b530834c72fbcacc4066b3031 Mon Sep 17 00:00:00 2001 From: Rita Aleksziev Date: Tue, 28 Jan 2025 17:08:18 +0100 Subject: [PATCH 02/15] save token count to relational db --- .../processing/document_types/Document.py | 1 - .../extract_chunks_from_documents.py | 24 +++++++++++++++++-- 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/cognee/modules/data/processing/document_types/Document.py b/cognee/modules/data/processing/document_types/Document.py index 11996a95a..80ba8e428 100644 --- a/cognee/modules/data/processing/document_types/Document.py +++ b/cognee/modules/data/processing/document_types/Document.py @@ -9,7 +9,6 @@ class Document(DataPoint): raw_data_location: str external_metadata: Optional[str] mime_type: str - token_count: Optional[int] = None _metadata: dict = {"index_fields": ["name"], "type": "Document"} def read(self, chunk_size: int, chunker=str) -> str: diff --git a/cognee/tasks/documents/extract_chunks_from_documents.py b/cognee/tasks/documents/extract_chunks_from_documents.py index 8e0d1b12d..a65f32fc9 100644 --- a/cognee/tasks/documents/extract_chunks_from_documents.py +++ b/cognee/tasks/documents/extract_chunks_from_documents.py @@ -1,6 +1,25 @@ -from typing import Optional, AsyncGenerator +from typing import AsyncGenerator from cognee.modules.data.processing.document_types.Document import Document +from sqlalchemy import select +from cognee.modules.data.models import Data +from cognee.infrastructure.databases.relational import get_relational_engine +from uuid import UUID + + +async def update_document_token_count(document_id: UUID, token_count: int) -> None: + db_engine = get_relational_engine() + async with db_engine.get_async_session() as session: + document_data_point = ( + await session.execute(select(Data).filter(Data.id == document_id)) + ).scalar_one_or_none() + + if document_data_point: + document_data_point.token_count = token_count + await session.merge(document_data_point) + await session.commit() + else: + raise ValueError(f"Document with id {document_id} not found.") async def extract_chunks_from_documents( @@ -23,4 +42,5 @@ async def extract_chunks_from_documents( ): document_token_count += document_chunk.token_count yield document_chunk - document.token_count = document_token_count + + await update_document_token_count(document.id, document_token_count) From 72dfec46cbfd522d4b8983e81dde5351f7788634 Mon Sep 17 00:00:00 2001 From: Rita Aleksziev Date: Tue, 28 Jan 2025 20:00:32 +0100 Subject: [PATCH 03/15] Add metrics to metric table --- cognee/api/v1/cognify/cognify_v2.py | 2 + cognee/modules/data/models/MetricData.py | 14 +++---- cognee/tasks/storage/descriptive_metrics.py | 46 +++++++++++++++++++++ 3 files changed, 55 insertions(+), 7 deletions(-) create mode 100644 cognee/tasks/storage/descriptive_metrics.py diff --git a/cognee/api/v1/cognify/cognify_v2.py b/cognee/api/v1/cognify/cognify_v2.py index 12a84030d..48d46417f 100644 --- a/cognee/api/v1/cognify/cognify_v2.py +++ b/cognee/api/v1/cognify/cognify_v2.py @@ -25,6 +25,7 @@ ) from cognee.tasks.graph import extract_graph_from_data from cognee.tasks.storage import add_data_points +from cognee.tasks.storage.descriptive_metrics import store_descriptive_metrics from 
cognee.tasks.storage.index_graph_edges import index_graph_edges from cognee.tasks.summarization import summarize_text @@ -164,6 +165,7 @@ async def get_default_tasks( task_config={"batch_size": 10}, ), Task(add_data_points, only_root=True, task_config={"batch_size": 10}), + Task(store_descriptive_metrics), ] except Exception as error: send_telemetry("cognee.cognify DEFAULT TASKS CREATION ERRORED", user.id) diff --git a/cognee/modules/data/models/MetricData.py b/cognee/modules/data/models/MetricData.py index ef6c33d1d..4f1b91e1b 100644 --- a/cognee/modules/data/models/MetricData.py +++ b/cognee/modules/data/models/MetricData.py @@ -11,13 +11,13 @@ class GraphMetricData(Base): # TODO: Change ID to reflect unique id of graph database id = Column(UUID, primary_key=True, default=uuid4) - num_tokens = Column(Integer) - num_nodes = Column(Integer) - num_edges = Column(Integer) - mean_degree = Column(Float) - edge_density = Column(Float) - num_connected_components = Column(Integer) - sizes_of_connected_components = Column(ARRAY(Integer)) + num_tokens = Column(Integer, nullable=True) + num_nodes = Column(Integer, nullable=True) + num_edges = Column(Integer, nullable=True) + mean_degree = Column(Float, nullable=True) + edge_density = Column(Float, nullable=True) + num_connected_components = Column(Integer, nullable=True) + sizes_of_connected_components = Column(ARRAY(Integer), nullable=True) num_selfloops = Column(Integer, nullable=True) diameter = Column(Integer, nullable=True) avg_shortest_path_length = Column(Float, nullable=True) diff --git a/cognee/tasks/storage/descriptive_metrics.py b/cognee/tasks/storage/descriptive_metrics.py new file mode 100644 index 000000000..5c1f710a6 --- /dev/null +++ b/cognee/tasks/storage/descriptive_metrics.py @@ -0,0 +1,46 @@ +from cognee.infrastructure.engine import DataPoint +from cognee.modules.data.processing.document_types import Document +from cognee.infrastructure.databases.relational import get_relational_engine +from sqlalchemy import select +from cognee.modules.data.models import Data +from cognee.modules.data.models.MetricData import GraphMetricData +import uuid +from cognee.infrastructure.databases.graph import get_graph_engine + + +async def fetch_token_count(db_engine) -> int: + """ + Fetches and sums token counts from the database. + + Returns: + int: The total number of tokens across all documents. 
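+
+    Example (illustrative): for three stored documents with token_count
+    values 120, 80 and 300, the function returns 500. Note that a NULL
+    token_count on any row would make the plain Python sum below raise a
+    TypeError, so this is only meaningful once token counts are populated.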
+ """ + + async with db_engine.get_async_session() as session: + document_data_points = await session.execute(select(Data)) + token_count_sum = sum(document.token_count for document in document_data_points.scalars()) + + return token_count_sum + + +async def calculate_graph_metrics(graph_data): + nodes, edges = graph_data + graph_metrics = { + "num_nodes": len(nodes), + "num_edges": len(edges), + } + return graph_metrics + + +async def store_descriptive_metrics(data_points: list[DataPoint]): + db_engine = get_relational_engine() + graph_engine = await get_graph_engine() + graph_data = await graph_engine.get_graph_data() + + token_count_sum = await fetch_token_count(db_engine) + graph_metrics = await calculate_graph_metrics(graph_data) + + table_name = "graph_metric_table" + metrics_dict = {"id": uuid.uuid4(), "num_tokens": token_count_sum} | graph_metrics + + await db_engine.insert_data(table_name, metrics_dict) From 22b64598190bd2624428bc9b5f1f42605a0cb9a2 Mon Sep 17 00:00:00 2001 From: Rita Aleksziev Date: Wed, 29 Jan 2025 09:30:14 +0100 Subject: [PATCH 04/15] Store list as json instead of array in relational db table --- cognee/modules/data/models/MetricData.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cognee/modules/data/models/MetricData.py b/cognee/modules/data/models/MetricData.py index 4f1b91e1b..aac817cdc 100644 --- a/cognee/modules/data/models/MetricData.py +++ b/cognee/modules/data/models/MetricData.py @@ -1,6 +1,6 @@ from datetime import datetime, timezone -from sqlalchemy import Column, DateTime, Float, Integer, ARRAY, UUID +from sqlalchemy import Column, DateTime, Float, Integer, JSON, UUID from cognee.infrastructure.databases.relational import Base from uuid import uuid4 @@ -17,7 +17,7 @@ class GraphMetricData(Base): mean_degree = Column(Float, nullable=True) edge_density = Column(Float, nullable=True) num_connected_components = Column(Integer, nullable=True) - sizes_of_connected_components = Column(ARRAY(Integer), nullable=True) + sizes_of_connected_components = Column(JSON, nullable=True) num_selfloops = Column(Integer, nullable=True) diameter = Column(Integer, nullable=True) avg_shortest_path_length = Column(Float, nullable=True) From 100e7d79744b68eb70e1e2531c25af0421bd8a95 Mon Sep 17 00:00:00 2001 From: Rita Aleksziev Date: Wed, 29 Jan 2025 13:25:38 +0100 Subject: [PATCH 05/15] Sum in sql instead of python --- cognee/tasks/storage/descriptive_metrics.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cognee/tasks/storage/descriptive_metrics.py b/cognee/tasks/storage/descriptive_metrics.py index 5c1f710a6..c6d229057 100644 --- a/cognee/tasks/storage/descriptive_metrics.py +++ b/cognee/tasks/storage/descriptive_metrics.py @@ -2,6 +2,7 @@ from cognee.modules.data.processing.document_types import Document from cognee.infrastructure.databases.relational import get_relational_engine from sqlalchemy import select +from sqlalchemy.sql import func from cognee.modules.data.models import Data from cognee.modules.data.models.MetricData import GraphMetricData import uuid @@ -17,8 +18,8 @@ async def fetch_token_count(db_engine) -> int: """ async with db_engine.get_async_session() as session: - document_data_points = await session.execute(select(Data)) - token_count_sum = sum(document.token_count for document in document_data_points.scalars()) + token_count_sum = await session.execute(select(func.sum(Data.token_count))) + token_count_sum = token_count_sum.scalar() return token_count_sum From c182d476820932fd95a943b35606a0c296e8ac66 
Mon Sep 17 00:00:00 2001 From: Rita Aleksziev Date: Wed, 29 Jan 2025 13:34:07 +0100 Subject: [PATCH 06/15] Unify naming --- cognee/modules/data/models/{MetricData.py => GraphMetrics.py} | 4 ++-- cognee/tasks/storage/descriptive_metrics.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) rename cognee/modules/data/models/{MetricData.py => GraphMetrics.py} (93%) diff --git a/cognee/modules/data/models/MetricData.py b/cognee/modules/data/models/GraphMetrics.py similarity index 93% rename from cognee/modules/data/models/MetricData.py rename to cognee/modules/data/models/GraphMetrics.py index aac817cdc..2103214c8 100644 --- a/cognee/modules/data/models/MetricData.py +++ b/cognee/modules/data/models/GraphMetrics.py @@ -6,8 +6,8 @@ from uuid import uuid4 -class GraphMetricData(Base): - __tablename__ = "graph_metric_table" +class GraphMetrics(Base): + __tablename__ = "graph_metrics_table" # TODO: Change ID to reflect unique id of graph database id = Column(UUID, primary_key=True, default=uuid4) diff --git a/cognee/tasks/storage/descriptive_metrics.py b/cognee/tasks/storage/descriptive_metrics.py index c6d229057..c057b23c0 100644 --- a/cognee/tasks/storage/descriptive_metrics.py +++ b/cognee/tasks/storage/descriptive_metrics.py @@ -4,7 +4,7 @@ from sqlalchemy import select from sqlalchemy.sql import func from cognee.modules.data.models import Data -from cognee.modules.data.models.MetricData import GraphMetricData +from cognee.modules.data.models import GraphMetrics import uuid from cognee.infrastructure.databases.graph import get_graph_engine @@ -41,7 +41,7 @@ async def store_descriptive_metrics(data_points: list[DataPoint]): token_count_sum = await fetch_token_count(db_engine) graph_metrics = await calculate_graph_metrics(graph_data) - table_name = "graph_metric_table" + table_name = "graph_metrics_table" metrics_dict = {"id": uuid.uuid4(), "num_tokens": token_count_sum} | graph_metrics await db_engine.insert_data(table_name, metrics_dict) From 44fa2cd1336c3b264e97007893500d6bd6f512b7 Mon Sep 17 00:00:00 2001 From: Rita Aleksziev Date: Wed, 29 Jan 2025 13:35:42 +0100 Subject: [PATCH 07/15] Return data_points in descriptive metric calculation task --- cognee/tasks/storage/descriptive_metrics.py | 1 + 1 file changed, 1 insertion(+) diff --git a/cognee/tasks/storage/descriptive_metrics.py b/cognee/tasks/storage/descriptive_metrics.py index c057b23c0..f7a854e53 100644 --- a/cognee/tasks/storage/descriptive_metrics.py +++ b/cognee/tasks/storage/descriptive_metrics.py @@ -45,3 +45,4 @@ async def store_descriptive_metrics(data_points: list[DataPoint]): metrics_dict = {"id": uuid.uuid4(), "num_tokens": token_count_sum} | graph_metrics await db_engine.insert_data(table_name, metrics_dict) + return data_points From 06030ff768f6e214cd5c826ec38c209932e08946 Mon Sep 17 00:00:00 2001 From: Rita Aleksziev Date: Wed, 29 Jan 2025 14:28:41 +0100 Subject: [PATCH 08/15] Graph metrics getter template in graph db interface and adapters --- .../databases/graph/graph_db_interface.py | 4 ++++ .../databases/graph/neo4j_driver/adapter.py | 14 ++++++++++++++ .../databases/graph/networkx/adapter.py | 14 ++++++++++++++ cognee/tasks/storage/descriptive_metrics.py | 12 +----------- 4 files changed, 33 insertions(+), 11 deletions(-) diff --git a/cognee/infrastructure/databases/graph/graph_db_interface.py b/cognee/infrastructure/databases/graph/graph_db_interface.py index 30acc1b95..dfb955cd7 100644 --- a/cognee/infrastructure/databases/graph/graph_db_interface.py +++ 
b/cognee/infrastructure/databases/graph/graph_db_interface.py @@ -54,3 +54,7 @@ async def delete_graph( @abstractmethod async def get_graph_data(self): raise NotImplementedError + + @abstractmethod + async def get_graph_metrics(self): + raise NotImplementedError diff --git a/cognee/infrastructure/databases/graph/neo4j_driver/adapter.py b/cognee/infrastructure/databases/graph/neo4j_driver/adapter.py index 41bfb891d..dc4a61b51 100644 --- a/cognee/infrastructure/databases/graph/neo4j_driver/adapter.py +++ b/cognee/infrastructure/databases/graph/neo4j_driver/adapter.py @@ -522,3 +522,17 @@ async def get_filtered_graph_data(self, attribute_filters): ] return (nodes, edges) + + async def get_graph_metrics(self): + return { + "num_nodes": -1, + "num_edges": -1, + "mean_degree": -1, + "edge_density": -1, + "num_connected_components": -1, + "sizes_of_connected_components": -1, + "num_selfloops": -1, + "diameter": -1, + "avg_shortest_path_length": -1, + "avg_clustering": -1, + } diff --git a/cognee/infrastructure/databases/graph/networkx/adapter.py b/cognee/infrastructure/databases/graph/networkx/adapter.py index ddc1707d3..018799a08 100644 --- a/cognee/infrastructure/databases/graph/networkx/adapter.py +++ b/cognee/infrastructure/databases/graph/networkx/adapter.py @@ -385,3 +385,17 @@ async def get_filtered_graph_data( ] return filtered_nodes, filtered_edges + + async def get_graph_metrics(self): + return { + "num_nodes": -1, + "num_edges": -1, + "mean_degree": -1, + "edge_density": -1, + "num_connected_components": -1, + "sizes_of_connected_components": -1, + "num_selfloops": -1, + "diameter": -1, + "avg_shortest_path_length": -1, + "avg_clustering": -1, + } diff --git a/cognee/tasks/storage/descriptive_metrics.py b/cognee/tasks/storage/descriptive_metrics.py index f7a854e53..3134252d2 100644 --- a/cognee/tasks/storage/descriptive_metrics.py +++ b/cognee/tasks/storage/descriptive_metrics.py @@ -24,22 +24,12 @@ async def fetch_token_count(db_engine) -> int: return token_count_sum -async def calculate_graph_metrics(graph_data): - nodes, edges = graph_data - graph_metrics = { - "num_nodes": len(nodes), - "num_edges": len(edges), - } - return graph_metrics - - async def store_descriptive_metrics(data_points: list[DataPoint]): db_engine = get_relational_engine() graph_engine = await get_graph_engine() - graph_data = await graph_engine.get_graph_data() token_count_sum = await fetch_token_count(db_engine) - graph_metrics = await calculate_graph_metrics(graph_data) + graph_metrics = await graph_engine.get_graph_metrics() table_name = "graph_metrics_table" metrics_dict = {"id": uuid.uuid4(), "num_tokens": token_count_sum} | graph_metrics From 67d9908ad2ec3fb275c76881746c29f7feb064f8 Mon Sep 17 00:00:00 2001 From: Rita Aleksziev Date: Wed, 29 Jan 2025 15:48:55 +0100 Subject: [PATCH 09/15] Calculate descriptive metrics in networkx adapter --- cognee/api/v1/cognify/cognify_v2.py | 2 +- .../databases/graph/graph_db_interface.py | 2 +- .../databases/graph/networkx/adapter.py | 72 +++++++++++++++---- cognee/modules/data/models/GraphMetrics.py | 5 +- cognee/tasks/storage/descriptive_metrics.py | 4 +- 5 files changed, 67 insertions(+), 18 deletions(-) diff --git a/cognee/api/v1/cognify/cognify_v2.py b/cognee/api/v1/cognify/cognify_v2.py index 48d46417f..e0ddb03a7 100644 --- a/cognee/api/v1/cognify/cognify_v2.py +++ b/cognee/api/v1/cognify/cognify_v2.py @@ -165,7 +165,7 @@ async def get_default_tasks( task_config={"batch_size": 10}, ), Task(add_data_points, only_root=True, task_config={"batch_size": 10}), - 
Task(store_descriptive_metrics), + Task(store_descriptive_metrics, include_optional=True), ] except Exception as error: send_telemetry("cognee.cognify DEFAULT TASKS CREATION ERRORED", user.id) diff --git a/cognee/infrastructure/databases/graph/graph_db_interface.py b/cognee/infrastructure/databases/graph/graph_db_interface.py index dfb955cd7..345261438 100644 --- a/cognee/infrastructure/databases/graph/graph_db_interface.py +++ b/cognee/infrastructure/databases/graph/graph_db_interface.py @@ -56,5 +56,5 @@ async def get_graph_data(self): raise NotImplementedError @abstractmethod - async def get_graph_metrics(self): + async def get_graph_metrics(self, include_optional): raise NotImplementedError diff --git a/cognee/infrastructure/databases/graph/networkx/adapter.py b/cognee/infrastructure/databases/graph/networkx/adapter.py index 018799a08..3fa0dbf2d 100644 --- a/cognee/infrastructure/databases/graph/networkx/adapter.py +++ b/cognee/infrastructure/databases/graph/networkx/adapter.py @@ -14,6 +14,7 @@ from cognee.infrastructure.databases.graph.graph_db_interface import GraphDBInterface from cognee.infrastructure.engine import DataPoint from cognee.modules.storage.utils import JSONEncoder +import numpy as np logger = logging.getLogger("NetworkXAdapter") @@ -386,16 +387,63 @@ async def get_filtered_graph_data( return filtered_nodes, filtered_edges - async def get_graph_metrics(self): - return { - "num_nodes": -1, - "num_edges": -1, - "mean_degree": -1, - "edge_density": -1, - "num_connected_components": -1, - "sizes_of_connected_components": -1, - "num_selfloops": -1, - "diameter": -1, - "avg_shortest_path_length": -1, - "avg_clustering": -1, + async def get_graph_metrics(self, include_optional=False): + graph = self.graph + + def _get_mean_degree(graph): + degrees = [d for _, d in graph.degree()] + return np.mean(degrees) if degrees else 0 + + def _get_edge_density(graph): + num_nodes = graph.number_of_nodes() + num_edges = graph.number_of_edges() + num_possible_edges = num_nodes * (num_nodes - 1) + edge_density = num_edges / num_possible_edges if num_possible_edges > 0 else 0 + return edge_density + + def _get_diameter(graph): + if nx.is_strongly_connected(graph): + return nx.diameter(graph.to_undirected()) + else: + return None + + def _get_avg_shortest_path_length(graph): + if nx.is_strongly_connected(graph): + return nx.average_shortest_path_length(graph) + else: + return None + + def _get_avg_clustering(graph): + try: + return nx.average_clustering(graph.to_undirected()) + except Exception as e: + print(f"Unexpected error in clustering calculation: {e}") + return None + + mandatory_metrics = { + "num_nodes": graph.number_of_nodes(), + "num_edges": graph.number_of_edges(), + "mean_degree": _get_mean_degree(graph), + "edge_density": _get_edge_density(graph), + "num_connected_components": nx.number_weakly_connected_components(graph), + "sizes_of_connected_components": [ + len(c) for c in nx.weakly_connected_components(graph) + ], } + + if include_optional: + optional_metrics = { + "num_selfloops": sum(1 for u, v in graph.edges() if u == v), + "diameter": _get_diameter(graph), + "avg_shortest_path_length": _get_avg_shortest_path_length(graph), + "avg_clustering": _get_avg_clustering(graph), + } + else: + optional_metrics = { + "num_selfloops": -1, + "diameter": -1, + "avg_shortest_path_length": -1, + "avg_clustering": -1, + } + + return mandatory_metrics | optional_metrics diff --git a/cognee/modules/data/models/GraphMetrics.py b/cognee/modules/data/models/GraphMetrics.py index 
2103214c8..89535d8f7 100644 --- a/cognee/modules/data/models/GraphMetrics.py +++ b/cognee/modules/data/models/GraphMetrics.py @@ -1,4 +1,5 @@ from datetime import datetime, timezone +from sqlalchemy.sql import func from sqlalchemy import Column, DateTime, Float, Integer, JSON, UUID @@ -23,5 +24,5 @@ class GraphMetrics(Base): avg_shortest_path_length = Column(Float, nullable=True) avg_clustering = Column(Float, nullable=True) - created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)) - updated_at = Column(DateTime(timezone=True), onupdate=lambda: datetime.now(timezone.utc)) + created_at = Column(DateTime(timezone=True), server_default=func.now()) + updated_at = Column(DateTime(timezone=True), onupdate=func.now()) diff --git a/cognee/tasks/storage/descriptive_metrics.py b/cognee/tasks/storage/descriptive_metrics.py index 3134252d2..36ff8ac78 100644 --- a/cognee/tasks/storage/descriptive_metrics.py +++ b/cognee/tasks/storage/descriptive_metrics.py @@ -24,12 +24,12 @@ async def fetch_token_count(db_engine) -> int: return token_count_sum -async def store_descriptive_metrics(data_points: list[DataPoint]): +async def store_descriptive_metrics(data_points: list[DataPoint], include_optional: bool): db_engine = get_relational_engine() graph_engine = await get_graph_engine() token_count_sum = await fetch_token_count(db_engine) - graph_metrics = await graph_engine.get_graph_metrics() + graph_metrics = await graph_engine.get_graph_metrics(include_optional) table_name = "graph_metrics_table" metrics_dict = {"id": uuid.uuid4(), "num_tokens": token_count_sum} | graph_metrics From 252ac7ff88c265c4455363f5df774957c3b8b980 Mon Sep 17 00:00:00 2001 From: Rita Aleksziev Date: Wed, 29 Jan 2025 18:01:23 +0100 Subject: [PATCH 10/15] neo4j metrics --- cognee/api/v1/cognify/cognify_v2.py | 2 +- .../databases/graph/neo4j_driver/adapter.py | 161 ++++++++++++++++-- cognee/tasks/storage/descriptive_metrics.py | 1 - 3 files changed, 150 insertions(+), 14 deletions(-) diff --git a/cognee/api/v1/cognify/cognify_v2.py b/cognee/api/v1/cognify/cognify_v2.py index e0ddb03a7..48d46417f 100644 --- a/cognee/api/v1/cognify/cognify_v2.py +++ b/cognee/api/v1/cognify/cognify_v2.py @@ -165,7 +165,7 @@ async def get_default_tasks( task_config={"batch_size": 10}, ), Task(add_data_points, only_root=True, task_config={"batch_size": 10}), - Task(store_descriptive_metrics, include_optional=True), + Task(store_descriptive_metrics), ] except Exception as error: send_telemetry("cognee.cognify DEFAULT TASKS CREATION ERRORED", user.id) diff --git a/cognee/infrastructure/databases/graph/neo4j_driver/adapter.py b/cognee/infrastructure/databases/graph/neo4j_driver/adapter.py index dc4a61b51..9c76bda98 100644 --- a/cognee/infrastructure/databases/graph/neo4j_driver/adapter.py +++ b/cognee/infrastructure/databases/graph/neo4j_driver/adapter.py @@ -523,16 +523,153 @@ async def get_filtered_graph_data(self, attribute_filters): return (nodes, edges) - async def get_graph_metrics(self): - return { - "num_nodes": -1, - "num_edges": -1, - "mean_degree": -1, - "edge_density": -1, - "num_connected_components": -1, - "sizes_of_connected_components": -1, - "num_selfloops": -1, - "diameter": -1, - "avg_shortest_path_length": -1, - "avg_clustering": -1, + async def graph_exists(self, graph_name="myGraph"): + query = "CALL gds.graph.list() YIELD graphName RETURN collect(graphName) AS graphNames;" + result = await self.query(query) + graph_names = result[0]["graphNames"] if result else [] + return graph_name in graph_names + + 
async def project_entire_graph(self, graph_name="myGraph"): + """ + Projects all node labels and all relationship types into an in-memory GDS graph. + """ + if await self.graph_exists(graph_name): + return + + node_labels_query = "CALL db.labels() YIELD label RETURN collect(label) AS labels;" + node_labels_result = await self.query(node_labels_query) + node_labels = node_labels_result[0]["labels"] if node_labels_result else [] + + relationship_types_query = "CALL db.relationshipTypes() YIELD relationshipType RETURN collect(relationshipType) AS relationships;" + relationship_types_result = await self.query(relationship_types_query) + relationship_types = ( + relationship_types_result[0]["relationships"] if relationship_types_result else [] + ) + + if not node_labels or not relationship_types: + raise ValueError("No node labels or relationship types found in the database.") + + node_labels_str = "[" + ", ".join(f"'{label}'" for label in node_labels) + "]" + relationship_types_str = "[" + ", ".join(f"'{rel}'" for rel in relationship_types) + "]" + + query = f""" + CALL gds.graph.project( + '{graph_name}', + {node_labels_str}, + {relationship_types_str} + ) YIELD graphName; + """ + + await self.query(query) + + async def drop_graph(self, graph_name="myGraph"): + if self.graph_exists(graph_name): + drop_query = f"CALL gds.graph.drop('{graph_name}');" + await self.query(drop_query) + + async def get_graph_metrics(self, include_optional=False): + nodes, edges = await self.get_model_independent_graph_data() + graph_name = "myGraph" + await self.drop_graph(graph_name) + await self.project_entire_graph(graph_name) + + async def _get_mean_degree(): + query = """ + MATCH (n) + OPTIONAL MATCH (n)-[r]-() + WITH n, COUNT(r) AS degree + RETURN avg(degree) AS mean_degree; + """ + result = await self.query(query) + return result[0]["mean_degree"] if result else 0 + + async def _get_edge_density(): + query = """ + MATCH (n) + WITH count(n) AS num_nodes + MATCH ()-[r]->() + WITH num_nodes, count(r) AS num_edges + RETURN CASE + WHEN num_nodes < 2 THEN 0 + ELSE num_edges * 1.0 / (num_nodes * (num_nodes - 1)) + END AS edge_density; + """ + result = await self.query(query) + return result[0]["edge_density"] if result else 0 + + async def _get_num_connected_components(): + graph_name = "myGraph" + await self.drop_graph(graph_name) + await self.project_entire_graph(graph_name) + + query = f""" + CALL gds.wcc.stream('{graph_name}') + YIELD componentId + RETURN count(DISTINCT componentId) AS num_connected_components; + """ + + result = await self.query(query) + return result[0]["num_connected_components"] if result else 0 + + async def _get_size_of_connected_components(): + graph_name = "myGraph" + await self.drop_graph(graph_name) + await self.project_entire_graph(graph_name) + + query = f""" + CALL gds.wcc.stream('{graph_name}') + YIELD componentId + RETURN componentId, count(*) AS size + ORDER BY size DESC; + """ + + result = await self.query(query) + return [record["size"] for record in result] if result else [] + + async def _count_self_loops(): + query = """ + MATCH (n)-[r]->(n) + RETURN count(r) AS self_loop_count; + """ + result = await self.query(query) + return result[0]["self_loop_count"] if result else 0 + + async def _get_diameter(): + logging.warning("Diameter calculation is not implemented for neo4j.") + return -1 + + async def _get_avg_shortest_path_length(): + logging.warning( + "Average shortest path length calculation is not implemented for neo4j." 
+ ) + return -1 + + async def _get_avg_clustering(): + logging.warning("Average clustering calculation is not implemented for neo4j.") + return -1 + + mandatory_metrics = { + "num_nodes": len(nodes[0]["nodes"]), + "num_edges": len(edges[0]["elements"]), + "mean_degree": await _get_mean_degree(), + "edge_density": await _get_edge_density(), + "num_connected_components": await _get_num_connected_components(), + "sizes_of_connected_components": await _get_size_of_connected_components(), } + + if include_optional: + optional_metrics = { + "num_selfloops": await _count_self_loops(), + "diameter": await _get_diameter(), + "avg_shortest_path_length": await _get_avg_shortest_path_length(), + "avg_clustering": await _get_avg_clustering(), + } + else: + optional_metrics = { + "num_selfloops": -1, + "diameter": -1, + "avg_shortest_path_length": -1, + "avg_clustering": -1, + } + + return mandatory_metrics | optional_metrics diff --git a/cognee/tasks/storage/descriptive_metrics.py b/cognee/tasks/storage/descriptive_metrics.py index 36ff8ac78..0ec06891d 100644 --- a/cognee/tasks/storage/descriptive_metrics.py +++ b/cognee/tasks/storage/descriptive_metrics.py @@ -30,7 +30,6 @@ async def store_descriptive_metrics(data_points: list[DataPoint], include_option token_count_sum = await fetch_token_count(db_engine) graph_metrics = await graph_engine.get_graph_metrics(include_optional) - table_name = "graph_metrics_table" metrics_dict = {"id": uuid.uuid4(), "num_tokens": token_count_sum} | graph_metrics From 9a94db8dd41239034f6d85ba7e2ce2c190066a59 Mon Sep 17 00:00:00 2001 From: Rita Aleksziev Date: Thu, 30 Jan 2025 17:43:20 +0100 Subject: [PATCH 11/15] remove _table from table name --- cognee/modules/data/models/GraphMetrics.py | 2 +- cognee/tasks/storage/descriptive_metrics.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cognee/modules/data/models/GraphMetrics.py b/cognee/modules/data/models/GraphMetrics.py index 2103214c8..d86a2048b 100644 --- a/cognee/modules/data/models/GraphMetrics.py +++ b/cognee/modules/data/models/GraphMetrics.py @@ -7,7 +7,7 @@ class GraphMetrics(Base): - __tablename__ = "graph_metrics_table" + __tablename__ = "graph_metrics" # TODO: Change ID to reflect unique id of graph database id = Column(UUID, primary_key=True, default=uuid4) diff --git a/cognee/tasks/storage/descriptive_metrics.py b/cognee/tasks/storage/descriptive_metrics.py index 3134252d2..f35e8460a 100644 --- a/cognee/tasks/storage/descriptive_metrics.py +++ b/cognee/tasks/storage/descriptive_metrics.py @@ -31,7 +31,7 @@ async def store_descriptive_metrics(data_points: list[DataPoint]): token_count_sum = await fetch_token_count(db_engine) graph_metrics = await graph_engine.get_graph_metrics() - table_name = "graph_metrics_table" + table_name = "graph_metrics" metrics_dict = {"id": uuid.uuid4(), "num_tokens": token_count_sum} | graph_metrics await db_engine.insert_data(table_name, metrics_dict) From 05138faf76e35f2878c24fdb0164f03c34bd3ff9 Mon Sep 17 00:00:00 2001 From: Rita Aleksziev Date: Mon, 3 Feb 2025 11:35:27 +0100 Subject: [PATCH 12/15] Use modules for adding to db instead of infrastructure --- cognee/api/v1/cognify/cognify_v2.py | 2 +- cognee/modules/data/methods/__init__.py | 2 ++ .../methods/store_descriptive_metrics.py} | 24 +++++++++++++++---- cognee/modules/data/models/__init__.py | 1 + 4 files changed, 23 insertions(+), 6 deletions(-) rename cognee/{tasks/storage/descriptive_metrics.py => modules/data/methods/store_descriptive_metrics.py} (53%) diff --git 
a/cognee/api/v1/cognify/cognify_v2.py b/cognee/api/v1/cognify/cognify_v2.py index 978205d2f..c727230d7 100644 --- a/cognee/api/v1/cognify/cognify_v2.py +++ b/cognee/api/v1/cognify/cognify_v2.py @@ -25,7 +25,7 @@ ) from cognee.tasks.graph import extract_graph_from_data from cognee.tasks.storage import add_data_points -from cognee.tasks.storage.descriptive_metrics import store_descriptive_metrics +from cognee.modules.data.methods import store_descriptive_metrics from cognee.tasks.storage.index_graph_edges import index_graph_edges from cognee.tasks.summarization import summarize_text diff --git a/cognee/modules/data/methods/__init__.py b/cognee/modules/data/methods/__init__.py index c32db1d2f..57ac00c1a 100644 --- a/cognee/modules/data/methods/__init__.py +++ b/cognee/modules/data/methods/__init__.py @@ -11,3 +11,5 @@ # Delete from .delete_dataset import delete_dataset from .delete_data import delete_data + +from .store_descriptive_metrics import store_descriptive_metrics diff --git a/cognee/tasks/storage/descriptive_metrics.py b/cognee/modules/data/methods/store_descriptive_metrics.py similarity index 53% rename from cognee/tasks/storage/descriptive_metrics.py rename to cognee/modules/data/methods/store_descriptive_metrics.py index f35e8460a..6e2507305 100644 --- a/cognee/tasks/storage/descriptive_metrics.py +++ b/cognee/modules/data/methods/store_descriptive_metrics.py @@ -7,6 +7,7 @@ from cognee.modules.data.models import GraphMetrics import uuid from cognee.infrastructure.databases.graph import get_graph_engine +from cognee.modules.data.methods import create_dataset async def fetch_token_count(db_engine) -> int: @@ -27,12 +28,25 @@ async def fetch_token_count(db_engine) -> int: async def store_descriptive_metrics(data_points: list[DataPoint]): db_engine = get_relational_engine() graph_engine = await get_graph_engine() - - token_count_sum = await fetch_token_count(db_engine) graph_metrics = await graph_engine.get_graph_metrics() - table_name = "graph_metrics" - metrics_dict = {"id": uuid.uuid4(), "num_tokens": token_count_sum} | graph_metrics + async with db_engine.get_async_session() as session: + metrics = GraphMetrics( + id=uuid.uuid4(), + num_tokens=await fetch_token_count(db_engine), + num_nodes=graph_metrics["num_nodes"], + num_edges=graph_metrics["num_edges"], + mean_degree=graph_metrics["mean_degree"], + edge_density=graph_metrics["edge_density"], + num_connected_components=graph_metrics["num_connected_components"], + sizes_of_connected_components=graph_metrics["sizes_of_connected_components"], + num_selfloops=graph_metrics["num_selfloops"], + diameter=graph_metrics["diameter"], + avg_shortest_path_length=graph_metrics["avg_shortest_path_length"], + avg_clustering=graph_metrics["avg_clustering"], + ) + + session.add(metrics) + await session.commit() - await db_engine.insert_data(table_name, metrics_dict) return data_points diff --git a/cognee/modules/data/models/__init__.py b/cognee/modules/data/models/__init__.py index bd5774f88..51d6ad1d5 100644 --- a/cognee/modules/data/models/__init__.py +++ b/cognee/modules/data/models/__init__.py @@ -1,3 +1,4 @@ from .Data import Data from .Dataset import Dataset from .DatasetData import DatasetData +from .GraphMetrics import GraphMetrics From d93b5f51982ada40526139d40ac440f7b2fa94d4 Mon Sep 17 00:00:00 2001 From: Rita Aleksziev Date: Mon, 3 Feb 2025 17:35:21 +0100 Subject: [PATCH 13/15] minor fixes --- .../databases/graph/neo4j_driver/adapter.py | 11 +++++++---- .../databases/graph/networkx/adapter.py | 4 ++-- 2 files changed, 9 
insertions(+), 6 deletions(-) diff --git a/cognee/infrastructure/databases/graph/neo4j_driver/adapter.py b/cognee/infrastructure/databases/graph/neo4j_driver/adapter.py index e3ff51e0c..35900f6be 100644 --- a/cognee/infrastructure/databases/graph/neo4j_driver/adapter.py +++ b/cognee/infrastructure/databases/graph/neo4j_driver/adapter.py @@ -571,7 +571,7 @@ async def project_entire_graph(self, graph_name="myGraph"): await self.query(query) async def drop_graph(self, graph_name="myGraph"): - if self.graph_exists(graph_name): + if await self.graph_exists(graph_name): drop_query = f"CALL gds.graph.drop('{graph_name}');" await self.query(drop_query) @@ -656,10 +656,13 @@ async def _get_avg_clustering(): logging.warning("Average clustering calculation is not implemented for neo4j.") return -1 + num_nodes = len(nodes[0]["nodes"]) + num_edges = len(edges[0]["elements"]) + mandatory_metrics = { - "num_nodes": len(nodes[0]["nodes"]), - "num_edges": len(edges[0]["elements"]), - "mean_degree": await _get_mean_degree(), + "num_nodes": num_nodes, + "num_edges": num_edges, + "mean_degree": (2 * num_edges) / num_nodes if num_nodes != 0 else None, "edge_density": await _get_edge_density(), "num_connected_components": await _get_num_connected_components(), "sizes_of_connected_components": await _get_size_of_connected_components(), diff --git a/cognee/infrastructure/databases/graph/networkx/adapter.py b/cognee/infrastructure/databases/graph/networkx/adapter.py index 3fa0dbf2d..89dcca021 100644 --- a/cognee/infrastructure/databases/graph/networkx/adapter.py +++ b/cognee/infrastructure/databases/graph/networkx/adapter.py @@ -415,9 +415,9 @@ def _get_avg_shortest_path_length(graph): def _get_avg_clustering(graph): try: - return nx.average_clustering(graph.to_undirected()) + return nx.average_clustering(nx.DiGraph(graph)) except Exception as e: - print(f"Unexpected error in clustering calculation: {e}") + logger.warning("Failed to calculate clustering coefficient: %s", e) return None mandatory_metrics = { From c13fdec943896bd4a887c4cc0b39c8e485eeaac8 Mon Sep 17 00:00:00 2001 From: Rita Aleksziev Date: Mon, 3 Feb 2025 18:14:30 +0100 Subject: [PATCH 14/15] minor cleanup --- .../databases/graph/neo4j_driver/adapter.py | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/cognee/infrastructure/databases/graph/neo4j_driver/adapter.py b/cognee/infrastructure/databases/graph/neo4j_driver/adapter.py index 35900f6be..767b21142 100644 --- a/cognee/infrastructure/databases/graph/neo4j_driver/adapter.py +++ b/cognee/infrastructure/databases/graph/neo4j_driver/adapter.py @@ -581,16 +581,6 @@ async def get_graph_metrics(self, include_optional=False): await self.drop_graph(graph_name) await self.project_entire_graph(graph_name) - async def _get_mean_degree(): - query = """ - MATCH (n) - OPTIONAL MATCH (n)-[r]-() - WITH n, COUNT(r) AS degree - RETURN avg(degree) AS mean_degree; - """ - result = await self.query(query) - return result[0]["mean_degree"] if result else 0 - async def _get_edge_density(): query = """ MATCH (n) @@ -606,7 +596,6 @@ async def _get_edge_density(): return result[0]["edge_density"] if result else 0 async def _get_num_connected_components(): - graph_name = "myGraph" await self.drop_graph(graph_name) await self.project_entire_graph(graph_name) @@ -620,7 +609,6 @@ async def _get_num_connected_components(): return result[0]["num_connected_components"] if result else 0 async def _get_size_of_connected_components(): - graph_name = "myGraph" await self.drop_graph(graph_name) await 
self.project_entire_graph(graph_name) From 3e67828cab458b301bde2e87828ad33289b7882b Mon Sep 17 00:00:00 2001 From: Rita Aleksziev Date: Tue, 4 Feb 2025 12:22:33 +0100 Subject: [PATCH 15/15] Remove graph metric calculation from the default cognify pipeline --- cognee/api/v1/cognify/cognify_v2.py | 1 - 1 file changed, 1 deletion(-) diff --git a/cognee/api/v1/cognify/cognify_v2.py b/cognee/api/v1/cognify/cognify_v2.py index 55792f2da..9385507ce 100644 --- a/cognee/api/v1/cognify/cognify_v2.py +++ b/cognee/api/v1/cognify/cognify_v2.py @@ -165,7 +165,6 @@ async def get_default_tasks( task_config={"batch_size": 10}, ), Task(add_data_points, task_config={"batch_size": 10}), - Task(store_descriptive_metrics, include_optional=True), ] except Exception as error: send_telemetry("cognee.cognify DEFAULT TASKS CREATION ERRORED", user.id)
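
With [PATCH 15/15] applied, store_descriptive_metrics is no longer wired into the
default cognify pipeline, so graph metrics are computed only on demand. A minimal
sketch of how a caller could still trigger the calculation after a cognify run,
assuming the module path introduced in [PATCH 12/15] and that the include_optional
flag from [PATCH 09/15] is still accepted (both are assumptions about the merged
state, not guaranteed by these diffs):

    import asyncio

    from cognee.modules.data.methods import store_descriptive_metrics

    async def main():
        # data_points is passed through unchanged; the function itself reads
        # token counts from the relational store and structural metrics from
        # the graph store, then inserts one row into the graph_metrics table.
        await store_descriptive_metrics(data_points=[], include_optional=True)

    asyncio.run(main())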