import json import logging from dataclasses import dataclass from typing import Any from duck_core.approvals.service import ApprovalService from duck_core.context_builder import ContextBuilder from duck_core.events.store import EventStore from duck_core.experience.recorder import ExperienceRecorder from duck_core.memory.policy import MemoryPolicy from duck_core.memory.store import MemoryStore from duck_core.memory.vector_memory import VectorMemory from duck_core.model_client import ModelClient, ReasoningMode from duck_core.reflection import Reflection from duck_core.tasks.store import TaskStore from duck_core.tools.base import ToolResult from duck_core.tools.gateway import ToolGateway logger = logging.getLogger(__name__) @dataclass class ChatResult: task_id: str status: str final_response: str reasoning_content: str | None = None class RuntimeLoop: def __init__( self, task_store: TaskStore, event_store: EventStore, model_client: ModelClient | None = None, context_builder: ContextBuilder | None = None, approval_service: ApprovalService | None = None, memory_policy: MemoryPolicy | None = None, memory_store: MemoryStore | None = None, vector_memory: VectorMemory | None = None, experience_recorder: ExperienceRecorder | None = None, max_tool_iterations: int = 4, ): self.task_store = task_store self.event_store = event_store self.model_client = model_client or ModelClient() self.context_builder = context_builder or ContextBuilder( model_client=self.model_client ) self.approval_service = approval_service self.memory_policy = memory_policy or MemoryPolicy(model_client=self.model_client) self.memory_store = memory_store self.vector_memory = vector_memory self.experience_recorder = experience_recorder self.max_tool_iterations = max_tool_iterations async def run_chat( self, message: str, workspace: str | None = None, debug: bool = False, history_messages: list[dict[str, str]] | None = None, memory_records: list[dict[str, str]] | None = None, skill_summary: str | None = None, reflect: bool = True, reasoning: ReasoningMode | None = None, ) -> ChatResult: task = await self.task_store.create_task(message, workspace, debug) await self.event_store.append( task.task_id, "task_created", {"message": message, "workspace": workspace, "debug": debug}, ) try: messages = await self.context_builder.build_async_messages( task, history_messages, memory_records, skill_summary=skill_summary ) tool_observations = await self._run_action_loop(task.task_id, messages, workspace) if any(observation.get("requires_approval") for observation in tool_observations): await self.task_store.waiting_for_approval(task.task_id) await self.event_store.append( task.task_id, "task_waiting_for_approval", {"observations": tool_observations}, ) return ChatResult( task_id=task.task_id, status="waiting_for_approval", final_response="Waiting for approval.", reasoning_content=None, ) if tool_observations: messages = [ *messages, { "role": "user", "content": "tool_observations:\n" + json.dumps(tool_observations, ensure_ascii=False, indent=2), }, ] await self.event_store.append( task.task_id, "model_call_started", {"role": "thinker"} ) if reasoning in {"on", "off"}: response = await self.model_client.chat("thinker", messages, reasoning=reasoning) else: response = await self.model_client.chat("thinker", messages) await self.event_store.append( task.task_id, "cognition_response", { "role": response.role, "content": response.content, "reasoning_content": response.reasoning_content, }, ) await self.event_store.append( task.task_id, "model_call_finished", { "role": response.role, "model": response.model, "latency_ms": response.latency_ms, "prompt_tokens": response.prompt_tokens, "completion_tokens": response.completion_tokens, "total_tokens": response.total_tokens, }, ) await self.task_store.complete_task(task.task_id, response.content) await self.event_store.append( task.task_id, "task_completed", { "final_response": response.content, "reasoning_content": response.reasoning_content, }, ) await self.complete_postprocessing(task.task_id, response.content, reflect=reflect) return ChatResult( task_id=task.task_id, status="completed", final_response=response.content, reasoning_content=response.reasoning_content, ) except Exception as exc: await self.task_store.fail_task(task.task_id, str(exc)) await self.event_store.append( task.task_id, "task_failed", {"error": str(exc)} ) return ChatResult( task_id=task.task_id, status="failed", final_response=str(exc), reasoning_content=None, ) async def complete_postprocessing( self, task_id: str, final_response: str, reflect: bool = True ) -> None: await self._run_memory_policy(task_id, final_response) if reflect: await self._run_reflection(task_id) async def continue_after_approval(self, task_id: str, approval_id: str) -> ChatResult: if self.approval_service is None: return ChatResult( task_id=task_id, status="failed", final_response="Approval service is not configured.", reasoning_content=None, ) task = await self.task_store.get_task(task_id) approval = await self.approval_service.get(approval_id) if task is None: return ChatResult( task_id=task_id, status="failed", final_response="Task not found.", reasoning_content=None, ) if approval is None or approval.task_id != task_id: return ChatResult( task_id=task_id, status="failed", final_response="Approval not found for task.", reasoning_content=None, ) if approval.decision is None: return ChatResult( task_id=task_id, status="waiting_for_approval", final_response="Waiting for approval.", reasoning_content=None, ) await self.task_store.update_status(task_id, "running") await self.event_store.append( task_id, "task_continued", {"approval_id": approval_id, "decision": approval.decision}, ) try: tool_observation = await self._run_approved_or_denied_action( task_id, approval.normalized_action, approval.decision ) messages = await self.context_builder.build_async_messages(task) tool_observations = [tool_observation] if approval.decision != "deny": tool_observations = await self._run_action_loop( task_id, messages, task.workspace, initial_observations=tool_observations, ) if any(observation.get("requires_approval") for observation in tool_observations): await self.task_store.waiting_for_approval(task_id) await self.event_store.append( task_id, "task_waiting_for_approval", {"observations": tool_observations}, ) return ChatResult( task_id=task_id, status="waiting_for_approval", final_response="Waiting for approval.", reasoning_content=None, ) messages = [ *messages, { "role": "user", "content": "tool_observations:\n" + json.dumps(tool_observations, ensure_ascii=False, indent=2), }, ] await self.event_store.append(task_id, "model_call_started", {"role": "thinker"}) response = await self.model_client.chat("thinker", messages) await self.event_store.append( task_id, "cognition_response", { "role": response.role, "content": response.content, "reasoning_content": response.reasoning_content, }, ) await self.event_store.append( task_id, "model_call_finished", { "role": response.role, "model": response.model, "latency_ms": response.latency_ms, "prompt_tokens": response.prompt_tokens, "completion_tokens": response.completion_tokens, "total_tokens": response.total_tokens, }, ) await self.task_store.complete_task(task_id, response.content) await self.event_store.append( task_id, "task_completed", { "final_response": response.content, "reasoning_content": response.reasoning_content, }, ) return ChatResult( task_id=task_id, status="completed", final_response=response.content, reasoning_content=response.reasoning_content, ) except Exception as exc: await self.task_store.fail_task(task_id, str(exc)) await self.event_store.append(task_id, "task_failed", {"error": str(exc)}) return ChatResult( task_id=task_id, status="failed", final_response=str(exc), reasoning_content=None, ) async def _run_memory_policy(self, task_id: str, final_response: str) -> None: """Classify task output and store in memory if policy says so.""" if self.memory_store is None: return try: decision = await self.memory_policy.classify(final_response, task_id) await self.event_store.append( task_id, "memory_policy_decision", decision.model_dump(), ) if decision.should_store: task = await self.task_store.get_task(task_id) memory_workspace = task.workspace if task and task.workspace else "" await self.memory_store.add( text=decision.summary, workspace=memory_workspace, scope=decision.scope, memory_type=decision.memory_type, importance=decision.importance, metadata=decision.metadata, ) if self.vector_memory is not None: try: await self.vector_memory.add_memory( text=decision.summary, metadata={"scope": decision.scope, "memory_type": decision.memory_type}, ) except Exception as vec_exc: logger.warning("Vector memory store failed for %s: %s", task_id, vec_exc) await self.event_store.append( task_id, "memory_stored", {"summary": decision.summary, "scope": decision.scope}, ) except Exception as exc: logger.warning("Memory policy failed for %s: %s", task_id, exc) await self.event_store.append( task_id, "memory_policy_failed", {"error": str(exc)}, ) async def _run_reflection(self, task_id: str) -> None: """Run critic reflection on completed task and record experience.""" if self.experience_recorder is None: return try: events = await self.event_store.list_events(task_id) transcript_lines = [] for event in events: line = f"[{event.event_type}] {json.dumps(event.payload, ensure_ascii=False)}" transcript_lines.append(line) transcript = "\n".join(transcript_lines) reflection = Reflection( model_client=self.model_client, recorder=self.experience_recorder, ) record = await reflection.reflect(task_id, transcript) await self.event_store.append( task_id, "reflection_completed", { "record_id": record.id, "summary": record.summary[:200], "reusable_lesson": record.reusable_lesson[:200] if record.reusable_lesson else None, }, ) except Exception as exc: logger.warning("Reflection failed for %s: %s", task_id, exc) await self.event_store.append( task_id, "reflection_failed", {"error": str(exc)}, ) async def _run_approved_or_denied_action( self, task_id: str, action: dict[str, Any], decision: str, password: str | None = None, ) -> dict[str, Any]: tool_name = str(action.get("tool", "")) index = await self._approval_action_index(task_id, action) if decision == "deny": result = ToolResult( ok=False, error="Tool action denied by user.", metadata={"decision": "deny"}, ) else: gateway = ToolGateway.default((await self.task_store.get_task(task_id)).workspace or ".") result = await gateway.run_action(action, approved=True, password=password) result_payload = result.model_dump() await self._append_command_audit( task_id, index, tool_name, action, result_payload, approved=decision != "deny" ) await self.event_store.append( task_id, "tool_call_finished", {"index": index, "tool": tool_name, "result": result_payload}, ) return { "index": index, "tool": tool_name, "reason": action.get("reason"), "decision": decision, "result": result_payload, } async def _approval_action_index(self, task_id: str, action: dict[str, Any]) -> int: events = await self.event_store.list_events(task_id) for event in reversed(events): if ( event.event_type == "tool_approval_requested" and event.payload.get("action") == action ): return int(event.payload.get("index") or 1) return 1 async def _run_action_tools( self, task_id: str, messages: list[dict[str, str]], workspace: str | None, start_index: int = 1, ) -> list[dict[str, Any]]: try: await self.event_store.append(task_id, "model_call_started", {"role": "action"}) response = await self.model_client.chat("action", messages) directive = json.loads(response.content) except Exception as exc: await self.event_store.append( task_id, "action_directive_failed", {"error": str(exc)}, ) return [] await self.event_store.append(task_id, "action_directive", directive) actions = directive.get("actions") or [] if not isinstance(actions, list) or not actions: return [] gateway = ToolGateway.default(workspace or ".") observations: list[dict[str, Any]] = [] for index, action in enumerate(actions, start=start_index): if not isinstance(action, dict): observations.append( {"index": index, "ok": False, "error": "Action must be an object"} ) continue tool_name = str(action.get("tool", "")) await self.event_store.append( task_id, "tool_call_started", {"index": index, "tool": tool_name, "args": action.get("args") or {}}, ) approved_forever = ( self.approval_service is not None and await self.approval_service.is_allowed_forever(action) ) result = await gateway.run_action(action, approved=approved_forever) result_payload = result.model_dump() await self._append_command_audit( task_id, index, tool_name, action, result_payload, approved=approved_forever ) if result.metadata.get("requires_approval"): approval = None if self.approval_service is not None: approval = await self.approval_service.create_pending(task_id, action) await self.event_store.append( task_id, "tool_approval_requested", { "index": index, "tool": tool_name, "action": action, "approval_id": approval.approval_id if approval else None, "reason": result.error, }, ) observations.append( { "index": index, "tool": tool_name, "reason": action.get("reason"), "requires_approval": True, "approval_id": approval.approval_id if approval else None, "result": result_payload, } ) break await self.event_store.append( task_id, "tool_call_finished", {"index": index, "tool": tool_name, "result": result_payload}, ) observations.append( { "index": index, "tool": tool_name, "reason": action.get("reason"), "result": result_payload, } ) return observations async def _append_command_audit( self, task_id: str, index: int, tool_name: str, action: dict[str, Any], result_payload: dict[str, Any], approved: bool, ) -> None: if tool_name != "shell_exec_safe": return metadata = result_payload.get("metadata") or {} command = metadata.get("command") or (action.get("args") or {}).get("command") await self.event_store.append( task_id, "command_audit", { "index": index, "tool": tool_name, "command": command, "action_type": metadata.get("action_type"), "risk_level": metadata.get("risk_level"), "requires_approval": bool(metadata.get("requires_approval")), "blocked": bool(metadata.get("blocked")), "approved": approved, "ok": bool(result_payload.get("ok")), "returncode": metadata.get("returncode"), "reason": metadata.get("reason") or result_payload.get("error"), }, ) async def _run_action_loop( self, task_id: str, messages: list[dict[str, str]], workspace: str | None, initial_observations: list[dict[str, Any]] | None = None, ) -> list[dict[str, Any]]: all_observations = list(initial_observations or []) current_messages = messages if all_observations: current_messages = [ *messages, { "role": "user", "content": "tool_observations:\n" + json.dumps(all_observations, ensure_ascii=False, indent=2), }, ] for _ in range(self.max_tool_iterations): observations = await self._run_action_tools( task_id, current_messages, workspace, start_index=len(all_observations) + 1, ) if not observations: return all_observations all_observations.extend(observations) if any(observation.get("requires_approval") for observation in observations): return all_observations current_messages = [ *messages, { "role": "user", "content": "tool_observations:\n" + json.dumps(all_observations, ensure_ascii=False, indent=2), }, ] await self.event_store.append( task_id, "tool_iteration_limit_reached", {"max_tool_iterations": self.max_tool_iterations}, ) return all_observations