"""API tests for Duck's streaming chat, tool-approval, sudo-password and audit endpoints.

Every test points ``DUCK_DB_PATH`` at a per-test SQLite file and monkeypatches
``ModelClient.chat`` / ``ModelClient.stream_chat`` so no real model is contacted.
"""

import json
import re

from fastapi.testclient import TestClient

from duck_core.model_client import ModelResponse
from duck_core.api import create_app


def _action_response(role, intent, risk_level, actions):
    """Build a fake ``ModelResponse`` whose content is an ``action_directive`` JSON payload."""
    return ModelResponse(
        role=role,
        model="local-main",
        content=json.dumps(
            {
                "kind": "action_directive",
                "intent": intent,
                "risk_level": risk_level,
                "actions": actions,
            }
        ),
        reasoning_content=None,
        raw={},
        latency_ms=1.0,
    )


def _has_tool_observations(messages):
    """Return True when any message content mentions ``tool_observations``."""
    return any("tool_observations" in message["content"] for message in messages)


def _extract_json_string(body, key):
    """Return the first string value of JSON field ``key`` found in an SSE ``body``.

    Fails with a readable assertion (instead of ``AttributeError`` on ``None``)
    when the field is absent.
    """
    match = re.search(rf'"{re.escape(key)}"\s*:\s*"([^"]+)"', body)
    assert match is not None, f"{key!r} not found in stream body: {body!r}"
    return match.group(1)


def _stream(client, path, payload):
    """POST ``payload`` to an SSE endpoint and return ``(response, full_text_body)``."""
    with client.stream("POST", path, json=payload) as response:
        body = "".join(response.iter_text())
    return response, body


def _allow_pending_approval(client, task_id):
    """Find the pending approval for ``task_id``, allow it once, and return its record."""
    pending = client.get("/v1/approvals/pending").json()
    approval = next((item for item in pending if item["task_id"] == task_id), None)
    assert approval is not None, f"no pending approval for task {task_id!r}: {pending!r}"
    client.post(f"/v1/approvals/{approval['approval_id']}/allow_once")
    return approval


def _sudo_apt_update_response(role, messages):
    """Plan ``sudo apt update`` until tool observations are present, then plan no actions."""
    actions = []
    if not _has_tool_observations(messages):
        actions = [
            {
                "tool": "shell_exec_safe",
                "args": {"command": "sudo apt update"},
                "reason": "Check updates with root privileges",
            }
        ]
    return _action_response(role, "run sudo command", "medium", actions)


def test_stream_chat_endpoint_emits_sse_reasoning_and_content(tmp_path, monkeypatch):
    monkeypatch.setenv("DUCK_DB_PATH", str(tmp_path / "duck.sqlite3"))

    # Accept the same keyword arguments as the other fakes so the runtime can pass them.
    async def fake_chat(self, role, messages, temperature=None, max_output_tokens=None, response_format=None):
        return _action_response(role, "answer directly", "none", [])

    async def fake_stream_chat(self, role, messages):
        yield {"type": "reasoning_delta", "delta": "thinking"}
        yield {"type": "content_delta", "delta": "answer"}

    monkeypatch.setattr("duck_core.model_client.ModelClient.chat", fake_chat)
    monkeypatch.setattr("duck_core.model_client.ModelClient.stream_chat", fake_stream_chat)
    client = TestClient(create_app())

    # Use the per-test tmp dir rather than a CWD-relative "./workspace".
    response, body = _stream(
        client,
        "/v1/chat/stream",
        {"message": "hello", "workspace": str(tmp_path), "debug": True},
    )

    assert response.status_code == 200
    assert "event: reasoning_delta" in body
    assert "event: content_delta" in body
    assert "event: done" in body
    assert '"generation_stats":' in body
    assert '"min_tokens_per_second":' in body
    assert '"avg_tokens_per_second":' in body
    assert '"max_tokens_per_second":' in body
    assert "thinking" in body
    assert "answer" in body


def test_stream_chat_endpoint_executes_tool_before_streaming_answer(tmp_path, monkeypatch):
    monkeypatch.setenv("DUCK_DB_PATH", str(tmp_path / "duck.sqlite3"))
    (tmp_path / "note.txt").write_text("stream tool content")

    async def fake_chat(self, role, messages, temperature=None, max_output_tokens=None, response_format=None):
        assert role == "action"
        if _has_tool_observations(messages):
            actions = []
        else:
            actions = [
                {
                    "tool": "file_read",
                    "args": {"path": "note.txt"},
                    "reason": "User asked for file contents",
                }
            ]
        return _action_response(role, "read requested file", "low", actions)

    async def fake_stream_chat(self, role, messages):
        assert role == "thinker"
        assert _has_tool_observations(messages)
        yield {"type": "content_delta", "delta": "answer from tool"}

    monkeypatch.setattr("duck_core.model_client.ModelClient.chat", fake_chat)
    monkeypatch.setattr("duck_core.model_client.ModelClient.stream_chat", fake_stream_chat)
    client = TestClient(create_app())

    response, body = _stream(
        client,
        "/v1/chat/stream",
        {"message": "read note.txt", "workspace": str(tmp_path), "debug": True},
    )

    assert response.status_code == 200
    assert 'event: runtime_status\ndata: {"task_id":' in body
    assert '"stage": "planning"' in body
    # Check presence before ordering so a missing event fails with a clear message.
    assert "event: tool_call_started" in body
    assert body.index('"stage": "planning"') < body.index("event: tool_call_started")
    assert "event: tool_call_finished" in body
    assert "stream tool content" in body
    assert "event: content_delta" in body
    assert "answer from tool" in body
    assert "event: done" in body


def test_stream_chat_requests_approval_for_directory_outside_workspace(tmp_path, monkeypatch):
    monkeypatch.setenv("DUCK_DB_PATH", str(tmp_path / "duck.sqlite3"))
    workspace = tmp_path / "workspace"
    outside = tmp_path / "outside"
    workspace.mkdir()
    outside.mkdir()
    (outside / "note.txt").write_text("external")

    async def fake_chat(self, role, messages, temperature=None, max_output_tokens=None, response_format=None):
        assert role == "action"
        actions = [
            {
                "tool": "list_dir",
                "args": {"path": str(outside)},
                "reason": "User asked for that directory",
            }
        ]
        return _action_response(role, "list external directory", "low", actions)

    monkeypatch.setattr("duck_core.model_client.ModelClient.chat", fake_chat)
    client = TestClient(create_app())

    response, body = _stream(
        client,
        "/v1/chat/stream",
        {"message": "show outside", "workspace": str(workspace), "debug": True},
    )

    assert response.status_code == 200
    assert "event: tool_approval_requested" in body
    assert '"tool": "list_dir"' in body
    assert str(outside) in body
    assert '"status": "waiting_for_approval"' in body


def test_continue_stream_executes_approved_tool_and_streams_answer(tmp_path, monkeypatch):
    monkeypatch.setenv("DUCK_DB_PATH", str(tmp_path / "duck.sqlite3"))
    action_calls = 0

    async def fake_chat(self, role, messages, temperature=None, max_output_tokens=None, response_format=None):
        nonlocal action_calls
        assert role == "action"
        action_calls += 1
        if _has_tool_observations(messages):
            actions = []
        else:
            actions = [
                {
                    "tool": "shell_exec_safe",
                    "args": {"command": "uname -a"},
                    "reason": "User asked for system information",
                }
            ]
        return _action_response(role, "run command", "medium", actions)

    async def fake_stream_chat(self, role, messages):
        assert role == "thinker"
        observation_message = next(message for message in messages if "tool_observations" in message["content"])
        assert "uname" in observation_message["content"]
        yield {"type": "content_delta", "delta": "continued after approval"}

    monkeypatch.setattr("duck_core.model_client.ModelClient.chat", fake_chat)
    monkeypatch.setattr("duck_core.model_client.ModelClient.stream_chat", fake_stream_chat)
    client = TestClient(create_app())

    _, initial_body = _stream(
        client,
        "/v1/chat/stream",
        {"message": "run uname", "workspace": str(tmp_path), "debug": True},
    )
    # The first stream must pause on approval before we try to continue it.
    assert "event: tool_approval_requested" in initial_body
    task_id = _extract_json_string(initial_body, "task_id")
    approval = _allow_pending_approval(client, task_id)

    response, body = _stream(
        client,
        f"/v1/tasks/{task_id}/continue/stream",
        {"approval_id": approval["approval_id"]},
    )

    assert response.status_code == 200
    assert '"stage": "running_tool"' in body
    assert "event: tool_call_finished" in body
    assert body.index('"stage": "running_tool"') < body.index("event: tool_call_finished")
    assert "event: content_delta" in body
    assert "continued after approval" in body
    assert "event: done" in body
    # Continuing must not re-plan: only the initial planning call hits the action model.
    assert action_calls == 1

    conversation_id = _extract_json_string(initial_body, "conversation_id")
    conversation = client.get(f"/v1/conversations/{conversation_id}").json()
    assert conversation["messages"][-1]["content"] == "continued after approval"
    assert conversation["messages"][-1]["status"] == "completed"


def test_continue_stream_requests_password_for_approved_sudo(tmp_path, monkeypatch):
    monkeypatch.setenv("DUCK_DB_PATH", str(tmp_path / "duck.sqlite3"))

    async def fake_chat(self, role, messages, temperature=None, max_output_tokens=None, response_format=None):
        assert role == "action"
        return _sudo_apt_update_response(role, messages)

    monkeypatch.setattr("duck_core.model_client.ModelClient.chat", fake_chat)
    client = TestClient(create_app())

    _, initial_body = _stream(
        client,
        "/v1/chat/stream",
        {"message": "run sudo update", "workspace": str(tmp_path), "debug": True},
    )
    task_id = _extract_json_string(initial_body, "task_id")
    approval = _allow_pending_approval(client, task_id)

    response, body = _stream(
        client,
        f"/v1/tasks/{task_id}/continue/stream",
        {"approval_id": approval["approval_id"]},
    )

    assert response.status_code == 200
    assert "event: tool_password_requested" in body
    assert "waiting_for_password" in body
    assert "password" in body.lower()


def test_password_stream_runs_sudo_with_password_and_streams_answer(tmp_path, monkeypatch):
    monkeypatch.setenv("DUCK_DB_PATH", str(tmp_path / "duck.sqlite3"))
    calls = []

    class Completed:
        # Minimal stand-in for subprocess.CompletedProcess.
        returncode = 0
        stdout = "sudo updated\n"
        stderr = ""

    def fake_run(command, **kwargs):
        calls.append((command, kwargs))
        return Completed()

    async def fake_chat(self, role, messages, temperature=None, max_output_tokens=None, response_format=None):
        if role == "action":
            return _sudo_apt_update_response(role, messages)
        raise AssertionError("non-stream thinker should not be used")

    async def fake_stream_chat(self, role, messages):
        assert role == "thinker"
        observation = next(message for message in messages if "tool_observations" in message["content"])
        assert "sudo updated" in observation["content"]
        # The sudo password must never leak into model context.
        assert "secret" not in observation["content"]
        yield {"type": "content_delta", "delta": "sudo command completed"}

    monkeypatch.setattr("duck_core.tools.shell_exec_safe.subprocess.run", fake_run)
    monkeypatch.setattr("duck_core.model_client.ModelClient.chat", fake_chat)
    monkeypatch.setattr("duck_core.model_client.ModelClient.stream_chat", fake_stream_chat)
    client = TestClient(create_app())

    _, initial_body = _stream(
        client,
        "/v1/chat/stream",
        {"message": "run sudo update", "workspace": str(tmp_path), "debug": True},
    )
    task_id = _extract_json_string(initial_body, "task_id")
    approval = _allow_pending_approval(client, task_id)

    # Continue until the task pauses for the sudo password; the body is not inspected here.
    _stream(
        client,
        f"/v1/tasks/{task_id}/continue/stream",
        {"approval_id": approval["approval_id"]},
    )

    response, body = _stream(
        client,
        f"/v1/tasks/{task_id}/password/stream",
        {"approval_id": approval["approval_id"], "password": "secret"},
    )

    assert response.status_code == 200
    assert calls, "shell_exec_safe never invoked subprocess.run"
    assert calls[0][1]["input"] == "secret\n"
    assert "event: tool_call_finished" in body
    assert "event: content_delta" in body
    assert "sudo command completed" in body
    assert "secret" not in body


def test_command_audit_endpoint_exposes_redacted_shell_events(tmp_path, monkeypatch):
    monkeypatch.setenv("DUCK_DB_PATH", str(tmp_path / "duck.sqlite3"))

    async def fake_chat(self, role, messages, temperature=None, max_output_tokens=None, response_format=None):
        assert role == "action"
        actions = []
        if not _has_tool_observations(messages):
            actions = [
                {
                    "tool": "shell_exec_safe",
                    "args": {"command": "apt list --upgradable"},
                    "reason": "Check available updates",
                }
            ]
        return _action_response(role, "check updates", "low", actions)

    async def fake_stream_chat(self, role, messages):
        yield {"type": "content_delta", "delta": "updates checked"}

    monkeypatch.setattr("duck_core.model_client.ModelClient.chat", fake_chat)
    monkeypatch.setattr("duck_core.model_client.ModelClient.stream_chat", fake_stream_chat)
    client = TestClient(create_app())

    response, _ = _stream(
        client,
        "/v1/chat/stream",
        {"message": "check updates", "workspace": str(tmp_path), "debug": True},
    )
    audit = client.get("/v1/audit/commands").json()

    assert response.status_code == 200
    assert audit, "expected at least one command audit event"
    assert audit[0]["event_type"] == "command_audit"
    assert audit[0]["payload"]["command"] == "apt list --upgradable"
    assert audit[0]["payload"]["risk_level"] == "low"
    assert audit[0]["payload"]["action_type"] == "package_check"
    assert audit[0]["payload"]["approved"] is False
    assert "password" not in json.dumps(audit).lower()