from __future__ import annotations import json import subprocess from pathlib import Path from typing import Any, Callable class ToolError(RuntimeError): pass class ToolRegistry: def __init__(self, workspace_root: Path) -> None: self.workspace_root = workspace_root.resolve() self._handlers: dict[str, Callable[[dict[str, Any]], dict[str, Any]]] = { "list_files": self.list_files, "read_file": self.read_file, "write_file": self.write_file, "exec_command": self.exec_command, } def schemas(self) -> list[dict[str, Any]]: return [ { "type": "function", "function": { "name": "list_files", "description": "List files in a directory inside the workspace.", "parameters": { "type": "object", "properties": { "path": {"type": "string"}, }, "required": ["path"], }, }, }, { "type": "function", "function": { "name": "read_file", "description": "Read a UTF-8 text file from the workspace.", "parameters": { "type": "object", "properties": { "path": {"type": "string"}, }, "required": ["path"], }, }, }, { "type": "function", "function": { "name": "write_file", "description": "Write UTF-8 text into a file inside the workspace.", "parameters": { "type": "object", "properties": { "path": {"type": "string"}, "content": {"type": "string"}, }, "required": ["path", "content"], }, }, }, { "type": "function", "function": { "name": "exec_command", "description": "Run a shell command inside the workspace and return stdout, stderr and exit code.", "parameters": { "type": "object", "properties": { "command": {"type": "string"}, "cwd": {"type": "string"}, }, "required": ["command"], }, }, }, ] def _resolve(self, raw_path: str) -> Path: candidate = Path(raw_path) if not candidate.is_absolute(): candidate = self.workspace_root / candidate resolved = candidate.resolve() if self.workspace_root not in resolved.parents and resolved != self.workspace_root: raise ToolError("Path escapes workspace root") return resolved def execute(self, name: str, arguments: dict[str, Any]) -> dict[str, Any]: handler = self._handlers.get(name) if not handler: raise ToolError(f"Unknown tool: {name}") return handler(arguments) def list_files(self, arguments: dict[str, Any]) -> dict[str, Any]: target = self._resolve(arguments["path"]) if not target.exists(): raise ToolError("Directory does not exist") if not target.is_dir(): raise ToolError("Path is not a directory") items = [] for item in sorted(target.iterdir(), key=lambda value: value.name): items.append( { "name": item.name, "path": str(item.relative_to(self.workspace_root)), "type": "dir" if item.is_dir() else "file", } ) return {"items": items} def read_file(self, arguments: dict[str, Any]) -> dict[str, Any]: target = self._resolve(arguments["path"]) if not target.exists(): raise ToolError("File does not exist") if not target.is_file(): raise ToolError("Path is not a file") return {"path": str(target.relative_to(self.workspace_root)), "content": target.read_text(encoding="utf-8")} def write_file(self, arguments: dict[str, Any]) -> dict[str, Any]: target = self._resolve(arguments["path"]) target.parent.mkdir(parents=True, exist_ok=True) target.write_text(arguments["content"], encoding="utf-8") return {"path": str(target.relative_to(self.workspace_root)), "bytes_written": len(arguments["content"].encode("utf-8"))} def exec_command(self, arguments: dict[str, Any]) -> dict[str, Any]: cwd = self._resolve(arguments.get("cwd", ".")) command = arguments["command"] completed = subprocess.run( command, cwd=str(cwd), shell=True, capture_output=True, text=True, timeout=120, ) return { "command": command, "cwd": str(cwd.relative_to(self.workspace_root)), "returncode": completed.returncode, "stdout": completed.stdout[-12000:], "stderr": completed.stderr[-12000:], } @staticmethod def encode_result(result: dict[str, Any]) -> str: return json.dumps(result, ensure_ascii=False)