Feat/cog 184 add falkordb #192

Merged
merged 26 commits on Nov 11, 2024
Changes from 15 commits
Commits (26)
c901fa8
feat: add falkordb adapter
borisarzentar Oct 24, 2024
62022a8
Merge remote-tracking branch 'origin/main' into feat/COG-184-add-falk…
borisarzentar Oct 29, 2024
14e2c7e
feat: add FalkorDB integration
borisarzentar Nov 7, 2024
758698a
Merge remote-tracking branch 'origin/main' into feat/COG-184-add-falk…
borisarzentar Nov 7, 2024
897bbac
fix: serialize UUID in pgvector data point payload
borisarzentar Nov 7, 2024
f569088
fix: add summaries to the graph
borisarzentar Nov 7, 2024
cf5b337
Merge remote-tracking branch 'origin/main' into feat/COG-184-add-falk…
borisarzentar Nov 7, 2024
c890636
fix: remove unused import
borisarzentar Nov 7, 2024
9e10c61
fix: resolves pg asyncpg UUID to UUID
hajdul88 Nov 7, 2024
19d62f2
fix: add code graph generation pipeline
borisarzentar Nov 8, 2024
9579cc7
Merge remote-tracking branch 'origin/feat/COG-184-add-falkordb' into …
borisarzentar Nov 8, 2024
51a8305
Merge remote-tracking branch 'origin/main' into feat/COG-184-add-falk…
borisarzentar Nov 8, 2024
f20c838
Merge remote-tracking branch 'origin/feat/COG-184-add-falkordb' into …
borisarzentar Nov 11, 2024
e7e6107
fix: check "updated_at" in edge instead of node
borisarzentar Nov 11, 2024
d733bfd
fix: convert qdrant search results to ScoredPoint
borisarzentar Nov 11, 2024
9c4da23
fix: fix single data point addition to weaviate
borisarzentar Nov 11, 2024
4c19999
fix: convert UUID to str for neo4j query
borisarzentar Nov 11, 2024
08a8442
Merge remote-tracking branch 'origin/main' into feat/COG-184-add-falk…
borisarzentar Nov 11, 2024
39bc8d6
fix: change weaviate batch update to use dynamic batch
borisarzentar Nov 11, 2024
d2d819e
fix: unwrap connections in PGVectorAdapter
borisarzentar Nov 11, 2024
f8e35b3
fix: update poetry.lock
borisarzentar Nov 11, 2024
30edd2d
fix: add postgres extras to dependencies
borisarzentar Nov 11, 2024
44954c1
fix: update entities collection name in cognee_demo notebook
borisarzentar Nov 11, 2024
a3b3667
fix: rerun github workflow checks on push
borisarzentar Nov 11, 2024
ed9036a
fix: change entity collection name
borisarzentar Nov 11, 2024
fd6398d
fix: cognee_demo notebook search
borisarzentar Nov 11, 2024
24 changes: 17 additions & 7 deletions README.md
@@ -109,24 +109,34 @@ import asyncio
 from cognee.api.v1.search import SearchType
 
 async def main():
-    await cognee.prune.prune_data() # Reset cognee data
-    await cognee.prune.prune_system(metadata=True) # Reset cognee system state
+    # Reset cognee data
+    await cognee.prune.prune_data()
+    # Reset cognee system state
+    await cognee.prune.prune_system(metadata=True)
 
     text = """
     Natural language processing (NLP) is an interdisciplinary
     subfield of computer science and information retrieval.
     """
 
-    await cognee.add(text) # Add text to cognee
-    await cognee.cognify() # Use LLMs and cognee to create knowledge graph
+    # Add text to cognee
+    await cognee.add(text)
 
-    search_results = await cognee.search( # Search cognee for insights
+    # Use LLMs and cognee to create knowledge graph
+    await cognee.cognify()
+
+    # Search cognee for insights
+    search_results = await cognee.search(
         SearchType.INSIGHTS,
-        {'query': 'Tell me about NLP'}
+        "Tell me about NLP",
     )
 
-    for result_text in search_results: # Display results
+    # Display results
+    for result_text in search_results:
         print(result_text)
+        # natural_language_processing is_a field
+        # natural_language_processing is_subfield_of computer_science
+        # natural_language_processing is_subfield_of information_retrieval
 
 asyncio.run(main())
 ```
110 changes: 110 additions & 0 deletions cognee/api/v1/cognify/code_graph_pipeline.py
@@ -0,0 +1,110 @@
import asyncio
import logging
from typing import Union

from cognee.shared.SourceCodeGraph import SourceCodeGraph
from cognee.shared.utils import send_telemetry
from cognee.modules.data.models import Dataset, Data
from cognee.modules.data.methods.get_dataset_data import get_dataset_data
from cognee.modules.data.methods import get_datasets, get_datasets_by_name
from cognee.modules.pipelines.tasks.Task import Task
from cognee.modules.pipelines import run_tasks
from cognee.modules.users.models import User
from cognee.modules.users.methods import get_default_user
from cognee.modules.pipelines.models import PipelineRunStatus
from cognee.modules.pipelines.operations.get_pipeline_status import get_pipeline_status
from cognee.modules.pipelines.operations.log_pipeline_status import log_pipeline_status
from cognee.tasks.documents import classify_documents, check_permissions_on_documents, extract_chunks_from_documents
from cognee.tasks.graph import extract_graph_from_code
from cognee.tasks.storage import add_data_points

logger = logging.getLogger("code_graph_pipeline")

update_status_lock = asyncio.Lock()

class PermissionDeniedException(Exception):
    def __init__(self, message: str):
        self.message = message
        super().__init__(self.message)

async def code_graph_pipeline(datasets: Union[str, list[str]] = None, user: User = None):
    if user is None:
        user = await get_default_user()

    existing_datasets = await get_datasets(user.id)

    if datasets is None or len(datasets) == 0:
        # If no datasets are provided, cognify all existing datasets.
        datasets = existing_datasets

    if type(datasets[0]) == str:
        datasets = await get_datasets_by_name(datasets, user.id)
Comment on lines +36 to +41
⚠️ Potential issue

Ensure consistent handling of datasets and use isinstance() for type checking

The current implementation may not handle the datasets parameter correctly when it is a string. When datasets is a string, datasets[0] will be the first character of the string, which is likely not the intended behavior. Additionally, it's recommended to use isinstance() for type checking instead of comparing types directly.

Apply this diff to fix the issues:

     if datasets is None or len(datasets) == 0:
         # If no datasets are provided, cognify all existing datasets.
         datasets = existing_datasets

+    if isinstance(datasets, str):
+        datasets = [datasets]
+
-    if type(datasets[0]) == str:
+    if isinstance(datasets[0], str):
         datasets = await get_datasets_by_name(datasets, user.id)
🧰 Tools: 🪛 Ruff 40-40: Use is and is not for type comparisons, or isinstance() for isinstance checks (E721)
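As a quick illustration of the pitfall described above (not part of the PR; the dataset name below is made up):

```python
# Illustrative only: indexing a string yields its first character, so the
# unguarded datasets[0] check misbehaves when a bare string is passed in.
datasets = "my_dataset"          # hypothetical dataset name
print(datasets[0])               # prints "m", not a dataset name

# Suggested normalization: wrap a bare string in a list first, then use
# isinstance() for the element check.
if isinstance(datasets, str):
    datasets = [datasets]

if isinstance(datasets[0], str):
    print("resolving datasets by name:", datasets)
```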


    existing_datasets_map = {
        generate_dataset_name(dataset.name): True for dataset in existing_datasets
    }

    awaitables = []

    for dataset in datasets:
        dataset_name = generate_dataset_name(dataset.name)

        if dataset_name in existing_datasets_map:
            awaitables.append(run_pipeline(dataset, user))

    return await asyncio.gather(*awaitables)


async def run_pipeline(dataset: Dataset, user: User):
    data_documents: list[Data] = await get_dataset_data(dataset_id = dataset.id)

    document_ids_str = [str(document.id) for document in data_documents]

    dataset_id = dataset.id
    dataset_name = generate_dataset_name(dataset.name)

    send_telemetry("code_graph_pipeline EXECUTION STARTED", user.id)

    async with update_status_lock:
        task_status = await get_pipeline_status([dataset_id])

        if dataset_id in task_status and task_status[dataset_id] == PipelineRunStatus.DATASET_PROCESSING_STARTED:
            logger.info("Dataset %s is already being processed.", dataset_name)
            return

        await log_pipeline_status(dataset_id, PipelineRunStatus.DATASET_PROCESSING_STARTED, {
            "dataset_name": dataset_name,
            "files": document_ids_str,
        })
    try:
        tasks = [
            Task(classify_documents),
            Task(check_permissions_on_documents, user = user, permissions = ["write"]),
            Task(extract_chunks_from_documents), # Extract text chunks based on the document type.
            Task(add_data_points, task_config = { "batch_size": 10 }),
            Task(extract_graph_from_code, graph_model = SourceCodeGraph, task_config = { "batch_size": 10 }), # Generate knowledge graphs from the document chunks.
        ]

        pipeline = run_tasks(tasks, data_documents, "code_graph_pipeline")

        async for result in pipeline:
            print(result)

        send_telemetry("code_graph_pipeline EXECUTION COMPLETED", user.id)

        await log_pipeline_status(dataset_id, PipelineRunStatus.DATASET_PROCESSING_COMPLETED, {
            "dataset_name": dataset_name,
            "files": document_ids_str,
        })
    except Exception as error:
        send_telemetry("code_graph_pipeline EXECUTION ERRORED", user.id)

        await log_pipeline_status(dataset_id, PipelineRunStatus.DATASET_PROCESSING_ERRORED, {
            "dataset_name": dataset_name,
            "files": document_ids_str,
        })
        raise error
⚠️ Potential issue

Use raise without arguments to preserve traceback

When re-raising an exception inside an except block, use raise without specifying the exception to preserve the original traceback.

Apply this diff to fix the issue:

         await log_pipeline_status(dataset_id, PipelineRunStatus.DATASET_PROCESSING_ERRORED, {
             "dataset_name": dataset_name,
             "files": document_ids_str,
         })
-        raise error
+        raise

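A small self-contained sketch of the re-raise idiom the comment recommends (illustrative only; the helper functions here are made up):

```python
import traceback

def risky():
    raise ValueError("boom")

def log_and_reraise():
    try:
        risky()
    except ValueError as error:
        print(f"logging before re-raise: {error}")
        raise  # bare raise re-raises the active exception as-is

try:
    log_and_reraise()
except ValueError:
    traceback.print_exc()  # the printed traceback still reaches back into risky()
```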



def generate_dataset_name(dataset_name: str) -> str:
    return dataset_name.replace(".", "_").replace(" ", "_")
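For orientation, a minimal sketch of how the new pipeline might be invoked once this file is merged; the dataset name is a hypothetical placeholder and assumes matching data was already added for the default user:

```python
import asyncio

from cognee.api.v1.cognify.code_graph_pipeline import code_graph_pipeline

async def main():
    # "source_code" is a made-up dataset name; it must refer to a dataset that
    # already exists for the user, otherwise the existing_datasets_map check
    # above silently skips it.
    await code_graph_pipeline(datasets = ["source_code"])

asyncio.run(main())
```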
42 changes: 11 additions & 31 deletions cognee/api/v1/cognify/cognify_v2.py
@@ -9,21 +9,15 @@
 from cognee.modules.data.methods.get_dataset_data import get_dataset_data
 from cognee.modules.data.methods import get_datasets, get_datasets_by_name
 from cognee.modules.pipelines.tasks.Task import Task
-from cognee.modules.pipelines import run_tasks, run_tasks_parallel
+from cognee.modules.pipelines import run_tasks
 from cognee.modules.users.models import User
 from cognee.modules.users.methods import get_default_user
 from cognee.modules.pipelines.models import PipelineRunStatus
 from cognee.modules.pipelines.operations.get_pipeline_status import get_pipeline_status
 from cognee.modules.pipelines.operations.log_pipeline_status import log_pipeline_status
-from cognee.tasks import chunk_naive_llm_classifier, \
-    chunk_remove_disconnected, \
-    infer_data_ontology, \
-    save_chunks_to_store, \
-    chunk_update_check, \
-    chunks_into_graph, \
-    source_documents_to_chunks, \
-    check_permissions_on_documents, \
-    classify_documents
+from cognee.tasks.documents import classify_documents, check_permissions_on_documents, extract_chunks_from_documents
+from cognee.tasks.graph import extract_graph_from_data
+from cognee.tasks.storage import add_data_points
+from cognee.tasks.summarization import summarize_text
 
 logger = logging.getLogger("cognify.v2")
@@ -87,31 +81,17 @@ async def run_cognify_pipeline(dataset: Dataset, user: User):
     try:
         cognee_config = get_cognify_config()
 
-        root_node_id = None
-
         tasks = [
             Task(classify_documents),
             Task(check_permissions_on_documents, user = user, permissions = ["write"]),
-            Task(infer_data_ontology, root_node_id = root_node_id, ontology_model = KnowledgeGraph),
-            Task(source_documents_to_chunks, parent_node_id = root_node_id), # Classify documents and save them as a nodes in graph db, extract text chunks based on the document type
-            Task(chunks_into_graph, graph_model = KnowledgeGraph, collection_name = "entities", task_config = { "batch_size": 10 }), # Generate knowledge graphs from the document chunks and attach it to chunk nodes
-            Task(chunk_update_check, collection_name = "chunks"), # Find all affected chunks, so we don't process unchanged chunks
+            Task(extract_chunks_from_documents), # Extract text chunks based on the document type.
+            Task(add_data_points, task_config = { "batch_size": 10 }),
+            Task(extract_graph_from_data, graph_model = KnowledgeGraph, task_config = { "batch_size": 10 }), # Generate knowledge graphs from the document chunks.
             Task(
-                save_chunks_to_store,
-                collection_name = "chunks",
-            ), # Save the document chunks in vector db and as nodes in graph db (connected to the document node and between each other)
-            run_tasks_parallel([
-                Task(
-                    summarize_text,
-                    summarization_model = cognee_config.summarization_model,
-                    collection_name = "summaries",
-                ),
-                Task(
-                    chunk_naive_llm_classifier,
-                    classification_model = cognee_config.classification_model,
-                ),
-            ]),
-            Task(chunk_remove_disconnected), # Remove the obsolete document chunks.
+                summarize_text,
+                summarization_model = cognee_config.summarization_model,
+                task_config = { "batch_size": 10 }
+            ),
         ]
 
         pipeline = run_tasks(tasks, data_documents, "cognify_pipeline")
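For context, this reworked task list appears to be what a plain cognee.cognify() call runs; a minimal end-to-end sketch adapted from the README example earlier in this PR (the sample text is illustrative):

```python
import asyncio

import cognee
from cognee.api.v1.search import SearchType

async def main():
    # Illustrative sample text; any added document flows through the
    # classify -> chunk -> add_data_points -> extract_graph -> summarize tasks above.
    await cognee.add("FalkorDB is a graph database built on sparse matrices.")
    await cognee.cognify()

    search_results = await cognee.search(SearchType.INSIGHTS, "Tell me about FalkorDB")

    for result_text in search_results:
        print(result_text)

asyncio.run(main())
```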
2 changes: 1 addition & 1 deletion cognee/api/v1/search/search_v2.py
@@ -5,7 +5,7 @@
 from cognee.modules.users.models import User
 from cognee.modules.users.methods import get_default_user
 from cognee.modules.users.permissions.methods import get_document_ids_for_user
-from cognee.tasks.chunking import query_chunks
+from cognee.tasks.chunks import query_chunks
 from cognee.tasks.graph import query_graph_connections
 from cognee.tasks.summarization import query_summaries
 