725 lines
32 KiB
Python
725 lines
32 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Интеграция с Qwen Code CLI.
|
||
Запуск, управление сессиями, обработка OAuth.
|
||
|
||
Использует stream-json формат для потокового вывода.
|
||
"""
|
||
|
||
import os
|
||
import re
|
||
import asyncio
|
||
import subprocess
|
||
import json
|
||
import logging
|
||
from pathlib import Path
|
||
from dataclasses import dataclass, field
|
||
from datetime import datetime, timedelta
|
||
from typing import Optional, Dict, Callable, Any, List, Union
|
||
from enum import Enum
|
||
|
||
# Импортируем OAuth модуль
|
||
from bot.utils.qwen_oauth import (
|
||
get_authorization_url,
|
||
check_authorization_complete,
|
||
is_authorized,
|
||
get_access_token,
|
||
clear_authorization
|
||
)
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class QwenSessionState(Enum):
|
||
"""Состояние сессии Qwen Code."""
|
||
STARTING = "starting"
|
||
WAITING_FOR_OAUTH = "waiting_for_oauth"
|
||
READY = "ready"
|
||
BUSY = "busy"
|
||
ERROR = "error"
|
||
|
||
|
||
class QwenEventType(Enum):
|
||
"""Типы событий в stream-json выводе Qwen."""
|
||
SYSTEM = "system"
|
||
ASSISTANT = "assistant"
|
||
USER = "user"
|
||
RESULT = "result"
|
||
TOOL_USE = "tool_use"
|
||
|
||
|
||
@dataclass
|
||
class QwenStreamEvent:
|
||
"""Событие из stream-json вывода Qwen."""
|
||
event_type: QwenEventType
|
||
subtype: Optional[str] = None
|
||
uuid: Optional[str] = None
|
||
session_id: Optional[str] = None
|
||
message: Optional[Dict] = None
|
||
content: Optional[str] = None
|
||
is_error: bool = False
|
||
data: Optional[Dict] = None
|
||
|
||
|
||
@dataclass
|
||
class QwenSession:
|
||
"""Сессия Qwen Code."""
|
||
user_id: int
|
||
state: QwenSessionState = QwenSessionState.STARTING
|
||
process: Optional[subprocess.Popen] = None
|
||
oauth_url: Optional[str] = None
|
||
on_oauth_url: Optional[Callable] = None # Callback для OAuth URL
|
||
last_activity: datetime = field(default_factory=datetime.now)
|
||
pending_task: Optional[str] = None
|
||
output_buffer: str = ""
|
||
session_id: Optional[str] = None
|
||
|
||
SESSION_TIMEOUT = timedelta(minutes=30) # Таймаут неактивности
|
||
|
||
def is_expired(self) -> bool:
|
||
return datetime.now() - self.last_activity > self.SESSION_TIMEOUT
|
||
|
||
|
||
class QwenCodeManager:
|
||
"""Менеджер сессий Qwen Code."""
|
||
|
||
def __init__(self, working_dir: str = None, system_prompt_path: str = None):
|
||
self._sessions: Dict[int, QwenSession] = {}
|
||
self._working_dir = working_dir or str(Path.home())
|
||
self._qwen_command = "qwen"
|
||
self._system_prompt_path = system_prompt_path or str(Path(__file__).parent / "system_prompt.md")
|
||
self._system_prompt: Optional[str] = None
|
||
|
||
def load_system_prompt(self) -> str:
|
||
"""Загрузить системный промпт из файла."""
|
||
if self._system_prompt is not None:
|
||
return self._system_prompt
|
||
|
||
try:
|
||
prompt_path = Path(self._system_prompt_path)
|
||
if prompt_path.exists():
|
||
self._system_prompt = prompt_path.read_text(encoding='utf-8')
|
||
logger.info(f"Системный промпт загружен из {self._system_prompt_path}")
|
||
else:
|
||
self._system_prompt = ""
|
||
logger.warning(f"Системный промпт не найден: {self._system_prompt_path}")
|
||
except Exception as e:
|
||
logger.error(f"Ошибка загрузки системного промпта: {e}")
|
||
self._system_prompt = ""
|
||
|
||
return self._system_prompt
|
||
|
||
def get_session(self, user_id: int) -> Optional[QwenSession]:
|
||
"""Получить сессию пользователя."""
|
||
session = self._sessions.get(user_id)
|
||
if session and session.is_expired():
|
||
self.close_session(user_id)
|
||
return None
|
||
return session
|
||
|
||
def create_session(self, user_id: int) -> QwenSession:
|
||
"""Создать новую сессию."""
|
||
session = QwenSession(user_id=user_id)
|
||
self._sessions[user_id] = session
|
||
logger.info(f"Создана сессия Qwen Code для пользователя {user_id}")
|
||
return session
|
||
|
||
async def get_oauth_url(self) -> Optional[str]:
|
||
"""
|
||
Получить OAuth ссылку для авторизации Qwen Code.
|
||
|
||
Returns:
|
||
OAuth URL или None если не удалось получить
|
||
"""
|
||
try:
|
||
import aiohttp
|
||
|
||
# Генерируем code verifier и challenge (упрощённо)
|
||
import hashlib
|
||
import secrets
|
||
import uuid
|
||
|
||
code_verifier = secrets.token_urlsafe(32)
|
||
code_challenge = hashlib.sha256(code_verifier.encode()).hexdigest()
|
||
|
||
# Запрос на получение device code (form-urlencoded как в оригинале)
|
||
payload = {
|
||
'client_id': QWEN_OAUTH_CLIENT_ID,
|
||
'scope': 'openid profile email model.completion',
|
||
'code_challenge': code_challenge,
|
||
'code_challenge_method': 'S256'
|
||
}
|
||
|
||
headers = {
|
||
'Content-Type': 'application/x-www-form-urlencoded',
|
||
'Accept': 'application/json',
|
||
'x-request-id': str(uuid.uuid4()),
|
||
'User-Agent': 'qwen-code-cli/0.11.0'
|
||
}
|
||
|
||
# Формируем form-urlencoded тело
|
||
form_data = '&'.join(f'{k}={v}' for k, v in payload.items())
|
||
|
||
async with aiohttp.ClientSession() as session:
|
||
async with session.post(
|
||
QWEN_OAUTH_DEVICE_CODE_ENDPOINT,
|
||
data=form_data,
|
||
headers=headers
|
||
) as resp:
|
||
text = await resp.text()
|
||
if resp.status == 200:
|
||
try:
|
||
data = await resp.json()
|
||
verification_uri = data.get('verification_uri_complete', '')
|
||
logger.info(f"Получен OAuth URL: {verification_uri}")
|
||
return verification_uri
|
||
except Exception as json_err:
|
||
logger.error(f"Ошибка парсинга JSON: {json_err}")
|
||
logger.debug(f"Ответ сервера: {text[:200]}")
|
||
else:
|
||
logger.error(f"Ошибка получения OAuth: {resp.status}")
|
||
logger.debug(f"Ответ сервера: {text[:200]}")
|
||
return None
|
||
|
||
except Exception as e:
|
||
logger.error(f"Ошибка получения OAuth URL: {e}")
|
||
return None
|
||
|
||
def close_session(self, user_id: int):
|
||
"""Закрыть сессию пользователя."""
|
||
session = self._sessions.pop(user_id, None)
|
||
if session and session.process:
|
||
try:
|
||
session.process.terminate()
|
||
session.process.wait(timeout=5)
|
||
except Exception as e:
|
||
logger.warning(f"Ошибка при закрытии сессии Qwen: {e}")
|
||
logger.info(f"Закрыта сессия Qwen Code для пользователя {user_id}")
|
||
|
||
def has_active_session(self, user_id: int) -> bool:
|
||
"""Проверка наличия активной сессии."""
|
||
session = self.get_session(user_id)
|
||
return session is not None and session.state != QwenSessionState.ERROR
|
||
|
||
async def run_task(self, user_id: int, task: str,
|
||
on_output: Callable[[str], Any],
|
||
on_oauth_url: Callable[[str], Any],
|
||
use_system_prompt: bool = True,
|
||
on_chunk: Callable[[str], Any] = None,
|
||
on_event: Callable[[QwenStreamEvent], Any] = None) -> str:
|
||
"""
|
||
Выполнить задачу в Qwen Code с потоковым выводом.
|
||
|
||
Args:
|
||
user_id: ID пользователя
|
||
task: Задача для выполнения
|
||
on_output: Callback для вывода (накапливается)
|
||
on_oauth_url: Callback для OAuth URL
|
||
use_system_prompt: Добавить системный промпт (default: True)
|
||
on_chunk: Callback для потоковой отправки chunks (опционально)
|
||
on_event: Callback для событий stream-json (опционально)
|
||
"""
|
||
# Создаём временную сессию для отслеживания
|
||
session = self.get_session(user_id)
|
||
if not session:
|
||
session = self.create_session(user_id)
|
||
|
||
session.last_activity = datetime.now()
|
||
session.pending_task = task
|
||
session.on_oauth_url = on_oauth_url # Сохраняем callback для OAuth
|
||
|
||
# Добавляем системный промпт если нужно
|
||
if use_system_prompt:
|
||
system_prompt = self.load_system_prompt()
|
||
if system_prompt:
|
||
full_task = f"{system_prompt}\n\n=== ЗАПРОС ПОЛЬЗОВАТЕЛЯ ===\n{task}"
|
||
else:
|
||
full_task = task
|
||
else:
|
||
full_task = task
|
||
|
||
# Выполняем задачу через -p флаг с stream-json выводом
|
||
return await self._execute_task(session, full_task, on_output, on_chunk, on_event)
|
||
|
||
async def _start_session(self, session: QwenSession,
|
||
on_output: Callable[[str], Any],
|
||
on_oauth_url: Callable[[str], Any],
|
||
pending_task: str = None) -> str:
|
||
"""Запустить сессию Qwen Code."""
|
||
session.state = QwenSessionState.STARTING
|
||
|
||
try:
|
||
# Запускаем qwen в интерактивном режиме с JSON выводом
|
||
env = os.environ.copy()
|
||
env["FORCE_COLOR"] = "0" # Отключаем цвета для парсинга
|
||
|
||
cmd = [
|
||
self._qwen_command,
|
||
"--output-format", "stream-json",
|
||
"--input-format", "text",
|
||
"--auth-type", "qwen-oauth", # Явное указание типа авторизации
|
||
]
|
||
|
||
logger.info(f"Запуск Qwen Code: {' '.join(cmd)}")
|
||
|
||
session.process = subprocess.Popen(
|
||
cmd,
|
||
stdin=subprocess.PIPE,
|
||
stdout=subprocess.PIPE,
|
||
stderr=subprocess.STDOUT,
|
||
cwd=self._working_dir,
|
||
env=env,
|
||
text=True,
|
||
bufsize=1
|
||
)
|
||
|
||
# Читаем вывод пока не поймём состояние
|
||
output = ""
|
||
oauth_detected = False
|
||
|
||
while True:
|
||
line = session.process.stdout.readline()
|
||
if not line:
|
||
break
|
||
|
||
output += line
|
||
on_output(line)
|
||
|
||
# Проверяем на OAuth URL
|
||
oauth_match = re.search(
|
||
r'https://oauth\.qwen\.ai/[^>\s]+|'
|
||
r'https://[^>\s]*qwen[^>\s]*/oauth[^>\s]*|'
|
||
r'Authorize.*?https?://[^\s]+',
|
||
line,
|
||
re.IGNORECASE
|
||
)
|
||
|
||
if oauth_match:
|
||
oauth_url = oauth_match.group(0)
|
||
session.oauth_url = oauth_url
|
||
session.state = QwenSessionState.WAITING_FOR_OAUTH
|
||
on_oauth_url(oauth_url)
|
||
oauth_detected = True
|
||
logger.info(f"Обнаружен OAuth URL: {oauth_url}")
|
||
break
|
||
|
||
# Проверяем на готовность
|
||
if "ready" in line.lower() or "assistant" in line.lower():
|
||
session.state = QwenSessionState.READY
|
||
logger.info("Сессия Qwen Code готова")
|
||
break
|
||
|
||
# Таймаут запуска
|
||
if session.process.poll() is not None:
|
||
session.state = QwenSessionState.ERROR
|
||
return f"❌ Ошибка запуска Qwen Code: {output}"
|
||
|
||
# Если после запуска есть отложенная задача — выполняем
|
||
if pending_task and session.state == QwenSessionState.READY:
|
||
return await self._execute_task(session, pending_task, on_output)
|
||
|
||
if oauth_detected:
|
||
return "⏳ Ожидание авторизации..."
|
||
|
||
return "✅ Сессия запущена"
|
||
|
||
except Exception as e:
|
||
session.state = QwenSessionState.ERROR
|
||
logger.error(f"Ошибка запуска сессии Qwen: {e}")
|
||
return f"❌ Ошибка: {str(e)}"
|
||
|
||
async def _execute_task(self, session: QwenSession,
|
||
task: str,
|
||
on_output: Callable[[str], Any],
|
||
on_chunk: Callable[[str], Any] = None,
|
||
on_event: Callable[[QwenStreamEvent], Any] = None) -> str:
|
||
"""
|
||
Выполнить задачу в активной сессии с потоковым stream-json выводом.
|
||
|
||
Формат stream-json возвращает JSON-объекты по одному на строку:
|
||
{"type":"system","subtype":"session_start","uuid":"...","session_id":"..."}
|
||
{"type":"assistant","uuid":"...","message":{"content":[...]}}
|
||
{"type":"result","subtype":"success","uuid":"...","result":"..."}
|
||
|
||
Args:
|
||
session: Сессия Qwen
|
||
task: Задача для выполнения
|
||
on_output: Callback для полного вывода (накапливается)
|
||
on_chunk: Callback для потоковой отправки текстовых chunks
|
||
on_event: Callback для полных JSON событий
|
||
"""
|
||
session.state = QwenSessionState.BUSY
|
||
session.output_buffer = ""
|
||
|
||
try:
|
||
env = os.environ.copy()
|
||
env["FORCE_COLOR"] = "0"
|
||
|
||
cmd = [
|
||
self._qwen_command,
|
||
"-p", task,
|
||
"--output-format", "stream-json", # Правильный streaming формат
|
||
"--auth-type", "qwen-oauth", # Явное указание типа авторизации
|
||
"--approval-mode", "yolo", # Авто-подтверждение действий
|
||
]
|
||
|
||
logger.info(f"Выполнение задачи (stream-json): {' '.join(cmd)}")
|
||
|
||
process = await asyncio.create_subprocess_exec(
|
||
*cmd,
|
||
stdout=asyncio.subprocess.PIPE,
|
||
stderr=asyncio.subprocess.STDOUT,
|
||
cwd=self._working_dir,
|
||
env=env
|
||
)
|
||
|
||
output = ""
|
||
chunk_timeout = 300 # 5 минут на выполнение
|
||
last_chunk_time = datetime.now()
|
||
partial_content = "" # Для накопления partial messages
|
||
|
||
while True:
|
||
# Проверяем общий таймаут
|
||
if (datetime.now() - last_chunk_time).total_seconds() > chunk_timeout:
|
||
output += "\n\n⚠️ Таймаут выполнения (5 минут)"
|
||
process.terminate()
|
||
break
|
||
|
||
# Проверяем процесс
|
||
if process.returncode is not None:
|
||
# Процесс завершился - читаем остаток
|
||
remaining = await process.stdout.read()
|
||
if remaining:
|
||
remaining_str = remaining.decode('utf-8', errors='replace')
|
||
output += remaining_str
|
||
# Парсим оставшиеся JSON события (не отправляем сырой вывод!)
|
||
await self._process_stream_lines(
|
||
remaining_str, on_output, on_chunk, on_event, session
|
||
)
|
||
break
|
||
|
||
# Читаем строку из stdout
|
||
try:
|
||
line = await asyncio.wait_for(process.stdout.readline(), timeout=1.0)
|
||
if line:
|
||
line_str = line.decode('utf-8', errors='replace')
|
||
output += line_str
|
||
session.output_buffer += line_str
|
||
last_chunk_time = datetime.now()
|
||
|
||
# Проверяем на OAuth ссылку в текстовом выводе
|
||
oauth_match = re.search(
|
||
r'https://chat\.qwen\.ai/authorize\?user_code=([A-Za-z0-9_-]+)',
|
||
line_str
|
||
)
|
||
if oauth_match:
|
||
oauth_url = oauth_match.group(0)
|
||
logger.info(f"Обнаружена OAuth ссылка: {oauth_url}")
|
||
if session and session.on_oauth_url:
|
||
await session.on_oauth_url(oauth_url)
|
||
|
||
# Парсим JSON событие и извлекаем текст
|
||
await self._process_stream_lines(
|
||
line_str, on_output, on_chunk, on_event, session
|
||
)
|
||
|
||
except asyncio.TimeoutError:
|
||
if process.returncode is not None:
|
||
break
|
||
continue
|
||
|
||
await asyncio.sleep(0.01)
|
||
|
||
session.state = QwenSessionState.READY
|
||
session.last_activity = datetime.now()
|
||
|
||
return output.strip()
|
||
|
||
except Exception as e:
|
||
session.state = QwenSessionState.ERROR
|
||
logger.error(f"Ошибка выполнения задачи: {e}")
|
||
return f"❌ Ошибка: {str(e)}"
|
||
|
||
async def _process_stream_lines(self,
|
||
text: str,
|
||
on_output: Callable[[str], Any],
|
||
on_chunk: Callable[[str], Any] = None,
|
||
on_event: Callable[[QwenStreamEvent], Any] = None,
|
||
session: QwenSession = None) -> str:
|
||
"""
|
||
Распарсить stream-json строки и извлечь текстовый контент.
|
||
|
||
Формат JSON событий:
|
||
- {"type":"system","subtype":"session_start","session_id":"..."}
|
||
- {"type":"assistant","message":{"content":[{"type":"text","text":"..."}]}}
|
||
- {"type":"result","subtype":"success","result":"...","duration_ms":1234}
|
||
|
||
Возвращает только текстовый контент для отображения пользователю.
|
||
"""
|
||
extracted_text = ""
|
||
|
||
for line in text.split('\n'):
|
||
line = line.strip()
|
||
if not line:
|
||
continue
|
||
|
||
# Проверяем это JSON или обычный текст
|
||
if line.startswith('{'):
|
||
try:
|
||
event_data = json.loads(line)
|
||
event_type = event_data.get('type', 'unknown')
|
||
|
||
# Создаём объект события
|
||
stream_event = QwenStreamEvent(
|
||
event_type=QwenEventType(event_type) if event_type in ['system', 'assistant', 'user', 'result', 'tool_use'] else None,
|
||
subtype=event_data.get('subtype'),
|
||
uuid=event_data.get('uuid'),
|
||
session_id=event_data.get('session_id'),
|
||
message=event_data.get('message'),
|
||
is_error=event_data.get('is_error', False),
|
||
data=event_data
|
||
)
|
||
|
||
# Обновляем session_id из события
|
||
if stream_event.session_id and session:
|
||
session.session_id = stream_event.session_id
|
||
|
||
# Извлекаем текст из разных типов событий
|
||
if event_type == 'assistant':
|
||
message = event_data.get('message', {})
|
||
content_list = message.get('content', [])
|
||
|
||
# Логируем для отладки
|
||
logger.debug(f"Assistant event: content_type={type(content_list)}, content={content_list[:1] if isinstance(content_list, list) else content_list}")
|
||
|
||
# Обрабатываем только если content - это список (не thinking)
|
||
if isinstance(content_list, list):
|
||
for content_item in content_list:
|
||
if isinstance(content_item, dict):
|
||
if content_item.get('type') == 'text':
|
||
text_content = content_item.get('text', '')
|
||
logger.debug(f"Text chunk: {text_content[:50]}...")
|
||
extracted_text += text_content
|
||
# Отправляем ТОЛЬКО в on_chunk для streaming
|
||
if on_chunk:
|
||
await on_chunk(text_content)
|
||
elif content_item.get('type') == 'tool_use':
|
||
# Инструмент используется - можно показать статус
|
||
tool_name = content_item.get('name', 'unknown')
|
||
# Добавляем переносы строк для разделения блоков
|
||
status_text = f"\n🔧 Использую инструмент: {tool_name}...\n"
|
||
extracted_text += status_text
|
||
if on_chunk:
|
||
await on_chunk(status_text)
|
||
# Если content.type == 'thinking' - не отправляем пользователю
|
||
|
||
elif event_type == 'result':
|
||
result_text = event_data.get('result', '')
|
||
if result_text:
|
||
extracted_text += result_text
|
||
# НЕ отправляем result через on_chunk — он уже был отправлен через assistant chunks
|
||
logger.debug(f"Result event: {result_text[:50]}...")
|
||
|
||
# Проверяем на ошибку
|
||
if event_data.get('is_error'):
|
||
error_obj = event_data.get('error', {})
|
||
error_message = error_obj.get('message', 'Неизвестная ошибка') if isinstance(error_obj, dict) else str(error_obj)
|
||
logger.error(f"Ошибка Qwen: {error_message}")
|
||
|
||
# Проверяем ошибку авторизации — получаем OAuth URL
|
||
if 'No auth type' in error_message or 'auth type is not selected' in error_message:
|
||
# Получаем OAuth URL через API
|
||
oauth_url = await self.get_oauth_url()
|
||
if not oauth_url:
|
||
oauth_url = f"{QWEN_OAUTH_BASE_URL}/"
|
||
|
||
logger.info(f"Требуется OAuth: {oauth_url}")
|
||
|
||
# Вызываем on_oauth_url если есть в session
|
||
if session and hasattr(session, 'on_oauth_url') and session.on_oauth_url:
|
||
await session.on_oauth_url(oauth_url)
|
||
|
||
elif event_type == 'system':
|
||
subtype = event_data.get('subtype', '')
|
||
if subtype == 'session_start':
|
||
logger.info(f"Сессия Qwen запущена: {stream_event.session_id}")
|
||
elif subtype == 'init':
|
||
# Игнорируем init событие
|
||
pass
|
||
|
||
# Вызываем callback события если есть
|
||
if on_event:
|
||
on_event(stream_event)
|
||
|
||
except json.JSONDecodeError as e:
|
||
# Не JSON строка - возвращаем как текст
|
||
logger.debug(f"Не JSON строка: {line[:100]}...")
|
||
extracted_text += line + "\n"
|
||
if on_chunk:
|
||
await on_chunk(line + "\n")
|
||
else:
|
||
# Обычный текст (не JSON) - например, приветственное сообщение
|
||
extracted_text += line + "\n"
|
||
if on_chunk:
|
||
await on_chunk(line + "\n")
|
||
|
||
return extracted_text
|
||
|
||
def _parse_output(self, output: str) -> str:
|
||
"""
|
||
Распарсить JSON вывод qwen-code.
|
||
Если вывод не JSON — вернуть как есть.
|
||
"""
|
||
# Пока просто возвращаем очищенный вывод
|
||
# В будущем можно парсить JSON stream-format
|
||
lines = output.split('\n')
|
||
cleaned = []
|
||
|
||
for line in lines:
|
||
# Убираем служебные сообщения
|
||
if line.strip() and not line.startswith('{'):
|
||
cleaned.append(line)
|
||
|
||
return '\n'.join(cleaned) if cleaned else output
|
||
|
||
|
||
class GigaChatProvider:
|
||
"""
|
||
AI-провайдер для работы с GigaChat API.
|
||
|
||
Альтернатива Qwen Code для генерации ответов.
|
||
Использует GigaChatTool для взаимодействия с API Сбера.
|
||
"""
|
||
|
||
def __init__(self):
|
||
self._tool = None
|
||
self._initialized = False
|
||
self._config_error: Optional[str] = None
|
||
|
||
def _ensure_initialized(self):
|
||
"""Ленивая инициализация инструмента"""
|
||
if self._initialized:
|
||
return
|
||
|
||
try:
|
||
from bot.tools.gigachat_tool import create_gigachat_tool
|
||
self._tool = create_gigachat_tool()
|
||
|
||
if not self._tool:
|
||
self._config_error = "GigaChat не настроен. Проверьте GIGACHAT_CLIENT_ID и GIGACHAT_CLIENT_SECRET в .env"
|
||
logger.warning(self._config_error)
|
||
else:
|
||
logger.info("GigaChatProvider инициализирован")
|
||
except ImportError as e:
|
||
self._config_error = f"Ошибка импорта GigaChat: {e}"
|
||
logger.error(self._config_error)
|
||
except Exception as e:
|
||
self._config_error = f"Ошибка инициализации GigaChat: {e}"
|
||
logger.error(self._config_error)
|
||
|
||
self._initialized = True
|
||
|
||
async def chat(
|
||
self,
|
||
prompt: str,
|
||
system_prompt: Optional[str] = None,
|
||
temperature: float = 0.7,
|
||
max_tokens: int = 2000,
|
||
on_chunk: Optional[Callable[[str], Any]] = None,
|
||
) -> Dict[str, Any]:
|
||
"""
|
||
Отправка запроса к GigaChat API
|
||
|
||
Args:
|
||
prompt: Запрос пользователя
|
||
system_prompt: Системный промпт (роль ассистента)
|
||
temperature: Температура генерации
|
||
max_tokens: Максимум токенов в ответе
|
||
on_chunk: Callback для потоковой отправки (не используется, GigaChat отдаёт целиком)
|
||
|
||
Returns:
|
||
Dict с полями:
|
||
- success: bool
|
||
- content: str - текст ответа
|
||
- error: str - ошибка если есть
|
||
- model: str - использованная модель
|
||
- usage: dict - статистика токенов
|
||
"""
|
||
self._ensure_initialized()
|
||
|
||
if not self._tool:
|
||
return {
|
||
"success": False,
|
||
"error": self._config_error or "GigaChat не инициализирован",
|
||
"content": "",
|
||
}
|
||
|
||
try:
|
||
from bot.tools.gigachat_tool import GigaChatMessage
|
||
|
||
# Формируем сообщения
|
||
# ВАЖНО: prompt уже содержит весь контекст (system_prompt + summary + memory + history + запрос)
|
||
# Поэтому system_prompt отдельно НЕ добавляем
|
||
messages = [
|
||
GigaChatMessage(role="user", content=prompt),
|
||
]
|
||
|
||
# Вызываем GigaChat API
|
||
response = await self._tool.chat(
|
||
messages=messages,
|
||
temperature=temperature,
|
||
max_tokens=max_tokens,
|
||
use_history=False, # Не используем встроенную историю — у нас своя
|
||
user_id="telegram-bot",
|
||
)
|
||
|
||
# Проверяем наличие ошибки в ответе
|
||
if response.get("error"):
|
||
logger.error(f"GigaChat API error: {response['error']}")
|
||
return {
|
||
"success": False,
|
||
"error": response["error"],
|
||
"content": "",
|
||
}
|
||
|
||
# Потоковая отправка если есть callback
|
||
if on_chunk and response.get("content"):
|
||
await on_chunk(response["content"])
|
||
|
||
return {
|
||
"success": True,
|
||
"content": response.get("content", ""),
|
||
"model": response.get("model", "GigaChat-Pro"),
|
||
"usage": response.get("usage", {}),
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"Ошибка GigaChat API: {e}")
|
||
return {
|
||
"success": False,
|
||
"error": str(e),
|
||
"content": "",
|
||
}
|
||
|
||
def clear_session(self):
|
||
"""Очистка сессии (истории чата)"""
|
||
if self._tool:
|
||
self._tool.clear_history()
|
||
|
||
def is_available(self) -> bool:
|
||
"""Проверка доступности провайдера"""
|
||
self._ensure_initialized()
|
||
return self._tool is not None
|
||
|
||
def get_error(self) -> Optional[str]:
|
||
"""Получение ошибки инициализации"""
|
||
self._ensure_initialized()
|
||
return self._config_error
|
||
|
||
|
||
# Глобальный менеджер
|
||
qwen_manager = QwenCodeManager()
|
||
|
||
# Глобальный GigaChat провайдер
|
||
gigachat_provider = GigaChatProvider()
|