feat: adds cognee node and edge embeddings for graphiti graph #437
Conversation
Walkthrough

This pull request introduces several enhancements to the Cognee system's graph and temporal awareness capabilities, including a new asynchronous method, get_model_independent_graph_data, for retrieving a model-independent graph snapshot from the Neo4j adapter.
Actionable comments posted: 6
🧹 Nitpick comments (2)
cognee/tasks/temporal_awareness/graphiti_model.py (1)

5-12: Add docstring to document the GraphitiNode class purpose and fields.

The class structure looks good, but adding documentation would improve maintainability and help other developers understand the purpose of each field.

Add a docstring like this:

```diff
 class GraphitiNode(DataPoint):
+    """Represents a node in the Graphiti graph with content, name, and summary.
+
+    Attributes:
+        content (Optional[str]): The main content of the node
+        name (Optional[str]): The name identifier of the node
+        summary (Optional[str]): A summarized description of the node
+        pydantic_type (str): The type identifier for serialization
+    """
     __tablename__ = "graphitinode"
```

cognee/tasks/storage/index_graph_edges.py (1)
49-53: Improve edge data validation.

The edge data validation could be more robust to handle malformed data.

```diff
-    edge_types = Counter(
-        edge[1]
-        for edge in edges_data[0]["relationships"]
-        if isinstance(edge, tuple) and len(edge) == 3
-    )
+    edge_types = Counter()
+    for edge in edges_data[0].get("relationships", []):
+        if not isinstance(edge, tuple) or len(edge) != 3:
+            logging.warning(f"Skipping malformed edge data: {edge}")
+            continue
+        edge_types[edge[1]] += 1
```
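As a quick illustration of how the hardened loop behaves, here is a self-contained run on made-up sample relationships (the tuples are invented for this example):

```python
import logging
from collections import Counter

# Made-up sample data: two well-formed (source, relation, target) tuples
# and one malformed entry that should be skipped with a warning.
relationships = [
    ("alice", "KNOWS", "bob"),
    ("bob", "WORKS_AT", "acme"),
    ["broken"],
]

edge_types = Counter()
for edge in relationships:
    if not isinstance(edge, tuple) or len(edge) != 3:
        logging.warning(f"Skipping malformed edge data: {edge}")
        continue
    edge_types[edge[1]] += 1

print(edge_types)  # Counter({'KNOWS': 1, 'WORKS_AT': 1})
```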
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)

- cognee/infrastructure/databases/graph/graph_db_interface.py (1 hunks)
- cognee/infrastructure/databases/graph/neo4j_driver/adapter.py (1 hunks)
- cognee/tasks/storage/index_graph_edges.py (1 hunks)
- cognee/tasks/temporal_awareness/graphiti_model.py (1 hunks)
- examples/python/graphiti_example.py (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: docker-compose-test
🔇 Additional comments (2)

examples/python/graphiti_example.py (1)

29-39: Restore or document removal of search functionality.

The search_graph_with_temporal_awareness task was removed. If this is intentional, please document why. If not, consider restoring it or providing an alternative example.

Run this script to check for other examples of temporal awareness search:
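(The script itself is not shown; a minimal check along these lines, a sketch rather than the original, would serve the same purpose:)

```bash
#!/bin/bash
# Look for remaining references to the removed temporal-awareness search task.
rg "search_graph_with_temporal_awareness" --type py -C 2
```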
cognee/infrastructure/databases/graph/neo4j_driver/adapter.py (1)

429-437: Verify memory usage with large graphs.

The use of collect() in Cypher queries can be memory-intensive for large graphs, as it loads all nodes/relationships into memory at once. Let's verify the potential impact:

✅ Verification successful

Memory usage is an inherent requirement of this method

The use of collect() is necessary here, as the method get_model_independent_graph_data() is specifically designed to retrieve a complete graph snapshot. While memory usage is a valid concern, it's an expected behavior clearly indicated by the method's name and purpose.

- Consider documenting this memory requirement in the method's docstring to make it explicit for callers.
- Ensure this method is only used when a complete graph snapshot is truly needed.
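For the first point, a minimal docstring sketch (the wording is illustrative, not the adapter's actual docstring):

```python
async def get_model_independent_graph_data(self):
    """Return a complete snapshot of the graph as (nodes, edges).

    Warning: this collects every node and relationship into memory in a
    single query; call it only when a full-graph snapshot is required.
    """
    ...
```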
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

```bash
#!/bin/bash
# Description: Check the size of the graph to assess memory impact
# Expected results: Large node/relationship counts would indicate potential memory issues

# Get node count
echo "Checking node count..."
ast-grep --pattern 'MATCH (n) RETURN count(n) AS count'

# Get relationship count
echo "Checking relationship count..."
ast-grep --pattern 'MATCH ()-[r]->() RETURN count(r) AS count'

# Check for existing pagination patterns
echo "Checking for existing pagination patterns..."
rg -l 'SKIP.*LIMIT' .
```

Length of output: 371

Script:

```bash
#!/bin/bash
# Search for collect() usage in queries
echo "Searching for collect() usage patterns..."
rg "collect\(" --type py -B 2 -A 2

# Look for any memory/size configurations
echo "Checking for memory/size configurations..."
rg -i "(memory|size|limit|max.*nodes|max.*relationships)" --type py

# Search for LIMIT usage in queries
echo "Checking for LIMIT usage in queries..."
rg "LIMIT" --type py -B 2 -A 2
```

Length of output: 35733
…s graphiti does not support networkx
Actionable comments posted: 2
🧹 Nitpick comments (3)
cognee/tasks/temporal_awareness/index_graphiti_objects.py (2)

21-25: Optimize bulk update queries to enhance performance

The queries at lines 21-25 perform property updates on all nodes and relationships, which may lead to performance issues on large graphs.

Consider batching the updates or adding filters to limit the scope of these queries. For example, update only nodes or relationships that lack the required properties:

```diff
-    await graph_engine.query("""MATCH (n) SET n.id = n.uuid RETURN n""")
+    await graph_engine.query("""MATCH (n) WHERE n.id IS NULL SET n.id = n.uuid RETURN n""")
```

This approach reduces the load on the database and improves the efficiency of the operation.
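For the batching half of the suggestion, one sketch uses Neo4j's CALL { ... } IN TRANSACTIONS (available from Neo4j 4.4 and requiring an implicit, auto-commit transaction; the batch size of 1000 is an arbitrary placeholder):

```python
# Sketch: batched property backfill, assuming Neo4j 4.4+ and that the
# adapter issues this as an implicit (auto-commit) transaction.
await graph_engine.query(
    """
    MATCH (n) WHERE n.id IS NULL
    CALL {
        WITH n
        SET n.id = n.uuid
    } IN TRANSACTIONS OF 1000 ROWS
    """
)
```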
62-79: Refactor duplicated indexing logic for maintainability

The indexing logic for nodes and edges is similar and can be refactored to reduce code duplication and enhance maintainability.

Extract the common indexing steps into a separate function:

```python
async def index_data_points(data_points, data_point_type, created_indexes, index_points, vector_engine):
    for data_point in data_points:
        for field_name in data_point._metadata["index_fields"]:
            index_name = f"{data_point_type.__tablename__}.{field_name}"
            if index_name not in created_indexes:
                await vector_engine.create_vector_index(data_point_type.__tablename__, field_name)
                created_indexes[index_name] = True
            if index_name not in index_points:
                index_points[index_name] = []
            if getattr(data_point, field_name, None) is not None:
                indexed_data_point = data_point.model_copy()
                indexed_data_point._metadata["index_fields"] = [field_name]
                index_points[index_name].append(indexed_data_point)
```

Then, call this function for both nodes and edges:

```python
# For nodes
await index_data_points(
    data_points=[graphiti_node],
    data_point_type=type(graphiti_node),
    created_indexes=created_indexes,
    index_points=index_points,
    vector_engine=vector_engine,
)

# For edges
await index_data_points(
    data_points=[edge],
    data_point_type=type(edge),
    created_indexes=created_indexes,
    index_points=index_points,
    vector_engine=vector_engine,
)
```

This refactoring simplifies the code and makes future maintenance easier.
examples/python/graphiti_example.py (1)

60-65: Add error handling for LLM client calls

The LLM client request may fail due to network issues or API errors. Adding error handling ensures the program can handle such exceptions gracefully.

Apply this diff to handle possible exceptions:

```diff
 llm_client = get_llm_client()
-computed_answer = await llm_client.acreate_structured_output(
-    text_input=user_prompt,
-    system_prompt=system_prompt,
-    response_model=str,
-)
+try:
+    computed_answer = await llm_client.acreate_structured_output(
+        text_input=user_prompt,
+        system_prompt=system_prompt,
+        response_model=str,
+    )
+except Exception as e:
+    logging.error("LLM client call failed: %s", e)
+    computed_answer = "An error occurred while generating the answer."
```

This ensures that the application continues to function even if the LLM request fails.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)

- cognee/infrastructure/databases/graph/neo4j_driver/adapter.py (1 hunks)
- cognee/tasks/temporal_awareness/index_graphiti_objects.py (1 hunks)
- examples/python/graphiti_example.py (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: run_notebook_test / test
- GitHub Check: run_notebook_test / test
- GitHub Check: test
- GitHub Check: run_notebook_test / test
- GitHub Check: run_notebook_test / test
- GitHub Check: test
- GitHub Check: test
- GitHub Check: test
- GitHub Check: docker-compose-test
- GitHub Check: Publish Cognee Docker image
🔇 Additional comments (3)
examples/python/graphiti_example.py (2)

27-29: Add error handling for initialization operations

The initialization steps should include error handling to gracefully manage failures.

Apply this change to wrap the initialization in a try-except block:

```diff
 setup_logging(logging.INFO)
-await cognee.prune.prune_data()
-await cognee.prune.prune_system(metadata=True)
-await create_relational_db_and_tables()
+try:
+    await cognee.prune.prune_data()
+    await cognee.prune.prune_system(metadata=True)
+    await create_relational_db_and_tables()
+except Exception as e:
+    logging.error("Failed to initialize: %s", e)
+    raise RuntimeError("Setup failed") from e
```
43-43: Verify the success of node and edge indexing

After calling index_and_transform_graphiti_nodes_and_edges(), there is no verification or logging to confirm that the indexing was successful.

Consider adding logging to indicate the completion of the indexing process:

```diff
 await index_and_transform_graphiti_nodes_and_edges()
+logging.info("Successfully indexed nodes and edges.")
```

This provides operational visibility and aids in debugging if issues arise.

✅ Verification successful

Verify the success of node and edge indexing

The suggestion to add success logging is valid. The function performs critical database operations, including graph queries, vector index creation, and data point indexing, but lacks completion logging. Adding success logging aligns with the codebase's established patterns and provides valuable operational visibility.

The original review suggestion to add:

```python
logging.info("Successfully indexed nodes and edges.")
```

is appropriate and consistent with the logging patterns found throughout the codebase.
🏁 Scripts executed

The following scripts were executed for the analysis:

Error: No shell scripts found

Script:

```bash
#!/bin/bash
# Find the implementation of the function and its context
ast-grep --pattern 'async def index_and_transform_graphiti_nodes_and_edges($$$)'

# Look for logging patterns in Python files
rg "logging\." -A 2 --type py

# Find similar async function patterns with logging
ast-grep --pattern 'async def $FUNC($$$) $$$ logging.$$$ $$$'
```

Length of output: 14651

Script:

```bash
#!/bin/bash
# Search for the function in task-related files
rg "index_and_transform_graphiti_nodes_and_edges" --type py -C 5

# Specifically look in temporal awareness tasks
fd . "cognee/tasks/temporal_awareness" --type f --exec rg "index_and_transform_graphiti_nodes_and_edges" -C 5 {}
```

Length of output: 3266

Script:

```bash
#!/bin/bash
# Get the complete function implementation including error handling
rg -U "async def index_and_transform_graphiti_nodes_and_edges\(\)[\s\S]*?(?=\n\n|\Z)" --type py
```

Length of output: 444

Script:

```bash
#!/bin/bash
# Get the complete function implementation using ast-grep
ast-grep --pattern 'async def index_and_transform_graphiti_nodes_and_edges() { $$$ }'

# Fallback to simpler grep with context
rg "async def index_and_transform_graphiti_nodes_and_edges" -A 20 --type py
```

Length of output: 2313

Script:

```bash
#!/bin/bash
# Get the complete function implementation with more context
rg "async def index_and_transform_graphiti_nodes_and_edges" -A 50 --type py
```

Length of output: 5228
cognee/infrastructure/databases/graph/neo4j_driver/adapter.py (1)

429-437: Add type hints and error handling to get_model_independent_graph_data

Including type hints and error handling enhances code reliability and clarity.

Update the method as follows:

```diff
+from typing import Tuple, List, Dict, Any
 ...
 async def get_model_independent_graph_data(self) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
     query_nodes = "MATCH (n) RETURN collect(n) AS nodes"
-    nodes = await self.query(query_nodes)
+    try:
+        nodes = await self.query(query_nodes)
+    except Neo4jError as error:
+        logger.error("Failed to retrieve nodes: %s", error, exc_info=True)
+        raise
     query_edges = "MATCH (n)-[r]->(m) RETURN collect([n, r, m]) AS elements"
-    edges = await self.query(query_edges)
+    try:
+        edges = await self.query(query_edges)
+    except Neo4jError as error:
+        logger.error("Failed to retrieve edges: %s", error, exc_info=True)
+        raise
     return (nodes, edges)
```

Additionally, for large graphs, consider implementing pagination to handle data efficiently and prevent potential memory issues.
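A paginated variant might look like the following sketch of an adapter method; it assumes the adapter's existing self.query helper, and the page size and ordering key are illustrative choices rather than the project's actual API:

```python
async def get_nodes_paginated(self, page_size: int = 1000):
    """Hypothetical paginated node retrieval to bound memory usage."""
    nodes = []
    offset = 0
    while True:
        # ORDER BY keeps SKIP/LIMIT pagination deterministic across pages.
        batch = await self.query(
            f"MATCH (n) RETURN n ORDER BY id(n) SKIP {offset} LIMIT {page_size}"
        )
        if not batch:
            break
        nodes.extend(batch)
        offset += page_size
    return nodes
```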
| GitGuardian id | GitGuardian status | Secret | Commit | Filename | |
|---|---|---|---|---|---|
| 9573981 | Triggered | Generic Password | 1db44de | notebooks/cognee_graphiti_demo.ipynb | View secret |
| 8719688 | Triggered | Generic Password | 1db44de | notebooks/cognee_graphiti_demo.ipynb | View secret |
🛠 Guidelines to remediate hardcoded secrets

- Understand the implications of revoking this secret by investigating where it is used in your code.
- Replace and store your secrets safely. Learn the best practices here.
- Revoke and rotate these secrets.
- If possible, rewrite git history. Rewriting git history is not a trivial act: you might completely break other contributing developers' workflows, and you risk accidentally deleting legitimate data.

To avoid such incidents in the future, consider:

- following these best practices for managing and storing secrets, including API keys and other credentials;
- installing secret detection on pre-commit to catch secrets before they leave your machine and ease remediation (a minimal example follows).
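As one illustration of that pre-commit suggestion, a minimal setup with GitGuardian's ggshield CLI might look like this (a sketch; it assumes a Python environment and a GitGuardian account for authentication):

```bash
# Install the ggshield CLI and register it as a local pre-commit hook.
pip install ggshield
ggshield install --mode local  # writes a pre-commit hook into this repo's .git/hooks
# Scanning requires authentication, e.g. via an API key:
export GITGUARDIAN_API_KEY="<your-api-key>"
```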
🦉 GitGuardian detects secrets in your source code to help developers and security teams secure the modern development process. You are seeing this because you or someone else with access to this repository has authorized GitGuardian to scan your pull request.
The leaked passwords here are the dummy passwords used in GitHub Actions.
Actionable comments posted: 5
♻️ Duplicate comments (1)

examples/python/graphiti_example.py (1)

27-29: ⚠️ Potential issue

Add error handling for initialization operations.

Critical setup operations should include error handling.

```diff
+try:
     await cognee.prune.prune_data()
     await cognee.prune.prune_system(metadata=True)
     await create_relational_db_and_tables()
+except Exception as e:
+    logging.error("Failed to initialize: %s", e)
+    raise RuntimeError("Setup failed") from e
```
🧹 Nitpick comments (3)
notebooks/cognee_graphiti_demo.ipynb (2)

127-154: Add error handling and progress tracking for database operations.

While the code structure is good, it could benefit from better error handling and progress tracking.

Consider these improvements:

```diff
 # 🏗️ Creating Relational Database and Tables
-await create_relational_db_and_tables()
+try:
+    await create_relational_db_and_tables()
+except Exception as e:
+    logging.error(f"Failed to create database tables: {e}")
+    raise

 # 📚 Adding Text Data to Cognee
+total_texts = len(text_list)
+for i, text in enumerate(text_list, 1):
+    try:
+        await cognee.add(text)
+        print(f"Processed {i}/{total_texts} texts")
+    except Exception as e:
+        logging.error(f"Failed to process text: {text[:50]}... Error: {e}")
+        raise
```
1-261: Consider extracting demo code into reusable utilities.

While this notebook effectively demonstrates the integration, consider:

- Creating a reusable CogneeGraphitiDemo class that encapsulates the setup and execution logic
- Moving configuration to a separate config file or environment variables
- Adding integration tests based on this demo flow

This would make the demo more maintainable and serve as a reference implementation for other integrations. A skeleton of such a class is sketched below.
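A hypothetical skeleton for that first point; the class name comes from the suggestion above, while the method split is illustrative and the calls mirror the notebook's own setup steps:

```python
import cognee
from cognee.infrastructure.databases.relational import (
    create_db_and_tables as create_relational_db_and_tables,
)


class CogneeGraphitiDemo:
    """Illustrative wrapper around the demo's setup and ingestion steps."""

    def __init__(self, texts: list[str]):
        self.texts = texts

    async def setup(self) -> None:
        # Reset state and create the relational schema, as in the notebook.
        await cognee.prune.prune_data()
        await cognee.prune.prune_system(metadata=True)
        await create_relational_db_and_tables()

    async def ingest(self) -> None:
        for text in self.texts:
            await cognee.add(text)
```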
examples/python/graphiti_example.py (1)
4-17
: Improve import organization for better readability.Consider grouping related imports with newlines and adding comments to indicate functional groups:
import asyncio import cognee import logging + +# Core functionality from cognee.modules.pipelines import Task, run_tasks from cognee.shared.utils import setup_logging from cognee.tasks.temporal_awareness import build_graph_with_temporal_awareness + +# Database and indexing from cognee.infrastructure.databases.relational import ( create_db_and_tables as create_relational_db_and_tables, ) from cognee.tasks.temporal_awareness.index_graphiti_objects import ( index_and_transform_graphiti_nodes_and_edges, ) + +# Retrieval and LLM from cognee.modules.retrieval.brute_force_triplet_search import brute_force_triplet_search from cognee.tasks.completion.graph_query_completion import retrieved_edges_to_string from cognee.infrastructure.llm.prompts import read_query_prompt, render_prompt from cognee.infrastructure.llm.get_llm_client import get_llm_client
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)

- examples/python/graphiti_example.py (2 hunks)
- notebooks/cognee_graphiti_demo.ipynb (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (17)
- GitHub Check: run_notebook_test / test
- GitHub Check: run_dynamic_steps_example_test / test
- GitHub Check: run_notebook_test / test
- GitHub Check: run_notebook_test / test
- GitHub Check: run_notebook_test / test
- GitHub Check: test
- GitHub Check: run_multimedia_example_test / test
- GitHub Check: test
- GitHub Check: test
- GitHub Check: test
- GitHub Check: test
- GitHub Check: test
- GitHub Check: test
- GitHub Check: test
- GitHub Check: test
- GitHub Check: docker-compose-test
- GitHub Check: Publish Cognee Docker image
🔇 Additional comments (4)
notebooks/cognee_graphiti_demo.ipynb (2)

21-36: Well-structured imports!

The imports are properly organized and cover all necessary functionality for the demo, including task management, database operations, and LLM integration.

104-108: Good choice of sample data!

The example text effectively demonstrates temporal information processing capabilities while remaining concise and clear.
examples/python/graphiti_example.py (2)

Line range hint 19-23: LGTM! Sample data is clear and demonstrates the use case well.

43-50: 🛠️ Refactor suggestion / ⚠️ Potential issue

Add error handling and parameterize search configuration.

The graph operations lack error handling and use hardcoded values.

```diff
+try:
     await index_and_transform_graphiti_nodes_and_edges()
+except Exception as e:
+    logging.error("Failed to index graph data: %s", e)
+    raise RuntimeError("Indexing failed") from e

+# Define search configuration
+SEARCH_CONFIG = {
+    "top_k": 3,
+    "collections": ["graphitinode_content", "graphitinode_name", "graphitinode_summary"],
+}

 query = "When was Kamala Harris in office?"
+try:
     triplets = await brute_force_triplet_search(
         query=query,
-        top_k=3,
-        collections=["graphitinode_content", "graphitinode_name", "graphitinode_summary"],
+        **SEARCH_CONFIG,
     )
+except Exception as e:
+    logging.error("Search failed: %s", e)
+    raise RuntimeError("Search failed") from e
```

Likely invalid or redundant comment.
Summary by CodeRabbit

Release Notes

New Features

- GraphitiNode for enhanced data representation.

Improvements

Technical Updates