
Cog 337 llama index support #186

Merged: 21 commits merged on Nov 17, 2024

Changes from all commits (21 commits)
ec5fd80
feat: Add support for LlamaIndex Document type
dexters1 Nov 6, 2024
b05a07f
docs: Add Jupyter Notebook for cognee with llama index document type
dexters1 Nov 6, 2024
93445bf
feat: Add metadata migration from LlamaIndex document type
dexters1 Nov 7, 2024
804ca26
refactor: Change llama index migration function name
dexters1 Nov 7, 2024
c151d7b
chore: Add llama index core dependency
dexters1 Nov 7, 2024
fb84aa0
Merge branch 'COG-337-llama-index-support' of github.com:topoteretes/…
dexters1 Nov 7, 2024
7d222ee
Feature: Add ingest_data_with_metadata task
dexters1 Nov 7, 2024
094e45d
docs: Add description on why specific type checking is done
dexters1 Nov 7, 2024
d5aa316
fix: Add missing parameter to function call
dexters1 Nov 7, 2024
98dc18c
refactor: Move storing of data from async to sync function
dexters1 Nov 7, 2024
2615b35
refactor: Pretend ingest_data was changed instead of having two tasks
dexters1 Nov 7, 2024
e4c39e8
refactor: Use old name for data ingestion with metadata
dexters1 Nov 7, 2024
733caa7
refactor: Return ingest_data and save_data_to_storage Tasks
dexters1 Nov 8, 2024
8c7fdc4
refactor: Return previous ingestion Tasks to add function
dexters1 Nov 8, 2024
f13146d
Merge branch 'main' of github.com:topoteretes/cognee into COG-337-lla…
dexters1 Nov 14, 2024
cc53693
fix: Remove dict and use string for search query
dexters1 Nov 14, 2024
af99e4f
refactor: Add changes requested in pull request
dexters1 Nov 15, 2024
417b119
Merge branch 'main' of github.com:topoteretes/cognee into COG-337-lla…
dexters1 Nov 15, 2024
4c7a4e2
fix: Resolve issue with llama-index being mandatory
dexters1 Nov 15, 2024
c4700b1
fix: Add install of llama-index to notebook
dexters1 Nov 15, 2024
f6e8294
Merge branch 'main' into COG-337-llama-index-support
borisarzentar Nov 17, 2024
Binary file removed .DS_Store
Binary file not shown.
63 changes: 63 additions & 0 deletions .github/workflows/test_cognee_llama_index_notebook.yml
@@ -0,0 +1,63 @@
name: test | llama index notebook

on:
  workflow_dispatch:
  pull_request:
    branches:
      - main
    types: [labeled, synchronize]


concurrency:
  group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
  cancel-in-progress: true

env:
  RUNTIME__LOG_LEVEL: ERROR

jobs:
  get_docs_changes:
    name: docs changes
    uses: ./.github/workflows/get_docs_changes.yml

  run_notebook_test:
    name: test
    needs: get_docs_changes
    if: needs.get_docs_changes.outputs.changes_outside_docs == 'true' && github.event.label.name == 'run-checks'
    runs-on: ubuntu-latest
    defaults:
      run:
        shell: bash
    steps:
      - name: Check out
        uses: actions/checkout@master

      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11.x'

      - name: Install Poetry
        uses: snok/[email protected]
        with:
          virtualenvs-create: true
          virtualenvs-in-project: true
          installer-parallel: true

      - name: Install dependencies
        run: |
          poetry install --no-interaction --all-extras --no-root
          poetry add jupyter --no-interaction
Comment on lines +35 to +51

🛠️ Refactor suggestion

Add Poetry caching for faster workflow execution.

Consider these improvements for the Python/Poetry setup:

  1. Add Poetry cache to speed up dependency installation
  2. Pin Poetry version more specifically for better reproducibility
 - name: Setup Python
   uses: actions/setup-python@v5
   with:
     python-version: '3.11.x'

+- name: Cache Poetry
+  uses: actions/cache@v3
+  with:
+    path: ~/.cache/pypoetry
+    key: ${{ runner.os }}-poetry-${{ hashFiles('**/poetry.lock') }}
+    restore-keys: |
+      ${{ runner.os }}-poetry-

 - name: Install Poetry
-  uses: snok/[email protected]
+  uses: snok/[email protected]
   with:
     virtualenvs-create: true
     virtualenvs-in-project: true
     installer-parallel: true

      - name: Execute Jupyter Notebook
        env:
          ENV: 'dev'
          LLM_API_KEY: ${{ secrets.OPENAI_API_KEY }}
          GRAPHISTRY_USERNAME: ${{ secrets.GRAPHISTRY_USERNAME }}
          GRAPHISTRY_PASSWORD: ${{ secrets.GRAPHISTRY_PASSWORD }}
        run: |
          poetry run jupyter nbconvert \
            --to notebook \
            --execute notebooks/cognee_llama_index.ipynb \
            --output executed_notebook.ipynb \
            --ExecutePreprocessor.timeout=1200
Comment on lines +52 to +63

🛠️ Refactor suggestion

Enhance notebook execution error handling and fix EOF.

  1. The 20-minute timeout suggests potential performance concerns. Consider adding error handling and output capture.
  2. Add a newline at the end of file.
       run: |
         poetry run jupyter nbconvert \
         --to notebook \
         --execute notebooks/cognee_llama_index.ipynb \
         --output executed_notebook.ipynb \
-        --ExecutePreprocessor.timeout=1200
+        --ExecutePreprocessor.timeout=1200 \
+        || {
+          echo "::error::Notebook execution failed"
+          cat executed_notebook.ipynb
+          exit 1
+        }
+
🧰 Tools
🪛 yamllint

[error] 63-63: no new line character at the end of file

(new-line-at-end-of-file)

Binary file removed cognee/.DS_Store
Binary file not shown.
2 changes: 1 addition & 1 deletion cognee/api/v1/add/add_v2.py
@@ -21,4 +21,4 @@ async def add(data: Union[BinaryIO, list[BinaryIO], str, list[str]], dataset_nam
    pipeline = run_tasks(tasks, data, "add_pipeline")

    async for result in pipeline:
        print(result)
        print(result)
@@ -33,4 +33,4 @@ async def check_permission_on_documents(user: User, permission_type: str, docume
    has_permissions = all(document_id in resource_ids for document_id in document_ids)

    if not has_permissions:
        raise PermissionDeniedException(f"User {user.username} does not have {permission_type} permission on documents")
        raise PermissionDeniedException(f"User {user.email} does not have {permission_type} permission on documents")
2 changes: 2 additions & 0 deletions cognee/tasks/ingestion/__init__.py
@@ -1,2 +1,4 @@
from .ingest_data import ingest_data
from .save_data_to_storage import save_data_to_storage
from .save_data_item_to_storage import save_data_item_to_storage
from .save_data_item_with_metadata_to_storage import save_data_item_with_metadata_to_storage
2 changes: 1 addition & 1 deletion cognee/tasks/ingestion/ingest_data.py
@@ -3,7 +3,7 @@

from cognee.shared.utils import send_telemetry
from cognee.modules.users.models import User
from cognee.infrastructure.databases.relational import get_relational_config, get_relational_engine
from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.modules.data.methods import create_dataset
from cognee.modules.users.permissions.methods import give_permission_on_document
from .get_dlt_destination import get_dlt_destination
92 changes: 92 additions & 0 deletions cognee/tasks/ingestion/ingest_data_with_metadata.py
@@ -0,0 +1,92 @@
import dlt
import cognee.modules.ingestion as ingestion
from typing import Any
from cognee.shared.utils import send_telemetry
from cognee.modules.users.models import User
from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.modules.data.methods import create_dataset
from cognee.modules.users.permissions.methods import give_permission_on_document
from .get_dlt_destination import get_dlt_destination
from .save_data_item_with_metadata_to_storage import save_data_item_with_metadata_to_storage

async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
    destination = get_dlt_destination()

    pipeline = dlt.pipeline(
        pipeline_name = "file_load_from_filesystem",
        destination = destination,
    )

    @dlt.resource(standalone = True, merge_key = "id")
    async def data_resources(data: Any, user: User):
        if not isinstance(data, list):
            # Convert data to a list as we work with lists further down.
            data = [data]

        # Process data
        for data_item in data:

            file_path = save_data_item_with_metadata_to_storage(data_item, dataset_name)

            # Ingest data and add metadata
            with open(file_path.replace("file://", ""), mode = "rb") as file:

🛠️ Refactor suggestion

Use a Robust Method to Parse File URIs

The current approach of using file_path.replace("file://", "") may not handle all edge cases of file URIs, especially across different operating systems. Consider using the urllib.parse or pathlib modules to reliably parse the file path from the URI.

Here's how you might adjust the code:

+from urllib.parse import urlparse
+parsed_url = urlparse(file_path)
+file_system_path = parsed_url.path
-with open(file_path.replace("file://", ""), mode="rb") as file:
+with open(file_system_path, mode="rb") as file:

Committable suggestion skipped: line range outside the PR's diff.
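As an illustration only, a small helper along these lines (hypothetical name, not part of this PR) would cover both plain paths and file:// URIs:

from urllib.parse import urlparse

def file_uri_to_path(file_path: str) -> str:
    # Parse the URI; fall back to the raw string when there is no file:// scheme.
    parsed = urlparse(file_path)
    return parsed.path if parsed.scheme == "file" else file_path

# file_uri_to_path("file:///tmp/data.txt") -> "/tmp/data.txt"
# file_uri_to_path("/tmp/data.txt")        -> "/tmp/data.txt"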

                classified_data = ingestion.classify(file)

Comment on lines +32 to +34

⚠️ Potential issue

Add Exception Handling for File Operations

Opening a file can raise exceptions such as FileNotFoundError or PermissionError. To prevent the program from crashing, add exception handling around file operations to handle these potential errors gracefully.

I can help implement appropriate exception handling if needed.
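As a rough sketch of what that could look like here (reusing the names from the diff above; the exact error strategy is left to the authors):

try:
    with open(file_path.replace("file://", ""), mode = "rb") as file:
        classified_data = ingestion.classify(file)
except (FileNotFoundError, PermissionError) as error:
    # Surface a clear failure for this data item instead of crashing the whole run.
    raise RuntimeError(f"Could not read ingested file: {file_path}") from error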

                data_id = ingestion.identify(classified_data)

                file_metadata = classified_data.get_metadata()

                from sqlalchemy import select
                from cognee.modules.data.models import Data

                db_engine = get_relational_engine()

                async with db_engine.get_async_session() as session:
                    dataset = await create_dataset(dataset_name, user.id, session)

                    data_point = (await session.execute(
                        select(Data).filter(Data.id == data_id)
                    )).scalar_one_or_none()

                    if data_point is not None:
                        data_point.name = file_metadata["name"]
                        data_point.raw_data_location = file_metadata["file_path"]
                        data_point.extension = file_metadata["extension"]
                        data_point.mime_type = file_metadata["mime_type"]

                        await session.merge(data_point)
                        await session.commit()
                    else:
                        data_point = Data(
                            id = data_id,
                            name = file_metadata["name"],
                            raw_data_location = file_metadata["file_path"],
                            extension = file_metadata["extension"],
                            mime_type = file_metadata["mime_type"],
                        )

                        dataset.data.append(data_point)
                        await session.commit()

Comment on lines +44 to +70

⚠️ Potential issue

Ensure Proper Transaction Management and Exception Handling in Database Operations

While using async sessions, it's important to handle exceptions that may occur during database operations to prevent partial commits or data inconsistencies. Consider adding exception handling and ensuring that transactions are properly managed.

I can assist in implementing robust exception handling for database transactions if you'd like.
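A minimal sketch of that pattern, using the session from the diff above (not a committed design; the existing lookup and update/insert logic stays as is):

async with db_engine.get_async_session() as session:
    try:
        dataset = await create_dataset(dataset_name, user.id, session)
        # ... existing data_point lookup and update / insert logic ...
        await session.commit()
    except Exception:
        # Roll back the open transaction so no partial rows are left behind.
        await session.rollback()
        raise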

Comment on lines +51 to +70

🛠️ Refactor suggestion

Refactor to Reduce Code Duplication in Data Handling

The code blocks for updating an existing data_point and creating a new one share similar assignments to data_point attributes. To improve maintainability and readability, refactor the code to eliminate duplication.

Here is a suggested refactor:

 if data_point is not None:
-    data_point.name = file_metadata["name"]
-    data_point.raw_data_location = file_metadata["file_path"]
-    data_point.extension = file_metadata["extension"]
-    data_point.mime_type = file_metadata["mime_type"]
-
-    await session.merge(data_point)
-    await session.commit()
+    pass  # Existing data_point, no need to create a new one
 else:
     data_point = Data(
         id=data_id,
-        # These fields will be set below
     )
     dataset.data.append(data_point)
-    await session.commit()

+# Set or update shared attributes
+data_point.name = file_metadata["name"]
+data_point.raw_data_location = file_metadata["file_path"]
+data_point.extension = file_metadata["extension"]
+data_point.mime_type = file_metadata["mime_type"]

+await session.merge(data_point)
+await session.commit()

                yield {
                    "id": data_id,
                    "name": file_metadata["name"],
                    "file_path": file_metadata["file_path"],
                    "extension": file_metadata["extension"],
                    "mime_type": file_metadata["mime_type"],
                }

                await give_permission_on_document(user, data_id, "read")
                await give_permission_on_document(user, data_id, "write")


    send_telemetry("cognee.add EXECUTION STARTED", user_id = user.id)
    run_info = pipeline.run(
        data_resources(data, user),
        table_name = "file_metadata",
        dataset_name = dataset_name,
        write_disposition = "merge",
    )
    send_telemetry("cognee.add EXECUTION COMPLETED", user_id = user.id)

    return run_info
20 changes: 20 additions & 0 deletions cognee/tasks/ingestion/save_data_item_to_storage.py
@@ -0,0 +1,20 @@
from typing import Union, BinaryIO
from cognee.modules.ingestion import save_data_to_file

def save_data_item_to_storage(data_item: Union[BinaryIO, str], dataset_name: str) -> str:

    # data is a file object coming from upload.
    if hasattr(data_item, "file"):
        file_path = save_data_to_file(data_item.file, dataset_name, filename=data_item.filename)

    elif isinstance(data_item, str):
        # data is a file path
        if data_item.startswith("file://") or data_item.startswith("/"):
            file_path = data_item.replace("file://", "")
Comment on lines +12 to +13

💡 Codebase verification

⚠️ Potential issue

Based on the gathered information, I can now provide the final response:

Add path validation and sanitization to prevent path traversal vulnerabilities

The current implementation lacks proper path validation, which could lead to security issues:

  • The file_path = data_item.replace("file://", "") operation is insufficient for security
  • No validation is performed to ensure paths are within allowed boundaries
  • No sanitization against directory traversal attacks (e.g., ../../../etc/passwd)

Required changes:

  • Use os.path.abspath() and verify the resolved path is within the allowed data directory
  • Add path sanitization to remove any potential directory traversal sequences
  • Implement path validation before file operations in save_data_item_to_storage.py

Example vulnerable operations found:

  • Direct path usage in ingest_data.py: open(file_path.replace("file://", ""), mode = "rb")
  • Unchecked path handling in save_data_item_to_storage.py
  • Raw path concatenation in save_data_to_file.py: storage_path = data_directory_path + "/" + dataset_name.replace(".", "/")
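One possible shape for such a check, sketched with an assumed allowed_root argument rather than any existing cognee setting:

import os

def resolve_safe_path(raw_path: str, allowed_root: str) -> str:
    # Strip the file:// prefix, resolve symlinks and ".." segments, then verify
    # the result still lives under the allowed data directory.
    candidate = os.path.realpath(raw_path.replace("file://", "", 1))
    root = os.path.realpath(allowed_root)
    if os.path.commonpath([candidate, root]) != root:
        raise ValueError(f"Path {raw_path} escapes the allowed data directory")
    return candidate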
🔗 Analysis chain

Add path validation for security.

Direct handling of file paths without validation could lead to path traversal vulnerabilities. Consider:

  1. Validating the path is within allowed boundaries
  2. Sanitizing the path to prevent directory traversal

Let's check if there are any path validation utilities in the codebase:


Let me search for more specific path-related security implementations:


Let me check if there are any security-related utilities or path validation functions in the codebase:


Let me check how the file paths are being used after they're processed:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for path validation utilities
rg -l "(?i)(validate|sanitize|clean|check).*path"

Length of output: 63


Script:

#!/bin/bash
# Search for path-related security checks and normalizations
rg -i "path|directory" -A 3 -B 3 cognee/tasks/ingestion/

Length of output: 11512


Script:

#!/bin/bash
# Search for security-related utilities and path validation functions
rg -i "def.*secure|sanitize|validate|normalize.*path" -g "*.py"

# Also check for any security-related imports
rg -i "from.*security|import.*pathlib|import.*os\.path" -g "*.py"

Length of output: 1185


Script:

#!/bin/bash
# Check for file operations and how paths are used
rg -A 5 "open\(.*file_path" -g "*.py"

# Also check for the save_data_to_file implementation
rg -A 10 "def save_data_to_file" -g "*.py"

Length of output: 6430

        # data is text
        else:
            file_path = save_data_to_file(data_item, dataset_name)
    else:
        raise ValueError(f"Data type not supported: {type(data_item)}")

    return file_path
28 changes: 28 additions & 0 deletions cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py
@@ -0,0 +1,28 @@
from typing import Union, BinaryIO, Any
from cognee.modules.ingestion import save_data_to_file

def save_data_item_with_metadata_to_storage(data_item: Union[BinaryIO, str, Any], dataset_name: str) -> str:
    # Dynamic import is used because the llama_index module is optional.
    # For the same reason Any is accepted as a data item
    from llama_index.core import Document
    from .transform_data import get_data_from_llama_index
Comment on lines +4 to +8

🛠️ Refactor suggestion

Add error handling for dynamic imports.

The dynamic imports could fail silently if the optional dependencies are not installed. Consider adding explicit error handling.

-    from llama_index.core import Document
-    from .transform_data import get_data_from_llama_index
+    try:
+        from llama_index.core import Document
+        from .transform_data import get_data_from_llama_index
+    except ImportError as e:
+        raise ImportError(
+            "Failed to import required dependencies. "
+            "Please ensure llama_index is installed for Document support."
+        ) from e


    # Check if data is of type Document or any of its subclasses
    if isinstance(data_item, Document):
        file_path = get_data_from_llama_index(data_item, dataset_name)

    # data is a file object coming from upload.
    elif hasattr(data_item, "file"):
        file_path = save_data_to_file(data_item.file, dataset_name, filename=data_item.filename)
Comment on lines +14 to +16

⚠️ Potential issue

Add robust file object validation.

The current implementation makes assumptions about the file object structure without proper validation.

-    elif hasattr(data_item, "file"):
-        file_path = save_data_to_file(data_item.file, dataset_name, filename=data_item.filename)
+    elif hasattr(data_item, "file") and hasattr(data_item, "filename"):
+        if not data_item.file:
+            raise ValueError("File object is empty")
+        if not data_item.filename:
+            raise ValueError("Filename is missing")
+        file_path = save_data_to_file(data_item.file, dataset_name, filename=data_item.filename)


    elif isinstance(data_item, str):
        # data is a file path
        if data_item.startswith("file://") or data_item.startswith("/"):
            file_path = data_item.replace("file://", "")
        # data is text
        else:
            file_path = save_data_to_file(data_item, dataset_name)
Comment on lines +18 to +24

🛠️ Refactor suggestion

Enhance string input validation and handling.

The current string handling could be improved with better file path validation and text content limits.

     elif isinstance(data_item, str):
+        if not data_item.strip():
+            raise ValueError("Empty string input")
+
         # data is a file path
         if data_item.startswith("file://") or data_item.startswith("/"):
             file_path = data_item.replace("file://", "")
+            # Verify file exists and is accessible
+            import os
+            if not os.path.isfile(file_path):
+                raise FileNotFoundError(f"File not found: {file_path}")
         # data is text
         else:
+            # Add reasonable size limit for text content
+            if len(data_item) > 10_000_000:  # 10MB limit
+                raise ValueError("Text content exceeds size limit")
             file_path = save_data_to_file(data_item, dataset_name)

    else:
        raise ValueError(f"Data type not supported: {type(data_item)}")

    return file_path
18 changes: 3 additions & 15 deletions cognee/tasks/ingestion/save_data_to_storage.py
@@ -1,5 +1,5 @@
from typing import Union, BinaryIO
from cognee.modules.ingestion import save_data_to_file
from cognee.tasks.ingestion.save_data_item_to_storage import save_data_item_to_storage

def save_data_to_storage(data: Union[BinaryIO, str], dataset_name) -> list[str]:
if not isinstance(data, list):
@@ -9,19 +9,7 @@ def save_data_to_storage(data: Union[BinaryIO, str], dataset_name) -> list[str]:
file_paths = []

for data_item in data:
# data is a file object coming from upload.
if hasattr(data_item, "file"):
file_path = save_data_to_file(data_item.file, dataset_name, filename = data_item.filename)
file_paths.append(file_path)

if isinstance(data_item, str):
# data is a file path
if data_item.startswith("file://") or data_item.startswith("/"):
file_paths.append(data_item.replace("file://", ""))

# data is text
else:
file_path = save_data_to_file(data_item, dataset_name)
file_paths.append(file_path)
file_path = save_data_item_to_storage(data_item, dataset_name)
file_paths.append(file_path)

return file_paths
18 changes: 18 additions & 0 deletions cognee/tasks/ingestion/transform_data.py
@@ -0,0 +1,18 @@
from llama_index.core import Document
from llama_index.core.schema import ImageDocument
from cognee.modules.ingestion import save_data_to_file
from typing import Union

def get_data_from_llama_index(data_point: Union[Document, ImageDocument], dataset_name: str) -> str:
    # Specific type checking is used to ensure it's not a child class from Document
    if type(data_point) == Document:
        file_path = data_point.metadata.get("file_path")
        if file_path is None:
            file_path = save_data_to_file(data_point.text, dataset_name)
            return file_path
        return file_path
    elif type(data_point) == ImageDocument:
        if data_point.image_path is None:
            file_path = save_data_to_file(data_point.text, dataset_name)
            return file_path
        return data_point.image_path
Comment on lines +6 to +18

🛠️ Refactor suggestion

Refactor to reduce code duplication and improve error handling

The function has similar logic patterns that could be consolidated, and it needs better error handling.

Consider this improved implementation:

 def get_data_from_llama_index(data_point: Union[Document, ImageDocument], dataset_name: str) -> str:
+    """Extract or generate a file path from a LlamaIndex document.
+
+    Args:
+        data_point: A Document or ImageDocument from LlamaIndex
+        dataset_name: Name of the dataset for file path generation
+
+    Returns:
+        str: Path to the file containing the document's content
+
+    Raises:
+        TypeError: If data_point is neither Document nor ImageDocument
+    """
+    if isinstance(data_point, ImageDocument):
+        return data_point.image_path or save_data_to_file(data_point.text, dataset_name)
+    elif isinstance(data_point, Document):
+        return data_point.metadata.get("file_path") or save_data_to_file(data_point.text, dataset_name)
+    else:
+        raise TypeError(f"Expected Document or ImageDocument, got {type(data_point).__name__}")

This refactored version:

  1. Uses the or operator for cleaner null-coalescing
  2. Adds proper error handling for invalid types
  3. Reduces code duplication
  4. Handles ImageDocument check first to avoid isinstance ambiguity
🧰 Tools
🪛 Ruff

8-8: Use is and is not for type comparisons, or isinstance() for isinstance checks

(E721)


14-14: Use is and is not for type comparisons, or isinstance() for isinstance checks

(E721)

3 changes: 1 addition & 2 deletions examples/python/simple_example.py
@@ -27,8 +27,7 @@ async def main():

    # Query cognee for insights on the added text
    search_results = await cognee.search(
        SearchType.INSIGHTS,
        {'query': 'Tell me about NLP'}
        SearchType.INSIGHTS, query='Tell me about NLP'
    )

    # Display search results