ducklm/duck_core/api.py

886 lines
36 KiB
Python

import asyncio
import json
import logging
from pathlib import Path
from typing import Any
import uvicorn
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import FileResponse, HTMLResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel
from duck_core.approvals.service import ApprovalService
from duck_core.config import get_settings
from duck_core.conversations.store import ConversationStore
from duck_core.events.store import EventStore
from duck_core.experience.recorder import ExperienceRecorder
from duck_core.memory.vector_memory import EmbeddingsUnavailableError, VectorMemory
from duck_core.memory.store import MemoryStore
from duck_core.model_client import ModelClient
from duck_core.runtime_loop import RuntimeLoop
from duck_core.skills.registry import SkillRegistry
from duck_core.tasks.store import TaskStore
logger = logging.getLogger(__name__)
class ChatRequest(BaseModel):
    """Request body for the chat endpoints (also reused by ``POST /v1/tasks``)."""

    # Text of the user's message.
    message: str
    # Conversation to continue; the chat handlers hand it to ``conversations.ensure``.
    conversation_id: str | None = None
    # Workspace override; handlers fall back to ``settings.workspace`` when unset.
    workspace: str | None = None
    # Debug flag forwarded to the runtime / task store.
    debug: bool = False
class ConversationRequest(BaseModel):
    """Request body for ``POST /v1/conversations``."""

    # Conversation title; the handler substitutes "New chat" when unset or empty.
    title: str | None = None
    # Workspace for the conversation; the handler falls back to ``settings.workspace``.
    workspace: str | None = None
class ContinueRequest(BaseModel):
    """Request body for ``POST /v1/tasks/{task_id}/continue/stream``."""

    # Approval whose decision drives the continuation; it must belong to the task in the URL.
    approval_id: str
class PasswordRequest(BaseModel):
    """Request body for ``POST /v1/tasks/{task_id}/password/stream``."""

    # Approval for the action being retried; it must belong to the task in the URL.
    approval_id: str
    # Password forwarded to the runtime as ``password=`` when the action is re-run.
    password: str
class MemoryRequest(BaseModel):
    """Request body for ``POST /v1/memory``."""

    # Memory text; the handler rejects blank / whitespace-only text with HTTP 400.
    text: str
    # Workspace the memory belongs to; the handler falls back to ``settings.workspace``.
    workspace: str | None = None
    # Explicit scope; when unset the handler asks ``memory_store.infer_scope`` for one.
    scope: str | None = None
    # Optional conversation the memory is associated with.
    conversation_id: str | None = None
    # Free-form type label stored with the record.
    memory_type: str = "note"
    # Importance value stored with the record (no range check is applied here).
    importance: float = 0.5
    # NOTE(review): mutable ``{}` default — pydantic is documented to copy field
    # defaults per instance; confirm for the pinned pydantic version.
    metadata: dict[str, Any] = {}
def create_app() -> FastAPI:
    """Build the DuckLM FastAPI application.

    Creates the stores and services used by the API; the route handlers defined
    further down in this function are closures over the objects created here.
    """
    # Clear the settings cache before reading settings
    # (presumably so a new app picks up current configuration — TODO confirm).
    get_settings.cache_clear()
    settings = get_settings()
    if settings.api_host == "0.0.0.0":
        logger.warning(
            "DuckLM API is listening on 0.0.0.0. This may expose local tool execution endpoints."
        )
    # Ensure the workspace directory and the database file's parent directory exist.
    Path(settings.workspace).mkdir(parents=True, exist_ok=True)
    Path(settings.db_path).parent.mkdir(parents=True, exist_ok=True)
    app = FastAPI(title="DuckLM", version="0.1.0")
    # Template and static directories are given as relative paths.
    templates = Jinja2Templates(directory="duck_core/web/templates")
    app.mount("/static", StaticFiles(directory="duck_core/web/static"), name="static")
    # Every store below is constructed with the same ``settings.db_path``.
    task_store = TaskStore(settings.db_path)
    event_store = EventStore(settings.db_path)
    conversations = ConversationStore(settings.db_path)
    model_client = ModelClient()
    approvals = ApprovalService(settings.db_path)
    runtime = RuntimeLoop(task_store, event_store, model_client, approval_service=approvals)
    skills = SkillRegistry("skills")
    experience = ExperienceRecorder(settings.db_path)
    # Vector memory is only used as the fallback in ``GET /v1/memory/search``.
    memory = VectorMemory(settings.qdrant_url, embeddings_base_url=None)
    memory_store = MemoryStore(settings.db_path)
@app.on_event("startup")
async def startup() -> None:
    """Initialise each store/service in turn when the application starts."""
    # Order matches the original explicit sequence of ``init()`` calls.
    initialisation_order = (
        task_store,
        event_store,
        conversations,
        approvals,
        experience,
        memory_store,
    )
    for component in initialisation_order:
        await component.init()
@app.get("/", response_class=HTMLResponse)
async def index(request: Request) -> HTMLResponse:
    """Render the ``index.html`` template for the site root."""
    page = templates.TemplateResponse(request, "index.html")
    return page
@app.get("/favicon.ico", include_in_schema=False)
async def favicon() -> FileResponse:
    """Return the ``favicon.ico`` file; the route is excluded from the API schema."""
    icon = FileResponse("favicon.ico")
    return icon
@app.get("/approvals", response_class=HTMLResponse)
async def approvals_page(request: Request) -> HTMLResponse:
    """Render the ``approvals.html`` template."""
    page = templates.TemplateResponse(request, "approvals.html")
    return page
@app.get("/skills", response_class=HTMLResponse)
async def skills_page(request: Request) -> HTMLResponse:
    """Render the ``skills.html`` template."""
    page = templates.TemplateResponse(request, "skills.html")
    return page
@app.get("/memory", response_class=HTMLResponse)
async def memory_page(request: Request) -> HTMLResponse:
    """Render the ``memory.html`` template."""
    page = templates.TemplateResponse(request, "memory.html")
    return page
@app.get("/experience", response_class=HTMLResponse)
async def experience_page(request: Request) -> HTMLResponse:
    """Render the ``experience.html`` template."""
    page = templates.TemplateResponse(request, "experience.html")
    return page
@app.get("/health")
async def health() -> dict[str, str]:
    """Liveness probe: always answers with a fixed ``ok`` status."""
    return dict(status="ok")
@app.get("/v1/status")
async def status() -> dict[str, Any]:
    """Report the service name, version and the main configured settings."""
    report: dict[str, Any] = {"name": "DuckLM", "version": "0.1.0"}
    # Copy the exposed settings in the same key order as before.
    for field in ("api_host", "api_port", "workspace", "db_path"):
        report[field] = getattr(settings, field)
    return report
@app.get("/v1/models/roles")
async def roles() -> dict[str, Any]:
    """Return whatever the model client reports from ``list_roles()``."""
    configured_roles = model_client.list_roles()
    return configured_roles
@app.get("/v1/models/ping")
async def models_ping() -> dict[str, Any]:
    """Await the model client's ``ping()`` and return its result."""
    ping_result = await model_client.ping()
    return ping_result
@app.post("/v1/chat")
async def chat(body: ChatRequest) -> dict[str, Any]:
    """Run one non-streaming chat turn and persist both sides of the exchange.

    Returns the runtime result's attributes plus the ``conversation_id`` used.
    """
    target_workspace = body.workspace or settings.workspace
    conversation = await conversations.ensure(
        body.conversation_id,
        title_from_message(body.message),
        target_workspace,
    )
    conv_id = conversation.conversation_id

    # Context is gathered before the new user message is stored.
    prior_turns = await conversation_history(conv_id)
    recalled = await relevant_memory(body.message, conversation.workspace, conv_id)

    result = await runtime.run_chat(
        body.message,
        conversation.workspace,
        body.debug,
        history_messages=prior_turns,
        memory_records=recalled,
    )

    # Both stored messages carry the task id and final status of this run.
    shared = {"task_id": result.task_id, "status": result.status}
    await conversations.add_message(conv_id, "user", body.message, **shared)
    await conversations.add_message(
        conv_id,
        "assistant",
        result.final_response,
        reasoning_content=result.reasoning_content,
        **shared,
    )

    response = dict(result.__dict__)
    response["conversation_id"] = conv_id
    return response
async def conversation_history(conversation_id: str) -> list[dict[str, str]]:
    """Return up to 20 stored messages as ``{"role", "content"}`` dicts.

    Only ``user`` / ``assistant`` messages with non-empty content are kept.
    """
    chat_roles = {"user", "assistant"}
    history: list[dict[str, str]] = []
    for entry in await conversations.list_messages(conversation_id, limit=20):
        if entry.role not in chat_roles or not entry.content:
            continue
        history.append({"role": entry.role, "content": entry.content})
    return history
def title_from_message(message: str) -> str:
    """Derive a conversation title from a message.

    Whitespace runs are collapsed to single spaces, the result is cut to 60
    characters, and ``"New chat"`` is used when nothing is left.
    """
    # ``split()`` with no separator already ignores leading/trailing whitespace.
    truncated = " ".join(message.split())[:60]
    return truncated if truncated else "New chat"
async def relevant_memory(
    query: str, workspace: str, conversation_id: str | None
) -> list[dict[str, str]]:
    """Fetch up to 8 memory records for the query as ``{"scope", "text"}`` dicts.

    Records with empty text are dropped.
    """
    found = await memory_store.relevant(
        workspace=workspace,
        conversation_id=conversation_id,
        query=query,
        limit=8,
    )
    snippets: list[dict[str, str]] = []
    for item in found:
        if item.text:
            snippets.append({"scope": item.scope, "text": item.text})
    return snippets
@app.get("/v1/conversations")
async def list_conversations() -> list[dict[str, Any]]:
    """Return every conversation from the store, serialised with ``model_dump()``."""
    stored = await conversations.list()
    return [item.model_dump() for item in stored]
@app.post("/v1/conversations")
async def create_conversation(body: ConversationRequest) -> dict[str, Any]:
    """Create a conversation, defaulting the title and workspace when not supplied."""
    title = body.title or "New chat"
    workspace = body.workspace or settings.workspace
    created = await conversations.create(title, workspace)
    return created.model_dump()
@app.get("/v1/conversations/{conversation_id}")
async def get_conversation(conversation_id: str) -> dict[str, Any]:
    """Return one conversation together with its messages.

    Raises:
        HTTPException: 404 when the conversation does not exist.
    """
    conversation = await conversations.get(conversation_id)
    if conversation is None:
        raise HTTPException(status_code=404, detail="Conversation not found")

    stored_messages = await conversations.list_messages(conversation_id)
    payload = conversation.model_dump()
    payload["messages"] = [entry.model_dump() for entry in stored_messages]
    return payload
def sse(event: str, payload: dict[str, Any]) -> str:
    """Format one Server-Sent Events frame.

    The frame has an ``event:`` line, a ``data:`` line holding the payload as
    JSON (non-ASCII characters kept as-is), and a terminating blank line.
    """
    event_line = f"event: {event}"
    data_line = f"data: {json.dumps(payload, ensure_ascii=False)}"
    # Two trailing empty items yield the closing "\n\n".
    return "\n".join([event_line, data_line, "", ""])
async def emit_tool_events(task_id: str, after_sequence: int):
    """Yield SSE frames for a task's tool events newer than ``after_sequence``.

    Only ``tool_call_started``, ``tool_call_finished`` and
    ``tool_approval_requested`` events are forwarded.
    """
    forwarded = {
        "tool_call_started",
        "tool_call_finished",
        "tool_approval_requested",
    }
    for record in await event_store.list_events(task_id):
        if not record.sequence > after_sequence:
            continue
        if record.event_type not in forwarded:
            continue
        yield sse(record.event_type, record.model_dump())
@app.post("/v1/chat/stream")
async def chat_stream(body: ChatRequest) -> StreamingResponse:
    """Run a chat turn and stream its progress as Server-Sent Events.

    Frames emitted, in order: ``task_created``, any tool events forwarded by
    ``emit_tool_events``, ``reasoning_delta`` / ``content_delta`` chunks from the
    model, and finally ``done`` (or ``error`` if an exception is raised).
    """

    async def generator():
        # Resolve the conversation and gather prompt context before creating the task.
        conversation = await conversations.ensure(
            body.conversation_id,
            title_from_message(body.message),
            body.workspace or settings.workspace,
        )
        history = await conversation_history(conversation.conversation_id)
        memory_records = await relevant_memory(
            body.message, conversation.workspace, conversation.conversation_id
        )
        task = await task_store.create_task(
            body.message, conversation.workspace, body.debug
        )
        task_event = await event_store.append(
            task.task_id,
            "task_created",
            {
                "message": body.message,
                "workspace": conversation.workspace,
                "debug": body.debug,
                "conversation_id": conversation.conversation_id,
            },
        )
        yield sse("task_created", task_event.model_dump())
        # Accumulate streamed chunks so the full text can be persisted at the end.
        reasoning_parts: list[str] = []
        content_parts: list[str] = []
        try:
            messages = runtime.context_builder.build_basic_messages(
                task, history, memory_records
            )
            tool_observations = await runtime._run_action_loop(
                task.task_id, messages, conversation.workspace
            )
            # Forward tool events recorded after the task_created event.
            async for tool_event in emit_tool_events(task.task_id, task_event.sequence):
                yield tool_event
            # An approval-gated action pauses the task: store placeholder messages and stop.
            if any(observation.get("requires_approval") for observation in tool_observations):
                await task_store.waiting_for_approval(task.task_id)
                await event_store.append(
                    task.task_id,
                    "task_waiting_for_approval",
                    {"observations": tool_observations},
                )
                await conversations.add_message(
                    conversation.conversation_id,
                    "user",
                    body.message,
                    task_id=task.task_id,
                    status="waiting_for_approval",
                )
                await conversations.add_message(
                    conversation.conversation_id,
                    "assistant",
                    "Waiting for approval.",
                    task_id=task.task_id,
                    status="waiting_for_approval",
                )
                yield sse(
                    "done",
                    {
                        "task_id": task.task_id,
                        "conversation_id": conversation.conversation_id,
                        "status": "waiting_for_approval",
                        "final_response": "Waiting for approval.",
                        "reasoning_content": None,
                    },
                )
                return
            # Feed tool results back to the model as an extra user message.
            if tool_observations:
                messages = [
                    *messages,
                    {
                        "role": "user",
                        "content": "tool_observations:\n"
                        + json.dumps(tool_observations, ensure_ascii=False, indent=2),
                    },
                ]
            await event_store.append(task.task_id, "model_call_started", {"role": "thinker"})
            # Relay model output chunk by chunk; chunk types other than the two below are ignored.
            async for chunk in model_client.stream_chat("thinker", messages):
                delta = str(chunk.get("delta") or "")
                if chunk.get("type") == "reasoning_delta":
                    reasoning_parts.append(delta)
                    yield sse(
                        "reasoning_delta",
                        {"task_id": task.task_id, "delta": delta},
                    )
                elif chunk.get("type") == "content_delta":
                    content_parts.append(delta)
                    yield sse(
                        "content_delta",
                        {"task_id": task.task_id, "delta": delta},
                    )
            content = "".join(content_parts)
            # Empty reasoning is stored as None rather than "".
            reasoning_content = "".join(reasoning_parts) or None
            await event_store.append(
                task.task_id,
                "cognition_response",
                {
                    "role": "thinker",
                    "content": content,
                    "reasoning_content": reasoning_content,
                },
            )
            await event_store.append(
                task.task_id,
                "model_call_finished",
                {
                    "role": "thinker",
                    "model": model_client.get_role_config("thinker").model,
                },
            )
            await task_store.complete_task(task.task_id, content)
            # The user and assistant messages are only persisted once the turn has finished.
            await conversations.add_message(
                conversation.conversation_id,
                "user",
                body.message,
                task_id=task.task_id,
                status="completed",
            )
            await conversations.add_message(
                conversation.conversation_id,
                "assistant",
                content,
                reasoning_content=reasoning_content,
                task_id=task.task_id,
                status="completed",
            )
            await event_store.append(
                task.task_id,
                "task_completed",
                {
                    "final_response": content,
                    "reasoning_content": reasoning_content,
                },
            )
            yield sse(
                "done",
                {
                    "task_id": task.task_id,
                    "conversation_id": conversation.conversation_id,
                    "status": "completed",
                    "final_response": content,
                    "reasoning_content": reasoning_content,
                },
            )
        except asyncio.CancelledError:
            # Cancellation is recorded as a client disconnect, then re-raised.
            await task_store.cancel_task(task.task_id)
            await event_store.append(
                task.task_id, "task_cancelled", {"reason": "client_disconnected"}
            )
            raise
        except Exception as exc:
            # Any other failure marks the task failed and is reported to the client as an SSE frame.
            await task_store.fail_task(task.task_id, str(exc))
            await event_store.append(task.task_id, "task_failed", {"error": str(exc)})
            yield sse(
                "error",
                {
                    "task_id": task.task_id,
                    "status": "failed",
                    "error": str(exc),
                },
            )

    return StreamingResponse(generator(), media_type="text/event-stream")
@app.post("/v1/tasks")
async def create_task(body: ChatRequest) -> dict[str, Any]:
    """Create a task record and log a ``task_created`` event, without running it."""
    workspace = body.workspace or settings.workspace
    created = await task_store.create_task(body.message, workspace, body.debug)
    # The raw request body is stored as the event payload.
    await event_store.append(created.task_id, "task_created", body.model_dump())
    return created.model_dump()
@app.get("/v1/tasks")
async def list_tasks() -> list[dict[str, Any]]:
    """Return all tasks from the task store, serialised with ``model_dump()``."""
    stored = await task_store.list_tasks()
    return [item.model_dump() for item in stored]
@app.get("/v1/tasks/{task_id}")
async def get_task(task_id: str) -> dict[str, Any]:
    """Return a single task.

    Raises:
        HTTPException: 404 when no task has the given id.
    """
    found = await task_store.get_task(task_id)
    if found is not None:
        return found.model_dump()
    raise HTTPException(status_code=404, detail="Task not found")
@app.get("/v1/tasks/{task_id}/events")
async def get_events(task_id: str) -> list[dict[str, Any]]:
    """Return the events recorded for a task, serialised with ``model_dump()``."""
    recorded = await event_store.list_events(task_id)
    return [item.model_dump() for item in recorded]
@app.get("/v1/audit/commands")
async def command_audit(limit: int = 100) -> list[dict[str, Any]]:
    """Return ``command_audit`` events, with ``limit`` clamped to the range 1..500."""
    capped = max(1, min(limit, 500))
    audit_events = await event_store.list_by_type("command_audit", capped)
    return [item.model_dump() for item in audit_events]
@app.get("/v1/tasks/{task_id}/stream")
async def stream_events(task_id: str) -> StreamingResponse:
    """Stream a task's events as SSE ``data:`` frames by polling the event store.

    The store is polled 30 times with a one-second pause after each poll; each
    poll sends only the events not sent by earlier polls.
    """

    async def generator():
        delivered = 0
        polls_left = 30
        while polls_left > 0:
            polls_left -= 1
            snapshot = await event_store.list_events(task_id)
            # Skip the prefix that earlier polls already sent.
            for record in snapshot[delivered:]:
                yield f"data: {json.dumps(record.model_dump())}\n\n"
            delivered = len(snapshot)
            await asyncio.sleep(1)

    return StreamingResponse(generator(), media_type="text/event-stream")
@app.post("/v1/tasks/{task_id}/continue")
async def continue_task(task_id: str) -> dict[str, str]:
    """Mark an existing task as running and log a ``task_continued`` event.

    Raises:
        HTTPException: 404 when no task has the given id.
    """
    new_status = "running"
    if await task_store.get_task(task_id) is None:
        raise HTTPException(status_code=404, detail="Task not found")
    await task_store.update_status(task_id, new_status)
    await event_store.append(task_id, "task_continued", {})
    return {"status": new_status}
@app.post("/v1/tasks/{task_id}/continue/stream")
async def continue_task_stream(task_id: str, body: ContinueRequest) -> StreamingResponse:
    """Resume a task after an approval decision and stream the outcome as SSE.

    Raises:
        HTTPException: 404 when the task is unknown or the approval does not
            belong to it; 409 when the approval has no decision yet.
    """
    task = await task_store.get_task(task_id)
    if task is None:
        raise HTTPException(status_code=404, detail="Task not found")
    approval = await approvals.get(body.approval_id)
    if approval is None or approval.task_id != task_id:
        raise HTTPException(status_code=404, detail="Approval not found for task")
    if approval.decision is None:
        raise HTTPException(status_code=409, detail="Approval is still pending")
    # May be falsy; the conversation writes below are guarded by ``if conversation_id``.
    conversation_id = await conversations.get_conversation_id_for_task(task_id)

    async def generator():
        # Accumulate streamed chunks so the full text can be persisted at the end.
        reasoning_parts: list[str] = []
        content_parts: list[str] = []
        try:
            await task_store.update_status(task_id, "running")
            continued_event = await event_store.append(
                task_id,
                "task_continued",
                {"approval_id": body.approval_id, "decision": approval.decision},
            )
            # Execute (or refuse) the action the approval was requested for.
            tool_observation = await runtime._run_approved_or_denied_action(
                task_id, approval.normalized_action, approval.decision
            )
            # Messages are rebuilt from the task alone (no history / memory arguments here).
            messages = runtime.context_builder.build_basic_messages(task)
            tool_observations = [tool_observation]
            # Keep running further actions unless the decision was "deny" or a password is needed.
            if approval.decision != "deny" and not has_password_request(tool_observations):
                tool_observations = await runtime._run_action_loop(
                    task_id,
                    messages,
                    task.workspace,
                    initial_observations=tool_observations,
                )
            # Forward tool events recorded after the task_continued event.
            async for tool_event in emit_tool_events(task_id, continued_event.sequence):
                yield tool_event
            # A password prompt pauses the task; the event payload carries a redacted action.
            if has_password_request(tool_observations):
                await task_store.update_status(task_id, "waiting_for_password")
                password_event = await event_store.append(
                    task_id,
                    "tool_password_requested",
                    {
                        "approval_id": body.approval_id,
                        "tool": approval.normalized_action.get("tool"),
                        "action": redacted_action(approval.normalized_action),
                        "reason": "Sudo password is required to continue.",
                    },
                )
                yield sse("tool_password_requested", password_event.model_dump())
                yield sse(
                    "done",
                    {
                        "task_id": task_id,
                        "conversation_id": conversation_id,
                        "status": "waiting_for_password",
                        "final_response": "Waiting for sudo password.",
                        "reasoning_content": None,
                    },
                )
                return
            # Another approval-gated action pauses the task again.
            if any(observation.get("requires_approval") for observation in tool_observations):
                await task_store.waiting_for_approval(task_id)
                await event_store.append(
                    task_id,
                    "task_waiting_for_approval",
                    {"observations": tool_observations},
                )
                if conversation_id:
                    await conversations.add_message(
                        conversation_id,
                        "assistant",
                        "Waiting for approval.",
                        task_id=task_id,
                        status="waiting_for_approval",
                    )
                yield sse(
                    "done",
                    {
                        "task_id": task_id,
                        "conversation_id": conversation_id,
                        "status": "waiting_for_approval",
                        "final_response": "Waiting for approval.",
                        "reasoning_content": None,
                    },
                )
                return
            # Feed tool results back to the model as an extra user message.
            messages = [
                *messages,
                {
                    "role": "user",
                    "content": "tool_observations:\n"
                    + json.dumps(tool_observations, ensure_ascii=False, indent=2),
                },
            ]
            await event_store.append(task_id, "model_call_started", {"role": "thinker"})
            # Relay model output chunk by chunk; chunk types other than the two below are ignored.
            async for chunk in model_client.stream_chat("thinker", messages):
                delta = str(chunk.get("delta") or "")
                if chunk.get("type") == "reasoning_delta":
                    reasoning_parts.append(delta)
                    yield sse("reasoning_delta", {"task_id": task_id, "delta": delta})
                elif chunk.get("type") == "content_delta":
                    content_parts.append(delta)
                    yield sse("content_delta", {"task_id": task_id, "delta": delta})
            content = "".join(content_parts)
            # Empty reasoning is stored as None rather than "".
            reasoning_content = "".join(reasoning_parts) or None
            await event_store.append(
                task_id,
                "cognition_response",
                {
                    "role": "thinker",
                    "content": content,
                    "reasoning_content": reasoning_content,
                },
            )
            await event_store.append(
                task_id,
                "model_call_finished",
                {
                    "role": "thinker",
                    "model": model_client.get_role_config("thinker").model,
                },
            )
            await task_store.complete_task(task_id, content)
            if conversation_id:
                await conversations.add_message(
                    conversation_id,
                    "assistant",
                    content,
                    reasoning_content=reasoning_content,
                    task_id=task_id,
                    status="completed",
                )
            await event_store.append(
                task_id,
                "task_completed",
                {
                    "final_response": content,
                    "reasoning_content": reasoning_content,
                },
            )
            yield sse(
                "done",
                {
                    "task_id": task_id,
                    "conversation_id": conversation_id,
                    "status": "completed",
                    "final_response": content,
                    "reasoning_content": reasoning_content,
                },
            )
        except asyncio.CancelledError:
            # Cancellation is recorded as a client disconnect, then re-raised.
            await task_store.cancel_task(task_id)
            await event_store.append(
                task_id, "task_cancelled", {"reason": "client_disconnected"}
            )
            raise
        except Exception as exc:
            # Any other failure marks the task failed and is reported to the client as an SSE frame.
            await task_store.fail_task(task_id, str(exc))
            await event_store.append(task_id, "task_failed", {"error": str(exc)})
            yield sse(
                "error",
                {
                    "task_id": task_id,
                    "status": "failed",
                    "error": str(exc),
                },
            )

    return StreamingResponse(generator(), media_type="text/event-stream")
def has_password_request(observations: list[dict[str, Any]]) -> bool:
    """Return True when any observation's result metadata asks for a password.

    An observation counts when ``observation["result"]["metadata"]`` has a truthy
    ``requires_password`` entry. Observations whose ``result`` or ``metadata`` is
    missing, ``None`` or empty are treated as not requesting a password instead
    of raising ``AttributeError``.

    Args:
        observations: Tool observation dicts produced by the runtime.

    Returns:
        True if at least one observation requests a password, else False.
    """
    for observation in observations:
        # ``or {}`` also covers keys that are present but explicitly None,
        # which a ``.get(key, {})`` default does not.
        result = observation.get("result") or {}
        metadata = result.get("metadata") or {}
        if metadata.get("requires_password"):
            return True
    return False
def redacted_action(action: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of ``action`` with ``args["_password"]`` removed.

    The copy is made through a JSON round trip, so the input is left untouched.
    Only a dict-valued top-level ``args`` entry is inspected.
    """
    clone = json.loads(json.dumps(action))
    arguments = clone.get("args")
    if isinstance(arguments, dict) and "_password" in arguments:
        del arguments["_password"]
    return clone
@app.post("/v1/tasks/{task_id}/password/stream")
async def password_task_stream(task_id: str, body: PasswordRequest) -> StreamingResponse:
    """Re-run an approved action with a supplied password and stream the outcome as SSE.

    The stream ends with a ``done`` frame whose status is ``completed``,
    ``waiting_for_password`` (the password was still required) or
    ``waiting_for_approval`` (a follow-up action needs approval), or with an
    ``error`` frame if an exception is raised.

    Raises:
        HTTPException: 404 when the task is unknown or the approval does not
            belong to it; 409 when the approval has no decision yet.
    """
    task = await task_store.get_task(task_id)
    if task is None:
        raise HTTPException(status_code=404, detail="Task not found")
    approval = await approvals.get(body.approval_id)
    if approval is None or approval.task_id != task_id:
        raise HTTPException(status_code=404, detail="Approval not found for task")
    if approval.decision is None:
        raise HTTPException(status_code=409, detail="Approval is still pending")
    # May be falsy; the conversation writes below are guarded by ``if conversation_id``.
    conversation_id = await conversations.get_conversation_id_for_task(task_id)

    async def generator():
        # Accumulate streamed chunks so the full text can be persisted at the end.
        reasoning_parts: list[str] = []
        content_parts: list[str] = []
        try:
            await task_store.update_status(task_id, "running")
            # The logged event identifies the approval only; the password is not stored.
            continued_event = await event_store.append(
                task_id,
                "task_password_submitted",
                {"approval_id": body.approval_id},
            )
            tool_observation = await runtime._run_approved_or_denied_action(
                task_id,
                approval.normalized_action,
                approval.decision,
                password=body.password,
            )
            # Messages are rebuilt from the task alone (no history / memory arguments here).
            messages = runtime.context_builder.build_basic_messages(task)
            tool_observations = [tool_observation]
            # Keep running further actions unless the password is still being requested.
            if not has_password_request(tool_observations):
                tool_observations = await runtime._run_action_loop(
                    task_id,
                    messages,
                    task.workspace,
                    initial_observations=tool_observations,
                )
            # Forward tool events recorded after the task_password_submitted event.
            async for tool_event in emit_tool_events(task_id, continued_event.sequence):
                yield tool_event
            # A repeated password prompt pauses the task; the payload carries a redacted action.
            if has_password_request(tool_observations):
                await task_store.update_status(task_id, "waiting_for_password")
                password_event = await event_store.append(
                    task_id,
                    "tool_password_requested",
                    {
                        "approval_id": body.approval_id,
                        "tool": approval.normalized_action.get("tool"),
                        "action": redacted_action(approval.normalized_action),
                        "reason": "Sudo password is required to continue.",
                    },
                )
                yield sse("tool_password_requested", password_event.model_dump())
                yield sse(
                    "done",
                    {
                        "task_id": task_id,
                        "conversation_id": conversation_id,
                        "status": "waiting_for_password",
                        "final_response": "Waiting for sudo password.",
                        "reasoning_content": None,
                    },
                )
                return
            # The follow-up action loop may have reached another approval-gated
            # action. Pause the task (as the continue stream does) instead of
            # finalising it while that approval is still outstanding.
            if any(observation.get("requires_approval") for observation in tool_observations):
                await task_store.waiting_for_approval(task_id)
                await event_store.append(
                    task_id,
                    "task_waiting_for_approval",
                    {"observations": tool_observations},
                )
                if conversation_id:
                    await conversations.add_message(
                        conversation_id,
                        "assistant",
                        "Waiting for approval.",
                        task_id=task_id,
                        status="waiting_for_approval",
                    )
                yield sse(
                    "done",
                    {
                        "task_id": task_id,
                        "conversation_id": conversation_id,
                        "status": "waiting_for_approval",
                        "final_response": "Waiting for approval.",
                        "reasoning_content": None,
                    },
                )
                return
            # Feed tool results back to the model as an extra user message.
            messages = [
                *messages,
                {
                    "role": "user",
                    "content": "tool_observations:\n"
                    + json.dumps(tool_observations, ensure_ascii=False, indent=2),
                },
            ]
            await event_store.append(task_id, "model_call_started", {"role": "thinker"})
            # Relay model output chunk by chunk; chunk types other than the two below are ignored.
            async for chunk in model_client.stream_chat("thinker", messages):
                delta = str(chunk.get("delta") or "")
                if chunk.get("type") == "reasoning_delta":
                    reasoning_parts.append(delta)
                    yield sse("reasoning_delta", {"task_id": task_id, "delta": delta})
                elif chunk.get("type") == "content_delta":
                    content_parts.append(delta)
                    yield sse("content_delta", {"task_id": task_id, "delta": delta})
            content = "".join(content_parts)
            # Empty reasoning is stored as None rather than "".
            reasoning_content = "".join(reasoning_parts) or None
            await event_store.append(
                task_id,
                "cognition_response",
                {
                    "role": "thinker",
                    "content": content,
                    "reasoning_content": reasoning_content,
                },
            )
            await event_store.append(
                task_id,
                "model_call_finished",
                {
                    "role": "thinker",
                    "model": model_client.get_role_config("thinker").model,
                },
            )
            await task_store.complete_task(task_id, content)
            if conversation_id:
                await conversations.add_message(
                    conversation_id,
                    "assistant",
                    content,
                    reasoning_content=reasoning_content,
                    task_id=task_id,
                    status="completed",
                )
            await event_store.append(
                task_id,
                "task_completed",
                {
                    "final_response": content,
                    "reasoning_content": reasoning_content,
                },
            )
            yield sse(
                "done",
                {
                    "task_id": task_id,
                    "conversation_id": conversation_id,
                    "status": "completed",
                    "final_response": content,
                    "reasoning_content": reasoning_content,
                },
            )
        except asyncio.CancelledError:
            # Cancellation is recorded as a client disconnect, then re-raised.
            await task_store.cancel_task(task_id)
            await event_store.append(
                task_id, "task_cancelled", {"reason": "client_disconnected"}
            )
            raise
        except Exception as exc:
            # Any other failure marks the task failed and is reported to the client as an SSE frame.
            await task_store.fail_task(task_id, str(exc))
            await event_store.append(task_id, "task_failed", {"error": str(exc)})
            yield sse(
                "error",
                {
                    "task_id": task_id,
                    "status": "failed",
                    "error": str(exc),
                },
            )

    return StreamingResponse(generator(), media_type="text/event-stream")
@app.post("/v1/tasks/{task_id}/cancel")
async def cancel_task(task_id: str) -> dict[str, str]:
    """Cancel a task and log a ``task_cancelled`` event.

    Raises:
        HTTPException: 404 when no task has the given id.
    """
    # Check existence first, as the other task endpoints do, so an unknown id
    # is not reported as cancelled and does not get an orphan event.
    task = await task_store.get_task(task_id)
    if task is None:
        raise HTTPException(status_code=404, detail="Task not found")
    await task_store.cancel_task(task_id)
    await event_store.append(task_id, "task_cancelled", {})
    return {"status": "cancelled"}
@app.get("/v1/approvals/pending")
async def pending_approvals() -> list[dict[str, Any]]:
    """Return the approvals reported as pending, serialised with ``model_dump()``."""
    waiting = await approvals.pending()
    return [item.model_dump() for item in waiting]
@app.post("/v1/approvals/{approval_id}/allow_once")
async def allow_once(approval_id: str) -> dict[str, str]:
    """Call ``approvals.allow_once`` for the approval and report ``allowed_once``."""
    outcome = "allowed_once"
    await approvals.allow_once(approval_id)
    return {"status": outcome}
@app.post("/v1/approvals/{approval_id}/allow_forever")
async def allow_forever(approval_id: str) -> dict[str, str]:
    """Call ``approvals.allow_forever`` for the approval and report ``allowed_forever``."""
    outcome = "allowed_forever"
    await approvals.allow_forever(approval_id)
    return {"status": outcome}
@app.post("/v1/approvals/{approval_id}/deny")
async def deny(approval_id: str) -> dict[str, str]:
    """Call ``approvals.deny`` for the approval and report ``denied``."""
    outcome = "denied"
    await approvals.deny(approval_id)
    return {"status": outcome}
@app.get("/v1/skills")
async def list_skills() -> list[dict[str, Any]]:
    """Return the skills produced by ``skills.load_skills()``, serialised with ``model_dump()``."""
    serialised: list[dict[str, Any]] = []
    for entry in skills.load_skills():
        serialised.append(entry.model_dump())
    return serialised
@app.get("/v1/skills/{skill_id}")
async def get_skill(skill_id: str) -> dict[str, Any]:
    """Return a single skill.

    Raises:
        HTTPException: 404 when the registry has no skill with this id.
    """
    found = skills.get_skill(skill_id)
    if found is not None:
        return found.model_dump()
    raise HTTPException(status_code=404, detail="Skill not found")
@app.get("/v1/experience")
async def list_experience() -> list[dict[str, Any]]:
    """Return the recorder's experience records, serialised with ``model_dump()``."""
    stored = await experience.list_records()
    return [item.model_dump() for item in stored]
@app.get("/v1/experience/{record_id}")
async def get_experience(record_id: int) -> dict[str, Any]:
    """Return a single experience record.

    Raises:
        HTTPException: 404 when no record has the given id.
    """
    found = await experience.get_record(record_id)
    if found is not None:
        return found.model_dump()
    raise HTTPException(status_code=404, detail="Experience record not found")
@app.post("/v1/memory")
async def add_memory(body: MemoryRequest) -> dict[str, Any]:
    """Store a memory record and return it.

    The workspace defaults to ``settings.workspace``; when no scope is given,
    ``memory_store.infer_scope`` chooses one.

    Raises:
        HTTPException: 400 when the text is empty or whitespace-only.
    """
    if not body.text.strip():
        raise HTTPException(status_code=400, detail="Memory text is required")

    workspace = body.workspace or settings.workspace
    scope = body.scope
    if not scope:
        scope = memory_store.infer_scope(body.text, workspace, body.conversation_id)

    stored = await memory_store.add(
        text=body.text,
        workspace=workspace,
        scope=scope,
        conversation_id=body.conversation_id,
        memory_type=body.memory_type,
        importance=body.importance,
        metadata=body.metadata,
    )
    return stored.model_dump()
@app.get("/v1/memory")
async def list_memory(
    workspace: str | None = None, limit: int = 50
) -> dict[str, Any]:
    """List stored memory records, optionally filtered by workspace.

    Args:
        workspace: Workspace filter passed through to the store (``None`` is
            passed through unchanged).
        limit: Maximum number of records; clamped to the range 1..500.

    Returns:
        ``{"results": [...]}`` with each record serialised via ``model_dump()``.
    """
    # Clamp the client-supplied limit the same way the command audit endpoint
    # does, so zero, negative or huge values never reach the store.
    bounded_limit = min(max(limit, 1), 500)
    records = await memory_store.list(workspace=workspace, limit=bounded_limit)
    return {"results": [record.model_dump() for record in records]}
@app.get("/v1/memory/search")
async def search_memory(
    q: str, workspace: str | None = None, limit: int = 20
) -> dict[str, Any]:
    """Search memory, preferring the local store and falling back to vector memory.

    Args:
        q: Search query.
        workspace: Workspace filter for the local store search.
        limit: Maximum number of local results; clamped to the range 1..500.

    Returns:
        ``{"results": [...]}``. When the vector fallback raises
        ``EmbeddingsUnavailableError`` the results are empty and a ``warning``
        entry carries the error text.
    """
    # Clamp the client-supplied limit the same way the command audit endpoint
    # does, so zero, negative or huge values never reach the store.
    bounded_limit = min(max(limit, 1), 500)
    local_results = await memory_store.search(q, workspace=workspace, limit=bounded_limit)
    if local_results:
        return {"results": [record.model_dump() for record in local_results]}
    # No local hits: fall back to vector search (called with the query only).
    try:
        return {"results": await memory.search_memory(q)}
    except EmbeddingsUnavailableError as exc:
        return {"results": [], "warning": str(exc)}
return app
# Module-level application object; the "duck_core.api:app" import string below names it.
app = create_app()
if __name__ == "__main__":
    # Direct execution: serve on the configured host and port with auto-reload disabled.
    settings = get_settings()
    uvicorn.run("duck_core.api:app", host=settings.api_host, port=settings.api_port, reload=False)