Cog 505 data dataset model changes #260

Merged: 14 commits, merged on Dec 6, 2024
Changes from 10 commits
69 changes: 69 additions & 0 deletions .github/workflows/test_deduplication.yml
@@ -0,0 +1,69 @@
name: test | deduplication

on:
workflow_dispatch:
pull_request:
branches:
- main
types: [labeled, synchronize]


concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

env:
RUNTIME__LOG_LEVEL: ERROR

jobs:
get_docs_changes:
name: docs changes
uses: ./.github/workflows/get_docs_changes.yml

run_deduplication_test:
name: test
needs: get_docs_changes
if: needs.get_docs_changes.outputs.changes_outside_docs == 'true' && ${{ github.event.label.name == 'run-checks' }}
runs-on: ubuntu-latest
defaults:
run:
shell: bash
services:
postgres:
image: pgvector/pgvector:pg17
env:
POSTGRES_USER: cognee
POSTGRES_PASSWORD: cognee
POSTGRES_DB: cognee_db
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
- 5432:5432

steps:
- name: Check out
uses: actions/checkout@master
Contributor (review comment):

⚠️ Potential issue

Security: Use specific version for checkout action

Using @master for actions can be dangerous as it may introduce breaking changes or security issues.

-        uses: actions/checkout@master
+        uses: actions/checkout@v4

- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.11.x'

- name: Install Poetry
uses: snok/install-poetry@v1
with:
virtualenvs-create: true
virtualenvs-in-project: true
installer-parallel: true

- name: Install dependencies
run: poetry install -E postgres --no-interaction

- name: Run deduplication test
env:
ENV: 'dev'
LLM_API_KEY: ${{ secrets.OPENAI_API_KEY }}
run: poetry run python ./cognee/tests/test_deduplication.py
8 changes: 4 additions & 4 deletions cognee/api/v1/add/add.py
@@ -33,25 +33,25 @@ async def add(data: Union[BinaryIO, List[BinaryIO], str, List[str]], dataset_nam

# data is text
else:
file_path = save_data_to_file(data, dataset_name)
file_path = save_data_to_file(data)
return await add([file_path], dataset_name)

if hasattr(data, "file"):
file_path = save_data_to_file(data.file, dataset_name, filename = data.filename)
file_path = save_data_to_file(data.file, filename = data.filename)
return await add([file_path], dataset_name)

# data is a list of file paths or texts
file_paths = []

for data_item in data:
if hasattr(data_item, "file"):
file_paths.append(save_data_to_file(data_item, dataset_name, filename = data_item.filename))
file_paths.append(save_data_to_file(data_item, filename = data_item.filename))
elif isinstance(data_item, str) and (
data_item.startswith("/") or data_item.startswith("file://")
):
file_paths.append(data_item)
elif isinstance(data_item, str):
file_paths.append(save_data_to_file(data_item, dataset_name))
file_paths.append(save_data_to_file(data_item))

if len(file_paths) > 0:
return await add_files(file_paths, dataset_name, user)
7 changes: 7 additions & 0 deletions cognee/infrastructure/files/utils/get_file_metadata.py
@@ -1,16 +1,22 @@
from typing import BinaryIO, TypedDict
import hashlib
from .guess_file_type import guess_file_type
from cognee.shared.utils import get_file_content_hash


class FileMetadata(TypedDict):
name: str
file_path: str
mime_type: str
extension: str
content_hash: str

def get_file_metadata(file: BinaryIO) -> FileMetadata:
"""Get metadata from a file"""
file.seek(0)
content_hash = get_file_content_hash(file)
file.seek(0)

file_type = guess_file_type(file)

file_path = file.name
@@ -21,4 +27,5 @@ def get_file_metadata(file: BinaryIO) -> FileMetadata:
file_path = file_path,
mime_type = file_type.mime,
extension = file_type.extension,
content_hash = content_hash,
)
3 changes: 2 additions & 1 deletion cognee/modules/data/models/Data.py
@@ -1,7 +1,6 @@
from datetime import datetime, timezone
from typing import List
from uuid import uuid4

from sqlalchemy import UUID, Column, DateTime, String
from sqlalchemy.orm import Mapped, relationship

@@ -19,6 +18,8 @@ class Data(Base):
extension = Column(String)
mime_type = Column(String)
raw_data_location = Column(String)
owner_id = Column(UUID, index=True)
content_hash = Column(String)
created_at = Column(
DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)
)
1 change: 0 additions & 1 deletion cognee/modules/data/operations/write_metadata.py
@@ -2,7 +2,6 @@
import json
import re
import warnings
from typing import Any
from uuid import UUID
from sqlalchemy import select
from typing import Any, BinaryIO, Union
2 changes: 1 addition & 1 deletion cognee/modules/ingestion/data_types/BinaryData.py
@@ -17,7 +17,7 @@ def __init__(self, data: BinaryIO, name: str = None):
def get_identifier(self):
metadata = self.get_metadata()

return self.name + "." + metadata["extension"]
return metadata["content_hash"]

def get_metadata(self):
self.ensure_metadata()
8 changes: 5 additions & 3 deletions cognee/modules/ingestion/identify.py
@@ -1,7 +1,9 @@
from uuid import uuid5, NAMESPACE_OID
from .data_types import IngestionData

def identify(data: IngestionData) -> str:
data_id: str = data.get_identifier()
from cognee.modules.users.models import User

return uuid5(NAMESPACE_OID, data_id)
def identify(data: IngestionData, user: User) -> str:
data_content_hash: str = data.get_identifier()
# return UUID hash of file contents + owner id
return uuid5(NAMESPACE_OID,f"{data_content_hash}{user.id}")
Contributor (review comment): A space is missing after the comma.

Collaborator (author): I would say we need owner_id for permissions on some Data operations, as they are no longer tied to Datasets.

We don't need content_hash right now, but it might be nice to have, since it's the only way to verify the content of the Data.
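
For reference, a minimal, self-contained sketch of the dedup identity scheme this thread discusses (the helper names and the temp-file setup are illustrative, not part of the PR): file contents are hashed the way get_file_content_hash does it, and uuid5 over the content hash plus the owner id yields a stable Data id per (content, owner) pair, which is what lets re-ingestion merge into an existing row instead of creating a duplicate.

import hashlib
import tempfile
from uuid import NAMESPACE_OID, UUID, uuid4, uuid5


def content_hash_of(path: str) -> str:
    # Mirrors get_file_content_hash for a path input: MD5 over buffered chunks.
    h = hashlib.md5()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(h.block_size), b""):
            h.update(chunk)
    return h.hexdigest()


def data_id_for(path: str, owner_id: UUID) -> UUID:
    # Same contents + same owner -> same UUID, so a re-ingested file merges
    # into the existing Data row; a different owner gets a distinct row.
    return uuid5(NAMESPACE_OID, f"{content_hash_of(path)}{owner_id}")


if __name__ == "__main__":
    with tempfile.NamedTemporaryFile(suffix=".txt", delete=False) as tmp:
        tmp.write(b"hello cognee")
        path = tmp.name

    owner_a, owner_b = uuid4(), uuid4()
    assert data_id_for(path, owner_a) == data_id_for(path, owner_a)
    assert data_id_for(path, owner_a) != data_id_for(path, owner_b)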

19 changes: 11 additions & 8 deletions cognee/modules/ingestion/save_data_to_file.py
@@ -1,25 +1,28 @@
import string
import random
import os.path
import hashlib
from typing import BinaryIO, Union
from cognee.base_config import get_base_config
from cognee.infrastructure.files.storage import LocalStorage
from .classify import classify

def save_data_to_file(data: Union[str, BinaryIO], dataset_name: str, filename: str = None):
def save_data_to_file(data: Union[str, BinaryIO], filename: str = None):
base_config = get_base_config()
data_directory_path = base_config.data_root_directory

classified_data = classify(data, filename)

storage_path = data_directory_path + "/" + dataset_name.replace(".", "/")
storage_path = os.path.join(data_directory_path, "data")
LocalStorage.ensure_directory_exists(storage_path)

file_metadata = classified_data.get_metadata()
if "name" not in file_metadata or file_metadata["name"] is None:
letters = string.ascii_lowercase
random_string = "".join(random.choice(letters) for _ in range(32))
file_metadata["name"] = "text_" + random_string + ".txt"
data_contents = classified_data.get_data().encode('utf-8')
hash_contents = hashlib.md5(data_contents).hexdigest()
file_metadata["name"] = "text_" + hash_contents + ".txt"
file_name = file_metadata["name"]
LocalStorage(storage_path).store(file_name, classified_data.get_data())

# Don't save file if it already exists
if not os.path.isfile(os.path.join(storage_path, file_name)):
LocalStorage(storage_path).store(file_name, classified_data.get_data())

return "file://" + storage_path + "/" + file_name
23 changes: 23 additions & 0 deletions cognee/shared/utils.py
@@ -1,6 +1,9 @@
""" This module contains utility functions for the cognee. """
import os
from typing import BinaryIO, Union

import requests
import hashlib
from datetime import datetime, timezone
import graphistry
import networkx as nx
@@ -70,6 +73,26 @@ def num_tokens_from_string(string: str, encoding_name: str) -> int:
num_tokens = len(encoding.encode(string))
return num_tokens

def get_file_content_hash(file_obj: Union[str, BinaryIO]) -> str:
h = hashlib.md5()

if isinstance(file_obj, str):
with open(file_obj, 'rb') as file:
while True:
# Reading is buffered, so we can read smaller chunks.
chunk = file.read(h.block_size)
if not chunk:
break
h.update(chunk)
else:
while True:
# Reading is buffered, so we can read smaller chunks.
chunk = file_obj.read(h.block_size)
if not chunk:
break
h.update(chunk)

return h.hexdigest()

def trim_text_to_max_tokens(text: str, max_tokens: int, encoding_name: str) -> str:
"""
37 changes: 27 additions & 10 deletions cognee/tasks/ingestion/ingest_data_with_metadata.py
@@ -1,10 +1,10 @@
from typing import Any
from typing import Any, List

import dlt
import cognee.modules.ingestion as ingestion
from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.modules.data.methods import create_dataset
from cognee.modules.data.operations.delete_metadata import delete_metadata
from cognee.modules.data.models.DatasetData import DatasetData
from cognee.modules.users.models import User
from cognee.modules.users.permissions.methods import give_permission_on_document
from cognee.shared.utils import send_telemetry
@@ -23,19 +23,21 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
destination = destination,
)

@dlt.resource(standalone=True, merge_key="id")
async def data_resources(file_paths: str):
@dlt.resource(standalone=True, primary_key="id", merge_key="id")
async def data_resources(file_paths: List[str], user: User):
for file_path in file_paths:
with open(file_path.replace("file://", ""), mode="rb") as file:
classified_data = ingestion.classify(file)
data_id = ingestion.identify(classified_data)
data_id = ingestion.identify(classified_data, user)
file_metadata = classified_data.get_metadata()
yield {
"id": data_id,
"name": file_metadata["name"],
"file_path": file_metadata["file_path"],
"extension": file_metadata["extension"],
"mime_type": file_metadata["mime_type"],
"content_hash": file_metadata["content_hash"],
"owner_id": str(user.id),
}

async def data_storing(data: Any, dataset_name: str, user: User):
@@ -57,7 +59,8 @@ async def data_storing(data: Any, dataset_name: str, user: User):
with open(file_path.replace("file://", ""), mode = "rb") as file:
classified_data = ingestion.classify(file)

data_id = ingestion.identify(classified_data)
# data_id is the hash of file contents + owner id to avoid duplicate data
data_id = ingestion.identify(classified_data, user)

file_metadata = classified_data.get_metadata()

@@ -70,6 +73,7 @@ async def data_storing(data: Any, dataset_name: str, user: User):
async with db_engine.get_async_session() as session:
dataset = await create_dataset(dataset_name, user.id, session)

# Check to see if data should be updated
data_point = (
await session.execute(select(Data).filter(Data.id == data_id))
).scalar_one_or_none()
@@ -79,17 +83,29 @@ async def data_storing(data: Any, dataset_name: str, user: User):
data_point.raw_data_location = file_metadata["file_path"]
data_point.extension = file_metadata["extension"]
data_point.mime_type = file_metadata["mime_type"]
data_point.owner_id = user.id
data_point.content_hash = file_metadata["content_hash"]
await session.merge(data_point)
else:
data_point = Data(
id = data_id,
name = file_metadata["name"],
raw_data_location = file_metadata["file_path"],
extension = file_metadata["extension"],
mime_type = file_metadata["mime_type"]
mime_type = file_metadata["mime_type"],
owner_id = user.id,
content_hash = file_metadata["content_hash"],
)

# Check if data is already in dataset
dataset_data = (
await session.execute(select(DatasetData).filter(DatasetData.data_id == data_id,
DatasetData.dataset_id == dataset.id))
).scalar_one_or_none()
# If data is not present in dataset add it
if dataset_data is None:
dataset.data.append(data_point)

await session.commit()
await write_metadata(data_item, data_point.id, file_metadata)

@@ -109,16 +125,17 @@ async def data_storing(data: Any, dataset_name: str, user: User):
# To use sqlite with dlt dataset_name must be set to "main".
# Sqlite doesn't support schemas
run_info = pipeline.run(
data_resources(file_paths),
data_resources(file_paths, user),
table_name="file_metadata",
dataset_name="main",
write_disposition="merge",
)
else:
# Data should be stored in the same schema to allow deduplication
run_info = pipeline.run(
data_resources(file_paths),
data_resources(file_paths, user),
table_name="file_metadata",
dataset_name=dataset_name,
dataset_name="public",
write_disposition="merge",
)

4 changes: 2 additions & 2 deletions cognee/tasks/ingestion/save_data_item_to_storage.py
@@ -7,15 +7,15 @@ def save_data_item_to_storage(data_item: Union[BinaryIO, str], dataset_name: str

# data is a file object coming from upload.
if hasattr(data_item, "file"):
file_path = save_data_to_file(data_item.file, dataset_name, filename=data_item.filename)
file_path = save_data_to_file(data_item.file, filename=data_item.filename)

elif isinstance(data_item, str):
# data is a file path
if data_item.startswith("file://") or data_item.startswith("/"):
file_path = data_item.replace("file://", "")
# data is text
else:
file_path = save_data_to_file(data_item, dataset_name)
file_path = save_data_to_file(data_item)
else:
raise IngestionError(message=f"Data type not supported: {type(data_item)}")

@@ -17,7 +17,7 @@ async def save_data_item_with_metadata_to_storage(
# data is a file object coming from upload.
elif hasattr(data_item, "file"):
file_path = save_data_to_file(
data_item.file, dataset_name, filename=data_item.filename
data_item.file, filename=data_item.filename
)

Comment on lines +20 to 22
Contributor (review comment):

⚠️ Potential issue

Add input validation for data_item

The function should validate that data_item is not None or empty before processing.

Apply this diff:

+    if data_item is None:
+        raise ValueError("data_item cannot be None")
+
     if "llama_index" in str(type(data_item)):
         # Dynamic import is used because the llama_index module is optional.
         from .transform_data import get_data_from_llama_index

Also applies to: 29-29

elif isinstance(data_item, str):
@@ -26,7 +26,7 @@
file_path = data_item.replace("file://", "")
# data is text
else:
file_path = save_data_to_file(data_item, dataset_name)
file_path = save_data_to_file(data_item)
else:
raise IngestionError(message=f"Data type not supported: {type(data_item)}")
