306 lines
12 KiB
Python
306 lines
12 KiB
Python
from __future__ import annotations
|
|
|
|
import fnmatch
|
|
import json
|
|
import os
|
|
import re
|
|
import subprocess
|
|
from pathlib import Path
|
|
from typing import Any, Callable
|
|
|
|
from config import ServerConfig
|
|
|
|
|
|
class ToolError(RuntimeError):
|
|
pass
|
|
|
|
|
|
class ToolRegistry:
|
|
def __init__(self, config: ServerConfig) -> None:
|
|
self.config = config
|
|
self.workspace_root = config.workspace_root.resolve()
|
|
self._handlers: dict[str, Callable[[dict[str, Any]], dict[str, Any]]] = {
|
|
"list_files": self.list_files,
|
|
"glob_search": self.glob_search,
|
|
"grep_text": self.grep_text,
|
|
"stat_path": self.stat_path,
|
|
"read_file": self.read_file,
|
|
"write_file": self.write_file,
|
|
"make_directory": self.make_directory,
|
|
"exec_command": self.exec_command,
|
|
}
|
|
|
|
def schemas(self) -> list[dict[str, Any]]:
|
|
return [
|
|
{
|
|
"type": "function",
|
|
"function": {
|
|
"name": "list_files",
|
|
"description": "List files in a directory inside the workspace.",
|
|
"parameters": {
|
|
"type": "object",
|
|
"properties": {
|
|
"path": {"type": "string"},
|
|
},
|
|
"required": ["path"],
|
|
},
|
|
},
|
|
},
|
|
{
|
|
"type": "function",
|
|
"function": {
|
|
"name": "glob_search",
|
|
"description": "Find workspace paths matching a glob pattern.",
|
|
"parameters": {
|
|
"type": "object",
|
|
"properties": {
|
|
"pattern": {"type": "string"},
|
|
"base_path": {"type": "string"},
|
|
"limit": {"type": "integer"},
|
|
},
|
|
"required": ["pattern"],
|
|
},
|
|
},
|
|
},
|
|
{
|
|
"type": "function",
|
|
"function": {
|
|
"name": "grep_text",
|
|
"description": "Search text in workspace files using a regular expression.",
|
|
"parameters": {
|
|
"type": "object",
|
|
"properties": {
|
|
"pattern": {"type": "string"},
|
|
"base_path": {"type": "string"},
|
|
"limit": {"type": "integer"},
|
|
},
|
|
"required": ["pattern"],
|
|
},
|
|
},
|
|
},
|
|
{
|
|
"type": "function",
|
|
"function": {
|
|
"name": "stat_path",
|
|
"description": "Return metadata for a workspace path.",
|
|
"parameters": {
|
|
"type": "object",
|
|
"properties": {
|
|
"path": {"type": "string"},
|
|
},
|
|
"required": ["path"],
|
|
},
|
|
},
|
|
},
|
|
{
|
|
"type": "function",
|
|
"function": {
|
|
"name": "read_file",
|
|
"description": "Read a UTF-8 text file from the workspace.",
|
|
"parameters": {
|
|
"type": "object",
|
|
"properties": {
|
|
"path": {"type": "string"},
|
|
},
|
|
"required": ["path"],
|
|
},
|
|
},
|
|
},
|
|
{
|
|
"type": "function",
|
|
"function": {
|
|
"name": "write_file",
|
|
"description": "Write UTF-8 text into a file inside the workspace.",
|
|
"parameters": {
|
|
"type": "object",
|
|
"properties": {
|
|
"path": {"type": "string"},
|
|
"content": {"type": "string"},
|
|
},
|
|
"required": ["path", "content"],
|
|
},
|
|
},
|
|
},
|
|
{
|
|
"type": "function",
|
|
"function": {
|
|
"name": "make_directory",
|
|
"description": "Create a directory inside the workspace.",
|
|
"parameters": {
|
|
"type": "object",
|
|
"properties": {
|
|
"path": {"type": "string"},
|
|
},
|
|
"required": ["path"],
|
|
},
|
|
},
|
|
},
|
|
{
|
|
"type": "function",
|
|
"function": {
|
|
"name": "exec_command",
|
|
"description": "Run a shell command inside the workspace and return stdout, stderr and exit code.",
|
|
"parameters": {
|
|
"type": "object",
|
|
"properties": {
|
|
"command": {"type": "string"},
|
|
"cwd": {"type": "string"},
|
|
},
|
|
"required": ["command"],
|
|
},
|
|
},
|
|
},
|
|
]
|
|
|
|
def _resolve(self, raw_path: str) -> Path:
|
|
candidate = Path(raw_path)
|
|
if not candidate.is_absolute():
|
|
candidate = self.workspace_root / candidate
|
|
resolved = candidate.resolve()
|
|
if self.workspace_root not in resolved.parents and resolved != self.workspace_root:
|
|
raise ToolError("Path escapes workspace root")
|
|
return resolved
|
|
|
|
def execute(self, name: str, arguments: dict[str, Any]) -> dict[str, Any]:
|
|
handler = self._handlers.get(name)
|
|
if not handler:
|
|
raise ToolError(f"Unknown tool: {name}")
|
|
return handler(arguments)
|
|
|
|
def list_files(self, arguments: dict[str, Any]) -> dict[str, Any]:
|
|
target = self._resolve(arguments["path"])
|
|
if not target.exists():
|
|
raise ToolError("Directory does not exist")
|
|
if not target.is_dir():
|
|
raise ToolError("Path is not a directory")
|
|
items = []
|
|
for item in sorted(target.iterdir(), key=lambda value: value.name):
|
|
items.append(
|
|
{
|
|
"name": item.name,
|
|
"path": str(item.relative_to(self.workspace_root).as_posix()),
|
|
"type": "dir" if item.is_dir() else "file",
|
|
}
|
|
)
|
|
return {"items": items}
|
|
|
|
def glob_search(self, arguments: dict[str, Any]) -> dict[str, Any]:
|
|
pattern = arguments["pattern"]
|
|
base = self._resolve(arguments.get("base_path", "."))
|
|
if not base.is_dir():
|
|
raise ToolError("base_path is not a directory")
|
|
limit = max(1, min(int(arguments.get("limit", 200)), 1000))
|
|
matches: list[str] = []
|
|
for root, dirs, files in os.walk(base):
|
|
dirs.sort()
|
|
files.sort()
|
|
rel_root = Path(root).relative_to(self.workspace_root)
|
|
for name in dirs + files:
|
|
rel_path = (rel_root / name).as_posix()
|
|
if fnmatch.fnmatch(rel_path, pattern):
|
|
matches.append(rel_path)
|
|
if len(matches) >= limit:
|
|
return {"matches": matches, "truncated": True}
|
|
return {"matches": matches, "truncated": False}
|
|
|
|
def grep_text(self, arguments: dict[str, Any]) -> dict[str, Any]:
|
|
regex = re.compile(arguments["pattern"])
|
|
base = self._resolve(arguments.get("base_path", "."))
|
|
if not base.is_dir():
|
|
raise ToolError("base_path is not a directory")
|
|
limit = max(1, min(int(arguments.get("limit", 100)), 500))
|
|
matches: list[dict[str, Any]] = []
|
|
for root, dirs, files in os.walk(base):
|
|
dirs.sort()
|
|
files.sort()
|
|
for file_name in files:
|
|
file_path = Path(root) / file_name
|
|
try:
|
|
text = file_path.read_text(encoding="utf-8")
|
|
except (UnicodeDecodeError, OSError):
|
|
continue
|
|
for lineno, line in enumerate(text.splitlines(), start=1):
|
|
if regex.search(line):
|
|
matches.append(
|
|
{
|
|
"path": file_path.relative_to(self.workspace_root).as_posix(),
|
|
"line": lineno,
|
|
"text": line[:500],
|
|
}
|
|
)
|
|
if len(matches) >= limit:
|
|
return {"matches": matches, "truncated": True}
|
|
return {"matches": matches, "truncated": False}
|
|
|
|
def stat_path(self, arguments: dict[str, Any]) -> dict[str, Any]:
|
|
target = self._resolve(arguments["path"])
|
|
rel_path = target.relative_to(self.workspace_root).as_posix()
|
|
if not target.exists():
|
|
return {"exists": False, "path": rel_path}
|
|
stat = target.stat()
|
|
return {
|
|
"exists": True,
|
|
"path": rel_path,
|
|
"type": "dir" if target.is_dir() else "file",
|
|
"size": stat.st_size,
|
|
"mtime": int(stat.st_mtime),
|
|
}
|
|
|
|
def read_file(self, arguments: dict[str, Any]) -> dict[str, Any]:
|
|
target = self._resolve(arguments["path"])
|
|
if not target.exists():
|
|
raise ToolError("File does not exist")
|
|
if not target.is_file():
|
|
raise ToolError("Path is not a file")
|
|
content = target.read_text(encoding="utf-8")
|
|
encoded = content.encode("utf-8")
|
|
truncated = False
|
|
if len(encoded) > self.config.max_file_read_bytes:
|
|
content = encoded[: self.config.max_file_read_bytes].decode(
|
|
"utf-8",
|
|
errors="ignore",
|
|
)
|
|
truncated = True
|
|
return {
|
|
"path": target.relative_to(self.workspace_root).as_posix(),
|
|
"content": content,
|
|
"truncated": truncated,
|
|
}
|
|
|
|
def write_file(self, arguments: dict[str, Any]) -> dict[str, Any]:
|
|
target = self._resolve(arguments["path"])
|
|
target.parent.mkdir(parents=True, exist_ok=True)
|
|
target.write_text(arguments["content"], encoding="utf-8")
|
|
return {
|
|
"path": target.relative_to(self.workspace_root).as_posix(),
|
|
"bytes_written": len(arguments["content"].encode("utf-8")),
|
|
}
|
|
|
|
def make_directory(self, arguments: dict[str, Any]) -> dict[str, Any]:
|
|
target = self._resolve(arguments["path"])
|
|
target.mkdir(parents=True, exist_ok=True)
|
|
return {"path": target.relative_to(self.workspace_root).as_posix(), "created": True}
|
|
|
|
def exec_command(self, arguments: dict[str, Any]) -> dict[str, Any]:
|
|
cwd = self._resolve(arguments.get("cwd", "."))
|
|
command = arguments["command"]
|
|
completed = subprocess.run(
|
|
command,
|
|
cwd=str(cwd),
|
|
shell=True,
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=120,
|
|
)
|
|
return {
|
|
"command": command,
|
|
"cwd": cwd.relative_to(self.workspace_root).as_posix(),
|
|
"returncode": completed.returncode,
|
|
"stdout": completed.stdout[-self.config.max_command_output_bytes :],
|
|
"stderr": completed.stderr[-self.config.max_command_output_bytes :],
|
|
}
|
|
|
|
@staticmethod
|
|
def encode_result(result: dict[str, Any]) -> str:
|
|
return json.dumps(result, ensure_ascii=False)
|