347 lines
12 KiB
Python
347 lines
12 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Opencode AI Provider - интеграция с opencode CLI.
|
||
|
||
Использует opencode run для выполнения задач с бесплатными моделями:
|
||
- opencode/minimax-m2.5-free
|
||
- opencode/big-pickle
|
||
- opencode/gpt-5-nano
|
||
|
||
Поддерживает RAG через память бота.
|
||
"""
|
||
|
||
import os
|
||
import re
|
||
import asyncio
|
||
import logging
|
||
import subprocess
|
||
from pathlib import Path
|
||
from typing import Optional, Dict, Any, Callable, List
|
||
from dataclasses import dataclass, field
|
||
|
||
from bot.base_ai_provider import (
|
||
BaseAIProvider,
|
||
ProviderResponse,
|
||
AIMessage,
|
||
ToolCall,
|
||
ToolCallStatus,
|
||
)
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
OPENCODE_BIN = os.environ.get("OPENCODE_BIN", "/home/mirivlad/.opencode/bin/opencode")
|
||
|
||
AVAILABLE_MODELS = {
|
||
"minimax": "opencode/minimax-m2.5-free",
|
||
"big_pickle": "opencode/big-pickle",
|
||
"gpt5": "opencode/gpt-5-nano",
|
||
}
|
||
|
||
DEFAULT_MODEL = "minimax"
|
||
|
||
|
||
@dataclass
|
||
class OpencodeSession:
|
||
"""Сессия пользователя с opencode."""
|
||
user_id: int
|
||
model: str = DEFAULT_MODEL
|
||
history: List[Dict[str, str]] = field(default_factory=list)
|
||
|
||
|
||
class OpencodeProvider(BaseAIProvider):
|
||
"""
|
||
Opencode AI Provider.
|
||
|
||
Использует opencode CLI для генерации ответов.
|
||
Поддерживает несколько бесплатных моделей.
|
||
"""
|
||
|
||
def __init__(self):
|
||
self._sessions: Dict[int, OpencodeSession] = {}
|
||
self._default_model = DEFAULT_MODEL
|
||
self._initialized = False
|
||
|
||
@property
|
||
def provider_name(self) -> str:
|
||
return "Opencode"
|
||
|
||
@property
|
||
def supports_tools(self) -> bool:
|
||
return True
|
||
|
||
@property
|
||
def supports_streaming(self) -> bool:
|
||
return False
|
||
|
||
def is_available(self) -> bool:
|
||
"""Проверка доступности opencode CLI."""
|
||
return Path(OPENCODE_BIN).exists()
|
||
|
||
def get_session(self, user_id: int) -> OpencodeSession:
|
||
"""Получить или создать сессию пользователя."""
|
||
if user_id not in self._sessions:
|
||
self._sessions[user_id] = OpencodeSession(
|
||
user_id=user_id,
|
||
model=self._default_model
|
||
)
|
||
return self._sessions[user_id]
|
||
|
||
def set_model(self, user_id: int, model_key: str):
|
||
"""Установить модель для пользователя."""
|
||
session = self.get_session(user_id)
|
||
if model_key in AVAILABLE_MODELS:
|
||
session.model = AVAILABLE_MODELS[model_key]
|
||
logger.info(f"User {user_id} switched to model: {session.model}")
|
||
|
||
def get_model(self, user_id: int) -> str:
|
||
"""Получить текущую модель пользователя (полное имя)."""
|
||
session = self.get_session(user_id)
|
||
# Возвращаем полное имя модели из AVAILABLE_MODELS
|
||
return AVAILABLE_MODELS.get(session.model, session.model)
|
||
|
||
def get_available_models(self) -> Dict[str, str]:
|
||
"""Получить список доступных моделей."""
|
||
return AVAILABLE_MODELS.copy()
|
||
|
||
def _build_context(
|
||
self,
|
||
system_prompt: Optional[str],
|
||
context: Optional[List[Dict[str, str]]],
|
||
memory_context: str = ""
|
||
) -> str:
|
||
"""Собрать полный контекст для opencode."""
|
||
parts = []
|
||
|
||
if system_prompt:
|
||
parts.append(f"=== SYSTEM PROMPT ===\n{system_prompt}")
|
||
|
||
if memory_context:
|
||
parts.append(f"=== MEMORY CONTEXT ===\n{memory_context}")
|
||
|
||
if context:
|
||
history_text = "\n".join([
|
||
f"{msg.get('role', 'user')}: {msg.get('content', '')}"
|
||
for msg in context
|
||
if msg.get('role') != 'system'
|
||
])
|
||
if history_text:
|
||
parts.append(f"=== CONVERSATION HISTORY ===\n{history_text}")
|
||
|
||
return "\n\n".join(parts)
|
||
|
||
async def _run_opencode(
|
||
self,
|
||
prompt: str,
|
||
model: str,
|
||
on_chunk: Optional[Callable[[str], Any]] = None
|
||
) -> str:
|
||
"""
|
||
Выполнить запрос через opencode CLI.
|
||
|
||
Args:
|
||
prompt: Запрос пользователя
|
||
model: Модель для использования
|
||
on_chunk: Callback для потокового вывода (не используется)
|
||
|
||
Returns:
|
||
Ответ от opencode
|
||
"""
|
||
try:
|
||
logger.info(f"Opencode _run_opencode: model={model}, prompt_len={len(prompt) if prompt else 0}")
|
||
|
||
# Используем stdin для передачи промпта
|
||
cmd = [
|
||
OPENCODE_BIN,
|
||
"run",
|
||
"-m", model
|
||
]
|
||
|
||
logger.info(f"Running opencode cmd: {cmd}")
|
||
|
||
# Кодируем промпт для stdin
|
||
prompt_bytes = prompt.encode('utf-8') if prompt else b''
|
||
|
||
process = await asyncio.create_subprocess_exec(
|
||
*cmd,
|
||
stdin=asyncio.subprocess.PIPE,
|
||
stdout=asyncio.subprocess.PIPE,
|
||
stderr=asyncio.subprocess.STDOUT,
|
||
cwd=str(Path.home()),
|
||
)
|
||
|
||
# Отправляем промпт в stdin
|
||
stdout, _ = await asyncio.wait_for(
|
||
process.communicate(input=prompt_bytes),
|
||
timeout=120.0
|
||
)
|
||
|
||
full_output = stdout.decode('utf-8', errors='replace')
|
||
|
||
# Очищаем от ANSI кодов и служебных символов
|
||
full_output = self._clean_output(full_output)
|
||
|
||
return full_output
|
||
|
||
except asyncio.TimeoutError:
|
||
logger.error("Opencode timeout")
|
||
return "⏱️ Таймаут выполнения (2 минуты)"
|
||
except Exception as e:
|
||
logger.error(f"Opencode error: {e}")
|
||
return f"❌ Ошибка opencode: {str(e)}"
|
||
|
||
def _clean_output(self, output: str) -> str:
|
||
"""Очистить вывод от служебных символов."""
|
||
# Убираем ANSI escape последовательности
|
||
ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
|
||
output = ansi_escape.sub('', output)
|
||
|
||
# Убираем служебные строки
|
||
lines = output.split('\n')
|
||
cleaned_lines = []
|
||
|
||
for line in lines:
|
||
# Пропускаем служебные строки
|
||
if any(x in line.lower() for x in ['build', 'minimax', 'gpt', 'elapsed', 'rss', 'bun v']):
|
||
continue
|
||
if line.startswith('>'):
|
||
continue
|
||
if not line.strip():
|
||
continue
|
||
cleaned_lines.append(line)
|
||
|
||
return "\n".join(cleaned_lines).strip()
|
||
|
||
async def chat(
|
||
self,
|
||
prompt: str,
|
||
system_prompt: Optional[str] = None,
|
||
context: Optional[List[Dict[str, str]]] = None,
|
||
tools: Optional[List[Dict[str, Any]]] = None,
|
||
on_chunk: Optional[Callable[[str], Any]] = None,
|
||
user_id: Optional[int] = None,
|
||
memory_context: Optional[str] = None,
|
||
**kwargs
|
||
) -> ProviderResponse:
|
||
"""
|
||
Отправить запрос к Opencode.
|
||
|
||
Args:
|
||
prompt: Запрос пользователя
|
||
system_prompt: Системный промпт
|
||
context: История диалога
|
||
tools: Доступные инструменты (схема) - пока не используется
|
||
on_chunk: Callback для потокового вывода
|
||
user_id: ID пользователя
|
||
memory_context: Контекст из памяти бота
|
||
|
||
Returns:
|
||
ProviderResponse с ответом
|
||
"""
|
||
if not self.is_available():
|
||
return ProviderResponse(
|
||
success=False,
|
||
error="Opencode CLI не найден",
|
||
provider_name=self.provider_name
|
||
)
|
||
|
||
if user_id is None:
|
||
return ProviderResponse(
|
||
success=False,
|
||
error="user_id обязателен для Opencode",
|
||
provider_name=self.provider_name
|
||
)
|
||
|
||
try:
|
||
# Получаем текущую модель
|
||
model = self.get_model(user_id)
|
||
logger.info(f"Opencode: user_id={user_id}, model={model}, session={self._sessions.get(user_id)}")
|
||
|
||
# Собираем контекст
|
||
full_context = self._build_context(
|
||
system_prompt=system_prompt,
|
||
context=context,
|
||
memory_context=memory_context or ""
|
||
)
|
||
|
||
# Формируем полный промпт
|
||
# Когда prompt=None (из process_with_tools), используем контекст напрямую
|
||
if prompt is None:
|
||
full_prompt = full_context if full_context else ""
|
||
elif full_context:
|
||
full_prompt = f"{full_context}\n\n=== CURRENT REQUEST ===\n{prompt}"
|
||
else:
|
||
full_prompt = prompt
|
||
|
||
# Добавляем информацию об инструментах если есть
|
||
if tools:
|
||
tools_info = self._format_tools_for_prompt(tools)
|
||
full_prompt = f"{full_prompt}\n\n=== AVAILABLE TOOLS ===\n{tools_info}"
|
||
|
||
logger.info(f"Opencode request (model={model}): {str(prompt)[:50] if prompt else 'from context'}...")
|
||
|
||
# Выполняем запрос
|
||
result = await self._run_opencode(
|
||
prompt=full_prompt,
|
||
model=model,
|
||
on_chunk=on_chunk
|
||
)
|
||
|
||
if not result:
|
||
result = "⚠️ Пустой ответ от Opencode"
|
||
|
||
return ProviderResponse(
|
||
success=True,
|
||
message=AIMessage(
|
||
content=result,
|
||
metadata={"model": model}
|
||
),
|
||
provider_name=self.provider_name
|
||
)
|
||
|
||
except Exception as e:
|
||
logger.error(f"Opencode provider error: {e}")
|
||
return ProviderResponse(
|
||
success=False,
|
||
error=str(e),
|
||
provider_name=self.provider_name
|
||
)
|
||
|
||
def _format_tools_for_prompt(self, tools: List[Dict[str, Any]]) -> str:
|
||
"""Форматировать инструменты для промпта."""
|
||
if not tools:
|
||
return ""
|
||
|
||
lines = ["У тебя есть следующие инструменты:\n"]
|
||
|
||
for tool in tools:
|
||
name = tool.get('name', 'unknown')
|
||
desc = tool.get('description', 'Нет описания')
|
||
params = tool.get('parameters', {})
|
||
|
||
lines.append(f"- {name}: {desc}")
|
||
if params:
|
||
props = params.get('properties', {})
|
||
if props:
|
||
lines.append(f" Параметры: {', '.join(props.keys())}")
|
||
|
||
return "\n".join(lines)
|
||
|
||
async def execute_tool(
|
||
self,
|
||
tool_name: str,
|
||
tool_args: Dict[str, Any],
|
||
tool_call_id: Optional[str] = None,
|
||
**kwargs
|
||
) -> ToolCall:
|
||
"""Выполнить инструмент (заглушка)."""
|
||
return ToolCall(
|
||
tool_name=tool_name,
|
||
tool_args=tool_args,
|
||
tool_call_id=tool_call_id,
|
||
status=ToolCallStatus.PENDING
|
||
)
|
||
|
||
|
||
# Глобальный экземпляр
|
||
opencode_provider = OpencodeProvider()
|