import asyncio
import json
import logging
from pathlib import Path
from typing import Any

import uvicorn
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel

from duck_core.approvals.service import ApprovalService
from duck_core.config import get_settings
from duck_core.events.store import EventStore
from duck_core.experience.recorder import ExperienceRecorder
from duck_core.memory.vector_memory import EmbeddingsUnavailableError, VectorMemory
from duck_core.model_client import ModelClient
from duck_core.runtime_loop import RuntimeLoop
from duck_core.skills.registry import SkillRegistry
from duck_core.tasks.store import TaskStore

logger = logging.getLogger(__name__)

# Bind addresses that accept connections on every interface (IPv4 and IPv6).
_WILDCARD_HOSTS = frozenset({"0.0.0.0", "::"})

# Task event types that the streaming chat endpoint forwards to the client
# after the tool phase has run.
_VISIBLE_TOOL_EVENT_TYPES = frozenset(
    {
        "tool_call_started",
        "tool_call_finished",
        "tool_approval_requested",
    }
)


class ChatRequest(BaseModel):
    """Request body shared by the chat, chat-stream and task-creation endpoints."""

    message: str
    # Falls back to ``settings.workspace`` when omitted.
    workspace: str | None = None
    debug: bool = False


def create_app() -> FastAPI:
    """Build the DuckLM FastAPI application.

    Reads settings via ``get_settings()``, ensures the workspace directory and
    the database's parent directory exist, instantiates the task/event/approval/
    experience stores and the model client, runtime loop, skill registry and
    vector memory, mounts the static web assets, and registers the HTML pages
    plus the ``/v1`` JSON and server-sent-event endpoints.

    Returns:
        The configured ``FastAPI`` instance.
    """
    settings = get_settings()
    if settings.api_host in _WILDCARD_HOSTS:
        # Binding to a wildcard address makes the tool-executing endpoints
        # reachable from other machines.
        logger.warning(
            "DuckLM API is listening on %s. This may expose local tool execution endpoints.",
            settings.api_host,
        )
    Path(settings.workspace).mkdir(parents=True, exist_ok=True)
    Path(settings.db_path).parent.mkdir(parents=True, exist_ok=True)

    app = FastAPI(title="DuckLM", version="0.1.0")
    # NOTE(review): these directories are resolved relative to the current
    # working directory, so the server must be started from the project root.
    templates = Jinja2Templates(directory="duck_core/web/templates")
    app.mount("/static", StaticFiles(directory="duck_core/web/static"), name="static")

    task_store = TaskStore(settings.db_path)
    event_store = EventStore(settings.db_path)
    model_client = ModelClient()
    approvals = ApprovalService(settings.db_path)
    runtime = RuntimeLoop(task_store, event_store, model_client, approval_service=approvals)
    skills = SkillRegistry("skills")
    experience = ExperienceRecorder(settings.db_path)
    memory = VectorMemory(settings.qdrant_url, embeddings_base_url=None)

    @app.on_event("startup")
    async def startup() -> None:
        """Initialise the database-backed stores before serving requests."""
        await task_store.init()
        await event_store.init()
        await approvals.init()
        await experience.init()

    # ------------------------------------------------------------------ pages

    @app.get("/", response_class=HTMLResponse)
    async def index(request: Request) -> HTMLResponse:
        return templates.TemplateResponse(request, "index.html")

    @app.get("/approvals", response_class=HTMLResponse)
    async def approvals_page(request: Request) -> HTMLResponse:
        return templates.TemplateResponse(request, "approvals.html")

    @app.get("/skills", response_class=HTMLResponse)
    async def skills_page(request: Request) -> HTMLResponse:
        return templates.TemplateResponse(request, "skills.html")

    @app.get("/memory", response_class=HTMLResponse)
    async def memory_page(request: Request) -> HTMLResponse:
        return templates.TemplateResponse(request, "memory.html")

    @app.get("/experience", response_class=HTMLResponse)
    async def experience_page(request: Request) -> HTMLResponse:
        return templates.TemplateResponse(request, "experience.html")

    # ----------------------------------------------------------- status / models

    @app.get("/health")
    async def health() -> dict[str, str]:
        return {"status": "ok"}

    @app.get("/v1/status")
    async def status() -> dict[str, Any]:
        return {
            "name": "DuckLM",
            "version": "0.1.0",
            "api_host": settings.api_host,
            "api_port": settings.api_port,
            "workspace": settings.workspace,
            "db_path": settings.db_path,
        }

    @app.get("/v1/models/roles")
    async def roles() -> dict[str, Any]:
        return model_client.list_roles()

    @app.get("/v1/models/ping")
    async def models_ping() -> dict[str, Any]:
        return await model_client.ping()

    # ------------------------------------------------------------------- chat

    @app.post("/v1/chat")
    async def chat(body: ChatRequest) -> dict[str, Any]:
        """Run a full chat turn through the runtime loop and return its result."""
        result = await runtime.run_chat(body.message, body.workspace or settings.workspace, body.debug)
        return result.__dict__

    def sse(event: str, payload: dict[str, Any]) -> str:
        """Format one named server-sent event with a JSON ``data`` line."""
        return f"event: {event}\ndata: {json.dumps(payload, ensure_ascii=False)}\n\n"

    async def emit_tool_events(task_id: str, after_sequence: int):
        """Yield SSE frames for tool-related events recorded after ``after_sequence``."""
        events = await event_store.list_events(task_id)
        for event in events:
            if event.sequence > after_sequence and event.event_type in _VISIBLE_TOOL_EVENT_TYPES:
                # mode="json" converts non-JSON-native field values (e.g. datetimes)
                # so json.dumps inside sse() cannot fail mid-stream.
                yield sse(event.event_type, event.model_dump(mode="json"))

    @app.post("/v1/chat/stream")
    async def chat_stream(body: ChatRequest) -> StreamingResponse:
        """Stream a chat turn as server-sent events.

        Emits ``task_created``, any visible tool events, then either an early
        ``done`` (status ``waiting_for_approval``) when a tool observation
        requires approval, or ``reasoning_delta``/``content_delta`` chunks from
        the "thinker" model followed by ``done`` (status ``completed``). Any
        exception after task creation marks the task failed and emits ``error``.
        """
        workspace = body.workspace or settings.workspace

        async def generator():
            task = await task_store.create_task(body.message, workspace, body.debug)
            task_event = await event_store.append(
                task.task_id,
                "task_created",
                {
                    "message": body.message,
                    "workspace": workspace,
                    "debug": body.debug,
                },
            )
            yield sse("task_created", task_event.model_dump(mode="json"))

            reasoning_parts: list[str] = []
            content_parts: list[str] = []
            try:
                messages = runtime.context_builder.build_basic_messages(task)
                tool_observations = await runtime._run_action_tools(task.task_id, messages, workspace)
                async for tool_event in emit_tool_events(task.task_id, task_event.sequence):
                    yield tool_event

                if any(observation.get("requires_approval") for observation in tool_observations):
                    # Stop here; the task resumes once the approval is resolved.
                    await task_store.waiting_for_approval(task.task_id)
                    await event_store.append(
                        task.task_id,
                        "task_waiting_for_approval",
                        {"observations": tool_observations},
                    )
                    yield sse(
                        "done",
                        {
                            "task_id": task.task_id,
                            "status": "waiting_for_approval",
                            "final_response": "Waiting for approval.",
                            "reasoning_content": None,
                        },
                    )
                    return

                if tool_observations:
                    # Feed tool results back to the model as an extra user turn.
                    messages = [
                        *messages,
                        {
                            "role": "user",
                            "content": "tool_observations:\n"
                            + json.dumps(tool_observations, ensure_ascii=False, indent=2),
                        },
                    ]

                await event_store.append(task.task_id, "model_call_started", {"role": "thinker"})
                async for chunk in model_client.stream_chat("thinker", messages):
                    delta = str(chunk.get("delta") or "")
                    chunk_type = chunk.get("type")
                    if chunk_type == "reasoning_delta":
                        reasoning_parts.append(delta)
                        yield sse("reasoning_delta", {"task_id": task.task_id, "delta": delta})
                    elif chunk_type == "content_delta":
                        content_parts.append(delta)
                        yield sse("content_delta", {"task_id": task.task_id, "delta": delta})

                content = "".join(content_parts)
                reasoning_content = "".join(reasoning_parts) or None
                await event_store.append(
                    task.task_id,
                    "cognition_response",
                    {
                        "role": "thinker",
                        "content": content,
                        "reasoning_content": reasoning_content,
                    },
                )
                await event_store.append(
                    task.task_id,
                    "model_call_finished",
                    {
                        "role": "thinker",
                        "model": model_client.get_role_config("thinker").model,
                    },
                )
                await task_store.complete_task(task.task_id, content)
                await event_store.append(
                    task.task_id,
                    "task_completed",
                    {
                        "final_response": content,
                        "reasoning_content": reasoning_content,
                    },
                )
                yield sse(
                    "done",
                    {
                        "task_id": task.task_id,
                        "status": "completed",
                        "final_response": content,
                        "reasoning_content": reasoning_content,
                    },
                )
            except Exception as exc:
                # Boundary handler: record the failure server-side (with traceback)
                # and report it to the client instead of dropping the stream.
                logger.exception("Streaming chat task %s failed", task.task_id)
                await task_store.fail_task(task.task_id, str(exc))
                await event_store.append(task.task_id, "task_failed", {"error": str(exc)})
                yield sse(
                    "error",
                    {
                        "task_id": task.task_id,
                        "status": "failed",
                        "error": str(exc),
                    },
                )

        return StreamingResponse(generator(), media_type="text/event-stream")

    # ------------------------------------------------------------------ tasks

    @app.post("/v1/tasks")
    async def create_task(body: ChatRequest) -> dict[str, Any]:
        task = await task_store.create_task(body.message, body.workspace or settings.workspace, body.debug)
        await event_store.append(task.task_id, "task_created", body.model_dump())
        return task.model_dump()

    @app.get("/v1/tasks")
    async def list_tasks() -> list[dict[str, Any]]:
        return [task.model_dump() for task in await task_store.list_tasks()]

    @app.get("/v1/tasks/{task_id}")
    async def get_task(task_id: str) -> dict[str, Any]:
        task = await task_store.get_task(task_id)
        if task is None:
            raise HTTPException(status_code=404, detail="Task not found")
        return task.model_dump()

    @app.get("/v1/tasks/{task_id}/events")
    async def get_events(task_id: str) -> list[dict[str, Any]]:
        return [event.model_dump() for event in await event_store.list_events(task_id)]

    @app.get("/v1/tasks/{task_id}/stream")
    async def stream_events(task_id: str) -> StreamingResponse:
        """Poll the task's events once per second for ~30s, streaming new ones as SSE."""

        async def generator():
            sent = 0
            for _ in range(30):
                events = await event_store.list_events(task_id)
                for event in events[sent:]:
                    yield f"data: {json.dumps(event.model_dump(mode='json'))}\n\n"
                sent = len(events)
                await asyncio.sleep(1)

        return StreamingResponse(generator(), media_type="text/event-stream")

    @app.post("/v1/tasks/{task_id}/continue")
    async def continue_task(task_id: str) -> dict[str, str]:
        task = await task_store.get_task(task_id)
        if task is None:
            raise HTTPException(status_code=404, detail="Task not found")
        await task_store.update_status(task_id, "running")
        await event_store.append(task_id, "task_continued", {})
        return {"status": "running"}

    @app.post("/v1/tasks/{task_id}/cancel")
    async def cancel_task(task_id: str) -> dict[str, str]:
        # Same existence check as continue_task, so unknown ids get a 404
        # instead of a success response and an orphan "task_cancelled" event.
        task = await task_store.get_task(task_id)
        if task is None:
            raise HTTPException(status_code=404, detail="Task not found")
        await task_store.cancel_task(task_id)
        await event_store.append(task_id, "task_cancelled", {})
        return {"status": "cancelled"}

    # -------------------------------------------------------------- approvals

    @app.get("/v1/approvals/pending")
    async def pending_approvals() -> list[dict[str, Any]]:
        return [approval.model_dump() for approval in await approvals.pending()]

    @app.post("/v1/approvals/{approval_id}/allow_once")
    async def allow_once(approval_id: str) -> dict[str, str]:
        await approvals.allow_once(approval_id)
        return {"status": "allowed_once"}

    @app.post("/v1/approvals/{approval_id}/allow_forever")
    async def allow_forever(approval_id: str) -> dict[str, str]:
        await approvals.allow_forever(approval_id)
        return {"status": "allowed_forever"}

    @app.post("/v1/approvals/{approval_id}/deny")
    async def deny(approval_id: str) -> dict[str, str]:
        await approvals.deny(approval_id)
        return {"status": "denied"}

    # ----------------------------------------------------- skills / experience

    @app.get("/v1/skills")
    async def list_skills() -> list[dict[str, Any]]:
        return [skill.model_dump() for skill in skills.load_skills()]

    @app.get("/v1/skills/{skill_id}")
    async def get_skill(skill_id: str) -> dict[str, Any]:
        skill = skills.get_skill(skill_id)
        if skill is None:
            raise HTTPException(status_code=404, detail="Skill not found")
        return skill.model_dump()

    @app.get("/v1/experience")
    async def list_experience() -> list[dict[str, Any]]:
        return [record.model_dump() for record in await experience.list_records()]

    @app.get("/v1/experience/{record_id}")
    async def get_experience(record_id: int) -> dict[str, Any]:
        record = await experience.get_record(record_id)
        if record is None:
            raise HTTPException(status_code=404, detail="Experience record not found")
        return record.model_dump()

    # ----------------------------------------------------------------- memory

    @app.get("/v1/memory/search")
    async def search_memory(q: str) -> dict[str, Any]:
        """Search vector memory; degrade to empty results with a warning if embeddings are unavailable."""
        try:
            return {"results": await memory.search_memory(q)}
        except EmbeddingsUnavailableError as exc:
            return {"results": [], "warning": str(exc)}

    return app


app = create_app()


if __name__ == "__main__":
    settings = get_settings()
    uvicorn.run("duck_core.api:app", host=settings.api_host, port=settings.api_port, reload=False)