# new-qwen/serv/tools.py
#
# 381 lines
# 15 KiB
# Python

from __future__ import annotations
import fnmatch
import json
import os
import re
import subprocess
from pathlib import Path
from typing import Any, Callable
from config import ServerConfig
class ToolError(RuntimeError):
    """Raised when a tool call is rejected or cannot be carried out.

    The message is meant to be shown to whoever issued the tool call, so it
    describes the problem in terms of the call's own arguments.
    """
class ToolRegistry:
    """Workspace-scoped tool implementations together with their schemas.

    Each handler receives the decoded argument dict of one tool call and
    returns a JSON-serialisable dict. Paths are confined to
    ``config.workspace_root``; problems caused by the call's own arguments
    (missing keys, bad regex, escaping paths, ...) are raised as ``ToolError``.
    """

    # Tool groupings shared by the policy check and the approval check.
    _READ_ONLY_TOOLS = frozenset(
        {"list_files", "glob_search", "grep_text", "stat_path", "read_file"}
    )
    _WRITE_TOOLS = frozenset({"replace_in_file", "write_file", "make_directory"})
    _SHELL_TOOLS = frozenset({"exec_command"})
    _KNOWN_POLICIES = frozenset(
        {
            "full-access",
            "workspace-write",
            "read-only",
            "ask-shell",
            "ask-write",
            "ask-all",
        }
    )
    _COMMAND_TIMEOUT_SECONDS = 120

    def __init__(self, config: ServerConfig) -> None:
        """Bind the registry to *config* and register every tool handler."""
        self.config = config
        # Resolved once so containment checks compare canonical paths.
        self.workspace_root = config.workspace_root.resolve()
        self._handlers: dict[str, Callable[[dict[str, Any]], dict[str, Any]]] = {
            "list_files": self.list_files,
            "glob_search": self.glob_search,
            "grep_text": self.grep_text,
            "stat_path": self.stat_path,
            "read_file": self.read_file,
            "replace_in_file": self.replace_in_file,
            "write_file": self.write_file,
            "make_directory": self.make_directory,
            "exec_command": self.exec_command,
        }

    # ------------------------------------------------------------------
    # Schemas
    # ------------------------------------------------------------------

    @staticmethod
    def _schema(
        name: str,
        description: str,
        properties: dict[str, str],
        required: list[str],
    ) -> dict[str, Any]:
        """Build one function-call schema entry.

        *properties* maps each parameter name to its JSON-schema type name.
        """
        return {
            "type": "function",
            "function": {
                "name": name,
                "description": description,
                "parameters": {
                    "type": "object",
                    "properties": {
                        key: {"type": kind} for key, kind in properties.items()
                    },
                    "required": list(required),
                },
            },
        }

    def schemas(self) -> list[dict[str, Any]]:
        """Return the function-call schema of every registered tool."""
        build = self._schema
        return [
            build(
                "list_files",
                "List files in a directory inside the workspace.",
                {"path": "string"},
                ["path"],
            ),
            build(
                "glob_search",
                "Find workspace paths matching a glob pattern.",
                {"pattern": "string", "base_path": "string", "limit": "integer"},
                ["pattern"],
            ),
            build(
                "grep_text",
                "Search text in workspace files using a regular expression.",
                {"pattern": "string", "base_path": "string", "limit": "integer"},
                ["pattern"],
            ),
            build(
                "stat_path",
                "Return metadata for a workspace path.",
                {"path": "string"},
                ["path"],
            ),
            build(
                "read_file",
                "Read a UTF-8 text file from the workspace.",
                {"path": "string"},
                ["path"],
            ),
            build(
                "replace_in_file",
                "Replace exact text in a workspace file without rewriting unrelated content.",
                {
                    "path": "string",
                    "old_text": "string",
                    "new_text": "string",
                    "expected_count": "integer",
                },
                ["path", "old_text", "new_text"],
            ),
            build(
                "write_file",
                "Write UTF-8 text into a file inside the workspace.",
                {"path": "string", "content": "string"},
                ["path", "content"],
            ),
            build(
                "make_directory",
                "Create a directory inside the workspace.",
                {"path": "string"},
                ["path"],
            ),
            build(
                "exec_command",
                "Run a shell command inside the workspace and return stdout, stderr and exit code.",
                {"command": "string", "cwd": "string"},
                ["command"],
            ),
        ]

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _require(arguments: dict[str, Any], key: str) -> Any:
        """Return ``arguments[key]`` or raise ``ToolError`` when it is absent."""
        try:
            return arguments[key]
        except KeyError:
            raise ToolError(f"Missing required argument: {key}") from None

    @staticmethod
    def _bounded_limit(arguments: dict[str, Any], default: int, maximum: int) -> int:
        """Read the optional ``limit`` argument and clamp it to ``1..maximum``."""
        try:
            value = int(arguments.get("limit", default))
        except (TypeError, ValueError, OverflowError):
            raise ToolError("limit must be an integer") from None
        return max(1, min(value, maximum))

    @staticmethod
    def _tail(text: str, limit: int) -> str:
        """Return at most the last *limit* characters of *text*."""
        # text[-0:] is the whole string, so a non-positive limit needs its own
        # branch to really mean "nothing".
        return text[-limit:] if limit > 0 else ""

    def _is_within_workspace(self, resolved: Path) -> bool:
        """Tell whether an already-resolved path is the workspace root or below it."""
        return resolved == self.workspace_root or self.workspace_root in resolved.parents

    def _resolve(self, raw_path: str) -> Path:
        """Turn *raw_path* into a canonical path inside the workspace.

        Relative paths are taken relative to the workspace root. Symlinks and
        ``..`` segments are resolved before the containment check.

        Raises:
            ToolError: if the value is not path-like, cannot be resolved, or
                resolves to a location outside the workspace root.
        """
        if not isinstance(raw_path, (str, os.PathLike)):
            raise ToolError("Path must be a string")
        candidate = Path(raw_path)
        if not candidate.is_absolute():
            candidate = self.workspace_root / candidate
        try:
            resolved = candidate.resolve()
        except (OSError, RuntimeError, ValueError) as exc:
            raise ToolError(f"Invalid path: {exc}") from exc
        if not self._is_within_workspace(resolved):
            raise ToolError("Path escapes workspace root")
        return resolved

    # ------------------------------------------------------------------
    # Policy
    # ------------------------------------------------------------------

    def _check_policy(self, tool_name: str) -> None:
        """Raise ``ToolError`` when ``config.tool_policy`` forbids *tool_name*.

        ``read-only`` allows only the read tools, ``workspace-write`` blocks
        shell tools, and every other known policy allows everything here.
        """
        policy = self.config.tool_policy
        if policy not in self._KNOWN_POLICIES:
            raise ToolError(f"Unknown tool policy: {policy}")
        if policy == "read-only" and tool_name not in self._READ_ONLY_TOOLS:
            raise ToolError(f"Tool '{tool_name}' is blocked by read-only policy")
        if policy == "workspace-write" and tool_name in self._SHELL_TOOLS:
            raise ToolError(f"Tool '{tool_name}' is blocked by workspace-write policy")

    def requires_approval(self, tool_name: str) -> bool:
        """Tell whether the active ``ask-*`` policy wants approval for *tool_name*.

        This only reports the requirement; ``execute`` does not consult it.
        """
        policy = self.config.tool_policy
        if policy == "ask-all":
            return True
        if policy == "ask-shell":
            return tool_name in self._SHELL_TOOLS
        if policy == "ask-write":
            return tool_name in self._SHELL_TOOLS or tool_name in self._WRITE_TOOLS
        return False

    def execute(self, name: str, arguments: dict[str, Any]) -> dict[str, Any]:
        """Run the tool called *name* with *arguments* after the policy check.

        Raises:
            ToolError: if the tool is unknown, blocked by policy, or fails.
        """
        handler = self._handlers.get(name)
        if handler is None:
            raise ToolError(f"Unknown tool: {name}")
        self._check_policy(name)
        return handler(arguments)

    # ------------------------------------------------------------------
    # Read tools
    # ------------------------------------------------------------------

    def list_files(self, arguments: dict[str, Any]) -> dict[str, Any]:
        """List the direct children of ``arguments["path"]``, sorted by name."""
        target = self._resolve(self._require(arguments, "path"))
        if not target.exists():
            raise ToolError("Directory does not exist")
        if not target.is_dir():
            raise ToolError("Path is not a directory")
        items = [
            {
                "name": entry.name,
                "path": entry.relative_to(self.workspace_root).as_posix(),
                "type": "dir" if entry.is_dir() else "file",
            }
            for entry in sorted(target.iterdir(), key=lambda child: child.name)
        ]
        return {"items": items}

    def glob_search(self, arguments: dict[str, Any]) -> dict[str, Any]:
        """Find paths under ``base_path`` whose workspace-relative path matches.

        ``pattern`` is an ``fnmatch`` pattern tested against the path relative
        to the workspace root (not to ``base_path``). ``limit`` defaults to 200
        and is clamped to ``1..1000``; ``truncated`` is true when the search
        stopped because the limit was reached.
        """
        pattern = self._require(arguments, "pattern")
        base = self._resolve(arguments.get("base_path", "."))
        if not base.is_dir():
            raise ToolError("base_path is not a directory")
        limit = self._bounded_limit(arguments, default=200, maximum=1000)
        matches: list[str] = []
        for root, dirs, files in os.walk(base):
            # Sorting in place gives a deterministic traversal and result order.
            dirs.sort()
            files.sort()
            rel_root = Path(root).relative_to(self.workspace_root)
            for name in dirs + files:
                rel_path = (rel_root / name).as_posix()
                if fnmatch.fnmatch(rel_path, pattern):
                    matches.append(rel_path)
                    if len(matches) >= limit:
                        return {"matches": matches, "truncated": True}
        return {"matches": matches, "truncated": False}

    def grep_text(self, arguments: dict[str, Any]) -> dict[str, Any]:
        """Search UTF-8 files under ``base_path`` line by line with a regex.

        Files that cannot be read or decoded are skipped, as are symlinks that
        point outside the workspace. ``limit`` defaults to 100 and is clamped
        to ``1..500``; each reported line is cut to 500 characters.

        Raises:
            ToolError: if the pattern is missing or is not a valid regex, or
                ``base_path`` is not a directory inside the workspace.
        """
        try:
            regex = re.compile(self._require(arguments, "pattern"))
        except (re.error, TypeError) as exc:
            raise ToolError(f"Invalid regular expression: {exc}") from exc
        base = self._resolve(arguments.get("base_path", "."))
        if not base.is_dir():
            raise ToolError("base_path is not a directory")
        limit = self._bounded_limit(arguments, default=100, maximum=500)
        matches: list[dict[str, Any]] = []
        for root, dirs, files in os.walk(base):
            dirs.sort()
            files.sort()
            for file_name in files:
                file_path = Path(root) / file_name
                try:
                    # read_file refuses paths that resolve outside the
                    # workspace; apply the same rule to symlinked files here.
                    if file_path.is_symlink() and not self._is_within_workspace(
                        file_path.resolve()
                    ):
                        continue
                    text = file_path.read_text(encoding="utf-8")
                except (UnicodeDecodeError, OSError, RuntimeError):
                    continue
                rel_path = file_path.relative_to(self.workspace_root).as_posix()
                for lineno, line in enumerate(text.splitlines(), start=1):
                    if regex.search(line):
                        matches.append(
                            {"path": rel_path, "line": lineno, "text": line[:500]}
                        )
                        if len(matches) >= limit:
                            return {"matches": matches, "truncated": True}
        return {"matches": matches, "truncated": False}

    def stat_path(self, arguments: dict[str, Any]) -> dict[str, Any]:
        """Return existence, type, size and mtime (whole seconds) for a path."""
        target = self._resolve(self._require(arguments, "path"))
        rel_path = target.relative_to(self.workspace_root).as_posix()
        if not target.exists():
            return {"exists": False, "path": rel_path}
        info = target.stat()
        return {
            "exists": True,
            "path": rel_path,
            "type": "dir" if target.is_dir() else "file",
            "size": info.st_size,
            "mtime": int(info.st_mtime),
        }

    def read_file(self, arguments: dict[str, Any]) -> dict[str, Any]:
        """Read a UTF-8 text file, capped at ``config.max_file_read_bytes``.

        The cap is measured on the UTF-8 encoding of the text; a multi-byte
        character cut by the cap is dropped rather than returned broken.

        Raises:
            ToolError: if the path is missing, not a file, or not UTF-8 text.
        """
        target = self._resolve(self._require(arguments, "path"))
        if not target.exists():
            raise ToolError("File does not exist")
        if not target.is_file():
            raise ToolError("Path is not a file")
        limit = self.config.max_file_read_bytes
        try:
            with target.open("r", encoding="utf-8") as handle:
                # Every character encodes to at least one byte, so limit + 1
                # characters are enough to fill the byte budget and to detect
                # that the file is longer, without loading all of it.
                content = handle.read(limit + 1) if limit >= 0 else handle.read()
        except UnicodeDecodeError as exc:
            raise ToolError("File is not valid UTF-8 text") from exc
        encoded = content.encode("utf-8")
        truncated = len(encoded) > limit
        if truncated:
            content = encoded[:limit].decode("utf-8", errors="ignore")
        return {
            "path": target.relative_to(self.workspace_root).as_posix(),
            "content": content,
            "truncated": truncated,
        }

    # ------------------------------------------------------------------
    # Write tools
    # ------------------------------------------------------------------

    def write_file(self, arguments: dict[str, Any]) -> dict[str, Any]:
        """Write ``content`` as UTF-8 to ``path``, creating parent directories.

        Returns the workspace-relative path and the UTF-8 size of ``content``.
        """
        target = self._resolve(self._require(arguments, "path"))
        content = self._require(arguments, "content")
        # Validate before touching the filesystem so a bad call leaves no
        # half-created directories behind.
        if not isinstance(content, str):
            raise ToolError("content must be a string")
        if target.is_dir():
            raise ToolError("Path is a directory")
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(content, encoding="utf-8")
        return {
            "path": target.relative_to(self.workspace_root).as_posix(),
            "bytes_written": len(content.encode("utf-8")),
        }

    def replace_in_file(self, arguments: dict[str, Any]) -> dict[str, Any]:
        """Replace every occurrence of ``old_text`` with ``new_text`` in a file.

        When ``expected_count`` is given, nothing is written unless the number
        of occurrences equals it.

        Raises:
            ToolError: if the file is missing or not UTF-8, ``old_text`` is
                empty or absent from the file, or the count does not match.
        """
        target = self._resolve(self._require(arguments, "path"))
        if not target.exists():
            raise ToolError("File does not exist")
        if not target.is_file():
            raise ToolError("Path is not a file")
        old_text = self._require(arguments, "old_text")
        new_text = self._require(arguments, "new_text")
        if not isinstance(old_text, str) or not isinstance(new_text, str):
            raise ToolError("old_text and new_text must be strings")
        # An empty needle "matches" between every character; replacing it
        # would splice new_text throughout the whole file.
        if not old_text:
            raise ToolError("old_text must not be empty")
        expected_count = arguments.get("expected_count")
        if expected_count is not None:
            try:
                expected_count = int(expected_count)
            except (TypeError, ValueError, OverflowError):
                raise ToolError("expected_count must be an integer") from None
        # NOTE(review): text-mode I/O applies universal-newline translation, so
        # a file with CRLF line endings is written back with the platform's
        # default line ending. Confirm whether endings should be preserved.
        try:
            content = target.read_text(encoding="utf-8")
        except UnicodeDecodeError as exc:
            raise ToolError("File is not valid UTF-8 text") from exc
        count = content.count(old_text)
        if count == 0:
            raise ToolError("old_text not found in file")
        if expected_count is not None and count != expected_count:
            raise ToolError(
                f"expected_count mismatch: found {count}, expected {expected_count}"
            )
        target.write_text(content.replace(old_text, new_text), encoding="utf-8")
        return {
            "path": target.relative_to(self.workspace_root).as_posix(),
            "replacements": count,
        }

    def make_directory(self, arguments: dict[str, Any]) -> dict[str, Any]:
        """Create ``path`` and any missing parents; existing directories are fine.

        ``created`` is always true in the result, including when the directory
        already existed.
        """
        target = self._resolve(self._require(arguments, "path"))
        try:
            target.mkdir(parents=True, exist_ok=True)
        except (FileExistsError, NotADirectoryError) as exc:
            raise ToolError("Path exists and is not a directory") from exc
        return {
            "path": target.relative_to(self.workspace_root).as_posix(),
            "created": True,
        }

    # ------------------------------------------------------------------
    # Shell tool
    # ------------------------------------------------------------------

    def exec_command(self, arguments: dict[str, Any]) -> dict[str, Any]:
        """Run ``command`` through the shell with ``cwd`` inside the workspace.

        The command string is handed to the shell unmodified (``shell=True``);
        only the working directory is confined to the workspace. stdout and
        stderr are each cut to their last ``config.max_command_output_bytes``
        characters, and undecodable output bytes are replaced, not fatal.

        Raises:
            ToolError: if ``command`` is missing or not a string, ``cwd`` is
                not a workspace directory, or the command exceeds the timeout.
        """
        command = self._require(arguments, "command")
        if not isinstance(command, str):
            raise ToolError("command must be a string")
        cwd = self._resolve(arguments.get("cwd", "."))
        if not cwd.is_dir():
            raise ToolError("cwd is not a directory")
        try:
            completed = subprocess.run(
                command,
                cwd=str(cwd),
                shell=True,
                capture_output=True,
                text=True,
                errors="replace",
                timeout=self._COMMAND_TIMEOUT_SECONDS,
            )
        except subprocess.TimeoutExpired as exc:
            raise ToolError(
                f"Command timed out after {self._COMMAND_TIMEOUT_SECONDS} seconds"
            ) from exc
        limit = self.config.max_command_output_bytes
        return {
            "command": command,
            "cwd": cwd.relative_to(self.workspace_root).as_posix(),
            "returncode": completed.returncode,
            "stdout": self._tail(completed.stdout, limit),
            "stderr": self._tail(completed.stderr, limit),
        }

    # ------------------------------------------------------------------
    # Serialisation
    # ------------------------------------------------------------------

    @staticmethod
    def encode_result(result: dict[str, Any]) -> str:
        """Serialise a tool result to JSON, keeping non-ASCII text unescaped."""
        return json.dumps(result, ensure_ascii=False)