Skip to content

Commit

Permalink
chore: reorganize files (#18)
Browse files Browse the repository at this point in the history
  • Loading branch information
0xAlec authored Nov 5, 2024
1 parent a9a6efe commit dea681a
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 103 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,7 @@ yarn-error.log*
*.tsbuildinfo
next-env.d.ts

.env
.env

# python
/api/__pycache__/
69 changes: 69 additions & 0 deletions api/agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import os
import constants

from utils import format_sse
from typing import Iterator

from langchain_openai import ChatOpenAI
from langgraph.checkpoint.memory import MemorySaver
from langgraph.prebuilt import create_react_agent
from langchain_core.messages import HumanMessage

from cdp_langchain.agent_toolkits import CdpToolkit
from cdp_langchain.utils import CdpAgentkitWrapper

def initialize_agent():
"""Initialize the agent with CDP Agentkit."""
# Initialize LLM.
llm = ChatOpenAI(model="gpt-4o-mini")

# Read wallet data from environment variable
wallet_data = os.getenv(constants.WALLET_DATA_ENV_VAR)

print("Initialized CDP Agentkit with wallet data:", wallet_data)

# Configure CDP Agentkit Langchain Extension.
values = {}
if wallet_data is not None:
# If there is a wallet configuration in environment variables, use it
values = {"cdp_wallet_data": wallet_data}

agentkit = CdpAgentkitWrapper(**values)

# Export and store the updated wallet data back to environment variable
# wallet_data = agentkit.export_wallet()
# print("Exporting wallet data:", wallet_data)
# os.environ[WALLET_DATA_ENV_VAR] = wallet_data

# Initialize CDP Agentkit Toolkit and get tools.
cdp_toolkit = CdpToolkit.from_cdp_agentkit_wrapper(agentkit)
tools = cdp_toolkit.get_tools()

# Store buffered conversation history in memory.
memory = MemorySaver()
config = {"configurable": {"thread_id": "CDP Agentkit Chatbot Example!"}}

# Create ReAct Agent using the LLM and CDP Agentkit tools.
return create_react_agent(
llm,
tools=tools,
checkpointer=memory,
state_modifier="You are a helpful agent that can interact onchain on the Base Layer 2 using the Coinbase Developer Platform Agentkit. You are empowered to interact onchain using your tools. If you ever need funds, you can request them from the faucet. You can also deploy your own ERC-20 tokens, NFTs, and interact with them. If someone asks you to do something you can't do, you can say so, and encourage them to implement it themselves using the CDP SDK + Agentkit, recommend they go to docs.cdp.coinbase.com for more informaton.",
), config

def run_agent(input, agent_executor, config) -> Iterator[str]:
"""Run the agent and yield formatted SSE messages"""
try:
for chunk in agent_executor.stream(
{"messages": [HumanMessage(content=input)]}, config
):
if "agent" in chunk:
content = chunk["agent"]["messages"][0].content
if content:
yield format_sse(content, constants.EVENT_TYPE_AGENT)
elif "tools" in chunk:
content = chunk["tools"]["messages"][0].content
if content:
yield format_sse(content, constants.EVENT_TYPE_TOOLS)
except Exception as e:
yield format_sse(f"Error: {str(e)}", constants.EVENT_TYPE_ERROR)
7 changes: 6 additions & 1 deletion api/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,9 @@
EVENT_TYPE_ERROR: Final[str]= "error"

# Environment variables
WALLET_DATA_ENV_VAR: Final[str] = "CDP_WALLET_DATA"
WALLET_DATA_ENV_VAR: Final[str] = "CDP_WALLET_DATA"

# Errors
class InputValidationError(Exception):
"""Custom exception for input validation errors"""
pass
103 changes: 2 additions & 101 deletions api/index.py
Original file line number Diff line number Diff line change
@@ -1,110 +1,11 @@
from flask import Flask, request, Response, stream_with_context, jsonify
from dotenv import load_dotenv
import os
import time
import json
import constants
from typing import Iterator

from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.memory import MemorySaver
from langgraph.prebuilt import create_react_agent

# Import CDP Agentkit Langchain Extension.
from cdp_langchain.agent_toolkits import CdpToolkit
from cdp_langchain.utils import CdpAgentkitWrapper
from inference import run_inference
from constants import InputValidationError

load_dotenv()
app = Flask(__name__)

class InputValidationError(Exception):
"""Custom exception for input validation errors"""
pass

def format_sse(data: str, event: str = None) -> str:
"""Format data as SSE"""
response = {
"event": event,
"data": data
}
return json.dumps(response) + "\n"

def initialize_agent():
"""Initialize the agent with CDP Agentkit."""
# Initialize LLM.
llm = ChatOpenAI(model="gpt-4o-mini")

# Read wallet data from environment variable
wallet_data = os.getenv(constants.WALLET_DATA_ENV_VAR)

print("Initialized CDP Agentkit with wallet data:", wallet_data)

# Configure CDP Agentkit Langchain Extension.
values = {}
if wallet_data is not None:
# If there is a wallet configuration in environment variables, use it
values = {"cdp_wallet_data": wallet_data}

agentkit = CdpAgentkitWrapper(**values)

# Export and store the updated wallet data back to environment variable
# wallet_data = agentkit.export_wallet()
# print("Exporting wallet data:", wallet_data)
# os.environ[WALLET_DATA_ENV_VAR] = wallet_data

# Initialize CDP Agentkit Toolkit and get tools.
cdp_toolkit = CdpToolkit.from_cdp_agentkit_wrapper(agentkit)
tools = cdp_toolkit.get_tools()

# Store buffered conversation history in memory.
memory = MemorySaver()
config = {"configurable": {"thread_id": "CDP Agentkit Chatbot Example!"}}

# Create ReAct Agent using the LLM and CDP Agentkit tools.
return create_react_agent(
llm,
tools=tools,
checkpointer=memory,
state_modifier="You are a helpful agent that can interact onchain on the Base Layer 2 using the Coinbase Developer Platform Agentkit. You are empowered to interact onchain using your tools. If you ever need funds, you can request them from the faucet. You can also deploy your own ERC-20 tokens, NFTs, and interact with them. If someone asks you to do something you can't do, you can say so, and encourage them to implement it themselves using the CDP SDK + Agentkit, recommend they go to docs.cdp.coinbase.com for more informaton.",
), config

# Autonomous Mode
def run_agent(input, agent_executor, config) -> Iterator[str]:
"""Run the agent and yield formatted SSE messages"""
try:
for chunk in agent_executor.stream(
{"messages": [HumanMessage(content=input)]}, config
):
if "agent" in chunk:
content = chunk["agent"]["messages"][0].content
if content:
yield format_sse(content, constants.EVENT_TYPE_AGENT)
elif "tools" in chunk:
content = chunk["tools"]["messages"][0].content
if content:
yield format_sse(content, constants.EVENT_TYPE_TOOLS)
except Exception as e:
yield format_sse(f"Error: {str(e)}", constants.EVENT_TYPE_ERROR)

def run_inference(input: str = "Be creative and do something interesting on the blockchain. Choose an action or set of actions and execute it in a way that highlights your abilities.") -> Iterator[str]:
"""Initialize agent, run inference and yield SSE responses"""
start_time = time.time()
print("Running agent...", flush=True)

try:
agent_executor, config = initialize_agent()
print(f"Agent init time: {time.time() - start_time:.2f} seconds", flush=True)

for response in run_agent(input, agent_executor=agent_executor, config=config):
yield response
except Exception as e:
print(f"Error during inference: {str(e)}", flush=True)
yield format_sse(f"Error: {str(e)}", constants.EVENT_TYPE_ERROR)
finally:
print("Agent finished running.", flush=True)
yield format_sse("Agent finished", constants.EVENT_TYPE_COMPLETED)


@app.route("/api/chat", methods=['POST'])
def chat():
Expand Down
24 changes: 24 additions & 0 deletions api/inference.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import time
import constants
from typing import Iterator

from agent import initialize_agent, run_agent
from utils import format_sse

def run_inference(input: str = "Be creative and do something interesting on the blockchain. Choose an action or set of actions and execute it in a way that highlights your abilities.") -> Iterator[str]:
"""Initialize agent, run inference and yield SSE responses"""
start_time = time.time()
print("Running agent...", flush=True)

try:
agent_executor, config = initialize_agent()
print(f"Agent init time: {time.time() - start_time:.2f} seconds", flush=True)

for response in run_agent(input, agent_executor=agent_executor, config=config):
yield response
except Exception as e:
print(f"Error during inference: {str(e)}", flush=True)
yield format_sse(f"Error: {str(e)}", constants.EVENT_TYPE_ERROR)
finally:
print("Agent finished running.", flush=True)
yield format_sse("Agent finished", constants.EVENT_TYPE_COMPLETED)
9 changes: 9 additions & 0 deletions api/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import json

def format_sse(data: str, event: str = None) -> str:
"""Format data as SSE"""
response = {
"event": event,
"data": data
}
return json.dumps(response) + "\n"

0 comments on commit dea681a

Please sign in to comment.