# ducklm/scripts/bench/bench_runtime.py
# (Captured from a web file viewer: 436 lines, 16 KiB, Python.)
#
# The viewer warned that this file contains ambiguous Unicode characters,
# i.e. characters that might be confused with others. The Russian (Cyrillic)
# prompt strings below are the likely cause; if that is not intentional,
# reveal and review the flagged characters.

import argparse
import asyncio
import json
import os
import signal
import subprocess
import time
from dataclasses import dataclass, field
from pathlib import Path
from statistics import mean
from typing import Any
import httpx
import jsonschema
# Repository root: this script sits two directories below it (scripts/bench/).
ROOT = Path(__file__).resolve().parents[2]
# llama-server binary used to host each CPU candidate.
LLAMA_BIN = ROOT / "vendor" / "llama.cpp" / "build" / "bin" / "llama-server"

_LOOPBACK = "127.0.0.1"
# Already-running baseline server, checked before any CPU candidate is started.
BASELINE_URL = f"http://{_LOOPBACK}:8081/v1"
# Port and base URL for the temporary per-candidate CPU server.
BENCH_PORT = 18091
BENCH_URL = f"http://{_LOOPBACK}:{BENCH_PORT}/v1"
# JSON and Markdown reports are written here.
RESULTS_DIR = ROOT / "docs" / "bench"

_MODELS_DIR = ROOT / "models"
# Default GGUF files benchmarked on CPU (overridable with --models).
CANDIDATES = [
    _MODELS_DIR / filename
    for filename in (
        "Menlo_Lucy-Q4_K_M.gguf",
        "Qwen3.5-9B-GLM5.1-Distill-v1-Q4_K_M.gguf",
        "X-Coder-SFT-Qwen3-8B.Q6_K.gguf",
        "gemma-4-E4B-it-Q4_K_M.gguf",
    )
]
@dataclass
class BenchCase:
role: str
name: str
system_prompt: str
user_prompt: str
max_tokens: int
response_format: dict[str, Any] | None = None
required_keywords: list[str] = field(default_factory=list)
def read(path: str) -> str:
    """Return the text of *path*, interpreted relative to the repository ``ROOT``.

    The file is decoded as UTF-8 explicitly: role prompts may contain
    non-ASCII text, and the platform default encoding is locale-dependent.
    """
    return (ROOT / path).read_text(encoding="utf-8")
# JSON schema for the action role's directive, shared with the main code base.
# Decoded as UTF-8 explicitly rather than with the locale-dependent default.
ACTION_SCHEMA = json.loads(
    (ROOT / "duck_core/schemas/action_directive.schema.json").read_text(encoding="utf-8")
)

# Expected shape of a memory_policy decision. Extra keys are tolerated.
MEMORY_SCHEMA = {
    "type": "object",
    "required": ["should_store", "memory_type", "summary", "importance", "scope", "metadata"],
    "additionalProperties": True,
    "properties": {
        "should_store": {"type": "boolean"},
        "memory_type": {"type": "string"},
        "summary": {"type": "string"},
        "importance": {"type": "number"},
        "scope": {"type": "string"},
        "metadata": {"type": "object"},
    },
}

# Expected shape of a recall decision: the chosen memory ids plus a rationale.
RECALL_SCHEMA = {
    "type": "object",
    "required": ["relevant_ids", "reasoning"],
    "additionalProperties": True,
    "properties": {
        "relevant_ids": {"type": "array", "items": {"type": "string"}},
        "reasoning": {"type": "string"},
    },
}
def _structured_output(name: str, schema: dict[str, Any]) -> dict[str, Any]:
    """Return a strict ``json_schema`` response_format named *name* that enforces *schema*."""
    return {
        "type": "json_schema",
        "json_schema": {"name": name, "schema": schema, "strict": True},
    }


CASES = [
    # --- action: structured tool-selection directives ---
    BenchCase(
        role="action",
        name="direct_answer_no_tools",
        system_prompt=read("prompts/roles/action.md"),
        user_prompt="User request: Скажи коротко, что такое DuckLM.\nWorkspace: /tmp/duck",
        max_tokens=180,
        response_format=_structured_output("action_directive", ACTION_SCHEMA),
    ),
    BenchCase(
        role="action",
        name="read_specific_file",
        system_prompt=read("prompts/roles/action.md"),
        user_prompt="User request: Прочитай файл CURRENT_STATE.md и кратко скажи статус проекта.\nWorkspace: /home/mirivlad/git/ducklm",
        max_tokens=220,
        response_format=_structured_output("action_directive", ACTION_SCHEMA),
    ),
    # --- memory_policy: store-or-skip decisions ---
    BenchCase(
        role="memory_policy",
        name="store_user_preference",
        system_prompt=read("prompts/roles/memory_policy.md"),
        user_prompt="Task ID: task_pref\n\nTranscript:\nПользователь сказал: всегда отвечай мне по-русски и не запускай sudo без отдельного подтверждения.",
        max_tokens=180,
        response_format=_structured_output("memory_decision", MEMORY_SCHEMA),
    ),
    BenchCase(
        role="memory_policy",
        name="ignore_trivial_tool_call",
        system_prompt=read("prompts/roles/memory_policy.md"),
        user_prompt="Task ID: task_tmp\n\nTranscript:\nЗапущен pwd, stdout: /tmp/project. Ответ отправлен пользователю.",
        max_tokens=160,
        response_format=_structured_output("memory_decision", MEMORY_SCHEMA),
    ),
    # --- recall: choose the relevant memory ids ---
    BenchCase(
        role="recall",
        name="select_relevant_memory",
        system_prompt=read("prompts/roles/recall.md"),
        user_prompt=(
            "Query: Как пользователь хочет, чтобы я запускал sudo?\n\n"
            "Memories:\n"
            "- id: m1 | text: Пользователь просит не запускать sudo без отдельного подтверждения.\n"
            "- id: m2 | text: Проект использует SQLite для событий.\n"
            "- id: m3 | text: Пользователь любит краткие ответы о погоде.\n"
        ),
        max_tokens=160,
        response_format=_structured_output("recall_decision", RECALL_SCHEMA),
    ),
    # --- free-text roles: judged only by required_keywords ---
    BenchCase(
        role="summary",
        name="preserve_decisions",
        system_prompt=read("prompts/roles/summary.md"),
        user_prompt=(
            "Сожми контекст до 3 пунктов. Сохрани решения:\n"
            "1. WebChat работает на 8000, llama-server на 8081.\n"
            "2. Для внешних путей нужен approval.\n"
            "3. allow_forever хранится по normalized action hash.\n"
        ),
        max_tokens=180,
        required_keywords=["8000", "8081", "approval", "allow_forever"],
    ),
    BenchCase(
        role="critic",
        name="reflection_quality",
        system_prompt=read("prompts/roles/critic.md"),
        user_prompt=(
            "Task transcript:\n"
            "User asked to fix WebChat. Root cause was API not running. "
            "We started API manually but had no unified service script. Reflect on risk and reusable lesson."
        ),
        max_tokens=220,
        required_keywords=["risk", "lesson"],
    ),
]
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the benchmark's command-line options.

    Args:
        argv: Arguments to parse. ``None`` (the default) means ``sys.argv[1:]``,
            exactly as with :meth:`argparse.ArgumentParser.parse_args`; passing
            a list allows programmatic use and testing.

    Returns:
        The parsed namespace (``baseline_url``, ``port``, ``models``,
        ``threads``, ``ctx_size``, ``timeout``, ``skip_cpu``).
    """
    parser = argparse.ArgumentParser(
        description="Benchmark utility-role models: an already-running baseline plus CPU candidates.",
    )
    parser.add_argument(
        "--baseline-url",
        default=BASELINE_URL,
        help="OpenAI-compatible base URL of the already-running baseline server.",
    )
    parser.add_argument(
        "--port",
        type=int,
        default=BENCH_PORT,
        help="Port for the temporary CPU llama-server started per candidate.",
    )
    parser.add_argument(
        "--models",
        nargs="*",
        default=[str(path) for path in CANDIDATES],
        help="GGUF model files to benchmark on CPU.",
    )
    parser.add_argument(
        "--threads",
        type=int,
        # Half the reported CPUs (assuming 8 if unknown), never fewer than one.
        default=max(1, (os.cpu_count() or 8) // 2),
        help="Threads for llama-server --threads and --threads-batch.",
    )
    parser.add_argument(
        "--ctx-size",
        type=int,
        default=4096,
        help="Context size passed to llama-server (-c).",
    )
    parser.add_argument(
        "--timeout",
        type=float,
        default=180.0,
        help="Seconds to wait for server readiness and for each request.",
    )
    parser.add_argument(
        "--skip-cpu",
        action="store_true",
        help="Benchmark only the baseline; do not start CPU servers.",
    )
    return parser.parse_args(argv)
def llama_env() -> dict[str, str]:
    """Return a copy of ``os.environ`` with ``LLAMA_BIN``'s directory first on LD_LIBRARY_PATH.

    An existing, non-empty LD_LIBRARY_PATH is kept after that directory. The
    presumed intent is that the server resolves shared libraries built next to
    the binary; confirm against the llama.cpp build layout.
    """
    env = dict(os.environ)
    search_path = [str(LLAMA_BIN.parent)]
    inherited = env.get("LD_LIBRARY_PATH")
    if inherited:
        search_path.append(inherited)
    env["LD_LIBRARY_PATH"] = ":".join(search_path)
    return env
async def wait_ready(base_url: str, timeout: float) -> None:
    """Poll ``{base_url}/models`` about once per second until it returns HTTP 200.

    Each probe uses a 5-second request timeout and ignores proxy settings from
    the environment.

    Raises:
        TimeoutError: if no 200 response arrives within *timeout* seconds. The
            message carries the last HTTP status/body snippet or exception text.
    """
    give_up_at = time.perf_counter() + timeout
    probe_url = f"{base_url}/models"

    async def probe(client) -> str | None:
        # None means "ready"; otherwise a short description of why not.
        try:
            reply = await client.get(probe_url)
            if reply.status_code == 200:
                return None
            return f"HTTP {reply.status_code}: {reply.text[:120]}"
        except Exception as exc:
            return str(exc)

    last_error = ""
    async with httpx.AsyncClient(timeout=5.0, trust_env=False) as client:
        while time.perf_counter() < give_up_at:
            failure = await probe(client)
            if failure is None:
                return
            last_error = failure
            await asyncio.sleep(1.0)
    raise TimeoutError(f"{base_url} not ready: {last_error}")
def start_cpu_server(model_path: Path, port: int, threads: int, ctx_size: int) -> subprocess.Popen:
    """Launch llama-server for *model_path* with no GPU layers (``-ngl 0``).

    The server listens on 127.0.0.1:*port* under the alias ``bench-cpu`` and
    runs in a new session (``start_new_session=True``), so ``stop_process`` can
    signal its whole process group. Its stdout and stderr are appended to
    ``ROOT/data/bench/<model stem>.log``, preceded by the command line. The open
    log handle is attached to the returned process as ``_duck_log_handle``.

    Raises:
        FileNotFoundError: if *model_path* is not an existing file. This is
            checked up front so a bad path fails immediately instead of after
            the caller's readiness timeout.
        OSError: any error from writing the log or spawning the process (e.g. a
            missing ``LLAMA_BIN``) propagates after the log file is closed.
    """
    if not model_path.is_file():
        raise FileNotFoundError(f"model file not found: {model_path}")
    log_dir = ROOT / "data/bench"
    log_dir.mkdir(parents=True, exist_ok=True)
    log_file = log_dir / f"{model_path.stem}.log"
    command = [
        str(LLAMA_BIN),
        "-m",
        str(model_path),
        "--alias",
        "bench-cpu",
        "--host",
        "127.0.0.1",
        "--port",
        str(port),
        "-c",
        str(ctx_size),
        "--parallel",
        "1",
        "-ngl",
        "0",
        "--threads",
        str(threads),
        "--threads-batch",
        str(threads),
        "--reasoning",
        "off",
        "--cache-ram",
        "0",
    ]
    handle = log_file.open("a", encoding="utf-8")
    try:
        handle.write("Command: " + " ".join(command) + "\n")
        # Flush before the child inherits the descriptor so lines stay in order.
        handle.flush()
        process = subprocess.Popen(
            command,
            cwd=ROOT,
            env=llama_env(),
            stdout=handle,
            stderr=subprocess.STDOUT,
            start_new_session=True,
        )
    except BaseException:
        # Nobody else holds a reference to the handle yet; don't leak it.
        handle.close()
        raise
    process._duck_log_handle = handle  # type: ignore[attr-defined]
    return process
def stop_process(process: subprocess.Popen | None) -> None:
if process is None:
return
if process.poll() is None:
os.killpg(process.pid, signal.SIGTERM)
try:
process.wait(timeout=15)
except subprocess.TimeoutExpired:
os.killpg(process.pid, signal.SIGKILL)
process.wait(timeout=10)
handle = getattr(process, "_duck_log_handle", None)
if handle:
handle.close()
def safe_json(content: str) -> tuple[dict[str, Any] | None, str | None]:
text = content.strip()
if text.startswith("```"):
text = text.strip("`")
text = text.removeprefix("json").strip()
try:
return json.loads(text), None
except json.JSONDecodeError as exc:
return None, str(exc)
def score_case(case: BenchCase, content: str) -> tuple[float, str]:
    """Score one model reply for *case* on a 0.0–1.0 scale.

    Returns ``(score, note)``, where ``note`` is ``"ok"`` for full marks or a
    short failure reason otherwise.

    Structured cases (``response_format`` set) are parsed with ``safe_json`` and
    validated against the case's schema. Unparseable output scores 0.0, and
    schema violations (including a non-object top level) score 0.2. Parsed
    objects then go through the role/name-specific checks below. Anything
    without such a check is scored on ``required_keywords``: a case-insensitive
    substring match that loses 0.2 per missing keyword, with a floor of 0.2.
    """
    data: Any = None
    if case.response_format:
        data, error = safe_json(content)
        if data is None:
            return 0.0, f"invalid_json: {error}"
        schema = case.response_format["json_schema"]["schema"]
        try:
            jsonschema.validate(data, schema)
        except jsonschema.ValidationError as exc:
            return 0.2, f"schema_error: {exc.message}"
        # The checks below call dict methods. If the schema admits arrays or
        # scalars, score that as a shape failure rather than crashing.
        if not isinstance(data, dict):
            return 0.2, f"schema_error: expected a JSON object, got {type(data).__name__}"

    if case.role == "action" and data is not None:
        actions = data.get("actions") or []
        if case.name == "direct_answer_no_tools":
            return (1.0, "ok") if actions == [] else (0.3, f"unexpected_actions={actions}")
        if case.name == "read_specific_file":
            # Tolerate malformed entries (non-dict action, null/non-dict args)
            # so they score as a wrong action instead of raising.
            first = actions[0] if isinstance(actions, list) and actions else None
            args = (first.get("args") or {}) if isinstance(first, dict) else {}
            if (
                isinstance(first, dict)
                and first.get("tool") == "file_read"
                and isinstance(args, dict)
                and args.get("path") == "CURRENT_STATE.md"
            ):
                return 1.0, "ok"
            return 0.4, f"wrong_action={actions}"

    if case.role == "memory_policy" and data is not None:
        if case.name == "store_user_preference":
            ok = (
                data.get("should_store") is True
                and data.get("memory_type") == "preference"
                and data.get("scope") == "global"
            )
            return (1.0, "ok") if ok else (0.4, f"wrong_memory_decision={data}")
        if case.name == "ignore_trivial_tool_call":
            ok = data.get("should_store") is False
            return (1.0, "ok") if ok else (0.3, f"stored_trivial={data}")

    if case.role == "recall" and data is not None:
        # Exactly m1 of the three offered memories is relevant to the query.
        ids = set(data.get("relevant_ids") or [])
        if "m1" in ids and "m2" not in ids and "m3" not in ids:
            return 1.0, "ok"
        return 0.3, f"wrong_ids={sorted(ids)}"

    lowered = content.lower()
    missing = [word for word in case.required_keywords if word.lower() not in lowered]
    if missing:
        return max(0.2, 1.0 - 0.2 * len(missing)), f"missing={missing}"
    return 1.0, "ok"
async def run_case(base_url: str, model: str, case: BenchCase, timeout: float) -> dict[str, Any]:
    """Send one chat completion for *case* to *base_url* and score the reply.

    Uses temperature 0.0 and the case's ``max_tokens``, forwarding
    ``response_format`` when the case defines one.

    Returns a result row with the score and note from ``score_case``, the
    wall-clock latency, and throughput. Throughput is end-to-end: completion
    tokens divided by the whole request time, including prompt processing and
    HTTP overhead. When the server reports no ``usage.completion_tokens``, the
    whitespace-separated word count (at least 1) is used as an approximation.

    Raises:
        httpx.HTTPStatusError: on a non-2xx response.
        httpx.HTTPError: on transport errors or timeouts (after *timeout* s).
    """
    payload: dict[str, Any] = {
        "model": model,
        "messages": [
            {"role": "system", "content": case.system_prompt},
            {"role": "user", "content": case.user_prompt},
        ],
        "temperature": 0.0,
        "max_tokens": case.max_tokens,
    }
    if case.response_format:
        payload["response_format"] = case.response_format
    started = time.perf_counter()
    async with httpx.AsyncClient(timeout=timeout, trust_env=False) as client:
        response = await client.post(f"{base_url}/chat/completions", json=payload)
    elapsed = time.perf_counter() - started
    response.raise_for_status()
    raw = response.json()
    # A missing, null or empty `choices`, or a null `message`, is treated as
    # an empty reply (which then scores as such) rather than raising.
    choices = raw.get("choices") or [{}]
    message = choices[0].get("message") or {}
    content = message.get("content") or ""
    score, note = score_case(case, content)
    usage = raw.get("usage") or {}
    completion_tokens = usage.get("completion_tokens") or max(1, len(content.split()))
    return {
        "role": case.role,
        "case": case.name,
        "score": score,
        "note": note,
        "elapsed_seconds": round(elapsed, 3),
        "completion_tokens": completion_tokens,
        "tokens_per_second": round(completion_tokens / max(elapsed, 0.001), 2),
        "content_preview": content[:500],
    }
async def run_model(label: str, base_url: str, model: str, timeout: float) -> dict[str, Any]:
    """Run every case in ``CASES`` sequentially against one server and aggregate.

    A case that raises (HTTP error, timeout, malformed reply, ...) is recorded
    with score 0.0, zero throughput, and the full *timeout* as its latency. As a
    result, failures also raise ``avg_latency_seconds``.

    Returns a dict with the model *label*, the mean case score as ``quality``,
    the mean latency, the mean tokens/s, and the per-case rows under ``cases``.
    """
    rows: list[dict[str, Any]] = []
    for case in CASES:
        try:
            rows.append(await run_case(base_url, model, case, timeout))
        except Exception as exc:
            # Some exceptions (e.g. certain timeouts) stringify to "", so always
            # keep the exception type in the note.
            detail = str(exc)
            reason = f"{type(exc).__name__}: {detail}" if detail else type(exc).__name__
            rows.append({
                "role": case.role,
                "case": case.name,
                "score": 0.0,
                "note": f"error: {reason}",
                "elapsed_seconds": timeout,
                "completion_tokens": 0,
                "tokens_per_second": 0.0,
                "content_preview": "",
            })
    scores = [row["score"] for row in rows]
    return {
        "model": label,
        "quality": round(mean(scores), 3),
        "avg_latency_seconds": round(mean(row["elapsed_seconds"] for row in rows), 3),
        "avg_tokens_per_second": round(mean(row["tokens_per_second"] for row in rows), 2),
        "cases": rows,
    }
def _md_cell(value: Any) -> str:
    """Render *value* so it stays inside a single Markdown table cell or heading line.

    A literal ``|`` would split a table cell, and a newline would end the row
    early. Error notes can contain either, so pipes are escaped and line
    breaks are folded into spaces.
    """
    return " ".join(str(value).replace("|", "\\|").splitlines())


def markdown_report(results: list[dict[str, Any]]) -> str:
    """Render benchmark *results* (as produced by ``run_model``) as Markdown.

    The report has a summary table with one row per model, where the notes
    column lists up to three failed cases. It is followed by one detail table
    per model with every case. Free-text fields go through ``_md_cell`` so
    they cannot break the table layout. The returned text ends with a newline.
    """
    lines = [
        "# Utility Role Model Benchmark",
        "",
        "Scope: service roles only (`action`, `memory_policy`, `recall`, `summary`, `critic`).",
        "The main user-facing thinker is not evaluated for replacement here.",
        "",
        "| Model | Quality | Avg latency, s | Avg tok/s | Notes |",
        "| --- | ---: | ---: | ---: | --- |",
    ]
    for result in results:
        failed = [case for case in result["cases"] if case["score"] < 1.0]
        # Summarise at most three failures to keep the row readable.
        note = "all checks passed" if not failed else "; ".join(
            f"{case['role']}/{case['case']}: {case['note']}" for case in failed[:3]
        )
        lines.append(
            f"| {_md_cell(result['model'])} | {result['quality']:.2f} | "
            f"{result['avg_latency_seconds']:.2f} | {result['avg_tokens_per_second']:.2f} | {_md_cell(note)} |"
        )
    lines.append("")
    lines.append("## Case Details")
    for result in results:
        lines.append(f"\n### {_md_cell(result['model'])}")
        lines.append("| Role | Case | Score | Latency, s | tok/s | Note |")
        lines.append("| --- | --- | ---: | ---: | ---: | --- |")
        for case in result["cases"]:
            lines.append(
                f"| {_md_cell(case['role'])} | {_md_cell(case['case'])} | {case['score']:.2f} | "
                f"{case['elapsed_seconds']:.2f} | {case['tokens_per_second']:.2f} | {_md_cell(case['note'])} |"
            )
    return "\n".join(lines) + "\n"
def _startup_failure(label: str, exc: BaseException) -> dict[str, Any]:
    """Build a report entry for a CPU model whose server failed to start or serve."""
    detail = str(exc)
    reason = f"{type(exc).__name__}: {detail}" if detail else type(exc).__name__
    return {
        "model": label,
        "quality": 0.0,
        "avg_latency_seconds": 0.0,
        "avg_tokens_per_second": 0.0,
        "cases": [
            {
                "role": "startup",
                "case": "server_ready",
                "score": 0.0,
                "note": f"error: {reason}",
                "elapsed_seconds": 0.0,
                "completion_tokens": 0,
                "tokens_per_second": 0.0,
                "content_preview": "",
            }
        ],
    }


async def main() -> None:
    """Benchmark the baseline, then each CPU candidate, and write JSON + Markdown reports.

    The baseline must be reachable, otherwise the run aborts before anything is
    written. Each CPU candidate gets its own temporary llama-server, which is
    always stopped afterwards. A candidate that fails to start, become ready
    or run is recorded as a failed entry, and the run continues; previously
    collected results are no longer lost. Reports go to ``RESULTS_DIR`` as
    UTF-8 (they contain non-ASCII text because of ``ensure_ascii=False``).
    """
    args = parse_args()
    RESULTS_DIR.mkdir(parents=True, exist_ok=True)
    results: list[dict[str, Any]] = []
    print("Checking Qwen GPU baseline at", args.baseline_url)
    await wait_ready(args.baseline_url, args.timeout)
    results.append(await run_model("Qwen3.6-35B nonMTP GPU baseline", args.baseline_url, "local-main", args.timeout))
    if not args.skip_cpu:
        bench_url = f"http://127.0.0.1:{args.port}/v1"
        for raw_model in args.models:
            model_path = Path(raw_model).resolve()
            label = f"{model_path.stem} CPU"
            print("Starting", label)
            process = None
            try:
                process = start_cpu_server(model_path, args.port, args.threads, args.ctx_size)
                await wait_ready(bench_url, args.timeout)
                results.append(await run_model(label, bench_url, "bench-cpu", args.timeout))
            except Exception as exc:
                # Isolate the failure to this candidate; keep benchmarking the rest.
                print(f"Skipping {label}: {type(exc).__name__}: {exc}")
                results.append(_startup_failure(label, exc))
            finally:
                stop_process(process)
    timestamp = time.strftime("%Y%m%d_%H%M%S")
    json_path = RESULTS_DIR / f"utility_model_bench_{timestamp}.json"
    md_path = RESULTS_DIR / f"utility_model_bench_{timestamp}.md"
    report = markdown_report(results)
    json_path.write_text(json.dumps(results, ensure_ascii=False, indent=2), encoding="utf-8")
    md_path.write_text(report, encoding="utf-8")
    print(report)
    print(f"Wrote {json_path}")
    print(f"Wrote {md_path}")
def _entrypoint() -> None:
    """Run the async benchmark driver ``main`` on a fresh event loop."""
    asyncio.run(main())


if __name__ == "__main__":
    _entrypoint()