telegram-cli-bot/qwen_integration.py

276 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Интеграция с Qwen Code CLI.
Запуск, управление сессиями, обработка OAuth.
"""
import os
import re
import asyncio
import subprocess
import logging
from pathlib import Path
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional, Dict, Callable, Any
from enum import Enum
logger = logging.getLogger(__name__)
class QwenSessionState(Enum):
"""Состояние сессии Qwen Code."""
STARTING = "starting"
WAITING_FOR_OAUTH = "waiting_for_oauth"
READY = "ready"
BUSY = "busy"
ERROR = "error"
@dataclass
class QwenSession:
"""Сессия Qwen Code."""
user_id: int
state: QwenSessionState = QwenSessionState.STARTING
process: Optional[subprocess.Popen] = None
oauth_url: Optional[str] = None
last_activity: datetime = field(default_factory=datetime.now)
pending_task: Optional[str] = None
output_buffer: str = ""
SESSION_TIMEOUT = timedelta(minutes=30) # Таймаут неактивности
def is_expired(self) -> bool:
return datetime.now() - self.last_activity > self.SESSION_TIMEOUT
class QwenCodeManager:
"""Менеджер сессий Qwen Code."""
def __init__(self, working_dir: str = None):
self._sessions: Dict[int, QwenSession] = {}
self._working_dir = working_dir or str(Path.home())
self._qwen_command = "qwen"
def get_session(self, user_id: int) -> Optional[QwenSession]:
"""Получить сессию пользователя."""
session = self._sessions.get(user_id)
if session and session.is_expired():
self.close_session(user_id)
return None
return session
def create_session(self, user_id: int) -> QwenSession:
"""Создать новую сессию."""
session = QwenSession(user_id=user_id)
self._sessions[user_id] = session
logger.info(f"Создана сессия Qwen Code для пользователя {user_id}")
return session
def close_session(self, user_id: int):
"""Закрыть сессию пользователя."""
session = self._sessions.pop(user_id, None)
if session and session.process:
try:
session.process.terminate()
session.process.wait(timeout=5)
except Exception as e:
logger.warning(f"Ошибка при закрытии сессии Qwen: {e}")
logger.info(f"Закрыта сессия Qwen Code для пользователя {user_id}")
def has_active_session(self, user_id: int) -> bool:
"""Проверка наличия активной сессии."""
session = self.get_session(user_id)
return session is not None and session.state != QwenSessionState.ERROR
async def run_task(self, user_id: int, task: str,
on_output: Callable[[str], Any],
on_oauth_url: Callable[[str], Any]) -> str:
"""
Выполнить задачу в Qwen Code.
Args:
user_id: ID пользователя
task: Задача для выполнения
on_output: Callback для вывода (вызывается при появлении вывода)
on_oauth_url: Callback для OAuth URL (вызывается если нужна авторизация)
Returns:
Результат выполнения
"""
session = self.get_session(user_id)
# Если сессии нет или она в ошибке — создаём новую
if not session or session.state == QwenSessionState.ERROR:
session = self.create_session(user_id)
session.last_activity = datetime.now()
session.pending_task = task
# Если сессия ещё не готова (ожидает OAuth или запуска)
if session.state in [QwenSessionState.STARTING, QwenSessionState.WAITING_FOR_OAUTH]:
return await self._start_session(session, on_output, on_oauth_url, task)
# Если сессия готова — выполняем задачу
return await self._execute_task(session, task, on_output)
async def _start_session(self, session: QwenSession,
on_output: Callable[[str], Any],
on_oauth_url: Callable[[str], Any],
pending_task: str = None) -> str:
"""Запустить сессию Qwen Code."""
session.state = QwenSessionState.STARTING
try:
# Запускаем qwen в интерактивном режиме с JSON выводом
env = os.environ.copy()
env["FORCE_COLOR"] = "0" # Отключаем цвета для парсинга
cmd = [
self._qwen_command,
"--output-format", "stream-json",
"--input-format", "text",
]
logger.info(f"Запуск Qwen Code: {' '.join(cmd)}")
session.process = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
cwd=self._working_dir,
env=env,
text=True,
bufsize=1
)
# Читаем вывод пока не поймём состояние
output = ""
oauth_detected = False
while True:
line = session.process.stdout.readline()
if not line:
break
output += line
on_output(line)
# Проверяем на OAuth URL
oauth_match = re.search(
r'https://oauth\.qwen\.ai/[^>\s]+|'
r'https://[^>\s]*qwen[^>\s]*/oauth[^>\s]*|'
r'Authorize.*?https?://[^\s]+',
line,
re.IGNORECASE
)
if oauth_match:
oauth_url = oauth_match.group(0)
session.oauth_url = oauth_url
session.state = QwenSessionState.WAITING_FOR_OAUTH
on_oauth_url(oauth_url)
oauth_detected = True
logger.info(f"Обнаружен OAuth URL: {oauth_url}")
break
# Проверяем на готовность
if "ready" in line.lower() or "assistant" in line.lower():
session.state = QwenSessionState.READY
logger.info("Сессия Qwen Code готова")
break
# Таймаут запуска
if session.process.poll() is not None:
session.state = QwenSessionState.ERROR
return f"❌ Ошибка запуска Qwen Code: {output}"
# Если после запуска есть отложенная задача — выполняем
if pending_task and session.state == QwenSessionState.READY:
return await self._execute_task(session, pending_task, on_output)
if oauth_detected:
return "⏳ Ожидание авторизации..."
return "✅ Сессия запущена"
except Exception as e:
session.state = QwenSessionState.ERROR
logger.error(f"Ошибка запуска сессии Qwen: {e}")
return f"❌ Ошибка: {str(e)}"
async def _execute_task(self, session: QwenSession,
task: str,
on_output: Callable[[str], Any]) -> str:
"""Выполнить задачу в активной сессии."""
session.state = QwenSessionState.BUSY
session.output_buffer = ""
try:
# Отправляем задачу
session.process.stdin.write(task + "\n")
session.process.stdin.flush()
# Читаем ответ
output = ""
timeout = 300 # 5 минут на выполнение
start_time = datetime.now()
while True:
# Проверяем таймаут
if (datetime.now() - start_time).total_seconds() > timeout:
output += "\n\n⚠️ Таймаут выполнения (5 минут)"
break
# Проверяем процесс
if session.process.poll() is not None:
# Процесс завершился
remaining = session.process.stdout.read()
if remaining:
output += remaining
on_output(remaining)
break
# Читаем вывод
line = session.process.stdout.readline()
if line:
output += line
session.output_buffer += line
on_output(line)
# Небольшая пауза чтобы не блокировать
await asyncio.sleep(0.1)
session.state = QwenSessionState.READY
session.last_activity = datetime.now()
return self._parse_output(output)
except Exception as e:
session.state = QwenSessionState.ERROR
logger.error(f"Ошибка выполнения задачи: {e}")
return f"❌ Ошибка: {str(e)}"
def _parse_output(self, output: str) -> str:
"""
Распарсить JSON вывод qwen-code.
Если вывод не JSON — вернуть как есть.
"""
# Пока просто возвращаем очищенный вывод
# В будущем можно парсить JSON stream-format
lines = output.split('\n')
cleaned = []
for line in lines:
# Убираем служебные сообщения
if line.strip() and not line.startswith('{'):
cleaned.append(line)
return '\n'.join(cleaned) if cleaned else output
# Глобальный менеджер
qwen_manager = QwenCodeManager()