Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/dev' into feat/cognee-mcp
Browse files Browse the repository at this point in the history
  • Loading branch information
borisarzentar committed Dec 23, 2024
2 parents 75ffe25 + b73a35f commit 29de5a3
Show file tree
Hide file tree
Showing 9 changed files with 113 additions and 100 deletions.
87 changes: 4 additions & 83 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,93 +193,14 @@ if __name__ == '__main__':
When you run this script, you will see step-by-step messages in the console that help you trace the execution flow and understand what the script is doing at each stage.
A version of this example is here: `examples/python/simple_example.py`

### Create your own memory store
### Understand our architecture

cognee framework consists of tasks that can be grouped into pipelines.
Each task can be an independent part of business logic, that can be tied to other tasks to form a pipeline.
These tasks persist data into your memory store enabling you to search for relevant context of past conversations, documents, or any other data you have stored.


### Example: Classify your documents

Here is an example of how it looks for a default cognify pipeline:

1. To prepare the data for the pipeline run, first we need to add it to our metastore and normalize it:

Start with:
```
text = """Natural language processing (NLP) is an interdisciplinary
subfield of computer science and information retrieval"""
await cognee.add(text) # Add a new piece of information
```

2. In the next step we make a task. The task can be any business logic we need, but the important part is that it should be encapsulated in one function.

Here we show an example of creating a naive LLM classifier that takes a Pydantic model and then stores the data in both the graph and vector stores after analyzing each chunk.
We provided just a snippet for reference, but feel free to check out the implementation in our repo.

```
async def chunk_naive_llm_classifier(
data_chunks: list[DocumentChunk],
classification_model: Type[BaseModel]
):
# Extract classifications asynchronously
chunk_classifications = await asyncio.gather(
*(extract_categories(chunk.text, classification_model) for chunk in data_chunks)
)
# Collect classification data points using a set to avoid duplicates
classification_data_points = {
uuid5(NAMESPACE_OID, cls.label.type)
for cls in chunk_classifications
} | {
uuid5(NAMESPACE_OID, subclass.value)
for cls in chunk_classifications
for subclass in cls.label.subclass
}
vector_engine = get_vector_engine()
collection_name = "classification"
# Define the payload schema
class Keyword(BaseModel):
uuid: str
text: str
chunk_id: str
document_id: str
# Ensure the collection exists and retrieve existing data points
if not await vector_engine.has_collection(collection_name):
await vector_engine.create_collection(collection_name, payload_schema=Keyword)
existing_points_map = {}
else:
existing_points_map = {}
return data_chunks
...
```

We have many tasks that can be used in your pipelines, and you can also create your tasks to fit your business logic.


3. Once we have our tasks, it is time to group them into a pipeline.
This simplified snippet demonstrates how tasks can be added to a pipeline, and how they can pass the information forward from one to another.

```
Task(
chunk_naive_llm_classifier,
classification_model = cognee_config.classification_model,
)
pipeline = run_tasks(tasks, documents)
```

To see the working code, check cognee.api.v1.cognify default pipeline in our repo.
<div align="center">
<img src="assets/cognee_diagram.png" alt="cognee concept diagram" width="50%" />
</div>


## Vector retrieval, Graphs and LLMs
Expand Down
Binary file removed assets/architecture.png
Binary file not shown.
Binary file added assets/cognee_diagram.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
43 changes: 37 additions & 6 deletions cognee/api/v1/cognify/code_graph_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,37 @@
import logging
from pathlib import Path

from cognee.base_config import get_base_config
from cognee.modules.cognify.config import get_cognify_config
from cognee.modules.pipelines import run_tasks
from cognee.modules.pipelines.tasks.Task import Task
from cognee.modules.users.methods import get_default_user
from cognee.shared.data_models import KnowledgeGraph, MonitoringTool
from cognee.tasks.documents import (classify_documents,
extract_chunks_from_documents)
from cognee.tasks.graph import extract_graph_from_data
from cognee.tasks.ingestion import ingest_data_with_metadata
from cognee.tasks.repo_processor import (enrich_dependency_graph,
expand_dependency_graph,
get_data_list_for_user,
get_non_code_files,
get_repo_file_dependencies)
from cognee.tasks.storage import add_data_points

from cognee.base_config import get_base_config
from cognee.shared.data_models import MonitoringTool

monitoring = get_base_config().monitoring_tool
if monitoring == MonitoringTool.LANGFUSE:
from langfuse.decorators import observe

from cognee.tasks.summarization import summarize_code
from cognee.tasks.summarization import summarize_code, summarize_text

logger = logging.getLogger("code_graph_pipeline")
update_status_lock = asyncio.Lock()

@observe
async def run_code_graph_pipeline(repo_path):
async def run_code_graph_pipeline(repo_path, include_docs=True):
import os
import pathlib

import cognee
from cognee.infrastructure.databases.relational import create_db_and_tables

Expand All @@ -38,6 +46,9 @@ async def run_code_graph_pipeline(repo_path):
await cognee.prune.prune_system(metadata=True)
await create_db_and_tables()

cognee_config = get_cognify_config()
user = await get_default_user()

tasks = [
Task(get_repo_file_dependencies),
Task(enrich_dependency_graph, task_config={"batch_size": 50}),
Expand All @@ -46,4 +57,24 @@ async def run_code_graph_pipeline(repo_path):
Task(add_data_points, task_config={"batch_size": 50}),
]

return run_tasks(tasks, repo_path, "cognify_code_pipeline")
if include_docs:
non_code_tasks = [
Task(get_non_code_files, task_config={"batch_size": 50}),
Task(ingest_data_with_metadata, dataset_name="repo_docs", user=user),
Task(get_data_list_for_user, dataset_name="repo_docs", user=user),
Task(classify_documents),
Task(extract_chunks_from_documents),
Task(extract_graph_from_data, graph_model=KnowledgeGraph, task_config={"batch_size": 50}),
Task(
summarize_text,
summarization_model=cognee_config.summarization_model,
task_config={"batch_size": 50}
),
]

if include_docs:
async for result in run_tasks(non_code_tasks, repo_path):
yield result

async for result in run_tasks(tasks, repo_path, "cognify_code_pipeline"):
yield result
5 changes: 3 additions & 2 deletions cognee/infrastructure/llm/openai/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
from cognee.infrastructure.llm.prompts import read_query_prompt
from cognee.base_config import get_base_config

if MonitoringTool.LANGFUSE:
monitoring = get_base_config().monitoring_tool
if monitoring == MonitoringTool.LANGFUSE:
from langfuse.decorators import observe

class OpenAIAdapter(LLMInterface):
Expand Down Expand Up @@ -43,7 +44,7 @@ def __init__(
base_config = get_base_config()


@observe()
@observe(as_type='generation')
async def acreate_structured_output(self, text_input: str, system_prompt: str,
response_model: Type[BaseModel]) -> BaseModel:

Expand Down
18 changes: 14 additions & 4 deletions cognee/modules/data/extraction/extract_summary.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,25 @@
from typing import Type
import logging
import os
from typing import Type

from instructor.exceptions import InstructorRetryException
from pydantic import BaseModel
from tenacity import RetryError

from cognee.infrastructure.llm.get_llm_client import get_llm_client
from cognee.infrastructure.llm.prompts import read_query_prompt
from cognee.shared.data_models import SummarizedCode, SummarizedClass, SummarizedFunction
from cognee.shared.data_models import SummarizedCode
from cognee.tasks.summarization.mock_summary import get_mock_summarized_code

logger = logging.getLogger("extract_summary")

async def extract_summary(content: str, response_model: Type[BaseModel]):
llm_client = get_llm_client()

system_prompt = read_query_prompt("summarize_content.txt")

llm_output = await llm_client.acreate_structured_output(content, system_prompt, response_model)

return llm_output

async def extract_code_summary(content: str):
Expand All @@ -27,5 +32,10 @@ async def extract_code_summary(content: str):
result = get_mock_summarized_code()
return result
else:
result = await extract_summary(content, response_model=SummarizedCode)
try:
result = await extract_summary(content, response_model=SummarizedCode)
except (RetryError, InstructorRetryException) as e:
logger.error("Failed to extract code summary, falling back to mock summary", exc_info=e)
result = get_mock_summarized_code()

return result
1 change: 1 addition & 0 deletions cognee/tasks/repo_processor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@

from .enrich_dependency_graph import enrich_dependency_graph
from .expand_dependency_graph import expand_dependency_graph
from .get_non_code_files import get_data_list_for_user, get_non_py_files
from .get_repo_file_dependencies import get_repo_file_dependencies
48 changes: 48 additions & 0 deletions cognee/tasks/repo_processor/get_non_code_files.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import os

import aiofiles

import cognee.modules.ingestion as ingestion
from cognee.infrastructure.engine import DataPoint
from cognee.modules.data.methods import get_datasets
from cognee.modules.data.methods.get_dataset_data import get_dataset_data
from cognee.modules.data.methods.get_datasets_by_name import \
get_datasets_by_name
from cognee.modules.data.models import Data
from cognee.modules.data.operations.write_metadata import write_metadata
from cognee.modules.ingestion.data_types import BinaryData
from cognee.modules.users.methods import get_default_user
from cognee.shared.CodeGraphEntities import Repository


async def get_non_py_files(repo_path):
"""Get files that are not .py files and their contents"""
if not os.path.exists(repo_path):
return {}

IGNORED_PATTERNS = {
'.git', '__pycache__', '*.pyc', '*.pyo', '*.pyd',
'node_modules', '*.egg-info'
}

def should_process(path):
return not any(pattern in path for pattern in IGNORED_PATTERNS)

non_py_files_paths = [
os.path.join(root, file)
for root, _, files in os.walk(repo_path) for file in files
if not file.endswith(".py") and should_process(os.path.join(root, file))
]
return non_py_files_paths


async def get_data_list_for_user(_, dataset_name, user):
# Note: This method is meant to be used as a Task in a pipeline.
# By the nature of pipelines, the output of the previous Task will be passed as the first argument here,
# but it is not needed here, hence the "_" input.
datasets = await get_datasets_by_name(dataset_name, user.id)
data_documents: list[Data] = []
for dataset in datasets:
data_docs: list[Data] = await get_dataset_data(dataset_id=dataset.id)
data_documents.extend(data_docs)
return data_documents
11 changes: 6 additions & 5 deletions examples/python/code_graph_example.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
import argparse
import asyncio

from cognee.api.v1.cognify.code_graph_pipeline import run_code_graph_pipeline


async def main(repo_path):
async for result in await run_code_graph_pipeline(repo_path):
async def main(repo_path, include_docs):
async for result in run_code_graph_pipeline(repo_path, include_docs):
print(result)

if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--repo-path", type=str, required=True, help="Path to the repository")
parser.add_argument("--repo_path", type=str, required=True, help="Path to the repository")
parser.add_argument("--include_docs", type=bool, default=True, help="Whether or not to process non-code files")
args = parser.parse_args()
asyncio.run(main(args.repo_path))

asyncio.run(main(args.repo_path, args.include_docs))

0 comments on commit 29de5a3

Please sign in to comment.