telegram-cli-bot/bot/handlers/commands.py

529 lines
23 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""Обработчики команд бота (/start, /menu, /help, /settings, /cron)."""
import logging
from datetime import datetime
from telegram import Update
from telegram.ext import ContextTypes
# Импорты из модулей bot/
from bot.config import config, state_manager, server_manager, menu_builder
from bot.utils.decorators import check_access
from bot.utils.formatters import escape_markdown
from bot.tools import tools_registry
from bot.ai_agent import ai_agent
logger = logging.getLogger(__name__)
@check_access
async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработка команды /start."""
user = update.effective_user
logger.info(f"Пользователь {user.username} ({user.id}) запустил бота")
# Сбрасываем состояние НО сохраняем ai_chat_mode (по умолчанию True)
state = state_manager.get(user.id)
state_manager.reset(user.id)
state = state_manager.get(user.id)
# ai_chat_mode уже True по умолчанию из UserState
# Показать текущую директорию и сервер
working_dir = config.working_directory
server = server_manager.get("local")
server_desc = server.description if server else "localhost"
await update.message.reply_text(
f"👋 Привет, {user.first_name}!\n\n"
f"{config.icon} *{config.name}*\n"
f"_{config.description}_\n\n"
f"*Просто отправьте CLI команду в чат* — я её выполню!\n\n"
f"🖥️ *Текущий сервер:* `{server_desc}`\n"
f"📁 *Рабочая директория:* `{working_dir}`\n\n"
f"Используйте `cd путь` для смены директории.\n"
f"Или выберите сервер в меню.\n"
f"Команда /help покажет справку.",
parse_mode="Markdown",
reply_markup=menu_builder.get_keyboard("main", user_id=update.effective_user.id, state=state)
)
@check_access
async def menu_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработка команды /menu - показывает главное меню."""
user = update.effective_user
state = state_manager.get(user.id)
# Не сбрасываем состояние - сохраняем ai_chat_mode и другие настройки
state.current_menu = "main"
# Показать текущую директорию и сервер
working_dir = state.working_directory or config.working_directory
server = server_manager.get(state.current_server)
server_desc = server.description if server else state.current_server
await update.message.reply_text(
f"🏠 *Главное меню*\n\n"
f"🖥️ *Сервер:* `{server_desc}`\n"
f"📁 *Директория:* `{working_dir}`\n\n"
f"Выберите действие:",
parse_mode="Markdown",
reply_markup=menu_builder.get_keyboard("main", user_id=update.effective_user.id, state=state)
)
@check_access
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработка команды /help."""
help_text = f"""
📖 *Справка по боту {config.name}*
*Как использовать:*
Просто отправьте любую CLI команду в чат — бот выполнит её!
*Примеры:*
• `ls -la` — список файлов
• `pwd` — текущая директория
• `df -h` — свободное место на диске
• `git status` — статус git
*Навигация по директориям:*
• `cd путь` — сменить директорию (например, `cd git/project`)
• `cd ..` — на уровень вверх
• `cd ~` — в домашнюю директорию
• `pwd` — показать текущую директорию
*Кнопки меню:*
• 📋 Предустановленные команды — быстрые команды по категориям
• ⚙️ Настройки бота — изменение имени, описания, иконки
О боте — информация
*Команды управления:*
/start — Запустить бота, главное меню
/menu — Показать главное меню с кнопками
/help — Эта справка
/settings — Настройки
*Безопасность:*
Команды выполняются от вашего имени.
Будьте осторожны с деструктивными командами!
"""
await update.message.reply_text(help_text, parse_mode="Markdown")
@check_access
async def settings_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработка команды /settings."""
state = state_manager.get(update.effective_user.id)
state.current_menu = "settings"
await update.message.reply_text(
"⚙️ *Настройки бота*",
parse_mode="Markdown",
reply_markup=menu_builder.get_keyboard("settings")
)
@check_access
async def cron_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""
Обработка команды /cron - управление задачами.
Использование:
/cron list - показать все задачи
/cron add <name> <schedule> <prompt> - добавить задачу
/cron run <id> - выполнить задачу немедленно
/cron remove <id> - удалить задачу
/cron toggle <id> - включить/выключить задачу
"""
user_id = update.effective_user.id
args = context.args
# Получаем cron инструмент
cron_tool = tools_registry.get('cron_tool')
if not cron_tool:
await update.message.reply_text("❌ Ошибка: cron инструмент не найден")
return
# Парсим команду
if not args:
# По умолчанию показываем список задач
action = 'list'
else:
action = args[0].lower()
try:
if action == 'list':
# Показать все задачи пользователя
result = await cron_tool.execute(action='list', user_id=user_id)
if result.success and result.data:
output = "⏰ **Ваши задачи:**\n\n"
for job in result.data:
status = "" if job.get('enabled') else ""
notify_icon = "🔔" if job.get('notify') else "🔕"
log_icon = "📝" if job.get('log_results') else "🚫"
output += f"{status} **{job.get('name', 'Без названия')}** (ID: {job.get('id')})\n"
output += f" {notify_icon}{log_icon} Промпт: _{job.get('prompt', '')[:100]}_{'...' if len(job.get('prompt', '')) > 100 else ''}\n"
output += f" Расписание: `{job.get('schedule', '')}`\n"
if job.get('next_run'):
output += f" Следующий запуск: {job.get('next_run')}\n"
if job.get('last_run'):
output += f" Последний запуск: {job.get('last_run')}\n"
output += "\n"
if not output.strip():
output = "📭 У вас пока нет задач.\n\nДобавьте задачу командой:\n`/cron add <name> <schedule> <prompt>`"
await update.message.reply_text(output, parse_mode="Markdown")
else:
await update.message.reply_text("📭 У вас пока нет задач.")
elif action == 'add':
if len(args) < 4:
await update.message.reply_text(
"❌ **Недостаточно аргументов**\n\n"
"**Использование:**\n"
"`/cron add <name> <schedule> <prompt> [notify] [log]`\n\n"
"**Примеры:**\n"
"`/cron check_disk Ежедневно проверять диск на сервере`\n"
"`/cron news hourly Что нового в Linux сегодня`\n\n"
"**Расписание:**\n"
"• `@hourly` - каждый час\n"
"• `@daily` - каждый день\n"
"• `@weekly` - каждую неделю\n"
"• `*/5 * * * *` - каждые 5 минут",
parse_mode="Markdown"
)
return
name = args[1]
schedule = args[2]
# Промпт может содержать пробелы - берём всё после schedule
prompt = ' '.join(args[3:])
# Парсим опциональные параметры
notify = 'notify' in prompt.lower()
log_results = 'no_log' not in prompt.lower() and 'без_лога' not in prompt.lower()
result = await cron_tool.execute(
action='add',
name=name,
prompt=prompt,
schedule=schedule,
user_id=user_id,
notify=notify,
log_results=log_results
)
if result.success:
notify_status = "🔔 Уведомлять" if notify else "🔕 Без уведомлений"
log_status = "📝 Логировать" if log_results else "🚫 Без логов"
await update.message.reply_text(
f"✅ **Задача добавлена:**\n"
f"• ID: {result.data.get('id')}\n"
f"• Название: {name}\n"
f"• Промпт: _{prompt}_\n"
f"• Расписание: `{schedule}`\n"
f"{notify_status}, {log_status}\n"
f"• Следующий запуск: {result.data.get('next_run', 'N/A')}",
parse_mode="Markdown"
)
else:
await update.message.reply_text(f"❌ Ошибка: {result.error}")
elif action == 'run':
if len(args) < 2:
await update.message.reply_text("❌ Укажите ID задачи: `/cron run <id>`")
return
try:
job_id = int(args[1])
except ValueError:
await update.message.reply_text("❌ ID должен быть числом")
return
status_msg = await update.message.reply_text("⏳ Выполняю задачу...")
# Выполняем задачу через AI-агент
result = await cron_tool.execute(
action='run',
job_id=job_id,
ai_agent=ai_agent,
user_id=user_id
)
await status_msg.delete()
if result.success:
result_text = result.metadata.get('result_text', 'Задача выполнена')
tool_used = result.data.get('tool_used', 'не указан')
await update.message.reply_text(
f"✅ **Задача выполнена:**\n\n{result_text}\n\n🔧 Инструмент: {tool_used}",
parse_mode="Markdown"
)
else:
await update.message.reply_text(f"❌ Ошибка: {result.error}")
elif action == 'remove':
if len(args) < 2:
await update.message.reply_text("❌ Укажите ID задачи: `/cron remove <id>`")
return
try:
job_id = int(args[1])
except ValueError:
await update.message.reply_text("❌ ID должен быть числом")
return
result = await cron_tool.execute(action='remove', job_id=job_id)
if result.success:
await update.message.reply_text(f"✅ Задача удалена: ID {job_id}")
else:
await update.message.reply_text(f"❌ Ошибка: {result.error}")
elif action == 'toggle':
if len(args) < 2:
await update.message.reply_text("❌ Укажите ID задачи: `/cron toggle <id>`")
return
try:
job_id = int(args[1])
except ValueError:
await update.message.reply_text("❌ ID должен быть числом")
return
# Получаем текущее состояние задачи
list_result = await cron_tool.execute(action='list', user_id=user_id)
current_state = True
for job in list_result.data:
if job['id'] == job_id:
current_state = job.get('enabled', True)
break
new_state = not current_state
result = await cron_tool.execute(action='toggle', job_id=job_id, enabled=new_state)
if result.success:
state_text = "включена" if new_state else "выключена"
await update.message.reply_text(f"✅ Задача ID {job_id} {state_text}")
else:
await update.message.reply_text(f"❌ Ошибка: {result.error}")
else:
await update.message.reply_text(
"❌ Неизвестная команда.\n\n"
"**Доступные команды:**\n"
"• `/cron list` - показать все задачи\n"
"• `/cron add <name> <schedule> <prompt>` - добавить задачу\n"
"• `/cron run <id>` - выполнить задачу\n"
"• `/cron remove <id>` - удалить задачу\n"
"• `/cron toggle <id>` - включить/выключить задачу",
parse_mode="Markdown"
)
except Exception as e:
logger.exception(f"Ошибка в команде /cron: {e}")
await update.message.reply_text(f"❌ Ошибка: {e}")
@check_access
async def rss_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""
Обработка команды /rss - показывает последние 15 новостей с AI-переводом.
Формат:
Заголовок (переведенный ИИ)
Описание (переведенный ИИ)
Ссылка на полную новость
"""
user_id = update.effective_user.id
# Отправляем статус
status_msg = await update.message.reply_text("📰 **Загрузка новостей...**\n\nолучение лент и перевод заголовков_")
try:
# Получаем rss_tool
rss_tool = tools_registry.get('rss_tool')
if not rss_tool:
await status_msg.delete()
await update.message.reply_text("❌ Ошибка: RSS инструмент не найден")
return
# Сначала обновляем ленты
fetch_result = await rss_tool.execute(action='fetch')
if fetch_result.success:
logger.info(f"Получено {fetch_result.data.get('total_new_items', 0)} новых элементов")
# Получаем последние 15 новостей (undigested_only=False чтобы все новости)
news_result = await rss_tool.execute(action='list', limit=15, undigested_only=False)
if not news_result.success or not news_result.data:
await status_msg.delete()
await update.message.reply_text("📭 **Нет новостей**\n\nока нет новостей в базах данных._\nобавьте RSS ленту через AI: \"добавь RSS ленту <url>\"_")
return
news_list = news_result.data
# Помечаем новости как прочитанные
for item in news_list:
news_id = item.get('id')
if news_id:
await rss_tool.mark_digest(news_id)
# Формируем вывод с AI-переводом заголовков и описаний
output = "📰 **Последние новости:**\n\n"
for i, item in enumerate(news_list, 1):
title = item.get('title', 'Без названия')
link = item.get('link', '')
pub_date = item.get('pub_date', '')
description = item.get('description', '')
# Переводим заголовок через AI
translated_title = await _translate_text(title, max_length=120)
# Экранируем специальные символы Markdown
translated_title = escape_markdown(translated_title)
# Форматируем дату
date_str = ""
if pub_date:
try:
dt = datetime.strptime(pub_date[:19], '%Y-%m-%d %H:%M:%S')
date_str = dt.strftime('%d.%m.%Y %H:%M')
except:
date_str = pub_date[:16]
# Обрезаем заголовок если слишком длинный
if len(translated_title) > 120:
translated_title = translated_title[:117] + "..."
output += f"**{i}. {translated_title}**\n"
if date_str:
output += f"📅 {date_str}\n"
# Переводим описание если есть
if description:
translated_desc = await _translate_text(description, max_length=300)
# Экранируем специальные символы Markdown
translated_desc = escape_markdown(translated_desc)
if translated_desc:
output += f"{translated_desc}\n"
if link:
short_link = link[:60] + "..." if len(link) > 63 else link
output += f"🔗 {short_link}\n"
output += "\n"
await status_msg.delete()
await update.message.reply_text(output, parse_mode="Markdown")
except Exception as e:
logger.exception(f"Ошибка в команде /rss: {e}")
await status_msg.delete()
await update.message.reply_text(f"❌ **Ошибка:** {e}")
async def _translate_text(text: str, max_length: int = 300) -> str:
"""
Краткий перевод текста на русский через ИИ.
Если перевод не удался — возвращает оригинал.
"""
if not text or not text.strip():
return ""
try:
# Быстрый промпт для перевода
prompt = f"Переведи на русский язык этот текст (максимум {max_length} символов, без кавычек и пояснений, только перевод):\n{text[:400]}"
# Используем qwen_manager для перевода
from qwen_integration import qwen_manager
# Создаём временную сессию для перевода
import hashlib
temp_user_id = f"translator_{hashlib.md5(text.encode()).hexdigest()}"
result = await qwen_manager.run_task(
temp_user_id,
prompt,
on_output=lambda x: None,
on_oauth_url=lambda x: None,
use_system_prompt=False
)
# Извлекаем текст из результата
import re
text_matches = re.findall(r'"text":"([^"]+)"', result)
if text_matches:
translated = " ".join(text_matches).replace("\\n", " ").strip()
# Убираем кавычки если есть
translated = translated.strip('"\'')
if translated and len(translated) > 3:
# Экранируем специальные символы Markdown
translated = escape_markdown(translated[:max_length])
return translated
return text[:max_length]
except Exception as e:
logger.debug(f"Ошибка перевода: {e}")
return text[:max_length]
@check_access
async def ai_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработка команды /ai - переключение AI-провайдера."""
user_id = update.effective_user.id
state = state_manager.get(user_id)
# Получаем менеджер провайдеров
from bot.ai_provider_manager import get_ai_provider_manager
provider_manager = get_ai_provider_manager()
# Если нет аргументов - показываем текущий статус
if not context.args:
current_provider = provider_manager.get_current_provider(state)
providers_info = provider_manager.get_all_providers_info(current_provider)
output = "🤖 **AI-провайдеры**\n\n"
output += f"*Текущий:* "
for info in providers_info:
icon = "" if info.is_active else ""
status = "" if info.available else ""
output += f"\n{icon} **{info.name}** {status}\n"
output += f" _{info.description}_\n"
output += "\n**Использование:**\n"
output += "`/ai qwen` - переключиться на Qwen Code\n"
output += "`/ai gigachat` - переключиться на GigaChat\n"
await update.message.reply_text(output, parse_mode="Markdown")
return
# Переключаем провайдер
new_provider = context.args[0].lower()
if new_provider not in ["qwen", "gigachat"]:
await update.message.reply_text(
f"❌ Неизвестный провайдер: `{new_provider}`\n\n"
f"Доступные: `qwen`, `gigachat`",
parse_mode="Markdown"
)
return
success, message = provider_manager.switch_provider(user_id, new_provider, state_manager)
if success:
# Получаем информацию о новом провайдере
provider_info = provider_manager.get_provider_info(new_provider, is_active=True)
await update.message.reply_text(
f"{message}\n\n"
f"**{provider_info.name}**\n"
f"_{provider_info.description}_",
parse_mode="Markdown"
)
else:
await update.message.reply_text(message)