Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate graphiti's temporal awareness functionality as Tasks #253

Merged
merged 5 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cognee/tasks/temporal_awareness/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .build_graph_with_temporal_awareness import \
build_graph_with_temporal_awareness
from .search_graph_with_temporal_awareness import \
search_graph_with_temporal_awareness
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import os
from datetime import datetime

from graphiti_core import Graphiti
from graphiti_core.nodes import EpisodeType
Comment on lines +4 to +5
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

⚠️ Missing dependency specification for graphiti-core

The code correctly uses the Graphiti API with proper initialization and method calls (Graphiti() constructor, build_indices_and_constraints(), add_episode()), and the EpisodeType enum is used appropriately. However, the graphiti-core dependency is not specified in any requirements file, which could lead to installation issues.

  • Add graphiti-core to your project's requirements file to ensure proper dependency management
  • Consider externalizing the database connection parameters instead of hardcoding them
🔗 Analysis chain

Verify Graphiti API usage.

Let's verify that we're using the correct Graphiti API methods and that all required dependencies are properly specified in the project's requirements.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify Graphiti dependency and API usage

# Check if graphiti-core is in requirements
fd -t f "requirements.*\.txt" -x grep -l "graphiti-core"

# Look for other Graphiti method usages to ensure consistency
rg "from graphiti_core import|from graphiti_core\." -A 3

Length of output: 609


Script:

#!/bin/bash
# Let's check the actual usage of Graphiti and EpisodeType in the file
ast-grep --pattern 'class $_ {
  $$$
  Graphiti.$_($$$)
  $$$
}'

# Also check direct Graphiti instantiation
ast-grep --pattern 'Graphiti($$$)'

# And check EpisodeType usage
ast-grep --pattern 'EpisodeType.$_'

# Let's also see the full content of the build_graph function to understand the context
rg -A 20 "async def build_graph_with_temporal_awareness" cognee/tasks/temporal_awareness/build_graph_with_temporal_awareness.py

Length of output: 1097



async def build_graph_with_temporal_awareness(text_list):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add type hints and documentation.

The function lacks type hints and documentation, which are essential for maintainability and usability.

-async def build_graph_with_temporal_awareness(text_list):
+async def build_graph_with_temporal_awareness(text_list: list[str]) -> Graphiti:
+    """Build a temporal awareness graph from a list of texts.
+
+    Args:
+        text_list: List of text strings to be added as episodes.
+
+    Returns:
+        Graphiti: Initialized graphiti instance with added episodes.
+
+    Raises:
+        ConnectionError: If unable to connect to Neo4j database.
+    """

Committable suggestion skipped: line range outside the PR's diff.


url = os.getenv("GRAPH_DATABASE_URL")
password = os.getenv("GRAPH_DATABASE_PASSWORD")
graphiti = Graphiti(url, "neo4j", password)

await graphiti.build_indices_and_constraints()
print("Graph database initialized.")

for i, text in enumerate(text_list):
await graphiti.add_episode(
name=f"episode_{i}",
episode_body=text,
source=EpisodeType.text,
source_description="input",
reference_time=datetime.now()
)
print(f"Added text: {text[:35]}...")
Comment on lines +17 to +25
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Improve temporal awareness implementation.

The current implementation uses the same timestamp (current time) for all episodes, which might not be accurate for temporal awareness. Consider accepting a reference time parameter or extracting temporal information from the text.

-async def build_graph_with_temporal_awareness(text_list: list[str]) -> Graphiti:
+async def build_graph_with_temporal_awareness(
+    text_list: list[str],
+    reference_times: list[datetime] | None = None
+) -> Graphiti:
     # ... initialization code ...
     
     for i, text in enumerate(text_list):
+        reference_time = (
+            reference_times[i] if reference_times 
+            else datetime.now()
+        )
         await graphiti.add_episode(
             name=f"episode_{i}",
             episode_body=text,
             source=EpisodeType.text,
             source_description="input",
-            reference_time=datetime.now()
+            reference_time=reference_time
         )

Committable suggestion skipped: line range outside the PR's diff.

return graphiti
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@


async def search_graph_with_temporal_awareness(graphiti, query):
search_result = await graphiti.search(query)
await graphiti.close()
return search_result
Comment on lines +3 to +6
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add error handling, type hints, and documentation

The function needs several improvements for robustness and maintainability:

Consider this enhanced implementation:

-async def search_graph_with_temporal_awareness(graphiti, query):
-    search_result = await graphiti.search(query)
-    await graphiti.close()
-    return search_result
+from typing import Any, TypeVar
+
+T = TypeVar('T')
+
+async def search_graph_with_temporal_awareness(graphiti: Any, query: str) -> T:
+    """Search the temporal awareness graph with the given query.
+    
+    Args:
+        graphiti: The graphiti instance to use for searching
+        query: The search query string
+        
+    Returns:
+        The search result from graphiti
+        
+    Raises:
+        Exception: If the search operation fails
+    """
+    try:
+        search_result = await graphiti.search(query)
+        return search_result
+    except Exception as e:
+        raise Exception(f"Failed to search graph: {str(e)}") from e
+    finally:
+        try:
+            await graphiti.close()
+        except Exception:
+            # Log error but don't fail the operation
+            pass
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async def search_graph_with_temporal_awareness(graphiti, query):
search_result = await graphiti.search(query)
await graphiti.close()
return search_result
from typing import Any, TypeVar
T = TypeVar('T')
async def search_graph_with_temporal_awareness(graphiti: Any, query: str) -> T:
"""Search the temporal awareness graph with the given query.
Args:
graphiti: The graphiti instance to use for searching
query: The search query string
Returns:
The search result from graphiti
Raises:
Exception: If the search operation fails
"""
try:
search_result = await graphiti.search(query)
return search_result
except Exception as e:
raise Exception(f"Failed to search graph: {str(e)}") from e
finally:
try:
await graphiti.close()
except Exception:
# Log error but don't fail the operation
pass

29 changes: 29 additions & 0 deletions examples/python/graphiti_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import asyncio

import cognee
from cognee.api.v1.search import SearchType
from cognee.modules.pipelines import Task, run_tasks
from cognee.tasks.temporal_awareness import (
build_graph_with_temporal_awareness, search_graph_with_temporal_awareness)

text_list = [
"Kamala Harris is the Attorney General of California. She was previously "
"the district attorney for San Francisco.",
"As AG, Harris was in office from January 3, 2011 – January 3, 2017",
]

async def main():

tasks = [
Task(build_graph_with_temporal_awareness, text_list=text_list),
Task(search_graph_with_temporal_awareness, query='Who was the California Attorney General?')
]
Comment on lines +17 to +20
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Improve task dependency management and error handling

The second task depends on the result of the first task, but this dependency isn't properly managed.

Consider this improved implementation:

-    tasks = [
-        Task(build_graph_with_temporal_awareness, text_list=text_list),
-        Task(search_graph_with_temporal_awareness, query='Who was the California Attorney General?')
-    ]
+    try:
+        graphiti = await build_graph_with_temporal_awareness(text_list=text_list)
+        tasks = [
+            Task(search_graph_with_temporal_awareness, 
+                 graphiti=graphiti, 
+                 query='Who was the California Attorney General?')
+        ]
+    except Exception as e:
+        print(f"Failed to build graph: {e}")
+        return

Committable suggestion skipped: line range outside the PR's diff.


pipeline = run_tasks(tasks)

async for result in pipeline:
print(result)


if __name__ == '__main__':
asyncio.run(main())
Loading