#!/usr/bin/env python3 """ Интеграция с Qwen Code CLI. Запуск, управление сессиями, обработка OAuth. """ import os import re import asyncio import subprocess import logging from pathlib import Path from dataclasses import dataclass, field from datetime import datetime, timedelta from typing import Optional, Dict, Callable, Any from enum import Enum logger = logging.getLogger(__name__) class QwenSessionState(Enum): """Состояние сессии Qwen Code.""" STARTING = "starting" WAITING_FOR_OAUTH = "waiting_for_oauth" READY = "ready" BUSY = "busy" ERROR = "error" @dataclass class QwenSession: """Сессия Qwen Code.""" user_id: int state: QwenSessionState = QwenSessionState.STARTING process: Optional[subprocess.Popen] = None oauth_url: Optional[str] = None last_activity: datetime = field(default_factory=datetime.now) pending_task: Optional[str] = None output_buffer: str = "" SESSION_TIMEOUT = timedelta(minutes=30) # Таймаут неактивности def is_expired(self) -> bool: return datetime.now() - self.last_activity > self.SESSION_TIMEOUT class QwenCodeManager: """Менеджер сессий Qwen Code.""" def __init__(self, working_dir: str = None, system_prompt_path: str = None): self._sessions: Dict[int, QwenSession] = {} self._working_dir = working_dir or str(Path.home()) self._qwen_command = "qwen" self._system_prompt_path = system_prompt_path or str(Path(__file__).parent / "system_prompt.md") self._system_prompt: Optional[str] = None def load_system_prompt(self) -> str: """Загрузить системный промпт из файла.""" if self._system_prompt is not None: return self._system_prompt try: prompt_path = Path(self._system_prompt_path) if prompt_path.exists(): self._system_prompt = prompt_path.read_text(encoding='utf-8') logger.info(f"Системный промпт загружен из {self._system_prompt_path}") else: self._system_prompt = "" logger.warning(f"Системный промпт не найден: {self._system_prompt_path}") except Exception as e: logger.error(f"Ошибка загрузки системного промпта: {e}") self._system_prompt = "" return self._system_prompt def get_session(self, user_id: int) -> Optional[QwenSession]: """Получить сессию пользователя.""" session = self._sessions.get(user_id) if session and session.is_expired(): self.close_session(user_id) return None return session def create_session(self, user_id: int) -> QwenSession: """Создать новую сессию.""" session = QwenSession(user_id=user_id) self._sessions[user_id] = session logger.info(f"Создана сессия Qwen Code для пользователя {user_id}") return session def close_session(self, user_id: int): """Закрыть сессию пользователя.""" session = self._sessions.pop(user_id, None) if session and session.process: try: session.process.terminate() session.process.wait(timeout=5) except Exception as e: logger.warning(f"Ошибка при закрытии сессии Qwen: {e}") logger.info(f"Закрыта сессия Qwen Code для пользователя {user_id}") def has_active_session(self, user_id: int) -> bool: """Проверка наличия активной сессии.""" session = self.get_session(user_id) return session is not None and session.state != QwenSessionState.ERROR async def run_task(self, user_id: int, task: str, on_output: Callable[[str], Any], on_oauth_url: Callable[[str], Any], use_system_prompt: bool = True) -> str: """ Выполнить задачу в Qwen Code. Для простоты каждый раз запускаем новый процесс. Args: user_id: ID пользователя task: Задача для выполнения on_output: Callback для вывода on_oauth_url: Callback для OAuth URL use_system_prompt: Добавить системный промпт (default: True) """ # Создаём временную сессию для отслеживания session = self.get_session(user_id) if not session: session = self.create_session(user_id) session.last_activity = datetime.now() session.pending_task = task # Добавляем системный промпт если нужно if use_system_prompt: system_prompt = self.load_system_prompt() if system_prompt: full_task = f"{system_prompt}\n\n=== ЗАПРОС ПОЛЬЗОВАТЕЛЯ ===\n{task}" else: full_task = task else: full_task = task # Просто выполняем задачу через -p флаг return await self._execute_task(session, full_task, on_output) async def _start_session(self, session: QwenSession, on_output: Callable[[str], Any], on_oauth_url: Callable[[str], Any], pending_task: str = None) -> str: """Запустить сессию Qwen Code.""" session.state = QwenSessionState.STARTING try: # Запускаем qwen в интерактивном режиме с JSON выводом env = os.environ.copy() env["FORCE_COLOR"] = "0" # Отключаем цвета для парсинга cmd = [ self._qwen_command, "--output-format", "stream-json", "--input-format", "text", ] logger.info(f"Запуск Qwen Code: {' '.join(cmd)}") session.process = subprocess.Popen( cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=self._working_dir, env=env, text=True, bufsize=1 ) # Читаем вывод пока не поймём состояние output = "" oauth_detected = False while True: line = session.process.stdout.readline() if not line: break output += line on_output(line) # Проверяем на OAuth URL oauth_match = re.search( r'https://oauth\.qwen\.ai/[^>\s]+|' r'https://[^>\s]*qwen[^>\s]*/oauth[^>\s]*|' r'Authorize.*?https?://[^\s]+', line, re.IGNORECASE ) if oauth_match: oauth_url = oauth_match.group(0) session.oauth_url = oauth_url session.state = QwenSessionState.WAITING_FOR_OAUTH on_oauth_url(oauth_url) oauth_detected = True logger.info(f"Обнаружен OAuth URL: {oauth_url}") break # Проверяем на готовность if "ready" in line.lower() or "assistant" in line.lower(): session.state = QwenSessionState.READY logger.info("Сессия Qwen Code готова") break # Таймаут запуска if session.process.poll() is not None: session.state = QwenSessionState.ERROR return f"❌ Ошибка запуска Qwen Code: {output}" # Если после запуска есть отложенная задача — выполняем if pending_task and session.state == QwenSessionState.READY: return await self._execute_task(session, pending_task, on_output) if oauth_detected: return "⏳ Ожидание авторизации..." return "✅ Сессия запущена" except Exception as e: session.state = QwenSessionState.ERROR logger.error(f"Ошибка запуска сессии Qwen: {e}") return f"❌ Ошибка: {str(e)}" async def _execute_task(self, session: QwenSession, task: str, on_output: Callable[[str], Any]) -> str: """Выполнить задачу в активной сессии.""" session.state = QwenSessionState.BUSY session.output_buffer = "" try: # Для неинтерактивного режима используем -p env = os.environ.copy() env["FORCE_COLOR"] = "0" cmd = [ self._qwen_command, "-p", task, # Передаём задачу через флаг -p "--output-format", "text", # Простой текстовый вывод "--yolo", # Автоматическое подтверждение всех действий ] logger.info(f"Выполнение задачи: {' '.join(cmd)}") process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=self._working_dir, env=env, text=True, bufsize=1 ) # Читаем вывод output = "" timeout = 300 # 5 минут на выполнение start_time = datetime.now() while True: # Проверяем таймаут if (datetime.now() - start_time).total_seconds() > timeout: output += "\n\n⚠️ Таймаут выполнения (5 минут)" process.terminate() break # Проверяем процесс if process.poll() is not None: # Процесс завершился remaining = process.stdout.read() if remaining: output += remaining on_output(remaining) break # Читаем вывод line = process.stdout.readline() if line: output += line session.output_buffer += line on_output(line) # Небольшая пауза await asyncio.sleep(0.1) session.state = QwenSessionState.READY session.last_activity = datetime.now() return output.strip() except Exception as e: session.state = QwenSessionState.ERROR logger.error(f"Ошибка выполнения задачи: {e}") return f"❌ Ошибка: {str(e)}" def _parse_output(self, output: str) -> str: """ Распарсить JSON вывод qwen-code. Если вывод не JSON — вернуть как есть. """ # Пока просто возвращаем очищенный вывод # В будущем можно парсить JSON stream-format lines = output.split('\n') cleaned = [] for line in lines: # Убираем служебные сообщения if line.strip() and not line.startswith('{'): cleaned.append(line) return '\n'.join(cleaned) if cleaned else output # Глобальный менеджер qwen_manager = QwenCodeManager()