Merge branch 'dev' into COG-475-local-file-endpoint-deletion
dexters1 authored Dec 20, 2024
2 parents 6cb7fef + de2394c commit 7232b04
Showing 7 changed files with 847 additions and 154 deletions.
145 changes: 35 additions & 110 deletions cognee/api/v1/cognify/code_graph_pipeline.py
@@ -1,136 +1,38 @@
# NOTICE: This module contains deprecated functions.
# Use only the run_code_graph_pipeline function; all other functions are deprecated.
# Related issue: COG-906

import asyncio
import logging
from pathlib import Path
from typing import Union

from cognee.modules.data.methods import get_datasets, get_datasets_by_name
from cognee.modules.data.methods.get_dataset_data import get_dataset_data
from cognee.modules.data.models import Data, Dataset
from cognee.base_config import get_base_config
from cognee.modules.cognify.config import get_cognify_config
from cognee.modules.pipelines import run_tasks
from cognee.modules.pipelines.models import PipelineRunStatus
from cognee.modules.pipelines.operations.get_pipeline_status import \
get_pipeline_status
from cognee.modules.pipelines.operations.log_pipeline_status import \
log_pipeline_status
from cognee.modules.pipelines.tasks.Task import Task
from cognee.modules.users.methods import get_default_user
from cognee.modules.users.models import User
from cognee.shared.SourceCodeGraph import SourceCodeGraph
from cognee.shared.utils import send_telemetry
from cognee.tasks.documents import (check_permissions_on_documents,
                                    classify_documents,
                                    extract_chunks_from_documents)
from cognee.shared.data_models import KnowledgeGraph, MonitoringTool
from cognee.tasks.documents import (classify_documents,
                                    extract_chunks_from_documents)
from cognee.tasks.graph import extract_graph_from_code
from cognee.tasks.graph import extract_graph_from_data
from cognee.tasks.ingestion import ingest_data_with_metadata
from cognee.tasks.repo_processor import (enrich_dependency_graph,
expand_dependency_graph,
get_data_list_for_user,
get_non_code_files,
get_repo_file_dependencies)
from cognee.tasks.storage import add_data_points

from cognee.base_config import get_base_config
from cognee.shared.data_models import MonitoringTool

monitoring = get_base_config().monitoring_tool
if monitoring == MonitoringTool.LANGFUSE:
from langfuse.decorators import observe

from cognee.tasks.summarization import summarize_code

from cognee.tasks.summarization import summarize_code, summarize_text

logger = logging.getLogger("code_graph_pipeline")

update_status_lock = asyncio.Lock()

async def code_graph_pipeline(datasets: Union[str, list[str]] = None, user: User = None):
if user is None:
user = await get_default_user()

existing_datasets = await get_datasets(user.id)

if datasets is None or len(datasets) == 0:
# If no datasets are provided, cognify all existing datasets.
datasets = existing_datasets

if type(datasets[0]) == str:
datasets = await get_datasets_by_name(datasets, user.id)

existing_datasets_map = {
generate_dataset_name(dataset.name): True for dataset in existing_datasets
}

awaitables = []

for dataset in datasets:
dataset_name = generate_dataset_name(dataset.name)

if dataset_name in existing_datasets_map:
awaitables.append(run_pipeline(dataset, user))

return await asyncio.gather(*awaitables)

@observe
async def run_pipeline(dataset: Dataset, user: User):
'''DEPRECATED: Use `run_code_graph_pipeline` instead. This function will be removed.'''
data_documents: list[Data] = await get_dataset_data(dataset_id = dataset.id)

document_ids_str = [str(document.id) for document in data_documents]

dataset_id = dataset.id
dataset_name = generate_dataset_name(dataset.name)

send_telemetry("code_graph_pipeline EXECUTION STARTED", user.id)

async with update_status_lock:
task_status = await get_pipeline_status([dataset_id])

if dataset_id in task_status and task_status[dataset_id] == PipelineRunStatus.DATASET_PROCESSING_STARTED:
logger.info("Dataset %s is already being processed.", dataset_name)
return

await log_pipeline_status(dataset_id, PipelineRunStatus.DATASET_PROCESSING_STARTED, {
"dataset_name": dataset_name,
"files": document_ids_str,
})
try:
tasks = [
Task(classify_documents),
Task(check_permissions_on_documents, user = user, permissions = ["write"]),
Task(extract_chunks_from_documents), # Extract text chunks based on the document type.
Task(add_data_points, task_config = { "batch_size": 10 }),
Task(extract_graph_from_code, graph_model = SourceCodeGraph, task_config = { "batch_size": 10 }), # Generate knowledge graphs from the document chunks.
]

pipeline = run_tasks(tasks, data_documents, "code_graph_pipeline")

async for result in pipeline:
print(result)

send_telemetry("code_graph_pipeline EXECUTION COMPLETED", user.id)

await log_pipeline_status(dataset_id, PipelineRunStatus.DATASET_PROCESSING_COMPLETED, {
"dataset_name": dataset_name,
"files": document_ids_str,
})
except Exception as error:
send_telemetry("code_graph_pipeline EXECUTION ERRORED", user.id)

await log_pipeline_status(dataset_id, PipelineRunStatus.DATASET_PROCESSING_ERRORED, {
"dataset_name": dataset_name,
"files": document_ids_str,
})
raise error


def generate_dataset_name(dataset_name: str) -> str:
return dataset_name.replace(".", "_").replace(" ", "_")


async def run_code_graph_pipeline(repo_path):
async def run_code_graph_pipeline(repo_path, include_docs=True):
import os
import pathlib

import cognee
from cognee.infrastructure.databases.relational import create_db_and_tables

@@ -144,6 +46,9 @@ async def run_code_graph_pipeline(repo_path):
await cognee.prune.prune_system(metadata=True)
await create_db_and_tables()

cognee_config = get_cognify_config()
user = await get_default_user()

tasks = [
Task(get_repo_file_dependencies),
Task(enrich_dependency_graph, task_config={"batch_size": 50}),
@@ -152,4 +57,24 @@ async def run_code_graph_pipeline(repo_path):
Task(add_data_points, task_config={"batch_size": 50}),
]

return run_tasks(tasks, repo_path, "cognify_code_pipeline")
if include_docs:
non_code_tasks = [
Task(get_non_code_files, task_config={"batch_size": 50}),
Task(ingest_data_with_metadata, dataset_name="repo_docs", user=user),
Task(get_data_list_for_user, dataset_name="repo_docs", user=user),
Task(classify_documents),
Task(extract_chunks_from_documents),
Task(extract_graph_from_data, graph_model=KnowledgeGraph, task_config={"batch_size": 50}),
Task(
summarize_text,
summarization_model=cognee_config.summarization_model,
task_config={"batch_size": 50}
),
]

if include_docs:
async for result in run_tasks(non_code_tasks, repo_path):
yield result

async for result in run_tasks(tasks, repo_path, "cognify_code_pipeline"):
yield result
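
With this change, run_code_graph_pipeline becomes an async generator that yields pipeline results instead of returning a coroutine, so callers iterate over it directly. A minimal usage sketch of the new signature (the repository path below is a placeholder, not part of this commit):

import asyncio

from cognee.api.v1.cognify.code_graph_pipeline import run_code_graph_pipeline


async def main():
    # include_docs=True also runs the non-code tasks (ingest, classify, chunk,
    # graph and summarize documentation files) before the code pipeline itself.
    async for result in run_code_graph_pipeline("/path/to/some/repo", include_docs=True):
        print(result)


asyncio.run(main())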
1 change: 1 addition & 0 deletions cognee/tasks/repo_processor/__init__.py
@@ -4,4 +4,5 @@

from .enrich_dependency_graph import enrich_dependency_graph
from .expand_dependency_graph import expand_dependency_graph
from .get_non_code_files import get_data_list_for_user, get_non_py_files
from .get_repo_file_dependencies import get_repo_file_dependencies
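
Assuming the re-export above, both new helpers should now be importable straight from the package rather than from the submodule:

from cognee.tasks.repo_processor import get_data_list_for_user, get_non_py_files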
48 changes: 48 additions & 0 deletions cognee/tasks/repo_processor/get_non_code_files.py
@@ -0,0 +1,48 @@
import os

import aiofiles

import cognee.modules.ingestion as ingestion
from cognee.infrastructure.engine import DataPoint
from cognee.modules.data.methods import get_datasets
from cognee.modules.data.methods.get_dataset_data import get_dataset_data
from cognee.modules.data.methods.get_datasets_by_name import \
get_datasets_by_name
from cognee.modules.data.models import Data
from cognee.modules.data.operations.write_metadata import write_metadata
from cognee.modules.ingestion.data_types import BinaryData
from cognee.modules.users.methods import get_default_user
from cognee.shared.CodeGraphEntities import Repository


async def get_non_py_files(repo_path):
"""Get files that are not .py files and their contents"""
if not os.path.exists(repo_path):
return {}

IGNORED_PATTERNS = {
'.git', '__pycache__', '*.pyc', '*.pyo', '*.pyd',
'node_modules', '*.egg-info'
}

def should_process(path):
return not any(pattern in path for pattern in IGNORED_PATTERNS)

non_py_files_paths = [
os.path.join(root, file)
for root, _, files in os.walk(repo_path) for file in files
if not file.endswith(".py") and should_process(os.path.join(root, file))
]
return non_py_files_paths


async def get_data_list_for_user(_, dataset_name, user):
# Note: This function is meant to be used as a Task in a pipeline.
# By the nature of pipelines, the output of the previous Task is passed in as the first argument,
# but it is not needed here, hence the "_" parameter.
datasets = await get_datasets_by_name(dataset_name, user.id)
data_documents: list[Data] = []
for dataset in datasets:
data_docs: list[Data] = await get_dataset_data(dataset_id=dataset.id)
data_documents.extend(data_docs)
return data_documents
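
The "_" parameter reflects how these helpers are chained inside run_tasks: each Task receives the previous Task's output as its first positional argument, even when it ignores it. A stand-alone sketch of that convention, using stand-in functions rather than the actual cognee internals:

import asyncio


async def collect_non_py_files(repo_path):
    # Stand-in for get_non_py_files: returns the non-.py file paths of a repository.
    return [f"{repo_path}/README.md", f"{repo_path}/docs/setup.md"]


async def load_dataset_data(_, dataset_name, user):
    # Stand-in for get_data_list_for_user: the previous Task's output arrives as "_"
    # and is ignored; the dataset's data rows for the user are loaded instead.
    return [f"{dataset_name}:{user}:document-{i}" for i in range(2)]


async def run_chain(repo_path):
    # Each step's result is passed as the first positional argument of the next step.
    paths = await collect_non_py_files(repo_path)
    return await load_dataset_data(paths, dataset_name="repo_docs", user="default_user")


print(asyncio.run(run_chain("/path/to/some/repo")))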
38 changes: 0 additions & 38 deletions cognee/tests/test_code_generation.py

This file was deleted.

11 changes: 6 additions & 5 deletions examples/python/code_graph_example.py
@@ -1,15 +1,16 @@
import argparse
import asyncio

from cognee.api.v1.cognify.code_graph_pipeline import run_code_graph_pipeline


async def main(repo_path):
async for result in await run_code_graph_pipeline(repo_path):
async def main(repo_path, include_docs):
async for result in run_code_graph_pipeline(repo_path, include_docs):
print(result)

if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--repo-path", type=str, required=True, help="Path to the repository")
parser.add_argument("--repo_path", type=str, required=True, help="Path to the repository")
parser.add_argument("--include_docs", type=bool, default=True, help="Whether or not to process non-code files")
args = parser.parse_args()
asyncio.run(main(args.repo_path))

asyncio.run(main(args.repo_path, args.include_docs))
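
One caveat with the new flag: argparse applies type=bool to the raw command-line string, and bool() of any non-empty string is True, so --include_docs False would still evaluate as True. A common workaround (not part of this commit) is an explicit string-to-bool converter:

import argparse


def str2bool(value: str) -> bool:
    # argparse passes the raw CLI string here, so map the usual spellings explicitly.
    if value.lower() in ("true", "1", "yes"):
        return True
    if value.lower() in ("false", "0", "no"):
        return False
    raise argparse.ArgumentTypeError(f"expected a boolean, got {value!r}")


parser = argparse.ArgumentParser()
parser.add_argument("--include_docs", type=str2bool, default=True,
                    help="Whether or not to process non-code files")

args = parser.parse_args(["--include_docs", "false"])
print(args.include_docs)  # False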
48 changes: 47 additions & 1 deletion notebooks/cognee_code_graph_demo.ipynb
@@ -104,12 +104,58 @@
"await render_graph(None, include_nodes = True, include_labels = True)"
]
},
{
"metadata": {},
"cell_type": "markdown",
"source": "# Let's check the evaluations"
},
{
"metadata": {},
"cell_type": "code",
"outputs": [],
"execution_count": null,
"source": [
"from evals.eval_on_hotpot import eval_on_hotpotQA\n",
"from evals.eval_on_hotpot import answer_with_cognee\n",
"from evals.eval_on_hotpot import answer_without_cognee\n",
"from evals.eval_on_hotpot import eval_answers\n",
"from cognee.base_config import get_base_config\n",
"from pathlib import Path\n",
"from tqdm import tqdm\n",
"import wget\n",
"import json\n",
"import statistics"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
"source": [
"answer_provider = answer_with_cognee # For native LLM answers use answer_without_cognee\n",
"num_samples = 10 # With cognee, it takes ~1m10s per sample\n",
"\n",
"base_config = get_base_config()\n",
"data_root_dir = base_config.data_root_directory\n",
"\n",
"if not Path(data_root_dir).exists():\n",
" Path(data_root_dir).mkdir()\n",
"\n",
"filepath = data_root_dir / Path(\"hotpot_dev_fullwiki_v1.json\")\n",
"if not filepath.exists():\n",
" url = 'http://curtis.ml.cmu.edu/datasets/hotpot/hotpot_dev_fullwiki_v1.json'\n",
" wget.download(url, out=data_root_dir)\n",
"\n",
"with open(filepath, \"r\") as file:\n",
" dataset = json.load(file)\n",
"\n",
"instances = dataset if not num_samples else dataset[:num_samples]\n",
"answers = []\n",
"for instance in tqdm(instances, desc=\"Getting answers\"):\n",
" answer = answer_provider(instance)\n",
" answers.append(answer)"
]
}
],
"metadata": {
