Skip to content

Commit

Permalink
Merge branch 'dev' into ruff-version
Browse files Browse the repository at this point in the history
  • Loading branch information
dexters1 authored Jan 17, 2025
2 parents 89b23b8 + 65a0c98 commit be2aa99
Showing 1 changed file with 39 additions and 19 deletions.
58 changes: 39 additions & 19 deletions cognee/api/v1/cognify/cognify_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ async def cognify(
datasets: Union[str, list[str]] = None,
user: User = None,
graph_model: BaseModel = KnowledgeGraph,
tasks: list[Task] = None,
):
if user is None:
user = await get_default_user()
Expand All @@ -55,18 +56,19 @@ async def cognify(

awaitables = []

if tasks is None:
tasks = await get_default_tasks(user, graph_model)

for dataset in datasets:
dataset_name = generate_dataset_name(dataset.name)

if dataset_name in existing_datasets_map:
awaitables.append(run_cognify_pipeline(dataset, user, graph_model))
awaitables.append(run_cognify_pipeline(dataset, user, tasks))

return await asyncio.gather(*awaitables)


async def run_cognify_pipeline(
dataset: Dataset, user: User, graph_model: BaseModel = KnowledgeGraph
):
async def run_cognify_pipeline(dataset: Dataset, user: User, tasks: list[Task]):
data_documents: list[Data] = await get_dataset_data(dataset_id=dataset.id)

document_ids_str = [str(document.id) for document in data_documents]
Expand Down Expand Up @@ -96,22 +98,12 @@ async def run_cognify_pipeline(
)

try:
cognee_config = get_cognify_config()
if not isinstance(tasks, list):
raise ValueError("Tasks must be a list")

tasks = [
Task(classify_documents),
Task(check_permissions_on_documents, user=user, permissions=["write"]),
Task(extract_chunks_from_documents), # Extract text chunks based on the document type.
Task(
extract_graph_from_data, graph_model=graph_model, task_config={"batch_size": 10}
), # Generate knowledge graphs from the document chunks.
Task(
summarize_text,
summarization_model=cognee_config.summarization_model,
task_config={"batch_size": 10},
),
Task(add_data_points, only_root=True, task_config={"batch_size": 10}),
]
for task in tasks:
if not isinstance(task, Task):
raise ValueError(f"Task {task} is not an instance of Task")

pipeline = run_tasks(tasks, data_documents, "cognify_pipeline")

Expand Down Expand Up @@ -146,3 +138,31 @@ async def run_cognify_pipeline(

def generate_dataset_name(dataset_name: str) -> str:
return dataset_name.replace(".", "_").replace(" ", "_")


async def get_default_tasks(
user: User = None, graph_model: BaseModel = KnowledgeGraph
) -> list[Task]:
if user is None:
user = await get_default_user()

try:
cognee_config = get_cognify_config()
default_tasks = [
Task(classify_documents),
Task(check_permissions_on_documents, user=user, permissions=["write"]),
Task(extract_chunks_from_documents), # Extract text chunks based on the document type.
Task(
extract_graph_from_data, graph_model=graph_model, task_config={"batch_size": 10}
), # Generate knowledge graphs from the document chunks.
Task(
summarize_text,
summarization_model=cognee_config.summarization_model,
task_config={"batch_size": 10},
),
Task(add_data_points, only_root=True, task_config={"batch_size": 10}),
]
except Exception as error:
send_telemetry("cognee.cognify DEFAULT TASKS CREATION ERRORED", user.id)
raise error
return default_tasks

0 comments on commit be2aa99

Please sign in to comment.