From eef010f227ff868baac4ffc2c4a0a60e7fe84c1e Mon Sep 17 00:00:00 2001 From: mirivlad Date: Fri, 22 May 2026 07:48:01 +0800 Subject: [PATCH] Validate structured utility outputs --- CURRENT_STATE.md | 4 ++- docs/web_api.md | 5 +++ duck_core/memory/policy.py | 28 ++++++++--------- duck_core/runtime_loop.py | 8 ++++- duck_core/structured_output.py | 38 +++++++++++++++++++++++ tests/smoke/test_memory_policy.py | 24 +++++++++++++++ tests/smoke/test_runtime_tools.py | 51 +++++++++++++++++++++++++++++++ 7 files changed, 141 insertions(+), 17 deletions(-) create mode 100644 duck_core/structured_output.py diff --git a/CURRENT_STATE.md b/CURRENT_STATE.md index b12e803..dcdefb5 100644 --- a/CURRENT_STATE.md +++ b/CURRENT_STATE.md @@ -47,6 +47,7 @@ WebChat доступен через FastAPI на `http://127.0.0.1:8000/`. - Автоматический выбор candidate skill по ключевым словам и добавление skill summary в context. - MemoryStore в SQLite. - MemoryPolicy через LLM role `memory_policy` с fallback в безопасный no-store режим. +- Structured JSON validation для `action` и `memory_policy`: невалидный JSON/schema violation не запускает tools и уходит в безопасный fallback. - VectorMemory adapter для Qdrant с локальной embedding-моделью или remote embeddings endpoint. - Recall-фильтрация памяти через `recall` role. - Reflection через `critic` role. @@ -67,6 +68,7 @@ WebChat доступен через FastAPI на `http://127.0.0.1:8000/`. - experience records - Skill candidate selection теперь используется в обычном и streaming chat. - `scripts/duck.sh status --probe` и `scripts/duck-mtp.sh status --probe` показывают live-состояние DuckLM runtime, model backend и vector memory. +- Structured utility-outputs валидируются локально по JSON schema; это защищает tool loop и memory writes от мусора модели. ## Соответствие этапам из Ducklm.md @@ -148,6 +150,6 @@ bash scripts/duck-mtp.sh logs --follow 1. Пройти live E2E checklist в WebChat на реальной модели. 2. Вынести runtime/model role routing в явный конфиг с fallback-политикой, оставив Qwen основным backend для всех ролей. -3. Добавить строгую JSON validation/fallback для structured utility-ролей. +3. Расширить strict validation/fallback на `recall` и будущие structured utility-roles. 4. При необходимости заменить keyword skill selection на LLM-based selection. 5. Позже мигрировать FastAPI startup на lifespan. diff --git a/docs/web_api.md b/docs/web_api.md index e21ca28..8c0586a 100644 --- a/docs/web_api.md +++ b/docs/web_api.md @@ -47,6 +47,11 @@ GET /v1/memory/search?q=... Use `GET /v1/status?probe=true` to also call the model backend and Qdrant. +Structured utility roles are validated locally before side effects: + +- `action` output must match `duck_core/schemas/action_directive.schema.json`; invalid directives are logged as `action_directive_failed` and no tool runs. +- `memory_policy` output must match its JSON schema; invalid decisions fall back to `should_store=false`. + Chat requests accept optional `reasoning`: ```json diff --git a/duck_core/memory/policy.py b/duck_core/memory/policy.py index 407523b..fbb8780 100644 --- a/duck_core/memory/policy.py +++ b/duck_core/memory/policy.py @@ -1,11 +1,12 @@ from __future__ import annotations -import json import logging from typing import Any from pydantic import BaseModel +from duck_core.structured_output import load_json_object, validate_json_object + logger = logging.getLogger(__name__) @@ -124,26 +125,23 @@ class MemoryPolicy: def _parse_response(self, content: str, summary: str, task_id: str) -> MemoryDecision: try: - data = json.loads(content) - except (json.JSONDecodeError, TypeError): + data = validate_json_object( + load_json_object(content, "memory policy"), + self._RESPONSE_SCHEMA, + "memory policy", + ) + except ValueError as exc: logger.warning("MemoryPolicy: invalid JSON for %s: %s", task_id, content[:200]) return MemoryDecision( should_store=False, memory_type="event", summary=summary, importance=0.0, - metadata={"task_id": task_id, "source": "llm_policy_fallback"}, - ) - - required = ("should_store", "memory_type", "summary", "importance", "scope") - if not all(key in data for key in required): - logger.warning("MemoryPolicy: missing fields for %s: %s", task_id, list(data.keys())) - return MemoryDecision( - should_store=False, - memory_type="event", - summary=summary, - importance=0.0, - metadata={"task_id": task_id, "source": "llm_policy_fallback"}, + metadata={ + "task_id": task_id, + "source": "llm_policy_fallback", + "error": str(exc), + }, ) return MemoryDecision( diff --git a/duck_core/runtime_loop.py b/duck_core/runtime_loop.py index b448eee..752f103 100644 --- a/duck_core/runtime_loop.py +++ b/duck_core/runtime_loop.py @@ -12,11 +12,13 @@ from duck_core.memory.store import MemoryStore from duck_core.memory.vector_memory import VectorMemory from duck_core.model_client import ModelClient, ReasoningMode from duck_core.reflection import Reflection +from duck_core.structured_output import load_json_object, load_json_schema, validate_json_object from duck_core.tasks.store import TaskStore from duck_core.tools.base import ToolResult from duck_core.tools.gateway import ToolGateway logger = logging.getLogger(__name__) +ACTION_DIRECTIVE_SCHEMA = load_json_schema("duck_core/schemas/action_directive.schema.json") @dataclass @@ -415,7 +417,11 @@ class RuntimeLoop: try: await self.event_store.append(task_id, "model_call_started", {"role": "action"}) response = await self.model_client.chat("action", messages) - directive = json.loads(response.content) + directive = validate_json_object( + load_json_object(response.content, "action directive"), + ACTION_DIRECTIVE_SCHEMA, + "action directive", + ) except Exception as exc: await self.event_store.append( task_id, diff --git a/duck_core/structured_output.py b/duck_core/structured_output.py new file mode 100644 index 0000000..2ef85cf --- /dev/null +++ b/duck_core/structured_output.py @@ -0,0 +1,38 @@ +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any + +from jsonschema import Draft202012Validator + + +class StructuredOutputError(ValueError): + pass + + +def load_json_object(content: str, label: str) -> dict[str, Any]: + try: + data = json.loads(content) + except (json.JSONDecodeError, TypeError) as exc: + raise StructuredOutputError(f"{label}: invalid JSON: {exc}") from exc + if not isinstance(data, dict): + raise StructuredOutputError(f"{label}: expected JSON object") + return data + + +def load_json_schema(path: str | Path) -> dict[str, Any]: + return json.loads(Path(path).read_text()) + + +def validate_json_object( + data: dict[str, Any], + schema: dict[str, Any], + label: str, +) -> dict[str, Any]: + errors = sorted(Draft202012Validator(schema).iter_errors(data), key=lambda error: error.path) + if errors: + first = errors[0] + location = ".".join(str(part) for part in first.absolute_path) or "" + raise StructuredOutputError(f"{label}: schema violation at {location}: {first.message}") + return data diff --git a/tests/smoke/test_memory_policy.py b/tests/smoke/test_memory_policy.py index a630d1a..46ec2c5 100644 --- a/tests/smoke/test_memory_policy.py +++ b/tests/smoke/test_memory_policy.py @@ -121,3 +121,27 @@ async def test_llm_memory_policy_missing_fields_falls_back(mock_model_client): decision = await policy.classify("some summary", "task_y") assert decision.should_store is False assert decision.metadata["source"] == "llm_policy_fallback" + + +@pytest.mark.asyncio +async def test_llm_memory_policy_schema_violation_falls_back(mock_model_client): + mock_model_client.chat.return_value = ModelResponse( + role="critic", + model="local-main", + content=json.dumps({ + "should_store": True, + "memory_type": "secret", + "summary": "Store this invalid memory type.", + "importance": 1.5, + "scope": "everywhere", + "metadata": {}, + }), + reasoning_content=None, + raw={}, + latency_ms=10.0, + ) + policy = MemoryPolicy(model_client=mock_model_client) + decision = await policy.classify("some summary", "task_z") + assert decision.should_store is False + assert decision.metadata["source"] == "llm_policy_fallback" + assert "schema violation" in decision.metadata["error"] diff --git a/tests/smoke/test_runtime_tools.py b/tests/smoke/test_runtime_tools.py index d95c68a..7297278 100644 --- a/tests/smoke/test_runtime_tools.py +++ b/tests/smoke/test_runtime_tools.py @@ -142,6 +142,40 @@ class FakeUpdateCheckModelClient: ) +class FakeMalformedActionModelClient: + async def chat(self, role, messages): + if role == "action": + return ModelResponse( + role=role, + model="local-main", + content=json.dumps( + { + "kind": "action_directive", + "intent": "broken action", + "risk_level": "low", + "actions": [ + { + "tool": "file_read", + "reason": "Missing args must fail schema validation", + } + ], + } + ), + reasoning_content=None, + raw={}, + latency_ms=5.0, + ) + assert role == "thinker" + return ModelResponse( + role=role, + model="local-main", + content="Answered without tool execution.", + reasoning_content=None, + raw={}, + latency_ms=12.0, + ) + + @pytest.mark.asyncio async def test_runtime_executes_action_directive_tool_and_finishes_with_observation(tmp_path): (tmp_path / "note.txt").write_text("hello from tool") @@ -202,6 +236,23 @@ async def test_runtime_checks_system_updates_without_approval_loop(tmp_path): ) +@pytest.mark.asyncio +async def test_runtime_rejects_malformed_action_directive_before_tools(tmp_path): + (tmp_path / "note.txt").write_text("hello") + db_path = str(tmp_path / "duck.sqlite3") + task_store = TaskStore(db_path) + event_store = EventStore(db_path) + loop = RuntimeLoop(task_store, event_store, FakeMalformedActionModelClient()) + + result = await loop.run_chat("read note.txt", str(tmp_path), debug=True) + events = await event_store.list_events(result.task_id) + failed = next(event for event in events if event.event_type == "action_directive_failed") + + assert result.status == "completed" + assert "schema violation" in failed.payload["error"] + assert not any(event.event_type == "tool_call_started" for event in events) + + class FakeApprovalModelClient: async def chat(self, role, messages): if role == "action":