telegram-cli-bot/bot.py

737 lines
28 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Telegram CLI Bot - бот для выполнения CLI команд с многоуровневым меню.
Легкое добавление новых команд через регистрацию хендлеров.
"""
import os
import sys
import json
import asyncio
import subprocess
import logging
from pathlib import Path
from typing import Optional, Callable, Dict, Any, List
from dataclasses import dataclass, field
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, BotCommand
from telegram.ext import (
Application,
CommandHandler,
CallbackQueryHandler,
MessageHandler,
ContextTypes,
filters,
)
# --- Конфигурация ---
BASE_DIR = Path(__file__).parent
CONFIG_FILE = BASE_DIR / "bot_config.json"
COMMANDS_FILE = BASE_DIR / "commands.yaml"
logging.basicConfig(
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
level=logging.INFO,
handlers=[
logging.FileHandler(BASE_DIR / "bot.log"),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# --- Хранилище состояний пользователя ---
@dataclass
class UserState:
"""Состояние пользователя в диалоге."""
current_menu: str = "main"
waiting_for_input: bool = False
input_type: Optional[str] = None # "name", "description", "icon", "command"
parent_menu: Optional[str] = None
context: Dict[str, Any] = field(default_factory=dict)
class StateManager:
"""Управление состояниями пользователей."""
def __init__(self):
self._states: Dict[int, UserState] = {}
def get(self, user_id: int) -> UserState:
if user_id not in self._states:
self._states[user_id] = UserState()
return self._states[user_id]
def reset(self, user_id: int):
self._states[user_id] = UserState()
# --- Конфигурация бота ---
class BotConfig:
"""Конфигурация бота с сохранением в JSON."""
DEFAULT_CONFIG = {
"bot_name": "CLI Assistant",
"bot_description": "Бот для выполнения CLI команд",
"bot_icon_emoji": "🤖",
"allowed_users": [], # пустой список = все разрешены
"require_confirmation": True,
"working_directory": str(Path.home()),
}
def __init__(self, config_file: Path = CONFIG_FILE):
self.config_file = config_file
self._config = self._load()
def _load(self) -> dict:
if self.config_file.exists():
with open(self.config_file, "r", encoding="utf-8") as f:
return json.load(f)
return self.DEFAULT_CONFIG.copy()
def _save(self):
with open(self.config_file, "w", encoding="utf-8") as f:
json.dump(self._config, f, indent=2, ensure_ascii=False)
def get(self, key: str, default=None):
return self._config.get(key, self.DEFAULT_CONFIG.get(key, default))
def set(self, key: str, value):
self._config[key] = value
self._save()
@property
def name(self) -> str:
return self._config.get("bot_name", self.DEFAULT_CONFIG["bot_name"])
@property
def description(self) -> str:
return self._config.get("bot_description", self.DEFAULT_CONFIG["bot_description"])
@property
def icon(self) -> str:
return self._config.get("bot_icon_emoji", self.DEFAULT_CONFIG["bot_icon_emoji"])
# --- Система команд ---
@dataclass
class MenuItem:
"""Элемент меню."""
label: str
callback: str # callback_data для кнопки
description: str = ""
icon: str = ""
children: List["MenuItem"] = field(default_factory=list)
command: Optional[str] = None # CLI команда для выполнения
is_command: bool = False
class MenuBuilder:
"""Построитель многоуровневого меню."""
def __init__(self):
self._menus: Dict[str, List[MenuItem]] = {}
def add_menu(self, menu_name: str, items: List[MenuItem]):
self._menus[menu_name] = items
def get_menu(self, menu_name: str) -> List[MenuItem]:
return self._menus.get(menu_name, [])
def get_keyboard(self, menu_name: str) -> InlineKeyboardMarkup:
"""Создает InlineKeyboard для меню."""
items = self._menus.get(menu_name, [])
keyboard = []
for item in items:
icon = item.icon + " " if item.icon else ""
button = InlineKeyboardButton(
f"{icon}{item.label}",
callback_data=item.callback
)
keyboard.append([button])
return InlineKeyboardMarkup(keyboard)
class CommandRegistry:
"""Реестр команд для легкого добавления."""
def __init__(self):
self._commands: Dict[str, Callable] = {}
def register(self, name: str):
"""Декоратор для регистрации команды."""
def decorator(func: Callable):
self._commands[name] = func
return func
return decorator
def get(self, name: str) -> Optional[Callable]:
return self._commands.get(name)
def list_commands(self) -> List[str]:
return list(self._commands.keys())
# --- Глобальные объекты ---
config = BotConfig()
state_manager = StateManager()
menu_builder = MenuBuilder()
command_registry = CommandRegistry()
# --- Инициализация меню ---
def init_menus():
"""Инициализация структуры меню."""
# Главное меню
main_menu = [
MenuItem("🖥️ Выполнить команду", "exec_cmd", icon="🖥️", is_command=True),
MenuItem("📋 Предустановленные команды", "preset_menu", icon="📋"),
MenuItem("⚙️ Настройки бота", "settings_menu", icon="⚙️"),
MenuItem(" О боте", "about", icon=""),
]
menu_builder.add_menu("main", main_menu)
# Меню предустановленных команд
preset_menu = [
MenuItem("📁 Файловая система", "fs_menu", icon="📁"),
MenuItem("🔍 Поиск", "search_menu", icon="🔍"),
MenuItem("📊 Система", "system_menu", icon="📊"),
MenuItem("🌐 Сеть", "network_menu", icon="🌐"),
MenuItem("⬅️ Назад", "main", icon="⬅️"),
]
menu_builder.add_menu("preset", preset_menu)
# Файловая система
fs_menu = [
MenuItem("ls -la", "cmd_ls_la", command="ls -la", icon="📄"),
MenuItem("pwd", "cmd_pwd", command="pwd", icon="📍"),
MenuItem("df -h", "cmd_df", command="df -h", icon="💾"),
MenuItem("du -sh *", "cmd_du", command="du -sh * 2>/dev/null | sort -hr | head -20", icon="📊"),
MenuItem("⬅️ Назад", "preset", icon="⬅️"),
]
menu_builder.add_menu("fs", fs_menu)
# Поиск
search_menu = [
MenuItem("find . -name", "cmd_find_name", command="find . -maxdepth 3 -name '*.txt' 2>/dev/null", icon="🔎"),
MenuItem("grep пример", "cmd_grep", command="grep -r 'example' . 2>/dev/null | head -20", icon="🔍"),
MenuItem("which command", "cmd_which", command="which python3 bash git", icon="📍"),
MenuItem("⬅️ Назад", "preset", icon="⬅️"),
]
menu_builder.add_menu("search", search_menu)
# Система
system_menu = [
MenuItem("top -n 1", "cmd_top", command="top -bn1 | head -20", icon="📈"),
MenuItem("ps aux", "cmd_ps", command="ps aux | head -20", icon="🔄"),
MenuItem("free -h", "cmd_free", command="free -h", icon="💾"),
MenuItem("uname -a", "cmd_uname", command="uname -a", icon=""),
MenuItem("uptime", "cmd_uptime", command="uptime", icon="⏱️"),
MenuItem("⬅️ Назад", "preset", icon="⬅️"),
]
menu_builder.add_menu("system", system_menu)
# Сеть
network_menu = [
MenuItem("ip addr", "cmd_ip", command="ip addr 2>/dev/null || ifconfig 2>/dev/null", icon="🌐"),
MenuItem("ping google", "cmd_ping", command="ping -c 4 google.com 2>&1 | head -10", icon="📡"),
MenuItem("netstat", "cmd_netstat", command="ss -tuln 2>/dev/null || netstat -tuln 2>/dev/null | head -20", icon="🔌"),
MenuItem("curl ifconfig.me", "cmd_curl_ip", command="curl -s ifconfig.me 2>&1", icon="📍"),
MenuItem("⬅️ Назад", "preset", icon="⬅️"),
]
menu_builder.add_menu("network", network_menu)
# Настройки
settings_menu = [
MenuItem("📝 Изменить имя бота", "set_name", icon="📝"),
MenuItem("📄 Изменить описание", "set_description", icon="📄"),
MenuItem("🎨 Изменить иконку", "set_icon", icon="🎨"),
MenuItem("👥 Управление доступом", "access_menu", icon="👥"),
MenuItem("⬅️ Назад", "main", icon="⬅️"),
]
menu_builder.add_menu("settings", settings_menu)
# Доступ
access_menu = [
MenuItem("📋 Показать разрешённых", "show_access", icon="📋"),
MenuItem(" Добавить пользователя", "add_access", icon=""),
MenuItem(" Удалить пользователя", "remove_access", icon=""),
MenuItem("⬅️ Назад", "settings", icon="⬅️"),
]
menu_builder.add_menu("access", access_menu)
# --- Хендлеры ---
async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработка команды /start."""
user = update.effective_user
logger.info(f"Пользователь {user.username} ({user.id}) запустил бота")
state_manager.reset(user.id)
await update.message.reply_text(
f"👋 Привет, {user.first_name}!\n\n"
f"{config.icon} *{config.name}*\n"
f"_{config.description}_\n\n"
f"Используйте меню для навигации или команду /help для справки.",
parse_mode="Markdown",
reply_markup=menu_builder.get_keyboard("main")
)
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработка команды /help."""
help_text = f"""
📖 *Справка по боту {config.name}*
*Основные возможности:*
• Выполнение CLI команд на вашем ПК
• Предустановленные команды в меню
• Гибкая настройка бота
*Команды:*
/start - Запустить бота
/help - Эта справка
/settings - Настройки бота
/commands - Список доступных команд
*Безопасность:*
Команды выполняются от имени пользователя.
Будьте осторожны с деструктивными командами!
"""
await update.message.reply_text(help_text, parse_mode="Markdown")
async def settings_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработка команды /settings."""
state = state_manager.get(update.effective_user.id)
state.current_menu = "settings"
await update.message.reply_text(
"⚙️ *Настройки бота*",
parse_mode="Markdown",
reply_markup=menu_builder.get_keyboard("settings")
)
async def menu_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработка нажатий на кнопки меню."""
query = update.callback_query
user_id = query.from_user.id
state = state_manager.get(user_id)
await query.answer()
callback = query.data
logger.info(f"Callback: {callback} от пользователя {user_id}")
# Обработка навигации
if callback == "main":
state.current_menu = "main"
await query.edit_message_text(
"🏠 *Главное меню*",
parse_mode="Markdown",
reply_markup=menu_builder.get_keyboard("main")
)
elif callback == "preset_menu":
state.current_menu = "preset"
await query.edit_message_text(
"📋 *Предустановленные команды*",
parse_mode="Markdown",
reply_markup=menu_builder.get_keyboard("preset")
)
elif callback == "fs_menu":
await query.edit_message_text(
"📁 *Файловая система*",
parse_mode="Markdown",
reply_markup=menu_builder.get_keyboard("fs")
)
elif callback == "search_menu":
await query.edit_message_text(
"🔍 *Поиск*",
parse_mode="Markdown",
reply_markup=menu_builder.get_keyboard("search")
)
elif callback == "system_menu":
await query.edit_message_text(
"📊 *Система*",
parse_mode="Markdown",
reply_markup=menu_builder.get_keyboard("system")
)
elif callback == "network_menu":
await query.edit_message_text(
"🌐 *Сеть*",
parse_mode="Markdown",
reply_markup=menu_builder.get_keyboard("network")
)
elif callback == "settings_menu":
state.current_menu = "settings"
await query.edit_message_text(
"⚙️ *Настройки бота*",
parse_mode="Markdown",
reply_markup=menu_builder.get_keyboard("settings")
)
elif callback == "access_menu":
await query.edit_message_text(
"👥 *Управление доступом*",
parse_mode="Markdown",
reply_markup=menu_builder.get_keyboard("access")
)
# Обработка команд выполнения
elif callback.startswith("cmd_"):
# Поиск команды в меню
command = None
for menu_items in menu_builder._menus.values():
for item in menu_items:
if item.callback == callback and item.command:
command = item.command
break
if command:
await execute_cli_command(query, command)
else:
await query.edit_message_text("❌ Команда не найдена")
# Настройки бота
elif callback == "set_name":
state.waiting_for_input = True
state.input_type = "name"
await query.edit_message_text(
"📝 *Изменение имени бота*\n\n"
f"Текущее имя: `{config.name}`\n\n"
"Отправьте новое имя бота:",
parse_mode="Markdown",
reply_markup=menu_builder.get_keyboard("settings")
)
elif callback == "set_description":
state.waiting_for_input = True
state.input_type = "description"
await query.edit_message_text(
"📄 *Изменение описания бота*\n\n"
f"Текущее описание: `{config.description}`\n\n"
"Отправьте новое описание:",
parse_mode="Markdown",
reply_markup=menu_builder.get_keyboard("settings")
)
elif callback == "set_icon":
state.waiting_for_input = True
state.input_type = "icon"
await query.edit_message_text(
"🎨 *Изменение иконки бота*\n\n"
f"Текущая иконка: `{config.icon}`\n\n"
"Отправьте новый emoji (один символ):",
parse_mode="Markdown",
reply_markup=menu_builder.get_keyboard("settings")
)
elif callback == "show_access":
allowed = config.get("allowed_users", [])
if allowed:
text = "👥 *Разрешённые пользователи:*\n" + "\n".join(f"• `{uid}`" for uid in allowed)
else:
text = "👥 *Доступ открыт для всех*\n\n(список разрешённых пользователей пуст)"
await query.edit_message_text(text, parse_mode="Markdown")
elif callback == "add_access":
state.waiting_for_input = True
state.input_type = "add_access"
await query.edit_message_text(
" *Добавление пользователя*\n\n"
"Отправьте ID пользователя Telegram:\n"
"(можно получить через @userinfobot)",
parse_mode="Markdown"
)
elif callback == "remove_access":
state.waiting_for_input = True
state.input_type = "remove_access"
allowed = config.get("allowed_users", [])
if allowed:
text = " *Удаление пользователя*\n\n" + "\n".join(f"• `{uid}`" for uid in allowed)
text += "\n\nОтправьте ID для удаления:"
else:
text = " Список пуст, некого удалять"
await query.edit_message_text(text, parse_mode="Markdown")
elif callback == "about":
await query.edit_message_text(
f" *О боте*\n\n"
f"*{config.icon} {config.name}*\n"
f"_{config.description}_\n\n"
f"Версия: 1.0.0\n"
f"Рабочая директория: `{config.get('working_directory')}`\n\n"
f"Бот позволяет выполнять CLI команды на вашем ПК\n"
f"через интерфейс Telegram.",
parse_mode="Markdown",
reply_markup=menu_builder.get_keyboard("main")
)
state.current_menu = "main"
elif callback == "exec_cmd":
state.waiting_for_input = True
state.input_type = "command"
await query.edit_message_text(
"🖥️ *Выполнение команды*\n\n"
"Отправьте команду для выполнения:\n\n"
"⚠️ _Будьте осторожны с деструктивными командами!_",
parse_mode="Markdown",
reply_markup=menu_builder.get_keyboard("main")
)
async def execute_cli_command(query, command: str):
"""Выполнение CLI команды."""
working_dir = config.get("working_directory", str(Path.home()))
logger.info(f"Выполнение команды: {command}")
await query.edit_message_text(
f"⏳ *Выполнение...*\n\n`{command}`",
parse_mode="Markdown"
)
try:
process = await asyncio.create_subprocess_shell(
command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=working_dir
)
stdout, stderr = await asyncio.wait_for(
process.communicate(),
timeout=30
)
output = stdout.decode("utf-8", errors="replace")
error = stderr.decode("utf-8", errors="replace")
result = f"✅ *Результат:*\n\n"
result += f"```\n{command}\n```\n\n"
if output:
# Ограничиваем вывод
if len(output) > 4000:
output = output[:4000] + "\n... (вывод обрезан)"
result += f"*Вывод:*\n```\n{output}\n```\n"
if error:
if len(error) > 4000:
error = error[:4000] + "\n... (вывод обрезан)"
result += f"*Ошибки:*\n```\n{error}\n```\n"
result += f"\n*Код возврата:* `{process.returncode}`"
await query.edit_message_text(result, parse_mode="Markdown")
except asyncio.TimeoutError:
await query.edit_message_text(
"❌ *Таймаут*\n\nКоманда выполнялась дольше 30 секунд и была прервана.",
parse_mode="Markdown"
)
except Exception as e:
logger.error(f"Ошибка выполнения команды: {e}")
await query.edit_message_text(
f"❌ *Ошибка:*\n```\n{str(e)}\n```",
parse_mode="Markdown"
)
async def handle_text_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработка текстовых сообщений (для ввода настроек и команд)."""
user_id = update.effective_user.id
state = state_manager.get(user_id)
text = update.message.text.strip()
if not state.waiting_for_input:
return
input_type = state.input_type
if input_type == "name":
config.set("bot_name", text)
await update.message.reply_text(
f"✅ Имя бота изменено на: `{text}`",
parse_mode="Markdown"
)
elif input_type == "description":
config.set("bot_description", text)
await update.message.reply_text(
f"✅ Описание изменено на: `{text}`",
parse_mode="Markdown"
)
elif input_type == "icon":
config.set("bot_icon_emoji", text[0] if text else "🤖")
await update.message.reply_text(
f"✅ Иконка изменена на: `{text[0] if text else '🤖'}`",
parse_mode="Markdown"
)
elif input_type == "command":
# Выполнение произвольной команды
await update.message.reply_text(
f"⏳ *Выполнение...*\n\n`{text}`",
parse_mode="Markdown"
)
await execute_cli_command_from_message(update, text)
elif input_type == "add_access":
try:
uid = int(text)
allowed = config.get("allowed_users", [])
if uid not in allowed:
allowed.append(uid)
config.set("allowed_users", allowed)
await update.message.reply_text(f"✅ Пользователь `{uid}` добавлен", parse_mode="Markdown")
else:
await update.message.reply_text(f"⚠️ Пользователь `{uid}` уже в списке", parse_mode="Markdown")
except ValueError:
await update.message.reply_text("❌ Неверный формат ID")
elif input_type == "remove_access":
try:
uid = int(text)
allowed = config.get("allowed_users", [])
if uid in allowed:
allowed.remove(uid)
config.set("allowed_users", allowed)
await update.message.reply_text(f"✅ Пользователь `{uid}` удалён", parse_mode="Markdown")
else:
await update.message.reply_text(f"⚠️ Пользователь `{uid}` не найден", parse_mode="Markdown")
except ValueError:
await update.message.reply_text("❌ Неверный формат ID")
# Сброс состояния
state.waiting_for_input = False
state.input_type = None
async def execute_cli_command_from_message(update: Update, command: str):
"""Выполнение CLI команды из сообщения."""
working_dir = config.get("working_directory", str(Path.home()))
try:
process = await asyncio.create_subprocess_shell(
command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=working_dir
)
stdout, stderr = await asyncio.wait_for(
process.communicate(),
timeout=30
)
output = stdout.decode("utf-8", errors="replace")
error = stderr.decode("utf-8", errors="replace")
result = f"✅ *Результат:*\n\n"
result += f"```\n{command}\n```\n\n"
if output:
if len(output) > 4000:
output = output[:4000] + "\n... (вывод обрезан)"
result += f"*Вывод:*\n```\n{output}\n```\n"
if error:
if len(error) > 4000:
error = error[:4000] + "\n... (вывод обрезан)"
result += f"*Ошибки:*\n```\n{error}\n```\n"
result += f"\n*Код возврата:* `{process.returncode}`"
await update.message.reply_text(result, parse_mode="Markdown")
except asyncio.TimeoutError:
await update.message.reply_text(
"❌ *Таймаут*\n\nКоманда выполнялась дольше 30 секунд и была прервана.",
parse_mode="Markdown"
)
except Exception as e:
logger.error(f"Ошибка выполнения команды: {e}")
await update.message.reply_text(
f"❌ *Ошибка:*\n```\n{str(e)}\n```",
parse_mode="Markdown"
)
async def post_init(application: Application):
"""Инициализация после запуска бота."""
# Установка команд бота
commands = [
BotCommand("start", "Запустить бота"),
BotCommand("help", "Справка"),
BotCommand("settings", "Настройки"),
]
await application.bot.set_my_commands(commands)
# Установка имени и описания
await application.bot.set_my_name(config.name)
await application.bot.set_my_description(config.description)
logger.info("Бот инициализирован")
def main():
"""Точка входа."""
# Проверка токена: сначала переменная окружения, потом конфиг
token = os.getenv("TELEGRAM_BOT_TOKEN")
if not token and CONFIG_FILE.exists():
try:
with open(CONFIG_FILE, "r", encoding="utf-8") as f:
config_data = json.load(f)
token = config_data.get("bot_token")
if token:
logger.info("Токен получен из конфигурации")
except Exception as e:
logger.warning(f"Не удалось прочитать токен из конфига: {e}")
if not token:
print("❌ Ошибка: не установлен TELEGRAM_BOT_TOKEN")
print("Задайте переменную окружения:")
print(" export TELEGRAM_BOT_TOKEN='your_token_here'")
print("Или запустите ./run.sh для интерактивной настройки")
sys.exit(1)
# Инициализация меню
init_menus()
# Создание приложения
application = Application.builder().token(token).post_init(post_init).build()
# Регистрация хендлеров
application.add_handler(CommandHandler("start", start_command))
application.add_handler(CommandHandler("help", help_command))
application.add_handler(CommandHandler("settings", settings_command))
application.add_handler(CallbackQueryHandler(menu_callback))
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_text_message))
# Запуск
logger.info("Запуск бота...")
print(f"🤖 {config.name} запущен!")
print(f"📝 Описание: {config.description}")
print(f"🎨 Иконка: {config.icon}")
print("\nОстановка: Ctrl+C")
application.run_polling(allowed_updates=Update.ALL_TYPES)
if __name__ == "__main__":
main()