"""Tests for the duck_core tool layer.

Covers the file read/write tools, the safe shell executor, the tool gateway
(file_write, list_dir, search_files) and the command policy classifier.
"""

import pytest

from duck_core.tools.file_read import FileReadTool
from duck_core.tools.file_write import FileWriteTool
from duck_core.tools.gateway import ToolGateway
from duck_core.tools.command_policy import CommandPolicy
from duck_core.tools.shell_exec_safe import ShellExecSafeTool


@pytest.mark.asyncio
async def test_file_tools_stay_inside_workspace(tmp_path):
    """Files round-trip inside the workspace; reads that escape it fail."""
    workspace = tmp_path / "workspace"
    workspace.mkdir()
    # The target outside the workspace must really exist. Otherwise the read
    # would fail with "file not found" and the escape assertion below would
    # pass even if no workspace containment check were performed.
    (tmp_path / "outside.txt").write_text("outside")

    write = FileWriteTool(str(workspace))
    read = FileReadTool(str(workspace))

    result = await write.run({"path": "tmp/note.txt", "content": "hello duck"})
    loaded = await read.run({"path": "tmp/note.txt"})
    escaped = await read.run({"path": "../outside.txt"})

    assert result.ok is True
    assert loaded.output == "hello duck"
    assert escaped.ok is False


@pytest.mark.asyncio
async def test_shell_tool_blocks_dangerous_commands(tmp_path):
    """`pwd` runs, `rm -rf .` is blocked outright, `sudo` needs approval."""
    shell = ShellExecSafeTool(str(tmp_path))

    allowed = await shell.run({"command": "pwd"})
    blocked = await shell.run({"command": "rm -rf ."})
    sudo = await shell.run({"command": "sudo apt update"})

    assert allowed.ok is True

    # A destructive command is rejected, not offered for approval.
    assert blocked.ok is False
    assert blocked.metadata.get("requires_approval") is not True
    assert blocked.metadata["blocked"] is True

    # An unapproved sudo command is refused but flagged as approvable.
    assert sudo.ok is False
    assert sudo.metadata["requires_approval"] is True


@pytest.mark.asyncio
async def test_shell_tool_requests_password_for_approved_sudo(tmp_path):
    """An approved sudo command without a password asks for one."""
    shell = ShellExecSafeTool(str(tmp_path))

    result = await shell.run({"command": "sudo apt update", "_approved": True})

    assert result.ok is False
    assert result.metadata["requires_password"] is True
    assert "password" in result.error.lower()


@pytest.mark.asyncio
async def test_shell_tool_passes_password_to_sudo_stdin(monkeypatch, tmp_path):
    """The password is fed to sudo via stdin and never echoed in the result."""
    calls = []

    class Completed:
        # Minimal stand-in exposing the attributes this test supplies for the
        # object returned by subprocess.run.
        returncode = 0
        stdout = "updated\n"
        stderr = ""

    def fake_run(command, **kwargs):
        # Record the invocation instead of spawning a real process.
        calls.append((command, kwargs))
        return Completed()

    monkeypatch.setattr("duck_core.tools.shell_exec_safe.subprocess.run", fake_run)
    shell = ShellExecSafeTool(str(tmp_path))

    result = await shell.run(
        {"command": "sudo apt update", "_approved": True, "_password": "secret"}
    )

    command, kwargs = calls[0]
    assert result.ok is True
    # The command is rewritten so sudo reads the password from stdin (-S)
    # with an empty prompt (-p '').
    assert "sudo -S -p '' apt update" == command
    assert kwargs["input"] == "secret\n"
    # The password must not leak into the serialized result.
    assert "secret" not in str(result.model_dump())


@pytest.mark.asyncio
async def test_tool_gateway_runs_allowed_directive(tmp_path):
    """The gateway dispatches a `file_write` action and reports its path."""
    gateway = ToolGateway.default(str(tmp_path))

    result = await gateway.run_action(
        {"tool": "file_write", "args": {"path": "a.txt", "content": "x"}}
    )

    assert result.ok is True
    assert result.metadata["path"].endswith("a.txt")


@pytest.mark.asyncio
async def test_tool_gateway_lists_workspace_directory(tmp_path):
    """`list_dir` lists workspace entries and refuses the parent directory."""
    (tmp_path / "src").mkdir()
    (tmp_path / "src" / "app.py").write_text("print('duck')")
    (tmp_path / "README.md").write_text("hello")
    gateway = ToolGateway.default(str(tmp_path))

    result = await gateway.run_action({"tool": "list_dir", "args": {"path": "."}})
    escaped = await gateway.run_action({"tool": "list_dir", "args": {"path": ".."}})

    assert result.ok is True
    assert "README.md" in result.output
    # Directories are expected to be rendered with a trailing slash.
    assert "src/" in result.output
    assert escaped.ok is False


@pytest.mark.asyncio
async def test_tool_gateway_requests_approval_for_directory_outside_workspace(tmp_path):
    """Listing an absolute path outside the workspace needs explicit approval."""
    workspace = tmp_path / "workspace"
    outside = tmp_path / "outside"
    workspace.mkdir()
    outside.mkdir()
    (outside / "note.txt").write_text("external")
    gateway = ToolGateway.default(str(workspace))
    action = {"tool": "list_dir", "args": {"path": str(outside)}}

    blocked = await gateway.run_action(action)
    approved = await gateway.run_action(action, approved=True)

    assert blocked.ok is False
    assert blocked.metadata["requires_approval"] is True
    assert blocked.metadata["path"] == str(outside)
    assert approved.ok is True
    assert "note.txt" in approved.output


@pytest.mark.asyncio
async def test_tool_gateway_searches_file_contents(tmp_path):
    """`search_files` reports `path:line:text` matches inside the workspace."""
    (tmp_path / "src").mkdir()
    # Real newlines (not a literal backslash-n) so the fixtures are ordinary
    # newline-terminated text files.
    (tmp_path / "src" / "app.py").write_text("duck tool gateway\n")
    (tmp_path / "notes.txt").write_text("other content\n")
    gateway = ToolGateway.default(str(tmp_path))

    result = await gateway.run_action(
        {"tool": "search_files", "args": {"query": "duck tool", "path": "."}}
    )
    escaped = await gateway.run_action(
        {"tool": "search_files", "args": {"query": "duck", "path": ".."}}
    )

    assert result.ok is True
    assert "src/app.py:1:duck tool gateway" in result.output
    assert result.metadata["matches"] == 1
    assert escaped.ok is False


@pytest.mark.asyncio
async def test_shell_tool_allows_read_only_apt_update_check(monkeypatch, tmp_path):
    """`apt list --upgradable` runs without approval and returns its output."""

    class Completed:
        # Canned multi-line apt output; subprocess.run is replaced so the
        # test does not depend on apt being installed.
        returncode = 0
        stdout = "Listing...\nbootlogd/stable 3.14-4 amd64 [upgradable from: 3.06-4]\n"
        stderr = "WARNING: apt does not have a stable CLI interface.\n"

    monkeypatch.setattr(
        "duck_core.tools.shell_exec_safe.subprocess.run",
        lambda *args, **kwargs: Completed(),
    )
    shell = ShellExecSafeTool(str(tmp_path))

    result = await shell.run({"command": "apt list --upgradable"})

    assert result.ok is True
    assert "bootlogd" in result.output
    assert result.metadata["command"] == "apt list --upgradable"


def test_command_policy_classifies_common_system_commands():
    """The policy assigns risk level, action type and flags per command."""
    readonly = CommandPolicy.classify("apt list --upgradable")
    sudo_update = CommandPolicy.classify("sudo apt update")
    install = CommandPolicy.classify("sudo apt install vim")
    destructive = CommandPolicy.classify("rm -rf .")

    assert readonly.risk_level == "low"
    assert readonly.action_type == "package_check"
    assert readonly.requires_approval is False

    assert sudo_update.risk_level == "high"
    assert sudo_update.action_type == "package_cache_update"
    assert sudo_update.requires_password is True

    assert install.risk_level == "high"
    assert install.action_type == "package_install"
    assert install.requires_password is True

    assert destructive.risk_level == "critical"
    assert destructive.blocked is True