# ducklm/duck_core/tools/search_files.py
#
# 67 lines
# 2.6 KiB
# Python

from fnmatch import fnmatch
from typing import Any
from duck_core.tools.base import ToolResult
from duck_core.tools.paths import WorkspacePathError, resolve_workspace_path
class SearchFilesTool:
    """Search workspace files for lines containing a literal substring.

    Matching is a plain ``in`` test per line (not a regular expression).
    Regular files below the requested path are scanned unless they live in a
    ``.git`` directory, fail the optional glob, or exceed ``max_file_bytes``.
    """

    name = "search_files"
    risk_level = "low"

    # String spellings of the "case_sensitive" argument that mean False.
    # Without this, bool("false") would silently evaluate to True.
    _FALSE_STRINGS = frozenset({"", "0", "false", "no", "off"})

    def __init__(self, workspace: str, max_matches: int = 100, max_file_bytes: int = 1_000_000):
        """Configure the tool.

        Args:
            workspace: Workspace root passed to ``resolve_workspace_path``.
            max_matches: Hard upper bound on matches returned by one call.
            max_file_bytes: Files larger than this many bytes are not read.
        """
        self.workspace = workspace
        self.max_matches = max_matches
        self.max_file_bytes = max_file_bytes

    async def run(self, args: dict[str, Any]) -> ToolResult:
        """Run one search and return the matching lines.

        Args:
            args: Tool arguments. Recognised keys:
                ``query`` (required) - substring to look for;
                ``path`` - file or directory to search, default ``"."``;
                ``glob`` - ``fnmatch`` pattern tested against both the file
                name and the workspace-relative POSIX path, default ``"*"``;
                ``case_sensitive`` - default True; the strings ``"false"``,
                ``"0"``, ``"no"`` and ``"off"`` are read as False;
                ``max_matches`` - requested limit, capped at the configured
                ``max_matches``; missing or non-positive values select the
                configured limit.

        Returns:
            A failed ``ToolResult`` when the query is empty, ``max_matches``
            is not an integer, the path is rejected by
            ``resolve_workspace_path``, or the path does not exist. Otherwise
            a successful result whose output is one ``relative/path:line:text``
            entry per match, joined by newlines. ``metadata["truncated"]`` is
            True whenever the match limit was reached, which means more
            matches *may* exist, not that they necessarily do.
        """
        # NOTE: file reads below are synchronous, so a large search holds the
        # event loop for its whole duration.
        query = str(args.get("query") or "")
        raw_path = str(args.get("path") or ".")
        pattern = str(args.get("glob") or "*")
        case_sensitive = self._as_bool(args.get("case_sensitive", True))

        if not query:
            return ToolResult(ok=False, error="Search query is required")

        try:
            max_matches = self._resolve_limit(args.get("max_matches"))
        except (TypeError, ValueError, OverflowError):
            # Report malformed input as a tool error instead of raising.
            return ToolResult(ok=False, error="max_matches must be an integer")

        try:
            root = resolve_workspace_path(self.workspace, ".")
            path = resolve_workspace_path(self.workspace, raw_path)
        except WorkspacePathError as exc:
            return ToolResult(ok=False, error=str(exc))
        if not path.exists():
            return ToolResult(ok=False, error=f"Search path not found: {raw_path}")

        needle = query if case_sensitive else query.lower()
        matches: list[str] = []
        files_scanned = 0
        files_skipped = 0
        candidates = [path] if path.is_file() else path.rglob("*")

        for candidate in candidates:
            if len(matches) >= max_matches:
                break
            if not candidate.is_file():
                continue
            relative_path = candidate.relative_to(root)
            # Test the workspace-relative parts: checking the absolute path
            # would exclude every file if an ancestor of the workspace itself
            # happened to be named ".git".
            if ".git" in relative_path.parts:
                continue
            relative = relative_path.as_posix()
            if not fnmatch(candidate.name, pattern) and not fnmatch(relative, pattern):
                continue
            try:
                if candidate.stat().st_size > self.max_file_bytes:
                    continue
                # Explicit UTF-8 keeps results independent of the host locale;
                # undecodable bytes are replaced rather than raising.
                text = candidate.read_text(encoding="utf-8", errors="replace")
            except OSError:
                # One unreadable or vanished file must not abort the search.
                files_skipped += 1
                continue
            files_scanned += 1
            for line_number, line in enumerate(text.splitlines(), start=1):
                haystack = line if case_sensitive else line.lower()
                if needle in haystack:
                    matches.append(f"{relative}:{line_number}:{line}")
                    if len(matches) >= max_matches:
                        break

        return ToolResult(
            ok=True,
            output="\n".join(matches),
            metadata={
                "path": str(path),
                "query": query,
                "matches": len(matches),
                "files_scanned": files_scanned,
                "files_skipped": files_skipped,
                "truncated": len(matches) >= max_matches,
            },
        )

    def _resolve_limit(self, requested: Any) -> int:
        """Return the effective match limit for a requested value.

        Falsy and non-positive requests select the configured limit; anything
        larger than the configured limit is capped to it. Raises
        ``TypeError``/``ValueError``/``OverflowError`` when the value cannot
        be converted to an integer.
        """
        limit = int(requested or self.max_matches)
        if limit < 1:
            # A negative limit would otherwise end the scan before it starts
            # and report a truncated, empty result.
            return self.max_matches
        return min(limit, self.max_matches)

    @classmethod
    def _as_bool(cls, value: Any) -> bool:
        """Interpret a loosely typed flag, honouring "false"-like strings."""
        if isinstance(value, str):
            return value.strip().lower() not in cls._FALSE_STRINGS
        return bool(value)