v0.7.2: Улучшения AI-провайдеров, инструменты и обработчики

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
This commit is contained in:
mirivlad 2026-02-27 18:07:57 +08:00
parent 0648bc43a8
commit fbf0edc60a
16 changed files with 2553 additions and 239 deletions

140
FILE_SYSTEM_TOOL.md Normal file
View File

@ -0,0 +1,140 @@
# File System Tool - Документация
## 📋 Описание
Инструмент для работы с файловой системой Linux. Позволяет AI-агенту (Qwen Code или GigaChat) выполнять операции с файлами и директориями.
## 🎯 Доступные операции
| Операция | Описание | Параметры |
|----------|----------|-----------|
| `read` | Чтение файла | `path`, `limit` (макс. строк) |
| `write` | Запись в файл | `path`, `content`, `append` |
| `copy` | Копирование файла/директории | `source`, `destination` |
| `move` | Перемещение/переименование | `source`, `destination` |
| `delete` | Удаление файла/директории | `path`, `recursive` |
| `mkdir` | Создание директории | `path`, `parents` |
| `list` | Список файлов в директории | `path`, `show_hidden` |
| `info` | Информация о файле | `path` |
| `search` | Поиск файлов по паттерну | `path`, `pattern`, `max_results` |
| `shell` | Выполнение shell-команды | `command`, `timeout` |
## 🔒 Безопасность
Инструмент имеет систему проверки путей:
### Разрешённые пути (можно читать/записывать):
- Домашняя директория пользователя (`/home/mirivlad`)
- `/tmp`
- `/var/tmp`
### Запрещённые пути (только чтение с ограничениями):
- `/etc`, `/usr`, `/bin`, `/sbin`
- `/boot`, `/dev`, `/proc`, `/sys`
- Корень `/` (кроме разрешённых поддиректорий)
## 📝 Примеры использования
### Через AI-агента (автоматически)
```
Пользователь: "прочитай файл /home/mirivlad/test.txt"
AI-агент → file_system_tool(operation='read', path='/home/mirivlad/test.txt')
```
### Прямой вызов
```python
from bot.tools.file_system_tool import FileSystemTool
tool = FileSystemTool()
# Чтение файла
result = await tool.execute(operation='read', path='/path/to/file.txt')
# Запись файла
result = await tool.execute(
operation='write',
path='/path/to/file.txt',
content='Содержимое файла'
)
# Копирование
result = await tool.execute(
operation='copy',
source='/source/file.txt',
destination='/dest/file.txt'
)
# Список директории
result = await tool.execute(
operation='list',
path='/home/mirivlad'
)
```
## 🤖 Интеграция с AI-провайдерами
### GigaChat
GigaChat использует текстовый формат для вызова инструментов:
````
```tool
{"name": "file_system_tool", "arguments": {"operation": "read", "path": "/tmp/test.txt"}}
```
````
### Qwen Code
Qwen Code поддерживает нативные tool calls через stream-json.
## 📊 Триггеры для активации
AI-агент автоматически активирует `file_system_tool` при обнаружении триггеров:
### Прямые триггеры:
- "прочитай файл", "покажи файл", "открой файл"
- "создай файл", "запиши в файл", "сохрани"
- "скопируй файл", "перемести файл", "удали файл"
- "создай директорию", "создай папку"
- "список файлов", "что в папке"
- "найди файл", "поиск файла"
### Команды Unix:
- `cat `, `ls `, `mkdir `, `cp `, `mv `, `rm `, `touch `
## ⚠️ Ограничения
1. **Безопасность**: Нельзя записывать/удалять в системных директориях
2. **Размер файлов**: При чтении ограничено 100 строками (настраивается через `limit`)
3. **Shell команды**: Разрешены только безопасные команды (`ls`, `cat`, `cp`, `mv`, `rm`, `mkdir`, `find`, `grep`, etc.)
4. **Таймаут**: Для shell команд таймаут 30 секунд по умолчанию
## 🔄 История операций
Инструмент сохраняет историю последних 100 операций для отладки:
```python
tool._operation_history # Список последних операций
```
## 📁 Расположение
```
bot/tools/file_system_tool.py
```
## 🔧 Добавление в реестр
Инструмент автоматически регистрируется при импорте:
```python
from bot.tools import file_system_tool # Авто-регистрация
```
---
**Версия:** 1.0
**Совместимость:** Telegram CLI Bot 0.7.1+
**AI-провайдеры:** Qwen Code, GigaChat

291
bot.py
View File

@ -137,6 +137,14 @@ async def handle_ai_task(update: Update, text: str):
user_id = update.effective_user.id
state = state_manager.get(user_id)
# === ПРОВЕРКА: AI-пресет ===
ai_preset = state.ai_preset
# Если ИИ отключен — пропускаем обработку
if ai_preset == "off":
logger.info(f"Пользователь {user_id}: ИИ отключен, пропускаем обработку")
return
# === ПРОВЕРКА: Нужна ли компактификация? ===
# Проверяем порог заполненности контекста
if compactor.check_compaction_needed():
@ -174,10 +182,10 @@ async def handle_ai_task(update: Update, text: str):
# === ПРОВЕРКА: Решение AI агента об использовании инструментов ===
agent_decision = await ai_agent.decide(text, context={'user_id': user_id})
if agent_decision.should_use_tool:
logger.info(f"AI агент решил использовать инструмент: {agent_decision.tool_name} (confidence={agent_decision.confidence})")
# Выполняем инструмент
tool_result = await ai_agent.execute_tool(
agent_decision.tool_name,
@ -261,11 +269,12 @@ async def handle_ai_task(update: Update, text: str):
output_buffer.append(chunk_text)
# В result_buffer добавляем ТОЛЬКО если это не статус инструмента
# Статусы инструментов начинаются с "\n🔧"
if not chunk_text.strip().startswith("🔧"):
# Статусы инструментов содержат "🔧 Использую инструмент:"
is_tool_status = "🔧 Использую инструмент:" in chunk_text
if not is_tool_status:
result_buffer.append(chunk_text)
logger.debug(f"output_buffer: {len(output_buffer)}, result_buffer: {len(result_buffer)}")
logger.debug(f"output_buffer: {len(output_buffer)}, result_buffer: {len(result_buffer)}, is_tool_status: {is_tool_status}")
# Если сообщение ещё не создано - создаём
if stream_message is None:
@ -284,12 +293,22 @@ async def handle_ai_task(update: Update, text: str):
# Обновляем сообщение
try:
# Экранируем специальные символы Markdown для безопасной отправки
from bot.utils.formatters import escape_markdown
escaped_output = escape_markdown(current_output)
await stream_message.edit_text(
f"{current_status}\n\n{current_output}",
f"{current_status}\n\n{escaped_output}",
parse_mode="Markdown"
)
except Exception as e:
logger.debug(f"Ошибка редактирования: {e}")
# Фоллбэк: отправляем без Markdown
try:
await stream_message.edit_text(
f"{current_status}\n\n{current_output}"
)
except:
pass
await asyncio.sleep(0.3)
# Формируем контекст с историей + памятью + summary
@ -316,12 +335,48 @@ async def handle_ai_task(update: Update, text: str):
MAX_CONTEXT_TOKENS = 200_000
context_percent = round((context_tokens / MAX_CONTEXT_TOKENS) * 100, 1)
# Получаем текущего AI-провайдера
# Получаем текущий AI-пресет
ai_preset = state.ai_preset
# Определяем провайдера и модель на основе пресета
from bot.models.user_state import (
AI_PRESET_OFF,
AI_PRESET_QWEN,
AI_PRESET_GIGA_AUTO,
AI_PRESET_GIGA_LITE,
AI_PRESET_GIGA_PRO,
)
if ai_preset == AI_PRESET_OFF:
# ИИ отключен - не должны были сюда попасть
logger.warning(f"Попытка обработки AI-запроса при отключенном ИИ (пресет={ai_preset})")
return
if ai_preset == AI_PRESET_QWEN:
current_provider = "qwen"
provider_display = "Qwen Code"
elif ai_preset in [AI_PRESET_GIGA_AUTO, AI_PRESET_GIGA_LITE, AI_PRESET_GIGA_PRO]:
current_provider = "gigachat"
# Для GigaChat пресетов устанавливаем нужную модель
from bot.tools.gigachat_tool import GigaChatConfig
if ai_preset == AI_PRESET_GIGA_LITE:
# Принудительно Lite модель
GigaChatConfig.model = GigaChatConfig.model_lite
elif ai_preset == AI_PRESET_GIGA_PRO:
# Принудительно Pro модель
GigaChatConfig.model = GigaChatConfig.model_pro
# ai_preset == AI_PRESET_GIGA_AUTO использует авто-переключение в gigachat_tool.py
provider_display = f"GigaChat ({ai_preset})"
else:
# По умолчанию Qwen
current_provider = "qwen"
provider_display = "Qwen Code"
logger.info(f"AI-пресет: {ai_preset}, провайдер: {current_provider}")
# Получаем менеджера провайдеров
from bot.ai_provider_manager import get_ai_provider_manager
provider_manager = get_ai_provider_manager()
current_provider = provider_manager.get_current_provider(state)
logger.info(f"Обработка AI-запроса через провайдер: {current_provider}")
# Собираем полный промпт с системным промптом
system_prompt = qwen_manager.load_system_prompt()
@ -385,24 +440,52 @@ async def handle_ai_task(update: Update, text: str):
except Exception as e:
logger.debug(f"Ошибка обновления статуса для GigaChat: {e}")
# Формируем контекст для GigaChat из памяти и истории
# Это обеспечивает ту же функциональность что и для Qwen
context_messages = []
# Добавляем summary если есть
if summary:
context_messages.append({
"role": "system",
"content": f"=== SUMMARY ДИАЛОГА ===\n{summary}"
})
# Добавляем контекст памяти
if memory_context:
context_messages.append({
"role": "system",
"content": f"=== КОНТЕКСТ ПАМЯТИ ===\n{memory_context}"
})
# Добавляем историю диалога
if history_context:
for line in history_context.split("\n"):
if line.startswith("user:"):
context_messages.append({"role": "user", "content": line[5:].strip()})
elif line.startswith("assistant:"):
context_messages.append({"role": "assistant", "content": line[10:].strip()})
result = await provider_manager.execute_request(
provider_id=current_provider,
user_id=user_id,
prompt=full_task,
system_prompt=system_prompt,
prompt=text, # Только запрос пользователя
system_prompt=system_prompt, # System prompt отдельно
context=context_messages, # Контекст из памяти и истории
on_chunk=None # GigaChat не поддерживает потоковый вывод
)
if result.get("success"):
full_output = result.get("content", "")
# Получаем информацию о модели
model_name = result.get("metadata", {}).get("model")
if model_name:
provider_name = f"GigaChat ({model_name})"
else:
provider_name = "GigaChat"
else:
full_output = f"❌ **Ошибка {provider_manager.get_provider_info(current_provider).name}:**\n{result.get('error', 'Неизвестная ошибка')}"
provider_name = "GigaChat"
else:
full_output = f"❌ Неизвестный провайдер: {current_provider}"
provider_name = "Unknown"
provider_name = "GigaChat"
# Добавляем ответ ИИ в историю и память
if full_output and full_output != "⚠️ Не удалось получить ответ ИИ":
@ -1287,12 +1370,61 @@ async def _execute_composite_command_ssh(update: Update, command: str, server: S
async def _execute_local_command_message(update: Update, command: str, working_dir: str):
"""Выполнение локальной команды из сообщения через pexpect."""
"""Выполнение локальной команды из сообщения."""
user_id = update.effective_user.id
# Для простых команд используем subprocess (быстро и надёжно)
# Для интерактивных команд (sudo, ssh и т.д.) нужен pexpect
logger.info(f"Выполнение локальной команды: {command} в {working_dir}")
# Проверяем, нужна ли интерактивность
needs_interactive = any(cmd in command for cmd in ['sudo', 'ssh', 'su ', 'passwd', 'login'])
if needs_interactive:
logger.info("Команда требует интерактивного ввода, используем pexpect")
await _execute_local_command_interactive(update, command, working_dir)
else:
logger.info("Выполняю команду через subprocess")
await _execute_local_command_subprocess(update, command, working_dir)
async def _execute_local_command_subprocess(update: Update, command: str, working_dir: str):
"""Выполнение локальной команды через subprocess (без интерактивности)."""
try:
logger.info(f"Создаю subprocess: {command}")
process = await asyncio.create_subprocess_shell(
command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=working_dir
)
logger.info(f"Process PID: {process.pid}, жду выполнения...")
stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=30)
logger.info(f"Process завершен с кодом: {process.returncode}")
output = stdout.decode("utf-8", errors="replace").strip()
error = stderr.decode("utf-8", errors="replace").strip()
logger.info(f"Output length: {len(output)}, Error length: {len(error)}")
await _show_result_message(update, command, output, error, process.returncode)
except asyncio.TimeoutError:
logger.error("Таймаут выполнения команды")
await update.message.reply_text("❌ *Таймаут*\n\nКоманда выполнялась дольше 30 секунд.", parse_mode="Markdown")
except Exception as e:
logger.error(f"Ошибка: {e}")
await update.message.reply_text(f"❌ *Ошибка:*\n```\n{str(e)}\n```", parse_mode="Markdown")
async def _execute_local_command_interactive(update: Update, command: str, working_dir: str):
"""Выполнение локальной команды через pexpect (с поддержкой интерактивности)."""
user_id = update.effective_user.id
try:
logger.info(f"Запуск команды через pexpect: {command}")
# Создаём интерактивный процесс
child = pexpect.spawn(
'/bin/bash',
@ -1303,20 +1435,20 @@ async def _execute_local_command_message(update: Update, command: str, working_d
echo=False,
timeout=30
)
# Создаём сессию (используем child вместо master_fd)
# Создаём сессию
session = local_session_manager.create_session(
user_id=user_id,
command=command,
master_fd=child.child_fd,
pid=child.pid
)
session.context = {'child': child} # Сохраняем child объект
session.context = {'child': child}
# Читаем начальный вывод
logger.info("Чтение вывода...")
output = ""
try:
# Пробуем прочитать с таймаутом
while True:
@ -1325,24 +1457,24 @@ async def _execute_local_command_message(update: Update, command: str, working_d
break
output += line
logger.debug(f"Прочитано: {len(line)} символов")
# Проверяем запрос ввода
if detect_input_type(output):
break
except pexpect.TIMEOUT:
pass
except pexpect.EOF:
pass
logger.info(f"Прочитано: {len(output)} символов")
session.output_buffer = output
session.last_activity = datetime.now()
# Проверяем тип ввода
input_type = detect_input_type(output)
logger.info(f"Тип ввода: {input_type}")
if input_type == "password":
session.waiting_for_input = True
session.input_type = "password"
@ -1485,25 +1617,92 @@ async def _execute_ssh_command_message(update: Update, command: str, server: Ser
async def _show_result_message(update: Update, command: str, output: str, error: str, returncode: int):
"""Показ результата выполнения команды."""
logger.info(f"_show_result_message: output_len={len(output)}, error_len={len(error)}")
# Очистка ANSI-кодов и нормализация
output = normalize_output(clean_ansi_codes(output)) if output else ""
if output:
output = clean_ansi_codes(output)
logger.info(f"После clean_ansi_codes: output_len={len(output)}")
output = normalize_output(output)
logger.info(f"После normalize_output: output_len={len(output)}")
else:
output = ""
error = clean_ansi_codes(error) if error else ""
result = f"✅ *Результат:*\n\n"
if output:
# Форматируем длинный вывод: первые 5 и последние 10 строк
output = format_long_output(output, max_lines=15, head_lines=5, tail_lines=10)
# Показываем ВЕСЬ вывод, разбивая на сообщения если нужно
# Экранируем backticks в output чтобы они не ломали блоки кода
output = output.replace("```", "\\`\\`\\`").replace("`", "\\`")
result += f"```\n{output}\n```\n"
logger.info(f"Добавлен output в результат, длина result={len(result)}")
else:
logger.warning("output пустой после обработки!")
if error:
# Экранируем backticks в error
error = error.replace("```", "\\`\\`\\`").replace("`", "\\`")
result += f"*Ошибки:*\n```\n{error}\n```\n"
result += f"\n*Код возврата:* `{returncode}`"
# Экранируем backticks и отправляем с разбивкой
result = escape_markdown(result)
await send_long_message(update, result, parse_mode="Markdown")
# Экранируем специальные символы Markdown ТОЛЬКО вне блоков кода
# Блоки кода (```) уже защищены — их содержимое не трогаем
# Экранируем: * _ ( ) [ ] но не ` и не содержимое ```
result = smart_escape_markdown(result)
logger.info(f"Отправляю сообщение, длина={len(result)}")
await send_long_message(update, result, parse_mode="Markdown", pause_every=3)
logger.info("Сообщение отправлено")
def smart_escape_markdown(text: str) -> str:
"""
Умное экранирование Markdown только вне блоков кода.
Не трогает уже существующую разметку (*жирный*, урсив_, `код`).
"""
# Разбиваем на части: внутри ``` и снаружи
parts = text.split("```")
escaped_parts = []
for i, part in enumerate(parts):
if i % 2 == 0:
# Вне блоков кода — экранируем ТОЛЬКО одиночные спецсимволы
# Не трогаем: *текст*, _текст_, `код`, [текст](url)
escaped = escape_unescaped_special_chars(part)
escaped_parts.append(escaped)
else:
# Внутри блоков кода — не трогаем
escaped_parts.append(part)
return "```".join(escaped_parts)
def escape_unescaped_special_chars(text: str) -> str:
"""
Экранирует спецсимволы Markdown которые ещё не экранированы.
Не трогает уже размеченный текст.
"""
# Сначала экранируем обратные слэши
text = text.replace('\\', '\\\\')
# Экранируем * _ [ ] ( ) которые не являются частью разметки
# Простая эвристика: экранируем если символ не окружён буквами/цифрами
import re
# Экранируем * если это не *текст*
# text = re.sub(r'(?<!\w)\*(?!\w)', r'\*', text) # Оставляем *текст*
# Для простоты — экранируем только одиночные символы в начале строки или после пробела
# Это предотвращает экранирование *Результат:* но экранирует случайные * в выводе
# На самом деле, для вывода команд лучше вообще не экранировать * и _
# Экранируем только [ ] ( ) которые могут сломать ссылки
text = text.replace('[', '\\[')
text = text.replace(']', '\\]')
return text
async def post_init(application: Application):
@ -1516,6 +1715,12 @@ async def post_init(application: Application):
BotCommand("settings", "Настройки"),
BotCommand("cron", "Управление задачами"),
BotCommand("stop", "Прервать SSH-сессию"),
BotCommand("ai_presets", "🎛️ Выбор AI-провайдера"),
BotCommand("ai_off", "⌨️ ИИ Отключен (CLI режим)"),
BotCommand("ai_qwen", "💻 Qwen Code (бесплатно)"),
BotCommand("ai_giga_auto", "🧠 GigaChat Авто (Lite/Pro)"),
BotCommand("ai_giga_lite", "🚀 GigaChat Lite (дешево)"),
BotCommand("ai_giga_pro", "👑 GigaChat Pro (максимум)"),
BotCommand("ai", "Задача для Qwen Code AI"),
BotCommand("memory", "Статистика памяти ИИ"),
BotCommand("facts", "Показать сохранённые факты"),
@ -1887,8 +2092,9 @@ def main():
init_menus(menu_builder)
# Инициализация AIProviderManager
from qwen_integration import qwen_manager, gigachat_provider
init_ai_provider_manager(qwen_manager, gigachat_provider)
from qwen_integration import qwen_manager
from bot.tools import tools_registry
init_ai_provider_manager(qwen_manager, tools_registry)
# Создание приложения с таймаутами и прокси
builder = (
@ -1913,16 +2119,21 @@ def main():
application.add_handler(CommandHandler("help", help_command))
application.add_handler(CommandHandler("settings", settings_command))
application.add_handler(CommandHandler("cron", cron_command))
application.add_handler(CommandHandler("rss", rss_command))
application.add_handler(CommandHandler("menu", menu_command))
application.add_handler(CommandHandler("stop", stop_command))
application.add_handler(CommandHandler("memory", memory_command))
application.add_handler(CommandHandler("compact", compact_command))
application.add_handler(CommandHandler("facts", facts_command))
application.add_handler(CommandHandler("forget", forget_command))
application.add_handler(CommandHandler("rss", rss_command))
application.add_handler(CommandHandler("ai", ai_command))
# AI-пресеты
from bot.handlers.ai_presets import register_ai_preset_handlers
register_ai_preset_handlers(application)
application.add_handler(CallbackQueryHandler(menu_callback))
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_text_message))
application.add_handler(CommandHandler("ai", ai_command))
# Запуск
logger.info("Запуск бота...")

View File

@ -72,6 +72,20 @@ class AIAgent:
'повторяй', 'каждую неделю', 'ежедневно', 'ежечасно'
]
# Триггеры для работы с файлами (File System Tool)
FILE_SYSTEM_TRIGGERS = [
'прочитай файл', 'покажи файл', 'открой файл', 'посмотри файл',
'создай файл', 'запиши в файл', 'сохрани в файл',
'скопируй файл', 'перемести файл', 'удали файл',
'создай директорию', 'создай папку', 'покажи директорию',
'список файлов', 'что в папке', 'что в директории',
'найди файл', 'поиск файла', 'переименуй файл',
'посмотри содержимое', 'содержимое файла', 'cat ',
'ls ', 'mkdir ', 'cp ', 'mv ', 'rm ', 'touch ',
'сохрани текст', 'запиши текст', 'скопируй', 'перемести',
'удали директорию', 'удали папку', 'покажи файлы'
]
def __init__(self):
self.registry = tools_registry
self._tool_use_history: List[Dict] = []
@ -177,6 +191,34 @@ class AIAgent:
return score >= 0.8, score
def _should_use_file_system(self, message: str) -> tuple[bool, float]:
"""Проверить, нужна ли операция с файловой системой."""
message_lower = message.lower()
score = 0.0
# Прямые триггеры
for trigger in self.FILE_SYSTEM_TRIGGERS:
if trigger in message_lower:
return True, 0.9
# Операции с файлами
file_operations = ['прочитай', 'покажи', 'создай', 'запиши', 'скопируй', 'перемести', 'удали', 'открой']
file_objects = ['файл', 'директорию', 'папку', 'документ', 'текст', 'содержимое']
has_op = any(op in message_lower for op in file_operations)
has_obj = any(obj in message_lower for obj in file_objects)
if has_op and has_obj:
score = max(score, 0.75)
# Упоминания конкретных команд
commands = ['cat', 'ls', 'mkdir', 'cp', 'mv', 'rm', 'touch', 'pwd']
for cmd in commands:
if f'{cmd} ' in message_lower or message_lower.endswith(cmd):
score = max(score, 0.85)
return score >= 0.75, score
async def decide(self, message: str, context: Optional[Dict] = None) -> AgentDecision:
"""
Принять решение об использовании инструмента.
@ -190,10 +232,21 @@ class AIAgent:
"""
user_id = context.get('user_id') if context else None
# Приоритет: SSH > Cron > Поиск > RSS
# Приоритет: File System > SSH > Cron > Поиск > RSS
# Проверяем в порядке приоритета
# 1. Проверка на SSH-команды (системные задачи)
# 1. Проверка на операции с файловой системой (ВЫСОКИЙ ПРИОРИТЕТ)
should_fs, fs_conf = self._should_use_file_system(message)
if should_fs and fs_conf > 0.75:
return AgentDecision(
should_use_tool=True,
tool_name='file_system_tool',
tool_args=self._extract_file_system_args(message),
confidence=fs_conf,
reasoning='Пользователю нужно выполнить операцию с файлами'
)
# 2. Проверка на SSH-команды (системные задачи)
should_ssh, ssh_conf = self._should_use_ssh(message)
if should_ssh and ssh_conf > 0.75:
return AgentDecision(
@ -204,7 +257,7 @@ class AIAgent:
reasoning='Пользователю нужно выполнить команду на сервере'
)
# 2. Проверка на Cron-задачи (планирование)
# 3. Проверка на Cron-задачи (планирование)
should_cron, cron_conf = self._should_use_cron(message)
if should_cron and cron_conf > 0.75:
return AgentDecision(
@ -215,7 +268,7 @@ class AIAgent:
reasoning='Пользователь хочет создать или управлять задачей'
)
# 3. Проверка на поиск
# 4. Проверка на поиск
should_search, search_conf = self._should_search(message)
if should_search and search_conf > 0.7:
query = self._extract_search_query(message)
@ -227,7 +280,7 @@ class AIAgent:
reasoning='Пользователю нужна информация из интернета'
)
# 4. Проверка на RSS — только явные запросы
# 5. Проверка на RSS — только явные запросы
should_rss, rss_conf = self._should_read_rss(message)
if should_rss: # Порог уже проверен в _should_read_rss (0.95)
return AgentDecision(
@ -279,6 +332,146 @@ class AIAgent:
# Возвращаем оригинальное сообщение как команду
return message
def _extract_file_system_args(self, message: str) -> Dict[str, Any]:
"""
Извлечь аргументы для file_system_tool из сообщения.
Возвращает dict с operation и другими параметрами.
"""
import re
message_lower = message.lower()
# Определяем операцию по триггерам
operation_map = {
'прочитай файл': 'read',
'покажи файл': 'read',
'открой файл': 'read',
'посмотри файл': 'read',
'посмотри содержимое': 'read',
'содержимое файла': 'read',
'cat ': 'read',
'создай файл': 'write',
'запиши в файл': 'write',
'сохрани в файл': 'write',
'сохрани текст': 'write',
'запиши текст': 'write',
'touch ': 'write',
'скопируй файл': 'copy',
'скопируй': 'copy',
'cp ': 'copy',
'перемести файл': 'move',
'перемести': 'move',
'mv ': 'move',
'переименуй файл': 'move', # Переименование = перемещение
'удали файл': 'delete',
'удали директорию': 'delete',
'удали папку': 'delete',
'rm ': 'delete',
'создай директорию': 'mkdir',
'создай папку': 'mkdir',
'mkdir ': 'mkdir',
'покажи директорию': 'list',
'список файлов': 'list',
'что в папке': 'list',
'что в директории': 'list',
'покажи файлы': 'list',
'ls ': 'list',
'найди файл': 'search',
'поиск файла': 'search',
}
# Определяем операцию
operation = 'shell' # по умолчанию
for trigger, op in operation_map.items():
if trigger in message_lower:
operation = op
break
# Извлекаем путь (после команды)
path = None
source = None
destination = None
content = None
# Паттерн для извлечения пути после команды
for cmd in ['cat', 'ls', 'mkdir', 'rm', 'touch']:
match = re.search(rf'{cmd}\s+([^\s]+)', message_lower)
if match:
path = match.group(1).strip()
break
# Для copy/move ищем два пути
if operation in ('copy', 'move'):
# Ищем паттерн "X в Y" или "X Y"
match = re.search(r'([^\s]+)\s+(?:в|into|to)\s+([^\s]+)', message_lower)
if match:
source = match.group(1).strip()
destination = match.group(2).strip()
else:
# Просто два слова подряд
parts = message.split()
for i, part in enumerate(parts):
if part.lower() in ['cp', 'mv', 'copy', 'move', 'скопируй', 'перемести']:
if i + 2 < len(parts):
source = parts[i + 1].strip()
destination = parts[i + 2].strip()
break
# Для write пытаемся извлечь содержимое
if operation == 'write':
# Ищем текст после "сохрани" или "запиши"
match = re.search(r'(?:сохрани|запиши)\s*(?:в файл|текст)?\s*[:\-]?\s*(.+)', message, re.IGNORECASE)
if match:
content = match.group(1).strip()
# Если есть кавычки - извлекаем содержимое
quoted = re.search(r'["\']([^"\']+)["\']', message)
if quoted:
content = quoted.group(1)
# Для search ищем паттерн
pattern = '*'
if operation == 'search':
match = re.search(r'pattern\s*[=:]\s*([^\s]+)', message_lower)
if match:
pattern = match.group(1).strip()
# Или ищем *.extension
glob_match = re.search(r'\*\.[^\s]+', message_lower)
if glob_match:
pattern = glob_match.group(0).strip()
# Формируем аргументы
args = {'operation': operation}
if path:
args['path'] = path
if source:
args['source'] = source
if destination:
args['destination'] = destination
if content:
args['content'] = content
if pattern and operation == 'search':
args['pattern'] = pattern
# Если путь не найден, пробуем извлечь общее слово после операции
if not path and not source:
words = message.split()
for i, word in enumerate(words):
if word.lower() in ['cat', 'ls', 'mkdir', 'rm', 'touch', 'read', 'write', 'delete', 'list']:
if i + 1 < len(words):
args['path'] = words[i + 1].strip()
break
logger.info(f"Извлечены аргументы file_system: {args}")
return args
async def execute_tool(self, tool_name: str, **kwargs) -> ToolResult:
"""Выполнить инструмент и сохранить историю."""
logger.info(f"🤖 AI-агент выполняет инструмент: {tool_name} с аргументами: {kwargs}")

View File

@ -43,16 +43,15 @@ class AIProviderManager:
через активного провайдера с поддержкой инструментов.
"""
def __init__(self, qwen_manager=None, gigachat_provider=None):
def __init__(self, qwen_manager=None):
self._qwen_manager = qwen_manager
self._gigachat_provider = gigachat_provider
self._provider_status: Dict[str, bool] = {}
self._providers: Dict[str, BaseAIProvider] = {}
self._tools_registry: Dict[str, Any] = {}
# Инициализируем провайдеров
self._init_providers()
# Проверяем доступность провайдеров при инициализации
self._check_provider_status()
@ -63,12 +62,11 @@ class AIProviderManager:
from bot.providers.qwen_provider import QwenCodeProvider
self._providers[AIProvider.QWEN.value] = QwenCodeProvider(self._qwen_manager)
logger.info("Qwen Code Provider инициализирован")
# GigaChat Provider
if self._gigachat_provider:
from bot.providers.gigachat_provider import GigaChatProvider
self._providers[AIProvider.GIGACHAT.value] = GigaChatProvider(self._gigachat_provider)
logger.info("GigaChat Provider инициализирован")
# GigaChat Provider - создаём новый экземпляр напрямую
from bot.providers.gigachat_provider import GigaChatProvider
self._providers[AIProvider.GIGACHAT.value] = GigaChatProvider()
logger.info("GigaChat Provider инициализирован")
def set_tools_registry(self, tools_registry: Dict[str, Any]):
"""Установить реестр инструментов для всех провайдеров."""
@ -82,10 +80,11 @@ class AIProviderManager:
"""Проверка доступности провайдеров."""
# Проверяем Qwen
self._provider_status[AIProvider.QWEN.value] = True # Qwen всегда доступен
# Проверяем GigaChat
if self._gigachat_provider:
self._provider_status[AIProvider.GIGACHAT.value] = self._gigachat_provider.is_available()
gigachat_provider = self._providers.get(AIProvider.GIGACHAT.value)
if gigachat_provider:
self._provider_status[AIProvider.GIGACHAT.value] = gigachat_provider.is_available()
else:
self._provider_status[AIProvider.GIGACHAT.value] = False
@ -211,6 +210,11 @@ class AIProviderManager:
)
if response.success:
# Получаем информацию о модели из metadata ответа
model_name = None
if response.message and response.message.metadata:
model_name = response.message.metadata.get("model")
return {
"success": True,
"content": response.message.content if response.message else "",
@ -218,7 +222,8 @@ class AIProviderManager:
"metadata": {
"provider_name": response.provider_name,
"usage": response.usage,
"tool_calls": len(response.message.tool_calls) if response.message and response.message.tool_calls else 0
"tool_calls": len(response.message.tool_calls) if response.message and response.message.tool_calls else 0,
"model": model_name # Добавляем модель
}
}
else:
@ -241,10 +246,15 @@ class AIProviderManager:
ai_provider_manager: Optional[AIProviderManager] = None
def init_ai_provider_manager(qwen_manager, gigachat_provider) -> AIProviderManager:
def init_ai_provider_manager(qwen_manager, tools_registry=None) -> AIProviderManager:
"""Инициализировать глобальный AIProviderManager."""
global ai_provider_manager
ai_provider_manager = AIProviderManager(qwen_manager, gigachat_provider)
ai_provider_manager = AIProviderManager(qwen_manager)
# Устанавливаем реестр инструментов если предоставлен
if tools_registry:
ai_provider_manager.set_tools_registry(tools_registry)
logger.info(f"AIProviderManager инициализирован. Доступные провайдеры: {ai_provider_manager.get_available_providers()}")
return ai_provider_manager

View File

@ -6,6 +6,7 @@ Base AI Provider Protocol - универсальный интерфейс для
для работы с инструментами (tools).
"""
import json
from abc import ABC, abstractmethod
from typing import Optional, Dict, Any, Callable, List, AsyncGenerator
from dataclasses import dataclass, field
@ -138,13 +139,34 @@ class BaseAIProvider(ABC):
Провайдеры могут переопределить для кастомизации.
Args:
tools_registry: Словарь инструментов {name: tool_instance}
tools_registry: Словарь инструментов {name: tool_instance} или объект реестра
Returns:
Список схем инструментов
"""
schema = []
for name, tool in tools_registry.items():
# Обрабатываем разные типы tools_registry
if tools_registry is None:
return schema
# Если это ToolsRegistry с методом get_all()
if hasattr(tools_registry, 'get_all') and callable(getattr(tools_registry, 'get_all')):
items = tools_registry.get_all().items()
# Если это dict - используем .items()
elif isinstance(tools_registry, dict):
items = tools_registry.items()
# Если это объект с атрибутом tools
elif hasattr(tools_registry, 'tools'):
items = tools_registry.tools.items() if isinstance(tools_registry.tools, dict) else []
# Если это объект поддерживающий .items()
elif hasattr(tools_registry, 'items'):
items = tools_registry.items()
else:
logger.warning(f"Неизвестный тип tools_registry: {type(tools_registry)}")
return schema
for name, tool in items:
if hasattr(tool, 'get_schema'):
schema.append(tool.get_schema())
elif hasattr(tool, 'description'):
@ -195,20 +217,29 @@ class BaseAIProvider(ABC):
**kwargs
)
messages = []
# Формируем базовый контекст — БЕЗ system message
# System message будет передаваться отдельным параметром
base_messages = []
if context:
messages.extend(context)
messages.append({"role": "user", "content": prompt})
# Фильтруем system messages из context — они будут переданы через system_prompt
for msg in context:
if msg.get("role") != "system":
base_messages.append(msg)
base_messages.append({"role": "user", "content": prompt})
tools_schema = self.get_tools_schema(tools_registry) if self.supports_tools else None
# Копируем сообщения для каждой итерации
messages = base_messages.copy()
for iteration in range(max_iterations):
# Отправляем запрос провайдеру
# system_prompt передаётся всегда — провайдер сам решит как его использовать
response = await self.chat(
prompt=None, # Уже в messages
system_prompt=system_prompt,
context=messages if iteration == 0 else None,
context=messages,
tools=tools_schema,
on_chunk=on_chunk,
**kwargs
@ -232,8 +263,15 @@ class BaseAIProvider(ABC):
# Выполняем инструменты
tool_results = []
for tool_call in message.tool_calls:
if tool_call.tool_name in tools_registry:
tool = tools_registry[tool_call.tool_name]
# Проверяем наличие инструмента через метод .get() для поддержки ToolsRegistry
if hasattr(tools_registry, 'get'):
tool = tools_registry.get(tool_call.tool_name)
elif isinstance(tools_registry, dict):
tool = tools_registry.get(tool_call.tool_name)
else:
tool = None
if tool is not None:
try:
if hasattr(tool, 'execute'):
result = await tool.execute(
@ -244,18 +282,25 @@ class BaseAIProvider(ABC):
result = await tool(**tool_call.tool_args)
else:
result = f"Инструмент {tool_call.tool_name} не имеет метода execute"
tool_call.result = result
tool_call.status = ToolCallStatus.SUCCESS
except Exception as e:
tool_call.error = str(e)
tool_call.status = ToolCallStatus.ERROR
result = f"Ошибка: {e}"
# Преобразуем результат в JSON-сериализуемый формат
# ToolResult имеет метод to_dict(), строки оставляем как есть
if hasattr(result, 'to_dict'):
result_serializable = result.to_dict()
else:
result_serializable = result
tool_results.append({
"tool": tool_call.tool_name,
"args": tool_call.tool_args,
"result": result,
"result": result_serializable,
"status": tool_call.status.value
})
else:
@ -280,9 +325,11 @@ class BaseAIProvider(ABC):
]
})
# GigaChat требует валидный JSON в tool messages, а не Python repr строку
# Используем json.dumps для корректного форматирования
messages.append({
"role": "tool",
"content": str(tool_results)
"content": json.dumps(tool_results, ensure_ascii=False)
})
# Обновляем системный промпт для следующей итерации

264
bot/handlers/ai_presets.py Normal file
View File

@ -0,0 +1,264 @@
#!/usr/bin/env python3
"""
Обработчики команд для переключения AI-пресетов.
Доступные пресеты:
- off: ИИ отключен, режим CLI команд
- qwen: Qwen Code (бесплатно, локально)
- giga_auto: GigaChat авто-переключение (Lite/Pro)
- giga_lite: GigaChat Lite (дешевле)
- giga_pro: GigaChat Pro (максимальное качество)
"""
import logging
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import CommandHandler, CallbackQueryHandler
from bot.models.user_state import (
AI_PRESET_OFF,
AI_PRESET_QWEN,
AI_PRESET_GIGA_AUTO,
AI_PRESET_GIGA_LITE,
AI_PRESET_GIGA_PRO,
)
from bot.config import state_manager
logger = logging.getLogger(__name__)
# Описание пресетов
PRESET_DESCRIPTIONS = {
AI_PRESET_OFF: {
"name": "❌ ИИ Отключен",
"description": "Режим CLI команд. Бот выполняет команды напрямую.",
"icon": "⌨️"
},
AI_PRESET_QWEN: {
"name": "🤖 Qwen Code",
"description": "Бесплатно, локально. Лучший для кода и работы с файлами.",
"icon": "💻"
},
AI_PRESET_GIGA_AUTO: {
"name": "🔄 GigaChat Авто",
"description": "Умное переключение Lite/Pro. Простые → Lite, сложные → Pro.",
"icon": "🧠"
},
AI_PRESET_GIGA_LITE: {
"name": "⚡ GigaChat Lite",
"description": "Быстро и дёшево. Для простых вопросов и чата.",
"icon": "🚀"
},
AI_PRESET_GIGA_PRO: {
"name": "🔥 GigaChat Pro",
"description": "Максимальное качество. Для сложных творческих задач.",
"icon": "👑"
},
}
def get_preset_display_name(preset: str) -> str:
"""Получить отображаемое имя пресета."""
desc = PRESET_DESCRIPTIONS.get(preset, {})
return f"{desc.get('icon', '')} {desc.get('name', preset)}"
async def ai_presets_command(update: Update, context):
"""Показать меню выбора AI-пресета."""
user_id = update.effective_user.id
state = state_manager.get(user_id)
current_preset = state.ai_preset
# Формируем меню
keyboard = [
[
InlineKeyboardButton(
f"{'' if current_preset == AI_PRESET_OFF else ''} {PRESET_DESCRIPTIONS[AI_PRESET_OFF]['icon']} ИИ Отключен",
callback_data=f"ai_preset_{AI_PRESET_OFF}"
)
],
[
InlineKeyboardButton(
f"{'' if current_preset == AI_PRESET_QWEN else ''} {PRESET_DESCRIPTIONS[AI_PRESET_QWEN]['icon']} Qwen Code",
callback_data=f"ai_preset_{AI_PRESET_QWEN}"
)
],
[
InlineKeyboardButton(
f"{'' if current_preset == AI_PRESET_GIGA_AUTO else ''} {PRESET_DESCRIPTIONS[AI_PRESET_GIGA_AUTO]['icon']} GigaChat Авто",
callback_data=f"ai_preset_{AI_PRESET_GIGA_AUTO}"
)
],
[
InlineKeyboardButton(
f"{'' if current_preset == AI_PRESET_GIGA_LITE else ''} {PRESET_DESCRIPTIONS[AI_PRESET_GIGA_LITE]['icon']} GigaChat Lite",
callback_data=f"ai_preset_{AI_PRESET_GIGA_LITE}"
),
InlineKeyboardButton(
f"{'' if current_preset == AI_PRESET_GIGA_PRO else ''} {PRESET_DESCRIPTIONS[AI_PRESET_GIGA_PRO]['icon']} GigaChat Pro",
callback_data=f"ai_preset_{AI_PRESET_GIGA_PRO}"
)
],
]
reply_markup = InlineKeyboardMarkup(keyboard)
current_name = get_preset_display_name(current_preset)
output = f"🎛️ **Панель управления AI**\n\n"
output += f"**Текущий пресет:** {current_name}\n\n"
output += "Выберите AI-провайдер:\n\n"
output += " **Описание пресетов:**\n"
output += f"{PRESET_DESCRIPTIONS[AI_PRESET_OFF]['icon']} **ИИ Отключен** — {PRESET_DESCRIPTIONS[AI_PRESET_OFF]['description']}\n"
output += f"{PRESET_DESCRIPTIONS[AI_PRESET_QWEN]['icon']} **Qwen Code** — {PRESET_DESCRIPTIONS[AI_PRESET_QWEN]['description']}\n"
output += f"{PRESET_DESCRIPTIONS[AI_PRESET_GIGA_AUTO]['icon']} **GigaChat Авто** — {PRESET_DESCRIPTIONS[AI_PRESET_GIGA_AUTO]['description']}\n"
output += f"{PRESET_DESCRIPTIONS[AI_PRESET_GIGA_LITE]['icon']} **GigaChat Lite** — {PRESET_DESCRIPTIONS[AI_PRESET_GIGA_LITE]['description']}\n"
output += f"{PRESET_DESCRIPTIONS[AI_PRESET_GIGA_PRO]['icon']} **GigaChat Pro** — {PRESET_DESCRIPTIONS[AI_PRESET_GIGA_PRO]['description']}"
await update.message.reply_text(output, parse_mode="Markdown", reply_markup=reply_markup)
async def ai_preset_callback(update: Update, context):
"""Обработка выбора пресета из инлайн-меню."""
user_id = update.effective_user.id
query = update.callback_query
await query.answer()
# Извлекаем название пресета из callback_data
preset = query.data.replace("ai_preset_", "")
if preset not in PRESET_DESCRIPTIONS:
await query.edit_message_text("❌ Неверный пресет")
return
state = state_manager.get(user_id)
old_preset = state.ai_preset
state.ai_preset = preset
# Обновляем ai_chat_mode и current_ai_provider для совместимости
if preset == AI_PRESET_OFF:
state.ai_chat_mode = False
state.current_ai_provider = "none"
else:
state.ai_chat_mode = True
# Для совместимости с существующим кодом
if preset == AI_PRESET_QWEN:
state.current_ai_provider = "qwen"
else: # Любой GigaChat
state.current_ai_provider = "gigachat"
preset_name = get_preset_display_name(preset)
output = f"✅ **Переключено на:** {preset_name}\n\n"
output += f"{PRESET_DESCRIPTIONS[preset]['description']}"
# Обновляем инлайн-меню
keyboard = [
[
InlineKeyboardButton(
f"{'' if preset == AI_PRESET_OFF else ''} {PRESET_DESCRIPTIONS[AI_PRESET_OFF]['icon']} ИИ Отключен",
callback_data=f"ai_preset_{AI_PRESET_OFF}"
)
],
[
InlineKeyboardButton(
f"{'' if preset == AI_PRESET_QWEN else ''} {PRESET_DESCRIPTIONS[AI_PRESET_QWEN]['icon']} Qwen Code",
callback_data=f"ai_preset_{AI_PRESET_QWEN}"
)
],
[
InlineKeyboardButton(
f"{'' if preset == AI_PRESET_GIGA_AUTO else ''} {PRESET_DESCRIPTIONS[AI_PRESET_GIGA_AUTO]['icon']} GigaChat Авто",
callback_data=f"ai_preset_{AI_PRESET_GIGA_AUTO}"
)
],
[
InlineKeyboardButton(
f"{'' if preset == AI_PRESET_GIGA_LITE else ''} {PRESET_DESCRIPTIONS[AI_PRESET_GIGA_LITE]['icon']} GigaChat Lite",
callback_data=f"ai_preset_{AI_PRESET_GIGA_LITE}"
),
InlineKeyboardButton(
f"{'' if preset == AI_PRESET_GIGA_PRO else ''} {PRESET_DESCRIPTIONS[AI_PRESET_GIGA_PRO]['icon']} GigaChat Pro",
callback_data=f"ai_preset_{AI_PRESET_GIGA_PRO}"
)
],
]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(output, parse_mode="Markdown", reply_markup=reply_markup)
logger.info(f"Пользователь {user_id} переключил AI-пресет: {old_preset}{preset}")
# Быстрые команды для переключения одним сообщением
async def ai_off_command(update: Update, context):
"""Быстрое переключение на ИИ отключен."""
await switch_preset(update, AI_PRESET_OFF)
async def ai_qwen_command(update: Update, context):
"""Быстрое переключение на Qwen Code."""
await switch_preset(update, AI_PRESET_QWEN)
async def ai_giga_auto_command(update: Update, context):
"""Быстрое переключение на GigaChat Авто."""
await switch_preset(update, AI_PRESET_GIGA_AUTO)
async def ai_giga_lite_command(update: Update, context):
"""Быстрое переключение на GigaChat Lite."""
await switch_preset(update, AI_PRESET_GIGA_LITE)
async def ai_giga_pro_command(update: Update, context):
"""Быстрое переключение на GigaChat Pro."""
await switch_preset(update, AI_PRESET_GIGA_PRO)
async def switch_preset(update: Update, preset: str):
"""Переключить пресет и показать уведомление."""
user_id = update.effective_user.id
state = state_manager.get(user_id)
old_preset = state.ai_preset
state.ai_preset = preset
# Обновляем совместимость
if preset == AI_PRESET_OFF:
state.ai_chat_mode = False
state.current_ai_provider = "none"
else:
state.ai_chat_mode = True
if preset == AI_PRESET_QWEN:
state.current_ai_provider = "qwen"
else:
state.current_ai_provider = "gigachat"
preset_name = get_preset_display_name(preset)
output = f"✅ **AI-пресет переключен**\n\n"
output += f"**Текущий:** {preset_name}\n"
output += f"_{PRESET_DESCRIPTIONS[preset]['description']}_\n\n"
if old_preset != preset:
output += f"~{get_preset_display_name(old_preset)}~ → ✅ {preset_name}"
await update.message.reply_text(output, parse_mode="Markdown")
logger.info(f"Пользователь {user_id} переключил AI-пресет: {old_preset}{preset}")
def register_ai_preset_handlers(dispatcher):
"""Зарегистрировать обработчики AI-пресетов."""
# Основное меню
dispatcher.add_handler(CommandHandler("ai_presets", ai_presets_command))
# Callback для инлайн-меню
dispatcher.add_handler(CallbackQueryHandler(ai_preset_callback, pattern="^ai_preset_"))
# Быстрые команды
dispatcher.add_handler(CommandHandler("ai_off", ai_off_command))
dispatcher.add_handler(CommandHandler("ai_qwen", ai_qwen_command))
dispatcher.add_handler(CommandHandler("ai_giga_auto", ai_giga_auto_command))
dispatcher.add_handler(CommandHandler("ai_giga_lite", ai_giga_lite_command))
dispatcher.add_handler(CommandHandler("ai_giga_pro", ai_giga_pro_command))
logger.info("Обработчики AI-пресетов зарегистрированы")

View File

@ -35,6 +35,38 @@ async def menu_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
reply_markup=menu_builder.get_keyboard("main", user_id=query.from_user.id, state=state)
)
elif callback == "ai_presets":
# Открываем меню AI-пресетов
state = state_manager.get(user_id)
from bot.handlers.ai_presets import ai_presets_command
# Создаём фейковое сообщение для совместимости
class FakeMessage:
async def reply_text(self, text, parse_mode=None, reply_markup=None):
# Вместо отправки сообщения редактируем callback
await query.edit_message_text(text, parse_mode=parse_mode, reply_markup=reply_markup)
return None
fake_update = type('FakeUpdate', (), {'message': FakeMessage(), 'effective_user': query.from_user})()
await ai_presets_command(fake_update, context)
return
elif callback.startswith("continue_output_"):
# Пользователь нажал "Продолжить"
remaining = int(callback.replace("continue_output_", ""))
state = state_manager.get(user_id)
state.waiting_for_output_control = False
state.continue_output = True
await query.answer(f"▶️ Продолжаем вывод (осталось {remaining} сообщений)")
return
elif callback == "cancel_output":
# Пользователь нажал "Отменить"
state = state_manager.get(user_id)
state.waiting_for_output_control = False
state.continue_output = False
await query.answer("❌ Вывод отменен")
return
elif callback == "preset_menu":
state.current_menu = "preset"
await query.edit_message_text(

View File

@ -101,6 +101,7 @@ def init_menus(menu_builder: MenuBuilder):
main_menu = [
MenuItem("🖥️ Выбор сервера", "server_menu", icon="🖥️"),
MenuItem("📋 Предустановленные команды", "preset_menu", icon="📋"),
MenuItem("🎛️ AI-пресеты", "ai_presets", icon="🎛️"),
MenuItem("💬 Чат с ИИ агентом", "toggle_ai_chat", icon="💬"),
MenuItem("⚙️ Настройки бота", "settings_menu", icon="⚙️"),
MenuItem(" О боте", "about", icon=""),

View File

@ -5,6 +5,14 @@ from typing import Dict, List, Optional, Any
from dataclasses import dataclass, field
# Пресеты AI-провайдеров
AI_PRESET_OFF = "off" # ИИ отключен, режим CLI команд
AI_PRESET_QWEN = "qwen" # Qwen Code (бесплатно, локально)
AI_PRESET_GIGA_AUTO = "giga_auto" # GigaChat авто-переключение (Lite/Pro)
AI_PRESET_GIGA_LITE = "giga_lite" # GigaChat Lite (дешевле)
AI_PRESET_GIGA_PRO = "giga_pro" # GigaChat Pro (максимальное качество)
@dataclass
class UserState:
"""Состояние пользователя в диалоге."""
@ -19,7 +27,14 @@ class UserState:
ai_chat_mode: bool = True # Режим чата с ИИ агентом (включен по умолчанию)
ai_chat_history: List[str] = field(default_factory=list) # История диалога с ИИ
messages_since_fact_extract: int = 0 # Счётчик для извлечения фактов
current_ai_provider: str = "qwen" # Текущий AI-провайдер: "qwen" или "gigachat"
ai_preset: str = AI_PRESET_QWEN # Текущий AI-пресет
current_ai_provider: str = "qwen" # Текущий AI-провайдер (для совместимости)
# Для управления длинным выводом
waiting_for_output_control: bool = False # Ожидание решения пользователя
output_remaining: int = 0 # Сколько сообщений осталось
output_wait_message = None # Сообщение с кнопками
continue_output: bool = True # Решение пользователя
class StateManager:

View File

@ -4,12 +4,14 @@ GigaChat AI Provider - адаптер GigaChat для работы с инстр
Реализует интерфейс BaseAIProvider для единой работы с инструментами
независимо от AI-провайдера.
Использует нативный GigaChat Function Calling API:
https://developers.sber.ru/docs/ru/gigachat/guides/functions/overview
"""
import logging
from typing import Optional, Dict, Any, Callable, List
import json
import re
from bot.base_ai_provider import (
BaseAIProvider,
@ -25,15 +27,16 @@ logger = logging.getLogger(__name__)
class GigaChatProvider(BaseAIProvider):
"""
GigaChat AI Provider с поддержкой инструментов.
Использует эвристики для извлечения вызовов инструментов из текста,
так как GigaChat не поддерживает нативные tool calls.
GigaChat AI Provider с нативной поддержкой function calling.
Использует официальный GigaChat Function Calling API вместо
эмуляции через текстовые блоки.
"""
def __init__(self, config: Optional[GigaChatConfig] = None):
self._tool = GigaChatTool(config)
self._available: Optional[bool] = None
self._functions_state_id: Optional[str] = None
@property
def provider_name(self) -> str:
@ -41,8 +44,7 @@ class GigaChatProvider(BaseAIProvider):
@property
def supports_tools(self) -> bool:
# GigaChat не поддерживает нативные tool calls
# Но мы эмулируем через парсинг текста
# GigaChat поддерживает нативные function calls
return True
@property
@ -53,15 +55,14 @@ class GigaChatProvider(BaseAIProvider):
"""Проверить доступность GigaChat."""
if self._available is not None:
return self._available
# Проверяем наличие токенов
try:
import os
client_id = os.getenv("GIGACHAT_CLIENT_ID")
client_secret = os.getenv("GIGACHAT_CLIENT_SECRET")
self._available = bool(client_id and client_secret)
if not self._available:
logger.warning("GigaChat недоступен: не настроены GIGACHAT_CLIENT_ID или GIGACHAT_CLIENT_SECRET")
else:
@ -69,106 +70,354 @@ class GigaChatProvider(BaseAIProvider):
except Exception as e:
self._available = False
logger.error(f"Ошибка проверки доступности GigaChat: {e}")
return self._available
def get_tools_schema(self, tools_registry: Dict[str, Any]) -> List[Dict[str, Any]]:
def get_error(self) -> Optional[str]:
"""Получить последнюю ошибку."""
if self._available is False:
return "GigaChat недоступен: проверьте GIGACHAT_CLIENT_ID и GIGACHAT_CLIENT_SECRET"
return None
def get_functions_schema(self, tools_registry: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
Получить схему инструментов для промпта.
Формирует описание инструментов в формате понятном GigaChat.
Получить схему функций для GigaChat API в правильном формате.
Формат GigaChat:
{
"name": "function_name",
"description": "Описание функции",
"parameters": {
"type": "object",
"properties": {...},
"required": [...]
},
"return_parameters": {...} # опционально
}
"""
schema = []
for name, tool in tools_registry.items():
if tools_registry is None:
return schema
# Обрабатываем разные типы tools_registry
items = []
if hasattr(tools_registry, 'get_all') and callable(getattr(tools_registry, 'get_all')):
items = list(tools_registry.get_all().items())
elif isinstance(tools_registry, dict):
items = list(tools_registry.items())
elif hasattr(tools_registry, 'tools'):
items = list(tools_registry.tools.items()) if isinstance(tools_registry.tools, dict) else []
for name, tool in items:
if hasattr(tool, 'get_schema'):
tool_schema = tool.get_schema()
schema.append({
# Преобразуем в формат GigaChat с гарантией наличия properties
parameters = tool_schema.get("parameters", {})
if not parameters:
parameters = {"type": "object", "properties": {}}
elif "properties" not in parameters:
parameters["properties"] = {}
giga_schema = {
"name": name,
"description": tool_schema.get("description", ""),
"parameters": tool_schema.get("parameters", {})
})
"parameters": parameters
}
# Добавляем return_parameters если есть
if hasattr(tool, 'get_return_schema'):
giga_schema["return_parameters"] = tool.get_return_schema()
schema.append(giga_schema)
elif hasattr(tool, 'description'):
schema.append({
"name": name,
"description": tool.description,
"parameters": getattr(tool, 'parameters', {})
"parameters": {"type": "object", "properties": {}} # Пустая но валидная схема
})
logger.info(f"📋 GigaChat functions schema: {[f['name'] for f in schema]}")
return schema
def _build_tools_prompt(self, tools_schema: List[Dict[str, Any]]) -> str:
def _parse_function_call(self, function_call: Dict[str, Any]) -> ToolCall:
"""
Построить текстовое описание инструментов для промпта.
GigaChat не поддерживает нативные tool calls, поэтому описываем
инструменты в тексте и просим модель использовать специальный формат.
"""
if not tools_schema:
return ""
prompt_parts = [
"\n\n🛠️ ДОСТУПНЫЕ ИНСТРУМЕНТЫ:",
"Ты можешь использовать следующие инструменты. Для вызова инструмента используй формат:",
"```tool",
'{"name": "имя_инструмента", "arguments": {аргументы}}',
'```',
"",
"Список инструментов:"
]
for tool in tools_schema:
name = tool.get("name", "unknown")
desc = tool.get("description", "Нет описания")
params = tool.get("parameters", {})
prompt_parts.append(f"\n**{name}**")
prompt_parts.append(f"Описание: {desc}")
if params:
prompt_parts.append(f"Параметры: {json.dumps(params, ensure_ascii=False)}")
prompt_parts.extend([
"",
"После вызова инструмента ты получишь результат и сможешь продолжить ответ."
])
return "\n".join(prompt_parts)
Преобразовать function_call из ответа GigaChat в ToolCall.
def _parse_tool_calls(self, content: str) -> List[ToolCall]:
GigaChat возвращает:
{
"name": "function_name",
"arguments": {"arg1": "value1", ...}
}
"""
Извлечь вызовы инструментов из текста ответа.
Ищет блоки вида:
```tool
{"name": "ssh_tool", "arguments": {"command": "df -h"}}
```
"""
tool_calls = []
# Ищем блоки ```tool {...}```
pattern = r'```tool\s*\n({.*?})\s*\n```'
matches = re.findall(pattern, content, re.DOTALL)
for match in matches:
try:
tool_data = json.loads(match)
tool_name = tool_data.get("name")
tool_args = tool_data.get("arguments", {})
if tool_name:
tool_calls.append(ToolCall(
tool_name=tool_name,
tool_args=tool_args,
tool_call_id=f"gc_{len(tool_calls)}"
))
except json.JSONDecodeError as e:
logger.warning(f"Ошибка парсинга tool call: {e}")
return tool_calls
try:
# Аргументы могут быть строкой JSON или уже dict
args = function_call.get("arguments", {})
if isinstance(args, str):
args = json.loads(args)
except (json.JSONDecodeError, TypeError) as e:
logger.warning(f"Ошибка парсинга аргументов function_call: {e}")
args = {}
def _remove_tool_blocks(self, content: str) -> str:
"""Удалить блоки вызовов инструментов из текста."""
pattern = r'```tool\s*\n\{.*?\}\s*\n```'
return re.sub(pattern, '', content, flags=re.DOTALL).strip()
return ToolCall(
tool_name=function_call.get("name", "unknown"),
tool_args=args,
tool_call_id=function_call.get("name", "fc_0") # Используем name как ID
)
async def process_with_tools(
self,
prompt: str,
system_prompt: Optional[str] = None,
context: Optional[List[Dict[str, str]]] = None,
tools_registry: Optional[Dict[str, Any]] = None,
on_chunk: Optional[Callable[[str], Any]] = None,
max_iterations: int = 5,
**kwargs
) -> ProviderResponse:
"""
Обработка запросов с function calling для GigaChat.
Использует нативный GigaChat Function Calling API:
1. Отправляем запрос с functions массивом
2. Получаем function_call из ответа
3. Выполняем инструмент
4. Отправляем результат с role: "function"
5. Повторяем пока не будет финального ответа
Формат сообщений:
- user: {"role": "user", "content": "..."}
- assistant: {"role": "assistant", "function_call": {...}}
- function: {"role": "function", "name": "...", "content": "..."}
"""
if not tools_registry:
return await self.chat(
prompt=prompt,
system_prompt=system_prompt,
context=context,
on_chunk=on_chunk,
**kwargs
)
# Формируем базовые сообщения
messages = []
# Добавляем системный промпт если есть
if system_prompt:
messages.append({"role": "system", "content": system_prompt})
# Добавляем контекст (историю диалога)
if context:
for msg in context:
role = msg.get("role")
# Пропускаем system messages — они уже добавлены
if role == "system":
continue
# Преобразуем tool messages в function messages
if role == "tool":
role = "function"
if role in ("user", "assistant", "function"):
messages.append({
"role": role,
"content": msg.get("content", ""),
"name": msg.get("name") # Для function messages
})
# Добавляем текущий запрос пользователя
if prompt:
messages.append({"role": "user", "content": prompt})
# Получаем схему функций
functions = self.get_functions_schema(tools_registry) if self.supports_tools else None
logger.info(f"🔍 GigaChat process_with_tools: {len(messages)} сообщений, {len(functions) if functions else 0} функций")
for iteration in range(max_iterations):
logger.info(f"🔄 Итерация {iteration + 1}/{max_iterations}")
# Логируем сообщения перед отправкой
for i, msg in enumerate(messages[-3:]): # Последние 3 сообщения
content_preview = msg.get("content", "")[:100]
logger.info(f" 📨 [{i}] role={msg.get('role')}, content='{content_preview}...'")
# Отправляем запрос с functions
response = await self._chat_with_functions(
messages=messages,
functions=functions,
user_id=kwargs.get('user_id'),
temperature=kwargs.get("temperature", 0.7),
max_tokens=kwargs.get("max_tokens", 2000),
)
if not response.get("success"):
return ProviderResponse(
success=False,
error=response.get("error", "Неизвестная ошибка"),
provider_name=self.provider_name
)
# Проверяем наличие function_call
function_call = response.get("function_call")
content = response.get("content", "")
logger.info(f"📬 Ответ GigaChat: content_len={len(content) if content else 0}, function_call={function_call is not None}")
# Если нет function_call — возвращаем финальный ответ
if not function_call:
return ProviderResponse(
success=True,
message=AIMessage(
content=content,
tool_calls=[],
metadata={
"model": response.get("model", "GigaChat"),
"usage": response.get("usage", {}),
"functions_state_id": response.get("functions_state_id")
}
),
provider_name=self.provider_name,
usage=response.get("usage")
)
# Есть function_call — парсим и выполняем инструмент
tool_call = self._parse_function_call(function_call)
logger.info(f"🛠️ Function call: {tool_call.tool_name}({tool_call.tool_args})")
# Выполняем инструмент
if hasattr(tools_registry, 'get'):
tool = tools_registry.get(tool_call.tool_name)
elif isinstance(tools_registry, dict):
tool = tools_registry.get(tool_call.tool_name)
else:
tool = None
if tool is not None:
try:
if hasattr(tool, 'execute'):
result = await tool.execute(
**tool_call.tool_args,
user_id=kwargs.get('user_id')
)
elif hasattr(tool, '__call__'):
result = await tool(**tool_call.tool_args)
else:
result = f"Инструмент {tool_call.tool_name} не имеет метода execute"
tool_call.result = result
tool_call.status = ToolCallStatus.SUCCESS
except Exception as e:
logger.exception(f"Ошибка выполнения инструмента {tool_call.tool_name}: {e}")
tool_call.error = str(e)
tool_call.status = ToolCallStatus.ERROR
result = {"error": str(e)}
else:
tool_call.error = f"Инструмент {tool_call.tool_name} не найден"
tool_call.status = ToolCallStatus.ERROR
result = {"error": tool_call.error}
# Сериализуем результат
if hasattr(result, 'to_dict'):
result_dict = result.to_dict()
elif isinstance(result, dict):
result_dict = result
else:
result_dict = {"result": str(result)}
result_json = json.dumps(result_dict, ensure_ascii=False)
# Добавляем assistant message с function_call
messages.append({
"role": "assistant",
"content": "", # Пустой content при function_call
"function_call": function_call
})
# Добавляем function message с результатом
messages.append({
"role": "function",
"name": tool_call.tool_name,
"content": result_json
})
logger.info(f"✅ Добавлен function result: {tool_call.tool_name}, result_len={len(result_json)}")
# Сохраняем functions_state_id для следующей итерации
if response.get("functions_state_id"):
self._functions_state_id = response["functions_state_id"]
# Достигли максимума итераций
return ProviderResponse(
success=True,
message=AIMessage(
content=content + "\n\n[Достигнут максимум итераций выполнения функций]",
metadata={"iterations": max_iterations}
),
provider_name=self.provider_name,
usage=response.get("usage")
)
async def _chat_with_functions(
self,
messages: List[Dict[str, Any]],
functions: Optional[List[Dict[str, Any]]] = None,
user_id: Optional[int] = None,
temperature: float = 0.7,
max_tokens: int = 2000,
) -> Dict[str, Any]:
"""
Отправить запрос в GigaChat API с поддержкой function calling.
Возвращает:
{
"success": bool,
"content": str,
"function_call": {"name": str, "arguments": dict} или None,
"model": str,
"usage": dict,
"functions_state_id": str или None
}
"""
try:
# Формируем сообщения в формате GigaChat
gc_messages = []
for msg in messages:
gc_msg = {"role": msg["role"], "content": msg.get("content", "")}
if msg.get("name"):
gc_msg["name"] = msg["name"]
if msg.get("function_call"):
gc_msg["function_call"] = msg["function_call"]
gc_messages.append(gc_msg)
# Выполняем запрос через GigaChatTool
result = await self._tool.chat_with_functions(
messages=gc_messages,
functions=functions,
user_id=str(user_id) if user_id else None,
temperature=temperature,
max_tokens=max_tokens,
)
# Извлекаем function_call из ответа
function_call = None
if result.get("choices"):
choice = result["choices"][0]
message = choice.get("message", {})
function_call = message.get("function_call")
return {
"success": True,
"content": result.get("content", ""),
"function_call": function_call,
"model": result.get("model", "GigaChat"),
"usage": result.get("usage", {}),
"functions_state_id": result.get("functions_state_id")
}
except Exception as e:
logger.exception(f"Ошибка _chat_with_functions: {e}")
return {
"success": False,
"error": str(e),
"function_call": None
}
async def chat(
self,
@ -181,44 +430,29 @@ class GigaChatProvider(BaseAIProvider):
**kwargs
) -> ProviderResponse:
"""
Отправить запрос GigaChat.
Args:
prompt: Запрос пользователя
system_prompt: Системный промпт
context: История диалога
tools: Доступные инструменты (схема)
on_chunk: Callback для потокового вывода (не используется)
user_id: ID пользователя
**kwargs: Дополнительные параметры
Returns:
ProviderResponse с ответом и возможными вызовами инструментов
Отправить запрос GigaChat (без function calling).
Используется когда tools не переданы.
"""
try:
# Формируем системный промпт с инструментами
full_system_prompt = system_prompt or ""
if tools:
tools_prompt = self._build_tools_prompt(tools)
full_system_prompt += tools_prompt
# Формируем сообщения
messages = []
if full_system_prompt:
messages.append(GigaChatMessage(role="system", content=full_system_prompt))
if system_prompt:
messages.append(GigaChatMessage(role="system", content=system_prompt))
if context:
for msg in context:
role = msg.get("role", "user")
content = msg.get("content", "")
if role in ("user", "assistant", "system"):
if role == "system":
continue
if role in ("user", "assistant"):
messages.append(GigaChatMessage(role=role, content=content))
if prompt:
messages.append(GigaChatMessage(role="user", content=prompt))
# Выполняем запрос
result = await self._tool.chat(
messages=messages,
@ -226,7 +460,7 @@ class GigaChatProvider(BaseAIProvider):
temperature=kwargs.get("temperature", 0.7),
max_tokens=kwargs.get("max_tokens", 2000),
)
if not result.get("content"):
if result.get("error"):
return ProviderResponse(
@ -240,20 +474,14 @@ class GigaChatProvider(BaseAIProvider):
error="Пустой ответ от GigaChat",
provider_name=self.provider_name
)
content = result["content"]
# Парсим вызовы инструментов
tool_calls = self._parse_tool_calls(content)
# Очищаем контент от блоков инструментов
clean_content = self._remove_tool_blocks(content)
return ProviderResponse(
success=True,
message=AIMessage(
content=clean_content,
tool_calls=tool_calls,
content=content,
tool_calls=[],
metadata={
"model": result.get("model", "GigaChat"),
"usage": result.get("usage", {})
@ -262,7 +490,7 @@ class GigaChatProvider(BaseAIProvider):
provider_name=self.provider_name,
usage=result.get("usage")
)
except Exception as e:
logger.error(f"Ошибка GigaChat провайдера: {e}")
return ProviderResponse(
@ -280,9 +508,8 @@ class GigaChatProvider(BaseAIProvider):
) -> ToolCall:
"""
Выполнить инструмент (заглушка).
GigaChat не выполняет инструменты напрямую - это делает
AIProviderManager через process_with_tools.
Инструменты выполняются через process_with_tools.
"""
return ToolCall(
tool_name=tool_name,

View File

@ -124,4 +124,4 @@ def register_tool(tool_class: type) -> type:
# Авто-импорт инструментов для регистрации
# Импортируем после определения register_tool чтобы декоратор сработал
from bot.tools import ddgs_tool, rss_tool, ssh_tool, cron_tool, gigachat_tool
from bot.tools import ddgs_tool, rss_tool, ssh_tool, cron_tool, gigachat_tool, file_system_tool

View File

@ -0,0 +1,803 @@
#!/usr/bin/env python3
"""
File System Tool - инструмент для работы с файловой системой Linux.
Позволяет AI-агенту выполнять операции с файлами и директориями:
- Чтение файлов (cat)
- Запись файлов
- Копирование (cp)
- Перемещение (mv)
- Удаление (rm)
- Создание директорий (mkdir)
- Список файлов (ls)
- Проверка существования
- Поиск файлов
Инструмент работает от имени пользователя на локальной машине.
"""
import logging
import os
import shutil
import subprocess
import asyncio
from pathlib import Path
from typing import Optional, Dict, Any, List
from dataclasses import dataclass, field
from bot.tools import BaseTool, ToolResult, register_tool
logger = logging.getLogger(__name__)
class FileSystemTool(BaseTool):
"""Инструмент для работы с файловой системой."""
name = "file_system_tool"
description = "Работа с файловой системой Linux: чтение/запись файлов, копирование, перемещение, удаление, создание директорий, просмотр списка файлов."
category = "system"
# Безопасные пути - где можно работать
ALLOWED_BASE_PATHS = [
Path.home(), # Домашняя директория
Path("/tmp"),
Path("/var/tmp"),
]
# Опасные пути - куда нельзя записывать/удалять
DANGEROUS_PATHS = [
Path("/"),
Path("/etc"),
Path("/usr"),
Path("/bin"),
Path("/sbin"),
Path("/boot"),
Path("/dev"),
Path("/proc"),
Path("/sys"),
]
def __init__(self):
self._last_operation: Optional[str] = None
self._operation_history: List[Dict] = []
def _is_path_safe(self, path: Path, allow_write: bool = True) -> tuple[bool, str]:
"""
Проверить безопасность пути.
Args:
path: Путь для проверки
allow_write: Если True, проверяем возможность записи
Returns:
(is_safe: bool, reason: str)
"""
try:
# Разрешаем абсолютные и относительные пути
if not path.is_absolute():
path = Path.cwd() / path
# Сначала проверяем на наличие в разрешённых путях (это важно!)
for allowed in self.ALLOWED_BASE_PATHS:
try:
path.relative_to(allowed)
return True, "Путь безопасен"
except ValueError:
pass
# Если путь не в разрешённых - проверяем на опасные
for dangerous in self.DANGEROUS_PATHS:
# Пропускаем корень если путь не в разрешённых уже
if dangerous == Path("/"):
continue
try:
path.relative_to(dangerous)
return False, f"Путь {path} находится в защищённой директории {dangerous}"
except ValueError:
pass
# Если путь не в разрешённых и не в запрещённых - разрешаем с предупреждением
return True, f"Путь {path} может быть недоступен"
except Exception as e:
return False, f"Ошибка проверки пути: {e}"
def _resolve_path(self, path_str: str) -> Path:
"""Преобразовать строку пути в Path объект."""
path = Path(path_str)
# Расширяем ~ в домашнюю директорию
# Важно: Path("~/file") не работает, нужно expanduser()
if path_str.startswith('~'):
path = Path(path_str).expanduser()
elif not path.is_absolute():
# Если путь относительный, делаем его абсолютным от домашней директории
path = Path.home() / path_str
return path
async def read_file(self, path: str, limit: int = 100) -> Dict[str, Any]:
"""
Прочитать файл.
Args:
path: Путь к файлу
limit: Максимальное количество строк для чтения
Returns:
Dict с content, lines, error
"""
try:
file_path = self._resolve_path(path)
# Проверка безопасности
is_safe, reason = self._is_path_safe(file_path, allow_write=False)
if not is_safe:
return {"error": reason, "success": False}
if not file_path.exists():
return {"error": f"Файл не существует: {file_path}", "success": False}
if not file_path.is_file():
return {"error": f"Не файл: {file_path}", "success": False}
# Читаем файл
with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
lines = f.readlines()
# Ограничиваем количество строк
if len(lines) > limit:
content = ''.join(lines[:limit])
truncated = True
total_lines = len(lines)
else:
content = ''.join(lines)
truncated = False
total_lines = len(lines)
logger.info(f"Прочитан файл: {file_path} ({total_lines} строк)")
return {
"success": True,
"content": content,
"path": str(file_path),
"lines_read": min(len(lines), limit),
"total_lines": total_lines,
"truncated": truncated
}
except Exception as e:
logger.error(f"Ошибка чтения файла {path}: {e}")
return {"error": str(e), "success": False}
async def write_file(self, path: str, content: str, append: bool = False) -> Dict[str, Any]:
"""
Записать в файл.
Args:
path: Путь к файлу
content: Содержимое для записи
append: Если True, добавить в конец файла
Returns:
Dict с success, path, bytes_written
"""
try:
file_path = self._resolve_path(path)
# Проверка безопасности
is_safe, reason = self._is_path_safe(file_path, allow_write=True)
if not is_safe:
return {"error": reason, "success": False}
# Создаём родительские директории если нужно
file_path.parent.mkdir(parents=True, exist_ok=True)
# Записываем файл
mode = 'a' if append else 'w'
with open(file_path, mode, encoding='utf-8') as f:
bytes_written = f.write(content)
logger.info(f"Записан файл: {file_path} ({bytes_written} байт)")
return {
"success": True,
"path": str(file_path),
"bytes_written": bytes_written,
"appended": append
}
except Exception as e:
logger.error(f"Ошибка записи файла {path}: {e}")
return {"error": str(e), "success": False}
async def list_directory(self, path: str = ".", show_hidden: bool = False) -> Dict[str, Any]:
"""
Показать список файлов в директории.
Args:
path: Путь к директории
show_hidden: Показывать скрытые файлы
Returns:
Dict с files, directories, total
"""
try:
dir_path = self._resolve_path(path)
is_safe, _ = self._is_path_safe(dir_path, allow_write=False)
if not is_safe:
return {"error": "Доступ к директории ограничен", "success": False}
if not dir_path.exists():
return {"error": f"Директория не существует: {dir_path}", "success": False}
if not dir_path.is_dir():
return {"error": f"Не директория: {dir_path}", "success": False}
files = []
directories = []
for item in dir_path.iterdir():
if not show_hidden and item.name.startswith('.'):
continue
try:
stat = item.stat()
size = stat.st_size
mtime = stat.st_mtime
except:
size = 0
mtime = 0
item_info = {
"name": item.name,
"path": str(item),
"size": size,
"modified": mtime
}
if item.is_file():
files.append(item_info)
elif item.is_dir():
directories.append(item_info)
# Сортируем по имени
files.sort(key=lambda x: x["name"])
directories.sort(key=lambda x: x["name"])
return {
"success": True,
"path": str(dir_path),
"files": files,
"directories": directories,
"total_files": len(files),
"total_dirs": len(directories)
}
except Exception as e:
logger.error(f"Ошибка списка директории {path}: {e}")
return {"error": str(e), "success": False}
async def copy_file(self, source: str, destination: str) -> Dict[str, Any]:
"""
Скопировать файл или директорию.
Args:
source: Исходный путь
destination: Целевой путь
Returns:
Dict с success, source, destination
"""
try:
src_path = self._resolve_path(source)
dst_path = self._resolve_path(destination)
# Проверка безопасности
is_safe_src, reason = self._is_path_safe(src_path, allow_write=False)
if not is_safe_src:
return {"error": f"Источник: {reason}", "success": False}
is_safe_dst, reason = self._is_path_safe(dst_path, allow_write=True)
if not is_safe_dst:
return {"error": f"Назначение: {reason}", "success": False}
if not src_path.exists():
return {"error": f"Источник не существует: {src_path}", "success": False}
# Копируем
if src_path.is_file():
shutil.copy2(src_path, dst_path)
else:
shutil.copytree(src_path, dst_path, dirs_exist_ok=True)
logger.info(f"Скопировано: {src_path} -> {dst_path}")
return {
"success": True,
"source": str(src_path),
"destination": str(dst_path),
"operation": "copy"
}
except Exception as e:
logger.error(f"Ошибка копирования {source} -> {destination}: {e}")
return {"error": str(e), "success": False}
async def move_file(self, source: str, destination: str) -> Dict[str, Any]:
"""
Переместить файл или директорию.
Args:
source: Исходный путь
destination: Целевой путь
Returns:
Dict с success, source, destination
"""
try:
src_path = self._resolve_path(source)
dst_path = self._resolve_path(destination)
# Проверка безопасности
is_safe_src, reason = self._is_path_safe(src_path, allow_write=False)
if not is_safe_src:
return {"error": f"Источник: {reason}", "success": False}
is_safe_dst, reason = self._is_path_safe(dst_path, allow_write=True)
if not is_safe_dst:
return {"error": f"Назначение: {reason}", "success": False}
if not src_path.exists():
return {"error": f"Источник не существует: {src_path}", "success": False}
# Перемещаем
shutil.move(src_path, dst_path)
logger.info(f"Перемещено: {src_path} -> {dst_path}")
return {
"success": True,
"source": str(src_path),
"destination": str(dst_path),
"operation": "move"
}
except Exception as e:
logger.error(f"Ошибка перемещения {source} -> {destination}: {e}")
return {"error": str(e), "success": False}
async def delete(self, path: str, recursive: bool = False) -> Dict[str, Any]:
"""
Удалить файл или директорию.
Args:
path: Путь к файлу/директории
recursive: Если True, удалять рекурсивно
Returns:
Dict с success, path, deleted_count
"""
try:
file_path = self._resolve_path(path)
# Проверка безопасности
is_safe, reason = self._is_path_safe(file_path, allow_write=True)
if not is_safe:
return {"error": reason, "success": False}
if not file_path.exists():
return {"error": f"Путь не существует: {file_path}", "success": False}
deleted_count = 0
if file_path.is_file():
file_path.unlink()
deleted_count = 1
elif file_path.is_dir():
if recursive:
shutil.rmtree(file_path)
# Считаем количество удалённых файлов
deleted_count = -1 # Неизвестно
else:
return {
"error": "Директория не пуста. Используйте recursive=True для рекурсивного удаления",
"success": False
}
logger.info(f"Удалено: {file_path}")
return {
"success": True,
"path": str(file_path),
"deleted_count": deleted_count,
"operation": "delete"
}
except Exception as e:
logger.error(f"Ошибка удаления {path}: {e}")
return {"error": str(e), "success": False}
async def create_directory(self, path: str, parents: bool = True) -> Dict[str, Any]:
"""
Создать директорию.
Args:
path: Путь к директории
parents: Если True, создавать родительские директории
Returns:
Dict с success, path
"""
try:
dir_path = self._resolve_path(path)
# Проверка безопасности
is_safe, reason = self._is_path_safe(dir_path, allow_write=True)
if not is_safe:
return {"error": reason, "success": False}
if dir_path.exists():
if dir_path.is_dir():
return {
"success": True,
"path": str(dir_path),
"already_exists": True
}
else:
return {"error": f"Существует файл с таким именем: {dir_path}", "success": False}
# Создаём директорию
dir_path.mkdir(parents=parents, exist_ok=parents)
logger.info(f"Создана директория: {dir_path}")
return {
"success": True,
"path": str(dir_path),
"operation": "mkdir"
}
except Exception as e:
logger.error(f"Ошибка создания директории {path}: {e}")
return {"error": str(e), "success": False}
async def file_info(self, path: str) -> Dict[str, Any]:
"""
Получить информацию о файле/директории.
Args:
path: Путь к файлу
Returns:
Dict с информацией о файле
"""
try:
file_path = self._resolve_path(path)
is_safe, _ = self._is_path_safe(file_path, allow_write=False)
if not is_safe:
return {"error": "Доступ ограничен", "success": False}
if not file_path.exists():
return {"error": f"Путь не существует: {file_path}", "success": False}
stat = file_path.stat()
return {
"success": True,
"path": str(file_path),
"name": file_path.name,
"is_file": file_path.is_file(),
"is_dir": file_path.is_dir(),
"size": stat.st_size,
"created": stat.st_ctime,
"modified": stat.st_mtime,
"permissions": oct(stat.st_mode)[-3:]
}
except Exception as e:
logger.error(f"Ошибка получения информации о {path}: {e}")
return {"error": str(e), "success": False}
async def search_files(
self,
path: str = ".",
pattern: str = "*",
max_results: int = 50
) -> Dict[str, Any]:
"""
Найти файлы по паттерну.
Args:
path: Директория для поиска
pattern: Паттерн (glob-style)
max_results: Максимум результатов
Returns:
Dict с найденными файлами
"""
try:
base_path = self._resolve_path(path)
is_safe, _ = self._is_path_safe(base_path, allow_write=False)
if not is_safe:
return {"error": "Доступ ограничен", "success": False}
results = []
# Используем glob для поиска
import glob
matches = glob.glob(str(base_path / pattern), recursive=True)
for match in matches[:max_results]:
match_path = Path(match)
try:
stat = match_path.stat()
results.append({
"path": str(match_path),
"name": match_path.name,
"size": stat.st_size,
"is_file": match_path.is_file(),
"is_dir": match_path.is_dir()
})
except:
pass
return {
"success": True,
"pattern": pattern,
"base_path": str(base_path),
"found": len(results),
"results": results,
"truncated": len(matches) > max_results
}
except Exception as e:
logger.error(f"Ошибка поиска файлов {pattern} в {path}: {e}")
return {"error": str(e), "success": False}
async def execute_shell(self, command: str, timeout: int = 30) -> Dict[str, Any]:
"""
Выполнить shell-команду (для сложных операций).
Args:
command: Команда для выполнения
timeout: Таймаут в секундах
Returns:
Dict с stdout, stderr, returncode
"""
try:
# Разрешаем только безопасные команды
SAFE_COMMANDS = [
'ls', 'cat', 'cp', 'mv', 'rm', 'mkdir', 'rmdir',
'touch', 'chmod', 'chown', 'find', 'grep', 'head',
'tail', 'wc', 'sort', 'uniq', 'pwd', 'du', 'df'
]
# Извлекаем базовую команду
base_cmd = command.split()[0] if command.split() else ''
if base_cmd not in SAFE_COMMANDS:
return {
"error": f"Команда '{base_cmd}' не разрешена. Используйте безопасные команды: {SAFE_COMMANDS}",
"success": False
}
# Выполняем команду
process = await asyncio.create_subprocess_shell(
command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=str(Path.home())
)
try:
stdout, stderr = await asyncio.wait_for(
process.communicate(),
timeout=timeout
)
except asyncio.TimeoutError:
process.kill()
return {
"error": f"Таймаут выполнения команды ({timeout} сек)",
"success": False
}
return {
"success": process.returncode == 0,
"stdout": stdout.decode('utf-8', errors='replace').strip(),
"stderr": stderr.decode('utf-8', errors='replace').strip(),
"returncode": process.returncode,
"command": command
}
except Exception as e:
logger.error(f"Ошибка выполнения команды {command}: {e}")
return {"error": str(e), "success": False}
async def execute(self, operation: str, **kwargs) -> ToolResult:
"""
Выполнить операцию с файловой системой.
Args:
operation: Тип операции (read, write, copy, move, delete, mkdir, list, info, search, shell)
**kwargs: Аргументы операции
Returns:
ToolResult с результатом
"""
logger.info(f"File System Tool: operation={operation}, args={kwargs}")
self._last_operation = operation
try:
result = None
if operation == 'read':
result = await self.read_file(
path=kwargs.get('path', ''),
limit=kwargs.get('limit', 100)
)
elif operation == 'write':
result = await self.write_file(
path=kwargs.get('path', ''),
content=kwargs.get('content', ''),
append=kwargs.get('append', False)
)
elif operation == 'copy':
result = await self.copy_file(
source=kwargs.get('source', ''),
destination=kwargs.get('destination', '')
)
elif operation == 'move':
result = await self.move_file(
source=kwargs.get('source', ''),
destination=kwargs.get('destination', '')
)
elif operation == 'delete':
result = await self.delete(
path=kwargs.get('path', ''),
recursive=kwargs.get('recursive', False)
)
elif operation == 'mkdir':
result = await self.create_directory(
path=kwargs.get('path', ''),
parents=kwargs.get('parents', True)
)
elif operation == 'list':
result = await self.list_directory(
path=kwargs.get('path', '.'),
show_hidden=kwargs.get('show_hidden', False)
)
elif operation == 'info':
result = await self.file_info(
path=kwargs.get('path', '')
)
elif operation == 'search':
result = await self.search_files(
path=kwargs.get('path', '.'),
pattern=kwargs.get('pattern', '*'),
max_results=kwargs.get('max_results', 50)
)
elif operation == 'shell':
result = await self.execute_shell(
command=kwargs.get('command', ''),
timeout=kwargs.get('timeout', 30)
)
else:
return ToolResult(
success=False,
error=f"Неизвестная операция: {operation}. Доступные: read, write, copy, move, delete, mkdir, list, info, search, shell"
)
# Сохраняем в историю
self._operation_history.append({
'operation': operation,
'args': kwargs,
'result': result,
'timestamp': __import__('datetime').datetime.now().isoformat()
})
# Ограничиваем историю
if len(self._operation_history) > 100:
self._operation_history = self._operation_history[-50:]
return ToolResult(
success=result.get('success', False),
data=result,
metadata={
'operation': operation,
'last_path': result.get('path', result.get('source', ''))
}
)
except Exception as e:
logger.exception(f"Ошибка File System Tool: {e}")
return ToolResult(
success=False,
error=str(e),
metadata={'operation': operation}
)
def get_schema(self) -> Dict[str, Any]:
"""Получить схему инструмента для промпта."""
return {
"name": self.name,
"description": self.description,
"parameters": {
"type": "object",
"properties": {
"operation": {
"type": "string",
"description": "Тип операции",
"enum": ["read", "write", "copy", "move", "delete", "mkdir", "list", "info", "search", "shell"]
},
"path": {
"type": "string",
"description": "Путь к файлу/директории"
},
"source": {
"type": "string",
"description": "Исходный путь (для copy/move)"
},
"destination": {
"type": "string",
"description": "Целевой путь (для copy/move)"
},
"content": {
"type": "string",
"description": "Содержимое для записи"
},
"pattern": {
"type": "string",
"description": "Паттерн для поиска файлов"
},
"command": {
"type": "string",
"description": "Shell команда (для операции shell)"
},
"limit": {
"type": "integer",
"description": "Лимит строк для чтения"
},
"max_results": {
"type": "integer",
"description": "Максимум результатов поиска"
},
"recursive": {
"type": "boolean",
"description": "Рекурсивное удаление"
},
"show_hidden": {
"type": "boolean",
"description": "Показывать скрытые файлы"
},
"timeout": {
"type": "integer",
"description": "Таймаут для shell команд"
}
},
"required": ["operation"]
}
}
# Автоматическая регистрация при импорте
@register_tool
class FileSystemToolAuto(FileSystemTool):
"""Авто-регистрируемая версия FileSystemTool."""
pass

View File

@ -34,9 +34,14 @@ class GigaChatConfig:
client_secret: str
scope: str = "GIGACHAT_API_PERS"
auth_url: str = "https://ngw.devices.sberbank.ru:9443/api/v2/oauth"
model: str = "GigaChat"
model: str = "GigaChat-Pro" # Модель по умолчанию
model_lite: str = "GigaChat" # Lite модель для простых запросов
model_pro: str = "GigaChat-Pro" # Pro модель для сложных запросов
api_url: str = "https://gigachat.devices.sberbank.ru/api/v1"
timeout: int = 60
# Пороги для переключения моделей
complexity_token_threshold: int = 50 # Если токенов в запросе > порога → Pro
complexity_keyword_threshold: int = 2 # Если ключевых слов сложности >= порога → Pro
class GigaChatTool:
@ -75,6 +80,10 @@ class GigaChatTool:
scope=os.getenv("GIGACHAT_SCOPE", "GIGACHAT_API_PERS"),
auth_url=os.getenv("GIGACHAT_AUTH_URL", "https://ngw.devices.sberbank.ru:9443/api/v2/oauth"),
model=os.getenv("GIGACHAT_MODEL", "GigaChat-Pro"),
model_lite=os.getenv("GIGACHAT_MODEL_LITE", "GigaChat"),
model_pro=os.getenv("GIGACHAT_MODEL_PRO", "GigaChat-Pro"),
complexity_token_threshold=int(os.getenv("GIGACHAT_TOKEN_THRESHOLD", "50")),
complexity_keyword_threshold=int(os.getenv("GIGACHAT_KEYWORD_THRESHOLD", "2")),
)
def _get_auth_headers(self) -> Dict[str, str]:
@ -124,7 +133,121 @@ class GigaChatTool:
logger.info(f"GigaChat токен получен: {self._access_token[:50]}...")
return self._access_token
def _estimate_query_complexity(self, messages: List[GigaChatMessage]) -> dict:
"""
Оценить сложность запроса для выбора модели (Lite или Pro).
Критерии сложности:
1. Длина запроса (количество токенов/слов)
2. Наличие ключевых слов для сложных задач
3. Наличие инструментов (tool calls)
4. Технические термины
Returns:
Dict с оценкой сложности и рекомендуемой моделью
"""
# Собираем весь текст из сообщений пользователя
user_text = ""
for msg in messages:
if msg.role == "user":
user_text += " " + msg.content
user_text = user_text.lower()
# 1. Оценка по длине (считаем слова как грубая оценка токенов)
word_count = len(user_text.split())
token_estimate = word_count * 1.3 # Примерная конверсия слов в токены
# 2. Ключевые слова для сложных задач
complex_keywords = [
# Программирование и код
'код', 'функция', 'класс', 'метод', 'переменная', 'цикл', 'условие',
'алгоритм', 'структура данных', 'массив', 'словарь', 'список',
'импорт', 'экспорт', 'модуль', 'пакет', 'библиотека', 'фреймворк',
'дебаг', 'отладк', 'тест', 'юнит тест', 'интеграционн',
'рефактор', 'оптимиз', 'производительност',
# Анализ и работа с данными
'анализ', 'анализиров', 'сравни', 'сравнени', 'исследовани',
'закономерност', 'паттерн', 'тенденци', 'прогноз',
# Системные задачи
'конфигурац', 'настройк', 'деплой', 'развертывани', 'оркестрац',
'контейнер', 'docker', 'kubernetes', 'k8s', 'helm',
'мониторинг', 'логировани', 'трассировк', 'метрик',
# Сложные запросы
'объясни', 'расскажи подробно', 'детальн', 'подробн',
'почему', 'зачем', 'как работает', 'принцип работы',
'спроектируй', 'спроектировать', 'архитектур', 'архитектура',
'реализуй', 'реализовать', 'напиши код', 'создай функцию',
]
complexity_keywords_count = sum(
1 for keyword in complex_keywords
if keyword in user_text
)
# 3. Наличие технических терминов
tech_terms = [
'api', 'http', 'rest', 'graphql', 'grpc', 'websocket',
'sql', 'nosql', 'postgres', 'mysql', 'mongodb', 'redis',
'git', 'merge', 'commit', 'branch', 'pull request', 'merge request',
'ci/cd', 'pipeline', 'jenkins', 'gitlab', 'github',
'linux', 'bash', 'shell', 'terminal', 'ssh',
'python', 'javascript', 'typescript', 'java', 'go', 'rust', 'cpp',
'react', 'vue', 'angular', 'django', 'flask', 'fastapi', 'express',
]
tech_terms_count = sum(
1 for term in tech_terms
if term in user_text
)
# 4. Наличие инструментов в контексте
has_tools = any(
'tool' in msg.content.lower() or 'инструмент' in msg.content.lower()
for msg in messages
)
# Принятие решения
use_pro = False
reasons = []
# Если токенов больше порога → Pro
if token_estimate > self.config.complexity_token_threshold:
use_pro = True
reasons.append(f"длинный запрос ({word_count} слов, ~{int(token_estimate)} токенов)")
# Если много ключевых слов сложности → Pro
if complexity_keywords_count >= self.config.complexity_keyword_threshold:
use_pro = True
reasons.append(f"сложная задача ({complexity_keywords_count} ключевых слов)")
# Если есть технические термины + инструменты → Pro
if tech_terms_count >= 2 and has_tools:
use_pro = True
reasons.append(f"техническая задача с инструментами ({tech_terms_count} терминов)")
# Если есть явные запросы на работу с кодом/файлами → Pro
if any(phrase in user_text for phrase in [
'исходник', 'source code', 'посмотри код', 'проанализируй код',
'работай с файлом', 'прочитай файл', 'изучи код'
]):
use_pro = True
reasons.append("работа с кодом/файлами")
model = self.config.model_pro if use_pro else self.config.model_lite
return {
"use_pro": use_pro,
"model": model,
"word_count": word_count,
"token_estimate": int(token_estimate),
"complexity_keywords": complexity_keywords_count,
"tech_terms": tech_terms_count,
"has_tools": has_tools,
"reasons": reasons
}
async def chat(
self,
messages: Optional[List[GigaChatMessage]] = None,
@ -157,7 +280,7 @@ class GigaChatTool:
- finish_reason: Причина завершения
"""
token = await self._get_access_token()
# Формируем сообщения
if messages is None:
if use_history:
@ -168,22 +291,31 @@ class GigaChatTool:
# Добавляем новые сообщения к истории
self._chat_history.extend(messages)
messages = self._chat_history.copy()
# Автоматически выбираем модель на основе сложности запроса
# Если модель явно не указана
selected_model = model
model_info = None
if selected_model is None:
model_info = self._estimate_query_complexity(messages)
selected_model = model_info["model"]
logger.info(f"📊 GigaChat выбор модели: {selected_model} (причины: {', '.join(model_info['reasons']) if model_info['reasons'] else 'простой запрос'})")
# Преобразуем сообщения в формат API
api_messages = [
{"role": msg.role, "content": msg.content}
for msg in messages
]
payload = {
"model": model or self.config.model,
"model": selected_model,
"messages": api_messages,
"temperature": temperature,
"max_tokens": max_tokens,
"top_p": top_p,
"repetition_penalty": repetition_penalty,
}
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
@ -193,7 +325,7 @@ class GigaChatTool:
# Логируем запрос для отладки
logger.debug(f"GigaChat API URL: {self.config.api_url}/chat/completions")
logger.debug(f"GigaChat headers: {headers}")
logger.debug(f"GigaChat payload: model={model or self.config.model}, messages={len(api_messages)}, max_tokens={max_tokens}")
logger.debug(f"GigaChat payload: model={selected_model}, messages={len(api_messages)}, max_tokens={max_tokens}")
# GigaChat использует самоподписанные сертификаты - отключаем верификацию
async with httpx.AsyncClient(verify=False) as client:
@ -237,9 +369,10 @@ class GigaChatTool:
return {
"content": data["choices"][0]["message"]["content"] if data.get("choices") else "",
"model": data.get("model", self.config.model),
"model": data.get("model", selected_model),
"usage": data.get("usage", {}),
"finish_reason": data["choices"][0]["finish_reason"] if data.get("choices") else "",
"complexity_info": model_info, # Информация о выборе модели для отладки
}
def clear_history(self):
@ -258,6 +391,128 @@ class GigaChatTool:
]
# Добавляем новый в начало
self._chat_history.insert(0, GigaChatMessage(role="system", content=prompt))
async def chat_with_functions(
self,
messages: List[Dict[str, Any]],
functions: Optional[List[Dict[str, Any]]] = None,
model: Optional[str] = None,
temperature: float = 0.7,
max_tokens: int = 2000,
top_p: float = 0.1,
repetition_penalty: float = 1.0,
user_id: Optional[str] = None,
) -> Dict[str, Any]:
"""
Отправка запроса к GigaChat API с поддержкой function calling.
Args:
messages: Список сообщений в формате API
functions: Массив функций для вызова
model: Модель для генерации
temperature: Температура генерации
max_tokens: Максимум токенов
top_p: Параметр top-p sampling
repetition_penalty: Штраф за повторения
user_id: ID пользователя
Returns:
Dict с ответом API включая возможный function_call
"""
token = await self._get_access_token()
# Выбираем модель на основе сложности запроса
selected_model = model
model_info = None
if selected_model is None:
# Преобразуем messages в формат GigaChatMessage для оценки сложности
gc_messages = [GigaChatMessage(role=msg["role"], content=msg.get("content", "")) for msg in messages]
model_info = self._estimate_query_complexity(gc_messages)
selected_model = model_info["model"]
logger.info(f"📊 GigaChat выбор модели: {selected_model} (причины: {', '.join(model_info['reasons']) if model_info['reasons'] else 'простой запрос'})")
# Формируем payload
payload = {
"model": selected_model,
"messages": messages,
"temperature": temperature,
"max_tokens": max_tokens,
"top_p": top_p,
"repetition_penalty": repetition_penalty,
}
# Добавляем functions если есть
if functions:
payload["functions"] = functions
# function_call: "auto" позволяет модели самой решать когда вызывать функции
payload["function_call"] = "auto"
logger.info(f"🔧 GigaChat function calling: {len(functions)} функций доступно")
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
"X-User-Id": str(user_id) if user_id else "telegram-bot",
}
logger.info(f"📤 GigaChat API: model={selected_model}, messages={len(messages)}, functions={len(functions) if functions else 0}")
# GigaChat использует самоподписанные сертификаты - отключаем верификацию
async with httpx.AsyncClient(verify=False) as client:
try:
response = await client.post(
f"{self.config.api_url}/chat/completions",
headers=headers,
json=payload,
timeout=self.config.timeout,
)
logger.debug(f"GigaChat chat_with_functions status: {response.status_code}")
logger.debug(f"GigaChat response: {response.text[:500]}")
if response.status_code != 200:
logger.error(f"GigaChat error response: {response.text[:1000]}")
response.raise_for_status()
data = response.json()
except httpx.HTTPStatusError as e:
logger.error(f"GigaChat HTTP error: {e}")
logger.error(f"Response: {e.response.text[:500]}")
return {
"content": "",
"error": f"HTTP {e.response.status_code}: {e.response.text[:200]}",
"choices": [],
}
except httpx.HTTPError as e:
logger.error(f"GigaChat request error: {e}")
return {
"content": "",
"error": f"Request error: {str(e)}",
"choices": [],
}
# Извлекаем content и function_call
content = ""
function_call = None
functions_state_id = None
if data.get("choices"):
choice = data["choices"][0]
message = choice.get("message", {})
content = message.get("content", "")
function_call = message.get("function_call")
functions_state_id = message.get("functions_state_id")
logger.info(f"📬 GigaChat ответ: content_len={len(content)}, function_call={function_call is not None}, functions_state_id={functions_state_id}")
return {
"content": content,
"function_call": function_call,
"functions_state_id": functions_state_id,
"model": data.get("model", selected_model),
"usage": data.get("usage", {}),
"finish_reason": data["choices"][0]["finish_reason"] if data.get("choices") else "",
"choices": data.get("choices", []),
}
async def generate_image(
self,

View File

@ -63,7 +63,8 @@ def normalize_output(text: str) -> str:
line = line[:match.start()] + f'{last_text}{last_percent}%' + line[match.end():]
# СНАЧАЛА удаляем остатки ANSI-кодов из строки
line = re.sub(r'.', '', line) # + любой символ
# line = re.sub(r'.', '', line) # ← ЭТО УДАЛЯЛО ВСЁ! Закомментировал
line = clean_ansi_codes(line) # ← Используем правильную функцию
# Удаляем дублирующийся текст вида "0% [текст] 0% [текст]"
dup_pattern = re.compile(r'(\d+%\s*\[.+?\])(?:\s*\d+%\s*\[.+?\])+')

View File

@ -142,53 +142,131 @@ def split_message(text: str, max_length: int = MAX_MESSAGE_LENGTH) -> List[Tuple
return parts
async def send_long_message(update: Update, text: str, parse_mode: str = None):
async def send_long_message(update: Update, text: str, parse_mode: str = None, pause_every: int = 3):
"""
Отправить длинный текст, разбив на несколько сообщений.
Умная разбивка:
- Блоки кода не разрываются между сообщениями
- Если блок кода не влезает отправляется без Markdown
- Нумерация (1/3), (2/3) только если сообщений > 1
- Нумерация (1/N), (2/N) только если сообщений > 1
- Если сообщение содержит блок кода, он закрывается в конце и открывается в начале следующего
- КАЖДЫЕ pause_after сообщений пауза с кнопками "Продолжить" / "Отменить"
- После нажатия кнопки они удаляются
Args:
update: Telegram update
text: Текст для отправки
parse_mode: Режим парсинга (Markdown)
pause_every: Каждые сколько сообщений делать паузу (0 = без паузы)
"""
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
import asyncio
parts = split_message(text)
total = len(parts)
messages_sent = 0
wait_msg = None
for i, (part, has_code) in enumerate(parts):
# Добавляем номер части если их несколько
if total > 1:
header = f"({i+1}/{total}) "
if len(header) + len(part) <= MAX_MESSAGE_LENGTH:
part = header + part
# Определяем parse_mode для этого сообщения
# Если передан parse_mode и нет проблем с блоками кода — используем его
# Если блок кода разорван — отправляем без Markdown для этой части
if parse_mode and has_code:
# Сообщение содержит полный блок кода — используем Markdown
actual_parse_mode = parse_mode
elif parse_mode and not has_code:
# Сообщение без блоков кода — всё равно используем Markdown для другого форматирования
actual_parse_mode = parse_mode
else:
# Нет parse_mode или проблемы с кодом
actual_parse_mode = None
# Если это не первое сообщение и предыдущее имело блок кода — открываем блок
# Если это не последнее сообщение и текущее имеет блок кода — закрываем блок
if total > 1 and actual_parse_mode:
if i > 0 and parts[i-1][1]: # Предыдущее имело блок кода
part = "```\n" + part
if i < total - 1 and has_code: # Следующее будет иметь блок кода
part = part + "\n```"
try:
await update.message.reply_text(part, parse_mode=actual_parse_mode)
except Exception as e:
# Фоллбэк: отправляем без разметки
logger.debug(f"Ошибка Markdown, отправляем без разметки: {e}")
await update.message.reply_text(part)
messages_sent += 1
# Небольшая пауза между сообщениями
await asyncio.sleep(0.1)
# КАЖДЫЕ pause_every сообщений — спрашиваем продолжать ли
if pause_every > 0 and messages_sent % pause_every == 0 and i < total - 1:
remaining = total - (i + 1)
keyboard = InlineKeyboardMarkup([
[
InlineKeyboardButton("▶️ Продолжить", callback_data=f"continue_output_{remaining}"),
InlineKeyboardButton("❌ Отменить", callback_data="cancel_output")
]
])
wait_msg = await update.message.reply_text(
f"📊 **Отправлено {messages_sent} из {total} сообщений**\n\n"
f"Осталось ещё {remaining} сообщений.\n\n"
f"Продолжить вывод?",
parse_mode="Markdown",
reply_markup=keyboard
)
# Ждём ответа пользователя (до 60 секунд)
from bot.config import state_manager
user_id = update.effective_user.id
# Сохраняем состояние ожидания
state = state_manager.get(user_id)
state.waiting_for_output_control = True
state.output_remaining = remaining
state.output_wait_message = wait_msg
# Ждём 60 секунд
for _ in range(60):
await asyncio.sleep(1)
state = state_manager.get(user_id)
if not state.waiting_for_output_control:
# Пользователь ответил
if state.continue_output:
# Продолжаем - удаляем кнопки
try:
await wait_msg.delete()
except:
pass
break
else:
# Отменил - редактируем сообщение и убираем кнопки
try:
await wait_msg.edit_text("❌ **Вывод отменен пользователем**", parse_mode="Markdown")
except:
pass
return
# Таймаут
if state.waiting_for_output_control:
state.waiting_for_output_control = False
try:
await wait_msg.edit_text("⏱️ **Время ожидания истекло**. Вывод продолжен.", parse_mode="Markdown")
except:
pass
def format_long_output(text: str, max_lines: int = 20, head_lines: int = 10, tail_lines: int = 10) -> str:
def format_long_output(text: str, max_lines: int = 100, head_lines: int = 50, tail_lines: int = 50) -> str:
"""
Форматировать длинный вывод: показать первые и последние строки.
По умолчанию: первые 10 + последние 10 строк = 20 строк максимум.
По умолчанию: первые 50 + последние 50 строк = 100 строк максимум.
"""
lines = text.split('\n')
total_lines = len(lines)

View File

@ -116,6 +116,42 @@ cron_manager(action="list")
---
### 5. 📁 File System Tool (`file_system_tool`)
**Назначение:** Работа с файловой системой Linux.
**Когда использовать:**
- Пользователь просит прочитать, создать, скопировать, переместить, удалить файл
- Запросы про просмотр содержимого директории
- Слова: "прочитай", "покажи файл", "создай", "скопируй", "перемести", "удали", "ls", "cat", "cp", "mv", "rm", "mkdir"
- Команды Unix: `cat `, `ls `, `mkdir `, `cp `, `mv `, `rm `, `touch `
**Действия:**
- `read` — прочитать файл (параметры `path`, `limit`)
- `write` — записать в файл (параметры `path`, `content`, `append`)
- `copy` — копировать файл (параметры `source`, `destination`)
- `move` — переместить файл (параметры `source`, `destination`)
- `delete` — удалить файл (параметры `path`, `recursive`)
- `mkdir` — создать директорию (параметры `path`, `parents`)
- `list` — список файлов (параметры `path`, `show_hidden`)
- `info` — информация о файле (параметр `path`)
- `search` — поиск файлов (параметры `path`, `pattern`, `max_results`)
- `shell` — выполнить shell-команду (параметры `command`, `timeout`)
**Примеры вызова:**
```python
file_system_tool(operation='read', path='/home/mirivlad/test.txt')
file_system_tool(operation='write', path='/tmp/note.txt', content='Текст заметки')
file_system_tool(operation='list', path='/home/mirivlad/git')
file_system_tool(operation='copy', source='file.txt', destination='backup/file.txt')
```
**Безопасность:**
- Разрешена работа в домашней директории, `/tmp`, `/var/tmp`
- Запрещена запись в системные директории (`/etc`, `/usr`, `/bin`, etc.)
---
## 🧠 ПРИНЦИПЫ РАБОТЫ
### 1. **Автономность (Agentic AI)**
@ -135,10 +171,11 @@ cron_manager(action="list")
### 4. **Приоритеты инструментов**
При принятии решения следуй приоритету:
1. **SSH** — если явная системная задача
2. **Cron** — если планирование/напоминание
3. **Поиск (DDGS)** — если нужны свежие данные из интернета
4. **RSS** — если новости из подписанных лент
1. **File System** — если операция с файлами/директориями
2. **SSH** — если явная системная задача на сервере
3. **Cron** — если планирование/напоминание
4. **Поиск (DDGS)** — если нужны свежие данные из интернета
5. **RSS** — если новости из подписанных лент
---
@ -227,10 +264,10 @@ Filesystem Size Used Avail Use% Mounted on
## 🎯 ТЕКУЩАЯ ВЕРСИЯ
**Bot Version:** 0.7.0
**AI Provider Manager:** Support for multiple AI providers (Qwen, GigaChat)
**Bot Version:** 0.7.1
**AI Provider Manager:** Поддержка multiple AI providers (Qwen Code, GigaChat)
**Memory:** ChromaDB RAG + Vector Memory
**Tools:** ddgs_tool, rss_tool, ssh_tool, cron_tool
**Tools:** ddgs_tool, rss_tool, ssh_tool, cron_tool, file_system_tool
---