OPIK-653: Add integration with CrewAI #988

Merged · 16 commits · Jan 9, 2025
51 changes: 51 additions & 0 deletions .github/workflows/lib-crewai-tests.yml
@@ -0,0 +1,51 @@
# Workflow to run CrewAI tests
#
# Please read inputs to provide correct values.
#
name: SDK Lib CrewAI Tests
run-name: "SDK Lib CrewAI Tests ${{ github.ref_name }} by @${{ github.actor }}"
env:
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
OPENAI_ORG_ID: ${{ secrets.OPENAI_ORG_ID }}
on:
workflow_call:

jobs:
tests:
name: CrewAI Python ${{matrix.python_version}}
runs-on: ubuntu-latest
defaults:
run:
working-directory: sdks/python

strategy:
fail-fast: true
matrix:
python_version: ["3.10", "3.11", "3.12"]

steps:
- name: Check out code
uses: actions/checkout@v4

- name: Setup Python ${{matrix.python_version}}
uses: actions/setup-python@v5
with:
python-version: ${{matrix.python_version}}

- name: Install opik
run: pip install .

- name: Install test tools
run: |
cd ./tests
pip install --no-cache-dir --disable-pip-version-check -r test_requirements.txt

- name: Install lib
run: |
cd ./tests
pip install --no-cache-dir --disable-pip-version-check -r library_integration/crewai/requirements.txt

- name: Run tests
run: |
cd ./tests/library_integration/crewai/
python -m pytest -vv .
7 changes: 7 additions & 0 deletions .github/workflows/lib-integration-tests-runner.yml
@@ -19,6 +19,7 @@ on:
- haystack
- guardrails
- dspy
- crewai
schedule:
- cron: "0 0 */1 * *"
pull_request:
@@ -94,3 +95,9 @@ jobs:
if: contains(fromJSON('["dspy", "all"]'), needs.init_environment.outputs.LIBS)
uses: ./.github/workflows/lib-dspy-tests.yml
secrets: inherit

crewai_tests:
needs: [init_environment]
if: contains(fromJSON('["crewai", "all"]'), needs.init_environment.outputs.LIBS)
uses: ./.github/workflows/lib-crewai-tests.yml
secrets: inherit
21 changes: 16 additions & 5 deletions sdks/python/src/opik/integrations/aisuite/aisuite_decorator.py
@@ -1,13 +1,22 @@
import logging
-from typing import Any, Callable, Dict, List, Optional, Tuple
+from typing import (
+    Any,
+    AsyncGenerator,
+    Callable,
+    Dict,
+    Generator,
+    List,
+    Optional,
+    Tuple,
+    Union,
+)

import aisuite.framework as aisuite_chat_completion
from openai.types.chat import chat_completion as openai_chat_completion

from opik import dict_utils
from opik.decorator import arguments_helpers, base_track_decorator


LOGGER = logging.getLogger(__name__)

KWARGS_KEYS_TO_LOG_AS_INPUTS = ["messages"]
@@ -131,6 +140,8 @@ def _generators_handler(
        self,
        output: Any,
        capture_output: bool,
-        generations_aggregator: Optional[Callable[[List[Any]], Any]],
-    ) -> None:
-        return None
+        generations_aggregator: Optional[Callable[[List[Any]], str]],
+    ) -> Optional[Union[Generator, AsyncGenerator]]:
+        return super()._generators_handler(
+            output, capture_output, generations_aggregator
+        )
4 changes: 4 additions & 0 deletions sdks/python/src/opik/integrations/crewai/__init__.py
@@ -0,0 +1,4 @@
from .opik_tracker import track_crewai


__all__ = ["track_crewai"]
211 changes: 211 additions & 0 deletions sdks/python/src/opik/integrations/crewai/crewai_decorator.py
@@ -0,0 +1,211 @@
import logging
from typing import (
Any,
AsyncGenerator,
Callable,
Dict,
Generator,
List,
Optional,
Tuple,
Union,
)

from opik import opik_context
from opik.decorator import arguments_helpers, base_track_decorator
from opik.types import SpanType

LOGGER = logging.getLogger(__name__)

AGENT_KWARGS_KEYS_TO_LOG_AS_INPUTS = [
# "agent_executor",
# "allow_delegation",
"backstory",
# "cache",
# "cache_handler",
# "crew",
# "formatting_errors",
"goal",
# "i18n",
# "id",
# "llm",
# "max_iter",
# "max_rpm",
# "max_tokens",
"role",
"tools",
# "tools_handler",
# "verbose",
]

TASK_KWARGS_KEYS_TO_LOG_AS_INPUTS = [
# 'agent',
# 'async_execution',
# 'callback',
"config",
"context",
# 'converter_cls',
# 'delegations',
"description",
"expected_output",
# 'human_input',
# 'i18n',
# 'id',
"name",
# 'output',
# 'output_file',
# 'output_json',
# 'output_pydantic',
# 'processed_by_agents',
"prompt_context",
"tools",
# 'tools_errors',
# 'used_tools',
]

TASK_KWARGS_KEYS_TO_LOG_AS_OUTPUT = [
# 'agent',
# 'description',
# 'expected_output',
# 'json_dict',
"name",
# 'output_format',
# 'pydantic',
"raw",
"summary",
]


class CrewAITrackDecorator(base_track_decorator.BaseTrackDecorator):
def _start_span_inputs_preprocessor(
self,
func: Callable,
track_options: arguments_helpers.TrackOptions,
args: Tuple,
kwargs: Dict[str, Any],
) -> arguments_helpers.StartSpanParameters:
name = track_options.name if track_options.name is not None else func.__name__
metadata = track_options.metadata if track_options.metadata is not None else {}
metadata["created_from"] = "crewai"
tags = ["crewai"]

input_dict, name, span_type = self._parse_inputs(args, kwargs, metadata, name)

result = arguments_helpers.StartSpanParameters(
name=name,
input=input_dict,
type=span_type,
tags=tags,
metadata=metadata,
project_name=track_options.project_name,
)

return result

def _parse_inputs(
self,
args: Tuple,
kwargs: Dict,
metadata: Dict,
name: str,
) -> Tuple[Dict, str, SpanType]:
span_type: SpanType = "general"
input_dict: Dict[str, Any] = {}

# Crew
if name == "kickoff":
metadata["object_type"] = "crew"
input_dict = kwargs.get("inputs", {})

# Agent
elif name == "execute_task":
metadata["object_type"] = "agent"
agent = args[0]
input_dict = {"context": kwargs.get("context")}
agent_dict = agent.model_dump(include=AGENT_KWARGS_KEYS_TO_LOG_AS_INPUTS)
input_dict["agent"] = agent_dict
name = agent.role.strip()

# Task
elif name == "execute_sync":
metadata["object_type"] = "task"
input_dict = {}
task_dict = args[0].model_dump(include=TASK_KWARGS_KEYS_TO_LOG_AS_INPUTS)
input_dict["task"] = task_dict
name = f"Task: {args[0].name}"

elif name == "completion":
metadata["object_type"] = "completion"
input_dict = {"messages": kwargs.get("messages")}
span_type = "llm"
name = "llm call"

return input_dict, name, span_type

def _end_span_inputs_preprocessor(
self,
output: Any,
capture_output: bool,
) -> arguments_helpers.EndSpanParameters:
object_type = None
metadata = {}

current_span = opik_context.get_current_span_data()
if current_span and current_span.metadata:
metadata = current_span.metadata
object_type = metadata.pop("object_type")

model, provider, output_dict, usage = self._parse_outputs(object_type, output)

result = arguments_helpers.EndSpanParameters(
output=output_dict,
usage=usage,
metadata=metadata,
model=model,
provider=provider,
)

return result

def _parse_outputs(
self,
object_type: Optional[str],
output: Any,
) -> Tuple[
Optional[str],
Optional[str],
Dict[str, Any],
Optional[Dict[str, Any]],
]:
model = None
provider = None
usage = None
output_dict = {}

if object_type == "crew":
output_dict = output.model_dump()
_ = output_dict.pop("token_usage")
elif object_type == "agent":
output_dict = {"output": output}
elif object_type == "task":
output_dict = output.model_dump(include=TASK_KWARGS_KEYS_TO_LOG_AS_OUTPUT)
elif object_type == "completion":
output_dict = output.model_dump()
usage = output_dict.pop("usage", None)
model = output_dict.pop("model", None)
provider = (
"openai" if output_dict.get("object") == "chat.completion" else None
)
output_dict = {}

return model, provider, output_dict, usage

def _generators_handler(
self,
output: Any,
capture_output: bool,
generations_aggregator: Optional[Callable[[List[Any]], str]],
) -> Optional[Union[Generator, AsyncGenerator]]:
return super()._generators_handler(
output, capture_output, generations_aggregator
)
31 changes: 31 additions & 0 deletions sdks/python/src/opik/integrations/crewai/opik_tracker.py
@@ -0,0 +1,31 @@
from typing import Optional

import crewai
import litellm

from . import crewai_decorator

__IS_TRACKING_ENABLED = False


def track_crewai(
project_name: Optional[str] = None,
) -> None:
global __IS_TRACKING_ENABLED
if __IS_TRACKING_ENABLED:
return
__IS_TRACKING_ENABLED = True

decorator_factory = crewai_decorator.CrewAITrackDecorator()

crewai_wrapper = decorator_factory.track(
project_name=project_name,
)

crewai.Crew.kickoff = crewai_wrapper(crewai.Crew.kickoff)
crewai.Crew.kickoff_for_each = crewai_wrapper(crewai.Crew.kickoff_for_each)
crewai.Agent.execute_task = crewai_wrapper(crewai.Agent.execute_task)
crewai.Task.execute_sync = crewai_wrapper(crewai.Task.execute_sync)
litellm.completion = crewai_wrapper(litellm.completion)

return None
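
For reference, enabling the integration is a single call before building a crew. The following usage sketch is not part of this PR's diff; the agent, task, and project name are illustrative, and it assumes an OpenAI key is configured for LiteLLM:

# Usage sketch (illustrative, not from the PR).
from crewai import Agent, Crew, Task
from opik.integrations.crewai import track_crewai

# Patches Crew.kickoff, Crew.kickoff_for_each, Agent.execute_task,
# Task.execute_sync and litellm.completion exactly once.
track_crewai(project_name="crewai-demo")  # hypothetical project name

writer = Agent(
    role="Writer",
    goal="Summarize a topic in two sentences",
    backstory="A concise technical writer.",
)
summary_task = Task(
    description="Summarize what CrewAI is.",
    expected_output="Two sentences.",
    agent=writer,
)
crew = Crew(agents=[writer], tasks=[summary_task])

result = crew.kickoff()  # each call above is logged as an Opik span
print(result.raw)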
Empty file.
20 changes: 20 additions & 0 deletions sdks/python/tests/library_integration/crewai/config/agents.yaml
@@ -0,0 +1,20 @@
# src/latest_ai_development/config/agents.yaml
researcher:
role: >
{topic} Senior Data Researcher
goal: >
Uncover cutting-edge developments in {topic}
backstory: >
You're a seasoned researcher with a knack for uncovering the latest
developments in {topic}. Known for your ability to find the most relevant
information and present it in a clear and concise manner.

reporting_analyst:
role: >
{topic} Reporting Analyst
goal: >
Create detailed reports based on {topic} data analysis and research findings
backstory: >
You're a meticulous analyst with a keen eye for detail. You're known for
your ability to turn complex data into clear and concise reports, making
it easy for others to understand and act on the information you provide.
19 changes: 19 additions & 0 deletions sdks/python/tests/library_integration/crewai/config/tasks.yaml
@@ -0,0 +1,19 @@
# src/latest_ai_development/config/tasks.yaml
research_task:
description: >
Conduct a thorough research about {topic}
Make sure you find any interesting and relevant information given
the current year is 2024.
expected_output: >
A list with 2 bullet points of the most relevant information about {topic}
agent: researcher

reporting_task:
description: >
Review the context you got and expand each topic into a small section for a report.
# Make sure the report is detailed and contains any and all relevant information.
expected_output: >
A fully fledged report with the main topics, each with a small section of information.
Formatted as markdown without '```'
agent: reporting_analyst
output_file: report.md
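
These two YAML files mirror CrewAI's standard project scaffolding (the agents.yaml comment references the latest_ai_development example from the CrewAI docs). The test module that consumes them is not shown in this excerpt; a typical wiring, sketched here as an assumption rather than the actual test code, looks like:

# Sketch of how config/agents.yaml and config/tasks.yaml are typically wired
# with CrewAI's project decorators; the real test module is not in this diff.
from crewai import Agent, Crew, Process, Task
from crewai.project import CrewBase, agent, crew, task


@CrewBase
class LatestAiDevelopmentCrew:
    agents_config = "config/agents.yaml"
    tasks_config = "config/tasks.yaml"

    @agent
    def researcher(self) -> Agent:
        return Agent(config=self.agents_config["researcher"], verbose=True)

    @agent
    def reporting_analyst(self) -> Agent:
        return Agent(config=self.agents_config["reporting_analyst"], verbose=True)

    @task
    def research_task(self) -> Task:
        return Task(config=self.tasks_config["research_task"])

    @task
    def reporting_task(self) -> Task:
        return Task(config=self.tasks_config["reporting_task"])

    @crew
    def crew(self) -> Crew:
        return Crew(agents=self.agents, tasks=self.tasks, process=Process.sequential)


# Example invocation with the {topic} placeholder filled in:
# LatestAiDevelopmentCrew().crew().kickoff(inputs={"topic": "AI agents"})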