887 lines
35 KiB
Python
887 lines
35 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Telegram CLI Bot - бот для выполнения CLI команд с многоуровневым меню.
|
||
Легкое добавление новых команд через регистрацию хендлеров.
|
||
"""
|
||
|
||
import os
|
||
import sys
|
||
import json
|
||
import asyncio
|
||
import subprocess
|
||
import logging
|
||
from pathlib import Path
|
||
from typing import Optional, Callable, Dict, Any, List
|
||
from dataclasses import dataclass, field
|
||
|
||
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, BotCommand
|
||
from telegram.ext import (
|
||
Application,
|
||
CommandHandler,
|
||
CallbackQueryHandler,
|
||
MessageHandler,
|
||
ContextTypes,
|
||
filters,
|
||
)
|
||
|
||
# --- Конфигурация ---
|
||
BASE_DIR = Path(__file__).parent
|
||
CONFIG_FILE = BASE_DIR / "bot_config.json"
|
||
COMMANDS_FILE = BASE_DIR / "commands.yaml"
|
||
|
||
logging.basicConfig(
|
||
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
|
||
level=logging.INFO,
|
||
handlers=[
|
||
logging.FileHandler(BASE_DIR / "bot.log"),
|
||
logging.StreamHandler()
|
||
]
|
||
)
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
# --- Хранилище состояний пользователя ---
|
||
@dataclass
|
||
class UserState:
|
||
"""Состояние пользователя в диалоге."""
|
||
current_menu: str = "main"
|
||
waiting_for_input: bool = False
|
||
input_type: Optional[str] = None # "name", "description", "icon", "command"
|
||
parent_menu: Optional[str] = None
|
||
context: Dict[str, Any] = field(default_factory=dict)
|
||
working_directory: Optional[str] = None # Текущая директория пользователя
|
||
|
||
|
||
class StateManager:
|
||
"""Управление состояниями пользователей."""
|
||
|
||
def __init__(self):
|
||
self._states: Dict[int, UserState] = {}
|
||
|
||
def get(self, user_id: int) -> UserState:
|
||
if user_id not in self._states:
|
||
self._states[user_id] = UserState()
|
||
return self._states[user_id]
|
||
|
||
def reset(self, user_id: int):
|
||
self._states[user_id] = UserState()
|
||
|
||
|
||
# --- Конфигурация бота ---
|
||
class BotConfig:
|
||
"""Конфигурация бота с сохранением в JSON."""
|
||
|
||
DEFAULT_CONFIG = {
|
||
"bot_name": "CLI Assistant",
|
||
"bot_description": "Бот для выполнения CLI команд",
|
||
"bot_icon_emoji": "🤖",
|
||
"allowed_users": [], # пустой список = все разрешены
|
||
"require_confirmation": True,
|
||
"working_directory": str(Path.home()),
|
||
}
|
||
|
||
def __init__(self, config_file: Path = CONFIG_FILE):
|
||
self.config_file = config_file
|
||
self._config = self._load()
|
||
|
||
def _load(self) -> dict:
|
||
if self.config_file.exists():
|
||
with open(self.config_file, "r", encoding="utf-8") as f:
|
||
return json.load(f)
|
||
return self.DEFAULT_CONFIG.copy()
|
||
|
||
def _save(self):
|
||
with open(self.config_file, "w", encoding="utf-8") as f:
|
||
json.dump(self._config, f, indent=2, ensure_ascii=False)
|
||
|
||
def get(self, key: str, default=None):
|
||
return self._config.get(key, self.DEFAULT_CONFIG.get(key, default))
|
||
|
||
def set(self, key: str, value):
|
||
self._config[key] = value
|
||
self._save()
|
||
|
||
@property
|
||
def name(self) -> str:
|
||
return self._config.get("bot_name", self.DEFAULT_CONFIG["bot_name"])
|
||
|
||
@property
|
||
def description(self) -> str:
|
||
return self._config.get("bot_description", self.DEFAULT_CONFIG["bot_description"])
|
||
|
||
@property
|
||
def icon(self) -> str:
|
||
return self._config.get("bot_icon_emoji", self.DEFAULT_CONFIG["bot_icon_emoji"])
|
||
|
||
|
||
# --- Система команд ---
|
||
@dataclass
|
||
class MenuItem:
|
||
"""Элемент меню."""
|
||
label: str
|
||
callback: str # callback_data для кнопки
|
||
description: str = ""
|
||
icon: str = ""
|
||
children: List["MenuItem"] = field(default_factory=list)
|
||
command: Optional[str] = None # CLI команда для выполнения
|
||
is_command: bool = False
|
||
|
||
|
||
class MenuBuilder:
|
||
"""Построитель многоуровневого меню."""
|
||
|
||
def __init__(self):
|
||
self._menus: Dict[str, List[MenuItem]] = {}
|
||
|
||
def add_menu(self, menu_name: str, items: List[MenuItem]):
|
||
self._menus[menu_name] = items
|
||
|
||
def get_menu(self, menu_name: str) -> List[MenuItem]:
|
||
return self._menus.get(menu_name, [])
|
||
|
||
def get_keyboard(self, menu_name: str) -> InlineKeyboardMarkup:
|
||
"""Создает InlineKeyboard для меню."""
|
||
items = self._menus.get(menu_name, [])
|
||
keyboard = []
|
||
for item in items:
|
||
icon = item.icon + " " if item.icon else ""
|
||
button = InlineKeyboardButton(
|
||
f"{icon}{item.label}",
|
||
callback_data=item.callback
|
||
)
|
||
keyboard.append([button])
|
||
return InlineKeyboardMarkup(keyboard)
|
||
|
||
|
||
class CommandRegistry:
|
||
"""Реестр команд для легкого добавления."""
|
||
|
||
def __init__(self):
|
||
self._commands: Dict[str, Callable] = {}
|
||
|
||
def register(self, name: str):
|
||
"""Декоратор для регистрации команды."""
|
||
def decorator(func: Callable):
|
||
self._commands[name] = func
|
||
return func
|
||
return decorator
|
||
|
||
def get(self, name: str) -> Optional[Callable]:
|
||
return self._commands.get(name)
|
||
|
||
def list_commands(self) -> List[str]:
|
||
return list(self._commands.keys())
|
||
|
||
|
||
# --- Глобальные объекты ---
|
||
config = BotConfig()
|
||
state_manager = StateManager()
|
||
menu_builder = MenuBuilder()
|
||
command_registry = CommandRegistry()
|
||
|
||
|
||
# --- Инициализация меню ---
|
||
def init_menus():
|
||
"""Инициализация структуры меню."""
|
||
|
||
# Главное меню
|
||
main_menu = [
|
||
MenuItem("📋 Предустановленные команды", "preset_menu", icon="📋"),
|
||
MenuItem("⚙️ Настройки бота", "settings_menu", icon="⚙️"),
|
||
MenuItem("ℹ️ О боте", "about", icon="ℹ️"),
|
||
]
|
||
menu_builder.add_menu("main", main_menu)
|
||
|
||
# Меню предустановленных команд
|
||
preset_menu = [
|
||
MenuItem("📁 Файловая система", "fs_menu", icon="📁"),
|
||
MenuItem("🔍 Поиск", "search_menu", icon="🔍"),
|
||
MenuItem("📊 Система", "system_menu", icon="📊"),
|
||
MenuItem("🌐 Сеть", "network_menu", icon="🌐"),
|
||
MenuItem("⬅️ Назад", "main", icon="⬅️"),
|
||
]
|
||
menu_builder.add_menu("preset", preset_menu)
|
||
|
||
# Файловая система
|
||
fs_menu = [
|
||
MenuItem("ls -la", "cmd_ls_la", command="ls -la", icon="📄"),
|
||
MenuItem("pwd", "cmd_pwd", command="pwd", icon="📍"),
|
||
MenuItem("df -h", "cmd_df", command="df -h", icon="💾"),
|
||
MenuItem("du -sh *", "cmd_du", command="du -sh * 2>/dev/null | sort -hr | head -20", icon="📊"),
|
||
MenuItem("⬅️ Назад", "preset", icon="⬅️"),
|
||
]
|
||
menu_builder.add_menu("fs", fs_menu)
|
||
|
||
# Поиск
|
||
search_menu = [
|
||
MenuItem("find . -name", "cmd_find_name", command="find . -maxdepth 3 -name '*.txt' 2>/dev/null", icon="🔎"),
|
||
MenuItem("grep пример", "cmd_grep", command="grep -r 'example' . 2>/dev/null | head -20", icon="🔍"),
|
||
MenuItem("which command", "cmd_which", command="which python3 bash git", icon="📍"),
|
||
MenuItem("⬅️ Назад", "preset", icon="⬅️"),
|
||
]
|
||
menu_builder.add_menu("search", search_menu)
|
||
|
||
# Система
|
||
system_menu = [
|
||
MenuItem("top -n 1", "cmd_top", command="top -bn1 | head -20", icon="📈"),
|
||
MenuItem("ps aux", "cmd_ps", command="ps aux | head -20", icon="🔄"),
|
||
MenuItem("free -h", "cmd_free", command="free -h", icon="💾"),
|
||
MenuItem("uname -a", "cmd_uname", command="uname -a", icon="ℹ️"),
|
||
MenuItem("uptime", "cmd_uptime", command="uptime", icon="⏱️"),
|
||
MenuItem("⬅️ Назад", "preset", icon="⬅️"),
|
||
]
|
||
menu_builder.add_menu("system", system_menu)
|
||
|
||
# Сеть
|
||
network_menu = [
|
||
MenuItem("ip addr", "cmd_ip", command="ip addr 2>/dev/null || ifconfig 2>/dev/null", icon="🌐"),
|
||
MenuItem("ping google", "cmd_ping", command="ping -c 4 google.com 2>&1 | head -10", icon="📡"),
|
||
MenuItem("netstat", "cmd_netstat", command="ss -tuln 2>/dev/null || netstat -tuln 2>/dev/null | head -20", icon="🔌"),
|
||
MenuItem("curl ifconfig.me", "cmd_curl_ip", command="curl -s ifconfig.me 2>&1", icon="📍"),
|
||
MenuItem("⬅️ Назад", "preset", icon="⬅️"),
|
||
]
|
||
menu_builder.add_menu("network", network_menu)
|
||
|
||
# Настройки
|
||
settings_menu = [
|
||
MenuItem("📝 Изменить имя бота", "set_name", icon="📝"),
|
||
MenuItem("📄 Изменить описание", "set_description", icon="📄"),
|
||
MenuItem("🎨 Изменить иконку", "set_icon", icon="🎨"),
|
||
MenuItem("👥 Управление доступом", "access_menu", icon="👥"),
|
||
MenuItem("⬅️ Назад", "main", icon="⬅️"),
|
||
]
|
||
menu_builder.add_menu("settings", settings_menu)
|
||
|
||
# Доступ
|
||
access_menu = [
|
||
MenuItem("📋 Показать разрешённых", "show_access", icon="📋"),
|
||
MenuItem("➕ Добавить пользователя", "add_access", icon="➕"),
|
||
MenuItem("➖ Удалить пользователя", "remove_access", icon="➖"),
|
||
MenuItem("⬅️ Назад", "settings", icon="⬅️"),
|
||
]
|
||
menu_builder.add_menu("access", access_menu)
|
||
|
||
|
||
# --- Хендлеры ---
|
||
|
||
async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||
"""Обработка команды /start."""
|
||
user = update.effective_user
|
||
logger.info(f"Пользователь {user.username} ({user.id}) запустил бота")
|
||
|
||
state_manager.reset(user.id)
|
||
|
||
# Показать текущую директорию
|
||
working_dir = config.get("working_directory", str(Path.home()))
|
||
|
||
await update.message.reply_text(
|
||
f"👋 Привет, {user.first_name}!\n\n"
|
||
f"{config.icon} *{config.name}*\n"
|
||
f"_{config.description}_\n\n"
|
||
f"*Просто отправьте CLI команду в чат* — я её выполню!\n\n"
|
||
f"📁 Рабочая директория: `{working_dir}`\n\n"
|
||
f"Используйте `cd путь` для смены директории.\n"
|
||
f"Или используйте кнопки меню для быстрых команд.\n"
|
||
f"Команда /help покажет справку.",
|
||
parse_mode="Markdown",
|
||
reply_markup=menu_builder.get_keyboard("main")
|
||
)
|
||
|
||
|
||
async def menu_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||
"""Обработка команды /menu - показывает главное меню."""
|
||
user = update.effective_user
|
||
state = state_manager.get(user.id)
|
||
|
||
# Сброс состояния и возврат к главному меню
|
||
state_manager.reset(user.id)
|
||
state.current_menu = "main"
|
||
|
||
# Показать текущую директорию
|
||
working_dir = state.working_directory or config.get("working_directory", str(Path.home()))
|
||
|
||
await update.message.reply_text(
|
||
f"🏠 *Главное меню*\n\n"
|
||
f"📁 Текущая директория: `{working_dir}`\n\n"
|
||
f"Выберите действие:",
|
||
parse_mode="Markdown",
|
||
reply_markup=menu_builder.get_keyboard("main")
|
||
)
|
||
|
||
|
||
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||
"""Обработка команды /help."""
|
||
help_text = f"""
|
||
📖 *Справка по боту {config.name}*
|
||
|
||
*Как использовать:*
|
||
Просто отправьте любую CLI команду в чат — бот выполнит её!
|
||
|
||
*Примеры:*
|
||
• `ls -la` — список файлов
|
||
• `pwd` — текущая директория
|
||
• `df -h` — свободное место на диске
|
||
• `git status` — статус git
|
||
|
||
*Навигация по директориям:*
|
||
• `cd путь` — сменить директорию (например, `cd git/project`)
|
||
• `cd ..` — на уровень вверх
|
||
• `cd ~` — в домашнюю директорию
|
||
• `pwd` — показать текущую директорию
|
||
|
||
*Кнопки меню:*
|
||
• 📋 Предустановленные команды — быстрые команды по категориям
|
||
• ⚙️ Настройки бота — изменение имени, описания, иконки
|
||
• ℹ️ О боте — информация
|
||
|
||
*Команды управления:*
|
||
/start — Запустить бота, главное меню
|
||
/menu — Показать главное меню с кнопками
|
||
/help — Эта справка
|
||
/settings — Настройки
|
||
|
||
*Безопасность:*
|
||
Команды выполняются от вашего имени.
|
||
Будьте осторожны с деструктивными командами!
|
||
"""
|
||
await update.message.reply_text(help_text, parse_mode="Markdown")
|
||
|
||
|
||
async def settings_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||
"""Обработка команды /settings."""
|
||
state = state_manager.get(update.effective_user.id)
|
||
state.current_menu = "settings"
|
||
|
||
await update.message.reply_text(
|
||
"⚙️ *Настройки бота*",
|
||
parse_mode="Markdown",
|
||
reply_markup=menu_builder.get_keyboard("settings")
|
||
)
|
||
|
||
|
||
async def menu_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||
"""Обработка нажатий на кнопки меню."""
|
||
query = update.callback_query
|
||
user_id = query.from_user.id
|
||
state = state_manager.get(user_id)
|
||
|
||
await query.answer()
|
||
|
||
callback = query.data
|
||
logger.info(f"Callback: {callback} от пользователя {user_id}")
|
||
|
||
# Обработка навигации
|
||
if callback == "main":
|
||
state.current_menu = "main"
|
||
await query.edit_message_text(
|
||
"🏠 *Главное меню*",
|
||
parse_mode="Markdown",
|
||
reply_markup=menu_builder.get_keyboard("main")
|
||
)
|
||
|
||
elif callback == "preset_menu":
|
||
state.current_menu = "preset"
|
||
await query.edit_message_text(
|
||
"📋 *Предустановленные команды*",
|
||
parse_mode="Markdown",
|
||
reply_markup=menu_builder.get_keyboard("preset")
|
||
)
|
||
|
||
elif callback == "fs_menu":
|
||
await query.edit_message_text(
|
||
"📁 *Файловая система*",
|
||
parse_mode="Markdown",
|
||
reply_markup=menu_builder.get_keyboard("fs")
|
||
)
|
||
|
||
elif callback == "search_menu":
|
||
await query.edit_message_text(
|
||
"🔍 *Поиск*",
|
||
parse_mode="Markdown",
|
||
reply_markup=menu_builder.get_keyboard("search")
|
||
)
|
||
|
||
elif callback == "system_menu":
|
||
await query.edit_message_text(
|
||
"📊 *Система*",
|
||
parse_mode="Markdown",
|
||
reply_markup=menu_builder.get_keyboard("system")
|
||
)
|
||
|
||
elif callback == "network_menu":
|
||
await query.edit_message_text(
|
||
"🌐 *Сеть*",
|
||
parse_mode="Markdown",
|
||
reply_markup=menu_builder.get_keyboard("network")
|
||
)
|
||
|
||
elif callback == "settings_menu":
|
||
state.current_menu = "settings"
|
||
await query.edit_message_text(
|
||
"⚙️ *Настройки бота*",
|
||
parse_mode="Markdown",
|
||
reply_markup=menu_builder.get_keyboard("settings")
|
||
)
|
||
|
||
elif callback == "access_menu":
|
||
await query.edit_message_text(
|
||
"👥 *Управление доступом*",
|
||
parse_mode="Markdown",
|
||
reply_markup=menu_builder.get_keyboard("access")
|
||
)
|
||
|
||
# Обработка команд выполнения
|
||
elif callback.startswith("cmd_"):
|
||
# Поиск команды в меню
|
||
command = None
|
||
for menu_items in menu_builder._menus.values():
|
||
for item in menu_items:
|
||
if item.callback == callback and item.command:
|
||
command = item.command
|
||
break
|
||
|
||
if command:
|
||
await execute_cli_command(query, command)
|
||
else:
|
||
await query.edit_message_text("❌ Команда не найдена")
|
||
|
||
# Настройки бота
|
||
elif callback == "set_name":
|
||
state.waiting_for_input = True
|
||
state.input_type = "name"
|
||
await query.edit_message_text(
|
||
"📝 *Изменение имени бота*\n\n"
|
||
f"Текущее имя: `{config.name}`\n\n"
|
||
"Отправьте новое имя бота:",
|
||
parse_mode="Markdown",
|
||
reply_markup=menu_builder.get_keyboard("settings")
|
||
)
|
||
|
||
elif callback == "set_description":
|
||
state.waiting_for_input = True
|
||
state.input_type = "description"
|
||
await query.edit_message_text(
|
||
"📄 *Изменение описания бота*\n\n"
|
||
f"Текущее описание: `{config.description}`\n\n"
|
||
"Отправьте новое описание:",
|
||
parse_mode="Markdown",
|
||
reply_markup=menu_builder.get_keyboard("settings")
|
||
)
|
||
|
||
elif callback == "set_icon":
|
||
state.waiting_for_input = True
|
||
state.input_type = "icon"
|
||
await query.edit_message_text(
|
||
"🎨 *Изменение иконки бота*\n\n"
|
||
f"Текущая иконка: `{config.icon}`\n\n"
|
||
"Отправьте новый emoji (один символ):",
|
||
parse_mode="Markdown",
|
||
reply_markup=menu_builder.get_keyboard("settings")
|
||
)
|
||
|
||
elif callback == "show_access":
|
||
allowed = config.get("allowed_users", [])
|
||
if allowed:
|
||
text = "👥 *Разрешённые пользователи:*\n" + "\n".join(f"• `{uid}`" for uid in allowed)
|
||
else:
|
||
text = "👥 *Доступ открыт для всех*\n\n(список разрешённых пользователей пуст)"
|
||
await query.edit_message_text(text, parse_mode="Markdown")
|
||
|
||
elif callback == "add_access":
|
||
state.waiting_for_input = True
|
||
state.input_type = "add_access"
|
||
await query.edit_message_text(
|
||
"➕ *Добавление пользователя*\n\n"
|
||
"Отправьте ID пользователя Telegram:\n"
|
||
"(можно получить через @userinfobot)",
|
||
parse_mode="Markdown"
|
||
)
|
||
|
||
elif callback == "remove_access":
|
||
state.waiting_for_input = True
|
||
state.input_type = "remove_access"
|
||
allowed = config.get("allowed_users", [])
|
||
if allowed:
|
||
text = "➖ *Удаление пользователя*\n\n" + "\n".join(f"• `{uid}`" for uid in allowed)
|
||
text += "\n\nОтправьте ID для удаления:"
|
||
else:
|
||
text = "➖ Список пуст, некого удалять"
|
||
await query.edit_message_text(text, parse_mode="Markdown")
|
||
|
||
elif callback == "about":
|
||
await query.edit_message_text(
|
||
f"ℹ️ *О боте*\n\n"
|
||
f"*{config.icon} {config.name}*\n"
|
||
f"_{config.description}_\n\n"
|
||
f"Версия: 1.0.0\n"
|
||
f"Рабочая директория: `{config.get('working_directory')}`\n\n"
|
||
f"Бот позволяет выполнять CLI команды на вашем ПК\n"
|
||
f"через интерфейс Telegram.",
|
||
parse_mode="Markdown",
|
||
reply_markup=menu_builder.get_keyboard("main")
|
||
)
|
||
state.current_menu = "main"
|
||
|
||
|
||
async def execute_cli_command(query, command: str):
|
||
"""Выполнение CLI команды из кнопки меню."""
|
||
user_id = query.from_user.id
|
||
state = state_manager.get(user_id)
|
||
|
||
# Определяем рабочую директорию: сначала пользовательская, потом из конфига
|
||
working_dir = state.working_directory or config.get("working_directory", str(Path.home()))
|
||
|
||
logger.info(f"Выполнение команды: {command} в директории: {working_dir}")
|
||
|
||
await query.edit_message_text(
|
||
f"⏳ *Выполнение...*\n\n`{command}`",
|
||
parse_mode="Markdown"
|
||
)
|
||
|
||
try:
|
||
process = await asyncio.create_subprocess_shell(
|
||
command,
|
||
stdout=asyncio.subprocess.PIPE,
|
||
stderr=asyncio.subprocess.PIPE,
|
||
cwd=working_dir
|
||
)
|
||
|
||
stdout, stderr = await asyncio.wait_for(
|
||
process.communicate(),
|
||
timeout=30
|
||
)
|
||
|
||
output = stdout.decode("utf-8", errors="replace")
|
||
error = stderr.decode("utf-8", errors="replace")
|
||
|
||
result = f"✅ *Результат:*\n\n"
|
||
result += f"```\n{command}\n```\n\n"
|
||
|
||
if output:
|
||
# Ограничиваем вывод
|
||
if len(output) > 4000:
|
||
output = output[:4000] + "\n... (вывод обрезан)"
|
||
result += f"*Вывод:*\n```\n{output}\n```\n"
|
||
|
||
if error:
|
||
if len(error) > 4000:
|
||
error = error[:4000] + "\n... (вывод обрезан)"
|
||
result += f"*Ошибки:*\n```\n{error}\n```\n"
|
||
|
||
result += f"\n*Код возврата:* `{process.returncode}`"
|
||
|
||
await query.edit_message_text(result, parse_mode="Markdown")
|
||
|
||
except asyncio.TimeoutError:
|
||
await query.edit_message_text(
|
||
"❌ *Таймаут*\n\nКоманда выполнялась дольше 30 секунд и была прервана.",
|
||
parse_mode="Markdown"
|
||
)
|
||
except Exception as e:
|
||
logger.error(f"Ошибка выполнения команды: {e}")
|
||
await query.edit_message_text(
|
||
f"❌ *Ошибка:*\n```\n{str(e)}\n```",
|
||
parse_mode="Markdown"
|
||
)
|
||
|
||
|
||
async def handle_settings_input(update: Update, context: ContextTypes.DEFAULT_TYPE, text: str):
|
||
"""Обработка ввода в режиме настройки."""
|
||
user_id = update.effective_user.id
|
||
state = state_manager.get(user_id)
|
||
input_type = state.input_type
|
||
|
||
if input_type == "name":
|
||
config.set("bot_name", text)
|
||
await update.message.reply_text(
|
||
f"✅ Имя бота изменено на: `{text}`\n\n"
|
||
f"Используйте /start для возврата в главное меню",
|
||
parse_mode="Markdown"
|
||
)
|
||
|
||
elif input_type == "description":
|
||
config.set("bot_description", text)
|
||
await update.message.reply_text(
|
||
f"✅ Описание изменено на: `{text}`\n\n"
|
||
f"Используйте /start для возврата в главное меню",
|
||
parse_mode="Markdown"
|
||
)
|
||
|
||
elif input_type == "icon":
|
||
config.set("bot_icon_emoji", text[0] if text else "🤖")
|
||
await update.message.reply_text(
|
||
f"✅ Иконка изменена на: `{text[0] if text else '🤖'}`\n\n"
|
||
f"Используйте /start для возврата в главное меню",
|
||
parse_mode="Markdown"
|
||
)
|
||
|
||
elif input_type == "add_access":
|
||
try:
|
||
uid = int(text)
|
||
allowed = config.get("allowed_users", [])
|
||
if uid not in allowed:
|
||
allowed.append(uid)
|
||
config.set("allowed_users", allowed)
|
||
await update.message.reply_text(f"✅ Пользователь `{uid}` добавлен", parse_mode="Markdown")
|
||
else:
|
||
await update.message.reply_text(f"⚠️ Пользователь `{uid}` уже в списке", parse_mode="Markdown")
|
||
except ValueError:
|
||
await update.message.reply_text("❌ Неверный формат ID")
|
||
|
||
elif input_type == "remove_access":
|
||
try:
|
||
uid = int(text)
|
||
allowed = config.get("allowed_users", [])
|
||
if uid in allowed:
|
||
allowed.remove(uid)
|
||
config.set("allowed_users", allowed)
|
||
await update.message.reply_text(f"✅ Пользователь `{uid}` удалён", parse_mode="Markdown")
|
||
else:
|
||
await update.message.reply_text(f"⚠️ Пользователь `{uid}` не найден", parse_mode="Markdown")
|
||
except ValueError:
|
||
await update.message.reply_text("❌ Неверный формат ID")
|
||
|
||
# Сброс состояния
|
||
state.waiting_for_input = False
|
||
state.input_type = None
|
||
|
||
|
||
async def handle_text_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||
"""Обработка текстовых сообщений как CLI команд."""
|
||
user_id = update.effective_user.id
|
||
text = update.message.text.strip()
|
||
state = state_manager.get(user_id)
|
||
|
||
# Проверка: не в режиме настройки ли мы
|
||
if state.waiting_for_input:
|
||
await handle_settings_input(update, context, text)
|
||
return
|
||
|
||
# Любое текстовое сообщение = CLI команда
|
||
logger.info(f"Пользователь {user_id} отправил команду: {text}")
|
||
|
||
await update.message.reply_text(
|
||
f"⏳ *Выполнение...*\n\n`{text}`",
|
||
parse_mode="Markdown"
|
||
)
|
||
await execute_cli_command_from_message(update, text)
|
||
|
||
|
||
async def execute_cli_command_from_message(update: Update, command: str):
|
||
"""Выполнение CLI команды из сообщения."""
|
||
user_id = update.effective_user.id
|
||
state = state_manager.get(user_id)
|
||
|
||
# Определяем рабочую директорию: сначала пользовательская, потом из конфига
|
||
working_dir = state.working_directory or config.get("working_directory", str(Path.home()))
|
||
|
||
# Обработка команды cd - меняем директорию пользователя
|
||
# Работает только с простыми командами cd, не с составными
|
||
cmd_stripped = command.strip()
|
||
if cmd_stripped.startswith("cd ") and "&&" not in cmd_stripped and ";" not in cmd_stripped and "|" not in cmd_stripped:
|
||
parts = cmd_stripped.split(maxsplit=1)
|
||
if len(parts) == 2:
|
||
target_dir = parts[1]
|
||
|
||
# Обработка ~ и относительных путей
|
||
if target_dir.startswith("~"):
|
||
target_dir = str(Path.home()) + target_dir[1:]
|
||
elif not target_dir.startswith("/"):
|
||
target_dir = str(Path(working_dir) / target_dir)
|
||
|
||
# Проверка существования директории
|
||
if Path(target_dir).is_dir():
|
||
state.working_directory = target_dir
|
||
await update.message.reply_text(
|
||
f"📁 *Директория изменена:*\n`{target_dir}`",
|
||
parse_mode="Markdown"
|
||
)
|
||
else:
|
||
await update.message.reply_text(
|
||
f"❌ *Директория не найдена:*\n`{target_dir}`",
|
||
parse_mode="Markdown"
|
||
)
|
||
return
|
||
|
||
# Проверка на составную команду с cd - выполняем и сохраняем конечную директорию
|
||
if "cd " in cmd_stripped and ("&&" in cmd_stripped or ";" in cmd_stripped):
|
||
# Добавляем pwd в конец для получения конечной директории
|
||
command_with_pwd = f"{cmd_stripped} && pwd"
|
||
logger.info(f"Выполнение составной команды с cd: {command_with_pwd} в директории: {working_dir}")
|
||
|
||
await update.message.reply_text(
|
||
f"⏳ *Выполнение...*\n\n`{cmd_stripped}`",
|
||
parse_mode="Markdown"
|
||
)
|
||
|
||
try:
|
||
process = await asyncio.create_subprocess_shell(
|
||
command_with_pwd,
|
||
stdout=asyncio.subprocess.PIPE,
|
||
stderr=asyncio.subprocess.PIPE,
|
||
cwd=working_dir
|
||
)
|
||
|
||
stdout, stderr = await asyncio.wait_for(
|
||
process.communicate(),
|
||
timeout=30
|
||
)
|
||
|
||
output = stdout.decode("utf-8", errors="replace").strip()
|
||
error = stderr.decode("utf-8", errors="replace")
|
||
|
||
# Последняя строка - это pwd, сохраняем её
|
||
if output and process.returncode == 0:
|
||
lines = output.split('\n')
|
||
final_dir = lines[-1].strip()
|
||
# Проверяем, что это действительно путь
|
||
if Path(final_dir).is_dir():
|
||
state.working_directory = final_dir
|
||
# Убираем pwd из вывода
|
||
output = '\n'.join(lines[:-1])
|
||
|
||
result = f"✅ *Результат:*\n\n"
|
||
result += f"```\n{cmd_stripped}\n```\n\n"
|
||
|
||
if output:
|
||
if len(output) > 4000:
|
||
output = output[:4000] + "\n... (вывод обрезан)"
|
||
result += f"*Вывод:*\n```\n{output}\n```\n"
|
||
|
||
if error:
|
||
if len(error) > 4000:
|
||
error = error[:4000] + "\n... (вывод обрезан)"
|
||
result += f"*Ошибки:*\n```\n{error}\n```\n"
|
||
|
||
result += f"\n*Код возврата:* `{process.returncode}`"
|
||
|
||
await update.message.reply_text(result, parse_mode="Markdown")
|
||
|
||
except asyncio.TimeoutError:
|
||
await update.message.reply_text(
|
||
"❌ *Таймаут*\n\nКоманда выполнялась дольше 30 секунд и была прервана.",
|
||
parse_mode="Markdown"
|
||
)
|
||
except Exception as e:
|
||
logger.error(f"Ошибка выполнения команды: {e}")
|
||
await update.message.reply_text(
|
||
f"❌ *Ошибка:*\n```\n{str(e)}\n```",
|
||
parse_mode="Markdown"
|
||
)
|
||
return
|
||
|
||
logger.info(f"Выполнение команды: {command} в директории: {working_dir}")
|
||
|
||
try:
|
||
process = await asyncio.create_subprocess_shell(
|
||
command,
|
||
stdout=asyncio.subprocess.PIPE,
|
||
stderr=asyncio.subprocess.PIPE,
|
||
cwd=working_dir
|
||
)
|
||
|
||
stdout, stderr = await asyncio.wait_for(
|
||
process.communicate(),
|
||
timeout=30
|
||
)
|
||
|
||
output = stdout.decode("utf-8", errors="replace")
|
||
error = stderr.decode("utf-8", errors="replace")
|
||
|
||
result = f"✅ *Результат:*\n\n"
|
||
result += f"```\n{command}\n```\n\n"
|
||
|
||
if output:
|
||
if len(output) > 4000:
|
||
output = output[:4000] + "\n... (вывод обрезан)"
|
||
result += f"*Вывод:*\n```\n{output}\n```\n"
|
||
|
||
if error:
|
||
if len(error) > 4000:
|
||
error = error[:4000] + "\n... (вывод обрезан)"
|
||
result += f"*Ошибки:*\n```\n{error}\n```\n"
|
||
|
||
result += f"\n*Код возврата:* `{process.returncode}`"
|
||
|
||
await update.message.reply_text(result, parse_mode="Markdown")
|
||
|
||
except asyncio.TimeoutError:
|
||
await update.message.reply_text(
|
||
"❌ *Таймаут*\n\nКоманда выполнялась дольше 30 секунд и была прервана.",
|
||
parse_mode="Markdown"
|
||
)
|
||
except Exception as e:
|
||
logger.error(f"Ошибка выполнения команды: {e}")
|
||
await update.message.reply_text(
|
||
f"❌ *Ошибка:*\n```\n{str(e)}\n```",
|
||
parse_mode="Markdown"
|
||
)
|
||
|
||
|
||
async def post_init(application: Application):
|
||
"""Инициализация после запуска бота."""
|
||
# Установка команд бота
|
||
commands = [
|
||
BotCommand("start", "Запустить бота"),
|
||
BotCommand("menu", "Главное меню с кнопками"),
|
||
BotCommand("help", "Справка"),
|
||
BotCommand("settings", "Настройки"),
|
||
]
|
||
await application.bot.set_my_commands(commands)
|
||
|
||
# Установка имени и описания
|
||
await application.bot.set_my_name(config.name)
|
||
await application.bot.set_my_description(config.description)
|
||
|
||
logger.info("Бот инициализирован")
|
||
|
||
|
||
def main():
|
||
"""Точка входа."""
|
||
# Проверка токена: сначала переменная окружения, потом конфиг
|
||
token = os.getenv("TELEGRAM_BOT_TOKEN")
|
||
|
||
if not token and CONFIG_FILE.exists():
|
||
try:
|
||
with open(CONFIG_FILE, "r", encoding="utf-8") as f:
|
||
config_data = json.load(f)
|
||
token = config_data.get("bot_token")
|
||
if token:
|
||
logger.info("Токен получен из конфигурации")
|
||
except Exception as e:
|
||
logger.warning(f"Не удалось прочитать токен из конфига: {e}")
|
||
|
||
if not token:
|
||
print("❌ Ошибка: не установлен TELEGRAM_BOT_TOKEN")
|
||
print("Задайте переменную окружения:")
|
||
print(" export TELEGRAM_BOT_TOKEN='your_token_here'")
|
||
print("Или запустите ./run.sh для интерактивной настройки")
|
||
sys.exit(1)
|
||
|
||
# Инициализация меню
|
||
init_menus()
|
||
|
||
# Создание приложения
|
||
application = Application.builder().token(token).post_init(post_init).build()
|
||
|
||
# Регистрация хендлеров
|
||
application.add_handler(CommandHandler("start", start_command))
|
||
application.add_handler(CommandHandler("help", help_command))
|
||
application.add_handler(CommandHandler("settings", settings_command))
|
||
application.add_handler(CommandHandler("menu", menu_command))
|
||
application.add_handler(CallbackQueryHandler(menu_callback))
|
||
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_text_message))
|
||
|
||
# Запуск
|
||
logger.info("Запуск бота...")
|
||
print(f"🤖 {config.name} запущен!")
|
||
print(f"📝 Описание: {config.description}")
|
||
print(f"🎨 Иконка: {config.icon}")
|
||
print("\nОстановка: Ctrl+C")
|
||
|
||
application.run_polling(allowed_updates=Update.ALL_TYPES)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|