-
Notifications
You must be signed in to change notification settings - Fork 89
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Add versioning to the data point model #378
Conversation
Warning Rate limit exceeded@Vasilije1990 has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 24 minutes and 12 seconds before requesting another review. ⌛ How to resolve this issue?After the wait time has elapsed, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information. 📒 Files selected for processing (8)
WalkthroughThe pull request introduces enhancements to three key files in the Cognee project. The Changes
Sequence DiagramsequenceDiagram
participant DataPoint
participant Serializer
participant Monitoring
DataPoint->>DataPoint: update_version()
DataPoint->>Serializer: to_json()
DataPoint->>Serializer: to_pickle()
DataPoint->>Serializer: to_dict()
alt Langfuse Monitoring
Monitoring->>DataPoint: Apply @observe decorator
end
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🧹 Nitpick comments (3)
cognee/infrastructure/engine/models/DataPoint.py (3)
11-14
: Consider making MetaData more type-safeThe
MetaData
type definition could be more specific about the allowed values.Consider this improvement:
class MetaData(TypedDict): - index_fields: list[str] + index_fields: list[str] + type: Literal["DataPoint"]
36-47
: Consider adding docstring type hints and return valueThe method documentation could be more explicit about types and return values.
Consider enhancing the docstring:
@classmethod def get_embeddable_data(cls, data_point): - """Retrieve embeddable data based on metadata's index_fields.""" + """Retrieve embeddable data based on metadata's index_fields. + + Args: + data_point: DataPoint instance to extract data from + + Returns: + str | Any: The stripped string value if the attribute is a string, + otherwise the original attribute value + """
16-29
: Consider adding database indexes and documenting versioning strategySince this is a database model with timestamp fields and versioning:
- Consider adding database indexes for
created_at
andupdated_at
fields to optimize queries by time range- Document the versioning strategy, including:
- When and how versions should be incremented
- The meaning of version numbers
- Migration strategy for existing data
created_at: int = Field(default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000)) | ||
updated_at: int = Field(default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Simplify timestamp creation and add validation
The timestamp creation could be simplified and should validate against negative values.
Consider this improvement:
- created_at: int = Field(default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000))
- updated_at: int = Field(default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000))
+ created_at: int = Field(
+ default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000),
+ ge=0
+ )
+ updated_at: int = Field(
+ default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000),
+ ge=0
+ )
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
created_at: int = Field(default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000)) | |
updated_at: int = Field(default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000)) | |
created_at: int = Field( | |
default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000), | |
ge=0 | |
) | |
updated_at: int = Field( | |
default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000), | |
ge=0 | |
) |
def update_version(self, new_version: str): | ||
"""Update the version and updated_at timestamp.""" | ||
self.version = new_version | ||
self.updated_at = int(datetime.now(timezone.utc).timestamp() * 1000) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Improve version update method with validation
The update_version
method should validate the version format and reuse timestamp logic.
Consider this improvement:
def update_version(self, new_version: str):
"""Update the version and updated_at timestamp."""
+ if not re.match(r"^\d+\.\d+$", new_version):
+ raise ValueError("Version must be in format 'X.Y'")
self.version = new_version
- self.updated_at = int(datetime.now(timezone.utc).timestamp() * 1000)
+ self.updated_at = Field(
+ default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000)
+ ).default_factory()
Committable suggestion skipped: line range outside the PR's diff.
version: str = "0.1" # Default version | ||
source: Optional[str] = None # Path to file, URL, etc. | ||
type: Optional[str] = "text" # "text", "file", "image", "video" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add version format validation and type enumeration
The version string and type field should have proper validation.
Consider these improvements:
+from enum import Enum
+import re
+
+class DataPointType(str, Enum):
+ TEXT = "text"
+ FILE = "file"
+ IMAGE = "image"
+ VIDEO = "video"
class DataPoint(BaseModel):
# ... other fields ...
- version: str = "0.1" # Default version
- type: Optional[str] = "text" # "text", "file", "image", "video"
+ version: str = Field(
+ default="0.1",
+ regex=r"^\d+\.\d+$"
+ )
+ type: Optional[DataPointType] = Field(default=DataPointType.TEXT)
Committable suggestion skipped: line range outside the PR's diff.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 5
♻️ Duplicate comments (3)
cognee/infrastructure/engine/models/DataPoint.py (3)
20-21
: 🛠️ Refactor suggestionAdd validation for timestamp fields
The timestamp fields should include validation to prevent negative values.
- created_at: int = Field(default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000)) - updated_at: int = Field(default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000)) + created_at: int = Field( + default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000), + ge=0 + ) + updated_at: int = Field( + default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000), + ge=0 + )
22-24
: 🛠️ Refactor suggestionAdd version format validation and type enumeration
The version string and type field should have proper validation.
+from enum import Enum +import re + +class DataPointType(str, Enum): + TEXT = "text" + FILE = "file" + IMAGE = "image" + VIDEO = "video" - version: str = "0.1" # Default version - type: Optional[str] = "text" # "text", "file", "image", "video" + version: str = Field( + default="0.1", + pattern=r"^\d+\.\d+$" + ) + type: Optional[DataPointType] = Field(default=DataPointType.TEXT)
59-62
: 🛠️ Refactor suggestionImprove version update method with validation
The
update_version
method should validate the version format.def update_version(self, new_version: str): """Update the version and updated_at timestamp.""" + if not re.match(r"^\d+\.\d+$", new_version): + raise ValueError("Version must be in format 'X.Y'") self.version = new_version self.updated_at = int(datetime.now(timezone.utc).timestamp() * 1000)
🧹 Nitpick comments (1)
cognee/infrastructure/llm/openai/adapter.py (1)
42-43
: Remove commented codeThe commented callback configuration lines should be removed as they are not being used and may cause confusion.
- # litellm.success_callback = ["langfuse"] - # litellm.failure_callback = ["langfuse"]
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
.github/workflows/profiling.yaml
(1 hunks)cognee/base_config.py
(1 hunks)cognee/infrastructure/engine/models/DataPoint.py
(2 hunks)cognee/infrastructure/llm/openai/adapter.py
(2 hunks)
🔇 Additional comments (2)
.github/workflows/profiling.yaml (1)
60-61
: Verify necessity of new dependencies
The newly added packages parso
and jedi
aren't directly used in the workflow. Please verify if these are necessary dependencies or if they're being installed for a specific purpose.
✅ Verification successful
Dependencies are necessary for code analysis functionality
The parso
and jedi
packages are actively used in the codebase for Python code analysis:
- Used in
get_local_dependencies.py
for parsing code and extracting code entities - Used in
extract_code_parts.py
for parsing source code - Used in
top_down_repo_parse.py
for project-wide code analysis
These packages are essential dependencies for the repository's code analysis functionality and their installation in the workflow is justified.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Check if parso and jedi are required dependencies
poetry show --tree | grep -A 5 -E "parso|jedi"
Length of output: 145
Script:
#!/bin/bash
# Search for any references to parso or jedi in Python files
rg -l "import (parso|jedi)" -t python
# Search for any references in requirements files
fd "requirements.*\.txt$" --exec grep -l "parso\|jedi" {} \;
# Search for any references in pyproject.toml or setup files
fd "(pyproject\.toml|setup\.py|setup\.cfg)$" --exec grep -l "parso\|jedi" {} \;
# Look for any documentation or comments mentioning these packages
rg -l "parso|jedi" -g "!*.{pyc,pyo,pyd}" -g "!.git"
Length of output: 442
Script:
#!/bin/bash
# Check the content of files that reference these packages
rg "parso|jedi" poetry.lock -A 5 -B 5
# Check how these packages are used in the Python files
rg "parso|jedi" cognee/tasks/repo_processor/get_local_dependencies.py
rg "parso|jedi" cognee/tasks/repo_processor/extract_code_parts.py
rg "parso|jedi" cognee/tasks/repo_processor/top_down_repo_parse.py
Length of output: 5192
cognee/infrastructure/llm/openai/adapter.py (1)
9-9
: LGTM: Import statements are correctly added
The new imports for MonitoringTool
and get_base_config
are appropriately placed and necessary for the monitoring configuration feature.
Also applies to: 13-13
cognee/base_config.py
Outdated
langfuse_public_key: Optional[str] = os.getenv("LANGFUSE_PUBLIC_KEY") | ||
langfuse_secret_key: Optional[str] = os.getenv("LANGFUSE_SECRET_KEY") | ||
langfuse_host: Optional[str] = os.environ["LANGFUSE_HOST"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix potential KeyError in environment variable access
The direct access to LANGFUSE_HOST
using os.environ[]
could raise a KeyError if the environment variable isn't set. Consider using os.getenv()
for consistent behavior with other environment variables.
- langfuse_host: Optional[str] = os.environ["LANGFUSE_HOST"]
+ langfuse_host: Optional[str] = os.getenv("LANGFUSE_HOST")
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
langfuse_public_key: Optional[str] = os.getenv("LANGFUSE_PUBLIC_KEY") | |
langfuse_secret_key: Optional[str] = os.getenv("LANGFUSE_SECRET_KEY") | |
langfuse_host: Optional[str] = os.environ["LANGFUSE_HOST"] | |
langfuse_public_key: Optional[str] = os.getenv("LANGFUSE_PUBLIC_KEY") | |
langfuse_secret_key: Optional[str] = os.getenv("LANGFUSE_SECRET_KEY") | |
langfuse_host: Optional[str] = os.getenv("LANGFUSE_HOST") |
@classmethod | ||
@classmethod |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove duplicate @classmethod decorator
The @classmethod
decorator is duplicated.
- @classmethod
- @classmethod
+ @classmethod
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
@classmethod | |
@classmethod | |
@classmethod |
# Pickle Serialization | ||
def to_pickle(self) -> bytes: | ||
"""Serialize the instance to pickle-compatible bytes.""" | ||
return pickle.dumps(self.dict()) | ||
|
||
@classmethod | ||
def from_pickle(self, pickled_data: bytes): | ||
"""Deserialize the instance from pickled bytes.""" | ||
data = pickle.loads(pickled_data) | ||
return self(**data) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Security concern: Replace pickle with a safer serialization method
Using pickle
for serialization poses a security risk as it can execute arbitrary code during deserialization. Consider using a safer alternative like JSON or MessagePack.
- def to_pickle(self) -> bytes:
- """Serialize the instance to pickle-compatible bytes."""
- return pickle.dumps(self.dict())
-
- @classmethod
- def from_pickle(self, pickled_data: bytes):
- """Deserialize the instance from pickled bytes."""
- data = pickle.loads(pickled_data)
- return self(**data)
+ def to_bytes(self) -> bytes:
+ """Serialize the instance to bytes using JSON."""
+ return self.json().encode('utf-8')
+
+ @classmethod
+ def from_bytes(cls, data: bytes):
+ """Deserialize the instance from JSON bytes."""
+ return cls.parse_raw(data)
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
# Pickle Serialization | |
def to_pickle(self) -> bytes: | |
"""Serialize the instance to pickle-compatible bytes.""" | |
return pickle.dumps(self.dict()) | |
@classmethod | |
def from_pickle(self, pickled_data: bytes): | |
"""Deserialize the instance from pickled bytes.""" | |
data = pickle.loads(pickled_data) | |
return self(**data) | |
def to_bytes(self) -> bytes: | |
"""Serialize the instance to bytes using JSON.""" | |
return self.json().encode('utf-8') | |
@classmethod | |
def from_bytes(cls, data: bytes): | |
"""Deserialize the instance from JSON bytes.""" | |
return cls.parse_raw(data) |
base_config = get_base_config() | ||
if base_config.monitoring_tool == MonitoringTool.LANGFUSE: | ||
# set callbacks | ||
# litellm.success_callback = ["langfuse"] | ||
# litellm.failure_callback = ["langfuse"] | ||
self.aclient.success_callback = ["langfuse"] | ||
self.aclient.failure_callback = ["langfuse"] | ||
self.client.success_callback = ["langfuse"] | ||
self.client.failure_callback = ["langfuse"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Refactor callback configuration and add error handling
The current implementation has several areas for improvement:
- Duplicate callback configuration for both clients
- Missing error handling for base_config
- Missing documentation for the monitoring feature
Consider refactoring like this:
+ def _configure_langfuse_callbacks(self, client):
+ """Configure Langfuse callbacks for the given client."""
+ client.success_callback = ["langfuse"]
+ client.failure_callback = ["langfuse"]
def __init__(
self,
api_key: str,
endpoint: str,
api_version: str,
model: str,
transcription_model: str,
streaming: bool = False,
):
+ """Initialize OpenAI adapter with optional Langfuse monitoring.
+
+ Args:
+ api_key (str): OpenAI API key
+ endpoint (str): API endpoint
+ api_version (str): API version
+ model (str): Model identifier
+ transcription_model (str): Model for transcription
+ streaming (bool, optional): Enable streaming. Defaults to False.
+ """
self.aclient = instructor.from_litellm(litellm.acompletion)
self.client = instructor.from_litellm(litellm.completion)
self.transcription_model = transcription_model
self.model = model
self.api_key = api_key
self.endpoint = endpoint
self.api_version = api_version
self.streaming = streaming
- base_config = get_base_config()
- if base_config.monitoring_tool == MonitoringTool.LANGFUSE:
- self.aclient.success_callback = ["langfuse"]
- self.aclient.failure_callback = ["langfuse"]
- self.client.success_callback = ["langfuse"]
- self.client.failure_callback = ["langfuse"]
+ try:
+ base_config = get_base_config()
+ if base_config and base_config.monitoring_tool == MonitoringTool.LANGFUSE:
+ self._configure_langfuse_callbacks(self.aclient)
+ self._configure_langfuse_callbacks(self.client)
+ except Exception as e:
+ # Log the error but don't fail initialization
+ print(f"Warning: Failed to configure monitoring: {str(e)}")
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
base_config = get_base_config() | |
if base_config.monitoring_tool == MonitoringTool.LANGFUSE: | |
# set callbacks | |
# litellm.success_callback = ["langfuse"] | |
# litellm.failure_callback = ["langfuse"] | |
self.aclient.success_callback = ["langfuse"] | |
self.aclient.failure_callback = ["langfuse"] | |
self.client.success_callback = ["langfuse"] | |
self.client.failure_callback = ["langfuse"] | |
def _configure_langfuse_callbacks(self, client): | |
"""Configure Langfuse callbacks for the given client.""" | |
client.success_callback = ["langfuse"] | |
client.failure_callback = ["langfuse"] | |
def __init__( | |
self, | |
api_key: str, | |
endpoint: str, | |
api_version: str, | |
model: str, | |
transcription_model: str, | |
streaming: bool = False, | |
): | |
"""Initialize OpenAI adapter with optional Langfuse monitoring. | |
Args: | |
api_key (str): OpenAI API key | |
endpoint (str): API endpoint | |
api_version (str): API version | |
model (str): Model identifier | |
transcription_model (str): Model for transcription | |
streaming (bool, optional): Enable streaming. Defaults to False. | |
""" | |
self.aclient = instructor.from_litellm(litellm.acompletion) | |
self.client = instructor.from_litellm(litellm.completion) | |
self.transcription_model = transcription_model | |
self.model = model | |
self.api_key = api_key | |
self.endpoint = endpoint | |
self.api_version = api_version | |
self.streaming = streaming | |
try: | |
base_config = get_base_config() | |
if base_config and base_config.monitoring_tool == MonitoringTool.LANGFUSE: | |
self._configure_langfuse_callbacks(self.aclient) | |
self._configure_langfuse_callbacks(self.client) | |
except Exception as e: | |
# Log the error but don't fail initialization | |
print(f"Warning: Failed to configure monitoring: {str(e)}") |
💡 Codebase verification
Missing Langfuse environment documentation and host configuration
The Langfuse integration requires proper documentation and configuration:
- The required environment variables (
LANGFUSE_PUBLIC_KEY
,LANGFUSE_SECRET_KEY
,LANGFUSE_HOST
) are not documented in any.md
files LANGFUSE_HOST
is accessed directly withos.environ[]
instead ofos.getenv()
, which could cause crashes if not set
🔗 Analysis chain
Verify Langfuse configuration requirements
Since Langfuse monitoring is being added, we should ensure all necessary configuration is present.
Let me check the actual Langfuse configuration content to verify if all necessary settings are present.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Check for Langfuse configuration in base config and environment variables
# Check if BaseConfig contains Langfuse configuration
echo "Checking BaseConfig for Langfuse configuration..."
rg -l "langfuse" --type py
# Check for environment variable documentation
echo "Checking for environment variable documentation..."
find . -type f -name "*.md" -exec grep -l "LANGFUSE" {} \;
Length of output: 456
Script:
#!/bin/bash
# Check Langfuse configuration details
# Check BaseConfig for Langfuse settings
echo "=== BaseConfig Langfuse settings ==="
rg -A 5 -B 5 "langfuse" cognee/base_config.py
# Check environment variables and configuration in data models
echo -e "\n=== Data models Langfuse configuration ==="
rg -A 5 -B 5 "langfuse" cognee/shared/data_models.py
# Check if Langfuse secret/public key settings exist
echo -e "\n=== Environment variables for Langfuse ==="
rg -i "LANGFUSE.*KEY|LANGFUSE.*SECRET" .
Length of output: 1434
base_config = get_base_config() | ||
if base_config.monitoring_tool == MonitoringTool.LANGFUSE: | ||
# set callbacks | ||
# litellm.success_callback = ["langfuse"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's remove these commented lines.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
id: UUID = Field(default_factory = uuid4) | ||
updated_at: Optional[datetime] = datetime.now(timezone.utc) | ||
id: UUID = Field(default_factory=uuid4) | ||
created_at: int = Field(default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the difference between datetime.now(timezone.utc)
and this one?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
created_at is when the initial record was created, updated at is any change that happens
id: UUID = Field(default_factory=uuid4) | ||
created_at: int = Field(default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000)) | ||
updated_at: int = Field(default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000)) | ||
version: str = "0.1" # Default version |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would keep it as a number, and we can just increase it with each version. (1, 2, 3, 4...)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
kk
created_at: int = Field(default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000)) | ||
updated_at: int = Field(default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000)) | ||
version: str = "0.1" # Default version | ||
source: Optional[str] = None # Path to file, URL, etc. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
source
is a Document
model related property, doesn't belong to this general DataPoint
model.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fair
updated_at: int = Field(default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000)) | ||
version: str = "0.1" # Default version | ||
source: Optional[str] = None # Path to file, URL, etc. | ||
type: Optional[str] = "text" # "text", "file", "image", "video" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same for type, doesn't belong here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Laslzo asked me for this one, due to retriever logic. In general I agree
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the only thing is that the type should be the pydantic type. Like "Entity", "TextSummary" etc.
def update_version(self, new_version: str): | ||
"""Update the version and updated_at timestamp.""" | ||
self.version = new_version |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we have a number as a version, we can do +1
here then.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
|
||
# JSON Serialization | ||
def to_json(self) -> str: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need this serialization?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So you can parallelize tasks, since you had issues with that. Pickle or json
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
♻️ Duplicate comments (1)
cognee/infrastructure/engine/models/DataPoint.py (1)
71-80
:⚠️ Potential issueSecurity concern: Remove pickle serialization
Using pickle for serialization poses a security risk as it can execute arbitrary code during deserialization. Since JSON serialization is already implemented and serves the same purpose, the pickle methods should be removed.
- # Pickle Serialization - def to_pickle(self) -> bytes: - """Serialize the instance to pickle-compatible bytes.""" - return pickle.dumps(self.dict()) - - @classmethod - def from_pickle(self, pickled_data: bytes): - """Deserialize the instance from pickled bytes.""" - data = pickle.loads(pickled_data) - return self(**data)If binary serialization is needed for parallelization (as mentioned in past comments), consider using MessagePack or Protocol Buffers instead.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
cognee/base_config.py
(1 hunks)cognee/infrastructure/engine/models/DataPoint.py
(2 hunks)cognee/infrastructure/llm/openai/adapter.py
(2 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- cognee/infrastructure/llm/openai/adapter.py
- cognee/base_config.py
🔇 Additional comments (4)
cognee/infrastructure/engine/models/DataPoint.py (4)
20-21
: Add validation for timestamp fieldsThe timestamp fields should validate against negative values.
- created_at: int = Field(default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000)) - updated_at: int = Field(default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000)) + created_at: int = Field( + default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000), + ge=0 + ) + updated_at: int = Field( + default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000), + ge=0 + )
82-89
: LGTM: Dict serialization methods are well implementedThe implementation correctly uses Pydantic's model_dump and model_validate methods, following best practices for dictionary serialization.
16-18
: Add tests for versioning functionalityThe new versioning feature needs test coverage to ensure correct behavior, especially for:
- Version incrementation
- Timestamp updates
- Serialization/deserialization of versioned objects
Would you like me to help create test cases for these scenarios?
56-59
:⚠️ Potential issueUpdate version increment logic for integer version
Since version is being changed to an integer, the increment logic needs to be updated.
def update_version(self): """Update the version and updated_at timestamp.""" - self.version += 1 + self.version = self.version + 1 # Explicit increment for clarity self.updated_at = int(datetime.now(timezone.utc).timestamp() * 1000)Likely invalid or redundant comment.
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
|
GitGuardian id | GitGuardian status | Secret | Commit | Filename | |
---|---|---|---|---|---|
9573981 | Triggered | Generic Password | 58526a6 | notebooks/cognee_graphiti_demo.ipynb | View secret |
8719688 | Triggered | Generic Password | 58526a6 | notebooks/cognee_graphiti_demo.ipynb | View secret |
🛠 Guidelines to remediate hardcoded secrets
- Understand the implications of revoking this secret by investigating where it is used in your code.
- Replace and store your secrets safely. Learn here the best practices.
- Revoke and rotate these secrets.
- If possible, rewrite git history. Rewriting git history is not a trivial act. You might completely break other contributing developers' workflow and you risk accidentally deleting legitimate data.
To avoid such incidents in the future consider
- following these best practices for managing and storing secrets including API keys and other credentials
- install secret detection on pre-commit to catch secret before it leaves your machine and ease remediation.
🦉 GitGuardian detects secrets in your source code to help developers and security teams secure the modern development process. You are seeing this because you or someone else with access to this repository has authorized GitGuardian to scan your pull request.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (3)
cognee/infrastructure/engine/models/DataPoint.py (3)
9-10
:⚠️ Potential issueSecurity Risk: Avoid using pickle for serialization
Using
pickle
for serialization poses a security risk as it can execute arbitrary code during deserialization. Consider using JSON serialization which is already implemented in the class.Since JSON serialization methods are already implemented (
to_json
,from_json
), we should remove the pickle-based methods entirely.
20-21
: 🛠️ Refactor suggestionAdd validation for timestamp fields
The timestamp fields should validate against negative values.
- created_at: int = Field(default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000)) - updated_at: int = Field(default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000)) + created_at: int = Field( + default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000), + ge=0, + description="Creation timestamp in milliseconds since epoch" + ) + updated_at: int = Field( + default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000), + ge=0, + description="Last update timestamp in milliseconds since epoch" + )
71-80
:⚠️ Potential issueRemove pickle serialization methods
These methods should be removed as they pose security risks. Use the JSON or dict serialization methods instead.
🧹 Nitpick comments (2)
cognee/infrastructure/engine/models/DataPoint.py (2)
23-23
: Document type field constraintsSince the type field was specifically requested for retriever logic, please add documentation about its purpose and valid values.
- type: Optional[str] = "text" # "text", "file", "image", "video" + type: Optional[str] = Field( + default="text", + description="Content type for retriever logic. Valid values: text, file, image, video" + )
Line range hint
34-42
: Add docstring to get_embeddable_data methodFor consistency with other methods, please add a docstring explaining the purpose and return value of this method.
@classmethod def get_embeddable_data(self, data_point): + """Retrieve the primary embeddable data from the data point. + + Args: + data_point: The data point instance to extract data from + + Returns: + The stripped string value of the first index field if it's a string, + otherwise returns the raw value + """ if data_point._metadata and len(data_point._metadata["index_fields"]) > 0 \
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
cognee/infrastructure/engine/models/DataPoint.py
(2 hunks)
🔇 Additional comments (4)
cognee/infrastructure/engine/models/DataPoint.py (4)
30-33
: LGTM: Proper configuration for private attributesThe Pydantic configuration correctly handles the private _metadata field.
56-59
: LGTM: Clean version update implementationThe version update implementation correctly follows the numeric versioning approach as discussed in previous comments.
61-69
: LGTM: Secure JSON serialization implementationThe JSON serialization methods correctly utilize Pydantic's built-in functionality.
82-89
: LGTM: Proper dictionary serialization implementationThe dictionary serialization methods correctly use Pydantic's model_dump and model_validate functions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🔭 Outside diff range comments (1)
cognee/infrastructure/llm/openai/adapter.py (1)
Fix indentation issues throughout the file
The following formatting issues need to be addressed:
- Remove indentation for class-level attributes (
name
,model
,api_key
,api_version
)- Fix method parameter indentation to use 4 spaces instead of 8 (e.g., in
__init__
,acreate_structured_output
)- Normalize indentation in multi-line string content (e.g., in the
content
field of messages)🔗 Analysis chain
Line range hint
1-200
: Fix formatting issuesThe pipeline indicates formatting issues that need to be addressed with Ruff formatter.
Run the following command to fix formatting:
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Check specific formatting issues in this file rg --line-number '[ ]{2,}' cognee/infrastructure/llm/openai/adapter.pyLength of output: 5223
🧰 Tools
🪛 GitHub Actions: ruff format
[error] 308 files require formatting according to Ruff standards. Run Ruff formatter to fix formatting issues.
♻️ Duplicate comments (1)
cognee/infrastructure/llm/openai/adapter.py (1)
48-53
: 🛠️ Refactor suggestionRefactor callback configuration
Previous review comments already suggested refactoring this callback configuration pattern.
As previously suggested, consider extracting this into a separate method:
+ def _configure_langfuse_callbacks(self, client): + """Configure Langfuse callbacks for the given client.""" + client.success_callback = ["langfuse"] + client.failure_callback = ["langfuse"] def __init__(self, ...): # ... try: if base_config.monitoring_tool == MonitoringTool.LANGFUSE: - self.aclient.success_callback = ["langfuse"] - self.aclient.failure_callback = ["langfuse"] - self.client.success_callback = ["langfuse"] - self.client.failure_callback = ["langfuse"] + self._configure_langfuse_callbacks(self.aclient) + self._configure_langfuse_callbacks(self.client) except Exception as e: print(f"Warning: Failed to configure monitoring: {str(e)}")🧰 Tools
🪛 GitHub Actions: ruff format
[error] 308 files require formatting according to Ruff standards. Run Ruff formatter to fix formatting issues.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
cognee/infrastructure/llm/openai/adapter.py
(2 hunks)
🧰 Additional context used
🪛 GitHub Actions: ruff format
cognee/infrastructure/llm/openai/adapter.py
[error] 308 files require formatting according to Ruff standards. Run Ruff formatter to fix formatting issues.
⏰ Context from checks skipped due to timeout of 90000ms (3)
- GitHub Check: run_notebook_test / test
- GitHub Check: run_notebook_test / test
- GitHub Check: run_notebook_test / test
|
||
|
||
@observe(as_type='generation') | ||
async def acreate_structured_output(self, text_input: str, system_prompt: str, | ||
response_model: Type[BaseModel]) -> BaseModel: | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Standardize decorator usage and add error handling
The @observe
decorator usage is inconsistent between methods and lacks error handling:
acreate_structured_output
usesas_type='generation'
create_structured_output
uses default parameters- Missing fallback if decorator import fails
Consider standardizing the decorator usage:
- @observe(as_type='generation')
+ @observe(as_type='generation', name='async_structured_output')
async def acreate_structured_output(self, text_input: str, system_prompt: str,
response_model: Type[BaseModel]) -> BaseModel:
"""Generate a response from a user query."""
- @observe
+ @observe(as_type='generation', name='structured_output')
def create_structured_output(self, text_input: str, system_prompt: str,
response_model: Type[BaseModel]) -> BaseModel:
Also, add docstring documentation for the monitoring functionality:
def create_structured_output(self, text_input: str, system_prompt: str,
response_model: Type[BaseModel]) -> BaseModel:
"""Generate a response from a user query.
Args:
text_input (str): The input text to process
system_prompt (str): The system prompt to use
response_model (Type[BaseModel]): The Pydantic model for response validation
Returns:
BaseModel: The structured response
Note:
This method is monitored by Langfuse when monitoring_tool is configured.
"""
🧰 Tools
🪛 GitHub Actions: ruff format
[error] 308 files require formatting according to Ruff standards. Run Ruff formatter to fix formatting issues.
|
||
monitoring = get_base_config().monitoring_tool | ||
if monitoring == MonitoringTool.LANGFUSE: | ||
from langfuse.decorators import observe | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Improve initialization robustness and error handling
The global configuration setup has several potential issues:
- Using global variables for configuration can lead to initialization order problems
- Missing error handling for
get_base_config()
- Potential circular dependency with
base_config
import
Consider moving this logic into a class method:
-monitoring = get_base_config().monitoring_tool
-if monitoring == MonitoringTool.LANGFUSE:
- from langfuse.decorators import observe
+def get_monitoring_config():
+ try:
+ config = get_base_config()
+ return config.monitoring_tool if config else None
+ except Exception as e:
+ print(f"Warning: Failed to load monitoring configuration: {str(e)}")
+ return None
+
+def import_langfuse():
+ try:
+ from langfuse.decorators import observe
+ return observe
+ except ImportError:
+ print("Warning: Langfuse package not installed")
+ return lambda x: x # No-op decorator
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
monitoring = get_base_config().monitoring_tool | |
if monitoring == MonitoringTool.LANGFUSE: | |
from langfuse.decorators import observe | |
def get_monitoring_config(): | |
try: | |
config = get_base_config() | |
return config.monitoring_tool if config else None | |
except Exception as e: | |
print(f"Warning: Failed to load monitoring configuration: {str(e)}") | |
return None | |
def import_langfuse(): | |
try: | |
from langfuse.decorators import observe | |
return observe | |
except ImportError: | |
print("Warning: Langfuse package not installed") | |
return lambda x: x # No-op decorator | |
🧰 Tools
🪛 GitHub Actions: ruff format
[error] 308 files require formatting according to Ruff standards. Run Ruff formatter to fix formatting issues.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🔭 Outside diff range comments (1)
cognee/infrastructure/engine/models/DataPoint.py (1)
Fix formatting issues in DataPoint.py
The following formatting issues need to be addressed:
- Remove consecutive blank lines at the start of the file and between class methods
- Fix lines exceeding 88 characters:
- Line 21-22:
created_at
andupdated_at
timestamp creation- Line 51: List comprehension in
get_embeddable_properties
- Fix inconsistent spacing before comments (e.g., " # JSON Serialization")
🔗 Analysis chain
Line range hint
1-96
: Fix formatting issuesThe file requires formatting using Ruff formatter as indicated by the pipeline failure.
Run the following to fix formatting:
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Check and fix formatting issues ruff format cognee/infrastructure/engine/models/DataPoint.pyLength of output: 157
Script:
#!/bin/bash # Check for multiple consecutive blank lines and line lengths echo "=== Checking blank lines ===" grep -A 1 "^$" cognee/infrastructure/engine/models/DataPoint.py echo -e "\n=== Checking line lengths ===" awk 'length($0) > 88 { print NR": "length($0)" chars: "$0 }' cognee/infrastructure/engine/models/DataPoint.py echo -e "\n=== Checking indentation and spacing ===" grep -n "^[[:space:]]*class" cognee/infrastructure/engine/models/DataPoint.pyLength of output: 1575
🧰 Tools
🪛 GitHub Actions: ruff format
[warning] File requires formatting using Ruff formatter
♻️ Duplicate comments (2)
cognee/infrastructure/engine/models/DataPoint.py (2)
21-22
: 🛠️ Refactor suggestionImprove timestamp field validation
The timestamp fields should validate against negative values.
- created_at: int = Field(default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000)) - updated_at: int = Field(default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000)) + created_at: int = Field( + default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000), + ge=0, + description="Creation timestamp in milliseconds since epoch" + ) + updated_at: int = Field( + default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000), + ge=0, + description="Last update timestamp in milliseconds since epoch" + )🧰 Tools
🪛 GitHub Actions: ruff format
[warning] File requires formatting using Ruff formatter
77-86
:⚠️ Potential issueReplace pickle serialization with secure alternatives
Using pickle for serialization is a security risk as it can execute arbitrary code during deserialization.
- # Pickle Serialization - def to_pickle(self) -> bytes: - """Serialize the instance to pickle-compatible bytes.""" - return pickle.dumps(self.dict()) - - @classmethod - def from_pickle(self, pickled_data: bytes): - """Deserialize the instance from pickled bytes.""" - data = pickle.loads(pickled_data) - return self(**data) + def to_bytes(self) -> bytes: + """Serialize the instance to bytes using JSON.""" + return self.json().encode('utf-8') + + @classmethod + def from_bytes(cls, data: bytes) -> "DataPoint": + """Deserialize the instance from JSON bytes.""" + return cls.parse_raw(data)🧰 Tools
🪛 GitHub Actions: ruff format
[warning] File requires formatting using Ruff formatter
🧹 Nitpick comments (3)
cognee/infrastructure/engine/models/DataPoint.py (3)
24-24
: Use Enum for type fieldUsing string literals for type is error-prone. Consider using an Enum for better type safety and IDE support.
+from enum import Enum + +class DataPointType(str, Enum): + TEXT = "text" + FILE = "file" + IMAGE = "image" + VIDEO = "video" - type: Optional[str] = "text" # "text", "file", "image", "video" + type: Optional[DataPointType] = Field( + default=DataPointType.TEXT, + description="Type of the data point" + )🧰 Tools
🪛 GitHub Actions: ruff format
[warning] File requires formatting using Ruff formatter
Line range hint
32-61
: Improve method documentation and null safetyThe methods have inconsistent documentation and could benefit from better null checking.
- Add docstring to
get_embeddable_data
- Add type hints to method parameters
- Add null checks for
_metadata
access@classmethod - def get_embeddable_data(self, data_point): + def get_embeddable_data(cls, data_point: "DataPoint") -> Optional[Any]: + """Retrieve the primary embeddable data from the data point. + + Args: + data_point: The data point to extract data from + + Returns: + The primary embeddable data or None if not available + """ if ( - data_point._metadata + data_point._metadata is not None and len(data_point._metadata["index_fields"]) > 0 and hasattr(data_point, data_point._metadata["index_fields"][0]) ):🧰 Tools
🪛 GitHub Actions: ruff format
[warning] File requires formatting using Ruff formatter
88-96
: Add specific type hints for dictionary methodsThe dictionary conversion methods could benefit from more specific type hints.
- def to_dict(self, **kwargs) -> Dict[str, Any]: + def to_dict( + self, + *, + exclude_unset: bool = False, + exclude_defaults: bool = False, + **kwargs + ) -> Dict[str, Any]: """Serialize model to a dictionary.""" return self.model_dump(**kwargs)🧰 Tools
🪛 GitHub Actions: ruff format
[warning] File requires formatting using Ruff formatter
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
cognee/base_config.py
(1 hunks)cognee/infrastructure/engine/models/DataPoint.py
(3 hunks)cognee/infrastructure/llm/openai/adapter.py
(2 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- cognee/base_config.py
- cognee/infrastructure/llm/openai/adapter.py
🧰 Additional context used
🪛 GitHub Actions: ruff format
cognee/infrastructure/engine/models/DataPoint.py
[warning] File requires formatting using Ruff formatter
⏰ Context from checks skipped due to timeout of 90000ms (15)
- GitHub Check: run_dynamic_steps_example_test / test
- GitHub Check: run_notebook_test / test
- GitHub Check: run_notebook_test / test
- GitHub Check: test
- GitHub Check: run_multimedia_example_test / test
- GitHub Check: test
- GitHub Check: test
- GitHub Check: run_notebook_test / test
- GitHub Check: test
- GitHub Check: test
- GitHub Check: test
- GitHub Check: test
- GitHub Check: test
- GitHub Check: test
- GitHub Check: docker-compose-test
🔇 Additional comments (1)
cognee/infrastructure/engine/models/DataPoint.py (1)
62-66
: LGTM! Clean version update implementationThe version update method is well-implemented with clear responsibility and proper timestamp update.
🧰 Tools
🪛 GitHub Actions: ruff format
[warning] File requires formatting using Ruff formatter
|
||
|
||
from datetime import datetime, timezone | ||
from typing import Optional | ||
from typing import Optional, Any, Dict | ||
from uuid import UUID, uuid4 | ||
|
||
from pydantic import BaseModel, Field | ||
from typing_extensions import TypedDict | ||
import pickle | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Organize imports and consider security implications
The imports should be organized according to the standard convention (stdlib, third-party, local). Also, using pickle
for serialization poses security risks as it can execute arbitrary code during deserialization.
-
-
from datetime import datetime, timezone
from typing import Optional, Any, Dict
from uuid import UUID, uuid4
+from datetime import datetime, timezone
+from typing import Optional, Any, Dict
+from uuid import UUID, uuid4
+
+import json
+import pickle # Consider removing in favor of json
+
+from pydantic import BaseModel, Field
+from typing_extensions import TypedDict
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
from datetime import datetime, timezone | |
from typing import Optional | |
from typing import Optional, Any, Dict | |
from uuid import UUID, uuid4 | |
from pydantic import BaseModel, Field | |
from typing_extensions import TypedDict | |
import pickle | |
from datetime import datetime, timezone | |
from typing import Optional, Any, Dict | |
from uuid import UUID, uuid4 | |
import json | |
import pickle # Consider removing in favor of json | |
from pydantic import BaseModel, Field | |
from typing_extensions import TypedDict |
🧰 Tools
🪛 GitHub Actions: ruff format
[warning] File requires formatting using Ruff formatter
Summary by CodeRabbit
New Features
DataPoint
model with versioning and timestamp trackingImprovements
Technical Updates