Compare commits

...

5 Commits

Author SHA1 Message Date
f6891d231f 3 -> 2 candidates
All checks were successful
Build and Push Agent Docker Image / build (push) Successful in 2m26s
Build and Push Web Docker Image / build (push) Successful in 9m15s
2025-12-26 02:22:34 +01:00
337540496a simplify mcp 2025-12-26 02:18:00 +01:00
383e452322 allow prioritizing prefixes 2025-12-25 03:18:12 +01:00
a91fdb315c more concise 2025-12-25 02:52:50 +01:00
b156094691 sources only 2025-12-25 02:51:49 +01:00
6 changed files with 144 additions and 114 deletions

View File

@@ -59,14 +59,22 @@ def embed(text, input_type):
assert resp.embeddings.float_ is not None assert resp.embeddings.float_ is not None
return resp.embeddings.float_[0] return resp.embeddings.float_[0]
def search(query, roles: list[str], top_n: int = 3, max_content_length: int = 1500) -> list[dict]: @mcp.tool
"""Search with vector similarity, then rerank with Cohere for better relevance.""" def search_caving_documents(query: str, priority_prefixes: list[str] | None = None) -> dict:
"""Search caving documents for information about caves, techniques, safety, accidents, history, and more.
Args:
query: Search query
priority_prefixes: Optional list of key prefixes to prioritize (e.g., ['nss/aca'] for rescue topics)
"""
roles = get_user_roles()
if not roles:
return {"results": [], "note": "No results. Answer based on your knowledge."}
query_embedding = embed(query, 'search_query') query_embedding = embed(query, 'search_query')
if not roles:
return []
# Fetch more candidates for reranking # Fetch more candidates for reranking
top_n = 2
candidate_limit = top_n * 4 candidate_limit = top_n * 4
rows = conn.execute( rows = conn.execute(
'SELECT * FROM embeddings WHERE embedding IS NOT NULL AND role = ANY(%s) ORDER BY embedding <=> %s::vector LIMIT %s', 'SELECT * FROM embeddings WHERE embedding IS NOT NULL AND role = ANY(%s) ORDER BY embedding <=> %s::vector LIMIT %s',
@@ -74,52 +82,37 @@ def search(query, roles: list[str], top_n: int = 3, max_content_length: int = 15
).fetchall() ).fetchall()
if not rows: if not rows:
return [] return {"results": [], "note": "No results found. Answer based on your knowledge."}
# Rerank with Cohere for better relevance # Rerank with Cohere for better relevance
rerank_resp = co.rerank( rerank_resp = co.rerank(
query=query, query=query,
documents=[row['content'] or '' for row in rows], documents=[row['content'] or '' for row in rows],
model='rerank-v3.5', model='rerank-v3.5',
top_n=top_n, top_n=min(top_n * 2, len(rows)),
) )
# Build results with optional priority boost
docs = [] docs = []
for result in rerank_resp.results: for result in rerank_resp.results:
row = rows[result.index] row = rows[result.index]
score = result.relevance_score
# Boost score if key starts with any priority prefix (e.g., 'nss/aca')
if priority_prefixes:
key = row['key'] or ''
if any(key.startswith(prefix) for prefix in priority_prefixes):
score = min(1.0, score * 1.3)
content = row['content'] or '' content = row['content'] or ''
if len(content) > max_content_length: docs.append({'key': row['key'], 'content': content, 'relevance': round(score, 3)})
content = content[:max_content_length] + '...[truncated, use get_document_page for full text]'
docs.append({'key': row['key'], 'content': content, 'relevance': round(result.relevance_score, 3)})
return docs
@mcp.tool # Re-sort by boosted score and return top_n
def get_cave_location(cave: str, state: str, county: str) -> list[dict]: docs.sort(key=lambda x: x['relevance'], reverse=True)
"""Lookup cave location as coordinates.""" return {
roles = get_user_roles() "results": docs[:top_n],
return search(f'{cave} Location, latitude, Longitude. Located in {state} and {county} county.', roles) "note": "These are ALL available results. Do NOT search again - answer using these results now."
}
@mcp.tool
def general_caving_information(query: str) -> list[dict]:
"""General purpose search for any topic related to caves."""
roles = get_user_roles()
return search(query, roles)
@mcp.tool
def get_document_page(key: str) -> dict:
"""Fetch full content for a document page. Pass the exact 'key' value from search results."""
roles = get_user_roles()
if not roles:
return {"error": "No roles assigned"}
row = conn.execute(
'SELECT key, content FROM embeddings WHERE key = %s AND role = ANY(%s)',
(key, roles)
).fetchone()
if row:
return {"key": row["key"], "content": row["content"]}
return {"error": f"Page not found: {key}"}
@mcp.tool @mcp.tool
def get_user_info() -> dict: def get_user_info() -> dict:

View File

@@ -26,7 +26,6 @@ logfire.configure(
logfire.instrument_pydantic_ai() logfire.instrument_pydantic_ai()
logfire.instrument_httpx() logfire.instrument_httpx()
from typing import Any
from pydantic_ai import Agent, ModelMessage, RunContext from pydantic_ai import Agent, ModelMessage, RunContext
from pydantic_ai.settings import ModelSettings from pydantic_ai.settings import ModelSettings
from pydantic_ai.mcp import CallToolFunc from pydantic_ai.mcp import CallToolFunc
@@ -43,8 +42,8 @@ def limit_history(ctx: RunContext[None], messages: list[ModelMessage]) -> list[M
if not messages: if not messages:
return messages return messages
# Keep only the last 4 messages # Keep last 10 messages
messages = messages[-4:] messages = messages[-10:]
# Check if the last message is an assistant response with a tool call # Check if the last message is an assistant response with a tool call
# If so, remove it - it's orphaned (no tool result followed) # If so, remove it - it's orphaned (no tool result followed)
@@ -80,37 +79,34 @@ AGENT_INSTRUCTIONS = """Caving assistant. Help with exploration, safety, surveyi
Rules: Rules:
1. ALWAYS cite sources in a bulleted list at the end of every reply, even if there's only one. Format them human-readably (e.g., "- The Trog 2021, page 19" not "vpi/trog/2021-trog.pdf/page-19.pdf"). 1. ALWAYS cite sources in a bulleted list at the end of every reply, even if there's only one. Format them human-readably (e.g., "- The Trog 2021, page 19" not "vpi/trog/2021-trog.pdf/page-19.pdf").
2. Say when uncertain. Never hallucinate. 2. Say when uncertain. Never hallucinate.
3. Be safety-conscious. 3. Be direct—no sycophantic phrases.
4. Can create ascii diagrams/maps. 4. Keep responses concise.
5. Be direct—no sycophantic phrases. 5. SEARCH EXACTLY ONCE. After searching, IMMEDIATELY answer using those results. NEVER search again - additional searches are blocked and waste resources.
6. Keep responses concise. 6. For rescue, accident, or emergency-related queries, use priority_prefixes=['nss/aca'] when searching to prioritize official accident reports."""
7. Use tools sparingly—one search usually suffices.
8. If you hit the search limit, end your reply with an italicized note: *Your question may be too broad. Try asking something more specific.* Do NOT mention "tools" or "tool limits"—the user doesn't know what those are.""" SOURCES_ONLY_INSTRUCTIONS = """SOURCES ONLY MODE: Give exactly ONE sentence summary. Then list sources with specific page numbers (e.g., "- The Trog 2021, page 19"). No explanations."""
def create_tool_call_limiter(max_calls: int = 3): def create_search_limiter():
"""Create a process_tool_call callback that limits tool calls.""" """Block searches after the first one."""
call_count = [0] # Mutable container for closure searched = [False]
async def process_tool_call( async def process_tool_call(
ctx: RunContext, ctx: RunContext,
call_tool: CallToolFunc, call_tool: CallToolFunc,
name: str, name: str,
tool_args: dict[str, Any], tool_args: dict,
): ):
call_count[0] += 1 if name == "search_caving_documents":
if call_count[0] > max_calls: if searched[0]:
return ( return "You have already searched. Use the results you have."
f"SEARCH LIMIT REACHED: You have made {max_calls} searches. " searched[0] = True
"Stop searching and answer now with what you have. "
"End your reply with: *Your question may be too broad. Try asking something more specific.*"
)
return await call_tool(name, tool_args) return await call_tool(name, tool_args)
return process_tool_call return process_tool_call
def create_agent(user_roles: list[str] | None = None): def create_agent(user_roles: list[str] | None = None, sources_only: bool = False):
"""Create an agent with MCP tools configured for the given user roles.""" """Create an agent with MCP tools configured for the given user roles."""
toolsets = [] toolsets = []
@@ -129,7 +125,7 @@ def create_agent(user_roles: list[str] | None = None):
url=CAVE_MCP_URL, url=CAVE_MCP_URL,
headers={"x-user-roles": roles_header}, headers={"x-user-roles": roles_header},
timeout=30.0, timeout=30.0,
process_tool_call=create_tool_call_limiter(max_calls=3), process_tool_call=create_search_limiter(),
) )
toolsets.append(mcp_server) toolsets.append(mcp_server)
logger.info(f"MCP server configured with roles: {user_roles}") logger.info(f"MCP server configured with roles: {user_roles}")
@@ -140,10 +136,15 @@ def create_agent(user_roles: list[str] | None = None):
else: else:
logger.info("MCP server unavailable - running without MCP tools") logger.info("MCP server unavailable - running without MCP tools")
# Build instructions based on mode
instructions = AGENT_INSTRUCTIONS
if sources_only:
instructions = f"{SOURCES_ONLY_INSTRUCTIONS}\n\n{AGENT_INSTRUCTIONS}"
return Agent( return Agent(
model="anthropic:claude-sonnet-4-5", model="anthropic:claude-sonnet-4-5",
toolsets=toolsets if toolsets else None, toolsets=toolsets if toolsets else None,
instructions=AGENT_INSTRUCTIONS, instructions=instructions,
history_processors=[limit_history], history_processors=[limit_history],
model_settings=ModelSettings(max_tokens=4096), model_settings=ModelSettings(max_tokens=4096),
) )

View File

@@ -67,8 +67,13 @@ async def handle_agent_request(request: Request) -> Response:
except json.JSONDecodeError as e: except json.JSONDecodeError as e:
logger.warning(f"Failed to parse x-user-roles header: {e}") logger.warning(f"Failed to parse x-user-roles header: {e}")
# Create agent with the user's roles # Extract sources-only mode from header
agent = create_agent(user_roles) sources_only = request.headers.get("x-sources-only", "false") == "true"
if sources_only:
logger.info("Sources-only mode enabled")
# Create agent with the user's roles and mode
agent = create_agent(user_roles, sources_only=sources_only)
# Dispatch the request - tool limits handled by ToolCallLimiter in agent.py # Dispatch the request - tool limits handled by ToolCallLimiter in agent.py
return await AGUIAdapter.dispatch_request( return await AGUIAdapter.dispatch_request(

View File

@@ -15,13 +15,19 @@ export const POST = async (req: NextRequest) => {
const session = await auth0.getSession(); const session = await auth0.getSession();
const userRoles = (session?.user?.roles as string[]) || []; const userRoles = (session?.user?.roles as string[]) || [];
console.log("DEBUG: User roles from session:", userRoles); // Get sources-only mode from query param
const url = new URL(req.url);
const sourcesOnly = url.searchParams.get("sourcesOnly") === "true";
// Create HttpAgent with user roles header console.log("DEBUG: User roles from session:", userRoles);
console.log("DEBUG: Sources only mode:", sourcesOnly);
// Create HttpAgent with user roles and sources-only headers
const agent = new HttpAgent({ const agent = new HttpAgent({
url: process.env.AGENT_URL || "http://localhost:8000/", url: process.env.AGENT_URL || "http://localhost:8000/",
headers: { headers: {
"x-user-roles": JSON.stringify(userRoles), "x-user-roles": JSON.stringify(userRoles),
"x-sources-only": sourcesOnly ? "true" : "false",
}, },
}); });

View File

@@ -1,6 +1,5 @@
import type { Metadata } from "next"; import type { Metadata } from "next";
import { CopilotKit } from "@copilotkit/react-core";
import { Auth0Provider } from "@auth0/nextjs-auth0/client"; import { Auth0Provider } from "@auth0/nextjs-auth0/client";
import "./globals.css"; import "./globals.css";
import "@copilotkit/react-ui/styles.css"; import "@copilotkit/react-ui/styles.css";
@@ -19,9 +18,7 @@ export default function RootLayout({
<html lang="en"> <html lang="en">
<body className={"antialiased"}> <body className={"antialiased"}>
<Auth0Provider> <Auth0Provider>
<CopilotKit runtimeUrl="/api/copilotkit" agent="vpi_1000"> {children}
{children}
</CopilotKit>
</Auth0Provider> </Auth0Provider>
</body> </body>
</html> </html>

View File

@@ -1,6 +1,6 @@
"use client"; "use client";
import { useCopilotAction, useCopilotChat } from "@copilotkit/react-core"; import { CopilotKit, useCopilotAction, useCopilotChat } from "@copilotkit/react-core";
import { CopilotKitCSSProperties, CopilotChat } from "@copilotkit/react-ui"; import { CopilotKitCSSProperties, CopilotChat } from "@copilotkit/react-ui";
import { useState } from "react"; import { useState } from "react";
import { useUser } from "@auth0/nextjs-auth0/client"; import { useUser } from "@auth0/nextjs-auth0/client";
@@ -36,10 +36,8 @@ function LoadingOverlay() {
} }
} }
export default function CopilotKitPage() { // Chat content with CopilotKit hooks - must be inside CopilotKit provider
const [themeColor, setThemeColor] = useState("#6366f1"); function ChatContent({ themeColor, setThemeColor }: { themeColor: string; setThemeColor: (color: string) => void }) {
const { user, isLoading: authLoading } = useUser();
useCopilotAction({ useCopilotAction({
name: "setThemeColor", name: "setThemeColor",
parameters: [{ parameters: [{
@@ -52,6 +50,35 @@ export default function CopilotKitPage() {
}, },
}); });
return (
<div
className="flex-1 flex justify-center py-8 px-2 overflow-hidden relative"
style={{ "--copilot-kit-primary-color": themeColor } as CopilotKitCSSProperties}
>
<div className="h-full w-full max-w-5xl flex flex-col">
<CopilotChat
labels={{
title: "AI Cartwright",
initial: "Hello! I'm here to help with anything related to caving. Ask me about caves, techniques, safety, equipment, or anything else caving-related!",
}}
className="h-full w-full"
/>
</div>
<LoadingOverlay />
</div>
);
}
export default function CopilotKitPage() {
const [themeColor, setThemeColor] = useState("#6366f1");
const [sourcesOnlyMode, setSourcesOnlyMode] = useState(false);
const { user, isLoading: authLoading } = useUser();
// Dynamic runtime URL based on sources-only mode
const runtimeUrl = sourcesOnlyMode
? "/api/copilotkit?sourcesOnly=true"
: "/api/copilotkit";
// Show loading state while checking authentication // Show loading state while checking authentication
if (authLoading) { if (authLoading) {
return ( return (
@@ -88,50 +115,51 @@ export default function CopilotKitPage() {
// If authenticated, show the CopilotKit chat with user profile // If authenticated, show the CopilotKit chat with user profile
return ( return (
<main <CopilotKit
style={{ "--copilot-kit-primary-color": themeColor } as CopilotKitCSSProperties} runtimeUrl={runtimeUrl}
className="h-screen w-screen flex flex-col bg-gray-50" agent="vpi_1000"
key={sourcesOnlyMode ? "sources" : "normal"}
> >
{/* Header with user profile and logout */} <main className="h-screen w-screen flex flex-col bg-gray-50">
<div className="w-full bg-white shadow-sm border-b border-gray-200 px-4 py-3"> {/* Header with user profile and logout */}
<div className="max-w-7xl mx-auto flex justify-between items-center"> <div className="w-full bg-white shadow-sm border-b border-gray-200 px-4 py-3">
<div className="flex items-center gap-4"> <div className="max-w-7xl mx-auto flex justify-between items-center">
<h1 className="text-xl font-semibold text-gray-900">Cavepedia</h1> <div className="flex items-center gap-4">
</div> <h1 className="text-xl font-semibold text-gray-900">Cavepedia</h1>
<div className="flex items-center gap-4"> <label className="flex items-center gap-2 text-sm text-gray-600 cursor-pointer">
{user.picture && ( <input
<img type="checkbox"
src={user.picture} checked={sourcesOnlyMode}
alt={user.name || 'User'} onChange={(e) => setSourcesOnlyMode(e.target.checked)}
className="w-8 h-8 rounded-full" className="w-4 h-4 rounded border-gray-300 text-indigo-600 focus:ring-indigo-500"
/> />
)} Sources only
<div className="flex flex-col items-end"> </label>
<span className="text-sm text-gray-700">{user.name}</span> </div>
{(user as any).roles && (user as any).roles.length > 0 && ( <div className="flex items-center gap-4">
<span className="text-xs text-gray-500"> {user.picture && (
{(user as any).roles.join(', ')} <img
</span> src={user.picture}
)} alt={user.name || 'User'}
className="w-8 h-8 rounded-full"
/>
)}
<div className="flex flex-col items-end">
<span className="text-sm text-gray-700">{user.name}</span>
{(user as any).roles && (user as any).roles.length > 0 && (
<span className="text-xs text-gray-500">
{(user as any).roles.join(', ')}
</span>
)}
</div>
<LogoutButton />
</div> </div>
<LogoutButton />
</div> </div>
</div> </div>
</div>
{/* CopilotKit Chat */} {/* CopilotKit Chat */}
<div className="flex-1 flex justify-center py-8 px-2 overflow-hidden relative"> <ChatContent themeColor={themeColor} setThemeColor={setThemeColor} />
<div className="h-full w-full max-w-5xl flex flex-col"> </main>
<CopilotChat </CopilotKit>
labels={{
title: "AI Cartwright",
initial: "Hello! I'm here to help with anything related to caving. Ask me about caves, techniques, safety, equipment, or anything else caving-related!",
}}
className="h-full w-full"
/>
</div>
<LoadingOverlay />
</div>
</main>
); );
} }