from __future__ import annotations import fnmatch import shutil import json import os import re import subprocess import tempfile from pathlib import Path from typing import Any, Callable from config import ServerConfig class ToolError(RuntimeError): pass class ToolRegistry: def __init__(self, config: ServerConfig) -> None: self.config = config self.workspace_root = config.workspace_root.resolve() self._handlers: dict[str, Callable[[dict[str, Any]], dict[str, Any]]] = { "list_files": self.list_files, "glob_search": self.glob_search, "grep_text": self.grep_text, "stat_path": self.stat_path, "read_file": self.read_file, "append_file": self.append_file, "apply_unified_diff": self.apply_unified_diff, "replace_in_file": self.replace_in_file, "write_file": self.write_file, "make_directory": self.make_directory, "delete_path": self.delete_path, "move_path": self.move_path, "copy_path": self.copy_path, "git_status": self.git_status, "git_diff": self.git_diff, "exec_command": self.exec_command, } def schemas(self) -> list[dict[str, Any]]: return [ { "type": "function", "function": { "name": "list_files", "description": "List files in a directory inside the workspace.", "parameters": { "type": "object", "properties": { "path": {"type": "string"}, }, "required": ["path"], }, }, }, { "type": "function", "function": { "name": "glob_search", "description": "Find workspace paths matching a glob pattern.", "parameters": { "type": "object", "properties": { "pattern": {"type": "string"}, "base_path": {"type": "string"}, "limit": {"type": "integer"}, }, "required": ["pattern"], }, }, }, { "type": "function", "function": { "name": "grep_text", "description": "Search text in workspace files using a regular expression.", "parameters": { "type": "object", "properties": { "pattern": {"type": "string"}, "base_path": {"type": "string"}, "limit": {"type": "integer"}, }, "required": ["pattern"], }, }, }, { "type": "function", "function": { "name": "stat_path", "description": "Return metadata for a workspace path.", "parameters": { "type": "object", "properties": { "path": {"type": "string"}, }, "required": ["path"], }, }, }, { "type": "function", "function": { "name": "read_file", "description": "Read a UTF-8 text file from the workspace.", "parameters": { "type": "object", "properties": { "path": {"type": "string"}, }, "required": ["path"], }, }, }, { "type": "function", "function": { "name": "append_file", "description": "Append UTF-8 text to a file inside the workspace.", "parameters": { "type": "object", "properties": { "path": {"type": "string"}, "content": {"type": "string"}, }, "required": ["path", "content"], }, }, }, { "type": "function", "function": { "name": "apply_unified_diff", "description": "Apply a unified diff patch inside the workspace using the system patch command.", "parameters": { "type": "object", "properties": { "patch": {"type": "string"}, "strip": {"type": "integer"}, }, "required": ["patch"], }, }, }, { "type": "function", "function": { "name": "replace_in_file", "description": "Replace exact text in a workspace file without rewriting unrelated content.", "parameters": { "type": "object", "properties": { "path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}, "expected_count": {"type": "integer"}, }, "required": ["path", "old_text", "new_text"], }, }, }, { "type": "function", "function": { "name": "write_file", "description": "Write UTF-8 text into a file inside the workspace.", "parameters": { "type": "object", "properties": { "path": {"type": "string"}, "content": {"type": "string"}, }, "required": ["path", "content"], }, }, }, { "type": "function", "function": { "name": "make_directory", "description": "Create a directory inside the workspace.", "parameters": { "type": "object", "properties": { "path": {"type": "string"}, }, "required": ["path"], }, }, }, { "type": "function", "function": { "name": "delete_path", "description": "Delete a file or directory inside the workspace.", "parameters": { "type": "object", "properties": { "path": {"type": "string"}, "recursive": {"type": "boolean"}, }, "required": ["path"], }, }, }, { "type": "function", "function": { "name": "move_path", "description": "Move or rename a file or directory inside the workspace.", "parameters": { "type": "object", "properties": { "source_path": {"type": "string"}, "destination_path": {"type": "string"}, }, "required": ["source_path", "destination_path"], }, }, }, { "type": "function", "function": { "name": "copy_path", "description": "Copy a file or directory inside the workspace.", "parameters": { "type": "object", "properties": { "source_path": {"type": "string"}, "destination_path": {"type": "string"}, "recursive": {"type": "boolean"}, }, "required": ["source_path", "destination_path"], }, }, }, { "type": "function", "function": { "name": "git_status", "description": "Return a compact git status for the workspace or a subdirectory.", "parameters": { "type": "object", "properties": { "path": {"type": "string"}, }, }, }, }, { "type": "function", "function": { "name": "git_diff", "description": "Return a unified git diff for the workspace or a specific path.", "parameters": { "type": "object", "properties": { "path": {"type": "string"}, "cached": {"type": "boolean"}, }, }, }, }, { "type": "function", "function": { "name": "exec_command", "description": "Run a shell command inside the workspace and return stdout, stderr and exit code.", "parameters": { "type": "object", "properties": { "command": {"type": "string"}, "cwd": {"type": "string"}, }, "required": ["command"], }, }, }, ] def _resolve(self, raw_path: str) -> Path: candidate = Path(raw_path) if not candidate.is_absolute(): candidate = self.workspace_root / candidate resolved = candidate.resolve() if self.workspace_root not in resolved.parents and resolved != self.workspace_root: raise ToolError("Path escapes workspace root") return resolved def _check_policy(self, tool_name: str) -> None: policy = self.config.tool_policy read_only_tools = {"list_files", "glob_search", "grep_text", "stat_path", "read_file"} shell_tools = {"exec_command"} if policy in {"full-access", "ask-shell", "ask-write", "ask-all"}: return if policy == "read-only" and tool_name not in read_only_tools: raise ToolError(f"Tool '{tool_name}' is blocked by read-only policy") if policy == "workspace-write" and tool_name in shell_tools: raise ToolError(f"Tool '{tool_name}' is blocked by workspace-write policy") if policy not in { "full-access", "workspace-write", "read-only", "ask-shell", "ask-write", "ask-all", }: raise ToolError(f"Unknown tool policy: {policy}") def requires_approval(self, tool_name: str) -> bool: policy = self.config.tool_policy write_tools = { "append_file", "apply_unified_diff", "replace_in_file", "write_file", "make_directory", "delete_path", "move_path", "copy_path", } shell_tools = {"exec_command"} if policy == "ask-all": return True if policy == "ask-shell": return tool_name in shell_tools if policy == "ask-write": return tool_name in shell_tools or tool_name in write_tools return False def execute(self, name: str, arguments: dict[str, Any]) -> dict[str, Any]: handler = self._handlers.get(name) if not handler: raise ToolError(f"Unknown tool: {name}") self._check_policy(name) return handler(arguments) def list_files(self, arguments: dict[str, Any]) -> dict[str, Any]: target = self._resolve(arguments["path"]) if not target.exists(): raise ToolError("Directory does not exist") if not target.is_dir(): raise ToolError("Path is not a directory") items = [] for item in sorted(target.iterdir(), key=lambda value: value.name): items.append( { "name": item.name, "path": str(item.relative_to(self.workspace_root).as_posix()), "type": "dir" if item.is_dir() else "file", } ) return {"items": items} def glob_search(self, arguments: dict[str, Any]) -> dict[str, Any]: pattern = arguments["pattern"] base = self._resolve(arguments.get("base_path", ".")) if not base.is_dir(): raise ToolError("base_path is not a directory") limit = max(1, min(int(arguments.get("limit", 200)), 1000)) matches: list[str] = [] for root, dirs, files in os.walk(base): dirs.sort() files.sort() rel_root = Path(root).relative_to(self.workspace_root) for name in dirs + files: rel_path = (rel_root / name).as_posix() if fnmatch.fnmatch(rel_path, pattern): matches.append(rel_path) if len(matches) >= limit: return {"matches": matches, "truncated": True} return {"matches": matches, "truncated": False} def grep_text(self, arguments: dict[str, Any]) -> dict[str, Any]: regex = re.compile(arguments["pattern"]) base = self._resolve(arguments.get("base_path", ".")) if not base.is_dir(): raise ToolError("base_path is not a directory") limit = max(1, min(int(arguments.get("limit", 100)), 500)) matches: list[dict[str, Any]] = [] for root, dirs, files in os.walk(base): dirs.sort() files.sort() for file_name in files: file_path = Path(root) / file_name try: text = file_path.read_text(encoding="utf-8") except (UnicodeDecodeError, OSError): continue for lineno, line in enumerate(text.splitlines(), start=1): if regex.search(line): matches.append( { "path": file_path.relative_to(self.workspace_root).as_posix(), "line": lineno, "text": line[:500], } ) if len(matches) >= limit: return {"matches": matches, "truncated": True} return {"matches": matches, "truncated": False} def stat_path(self, arguments: dict[str, Any]) -> dict[str, Any]: target = self._resolve(arguments["path"]) rel_path = target.relative_to(self.workspace_root).as_posix() if not target.exists(): return {"exists": False, "path": rel_path} stat = target.stat() return { "exists": True, "path": rel_path, "type": "dir" if target.is_dir() else "file", "size": stat.st_size, "mtime": int(stat.st_mtime), } def read_file(self, arguments: dict[str, Any]) -> dict[str, Any]: target = self._resolve(arguments["path"]) if not target.exists(): raise ToolError("File does not exist") if not target.is_file(): raise ToolError("Path is not a file") content = target.read_text(encoding="utf-8") encoded = content.encode("utf-8") truncated = False if len(encoded) > self.config.max_file_read_bytes: content = encoded[: self.config.max_file_read_bytes].decode( "utf-8", errors="ignore", ) truncated = True return { "path": target.relative_to(self.workspace_root).as_posix(), "content": content, "truncated": truncated, } def write_file(self, arguments: dict[str, Any]) -> dict[str, Any]: target = self._resolve(arguments["path"]) target.parent.mkdir(parents=True, exist_ok=True) target.write_text(arguments["content"], encoding="utf-8") return { "path": target.relative_to(self.workspace_root).as_posix(), "bytes_written": len(arguments["content"].encode("utf-8")), } def append_file(self, arguments: dict[str, Any]) -> dict[str, Any]: target = self._resolve(arguments["path"]) target.parent.mkdir(parents=True, exist_ok=True) with target.open("a", encoding="utf-8") as handle: handle.write(arguments["content"]) return { "path": target.relative_to(self.workspace_root).as_posix(), "bytes_appended": len(arguments["content"].encode("utf-8")), } def replace_in_file(self, arguments: dict[str, Any]) -> dict[str, Any]: target = self._resolve(arguments["path"]) if not target.exists(): raise ToolError("File does not exist") if not target.is_file(): raise ToolError("Path is not a file") old_text = arguments["old_text"] new_text = arguments["new_text"] expected_count = arguments.get("expected_count") content = target.read_text(encoding="utf-8") count = content.count(old_text) if count == 0: raise ToolError("old_text not found in file") if expected_count is not None and count != int(expected_count): raise ToolError( f"expected_count mismatch: found {count}, expected {int(expected_count)}" ) updated = content.replace(old_text, new_text) target.write_text(updated, encoding="utf-8") return { "path": target.relative_to(self.workspace_root).as_posix(), "replacements": count, } def apply_unified_diff(self, arguments: dict[str, Any]) -> dict[str, Any]: patch_text = arguments["patch"] strip = int(arguments.get("strip", 0)) if not patch_text.strip(): raise ToolError("Patch is empty") if "\x00" in patch_text: raise ToolError("Patch contains NUL byte") with tempfile.NamedTemporaryFile( mode="w", encoding="utf-8", suffix=".patch", delete=False, ) as handle: patch_file = Path(handle.name) handle.write(patch_text) try: completed = subprocess.run( ["patch", f"-p{strip}", "--forward", "--batch", "-i", str(patch_file)], cwd=str(self.workspace_root), capture_output=True, text=True, timeout=120, ) except FileNotFoundError as exc: raise ToolError("System command 'patch' is not available") from exc finally: try: patch_file.unlink() except OSError: pass if completed.returncode != 0: raise ToolError( "patch failed: " + (completed.stderr.strip() or completed.stdout.strip() or "unknown error") ) return { "applied": True, "strip": strip, "stdout": completed.stdout[-self.config.max_command_output_bytes :], "stderr": completed.stderr[-self.config.max_command_output_bytes :], } def make_directory(self, arguments: dict[str, Any]) -> dict[str, Any]: target = self._resolve(arguments["path"]) target.mkdir(parents=True, exist_ok=True) return {"path": target.relative_to(self.workspace_root).as_posix(), "created": True} def delete_path(self, arguments: dict[str, Any]) -> dict[str, Any]: target = self._resolve(arguments["path"]) recursive = bool(arguments.get("recursive", False)) if not target.exists(): raise ToolError("Path does not exist") rel_path = target.relative_to(self.workspace_root).as_posix() if target.is_dir(): if not recursive: raise ToolError("Directory deletion requires recursive=true") shutil.rmtree(target) else: target.unlink() return {"path": rel_path, "deleted": True} def move_path(self, arguments: dict[str, Any]) -> dict[str, Any]: source = self._resolve(arguments["source_path"]) destination = self._resolve(arguments["destination_path"]) if not source.exists(): raise ToolError("Source path does not exist") destination.parent.mkdir(parents=True, exist_ok=True) shutil.move(str(source), str(destination)) return { "source_path": source.relative_to(self.workspace_root).as_posix(), "destination_path": destination.relative_to(self.workspace_root).as_posix(), "moved": True, } def copy_path(self, arguments: dict[str, Any]) -> dict[str, Any]: source = self._resolve(arguments["source_path"]) destination = self._resolve(arguments["destination_path"]) recursive = bool(arguments.get("recursive", False)) if not source.exists(): raise ToolError("Source path does not exist") destination.parent.mkdir(parents=True, exist_ok=True) if source.is_dir(): if not recursive: raise ToolError("Directory copy requires recursive=true") shutil.copytree(source, destination, dirs_exist_ok=True) else: shutil.copy2(source, destination) return { "source_path": source.relative_to(self.workspace_root).as_posix(), "destination_path": destination.relative_to(self.workspace_root).as_posix(), "copied": True, } def git_status(self, arguments: dict[str, Any]) -> dict[str, Any]: target = self._resolve(arguments.get("path", ".")) completed = subprocess.run( ["git", "status", "--short"], cwd=str(target if target.is_dir() else target.parent), capture_output=True, text=True, timeout=60, ) if completed.returncode != 0: raise ToolError(completed.stderr.strip() or completed.stdout.strip() or "git status failed") lines = completed.stdout.splitlines() return {"status": lines[:200], "truncated": len(lines) > 200} def git_diff(self, arguments: dict[str, Any]) -> dict[str, Any]: target = arguments.get("path") cached = bool(arguments.get("cached", False)) cmd = ["git", "diff"] if cached: cmd.append("--cached") if target: resolved_target = self._resolve(target) cwd = str(resolved_target if resolved_target.is_dir() else resolved_target.parent) if resolved_target.exists(): cmd.extend(["--", str(resolved_target)]) else: cmd.extend(["--", target]) else: cwd = str(self.workspace_root) completed = subprocess.run( cmd, cwd=cwd, capture_output=True, text=True, timeout=60, ) if completed.returncode != 0: raise ToolError(completed.stderr.strip() or completed.stdout.strip() or "git diff failed") diff_text = completed.stdout truncated = False if len(diff_text.encode("utf-8")) > self.config.max_file_read_bytes: diff_text = diff_text.encode("utf-8")[: self.config.max_file_read_bytes].decode( "utf-8", errors="ignore", ) truncated = True return {"diff": diff_text, "truncated": truncated, "cached": cached} def exec_command(self, arguments: dict[str, Any]) -> dict[str, Any]: cwd = self._resolve(arguments.get("cwd", ".")) command = arguments["command"] completed = subprocess.run( command, cwd=str(cwd), shell=True, capture_output=True, text=True, timeout=120, ) return { "command": command, "cwd": cwd.relative_to(self.workspace_root).as_posix(), "returncode": completed.returncode, "stdout": completed.stdout[-self.config.max_command_output_bytes :], "stderr": completed.stderr[-self.config.max_command_output_bytes :], } @staticmethod def encode_result(result: dict[str, Any]) -> str: return json.dumps(result, ensure_ascii=False)