diff --git a/bot.py b/bot.py index 80acb97..bf9e4e9 100644 --- a/bot.py +++ b/bot.py @@ -2,6 +2,8 @@ """ Telegram CLI Bot - бот для выполнения CLI команд с многоуровневым меню. Легкое добавление новых команд через регистрацию хендлеров. + +Версия: 0.5.0 (рефакторинг) """ import os @@ -9,28 +11,18 @@ import sys import asyncio import subprocess import logging -import getpass -import re import pty -import tty -import termios import select import fcntl from pathlib import Path from typing import Optional, Callable, Dict, Any, List, Tuple -from dataclasses import dataclass, field -from functools import wraps from datetime import datetime, timedelta -# Лимиты Telegram -MAX_MESSAGE_LENGTH = 4096 # Максимальная длина сообщения - import pexpect import asyncssh from qwen_integration import qwen_manager, QwenSessionState # Подавляем логи sentence-transformers и huggingface -import logging logging.getLogger("sentence_transformers").setLevel(logging.WARNING) logging.getLogger("huggingface_hub").setLevel(logging.WARNING) logging.getLogger("httpx").setLevel(logging.WARNING) @@ -71,887 +63,21 @@ logging.basicConfig( logger = logging.getLogger(__name__) -# --- Конфигурация бота из переменных окружения --- -class BotConfig: - """Конфигурация бота из переменных окружения.""" +# ============================================================================ +# ИМПОРТЫ ИЗ bot/ - новая модульная структура +# ============================================================================ +from bot.config import config, state_manager, menu_builder, command_registry, server_manager +from bot.models.server import Server +from bot.models.session import SSHSession, SSHSessionManager, LocalSession, LocalSessionManager, INPUT_PATTERNS +from bot.utils.cleaners import clean_ansi_codes, normalize_output +from bot.utils.formatters import escape_markdown, split_message, send_long_message, format_long_output, MAX_MESSAGE_LENGTH +from bot.utils.ssh_readers import detect_input_type, read_ssh_output, read_pty_output +from bot.utils.decorators import check_access +from bot.keyboards.menus import MenuItem, init_menus - def __init__(self): - self.name = os.getenv("BOT_NAME", "CLI Assistant") - self.description = os.getenv("BOT_DESCRIPTION", "Бот для выполнения CLI команд") - self.icon = os.getenv("BOT_ICON_EMOJI", "🤖") - self.working_directory = os.getenv("WORKING_DIRECTORY", str(Path.home())) - - # Парсинг списка разрешённых пользователей - allowed_users_str = os.getenv("ALLOWED_USERS", "") - if allowed_users_str.strip(): - self.allowed_users = [ - int(uid.strip()) - for uid in allowed_users_str.split(",") - if uid.strip().isdigit() - ] - else: - self.allowed_users = [] - - @property - def is_access_restricted(self) -> bool: - """Проверка: ограничен ли доступ.""" - return len(self.allowed_users) > 0 - - -# --- Серверы --- -@dataclass -class Server: - """Конфигурация сервера.""" - name: str - host: str - port: int - user: str - tags: List[str] = field(default_factory=list) - password: str = "" - - @property - def display_name(self) -> str: - """Отображаемое имя с иконкой.""" - icon = "🖥️" - if "local" in self.tags: - icon = "💻" - elif "db" in self.tags: - icon = "🗄️" - elif "web" in self.tags: - icon = "🌐" - return f"{icon} {self.name}" - - @property - def description(self) -> str: - """Краткое описание сервера.""" - return f"{self.user}@{self.host}:{self.port}" - - -class ServerManager: - """Управление серверами.""" - - def __init__(self): - self._servers: Dict[str, Server] = {} - self._default_server: str = "local" - self._ssh_key_path: Optional[str] = None - - # Локальный сервер всегда доступен - try: - local_user = getpass.getuser() - except Exception: - local_user = "user" - - self._servers["local"] = Server( - name="local", - host="localhost", - port=22, - user=local_user, - tags=["local", "dev"] - ) - - def load_from_env(self): - """Загрузка серверов из переменных окружения.""" - self._ssh_key_path = os.getenv("SSH_KEY_PATH") - self._default_server = os.getenv("DEFAULT_SERVER", "local") - - servers_str = os.getenv("SERVERS", "") - if not servers_str.strip(): - return - - # Парсинг формата: name|host|port|user|tags|password,name|host|port|user|tags|password - # Теги могут содержать запятые, поэтому разбираем по частям - for server_line in servers_str.split(","): - if not server_line.strip(): - continue - - parts = server_line.strip().split("|") - if len(parts) < 4: - continue - - try: - name = parts[0].strip() - host = parts[1].strip() - port = int(parts[2].strip()) - user = parts[3].strip() - - # Теги (часть 4) и пароль (часть 5) могут отсутствовать - tags = [] - password = "" - - if len(parts) >= 5 and parts[4].strip(): - tags = [t.strip() for t in parts[4].split(",") if t.strip()] - - if len(parts) >= 6: - password = parts[5].strip() - - server = Server(name=name, host=host, port=port, user=user, tags=tags, password=password) - self._servers[name] = server - logger.info(f"Загружен сервер: {server.display_name} ({server.description})") - except ValueError as e: - logger.warning(f"Ошибка парсинга сервера: {parts} - {e}") - - def get(self, name: str) -> Optional[Server]: - """Получить сервер по имени.""" - return self._servers.get(name) - - def list_servers(self) -> List[Server]: - """Список всех серверов.""" - return list(self._servers.values()) - - def get_by_tags(self, tags: List[str]) -> List[Server]: - """Получить серверы по тегам.""" - result = [] - for server in self._servers.values(): - if any(tag in server.tags for tag in tags): - result.append(server) - return result - - @property - def default_server(self) -> str: - """Имя сервера по умолчанию.""" - return self._default_server - - @property - def ssh_key_path(self) -> Optional[str]: - """Путь к SSH ключу.""" - return self._ssh_key_path - - def get_keyboard(self, exclude_local: bool = False) -> InlineKeyboardMarkup: - """Создать клавиатуру с выбором сервера.""" - keyboard = [] - for server in self._servers.values(): - if exclude_local and server.name == "local": - continue - button = InlineKeyboardButton( - server.display_name, - callback_data=f"server_select_{server.name}" - ) - keyboard.append([button]) - return InlineKeyboardMarkup(keyboard) - - def add_server(self, name: str, host: str, port: int, user: str, tags: List[str] = None, password: str = "") -> bool: - """Добавить сервер.""" - if name in self._servers: - return False - self._servers[name] = Server(name=name, host=host, port=port, user=user, tags=tags or [], password=password) - self.save_to_env() - return True - - def update_server(self, name: str, host: str = None, port: int = None, - user: str = None, tags: List[str] = None, password: str = None) -> bool: - """Обновить сервер.""" - if name not in self._servers or name == "local": - return False - server = self._servers[name] - if host: - server.host = host - if port: - server.port = port - if user: - server.user = user - if tags is not None: - server.tags = tags - if password is not None: - server.password = password - self.save_to_env() - return True - - def delete_server(self, name: str) -> bool: - """Удалить сервер.""" - if name not in self._servers or name == "local": - return False - del self._servers[name] - self.save_to_env() - return True - - def save_to_env(self): - """Сохранить серверы в .env файл.""" - env_file = Path(__file__).parent / ".env" - - # Читаем существующий файл - lines = [] - if env_file.exists(): - with open(env_file, "r", encoding="utf-8") as f: - lines = f.readlines() - - # Формируем строку серверов - server_parts = [] - for server in self._servers.values(): - if server.name == "local": - continue - tags_str = ",".join(server.tags) if server.tags else "" - # Формат: name|host|port|user|tags|password - server_parts.append(f"{server.name}|{server.host}|{server.port}|{server.user}|{tags_str}|{server.password}") - - servers_line = f"SERVERS={','.join(server_parts)}\n" - - # Ищем и обновляем или добавляем строку SERVERS - found = False - for i, line in enumerate(lines): - if line.startswith("SERVERS="): - lines[i] = servers_line - found = True - break - - if not found: - lines.append("\n" + servers_line) - - # Записываем обратно - with open(env_file, "w", encoding="utf-8") as f: - f.writelines(lines) - - logger.info(f"Серверы сохранены в {env_file}") - - -# --- Хранилище состояний пользователя --- -@dataclass -class UserState: - """Состояние пользователя в диалоге.""" - current_menu: str = "main" - waiting_for_input: bool = False - input_type: Optional[str] = None # "name", "host", "port", "user", "tags", "server_action" - parent_menu: Optional[str] = None - context: Dict[str, Any] = field(default_factory=dict) - working_directory: Optional[str] = None - current_server: str = "local" # Имя текущего сервера - editing_server: Optional[str] = None # Имя сервера, который редактируем - ai_chat_mode: bool = False # Режим чата с ИИ агентом - ai_chat_history: List[str] = field(default_factory=list) # История диалога с ИИ - messages_since_fact_extract: int = 0 # Счётчик для извлечения фактов - - -class StateManager: - """Управление состояниями пользователей.""" - - def __init__(self): - self._states: Dict[int, UserState] = {} - - def get(self, user_id: int) -> UserState: - if user_id not in self._states: - self._states[user_id] = UserState() - return self._states[user_id] - - def reset(self, user_id: int): - self._states[user_id] = UserState() - - -# --- Интерактивные SSH-сессии --- -@dataclass -class SSHSession: - """Интерактивная SSH-сессия.""" - user_id: int - server: Server - working_dir: str - conn: asyncssh.SSHClientConnection - process: asyncssh.SSHClientProcess - output_buffer: str = "" - waiting_for_input: bool = False - input_type: str = "" # "password", "confirm", "text" - last_activity: datetime = field(default_factory=datetime.now) - command: str = "" - - SESSION_TIMEOUT = timedelta(minutes=5) # Таймаут неактивности - - def is_expired(self) -> bool: - """Проверка истечения таймаута сессии.""" - return datetime.now() - self.last_activity > self.SESSION_TIMEOUT - - -class SSHSessionManager: - """Менеджер интерактивных SSH-сессий.""" - - def __init__(self): - self._sessions: Dict[int, SSHSession] = {} - - def create_session(self, user_id: int, server: Server, working_dir: str, - conn: asyncssh.SSHClientConnection, process: asyncssh.SSHClientProcess, - command: str = "") -> SSHSession: - """Создать новую сессию.""" - session = SSHSession( - user_id=user_id, - server=server, - working_dir=working_dir, - conn=conn, - process=process, - command=command - ) - self._sessions[user_id] = session - logger.info(f"Создана SSH-сессия для пользователя {user_id} на сервере {server.name}") - return session - - def get_session(self, user_id: int) -> Optional[SSHSession]: - """Получить сессию пользователя.""" - session = self._sessions.get(user_id) - if session and session.is_expired(): - self.close_session(user_id) - return None - return session - - def close_session(self, user_id: int): - """Закрыть сессию пользователя.""" - session = self._sessions.pop(user_id, None) - if session: - try: - if session.process: - session.process.stdin.close() - session.process.stdout.feed_eof() - if session.conn: - session.conn.close() - logger.info(f"Закрыта SSH-сессия для пользователя {user_id}") - except Exception as e: - logger.warning(f"Ошибка при закрытии сессии: {e}") - - def has_active_session(self, user_id: int) -> bool: - """Проверка наличия активной сессии.""" - return self.get_session(user_id) is not None - - def cleanup_expired(self): - """Очистка истёкших сессий.""" - expired = [uid for uid, s in self._sessions.items() if s.is_expired()] - for uid in expired: - self.close_session(uid) - - -# Менеджер SSH-сессий +# Глобальные менеджеры сессий ssh_session_manager = SSHSessionManager() - - -@dataclass -class LocalSession: - """Интерактивная локальная сессия.""" - user_id: int - command: str - master_fd: int - pid: int - output_buffer: str = "" - waiting_for_input: bool = False - input_type: str = "" - last_activity: datetime = field(default_factory=datetime.now) - - SESSION_TIMEOUT = timedelta(minutes=5) - - def is_expired(self) -> bool: - return datetime.now() - self.last_activity > self.SESSION_TIMEOUT - - -class LocalSessionManager: - """Менеджер локальных интерактивных сессий.""" - - def __init__(self): - self._sessions: Dict[int, LocalSession] = {} - - def create_session(self, user_id: int, command: str, master_fd: int, pid: int) -> LocalSession: - session = LocalSession( - user_id=user_id, - command=command, - master_fd=master_fd, - pid=pid - ) - self._sessions[user_id] = session - logger.info(f"Создана локальная сессия для пользователя {user_id}") - return session - - def get_session(self, user_id: int) -> Optional[LocalSession]: - session = self._sessions.get(user_id) - if session and session.is_expired(): - self.close_session(user_id) - return None - return session - - def close_session(self, user_id: int): - session = self._sessions.pop(user_id, None) - if session: - try: - # Закрываем pexpect процесс если есть - child = session.context.get('child') if hasattr(session, 'context') and session.context else None - if child: - child.close(force=True) - else: - # Старый способ для PTY - os.close(session.master_fd) - os.kill(session.pid, 9) - except: - pass - logger.info(f"Закрыта локальная сессия для пользователя {user_id}") - - def has_active_session(self, user_id: int) -> bool: - return self.get_session(user_id) is not None - - -# Менеджер локальных сессий local_session_manager = LocalSessionManager() - - -# Паттерны для детектирования запросов ввода -INPUT_PATTERNS = { - "password": [ - r"[Pp]assword[:\s]*$", - r"[Pp]assphrase[:\s]*$", - r"Enter password[:\s]*$", - r"sudo password[:\s]*$", - r"\[sudo\] password for .*:", - r"[Пп]ароль[:\s]*$", - r"\[sudo\] пароль для .*:", - r"Введите пароль[:\s]*$", - ], - "confirm": [ - r"[Yy]es/[Nn]o[?:\s]*$", - r"\[?[Yy]\]?/?\[?[Nn]\]?", - r"Do you want to continue", - r"Continue\?", - r"Are you sure", - r"Is this OK", - r"[Yy]es or [Nn]o", - r"[Дд]а/[Нн]ет", - r"[Пп]родолжить", - ], - "shell_prompt": [ - r"[$#]\s*$", - r"[>$]\s*$", - r"[a-zA-Z0-9_-]+@[a-zA-Z0-9_-]+:.*[$#]\s*$", - ], -} - - -def detect_input_type(text: str) -> Optional[str]: - """Определить тип запроса ввода по тексту.""" - text = text.strip() - - # Проверка на пароль - for pattern in INPUT_PATTERNS["password"]: - if re.search(pattern, text, re.MULTILINE): - return "password" - - # Проверка на подтверждение - for pattern in INPUT_PATTERNS["confirm"]: - if re.search(pattern, text, re.MULTILINE): - return "confirm" - - # Проверка на приглашение оболочки - for pattern in INPUT_PATTERNS["shell_prompt"]: - if re.search(pattern, text, re.MULTILINE): - return "prompt" - - return None - - -def clean_ansi_codes(text: str) -> str: - """ - Очистка ANSI-кодов и мусора из вывода терминала. - Обрабатывает: - - ANSI escape-последовательности \x1b[...m - - «Битые» ANSI-коды без escape-символа (например [33m, [0m) - - Символы замены Unicode () - - Кириллические имитации ANSI-кодов (например [0м) - """ - # Удаляем ANSI escape-последовательности - text = re.sub(r'\x1b\[[0-9;?]*[a-zA-Z]', '', text) - - # Удаляем «битые» ANSI-коды: [33m, [0m, [1m и т.д. (латиница и кириллица) - text = re.sub(r'\[\d+[мm]', '', text) - - # Удаляем символы замены Unicode - text = text.replace('\ufffd', '') - - return text - - -def escape_markdown(text: str) -> str: - """ - Экранирование специальных символов Markdown для Telegram API. - """ - text = text.replace('```', '\\`\\`\\`') - return text - - -def split_message(text: str, max_length: int = MAX_MESSAGE_LENGTH) -> List[str]: - """ - Разбить длинный текст на сообщения <= max_length символов. - Старается разбивать по границам строк или блоков кода. - """ - if len(text) <= max_length: - return [text] - - parts = [] - current = "" - - for line in text.split('\n'): - # Если добавление строки превысит лимит - if len(current) + len(line) + 1 > max_length: - if current: - parts.append(current) - # Если строка сама по себе длиннее лимита — режем её - while len(line) > max_length: - parts.append(line[:max_length]) - line = line[max_length:] - current = line - else: - current += ('\n' if current else '') + line - - if current: - parts.append(current) - - return parts - - -async def send_long_message(update: Update, text: str, parse_mode: str = None): - """ - Отправить длинный текст, разбив на несколько сообщений. - Если parse_mode="Markdown" и текст содержит блоки кода — отправляет без разметки. - """ - parts = split_message(text) - - for i, part in enumerate(parts): - # Добавляем номер части если их несколько - if len(parts) > 1: - header = f"({i+1}/{len(parts)}) " - if len(header) + len(part) <= MAX_MESSAGE_LENGTH: - part = header + part - - # Если это не первая часть и был Markdown — убираем parse_mode - # чтобы не было проблем с разорванной разметкой - actual_parse_mode = parse_mode if i == 0 else None - - try: - await update.message.reply_text(part, parse_mode=actual_parse_mode) - except Exception as e: - # Фоллбэк: отправляем без разметки - logger.debug(f"Ошибка Markdown, отправляем без разметки: {e}") - await update.message.reply_text(part) - - # Небольшая пауза между сообщениями - await asyncio.sleep(0.1) - - -def normalize_output(text: str) -> str: - """ - Нормализовать вывод: обработать \r и убрать пустые строки. - \r используется для перезаписи строки (прогресс-баров). - """ - # Заменяем \r\n на \n - text = text.replace('\r\n', '\n') - - # Обрабатываем \r (возврат каретки) — строки с \r перезаписывают друг друга - lines = [] - for line in text.split('\n'): - if '\r' in line: - # Разбиваем по \r и берём последнюю часть (финальное состояние) - parts = line.split('\r') - line = parts[-1] - lines.append(line) - - text = '\n'.join(lines) - - # Разбиваем на строки, убираем пустые и trailing пробелы - lines = text.split('\n') - lines = [line.rstrip() for line in lines if line.strip()] - - # Очищаем прогресс-бары вида "Текст… 0%Текст… 50%Текст… 100%" - # И дублирующийся текст - cleaned_lines = [] - for line in lines: - # Ищем повторяющийся паттерн "текст… цифры%" - progress_pattern = re.compile(r'((?:.+?\.{3})\d+%)+') - match = progress_pattern.search(line) - if match: - # Берём последнее вхождение - items = re.findall(r'(.+?\.{3})(\d+)%', match.group(0)) - if items: - last_text, last_percent = items[-1] - line = line[:match.start()] + f'{last_text}{last_percent}%' + line[match.end():] - - # СНАЧАЛА удаляем остатки ANSI-кодов из строки - line = re.sub(r'.', '', line) #  + любой символ - - # Удаляем дублирующийся текст вида "0% [текст] 0% [текст]" - dup_pattern = re.compile(r'(\d+%\s*\[.+?\])(?:\s*\d+%\s*\[.+?\])+') - match = dup_pattern.search(line) - if match: - # Оставляем только первое вхождение - line = line[:match.start()] + match.group(1) + line[match.end():] - - # Удаляем ведущие пробелы (артефакты терминала) - line = line.lstrip() - - if line: - cleaned_lines.append(line) - - return '\n'.join(cleaned_lines) - - -def format_long_output(text: str, max_lines: int = 20, head_lines: int = 10, tail_lines: int = 10) -> str: - """ - Форматировать длинный вывод: показать первые и последние строки. - По умолчанию: первые 10 + последние 10 строк = 20 строк максимум. - """ - lines = text.split('\n') - total_lines = len(lines) - - if total_lines <= max_lines: - return text - - # Показываем первые head_lines и последние tail_lines - head = lines[:head_lines] - tail = lines[-tail_lines:] - - skipped = total_lines - head_lines - tail_lines - - result = '\n'.join(head) - result += f'\n\n... ({skipped} строк пропущено) ...\n' - result += '\n'.join(tail) - - return result - - -async def read_ssh_output(process: asyncssh.SSHClientProcess, timeout: float = 2.0) -> tuple[str, bool]: - """ - Чтение вывода из SSH-процесса с таймаутом. - Возвращает (вывод, завершён_ли_процесс). - """ - output = "" - is_done = False - - try: - # Используем readany() для чтения доступных данных - while True: - try: - # readany() читает любые доступные данные - data = await asyncio.wait_for(process.stdout.readany(), timeout=timeout) - if data: - if isinstance(data, bytes): - output += data.decode('utf-8', errors='replace') - else: - output += str(data) - logger.debug(f"Прочитано stdout: {len(data)} байт, всего: {len(output)}") - else: - # EOF - is_done = True - break - except asyncio.TimeoutError: - # Данные закончились - logger.debug(f"Timeout stdout, прочитано: {len(output)} байт") - if process.returncode is not None: - is_done = True - break - except UnicodeDecodeError as e: - logger.debug(f"Ошибка декодирования UTF-8: {e}") - continue - except Exception as e: - # Конец потока - logger.debug(f"Конец потока stdout: {type(e).__name__}: {e}") - is_done = True - break - except Exception as e: - logger.debug(f"Ошибка чтения SSH stdout: {e}") - is_done = True - - # Читаем stderr если есть - error_output = "" - try: - while True: - try: - data = await asyncio.wait_for(process.stderr.readany(), timeout=0.5) - if data: - if isinstance(data, bytes): - error_output += data.decode('utf-8', errors='replace') - else: - error_output += str(data) - else: - break - except (asyncio.TimeoutError, Exception): - break - except Exception as e: - logger.debug(f"Ошибка чтения SSH stderr: {e}") - - # Объединяем stdout и stderr - if error_output: - output = output + error_output if output else error_output - - logger.debug(f"read_ssh_output: output={len(output)} байт, is_done={is_done}, returncode={process.returncode}") - return output, is_done - - -def read_pty_output(master_fd: int, timeout: float = 2.0) -> tuple[str, bool]: - """ - Чтение вывода из PTY с таймаутом. - Возвращает (вывод, завершён_ли_процесс). - """ - output = "" - is_done = False - total_waited = 0 - - try: - # Устанавливаем non-blocking режим - flags = fcntl.fcntl(master_fd, fcntl.F_GETFL) - fcntl.fcntl(master_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) - - while total_waited < timeout: - try: - # Ждём данные с коротким таймаутом - ready, _, _ = select.select([master_fd], [], [], 0.2) - if ready: - try: - data = os.read(master_fd, 4096) - if data: - output += data.decode('utf-8', errors='replace') - logger.debug(f"Прочитано из PTY: {len(data)} байт") - # Сбрасываем таймер если есть данные - total_waited = 0 - else: - is_done = True - break - except BlockingIOError: - # Нет данных, продолжаем ждать - pass - else: - # Timeout - проверяем не завершился ли процесс - try: - _, status = os.waitpid(-1, os.WNOHANG) - if status != 0: - logger.debug(f"Процесс завершился со статусом: {status}") - is_done = True - break - except ChildProcessError: - pass - - # Если уже что-то прочитали и есть запрос ввода - выходим - if output and detect_input_type(output): - logger.debug(f"Обнаружен запрос ввода") - break - - total_waited += 0.2 - - except Exception as e: - logger.debug(f"Ошибка при чтении PTY: {e}") - break - - except Exception as e: - logger.debug(f"Ошибка чтения PTY: {e}") - is_done = True - - logger.debug(f"read_pty_output: output={len(output)} байт, is_done={is_done}") - return output, is_done - - -# --- Система команд --- -@dataclass -class MenuItem: - """Элемент меню.""" - label: str - callback: str # callback_data для кнопки - description: str = "" - icon: str = "" - children: List["MenuItem"] = field(default_factory=list) - command: Optional[str] = None # CLI команда для выполнения - is_command: bool = False - - -class MenuBuilder: - """Построитель многоуровневого меню.""" - - def __init__(self): - self._menus: Dict[str, List[MenuItem]] = {} - - def add_menu(self, menu_name: str, items: List[MenuItem]): - self._menus[menu_name] = items - - def get_menu(self, menu_name: str) -> List[MenuItem]: - return self._menus.get(menu_name, []) - - def get_keyboard(self, menu_name: str, user_id: int = None, state: UserState = None) -> InlineKeyboardMarkup: - """Создает InlineKeyboard для меню.""" - items = self._menus.get(menu_name, []) - keyboard = [] - - # Для главного меню — динамически меняем кнопку ИИ - if menu_name == "main" and user_id: - # Используем переданное состояние или получаем из менеджера - if state is None: - state = state_manager.get(user_id) - logger.info(f"get_keyboard: user_id={user_id}, ai_chat_mode={state.ai_chat_mode}") - - for item in items: - # Проверяем базовый callback и его варианты с _on/_off - is_ai_toggle = item.callback in ["toggle_ai_chat", "toggle_ai_chat_on", "toggle_ai_chat_off"] - - if is_ai_toggle: - # Меняем текст кнопки и callback_data в зависимости от статуса - if state.ai_chat_mode: - label = f"✅ Выключить чат с ИИ" - callback = "toggle_ai_chat_off" - else: - label = f"❌ Включить чат с ИИ" - callback = "toggle_ai_chat_on" - logger.info(f"get_keyboard: label={label}, callback={callback}") - button = InlineKeyboardButton(label, callback_data=callback) - else: - button = InlineKeyboardButton(item.label, callback_data=item.callback) - keyboard.append([button]) - else: - for item in items: - button = InlineKeyboardButton( - item.label, - callback_data=item.callback - ) - keyboard.append([button]) - - return InlineKeyboardMarkup(keyboard) - - -class CommandRegistry: - """Реестр команд для легкого добавления.""" - - def __init__(self): - self._commands: Dict[str, Callable] = {} - - def register(self, name: str): - """Декоратор для регистрации команды.""" - def decorator(func: Callable): - self._commands[name] = func - return func - return decorator - - def get(self, name: str) -> Optional[Callable]: - return self._commands.get(name) - - def list_commands(self) -> List[str]: - return list(self._commands.keys()) - - -# --- Глобальные объекты --- -config = BotConfig() -state_manager = StateManager() -menu_builder = MenuBuilder() -command_registry = CommandRegistry() -server_manager = ServerManager() - - -# --- Проверка прав доступа --- -def check_access(func): - """Декоратор для проверки прав доступа пользователя.""" - @wraps(func) - async def wrapper(update: Update, context: ContextTypes.DEFAULT_TYPE, *args, **kwargs): - user_id = update.effective_user.id - - # Если доступ не ограничен — пропускаем всех - if not config.is_access_restricted: - return await func(update, context, *args, **kwargs) - - if user_id not in config.allowed_users: - logger.warning(f"Попытка доступа от запрещённого пользователя {user_id}") - await update.message.reply_text( - "❌ *Доступ запрещён*\n\n" - "Ваш ID не добавлен в список разрешённых пользователей.\n" - f"Ваш ID: `{user_id}`", - parse_mode="Markdown" - ) - return - - return await func(update, context, *args, **kwargs) - return wrapper - - -# --- Инициализация меню --- def init_menus(): """Инициализация структуры меню.""" diff --git a/bot/__init__.py b/bot/__init__.py new file mode 100644 index 0000000..1964561 --- /dev/null +++ b/bot/__init__.py @@ -0,0 +1,22 @@ +#!/usr/bin/env python3 +""" +Telegram CLI Bot - модульная структура. + +Пакет bot содержит все компоненты бота: +- models: модели данных (Server, UserState, сессии) +- handlers: обработчики событий (команды, сообщения, callback) +- services: бизнес-логика (выполнение команд) +- keyboards: Inline-клавиатуры +- utils: утилиты (очистка текста, форматирование) +- config: конфигурация и глобальные объекты +""" + +from bot.config import config, state_manager, menu_builder, command_registry, server_manager + +__all__ = [ + "config", + "state_manager", + "menu_builder", + "command_registry", + "server_manager", +] diff --git a/bot/config.py b/bot/config.py new file mode 100644 index 0000000..c1bff5c --- /dev/null +++ b/bot/config.py @@ -0,0 +1,54 @@ +#!/usr/bin/env python3 +"""Конфигурация бота и глобальные объекты.""" + +import os +import logging +import getpass +from pathlib import Path +from typing import Optional + +from dotenv import load_dotenv + +# Загрузка переменных окружения из .env +load_dotenv() + +logger = logging.getLogger(__name__) + +# --- Конфигурация бота --- +class BotConfig: + """Конфигурация бота из переменных окружения.""" + + def __init__(self): + self.name = os.getenv("BOT_NAME", "CLI Assistant") + self.description = os.getenv("BOT_DESCRIPTION", "Бот для выполнения CLI команд") + self.icon = os.getenv("BOT_ICON_EMOJI", "🤖") + self.working_directory = os.getenv("WORKING_DIRECTORY", str(Path.home())) + + # Парсинг списка разрешённых пользователей + allowed_users_str = os.getenv("ALLOWED_USERS", "") + if allowed_users_str.strip(): + self.allowed_users = [ + int(uid.strip()) + for uid in allowed_users_str.split(",") + if uid.strip().isdigit() + ] + else: + self.allowed_users = [] + + @property + def is_access_restricted(self) -> bool: + """Проверка: ограничен ли доступ.""" + return len(self.allowed_users) > 0 + + +# Импортируем модели и создаём глобальные объекты +from bot.models.server import ServerManager +from bot.models.user_state import StateManager +from bot.keyboards.menus import MenuBuilder, CommandRegistry + +# Глобальные объекты +config = BotConfig() +state_manager = StateManager() +menu_builder = MenuBuilder() +command_registry = CommandRegistry() +server_manager = ServerManager() diff --git a/bot/handlers/__init__.py b/bot/handlers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/bot/keyboards/__init__.py b/bot/keyboards/__init__.py new file mode 100644 index 0000000..2831208 --- /dev/null +++ b/bot/keyboards/__init__.py @@ -0,0 +1,11 @@ +#!/usr/bin/env python3 +"""Клавиатуры бота.""" + +from bot.keyboards.menus import MenuItem, MenuBuilder, CommandRegistry, init_menus + +__all__ = [ + "MenuItem", + "MenuBuilder", + "CommandRegistry", + "init_menus", +] diff --git a/bot/keyboards/menus.py b/bot/keyboards/menus.py new file mode 100644 index 0000000..61f1911 --- /dev/null +++ b/bot/keyboards/menus.py @@ -0,0 +1,200 @@ +#!/usr/bin/env python3 +"""Построитель многоуровневого меню.""" + +import logging +from typing import Dict, List, Optional, Callable +from dataclasses import dataclass, field +from telegram import InlineKeyboardButton, InlineKeyboardMarkup + +# Импортируем модели и утилиты +from bot.models.user_state import UserState, StateManager + +logger = logging.getLogger(__name__) + +# Глобальный state_manager для кнопки ИИ +state_manager = StateManager() + + +@dataclass +class MenuItem: + """Элемент меню.""" + label: str + callback: str # callback_data для кнопки + description: str = "" + icon: str = "" + children: List["MenuItem"] = field(default_factory=list) + command: Optional[str] = None # CLI команда для выполнения + is_command: bool = False + + +class MenuBuilder: + """Построитель InlineKeyboard для меню.""" + + def __init__(self): + self._menus: Dict[str, List[MenuItem]] = {} + + def add_menu(self, menu_name: str, items: List[MenuItem]): + self._menus[menu_name] = items + + def get_menu(self, menu_name: str) -> List[MenuItem]: + return self._menus.get(menu_name, []) + + def get_keyboard(self, menu_name: str, user_id: int = None, state: UserState = None) -> InlineKeyboardMarkup: + """Создает InlineKeyboard для меню.""" + items = self._menus.get(menu_name, []) + keyboard = [] + + # Для главного меню — динамически меняем кнопку ИИ + if menu_name == "main" and user_id: + # Используем переданное состояние или получаем из менеджера + if state is None: + state = state_manager.get(user_id) + logger.info(f"get_keyboard: user_id={user_id}, ai_chat_mode={state.ai_chat_mode}") + + for item in items: + # Проверяем базовый callback и его варианты с _on/_off + is_ai_toggle = item.callback in ["toggle_ai_chat", "toggle_ai_chat_on", "toggle_ai_chat_off"] + + if is_ai_toggle: + # Меняем текст кнопки и callback_data в зависимости от статуса + if state.ai_chat_mode: + label = f"✅ Выключить чат с ИИ" + callback = "toggle_ai_chat_off" + else: + label = f"❌ Включить чат с ИИ" + callback = "toggle_ai_chat_on" + logger.info(f"get_keyboard: label={label}, callback={callback}") + button = InlineKeyboardButton(label, callback_data=callback) + else: + button = InlineKeyboardButton(item.label, callback_data=item.callback) + keyboard.append([button]) + else: + for item in items: + button = InlineKeyboardButton( + item.label, + callback_data=item.callback + ) + keyboard.append([button]) + + return InlineKeyboardMarkup(keyboard) + + +class CommandRegistry: + """Реестр команд для легкого добавления.""" + + def __init__(self): + self._commands: Dict[str, Callable] = {} + + def register(self, name: str): + """Декоратор для регистрации команды.""" + def decorator(func: Callable): + self._commands[name] = func + return func + return decorator + + def get(self, name: str) -> Optional[Callable]: + return self._commands.get(name) + + def list_commands(self) -> List[str]: + return list(self._commands.keys()) + + +def init_menus(menu_builder: MenuBuilder): + """Инициализация структуры меню.""" + + # Главное меню + main_menu = [ + MenuItem("🖥️ Выбор сервера", "server_menu", icon="🖥️"), + MenuItem("📋 Предустановленные команды", "preset_menu", icon="📋"), + MenuItem("💬 Чат с ИИ агентом", "toggle_ai_chat", icon="💬"), + MenuItem("⚙️ Настройки бота", "settings_menu", icon="⚙️"), + MenuItem("ℹ️ О боте", "about", icon="ℹ️"), + ] + menu_builder.add_menu("main", main_menu) + + # Меню серверов + server_menu = [ + MenuItem("💻 local (localhost)", "server_select_local", icon="💻"), + MenuItem("➕ Добавить сервер", "server_add", icon="➕"), + MenuItem("⬅️ Назад", "main", icon="⬅️"), + ] + menu_builder.add_menu("server", server_menu) + + # Меню предустановленных команд + preset_menu = [ + MenuItem("📁 Файловая система", "fs_menu", icon="📁"), + MenuItem("🔍 Поиск", "search_menu", icon="🔍"), + MenuItem("📊 Система", "system_menu", icon="📊"), + MenuItem("🌐 Сеть", "network_menu", icon="🌐"), + MenuItem("⬅️ Назад", "main", icon="⬅️"), + ] + menu_builder.add_menu("preset", preset_menu) + + # Файловая система + fs_menu = [ + MenuItem("ls -la", "cmd_ls_la", command="ls -la", icon="📄"), + MenuItem("pwd", "cmd_pwd", command="pwd", icon="📍"), + MenuItem("df -h", "cmd_df", command="df -h", icon="💾"), + MenuItem("du -sh *", "cmd_du", command="du -sh * 2>/dev/null | sort -hr | head -20", icon="📊"), + MenuItem("⬅️ Назад", "preset", icon="⬅️"), + ] + menu_builder.add_menu("fs", fs_menu) + + # Поиск + search_menu = [ + MenuItem("find . -name", "cmd_find_name", command="find . -maxdepth 3 -name '*.txt' 2>/dev/null", icon="🔎"), + MenuItem("grep пример", "cmd_grep", command="grep -r 'example' . 2>/dev/null | head -20", icon="🔍"), + MenuItem("which command", "cmd_which", command="which python3 bash git", icon="📍"), + MenuItem("⬅️ Назад", "preset", icon="⬅️"), + ] + menu_builder.add_menu("search", search_menu) + + # Система + system_menu = [ + MenuItem("top -n 1", "cmd_top", command="top -bn1 | head -20", icon="📈"), + MenuItem("ps aux", "cmd_ps", command="ps aux | head -20", icon="🔄"), + MenuItem("free -h", "cmd_free", command="free -h", icon="💾"), + MenuItem("uname -a", "cmd_uname", command="uname -a", icon="ℹ️"), + MenuItem("uptime", "cmd_uptime", command="uptime", icon="⏱️"), + MenuItem("⬅️ Назад", "preset", icon="⬅️"), + ] + menu_builder.add_menu("system", system_menu) + + # Сеть + network_menu = [ + MenuItem("ip addr", "cmd_ip", command="ip addr 2>/dev/null || ifconfig 2>/dev/null", icon="🌐"), + MenuItem("ping google", "cmd_ping", command="ping -c 4 google.com 2>&1 | head -10", icon="📡"), + MenuItem("netstat", "cmd_netstat", command="ss -tuln 2>/dev/null || netstat -tuln 2>/dev/null | head -20", icon="🔌"), + MenuItem("curl ifconfig.me", "cmd_curl_ip", command="curl -s ifconfig.me 2>&1", icon="📍"), + MenuItem("⬅️ Назад", "preset", icon="⬅️"), + ] + menu_builder.add_menu("network", network_menu) + + # Настройки + settings_menu = [ + MenuItem("📝 Изменить имя бота", "set_name", icon="📝"), + MenuItem("📄 Изменить описание", "set_description", icon="📄"), + MenuItem("🎨 Изменить иконку", "set_icon", icon="🎨"), + MenuItem("👥 Управление доступом", "access_menu", icon="👥"), + MenuItem("🧠 Память ИИ", "memory_menu", icon="🧠"), + MenuItem("⬅️ Назад", "main", icon="⬅️"), + ] + menu_builder.add_menu("settings", settings_menu) + + # Память ИИ + memory_menu = [ + MenuItem("📋 Мой профиль", "memory_profile", icon="📋"), + MenuItem("📊 Статистика", "memory_stats", icon="📊"), + MenuItem("🗑️ Очистить историю", "memory_clear", icon="🗑️"), + MenuItem("⬅️ Назад", "settings", icon="⬅️"), + ] + menu_builder.add_menu("memory", memory_menu) + + # Доступ + access_menu = [ + MenuItem("📋 Показать разрешённых", "show_access", icon="📋"), + MenuItem("➕ Добавить пользователя", "add_access", icon="➕"), + MenuItem("➖ Удалить пользователя", "remove_access", icon="➖"), + MenuItem("⬅️ Назад", "settings", icon="⬅️"), + ] + menu_builder.add_menu("access", access_menu) diff --git a/bot/models/__init__.py b/bot/models/__init__.py new file mode 100644 index 0000000..4a3fb9e --- /dev/null +++ b/bot/models/__init__.py @@ -0,0 +1,24 @@ +#!/usr/bin/env python3 +"""Модели данных бота.""" + +from bot.models.server import Server, ServerManager +from bot.models.user_state import UserState, StateManager +from bot.models.session import ( + SSHSession, + SSHSessionManager, + LocalSession, + LocalSessionManager, + INPUT_PATTERNS +) + +__all__ = [ + "Server", + "ServerManager", + "UserState", + "StateManager", + "SSHSession", + "SSHSessionManager", + "LocalSession", + "LocalSessionManager", + "INPUT_PATTERNS", +] diff --git a/bot/models/server.py b/bot/models/server.py new file mode 100644 index 0000000..05b6e39 --- /dev/null +++ b/bot/models/server.py @@ -0,0 +1,216 @@ +#!/usr/bin/env python3 +"""Модели серверов и управление ими.""" + +import os +import logging +import getpass +from pathlib import Path +from typing import Dict, List, Optional +from dataclasses import dataclass, field +from dotenv import load_dotenv +from telegram import InlineKeyboardButton, InlineKeyboardMarkup + +logger = logging.getLogger(__name__) + + +@dataclass +class Server: + """Конфигурация сервера.""" + name: str + host: str + port: int + user: str + tags: List[str] = field(default_factory=list) + password: str = "" + + @property + def display_name(self) -> str: + """Отображаемое имя с иконкой.""" + icon = "🖥️" + if "local" in self.tags: + icon = "💻" + elif "db" in self.tags: + icon = "🗄️" + elif "web" in self.tags: + icon = "🌐" + return f"{icon} {self.name}" + + @property + def description(self) -> str: + """Краткое описание сервера.""" + return f"{self.user}@{self.host}:{self.port}" + + +class ServerManager: + """Управление серверами.""" + + def __init__(self): + self._servers: Dict[str, Server] = {} + self._default_server: str = "local" + self._ssh_key_path: Optional[str] = None + + # Локальный сервер всегда доступен + try: + local_user = getpass.getuser() + except Exception: + local_user = "user" + + self._servers["local"] = Server( + name="local", + host="localhost", + port=22, + user=local_user, + tags=["local", "dev"] + ) + + def load_from_env(self): + """Загрузка серверов из переменных окружения.""" + self._ssh_key_path = os.getenv("SSH_KEY_PATH") + self._default_server = os.getenv("DEFAULT_SERVER", "local") + + servers_str = os.getenv("SERVERS", "") + if not servers_str.strip(): + return + + # Парсинг формата: name|host|port|user|tags|password,name|host|port|user|tags|password + for server_line in servers_str.split(","): + if not server_line.strip(): + continue + + parts = server_line.strip().split("|") + if len(parts) < 4: + continue + + try: + name = parts[0].strip() + host = parts[1].strip() + port = int(parts[2].strip()) + user = parts[3].strip() + + # Теги (часть 4) и пароль (часть 5) могут отсутствовать + tags = [] + password = "" + + if len(parts) >= 5 and parts[4].strip(): + tags = [t.strip() for t in parts[4].split(",") if t.strip()] + + if len(parts) >= 6: + password = parts[5].strip() + + server = Server(name=name, host=host, port=port, user=user, tags=tags, password=password) + self._servers[name] = server + logger.info(f"Загружен сервер: {server.display_name} ({server.description})") + except ValueError as e: + logger.warning(f"Ошибка парсинга сервера: {parts} - {e}") + + def get(self, name: str) -> Optional[Server]: + """Получить сервер по имени.""" + return self._servers.get(name) + + def list_servers(self) -> List[Server]: + """Список всех серверов.""" + return list(self._servers.values()) + + def get_by_tags(self, tags: List[str]) -> List[Server]: + """Получить серверы по тегам.""" + result = [] + for server in self._servers.values(): + if any(tag in server.tags for tag in tags): + result.append(server) + return result + + @property + def default_server(self) -> str: + """Имя сервера по умолчанию.""" + return self._default_server + + @property + def ssh_key_path(self) -> Optional[str]: + """Путь к SSH ключу.""" + return self._ssh_key_path + + def get_keyboard(self, exclude_local: bool = False) -> InlineKeyboardMarkup: + """Создать клавиатуру с выбором сервера.""" + keyboard = [] + for server in self._servers.values(): + if exclude_local and server.name == "local": + continue + button = InlineKeyboardButton( + server.display_name, + callback_data=f"server_select_{server.name}" + ) + keyboard.append([button]) + return InlineKeyboardMarkup(keyboard) + + def add_server(self, name: str, host: str, port: int, user: str, tags: List[str] = None, password: str = "") -> bool: + """Добавить сервер.""" + if name in self._servers: + return False + self._servers[name] = Server(name=name, host=host, port=port, user=user, tags=tags or [], password=password) + self.save_to_env() + return True + + def update_server(self, name: str, host: str = None, port: int = None, + user: str = None, tags: List[str] = None, password: str = None) -> bool: + """Обновить сервер.""" + if name not in self._servers or name == "local": + return False + server = self._servers[name] + if host: + server.host = host + if port: + server.port = port + if user: + server.user = user + if tags is not None: + server.tags = tags + if password is not None: + server.password = password + self.save_to_env() + return True + + def delete_server(self, name: str) -> bool: + """Удалить сервер.""" + if name not in self._servers or name == "local": + return False + del self._servers[name] + self.save_to_env() + return True + + def save_to_env(self): + """Сохранить серверы в .env файл.""" + env_file = Path(__file__).parent.parent.parent / ".env" + + # Читаем существующий файл + lines = [] + if env_file.exists(): + with open(env_file, "r", encoding="utf-8") as f: + lines = f.readlines() + + # Формируем строку серверов + server_parts = [] + for server in self._servers.values(): + if server.name == "local": + continue + tags_str = ",".join(server.tags) if server.tags else "" + # Формат: name|host|port|user|tags|password + server_parts.append(f"{server.name}|{server.host}|{server.port}|{server.user}|{tags_str}|{server.password}") + + servers_line = f"SERVERS={','.join(server_parts)}\n" + + # Ищем и обновляем или добавляем строку SERVERS + found = False + for i, line in enumerate(lines): + if line.startswith("SERVERS="): + lines[i] = servers_line + found = True + break + + if not found: + lines.append("\n" + servers_line) + + # Записываем обратно + with open(env_file, "w", encoding="utf-8") as f: + f.writelines(lines) + + logger.debug(f"Серверы сохранены в {env_file}") diff --git a/bot/models/session.py b/bot/models/session.py new file mode 100644 index 0000000..c5ab027 --- /dev/null +++ b/bot/models/session.py @@ -0,0 +1,185 @@ +#!/usr/bin/env python3 +"""Модели интерактивных сессий (SSH и локальные).""" + +import os +import logging +from typing import Dict, Optional +from dataclasses import dataclass, field +from datetime import datetime, timedelta +import asyncssh + +logger = logging.getLogger(__name__) + + +# Импортируем Server из соседнего модуля +from bot.models.server import Server + + +@dataclass +class SSHSession: + """Интерактивная SSH-сессия.""" + user_id: int + server: Server + working_dir: str + conn: asyncssh.SSHClientConnection + process: asyncssh.SSHClientProcess + output_buffer: str = "" + waiting_for_input: bool = False + input_type: str = "" # "password", "confirm", "text" + last_activity: datetime = field(default_factory=datetime.now) + command: str = "" + + SESSION_TIMEOUT = timedelta(minutes=5) # Таймаут неактивности + + def is_expired(self) -> bool: + """Проверка истечения таймаута сессии.""" + return datetime.now() - self.last_activity > self.SESSION_TIMEOUT + + +class SSHSessionManager: + """Менеджер интерактивных SSH-сессий.""" + + def __init__(self): + self._sessions: Dict[int, SSHSession] = {} + + def create_session(self, user_id: int, server: Server, working_dir: str, + conn: asyncssh.SSHClientConnection, process: asyncssh.SSHClientProcess, + command: str = "") -> SSHSession: + """Создать новую сессию.""" + session = SSHSession( + user_id=user_id, + server=server, + working_dir=working_dir, + conn=conn, + process=process, + command=command + ) + self._sessions[user_id] = session + logger.info(f"Создана SSH-сессия для пользователя {user_id} на сервере {server.name}") + return session + + def get_session(self, user_id: int) -> Optional[SSHSession]: + """Получить сессию пользователя.""" + session = self._sessions.get(user_id) + if session and session.is_expired(): + self.close_session(user_id) + return None + return session + + def close_session(self, user_id: int): + """Закрыть сессию пользователя.""" + session = self._sessions.pop(user_id, None) + if session: + try: + if session.process: + session.process.stdin.close() + session.process.stdout.feed_eof() + if session.conn: + session.conn.close() + logger.info(f"Закрыта SSH-сессия для пользователя {user_id}") + except Exception as e: + logger.warning(f"Ошибка при закрытии сессии: {e}") + + def has_active_session(self, user_id: int) -> bool: + """Проверка наличия активной сессии.""" + return self.get_session(user_id) is not None + + def cleanup_expired(self): + """Очистка истёкших сессий.""" + expired = [uid for uid, s in self._sessions.items() if s.is_expired()] + for uid in expired: + self.close_session(uid) + + +@dataclass +class LocalSession: + """Интерактивная локальная сессия.""" + user_id: int + command: str + master_fd: int + pid: int + output_buffer: str = "" + waiting_for_input: bool = False + input_type: str = "" + last_activity: datetime = field(default_factory=datetime.now) + context: Dict = field(default_factory=dict) # Для хранения pexpect child и другого + + SESSION_TIMEOUT = timedelta(minutes=5) + + def is_expired(self) -> bool: + return datetime.now() - self.last_activity > self.SESSION_TIMEOUT + + +class LocalSessionManager: + """Менеджер локальных интерактивных сессий.""" + + def __init__(self): + self._sessions: Dict[int, LocalSession] = {} + + def create_session(self, user_id: int, command: str, master_fd: int, pid: int) -> LocalSession: + session = LocalSession( + user_id=user_id, + command=command, + master_fd=master_fd, + pid=pid + ) + self._sessions[user_id] = session + logger.info(f"Создана локальная сессия для пользователя {user_id}") + return session + + def get_session(self, user_id: int) -> Optional[LocalSession]: + session = self._sessions.get(user_id) + if session and session.is_expired(): + self.close_session(user_id) + return None + return session + + def close_session(self, user_id: int): + session = self._sessions.pop(user_id, None) + if session: + try: + # Закрываем pexpect процесс если есть + child = session.context.get('child') if session.context else None + if child: + child.close(force=True) + else: + # Старый способ для PTY + os.close(session.master_fd) + os.kill(session.pid, 9) + except: + pass + logger.info(f"Закрыта локальная сессия для пользователя {user_id}") + + def has_active_session(self, user_id: int) -> bool: + return self.get_session(user_id) is not None + + +# Паттерны для детектирования запросов ввода +INPUT_PATTERNS = { + "password": [ + r"[Pp]assword[:\s]*$", + r"[Pp]assphrase[:\s]*$", + r"Enter password[:\s]*$", + r"sudo password[:\s]*$", + r"\[sudo\] password for .*:", + r"[Пп]ароль[:\s]*$", + r"\[sudo\] пароль для .*:", + r"Введите пароль[:\s]*$", + ], + "confirm": [ + r"[Yy]es/[Nn]o[?:\s]*$", + r"\[?[Yy]\]?/?\[?[Nn]\]?", + r"Do you want to continue", + r"Continue\?", + r"Are you sure", + r"Is this OK", + r"[Yy]es or [Nn]o", + r"[Дд]а/[Нн]ет", + r"[Пп]родолжить", + ], + "shell_prompt": [ + r"[$#]\s*$", + r"[>$]\s*$", + r"[a-zA-Z0-9_-]+@[a-zA-Z0-9_-]+:.*[$#]\s*$", + ], +} diff --git a/bot/models/user_state.py b/bot/models/user_state.py new file mode 100644 index 0000000..44b1512 --- /dev/null +++ b/bot/models/user_state.py @@ -0,0 +1,36 @@ +#!/usr/bin/env python3 +"""Модели состояния пользователя.""" + +from typing import Dict, List, Optional, Any +from dataclasses import dataclass, field + + +@dataclass +class UserState: + """Состояние пользователя в диалоге.""" + current_menu: str = "main" + waiting_for_input: bool = False + input_type: Optional[str] = None # "name", "host", "port", "user", "tags", "server_action" + parent_menu: Optional[str] = None + context: Dict[str, Any] = field(default_factory=dict) + working_directory: Optional[str] = None + current_server: str = "local" # Имя текущего сервера + editing_server: Optional[str] = None # Имя сервера, который редактируем + ai_chat_mode: bool = False # Режим чата с ИИ агентом + ai_chat_history: List[str] = field(default_factory=list) # История диалога с ИИ + messages_since_fact_extract: int = 0 # Счётчик для извлечения фактов + + +class StateManager: + """Управление состояниями пользователей.""" + + def __init__(self): + self._states: Dict[int, UserState] = {} + + def get(self, user_id: int) -> UserState: + if user_id not in self._states: + self._states[user_id] = UserState() + return self._states[user_id] + + def reset(self, user_id: int): + self._states[user_id] = UserState() diff --git a/bot/services/__init__.py b/bot/services/__init__.py new file mode 100644 index 0000000..f5a6365 --- /dev/null +++ b/bot/services/__init__.py @@ -0,0 +1,5 @@ +#!/usr/bin/env python3 +"""Сервисы бота.""" + +# Заглушка для будущего импорта +# Функции будут перенесены из bot.py постепенно diff --git a/bot/services/command_executor.py b/bot/services/command_executor.py new file mode 100644 index 0000000..3f4f630 --- /dev/null +++ b/bot/services/command_executor.py @@ -0,0 +1,5 @@ +#!/usr/bin/env python3 +"""Сервисы бота - бизнес-логика выполнения команд.""" + +# Этот файл будет постепенно заполняться функциями из bot.py +# Пока импортируем всё из старого bot.py для обратной совместимости diff --git a/bot/utils/__init__.py b/bot/utils/__init__.py new file mode 100644 index 0000000..3821aad --- /dev/null +++ b/bot/utils/__init__.py @@ -0,0 +1,21 @@ +#!/usr/bin/env python3 +"""Утилиты бота.""" + +from bot.utils.cleaners import clean_ansi_codes, normalize_output +from bot.utils.formatters import escape_markdown, split_message, send_long_message, format_long_output, MAX_MESSAGE_LENGTH +from bot.utils.decorators import check_access +from bot.utils.ssh_readers import detect_input_type, read_ssh_output, read_pty_output + +__all__ = [ + "clean_ansi_codes", + "normalize_output", + "escape_markdown", + "split_message", + "send_long_message", + "format_long_output", + "MAX_MESSAGE_LENGTH", + "check_access", + "detect_input_type", + "read_ssh_output", + "read_pty_output", +] diff --git a/bot/utils/cleaners.py b/bot/utils/cleaners.py new file mode 100644 index 0000000..483dbe9 --- /dev/null +++ b/bot/utils/cleaners.py @@ -0,0 +1,81 @@ +#!/usr/bin/env python3 +"""Утилиты для очистки текста (ANSI-коды, нормализация).""" + +import re + + +def clean_ansi_codes(text: str) -> str: + """ + Очистка ANSI-кодов и мусора из вывода терминала. + Обрабатывает: + - ANSI escape-последовательности \x1b[...m + - «Битые» ANSI-коды без escape-символа (например [33m, [0m) + - Символы замены Unicode () + - Кириллические имитации ANSI-кодов (например [0м) + """ + # Удаляем ANSI escape-последовательности + text = re.sub(r'\x1b\[[0-9;?]*[a-zA-Z]', '', text) + + # Удаляем «битые» ANSI-коды: [33m, [0m, [1m и т.д. (латиница и кириллица) + text = re.sub(r'\[\d+[мm]', '', text) + + # Удаляем символы замены Unicode + text = text.replace('\ufffd', '') + + return text + + +def normalize_output(text: str) -> str: + """ + Нормализовать вывод: обработать \r и убрать пустые строки. + \r используется для перезаписи строки (прогресс-баров). + """ + # Заменяем \r\n на \n + text = text.replace('\r\n', '\n') + + # Обрабатываем \r (возврат каретки) — строки с \r перезаписывают друг друга + lines = [] + for line in text.split('\n'): + if '\r' in line: + # Разбиваем по \r и берём последнюю часть (финальное состояние) + parts = line.split('\r') + line = parts[-1] + lines.append(line) + + text = '\n'.join(lines) + + # Разбиваем на строки, убираем пустые и trailing пробелы + lines = text.split('\n') + lines = [line.rstrip() for line in lines if line.strip()] + + # Очищаем прогресс-бары вида "Текст… 0%Текст… 50%Текст… 100%" + # И дублирующийся текст + cleaned_lines = [] + for line in lines: + # Ищем повторяющийся паттерн "текст… цифры%" + progress_pattern = re.compile(r'((?:.+?\.{3})\d+%)+') + match = progress_pattern.search(line) + if match: + # Берём последнее вхождение + items = re.findall(r'(.+?\.{3})(\d+)%', match.group(0)) + if items: + last_text, last_percent = items[-1] + line = line[:match.start()] + f'{last_text}{last_percent}%' + line[match.end():] + + # СНАЧАЛА удаляем остатки ANSI-кодов из строки + line = re.sub(r'.', '', line) # + любой символ + + # Удаляем дублирующийся текст вида "0% [текст] 0% [текст]" + dup_pattern = re.compile(r'(\d+%\s*\[.+?\])(?:\s*\d+%\s*\[.+?\])+') + match = dup_pattern.search(line) + if match: + # Оставляем только первое вхождение + line = line[:match.start()] + match.group(1) + line[match.end():] + + # Удаляем ведущие пробелы (артефакты терминала) + line = line.lstrip() + + if line: + cleaned_lines.append(line) + + return '\n'.join(cleaned_lines) diff --git a/bot/utils/decorators.py b/bot/utils/decorators.py new file mode 100644 index 0000000..ceae6c2 --- /dev/null +++ b/bot/utils/decorators.py @@ -0,0 +1,33 @@ +#!/usr/bin/env python3 +"""Декораторы для бота.""" + +import logging +from functools import wraps +from telegram import Update +from telegram.ext import ContextTypes + +logger = logging.getLogger(__name__) + + +def check_access(func): + """Декоратор для проверки прав доступа пользователя.""" + @wraps(func) + async def wrapper(update: Update, context: ContextTypes.DEFAULT_TYPE, *args, **kwargs): + user_id = update.effective_user.id + + # Если доступ не ограничен — пропускаем всех + if not config.is_access_restricted: + return await func(update, context, *args, **kwargs) + + if user_id not in config.allowed_users: + logger.warning(f"Попытка доступа от запрещённого пользователя {user_id}") + await update.message.reply_text( + "❌ *Доступ запрещён*\n\n" + "Ваш ID не добавлен в список разрешённых пользователей.\n" + f"Ваш ID: `{user_id}`", + parse_mode="Markdown" + ) + return + + return await func(update, context, *args, **kwargs) + return wrapper diff --git a/bot/utils/formatters.py b/bot/utils/formatters.py new file mode 100644 index 0000000..d2202f8 --- /dev/null +++ b/bot/utils/formatters.py @@ -0,0 +1,103 @@ +#!/usr/bin/env python3 +"""Утилиты для форматирования и отправки сообщений.""" + +import asyncio +import logging +from typing import List +from telegram import Update + +logger = logging.getLogger(__name__) + +# Лимиты Telegram +MAX_MESSAGE_LENGTH = 4096 # Максимальная длина сообщения + + +def escape_markdown(text: str) -> str: + """ + Экранирование специальных символов Markdown для Telegram API. + """ + text = text.replace('```', '\\`\\`\\`') + return text + + +def split_message(text: str, max_length: int = MAX_MESSAGE_LENGTH) -> List[str]: + """ + Разбить длинный текст на сообщения <= max_length символов. + Старается разбивать по границам строк или блоков кода. + """ + if len(text) <= max_length: + return [text] + + parts = [] + current = "" + + for line in text.split('\n'): + # Если добавление строки превысит лимит + if len(current) + len(line) + 1 > max_length: + if current: + parts.append(current) + # Если строка сама по себе длиннее лимита — режем её + while len(line) > max_length: + parts.append(line[:max_length]) + line = line[max_length:] + current = line + else: + current += ('\n' if current else '') + line + + if current: + parts.append(current) + + return parts + + +async def send_long_message(update: Update, text: str, parse_mode: str = None): + """ + Отправить длинный текст, разбив на несколько сообщений. + Если parse_mode="Markdown" и текст содержит блоки кода — отправляет без разметки. + """ + parts = split_message(text) + + for i, part in enumerate(parts): + # Добавляем номер части если их несколько + if len(parts) > 1: + header = f"({i+1}/{len(parts)}) " + if len(header) + len(part) <= MAX_MESSAGE_LENGTH: + part = header + part + + # Если это не первая часть и был Markdown — убираем parse_mode + # чтобы не было проблем с разорванной разметкой + actual_parse_mode = parse_mode if i == 0 else None + + try: + await update.message.reply_text(part, parse_mode=actual_parse_mode) + except Exception as e: + # Фоллбэк: отправляем без разметки + logger.debug(f"Ошибка Markdown, отправляем без разметки: {e}") + await update.message.reply_text(part) + + # Небольшая пауза между сообщениями + await asyncio.sleep(0.1) + + +def format_long_output(text: str, max_lines: int = 20, head_lines: int = 10, tail_lines: int = 10) -> str: + """ + Форматировать длинный вывод: показать первые и последние строки. + По умолчанию: первые 10 + последние 10 строк = 20 строк максимум. + """ + lines = text.split('\n') + total_lines = len(lines) + + if total_lines <= max_lines: + return text + + # Показываем первые head_lines и последние tail_lines + head = lines[:head_lines] + tail = lines[-tail_lines:] + + skipped = total_lines - head_lines - tail_lines + + result = '\n'.join(head) + result += f'\n\n... ({skipped} строк пропущено) ...\n' + result += '\n'.join(tail) + + return result diff --git a/bot/utils/ssh_readers.py b/bot/utils/ssh_readers.py new file mode 100644 index 0000000..c8fb493 --- /dev/null +++ b/bot/utils/ssh_readers.py @@ -0,0 +1,169 @@ +#!/usr/bin/env python3 +"""Утилиты для чтения вывода SSH и PTY.""" + +import asyncio +import fcntl +import logging +import os +import re +import select +from typing import Optional, Tuple + +import asyncssh + +logger = logging.getLogger(__name__) + +# Импортируем паттерны из session +from bot.models.session import INPUT_PATTERNS + + +def detect_input_type(text: str) -> Optional[str]: + """Определить тип запроса ввода по тексту.""" + text = text.strip() + + # Проверка на пароль + for pattern in INPUT_PATTERNS["password"]: + if re.search(pattern, text, re.MULTILINE): + return "password" + + # Проверка на подтверждение + for pattern in INPUT_PATTERNS["confirm"]: + if re.search(pattern, text, re.MULTILINE): + return "confirm" + + # Проверка на приглашение оболочки + for pattern in INPUT_PATTERNS["shell_prompt"]: + if re.search(pattern, text, re.MULTILINE): + return "prompt" + + return None + + +async def read_ssh_output(process: asyncssh.SSHClientProcess, timeout: float = 2.0) -> Tuple[str, bool]: + """ + Чтение вывода из SSH-процесса с таймаутом. + Возвращает (вывод, завершён_ли_процесс). + """ + output = "" + is_done = False + + try: + # Используем readany() для чтения доступных данных + while True: + try: + # readany() читает любые доступные данные + data = await asyncio.wait_for(process.stdout.readany(), timeout=timeout) + if data: + if isinstance(data, bytes): + output += data.decode('utf-8', errors='replace') + else: + output += str(data) + logger.debug(f"Прочитано stdout: {len(data)} байт, всего: {len(output)}") + else: + # EOF + is_done = True + break + except asyncio.TimeoutError: + # Данные закончились + logger.debug(f"Timeout stdout, прочитано: {len(output)} байт") + if process.returncode is not None: + is_done = True + break + except UnicodeDecodeError as e: + logger.debug(f"Ошибка декодирования UTF-8: {e}") + continue + except Exception as e: + # Конец потока + logger.debug(f"Конец потока stdout: {type(e).__name__}: {e}") + is_done = True + break + except Exception as e: + logger.debug(f"Ошибка чтения SSH stdout: {e}") + is_done = True + + # Читаем stderr если есть + error_output = "" + try: + while True: + try: + data = await asyncio.wait_for(process.stderr.readany(), timeout=0.5) + if data: + if isinstance(data, bytes): + error_output += data.decode('utf-8', errors='replace') + else: + error_output += str(data) + else: + break + except (asyncio.TimeoutError, Exception): + break + except Exception as e: + logger.debug(f"Ошибка чтения SSH stderr: {e}") + + # Объединяем stdout и stderr + if error_output: + output = output + error_output if output else error_output + + logger.debug(f"read_ssh_output: output={len(output)} байт, is_done={is_done}, returncode={process.returncode}") + return output, is_done + + +def read_pty_output(master_fd: int, timeout: float = 2.0) -> Tuple[str, bool]: + """ + Чтение вывода из PTY с таймаутом. + Возвращает (вывод, завершён_ли_процесс). + """ + output = "" + is_done = False + total_waited = 0 + + try: + # Устанавливаем non-blocking режим + flags = fcntl.fcntl(master_fd, fcntl.F_GETFL) + fcntl.fcntl(master_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) + + while total_waited < timeout: + try: + # Ждём данные с коротким таймаутом + ready, _, _ = select.select([master_fd], [], [], 0.2) + if ready: + try: + data = os.read(master_fd, 4096) + if data: + output += data.decode('utf-8', errors='replace') + logger.debug(f"Прочитано из PTY: {len(data)} байт") + # Сбрасываем таймер если есть данные + total_waited = 0 + else: + is_done = True + break + except BlockingIOError: + # Нет данных, продолжаем ждать + pass + else: + # Timeout - проверяем не завершился ли процесс + try: + _, status = os.waitpid(-1, os.WNOHANG) + if status != 0: + logger.debug(f"Процесс завершился со статусом: {status}") + is_done = True + break + except ChildProcessError: + pass + + # Если уже что-то прочитали и есть запрос ввода - выходим + if output and detect_input_type(output): + logger.debug(f"Обнаружен запрос ввода") + break + + total_waited += 0.2 + + except Exception as e: + logger.debug(f"Ошибка при чтении PTY: {e}") + break + + except Exception as e: + logger.debug(f"Ошибка чтения PTY: {e}") + is_done = True + + logger.debug(f"read_pty_output: output={len(output)} байт, is_done={is_done}") + return output, is_done