-
Notifications
You must be signed in to change notification settings - Fork 86
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feat/cog 184 add falkordb #192
Changes from all commits
c901fa8
62022a8
14e2c7e
758698a
897bbac
f569088
cf5b337
c890636
9e10c61
19d62f2
9579cc7
51a8305
f20c838
e7e6107
d733bfd
9c4da23
4c19999
08a8442
39bc8d6
d2d819e
f8e35b3
30edd2d
44954c1
a3b3667
ed9036a
fd6398d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,7 +5,7 @@ on: | |
pull_request: | ||
branches: | ||
- main | ||
types: [labeled] | ||
types: [labeled, synchronize] | ||
|
||
|
||
concurrency: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,7 +5,7 @@ on: | |
pull_request: | ||
branches: | ||
- main | ||
types: [labeled] | ||
types: [labeled, synchronize] | ||
|
||
|
||
concurrency: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,7 +5,7 @@ on: | |
pull_request: | ||
branches: | ||
- main | ||
types: [labeled] | ||
types: [labeled, synchronize] | ||
|
||
|
||
concurrency: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,7 +5,7 @@ on: | |
pull_request: | ||
branches: | ||
- main | ||
types: [labeled] | ||
types: [labeled, synchronize] | ||
|
||
|
||
concurrency: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
import asyncio | ||
import logging | ||
from typing import Union | ||
|
||
from cognee.shared.SourceCodeGraph import SourceCodeGraph | ||
from cognee.shared.utils import send_telemetry | ||
from cognee.modules.data.models import Dataset, Data | ||
from cognee.modules.data.methods.get_dataset_data import get_dataset_data | ||
from cognee.modules.data.methods import get_datasets, get_datasets_by_name | ||
from cognee.modules.pipelines.tasks.Task import Task | ||
from cognee.modules.pipelines import run_tasks | ||
from cognee.modules.users.models import User | ||
from cognee.modules.users.methods import get_default_user | ||
from cognee.modules.pipelines.models import PipelineRunStatus | ||
from cognee.modules.pipelines.operations.get_pipeline_status import get_pipeline_status | ||
from cognee.modules.pipelines.operations.log_pipeline_status import log_pipeline_status | ||
from cognee.tasks.documents import classify_documents, check_permissions_on_documents, extract_chunks_from_documents | ||
from cognee.tasks.graph import extract_graph_from_code | ||
from cognee.tasks.storage import add_data_points | ||
|
||
# Logger shared by every function in this pipeline module.
logger = logging.getLogger("code_graph_pipeline")

# Held by run_pipeline while it reads a dataset's pipeline status, so that
# concurrently gathered runs do not interleave during that check.
update_status_lock = asyncio.Lock()
|
||
class PermissionDeniedException(Exception):
    """Error carrying a human-readable ``message`` about a denied permission."""

    def __init__(self, message: str):
        # Initialise the base Exception first so ``args`` and ``str()`` carry
        # the text, then expose it under the ``message`` attribute as well.
        super().__init__(message)
        self.message = message
|
||
async def code_graph_pipeline(datasets: Union[str, list[str]] = None, user: User = None):
    """Run the code-graph pipeline over the requested datasets concurrently.

    Args:
        datasets: A single dataset name, a list of dataset names, or a list of
            dataset objects. When ``None`` or empty, every dataset returned by
            ``get_datasets`` for the user is processed.
        user: The user on whose behalf the pipeline runs. Defaults to the
            result of ``get_default_user()``.

    Returns:
        The list produced by ``asyncio.gather`` over one ``run_pipeline`` call
        per selected dataset (an empty list when nothing is selected).
    """
    if user is None:
        user = await get_default_user()

    existing_datasets = await get_datasets(user.id)

    if not datasets:
        # If no datasets are provided, process all existing datasets.
        datasets = existing_datasets
    elif isinstance(datasets, str):
        # A bare string is one dataset name. Wrap it so that the indexing
        # below inspects the name itself rather than its first character.
        datasets = [datasets]

    if not datasets:
        # The user has no datasets at all; gathering nothing yields [] too,
        # but returning here avoids indexing into an empty list.
        return []

    if isinstance(datasets[0], str):
        datasets = await get_datasets_by_name(datasets, user.id)

    existing_dataset_names = {
        generate_dataset_name(dataset.name) for dataset in existing_datasets
    }

    # Only datasets whose normalised name matches an existing dataset are run.
    awaitables = [
        run_pipeline(dataset, user)
        for dataset in datasets
        if generate_dataset_name(dataset.name) in existing_dataset_names
    ]

    return await asyncio.gather(*awaitables)
|
||
|
||
async def run_pipeline(dataset: Dataset, user: User):
    """Run the code-graph task chain over one dataset and record its status.

    The function logs DATASET_PROCESSING_STARTED before running the tasks and
    DATASET_PROCESSING_COMPLETED afterwards. If the dataset's latest status is
    already DATASET_PROCESSING_STARTED it logs a message and returns without
    doing any work.

    Args:
        dataset: The dataset whose documents are processed.
        user: The user passed to the permission check and to telemetry.

    Raises:
        Exception: Any error raised while running the tasks is re-raised after
            DATASET_PROCESSING_ERRORED has been logged.
    """
    data_documents: list[Data] = await get_dataset_data(dataset_id = dataset.id)

    document_ids_str = [str(document.id) for document in data_documents]

    dataset_id = dataset.id
    dataset_name = generate_dataset_name(dataset.name)

    send_telemetry("code_graph_pipeline EXECUTION STARTED", user.id)

    # The status check and the STARTED write happen under one lock so that two
    # concurrently gathered runs cannot both decide the dataset is idle.
    async with update_status_lock:
        task_status = await get_pipeline_status([dataset_id])

        if dataset_id in task_status and task_status[dataset_id] == PipelineRunStatus.DATASET_PROCESSING_STARTED:
            logger.info("Dataset %s is already being processed.", dataset_name)
            return

        await log_pipeline_status(dataset_id, PipelineRunStatus.DATASET_PROCESSING_STARTED, {
            "dataset_name": dataset_name,
            "files": document_ids_str,
        })

    try:
        tasks = [
            Task(classify_documents),
            Task(check_permissions_on_documents, user = user, permissions = ["write"]),
            Task(extract_chunks_from_documents), # Extract text chunks based on the document type.
            Task(add_data_points, task_config = { "batch_size": 10 }),
            Task(extract_graph_from_code, graph_model = SourceCodeGraph, task_config = { "batch_size": 10 }), # Generate knowledge graphs from the document chunks.
        ]

        pipeline = run_tasks(tasks, data_documents, "code_graph_pipeline")

        async for result in pipeline:
            print(result)

        send_telemetry("code_graph_pipeline EXECUTION COMPLETED", user.id)

        await log_pipeline_status(dataset_id, PipelineRunStatus.DATASET_PROCESSING_COMPLETED, {
            "dataset_name": dataset_name,
            "files": document_ids_str,
        })
    except Exception:
        send_telemetry("code_graph_pipeline EXECUTION ERRORED", user.id)

        await log_pipeline_status(dataset_id, PipelineRunStatus.DATASET_PROCESSING_ERRORED, {
            "dataset_name": dataset_name,
            "files": document_ids_str,
        })
        # Bare raise re-raises the active exception with its traceback intact.
        raise
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

Use a bare `raise` to re-raise the exception. When re-raising an exception inside an `except` block, use `raise` without arguments so that the original traceback is preserved. Apply this diff to fix the issue:

      await log_pipeline_status(dataset_id, PipelineRunStatus.DATASET_PROCESSING_ERRORED, {
          "dataset_name": dataset_name,
          "files": document_ids_str,
      })
    - raise error
    + raise
|
||
|
||
|
||
def generate_dataset_name(dataset_name: str) -> str:
    """Return *dataset_name* with every dot and every space replaced by ``_``."""
    # One translation pass maps both characters to an underscore.
    underscore_table = str.maketrans(". ", "__")
    return dataset_name.translate(underscore_table)
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

Ensure consistent handling of `datasets` and use `isinstance()` for type checking.

The current implementation may not handle the `datasets` parameter correctly when it is a string. When `datasets` is a string, `datasets[0]` will be the first character of the string, which is likely not the intended behavior. Additionally, it's recommended to use `isinstance()` for type checking instead of comparing types directly.

Apply this diff to fix the issues:

📝 Committable suggestion

🧰 Tools

🪛 Ruff

40-40: Use `is` and `is not` for type comparisons, or `isinstance()` for isinstance checks (E721)