#!/usr/bin/env python3 """ File System Tool - инструмент для работы с файловой системой Linux. Позволяет AI-агенту выполнять операции с файлами и директориями: - Чтение файлов (cat) - Запись файлов - Копирование (cp) - Перемещение (mv) - Удаление (rm) - Создание директорий (mkdir) - Список файлов (ls) - Проверка существования - Поиск файлов Инструмент работает от имени пользователя на локальной машине. """ import logging import os import shutil import subprocess import asyncio from pathlib import Path from typing import Optional, Dict, Any, List from dataclasses import dataclass, field from bot.tools import BaseTool, ToolResult, register_tool logger = logging.getLogger(__name__) class FileSystemTool(BaseTool): """Инструмент для работы с файловой системой.""" name = "file_system_tool" description = "Работа с файловой системой Linux: чтение/запись файлов, копирование, перемещение, удаление, создание директорий, просмотр списка файлов." category = "system" # Безопасные пути - где можно работать ALLOWED_BASE_PATHS = [ Path.home(), # Домашняя директория Path("/tmp"), Path("/var/tmp"), ] # Опасные пути - куда нельзя записывать/удалять DANGEROUS_PATHS = [ Path("/"), Path("/etc"), Path("/usr"), Path("/bin"), Path("/sbin"), Path("/boot"), Path("/dev"), Path("/proc"), Path("/sys"), ] def __init__(self): self._last_operation: Optional[str] = None self._operation_history: List[Dict] = [] def _is_path_safe(self, path: Path, allow_write: bool = True) -> tuple[bool, str]: """ Проверить безопасность пути. Args: path: Путь для проверки allow_write: Если True, проверяем возможность записи Returns: (is_safe: bool, reason: str) """ try: # Разрешаем абсолютные и относительные пути if not path.is_absolute(): path = Path.cwd() / path # Сначала проверяем на наличие в разрешённых путях (это важно!) for allowed in self.ALLOWED_BASE_PATHS: try: path.relative_to(allowed) return True, "Путь безопасен" except ValueError: pass # Если путь не в разрешённых - проверяем на опасные for dangerous in self.DANGEROUS_PATHS: # Пропускаем корень если путь не в разрешённых уже if dangerous == Path("/"): continue try: path.relative_to(dangerous) return False, f"Путь {path} находится в защищённой директории {dangerous}" except ValueError: pass # Если путь не в разрешённых и не в запрещённых - разрешаем с предупреждением return True, f"Путь {path} может быть недоступен" except Exception as e: return False, f"Ошибка проверки пути: {e}" def _resolve_path(self, path_str: str) -> Path: """Преобразовать строку пути в Path объект.""" path = Path(path_str) # Расширяем ~ в домашнюю директорию # Важно: Path("~/file") не работает, нужно expanduser() if path_str.startswith('~'): path = Path(path_str).expanduser() elif not path.is_absolute(): # Если путь относительный, делаем его абсолютным от домашней директории path = Path.home() / path_str return path async def read_file(self, path: str, limit: int = 100) -> Dict[str, Any]: """ Прочитать файл. Args: path: Путь к файлу limit: Максимальное количество строк для чтения Returns: Dict с content, lines, error """ try: file_path = self._resolve_path(path) # Проверка безопасности is_safe, reason = self._is_path_safe(file_path, allow_write=False) if not is_safe: return {"error": reason, "success": False} if not file_path.exists(): return {"error": f"Файл не существует: {file_path}", "success": False} if not file_path.is_file(): return {"error": f"Не файл: {file_path}", "success": False} # Читаем файл with open(file_path, 'r', encoding='utf-8', errors='replace') as f: lines = f.readlines() # Ограничиваем количество строк if len(lines) > limit: content = ''.join(lines[:limit]) truncated = True total_lines = len(lines) else: content = ''.join(lines) truncated = False total_lines = len(lines) logger.info(f"Прочитан файл: {file_path} ({total_lines} строк)") return { "success": True, "content": content, "path": str(file_path), "lines_read": min(len(lines), limit), "total_lines": total_lines, "truncated": truncated } except Exception as e: logger.error(f"Ошибка чтения файла {path}: {e}") return {"error": str(e), "success": False} async def write_file(self, path: str, content: str, append: bool = False) -> Dict[str, Any]: """ Записать в файл. Args: path: Путь к файлу content: Содержимое для записи append: Если True, добавить в конец файла Returns: Dict с success, path, bytes_written """ try: file_path = self._resolve_path(path) # Проверка безопасности is_safe, reason = self._is_path_safe(file_path, allow_write=True) if not is_safe: return {"error": reason, "success": False} # Создаём родительские директории если нужно file_path.parent.mkdir(parents=True, exist_ok=True) # Записываем файл mode = 'a' if append else 'w' with open(file_path, mode, encoding='utf-8') as f: bytes_written = f.write(content) logger.info(f"Записан файл: {file_path} ({bytes_written} байт)") return { "success": True, "path": str(file_path), "bytes_written": bytes_written, "appended": append } except Exception as e: logger.error(f"Ошибка записи файла {path}: {e}") return {"error": str(e), "success": False} async def list_directory(self, path: str = ".", show_hidden: bool = False) -> Dict[str, Any]: """ Показать список файлов в директории. Args: path: Путь к директории show_hidden: Показывать скрытые файлы Returns: Dict с files, directories, total """ try: dir_path = self._resolve_path(path) is_safe, _ = self._is_path_safe(dir_path, allow_write=False) if not is_safe: return {"error": "Доступ к директории ограничен", "success": False} if not dir_path.exists(): return {"error": f"Директория не существует: {dir_path}", "success": False} if not dir_path.is_dir(): return {"error": f"Не директория: {dir_path}", "success": False} files = [] directories = [] for item in dir_path.iterdir(): if not show_hidden and item.name.startswith('.'): continue try: stat = item.stat() size = stat.st_size mtime = stat.st_mtime except: size = 0 mtime = 0 item_info = { "name": item.name, "path": str(item), "size": size, "modified": mtime } if item.is_file(): files.append(item_info) elif item.is_dir(): directories.append(item_info) # Сортируем по имени files.sort(key=lambda x: x["name"]) directories.sort(key=lambda x: x["name"]) return { "success": True, "path": str(dir_path), "files": files, "directories": directories, "total_files": len(files), "total_dirs": len(directories) } except Exception as e: logger.error(f"Ошибка списка директории {path}: {e}") return {"error": str(e), "success": False} async def copy_file(self, source: str, destination: str) -> Dict[str, Any]: """ Скопировать файл или директорию. Args: source: Исходный путь destination: Целевой путь Returns: Dict с success, source, destination """ try: src_path = self._resolve_path(source) dst_path = self._resolve_path(destination) # Проверка безопасности is_safe_src, reason = self._is_path_safe(src_path, allow_write=False) if not is_safe_src: return {"error": f"Источник: {reason}", "success": False} is_safe_dst, reason = self._is_path_safe(dst_path, allow_write=True) if not is_safe_dst: return {"error": f"Назначение: {reason}", "success": False} if not src_path.exists(): return {"error": f"Источник не существует: {src_path}", "success": False} # Копируем if src_path.is_file(): shutil.copy2(src_path, dst_path) else: shutil.copytree(src_path, dst_path, dirs_exist_ok=True) logger.info(f"Скопировано: {src_path} -> {dst_path}") return { "success": True, "source": str(src_path), "destination": str(dst_path), "operation": "copy" } except Exception as e: logger.error(f"Ошибка копирования {source} -> {destination}: {e}") return {"error": str(e), "success": False} async def move_file(self, source: str, destination: str) -> Dict[str, Any]: """ Переместить файл или директорию. Args: source: Исходный путь destination: Целевой путь Returns: Dict с success, source, destination """ try: src_path = self._resolve_path(source) dst_path = self._resolve_path(destination) # Проверка безопасности is_safe_src, reason = self._is_path_safe(src_path, allow_write=False) if not is_safe_src: return {"error": f"Источник: {reason}", "success": False} is_safe_dst, reason = self._is_path_safe(dst_path, allow_write=True) if not is_safe_dst: return {"error": f"Назначение: {reason}", "success": False} if not src_path.exists(): return {"error": f"Источник не существует: {src_path}", "success": False} # Перемещаем shutil.move(src_path, dst_path) logger.info(f"Перемещено: {src_path} -> {dst_path}") return { "success": True, "source": str(src_path), "destination": str(dst_path), "operation": "move" } except Exception as e: logger.error(f"Ошибка перемещения {source} -> {destination}: {e}") return {"error": str(e), "success": False} async def delete(self, path: str, recursive: bool = False) -> Dict[str, Any]: """ Удалить файл или директорию. Args: path: Путь к файлу/директории recursive: Если True, удалять рекурсивно Returns: Dict с success, path, deleted_count """ try: file_path = self._resolve_path(path) # Проверка безопасности is_safe, reason = self._is_path_safe(file_path, allow_write=True) if not is_safe: return {"error": reason, "success": False} if not file_path.exists(): return {"error": f"Путь не существует: {file_path}", "success": False} deleted_count = 0 if file_path.is_file(): file_path.unlink() deleted_count = 1 elif file_path.is_dir(): if recursive: shutil.rmtree(file_path) # Считаем количество удалённых файлов deleted_count = -1 # Неизвестно else: return { "error": "Директория не пуста. Используйте recursive=True для рекурсивного удаления", "success": False } logger.info(f"Удалено: {file_path}") return { "success": True, "path": str(file_path), "deleted_count": deleted_count, "operation": "delete" } except Exception as e: logger.error(f"Ошибка удаления {path}: {e}") return {"error": str(e), "success": False} async def create_directory(self, path: str, parents: bool = True) -> Dict[str, Any]: """ Создать директорию. Args: path: Путь к директории parents: Если True, создавать родительские директории Returns: Dict с success, path """ try: dir_path = self._resolve_path(path) # Проверка безопасности is_safe, reason = self._is_path_safe(dir_path, allow_write=True) if not is_safe: return {"error": reason, "success": False} if dir_path.exists(): if dir_path.is_dir(): return { "success": True, "path": str(dir_path), "already_exists": True } else: return {"error": f"Существует файл с таким именем: {dir_path}", "success": False} # Создаём директорию dir_path.mkdir(parents=parents, exist_ok=parents) logger.info(f"Создана директория: {dir_path}") return { "success": True, "path": str(dir_path), "operation": "mkdir" } except Exception as e: logger.error(f"Ошибка создания директории {path}: {e}") return {"error": str(e), "success": False} async def file_info(self, path: str) -> Dict[str, Any]: """ Получить информацию о файле/директории. Args: path: Путь к файлу Returns: Dict с информацией о файле """ try: file_path = self._resolve_path(path) is_safe, _ = self._is_path_safe(file_path, allow_write=False) if not is_safe: return {"error": "Доступ ограничен", "success": False} if not file_path.exists(): return {"error": f"Путь не существует: {file_path}", "success": False} stat = file_path.stat() return { "success": True, "path": str(file_path), "name": file_path.name, "is_file": file_path.is_file(), "is_dir": file_path.is_dir(), "size": stat.st_size, "created": stat.st_ctime, "modified": stat.st_mtime, "permissions": oct(stat.st_mode)[-3:] } except Exception as e: logger.error(f"Ошибка получения информации о {path}: {e}") return {"error": str(e), "success": False} async def search_files( self, path: str = ".", pattern: str = "*", max_results: int = 50 ) -> Dict[str, Any]: """ Найти файлы по паттерну. Args: path: Директория для поиска pattern: Паттерн (glob-style) max_results: Максимум результатов Returns: Dict с найденными файлами """ try: base_path = self._resolve_path(path) is_safe, _ = self._is_path_safe(base_path, allow_write=False) if not is_safe: return {"error": "Доступ ограничен", "success": False} results = [] # Используем glob для поиска import glob matches = glob.glob(str(base_path / pattern), recursive=True) for match in matches[:max_results]: match_path = Path(match) try: stat = match_path.stat() results.append({ "path": str(match_path), "name": match_path.name, "size": stat.st_size, "is_file": match_path.is_file(), "is_dir": match_path.is_dir() }) except: pass return { "success": True, "pattern": pattern, "base_path": str(base_path), "found": len(results), "results": results, "truncated": len(matches) > max_results } except Exception as e: logger.error(f"Ошибка поиска файлов {pattern} в {path}: {e}") return {"error": str(e), "success": False} async def execute_shell(self, command: str, timeout: int = 30) -> Dict[str, Any]: """ Выполнить shell-команду (для сложных операций). Args: command: Команда для выполнения timeout: Таймаут в секундах Returns: Dict с stdout, stderr, returncode """ try: # Разрешаем только безопасные команды SAFE_COMMANDS = [ 'ls', 'cat', 'cp', 'mv', 'rm', 'mkdir', 'rmdir', 'touch', 'chmod', 'chown', 'find', 'grep', 'head', 'tail', 'wc', 'sort', 'uniq', 'pwd', 'du', 'df' ] # Извлекаем базовую команду base_cmd = command.split()[0] if command.split() else '' if base_cmd not in SAFE_COMMANDS: return { "error": f"Команда '{base_cmd}' не разрешена. Используйте безопасные команды: {SAFE_COMMANDS}", "success": False } # Выполняем команду process = await asyncio.create_subprocess_shell( command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=str(Path.home()) ) try: stdout, stderr = await asyncio.wait_for( process.communicate(), timeout=timeout ) except asyncio.TimeoutError: process.kill() return { "error": f"Таймаут выполнения команды ({timeout} сек)", "success": False } return { "success": process.returncode == 0, "stdout": stdout.decode('utf-8', errors='replace').strip(), "stderr": stderr.decode('utf-8', errors='replace').strip(), "returncode": process.returncode, "command": command } except Exception as e: logger.error(f"Ошибка выполнения команды {command}: {e}") return {"error": str(e), "success": False} async def execute(self, operation: str, **kwargs) -> ToolResult: """ Выполнить операцию с файловой системой. Args: operation: Тип операции (read, write, copy, move, delete, mkdir, list, info, search, shell) **kwargs: Аргументы операции Returns: ToolResult с результатом """ logger.info(f"File System Tool: operation={operation}, args={kwargs}") self._last_operation = operation try: result = None if operation == 'read': result = await self.read_file( path=kwargs.get('path', ''), limit=kwargs.get('limit', 100) ) elif operation == 'write': result = await self.write_file( path=kwargs.get('path', ''), content=kwargs.get('content', ''), append=kwargs.get('append', False) ) elif operation == 'copy': result = await self.copy_file( source=kwargs.get('source', ''), destination=kwargs.get('destination', '') ) elif operation == 'move': result = await self.move_file( source=kwargs.get('source', ''), destination=kwargs.get('destination', '') ) elif operation == 'delete': result = await self.delete( path=kwargs.get('path', ''), recursive=kwargs.get('recursive', False) ) elif operation == 'mkdir': result = await self.create_directory( path=kwargs.get('path', ''), parents=kwargs.get('parents', True) ) elif operation == 'list': result = await self.list_directory( path=kwargs.get('path', '.'), show_hidden=kwargs.get('show_hidden', False) ) elif operation == 'info': result = await self.file_info( path=kwargs.get('path', '') ) elif operation == 'search': result = await self.search_files( path=kwargs.get('path', '.'), pattern=kwargs.get('pattern', '*'), max_results=kwargs.get('max_results', 50) ) elif operation == 'shell': result = await self.execute_shell( command=kwargs.get('command', ''), timeout=kwargs.get('timeout', 30) ) else: return ToolResult( success=False, error=f"Неизвестная операция: {operation}. Доступные: read, write, copy, move, delete, mkdir, list, info, search, shell" ) # Сохраняем в историю self._operation_history.append({ 'operation': operation, 'args': kwargs, 'result': result, 'timestamp': __import__('datetime').datetime.now().isoformat() }) # Ограничиваем историю if len(self._operation_history) > 100: self._operation_history = self._operation_history[-50:] return ToolResult( success=result.get('success', False), data=result, metadata={ 'operation': operation, 'last_path': result.get('path', result.get('source', '')) } ) except Exception as e: logger.exception(f"Ошибка File System Tool: {e}") return ToolResult( success=False, error=str(e), metadata={'operation': operation} ) def get_schema(self) -> Dict[str, Any]: """Получить схему инструмента для промпта.""" return { "name": self.name, "description": self.description, "parameters": { "type": "object", "properties": { "operation": { "type": "string", "description": "Тип операции", "enum": ["read", "write", "copy", "move", "delete", "mkdir", "list", "info", "search", "shell"] }, "path": { "type": "string", "description": "Путь к файлу/директории" }, "source": { "type": "string", "description": "Исходный путь (для copy/move)" }, "destination": { "type": "string", "description": "Целевой путь (для copy/move)" }, "content": { "type": "string", "description": "Содержимое для записи" }, "pattern": { "type": "string", "description": "Паттерн для поиска файлов" }, "command": { "type": "string", "description": "Shell команда (для операции shell)" }, "limit": { "type": "integer", "description": "Лимит строк для чтения" }, "max_results": { "type": "integer", "description": "Максимум результатов поиска" }, "recursive": { "type": "boolean", "description": "Рекурсивное удаление" }, "show_hidden": { "type": "boolean", "description": "Показывать скрытые файлы" }, "timeout": { "type": "integer", "description": "Таймаут для shell команд" } }, "required": ["operation"] } } # Автоматическая регистрация при импорте @register_tool class FileSystemToolAuto(FileSystemTool): """Авто-регистрируемая версия FileSystemTool.""" pass