"""Benchmark utility-role prompts against a baseline and CPU candidate models.

The script sends a fixed set of role-specific chat-completion requests
(``action``, ``memory_policy``, ``recall``, ``summary``, ``critic``) to an
already running baseline endpoint and then, unless ``--skip-cpu`` is given,
to each candidate GGUF model served by a locally spawned ``llama-server``.
Every response is scored, and a JSON dump plus a Markdown report are written
to ``RESULTS_DIR``.
"""

import argparse
import asyncio
import json
import os
import signal
import subprocess
import time
from dataclasses import dataclass, field
from pathlib import Path
from statistics import mean
from typing import Any

import httpx
import jsonschema

ROOT = Path(__file__).resolve().parents[2]
LLAMA_BIN = ROOT / "vendor/llama.cpp/build/bin/llama-server"
BASELINE_URL = "http://127.0.0.1:8081/v1"
BENCH_PORT = 18091
BENCH_URL = f"http://127.0.0.1:{BENCH_PORT}/v1"
RESULTS_DIR = ROOT / "docs/bench"
CANDIDATES = [
    ROOT / "models/Menlo_Lucy-Q4_K_M.gguf",
    ROOT / "models/Qwen3.5-9B-GLM5.1-Distill-v1-Q4_K_M.gguf",
    ROOT / "models/X-Coder-SFT-Qwen3-8B.Q6_K.gguf",
    ROOT / "models/gemma-4-E4B-it-Q4_K_M.gguf",
]


@dataclass
class BenchCase:
    """One benchmark request together with the data needed to score it."""

    # Role name; selects the role-specific scoring rules in ``score_case``.
    role: str
    # Case identifier; some scoring rules are keyed on it.
    name: str
    # Sent as the ``system`` message of the chat request.
    system_prompt: str
    # Sent as the ``user`` message of the chat request.
    user_prompt: str
    # Forwarded as ``max_tokens`` in the request payload.
    max_tokens: int
    # Optional ``response_format`` payload; when set, the reply must be JSON
    # that validates against ``response_format["json_schema"]["schema"]``.
    response_format: dict[str, Any] | None = None
    # Substrings (matched case-insensitively) expected in the reply text.
    required_keywords: list[str] = field(default_factory=list)


def read(path: str) -> str:
    """Return the text of *path* (relative to ``ROOT``), decoded as UTF-8."""
    # Explicit encoding: the locale default is not guaranteed to be UTF-8.
    return (ROOT / path).read_text(encoding="utf-8")


def _json_schema_format(name: str, schema: dict[str, Any]) -> dict[str, Any]:
    """Build the strict ``json_schema`` response-format payload for *schema*."""
    return {
        "type": "json_schema",
        "json_schema": {"name": name, "schema": schema, "strict": True},
    }


ACTION_SCHEMA = json.loads(
    (ROOT / "duck_core/schemas/action_directive.schema.json").read_text(encoding="utf-8")
)
MEMORY_SCHEMA = {
    "type": "object",
    "required": ["should_store", "memory_type", "summary", "importance", "scope", "metadata"],
    "additionalProperties": True,
    "properties": {
        "should_store": {"type": "boolean"},
        "memory_type": {"type": "string"},
        "summary": {"type": "string"},
        "importance": {"type": "number"},
        "scope": {"type": "string"},
        "metadata": {"type": "object"},
    },
}
RECALL_SCHEMA = {
    "type": "object",
    "required": ["relevant_ids", "reasoning"],
    "additionalProperties": True,
    "properties": {
        "relevant_ids": {"type": "array", "items": {"type": "string"}},
        "reasoning": {"type": "string"},
    },
}

CASES = [
    BenchCase(
        role="action",
        name="direct_answer_no_tools",
        system_prompt=read("prompts/roles/action.md"),
        user_prompt="User request: Скажи коротко, что такое DuckLM.\nWorkspace: /tmp/duck",
        max_tokens=180,
        response_format=_json_schema_format("action_directive", ACTION_SCHEMA),
    ),
    BenchCase(
        role="action",
        name="read_specific_file",
        system_prompt=read("prompts/roles/action.md"),
        user_prompt="User request: Прочитай файл CURRENT_STATE.md и кратко скажи статус проекта.\nWorkspace: /home/mirivlad/git/ducklm",
        max_tokens=220,
        response_format=_json_schema_format("action_directive", ACTION_SCHEMA),
    ),
    BenchCase(
        role="memory_policy",
        name="store_user_preference",
        system_prompt=read("prompts/roles/memory_policy.md"),
        user_prompt="Task ID: task_pref\n\nTranscript:\nПользователь сказал: всегда отвечай мне по-русски и не запускай sudo без отдельного подтверждения.",
        max_tokens=180,
        response_format=_json_schema_format("memory_decision", MEMORY_SCHEMA),
    ),
    BenchCase(
        role="memory_policy",
        name="ignore_trivial_tool_call",
        system_prompt=read("prompts/roles/memory_policy.md"),
        user_prompt="Task ID: task_tmp\n\nTranscript:\nЗапущен pwd, stdout: /tmp/project. Ответ отправлен пользователю.",
        max_tokens=160,
        response_format=_json_schema_format("memory_decision", MEMORY_SCHEMA),
    ),
    BenchCase(
        role="recall",
        name="select_relevant_memory",
        system_prompt=read("prompts/roles/recall.md"),
        user_prompt=(
            "Query: Как пользователь хочет, чтобы я запускал sudo?\n\n"
            "Memories:\n"
            "- id: m1 | text: Пользователь просит не запускать sudo без отдельного подтверждения.\n"
            "- id: m2 | text: Проект использует SQLite для событий.\n"
            "- id: m3 | text: Пользователь любит краткие ответы о погоде.\n"
        ),
        max_tokens=160,
        response_format=_json_schema_format("recall_decision", RECALL_SCHEMA),
    ),
    BenchCase(
        role="summary",
        name="preserve_decisions",
        system_prompt=read("prompts/roles/summary.md"),
        user_prompt=(
            "Сожми контекст до 3 пунктов. Сохрани решения:\n"
            "1. WebChat работает на 8000, llama-server на 8081.\n"
            "2. Для внешних путей нужен approval.\n"
            "3. allow_forever хранится по normalized action hash.\n"
        ),
        max_tokens=180,
        required_keywords=["8000", "8081", "approval", "allow_forever"],
    ),
    BenchCase(
        role="critic",
        name="reflection_quality",
        system_prompt=read("prompts/roles/critic.md"),
        user_prompt=(
            "Task transcript:\n"
            "User asked to fix WebChat. Root cause was API not running. "
            "We started API manually but had no unified service script. Reflect on risk and reusable lesson."
        ),
        max_tokens=220,
        required_keywords=["risk", "lesson"],
    ),
]


def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the benchmark script."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--baseline-url", default=BASELINE_URL)
    parser.add_argument("--port", type=int, default=BENCH_PORT)
    parser.add_argument("--models", nargs="*", default=[str(path) for path in CANDIDATES])
    # Default to half of the reported CPUs (assuming 8 when the count is unknown).
    parser.add_argument("--threads", type=int, default=max(1, (os.cpu_count() or 8) // 2))
    parser.add_argument("--ctx-size", type=int, default=4096)
    parser.add_argument("--timeout", type=float, default=180.0)
    parser.add_argument("--skip-cpu", action="store_true")
    return parser.parse_args()


def llama_env() -> dict[str, str]:
    """Return a copy of the environment with the llama.cpp bin dir on ``LD_LIBRARY_PATH``.

    The directory containing ``LLAMA_BIN`` is prepended to any existing,
    non-empty ``LD_LIBRARY_PATH`` value.
    """
    env = os.environ.copy()
    bin_dir = str(LLAMA_BIN.parent)
    existing = env.get("LD_LIBRARY_PATH")
    env["LD_LIBRARY_PATH"] = f"{bin_dir}:{existing}" if existing else bin_dir
    return env


async def wait_ready(
    base_url: str,
    timeout: float,
    process: subprocess.Popen | None = None,
) -> None:
    """Poll ``{base_url}/models`` once per second until it answers HTTP 200.

    Args:
        base_url: Base URL of the OpenAI-compatible endpoint (ending in ``/v1``).
        timeout: Maximum number of seconds to keep polling.
        process: Optional server process that backs *base_url*. When given and
            the process has already exited, polling stops immediately instead
            of waiting for the whole timeout.

    Raises:
        RuntimeError: If *process* exited before the endpoint became ready.
        TimeoutError: If the endpoint did not answer 200 within *timeout*.
    """
    deadline = time.perf_counter() + timeout
    last_error = ""
    async with httpx.AsyncClient(timeout=5.0, trust_env=False) as client:
        while time.perf_counter() < deadline:
            if process is not None and process.poll() is not None:
                raise RuntimeError(
                    f"server for {base_url} exited with code {process.returncode} before becoming ready"
                )
            try:
                response = await client.get(f"{base_url}/models")
                if response.status_code == 200:
                    return
                last_error = f"HTTP {response.status_code}: {response.text[:120]}"
            except Exception as exc:
                # Deliberately broad: any failure while the server is still
                # booting just means "not ready yet"; remember it for the
                # timeout message and retry.
                last_error = str(exc)
            await asyncio.sleep(1.0)
    raise TimeoutError(f"{base_url} not ready: {last_error}")


def start_cpu_server(model_path: Path, port: int, threads: int, ctx_size: int) -> subprocess.Popen:
    """Spawn ``llama-server`` for *model_path* on 127.0.0.1:*port*.

    The command line and the server output are appended to
    ``data/bench/<model stem>.log`` under ``ROOT``. The open log handle is
    attached to the returned process as ``_duck_log_handle`` so that
    ``stop_process`` can close it.

    Args:
        model_path: Path of the GGUF model passed via ``-m``.
        port: TCP port for the server.
        threads: Value used for both ``--threads`` and ``--threads-batch``.
        ctx_size: Context size passed via ``-c``.

    Returns:
        The started process, running in its own session.

    Raises:
        OSError: If the log file cannot be opened or the server cannot be started.
    """
    log_dir = ROOT / "data/bench"
    log_dir.mkdir(parents=True, exist_ok=True)
    log_file = log_dir / f"{model_path.stem}.log"
    command = [
        str(LLAMA_BIN),
        "-m", str(model_path),
        "--alias", "bench-cpu",
        "--host", "127.0.0.1",
        "--port", str(port),
        "-c", str(ctx_size),
        "--parallel", "1",
        "-ngl", "0",
        "--threads", str(threads),
        "--threads-batch", str(threads),
        "--reasoning", "off",
        "--cache-ram", "0",
    ]
    handle = log_file.open("a", encoding="utf-8")
    try:
        handle.write("Command: " + " ".join(command) + "\n")
        handle.flush()
        process = subprocess.Popen(
            command,
            cwd=ROOT,
            env=llama_env(),
            stdout=handle,
            stderr=subprocess.STDOUT,
            # Own session => own process group, so stop_process can signal
            # the whole group with os.killpg.
            start_new_session=True,
        )
    except BaseException:
        # Nothing will ever call stop_process for a server that failed to
        # start, so the log handle must be released here.
        handle.close()
        raise
    process._duck_log_handle = handle  # type: ignore[attr-defined]
    return process


def _signal_group(process: subprocess.Popen, sig: int) -> None:
    """Send *sig* to the process group of *process*, ignoring an already-gone group."""
    try:
        os.killpg(process.pid, sig)
    except ProcessLookupError:
        # The process exited between the liveness check and the signal.
        pass


def stop_process(process: subprocess.Popen | None) -> None:
    """Terminate a server started by ``start_cpu_server`` and close its log handle.

    A still-running process group receives SIGTERM; if the process has not
    exited after 15 seconds the group receives SIGKILL. The attached log
    handle (if any) is closed even when waiting fails. ``None`` is a no-op.
    """
    if process is None:
        return
    try:
        if process.poll() is None:
            _signal_group(process, signal.SIGTERM)
            try:
                process.wait(timeout=15)
            except subprocess.TimeoutExpired:
                _signal_group(process, signal.SIGKILL)
                process.wait(timeout=10)
    finally:
        handle = getattr(process, "_duck_log_handle", None)
        if handle:
            handle.close()


def _strip_code_fence(text: str) -> str:
    """Return *text* without a surrounding Markdown code fence, if it starts with one."""
    if not text.startswith("```"):
        return text
    body = text.lstrip("`")
    # Cut at the last closing fence so trailing chatter after it is dropped.
    closing = body.rfind("```")
    if closing != -1:
        body = body[:closing]
    body = body.rstrip("`")
    # Drop the optional language tag of the opening fence ("json", "JSON", ...).
    if body[:4].lower() == "json":
        body = body[4:]
    return body.strip()


def safe_json(content: str) -> tuple[Any, str | None]:
    """Parse model output as JSON, tolerating a surrounding Markdown code fence.

    Args:
        content: Raw text returned by the model.

    Returns:
        ``(value, None)`` when parsing succeeds, where *value* is whatever
        JSON value the text contains (callers must check that it is a dict),
        or ``(None, message)`` when the text is not valid JSON or is the JSON
        literal ``null``.
    """
    text = _strip_code_fence(content.strip())
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError as exc:
        return None, str(exc)
    if parsed is None:
        # Keep (None, ...) unambiguous: a bare `null` is reported as an error
        # with a real message instead of `None`.
        return None, "JSON value is null"
    return parsed, None


def _score_action(name: str, data: dict[str, Any]) -> tuple[float, str] | None:
    """Score an ``action`` reply; return ``None`` when *name* has no dedicated rule."""
    actions = data.get("actions") or []
    if name == "direct_answer_no_tools":
        return (1.0, "ok") if actions == [] else (0.3, f"unexpected_actions={actions}")
    if name == "read_specific_file":
        first = actions[0] if isinstance(actions, list) and actions else None
        if isinstance(first, dict) and first.get("tool") == "file_read":
            args = first.get("args")
            if isinstance(args, dict) and args.get("path") == "CURRENT_STATE.md":
                return 1.0, "ok"
        return 0.4, f"wrong_action={actions}"
    return None


def _score_memory_policy(name: str, data: dict[str, Any]) -> tuple[float, str] | None:
    """Score a ``memory_policy`` reply; return ``None`` when *name* has no dedicated rule."""
    if name == "store_user_preference":
        ok = (
            data.get("should_store") is True
            and data.get("memory_type") == "preference"
            and data.get("scope") == "global"
        )
        return (1.0, "ok") if ok else (0.4, f"wrong_memory_decision={data}")
    if name == "ignore_trivial_tool_call":
        ok = data.get("should_store") is False
        return (1.0, "ok") if ok else (0.3, f"stored_trivial={data}")
    return None


def _score_recall(name: str, data: dict[str, Any]) -> tuple[float, str] | None:
    """Score a ``recall`` reply: exactly ``m1`` out of ``m1``/``m2``/``m3`` must be selected."""
    ids = set(data.get("relevant_ids") or [])
    if "m1" in ids and "m2" not in ids and "m3" not in ids:
        return 1.0, "ok"
    return 0.3, f"wrong_ids={sorted(ids)}"


# Role name -> scorer for structured (JSON) replies.
_STRUCTURED_SCORERS = {
    "action": _score_action,
    "memory_policy": _score_memory_policy,
    "recall": _score_recall,
}


def score_case(case: BenchCase, content: str) -> tuple[float, str]:
    """Score the model reply *content* for *case*.

    Cases with a ``response_format`` must yield JSON (0.0 otherwise) that
    validates against the case schema (0.2 otherwise); the roles ``action``,
    ``memory_policy`` and ``recall`` then apply their own rules. Everything
    else is scored by ``required_keywords``: 0.2 is deducted per missing
    keyword, with a floor of 0.2.

    Returns:
        ``(score, note)`` where *note* is ``"ok"`` or a short failure reason.
    """
    data = None
    if case.response_format:
        data, error = safe_json(content)
        if data is None:
            return 0.0, f"invalid_json: {error}"
        schema = case.response_format["json_schema"]["schema"]
        try:
            jsonschema.validate(data, schema)
        except jsonschema.ValidationError as exc:
            return 0.2, f"schema_error: {exc.message}"

    if data is not None:
        scorer = _STRUCTURED_SCORERS.get(case.role)
        if scorer is not None:
            if not isinstance(data, dict):
                # A schema that does not pin the top-level type would let a
                # list/string through; the role rules need an object.
                return 0.2, "schema_error: top-level JSON value is not an object"
            verdict = scorer(case.name, data)
            if verdict is not None:
                return verdict

    lowered = content.lower()
    missing = [word for word in case.required_keywords if word.lower() not in lowered]
    if missing:
        return max(0.2, 1.0 - 0.2 * len(missing)), f"missing={missing}"
    return 1.0, "ok"


async def run_case(base_url: str, model: str, case: BenchCase, timeout: float) -> dict[str, Any]:
    """Send one benchmark case to ``{base_url}/chat/completions`` and score the reply.

    Args:
        base_url: Base URL of the OpenAI-compatible endpoint.
        model: Model name placed in the request payload.
        case: Benchmark case to run.
        timeout: HTTP client timeout in seconds.

    Returns:
        A result row with role, case name, score, note, latency, completion
        token count, tokens per second and the first 500 characters of the reply.

    Raises:
        httpx.HTTPError: On transport failures or a non-success HTTP status.
    """
    payload: dict[str, Any] = {
        "model": model,
        "messages": [
            {"role": "system", "content": case.system_prompt},
            {"role": "user", "content": case.user_prompt},
        ],
        "temperature": 0.0,
        "max_tokens": case.max_tokens,
    }
    if case.response_format:
        payload["response_format"] = case.response_format

    started = time.perf_counter()
    async with httpx.AsyncClient(timeout=timeout, trust_env=False) as client:
        response = await client.post(f"{base_url}/chat/completions", json=payload)
        elapsed = time.perf_counter() - started
    response.raise_for_status()

    raw = response.json()
    # Tolerate an empty `choices` list or a null `message`: both are scored as
    # an empty reply instead of raising IndexError/AttributeError.
    choices = raw.get("choices") or [{}]
    message = choices[0].get("message") or {}
    content = message.get("content") or ""
    score, note = score_case(case, content)

    usage = raw.get("usage") or {}
    # Without usage data, approximate the token count by whitespace-separated words.
    completion_tokens = usage.get("completion_tokens") or max(1, len(content.split()))
    return {
        "role": case.role,
        "case": case.name,
        "score": score,
        "note": note,
        "elapsed_seconds": round(elapsed, 3),
        "completion_tokens": completion_tokens,
        "tokens_per_second": round(completion_tokens / max(elapsed, 0.001), 2),
        "content_preview": content[:500],
    }


async def run_model(label: str, base_url: str, model: str, timeout: float) -> dict[str, Any]:
    """Run every case in ``CASES`` sequentially against one model and aggregate the rows.

    A case that raises is recorded with score 0.0, the error text as note and
    *timeout* as its latency, so one failure does not abort the remaining cases.

    Returns:
        A dict with the model *label*, mean quality, mean latency, mean
        tokens per second and the per-case rows under ``"cases"``.
    """
    rows: list[dict[str, Any]] = []
    for case in CASES:
        try:
            row = await run_case(base_url, model, case, timeout)
        except Exception as exc:
            # Deliberate best-effort boundary: record the failure as a row.
            row = {
                "role": case.role,
                "case": case.name,
                "score": 0.0,
                "note": f"error: {exc}",
                "elapsed_seconds": timeout,
                "completion_tokens": 0,
                "tokens_per_second": 0.0,
                "content_preview": "",
            }
        rows.append(row)

    return {
        "model": label,
        "quality": round(mean(row["score"] for row in rows), 3),
        "avg_latency_seconds": round(mean(row["elapsed_seconds"] for row in rows), 3),
        "avg_tokens_per_second": round(mean(row["tokens_per_second"] for row in rows), 2),
        "cases": rows,
    }


def _md_cell(value: Any) -> str:
    """Render *value* as a single Markdown table cell (pipes escaped, newlines flattened)."""
    text = str(value).replace("|", "\\|")
    return " ".join(text.splitlines())


def markdown_report(results: list[dict[str, Any]]) -> str:
    """Render benchmark *results* (as produced by ``run_model``) as a Markdown report.

    The report has a summary table with one row per model (showing the notes
    of at most three failed cases) followed by a per-model table of all cases.
    Cell text is escaped so that ``|`` or line breaks in notes cannot break
    the table layout.
    """
    lines = [
        "# Utility Role Model Benchmark",
        "",
        "Scope: service roles only (`action`, `memory_policy`, `recall`, `summary`, `critic`).",
        "The main user-facing thinker is not evaluated for replacement here.",
        "",
        "| Model | Quality | Avg latency, s | Avg tok/s | Notes |",
        "| --- | ---: | ---: | ---: | --- |",
    ]
    for result in results:
        failed = [case for case in result["cases"] if case["score"] < 1.0]
        if failed:
            note = "; ".join(
                f"{case['role']}/{case['case']}: {case['note']}" for case in failed[:3]
            )
        else:
            note = "all checks passed"
        lines.append(
            f"| {_md_cell(result['model'])} | {result['quality']:.2f} | "
            f"{result['avg_latency_seconds']:.2f} | {result['avg_tokens_per_second']:.2f} | {_md_cell(note)} |"
        )

    lines.append("")
    lines.append("## Case Details")
    for result in results:
        lines.append(f"\n### {_md_cell(result['model'])}")
        lines.append("| Role | Case | Score | Latency, s | tok/s | Note |")
        lines.append("| --- | --- | ---: | ---: | ---: | --- |")
        for case in result["cases"]:
            lines.append(
                f"| {_md_cell(case['role'])} | {_md_cell(case['case'])} | {case['score']:.2f} | "
                f"{case['elapsed_seconds']:.2f} | {case['tokens_per_second']:.2f} | {_md_cell(case['note'])} |"
            )
    return "\n".join(lines) + "\n"


async def main() -> None:
    """Benchmark the baseline endpoint and each candidate model, then write the reports.

    The baseline at ``--baseline-url`` must already be running. Unless
    ``--skip-cpu`` is set, each model in ``--models`` is served by a freshly
    started ``llama-server`` that is always stopped again, even on failure.
    Results are written as timestamped ``.json`` and ``.md`` files in
    ``RESULTS_DIR`` and the Markdown report is printed.
    """
    args = parse_args()
    RESULTS_DIR.mkdir(parents=True, exist_ok=True)
    results: list[dict[str, Any]] = []

    print("Checking Qwen GPU baseline at", args.baseline_url)
    await wait_ready(args.baseline_url, args.timeout)
    results.append(
        await run_model("Qwen3.6-35B nonMTP GPU baseline", args.baseline_url, "local-main", args.timeout)
    )

    if not args.skip_cpu:
        bench_url = f"http://127.0.0.1:{args.port}/v1"
        for raw_model in args.models:
            model_path = Path(raw_model).resolve()
            label = f"{model_path.stem} CPU"
            print("Starting", label)
            process = start_cpu_server(model_path, args.port, args.threads, args.ctx_size)
            try:
                # Passing the process lets wait_ready fail fast if the server dies.
                await wait_ready(bench_url, args.timeout, process)
                results.append(await run_model(label, bench_url, "bench-cpu", args.timeout))
            finally:
                stop_process(process)

    timestamp = time.strftime("%Y%m%d_%H%M%S")
    json_path = RESULTS_DIR / f"utility_model_bench_{timestamp}.json"
    md_path = RESULTS_DIR / f"utility_model_bench_{timestamp}.md"
    report = markdown_report(results)
    # ensure_ascii=False emits raw non-ASCII text, so the encoding must be
    # pinned rather than left to the locale.
    json_path.write_text(json.dumps(results, ensure_ascii=False, indent=2), encoding="utf-8")
    md_path.write_text(report, encoding="utf-8")
    print(report)
    print(f"Wrote {json_path}")
    print(f"Wrote {md_path}")


if __name__ == "__main__":
    asyncio.run(main())