ducklm/app/runtime/async_runtime_loop.py

148 lines
6.0 KiB
Python

from __future__ import annotations
import asyncio
from app.core.context_builder import ContextBuilder
from app.core.contracts import ExecutionDirective, PermissionDecision, PermissionRequest, RuntimeEvent, TaskCheckpoint, UserTask
from app.core.execution_engine import ExecutionEngine
from app.core.async_router import AsyncRouter
from app.events.event_bus import EventBus
from app.events.event_types import CHECKPOINT_SAVED, CONTEXT_BUILT, TASK_AWAITING_PERMISSION, TASK_COMPLETED, TASK_FAILED, TASK_RECEIVED
from app.core.permission_service import PermissionService
from app.state.checkpoint_store import SQLiteCheckpointStore
from app.state.task_state_store import SQLiteTaskStateStore
class AsyncRuntimeLoop:
    """Async runtime loop using LLM orchestrator.

    Drives one ``UserTask`` through task-state creation, checkpointing,
    context building, routing (``router.decide``), execution
    (``execution_engine.execute`` on a worker thread) and final bookkeeping
    (task state, events, checkpoint, optional memory records).
    """

    def __init__(
        self,
        event_bus: EventBus,
        task_state_store: SQLiteTaskStateStore,
        checkpoint_store: SQLiteCheckpointStore,
        context_builder: ContextBuilder,
        router: AsyncRouter,
        execution_engine: ExecutionEngine,
        permission_service: PermissionService,
        memory_interface=None,
    ) -> None:
        """Store the collaborators used by the loop.

        Args:
            event_bus: Receives every ``RuntimeEvent``; may be ``None``, in
                which case publishing is skipped and no events are returned.
            task_state_store: Persists the per-task state dictionary.
            checkpoint_store: Persists ``TaskCheckpoint`` snapshots.
            context_builder: Builds the context passed to the router.
            router: Awaitable decision maker producing the directive.
            execution_engine: Synchronous executor run via a worker thread.
            permission_service: Stored on the instance; not referenced by the
                methods of this class.
            memory_interface: Optional object exposing ``insert(...)``; when
                ``None`` nothing is written to memory.
        """
        self._event_bus = event_bus
        self._task_state_store = task_state_store
        self._checkpoint_store = checkpoint_store
        self._context_builder = context_builder
        self._router = router
        self._execution_engine = execution_engine
        self._permission_service = permission_service
        self._memory_interface = memory_interface

    async def run_task(self, task: UserTask) -> dict[str, object]:
        """Run ``task`` end to end and return a summary of the outcome.

        Args:
            task: The task to process; ``task_id``, ``session_id``, ``input``
                and ``context`` are read from it.

        Returns:
            A dict with the keys ``task_id``, ``status``, ``directive``
            (JSON dump of the router's directive), ``result`` (the execution
            engine's ``"result"`` entry, or ``None``) and ``events`` (the
            events recorded for this task, empty when no event bus is set).

        Raises:
            KeyError: If the execution result has no ``"status"`` entry.
            Exception: Anything raised by the stores, context builder, router
                or execution engine propagates unchanged.
        """
        state = self._task_state_store.create_task(
            task.task_id,
            {
                "status": "received",
                "session_id": task.session_id,
                "plan": None,
                "task_input": task.input,
                "task_context": task.context,
            },
        )
        self._publish(task, TASK_RECEIVED, {"status": "received"})

        checkpoint = TaskCheckpoint(task_id=task.task_id, status="received")
        self._save_checkpoint(task, checkpoint)

        context = self._context_builder.build(task=task, checkpoint=checkpoint)
        self._publish(task, CONTEXT_BUILT, {"keys": sorted(context.keys())})

        directive = await self._router.decide(
            state=state,
            context=context,
            task_id=task.task_id,
            session_id=task.session_id,
        )
        # The engine is synchronous; run it off the event loop thread.
        execution_result = await asyncio.to_thread(
            self._execution_engine.execute,
            task=task,
            directive=directive,
        )

        status = execution_result["status"]
        result = execution_result.get("result")
        # Normalise once so a missing or None "result" never breaks a lookup.
        result_fields = result or {}

        state_patch = {
            "status": status,
            "last_directive": directive.model_dump(mode="json"),
        }
        if status == "awaiting_permission":
            state_patch["pending_permission_request"] = result_fields.get("permission_request")
        self._task_state_store.update_task(task.task_id, state_patch)

        # NOTE(review): TASK_AWAITING_PERMISSION is imported by this module but
        # no event is published here for the "awaiting_permission" status --
        # confirm whether another component emits it.
        if status == "completed":
            self._publish(
                task,
                TASK_COMPLETED,
                {
                    "directive": directive.model_dump(mode="json"),
                    "execution_result": result,
                },
            )
        elif status == "failed":
            self._publish(task, TASK_FAILED, {"error": result_fields.get("error")})

        checkpoint.status = status
        self._save_checkpoint(task, checkpoint)

        # Save task and result to memory for session context.
        self._save_to_memory(task, execution_result, status)

        # _publish tolerates a missing event bus, so the summary must as well.
        if self._event_bus is None:
            events = []
        else:
            events = list(self._event_bus.get_task_events(task.task_id))

        return {
            "task_id": task.task_id,
            "status": status,
            "directive": directive.model_dump(mode="json"),
            "result": result,
            "events": events,
        }

    def _save_checkpoint(self, task: UserTask, checkpoint: TaskCheckpoint) -> None:
        """Persist ``checkpoint`` and publish a ``CHECKPOINT_SAVED`` event."""
        self._checkpoint_store.save(checkpoint)
        self._publish(task, CHECKPOINT_SAVED, checkpoint.model_dump(mode="json"))

    def _publish(self, task: UserTask, event_type: str, payload: dict) -> None:
        """Publish a ``RuntimeEvent`` for ``task``; no-op without an event bus.

        Args:
            task: Supplies the event's ``task_id`` and ``session_id``.
            event_type: Value stored in the event's ``type`` field.
            payload: Event payload.
        """
        # Identity check: a bus that happens to be falsy (e.g. defines
        # __len__ and is empty) must still receive events.
        if self._event_bus is None:
            return
        event = RuntimeEvent(
            task_id=task.task_id,
            session_id=task.session_id,
            sequence=self._event_bus.next_sequence(task.task_id),
            type=event_type,
            payload=payload,
        )
        self._event_bus.publish(event)

    def _save_to_memory(self, task: UserTask, execution_result: dict, status: str) -> None:
        """Save task input and result to memory for session context.

        Best-effort: any exception raised while writing is logged as a
        warning and swallowed so that it cannot fail the task.

        Args:
            task: Supplies ``input``, ``task_id`` and ``session_id``.
            execution_result: The execution engine's result dictionary.
            status: Final task status, stored in each record's metadata.
        """
        # Identity check: an empty-but-present memory store may be falsy.
        if self._memory_interface is None:
            return
        try:
            # Save task input as summary.
            self._memory_interface.insert(
                text=f"User request: {task.input}",
                kind="summary",
                source="user",
                task_id=task.task_id,
                session_id=task.session_id,
                weight=0.8,
                metadata={"status": status},
            )
            # Save execution result, if there is anything worth recording.
            result_text = self._summarize_result(execution_result, status)
            if result_text:
                self._memory_interface.insert(
                    text=f"Result: {status}{result_text}",
                    kind="tool_result",
                    source="system",
                    task_id=task.task_id,
                    session_id=task.session_id,
                    weight=0.7,
                    metadata={"status": status},
                )
        except Exception as exc:  # deliberate best-effort boundary
            import logging

            logging.getLogger(__name__).warning("Failed to save to memory: %s", exc)

    @staticmethod
    def _summarize_result(execution_result: dict, status: str) -> str:
        """Build the ``" | ..."`` suffix describing an execution result.

        Returns one ``" | <step_id>: <output>"`` segment per step with a
        truthy output (each output stringified and cut to 200 characters)
        for ``"completed"``, a single ``" | Error: ..."`` segment for
        ``"failed"``, and an empty string for any other status.
        """
        result = execution_result.get("result") or {}
        if status == "completed":
            segments = []
            for step in result.get("step_results") or []:
                tool_result = (step.get("result") or {}).get("result") or {}
                output = tool_result.get("output")
                if output:
                    # str() so non-string outputs are truncated, not rejected.
                    segments.append(f" | {step.get('step_id')}: {str(output)[:200]}")
            return "".join(segments)
        if status == "failed":
            return f" | Error: {result.get('error', 'Unknown')}"
        return ""