322 lines
13 KiB
Python
322 lines
13 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Интеграция с Qwen Code CLI.
|
||
Запуск, управление сессиями, обработка OAuth.
|
||
"""
|
||
|
||
import os
|
||
import re
|
||
import asyncio
|
||
import subprocess
|
||
import logging
|
||
from pathlib import Path
|
||
from dataclasses import dataclass, field
|
||
from datetime import datetime, timedelta
|
||
from typing import Optional, Dict, Callable, Any
|
||
from enum import Enum
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class QwenSessionState(Enum):
|
||
"""Состояние сессии Qwen Code."""
|
||
STARTING = "starting"
|
||
WAITING_FOR_OAUTH = "waiting_for_oauth"
|
||
READY = "ready"
|
||
BUSY = "busy"
|
||
ERROR = "error"
|
||
|
||
|
||
@dataclass
|
||
class QwenSession:
|
||
"""Сессия Qwen Code."""
|
||
user_id: int
|
||
state: QwenSessionState = QwenSessionState.STARTING
|
||
process: Optional[subprocess.Popen] = None
|
||
oauth_url: Optional[str] = None
|
||
last_activity: datetime = field(default_factory=datetime.now)
|
||
pending_task: Optional[str] = None
|
||
output_buffer: str = ""
|
||
|
||
SESSION_TIMEOUT = timedelta(minutes=30) # Таймаут неактивности
|
||
|
||
def is_expired(self) -> bool:
|
||
return datetime.now() - self.last_activity > self.SESSION_TIMEOUT
|
||
|
||
|
||
class QwenCodeManager:
|
||
"""Менеджер сессий Qwen Code."""
|
||
|
||
def __init__(self, working_dir: str = None, system_prompt_path: str = None):
|
||
self._sessions: Dict[int, QwenSession] = {}
|
||
self._working_dir = working_dir or str(Path.home())
|
||
self._qwen_command = "qwen"
|
||
self._system_prompt_path = system_prompt_path or str(Path(__file__).parent / "system_prompt.md")
|
||
self._system_prompt: Optional[str] = None
|
||
|
||
def load_system_prompt(self) -> str:
|
||
"""Загрузить системный промпт из файла."""
|
||
if self._system_prompt is not None:
|
||
return self._system_prompt
|
||
|
||
try:
|
||
prompt_path = Path(self._system_prompt_path)
|
||
if prompt_path.exists():
|
||
self._system_prompt = prompt_path.read_text(encoding='utf-8')
|
||
logger.info(f"Системный промпт загружен из {self._system_prompt_path}")
|
||
else:
|
||
self._system_prompt = ""
|
||
logger.warning(f"Системный промпт не найден: {self._system_prompt_path}")
|
||
except Exception as e:
|
||
logger.error(f"Ошибка загрузки системного промпта: {e}")
|
||
self._system_prompt = ""
|
||
|
||
return self._system_prompt
|
||
|
||
def get_session(self, user_id: int) -> Optional[QwenSession]:
|
||
"""Получить сессию пользователя."""
|
||
session = self._sessions.get(user_id)
|
||
if session and session.is_expired():
|
||
self.close_session(user_id)
|
||
return None
|
||
return session
|
||
|
||
def create_session(self, user_id: int) -> QwenSession:
|
||
"""Создать новую сессию."""
|
||
session = QwenSession(user_id=user_id)
|
||
self._sessions[user_id] = session
|
||
logger.info(f"Создана сессия Qwen Code для пользователя {user_id}")
|
||
return session
|
||
|
||
def close_session(self, user_id: int):
|
||
"""Закрыть сессию пользователя."""
|
||
session = self._sessions.pop(user_id, None)
|
||
if session and session.process:
|
||
try:
|
||
session.process.terminate()
|
||
session.process.wait(timeout=5)
|
||
except Exception as e:
|
||
logger.warning(f"Ошибка при закрытии сессии Qwen: {e}")
|
||
logger.info(f"Закрыта сессия Qwen Code для пользователя {user_id}")
|
||
|
||
def has_active_session(self, user_id: int) -> bool:
|
||
"""Проверка наличия активной сессии."""
|
||
session = self.get_session(user_id)
|
||
return session is not None and session.state != QwenSessionState.ERROR
|
||
|
||
async def run_task(self, user_id: int, task: str,
|
||
on_output: Callable[[str], Any],
|
||
on_oauth_url: Callable[[str], Any],
|
||
use_system_prompt: bool = True) -> str:
|
||
"""
|
||
Выполнить задачу в Qwen Code.
|
||
Для простоты каждый раз запускаем новый процесс.
|
||
|
||
Args:
|
||
user_id: ID пользователя
|
||
task: Задача для выполнения
|
||
on_output: Callback для вывода
|
||
on_oauth_url: Callback для OAuth URL
|
||
use_system_prompt: Добавить системный промпт (default: True)
|
||
"""
|
||
# Создаём временную сессию для отслеживания
|
||
session = self.get_session(user_id)
|
||
if not session:
|
||
session = self.create_session(user_id)
|
||
|
||
session.last_activity = datetime.now()
|
||
session.pending_task = task
|
||
|
||
# Добавляем системный промпт если нужно
|
||
if use_system_prompt:
|
||
system_prompt = self.load_system_prompt()
|
||
if system_prompt:
|
||
full_task = f"{system_prompt}\n\n=== ЗАПРОС ПОЛЬЗОВАТЕЛЯ ===\n{task}"
|
||
else:
|
||
full_task = task
|
||
else:
|
||
full_task = task
|
||
|
||
# Просто выполняем задачу через -p флаг
|
||
return await self._execute_task(session, full_task, on_output)
|
||
|
||
async def _start_session(self, session: QwenSession,
|
||
on_output: Callable[[str], Any],
|
||
on_oauth_url: Callable[[str], Any],
|
||
pending_task: str = None) -> str:
|
||
"""Запустить сессию Qwen Code."""
|
||
session.state = QwenSessionState.STARTING
|
||
|
||
try:
|
||
# Запускаем qwen в интерактивном режиме с JSON выводом
|
||
env = os.environ.copy()
|
||
env["FORCE_COLOR"] = "0" # Отключаем цвета для парсинга
|
||
|
||
cmd = [
|
||
self._qwen_command,
|
||
"--output-format", "stream-json",
|
||
"--input-format", "text",
|
||
]
|
||
|
||
logger.info(f"Запуск Qwen Code: {' '.join(cmd)}")
|
||
|
||
session.process = subprocess.Popen(
|
||
cmd,
|
||
stdin=subprocess.PIPE,
|
||
stdout=subprocess.PIPE,
|
||
stderr=subprocess.STDOUT,
|
||
cwd=self._working_dir,
|
||
env=env,
|
||
text=True,
|
||
bufsize=1
|
||
)
|
||
|
||
# Читаем вывод пока не поймём состояние
|
||
output = ""
|
||
oauth_detected = False
|
||
|
||
while True:
|
||
line = session.process.stdout.readline()
|
||
if not line:
|
||
break
|
||
|
||
output += line
|
||
on_output(line)
|
||
|
||
# Проверяем на OAuth URL
|
||
oauth_match = re.search(
|
||
r'https://oauth\.qwen\.ai/[^>\s]+|'
|
||
r'https://[^>\s]*qwen[^>\s]*/oauth[^>\s]*|'
|
||
r'Authorize.*?https?://[^\s]+',
|
||
line,
|
||
re.IGNORECASE
|
||
)
|
||
|
||
if oauth_match:
|
||
oauth_url = oauth_match.group(0)
|
||
session.oauth_url = oauth_url
|
||
session.state = QwenSessionState.WAITING_FOR_OAUTH
|
||
on_oauth_url(oauth_url)
|
||
oauth_detected = True
|
||
logger.info(f"Обнаружен OAuth URL: {oauth_url}")
|
||
break
|
||
|
||
# Проверяем на готовность
|
||
if "ready" in line.lower() or "assistant" in line.lower():
|
||
session.state = QwenSessionState.READY
|
||
logger.info("Сессия Qwen Code готова")
|
||
break
|
||
|
||
# Таймаут запуска
|
||
if session.process.poll() is not None:
|
||
session.state = QwenSessionState.ERROR
|
||
return f"❌ Ошибка запуска Qwen Code: {output}"
|
||
|
||
# Если после запуска есть отложенная задача — выполняем
|
||
if pending_task and session.state == QwenSessionState.READY:
|
||
return await self._execute_task(session, pending_task, on_output)
|
||
|
||
if oauth_detected:
|
||
return "⏳ Ожидание авторизации..."
|
||
|
||
return "✅ Сессия запущена"
|
||
|
||
except Exception as e:
|
||
session.state = QwenSessionState.ERROR
|
||
logger.error(f"Ошибка запуска сессии Qwen: {e}")
|
||
return f"❌ Ошибка: {str(e)}"
|
||
|
||
async def _execute_task(self, session: QwenSession,
|
||
task: str,
|
||
on_output: Callable[[str], Any]) -> str:
|
||
"""Выполнить задачу в активной сессии."""
|
||
session.state = QwenSessionState.BUSY
|
||
session.output_buffer = ""
|
||
|
||
try:
|
||
# Для неинтерактивного режима используем -p
|
||
env = os.environ.copy()
|
||
env["FORCE_COLOR"] = "0"
|
||
|
||
cmd = [
|
||
self._qwen_command,
|
||
"-p", task, # Передаём задачу через флаг -p
|
||
"--output-format", "text", # Простой текстовый вывод
|
||
"--yolo", # Автоматическое подтверждение всех действий
|
||
]
|
||
|
||
logger.info(f"Выполнение задачи: {' '.join(cmd)}")
|
||
|
||
process = subprocess.Popen(
|
||
cmd,
|
||
stdout=subprocess.PIPE,
|
||
stderr=subprocess.STDOUT,
|
||
cwd=self._working_dir,
|
||
env=env,
|
||
text=True,
|
||
bufsize=1
|
||
)
|
||
|
||
# Читаем вывод
|
||
output = ""
|
||
timeout = 300 # 5 минут на выполнение
|
||
|
||
start_time = datetime.now()
|
||
|
||
while True:
|
||
# Проверяем таймаут
|
||
if (datetime.now() - start_time).total_seconds() > timeout:
|
||
output += "\n\n⚠️ Таймаут выполнения (5 минут)"
|
||
process.terminate()
|
||
break
|
||
|
||
# Проверяем процесс
|
||
if process.poll() is not None:
|
||
# Процесс завершился
|
||
remaining = process.stdout.read()
|
||
if remaining:
|
||
output += remaining
|
||
on_output(remaining)
|
||
break
|
||
|
||
# Читаем вывод
|
||
line = process.stdout.readline()
|
||
if line:
|
||
output += line
|
||
session.output_buffer += line
|
||
on_output(line)
|
||
|
||
# Небольшая пауза
|
||
await asyncio.sleep(0.1)
|
||
|
||
session.state = QwenSessionState.READY
|
||
session.last_activity = datetime.now()
|
||
|
||
return output.strip()
|
||
|
||
except Exception as e:
|
||
session.state = QwenSessionState.ERROR
|
||
logger.error(f"Ошибка выполнения задачи: {e}")
|
||
return f"❌ Ошибка: {str(e)}"
|
||
|
||
def _parse_output(self, output: str) -> str:
|
||
"""
|
||
Распарсить JSON вывод qwen-code.
|
||
Если вывод не JSON — вернуть как есть.
|
||
"""
|
||
# Пока просто возвращаем очищенный вывод
|
||
# В будущем можно парсить JSON stream-format
|
||
lines = output.split('\n')
|
||
cleaned = []
|
||
|
||
for line in lines:
|
||
# Убираем служебные сообщения
|
||
if line.strip() and not line.startswith('{'):
|
||
cleaned.append(line)
|
||
|
||
return '\n'.join(cleaned) if cleaned else output
|
||
|
||
|
||
# Глобальный менеджер
|
||
qwen_manager = QwenCodeManager()
|