v0.7: AI Provider Manager для работы с любым AI-провайдером (Qwen, GigaChat)

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
This commit is contained in:
mirivlad 2026-02-26 23:21:20 +08:00
parent 77397269e1
commit e538d84e13
11 changed files with 867 additions and 73 deletions

102
AI_PROVIDERS.md Normal file
View File

@ -0,0 +1,102 @@
# AI Provider Switching
## Обзор
Бот поддерживает переключение между AI-провайдерами:
- **Qwen Code** — основной провайдер (Alibaba)
- **GigaChat** — альтернативный провайдер (Sber)
## Использование
### Через команду `/ai`
**Просмотр текущего статуса:**
```
/ai
```
Покажет текущего провайдера и доступные опции.
**Переключение на Qwen:**
```
/ai qwen
```
**Переключение на GigaChat:**
```
/ai gigachat
```
### Через меню
1. Нажмите `/settings` или кнопку "⚙️ Настройки бота"
2. Выберите "🤖 AI-провайдер"
3. Доступные опции:
- "🔄 Переключить AI-провайдер" — переключает на альтернативный провайдер
- " Информация о провайдерах" — подробная информация о каждом провайдере
## Архитектура
### Новые файлы
- `bot/ai_provider_manager.py` — менеджер управления провайдерами
- `bot/models/user_state.py` — добавлено поле `current_ai_provider`
### Изменённые файлы
- `bot.py` — модифицирован `handle_ai_task()` для использования текущего провайдера
- `bot/handlers/commands.py` — добавлена команда `/ai`
- `bot/handlers/callbacks.py` — добавлены обработчики меню AI-провайдера
- `bot/keyboards/menus.py` — добавлено меню "🤖 AI-провайдер"
## Как это работает
1. **Хранение состояния**: Каждый пользователь имеет своё предпочтение провайдера в `UserState.current_ai_provider`
2. **Обработка запросов**: При получении AI-запроса `handle_ai_task()` проверяет текущего провайдера и использует соответствующий API:
- **Qwen**: Потоковый вывод с `on_chunk` callback
- **GigaChat**: Ответ целиком
3. **Переключение**: При переключении провайдера обновляется состояние пользователя, новый запрос сразу идёт через выбранного провайдера
## Настройка GigaChat
Для использования GigaChat добавьте в `.env`:
```env
# GigaChat API (Сбер)
GIGACHAT_CLIENT_ID=ваш-client-id-uuid
GIGACHAT_CLIENT_SECRET=ваш-client-secret
GIGACHAT_SCOPE=GIGACHAT_API_PERS
GIGACHAT_AUTH_URL=https://ngw.devices.sberbank.ru:9443/api/v2/oauth
GIGACHAT_MODEL=GigaChat-Pro
```
## Отображение в ответе
В конце каждого AI-ответа указывается используемый провайдер:
```
📊 Контекст: 0.5%
🤖 AI: Qwen Code
```
или
```
📊 Контекст: 0.5%
🤖 AI: GigaChat
```
## Приоритеты провайдеров
- **По умолчанию**: Qwen Code
- **Если GigaChat не настроен**: Переключение недоступно (показывается ошибка)
- **Инструменты**: Доступны только с Qwen (GigaChat используется только для чата)
## Будущие улучшения
- [ ] Умное переключение (автоматический выбор провайдера по типу задачи)
- [ ] Поддержка инструментов для GigaChat
- [ ] Статистика использования провайдеров
- [ ] Настройка провайдера по умолчанию

94
bot.py
View File

@ -24,6 +24,8 @@ from qwen_integration import qwen_manager, QwenSessionState
logging.getLogger("sentence_transformers").setLevel(logging.WARNING)
logging.getLogger("huggingface_hub").setLevel(logging.WARNING)
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)
logging.getLogger("telegram.ext").setLevel(logging.WARNING)
from vector_memory import (
hybrid_memory_manager,
@ -77,7 +79,7 @@ from bot.utils.decorators import check_access
from bot.keyboards.menus import MenuItem, init_menus
# Импорты хендлеров из модулей
from bot.handlers.commands import start_command, menu_command, help_command, settings_command, cron_command
from bot.handlers.commands import start_command, menu_command, help_command, settings_command, cron_command, rss_command, ai_command
from bot.handlers.callbacks import menu_callback
from bot.services.command_executor import execute_cli_command
@ -85,6 +87,7 @@ from bot.services.command_executor import execute_cli_command
from bot.ai_agent import ai_agent
from bot.tools import tools_registry
from bot.services.cron_scheduler import init_scheduler, get_scheduler
from bot.ai_provider_manager import init_ai_provider_manager, get_ai_provider_manager
# Глобальные менеджеры сессий
ssh_session_manager = SSHSessionManager()
@ -313,6 +316,13 @@ async def handle_ai_task(update: Update, text: str):
MAX_CONTEXT_TOKENS = 200_000
context_percent = round((context_tokens / MAX_CONTEXT_TOKENS) * 100, 1)
# Получаем текущего AI-провайдера
from bot.ai_provider_manager import get_ai_provider_manager
provider_manager = get_ai_provider_manager()
current_provider = provider_manager.get_current_provider(state)
logger.info(f"Обработка AI-запроса через провайдер: {current_provider}")
# Собираем полный промпт с системным промптом
system_prompt = qwen_manager.load_system_prompt()
@ -340,25 +350,59 @@ async def handle_ai_task(update: Update, text: str):
f"{text}"
)
# Выполняем задачу с потоковым выводом
result = await qwen_manager.run_task(
user_id, full_task, on_output, on_oauth_url,
use_system_prompt=False, on_chunk=on_chunk, on_event=on_event
)
# Выполняем задачу через текущего провайдера
if current_provider == "qwen":
# Qwen Code - потоковый вывод
result = await qwen_manager.run_task(
user_id, full_task, on_output, on_oauth_url,
use_system_prompt=False, on_chunk=on_chunk, on_event=on_event
)
# Формируем финальный результат ИЗ result_buffer (без статусов инструментов)
full_output = "".join(result_buffer).strip()
# Формируем финальный результат ИЗ result_buffer (без статусов инструментов)
full_output = "".join(result_buffer).strip()
# Если result_buffer пустой — пробуем извлечь текст из result
if not full_output:
logger.warning("result_buffer пустой, пробуем извлечь текст из result")
import re
text_matches = re.findall(r'"text":"([^"]+)"', result)
if text_matches:
full_output = " ".join(text_matches).replace("\\n", "\n")
# Если result_buffer пустой — пробуем извлечь текст из result
if not full_output:
logger.warning("result_buffer пустой, пробуем извлечь текст из result")
import re
text_matches = re.findall(r'"text":"([^"]+)"', result)
if text_matches:
full_output = " ".join(text_matches).replace("\\n", "\n")
else:
full_output = "⚠️ Не удалось получить ответ ИИ"
logger.error(f"Result: {result[:500]}...")
provider_name = "Qwen Code"
elif current_provider == "gigachat":
# GigaChat - ответ целиком (не потоковый)
# Обновляем статусное сообщение
try:
await status_msg.edit_text(
"⏳ 🤖 **GigaChat думает...**",
parse_mode="Markdown"
)
except Exception as e:
logger.debug(f"Ошибка обновления статуса для GigaChat: {e}")
result = await provider_manager.execute_request(
provider_id=current_provider,
user_id=user_id,
prompt=full_task,
system_prompt=system_prompt,
on_chunk=None # GigaChat не поддерживает потоковый вывод
)
if result.get("success"):
full_output = result.get("content", "")
else:
full_output = "⚠️ Не удалось получить ответ ИИ"
logger.error(f"Result: {result[:500]}...")
full_output = f"❌ **Ошибка {provider_manager.get_provider_info(current_provider).name}:**\n{result.get('error', 'Неизвестная ошибка')}"
provider_name = "GigaChat"
else:
full_output = f"❌ Неизвестный провайдер: {current_provider}"
provider_name = "Unknown"
# Добавляем ответ ИИ в историю и память
if full_output and full_output != "⚠️ Не удалось получить ответ ИИ":
@ -377,12 +421,15 @@ async def handle_ai_task(update: Update, text: str):
asyncio.create_task(hybrid_memory_manager.extract_facts_with_ai(user_id, dialog_context))
state.messages_since_fact_extract = 0
# Формируем сообщение с информацией о контексте
context_info = f"📊 Контекст: {context_percent}%"
# Формируем сообщение с информацией о контексте и провайдере
context_info = f"📊 Контекст: {context_percent}%\n🤖 AI: {provider_name}"
# Отправляем результат ОТДЕЛЬНЫМ сообщением
response_text = f"{full_output}\n\n*{context_info}*"
# Экранируем специальные символы Markdown в ответе ИИ
escaped_output = escape_markdown(full_output)
# Отправляем результат ОТДЕЛЬНЫМ сообщением
response_text = f"{escaped_output}\n\n*{context_info}*"
# Отправляем новое сообщение с результатом
await update.message.reply_text(
response_text,
@ -1838,6 +1885,10 @@ def main():
# Инициализация меню
init_menus(menu_builder)
# Инициализация AIProviderManager
from qwen_integration import qwen_manager, gigachat_provider
init_ai_provider_manager(qwen_manager, gigachat_provider)
# Создание приложения с таймаутами и прокси
builder = (
@ -1868,6 +1919,7 @@ def main():
application.add_handler(CommandHandler("compact", compact_command))
application.add_handler(CommandHandler("facts", facts_command))
application.add_handler(CommandHandler("forget", forget_command))
application.add_handler(CommandHandler("rss", rss_command))
application.add_handler(CallbackQueryHandler(menu_callback))
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_text_message))
application.add_handler(CommandHandler("ai", ai_command))

View File

@ -45,6 +45,7 @@ class AIAgent:
]
# Триггеры для RSS — ТОЛЬКО явные запросы новостей
# Используем полные фразы чтобы избежать ложных срабатываний
RSS_TRIGGERS = [
'почитай новости', 'покажи новости', 'что нового в linux',
'новости it', 'tech news', 'opensource новости', 'linux новости',
@ -105,17 +106,26 @@ class AIAgent:
return score >= 0.65, score
def _should_read_rss(self, message: str) -> tuple[bool, float]:
"""Проверить, нужно ли читать RSS ленты."""
"""Проверить, нужно ли читать RSS ленты.
ВАЖНО: Используем ТОЛЬКО полные фразы-триггеры.
Отдельные слова (типа "новости") НЕ активируют RSS это предотвращает
ложные срабатывания когда пользователь просто упоминает слово в контексте.
"""
message_lower = message.lower()
score = 0.0
# Только прямые триггеры — высокий порог
# Только прямые фразы-триггеры — высокий порог
for trigger in self.RSS_TRIGGERS:
if trigger in message_lower:
return True, 0.95
# Больше никаких контекстных подсказок — только явные запросы
return False, score
# Отдельные ключевые слова НЕ проверяем — только явные фразы!
# Это предотвращает срабатывание на сообщения типа:
# - "новости" (просто упомянул слово)
# - "н.овости" (разбитое слово)
# - "я читал новости вчера" (прошедшее время, не запрос)
return False, 0.0
def _should_use_ssh(self, message: str) -> tuple[bool, float]:
"""Проверить, нужна ли SSH-команда."""

257
bot/ai_provider_manager.py Normal file
View File

@ -0,0 +1,257 @@
#!/usr/bin/env python3
"""
AI Provider Manager - управление переключением между AI-провайдерами.
Поддерживаемые провайдеры:
- qwen: Qwen Code CLI (основной)
- gigachat: GigaChat API (Сбер)
"""
import logging
from typing import Optional, Dict, Any, Callable, List
from dataclasses import dataclass
from enum import Enum
logger = logging.getLogger(__name__)
class AIProvider(Enum):
"""Доступные AI-провайдеры."""
QWEN = "qwen"
GIGACHAT = "gigachat"
@dataclass
class ProviderInfo:
"""Информация о провайдере."""
id: str
name: str
description: str
available: bool
is_active: bool
class AIProviderManager:
"""
Менеджер управления AI-провайдерами.
Позволяет переключаться между провайдерами и выполнять запросы
через активного провайдера.
"""
def __init__(self, qwen_manager=None, gigachat_provider=None):
self._qwen_manager = qwen_manager
self._gigachat_provider = gigachat_provider
self._provider_status: Dict[str, bool] = {}
# Проверяем доступность провайдеров при инициализации
self._check_provider_status()
def _check_provider_status(self):
"""Проверка доступности провайдеров."""
# Проверяем Qwen
self._provider_status[AIProvider.QWEN.value] = True # Qwen всегда доступен
# Проверяем GigaChat
if self._gigachat_provider:
self._provider_status[AIProvider.GIGACHAT.value] = self._gigachat_provider.is_available()
else:
self._provider_status[AIProvider.GIGACHAT.value] = False
def get_available_providers(self) -> List[str]:
"""Получить список доступных провайдеров."""
return [
provider_id
for provider_id, available in self._provider_status.items()
if available
]
def is_provider_available(self, provider_id: str) -> bool:
"""Проверить доступен ли провайдер."""
return self._provider_status.get(provider_id, False)
def get_provider_info(self, provider_id: str, is_active: bool = False) -> ProviderInfo:
"""Получить информацию о провайдере."""
providers = {
AIProvider.QWEN.value: ProviderInfo(
id=AIProvider.QWEN.value,
name="Qwen Code",
description="Alibaba Qwen Code CLI — мощный AI-ассистент с поддержкой инструментов",
available=self.is_provider_available(AIProvider.QWEN.value),
is_active=is_active
),
AIProvider.GIGACHAT.value: ProviderInfo(
id=AIProvider.GIGACHAT.value,
name="GigaChat",
description="Sber GigaChat API — российская AI-модель от Сбера",
available=self.is_provider_available(AIProvider.GIGACHAT.value),
is_active=is_active
)
}
return providers.get(provider_id)
def get_all_providers_info(self, active_provider_id: str) -> List[ProviderInfo]:
"""Получить информацию обо всех провайдерах."""
return [
self.get_provider_info(AIProvider.QWEN.value, AIProvider.QWEN.value == active_provider_id),
self.get_provider_info(AIProvider.GIGACHAT.value, AIProvider.GIGACHAT.value == active_provider_id)
]
def switch_provider(self, user_id: int, provider_id: str, state_manager) -> tuple[bool, str]:
"""
Переключить AI-провайдер для пользователя.
Args:
user_id: ID пользователя
provider_id: ID провайдера ("qwen" или "gigachat")
state_manager: Менеджер состояний для обновления состояния пользователя
Returns:
(success: bool, message: str)
"""
if not self.is_provider_available(provider_id):
return False, f"❌ Провайдер {provider_id} недоступен"
state = state_manager.get(user_id)
state.current_ai_provider = provider_id
provider_info = self.get_provider_info(provider_id)
logger.info(f"Пользователь {user_id} переключен на {provider_id}")
return True, f"✅ Переключен на {provider_info.name}"
def get_current_provider(self, state) -> str:
"""Получить текущего провайдера пользователя."""
return state.current_ai_provider
async def execute_request(
self,
provider_id: str,
user_id: int,
prompt: str,
system_prompt: Optional[str] = None,
on_output: Optional[Callable[[str], Any]] = None,
on_chunk: Optional[Callable[[str], Any]] = None,
on_event: Optional[Callable[[Any], Any]] = None,
context: Optional[Dict] = None
) -> Dict[str, Any]:
"""
Выполнить запрос через указанного провайдера.
Args:
provider_id: ID провайдера
user_id: ID пользователя
prompt: Запрос
system_prompt: Системный промпт
on_output: Callback для вывода
on_chunk: Callback для потокового вывода
on_event: Callback для событий
context: Дополнительный контекст
Returns:
Dict с результатом:
- success: bool
- content: str
- error: str (если ошибка)
- provider: str
- metadata: dict
"""
try:
if provider_id == AIProvider.QWEN.value:
if not self._qwen_manager:
return {
"success": False,
"error": "Qwen менеджер не инициализирован",
"provider": provider_id
}
# Выполняем через Qwen
result = await self._qwen_manager.run_task(
user_id=user_id,
task=prompt,
on_output=on_output or (lambda x: None),
on_oauth_url=lambda x: None,
use_system_prompt=False,
on_chunk=on_chunk,
on_event=on_event
)
# Извлекаем текст из результата
import re
text_matches = re.findall(r'"text":"([^"]+)"', result)
content = " ".join(text_matches).replace("\\n", "\n") if text_matches else result
return {
"success": True,
"content": content,
"provider": provider_id,
"metadata": {"raw_result": result}
}
elif provider_id == AIProvider.GIGACHAT.value:
if not self._gigachat_provider:
return {
"success": False,
"error": "GigaChat провайдер не инициализирован",
"provider": provider_id
}
# Выполняем через GigaChat
result = await self._gigachat_provider.chat(
prompt=prompt,
system_prompt=system_prompt,
on_chunk=on_chunk
)
if result.get("success"):
return {
"success": True,
"content": result.get("content", ""),
"provider": provider_id,
"metadata": {
"model": result.get("model", "GigaChat-Pro"),
"usage": result.get("usage", {})
}
}
else:
return {
"success": False,
"error": result.get("error", "Неизвестная ошибка GigaChat"),
"provider": provider_id
}
else:
return {
"success": False,
"error": f"Неизвестный провайдер: {provider_id}",
"provider": provider_id
}
except Exception as e:
logger.error(f"Ошибка выполнения запроса через {provider_id}: {e}")
return {
"success": False,
"error": str(e),
"provider": provider_id
}
# Глобальный менеджер (будет инициализирован в bot.py)
ai_provider_manager: Optional[AIProviderManager] = None
def init_ai_provider_manager(qwen_manager, gigachat_provider) -> AIProviderManager:
"""Инициализировать глобальный AIProviderManager."""
global ai_provider_manager
ai_provider_manager = AIProviderManager(qwen_manager, gigachat_provider)
logger.info(f"AIProviderManager инициализирован. Доступные провайдеры: {ai_provider_manager.get_available_providers()}")
return ai_provider_manager
def get_ai_provider_manager() -> AIProviderManager:
"""Получить глобальный AIProviderManager."""
global ai_provider_manager
if ai_provider_manager is None:
raise RuntimeError("AIProviderManager не инициализирован. Вызовите init_ai_provider_manager().")
return ai_provider_manager

View File

@ -2,7 +2,7 @@
"""Обработчик callback-запросов от меню."""
import logging
from telegram import Update
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import ContextTypes
from bot.config import config, state_manager, server_manager, menu_builder
@ -526,3 +526,90 @@ async def menu_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
reply_markup=menu_builder.get_keyboard("memory")
)
# --- Обработчики меню AI-провайдера ---
elif callback == "ai_provider_menu":
state.current_menu = "ai_provider"
# Получаем текущего провайдера
from bot.ai_provider_manager import get_ai_provider_manager
provider_manager = get_ai_provider_manager()
current_provider = provider_manager.get_current_provider(state)
providers_info = provider_manager.get_all_providers_info(current_provider)
output = "🤖 **AI-провайдеры**\n\n"
output += f"*Текущий провайдер:* "
for info in providers_info:
icon = "" if info.is_active else ""
status = "✓ Доступен" if info.available else "✗ Недоступен"
output += f"\n\n{icon} **{info.name}** — {status}\n"
output += f"_{info.description}_\n"
output += "\n\nВыберите действие:"
await query.edit_message_text(
output,
parse_mode="Markdown",
reply_markup=menu_builder.get_keyboard("ai_provider")
)
elif callback == "ai_provider_toggle":
# Переключаем провайдер
from bot.ai_provider_manager import get_ai_provider_manager
provider_manager = get_ai_provider_manager()
current_provider = provider_manager.get_current_provider(state)
new_provider = "gigachat" if current_provider == "qwen" else "qwen"
success, message = provider_manager.switch_provider(user_id, new_provider, state_manager)
if success:
provider_info = provider_manager.get_provider_info(new_provider, is_active=True)
await query.edit_message_text(
f"{message}\n\n"
f"**{provider_info.name}**\n"
f"_{provider_info.description}_",
parse_mode="Markdown",
reply_markup=menu_builder.get_keyboard("ai_provider")
)
else:
await query.edit_message_text(
f"{message}\n\n"
"Проверьте настройки в .env файле.",
parse_mode="Markdown",
reply_markup=menu_builder.get_keyboard("ai_provider")
)
elif callback == "ai_provider_info":
# Показываем подробную информацию
from bot.ai_provider_manager import get_ai_provider_manager
provider_manager = get_ai_provider_manager()
current_provider = provider_manager.get_current_provider(state)
output = " **Информация о провайдерах**\n\n"
# Qwen
output += "**🔹 Qwen Code**\n"
output += "Alibaba Qwen Code CLI — мощный AI-ассистент с:\n"
output += "• Поддержкой инструментов (поиск, RSS, SSH, cron)\n"
output += "• Потоковым выводом ответа\n"
output += "• Контекстом до 256K токенов\n"
output += "• RAG-памятью на ChromaDB\n\n"
# GigaChat
output += "**🟢 GigaChat**\n"
output += "Sber GigaChat API — российская AI-модель:\n"
output += "• Поддержка русского языка из коробки\n"
output += "• Модели: GigaChat-Pro, GigaChat-Max\n"
output += "• Генерация ответов и изображений\n"
output += "• Требует настройки в .env\n\n"
output += f"*Текущий провайдер:* `{current_provider}`\n"
output += "\nИспользуйте `/ai` для переключения."
await query.edit_message_text(
output,
parse_mode="Markdown",
reply_markup=menu_builder.get_keyboard("ai_provider")
)

View File

@ -2,12 +2,14 @@
"""Обработчики команд бота (/start, /menu, /help, /settings, /cron)."""
import logging
from datetime import datetime
from telegram import Update
from telegram.ext import ContextTypes
# Импорты из модулей bot/
from bot.config import config, state_manager, server_manager, menu_builder
from bot.utils.decorators import check_access
from bot.utils.formatters import escape_markdown
from bot.tools import tools_registry
from bot.ai_agent import ai_agent
@ -20,7 +22,11 @@ async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
user = update.effective_user
logger.info(f"Пользователь {user.username} ({user.id}) запустил бота")
# Сбрасываем состояние НО сохраняем ai_chat_mode (по умолчанию True)
state = state_manager.get(user.id)
state_manager.reset(user.id)
state = state_manager.get(user.id)
# ai_chat_mode уже True по умолчанию из UserState
# Показать текущую директорию и сервер
working_dir = config.working_directory
@ -38,7 +44,7 @@ async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
f"Или выберите сервер в меню.\n"
f"Команда /help покажет справку.",
parse_mode="Markdown",
reply_markup=menu_builder.get_keyboard("main", user_id=update.effective_user.id)
reply_markup=menu_builder.get_keyboard("main", user_id=update.effective_user.id, state=state)
)
@ -320,3 +326,203 @@ async def cron_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
except Exception as e:
logger.exception(f"Ошибка в команде /cron: {e}")
await update.message.reply_text(f"❌ Ошибка: {e}")
@check_access
async def rss_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""
Обработка команды /rss - показывает последние 15 новостей с AI-переводом.
Формат:
Заголовок (переведенный ИИ)
Описание (переведенный ИИ)
Ссылка на полную новость
"""
user_id = update.effective_user.id
# Отправляем статус
status_msg = await update.message.reply_text("📰 **Загрузка новостей...**\n\nолучение лент и перевод заголовков_")
try:
# Получаем rss_tool
rss_tool = tools_registry.get('rss_tool')
if not rss_tool:
await status_msg.delete()
await update.message.reply_text("❌ Ошибка: RSS инструмент не найден")
return
# Сначала обновляем ленты
fetch_result = await rss_tool.execute(action='fetch')
if fetch_result.success:
logger.info(f"Получено {fetch_result.data.get('total_new_items', 0)} новых элементов")
# Получаем последние 15 новостей (undigested_only=False чтобы все новости)
news_result = await rss_tool.execute(action='list', limit=15, undigested_only=False)
if not news_result.success or not news_result.data:
await status_msg.delete()
await update.message.reply_text("📭 **Нет новостей**\n\nока нет новостей в базах данных._\nобавьте RSS ленту через AI: \"добавь RSS ленту <url>\"_")
return
news_list = news_result.data
# Помечаем новости как прочитанные
for item in news_list:
news_id = item.get('id')
if news_id:
await rss_tool.mark_digest(news_id)
# Формируем вывод с AI-переводом заголовков и описаний
output = "📰 **Последние новости:**\n\n"
for i, item in enumerate(news_list, 1):
title = item.get('title', 'Без названия')
link = item.get('link', '')
pub_date = item.get('pub_date', '')
description = item.get('description', '')
# Переводим заголовок через AI
translated_title = await _translate_text(title, max_length=120)
# Экранируем специальные символы Markdown
translated_title = escape_markdown(translated_title)
# Форматируем дату
date_str = ""
if pub_date:
try:
dt = datetime.strptime(pub_date[:19], '%Y-%m-%d %H:%M:%S')
date_str = dt.strftime('%d.%m.%Y %H:%M')
except:
date_str = pub_date[:16]
# Обрезаем заголовок если слишком длинный
if len(translated_title) > 120:
translated_title = translated_title[:117] + "..."
output += f"**{i}. {translated_title}**\n"
if date_str:
output += f"📅 {date_str}\n"
# Переводим описание если есть
if description:
translated_desc = await _translate_text(description, max_length=300)
# Экранируем специальные символы Markdown
translated_desc = escape_markdown(translated_desc)
if translated_desc:
output += f"{translated_desc}\n"
if link:
short_link = link[:60] + "..." if len(link) > 63 else link
output += f"🔗 {short_link}\n"
output += "\n"
await status_msg.delete()
await update.message.reply_text(output, parse_mode="Markdown")
except Exception as e:
logger.exception(f"Ошибка в команде /rss: {e}")
await status_msg.delete()
await update.message.reply_text(f"❌ **Ошибка:** {e}")
async def _translate_text(text: str, max_length: int = 300) -> str:
"""
Краткий перевод текста на русский через ИИ.
Если перевод не удался возвращает оригинал.
"""
if not text or not text.strip():
return ""
try:
# Быстрый промпт для перевода
prompt = f"Переведи на русский язык этот текст (максимум {max_length} символов, без кавычек и пояснений, только перевод):\n{text[:400]}"
# Используем qwen_manager для перевода
from qwen_integration import qwen_manager
# Создаём временную сессию для перевода
import hashlib
temp_user_id = f"translator_{hashlib.md5(text.encode()).hexdigest()}"
result = await qwen_manager.run_task(
temp_user_id,
prompt,
on_output=lambda x: None,
on_oauth_url=lambda x: None,
use_system_prompt=False
)
# Извлекаем текст из результата
import re
text_matches = re.findall(r'"text":"([^"]+)"', result)
if text_matches:
translated = " ".join(text_matches).replace("\\n", " ").strip()
# Убираем кавычки если есть
translated = translated.strip('"\'')
if translated and len(translated) > 3:
# Экранируем специальные символы Markdown
translated = escape_markdown(translated[:max_length])
return translated
return text[:max_length]
except Exception as e:
logger.debug(f"Ошибка перевода: {e}")
return text[:max_length]
@check_access
async def ai_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработка команды /ai - переключение AI-провайдера."""
user_id = update.effective_user.id
state = state_manager.get(user_id)
# Получаем менеджер провайдеров
from bot.ai_provider_manager import get_ai_provider_manager
provider_manager = get_ai_provider_manager()
# Если нет аргументов - показываем текущий статус
if not context.args:
current_provider = provider_manager.get_current_provider(state)
providers_info = provider_manager.get_all_providers_info(current_provider)
output = "🤖 **AI-провайдеры**\n\n"
output += f"*Текущий:* "
for info in providers_info:
icon = "" if info.is_active else ""
status = "" if info.available else ""
output += f"\n{icon} **{info.name}** {status}\n"
output += f" _{info.description}_\n"
output += "\n**Использование:**\n"
output += "`/ai qwen` - переключиться на Qwen Code\n"
output += "`/ai gigachat` - переключиться на GigaChat\n"
await update.message.reply_text(output, parse_mode="Markdown")
return
# Переключаем провайдер
new_provider = context.args[0].lower()
if new_provider not in ["qwen", "gigachat"]:
await update.message.reply_text(
f"❌ Неизвестный провайдер: `{new_provider}`\n\n"
f"Доступные: `qwen`, `gigachat`",
parse_mode="Markdown"
)
return
success, message = provider_manager.switch_provider(user_id, new_provider, state_manager)
if success:
# Получаем информацию о новом провайдере
provider_info = provider_manager.get_provider_info(new_provider, is_active=True)
await update.message.reply_text(
f"{message}\n\n"
f"**{provider_info.name}**\n"
f"_{provider_info.description}_",
parse_mode="Markdown"
)
else:
await update.message.reply_text(message)

View File

@ -7,13 +7,10 @@ from dataclasses import dataclass, field
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
# Импортируем модели и утилиты
from bot.models.user_state import UserState, StateManager
from bot.models.user_state import UserState
logger = logging.getLogger(__name__)
# Глобальный state_manager для кнопки ИИ
state_manager = StateManager()
@dataclass
class MenuItem:
@ -45,10 +42,8 @@ class MenuBuilder:
keyboard = []
# Для главного меню — динамически меняем кнопку ИИ
if menu_name == "main" and user_id:
# Используем переданное состояние или получаем из менеджера
if state is None:
state = state_manager.get(user_id)
if menu_name == "main" and state:
# Используем переданное состояние
logger.info(f"get_keyboard: user_id={user_id}, ai_chat_mode={state.ai_chat_mode}")
for item in items:
@ -177,10 +172,19 @@ def init_menus(menu_builder: MenuBuilder):
MenuItem("🎨 Изменить иконку", "set_icon", icon="🎨"),
MenuItem("👥 Управление доступом", "access_menu", icon="👥"),
MenuItem("🧠 Память ИИ", "memory_menu", icon="🧠"),
MenuItem("🤖 AI-провайдер", "ai_provider_menu", icon="🤖"),
MenuItem("⬅️ Назад", "main", icon="⬅️"),
]
menu_builder.add_menu("settings", settings_menu)
# Меню AI-провайдера
ai_provider_menu = [
MenuItem("🔄 Переключить AI-провайдер", "ai_provider_toggle", icon="🔄"),
MenuItem(" Информация о провайдерах", "ai_provider_info", icon=""),
MenuItem("⬅️ Назад", "settings", icon="⬅️"),
]
menu_builder.add_menu("ai_provider", ai_provider_menu)
# Память ИИ
memory_menu = [
MenuItem("📋 Мой профиль", "memory_profile", icon="📋"),

View File

@ -19,6 +19,7 @@ class UserState:
ai_chat_mode: bool = True # Режим чата с ИИ агентом (включен по умолчанию)
ai_chat_history: List[str] = field(default_factory=list) # История диалога с ИИ
messages_since_fact_extract: int = 0 # Счётчик для извлечения фактов
current_ai_provider: str = "qwen" # Текущий AI-провайдер: "qwen" или "gigachat"
class StateManager:

View File

@ -11,10 +11,14 @@ import os
import base64
import httpx
import asyncio
import uuid
import logging
from typing import Optional, List, Dict, Any
from dataclasses import dataclass, field
from datetime import datetime, timedelta
logger = logging.getLogger(__name__)
@dataclass
class GigaChatMessage:
@ -30,8 +34,8 @@ class GigaChatConfig:
client_secret: str
scope: str = "GIGACHAT_API_PERS"
auth_url: str = "https://ngw.devices.sberbank.ru:9443/api/v2/oauth"
model: str = "GigaChat-Pro"
api_url: str = "https://gigachat.devices.sberbank.ru/api/v2"
model: str = "GigaChat"
api_url: str = "https://gigachat.devices.sberbank.ru/api/v1"
timeout: int = 60
@ -75,36 +79,50 @@ class GigaChatTool:
def _get_auth_headers(self) -> Dict[str, str]:
"""Получение заголовков для авторизации"""
credentials = f"{self.config.client_id}:{self.config.client_secret}"
encoded_credentials = base64.b64encode(credentials.encode()).decode()
# GigaChat требует RqUID (UUID) и Content-Type для OAuth
return {
"Authorization": f"Basic {encoded_credentials}",
"Content-Type": "application/x-www-form-urlencoded",
"RqUID": "00000000-0000-0000-0000-000000000000",
"RqUID": str(uuid.uuid4()),
}
async def _get_access_token(self) -> str:
"""Получение access токена для API"""
# Проверяем кэш токена
if self._access_token and self._token_expires:
if datetime.now() < self._token_expires - timedelta(minutes=5):
return self._access_token
# Запрашиваем новый токен с использованием Basic Auth
credentials = f"{self.config.client_id}:{self.config.client_secret}"
encoded_credentials = base64.b64encode(credentials.encode()).decode()
# Запрашиваем новый токен
# GigaChat использует самоподписанные сертификаты - отключаем верификацию
async with httpx.AsyncClient(verify=False) as client:
response = await client.post(
self.config.auth_url,
headers=self._get_auth_headers(),
headers={
"Authorization": f"Basic {encoded_credentials}",
"Content-Type": "application/x-www-form-urlencoded",
"RqUID": str(uuid.uuid4()),
},
data={"scope": self.config.scope},
timeout=30,
)
# Логируем для отладки
logger.debug(f"GigaChat auth status: {response.status_code}")
logger.debug(f"GigaChat auth response: {response.text[:200]}")
response.raise_for_status()
data = response.json()
self._access_token = data["access_token"]
# Токен действителен 30 минут, кэшируем на 25 минут
self._token_expires = datetime.now() + timedelta(minutes=25)
# Логируем начало токена для проверки (первые 50 символов)
logger.info(f"GigaChat токен получен: {self._access_token[:50]}...")
return self._access_token
async def chat(
@ -116,10 +134,11 @@ class GigaChatTool:
top_p: float = 0.1,
repetition_penalty: float = 1.0,
use_history: bool = True,
user_id: Optional[str] = None,
) -> Dict[str, Any]:
"""
Отправка запроса к GigaChat API
Args:
messages: Список сообщений (если None, используется история чата)
model: Модель для генерации (если None, используется модель из конфига)
@ -128,7 +147,8 @@ class GigaChatTool:
top_p: Параметр top-p sampling
repetition_penalty: Штраф за повторения
use_history: Использовать ли историю чата
user_id: ID пользователя для заголовка X-User-Id
Returns:
Dict с ответом API:
- content: Текст ответа
@ -167,18 +187,46 @@ class GigaChatTool:
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
"X-User-Id": str(user_id) if user_id else "telegram-bot",
}
# Логируем запрос для отладки
logger.debug(f"GigaChat API URL: {self.config.api_url}/chat/completions")
logger.debug(f"GigaChat headers: {headers}")
logger.debug(f"GigaChat payload: model={model or self.config.model}, messages={len(api_messages)}, max_tokens={max_tokens}")
# GigaChat использует самоподписанные сертификаты - отключаем верификацию
async with httpx.AsyncClient(verify=False) as client:
response = await client.post(
f"{self.config.api_url}/chat/completions",
headers=headers,
json=payload,
timeout=self.config.timeout,
)
response.raise_for_status()
data = response.json()
try:
response = await client.post(
f"{self.config.api_url}/chat/completions",
headers=headers,
json=payload,
timeout=self.config.timeout,
)
# Логируем для отладки
logger.debug(f"GigaChat chat status: {response.status_code}")
logger.debug(f"GigaChat response headers: {dict(response.headers)}")
if response.status_code != 200:
logger.error(f"GigaChat error response: {response.text[:1000]}")
response.raise_for_status()
data = response.json()
except httpx.HTTPStatusError as e:
logger.error(f"GigaChat HTTP error: {e}")
logger.error(f"Response: {e.response.text[:500]}")
return {
"content": "",
"error": f"HTTP {e.response.status_code}: {e.response.text[:200]}",
}
except httpx.HTTPError as e:
logger.error(f"GigaChat request error: {e}")
return {
"content": "",
"error": f"Request error: {str(e)}",
}
# Добавляем ответ ассистента в историю
if use_history and data.get("choices"):
assistant_message = data["choices"][0]["message"]
@ -186,7 +234,7 @@ class GigaChatTool:
role=assistant_message["role"],
content=assistant_message["content"],
))
return {
"content": data["choices"][0]["message"]["content"] if data.get("choices") else "",
"model": data.get("model", self.config.model),
@ -240,7 +288,8 @@ class GigaChatTool:
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
}
# GigaChat использует самоподписанные сертификаты - отключаем верификацию
async with httpx.AsyncClient(verify=False) as client:
# Запуск генерации
response = await client.post(
@ -261,7 +310,8 @@ class GigaChatTool:
headers = {
"Authorization": f"Bearer {token}",
}
# GigaChat использует самоподписанные сертификаты - отключаем верификацию
async with httpx.AsyncClient(verify=False) as client:
response = await client.get(
f"{self.config.api_url}/models",

View File

@ -17,8 +17,24 @@ RESERVED_FOR_HEADER = 20 # Резервируем место для "(N/N) "
def escape_markdown(text: str) -> str:
"""
Экранирование специальных символов Markdown для Telegram API.
Telegram Markdown v1 использует: * _ ` [ ] ( )
Эти символы нужно экранировать обратным слэшем.
"""
text = text.replace('```', '\\`\\`\\`')
if not text:
return text
# Экранируем специальные символы Markdown
# Порядок важен: сначала экранируем обратные слэши
text = text.replace('\\', '\\\\')
text = text.replace('`', '\\`')
text = text.replace('*', '\\*')
text = text.replace('_', '\\_')
text = text.replace('[', '\\[')
text = text.replace(']', '\\]')
text = text.replace('(', '\\(')
text = text.replace(')', '\\)')
return text

View File

@ -557,34 +557,43 @@ class GigaChatProvider:
try:
from bot.tools.gigachat_tool import GigaChatMessage
# Формируем сообщения
messages = []
if system_prompt:
messages.append(GigaChatMessage(role="system", content=system_prompt))
messages.append(GigaChatMessage(role="user", content=prompt))
# ВАЖНО: prompt уже содержит весь контекст (system_prompt + summary + memory + history + запрос)
# Поэтому system_prompt отдельно НЕ добавляем
messages = [
GigaChatMessage(role="user", content=prompt),
]
# Вызываем GigaChat API
response = await self._tool.chat(
messages=messages,
temperature=temperature,
max_tokens=max_tokens,
use_history=False, # Не используем встроенную историю — у нас своя
user_id="telegram-bot",
)
# Проверяем наличие ошибки в ответе
if response.get("error"):
logger.error(f"GigaChat API error: {response['error']}")
return {
"success": False,
"error": response["error"],
"content": "",
}
# Потоковая отправка если есть callback
if on_chunk and response.get("content"):
await on_chunk(response["content"])
return {
"success": True,
"content": response.get("content", ""),
"model": response.get("model", "GigaChat-Pro"),
"usage": response.get("usage", {}),
}
except Exception as e:
logger.error(f"Ошибка GigaChat API: {e}")
return {