telegram-cli-bot/bot.py

1124 lines
45 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Telegram CLI Bot - бот для выполнения CLI команд с многоуровневым меню.
Легкое добавление новых команд через регистрацию хендлеров.
"""
import os
import sys
import asyncio
import subprocess
import logging
from pathlib import Path
from typing import Optional, Callable, Dict, Any, List
from dataclasses import dataclass, field
from functools import wraps
import asyncssh
from dotenv import load_dotenv
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, BotCommand
from telegram.ext import (
Application,
CommandHandler,
CallbackQueryHandler,
MessageHandler,
ContextTypes,
filters,
)
# Загрузка переменных окружения из .env
load_dotenv()
# --- Конфигурация ---
BASE_DIR = Path(__file__).parent
logging.basicConfig(
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
level=logging.INFO,
handlers=[
logging.FileHandler(BASE_DIR / "bot.log"),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# --- Конфигурация бота из переменных окружения ---
class BotConfig:
"""Конфигурация бота из переменных окружения."""
def __init__(self):
self.name = os.getenv("BOT_NAME", "CLI Assistant")
self.description = os.getenv("BOT_DESCRIPTION", "Бот для выполнения CLI команд")
self.icon = os.getenv("BOT_ICON_EMOJI", "🤖")
self.working_directory = os.getenv("WORKING_DIRECTORY", str(Path.home()))
# Парсинг списка разрешённых пользователей
allowed_users_str = os.getenv("ALLOWED_USERS", "")
if allowed_users_str.strip():
self.allowed_users = [
int(uid.strip())
for uid in allowed_users_str.split(",")
if uid.strip().isdigit()
]
else:
self.allowed_users = []
@property
def is_access_restricted(self) -> bool:
"""Проверка: ограничен ли доступ."""
return len(self.allowed_users) > 0
# --- Серверы ---
@dataclass
class Server:
"""Конфигурация сервера."""
name: str
host: str
port: int
user: str
tags: List[str] = field(default_factory=list)
@property
def display_name(self) -> str:
"""Отображаемое имя с иконкой."""
icon = "🖥️"
if "local" in self.tags:
icon = "💻"
elif "db" in self.tags:
icon = "🗄️"
elif "web" in self.tags:
icon = "🌐"
return f"{icon} {self.name}"
@property
def description(self) -> str:
"""Краткое описание сервера."""
return f"{self.user}@{self.host}:{self.port}"
class ServerManager:
"""Управление серверами."""
def __init__(self):
self._servers: Dict[str, Server] = {}
self._default_server: str = "local"
self._ssh_key_path: Optional[str] = None
# Локальный сервер всегда доступен
self._servers["local"] = Server(
name="local",
host="localhost",
port=22,
user=os.getenv("USER", "user"),
tags=["local", "dev"]
)
def load_from_env(self):
"""Загрузка серверов из переменных окружения."""
self._ssh_key_path = os.getenv("SSH_KEY_PATH")
self._default_server = os.getenv("DEFAULT_SERVER", "local")
servers_str = os.getenv("SERVERS", "")
if not servers_str.strip():
return
# Парсинг формата: name|host|port|user|tags,name|host|port|user|tags
parts = servers_str.split(",")
i = 0
while i < len(parts):
if i + 4 >= len(parts):
break
# Проверяем, не является ли текущая часть тегом (содержит ли =)
if "=" in parts[i]:
i += 1
continue
name = parts[i].strip()
host = parts[i + 1].strip()
port_str = parts[i + 2].strip()
user = parts[i + 3].strip()
# Теги могут быть в следующей части или отсутствовать
tags = []
next_idx = i + 4
if next_idx < len(parts):
next_part = parts[next_idx].strip()
# Если следующая часть не похожа на имя сервера (содержит только буквы и дефисы)
# и не похожа на host (не содержит точек)
if "|" in next_part or (next_part and not next_part.replace("-", "").replace("_", "").isalnum()):
# Это теги
tags = [t.strip() for t in next_part.split("|") if t.strip()]
i += 5
else:
i += 4
else:
i += 4
try:
port = int(port_str)
server = Server(name=name, host=host, port=port, user=user, tags=tags)
self._servers[name] = server
logger.info(f"Загружен сервер: {server.display_name} ({server.description})")
except ValueError as e:
logger.warning(f"Ошибка парсинга сервера: {parts[i:i+4]} - {e}")
def get(self, name: str) -> Optional[Server]:
"""Получить сервер по имени."""
return self._servers.get(name)
def list_servers(self) -> List[Server]:
"""Список всех серверов."""
return list(self._servers.values())
def get_by_tags(self, tags: List[str]) -> List[Server]:
"""Получить серверы по тегам."""
result = []
for server in self._servers.values():
if any(tag in server.tags for tag in tags):
result.append(server)
return result
@property
def default_server(self) -> str:
"""Имя сервера по умолчанию."""
return self._default_server
@property
def ssh_key_path(self) -> Optional[str]:
"""Путь к SSH ключу."""
return self._ssh_key_path
def get_keyboard(self, exclude_local: bool = False) -> InlineKeyboardMarkup:
"""Создать клавиатуру с выбором сервера."""
keyboard = []
for server in self._servers.values():
if exclude_local and server.name == "local":
continue
button = InlineKeyboardButton(
server.display_name,
callback_data=f"server_select_{server.name}"
)
keyboard.append([button])
return InlineKeyboardMarkup(keyboard)
# --- Хранилище состояний пользователя ---
@dataclass
class UserState:
"""Состояние пользователя в диалоге."""
current_menu: str = "main"
waiting_for_input: bool = False
input_type: Optional[str] = None
parent_menu: Optional[str] = None
context: Dict[str, Any] = field(default_factory=dict)
working_directory: Optional[str] = None
current_server: str = "local" # Имя текущего сервера
class StateManager:
"""Управление состояниями пользователей."""
def __init__(self):
self._states: Dict[int, UserState] = {}
def get(self, user_id: int) -> UserState:
if user_id not in self._states:
self._states[user_id] = UserState()
return self._states[user_id]
def reset(self, user_id: int):
self._states[user_id] = UserState()
# --- Система команд ---
@dataclass
class MenuItem:
"""Элемент меню."""
label: str
callback: str # callback_data для кнопки
description: str = ""
icon: str = ""
children: List["MenuItem"] = field(default_factory=list)
command: Optional[str] = None # CLI команда для выполнения
is_command: bool = False
class MenuBuilder:
"""Построитель многоуровневого меню."""
def __init__(self):
self._menus: Dict[str, List[MenuItem]] = {}
def add_menu(self, menu_name: str, items: List[MenuItem]):
self._menus[menu_name] = items
def get_menu(self, menu_name: str) -> List[MenuItem]:
return self._menus.get(menu_name, [])
def get_keyboard(self, menu_name: str) -> InlineKeyboardMarkup:
"""Создает InlineKeyboard для меню."""
items = self._menus.get(menu_name, [])
keyboard = []
for item in items:
icon = item.icon + " " if item.icon else ""
button = InlineKeyboardButton(
f"{icon}{item.label}",
callback_data=item.callback
)
keyboard.append([button])
return InlineKeyboardMarkup(keyboard)
class CommandRegistry:
"""Реестр команд для легкого добавления."""
def __init__(self):
self._commands: Dict[str, Callable] = {}
def register(self, name: str):
"""Декоратор для регистрации команды."""
def decorator(func: Callable):
self._commands[name] = func
return func
return decorator
def get(self, name: str) -> Optional[Callable]:
return self._commands.get(name)
def list_commands(self) -> List[str]:
return list(self._commands.keys())
# --- Глобальные объекты ---
config = BotConfig()
state_manager = StateManager()
menu_builder = MenuBuilder()
command_registry = CommandRegistry()
server_manager = ServerManager()
# --- Проверка прав доступа ---
def check_access(func):
"""Декоратор для проверки прав доступа пользователя."""
@wraps(func)
async def wrapper(update: Update, context: ContextTypes.DEFAULT_TYPE, *args, **kwargs):
user_id = update.effective_user.id
# Если доступ не ограничен — пропускаем всех
if not config.is_access_restricted:
return await func(update, context, *args, **kwargs)
if user_id not in config.allowed_users:
logger.warning(f"Попытка доступа от запрещённого пользователя {user_id}")
await update.message.reply_text(
"❌ *Доступ запрещён*\n\n"
"Ваш ID не добавлен в список разрешённых пользователей.\n"
f"Ваш ID: `{user_id}`",
parse_mode="Markdown"
)
return
return await func(update, context, *args, **kwargs)
return wrapper
# --- Инициализация меню ---
def init_menus():
"""Инициализация структуры меню."""
# Главное меню
main_menu = [
MenuItem("🖥️ Выбор сервера", "server_menu", icon="🖥️"),
MenuItem("📋 Предустановленные команды", "preset_menu", icon="📋"),
MenuItem("⚙️ Настройки бота", "settings_menu", icon="⚙️"),
MenuItem(" О боте", "about", icon=""),
]
menu_builder.add_menu("main", main_menu)
# Меню серверов
server_menu = [
MenuItem("💻 local (localhost)", "server_select_local", icon="💻"),
MenuItem(" Добавить сервер", "server_add", icon=""),
MenuItem("⬅️ Назад", "main", icon="⬅️"),
]
menu_builder.add_menu("server", server_menu)
# Меню предустановленных команд
preset_menu = [
MenuItem("📁 Файловая система", "fs_menu", icon="📁"),
MenuItem("🔍 Поиск", "search_menu", icon="🔍"),
MenuItem("📊 Система", "system_menu", icon="📊"),
MenuItem("🌐 Сеть", "network_menu", icon="🌐"),
MenuItem("⬅️ Назад", "main", icon="⬅️"),
]
menu_builder.add_menu("preset", preset_menu)
# Файловая система
fs_menu = [
MenuItem("ls -la", "cmd_ls_la", command="ls -la", icon="📄"),
MenuItem("pwd", "cmd_pwd", command="pwd", icon="📍"),
MenuItem("df -h", "cmd_df", command="df -h", icon="💾"),
MenuItem("du -sh *", "cmd_du", command="du -sh * 2>/dev/null | sort -hr | head -20", icon="📊"),
MenuItem("⬅️ Назад", "preset", icon="⬅️"),
]
menu_builder.add_menu("fs", fs_menu)
# Поиск
search_menu = [
MenuItem("find . -name", "cmd_find_name", command="find . -maxdepth 3 -name '*.txt' 2>/dev/null", icon="🔎"),
MenuItem("grep пример", "cmd_grep", command="grep -r 'example' . 2>/dev/null | head -20", icon="🔍"),
MenuItem("which command", "cmd_which", command="which python3 bash git", icon="📍"),
MenuItem("⬅️ Назад", "preset", icon="⬅️"),
]
menu_builder.add_menu("search", search_menu)
# Система
system_menu = [
MenuItem("top -n 1", "cmd_top", command="top -bn1 | head -20", icon="📈"),
MenuItem("ps aux", "cmd_ps", command="ps aux | head -20", icon="🔄"),
MenuItem("free -h", "cmd_free", command="free -h", icon="💾"),
MenuItem("uname -a", "cmd_uname", command="uname -a", icon=""),
MenuItem("uptime", "cmd_uptime", command="uptime", icon="⏱️"),
MenuItem("⬅️ Назад", "preset", icon="⬅️"),
]
menu_builder.add_menu("system", system_menu)
# Сеть
network_menu = [
MenuItem("ip addr", "cmd_ip", command="ip addr 2>/dev/null || ifconfig 2>/dev/null", icon="🌐"),
MenuItem("ping google", "cmd_ping", command="ping -c 4 google.com 2>&1 | head -10", icon="📡"),
MenuItem("netstat", "cmd_netstat", command="ss -tuln 2>/dev/null || netstat -tuln 2>/dev/null | head -20", icon="🔌"),
MenuItem("curl ifconfig.me", "cmd_curl_ip", command="curl -s ifconfig.me 2>&1", icon="📍"),
MenuItem("⬅️ Назад", "preset", icon="⬅️"),
]
menu_builder.add_menu("network", network_menu)
# Настройки
settings_menu = [
MenuItem("📝 Изменить имя бота", "set_name", icon="📝"),
MenuItem("📄 Изменить описание", "set_description", icon="📄"),
MenuItem("🎨 Изменить иконку", "set_icon", icon="🎨"),
MenuItem("👥 Управление доступом", "access_menu", icon="👥"),
MenuItem("⬅️ Назад", "main", icon="⬅️"),
]
menu_builder.add_menu("settings", settings_menu)
# Доступ
access_menu = [
MenuItem("📋 Показать разрешённых", "show_access", icon="📋"),
MenuItem(" Добавить пользователя", "add_access", icon=""),
MenuItem(" Удалить пользователя", "remove_access", icon=""),
MenuItem("⬅️ Назад", "settings", icon="⬅️"),
]
menu_builder.add_menu("access", access_menu)
# --- Хендлеры ---
@check_access
async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработка команды /start."""
user = update.effective_user
logger.info(f"Пользователь {user.username} ({user.id}) запустил бота")
state_manager.reset(user.id)
# Показать текущую директорию и сервер
working_dir = config.working_directory
server = server_manager.get("local")
server_desc = server.description if server else "localhost"
await update.message.reply_text(
f"👋 Привет, {user.first_name}!\n\n"
f"{config.icon} *{config.name}*\n"
f"_{config.description}_\n\n"
f"*Просто отправьте CLI команду в чат* — я её выполню!\n\n"
f"🖥️ *Текущий сервер:* `{server_desc}`\n"
f"📁 *Рабочая директория:* `{working_dir}`\n\n"
f"Используйте `cd путь` для смены директории.\n"
f"Или выберите сервер в меню.\n"
f"Команда /help покажет справку.",
parse_mode="Markdown",
reply_markup=menu_builder.get_keyboard("main")
)
@check_access
async def menu_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработка команды /menu - показывает главное меню."""
user = update.effective_user
state = state_manager.get(user.id)
# Сброс состояния и возврат к главному меню
state_manager.reset(user.id)
state.current_menu = "main"
# Показать текущую директорию и сервер
working_dir = state.working_directory or config.working_directory
server = server_manager.get(state.current_server)
server_desc = server.description if server else state.current_server
await update.message.reply_text(
f"🏠 *Главное меню*\n\n"
f"🖥️ *Сервер:* `{server_desc}`\n"
f"📁 *Директория:* `{working_dir}`\n\n"
f"Выберите действие:",
parse_mode="Markdown",
reply_markup=menu_builder.get_keyboard("main")
)
@check_access
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработка команды /help."""
help_text = f"""
📖 *Справка по боту {config.name}*
*Как использовать:*
Просто отправьте любую CLI команду в чат — бот выполнит её!
*Примеры:*
• `ls -la` — список файлов
• `pwd` — текущая директория
• `df -h` — свободное место на диске
• `git status` — статус git
*Навигация по директориям:*
• `cd путь` — сменить директорию (например, `cd git/project`)
• `cd ..` — на уровень вверх
• `cd ~` — в домашнюю директорию
• `pwd` — показать текущую директорию
*Кнопки меню:*
• 📋 Предустановленные команды — быстрые команды по категориям
• ⚙️ Настройки бота — изменение имени, описания, иконки
О боте — информация
*Команды управления:*
/start — Запустить бота, главное меню
/menu — Показать главное меню с кнопками
/help — Эта справка
/settings — Настройки
*Безопасность:*
Команды выполняются от вашего имени.
Будьте осторожны с деструктивными командами!
"""
await update.message.reply_text(help_text, parse_mode="Markdown")
@check_access
async def settings_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработка команды /settings."""
state = state_manager.get(update.effective_user.id)
state.current_menu = "settings"
await update.message.reply_text(
"⚙️ *Настройки бота*",
parse_mode="Markdown",
reply_markup=menu_builder.get_keyboard("settings")
)
async def menu_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработка нажатий на кнопки меню."""
query = update.callback_query
user_id = query.from_user.id
state = state_manager.get(user_id)
await query.answer()
callback = query.data
logger.info(f"Callback: {callback} от пользователя {user_id}")
# Обработка навигации
if callback == "main":
state.current_menu = "main"
await query.edit_message_text(
"🏠 *Главное меню*",
parse_mode="Markdown",
reply_markup=menu_builder.get_keyboard("main")
)
elif callback == "preset_menu":
state.current_menu = "preset"
await query.edit_message_text(
"📋 *Предустановленные команды*",
parse_mode="Markdown",
reply_markup=menu_builder.get_keyboard("preset")
)
elif callback == "fs_menu":
await query.edit_message_text(
"📁 *Файловая система*",
parse_mode="Markdown",
reply_markup=menu_builder.get_keyboard("fs")
)
elif callback == "search_menu":
await query.edit_message_text(
"🔍 *Поиск*",
parse_mode="Markdown",
reply_markup=menu_builder.get_keyboard("search")
)
elif callback == "system_menu":
await query.edit_message_text(
"📊 *Система*",
parse_mode="Markdown",
reply_markup=menu_builder.get_keyboard("system")
)
elif callback == "network_menu":
await query.edit_message_text(
"🌐 *Сеть*",
parse_mode="Markdown",
reply_markup=menu_builder.get_keyboard("network")
)
elif callback == "server_menu":
# Динамическое обновление меню серверов
servers = server_manager.list_servers()
keyboard = []
for srv in servers:
keyboard.append([InlineKeyboardButton(
srv.display_name,
callback_data=f"server_select_{srv.name}"
)])
keyboard.append([InlineKeyboardButton("⬅️ Назад", callback_data="main")])
state.current_menu = "server"
await query.edit_message_text(
"🖥️ *Выберите сервер:*\n\n"
"Команды будут выполняться на выбранном сервере через SSH.",
parse_mode="Markdown",
reply_markup=InlineKeyboardMarkup(keyboard)
)
elif callback == "server_add":
await query.edit_message_text(
" *Добавление сервера*\n\n"
"Для добавления сервера отредактируйте `.env`:\n"
"```\nSERVERS=name|host|port|user|tags\n```\n"
"Пример:\n"
"```\nSERVERS=web-prod|192.168.1.10|22|root|web,prod\n```\n"
"После изменения перезапустите бота.",
parse_mode="Markdown",
reply_markup=menu_builder.get_keyboard("server")
)
elif callback.startswith("server_select_"):
server_name = callback.replace("server_select_", "")
server = server_manager.get(server_name)
if server:
state.current_server = server_name
# Сброс рабочей директории при смене сервера
state.working_directory = None
await query.edit_message_text(
f"✅ *Сервер изменён*\n\n"
f"{server.display_name}\n"
f"📍 `{server.description}`\n\n"
f"Теперь команды выполняются на этом сервере.",
parse_mode="Markdown",
reply_markup=menu_builder.get_keyboard("main")
)
state.current_menu = "main"
else:
await query.edit_message_text(
f"❌ *Сервер не найден*\n\n"
f"Сервер `{server_name}` отсутствует в конфигурации.",
parse_mode="Markdown",
reply_markup=menu_builder.get_keyboard("server")
)
elif callback == "settings_menu":
state.current_menu = "settings"
await query.edit_message_text(
"⚙️ *Настройки бота*",
parse_mode="Markdown",
reply_markup=menu_builder.get_keyboard("settings")
)
elif callback == "access_menu":
await query.edit_message_text(
"👥 *Управление доступом*",
parse_mode="Markdown",
reply_markup=menu_builder.get_keyboard("access")
)
# Обработка команд выполнения
elif callback.startswith("cmd_"):
# Поиск команды в меню
command = None
for menu_items in menu_builder._menus.values():
for item in menu_items:
if item.callback == callback and item.command:
command = item.command
break
if command:
await execute_cli_command(query, command)
else:
await query.edit_message_text("❌ Команда не найдена")
# Настройки бота - только просмотр, изменение через .env
elif callback == "set_name":
await query.edit_message_text(
"📝 *Изменение имени бота*\n\n"
f"Текущее имя: `{config.name}`\n\n"
"Для изменения отредактируйте `.env`:\n"
"```\nBOT_NAME=Ваше имя\n```",
parse_mode="Markdown",
reply_markup=menu_builder.get_keyboard("settings")
)
elif callback == "set_description":
await query.edit_message_text(
"📄 *Изменение описания бота*\n\n"
f"Текущее описание: `{config.description}`\n\n"
"Для изменения отредактируйте `.env`:\n"
"```\nBOT_DESCRIPTION=Ваше описание\n```",
parse_mode="Markdown",
reply_markup=menu_builder.get_keyboard("settings")
)
elif callback == "set_icon":
await query.edit_message_text(
"🎨 *Изменение иконки бота*\n\n"
f"Текущая иконка: `{config.icon}`\n\n"
"Для изменения отредактируйте `.env`:\n"
"```\nBOT_ICON_EMOJI=🤖\n```",
parse_mode="Markdown",
reply_markup=menu_builder.get_keyboard("settings")
)
elif callback == "show_access":
if config.allowed_users:
text = "👥 *Разрешённые пользователи:*\n" + "\n".join(f"• `{uid}`" for uid in config.allowed_users)
else:
text = "👥 *Доступ открыт для всех*\n\n(список разрешённых пользователей пуст)"
await query.edit_message_text(text, parse_mode="Markdown")
elif callback == "add_access":
await query.edit_message_text(
" *Добавление пользователя*\n\n"
"Для добавления пользователя отредактируйте `.env`:\n"
"```\nALLOWED_USERS=123456789,987654321\n```\n"
"Ваш ID можно узнать через @userinfobot",
parse_mode="Markdown"
)
elif callback == "remove_access":
if config.allowed_users:
text = " *Удаление пользователя*\n\n" + "\n".join(f"• `{uid}`" for uid in config.allowed_users)
text += "\n\nУдалите ID из `.env` чтобы убрать доступ"
else:
text = " Список пуст, некого удалять"
await query.edit_message_text(text, parse_mode="Markdown")
elif callback == "about":
await query.edit_message_text(
f" *О боте*\n\n"
f"*{config.icon} {config.name}*\n"
f"_{config.description}_\n\n"
f"Версия: 1.0.0\n"
f"Рабочая директория: `{config.working_directory}`\n\n"
f"Бот позволяет выполнять CLI команды на вашем ПК\n"
f"через интерфейс Telegram.",
parse_mode="Markdown",
reply_markup=menu_builder.get_keyboard("main")
)
state.current_menu = "main"
async def execute_cli_command(query, command: str):
"""Выполнение CLI команды из кнопки меню."""
user_id = query.from_user.id
state = state_manager.get(user_id)
server_name = state.current_server
server = server_manager.get(server_name)
# Определяем рабочую директорию
working_dir = state.working_directory or config.working_directory
logger.info(f"Выполнение команды: {command} на сервере: {server_name}, в директории: {working_dir}")
await query.edit_message_text(
f"⏳ *Выполнение...*\n"
f"🖥️ `{server_name}`\n"
f"```\n{command}\n```",
parse_mode="Markdown"
)
# Если локальный сервер — выполняем локально
if server_name == "local" or server is None:
await _execute_local_command(query, command, working_dir)
else:
# Выполняем через SSH
await _execute_ssh_command(query, command, server, working_dir)
async def _execute_local_command(query, command: str, working_dir: str):
"""Выполнение локальной команды."""
try:
process = await asyncio.create_subprocess_shell(
command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=working_dir
)
stdout, stderr = await asyncio.wait_for(
process.communicate(),
timeout=30
)
await _show_result(query, command, stdout, stderr, process.returncode)
except asyncio.TimeoutError:
await query.edit_message_text(
"❌ *Таймаут*\n\nКоманда выполнялась дольше 30 секунд и была прервана.",
parse_mode="Markdown"
)
except Exception as e:
logger.error(f"Ошибка выполнения команды: {e}")
await query.edit_message_text(
f"❌ *Ошибка:*\n```\n{str(e)}\n```",
parse_mode="Markdown"
)
async def _execute_ssh_command(query, command: str, server: Server, working_dir: str):
"""Выполнение команды через SSH."""
try:
# Подготовка SSH ключа
client_keys = [server_manager.ssh_key_path] if server_manager.ssh_key_path else None
# Подключение к серверу
async with asyncssh.connect(
host=server.host,
port=server.port,
username=server.user,
client_keys=client_keys,
known_hosts=None # Отключаем проверку known_hosts для простоты
) as conn:
# Выполнение команды
result = await conn.run(
command,
cwd=working_dir,
timeout=30
)
await _show_result(query, command, result.stdout.encode(), result.stderr.encode(), 0)
except asyncssh.Error as e:
logger.error(f"SSH ошибка: {e}")
await query.edit_message_text(
f"❌ *SSH ошибка:*\n```\n{str(e)}\n```",
parse_mode="Markdown"
)
except asyncio.TimeoutError:
await query.edit_message_text(
"❌ *Таймаут*\n\nКоманда выполнялась дольше 30 секунд и была прервана.",
parse_mode="Markdown"
)
except Exception as e:
logger.error(f"Ошибка выполнения команды: {e}")
await query.edit_message_text(
f"❌ *Ошибка:*\n```\n{str(e)}\n```",
parse_mode="Markdown"
)
async def _show_result(query, command: str, stdout: bytes, stderr: bytes, returncode: int):
"""Показ результата выполнения команды."""
output = stdout.decode("utf-8", errors="replace")
error = stderr.decode("utf-8", errors="replace")
result = f"✅ *Результат:*\n\n"
result += f"```\n{command}\n```\n\n"
if output:
if len(output) > 4000:
output = output[:4000] + "\n... (вывод обрезан)"
result += f"*Вывод:*\n```\n{output}\n```\n"
if error:
if len(error) > 4000:
error = error[:4000] + "\n... (вывод обрезан)"
result += f"*Ошибки:*\n```\n{error}\n```\n"
result += f"\n*Код возврата:* `{returncode}`"
await query.edit_message_text(result, parse_mode="Markdown")
@check_access
async def handle_text_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработка текстовых сообщений как CLI команд."""
user_id = update.effective_user.id
text = update.message.text.strip()
state = state_manager.get(user_id)
# Любое текстовое сообщение = CLI команда
logger.info(f"Пользователь {user_id} отправил команду: {text}")
await update.message.reply_text(
f"⏳ *Выполнение...*\n\n`{text}`",
parse_mode="Markdown"
)
await execute_cli_command_from_message(update, text)
async def execute_cli_command_from_message(update: Update, command: str):
"""Выполнение CLI команды из сообщения."""
user_id = update.effective_user.id
state = state_manager.get(user_id)
server_name = state.current_server
server = server_manager.get(server_name)
# Определяем рабочую директорию
working_dir = state.working_directory or config.working_directory
# Обработка команды cd - меняем директорию пользователя
# Работает только с простыми командами cd, не с составными
cmd_stripped = command.strip()
if cmd_stripped.startswith("cd ") and "&&" not in cmd_stripped and ";" not in cmd_stripped and "|" not in cmd_stripped:
parts = cmd_stripped.split(maxsplit=1)
if len(parts) == 2:
target_dir = parts[1]
# Обработка ~ и относительных путей
if target_dir.startswith("~"):
target_dir = str(Path.home()) + target_dir[1:]
elif not target_dir.startswith("/"):
target_dir = str(Path(working_dir) / target_dir)
# Проверка существования директории
if Path(target_dir).is_dir():
state.working_directory = target_dir
await update.message.reply_text(
f"📁 *Директория изменена:*\n`{target_dir}`\n"
f"🖥️ Сервер: `{server_name}`",
parse_mode="Markdown"
)
else:
await update.message.reply_text(
f"❌ *Директория не найдена:*\n`{target_dir}`",
parse_mode="Markdown"
)
return
# Для составных команд с cd — выполняем через SSH или локально
if "cd " in cmd_stripped and ("&&" in cmd_stripped or ";" in cmd_stripped):
if server_name == "local" or server is None:
await _execute_composite_command_local(update, cmd_stripped, working_dir)
else:
await _execute_composite_command_ssh(update, cmd_stripped, server, working_dir)
return
# Обычное выполнение
if server_name == "local" or server is None:
await _execute_local_command_message(update, cmd_stripped, working_dir)
else:
await _execute_ssh_command_message(update, cmd_stripped, server, working_dir)
async def _execute_composite_command_local(update: Update, command: str, working_dir: str):
"""Выполнение составной команды локально."""
command_with_pwd = f"{command} && pwd"
logger.info(f"Выполнение составной команды с cd: {command_with_pwd} в директории: {working_dir}")
try:
process = await asyncio.create_subprocess_shell(
command_with_pwd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=working_dir
)
stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=30)
output = stdout.decode("utf-8", errors="replace").strip()
error = stderr.decode("utf-8", errors="replace")
# Последняя строка - это pwd
if output and process.returncode == 0:
lines = output.split('\n')
final_dir = lines[-1].strip()
if Path(final_dir).is_dir():
state_manager.get(update.effective_user.id).working_directory = final_dir
output = '\n'.join(lines[:-1])
await _show_result_message(update, command, output, error, process.returncode)
except asyncio.TimeoutError:
await update.message.reply_text("❌ *Таймаут*\n\nКоманда выполнялась дольше 30 секунд.", parse_mode="Markdown")
except Exception as e:
logger.error(f"Ошибка: {e}")
await update.message.reply_text(f"❌ *Ошибка:*\n```\n{str(e)}\n```", parse_mode="Markdown")
async def _execute_composite_command_ssh(update: Update, command: str, server: Server, working_dir: str):
"""Выполнение составной команды через SSH."""
command_with_pwd = f"{command} && pwd"
try:
client_keys = [server_manager.ssh_key_path] if server_manager.ssh_key_path else None
async with asyncssh.connect(
host=server.host,
port=server.port,
username=server.user,
client_keys=client_keys,
known_hosts=None
) as conn:
result = await conn.run(command_with_pwd, cwd=working_dir, timeout=30)
output = result.stdout.strip()
error = result.stderr
# Последняя строка - pwd
if output:
lines = output.split('\n')
final_dir = lines[-1].strip()
# Простая проверка - если начинается с /
if final_dir.startswith('/'):
state_manager.get(update.effective_user.id).working_directory = final_dir
output = '\n'.join(lines[:-1])
await _show_result_message(update, command, output, error, 0)
except asyncssh.Error as e:
logger.error(f"SSH ошибка: {e}")
await update.message.reply_text(f"❌ *SSH ошибка:*\n```\n{str(e)}\n```", parse_mode="Markdown")
except asyncio.TimeoutError:
await update.message.reply_text("❌ *Таймаут*\n\nКоманда выполнялась дольше 30 секунд.", parse_mode="Markdown")
except Exception as e:
logger.error(f"Ошибка: {e}")
await update.message.reply_text(f"❌ *Ошибка:*\n```\n{str(e)}\n```", parse_mode="Markdown")
async def _execute_local_command_message(update: Update, command: str, working_dir: str):
"""Выполнение локальной команды из сообщения."""
try:
process = await asyncio.create_subprocess_shell(
command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=working_dir
)
stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=30)
await _show_result_message(update, command, stdout.decode(), stderr.decode(), process.returncode)
except asyncio.TimeoutError:
await update.message.reply_text("❌ *Таймаут*\n\nКоманда выполнялась дольше 30 секунд.", parse_mode="Markdown")
except Exception as e:
logger.error(f"Ошибка: {e}")
await update.message.reply_text(f"❌ *Ошибка:*\n```\n{str(e)}\n```", parse_mode="Markdown")
async def _execute_ssh_command_message(update: Update, command: str, server: Server, working_dir: str):
"""Выполнение команды через SSH из сообщения."""
try:
client_keys = [server_manager.ssh_key_path] if server_manager.ssh_key_path else None
async with asyncssh.connect(
host=server.host,
port=server.port,
username=server.user,
client_keys=client_keys,
known_hosts=None
) as conn:
result = await conn.run(command, cwd=working_dir, timeout=30)
await _show_result_message(update, command, result.stdout, result.stderr, 0)
except asyncssh.Error as e:
logger.error(f"SSH ошибка: {e}")
await update.message.reply_text(f"❌ *SSH ошибка:*\n```\n{str(e)}\n```", parse_mode="Markdown")
except asyncio.TimeoutError:
await update.message.reply_text("❌ *Таймаут*\n\nКоманда выполнялась дольше 30 секунд.", parse_mode="Markdown")
except Exception as e:
logger.error(f"Ошибка: {e}")
await update.message.reply_text(f"❌ *Ошибка:*\n```\n{str(e)}\n```", parse_mode="Markdown")
async def _show_result_message(update: Update, command: str, output: str, error: str, returncode: int):
"""Показ результата выполнения команды."""
result = f"✅ *Результат:*\n\n```\n{command}\n```\n\n"
if output:
if len(output) > 4000:
output = output[:4000] + "\n... (вывод обрезан)"
result += f"*Вывод:*\n```\n{output}\n```\n"
if error:
if len(error) > 4000:
error = error[:4000] + "\n... (вывод обрезан)"
result += f"*Ошибки:*\n```\n{error}\n```\n"
result += f"\n*Код возврата:* `{returncode}`"
await update.message.reply_text(result, parse_mode="Markdown")
async def post_init(application: Application):
"""Инициализация после запуска бота."""
# Установка команд бота
commands = [
BotCommand("start", "Запустить бота"),
BotCommand("menu", "Главное меню с кнопками"),
BotCommand("help", "Справка"),
BotCommand("settings", "Настройки"),
]
await application.bot.set_my_commands(commands)
logger.info("Бот инициализирован")
def main():
"""Точка входа."""
# Чтение токена только из переменной окружения
token = os.getenv("TELEGRAM_BOT_TOKEN")
if not token:
print("❌ Ошибка: не установлен TELEGRAM_BOT_TOKEN")
print("\nСпособы установки токена:")
print(" 1. Создайте файл .env по примеру .env.example")
print(" 2. Или задайте переменную окружения:")
print(" export TELEGRAM_BOT_TOKEN='your_token_here'")
print("\nИли запустите ./run.sh для интерактивной настройки")
sys.exit(1)
# Загрузка серверов из env
server_manager.load_from_env()
# Инициализация меню
init_menus()
# Создание приложения
application = Application.builder().token(token).post_init(post_init).build()
# Регистрация хендлеров
application.add_handler(CommandHandler("start", start_command))
application.add_handler(CommandHandler("help", help_command))
application.add_handler(CommandHandler("settings", settings_command))
application.add_handler(CommandHandler("menu", menu_command))
application.add_handler(CallbackQueryHandler(menu_callback))
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_text_message))
# Запуск
logger.info("Запуск бота...")
print(f"🤖 {config.name} запущен!")
print(f"📝 Описание: {config.description}")
print(f"🎨 Иконка: {config.icon}")
print("\nОстановка: Ctrl+C")
application.run_polling(allowed_updates=Update.ALL_TYPES)
if __name__ == "__main__":
main()