"""Tests for the duck_core tool layer: file tools, the safe shell tool, the tool gateway, and the command policy."""
import pytest
|
|
|
|
from duck_core.tools.file_read import FileReadTool
|
|
from duck_core.tools.file_write import FileWriteTool
|
|
from duck_core.tools.gateway import ToolGateway
|
|
from duck_core.tools.command_policy import CommandPolicy
|
|
from duck_core.tools.shell_exec_safe import ShellExecSafeTool
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_file_tools_stay_inside_workspace(tmp_path):
    """File tools read and write inside the workspace and refuse to leave it.

    The workspace is a subdirectory of ``tmp_path`` and a real file is placed
    next to it, so the ``../outside.txt`` read cannot pass merely because the
    target file is missing.
    """
    workspace = tmp_path / "workspace"
    workspace.mkdir()
    # A readable file just outside the workspace: the escape attempt below
    # now points at something that actually exists.
    (tmp_path / "outside.txt").write_text("outside workspace")

    write = FileWriteTool(str(workspace))
    read = FileReadTool(str(workspace))

    result = await write.run({"path": "tmp/note.txt", "content": "hello duck"})
    loaded = await read.run({"path": "tmp/note.txt"})
    escaped = await read.run({"path": "../outside.txt"})

    assert result.ok is True
    assert loaded.output == "hello duck"
    assert escaped.ok is False
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_shell_tool_blocks_dangerous_commands(tmp_path):
    """Check how the shell tool treats a harmless, a destructive and a sudo command.

    ``pwd`` must succeed, ``rm -rf .`` must be reported as blocked (and not as
    merely awaiting approval), and ``sudo apt update`` must be reported as
    requiring approval.
    """
    tool = ShellExecSafeTool(str(tmp_path))

    pwd_result = await tool.run({"command": "pwd"})
    rm_result = await tool.run({"command": "rm -rf ."})
    sudo_result = await tool.run({"command": "sudo apt update"})

    # Harmless command goes through.
    assert pwd_result.ok is True

    # Destructive command: hard block, no approval path offered.
    assert rm_result.ok is False
    assert rm_result.metadata.get("requires_approval") is not True
    assert rm_result.metadata["blocked"] is True

    # Privileged command: refused for now, flagged as needing approval.
    assert sudo_result.ok is False
    assert sudo_result.metadata["requires_approval"] is True
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_shell_tool_requests_password_for_approved_sudo(tmp_path):
    """An approved sudo command without a password must ask for one.

    The result is expected to fail, carry ``requires_password`` in its
    metadata, and mention "password" in its error text.
    """
    tool = ShellExecSafeTool(str(tmp_path))
    request = {"command": "sudo apt update", "_approved": True}

    outcome = await tool.run(request)

    assert outcome.ok is False
    assert outcome.metadata["requires_password"] is True
    assert "password" in outcome.error.lower()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_shell_tool_passes_password_to_sudo_stdin(monkeypatch, tmp_path):
    """An approved sudo command with a password feeds that password via stdin.

    ``subprocess.run`` inside ``duck_core.tools.shell_exec_safe`` is replaced
    with a recorder so no real command runs. The test then checks the command
    string that was issued, the ``input`` keyword it received, and that the
    password does not appear in the dumped result.
    """
    calls = []

    class Completed:
        # Minimal stand-in exposing the attributes set on the fake result.
        returncode = 0
        stdout = "updated\n"
        stderr = ""

    def fake_run(command, **kwargs):
        calls.append((command, kwargs))
        return Completed()

    monkeypatch.setattr("duck_core.tools.shell_exec_safe.subprocess.run", fake_run)
    shell = ShellExecSafeTool(str(tmp_path))

    result = await shell.run(
        {"command": "sudo apt update", "_approved": True, "_password": "secret"}
    )

    # Check the outcome before indexing into ``calls`` so that a tool failure
    # is reported as a failed assertion rather than an IndexError.
    assert result.ok is True
    assert calls, "subprocess.run was never invoked"

    command, kwargs = calls[0]
    assert command == "sudo -S -p '' apt update"
    assert kwargs["input"] == "secret\n"
    assert "secret" not in str(result.model_dump())
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_tool_gateway_runs_allowed_directive(tmp_path):
    """The default gateway executes a ``file_write`` action and reports its path."""
    action = {"tool": "file_write", "args": {"path": "a.txt", "content": "x"}}
    gw = ToolGateway.default(str(tmp_path))

    outcome = await gw.run_action(action)

    assert outcome.ok is True
    assert outcome.metadata["path"].endswith("a.txt")
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_tool_gateway_lists_workspace_directory(tmp_path):
    """``list_dir`` lists the workspace root and fails for its parent directory.

    The listing is expected to contain the file ``README.md`` and the
    directory entry ``src/``.
    """
    # Workspace layout: one subdirectory with a file, plus one top-level file.
    src_dir = tmp_path / "src"
    src_dir.mkdir()
    (src_dir / "app.py").write_text("print('duck')")
    (tmp_path / "README.md").write_text("hello")

    gw = ToolGateway.default(str(tmp_path))

    listing = await gw.run_action({"tool": "list_dir", "args": {"path": "."}})
    parent_listing = await gw.run_action({"tool": "list_dir", "args": {"path": ".."}})

    assert listing.ok is True
    assert "README.md" in listing.output
    assert "src/" in listing.output
    assert parent_listing.ok is False
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_tool_gateway_requests_approval_for_directory_outside_workspace(tmp_path):
    """Listing a directory outside the workspace needs explicit approval.

    The same action is run twice: once without approval, where it must fail
    and report ``requires_approval`` together with the requested path, and
    once with ``approved=True``, where it must succeed and list the file.
    """
    workspace_dir = tmp_path / "workspace"
    external_dir = tmp_path / "outside"
    for directory in (workspace_dir, external_dir):
        directory.mkdir()
    (external_dir / "note.txt").write_text("external")

    gw = ToolGateway.default(str(workspace_dir))
    request = {"tool": "list_dir", "args": {"path": str(external_dir)}}

    without_approval = await gw.run_action(request)
    with_approval = await gw.run_action(request, approved=True)

    # Unapproved: refused, with enough metadata to prompt for approval.
    assert without_approval.ok is False
    assert without_approval.metadata["requires_approval"] is True
    assert without_approval.metadata["path"] == str(external_dir)

    # Approved: the external directory is listed.
    assert with_approval.ok is True
    assert "note.txt" in with_approval.output
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_tool_gateway_searches_file_contents(tmp_path):
    """``search_files`` finds matching lines in the workspace and fails for ``..``.

    One file contains the query and one does not, so exactly one match is
    expected, reported as ``src/app.py:1:duck tool gateway``.
    """
    (tmp_path / "src").mkdir()
    # Real newline terminators: the fixtures previously used "\\n", which
    # wrote a literal backslash and "n" into the files instead of ending the line.
    (tmp_path / "src" / "app.py").write_text("duck tool gateway\n")
    (tmp_path / "notes.txt").write_text("other content\n")
    gateway = ToolGateway.default(str(tmp_path))

    result = await gateway.run_action(
        {"tool": "search_files", "args": {"query": "duck tool", "path": "."}}
    )
    escaped = await gateway.run_action(
        {"tool": "search_files", "args": {"query": "duck", "path": ".."}}
    )

    assert result.ok is True
    assert "src/app.py:1:duck tool gateway" in result.output
    assert result.metadata["matches"] == 1
    assert escaped.ok is False
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_shell_tool_allows_read_only_apt_update_check(monkeypatch, tmp_path):
    """``apt list --upgradable`` runs without approval and its output is returned.

    ``subprocess.run`` inside ``duck_core.tools.shell_exec_safe`` is replaced
    with a stub returning canned apt output, so no real ``apt`` is invoked.
    """

    class Completed:
        # Canned apt output. Lines are separated by real newlines; the
        # previous "\\n" produced a literal backslash-n inside a single line.
        returncode = 0
        stdout = "Listing...\nbootlogd/stable 3.14-4 amd64 [upgradable from: 3.06-4]\n"
        stderr = "WARNING: apt does not have a stable CLI interface.\n"

    monkeypatch.setattr(
        "duck_core.tools.shell_exec_safe.subprocess.run",
        lambda *args, **kwargs: Completed(),
    )
    shell = ShellExecSafeTool(str(tmp_path))

    result = await shell.run({"command": "apt list --upgradable"})

    assert result.ok is True
    assert "bootlogd" in result.output
    assert result.metadata["command"] == "apt list --upgradable"
|
|
|
|
|
|
def test_command_policy_classifies_common_system_commands():
    """Check ``CommandPolicy.classify`` on four representative commands.

    Covers a read-only package check, a sudo cache update, a sudo package
    install and a destructive ``rm -rf .``.
    """
    classify = CommandPolicy.classify

    check = classify("apt list --upgradable")
    cache_update = classify("sudo apt update")
    package_install = classify("sudo apt install vim")
    wipe = classify("rm -rf .")

    # Read-only package query: low risk, no approval needed.
    assert check.risk_level == "low"
    assert check.action_type == "package_check"
    assert check.requires_approval is False

    # sudo cache refresh: high risk, password needed.
    assert cache_update.risk_level == "high"
    assert cache_update.action_type == "package_cache_update"
    assert cache_update.requires_password is True

    # sudo install: high risk, password needed.
    assert package_install.risk_level == "high"
    assert package_install.action_type == "package_install"
    assert package_install.requires_password is True

    # Recursive delete of the current directory: critical and blocked.
    assert wipe.risk_level == "critical"
    assert wipe.blocked is True
|