telegram-cli-bot/qwen_integration.py

489 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Интеграция с Qwen Code CLI.
Запуск, управление сессиями, обработка OAuth.
Использует stream-json формат для потокового вывода.
"""
import os
import re
import asyncio
import subprocess
import json
import logging
from pathlib import Path
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional, Dict, Callable, Any, List, Union
from enum import Enum
logger = logging.getLogger(__name__)
class QwenSessionState(Enum):
"""Состояние сессии Qwen Code."""
STARTING = "starting"
WAITING_FOR_OAUTH = "waiting_for_oauth"
READY = "ready"
BUSY = "busy"
ERROR = "error"
class QwenEventType(Enum):
"""Типы событий в stream-json выводе Qwen."""
SYSTEM = "system"
ASSISTANT = "assistant"
USER = "user"
RESULT = "result"
TOOL_USE = "tool_use"
@dataclass
class QwenStreamEvent:
"""Событие из stream-json вывода Qwen."""
event_type: QwenEventType
subtype: Optional[str] = None
uuid: Optional[str] = None
session_id: Optional[str] = None
message: Optional[Dict] = None
content: Optional[str] = None
is_error: bool = False
data: Optional[Dict] = None
@dataclass
class QwenSession:
"""Сессия Qwen Code."""
user_id: int
state: QwenSessionState = QwenSessionState.STARTING
process: Optional[subprocess.Popen] = None
oauth_url: Optional[str] = None
last_activity: datetime = field(default_factory=datetime.now)
pending_task: Optional[str] = None
output_buffer: str = ""
session_id: Optional[str] = None
SESSION_TIMEOUT = timedelta(minutes=30) # Таймаут неактивности
def is_expired(self) -> bool:
return datetime.now() - self.last_activity > self.SESSION_TIMEOUT
class QwenCodeManager:
"""Менеджер сессий Qwen Code."""
def __init__(self, working_dir: str = None, system_prompt_path: str = None):
self._sessions: Dict[int, QwenSession] = {}
self._working_dir = working_dir or str(Path.home())
self._qwen_command = "qwen"
self._system_prompt_path = system_prompt_path or str(Path(__file__).parent / "system_prompt.md")
self._system_prompt: Optional[str] = None
def load_system_prompt(self) -> str:
"""Загрузить системный промпт из файла."""
if self._system_prompt is not None:
return self._system_prompt
try:
prompt_path = Path(self._system_prompt_path)
if prompt_path.exists():
self._system_prompt = prompt_path.read_text(encoding='utf-8')
logger.info(f"Системный промпт загружен из {self._system_prompt_path}")
else:
self._system_prompt = ""
logger.warning(f"Системный промпт не найден: {self._system_prompt_path}")
except Exception as e:
logger.error(f"Ошибка загрузки системного промпта: {e}")
self._system_prompt = ""
return self._system_prompt
def get_session(self, user_id: int) -> Optional[QwenSession]:
"""Получить сессию пользователя."""
session = self._sessions.get(user_id)
if session and session.is_expired():
self.close_session(user_id)
return None
return session
def create_session(self, user_id: int) -> QwenSession:
"""Создать новую сессию."""
session = QwenSession(user_id=user_id)
self._sessions[user_id] = session
logger.info(f"Создана сессия Qwen Code для пользователя {user_id}")
return session
def close_session(self, user_id: int):
"""Закрыть сессию пользователя."""
session = self._sessions.pop(user_id, None)
if session and session.process:
try:
session.process.terminate()
session.process.wait(timeout=5)
except Exception as e:
logger.warning(f"Ошибка при закрытии сессии Qwen: {e}")
logger.info(f"Закрыта сессия Qwen Code для пользователя {user_id}")
def has_active_session(self, user_id: int) -> bool:
"""Проверка наличия активной сессии."""
session = self.get_session(user_id)
return session is not None and session.state != QwenSessionState.ERROR
async def run_task(self, user_id: int, task: str,
on_output: Callable[[str], Any],
on_oauth_url: Callable[[str], Any],
use_system_prompt: bool = True,
on_chunk: Callable[[str], Any] = None,
on_event: Callable[[QwenStreamEvent], Any] = None) -> str:
"""
Выполнить задачу в Qwen Code с потоковым выводом.
Args:
user_id: ID пользователя
task: Задача для выполнения
on_output: Callback для вывода (накапливается)
on_oauth_url: Callback для OAuth URL
use_system_prompt: Добавить системный промпт (default: True)
on_chunk: Callback для потоковой отправки chunks (опционально)
on_event: Callback для событий stream-json (опционально)
"""
# Создаём временную сессию для отслеживания
session = self.get_session(user_id)
if not session:
session = self.create_session(user_id)
session.last_activity = datetime.now()
session.pending_task = task
# Добавляем системный промпт если нужно
if use_system_prompt:
system_prompt = self.load_system_prompt()
if system_prompt:
full_task = f"{system_prompt}\n\n=== ЗАПРОС ПОЛЬЗОВАТЕЛЯ ===\n{task}"
else:
full_task = task
else:
full_task = task
# Выполняем задачу через -p флаг с stream-json выводом
return await self._execute_task(session, full_task, on_output, on_chunk, on_event)
async def _start_session(self, session: QwenSession,
on_output: Callable[[str], Any],
on_oauth_url: Callable[[str], Any],
pending_task: str = None) -> str:
"""Запустить сессию Qwen Code."""
session.state = QwenSessionState.STARTING
try:
# Запускаем qwen в интерактивном режиме с JSON выводом
env = os.environ.copy()
env["FORCE_COLOR"] = "0" # Отключаем цвета для парсинга
cmd = [
self._qwen_command,
"--output-format", "stream-json",
"--input-format", "text",
]
logger.info(f"Запуск Qwen Code: {' '.join(cmd)}")
session.process = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
cwd=self._working_dir,
env=env,
text=True,
bufsize=1
)
# Читаем вывод пока не поймём состояние
output = ""
oauth_detected = False
while True:
line = session.process.stdout.readline()
if not line:
break
output += line
on_output(line)
# Проверяем на OAuth URL
oauth_match = re.search(
r'https://oauth\.qwen\.ai/[^>\s]+|'
r'https://[^>\s]*qwen[^>\s]*/oauth[^>\s]*|'
r'Authorize.*?https?://[^\s]+',
line,
re.IGNORECASE
)
if oauth_match:
oauth_url = oauth_match.group(0)
session.oauth_url = oauth_url
session.state = QwenSessionState.WAITING_FOR_OAUTH
on_oauth_url(oauth_url)
oauth_detected = True
logger.info(f"Обнаружен OAuth URL: {oauth_url}")
break
# Проверяем на готовность
if "ready" in line.lower() or "assistant" in line.lower():
session.state = QwenSessionState.READY
logger.info("Сессия Qwen Code готова")
break
# Таймаут запуска
if session.process.poll() is not None:
session.state = QwenSessionState.ERROR
return f"❌ Ошибка запуска Qwen Code: {output}"
# Если после запуска есть отложенная задача — выполняем
if pending_task and session.state == QwenSessionState.READY:
return await self._execute_task(session, pending_task, on_output)
if oauth_detected:
return "⏳ Ожидание авторизации..."
return "✅ Сессия запущена"
except Exception as e:
session.state = QwenSessionState.ERROR
logger.error(f"Ошибка запуска сессии Qwen: {e}")
return f"❌ Ошибка: {str(e)}"
async def _execute_task(self, session: QwenSession,
task: str,
on_output: Callable[[str], Any],
on_chunk: Callable[[str], Any] = None,
on_event: Callable[[QwenStreamEvent], Any] = None) -> str:
"""
Выполнить задачу в активной сессии с потоковым stream-json выводом.
Формат stream-json возвращает JSON-объекты по одному на строку:
{"type":"system","subtype":"session_start","uuid":"...","session_id":"..."}
{"type":"assistant","uuid":"...","message":{"content":[...]}}
{"type":"result","subtype":"success","uuid":"...","result":"..."}
Args:
session: Сессия Qwen
task: Задача для выполнения
on_output: Callback для полного вывода (накапливается)
on_chunk: Callback для потоковой отправки текстовых chunks
on_event: Callback для полных JSON событий
"""
session.state = QwenSessionState.BUSY
session.output_buffer = ""
try:
env = os.environ.copy()
env["FORCE_COLOR"] = "0"
cmd = [
self._qwen_command,
"-p", task,
"--output-format", "stream-json", # Правильный streaming формат
"--yolo", # Авто-подтверждение
]
logger.info(f"Выполнение задачи (stream-json): {' '.join(cmd)}")
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
cwd=self._working_dir,
env=env
)
output = ""
chunk_timeout = 300 # 5 минут на выполнение
last_chunk_time = datetime.now()
partial_content = "" # Для накопления partial messages
while True:
# Проверяем общий таймаут
if (datetime.now() - last_chunk_time).total_seconds() > chunk_timeout:
output += "\n\n⚠️ Таймаут выполнения (5 минут)"
process.terminate()
break
# Проверяем процесс
if process.returncode is not None:
# Процесс завершился - читаем остаток
remaining = await process.stdout.read()
if remaining:
remaining_str = remaining.decode('utf-8', errors='replace')
output += remaining_str
# Парсим оставшиеся JSON события (не отправляем сырой вывод!)
await self._process_stream_lines(
remaining_str, on_output, on_chunk, on_event, session
)
break
# Читаем строку из stdout
try:
line = await asyncio.wait_for(process.stdout.readline(), timeout=1.0)
if line:
line_str = line.decode('utf-8', errors='replace')
output += line_str
session.output_buffer += line_str
last_chunk_time = datetime.now()
# Парсим JSON событие и извлекаем текст
await self._process_stream_lines(
line_str, on_output, on_chunk, on_event, session
)
except asyncio.TimeoutError:
if process.returncode is not None:
break
continue
await asyncio.sleep(0.01)
session.state = QwenSessionState.READY
session.last_activity = datetime.now()
return output.strip()
except Exception as e:
session.state = QwenSessionState.ERROR
logger.error(f"Ошибка выполнения задачи: {e}")
return f"❌ Ошибка: {str(e)}"
async def _process_stream_lines(self,
text: str,
on_output: Callable[[str], Any],
on_chunk: Callable[[str], Any] = None,
on_event: Callable[[QwenStreamEvent], Any] = None,
session: QwenSession = None) -> str:
"""
Распарсить stream-json строки и извлечь текстовый контент.
Формат JSON событий:
- {"type":"system","subtype":"session_start","session_id":"..."}
- {"type":"assistant","message":{"content":[{"type":"text","text":"..."}]}}
- {"type":"result","subtype":"success","result":"...","duration_ms":1234}
Возвращает только текстовый контент для отображения пользователю.
"""
extracted_text = ""
for line in text.split('\n'):
line = line.strip()
if not line:
continue
# Проверяем это JSON или обычный текст
if line.startswith('{'):
try:
event_data = json.loads(line)
event_type = event_data.get('type', 'unknown')
# Создаём объект события
stream_event = QwenStreamEvent(
event_type=QwenEventType(event_type) if event_type in ['system', 'assistant', 'user', 'result', 'tool_use'] else None,
subtype=event_data.get('subtype'),
uuid=event_data.get('uuid'),
session_id=event_data.get('session_id'),
message=event_data.get('message'),
is_error=event_data.get('is_error', False),
data=event_data
)
# Обновляем session_id из события
if stream_event.session_id and session:
session.session_id = stream_event.session_id
# Извлекаем текст из разных типов событий
if event_type == 'assistant':
message = event_data.get('message', {})
content_list = message.get('content', [])
# Логируем для отладки
logger.debug(f"Assistant event: content_type={type(content_list)}, content={content_list[:1] if isinstance(content_list, list) else content_list}")
# Обрабатываем только если content - это список (не thinking)
if isinstance(content_list, list):
for content_item in content_list:
if isinstance(content_item, dict):
if content_item.get('type') == 'text':
text_content = content_item.get('text', '')
logger.debug(f"Text chunk: {text_content[:50]}...")
extracted_text += text_content
# Отправляем ТОЛЬКО в on_chunk для streaming
if on_chunk:
await on_chunk(text_content)
elif content_item.get('type') == 'tool_use':
# Инструмент используется - можно показать статус
tool_name = content_item.get('name', 'unknown')
# Добавляем переносы строк для разделения блоков
status_text = f"\n🔧 Использую инструмент: {tool_name}...\n"
extracted_text += status_text
if on_chunk:
await on_chunk(status_text)
# Если content.type == 'thinking' - не отправляем пользователю
elif event_type == 'result':
result_text = event_data.get('result', '')
if result_text:
extracted_text += result_text
# НЕ отправляем result через on_chunk — он уже был отправлен через assistant chunks
logger.debug(f"Result event: {result_text[:50]}...")
# Проверяем на ошибку
if event_data.get('is_error'):
error_text = event_data.get('error', 'Неизвестная ошибка')
logger.error(f"Ошибка Qwen: {error_text}")
elif event_type == 'system':
subtype = event_data.get('subtype', '')
if subtype == 'session_start':
logger.info(f"Сессия Qwen запущена: {stream_event.session_id}")
elif subtype == 'init':
# Игнорируем init событие
pass
# Вызываем callback события если есть
if on_event:
on_event(stream_event)
except json.JSONDecodeError as e:
# Не JSON строка - возвращаем как текст
logger.debug(f"Не JSON строка: {line[:100]}...")
extracted_text += line + "\n"
if on_chunk:
await on_chunk(line + "\n")
else:
# Обычный текст (не JSON) - например, приветственное сообщение
extracted_text += line + "\n"
if on_chunk:
await on_chunk(line + "\n")
return extracted_text
def _parse_output(self, output: str) -> str:
"""
Распарсить JSON вывод qwen-code.
Если вывод не JSON — вернуть как есть.
"""
# Пока просто возвращаем очищенный вывод
# В будущем можно парсить JSON stream-format
lines = output.split('\n')
cleaned = []
for line in lines:
# Убираем служебные сообщения
if line.strip() and not line.startswith('{'):
cleaned.append(line)
return '\n'.join(cleaned) if cleaned else output
# Глобальный менеджер
qwen_manager = QwenCodeManager()