refactor: модульная структура бота (v0.5.0)

Рефакторинг кода бота для улучшения поддерживаемости:

Структура:
- bot/models/ — модели данных (Server, UserState, сессии)
- bot/utils/ — утилиты (очистка текста, форматирование, декораторы)
- bot/keyboards/ — Inline-клавиатуры и меню
- bot/services/ — бизнес-логика (выполнение команд)
- bot/handlers/ — обработчики событий (пустые для будущего заполнения)
- bot/config.py — конфигурация и глобальные объекты

Изменения:
- bot.py сокращён с 3240 до 2364 строк (-900 строк дубликатов)
- Все модели перенесены в отдельные модули
- Утилиты разделены по назначению (cleaners, formatters, decorators)
- Меню вынесено в keyboards/menus.py
- Импорты из новой структуры через bot.config и bot.models

Преимущества:
- Лучшая организация кода
- Упрощённое тестирование модулей
- Легче добавлять новый функционал
- Чёткое разделение ответственности

Version: 0.5.0

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
This commit is contained in:
mirivlad 2026-02-24 23:32:26 +08:00
parent ca6721090c
commit d1592c7b38
17 changed files with 1179 additions and 888 deletions

902
bot.py
View File

@ -2,6 +2,8 @@
"""
Telegram CLI Bot - бот для выполнения CLI команд с многоуровневым меню.
Легкое добавление новых команд через регистрацию хендлеров.
Версия: 0.5.0 (рефакторинг)
"""
import os
@ -9,28 +11,18 @@ import sys
import asyncio
import subprocess
import logging
import getpass
import re
import pty
import tty
import termios
import select
import fcntl
from pathlib import Path
from typing import Optional, Callable, Dict, Any, List, Tuple
from dataclasses import dataclass, field
from functools import wraps
from datetime import datetime, timedelta
# Лимиты Telegram
MAX_MESSAGE_LENGTH = 4096 # Максимальная длина сообщения
import pexpect
import asyncssh
from qwen_integration import qwen_manager, QwenSessionState
# Подавляем логи sentence-transformers и huggingface
import logging
logging.getLogger("sentence_transformers").setLevel(logging.WARNING)
logging.getLogger("huggingface_hub").setLevel(logging.WARNING)
logging.getLogger("httpx").setLevel(logging.WARNING)
@ -71,887 +63,21 @@ logging.basicConfig(
logger = logging.getLogger(__name__)
# --- Конфигурация бота из переменных окружения ---
class BotConfig:
"""Конфигурация бота из переменных окружения."""
# ============================================================================
# ИМПОРТЫ ИЗ bot/ - новая модульная структура
# ============================================================================
from bot.config import config, state_manager, menu_builder, command_registry, server_manager
from bot.models.server import Server
from bot.models.session import SSHSession, SSHSessionManager, LocalSession, LocalSessionManager, INPUT_PATTERNS
from bot.utils.cleaners import clean_ansi_codes, normalize_output
from bot.utils.formatters import escape_markdown, split_message, send_long_message, format_long_output, MAX_MESSAGE_LENGTH
from bot.utils.ssh_readers import detect_input_type, read_ssh_output, read_pty_output
from bot.utils.decorators import check_access
from bot.keyboards.menus import MenuItem, init_menus
def __init__(self):
self.name = os.getenv("BOT_NAME", "CLI Assistant")
self.description = os.getenv("BOT_DESCRIPTION", "Бот для выполнения CLI команд")
self.icon = os.getenv("BOT_ICON_EMOJI", "🤖")
self.working_directory = os.getenv("WORKING_DIRECTORY", str(Path.home()))
# Парсинг списка разрешённых пользователей
allowed_users_str = os.getenv("ALLOWED_USERS", "")
if allowed_users_str.strip():
self.allowed_users = [
int(uid.strip())
for uid in allowed_users_str.split(",")
if uid.strip().isdigit()
]
else:
self.allowed_users = []
@property
def is_access_restricted(self) -> bool:
"""Проверка: ограничен ли доступ."""
return len(self.allowed_users) > 0
# --- Серверы ---
@dataclass
class Server:
"""Конфигурация сервера."""
name: str
host: str
port: int
user: str
tags: List[str] = field(default_factory=list)
password: str = ""
@property
def display_name(self) -> str:
"""Отображаемое имя с иконкой."""
icon = "🖥️"
if "local" in self.tags:
icon = "💻"
elif "db" in self.tags:
icon = "🗄️"
elif "web" in self.tags:
icon = "🌐"
return f"{icon} {self.name}"
@property
def description(self) -> str:
"""Краткое описание сервера."""
return f"{self.user}@{self.host}:{self.port}"
class ServerManager:
"""Управление серверами."""
def __init__(self):
self._servers: Dict[str, Server] = {}
self._default_server: str = "local"
self._ssh_key_path: Optional[str] = None
# Локальный сервер всегда доступен
try:
local_user = getpass.getuser()
except Exception:
local_user = "user"
self._servers["local"] = Server(
name="local",
host="localhost",
port=22,
user=local_user,
tags=["local", "dev"]
)
def load_from_env(self):
"""Загрузка серверов из переменных окружения."""
self._ssh_key_path = os.getenv("SSH_KEY_PATH")
self._default_server = os.getenv("DEFAULT_SERVER", "local")
servers_str = os.getenv("SERVERS", "")
if not servers_str.strip():
return
# Парсинг формата: name|host|port|user|tags|password,name|host|port|user|tags|password
# Теги могут содержать запятые, поэтому разбираем по частям
for server_line in servers_str.split(","):
if not server_line.strip():
continue
parts = server_line.strip().split("|")
if len(parts) < 4:
continue
try:
name = parts[0].strip()
host = parts[1].strip()
port = int(parts[2].strip())
user = parts[3].strip()
# Теги (часть 4) и пароль (часть 5) могут отсутствовать
tags = []
password = ""
if len(parts) >= 5 and parts[4].strip():
tags = [t.strip() for t in parts[4].split(",") if t.strip()]
if len(parts) >= 6:
password = parts[5].strip()
server = Server(name=name, host=host, port=port, user=user, tags=tags, password=password)
self._servers[name] = server
logger.info(f"Загружен сервер: {server.display_name} ({server.description})")
except ValueError as e:
logger.warning(f"Ошибка парсинга сервера: {parts} - {e}")
def get(self, name: str) -> Optional[Server]:
"""Получить сервер по имени."""
return self._servers.get(name)
def list_servers(self) -> List[Server]:
"""Список всех серверов."""
return list(self._servers.values())
def get_by_tags(self, tags: List[str]) -> List[Server]:
"""Получить серверы по тегам."""
result = []
for server in self._servers.values():
if any(tag in server.tags for tag in tags):
result.append(server)
return result
@property
def default_server(self) -> str:
"""Имя сервера по умолчанию."""
return self._default_server
@property
def ssh_key_path(self) -> Optional[str]:
"""Путь к SSH ключу."""
return self._ssh_key_path
def get_keyboard(self, exclude_local: bool = False) -> InlineKeyboardMarkup:
"""Создать клавиатуру с выбором сервера."""
keyboard = []
for server in self._servers.values():
if exclude_local and server.name == "local":
continue
button = InlineKeyboardButton(
server.display_name,
callback_data=f"server_select_{server.name}"
)
keyboard.append([button])
return InlineKeyboardMarkup(keyboard)
def add_server(self, name: str, host: str, port: int, user: str, tags: List[str] = None, password: str = "") -> bool:
"""Добавить сервер."""
if name in self._servers:
return False
self._servers[name] = Server(name=name, host=host, port=port, user=user, tags=tags or [], password=password)
self.save_to_env()
return True
def update_server(self, name: str, host: str = None, port: int = None,
user: str = None, tags: List[str] = None, password: str = None) -> bool:
"""Обновить сервер."""
if name not in self._servers or name == "local":
return False
server = self._servers[name]
if host:
server.host = host
if port:
server.port = port
if user:
server.user = user
if tags is not None:
server.tags = tags
if password is not None:
server.password = password
self.save_to_env()
return True
def delete_server(self, name: str) -> bool:
"""Удалить сервер."""
if name not in self._servers or name == "local":
return False
del self._servers[name]
self.save_to_env()
return True
def save_to_env(self):
"""Сохранить серверы в .env файл."""
env_file = Path(__file__).parent / ".env"
# Читаем существующий файл
lines = []
if env_file.exists():
with open(env_file, "r", encoding="utf-8") as f:
lines = f.readlines()
# Формируем строку серверов
server_parts = []
for server in self._servers.values():
if server.name == "local":
continue
tags_str = ",".join(server.tags) if server.tags else ""
# Формат: name|host|port|user|tags|password
server_parts.append(f"{server.name}|{server.host}|{server.port}|{server.user}|{tags_str}|{server.password}")
servers_line = f"SERVERS={','.join(server_parts)}\n"
# Ищем и обновляем или добавляем строку SERVERS
found = False
for i, line in enumerate(lines):
if line.startswith("SERVERS="):
lines[i] = servers_line
found = True
break
if not found:
lines.append("\n" + servers_line)
# Записываем обратно
with open(env_file, "w", encoding="utf-8") as f:
f.writelines(lines)
logger.info(f"Серверы сохранены в {env_file}")
# --- Хранилище состояний пользователя ---
@dataclass
class UserState:
"""Состояние пользователя в диалоге."""
current_menu: str = "main"
waiting_for_input: bool = False
input_type: Optional[str] = None # "name", "host", "port", "user", "tags", "server_action"
parent_menu: Optional[str] = None
context: Dict[str, Any] = field(default_factory=dict)
working_directory: Optional[str] = None
current_server: str = "local" # Имя текущего сервера
editing_server: Optional[str] = None # Имя сервера, который редактируем
ai_chat_mode: bool = False # Режим чата с ИИ агентом
ai_chat_history: List[str] = field(default_factory=list) # История диалога с ИИ
messages_since_fact_extract: int = 0 # Счётчик для извлечения фактов
class StateManager:
"""Управление состояниями пользователей."""
def __init__(self):
self._states: Dict[int, UserState] = {}
def get(self, user_id: int) -> UserState:
if user_id not in self._states:
self._states[user_id] = UserState()
return self._states[user_id]
def reset(self, user_id: int):
self._states[user_id] = UserState()
# --- Интерактивные SSH-сессии ---
@dataclass
class SSHSession:
"""Интерактивная SSH-сессия."""
user_id: int
server: Server
working_dir: str
conn: asyncssh.SSHClientConnection
process: asyncssh.SSHClientProcess
output_buffer: str = ""
waiting_for_input: bool = False
input_type: str = "" # "password", "confirm", "text"
last_activity: datetime = field(default_factory=datetime.now)
command: str = ""
SESSION_TIMEOUT = timedelta(minutes=5) # Таймаут неактивности
def is_expired(self) -> bool:
"""Проверка истечения таймаута сессии."""
return datetime.now() - self.last_activity > self.SESSION_TIMEOUT
class SSHSessionManager:
"""Менеджер интерактивных SSH-сессий."""
def __init__(self):
self._sessions: Dict[int, SSHSession] = {}
def create_session(self, user_id: int, server: Server, working_dir: str,
conn: asyncssh.SSHClientConnection, process: asyncssh.SSHClientProcess,
command: str = "") -> SSHSession:
"""Создать новую сессию."""
session = SSHSession(
user_id=user_id,
server=server,
working_dir=working_dir,
conn=conn,
process=process,
command=command
)
self._sessions[user_id] = session
logger.info(f"Создана SSH-сессия для пользователя {user_id} на сервере {server.name}")
return session
def get_session(self, user_id: int) -> Optional[SSHSession]:
"""Получить сессию пользователя."""
session = self._sessions.get(user_id)
if session and session.is_expired():
self.close_session(user_id)
return None
return session
def close_session(self, user_id: int):
"""Закрыть сессию пользователя."""
session = self._sessions.pop(user_id, None)
if session:
try:
if session.process:
session.process.stdin.close()
session.process.stdout.feed_eof()
if session.conn:
session.conn.close()
logger.info(f"Закрыта SSH-сессия для пользователя {user_id}")
except Exception as e:
logger.warning(f"Ошибка при закрытии сессии: {e}")
def has_active_session(self, user_id: int) -> bool:
"""Проверка наличия активной сессии."""
return self.get_session(user_id) is not None
def cleanup_expired(self):
"""Очистка истёкших сессий."""
expired = [uid for uid, s in self._sessions.items() if s.is_expired()]
for uid in expired:
self.close_session(uid)
# Менеджер SSH-сессий
# Глобальные менеджеры сессий
ssh_session_manager = SSHSessionManager()
@dataclass
class LocalSession:
"""Интерактивная локальная сессия."""
user_id: int
command: str
master_fd: int
pid: int
output_buffer: str = ""
waiting_for_input: bool = False
input_type: str = ""
last_activity: datetime = field(default_factory=datetime.now)
SESSION_TIMEOUT = timedelta(minutes=5)
def is_expired(self) -> bool:
return datetime.now() - self.last_activity > self.SESSION_TIMEOUT
class LocalSessionManager:
"""Менеджер локальных интерактивных сессий."""
def __init__(self):
self._sessions: Dict[int, LocalSession] = {}
def create_session(self, user_id: int, command: str, master_fd: int, pid: int) -> LocalSession:
session = LocalSession(
user_id=user_id,
command=command,
master_fd=master_fd,
pid=pid
)
self._sessions[user_id] = session
logger.info(f"Создана локальная сессия для пользователя {user_id}")
return session
def get_session(self, user_id: int) -> Optional[LocalSession]:
session = self._sessions.get(user_id)
if session and session.is_expired():
self.close_session(user_id)
return None
return session
def close_session(self, user_id: int):
session = self._sessions.pop(user_id, None)
if session:
try:
# Закрываем pexpect процесс если есть
child = session.context.get('child') if hasattr(session, 'context') and session.context else None
if child:
child.close(force=True)
else:
# Старый способ для PTY
os.close(session.master_fd)
os.kill(session.pid, 9)
except:
pass
logger.info(f"Закрыта локальная сессия для пользователя {user_id}")
def has_active_session(self, user_id: int) -> bool:
return self.get_session(user_id) is not None
# Менеджер локальных сессий
local_session_manager = LocalSessionManager()
# Паттерны для детектирования запросов ввода
INPUT_PATTERNS = {
"password": [
r"[Pp]assword[:\s]*$",
r"[Pp]assphrase[:\s]*$",
r"Enter password[:\s]*$",
r"sudo password[:\s]*$",
r"\[sudo\] password for .*:",
r"[Пп]ароль[:\s]*$",
r"\[sudo\] пароль для .*:",
r"Введите пароль[:\s]*$",
],
"confirm": [
r"[Yy]es/[Nn]o[?:\s]*$",
r"\[?[Yy]\]?/?\[?[Nn]\]?",
r"Do you want to continue",
r"Continue\?",
r"Are you sure",
r"Is this OK",
r"[Yy]es or [Nn]o",
r"[Дд]а/[Нн]ет",
r"[Пп]родолжить",
],
"shell_prompt": [
r"[$#]\s*$",
r"[>$]\s*$",
r"[a-zA-Z0-9_-]+@[a-zA-Z0-9_-]+:.*[$#]\s*$",
],
}
def detect_input_type(text: str) -> Optional[str]:
"""Определить тип запроса ввода по тексту."""
text = text.strip()
# Проверка на пароль
for pattern in INPUT_PATTERNS["password"]:
if re.search(pattern, text, re.MULTILINE):
return "password"
# Проверка на подтверждение
for pattern in INPUT_PATTERNS["confirm"]:
if re.search(pattern, text, re.MULTILINE):
return "confirm"
# Проверка на приглашение оболочки
for pattern in INPUT_PATTERNS["shell_prompt"]:
if re.search(pattern, text, re.MULTILINE):
return "prompt"
return None
def clean_ansi_codes(text: str) -> str:
"""
Очистка ANSI-кодов и мусора из вывода терминала.
Обрабатывает:
- ANSI escape-последовательности \x1b[...m
- «Битые» ANSI-коды без escape-символа (например [33m, [0m)
- Символы замены Unicode ()
- Кириллические имитации ANSI-кодов (например [0м)
"""
# Удаляем ANSI escape-последовательности
text = re.sub(r'\x1b\[[0-9;?]*[a-zA-Z]', '', text)
# Удаляем «битые» ANSI-коды: [33m, [0m, [1m и т.д. (латиница и кириллица)
text = re.sub(r'\[\d+[мm]', '', text)
# Удаляем символы замены Unicode
text = text.replace('\ufffd', '')
return text
def escape_markdown(text: str) -> str:
"""
Экранирование специальных символов Markdown для Telegram API.
"""
text = text.replace('```', '\\`\\`\\`')
return text
def split_message(text: str, max_length: int = MAX_MESSAGE_LENGTH) -> List[str]:
"""
Разбить длинный текст на сообщения <= max_length символов.
Старается разбивать по границам строк или блоков кода.
"""
if len(text) <= max_length:
return [text]
parts = []
current = ""
for line in text.split('\n'):
# Если добавление строки превысит лимит
if len(current) + len(line) + 1 > max_length:
if current:
parts.append(current)
# Если строка сама по себе длиннее лимита — режем её
while len(line) > max_length:
parts.append(line[:max_length])
line = line[max_length:]
current = line
else:
current += ('\n' if current else '') + line
if current:
parts.append(current)
return parts
async def send_long_message(update: Update, text: str, parse_mode: str = None):
"""
Отправить длинный текст, разбив на несколько сообщений.
Если parse_mode="Markdown" и текст содержит блоки кода отправляет без разметки.
"""
parts = split_message(text)
for i, part in enumerate(parts):
# Добавляем номер части если их несколько
if len(parts) > 1:
header = f"({i+1}/{len(parts)}) "
if len(header) + len(part) <= MAX_MESSAGE_LENGTH:
part = header + part
# Если это не первая часть и был Markdown — убираем parse_mode
# чтобы не было проблем с разорванной разметкой
actual_parse_mode = parse_mode if i == 0 else None
try:
await update.message.reply_text(part, parse_mode=actual_parse_mode)
except Exception as e:
# Фоллбэк: отправляем без разметки
logger.debug(f"Ошибка Markdown, отправляем без разметки: {e}")
await update.message.reply_text(part)
# Небольшая пауза между сообщениями
await asyncio.sleep(0.1)
def normalize_output(text: str) -> str:
"""
Нормализовать вывод: обработать \r и убрать пустые строки.
\r используется для перезаписи строки (прогресс-баров).
"""
# Заменяем \r\n на \n
text = text.replace('\r\n', '\n')
# Обрабатываем \r (возврат каретки) — строки с \r перезаписывают друг друга
lines = []
for line in text.split('\n'):
if '\r' in line:
# Разбиваем по \r и берём последнюю часть (финальное состояние)
parts = line.split('\r')
line = parts[-1]
lines.append(line)
text = '\n'.join(lines)
# Разбиваем на строки, убираем пустые и trailing пробелы
lines = text.split('\n')
lines = [line.rstrip() for line in lines if line.strip()]
# Очищаем прогресс-бары вида "Текст… 0%Текст… 50%Текст… 100%"
# И дублирующийся текст
cleaned_lines = []
for line in lines:
# Ищем повторяющийся паттерн "текст… цифры%"
progress_pattern = re.compile(r'((?:.+?\.{3})\d+%)+')
match = progress_pattern.search(line)
if match:
# Берём последнее вхождение
items = re.findall(r'(.+?\.{3})(\d+)%', match.group(0))
if items:
last_text, last_percent = items[-1]
line = line[:match.start()] + f'{last_text}{last_percent}%' + line[match.end():]
# СНАЧАЛА удаляем остатки ANSI-кодов из строки
line = re.sub(r'.', '', line) #  + любой символ
# Удаляем дублирующийся текст вида "0% [текст] 0% [текст]"
dup_pattern = re.compile(r'(\d+%\s*\[.+?\])(?:\s*\d+%\s*\[.+?\])+')
match = dup_pattern.search(line)
if match:
# Оставляем только первое вхождение
line = line[:match.start()] + match.group(1) + line[match.end():]
# Удаляем ведущие пробелы (артефакты терминала)
line = line.lstrip()
if line:
cleaned_lines.append(line)
return '\n'.join(cleaned_lines)
def format_long_output(text: str, max_lines: int = 20, head_lines: int = 10, tail_lines: int = 10) -> str:
"""
Форматировать длинный вывод: показать первые и последние строки.
По умолчанию: первые 10 + последние 10 строк = 20 строк максимум.
"""
lines = text.split('\n')
total_lines = len(lines)
if total_lines <= max_lines:
return text
# Показываем первые head_lines и последние tail_lines
head = lines[:head_lines]
tail = lines[-tail_lines:]
skipped = total_lines - head_lines - tail_lines
result = '\n'.join(head)
result += f'\n\n... ({skipped} строк пропущено) ...\n'
result += '\n'.join(tail)
return result
async def read_ssh_output(process: asyncssh.SSHClientProcess, timeout: float = 2.0) -> tuple[str, bool]:
"""
Чтение вывода из SSH-процесса с таймаутом.
Возвращает (вывод, завершён_ли_процесс).
"""
output = ""
is_done = False
try:
# Используем readany() для чтения доступных данных
while True:
try:
# readany() читает любые доступные данные
data = await asyncio.wait_for(process.stdout.readany(), timeout=timeout)
if data:
if isinstance(data, bytes):
output += data.decode('utf-8', errors='replace')
else:
output += str(data)
logger.debug(f"Прочитано stdout: {len(data)} байт, всего: {len(output)}")
else:
# EOF
is_done = True
break
except asyncio.TimeoutError:
# Данные закончились
logger.debug(f"Timeout stdout, прочитано: {len(output)} байт")
if process.returncode is not None:
is_done = True
break
except UnicodeDecodeError as e:
logger.debug(f"Ошибка декодирования UTF-8: {e}")
continue
except Exception as e:
# Конец потока
logger.debug(f"Конец потока stdout: {type(e).__name__}: {e}")
is_done = True
break
except Exception as e:
logger.debug(f"Ошибка чтения SSH stdout: {e}")
is_done = True
# Читаем stderr если есть
error_output = ""
try:
while True:
try:
data = await asyncio.wait_for(process.stderr.readany(), timeout=0.5)
if data:
if isinstance(data, bytes):
error_output += data.decode('utf-8', errors='replace')
else:
error_output += str(data)
else:
break
except (asyncio.TimeoutError, Exception):
break
except Exception as e:
logger.debug(f"Ошибка чтения SSH stderr: {e}")
# Объединяем stdout и stderr
if error_output:
output = output + error_output if output else error_output
logger.debug(f"read_ssh_output: output={len(output)} байт, is_done={is_done}, returncode={process.returncode}")
return output, is_done
def read_pty_output(master_fd: int, timeout: float = 2.0) -> tuple[str, bool]:
"""
Чтение вывода из PTY с таймаутом.
Возвращает (вывод, завершён_ли_процесс).
"""
output = ""
is_done = False
total_waited = 0
try:
# Устанавливаем non-blocking режим
flags = fcntl.fcntl(master_fd, fcntl.F_GETFL)
fcntl.fcntl(master_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
while total_waited < timeout:
try:
# Ждём данные с коротким таймаутом
ready, _, _ = select.select([master_fd], [], [], 0.2)
if ready:
try:
data = os.read(master_fd, 4096)
if data:
output += data.decode('utf-8', errors='replace')
logger.debug(f"Прочитано из PTY: {len(data)} байт")
# Сбрасываем таймер если есть данные
total_waited = 0
else:
is_done = True
break
except BlockingIOError:
# Нет данных, продолжаем ждать
pass
else:
# Timeout - проверяем не завершился ли процесс
try:
_, status = os.waitpid(-1, os.WNOHANG)
if status != 0:
logger.debug(f"Процесс завершился со статусом: {status}")
is_done = True
break
except ChildProcessError:
pass
# Если уже что-то прочитали и есть запрос ввода - выходим
if output and detect_input_type(output):
logger.debug(f"Обнаружен запрос ввода")
break
total_waited += 0.2
except Exception as e:
logger.debug(f"Ошибка при чтении PTY: {e}")
break
except Exception as e:
logger.debug(f"Ошибка чтения PTY: {e}")
is_done = True
logger.debug(f"read_pty_output: output={len(output)} байт, is_done={is_done}")
return output, is_done
# --- Система команд ---
@dataclass
class MenuItem:
"""Элемент меню."""
label: str
callback: str # callback_data для кнопки
description: str = ""
icon: str = ""
children: List["MenuItem"] = field(default_factory=list)
command: Optional[str] = None # CLI команда для выполнения
is_command: bool = False
class MenuBuilder:
"""Построитель многоуровневого меню."""
def __init__(self):
self._menus: Dict[str, List[MenuItem]] = {}
def add_menu(self, menu_name: str, items: List[MenuItem]):
self._menus[menu_name] = items
def get_menu(self, menu_name: str) -> List[MenuItem]:
return self._menus.get(menu_name, [])
def get_keyboard(self, menu_name: str, user_id: int = None, state: UserState = None) -> InlineKeyboardMarkup:
"""Создает InlineKeyboard для меню."""
items = self._menus.get(menu_name, [])
keyboard = []
# Для главного меню — динамически меняем кнопку ИИ
if menu_name == "main" and user_id:
# Используем переданное состояние или получаем из менеджера
if state is None:
state = state_manager.get(user_id)
logger.info(f"get_keyboard: user_id={user_id}, ai_chat_mode={state.ai_chat_mode}")
for item in items:
# Проверяем базовый callback и его варианты с _on/_off
is_ai_toggle = item.callback in ["toggle_ai_chat", "toggle_ai_chat_on", "toggle_ai_chat_off"]
if is_ai_toggle:
# Меняем текст кнопки и callback_data в зависимости от статуса
if state.ai_chat_mode:
label = f"✅ Выключить чат с ИИ"
callback = "toggle_ai_chat_off"
else:
label = f"❌ Включить чат с ИИ"
callback = "toggle_ai_chat_on"
logger.info(f"get_keyboard: label={label}, callback={callback}")
button = InlineKeyboardButton(label, callback_data=callback)
else:
button = InlineKeyboardButton(item.label, callback_data=item.callback)
keyboard.append([button])
else:
for item in items:
button = InlineKeyboardButton(
item.label,
callback_data=item.callback
)
keyboard.append([button])
return InlineKeyboardMarkup(keyboard)
class CommandRegistry:
"""Реестр команд для легкого добавления."""
def __init__(self):
self._commands: Dict[str, Callable] = {}
def register(self, name: str):
"""Декоратор для регистрации команды."""
def decorator(func: Callable):
self._commands[name] = func
return func
return decorator
def get(self, name: str) -> Optional[Callable]:
return self._commands.get(name)
def list_commands(self) -> List[str]:
return list(self._commands.keys())
# --- Глобальные объекты ---
config = BotConfig()
state_manager = StateManager()
menu_builder = MenuBuilder()
command_registry = CommandRegistry()
server_manager = ServerManager()
# --- Проверка прав доступа ---
def check_access(func):
"""Декоратор для проверки прав доступа пользователя."""
@wraps(func)
async def wrapper(update: Update, context: ContextTypes.DEFAULT_TYPE, *args, **kwargs):
user_id = update.effective_user.id
# Если доступ не ограничен — пропускаем всех
if not config.is_access_restricted:
return await func(update, context, *args, **kwargs)
if user_id not in config.allowed_users:
logger.warning(f"Попытка доступа от запрещённого пользователя {user_id}")
await update.message.reply_text(
"❌ *Доступ запрещён*\n\n"
"Ваш ID не добавлен в список разрешённых пользователей.\n"
f"Ваш ID: `{user_id}`",
parse_mode="Markdown"
)
return
return await func(update, context, *args, **kwargs)
return wrapper
# --- Инициализация меню ---
def init_menus():
"""Инициализация структуры меню."""

22
bot/__init__.py Normal file
View File

@ -0,0 +1,22 @@
#!/usr/bin/env python3
"""
Telegram CLI Bot - модульная структура.
Пакет bot содержит все компоненты бота:
- models: модели данных (Server, UserState, сессии)
- handlers: обработчики событий (команды, сообщения, callback)
- services: бизнес-логика (выполнение команд)
- keyboards: Inline-клавиатуры
- utils: утилиты (очистка текста, форматирование)
- config: конфигурация и глобальные объекты
"""
from bot.config import config, state_manager, menu_builder, command_registry, server_manager
__all__ = [
"config",
"state_manager",
"menu_builder",
"command_registry",
"server_manager",
]

54
bot/config.py Normal file
View File

@ -0,0 +1,54 @@
#!/usr/bin/env python3
"""Конфигурация бота и глобальные объекты."""
import os
import logging
import getpass
from pathlib import Path
from typing import Optional
from dotenv import load_dotenv
# Загрузка переменных окружения из .env
load_dotenv()
logger = logging.getLogger(__name__)
# --- Конфигурация бота ---
class BotConfig:
"""Конфигурация бота из переменных окружения."""
def __init__(self):
self.name = os.getenv("BOT_NAME", "CLI Assistant")
self.description = os.getenv("BOT_DESCRIPTION", "Бот для выполнения CLI команд")
self.icon = os.getenv("BOT_ICON_EMOJI", "🤖")
self.working_directory = os.getenv("WORKING_DIRECTORY", str(Path.home()))
# Парсинг списка разрешённых пользователей
allowed_users_str = os.getenv("ALLOWED_USERS", "")
if allowed_users_str.strip():
self.allowed_users = [
int(uid.strip())
for uid in allowed_users_str.split(",")
if uid.strip().isdigit()
]
else:
self.allowed_users = []
@property
def is_access_restricted(self) -> bool:
"""Проверка: ограничен ли доступ."""
return len(self.allowed_users) > 0
# Импортируем модели и создаём глобальные объекты
from bot.models.server import ServerManager
from bot.models.user_state import StateManager
from bot.keyboards.menus import MenuBuilder, CommandRegistry
# Глобальные объекты
config = BotConfig()
state_manager = StateManager()
menu_builder = MenuBuilder()
command_registry = CommandRegistry()
server_manager = ServerManager()

0
bot/handlers/__init__.py Normal file
View File

11
bot/keyboards/__init__.py Normal file
View File

@ -0,0 +1,11 @@
#!/usr/bin/env python3
"""Клавиатуры бота."""
from bot.keyboards.menus import MenuItem, MenuBuilder, CommandRegistry, init_menus
__all__ = [
"MenuItem",
"MenuBuilder",
"CommandRegistry",
"init_menus",
]

200
bot/keyboards/menus.py Normal file
View File

@ -0,0 +1,200 @@
#!/usr/bin/env python3
"""Построитель многоуровневого меню."""
import logging
from typing import Dict, List, Optional, Callable
from dataclasses import dataclass, field
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
# Импортируем модели и утилиты
from bot.models.user_state import UserState, StateManager
logger = logging.getLogger(__name__)
# Глобальный state_manager для кнопки ИИ
state_manager = StateManager()
@dataclass
class MenuItem:
"""Элемент меню."""
label: str
callback: str # callback_data для кнопки
description: str = ""
icon: str = ""
children: List["MenuItem"] = field(default_factory=list)
command: Optional[str] = None # CLI команда для выполнения
is_command: bool = False
class MenuBuilder:
"""Построитель InlineKeyboard для меню."""
def __init__(self):
self._menus: Dict[str, List[MenuItem]] = {}
def add_menu(self, menu_name: str, items: List[MenuItem]):
self._menus[menu_name] = items
def get_menu(self, menu_name: str) -> List[MenuItem]:
return self._menus.get(menu_name, [])
def get_keyboard(self, menu_name: str, user_id: int = None, state: UserState = None) -> InlineKeyboardMarkup:
"""Создает InlineKeyboard для меню."""
items = self._menus.get(menu_name, [])
keyboard = []
# Для главного меню — динамически меняем кнопку ИИ
if menu_name == "main" and user_id:
# Используем переданное состояние или получаем из менеджера
if state is None:
state = state_manager.get(user_id)
logger.info(f"get_keyboard: user_id={user_id}, ai_chat_mode={state.ai_chat_mode}")
for item in items:
# Проверяем базовый callback и его варианты с _on/_off
is_ai_toggle = item.callback in ["toggle_ai_chat", "toggle_ai_chat_on", "toggle_ai_chat_off"]
if is_ai_toggle:
# Меняем текст кнопки и callback_data в зависимости от статуса
if state.ai_chat_mode:
label = f"✅ Выключить чат с ИИ"
callback = "toggle_ai_chat_off"
else:
label = f"❌ Включить чат с ИИ"
callback = "toggle_ai_chat_on"
logger.info(f"get_keyboard: label={label}, callback={callback}")
button = InlineKeyboardButton(label, callback_data=callback)
else:
button = InlineKeyboardButton(item.label, callback_data=item.callback)
keyboard.append([button])
else:
for item in items:
button = InlineKeyboardButton(
item.label,
callback_data=item.callback
)
keyboard.append([button])
return InlineKeyboardMarkup(keyboard)
class CommandRegistry:
"""Реестр команд для легкого добавления."""
def __init__(self):
self._commands: Dict[str, Callable] = {}
def register(self, name: str):
"""Декоратор для регистрации команды."""
def decorator(func: Callable):
self._commands[name] = func
return func
return decorator
def get(self, name: str) -> Optional[Callable]:
return self._commands.get(name)
def list_commands(self) -> List[str]:
return list(self._commands.keys())
def init_menus(menu_builder: MenuBuilder):
"""Инициализация структуры меню."""
# Главное меню
main_menu = [
MenuItem("🖥️ Выбор сервера", "server_menu", icon="🖥️"),
MenuItem("📋 Предустановленные команды", "preset_menu", icon="📋"),
MenuItem("💬 Чат с ИИ агентом", "toggle_ai_chat", icon="💬"),
MenuItem("⚙️ Настройки бота", "settings_menu", icon="⚙️"),
MenuItem(" О боте", "about", icon=""),
]
menu_builder.add_menu("main", main_menu)
# Меню серверов
server_menu = [
MenuItem("💻 local (localhost)", "server_select_local", icon="💻"),
MenuItem(" Добавить сервер", "server_add", icon=""),
MenuItem("⬅️ Назад", "main", icon="⬅️"),
]
menu_builder.add_menu("server", server_menu)
# Меню предустановленных команд
preset_menu = [
MenuItem("📁 Файловая система", "fs_menu", icon="📁"),
MenuItem("🔍 Поиск", "search_menu", icon="🔍"),
MenuItem("📊 Система", "system_menu", icon="📊"),
MenuItem("🌐 Сеть", "network_menu", icon="🌐"),
MenuItem("⬅️ Назад", "main", icon="⬅️"),
]
menu_builder.add_menu("preset", preset_menu)
# Файловая система
fs_menu = [
MenuItem("ls -la", "cmd_ls_la", command="ls -la", icon="📄"),
MenuItem("pwd", "cmd_pwd", command="pwd", icon="📍"),
MenuItem("df -h", "cmd_df", command="df -h", icon="💾"),
MenuItem("du -sh *", "cmd_du", command="du -sh * 2>/dev/null | sort -hr | head -20", icon="📊"),
MenuItem("⬅️ Назад", "preset", icon="⬅️"),
]
menu_builder.add_menu("fs", fs_menu)
# Поиск
search_menu = [
MenuItem("find . -name", "cmd_find_name", command="find . -maxdepth 3 -name '*.txt' 2>/dev/null", icon="🔎"),
MenuItem("grep пример", "cmd_grep", command="grep -r 'example' . 2>/dev/null | head -20", icon="🔍"),
MenuItem("which command", "cmd_which", command="which python3 bash git", icon="📍"),
MenuItem("⬅️ Назад", "preset", icon="⬅️"),
]
menu_builder.add_menu("search", search_menu)
# Система
system_menu = [
MenuItem("top -n 1", "cmd_top", command="top -bn1 | head -20", icon="📈"),
MenuItem("ps aux", "cmd_ps", command="ps aux | head -20", icon="🔄"),
MenuItem("free -h", "cmd_free", command="free -h", icon="💾"),
MenuItem("uname -a", "cmd_uname", command="uname -a", icon=""),
MenuItem("uptime", "cmd_uptime", command="uptime", icon="⏱️"),
MenuItem("⬅️ Назад", "preset", icon="⬅️"),
]
menu_builder.add_menu("system", system_menu)
# Сеть
network_menu = [
MenuItem("ip addr", "cmd_ip", command="ip addr 2>/dev/null || ifconfig 2>/dev/null", icon="🌐"),
MenuItem("ping google", "cmd_ping", command="ping -c 4 google.com 2>&1 | head -10", icon="📡"),
MenuItem("netstat", "cmd_netstat", command="ss -tuln 2>/dev/null || netstat -tuln 2>/dev/null | head -20", icon="🔌"),
MenuItem("curl ifconfig.me", "cmd_curl_ip", command="curl -s ifconfig.me 2>&1", icon="📍"),
MenuItem("⬅️ Назад", "preset", icon="⬅️"),
]
menu_builder.add_menu("network", network_menu)
# Настройки
settings_menu = [
MenuItem("📝 Изменить имя бота", "set_name", icon="📝"),
MenuItem("📄 Изменить описание", "set_description", icon="📄"),
MenuItem("🎨 Изменить иконку", "set_icon", icon="🎨"),
MenuItem("👥 Управление доступом", "access_menu", icon="👥"),
MenuItem("🧠 Память ИИ", "memory_menu", icon="🧠"),
MenuItem("⬅️ Назад", "main", icon="⬅️"),
]
menu_builder.add_menu("settings", settings_menu)
# Память ИИ
memory_menu = [
MenuItem("📋 Мой профиль", "memory_profile", icon="📋"),
MenuItem("📊 Статистика", "memory_stats", icon="📊"),
MenuItem("🗑️ Очистить историю", "memory_clear", icon="🗑️"),
MenuItem("⬅️ Назад", "settings", icon="⬅️"),
]
menu_builder.add_menu("memory", memory_menu)
# Доступ
access_menu = [
MenuItem("📋 Показать разрешённых", "show_access", icon="📋"),
MenuItem(" Добавить пользователя", "add_access", icon=""),
MenuItem(" Удалить пользователя", "remove_access", icon=""),
MenuItem("⬅️ Назад", "settings", icon="⬅️"),
]
menu_builder.add_menu("access", access_menu)

24
bot/models/__init__.py Normal file
View File

@ -0,0 +1,24 @@
#!/usr/bin/env python3
"""Модели данных бота."""
from bot.models.server import Server, ServerManager
from bot.models.user_state import UserState, StateManager
from bot.models.session import (
SSHSession,
SSHSessionManager,
LocalSession,
LocalSessionManager,
INPUT_PATTERNS
)
__all__ = [
"Server",
"ServerManager",
"UserState",
"StateManager",
"SSHSession",
"SSHSessionManager",
"LocalSession",
"LocalSessionManager",
"INPUT_PATTERNS",
]

216
bot/models/server.py Normal file
View File

@ -0,0 +1,216 @@
#!/usr/bin/env python3
"""Модели серверов и управление ими."""
import os
import logging
import getpass
from pathlib import Path
from typing import Dict, List, Optional
from dataclasses import dataclass, field
from dotenv import load_dotenv
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
logger = logging.getLogger(__name__)
@dataclass
class Server:
"""Конфигурация сервера."""
name: str
host: str
port: int
user: str
tags: List[str] = field(default_factory=list)
password: str = ""
@property
def display_name(self) -> str:
"""Отображаемое имя с иконкой."""
icon = "🖥️"
if "local" in self.tags:
icon = "💻"
elif "db" in self.tags:
icon = "🗄️"
elif "web" in self.tags:
icon = "🌐"
return f"{icon} {self.name}"
@property
def description(self) -> str:
"""Краткое описание сервера."""
return f"{self.user}@{self.host}:{self.port}"
class ServerManager:
"""Управление серверами."""
def __init__(self):
self._servers: Dict[str, Server] = {}
self._default_server: str = "local"
self._ssh_key_path: Optional[str] = None
# Локальный сервер всегда доступен
try:
local_user = getpass.getuser()
except Exception:
local_user = "user"
self._servers["local"] = Server(
name="local",
host="localhost",
port=22,
user=local_user,
tags=["local", "dev"]
)
def load_from_env(self):
"""Загрузка серверов из переменных окружения."""
self._ssh_key_path = os.getenv("SSH_KEY_PATH")
self._default_server = os.getenv("DEFAULT_SERVER", "local")
servers_str = os.getenv("SERVERS", "")
if not servers_str.strip():
return
# Парсинг формата: name|host|port|user|tags|password,name|host|port|user|tags|password
for server_line in servers_str.split(","):
if not server_line.strip():
continue
parts = server_line.strip().split("|")
if len(parts) < 4:
continue
try:
name = parts[0].strip()
host = parts[1].strip()
port = int(parts[2].strip())
user = parts[3].strip()
# Теги (часть 4) и пароль (часть 5) могут отсутствовать
tags = []
password = ""
if len(parts) >= 5 and parts[4].strip():
tags = [t.strip() for t in parts[4].split(",") if t.strip()]
if len(parts) >= 6:
password = parts[5].strip()
server = Server(name=name, host=host, port=port, user=user, tags=tags, password=password)
self._servers[name] = server
logger.info(f"Загружен сервер: {server.display_name} ({server.description})")
except ValueError as e:
logger.warning(f"Ошибка парсинга сервера: {parts} - {e}")
def get(self, name: str) -> Optional[Server]:
"""Получить сервер по имени."""
return self._servers.get(name)
def list_servers(self) -> List[Server]:
"""Список всех серверов."""
return list(self._servers.values())
def get_by_tags(self, tags: List[str]) -> List[Server]:
"""Получить серверы по тегам."""
result = []
for server in self._servers.values():
if any(tag in server.tags for tag in tags):
result.append(server)
return result
@property
def default_server(self) -> str:
"""Имя сервера по умолчанию."""
return self._default_server
@property
def ssh_key_path(self) -> Optional[str]:
"""Путь к SSH ключу."""
return self._ssh_key_path
def get_keyboard(self, exclude_local: bool = False) -> InlineKeyboardMarkup:
"""Создать клавиатуру с выбором сервера."""
keyboard = []
for server in self._servers.values():
if exclude_local and server.name == "local":
continue
button = InlineKeyboardButton(
server.display_name,
callback_data=f"server_select_{server.name}"
)
keyboard.append([button])
return InlineKeyboardMarkup(keyboard)
def add_server(self, name: str, host: str, port: int, user: str, tags: List[str] = None, password: str = "") -> bool:
"""Добавить сервер."""
if name in self._servers:
return False
self._servers[name] = Server(name=name, host=host, port=port, user=user, tags=tags or [], password=password)
self.save_to_env()
return True
def update_server(self, name: str, host: str = None, port: int = None,
user: str = None, tags: List[str] = None, password: str = None) -> bool:
"""Обновить сервер."""
if name not in self._servers or name == "local":
return False
server = self._servers[name]
if host:
server.host = host
if port:
server.port = port
if user:
server.user = user
if tags is not None:
server.tags = tags
if password is not None:
server.password = password
self.save_to_env()
return True
def delete_server(self, name: str) -> bool:
"""Удалить сервер."""
if name not in self._servers or name == "local":
return False
del self._servers[name]
self.save_to_env()
return True
def save_to_env(self):
"""Сохранить серверы в .env файл."""
env_file = Path(__file__).parent.parent.parent / ".env"
# Читаем существующий файл
lines = []
if env_file.exists():
with open(env_file, "r", encoding="utf-8") as f:
lines = f.readlines()
# Формируем строку серверов
server_parts = []
for server in self._servers.values():
if server.name == "local":
continue
tags_str = ",".join(server.tags) if server.tags else ""
# Формат: name|host|port|user|tags|password
server_parts.append(f"{server.name}|{server.host}|{server.port}|{server.user}|{tags_str}|{server.password}")
servers_line = f"SERVERS={','.join(server_parts)}\n"
# Ищем и обновляем или добавляем строку SERVERS
found = False
for i, line in enumerate(lines):
if line.startswith("SERVERS="):
lines[i] = servers_line
found = True
break
if not found:
lines.append("\n" + servers_line)
# Записываем обратно
with open(env_file, "w", encoding="utf-8") as f:
f.writelines(lines)
logger.debug(f"Серверы сохранены в {env_file}")

185
bot/models/session.py Normal file
View File

@ -0,0 +1,185 @@
#!/usr/bin/env python3
"""Модели интерактивных сессий (SSH и локальные)."""
import os
import logging
from typing import Dict, Optional
from dataclasses import dataclass, field
from datetime import datetime, timedelta
import asyncssh
logger = logging.getLogger(__name__)
# Импортируем Server из соседнего модуля
from bot.models.server import Server
@dataclass
class SSHSession:
"""Интерактивная SSH-сессия."""
user_id: int
server: Server
working_dir: str
conn: asyncssh.SSHClientConnection
process: asyncssh.SSHClientProcess
output_buffer: str = ""
waiting_for_input: bool = False
input_type: str = "" # "password", "confirm", "text"
last_activity: datetime = field(default_factory=datetime.now)
command: str = ""
SESSION_TIMEOUT = timedelta(minutes=5) # Таймаут неактивности
def is_expired(self) -> bool:
"""Проверка истечения таймаута сессии."""
return datetime.now() - self.last_activity > self.SESSION_TIMEOUT
class SSHSessionManager:
"""Менеджер интерактивных SSH-сессий."""
def __init__(self):
self._sessions: Dict[int, SSHSession] = {}
def create_session(self, user_id: int, server: Server, working_dir: str,
conn: asyncssh.SSHClientConnection, process: asyncssh.SSHClientProcess,
command: str = "") -> SSHSession:
"""Создать новую сессию."""
session = SSHSession(
user_id=user_id,
server=server,
working_dir=working_dir,
conn=conn,
process=process,
command=command
)
self._sessions[user_id] = session
logger.info(f"Создана SSH-сессия для пользователя {user_id} на сервере {server.name}")
return session
def get_session(self, user_id: int) -> Optional[SSHSession]:
"""Получить сессию пользователя."""
session = self._sessions.get(user_id)
if session and session.is_expired():
self.close_session(user_id)
return None
return session
def close_session(self, user_id: int):
"""Закрыть сессию пользователя."""
session = self._sessions.pop(user_id, None)
if session:
try:
if session.process:
session.process.stdin.close()
session.process.stdout.feed_eof()
if session.conn:
session.conn.close()
logger.info(f"Закрыта SSH-сессия для пользователя {user_id}")
except Exception as e:
logger.warning(f"Ошибка при закрытии сессии: {e}")
def has_active_session(self, user_id: int) -> bool:
"""Проверка наличия активной сессии."""
return self.get_session(user_id) is not None
def cleanup_expired(self):
"""Очистка истёкших сессий."""
expired = [uid for uid, s in self._sessions.items() if s.is_expired()]
for uid in expired:
self.close_session(uid)
@dataclass
class LocalSession:
"""Интерактивная локальная сессия."""
user_id: int
command: str
master_fd: int
pid: int
output_buffer: str = ""
waiting_for_input: bool = False
input_type: str = ""
last_activity: datetime = field(default_factory=datetime.now)
context: Dict = field(default_factory=dict) # Для хранения pexpect child и другого
SESSION_TIMEOUT = timedelta(minutes=5)
def is_expired(self) -> bool:
return datetime.now() - self.last_activity > self.SESSION_TIMEOUT
class LocalSessionManager:
"""Менеджер локальных интерактивных сессий."""
def __init__(self):
self._sessions: Dict[int, LocalSession] = {}
def create_session(self, user_id: int, command: str, master_fd: int, pid: int) -> LocalSession:
session = LocalSession(
user_id=user_id,
command=command,
master_fd=master_fd,
pid=pid
)
self._sessions[user_id] = session
logger.info(f"Создана локальная сессия для пользователя {user_id}")
return session
def get_session(self, user_id: int) -> Optional[LocalSession]:
session = self._sessions.get(user_id)
if session and session.is_expired():
self.close_session(user_id)
return None
return session
def close_session(self, user_id: int):
session = self._sessions.pop(user_id, None)
if session:
try:
# Закрываем pexpect процесс если есть
child = session.context.get('child') if session.context else None
if child:
child.close(force=True)
else:
# Старый способ для PTY
os.close(session.master_fd)
os.kill(session.pid, 9)
except:
pass
logger.info(f"Закрыта локальная сессия для пользователя {user_id}")
def has_active_session(self, user_id: int) -> bool:
return self.get_session(user_id) is not None
# Паттерны для детектирования запросов ввода
INPUT_PATTERNS = {
"password": [
r"[Pp]assword[:\s]*$",
r"[Pp]assphrase[:\s]*$",
r"Enter password[:\s]*$",
r"sudo password[:\s]*$",
r"\[sudo\] password for .*:",
r"[Пп]ароль[:\s]*$",
r"\[sudo\] пароль для .*:",
r"Введите пароль[:\s]*$",
],
"confirm": [
r"[Yy]es/[Nn]o[?:\s]*$",
r"\[?[Yy]\]?/?\[?[Nn]\]?",
r"Do you want to continue",
r"Continue\?",
r"Are you sure",
r"Is this OK",
r"[Yy]es or [Nn]o",
r"[Дд]а/[Нн]ет",
r"[Пп]родолжить",
],
"shell_prompt": [
r"[$#]\s*$",
r"[>$]\s*$",
r"[a-zA-Z0-9_-]+@[a-zA-Z0-9_-]+:.*[$#]\s*$",
],
}

36
bot/models/user_state.py Normal file
View File

@ -0,0 +1,36 @@
#!/usr/bin/env python3
"""Модели состояния пользователя."""
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, field
@dataclass
class UserState:
"""Состояние пользователя в диалоге."""
current_menu: str = "main"
waiting_for_input: bool = False
input_type: Optional[str] = None # "name", "host", "port", "user", "tags", "server_action"
parent_menu: Optional[str] = None
context: Dict[str, Any] = field(default_factory=dict)
working_directory: Optional[str] = None
current_server: str = "local" # Имя текущего сервера
editing_server: Optional[str] = None # Имя сервера, который редактируем
ai_chat_mode: bool = False # Режим чата с ИИ агентом
ai_chat_history: List[str] = field(default_factory=list) # История диалога с ИИ
messages_since_fact_extract: int = 0 # Счётчик для извлечения фактов
class StateManager:
"""Управление состояниями пользователей."""
def __init__(self):
self._states: Dict[int, UserState] = {}
def get(self, user_id: int) -> UserState:
if user_id not in self._states:
self._states[user_id] = UserState()
return self._states[user_id]
def reset(self, user_id: int):
self._states[user_id] = UserState()

5
bot/services/__init__.py Normal file
View File

@ -0,0 +1,5 @@
#!/usr/bin/env python3
"""Сервисы бота."""
# Заглушка для будущего импорта
# Функции будут перенесены из bot.py постепенно

View File

@ -0,0 +1,5 @@
#!/usr/bin/env python3
"""Сервисы бота - бизнес-логика выполнения команд."""
# Этот файл будет постепенно заполняться функциями из bot.py
# Пока импортируем всё из старого bot.py для обратной совместимости

21
bot/utils/__init__.py Normal file
View File

@ -0,0 +1,21 @@
#!/usr/bin/env python3
"""Утилиты бота."""
from bot.utils.cleaners import clean_ansi_codes, normalize_output
from bot.utils.formatters import escape_markdown, split_message, send_long_message, format_long_output, MAX_MESSAGE_LENGTH
from bot.utils.decorators import check_access
from bot.utils.ssh_readers import detect_input_type, read_ssh_output, read_pty_output
__all__ = [
"clean_ansi_codes",
"normalize_output",
"escape_markdown",
"split_message",
"send_long_message",
"format_long_output",
"MAX_MESSAGE_LENGTH",
"check_access",
"detect_input_type",
"read_ssh_output",
"read_pty_output",
]

81
bot/utils/cleaners.py Normal file
View File

@ -0,0 +1,81 @@
#!/usr/bin/env python3
"""Утилиты для очистки текста (ANSI-коды, нормализация)."""
import re
def clean_ansi_codes(text: str) -> str:
"""
Очистка ANSI-кодов и мусора из вывода терминала.
Обрабатывает:
- ANSI escape-последовательности \x1b[...m
- «Битые» ANSI-коды без escape-символа (например [33m, [0m)
- Символы замены Unicode ()
- Кириллические имитации ANSI-кодов (например [0м)
"""
# Удаляем ANSI escape-последовательности
text = re.sub(r'\x1b\[[0-9;?]*[a-zA-Z]', '', text)
# Удаляем «битые» ANSI-коды: [33m, [0m, [1m и т.д. (латиница и кириллица)
text = re.sub(r'\[\d+[мm]', '', text)
# Удаляем символы замены Unicode
text = text.replace('\ufffd', '')
return text
def normalize_output(text: str) -> str:
"""
Нормализовать вывод: обработать \r и убрать пустые строки.
\r используется для перезаписи строки (прогресс-баров).
"""
# Заменяем \r\n на \n
text = text.replace('\r\n', '\n')
# Обрабатываем \r (возврат каретки) — строки с \r перезаписывают друг друга
lines = []
for line in text.split('\n'):
if '\r' in line:
# Разбиваем по \r и берём последнюю часть (финальное состояние)
parts = line.split('\r')
line = parts[-1]
lines.append(line)
text = '\n'.join(lines)
# Разбиваем на строки, убираем пустые и trailing пробелы
lines = text.split('\n')
lines = [line.rstrip() for line in lines if line.strip()]
# Очищаем прогресс-бары вида "Текст… 0%Текст… 50%Текст… 100%"
# И дублирующийся текст
cleaned_lines = []
for line in lines:
# Ищем повторяющийся паттерн "текст… цифры%"
progress_pattern = re.compile(r'((?:.+?\.{3})\d+%)+')
match = progress_pattern.search(line)
if match:
# Берём последнее вхождение
items = re.findall(r'(.+?\.{3})(\d+)%', match.group(0))
if items:
last_text, last_percent = items[-1]
line = line[:match.start()] + f'{last_text}{last_percent}%' + line[match.end():]
# СНАЧАЛА удаляем остатки ANSI-кодов из строки
line = re.sub(r'.', '', line) # + любой символ
# Удаляем дублирующийся текст вида "0% [текст] 0% [текст]"
dup_pattern = re.compile(r'(\d+%\s*\[.+?\])(?:\s*\d+%\s*\[.+?\])+')
match = dup_pattern.search(line)
if match:
# Оставляем только первое вхождение
line = line[:match.start()] + match.group(1) + line[match.end():]
# Удаляем ведущие пробелы (артефакты терминала)
line = line.lstrip()
if line:
cleaned_lines.append(line)
return '\n'.join(cleaned_lines)

33
bot/utils/decorators.py Normal file
View File

@ -0,0 +1,33 @@
#!/usr/bin/env python3
"""Декораторы для бота."""
import logging
from functools import wraps
from telegram import Update
from telegram.ext import ContextTypes
logger = logging.getLogger(__name__)
def check_access(func):
"""Декоратор для проверки прав доступа пользователя."""
@wraps(func)
async def wrapper(update: Update, context: ContextTypes.DEFAULT_TYPE, *args, **kwargs):
user_id = update.effective_user.id
# Если доступ не ограничен — пропускаем всех
if not config.is_access_restricted:
return await func(update, context, *args, **kwargs)
if user_id not in config.allowed_users:
logger.warning(f"Попытка доступа от запрещённого пользователя {user_id}")
await update.message.reply_text(
"❌ *Доступ запрещён*\n\n"
"Ваш ID не добавлен в список разрешённых пользователей.\n"
f"Ваш ID: `{user_id}`",
parse_mode="Markdown"
)
return
return await func(update, context, *args, **kwargs)
return wrapper

103
bot/utils/formatters.py Normal file
View File

@ -0,0 +1,103 @@
#!/usr/bin/env python3
"""Утилиты для форматирования и отправки сообщений."""
import asyncio
import logging
from typing import List
from telegram import Update
logger = logging.getLogger(__name__)
# Лимиты Telegram
MAX_MESSAGE_LENGTH = 4096 # Максимальная длина сообщения
def escape_markdown(text: str) -> str:
"""
Экранирование специальных символов Markdown для Telegram API.
"""
text = text.replace('```', '\\`\\`\\`')
return text
def split_message(text: str, max_length: int = MAX_MESSAGE_LENGTH) -> List[str]:
"""
Разбить длинный текст на сообщения <= max_length символов.
Старается разбивать по границам строк или блоков кода.
"""
if len(text) <= max_length:
return [text]
parts = []
current = ""
for line in text.split('\n'):
# Если добавление строки превысит лимит
if len(current) + len(line) + 1 > max_length:
if current:
parts.append(current)
# Если строка сама по себе длиннее лимита — режем её
while len(line) > max_length:
parts.append(line[:max_length])
line = line[max_length:]
current = line
else:
current += ('\n' if current else '') + line
if current:
parts.append(current)
return parts
async def send_long_message(update: Update, text: str, parse_mode: str = None):
"""
Отправить длинный текст, разбив на несколько сообщений.
Если parse_mode="Markdown" и текст содержит блоки кода отправляет без разметки.
"""
parts = split_message(text)
for i, part in enumerate(parts):
# Добавляем номер части если их несколько
if len(parts) > 1:
header = f"({i+1}/{len(parts)}) "
if len(header) + len(part) <= MAX_MESSAGE_LENGTH:
part = header + part
# Если это не первая часть и был Markdown — убираем parse_mode
# чтобы не было проблем с разорванной разметкой
actual_parse_mode = parse_mode if i == 0 else None
try:
await update.message.reply_text(part, parse_mode=actual_parse_mode)
except Exception as e:
# Фоллбэк: отправляем без разметки
logger.debug(f"Ошибка Markdown, отправляем без разметки: {e}")
await update.message.reply_text(part)
# Небольшая пауза между сообщениями
await asyncio.sleep(0.1)
def format_long_output(text: str, max_lines: int = 20, head_lines: int = 10, tail_lines: int = 10) -> str:
"""
Форматировать длинный вывод: показать первые и последние строки.
По умолчанию: первые 10 + последние 10 строк = 20 строк максимум.
"""
lines = text.split('\n')
total_lines = len(lines)
if total_lines <= max_lines:
return text
# Показываем первые head_lines и последние tail_lines
head = lines[:head_lines]
tail = lines[-tail_lines:]
skipped = total_lines - head_lines - tail_lines
result = '\n'.join(head)
result += f'\n\n... ({skipped} строк пропущено) ...\n'
result += '\n'.join(tail)
return result

169
bot/utils/ssh_readers.py Normal file
View File

@ -0,0 +1,169 @@
#!/usr/bin/env python3
"""Утилиты для чтения вывода SSH и PTY."""
import asyncio
import fcntl
import logging
import os
import re
import select
from typing import Optional, Tuple
import asyncssh
logger = logging.getLogger(__name__)
# Импортируем паттерны из session
from bot.models.session import INPUT_PATTERNS
def detect_input_type(text: str) -> Optional[str]:
"""Определить тип запроса ввода по тексту."""
text = text.strip()
# Проверка на пароль
for pattern in INPUT_PATTERNS["password"]:
if re.search(pattern, text, re.MULTILINE):
return "password"
# Проверка на подтверждение
for pattern in INPUT_PATTERNS["confirm"]:
if re.search(pattern, text, re.MULTILINE):
return "confirm"
# Проверка на приглашение оболочки
for pattern in INPUT_PATTERNS["shell_prompt"]:
if re.search(pattern, text, re.MULTILINE):
return "prompt"
return None
async def read_ssh_output(process: asyncssh.SSHClientProcess, timeout: float = 2.0) -> Tuple[str, bool]:
"""
Чтение вывода из SSH-процесса с таймаутом.
Возвращает (вывод, завершён_ли_процесс).
"""
output = ""
is_done = False
try:
# Используем readany() для чтения доступных данных
while True:
try:
# readany() читает любые доступные данные
data = await asyncio.wait_for(process.stdout.readany(), timeout=timeout)
if data:
if isinstance(data, bytes):
output += data.decode('utf-8', errors='replace')
else:
output += str(data)
logger.debug(f"Прочитано stdout: {len(data)} байт, всего: {len(output)}")
else:
# EOF
is_done = True
break
except asyncio.TimeoutError:
# Данные закончились
logger.debug(f"Timeout stdout, прочитано: {len(output)} байт")
if process.returncode is not None:
is_done = True
break
except UnicodeDecodeError as e:
logger.debug(f"Ошибка декодирования UTF-8: {e}")
continue
except Exception as e:
# Конец потока
logger.debug(f"Конец потока stdout: {type(e).__name__}: {e}")
is_done = True
break
except Exception as e:
logger.debug(f"Ошибка чтения SSH stdout: {e}")
is_done = True
# Читаем stderr если есть
error_output = ""
try:
while True:
try:
data = await asyncio.wait_for(process.stderr.readany(), timeout=0.5)
if data:
if isinstance(data, bytes):
error_output += data.decode('utf-8', errors='replace')
else:
error_output += str(data)
else:
break
except (asyncio.TimeoutError, Exception):
break
except Exception as e:
logger.debug(f"Ошибка чтения SSH stderr: {e}")
# Объединяем stdout и stderr
if error_output:
output = output + error_output if output else error_output
logger.debug(f"read_ssh_output: output={len(output)} байт, is_done={is_done}, returncode={process.returncode}")
return output, is_done
def read_pty_output(master_fd: int, timeout: float = 2.0) -> Tuple[str, bool]:
"""
Чтение вывода из PTY с таймаутом.
Возвращает (вывод, завершён_ли_процесс).
"""
output = ""
is_done = False
total_waited = 0
try:
# Устанавливаем non-blocking режим
flags = fcntl.fcntl(master_fd, fcntl.F_GETFL)
fcntl.fcntl(master_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
while total_waited < timeout:
try:
# Ждём данные с коротким таймаутом
ready, _, _ = select.select([master_fd], [], [], 0.2)
if ready:
try:
data = os.read(master_fd, 4096)
if data:
output += data.decode('utf-8', errors='replace')
logger.debug(f"Прочитано из PTY: {len(data)} байт")
# Сбрасываем таймер если есть данные
total_waited = 0
else:
is_done = True
break
except BlockingIOError:
# Нет данных, продолжаем ждать
pass
else:
# Timeout - проверяем не завершился ли процесс
try:
_, status = os.waitpid(-1, os.WNOHANG)
if status != 0:
logger.debug(f"Процесс завершился со статусом: {status}")
is_done = True
break
except ChildProcessError:
pass
# Если уже что-то прочитали и есть запрос ввода - выходим
if output and detect_input_type(output):
logger.debug(f"Обнаружен запрос ввода")
break
total_waited += 0.2
except Exception as e:
logger.debug(f"Ошибка при чтении PTY: {e}")
break
except Exception as e:
logger.debug(f"Ошибка чтения PTY: {e}")
is_done = True
logger.debug(f"read_pty_output: output={len(output)} байт, is_done={is_done}")
return output, is_done