#!/usr/bin/env python3 """Модели интерактивных сессий (SSH и локальные).""" import os import logging from typing import Dict, Optional from dataclasses import dataclass, field from datetime import datetime, timedelta import asyncssh logger = logging.getLogger(__name__) # Импортируем Server из соседнего модуля from bot.models.server import Server @dataclass class SSHSession: """Интерактивная SSH-сессия.""" user_id: int server: Server working_dir: str conn: asyncssh.SSHClientConnection process: asyncssh.SSHClientProcess output_buffer: str = "" waiting_for_input: bool = False input_type: str = "" # "password", "confirm", "text" last_activity: datetime = field(default_factory=datetime.now) command: str = "" SESSION_TIMEOUT = timedelta(minutes=5) # Таймаут неактивности def is_expired(self) -> bool: """Проверка истечения таймаута сессии.""" return datetime.now() - self.last_activity > self.SESSION_TIMEOUT class SSHSessionManager: """Менеджер интерактивных SSH-сессий.""" def __init__(self): self._sessions: Dict[int, SSHSession] = {} def create_session(self, user_id: int, server: Server, working_dir: str, conn: asyncssh.SSHClientConnection, process: asyncssh.SSHClientProcess, command: str = "") -> SSHSession: """Создать новую сессию.""" session = SSHSession( user_id=user_id, server=server, working_dir=working_dir, conn=conn, process=process, command=command ) self._sessions[user_id] = session logger.info(f"Создана SSH-сессия для пользователя {user_id} на сервере {server.name}") return session def get_session(self, user_id: int) -> Optional[SSHSession]: """Получить сессию пользователя.""" session = self._sessions.get(user_id) if session and session.is_expired(): self.close_session(user_id) return None return session def close_session(self, user_id: int): """Закрыть сессию пользователя.""" session = self._sessions.pop(user_id, None) if session: try: if session.process: session.process.stdin.close() session.process.stdout.feed_eof() if session.conn: session.conn.close() logger.info(f"Закрыта SSH-сессия для пользователя {user_id}") except Exception as e: logger.warning(f"Ошибка при закрытии сессии: {e}") def has_active_session(self, user_id: int) -> bool: """Проверка наличия активной сессии.""" return self.get_session(user_id) is not None def cleanup_expired(self): """Очистка истёкших сессий.""" expired = [uid for uid, s in self._sessions.items() if s.is_expired()] for uid in expired: self.close_session(uid) @dataclass class LocalSession: """Интерактивная локальная сессия.""" user_id: int command: str master_fd: int pid: int output_buffer: str = "" waiting_for_input: bool = False input_type: str = "" last_activity: datetime = field(default_factory=datetime.now) context: Dict = field(default_factory=dict) # Для хранения pexpect child и другого SESSION_TIMEOUT = timedelta(minutes=5) def is_expired(self) -> bool: return datetime.now() - self.last_activity > self.SESSION_TIMEOUT class LocalSessionManager: """Менеджер локальных интерактивных сессий.""" def __init__(self): self._sessions: Dict[int, LocalSession] = {} def create_session(self, user_id: int, command: str, master_fd: int, pid: int) -> LocalSession: session = LocalSession( user_id=user_id, command=command, master_fd=master_fd, pid=pid ) self._sessions[user_id] = session logger.info(f"Создана локальная сессия для пользователя {user_id}") return session def get_session(self, user_id: int) -> Optional[LocalSession]: session = self._sessions.get(user_id) if session and session.is_expired(): self.close_session(user_id) return None return session def close_session(self, user_id: int): session = self._sessions.pop(user_id, None) if session: try: # Закрываем pexpect процесс если есть child = session.context.get('child') if session.context else None if child: child.close(force=True) else: # Старый способ для PTY os.close(session.master_fd) os.kill(session.pid, 9) except: pass logger.info(f"Закрыта локальная сессия для пользователя {user_id}") def has_active_session(self, user_id: int) -> bool: return self.get_session(user_id) is not None # Паттерны для детектирования запросов ввода INPUT_PATTERNS = { "password": [ r"[Pp]assword[:\s]*$", r"[Pp]assphrase[:\s]*$", r"Enter password[:\s]*$", r"sudo password[:\s]*$", r"\[sudo\] password for .*:", r"[Пп]ароль[:\s]*$", r"\[sudo\] пароль для .*:", r"Введите пароль[:\s]*$", ], "confirm": [ r"[Yy]es/[Nn]o[?:\s]*$", r"\[?[Yy]\]?/?\[?[Nn]\]?", r"Do you want to continue", r"Continue\?", r"Are you sure", r"Is this OK", r"[Yy]es or [Nn]o", r"[Дд]а/[Нн]ет", r"[Пп]родолжить", ], "shell_prompt": [ r"[$#]\s*$", r"[>$]\s*$", r"[a-zA-Z0-9_-]+@[a-zA-Z0-9_-]+:.*[$#]\s*$", ], } # Глобальные менеджеры сессий ssh_session_manager = SSHSessionManager() local_session_manager = LocalSessionManager()