Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OPIK-653: Add integration with CrewAI #988

Merged
merged 16 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions sdks/python/src/opik/integrations/aisuite/aisuite_decorator.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import logging
from typing import Any, Callable, Dict, List, Optional, Tuple
from typing import Any, AsyncGenerator, Callable, Dict, Generator, List, Optional, Tuple, Union

import aisuite.framework as aisuite_chat_completion
from openai.types.chat import chat_completion as openai_chat_completion

from opik import dict_utils
from opik.decorator import arguments_helpers, base_track_decorator


LOGGER = logging.getLogger(__name__)

KWARGS_KEYS_TO_LOG_AS_INPUTS = ["messages"]
Expand Down Expand Up @@ -131,6 +130,8 @@ def _generators_handler(
self,
output: Any,
capture_output: bool,
generations_aggregator: Optional[Callable[[List[Any]], Any]],
) -> None:
return None
generations_aggregator: Optional[Callable[[List[Any]], str]],
) -> Optional[Union[Generator, AsyncGenerator]]:
return super()._generators_handler(
output, capture_output, generations_aggregator
)
4 changes: 4 additions & 0 deletions sdks/python/src/opik/integrations/crewai/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .opik_tracker import track_crewai


__all__ = ["track_crewai"]
204 changes: 204 additions & 0 deletions sdks/python/src/opik/integrations/crewai/crewai_decorator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
import logging
from typing import Any, AsyncGenerator, Callable, Dict, Generator, List, Optional, Tuple, Union

from crewai.tasks import TaskOutput

from opik import opik_context
from opik.decorator import arguments_helpers, base_track_decorator

LOGGER = logging.getLogger(__name__)

AGENT_KWARGS_KEYS_TO_LOG_AS_INPUTS = [
# "agent_executor",
# "allow_delegation",
"backstory",
# "cache",
# "cache_handler",
# "crew",
# "formatting_errors",
"goal",
# "i18n",
# "id",
# "llm",
# "max_iter",
# "max_rpm",
# "max_tokens",
"role",
"tools",
# "tools_handler",
# "verbose",
]

TASK_KWARGS_KEYS_TO_LOG_AS_INPUTS = [
# 'agent',
# 'async_execution',
# 'callback',
'config',
'context',
# 'converter_cls',
# 'delegations',
'description',
'expected_output',
# 'human_input',
# 'i18n',
# 'id',
'name',
# 'output',
# 'output_file',
# 'output_json',
# 'output_pydantic',
# 'processed_by_agents',
'prompt_context',
'tools',
# 'tools_errors',
# 'used_tools',
]



from litellm.integrations.custom_logger import CustomLogger


class OpikTokenCalcHandler(CustomLogger):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._event_kwargs = None
self._response_obj = None
self.model = None
self.provider = None
self.usage = None


def log_success_event(self, kwargs, response_obj, start_time, end_time):
self._event_kwargs = kwargs
self._response_obj = response_obj
self.model = response_obj.model
self.provider = "openai" if response_obj.object == 'chat.completion' else None
self.usage = response_obj.model_dump().get("usage")


class CrewAITrackDecorator(base_track_decorator.BaseTrackDecorator):
def _start_span_inputs_preprocessor(
self,
func: Callable,
track_options: arguments_helpers.TrackOptions,
args: Optional[Tuple],
kwargs: Optional[Dict[str, Any]],
) -> arguments_helpers.StartSpanParameters:

name = track_options.name if track_options.name is not None else func.__name__
metadata = track_options.metadata if track_options.metadata is not None else {}

metadata.update({
"created_from": "crewai",
"args": args,
"kwargs": kwargs,
})

tags = ["crewai"]

######################
japdubengsub marked this conversation as resolved.
Show resolved Hide resolved
# PARSE INPUT
######################
# Crew
if name == "kickoff":
input = kwargs.get("inputs")
metadata.update({
"object_type": "crew",
})

# Agent
elif name == "execute_task":
assert kwargs['task'].agent == args[0]
agent = args[0]
token_usage_callback = OpikTokenCalcHandler()
agent.agent_executor.callbacks = [token_usage_callback] + agent.agent_executor.callbacks

input = {}
input["context"] = kwargs.get("context")
agent_dict = agent.model_dump(include=AGENT_KWARGS_KEYS_TO_LOG_AS_INPUTS)
input["agent"] = agent_dict

name = agent.role.strip()
metadata.update({
"object_type": "agent",
})

# Task
elif name == "execute_sync":
input = {}
task_dict = args[0].model_dump(include=TASK_KWARGS_KEYS_TO_LOG_AS_INPUTS)
input["task"] = task_dict

name = f"Task: {args[0].name}"
metadata.update({
"object_type": "task",
})

else:
raise NotImplementedError

result = arguments_helpers.StartSpanParameters(
name=name,
input=input,
type=track_options.type,
tags=tags,
metadata=metadata,
project_name=track_options.project_name,
)

return result

def _end_span_inputs_preprocessor(
self,
output: Any,
capture_output: bool,
) -> arguments_helpers.EndSpanParameters:

usage = None
model = None
provider = None

current_span = opik_context.get_current_span_data()

if isinstance(output, TaskOutput):
japdubengsub marked this conversation as resolved.
Show resolved Hide resolved
output = {"output": output.raw}
elif isinstance(output, str):
output = {"output": output}
else:
# output = output.model_dump()
# usage = output.pop("token_usage", None)
output = {}

if current_span.metadata.get("object_type") == "agent":
opik_callback_handler = current_span.metadata['args'][0].agent_executor.callbacks[0]
if opik_callback_handler:
model = opik_callback_handler.model
provider = opik_callback_handler.provider
usage = opik_callback_handler.usage

metadata = current_span.metadata

metadata.pop('args')
metadata.pop('kwargs')
metadata.pop('token_usage_callback', None)

result = arguments_helpers.EndSpanParameters(
output=output,
usage=usage,
metadata=metadata,
model=model,
provider=provider,
)

return result

def _generators_handler(
self,
output: Any,
capture_output: bool,
generations_aggregator: Optional[Callable[[List[Any]], str]],
) -> Optional[Union[Generator, AsyncGenerator]]:
return super()._generators_handler(
output, capture_output, generations_aggregator
)
51 changes: 51 additions & 0 deletions sdks/python/src/opik/integrations/crewai/opik_tracker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import functools
from typing import Optional

import crewai

from . import crewai_decorator


def track_crewai(
japdubengsub marked this conversation as resolved.
Show resolved Hide resolved
project_name: Optional[str] = None,
) -> None:
decorator_factory = crewai_decorator.CrewAITrackDecorator()

kickoff_decorator = decorator_factory.track(
japdubengsub marked this conversation as resolved.
Show resolved Hide resolved
project_name=project_name,
)

crewai.Crew.kickoff = kickoff_decorator(crewai.Crew.kickoff)
crewai.Crew.kickoff_for_each = kickoff_decorator(crewai.Crew.kickoff_for_each)
crewai.Crew.kickoff_async = kickoff_decorator(crewai.Crew.kickoff_async)
crewai.Crew.kickoff_for_each_async = kickoff_decorator(crewai.Crew.kickoff_for_each_async)

crewai.Agent.execute_task = kickoff_decorator(crewai.Agent.execute_task)
japdubengsub marked this conversation as resolved.
Show resolved Hide resolved
crewai.Agent.create_agent_executor = create_agent_executor_wrapper(crewai.Agent.create_agent_executor)

crewai.Task.execute_sync = kickoff_decorator(crewai.Task.execute_sync)
crewai.Task.execute_async = kickoff_decorator(crewai.Task.execute_async)
japdubengsub marked this conversation as resolved.
Show resolved Hide resolved

return None


def create_agent_executor_wrapper(method):
@functools.wraps(method)
def wrapped_method(*args, **kwargs):
opik_obj = None

if args[0].agent_executor and len(args[0].agent_executor.callbacks) > 1:
japdubengsub marked this conversation as resolved.
Show resolved Hide resolved
for callback in args[0].agent_executor.callbacks:
if isinstance(callback, crewai_decorator.OpikTokenCalcHandler):
opik_obj = callback
break

result = method(*args, **kwargs)

if opik_obj is not None:
args[0].agent_executor.callbacks = [opik_obj] + args[0].agent_executor.callbacks

return result

return wrapped_method

3 changes: 3 additions & 0 deletions sdks/python/src/opik/integrations/dspy/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .callback import OpikCallback

__all__ = ["OpikCallback"]
Empty file.
20 changes: 20 additions & 0 deletions sdks/python/tests/library_integration/crewai/config/agents.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# src/latest_ai_development/config/agents.yaml
researcher:
role: >
{topic} Senior Data Researcher
goal: >
Uncover cutting-edge developments in {topic}
backstory: >
You're a seasoned researcher with a knack for uncovering the latest
developments in {topic}. Known for your ability to find the most relevant
information and present it in a clear and concise manner.

reporting_analyst:
role: >
{topic} Reporting Analyst
goal: >
Create detailed reports based on {topic} data analysis and research findings
backstory: >
You're a meticulous analyst with a keen eye for detail. You're known for
your ability to turn complex data into clear and concise reports, making
it easy for others to understand and act on the information you provide.
19 changes: 19 additions & 0 deletions sdks/python/tests/library_integration/crewai/config/tasks.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# src/latest_ai_development/config/tasks.yaml
research_task:
description: >
Conduct a thorough research about {topic}
Make sure you find any interesting and relevant information given
the current year is 2024.
expected_output: >
A list with 2 bullet points of the most relevant information about {topic}
agent: researcher

reporting_task:
description: >
Review the context you got and expand each topic into a small section for a report.
# Make sure the report is detailed and contains any and all relevant information.
expected_output: >
A fully fledge reports with the mains topics, each with a small section of information.
Formatted as markdown without '```'
agent: reporting_analyst
output_file: report.md
47 changes: 47 additions & 0 deletions sdks/python/tests/library_integration/crewai/crew.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from crewai import Agent, Crew, Process, Task
from crewai.project import CrewBase, agent, crew, task


@CrewBase
class LatestAiDevelopmentCrew:
"""LatestAiDevelopment crew"""

@agent
def researcher(self) -> Agent:
return Agent(
config=self.agents_config['researcher'],
verbose=True,
tools=[
# SerperDevTool()
]
)

@agent
def reporting_analyst(self) -> Agent:
return Agent(
config=self.agents_config['reporting_analyst'],
verbose=True
)

@task
def research_task(self) -> Task:
return Task(
config=self.tasks_config['research_task'],
)

@task
def reporting_task(self) -> Task:
return Task(
config=self.tasks_config['reporting_task'],
output_file='output/report.md' # This is the file that will be contain the final report.
)

@crew
def crew(self) -> Crew:
"""Creates the LatestAiDevelopment crew"""
return Crew(
agents=self.agents, # Automatically created by the @agent decorator
tasks=self.tasks, # Automatically created by the @task decorator
process=Process.sequential,
verbose=True,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
crewai
crewai-tools
Loading
Loading