telegram-cli-bot/bot/tools/ssh_tool.py

282 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
SSH Executor Tool - инструмент для выполнения команд на серверах по SSH.
Бот может использовать этот инструмент автономно для выполнения системных задач
на серверах пользователя.
Конфигурация серверов загружается из .env:
SERVERS=name|host|port|user|tag|password|...
"""
import logging
import asyncio
import os
from pathlib import Path
from typing import Optional, Dict, Any, List
from dataclasses import dataclass
import asyncssh
from dotenv import load_dotenv
from bot.tools import BaseTool, ToolResult, register_tool
logger = logging.getLogger(__name__)
# Загрузка переменных окружения
load_dotenv()
@dataclass
class ServerConfig:
"""Конфигурация сервера для SSH."""
host: str
port: int
username: str
password: Optional[str] = None
client_keys: Optional[List[str]] = None
tags: List[str] = None
class SSHExecutorTool(BaseTool):
"""Инструмент для выполнения SSH-команд."""
name = "ssh_tool"
description = "Выполнение команд на удалённых серверах по SSH. Используется для системных задач: мониторинг, управление сервисами, просмотр логов."
category = "system"
def __init__(self):
# Загружаем серверы из .env
self.servers: Dict[str, ServerConfig] = {}
self._last_connection: Optional[asyncssh.SSHClientConnection] = None
self._last_server: Optional[str] = None
self._load_servers_from_env()
def _load_servers_from_env(self):
"""
Загрузить конфигурацию серверов из .env.
Формат в .env:
SERVERS=name|host|port|user|tag|password
Пример:
SERVERS=tomas|192.168.1.54|22|mirivlad|web|moloko22
"""
servers_str = os.getenv('SERVERS', '')
if not servers_str.strip():
logger.warning("SERVERS не найден в .env, SSH инструмент не будет работать")
return
# Парсим формат: name|host|port|user|tag|password
parts = servers_str.strip().split('|')
if len(parts) >= 6:
name, host, port, user, tag, password = parts[:6]
self.servers[name.strip()] = ServerConfig(
host=host.strip(),
port=int(port.strip()),
username=user.strip(),
tags=[tag.strip()] if tag.strip() else [],
password=password.strip() if password.strip() else None
)
logger.info(f"✅ Загружен сервер: {name} ({host}:{port})")
else:
logger.error(f"Неверный формат SERVERS в .env: {servers_str}")
logger.error("Ожидался формат: name|host|port|user|tag|password")
async def _connect(self, server_name: str = 'home') -> asyncssh.SSHClientConnection:
"""Подключиться к серверу."""
if server_name not in self.servers:
raise ValueError(f"Сервер '{server_name}' не найден. Доступные: {list(self.servers.keys())}")
config = self.servers[server_name]
# Проверяем существующее подключение
if self._last_connection and self._last_server == server_name:
if not self._last_connection.is_connected():
self._last_connection = None
else:
return self._last_connection
logger.info(f"Подключение к серверу {server_name} ({config.host})")
try:
# Пробуем подключение с паролем
connect_kwargs = {
'host': config.host,
'port': config.port,
'username': config.username,
'known_hosts': None, # Отключаем проверку known_hosts для простоты
}
if config.password:
connect_kwargs['password'] = config.password
if config.client_keys:
connect_kwargs['client_keys'] = config.client_keys
self._last_connection = await asyncssh.connect(**connect_kwargs)
self._last_server = server_name
logger.info(f"✅ Подключено к {server_name}")
return self._last_connection
except Exception as e:
logger.error(f"Ошибка подключения к {server_name}: {e}")
raise
async def execute_command(
self,
command: str,
server: str = 'home',
timeout: int = 30
) -> Dict[str, Any]:
"""
Выполнить команду на сервере.
Args:
command: Команда для выполнения
server: Имя сервера из конфигурации
timeout: Таймаут выполнения в секундах
Returns:
Dict с полями: stdout, stderr, returncode, exit_status
"""
try:
conn = await self._connect(server)
logger.info(f"Выполнение команды на {server}: {command}")
result = await asyncio.wait_for(
conn.run(command, check=False),
timeout=timeout
)
return {
'stdout': result.stdout.strip() if result.stdout else '',
'stderr': result.stderr.strip() if result.stderr else '',
'returncode': result.returncode,
'exit_status': result.exit_status,
'server': server,
'command': command
}
except asyncio.TimeoutError:
logger.error(f"Таймаут выполнения команды: {command}")
return {
'stdout': '',
'stderr': f'Таймаут выполнения команды ({timeout} сек)',
'returncode': -1,
'exit_status': 'timeout',
'server': server,
'command': command
}
except Exception as e:
logger.error(f"Ошибка выполнения команды: {e}")
return {
'stdout': '',
'stderr': str(e),
'returncode': -1,
'exit_status': 'error',
'server': server,
'command': command
}
async def execute(self, command: str, server: str = None, timeout: int = 30) -> ToolResult:
"""
Выполнить SSH-команду.
Args:
command: Команда для выполнения
server: Имя сервера (default: первый из .env)
timeout: Таймаут в секундах (default: 30)
"""
if not command or not command.strip():
return ToolResult(
success=False,
error="Пустая команда"
)
# Если сервер не указан - используем первый из конфигурации
if server is None:
if not self.servers:
return ToolResult(
success=False,
error="Серверы не настроены. Проверьте SERVERS в .env"
)
server = list(self.servers.keys())[0]
logger.info(f"Сервер не указан, используем первый: {server}")
logger.info(f"SSH Executor: server={server}, command={command[:100]}")
try:
result = await self.execute_command(command, server, timeout)
# Формируем красивый вывод
output = self._format_output(result)
return ToolResult(
success=result['returncode'] == 0,
data=result,
metadata={
'server': server,
'command': command,
'returncode': result['returncode']
}
)
except Exception as e:
logger.exception(f"Ошибка SSH Executor: {e}")
return ToolResult(
success=False,
error=str(e),
metadata={'server': server, 'command': command}
)
def _format_output(self, result: Dict[str, Any]) -> str:
"""Форматировать вывод команды."""
output = []
# Добавляем заголовок с сервером и командой
output.append(f"🖥️ **SSH: {result.get('server', 'unknown')}**")
output.append(f"**Команда:** `{result.get('command', '')}`\n")
if result['stdout']:
output.append(f"**Вывод:**\n```\n{result['stdout']}\n```")
elif result['returncode'] == 0:
output.append("**Вывод:**\n```\n(команда выполнена без вывода)\n```")
if result['stderr']:
output.append(f"**Ошибки:**\n```\n{result['stderr']}\n```")
output.append(f"\n**Код возврата:** `{result['returncode']}`")
return "\n".join(output)
def add_server(self, name: str, host: str, port: int, username: str,
password: Optional[str] = None, client_keys: Optional[List[str]] = None):
"""Добавить сервер в конфигурацию."""
self.servers[name] = ServerConfig(
host=host,
port=port,
username=username,
password=password,
client_keys=client_keys
)
logger.info(f"Добавлен сервер: {name} ({host})")
def list_servers(self) -> List[str]:
"""Получить список доступных серверов."""
return list(self.servers.keys())
# Автоматическая регистрация при импорте
@register_tool
class SSHExecutorToolAuto(SSHExecutorTool):
"""Авто-регистрируемая версия SSHExecutorTool."""
pass