86 lines
3.3 KiB
Python
86 lines
3.3 KiB
Python
from fnmatch import fnmatch
|
|
from typing import Any
|
|
|
|
from duck_core.tools.base import ToolResult
|
|
from duck_core.tools.paths import (
|
|
WorkspacePathError,
|
|
candidate_path,
|
|
is_inside_workspace,
|
|
resolve_workspace_path,
|
|
)
|
|
|
|
|
|
class SearchFilesTool:
    """Substring search over the text of files under a workspace directory.

    Matching is a plain ``in`` test on each line (no regular expressions).
    Results are reported one per line as ``<path>:<line number>:<line text>``.
    """

    name = "search_files"
    risk_level = "low"

    def __init__(self, workspace: str, max_matches: int = 100, max_file_bytes: int = 1_000_000):
        """Store the search configuration.

        Args:
            workspace: Workspace location handed to the path helpers.
            max_matches: Upper bound on the matches a single call may return;
                a per-call ``max_matches`` argument can lower it, never raise it.
            max_file_bytes: Files whose size exceeds this are not read.
        """
        self.workspace = workspace
        self.max_matches = max_matches
        self.max_file_bytes = max_file_bytes

    async def run(self, args: dict[str, Any]) -> ToolResult:
        """Search files for ``args["query"]`` and return the matching lines.

        Recognised keys in ``args``:
            query: Text to look for (required, non-empty).
            path: File or directory to search; defaults to ``"."``.
            glob: ``fnmatch`` pattern tested against both the file name and
                the reported path; defaults to ``"*"``.
            case_sensitive: Defaults to ``True`` when missing or ``None``.
            max_matches: Requested result limit, capped at ``self.max_matches``.
                Values below 1 fall back to the cap.
            _approved: When truthy, ``path`` is resolved with
                ``allow_outside=True``.

        Returns:
            A ``ToolResult``. On success ``output`` holds the newline-joined
            matches and ``metadata`` holds the resolved path, the query, the
            match count, the number of files read and a ``truncated`` flag.
            ``truncated`` is True whenever the limit was reached, so it can
            be True even if no further matches exist. On failure ``ok`` is
            False and ``error`` describes the problem; a path rejected by the
            resolver additionally carries ``requires_approval`` metadata.
        """
        query = str(args.get("query") or "")
        if not query:
            return ToolResult(ok=False, error="Search query is required")

        raw_path = str(args.get("path") or ".")
        pattern = str(args.get("glob") or "*")

        # An explicit None means "use the default", consistent with how the
        # other arguments treat missing/None values.
        case_flag = args.get("case_sensitive")
        case_sensitive = True if case_flag is None else bool(case_flag)

        # Report a malformed limit as a tool error instead of letting int()
        # raise out of the tool.
        try:
            requested = int(args.get("max_matches") or self.max_matches)
        except (TypeError, ValueError):
            return ToolResult(ok=False, error="max_matches must be an integer")
        if requested < 1:
            # A negative limit would otherwise end the scan before the first
            # file and report an empty, "truncated" result.
            requested = self.max_matches
        max_matches = min(requested, self.max_matches)

        approved = bool(args.get("_approved"))
        try:
            root = resolve_workspace_path(self.workspace, ".")
            path = resolve_workspace_path(self.workspace, raw_path, allow_outside=approved)
        except WorkspacePathError as exc:
            path = candidate_path(self.workspace, raw_path)
            return ToolResult(
                ok=False,
                error=f"{exc}. Searching outside workspace requires approval.",
                metadata={
                    "path": str(path),
                    "requires_approval": True,
                    "risk_level": self.risk_level,
                    "reason": "Path is outside workspace",
                },
            )
        if not path.exists():
            return ToolResult(ok=False, error=f"Search path not found: {raw_path}")

        needle = query if case_sensitive else query.lower()
        matches: list[str] = []
        files_scanned = 0
        candidates = [path] if path.is_file() else path.rglob("*")
        for candidate in candidates:
            if len(matches) >= max_matches:
                break
            # Skip directories/special files and anything with a ".git"
            # component anywhere in its path.
            if not candidate.is_file() or ".git" in candidate.parts:
                continue
            # NOTE(review): a file that is not inside the workspace (for
            # example one reached through a symlink) is still searched here
            # even without approval -- confirm this is intended.
            if is_inside_workspace(self.workspace, candidate):
                relative = candidate.relative_to(root).as_posix()
            else:
                relative = str(candidate)
            if not fnmatch(candidate.name, pattern) and not fnmatch(relative, pattern):
                continue
            try:
                if candidate.stat().st_size > self.max_file_bytes:
                    continue
                # Pin the encoding so results do not depend on the platform
                # locale; undecodable bytes are replaced rather than fatal.
                text = candidate.read_text(encoding="utf-8", errors="replace")
            except OSError:
                # Unreadable or vanished file: skip it rather than abort the
                # whole search.
                continue
            files_scanned += 1
            for line_number, line in enumerate(text.splitlines(), start=1):
                haystack = line if case_sensitive else line.lower()
                if needle in haystack:
                    matches.append(f"{relative}:{line_number}:{line}")
                    if len(matches) >= max_matches:
                        break

        return ToolResult(
            ok=True,
            output="\n".join(matches),
            metadata={
                "path": str(path),
                "query": query,
                "matches": len(matches),
                "files_scanned": files_scanned,
                "truncated": len(matches) >= max_matches,
            },
        )
|