
refactor: Return ingest_data and save_data_to_storage Tasks
Returned ingest_data and save_data_to_storage tasks

Refactor #COG-337
dexters1 committed Nov 8, 2024
1 parent e4c39e8 commit 733caa7
Showing 7 changed files with 150 additions and 50 deletions.
4 changes: 2 additions & 2 deletions cognee/api/v1/add/add_v2.py
@@ -4,7 +4,7 @@
from cognee.modules.pipelines import run_tasks, Task
from cognee.infrastructure.databases.relational import create_db_and_tables as create_relational_db_and_tables
from cognee.infrastructure.databases.vector.pgvector import create_db_and_tables as create_pgvector_db_and_tables
from cognee.tasks.ingestion.ingest_data import ingest_data
from cognee.tasks.ingestion.ingest_data_with_metadata import ingest_data_with_metadata


async def add(data: Union[BinaryIO, list[BinaryIO], str, list[str]], dataset_name: str = "main_dataset", user: User = None):
@@ -15,7 +15,7 @@ async def add(data: Union[BinaryIO, list[BinaryIO], str, list[str]], dataset_name: str = "main_dataset", user: User = None):
user = await get_default_user()

tasks = [
Task(ingest_data, dataset_name, user)
Task(ingest_data_with_metadata, dataset_name, user)
]

pipeline = run_tasks(tasks, data, "add_pipeline")
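For orientation, a minimal usage sketch of the updated add() entry point follows. It is not part of this commit; the import path is taken from the file above, and the dataset name and sample data are illustrative assumptions.

# Sketch only: invoking the updated add() pipeline, which now runs the
# ingest_data_with_metadata task to save input to storage and record its metadata.
import asyncio
from cognee.api.v1.add.add_v2 import add

async def main():
    await add("Cognee turns raw documents into a knowledge graph.", dataset_name="demo_dataset")

asyncio.run(main())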
2 changes: 2 additions & 0 deletions cognee/tasks/ingestion/__init__.py
@@ -1 +1,3 @@
from .ingest_data import ingest_data
from .save_data_to_storage import save_data_to_storage
from .save_data_item_to_storage import save_data_item_to_storage
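With these re-exports in place, the ingestion tasks can be imported directly from the package; a minimal sketch (assuming no other exports change):

from cognee.tasks.ingestion import (
    ingest_data,
    save_data_to_storage,
    save_data_item_to_storage,
)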
61 changes: 14 additions & 47 deletions cognee/tasks/ingestion/ingest_data.py
@@ -1,57 +1,24 @@
import dlt
import cognee.modules.ingestion as ingestion
from typing import Any, Union, BinaryIO
from llama_index.core import Document

from cognee.shared.utils import send_telemetry
from cognee.modules.users.models import User
from cognee.infrastructure.databases.relational import get_relational_engine
from .transform_data import get_data_from_llama_index
from cognee.modules.data.methods import create_dataset
from cognee.modules.users.permissions.methods import give_permission_on_document
from .get_dlt_destination import get_dlt_destination
from cognee.modules.ingestion import save_data_to_file

async def ingest_data(data: Any, dataset_name: str, user: User):
async def ingest_data(file_paths: list[str], dataset_name: str, user: User):
destination = get_dlt_destination()

pipeline = dlt.pipeline(
pipeline_name = "file_load_from_filesystem",
destination = destination,
)

def save_data_to_storage(data_item: Union[BinaryIO, Document, str], dataset_name: str) -> str:
# Check if data is of type Document or any of its subclasses
if isinstance(data_item, Document):
file_path = get_data_from_llama_index(data_item, dataset_name)

# data is a file object coming from upload.
elif hasattr(data_item, "file"):
file_path = save_data_to_file(data_item.file, dataset_name, filename=data_item.filename)

elif isinstance(data_item, str):
# data is a file path
if data_item.startswith("file://") or data_item.startswith("/"):
file_path = data_item.replace("file://", "")
# data is text
else:
file_path = save_data_to_file(data_item, dataset_name)
else:
raise ValueError(f"Data type not supported: {type(data_item)}")

return file_path

@dlt.resource(standalone = True, merge_key = "id")
async def data_resources(data: Any, user: User):
if not isinstance(data, list):
# Convert data to a list as we work with lists further down.
data = [data]

# Process data
for data_item in data:

file_path = save_data_to_storage(data_item, dataset_name)

# Ingest data and add metadata
async def data_resources(file_paths: str, user: User):
for file_path in file_paths:
with open(file_path.replace("file://", ""), mode = "rb") as file:
classified_data = ingestion.classify(file)

@@ -67,28 +34,28 @@ async def data_resources(data: Any, user: User):
async with db_engine.get_async_session() as session:
dataset = await create_dataset(dataset_name, user.id, session)

data_point = (await session.execute(
data = (await session.execute(
select(Data).filter(Data.id == data_id)
)).scalar_one_or_none()

if data_point is not None:
data_point.name = file_metadata["name"]
data_point.raw_data_location = file_metadata["file_path"]
data_point.extension = file_metadata["extension"]
data_point.mime_type = file_metadata["mime_type"]
if data is not None:
data.name = file_metadata["name"]
data.raw_data_location = file_metadata["file_path"]
data.extension = file_metadata["extension"]
data.mime_type = file_metadata["mime_type"]

await session.merge(data_point)
await session.merge(data)
await session.commit()
else:
data_point = Data(
data = Data(
id = data_id,
name = file_metadata["name"],
raw_data_location = file_metadata["file_path"],
extension = file_metadata["extension"],
mime_type = file_metadata["mime_type"],
)

dataset.data.append(data_point)
dataset.data.append(data)
await session.commit()

yield {
@@ -105,7 +72,7 @@ async def data_resources(data: Any, user: User):

send_telemetry("cognee.add EXECUTION STARTED", user_id = user.id)
run_info = pipeline.run(
data_resources(data, user),
data_resources(file_paths, user),
table_name = "file_metadata",
dataset_name = dataset_name,
write_disposition = "merge",
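Because ingest_data now expects a list of file paths rather than raw data, the two returned tasks are meant to be chained: save_data_to_storage normalizes input into file paths, and ingest_data consumes them. The sketch below is not part of this commit and assumes run_tasks feeds each task's output into the next, mirroring how add_v2.py wires its pipeline; the pipeline name is illustrative.

# Sketch only: chaining the two returned ingestion tasks.
from cognee.modules.pipelines import run_tasks, Task
from cognee.tasks.ingestion import ingest_data, save_data_to_storage

def build_ingestion_tasks(dataset_name, user):
    return [
        Task(save_data_to_storage, dataset_name),  # raw data -> list of stored file paths
        Task(ingest_data, dataset_name, user),     # file paths -> rows in the dlt "file_metadata" table
    ]

# pipeline = run_tasks(build_ingestion_tasks("main_dataset", user), data, "ingestion_pipeline")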
92 changes: 92 additions & 0 deletions cognee/tasks/ingestion/ingest_data_with_metadata.py
@@ -0,0 +1,92 @@
import dlt
import cognee.modules.ingestion as ingestion
from typing import Any
from cognee.shared.utils import send_telemetry
from cognee.modules.users.models import User
from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.modules.data.methods import create_dataset
from cognee.modules.users.permissions.methods import give_permission_on_document
from .get_dlt_destination import get_dlt_destination
from .save_data_item_to_storage import save_data_item_to_storage

async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
    destination = get_dlt_destination()

    pipeline = dlt.pipeline(
        pipeline_name = "file_load_from_filesystem",
        destination = destination,
    )

    @dlt.resource(standalone = True, merge_key = "id")
    async def data_resources(data: Any, user: User):
        if not isinstance(data, list):
            # Convert data to a list as we work with lists further down.
            data = [data]

        # Process data
        for data_item in data:

            file_path = save_data_item_to_storage(data_item, dataset_name)

            # Ingest data and add metadata
            with open(file_path.replace("file://", ""), mode = "rb") as file:
                classified_data = ingestion.classify(file)

                data_id = ingestion.identify(classified_data)

                file_metadata = classified_data.get_metadata()

                from sqlalchemy import select
                from cognee.modules.data.models import Data

                db_engine = get_relational_engine()

                async with db_engine.get_async_session() as session:
                    dataset = await create_dataset(dataset_name, user.id, session)

                    data_point = (await session.execute(
                        select(Data).filter(Data.id == data_id)
                    )).scalar_one_or_none()

                    if data_point is not None:
                        data_point.name = file_metadata["name"]
                        data_point.raw_data_location = file_metadata["file_path"]
                        data_point.extension = file_metadata["extension"]
                        data_point.mime_type = file_metadata["mime_type"]

                        await session.merge(data_point)
                        await session.commit()
                    else:
                        data_point = Data(
                            id = data_id,
                            name = file_metadata["name"],
                            raw_data_location = file_metadata["file_path"],
                            extension = file_metadata["extension"],
                            mime_type = file_metadata["mime_type"],
                        )

                        dataset.data.append(data_point)
                        await session.commit()

                yield {
                    "id": data_id,
                    "name": file_metadata["name"],
                    "file_path": file_metadata["file_path"],
                    "extension": file_metadata["extension"],
                    "mime_type": file_metadata["mime_type"],
                }

                await give_permission_on_document(user, data_id, "read")
                await give_permission_on_document(user, data_id, "write")


    send_telemetry("cognee.add EXECUTION STARTED", user_id = user.id)
    run_info = pipeline.run(
        data_resources(data, user),
        table_name = "file_metadata",
        dataset_name = dataset_name,
        write_disposition = "merge",
    )
    send_telemetry("cognee.add EXECUTION COMPLETED", user_id = user.id)

    return run_info
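A sketch of invoking the new task directly, outside the add pipeline. This is not part of the commit; the get_default_user import path is assumed from its use in add_v2.py, and the inputs are illustrative.

# Sketch only: calling ingest_data_with_metadata on its own.
import asyncio
from cognee.modules.users.methods import get_default_user  # assumed location
from cognee.tasks.ingestion.ingest_data_with_metadata import ingest_data_with_metadata

async def main():
    user = await get_default_user()
    # Accepts raw text, file paths, upload objects, or llama-index Documents;
    # each item is saved to storage and its metadata merged into "file_metadata".
    run_info = await ingest_data_with_metadata(
        ["file:///tmp/report.txt", "Some inline text to ingest."],
        dataset_name="main_dataset",
        user=user,
    )
    print(run_info)

asyncio.run(main())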
25 changes: 25 additions & 0 deletions cognee/tasks/ingestion/save_data_item_to_storage.py
@@ -0,0 +1,25 @@
from llama_index.core import Document
from typing import Union, BinaryIO
from cognee.modules.ingestion import save_data_to_file
from .transform_data import get_data_from_llama_index

def save_data_item_to_storage(data_item: Union[BinaryIO, Document, str], dataset_name: str) -> str:
    # Check if data is of type Document or any of its subclasses
    if isinstance(data_item, Document):
        file_path = get_data_from_llama_index(data_item, dataset_name)

    # data is a file object coming from upload.
    elif hasattr(data_item, "file"):
        file_path = save_data_to_file(data_item.file, dataset_name, filename=data_item.filename)

    elif isinstance(data_item, str):
        # data is a file path
        if data_item.startswith("file://") or data_item.startswith("/"):
            file_path = data_item.replace("file://", "")
        # data is text
        else:
            file_path = save_data_to_file(data_item, dataset_name)
    else:
        raise ValueError(f"Data type not supported: {type(data_item)}")

    return file_path
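A short sketch of the input shapes this helper accepts, based on the branches above; paths and text are illustrative, and the package-level import relies on the updated __init__.py.

from cognee.tasks.ingestion import save_data_item_to_storage

# An existing file path: the "file://" prefix is stripped, absolute paths pass through.
path_a = save_data_item_to_storage("file:///tmp/report.pdf", "demo_dataset")

# Raw text: written to a new file in the dataset's storage.
path_b = save_data_item_to_storage("Plain text becomes a stored file.", "demo_dataset")

# llama-index Documents and upload objects exposing .file / .filename are handled
# by the first two branches of the function.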
15 changes: 15 additions & 0 deletions cognee/tasks/ingestion/save_data_to_storage.py
@@ -0,0 +1,15 @@
from typing import Union, BinaryIO
from cognee.tasks.ingestion.save_data_item_to_storage import save_data_item_to_storage

def save_data_to_storage(data: Union[BinaryIO, str], dataset_name) -> list[str]:
    if not isinstance(data, list):
        # Convert data to a list as we work with lists further down.
        data = [data]

    file_paths = []

    for data_item in data:
        file_path = save_data_item_to_storage(data_item, dataset_name)
        file_paths.append(file_path)

    return file_paths
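And a matching sketch for the batch wrapper, which accepts a single item or a list and always returns one stored file path per item (example values are illustrative):

from cognee.tasks.ingestion import save_data_to_storage

file_paths = save_data_to_storage(
    ["Some inline text", "/data/existing_file.txt"],
    "demo_dataset",
)
assert isinstance(file_paths, list)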
1 change: 0 additions & 1 deletion cognee/tasks/ingestion/transform_data.py
@@ -3,7 +3,6 @@
from cognee.modules.ingestion import save_data_to_file
from typing import Union


def get_data_from_llama_index(data_point: Union[Document, ImageDocument], dataset_name: str) -> str:
# Specific type checking is used to ensure it's not a child class from Document
if type(data_point) == Document:
