Skip to content

Commit

Permalink
Merge pull request #265 from topoteretes/feat/COG-418-log-config-to-t…
Browse files Browse the repository at this point in the history
…elemetry

Add cognee config to telemetry
  • Loading branch information
Vasilije1990 authored Dec 7, 2024
2 parents 86a6304 + f30bf35 commit ce96431
Showing 1 changed file with 25 additions and 17 deletions.
42 changes: 25 additions & 17 deletions cognee/modules/pipelines/operations/run_tasks.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import json
import inspect
import json
import logging

from cognee.modules.settings import get_current_settings
from cognee.shared.utils import send_telemetry
from cognee.modules.users.models import User
from cognee.modules.users.methods import get_default_user
from cognee.modules.users.models import User
from cognee.shared.utils import send_telemetry

from ..tasks.Task import Task

logger = logging.getLogger("run_tasks(tasks: [Task], data)")
Expand Down Expand Up @@ -160,37 +162,43 @@ async def run_tasks_base(tasks: list[Task], data = None, user: User = None):
raise error

async def run_tasks_with_telemetry(tasks: list[Task], data, pipeline_name: str):
user = await get_default_user()

config = get_current_settings()

logger.debug("\nRunning pipeline with configuration:\n%s\n", json.dumps(config, indent = 1))

user = await get_default_user()

try:
logger.info("Pipeline run started: `%s`", pipeline_name)
send_telemetry("Pipeline Run Started", user.id, {
"pipeline_name": pipeline_name,
})

send_telemetry("Pipeline Run Started",
user.id,
additional_properties = {"pipeline_name": pipeline_name, } | config
)

async for result in run_tasks_base(tasks, data, user):
yield result

logger.info("Pipeline run completed: `%s`", pipeline_name)
send_telemetry("Pipeline Run Completed", user.id, {
"pipeline_name": pipeline_name,
})
send_telemetry("Pipeline Run Completed",
user.id,
additional_properties = {"pipeline_name": pipeline_name, }
)
except Exception as error:
logger.error(
"Pipeline run errored: `%s`\n%s\n",
pipeline_name,
str(error),
exc_info = True,
)
send_telemetry("Pipeline Run Errored", user.id, {
"pipeline_name": pipeline_name,
})
send_telemetry("Pipeline Run Errored",
user.id,
additional_properties = {"pipeline_name": pipeline_name, } | config
)

raise error

async def run_tasks(tasks: list[Task], data = None, pipeline_name: str = "default_pipeline"):
config = get_current_settings()
logger.debug("\nRunning pipeline with configuration:\n%s\n", json.dumps(config, indent = 1))


async for result in run_tasks_with_telemetry(tasks, data, pipeline_name):
yield result

0 comments on commit ce96431

Please sign in to comment.