# ducklm/duck_core/runtime_loop.py
# (source listing metadata: 459 lines, 17 KiB, Python)

import json
from dataclasses import dataclass
from typing import Any
from duck_core.approvals.service import ApprovalService
from duck_core.context_builder import ContextBuilder
from duck_core.events.store import EventStore
from duck_core.model_client import ModelClient
from duck_core.tasks.store import TaskStore
from duck_core.tools.gateway import ToolGateway
from duck_core.tools.base import ToolResult
@dataclass
class ChatResult:
task_id: str
status: str
final_response: str
reasoning_content: str | None = None
class RuntimeLoop:
    """Run chat tasks through a tool-using action loop and a final thinker call.

    Flow of a task:

    1. The ``"action"`` model role is asked (at most ``max_tool_iterations``
       times) for a JSON directive of the form ``{"actions": [...]}``; each
       action is executed through ``ToolGateway`` and recorded as an
       observation.
    2. If an action requires approval, the task is moved to
       ``waiting_for_approval`` and can later be resumed with
       ``continue_after_approval``.
    3. Otherwise the ``"thinker"`` role receives the conversation plus all
       tool observations, and its reply completes the task.

    Each step is appended to ``event_store`` as an event.
    """

    def __init__(
        self,
        task_store: TaskStore,
        event_store: EventStore,
        model_client: ModelClient | None = None,
        context_builder: ContextBuilder | None = None,
        approval_service: ApprovalService | None = None,
        max_tool_iterations: int = 4,
    ):
        """Store collaborators, creating a default ModelClient/ContextBuilder when omitted.

        Args:
            task_store: Creates tasks and records their status transitions.
            event_store: Event log; this class appends to it and lists it.
            model_client: Client used for the ``"action"`` and ``"thinker"`` roles.
            context_builder: Builds the base message list for a task.
            approval_service: Optional; without it, approval requests cannot
                be resolved via ``continue_after_approval``.
            max_tool_iterations: Maximum number of action-model rounds per run.
        """
        self.task_store = task_store
        self.event_store = event_store
        self.model_client = model_client or ModelClient()
        self.context_builder = context_builder or ContextBuilder()
        self.approval_service = approval_service
        self.max_tool_iterations = max_tool_iterations

    async def run_chat(
        self,
        message: str,
        workspace: str | None = None,
        debug: bool = False,
        history_messages: list[dict[str, str]] | None = None,
    ) -> ChatResult:
        """Create a task for ``message`` and run it until completion or an approval pause.

        Args:
            message: The user's message.
            workspace: Workspace path handed to ``ToolGateway.default`` for
                tool calls (``"."`` when falsy).
            debug: Passed to ``create_task`` and recorded in ``task_created``.
            history_messages: Prior turns forwarded to the context builder.

        Returns:
            A ``ChatResult`` with status ``"completed"``,
            ``"waiting_for_approval"`` or ``"failed"``. Exceptions raised after
            the task is created are recorded on the task, not propagated.
        """
        task = await self.task_store.create_task(message, workspace, debug)
        await self.event_store.append(
            task.task_id,
            "task_created",
            {"message": message, "workspace": workspace, "debug": debug},
        )
        try:
            messages = self.context_builder.build_basic_messages(task, history_messages)
            tool_observations = await self._run_action_loop(task.task_id, messages, workspace)
            if self._needs_approval(tool_observations):
                return await self._pause_for_approval(task.task_id, tool_observations)
            if tool_observations:
                messages = self._with_observations(messages, tool_observations)
            return await self._finish_with_thinker(task.task_id, messages)
        except Exception as exc:
            # Top-level boundary: turn any failure into a failed task result.
            return await self._record_failure(task.task_id, exc)

    async def continue_after_approval(self, task_id: str, approval_id: str) -> ChatResult:
        """Resume a task paused for approval once ``approval_id`` has a decision.

        A denied action is reported to the thinker as a failed tool result and
        the action model is not consulted again. An approved action is executed
        and the action loop then continues from its observation.

        Returns:
            ``"failed"`` when no approval service is configured, the task is
            unknown, or the approval does not belong to the task;
            ``"waiting_for_approval"`` while the approval is undecided or when
            a further approval is needed; otherwise the thinker's result.
        """
        if self.approval_service is None:
            return ChatResult(
                task_id=task_id,
                status="failed",
                final_response="Approval service is not configured.",
                reasoning_content=None,
            )
        task = await self.task_store.get_task(task_id)
        approval = await self.approval_service.get(approval_id)
        if task is None:
            return ChatResult(
                task_id=task_id,
                status="failed",
                final_response="Task not found.",
                reasoning_content=None,
            )
        if approval is None or approval.task_id != task_id:
            return ChatResult(
                task_id=task_id,
                status="failed",
                final_response="Approval not found for task.",
                reasoning_content=None,
            )
        if approval.decision is None:
            return ChatResult(
                task_id=task_id,
                status="waiting_for_approval",
                final_response="Waiting for approval.",
                reasoning_content=None,
            )
        # NOTE(review): the task's current status is not checked here, so a
        # second call with the same decided approval re-runs the action.
        await self.task_store.update_status(task_id, "running")
        await self.event_store.append(
            task_id,
            "task_continued",
            {"approval_id": approval_id, "decision": approval.decision},
        )
        try:
            tool_observation = await self._run_approved_or_denied_action(
                task_id, approval.normalized_action, approval.decision
            )
            messages = self.context_builder.build_basic_messages(task)
            tool_observations = [tool_observation]
            if approval.decision != "deny":
                tool_observations = await self._run_action_loop(
                    task_id,
                    messages,
                    task.workspace,
                    initial_observations=tool_observations,
                )
            if self._needs_approval(tool_observations):
                return await self._pause_for_approval(task_id, tool_observations)
            messages = self._with_observations(messages, tool_observations)
            return await self._finish_with_thinker(task_id, messages)
        except Exception as exc:
            return await self._record_failure(task_id, exc)

    @staticmethod
    def _needs_approval(observations: list[dict[str, Any]]) -> bool:
        """Return True if any observation is flagged ``requires_approval``."""
        return any(observation.get("requires_approval") for observation in observations)

    @staticmethod
    def _with_observations(
        messages: list[dict[str, str]], observations: list[dict[str, Any]]
    ) -> list[dict[str, str]]:
        """Return a new message list with ``observations`` appended as a user message."""
        return [
            *messages,
            {
                "role": "user",
                "content": "tool_observations:\n"
                + json.dumps(observations, ensure_ascii=False, indent=2),
            },
        ]

    async def _pause_for_approval(
        self, task_id: str, observations: list[dict[str, Any]]
    ) -> ChatResult:
        """Move the task to ``waiting_for_approval`` and log the pending observations."""
        await self.task_store.waiting_for_approval(task_id)
        await self.event_store.append(
            task_id,
            "task_waiting_for_approval",
            {"observations": observations},
        )
        return ChatResult(
            task_id=task_id,
            status="waiting_for_approval",
            final_response="Waiting for approval.",
            reasoning_content=None,
        )

    async def _finish_with_thinker(
        self, task_id: str, messages: list[dict[str, str]]
    ) -> ChatResult:
        """Ask the thinker role for the final answer, log it, and complete the task."""
        await self.event_store.append(task_id, "model_call_started", {"role": "thinker"})
        response = await self.model_client.chat("thinker", messages)
        await self.event_store.append(
            task_id,
            "cognition_response",
            {
                "role": response.role,
                "content": response.content,
                "reasoning_content": response.reasoning_content,
            },
        )
        await self.event_store.append(
            task_id,
            "model_call_finished",
            {
                "role": response.role,
                "model": response.model,
                "latency_ms": response.latency_ms,
                "prompt_tokens": response.prompt_tokens,
                "completion_tokens": response.completion_tokens,
                "total_tokens": response.total_tokens,
            },
        )
        await self.task_store.complete_task(task_id, response.content)
        await self.event_store.append(
            task_id,
            "task_completed",
            {
                "final_response": response.content,
                "reasoning_content": response.reasoning_content,
            },
        )
        return ChatResult(
            task_id=task_id,
            status="completed",
            final_response=response.content,
            reasoning_content=response.reasoning_content,
        )

    async def _record_failure(self, task_id: str, exc: Exception) -> ChatResult:
        """Mark the task failed with ``str(exc)`` and return a failed ``ChatResult``."""
        await self.task_store.fail_task(task_id, str(exc))
        await self.event_store.append(task_id, "task_failed", {"error": str(exc)})
        return ChatResult(
            task_id=task_id,
            status="failed",
            final_response=str(exc),
            reasoning_content=None,
        )

    async def _run_approved_or_denied_action(
        self,
        task_id: str,
        action: dict[str, Any],
        decision: str,
        password: str | None = None,
    ) -> dict[str, Any]:
        """Execute (or refuse) the action an approval was requested for.

        The observation reuses the index recorded in the matching
        ``tool_approval_requested`` event.

        Args:
            task_id: Task the approval belongs to.
            action: The action dict that required approval.
            decision: ``"deny"`` refuses the action; any other value runs it
                with ``approved=True``.
            password: Forwarded to ``gateway.run_action`` for approved actions.

        Returns:
            An observation dict with ``index``, ``tool``, ``reason``,
            ``decision`` and ``result`` keys.

        Raises:
            LookupError: if the action is approved but the task does not exist.
        """
        tool_name = str(action.get("tool", ""))
        index = await self._approval_action_index(task_id, action)
        if decision == "deny":
            result = ToolResult(
                ok=False,
                error="Tool action denied by user.",
                metadata={"decision": "deny"},
            )
        else:
            task = await self.task_store.get_task(task_id)
            if task is None:
                raise LookupError(f"Task {task_id} not found.")
            gateway = ToolGateway.default(task.workspace or ".")
            result = await gateway.run_action(action, approved=True, password=password)
        result_payload = result.model_dump()
        await self._append_command_audit(
            task_id, index, tool_name, action, result_payload, approved=decision != "deny"
        )
        await self.event_store.append(
            task_id,
            "tool_call_finished",
            {"index": index, "tool": tool_name, "result": result_payload},
        )
        return {
            "index": index,
            "tool": tool_name,
            "reason": action.get("reason"),
            "decision": decision,
            "result": result_payload,
        }

    async def _approval_action_index(self, task_id: str, action: dict[str, Any]) -> int:
        """Return the index of the latest ``tool_approval_requested`` event for ``action``.

        Falls back to 1 when no matching event exists or its index is falsy.
        """
        events = await self.event_store.list_events(task_id)
        for event in reversed(events):
            if (
                event.event_type == "tool_approval_requested"
                and event.payload.get("action") == action
            ):
                return int(event.payload.get("index") or 1)
        return 1

    async def _run_action_tools(
        self,
        task_id: str,
        messages: list[dict[str, str]],
        workspace: str | None,
        start_index: int = 1,
    ) -> list[dict[str, Any]]:
        """Run one round of the action model and execute the tool actions it requests.

        The action model's reply is parsed as JSON and must be an object with an
        ``"actions"`` list. Execution stops at the first action that requires
        approval; that action's observation carries ``requires_approval=True``.

        Args:
            task_id: Task the events are recorded under.
            messages: Conversation sent to the ``"action"`` role.
            workspace: Workspace for ``ToolGateway.default`` (``"."`` when falsy).
            start_index: Index assigned to the first action of this round.

        Returns:
            One observation per executed/blocked action, or ``[]`` when the
            model call fails, the reply is not a JSON object, or it requests
            no actions.
        """
        try:
            await self.event_store.append(task_id, "model_call_started", {"role": "action"})
            response = await self.model_client.chat("action", messages)
            directive = json.loads(response.content)
        except Exception as exc:
            # Best effort: a failed or unparsable action call means "no tools this round".
            await self.event_store.append(
                task_id,
                "action_directive_failed",
                {"error": str(exc)},
            )
            return []
        # json.loads may return a list/str/number; only an object can carry "actions".
        if not isinstance(directive, dict):
            await self.event_store.append(
                task_id,
                "action_directive_failed",
                {
                    "error": "Action directive must be a JSON object, got "
                    f"{type(directive).__name__}"
                },
            )
            return []
        await self.event_store.append(task_id, "action_directive", directive)
        actions = directive.get("actions") or []
        if not isinstance(actions, list) or not actions:
            return []
        gateway = ToolGateway.default(workspace or ".")
        observations: list[dict[str, Any]] = []
        for index, action in enumerate(actions, start=start_index):
            if not isinstance(action, dict):
                observations.append(
                    {"index": index, "ok": False, "error": "Action must be an object"}
                )
                continue
            tool_name = str(action.get("tool", ""))
            await self.event_store.append(
                task_id,
                "tool_call_started",
                {"index": index, "tool": tool_name, "args": action.get("args") or {}},
            )
            # Actions the user previously allowed "forever" run without a new prompt.
            approved_forever = (
                self.approval_service is not None
                and await self.approval_service.is_allowed_forever(action)
            )
            result = await gateway.run_action(action, approved=approved_forever)
            result_payload = result.model_dump()
            await self._append_command_audit(
                task_id, index, tool_name, action, result_payload, approved=approved_forever
            )
            if result.metadata.get("requires_approval"):
                approval = None
                if self.approval_service is not None:
                    approval = await self.approval_service.create_pending(task_id, action)
                await self.event_store.append(
                    task_id,
                    "tool_approval_requested",
                    {
                        "index": index,
                        "tool": tool_name,
                        "action": action,
                        "approval_id": approval.approval_id if approval else None,
                        "reason": result.error,
                    },
                )
                observations.append(
                    {
                        "index": index,
                        "tool": tool_name,
                        "reason": action.get("reason"),
                        "requires_approval": True,
                        "approval_id": approval.approval_id if approval else None,
                        "result": result_payload,
                    }
                )
                # Later actions may depend on this one, so stop here until approved.
                break
            await self.event_store.append(
                task_id,
                "tool_call_finished",
                {"index": index, "tool": tool_name, "result": result_payload},
            )
            observations.append(
                {
                    "index": index,
                    "tool": tool_name,
                    "reason": action.get("reason"),
                    "result": result_payload,
                }
            )
        return observations

    async def _append_command_audit(
        self,
        task_id: str,
        index: int,
        tool_name: str,
        action: dict[str, Any],
        result_payload: dict[str, Any],
        approved: bool,
    ) -> None:
        """Append a ``command_audit`` event for ``shell_exec_safe`` calls; other tools are skipped.

        The command is taken from the result metadata, falling back to the
        action's ``args["command"]``.
        """
        if tool_name != "shell_exec_safe":
            return
        metadata = result_payload.get("metadata") or {}
        command = metadata.get("command") or (action.get("args") or {}).get("command")
        await self.event_store.append(
            task_id,
            "command_audit",
            {
                "index": index,
                "tool": tool_name,
                "command": command,
                "action_type": metadata.get("action_type"),
                "risk_level": metadata.get("risk_level"),
                "requires_approval": bool(metadata.get("requires_approval")),
                "blocked": bool(metadata.get("blocked")),
                "approved": approved,
                "ok": bool(result_payload.get("ok")),
                "returncode": metadata.get("returncode"),
                "reason": metadata.get("reason") or result_payload.get("error"),
            },
        )

    @staticmethod
    def _next_action_index(observations: list[dict[str, Any]]) -> int:
        """Return an index strictly after every index already used in ``observations``.

        Needed when resuming after an approval: the initial observation keeps
        its original (possibly > 1) index, so counting observations would
        reuse indices already present in the task's event log.
        """
        highest = len(observations)
        for observation in observations:
            index = observation.get("index")
            if isinstance(index, int) and index > highest:
                highest = index
        return highest + 1

    async def _run_action_loop(
        self,
        task_id: str,
        messages: list[dict[str, str]],
        workspace: str | None,
        initial_observations: list[dict[str, Any]] | None = None,
    ) -> list[dict[str, Any]]:
        """Repeat action rounds until no actions are requested, approval is needed, or the limit hits.

        Each round sees ``messages`` plus every observation gathered so far.

        Args:
            task_id: Task the events are recorded under.
            messages: Base conversation for the action model.
            workspace: Workspace for tool execution.
            initial_observations: Observations to seed the loop with (used
                when resuming after an approval).

        Returns:
            All observations, including ``initial_observations``. A
            ``tool_iteration_limit_reached`` event is logged if every round
            produced observations without needing approval.
        """
        all_observations = list(initial_observations or [])
        current_messages = (
            self._with_observations(messages, all_observations)
            if all_observations
            else messages
        )
        for _ in range(self.max_tool_iterations):
            observations = await self._run_action_tools(
                task_id,
                current_messages,
                workspace,
                start_index=self._next_action_index(all_observations),
            )
            if not observations:
                return all_observations
            all_observations.extend(observations)
            if self._needs_approval(observations):
                return all_observations
            current_messages = self._with_observations(messages, all_observations)
        await self.event_store.append(
            task_id,
            "tool_iteration_limit_reached",
            {"max_tool_iterations": self.max_tool_iterations},
        )
        return all_observations