telegram-cli-bot/qwen_integration.py

725 lines
32 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Интеграция с Qwen Code CLI.
Запуск, управление сессиями, обработка OAuth.
Использует stream-json формат для потокового вывода.
"""
import os
import re
import asyncio
import subprocess
import json
import logging
from pathlib import Path
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional, Dict, Callable, Any, List, Union
from enum import Enum
# Импортируем OAuth модуль
from bot.utils.qwen_oauth import (
get_authorization_url,
check_authorization_complete,
is_authorized,
get_access_token,
clear_authorization
)
logger = logging.getLogger(__name__)
class QwenSessionState(Enum):
"""Состояние сессии Qwen Code."""
STARTING = "starting"
WAITING_FOR_OAUTH = "waiting_for_oauth"
READY = "ready"
BUSY = "busy"
ERROR = "error"
class QwenEventType(Enum):
"""Типы событий в stream-json выводе Qwen."""
SYSTEM = "system"
ASSISTANT = "assistant"
USER = "user"
RESULT = "result"
TOOL_USE = "tool_use"
@dataclass
class QwenStreamEvent:
"""Событие из stream-json вывода Qwen."""
event_type: QwenEventType
subtype: Optional[str] = None
uuid: Optional[str] = None
session_id: Optional[str] = None
message: Optional[Dict] = None
content: Optional[str] = None
is_error: bool = False
data: Optional[Dict] = None
@dataclass
class QwenSession:
"""Сессия Qwen Code."""
user_id: int
state: QwenSessionState = QwenSessionState.STARTING
process: Optional[subprocess.Popen] = None
oauth_url: Optional[str] = None
on_oauth_url: Optional[Callable] = None # Callback для OAuth URL
last_activity: datetime = field(default_factory=datetime.now)
pending_task: Optional[str] = None
output_buffer: str = ""
session_id: Optional[str] = None
SESSION_TIMEOUT = timedelta(minutes=30) # Таймаут неактивности
def is_expired(self) -> bool:
return datetime.now() - self.last_activity > self.SESSION_TIMEOUT
class QwenCodeManager:
"""Менеджер сессий Qwen Code."""
def __init__(self, working_dir: str = None, system_prompt_path: str = None):
self._sessions: Dict[int, QwenSession] = {}
self._working_dir = working_dir or str(Path.home())
self._qwen_command = "qwen"
self._system_prompt_path = system_prompt_path or str(Path(__file__).parent / "system_prompt.md")
self._system_prompt: Optional[str] = None
def load_system_prompt(self) -> str:
"""Загрузить системный промпт из файла."""
if self._system_prompt is not None:
return self._system_prompt
try:
prompt_path = Path(self._system_prompt_path)
if prompt_path.exists():
self._system_prompt = prompt_path.read_text(encoding='utf-8')
logger.info(f"Системный промпт загружен из {self._system_prompt_path}")
else:
self._system_prompt = ""
logger.warning(f"Системный промпт не найден: {self._system_prompt_path}")
except Exception as e:
logger.error(f"Ошибка загрузки системного промпта: {e}")
self._system_prompt = ""
return self._system_prompt
def get_session(self, user_id: int) -> Optional[QwenSession]:
"""Получить сессию пользователя."""
session = self._sessions.get(user_id)
if session and session.is_expired():
self.close_session(user_id)
return None
return session
def create_session(self, user_id: int) -> QwenSession:
"""Создать новую сессию."""
session = QwenSession(user_id=user_id)
self._sessions[user_id] = session
logger.info(f"Создана сессия Qwen Code для пользователя {user_id}")
return session
async def get_oauth_url(self) -> Optional[str]:
"""
Получить OAuth ссылку для авторизации Qwen Code.
Returns:
OAuth URL или None если не удалось получить
"""
try:
import aiohttp
# Генерируем code verifier и challenge (упрощённо)
import hashlib
import secrets
import uuid
code_verifier = secrets.token_urlsafe(32)
code_challenge = hashlib.sha256(code_verifier.encode()).hexdigest()
# Запрос на получение device code (form-urlencoded как в оригинале)
payload = {
'client_id': QWEN_OAUTH_CLIENT_ID,
'scope': 'openid profile email model.completion',
'code_challenge': code_challenge,
'code_challenge_method': 'S256'
}
headers = {
'Content-Type': 'application/x-www-form-urlencoded',
'Accept': 'application/json',
'x-request-id': str(uuid.uuid4()),
'User-Agent': 'qwen-code-cli/0.11.0'
}
# Формируем form-urlencoded тело
form_data = '&'.join(f'{k}={v}' for k, v in payload.items())
async with aiohttp.ClientSession() as session:
async with session.post(
QWEN_OAUTH_DEVICE_CODE_ENDPOINT,
data=form_data,
headers=headers
) as resp:
text = await resp.text()
if resp.status == 200:
try:
data = await resp.json()
verification_uri = data.get('verification_uri_complete', '')
logger.info(f"Получен OAuth URL: {verification_uri}")
return verification_uri
except Exception as json_err:
logger.error(f"Ошибка парсинга JSON: {json_err}")
logger.debug(f"Ответ сервера: {text[:200]}")
else:
logger.error(f"Ошибка получения OAuth: {resp.status}")
logger.debug(f"Ответ сервера: {text[:200]}")
return None
except Exception as e:
logger.error(f"Ошибка получения OAuth URL: {e}")
return None
def close_session(self, user_id: int):
"""Закрыть сессию пользователя."""
session = self._sessions.pop(user_id, None)
if session and session.process:
try:
session.process.terminate()
session.process.wait(timeout=5)
except Exception as e:
logger.warning(f"Ошибка при закрытии сессии Qwen: {e}")
logger.info(f"Закрыта сессия Qwen Code для пользователя {user_id}")
def has_active_session(self, user_id: int) -> bool:
"""Проверка наличия активной сессии."""
session = self.get_session(user_id)
return session is not None and session.state != QwenSessionState.ERROR
async def run_task(self, user_id: int, task: str,
on_output: Callable[[str], Any],
on_oauth_url: Callable[[str], Any],
use_system_prompt: bool = True,
on_chunk: Callable[[str], Any] = None,
on_event: Callable[[QwenStreamEvent], Any] = None) -> str:
"""
Выполнить задачу в Qwen Code с потоковым выводом.
Args:
user_id: ID пользователя
task: Задача для выполнения
on_output: Callback для вывода (накапливается)
on_oauth_url: Callback для OAuth URL
use_system_prompt: Добавить системный промпт (default: True)
on_chunk: Callback для потоковой отправки chunks (опционально)
on_event: Callback для событий stream-json (опционально)
"""
# Создаём временную сессию для отслеживания
session = self.get_session(user_id)
if not session:
session = self.create_session(user_id)
session.last_activity = datetime.now()
session.pending_task = task
session.on_oauth_url = on_oauth_url # Сохраняем callback для OAuth
# Добавляем системный промпт если нужно
if use_system_prompt:
system_prompt = self.load_system_prompt()
if system_prompt:
full_task = f"{system_prompt}\n\n=== ЗАПРОС ПОЛЬЗОВАТЕЛЯ ===\n{task}"
else:
full_task = task
else:
full_task = task
# Выполняем задачу через -p флаг с stream-json выводом
return await self._execute_task(session, full_task, on_output, on_chunk, on_event)
async def _start_session(self, session: QwenSession,
on_output: Callable[[str], Any],
on_oauth_url: Callable[[str], Any],
pending_task: str = None) -> str:
"""Запустить сессию Qwen Code."""
session.state = QwenSessionState.STARTING
try:
# Запускаем qwen в интерактивном режиме с JSON выводом
env = os.environ.copy()
env["FORCE_COLOR"] = "0" # Отключаем цвета для парсинга
cmd = [
self._qwen_command,
"--output-format", "stream-json",
"--input-format", "text",
"--auth-type", "qwen-oauth", # Явное указание типа авторизации
]
logger.info(f"Запуск Qwen Code: {' '.join(cmd)}")
session.process = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
cwd=self._working_dir,
env=env,
text=True,
bufsize=1
)
# Читаем вывод пока не поймём состояние
output = ""
oauth_detected = False
while True:
line = session.process.stdout.readline()
if not line:
break
output += line
on_output(line)
# Проверяем на OAuth URL
oauth_match = re.search(
r'https://oauth\.qwen\.ai/[^>\s]+|'
r'https://[^>\s]*qwen[^>\s]*/oauth[^>\s]*|'
r'Authorize.*?https?://[^\s]+',
line,
re.IGNORECASE
)
if oauth_match:
oauth_url = oauth_match.group(0)
session.oauth_url = oauth_url
session.state = QwenSessionState.WAITING_FOR_OAUTH
on_oauth_url(oauth_url)
oauth_detected = True
logger.info(f"Обнаружен OAuth URL: {oauth_url}")
break
# Проверяем на готовность
if "ready" in line.lower() or "assistant" in line.lower():
session.state = QwenSessionState.READY
logger.info("Сессия Qwen Code готова")
break
# Таймаут запуска
if session.process.poll() is not None:
session.state = QwenSessionState.ERROR
return f"❌ Ошибка запуска Qwen Code: {output}"
# Если после запуска есть отложенная задача — выполняем
if pending_task and session.state == QwenSessionState.READY:
return await self._execute_task(session, pending_task, on_output)
if oauth_detected:
return "⏳ Ожидание авторизации..."
return "✅ Сессия запущена"
except Exception as e:
session.state = QwenSessionState.ERROR
logger.error(f"Ошибка запуска сессии Qwen: {e}")
return f"❌ Ошибка: {str(e)}"
async def _execute_task(self, session: QwenSession,
task: str,
on_output: Callable[[str], Any],
on_chunk: Callable[[str], Any] = None,
on_event: Callable[[QwenStreamEvent], Any] = None) -> str:
"""
Выполнить задачу в активной сессии с потоковым stream-json выводом.
Формат stream-json возвращает JSON-объекты по одному на строку:
{"type":"system","subtype":"session_start","uuid":"...","session_id":"..."}
{"type":"assistant","uuid":"...","message":{"content":[...]}}
{"type":"result","subtype":"success","uuid":"...","result":"..."}
Args:
session: Сессия Qwen
task: Задача для выполнения
on_output: Callback для полного вывода (накапливается)
on_chunk: Callback для потоковой отправки текстовых chunks
on_event: Callback для полных JSON событий
"""
session.state = QwenSessionState.BUSY
session.output_buffer = ""
try:
env = os.environ.copy()
env["FORCE_COLOR"] = "0"
cmd = [
self._qwen_command,
"-p", task,
"--output-format", "stream-json", # Правильный streaming формат
"--auth-type", "qwen-oauth", # Явное указание типа авторизации
# НЕ используем --yolo чтобы получить OAuth ссылку если нужно
]
logger.info(f"Выполнение задачи (stream-json): {' '.join(cmd)}")
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
cwd=self._working_dir,
env=env
)
output = ""
chunk_timeout = 300 # 5 минут на выполнение
last_chunk_time = datetime.now()
partial_content = "" # Для накопления partial messages
while True:
# Проверяем общий таймаут
if (datetime.now() - last_chunk_time).total_seconds() > chunk_timeout:
output += "\n\n⚠️ Таймаут выполнения (5 минут)"
process.terminate()
break
# Проверяем процесс
if process.returncode is not None:
# Процесс завершился - читаем остаток
remaining = await process.stdout.read()
if remaining:
remaining_str = remaining.decode('utf-8', errors='replace')
output += remaining_str
# Парсим оставшиеся JSON события (не отправляем сырой вывод!)
await self._process_stream_lines(
remaining_str, on_output, on_chunk, on_event, session
)
break
# Читаем строку из stdout
try:
line = await asyncio.wait_for(process.stdout.readline(), timeout=1.0)
if line:
line_str = line.decode('utf-8', errors='replace')
output += line_str
session.output_buffer += line_str
last_chunk_time = datetime.now()
# Проверяем на OAuth ссылку в текстовом выводе
oauth_match = re.search(
r'https://chat\.qwen\.ai/authorize\?user_code=([A-Za-z0-9_-]+)',
line_str
)
if oauth_match:
oauth_url = oauth_match.group(0)
logger.info(f"Обнаружена OAuth ссылка: {oauth_url}")
if session and session.on_oauth_url:
await session.on_oauth_url(oauth_url)
# Парсим JSON событие и извлекаем текст
await self._process_stream_lines(
line_str, on_output, on_chunk, on_event, session
)
except asyncio.TimeoutError:
if process.returncode is not None:
break
continue
await asyncio.sleep(0.01)
session.state = QwenSessionState.READY
session.last_activity = datetime.now()
return output.strip()
except Exception as e:
session.state = QwenSessionState.ERROR
logger.error(f"Ошибка выполнения задачи: {e}")
return f"❌ Ошибка: {str(e)}"
async def _process_stream_lines(self,
text: str,
on_output: Callable[[str], Any],
on_chunk: Callable[[str], Any] = None,
on_event: Callable[[QwenStreamEvent], Any] = None,
session: QwenSession = None) -> str:
"""
Распарсить stream-json строки и извлечь текстовый контент.
Формат JSON событий:
- {"type":"system","subtype":"session_start","session_id":"..."}
- {"type":"assistant","message":{"content":[{"type":"text","text":"..."}]}}
- {"type":"result","subtype":"success","result":"...","duration_ms":1234}
Возвращает только текстовый контент для отображения пользователю.
"""
extracted_text = ""
for line in text.split('\n'):
line = line.strip()
if not line:
continue
# Проверяем это JSON или обычный текст
if line.startswith('{'):
try:
event_data = json.loads(line)
event_type = event_data.get('type', 'unknown')
# Создаём объект события
stream_event = QwenStreamEvent(
event_type=QwenEventType(event_type) if event_type in ['system', 'assistant', 'user', 'result', 'tool_use'] else None,
subtype=event_data.get('subtype'),
uuid=event_data.get('uuid'),
session_id=event_data.get('session_id'),
message=event_data.get('message'),
is_error=event_data.get('is_error', False),
data=event_data
)
# Обновляем session_id из события
if stream_event.session_id and session:
session.session_id = stream_event.session_id
# Извлекаем текст из разных типов событий
if event_type == 'assistant':
message = event_data.get('message', {})
content_list = message.get('content', [])
# Логируем для отладки
logger.debug(f"Assistant event: content_type={type(content_list)}, content={content_list[:1] if isinstance(content_list, list) else content_list}")
# Обрабатываем только если content - это список (не thinking)
if isinstance(content_list, list):
for content_item in content_list:
if isinstance(content_item, dict):
if content_item.get('type') == 'text':
text_content = content_item.get('text', '')
logger.debug(f"Text chunk: {text_content[:50]}...")
extracted_text += text_content
# Отправляем ТОЛЬКО в on_chunk для streaming
if on_chunk:
await on_chunk(text_content)
elif content_item.get('type') == 'tool_use':
# Инструмент используется - можно показать статус
tool_name = content_item.get('name', 'unknown')
# Добавляем переносы строк для разделения блоков
status_text = f"\n🔧 Использую инструмент: {tool_name}...\n"
extracted_text += status_text
if on_chunk:
await on_chunk(status_text)
# Если content.type == 'thinking' - не отправляем пользователю
elif event_type == 'result':
result_text = event_data.get('result', '')
if result_text:
extracted_text += result_text
# НЕ отправляем result через on_chunk — он уже был отправлен через assistant chunks
logger.debug(f"Result event: {result_text[:50]}...")
# Проверяем на ошибку
if event_data.get('is_error'):
error_obj = event_data.get('error', {})
error_message = error_obj.get('message', 'Неизвестная ошибка') if isinstance(error_obj, dict) else str(error_obj)
logger.error(f"Ошибка Qwen: {error_message}")
# Проверяем ошибку авторизации — получаем OAuth URL
if 'No auth type' in error_message or 'auth type is not selected' in error_message:
# Получаем OAuth URL через API
oauth_url = await self.get_oauth_url()
if not oauth_url:
oauth_url = f"{QWEN_OAUTH_BASE_URL}/"
logger.info(f"Требуется OAuth: {oauth_url}")
# Вызываем on_oauth_url если есть в session
if session and hasattr(session, 'on_oauth_url') and session.on_oauth_url:
await session.on_oauth_url(oauth_url)
elif event_type == 'system':
subtype = event_data.get('subtype', '')
if subtype == 'session_start':
logger.info(f"Сессия Qwen запущена: {stream_event.session_id}")
elif subtype == 'init':
# Игнорируем init событие
pass
# Вызываем callback события если есть
if on_event:
on_event(stream_event)
except json.JSONDecodeError as e:
# Не JSON строка - возвращаем как текст
logger.debug(f"Не JSON строка: {line[:100]}...")
extracted_text += line + "\n"
if on_chunk:
await on_chunk(line + "\n")
else:
# Обычный текст (не JSON) - например, приветственное сообщение
extracted_text += line + "\n"
if on_chunk:
await on_chunk(line + "\n")
return extracted_text
def _parse_output(self, output: str) -> str:
"""
Распарсить JSON вывод qwen-code.
Если вывод не JSON — вернуть как есть.
"""
# Пока просто возвращаем очищенный вывод
# В будущем можно парсить JSON stream-format
lines = output.split('\n')
cleaned = []
for line in lines:
# Убираем служебные сообщения
if line.strip() and not line.startswith('{'):
cleaned.append(line)
return '\n'.join(cleaned) if cleaned else output
class GigaChatProvider:
"""
AI-провайдер для работы с GigaChat API.
Альтернатива Qwen Code для генерации ответов.
Использует GigaChatTool для взаимодействия с API Сбера.
"""
def __init__(self):
self._tool = None
self._initialized = False
self._config_error: Optional[str] = None
def _ensure_initialized(self):
"""Ленивая инициализация инструмента"""
if self._initialized:
return
try:
from bot.tools.gigachat_tool import create_gigachat_tool
self._tool = create_gigachat_tool()
if not self._tool:
self._config_error = "GigaChat не настроен. Проверьте GIGACHAT_CLIENT_ID и GIGACHAT_CLIENT_SECRET в .env"
logger.warning(self._config_error)
else:
logger.info("GigaChatProvider инициализирован")
except ImportError as e:
self._config_error = f"Ошибка импорта GigaChat: {e}"
logger.error(self._config_error)
except Exception as e:
self._config_error = f"Ошибка инициализации GigaChat: {e}"
logger.error(self._config_error)
self._initialized = True
async def chat(
self,
prompt: str,
system_prompt: Optional[str] = None,
temperature: float = 0.7,
max_tokens: int = 2000,
on_chunk: Optional[Callable[[str], Any]] = None,
) -> Dict[str, Any]:
"""
Отправка запроса к GigaChat API
Args:
prompt: Запрос пользователя
system_prompt: Системный промпт (роль ассистента)
temperature: Температура генерации
max_tokens: Максимум токенов в ответе
on_chunk: Callback для потоковой отправки (не используется, GigaChat отдаёт целиком)
Returns:
Dict с полями:
- success: bool
- content: str - текст ответа
- error: str - ошибка если есть
- model: str - использованная модель
- usage: dict - статистика токенов
"""
self._ensure_initialized()
if not self._tool:
return {
"success": False,
"error": self._config_error or "GigaChat не инициализирован",
"content": "",
}
try:
from bot.tools.gigachat_tool import GigaChatMessage
# Формируем сообщения
# ВАЖНО: prompt уже содержит весь контекст (system_prompt + summary + memory + history + запрос)
# Поэтому system_prompt отдельно НЕ добавляем
messages = [
GigaChatMessage(role="user", content=prompt),
]
# Вызываем GigaChat API
response = await self._tool.chat(
messages=messages,
temperature=temperature,
max_tokens=max_tokens,
use_history=False, # Не используем встроенную историю — у нас своя
user_id="telegram-bot",
)
# Проверяем наличие ошибки в ответе
if response.get("error"):
logger.error(f"GigaChat API error: {response['error']}")
return {
"success": False,
"error": response["error"],
"content": "",
}
# Потоковая отправка если есть callback
if on_chunk and response.get("content"):
await on_chunk(response["content"])
return {
"success": True,
"content": response.get("content", ""),
"model": response.get("model", "GigaChat-Pro"),
"usage": response.get("usage", {}),
}
except Exception as e:
logger.error(f"Ошибка GigaChat API: {e}")
return {
"success": False,
"error": str(e),
"content": "",
}
def clear_session(self):
"""Очистка сессии (истории чата)"""
if self._tool:
self._tool.clear_history()
def is_available(self) -> bool:
"""Проверка доступности провайдера"""
self._ensure_initialized()
return self._tool is not None
def get_error(self) -> Optional[str]:
"""Получение ошибки инициализации"""
self._ensure_initialized()
return self._config_error
# Глобальный менеджер
qwen_manager = QwenCodeManager()
# Глобальный GigaChat провайдер
gigachat_provider = GigaChatProvider()