diff --git a/submodules/moragents_dockers/agents/src/agents/README.md b/submodules/moragents_dockers/agents/src/agents/README.md
new file mode 100644
index 0000000..7b701fc
--- /dev/null
+++ b/submodules/moragents_dockers/agents/src/agents/README.md
@@ -0,0 +1,228 @@
+# Creating a New Agent
+
+This guide walks through the process of creating and configuring a new agent using the provided creation script and making necessary modifications for it to work properly.
+
+## Quick Start
+
+1. Run the agent creation script:
+
+```bash
+./create_new_agent.sh
+```
+
+2. When prompted, enter your agent name (must start with a letter and can only contain letters, numbers, underscores, and hyphens)
+
+The script will create a new directory structure for your agent with the following files:
+
+```
+your_agent_name/
+├── __init__.py
+├── agent.py
+├── config.py
+└── tools/
+    └── tools.py
+```
+
+## Required Modifications
+
+After creating your agent, you'll need to make the following modifications to get it working:
+
+### 1. Configure Agent in src/config.py
+
+Add your agent's configuration to the main config file:
+
+```python
+from src.agents.your_agent_name.config import Config as YourAgentConfig
+
+class Config:
+    # ... existing config ...
+
+    AGENT_CONFIGS = {
+        # ... existing agents ...
+        "your_agent_name": YourAgentConfig,
+    }
+```
+
+### 2. Implement Agent Logic
+
+Modify `your_agent_name/agent.py`:
+
+1. Update the class name and docstring:
+
+```python
+class YourAgentNameAgent(AgentCore):
+    """Agent for handling specific operations related to your agent's purpose."""
+```
+
+2. Add your tools to the `tools_provided` list in `__init__`:
+
+```python
+def __init__(self, config: Dict[str, Any], llm: Any, embeddings: Any):
+    super().__init__(config, llm, embeddings)
+    self.tools_provided = [
+        # Add your tool functions here
+    ]
+    self.tool_bound_llm = self.llm.bind_tools(self.tools_provided)
+```
+
+3. Implement custom request processing in `_process_request`:
+
+```python
+async def _process_request(self, request: ChatRequest) -> AgentResponse:
+    try:
+        messages = [
+            SystemMessage(content="Your custom system prompt here"),
+            HumanMessage(content=request.prompt.content),
+        ]
+
+        result = self.tool_bound_llm.invoke(messages)
+        return await self._handle_llm_response(result)
+    except Exception as e:
+        self.logger.error(f"Error processing request: {str(e)}", exc_info=True)
+        return AgentResponse.error(error_message=str(e))
+```
+
+4. Implement tool execution in `_execute_tool`:
+
+```python
+async def _execute_tool(self, func_name: str, args: Dict[str, Any]) -> AgentResponse:
+    """Execute the appropriate tool based on function name."""
+    tool_map = {
+        # Map your tool names to their implementation functions
+        "your_tool_name": self._your_tool_implementation,
+    }
+
+    if func_name not in tool_map:
+        return AgentResponse.error(error_message=f"Unknown tool: {func_name}")
+
+    try:
+        result = await tool_map[func_name](**args)
+        return AgentResponse.success(content=result)
+    except Exception as e:
+        return AgentResponse.error(error_message=str(e))
+```
+
+### 3. Implement Tools
+
+Add your tool implementations in `your_agent_name/tools/tools.py`:
+
+```python
+import logging
+from typing import Dict, Any
+
+logger = logging.getLogger(__name__)
+
+async def your_tool_implementation(arg1: str, arg2: int) -> str:
+    """
+    Implementation of your custom tool.
+
+    Args:
+        arg1: Description of first argument
+        arg2: Description of second argument
+
+    Returns:
+        str: Result of the tool operation
+    """
+    try:
+        # Your tool implementation here
+        result = f"Processed {arg1} with value {arg2}"
+        return result
+    except Exception as e:
+        logger.error(f"Error in tool implementation: {str(e)}", exc_info=True)
+        raise
+```
+
+### 4. Update Configuration
+
+Modify `your_agent_name/config.py` to include any necessary configuration for your tools:
+
+```python
+class Config:
+    tools = [
+        {
+            "name": "your_tool_name",
+            "description": "Description of what your tool does",
+            "parameters": {
+                "type": "object",
+                "properties": {
+                    "arg1": {
+                        "type": "string",
+                        "description": "Description of first argument"
+                    },
+                    "arg2": {
+                        "type": "integer",
+                        "description": "Description of second argument"
+                    }
+                },
+                "required": ["arg1", "arg2"]
+            }
+        }
+    ]
+```
+
+## Error Handling
+
+Your agent inherits from `AgentCore` which provides several error handling mechanisms:
+
+1. The `@handle_exceptions` decorator handles common exceptions:
+
+   - `ValueError` for validation errors (returns as needs_info)
+   - Unexpected errors (returns as error)
+
+2. Use `AgentResponse` for structured responses:
+   - `AgentResponse.success(content="Success message")`
+   - `AgentResponse.error(error_message="Error description")`
+   - `AgentResponse.needs_info(message="Additional information needed")`
+
+## Best Practices
+
+1. Always use the logger for debugging and monitoring:
+
+```python
+self.logger.info("Processing request")
+self.logger.error("Error occurred", exc_info=True)
+```
+
+2. Validate inputs early:
+
+```python
+if not some_required_value:
+    return AgentResponse.needs_info(message="Please provide required value")
+```
+
+3. Keep tool implementations modular and focused on a single responsibility
+
+4. Document your code thoroughly, especially tool parameters and expected behavior
+
+5. Use type hints consistently to make the code more maintainable
+
+## Testing
+
+Create tests for your agent in the `tests/agents/your_agent_name/` directory:
+
+1. Test basic agent functionality
+2. Test each tool implementation
+3. Test error handling
+4. Test edge cases and input validation
+
+## Example Usage
+
+```python
+from src.agents.your_agent_name.agent import YourAgentNameAgent
+from src.models.core import ChatRequest
+
+# Initialize agent
+agent = YourAgentNameAgent(config, llm, embeddings)
+
+# Create request
+request = ChatRequest(prompt="Your prompt here")
+
+# Process request
+response = await agent.chat(request)
+
+# Handle response
+if response.error_message:
+    print(f"Error: {response.error_message}")
+else:
+    print(f"Success: {response.content}")
+```
diff --git a/submodules/moragents_dockers/agents/src/agents/agent_core/__init__.py b/submodules/moragents_dockers/agents/src/agents/agent_core/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/submodules/moragents_dockers/agents/src/agents/agent_core/agent.py b/submodules/moragents_dockers/agents/src/agents/agent_core/agent.py
new file mode 100644
index 0000000..fb75eef
--- /dev/null
+++ b/submodules/moragents_dockers/agents/src/agents/agent_core/agent.py
@@ -0,0 +1,139 @@
+import logging
+
+from abc import ABC, abstractmethod
+from enum import Enum
+from typing import Dict, Any, Optional
+from functools import wraps
+
+from src.models.core import ChatRequest, AgentResponse
+
+
+class ResponseType(Enum):
+    """Enum to distinguish between different types of responses"""
+
+    SUCCESS = "success"
+    NEEDS_INFO = "needs_info"  # When we need more information from the user
+    ERROR = "error"  # For breaking errors
+    WARNING = "warning"  # For non-breaking issues that should be logged
+
+
+def handle_exceptions(func):
+    """Decorator to handle exceptions uniformly across agent methods"""
+
+    @wraps(func)
+    async def wrapper(self, *args, **kwargs):
+        try:
+            return await func(self, *args, **kwargs)
+        except ValueError as e:
+            # Handle validation errors - these are expected and should return as needs_info
+            self.logger.info(f"Validation error in {func.__name__}: {str(e)}")
+            return AgentResponse.error(error_message=str(e))
+        except Exception as e:
+            # Handle unexpected errors - these are breaking errors
+            self.logger.error(f"Unexpected error in {func.__name__}: {str(e)}", exc_info=True)
+            return AgentResponse.error(error_message="An unexpected error occurred. 
Please try again later.") + + return wrapper + + +class AgentCore(ABC): + """Enhanced core agent functionality that all specialized agents inherit from.""" + + def __init__(self, config: Dict[str, Any], llm: Any, embeddings: Any): + self.config = config + self.llm = llm + self.embeddings = embeddings + self._setup_logging() + + def _setup_logging(self): + """Set up logging for the agent""" + self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}") + + async def _validate_request(self, request: ChatRequest) -> Optional[AgentResponse]: + """Validate common request parameters and return appropriate response type""" + if not request.prompt: + return AgentResponse.error(error_message="Please provide a prompt to process your request") + + return None + + @handle_exceptions + async def chat(self, request: ChatRequest) -> AgentResponse: + """Main entry point for chat interactions""" + self.logger.info(f"Received chat request: {request}") + + # Validate request + validation_result = await self._validate_request(request) + if validation_result: + return validation_result + + # Process the request + response = await self._process_request(request) + + # Log response for monitoring + if response.error_message: + self.logger.warning(f"Response error: {response.error_message}") + + return response + + @abstractmethod + async def _process_request(self, request: ChatRequest) -> AgentResponse: + """ + Process the validated request. Must be implemented by subclasses. + + Args: + request: Validated ChatRequest object + + Returns: + AgentResponse: Response containing content and optional error/metadata + """ + raise NotImplementedError("Subclasses must implement _process_request") + + async def _handle_llm_response(self, response: Any) -> AgentResponse: + """Handle LLM response and convert to appropriate AgentResponse""" + try: + if not response: + return AgentResponse.error( + error_message="I couldn't process that request. Could you please rephrase it?" 
+ ) + + # Extract relevant information from LLM response + content = getattr(response, "content", None) + tool_calls = getattr(response, "tool_calls", []) + if tool_calls: + # Handle tool calls + self.logger.info(f"Processing tool calls: {tool_calls}") + return await self._process_tool_calls(tool_calls) + elif content: + # Direct response from LLM + self.logger.info(f"Received direct response from LLM: {content}") + return AgentResponse.success(content=content) + else: + self.logger.warning("Received invalid response format from LLM") + return AgentResponse.error(error_message="Received invalid response format from LLM") + + except Exception as e: + self.logger.error(f"Error processing LLM response: {str(e)}", exc_info=True) + return AgentResponse.error(error_message="Error processing the response") + + async def _process_tool_calls(self, tool_calls: list) -> AgentResponse: + """Process tool calls from LLM response""" + try: + tool_call = tool_calls[0] # Get first tool call + func_name = tool_call.get("name") + args = tool_call.get("args", {}) + + if not func_name: + return AgentResponse.error(error_message="Invalid tool call format - no function name provided") + + # Execute tool and handle response + # This should be implemented by subclasses based on their specific tools + return await self._execute_tool(func_name, args) + + except Exception as e: + self.logger.error(f"Error processing tool calls: {str(e)}", exc_info=True) + return AgentResponse.error(error_message="Error executing the requested action") + + @abstractmethod + async def _execute_tool(self, func_name: str, args: Dict[str, Any]) -> AgentResponse: + """Execute a tool with given arguments. Must be implemented by subclasses.""" + raise NotImplementedError("Subclasses must implement _execute_tool") diff --git a/submodules/moragents_dockers/agents/src/agents/base_agent/agent.py b/submodules/moragents_dockers/agents/src/agents/base_agent/agent.py index dcca19d..3505ad4 100644 --- a/submodules/moragents_dockers/agents/src/agents/base_agent/agent.py +++ b/submodules/moragents_dockers/agents/src/agents/base_agent/agent.py @@ -1,159 +1,87 @@ import logging -from typing import Any, Dict, Optional, Tuple +from typing import Any, Dict, Optional from src.agents.base_agent import tools -from src.models.messages import ChatRequest +from src.models.core import ChatRequest, AgentResponse +from src.agents.agent_core.agent import AgentCore from langchain.schema import HumanMessage, SystemMessage from src.agents.base_agent.config import Config from src.stores import wallet_manager_instance -# Configure logging -logging.basicConfig( - level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" -) logger = logging.getLogger(__name__) -class BaseAgent: - def __init__(self, agent_info: Dict[str, Any], llm: Any, embeddings: Any): - """ - Initialize the BaseAgent for sending transactions on Base. 
- """ - self.agent_info = agent_info - self.llm = llm - self.embeddings = embeddings - self.config = Config() +class BaseAgent(AgentCore): + """Agent for handling Base blockchain transactions.""" - # Bind tools to LLM - self.tool_bound_llm = self.llm.bind_tools(self.config.tools) + def __init__(self, config: Dict[str, Any], llm: Any, embeddings: Any): + super().__init__(config, llm, embeddings) + self.tools_provided = Config.tools + self.tool_bound_llm = self.llm.bind_tools(self.tools_provided) - def chat(self, request: ChatRequest) -> Dict[str, Any]: - try: - data = request.dict() - logger.info(f"Received chat request: {data}") - - if not data: - return {"role": "assistant", "content": "Invalid request data. Please try again."} - - # Check CDP client initialization - if not wallet_manager_instance.configure_cdp_client(): - return { - "role": "assistant", - "content": "CDP client not initialized. Please set API credentials.", - } + async def _process_request(self, request: ChatRequest) -> AgentResponse: + """Process the validated chat request for Base transactions.""" + # Check CDP client initialization + if not wallet_manager_instance.configure_cdp_client(): + # Return user-friendly error for missing credentials + return AgentResponse.success( + content="I'm not able to help with transactions right now because the CDP client is not initialized. Please set up your API credentials first." + ) - # Check for active wallet - active_wallet = wallet_manager_instance.get_active_wallet() - if not active_wallet: - return { - "role": "assistant", - "content": "No active wallet selected. Please select or create a wallet first.", - } + # Check for active wallet + active_wallet = wallet_manager_instance.get_active_wallet() + if not active_wallet: + # Return user-friendly error for missing wallet + return AgentResponse.success( + content="You'll need to select or create a wallet before I can help with transactions. Please set up a wallet first." + ) - if "prompt" in data: - prompt = data["prompt"] - wallet_address = data.get("wallet_address") - chain_id = data.get("chain_id") - response_content = self.handle_request(prompt, chain_id, wallet_address) - return { - "role": "assistant", - "content": response_content, - } - else: - logger.error("Missing 'prompt' in chat request data") - return { - "role": "assistant", - "content": "Missing required parameters. Please provide a prompt.", - } + try: + messages = [ + SystemMessage( + content=( + "You are an agent that can perform various financial transactions on Base. " + "When you need to perform an action, use the appropriate function with the correct arguments." + ) + ), + HumanMessage(content=request.prompt.content), + ] + + result = self.tool_bound_llm.invoke(messages) + return await self._handle_llm_response(result) except Exception as e: - logger.error(f"Error in chat method: {str(e)}, agent: {self.agent_info['name']}") - raise e - - def handle_request( - self, message: dict[str, any], chain_id: Optional[str], wallet_address: Optional[str] - ) -> Dict[str, Any]: - logger.info(f"Message: {message}") - logger.info(f"Chain ID: {chain_id}") - logger.info(f"Wallet Address: {wallet_address}") - - # System prompt that includes descriptions of available tools - tool_descriptions = "\n".join( - f"{tool['name']}: {tool['description']}" for tool in self.config.tools - ) - - messages = [ - SystemMessage( - content=( - "You are an agent that can perform various financial transactions on Base. 
" - f"You have access to the following functions:\n{tool_descriptions}\n" - "When you need to perform an action, use the appropriate function with the correct arguments." - ) - ), - ] - - messages.append(HumanMessage(content=message.get("content"))) + logger.error(f"Error processing request: {str(e)}", exc_info=True) + return AgentResponse.error(error_message=str(e)) - logger.info(f"Messages: {messages}") - - result = self.tool_bound_llm.invoke(messages) - - logger.info(f"Result: {result}") - - # Process the LLM's response + async def _execute_tool(self, func_name: str, args: Dict[str, Any]) -> AgentResponse: + """Execute the appropriate Base transaction tool based on function name.""" try: - if result.tool_calls: - # Get first tool call - tool_call = result.tool_calls[0] - - # Extract function name and args from the tool call dict - func_name = tool_call.get("name") - args = tool_call.get("args", {}) - - logger.info(f"Function name: {func_name}") - logger.info(f"Arguments: {args}") - - if not func_name: - return { - "message": "Error: No function name provided in tool call", - "actionType": None, - } - - # Handle swap and transfer tools differently - if func_name == "swap_assets": - return {"message": "Ready to perform swap", "actionType": "swap"} - elif func_name == "transfer_asset": - return {"message": "Ready to perform transfer", "actionType": "transfer"} - elif func_name == "get_balance": - # Get active wallet from wallet manager - wallet = wallet_manager_instance.get_active_wallet() - if not wallet: - return {"message": "Error: No active wallet found", "actionType": None} - - try: - tool_result = tools.get_balance( - wallet, asset_id=args.get("asset_id").lower() - ) - balance = tool_result["balance"] - asset = tool_result["asset"] - address = tool_result["address"] - return { - "message": f"Your wallet {address} has a balance of {balance} {asset}", - "actionType": None, - } - except ValueError as e: - return {"message": f"Error: {str(e)}", "actionType": None} - else: - return { - "message": f"Error: Function '{func_name}' not supported.", - "actionType": None, - } - + if func_name == "swap_assets": + return AgentResponse.action_required(content="Ready to perform swap", action_type="swap") + elif func_name == "transfer_asset": + return AgentResponse.action_required(content="Ready to perform transfer", action_type="transfer") + elif func_name == "get_balance": + wallet = wallet_manager_instance.get_active_wallet() + if not wallet: + return AgentResponse.success( + content="I can't check the balance because no wallet is selected. Please select a wallet first." + ) + + asset_id = args.get("asset_id") + if not asset_id: + return AgentResponse.success( + content="Please specify which asset you'd like to check the balance for." + ) + + tool_result = tools.get_balance(wallet, asset_id=asset_id.lower()) + content = f"Your wallet {tool_result['address']} has a balance of {tool_result['balance']} {tool_result['asset']}" + return AgentResponse.success(content=content) else: - # No function call; return the assistant's message - content = result.content if hasattr(result, "content") else "" - return {"message": content, "actionType": None} + return AgentResponse.success( + content=f"I don't know how to {func_name} yet. Please try a different action." 
+ ) except Exception as e: - logger.error(f"Error processing LLM response: {str(e)}") - return {"message": "Error: Unable to process the request.", "actionType": None} + logger.error(f"Error executing tool {func_name}: {str(e)}", exc_info=True) + return AgentResponse.error(error_message=str(e)) diff --git a/submodules/moragents_dockers/agents/src/agents/create_new_agent.sh b/submodules/moragents_dockers/agents/src/agents/create_new_agent.sh new file mode 100755 index 0000000..f680728 --- /dev/null +++ b/submodules/moragents_dockers/agents/src/agents/create_new_agent.sh @@ -0,0 +1,103 @@ +#!/bin/bash + +# Colors for output +RED='\033[0;31m' +GREEN='\033[0;32m' +BLUE='\033[0;34m' +NC='\033[0m' # No Color + +# Function to validate agent name +validate_agent_name() { + if [[ ! $1 =~ ^[a-zA-Z][a-zA-Z0-9_-]*$ ]]; then + echo -e "${RED}Error: Agent name must start with a letter and can only contain letters, numbers, underscores, and hyphens.${NC}" + return 1 + fi + return 0 +} + +# Function to create agent files +create_agent_files() { + local agent_name=$1 + local agent_dir="${agent_name}" + + # Create directory + mkdir -p "${agent_dir}" + touch "${agent_dir}/__init__.py" + + # Create agent.py + cat > "${agent_dir}/agent.py" << 'EOL' +import logging +from typing import Any, Dict + +from src.agents.agent_core.agent import AgentCore +from src.models.core import ChatRequest, AgentResponse +from langchain.schema import HumanMessage, SystemMessage + +logger = logging.getLogger(__name__) + +class ${agent_name^}Agent(AgentCore): + """Agent for handling ${agent_name} operations.""" + + def __init__(self, config: Dict[str, Any], llm: Any, embeddings: Any): + super().__init__(config, llm, embeddings) + self.tools_provided = [] # Add your tools here + self.tool_bound_llm = self.llm.bind_tools(self.tools_provided) + + async def _process_request(self, request: ChatRequest) -> AgentResponse: + """Process the validated chat request.""" + try: + messages = [ + SystemMessage(content="You are an agent that can perform various operations."), + HumanMessage(content=request.prompt.content), + ] + + result = self.tool_bound_llm.invoke(messages) + return await self._handle_llm_response(result) + + except Exception as e: + logger.error(f"Error processing request: {str(e)}", exc_info=True) + return AgentResponse.error(error_message=str(e)) + + async def _execute_tool(self, func_name: str, args: Dict[str, Any]) -> AgentResponse: + """Execute the appropriate tool based on function name.""" + return AgentResponse.error(error_message=f"Tool {func_name} not implemented yet") +EOL + + # Create config.py + cat > "${agent_dir}/config.py" << EOL +class Config: + tools = [] # Add your tool configurations here +EOL + + # Create tools.py + cat > "${agent_dir}/tools.py" << EOL +import logging +from typing import Dict, Any + +logger = logging.getLogger(__name__) + +# Add your tool implementations here +EOL + + echo -e "${GREEN}Successfully created ${agent_name} agent!${NC}" + echo -e "Agent location: ${BLUE}${agent_dir}${NC}" +} + +# Main script +echo -e "${BLUE}Welcome to the Agent Creation Wizard${NC}" +echo "Please enter a name for your new agent:" +read agent_name + +# Validate agent name +if ! 
validate_agent_name "$agent_name"; then + exit 1 +fi + +# Check if agent directory already exists +if [ -d "${agent_name}" ]; then + echo -e "${RED}Error: An agent with name '${agent_name}' already exists.${NC}" + exit 1 +fi + +# Create agent +create_agent_files "$agent_name" \ No newline at end of file diff --git a/submodules/moragents_dockers/agents/src/agents/crypto_data/agent.py b/submodules/moragents_dockers/agents/src/agents/crypto_data/agent.py index 9d77053..9e3d5a9 100644 --- a/submodules/moragents_dockers/agents/src/agents/crypto_data/agent.py +++ b/submodules/moragents_dockers/agents/src/agents/crypto_data/agent.py @@ -1,95 +1,67 @@ -import json import logging - from src.agents.crypto_data import tools -from src.models.messages import ChatRequest +from src.models.core import ChatRequest, AgentResponse +from src.agents.agent_core.agent import AgentCore +from langchain.schema import HumanMessage, SystemMessage logger = logging.getLogger(__name__) -class CryptoDataAgent: +class CryptoDataAgent(AgentCore): + """Agent for handling cryptocurrency-related queries and data retrieval.""" + def __init__(self, config, llm, embeddings): - self.config = config - self.llm = llm - self.embeddings = embeddings + super().__init__(config, llm, embeddings) self.tools_provided = tools.get_tools() + self.tool_bound_llm = self.llm.bind_tools(self.tools_provided) - def get_response(self, message): - system_prompt = ( - "Don't make assumptions about the value of the arguments for the function " - "they should always be supplied by the user and do not alter the value of the arguments. " - "Don't make assumptions about what values to plug into functions. Ask for clarification if a user " - "request is ambiguous." - ) - - messages = [ - {"role": "system", "content": system_prompt}, - ] - messages.extend(message) + async def _process_request(self, request: ChatRequest) -> AgentResponse: + """Process the validated chat request for crypto-related queries.""" + try: + messages = [ + SystemMessage( + content=( + "Don't make assumptions about function arguments - " + "they should always be supplied by the user. " + "Ask for clarification if a request is ambiguous." 
+ ) + ), + HumanMessage(content=request.prompt.content), + ] - logger.info("Sending request to LLM with %d messages", len(messages)) + result = self.tool_bound_llm.invoke(messages) + return await self._handle_llm_response(result) - llm_with_tools = self.llm.bind_tools(self.tools_provided) + except Exception as e: + logger.error(f"Error processing request: {str(e)}", exc_info=True) + return AgentResponse.error(error_message=str(e)) + async def _execute_tool(self, func_name: str, args: dict) -> AgentResponse: + """Execute the appropriate crypto tool based on function name.""" try: - result = llm_with_tools.invoke(messages) - logger.info("Received response from LLM: %s", result) - - if result.tool_calls: - tool_call = result.tool_calls[0] - func_name = tool_call.get("name") - args = tool_call.get("args") - logger.info("LLM suggested using tool: %s", func_name) - - response_data = {"data": None, "coinId": None} + metadata = {} - if func_name == "get_price": - response_data["data"] = tools.get_coin_price_tool(args["coin_name"]) - response_data["coinId"] = tools.get_tradingview_symbol( - tools.get_coingecko_id(args["coin_name"]) - ) - return response_data, "assistant" - elif func_name == "get_floor_price": - response_data["data"] = tools.get_nft_floor_price_tool(args["nft_name"]) - return response_data, "assistant" - elif func_name == "get_fdv": - response_data["data"] = tools.get_fully_diluted_valuation_tool( - args["coin_name"] - ) - return response_data, "assistant" - elif func_name == "get_tvl": - response_data["data"] = tools.get_protocol_total_value_locked_tool( - args["protocol_name"] - ) - return response_data, "assistant" - elif func_name == "get_market_cap": - response_data["data"] = tools.get_coin_market_cap_tool(args["coin_name"]) - return response_data, "assistant" + if func_name == "get_price": + content = tools.get_coin_price_tool(args["coin_name"]) + trading_symbol = tools.get_tradingview_symbol(tools.get_coingecko_id(args["coin_name"])) + if trading_symbol: + metadata["coinId"] = trading_symbol + elif func_name == "get_floor_price": + content = tools.get_nft_floor_price_tool(args["nft_name"]) + elif func_name == "get_fdv": + content = tools.get_fully_diluted_valuation_tool(args["coin_name"]) + elif func_name == "get_tvl": + content = tools.get_protocol_total_value_locked_tool(args["protocol_name"]) + elif func_name == "get_market_cap": + content = tools.get_coin_market_cap_tool(args["coin_name"]) else: - logger.info("LLM provided a direct response without using tools") - return {"data": result.content, "coinId": None}, "assistant" - except Exception as e: - logger.error(f"Error in get_response: {str(e)}") - raise e + return AgentResponse.error(error_message=f"Unknown tool: {func_name}") - def generate_response(self, prompt): - response, role = self.get_response([prompt]) - return response, role + if "error" in content.lower() or "not found" in content.lower(): + return AgentResponse.error(error_message=content) + + return AgentResponse.success(content=content, metadata=metadata) - def chat(self, request: ChatRequest): - try: - data = request.dict() - if "prompt" in data: - prompt = data["prompt"] - logger.info( - "Received chat request with prompt: %s", - prompt[:50] + "..." 
if len(prompt) > 50 else prompt, - ) - response, role = self.generate_response(prompt) - return {"role": role, "content": response} - else: - logger.warning("Received chat request without 'prompt' in data") - return {"error": "Missing required parameters"}, 400 except Exception as e: - logger.error("Error in chat method: %s", str(e), exc_info=True) - raise e + logger.error(f"Error executing tool {func_name}: {str(e)}", exc_info=True) + return AgentResponse.error(error_message=str(e)) diff --git a/submodules/moragents_dockers/agents/src/agents/dca_agent/agent.py b/submodules/moragents_dockers/agents/src/agents/dca_agent/agent.py index 11a308d..4a7826b 100644 --- a/submodules/moragents_dockers/agents/src/agents/dca_agent/agent.py +++ b/submodules/moragents_dockers/agents/src/agents/dca_agent/agent.py @@ -1,51 +1,66 @@ import logging from typing import Dict, Any -from src.models.messages import ChatRequest +from src.models.core import ChatRequest, AgentResponse +from src.agents.agent_core.agent import AgentCore from src.stores import wallet_manager_instance +from langchain.schema import HumanMessage, SystemMessage -logging.basicConfig( - level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" -) logger = logging.getLogger(__name__) -class DCAAgent: +class DCAAgent(AgentCore): + """Agent for handling DCA (Dollar Cost Averaging) strategies.""" + def __init__(self, config: Dict[str, Any], llm: Any, embeddings: Any): - """ - Initialize the DCAAgent for managing DCA strategies. - """ - self.config = config - self.llm = llm - self.embeddings = embeddings - - def chat(self, request: ChatRequest): - """Handle incoming chat requests""" + """Initialize the DCAAgent.""" + super().__init__(config, llm, embeddings) + # Initialize any DCA-specific tools here + self.tools_provided = [] # TODO: Add DCA-specific tools + self.tool_bound_llm = self.llm.bind_tools(self.tools_provided) + + async def _process_request(self, request: ChatRequest) -> AgentResponse: + """Process the validated chat request for DCA-related queries.""" + # Check CDP client initialization + if not wallet_manager_instance.configure_cdp_client(): + # Return user-friendly error for missing credentials + return AgentResponse.success( + content="I'm not able to help with DCA strategies right now because the CDP client is not initialized. Please set up your API credentials first." + ) + + # Check for active wallet + active_wallet = wallet_manager_instance.get_active_wallet() + if not active_wallet: + # Return user-friendly error for missing wallet + return AgentResponse.success( + content="You'll need to select or create a wallet before I can help with DCA strategies. Please set up a wallet first." + ) + + try: + messages = [ + SystemMessage( + content=( + "You are a DCA strategy manager. " + "Help users set up and manage their DCA strategies. " + "Ask for clarification if a request is ambiguous." 
+ ) + ), + HumanMessage(content=request.prompt.content), + ] + + result = self.tool_bound_llm.invoke(messages) + return await self._handle_llm_response(result) + + except Exception as e: + logger.error(f"Error processing request: {str(e)}", exc_info=True) + return AgentResponse.error(error_message=str(e)) + + async def _execute_tool(self, func_name: str, args: Dict[str, Any]) -> AgentResponse: + """Execute the appropriate DCA tool based on function name.""" try: - data = request.dict() - if not data: - return {"role": "assistant", "content": "Invalid request data"} - - # Check CDP client initialization and active wallet - if not wallet_manager_instance.configure_cdp_client(): - return { - "role": "assistant", - "content": "CDP client not initialized. Please set API credentials.", - "needs_credentials": True, - } - - active_wallet = wallet_manager_instance.get_active_wallet() - if not active_wallet: - return { - "role": "assistant", - "content": "No active wallet selected. Please select or create a wallet first.", - } - - if "prompt" in data: - return {"role": "assistant", "content": "Ready to set up DCA"} - - return {"role": "assistant", "content": "Missing required parameters"} + # TODO: Implement DCA-specific tools + return AgentResponse.success(content=f"I don't know how to {func_name} yet. Please try a different action.") except Exception as e: - logger.error(f"Error in chat method: {str(e)}") - return {"role": "assistant", "content": str(e)} + logger.error(f"Error executing tool {func_name}: {str(e)}", exc_info=True) + return AgentResponse.error(error_message=str(e)) diff --git a/submodules/moragents_dockers/agents/src/agents/default/agent.py b/submodules/moragents_dockers/agents/src/agents/default/agent.py index c76d8cb..9f19875 100644 --- a/submodules/moragents_dockers/agents/src/agents/default/agent.py +++ b/submodules/moragents_dockers/agents/src/agents/default/agent.py @@ -1,52 +1,54 @@ import logging +from typing import Dict, Any -from src.models.messages import ChatRequest +from src.models.core import ChatRequest, AgentResponse +from src.agents.agent_core.agent import AgentCore from src.stores import agent_manager_instance +from langchain.schema import HumanMessage, SystemMessage logger = logging.getLogger(__name__) -class DefaultAgent: - def __init__(self, config, llm, embeddings): - self.config = config - self.llm = llm +class DefaultAgent(AgentCore): + """Agent for handling general conversation and providing information about Morpheus agents.""" - def chat(self, request: ChatRequest): + def __init__(self, config: Dict[str, Any], llm: Any, embeddings: Any): + super().__init__(config, llm, embeddings) + + async def _process_request(self, request: ChatRequest) -> AgentResponse: + """Process the validated chat request for general conversation.""" try: - data = request.dict() - if "prompt" in data: - prompt = data["prompt"]["content"] - # Get currently selected agents for system prompt - available_agents = agent_manager_instance.get_available_agents() - selected_agent_names = agent_manager_instance.get_selected_agents() - - # Build list of human readable names for selected agents - selected_agents_info = [] - for agent in available_agents: - if agent["name"] in selected_agent_names and agent["name"] != "default": - human_name = agent.get("human_readable_name", agent["name"]) - selected_agents_info.append(f"- {human_name}: {agent['description']}") - - system_prompt = ( - "You are a helpful assistant that can engage in general conversation and provide information about Morpheus 
agents when specifically asked.\n" - "For general questions, respond naturally without mentioning Morpheus or its agents.\n" - "Only when explicitly asked about Morpheus or its capabilities, use this list of available agents:\n" - f"{chr(10).join(selected_agents_info)}\n" - "Remember: Only mention Morpheus agents if directly asked about them. Otherwise, simply answer questions normally as a helpful assistant." - ) - - messages = [ - { - "role": "system", - "content": system_prompt, - }, - {"role": "user", "content": prompt}, - ] - - result = self.llm.invoke(messages) - return {"role": "assistant", "content": result.content.strip()} - else: - return {"error": "Missing required parameters"}, 400 + # Get currently selected agents for system prompt + available_agents = agent_manager_instance.get_available_agents() + selected_agent_names = agent_manager_instance.get_selected_agents() + + # Build list of human readable names for selected agents + selected_agents_info = [] + for agent in available_agents: + if agent["name"] in selected_agent_names and agent["name"] != "default": + human_name = agent.get("human_readable_name", agent["name"]) + selected_agents_info.append(f"- {human_name}: {agent['description']}") + + system_prompt = ( + "You are a helpful assistant that can engage in general conversation and provide information about Morpheus agents when specifically asked.\n" + "For general questions, respond naturally without mentioning Morpheus or its agents.\n" + "Only when explicitly asked about Morpheus or its capabilities, use this list of available agents:\n" + f"{chr(10).join(selected_agents_info)}\n" + "Remember: Only mention Morpheus agents if directly asked about them. Otherwise, simply answer questions normally as a helpful assistant." + ) + + messages = [ + SystemMessage(content=system_prompt), + HumanMessage(content=request.prompt.content), + ] + + result = self.llm.invoke(messages) + return AgentResponse.success(content=result.content.strip()) + except Exception as e: - logger.error(f"Error in chat endpoint: {str(e)}") - return {"Error": str(e)}, 500 + logger.error(f"Error processing request: {str(e)}", exc_info=True) + return AgentResponse.error(error_message=str(e)) + + async def _execute_tool(self, func_name: str, args: Dict[str, Any]) -> AgentResponse: + """Default agent doesn't use any tools.""" + return AgentResponse.error(error_message=f"Unknown tool: {func_name}") diff --git a/submodules/moragents_dockers/agents/src/agents/dexscreener/__init__.py b/submodules/moragents_dockers/agents/src/agents/dexscreener/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/submodules/moragents_dockers/agents/src/agents/dexscreener/agent.py b/submodules/moragents_dockers/agents/src/agents/dexscreener/agent.py new file mode 100644 index 0000000..2f39866 --- /dev/null +++ b/submodules/moragents_dockers/agents/src/agents/dexscreener/agent.py @@ -0,0 +1,112 @@ +import logging +from typing import Dict, Any, List, Union, Optional +from src.agents.agent_core.agent import AgentCore +from src.models.core import ChatRequest, AgentResponse +from langchain.schema import HumanMessage, SystemMessage +from .config import Config +from . 
import tools +from .models import TokenProfile, BoostedToken + +logger = logging.getLogger(__name__) + + +class DexScreenerAgent(AgentCore): + """Agent for interacting with DexScreener Token API.""" + + def __init__(self, config: Dict[str, Any], llm: Any, embeddings: Any): + super().__init__(config, llm, embeddings) + self.tools_provided = Config.tools + self.tool_bound_llm = self.llm.bind_tools(self.tools_provided) + + async def _process_request(self, request: ChatRequest) -> AgentResponse: + """Process the validated chat request for DexScreener API interactions.""" + try: + messages = [ + SystemMessage( + content=( + "You are an agent that can fetch and analyze cryptocurrency token data " + "from DexScreener. You can get token profiles and information about " + "boosted tokens across different chains. When chain_id is not specified, " + "you'll get data for all chains. You can filter by specific chains like " + "'solana', 'ethereum', or 'bsc'." + ) + ), + HumanMessage(content=request.prompt.content), + ] + + result = self.tool_bound_llm.invoke(messages) + return await self._handle_llm_response(result) + + except Exception as e: + logger.error(f"Error processing request: {str(e)}", exc_info=True) + return AgentResponse.error(error_message=str(e)) + + async def _execute_tool(self, func_name: str, args: Dict[str, Any]) -> AgentResponse: + """Execute the appropriate DexScreener API tool based on function name.""" + try: + chain_id = args.get("chain_id") + result = None + + if func_name == "get_latest_token_profiles": + result = await tools.get_latest_token_profiles(chain_id) + elif func_name == "get_latest_boosted_tokens": + result = await tools.get_latest_boosted_tokens(chain_id) + elif func_name == "get_top_boosted_tokens": + result = await tools.get_top_boosted_tokens(chain_id) + else: + return AgentResponse.error(error_message=f"Unknown tool: {func_name}") + + return AgentResponse.success(content=self._format_token_response(result, chain_id)) + + except Exception as e: + logger.error(f"Error executing tool {func_name}: {str(e)}", exc_info=True) + return AgentResponse.error(error_message=str(e)) + + def _format_token_response( + self, tokens: List[Union[TokenProfile, BoostedToken]], chain_id: Optional[str] = None + ) -> str: + """Format token data into a readable string.""" + if not tokens: + chain_msg = f" for chain {chain_id}" if chain_id else "" + return f"No tokens found{chain_msg}." 
+ + # Limit to top 10 tokens + tokens = tokens[:10] + + formatted = f"# Top {len(tokens)} Tokens" + if chain_id: + formatted += f" on {chain_id}" + formatted += "\n\n" + + for token in tokens: + logger.info("Token: %s", token) + + # Add icon if available + if token.get("icon"): + formatted += f"![Token Icon]({token['icon']})\n\n" + + formatted += f"### `{token['tokenAddress']}`\n\n" + + if token.get("description"): + formatted += f"{token['description']}\n\n" + + # Format all links in a single line + links = token.get("links", []) + if links: + formatted += "**Links**: " + link_parts = [] + + # Add DexScreener URL first + link_parts.append(f"[DexScreener]({token['url']})") + + # Add other links + for link in links: + if link.get("url"): + label = link.get("type") or link.get("label") or "Link" + link_parts.append(f"[{label}]({link['url']})") + + formatted += " • ".join(link_parts) + "\n\n" + + formatted += "\n---\n\n\n" + + return formatted diff --git a/submodules/moragents_dockers/agents/src/agents/dexscreener/config.py b/submodules/moragents_dockers/agents/src/agents/dexscreener/config.py new file mode 100644 index 0000000..3f69747 --- /dev/null +++ b/submodules/moragents_dockers/agents/src/agents/dexscreener/config.py @@ -0,0 +1,56 @@ +class Config: + """Configuration for DexScreener Token API.""" + + BASE_URL = "https://api.dexscreener.com" + RATE_LIMIT = 60 # requests per minute + + ENDPOINTS = { + "token_profiles": "/token-profiles/latest/v1", + "latest_boosts": "/token-boosts/latest/v1", + "top_boosts": "/token-boosts/top/v1", + } + + tools = [ + { + "name": "get_latest_token_profiles", + "description": "Get the latest token profiles from DexScreener", + "parameters": { + "type": "object", + "properties": { + "chain_id": { + "type": "string", + "description": "Optional chain ID to filter results (e.g., 'solana', 'ethereum')", + "required": False, + } + }, + }, + }, + { + "name": "get_latest_boosted_tokens", + "description": "Get the latest boosted tokens from DexScreener", + "parameters": { + "type": "object", + "properties": { + "chain_id": { + "type": "string", + "description": "Optional chain ID to filter results (e.g., 'solana', 'ethereum')", + "required": False, + } + }, + }, + }, + { + "name": "get_top_boosted_tokens", + "description": "Get the tokens with most active boosts", + "parameters": { + "type": "object", + "properties": { + "chain_id": { + "type": "string", + "description": "Optional chain ID to filter results (e.g., 'solana', 'ethereum')", + "required": False, + } + }, + }, + }, + ] diff --git a/submodules/moragents_dockers/agents/src/agents/dexscreener/models.py b/submodules/moragents_dockers/agents/src/agents/dexscreener/models.py new file mode 100644 index 0000000..2491ed4 --- /dev/null +++ b/submodules/moragents_dockers/agents/src/agents/dexscreener/models.py @@ -0,0 +1,22 @@ +from typing import TypedDict, List, Optional + + +class TokenLink(TypedDict): + type: str + label: str + url: str + + +class TokenProfile(TypedDict): + url: str + chainId: str + tokenAddress: str + icon: Optional[str] + header: Optional[str] + description: Optional[str] + links: Optional[List[TokenLink]] + + +class BoostedToken(TokenProfile): + amount: float + totalAmount: float diff --git a/submodules/moragents_dockers/agents/src/agents/dexscreener/tools.py b/submodules/moragents_dockers/agents/src/agents/dexscreener/tools.py new file mode 100644 index 0000000..7b0d98f --- /dev/null +++ b/submodules/moragents_dockers/agents/src/agents/dexscreener/tools.py @@ -0,0 +1,61 @@ +import logging 
+from typing import Dict, Any, List, Optional +import aiohttp +from src.agents.dexscreener.models import TokenProfile, BoostedToken +from src.agents.dexscreener.config import Config + +logger = logging.getLogger(__name__) + + +def filter_by_chain(tokens: List[Dict[str, Any]], chain_id: Optional[str] = None) -> List[Dict[str, Any]]: + """Filter tokens by chain ID if provided.""" + if not chain_id: + return tokens + return [token for token in tokens if token.get("chainId", "").lower() == chain_id.lower()] + + +async def _make_request(endpoint: str) -> Dict[str, Any]: + """Make an API request to DexScreener.""" + url = f"{Config.BASE_URL}{endpoint}" + try: + async with aiohttp.ClientSession() as session: + async with session.get(url) as response: + if response.status != 200: + raise Exception(f"API request failed with status {response.status}") + return await response.json() + except Exception as e: + logger.error(f"API request failed: {str(e)}", exc_info=True) + raise Exception(f"Failed to fetch data: {str(e)}") + + +async def get_latest_token_profiles(chain_id: Optional[str] = None) -> List[TokenProfile]: + """Get the latest token profiles, optionally filtered by chain.""" + try: + response = await _make_request(Config.ENDPOINTS["token_profiles"]) + tokens = response if isinstance(response, list) else [] + return filter_by_chain(tokens, chain_id) + except Exception as e: + raise Exception(f"Failed to get token profiles: {str(e)}") + + +async def get_latest_boosted_tokens(chain_id: Optional[str] = None) -> List[BoostedToken]: + """Get the latest boosted tokens, optionally filtered by chain.""" + try: + response = await _make_request(Config.ENDPOINTS["latest_boosts"]) + tokens = response if isinstance(response, list) else [] + return filter_by_chain(tokens, chain_id) + except Exception as e: + raise Exception(f"Failed to get boosted tokens: {str(e)}") + + +async def get_top_boosted_tokens(chain_id: Optional[str] = None) -> List[BoostedToken]: + """Get tokens with most active boosts, optionally filtered by chain.""" + try: + response = await _make_request(Config.ENDPOINTS["top_boosts"]) + tokens = response if isinstance(response, list) else [] + filtered_tokens = filter_by_chain(tokens, chain_id) + + # Sort by total amount + return sorted(filtered_tokens, key=lambda x: float(x.get("totalAmount", 0) or 0), reverse=True) + except Exception as e: + raise Exception(f"Failed to get top boosted tokens: {str(e)}") diff --git a/submodules/moragents_dockers/agents/src/agents/imagen/agent.py b/submodules/moragents_dockers/agents/src/agents/imagen/agent.py index e9f29d7..5182e67 100644 --- a/submodules/moragents_dockers/agents/src/agents/imagen/agent.py +++ b/submodules/moragents_dockers/agents/src/agents/imagen/agent.py @@ -1,6 +1,7 @@ import base64 import logging from io import BytesIO +from typing import Dict, Any, List, Optional import requests from PIL import Image @@ -10,21 +11,56 @@ from selenium.webdriver.common.by import By from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.support.ui import WebDriverWait -from src.models.messages import ChatRequest -logging.basicConfig( - level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" -) +from src.models.core import ChatRequest, AgentResponse +from src.agents.agent_core.agent import AgentCore +from langchain.schema import HumanMessage, SystemMessage + logger = logging.getLogger(__name__) -class ImagenAgent: - def __init__(self, config, llm, embeddings): - self.config = config - self.llm = 
llm - self.embeddings = embeddings +class ImagenAgent(AgentCore): + """Agent for handling image generation requests.""" + + def __init__(self, config: Dict[str, Any], llm: Any, embeddings: Any): + super().__init__(config, llm, embeddings) + self.tools_provided: List[str] = [] # No tools needed for image generation + self.tool_bound_llm = self.llm + + async def _process_request(self, request: ChatRequest) -> AgentResponse: + """Process the validated chat request for image generation.""" + try: + messages = [ + SystemMessage( + content=( + "You are an image generation assistant. " + "Help users create images by understanding their prompts " + "and generating appropriate images." + ) + ), + HumanMessage(content=request.prompt.content), + ] + + # For image generation, we'll directly use the prompt content + result = self.generate_image(request.prompt.content) + + if result["success"]: + return AgentResponse.success( + content="Image generated successfully", + metadata={"success": True, "service": result["service"], "image": result["image"]}, + ) + else: + return AgentResponse.error(error_message=result["error"]) + + except Exception as e: + logger.error(f"Error processing request: {str(e)}", exc_info=True) + return AgentResponse.error(error_message=str(e)) + + async def _execute_tool(self, func_name: str, args: Dict[str, Any]) -> AgentResponse: + """Image generation agent doesn't use tools.""" + return AgentResponse.error(error_message=f"Unknown tool: {func_name}") - def _setup_headless_browser(self): + def _setup_headless_browser(self) -> webdriver.Chrome: chrome_options = Options() # Essential Chromium flags for running in Docker @@ -54,8 +90,9 @@ def _setup_headless_browser(self): logger.error(f"Failed to setup Chromium browser: {str(e)}") raise - def _generate_with_fluxai(self, prompt): + def _generate_with_fluxai(self, prompt: str) -> Optional[Image.Image]: logger.info(f"Attempting image generation for prompt: {prompt}") + driver = None try: driver = self._setup_headless_browser() driver.set_page_load_timeout(30) @@ -76,13 +113,15 @@ def _generate_with_fluxai(self, prompt): # Wait for the generated image img_element = WebDriverWait(driver, 30).until( - EC.presence_of_element_located( - (By.XPATH, "//img[@alt='Generated' and @loading='lazy']") - ) + EC.presence_of_element_located((By.XPATH, "//img[@alt='Generated' and @loading='lazy']")) ) if img_element: img_src = img_element.get_attribute("src") + if not img_src: + logger.warning("Image source URL is empty") + return None + logger.debug(f"Image source: {img_src}") # Download the image @@ -97,28 +136,25 @@ def _generate_with_fluxai(self, prompt): img_data = response.content return Image.open(BytesIO(img_data)) else: - logger.error( - f"Failed to download image. Status code: {response.status_code}" - ) + logger.error(f"Failed to download image. Status code: {response.status_code}") else: - logger.warning( - "Image format not supported. Expected a valid imgproxy or replicate URL." - ) + logger.warning("Image format not supported. Expected a valid imgproxy or replicate URL.") else: - logger.warning( - "Image not found or still generating. You may need to increase the wait time." - ) + logger.warning("Image not found or still generating. 
You may need to increase the wait time.") except Exception as e: logger.error(f"Error in image generation: {str(e)}") - return None + finally: - try: - driver.quit() - except Exception as e: - logger.error(f"Error closing browser: {str(e)}") + if driver: + try: + driver.quit() + except Exception as e: + logger.error(f"Error closing browser: {str(e)}") - def _encode_image(self, image): + return None + + def _encode_image(self, image: Optional[Image.Image]) -> Optional[str]: if image: buffered = BytesIO() image.save(buffered, format="PNG") @@ -126,7 +162,7 @@ def _encode_image(self, image): return img_str return None - def generate_image(self, prompt): + def generate_image(self, prompt: str) -> Dict[str, Any]: logger.info(f"Starting image generation for prompt: {prompt}") # Generate image using the new method @@ -140,18 +176,3 @@ def generate_image(self, prompt): "success": False, "error": "Failed to generate image with FluxAI", } - - def chat(self, request: ChatRequest): - try: - data = request.dict() - logger.info(f"Received chat request: {data}") - if "prompt" in data: - prompt = data["prompt"] - result = self.generate_image(prompt["content"]) - return {"role": "assistant", "content": result} - else: - logger.error("Missing 'prompt' in chat request data") - return {"error": "Missing parameters"}, 400 - except Exception as e: - logger.error(f"Unexpected error in chat method: {str(e)}, request: {request}") - raise e diff --git a/submodules/moragents_dockers/agents/src/agents/mor_claims/agent.py b/submodules/moragents_dockers/agents/src/agents/mor_claims/agent.py index 374b8c9..f4d7415 100644 --- a/submodules/moragents_dockers/agents/src/agents/mor_claims/agent.py +++ b/submodules/moragents_dockers/agents/src/agents/mor_claims/agent.py @@ -1,163 +1,119 @@ import logging +from typing import Dict, Any from src.agents.mor_claims import tools -from src.models.messages import ChatRequest +from src.models.core import ChatRequest, AgentResponse +from src.agents.agent_core.agent import AgentCore +from langchain.schema import HumanMessage, SystemMessage from src.stores import agent_manager_instance logger = logging.getLogger(__name__) -class MorClaimsAgent: - def __init__(self, agent_info, llm, embeddings): - self.agent_info = agent_info - self.llm = llm - self.embeddings = embeddings +class MorClaimsAgent(AgentCore): + """Agent for handling MOR token claims and rewards.""" + + def __init__(self, config: Dict[str, Any], llm: Any, embeddings: Any): + super().__init__(config, llm, embeddings) self.tools_provided = tools.get_tools() + self.tool_bound_llm = self.llm.bind_tools(self.tools_provided) self.conversation_state = {} - def _get_response(self, message, wallet_address): - if wallet_address not in self.conversation_state: - self.conversation_state[wallet_address] = {"state": "initial"} + async def _process_request(self, request: ChatRequest) -> AgentResponse: + """Process the validated chat request for MOR claims.""" + try: + wallet_address = request.wallet_address - state = self.conversation_state[wallet_address]["state"] + if wallet_address not in self.conversation_state: + self.conversation_state[wallet_address] = {"state": "initial"} - if state == "initial": - agent_manager_instance.set_active_agent(self.agent_info["name"]) + state = self.conversation_state[wallet_address]["state"] - rewards = { - 0: tools.get_current_user_reward(wallet_address, 0), - 1: tools.get_current_user_reward(wallet_address, 1), - } - available_rewards = {pool: amount for pool, amount in rewards.items() if amount > 0} + 
if state == "initial": + agent_manager_instance.set_active_agent("mor claims") - if available_rewards: - selected_pool = max(available_rewards, key=available_rewards.get) - self.conversation_state[wallet_address]["available_rewards"] = { - selected_pool: available_rewards[selected_pool] - } - self.conversation_state[wallet_address]["receiver_address"] = wallet_address - self.conversation_state[wallet_address]["state"] = "awaiting_confirmation" - return ( - f"You have {available_rewards[selected_pool]} MOR rewards available in pool {selected_pool}. Would you like to proceed with claiming these rewards?", - "assistant", - self.agent_info["name"], - ) - else: - return ( - f"No rewards found for your wallet address {wallet_address} in either pool. Claim cannot be processed.", - "assistant", - None, - ) - - elif state == "awaiting_confirmation": - user_input = message[-1]["content"].lower() - if any(word in user_input for word in ["yes", "proceed", "confirm", "claim"]): - return self.prepare_transactions(wallet_address) - else: - return ( - "Please confirm if you want to proceed with the claim by saying 'yes', 'proceed', 'confirm', or 'claim'.", - "assistant", - self.agent_info["name"], - ) - - return ( - "I'm sorry, I didn't understand that. Can you please rephrase your request?", - "assistant", - self.agent_info["name"], - ) - - def prepare_transactions(self, wallet_address): - available_rewards = self.conversation_state[wallet_address]["available_rewards"] - receiver_address = self.conversation_state[wallet_address]["receiver_address"] - transactions = [] - - for pool_id in available_rewards.keys(): - try: - tx_data = tools.prepare_claim_transaction(pool_id, receiver_address) - transactions.append({"pool": pool_id, "transaction": tx_data}) - except Exception as e: - return ( - f"Error preparing transaction for pool {pool_id}: {str(e)}", - "assistant", - None, - ) - - self.conversation_state[wallet_address]["transactions"] = transactions - - # Return a structured response - return ( - { - "role": "claim", - "content": {"transactions": transactions, "claim_tx_cb": "/claim"}, - }, - "claim", - None, - ) - - def chat(self, request: ChatRequest): - try: - data = request.dict() - if "prompt" in data and "wallet_address" in data: - prompt = data["prompt"] - wallet_address = data["wallet_address"] - response, role, next_turn_agent = self._get_response([prompt], wallet_address) - return { - "role": role, - "content": response, - "next_turn_agent": next_turn_agent, + rewards = { + 0: tools.get_current_user_reward(wallet_address, 0), + 1: tools.get_current_user_reward(wallet_address, 1), } - else: - return {"error": "Missing required parameters"}, 400 - except Exception as e: - logger.error(f"Unexpected error in chat method: {str(e)}, request: {request}") - raise e + available_rewards = {pool: amount for pool, amount in rewards.items() if amount > 0} + + if available_rewards: + selected_pool = max(available_rewards, key=available_rewards.get) + self.conversation_state[wallet_address]["available_rewards"] = { + selected_pool: available_rewards[selected_pool] + } + self.conversation_state[wallet_address]["receiver_address"] = wallet_address + self.conversation_state[wallet_address]["state"] = "awaiting_confirmation" + return AgentResponse.success( + content=f"You have {available_rewards[selected_pool]} MOR rewards available in pool {selected_pool}. Would you like to proceed with claiming these rewards?" 
+ ) + else: + return AgentResponse.error( + error_message=f"No rewards found for your wallet address {wallet_address} in either pool. Claim cannot be processed." + ) + + elif state == "awaiting_confirmation": + user_input = request.prompt.content.lower() + if any(word in user_input for word in ["yes", "proceed", "confirm", "claim"]): + return await self._prepare_transactions(wallet_address) + else: + return AgentResponse.success( + content="Please confirm if you want to proceed with the claim by saying 'yes', 'proceed', 'confirm', or 'claim'." + ) + + messages = [ + SystemMessage( + content=( + "You are a MOR claims agent that helps users claim their MOR rewards. " + "Ask for clarification if a request is ambiguous." + ) + ), + HumanMessage(content=request.prompt.content), + ] + + result = self.tool_bound_llm.invoke(messages) + return await self._handle_llm_response(result) - def claim(self, request: ChatRequest): - try: - data = request.dict() - wallet_address = data["wallet_address"] - transactions = self.conversation_state[wallet_address]["transactions"] - agent_manager_instance.clear_active_agent() - return {"transactions": transactions} except Exception as e: - return {"error": str(e)}, 500 + logger.error(f"Error processing request: {str(e)}", exc_info=True) + return AgentResponse.error(error_message=str(e)) - def claim_status(self, request: ChatRequest): + async def _prepare_transactions(self, wallet_address: str) -> AgentResponse: + """Prepare claim transactions for the given wallet.""" try: - data = request.dict() - wallet_address = data.get("wallet_address") - transaction_hash = data.get("transaction_hash") - status = data.get("status") - - if not all([wallet_address, transaction_hash, status]): - return {"error": "Missing required parameters"}, 400 + available_rewards = self.conversation_state[wallet_address]["available_rewards"] + receiver_address = self.conversation_state[wallet_address]["receiver_address"] + transactions = [] + + for pool_id in available_rewards.keys(): + try: + tx_data = tools.prepare_claim_transaction(pool_id, receiver_address) + transactions.append({"pool": pool_id, "transaction": tx_data}) + except Exception as e: + return AgentResponse.error( + error_message=f"Error preparing transaction for pool {pool_id}: {str(e)}" + ) + + self.conversation_state[wallet_address]["transactions"] = transactions + + return AgentResponse.action_required( + content={"transactions": transactions, "claim_tx_cb": "/claim"}, action_type="claim" + ) - # Generate and return the status message - response = self.get_status(status, transaction_hash, "claim") - return response, 200 except Exception as e: - return {"error": str(e)}, 500 - - def get_status(self, flag, tx_hash, tx_type): - response = "" - - if flag == "cancelled": - response = f"The claim transaction has been cancelled." - elif flag == "success": - response = f"The claim transaction was successful." - elif flag == "failed": - response = f"The claim transaction has failed." - elif flag == "initiated": - response = f"Claim transaction has been sent, please wait for it to be confirmed." - - if tx_hash: - response = ( - response + f" The transaction hash is {tx_hash}. " - f"Here's the link to the Etherscan transaction: " - f"https://etherscan.io/tx/{tx_hash}" - ) + logger.error(f"Error preparing transactions: {str(e)}", exc_info=True) + return AgentResponse.error(error_message=str(e)) - if flag != "initiated": - response = response + " Is there anything else I can help you with?" 
+ async def _execute_tool(self, func_name: str, args: Dict[str, Any]) -> AgentResponse: + """Execute the appropriate MOR claims tool based on function name.""" + try: + if func_name == "get_claim_status": + status = tools.get_claim_status(args["transaction_hash"]) + return AgentResponse.success(content=status) + else: + return AgentResponse.error(error_message=f"Unknown tool: {func_name}") - return {"role": "assistant", "content": response} + except Exception as e: + logger.error(f"Error executing tool {func_name}: {str(e)}", exc_info=True) + return AgentResponse.error(error_message=str(e)) diff --git a/submodules/moragents_dockers/agents/src/agents/news_agent/agent.py b/submodules/moragents_dockers/agents/src/agents/news_agent/agent.py index c10ac86..a6090b5 100644 --- a/submodules/moragents_dockers/agents/src/agents/news_agent/agent.py +++ b/submodules/moragents_dockers/agents/src/agents/news_agent/agent.py @@ -1,21 +1,23 @@ import logging import re - import pyshorteners from src.agents.news_agent.config import Config from src.agents.news_agent.tools import clean_html, fetch_rss_feed, is_within_time_window -from src.models.messages import ChatRequest +from src.models.core import ChatRequest, AgentResponse +from src.agents.agent_core.agent import AgentCore +from langchain.schema import HumanMessage, SystemMessage logger = logging.getLogger(__name__) -class NewsAgent: - def __init__(self, agent_info, llm, embeddings): - self.agent_info = agent_info - self.llm = llm - self.embeddings = embeddings +class NewsAgent(AgentCore): + """Agent for fetching and analyzing cryptocurrency news.""" + + def __init__(self, config, llm, embeddings): + super().__init__(config, llm, embeddings) self.tools_provided = self.get_tools() + self.tool_bound_llm = self.llm.bind_tools(self.tools_provided) self.url_shortener = pyshorteners.Shortener() def get_tools(self): @@ -40,7 +42,59 @@ def get_tools(self): } ] - def check_relevance_and_summarize(self, title, content, coin): + async def _process_request(self, request: ChatRequest) -> AgentResponse: + """Process the validated chat request for news-related queries.""" + try: + messages = [ + SystemMessage( + content=( + "You are a news analysis agent that fetches and analyzes cryptocurrency news. " + "Ask for clarification if a request is ambiguous." + ) + ), + HumanMessage(content=request.prompt.content), + ] + + result = self.tool_bound_llm.invoke(messages) + return await self._handle_llm_response(result) + + except Exception as e: + logger.error(f"Error processing request: {str(e)}", exc_info=True) + return AgentResponse.error(error_message=str(e)) + + async def _execute_tool(self, func_name: str, args: dict) -> AgentResponse: + """Execute the appropriate news tool based on function name.""" + try: + if func_name == "fetch_crypto_news": + coins = args.get("coins", []) + if not coins: + return AgentResponse.error(error_message="No coins specified for news fetch") + + news = self._fetch_crypto_news(coins) + if not news: + return AgentResponse.success( + content="No relevant news found for the specified cryptocurrencies in the last 24 hours." + ) + + response = "Here are the latest news items relevant to changes in price movement of the mentioned tokens in the last 24 hours:\n\n" + for index, item in enumerate(news, start=1): + coin_name = Config.CRYPTO_DICT.get(item["Coin"], item["Coin"]) + short_url = self.url_shortener.tinyurl.short(item["Link"]) + response += f"{index}. 
***{coin_name} News***:\n" + response += f"{item['Title']}\n" + response += f"{item['Summary']}\n" + response += f"Read more: {short_url}\n\n" + + return AgentResponse.success(content=response) + else: + return AgentResponse.error(error_message=f"Unknown tool: {func_name}") + + except Exception as e: + logger.error(f"Error executing tool {func_name}: {str(e)}", exc_info=True) + return AgentResponse.error(error_message=str(e)) + + def _check_relevance_and_summarize(self, title, content, coin): + """Check if news is relevant and generate summary.""" logger.info(f"Checking relevance for {coin}: {title}") prompt = Config.RELEVANCE_PROMPT.format(coin=coin, title=title, content=content) result = self.llm.invoke( @@ -50,7 +104,8 @@ def check_relevance_and_summarize(self, title, content, coin): ) return result.content.strip() - def process_rss_feed(self, feed_url, coin): + def _process_rss_feed(self, feed_url, coin): + """Process RSS feed and filter relevant articles.""" logger.info(f"Processing RSS feed for {coin}: {feed_url}") feed = fetch_rss_feed(feed_url) results = [] @@ -60,7 +115,7 @@ def process_rss_feed(self, feed_url, coin): title = clean_html(entry.title) content = clean_html(entry.summary) logger.info(f"Checking relevance for article: {title}") - result = self.check_relevance_and_summarize(title, content, coin) + result = self._check_relevance_and_summarize(title, content, coin) if not result.upper().startswith("NOT RELEVANT"): results.append({"Title": title, "Summary": result, "Link": entry.link}) if len(results) >= Config.ARTICLES_PER_TOKEN: @@ -70,72 +125,16 @@ def process_rss_feed(self, feed_url, coin): logger.info(f"Found {len(results)} relevant articles for {coin}") return results - def fetch_crypto_news(self, coins): + def _fetch_crypto_news(self, coins): + """Fetch and process news for specified coins.""" logger.info(f"Fetching news for coins: {coins}") all_news = [] for coin in coins: logger.info(f"Processing news for {coin}") coin_name = Config.CRYPTO_DICT.get(coin.upper(), coin) google_news_url = Config.GOOGLE_NEWS_BASE_URL.format(coin_name) - results = self.process_rss_feed(google_news_url, coin_name) - all_news.extend( - [{"Coin": coin, **result} for result in results[: Config.ARTICLES_PER_TOKEN]] - ) + results = self._process_rss_feed(google_news_url, coin_name) + all_news.extend([{"Coin": coin, **result} for result in results[: Config.ARTICLES_PER_TOKEN]]) logger.info(f"Total news items fetched: {len(all_news)}") return all_news - - def chat(self, request: ChatRequest): - try: - data = request.dict() - if "prompt" in data: - prompt = data["prompt"] - if isinstance(prompt, dict) and "content" in prompt: - prompt = prompt["content"] - - # Updated coin detection logic - coins = re.findall( - r"\b(" + "|".join(re.escape(key) for key in Config.CRYPTO_DICT.keys()) + r")\b", - prompt.upper(), - ) - - if not coins: - return { - "role": "assistant", - "content": "I couldn't identify any cryptocurrency symbols in your message. 
Please specify the cryptocurrencies you want news for.", - "next_turn_agent": None, - } - - news = self.fetch_crypto_news(coins) - - if not news: - return { - "role": "assistant", - "content": "No relevant news found for the specified cryptocurrencies in the last 24 hours.", - "next_turn_agent": None, - } - - response = "Here are the latest news items relevant to changes in price movement of the mentioned tokens in the last 24 hours:\n\n" - for index, item in enumerate(news, start=1): - coin_name = Config.CRYPTO_DICT.get(item["Coin"], item["Coin"]) - short_url = self.url_shortener.tinyurl.short(item["Link"]) - response += f"{index}. ***{coin_name} News***:\n" - response += f"{item['Title']}\n" - response += f"{item['Summary']}\n" - response += f"Read more: {short_url}\n\n" - - return { - "role": "assistant", - "content": response, - "next_turn_agent": None, - } - else: - return { - "role": "assistant", - "content": "Missing required parameters", - "next_turn_agent": None, - } - - except Exception as e: - logger.error(f"Error in chat method: {str(e)}, request: {request}") - raise e diff --git a/submodules/moragents_dockers/agents/src/agents/realtime_search/agent.py b/submodules/moragents_dockers/agents/src/agents/realtime_search/agent.py index c022186..73b382e 100644 --- a/submodules/moragents_dockers/agents/src/agents/realtime_search/agent.py +++ b/submodules/moragents_dockers/agents/src/agents/realtime_search/agent.py @@ -1,5 +1,5 @@ import logging -import time +from typing import Dict, Any import requests from bs4 import BeautifulSoup @@ -7,46 +7,80 @@ from selenium.webdriver.chrome.options import Options from selenium.webdriver.common.by import By from selenium.webdriver.common.keys import Keys -from src.models.messages import ChatRequest -logging.basicConfig( - level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" -) +from src.models.core import ChatRequest, AgentResponse +from src.agents.agent_core.agent import AgentCore +from langchain.schema import HumanMessage, SystemMessage +from src.agents.realtime_search.config import Config + logger = logging.getLogger(__name__) -USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" +class RealtimeSearchAgent(AgentCore): + """Agent for performing real-time web searches.""" -class RealtimeSearchAgent: - def __init__(self, config, llm, embeddings): - self.config = config - self.llm = llm - self.embeddings = embeddings + def __init__(self, config: Dict[str, Any], llm: Any, embeddings: Any): + super().__init__(config, llm, embeddings) self.last_search_term = None + self.tools_provided = Config.tools + self.tool_bound_llm = self.llm.bind_tools(self.tools_provided) + + async def _process_request(self, request: ChatRequest) -> AgentResponse: + """Process the validated chat request for web search queries.""" + try: + messages = [ + SystemMessage( + content=( + "You are a real-time web search agent that helps find current information. " + "Ask for clarification if a request is ambiguous." 
+ ) + ), + HumanMessage(content=request.prompt.content), + ] + + result = self.tool_bound_llm.invoke(messages) + return await self._handle_llm_response(result) + + except Exception as e: + logger.error(f"Error processing request: {str(e)}", exc_info=True) + return AgentResponse.error(error_message=str(e)) + + async def _execute_tool(self, func_name: str, args: Dict[str, Any]) -> AgentResponse: + """Execute the appropriate search tool based on function name.""" + try: + if func_name == "perform_web_search": + search_term = args.get("search_term") + if not search_term: + return AgentResponse.error(error_message="No search term provided") + + search_results = self._perform_search_with_web_scraping(search_term) + if "Error performing web search" in search_results: + return AgentResponse.error(error_message=search_results) + + synthesized_answer = self._synthesize_answer(search_term, search_results) + return AgentResponse.success(content=synthesized_answer) + else: + return AgentResponse.error(error_message=f"Unknown tool: {func_name}") - def perform_search_with_web_scraping(self, search_term=None): - if search_term is not None: - self.last_search_term = search_term - elif self.last_search_term is None: - logger.warning("No search term available for web search") - return "Web search failed. Please provide a search term." - else: - search_term = self.last_search_term + except Exception as e: + logger.error(f"Error executing tool {func_name}: {str(e)}", exc_info=True) + return AgentResponse.error(error_message=str(e)) + def _perform_search_with_web_scraping(self, search_term: str) -> str: + """Perform web search using requests and BeautifulSoup.""" logger.info(f"Performing web search for: {search_term}") try: - url = f"https://www.google.com/search?q={search_term}" - headers = {"User-Agent": USER_AGENT} + url = Config.SEARCH_URL.format(search_term) + headers = {"User-Agent": Config.USER_AGENT} response = requests.get(url, headers=headers) response.raise_for_status() soup = BeautifulSoup(response.text, "html.parser") - search_results = soup.find_all("div", class_="g") formatted_results = [] - for result in search_results[:5]: + for result in search_results[: Config.MAX_SEARCH_RESULTS]: result_text = result.get_text(strip=True) formatted_results.append(f"Result:\n{result_text}") @@ -55,82 +89,55 @@ def perform_search_with_web_scraping(self, search_term=None): except requests.RequestException as e: logger.error(f"Error performing web search: {str(e)}") logger.info("Attempting fallback to headless browsing") - return self.perform_search_with_headless_browsing(search_term) + return self._perform_search_with_headless_browsing(search_term) - def perform_search_with_headless_browsing(self, search_term): + def _perform_search_with_headless_browsing(self, search_term: str) -> str: + """Fallback search method using headless Chrome.""" chrome_options = Options() - chrome_options.add_argument("--headless") - + for option in Config.CHROME_OPTIONS: + chrome_options.add_argument(option) driver = webdriver.Chrome(options=chrome_options) try: driver.get("https://www.google.com") - search_box = driver.find_element(By.NAME, "q") search_box.send_keys(search_term) search_box.send_keys(Keys.RETURN) - time.sleep(2) - soup = BeautifulSoup(driver.page_source, "html.parser") - search_results = soup.find_all("div", class_="g") formatted_results = [] - for result in search_results[:5]: + for result in search_results[: Config.MAX_SEARCH_RESULTS]: result_text = result.get_text(strip=True) 
formatted_results.append(f"Result:\n{result_text}") return "\n\n".join(formatted_results) except Exception as e: - logger.error(f"Error performing headless web search: {str(e)}") - return f"Error performing web search: {str(e)}" + error_msg = f"Error performing headless web search: {str(e)}" + logger.error(error_msg) + return error_msg finally: driver.quit() - def synthesize_answer(self, search_term, search_results): + def _synthesize_answer(self, search_term: str, search_results: str) -> str: + """Synthesize search results into a coherent answer.""" logger.info("Synthesizing answer from search results") messages = [ { "role": "system", - "content": """You are a helpful assistant that synthesizes information from web search results to answer user queries. - Do not preface your answer with 'Based on the search results, I can tell you that:' or anything similar. - Just provide the answer.""", + "content": Config.SYNTHESIS_SYSTEM_PROMPT, }, { "role": "user", - "content": f"""Based on the following search results for the query '{search_term}', provide a concise and informative answer: {search_results}""", + "content": f"Query: {search_term}\nResults: {search_results}", }, ] try: - result = self.llm.invoke(messages) - logger.info(f"Received response from LLM: {result}") + result = self.llm.invoke(messages, max_tokens=Config.MAX_TOKENS, temperature=Config.TEMPERATURE) return result.content.strip() except Exception as e: logger.error(f"Error synthesizing answer: {str(e)}") raise - - def chat(self, request: ChatRequest): - try: - data = request.dict() - logger.info(f"Received chat request: {data}") - if "prompt" in data: - prompt = data["prompt"] - search_term = prompt["content"] - logger.info(f"Performing web search for prompt: {search_term}") - - search_results = self.perform_search_with_web_scraping(search_term) - logger.info(f"Search results obtained") - - synthesized_answer = self.synthesize_answer(search_term, search_results) - logger.info(f"Synthesized answer: {synthesized_answer}") - - return {"role": "assistant", "content": synthesized_answer} - else: - logger.error("Missing 'prompt' in chat request data") - return {"error": "Missing parameters"}, 400 - except Exception as e: - logger.error(f"Unexpected error in chat method: {str(e)}, request: {request}") - raise e diff --git a/submodules/moragents_dockers/agents/src/agents/realtime_search/config.py b/submodules/moragents_dockers/agents/src/agents/realtime_search/config.py new file mode 100644 index 0000000..52b96d4 --- /dev/null +++ b/submodules/moragents_dockers/agents/src/agents/realtime_search/config.py @@ -0,0 +1,30 @@ +class Config: + tools = [ + { + "name": "perform_web_search", + "description": "Perform a web search and return relevant results", + "parameters": { + "type": "object", + "properties": { + "search_term": { + "type": "string", + "description": "The search term to look up", + } + }, + "required": ["search_term"], + }, + } + ] + + # Web scraping configuration + USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" + MAX_SEARCH_RESULTS = 5 + SEARCH_URL = "https://www.google.com/search?q={}" + + # Chrome options for headless browsing + CHROME_OPTIONS = ["--headless", "--disable-gpu", "--no-sandbox", "--disable-dev-shm-usage"] + + # LLM configuration for answer synthesis + SYNTHESIS_SYSTEM_PROMPT = "Synthesize information from web search results into a clear, direct answer." 
+ MAX_TOKENS = 150 + TEMPERATURE = 0.3 diff --git a/submodules/moragents_dockers/agents/src/agents/rugcheck/__init__.py b/submodules/moragents_dockers/agents/src/agents/rugcheck/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/submodules/moragents_dockers/agents/src/agents/rugcheck/agent.py b/submodules/moragents_dockers/agents/src/agents/rugcheck/agent.py new file mode 100644 index 0000000..029fc28 --- /dev/null +++ b/submodules/moragents_dockers/agents/src/agents/rugcheck/agent.py @@ -0,0 +1,131 @@ +import logging +from typing import Any, Dict, Optional, List + +from src.agents.agent_core.agent import AgentCore +from src.models.core import ChatRequest, AgentResponse +from langchain.schema import HumanMessage, SystemMessage +from .config import Config +from .client import RugcheckClient + +logger = logging.getLogger(__name__) + + +class RugcheckAgent(AgentCore): + """Agent for analyzing token safety and trends using the Rugcheck API.""" + + def __init__(self, config: Dict[str, Any], llm: Any, embeddings: Any): + super().__init__(config, llm, embeddings) + self.tools_provided = Config.tools + self.tool_bound_llm = self.llm.bind_tools(self.tools_provided) + self.api_base_url = "https://api.rugcheck.xyz/v1" + + async def _process_request(self, request: ChatRequest) -> AgentResponse: + """Process the validated chat request for token analysis.""" + try: + messages = [ + SystemMessage( + content=( + "You are an agent that can analyze tokens for safety and view trending tokens. " + "When you need to perform an analysis, use the appropriate function call." + ) + ), + HumanMessage(content=request.prompt.content), + ] + + result = self.tool_bound_llm.invoke(messages) + return await self._handle_llm_response(result) + + except Exception as e: + logger.error(f"Error processing request: {str(e)}", exc_info=True) + return AgentResponse.error(error_message=str(e)) + + async def _execute_tool(self, func_name: str, args: Dict[str, Any]) -> AgentResponse: + """Execute the appropriate Rugcheck API tool based on function name.""" + try: + if func_name == "get_token_report": + mint = args.get("mint") + if not mint: + return AgentResponse.error(error_message="Please provide a token mint address") + + try: + report = await self._fetch_token_report(mint) + + # Format a user-friendly response + risks = "\n".join( + [ + f"- {risk['name']}: {risk['description']} (Score: {risk['score']})" + for risk in report.get("risks", []) + ] + ) + + content = ( + f"Token Analysis Report for {mint}\n" + f"Name: {report.get('tokenMeta', {}).get('name')}\n" + f"Symbol: {report.get('tokenMeta', {}).get('symbol')}\n" + f"Overall Score: {report.get('score')}\n" + f"\nRisk Factors:\n{risks}\n" + f"\nTotal Market Liquidity: {report.get('totalMarketLiquidity')} USD" + ) + + return AgentResponse.success(content=content) + + except Exception as e: + return AgentResponse.error(error_message=f"Failed to get token report: {str(e)}") + + elif func_name == "get_most_viewed": + try: + viewed_tokens = await self._fetch_most_viewed() + content = "Most Viewed Tokens (Past 24h):\n" + tokens_list = list(viewed_tokens.values()) if isinstance(viewed_tokens, dict) else viewed_tokens + for token in tokens_list[:10]: # Top 10 + content += ( + f"\n- {token['metadata']['name']} ({token['metadata']['symbol']})\n" + f" Visits: {token['visits']}, Unique Users: {token['user_visits']}" + ) + return AgentResponse.success(content=content) + + except Exception as e: + return AgentResponse.error(error_message=f"Failed to get most viewed tokens: 
{str(e)}") + + elif func_name == "get_most_voted": + try: + voted_tokens = await self._fetch_most_voted() + content = "Most Voted Tokens (Past 24h):\n" + tokens_list = list(voted_tokens.values()) if isinstance(voted_tokens, dict) else voted_tokens + for token in tokens_list[:10]: # Top 10 + content += f"\n- Mint: {token['mint']}\n Upvotes: {token['up_count']}, Total Votes: {token['vote_count']}" + return AgentResponse.success(content=content) + + except Exception as e: + return AgentResponse.error(error_message=f"Failed to get most voted tokens: {str(e)}") + + else: + return AgentResponse.error(error_message=f"Unknown tool function: {func_name}") + + except Exception as e: + logger.error(f"Error executing tool {func_name}: {str(e)}", exc_info=True) + return AgentResponse.error(error_message=str(e)) + + async def _fetch_token_report(self, mint: str) -> Dict[str, Any]: + """Fetch token report from Rugcheck API.""" + client = RugcheckClient(self.api_base_url) + try: + return await client.get_token_report(mint) + finally: + await client.close() + + async def _fetch_most_viewed(self) -> Dict[str, Any]: + """Fetch most viewed tokens from Rugcheck API.""" + client = RugcheckClient(self.api_base_url) + try: + return await client.get_most_viewed() + finally: + await client.close() + + async def _fetch_most_voted(self) -> Dict[str, Any]: + """Fetch most voted tokens from Rugcheck API.""" + client = RugcheckClient(self.api_base_url) + try: + return await client.get_most_voted() + finally: + await client.close() diff --git a/submodules/moragents_dockers/agents/src/agents/rugcheck/client.py b/submodules/moragents_dockers/agents/src/agents/rugcheck/client.py new file mode 100644 index 0000000..7051e2b --- /dev/null +++ b/submodules/moragents_dockers/agents/src/agents/rugcheck/client.py @@ -0,0 +1,56 @@ +import aiohttp +import logging +from typing import Dict, Any, Optional + +logger = logging.getLogger(__name__) + + +class RugcheckClient: + """Client for interacting with the Rugcheck API.""" + + def __init__(self, base_url: str = "https://api.rugcheck.xyz/v1"): + self.base_url = base_url + self._session: Optional[aiohttp.ClientSession] = None + + async def _ensure_session(self): + """Ensure aiohttp session exists.""" + if self._session is None or self._session.closed: + self._session = aiohttp.ClientSession() + + async def close(self): + """Close the aiohttp session.""" + if self._session and not self._session.closed: + await self._session.close() + + async def _make_request(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]: + """Make HTTP request to Rugcheck API.""" + await self._ensure_session() + url = f"{self.base_url}{endpoint}" + + try: + async with self._session.request(method, url, **kwargs) as response: + response.raise_for_status() + return await response.json() + + except aiohttp.ClientError as e: + logger.error(f"HTTP error for {url}: {str(e)}") + raise Exception(f"Failed to fetch data from Rugcheck API: {str(e)}") + + except Exception as e: + logger.error(f"Unexpected error for {url}: {str(e)}") + raise + + async def get_token_report(self, mint: str) -> Dict[str, Any]: + """Get detailed report for a token mint.""" + endpoint = f"/tokens/{mint}/report/summary" + return await self._make_request("GET", endpoint) + + async def get_most_viewed(self) -> Dict[str, Any]: + """Get most viewed tokens in past 24 hours.""" + endpoint = "/stats/recent" + return await self._make_request("GET", endpoint) + + async def get_most_voted(self) -> Dict[str, Any]: + """Get most voted tokens in past 
24 hours.""" + endpoint = "/stats/trending" + return await self._make_request("GET", endpoint) diff --git a/submodules/moragents_dockers/agents/src/agents/rugcheck/config.py b/submodules/moragents_dockers/agents/src/agents/rugcheck/config.py new file mode 100644 index 0000000..e479123 --- /dev/null +++ b/submodules/moragents_dockers/agents/src/agents/rugcheck/config.py @@ -0,0 +1,24 @@ +class Config: + """Configuration for RugcheckAgent tools""" + + tools = [ + { + "name": "get_token_report", + "description": "Generate a report summary for a given token mint address", + "parameters": { + "type": "object", + "properties": {"mint": {"type": "string", "description": "Token mint address to analyze"}}, + "required": ["mint"], + }, + }, + { + "name": "get_most_viewed", + "description": "Get most viewed tokens in past 24 hours", + "parameters": {"type": "object", "properties": {}}, + }, + { + "name": "get_most_voted", + "description": "Get most voted tokens in past 24 hours", + "parameters": {"type": "object", "properties": {}}, + }, + ] diff --git a/submodules/moragents_dockers/agents/src/agents/rugcheck/tools.py b/submodules/moragents_dockers/agents/src/agents/rugcheck/tools.py new file mode 100644 index 0000000..a237d7f --- /dev/null +++ b/submodules/moragents_dockers/agents/src/agents/rugcheck/tools.py @@ -0,0 +1,6 @@ +import logging +from typing import Dict, Any + +logger = logging.getLogger(__name__) + +# Add your tool implementations here diff --git a/submodules/moragents_dockers/agents/src/agents/tweet_sizzler/agent.py b/submodules/moragents_dockers/agents/src/agents/tweet_sizzler/agent.py index f1d4cf2..75db821 100644 --- a/submodules/moragents_dockers/agents/src/agents/tweet_sizzler/agent.py +++ b/submodules/moragents_dockers/agents/src/agents/tweet_sizzler/agent.py @@ -1,151 +1,110 @@ import logging - import tweepy from src.agents.tweet_sizzler.config import Config -from src.models.messages import ChatRequest +from src.models.core import ChatRequest, AgentResponse +from src.agents.agent_core.agent import AgentCore +from langchain.schema import HumanMessage, SystemMessage -# Configure logging -logging.basicConfig( - level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" -) logger = logging.getLogger(__name__) -class TweetSizzlerAgent: +class TweetSizzlerAgent(AgentCore): + """Agent for generating and posting tweets.""" + def __init__(self, config, llm, embeddings): - self.config = config - self.llm = llm - self.embeddings = embeddings - self.x_api_key = None + super().__init__(config, llm, embeddings) self.last_prompt_content = None - self.twitter_client = None - - def generate_tweet(self, prompt_content=None): - # State management for tweet regeneration purposes - if prompt_content is not None: - self.last_prompt_content = prompt_content - elif self.last_prompt_content is None: - logger.warning("No prompt content available for tweet generation") - return "Tweet generation failed. Please provide a prompt." 
- else: - prompt_content = self.last_prompt_content - - logger.info(f"Generating tweet for prompt_content: {prompt_content}") - messages = [ - { - "role": "system", - "content": Config.TWEET_GENERATION_PROMPT, - }, - {"role": "user", "content": f"Generate a tweet for: {prompt_content}"}, - ] + async def _process_request(self, request: ChatRequest) -> AgentResponse: + """Process the validated chat request for tweet generation and posting.""" try: - result = self.llm.invoke(messages) - logger.info(f"Received response from LLM: {result}") - tweet = result.content.strip() - tweet = " ".join(tweet.split()) + messages = [ + SystemMessage(content=Config.TWEET_GENERATION_PROMPT), + HumanMessage(content=request.prompt.content), + ] + + # Extract action from prompt content + action = "generate" # Default action + if isinstance(request.prompt.content, dict): + action = request.prompt.content.get("action", "generate") + content = request.prompt.content.get("content", request.prompt.content) + else: + content = request.prompt.content + + if action == "generate": + tweet = self._generate_tweet(content) + return AgentResponse.success(content=tweet) + elif action == "post": + if not hasattr(request.prompt, "metadata") or not request.prompt.metadata: + return AgentResponse.error(error_message=Config.ERROR_MISSING_API_CREDENTIALS) + + credentials = request.prompt.metadata.get("credentials") + if not credentials: + return AgentResponse.error(error_message=Config.ERROR_MISSING_API_CREDENTIALS) - # Remove any dictionary-like formatting, if present - if tweet.startswith("{") and tweet.endswith("}"): - tweet = tweet.strip("{}").split(":", 1)[-1].strip().strip('"') + result = await self._post_tweet(credentials, content) + if "error" in result: + return AgentResponse.error(error_message=result["error"]) + + return AgentResponse.success( + content=f"Tweet posted successfully: {result['tweet']}", metadata={"tweet_id": result["tweet_id"]} + ) + else: + return AgentResponse.error(error_message=Config.ERROR_INVALID_ACTION) - logger.info(f"Tweet generated successfully: {tweet}") - return tweet except Exception as e: - logger.error(f"Error generating tweet: {str(e)}") - raise - - async def post_tweet(self, request): - logger.info(f"Received tweet request: {request}") - data = await request.json() - logger.info(f"Received tweet data: {data}") - - tweet_content = data.get("post_content") - logger.info(f"Received tweet content: {tweet_content}") - - if not tweet_content: - logger.warning("Attempted to post tweet without providing content") - return {"error": Config.ERROR_NO_TWEET_CONTENT}, 400 - - required_keys = [ - "api_key", - "api_secret", - "access_token", - "access_token_secret", - "bearer_token", - ] - if not all(key in data for key in required_keys): - logger.warning("Missing required API credentials") - return {"error": Config.ERROR_MISSING_API_CREDENTIALS}, 400 + self.logger.error(f"Error processing request: {str(e)}", exc_info=True) + return AgentResponse.error(error_message=str(e)) + + def _generate_tweet(self, prompt_content: str) -> str: + """Generate tweet content based on prompt.""" + if not prompt_content: + raise ValueError("Tweet generation failed. 
Please provide a prompt.") + + self.last_prompt_content = prompt_content + self.logger.info(f"Generating tweet for prompt_content: {prompt_content}") + + result = self.llm.invoke( + [ + SystemMessage(content=Config.TWEET_GENERATION_PROMPT), + HumanMessage(content=f"Generate a tweet for: {prompt_content}"), + ] + ) + + tweet = result.content.strip() + tweet = " ".join(tweet.split()) + + # Remove any dictionary-like formatting + if tweet.startswith("{") and tweet.endswith("}"): + tweet = tweet.strip("{}").split(":", 1)[-1].strip().strip('"') + + self.logger.info(f"Tweet generated successfully: {tweet}") + return tweet + + async def _post_tweet(self, credentials: dict, tweet_content: str) -> dict: + """Post tweet using provided credentials.""" + required_keys = ["api_key", "api_secret", "access_token", "access_token_secret", "bearer_token"] + + if not all(key in credentials for key in required_keys): + return {"error": Config.ERROR_MISSING_API_CREDENTIALS} try: client = tweepy.Client( - consumer_key=data["api_key"], - consumer_secret=data["api_secret"], - access_token=data["access_token"], - access_token_secret=data["access_token_secret"], - bearer_token=data["bearer_token"], + consumer_key=credentials["api_key"], + consumer_secret=credentials["api_secret"], + access_token=credentials["access_token"], + access_token_secret=credentials["access_token_secret"], + bearer_token=credentials["bearer_token"], ) - # Post tweet response = client.create_tweet(text=tweet_content) - logger.info(f"Tweet posted successfully: {response}") - return { - "success": "Tweet posted successfully", - "tweet": response.data["text"], - "tweet_id": response.data["id"], - }, 200 - except Exception as e: - logger.error(f"Error posting tweet: {str(e)}") - return {"error": f"Failed to post tweet: {str(e)}"}, 500 - - def set_x_api_key(self, request): - data = request.get_json() - required_keys = [ - "api_key", - "api_secret", - "access_token", - "access_token_secret", - "bearer_token", - ] - - if not all(key in data for key in required_keys): - logger.warning("Missing required API credentials") - return {"error": Config.ERROR_MISSING_API_CREDENTIALS}, 400 - - # Save these credentials to local storage - for key in required_keys: - self.flask_app.config[key] = data[key] - - return {"success": "API credentials saved successfully"}, 200 - - def chat(self, chat_request: ChatRequest): - try: - prompt = chat_request.prompt.dict() - logger.info(f"Received chat request: {prompt}") - - action = prompt.get("action", Config.DEFAULT_ACTION) - logger.debug(f"Extracted prompt content: {prompt['content']}, action: {action}") - - if action == "generate": - logger.info(f"Generating tweet for prompt: {prompt['content']}") - tweet = self.generate_tweet(prompt["content"]) - logger.info(f"Generated tweet: {tweet}") - return {"role": "assistant", "content": tweet} - elif action == "post": - logger.info("Attempting to post tweet") - result, status_code = self.post_tweet(chat_request) - logger.info(f"Posted tweet result: {result}, status code: {status_code}") - if isinstance(result, dict) and "error" in result: - return result, status_code - return { - "role": "assistant", - "content": f"Tweet posted successfully: {result.get('tweet', '')}", - }, status_code - else: - logger.error(f"Invalid action received: {action}") - return {"role": "assistant", "content": Config.ERROR_INVALID_ACTION} + self.logger.info(f"Tweet posted successfully: {response}") + return {"tweet": response.data["text"], "tweet_id": response.data["id"]} except Exception as e: - 
logger.error(f"Unexpected error in chat method: {str(e)}, request: {chat_request}") - raise e + self.logger.error(f"Error posting tweet: {str(e)}") + return {"error": f"Failed to post tweet: {str(e)}"} + + async def _execute_tool(self, func_name: str, args: dict) -> AgentResponse: + """Not implemented as this agent doesn't use tools.""" + return AgentResponse.error(error_message="This agent does not support tool execution") diff --git a/submodules/moragents_dockers/agents/src/app.py b/submodules/moragents_dockers/agents/src/app.py index 6631953..801dbe4 100644 --- a/submodules/moragents_dockers/agents/src/app.py +++ b/submodules/moragents_dockers/agents/src/app.py @@ -1,6 +1,7 @@ import logging import os import time +from typing import Tuple, Dict, Any import uvicorn from fastapi import FastAPI, HTTPException @@ -10,18 +11,30 @@ from src.config import Config from src.delegator import Delegator -from src.models.messages import ChatRequest -from src.stores import agent_manager_instance, chat_manager_instance, workflow_manager_instance +from src.models.core import AgentResponse, ChatRequest +from src.stores import ( + agent_manager_instance, + chat_manager_instance, + workflow_manager_instance, +) + +# Configure routes from src.routes import ( agent_manager_routes, chat_manager_routes, key_manager_routes, wallet_manager_routes, - workflow_manager_routes + workflow_manager_routes, ) -# Constants -UPLOAD_FOLDER = os.path.join(os.getcwd(), "uploads") +# Configure agent routes +from src.agents.crypto_data.routes import router as crypto_router +from src.agents.rag.routes import router as rag_router +from src.agents.mor_claims.routes import router as claim_router +from src.agents.tweet_sizzler.routes import router as tweet_router +from src.agents.token_swap.routes import router as swap_router +from src.agents.dca_agent.routes import router as dca_router +from src.agents.base_agent.routes import router as base_router # Configure logging logging.basicConfig( @@ -32,6 +45,7 @@ ) logger = logging.getLogger(__name__) +# Initialize FastAPI app app = FastAPI() app.add_middleware( CORSMiddleware, @@ -40,53 +54,42 @@ allow_methods=["*"], allow_headers=["*"], ) -@app.on_event("startup") -async def startup_event(): - await workflow_manager_instance.initialize() - - -@app.on_event("startup") -async def startup_event(): - await workflow_manager_instance.initialize() - +# Setup constants and directories +UPLOAD_FOLDER = os.path.join(os.getcwd(), "uploads") os.makedirs(UPLOAD_FOLDER, exist_ok=True) +# Initialize LLM and embeddings llm = ChatOllama( model=Config.OLLAMA_MODEL, base_url=Config.OLLAMA_URL, ) embeddings = OllamaEmbeddings(model=Config.OLLAMA_EMBEDDING_MODEL, base_url=Config.OLLAMA_URL) +# Initialize delegator delegator = Delegator(llm, embeddings) -# Include base store routes -app.include_router(agent_manager_routes.router) -app.include_router(key_manager_routes.router) -app.include_router(chat_manager_routes.router) -app.include_router(wallet_manager_routes.router) -app.include_router(workflow_manager_routes.router) - -# Agent route imports -from src.agents.crypto_data.routes import router as crypto_router -from src.agents.rag.routes import router as rag_router -from src.agents.mor_claims.routes import router as claim_router -from src.agents.tweet_sizzler.routes import router as tweet_router -from src.agents.token_swap.routes import router as swap_router -from src.agents.dca_agent.routes import router as dca_router -from src.agents.base_agent.routes import router as base_router - -# Include agent 
routes -app.include_router(crypto_router) -app.include_router(rag_router) -app.include_router(claim_router) -app.include_router(tweet_router) -app.include_router(swap_router) -app.include_router(dca_router) -app.include_router(base_router) - - -async def get_active_agent_for_chat(prompt: dict) -> str: +# Include all routers +ROUTERS = [ + agent_manager_routes.router, + key_manager_routes.router, + chat_manager_routes.router, + wallet_manager_routes.router, + workflow_manager_routes.router, + crypto_router, + rag_router, + claim_router, + tweet_router, + swap_router, + dca_router, + base_router, +] + +for router in ROUTERS: + app.include_router(router) + + +async def get_active_agent_for_chat(prompt: Dict[str, Any]) -> str: """Get the active agent for handling the chat request.""" active_agent = agent_manager_instance.get_active_agent() if active_agent: @@ -105,36 +108,55 @@ async def get_active_agent_for_chat(prompt: dict) -> str: return result["agent"] -def validate_agent_response(response: dict, current_agent: str) -> dict: - """Validate and process the agent's response.""" - if not current_agent: - logger.error("All agents failed to provide a valid response") - raise HTTPException( - status_code=500, - detail="All available agents failed to process the request", - ) - - return response +@app.on_event("startup") +async def startup_event(): + """Initialize workflow manager on startup""" + await workflow_manager_instance.initialize() @app.post("/chat") async def chat(chat_request: ChatRequest): - prompt = chat_request.prompt.dict() - chat_manager_instance.add_message(prompt) + """Handle chat requests and delegate to appropriate agent""" + logger.info(f"Received chat request for conversation {chat_request.conversation_id}") try: - delegator.reset_attempted_agents() - active_agent = await get_active_agent_for_chat(prompt) + # Parse command if present + agent_name, message = agent_manager_instance.parse_command(chat_request.prompt.content) + + if agent_name: + agent_manager_instance.set_active_agent(agent_name) + chat_request.prompt.content = message + + # Add user message to chat history + chat_manager_instance.add_message(chat_request.prompt.dict(), chat_request.conversation_id) + + # Get active agent + if not agent_name: + delegator.reset_attempted_agents() + active_agent = await get_active_agent_for_chat(chat_request.prompt.dict()) + else: + active_agent = agent_name logger.info(f"Delegating chat to active agent: {active_agent}") - current_agent, response = delegator.delegate_chat(active_agent, chat_request) + current_agent, agent_response = await delegator.delegate_chat(active_agent, chat_request) + + if not isinstance(agent_response, AgentResponse): + logger.error(f"Agent {current_agent} returned invalid response type {type(agent_response)}") + raise HTTPException(status_code=500, detail="Agent returned invalid response type") + + # Handle error responses + if agent_response.error_message: + status_code = 400 if "required parameters" in agent_response.error_message else 500 + raise HTTPException(status_code=status_code, detail=agent_response.error_message) - validated_response = validate_agent_response(response, current_agent) - chat_manager_instance.add_response(validated_response, current_agent) + # Convert to API response and add to chat history + chat_manager_instance.add_response(agent_response.dict(), current_agent, chat_request.conversation_id) - logger.info(f"Sending response: {validated_response}") - return validated_response + logger.info(f"Sending response: 
{agent_response.dict()}") + return agent_response.dict() + except HTTPException: + raise except TimeoutError: logger.error("Chat request timed out") raise HTTPException(status_code=504, detail="Request timed out") diff --git a/submodules/moragents_dockers/agents/src/config.py b/submodules/moragents_dockers/agents/src/config.py index 4809fec..6ff84ca 100644 --- a/submodules/moragents_dockers/agents/src/config.py +++ b/submodules/moragents_dockers/agents/src/config.py @@ -22,32 +22,36 @@ class Config: "description": "Must be used for meta-queries that ask about active Morpheus agents, and also for general, simple questions", "name": "default", "human_readable_name": "Default General Purpose", + "command": "morpheus", "upload_required": False, }, - { - "path": "src.agents.imagen.agent", - "class": "ImagenAgent", - "description": "Must only be used for image generation tasks. Use when the query explicitly mentions generating or creating an image.", - "name": "imagen", - "human_readable_name": "Image Generator", - "upload_required": False, - }, - { - "path": "src.agents.base_agent.agent", - "class": "BaseAgent", - "description": "Handles transactions on the Base crypto network. Use when the user makes any reference to Base, base, the base network, or Coinbase", - "name": "base", - "human_readable_name": "Base Transaction Manager", - "upload_required": False, - }, - { - "path": "src.agents.crypto_data.agent", - "class": "CryptoDataAgent", - "description": "Crypto-specific. Provides real-time cryptocurrency data such as price, market cap, and fully diluted valuation (FDV).", - "name": "crypto data", - "human_readable_name": "Crypto Data Fetcher", - "upload_required": False, - }, + # { + # "path": "src.agents.imagen.agent", + # "class": "ImagenAgent", + # "description": "Must only be used for image generation tasks. Use when the query explicitly mentions generating or creating an image.", + # "name": "imagen", + # "human_readable_name": "Image Generator", + # "command": "imagen", + # "upload_required": False, + # }, + # { + # "path": "src.agents.base_agent.agent", + # "class": "BaseAgent", + # "description": "Handles transactions on the Base crypto network. Use when the user makes any reference to Base, base, the base network, or Coinbase", + # "name": "base", + # "human_readable_name": "Base Transaction Manager", + # "command": "base", + # "upload_required": False, + # }, + # { + # "path": "src.agents.crypto_data.agent", + # "class": "CryptoDataAgent", + # "description": "Crypto-specific. Provides real-time cryptocurrency data such as price, market cap, and fully diluted valuation (FDV).", + # "name": "crypto data", + # "human_readable_name": "Crypto Data Fetcher", + # "command": "crypto", + # "upload_required": False, + # }, # DISABLED: Pending 1inch protocol fix # # { @@ -57,30 +61,33 @@ class Config: # "name": "token swap", # "upload_required": False, # }, - { - "path": "src.agents.tweet_sizzler.agent", - "class": "TweetSizzlerAgent", - "description": "Generates engaging tweets. Use ONLY when the query explicitly mentions Twitter, tweeting, or the X platform.", - "name": "tweet sizzler", - "human_readable_name": "Tweet / X-Post Generator", - "upload_required": False, - }, - { - "path": "src.agents.dca_agent.agent", - "class": "DCAAgent", - "description": "Sets up DCA strategies. 
Use when the user requests to set up a dollar-cost averaging strategy for crypto purchases or trades.", - "name": "dca", - "human_readable_name": "DCA Strategy Manager", - "upload_required": False, - }, - { - "path": "src.agents.rag.agent", - "class": "RagAgent", - "description": "Answers questions about a document. Must be used anytime an upload, a document, Documents, or uploaded document is mentioned", - "name": "rag", - "human_readable_name": "Document Assistant", - "upload_required": True, - }, + # { + # "path": "src.agents.tweet_sizzler.agent", + # "class": "TweetSizzlerAgent", + # "description": "Generates engaging tweets. Use ONLY when the query explicitly mentions Twitter, tweeting, or the X platform.", + # "name": "tweet sizzler", + # "human_readable_name": "Tweet / X-Post Generator", + # "command": "tweet", + # "upload_required": False, + # }, + # { + # "path": "src.agents.dca_agent.agent", + # "class": "DCAAgent", + # "description": "Sets up DCA strategies. Use when the user requests to set up a dollar-cost averaging strategy for crypto purchases or trades.", + # "name": "dca", + # "human_readable_name": "DCA Strategy Manager", + # "command": "dca", + # "upload_required": False, + # }, + # { + # "path": "src.agents.rag.agent", + # "class": "RagAgent", + # "description": "Answers questions about a document. Must be used anytime an upload, a document, Documents, or uploaded document is mentioned", + # "name": "rag", + # "human_readable_name": "Document Assistant", + # "command": "document", + # "upload_required": True, + # }, # DISABLED: # # { @@ -90,28 +97,49 @@ class Config: # "name": "mor claims", # "upload_required": False, # }, + # { + # "path": "src.agents.mor_rewards.agent", + # "class": "MorRewardsAgent", + # "description": "Provides information about user's accrued MOR rewards or tokens. Use when the query is about checking or querying reward balances.", + # "name": "mor rewards", + # "human_readable_name": "MOR Rewards Tracker", + # "command": "rewards", + # "upload_required": False, + # }, + # { + # "path": "src.agents.realtime_search.agent", + # "class": "RealtimeSearchAgent", + # "description": f"Use when the query is about searching the web or asks about a recent / current event (The year is {datetime.datetime.now().year})", + # "name": "realtime search", + # "human_readable_name": "Real-Time Search", + # "command": "search", + # "upload_required": False, + # }, + # { + # "path": "src.agents.news_agent.agent", + # "class": "NewsAgent", + # "description": "Fetches and analyzes cryptocurrency news for potential price impacts.", + # "name": "crypto news", + # "human_readable_name": "Crypto News Analyst", + # "command": "news", + # "upload_required": False, + # }, { - "path": "src.agents.mor_rewards.agent", - "class": "MorRewardsAgent", - "description": "Provides information about user's accrued MOR rewards or tokens. 
Use when the query is about checking or querying reward balances.", - "name": "mor rewards", - "human_readable_name": "MOR Rewards Tracker", - "upload_required": False, - }, - { - "path": "src.agents.realtime_search.agent", - "class": "RealtimeSearchAgent", - "description": f"Use when the query is about searching the web or asks about a recent / current event (The year is {datetime.datetime.now().year})", - "name": "realtime search", - "human_readable_name": "Real-Time Search", + "path": "src.agents.dexscreener.agent", + "class": "DexScreenerAgent", + "description": "Fetches and analyzes cryptocurrency trading data from DexScreener.", + "name": "dexscreener", + "human_readable_name": "DexScreener Analyst", + "command": "dexscreener", "upload_required": False, }, { - "path": "src.agents.news_agent.agent", - "class": "NewsAgent", - "description": "Fetches and analyzes cryptocurrency news for potential price impacts.", - "name": "crypto news", - "human_readable_name": "Crypto News Analyst", + "path": "src.agents.rugcheck.agent", + "class": "RugcheckAgent", + "description": "Analyzes token safety and trends using the Rugcheck API. Use when the query is about checking token safety, risks, or viewing trending tokens.", + "name": "rugcheck", + "human_readable_name": "Token Safety Analyzer", + "command": "rugcheck", "upload_required": False, }, ] diff --git a/submodules/moragents_dockers/agents/src/delegator.py b/submodules/moragents_dockers/agents/src/delegator.py index d79268b..6e4105f 100644 --- a/submodules/moragents_dockers/agents/src/delegator.py +++ b/submodules/moragents_dockers/agents/src/delegator.py @@ -3,6 +3,8 @@ from langchain.schema import HumanMessage, SystemMessage from src.stores import chat_manager_instance, agent_manager_instance +from src.models.core import ChatRequest, AgentResponse +from src.agents.agent_core.agent import ResponseType logger = logging.getLogger(__name__) @@ -29,10 +31,7 @@ def get_available_unattempted_agents(self) -> List[Dict]: for agent_config in agent_manager_instance.get_available_agents() if agent_config["name"] in agent_manager_instance.get_selected_agents() and agent_config["name"] not in self.attempted_agents - and not ( - agent_config["upload_required"] - and not chat_manager_instance.get_uploaded_file_status() - ) + and not (agent_config["upload_required"] and not chat_manager_instance.get_uploaded_file_status()) ] def get_delegator_response(self, prompt: Dict) -> Dict[str, str]: @@ -91,47 +90,62 @@ def get_delegator_response(self, prompt: Dict) -> Dict[str, str]: # Track this agent as attempted self.attempted_agents.add(selected_agent_name) - logger.info( - f"Added {selected_agent_name} to attempted agents. Current attempts: {self.attempted_agents}" - ) + logger.info(f"Added {selected_agent_name} to attempted agents. 
Current attempts: {self.attempted_agents}")

         return {"agent": selected_agent_name}

-    def delegate_chat(self, agent_name: str, chat_request: Any) -> Tuple[Optional[str], Any]:
+    async def delegate_chat(self, agent_name: str, chat_request: ChatRequest) -> Tuple[Optional[str], AgentResponse]:
         """Delegate chat to specific agent with cascading fallback"""
         logger.info(f"Attempting to delegate chat to agent: {agent_name}")

+        # Add agent to attempted set before trying to use it
+        self.attempted_agents.add(agent_name)
+
         if agent_name not in agent_manager_instance.get_selected_agents():
             logger.warning(f"Attempted to delegate to unselected agent: {agent_name}")
-            return self._try_next_agent(chat_request)
+            return await self._try_next_agent(chat_request)

         agent = agent_manager_instance.get_agent(agent_name)
         if not agent:
             logger.error(f"Agent {agent_name} is selected but not loaded")
-            return self._try_next_agent(chat_request)
+            return await self._try_next_agent(chat_request)

         try:
-            result = agent.chat(chat_request)
+            result = await agent.chat(chat_request)
             logger.info(f"Chat delegation to {agent_name} completed successfully")
             return agent_name, result
         except Exception as e:
             logger.error(f"Error during chat delegation to {agent_name}: {str(e)}")
-            return self._try_next_agent(chat_request)
+            return await self._try_next_agent(chat_request)

-    def _try_next_agent(self, chat_request: Any) -> Tuple[Optional[str], Any]:
+    async def _try_next_agent(self, chat_request: ChatRequest) -> Tuple[Optional[str], AgentResponse]:
         """Try to get a response from the next best available agent"""
         try:
             # Get next best agent
             result = self.get_delegator_response(chat_request.prompt.dict())

             if "agent" not in result:
-                return None, {"error": "No suitable agent found"}
+                return None, AgentResponse.error(
+                    error_message="No suitable agent found"
+                )

             next_agent = result["agent"]
             logger.info(f"Cascading to next agent: {next_agent}")
-            return self.delegate_chat(next_agent, chat_request)

+            # Check if we've already tried this agent to prevent infinite loop
+            if next_agent in self.attempted_agents:
+                return None, AgentResponse.error(
+                    error_message=(
+                        "All available agents have been attempted without success"
+                    )
+                )
+
+            return await self.delegate_chat(next_agent, chat_request)
         except ValueError as ve:
             # No more agents available
             logger.error(f"No more agents available: {str(ve)}")
-            return None, {"error": "All available agents have been attempted without success"}
+            return None, AgentResponse.error(
+                error_message=(
+                    f"All available agents have been attempted without success: {str(ve)}"
+                )
+            )
diff --git a/submodules/moragents_dockers/agents/src/models/core.py b/submodules/moragents_dockers/agents/src/models/core.py
new file mode 100644
index 0000000..2d42995
--- /dev/null
+++ b/submodules/moragents_dockers/agents/src/models/core.py
@@ -0,0 +1,80 @@
+import time
+from typing import List, Optional, Dict, Any
+from fastapi import Query
+from pydantic import BaseModel, Field
+
+
+class ChatMessage(BaseModel):
+    """Enhanced chat message model that includes all agent response fields"""
+
+    role: str
+    content: str
+    agentName: Optional[str] = None
+    error_message: Optional[str] = None
+    metadata: Optional[Dict[str, Any]] = Field(default_factory=dict)
+    requires_action: Optional[bool] = False
+    action_type: Optional[str] = None
+    timestamp: Optional[float] = Field(default_factory=lambda: time.time())
+
+    def from_agent_response(self, 
response: "AgentResponse", agent_name: str) -> "ChatMessage": + """Create a ChatMessage from an AgentResponse""" + return ChatMessage( + role="assistant", + content=response.content, + agentName=agent_name, + error_message=response.error_message, + metadata=response.metadata, + requires_action=response.requires_action, + action_type=response.action_type, + ) + + +class ChatRequest(BaseModel): + prompt: ChatMessage + chain_id: str + wallet_address: str + conversation_id: str = Query(default="default") + + +class Conversation(BaseModel): + messages: List[ChatMessage] + has_uploaded_file: bool = False + + +class AgentResponse(BaseModel): + """Base response model for all agent responses""" + + content: str + error_message: Optional[str] = None + metadata: Optional[Dict[str, Any]] = Field(default_factory=dict) + requires_action: Optional[bool] = False + action_type: Optional[str] = None + + def to_chat_message(self, agent_name: str) -> ChatMessage: + """Convert AgentResponse to ChatMessage""" + return ChatMessage( + role="assistant", + content=self.content, + agentName=agent_name, + error_message=self.error_message, + metadata=self.metadata, + requires_action=self.requires_action, + action_type=self.action_type, + ) + + @classmethod + def success(cls, content: str, metadata: Optional[Dict[str, Any]] = None) -> "AgentResponse": + """Create a successful response""" + return cls(content=content, metadata=metadata or {}) + + @classmethod + def error(cls, error_message: str) -> "AgentResponse": + """Create an error response""" + return cls(content="An error occurred", error_message=error_message) + + @classmethod + def action_required( + cls, content: str, action_type: str, metadata: Optional[Dict[str, Any]] = None + ) -> "AgentResponse": + """Create a response that requires user action""" + return cls(content=content, requires_action=True, action_type=action_type, metadata=metadata or {}) diff --git a/submodules/moragents_dockers/agents/src/models/messages.py b/submodules/moragents_dockers/agents/src/models/messages.py deleted file mode 100644 index e17d693..0000000 --- a/submodules/moragents_dockers/agents/src/models/messages.py +++ /dev/null @@ -1,12 +0,0 @@ -from pydantic import BaseModel - - -class ChatMessage(BaseModel): - role: str - content: str - - -class ChatRequest(BaseModel): - prompt: ChatMessage - chain_id: str - wallet_address: str diff --git a/submodules/moragents_dockers/agents/src/routes/agent_manager_routes.py b/submodules/moragents_dockers/agents/src/routes/agent_manager_routes.py index 1b3cd4a..6b40af4 100644 --- a/submodules/moragents_dockers/agents/src/routes/agent_manager_routes.py +++ b/submodules/moragents_dockers/agents/src/routes/agent_manager_routes.py @@ -29,3 +29,18 @@ async def set_selected_agents(request: Request) -> JSONResponse: logger.info(f"Newly selected agents: {agent_manager_instance.get_selected_agents()}") return JSONResponse(content={"status": "success", "agents": agent_names}) + + +@router.get("/commands") +async def get_agent_commands() -> JSONResponse: + """Get the list of available agent commands""" + available_agents = agent_manager_instance.get_available_agents() + commands = [ + { + "command": agent["command"], + "description": agent["description"], + "name": agent["human_readable_name"], + } + for agent in available_agents + ] + return JSONResponse(content={"commands": commands}) diff --git a/submodules/moragents_dockers/agents/src/routes/chat_manager_routes.py b/submodules/moragents_dockers/agents/src/routes/chat_manager_routes.py index 
a1df779..32f41d2 100644 --- a/submodules/moragents_dockers/agents/src/routes/chat_manager_routes.py +++ b/submodules/moragents_dockers/agents/src/routes/chat_manager_routes.py @@ -1,5 +1,5 @@ import logging -from fastapi import APIRouter +from fastapi import APIRouter, Query from src.stores import chat_manager_instance logger = logging.getLogger(__name__) @@ -8,15 +8,39 @@ @router.get("/messages") -async def get_messages(): - """Get all chat messages""" - logger.info("Received get_messages request") - return {"messages": chat_manager_instance.get_messages()} +async def get_messages(conversation_id: str = Query(default="default")): + """Get all chat messages for a conversation""" + logger.info(f"Received get_messages request for conversation {conversation_id}") + return {"messages": chat_manager_instance.get_messages(conversation_id)} @router.get("/clear") -async def clear_messages(): - """Clear chat message history""" - logger.info("Clearing message history") - chat_manager_instance.clear_messages() +async def clear_messages(conversation_id: str = Query(default="default")): + """Clear chat message history for a conversation""" + logger.info(f"Clearing message history for conversation {conversation_id}") + chat_manager_instance.clear_messages(conversation_id) return {"response": "successfully cleared message history"} + + +@router.get("/conversations") +async def get_conversations(): + """Get all conversation IDs""" + logger.info("Getting all conversation IDs") + return {"conversation_ids": chat_manager_instance.get_all_conversation_ids()} + + +@router.post("/conversations") +async def create_conversation(): + """Create a new conversation""" + new_id = f"conversation_{len(chat_manager_instance.get_all_conversation_ids())}" + conversation = chat_manager_instance.create_conversation(new_id) + logger.info(f"Created new conversation with ID: {new_id}") + return {"conversation_id": new_id, "conversation": conversation} + + +@router.delete("/conversations/{conversation_id}") +async def delete_conversation(conversation_id: str): + """Delete a conversation""" + logger.info(f"Deleting conversation {conversation_id}") + chat_manager_instance.delete_conversation(conversation_id) + return {"response": f"successfully deleted conversation {conversation_id}"} diff --git a/submodules/moragents_dockers/agents/src/stores/agent_manager.py b/submodules/moragents_dockers/agents/src/stores/agent_manager.py index aaee3dd..a28cf61 100644 --- a/submodules/moragents_dockers/agents/src/stores/agent_manager.py +++ b/submodules/moragents_dockers/agents/src/stores/agent_manager.py @@ -1,7 +1,7 @@ import importlib import logging -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Tuple from langchain_ollama import ChatOllama from langchain_community.embeddings import OllamaEmbeddings @@ -165,6 +165,41 @@ def get_agent(self, agent_name: str) -> Optional[Any]: """ return self.agents.get(agent_name) + def get_agent_by_command(self, command: str) -> Optional[str]: + """ + Get agent name by command. + + Args: + command (str): Command to look up + + Returns: + Optional[str]: Agent name if found, None otherwise + """ + for agent in self.config["agents"]: + if agent["command"] == command: + return agent["name"] + return None + + def parse_command(self, message: str) -> Tuple[Optional[str], str]: + """ + Parse a message for commands. 
+ + Args: + message (str): Message to parse + + Returns: + Tuple[Optional[str], str]: Tuple of (agent_name, message_without_command) + """ + if not message.startswith("/"): + return None, message + + parts = message[1:].split(maxsplit=1) + command = parts[0] + remaining_message = parts[1] if len(parts) > 1 else "" + + agent_name = self.get_agent_by_command(command) + return agent_name, remaining_message + # Create an instance to act as a singleton store agent_manager_instance = AgentManager(Config.AGENTS_CONFIG) diff --git a/submodules/moragents_dockers/agents/src/stores/chat_manager.py b/submodules/moragents_dockers/agents/src/stores/chat_manager.py index d52831a..6ded291 100644 --- a/submodules/moragents_dockers/agents/src/stores/chat_manager.py +++ b/submodules/moragents_dockers/agents/src/stores/chat_manager.py @@ -1,51 +1,201 @@ import logging +import time from typing import Dict, List +from src.models.core import ChatMessage, Conversation, AgentResponse logger = logging.getLogger(__name__) class ChatManager: - def __init__(self): - self.has_uploaded_file = False - self.messages: List[Dict[str, str]] = [ - { - "role": "assistant", - "content": """This highly experimental chatbot is not intended for making important decisions, - and its responses are generated based on incomplete data and algorithms that may evolve rapidly. - By using this chatbot, you acknowledge that you use it at your own discretion - and assume all risks associated with its limitations and potential errors.""", - } - ] - - def add_message(self, message: Dict[str, str]): - self.messages.append(message) - logger.info(f"Added message: {message}") - - def get_messages(self) -> List[Dict[str, str]]: - return self.messages - - def set_uploaded_file(self, has_file: bool): - self.has_uploaded_file = has_file - logger.info(f"Set uploaded file status to: {has_file}") - - def get_uploaded_file_status(self) -> bool: - return self.has_uploaded_file - - def clear_messages(self): - self.messages = [self.messages[0]] # Keep the initial message - logger.info("Cleared message history") - - def get_last_message(self) -> Dict[str, str]: - return self.messages[-1] if self.messages else {} - - def add_response(self, response: Dict[str, str], agent_name: str): - response_with_agent = response.copy() - response_with_agent["agentName"] = agent_name - self.add_message(response_with_agent) - logger.info(f"Added response from agent {agent_name}: {response_with_agent}") - - def get_chat_history(self) -> str: - return "\n".join([f"{msg['role']}: {msg['content']}" for msg in self.messages]) + """ + Manages chat conversations and message history. + + This class provides functionality to: + - Create and manage multiple conversations identified by unique IDs + - Add/retrieve messages and responses within conversations + - Track file upload status per conversation + - Clear conversation history + - Get chat history in different formats + - Delete conversations + + Each conversation starts with a default disclaimer message about the experimental nature + of the chatbot. 
+ + Attributes: + conversations (Dict[str, Conversation]): Dictionary mapping conversation IDs to Conversation objects + default_message (ChatMessage): Default disclaimer message added to new conversations + + Example: + >>> chat_manager = ChatManager() + >>> chat_manager.add_message({"role": "user", "content": "Hello"}, "conv1") + >>> messages = chat_manager.get_messages("conv1") + """ + + def __init__(self) -> None: + self.conversations: Dict[str, Conversation] = {} + self.default_message = ChatMessage( + role="assistant", + agentName="Morpheus AI", + content="""This highly experimental chatbot is not intended for making important decisions. Its + responses are generated using AI models and may not always be accurate. + By using this chatbot, you acknowledge that you use it at your own discretion + and assume all risks associated with its limitations and potential errors.""", + metadata={}, + requires_action=False, + ) + + def get_messages(self, conversation_id: str) -> List[Dict[str, str]]: + """ + Get all messages for a specific conversation. + + Args: + conversation_id (str): Unique identifier for the conversation + + Returns: + List[Dict[str, str]]: List of messages as dictionaries + """ + conversation = self._get_or_create_conversation(conversation_id) + return [msg.dict() for msg in conversation.messages] + + def add_message(self, message: Dict[str, str], conversation_id: str): + """ + Add a new message to a conversation. + + Args: + message (Dict[str, str]): Message to add + conversation_id (str): Conversation to add message to + """ + conversation = self._get_or_create_conversation(conversation_id) + chat_message = ChatMessage(**message) + if "timestamp" not in message: + chat_message.timestamp = time.time() + conversation.messages.append(chat_message) + logger.info(f"Added message to conversation {conversation_id}: {chat_message.content}") + + def add_response(self, response: Dict[str, str], agent_name: str, conversation_id: str): + """ + Add an agent's response to a conversation. + + Args: + response (Dict[str, str]): Response content + agent_name (str): Name of the responding agent + conversation_id (str): Conversation to add response to + """ + # Convert the dictionary to an AgentResponse first + agent_response = AgentResponse(**response) + # Then convert to ChatMessage + chat_message = agent_response.to_chat_message(agent_name) + self.add_message(chat_message.dict(), conversation_id) + logger.info(f"Added response from agent {agent_name} to conversation {conversation_id}") + + def set_uploaded_file(self, has_file: bool, conversation_id: str): + """ + Set whether a conversation has an uploaded file. + + Args: + has_file (bool): Whether file is uploaded + conversation_id (str): Target conversation + """ + conversation = self._get_or_create_conversation(conversation_id) + conversation.has_uploaded_file = has_file + logger.info(f"Set uploaded file status to {has_file} for conversation {conversation_id}") + + def get_uploaded_file_status(self, conversation_id: str) -> bool: + """ + Check if a conversation has an uploaded file. + + Args: + conversation_id (str): Conversation to check + + Returns: + bool: True if conversation has uploaded file, False otherwise + """ + conversation = self._get_or_create_conversation(conversation_id) + return conversation.has_uploaded_file + + def clear_messages(self, conversation_id: str): + """ + Clear all messages in a conversation except the default message. 
+ + Args: + conversation_id (str): Conversation to clear + """ + conversation = self._get_or_create_conversation(conversation_id) + conversation.messages = [self.default_message] # Keep the initial message + logger.info(f"Cleared message history for conversation {conversation_id}") + + def get_last_message(self, conversation_id: str) -> Dict[str, str]: + """ + Get the most recent message from a conversation. + + Args: + conversation_id (str): Conversation to get message from + + Returns: + Dict[str, str]: Last message or empty dict if no messages + """ + conversation = self._get_or_create_conversation(conversation_id) + return conversation.messages[-1].dict() if conversation.messages else {} + + def get_chat_history(self, conversation_id: str) -> str: + """ + Get formatted chat history for a conversation. + + Args: + conversation_id (str): Conversation to get history for + + Returns: + str: Formatted chat history as string + """ + conversation = self._get_or_create_conversation(conversation_id) + return "\n".join([f"{msg.role}: {msg.content}" for msg in conversation.messages]) + + def get_all_conversation_ids(self) -> List[str]: + """ + Get a list of all conversation IDs. + + Returns: + List[str]: List of conversation IDs + """ + return list(self.conversations.keys()) + + def delete_conversation(self, conversation_id: str): + """ + Delete a conversation by ID. + + Args: + conversation_id (str): ID of conversation to delete + """ + if conversation_id in self.conversations: + del self.conversations[conversation_id] + logger.info(f"Deleted conversation {conversation_id}") + + def create_conversation(self, conversation_id: str) -> Dict: + """ + Create a new conversation with the given ID. + + Args: + conversation_id (str): ID for new conversation + + Returns: + Dict: Created conversation as dictionary + """ + conversation = self._get_or_create_conversation(conversation_id) + return conversation.dict() + + def _get_or_create_conversation(self, conversation_id: str) -> Conversation: + """ + Get existing conversation or create new one if not exists. + + Args: + conversation_id (str): Conversation ID to get/create + + Returns: + Conversation: Retrieved or created conversation + """ + if conversation_id not in self.conversations: + self.conversations[conversation_id] = Conversation(messages=[self.default_message], has_uploaded_file=False) + return self.conversations[conversation_id] # Create an instance to act as a singleton store diff --git a/submodules/moragents_dockers/agents/src/stores/key_manager.py b/submodules/moragents_dockers/agents/src/stores/key_manager.py index f697fa0..e4eec51 100644 --- a/submodules/moragents_dockers/agents/src/stores/key_manager.py +++ b/submodules/moragents_dockers/agents/src/stores/key_manager.py @@ -6,18 +6,34 @@ class Service(Enum): + """Supported API service types""" + X = "x" COINBASE = "coinbase" class BaseKeys: - """Base class for API keys to ensure proper initialization""" + """ + Base class for API keys to ensure proper initialization. + + This class serves as a parent class for specific API key implementations, + providing a consistent interface for key management. + """ def __init__(self): pass class XKeys(BaseKeys): + """ + Container for X (formerly Twitter) API authentication credentials. 
+ + Stores and manages the various keys and tokens required for X API access: + - API key and secret for application authentication + - Access token and secret for user authentication + - Bearer token for application-only authentication + """ + def __init__(self): super().__init__() self.api_key: Optional[str] = None @@ -27,7 +43,12 @@ def __init__(self): self.bearer_token: Optional[str] = None def is_complete(self) -> bool: - """Check if all required keys are set""" + """ + Check if all required X API credentials are set. + + Returns: + bool: True if all required keys are present, False otherwise + """ return all( [ self.api_key, @@ -40,13 +61,26 @@ def is_complete(self) -> bool: class CoinbaseKeys(BaseKeys): + """ + Container for Coinbase API authentication credentials. + + Stores and manages the CDP API key and secret required for Coinbase API access. + CDP (Coinbase Developer Platform) credentials are used for accessing Coinbase's + trading and account management features. + """ + def __init__(self): super().__init__() self.cdp_api_key: Optional[str] = None self.cdp_api_secret: Optional[str] = None def is_complete(self) -> bool: - """Check if all required keys are set""" + """ + Check if all required Coinbase API credentials are set. + + Returns: + bool: True if both CDP API key and secret are present, False otherwise + """ return all([self.cdp_api_key, self.cdp_api_secret]) @@ -54,6 +88,29 @@ def is_complete(self) -> bool: class KeyManager: + """ + Manages API keys and authentication credentials for multiple services. + + This class provides a centralized way to store, retrieve, and manage API keys + for different services (X and Coinbase). It implements a singleton pattern + to ensure consistent key management across the application. + + Features: + - Secure storage of API keys and tokens + - Service-specific key validation + - Key presence checking + - Key clearing functionality + + Attributes: + keys (Dict[Service, KeysType]): Dictionary mapping services to their respective key containers + + Example: + >>> manager = KeyManager() + >>> manager.set_x_keys(api_key="key", api_secret="secret", ...) + >>> if manager.has_x_keys(): + >>> x_keys = manager.get_x_keys() + """ + def __init__(self): self.keys: Dict[Service, KeysType] = { Service.X: XKeys(), @@ -68,7 +125,16 @@ def set_x_keys( access_token_secret: str, bearer_token: str, ) -> None: - """Set all X API keys""" + """ + Set all X API keys. + + Args: + api_key (str): Application API key + api_secret (str): Application API secret + access_token (str): User access token + access_token_secret (str): User access token secret + bearer_token (str): Application-only authentication token + """ x_keys = XKeys() x_keys.api_key = api_key x_keys.api_secret = api_secret @@ -79,41 +145,68 @@ def set_x_keys( logger.info("Updated X API keys") def set_coinbase_keys(self, cdp_api_key: str, cdp_api_secret: str) -> None: - """Set Coinbase API keys""" + """ + Set Coinbase API keys. 
+ + Args: + cdp_api_key (str): Coinbase Developer Platform API key + cdp_api_secret (str): Coinbase Developer Platform API secret + """ coinbase_keys = CoinbaseKeys() coinbase_keys.cdp_api_key = cdp_api_key # Handle newline replacement when setting the key - coinbase_keys.cdp_api_secret = ( - cdp_api_secret.replace("\\n", "\n") if cdp_api_secret else None - ) + coinbase_keys.cdp_api_secret = cdp_api_secret.replace("\\n", "\n") if cdp_api_secret else None self.keys[Service.COINBASE] = coinbase_keys logger.info("Updated Coinbase API keys") def get_x_keys(self) -> XKeys: - """Get X API keys""" + """ + Get X API keys. + + Returns: + XKeys: Container with X API credentials + """ keys = self.keys[Service.X] assert isinstance(keys, XKeys) return keys def get_coinbase_keys(self) -> CoinbaseKeys: - """Get Coinbase API keys""" + """ + Get Coinbase API keys. + + Returns: + CoinbaseKeys: Container with Coinbase API credentials + """ keys = self.keys[Service.COINBASE] assert isinstance(keys, CoinbaseKeys) return keys def has_x_keys(self) -> bool: - """Check if all required X keys are set""" + """ + Check if all required X keys are set. + + Returns: + bool: True if all required X API credentials are present and valid + """ return isinstance(self.keys[Service.X], XKeys) and self.keys[Service.X].is_complete() def has_coinbase_keys(self) -> bool: - """Check if all required Coinbase keys are set""" - return ( - isinstance(self.keys[Service.COINBASE], CoinbaseKeys) - and self.keys[Service.COINBASE].is_complete() - ) + """ + Check if all required Coinbase keys are set. + + Returns: + bool: True if all required Coinbase API credentials are present and valid + """ + return isinstance(self.keys[Service.COINBASE], CoinbaseKeys) and self.keys[Service.COINBASE].is_complete() def clear_keys(self, service: Optional[Service] = None) -> None: - """Clear keys for specified service or all if none specified""" + """ + Clear keys for specified service or all if none specified. + + Args: + service (Optional[Service]): Specific service to clear keys for. + If None, clears all services' keys. + """ if service == Service.X or service is None: self.keys[Service.X] = XKeys() logger.info("Cleared X API keys") @@ -123,7 +216,12 @@ def clear_keys(self, service: Optional[Service] = None) -> None: logger.info("Cleared Coinbase API keys") def has_any_keys(self) -> bool: - """Check if any API keys are stored""" + """ + Check if any API keys are stored. + + Returns: + bool: True if any service has valid API credentials set + """ return any([self.has_x_keys(), self.has_coinbase_keys()]) diff --git a/submodules/moragents_dockers/agents/src/stores/wallet_manager.py b/submodules/moragents_dockers/agents/src/stores/wallet_manager.py index 86734a6..94d17d0 100644 --- a/submodules/moragents_dockers/agents/src/stores/wallet_manager.py +++ b/submodules/moragents_dockers/agents/src/stores/wallet_manager.py @@ -9,7 +9,36 @@ class WalletManager: - def __init__(self): + """ + Manages Coinbase CDP wallets and their associated data. + + This class provides functionality to create, restore, save, and manage CDP wallets. + It maintains an in-memory store of wallets and their exported data, handles wallet + activation states, and provides methods for wallet operations. 
+ + Key Features: + - Create and restore CDP wallets + - Save/load wallet data to/from files + - Manage active wallet selection + - Export wallet data + - List available wallets + - Configure CDP client with stored credentials + + Attributes: + wallets (Dict[str, Wallet]): Dictionary storing wallet objects by wallet ID + wallet_data (Dict[str, dict]): Dictionary storing exported wallet data by wallet ID + cdp_client (Optional[Cdp]): Configured CDP client instance + active_wallet_id (Optional[str]): ID of currently active wallet + + Example Usage: + manager = WalletManager() + wallet = manager.create_wallet("my_wallet", network_id="BASE-SEPOLIA") + manager.save_wallet("my_wallet", "wallets/my_wallet.json") + manager.set_active_wallet("my_wallet") + address = manager.get_wallet_address("my_wallet") + """ + + def __init__(self) -> None: """Initialize the WalletManager""" self.wallets: Dict[str, Wallet] = {} self.wallet_data: Dict[str, dict] = {} @@ -36,9 +65,7 @@ def configure_cdp_client(self) -> bool: logger.error(f"Failed to configure CDP client: {str(e)}") return False - def create_wallet( - self, wallet_id: str, network_id: Optional[str] = None, set_active: bool = True - ) -> Wallet: + def create_wallet(self, wallet_id: str, network_id: Optional[str] = None, set_active: bool = True) -> Wallet: """Create a new CDP wallet and store it""" try: if not wallet_id: @@ -49,7 +76,11 @@ def create_wallet( logger.info(f"Creating new wallet with network ID: {network_id}") logger.info(f"Current wallets: {self.wallets}") - wallet = Wallet.create(network_id=network_id) + + # Use testnet network ID if none is provided + network_id_to_use = network_id if network_id is not None else "BASE-SEPOLIA" + wallet = Wallet.create(network_id=network_id_to_use) + if not wallet: raise ValueError("Failed to create wallet - wallet is None") @@ -72,9 +103,7 @@ def create_wallet( logger.error(f"Failed to create wallet: {str(e)}") raise - def restore_wallet( - self, wallet_id: str, wallet_data: dict, set_active: bool = True - ) -> Optional[Wallet]: + def restore_wallet(self, wallet_id: str, wallet_data: dict, set_active: bool = True) -> Optional[Wallet]: """Restore a wallet from exported data""" try: if not wallet_id: @@ -114,7 +143,7 @@ def get_wallet(self, wallet_id: str) -> Optional[Wallet]: def get_wallet_address(self, wallet_id: str) -> Optional[str]: """Get the default address for a wallet""" wallet = self.get_wallet(wallet_id) - if not wallet: + if not wallet or not wallet.default_address: return None return wallet.default_address.address_id @@ -163,9 +192,7 @@ def save_wallet(self, wallet_id: str, filepath: str) -> bool: logger.error(f"Failed to save wallet: {str(e)}") return False - def load_wallet( - self, wallet_id: str, filepath: str, set_active: bool = True - ) -> Optional[Wallet]: + def load_wallet(self, wallet_id: str, filepath: str, set_active: bool = True) -> Optional[Wallet]: """Load wallet from saved data""" try: with open(filepath, "r") as f: @@ -209,7 +236,7 @@ def list_wallets(self) -> list[dict]: "wallet_id": wallet_id, "network_id": wallet.network_id, "is_active": wallet_id == self.active_wallet_id, - "address": wallet.default_address.address_id, + "address": wallet.default_address.address_id if wallet.default_address else None, } for wallet_id, wallet in self.wallets.items() ] diff --git a/submodules/moragents_dockers/agents/src/stores/workflow_manager.py b/submodules/moragents_dockers/agents/src/stores/workflow_manager.py index a11170b..69b5bf4 100644 --- 
a/submodules/moragents_dockers/agents/src/stores/workflow_manager.py +++ b/submodules/moragents_dockers/agents/src/stores/workflow_manager.py @@ -15,23 +15,47 @@ class WorkflowStatus(str, Enum): """Status states for workflows""" - ACTIVE = "active" - PAUSED = "paused" - COMPLETED = "completed" - FAILED = "failed" - CANCELLED = "cancelled" + ACTIVE = "active" # Workflow is active and will be executed on schedule + PAUSED = "paused" # Workflow is temporarily paused but can be resumed + COMPLETED = "completed" # Workflow has finished successfully + FAILED = "failed" # Workflow encountered an error and stopped + CANCELLED = "cancelled" # Workflow was manually cancelled @dataclass class Workflow: - """Represents a scheduled recurring action workflow""" + """ + Represents a scheduled recurring action workflow. + + A workflow defines an automated task that runs on a schedule. It contains all the + information needed to execute an action periodically, track its progress, and + manage its lifecycle. + + Attributes: + id (str): Unique identifier for the workflow + name (str): Human-readable name for the workflow + description (str): Detailed description of what the workflow does + action (str): Name of the action to execute (e.g. "dca_trade") + params (Dict[str, Any]): Parameters required by the action + interval (timedelta): Time between workflow executions + last_run (Optional[datetime]): When the workflow last executed + next_run (Optional[datetime]): When the workflow will next execute + status (WorkflowStatus): Current status of the workflow + created_at (datetime): When the workflow was created + updated_at (datetime): When the workflow was last modified + metadata (Dict): Additional workflow metadata + + Methods: + to_dict(): Serializes workflow to dictionary format + from_dict(): Creates workflow instance from dictionary data + """ id: str name: str description: str - action: str # Name of the action to execute (e.g. "dca_trade") - params: Dict[str, Any] # Parameters for the action - interval: timedelta # How often to execute the action + action: str + params: Dict[str, Any] + interval: timedelta last_run: Optional[datetime] = None next_run: Optional[datetime] = None status: WorkflowStatus = WorkflowStatus.ACTIVE @@ -40,7 +64,12 @@ class Workflow: metadata: Dict = field(default_factory=dict) def to_dict(self) -> dict: - """Convert workflow to dictionary format""" + """ + Convert workflow to dictionary format for storage. + + Returns: + dict: Dictionary representation of the workflow with all attributes + """ return { "id": self.id, "name": self.name, @@ -58,7 +87,15 @@ def to_dict(self) -> dict: @classmethod def from_dict(cls, data: dict) -> "Workflow": - """Create workflow instance from dictionary""" + """ + Create workflow instance from dictionary data. + + Args: + data (dict): Dictionary containing workflow data + + Returns: + Workflow: New workflow instance initialized with the provided data + """ workflow = cls( id=data["id"], name=data["name"], @@ -79,7 +116,50 @@ def from_dict(cls, data: dict) -> "Workflow": class WorkflowManager: - """Manages workflow persistence and operations""" + """ + Manages the lifecycle and execution of automated workflows. + + The WorkflowManager handles creation, scheduling, execution, and persistence of + workflows. It provides a robust system for running recurring automated tasks with + proper error handling, state management, and data persistence. 
+ + Key Features: + - Asynchronous workflow execution + - Persistent storage of workflow data + - Configurable action handlers + - Automatic scheduling and execution + - Thread-safe operations with locking + - Comprehensive error handling and logging + + Attributes: + storage_path (Path): Path to workflow storage file + workflows (Dict[str, Workflow]): In-memory store of active workflows + _lock (asyncio.Lock): Lock for thread-safe operations + _scheduler_task (Optional[asyncio.Task]): Background scheduler task + _action_handlers (Dict[str, Any]): Registered workflow action handlers + + Example Usage: + manager = WorkflowManager() + await manager.initialize() + + # Create a new workflow + workflow = await manager.create_workflow( + name="DCA Bitcoin", + description="Weekly BTC purchase", + action="dca_trade", + params={"amount": 100, "asset": "BTC"}, + interval=timedelta(days=7) + ) + + # List active workflows + workflows = await manager.list_workflows() + + # Update workflow + await manager.update_workflow(workflow.id, status=WorkflowStatus.PAUSED) + + # Delete workflow + await manager.delete_workflow(workflow.id) + """ def __init__(self, storage_path: str = "workflows.json"): """Initialize the WorkflowManager""" @@ -114,18 +194,12 @@ async def _scheduler_loop(self) -> None: try: logger.info("Workflow scheduler checking for due workflows...") now = datetime.now() - active_workflows = [ - w for w in self.workflows.values() if w.status == WorkflowStatus.ACTIVE - ] + active_workflows = [w for w in self.workflows.values() if w.status == WorkflowStatus.ACTIVE] logger.info(f"Found {len(active_workflows)} active workflows") for workflow in self.workflows.values(): logger.info(f"Checking workflow {workflow.id} ({workflow.name})") - if ( - workflow.status == WorkflowStatus.ACTIVE - and workflow.next_run - and now >= workflow.next_run - ): + if workflow.status == WorkflowStatus.ACTIVE and workflow.next_run and now >= workflow.next_run: logger.info(f"Executing workflow {workflow.id} ({workflow.name})") await self._execute_workflow(workflow) @@ -167,9 +241,7 @@ async def _execute_workflow(self, workflow: Workflow) -> None: if total_invested >= total_target: workflow.status = WorkflowStatus.COMPLETED should_remove = True - logger.info( - f"Workflow {workflow.id} completed - reached total investment target" - ) + logger.info(f"Workflow {workflow.id} completed - reached total investment target") # Remove completed/failed workflows, keep active ones if should_remove: @@ -287,8 +359,7 @@ async def _load_workflows(self) -> None: content = await f.read() data = json.loads(content) self.workflows = { - workflow_id: Workflow.from_dict(workflow_data) - for workflow_id, workflow_data in data.items() + workflow_id: Workflow.from_dict(workflow_data) for workflow_id, workflow_data in data.items() } except Exception as e: logger.error(f"Failed to load workflows: {e}") diff --git a/submodules/moragents_dockers/frontend/components/Chat/index.tsx b/submodules/moragents_dockers/frontend/components/Chat/index.tsx index 5de28b2..4292b1d 100644 --- a/submodules/moragents_dockers/frontend/components/Chat/index.tsx +++ b/submodules/moragents_dockers/frontend/components/Chat/index.tsx @@ -2,19 +2,20 @@ import React, { FC, useEffect, useState } from "react"; import { Flex, Box } from "@chakra-ui/react"; import { ChatMessage } from "@/services/types"; import { useTransactionConfirmations } from "wagmi"; -import { MessageList } from "../MessageList"; -import { ChatInput } from "../ChatInput"; -import { LoadingIndicator } from 
"../LoadingIndicator"; -import { Widgets, shouldOpenWidget } from "../Widgets"; -import { ChatProps } from "./types"; -import { useChat } from "./hooks"; +import { MessageList } from "@/components/MessageList"; +import { ChatInput } from "@/components/ChatInput"; +import { LoadingIndicator } from "@/components/LoadingIndicator"; +import { Widgets, shouldOpenWidget } from "@/components/Widgets"; +import { ChatProps } from "@/components/Chat/types"; +import { useChat } from "@/components/Chat/hooks"; export const Chat: FC = ({ onSubmitMessage, onCancelSwap, messages, - selectedAgent, onBackendError, + // New prop from Home that indicates whether the sidebar is open + isSidebarOpen = false, }) => { const [messagesData, setMessagesData] = useState(messages); const [activeWidget, setActiveWidget] = useState(null); @@ -39,6 +40,7 @@ export const Chat: FC = ({ useEffect(() => { if (messages.length > 0) { + console.log("messages", messages); const lastMessage = messages[messages.length - 1]; if (lastMessage.role === "assistant" && shouldOpenWidget(lastMessage)) { setActiveWidget(lastMessage); @@ -62,6 +64,11 @@ export const Chat: FC = ({ setActiveWidget(null); }; + // Decide how far from the left we want to be when the sidebar is open vs closed. + // Example: if the sidebar is fully open, let's shift it 280px to the right. + // If it's closed, maybe shift only 80px from the edge. Tweak as needed. + const chatMarginLeft = isSidebarOpen ? "280px" : "80px"; + return ( = ({ width="100%" transition="all 0.3s ease-in-out" mt={4} - paddingLeft={isWidgetOpen ? "5%" : "20%"} - paddingRight={isWidgetOpen ? "35%" : "20%"} + // Existing widget-based padding logic + paddingLeft={isWidgetOpen ? "5%" : "10%"} + paddingRight={isWidgetOpen ? "35%" : "30%"} + // NEW MARGIN to keep space from the sidebar + ml={chatMarginLeft} > - {showSpinner && } + {showSpinner && } 1} @@ -88,8 +97,11 @@ export const Chat: FC = ({ showSpinner || messagesData[messagesData.length - 1]?.role === "swap" } + isSidebarOpen={isSidebarOpen} /> + + {/* The widgets panel on the right side */} Promise; onCancelSwap: (fromAction: number) => void; messages: ChatMessage[]; - selectedAgent: string; onBackendError: () => void; + isSidebarOpen?: boolean; }; export type SwapTransaction = { diff --git a/submodules/moragents_dockers/frontend/components/ChatInput/PrefilledOptions.tsx b/submodules/moragents_dockers/frontend/components/ChatInput/PrefilledOptions.tsx index 9f54f00..3521027 100644 --- a/submodules/moragents_dockers/frontend/components/ChatInput/PrefilledOptions.tsx +++ b/submodules/moragents_dockers/frontend/components/ChatInput/PrefilledOptions.tsx @@ -11,10 +11,12 @@ import { LineChart, Flame, Globe2, - Zap, ArrowLeftRight, + BarChart2, + Shield, Gift, } from "lucide-react"; +import { AGENT_TYPES } from "@/services/constants"; import styles from "./PrefilledOptions.module.css"; type PrefilledOption = { @@ -26,13 +28,8 @@ type PrefilledOption = { }>; }; -type PrefilledOptionsProps = { - onSelect: (message: string) => void; - isWidgetOpen?: boolean; -}; - const prefilledOptionsMap: Record = { - default: { + [AGENT_TYPES.DEFAULT]: { title: "Default Agent 🔄", icon: , examples: [ @@ -40,7 +37,7 @@ const prefilledOptionsMap: Record = { { text: "What Morpheus agents are currently active?", agent: "default" }, ], }, - imagen: { + [AGENT_TYPES.IMAGEN]: { title: "Generate Images 🎨", icon: , examples: [ @@ -51,7 +48,7 @@ const prefilledOptionsMap: Record = { }, ], }, - rag: { + [AGENT_TYPES.RAG]: { title: "Document Analysis 📄", icon: , examples: [ 
@@ -62,7 +59,7 @@ const prefilledOptionsMap: Record = { }, ], }, - "crypto data": { + [AGENT_TYPES.CRYPTO_DATA]: { title: "Crypto Market Data 📊", icon: , examples: [ @@ -71,7 +68,7 @@ const prefilledOptionsMap: Record = { { text: "What's the FDV of USDC?", agent: "crypto" }, ], }, - "token swap": { + [AGENT_TYPES.TOKEN_SWAP]: { title: "Token Swaps 💱", icon: , examples: [ @@ -79,7 +76,7 @@ const prefilledOptionsMap: Record = { { text: "Exchange my BTC for ETH", agent: "swap" }, ], }, - "tweet sizzler": { + [AGENT_TYPES.TWEET_SIZZLER]: { title: "Tweet Generator 🔥", icon: , examples: [ @@ -90,7 +87,7 @@ const prefilledOptionsMap: Record = { }, ], }, - dca: { + [AGENT_TYPES.DCA]: { title: "DCA Strategy Planning 💰", icon: , examples: [ @@ -98,7 +95,7 @@ const prefilledOptionsMap: Record = { { text: "Help me create a monthly BTC buying strategy", agent: "dca" }, ], }, - base: { + [AGENT_TYPES.BASE]: { title: "Base Transactions 🔄", icon: , examples: [ @@ -106,7 +103,7 @@ const prefilledOptionsMap: Record = { { text: "Swap USDC for ETH on Base", agent: "base" }, ], }, - "mor claims": { + [AGENT_TYPES.MOR_CLAIMS]: { title: "MOR Claims 🎁", icon: , examples: [ @@ -114,7 +111,7 @@ const prefilledOptionsMap: Record = { { text: "Help me claim my pending MOR tokens", agent: "claims" }, ], }, - "mor rewards": { + [AGENT_TYPES.MOR_REWARDS]: { title: "MOR Rewards Tracking 🏆", icon: , examples: [ @@ -122,7 +119,7 @@ const prefilledOptionsMap: Record = { { text: "Calculate my pending MOR rewards", agent: "rewards" }, ], }, - "realtime search": { + [AGENT_TYPES.REALTIME_SEARCH]: { title: "Real-Time Search 🔍", icon: , examples: [ @@ -133,7 +130,7 @@ const prefilledOptionsMap: Record = { { text: "What did Donald Trump say about Bitcoin?", agent: "realtime" }, ], }, - "crypto news": { + [AGENT_TYPES.CRYPTO_NEWS]: { title: "Crypto News Analysis 📰", icon: , examples: [ @@ -141,15 +138,49 @@ const prefilledOptionsMap: Record = { { text: "What's the latest news impact on BTC?", agent: "news" }, ], }, + [AGENT_TYPES.DEXSCREENER]: { + title: "DexScreener 📊", + icon: , + examples: [ + { + text: "What are the most active tokens on solana?", + agent: "dexscreener", + }, + { + text: "Which ethereum tokens have the most trading liquidity?", + agent: "dexscreener", + }, + ], + }, + [AGENT_TYPES.RUGCHECK]: { + title: "Token Safety Analysis 🛡️", + icon: , + examples: [ + { + text: "Check token safety for this mint", + agent: "rugcheck", + }, + { text: "Show me the most voted tokens on rugcheck", agent: "rugcheck" }, + ], + }, }; -const PrefilledOptions: React.FC = ({ +const PrefilledOptions = ({ onSelect, isWidgetOpen = false, + isSidebarOpen = true, +}: { + onSelect: (message: string) => void; + isWidgetOpen?: boolean; + isSidebarOpen?: boolean; }) => { const [selectedAgents, setSelectedAgents] = useState([]); const containerStyle = { - paddingLeft: isWidgetOpen ? "5%" : "20%", + paddingLeft: isWidgetOpen + ? "5%" + : isSidebarOpen + ? "calc(260px + 20%)" // Sidebar width first, then percentage + : "20%", paddingRight: isWidgetOpen ? 
"35%" : "20%", }; diff --git a/submodules/moragents_dockers/frontend/components/ChatInput/index.module.css b/submodules/moragents_dockers/frontend/components/ChatInput/index.module.css index 02e3c57..fb4ba60 100644 --- a/submodules/moragents_dockers/frontend/components/ChatInput/index.module.css +++ b/submodules/moragents_dockers/frontend/components/ChatInput/index.module.css @@ -1,7 +1,7 @@ .container { position: relative; width: 100%; - background-color: black; + background-color: transparent; margin-bottom: 2rem; } @@ -9,7 +9,7 @@ margin-top: 1rem; padding-left: 1.5rem; padding-right: 1.5rem; - background-color: black; + background-color: transparent; width: 100%; } diff --git a/submodules/moragents_dockers/frontend/components/ChatInput/index.tsx b/submodules/moragents_dockers/frontend/components/ChatInput/index.tsx index edbcf29..8f95e57 100644 --- a/submodules/moragents_dockers/frontend/components/ChatInput/index.tsx +++ b/submodules/moragents_dockers/frontend/components/ChatInput/index.tsx @@ -1,30 +1,128 @@ -import React, { FC, useState } from "react"; +import React, { FC, useState, useEffect, useRef } from "react"; import { - Flex, Textarea, InputGroup, InputLeftAddon, InputRightAddon, IconButton, + Box, + VStack, + Text, } from "@chakra-ui/react"; import { AttachmentIcon } from "@chakra-ui/icons"; import { SendIcon } from "../CustomIcon/SendIcon"; import PrefilledOptions from "./PrefilledOptions"; import styles from "./index.module.css"; +type Command = { + command: string; + description: string; + name: string; +}; + type ChatInputProps = { onSubmit: (message: string, file: File | null) => Promise; disabled: boolean; hasMessages?: boolean; + isSidebarOpen?: boolean; }; export const ChatInput: FC = ({ onSubmit, disabled, hasMessages = false, + isSidebarOpen = false, }) => { const [message, setMessage] = useState(""); const [file, setFile] = useState(null); + const [commands, setCommands] = useState([]); + const [showCommands, setShowCommands] = useState(false); + const [selectedCommandIndex, setSelectedCommandIndex] = useState(0); + const [dropdownPosition, setDropdownPosition] = useState({ + top: 0, + left: 0, + width: 0, + }); + const inputGroupRef = useRef(null); + const inputRef = useRef(null); + const commandsRef = useRef(null); + + useEffect(() => { + fetch("http://localhost:8080/agents/commands") + .then((res) => res.json()) + .then((data) => setCommands(data.commands)) + .catch((error) => console.error("Error fetching commands:", error)); + }, []); + + useEffect(() => { + if (inputGroupRef.current && message.startsWith("/")) { + const rect = inputGroupRef.current.getBoundingClientRect(); + setDropdownPosition({ + top: rect.top, + left: rect.left - 400, + width: rect.width, + }); + } + }, [message, showCommands]); + + const filteredCommands = message.startsWith("/") + ? 
commands.filter((cmd) => + cmd.command.toLowerCase().includes(message.slice(1).toLowerCase()) + ) + : []; + + useEffect(() => { + setShowCommands(message.startsWith("/") && filteredCommands.length > 0); + setSelectedCommandIndex(0); + }, [message, filteredCommands.length]); + + useEffect(() => { + if (commandsRef.current && showCommands) { + const selectedElement = commandsRef.current.querySelector( + `[data-index="${selectedCommandIndex}"]` + ); + if (selectedElement) { + selectedElement.scrollIntoView({ + block: "center", + behavior: "smooth", + }); + } + } + }, [selectedCommandIndex, showCommands]); + + const handleCommandSelect = (command: Command) => { + setMessage(`/${command.command} `); + setShowCommands(false); + inputRef.current?.focus(); + }; + + const handleKeyDown = (e: React.KeyboardEvent) => { + if (showCommands) { + switch (e.key) { + case "ArrowDown": + e.preventDefault(); + setSelectedCommandIndex((prev) => + Math.min(prev + 1, filteredCommands.length - 1) + ); + break; + case "ArrowUp": + e.preventDefault(); + setSelectedCommandIndex((prev) => Math.max(prev - 1, 0)); + break; + case "Tab": + case "Enter": + e.preventDefault(); + handleCommandSelect(filteredCommands[selectedCommandIndex]); + break; + case "Escape": + setShowCommands(false); + break; + } + } else if (e.key === "Enter" && !e.shiftKey) { + e.preventDefault(); + handleSubmit(); + } + }; const agentSupportsFileUploads = true; @@ -41,10 +139,71 @@ export const ChatInput: FC = ({ return ( <> - {!hasMessages && } + {showCommands && ( + + + {filteredCommands.map((cmd, index) => ( + handleCommandSelect(cmd)} + transition="background-color 0.2s" + borderBottom="1px solid #454945" + _last={{ borderBottom: "none" }} + > + + /{cmd.command} + + + {cmd.name} - {cmd.description} + + + ))} + + + )} +
- - + {!hasMessages && ( + + )} +
+ {agentSupportsFileUploads && ( = ({ )}