From 9f906af400d8859700dc201facde06028df9ec9e Mon Sep 17 00:00:00 2001 From: mirivlad Date: Sun, 1 Mar 2026 19:18:17 +0800 Subject: [PATCH] =?UTF-8?q?=D0=92=D0=B5=D1=80=D1=81=D0=B8=D1=8F=200.8.0=20?= =?UTF-8?q?-=20=D0=98=D1=81=D0=BF=D1=80=D0=B0=D0=B2=D0=BB=D0=B5=D0=BD?= =?UTF-8?q?=D0=B8=D0=B5=20SSH=20=D0=B8=20=D0=BA=D0=BE=D0=BC=D0=B0=D0=BD?= =?UTF-8?q?=D0=B4=D0=B0=20/restart=5Fbot?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Основные изменения: - Исправлено чтение вывода SSH команд (wait_and_read_ssh вместо цикла с таймаутом) - Добавлена команда /restart_bot для перезапуска бота через sudo - Пароль sudo запрашивается у пользователя (ИИ отключается на время ввода) - После перезапуска бот отправляет уведомление с главным меню - Улучшена обработка stdout/stderr в SSH инструменте Исправленные проблемы: - SSH команды не возвращали вывод (returncode был None до завершения процесса) - Использован подход с параллельным чтением потоков и process.wait() - Команда /restart_bot использует script для создания PTY Co-authored-by: Qwen-Coder --- README.md | 2 +- bot.py | 209 +++++++++++++++++++++++++++++++++------ bot/ai_agent.py | 30 ++++-- bot/models/user_state.py | 3 + bot/tools/ssh_tool.py | 145 +++++++++++++++++++++------ bot/utils/ssh_readers.py | 118 ++++++++++++++++++++-- 6 files changed, 428 insertions(+), 79 deletions(-) diff --git a/README.md b/README.md index 2ecf125..a061276 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ Бот для выполнения CLI команд на вашем ПК через Telegram с многоуровневым меню и гибкой настройкой. -**Версия:** 0.7.0 +**Версия:** 0.8.0 ## Возможности diff --git a/bot.py b/bot.py index 0142c85..7a2cf7e 100644 --- a/bot.py +++ b/bot.py @@ -74,7 +74,7 @@ from bot.models.server import Server from bot.models.session import SSHSession, SSHSessionManager, LocalSession, LocalSessionManager, INPUT_PATTERNS from bot.utils.cleaners import clean_ansi_codes, normalize_output from bot.utils.formatters import escape_markdown, split_message, send_long_message, format_long_output, MAX_MESSAGE_LENGTH -from bot.utils.ssh_readers import detect_input_type, read_ssh_output, read_pty_output +from bot.utils.ssh_readers import detect_input_type, read_ssh_output, read_pty_output, wait_and_read_ssh from bot.utils.decorators import check_access from bot.keyboards.menus import MenuItem, init_menus @@ -133,6 +133,11 @@ async def handle_text_message(update: Update, context: ContextTypes.DEFAULT_TYPE await handle_local_session_input(update, text, local_session) return + # Проверка: не ждём ли пароль для перезапуска бота + if state.waiting_for_restart_password: + await handle_restart_password(update, text) + return + # ПРОВЕРКА: режим чата с ИИ агентом if state.ai_chat_mode: logger.info(f"Пользователь {user_id} отправил задачу ИИ: {text}") @@ -663,7 +668,7 @@ async def format_tool_result(tool_name: str, result: 'ToolResult') -> str: return f"RSS: {result.data}" - elif tool_name == 'ssh_executor': + elif tool_name == 'ssh_tool': if not result.success: return f"❌ **Ошибка SSH:**\n```\n{result.error}\n```" @@ -1288,10 +1293,10 @@ async def _execute_composite_command_ssh(update: Update, command: str, server: S logger.info(f"SSH подключение к {server.host}:{server.port} как {server.user}") conn = await asyncssh.connect(**connect_kwargs) - + # Выполнение команды с cd в рабочую директорию full_command = f"cd {working_dir} && {command_with_pwd}" if working_dir else command_with_pwd - + # Создаем интерактивный процесс с PTY для поддержки ввода # TERM环境变量设置 для корректной кодировки process = await conn.create_process( @@ -1299,7 +1304,7 @@ async def _execute_composite_command_ssh(update: Update, command: str, server: S term_type='xterm-256color', env={'LANG': 'C.UTF-8', 'LC_ALL': 'C.UTF-8'} ) - + # Создаём сессию session = ssh_session_manager.create_session( user_id=user_id, @@ -1309,19 +1314,13 @@ async def _execute_composite_command_ssh(update: Update, command: str, server: S process=process, command=command ) - - # Читаем начальный вывод - output, is_done = await read_ssh_output(process, timeout=3.0) + + # Читаем вывод с ожиданием завершения процесса + # wait_and_read_ssh решает проблему с returncode, который доступен только после завершения + output, error_output, returncode = await wait_and_read_ssh(process, timeout=30.0) session.output_buffer = output session.last_activity = datetime.now() - # Читаем пока процесс не завершится - while not is_done: - more_output, is_done = await read_ssh_output(process, timeout=2.0) - output += more_output - session.output_buffer = output - session.last_activity = datetime.now() - # Проверяем тип ввода input_type = detect_input_type(output) @@ -1359,7 +1358,7 @@ async def _execute_composite_command_ssh(update: Update, command: str, server: S output = '\n'.join(lines[:-1]) ssh_session_manager.close_session(user_id) - await _show_result_message(update, command, output, "", 0) + await _show_result_message(update, command, output, error_output, returncode) return except asyncssh.Error as e: @@ -1520,7 +1519,7 @@ async def _execute_local_command_interactive(update: Update, command: str, worki async def _execute_ssh_command_message(update: Update, command: str, server: Server, working_dir: str): """Выполнение команды через SSH из сообщения с интерактивной сессией.""" user_id = update.effective_user.id - + try: client_keys = [server_manager.ssh_key_path] if server_manager.ssh_key_path else None @@ -1541,10 +1540,10 @@ async def _execute_ssh_command_message(update: Update, command: str, server: Ser logger.info(f"SSH подключение к {server.host}:{server.port} как {server.user}") conn = await asyncssh.connect(**connect_kwargs) - + # Выполнение команды с cd в рабочую директорию full_command = f"cd {working_dir} && {command}" if working_dir else command - + # Создаем интерактивный процесс с PTY для поддержки ввода # TERM环境变量设置 для корректной кодировки process = await conn.create_process( @@ -1552,7 +1551,7 @@ async def _execute_ssh_command_message(update: Update, command: str, server: Ser term_type='xterm-256color', env={'LANG': 'C.UTF-8', 'LC_ALL': 'C.UTF-8'} ) - + # Создаём сессию session = ssh_session_manager.create_session( user_id=user_id, @@ -1562,19 +1561,13 @@ async def _execute_ssh_command_message(update: Update, command: str, server: Ser process=process, command=command ) - - # Читаем начальный вывод - output, is_done = await read_ssh_output(process, timeout=3.0) + + # Читаем вывод с ожиданием завершения процесса + # wait_and_read_ssh решает проблему с returncode, который доступен только после завершения + output, error_output, returncode = await wait_and_read_ssh(process, timeout=30.0) session.output_buffer = output session.last_activity = datetime.now() - # Читаем пока процесс не завершится - while not is_done: - more_output, is_done = await read_ssh_output(process, timeout=2.0) - output += more_output - session.output_buffer = output - session.last_activity = datetime.now() - # Проверяем тип ввода input_type = detect_input_type(output) @@ -1605,7 +1598,7 @@ async def _execute_ssh_command_message(update: Update, command: str, server: Ser else: # Команда завершена, показываем результат ssh_session_manager.close_session(user_id) - await _show_result_message(update, command, output, "", 0) + await _show_result_message(update, command, output, error_output, returncode) return except asyncssh.Error as e: @@ -1718,6 +1711,7 @@ async def post_init(application: Application): BotCommand("settings", "Настройки"), BotCommand("cron", "Управление задачами"), BotCommand("stop", "Прервать SSH-сессию"), + BotCommand("restart_bot", "Перезапустить бота"), BotCommand("ai_presets", "🎛️ Выбор AI-провайдера"), BotCommand("ai_off", "⌨️ ИИ Отключен (CLI режим)"), BotCommand("ai_qwen", "💻 Qwen Code (бесплатно)"), @@ -1746,6 +1740,49 @@ async def post_init(application: Application): logger.warning("⚠️ Cron инструмент не найден, планировщик не запущен") logger.info("Бот инициализирован") + + # Проверяем, был ли запрошен перезапуск пользователем + await check_restart_and_notify(application) + + +async def check_restart_and_notify(application): + """Проверить файл перезапуска и отправить уведомление пользователю.""" + import os + import json + + restart_file = "/tmp/telegram_bot_restart.json" + + try: + if os.path.exists(restart_file): + with open(restart_file, 'r', encoding='utf-8') as f: + data = json.load(f) + + user_id = data.get('user_id') + + if user_id: + logger.info(f"📢 Отправка уведомления о запуске пользователю {user_id}") + + # Получаем состояние пользователя + state = state_manager.get(user_id) + + # Показываем текущую директорию и сервер + working_dir = state.working_directory or config.working_directory + server = server_manager.get(state.current_server) + server_desc = server.description if server else state.current_server + + # Отправляем сообщение напрямую + await application.bot.send_message( + chat_id=user_id, + text=f"✅ Бот перезапущен!\n\n🖥️ Сервер: {server_desc}\n📁 Директория: {working_dir}\n\nГотов к работе! Отправьте команду или выберите действие в меню:", + reply_markup=menu_builder.get_keyboard("main", user_id=user_id, state=state) + ) + logger.info(f"✅ Уведомление отправлено пользователю {user_id}") + + # Удаляем файл + os.remove(restart_file) + + except Exception as e: + logger.error(f"❌ Ошибка при отправке уведомления о перезапуске: {e}") async def send_cron_notification(user_id: int, message: str): @@ -1797,6 +1834,115 @@ async def stop_command(update: Update, context: ContextTypes.DEFAULT_TYPE): +# ============================================ +# КОМАНДА ПЕРЕЗАПУСКА БОТА +# ============================================ + +@check_access +async def restart_bot_command(update: Update, context: ContextTypes.DEFAULT_TYPE): + """Обработка команды /restart_bot - перезапуск бота через systemctl.""" + user_id = update.effective_user.id + state = state_manager.get(user_id) + + # Устанавливаем флаг ожидания пароля + state.waiting_for_restart_password = True + + # Отключаем ИИ на время ввода пароля + state.ai_chat_mode = False + + await update.message.reply_text( + "🔄 **Перезапуск бота**\n\n" + "Для перезапуска требуется ввести пароль `sudo`.\n\n" + "🔐 *Отправьте пароль в чат:*\n\n" + "_После ввода пароль будет использован для команды:_\n" + "`sudo systemctl restart telegram-bot`\n\n" + "⚠️ *Бот будет перезапущен, соединение прервётся.*", + parse_mode="Markdown" + ) + + +async def handle_restart_password(update: Update, text: str): + """Обработка пароля для перезапуска бота.""" + user_id = update.effective_user.id + state = state_manager.get(user_id) + password = text.strip() + + logger.info(f"Пользователь {user_id} ввёл пароль для перезапуска бота") + + # Сбрасываем флаг + state.waiting_for_restart_password = False + + try: + # Сохраняем user_id в файл для уведомления после перезапуска + import json + import os + restart_file = "/tmp/telegram_bot_restart.json" + with open(restart_file, 'w', encoding='utf-8') as f: + json.dump({'user_id': user_id}, f, ensure_ascii=False) + + # Отправляем сообщение о начале перезапуска + await update.message.reply_text( + "⏳ *Выполнение перезапуска...*\n\n" + f"Пароль принят, выполняю команду.\n\n" + "_Бот будет недоступен несколько секунд._", + parse_mode="Markdown" + ) + + # Создаём временный скрипт с паролем + import tempfile + script_file = tempfile.NamedTemporaryFile(mode='w', suffix='.sh', delete=False) + # Используем script для создания псевдо-терминала + script_file.write(f"""#!/bin/bash +printf '%s\\n' '{password}' | sudo -S systemctl restart telegram-bot +""") + script_file.close() + os.chmod(script_file.name, 0o755) + + # Запускаем через script для PTY + process = await asyncio.create_subprocess_exec( + 'script', '-q', '-c', f'/bin/bash {script_file.name}', '/dev/null', + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + + stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=15) + + # Удаляем скрипт + os.remove(script_file.name) + + if process.returncode == 0: + logger.info(f"Бот успешно перезапущен пользователем {user_id}") + else: + error_msg = stderr.decode('utf-8', errors='replace').strip() + logger.error(f"Ошибка перезапуска: {error_msg}") + # Удаляем файл если ошибка + if os.path.exists(restart_file): + os.remove(restart_file) + await update.message.reply_text( + f"❌ *Ошибка перезапуска:*\n```\n{error_msg}\n```", + parse_mode="Markdown" + ) + + except asyncio.TimeoutError: + logger.error("Таймаут при перезапуске бота") + await update.message.reply_text( + "❌ *Ошибка*\n\n" + "Таймаут выполнения команды перезапуска.", + parse_mode="Markdown" + ) + if os.path.exists("/tmp/telegram_bot_restart.json"): + os.remove("/tmp/telegram_bot_restart.json") + except Exception as e: + logger.exception(f"Ошибка при перезапуске бота: {e}") + await update.message.reply_text( + "❌ *Ошибка*\n\n" + f"```\n{str(e)}\n```", + parse_mode="Markdown" + ) + if os.path.exists("/tmp/telegram_bot_restart.json"): + os.remove("/tmp/telegram_bot_restart.json") + + # ============================================ # КОМАНДЫ ДЛЯ РАБОТЫ С QWEN CODE (ИИ) # ============================================ @@ -2124,6 +2270,7 @@ def main(): application.add_handler(CommandHandler("rss", rss_command)) application.add_handler(CommandHandler("menu", menu_command)) application.add_handler(CommandHandler("stop", stop_command)) + application.add_handler(CommandHandler("restart_bot", restart_bot_command)) application.add_handler(CommandHandler("memory", memory_command)) application.add_handler(CommandHandler("compact", compact_command)) application.add_handler(CommandHandler("facts", facts_command)) diff --git a/bot/ai_agent.py b/bot/ai_agent.py index 677e707..36bf13a 100644 --- a/bot/ai_agent.py +++ b/bot/ai_agent.py @@ -7,6 +7,7 @@ AI Agent Module - автономный агент с инструментами. """ import logging +import re from typing import Optional, List, Dict, Any from dataclasses import dataclass from datetime import datetime @@ -97,8 +98,11 @@ class AIAgent: score = 0.0 # Прямые триггеры — высокий приоритет + # Используем паттерн с границами для избежания частичных совпадений for trigger in self.SEARCH_TRIGGERS: - if trigger in message_lower: + escaped_trigger = re.escape(trigger) + pattern = rf'(?:^|[\s,\.!?;:])({escaped_trigger})(?:$|[\s,\.!?;:])' + if re.search(pattern, message_lower): return True, 0.9 # Вопросы с "что", "как", "где", "когда" о внешних фактах @@ -121,7 +125,7 @@ class AIAgent: def _should_read_rss(self, message: str) -> tuple[bool, float]: """Проверить, нужно ли читать RSS ленты. - + ВАЖНО: Используем ТОЛЬКО полные фразы-триггеры. Отдельные слова (типа "новости") НЕ активируют RSS — это предотвращает ложные срабатывания когда пользователь просто упоминает слово в контексте. @@ -129,8 +133,13 @@ class AIAgent: message_lower = message.lower() # Только прямые фразы-триггеры — высокий порог + # Проверяем чтобы триггер был словом/фразой в контексте, а не частью слова for trigger in self.RSS_TRIGGERS: - if trigger in message_lower: + escaped_trigger = re.escape(trigger) + # Паттерн: начало строки ИЛИ пробел/знак препинания перед триггером, + # и конец строки ИЛИ пробел/знак препинания после + pattern = rf'(?:^|[\s,\.!?;:])({escaped_trigger})(?:$|[\s,\.!?;:])' + if re.search(pattern, message_lower): return True, 0.95 # Отдельные ключевые слова НЕ проверяем — только явные фразы! @@ -147,8 +156,11 @@ class AIAgent: score = 0.0 # Прямые триггеры + # Используем паттерн с границами для избежания частичных совпадений for trigger in self.SSH_TRIGGERS: - if trigger in message_lower: + escaped_trigger = re.escape(trigger) + pattern = rf'(?:^|[\s,\.!?;:])({escaped_trigger})(?:$|[\s,\.!?;:])' + if re.search(pattern, message_lower): return True, 0.9 # Команды системного администрирования @@ -175,8 +187,11 @@ class AIAgent: score = 0.0 # Прямые триггеры + # Используем паттерн с границами для избежания частичных совпадений for trigger in self.CRON_TRIGGERS: - if trigger in message_lower: + escaped_trigger = re.escape(trigger) + pattern = rf'(?:^|[\s,\.!?;:])({escaped_trigger})(?:$|[\s,\.!?;:])' + if re.search(pattern, message_lower): return True, 0.85 # Расписания @@ -197,8 +212,11 @@ class AIAgent: score = 0.0 # Прямые триггеры + # Используем паттерн с границами для избежания частичных совпадений for trigger in self.FILE_SYSTEM_TRIGGERS: - if trigger in message_lower: + escaped_trigger = re.escape(trigger) + pattern = rf'(?:^|[\s,\.!?;:])({escaped_trigger})(?:$|[\s,\.!?;:])' + if re.search(pattern, message_lower): return True, 0.9 # Операции с файлами diff --git a/bot/models/user_state.py b/bot/models/user_state.py index 4d23e77..53c614d 100644 --- a/bot/models/user_state.py +++ b/bot/models/user_state.py @@ -39,6 +39,9 @@ class UserState: output_next_index: Optional[int] = None # Индекс следующего сообщения для отправки output_text: Optional[str] = None # Текст для продолжения отправки output_parse_mode: Optional[str] = None # Parse mode для продолжения + + # Для команды /restart_bot + waiting_for_restart_password: bool = False # Ожидание пароля sudo для перезапуска class StateManager: diff --git a/bot/tools/ssh_tool.py b/bot/tools/ssh_tool.py index 00ef79c..7f06b11 100644 --- a/bot/tools/ssh_tool.py +++ b/bot/tools/ssh_tool.py @@ -50,31 +50,31 @@ class SSHExecutorTool(BaseTool): self.servers: Dict[str, ServerConfig] = {} self._last_connection: Optional[asyncssh.SSHClientConnection] = None self._last_server: Optional[str] = None - + self._load_servers_from_env() - + def _load_servers_from_env(self): """ Загрузить конфигурацию серверов из .env. - + Формат в .env: SERVERS=name|host|port|user|tag|password - + Пример: SERVERS=tomas|192.168.1.54|22|mirivlad|web|moloko22 """ servers_str = os.getenv('SERVERS', '') - + if not servers_str.strip(): logger.warning("SERVERS не найден в .env, SSH инструмент не будет работать") return - + # Парсим формат: name|host|port|user|tag|password parts = servers_str.strip().split('|') - + if len(parts) >= 6: name, host, port, user, tag, password = parts[:6] - + self.servers[name.strip()] = ServerConfig( host=host.strip(), port=int(port.strip()), @@ -89,17 +89,38 @@ class SSHExecutorTool(BaseTool): async def _connect(self, server_name: str = 'home') -> asyncssh.SSHClientConnection: """Подключиться к серверу.""" + logger.debug(f"🔍 [SSH._connect] Запрос подключения: server_name='{server_name}'") + logger.debug(f"🔍 [SSH._connect] Доступные серверы: {list(self.servers.keys())}") + if server_name not in self.servers: + logger.error(f"❌ [SSH._connect] Сервер '{server_name}' не найден!") raise ValueError(f"Сервер '{server_name}' не найден. Доступные: {list(self.servers.keys())}") config = self.servers[server_name] + logger.debug(f"🔍 [SSH._connect] Конфигурация сервера {server_name}:") + logger.debug(f" host={config.host}, port={config.port}, username={config.username}") + logger.debug(f" password={'***' if config.password else 'None'}, client_keys={config.client_keys}") # Проверяем существующее подключение + logger.debug(f"🔍 [SSH._connect] Проверка существующего подключения:") + logger.debug(f" _last_connection={self._last_connection}") + logger.debug(f" _last_server={self._last_server}") + if self._last_connection and self._last_server == server_name: - if not self._last_connection.is_connected(): + logger.debug(f"🔍 [SSH._connect] Найдено существующее подключение, проверка статуса...") + try: + # Проверяем transport для проверки активности подключения + if self._last_connection.transport is None or not self._last_connection.transport.is_active(): + logger.debug(f"⚠️ [SSH._connect] Подключение не активно, будет создано новое") + self._last_connection = None + else: + logger.debug(f"✅ [SSH._connect] Используем существующее активное подключение") + return self._last_connection + except Exception as e: + logger.debug(f"⚠️ [SSH._connect] Ошибка проверки подключения: {e}, создаём новое") self._last_connection = None - else: - return self._last_connection + else: + logger.debug(f"ℹ️ [SSH._connect] Существующего подключения нет, создаём новое") logger.info(f"Подключение к серверу {server_name} ({config.host})") @@ -114,18 +135,24 @@ class SSHExecutorTool(BaseTool): if config.password: connect_kwargs['password'] = config.password + logger.debug(f"🔍 [SSH._connect] Используем парольную аутентификацию") if config.client_keys: connect_kwargs['client_keys'] = config.client_keys + logger.debug(f"🔍 [SSH._connect] Используем ключевую аутентификацию: {config.client_keys}") + logger.debug(f"🔍 [SSH._connect] Вызов asyncssh.connect с параметрами: {connect_kwargs.keys()}") + self._last_connection = await asyncssh.connect(**connect_kwargs) self._last_server = server_name logger.info(f"✅ Подключено к {server_name}") + logger.debug(f"🔍 [SSH._connect] Подключение успешно: {self._last_connection}") return self._last_connection except Exception as e: - logger.error(f"Ошибка подключения к {server_name}: {e}") + logger.error(f"❌ [SSH._connect] Ошибка подключения к {server_name}: {e}") + logger.exception(f"🔍 [SSH._connect] Exception details:") raise async def execute_command( @@ -145,27 +172,79 @@ class SSHExecutorTool(BaseTool): Returns: Dict с полями: stdout, stderr, returncode, exit_status """ + logger.debug(f"🔍 [SSH.execute_command] START: server={server}, command={command[:50]}...") + try: + logger.debug(f"🔍 [SSH.execute_command] Вызов _connect(server='{server}')") conn = await self._connect(server) + logger.debug(f"✅ [SSH.execute_command] Подключение успешно: {conn}") logger.info(f"Выполнение команды на {server}: {command}") + logger.debug(f"🔍 [SSH.execute_command] Создание процесса с командой: {command}") - result = await asyncio.wait_for( - conn.run(command, check=False), - timeout=timeout + # Используем create_process для корректной работы с shell-командами + process = await conn.create_process( + command, + term_type='xterm-256color', + env={'LANG': 'C.UTF-8', 'LC_ALL': 'C.UTF-8'} ) + logger.debug(f"🔍 [SSH.execute_command] Процесс создан: {process}") + + # Читаем вывод с таймаутом + output = "" + error_output = "" + + try: + logger.debug(f"🔍 [SSH.execute_command] Чтение stdout (timeout={timeout})") + # Читаем stdout + stdout_data = await asyncio.wait_for( + process.stdout.read(), + timeout=timeout + ) + output = stdout_data.strip() if stdout_data else '' + logger.debug(f"🔍 [SSH.execute_command] stdout получен: {len(output)} bytes") + + # Читаем stderr + try: + logger.debug(f"🔍 [SSH.execute_command] Чтение stderr (timeout={timeout//2})") + stderr_data = await asyncio.wait_for( + process.stderr.read(), + timeout=timeout // 2 + ) + error_output = stderr_data.strip() if stderr_data else '' + logger.debug(f"🔍 [SSH.execute_command] stderr получен: {len(error_output)} bytes") + except asyncio.TimeoutError: + logger.debug(f"⚠️ [SSH.execute_command] Таймаут чтения stderr") + pass + + except asyncio.TimeoutError: + logger.error(f"🔍 [SSH.execute_command] Таймаут выполнения команды: {command}") + return { + 'stdout': '', + 'stderr': f'Таймаут выполнения команды ({timeout} сек)', + 'returncode': -1, + 'exit_status': 'timeout', + 'server': server, + 'command': command + } + + logger.debug(f"🔍 [SSH.execute_command] Ожидание завершения процесса (returncode)") + # Ждём завершения процесса и получаем код возврата + returncode = await process.wait() + logger.debug(f"✅ [SSH.execute_command] Процесс завершён, returncode={returncode}") return { - 'stdout': result.stdout.strip() if result.stdout else '', - 'stderr': result.stderr.strip() if result.stderr else '', - 'returncode': result.returncode, - 'exit_status': result.exit_status, + 'stdout': output, + 'stderr': error_output, + 'returncode': returncode, + 'exit_status': returncode, 'server': server, 'command': command } except asyncio.TimeoutError: - logger.error(f"Таймаут выполнения команды: {command}") + logger.error(f"🔍 [SSH.execute_command] asyncio.TimeoutError: {command}") + logger.exception(f"🔍 [SSH.execute_command] Timeout details:") return { 'stdout': '', 'stderr': f'Таймаут выполнения команды ({timeout} сек)', @@ -176,7 +255,8 @@ class SSHExecutorTool(BaseTool): } except Exception as e: - logger.error(f"Ошибка выполнения команды: {e}") + logger.error(f"❌ [SSH.execute_command] Ошибка выполнения команды: {e}") + logger.exception(f"🔍 [SSH.execute_command] Exception details:") return { 'stdout': '', 'stderr': str(e), @@ -195,29 +275,37 @@ class SSHExecutorTool(BaseTool): server: Имя сервера (default: первый из .env) timeout: Таймаут в секундах (default: 30) """ + logger.debug(f"🔍 [SSH.execute] ВЫЗОВ: command={command[:50]}..., server={server}, timeout={timeout}") + if not command or not command.strip(): + logger.debug(f"⚠️ [SSH.execute] Пустая команда!") return ToolResult( success=False, error="Пустая команда" ) - + # Если сервер не указан - используем первый из конфигурации if server is None: if not self.servers: + logger.debug(f"⚠️ [SSH.execute] Серверы не настроены!") return ToolResult( success=False, error="Серверы не настроены. Проверьте SERVERS в .env" ) server = list(self.servers.keys())[0] logger.info(f"Сервер не указан, используем первый: {server}") + logger.debug(f"🔍 [SSH.execute] Выбран сервер по умолчанию: {server}") logger.info(f"SSH Executor: server={server}, command={command[:100]}") + logger.debug(f"🔍 [SSH.execute] Вызов execute_command(server={server}, command={command[:50]}...)") try: result = await self.execute_command(command, server, timeout) + logger.debug(f"🔍 [SSH.execute] Результат execute_command: returncode={result['returncode']}") # Формируем красивый вывод output = self._format_output(result) + logger.debug(f"🔍 [SSH.execute] Вывод сформирован: {len(output)} chars") return ToolResult( success=result['returncode'] == 0, @@ -230,7 +318,7 @@ class SSHExecutorTool(BaseTool): ) except Exception as e: - logger.exception(f"Ошибка SSH Executor: {e}") + logger.exception(f"❌ [SSH.execute] Ошибка SSH Executor: {e}") return ToolResult( success=False, error=str(e), @@ -241,21 +329,16 @@ class SSHExecutorTool(BaseTool): """Форматировать вывод команды.""" output = [] - # Добавляем заголовок с сервером и командой - output.append(f"🖥️ **SSH: {result.get('server', 'unknown')}**") - output.append(f"**Команда:** `{result.get('command', '')}`\n") - if result['stdout']: output.append(f"**Вывод:**\n```\n{result['stdout']}\n```") - elif result['returncode'] == 0: - output.append("**Вывод:**\n```\n(команда выполнена без вывода)\n```") if result['stderr']: output.append(f"**Ошибки:**\n```\n{result['stderr']}\n```") - output.append(f"\n**Код возврата:** `{result['returncode']}`") + if result['returncode'] != 0: + output.append(f"**Код возврата:** {result['returncode']}") - return "\n".join(output) + return "\n".join(output) if output else "Команда выполнена без вывода" def add_server(self, name: str, host: str, port: int, username: str, password: Optional[str] = None, client_keys: Optional[List[str]] = None): diff --git a/bot/utils/ssh_readers.py b/bot/utils/ssh_readers.py index e34898f..82bf29f 100644 --- a/bot/utils/ssh_readers.py +++ b/bot/utils/ssh_readers.py @@ -39,12 +39,20 @@ def detect_input_type(text: str) -> Optional[str]: return None -async def read_ssh_output(process: asyncssh.SSHClientProcess, timeout: float = 2.0) -> Tuple[str, bool]: +async def read_ssh_output(process: asyncssh.SSHClientProcess, timeout: float = 2.0, wait_for_completion: bool = False) -> Tuple[str, bool]: """ Чтение вывода из SSH-процесса с таймаутом. - Возвращает (вывод, завершён_ли_процесс). + + Args: + process: SSH процесс для чтения + timeout: Таймаут для чтения данных (сек) + wait_for_completion: Если True, дождаться завершения процесса через process.wait() + + Returns: + (вывод, завершён_ли_процесс) """ output = "" + error_output = "" is_done = False try: @@ -61,13 +69,12 @@ async def read_ssh_output(process: asyncssh.SSHClientProcess, timeout: float = 2 logger.debug(f"Прочитано stdout: {len(data)} байт, всего: {len(output)}") else: # EOF + logger.debug("SSH stdout EOF") is_done = True break except asyncio.TimeoutError: - # Данные закончились - logger.debug(f"Timeout stdout, прочитано: {len(output)} байт") - if process.returncode is not None: - is_done = True + # Данные закончились по таймауту + logger.debug(f"Timeout stdout ({timeout} сек), прочитано: {len(output)} байт") break except UnicodeDecodeError as e: logger.debug(f"Ошибка декодирования UTF-8: {e}") @@ -78,11 +85,10 @@ async def read_ssh_output(process: asyncssh.SSHClientProcess, timeout: float = 2 is_done = True break except Exception as e: - logger.debug(f"Ошибка чтения SSH stdout: {e}") + logger.debug(f"Ошибка чтения SSH stdout: {type(e).__name__}: {e}") is_done = True # Читаем stderr если есть - error_output = "" try: while True: try: @@ -97,16 +103,108 @@ async def read_ssh_output(process: asyncssh.SSHClientProcess, timeout: float = 2 except (asyncio.TimeoutError, Exception): break except Exception as e: - logger.debug(f"Ошибка чтения SSH stderr: {e}") + logger.debug(f"Ошибка чтения SSH stderr: {type(e).__name__}: {e}") # Объединяем stdout и stderr if error_output: output = output + error_output if output else error_output - logger.debug(f"read_ssh_output: output={len(output)} байт, is_done={is_done}, returncode={process.returncode}") + logger.info(f"read_ssh_output: output={len(output)} байт, is_done={is_done}, returncode={process.returncode}") return output, is_done +async def wait_and_read_ssh(process: asyncssh.SSHClientProcess, timeout: float = 30.0) -> Tuple[str, str, int]: + """ + Чтение вывода SSH-процесса с ожиданием полного завершения. + Аналог asyncio.subprocess.communicate() для asyncssh. + + Эта функция решает проблему с returncode, который становится доступен + только после завершения процесса. Читает stdout и stderr параллельно + с выполнением команды. + + Args: + process: SSH процесс + timeout: Максимальное время ожидания выполнения (сек) + + Returns: + (stdout, stderr, returncode) + """ + stdout_data = "" + stderr_data = "" + + async def read_stream(stream, is_stdout=True): + """Читает поток до EOF.""" + data = "" + try: + while True: + chunk = await stream.read() + if not chunk: + break + if isinstance(chunk, bytes): + data += chunk.decode('utf-8', errors='replace') + else: + data += str(chunk) + stream_name = "stdout" if is_stdout else "stderr" + logger.debug(f"{stream_name}: прочитано {len(chunk)} байт") + except Exception as e: + stream_name = "stdout" if is_stdout else "stderr" + logger.debug(f"{stream_name} завершен: {type(e).__name__}: {e}") + return data + + try: + # Читаем stdout и stderr параллельно с ожиданием завершения + logger.debug(f"wait_and_read_ssh: запуск чтения (timeout={timeout})") + + # Создаём задачи для чтения stdout и stderr + stdout_task = asyncio.create_task(read_stream(process.stdout, is_stdout=True)) + stderr_task = asyncio.create_task(read_stream(process.stderr, is_stdout=False)) + + # Ждём завершения процесса с таймаутом + await asyncio.wait_for(process.wait(), timeout=timeout) + logger.debug(f"wait_and_read_ssh: процесс завершился, returncode={process.returncode}") + + # Ждём завершения чтения с коротким таймаутом + try: + stdout_data = await asyncio.wait_for(stdout_task, timeout=2.0) + except asyncio.TimeoutError: + logger.warning("wait_and_read_ssh: таймаут чтения stdout") + stdout_task.cancel() + try: + await stdout_task + except asyncio.CancelledError: + pass + + try: + stderr_data = await asyncio.wait_for(stderr_task, timeout=2.0) + except asyncio.TimeoutError: + logger.warning("wait_and_read_ssh: таймаут чтения stderr") + stderr_task.cancel() + try: + await stderr_task + except asyncio.CancelledError: + pass + + except asyncio.TimeoutError: + logger.error(f"wait_and_read_ssh: таймаут выполнения команды ({timeout} сек)") + # Отменяем задачи чтения + stdout_task.cancel() + stderr_task.cancel() + try: + await stdout_task + except asyncio.CancelledError: + pass + try: + await stderr_task + except asyncio.CancelledError: + pass + raise + + returncode = process.returncode if process.returncode is not None else 0 + + logger.info(f"wait_and_read_ssh: stdout={len(stdout_data)} байт, stderr={len(stderr_data)} байт, returncode={returncode}") + return stdout_data, stderr_data, returncode + + def read_pty_output(master_fd: int, timeout: float = 2.0) -> Tuple[str, bool]: """ Чтение вывода из PTY с таймаутом.