# ducklm/tests/smoke/test_api_stream_chat.py
#
# 614 lines
# 22 KiB
# Python

from fastapi.testclient import TestClient
import json
import re
import time
from duck_core.model_client import ModelResponse
from duck_core.api import create_app
def test_stream_chat_endpoint_emits_sse_reasoning_and_content(tmp_path, monkeypatch):
    """Stream one chat turn and check the SSE body for deltas, done and stats.

    ``ModelClient.chat`` is replaced with a fake returning an action directive
    that has no actions; ``ModelClient.stream_chat`` is replaced with a fake
    yielding one reasoning delta followed by one content delta.
    """
    # Set DUCK_DB_PATH to a database file under the per-test temporary directory.
    monkeypatch.setenv("DUCK_DB_PATH", str(tmp_path / "duck.sqlite3"))

    async def fake_chat(self, role, messages):
        directive = {
            "kind": "action_directive",
            "intent": "answer directly",
            "risk_level": "none",
            "actions": [],
        }
        return ModelResponse(
            role=role,
            model="local-main",
            content=json.dumps(directive),
            reasoning_content=None,
            raw={},
            latency_ms=1.0,
        )

    async def fake_stream_chat(self, role, messages):
        scripted = (("reasoning_delta", "thinking"), ("content_delta", "answer"))
        for chunk_type, text in scripted:
            yield {"type": chunk_type, "delta": text}

    monkeypatch.setattr("duck_core.model_client.ModelClient.chat", fake_chat)
    monkeypatch.setattr("duck_core.model_client.ModelClient.stream_chat", fake_stream_chat)

    api = TestClient(create_app())
    request_json = {"message": "hello", "workspace": "./workspace", "debug": True}
    with api.stream("POST", "/v1/chat/stream", json=request_json) as response:
        sse_text = "".join(response.iter_text())

    assert response.status_code == 200
    # Checked in order: event names, generation statistics keys, then the delta payloads.
    expected_fragments = (
        "event: reasoning_delta",
        "event: content_delta",
        "event: done",
        '"generation_stats":',
        '"min_tokens_per_second":',
        '"avg_tokens_per_second":',
        '"max_tokens_per_second":',
        "thinking",
        "answer",
    )
    for fragment in expected_fragments:
        assert fragment in sse_text
def test_stream_chat_forwards_reasoning_toggle_to_thinker(tmp_path, monkeypatch):
    """Check that a request-level ``reasoning`` value reaches ``stream_chat``.

    The fake ``stream_chat`` records the ``role`` and ``reasoning`` it was
    called with; the test posts ``"reasoning": "off"`` and expects the recorded
    call to be the ``thinker`` role with ``reasoning="off"``.
    """
    monkeypatch.setenv("DUCK_DB_PATH", str(tmp_path / "duck.sqlite3"))
    captured = {}

    async def fake_chat(self, role, messages, temperature=None, max_output_tokens=None, response_format=None):
        directive = {
            "kind": "action_directive",
            "intent": "answer directly",
            "risk_level": "none",
            "actions": [],
        }
        return ModelResponse(
            role=role,
            model="local-main",
            content=json.dumps(directive),
            reasoning_content=None,
            raw={},
            latency_ms=1.0,
        )

    async def fake_stream_chat(
        self,
        role,
        messages,
        temperature=None,
        max_output_tokens=None,
        response_format=None,
        reasoning=None,
    ):
        # Record what the endpoint passed through so the test can inspect it.
        captured.update(role=role, reasoning=reasoning)
        yield {"type": "content_delta", "delta": "answer"}

    monkeypatch.setattr("duck_core.model_client.ModelClient.chat", fake_chat)
    monkeypatch.setattr("duck_core.model_client.ModelClient.stream_chat", fake_stream_chat)

    api = TestClient(create_app())
    request_json = {
        "message": "hello",
        "workspace": "./workspace",
        "debug": True,
        "reasoning": "off",
    }
    with api.stream("POST", "/v1/chat/stream", json=request_json) as response:
        sse_text = "".join(response.iter_text())

    assert response.status_code == 200
    assert "event: done" in sse_text
    assert captured == {"role": "thinker", "reasoning": "off"}
def test_stream_chat_runs_memory_policy_and_reflection_after_completion(tmp_path, monkeypatch):
    """Check that a streamed chat still triggers memory storage and reflection.

    The fake ``chat`` answers three roles: ``action`` (directive with no
    actions), ``memory_policy`` (a decision to store a preference) and
    ``critic`` (plain-text reflection). Any other role fails the test. After
    the stream finishes, the task's events are polled until
    ``reflection_completed`` shows up, then memory and experience are queried.
    """
    monkeypatch.setenv("DUCK_DB_PATH", str(tmp_path / "duck.sqlite3"))

    async def fake_chat(self, role, messages, temperature=None, max_output_tokens=None, response_format=None):
        if role == "action":
            content = {
                "kind": "action_directive",
                "intent": "answer directly",
                "risk_level": "none",
                "actions": [],
            }
        elif role == "memory_policy":
            content = {
                "should_store": True,
                "memory_type": "preference",
                "summary": "User wants streamed chats to retain memory.",
                "importance": 0.8,
                "scope": "workspace",
                "metadata": {},
            }
        elif role == "critic":
            # The critic reply is free text, not JSON, so it returns early.
            return ModelResponse(
                role=role,
                model="local-main",
                content="Task completed. Reusable lesson: streamed tasks need post-processing.",
                reasoning_content=None,
                raw={},
                latency_ms=1.0,
            )
        else:
            raise AssertionError(f"unexpected role: {role}")
        return ModelResponse(
            role=role,
            model="local-main",
            content=json.dumps(content),
            reasoning_content=None,
            raw={},
            latency_ms=1.0,
        )

    async def fake_stream_chat(self, role, messages):
        assert role == "thinker"
        yield {"type": "content_delta", "delta": "streamed answer"}

    monkeypatch.setattr("duck_core.model_client.ModelClient.chat", fake_chat)
    monkeypatch.setattr("duck_core.model_client.ModelClient.stream_chat", fake_stream_chat)

    with TestClient(create_app()) as client:
        with client.stream(
            "POST",
            "/v1/chat/stream",
            json={"message": "remember this", "workspace": str(tmp_path), "debug": True},
        ) as response:
            body = "".join(response.iter_text())

        # Fail with a readable message instead of AttributeError on None
        # when the stream never reported a task id.
        task_match = re.search(r'"task_id"\s*:\s*"([^"]+)"', body)
        assert task_match is not None, f"no task_id found in stream body: {body!r}"
        task_id = task_match.group(1)

        # Poll up to 20 times, 50 ms apart, until reflection_completed appears.
        events = []
        for _ in range(20):
            events = client.get(f"/v1/tasks/{task_id}/events").json()
            if any(event["event_type"] == "reflection_completed" for event in events):
                break
            time.sleep(0.05)
        event_types = [event["event_type"] for event in events]
        memory = client.get("/v1/memory", params={"workspace": str(tmp_path)}).json()
        experience = client.get("/v1/experience").json()

    assert response.status_code == 200
    assert "event: done" in body
    assert "memory_policy_decision" in event_types
    assert "memory_stored" in event_types
    assert "reflection_completed" in event_types
    assert memory["results"][0]["text"] == "User wants streamed chats to retain memory."
    assert len(experience) == 1
def test_stream_chat_endpoint_executes_tool_before_streaming_answer(tmp_path, monkeypatch):
    """Check that a requested ``file_read`` tool runs before the answer streams.

    The fake ``chat`` asks for ``file_read`` on ``note.txt`` until a message
    containing ``tool_observations`` is present, then returns no actions. The
    fake ``stream_chat`` asserts it is only called once such an observation
    message exists.
    """
    monkeypatch.setenv("DUCK_DB_PATH", str(tmp_path / "duck.sqlite3"))
    (tmp_path / "note.txt").write_text("stream tool content")

    async def fake_chat(self, role, messages, temperature=None, max_output_tokens=None, response_format=None):
        assert role == "action"
        if any("tool_observations" in message["content"] for message in messages):
            actions = []
        else:
            actions = [
                {
                    "tool": "file_read",
                    "args": {"path": "note.txt"},
                    "reason": "User asked for file contents",
                }
            ]
        return ModelResponse(
            role=role,
            model="local-main",
            content=json.dumps(
                {
                    "kind": "action_directive",
                    "intent": "read requested file",
                    "risk_level": "low",
                    "actions": actions,
                }
            ),
            reasoning_content=None,
            raw={},
            latency_ms=1.0,
        )

    async def fake_stream_chat(self, role, messages):
        assert role == "thinker"
        assert any("tool_observations" in message["content"] for message in messages)
        yield {"type": "content_delta", "delta": "answer from tool"}

    monkeypatch.setattr("duck_core.model_client.ModelClient.chat", fake_chat)
    monkeypatch.setattr("duck_core.model_client.ModelClient.stream_chat", fake_stream_chat)

    client = TestClient(create_app())
    with client.stream(
        "POST",
        "/v1/chat/stream",
        json={"message": "read note.txt", "workspace": str(tmp_path), "debug": True},
    ) as response:
        body = "".join(response.iter_text())

    assert response.status_code == 200
    assert 'event: runtime_status\ndata: {"task_id":' in body
    assert '"stage": "planning"' in body
    # Presence is asserted before str.index() is used: index() raises
    # ValueError on a missing substring, which would hide which marker is absent.
    assert "event: tool_call_started" in body
    assert body.index('"stage": "planning"') < body.index("event: tool_call_started")
    assert "event: tool_call_finished" in body
    assert "stream tool content" in body
    assert "event: content_delta" in body
    assert "answer from tool" in body
    assert "event: done" in body
def test_stream_chat_injects_candidate_skill_summary(tmp_path, monkeypatch):
    """Check that the action-role prompt mentions an active skill.

    Every message passed to the fake ``chat`` is collected; after the stream
    completes, at least one collected message must contain ``"Active skill:"``
    and at least one must contain ``"analyze_project"``.
    """
    monkeypatch.setenv("DUCK_DB_PATH", str(tmp_path / "duck.sqlite3"))
    captured_messages = []

    def seen_in_prompt(snippet):
        # True when any collected message's content contains the snippet.
        return any(snippet in message["content"] for message in captured_messages)

    async def fake_chat(self, role, messages, temperature=None, max_output_tokens=None, response_format=None):
        assert role == "action"
        captured_messages.extend(messages)
        directive = {
            "kind": "action_directive",
            "intent": "answer directly",
            "risk_level": "none",
            "actions": [],
        }
        return ModelResponse(
            role=role,
            model="local-main",
            content=json.dumps(directive),
            reasoning_content=None,
            raw={},
            latency_ms=1.0,
        )

    async def fake_stream_chat(self, role, messages):
        yield {"type": "content_delta", "delta": "ok"}

    monkeypatch.setattr("duck_core.model_client.ModelClient.chat", fake_chat)
    monkeypatch.setattr("duck_core.model_client.ModelClient.stream_chat", fake_stream_chat)

    api = TestClient(create_app())
    request_json = {
        "message": "analyze repository structure",
        "workspace": str(tmp_path),
        "debug": True,
    }
    with api.stream("POST", "/v1/chat/stream", json=request_json) as response:
        sse_text = "".join(response.iter_text())

    assert response.status_code == 200
    assert "event: done" in sse_text
    assert seen_in_prompt("Active skill:")
    assert seen_in_prompt("analyze_project")
def test_stream_chat_requests_approval_for_directory_outside_workspace(tmp_path, monkeypatch):
    """Check that listing a directory outside the workspace asks for approval.

    Two sibling directories are created under ``tmp_path``; the request uses
    ``workspace`` as its workspace while the fake ``chat`` asks for ``list_dir``
    on ``outside``. The stream is expected to report a
    ``tool_approval_requested`` event and a ``waiting_for_approval`` status.
    """
    monkeypatch.setenv("DUCK_DB_PATH", str(tmp_path / "duck.sqlite3"))

    workspace = tmp_path / "workspace"
    outside = tmp_path / "outside"
    for directory in (workspace, outside):
        directory.mkdir()
    (outside / "note.txt").write_text("external")

    async def fake_chat(self, role, messages, temperature=None, max_output_tokens=None, response_format=None):
        assert role == "action"
        list_outside = {
            "tool": "list_dir",
            "args": {"path": str(outside)},
            "reason": "User asked for that directory",
        }
        directive = {
            "kind": "action_directive",
            "intent": "list external directory",
            "risk_level": "low",
            "actions": [list_outside],
        }
        return ModelResponse(
            role=role,
            model="local-main",
            content=json.dumps(directive),
            reasoning_content=None,
            raw={},
            latency_ms=1.0,
        )

    # Only chat is patched in this test; stream_chat is left untouched.
    monkeypatch.setattr("duck_core.model_client.ModelClient.chat", fake_chat)

    api = TestClient(create_app())
    request_json = {"message": "show outside", "workspace": str(workspace), "debug": True}
    with api.stream("POST", "/v1/chat/stream", json=request_json) as response:
        sse_text = "".join(response.iter_text())

    assert response.status_code == 200
    assert "event: tool_approval_requested" in sse_text
    assert '"tool": "list_dir"' in sse_text
    assert str(outside) in sse_text
    assert '"status": "waiting_for_approval"' in sse_text
def test_continue_stream_executes_approved_tool_and_streams_answer(tmp_path, monkeypatch):
    """Approve a pending shell command, continue the task and check the answer.

    Flow exercised:
      1. POST ``/v1/chat/stream`` — the fake ``chat`` requests
         ``shell_exec_safe`` with ``uname -a``.
      2. Look up the pending approval for the task and POST ``allow_once``.
      3. POST ``/v1/tasks/{task_id}/continue/stream`` and read the SSE body.
      4. Fetch the conversation and check its last message.

    The fake ``chat`` counts its calls; the test expects exactly one.
    """
    monkeypatch.setenv("DUCK_DB_PATH", str(tmp_path / "duck.sqlite3"))
    action_calls = 0

    async def fake_chat(self, role, messages, temperature=None, max_output_tokens=None, response_format=None):
        nonlocal action_calls
        assert role == "action"
        action_calls += 1
        if any("tool_observations" in message["content"] for message in messages):
            actions = []
        else:
            actions = [
                {
                    "tool": "shell_exec_safe",
                    "args": {"command": "uname -a"},
                    "reason": "User asked for system information",
                }
            ]
        return ModelResponse(
            role=role,
            model="local-main",
            content=json.dumps(
                {
                    "kind": "action_directive",
                    "intent": "run command",
                    "risk_level": "medium",
                    "actions": actions,
                }
            ),
            reasoning_content=None,
            raw={},
            latency_ms=1.0,
        )

    async def fake_stream_chat(self, role, messages):
        assert role == "thinker"
        observation_message = next(message for message in messages if "tool_observations" in message["content"])
        assert "uname" in observation_message["content"]
        yield {"type": "content_delta", "delta": "continued after approval"}

    monkeypatch.setattr("duck_core.model_client.ModelClient.chat", fake_chat)
    monkeypatch.setattr("duck_core.model_client.ModelClient.stream_chat", fake_stream_chat)

    client = TestClient(create_app())
    with client.stream(
        "POST",
        "/v1/chat/stream",
        json={"message": "run uname", "workspace": str(tmp_path), "debug": True},
    ) as response:
        initial_body = "".join(response.iter_text())

    # Fail with a readable message instead of AttributeError on None.
    task_match = re.search(r'"task_id"\s*:\s*"([^"]+)"', initial_body)
    assert task_match is not None, f"no task_id found in stream body: {initial_body!r}"
    task_id = task_match.group(1)

    pending = client.get("/v1/approvals/pending").json()
    approval = next(item for item in pending if item["task_id"] == task_id)
    client.post(f"/v1/approvals/{approval['approval_id']}/allow_once")

    with client.stream(
        "POST",
        f"/v1/tasks/{approval['task_id']}/continue/stream",
        json={"approval_id": approval["approval_id"]},
    ) as response:
        body = "".join(response.iter_text())

    assert "event: tool_approval_requested" in initial_body
    assert response.status_code == 200
    assert '"stage": "running_tool"' in body
    # Presence is asserted before str.index() is used: index() raises
    # ValueError on a missing substring, which would hide which marker is absent.
    assert "event: tool_call_finished" in body
    assert body.index('"stage": "running_tool"') < body.index("event: tool_call_finished")
    assert "event: content_delta" in body
    assert "continued after approval" in body
    assert "event: done" in body
    assert action_calls == 1

    conversation_match = re.search(r'"conversation_id"\s*:\s*"([^"]+)"', initial_body)
    assert conversation_match is not None, f"no conversation_id found in stream body: {initial_body!r}"
    conversation_id = conversation_match.group(1)
    conversation = client.get(f"/v1/conversations/{conversation_id}").json()
    assert conversation["messages"][-1]["content"] == "continued after approval"
    assert conversation["messages"][-1]["status"] == "completed"
def test_continue_stream_requests_password_for_approved_sudo(tmp_path, monkeypatch):
    """Check that continuing an approved ``sudo`` command asks for a password.

    The fake ``chat`` requests ``shell_exec_safe`` with ``sudo apt update``
    until a ``tool_observations`` message is present. After the approval is
    allowed once, the continue stream is expected to report a
    ``tool_password_requested`` event and a ``waiting_for_password`` marker.
    """
    monkeypatch.setenv("DUCK_DB_PATH", str(tmp_path / "duck.sqlite3"))

    async def fake_chat(self, role, messages, temperature=None, max_output_tokens=None, response_format=None):
        assert role == "action"
        actions = []
        if not any("tool_observations" in message["content"] for message in messages):
            actions = [
                {
                    "tool": "shell_exec_safe",
                    "args": {"command": "sudo apt update"},
                    "reason": "Check updates with root privileges",
                }
            ]
        return ModelResponse(
            role=role,
            model="local-main",
            content=json.dumps(
                {
                    "kind": "action_directive",
                    "intent": "run sudo command",
                    "risk_level": "medium",
                    "actions": actions,
                }
            ),
            reasoning_content=None,
            raw={},
            latency_ms=1.0,
        )

    # Only chat is patched in this test; stream_chat is left untouched.
    monkeypatch.setattr("duck_core.model_client.ModelClient.chat", fake_chat)

    client = TestClient(create_app())
    with client.stream(
        "POST",
        "/v1/chat/stream",
        json={"message": "run sudo update", "workspace": str(tmp_path), "debug": True},
    ) as response:
        initial_body = "".join(response.iter_text())

    # Fail with a readable message instead of AttributeError on None.
    task_match = re.search(r'"task_id"\s*:\s*"([^"]+)"', initial_body)
    assert task_match is not None, f"no task_id found in stream body: {initial_body!r}"
    task_id = task_match.group(1)

    approval = next(
        item for item in client.get("/v1/approvals/pending").json()
        if item["task_id"] == task_id
    )
    client.post(f"/v1/approvals/{approval['approval_id']}/allow_once")

    with client.stream(
        "POST",
        f"/v1/tasks/{task_id}/continue/stream",
        json={"approval_id": approval["approval_id"]},
    ) as response:
        body = "".join(response.iter_text())

    assert response.status_code == 200
    assert "event: tool_password_requested" in body
    assert "waiting_for_password" in body
    assert "password" in body.lower()
def test_password_stream_runs_sudo_with_password_and_streams_answer(tmp_path, monkeypatch):
    """Supply a sudo password and check it reaches the process but not the output.

    ``subprocess.run`` as referenced from ``duck_core.tools.shell_exec_safe``
    is replaced with a fake that records its arguments and returns a result
    with return code 0 and stdout ``"sudo updated\\n"``. The test drives the
    chat, approval, continue and password endpoints in turn, then checks that
    the fake received ``"secret\\n"`` as ``input`` and that ``"secret"`` does
    not appear in the streamed body or in the observation seen by the thinker.
    """
    monkeypatch.setenv("DUCK_DB_PATH", str(tmp_path / "duck.sqlite3"))
    calls = []

    class Completed:
        # Stand-in for the object subprocess.run returns.
        returncode = 0
        stdout = "sudo updated\n"
        stderr = ""

    def fake_run(command, **kwargs):
        calls.append((command, kwargs))
        return Completed()

    async def fake_chat(self, role, messages, temperature=None, max_output_tokens=None, response_format=None):
        if role == "action":
            actions = []
            if not any("tool_observations" in message["content"] for message in messages):
                actions = [
                    {
                        "tool": "shell_exec_safe",
                        "args": {"command": "sudo apt update"},
                        "reason": "Check updates with root privileges",
                    }
                ]
            return ModelResponse(
                role=role,
                model="local-main",
                content=json.dumps(
                    {
                        "kind": "action_directive",
                        "intent": "run sudo command",
                        "risk_level": "medium",
                        "actions": actions,
                    }
                ),
                reasoning_content=None,
                raw={},
                latency_ms=1.0,
            )
        raise AssertionError("non-stream thinker should not be used")

    async def fake_stream_chat(self, role, messages):
        assert role == "thinker"
        observation = next(message for message in messages if "tool_observations" in message["content"])
        assert "sudo updated" in observation["content"]
        # The password must not appear in what is handed to the model.
        assert "secret" not in observation["content"]
        yield {"type": "content_delta", "delta": "sudo command completed"}

    monkeypatch.setattr("duck_core.tools.shell_exec_safe.subprocess.run", fake_run)
    monkeypatch.setattr("duck_core.model_client.ModelClient.chat", fake_chat)
    monkeypatch.setattr("duck_core.model_client.ModelClient.stream_chat", fake_stream_chat)

    client = TestClient(create_app())
    with client.stream(
        "POST",
        "/v1/chat/stream",
        json={"message": "run sudo update", "workspace": str(tmp_path), "debug": True},
    ) as response:
        initial_body = "".join(response.iter_text())

    # Fail with a readable message instead of AttributeError on None.
    task_match = re.search(r'"task_id"\s*:\s*"([^"]+)"', initial_body)
    assert task_match is not None, f"no task_id found in stream body: {initial_body!r}"
    task_id = task_match.group(1)

    approval = next(
        item for item in client.get("/v1/approvals/pending").json()
        if item["task_id"] == task_id
    )
    client.post(f"/v1/approvals/{approval['approval_id']}/allow_once")

    with client.stream(
        "POST",
        f"/v1/tasks/{task_id}/continue/stream",
        json={"approval_id": approval["approval_id"]},
    ) as response:
        # Drain the stream; its content is not inspected in this test.
        _ = "".join(response.iter_text())

    with client.stream(
        "POST",
        f"/v1/tasks/{task_id}/password/stream",
        json={"approval_id": approval["approval_id"], "password": "secret"},
    ) as response:
        body = "".join(response.iter_text())

    assert response.status_code == 200
    # Report a clear failure instead of IndexError when the command never ran.
    assert calls, "subprocess.run was never invoked"
    assert calls[0][1]["input"] == "secret\n"
    assert "event: tool_call_finished" in body
    assert "event: content_delta" in body
    assert "sudo command completed" in body
    assert "secret" not in body
def test_command_audit_endpoint_exposes_redacted_shell_events(tmp_path, monkeypatch):
    """Check the first ``/v1/audit/commands`` entry after a shell tool request.

    The fake ``chat`` requests ``shell_exec_safe`` with
    ``apt list --upgradable`` until a ``tool_observations`` message is present.
    The first audit entry must describe that command, and the serialized audit
    list must not contain the word ``password`` in any letter case.
    """
    monkeypatch.setenv("DUCK_DB_PATH", str(tmp_path / "duck.sqlite3"))

    async def fake_chat(self, role, messages, temperature=None, max_output_tokens=None, response_format=None):
        assert role == "action"
        already_observed = any("tool_observations" in message["content"] for message in messages)
        if already_observed:
            requested = []
        else:
            requested = [
                {
                    "tool": "shell_exec_safe",
                    "args": {"command": "apt list --upgradable"},
                    "reason": "Check available updates",
                }
            ]
        directive = {
            "kind": "action_directive",
            "intent": "check updates",
            "risk_level": "low",
            "actions": requested,
        }
        return ModelResponse(
            role=role,
            model="local-main",
            content=json.dumps(directive),
            reasoning_content=None,
            raw={},
            latency_ms=1.0,
        )

    async def fake_stream_chat(self, role, messages):
        yield {"type": "content_delta", "delta": "updates checked"}

    monkeypatch.setattr("duck_core.model_client.ModelClient.chat", fake_chat)
    monkeypatch.setattr("duck_core.model_client.ModelClient.stream_chat", fake_stream_chat)

    api = TestClient(create_app())
    request_json = {"message": "check updates", "workspace": str(tmp_path), "debug": True}
    with api.stream("POST", "/v1/chat/stream", json=request_json) as response:
        # Drain the stream; only the audit log is inspected afterwards.
        _ = "".join(response.iter_text())
    audit = api.get("/v1/audit/commands").json()

    assert response.status_code == 200
    first_entry = audit[0]
    assert first_entry["event_type"] == "command_audit"
    details = first_entry["payload"]
    assert details["command"] == "apt list --upgradable"
    assert details["risk_level"] == "low"
    assert details["action_type"] == "package_check"
    assert details["approved"] is False
    assert "password" not in json.dumps(audit).lower()