telegram-cli-bot/bot/providers/qwen_provider.py

231 lines
8.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Qwen Code AI Provider - адаптер Qwen Code для работы с инструментами.
Реализует интерфейс BaseAIProvider для единой работы с инструментами
независимо от AI-провайдера.
"""
import logging
import re
import json
from typing import Optional, Dict, Any, Callable, List
from bot.base_ai_provider import (
BaseAIProvider,
ProviderResponse,
AIMessage,
ToolCall,
ToolCallStatus,
)
logger = logging.getLogger(__name__)
class QwenCodeProvider(BaseAIProvider):
"""
Qwen Code AI Provider с нативной поддержкой инструментов.
Использует Qwen Code CLI с потоковым выводом и парсингом tool calls.
"""
def __init__(self, qwen_manager):
self._qwen_manager = qwen_manager
@property
def provider_name(self) -> str:
return "Qwen Code"
@property
def supports_tools(self) -> bool:
return True
@property
def supports_streaming(self) -> bool:
return True
def is_available(self) -> bool:
"""Qwen Code всегда доступен (локальный CLI)."""
return True
def _parse_qwen_result(self, raw_result: str) -> tuple[str, List[ToolCall]]:
"""
Распарсить результат от Qwen Code.
Извлекает текст и вызовы инструментов из stream-json вывода.
Returns:
(content, tool_calls)
"""
content_parts = []
tool_calls = []
# Пытаемся распарсить JSON
try:
# Qwen может возвращать как单个 JSON так и несколько JSON lines
lines = raw_result.strip().split('\n')
for line in lines:
line = line.strip()
if not line:
continue
# Пробуем распарсить как JSON
try:
data = json.loads(line)
# Обрабатываем разные типы событий
event_type = data.get('type') or data.get('event_type')
if event_type == 'assistant' or 'content' in data:
content = data.get('content') or data.get('message', '')
if content:
content_parts.append(content)
# Ищем tool calls
if 'tool_calls' in data or 'tool_use' in data:
tool_calls_data = data.get('tool_calls') or data.get('tool_use', [])
for tc in tool_calls_data:
tool_calls.append(ToolCall(
tool_name=tc.get('name') or tc.get('tool_name', ''),
tool_args=tc.get('arguments') or tc.get('args', {}),
tool_call_id=tc.get('id') or tc.get('tool_call_id')
))
except json.JSONDecodeError:
# Не JSON - считаем текстом
if line.strip():
content_parts.append(line)
except Exception as e:
logger.warning(f"Ошибка парсинга Qwen результата: {e}")
# Фоллбэк: ищем текст в кавычках
text_matches = re.findall(r'"text":"([^"]+)"', raw_result)
if text_matches:
content_parts.extend(text_matches)
# Собираем контент
content = ' '.join(content_parts).replace('\\n', '\n').strip()
return content, tool_calls
async def chat(
self,
prompt: str,
system_prompt: Optional[str] = None,
context: Optional[List[Dict[str, str]]] = None,
tools: Optional[List[Dict[str, Any]]] = None,
on_chunk: Optional[Callable[[str], Any]] = None,
user_id: Optional[int] = None,
**kwargs
) -> ProviderResponse:
"""
Отправить запрос Qwen Code.
Args:
prompt: Запрос пользователя
system_prompt: Системный промпт
context: История диалога
tools: Доступные инструменты (схема) - пока не используется
on_chunk: Callback для потокового вывода
user_id: ID пользователя
**kwargs: Дополнительные параметры
Returns:
ProviderResponse с ответом и возможными вызовами инструментов
"""
if not self._qwen_manager:
return ProviderResponse(
success=False,
error="Qwen менеджер не инициализирован",
provider_name=self.provider_name
)
if user_id is None:
return ProviderResponse(
success=False,
error="user_id обязателен для Qwen Code",
provider_name=self.provider_name
)
try:
# Формируем полный промпт
full_prompt = prompt or ""
if system_prompt and kwargs.get('use_system_prompt', True):
full_prompt = f"{system_prompt}\n\n{full_prompt}"
# Добавляем контекст если есть
if context:
context_text = "\n".join([
f"{msg.get('role', 'user')}: {msg.get('content', '')}"
for msg in context
])
full_prompt = f"{context_text}\n\n{full_prompt}"
# Выполняем через Qwen Manager
output_buffer = []
def on_output(text: str):
output_buffer.append(text)
def on_chunk_wrapper(text: str):
if on_chunk:
on_chunk(text)
result = await self._qwen_manager.run_task(
user_id=user_id,
task=full_prompt,
on_output=on_output,
on_oauth_url=lambda x: None,
use_system_prompt=False, # Уже добавили в full_prompt
on_chunk=on_chunk_wrapper,
on_event=None
)
# Парсим результат
content, tool_calls = self._parse_qwen_result(result)
if not content and not tool_calls:
# Если ничего не распарсили, возвращаем сырой результат
content = result
return ProviderResponse(
success=True,
message=AIMessage(
content=content,
tool_calls=tool_calls,
metadata={"raw_result": result}
),
provider_name=self.provider_name
)
except Exception as e:
logger.error(f"Ошибка Qwen Code провайдера: {e}")
return ProviderResponse(
success=False,
error=str(e),
provider_name=self.provider_name
)
async def execute_tool(
self,
tool_name: str,
tool_args: Dict[str, Any],
tool_call_id: Optional[str] = None,
**kwargs
) -> ToolCall:
"""
Выполнить инструмент (заглушка).
Qwen Code не выполняет инструменты напрямую - это делает
AIProviderManager через process_with_tools.
"""
return ToolCall(
tool_name=tool_name,
tool_args=tool_args,
tool_call_id=tool_call_id,
status=ToolCallStatus.PENDING
)