ducklm/tests/smoke/test_tool_gateway.py

162 lines
5.4 KiB
Python

import pytest
from duck_core.tools.file_read import FileReadTool
from duck_core.tools.file_write import FileWriteTool
from duck_core.tools.gateway import ToolGateway
from duck_core.tools.command_policy import CommandPolicy
from duck_core.tools.shell_exec_safe import ShellExecSafeTool
@pytest.mark.asyncio
async def test_file_tools_stay_inside_workspace(tmp_path):
    """Round-trip a file inside the workspace and reject a ``..`` read."""
    workspace = str(tmp_path)
    writer = FileWriteTool(workspace)
    reader = FileReadTool(workspace)

    # All three tool calls run before any assertion, in this order.
    write_result = await writer.run({"path": "tmp/note.txt", "content": "hello duck"})
    read_result = await reader.run({"path": "tmp/note.txt"})
    escape_result = await reader.run({"path": "../outside.txt"})

    assert write_result.ok is True
    assert read_result.output == "hello duck"
    # A path climbing out of the workspace root must not succeed.
    assert escape_result.ok is False
@pytest.mark.asyncio
async def test_shell_tool_blocks_dangerous_commands(tmp_path):
    """Check the shell tool's verdict for a plain, a destructive and a sudo command."""
    tool = ShellExecSafeTool(str(tmp_path))

    pwd_result = await tool.run({"command": "pwd"})
    rm_result = await tool.run({"command": "rm -rf ."})
    sudo_result = await tool.run({"command": "sudo apt update"})

    assert pwd_result.ok is True

    # The destructive command is flagged as blocked, not as awaiting approval.
    assert rm_result.ok is False
    assert rm_result.metadata.get("requires_approval") is not True
    assert rm_result.metadata["blocked"] is True

    # Unapproved sudo fails, but is marked as something approval could unlock.
    assert sudo_result.ok is False
    assert sudo_result.metadata["requires_approval"] is True
@pytest.mark.asyncio
async def test_shell_tool_requests_password_for_approved_sudo(tmp_path):
    """An approved sudo command sent without a password fails and asks for one."""
    tool = ShellExecSafeTool(str(tmp_path))
    request = {"command": "sudo apt update", "_approved": True}

    outcome = await tool.run(request)

    assert outcome.ok is False
    assert outcome.metadata["requires_password"] is True
    # The error text is expected to mention the missing password.
    assert "password" in outcome.error.lower()
@pytest.mark.asyncio
async def test_shell_tool_passes_password_to_sudo_stdin(monkeypatch, tmp_path):
    """Approved sudo with a password runs ``sudo -S`` and feeds the password via ``input``.

    ``subprocess.run`` inside ``duck_core.tools.shell_exec_safe`` is replaced
    by a recording stub, so no real command is executed.
    """
    recorded = []

    class Completed:
        # Minimal stand-in for the object the patched subprocess.run returns.
        returncode = 0
        stdout = "updated\n"
        stderr = ""

    def recording_run(command, **kwargs):
        recorded.append((command, kwargs))
        return Completed()

    monkeypatch.setattr("duck_core.tools.shell_exec_safe.subprocess.run", recording_run)

    tool = ShellExecSafeTool(str(tmp_path))
    request = {"command": "sudo apt update", "_approved": True, "_password": "secret"}
    outcome = await tool.run(request)

    invoked_command, run_kwargs = recorded[0]

    assert outcome.ok is True
    assert invoked_command == "sudo -S -p '' apt update"
    assert run_kwargs["input"] == "secret\n"
    # The password must not appear anywhere in the dumped result.
    assert "secret" not in str(outcome.model_dump())
@pytest.mark.asyncio
async def test_tool_gateway_runs_allowed_directive(tmp_path):
    """A ``file_write`` directive sent through the default gateway succeeds."""
    gateway = ToolGateway.default(str(tmp_path))
    directive = {"tool": "file_write", "args": {"path": "a.txt", "content": "x"}}

    outcome = await gateway.run_action(directive)

    assert outcome.ok is True
    # The reported path should end with the file name that was requested.
    assert outcome.metadata["path"].endswith("a.txt")
@pytest.mark.asyncio
async def test_tool_gateway_lists_workspace_directory(tmp_path):
    """``list_dir`` shows workspace entries and refuses the parent directory."""
    # Fixture layout: one sub-directory holding a file, plus one top-level file.
    source_dir = tmp_path / "src"
    source_dir.mkdir()
    (source_dir / "app.py").write_text("print('duck')")
    (tmp_path / "README.md").write_text("hello")

    gateway = ToolGateway.default(str(tmp_path))
    listing = await gateway.run_action({"tool": "list_dir", "args": {"path": "."}})
    outside = await gateway.run_action({"tool": "list_dir", "args": {"path": ".."}})

    assert listing.ok is True
    assert "README.md" in listing.output
    # The sub-directory is expected to appear with a trailing slash.
    assert "src/" in listing.output
    assert outside.ok is False
@pytest.mark.asyncio
async def test_tool_gateway_searches_file_contents(tmp_path):
    """``search_files`` reports the matching line and refuses the parent directory.

    Two files are created; only ``src/app.py`` contains the query, so exactly
    one match is expected, reported as ``<relative path>:<line number>:<line>``.
    """
    (tmp_path / "src").mkdir()
    # Real newline terminators. The previous "\\n" wrote a literal backslash
    # followed by "n" into the fixture files instead of ending the line.
    (tmp_path / "src" / "app.py").write_text("duck tool gateway\n")
    (tmp_path / "notes.txt").write_text("other content\n")

    gateway = ToolGateway.default(str(tmp_path))
    result = await gateway.run_action(
        {"tool": "search_files", "args": {"query": "duck tool", "path": "."}}
    )
    escaped = await gateway.run_action(
        {"tool": "search_files", "args": {"query": "duck", "path": ".."}}
    )

    assert result.ok is True
    assert "src/app.py:1:duck tool gateway" in result.output
    assert result.metadata["matches"] == 1
    # Searching outside the workspace root must fail.
    assert escaped.ok is False
@pytest.mark.asyncio
async def test_shell_tool_allows_read_only_apt_update_check(monkeypatch, tmp_path):
    """``apt list --upgradable`` runs without approval and its output is returned.

    ``subprocess.run`` inside ``duck_core.tools.shell_exec_safe`` is stubbed,
    so no real ``apt`` process is started.
    """

    class Completed:
        # Stand-in for the object the patched subprocess.run returns. The
        # strings use real newlines so the stub is multi-line output; the
        # previous "\\n" produced a literal backslash followed by "n".
        returncode = 0
        stdout = "Listing...\nbootlogd/stable 3.14-4 amd64 [upgradable from: 3.06-4]\n"
        stderr = "WARNING: apt does not have a stable CLI interface.\n"

    monkeypatch.setattr(
        "duck_core.tools.shell_exec_safe.subprocess.run",
        lambda *args, **kwargs: Completed(),
    )

    shell = ShellExecSafeTool(str(tmp_path))
    result = await shell.run({"command": "apt list --upgradable"})

    assert result.ok is True
    assert "bootlogd" in result.output
    # The command is expected to be echoed back unchanged in the metadata.
    assert result.metadata["command"] == "apt list --upgradable"
def test_command_policy_classifies_common_system_commands():
    """Pin ``CommandPolicy.classify`` verdicts for four representative commands."""
    classify = CommandPolicy.classify

    # Classify everything first, then assert, keeping the original call order.
    package_check = classify("apt list --upgradable")
    cache_update = classify("sudo apt update")
    package_install = classify("sudo apt install vim")
    wipe = classify("rm -rf .")

    # Read-only package query: low risk, no approval needed.
    assert package_check.risk_level == "low"
    assert package_check.action_type == "package_check"
    assert package_check.requires_approval is False

    # sudo cache refresh: high risk, needs a password.
    assert cache_update.risk_level == "high"
    assert cache_update.action_type == "package_cache_update"
    assert cache_update.requires_password is True

    # sudo install: high risk, needs a password.
    assert package_install.risk_level == "high"
    assert package_install.action_type == "package_install"
    assert package_install.requires_password is True

    # Recursive delete of the working directory: critical and blocked.
    assert wipe.risk_level == "critical"
    assert wipe.blocked is True