From 65a0c98455fdcb7df0852955fcdafc61cae45448 Mon Sep 17 00:00:00 2001
From: lxobr <122801072+lxobr@users.noreply.github.com>
Date: Fri, 17 Jan 2025 10:20:57 +0100
Subject: [PATCH] COG-989 feat: make tasks a configurable argument in the
 cognify function (#442)

* feat: make tasks a configurable argument in the cognify function

* fix: add data points task

---------

Co-authored-by: hajdul88 <52442977+hajdul88@users.noreply.github.com>
---
 cognee/api/v1/cognify/cognify_v2.py | 58 +++++++++++++++++++----------
 1 file changed, 39 insertions(+), 19 deletions(-)

diff --git a/cognee/api/v1/cognify/cognify_v2.py b/cognee/api/v1/cognify/cognify_v2.py
index 680c05828..738f77c52 100644
--- a/cognee/api/v1/cognify/cognify_v2.py
+++ b/cognee/api/v1/cognify/cognify_v2.py
@@ -36,6 +36,7 @@ async def cognify(
     datasets: Union[str, list[str]] = None,
     user: User = None,
     graph_model: BaseModel = KnowledgeGraph,
+    tasks: list[Task] = None,
 ):
     if user is None:
         user = await get_default_user()
@@ -55,18 +56,19 @@ async def cognify(
 
     awaitables = []
 
+    if tasks is None:
+        tasks = await get_default_tasks(user, graph_model)
+
     for dataset in datasets:
         dataset_name = generate_dataset_name(dataset.name)
 
         if dataset_name in existing_datasets_map:
-            awaitables.append(run_cognify_pipeline(dataset, user, graph_model))
+            awaitables.append(run_cognify_pipeline(dataset, user, tasks))
 
     return await asyncio.gather(*awaitables)
 
 
-async def run_cognify_pipeline(
-    dataset: Dataset, user: User, graph_model: BaseModel = KnowledgeGraph
-):
+async def run_cognify_pipeline(dataset: Dataset, user: User, tasks: list[Task]):
     data_documents: list[Data] = await get_dataset_data(dataset_id=dataset.id)
 
     document_ids_str = [str(document.id) for document in data_documents]
@@ -96,22 +98,12 @@ async def run_cognify_pipeline(
     )
 
     try:
-        cognee_config = get_cognify_config()
+        if not isinstance(tasks, list):
+            raise ValueError("Tasks must be a list")
 
-        tasks = [
-            Task(classify_documents),
-            Task(check_permissions_on_documents, user=user, permissions=["write"]),
-            Task(extract_chunks_from_documents),  # Extract text chunks based on the document type.
-            Task(
-                extract_graph_from_data, graph_model=graph_model, task_config={"batch_size": 10}
-            ),  # Generate knowledge graphs from the document chunks.
-            Task(
-                summarize_text,
-                summarization_model=cognee_config.summarization_model,
-                task_config={"batch_size": 10},
-            ),
-            Task(add_data_points, only_root=True, task_config={"batch_size": 10}),
-        ]
+        for task in tasks:
+            if not isinstance(task, Task):
+                raise ValueError(f"Task {task} is not an instance of Task")
 
         pipeline = run_tasks(tasks, data_documents, "cognify_pipeline")
 
@@ -146,3 +138,31 @@
 
 def generate_dataset_name(dataset_name: str) -> str:
     return dataset_name.replace(".", "_").replace(" ", "_")
+
+
+async def get_default_tasks(
+    user: User = None, graph_model: BaseModel = KnowledgeGraph
+) -> list[Task]:
+    if user is None:
+        user = await get_default_user()
+
+    try:
+        cognee_config = get_cognify_config()
+        default_tasks = [
+            Task(classify_documents),
+            Task(check_permissions_on_documents, user=user, permissions=["write"]),
+            Task(extract_chunks_from_documents),  # Extract text chunks based on the document type.
+            Task(
+                extract_graph_from_data, graph_model=graph_model, task_config={"batch_size": 10}
+            ),  # Generate knowledge graphs from the document chunks.
+            Task(
+                summarize_text,
+                summarization_model=cognee_config.summarization_model,
+                task_config={"batch_size": 10},
+            ),
+            Task(add_data_points, only_root=True, task_config={"batch_size": 10}),
+        ]
+    except Exception as error:
+        send_telemetry("cognee.cognify DEFAULT TASKS CREATION ERRORED", user.id)
+        raise error
+    return default_tasks
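
With this change, cognify keeps its previous behavior when tasks is omitted (the
pipeline is built by get_default_tasks) and runs a caller-supplied list[Task]
otherwise. A minimal usage sketch follows; the import path for Task and the
my_custom_step coroutine are assumptions for illustration, not part of the patch:

    import asyncio

    from cognee.api.v1.cognify.cognify_v2 import cognify, get_default_tasks
    from cognee.modules.pipelines.tasks.Task import Task  # assumed import path


    async def my_custom_step(data_chunks):
        # Hypothetical extra pipeline step; replace with real processing logic.
        return data_chunks


    async def main():
        # Default pipeline: tasks=None, so cognify calls get_default_tasks() itself.
        await cognify(datasets="my_dataset")

        # Custom pipeline: start from the defaults and append an extra step,
        # mirroring how the patch constructs Task objects with task_config.
        tasks = await get_default_tasks()
        tasks.append(Task(my_custom_step, task_config={"batch_size": 10}))
        await cognify(datasets="my_dataset", tasks=tasks)


    asyncio.run(main())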