737 lines
28 KiB
Python
737 lines
28 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Telegram CLI Bot - бот для выполнения CLI команд с многоуровневым меню.
|
||
Легкое добавление новых команд через регистрацию хендлеров.
|
||
"""
|
||
|
||
import os
|
||
import sys
|
||
import json
|
||
import asyncio
|
||
import subprocess
|
||
import logging
|
||
from pathlib import Path
|
||
from typing import Optional, Callable, Dict, Any, List
|
||
from dataclasses import dataclass, field
|
||
|
||
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, BotCommand
|
||
from telegram.ext import (
|
||
Application,
|
||
CommandHandler,
|
||
CallbackQueryHandler,
|
||
MessageHandler,
|
||
ContextTypes,
|
||
filters,
|
||
)
|
||
|
||
# --- Конфигурация ---
|
||
BASE_DIR = Path(__file__).parent
|
||
CONFIG_FILE = BASE_DIR / "bot_config.json"
|
||
COMMANDS_FILE = BASE_DIR / "commands.yaml"
|
||
|
||
logging.basicConfig(
|
||
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
|
||
level=logging.INFO,
|
||
handlers=[
|
||
logging.FileHandler(BASE_DIR / "bot.log"),
|
||
logging.StreamHandler()
|
||
]
|
||
)
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
# --- Хранилище состояний пользователя ---
|
||
@dataclass
|
||
class UserState:
|
||
"""Состояние пользователя в диалоге."""
|
||
current_menu: str = "main"
|
||
waiting_for_input: bool = False
|
||
input_type: Optional[str] = None # "name", "description", "icon", "command"
|
||
parent_menu: Optional[str] = None
|
||
context: Dict[str, Any] = field(default_factory=dict)
|
||
|
||
|
||
class StateManager:
|
||
"""Управление состояниями пользователей."""
|
||
|
||
def __init__(self):
|
||
self._states: Dict[int, UserState] = {}
|
||
|
||
def get(self, user_id: int) -> UserState:
|
||
if user_id not in self._states:
|
||
self._states[user_id] = UserState()
|
||
return self._states[user_id]
|
||
|
||
def reset(self, user_id: int):
|
||
self._states[user_id] = UserState()
|
||
|
||
|
||
# --- Конфигурация бота ---
|
||
class BotConfig:
|
||
"""Конфигурация бота с сохранением в JSON."""
|
||
|
||
DEFAULT_CONFIG = {
|
||
"bot_name": "CLI Assistant",
|
||
"bot_description": "Бот для выполнения CLI команд",
|
||
"bot_icon_emoji": "🤖",
|
||
"allowed_users": [], # пустой список = все разрешены
|
||
"require_confirmation": True,
|
||
"working_directory": str(Path.home()),
|
||
}
|
||
|
||
def __init__(self, config_file: Path = CONFIG_FILE):
|
||
self.config_file = config_file
|
||
self._config = self._load()
|
||
|
||
def _load(self) -> dict:
|
||
if self.config_file.exists():
|
||
with open(self.config_file, "r", encoding="utf-8") as f:
|
||
return json.load(f)
|
||
return self.DEFAULT_CONFIG.copy()
|
||
|
||
def _save(self):
|
||
with open(self.config_file, "w", encoding="utf-8") as f:
|
||
json.dump(self._config, f, indent=2, ensure_ascii=False)
|
||
|
||
def get(self, key: str, default=None):
|
||
return self._config.get(key, self.DEFAULT_CONFIG.get(key, default))
|
||
|
||
def set(self, key: str, value):
|
||
self._config[key] = value
|
||
self._save()
|
||
|
||
@property
|
||
def name(self) -> str:
|
||
return self._config.get("bot_name", self.DEFAULT_CONFIG["bot_name"])
|
||
|
||
@property
|
||
def description(self) -> str:
|
||
return self._config.get("bot_description", self.DEFAULT_CONFIG["bot_description"])
|
||
|
||
@property
|
||
def icon(self) -> str:
|
||
return self._config.get("bot_icon_emoji", self.DEFAULT_CONFIG["bot_icon_emoji"])
|
||
|
||
|
||
# --- Система команд ---
|
||
@dataclass
|
||
class MenuItem:
|
||
"""Элемент меню."""
|
||
label: str
|
||
callback: str # callback_data для кнопки
|
||
description: str = ""
|
||
icon: str = ""
|
||
children: List["MenuItem"] = field(default_factory=list)
|
||
command: Optional[str] = None # CLI команда для выполнения
|
||
is_command: bool = False
|
||
|
||
|
||
class MenuBuilder:
|
||
"""Построитель многоуровневого меню."""
|
||
|
||
def __init__(self):
|
||
self._menus: Dict[str, List[MenuItem]] = {}
|
||
|
||
def add_menu(self, menu_name: str, items: List[MenuItem]):
|
||
self._menus[menu_name] = items
|
||
|
||
def get_menu(self, menu_name: str) -> List[MenuItem]:
|
||
return self._menus.get(menu_name, [])
|
||
|
||
def get_keyboard(self, menu_name: str) -> InlineKeyboardMarkup:
|
||
"""Создает InlineKeyboard для меню."""
|
||
items = self._menus.get(menu_name, [])
|
||
keyboard = []
|
||
for item in items:
|
||
icon = item.icon + " " if item.icon else ""
|
||
button = InlineKeyboardButton(
|
||
f"{icon}{item.label}",
|
||
callback_data=item.callback
|
||
)
|
||
keyboard.append([button])
|
||
return InlineKeyboardMarkup(keyboard)
|
||
|
||
|
||
class CommandRegistry:
|
||
"""Реестр команд для легкого добавления."""
|
||
|
||
def __init__(self):
|
||
self._commands: Dict[str, Callable] = {}
|
||
|
||
def register(self, name: str):
|
||
"""Декоратор для регистрации команды."""
|
||
def decorator(func: Callable):
|
||
self._commands[name] = func
|
||
return func
|
||
return decorator
|
||
|
||
def get(self, name: str) -> Optional[Callable]:
|
||
return self._commands.get(name)
|
||
|
||
def list_commands(self) -> List[str]:
|
||
return list(self._commands.keys())
|
||
|
||
|
||
# --- Глобальные объекты ---
|
||
config = BotConfig()
|
||
state_manager = StateManager()
|
||
menu_builder = MenuBuilder()
|
||
command_registry = CommandRegistry()
|
||
|
||
|
||
# --- Инициализация меню ---
|
||
def init_menus():
|
||
"""Инициализация структуры меню."""
|
||
|
||
# Главное меню
|
||
main_menu = [
|
||
MenuItem("🖥️ Выполнить команду", "exec_cmd", icon="🖥️", is_command=True),
|
||
MenuItem("📋 Предустановленные команды", "preset_menu", icon="📋"),
|
||
MenuItem("⚙️ Настройки бота", "settings_menu", icon="⚙️"),
|
||
MenuItem("ℹ️ О боте", "about", icon="ℹ️"),
|
||
]
|
||
menu_builder.add_menu("main", main_menu)
|
||
|
||
# Меню предустановленных команд
|
||
preset_menu = [
|
||
MenuItem("📁 Файловая система", "fs_menu", icon="📁"),
|
||
MenuItem("🔍 Поиск", "search_menu", icon="🔍"),
|
||
MenuItem("📊 Система", "system_menu", icon="📊"),
|
||
MenuItem("🌐 Сеть", "network_menu", icon="🌐"),
|
||
MenuItem("⬅️ Назад", "main", icon="⬅️"),
|
||
]
|
||
menu_builder.add_menu("preset", preset_menu)
|
||
|
||
# Файловая система
|
||
fs_menu = [
|
||
MenuItem("ls -la", "cmd_ls_la", command="ls -la", icon="📄"),
|
||
MenuItem("pwd", "cmd_pwd", command="pwd", icon="📍"),
|
||
MenuItem("df -h", "cmd_df", command="df -h", icon="💾"),
|
||
MenuItem("du -sh *", "cmd_du", command="du -sh * 2>/dev/null | sort -hr | head -20", icon="📊"),
|
||
MenuItem("⬅️ Назад", "preset", icon="⬅️"),
|
||
]
|
||
menu_builder.add_menu("fs", fs_menu)
|
||
|
||
# Поиск
|
||
search_menu = [
|
||
MenuItem("find . -name", "cmd_find_name", command="find . -maxdepth 3 -name '*.txt' 2>/dev/null", icon="🔎"),
|
||
MenuItem("grep пример", "cmd_grep", command="grep -r 'example' . 2>/dev/null | head -20", icon="🔍"),
|
||
MenuItem("which command", "cmd_which", command="which python3 bash git", icon="📍"),
|
||
MenuItem("⬅️ Назад", "preset", icon="⬅️"),
|
||
]
|
||
menu_builder.add_menu("search", search_menu)
|
||
|
||
# Система
|
||
system_menu = [
|
||
MenuItem("top -n 1", "cmd_top", command="top -bn1 | head -20", icon="📈"),
|
||
MenuItem("ps aux", "cmd_ps", command="ps aux | head -20", icon="🔄"),
|
||
MenuItem("free -h", "cmd_free", command="free -h", icon="💾"),
|
||
MenuItem("uname -a", "cmd_uname", command="uname -a", icon="ℹ️"),
|
||
MenuItem("uptime", "cmd_uptime", command="uptime", icon="⏱️"),
|
||
MenuItem("⬅️ Назад", "preset", icon="⬅️"),
|
||
]
|
||
menu_builder.add_menu("system", system_menu)
|
||
|
||
# Сеть
|
||
network_menu = [
|
||
MenuItem("ip addr", "cmd_ip", command="ip addr 2>/dev/null || ifconfig 2>/dev/null", icon="🌐"),
|
||
MenuItem("ping google", "cmd_ping", command="ping -c 4 google.com 2>&1 | head -10", icon="📡"),
|
||
MenuItem("netstat", "cmd_netstat", command="ss -tuln 2>/dev/null || netstat -tuln 2>/dev/null | head -20", icon="🔌"),
|
||
MenuItem("curl ifconfig.me", "cmd_curl_ip", command="curl -s ifconfig.me 2>&1", icon="📍"),
|
||
MenuItem("⬅️ Назад", "preset", icon="⬅️"),
|
||
]
|
||
menu_builder.add_menu("network", network_menu)
|
||
|
||
# Настройки
|
||
settings_menu = [
|
||
MenuItem("📝 Изменить имя бота", "set_name", icon="📝"),
|
||
MenuItem("📄 Изменить описание", "set_description", icon="📄"),
|
||
MenuItem("🎨 Изменить иконку", "set_icon", icon="🎨"),
|
||
MenuItem("👥 Управление доступом", "access_menu", icon="👥"),
|
||
MenuItem("⬅️ Назад", "main", icon="⬅️"),
|
||
]
|
||
menu_builder.add_menu("settings", settings_menu)
|
||
|
||
# Доступ
|
||
access_menu = [
|
||
MenuItem("📋 Показать разрешённых", "show_access", icon="📋"),
|
||
MenuItem("➕ Добавить пользователя", "add_access", icon="➕"),
|
||
MenuItem("➖ Удалить пользователя", "remove_access", icon="➖"),
|
||
MenuItem("⬅️ Назад", "settings", icon="⬅️"),
|
||
]
|
||
menu_builder.add_menu("access", access_menu)
|
||
|
||
|
||
# --- Хендлеры ---
|
||
|
||
async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||
"""Обработка команды /start."""
|
||
user = update.effective_user
|
||
logger.info(f"Пользователь {user.username} ({user.id}) запустил бота")
|
||
|
||
state_manager.reset(user.id)
|
||
|
||
await update.message.reply_text(
|
||
f"👋 Привет, {user.first_name}!\n\n"
|
||
f"{config.icon} *{config.name}*\n"
|
||
f"_{config.description}_\n\n"
|
||
f"Используйте меню для навигации или команду /help для справки.",
|
||
parse_mode="Markdown",
|
||
reply_markup=menu_builder.get_keyboard("main")
|
||
)
|
||
|
||
|
||
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||
"""Обработка команды /help."""
|
||
help_text = f"""
|
||
📖 *Справка по боту {config.name}*
|
||
|
||
*Основные возможности:*
|
||
• Выполнение CLI команд на вашем ПК
|
||
• Предустановленные команды в меню
|
||
• Гибкая настройка бота
|
||
|
||
*Команды:*
|
||
/start - Запустить бота
|
||
/help - Эта справка
|
||
/settings - Настройки бота
|
||
/commands - Список доступных команд
|
||
|
||
*Безопасность:*
|
||
Команды выполняются от имени пользователя.
|
||
Будьте осторожны с деструктивными командами!
|
||
"""
|
||
await update.message.reply_text(help_text, parse_mode="Markdown")
|
||
|
||
|
||
async def settings_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||
"""Обработка команды /settings."""
|
||
state = state_manager.get(update.effective_user.id)
|
||
state.current_menu = "settings"
|
||
|
||
await update.message.reply_text(
|
||
"⚙️ *Настройки бота*",
|
||
parse_mode="Markdown",
|
||
reply_markup=menu_builder.get_keyboard("settings")
|
||
)
|
||
|
||
|
||
async def menu_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||
"""Обработка нажатий на кнопки меню."""
|
||
query = update.callback_query
|
||
user_id = query.from_user.id
|
||
state = state_manager.get(user_id)
|
||
|
||
await query.answer()
|
||
|
||
callback = query.data
|
||
logger.info(f"Callback: {callback} от пользователя {user_id}")
|
||
|
||
# Обработка навигации
|
||
if callback == "main":
|
||
state.current_menu = "main"
|
||
await query.edit_message_text(
|
||
"🏠 *Главное меню*",
|
||
parse_mode="Markdown",
|
||
reply_markup=menu_builder.get_keyboard("main")
|
||
)
|
||
|
||
elif callback == "preset_menu":
|
||
state.current_menu = "preset"
|
||
await query.edit_message_text(
|
||
"📋 *Предустановленные команды*",
|
||
parse_mode="Markdown",
|
||
reply_markup=menu_builder.get_keyboard("preset")
|
||
)
|
||
|
||
elif callback == "fs_menu":
|
||
await query.edit_message_text(
|
||
"📁 *Файловая система*",
|
||
parse_mode="Markdown",
|
||
reply_markup=menu_builder.get_keyboard("fs")
|
||
)
|
||
|
||
elif callback == "search_menu":
|
||
await query.edit_message_text(
|
||
"🔍 *Поиск*",
|
||
parse_mode="Markdown",
|
||
reply_markup=menu_builder.get_keyboard("search")
|
||
)
|
||
|
||
elif callback == "system_menu":
|
||
await query.edit_message_text(
|
||
"📊 *Система*",
|
||
parse_mode="Markdown",
|
||
reply_markup=menu_builder.get_keyboard("system")
|
||
)
|
||
|
||
elif callback == "network_menu":
|
||
await query.edit_message_text(
|
||
"🌐 *Сеть*",
|
||
parse_mode="Markdown",
|
||
reply_markup=menu_builder.get_keyboard("network")
|
||
)
|
||
|
||
elif callback == "settings_menu":
|
||
state.current_menu = "settings"
|
||
await query.edit_message_text(
|
||
"⚙️ *Настройки бота*",
|
||
parse_mode="Markdown",
|
||
reply_markup=menu_builder.get_keyboard("settings")
|
||
)
|
||
|
||
elif callback == "access_menu":
|
||
await query.edit_message_text(
|
||
"👥 *Управление доступом*",
|
||
parse_mode="Markdown",
|
||
reply_markup=menu_builder.get_keyboard("access")
|
||
)
|
||
|
||
# Обработка команд выполнения
|
||
elif callback.startswith("cmd_"):
|
||
# Поиск команды в меню
|
||
command = None
|
||
for menu_items in menu_builder._menus.values():
|
||
for item in menu_items:
|
||
if item.callback == callback and item.command:
|
||
command = item.command
|
||
break
|
||
|
||
if command:
|
||
await execute_cli_command(query, command)
|
||
else:
|
||
await query.edit_message_text("❌ Команда не найдена")
|
||
|
||
# Настройки бота
|
||
elif callback == "set_name":
|
||
state.waiting_for_input = True
|
||
state.input_type = "name"
|
||
await query.edit_message_text(
|
||
"📝 *Изменение имени бота*\n\n"
|
||
f"Текущее имя: `{config.name}`\n\n"
|
||
"Отправьте новое имя бота:",
|
||
parse_mode="Markdown",
|
||
reply_markup=menu_builder.get_keyboard("settings")
|
||
)
|
||
|
||
elif callback == "set_description":
|
||
state.waiting_for_input = True
|
||
state.input_type = "description"
|
||
await query.edit_message_text(
|
||
"📄 *Изменение описания бота*\n\n"
|
||
f"Текущее описание: `{config.description}`\n\n"
|
||
"Отправьте новое описание:",
|
||
parse_mode="Markdown",
|
||
reply_markup=menu_builder.get_keyboard("settings")
|
||
)
|
||
|
||
elif callback == "set_icon":
|
||
state.waiting_for_input = True
|
||
state.input_type = "icon"
|
||
await query.edit_message_text(
|
||
"🎨 *Изменение иконки бота*\n\n"
|
||
f"Текущая иконка: `{config.icon}`\n\n"
|
||
"Отправьте новый emoji (один символ):",
|
||
parse_mode="Markdown",
|
||
reply_markup=menu_builder.get_keyboard("settings")
|
||
)
|
||
|
||
elif callback == "show_access":
|
||
allowed = config.get("allowed_users", [])
|
||
if allowed:
|
||
text = "👥 *Разрешённые пользователи:*\n" + "\n".join(f"• `{uid}`" for uid in allowed)
|
||
else:
|
||
text = "👥 *Доступ открыт для всех*\n\n(список разрешённых пользователей пуст)"
|
||
await query.edit_message_text(text, parse_mode="Markdown")
|
||
|
||
elif callback == "add_access":
|
||
state.waiting_for_input = True
|
||
state.input_type = "add_access"
|
||
await query.edit_message_text(
|
||
"➕ *Добавление пользователя*\n\n"
|
||
"Отправьте ID пользователя Telegram:\n"
|
||
"(можно получить через @userinfobot)",
|
||
parse_mode="Markdown"
|
||
)
|
||
|
||
elif callback == "remove_access":
|
||
state.waiting_for_input = True
|
||
state.input_type = "remove_access"
|
||
allowed = config.get("allowed_users", [])
|
||
if allowed:
|
||
text = "➖ *Удаление пользователя*\n\n" + "\n".join(f"• `{uid}`" for uid in allowed)
|
||
text += "\n\nОтправьте ID для удаления:"
|
||
else:
|
||
text = "➖ Список пуст, некого удалять"
|
||
await query.edit_message_text(text, parse_mode="Markdown")
|
||
|
||
elif callback == "about":
|
||
await query.edit_message_text(
|
||
f"ℹ️ *О боте*\n\n"
|
||
f"*{config.icon} {config.name}*\n"
|
||
f"_{config.description}_\n\n"
|
||
f"Версия: 1.0.0\n"
|
||
f"Рабочая директория: `{config.get('working_directory')}`\n\n"
|
||
f"Бот позволяет выполнять CLI команды на вашем ПК\n"
|
||
f"через интерфейс Telegram.",
|
||
parse_mode="Markdown",
|
||
reply_markup=menu_builder.get_keyboard("main")
|
||
)
|
||
state.current_menu = "main"
|
||
|
||
elif callback == "exec_cmd":
|
||
state.waiting_for_input = True
|
||
state.input_type = "command"
|
||
await query.edit_message_text(
|
||
"🖥️ *Выполнение команды*\n\n"
|
||
"Отправьте команду для выполнения:\n\n"
|
||
"⚠️ _Будьте осторожны с деструктивными командами!_",
|
||
parse_mode="Markdown",
|
||
reply_markup=menu_builder.get_keyboard("main")
|
||
)
|
||
|
||
|
||
async def execute_cli_command(query, command: str):
|
||
"""Выполнение CLI команды."""
|
||
working_dir = config.get("working_directory", str(Path.home()))
|
||
|
||
logger.info(f"Выполнение команды: {command}")
|
||
|
||
await query.edit_message_text(
|
||
f"⏳ *Выполнение...*\n\n`{command}`",
|
||
parse_mode="Markdown"
|
||
)
|
||
|
||
try:
|
||
process = await asyncio.create_subprocess_shell(
|
||
command,
|
||
stdout=asyncio.subprocess.PIPE,
|
||
stderr=asyncio.subprocess.PIPE,
|
||
cwd=working_dir
|
||
)
|
||
|
||
stdout, stderr = await asyncio.wait_for(
|
||
process.communicate(),
|
||
timeout=30
|
||
)
|
||
|
||
output = stdout.decode("utf-8", errors="replace")
|
||
error = stderr.decode("utf-8", errors="replace")
|
||
|
||
result = f"✅ *Результат:*\n\n"
|
||
result += f"```\n{command}\n```\n\n"
|
||
|
||
if output:
|
||
# Ограничиваем вывод
|
||
if len(output) > 4000:
|
||
output = output[:4000] + "\n... (вывод обрезан)"
|
||
result += f"*Вывод:*\n```\n{output}\n```\n"
|
||
|
||
if error:
|
||
if len(error) > 4000:
|
||
error = error[:4000] + "\n... (вывод обрезан)"
|
||
result += f"*Ошибки:*\n```\n{error}\n```\n"
|
||
|
||
result += f"\n*Код возврата:* `{process.returncode}`"
|
||
|
||
await query.edit_message_text(result, parse_mode="Markdown")
|
||
|
||
except asyncio.TimeoutError:
|
||
await query.edit_message_text(
|
||
"❌ *Таймаут*\n\nКоманда выполнялась дольше 30 секунд и была прервана.",
|
||
parse_mode="Markdown"
|
||
)
|
||
except Exception as e:
|
||
logger.error(f"Ошибка выполнения команды: {e}")
|
||
await query.edit_message_text(
|
||
f"❌ *Ошибка:*\n```\n{str(e)}\n```",
|
||
parse_mode="Markdown"
|
||
)
|
||
|
||
|
||
async def handle_text_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||
"""Обработка текстовых сообщений (для ввода настроек и команд)."""
|
||
user_id = update.effective_user.id
|
||
state = state_manager.get(user_id)
|
||
text = update.message.text.strip()
|
||
|
||
if not state.waiting_for_input:
|
||
return
|
||
|
||
input_type = state.input_type
|
||
|
||
if input_type == "name":
|
||
config.set("bot_name", text)
|
||
await update.message.reply_text(
|
||
f"✅ Имя бота изменено на: `{text}`",
|
||
parse_mode="Markdown"
|
||
)
|
||
|
||
elif input_type == "description":
|
||
config.set("bot_description", text)
|
||
await update.message.reply_text(
|
||
f"✅ Описание изменено на: `{text}`",
|
||
parse_mode="Markdown"
|
||
)
|
||
|
||
elif input_type == "icon":
|
||
config.set("bot_icon_emoji", text[0] if text else "🤖")
|
||
await update.message.reply_text(
|
||
f"✅ Иконка изменена на: `{text[0] if text else '🤖'}`",
|
||
parse_mode="Markdown"
|
||
)
|
||
|
||
elif input_type == "command":
|
||
# Выполнение произвольной команды
|
||
await update.message.reply_text(
|
||
f"⏳ *Выполнение...*\n\n`{text}`",
|
||
parse_mode="Markdown"
|
||
)
|
||
await execute_cli_command_from_message(update, text)
|
||
|
||
elif input_type == "add_access":
|
||
try:
|
||
uid = int(text)
|
||
allowed = config.get("allowed_users", [])
|
||
if uid not in allowed:
|
||
allowed.append(uid)
|
||
config.set("allowed_users", allowed)
|
||
await update.message.reply_text(f"✅ Пользователь `{uid}` добавлен", parse_mode="Markdown")
|
||
else:
|
||
await update.message.reply_text(f"⚠️ Пользователь `{uid}` уже в списке", parse_mode="Markdown")
|
||
except ValueError:
|
||
await update.message.reply_text("❌ Неверный формат ID")
|
||
|
||
elif input_type == "remove_access":
|
||
try:
|
||
uid = int(text)
|
||
allowed = config.get("allowed_users", [])
|
||
if uid in allowed:
|
||
allowed.remove(uid)
|
||
config.set("allowed_users", allowed)
|
||
await update.message.reply_text(f"✅ Пользователь `{uid}` удалён", parse_mode="Markdown")
|
||
else:
|
||
await update.message.reply_text(f"⚠️ Пользователь `{uid}` не найден", parse_mode="Markdown")
|
||
except ValueError:
|
||
await update.message.reply_text("❌ Неверный формат ID")
|
||
|
||
# Сброс состояния
|
||
state.waiting_for_input = False
|
||
state.input_type = None
|
||
|
||
|
||
async def execute_cli_command_from_message(update: Update, command: str):
|
||
"""Выполнение CLI команды из сообщения."""
|
||
working_dir = config.get("working_directory", str(Path.home()))
|
||
|
||
try:
|
||
process = await asyncio.create_subprocess_shell(
|
||
command,
|
||
stdout=asyncio.subprocess.PIPE,
|
||
stderr=asyncio.subprocess.PIPE,
|
||
cwd=working_dir
|
||
)
|
||
|
||
stdout, stderr = await asyncio.wait_for(
|
||
process.communicate(),
|
||
timeout=30
|
||
)
|
||
|
||
output = stdout.decode("utf-8", errors="replace")
|
||
error = stderr.decode("utf-8", errors="replace")
|
||
|
||
result = f"✅ *Результат:*\n\n"
|
||
result += f"```\n{command}\n```\n\n"
|
||
|
||
if output:
|
||
if len(output) > 4000:
|
||
output = output[:4000] + "\n... (вывод обрезан)"
|
||
result += f"*Вывод:*\n```\n{output}\n```\n"
|
||
|
||
if error:
|
||
if len(error) > 4000:
|
||
error = error[:4000] + "\n... (вывод обрезан)"
|
||
result += f"*Ошибки:*\n```\n{error}\n```\n"
|
||
|
||
result += f"\n*Код возврата:* `{process.returncode}`"
|
||
|
||
await update.message.reply_text(result, parse_mode="Markdown")
|
||
|
||
except asyncio.TimeoutError:
|
||
await update.message.reply_text(
|
||
"❌ *Таймаут*\n\nКоманда выполнялась дольше 30 секунд и была прервана.",
|
||
parse_mode="Markdown"
|
||
)
|
||
except Exception as e:
|
||
logger.error(f"Ошибка выполнения команды: {e}")
|
||
await update.message.reply_text(
|
||
f"❌ *Ошибка:*\n```\n{str(e)}\n```",
|
||
parse_mode="Markdown"
|
||
)
|
||
|
||
|
||
async def post_init(application: Application):
|
||
"""Инициализация после запуска бота."""
|
||
# Установка команд бота
|
||
commands = [
|
||
BotCommand("start", "Запустить бота"),
|
||
BotCommand("help", "Справка"),
|
||
BotCommand("settings", "Настройки"),
|
||
]
|
||
await application.bot.set_my_commands(commands)
|
||
|
||
# Установка имени и описания
|
||
await application.bot.set_my_name(config.name)
|
||
await application.bot.set_my_description(config.description)
|
||
|
||
logger.info("Бот инициализирован")
|
||
|
||
|
||
def main():
|
||
"""Точка входа."""
|
||
# Проверка токена: сначала переменная окружения, потом конфиг
|
||
token = os.getenv("TELEGRAM_BOT_TOKEN")
|
||
|
||
if not token and CONFIG_FILE.exists():
|
||
try:
|
||
with open(CONFIG_FILE, "r", encoding="utf-8") as f:
|
||
config_data = json.load(f)
|
||
token = config_data.get("bot_token")
|
||
if token:
|
||
logger.info("Токен получен из конфигурации")
|
||
except Exception as e:
|
||
logger.warning(f"Не удалось прочитать токен из конфига: {e}")
|
||
|
||
if not token:
|
||
print("❌ Ошибка: не установлен TELEGRAM_BOT_TOKEN")
|
||
print("Задайте переменную окружения:")
|
||
print(" export TELEGRAM_BOT_TOKEN='your_token_here'")
|
||
print("Или запустите ./run.sh для интерактивной настройки")
|
||
sys.exit(1)
|
||
|
||
# Инициализация меню
|
||
init_menus()
|
||
|
||
# Создание приложения
|
||
application = Application.builder().token(token).post_init(post_init).build()
|
||
|
||
# Регистрация хендлеров
|
||
application.add_handler(CommandHandler("start", start_command))
|
||
application.add_handler(CommandHandler("help", help_command))
|
||
application.add_handler(CommandHandler("settings", settings_command))
|
||
application.add_handler(CallbackQueryHandler(menu_callback))
|
||
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_text_message))
|
||
|
||
# Запуск
|
||
logger.info("Запуск бота...")
|
||
print(f"🤖 {config.name} запущен!")
|
||
print(f"📝 Описание: {config.description}")
|
||
print(f"🎨 Иконка: {config.icon}")
|
||
print("\nОстановка: Ctrl+C")
|
||
|
||
application.run_polling(allowed_updates=Update.ALL_TYPES)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|