from __future__ import annotations

import fnmatch
import json
import os
import re
import subprocess
from pathlib import Path
from typing import Any, Callable

from config import ServerConfig

# Wall-clock limit, in seconds, for a single exec_command invocation.
_COMMAND_TIMEOUT_SECONDS = 120


class ToolError(RuntimeError):
    """Raised when a tool call cannot be completed as requested."""


class ToolRegistry:
    """Dispatch table of file-system and shell tools confined to a workspace.

    Every path argument is resolved against ``config.workspace_root`` and
    rejected when it resolves outside of it. Handlers take the raw argument
    dict of a tool call and return a JSON-serialisable result dict; failures
    that are attributable to the arguments are reported as ``ToolError``.
    """

    def __init__(self, config: ServerConfig) -> None:
        """Bind the registry to *config* and build the name -> handler map."""
        self.config = config
        self.workspace_root = config.workspace_root.resolve()
        self._handlers: dict[str, Callable[[dict[str, Any]], dict[str, Any]]] = {
            "list_files": self.list_files,
            "glob_search": self.glob_search,
            "grep_text": self.grep_text,
            "stat_path": self.stat_path,
            "read_file": self.read_file,
            "write_file": self.write_file,
            "make_directory": self.make_directory,
            "exec_command": self.exec_command,
        }

    def schemas(self) -> list[dict[str, Any]]:
        """Return a freshly built function schema for every registered tool."""
        return [
            self._function_schema(
                "list_files",
                "List files in a directory inside the workspace.",
                {"path": "string"},
                ["path"],
            ),
            self._function_schema(
                "glob_search",
                "Find workspace paths matching a glob pattern.",
                {"pattern": "string", "base_path": "string", "limit": "integer"},
                ["pattern"],
            ),
            self._function_schema(
                "grep_text",
                "Search text in workspace files using a regular expression.",
                {"pattern": "string", "base_path": "string", "limit": "integer"},
                ["pattern"],
            ),
            self._function_schema(
                "stat_path",
                "Return metadata for a workspace path.",
                {"path": "string"},
                ["path"],
            ),
            self._function_schema(
                "read_file",
                "Read a UTF-8 text file from the workspace.",
                {"path": "string"},
                ["path"],
            ),
            self._function_schema(
                "write_file",
                "Write UTF-8 text into a file inside the workspace.",
                {"path": "string", "content": "string"},
                ["path", "content"],
            ),
            self._function_schema(
                "make_directory",
                "Create a directory inside the workspace.",
                {"path": "string"},
                ["path"],
            ),
            self._function_schema(
                "exec_command",
                "Run a shell command inside the workspace and return stdout, stderr and exit code.",
                {"command": "string", "cwd": "string"},
                ["command"],
            ),
        ]

    @staticmethod
    def _function_schema(
        name: str,
        description: str,
        properties: dict[str, str],
        required: list[str],
    ) -> dict[str, Any]:
        """Build one tool schema; *properties* maps argument name to JSON type."""
        return {
            "type": "function",
            "function": {
                "name": name,
                "description": description,
                "parameters": {
                    "type": "object",
                    "properties": {
                        key: {"type": json_type}
                        for key, json_type in properties.items()
                    },
                    "required": list(required),
                },
            },
        }

    @staticmethod
    def _require(arguments: dict[str, Any], key: str) -> Any:
        """Return ``arguments[key]`` or raise ``ToolError`` when it is absent."""
        try:
            return arguments[key]
        except KeyError as err:
            raise ToolError(f"Missing required argument: {key}") from err

    @staticmethod
    def _clamped_limit(arguments: dict[str, Any], default: int, ceiling: int) -> int:
        """Read the optional ``limit`` argument and clamp it to ``1..ceiling``.

        A missing or ``None`` limit falls back to *default*; a value that
        cannot be converted with ``int()`` raises ``ToolError``.
        """
        raw = arguments.get("limit")
        if raw is None:
            raw = default
        try:
            value = int(raw)
        except (TypeError, ValueError, OverflowError) as err:
            raise ToolError("limit must be an integer") from err
        return max(1, min(value, ceiling))

    def _is_within_workspace(self, path: Path) -> bool:
        """Return True when the already-resolved *path* is the root or below it."""
        return path == self.workspace_root or self.workspace_root in path.parents

    def _resolve(self, raw_path: str) -> Path:
        """Resolve *raw_path* to an absolute path inside the workspace.

        Relative paths are interpreted against the workspace root. Symlinks
        and ``..`` segments are resolved before the containment check.

        Raises:
            ToolError: if the path is malformed or resolves outside the
                workspace root.
        """
        try:
            candidate = Path(raw_path)
            if not candidate.is_absolute():
                candidate = self.workspace_root / candidate
            resolved = candidate.resolve()
        except (TypeError, ValueError, RuntimeError) as err:
            # Wrong type, embedded NUL byte, or a symlink loop.
            raise ToolError("Invalid path") from err
        if not self._is_within_workspace(resolved):
            raise ToolError("Path escapes workspace root")
        return resolved

    def execute(self, name: str, arguments: dict[str, Any]) -> dict[str, Any]:
        """Run the tool called *name* with *arguments* and return its result.

        Raises:
            ToolError: if no tool is registered under *name*, or the handler
                rejects the arguments.
        """
        handler = self._handlers.get(name)
        if handler is None:
            raise ToolError(f"Unknown tool: {name}")
        return handler(arguments)

    def list_files(self, arguments: dict[str, Any]) -> dict[str, Any]:
        """List the direct children of the directory ``arguments["path"]``.

        Returns ``{"items": [...]}`` sorted by name; each item carries its
        name, workspace-relative POSIX path and ``"dir"``/``"file"`` type.
        """
        target = self._resolve(self._require(arguments, "path"))
        if not target.exists():
            raise ToolError("Directory does not exist")
        if not target.is_dir():
            raise ToolError("Path is not a directory")
        entries = sorted(target.iterdir(), key=lambda entry: entry.name)
        items = [
            {
                "name": entry.name,
                "path": entry.relative_to(self.workspace_root).as_posix(),
                "type": "dir" if entry.is_dir() else "file",
            }
            for entry in entries
        ]
        return {"items": items}

    def glob_search(self, arguments: dict[str, Any]) -> dict[str, Any]:
        """Find paths under ``base_path`` whose relative path matches a glob.

        The pattern is matched with ``fnmatch`` against each entry's
        workspace-relative POSIX path. ``limit`` defaults to 200 and is
        clamped to 1..1000. ``truncated`` is True whenever the limit was
        reached, even if no further match existed.
        """
        pattern = self._require(arguments, "pattern")
        # "or" also maps an explicit null/empty base_path to the default.
        base = self._resolve(arguments.get("base_path") or ".")
        if not base.is_dir():
            raise ToolError("base_path is not a directory")
        limit = self._clamped_limit(arguments, default=200, ceiling=1000)

        matches: list[str] = []
        for root, dirs, files in os.walk(base):
            # Sorting in place also fixes the order os.walk descends in.
            dirs.sort()
            files.sort()
            rel_root = Path(root).relative_to(self.workspace_root)
            for name in dirs + files:
                rel_path = (rel_root / name).as_posix()
                if not fnmatch.fnmatch(rel_path, pattern):
                    continue
                matches.append(rel_path)
                if len(matches) >= limit:
                    return {"matches": matches, "truncated": True}
        return {"matches": matches, "truncated": False}

    def grep_text(self, arguments: dict[str, Any]) -> dict[str, Any]:
        """Search UTF-8 files under ``base_path`` line by line with a regex.

        Files that cannot be read or decoded are skipped, as are symlinks
        whose target lies outside the workspace. ``limit`` defaults to 100
        and is clamped to 1..500; each reported line is cut to 500
        characters.

        Raises:
            ToolError: if the pattern is missing or not a valid regular
                expression, or ``base_path`` is not a directory.
        """
        raw_pattern = self._require(arguments, "pattern")
        try:
            regex = re.compile(raw_pattern)
        except (re.error, TypeError) as err:
            raise ToolError(f"Invalid regular expression: {err}") from err
        base = self._resolve(arguments.get("base_path") or ".")
        if not base.is_dir():
            raise ToolError("base_path is not a directory")
        limit = self._clamped_limit(arguments, default=100, ceiling=500)

        matches: list[dict[str, Any]] = []
        for root, dirs, files in os.walk(base):
            dirs.sort()
            files.sort()
            for file_name in files:
                file_path = Path(root) / file_name
                try:
                    # os.walk does not descend into symlinked directories, so
                    # only file symlinks can lead out of the workspace here.
                    if file_path.is_symlink() and not self._is_within_workspace(
                        file_path.resolve()
                    ):
                        continue
                    text = file_path.read_text(encoding="utf-8")
                except (UnicodeDecodeError, OSError, RuntimeError):
                    continue
                rel_path = file_path.relative_to(self.workspace_root).as_posix()
                for lineno, line in enumerate(text.splitlines(), start=1):
                    if not regex.search(line):
                        continue
                    matches.append(
                        {"path": rel_path, "line": lineno, "text": line[:500]}
                    )
                    if len(matches) >= limit:
                        return {"matches": matches, "truncated": True}
        return {"matches": matches, "truncated": False}

    def stat_path(self, arguments: dict[str, Any]) -> dict[str, Any]:
        """Return existence, type, size and mtime (whole seconds) for a path.

        A path that does not exist yields ``{"exists": False, "path": ...}``
        rather than an error.
        """
        target = self._resolve(self._require(arguments, "path"))
        rel_path = target.relative_to(self.workspace_root).as_posix()
        if not target.exists():
            return {"exists": False, "path": rel_path}
        info = target.stat()
        return {
            "exists": True,
            "path": rel_path,
            "type": "dir" if target.is_dir() else "file",
            "size": info.st_size,
            "mtime": int(info.st_mtime),
        }

    def read_file(self, arguments: dict[str, Any]) -> dict[str, Any]:
        """Read a UTF-8 text file, capped at ``config.max_file_read_bytes``.

        When the UTF-8 encoding of the text exceeds the cap, the content is
        cut at the cap (dropping a partially cut trailing character) and
        ``truncated`` is True. Only the portion that is actually read is
        validated as UTF-8.

        Raises:
            ToolError: if the path is missing, not a regular file, or not
                valid UTF-8 text.
        """
        target = self._resolve(self._require(arguments, "path"))
        if not target.exists():
            raise ToolError("File does not exist")
        if not target.is_file():
            raise ToolError("Path is not a file")

        limit = self.config.max_file_read_bytes
        # Every character encodes to at least one byte, so limit + 1
        # characters are enough both to fill the cap and to detect overflow
        # without loading an arbitrarily large file into memory.
        max_chars = limit + 1 if limit >= 0 else -1
        try:
            with target.open("r", encoding="utf-8") as handle:
                content = handle.read(max_chars)
        except UnicodeDecodeError as err:
            raise ToolError("File is not valid UTF-8 text") from err

        encoded = content.encode("utf-8")
        truncated = len(encoded) > limit
        if truncated:
            content = encoded[:limit].decode("utf-8", errors="ignore")
        return {
            "path": target.relative_to(self.workspace_root).as_posix(),
            "content": content,
            "truncated": truncated,
        }

    def write_file(self, arguments: dict[str, Any]) -> dict[str, Any]:
        """Write ``arguments["content"]`` as UTF-8, creating parent directories.

        ``bytes_written`` is the length of the UTF-8 encoding of the content.

        Raises:
            ToolError: if an argument is missing, the content is not a
                UTF-8-encodable string, or the path is a directory.
        """
        target = self._resolve(self._require(arguments, "path"))
        content = self._require(arguments, "content")
        if not isinstance(content, str):
            raise ToolError("content must be a string")
        if target.is_dir():
            raise ToolError("Path is a directory")
        # Encode before touching the file system so that un-encodable text
        # (e.g. lone surrogates) cannot leave behind a truncated file.
        try:
            encoded = content.encode("utf-8")
        except UnicodeEncodeError as err:
            raise ToolError("content is not encodable as UTF-8") from err
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(content, encoding="utf-8")
        return {
            "path": target.relative_to(self.workspace_root).as_posix(),
            "bytes_written": len(encoded),
        }

    def make_directory(self, arguments: dict[str, Any]) -> dict[str, Any]:
        """Create a directory (and missing parents) inside the workspace.

        ``created`` is always True on success, including when the directory
        already existed.

        Raises:
            ToolError: if the path already exists and is not a directory.
        """
        target = self._resolve(self._require(arguments, "path"))
        try:
            target.mkdir(parents=True, exist_ok=True)
        except FileExistsError as err:
            raise ToolError("Path exists and is not a directory") from err
        return {
            "path": target.relative_to(self.workspace_root).as_posix(),
            "created": True,
        }

    def exec_command(self, arguments: dict[str, Any]) -> dict[str, Any]:
        """Run a shell command with its working directory inside the workspace.

        Returns the command, the workspace-relative cwd, the exit code, and
        the tail of stdout/stderr. The tail is cut by *characters* using
        ``config.max_command_output_bytes``; note that a limit of 0 yields
        the full output because ``text[-0:]`` is the whole string.

        Raises:
            ToolError: if ``command`` is missing, ``cwd`` is not a directory
                inside the workspace, or the command exceeds the timeout.
        """
        cwd = self._resolve(arguments.get("cwd") or ".")
        command = self._require(arguments, "command")
        if not cwd.is_dir():
            raise ToolError("cwd is not a directory")
        # SECURITY: the command string is handed to the shell unchanged, by
        # design of this tool. Only the working directory is sandboxed; the
        # command itself can reach anything the server process can.
        try:
            completed = subprocess.run(
                command,
                cwd=str(cwd),
                shell=True,
                capture_output=True,
                text=True,
                errors="replace",  # binary output must not abort the call
                timeout=_COMMAND_TIMEOUT_SECONDS,
            )
        except subprocess.TimeoutExpired as err:
            raise ToolError(
                f"Command timed out after {_COMMAND_TIMEOUT_SECONDS} seconds"
            ) from err
        tail = self.config.max_command_output_bytes
        return {
            "command": command,
            "cwd": cwd.relative_to(self.workspace_root).as_posix(),
            "returncode": completed.returncode,
            "stdout": completed.stdout[-tail:],
            "stderr": completed.stderr[-tail:],
        }

    @staticmethod
    def encode_result(result: dict[str, Any]) -> str:
        """Serialise a tool result to JSON, keeping non-ASCII text readable."""
        return json.dumps(result, ensure_ascii=False)