#!/usr/bin/env python3 """ Telegram CLI Bot - бот для выполнения CLI команд с многоуровневым меню. Легкое добавление новых команд через регистрацию хендлеров. """ import os import sys import asyncio import subprocess import logging import getpass import re import pty import tty import termios import select import fcntl from pathlib import Path from typing import Optional, Callable, Dict, Any, List from dataclasses import dataclass, field from functools import wraps from datetime import datetime, timedelta import pexpect import asyncssh from dotenv import load_dotenv from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, BotCommand from telegram.ext import ( Application, CommandHandler, CallbackQueryHandler, MessageHandler, ContextTypes, filters, ) # Загрузка переменных окружения из .env load_dotenv() # --- Конфигурация --- BASE_DIR = Path(__file__).parent logging.basicConfig( format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO, handlers=[ logging.FileHandler(BASE_DIR / "bot.log"), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) # --- Конфигурация бота из переменных окружения --- class BotConfig: """Конфигурация бота из переменных окружения.""" def __init__(self): self.name = os.getenv("BOT_NAME", "CLI Assistant") self.description = os.getenv("BOT_DESCRIPTION", "Бот для выполнения CLI команд") self.icon = os.getenv("BOT_ICON_EMOJI", "🤖") self.working_directory = os.getenv("WORKING_DIRECTORY", str(Path.home())) # Парсинг списка разрешённых пользователей allowed_users_str = os.getenv("ALLOWED_USERS", "") if allowed_users_str.strip(): self.allowed_users = [ int(uid.strip()) for uid in allowed_users_str.split(",") if uid.strip().isdigit() ] else: self.allowed_users = [] @property def is_access_restricted(self) -> bool: """Проверка: ограничен ли доступ.""" return len(self.allowed_users) > 0 # --- Серверы --- @dataclass class Server: """Конфигурация сервера.""" name: str host: str port: int user: str tags: List[str] = field(default_factory=list) password: str = "" @property def display_name(self) -> str: """Отображаемое имя с иконкой.""" icon = "🖥️" if "local" in self.tags: icon = "💻" elif "db" in self.tags: icon = "🗄️" elif "web" in self.tags: icon = "🌐" return f"{icon} {self.name}" @property def description(self) -> str: """Краткое описание сервера.""" return f"{self.user}@{self.host}:{self.port}" class ServerManager: """Управление серверами.""" def __init__(self): self._servers: Dict[str, Server] = {} self._default_server: str = "local" self._ssh_key_path: Optional[str] = None # Локальный сервер всегда доступен try: local_user = getpass.getuser() except Exception: local_user = "user" self._servers["local"] = Server( name="local", host="localhost", port=22, user=local_user, tags=["local", "dev"] ) def load_from_env(self): """Загрузка серверов из переменных окружения.""" self._ssh_key_path = os.getenv("SSH_KEY_PATH") self._default_server = os.getenv("DEFAULT_SERVER", "local") servers_str = os.getenv("SERVERS", "") if not servers_str.strip(): return # Парсинг формата: name|host|port|user|tags|password,name|host|port|user|tags|password # Теги могут содержать запятые, поэтому разбираем по частям for server_line in servers_str.split(","): if not server_line.strip(): continue parts = server_line.strip().split("|") if len(parts) < 4: continue try: name = parts[0].strip() host = parts[1].strip() port = int(parts[2].strip()) user = parts[3].strip() # Теги (часть 4) и пароль (часть 5) могут отсутствовать tags = [] password = "" if len(parts) >= 5 and parts[4].strip(): tags = [t.strip() for t in parts[4].split(",") if t.strip()] if len(parts) >= 6: password = parts[5].strip() server = Server(name=name, host=host, port=port, user=user, tags=tags, password=password) self._servers[name] = server logger.info(f"Загружен сервер: {server.display_name} ({server.description})") except ValueError as e: logger.warning(f"Ошибка парсинга сервера: {parts} - {e}") def get(self, name: str) -> Optional[Server]: """Получить сервер по имени.""" return self._servers.get(name) def list_servers(self) -> List[Server]: """Список всех серверов.""" return list(self._servers.values()) def get_by_tags(self, tags: List[str]) -> List[Server]: """Получить серверы по тегам.""" result = [] for server in self._servers.values(): if any(tag in server.tags for tag in tags): result.append(server) return result @property def default_server(self) -> str: """Имя сервера по умолчанию.""" return self._default_server @property def ssh_key_path(self) -> Optional[str]: """Путь к SSH ключу.""" return self._ssh_key_path def get_keyboard(self, exclude_local: bool = False) -> InlineKeyboardMarkup: """Создать клавиатуру с выбором сервера.""" keyboard = [] for server in self._servers.values(): if exclude_local and server.name == "local": continue button = InlineKeyboardButton( server.display_name, callback_data=f"server_select_{server.name}" ) keyboard.append([button]) return InlineKeyboardMarkup(keyboard) def add_server(self, name: str, host: str, port: int, user: str, tags: List[str] = None, password: str = "") -> bool: """Добавить сервер.""" if name in self._servers: return False self._servers[name] = Server(name=name, host=host, port=port, user=user, tags=tags or [], password=password) self.save_to_env() return True def update_server(self, name: str, host: str = None, port: int = None, user: str = None, tags: List[str] = None, password: str = None) -> bool: """Обновить сервер.""" if name not in self._servers or name == "local": return False server = self._servers[name] if host: server.host = host if port: server.port = port if user: server.user = user if tags is not None: server.tags = tags if password is not None: server.password = password self.save_to_env() return True def delete_server(self, name: str) -> bool: """Удалить сервер.""" if name not in self._servers or name == "local": return False del self._servers[name] self.save_to_env() return True def save_to_env(self): """Сохранить серверы в .env файл.""" env_file = Path(__file__).parent / ".env" # Читаем существующий файл lines = [] if env_file.exists(): with open(env_file, "r", encoding="utf-8") as f: lines = f.readlines() # Формируем строку серверов server_parts = [] for server in self._servers.values(): if server.name == "local": continue tags_str = ",".join(server.tags) if server.tags else "" # Формат: name|host|port|user|tags|password server_parts.append(f"{server.name}|{server.host}|{server.port}|{server.user}|{tags_str}|{server.password}") servers_line = f"SERVERS={','.join(server_parts)}\n" # Ищем и обновляем или добавляем строку SERVERS found = False for i, line in enumerate(lines): if line.startswith("SERVERS="): lines[i] = servers_line found = True break if not found: lines.append("\n" + servers_line) # Записываем обратно with open(env_file, "w", encoding="utf-8") as f: f.writelines(lines) logger.info(f"Серверы сохранены в {env_file}") # --- Хранилище состояний пользователя --- @dataclass class UserState: """Состояние пользователя в диалоге.""" current_menu: str = "main" waiting_for_input: bool = False input_type: Optional[str] = None # "name", "host", "port", "user", "tags", "server_action" parent_menu: Optional[str] = None context: Dict[str, Any] = field(default_factory=dict) working_directory: Optional[str] = None current_server: str = "local" # Имя текущего сервера editing_server: Optional[str] = None # Имя сервера, который редактируем class StateManager: """Управление состояниями пользователей.""" def __init__(self): self._states: Dict[int, UserState] = {} def get(self, user_id: int) -> UserState: if user_id not in self._states: self._states[user_id] = UserState() return self._states[user_id] def reset(self, user_id: int): self._states[user_id] = UserState() # --- Интерактивные SSH-сессии --- @dataclass class SSHSession: """Интерактивная SSH-сессия.""" user_id: int server: Server working_dir: str conn: asyncssh.SSHClientConnection process: asyncssh.SSHClientProcess output_buffer: str = "" waiting_for_input: bool = False input_type: str = "" # "password", "confirm", "text" last_activity: datetime = field(default_factory=datetime.now) command: str = "" SESSION_TIMEOUT = timedelta(minutes=5) # Таймаут неактивности def is_expired(self) -> bool: """Проверка истечения таймаута сессии.""" return datetime.now() - self.last_activity > self.SESSION_TIMEOUT class SSHSessionManager: """Менеджер интерактивных SSH-сессий.""" def __init__(self): self._sessions: Dict[int, SSHSession] = {} def create_session(self, user_id: int, server: Server, working_dir: str, conn: asyncssh.SSHClientConnection, process: asyncssh.SSHClientProcess, command: str = "") -> SSHSession: """Создать новую сессию.""" session = SSHSession( user_id=user_id, server=server, working_dir=working_dir, conn=conn, process=process, command=command ) self._sessions[user_id] = session logger.info(f"Создана SSH-сессия для пользователя {user_id} на сервере {server.name}") return session def get_session(self, user_id: int) -> Optional[SSHSession]: """Получить сессию пользователя.""" session = self._sessions.get(user_id) if session and session.is_expired(): self.close_session(user_id) return None return session def close_session(self, user_id: int): """Закрыть сессию пользователя.""" session = self._sessions.pop(user_id, None) if session: try: if session.process: session.process.stdin.close() session.process.stdout.feed_eof() if session.conn: session.conn.close() logger.info(f"Закрыта SSH-сессия для пользователя {user_id}") except Exception as e: logger.warning(f"Ошибка при закрытии сессии: {e}") def has_active_session(self, user_id: int) -> bool: """Проверка наличия активной сессии.""" return self.get_session(user_id) is not None def cleanup_expired(self): """Очистка истёкших сессий.""" expired = [uid for uid, s in self._sessions.items() if s.is_expired()] for uid in expired: self.close_session(uid) # Менеджер SSH-сессий ssh_session_manager = SSHSessionManager() @dataclass class LocalSession: """Интерактивная локальная сессия.""" user_id: int command: str master_fd: int pid: int output_buffer: str = "" waiting_for_input: bool = False input_type: str = "" last_activity: datetime = field(default_factory=datetime.now) SESSION_TIMEOUT = timedelta(minutes=5) def is_expired(self) -> bool: return datetime.now() - self.last_activity > self.SESSION_TIMEOUT class LocalSessionManager: """Менеджер локальных интерактивных сессий.""" def __init__(self): self._sessions: Dict[int, LocalSession] = {} def create_session(self, user_id: int, command: str, master_fd: int, pid: int) -> LocalSession: session = LocalSession( user_id=user_id, command=command, master_fd=master_fd, pid=pid ) self._sessions[user_id] = session logger.info(f"Создана локальная сессия для пользователя {user_id}") return session def get_session(self, user_id: int) -> Optional[LocalSession]: session = self._sessions.get(user_id) if session and session.is_expired(): self.close_session(user_id) return None return session def close_session(self, user_id: int): session = self._sessions.pop(user_id, None) if session: try: # Закрываем pexpect процесс если есть child = session.context.get('child') if hasattr(session, 'context') and session.context else None if child: child.close(force=True) else: # Старый способ для PTY os.close(session.master_fd) os.kill(session.pid, 9) except: pass logger.info(f"Закрыта локальная сессия для пользователя {user_id}") def has_active_session(self, user_id: int) -> bool: return self.get_session(user_id) is not None # Менеджер локальных сессий local_session_manager = LocalSessionManager() # Паттерны для детектирования запросов ввода INPUT_PATTERNS = { "password": [ r"[Pp]assword[:\s]*$", r"[Pp]assphrase[:\s]*$", r"Enter password[:\s]*$", r"sudo password[:\s]*$", r"\[sudo\] password for .*:", r"[Пп]ароль[:\s]*$", r"\[sudo\] пароль для .*:", r"Введите пароль[:\s]*$", ], "confirm": [ r"[Yy]es/[Nn]o[?:\s]*$", r"\[?[Yy]\]?/?\[?[Nn]\]?", r"Do you want to continue", r"Continue\?", r"Are you sure", r"Is this OK", r"[Yy]es or [Nn]o", r"[Дд]а/[Нн]ет", r"[Пп]родолжить", ], "shell_prompt": [ r"[$#]\s*$", r"[>$]\s*$", r"[a-zA-Z0-9_-]+@[a-zA-Z0-9_-]+:.*[$#]\s*$", ], } def detect_input_type(text: str) -> Optional[str]: """Определить тип запроса ввода по тексту.""" text = text.strip() # Проверка на пароль for pattern in INPUT_PATTERNS["password"]: if re.search(pattern, text, re.MULTILINE): return "password" # Проверка на подтверждение for pattern in INPUT_PATTERNS["confirm"]: if re.search(pattern, text, re.MULTILINE): return "confirm" # Проверка на приглашение оболочки for pattern in INPUT_PATTERNS["shell_prompt"]: if re.search(pattern, text, re.MULTILINE): return "prompt" return None def clean_ansi_codes(text: str) -> str: """ Очистка ANSI-кодов и мусора из вывода терминала. Обрабатывает: - ANSI escape-последовательности \x1b[...m - «Битые» ANSI-коды без escape-символа (например [33m, [0m) - Символы замены Unicode () - Кириллические имитации ANSI-кодов (например [0м) """ # Удаляем ANSI escape-последовательности text = re.sub(r'\x1b\[[0-9;?]*[a-zA-Z]', '', text) # Удаляем «битые» ANSI-коды: [33m, [0m, [1m и т.д. (латиница и кириллица) text = re.sub(r'\[\d+[мm]', '', text) # Удаляем символы замены Unicode text = text.replace('\ufffd', '') return text def normalize_output(text: str) -> str: """ Нормализовать вывод: обработать \r и убрать пустые строки. \r используется для перезаписи строки (прогресс-баров). """ # Заменяем \r\n на \n text = text.replace('\r\n', '\n') # Обрабатываем \r (возврат каретки) — строки с \r перезаписывают друг друга lines = [] for line in text.split('\n'): if '\r' in line: # Разбиваем по \r и берём последнюю часть (финальное состояние) parts = line.split('\r') line = parts[-1] lines.append(line) text = '\n'.join(lines) # Разбиваем на строки, убираем пустые и trailing пробелы lines = text.split('\n') lines = [line.rstrip() for line in lines if line.strip()] # Очищаем прогресс-бары вида "Текст… 0%Текст… 50%Текст… 100%" # И дублирующийся текст cleaned_lines = [] for line in lines: # Ищем повторяющийся паттерн "текст… цифры%" progress_pattern = re.compile(r'((?:.+?\.{3})\d+%)+') match = progress_pattern.search(line) if match: # Берём последнее вхождение items = re.findall(r'(.+?\.{3})(\d+)%', match.group(0)) if items: last_text, last_percent = items[-1] line = line[:match.start()] + f'{last_text}{last_percent}%' + line[match.end():] # СНАЧАЛА удаляем остатки ANSI-кодов из строки line = re.sub(r'.', '', line) #  + любой символ # Удаляем дублирующийся текст вида "0% [текст] 0% [текст]" dup_pattern = re.compile(r'(\d+%\s*\[.+?\])(?:\s*\d+%\s*\[.+?\])+') match = dup_pattern.search(line) if match: # Оставляем только первое вхождение line = line[:match.start()] + match.group(1) + line[match.end():] # Удаляем ведущие пробелы (артефакты терминала) line = line.lstrip() if line: cleaned_lines.append(line) return '\n'.join(cleaned_lines) def format_long_output(text: str, max_lines: int = 20, head_lines: int = 10, tail_lines: int = 10) -> str: """ Форматировать длинный вывод: показать первые и последние строки. По умолчанию: первые 10 + последние 10 строк = 20 строк максимум. """ lines = text.split('\n') total_lines = len(lines) if total_lines <= max_lines: return text # Показываем первые head_lines и последние tail_lines head = lines[:head_lines] tail = lines[-tail_lines:] skipped = total_lines - head_lines - tail_lines result = '\n'.join(head) result += f'\n\n... ({skipped} строк пропущено) ...\n' result += '\n'.join(tail) return result async def read_ssh_output(process: asyncssh.SSHClientProcess, timeout: float = 2.0) -> tuple[str, bool]: """ Чтение вывода из SSH-процесса с таймаутом. Возвращает (вывод, завершён_ли_процесс). """ output = "" is_done = False try: # Используем readany() для чтения доступных данных while True: try: # readany() читает любые доступные данные data = await asyncio.wait_for(process.stdout.readany(), timeout=timeout) if data: if isinstance(data, bytes): output += data.decode('utf-8', errors='replace') else: output += str(data) logger.debug(f"Прочитано stdout: {len(data)} байт, всего: {len(output)}") else: # EOF is_done = True break except asyncio.TimeoutError: # Данные закончились logger.debug(f"Timeout stdout, прочитано: {len(output)} байт") if process.returncode is not None: is_done = True break except UnicodeDecodeError as e: logger.debug(f"Ошибка декодирования UTF-8: {e}") continue except Exception as e: # Конец потока logger.debug(f"Конец потока stdout: {type(e).__name__}: {e}") is_done = True break except Exception as e: logger.debug(f"Ошибка чтения SSH stdout: {e}") is_done = True # Читаем stderr если есть error_output = "" try: while True: try: data = await asyncio.wait_for(process.stderr.readany(), timeout=0.5) if data: if isinstance(data, bytes): error_output += data.decode('utf-8', errors='replace') else: error_output += str(data) else: break except (asyncio.TimeoutError, Exception): break except Exception as e: logger.debug(f"Ошибка чтения SSH stderr: {e}") # Объединяем stdout и stderr if error_output: output = output + error_output if output else error_output logger.debug(f"read_ssh_output: output={len(output)} байт, is_done={is_done}, returncode={process.returncode}") return output, is_done def read_pty_output(master_fd: int, timeout: float = 2.0) -> tuple[str, bool]: """ Чтение вывода из PTY с таймаутом. Возвращает (вывод, завершён_ли_процесс). """ output = "" is_done = False total_waited = 0 try: # Устанавливаем non-blocking режим flags = fcntl.fcntl(master_fd, fcntl.F_GETFL) fcntl.fcntl(master_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) while total_waited < timeout: try: # Ждём данные с коротким таймаутом ready, _, _ = select.select([master_fd], [], [], 0.2) if ready: try: data = os.read(master_fd, 4096) if data: output += data.decode('utf-8', errors='replace') logger.debug(f"Прочитано из PTY: {len(data)} байт") # Сбрасываем таймер если есть данные total_waited = 0 else: is_done = True break except BlockingIOError: # Нет данных, продолжаем ждать pass else: # Timeout - проверяем не завершился ли процесс try: _, status = os.waitpid(-1, os.WNOHANG) if status != 0: logger.debug(f"Процесс завершился со статусом: {status}") is_done = True break except ChildProcessError: pass # Если уже что-то прочитали и есть запрос ввода - выходим if output and detect_input_type(output): logger.debug(f"Обнаружен запрос ввода") break total_waited += 0.2 except Exception as e: logger.debug(f"Ошибка при чтении PTY: {e}") break except Exception as e: logger.debug(f"Ошибка чтения PTY: {e}") is_done = True logger.debug(f"read_pty_output: output={len(output)} байт, is_done={is_done}") return output, is_done # --- Система команд --- @dataclass class MenuItem: """Элемент меню.""" label: str callback: str # callback_data для кнопки description: str = "" icon: str = "" children: List["MenuItem"] = field(default_factory=list) command: Optional[str] = None # CLI команда для выполнения is_command: bool = False class MenuBuilder: """Построитель многоуровневого меню.""" def __init__(self): self._menus: Dict[str, List[MenuItem]] = {} def add_menu(self, menu_name: str, items: List[MenuItem]): self._menus[menu_name] = items def get_menu(self, menu_name: str) -> List[MenuItem]: return self._menus.get(menu_name, []) def get_keyboard(self, menu_name: str) -> InlineKeyboardMarkup: """Создает InlineKeyboard для меню.""" items = self._menus.get(menu_name, []) keyboard = [] for item in items: # Иконка уже есть в label, поэтому не добавляем её отдельно button = InlineKeyboardButton( item.label, callback_data=item.callback ) keyboard.append([button]) return InlineKeyboardMarkup(keyboard) class CommandRegistry: """Реестр команд для легкого добавления.""" def __init__(self): self._commands: Dict[str, Callable] = {} def register(self, name: str): """Декоратор для регистрации команды.""" def decorator(func: Callable): self._commands[name] = func return func return decorator def get(self, name: str) -> Optional[Callable]: return self._commands.get(name) def list_commands(self) -> List[str]: return list(self._commands.keys()) # --- Глобальные объекты --- config = BotConfig() state_manager = StateManager() menu_builder = MenuBuilder() command_registry = CommandRegistry() server_manager = ServerManager() # --- Проверка прав доступа --- def check_access(func): """Декоратор для проверки прав доступа пользователя.""" @wraps(func) async def wrapper(update: Update, context: ContextTypes.DEFAULT_TYPE, *args, **kwargs): user_id = update.effective_user.id # Если доступ не ограничен — пропускаем всех if not config.is_access_restricted: return await func(update, context, *args, **kwargs) if user_id not in config.allowed_users: logger.warning(f"Попытка доступа от запрещённого пользователя {user_id}") await update.message.reply_text( "❌ *Доступ запрещён*\n\n" "Ваш ID не добавлен в список разрешённых пользователей.\n" f"Ваш ID: `{user_id}`", parse_mode="Markdown" ) return return await func(update, context, *args, **kwargs) return wrapper # --- Инициализация меню --- def init_menus(): """Инициализация структуры меню.""" # Главное меню main_menu = [ MenuItem("🖥️ Выбор сервера", "server_menu", icon="🖥️"), MenuItem("📋 Предустановленные команды", "preset_menu", icon="📋"), MenuItem("⚙️ Настройки бота", "settings_menu", icon="⚙️"), MenuItem("ℹ️ О боте", "about", icon="ℹ️"), ] menu_builder.add_menu("main", main_menu) # Меню серверов server_menu = [ MenuItem("💻 local (localhost)", "server_select_local", icon="💻"), MenuItem("➕ Добавить сервер", "server_add", icon="➕"), MenuItem("⬅️ Назад", "main", icon="⬅️"), ] menu_builder.add_menu("server", server_menu) # Меню предустановленных команд preset_menu = [ MenuItem("📁 Файловая система", "fs_menu", icon="📁"), MenuItem("🔍 Поиск", "search_menu", icon="🔍"), MenuItem("📊 Система", "system_menu", icon="📊"), MenuItem("🌐 Сеть", "network_menu", icon="🌐"), MenuItem("⬅️ Назад", "main", icon="⬅️"), ] menu_builder.add_menu("preset", preset_menu) # Файловая система fs_menu = [ MenuItem("ls -la", "cmd_ls_la", command="ls -la", icon="📄"), MenuItem("pwd", "cmd_pwd", command="pwd", icon="📍"), MenuItem("df -h", "cmd_df", command="df -h", icon="💾"), MenuItem("du -sh *", "cmd_du", command="du -sh * 2>/dev/null | sort -hr | head -20", icon="📊"), MenuItem("⬅️ Назад", "preset", icon="⬅️"), ] menu_builder.add_menu("fs", fs_menu) # Поиск search_menu = [ MenuItem("find . -name", "cmd_find_name", command="find . -maxdepth 3 -name '*.txt' 2>/dev/null", icon="🔎"), MenuItem("grep пример", "cmd_grep", command="grep -r 'example' . 2>/dev/null | head -20", icon="🔍"), MenuItem("which command", "cmd_which", command="which python3 bash git", icon="📍"), MenuItem("⬅️ Назад", "preset", icon="⬅️"), ] menu_builder.add_menu("search", search_menu) # Система system_menu = [ MenuItem("top -n 1", "cmd_top", command="top -bn1 | head -20", icon="📈"), MenuItem("ps aux", "cmd_ps", command="ps aux | head -20", icon="🔄"), MenuItem("free -h", "cmd_free", command="free -h", icon="💾"), MenuItem("uname -a", "cmd_uname", command="uname -a", icon="ℹ️"), MenuItem("uptime", "cmd_uptime", command="uptime", icon="⏱️"), MenuItem("⬅️ Назад", "preset", icon="⬅️"), ] menu_builder.add_menu("system", system_menu) # Сеть network_menu = [ MenuItem("ip addr", "cmd_ip", command="ip addr 2>/dev/null || ifconfig 2>/dev/null", icon="🌐"), MenuItem("ping google", "cmd_ping", command="ping -c 4 google.com 2>&1 | head -10", icon="📡"), MenuItem("netstat", "cmd_netstat", command="ss -tuln 2>/dev/null || netstat -tuln 2>/dev/null | head -20", icon="🔌"), MenuItem("curl ifconfig.me", "cmd_curl_ip", command="curl -s ifconfig.me 2>&1", icon="📍"), MenuItem("⬅️ Назад", "preset", icon="⬅️"), ] menu_builder.add_menu("network", network_menu) # Настройки settings_menu = [ MenuItem("📝 Изменить имя бота", "set_name", icon="📝"), MenuItem("📄 Изменить описание", "set_description", icon="📄"), MenuItem("🎨 Изменить иконку", "set_icon", icon="🎨"), MenuItem("👥 Управление доступом", "access_menu", icon="👥"), MenuItem("⬅️ Назад", "main", icon="⬅️"), ] menu_builder.add_menu("settings", settings_menu) # Доступ access_menu = [ MenuItem("📋 Показать разрешённых", "show_access", icon="📋"), MenuItem("➕ Добавить пользователя", "add_access", icon="➕"), MenuItem("➖ Удалить пользователя", "remove_access", icon="➖"), MenuItem("⬅️ Назад", "settings", icon="⬅️"), ] menu_builder.add_menu("access", access_menu) # --- Хендлеры --- @check_access async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE): """Обработка команды /start.""" user = update.effective_user logger.info(f"Пользователь {user.username} ({user.id}) запустил бота") state_manager.reset(user.id) # Показать текущую директорию и сервер working_dir = config.working_directory server = server_manager.get("local") server_desc = server.description if server else "localhost" await update.message.reply_text( f"👋 Привет, {user.first_name}!\n\n" f"{config.icon} *{config.name}*\n" f"_{config.description}_\n\n" f"*Просто отправьте CLI команду в чат* — я её выполню!\n\n" f"🖥️ *Текущий сервер:* `{server_desc}`\n" f"📁 *Рабочая директория:* `{working_dir}`\n\n" f"Используйте `cd путь` для смены директории.\n" f"Или выберите сервер в меню.\n" f"Команда /help покажет справку.", parse_mode="Markdown", reply_markup=menu_builder.get_keyboard("main") ) @check_access async def menu_command(update: Update, context: ContextTypes.DEFAULT_TYPE): """Обработка команды /menu - показывает главное меню.""" user = update.effective_user state = state_manager.get(user.id) # Сброс состояния и возврат к главному меню state_manager.reset(user.id) state.current_menu = "main" # Показать текущую директорию и сервер working_dir = state.working_directory or config.working_directory server = server_manager.get(state.current_server) server_desc = server.description if server else state.current_server await update.message.reply_text( f"🏠 *Главное меню*\n\n" f"🖥️ *Сервер:* `{server_desc}`\n" f"📁 *Директория:* `{working_dir}`\n\n" f"Выберите действие:", parse_mode="Markdown", reply_markup=menu_builder.get_keyboard("main") ) @check_access async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE): """Обработка команды /help.""" help_text = f""" 📖 *Справка по боту {config.name}* *Как использовать:* Просто отправьте любую CLI команду в чат — бот выполнит её! *Примеры:* • `ls -la` — список файлов • `pwd` — текущая директория • `df -h` — свободное место на диске • `git status` — статус git *Навигация по директориям:* • `cd путь` — сменить директорию (например, `cd git/project`) • `cd ..` — на уровень вверх • `cd ~` — в домашнюю директорию • `pwd` — показать текущую директорию *Кнопки меню:* • 📋 Предустановленные команды — быстрые команды по категориям • ⚙️ Настройки бота — изменение имени, описания, иконки • ℹ️ О боте — информация *Команды управления:* /start — Запустить бота, главное меню /menu — Показать главное меню с кнопками /help — Эта справка /settings — Настройки *Безопасность:* Команды выполняются от вашего имени. Будьте осторожны с деструктивными командами! """ await update.message.reply_text(help_text, parse_mode="Markdown") @check_access async def settings_command(update: Update, context: ContextTypes.DEFAULT_TYPE): """Обработка команды /settings.""" state = state_manager.get(update.effective_user.id) state.current_menu = "settings" await update.message.reply_text( "⚙️ *Настройки бота*", parse_mode="Markdown", reply_markup=menu_builder.get_keyboard("settings") ) async def menu_callback(update: Update, context: ContextTypes.DEFAULT_TYPE): """Обработка нажатий на кнопки меню.""" query = update.callback_query user_id = query.from_user.id state = state_manager.get(user_id) await query.answer() callback = query.data logger.info(f"Callback: {callback} от пользователя {user_id}") # Обработка навигации if callback == "main": state.current_menu = "main" await query.edit_message_text( "🏠 *Главное меню*", parse_mode="Markdown", reply_markup=menu_builder.get_keyboard("main") ) elif callback == "preset_menu": state.current_menu = "preset" await query.edit_message_text( "📋 *Предустановленные команды*", parse_mode="Markdown", reply_markup=menu_builder.get_keyboard("preset") ) elif callback == "fs_menu": await query.edit_message_text( "📁 *Файловая система*", parse_mode="Markdown", reply_markup=menu_builder.get_keyboard("fs") ) elif callback == "search_menu": await query.edit_message_text( "🔍 *Поиск*", parse_mode="Markdown", reply_markup=menu_builder.get_keyboard("search") ) elif callback == "system_menu": await query.edit_message_text( "📊 *Система*", parse_mode="Markdown", reply_markup=menu_builder.get_keyboard("system") ) elif callback == "network_menu": await query.edit_message_text( "🌐 *Сеть*", parse_mode="Markdown", reply_markup=menu_builder.get_keyboard("network") ) elif callback == "server_menu": # Динамическое обновление меню серверов с кнопками управления servers = server_manager.list_servers() keyboard = [] for srv in servers: # Кнопка выбора сервера + кнопка управления (для не-local) row = [InlineKeyboardButton( srv.display_name, callback_data=f"server_select_{srv.name}" )] if srv.name != "local": row.append(InlineKeyboardButton( "⚙️", callback_data=f"server_manage_{srv.name}" )) keyboard.append(row) keyboard.append([ InlineKeyboardButton("➕ Добавить", callback_data="server_add"), InlineKeyboardButton("⬅️ Назад", callback_data="main") ]) state.current_menu = "server" await query.edit_message_text( "🖥️ *Управление серверами*\n\n" "Выберите сервер для подключения или добавьте новый.\n" "⚙️ — редактировать/удалить сервер", parse_mode="Markdown", reply_markup=InlineKeyboardMarkup(keyboard) ) elif callback == "server_add": state.waiting_for_input = True state.input_type = "add_server_name" state.context["new_server"] = {} await query.edit_message_text( "➕ *Добавление сервера*\n\n" "Введите *имя сервера* (латиница, без пробелов):\n" "Пример: `web-prod`, `db-backup`", parse_mode="Markdown", reply_markup=InlineKeyboardMarkup([[ InlineKeyboardButton("❌ Отмена", callback_data="server_menu") ]]) ) elif callback.startswith("server_manage_"): server_name = callback.replace("server_manage_", "") server = server_manager.get(server_name) if server and server_name != "local": state.editing_server = server_name await query.edit_message_text( f"⚙️ *Управление сервером*\n\n" f"{server.display_name}\n" f"📍 `{server.description}`\n" f"🏷️ Теги: `{','.join(server.tags) if server.tags else 'нет'}`\n\n" f"Выберите действие:", parse_mode="Markdown", reply_markup=InlineKeyboardMarkup([ [InlineKeyboardButton("✏️ Редактировать", callback_data=f"server_edit_{server_name}")], [InlineKeyboardButton("🗑️ Удалить", callback_data=f"server_delete_{server_name}")], [InlineKeyboardButton("⬅️ Назад", callback_data="server_menu")] ]) ) else: await query.edit_message_text( f"❌ *Сервер не найден*\n\n" f"Сервер `{server_name}` отсутствует в конфигурации.", parse_mode="Markdown", reply_markup=menu_builder.get_keyboard("server") ) elif callback.startswith("server_edit_"): server_name = callback.replace("server_edit_", "") server = server_manager.get(server_name) if server and server_name != "local": state.editing_server = server_name state.waiting_for_input = True state.input_type = "edit_server_field" password_status = "установлен" if server.password else "не установлен" await query.edit_message_text( f"✏️ *Редактирование сервера: {server_name}*\n\n" f"Текущие значения:\n" f"• Host: `{server.host}`\n" f"• Port: `{server.port}`\n" f"• User: `{server.user}`\n" f"• Tags: `{','.join(server.tags) if server.tags else 'нет'}`\n" f"• Password: {password_status}\n\n" f"Введите номер поля для изменения:\n" f"1 — Host\n" f"2 — Port\n" f"3 — User\n" f"4 — Tags\n" f"5 — Password", parse_mode="Markdown", reply_markup=InlineKeyboardMarkup([[ InlineKeyboardButton("❌ Отмена", callback_data="server_menu") ]]) ) else: await query.edit_message_text( "❌ Ошибка: сервер не найден", reply_markup=menu_builder.get_keyboard("server") ) elif callback.startswith("server_delete_"): server_name = callback.replace("server_delete_", "") server = server_manager.get(server_name) if server and server_name != "local": # Удаляем сразу с подтверждением if server_manager.delete_server(server_name): await query.edit_message_text( f"🗑️ *Сервер удалён*\n\n" f"Сервер `{server_name}` успешно удалён из конфигурации.", parse_mode="Markdown", reply_markup=menu_builder.get_keyboard("server") ) else: await query.edit_message_text( "❌ Ошибка при удалении сервера", reply_markup=menu_builder.get_keyboard("server") ) else: await query.edit_message_text( "❌ Нельзя удалить local сервер", reply_markup=menu_builder.get_keyboard("server") ) elif callback == "srv_skip_password": # Пропуск пароля при добавлении сервера user_id = query.from_user.id state = state_manager.get(user_id) state.context["new_server"]["password"] = "" state.input_type = "add_server_tags" await query.edit_message_text( "✅ Пароль пропущен (будет использоваться только ключ)\n\n" "Введите *теги* через запятую (или нажмите Пропустить):\n" "Пример: `web,prod`, `db,backup`\n\n" "Теги помогают группировать серверы.", parse_mode="Markdown", reply_markup=InlineKeyboardMarkup([ [InlineKeyboardButton("⏭️ Пропустить", callback_data="srv_skip_tags")], [InlineKeyboardButton("❌ Отмена", callback_data="server_menu")] ]) ) elif callback == "srv_skip_tags": # Пропуск тегов при добавлении сервера user_id = query.from_user.id state = state_manager.get(user_id) new_server = state.context.get("new_server", {}) if new_server.get("name") and new_server.get("host") and new_server.get("port") and new_server.get("user"): if server_manager.add_server( name=new_server["name"], host=new_server["host"], port=new_server["port"], user=new_server["user"], tags=[], password=new_server.get("password", "") ): await query.edit_message_text( "✅ *Сервер добавлен*\n\n" f"Имя: `{new_server['name']}`\n" f"Host: `{new_server['host']}`\n" f"Port: `{new_server['port']}`\n" f"User: `{new_server['user']}`\n" f"Tags: нет\n" f"Password: {'установлен' if new_server.get('password') else 'не установлен'}\n\n" f"Сервер сохранён в `.env` и доступен для выбора.", parse_mode="Markdown", reply_markup=menu_builder.get_keyboard("server") ) else: await query.edit_message_text( "❌ Ошибка: сервер с таким именем уже существует", reply_markup=menu_builder.get_keyboard("server") ) else: await query.edit_message_text( "❌ Ошибка: неполные данные сервера", reply_markup=menu_builder.get_keyboard("server") ) state.waiting_for_input = False state.input_type = None state.context.clear() elif callback.startswith("server_select_"): server_name = callback.replace("server_select_", "") server = server_manager.get(server_name) if server: state.current_server = server_name # Сброс рабочей директории при смене сервера state.working_directory = None await query.edit_message_text( f"✅ *Сервер изменён*\n\n" f"{server.display_name}\n" f"📍 `{server.description}`\n\n" f"Теперь команды выполняются на этом сервере.", parse_mode="Markdown", reply_markup=menu_builder.get_keyboard("main") ) state.current_menu = "main" else: await query.edit_message_text( f"❌ *Сервер не найден*\n\n" f"Сервер `{server_name}` отсутствует в конфигурации.", parse_mode="Markdown", reply_markup=menu_builder.get_keyboard("server") ) elif callback == "settings_menu": state.current_menu = "settings" await query.edit_message_text( "⚙️ *Настройки бота*", parse_mode="Markdown", reply_markup=menu_builder.get_keyboard("settings") ) elif callback == "access_menu": await query.edit_message_text( "👥 *Управление доступом*", parse_mode="Markdown", reply_markup=menu_builder.get_keyboard("access") ) # Обработка команд выполнения elif callback.startswith("cmd_"): # Поиск команды в меню command = None for menu_items in menu_builder._menus.values(): for item in menu_items: if item.callback == callback and item.command: command = item.command break if command: await execute_cli_command(query, command) else: await query.edit_message_text("❌ Команда не найдена") # Настройки бота - только просмотр, изменение через .env elif callback == "set_name": await query.edit_message_text( "📝 *Изменение имени бота*\n\n" f"Текущее имя: `{config.name}`\n\n" "Для изменения отредактируйте `.env`:\n" "```\nBOT_NAME=Ваше имя\n```", parse_mode="Markdown", reply_markup=menu_builder.get_keyboard("settings") ) elif callback == "set_description": await query.edit_message_text( "📄 *Изменение описания бота*\n\n" f"Текущее описание: `{config.description}`\n\n" "Для изменения отредактируйте `.env`:\n" "```\nBOT_DESCRIPTION=Ваше описание\n```", parse_mode="Markdown", reply_markup=menu_builder.get_keyboard("settings") ) elif callback == "set_icon": await query.edit_message_text( "🎨 *Изменение иконки бота*\n\n" f"Текущая иконка: `{config.icon}`\n\n" "Для изменения отредактируйте `.env`:\n" "```\nBOT_ICON_EMOJI=🤖\n```", parse_mode="Markdown", reply_markup=menu_builder.get_keyboard("settings") ) elif callback == "show_access": if config.allowed_users: text = "👥 *Разрешённые пользователи:*\n" + "\n".join(f"• `{uid}`" for uid in config.allowed_users) else: text = "👥 *Доступ открыт для всех*\n\n(список разрешённых пользователей пуст)" await query.edit_message_text(text, parse_mode="Markdown") elif callback == "add_access": await query.edit_message_text( "➕ *Добавление пользователя*\n\n" "Для добавления пользователя отредактируйте `.env`:\n" "```\nALLOWED_USERS=123456789,987654321\n```\n" "Ваш ID можно узнать через @userinfobot", parse_mode="Markdown" ) elif callback == "remove_access": if config.allowed_users: text = "➖ *Удаление пользователя*\n\n" + "\n".join(f"• `{uid}`" for uid in config.allowed_users) text += "\n\nУдалите ID из `.env` чтобы убрать доступ" else: text = "➖ Список пуст, некого удалять" await query.edit_message_text(text, parse_mode="Markdown") elif callback == "about": await query.edit_message_text( f"ℹ️ *О боте*\n\n" f"*{config.icon} {config.name}*\n" f"_{config.description}_\n\n" f"Версия: 1.0.0\n" f"Рабочая директория: `{config.working_directory}`\n\n" f"Бот позволяет выполнять CLI команды на вашем ПК\n" f"через интерфейс Telegram.", parse_mode="Markdown", reply_markup=menu_builder.get_keyboard("main") ) state.current_menu = "main" async def execute_cli_command(query, command: str): """Выполнение CLI команды из кнопки меню.""" user_id = query.from_user.id state = state_manager.get(user_id) server_name = state.current_server server = server_manager.get(server_name) # Определяем рабочую директорию working_dir = state.working_directory or config.working_directory logger.info(f"Выполнение команды: {command} на сервере: {server_name}, в директории: {working_dir}") # Если локальный сервер — выполняем локально if server_name == "local" or server is None: await _execute_local_command(query, command, working_dir) else: # Выполняем через SSH await _execute_ssh_command(query, command, server, working_dir) async def _execute_local_command(query, command: str, working_dir: str): """Выполнение локальной команды через PTY.""" user_id = query.from_user.id try: logger.info(f"Создание PTY для команды: {command}") # Создаём PTY master_fd, slave_fd = pty.openpty() logger.info(f"PTY создан: master_fd={master_fd}") # Запускаем процесс в PTY pid = os.fork() if pid == 0: # Дочерний процесс os.close(master_fd) os.setsid() os.dup2(slave_fd, 0) # stdin os.dup2(slave_fd, 1) # stdout os.dup2(slave_fd, 2) # stderr os.close(slave_fd) os.chdir(working_dir) os.execvp("/bin/bash", ["/bin/bash", "-c", command]) else: # Родительский процесс os.close(slave_fd) logger.info(f"Процесс запущен: pid={pid}") # Создаём сессию session = local_session_manager.create_session( user_id=user_id, command=command, master_fd=master_fd, pid=pid ) # Читаем начальный вывод logger.info("Чтение вывода из PTY...") output, is_done = read_pty_output(master_fd, timeout=3.0) logger.info(f"Прочитано: {len(output)} байт, is_done={is_done}") logger.debug(f"Вывод: {output[:500] if output else '(пусто)'}") session.output_buffer = output session.last_activity = datetime.now() # Проверяем тип ввода input_type = detect_input_type(output) logger.info(f"Тип ввода: {input_type}") if input_type == "password": session.waiting_for_input = True session.input_type = "password" await query.edit_message_text( f"⏳ *Требуется ввод*\n\n" f"Команда: `{command}`\n\n" f"🔐 *Запрошен пароль*\n\n" f"```\n{output.strip()[-200:]}\n```\n\n" f"Отправьте пароль в чат:", parse_mode="Markdown" ) return elif input_type == "confirm": session.waiting_for_input = True session.input_type = "confirm" await query.edit_message_text( f"⏳ *Требуется ввод*\n\n" f"Команда: `{command}`\n\n" f"❓ *Требуется подтверждение*\n\n" f"```\n{output.strip()[-200:]}\n```\n\n" f"Отправьте `y` (да) или `n` (нет):", parse_mode="Markdown" ) return elif is_done: local_session_manager.close_session(user_id) await _show_result(query, command, output.encode(), b"", 0) return else: # Команда ещё выполняется await query.edit_message_text( f"⏳ *Выполнение...*\n\n" f"Команда: `{command}`\n\n" f"```\n{output.strip()[-500:] if output else 'Выполняется...'}\n```", parse_mode="Markdown" ) while not is_done: more_output, is_done = read_pty_output(master_fd, timeout=5.0) output += more_output session.output_buffer = output session.last_activity = datetime.now() input_type = detect_input_type(output) if input_type in ("password", "confirm"): session.waiting_for_input = True session.input_type = input_type await query.edit_message_text( f"⏳ *Требуется ввод*\n\n" f"Команда: `{command}`\n\n" f"{'🔐 *Запрошен пароль*' if input_type == 'password' else '❓ *Требуется подтверждение'}\n\n" f"```\n{output.strip()[-200:]}\n```\n\n" f"{'Отправьте пароль в чат:' if input_type == 'password' else 'Отправьте `y` (да) или `n` (нет):'}", parse_mode="Markdown" ) return local_session_manager.close_session(user_id) await _show_result(query, command, output.encode(), b"", 0) except Exception as e: logger.error(f"Ошибка выполнения команды: {e}") local_session_manager.close_session(user_id) await query.edit_message_text( f"❌ *Ошибка:*\n```\n{str(e)}\n```", parse_mode="Markdown" ) async def _execute_ssh_command(query, command: str, server: Server, working_dir: str): """Выполнение команды через SSH с интерактивной сессией.""" user_id = query.from_user.id try: # Подготовка SSH ключа client_keys = [server_manager.ssh_key_path] if server_manager.ssh_key_path else None # Подготовка параметров подключения connect_kwargs = { "host": server.host, "port": server.port, "username": server.user, "client_host_keys": None, "known_hosts": None } # Добавляем ключ или пароль if client_keys: connect_kwargs["client_keys"] = client_keys if server.password: connect_kwargs["password"] = server.password logger.info(f"SSH подключение к {server.host}:{server.port} как {server.user}") # Подключение к серверу conn = await asyncssh.connect(**connect_kwargs) # Выполнение команды с cd в рабочую директорию full_command = f"cd {working_dir} && {command}" if working_dir else command # Создаем интерактивный процесс с PTY для поддержки ввода # TERM环境变量设置 для корректной кодировки process = await conn.create_process( full_command, term_type='xterm-256color', env={'LANG': 'C.UTF-8', 'LC_ALL': 'C.UTF-8'} ) # Создаём сессию session = ssh_session_manager.create_session( user_id=user_id, server=server, working_dir=working_dir, conn=conn, process=process, command=command ) # Читаем начальный вывод output, is_done = await read_ssh_output(process, timeout=3.0) session.output_buffer = output session.last_activity = datetime.now() # Читаем пока процесс не завершится while not is_done: more_output, is_done = await read_ssh_output(process, timeout=2.0) output += more_output session.output_buffer = output session.last_activity = datetime.now() # Проверяем тип ввода input_type = detect_input_type(output) if input_type == "password": # Запрос пароля session.waiting_for_input = True session.input_type = "password" await query.edit_message_text( f"⏳ *Требуется ввод*\n\n" f"Команда: `{command}`\n\n" f"🔐 *Запрошен пароль*\n\n" f"```\n{output.strip()[-200:]}\n```\n\n" f"Отправьте пароль в чат:", parse_mode="Markdown" ) return elif input_type == "confirm": # Запрос подтверждения session.waiting_for_input = True session.input_type = "confirm" await query.edit_message_text( f"⏳ *Требуется ввод*\n\n" f"Команда: `{command}`\n\n" f"❓ *Требуется подтверждение*\n\n" f"```\n{output.strip()[-200:]}\n```\n\n" f"Отправьте `y` (да) или `n` (нет):", parse_mode="Markdown" ) return else: # Команда завершена, показываем результат ssh_session_manager.close_session(user_id) await _show_result(query, command, output.encode(), "", 0) return except asyncssh.Error as e: logger.error(f"SSH ошибка: {e}") ssh_session_manager.close_session(user_id) await query.edit_message_text( f"❌ *SSH ошибка:*\n```\n{str(e)}\n```", parse_mode="Markdown" ) except asyncio.TimeoutError: logger.error("Таймаут SSH подключения") ssh_session_manager.close_session(user_id) await query.edit_message_text( "❌ *Таймаут*\n\nКоманда выполнялась дольше 30 секунд и была прервана.", parse_mode="Markdown" ) except Exception as e: logger.error(f"Ошибка выполнения команды: {e}") ssh_session_manager.close_session(user_id) await query.edit_message_text( f"❌ *Ошибка:*\n```\n{str(e)}\n```", parse_mode="Markdown" ) async def _show_result(query, command: str, stdout: bytes, stderr: bytes, returncode: int): """Показ результата выполнения команды.""" output = clean_ansi_codes(stdout.decode("utf-8", errors="replace")) output = normalize_output(output) error = clean_ansi_codes(stderr.decode("utf-8", errors="replace")) result = f"✅ *Результат:*\n\n" if output: # Форматируем длинный вывод output = format_long_output(output) if len(output) > 4000: output = output[:4000] + "\n... (вывод обрезан)" result += f"```\n{output}\n```\n" if error: if len(error) > 4000: error = error[:4000] + "\n... (вывод обрезан)" result += f"*Ошибки:*\n```\n{error}\n```\n" result += f"\n*Код возврата:* `{returncode}`" await query.edit_message_text(result, parse_mode="Markdown") @check_access async def handle_text_message(update: Update, context: ContextTypes.DEFAULT_TYPE): """Обработка текстовых сообщений как CLI команд.""" user_id = update.effective_user.id text = update.message.text.strip() state = state_manager.get(user_id) # Проверка: не в режиме ввода данных сервера ли мы if state.waiting_for_input: await handle_server_input(update, text) return # Проверка: не активная ли SSH-сессия ожидает ввода ssh_session = ssh_session_manager.get_session(user_id) if ssh_session and ssh_session.waiting_for_input: await handle_ssh_session_input(update, text, ssh_session) return # Проверка: не активная ли локальная сессия ожидает ввода local_session = local_session_manager.get_session(user_id) if local_session and local_session.waiting_for_input: await handle_local_session_input(update, text, local_session) return # Любое текстовое сообщение = CLI команда logger.info(f"Пользователь {user_id} отправил команду: {text}") await execute_cli_command_from_message(update, text) async def handle_ssh_session_input(update: Update, text: str, session: SSHSession): """Обработка ввода пользователя в активную SSH-сессию.""" user_id = update.effective_user.id input_type = session.input_type logger.info(f"Пользователь {user_id} ввёл '{text}' в SSH-сессию (тип: {input_type})") try: # Отправляем ввод в SSH-процесс if input_type == "password": # Пароль отправляем с newline session.process.stdin.write(text + "\n") elif input_type == "confirm": # Подтверждение - y или n answer = "y" if text.lower() in ("y", "yes", "да", "д") else "n" session.process.stdin.write(answer + "\n") else: # Обычный ввод session.process.stdin.write(text + "\n") await session.process.stdin.drain() session.last_activity = datetime.now() # Читаем ответ output, is_done = await read_ssh_output(session.process, timeout=3.0) session.output_buffer += output # Проверяем тип ввода new_input_type = detect_input_type(output) if new_input_type == "password": session.waiting_for_input = True session.input_type = "password" await update.message.reply_text( f"⏳ *Требуется ввод*\n\n" f"🔐 *Запрошен пароль*\n\n" f"```\n{output.strip()[-200:] if output else 'Ожидание...'}\n```\n\n" f"Отправьте пароль в чат:", parse_mode="Markdown" ) return elif new_input_type == "confirm": session.waiting_for_input = True session.input_type = "confirm" await update.message.reply_text( f"⏳ *Требуется ввод*\n\n" f"❓ *Требуется подтверждение*\n\n" f"```\n{output.strip()[-200:] if output else 'Ожидание...'}\n```\n\n" f"Отправьте `y` (да) или `n` (нет):", parse_mode="Markdown" ) return elif is_done or new_input_type == "prompt": # Команда завершена await update.message.reply_text( f"✅ *Результат:*\n\n" f"```\n{session.command}\n```\n\n" f"```\n{session.output_buffer.strip()[-4000:]}\n```", parse_mode="Markdown" ) ssh_session_manager.close_session(user_id) return else: # Команда ещё выполняется await update.message.reply_text( f"⏳ *Выполнение...*\n\n" f"```\n{output.strip()[-500:] if output else 'Выполняется...'}\n```", parse_mode="Markdown" ) # Читаем остаток while not is_done: more_output, is_done = await read_ssh_output(session.process, timeout=5.0) output += more_output session.output_buffer += output session.last_activity = datetime.now() new_input_type = detect_input_type(output) if new_input_type in ("password", "confirm"): session.waiting_for_input = True session.input_type = new_input_type await update.message.reply_text( f"⏳ *Требуется ввод*\n\n" f"{'🔐 *Запрошен пароль*' if new_input_type == 'password' else '❓ *Требуется подтверждение'}\n\n" f"```\n{output.strip()[-200:]}\n```\n\n" f"{'Отправьте пароль в чат:' if new_input_type == 'password' else 'Отправьте `y` (да) или `n` (нет):'}", parse_mode="Markdown" ) return # Завершено await update.message.reply_text( f"✅ *Результат:*\n\n" f"```\n{session.command}\n```\n\n" f"```\n{session.output_buffer.strip()[-4000:]}\n```", parse_mode="Markdown" ) ssh_session_manager.close_session(user_id) except Exception as e: logger.error(f"Ошибка ввода в SSH-сессию: {e}") ssh_session_manager.close_session(user_id) await update.message.reply_text( f"❌ *Ошибка:*\n```\n{str(e)}\n```", parse_mode="Markdown" ) async def handle_local_session_input(update: Update, text: str, session: LocalSession): """Обработка ввода пользователя в локальную сессию.""" user_id = update.effective_user.id input_type = session.input_type logger.info(f"Пользователь {user_id} ввёл '{text}' в локальную сессию (тип: {input_type})") try: child = session.context.get('child') if not child: raise Exception("Сессия не содержит child объект") # Отправляем ввод if input_type == "password": child.sendline(text) elif input_type == "confirm": answer = "y" if text.lower() in ("y", "yes", "да", "д") else "n" child.sendline(answer) else: child.sendline(text) session.last_activity = datetime.now() # Читаем ответ logger.info("Чтение ответа...") output = "" try: while True: line = child.read_nonblocking(size=4096, timeout=5.0) if not line: break output += line logger.debug(f"Прочитано: {len(line)} символов") # Проверяем запрос ввода if detect_input_type(output): break except pexpect.TIMEOUT: pass except pexpect.EOF: pass logger.info(f"После ввода прочитано: {len(output)} символов") session.output_buffer += output # Проверяем тип ввода new_input_type = detect_input_type(output) if new_input_type == "password": session.waiting_for_input = True session.input_type = "password" await update.message.reply_text( f"⏳ *Требуется ввод*\n\n" f"🔐 *Запрошен пароль*\n\n" f"```\n{output.strip()[-200:] if output else 'Ожидание...'}\n```\n\n" f"Отправьте пароль в чат:", parse_mode="Markdown" ) return elif new_input_type == "confirm": session.waiting_for_input = True session.input_type = "confirm" await update.message.reply_text( f"⏳ *Требуется ввод*\n\n" f"❓ *Требуется подтверждение*\n\n" f"```\n{output.strip()[-200:] if output else 'Ожидание...'}\n```\n\n" f"Отправьте `y` (да) или `n` (нет):", parse_mode="Markdown" ) return else: # Команда завершена # Очищаем ANSI-коды и нормализуем вывод cleaned_output = clean_ansi_codes(session.output_buffer) cleaned_output = normalize_output(cleaned_output) # Форматируем длинный вывод: первые 5 и последние 10 строк formatted_output = format_long_output(cleaned_output.strip(), max_lines=15, head_lines=5, tail_lines=10) if len(formatted_output) > 4000: formatted_output = formatted_output[:4000] + "\n... (вывод обрезан)" await update.message.reply_text( f"✅ *Результат:*\n\n" f"```\n{session.command}\n```\n\n" f"```\n{formatted_output}\n```", parse_mode="Markdown" ) local_session_manager.close_session(user_id) except Exception as e: logger.error(f"Ошибка ввода в локальную сессию: {e}") local_session_manager.close_session(user_id) await update.message.reply_text( f"❌ *Ошибка:*\n```\n{str(e)}\n```", parse_mode="Markdown" ) async def handle_server_input(update: Update, text: str): """Обработка ввода данных для CRUD операций с серверами.""" user_id = update.effective_user.id state = state_manager.get(user_id) input_type = state.input_type if input_type == "add_server_name": # Проверка имени if not text.replace("-", "").replace("_", "").isalnum(): await update.message.reply_text( "❌ Неверный формат имени.\n\n" "Используйте только латиницу, дефисы и подчёркивания.\n" "Пример: `web-prod`, `db_backup`", parse_mode="Markdown", reply_markup=InlineKeyboardMarkup([[ InlineKeyboardButton("❌ Отмена", callback_data="server_menu") ]]) ) return state.context["new_server"]["name"] = text state.input_type = "add_server_host" await update.message.reply_text( f"✅ Имя: `{text}`\n\n" "Введите *host* (IP или домен):\n" "Пример: `192.168.1.10`, `example.com`", parse_mode="Markdown", reply_markup=InlineKeyboardMarkup([[ InlineKeyboardButton("❌ Отмена", callback_data="server_menu") ]]) ) elif input_type == "add_server_host": state.context["new_server"]["host"] = text state.input_type = "add_server_port" await update.message.reply_text( f"✅ Host: `{text}`\n\n" "Введите *SSH порт* (обычно 22):", parse_mode="Markdown", reply_markup=InlineKeyboardMarkup([[ InlineKeyboardButton("❌ Отмена", callback_data="server_menu") ]]) ) elif input_type == "add_server_port": try: port = int(text) if port < 1 or port > 65535: raise ValueError() state.context["new_server"]["port"] = port state.input_type = "add_server_user" await update.message.reply_text( f"✅ Port: `{port}`\n\n" "Введите *SSH пользователя*:\n" "Пример: `root`, `admin`, `ubuntu`", parse_mode="Markdown", reply_markup=InlineKeyboardMarkup([[ InlineKeyboardButton("❌ Отмена", callback_data="server_menu") ]]) ) except ValueError: await update.message.reply_text( "❌ Неверный формат порта.\n\n" "Введите число от 1 до 65535:", parse_mode="Markdown", reply_markup=InlineKeyboardMarkup([[ InlineKeyboardButton("❌ Отмена", callback_data="server_menu") ]]) ) elif input_type == "add_server_user": state.context["new_server"]["user"] = text state.input_type = "add_server_password" await update.message.reply_text( f"✅ User: `{text}`\n\n" "Введите *SSH пароль* (или нажмите Пропустить для подключения только по ключу):\n" "⚠️ Пароль будет сохранён в .env файл в открытом виде!", parse_mode="Markdown", reply_markup=InlineKeyboardMarkup([ [InlineKeyboardButton("⏭️ Пропустить", callback_data="srv_skip_password")], [InlineKeyboardButton("❌ Отмена", callback_data="server_menu")] ]) ) elif input_type == "add_server_password": state.context["new_server"]["password"] = text state.input_type = "add_server_tags" await update.message.reply_text( "✅ Пароль сохранён\n\n" "Введите *теги* через запятую (или нажмите Пропустить):\n" "Пример: `web,prod`, `db,backup`\n\n" "Теги помогают группировать серверы.", parse_mode="Markdown", reply_markup=InlineKeyboardMarkup([ [InlineKeyboardButton("⏭️ Пропустить", callback_data="srv_skip_tags")], [InlineKeyboardButton("❌ Отмена", callback_data="server_menu")] ]) ) elif input_type == "add_server_tags": # Обработка ввода тегов (если пользователь ввёл текстом, а не нажал кнопку) tags = [t.strip() for t in text.split(",") if t.strip()] state.context["new_server"]["tags"] = tags # Завершение добавления new_server = state.context.get("new_server", {}) if server_manager.add_server( name=new_server["name"], host=new_server["host"], port=new_server["port"], user=new_server["user"], tags=tags, password=new_server.get("password", "") ): await update.message.reply_text( "✅ *Сервер добавлен*\n\n" f"Имя: `{new_server['name']}`\n" f"Host: `{new_server['host']}`\n" f"Port: `{new_server['port']}`\n" f"User: `{new_server['user']}`\n" f"Tags: `{','.join(tags)}`\n" f"Password: {'установлен' if new_server.get('password') else 'не установлен'}\n\n" f"Сервер сохранён в `.env` и доступен для выбора.", parse_mode="Markdown", reply_markup=menu_builder.get_keyboard("server") ) else: await update.message.reply_text( "❌ Ошибка: сервер с таким именем уже существует", reply_markup=menu_builder.get_keyboard("server") ) state.waiting_for_input = False state.input_type = None state.context.clear() elif input_type == "edit_server_field": # Выбор поля для редактирования if text == "1": state.input_type = "edit_server_host" await update.message.reply_text( "Введите новый *host*:", parse_mode="Markdown" ) elif text == "2": state.input_type = "edit_server_port" await update.message.reply_text( "Введите новый *port*:", parse_mode="Markdown" ) elif text == "3": state.input_type = "edit_server_user" await update.message.reply_text( "Введите нового *user*:", parse_mode="Markdown" ) elif text == "4": state.input_type = "edit_server_tags" await update.message.reply_text( "Введите новые *теги* через запятую:", parse_mode="Markdown" ) elif text == "5": state.input_type = "edit_server_password" await update.message.reply_text( "Введите новый *password* (или оставьте пустым для подключения только по ключу):\n" "⚠️ Пароль будет сохранён в .env файл в открытом виде!", parse_mode="Markdown" ) else: await update.message.reply_text( "❌ Введите номер поля (1-5):", reply_markup=InlineKeyboardMarkup([[ InlineKeyboardButton("❌ Отмена", callback_data="server_menu") ]]) ) return elif input_type == "edit_server_host": server_manager.update_server(state.editing_server, host=text) await finish_edit_server(update, state) elif input_type == "edit_server_port": try: port = int(text) server_manager.update_server(state.editing_server, port=port) await finish_edit_server(update, state) except ValueError: await update.message.reply_text("❌ Неверный формат порта") return elif input_type == "edit_server_user": server_manager.update_server(state.editing_server, user=text) await finish_edit_server(update, state) elif input_type == "edit_server_tags": tags = [t.strip() for t in text.split(",") if t.strip()] server_manager.update_server(state.editing_server, tags=tags) await finish_edit_server(update, state) elif input_type == "edit_server_password": server_manager.update_server(state.editing_server, password=text) await finish_edit_server(update, state) else: # Неизвестный тип ввода - выполняем команду await execute_cli_command_from_message(update, text) return # Сброс состояния после завершения if not state.waiting_for_input or input_type.startswith("add_server_tags"): state.waiting_for_input = False state.input_type = None state.context.clear() async def finish_edit_server(update: Update, state): """Завершение редактирования сервера.""" server_name = state.editing_server state.waiting_for_input = False state.input_type = None state.editing_server = None server = server_manager.get(server_name) if server: await update.message.reply_text( "✅ *Сервер обновлён*\n\n" f"{server.display_name}\n" f"📍 `{server.description}`", parse_mode="Markdown", reply_markup=menu_builder.get_keyboard("server") ) else: await update.message.reply_text( "❌ Ошибка при обновлении сервера", reply_markup=menu_builder.get_keyboard("server") ) async def execute_cli_command_from_message(update: Update, command: str): """Выполнение CLI команды из сообщения.""" user_id = update.effective_user.id state = state_manager.get(user_id) server_name = state.current_server server = server_manager.get(server_name) # Определяем рабочую директорию working_dir = state.working_directory or config.working_directory # Обработка команды cd - меняем директорию пользователя # Работает только с простыми командами cd, не с составными cmd_stripped = command.strip() if cmd_stripped.startswith("cd ") and "&&" not in cmd_stripped and ";" not in cmd_stripped and "|" not in cmd_stripped: parts = cmd_stripped.split(maxsplit=1) if len(parts) == 2: target_dir = parts[1] # Обработка ~ и относительных путей if target_dir.startswith("~"): target_dir = str(Path.home()) + target_dir[1:] elif not target_dir.startswith("/"): target_dir = str(Path(working_dir) / target_dir) # Проверка существования директории if Path(target_dir).is_dir(): state.working_directory = target_dir await update.message.reply_text( f"📁 *Директория изменена:*\n`{target_dir}`\n" f"🖥️ Сервер: `{server_name}`", parse_mode="Markdown" ) else: await update.message.reply_text( f"❌ *Директория не найдена:*\n`{target_dir}`", parse_mode="Markdown" ) return # Для составных команд с cd — выполняем через SSH или локально if "cd " in cmd_stripped and ("&&" in cmd_stripped or ";" in cmd_stripped): if server_name == "local" or server is None: await _execute_composite_command_local(update, cmd_stripped, working_dir) else: await _execute_composite_command_ssh(update, cmd_stripped, server, working_dir) return # Обычное выполнение if server_name == "local" or server is None: await _execute_local_command_message(update, cmd_stripped, working_dir) else: await _execute_ssh_command_message(update, cmd_stripped, server, working_dir) async def _execute_composite_command_local(update: Update, command: str, working_dir: str): """Выполнение составной команды локально.""" command_with_pwd = f"{command} && pwd" logger.info(f"Выполнение составной команды с cd: {command_with_pwd} в директории: {working_dir}") try: process = await asyncio.create_subprocess_shell( command_with_pwd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=working_dir ) stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=30) output = stdout.decode("utf-8", errors="replace").strip() error = stderr.decode("utf-8", errors="replace") # Последняя строка - это pwd if output and process.returncode == 0: lines = output.split('\n') final_dir = lines[-1].strip() if Path(final_dir).is_dir(): state_manager.get(update.effective_user.id).working_directory = final_dir output = '\n'.join(lines[:-1]) await _show_result_message(update, command, output, error, process.returncode) except asyncio.TimeoutError: await update.message.reply_text("❌ *Таймаут*\n\nКоманда выполнялась дольше 30 секунд.", parse_mode="Markdown") except Exception as e: logger.error(f"Ошибка: {e}") await update.message.reply_text(f"❌ *Ошибка:*\n```\n{str(e)}\n```", parse_mode="Markdown") async def _execute_composite_command_ssh(update: Update, command: str, server: Server, working_dir: str): """Выполнение составной команды через SSH с интерактивной сессией.""" user_id = update.effective_user.id command_with_pwd = f"{command} && pwd" try: client_keys = [server_manager.ssh_key_path] if server_manager.ssh_key_path else None # Подготовка параметров подключения connect_kwargs = { "host": server.host, "port": server.port, "username": server.user, "client_host_keys": None, "known_hosts": None } if client_keys: connect_kwargs["client_keys"] = client_keys if server.password: connect_kwargs["password"] = server.password logger.info(f"SSH подключение к {server.host}:{server.port} как {server.user}") conn = await asyncssh.connect(**connect_kwargs) # Выполнение команды с cd в рабочую директорию full_command = f"cd {working_dir} && {command_with_pwd}" if working_dir else command_with_pwd # Создаем интерактивный процесс с PTY для поддержки ввода # TERM环境变量设置 для корректной кодировки process = await conn.create_process( full_command, term_type='xterm-256color', env={'LANG': 'C.UTF-8', 'LC_ALL': 'C.UTF-8'} ) # Создаём сессию session = ssh_session_manager.create_session( user_id=user_id, server=server, working_dir=working_dir, conn=conn, process=process, command=command ) # Читаем начальный вывод output, is_done = await read_ssh_output(process, timeout=3.0) session.output_buffer = output session.last_activity = datetime.now() # Читаем пока процесс не завершится while not is_done: more_output, is_done = await read_ssh_output(process, timeout=2.0) output += more_output session.output_buffer = output session.last_activity = datetime.now() # Проверяем тип ввода input_type = detect_input_type(output) if input_type == "password": session.waiting_for_input = True session.input_type = "password" await update.message.reply_text( f"⏳ *Требуется ввод*\n\n" f"Команда: `{command}`\n\n" f"🔐 *Запрошен пароль*\n\n" f"```\n{output.strip()[-200:]}\n```\n\n" f"Отправьте пароль в чат:", parse_mode="Markdown" ) return elif input_type == "confirm": session.waiting_for_input = True session.input_type = "confirm" await update.message.reply_text( f"⏳ *Требуется ввод*\n\n" f"Команда: `{command}`\n\n" f"❓ *Требуется подтверждение*\n\n" f"```\n{output.strip()[-200:]}\n```\n\n" f"Отправьте `y` (да) или `n` (нет):", parse_mode="Markdown" ) return else: # Обработка pwd для смены директории if output: lines = output.strip().split('\n') final_dir = lines[-1].strip() if final_dir.startswith('/'): state_manager.get(user_id).working_directory = final_dir output = '\n'.join(lines[:-1]) ssh_session_manager.close_session(user_id) await _show_result_message(update, command, output, "", 0) return except asyncssh.Error as e: logger.error(f"SSH ошибка: {e}") ssh_session_manager.close_session(user_id) await update.message.reply_text(f"❌ *SSH ошибка:*\n```\n{str(e)}\n```", parse_mode="Markdown") except asyncio.TimeoutError: logger.error("Таймаут SSH подключения") ssh_session_manager.close_session(user_id) await update.message.reply_text("❌ *Таймаут*\n\nКоманда выполнялась дольше 30 секунд.", parse_mode="Markdown") except Exception as e: logger.error(f"Ошибка: {e}") ssh_session_manager.close_session(user_id) await update.message.reply_text(f"❌ *Ошибка:*\n```\n{str(e)}\n```", parse_mode="Markdown") async def _execute_local_command_message(update: Update, command: str, working_dir: str): """Выполнение локальной команды из сообщения через pexpect.""" user_id = update.effective_user.id try: logger.info(f"Запуск команды через pexpect: {command}") # Создаём интерактивный процесс child = pexpect.spawn( '/bin/bash', ['-c', command], cwd=working_dir, encoding='utf-8', codec_errors='replace', echo=False, timeout=30 ) # Создаём сессию (используем child вместо master_fd) session = local_session_manager.create_session( user_id=user_id, command=command, master_fd=child.child_fd, pid=child.pid ) session.context = {'child': child} # Сохраняем child объект # Читаем начальный вывод logger.info("Чтение вывода...") output = "" try: # Пробуем прочитать с таймаутом while True: line = child.read_nonblocking(size=4096, timeout=2.0) if not line: break output += line logger.debug(f"Прочитано: {len(line)} символов") # Проверяем запрос ввода if detect_input_type(output): break except pexpect.TIMEOUT: pass except pexpect.EOF: pass logger.info(f"Прочитано: {len(output)} символов") session.output_buffer = output session.last_activity = datetime.now() # Проверяем тип ввода input_type = detect_input_type(output) logger.info(f"Тип ввода: {input_type}") if input_type == "password": session.waiting_for_input = True session.input_type = "password" await update.message.reply_text( f"⏳ *Требуется ввод*\n\n" f"Команда: `{command}`\n\n" f"🔐 *Запрошен пароль*\n\n" f"```\n{output.strip()[-200:]}\n```\n\n" f"Отправьте пароль в чат:", parse_mode="Markdown" ) return elif input_type == "confirm": session.waiting_for_input = True session.input_type = "confirm" await update.message.reply_text( f"⏳ *Требуется ввод*\n\n" f"Команда: `{command}`\n\n" f"❓ *Требуется подтверждение*\n\n" f"```\n{output.strip()[-200:]}\n```\n\n" f"Отправьте `y` (да) или `n` (нет):", parse_mode="Markdown" ) return else: # Команда завершена local_session_manager.close_session(user_id) await _show_result_message(update, command, output, "", 0) except Exception as e: logger.error(f"Ошибка выполнения команды: {e}") local_session_manager.close_session(user_id) await update.message.reply_text(f"❌ *Ошибка:*\n```\n{str(e)}\n```", parse_mode="Markdown") async def _execute_ssh_command_message(update: Update, command: str, server: Server, working_dir: str): """Выполнение команды через SSH из сообщения с интерактивной сессией.""" user_id = update.effective_user.id try: client_keys = [server_manager.ssh_key_path] if server_manager.ssh_key_path else None # Подготовка параметров подключения connect_kwargs = { "host": server.host, "port": server.port, "username": server.user, "client_host_keys": None, "known_hosts": None } if client_keys: connect_kwargs["client_keys"] = client_keys if server.password: connect_kwargs["password"] = server.password logger.info(f"SSH подключение к {server.host}:{server.port} как {server.user}") conn = await asyncssh.connect(**connect_kwargs) # Выполнение команды с cd в рабочую директорию full_command = f"cd {working_dir} && {command}" if working_dir else command # Создаем интерактивный процесс с PTY для поддержки ввода # TERM环境变量设置 для корректной кодировки process = await conn.create_process( full_command, term_type='xterm-256color', env={'LANG': 'C.UTF-8', 'LC_ALL': 'C.UTF-8'} ) # Создаём сессию session = ssh_session_manager.create_session( user_id=user_id, server=server, working_dir=working_dir, conn=conn, process=process, command=command ) # Читаем начальный вывод output, is_done = await read_ssh_output(process, timeout=3.0) session.output_buffer = output session.last_activity = datetime.now() # Читаем пока процесс не завершится while not is_done: more_output, is_done = await read_ssh_output(process, timeout=2.0) output += more_output session.output_buffer = output session.last_activity = datetime.now() # Проверяем тип ввода input_type = detect_input_type(output) if input_type == "password": session.waiting_for_input = True session.input_type = "password" await update.message.reply_text( f"⏳ *Требуется ввод*\n\n" f"Команда: `{command}`\n\n" f"🔐 *Запрошен пароль*\n\n" f"```\n{output.strip()[-200:]}\n```\n\n" f"Отправьте пароль в чат:", parse_mode="Markdown" ) return elif input_type == "confirm": session.waiting_for_input = True session.input_type = "confirm" await update.message.reply_text( f"⏳ *Требуется ввод*\n\n" f"Команда: `{command}`\n\n" f"❓ *Требуется подтверждение*\n\n" f"```\n{output.strip()[-200:]}\n```\n\n" f"Отправьте `y` (да) или `n` (нет):", parse_mode="Markdown" ) return else: # Команда завершена, показываем результат ssh_session_manager.close_session(user_id) await _show_result_message(update, command, output, "", 0) return except asyncssh.Error as e: logger.error(f"SSH ошибка: {e}") ssh_session_manager.close_session(user_id) await update.message.reply_text(f"❌ *SSH ошибка:*\n```\n{str(e)}\n```", parse_mode="Markdown") except asyncio.TimeoutError: logger.error("Таймаут SSH подключения") ssh_session_manager.close_session(user_id) await update.message.reply_text("❌ *Таймаут*\n\nКоманда выполнялась дольше 30 секунд.", parse_mode="Markdown") except Exception as e: logger.error(f"Ошибка: {e}") ssh_session_manager.close_session(user_id) await update.message.reply_text(f"❌ *Ошибка:*\n```\n{str(e)}\n```", parse_mode="Markdown") async def _show_result_message(update: Update, command: str, output: str, error: str, returncode: int): """Показ результата выполнения команды.""" # Очистка ANSI-кодов и нормализация output = normalize_output(clean_ansi_codes(output)) if output else "" error = clean_ansi_codes(error) if error else "" result = f"✅ *Результат:*\n\n" if output: # Форматируем длинный вывод: первые 5 и последние 10 строк output = format_long_output(output, max_lines=15, head_lines=5, tail_lines=10) if len(output) > 4000: output = output[:4000] + "\n... (вывод обрезан)" result += f"```\n{output}\n```\n" if error: if len(error) > 4000: error = error[:4000] + "\n... (вывод обрезан)" result += f"*Ошибки:*\n```\n{error}\n```\n" result += f"\n*Код возврата:* `{returncode}`" await update.message.reply_text(result, parse_mode="Markdown") async def post_init(application: Application): """Инициализация после запуска бота.""" # Установка команд бота commands = [ BotCommand("start", "Запустить бота"), BotCommand("menu", "Главное меню с кнопками"), BotCommand("help", "Справка"), BotCommand("settings", "Настройки"), BotCommand("stop", "Прервать SSH-сессию"), ] await application.bot.set_my_commands(commands) logger.info("Бот инициализирован") @check_access async def stop_command(update: Update, context: ContextTypes.DEFAULT_TYPE): """Обработка команды /stop - прерывание активной SSH-сессии.""" user_id = update.effective_user.id session = ssh_session_manager.get_session(user_id) if session: ssh_session_manager.close_session(user_id) await update.message.reply_text( "❌ *SSH-сессия прервана*\n\n" f"Команда `{session.command}` была остановлена.", parse_mode="Markdown" ) else: await update.message.reply_text( "ℹ️ *Нет активных SSH-сессий*\n\n" "У вас нет выполняющихся команд.", parse_mode="Markdown" ) def main(): """Точка входа.""" # Чтение токена только из переменной окружения token = os.getenv("TELEGRAM_BOT_TOKEN") if not token: print("❌ Ошибка: не установлен TELEGRAM_BOT_TOKEN") print("\nСпособы установки токена:") print(" 1. Создайте файл .env по примеру .env.example") print(" 2. Или задайте переменную окружения:") print(" export TELEGRAM_BOT_TOKEN='your_token_here'") print("\nИли запустите ./run.sh для интерактивной настройки") sys.exit(1) # Загрузка серверов из env server_manager.load_from_env() # Инициализация меню init_menus() # Создание приложения application = Application.builder().token(token).post_init(post_init).build() # Регистрация хендлеров application.add_handler(CommandHandler("start", start_command)) application.add_handler(CommandHandler("help", help_command)) application.add_handler(CommandHandler("settings", settings_command)) application.add_handler(CommandHandler("menu", menu_command)) application.add_handler(CommandHandler("stop", stop_command)) application.add_handler(CallbackQueryHandler(menu_callback)) application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_text_message)) # Запуск logger.info("Запуск бота...") print(f"🤖 {config.name} запущен!") print(f"📝 Описание: {config.description}") print(f"🎨 Иконка: {config.icon}") print("\nОстановка: Ctrl+C") application.run_polling(allowed_updates=Update.ALL_TYPES) if __name__ == "__main__": main()