|
|
|
|
@@ -19,19 +19,37 @@ logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
CAVE_MCP_URL = os.getenv("CAVE_MCP_URL", "https://mcp.caving.dev/mcp")
|
|
|
|
|
|
|
|
|
|
logger.info("Initializing Cavepedia agent...")
|
|
|
|
|
logger.info(f"Initializing Cavepedia agent with CAVE_MCP_URL={CAVE_MCP_URL}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def limit_history(ctx: RunContext[None], messages: list[ModelMessage]) -> list[ModelMessage]:
|
|
|
|
|
"""Limit conversation history to manage token usage."""
|
|
|
|
|
# Keep last 8 messages for context, but not unlimited
|
|
|
|
|
return messages[-8:]
|
|
|
|
|
"""Limit history and clean up orphaned tool calls to prevent API errors."""
|
|
|
|
|
from pydantic_ai.messages import ModelResponse, ToolCallPart
|
|
|
|
|
|
|
|
|
|
if not messages:
|
|
|
|
|
return messages
|
|
|
|
|
|
|
|
|
|
# Keep only the last 4 messages
|
|
|
|
|
messages = messages[-4:]
|
|
|
|
|
|
|
|
|
|
# Check if the last message is an assistant response with a tool call
|
|
|
|
|
# If so, remove it - it's orphaned (no tool result followed)
|
|
|
|
|
if messages:
|
|
|
|
|
last_msg = messages[-1]
|
|
|
|
|
if isinstance(last_msg, ModelResponse):
|
|
|
|
|
has_tool_call = any(isinstance(part, ToolCallPart) for part in last_msg.parts)
|
|
|
|
|
if has_tool_call:
|
|
|
|
|
logger.warning("Removing orphaned tool call from history")
|
|
|
|
|
return messages[:-1]
|
|
|
|
|
|
|
|
|
|
return messages
|
|
|
|
|
|
|
|
|
|
def check_mcp_available(url: str, timeout: float = 5.0) -> bool:
|
|
|
|
|
"""Check if MCP server is reachable via health endpoint."""
|
|
|
|
|
try:
|
|
|
|
|
# Use the health endpoint instead of the MCP endpoint
|
|
|
|
|
health_url = url.rsplit("/", 1)[0] + "/health"
|
|
|
|
|
logger.info(f"Checking MCP health at: {health_url}")
|
|
|
|
|
response = httpx.get(health_url, timeout=timeout, follow_redirects=True)
|
|
|
|
|
if response.status_code == 200:
|
|
|
|
|
return True
|
|
|
|
|
@@ -41,9 +59,7 @@ def check_mcp_available(url: str, timeout: float = 5.0) -> bool:
|
|
|
|
|
logger.warning(f"MCP server not reachable: {e}")
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
# Check if MCP is available at startup
|
|
|
|
|
MCP_AVAILABLE = check_mcp_available(CAVE_MCP_URL)
|
|
|
|
|
logger.info(f"MCP server available: {MCP_AVAILABLE}")
|
|
|
|
|
# MCP availability is checked lazily in create_agent()
|
|
|
|
|
|
|
|
|
|
AGENT_INSTRUCTIONS = """Caving assistant. Help with exploration, safety, surveying, locations, geology, equipment, history, conservation.
|
|
|
|
|
|
|
|
|
|
@@ -54,14 +70,17 @@ Rules:
|
|
|
|
|
4. Can create ascii diagrams/maps.
|
|
|
|
|
5. Be direct—no sycophantic phrases.
|
|
|
|
|
6. Keep responses concise.
|
|
|
|
|
7. Search ONCE, then answer with what you found. Do not search repeatedly for the same topic."""
|
|
|
|
|
7. Use tools sparingly—one search usually suffices. Answer from your knowledge when possible."""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def create_agent(user_roles: list[str] | None = None):
|
|
|
|
|
"""Create an agent with MCP tools configured for the given user roles."""
|
|
|
|
|
toolsets = []
|
|
|
|
|
|
|
|
|
|
if MCP_AVAILABLE and user_roles:
|
|
|
|
|
# Check MCP availability lazily (each request) to handle startup race conditions
|
|
|
|
|
mcp_available = check_mcp_available(CAVE_MCP_URL) if user_roles else False
|
|
|
|
|
|
|
|
|
|
if mcp_available and user_roles:
|
|
|
|
|
try:
|
|
|
|
|
import json
|
|
|
|
|
from pydantic_ai.mcp import MCPServerStreamableHTTP
|
|
|
|
|
@@ -92,7 +111,4 @@ def create_agent(user_roles: list[str] | None = None):
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Create a default agent for health checks etc
|
|
|
|
|
agent = create_agent()
|
|
|
|
|
|
|
|
|
|
logger.info("Agent module initialized successfully")
|
|
|
|
|
|