From 1291281d7e10d533a9c64302d43b68f88f1e3032 Mon Sep 17 00:00:00 2001 From: mirivlad Date: Fri, 22 May 2026 21:13:58 +0800 Subject: [PATCH] Stabilize live chat tool loop --- .env.example | 1 + CURRENT_STATE.md | 12 +++-- docs/how_to_run.md | 4 ++ docs/web_api.md | 3 ++ duck_core/api.py | 34 +++++++++---- duck_core/config.py | 2 + duck_core/runtime_loop.py | 68 ++++++++++++++++++++++--- tests/smoke/test_api_stream_chat.py | 1 + tests/smoke/test_runtime_tools.py | 77 +++++++++++++++++++++++++++++ 9 files changed, 182 insertions(+), 20 deletions(-) diff --git a/.env.example b/.env.example index 2d3a9b1..31095b0 100644 --- a/.env.example +++ b/.env.example @@ -21,6 +21,7 @@ DUCK_MAX_INPUT_TOKENS=49152 DUCK_MAX_RECENT_EVENTS_TOKENS=12000 DUCK_MAX_MEMORY_TOKENS=8000 DUCK_MAX_SKILL_TOKENS=6000 +DUCK_ENABLE_REFLECTION=0 QDRANT_URL=http://127.0.0.1:6333 diff --git a/CURRENT_STATE.md b/CURRENT_STATE.md index dcdefb5..0258725 100644 --- a/CURRENT_STATE.md +++ b/CURRENT_STATE.md @@ -48,9 +48,11 @@ WebChat доступен через FastAPI на `http://127.0.0.1:8000/`. - MemoryStore в SQLite. - MemoryPolicy через LLM role `memory_policy` с fallback в безопасный no-store режим. - Structured JSON validation для `action` и `memory_policy`: невалидный JSON/schema violation не запускает tools и уходит в безопасный fallback. +- Tool observations компактируются перед повторной подачей в model context; полные outputs остаются в event/audit log. +- Duplicate tool actions в рамках одной задачи пропускаются, чтобы модель не выполняла одну и ту же команду повторно. - VectorMemory adapter для Qdrant с локальной embedding-моделью или remote embeddings endpoint. - Recall-фильтрация памяти через `recall` role. -- Reflection через `critic` role. +- Reflection через `critic` role доступна, но выключена по умолчанию в API (`DUCK_ENABLE_REFLECTION=0`), чтобы не забивать single-slot llama во время интерактивного чата. - ExperienceRecorder и skill update proposals. - Scripts для llama-server, verification и benchmark. - Docker compose для Qdrant. @@ -69,6 +71,8 @@ WebChat доступен через FastAPI на `http://127.0.0.1:8000/`. - Skill candidate selection теперь используется в обычном и streaming chat. - `scripts/duck.sh status --probe` и `scripts/duck-mtp.sh status --probe` показывают live-состояние DuckLM runtime, model backend и vector memory. - Structured utility-outputs валидируются локально по JSON schema; это защищает tool loop и memory writes от мусора модели. +- Live E2E выявил и исправил два runtime-дефекта: большие stdout больше не раздувают следующий planning prompt, повторяющиеся identical actions больше не исполняются повторно. +- Critic reflection по умолчанию выключена для API/WebChat, потому что на одном `llama-server --parallel 1` она конкурировала с пользовательскими запросами и вызывала timeouts. ## Соответствие этапам из Ducklm.md @@ -88,6 +92,7 @@ WebChat доступен через FastAPI на `http://127.0.0.1:8000/`. ## Остаточные ограничения - Qdrant и локальная embedding-модель должны быть доступны отдельно; при ошибках vector memory деградирует без падения runtime. +- `DUCK_ENABLE_REFLECTION=1` включает critic reflection после задач, но для single-slot llama это может заметно тормозить последующие запросы. - Token speed считается приближённо по текущему estimator, а не по tokenizer конкретной модели. - Skill selection сейчас keyword-based. LLM selection можно добавить позже, если понадобится. - WebChat остаётся lightweight vanilla JS UI; это не production frontend framework. @@ -151,5 +156,6 @@ bash scripts/duck-mtp.sh logs --follow 1. Пройти live E2E checklist в WebChat на реальной модели. 2. Вынести runtime/model role routing в явный конфиг с fallback-политикой, оставив Qwen основным backend для всех ролей. 3. Расширить strict validation/fallback на `recall` и будущие structured utility-roles. -4. При необходимости заменить keyword skill selection на LLM-based selection. -5. Позже мигрировать FastAPI startup на lifespan. +4. Добавить WebChat runtime/status panel поверх `/v1/status?probe=true`. +5. При необходимости заменить keyword skill selection на LLM-based selection. +6. Позже мигрировать FastAPI startup на lifespan. diff --git a/docs/how_to_run.md b/docs/how_to_run.md index 904939e..61bb29f 100644 --- a/docs/how_to_run.md +++ b/docs/how_to_run.md @@ -16,6 +16,10 @@ cp .env.example .env The default `DUCK_MAIN_MODEL_PATH` points to `./models/Qwen3.6/nonMTP/Qwen3.6-35B-A3B-UD-Q4_K_M.gguf`. +`DUCK_ENABLE_REFLECTION=0` is the recommended default for the local single-slot +stack. Set it to `1` only when you explicitly want critic reflection after each +chat and accept that it can slow down the next request. + 3. Start DuckLM: ```bash diff --git a/docs/web_api.md b/docs/web_api.md index 8c0586a..e154db6 100644 --- a/docs/web_api.md +++ b/docs/web_api.md @@ -51,6 +51,9 @@ Structured utility roles are validated locally before side effects: - `action` output must match `duck_core/schemas/action_directive.schema.json`; invalid directives are logged as `action_directive_failed` and no tool runs. - `memory_policy` output must match its JSON schema; invalid decisions fall back to `should_store=false`. +- Large tool outputs are compacted before being sent back to the model; full outputs remain in task events and command audit. +- Duplicate identical tool actions in one task are skipped and logged as `tool_call_skipped`. +- Critic reflection is controlled by `DUCK_ENABLE_REFLECTION`; the default is off for responsive single-slot local chat. Chat requests accept optional `reasoning`: diff --git a/duck_core/api.py b/duck_core/api.py index 7a70ff5..a2f4296 100644 --- a/duck_core/api.py +++ b/duck_core/api.py @@ -261,6 +261,7 @@ def create_app() -> FastAPI: memory_records=memory_records, skill_summary=await selected_skill_summary(body.message), reasoning=body.reasoning, + reflect=bool(settings.enable_reflection), ) await conversations.add_message( conversation.conversation_id, @@ -454,8 +455,7 @@ def create_app() -> FastAPI: *messages, { "role": "user", - "content": "tool_observations:\n" - + json.dumps(tool_observations, ensure_ascii=False, indent=2), + "content": runtime.format_tool_observations_for_model(tool_observations), }, ] yield runtime_status( @@ -531,7 +531,13 @@ def create_app() -> FastAPI: "generation_stats": generation_stats.summary(), }, ) - asyncio.create_task(runtime.complete_postprocessing(task.task_id, content)) + asyncio.create_task( + runtime.complete_postprocessing( + task.task_id, + content, + reflect=bool(settings.enable_reflection), + ) + ) yield sse( "done", { @@ -707,8 +713,7 @@ def create_app() -> FastAPI: *messages, { "role": "user", - "content": "tool_observations:\n" - + json.dumps(tool_observations, ensure_ascii=False, indent=2), + "content": runtime.format_tool_observations_for_model(tool_observations), }, ] yield runtime_status(task_id, "answering", "Формирую ответ...") @@ -763,7 +768,13 @@ def create_app() -> FastAPI: "generation_stats": generation_stats.summary(), }, ) - asyncio.create_task(runtime.complete_postprocessing(task_id, content)) + asyncio.create_task( + runtime.complete_postprocessing( + task_id, + content, + reflect=bool(settings.enable_reflection), + ) + ) yield sse( "done", { @@ -878,8 +889,7 @@ def create_app() -> FastAPI: *messages, { "role": "user", - "content": "tool_observations:\n" - + json.dumps(tool_observations, ensure_ascii=False, indent=2), + "content": runtime.format_tool_observations_for_model(tool_observations), }, ] yield runtime_status(task_id, "answering", "Формирую ответ...") @@ -934,7 +944,13 @@ def create_app() -> FastAPI: "generation_stats": generation_stats.summary(), }, ) - asyncio.create_task(runtime.complete_postprocessing(task_id, content)) + asyncio.create_task( + runtime.complete_postprocessing( + task_id, + content, + reflect=bool(settings.enable_reflection), + ) + ) yield sse( "done", { diff --git a/duck_core/config.py b/duck_core/config.py index 0bb0e0e..15db249 100644 --- a/duck_core/config.py +++ b/duck_core/config.py @@ -23,6 +23,7 @@ class Settings: max_memory_tokens: int = 8000 max_skill_tokens: int = 6000 qdrant_url: str = "http://127.0.0.1:6333" + enable_reflection: int = 0 skip_live_llm_tests: int = 0 @property @@ -52,5 +53,6 @@ def get_settings() -> Settings: max_memory_tokens=int(os.getenv("DUCK_MAX_MEMORY_TOKENS", "8000")), max_skill_tokens=int(os.getenv("DUCK_MAX_SKILL_TOKENS", "6000")), qdrant_url=os.getenv("QDRANT_URL", "http://127.0.0.1:6333"), + enable_reflection=int(os.getenv("DUCK_ENABLE_REFLECTION", "0")), skip_live_llm_tests=int(os.getenv("DUCK_SKIP_LIVE_LLM_TESTS", "0")), ) diff --git a/duck_core/runtime_loop.py b/duck_core/runtime_loop.py index 752f103..f2d8541 100644 --- a/duck_core/runtime_loop.py +++ b/duck_core/runtime_loop.py @@ -19,6 +19,7 @@ from duck_core.tools.gateway import ToolGateway logger = logging.getLogger(__name__) ACTION_DIRECTIVE_SCHEMA = load_json_schema("duck_core/schemas/action_directive.schema.json") +MAX_TOOL_OBSERVATION_TEXT_CHARS = 2000 @dataclass @@ -96,8 +97,7 @@ class RuntimeLoop: *messages, { "role": "user", - "content": "tool_observations:\n" - + json.dumps(tool_observations, ensure_ascii=False, indent=2), + "content": self.format_tool_observations_for_model(tool_observations), }, ] await self.event_store.append( @@ -232,8 +232,7 @@ class RuntimeLoop: *messages, { "role": "user", - "content": "tool_observations:\n" - + json.dumps(tool_observations, ensure_ascii=False, indent=2), + "content": self.format_tool_observations_for_model(tool_observations), }, ] await self.event_store.append(task_id, "model_call_started", {"role": "thinker"}) @@ -394,9 +393,13 @@ class RuntimeLoop: "tool": tool_name, "reason": action.get("reason"), "decision": decision, + "action": action, "result": result_payload, } + def _action_key(self, action: dict[str, Any]) -> str: + return json.dumps(action, ensure_ascii=False, sort_keys=True, separators=(",", ":")) + async def _approval_action_index(self, task_id: str, action: dict[str, Any]) -> int: events = await self.event_store.list_events(task_id) for event in reversed(events): @@ -413,6 +416,7 @@ class RuntimeLoop: messages: list[dict[str, str]], workspace: str | None, start_index: int = 1, + seen_action_keys: set[str] | None = None, ) -> list[dict[str, Any]]: try: await self.event_store.append(task_id, "model_call_started", {"role": "action"}) @@ -443,6 +447,21 @@ class RuntimeLoop: {"index": index, "ok": False, "error": "Action must be an object"} ) continue + action_key = self._action_key(action) + if seen_action_keys is not None and action_key in seen_action_keys: + await self.event_store.append( + task_id, + "tool_call_skipped", + { + "index": index, + "tool": str(action.get("tool", "")), + "reason": "duplicate_action", + "action": action, + }, + ) + continue + if seen_action_keys is not None: + seen_action_keys.add(action_key) tool_name = str(action.get("tool", "")) await self.event_store.append( task_id, @@ -478,6 +497,7 @@ class RuntimeLoop: "index": index, "tool": tool_name, "reason": action.get("reason"), + "action": action, "requires_approval": True, "approval_id": approval.approval_id if approval else None, "result": result_payload, @@ -494,11 +514,39 @@ class RuntimeLoop: "index": index, "tool": tool_name, "reason": action.get("reason"), + "action": action, "result": result_payload, } ) return observations + def format_tool_observations_for_model(self, observations: list[dict[str, Any]]) -> str: + return "tool_observations:\n" + json.dumps( + self.compact_tool_observations_for_model(observations), + ensure_ascii=False, + indent=2, + ) + + def compact_tool_observations_for_model( + self, observations: list[dict[str, Any]] + ) -> list[dict[str, Any]]: + return [self._compact_observation_for_model(observation) for observation in observations] + + def _compact_observation_for_model(self, value: Any) -> Any: + if isinstance(value, dict): + return {key: self._compact_observation_for_model(item) for key, item in value.items()} + if isinstance(value, list): + return [self._compact_observation_for_model(item) for item in value] + if isinstance(value, str) and len(value) > MAX_TOOL_OBSERVATION_TEXT_CHARS: + half = MAX_TOOL_OBSERVATION_TEXT_CHARS // 2 + omitted = len(value) - MAX_TOOL_OBSERVATION_TEXT_CHARS + return ( + value[:half] + + f"\n... [truncated {omitted} chars for model context] ...\n" + + value[-half:] + ) + return value + async def _append_command_audit( self, task_id: str, @@ -538,14 +586,18 @@ class RuntimeLoop: initial_observations: list[dict[str, Any]] | None = None, ) -> list[dict[str, Any]]: all_observations = list(initial_observations or []) + seen_action_keys = { + self._action_key(observation["action"]) + for observation in all_observations + if isinstance(observation.get("action"), dict) + } current_messages = messages if all_observations: current_messages = [ *messages, { "role": "user", - "content": "tool_observations:\n" - + json.dumps(all_observations, ensure_ascii=False, indent=2), + "content": self.format_tool_observations_for_model(all_observations), }, ] for _ in range(self.max_tool_iterations): @@ -554,6 +606,7 @@ class RuntimeLoop: current_messages, workspace, start_index=len(all_observations) + 1, + seen_action_keys=seen_action_keys, ) if not observations: return all_observations @@ -564,8 +617,7 @@ class RuntimeLoop: *messages, { "role": "user", - "content": "tool_observations:\n" - + json.dumps(all_observations, ensure_ascii=False, indent=2), + "content": self.format_tool_observations_for_model(all_observations), }, ] await self.event_store.append( diff --git a/tests/smoke/test_api_stream_chat.py b/tests/smoke/test_api_stream_chat.py index 911c9d4..833d58d 100644 --- a/tests/smoke/test_api_stream_chat.py +++ b/tests/smoke/test_api_stream_chat.py @@ -113,6 +113,7 @@ def test_stream_chat_forwards_reasoning_toggle_to_thinker(tmp_path, monkeypatch) def test_stream_chat_runs_memory_policy_and_reflection_after_completion(tmp_path, monkeypatch): monkeypatch.setenv("DUCK_DB_PATH", str(tmp_path / "duck.sqlite3")) + monkeypatch.setenv("DUCK_ENABLE_REFLECTION", "1") async def fake_chat(self, role, messages, temperature=None, max_output_tokens=None, response_format=None): if role == "action": diff --git a/tests/smoke/test_runtime_tools.py b/tests/smoke/test_runtime_tools.py index 7297278..06df179 100644 --- a/tests/smoke/test_runtime_tools.py +++ b/tests/smoke/test_runtime_tools.py @@ -176,6 +176,41 @@ class FakeMalformedActionModelClient: ) +class FakeRepeatingActionModelClient: + async def chat(self, role, messages): + if role == "action": + return ModelResponse( + role=role, + model="local-main", + content=json.dumps( + { + "kind": "action_directive", + "intent": "repeat same action", + "risk_level": "low", + "actions": [ + { + "tool": "file_read", + "args": {"path": "note.txt"}, + "reason": "Read requested file", + } + ], + } + ), + reasoning_content=None, + raw={}, + latency_ms=5.0, + ) + assert role == "thinker" + return ModelResponse( + role=role, + model="local-main", + content="Final answer from first observation.", + reasoning_content=None, + raw={}, + latency_ms=12.0, + ) + + @pytest.mark.asyncio async def test_runtime_executes_action_directive_tool_and_finishes_with_observation(tmp_path): (tmp_path / "note.txt").write_text("hello from tool") @@ -253,6 +288,48 @@ async def test_runtime_rejects_malformed_action_directive_before_tools(tmp_path) assert not any(event.event_type == "tool_call_started" for event in events) +def test_runtime_compacts_large_tool_observations_for_model_context(tmp_path): + db_path = str(tmp_path / "duck.sqlite3") + task_store = TaskStore(db_path) + event_store = EventStore(db_path) + loop = RuntimeLoop(task_store, event_store, FakeToolModelClient()) + + compact = loop.format_tool_observations_for_model([ + { + "tool": "shell_exec_safe", + "result": { + "ok": True, + "output": "A" * 2500 + "KEEP_TAIL", + "metadata": {"command": "ls /tmp"}, + }, + } + ]) + + assert "tool_observations" in compact + assert "truncated" in compact + assert "KEEP_TAIL" in compact + assert len(compact) < 2300 + + +@pytest.mark.asyncio +async def test_runtime_skips_duplicate_action_within_same_task(tmp_path): + (tmp_path / "note.txt").write_text("hello once") + db_path = str(tmp_path / "duck.sqlite3") + task_store = TaskStore(db_path) + event_store = EventStore(db_path) + loop = RuntimeLoop(task_store, event_store, FakeRepeatingActionModelClient()) + + result = await loop.run_chat("read note.txt", str(tmp_path), debug=True) + events = await event_store.list_events(result.task_id) + finished_tools = [event for event in events if event.event_type == "tool_call_finished"] + skipped_tools = [event for event in events if event.event_type == "tool_call_skipped"] + + assert result.status == "completed" + assert len(finished_tools) == 1 + assert len(skipped_tools) == 1 + assert skipped_tools[0].payload["reason"] == "duplicate_action" + + class FakeApprovalModelClient: async def chat(self, role, messages): if role == "action":