Files
cavepediav2/web/agent/main.py
2025-12-07 20:25:53 -07:00

171 lines
5.4 KiB
Python

"""
This is the main entry point for the agent.
It defines the workflow graph, state, tools, nodes and edges.
"""
from datetime import timedelta
from typing import Any, List, Optional

from langchain.tools import tool
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import BaseMessage, SystemMessage
from langchain_core.runnables import RunnableConfig
from langchain_mcp_adapters.client import MultiServerMCPClient
from langgraph.graph import END, MessagesState, StateGraph, START
from langgraph.prebuilt import ToolNode, tools_condition
from langgraph.types import Command
class AgentState(MessagesState):
    """
    State object passed between the nodes of the agent graph.

    It extends MessagesState, which contributes the ``messages`` field used
    for the conversation history, and adds the fields declared below.
    """

    # Extra tool definitions carried in the state; chat_node binds them to
    # the model together with the backend tools and the MCP tools.
    tools: List[Any]
    # your_custom_agent_state: str = ""
# Template for a backend tool -- uncomment and adapt it to add one:
#
# @tool
# def your_tool_here(your_arg: str):
#     """Your tool description here."""
#     print(f"Your tool logic here")
#     return "Your tool response here."

# Tools implemented in this module. The list is empty for now; add new
# tools here to make them available to the chat node and the tool node.
backend_tools = [
    # your_tool_here
]
def get_mcp_client(access_token: Optional[str] = None):
    """
    Create an MCP client for the Cavepedia server.

    Args:
        access_token: Bearer token to send with every request. When it is
            None or empty, no Authorization header is added.

    Returns:
        A MultiServerMCPClient configured with a single "cavepedia" server
        reached over the streamable HTTP transport.
    """
    headers = {}
    if access_token:
        headers["Authorization"] = f"Bearer {access_token}"

    return MultiServerMCPClient(
        {
            "cavepedia": {
                "transport": "streamable_http",
                "url": "https://mcp.caving.dev/mcp",
                # The streamable HTTP connection config declares its timeout
                # as a timedelta (only the SSE transport takes a bare float),
                # so pass ten seconds as a timedelta rather than 10.0.
                "timeout": timedelta(seconds=10),
                "headers": headers,
            }
        }
    )
# Cache of MCP tools, keyed by access token ("default" when there is none).
# Dicts keep insertion order, so the first key is always the oldest entry.
_mcp_tools_cache = {}

# Upper bound on the number of cached entries. Every distinct access token
# creates its own entry, so without a cap the cache would keep growing for
# the lifetime of the process.
_MCP_TOOLS_CACHE_MAX_ENTRIES = 128


async def get_mcp_tools(access_token: Optional[str] = None) -> List[Any]:
    """
    Lazily load the MCP tools for an access token, caching successful loads.

    Args:
        access_token: Token forwarded to get_mcp_client. None or an empty
            string shares the "default" cache entry.

    Returns:
        The tools returned by the MCP client, or an empty list when loading
        fails. A failure is reported with print and is NOT cached, so the
        next call tries again instead of staying without tools forever.
    """
    cache_key = access_token or "default"
    cached = _mcp_tools_cache.get(cache_key)
    if cached is not None:
        return cached

    try:
        mcp_client = get_mcp_client(access_token)
        tools = await mcp_client.get_tools()
    except Exception as e:
        # Best effort: the agent keeps working without MCP tools. Nothing is
        # stored, so a transient error does not poison the cache.
        print(f"Warning: Failed to load MCP tools: {e}")
        return []

    # Drop the oldest entries first to stay within the size limit.
    while len(_mcp_tools_cache) >= _MCP_TOOLS_CACHE_MAX_ENTRIES:
        _mcp_tools_cache.pop(next(iter(_mcp_tools_cache)))
    _mcp_tools_cache[cache_key] = tools
    print(f"Loaded {len(tools)} tools from MCP server with auth: {bool(access_token)}")
    return tools
async def chat_node(state: AgentState, config: RunnableConfig) -> dict:
    """
    Chat step of the ReAct loop: ask the model for the next message.

    The node:
    - reads the Auth0 access token and user roles from the run config
    - creates the model and binds the ag-ui tools from the state, the
      backend tools and the MCP tools to it
    - prepends the system prompt to the conversation and invokes the model

    Returns a state update whose "messages" list holds only the model reply.

    For more about the ReAct design pattern, see:
    https://www.perplexity.ai/search/react-agents-NcXLQhreS0WDzpVaS4m9Cg
    """
    # Auth details arrive through the "configurable" section of the config.
    settings = config.get("configurable", {})
    access_token = settings.get("auth0_access_token")
    user_roles = settings.get("auth0_user_roles", [])
    print(f"Chat node invoked with auth token: {bool(access_token)}, roles: {user_roles}")

    llm = ChatAnthropic(model="claude-sonnet-4-5-20250929")

    # MCP tools come from the cavepedia server, loaded with the caller's token.
    mcp_tools = await get_mcp_tools(access_token)

    available_tools = [
        *state.get("tools", []),  # tools defined by ag-ui
        *backend_tools,
        *mcp_tools,
        # your_tool_here
    ]
    # Parallel tool calls stay disabled to avoid race conditions; enable them
    # for faster performance only if you want to manage the complexity of
    # running tool calls in parallel.
    llm_with_tools = llm.bind_tools(available_tools, parallel_tool_calls=False)

    system_message = SystemMessage(
        content=f"You are a helpful assistant with access to cave-related information through the Cavepedia MCP server. You can help users find information about caves, caving techniques, and related topics. User roles: {', '.join(user_roles) if user_roles else 'none'}"
    )

    conversation = [system_message, *state["messages"]]
    reply = await llm_with_tools.ainvoke(conversation, config)
    return {"messages": [reply]}
async def tool_node_wrapper(state: AgentState, config: RunnableConfig) -> dict:
    """
    Tool-execution node covering both the backend tools and the MCP tools.

    The Auth0 access token is read from the run config so the MCP tools are
    loaded with the caller's credentials; the actual work is delegated to a
    standard ToolNode built from the combined tool list.

    NOTE(review): the ag-ui tools from state["tools"] are not passed to the
    ToolNode -- confirm that calls to them are not expected to reach this node.
    """
    access_token = config.get("configurable", {}).get("auth0_access_token")
    mcp_tools = await get_mcp_tools(access_token)

    executor = ToolNode(tools=[*backend_tools, *mcp_tools])
    return await executor.ainvoke(state, config)
# Assemble the agent graph: execution starts at the chat node and passes
# through the tool node whenever tools_condition routes it there.
workflow = StateGraph(AgentState)

# Nodes. The tool node has to be registered as "tools" for tools_condition.
workflow.add_node("chat_node", chat_node)
workflow.add_node("tools", tool_node_wrapper)

# Edges: enter at the chat node, let tools_condition pick the next step
# after each model reply, and return to the chat node after tools have run.
workflow.add_edge(START, "chat_node")
workflow.add_conditional_edges("chat_node", tools_condition)
workflow.add_edge("tools", "chat_node")

graph = workflow.compile()