ducklm/duck_core/tools/search_files.py

86 lines
3.3 KiB
Python

from fnmatch import fnmatch
from typing import Any
from duck_core.tools.base import ToolResult
from duck_core.tools.paths import (
WorkspacePathError,
candidate_path,
is_inside_workspace,
resolve_workspace_path,
)
class SearchFilesTool:
    """Tool that searches files for lines containing a literal substring.

    ``run`` scans a single file or walks a directory tree, filters files by
    a glob pattern and a size cap, and reports every hit as a
    ``path:line_number:line`` string.
    """

    name = "search_files"
    risk_level = "low"

    def __init__(self, workspace: str, max_matches: int = 100, max_file_bytes: int = 1_000_000):
        """Store the search configuration.

        Args:
            workspace: Workspace root that requested paths are resolved against.
            max_matches: Upper bound on the matches returned by one search;
                a request may ask for fewer but never for more.
            max_file_bytes: Files larger than this many bytes are skipped.
        """
        self.workspace = workspace
        self.max_matches = max_matches
        self.max_file_bytes = max_file_bytes

    async def run(self, args: dict[str, Any]) -> ToolResult:
        """Search for ``args["query"]`` and return the matching lines.

        Recognised keys in ``args``:
            query: Substring to look for (required, must be non-empty).
            path: File or directory to search; defaults to ``"."``.
            glob: ``fnmatch`` pattern tested against both the file name and
                the reported path; defaults to ``"*"``.
            case_sensitive: Defaults to true; when false both the query and
                each line are lower-cased before comparison.
            max_matches: Optional positive integer; capped at the configured
                ``self.max_matches``. A missing or falsy value uses the cap.
            _approved: Truthy value is forwarded as ``allow_outside`` to
                ``resolve_workspace_path``.

        Returns:
            A ``ToolResult``. ``ok=False`` when the query is empty, when
            ``max_matches`` is not a positive integer, when path resolution
            raises ``WorkspacePathError`` (metadata then carries
            ``requires_approval``), or when the path does not exist.
            Otherwise ``ok=True`` with the newline-joined matches as output
            and scan statistics in the metadata.

        NOTE(review): file access here is synchronous, so a large tree
        blocks the event loop for the duration of the scan.
        """
        query = str(args.get("query") or "")
        if not query:
            return ToolResult(ok=False, error="Search query is required")

        raw_path = str(args.get("path") or ".")
        pattern = str(args.get("glob") or "*")
        case_sensitive = bool(args.get("case_sensitive", True))

        # Validate the requested limit instead of letting int() raise or a
        # negative value produce an empty result that claims to be truncated.
        raw_limit = args.get("max_matches")
        if raw_limit:
            try:
                requested = int(raw_limit)
            except (TypeError, ValueError, OverflowError):
                requested = 0
            if requested < 1:
                return ToolResult(
                    ok=False,
                    error=f"max_matches must be a positive integer, got {raw_limit!r}",
                )
            max_matches = min(requested, self.max_matches)
        else:
            max_matches = self.max_matches

        approved = bool(args.get("_approved"))
        try:
            root = resolve_workspace_path(self.workspace, ".")
            path = resolve_workspace_path(self.workspace, raw_path, allow_outside=approved)
        except WorkspacePathError as exc:
            path = candidate_path(self.workspace, raw_path)
            return ToolResult(
                ok=False,
                error=f"{exc}. Searching outside workspace requires approval.",
                metadata={
                    "path": str(path),
                    "requires_approval": True,
                    "risk_level": self.risk_level,
                    "reason": "Path is outside workspace",
                },
            )

        if not path.exists():
            return ToolResult(ok=False, error=f"Search path not found: {raw_path}")

        needle = query if case_sensitive else query.lower()
        matches: list[str] = []
        files_scanned = 0
        candidates = [path] if path.is_file() else path.rglob("*")

        for candidate in candidates:
            if len(matches) >= max_matches:
                break
            # Skip anything that has a ".git" component anywhere in its path.
            if ".git" in candidate.parts:
                continue
            try:
                if not candidate.is_file():
                    continue
                if is_inside_workspace(self.workspace, candidate):
                    relative = candidate.relative_to(root).as_posix()
                else:
                    relative = str(candidate)
                if not fnmatch(candidate.name, pattern) and not fnmatch(relative, pattern):
                    continue
                if candidate.stat().st_size > self.max_file_bytes:
                    continue
                # Fixed encoding keeps results independent of the host locale;
                # undecodable bytes are replaced rather than raising.
                text = candidate.read_text(encoding="utf-8", errors="replace")
            except OSError:
                # Unreadable or vanished file: skip it, keep searching the rest.
                continue

            files_scanned += 1
            for line_number, line in enumerate(text.splitlines(), start=1):
                haystack = line if case_sensitive else line.lower()
                if needle in haystack:
                    matches.append(f"{relative}:{line_number}:{line}")
                    if len(matches) >= max_matches:
                        break

        return ToolResult(
            ok=True,
            output="\n".join(matches),
            metadata={
                "path": str(path),
                "query": query,
                "matches": len(matches),
                "files_scanned": files_scanned,
                # True whenever the cap was reached, even if nothing was left.
                "truncated": len(matches) >= max_matches,
            },
        )