ducklm/tests/smoke/test_runtime_tools.py

386 lines
15 KiB
Python

import json
import pytest
from duck_core.events.store import EventStore
from duck_core.model_client import ModelResponse
from duck_core.approvals.service import ApprovalService
from duck_core.runtime_loop import RuntimeLoop
from duck_core.tasks.store import TaskStore
class FakeToolModelClient:
async def chat(self, role, messages):
if role == "action":
if any("tool_observations" in message["content"] for message in messages):
actions = []
else:
actions = [
{
"tool": "file_read",
"args": {"path": "note.txt"},
"reason": "User asked for file contents",
}
]
return ModelResponse(
role=role,
model="local-main",
content=json.dumps(
{
"kind": "action_directive",
"intent": "read requested file",
"risk_level": "low",
"actions": actions,
}
),
reasoning_content=None,
raw={},
latency_ms=5.0,
)
assert role == "thinker"
assert any("tool_observations" in message["content"] for message in messages)
return ModelResponse(
role=role,
model="local-main",
content="The file says: hello from tool",
reasoning_content="used file_read",
raw={},
latency_ms=12.0,
)
class FakeMultiStepToolModelClient:
async def chat(self, role, messages):
if role == "action":
observation_text = "\n".join(message["content"] for message in messages)
if "tool_observations" not in observation_text:
actions = [
{
"tool": "list_dir",
"args": {"path": "."},
"reason": "Find available files",
}
]
elif "README.md" in observation_text and "readme contents" not in observation_text:
actions = [
{
"tool": "file_read",
"args": {"path": "README.md"},
"reason": "Read discovered README",
}
]
else:
actions = []
return ModelResponse(
role=role,
model="local-main",
content=json.dumps(
{
"kind": "action_directive",
"intent": "multi-step file inspection",
"risk_level": "low",
"actions": actions,
}
),
reasoning_content=None,
raw={},
latency_ms=5.0,
)
assert role == "thinker"
observation_text = "\n".join(message["content"] for message in messages)
assert "list_dir" in observation_text
assert "file_read" in observation_text
assert "readme contents" in observation_text
return ModelResponse(
role=role,
model="local-main",
content="Readme inspected",
reasoning_content=None,
raw={},
latency_ms=12.0,
)
@pytest.mark.asyncio
async def test_runtime_executes_action_directive_tool_and_finishes_with_observation(tmp_path):
(tmp_path / "note.txt").write_text("hello from tool")
db_path = str(tmp_path / "duck.sqlite3")
task_store = TaskStore(db_path)
event_store = EventStore(db_path)
loop = RuntimeLoop(task_store, event_store, FakeToolModelClient())
result = await loop.run_chat("read note.txt", str(tmp_path), debug=True)
events = await event_store.list_events(result.task_id)
event_types = [event.event_type for event in events]
tool_finished = next(event for event in events if event.event_type == "tool_call_finished")
assert result.status == "completed"
assert result.final_response == "The file says: hello from tool"
assert "action_directive" in event_types
assert "tool_call_started" in event_types
assert tool_finished.payload["tool"] == "file_read"
assert tool_finished.payload["result"]["ok"] is True
assert tool_finished.payload["result"]["output"] == "hello from tool"
@pytest.mark.asyncio
async def test_runtime_runs_multiple_tool_steps_before_final_answer(tmp_path):
(tmp_path / "README.md").write_text("readme contents")
db_path = str(tmp_path / "duck.sqlite3")
task_store = TaskStore(db_path)
event_store = EventStore(db_path)
loop = RuntimeLoop(task_store, event_store, FakeMultiStepToolModelClient())
result = await loop.run_chat("inspect the workspace readme", str(tmp_path), debug=True)
events = await event_store.list_events(result.task_id)
finished_tools = [
event.payload["tool"] for event in events if event.event_type == "tool_call_finished"
]
assert result.status == "completed"
assert result.final_response == "Readme inspected"
assert finished_tools == ["list_dir", "file_read"]
class FakeApprovalModelClient:
async def chat(self, role, messages):
if role == "action":
if any("tool_observations" in message["content"] for message in messages):
actions = []
else:
actions = [
{
"tool": "shell_exec_safe",
"args": {"command": "uname -a"},
"reason": "User requested system information",
}
]
return ModelResponse(
role=role,
model="local-main",
content=json.dumps(
{
"kind": "action_directive",
"intent": "run command",
"risk_level": "medium",
"actions": actions,
}
),
reasoning_content=None,
raw={},
latency_ms=5.0,
)
raise AssertionError("thinker must not be called while approval is pending")
@pytest.mark.asyncio
async def test_runtime_creates_pending_approval_when_tool_requires_it(tmp_path):
db_path = str(tmp_path / "duck.sqlite3")
task_store = TaskStore(db_path)
event_store = EventStore(db_path)
approvals = ApprovalService(db_path)
loop = RuntimeLoop(task_store, event_store, FakeApprovalModelClient(), approval_service=approvals)
result = await loop.run_chat("run uname", str(tmp_path), debug=True)
pending = await approvals.pending()
events = await event_store.list_events(result.task_id)
assert result.status == "waiting_for_approval"
assert pending[0].task_id == result.task_id
assert pending[0].normalized_action["tool"] == "shell_exec_safe"
assert any(event.event_type == "tool_approval_requested" for event in events)
class FakeApprovalContinuationModelClient:
def __init__(self):
self.thinker_messages = []
async def chat(self, role, messages):
if role == "action":
if any("tool_observations" in message["content"] for message in messages):
actions = []
else:
actions = [
{
"tool": "shell_exec_safe",
"args": {"command": "uname -a"},
"reason": "User requested system information",
}
]
return ModelResponse(
role=role,
model="local-main",
content=json.dumps(
{
"kind": "action_directive",
"intent": "run command",
"risk_level": "medium",
"actions": actions,
}
),
reasoning_content=None,
raw={},
latency_ms=5.0,
)
assert role == "thinker"
self.thinker_messages = messages
assert any("tool_observations" in message["content"] for message in messages)
return ModelResponse(
role=role,
model="local-main",
content="uname completed",
reasoning_content="used approved shell command",
raw={},
latency_ms=10.0,
)
class FakeApprovalThenSecondToolModelClient:
async def chat(self, role, messages):
observation_text = "\n".join(message["content"] for message in messages)
if role == "action":
if "tool_observations" in observation_text and "second step content" not in observation_text:
actions = [
{
"tool": "file_read",
"args": {"path": "second.txt"},
"reason": "Read follow-up file after approved command",
}
]
elif "tool_observations" in observation_text:
actions = []
else:
actions = [
{
"tool": "shell_exec_safe",
"args": {"command": "uname -a"},
"reason": "User requested system information",
}
]
return ModelResponse(
role=role,
model="local-main",
content=json.dumps(
{
"kind": "action_directive",
"intent": "approval then follow-up",
"risk_level": "medium",
"actions": actions,
}
),
reasoning_content=None,
raw={},
latency_ms=5.0,
)
assert role == "thinker"
assert "shell_exec_safe" in observation_text
assert "file_read" in observation_text
assert "second step content" in observation_text
return ModelResponse(
role=role,
model="local-main",
content="approved command and second tool completed",
reasoning_content=None,
raw={},
latency_ms=10.0,
)
@pytest.mark.asyncio
async def test_runtime_continues_after_approved_tool_call(tmp_path):
db_path = str(tmp_path / "duck.sqlite3")
task_store = TaskStore(db_path)
event_store = EventStore(db_path)
approvals = ApprovalService(db_path)
model_client = FakeApprovalContinuationModelClient()
loop = RuntimeLoop(task_store, event_store, model_client, approval_service=approvals)
pending_result = await loop.run_chat("run uname", str(tmp_path), debug=True)
pending = await approvals.pending()
await approvals.allow_once(pending[0].approval_id)
result = await loop.continue_after_approval(pending_result.task_id, pending[0].approval_id)
events = await event_store.list_events(result.task_id)
finished = next(event for event in events if event.event_type == "tool_call_finished")
assert result.status == "completed"
assert result.final_response == "uname completed"
assert finished.payload["tool"] == "shell_exec_safe"
assert finished.payload["result"]["ok"] is True
assert "uname" in finished.payload["result"]["metadata"]["command"]
assert any(event.event_type == "task_completed" for event in events)
@pytest.mark.asyncio
async def test_runtime_can_run_followup_tool_after_approval(tmp_path):
(tmp_path / "second.txt").write_text("second step content")
db_path = str(tmp_path / "duck.sqlite3")
task_store = TaskStore(db_path)
event_store = EventStore(db_path)
approvals = ApprovalService(db_path)
loop = RuntimeLoop(
task_store,
event_store,
FakeApprovalThenSecondToolModelClient(),
approval_service=approvals,
)
pending_result = await loop.run_chat("run uname then inspect second file", str(tmp_path), debug=True)
pending = await approvals.pending()
await approvals.allow_once(pending[0].approval_id)
result = await loop.continue_after_approval(pending_result.task_id, pending[0].approval_id)
events = await event_store.list_events(result.task_id)
finished_tools = [
event.payload["tool"] for event in events if event.event_type == "tool_call_finished"
]
assert result.status == "completed"
assert finished_tools == ["shell_exec_safe", "file_read"]
@pytest.mark.asyncio
async def test_runtime_continues_after_denied_tool_call_without_execution(tmp_path):
db_path = str(tmp_path / "duck.sqlite3")
task_store = TaskStore(db_path)
event_store = EventStore(db_path)
approvals = ApprovalService(db_path)
model_client = FakeApprovalContinuationModelClient()
loop = RuntimeLoop(task_store, event_store, model_client, approval_service=approvals)
pending_result = await loop.run_chat("run uname", str(tmp_path), debug=True)
pending = await approvals.pending()
await approvals.deny(pending[0].approval_id)
result = await loop.continue_after_approval(pending_result.task_id, pending[0].approval_id)
events = await event_store.list_events(result.task_id)
finished = next(event for event in events if event.event_type == "tool_call_finished")
assert result.status == "completed"
assert finished.payload["result"]["ok"] is False
assert finished.payload["result"]["metadata"]["decision"] == "deny"
assert "denied" in finished.payload["result"]["error"].lower()
@pytest.mark.asyncio
async def test_runtime_reuses_allow_forever_for_matching_action(tmp_path):
db_path = str(tmp_path / "duck.sqlite3")
task_store = TaskStore(db_path)
event_store = EventStore(db_path)
approvals = ApprovalService(db_path)
model_client = FakeApprovalContinuationModelClient()
loop = RuntimeLoop(task_store, event_store, model_client, approval_service=approvals)
first_result = await loop.run_chat("run uname", str(tmp_path), debug=True)
first_pending = await approvals.pending()
await approvals.allow_forever(first_pending[0].approval_id)
await loop.continue_after_approval(first_result.task_id, first_pending[0].approval_id)
second_result = await loop.run_chat("run uname again", str(tmp_path), debug=True)
second_events = await event_store.list_events(second_result.task_id)
assert second_result.status == "completed"
assert second_result.final_response == "uname completed"
assert not any(event.event_type == "tool_approval_requested" for event in second_events)
assert any(event.event_type == "tool_call_finished" for event in second_events)