telegram-cli-bot/bot/models/session.py

186 lines
6.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""Модели интерактивных сессий (SSH и локальные)."""
import os
import logging
from typing import Dict, Optional
from dataclasses import dataclass, field
from datetime import datetime, timedelta
import asyncssh
logger = logging.getLogger(__name__)
# Импортируем Server из соседнего модуля
from bot.models.server import Server
@dataclass
class SSHSession:
"""Интерактивная SSH-сессия."""
user_id: int
server: Server
working_dir: str
conn: asyncssh.SSHClientConnection
process: asyncssh.SSHClientProcess
output_buffer: str = ""
waiting_for_input: bool = False
input_type: str = "" # "password", "confirm", "text"
last_activity: datetime = field(default_factory=datetime.now)
command: str = ""
SESSION_TIMEOUT = timedelta(minutes=5) # Таймаут неактивности
def is_expired(self) -> bool:
"""Проверка истечения таймаута сессии."""
return datetime.now() - self.last_activity > self.SESSION_TIMEOUT
class SSHSessionManager:
"""Менеджер интерактивных SSH-сессий."""
def __init__(self):
self._sessions: Dict[int, SSHSession] = {}
def create_session(self, user_id: int, server: Server, working_dir: str,
conn: asyncssh.SSHClientConnection, process: asyncssh.SSHClientProcess,
command: str = "") -> SSHSession:
"""Создать новую сессию."""
session = SSHSession(
user_id=user_id,
server=server,
working_dir=working_dir,
conn=conn,
process=process,
command=command
)
self._sessions[user_id] = session
logger.info(f"Создана SSH-сессия для пользователя {user_id} на сервере {server.name}")
return session
def get_session(self, user_id: int) -> Optional[SSHSession]:
"""Получить сессию пользователя."""
session = self._sessions.get(user_id)
if session and session.is_expired():
self.close_session(user_id)
return None
return session
def close_session(self, user_id: int):
"""Закрыть сессию пользователя."""
session = self._sessions.pop(user_id, None)
if session:
try:
if session.process:
session.process.stdin.close()
session.process.stdout.feed_eof()
if session.conn:
session.conn.close()
logger.info(f"Закрыта SSH-сессия для пользователя {user_id}")
except Exception as e:
logger.warning(f"Ошибка при закрытии сессии: {e}")
def has_active_session(self, user_id: int) -> bool:
"""Проверка наличия активной сессии."""
return self.get_session(user_id) is not None
def cleanup_expired(self):
"""Очистка истёкших сессий."""
expired = [uid for uid, s in self._sessions.items() if s.is_expired()]
for uid in expired:
self.close_session(uid)
@dataclass
class LocalSession:
"""Интерактивная локальная сессия."""
user_id: int
command: str
master_fd: int
pid: int
output_buffer: str = ""
waiting_for_input: bool = False
input_type: str = ""
last_activity: datetime = field(default_factory=datetime.now)
context: Dict = field(default_factory=dict) # Для хранения pexpect child и другого
SESSION_TIMEOUT = timedelta(minutes=5)
def is_expired(self) -> bool:
return datetime.now() - self.last_activity > self.SESSION_TIMEOUT
class LocalSessionManager:
"""Менеджер локальных интерактивных сессий."""
def __init__(self):
self._sessions: Dict[int, LocalSession] = {}
def create_session(self, user_id: int, command: str, master_fd: int, pid: int) -> LocalSession:
session = LocalSession(
user_id=user_id,
command=command,
master_fd=master_fd,
pid=pid
)
self._sessions[user_id] = session
logger.info(f"Создана локальная сессия для пользователя {user_id}")
return session
def get_session(self, user_id: int) -> Optional[LocalSession]:
session = self._sessions.get(user_id)
if session and session.is_expired():
self.close_session(user_id)
return None
return session
def close_session(self, user_id: int):
session = self._sessions.pop(user_id, None)
if session:
try:
# Закрываем pexpect процесс если есть
child = session.context.get('child') if session.context else None
if child:
child.close(force=True)
else:
# Старый способ для PTY
os.close(session.master_fd)
os.kill(session.pid, 9)
except:
pass
logger.info(f"Закрыта локальная сессия для пользователя {user_id}")
def has_active_session(self, user_id: int) -> bool:
return self.get_session(user_id) is not None
# Паттерны для детектирования запросов ввода
INPUT_PATTERNS = {
"password": [
r"[Pp]assword[:\s]*$",
r"[Pp]assphrase[:\s]*$",
r"Enter password[:\s]*$",
r"sudo password[:\s]*$",
r"\[sudo\] password for .*:",
r"[Пп]ароль[:\s]*$",
r"\[sudo\] пароль для .*:",
r"Введите пароль[:\s]*$",
],
"confirm": [
r"[Yy]es/[Nn]o[?:\s]*$",
r"\[?[Yy]\]?/?\[?[Nn]\]?",
r"Do you want to continue",
r"Continue\?",
r"Are you sure",
r"Is this OK",
r"[Yy]es or [Nn]o",
r"[Дд]а/[Нн]ет",
r"[Пп]родолжить",
],
"shell_prompt": [
r"[$#]\s*$",
r"[>$]\s*$",
r"[a-zA-Z0-9_-]+@[a-zA-Z0-9_-]+:.*[$#]\s*$",
],
}