From 709a10c50c94617570a0dcbfc347f473289e87f0 Mon Sep 17 00:00:00 2001
From: Boris Arzentar
Date: Mon, 5 Aug 2024 23:42:36 +0200
Subject: [PATCH] fix: add dataset and data models

---
 cognee/api/client.py | 58 ++++--
 cognee/api/v1/add/add.py | 54 ++++--
 cognee/api/v1/cognify/cognify_v2.py | 183 ++++++++++--------
 cognee/api/v1/datasets/datasets.py | 14 +-
 cognee/infrastructure/data/__init__.py | 3 -
 cognee/infrastructure/data/models/Data.py | 23 ---
 .../infrastructure/data/models/DatasetData.py | 12 --
 cognee/infrastructure/data/models/__init__.py | 0
 .../sqlalchemy/SqlAlchemyAdapter.py | 89 ++-------
 cognee/modules/data/models/Data.py | 25 +++
 .../data/models/Dataset.py | 11 +-
 cognee/modules/data/models/DatasetData.py | 11 ++
 cognee/modules/data/models/__init__.py | 2 +
 .../data/operations/ensure_dataset_exists.py | 26 +++
 .../document_types/AudioDocument.py | 14 +-
 .../document_types/ImageDocument.py | 16 +-
 .../processing/document_types/PdfDocument.py | 10 +-
 .../processing/document_types/TextDocument.py | 10 +-
 .../ingestion/data_types/BinaryData.py | 2 +-
 cognee/modules/ingestion/identify.py | 6 +-
 cognee/modules/pipelines/models/Pipeline.py | 2 +-
 .../modules/pipelines/models/PipelineRun.py | 8 +-
 .../modules/pipelines/models/PipelineTask.py | 7 +-
 cognee/modules/pipelines/models/Task.py | 2 +-
 cognee/modules/pipelines/models/TaskRun.py | 17 ++
 cognee/modules/pipelines/models/TaskStatus.py | 0
 cognee/modules/pipelines/models/__init__.py | 1 +
 .../modules/pipelines/operations/add_task.py | 4 -
 .../operations/get_pipeline_status.py | 40 ++++
 .../operations/log_pipeline_status.py | 14 ++
 cognee/modules/tasks/__init__.py | 3 -
 .../modules/tasks/create_task_status_table.py | 10 -
 cognee/modules/tasks/get_task_status.py | 18 --
 cognee/modules/tasks/update_task_status.py | 5 -
 .../users/methods/create_default_user.py | 6 +-
 cognee/modules/users/methods/create_user.py | 5 +
 .../modules/users/methods/get_default_user.py | 18 +-
 .../methods/check_permissions_on_documents.py | 36 ++--
 .../methods/get_document_ids_for_user.py | 2 +-
 .../methods/give_permission_on_document.py | 12 +-
 cognee/tests/test_library.py | 22 +--
 41 files changed, 432 insertions(+), 369 deletions(-)
 delete mode 100644 cognee/infrastructure/data/models/Data.py
 delete mode 100644 cognee/infrastructure/data/models/DatasetData.py
 delete mode 100644 cognee/infrastructure/data/models/__init__.py
 create mode 100644 cognee/modules/data/models/Data.py
 rename cognee/{infrastructure => modules}/data/models/Dataset.py (57%)
 create mode 100644 cognee/modules/data/models/DatasetData.py
 create mode 100644 cognee/modules/data/models/__init__.py
 create mode 100644 cognee/modules/data/operations/ensure_dataset_exists.py
 create mode 100644 cognee/modules/pipelines/models/TaskRun.py
 delete mode 100644 cognee/modules/pipelines/models/TaskStatus.py
 create mode 100644 cognee/modules/pipelines/models/__init__.py
 delete mode 100644 cognee/modules/pipelines/operations/add_task.py
 create mode 100644 cognee/modules/pipelines/operations/get_pipeline_status.py
 create mode 100644 cognee/modules/pipelines/operations/log_pipeline_status.py
 delete mode 100644 cognee/modules/tasks/__init__.py
 delete mode 100644 cognee/modules/tasks/create_task_status_table.py
 delete mode 100644 cognee/modules/tasks/get_task_status.py
 delete mode 100644 cognee/modules/tasks/update_task_status.py

diff --git a/cognee/api/client.py b/cognee/api/client.py
index 50252319e..b8f578d22 100644
--- a/cognee/api/client.py
+++ b/cognee/api/client.py
@@ -39,6 +39,7 @@ async def
lifespan(app: FastAPI): app = FastAPI(debug = os.getenv("ENV") != "prod", lifespan = lifespan) origins = [ + "http://127.0.0.1:3000", "http://frontend:3000", "http://localhost:3000", "http://localhost:3001", @@ -107,21 +108,33 @@ def health_check(): """ return {"status": "OK"} -class Payload(BaseModel): - payload: Dict[str, Any] - @app.get("/datasets", response_model=list) async def get_datasets(): - from cognee.api.v1.datasets.datasets import datasets - return await datasets.list_datasets() + try: + from cognee.api.v1.datasets.datasets import datasets + datasets = await datasets.list_datasets() + + return JSONResponse( + status_code = 200, + content = [{ + "id": str(dataset.id), + "name": dataset.name, + "created_at": dataset.created_at, + "updated_at": dataset.updated_at, + "data": dataset.data, + } for dataset in datasets], + ) + except Exception as error: + raise HTTPException(status_code = 500, detail=f"Error retrieving datasets: {str(error)}") from error @app.delete("/datasets/{dataset_id}", response_model=dict) async def delete_dataset(dataset_id: str): from cognee.api.v1.datasets.datasets import datasets - datasets.delete_dataset(dataset_id) + await datasets.delete_dataset(dataset_id) + return JSONResponse( - status_code=200, - content="OK", + status_code = 200, + content = "OK", ) @app.get("/datasets/{dataset_id}/graph", response_model=list) @@ -146,7 +159,7 @@ async def get_dataset_graph(dataset_id: str): @app.get("/datasets/{dataset_id}/data", response_model=list) async def get_dataset_data(dataset_id: str): from cognee.api.v1.datasets.datasets import datasets - dataset_data = datasets.list_data(dataset_id) + dataset_data = await datasets.list_data(dataset_id) if dataset_data is None: raise HTTPException(status_code=404, detail=f"Dataset ({dataset_id}) not found.") return [ @@ -162,17 +175,24 @@ async def get_dataset_data(dataset_id: str): @app.get("/datasets/status", response_model=dict) async def get_dataset_status(datasets: Annotated[List[str], Query(alias="dataset")] = None): from cognee.api.v1.datasets.datasets import datasets as cognee_datasets - datasets_statuses = cognee_datasets.get_status(datasets) - return JSONResponse( - status_code = 200, - content = datasets_statuses, - ) + try: + datasets_statuses = await cognee_datasets.get_status(datasets) + + return JSONResponse( + status_code = 200, + content = datasets_statuses, + ) + except Exception as error: + return JSONResponse( + status_code = 409, + content = {"error": str(error)} + ) @app.get("/datasets/{dataset_id}/data/{data_id}/raw", response_class=FileResponse) async def get_raw_data(dataset_id: str, data_id: str): from cognee.api.v1.datasets.datasets import datasets - dataset_data = datasets.list_data(dataset_id) + dataset_data = await datasets.list_data(dataset_id) if dataset_data is None: raise HTTPException(status_code=404, detail=f"Dataset ({dataset_id}) not found.") data = [data for data in dataset_data if data["id"] == data_id][0] @@ -312,10 +332,10 @@ def start_api_server(host: str = "0.0.0.0", port: int = 8000): try: logger.info("Starting server at %s:%s", host, port) - import asyncio - from cognee.modules.data.deletion import prune_system, prune_data - asyncio.run(prune_data()) - asyncio.run(prune_system(metadata = True)) + # import asyncio + # from cognee.modules.data.deletion import prune_system, prune_data + # asyncio.run(prune_data()) + # asyncio.run(prune_system(metadata = True)) uvicorn.run(app, host = host, port = port) except Exception as e: diff --git a/cognee/api/v1/add/add.py 
b/cognee/api/v1/add/add.py index 3fad96e52..31a36316b 100644 --- a/cognee/api/v1/add/add.py +++ b/cognee/api/v1/add/add.py @@ -9,10 +9,11 @@ from cognee.modules.ingestion import get_matched_datasets, save_data_to_file from cognee.shared.utils import send_telemetry from cognee.base_config import get_base_config -from cognee.infrastructure.databases.relational import get_relational_config, create_db_and_tables -from cognee.modules.users.methods import create_default_user +from cognee.infrastructure.databases.relational import get_relational_config, get_relational_engine, create_db_and_tables +from cognee.modules.users.methods import create_default_user, get_default_user from cognee.modules.users.permissions.methods import give_permission_on_document from cognee.modules.users.models import User +from cognee.modules.data.operations.ensure_dataset_exists import ensure_dataset_exists async def add(data: Union[BinaryIO, List[BinaryIO], str, List[str]], dataset_name: str = "main_dataset", user: User = None): await create_db_and_tables() @@ -99,6 +100,9 @@ async def add_files(file_paths: List[str], dataset_name: str, user): destination = destination, ) + dataset_name = dataset_name.replace(" ", "_").replace(".", "_") if dataset_name is not None else "main_dataset" + dataset = await ensure_dataset_exists(dataset_name) + @dlt.resource(standalone = True, merge_key = "id") async def data_resources(file_paths: str, user: User): for file_path in file_paths: @@ -107,16 +111,34 @@ async def data_resources(file_paths: str, user: User): data_id = ingestion.identify(classified_data) - if user is None: - try: - user = await create_default_user() - - await give_permission_on_document(user, data_id, "read") - await give_permission_on_document(user, data_id, "write") - except: - pass file_metadata = classified_data.get_metadata() + from sqlalchemy import select + from cognee.modules.data.models import Data + db_engine = get_relational_engine() + async with db_engine.get_async_session() as session: + data = (await session.execute( + select(Data).filter(Data.id == data_id) + )).scalar_one_or_none() + + if data is not None: + data.name = file_metadata["name"] + data.raw_data_location = file_metadata["file_path"] + data.extension = file_metadata["extension"] + data.mime_type = file_metadata["mime_type"] + + await session.merge(data) + else: + data = Data( + name = file_metadata["name"], + raw_data_location = file_metadata["file_path"], + extension = file_metadata["extension"], + mime_type = file_metadata["mime_type"], + ) + dataset.data.append(data) + + await session.merge(dataset) + yield { "id": data_id, "name": file_metadata["name"], @@ -125,10 +147,20 @@ async def data_resources(file_paths: str, user: User): "mime_type": file_metadata["mime_type"], } + await give_permission_on_document(user, data_id, "read") + await give_permission_on_document(user, data_id, "write") + + + if user is None: + user = await get_default_user() + + if user is None: + user = await create_default_user() + run_info = pipeline.run( data_resources(processed_file_paths, user), table_name = "file_metadata", - dataset_name = dataset_name.replace(" ", "_").replace(".", "_") if dataset_name is not None else "main_dataset", + dataset_name = dataset_name, write_disposition = "merge", ) send_telemetry("cognee.add") diff --git a/cognee/api/v1/cognify/cognify_v2.py b/cognee/api/v1/cognify/cognify_v2.py index d17cab4f2..163dda67d 100644 --- a/cognee/api/v1/cognify/cognify_v2.py +++ b/cognee/api/v1/cognify/cognify_v2.py @@ -19,10 +19,11 @@ from 
cognee.modules.data.extraction.knowledge_graph.establish_graph_topology import establish_graph_topology from cognee.modules.pipelines.tasks.Task import Task from cognee.modules.pipelines import run_tasks, run_tasks_parallel -from cognee.modules.tasks import create_task_status_table, update_task_status, get_task_status from cognee.modules.users.models import User from cognee.modules.users.methods import get_default_user from cognee.modules.users.permissions.methods import check_permissions_on_documents +from cognee.modules.pipelines.operations.get_pipeline_status import get_pipeline_status +from cognee.modules.pipelines.operations.log_pipeline_status import log_pipeline_status logger = logging.getLogger("cognify.v2") @@ -35,97 +36,109 @@ def __init__(self, message: str): async def cognify(datasets: Union[str, list[str]] = None, user: User = None): db_engine = get_relational_engine() - await create_task_status_table() if datasets is None or len(datasets) == 0: return await cognify(await db_engine.get_datasets()) - db_engine = get_relational_engine() - async with db_engine.get_async_session() as session: - - if user is None: - user = await get_default_user(session= session) - - async def run_cognify_pipeline(dataset_name: str, files: list[dict]): - documents = [ - PdfDocument(title=f"{file['name']}.{file['extension']}", file_path=file["file_path"]) if file["extension"] == "pdf" else - AudioDocument(title=f"{file['name']}.{file['extension']}", file_path=file["file_path"]) if file["extension"] == "audio" else - ImageDocument(title=f"{file['name']}.{file['extension']}", file_path=file["file_path"]) if file["extension"] == "image" else - TextDocument(title=f"{file['name']}.{file['extension']}", file_path=file["file_path"]) - for file in files + if user is None: + user = await get_default_user() + + async def run_cognify_pipeline(dataset_name: str, files: list[dict]): + documents = [ + PdfDocument(id = file["id"], title=f"{file['name']}.{file['extension']}", file_path=file["file_path"]) if file["extension"] == "pdf" else + AudioDocument(id = file["id"], title=f"{file['name']}.{file['extension']}", file_path=file["file_path"]) if file["extension"] == "audio" else + ImageDocument(id = file["id"], title=f"{file['name']}.{file['extension']}", file_path=file["file_path"]) if file["extension"] == "image" else + TextDocument(id = file["id"], title=f"{file['name']}.{file['extension']}", file_path=file["file_path"]) + for file in files + ] + + document_ids = [document.id for document in documents] + + await check_permissions_on_documents( + user, + "read", + document_ids, + ) + + async with update_status_lock: + task_status = await get_pipeline_status([dataset_name]) + + if dataset_name in task_status and task_status[dataset_name] == "DATASET_PROCESSING_STARTED": + logger.info(f"Dataset {dataset_name} is being processed.") + return + + await log_pipeline_status(dataset_name, "DATASET_PROCESSING_STARTED", { + "dataset_name": dataset_name, + "files": document_ids, + }) + try: + cognee_config = get_cognify_config() + graph_config = get_graph_config() + root_node_id = None + + if graph_config.infer_graph_topology and graph_config.graph_topology_task: + from cognee.modules.topology.topology import TopologyEngine + topology_engine = TopologyEngine(infer=graph_config.infer_graph_topology) + root_node_id = await topology_engine.add_graph_topology(files = files) + elif graph_config.infer_graph_topology and not graph_config.infer_graph_topology: + from cognee.modules.topology.topology import TopologyEngine + 
topology_engine = TopologyEngine(infer=graph_config.infer_graph_topology) + await topology_engine.add_graph_topology(graph_config.topology_file_path) + elif not graph_config.graph_topology_task: + root_node_id = "ROOT" + + tasks = [ + Task(process_documents, parent_node_id = root_node_id), # Classify documents and save them as a nodes in graph db, extract text chunks based on the document type + Task(establish_graph_topology, topology_model = KnowledgeGraph, task_config = { "batch_size": 10 }), # Set the graph topology for the document chunk data + Task(expand_knowledge_graph, graph_model = KnowledgeGraph, collection_name = "entities"), # Generate knowledge graphs from the document chunks and attach it to chunk nodes + Task(filter_affected_chunks, collection_name = "chunks"), # Find all affected chunks, so we don't process unchanged chunks + Task( + save_data_chunks, + collection_name = "chunks", + ), # Save the document chunks in vector db and as nodes in graph db (connected to the document node and between each other) + run_tasks_parallel([ + Task( + summarize_text_chunks, + summarization_model = cognee_config.summarization_model, + collection_name = "chunk_summaries", + ), # Summarize the document chunks + Task( + classify_text_chunks, + classification_model = cognee_config.classification_model, + ), + ]), + Task(remove_obsolete_chunks), # Remove the obsolete document chunks. ] - await check_permissions_on_documents(user, "read", [document.id for document in documents], session=session) - - async with update_status_lock: - task_status = get_task_status([dataset_name]) - - if dataset_name in task_status and task_status[dataset_name] == "DATASET_PROCESSING_STARTED": - logger.info(f"Dataset {dataset_name} is being processed.") - return - - update_task_status(dataset_name, "DATASET_PROCESSING_STARTED") - try: - cognee_config = get_cognify_config() - graph_config = get_graph_config() - root_node_id = None - - if graph_config.infer_graph_topology and graph_config.graph_topology_task: - from cognee.modules.topology.topology import TopologyEngine - topology_engine = TopologyEngine(infer=graph_config.infer_graph_topology) - root_node_id = await topology_engine.add_graph_topology(files = files) - elif graph_config.infer_graph_topology and not graph_config.infer_graph_topology: - from cognee.modules.topology.topology import TopologyEngine - topology_engine = TopologyEngine(infer=graph_config.infer_graph_topology) - await topology_engine.add_graph_topology(graph_config.topology_file_path) - elif not graph_config.graph_topology_task: - root_node_id = "ROOT" - - tasks = [ - Task(process_documents, parent_node_id = root_node_id), # Classify documents and save them as a nodes in graph db, extract text chunks based on the document type - Task(establish_graph_topology, topology_model = KnowledgeGraph, task_config = { "batch_size": 10 }), # Set the graph topology for the document chunk data - Task(expand_knowledge_graph, graph_model = KnowledgeGraph, collection_name = "entities"), # Generate knowledge graphs from the document chunks and attach it to chunk nodes - Task(filter_affected_chunks, collection_name = "chunks"), # Find all affected chunks, so we don't process unchanged chunks - Task( - save_data_chunks, - collection_name = "chunks", - ), # Save the document chunks in vector db and as nodes in graph db (connected to the document node and between each other) - run_tasks_parallel([ - Task( - summarize_text_chunks, - summarization_model = cognee_config.summarization_model, - collection_name = 
"chunk_summaries", - ), # Summarize the document chunks - Task( - classify_text_chunks, - classification_model = cognee_config.classification_model, - ), - ]), - Task(remove_obsolete_chunks), # Remove the obsolete document chunks. - ] - - pipeline = run_tasks(tasks, documents) - - async for result in pipeline: - print(result) - - update_task_status(dataset_name, "DATASET_PROCESSING_FINISHED") - except Exception as error: - update_task_status(dataset_name, "DATASET_PROCESSING_ERROR") - raise error - - - existing_datasets = await db_engine.get_datasets() - awaitables = [] - - for dataset in datasets: - dataset_name = generate_dataset_name(dataset) - - if dataset_name in existing_datasets: - awaitables.append(run_cognify_pipeline(dataset, await db_engine.get_files_metadata(dataset_name))) - - return await asyncio.gather(*awaitables) + pipeline = run_tasks(tasks, documents) + + async for result in pipeline: + print(result) + + await log_pipeline_status(dataset_name, "DATASET_PROCESSING_FINISHED", { + "dataset_name": dataset_name, + "files": document_ids, + }) + except Exception as error: + await log_pipeline_status(dataset_name, "DATASET_PROCESSING_ERROR", { + "dataset_name": dataset_name, + "files": document_ids, + }) + raise error + + + existing_datasets = [dataset.name for dataset in list(await db_engine.get_datasets())] + awaitables = [] + + for dataset_name in datasets: + dataset_name = generate_dataset_name(dataset_name) + + if dataset_name in existing_datasets: + awaitables.append(run_cognify_pipeline(dataset_name, await db_engine.get_files_metadata(dataset_name))) + + return await asyncio.gather(*awaitables) + def generate_dataset_name(dataset_name: str) -> str: return dataset_name.replace(".", "_").replace(" ", "_") diff --git a/cognee/api/v1/datasets/datasets.py b/cognee/api/v1/datasets/datasets.py index d46fd2413..8a44e0411 100644 --- a/cognee/api/v1/datasets/datasets.py +++ b/cognee/api/v1/datasets/datasets.py @@ -1,6 +1,6 @@ from duckdb import CatalogException from cognee.modules.ingestion import discover_directory_datasets -from cognee.modules.tasks import get_task_status +from cognee.modules.pipelines.operations.get_pipeline_status import get_pipeline_status from cognee.infrastructure.databases.relational import get_relational_engine class datasets(): @@ -14,24 +14,24 @@ def discover_datasets(directory_path: str): return list(discover_directory_datasets(directory_path).keys()) @staticmethod - def list_data(dataset_name: str): + async def list_data(dataset_name: str): db = get_relational_engine() try: - return db.get_files_metadata(dataset_name) + return await db.get_files_metadata(dataset_name) except CatalogException: return None @staticmethod - def get_status(dataset_ids: list[str]) -> dict: + async def get_status(dataset_ids: list[str]) -> dict: try: - return get_task_status(dataset_ids) + return await get_pipeline_status(dataset_ids) except CatalogException: return {} @staticmethod - def delete_dataset(dataset_id: str): + async def delete_dataset(dataset_id: str): db = get_relational_engine() try: - return db.delete_table(dataset_id) + return await db.delete_table(dataset_id) except CatalogException: return {} diff --git a/cognee/infrastructure/data/__init__.py b/cognee/infrastructure/data/__init__.py index 6aaaf6b41..e69de29bb 100644 --- a/cognee/infrastructure/data/__init__.py +++ b/cognee/infrastructure/data/__init__.py @@ -1,3 +0,0 @@ -from .models.Data import Data -from .models.Dataset import Dataset -from .models.DatasetData import DatasetData diff --git 
a/cognee/infrastructure/data/models/Data.py b/cognee/infrastructure/data/models/Data.py deleted file mode 100644 index da389e154..000000000 --- a/cognee/infrastructure/data/models/Data.py +++ /dev/null @@ -1,23 +0,0 @@ -from typing import List -from datetime import datetime, timezone -from sqlalchemy.orm import relationship, MappedColumn, Mapped -from sqlalchemy import Column, String, DateTime, UUID, Text, JSON -from cognee.infrastructure.databases.relational import Base -from .DatasetData import DatasetData - -class Data(Base): - __tablename__ = "data" - - id = Column(UUID, primary_key = True) - name = Column(String, nullable = True) - description = Column(Text, nullable = True) - raw_data_location = Column(String) - meta_data: Mapped[dict] = MappedColumn(type_ = JSON) # metadata is reserved word - - created_at = Column(DateTime, default = datetime.now(timezone.utc)) - updated_at = Column(DateTime, onupdate = datetime.now(timezone.utc)) - - datasets: Mapped[List["Dataset"]] = relationship( - secondary = DatasetData.__tablename__, - back_populates = "data" - ) diff --git a/cognee/infrastructure/data/models/DatasetData.py b/cognee/infrastructure/data/models/DatasetData.py deleted file mode 100644 index c84d890ae..000000000 --- a/cognee/infrastructure/data/models/DatasetData.py +++ /dev/null @@ -1,12 +0,0 @@ -from uuid import uuid4 -from datetime import datetime, timezone -from sqlalchemy import Column, DateTime, UUID, ForeignKey -from cognee.infrastructure.databases.relational import Base - -class DatasetData(Base): - __tablename__ = "dataset_data" - - id = Column(UUID, primary_key=True, default=uuid4) - - dataset_id = Column(UUID, ForeignKey("dataset.id"), nullable=False) - data_id = Column(UUID, ForeignKey("data.id"), nullable=False) diff --git a/cognee/infrastructure/data/models/__init__.py b/cognee/infrastructure/data/models/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/cognee/infrastructure/databases/relational/sqlalchemy/SqlAlchemyAdapter.py b/cognee/infrastructure/databases/relational/sqlalchemy/SqlAlchemyAdapter.py index 4ca533aca..7fcced7ec 100644 --- a/cognee/infrastructure/databases/relational/sqlalchemy/SqlAlchemyAdapter.py +++ b/cognee/infrastructure/databases/relational/sqlalchemy/SqlAlchemyAdapter.py @@ -2,11 +2,12 @@ import asyncio from typing import AsyncGenerator from contextlib import asynccontextmanager -from sqlalchemy import create_engine, MetaData, Table, Column, String, Boolean, TIMESTAMP, text -from sqlalchemy.orm import sessionmaker +from sqlalchemy import create_engine, text, select +from sqlalchemy.orm import sessionmaker, joinedload from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker from cognee.infrastructure.files.storage import LocalStorage from cognee.infrastructure.databases.relational.FakeAsyncSession import FakeAsyncSession +from ..ModelBase import Base def make_async_sessionmaker(sessionmaker): @asynccontextmanager @@ -19,6 +20,7 @@ async def async_session_maker(): class SQLAlchemyAdapter(): def __init__(self, db_type: str, db_path: str, db_name: str, db_user: str, db_password: str, db_host: str, db_port: str): self.db_location = os.path.abspath(os.path.join(db_path, db_name)) + self.db_name = db_name if db_type == "duckdb": LocalStorage.ensure_directory_exists(db_path) @@ -42,15 +44,11 @@ def get_session(self): yield session async def get_datasets(self): - async with self.engine.connect() as connection: - result = await connection.execute(text("SELECT DISTINCT table_schema FROM 
information_schema.tables;")) - tables = [row[0] for row in result] - return list( - filter( - lambda table_schema: not table_schema.endswith("staging") and table_schema != "cognee", - tables - ) - ) + from cognee.modules.data.models import Dataset + + async with self.get_async_session() as session: + datasets = (await session.execute(select(Dataset).options(joinedload(Dataset.data)))).unique().scalars().all() + return datasets async def create_table(self, schema_name: str, table_name: str, table_config: list[dict]): fields_query_parts = [f"{item['name']} {item['type']}" for item in table_config] @@ -100,65 +98,6 @@ async def execute_query(self, query): result = await connection.execute(text(query)) return [dict(row) for row in result] - async def load_cognify_data(self, data): - metadata = MetaData() - - cognify_table = Table( - "cognify", - metadata, - Column("document_id", String), - Column("created_at", TIMESTAMP, server_default=text("CURRENT_TIMESTAMP")), - Column("updated_at", TIMESTAMP, nullable=True, default=None), - Column("processed", Boolean, default=False), - Column("document_id_target", String, nullable=True) - ) - - async with self.engine.begin() as connection: - await connection.run_sync(metadata.create_all) - - insert_query = cognify_table.insert().values(document_id=text(":document_id")) - async with self.engine.connect() as connection: - await connection.execute(insert_query, data) - - async def fetch_cognify_data(self, excluded_document_id: str): - async with self.engine.connect() as connection: - await connection.execute(text(""" - CREATE TABLE IF NOT EXISTS cognify ( - document_id VARCHAR, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - updated_at TIMESTAMP DEFAULT NULL, - processed BOOLEAN DEFAULT FALSE, - document_id_target VARCHAR NULL - ); - """)) - query = text(""" - SELECT document_id, created_at, updated_at, processed - FROM cognify - WHERE document_id != :excluded_document_id AND processed = FALSE; - """) - records = await connection.execute(query, {"excluded_document_id": excluded_document_id}) - records = records.fetchall() - - if records: - document_ids = tuple(record["document_id"] for record in records) - update_query = text("UPDATE cognify SET processed = TRUE WHERE document_id IN :document_ids;") - await connection.execute(update_query, {"document_ids": document_ids}) - return [dict(record) for record in records] - - async def delete_cognify_data(self): - async with self.engine.connect() as connection: - await connection.execute(text(""" - CREATE TABLE IF NOT EXISTS cognify ( - document_id VARCHAR, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - updated_at TIMESTAMP DEFAULT NULL, - processed BOOLEAN DEFAULT FALSE, - document_id_target VARCHAR NULL - ); - """)) - await connection.execute(text("DELETE FROM cognify;")) - await connection.execute(text("DROP TABLE cognify;")) - async def drop_tables(self, connection): try: await connection.execute(text("DROP TABLE IF EXISTS group_permission CASCADE")) @@ -169,9 +108,11 @@ async def drop_tables(self, connection): print(f"Error dropping database tables: {e}") async def delete_database(self): - async with self.engine.begin() as connection: + async with self.engine.connect() as connection: try: - await self.drop_tables(connection) - print("Database tables dropped successfully.") + async with self.engine.begin() as connection: + await connection.run_sync(Base.metadata.drop_all) + + print("Database deleted successfully.") except Exception as e: - print(f"Error dropping database tables: {e}") \ No newline at 
end of file + print(f"Error deleting database: {e}") diff --git a/cognee/modules/data/models/Data.py b/cognee/modules/data/models/Data.py new file mode 100644 index 000000000..a1d20ebee --- /dev/null +++ b/cognee/modules/data/models/Data.py @@ -0,0 +1,25 @@ +from uuid import uuid4 +from typing import List +from datetime import datetime, timezone +from sqlalchemy.orm import relationship, Mapped +from sqlalchemy import Column, String, DateTime, UUID +from cognee.infrastructure.databases.relational import Base +from .DatasetData import DatasetData + +class Data(Base): + __tablename__ = "data" + + id = Column(UUID(as_uuid = True), primary_key = True, default = uuid4) + + name = Column(String) + extension = Column(String) + mime_type = Column(String) + raw_data_location = Column(String) + + created_at = Column(DateTime(timezone = True), default = lambda: datetime.now(timezone.utc)) + updated_at = Column(DateTime(timezone = True), onupdate = lambda: datetime.now(timezone.utc)) + + datasets: Mapped[List["Dataset"]] = relationship( + secondary = DatasetData.__tablename__, + back_populates = "data" + ) diff --git a/cognee/infrastructure/data/models/Dataset.py b/cognee/modules/data/models/Dataset.py similarity index 57% rename from cognee/infrastructure/data/models/Dataset.py rename to cognee/modules/data/models/Dataset.py index 918cd5b28..32d26dac5 100644 --- a/cognee/infrastructure/data/models/Dataset.py +++ b/cognee/modules/data/models/Dataset.py @@ -1,3 +1,4 @@ +from uuid import uuid4 from typing import List from datetime import datetime, timezone from sqlalchemy.orm import relationship, Mapped @@ -6,14 +7,14 @@ from .DatasetData import DatasetData class Dataset(Base): - __tablename__ = "dataset" + __tablename__ = "datasets" + + id = Column(UUID(as_uuid = True), primary_key = True, default = uuid4) - id = Column(UUID, primary_key = True) name = Column(Text) - description = Column(Text, nullable = True) - created_at = Column(DateTime, default = datetime.now(timezone.utc)) - updated_at = Column(DateTime, onupdate = datetime.now(timezone.utc)) + created_at = Column(DateTime(timezone = True), default = lambda: datetime.now(timezone.utc)) + updated_at = Column(DateTime(timezone = True), onupdate = lambda: datetime.now(timezone.utc)) data: Mapped[List["Data"]] = relationship( secondary = DatasetData.__tablename__, diff --git a/cognee/modules/data/models/DatasetData.py b/cognee/modules/data/models/DatasetData.py new file mode 100644 index 000000000..d340889f2 --- /dev/null +++ b/cognee/modules/data/models/DatasetData.py @@ -0,0 +1,11 @@ +from datetime import datetime, timezone +from sqlalchemy import Column, DateTime, UUID, ForeignKey +from cognee.infrastructure.databases.relational import Base + +class DatasetData(Base): + __tablename__ = "dataset_data" + + created_at = Column(DateTime(timezone = True), default = lambda: datetime.now(timezone.utc)) + + dataset_id = Column(UUID(as_uuid = True), ForeignKey("datasets.id"), primary_key = True) + data_id = Column(UUID(as_uuid = True), ForeignKey("data.id"), primary_key = True) diff --git a/cognee/modules/data/models/__init__.py b/cognee/modules/data/models/__init__.py new file mode 100644 index 000000000..5d79dbd40 --- /dev/null +++ b/cognee/modules/data/models/__init__.py @@ -0,0 +1,2 @@ +from .Data import Data +from .Dataset import Dataset diff --git a/cognee/modules/data/operations/ensure_dataset_exists.py b/cognee/modules/data/operations/ensure_dataset_exists.py new file mode 100644 index 000000000..9de662cae --- /dev/null +++ 
b/cognee/modules/data/operations/ensure_dataset_exists.py @@ -0,0 +1,26 @@ +from sqlalchemy import select +from sqlalchemy.orm import joinedload +from cognee.modules.data.models import Dataset +from cognee.infrastructure.databases.relational import get_relational_engine + +async def ensure_dataset_exists(dataset_name: str) -> Dataset: + db_engine = get_relational_engine() + + async with db_engine.get_async_session() as session: + dataset = (await session.scalars( + select(Dataset)\ + .options(joinedload(Dataset.data))\ + .filter(Dataset.name == dataset_name) + )).first() + + if dataset is None: + dataset = Dataset( + name = dataset_name, + data = [] + ) + + session.add(dataset) + + await session.commit() + + return dataset diff --git a/cognee/modules/data/processing/document_types/AudioDocument.py b/cognee/modules/data/processing/document_types/AudioDocument.py index bc129b66b..e3aa6dca7 100644 --- a/cognee/modules/data/processing/document_types/AudioDocument.py +++ b/cognee/modules/data/processing/document_types/AudioDocument.py @@ -1,5 +1,5 @@ -from uuid import uuid5, NAMESPACE_OID -from typing import Optional, Generator +from uuid import UUID, uuid5, NAMESPACE_OID +from typing import Optional from cognee.infrastructure.llm.get_llm_client import get_llm_client from cognee.modules.data.chunking import chunk_by_paragraph @@ -7,10 +7,10 @@ from cognee.modules.data.processing.document_types.Document import Document class AudioReader: - id: str + id: UUID file_path: str - def __init__(self, id: str, file_path: str): + def __init__(self, id: UUID, file_path: str): self.id = id self.file_path = file_path self.llm_client = get_llm_client() # You can choose different models like "tiny", "base", "small", etc. @@ -87,13 +87,11 @@ class AudioDocument(Document): title: str file_path: str - def __init__(self, title: str, file_path: str): - self.id = uuid5(NAMESPACE_OID, title) + def __init__(self, id: UUID, title: str, file_path: str): + self.id = id or uuid5(NAMESPACE_OID, title) self.title = title self.file_path = file_path - reader = AudioReader(self.id, self.file_path) - def get_reader(self) -> AudioReader: reader = AudioReader(self.id, self.file_path) return reader diff --git a/cognee/modules/data/processing/document_types/ImageDocument.py b/cognee/modules/data/processing/document_types/ImageDocument.py index aa99c672b..faddb3120 100644 --- a/cognee/modules/data/processing/document_types/ImageDocument.py +++ b/cognee/modules/data/processing/document_types/ImageDocument.py @@ -1,5 +1,5 @@ -from uuid import uuid5, NAMESPACE_OID -from typing import Optional, Generator +from uuid import UUID, uuid5, NAMESPACE_OID +from typing import Optional from cognee.infrastructure.llm.get_llm_client import get_llm_client from cognee.modules.data.chunking import chunk_by_paragraph @@ -7,10 +7,10 @@ from cognee.modules.data.processing.document_types.Document import Document class ImageReader: - id: str + id: UUID file_path: str - def __init__(self, id: str, file_path: str): + def __init__(self, id: UUID, file_path: str): self.id = id self.file_path = file_path self.llm_client = get_llm_client() # You can choose different models like "tiny", "base", "small", etc. 
@@ -24,10 +24,8 @@ def read(self, max_chunk_size: Optional[int] = 1024): # Transcribe the image file result = self.llm_client.transcribe_image(self.file_path) - print("Transcription result: ", result.choices[0].message.content) text = result.choices[0].message.content - # Simulate reading text in chunks as done in TextReader def read_text_chunks(text, chunk_size): for i in range(0, len(text), chunk_size): @@ -89,13 +87,11 @@ class ImageDocument(Document): title: str file_path: str - def __init__(self, title: str, file_path: str): - self.id = uuid5(NAMESPACE_OID, title) + def __init__(self, id: UUID, title: str, file_path: str): + self.id = id or uuid5(NAMESPACE_OID, title) self.title = title self.file_path = file_path - reader = ImageReader(self.id, self.file_path) - def get_reader(self) -> ImageReader: reader = ImageReader(self.id, self.file_path) return reader diff --git a/cognee/modules/data/processing/document_types/PdfDocument.py b/cognee/modules/data/processing/document_types/PdfDocument.py index 9402da7c9..b2579a06a 100644 --- a/cognee/modules/data/processing/document_types/PdfDocument.py +++ b/cognee/modules/data/processing/document_types/PdfDocument.py @@ -1,6 +1,6 @@ # import pdfplumber import logging -from uuid import uuid5, NAMESPACE_OID +from uuid import UUID, uuid5, NAMESPACE_OID from typing import Optional from pypdf import PdfReader as pypdf_PdfReader from cognee.modules.data.chunking import chunk_by_paragraph @@ -8,10 +8,10 @@ from .Document import Document class PdfReader(): - id: str + id: UUID file_path: str - def __init__(self, id: str, file_path: str): + def __init__(self, id: UUID, file_path: str): self.id = id self.file_path = file_path @@ -86,8 +86,8 @@ class PdfDocument(Document): num_pages: int file_path: str - def __init__(self, title: str, file_path: str): - self.id = uuid5(NAMESPACE_OID, title) + def __init__(self, id: UUID, title: str, file_path: str): + self.id = id or uuid5(NAMESPACE_OID, title) self.title = title self.file_path = file_path logging.debug("file_path: %s", self.file_path) diff --git a/cognee/modules/data/processing/document_types/TextDocument.py b/cognee/modules/data/processing/document_types/TextDocument.py index 2e5bf4452..7e9ca8862 100644 --- a/cognee/modules/data/processing/document_types/TextDocument.py +++ b/cognee/modules/data/processing/document_types/TextDocument.py @@ -1,14 +1,14 @@ -from uuid import uuid5, NAMESPACE_OID +from uuid import UUID, uuid5, NAMESPACE_OID from typing import Optional from cognee.modules.data.chunking import chunk_by_paragraph from cognee.modules.data.processing.chunk_types.DocumentChunk import DocumentChunk from .Document import Document class TextReader(): - id: str + id: UUID file_path: str - def __init__(self, id: str, file_path: str): + def __init__(self, id: UUID, file_path: str): self.id = id self.file_path = file_path @@ -90,8 +90,8 @@ class TextDocument(Document): num_pages: int file_path: str - def __init__(self, title: str, file_path: str): - self.id = uuid5(NAMESPACE_OID, title) + def __init__(self, id: UUID, title: str, file_path: str): + self.id = id or uuid5(NAMESPACE_OID, title) self.title = title self.file_path = file_path diff --git a/cognee/modules/ingestion/data_types/BinaryData.py b/cognee/modules/ingestion/data_types/BinaryData.py index 5531fec86..6959eb15f 100644 --- a/cognee/modules/ingestion/data_types/BinaryData.py +++ b/cognee/modules/ingestion/data_types/BinaryData.py @@ -17,7 +17,7 @@ def __init__(self, data: BinaryIO, name: str = None): def get_identifier(self): metadata = 
self.get_metadata() - return self.name + "_" + metadata["mime_type"] + return self.name + "." + metadata["extension"] def get_metadata(self): self.ensure_metadata() diff --git a/cognee/modules/ingestion/identify.py b/cognee/modules/ingestion/identify.py index 89c347449..745aab913 100644 --- a/cognee/modules/ingestion/identify.py +++ b/cognee/modules/ingestion/identify.py @@ -1,9 +1,7 @@ -from uuid import uuid5, UUID +from uuid import uuid5, NAMESPACE_OID from .data_types import IngestionData -null_uuid: UUID = UUID("00000000-0000-0000-0000-000000000000") - def identify(data: IngestionData) -> str: data_id: str = data.get_identifier() - return str(uuid5(null_uuid, data_id)).replace("-", "") + return uuid5(NAMESPACE_OID, data_id) diff --git a/cognee/modules/pipelines/models/Pipeline.py b/cognee/modules/pipelines/models/Pipeline.py index 8005844be..8904e00dc 100644 --- a/cognee/modules/pipelines/models/Pipeline.py +++ b/cognee/modules/pipelines/models/Pipeline.py @@ -8,7 +8,7 @@ class Pipeline(Base): __tablename__ = "pipelines" - id = Column(UUID, primary_key = True, default = uuid4) + id = Column(UUID(as_uuid = True), primary_key = True, default = uuid4) name = Column(String) description = Column(Text, nullable = True) diff --git a/cognee/modules/pipelines/models/PipelineRun.py b/cognee/modules/pipelines/models/PipelineRun.py index bc5f890d9..fd34b8d98 100644 --- a/cognee/modules/pipelines/models/PipelineRun.py +++ b/cognee/modules/pipelines/models/PipelineRun.py @@ -6,10 +6,12 @@ class PipelineRun(Base): __tablename__ = "pipeline_runs" - id = Column(UUID, primary_key = True, default = uuid4) - - dataset_name = Column(String) + id = Column(UUID(as_uuid = True), primary_key = True, default = uuid4) created_at = Column(DateTime(timezone = True), default = lambda: datetime.now(timezone.utc)) + run_name = Column(String, index = True) + + status = Column(String) + run_info = Column(JSON) diff --git a/cognee/modules/pipelines/models/PipelineTask.py b/cognee/modules/pipelines/models/PipelineTask.py index 0c70baddc..079b12f47 100644 --- a/cognee/modules/pipelines/models/PipelineTask.py +++ b/cognee/modules/pipelines/models/PipelineTask.py @@ -1,4 +1,3 @@ -from uuid import uuid4 from datetime import datetime, timezone from sqlalchemy import Column, DateTime, UUID, ForeignKey from cognee.infrastructure.databases.relational import Base @@ -6,9 +5,7 @@ class PipelineTask(Base): __tablename__ = "pipeline_task" - id = Column(UUID, primary_key = True, default = uuid4) - created_at = Column(DateTime(timezone = True), default = lambda: datetime.now(timezone.utc)) - pipeline_id = Column("pipeline", UUID, ForeignKey("pipeline.id"), primary_key = True) - task_id = Column("task", UUID, ForeignKey("task.id"), primary_key = True) + pipeline_id = Column("pipeline", UUID(as_uuid = True), ForeignKey("pipeline.id"), primary_key = True) + task_id = Column("task", UUID(as_uuid = True), ForeignKey("task.id"), primary_key = True) diff --git a/cognee/modules/pipelines/models/Task.py b/cognee/modules/pipelines/models/Task.py index c3ee89e41..3a56f4b71 100644 --- a/cognee/modules/pipelines/models/Task.py +++ b/cognee/modules/pipelines/models/Task.py @@ -8,7 +8,7 @@ class Task(Base): __tablename__ = "tasks" - id = Column(UUID, primary_key = True, default = uuid4) + id = Column(UUID(as_uuid = True), primary_key = True, default = uuid4) name = Column(String) description = Column(Text, nullable = True) diff --git a/cognee/modules/pipelines/models/TaskRun.py b/cognee/modules/pipelines/models/TaskRun.py new file mode 100644 index 
000000000..89cda8b44 --- /dev/null +++ b/cognee/modules/pipelines/models/TaskRun.py @@ -0,0 +1,17 @@ +from uuid import uuid4 +from datetime import datetime, timezone +from sqlalchemy import Column, UUID, DateTime, String, JSON +from cognee.infrastructure.databases.relational import Base + +class TaskRun(Base): + __tablename__ = "task_runs" + + id = Column(UUID(as_uuid = True), primary_key = True, default = uuid4) + + task_name = Column(String) + + created_at = Column(DateTime(timezone = True), default = lambda: datetime.now(timezone.utc)) + + status = Column(String) + + run_info = Column(JSON) diff --git a/cognee/modules/pipelines/models/TaskStatus.py b/cognee/modules/pipelines/models/TaskStatus.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/cognee/modules/pipelines/models/__init__.py b/cognee/modules/pipelines/models/__init__.py new file mode 100644 index 000000000..27a936983 --- /dev/null +++ b/cognee/modules/pipelines/models/__init__.py @@ -0,0 +1 @@ +from .PipelineRun import PipelineRun diff --git a/cognee/modules/pipelines/operations/add_task.py b/cognee/modules/pipelines/operations/add_task.py deleted file mode 100644 index deb491993..000000000 --- a/cognee/modules/pipelines/operations/add_task.py +++ /dev/null @@ -1,4 +0,0 @@ -from ..models import Pipeline, Task - -def add_task(pipeline: Pipeline, task: Task): - pipeline.tasks.append(task) diff --git a/cognee/modules/pipelines/operations/get_pipeline_status.py b/cognee/modules/pipelines/operations/get_pipeline_status.py new file mode 100644 index 000000000..2f190ac3b --- /dev/null +++ b/cognee/modules/pipelines/operations/get_pipeline_status.py @@ -0,0 +1,40 @@ +from sqlalchemy import func, select +from sqlalchemy.orm import aliased +from cognee.infrastructure.databases.relational import get_relational_engine +from ..models import PipelineRun + +async def get_pipeline_status(pipeline_names: [str]): + db_engine = get_relational_engine() + + async with db_engine.get_async_session() as session: + query = select( + PipelineRun, + func.row_number().over( + partition_by = PipelineRun.run_name, + order_by = PipelineRun.created_at.desc(), + ).label("rn") + ).filter(PipelineRun.run_name.in_(pipeline_names)).subquery() + + aliased_pipeline_run = aliased(PipelineRun, query) + + latest_runs = ( + select(aliased_pipeline_run).filter(query.c.rn == 1) + ) + + runs = (await session.execute(latest_runs)).scalars().all() + + pipeline_statuses = { + run.run_name: run.status for run in runs + } + + return pipeline_statuses + + # f"""SELECT data_id, status + # FROM ( + # SELECT data_id, status, ROW_NUMBER() OVER (PARTITION BY data_id ORDER BY created_at DESC) as rn + # FROM cognee.cognee.task_runs + # WHERE data_id IN ({formatted_data_ids}) + # ) t + # WHERE rn = 1;""" + + # return { dataset["data_id"]: dataset["status"] for dataset in datasets_statuses } diff --git a/cognee/modules/pipelines/operations/log_pipeline_status.py b/cognee/modules/pipelines/operations/log_pipeline_status.py new file mode 100644 index 000000000..5b484ebf3 --- /dev/null +++ b/cognee/modules/pipelines/operations/log_pipeline_status.py @@ -0,0 +1,14 @@ +from cognee.infrastructure.databases.relational import get_relational_engine +from ..models.PipelineRun import PipelineRun + +async def log_pipeline_status(run_name: str, status: str, run_info: dict): + db_engine = get_relational_engine() + + async with db_engine.get_async_session() as session: + session.add(PipelineRun( + run_name = run_name, + status = status, + run_info = run_info, + )) + + await 
session.commit() diff --git a/cognee/modules/tasks/__init__.py b/cognee/modules/tasks/__init__.py deleted file mode 100644 index e0cc3da4a..000000000 --- a/cognee/modules/tasks/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from .get_task_status import get_task_status -from .update_task_status import update_task_status -from .create_task_status_table import create_task_status_table diff --git a/cognee/modules/tasks/create_task_status_table.py b/cognee/modules/tasks/create_task_status_table.py deleted file mode 100644 index 48763600d..000000000 --- a/cognee/modules/tasks/create_task_status_table.py +++ /dev/null @@ -1,10 +0,0 @@ -from cognee.infrastructure.databases.relational import get_relational_engine - -async def create_task_status_table(): - db_engine = get_relational_engine() - - await db_engine.create_table("cognee", "cognee_task_status", [ - dict(name="data_id", type="VARCHAR"), - dict(name="status", type="VARCHAR"), - dict(name="created_at", type="TIMESTAMP DEFAULT CURRENT_TIMESTAMP"), - ]) diff --git a/cognee/modules/tasks/get_task_status.py b/cognee/modules/tasks/get_task_status.py deleted file mode 100644 index 17b796084..000000000 --- a/cognee/modules/tasks/get_task_status.py +++ /dev/null @@ -1,18 +0,0 @@ -from cognee.infrastructure.databases.relational import get_relational_engine - -def get_task_status(data_ids: [str]): - db_engine = get_relational_engine() - - formatted_data_ids = ", ".join([f"'{data_id}'" for data_id in data_ids]) - - datasets_statuses = db_engine.execute_query( - f"""SELECT data_id, status - FROM ( - SELECT data_id, status, ROW_NUMBER() OVER (PARTITION BY data_id ORDER BY created_at DESC) as rn - FROM cognee.cognee.cognee_task_status - WHERE data_id IN ({formatted_data_ids}) - ) t - WHERE rn = 1;""" - ) - - return { dataset["data_id"]: dataset["status"] for dataset in datasets_statuses } diff --git a/cognee/modules/tasks/update_task_status.py b/cognee/modules/tasks/update_task_status.py deleted file mode 100644 index 0676a5a92..000000000 --- a/cognee/modules/tasks/update_task_status.py +++ /dev/null @@ -1,5 +0,0 @@ -from cognee.infrastructure.databases.relational import get_relational_engine - -def update_task_status(data_id: str, status: str): - db_engine = get_relational_engine() - db_engine.insert_data("cognee.cognee", "cognee_task_status", [dict(data_id = data_id, status = status)]) diff --git a/cognee/modules/users/methods/create_default_user.py b/cognee/modules/users/methods/create_default_user.py index 673756ed1..3fae3c10f 100644 --- a/cognee/modules/users/methods/create_default_user.py +++ b/cognee/modules/users/methods/create_default_user.py @@ -1,5 +1,4 @@ import hashlib -# from cognee.infrastructure.databases.relational import get_relational_engine from .create_user import create_user async def create_default_user(): @@ -12,12 +11,9 @@ async def create_default_user(): is_superuser = True, is_active = True, is_verified = True, + auto_login = True, ) - # db_engine = get_relational_engine() - # async with db_engine.get_async_session() as session: - # await session.refresh(user) - return user async def hash_password(password: str) -> str: diff --git a/cognee/modules/users/methods/create_user.py b/cognee/modules/users/methods/create_user.py index f2886caae..3145fd799 100644 --- a/cognee/modules/users/methods/create_user.py +++ b/cognee/modules/users/methods/create_user.py @@ -10,6 +10,7 @@ async def create_user( is_superuser: bool = False, is_active: bool = True, is_verified: bool = False, + auto_login: bool = False, ): try: relational_engine = 
get_relational_engine() @@ -26,6 +27,10 @@ async def create_user( is_verified = is_verified, ) ) + + if auto_login: + await session.refresh(user) + return user print(f"User created: {user.email}") except UserAlreadyExists as error: diff --git a/cognee/modules/users/methods/get_default_user.py b/cognee/modules/users/methods/get_default_user.py index 8ab330d4c..72faef3c8 100644 --- a/cognee/modules/users/methods/get_default_user.py +++ b/cognee/modules/users/methods/get_default_user.py @@ -1,12 +1,16 @@ from sqlalchemy.orm import joinedload - +from sqlalchemy.future import select from cognee.modules.users.models import User from cognee.infrastructure.databases.relational import get_relational_engine -from sqlalchemy.future import select +async def get_default_user(): + db_engine = get_relational_engine() + + async with db_engine.get_async_session() as session: + query = select(User).options(joinedload(User.groups))\ + .where(User.email == "default_user@example.com") + + result = await session.execute(query) + user = result.scalars().first() -async def get_default_user(session): - stmt = select(User).options(joinedload(User.groups)).where(User.email == "default_user@example.com") - result = await session.execute(stmt) - user = result.scalars().first() - return user \ No newline at end of file + return user diff --git a/cognee/modules/users/permissions/methods/check_permissions_on_documents.py b/cognee/modules/users/permissions/methods/check_permissions_on_documents.py index 3f2724ac8..4464f0f24 100644 --- a/cognee/modules/users/permissions/methods/check_permissions_on_documents.py +++ b/cognee/modules/users/permissions/methods/check_permissions_on_documents.py @@ -1,8 +1,8 @@ import logging - from sqlalchemy import select - +from sqlalchemy.orm import joinedload from cognee.infrastructure.databases.relational import get_relational_engine + from ...models.User import User from ...models.ACL import ACL @@ -14,24 +14,26 @@ def __init__(self, message: str): super().__init__(self.message) -async def check_permissions_on_documents(user: User, permission_type: str, document_ids: list[str], session): - - logging.info("This is the user: %s", user.__dict__) +async def check_permissions_on_documents(user: User, permission_type: str, document_ids: list[str]): try: user_group_ids = [group.id for group in user.groups] - acls = await session.execute( - select(ACL) - .join(ACL.permission) - .where(ACL.principal_id.in_([user.id, *user_group_ids])) - .where(ACL.permission.has(name=permission_type)) - ) - resource_ids = [resource.resource_id for acl in acls.scalars().all() for resource in acl.resources] - has_permissions = all(document_id in resource_ids for document_id in document_ids) - - if not has_permissions: - raise PermissionDeniedException(f"User {user.username} does not have {permission_type} permission on documents") + db_engine = get_relational_engine() + + async with db_engine.get_async_session() as session: + result = await session.execute( + select(ACL) + .join(ACL.permission) + .options(joinedload(ACL.resources)) + .where(ACL.principal_id.in_([user.id, *user_group_ids])) + .where(ACL.permission.has(name = permission_type)) + ) + acls = result.unique().scalars().all() + resource_ids = [str(resource.resource_id) for acl in acls for resource in acl.resources] + has_permissions = all(document_id in resource_ids for document_id in document_ids) + + if not has_permissions: + raise PermissionDeniedException(f"User {user.username} does not have {permission_type} permission on documents") except Exception 
as error: logger.error("Error checking permissions on documents: %s", str(error)) raise - diff --git a/cognee/modules/users/permissions/methods/get_document_ids_for_user.py b/cognee/modules/users/permissions/methods/get_document_ids_for_user.py index 31f031d75..47f33b2c7 100644 --- a/cognee/modules/users/permissions/methods/get_document_ids_for_user.py +++ b/cognee/modules/users/permissions/methods/get_document_ids_for_user.py @@ -16,7 +16,7 @@ async def get_document_ids_for_user(user_id: UUID) -> list[str]: ACL.permission.name == "read", ) ) - document_ids = [row[0] for row in result.fetchall()] + document_ids = [row[0] for row in result.scalars().all()] return document_ids diff --git a/cognee/modules/users/permissions/methods/give_permission_on_document.py b/cognee/modules/users/permissions/methods/give_permission_on_document.py index f4dabfd6b..65516f7cd 100644 --- a/cognee/modules/users/permissions/methods/give_permission_on_document.py +++ b/cognee/modules/users/permissions/methods/give_permission_on_document.py @@ -12,18 +12,18 @@ async def give_permission_on_document( document_resource = Resource(resource_id = document_id) async with db_engine.get_async_session() as session: - permission = (await session.execute(select(Permission).filter(Permission.name == permission_name))).first() + permission = (await session.execute(select(Permission).filter(Permission.name == permission_name))).scalars().first() if permission is None: permission = Permission(name = permission_name) - acl = ACL(principal_id = user.id) - acl.permission = permission - acl.resources.append(document_resource) + acl = ACL(principal_id = user.id) + acl.permission = permission + acl.resources.append(document_resource) - session.add(acl) + session.add(acl) - await session.commit() + await session.commit() # if user.is_superuser: diff --git a/cognee/tests/test_library.py b/cognee/tests/test_library.py index d00c2b360..1480020d5 100755 --- a/cognee/tests/test_library.py +++ b/cognee/tests/test_library.py @@ -27,13 +27,13 @@ async def main(): # In principle, a non-quantum (classical) computer can solve the same computational problems as a quantum computer, given enough time. Quantum advantage comes in the form of time complexity rather than computability, and quantum complexity theory shows that some quantum algorithms for carefully selected tasks require exponentially fewer computational steps than the best known non-quantum algorithms. Such tasks can in theory be solved on a large-scale quantum computer whereas classical computers would not finish computations in any reasonable amount of time. However, quantum speedup is not universal or even typical across computational tasks, since basic tasks such as sorting are proven to not allow any asymptotic quantum speedup. Claims of quantum supremacy have drawn significant attention to the discipline, but are demonstrated on contrived tasks, while near-term practical use cases remain limited. # """ - text = """A large language model (LLM) is a language model notable for its ability to achieve general-purpose language generation and other natural language processing tasks such as classification. LLMs acquire these abilities by learning statistical relationships from text documents during a computationally intensive self-supervised and semi-supervised training process. LLMs can be used for text generation, a form of generative AI, by taking an input text and repeatedly predicting the next token or word. - LLMs are artificial neural networks. 
The largest and most capable, as of March 2024, are built with a decoder-only transformer-based architecture while some recent implementations are based on other architectures, such as recurrent neural network variants and Mamba (a state space model). - Up to 2020, fine tuning was the only way a model could be adapted to be able to accomplish specific tasks. Larger sized models, such as GPT-3, however, can be prompt-engineered to achieve similar results.[6] They are thought to acquire knowledge about syntax, semantics and "ontology" inherent in human language corpora, but also inaccuracies and biases present in the corpora. - Some notable LLMs are OpenAI's GPT series of models (e.g., GPT-3.5 and GPT-4, used in ChatGPT and Microsoft Copilot), Google's PaLM and Gemini (the latter of which is currently used in the chatbot of the same name), xAI's Grok, Meta's LLaMA family of open-source models, Anthropic's Claude models, Mistral AI's open source models, and Databricks' open source DBRX. - """ + # text = """A large language model (LLM) is a language model notable for its ability to achieve general-purpose language generation and other natural language processing tasks such as classification. LLMs acquire these abilities by learning statistical relationships from text documents during a computationally intensive self-supervised and semi-supervised training process. LLMs can be used for text generation, a form of generative AI, by taking an input text and repeatedly predicting the next token or word. + # LLMs are artificial neural networks. The largest and most capable, as of March 2024, are built with a decoder-only transformer-based architecture while some recent implementations are based on other architectures, such as recurrent neural network variants and Mamba (a state space model). + # Up to 2020, fine tuning was the only way a model could be adapted to be able to accomplish specific tasks. Larger sized models, such as GPT-3, however, can be prompt-engineered to achieve similar results.[6] They are thought to acquire knowledge about syntax, semantics and "ontology" inherent in human language corpora, but also inaccuracies and biases present in the corpora. + # Some notable LLMs are OpenAI's GPT series of models (e.g., GPT-3.5 and GPT-4, used in ChatGPT and Microsoft Copilot), Google's PaLM and Gemini (the latter of which is currently used in the chatbot of the same name), xAI's Grok, Meta's LLaMA family of open-source models, Anthropic's Claude models, Mistral AI's open source models, and Databricks' open source DBRX. + # """ - await cognee.add([text], dataset_name) + # await cognee.add([text], dataset_name) await cognee.cognify([dataset_name]) @@ -42,27 +42,27 @@ async def main(): random_node = (await vector_engine.search("entities", "AI"))[0] random_node_name = random_node.payload["name"] - search_results = await cognee.search("SIMILARITY", { "query": random_node_name }) + search_results = await cognee.search("SIMILARITY", params = { "query": random_node_name }) assert len(search_results) != 0, "The search results list is empty." print("\n\nExtracted sentences are:\n") for result in search_results: print(f"{result}\n") - search_results = await cognee.search("TRAVERSE", { "query": random_node_name }) + search_results = await cognee.search("TRAVERSE", params = { "query": random_node_name }) assert len(search_results) != 0, "The search results list is empty." 
print("\n\nExtracted sentences are:\n") for result in search_results: print(f"{result}\n") - search_results = await cognee.search("SUMMARY", { "query": random_node_name }) + search_results = await cognee.search("SUMMARY", params = { "query": random_node_name }) assert len(search_results) != 0, "Query related summaries don't exist." print("\n\nQuery related summaries exist:\n") for result in search_results: print(f"{result}\n") - search_results = await cognee.search("ADJACENT", { "query": random_node_name }) + search_results = await cognee.search("ADJACENT", params = { "query": random_node_name }) assert len(search_results) != 0, "Large language model query found no neighbours." - print("\n\Large language model query found neighbours.\n") + print("\n\nLarge language model query found neighbours.\n") for result in search_results: print(f"{result}\n")