telegram-cli-bot/bot/utils/ssh_readers.py

170 lines
6.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""Утилиты для чтения вывода SSH и PTY."""
import asyncio
import fcntl
import logging
import os
import re
import select
from typing import Optional, Tuple
import asyncssh
logger = logging.getLogger(__name__)
# Импортируем паттерны из session
from bot.models.session import INPUT_PATTERNS
def detect_input_type(text: str) -> Optional[str]:
"""Определить тип запроса ввода по тексту."""
text = text.strip()
# Проверка на пароль
for pattern in INPUT_PATTERNS["password"]:
if re.search(pattern, text, re.MULTILINE):
return "password"
# Проверка на подтверждение
for pattern in INPUT_PATTERNS["confirm"]:
if re.search(pattern, text, re.MULTILINE):
return "confirm"
# Проверка на приглашение оболочки
for pattern in INPUT_PATTERNS["shell_prompt"]:
if re.search(pattern, text, re.MULTILINE):
return "prompt"
return None
async def read_ssh_output(process: asyncssh.SSHClientProcess, timeout: float = 2.0) -> Tuple[str, bool]:
"""
Чтение вывода из SSH-процесса с таймаутом.
Возвращает (вывод, завершён_ли_процесс).
"""
output = ""
is_done = False
try:
# Используем readany() для чтения доступных данных
while True:
try:
# readany() читает любые доступные данные
data = await asyncio.wait_for(process.stdout.readany(), timeout=timeout)
if data:
if isinstance(data, bytes):
output += data.decode('utf-8', errors='replace')
else:
output += str(data)
logger.debug(f"Прочитано stdout: {len(data)} байт, всего: {len(output)}")
else:
# EOF
is_done = True
break
except asyncio.TimeoutError:
# Данные закончились
logger.debug(f"Timeout stdout, прочитано: {len(output)} байт")
if process.returncode is not None:
is_done = True
break
except UnicodeDecodeError as e:
logger.debug(f"Ошибка декодирования UTF-8: {e}")
continue
except Exception as e:
# Конец потока
logger.debug(f"Конец потока stdout: {type(e).__name__}: {e}")
is_done = True
break
except Exception as e:
logger.debug(f"Ошибка чтения SSH stdout: {e}")
is_done = True
# Читаем stderr если есть
error_output = ""
try:
while True:
try:
data = await asyncio.wait_for(process.stderr.readany(), timeout=0.5)
if data:
if isinstance(data, bytes):
error_output += data.decode('utf-8', errors='replace')
else:
error_output += str(data)
else:
break
except (asyncio.TimeoutError, Exception):
break
except Exception as e:
logger.debug(f"Ошибка чтения SSH stderr: {e}")
# Объединяем stdout и stderr
if error_output:
output = output + error_output if output else error_output
logger.debug(f"read_ssh_output: output={len(output)} байт, is_done={is_done}, returncode={process.returncode}")
return output, is_done
def read_pty_output(master_fd: int, timeout: float = 2.0) -> Tuple[str, bool]:
"""
Чтение вывода из PTY с таймаутом.
Возвращает (вывод, завершён_ли_процесс).
"""
output = ""
is_done = False
total_waited = 0
try:
# Устанавливаем non-blocking режим
flags = fcntl.fcntl(master_fd, fcntl.F_GETFL)
fcntl.fcntl(master_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
while total_waited < timeout:
try:
# Ждём данные с коротким таймаутом
ready, _, _ = select.select([master_fd], [], [], 0.2)
if ready:
try:
data = os.read(master_fd, 4096)
if data:
output += data.decode('utf-8', errors='replace')
logger.debug(f"Прочитано из PTY: {len(data)} байт")
# Сбрасываем таймер если есть данные
total_waited = 0
else:
is_done = True
break
except BlockingIOError:
# Нет данных, продолжаем ждать
pass
else:
# Timeout - проверяем не завершился ли процесс
try:
_, status = os.waitpid(-1, os.WNOHANG)
if status != 0:
logger.debug(f"Процесс завершился со статусом: {status}")
is_done = True
break
except ChildProcessError:
pass
# Если уже что-то прочитали и есть запрос ввода - выходим
if output and detect_input_type(output):
logger.debug(f"Обнаружен запрос ввода")
break
total_waited += 0.2
except Exception as e:
logger.debug(f"Ошибка при чтении PTY: {e}")
break
except Exception as e:
logger.debug(f"Ошибка чтения PTY: {e}")
is_done = True
logger.debug(f"read_pty_output: output={len(output)} байт, is_done={is_done}")
return output, is_done