#!/usr/bin/env python3 """ SSH Executor Tool - инструмент для выполнения команд на серверах по SSH. Бот может использовать этот инструмент автономно для выполнения системных задач на серверах пользователя. """ import logging import asyncio from pathlib import Path from typing import Optional, Dict, Any, List from dataclasses import dataclass import asyncssh from bot.tools import BaseTool, ToolResult, register_tool logger = logging.getLogger(__name__) @dataclass class ServerConfig: """Конфигурация сервера для SSH.""" host: str port: int username: str password: Optional[str] = None client_keys: Optional[List[str]] = None class SSHExecutorTool(BaseTool): """Инструмент для выполнения SSH-команд.""" name = "ssh_executor" description = "Выполнение команд на удалённых серверах по SSH. Используется для системных задач: мониторинг, управление сервисами, просмотр логов." category = "system" def __init__(self): # Серверы по умолчанию (можно расширять) self.servers: Dict[str, ServerConfig] = { 'home': ServerConfig( host='192.168.1.54', port=22, username='mirivlad', password='moloko22' ) } self._last_connection: Optional[asyncssh.SSHClientConnection] = None self._last_server: Optional[str] = None async def _connect(self, server_name: str = 'home') -> asyncssh.SSHClientConnection: """Подключиться к серверу.""" if server_name not in self.servers: raise ValueError(f"Сервер '{server_name}' не найден. Доступные: {list(self.servers.keys())}") config = self.servers[server_name] # Проверяем существующее подключение if self._last_connection and self._last_server == server_name: if not self._last_connection.is_connected(): self._last_connection = None else: return self._last_connection logger.info(f"Подключение к серверу {server_name} ({config.host})") try: # Пробуем подключение с паролем connect_kwargs = { 'host': config.host, 'port': config.port, 'username': config.username, 'known_hosts': None, # Отключаем проверку known_hosts для простоты } if config.password: connect_kwargs['password'] = config.password if config.client_keys: connect_kwargs['client_keys'] = config.client_keys self._last_connection = await asyncssh.connect(**connect_kwargs) self._last_server = server_name logger.info(f"✅ Подключено к {server_name}") return self._last_connection except Exception as e: logger.error(f"Ошибка подключения к {server_name}: {e}") raise async def execute_command( self, command: str, server: str = 'home', timeout: int = 30 ) -> Dict[str, Any]: """ Выполнить команду на сервере. Args: command: Команда для выполнения server: Имя сервера из конфигурации timeout: Таймаут выполнения в секундах Returns: Dict с полями: stdout, stderr, returncode, exit_status """ try: conn = await self._connect(server) logger.info(f"Выполнение команды на {server}: {command}") result = await asyncio.wait_for( conn.run(command, check=False), timeout=timeout ) return { 'stdout': result.stdout.strip() if result.stdout else '', 'stderr': result.stderr.strip() if result.stderr else '', 'returncode': result.returncode, 'exit_status': result.exit_status, 'server': server, 'command': command } except asyncio.TimeoutError: logger.error(f"Таймаут выполнения команды: {command}") return { 'stdout': '', 'stderr': f'Таймаут выполнения команды ({timeout} сек)', 'returncode': -1, 'exit_status': 'timeout', 'server': server, 'command': command } except Exception as e: logger.error(f"Ошибка выполнения команды: {e}") return { 'stdout': '', 'stderr': str(e), 'returncode': -1, 'exit_status': 'error', 'server': server, 'command': command } async def execute(self, command: str, server: str = 'home', timeout: int = 30) -> ToolResult: """ Выполнить SSH-команду. Args: command: Команда для выполнения server: Имя сервера (default: 'home') timeout: Таймаут в секундах (default: 30) """ if not command or not command.strip(): return ToolResult( success=False, error="Пустая команда" ) logger.info(f"SSH Executor: server={server}, command={command[:100]}") try: result = await self.execute_command(command, server, timeout) # Формируем красивый вывод output = self._format_output(result) return ToolResult( success=result['returncode'] == 0, data=result, metadata={ 'server': server, 'command': command, 'returncode': result['returncode'] } ) except Exception as e: logger.exception(f"Ошибка SSH Executor: {e}") return ToolResult( success=False, error=str(e), metadata={'server': server, 'command': command} ) def _format_output(self, result: Dict[str, Any]) -> str: """Форматировать вывод команды.""" output = [] if result['stdout']: output.append(f"**Вывод:**\n```\n{result['stdout']}\n```") if result['stderr']: output.append(f"**Ошибки:**\n```\n{result['stderr']}\n```") if result['returncode'] != 0: output.append(f"**Код возврата:** {result['returncode']}") return "\n".join(output) if output else "Команда выполнена без вывода" def add_server(self, name: str, host: str, port: int, username: str, password: Optional[str] = None, client_keys: Optional[List[str]] = None): """Добавить сервер в конфигурацию.""" self.servers[name] = ServerConfig( host=host, port=port, username=username, password=password, client_keys=client_keys ) logger.info(f"Добавлен сервер: {name} ({host})") def list_servers(self) -> List[str]: """Получить список доступных серверов.""" return list(self.servers.keys()) # Автоматическая регистрация при импорте @register_tool class SSHExecutorToolAuto(SSHExecutorTool): """Авто-регистрируемая версия SSHExecutorTool.""" pass