import json from pathlib import Path from app.core.contracts import ExecutionDirective, UserTask from app.runtime.runtime_controller import RuntimeController def _write_config_tree(base_dir: Path) -> None: (base_dir / "config").mkdir() (base_dir / "data" / "events").mkdir(parents=True, exist_ok=True) (base_dir / "data" / "state").mkdir(parents=True, exist_ok=True) (base_dir / "data" / "permissions").mkdir(parents=True, exist_ok=True) (base_dir / "models").mkdir(exist_ok=True) configs = { "models.json": { "orchestrator_path": "models/llama.gguf", "coder_path": "models/xcoder.gguf", "critic_path": "models/gemma.gguf", "embeddings_path": "models/all-MiniLM-L6-v2", "inference": {}, }, "prompts.json": { "orchestration_prompt": "", "planning_prompt": "", "coder_prompt": "", "critic_prompt": "", }, "permissions.json": { "dangerous_commands": {"rm": "ask_always", "sudo": "ask_always"}, "sensitive_paths": ["/etc", "/usr", "/var"], "default_approval_behavior": "ask_always", }, "runtime.json": { "step_timeout_ms": 5000, "task_timeout_ms": 30000, "planner_retry_limit": 1, "tool_retry_limit": 0, "replan_limit": 0, "max_execution_steps": 5, "retrieval_top_k": 3, "memory_thresholds": {}, "critic_fallback_policy": "continue_without_critic", "checkpoint_policy": {"save_on_transition": True}, "event_retention_policy": {"keep_all": True}, "streaming_settings": {"enabled": True}, }, } for name, payload in configs.items(): (base_dir / "config" / name).write_text(json.dumps(payload), encoding="utf-8") def test_file_write_and_read_tool_flow(tmp_path: Path) -> None: _write_config_tree(tmp_path) controller = RuntimeController(base_dir=tmp_path) target = tmp_path / "notes" / "test.txt" write_result = controller.handle_task( UserTask( input="write a file", context={ "requested_tool": "file_write", "tool_args": {"path": str(target), "content": "hello from ducklm"}, }, ) ) assert write_result["status"] == "completed" assert target.read_text(encoding="utf-8") == "hello from ducklm" read_result = controller.handle_task( UserTask( input="read the file", context={ "requested_tool": "file_read", "tool_args": {"path": str(target)}, }, ) ) assert read_result["status"] == "completed" assert read_result["result"]["output"] == "hello from ducklm" def test_shell_exec_requires_permission_for_dangerous_command(tmp_path: Path) -> None: _write_config_tree(tmp_path) controller = RuntimeController(base_dir=tmp_path) result = controller.handle_task( UserTask( input="run dangerous shell command", context={ "requested_tool": "shell_exec", "tool_args": {"command": "rm -rf /tmp/nonexistent"}, }, ) ) assert result["status"] == "awaiting_permission" assert "permission_request" in result["result"] def test_shell_exec_allows_safe_command(tmp_path: Path) -> None: _write_config_tree(tmp_path) controller = RuntimeController(base_dir=tmp_path) result = controller.handle_task( UserTask( input="run safe shell command", context={ "requested_tool": "shell_exec", "tool_args": {"command": "pwd"}, }, ) ) assert result["status"] == "completed" assert str(tmp_path) in result["result"]["output"] class _RecoveryCritic: async def generate(self, prompt: str, max_tokens: int | None = None) -> str: return '{"action":"continue","reason":"No matches is acceptable information for this exploratory check."}' def test_failed_shell_step_can_recover_and_continue(tmp_path: Path) -> None: _write_config_tree(tmp_path) controller = RuntimeController(base_dir=tmp_path) controller.execution_engine.set_critic(_RecoveryCritic()) controller.execution_engine._recovery_limit = 1 result = controller.execution_engine.execute( UserTask( input="run grep with no matches and recover", ), ExecutionDirective( type="plan", payload={ "steps": [ { "id": "1", "tool": "shell_exec", "args": {"command": "printf 'abc\\n' | grep definitely_missing"}, "depends_on": [], } ] }, ), ) assert result["status"] == "completed" failed_result = result["result"]["step_results"][0]["result"]["result"] assert failed_result["metadata"]["exit_code"] == 1 def test_permission_resolution_can_resume_task(tmp_path: Path) -> None: _write_config_tree(tmp_path) controller = RuntimeController(base_dir=tmp_path) initial = controller.handle_task( UserTask( input="запусти sudo apt update", ) ) assert initial["status"] == "awaiting_permission" resumed = controller.resolve_permission(task_id=initial["task_id"], decision="deny") assert resumed["status"] == "failed" assert resumed["result"]["error"] == "Permission denied by user." def test_sudo_permission_resolution_requests_secret_input(tmp_path: Path) -> None: _write_config_tree(tmp_path) controller = RuntimeController(base_dir=tmp_path) initial = controller.handle_task(UserTask(input="запусти sudo apt update")) assert initial["status"] == "awaiting_permission" resumed = controller.resolve_permission(task_id=initial["task_id"], decision="allow_once") assert resumed["status"] == "awaiting_input" assert resumed["result"]["secret_request"]["kind"] == "sudo_password" def test_secret_resolution_continues_after_pending_secret_saved(tmp_path: Path) -> None: _write_config_tree(tmp_path) controller = RuntimeController(base_dir=tmp_path) initial = controller.handle_task(UserTask(input="запусти sudo apt update")) resumed = controller.resolve_permission(task_id=initial["task_id"], decision="allow_once") assert resumed["status"] == "awaiting_input" final = controller.resolve_secret(task_id=initial["task_id"], secret="wrongpass") assert final["status"] in {"completed", "failed"} assert "error" in final["result"] or "output" in final["result"]