telegram-cli-bot/bot/tools/file_system_tool.py

804 lines
29 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
File System Tool - инструмент для работы с файловой системой Linux.
Позволяет AI-агенту выполнять операции с файлами и директориями:
- Чтение файлов (cat)
- Запись файлов
- Копирование (cp)
- Перемещение (mv)
- Удаление (rm)
- Создание директорий (mkdir)
- Список файлов (ls)
- Проверка существования
- Поиск файлов
Инструмент работает от имени пользователя на локальной машине.
"""
import logging
import os
import shutil
import subprocess
import asyncio
from pathlib import Path
from typing import Optional, Dict, Any, List
from dataclasses import dataclass, field
from bot.tools import BaseTool, ToolResult, register_tool
logger = logging.getLogger(__name__)
class FileSystemTool(BaseTool):
"""Инструмент для работы с файловой системой."""
name = "file_system_tool"
description = "Работа с файловой системой Linux: чтение/запись файлов, копирование, перемещение, удаление, создание директорий, просмотр списка файлов."
category = "system"
# Безопасные пути - где можно работать
ALLOWED_BASE_PATHS = [
Path.home(), # Домашняя директория
Path("/tmp"),
Path("/var/tmp"),
]
# Опасные пути - куда нельзя записывать/удалять
DANGEROUS_PATHS = [
Path("/"),
Path("/etc"),
Path("/usr"),
Path("/bin"),
Path("/sbin"),
Path("/boot"),
Path("/dev"),
Path("/proc"),
Path("/sys"),
]
def __init__(self):
self._last_operation: Optional[str] = None
self._operation_history: List[Dict] = []
def _is_path_safe(self, path: Path, allow_write: bool = True) -> tuple[bool, str]:
"""
Проверить безопасность пути.
Args:
path: Путь для проверки
allow_write: Если True, проверяем возможность записи
Returns:
(is_safe: bool, reason: str)
"""
try:
# Разрешаем абсолютные и относительные пути
if not path.is_absolute():
path = Path.cwd() / path
# Сначала проверяем на наличие в разрешённых путях (это важно!)
for allowed in self.ALLOWED_BASE_PATHS:
try:
path.relative_to(allowed)
return True, "Путь безопасен"
except ValueError:
pass
# Если путь не в разрешённых - проверяем на опасные
for dangerous in self.DANGEROUS_PATHS:
# Пропускаем корень если путь не в разрешённых уже
if dangerous == Path("/"):
continue
try:
path.relative_to(dangerous)
return False, f"Путь {path} находится в защищённой директории {dangerous}"
except ValueError:
pass
# Если путь не в разрешённых и не в запрещённых - разрешаем с предупреждением
return True, f"Путь {path} может быть недоступен"
except Exception as e:
return False, f"Ошибка проверки пути: {e}"
def _resolve_path(self, path_str: str) -> Path:
"""Преобразовать строку пути в Path объект."""
path = Path(path_str)
# Расширяем ~ в домашнюю директорию
# Важно: Path("~/file") не работает, нужно expanduser()
if path_str.startswith('~'):
path = Path(path_str).expanduser()
elif not path.is_absolute():
# Если путь относительный, делаем его абсолютным от домашней директории
path = Path.home() / path_str
return path
async def read_file(self, path: str, limit: int = 100) -> Dict[str, Any]:
"""
Прочитать файл.
Args:
path: Путь к файлу
limit: Максимальное количество строк для чтения
Returns:
Dict с content, lines, error
"""
try:
file_path = self._resolve_path(path)
# Проверка безопасности
is_safe, reason = self._is_path_safe(file_path, allow_write=False)
if not is_safe:
return {"error": reason, "success": False}
if not file_path.exists():
return {"error": f"Файл не существует: {file_path}", "success": False}
if not file_path.is_file():
return {"error": f"Не файл: {file_path}", "success": False}
# Читаем файл
with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
lines = f.readlines()
# Ограничиваем количество строк
if len(lines) > limit:
content = ''.join(lines[:limit])
truncated = True
total_lines = len(lines)
else:
content = ''.join(lines)
truncated = False
total_lines = len(lines)
logger.info(f"Прочитан файл: {file_path} ({total_lines} строк)")
return {
"success": True,
"content": content,
"path": str(file_path),
"lines_read": min(len(lines), limit),
"total_lines": total_lines,
"truncated": truncated
}
except Exception as e:
logger.error(f"Ошибка чтения файла {path}: {e}")
return {"error": str(e), "success": False}
async def write_file(self, path: str, content: str, append: bool = False) -> Dict[str, Any]:
"""
Записать в файл.
Args:
path: Путь к файлу
content: Содержимое для записи
append: Если True, добавить в конец файла
Returns:
Dict с success, path, bytes_written
"""
try:
file_path = self._resolve_path(path)
# Проверка безопасности
is_safe, reason = self._is_path_safe(file_path, allow_write=True)
if not is_safe:
return {"error": reason, "success": False}
# Создаём родительские директории если нужно
file_path.parent.mkdir(parents=True, exist_ok=True)
# Записываем файл
mode = 'a' if append else 'w'
with open(file_path, mode, encoding='utf-8') as f:
bytes_written = f.write(content)
logger.info(f"Записан файл: {file_path} ({bytes_written} байт)")
return {
"success": True,
"path": str(file_path),
"bytes_written": bytes_written,
"appended": append
}
except Exception as e:
logger.error(f"Ошибка записи файла {path}: {e}")
return {"error": str(e), "success": False}
async def list_directory(self, path: str = ".", show_hidden: bool = False) -> Dict[str, Any]:
"""
Показать список файлов в директории.
Args:
path: Путь к директории
show_hidden: Показывать скрытые файлы
Returns:
Dict с files, directories, total
"""
try:
dir_path = self._resolve_path(path)
is_safe, _ = self._is_path_safe(dir_path, allow_write=False)
if not is_safe:
return {"error": "Доступ к директории ограничен", "success": False}
if not dir_path.exists():
return {"error": f"Директория не существует: {dir_path}", "success": False}
if not dir_path.is_dir():
return {"error": f"Не директория: {dir_path}", "success": False}
files = []
directories = []
for item in dir_path.iterdir():
if not show_hidden and item.name.startswith('.'):
continue
try:
stat = item.stat()
size = stat.st_size
mtime = stat.st_mtime
except:
size = 0
mtime = 0
item_info = {
"name": item.name,
"path": str(item),
"size": size,
"modified": mtime
}
if item.is_file():
files.append(item_info)
elif item.is_dir():
directories.append(item_info)
# Сортируем по имени
files.sort(key=lambda x: x["name"])
directories.sort(key=lambda x: x["name"])
return {
"success": True,
"path": str(dir_path),
"files": files,
"directories": directories,
"total_files": len(files),
"total_dirs": len(directories)
}
except Exception as e:
logger.error(f"Ошибка списка директории {path}: {e}")
return {"error": str(e), "success": False}
async def copy_file(self, source: str, destination: str) -> Dict[str, Any]:
"""
Скопировать файл или директорию.
Args:
source: Исходный путь
destination: Целевой путь
Returns:
Dict с success, source, destination
"""
try:
src_path = self._resolve_path(source)
dst_path = self._resolve_path(destination)
# Проверка безопасности
is_safe_src, reason = self._is_path_safe(src_path, allow_write=False)
if not is_safe_src:
return {"error": f"Источник: {reason}", "success": False}
is_safe_dst, reason = self._is_path_safe(dst_path, allow_write=True)
if not is_safe_dst:
return {"error": f"Назначение: {reason}", "success": False}
if not src_path.exists():
return {"error": f"Источник не существует: {src_path}", "success": False}
# Копируем
if src_path.is_file():
shutil.copy2(src_path, dst_path)
else:
shutil.copytree(src_path, dst_path, dirs_exist_ok=True)
logger.info(f"Скопировано: {src_path} -> {dst_path}")
return {
"success": True,
"source": str(src_path),
"destination": str(dst_path),
"operation": "copy"
}
except Exception as e:
logger.error(f"Ошибка копирования {source} -> {destination}: {e}")
return {"error": str(e), "success": False}
async def move_file(self, source: str, destination: str) -> Dict[str, Any]:
"""
Переместить файл или директорию.
Args:
source: Исходный путь
destination: Целевой путь
Returns:
Dict с success, source, destination
"""
try:
src_path = self._resolve_path(source)
dst_path = self._resolve_path(destination)
# Проверка безопасности
is_safe_src, reason = self._is_path_safe(src_path, allow_write=False)
if not is_safe_src:
return {"error": f"Источник: {reason}", "success": False}
is_safe_dst, reason = self._is_path_safe(dst_path, allow_write=True)
if not is_safe_dst:
return {"error": f"Назначение: {reason}", "success": False}
if not src_path.exists():
return {"error": f"Источник не существует: {src_path}", "success": False}
# Перемещаем
shutil.move(src_path, dst_path)
logger.info(f"Перемещено: {src_path} -> {dst_path}")
return {
"success": True,
"source": str(src_path),
"destination": str(dst_path),
"operation": "move"
}
except Exception as e:
logger.error(f"Ошибка перемещения {source} -> {destination}: {e}")
return {"error": str(e), "success": False}
async def delete(self, path: str, recursive: bool = False) -> Dict[str, Any]:
"""
Удалить файл или директорию.
Args:
path: Путь к файлу/директории
recursive: Если True, удалять рекурсивно
Returns:
Dict с success, path, deleted_count
"""
try:
file_path = self._resolve_path(path)
# Проверка безопасности
is_safe, reason = self._is_path_safe(file_path, allow_write=True)
if not is_safe:
return {"error": reason, "success": False}
if not file_path.exists():
return {"error": f"Путь не существует: {file_path}", "success": False}
deleted_count = 0
if file_path.is_file():
file_path.unlink()
deleted_count = 1
elif file_path.is_dir():
if recursive:
shutil.rmtree(file_path)
# Считаем количество удалённых файлов
deleted_count = -1 # Неизвестно
else:
return {
"error": "Директория не пуста. Используйте recursive=True для рекурсивного удаления",
"success": False
}
logger.info(f"Удалено: {file_path}")
return {
"success": True,
"path": str(file_path),
"deleted_count": deleted_count,
"operation": "delete"
}
except Exception as e:
logger.error(f"Ошибка удаления {path}: {e}")
return {"error": str(e), "success": False}
async def create_directory(self, path: str, parents: bool = True) -> Dict[str, Any]:
"""
Создать директорию.
Args:
path: Путь к директории
parents: Если True, создавать родительские директории
Returns:
Dict с success, path
"""
try:
dir_path = self._resolve_path(path)
# Проверка безопасности
is_safe, reason = self._is_path_safe(dir_path, allow_write=True)
if not is_safe:
return {"error": reason, "success": False}
if dir_path.exists():
if dir_path.is_dir():
return {
"success": True,
"path": str(dir_path),
"already_exists": True
}
else:
return {"error": f"Существует файл с таким именем: {dir_path}", "success": False}
# Создаём директорию
dir_path.mkdir(parents=parents, exist_ok=parents)
logger.info(f"Создана директория: {dir_path}")
return {
"success": True,
"path": str(dir_path),
"operation": "mkdir"
}
except Exception as e:
logger.error(f"Ошибка создания директории {path}: {e}")
return {"error": str(e), "success": False}
async def file_info(self, path: str) -> Dict[str, Any]:
"""
Получить информацию о файле/директории.
Args:
path: Путь к файлу
Returns:
Dict с информацией о файле
"""
try:
file_path = self._resolve_path(path)
is_safe, _ = self._is_path_safe(file_path, allow_write=False)
if not is_safe:
return {"error": "Доступ ограничен", "success": False}
if not file_path.exists():
return {"error": f"Путь не существует: {file_path}", "success": False}
stat = file_path.stat()
return {
"success": True,
"path": str(file_path),
"name": file_path.name,
"is_file": file_path.is_file(),
"is_dir": file_path.is_dir(),
"size": stat.st_size,
"created": stat.st_ctime,
"modified": stat.st_mtime,
"permissions": oct(stat.st_mode)[-3:]
}
except Exception as e:
logger.error(f"Ошибка получения информации о {path}: {e}")
return {"error": str(e), "success": False}
async def search_files(
self,
path: str = ".",
pattern: str = "*",
max_results: int = 50
) -> Dict[str, Any]:
"""
Найти файлы по паттерну.
Args:
path: Директория для поиска
pattern: Паттерн (glob-style)
max_results: Максимум результатов
Returns:
Dict с найденными файлами
"""
try:
base_path = self._resolve_path(path)
is_safe, _ = self._is_path_safe(base_path, allow_write=False)
if not is_safe:
return {"error": "Доступ ограничен", "success": False}
results = []
# Используем glob для поиска
import glob
matches = glob.glob(str(base_path / pattern), recursive=True)
for match in matches[:max_results]:
match_path = Path(match)
try:
stat = match_path.stat()
results.append({
"path": str(match_path),
"name": match_path.name,
"size": stat.st_size,
"is_file": match_path.is_file(),
"is_dir": match_path.is_dir()
})
except:
pass
return {
"success": True,
"pattern": pattern,
"base_path": str(base_path),
"found": len(results),
"results": results,
"truncated": len(matches) > max_results
}
except Exception as e:
logger.error(f"Ошибка поиска файлов {pattern} в {path}: {e}")
return {"error": str(e), "success": False}
async def execute_shell(self, command: str, timeout: int = 30) -> Dict[str, Any]:
"""
Выполнить shell-команду (для сложных операций).
Args:
command: Команда для выполнения
timeout: Таймаут в секундах
Returns:
Dict с stdout, stderr, returncode
"""
try:
# Разрешаем только безопасные команды
SAFE_COMMANDS = [
'ls', 'cat', 'cp', 'mv', 'rm', 'mkdir', 'rmdir',
'touch', 'chmod', 'chown', 'find', 'grep', 'head',
'tail', 'wc', 'sort', 'uniq', 'pwd', 'du', 'df'
]
# Извлекаем базовую команду
base_cmd = command.split()[0] if command.split() else ''
if base_cmd not in SAFE_COMMANDS:
return {
"error": f"Команда '{base_cmd}' не разрешена. Используйте безопасные команды: {SAFE_COMMANDS}",
"success": False
}
# Выполняем команду
process = await asyncio.create_subprocess_shell(
command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=str(Path.home())
)
try:
stdout, stderr = await asyncio.wait_for(
process.communicate(),
timeout=timeout
)
except asyncio.TimeoutError:
process.kill()
return {
"error": f"Таймаут выполнения команды ({timeout} сек)",
"success": False
}
return {
"success": process.returncode == 0,
"stdout": stdout.decode('utf-8', errors='replace').strip(),
"stderr": stderr.decode('utf-8', errors='replace').strip(),
"returncode": process.returncode,
"command": command
}
except Exception as e:
logger.error(f"Ошибка выполнения команды {command}: {e}")
return {"error": str(e), "success": False}
async def execute(self, operation: str, **kwargs) -> ToolResult:
"""
Выполнить операцию с файловой системой.
Args:
operation: Тип операции (read, write, copy, move, delete, mkdir, list, info, search, shell)
**kwargs: Аргументы операции
Returns:
ToolResult с результатом
"""
logger.info(f"File System Tool: operation={operation}, args={kwargs}")
self._last_operation = operation
try:
result = None
if operation == 'read':
result = await self.read_file(
path=kwargs.get('path', ''),
limit=kwargs.get('limit', 100)
)
elif operation == 'write':
result = await self.write_file(
path=kwargs.get('path', ''),
content=kwargs.get('content', ''),
append=kwargs.get('append', False)
)
elif operation == 'copy':
result = await self.copy_file(
source=kwargs.get('source', ''),
destination=kwargs.get('destination', '')
)
elif operation == 'move':
result = await self.move_file(
source=kwargs.get('source', ''),
destination=kwargs.get('destination', '')
)
elif operation == 'delete':
result = await self.delete(
path=kwargs.get('path', ''),
recursive=kwargs.get('recursive', False)
)
elif operation == 'mkdir':
result = await self.create_directory(
path=kwargs.get('path', ''),
parents=kwargs.get('parents', True)
)
elif operation == 'list':
result = await self.list_directory(
path=kwargs.get('path', '.'),
show_hidden=kwargs.get('show_hidden', False)
)
elif operation == 'info':
result = await self.file_info(
path=kwargs.get('path', '')
)
elif operation == 'search':
result = await self.search_files(
path=kwargs.get('path', '.'),
pattern=kwargs.get('pattern', '*'),
max_results=kwargs.get('max_results', 50)
)
elif operation == 'shell':
result = await self.execute_shell(
command=kwargs.get('command', ''),
timeout=kwargs.get('timeout', 30)
)
else:
return ToolResult(
success=False,
error=f"Неизвестная операция: {operation}. Доступные: read, write, copy, move, delete, mkdir, list, info, search, shell"
)
# Сохраняем в историю
self._operation_history.append({
'operation': operation,
'args': kwargs,
'result': result,
'timestamp': __import__('datetime').datetime.now().isoformat()
})
# Ограничиваем историю
if len(self._operation_history) > 100:
self._operation_history = self._operation_history[-50:]
return ToolResult(
success=result.get('success', False),
data=result,
metadata={
'operation': operation,
'last_path': result.get('path', result.get('source', ''))
}
)
except Exception as e:
logger.exception(f"Ошибка File System Tool: {e}")
return ToolResult(
success=False,
error=str(e),
metadata={'operation': operation}
)
def get_schema(self) -> Dict[str, Any]:
"""Получить схему инструмента для промпта."""
return {
"name": self.name,
"description": self.description,
"parameters": {
"type": "object",
"properties": {
"operation": {
"type": "string",
"description": "Тип операции",
"enum": ["read", "write", "copy", "move", "delete", "mkdir", "list", "info", "search", "shell"]
},
"path": {
"type": "string",
"description": "Путь к файлу/директории"
},
"source": {
"type": "string",
"description": "Исходный путь (для copy/move)"
},
"destination": {
"type": "string",
"description": "Целевой путь (для copy/move)"
},
"content": {
"type": "string",
"description": "Содержимое для записи"
},
"pattern": {
"type": "string",
"description": "Паттерн для поиска файлов"
},
"command": {
"type": "string",
"description": "Shell команда (для операции shell)"
},
"limit": {
"type": "integer",
"description": "Лимит строк для чтения"
},
"max_results": {
"type": "integer",
"description": "Максимум результатов поиска"
},
"recursive": {
"type": "boolean",
"description": "Рекурсивное удаление"
},
"show_hidden": {
"type": "boolean",
"description": "Показывать скрытые файлы"
},
"timeout": {
"type": "integer",
"description": "Таймаут для shell команд"
}
},
"required": ["operation"]
}
}
# Автоматическая регистрация при импорте
@register_tool
class FileSystemToolAuto(FileSystemTool):
"""Авто-регистрируемая версия FileSystemTool."""
pass