telegram-cli-bot/bot/tools/ssh_tool.py

365 lines
16 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
SSH Executor Tool - инструмент для выполнения команд на серверах по SSH.
Бот может использовать этот инструмент автономно для выполнения системных задач
на серверах пользователя.
Конфигурация серверов загружается из .env:
SERVERS=name|host|port|user|tag|password|...
"""
import logging
import asyncio
import os
from pathlib import Path
from typing import Optional, Dict, Any, List
from dataclasses import dataclass
import asyncssh
from dotenv import load_dotenv
from bot.tools import BaseTool, ToolResult, register_tool
logger = logging.getLogger(__name__)
# Загрузка переменных окружения
load_dotenv()
@dataclass
class ServerConfig:
"""Конфигурация сервера для SSH."""
host: str
port: int
username: str
password: Optional[str] = None
client_keys: Optional[List[str]] = None
tags: List[str] = None
class SSHExecutorTool(BaseTool):
"""Инструмент для выполнения SSH-команд."""
name = "ssh_tool"
description = "Выполнение команд на удалённых серверах по SSH. Используется для системных задач: мониторинг, управление сервисами, просмотр логов."
category = "system"
def __init__(self):
# Загружаем серверы из .env
self.servers: Dict[str, ServerConfig] = {}
self._last_connection: Optional[asyncssh.SSHClientConnection] = None
self._last_server: Optional[str] = None
self._load_servers_from_env()
def _load_servers_from_env(self):
"""
Загрузить конфигурацию серверов из .env.
Формат в .env:
SERVERS=name|host|port|user|tag|password
Пример:
SERVERS=tomas|192.168.1.51|22|mirivlad|web|moloko22
"""
servers_str = os.getenv('SERVERS', '')
if not servers_str.strip():
logger.warning("SERVERS не найден в .env, SSH инструмент не будет работать")
return
# Парсим формат: name|host|port|user|tag|password
parts = servers_str.strip().split('|')
if len(parts) >= 6:
name, host, port, user, tag, password = parts[:6]
self.servers[name.strip()] = ServerConfig(
host=host.strip(),
port=int(port.strip()),
username=user.strip(),
tags=[tag.strip()] if tag.strip() else [],
password=password.strip() if password.strip() else None
)
logger.info(f"✅ Загружен сервер: {name} ({host}:{port})")
else:
logger.error(f"Неверный формат SERVERS в .env: {servers_str}")
logger.error("Ожидался формат: name|host|port|user|tag|password")
async def _connect(self, server_name: str = 'home') -> asyncssh.SSHClientConnection:
"""Подключиться к серверу."""
logger.debug(f"🔍 [SSH._connect] Запрос подключения: server_name='{server_name}'")
logger.debug(f"🔍 [SSH._connect] Доступные серверы: {list(self.servers.keys())}")
if server_name not in self.servers:
logger.error(f"❌ [SSH._connect] Сервер '{server_name}' не найден!")
raise ValueError(f"Сервер '{server_name}' не найден. Доступные: {list(self.servers.keys())}")
config = self.servers[server_name]
logger.debug(f"🔍 [SSH._connect] Конфигурация сервера {server_name}:")
logger.debug(f" host={config.host}, port={config.port}, username={config.username}")
logger.debug(f" password={'***' if config.password else 'None'}, client_keys={config.client_keys}")
# Проверяем существующее подключение
logger.debug(f"🔍 [SSH._connect] Проверка существующего подключения:")
logger.debug(f" _last_connection={self._last_connection}")
logger.debug(f" _last_server={self._last_server}")
if self._last_connection and self._last_server == server_name:
logger.debug(f"🔍 [SSH._connect] Найдено существующее подключение, проверка статуса...")
try:
# Проверяем transport для проверки активности подключения
if self._last_connection.transport is None or not self._last_connection.transport.is_active():
logger.debug(f"⚠️ [SSH._connect] Подключение не активно, будет создано новое")
self._last_connection = None
else:
logger.debug(f"✅ [SSH._connect] Используем существующее активное подключение")
return self._last_connection
except Exception as e:
logger.debug(f"⚠️ [SSH._connect] Ошибка проверки подключения: {e}, создаём новое")
self._last_connection = None
else:
logger.debug(f" [SSH._connect] Существующего подключения нет, создаём новое")
logger.info(f"Подключение к серверу {server_name} ({config.host})")
try:
# Пробуем подключение с паролем
connect_kwargs = {
'host': config.host,
'port': config.port,
'username': config.username,
'known_hosts': None, # Отключаем проверку known_hosts для простоты
}
if config.password:
connect_kwargs['password'] = config.password
logger.debug(f"🔍 [SSH._connect] Используем парольную аутентификацию")
if config.client_keys:
connect_kwargs['client_keys'] = config.client_keys
logger.debug(f"🔍 [SSH._connect] Используем ключевую аутентификацию: {config.client_keys}")
logger.debug(f"🔍 [SSH._connect] Вызов asyncssh.connect с параметрами: {connect_kwargs.keys()}")
self._last_connection = await asyncssh.connect(**connect_kwargs)
self._last_server = server_name
logger.info(f"✅ Подключено к {server_name}")
logger.debug(f"🔍 [SSH._connect] Подключение успешно: {self._last_connection}")
return self._last_connection
except Exception as e:
logger.error(f"❌ [SSH._connect] Ошибка подключения к {server_name}: {e}")
logger.exception(f"🔍 [SSH._connect] Exception details:")
raise
async def execute_command(
self,
command: str,
server: str = 'home',
timeout: int = 30
) -> Dict[str, Any]:
"""
Выполнить команду на сервере.
Args:
command: Команда для выполнения
server: Имя сервера из конфигурации
timeout: Таймаут выполнения в секундах
Returns:
Dict с полями: stdout, stderr, returncode, exit_status
"""
logger.debug(f"🔍 [SSH.execute_command] START: server={server}, command={command[:50]}...")
try:
logger.debug(f"🔍 [SSH.execute_command] Вызов _connect(server='{server}')")
conn = await self._connect(server)
logger.debug(f"✅ [SSH.execute_command] Подключение успешно: {conn}")
logger.info(f"Выполнение команды на {server}: {command}")
logger.debug(f"🔍 [SSH.execute_command] Создание процесса с командой: {command}")
# Используем create_process для корректной работы с shell-командами
process = await conn.create_process(
command,
term_type='xterm-256color',
env={'LANG': 'C.UTF-8', 'LC_ALL': 'C.UTF-8'}
)
logger.debug(f"🔍 [SSH.execute_command] Процесс создан: {process}")
# Читаем вывод с таймаутом
output = ""
error_output = ""
try:
logger.debug(f"🔍 [SSH.execute_command] Чтение stdout (timeout={timeout})")
# Читаем stdout
stdout_data = await asyncio.wait_for(
process.stdout.read(),
timeout=timeout
)
output = stdout_data.strip() if stdout_data else ''
logger.debug(f"🔍 [SSH.execute_command] stdout получен: {len(output)} bytes")
# Читаем stderr
try:
logger.debug(f"🔍 [SSH.execute_command] Чтение stderr (timeout={timeout//2})")
stderr_data = await asyncio.wait_for(
process.stderr.read(),
timeout=timeout // 2
)
error_output = stderr_data.strip() if stderr_data else ''
logger.debug(f"🔍 [SSH.execute_command] stderr получен: {len(error_output)} bytes")
except asyncio.TimeoutError:
logger.debug(f"⚠️ [SSH.execute_command] Таймаут чтения stderr")
pass
except asyncio.TimeoutError:
logger.error(f"🔍 [SSH.execute_command] Таймаут выполнения команды: {command}")
return {
'stdout': '',
'stderr': f'Таймаут выполнения команды ({timeout} сек)',
'returncode': -1,
'exit_status': 'timeout',
'server': server,
'command': command
}
logger.debug(f"🔍 [SSH.execute_command] Ожидание завершения процесса (returncode)")
# Ждём завершения процесса и получаем код возврата
returncode = await process.wait()
logger.debug(f"✅ [SSH.execute_command] Процесс завершён, returncode={returncode}")
return {
'stdout': output,
'stderr': error_output,
'returncode': returncode,
'exit_status': returncode,
'server': server,
'command': command
}
except asyncio.TimeoutError:
logger.error(f"🔍 [SSH.execute_command] asyncio.TimeoutError: {command}")
logger.exception(f"🔍 [SSH.execute_command] Timeout details:")
return {
'stdout': '',
'stderr': f'Таймаут выполнения команды ({timeout} сек)',
'returncode': -1,
'exit_status': 'timeout',
'server': server,
'command': command
}
except Exception as e:
logger.error(f"❌ [SSH.execute_command] Ошибка выполнения команды: {e}")
logger.exception(f"🔍 [SSH.execute_command] Exception details:")
return {
'stdout': '',
'stderr': str(e),
'returncode': -1,
'exit_status': 'error',
'server': server,
'command': command
}
async def execute(self, command: str, server: str = None, timeout: int = 30) -> ToolResult:
"""
Выполнить SSH-команду.
Args:
command: Команда для выполнения
server: Имя сервера (default: первый из .env)
timeout: Таймаут в секундах (default: 30)
"""
logger.debug(f"🔍 [SSH.execute] ВЫЗОВ: command={command[:50]}..., server={server}, timeout={timeout}")
if not command or not command.strip():
logger.debug(f"⚠️ [SSH.execute] Пустая команда!")
return ToolResult(
success=False,
error="Пустая команда"
)
# Если сервер не указан - используем первый из конфигурации
if server is None:
if not self.servers:
logger.debug(f"⚠️ [SSH.execute] Серверы не настроены!")
return ToolResult(
success=False,
error="Серверы не настроены. Проверьте SERVERS в .env"
)
server = list(self.servers.keys())[0]
logger.info(f"Сервер не указан, используем первый: {server}")
logger.debug(f"🔍 [SSH.execute] Выбран сервер по умолчанию: {server}")
logger.info(f"SSH Executor: server={server}, command={command[:100]}")
logger.debug(f"🔍 [SSH.execute] Вызов execute_command(server={server}, command={command[:50]}...)")
try:
result = await self.execute_command(command, server, timeout)
logger.debug(f"🔍 [SSH.execute] Результат execute_command: returncode={result['returncode']}")
# Формируем красивый вывод
output = self._format_output(result)
logger.debug(f"🔍 [SSH.execute] Вывод сформирован: {len(output)} chars")
return ToolResult(
success=result['returncode'] == 0,
data=result,
metadata={
'server': server,
'command': command,
'returncode': result['returncode']
}
)
except Exception as e:
logger.exception(f"❌ [SSH.execute] Ошибка SSH Executor: {e}")
return ToolResult(
success=False,
error=str(e),
metadata={'server': server, 'command': command}
)
def _format_output(self, result: Dict[str, Any]) -> str:
"""Форматировать вывод команды."""
output = []
if result['stdout']:
output.append(f"**Вывод:**\n```\n{result['stdout']}\n```")
if result['stderr']:
output.append(f"**Ошибки:**\n```\n{result['stderr']}\n```")
if result['returncode'] != 0:
output.append(f"**Код возврата:** {result['returncode']}")
return "\n".join(output) if output else "Команда выполнена без вывода"
def add_server(self, name: str, host: str, port: int, username: str,
password: Optional[str] = None, client_keys: Optional[List[str]] = None):
"""Добавить сервер в конфигурацию."""
self.servers[name] = ServerConfig(
host=host,
port=port,
username=username,
password=password,
client_keys=client_keys
)
logger.info(f"Добавлен сервер: {name} ({host})")
def list_servers(self) -> List[str]:
"""Получить список доступных серверов."""
return list(self.servers.keys())
# Автоматическая регистрация при импорте
@register_tool
class SSHExecutorToolAuto(SSHExecutorTool):
"""Авто-регистрируемая версия SSHExecutorTool."""
pass