From 1c66bc4c01986291e92ade2ed9ae94498cc4aa80 Mon Sep 17 00:00:00 2001 From: mirivlad Date: Thu, 26 Feb 2026 23:25:47 +0800 Subject: [PATCH] =?UTF-8?q?v0.7.1:=20=D0=A3=D0=BD=D0=B8=D0=B2=D0=B5=D1=80?= =?UTF-8?q?=D1=81=D0=B0=D0=BB=D1=8C=D0=BD=D1=8B=D0=B9=20=D0=B8=D0=BD=D1=82?= =?UTF-8?q?=D0=B5=D1=80=D1=84=D0=B5=D0=B9=D1=81=20AI-=D0=BF=D1=80=D0=BE?= =?UTF-8?q?=D0=B2=D0=B0=D0=B9=D0=B4=D0=B5=D1=80=D0=BE=D0=B2=20=D1=81=20?= =?UTF-8?q?=D0=BF=D0=BE=D0=B4=D0=B4=D0=B5=D1=80=D0=B6=D0=BA=D0=BE=D0=B9=20?= =?UTF-8?q?=D0=B8=D0=BD=D1=81=D1=82=D1=80=D1=83=D0=BC=D0=B5=D0=BD=D1=82?= =?UTF-8?q?=D0=BE=D0=B2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Добавлен базовый класс BaseAIProvider с единым интерфейсом - Реализованы QwenCodeProvider и GigaChatProvider - AIProviderManager использует единый процесс с инструментами - Любой AI-провайдер теперь может работать с инструментами - Поддержка process_with_tools для всех провайдеров Co-authored-by: Qwen-Coder --- bot/ai_provider_manager.py | 130 ++++++------- bot/base_ai_provider.py | 300 +++++++++++++++++++++++++++++ bot/providers/__init__.py | 14 ++ bot/providers/gigachat_provider.py | 292 ++++++++++++++++++++++++++++ bot/providers/qwen_provider.py | 230 ++++++++++++++++++++++ 5 files changed, 901 insertions(+), 65 deletions(-) create mode 100644 bot/base_ai_provider.py create mode 100644 bot/providers/__init__.py create mode 100644 bot/providers/gigachat_provider.py create mode 100644 bot/providers/qwen_provider.py diff --git a/bot/ai_provider_manager.py b/bot/ai_provider_manager.py index 720dadb..2031737 100644 --- a/bot/ai_provider_manager.py +++ b/bot/ai_provider_manager.py @@ -5,6 +5,8 @@ AI Provider Manager - управление переключением между Поддерживаемые провайдеры: - qwen: Qwen Code CLI (основной) - gigachat: GigaChat API (Сбер) + +Использует единый интерфейс BaseAIProvider для всех провайдеров. """ import logging @@ -12,6 +14,8 @@ from typing import Optional, Dict, Any, Callable, List from dataclasses import dataclass from enum import Enum +from bot.base_ai_provider import BaseAIProvider, ProviderResponse + logger = logging.getLogger(__name__) @@ -36,17 +40,44 @@ class AIProviderManager: Менеджер управления AI-провайдерами. Позволяет переключаться между провайдерами и выполнять запросы - через активного провайдера. + через активного провайдера с поддержкой инструментов. """ def __init__(self, qwen_manager=None, gigachat_provider=None): self._qwen_manager = qwen_manager self._gigachat_provider = gigachat_provider self._provider_status: Dict[str, bool] = {} + self._providers: Dict[str, BaseAIProvider] = {} + self._tools_registry: Dict[str, Any] = {} + + # Инициализируем провайдеров + self._init_providers() # Проверяем доступность провайдеров при инициализации self._check_provider_status() + def _init_providers(self): + """Инициализировать AI-провайдеров.""" + # Qwen Code Provider + if self._qwen_manager: + from bot.providers.qwen_provider import QwenCodeProvider + self._providers[AIProvider.QWEN.value] = QwenCodeProvider(self._qwen_manager) + logger.info("Qwen Code Provider инициализирован") + + # GigaChat Provider + if self._gigachat_provider: + from bot.providers.gigachat_provider import GigaChatProvider + self._providers[AIProvider.GIGACHAT.value] = GigaChatProvider(self._gigachat_provider) + logger.info("GigaChat Provider инициализирован") + + def set_tools_registry(self, tools_registry: Dict[str, Any]): + """Установить реестр инструментов для всех провайдеров.""" + self._tools_registry = tools_registry + + def get_provider(self, provider_id: str) -> Optional[BaseAIProvider]: + """Получить экземпляр провайдера.""" + return self._providers.get(provider_id) + def _check_provider_status(self): """Проверка доступности провайдеров.""" # Проверяем Qwen @@ -134,10 +165,11 @@ class AIProviderManager: on_output: Optional[Callable[[str], Any]] = None, on_chunk: Optional[Callable[[str], Any]] = None, on_event: Optional[Callable[[Any], Any]] = None, - context: Optional[Dict] = None + context: Optional[List[Dict[str, str]]] = None, + use_tools: bool = True ) -> Dict[str, Any]: """ - Выполнить запрос через указанного провайдера. + Выполнить запрос через указанного провайдера с поддержкой инструментов. Args: provider_id: ID провайдера @@ -147,7 +179,8 @@ class AIProviderManager: on_output: Callback для вывода on_chunk: Callback для потокового вывода on_event: Callback для событий - context: Дополнительный контекст + context: История диалога + use_tools: Использовать ли инструменты Returns: Dict с результатом: @@ -157,77 +190,44 @@ class AIProviderManager: - provider: str - metadata: dict """ + provider = self._providers.get(provider_id) + + if not provider: + return { + "success": False, + "error": f"Провайдер {provider_id} не найден", + "provider": provider_id + } + try: - if provider_id == AIProvider.QWEN.value: - if not self._qwen_manager: - return { - "success": False, - "error": "Qwen менеджер не инициализирован", - "provider": provider_id - } - - # Выполняем через Qwen - result = await self._qwen_manager.run_task( - user_id=user_id, - task=prompt, - on_output=on_output or (lambda x: None), - on_oauth_url=lambda x: None, - use_system_prompt=False, - on_chunk=on_chunk, - on_event=on_event - ) - - # Извлекаем текст из результата - import re - text_matches = re.findall(r'"text":"([^"]+)"', result) - content = " ".join(text_matches).replace("\\n", "\n") if text_matches else result - + # Используем универсальный метод process_with_tools + response = await provider.process_with_tools( + prompt=prompt, + system_prompt=system_prompt, + context=context, + tools_registry=self._tools_registry if use_tools else None, + on_chunk=on_chunk, + user_id=user_id + ) + + if response.success: return { "success": True, - "content": content, + "content": response.message.content if response.message else "", "provider": provider_id, - "metadata": {"raw_result": result} + "metadata": { + "provider_name": response.provider_name, + "usage": response.usage, + "tool_calls": len(response.message.tool_calls) if response.message and response.message.tool_calls else 0 + } } - - elif provider_id == AIProvider.GIGACHAT.value: - if not self._gigachat_provider: - return { - "success": False, - "error": "GigaChat провайдер не инициализирован", - "provider": provider_id - } - - # Выполняем через GigaChat - result = await self._gigachat_provider.chat( - prompt=prompt, - system_prompt=system_prompt, - on_chunk=on_chunk - ) - - if result.get("success"): - return { - "success": True, - "content": result.get("content", ""), - "provider": provider_id, - "metadata": { - "model": result.get("model", "GigaChat-Pro"), - "usage": result.get("usage", {}) - } - } - else: - return { - "success": False, - "error": result.get("error", "Неизвестная ошибка GigaChat"), - "provider": provider_id - } - else: return { "success": False, - "error": f"Неизвестный провайдер: {provider_id}", + "error": response.error, "provider": provider_id } - + except Exception as e: logger.error(f"Ошибка выполнения запроса через {provider_id}: {e}") return { diff --git a/bot/base_ai_provider.py b/bot/base_ai_provider.py new file mode 100644 index 0000000..ea29902 --- /dev/null +++ b/bot/base_ai_provider.py @@ -0,0 +1,300 @@ +#!/usr/bin/env python3 +""" +Base AI Provider Protocol - универсальный интерфейс для всех AI-провайдеров. + +Определяет общий протокол который должен реализовать каждый AI-провайдер +для работы с инструментами (tools). +""" + +from abc import ABC, abstractmethod +from typing import Optional, Dict, Any, Callable, List, AsyncGenerator +from dataclasses import dataclass, field +from enum import Enum + + +class ToolCallStatus(Enum): + """Статус выполнения инструмента.""" + SUCCESS = "success" + ERROR = "error" + PENDING = "pending" + + +@dataclass +class ToolCall: + """Вызов инструмента.""" + tool_name: str + tool_args: Dict[str, Any] + tool_call_id: Optional[str] = None + status: ToolCallStatus = ToolCallStatus.PENDING + result: Optional[Any] = None + error: Optional[str] = None + + +@dataclass +class AIMessage: + """Сообщение от AI-провайдера.""" + content: str + tool_calls: List[ToolCall] = field(default_factory=list) + metadata: Dict[str, Any] = field(default_factory=dict) + is_streaming: bool = False + + +@dataclass +class ProviderResponse: + """Ответ от AI-провайдера.""" + success: bool + message: Optional[AIMessage] = None + error: Optional[str] = None + provider_name: str = "" + usage: Optional[Dict[str, Any]] = None + raw_response: Optional[Any] = None + + +class BaseAIProvider(ABC): + """ + Базовый класс для всех AI-провайдеров. + + Каждый провайдер (Qwen, GigaChat, OpenAI, etc.) должен реализовать + этот интерфейс для поддержки инструментов и единого формата ответов. + """ + + @property + @abstractmethod + def provider_name(self) -> str: + """Название провайдера (например, 'Qwen Code', 'GigaChat').""" + pass + + @property + @abstractmethod + def supports_tools(self) -> bool: + """Поддерживает ли провайдер инструменты нативно.""" + pass + + @property + @abstractmethod + def supports_streaming(self) -> bool: + """Поддерживает ли провайдер потоковый вывод.""" + pass + + @abstractmethod + async def chat( + self, + prompt: str, + system_prompt: Optional[str] = None, + context: Optional[List[Dict[str, str]]] = None, + tools: Optional[List[Dict[str, Any]]] = None, + on_chunk: Optional[Callable[[str], Any]] = None, + **kwargs + ) -> ProviderResponse: + """ + Отправить запрос AI-провайдеру. + + Args: + prompt: Запрос пользователя + system_prompt: Системный промпт + context: История диалога + tools: Доступные инструменты (схема) + on_chunk: Callback для потокового вывода + **kwargs: Дополнительные параметры + + Returns: + ProviderResponse с ответом и возможными вызовами инструментов + """ + pass + + @abstractmethod + async def execute_tool( + self, + tool_name: str, + tool_args: Dict[str, Any], + tool_call_id: Optional[str] = None, + **kwargs + ) -> ToolCall: + """ + Выполнить инструмент (если провайдер поддерживает нативно). + + Для провайдеров без нативной поддержки инструментов, + этот метод может быть заглушкой. + + Args: + tool_name: Имя инструмента + tool_args: Аргументы инструмента + tool_call_id: ID вызова + + Returns: + ToolCall с результатом выполнения + """ + pass + + def is_available(self) -> bool: + """Проверить доступность провайдера.""" + return True + + def get_tools_schema(self, tools_registry: Dict[str, Any]) -> List[Dict[str, Any]]: + """ + Получить схему инструментов для промпта. + + По умолчанию возвращает описание всех доступных инструментов. + Провайдеры могут переопределить для кастомизации. + + Args: + tools_registry: Словарь инструментов {name: tool_instance} + + Returns: + Список схем инструментов + """ + schema = [] + for name, tool in tools_registry.items(): + if hasattr(tool, 'get_schema'): + schema.append(tool.get_schema()) + elif hasattr(tool, 'description'): + schema.append({ + "name": name, + "description": tool.description, + "parameters": getattr(tool, 'parameters', {}) + }) + return schema + + async def process_with_tools( + self, + prompt: str, + system_prompt: Optional[str] = None, + context: Optional[List[Dict[str, str]]] = None, + tools_registry: Optional[Dict[str, Any]] = None, + on_chunk: Optional[Callable[[str], Any]] = None, + max_iterations: int = 5, + **kwargs + ) -> ProviderResponse: + """ + Универсальный метод для обработки запросов с инструментами. + + Реализует цикл: + 1. Отправить запрос провайдеру + 2. Если есть вызовы инструментов - выполнить их + 3. Отправить результаты обратно провайдеру + 4. Повторить пока не будет финального ответа + + Args: + prompt: Запрос пользователя + system_prompt: Системный промпт + context: История диалога + tools_registry: Словарь инструментов + on_chunk: Callback для потокового вывода + max_iterations: Максимум итераций цикла + + Returns: + ProviderResponse с финальным ответом + """ + if not tools_registry: + # Без инструментов - простой запрос + return await self.chat( + prompt=prompt, + system_prompt=system_prompt, + context=context, + on_chunk=on_chunk, + **kwargs + ) + + messages = [] + if context: + messages.extend(context) + + messages.append({"role": "user", "content": prompt}) + + tools_schema = self.get_tools_schema(tools_registry) if self.supports_tools else None + + for iteration in range(max_iterations): + # Отправляем запрос провайдеру + response = await self.chat( + prompt=None, # Уже в messages + system_prompt=system_prompt, + context=messages if iteration == 0 else None, + tools=tools_schema, + on_chunk=on_chunk, + **kwargs + ) + + if not response.success: + return response + + message = response.message + if not message: + return ProviderResponse( + success=False, + error="Пустой ответ от провайдера", + provider_name=self.provider_name + ) + + # Если нет вызовов инструментов - возвращаем ответ + if not message.tool_calls: + return response + + # Выполняем инструменты + tool_results = [] + for tool_call in message.tool_calls: + if tool_call.tool_name in tools_registry: + tool = tools_registry[tool_call.tool_name] + try: + if hasattr(tool, 'execute'): + result = await tool.execute( + **tool_call.tool_args, + user_id=kwargs.get('user_id') + ) + elif hasattr(tool, '__call__'): + result = await tool(**tool_call.tool_args) + else: + result = f"Инструмент {tool_call.tool_name} не имеет метода execute" + + tool_call.result = result + tool_call.status = ToolCallStatus.SUCCESS + except Exception as e: + tool_call.error = str(e) + tool_call.status = ToolCallStatus.ERROR + result = f"Ошибка: {e}" + + tool_results.append({ + "tool": tool_call.tool_name, + "args": tool_call.tool_args, + "result": result, + "status": tool_call.status.value + }) + else: + tool_call.error = f"Инструмент {tool_call.tool_name} не найден" + tool_call.status = ToolCallStatus.ERROR + tool_results.append({ + "tool": tool_call.tool_name, + "error": tool_call.error + }) + + # Добавляем результаты в контекст для следующей итерации + messages.append({ + "role": "assistant", + "content": message.content, + "tool_calls": [ + { + "id": tc.tool_call_id, + "name": tc.tool_name, + "arguments": tc.tool_args + } + for tc in message.tool_calls + ] + }) + + messages.append({ + "role": "tool", + "content": str(tool_results) + }) + + # Обновляем системный промпт для следующей итерации + system_prompt = system_prompt or "" + + # Достигли максимума итераций + return ProviderResponse( + success=True, + message=AIMessage( + content=message.content + "\n\n[Достигнут максимум итераций выполнения инструментов]", + metadata={"iterations": max_iterations} + ), + provider_name=self.provider_name, + usage=response.usage + ) diff --git a/bot/providers/__init__.py b/bot/providers/__init__.py new file mode 100644 index 0000000..212fc79 --- /dev/null +++ b/bot/providers/__init__.py @@ -0,0 +1,14 @@ +""" +AI Providers - адаптеры для различных AI-провайдеров. + +Каждый провайдер реализует интерфейс BaseAIProvider для единой работы +с инструментами и контекстом. +""" + +from bot.providers.qwen_provider import QwenCodeProvider +from bot.providers.gigachat_provider import GigaChatProvider + +__all__ = [ + "QwenCodeProvider", + "GigaChatProvider", +] diff --git a/bot/providers/gigachat_provider.py b/bot/providers/gigachat_provider.py new file mode 100644 index 0000000..2a772bc --- /dev/null +++ b/bot/providers/gigachat_provider.py @@ -0,0 +1,292 @@ +#!/usr/bin/env python3 +""" +GigaChat AI Provider - адаптер GigaChat для работы с инструментами. + +Реализует интерфейс BaseAIProvider для единой работы с инструментами +независимо от AI-провайдера. +""" + +import logging +from typing import Optional, Dict, Any, Callable, List +import json +import re + +from bot.base_ai_provider import ( + BaseAIProvider, + ProviderResponse, + AIMessage, + ToolCall, + ToolCallStatus, +) +from bot.tools.gigachat_tool import GigaChatTool, GigaChatMessage, GigaChatConfig + +logger = logging.getLogger(__name__) + + +class GigaChatProvider(BaseAIProvider): + """ + GigaChat AI Provider с поддержкой инструментов. + + Использует эвристики для извлечения вызовов инструментов из текста, + так как GigaChat не поддерживает нативные tool calls. + """ + + def __init__(self, config: Optional[GigaChatConfig] = None): + self._tool = GigaChatTool(config) + self._available: Optional[bool] = None + + @property + def provider_name(self) -> str: + return "GigaChat" + + @property + def supports_tools(self) -> bool: + # GigaChat не поддерживает нативные tool calls + # Но мы эмулируем через парсинг текста + return True + + @property + def supports_streaming(self) -> bool: + return False + + def is_available(self) -> bool: + """Проверить доступность GigaChat.""" + if self._available is not None: + return self._available + + # Проверяем наличие токенов + try: + import os + client_id = os.getenv("GIGACHAT_CLIENT_ID") + client_secret = os.getenv("GIGACHAT_CLIENT_SECRET") + + self._available = bool(client_id and client_secret) + + if not self._available: + logger.warning("GigaChat недоступен: не настроены GIGACHAT_CLIENT_ID или GIGACHAT_CLIENT_SECRET") + else: + logger.info("GigaChat доступен") + except Exception as e: + self._available = False + logger.error(f"Ошибка проверки доступности GigaChat: {e}") + + return self._available + + def get_tools_schema(self, tools_registry: Dict[str, Any]) -> List[Dict[str, Any]]: + """ + Получить схему инструментов для промпта. + + Формирует описание инструментов в формате понятном GigaChat. + """ + schema = [] + for name, tool in tools_registry.items(): + if hasattr(tool, 'get_schema'): + tool_schema = tool.get_schema() + schema.append({ + "name": name, + "description": tool_schema.get("description", ""), + "parameters": tool_schema.get("parameters", {}) + }) + elif hasattr(tool, 'description'): + schema.append({ + "name": name, + "description": tool.description, + "parameters": getattr(tool, 'parameters', {}) + }) + + return schema + + def _build_tools_prompt(self, tools_schema: List[Dict[str, Any]]) -> str: + """ + Построить текстовое описание инструментов для промпта. + + GigaChat не поддерживает нативные tool calls, поэтому описываем + инструменты в тексте и просим модель использовать специальный формат. + """ + if not tools_schema: + return "" + + prompt_parts = [ + "\n\n🛠️ ДОСТУПНЫЕ ИНСТРУМЕНТЫ:", + "Ты можешь использовать следующие инструменты. Для вызова инструмента используй формат:", + "```tool", + '{"name": "имя_инструмента", "arguments": {аргументы}}', + '```', + "", + "Список инструментов:" + ] + + for tool in tools_schema: + name = tool.get("name", "unknown") + desc = tool.get("description", "Нет описания") + params = tool.get("parameters", {}) + + prompt_parts.append(f"\n**{name}**") + prompt_parts.append(f"Описание: {desc}") + if params: + prompt_parts.append(f"Параметры: {json.dumps(params, ensure_ascii=False)}") + + prompt_parts.extend([ + "", + "После вызова инструмента ты получишь результат и сможешь продолжить ответ." + ]) + + return "\n".join(prompt_parts) + + def _parse_tool_calls(self, content: str) -> List[ToolCall]: + """ + Извлечь вызовы инструментов из текста ответа. + + Ищет блоки вида: + ```tool + {"name": "ssh_tool", "arguments": {"command": "df -h"}} + ``` + """ + tool_calls = [] + + # Ищем блоки ```tool {...}``` + pattern = r'```tool\s*\n({.*?})\s*\n```' + matches = re.findall(pattern, content, re.DOTALL) + + for match in matches: + try: + tool_data = json.loads(match) + tool_name = tool_data.get("name") + tool_args = tool_data.get("arguments", {}) + + if tool_name: + tool_calls.append(ToolCall( + tool_name=tool_name, + tool_args=tool_args, + tool_call_id=f"gc_{len(tool_calls)}" + )) + except json.JSONDecodeError as e: + logger.warning(f"Ошибка парсинга tool call: {e}") + + return tool_calls + + def _remove_tool_blocks(self, content: str) -> str: + """Удалить блоки вызовов инструментов из текста.""" + pattern = r'```tool\s*\n\{.*?\}\s*\n```' + return re.sub(pattern, '', content, flags=re.DOTALL).strip() + + async def chat( + self, + prompt: str, + system_prompt: Optional[str] = None, + context: Optional[List[Dict[str, str]]] = None, + tools: Optional[List[Dict[str, Any]]] = None, + on_chunk: Optional[Callable[[str], Any]] = None, + user_id: Optional[int] = None, + **kwargs + ) -> ProviderResponse: + """ + Отправить запрос GigaChat. + + Args: + prompt: Запрос пользователя + system_prompt: Системный промпт + context: История диалога + tools: Доступные инструменты (схема) + on_chunk: Callback для потокового вывода (не используется) + user_id: ID пользователя + **kwargs: Дополнительные параметры + + Returns: + ProviderResponse с ответом и возможными вызовами инструментов + """ + try: + # Формируем системный промпт с инструментами + full_system_prompt = system_prompt or "" + + if tools: + tools_prompt = self._build_tools_prompt(tools) + full_system_prompt += tools_prompt + + # Формируем сообщения + messages = [] + + if full_system_prompt: + messages.append(GigaChatMessage(role="system", content=full_system_prompt)) + + if context: + for msg in context: + role = msg.get("role", "user") + content = msg.get("content", "") + if role in ("user", "assistant", "system"): + messages.append(GigaChatMessage(role=role, content=content)) + + if prompt: + messages.append(GigaChatMessage(role="user", content=prompt)) + + # Выполняем запрос + result = await self._tool.chat( + messages=messages, + user_id=str(user_id) if user_id else None, + temperature=kwargs.get("temperature", 0.7), + max_tokens=kwargs.get("max_tokens", 2000), + ) + + if not result.get("content"): + if result.get("error"): + return ProviderResponse( + success=False, + error=result["error"], + provider_name=self.provider_name + ) + else: + return ProviderResponse( + success=False, + error="Пустой ответ от GigaChat", + provider_name=self.provider_name + ) + + content = result["content"] + + # Парсим вызовы инструментов + tool_calls = self._parse_tool_calls(content) + + # Очищаем контент от блоков инструментов + clean_content = self._remove_tool_blocks(content) + + return ProviderResponse( + success=True, + message=AIMessage( + content=clean_content, + tool_calls=tool_calls, + metadata={ + "model": result.get("model", "GigaChat"), + "usage": result.get("usage", {}) + } + ), + provider_name=self.provider_name, + usage=result.get("usage") + ) + + except Exception as e: + logger.error(f"Ошибка GigaChat провайдера: {e}") + return ProviderResponse( + success=False, + error=str(e), + provider_name=self.provider_name + ) + + async def execute_tool( + self, + tool_name: str, + tool_args: Dict[str, Any], + tool_call_id: Optional[str] = None, + **kwargs + ) -> ToolCall: + """ + Выполнить инструмент (заглушка). + + GigaChat не выполняет инструменты напрямую - это делает + AIProviderManager через process_with_tools. + """ + return ToolCall( + tool_name=tool_name, + tool_args=tool_args, + tool_call_id=tool_call_id, + status=ToolCallStatus.PENDING + ) diff --git a/bot/providers/qwen_provider.py b/bot/providers/qwen_provider.py new file mode 100644 index 0000000..9b76d80 --- /dev/null +++ b/bot/providers/qwen_provider.py @@ -0,0 +1,230 @@ +#!/usr/bin/env python3 +""" +Qwen Code AI Provider - адаптер Qwen Code для работы с инструментами. + +Реализует интерфейс BaseAIProvider для единой работы с инструментами +независимо от AI-провайдера. +""" + +import logging +import re +import json +from typing import Optional, Dict, Any, Callable, List + +from bot.base_ai_provider import ( + BaseAIProvider, + ProviderResponse, + AIMessage, + ToolCall, + ToolCallStatus, +) + +logger = logging.getLogger(__name__) + + +class QwenCodeProvider(BaseAIProvider): + """ + Qwen Code AI Provider с нативной поддержкой инструментов. + + Использует Qwen Code CLI с потоковым выводом и парсингом tool calls. + """ + + def __init__(self, qwen_manager): + self._qwen_manager = qwen_manager + + @property + def provider_name(self) -> str: + return "Qwen Code" + + @property + def supports_tools(self) -> bool: + return True + + @property + def supports_streaming(self) -> bool: + return True + + def is_available(self) -> bool: + """Qwen Code всегда доступен (локальный CLI).""" + return True + + def _parse_qwen_result(self, raw_result: str) -> tuple[str, List[ToolCall]]: + """ + Распарсить результат от Qwen Code. + + Извлекает текст и вызовы инструментов из stream-json вывода. + + Returns: + (content, tool_calls) + """ + content_parts = [] + tool_calls = [] + + # Пытаемся распарсить JSON + try: + # Qwen может возвращать как单个 JSON так и несколько JSON lines + lines = raw_result.strip().split('\n') + + for line in lines: + line = line.strip() + if not line: + continue + + # Пробуем распарсить как JSON + try: + data = json.loads(line) + + # Обрабатываем разные типы событий + event_type = data.get('type') or data.get('event_type') + + if event_type == 'assistant' or 'content' in data: + content = data.get('content') or data.get('message', '') + if content: + content_parts.append(content) + + # Ищем tool calls + if 'tool_calls' in data or 'tool_use' in data: + tool_calls_data = data.get('tool_calls') or data.get('tool_use', []) + for tc in tool_calls_data: + tool_calls.append(ToolCall( + tool_name=tc.get('name') or tc.get('tool_name', ''), + tool_args=tc.get('arguments') or tc.get('args', {}), + tool_call_id=tc.get('id') or tc.get('tool_call_id') + )) + + except json.JSONDecodeError: + # Не JSON - считаем текстом + if line.strip(): + content_parts.append(line) + + except Exception as e: + logger.warning(f"Ошибка парсинга Qwen результата: {e}") + + # Фоллбэк: ищем текст в кавычках + text_matches = re.findall(r'"text":"([^"]+)"', raw_result) + if text_matches: + content_parts.extend(text_matches) + + # Собираем контент + content = ' '.join(content_parts).replace('\\n', '\n').strip() + + return content, tool_calls + + async def chat( + self, + prompt: str, + system_prompt: Optional[str] = None, + context: Optional[List[Dict[str, str]]] = None, + tools: Optional[List[Dict[str, Any]]] = None, + on_chunk: Optional[Callable[[str], Any]] = None, + user_id: Optional[int] = None, + **kwargs + ) -> ProviderResponse: + """ + Отправить запрос Qwen Code. + + Args: + prompt: Запрос пользователя + system_prompt: Системный промпт + context: История диалога + tools: Доступные инструменты (схема) - пока не используется + on_chunk: Callback для потокового вывода + user_id: ID пользователя + **kwargs: Дополнительные параметры + + Returns: + ProviderResponse с ответом и возможными вызовами инструментов + """ + if not self._qwen_manager: + return ProviderResponse( + success=False, + error="Qwen менеджер не инициализирован", + provider_name=self.provider_name + ) + + if user_id is None: + return ProviderResponse( + success=False, + error="user_id обязателен для Qwen Code", + provider_name=self.provider_name + ) + + try: + # Формируем полный промпт + full_prompt = prompt or "" + + if system_prompt and kwargs.get('use_system_prompt', True): + full_prompt = f"{system_prompt}\n\n{full_prompt}" + + # Добавляем контекст если есть + if context: + context_text = "\n".join([ + f"{msg.get('role', 'user')}: {msg.get('content', '')}" + for msg in context + ]) + full_prompt = f"{context_text}\n\n{full_prompt}" + + # Выполняем через Qwen Manager + output_buffer = [] + + def on_output(text: str): + output_buffer.append(text) + + def on_chunk_wrapper(text: str): + if on_chunk: + on_chunk(text) + + result = await self._qwen_manager.run_task( + user_id=user_id, + task=full_prompt, + on_output=on_output, + on_oauth_url=lambda x: None, + use_system_prompt=False, # Уже добавили в full_prompt + on_chunk=on_chunk_wrapper, + on_event=None + ) + + # Парсим результат + content, tool_calls = self._parse_qwen_result(result) + + if not content and not tool_calls: + # Если ничего не распарсили, возвращаем сырой результат + content = result + + return ProviderResponse( + success=True, + message=AIMessage( + content=content, + tool_calls=tool_calls, + metadata={"raw_result": result} + ), + provider_name=self.provider_name + ) + + except Exception as e: + logger.error(f"Ошибка Qwen Code провайдера: {e}") + return ProviderResponse( + success=False, + error=str(e), + provider_name=self.provider_name + ) + + async def execute_tool( + self, + tool_name: str, + tool_args: Dict[str, Any], + tool_call_id: Optional[str] = None, + **kwargs + ) -> ToolCall: + """ + Выполнить инструмент (заглушка). + + Qwen Code не выполняет инструменты напрямую - это делает + AIProviderManager через process_with_tools. + """ + return ToolCall( + tool_name=tool_name, + tool_args=tool_args, + tool_call_id=tool_call_id, + status=ToolCallStatus.PENDING + )