# ducklm/tests/smoke/test_api_stream_chat.py

from fastapi.testclient import TestClient
import json
import re
from duck_core.model_client import ModelResponse
from duck_core.api import create_app
def test_stream_chat_endpoint_emits_sse_reasoning_and_content(tmp_path, monkeypatch):
    """Stream a direct answer and check the SSE framing of reasoning and content.

    The faked action model returns a directive with no actions, and the faked
    thinker stream yields one reasoning delta and one content delta. Both
    deltas, their event names, and a ``done`` event must appear in the body.
    """
    monkeypatch.setenv("DUCK_DB_PATH", str(tmp_path / "duck.sqlite3"))

    # Same optional keyword parameters as the chat fakes in the other tests, so
    # this stub does not raise TypeError if the app passes sampling or
    # response-format options to ModelClient.chat.
    async def fake_chat(self, role, messages, temperature=None, max_output_tokens=None, response_format=None):
        return ModelResponse(
            role=role,
            model="local-main",
            content=json.dumps(
                {
                    "kind": "action_directive",
                    "intent": "answer directly",
                    "risk_level": "none",
                    "actions": [],
                }
            ),
            reasoning_content=None,
            raw={},
            latency_ms=1.0,
        )

    async def fake_stream_chat(self, role, messages):
        yield {"type": "reasoning_delta", "delta": "thinking"}
        yield {"type": "content_delta", "delta": "answer"}

    monkeypatch.setattr("duck_core.model_client.ModelClient.chat", fake_chat)
    monkeypatch.setattr("duck_core.model_client.ModelClient.stream_chat", fake_stream_chat)
    client = TestClient(create_app())
    with client.stream(
        "POST",
        "/v1/chat/stream",
        # Use the per-test temp dir (like the other tests) instead of a
        # cwd-relative "./workspace", so the test does not depend on -- or
        # possibly write into -- the directory pytest happens to run from.
        json={"message": "hello", "workspace": str(tmp_path), "debug": True},
    ) as response:
        body = "".join(response.iter_text())
    assert response.status_code == 200
    assert "event: reasoning_delta" in body
    assert "event: content_delta" in body
    assert "event: done" in body
    assert "thinking" in body
    assert "answer" in body
def test_stream_chat_endpoint_executes_tool_before_streaming_answer(tmp_path, monkeypatch):
    """A ``file_read`` directive is executed before the thinker streams its answer.

    The fake action model asks to read ``note.txt`` until the conversation
    carries ``tool_observations``; after that it returns no actions. The fake
    thinker stream requires those observations to be present before yielding.
    """
    db_file = tmp_path / "duck.sqlite3"
    monkeypatch.setenv("DUCK_DB_PATH", str(db_file))
    note = tmp_path / "note.txt"
    note.write_text("stream tool content")

    def has_observations(messages):
        return any("tool_observations" in m["content"] for m in messages)

    read_note = {
        "tool": "file_read",
        "args": {"path": "note.txt"},
        "reason": "User asked for file contents",
    }

    async def fake_chat(self, role, messages, temperature=None, max_output_tokens=None, response_format=None):
        assert role == "action"
        directive = {
            "kind": "action_directive",
            "intent": "read requested file",
            "risk_level": "low",
            "actions": [] if has_observations(messages) else [read_note],
        }
        return ModelResponse(
            role=role,
            model="local-main",
            content=json.dumps(directive),
            reasoning_content=None,
            raw={},
            latency_ms=1.0,
        )

    async def fake_stream_chat(self, role, messages):
        assert role == "thinker"
        assert has_observations(messages)
        yield {"type": "content_delta", "delta": "answer from tool"}

    monkeypatch.setattr("duck_core.model_client.ModelClient.chat", fake_chat)
    monkeypatch.setattr("duck_core.model_client.ModelClient.stream_chat", fake_stream_chat)

    request_payload = {"message": "read note.txt", "workspace": str(tmp_path), "debug": True}
    client = TestClient(create_app())
    with client.stream("POST", "/v1/chat/stream", json=request_payload) as response:
        body = "".join(response.iter_text())

    assert response.status_code == 200
    for expected in (
        "event: tool_call_started",
        "event: tool_call_finished",
        "stream tool content",
        "event: content_delta",
        "answer from tool",
        "event: done",
    ):
        assert expected in body
def test_continue_stream_executes_approved_tool_and_streams_answer(tmp_path, monkeypatch):
    """Approve a pending shell command once, then continue the task over SSE.

    The first stream must emit ``tool_approval_requested``. After
    ``allow_once``, ``/continue/stream`` must run the tool, stream the
    thinker's answer and emit ``done``. The action model must have been
    called only once overall. The answer must be stored as the conversation's
    last message with status ``completed``.
    """
    monkeypatch.setenv("DUCK_DB_PATH", str(tmp_path / "duck.sqlite3"))
    action_calls = 0

    async def fake_chat(self, role, messages, temperature=None, max_output_tokens=None, response_format=None):
        nonlocal action_calls
        assert role == "action"
        action_calls += 1
        if any("tool_observations" in message["content"] for message in messages):
            actions = []
        else:
            actions = [
                {
                    "tool": "shell_exec_safe",
                    "args": {"command": "uname -a"},
                    "reason": "User asked for system information",
                }
            ]
        return ModelResponse(
            role=role,
            model="local-main",
            content=json.dumps(
                {
                    "kind": "action_directive",
                    "intent": "run command",
                    "risk_level": "medium",
                    "actions": actions,
                }
            ),
            reasoning_content=None,
            raw={},
            latency_ms=1.0,
        )

    async def fake_stream_chat(self, role, messages):
        assert role == "thinker"
        observation_message = next(message for message in messages if "tool_observations" in message["content"])
        assert "uname" in observation_message["content"]
        yield {"type": "content_delta", "delta": "continued after approval"}

    monkeypatch.setattr("duck_core.model_client.ModelClient.chat", fake_chat)
    monkeypatch.setattr("duck_core.model_client.ModelClient.stream_chat", fake_stream_chat)
    client = TestClient(create_app())
    with client.stream(
        "POST",
        "/v1/chat/stream",
        json={"message": "run uname", "workspace": str(tmp_path), "debug": True},
    ) as response:
        initial_body = "".join(response.iter_text())
    # Check the approval event before anything that depends on it. Otherwise a
    # missing approval surfaces as an opaque AttributeError/StopIteration from
    # the lookups below, and this assertion would never get to report it.
    assert "event: tool_approval_requested" in initial_body
    task_match = re.search(r'"task_id"\s*:\s*"([^"]+)"', initial_body)
    assert task_match is not None, initial_body
    task_id = task_match.group(1)
    pending = client.get("/v1/approvals/pending").json()
    approval = next((item for item in pending if item["task_id"] == task_id), None)
    assert approval is not None, pending
    client.post(f"/v1/approvals/{approval['approval_id']}/allow_once")
    with client.stream(
        "POST",
        f"/v1/tasks/{task_id}/continue/stream",
        json={"approval_id": approval["approval_id"]},
    ) as response:
        body = "".join(response.iter_text())
    assert response.status_code == 200
    assert "event: tool_call_finished" in body
    assert "event: content_delta" in body
    assert "continued after approval" in body
    assert "event: done" in body
    # The continuation must not consult the action model again.
    assert action_calls == 1
    conversation_match = re.search(r'"conversation_id"\s*:\s*"([^"]+)"', initial_body)
    assert conversation_match is not None, initial_body
    conversation = client.get(f"/v1/conversations/{conversation_match.group(1)}").json()
    assert conversation["messages"][-1]["content"] == "continued after approval"
    assert conversation["messages"][-1]["status"] == "completed"
def test_continue_stream_requests_password_for_approved_sudo(tmp_path, monkeypatch):
    """An approved ``sudo`` command must prompt for a password on continue.

    The faked action model asks for ``sudo apt update``. After the pending
    approval is allowed once, ``/continue/stream`` must emit a
    ``tool_password_requested`` event and mention ``waiting_for_password``.
    """
    monkeypatch.setenv("DUCK_DB_PATH", str(tmp_path / "duck.sqlite3"))

    async def fake_chat(self, role, messages, temperature=None, max_output_tokens=None, response_format=None):
        assert role == "action"
        actions = []
        # Keep requesting the sudo command until a tool observation exists.
        if not any("tool_observations" in message["content"] for message in messages):
            actions = [
                {
                    "tool": "shell_exec_safe",
                    "args": {"command": "sudo apt update"},
                    "reason": "Check updates with root privileges",
                }
            ]
        return ModelResponse(
            role=role,
            model="local-main",
            content=json.dumps(
                {
                    "kind": "action_directive",
                    "intent": "run sudo command",
                    "risk_level": "medium",
                    "actions": actions,
                }
            ),
            reasoning_content=None,
            raw={},
            latency_ms=1.0,
        )

    async def fake_stream_chat(self, role, messages):
        # The thinker is not expected to run before a password is supplied.
        # Stubbing it keeps an unexpected call from reaching the real
        # ModelClient.stream_chat and turns it into a clearly labelled error.
        raise AssertionError("thinker stream should not be used before the sudo password is supplied")
        yield  # unreachable; makes this an async generator like the other stream_chat fakes

    monkeypatch.setattr("duck_core.model_client.ModelClient.chat", fake_chat)
    monkeypatch.setattr("duck_core.model_client.ModelClient.stream_chat", fake_stream_chat)
    client = TestClient(create_app())
    with client.stream(
        "POST",
        "/v1/chat/stream",
        json={"message": "run sudo update", "workspace": str(tmp_path), "debug": True},
    ) as response:
        initial_body = "".join(response.iter_text())
    # Fail with the response body / pending list rather than an opaque
    # AttributeError or StopIteration if no task or approval was created.
    task_match = re.search(r'"task_id"\s*:\s*"([^"]+)"', initial_body)
    assert task_match is not None, initial_body
    task_id = task_match.group(1)
    pending = client.get("/v1/approvals/pending").json()
    approval = next((item for item in pending if item["task_id"] == task_id), None)
    assert approval is not None, pending
    client.post(f"/v1/approvals/{approval['approval_id']}/allow_once")
    with client.stream(
        "POST",
        f"/v1/tasks/{task_id}/continue/stream",
        json={"approval_id": approval["approval_id"]},
    ) as response:
        body = "".join(response.iter_text())
    assert response.status_code == 200
    assert "event: tool_password_requested" in body
    assert "waiting_for_password" in body
    assert "password" in body.lower()
def test_password_stream_runs_sudo_with_password_and_streams_answer(tmp_path, monkeypatch):
    """Supplying the sudo password runs the command and streams the answer.

    ``subprocess.run`` inside ``shell_exec_safe`` is replaced by a recorder.
    This lets the test check that the password is passed via the ``input``
    keyword. It also checks that the password never appears in the tool
    observation handed to the thinker or in the password stream's SSE body.
    """
    monkeypatch.setenv("DUCK_DB_PATH", str(tmp_path / "duck.sqlite3"))
    recorded_runs = []

    class FakeCompletedProcess:
        returncode = 0
        stdout = "sudo updated\n"
        stderr = ""

    def fake_run(command, **kwargs):
        recorded_runs.append((command, kwargs))
        return FakeCompletedProcess()

    sudo_update = {
        "tool": "shell_exec_safe",
        "args": {"command": "sudo apt update"},
        "reason": "Check updates with root privileges",
    }

    async def fake_chat(self, role, messages, temperature=None, max_output_tokens=None, response_format=None):
        # Only the action role may use the non-streaming chat call.
        if role != "action":
            raise AssertionError("non-stream thinker should not be used")
        observed = any("tool_observations" in msg["content"] for msg in messages)
        directive = {
            "kind": "action_directive",
            "intent": "run sudo command",
            "risk_level": "medium",
            "actions": [] if observed else [sudo_update],
        }
        return ModelResponse(
            role=role,
            model="local-main",
            content=json.dumps(directive),
            reasoning_content=None,
            raw={},
            latency_ms=1.0,
        )

    async def fake_stream_chat(self, role, messages):
        assert role == "thinker"
        observation = next(msg for msg in messages if "tool_observations" in msg["content"])
        observed_text = observation["content"]
        assert "sudo updated" in observed_text
        assert "secret" not in observed_text
        yield {"type": "content_delta", "delta": "sudo command completed"}

    monkeypatch.setattr("duck_core.tools.shell_exec_safe.subprocess.run", fake_run)
    monkeypatch.setattr("duck_core.model_client.ModelClient.chat", fake_chat)
    monkeypatch.setattr("duck_core.model_client.ModelClient.stream_chat", fake_stream_chat)

    client = TestClient(create_app())
    with client.stream(
        "POST",
        "/v1/chat/stream",
        json={"message": "run sudo update", "workspace": str(tmp_path), "debug": True},
    ) as response:
        initial_body = "".join(response.iter_text())
    task_id = re.search(r'"task_id"\s*:\s*"([^"]+)"', initial_body).group(1)
    pending = client.get("/v1/approvals/pending").json()
    approval = next(item for item in pending if item["task_id"] == task_id)
    approval_id = approval["approval_id"]
    client.post(f"/v1/approvals/{approval_id}/allow_once")

    # Drain the continue stream; its body is not inspected here.
    with client.stream(
        "POST",
        f"/v1/tasks/{task_id}/continue/stream",
        json={"approval_id": approval_id},
    ) as response:
        for _chunk in response.iter_text():
            pass

    with client.stream(
        "POST",
        f"/v1/tasks/{task_id}/password/stream",
        json={"approval_id": approval_id, "password": "secret"},
    ) as response:
        body = "".join(response.iter_text())

    assert response.status_code == 200
    _command, run_kwargs = recorded_runs[0]
    assert run_kwargs["input"] == "secret\n"
    assert "event: tool_call_finished" in body
    assert "event: content_delta" in body
    assert "sudo command completed" in body
    assert "secret" not in body
def test_command_audit_endpoint_exposes_redacted_shell_events(tmp_path, monkeypatch):
    """``/v1/audit/commands`` records the shell command issued during a chat.

    The faked action model requests ``apt list --upgradable`` until a tool
    observation is present. The first audit entry must be a ``command_audit``
    event describing that command (``low`` risk, ``package_check``, not
    approved). Nothing in the audit response may mention a password.
    """
    monkeypatch.setenv("DUCK_DB_PATH", str(tmp_path / "duck.sqlite3"))

    class Completed:
        returncode = 0
        stdout = "Listing... Done\n"
        stderr = ""

    def fake_run(command, **kwargs):
        return Completed()

    async def fake_chat(self, role, messages, temperature=None, max_output_tokens=None, response_format=None):
        assert role == "action"
        actions = []
        if not any("tool_observations" in message["content"] for message in messages):
            actions = [
                {
                    "tool": "shell_exec_safe",
                    "args": {"command": "apt list --upgradable"},
                    "reason": "Check available updates",
                }
            ]
        return ModelResponse(
            role=role,
            model="local-main",
            content=json.dumps(
                {
                    "kind": "action_directive",
                    "intent": "check updates",
                    "risk_level": "low",
                    "actions": actions,
                }
            ),
            reasoning_content=None,
            raw={},
            latency_ms=1.0,
        )

    async def fake_stream_chat(self, role, messages):
        yield {"type": "content_delta", "delta": "updates checked"}

    # Stub subprocess.run (the same hook the sudo-password test patches). If the
    # low-risk command is executed without approval, it then cannot run `apt`
    # on the host. This test only inspects the audit trail, not command output.
    monkeypatch.setattr("duck_core.tools.shell_exec_safe.subprocess.run", fake_run)
    monkeypatch.setattr("duck_core.model_client.ModelClient.chat", fake_chat)
    monkeypatch.setattr("duck_core.model_client.ModelClient.stream_chat", fake_stream_chat)
    client = TestClient(create_app())
    with client.stream(
        "POST",
        "/v1/chat/stream",
        json={"message": "check updates", "workspace": str(tmp_path), "debug": True},
    ) as response:
        _ = "".join(response.iter_text())
    audit = client.get("/v1/audit/commands").json()
    assert response.status_code == 200
    # Fail with a readable message instead of an IndexError on an empty list.
    assert audit, "expected at least one command audit entry"
    assert audit[0]["event_type"] == "command_audit"
    assert audit[0]["payload"]["command"] == "apt list --upgradable"
    assert audit[0]["payload"]["risk_level"] == "low"
    assert audit[0]["payload"]["action_type"] == "package_check"
    assert audit[0]["payload"]["approved"] is False
    assert "password" not in json.dumps(audit).lower()