#!/usr/bin/env python3 """ Интеграция с Qwen Code CLI. Запуск, управление сессиями, обработка OAuth. Использует stream-json формат для потокового вывода. """ import os import re import asyncio import subprocess import json import logging from pathlib import Path from dataclasses import dataclass, field from datetime import datetime, timedelta from typing import Optional, Dict, Callable, Any, List, Union from enum import Enum logger = logging.getLogger(__name__) class QwenSessionState(Enum): """Состояние сессии Qwen Code.""" STARTING = "starting" WAITING_FOR_OAUTH = "waiting_for_oauth" READY = "ready" BUSY = "busy" ERROR = "error" class QwenEventType(Enum): """Типы событий в stream-json выводе Qwen.""" SYSTEM = "system" ASSISTANT = "assistant" USER = "user" RESULT = "result" TOOL_USE = "tool_use" @dataclass class QwenStreamEvent: """Событие из stream-json вывода Qwen.""" event_type: QwenEventType subtype: Optional[str] = None uuid: Optional[str] = None session_id: Optional[str] = None message: Optional[Dict] = None content: Optional[str] = None is_error: bool = False data: Optional[Dict] = None @dataclass class QwenSession: """Сессия Qwen Code.""" user_id: int state: QwenSessionState = QwenSessionState.STARTING process: Optional[subprocess.Popen] = None oauth_url: Optional[str] = None last_activity: datetime = field(default_factory=datetime.now) pending_task: Optional[str] = None output_buffer: str = "" session_id: Optional[str] = None SESSION_TIMEOUT = timedelta(minutes=30) # Таймаут неактивности def is_expired(self) -> bool: return datetime.now() - self.last_activity > self.SESSION_TIMEOUT class QwenCodeManager: """Менеджер сессий Qwen Code.""" def __init__(self, working_dir: str = None, system_prompt_path: str = None): self._sessions: Dict[int, QwenSession] = {} self._working_dir = working_dir or str(Path.home()) self._qwen_command = "qwen" self._system_prompt_path = system_prompt_path or str(Path(__file__).parent / "system_prompt.md") self._system_prompt: Optional[str] = None def load_system_prompt(self) -> str: """Загрузить системный промпт из файла.""" if self._system_prompt is not None: return self._system_prompt try: prompt_path = Path(self._system_prompt_path) if prompt_path.exists(): self._system_prompt = prompt_path.read_text(encoding='utf-8') logger.info(f"Системный промпт загружен из {self._system_prompt_path}") else: self._system_prompt = "" logger.warning(f"Системный промпт не найден: {self._system_prompt_path}") except Exception as e: logger.error(f"Ошибка загрузки системного промпта: {e}") self._system_prompt = "" return self._system_prompt def get_session(self, user_id: int) -> Optional[QwenSession]: """Получить сессию пользователя.""" session = self._sessions.get(user_id) if session and session.is_expired(): self.close_session(user_id) return None return session def create_session(self, user_id: int) -> QwenSession: """Создать новую сессию.""" session = QwenSession(user_id=user_id) self._sessions[user_id] = session logger.info(f"Создана сессия Qwen Code для пользователя {user_id}") return session def close_session(self, user_id: int): """Закрыть сессию пользователя.""" session = self._sessions.pop(user_id, None) if session and session.process: try: session.process.terminate() session.process.wait(timeout=5) except Exception as e: logger.warning(f"Ошибка при закрытии сессии Qwen: {e}") logger.info(f"Закрыта сессия Qwen Code для пользователя {user_id}") def has_active_session(self, user_id: int) -> bool: """Проверка наличия активной сессии.""" session = self.get_session(user_id) return session is not None and session.state != QwenSessionState.ERROR async def run_task(self, user_id: int, task: str, on_output: Callable[[str], Any], on_oauth_url: Callable[[str], Any], use_system_prompt: bool = True, on_chunk: Callable[[str], Any] = None, on_event: Callable[[QwenStreamEvent], Any] = None) -> str: """ Выполнить задачу в Qwen Code с потоковым выводом. Args: user_id: ID пользователя task: Задача для выполнения on_output: Callback для вывода (накапливается) on_oauth_url: Callback для OAuth URL use_system_prompt: Добавить системный промпт (default: True) on_chunk: Callback для потоковой отправки chunks (опционально) on_event: Callback для событий stream-json (опционально) """ # Создаём временную сессию для отслеживания session = self.get_session(user_id) if not session: session = self.create_session(user_id) session.last_activity = datetime.now() session.pending_task = task # Добавляем системный промпт если нужно if use_system_prompt: system_prompt = self.load_system_prompt() if system_prompt: full_task = f"{system_prompt}\n\n=== ЗАПРОС ПОЛЬЗОВАТЕЛЯ ===\n{task}" else: full_task = task else: full_task = task # Выполняем задачу через -p флаг с stream-json выводом return await self._execute_task(session, full_task, on_output, on_chunk, on_event) async def _start_session(self, session: QwenSession, on_output: Callable[[str], Any], on_oauth_url: Callable[[str], Any], pending_task: str = None) -> str: """Запустить сессию Qwen Code.""" session.state = QwenSessionState.STARTING try: # Запускаем qwen в интерактивном режиме с JSON выводом env = os.environ.copy() env["FORCE_COLOR"] = "0" # Отключаем цвета для парсинга cmd = [ self._qwen_command, "--output-format", "stream-json", "--input-format", "text", ] logger.info(f"Запуск Qwen Code: {' '.join(cmd)}") session.process = subprocess.Popen( cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=self._working_dir, env=env, text=True, bufsize=1 ) # Читаем вывод пока не поймём состояние output = "" oauth_detected = False while True: line = session.process.stdout.readline() if not line: break output += line on_output(line) # Проверяем на OAuth URL oauth_match = re.search( r'https://oauth\.qwen\.ai/[^>\s]+|' r'https://[^>\s]*qwen[^>\s]*/oauth[^>\s]*|' r'Authorize.*?https?://[^\s]+', line, re.IGNORECASE ) if oauth_match: oauth_url = oauth_match.group(0) session.oauth_url = oauth_url session.state = QwenSessionState.WAITING_FOR_OAUTH on_oauth_url(oauth_url) oauth_detected = True logger.info(f"Обнаружен OAuth URL: {oauth_url}") break # Проверяем на готовность if "ready" in line.lower() or "assistant" in line.lower(): session.state = QwenSessionState.READY logger.info("Сессия Qwen Code готова") break # Таймаут запуска if session.process.poll() is not None: session.state = QwenSessionState.ERROR return f"❌ Ошибка запуска Qwen Code: {output}" # Если после запуска есть отложенная задача — выполняем if pending_task and session.state == QwenSessionState.READY: return await self._execute_task(session, pending_task, on_output) if oauth_detected: return "⏳ Ожидание авторизации..." return "✅ Сессия запущена" except Exception as e: session.state = QwenSessionState.ERROR logger.error(f"Ошибка запуска сессии Qwen: {e}") return f"❌ Ошибка: {str(e)}" async def _execute_task(self, session: QwenSession, task: str, on_output: Callable[[str], Any], on_chunk: Callable[[str], Any] = None, on_event: Callable[[QwenStreamEvent], Any] = None) -> str: """ Выполнить задачу в активной сессии с потоковым stream-json выводом. Формат stream-json возвращает JSON-объекты по одному на строку: {"type":"system","subtype":"session_start","uuid":"...","session_id":"..."} {"type":"assistant","uuid":"...","message":{"content":[...]}} {"type":"result","subtype":"success","uuid":"...","result":"..."} Args: session: Сессия Qwen task: Задача для выполнения on_output: Callback для полного вывода (накапливается) on_chunk: Callback для потоковой отправки текстовых chunks on_event: Callback для полных JSON событий """ session.state = QwenSessionState.BUSY session.output_buffer = "" try: env = os.environ.copy() env["FORCE_COLOR"] = "0" cmd = [ self._qwen_command, "-p", task, "--output-format", "stream-json", # Правильный streaming формат "--yolo", # Авто-подтверждение ] logger.info(f"Выполнение задачи (stream-json): {' '.join(cmd)}") process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT, cwd=self._working_dir, env=env ) output = "" chunk_timeout = 300 # 5 минут на выполнение last_chunk_time = datetime.now() partial_content = "" # Для накопления partial messages while True: # Проверяем общий таймаут if (datetime.now() - last_chunk_time).total_seconds() > chunk_timeout: output += "\n\n⚠️ Таймаут выполнения (5 минут)" process.terminate() break # Проверяем процесс if process.returncode is not None: # Процесс завершился - читаем остаток remaining = await process.stdout.read() if remaining: remaining_str = remaining.decode('utf-8', errors='replace') output += remaining_str # Парсим оставшиеся JSON события (не отправляем сырой вывод!) await self._process_stream_lines( remaining_str, on_output, on_chunk, on_event, session ) break # Читаем строку из stdout try: line = await asyncio.wait_for(process.stdout.readline(), timeout=1.0) if line: line_str = line.decode('utf-8', errors='replace') output += line_str session.output_buffer += line_str last_chunk_time = datetime.now() # Парсим JSON событие и извлекаем текст await self._process_stream_lines( line_str, on_output, on_chunk, on_event, session ) except asyncio.TimeoutError: if process.returncode is not None: break continue await asyncio.sleep(0.01) session.state = QwenSessionState.READY session.last_activity = datetime.now() return output.strip() except Exception as e: session.state = QwenSessionState.ERROR logger.error(f"Ошибка выполнения задачи: {e}") return f"❌ Ошибка: {str(e)}" async def _process_stream_lines(self, text: str, on_output: Callable[[str], Any], on_chunk: Callable[[str], Any] = None, on_event: Callable[[QwenStreamEvent], Any] = None, session: QwenSession = None) -> str: """ Распарсить stream-json строки и извлечь текстовый контент. Формат JSON событий: - {"type":"system","subtype":"session_start","session_id":"..."} - {"type":"assistant","message":{"content":[{"type":"text","text":"..."}]}} - {"type":"result","subtype":"success","result":"...","duration_ms":1234} Возвращает только текстовый контент для отображения пользователю. """ extracted_text = "" for line in text.split('\n'): line = line.strip() if not line: continue # Проверяем это JSON или обычный текст if line.startswith('{'): try: event_data = json.loads(line) event_type = event_data.get('type', 'unknown') # Создаём объект события stream_event = QwenStreamEvent( event_type=QwenEventType(event_type) if event_type in ['system', 'assistant', 'user', 'result', 'tool_use'] else None, subtype=event_data.get('subtype'), uuid=event_data.get('uuid'), session_id=event_data.get('session_id'), message=event_data.get('message'), is_error=event_data.get('is_error', False), data=event_data ) # Обновляем session_id из события if stream_event.session_id and session: session.session_id = stream_event.session_id # Извлекаем текст из разных типов событий if event_type == 'assistant': message = event_data.get('message', {}) content_list = message.get('content', []) # Логируем для отладки logger.debug(f"Assistant event: content_type={type(content_list)}, content={content_list[:1] if isinstance(content_list, list) else content_list}") # Обрабатываем только если content - это список (не thinking) if isinstance(content_list, list): for content_item in content_list: if isinstance(content_item, dict): if content_item.get('type') == 'text': text_content = content_item.get('text', '') logger.debug(f"Text chunk: {text_content[:50]}...") extracted_text += text_content # Отправляем ТОЛЬКО в on_chunk для streaming if on_chunk: await on_chunk(text_content) elif content_item.get('type') == 'tool_use': # Инструмент используется - можно показать статус tool_name = content_item.get('name', 'unknown') # Добавляем переносы строк для разделения блоков status_text = f"\n🔧 Использую инструмент: {tool_name}...\n" extracted_text += status_text if on_chunk: await on_chunk(status_text) # Если content.type == 'thinking' - не отправляем пользователю elif event_type == 'result': result_text = event_data.get('result', '') if result_text: extracted_text += result_text # НЕ отправляем result через on_chunk — он уже был отправлен через assistant chunks logger.debug(f"Result event: {result_text[:50]}...") # Проверяем на ошибку if event_data.get('is_error'): error_text = event_data.get('error', 'Неизвестная ошибка') logger.error(f"Ошибка Qwen: {error_text}") elif event_type == 'system': subtype = event_data.get('subtype', '') if subtype == 'session_start': logger.info(f"Сессия Qwen запущена: {stream_event.session_id}") elif subtype == 'init': # Игнорируем init событие pass # Вызываем callback события если есть if on_event: on_event(stream_event) except json.JSONDecodeError as e: # Не JSON строка - возвращаем как текст logger.debug(f"Не JSON строка: {line[:100]}...") extracted_text += line + "\n" if on_chunk: await on_chunk(line + "\n") else: # Обычный текст (не JSON) - например, приветственное сообщение extracted_text += line + "\n" if on_chunk: await on_chunk(line + "\n") return extracted_text def _parse_output(self, output: str) -> str: """ Распарсить JSON вывод qwen-code. Если вывод не JSON — вернуть как есть. """ # Пока просто возвращаем очищенный вывод # В будущем можно парсить JSON stream-format lines = output.split('\n') cleaned = [] for line in lines: # Убираем служебные сообщения if line.strip() and not line.startswith('{'): cleaned.append(line) return '\n'.join(cleaned) if cleaned else output class GigaChatProvider: """ AI-провайдер для работы с GigaChat API. Альтернатива Qwen Code для генерации ответов. Использует GigaChatTool для взаимодействия с API Сбера. """ def __init__(self): self._tool = None self._initialized = False self._config_error: Optional[str] = None def _ensure_initialized(self): """Ленивая инициализация инструмента""" if self._initialized: return try: from bot.tools.gigachat_tool import create_gigachat_tool self._tool = create_gigachat_tool() if not self._tool: self._config_error = "GigaChat не настроен. Проверьте GIGACHAT_CLIENT_ID и GIGACHAT_CLIENT_SECRET в .env" logger.warning(self._config_error) else: logger.info("GigaChatProvider инициализирован") except ImportError as e: self._config_error = f"Ошибка импорта GigaChat: {e}" logger.error(self._config_error) except Exception as e: self._config_error = f"Ошибка инициализации GigaChat: {e}" logger.error(self._config_error) self._initialized = True async def chat( self, prompt: str, system_prompt: Optional[str] = None, temperature: float = 0.7, max_tokens: int = 2000, on_chunk: Optional[Callable[[str], Any]] = None, ) -> Dict[str, Any]: """ Отправка запроса к GigaChat API Args: prompt: Запрос пользователя system_prompt: Системный промпт (роль ассистента) temperature: Температура генерации max_tokens: Максимум токенов в ответе on_chunk: Callback для потоковой отправки (не используется, GigaChat отдаёт целиком) Returns: Dict с полями: - success: bool - content: str - текст ответа - error: str - ошибка если есть - model: str - использованная модель - usage: dict - статистика токенов """ self._ensure_initialized() if not self._tool: return { "success": False, "error": self._config_error or "GigaChat не инициализирован", "content": "", } try: from bot.tools.gigachat_tool import GigaChatMessage # Формируем сообщения messages = [] if system_prompt: messages.append(GigaChatMessage(role="system", content=system_prompt)) messages.append(GigaChatMessage(role="user", content=prompt)) # Вызываем GigaChat API response = await self._tool.chat( messages=messages, temperature=temperature, max_tokens=max_tokens, use_history=False, # Не используем встроенную историю — у нас своя ) # Потоковая отправка если есть callback if on_chunk and response.get("content"): await on_chunk(response["content"]) return { "success": True, "content": response.get("content", ""), "model": response.get("model", "GigaChat-Pro"), "usage": response.get("usage", {}), } except Exception as e: logger.error(f"Ошибка GigaChat API: {e}") return { "success": False, "error": str(e), "content": "", } def clear_session(self): """Очистка сессии (истории чата)""" if self._tool: self._tool.clear_history() def is_available(self) -> bool: """Проверка доступности провайдера""" self._ensure_initialized() return self._tool is not None def get_error(self) -> Optional[str]: """Получение ошибки инициализации""" self._ensure_initialized() return self._config_error # Глобальный менеджер qwen_manager = QwenCodeManager() # Глобальный GigaChat провайдер gigachat_provider = GigaChatProvider()