diff --git a/bot.py b/bot.py index da36c29..8b91864 100644 --- a/bot.py +++ b/bot.py @@ -10,11 +10,19 @@ import asyncio import subprocess import logging import getpass +import re +import pty +import tty +import termios +import select +import fcntl from pathlib import Path from typing import Optional, Callable, Dict, Any, List from dataclasses import dataclass, field from functools import wraps +from datetime import datetime, timedelta +import pexpect import asyncssh from dotenv import load_dotenv @@ -81,7 +89,8 @@ class Server: port: int user: str tags: List[str] = field(default_factory=list) - + password: str = "" + @property def display_name(self) -> str: """Отображаемое имя с иконкой.""" @@ -126,51 +135,42 @@ class ServerManager: """Загрузка серверов из переменных окружения.""" self._ssh_key_path = os.getenv("SSH_KEY_PATH") self._default_server = os.getenv("DEFAULT_SERVER", "local") - + servers_str = os.getenv("SERVERS", "") if not servers_str.strip(): return - - # Парсинг формата: name|host|port|user|tags,name|host|port|user|tags - parts = servers_str.split(",") - i = 0 - while i < len(parts): - if i + 4 >= len(parts): - break - - # Проверяем, не является ли текущая часть тегом (содержит ли =) - if "=" in parts[i]: - i += 1 + + # Парсинг формата: name|host|port|user|tags|password,name|host|port|user|tags|password + # Теги могут содержать запятые, поэтому разбираем по частям + for server_line in servers_str.split(","): + if not server_line.strip(): continue - - name = parts[i].strip() - host = parts[i + 1].strip() - port_str = parts[i + 2].strip() - user = parts[i + 3].strip() - # Теги могут быть в следующей части или отсутствовать - tags = [] - next_idx = i + 4 - if next_idx < len(parts): - next_part = parts[next_idx].strip() - # Если следующая часть не похожа на имя сервера (содержит только буквы и дефисы) - # и не похожа на host (не содержит точек) - if "|" in next_part or (next_part and not next_part.replace("-", "").replace("_", "").isalnum()): - # Это теги - tags = [t.strip() for t in next_part.split("|") if t.strip()] - i += 5 - else: - i += 4 - else: - i += 4 + parts = server_line.strip().split("|") + if len(parts) < 4: + continue try: - port = int(port_str) - server = Server(name=name, host=host, port=port, user=user, tags=tags) + name = parts[0].strip() + host = parts[1].strip() + port = int(parts[2].strip()) + user = parts[3].strip() + + # Теги (часть 4) и пароль (часть 5) могут отсутствовать + tags = [] + password = "" + + if len(parts) >= 5 and parts[4].strip(): + tags = [t.strip() for t in parts[4].split(",") if t.strip()] + + if len(parts) >= 6: + password = parts[5].strip() + + server = Server(name=name, host=host, port=port, user=user, tags=tags, password=password) self._servers[name] = server logger.info(f"Загружен сервер: {server.display_name} ({server.description})") except ValueError as e: - logger.warning(f"Ошибка парсинга сервера: {parts[i:i+4]} - {e}") + logger.warning(f"Ошибка парсинга сервера: {parts} - {e}") def get(self, name: str) -> Optional[Server]: """Получить сервер по имени.""" @@ -211,16 +211,16 @@ class ServerManager: keyboard.append([button]) return InlineKeyboardMarkup(keyboard) - def add_server(self, name: str, host: str, port: int, user: str, tags: List[str] = None) -> bool: + def add_server(self, name: str, host: str, port: int, user: str, tags: List[str] = None, password: str = "") -> bool: """Добавить сервер.""" if name in self._servers: return False - self._servers[name] = Server(name=name, host=host, port=port, user=user, tags=tags or []) + self._servers[name] = Server(name=name, host=host, port=port, user=user, tags=tags or [], password=password) self.save_to_env() return True - def update_server(self, name: str, host: str = None, port: int = None, - user: str = None, tags: List[str] = None) -> bool: + def update_server(self, name: str, host: str = None, port: int = None, + user: str = None, tags: List[str] = None, password: str = None) -> bool: """Обновить сервер.""" if name not in self._servers or name == "local": return False @@ -233,6 +233,8 @@ class ServerManager: server.user = user if tags is not None: server.tags = tags + if password is not None: + server.password = password self.save_to_env() return True @@ -260,7 +262,8 @@ class ServerManager: if server.name == "local": continue tags_str = ",".join(server.tags) if server.tags else "" - server_parts.append(f"{server.name}|{server.host}|{server.port}|{server.user}|{tags_str}") + # Формат: name|host|port|user|tags|password + server_parts.append(f"{server.name}|{server.host}|{server.port}|{server.user}|{tags_str}|{server.password}") servers_line = f"SERVERS={','.join(server_parts)}\n" @@ -311,6 +314,437 @@ class StateManager: self._states[user_id] = UserState() +# --- Интерактивные SSH-сессии --- +@dataclass +class SSHSession: + """Интерактивная SSH-сессия.""" + user_id: int + server: Server + working_dir: str + conn: asyncssh.SSHClientConnection + process: asyncssh.SSHClientProcess + output_buffer: str = "" + waiting_for_input: bool = False + input_type: str = "" # "password", "confirm", "text" + last_activity: datetime = field(default_factory=datetime.now) + command: str = "" + + SESSION_TIMEOUT = timedelta(minutes=5) # Таймаут неактивности + + def is_expired(self) -> bool: + """Проверка истечения таймаута сессии.""" + return datetime.now() - self.last_activity > self.SESSION_TIMEOUT + + +class SSHSessionManager: + """Менеджер интерактивных SSH-сессий.""" + + def __init__(self): + self._sessions: Dict[int, SSHSession] = {} + + def create_session(self, user_id: int, server: Server, working_dir: str, + conn: asyncssh.SSHClientConnection, process: asyncssh.SSHClientProcess, + command: str = "") -> SSHSession: + """Создать новую сессию.""" + session = SSHSession( + user_id=user_id, + server=server, + working_dir=working_dir, + conn=conn, + process=process, + command=command + ) + self._sessions[user_id] = session + logger.info(f"Создана SSH-сессия для пользователя {user_id} на сервере {server.name}") + return session + + def get_session(self, user_id: int) -> Optional[SSHSession]: + """Получить сессию пользователя.""" + session = self._sessions.get(user_id) + if session and session.is_expired(): + self.close_session(user_id) + return None + return session + + def close_session(self, user_id: int): + """Закрыть сессию пользователя.""" + session = self._sessions.pop(user_id, None) + if session: + try: + if session.process: + session.process.stdin.close() + session.process.stdout.feed_eof() + if session.conn: + session.conn.close() + logger.info(f"Закрыта SSH-сессия для пользователя {user_id}") + except Exception as e: + logger.warning(f"Ошибка при закрытии сессии: {e}") + + def has_active_session(self, user_id: int) -> bool: + """Проверка наличия активной сессии.""" + return self.get_session(user_id) is not None + + def cleanup_expired(self): + """Очистка истёкших сессий.""" + expired = [uid for uid, s in self._sessions.items() if s.is_expired()] + for uid in expired: + self.close_session(uid) + + +# Менеджер SSH-сессий +ssh_session_manager = SSHSessionManager() + + +@dataclass +class LocalSession: + """Интерактивная локальная сессия.""" + user_id: int + command: str + master_fd: int + pid: int + output_buffer: str = "" + waiting_for_input: bool = False + input_type: str = "" + last_activity: datetime = field(default_factory=datetime.now) + + SESSION_TIMEOUT = timedelta(minutes=5) + + def is_expired(self) -> bool: + return datetime.now() - self.last_activity > self.SESSION_TIMEOUT + + +class LocalSessionManager: + """Менеджер локальных интерактивных сессий.""" + + def __init__(self): + self._sessions: Dict[int, LocalSession] = {} + + def create_session(self, user_id: int, command: str, master_fd: int, pid: int) -> LocalSession: + session = LocalSession( + user_id=user_id, + command=command, + master_fd=master_fd, + pid=pid + ) + self._sessions[user_id] = session + logger.info(f"Создана локальная сессия для пользователя {user_id}") + return session + + def get_session(self, user_id: int) -> Optional[LocalSession]: + session = self._sessions.get(user_id) + if session and session.is_expired(): + self.close_session(user_id) + return None + return session + + def close_session(self, user_id: int): + session = self._sessions.pop(user_id, None) + if session: + try: + # Закрываем pexpect процесс если есть + child = session.context.get('child') if hasattr(session, 'context') and session.context else None + if child: + child.close(force=True) + else: + # Старый способ для PTY + os.close(session.master_fd) + os.kill(session.pid, 9) + except: + pass + logger.info(f"Закрыта локальная сессия для пользователя {user_id}") + + def has_active_session(self, user_id: int) -> bool: + return self.get_session(user_id) is not None + + +# Менеджер локальных сессий +local_session_manager = LocalSessionManager() + + +# Паттерны для детектирования запросов ввода +INPUT_PATTERNS = { + "password": [ + r"[Pp]assword[:\s]*$", + r"[Pp]assphrase[:\s]*$", + r"Enter password[:\s]*$", + r"sudo password[:\s]*$", + r"\[sudo\] password for .*:", + r"[Пп]ароль[:\s]*$", + r"\[sudo\] пароль для .*:", + r"Введите пароль[:\s]*$", + ], + "confirm": [ + r"[Yy]es/[Nn]o[?:\s]*$", + r"\[?[Yy]\]?/?\[?[Nn]\]?", + r"Do you want to continue", + r"Continue\?", + r"Are you sure", + r"Is this OK", + r"[Yy]es or [Nn]o", + r"[Дд]а/[Нн]ет", + r"[Пп]родолжить", + ], + "shell_prompt": [ + r"[$#]\s*$", + r"[>$]\s*$", + r"[a-zA-Z0-9_-]+@[a-zA-Z0-9_-]+:.*[$#]\s*$", + ], +} + + +def detect_input_type(text: str) -> Optional[str]: + """Определить тип запроса ввода по тексту.""" + text = text.strip() + + # Проверка на пароль + for pattern in INPUT_PATTERNS["password"]: + if re.search(pattern, text, re.MULTILINE): + return "password" + + # Проверка на подтверждение + for pattern in INPUT_PATTERNS["confirm"]: + if re.search(pattern, text, re.MULTILINE): + return "confirm" + + # Проверка на приглашение оболочки + for pattern in INPUT_PATTERNS["shell_prompt"]: + if re.search(pattern, text, re.MULTILINE): + return "prompt" + + return None + + +def clean_ansi_codes(text: str) -> str: + """ + Очистка ANSI-кодов и мусора из вывода терминала. + Обрабатывает: + - ANSI escape-последовательности \x1b[...m + - «Битые» ANSI-коды без escape-символа (например [33m, [0m) + - Символы замены Unicode () + - Кириллические имитации ANSI-кодов (например [0м) + """ + # Удаляем ANSI escape-последовательности + text = re.sub(r'\x1b\[[0-9;?]*[a-zA-Z]', '', text) + + # Удаляем «битые» ANSI-коды: [33m, [0m, [1m и т.д. (латиница и кириллица) + text = re.sub(r'\[\d+[мm]', '', text) + + # Удаляем символы замены Unicode + text = text.replace('\ufffd', '') + + return text + + +def normalize_output(text: str) -> str: + """ + Нормализовать вывод: обработать \r и убрать пустые строки. + \r используется для перезаписи строки (прогресс-баров). + """ + # Заменяем \r\n на \n + text = text.replace('\r\n', '\n') + + # Обрабатываем \r (возврат каретки) — строки с \r перезаписывают друг друга + lines = [] + for line in text.split('\n'): + if '\r' in line: + # Разбиваем по \r и берём последнюю часть (финальное состояние) + parts = line.split('\r') + line = parts[-1] + lines.append(line) + + text = '\n'.join(lines) + + # Разбиваем на строки, убираем пустые и trailing пробелы + lines = text.split('\n') + lines = [line.rstrip() for line in lines if line.strip()] + + # Очищаем прогресс-бары вида "Текст… 0%Текст… 50%Текст… 100%" + # И дублирующийся текст + cleaned_lines = [] + for line in lines: + # Ищем повторяющийся паттерн "текст… цифры%" + progress_pattern = re.compile(r'((?:.+?\.{3})\d+%)+') + match = progress_pattern.search(line) + if match: + # Берём последнее вхождение + items = re.findall(r'(.+?\.{3})(\d+)%', match.group(0)) + if items: + last_text, last_percent = items[-1] + line = line[:match.start()] + f'{last_text}{last_percent}%' + line[match.end():] + + # СНАЧАЛА удаляем остатки ANSI-кодов из строки + line = re.sub(r'.', '', line) #  + любой символ + + # Удаляем дублирующийся текст вида "0% [текст] 0% [текст]" + dup_pattern = re.compile(r'(\d+%\s*\[.+?\])(?:\s*\d+%\s*\[.+?\])+') + match = dup_pattern.search(line) + if match: + # Оставляем только первое вхождение + line = line[:match.start()] + match.group(1) + line[match.end():] + + # Удаляем ведущие пробелы (артефакты терминала) + line = line.lstrip() + + if line: + cleaned_lines.append(line) + + return '\n'.join(cleaned_lines) + + +def format_long_output(text: str, max_lines: int = 20, head_lines: int = 10, tail_lines: int = 10) -> str: + """ + Форматировать длинный вывод: показать первые и последние строки. + По умолчанию: первые 10 + последние 10 строк = 20 строк максимум. + """ + lines = text.split('\n') + total_lines = len(lines) + + if total_lines <= max_lines: + return text + + # Показываем первые head_lines и последние tail_lines + head = lines[:head_lines] + tail = lines[-tail_lines:] + + skipped = total_lines - head_lines - tail_lines + + result = '\n'.join(head) + result += f'\n\n... ({skipped} строк пропущено) ...\n' + result += '\n'.join(tail) + + return result + + +async def read_ssh_output(process: asyncssh.SSHClientProcess, timeout: float = 2.0) -> tuple[str, bool]: + """ + Чтение вывода из SSH-процесса с таймаутом. + Возвращает (вывод, завершён_ли_процесс). + """ + output = "" + is_done = False + + try: + # Используем readany() для чтения доступных данных + while True: + try: + # readany() читает любые доступные данные + data = await asyncio.wait_for(process.stdout.readany(), timeout=timeout) + if data: + if isinstance(data, bytes): + output += data.decode('utf-8', errors='replace') + else: + output += str(data) + logger.debug(f"Прочитано stdout: {len(data)} байт, всего: {len(output)}") + else: + # EOF + is_done = True + break + except asyncio.TimeoutError: + # Данные закончились + logger.debug(f"Timeout stdout, прочитано: {len(output)} байт") + if process.returncode is not None: + is_done = True + break + except UnicodeDecodeError as e: + logger.debug(f"Ошибка декодирования UTF-8: {e}") + continue + except Exception as e: + # Конец потока + logger.debug(f"Конец потока stdout: {type(e).__name__}: {e}") + is_done = True + break + except Exception as e: + logger.debug(f"Ошибка чтения SSH stdout: {e}") + is_done = True + + # Читаем stderr если есть + error_output = "" + try: + while True: + try: + data = await asyncio.wait_for(process.stderr.readany(), timeout=0.5) + if data: + if isinstance(data, bytes): + error_output += data.decode('utf-8', errors='replace') + else: + error_output += str(data) + else: + break + except (asyncio.TimeoutError, Exception): + break + except Exception as e: + logger.debug(f"Ошибка чтения SSH stderr: {e}") + + # Объединяем stdout и stderr + if error_output: + output = output + error_output if output else error_output + + logger.debug(f"read_ssh_output: output={len(output)} байт, is_done={is_done}, returncode={process.returncode}") + return output, is_done + + +def read_pty_output(master_fd: int, timeout: float = 2.0) -> tuple[str, bool]: + """ + Чтение вывода из PTY с таймаутом. + Возвращает (вывод, завершён_ли_процесс). + """ + output = "" + is_done = False + total_waited = 0 + + try: + # Устанавливаем non-blocking режим + flags = fcntl.fcntl(master_fd, fcntl.F_GETFL) + fcntl.fcntl(master_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) + + while total_waited < timeout: + try: + # Ждём данные с коротким таймаутом + ready, _, _ = select.select([master_fd], [], [], 0.2) + if ready: + try: + data = os.read(master_fd, 4096) + if data: + output += data.decode('utf-8', errors='replace') + logger.debug(f"Прочитано из PTY: {len(data)} байт") + # Сбрасываем таймер если есть данные + total_waited = 0 + else: + is_done = True + break + except BlockingIOError: + # Нет данных, продолжаем ждать + pass + else: + # Timeout - проверяем не завершился ли процесс + try: + _, status = os.waitpid(-1, os.WNOHANG) + if status != 0: + logger.debug(f"Процесс завершился со статусом: {status}") + is_done = True + break + except ChildProcessError: + pass + + # Если уже что-то прочитали и есть запрос ввода - выходим + if output and detect_input_type(output): + logger.debug(f"Обнаружен запрос ввода") + break + + total_waited += 0.2 + + except Exception as e: + logger.debug(f"Ошибка при чтении PTY: {e}") + break + + except Exception as e: + logger.debug(f"Ошибка чтения PTY: {e}") + is_done = True + + logger.debug(f"read_pty_output: output={len(output)} байт, is_done={is_done}") + return output, is_done + + # --- Система команд --- @dataclass class MenuItem: @@ -733,23 +1167,26 @@ async def menu_callback(update: Update, context: ContextTypes.DEFAULT_TYPE): elif callback.startswith("server_edit_"): server_name = callback.replace("server_edit_", "") server = server_manager.get(server_name) - + if server and server_name != "local": state.editing_server = server_name state.waiting_for_input = True state.input_type = "edit_server_field" + password_status = "установлен" if server.password else "не установлен" await query.edit_message_text( f"✏️ *Редактирование сервера: {server_name}*\n\n" f"Текущие значения:\n" f"• Host: `{server.host}`\n" f"• Port: `{server.port}`\n" f"• User: `{server.user}`\n" - f"• Tags: `{','.join(server.tags) if server.tags else 'нет'}`\n\n" + f"• Tags: `{','.join(server.tags) if server.tags else 'нет'}`\n" + f"• Password: {password_status}\n\n" f"Введите номер поля для изменения:\n" f"1 — Host\n" f"2 — Port\n" f"3 — User\n" - f"4 — Tags", + f"4 — Tags\n" + f"5 — Password", parse_mode="Markdown", reply_markup=InlineKeyboardMarkup([[ InlineKeyboardButton("❌ Отмена", callback_data="server_menu") @@ -785,11 +1222,30 @@ async def menu_callback(update: Update, context: ContextTypes.DEFAULT_TYPE): reply_markup=menu_builder.get_keyboard("server") ) + elif callback == "srv_skip_password": + # Пропуск пароля при добавлении сервера + user_id = query.from_user.id + state = state_manager.get(user_id) + + state.context["new_server"]["password"] = "" + state.input_type = "add_server_tags" + await query.edit_message_text( + "✅ Пароль пропущен (будет использоваться только ключ)\n\n" + "Введите *теги* через запятую (или нажмите Пропустить):\n" + "Пример: `web,prod`, `db,backup`\n\n" + "Теги помогают группировать серверы.", + parse_mode="Markdown", + reply_markup=InlineKeyboardMarkup([ + [InlineKeyboardButton("⏭️ Пропустить", callback_data="srv_skip_tags")], + [InlineKeyboardButton("❌ Отмена", callback_data="server_menu")] + ]) + ) + elif callback == "srv_skip_tags": # Пропуск тегов при добавлении сервера user_id = query.from_user.id state = state_manager.get(user_id) - + new_server = state.context.get("new_server", {}) if new_server.get("name") and new_server.get("host") and new_server.get("port") and new_server.get("user"): if server_manager.add_server( @@ -797,7 +1253,8 @@ async def menu_callback(update: Update, context: ContextTypes.DEFAULT_TYPE): host=new_server["host"], port=new_server["port"], user=new_server["user"], - tags=[] + tags=[], + password=new_server.get("password", "") ): await query.edit_message_text( "✅ *Сервер добавлен*\n\n" @@ -805,7 +1262,8 @@ async def menu_callback(update: Update, context: ContextTypes.DEFAULT_TYPE): f"Host: `{new_server['host']}`\n" f"Port: `{new_server['port']}`\n" f"User: `{new_server['user']}`\n" - f"Tags: нет\n\n" + f"Tags: нет\n" + f"Password: {'установлен' if new_server.get('password') else 'не установлен'}\n\n" f"Сервер сохранён в `.env` и доступен для выбора.", parse_mode="Markdown", reply_markup=menu_builder.get_keyboard("server") @@ -820,7 +1278,7 @@ async def menu_callback(update: Update, context: ContextTypes.DEFAULT_TYPE): "❌ Ошибка: неполные данные сервера", reply_markup=menu_builder.get_keyboard("server") ) - + state.waiting_for_input = False state.input_type = None state.context.clear() @@ -963,13 +1421,6 @@ async def execute_cli_command(query, command: str): logger.info(f"Выполнение команды: {command} на сервере: {server_name}, в директории: {working_dir}") - await query.edit_message_text( - f"⏳ *Выполнение...*\n" - f"🖥️ `{server_name}`\n" - f"```\n{command}\n```", - parse_mode="Markdown" - ) - # Если локальный сервер — выполняем локально if server_name == "local" or server is None: await _execute_local_command(query, command, working_dir) @@ -979,29 +1430,117 @@ async def execute_cli_command(query, command: str): async def _execute_local_command(query, command: str, working_dir: str): - """Выполнение локальной команды.""" + """Выполнение локальной команды через PTY.""" + user_id = query.from_user.id + try: - process = await asyncio.create_subprocess_shell( - command, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - cwd=working_dir - ) + logger.info(f"Создание PTY для команды: {command}") + # Создаём PTY + master_fd, slave_fd = pty.openpty() + logger.info(f"PTY создан: master_fd={master_fd}") + + # Запускаем процесс в PTY + pid = os.fork() + if pid == 0: + # Дочерний процесс + os.close(master_fd) + os.setsid() + os.dup2(slave_fd, 0) # stdin + os.dup2(slave_fd, 1) # stdout + os.dup2(slave_fd, 2) # stderr + os.close(slave_fd) + + os.chdir(working_dir) + os.execvp("/bin/bash", ["/bin/bash", "-c", command]) + else: + # Родительский процесс + os.close(slave_fd) + logger.info(f"Процесс запущен: pid={pid}") + + # Создаём сессию + session = local_session_manager.create_session( + user_id=user_id, + command=command, + master_fd=master_fd, + pid=pid + ) + + # Читаем начальный вывод + logger.info("Чтение вывода из PTY...") + output, is_done = read_pty_output(master_fd, timeout=3.0) + logger.info(f"Прочитано: {len(output)} байт, is_done={is_done}") + logger.debug(f"Вывод: {output[:500] if output else '(пусто)'}") + + session.output_buffer = output + session.last_activity = datetime.now() + + # Проверяем тип ввода + input_type = detect_input_type(output) + logger.info(f"Тип ввода: {input_type}") + + if input_type == "password": + session.waiting_for_input = True + session.input_type = "password" + await query.edit_message_text( + f"⏳ *Требуется ввод*\n\n" + f"Команда: `{command}`\n\n" + f"🔐 *Запрошен пароль*\n\n" + f"```\n{output.strip()[-200:]}\n```\n\n" + f"Отправьте пароль в чат:", + parse_mode="Markdown" + ) + return + elif input_type == "confirm": + session.waiting_for_input = True + session.input_type = "confirm" + await query.edit_message_text( + f"⏳ *Требуется ввод*\n\n" + f"Команда: `{command}`\n\n" + f"❓ *Требуется подтверждение*\n\n" + f"```\n{output.strip()[-200:]}\n```\n\n" + f"Отправьте `y` (да) или `n` (нет):", + parse_mode="Markdown" + ) + return + elif is_done: + local_session_manager.close_session(user_id) + await _show_result(query, command, output.encode(), b"", 0) + return + else: + # Команда ещё выполняется + await query.edit_message_text( + f"⏳ *Выполнение...*\n\n" + f"Команда: `{command}`\n\n" + f"```\n{output.strip()[-500:] if output else 'Выполняется...'}\n```", + parse_mode="Markdown" + ) + + while not is_done: + more_output, is_done = read_pty_output(master_fd, timeout=5.0) + output += more_output + session.output_buffer = output + session.last_activity = datetime.now() + + input_type = detect_input_type(output) + if input_type in ("password", "confirm"): + session.waiting_for_input = True + session.input_type = input_type + await query.edit_message_text( + f"⏳ *Требуется ввод*\n\n" + f"Команда: `{command}`\n\n" + f"{'🔐 *Запрошен пароль*' if input_type == 'password' else '❓ *Требуется подтверждение'}\n\n" + f"```\n{output.strip()[-200:]}\n```\n\n" + f"{'Отправьте пароль в чат:' if input_type == 'password' else 'Отправьте `y` (да) или `n` (нет):'}", + parse_mode="Markdown" + ) + return + + local_session_manager.close_session(user_id) + await _show_result(query, command, output.encode(), b"", 0) - stdout, stderr = await asyncio.wait_for( - process.communicate(), - timeout=30 - ) - - await _show_result(query, command, stdout, stderr, process.returncode) - - except asyncio.TimeoutError: - await query.edit_message_text( - "❌ *Таймаут*\n\nКоманда выполнялась дольше 30 секунд и была прервана.", - parse_mode="Markdown" - ) except Exception as e: logger.error(f"Ошибка выполнения команды: {e}") + local_session_manager.close_session(user_id) await query.edit_message_text( f"❌ *Ошибка:*\n```\n{str(e)}\n```", parse_mode="Markdown" @@ -1009,41 +1548,118 @@ async def _execute_local_command(query, command: str, working_dir: str): async def _execute_ssh_command(query, command: str, server: Server, working_dir: str): - """Выполнение команды через SSH.""" + """Выполнение команды через SSH с интерактивной сессией.""" + user_id = query.from_user.id + try: # Подготовка SSH ключа client_keys = [server_manager.ssh_key_path] if server_manager.ssh_key_path else None - - # Подключение к серверу - async with asyncssh.connect( - host=server.host, - port=server.port, - username=server.user, - client_keys=client_keys, - known_hosts=None # Отключаем проверку known_hosts для простоты - ) as conn: - # Выполнение команды - result = await conn.run( - command, - cwd=working_dir, - timeout=30 - ) - await _show_result(query, command, result.stdout.encode(), result.stderr.encode(), 0) + # Подготовка параметров подключения + connect_kwargs = { + "host": server.host, + "port": server.port, + "username": server.user, + "client_host_keys": None, + "known_hosts": None + } + + # Добавляем ключ или пароль + if client_keys: + connect_kwargs["client_keys"] = client_keys + if server.password: + connect_kwargs["password"] = server.password + + logger.info(f"SSH подключение к {server.host}:{server.port} как {server.user}") + + # Подключение к серверу + conn = await asyncssh.connect(**connect_kwargs) + + # Выполнение команды с cd в рабочую директорию + full_command = f"cd {working_dir} && {command}" if working_dir else command + + # Создаем интерактивный процесс с PTY для поддержки ввода + # TERM环境变量设置 для корректной кодировки + process = await conn.create_process( + full_command, + term_type='xterm-256color', + env={'LANG': 'C.UTF-8', 'LC_ALL': 'C.UTF-8'} + ) + + # Создаём сессию + session = ssh_session_manager.create_session( + user_id=user_id, + server=server, + working_dir=working_dir, + conn=conn, + process=process, + command=command + ) + + # Читаем начальный вывод + output, is_done = await read_ssh_output(process, timeout=3.0) + session.output_buffer = output + session.last_activity = datetime.now() + + # Читаем пока процесс не завершится + while not is_done: + more_output, is_done = await read_ssh_output(process, timeout=2.0) + output += more_output + session.output_buffer = output + session.last_activity = datetime.now() + + # Проверяем тип ввода + input_type = detect_input_type(output) + + if input_type == "password": + # Запрос пароля + session.waiting_for_input = True + session.input_type = "password" + await query.edit_message_text( + f"⏳ *Требуется ввод*\n\n" + f"Команда: `{command}`\n\n" + f"🔐 *Запрошен пароль*\n\n" + f"```\n{output.strip()[-200:]}\n```\n\n" + f"Отправьте пароль в чат:", + parse_mode="Markdown" + ) + return + elif input_type == "confirm": + # Запрос подтверждения + session.waiting_for_input = True + session.input_type = "confirm" + await query.edit_message_text( + f"⏳ *Требуется ввод*\n\n" + f"Команда: `{command}`\n\n" + f"❓ *Требуется подтверждение*\n\n" + f"```\n{output.strip()[-200:]}\n```\n\n" + f"Отправьте `y` (да) или `n` (нет):", + parse_mode="Markdown" + ) + return + else: + # Команда завершена, показываем результат + ssh_session_manager.close_session(user_id) + await _show_result(query, command, output.encode(), "", 0) + return except asyncssh.Error as e: logger.error(f"SSH ошибка: {e}") + ssh_session_manager.close_session(user_id) await query.edit_message_text( f"❌ *SSH ошибка:*\n```\n{str(e)}\n```", parse_mode="Markdown" ) except asyncio.TimeoutError: + logger.error("Таймаут SSH подключения") + ssh_session_manager.close_session(user_id) await query.edit_message_text( "❌ *Таймаут*\n\nКоманда выполнялась дольше 30 секунд и была прервана.", parse_mode="Markdown" ) except Exception as e: logger.error(f"Ошибка выполнения команды: {e}") + ssh_session_manager.close_session(user_id) await query.edit_message_text( f"❌ *Ошибка:*\n```\n{str(e)}\n```", parse_mode="Markdown" @@ -1052,16 +1668,18 @@ async def _execute_ssh_command(query, command: str, server: Server, working_dir: async def _show_result(query, command: str, stdout: bytes, stderr: bytes, returncode: int): """Показ результата выполнения команды.""" - output = stdout.decode("utf-8", errors="replace") - error = stderr.decode("utf-8", errors="replace") + output = clean_ansi_codes(stdout.decode("utf-8", errors="replace")) + output = normalize_output(output) + error = clean_ansi_codes(stderr.decode("utf-8", errors="replace")) result = f"✅ *Результат:*\n\n" - result += f"```\n{command}\n```\n\n" if output: + # Форматируем длинный вывод + output = format_long_output(output) if len(output) > 4000: output = output[:4000] + "\n... (вывод обрезан)" - result += f"*Вывод:*\n```\n{output}\n```\n" + result += f"```\n{output}\n```\n" if error: if len(error) > 4000: @@ -1085,16 +1703,232 @@ async def handle_text_message(update: Update, context: ContextTypes.DEFAULT_TYPE await handle_server_input(update, text) return + # Проверка: не активная ли SSH-сессия ожидает ввода + ssh_session = ssh_session_manager.get_session(user_id) + if ssh_session and ssh_session.waiting_for_input: + await handle_ssh_session_input(update, text, ssh_session) + return + + # Проверка: не активная ли локальная сессия ожидает ввода + local_session = local_session_manager.get_session(user_id) + if local_session and local_session.waiting_for_input: + await handle_local_session_input(update, text, local_session) + return + # Любое текстовое сообщение = CLI команда logger.info(f"Пользователь {user_id} отправил команду: {text}") - await update.message.reply_text( - f"⏳ *Выполнение...*\n\n`{text}`", - parse_mode="Markdown" - ) await execute_cli_command_from_message(update, text) +async def handle_ssh_session_input(update: Update, text: str, session: SSHSession): + """Обработка ввода пользователя в активную SSH-сессию.""" + user_id = update.effective_user.id + input_type = session.input_type + + logger.info(f"Пользователь {user_id} ввёл '{text}' в SSH-сессию (тип: {input_type})") + + try: + # Отправляем ввод в SSH-процесс + if input_type == "password": + # Пароль отправляем с newline + session.process.stdin.write(text + "\n") + elif input_type == "confirm": + # Подтверждение - y или n + answer = "y" if text.lower() in ("y", "yes", "да", "д") else "n" + session.process.stdin.write(answer + "\n") + else: + # Обычный ввод + session.process.stdin.write(text + "\n") + + await session.process.stdin.drain() + session.last_activity = datetime.now() + + # Читаем ответ + output, is_done = await read_ssh_output(session.process, timeout=3.0) + session.output_buffer += output + + # Проверяем тип ввода + new_input_type = detect_input_type(output) + + if new_input_type == "password": + session.waiting_for_input = True + session.input_type = "password" + await update.message.reply_text( + f"⏳ *Требуется ввод*\n\n" + f"🔐 *Запрошен пароль*\n\n" + f"```\n{output.strip()[-200:] if output else 'Ожидание...'}\n```\n\n" + f"Отправьте пароль в чат:", + parse_mode="Markdown" + ) + return + elif new_input_type == "confirm": + session.waiting_for_input = True + session.input_type = "confirm" + await update.message.reply_text( + f"⏳ *Требуется ввод*\n\n" + f"❓ *Требуется подтверждение*\n\n" + f"```\n{output.strip()[-200:] if output else 'Ожидание...'}\n```\n\n" + f"Отправьте `y` (да) или `n` (нет):", + parse_mode="Markdown" + ) + return + elif is_done or new_input_type == "prompt": + # Команда завершена + await update.message.reply_text( + f"✅ *Результат:*\n\n" + f"```\n{session.command}\n```\n\n" + f"```\n{session.output_buffer.strip()[-4000:]}\n```", + parse_mode="Markdown" + ) + ssh_session_manager.close_session(user_id) + return + else: + # Команда ещё выполняется + await update.message.reply_text( + f"⏳ *Выполнение...*\n\n" + f"```\n{output.strip()[-500:] if output else 'Выполняется...'}\n```", + parse_mode="Markdown" + ) + + # Читаем остаток + while not is_done: + more_output, is_done = await read_ssh_output(session.process, timeout=5.0) + output += more_output + session.output_buffer += output + session.last_activity = datetime.now() + + new_input_type = detect_input_type(output) + if new_input_type in ("password", "confirm"): + session.waiting_for_input = True + session.input_type = new_input_type + await update.message.reply_text( + f"⏳ *Требуется ввод*\n\n" + f"{'🔐 *Запрошен пароль*' if new_input_type == 'password' else '❓ *Требуется подтверждение'}\n\n" + f"```\n{output.strip()[-200:]}\n```\n\n" + f"{'Отправьте пароль в чат:' if new_input_type == 'password' else 'Отправьте `y` (да) или `n` (нет):'}", + parse_mode="Markdown" + ) + return + + # Завершено + await update.message.reply_text( + f"✅ *Результат:*\n\n" + f"```\n{session.command}\n```\n\n" + f"```\n{session.output_buffer.strip()[-4000:]}\n```", + parse_mode="Markdown" + ) + ssh_session_manager.close_session(user_id) + + except Exception as e: + logger.error(f"Ошибка ввода в SSH-сессию: {e}") + ssh_session_manager.close_session(user_id) + await update.message.reply_text( + f"❌ *Ошибка:*\n```\n{str(e)}\n```", + parse_mode="Markdown" + ) + + +async def handle_local_session_input(update: Update, text: str, session: LocalSession): + """Обработка ввода пользователя в локальную сессию.""" + user_id = update.effective_user.id + input_type = session.input_type + + logger.info(f"Пользователь {user_id} ввёл '{text}' в локальную сессию (тип: {input_type})") + + try: + child = session.context.get('child') + if not child: + raise Exception("Сессия не содержит child объект") + + # Отправляем ввод + if input_type == "password": + child.sendline(text) + elif input_type == "confirm": + answer = "y" if text.lower() in ("y", "yes", "да", "д") else "n" + child.sendline(answer) + else: + child.sendline(text) + + session.last_activity = datetime.now() + + # Читаем ответ + logger.info("Чтение ответа...") + output = "" + + try: + while True: + line = child.read_nonblocking(size=4096, timeout=5.0) + if not line: + break + output += line + logger.debug(f"Прочитано: {len(line)} символов") + + # Проверяем запрос ввода + if detect_input_type(output): + break + + except pexpect.TIMEOUT: + pass + except pexpect.EOF: + pass + + logger.info(f"После ввода прочитано: {len(output)} символов") + session.output_buffer += output + + # Проверяем тип ввода + new_input_type = detect_input_type(output) + + if new_input_type == "password": + session.waiting_for_input = True + session.input_type = "password" + await update.message.reply_text( + f"⏳ *Требуется ввод*\n\n" + f"🔐 *Запрошен пароль*\n\n" + f"```\n{output.strip()[-200:] if output else 'Ожидание...'}\n```\n\n" + f"Отправьте пароль в чат:", + parse_mode="Markdown" + ) + return + elif new_input_type == "confirm": + session.waiting_for_input = True + session.input_type = "confirm" + await update.message.reply_text( + f"⏳ *Требуется ввод*\n\n" + f"❓ *Требуется подтверждение*\n\n" + f"```\n{output.strip()[-200:] if output else 'Ожидание...'}\n```\n\n" + f"Отправьте `y` (да) или `n` (нет):", + parse_mode="Markdown" + ) + return + else: + # Команда завершена + # Очищаем ANSI-коды и нормализуем вывод + cleaned_output = clean_ansi_codes(session.output_buffer) + cleaned_output = normalize_output(cleaned_output) + + # Форматируем длинный вывод: первые 5 и последние 10 строк + formatted_output = format_long_output(cleaned_output.strip(), max_lines=15, head_lines=5, tail_lines=10) + if len(formatted_output) > 4000: + formatted_output = formatted_output[:4000] + "\n... (вывод обрезан)" + + await update.message.reply_text( + f"✅ *Результат:*\n\n" + f"```\n{session.command}\n```\n\n" + f"```\n{formatted_output}\n```", + parse_mode="Markdown" + ) + local_session_manager.close_session(user_id) + + except Exception as e: + logger.error(f"Ошибка ввода в локальную сессию: {e}") + local_session_manager.close_session(user_id) + await update.message.reply_text( + f"❌ *Ошибка:*\n```\n{str(e)}\n```", + parse_mode="Markdown" + ) + + async def handle_server_input(update: Update, text: str): """Обработка ввода данных для CRUD операций с серверами.""" user_id = update.effective_user.id @@ -1167,9 +2001,23 @@ async def handle_server_input(update: Update, text: str): elif input_type == "add_server_user": state.context["new_server"]["user"] = text - state.input_type = "add_server_tags" + state.input_type = "add_server_password" await update.message.reply_text( f"✅ User: `{text}`\n\n" + "Введите *SSH пароль* (или нажмите Пропустить для подключения только по ключу):\n" + "⚠️ Пароль будет сохранён в .env файл в открытом виде!", + parse_mode="Markdown", + reply_markup=InlineKeyboardMarkup([ + [InlineKeyboardButton("⏭️ Пропустить", callback_data="srv_skip_password")], + [InlineKeyboardButton("❌ Отмена", callback_data="server_menu")] + ]) + ) + + elif input_type == "add_server_password": + state.context["new_server"]["password"] = text + state.input_type = "add_server_tags" + await update.message.reply_text( + "✅ Пароль сохранён\n\n" "Введите *теги* через запятую (или нажмите Пропустить):\n" "Пример: `web,prod`, `db,backup`\n\n" "Теги помогают группировать серверы.", @@ -1184,7 +2032,7 @@ async def handle_server_input(update: Update, text: str): # Обработка ввода тегов (если пользователь ввёл текстом, а не нажал кнопку) tags = [t.strip() for t in text.split(",") if t.strip()] state.context["new_server"]["tags"] = tags - + # Завершение добавления new_server = state.context.get("new_server", {}) if server_manager.add_server( @@ -1192,7 +2040,8 @@ async def handle_server_input(update: Update, text: str): host=new_server["host"], port=new_server["port"], user=new_server["user"], - tags=tags + tags=tags, + password=new_server.get("password", "") ): await update.message.reply_text( "✅ *Сервер добавлен*\n\n" @@ -1200,7 +2049,8 @@ async def handle_server_input(update: Update, text: str): f"Host: `{new_server['host']}`\n" f"Port: `{new_server['port']}`\n" f"User: `{new_server['user']}`\n" - f"Tags: `{','.join(tags)}`\n\n" + f"Tags: `{','.join(tags)}`\n" + f"Password: {'установлен' if new_server.get('password') else 'не установлен'}\n\n" f"Сервер сохранён в `.env` и доступен для выбора.", parse_mode="Markdown", reply_markup=menu_builder.get_keyboard("server") @@ -1210,7 +2060,7 @@ async def handle_server_input(update: Update, text: str): "❌ Ошибка: сервер с таким именем уже существует", reply_markup=menu_builder.get_keyboard("server") ) - + state.waiting_for_input = False state.input_type = None state.context.clear() @@ -1241,9 +2091,16 @@ async def handle_server_input(update: Update, text: str): "Введите новые *теги* через запятую:", parse_mode="Markdown" ) + elif text == "5": + state.input_type = "edit_server_password" + await update.message.reply_text( + "Введите новый *password* (или оставьте пустым для подключения только по ключу):\n" + "⚠️ Пароль будет сохранён в .env файл в открытом виде!", + parse_mode="Markdown" + ) else: await update.message.reply_text( - "❌ Введите номер поля (1-4):", + "❌ Введите номер поля (1-5):", reply_markup=InlineKeyboardMarkup([[ InlineKeyboardButton("❌ Отмена", callback_data="server_menu") ]]) @@ -1272,12 +2129,12 @@ async def handle_server_input(update: Update, text: str): server_manager.update_server(state.editing_server, tags=tags) await finish_edit_server(update, state) + elif input_type == "edit_server_password": + server_manager.update_server(state.editing_server, password=text) + await finish_edit_server(update, state) + else: # Неизвестный тип ввода - выполняем команду - await update.message.reply_text( - f"⏳ *Выполнение...*\n\n`{text}`", - parse_mode="Markdown" - ) await execute_cli_command_from_message(update, text) return @@ -1400,94 +2257,329 @@ async def _execute_composite_command_local(update: Update, command: str, working async def _execute_composite_command_ssh(update: Update, command: str, server: Server, working_dir: str): - """Выполнение составной команды через SSH.""" + """Выполнение составной команды через SSH с интерактивной сессией.""" + user_id = update.effective_user.id command_with_pwd = f"{command} && pwd" - + try: client_keys = [server_manager.ssh_key_path] if server_manager.ssh_key_path else None - - async with asyncssh.connect( - host=server.host, - port=server.port, - username=server.user, - client_keys=client_keys, - known_hosts=None - ) as conn: - result = await conn.run(command_with_pwd, cwd=working_dir, timeout=30) - output = result.stdout.strip() - error = result.stderr - # Последняя строка - pwd + # Подготовка параметров подключения + connect_kwargs = { + "host": server.host, + "port": server.port, + "username": server.user, + "client_host_keys": None, + "known_hosts": None + } + + if client_keys: + connect_kwargs["client_keys"] = client_keys + if server.password: + connect_kwargs["password"] = server.password + + logger.info(f"SSH подключение к {server.host}:{server.port} как {server.user}") + + conn = await asyncssh.connect(**connect_kwargs) + + # Выполнение команды с cd в рабочую директорию + full_command = f"cd {working_dir} && {command_with_pwd}" if working_dir else command_with_pwd + + # Создаем интерактивный процесс с PTY для поддержки ввода + # TERM环境变量设置 для корректной кодировки + process = await conn.create_process( + full_command, + term_type='xterm-256color', + env={'LANG': 'C.UTF-8', 'LC_ALL': 'C.UTF-8'} + ) + + # Создаём сессию + session = ssh_session_manager.create_session( + user_id=user_id, + server=server, + working_dir=working_dir, + conn=conn, + process=process, + command=command + ) + + # Читаем начальный вывод + output, is_done = await read_ssh_output(process, timeout=3.0) + session.output_buffer = output + session.last_activity = datetime.now() + + # Читаем пока процесс не завершится + while not is_done: + more_output, is_done = await read_ssh_output(process, timeout=2.0) + output += more_output + session.output_buffer = output + session.last_activity = datetime.now() + + # Проверяем тип ввода + input_type = detect_input_type(output) + + if input_type == "password": + session.waiting_for_input = True + session.input_type = "password" + await update.message.reply_text( + f"⏳ *Требуется ввод*\n\n" + f"Команда: `{command}`\n\n" + f"🔐 *Запрошен пароль*\n\n" + f"```\n{output.strip()[-200:]}\n```\n\n" + f"Отправьте пароль в чат:", + parse_mode="Markdown" + ) + return + elif input_type == "confirm": + session.waiting_for_input = True + session.input_type = "confirm" + await update.message.reply_text( + f"⏳ *Требуется ввод*\n\n" + f"Команда: `{command}`\n\n" + f"❓ *Требуется подтверждение*\n\n" + f"```\n{output.strip()[-200:]}\n```\n\n" + f"Отправьте `y` (да) или `n` (нет):", + parse_mode="Markdown" + ) + return + else: + # Обработка pwd для смены директории if output: - lines = output.split('\n') + lines = output.strip().split('\n') final_dir = lines[-1].strip() - # Простая проверка - если начинается с / if final_dir.startswith('/'): - state_manager.get(update.effective_user.id).working_directory = final_dir + state_manager.get(user_id).working_directory = final_dir output = '\n'.join(lines[:-1]) - await _show_result_message(update, command, output, error, 0) + ssh_session_manager.close_session(user_id) + await _show_result_message(update, command, output, "", 0) + return except asyncssh.Error as e: logger.error(f"SSH ошибка: {e}") + ssh_session_manager.close_session(user_id) await update.message.reply_text(f"❌ *SSH ошибка:*\n```\n{str(e)}\n```", parse_mode="Markdown") except asyncio.TimeoutError: + logger.error("Таймаут SSH подключения") + ssh_session_manager.close_session(user_id) await update.message.reply_text("❌ *Таймаут*\n\nКоманда выполнялась дольше 30 секунд.", parse_mode="Markdown") except Exception as e: logger.error(f"Ошибка: {e}") + ssh_session_manager.close_session(user_id) await update.message.reply_text(f"❌ *Ошибка:*\n```\n{str(e)}\n```", parse_mode="Markdown") async def _execute_local_command_message(update: Update, command: str, working_dir: str): - """Выполнение локальной команды из сообщения.""" + """Выполнение локальной команды из сообщения через pexpect.""" + user_id = update.effective_user.id + try: - process = await asyncio.create_subprocess_shell( - command, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - cwd=working_dir + logger.info(f"Запуск команды через pexpect: {command}") + + # Создаём интерактивный процесс + child = pexpect.spawn( + '/bin/bash', + ['-c', command], + cwd=working_dir, + encoding='utf-8', + codec_errors='replace', + echo=False, + timeout=30 ) - stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=30) - await _show_result_message(update, command, stdout.decode(), stderr.decode(), process.returncode) - except asyncio.TimeoutError: - await update.message.reply_text("❌ *Таймаут*\n\nКоманда выполнялась дольше 30 секунд.", parse_mode="Markdown") + + # Создаём сессию (используем child вместо master_fd) + session = local_session_manager.create_session( + user_id=user_id, + command=command, + master_fd=child.child_fd, + pid=child.pid + ) + session.context = {'child': child} # Сохраняем child объект + + # Читаем начальный вывод + logger.info("Чтение вывода...") + output = "" + + try: + # Пробуем прочитать с таймаутом + while True: + line = child.read_nonblocking(size=4096, timeout=2.0) + if not line: + break + output += line + logger.debug(f"Прочитано: {len(line)} символов") + + # Проверяем запрос ввода + if detect_input_type(output): + break + + except pexpect.TIMEOUT: + pass + except pexpect.EOF: + pass + + logger.info(f"Прочитано: {len(output)} символов") + session.output_buffer = output + session.last_activity = datetime.now() + + # Проверяем тип ввода + input_type = detect_input_type(output) + logger.info(f"Тип ввода: {input_type}") + + if input_type == "password": + session.waiting_for_input = True + session.input_type = "password" + await update.message.reply_text( + f"⏳ *Требуется ввод*\n\n" + f"Команда: `{command}`\n\n" + f"🔐 *Запрошен пароль*\n\n" + f"```\n{output.strip()[-200:]}\n```\n\n" + f"Отправьте пароль в чат:", + parse_mode="Markdown" + ) + return + elif input_type == "confirm": + session.waiting_for_input = True + session.input_type = "confirm" + await update.message.reply_text( + f"⏳ *Требуется ввод*\n\n" + f"Команда: `{command}`\n\n" + f"❓ *Требуется подтверждение*\n\n" + f"```\n{output.strip()[-200:]}\n```\n\n" + f"Отправьте `y` (да) или `n` (нет):", + parse_mode="Markdown" + ) + return + else: + # Команда завершена + local_session_manager.close_session(user_id) + await _show_result_message(update, command, output, "", 0) + except Exception as e: - logger.error(f"Ошибка: {e}") + logger.error(f"Ошибка выполнения команды: {e}") + local_session_manager.close_session(user_id) await update.message.reply_text(f"❌ *Ошибка:*\n```\n{str(e)}\n```", parse_mode="Markdown") async def _execute_ssh_command_message(update: Update, command: str, server: Server, working_dir: str): - """Выполнение команды через SSH из сообщения.""" + """Выполнение команды через SSH из сообщения с интерактивной сессией.""" + user_id = update.effective_user.id + try: client_keys = [server_manager.ssh_key_path] if server_manager.ssh_key_path else None + + # Подготовка параметров подключения + connect_kwargs = { + "host": server.host, + "port": server.port, + "username": server.user, + "client_host_keys": None, + "known_hosts": None + } + + if client_keys: + connect_kwargs["client_keys"] = client_keys + if server.password: + connect_kwargs["password"] = server.password + + logger.info(f"SSH подключение к {server.host}:{server.port} как {server.user}") + + conn = await asyncssh.connect(**connect_kwargs) - async with asyncssh.connect( - host=server.host, - port=server.port, - username=server.user, - client_keys=client_keys, - known_hosts=None - ) as conn: - result = await conn.run(command, cwd=working_dir, timeout=30) - await _show_result_message(update, command, result.stdout, result.stderr, 0) + # Выполнение команды с cd в рабочую директорию + full_command = f"cd {working_dir} && {command}" if working_dir else command + + # Создаем интерактивный процесс с PTY для поддержки ввода + # TERM环境变量设置 для корректной кодировки + process = await conn.create_process( + full_command, + term_type='xterm-256color', + env={'LANG': 'C.UTF-8', 'LC_ALL': 'C.UTF-8'} + ) + + # Создаём сессию + session = ssh_session_manager.create_session( + user_id=user_id, + server=server, + working_dir=working_dir, + conn=conn, + process=process, + command=command + ) + + # Читаем начальный вывод + output, is_done = await read_ssh_output(process, timeout=3.0) + session.output_buffer = output + session.last_activity = datetime.now() + + # Читаем пока процесс не завершится + while not is_done: + more_output, is_done = await read_ssh_output(process, timeout=2.0) + output += more_output + session.output_buffer = output + session.last_activity = datetime.now() + + # Проверяем тип ввода + input_type = detect_input_type(output) + + if input_type == "password": + session.waiting_for_input = True + session.input_type = "password" + await update.message.reply_text( + f"⏳ *Требуется ввод*\n\n" + f"Команда: `{command}`\n\n" + f"🔐 *Запрошен пароль*\n\n" + f"```\n{output.strip()[-200:]}\n```\n\n" + f"Отправьте пароль в чат:", + parse_mode="Markdown" + ) + return + elif input_type == "confirm": + session.waiting_for_input = True + session.input_type = "confirm" + await update.message.reply_text( + f"⏳ *Требуется ввод*\n\n" + f"Команда: `{command}`\n\n" + f"❓ *Требуется подтверждение*\n\n" + f"```\n{output.strip()[-200:]}\n```\n\n" + f"Отправьте `y` (да) или `n` (нет):", + parse_mode="Markdown" + ) + return + else: + # Команда завершена, показываем результат + ssh_session_manager.close_session(user_id) + await _show_result_message(update, command, output, "", 0) + return + except asyncssh.Error as e: logger.error(f"SSH ошибка: {e}") + ssh_session_manager.close_session(user_id) await update.message.reply_text(f"❌ *SSH ошибка:*\n```\n{str(e)}\n```", parse_mode="Markdown") except asyncio.TimeoutError: + logger.error("Таймаут SSH подключения") + ssh_session_manager.close_session(user_id) await update.message.reply_text("❌ *Таймаут*\n\nКоманда выполнялась дольше 30 секунд.", parse_mode="Markdown") except Exception as e: logger.error(f"Ошибка: {e}") + ssh_session_manager.close_session(user_id) await update.message.reply_text(f"❌ *Ошибка:*\n```\n{str(e)}\n```", parse_mode="Markdown") async def _show_result_message(update: Update, command: str, output: str, error: str, returncode: int): """Показ результата выполнения команды.""" - result = f"✅ *Результат:*\n\n```\n{command}\n```\n\n" + # Очистка ANSI-кодов и нормализация + output = normalize_output(clean_ansi_codes(output)) if output else "" + error = clean_ansi_codes(error) if error else "" + + result = f"✅ *Результат:*\n\n" if output: + # Форматируем длинный вывод: первые 5 и последние 10 строк + output = format_long_output(output, max_lines=15, head_lines=5, tail_lines=10) if len(output) > 4000: output = output[:4000] + "\n... (вывод обрезан)" - result += f"*Вывод:*\n```\n{output}\n```\n" + result += f"```\n{output}\n```\n" if error: if len(error) > 4000: @@ -1507,12 +2599,34 @@ async def post_init(application: Application): BotCommand("menu", "Главное меню с кнопками"), BotCommand("help", "Справка"), BotCommand("settings", "Настройки"), + BotCommand("stop", "Прервать SSH-сессию"), ] await application.bot.set_my_commands(commands) logger.info("Бот инициализирован") +@check_access +async def stop_command(update: Update, context: ContextTypes.DEFAULT_TYPE): + """Обработка команды /stop - прерывание активной SSH-сессии.""" + user_id = update.effective_user.id + + session = ssh_session_manager.get_session(user_id) + if session: + ssh_session_manager.close_session(user_id) + await update.message.reply_text( + "❌ *SSH-сессия прервана*\n\n" + f"Команда `{session.command}` была остановлена.", + parse_mode="Markdown" + ) + else: + await update.message.reply_text( + "ℹ️ *Нет активных SSH-сессий*\n\n" + "У вас нет выполняющихся команд.", + parse_mode="Markdown" + ) + + def main(): """Точка входа.""" # Чтение токена только из переменной окружения @@ -1541,6 +2655,7 @@ def main(): application.add_handler(CommandHandler("help", help_command)) application.add_handler(CommandHandler("settings", settings_command)) application.add_handler(CommandHandler("menu", menu_command)) + application.add_handler(CommandHandler("stop", stop_command)) application.add_handler(CallbackQueryHandler(menu_callback)) application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_text_message))