Skip to content

Commit

Permalink
Merge pull request #414 from topoteretes/COG-949
Browse files Browse the repository at this point in the history
Code graph pipeline improvements and fixes
  • Loading branch information
Vasilije1990 authored Jan 10, 2025
2 parents 892666c + 5839ab0 commit f7e808e
Show file tree
Hide file tree
Showing 18 changed files with 255 additions and 54 deletions.
11 changes: 2 additions & 9 deletions cognee/api/v1/cognify/code_graph_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from pathlib import Path

from cognee.base_config import get_base_config
from cognee.infrastructure.databases.vector.embeddings import get_embedding_engine
from cognee.modules.cognify.config import get_cognify_config
from cognee.modules.pipelines import run_tasks
from cognee.modules.pipelines.tasks.Task import Task
Expand Down Expand Up @@ -54,20 +53,14 @@ async def run_code_graph_pipeline(repo_path, include_docs=True):
await cognee.prune.prune_system(metadata=True)
await create_db_and_tables()

embedding_engine = get_embedding_engine()

cognee_config = get_cognify_config()
user = await get_default_user()

tasks = [
Task(get_repo_file_dependencies),
Task(enrich_dependency_graph),
Task(expand_dependency_graph, task_config={"batch_size": 50}),
Task(
get_source_code_chunks,
embedding_model=embedding_engine.model,
task_config={"batch_size": 50},
),
Task(get_source_code_chunks, task_config={"batch_size": 50}),
Task(summarize_code, task_config={"batch_size": 50}),
Task(add_data_points, task_config={"batch_size": 50}),
]
Expand All @@ -78,7 +71,7 @@ async def run_code_graph_pipeline(repo_path, include_docs=True):
Task(ingest_data_with_metadata, dataset_name="repo_docs", user=user),
Task(get_data_list_for_user, dataset_name="repo_docs", user=user),
Task(classify_documents),
Task(extract_chunks_from_documents),
Task(extract_chunks_from_documents, max_tokens=cognee_config.max_tokens),
Task(
extract_graph_from_data, graph_model=KnowledgeGraph, task_config={"batch_size": 50}
),
Expand Down
24 changes: 20 additions & 4 deletions cognee/modules/chunking/TextChunker.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,47 @@
from uuid import uuid5, NAMESPACE_OID
from typing import Optional
from uuid import NAMESPACE_OID, uuid5

from .models.DocumentChunk import DocumentChunk
from cognee.tasks.chunks import chunk_by_paragraph

from .models.DocumentChunk import DocumentChunk


class TextChunker:
document = None
max_chunk_size: int

chunk_index = 0
chunk_size = 0
token_count = 0

def __init__(self, document, get_text: callable, chunk_size: int = 1024):
def __init__(
self, document, get_text: callable, max_tokens: Optional[int] = None, chunk_size: int = 1024
):
self.document = document
self.max_chunk_size = chunk_size
self.get_text = get_text
self.max_tokens = max_tokens if max_tokens else float("inf")

def check_word_count_and_token_count(self, word_count_before, token_count_before, chunk_data):
word_count_fits = word_count_before + chunk_data["word_count"] <= self.max_chunk_size
token_count_fits = token_count_before + chunk_data["token_count"] <= self.max_tokens
return word_count_fits and token_count_fits

def read(self):
paragraph_chunks = []
for content_text in self.get_text():
for chunk_data in chunk_by_paragraph(
content_text,
self.max_tokens,
self.max_chunk_size,
batch_paragraphs=True,
):
if self.chunk_size + chunk_data["word_count"] <= self.max_chunk_size:
if self.check_word_count_and_token_count(
self.chunk_size, self.token_count, chunk_data
):
paragraph_chunks.append(chunk_data)
self.chunk_size += chunk_data["word_count"]
self.token_count += chunk_data["token_count"]
else:
if len(paragraph_chunks) == 0:
yield DocumentChunk(
Expand Down Expand Up @@ -66,6 +81,7 @@ def read(self):
print(e)
paragraph_chunks = [chunk_data]
self.chunk_size = chunk_data["word_count"]
self.token_count = chunk_data["token_count"]

self.chunk_index += 1

Expand Down
4 changes: 3 additions & 1 deletion cognee/modules/cognify/config.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
from functools import lru_cache
from pydantic_settings import BaseSettings, SettingsConfigDict
from cognee.shared.data_models import DefaultContentPrediction, SummarizedContent
from typing import Optional
import os


class CognifyConfig(BaseSettings):
classification_model: object = DefaultContentPrediction
summarization_model: object = SummarizedContent

max_tokens: Optional[int] = os.getenv("MAX_TOKENS")
model_config = SettingsConfigDict(env_file=".env", extra="allow")

def to_dict(self) -> dict:
Expand Down
11 changes: 8 additions & 3 deletions cognee/modules/data/processing/document_types/AudioDocument.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from typing import Optional

from cognee.infrastructure.llm.get_llm_client import get_llm_client
from .Document import Document

from .ChunkerMapping import ChunkerConfig
from .Document import Document


class AudioDocument(Document):
Expand All @@ -10,12 +13,14 @@ def create_transcript(self):
result = get_llm_client().create_transcript(self.raw_data_location)
return result.text

def read(self, chunk_size: int, chunker: str):
def read(self, chunk_size: int, chunker: str, max_tokens: Optional[int] = None):
# Transcribe the audio file

text = self.create_transcript()

chunker_func = ChunkerConfig.get_chunker(chunker)
chunker = chunker_func(self, chunk_size=chunk_size, get_text=lambda: [text])
chunker = chunker_func(
self, chunk_size=chunk_size, get_text=lambda: [text], max_tokens=max_tokens
)

yield from chunker.read()
3 changes: 2 additions & 1 deletion cognee/modules/data/processing/document_types/Document.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from typing import Optional
from uuid import UUID

from cognee.infrastructure.engine import DataPoint
Expand All @@ -10,5 +11,5 @@ class Document(DataPoint):
mime_type: str
_metadata: dict = {"index_fields": ["name"], "type": "Document"}

def read(self, chunk_size: int, chunker=str) -> str:
def read(self, chunk_size: int, chunker=str, max_tokens: Optional[int] = None) -> str:
pass
11 changes: 8 additions & 3 deletions cognee/modules/data/processing/document_types/ImageDocument.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from typing import Optional

from cognee.infrastructure.llm.get_llm_client import get_llm_client
from .Document import Document

from .ChunkerMapping import ChunkerConfig
from .Document import Document


class ImageDocument(Document):
Expand All @@ -10,11 +13,13 @@ def transcribe_image(self):
result = get_llm_client().transcribe_image(self.raw_data_location)
return result.choices[0].message.content

def read(self, chunk_size: int, chunker: str):
def read(self, chunk_size: int, chunker: str, max_tokens: Optional[int] = None):
# Transcribe the image file
text = self.transcribe_image()

chunker_func = ChunkerConfig.get_chunker(chunker)
chunker = chunker_func(self, chunk_size=chunk_size, get_text=lambda: [text])
chunker = chunker_func(
self, chunk_size=chunk_size, get_text=lambda: [text], max_tokens=max_tokens
)

yield from chunker.read()
11 changes: 8 additions & 3 deletions cognee/modules/data/processing/document_types/PdfDocument.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
from typing import Optional

from pypdf import PdfReader
from .Document import Document

from .ChunkerMapping import ChunkerConfig
from .Document import Document


class PdfDocument(Document):
type: str = "pdf"

def read(self, chunk_size: int, chunker: str):
def read(self, chunk_size: int, chunker: str, max_tokens: Optional[int] = None):
file = PdfReader(self.raw_data_location)

def get_text():
Expand All @@ -15,7 +18,9 @@ def get_text():
yield page_text

chunker_func = ChunkerConfig.get_chunker(chunker)
chunker = chunker_func(self, chunk_size=chunk_size, get_text=get_text)
chunker = chunker_func(
self, chunk_size=chunk_size, get_text=get_text, max_tokens=max_tokens
)

yield from chunker.read()

Expand Down
10 changes: 7 additions & 3 deletions cognee/modules/data/processing/document_types/TextDocument.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from .Document import Document
from typing import Optional

from .ChunkerMapping import ChunkerConfig
from .Document import Document


class TextDocument(Document):
type: str = "text"

def read(self, chunk_size: int, chunker: str):
def read(self, chunk_size: int, chunker: str, max_tokens: Optional[int] = None):
def get_text():
with open(self.raw_data_location, mode="r", encoding="utf-8") as file:
while True:
Expand All @@ -18,6 +20,8 @@ def get_text():

chunker_func = ChunkerConfig.get_chunker(chunker)

chunker = chunker_func(self, chunk_size=chunk_size, get_text=get_text)
chunker = chunker_func(
self, chunk_size=chunk_size, get_text=get_text, max_tokens=max_tokens
)

yield from chunker.read()
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
from io import StringIO
from typing import Optional

from cognee.modules.chunking.TextChunker import TextChunker
from .Document import Document
from cognee.modules.data.exceptions import UnstructuredLibraryImportError

from .Document import Document


class UnstructuredDocument(Document):
type: str = "unstructured"

def read(self, chunk_size: int):
def read(self, chunk_size: int, chunker: str, max_tokens: Optional[int] = None) -> str:
def get_text():
try:
from unstructured.partition.auto import partition
Expand All @@ -27,6 +29,6 @@ def get_text():

yield text

chunker = TextChunker(self, chunk_size=chunk_size, get_text=get_text)
chunker = TextChunker(self, chunk_size=chunk_size, get_text=get_text, max_tokens=max_tokens)

yield from chunker.read()
36 changes: 32 additions & 4 deletions cognee/tasks/chunks/chunk_by_paragraph.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
from uuid import uuid5, NAMESPACE_OID
from typing import Dict, Any, Iterator
from typing import Any, Dict, Iterator, Optional, Union
from uuid import NAMESPACE_OID, uuid5

import tiktoken

from cognee.infrastructure.databases.vector import get_vector_engine

from .chunk_by_sentence import chunk_by_sentence


def chunk_by_paragraph(
data: str, paragraph_length: int = 1024, batch_paragraphs: bool = True
data: str,
max_tokens: Optional[Union[int, float]] = None,
paragraph_length: int = 1024,
batch_paragraphs: bool = True,
) -> Iterator[Dict[str, Any]]:
"""
Chunks text by paragraph while preserving exact text reconstruction capability.
Expand All @@ -15,16 +23,31 @@ def chunk_by_paragraph(
chunk_index = 0
paragraph_ids = []
last_cut_type = None
current_token_count = 0
if not max_tokens:
max_tokens = float("inf")

vector_engine = get_vector_engine()
embedding_model = vector_engine.embedding_engine.model
embedding_model = embedding_model.split("/")[-1]

for paragraph_id, sentence, word_count, end_type in chunk_by_sentence(
data, maximum_length=paragraph_length
):
# Check if this sentence would exceed length limit
if current_word_count > 0 and current_word_count + word_count > paragraph_length:

tokenizer = tiktoken.encoding_for_model(embedding_model)
token_count = len(tokenizer.encode(sentence))

if current_word_count > 0 and (
current_word_count + word_count > paragraph_length
or current_token_count + token_count > max_tokens
):
# Yield current chunk
chunk_dict = {
"text": current_chunk,
"word_count": current_word_count,
"token_count": current_token_count,
"chunk_id": uuid5(NAMESPACE_OID, current_chunk),
"paragraph_ids": paragraph_ids,
"chunk_index": chunk_index,
Expand All @@ -37,18 +60,21 @@ def chunk_by_paragraph(
paragraph_ids = []
current_chunk = ""
current_word_count = 0
current_token_count = 0
chunk_index += 1

paragraph_ids.append(paragraph_id)
current_chunk += sentence
current_word_count += word_count
current_token_count += token_count

# Handle end of paragraph
if end_type in ("paragraph_end", "sentence_cut") and not batch_paragraphs:
# For non-batch mode, yield each paragraph separately
chunk_dict = {
"text": current_chunk,
"word_count": current_word_count,
"token_count": current_token_count,
"paragraph_ids": paragraph_ids,
"chunk_id": uuid5(NAMESPACE_OID, current_chunk),
"chunk_index": chunk_index,
Expand All @@ -58,6 +84,7 @@ def chunk_by_paragraph(
paragraph_ids = []
current_chunk = ""
current_word_count = 0
current_token_count = 0
chunk_index += 1

last_cut_type = end_type
Expand All @@ -67,6 +94,7 @@ def chunk_by_paragraph(
chunk_dict = {
"text": current_chunk,
"word_count": current_word_count,
"token_count": current_token_count,
"chunk_id": uuid5(NAMESPACE_OID, current_chunk),
"paragraph_ids": paragraph_ids,
"chunk_index": chunk_index,
Expand Down
11 changes: 9 additions & 2 deletions cognee/tasks/documents/extract_chunks_from_documents.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
from typing import Optional

from cognee.modules.data.processing.document_types.Document import Document


async def extract_chunks_from_documents(
documents: list[Document], chunk_size: int = 1024, chunker="text_chunker"
documents: list[Document],
chunk_size: int = 1024,
chunker="text_chunker",
max_tokens: Optional[int] = None,
):
for document in documents:
for document_chunk in document.read(chunk_size=chunk_size, chunker=chunker):
for document_chunk in document.read(
chunk_size=chunk_size, chunker=chunker, max_tokens=max_tokens
):
yield document_chunk
Loading

0 comments on commit f7e808e

Please sign in to comment.